forked from geekan/MetaGPT
-
Notifications
You must be signed in to change notification settings - Fork 0
/
environment.py
66 lines (55 loc) · 1.93 KB
/
environment.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
@Time : 2023/5/11 22:12
@Author : alexanderwu
@File : environment.py
"""
import asyncio
from queue import Queue
from typing import Iterable
from metagpt.manager import Manager
from metagpt.roles import Role
from metagpt.schema import Message
from metagpt.memory import Memory
class Environment:
"""环境,承载一批角色,角色可以向环境发布消息,可以被其他角色观察到"""
def __init__(self):
self.roles: dict[str, Role] = {}
self.message_queue = Queue()
self.memory = Memory()
self.history = ''
def add_role(self, role: Role):
"""增加一个在当前环境的Role"""
role.set_env(self)
self.roles[role.profile] = role
def add_roles(self, roles: Iterable[Role]):
"""增加一批在当前环境的Role"""
for role in roles:
self.add_role(role)
def set_manager(self, manager):
"""设置一个当前环境的管理员"""
self.manager = manager
def publish_message(self, message: Message):
"""向当前环境发布信息"""
self.message_queue.put(message)
self.memory.add(message)
self.history += f"\n{message}"
async def run(self, k=1):
"""处理一次所有Role的运行"""
# while not self.message_queue.empty():
# message = self.message_queue.get()
# rsp = await self.manager.handle(message, self)
# self.message_queue.put(rsp)
for _ in range(k):
futures = []
for role in self.roles.values():
future = role.run()
futures.append(future)
await asyncio.gather(*futures)
def get_roles(self) -> dict[str, Role]:
"""获得环境内的所有Role"""
return self.roles
def get_role(self, name: str) -> Role:
"""获得环境内的指定Role"""
return self.roles.get(name, None)