forked from kyegomez/swarms
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathant_swarm.py
207 lines (173 loc) · 6.89 KB
/
ant_swarm.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
import os
import time
from typing import List, Dict, Any, Union
from pydantic import BaseModel, Field
from loguru import logger
from swarms import Agent
from swarm_models import OpenAIChat
from dotenv import load_dotenv
# Load environment variables
load_dotenv()
# Pydantic model to track metadata for each agent
class AgentMetadata(BaseModel):
agent_id: str
start_time: float = Field(default_factory=time.time)
end_time: Union[float, None] = None
task: str
output: Union[str, None] = None
error: Union[str, None] = None
status: str = "running"
# Worker Agent class
class WorkerAgent:
def __init__(
self,
agent_name: str,
system_prompt: str,
model_name: str = "gpt-4o-mini",
):
"""Initialize a Worker agent with its own model, name, and system prompt."""
api_key = os.getenv("OPENAI_API_KEY")
# Create the LLM model for the worker
self.model = OpenAIChat(
openai_api_key=api_key,
model_name=model_name,
temperature=0.1,
)
# Initialize the worker agent with a unique prompt and name
self.agent = Agent(
agent_name=agent_name,
system_prompt=system_prompt,
llm=self.model,
max_loops=1,
autosave=True,
dashboard=False,
verbose=True,
dynamic_temperature_enabled=True,
saved_state_path=f"{agent_name}_state.json",
user_name="swarms_corp",
retry_attempts=1,
context_length=200000,
return_step_meta=False,
)
def perform_task(self, task: str) -> Dict[str, Any]:
"""Perform the task assigned by the Queen and return the result."""
metadata = AgentMetadata(
agent_id=self.agent.agent_name, task=task
)
try:
logger.info(
f"{self.agent.agent_name} is starting task '{task}'."
)
result = self.agent.run(task)
metadata.output = result
metadata.status = "completed"
except Exception as e:
logger.error(
f"{self.agent.agent_name} encountered an error: {e}"
)
metadata.error = str(e)
metadata.status = "failed"
finally:
metadata.end_time = time.time()
return metadata.dict()
# Queen Agent class to manage the workers and dynamically decompose tasks
class QueenAgent:
def __init__(
self,
worker_count: int = 5,
model_name: str = "gpt-4o-mini",
queen_name: str = "Queen-Agent",
queen_prompt: str = "You are the queen of the hive",
):
"""Initialize the Queen agent who assigns tasks to workers.
Args:
worker_count (int): Number of worker agents to manage.
model_name (str): The model used by worker agents.
queen_name (str): The name of the Queen agent.
queen_prompt (str): The system prompt for the Queen agent.
"""
self.queen_name = queen_name
self.queen_prompt = queen_prompt
# Queen agent initialization with a unique prompt for dynamic task decomposition
api_key = os.getenv("OPENAI_API_KEY")
self.queen_model = OpenAIChat(
openai_api_key=api_key,
model_name=model_name,
temperature=0.1,
)
# Initialize worker agents
self.workers = [
WorkerAgent(
agent_name=f"Worker-{i+1}",
system_prompt=f"Worker agent {i+1}, specialized in helping with financial analysis.",
model_name=model_name,
)
for i in range(worker_count)
]
self.worker_metadata: Dict[str, AgentMetadata] = {}
def decompose_task(self, task: str) -> List[str]:
"""Dynamically decompose a task into multiple subtasks using prompting."""
decomposition_prompt = f"""You are a highly efficient problem solver. Given the following task:
'{task}', please decompose this task into 3-5 smaller subtasks, and explain how they can be completed step by step."""
logger.info(
f"{self.queen_name} is generating subtasks using prompting for the task: '{task}'"
)
# Use the queen's model to generate subtasks dynamically
subtasks_output = self.queen_model.run(decomposition_prompt)
logger.info(f"Queen output: {subtasks_output}")
subtasks = subtasks_output.split("\n")
# Filter and clean up subtasks
subtasks = [
subtask.strip() for subtask in subtasks if subtask.strip()
]
return subtasks
def assign_subtasks(self, subtasks: List[str]) -> Dict[str, Any]:
"""Assign subtasks to workers dynamically and collect their results.
Args:
subtasks (List[str]): The list of subtasks to distribute among workers.
Returns:
dict: A dictionary containing results from workers.
"""
logger.info(
f"{self.queen_name} is assigning subtasks to workers."
)
results = {}
for i, subtask in enumerate(subtasks):
# Assign each subtask to a different worker
worker = self.workers[
i % len(self.workers)
] # Circular assignment if more subtasks than workers
worker_result = worker.perform_task(subtask)
results[worker_result["agent_id"]] = worker_result
self.worker_metadata[worker_result["agent_id"]] = (
worker_result
)
return results
def gather_results(self) -> Dict[str, Any]:
"""Gather all results from the worker agents."""
return self.worker_metadata
def run_swarm(self, task: str) -> Dict[str, Any]:
"""Run the swarm by decomposing a task into subtasks, assigning them to workers, and gathering results."""
logger.info(f"{self.queen_name} is initializing the swarm.")
# Decompose the task into subtasks using prompting
subtasks = self.decompose_task(task)
logger.info(f"Subtasks generated by the Queen: {subtasks}")
# Assign subtasks to workers
results = self.assign_subtasks(subtasks)
logger.info(
f"{self.queen_name} has collected results from all workers."
)
return results
# Example usage
if __name__ == "__main__":
# Queen oversees 3 worker agents with a custom system prompt
queen = QueenAgent(
worker_count=3,
queen_name="Queen-Overseer",
queen_prompt="You are the overseer queen of a financial analysis swarm. Decompose and distribute tasks wisely.",
)
# Task for the swarm to execute
task = "Analyze the best strategies to establish a ROTH IRA and maximize tax savings."
# Run the swarm on the task and gather results
final_results = queen.run_swarm(task)
print("Final Swarm Results:", final_results)