-
Notifications
You must be signed in to change notification settings - Fork 166
/
Copy pathinstruments.py
177 lines (135 loc) · 5.24 KB
/
instruments.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
import os
import shutil

from postgres_da_ai_agent.modules import file
from postgres_da_ai_agent.modules.db import PostgresManager

BASE_DIR = os.environ.get("BASE_DIR", "./agent_results")
class AgentInstruments:
    """
    Base class for multi-agent instruments: the tools, state, and functions
    that an agent can use across the lifecycle of conversations.

    `session_id` starts as None and must be assigned (e.g. by a subclass)
    before `root_dir` or the `make_agent_*_file` helpers are used.
    """

    def __init__(self) -> None:
        # Joined onto BASE_DIR by root_dir; None until a caller/subclass sets it.
        self.session_id = None
        # Message list that sync_messages is meant to keep up to date.
        self.messages = []

    def __enter__(self):
        """
        Support entering the 'with' statement; yields the instance itself
        """
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """
        Support exiting the 'with' statement

        Nothing is released here; returning None lets any exception raised
        inside the 'with' body propagate.
        """
        pass

    def sync_messages(self, messages: list):
        """
        Syncs messages with the orchestrator

        The base class does not implement this and always raises
        NotImplementedError.
        """
        raise NotImplementedError

    def make_agent_chat_file(self, team_name: str) -> str:
        """
        Build the path of the chat log file for `team_name` inside root_dir
        """
        return os.path.join(self.root_dir, f"agent_chats_{team_name}.json")

    def make_agent_cost_file(self, team_name: str) -> str:
        """
        Build the path of the cost report file for `team_name` inside root_dir
        """
        return os.path.join(self.root_dir, f"agent_cost_{team_name}.json")

    @property
    def root_dir(self) -> str:
        """
        Directory for this session's files: BASE_DIR joined with session_id

        Raises ValueError when session_id has not been set yet.
        """
        # Without this guard os.path.join fails with an opaque
        # "expected str, bytes or os.PathLike object, not NoneType" TypeError.
        if self.session_id is None:
            raise ValueError("session_id must be set before root_dir can be resolved")
        return os.path.join(BASE_DIR, self.session_id)
class PostgresAgentInstruments(AgentInstruments):
    """
    Unified Toolset for the Postgres Data Analytics Multi-Agent System

    Advantages:
        - All agents have access to the same state and functions
        - Gives agent functions awareness of changing context
        - Clear and concise capabilities for agents
        - Clean database connection management

    Guidelines:
        - Agent Functions should not call other agent functions directly
            - Instead Agent Functions should call external lower level modules
        - Prefer 1 to 1 mapping of agents and their functions
        - The state lifecycle lives between all agent orchestrations
    """

    def __init__(self, db_url: str, session_id: str) -> None:
        """
        Store the connection url and session id; no connection is opened
        until the instance is entered with 'with'
        """
        super().__init__()

        self.db_url = db_url
        # Set to a PostgresManager in __enter__.
        self.db = None
        self.session_id = session_id
        self.messages = []
        # Numeric prefix used for the next file written by write_innovation_file.
        self.innovation_index = 0

    def __enter__(self):
        """
        Support entering the 'with' statement

        Clears the session directory, connects to the database using db_url
        and returns the pair (self, self.db).
        """
        self.reset_files()
        self.db = PostgresManager()
        self.db.connect_with_url(self.db_url)
        return self, self.db

    def __exit__(self, exc_type, exc_val, exc_tb):
        """
        Support exiting the 'with' statement by closing the database manager
        """
        # db stays None until __enter__ has run; skip instead of raising
        # AttributeError if __exit__ is invoked without a manager.
        if self.db is not None:
            self.db.close()

    def sync_messages(self, messages: list):
        """
        Syncs messages with the orchestrator
        """
        self.messages = messages

    def reset_files(self):
        """
        Clear everything in the root_dir, creating it first if it is missing
        """
        root = self.root_dir

        # exist_ok removes the race between checking for the directory and
        # creating it.
        os.makedirs(root, exist_ok=True)

        for entry in os.listdir(root):
            path = os.path.join(root, entry)
            # os.remove cannot delete a directory, so real sub-directories are
            # removed recursively; symlinks are unlinked rather than followed.
            if os.path.isdir(path) and not os.path.islink(path):
                shutil.rmtree(path)
            else:
                os.remove(path)

    def get_file_path(self, fname: str) -> str:
        """
        Get the full path to a file in the root_dir
        """
        return os.path.join(self.root_dir, fname)

    # -------------------------- Agent Properties -------------------------- #

    @property
    def run_sql_results_file(self) -> str:
        """
        Path of the file that run_sql writes query results to
        """
        return self.get_file_path("run_sql_results.json")

    @property
    def sql_query_file(self) -> str:
        """
        Path of the file that run_sql writes the query text to
        """
        return self.get_file_path("sql_query.sql")

    # -------------------------- Agent Functions -------------------------- #

    def run_sql(self, sql: str) -> str:
        """
        Run a SQL query against the postgres database

        The value returned by self.db.run_sql (presumably a JSON string -
        confirm against PostgresManager) is written to run_sql_results_file
        and the query text to sql_query_file. Returns a fixed confirmation
        message.
        """
        results_as_json = self.db.run_sql(sql)

        # dump these results to a file
        with open(self.run_sql_results_file, "w") as f:
            f.write(results_as_json)

        # keep the query next to its results
        with open(self.sql_query_file, "w") as f:
            f.write(sql)

        return "Successfully delivered results to json file"

    def validate_run_sql(self):
        """
        validate that the run_sql results file exists and has content

        Returns (True, "") on success, otherwise (False, reason).
        """
        return self._check_file_has_content(self.run_sql_results_file)

    def write_file(self, content: str):
        """
        Write content to write_file.txt in the root_dir via file.write_file
        and return that helper's result
        """
        fname = self.get_file_path("write_file.txt")
        return file.write_file(fname, content)

    def write_json_file(self, json_str: str):
        """
        Write json_str to write_json_file.json in the root_dir via
        file.write_json_file and return that helper's result
        """
        fname = self.get_file_path("write_json_file.json")
        return file.write_json_file(fname, json_str)

    def write_yml_file(self, json_str: str):
        """
        Write json_str to write_yml_file.yml in the root_dir via
        file.write_yml_file and return that helper's result
        """
        fname = self.get_file_path("write_yml_file.yml")
        return file.write_yml_file(fname, json_str)

    def write_innovation_file(self, content: str):
        """
        Write content to the next numbered innovation file and advance
        innovation_index; returns a fixed confirmation message
        """
        fname = self.get_file_path(f"{self.innovation_index}_innovation_file.json")
        file.write_file(fname, content)
        self.innovation_index += 1
        return "Successfully wrote innovation file. You can check my work."

    def validate_innovation_files(self):
        """
        loop from 0 to innovation_index and verify file exists with content

        Returns (True, "") when every file passes, otherwise (False, reason)
        for the first file that is missing or empty.
        """
        for i in range(self.innovation_index):
            fname = self.get_file_path(f"{i}_innovation_file.json")
            is_valid, reason = self._check_file_has_content(fname)
            if not is_valid:
                return False, reason

        return True, ""

    @staticmethod
    def _check_file_has_content(fname: str):
        """
        Return (True, "") when fname exists and is non-empty, else (False, reason)
        """
        # The size check avoids reading the whole file just to test emptiness.
        try:
            size = os.path.getsize(fname)
        except FileNotFoundError:
            # A missing file is a validation failure, not a crash.
            return False, f"File {fname} does not exist"

        if size == 0:
            return False, f"File {fname} is empty"

        return True, ""