forked from zeakai/wechat-ai
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtoken_bucket.py
45 lines (38 loc) · 1.41 KB
/
token_bucket.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
import threading
import time
class TokenBucket:
def __init__(self, tpm, timeout=None):
self.capacity = int(tpm) # 令牌桶容量
self.tokens = 0 # 初始令牌数为0
self.rate = int(tpm) / 60 # 令牌每秒生成速率
self.timeout = timeout # 等待令牌超时时间
self.cond = threading.Condition() # 条件变量
self.is_running = True
# 开启令牌生成线程
threading.Thread(target=self._generate_tokens).start()
def _generate_tokens(self):
"""生成令牌"""
while self.is_running:
with self.cond:
if self.tokens < self.capacity:
self.tokens += 1
self.cond.notify() # 通知获取令牌的线程
time.sleep(1 / self.rate)
def get_token(self):
"""获取令牌"""
with self.cond:
while self.tokens <= 0:
flag = self.cond.wait(self.timeout)
if not flag: # 超时
return False
self.tokens -= 1
return True
def close(self):
self.is_running = False
if __name__ == "__main__":
token_bucket = TokenBucket(20, None) # 创建一个每分钟生产20个tokens的令牌桶
# token_bucket = TokenBucket(20, 0.1)
for i in range(3):
if token_bucket.get_token():
print(f"第{i+1}次请求成功")
token_bucket.close()