import base64
import os
import re

import streamlit as st

from config import MODEL_CHOICES, MODEL_TOKEN_LIMITS
from utils.auth_utils import get_api_key
from utils.ui_utils import get_llm_provider, update_discussion_and_whiteboard


def agent_button_callback(agent_index):
    # Callback function to handle state update and logic execution
    def callback():
        st.session_state['selected_agent_index'] = agent_index
        agent = st.session_state.agents[agent_index]
        agent_name = agent['config']['name'] if 'config' in agent and 'name' in agent['config'] else ''
        st.session_state['form_agent_name'] = agent_name
        st.session_state['form_agent_description'] = agent['description'] if 'description' in agent else ''
        # Directly call process_agent_interaction here if appropriate
        process_agent_interaction(agent_index)
    return callback


def construct_request(agent_name, description, user_request, user_input, rephrased_request, reference_url, tool_results):
    request = f"Act as the {agent_name} who {description}."
    if user_request:
        request += f" Original request was: {user_request}."
    if rephrased_request:
        request += f" You are helping a team work on satisfying {rephrased_request}."
    if user_input:
        request += f" Additional input: {user_input}."
    if reference_url and reference_url in st.session_state.reference_html:
        html_content = st.session_state.reference_html[reference_url]
        request += f" Reference URL content: {html_content}."
    if st.session_state.discussion:
        request += f" The discussion so far has been {st.session_state.discussion[-50000:]}."
    if tool_results:
        request += f" tool results: {tool_results}."
    return request


def display_agents():
if"agents"inst.session_stateandst.session_state.agentsandlen(st.session_state.agents) >0:
st.sidebar.title("Your Agents")
st.sidebar.subheader("Click to interact")
display_agent_buttons(st.session_state.agents)
ifst.session_state.get('show_edit'):
edit_index=st.session_state.get('edit_agent_index')
ifedit_indexisnotNoneand0<=edit_index<len(st.session_state.agents):
agent=st.session_state.agents[edit_index]
display_agent_edit_form(agent, edit_index)
else:
st.sidebar.warning("Invalid agent selected for editing.")
else:
st.sidebar.warning(f"No agents have yet been created. Please enter a new request.")
st.sidebar.warning(f"NOTE: GPT models can only be used locally, not in the online demo.")
st.sidebar.warning(f"ALSO: If no agents are created, do a hard reset (CTL-F5) and try switching models. LLM results can be unpredictable.")
st.sidebar.warning(f"SOURCE: https://github.com/jgravelle/AutoGroq\n\r\n\r https://j.gravelle.us\n\r\n\r DISCORD: https://discord.gg/DXjFPX84gs \n\r\n\r YouTube: https://www.youtube.com/playlist?list=PLPu97iZ5SLTsGX3WWJjQ5GNHy7ZX66ryP")
def display_agent_buttons(agents):
    for index, agent in enumerate(agents):
        agent_name = agent["config"]["name"] if agent["config"].get("name") else f"Unnamed Agent {index + 1}"
        col1, col2 = st.sidebar.columns([1, 4])
        with col1:
            gear_icon = "⚙️"  # Unicode character for gear icon
            if st.button(
                gear_icon,
                key=f"gear_{index}",
                help="Edit Agent"  # Add the tooltip text
            ):
                st.session_state['edit_agent_index'] = index
                st.session_state['show_edit'] = True
        with col2:
            if "next_agent" in st.session_state and st.session_state.next_agent == agent_name:
                button_style = """
                <style>
                div[data-testid*="stButton"] > button[kind="secondary"] {
                    background-color: green !important;
                    color: white !important;
                }
                </style>
                """
                st.markdown(button_style, unsafe_allow_html=True)
            st.button(agent_name, key=f"agent_{index}", on_click=agent_button_callback(index))
def display_agent_edit_form(agent, edit_index):
    with st.expander(f"Edit Properties of {agent['config'].get('name', '')}", expanded=True):
        col1, col2 = st.columns([4, 1])
        with col1:
            new_name = st.text_input("Name", value=agent['config'].get('name', ''), key=f"name_{edit_index}")
        with col2:
            container = st.container()
            if container.button("X", key=f"delete_{edit_index}"):
                if st.session_state.get(f"delete_confirmed_{edit_index}", False):
                    st.session_state.agents.pop(edit_index)
                    st.session_state['show_edit'] = False
                    st.experimental_rerun()
                else:
                    st.session_state[f"delete_confirmed_{edit_index}"] = True
                    st.experimental_rerun()
            if st.session_state.get(f"delete_confirmed_{edit_index}", False):
                if container.button("Confirm Deletion", key=f"confirm_delete_{edit_index}"):
                    st.session_state.agents.pop(edit_index)
                    st.session_state['show_edit'] = False
                    del st.session_state[f"delete_confirmed_{edit_index}"]
                    st.experimental_rerun()
                if container.button("Cancel", key=f"cancel_delete_{edit_index}"):
                    del st.session_state[f"delete_confirmed_{edit_index}"]
                    st.experimental_rerun()

        description_value = agent.get('new_description', agent.get('description', ''))

        col1, col2 = st.columns([3, 1])
        with col1:
            selected_model = st.selectbox(
                "Model",
                options=list(MODEL_CHOICES.keys()),
                index=list(MODEL_CHOICES.keys()).index(agent['config']['llm_config']['config_list'][0]['model']),
                key=f"model_select_{edit_index}"
            )
        with col2:
            if st.button("Set for ALL agents", key=f"set_all_agents_{edit_index}"):
                for agent in st.session_state.agents:
                    agent['config']['llm_config']['config_list'][0]['model'] = selected_model
                    agent['config']['llm_config']['max_tokens'] = MODEL_CHOICES[selected_model]
                st.experimental_rerun()

        new_description = st.text_area("Description", value=description_value, key=f"desc_{edit_index}")

        col1, col2 = st.columns([3, 1])
        with col1:
            if st.button("Update User Description", key=f"regenerate_{edit_index}"):
                print(f"Regenerate button clicked for agent {edit_index}")
                new_description = regenerate_agent_description(agent)
                if new_description:
                    agent['new_description'] = new_description
                    print(f"Description regenerated for {agent['config']['name']}: {new_description}")
                    st.session_state[f"regenerate_description_{edit_index}"] = True
                    description_value = new_description
                    st.experimental_rerun()
                else:
                    print(f"Failed to regenerate description for {agent['config']['name']}")
        with col2:
            if st.button("Save", key=f"save_{edit_index}"):
                agent['config']['name'] = new_name
                agent['description'] = agent.get('new_description', new_description)
                if selected_model != 'default':
                    agent['config']['llm_config']['config_list'][0]['model'] = selected_model
                    agent['config']['llm_config']['max_tokens'] = MODEL_CHOICES[selected_model]
                else:
                    agent['config']['llm_config']['config_list'][0]['model'] = st.session_state.model
                    agent['config']['llm_config']['max_tokens'] = MODEL_TOKEN_LIMITS.get(st.session_state.model, 4096)
                st.session_state['show_edit'] = False
                if 'edit_agent_index' in st.session_state:
                    del st.session_state['edit_agent_index']
                if 'new_description' in agent:
                    del agent['new_description']
                st.session_state.agents[edit_index] = agent


def download_agent_file(expert_name):
    # Format the expert_name
    formatted_expert_name = re.sub(r'[^a-zA-Z0-9\s]', '', expert_name)  # Remove non-alphanumeric characters
    formatted_expert_name = formatted_expert_name.lower().replace(' ', '_')  # Convert to lowercase and replace spaces with underscores

    # Get the full path to the agent JSON file
    agents_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), "agents"))
    json_file = os.path.join(agents_dir, f"{formatted_expert_name}.json")

    # Check if the file exists
    if os.path.exists(json_file):
        # Read the file content
        with open(json_file, "r") as f:
            file_content = f.read()

        # Encode the file content as base64
        b64_content = base64.b64encode(file_content.encode()).decode()

        # Create a download link
        href = f'<a href="data:application/json;base64,{b64_content}" download="{formatted_expert_name}.json">Download {formatted_expert_name}.json</a>'
        st.markdown(href, unsafe_allow_html=True)
    else:
        st.error(f"File not found: {json_file}")
def process_agent_interaction(agent_index):
    agent_name, description = retrieve_agent_information(agent_index)
    user_request = st.session_state.get('user_request', '')
    user_input = st.session_state.get('user_input', '')
    rephrased_request = st.session_state.get('rephrased_request', '')
    reference_url = st.session_state.get('reference_url', '')

    # Execute associated tools for the agent
    agent = st.session_state.agents[agent_index]
    agent_tools = agent.get("tools", [])
    tool_results = {}
    for tool_name in agent_tools:
        if tool_name in st.session_state.tool_functions:
            tool_function = st.session_state.tool_functions[tool_name]
            tool_result = tool_function()
            tool_results[tool_name] = tool_result

    request = construct_request(agent_name, description, user_request, user_input, rephrased_request, reference_url, tool_results)
    print(f"Request: {request}")

    # Use the dynamic LLM provider to send the request
    api_key = get_api_key()
    llm_provider = get_llm_provider(api_key=api_key)
    llm_request_data = {
        "model": st.session_state.model,
        "temperature": st.session_state.get('temperature', 0.1),
        "max_tokens": st.session_state.max_tokens,
        "top_p": 1,
        "stop": "TERMINATE",
        "messages": [
            {
                "role": "user",
                "content": request
            }
        ]
    }
    response = llm_provider.send_request(llm_request_data)

    if response.status_code == 200:
        response_data = llm_provider.process_response(response)
        if "choices" in response_data and response_data["choices"]:
            content = response_data["choices"][0]["message"]["content"]
            update_discussion_and_whiteboard(agent_name, content, user_input)
            st.session_state['form_agent_name'] = agent_name
            st.session_state['form_agent_description'] = description
            st.session_state['selected_agent_index'] = agent_index


def regenerate_agent_description(agent):
    agent_name = agent['config']['name']
    print(f"agent_name: {agent_name}")
    agent_description = agent['description']
    print(f"agent_description: {agent_description}")
    user_request = st.session_state.get('user_request', '')
    print(f"user_request: {user_request}")
    discussion_history = st.session_state.get('discussion_history', '')

    prompt = f"""
    You are an AI assistant helping to improve an agent's description. The agent's current details are:
    Name: {agent_name}
    Description: {agent_description}

    The current user request is: {user_request}
    The discussion history so far is: {discussion_history}

    Please generate a revised description for this agent that defines it in the best manner possible to address the current user request, taking into account the discussion thus far. Return only the revised description, written in the third-person, without any additional commentary or narrative. It is imperative that you return ONLY the text of the new description written in the third-person. No preamble, no narrative, no superfluous commentary whatsoever. Just the description, written in the third-person, unlabeled, please. You will have been successful if your reply is thorough, comprehensive, concise, written in the third-person, and adherent to all of these instructions.
    """
    print(f"regenerate_agent_description called with agent_name: {agent_name}")
    print(f"regenerate_agent_description called with prompt: {prompt}")

    api_key = get_api_key()
    llm_provider = get_llm_provider(api_key=api_key)
    llm_request_data = {
        "model": st.session_state.model,
        "temperature": st.session_state.get('temperature', 0.1),
        "max_tokens": st.session_state.max_tokens,
        "top_p": 1,
        "stop": "TERMINATE",
        "messages": [
            {
                "role": "user",
                "content": prompt
            }
        ]
    }
    response = llm_provider.send_request(llm_request_data)

    if response.status_code == 200:
        response_data = llm_provider.process_response(response)
        if "choices" in response_data and response_data["choices"]:
            content = response_data["choices"][0]["message"]["content"]
            return content.strip()

    return None


def retrieve_agent_information(agent_index):
    agent = st.session_state.agents[agent_index]
    agent_name = agent["config"]["name"]
    description = agent["description"]
    return agent_name, description


def send_request(agent_name, request):
    api_key = get_api_key()
    llm_provider = get_llm_provider(api_key=api_key)
    response = llm_provider.send_request(request)
    return response
AutoGroq\config.py
import os

# Get user home directory
home_dir = os.path.expanduser("~")
default_db_path = f'{home_dir}/.autogenstudio/database.sqlite'

# Debug
DEFAULT_DEBUG = False

# Default configurations
DEFAULT_LLM_PROVIDER = "groq"
DEFAULT_GROQ_API_URL = "https://api.groq.com/openai/v1/chat/completions"
DEFAULT_LMSTUDIO_API_URL = "http://localhost:1234/v1/chat/completions"
DEFAULT_OLLAMA_API_URL = "http://127.0.0.1:11434/api/generate"
DEFAULT_OPENAI_API_KEY = None
DEFAULT_OPENAI_API_URL = "https://api.openai.com/v1/chat/completions"

# Try to import user-specific configurations from config_local.py
try:
    from config_local import *
except ImportError:
    pass

# Set the configurations using the user-specific values if available, otherwise use the defaults
DEBUG = locals().get('DEBUG', DEFAULT_DEBUG)
LLM_PROVIDER = locals().get('LLM_PROVIDER', DEFAULT_LLM_PROVIDER)
GROQ_API_URL = locals().get('GROQ_API_URL', DEFAULT_GROQ_API_URL)
LMSTUDIO_API_URL = locals().get('LMSTUDIO_API_URL', DEFAULT_LMSTUDIO_API_URL)
OLLAMA_API_URL = locals().get('OLLAMA_API_URL', DEFAULT_OLLAMA_API_URL)
OPENAI_API_KEY = locals().get('OPENAI_API_KEY', DEFAULT_OPENAI_API_KEY)
OPENAI_API_URL = locals().get('OPENAI_API_URL', DEFAULT_OPENAI_API_URL)

API_KEY_NAMES = {
    "groq": "GROQ_API_KEY",
    "lmstudio": None,
    "ollama": None,
    "openai": "OPENAI_API_KEY",
    # Add other LLM providers and their respective API key names here
}
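
# A minimal sketch of what an optional config_local.py override might look like
# (hypothetical values; config_local.py is a user-created file, not part of the repo):
#
# # config_local.py
# LLM_PROVIDER = "openai"
# OPENAI_API_KEY = "sk-..."  # your real key here
# DEBUG = True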
# Retry settings
MAX_RETRIES = 3
RETRY_DELAY = 2  # in seconds
RETRY_TOKEN_LIMIT = 5000

# Model configurations
if LLM_PROVIDER == "groq":
    API_URL = GROQ_API_URL
    MODEL_TOKEN_LIMITS = {
        'mixtral-8x7b-32768': 32768,
        'llama3-70b-8192': 8192,
        'llama3-8b-8192': 8192,
        'gemma-7b-it': 8192,
    }
elif LLM_PROVIDER == "lmstudio":
    API_URL = LMSTUDIO_API_URL
    MODEL_TOKEN_LIMITS = {
        'instructlab/granite-7b-lab-GGUF': 2048,
        'MaziyarPanahi/Codestral-22B-v0.1-GGUF': 32768,
    }
elif LLM_PROVIDER == "openai":
    API_URL = OPENAI_API_URL
    MODEL_TOKEN_LIMITS = {
        'gpt-4o': 4096,
    }
elif LLM_PROVIDER == "ollama":
    API_URL = OLLAMA_API_URL
    MODEL_TOKEN_LIMITS = {
        'llama3': 8192,
    }
else:
    MODEL_TOKEN_LIMITS = {}

# Database path
# FRAMEWORK_DB_PATH = "/path/to/custom/database.sqlite"
FRAMEWORK_DB_PATH = os.environ.get('FRAMEWORK_DB_PATH', default_db_path)
MODEL_CHOICES = {
    'default': None,
    'gemma-7b-it': 8192,
    'gpt-4o': 4096,
    'instructlab/granite-7b-lab-GGUF': 2048,
    'MaziyarPanahi/Codestral-22B-v0.1-GGUF': 32768,
    'llama3': 8192,
    'llama3-70b-8192': 8192,
    'llama3-8b-8192': 8192,
    'mixtral-8x7b-32768': 32768
}
def create_project_manager_prompt(rephrased_text):
    return f"""
    You are a Project Manager tasked with creating a comprehensive project outline and describing the perfect team of experts that should be created to work on the following project:

    {rephrased_text}

    Please provide a detailed project outline, including a single block of key deliverables listed in logical order of accomplishment. Label the deliverables with "Deliverables:" or "Key Deliverables" and list them in a clear and concise manner. Also, describe the ideal team of experts required for this project, including their roles and responsibilities. Your analysis shall consider the complexity, domain, and specific needs of the request to assemble a multidisciplinary team of experts. The team should be as small as possible while still providing a complete and comprehensive talent pool able to properly address the user's request. Each recommended agent shall come with a defined role and a brief but thorough description of their expertise.

    Return your response in the following format:

    Project Outline:
    [Detailed project outline]

    Team of Experts:
    [Description of the ideal team of experts]
    """


def get_agent_prompt(rephrased_request):
    return f"""
    Based on the following user request, please create a detailed and comprehensive description of an AI agent that can effectively assist with the request:

    User Request: "{rephrased_request}"

    Provide a clear and concise description of the agent's role, capabilities, and expertise. The description should be efficiently written in a concise, professional and engaging manner, highlighting the agent's ability to understand and respond to the request efficiently.

    Agent Description:
    """


def get_agents_prompt():
    return f"""
    This agent is an expert system designed to format the JSON describing each member of the team of AI agents specifically listed in this provided text: $text.

    Fulfill the following guidelines without ever explicitly stating them in this agent's response.

    Guidelines:
    1. **Agent Roles**: Clearly transcribe the titles of each agent listed in the provided text by iterating through the 'Team of Experts:' section of the provided text. Transcribe the info for those specific agents. Do not create new agents.
    2. **Expertise Description**: Provide a brief but thorough description of each agent's expertise based upon the provided text. Do not create new agents.
    3. **Format**: Return the results in JSON format with values labeled as expert_name and description. 'expert_name' should be the agent's title, not their given or proper name.

    ALWAYS and ONLY return the results in the following JSON format, with no other narrative, commentary, synopsis, or superfluous text of any kind:
    [
        {{
            "expert_name": "agent_title",
            "description": "agent_description",
        }}
    ]

    This agent will only have been successful if it has returned the results in the above format and followed these guidelines precisely by transcribing the provided text and returning the results in JSON format without any other narrative, commentary, synopsis, or superfluous text of any kind, and taking care to only transcribe the agents from the provided text without creating new agents.
    """


# Contributed by ScruffyNerf
def get_generate_tool_prompt(rephrased_tool_request):
    return f'''
    Based on the rephrased tool request below, please do the following:

    1. Do step-by-step reasoning and think to better understand the request.
    2. Code the best Autogen Studio Python tool as per the request as a [tool_name].py file.
    3. Return only the tool file, no commentary, intro, or other extra text. If there ARE any non-code lines, please pre-pend them with a '#' symbol to comment them out.
    4. A proper tool will have these parts:
       a. Imports (import libraries needed for the tool)
       b. Function definition AND docstrings (this helps the LLM understand what the function does and how to use it)
       c. Function body (the actual code that implements the function)
       d. (optional) Example usage - ALWAYS commented out

    Here is an example of a well formatted tool:

    # Tool filename: save_file_to_disk.py
    # Import necessary module(s)
    import os

    def save_file_to_disk(contents, file_name):
        # docstrings
        """
        Saves the given contents to a file with the given file name.

        Parameters:
        contents (str): The string contents to save to the file.
        file_name (str): The name of the file, including its extension.

        Returns:
        str: A message indicating the success of the operation.
        """
        # Body of tool
        # Ensure the directory exists; create it if it doesn't
        directory = os.path.dirname(file_name)
        if directory and not os.path.exists(directory):
            os.makedirs(directory)
        # Write the contents to the file
        with open(file_name, 'w') as file:
            file.write(contents)
        return f"File file_name has been saved successfully."

    # Example usage:
    # contents_to_save = "Hello, world!"
    # file_name = "example.txt"
    # print(save_file_to_disk(contents_to_save, file_name))

    Rephrased tool request: "{rephrased_tool_request}"
    '''


def get_moderator_prompt(discussion_history, goal, last_comment, last_speaker, team_members_str):
    return f"""
    This agent is our Moderator Bot. Its goal is to mediate the conversation between a team of AI agents in a manner that persuades them to act in the most expeditious and thorough manner to accomplish their goal. This will entail considering the user's stated goal, the conversation thus far, the descriptions of all the available agents/experts in the current team, the last speaker, and their remark. Based upon a holistic analysis of all the facts at hand, use logic and reasoning to decide who should speak next. Then draft a prompt directed at that agent that persuades them to act in the most expeditious and thorough manner toward helping this team of agents accomplish their goal.

    Their goal is: {goal}.
    The last speaker was {last_speaker}, who said: {last_comment}
    Here is the current conversational discussion history: {discussion_history}
    And here are the team members and their descriptions:
    {team_members_str}

    This agent's response should be JUST the requested prompt addressed to the next agent, and should not contain any introduction, narrative, or any other superfluous text whatsoever.
    """


def get_rephrased_user_prompt(user_request):
    return f"""
    This agent is a professional prompt engineer who will refactor the following user request into an optimized prompt. This agent's goal is to rephrase the request with a focus on satisfying all of the following criteria without explicitly stating them:
    1. Clarity: Ensure the prompt is clear and unambiguous.
    2. Specific Instructions: Provide detailed steps or guidelines.
    3. Context: Include necessary background information.
    4. Structure: Organize the prompt logically.
    5. Language: Use concise and precise language.
    6. Examples: Offer examples to illustrate the desired output.
    7. Constraints: Define any limits or guidelines.
    8. Engagement: Make the prompt engaging and interesting.
    9. Feedback Mechanism: Suggest a way to improve or iterate on the response.

    Do NOT reply with a direct response to these instructions OR the original user request. Instead, rephrase the user's request as a well-structured prompt, and return ONLY that rephrased prompt. Do not preface the rephrased prompt with any other text or superfluous narrative. Do not enclose the rephrased prompt in quotes. This agent will be successful only if it returns a well-formed rephrased prompt ready for submission as an LLM request.

    User request: "{user_request}"

    Rephrased:
    """
AutoGroq\cli\create_agent.py
import argparse
import datetime
import json
import os
import sys

# Add the root directory to the Python module search path
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from config import MODEL_TOKEN_LIMITS
from prompts import get_agent_prompt
from utils.api_utils import get_llm_provider
from utils.agent_utils import create_agent_data
from utils.auth_utils import get_api_key
from utils.file_utils import sanitize_text


def create_agent(request, provider, model, temperature, max_tokens, output_file):
    # Get the API key and provider
    api_key = get_api_key()
    llm_provider = get_llm_provider(api_key=api_key)

    # Generate the prompt using get_agent_prompt
    prompt = get_agent_prompt(request)

    # Adjust the token limit based on the selected model
    max_tokens = MODEL_TOKEN_LIMITS.get(provider, {}).get(model, 4096)

    # Make the request to the LLM API
    llm_request_data = {
        "model": model,
        "temperature": temperature,
        "max_tokens": max_tokens,
        "messages": [{"role": "user", "content": prompt}],
    }
    response = llm_provider.send_request(llm_request_data)

    if response.status_code != 200:
        print(f"Error: Received status code {response.status_code}")
        print(response.text)
        return

    response_data = response.json()
    if 'choices' not in response_data or len(response_data['choices']) == 0:
        print("Error: 'choices' not found in the response data or it's empty")
        print(json.dumps(response_data, indent=2))
        return

    agent_description = response_data['choices'][0]['message']['content'].strip()
    agent_data = {
        "type": "assistant",
        "config": {
            "name": request,
            "llm_config": {
                "config_list": [
                    {
                        "user_id": "default",
                        "timestamp": datetime.datetime.now().isoformat(),
                        "model": model,
                        "base_url": None,
                        "api_type": None,
                        "api_version": None,
                        "description": "OpenAI model configuration"
                    }
                ],
                "temperature": temperature,
                "cache_seed": None,
                "timeout": None,
                "max_tokens": max_tokens,
                "extra_body": None
            },
            "human_input_mode": "NEVER",
            "max_consecutive_auto_reply": 8,
            "system_message": f"You are a helpful assistant that can act as {request} who {sanitize_text(agent_description)}.",
            "is_termination_msg": None,
            "code_execution_config": None,
            "default_auto_reply": "",
            "description": agent_description  # Ensure the description key is present
        },
        "timestamp": datetime.datetime.now().isoformat(),
        "user_id": "default",
        "tools": []
    }
    # Debug print to verify agent_data
    print("Agent Data:", json.dumps(agent_data, indent=2))

    # Create the appropriate agent data
    autogen_agent_data, crewai_agent_data = create_agent_data(agent_data)

    # Save the agent data to the output file
    with open(output_file, "w") as f:
        json.dump(autogen_agent_data, f, indent=2)

    print(f"Agent created successfully. Output saved to: {output_file}")
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Create an agent based on a user request.")
    parser.add_argument("--request", required=True, help="The user request for creating the agent.")
    parser.add_argument("--model", default="mixtral-8x7b-32768", help="The model to use for the agent.")
    parser.add_argument("--temperature", type=float, default=0.5, help="The temperature value for the agent.")
    parser.add_argument("--max_tokens", type=int, default=32768, help="The maximum number of tokens for the agent.")
    parser.add_argument("--agent_type", default="autogen", choices=["autogen", "crewai"], help="The type of agent to create.")
    parser.add_argument("--output", default="agent.json", help="The output file path for the agent JSON.")
    parser.add_argument("--provider", default="groq", help="The LLM provider to use (e.g., 'openai', 'anthropic').")

    args = parser.parse_args()
    create_agent(args.request, args.provider, args.model, args.temperature, args.max_tokens, args.output)
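
# Example invocation (a sketch; the request text and output path are assumptions):
# python cli/create_agent.py --request "data analyst" --provider groq --model mixtral-8x7b-32768 --output data_analyst.json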
AutoGroq\cli\rephrase_prompt.py
import argparse
import os
import sys

# Add the root directory to the Python module search path
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from config import MODEL_TOKEN_LIMITS, LLM_PROVIDER
from utils.api_utils import get_llm_provider
from utils.auth_utils import get_api_key
from utils.ui_utils import rephrase_prompt


def rephrase_prompt_cli(prompt, provider, model, temperature, max_tokens):
    # Get the API key
    api_key = get_api_key()

    # Use the provider specified in the CLI arguments
    llm_provider = get_llm_provider(api_key=api_key, provider=provider)

    # Override the model and max_tokens if specified in the command-line arguments
    model_to_use = model if model else provider
    max_tokens_to_use = MODEL_TOKEN_LIMITS.get(model_to_use, max_tokens)

    rephrased_prompt = rephrase_prompt(prompt, model_to_use, max_tokens_to_use, llm_provider=llm_provider, provider=provider)
    if rephrased_prompt:
        print(f"Rephrased Prompt: {rephrased_prompt}")
    else:
        print("Error: Failed to rephrase the prompt.")
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Rephrase a user prompt.")
    parser.add_argument("--prompt", required=True, help="The user prompt to rephrase.")
    parser.add_argument("--model", default=None, help="The model to use for rephrasing.")
    parser.add_argument("--temperature", type=float, default=0.5, help="The temperature value for rephrasing.")
    parser.add_argument("--max_tokens", type=int, default=32768, help="The maximum number of tokens for rephrasing.")
    parser.add_argument("--provider", default=None, help="The LLM provider to use (e.g., 'openai', 'anthropic').")

    args = parser.parse_args()
    rephrase_prompt_cli(args.prompt, args.provider, args.model, args.temperature, args.max_tokens)
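
# Example invocation (a sketch; the prompt text is an assumption):
# python cli/rephrase_prompt.py --prompt "make me a website" --provider groq --model mixtral-8x7b-32768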
# Thanks to MADTANK: https://github.com/madtank
# README: https://github.com/madtank/autogenstudio-skills/blob/main/rag/README.md

import argparse
import bs4
import csv
import json
import os
import pickle
import re
import traceback
from typing import Dict, List, Literal, Tuple

try:
    import tiktoken
    from langchain_community.embeddings import HuggingFaceEmbeddings
    from langchain_community.vectorstores import FAISS
except ImportError:
    raise ImportError("Please install the dependencies first.")
def chunk_str_overlap(
    s: str,
    separator: str = "\n",
    num_tokens: int = 64,
    step_tokens: int = 64,
    encoding: tiktoken.Encoding = None,
) -> List[str]:
    """
    Split a string into chunks with overlap
    :param s: the input string
    :param separator: the separator to split the string
    :param num_tokens: the number of tokens in each chunk
    :param step_tokens: the number of tokens to step forward
    :param encoding: the encoding to encode the string
    """
    assert step_tokens <= num_tokens, (
        f"The number of tokens {num_tokens} in each chunk "
        f"should be larger than the step size {step_tokens}."
    )

    lines = s.split(separator)
    chunks = dict()
    final_chunks = []

    if len(lines) == 0:
        return []

    first_line = lines[0]
    first_line_size = len(encoding.encode(first_line))

    chunks[0] = [first_line, first_line_size]
    this_step_size = first_line_size

    for i in range(1, len(lines)):
        line = lines[i]
        line_size = len(encoding.encode(line))

        to_pop = []
        for key in chunks:
            if chunks[key][1] + line_size > num_tokens:
                to_pop.append(key)
            else:
                chunks[key][0] += f"{separator}{line}"
                chunks[key][1] += line_size
        final_chunks += [chunks.pop(key)[0] for key in to_pop]

        if this_step_size + line_size > step_tokens:
            chunks[i] = [line, line_size]
            this_step_size = 0
        this_step_size += line_size

    max_remained_chunk = ""
    max_remained_chunk_size = 0
    for key in chunks:
        if chunks[key][1] > max_remained_chunk_size:
            max_remained_chunk_size = chunks[key][1]
            max_remained_chunk = chunks[key][0]
    if max_remained_chunk_size > 0:
        final_chunks.append(max_remained_chunk)

    return final_chunks


def get_title(
    file_name: str,
    prop="title: ",
) -> str:
    """
    Get the title of a file
    :param file_name: the file name
    :param prop: the property to get the title
    """
    with open(file_name, encoding="utf-8", errors="ignore") as f_in:
        for line in f_in:
            line = line.strip()
            if line and (line.startswith(prop) or any([c.isalnum() for c in line])):
                return line
    return ""


def extract_text_from_file(
    file: str,
    file_type: Literal["pdf", "docx", "csv", "pptx"],
) -> Tuple[str, str]:
    """
    Extract text from a file in pdf, docx, csv or pptx format
    :param file: the file path
    :param file_type: the extension of the file
    """
    if file_type == "pdf":
        try:
            from pypdf import PdfReader
        except ImportError:
            raise ImportError("Please install pypdf first.")
        # Extract text from pdf using pypdf
        reader = PdfReader(file)
        extracted_text = " ".join([page.extract_text() for page in reader.pages])
        title = extracted_text.split("\n")[0]
    elif file_type == "docx":
        try:
            import docx2txt
        except ImportError:
            raise ImportError("Please install docx2txt first.")
        # Extract text from docx using docx2txt
        extracted_text = docx2txt.process(file)
        title = extracted_text.split("\n")[0]
    elif file_type == "csv":
        # Extract text from csv using csv module
        extracted_text = ""
        title = ""
        reader = csv.reader(file)
        for row in reader:
            extracted_text += " ".join(row) + "\n"
    elif file_type == "pptx":
        try:
            import pptx
        except ImportError:
            raise ImportError("Please install python-pptx first.")
        extracted_text = ""
        no_title = True
        title = ""
        presentation = pptx.Presentation(file)
        for slide in presentation.slides:
            for shape in slide.shapes:
                if shape.has_text_frame:
                    for paragraph in shape.text_frame.paragraphs:
                        for run in paragraph.runs:
                            extracted_text += run.text + " "
                            if no_title and len(run.text) > 10:
                                title = run.text
                                no_title = False
            extracted_text += "\n"
    else:
        # Unsupported file type
        raise ValueError(f"Unsupported file type: {file_type}")

    return title[:100], extracted_text


def text_parser(
    read_file: str,
) -> Tuple[str, str]:
    """
    Returns the title, parsed text and a BeautifulSoup object with different file extension
    :param read_file: the input file with a given extension
    :return: the title, parsed text and a BeautifulSoup object, the BeautifulSoup object
        is used to get the document link from the html files
    """
    filename, extension = os.path.splitext(read_file)
    extension = extension.lstrip(".")
    title = filename
    soup = None
    supported_extensions = ["md", "markdown", "html", "htm", "txt", "json", "jsonl"]
    other_extensions = ["docx", "pptx", "pdf", "csv"]

    # utf-8-sig will treat BOM header as a metadata of a file not a part of the file content
    default_encoding = "utf-8-sig"

    if extension in ("md", "markdown", "txt"):
        title = get_title(read_file)
        with open(read_file, "r", encoding=default_encoding, errors="ignore") as f:
            text = f.read()
    elif extension in ("html", "htm"):
        from bs4 import BeautifulSoup

        with open(read_file, "r", encoding=default_encoding, errors="ignore") as f:
            soup = BeautifulSoup(f, "html.parser")
        title = next(soup.stripped_strings)[:100]
        text = soup.get_text("\n")
    # read json/jsonl file in and convert each json to a row of string
    elif extension in ("json", "jsonl"):
        try:
            with open(read_file, "r", encoding=default_encoding, errors="ignore") as f:
                data = json.load(f) if extension == "json" else [json.loads(line) for line in f]
        except Exception:
            # json file encoding issue, skip this file
            return title, ""

        if isinstance(data, dict):
            text = json.dumps(data)
        elif isinstance(data, list):
            content_list = [json.dumps(each_json) for each_json in data]
            text = "\n".join(content_list)
            title = filename
    elif extension in other_extensions:
        title, text = extract_text_from_file(read_file, extension)
    else:  # no support for other formats
        print(
            f"Not support for file with extension: {extension}. "
            f"The supported extensions are {supported_extensions}",
        )
        return title, ""

    output_text = re.sub(r"\n{3,}", "\n\n", text)
    # keep whitespaces for formatting
    output_text = re.sub(r"-{3,}", "---", output_text)
    output_text = re.sub(r"\*{3,}", "***", output_text)
    output_text = re.sub(r"_{3,}", "___", output_text)

    return title, output_text


def chunk_document(
    doc_path: str,
    chunk_size: int,
    chunk_step: int,
) -> Tuple[int, List[str], List[Dict[str, str]], Dict[str, int]]:
    """
    Split documents into chunks
    :param doc_path: the path of the documents
    :param chunk_size: the size of the chunk
    :param chunk_step: the step size of the chunk
    """
    texts = []
    metadata_list = []
    file_count = 0
    chunk_id_to_index = dict()

    enc = tiktoken.encoding_for_model("gpt-3.5-turbo")

    # traverse all files under dir
    print("Split documents into chunks...")
    for root, dirs, files in os.walk(doc_path):
        for name in files:
            f = os.path.join(root, name)
            print(f"Reading {f}")
            try:
                title, content = text_parser(f)
                file_count += 1
                if file_count % 100 == 0:
                    print(f"{file_count} files read.")

                if len(content) == 0:
                    continue

                chunks = chunk_str_overlap(
                    content.strip(),
                    num_tokens=chunk_size,
                    step_tokens=chunk_step,
                    separator="\n",
                    encoding=enc,
                )
                source = os.path.sep.join(f.split(os.path.sep)[4:])
                for i in range(len(chunks)):
                    # custom metadata if needed
                    metadata = {
                        "source": source,
                        "title": title,
                        "chunk_id": i,
                    }
                    chunk_id_to_index[f"{source}_{i}"] = len(texts) + i
                    metadata_list.append(metadata)
                texts.extend(chunks)
            except Exception as e:
                print(f"Error encountered when reading {f}: {traceback.format_exc()} {e}")
    return file_count, texts, metadata_list, chunk_id_to_index


if __name__ == "__main__":
    # parse arguments
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "-d",
        "--doc_path",
        help="the path of the documents",
        type=str,
        default="documents",
    )
    parser.add_argument(
        "-c",
        "--chunk_size",
        help="the size of the chunk",
        type=int,
        default=64,
    )
    parser.add_argument(
        "-s",
        "--chunk_step",
        help="the step size of the chunk",
        type=int,
        default=64,
    )
    parser.add_argument(
        "-o",
        "--output_path",
        help="the path of the output",
        type=str,
        default="knowledge",
    )
    args = parser.parse_args()

    file_count, texts, metadata_list, chunk_id_to_index = chunk_document(
        doc_path=args.doc_path,
        chunk_size=args.chunk_size,
        chunk_step=args.chunk_step,
    )
    embeddings = HuggingFaceEmbeddings(model_name="all-MiniLM-L6-v2")
    vectorstore = FAISS.from_texts(
        texts=texts,
        metadatas=metadata_list,
        embedding=embeddings,
    )
    vectorstore.save_local(folder_path=args.output_path)
    with open(os.path.join(args.output_path, "chunk_id_to_index.pkl"), "wb") as f:
        pickle.dump(chunk_id_to_index, f)
    print(f"Saved vectorstore to {args.output_path}")
AutoGroq\tools\document_retriever.py
# # Thanks to MADTANK: https://github.com/madtank
# # README: https://github.com/madtank/autogenstudio-skills/blob/main/rag/README.md

# import os
# import pickle
# import json
# import argparse

# try:
#     import tiktoken
#     from langchain_community.embeddings import HuggingFaceEmbeddings
#     from langchain_community.vectorstores import FAISS
# except ImportError:
#     raise ImportError("Please install langchain-community first.")

# # Configuration - Users/AI skill developers must update this path to their specific index folder
# # To test with sample data set index_folder to "knowledge"
# CONFIG = {
#     "index_folder": "rag/knowledge",  # TODO: Update this path before using
# }

# class DocumentRetriever:
#     def __init__(self, index_folder):
#         self.index_folder = index_folder
#         self.vectorstore = None
#         self.chunk_id_to_index = None
#         self.embeddings = HuggingFaceEmbeddings(model_name="all-MiniLM-L6-v2")
#         self._init()
#         self.enc = tiktoken.encoding_for_model("gpt-3.5-turbo")

#     def _init(self):
#         self.vectorstore = FAISS.load_local(
#             folder_path=self.index_folder,
#             embeddings=self.embeddings,
#         )
#         with open(os.path.join(self.index_folder, "chunk_id_to_index.pkl"), "rb") as f:
#             self.chunk_id_to_index = pickle.load(f)

#     def __call__(self, query: str, size: int = 5, target_length: int = 256):
#         if self.vectorstore is None:
#             raise Exception("Vectorstore not initialized")
#         result = self.vectorstore.similarity_search(query=query, k=size)
#         expanded_chunks = self.do_expand(result, target_length)
#         return json.dumps(expanded_chunks, indent=4)

#     def do_expand(self, result, target_length):
#         expanded_chunks = []
#         # do expansion
#         for r in result:
#             source = r.metadata["source"]
#             chunk_id = r.metadata["chunk_id"]
#             content = r.page_content
#             expanded_result = content
#             left_chunk_id, right_chunk_id = chunk_id - 1, chunk_id + 1
#             left_valid, right_valid = True, True
#             chunk_ids = [chunk_id]
#             while True:
#                 current_length = len(self.enc.encode(expanded_result))
#                 if f"{source}_{left_chunk_id}" in self.chunk_id_to_index:
#                     chunk_ids.append(left_chunk_id)
#                     left_chunk_index = self.vectorstore.index_to_docstore_id[
#                         self.chunk_id_to_index[f"{source}_{left_chunk_id}"]
#                     ]
#                     left_chunk = self.vectorstore.docstore.search(left_chunk_index)
#                     encoded_left_chunk = self.enc.encode(left_chunk.page_content)
#                     if len(encoded_left_chunk) + current_length < target_length:
#                         expanded_result = left_chunk.page_content + expanded_result
#                         left_chunk_id -= 1
#                         current_length += len(encoded_left_chunk)
#                     else:
#                         expanded_result += self.enc.decode(
#                             encoded_left_chunk[-(target_length - current_length):],
#                         )
#                         current_length = target_length
#                         break
#                 else:
#                     left_valid = False
#                 if f"{source}_{right_chunk_id}" in self.chunk_id_to_index:
#                     chunk_ids.append(right_chunk_id)
#                     right_chunk_index = self.vectorstore.index_to_docstore_id[
#                         self.chunk_id_to_index[f"{source}_{right_chunk_id}"]
#                     ]
#                     right_chunk = self.vectorstore.docstore.search(right_chunk_index)
#                     encoded_right_chunk = self.enc.encode(right_chunk.page_content)
#                     if len(encoded_right_chunk) + current_length < target_length:
#                         expanded_result += right_chunk.page_content
#                         right_chunk_id += 1
#                         current_length += len(encoded_right_chunk)
#                     else:
#                         expanded_result += self.enc.decode(
#                             encoded_right_chunk[: target_length - current_length],
#                         )
#                         current_length = target_length
#                         break
#                 else:
#                     right_valid = False
#                 if not left_valid and not right_valid:
#                     break
#             expanded_chunks.append(
#                 {
#                     "chunk": expanded_result,
#                     "metadata": r.metadata,
#                     # "length": current_length,
#                     # "chunk_ids": chunk_ids
#                 },
#             )
#         return expanded_chunks

# # Example Usage
# if __name__ == "__main__":
#     parser = argparse.ArgumentParser(description='Retrieve documents based on a query.')
#     parser.add_argument('query', nargs='?', type=str, help='The query to retrieve documents for.')
#     args = parser.parse_args()
#     if not args.query:
#         parser.print_help()
#         print("Error: No query provided.")
#         exit(1)
#     # Ensure the index_folder path is correctly set in CONFIG before proceeding
#     index_folder = CONFIG["index_folder"]
#     if index_folder == "path/to/your/knowledge/directory":
#         print("Error: Index folder in CONFIG has not been set. Please update it to your index folder path.")
#         exit(1)
#     # Instantiate and use the DocumentRetriever with the configured index folder
#     retriever = DocumentRetriever(index_folder=index_folder)
#     query = args.query
#     size = 5  # Number of results to retrieve
#     target_length = 256  # Target length of expanded content
#     results = retriever(query, size, target_length)
#     print(results)
AutoGroq\tools\execute_powershell_command.py
# Thanks to aj47: https://github.com/aj47

import subprocess


def execute_powershell_command(command):
    """
    Execute a command in PowerShell from Python.

    :param command: The PowerShell command to execute as a string.
    :return: The output of the command as a string.
    """
    # Ensure the command is executed in PowerShell
    cmd = ['powershell', '-Command', command]
    # Execute the command and capture the output
    try:
        result = subprocess.run(cmd, capture_output=True, text=True, check=True)
        return result.stdout
    except subprocess.CalledProcessError as e:
        return f"An error occurred: {e.stderr}"

# Example usage:
# if __name__ == "__main__":
#     command = "Get-Date"  # Example command to get the current date and time
#     output = execute_powershell_command(command)
#     print(output)
AutoGroq\tools\fetch_web_content.py
# Thanks to MADTANK: https://github.com/madtank

from typing import Optional

import requests
import collections
collections.Callable = collections.abc.Callable
from bs4 import BeautifulSoup


def fetch_web_content(url: str) -> Optional[str]:
    """
    Fetches the text content from a website.

    Args:
        url (str): The URL of the website.

    Returns:
        Optional[str]: The content of the website.
    """
    try:
        # Send a GET request to the URL
        response = requests.get(url)

        # Check for successful access to the webpage
        if response.status_code == 200:
            # Parse the HTML content of the page using BeautifulSoup
            soup = BeautifulSoup(response.text, "html.parser")

            # Extract the content of the <body> tag
            body_content = soup.body

            if body_content:
                # Return all the text in the body tag, stripping leading/trailing whitespace
                return " ".join(body_content.get_text(strip=True).split())
            else:
                # Return None if the <body> tag is not found
                return None
        else:
            # Return None if the status code isn't 200 (success)
            return None
    except requests.RequestException:
        # Return None if any request-related exception is caught
        return None
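
# Example usage (a sketch; the URL is an assumption):
# content = fetch_web_content("https://example.com")
# print(content[:200] if content else "Fetch failed.")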
AutoGroq\tools\generate_sd_images.py
# Thanks to marc-shade: https://github.com/marc-shade
# Ollama only? -jjg

from typing import List
import json
import requests
import io
import base64
from PIL import Image
from pathlib import Path
import uuid  # Import the uuid library

# Format: protocol://server:port
base_url = "http://0.0.0.0:7860"


def generate_sd_images(query: str, image_size: str = "512x512", team_name: str = "default") -> List[str]:
    """
    Function to paint, draw or illustrate images based on the users query or request.
    Generates images locally with the automatic1111 API and saves them to disk.
    Use the code below anytime there is a request to create an image.

    :param query: A natural language description of the image to be generated.
    :param image_size: The size of the image to be generated. (default is "512x512")
    :param team_name: The name of the team to associate the image with.
    :return: A list containing a single filename for the saved image.
    """
    # Split the image size string at "x"
    parts = image_size.split("x")
    image_width = parts[0]
    image_height = parts[1]

    # list of file paths returned to AutoGen
    saved_files = []

    payload = {
        "prompt": query,
        "steps": 40,
        "cfg_scale": 7,
        "denoising_strength": 0.5,
        "sampler_name": "DPM++ 2M Karras",
        "n_iter": 1,
        "batch_size": 1,  # Ensure only one image is generated per batch
        "override_settings": {
            'sd_model_checkpoint': "starlightAnimated_v3",
        }
    }

    api_url = f"{base_url}/sdapi/v1/txt2img"
    response = requests.post(url=api_url, json=payload)

    if response.status_code == 200:
        r = response.json()
        # Access only the final generated image (index 0)
        encoded_image = r['images'][0]
        image = Image.open(io.BytesIO(base64.b64decode(encoded_image.split(",", 1)[0])))

        # --- Generate a unique filename with team name and UUID ---
        unique_id = str(uuid.uuid4())[:8]  # Get a short UUID
        file_name = f"images/{team_name}_{unique_id}_output.png"
        file_path = Path(file_name)
        image.save(file_path)
        print(f"Image saved to {file_path}")

        saved_files.append(str(file_path))
    else:
        print(f"Failed to download the image from {api_url}")

    return saved_files
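
# Example usage (a sketch; assumes an automatic1111 server is reachable at base_url):
# files = generate_sd_images("a lighthouse at dusk", image_size="512x512", team_name="demo")
# print(files)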
AutoGroq\tools\get_complementary_colors.py
# Tool filename: complementary_colors.py
# Import necessary module(s)
import colorsys


def get_complementary_colors(color):
    # docstrings
    """
    Returns a list of complementary colors for the given color.

    Parameters:
    color (str): The color in hexadecimal format (e.g., '#FF0000' for red).

    Returns:
    list: A list of complementary colors in hexadecimal format.
    """
    # Body of tool
    # Convert the color from hexadecimal to RGB
    r, g, b = tuple(int(color.lstrip('#')[i:i + 2], 16) for i in (0, 2, 4))
    # Convert RGB to HSV
    h, s, v = colorsys.rgb_to_hsv(r / 255, g / 255, b / 255)
    # Calculate the complementary hue
    h_compl = (h + 0.5) % 1
    # Convert the complementary hue back to RGB
    r_compl, g_compl, b_compl = colorsys.hsv_to_rgb(h_compl, 1, 1)
    # Convert RGB to hexadecimal
    color_compl = '#{:02x}{:02x}{:02x}'.format(int(r_compl * 255), int(g_compl * 255), int(b_compl * 255))
    # Return the complementary color
    return [color_compl]

# Example usage:
# color = '#FF0000'
# print(get_complementary_colors(color))
AutoGroq\tools\get_weather.py
import requests
from typing import Optional


def get_weather(zipcode: str, api_key: str) -> Optional[dict]:
    """
    Fetches the current weather for the given ZIP code using the OpenWeatherMap API.

    Args:
        zipcode (str): The ZIP code for which to fetch the weather.
        api_key (str): Your OpenWeatherMap API key.

    Returns:
        Optional[dict]: A dictionary containing the weather information, or None if an error occurs.
    """
    base_url = "http://api.openweathermap.org/data/2.5/weather"
    params = {
        "zip": zipcode,
        "appid": api_key,
        "units": "imperial"  # Use "metric" for Celsius
    }
    try:
        response = requests.get(base_url, params=params)
        response.raise_for_status()  # Raise an HTTPError for bad responses
        return response.json()
    except requests.RequestException as e:
        print(f"An error occurred: {e}")
        return None

# Example usage:
# api_key = "your_openweathermap_api_key"
# weather = get_weather("94040", api_key)
# print(weather)
AutoGroq\tools\plot_diagram.py
# Thanks to MADTANK: https://github.com/madtank

import os
import matplotlib.pyplot as plt
import matplotlib.patches as patches


# Function to draw the geometric structure with customizable file name
def draw_geometric_structure(file_name, base_circles=4, base_circle_color='blue', top_circle_color='orange', line_color='grey', line_width=2):
    # Define the directory and save path using the file_name parameter
    directory = 'diagrams'
    if not os.path.exists(directory):
        os.makedirs(directory)
    save_path = f'{directory}/{file_name}.png'

    fig, ax = plt.subplots()

    # Draw base circles
    for i in range(base_circles):
        circle = patches.Circle((i * 1.5, 0), 0.5, color=base_circle_color)
        ax.add_patch(circle)

    # Draw top circle
    top_circle = patches.Circle(((base_circles - 1) * 0.75, 2), 0.6, color=top_circle_color)
    ax.add_patch(top_circle)

    # Draw lines
    for i in range(base_circles):
        line = plt.Line2D([(i * 1.5), ((base_circles - 1) * 0.75)], [0, 2], color=line_color, linewidth=line_width)
        ax.add_line(line)

    # Set limits and aspect
    ax.set_xlim(-1, base_circles * 1.5)
    ax.set_ylim(-1, 3)
    ax.set_aspect('equal')

    # Remove axes
    ax.axis('off')

    # Save the plot to the specified path
    plt.savefig(save_path, bbox_inches='tight', pad_inches=0)
    plt.close()

    # Return the path for verification
    return save_path

# Example usage:
# file_name = 'custom_geometric_structure'
# image_path = draw_geometric_structure(file_name, base_circles=8, base_circle_color='blue', top_circle_color='orange', line_color='grey', line_width=2)
AutoGroq\tools\save_file_to_disk.py
# Thanks to aj47: https://github.com/aj47

import os


def save_file_to_disk(contents, file_name):
    """
    Saves the given contents to a file with the given file name.

    Parameters:
    contents (str): The string contents to save to the file.
    file_name (str): The name of the file, including its extension.

    Returns:
    str: A message indicating the success of the operation.
    """
    # Ensure the directory exists; create it if it doesn't
    directory = os.path.dirname(file_name)
    if directory and not os.path.exists(directory):
        os.makedirs(directory)

    # Write the contents to the file
    with open(file_name, 'w') as file:
        file.write(contents)

    return f"File '{file_name}' has been saved successfully."

# Example usage:
# contents_to_save = "Hello, world!"
# file_name = "example.txt"
# print(save_file_to_disk(contents_to_save, file_name))
AutoGroq\tools\slackoverflow_teams.py
# # Thanks to MADTANK: https://github.com/madtank
# # README: https://github.com/madtank/autogenstudio-skills/blob/main/stackoverflow_teams/README.md

# import os
# import requests
# import json
# import sys

# class StackOverflowTeamsSearcher:
#     def __init__(self):
#         self.api_key = os.getenv("STACK_OVERFLOW_TEAMS_API_KEY")
#         if not self.api_key:
#             raise ValueError("API key not found in environment variables")
#         self.base_url = "https://api.stackoverflowteams.com/2.3/search"
#         self.headers = {"X-API-Access-Token": self.api_key}

#     def search(self, query, team_name):
#         params = {"intitle": query, "team": team_name}
#         response = requests.get(self.base_url, headers=self.headers, params=params)
#         if response.status_code != 200:
#             print(f"Error: Received status code {response.status_code}")
#             print(response.text)
#             return None
#         try:
#             data = response.json()
#             simplified_output = []
#             for item in data['items']:
#                 question = {"question": item['title']}
#                 if 'accepted_answer_id' in item:
#                     answer_id = item['accepted_answer_id']
#                     answer_url = f"https://api.stackoverflowteams.com/2.3/answers/{answer_id}"
#                     answer_params = {"team": team_name, "filter": "withbody"}
#                     answer_response = requests.get(answer_url, headers=self.headers, params=answer_params)
#                     if answer_response.status_code == 200:
#                         answer_data = answer_response.json()
#                         first_item = answer_data['items'][0]
#                         if 'body' in first_item:
#                             answer_text = first_item['body']
#                             question['answer'] = answer_text
#                         # else:
#                         #     print(f"Question {item['link']} has no answer body")
#                     # else:
#                     #     print(f"Error: Received status code {answer_response.status_code}")
#                     #     print(answer_response.text)
#                 # else:
#                 #     print(f"Question {item['link']} has no answer")
#                 simplified_output.append(question)
#             return json.dumps(simplified_output, indent=4)  # Pretty-printing
#         except ValueError as e:
#             print(f"Error parsing JSON: {e}")
#             print("Response text:", response.text)
#             return None

# # Example Usage
# if __name__ == "__main__":
#     if len(sys.argv) < 2:
#         print("Usage: python stackoverflow_teams.py <query>")
#         sys.exit(1)
#     query = sys.argv[1]
#     team_name = "yourteamname"  # TODO Set your team name here
#     # Instantiate and use the StackOverflowTeamsSearcher with the query string passed in
#     searcher = StackOverflowTeamsSearcher()
#     results = searcher.search(query, team_name)
#     print(results)
AutoGroq\tools\slack_search.py
# # Thanks to MADTANK: https://github.com/madtank
# # README: https://github.com/madtank/autogenstudio-skills/blob/main/slack/README.md

# import os
# import requests
# import json
# import re
# import sys

# class SlackSearcher:
#     def __init__(self):
#         self.api_token = os.getenv("SLACK_API_TOKEN")
#         if not self.api_token:
#             raise ValueError("Slack API token not found in environment variables")
#         self.base_url = "https://slack.com/api"
#         self.headers = {"Authorization": f"Bearer {self.api_token}"}
#         # Replace these example channel names with the actual channel names you want to search
#         self.channel_names = ["general", "random"]

#     def search(self, query):
#         query_with_channels = self.build_query_with_channels(query)
#         search_url = f"{self.base_url}/search.messages"
#         params = {"query": query_with_channels}
#         response = requests.get(search_url, headers=self.headers, params=params)
#         if response.status_code != 200:
#             print(f"Error: Received status code {response.status_code}")
#             print(response.text)
#             return None
#         try:
#             data = response.json()
#             if not data['ok']:
#                 print(f"Error: {data['error']}")
#                 return None
#             simplified_output = []
#             for message in data['messages']['matches']:
#                 simplified_message = {
#                     "user": message['user'],
#                     "text": message['text'],
#                     "permalink": message['permalink']
#                 }
#                 thread_ts = self.extract_thread_ts(message['permalink'])
#                 if thread_ts:
#                     thread_messages = self.get_thread_messages(message['channel']['id'], thread_ts)
#                     simplified_message['thread'] = thread_messages
#                 simplified_output.append(simplified_message)
#             return json.dumps(simplified_output, indent=4)  # Pretty-printing
#         except ValueError as e:
#             print(f"Error parsing JSON: {e}")
#             print("Response text:", response.text)
#             return None

#     def build_query_with_channels(self, query):
#         channel_queries = [f"in:{channel}" for channel in self.channel_names]
#         return f"{query} {' '.join(channel_queries)}"

#     def extract_thread_ts(self, permalink):
#         match = re.search(r"thread_ts=([0-9.]+)", permalink)
#         return match.group(1) if match else None

#     def get_thread_messages(self, channel_id, thread_ts):
#         thread_url = f"{self.base_url}/conversations.replies"
#         params = {"channel": channel_id, "ts": thread_ts}
#         response = requests.get(thread_url, headers=self.headers, params=params)
#         if response.status_code != 200 or not response.json()['ok']:
#             print(f"Error fetching thread messages: {response.text}")
#             return []
#         thread_messages = []
#         for message in response.json()['messages']:
#             if message['ts'] != thread_ts:  # Exclude the parent message
#                 thread_messages.append({
#                     "user": message['user'],
#                     "text": message['text']
#                 })
#         return thread_messages

# # Example Usage
# if __name__ == "__main__":
#     if len(sys.argv) < 2:
#         print("Usage: python slack_search.py <query>")
#         sys.exit(1)
#     query = sys.argv[1]
#     searcher = SlackSearcher()
#     results = searcher.search(query)
#     print(results)
AutoGroq\tools\test.py
# bfrglz; =
# return
# import json
AutoGroq\tools\webscrape.py
# Thanks to MADTANK: https://github.com/madtank

import requests
from bs4 import BeautifulSoup


def save_webpage_as_text(url, output_filename):
    # Send a GET request to the URL
    response = requests.get(url)

    # Initialize BeautifulSoup to parse the content
    soup = BeautifulSoup(response.text, 'html.parser')

    # Extract text from the BeautifulSoup object
    # You can adjust the elements you extract based on your needs
    text = soup.get_text(separator='\n', strip=True)

    # Save the extracted text to a file
    with open(output_filename, 'w', encoding='utf-8') as file:
        file.write(text)

    # Return the file path
    return output_filename

# Example usage:
# url = 'https://j.gravelle.us/'
# output_filename = 'webpage_content.txt'
# file_path = save_webpage_as_text(url, output_filename)
# print("File saved at:", file_path)

# For a list of urls:
# urls = ['http://example.com', 'http://example.org']
# for i, url in enumerate(urls):
#     output_filename = f'webpage_content_{i}.txt'
#     save_webpage_as_text(url, output_filename)
AutoGroq\tools\web_search.py
# # Thanks to MADTANK: https://github.com/madtank
# # README: https://github.com/madtank/autogenstudio-skills/blob/main/web_search/README.MD

# import requests
# from typing import List, Tuple, Optional

# # Define the structure of a search result entry
# ResponseEntry = Tuple[str, str, str]

# # Configuration variables for the web search function
# CONFIG = {
#     "api_provider": "google",  # or "bing"
#     "result_count": 3,
#     # For Google Search enter these values
#     # Refer to readme for help: https://github.com/madtank/autogenstudio-skills/blob/main/web_search/README.MD
#     "google_api_key": "your_google_api_key_here",
#     "google_search_engine_id": "your_google_search_engine_id_here",
#     # Or Bing Search enter these values
#     "bing_api_key": "your_bing_api_key_here"
# }

# class WebSearch:
#     """
#     A class that encapsulates the functionality to perform web searches using
#     Google Custom Search API or Bing Search API based on the provided configuration.
#     """

#     def __init__(self, config: dict):
#         """
#         Initializes the WebSearch class with the provided configuration.

#         Parameters:
#         - config (dict): A dictionary containing configuration settings.
#         """
#         self.config = config

#     def search_query(self, query: str) -> Optional[List[ResponseEntry]]:
#         """
#         Performs a web search based on the query and configuration.

#         Parameters:
#         - query (str): The search query string.

#         Returns:
#         - A list of ResponseEntry tuples containing the title, URL, and snippet of each result.
#         """
#         api_provider = self.config.get("api_provider", "google")
#         result_count = int(self.config.get("result_count", 3))
#         try:
#             if api_provider == "google":
#                 return self._search_google(query, cnt=result_count)
#             elif api_provider == "bing":
#                 return self._search_bing(query, cnt=result_count)
#         except ValueError as e:
#             print(f"An error occurred: {e}")
#         except Exception as e:
#             print(f"An unexpected error occurred: {e}")
#         return None

#     def _search_google(self, query: str, cnt: int) -> Optional[List[ResponseEntry]]:
#         """
#         Performs a Google search and processes the results.

#         Parameters:
#         - query (str): The search query string.
#         - cnt (int): The number of search results to return.

#         Returns:
#         - A list of ResponseEntry tuples containing the title, URL, and snippet of each Google search result.
#         """
#         api_key = self.config.get("google_api_key")
#         search_engine_id = self.config.get("google_search_engine_id")
#         url = f"https://www.googleapis.com/customsearch/v1?key={api_key}&cx={search_engine_id}&q={query}"
#         if cnt > 0:
#             url += f"&num={cnt}"
#         response = requests.get(url)
#         if response.status_code == 200:
#             result_list: List[ResponseEntry] = []
#             for item in response.json().get("items", []):
#                 result_list.append((item["title"], item["link"], item["snippet"]))
#             return result_list
#         else:
#             print(f"Error with Google Custom Search API: {response.status_code}")
#             return None

#     def _search_bing(self, query: str, cnt: int) -> Optional[List[ResponseEntry]]:
#         """
#         Performs a Bing search and processes the results.

#         Parameters:
#         - query (str): The search query string.
#         - cnt (int): The number of search results to return.

#         Returns:
#         - A list of ResponseEntry tuples containing the name, URL, and snippet of each Bing search result.
#         """
#         api_key = self.config.get("bing_api_key")
#         url = f"https://api.bing.microsoft.com/v7.0/search?q={query}"
#         if cnt > 0:
#             url += f"&count={cnt}"
#         headers = {"Ocp-Apim-Subscription-Key": api_key}
#         response = requests.get(url, headers=headers)
#         if response.status_code == 200:
#             result_list: List[ResponseEntry] = []
#             for item in response.json().get("webPages", {}).get("value", []):
#                 result_list.append((item["name"], item["url"], item["snippet"]))
#             return result_list
#         else:
#             print(f"Error with Bing Search API: {response.status_code}")
#             return None

# # Remember to replace the placeholders in CONFIG with your actual API keys.
# # Example usage
# # search = WebSearch(CONFIG)
# # results = search.search_query("Example Query")
# # if results is not None:
# #     for title, link, snippet in results:
# #         print(title, link, snippet)
AutoGroq\utils\api_utils.py
import importlib
import requests
import streamlit as st
import time

from config import API_URL, LLM_PROVIDER, RETRY_TOKEN_LIMIT


def get_llm_provider(api_key=None, api_url=None, provider=None):
    if provider is None:
        provider = LLM_PROVIDER
    provider_module = importlib.import_module(f"llm_providers.{provider}_provider")
    provider_class = getattr(provider_module, f"{provider.capitalize()}Provider")
    if api_url is None:
        api_url = API_URL
    return provider_class(api_url=api_url, api_key=api_key)


def make_api_request(url, data, headers, api_key):
    time.sleep(2)  # Throttle the request to ensure at least 2 seconds between calls
    try:
        if not api_key:
            llm = LLM_PROVIDER.upper()
            raise ValueError(f"{llm}_API_KEY not found. Please enter your API key.")
        headers["Authorization"] = f"Bearer {api_key}"
        response = requests.post(url, json=data, headers=headers)
        if response.status_code == 200:
            return response.json()
        elif response.status_code == 429:
            error_message = response.json().get("error", {}).get("message", "")
            st.error("Rate limit reached for the current model. If you click 'Update' again, we'll retry with a reduced token count. Or you can try selecting a different model.")
            st.error(f"Error details: {error_message}")
            return None
        else:
            print(f"Error: API request failed with status {response.status_code}, response: {response.text}")
            return None
    except requests.RequestException as e:
        print(f"Error: Request failed {e}")
        return None


def send_request_with_retry(url, data, headers, api_key):
    response = make_api_request(url, data, headers, api_key)
    if response is None:
        # Add a retry button
        if st.button("Retry with decreased token limit"):
            # Update the token limit in the request data
            data["max_tokens"] = RETRY_TOKEN_LIMIT
            # Retry the request with the decreased token limit
            print("Retrying the request with decreased token limit.")
            print(f"URL: {url}")
            print(f"Retry token limit: {RETRY_TOKEN_LIMIT}")
            response = make_api_request(url, data, headers, api_key)
            if response is not None:
                print(f"Retry successful. Response: {response}")
            else:
                print("Retry failed.")
    return response


def set_llm_provider_title():
    # "What's life without whimsy?" ~Sheldon Cooper
    if LLM_PROVIDER == "groq":
        st.title("AutoGroq™")
    elif LLM_PROVIDER == "ollama":
        st.title("Auto̶G̶r̶o̶qOllama")
    elif LLM_PROVIDER == "lmstudio":
        st.title("Auto̶G̶r̶o̶qLM_Studio")
    elif LLM_PROVIDER == "openai":
        st.title("Auto̶G̶r̶o̶qChatGPT")
AutoGroq\utils\auth_utils.py
import os
import streamlit as st

from config import LLM_PROVIDER


def get_api_key():
    api_key_env_var = f"{LLM_PROVIDER.upper()}_API_KEY"
    api_key = os.environ.get(api_key_env_var)
    if api_key is None:
        api_key = st.session_state.get(api_key_env_var)
    return api_key


def get_api_url():
    api_url_env_var = f"{LLM_PROVIDER.upper()}_API_URL"
    api_url = os.environ.get(api_url_env_var)
    if api_url is None:
        api_url = globals().get(api_url_env_var)
        if api_url is None:
            if api_url_env_var not in st.session_state:
                api_url = st.text_input(f"Enter the {LLM_PROVIDER.upper()} API URL:", type="password", key=f"{LLM_PROVIDER}_api_url_input")
                if api_url:
                    st.session_state[api_url_env_var] = api_url
                    st.success("API URL entered successfully.")
                else:
                    st.warning(f"Please enter the {LLM_PROVIDER.upper()} API URL to use the app.")
            else:
                api_url = st.session_state.get(api_url_env_var)
    return api_url
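# Example (sketch): with LLM_PROVIDER = "groq" in config.py, get_api_key() checks
# the GROQ_API_KEY environment variable first, then st.session_state. The key
# value shown is hypothetical:
#
#   os.environ["GROQ_API_KEY"] = "gsk_example_key"
#   api_key = get_api_key()   # -> "gsk_example_key"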
AutoGroq\utils\db_utils.py
import datetime
import json
import os
import sqlite3
import streamlit as st
import uuid

from config import FRAMEWORK_DB_PATH, MODEL_CHOICES, MODEL_TOKEN_LIMITS
from utils.agent_utils import create_agent_data
from utils.file_utils import sanitize_text
from utils.workflow_utils import get_workflow_from_agents


def export_to_autogen():
    # Check if the app is running on Streamlit Sharing
    url_params = st.query_params
    if "streamlit.app" in url_params.get("url", ""):
        st.warning("Exporting to Autogen is only possible with a locally running copy of AutoGroq™.")
        return
    db_path = FRAMEWORK_DB_PATH
    print(f"Database path: {db_path}")
    if db_path:
        export_data(db_path)
    else:
        st.warning("Please provide a valid database path in config.py.")


def export_data(db_path):
    print(f"Exporting data to: {db_path}")
    if db_path:
        try:
            conn = sqlite3.connect(db_path)
            cursor = conn.cursor()
            print("Connected to the database successfully.")

            # Access agents from st.session_state
            agents = st.session_state.agents
            print(f"Number of agents: {len(agents)}")

            # Keep track of inserted skills to avoid duplicates
            inserted_skills = set()

            for agent in agents:
                agent_name = agent['config']['name']
                formatted_agent_name = sanitize_text(agent_name).lower().replace(' ', '_')
                autogen_agent_data, _ = create_agent_data(agent)

                # Update the model and max_tokens in the autogen_agent_data
                autogen_agent_data['config']['llm_config']['config_list'][0]['model'] = agent['config']['llm_config']['config_list'][0]['model']
                autogen_agent_data['config']['llm_config']['max_tokens'] = MODEL_CHOICES.get(agent['config']['llm_config']['config_list'][0]['model'], MODEL_TOKEN_LIMITS.get(st.session_state.model, 4096))

                agent_data = (
                    str(uuid.uuid4()),  # Generate a unique ID for the agent
                    'default',
                    datetime.datetime.now().isoformat(),
                    json.dumps(autogen_agent_data['config']),
                    autogen_agent_data['type'],
                    json.dumps(autogen_agent_data['tools'])
                )
                cursor.execute("INSERT INTO agents (id, user_id, timestamp, config, type, skills) VALUES (?, ?, ?, ?, ?, ?)", agent_data)
                print(f"Inserted agent: {formatted_agent_name}")

            project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
            skill_folder = os.path.join(project_root, "skills")
            for tool_name in st.session_state.selected_skills:
                if tool_name not in inserted_skills:
                    skill_file_path = os.path.join(skill_folder, f"{tool_name}.py")
                    with open(skill_file_path, 'r') as file:
                        skill_data = file.read()
                    skill_json = st.session_state.tool
                    # Note: the file content in skill_data is reused inside the new tuple below
                    skill_data = (
                        str(uuid.uuid4()),  # Generate a unique ID for the skill
                        'default',  # Set the user ID to 'default'
                        datetime.datetime.now().isoformat(),
                        skill_data,
                        skill_json['title'],
                        skill_json['file_name']
                    )
                    cursor.execute("INSERT INTO skills (id, user_id, timestamp, content, title, file_name) VALUES (?, ?, ?, ?, ?, ?)", skill_data)
                    print(f"Inserted skill: {skill_json['title']}")
                    inserted_skills.add(tool_name)  # Add the inserted skill to the set

            # Access agents from st.session_state for the workflow
            workflow_data = get_workflow_from_agents(st.session_state.agents)[0]
            workflow_data = (
                str(uuid.uuid4()),  # Generate a unique ID for the workflow
                'default',
                datetime.datetime.now().isoformat(),
                json.dumps(workflow_data['sender']),
                json.dumps(workflow_data['receiver']),
                workflow_data['type'],
                workflow_data['name'],
                workflow_data['description'],
                workflow_data['summary_method']
            )
            cursor.execute("INSERT INTO workflows (id, user_id, timestamp, sender, receiver, type, name, description, summary_method) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)", workflow_data)
            print("Inserted workflow data.")

            conn.commit()
            print("Changes committed to the database.")
            conn.close()
            print("Database connection closed.")
            st.success("Data exported to Autogen successfully!")
        except sqlite3.Error as e:
            st.error(f"Error exporting data to Autogen: {str(e)}")
            print(f"Error exporting data to Autogen: {str(e)}")
def sql_to_db(sql: str, params: tuple = None):
    conn = None  # Initialize so the finally block is safe if connect() fails
    try:
        conn = sqlite3.connect(FRAMEWORK_DB_PATH)
        cursor = conn.cursor()
        print("Connected to the database successfully.")
        if params:
            cursor.execute(sql, params)
        else:
            cursor.execute(sql)
        conn.commit()
        print("SQL executed successfully.")
    except sqlite3.Error as e:
        print(f"Error executing SQL: {str(e)}")
        raise
    finally:
        if conn:
            conn.close()
            print("Database connection closed.")
# FUTURE functions for exporting to new Autogen Studio schema:

# def create_or_update_agent(agent: dict, db_path: str):
#     with sqlite3.connect(db_path) as conn:
#         cursor = conn.cursor()
#         cursor.execute("""
#             INSERT OR REPLACE INTO Agent (id, skills, created_at, updated_at, user_id, workflows, type, config, models)
#             VALUES (:id, :skills, :created_at, :updated_at, :user_id, :workflows, :type, :config, :models)
#         """, agent)
#         conn.commit()

# def create_or_update_skill(skill: dict, db_path: str):
#     with sqlite3.connect(db_path) as conn:
#         cursor = conn.cursor()
#         cursor.execute("""
#             INSERT OR REPLACE INTO Skill (id, created_at, updated_at, user_id, name, content, description, secrets, libraries)
#             VALUES (:id, :created_at, :updated_at, :user_id, :name, :content, :description, :secrets, :libraries)
#         """, skill)
#         conn.commit()

# def create_or_update_workflow(workflow: dict, db_path: str):
#     with sqlite3.connect(db_path) as conn:
#         cursor = conn.cursor()
#         cursor.execute("""
#             INSERT OR REPLACE INTO Workflow (id, agents, created_at, updated_at, user_id, name, description, type, summary_method)
#             VALUES (:id, :agents, :created_at, :updated_at, :user_id, :name, :description, :type, :summary_method)
#         """, workflow)
#         conn.commit()

# def get_agent_by_id(agent_id: int, db_path: str) -> Optional[dict]:
#     with sqlite3.connect(db_path) as conn:
#         cursor = conn.cursor()
#         cursor.execute("SELECT * FROM Agent WHERE id = ?", (agent_id,))
#         row = cursor.fetchone()
#         if row:
#             columns = [column[0] for column in cursor.description]
#             return dict(zip(columns, row))
#         return None

# def get_skill_by_id(skill_id: int, db_path: str) -> Optional[dict]:
#     with sqlite3.connect(db_path) as conn:
#         cursor = conn.cursor()
#         cursor.execute("SELECT * FROM Skill WHERE id = ?", (skill_id,))
#         row = cursor.fetchone()
#         if row:
#             columns = [column[0] for column in cursor.description]
#             return dict(zip(columns, row))
#         return None

# def get_workflow_by_id(workflow_id: int, db_path: str) -> Optional[dict]:
#     with sqlite3.connect(db_path) as conn:
#         cursor = conn.cursor()
#         cursor.execute("SELECT * FROM Workflow WHERE id = ?", (workflow_id,))
#         row = cursor.fetchone()
#         if row:
#             columns = [column[0] for column in cursor.description]
#             return dict(zip(columns, row))
#         return None
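# Example usage of sql_to_db (sketch): parameterized queries keep values out of
# the SQL string itself. The row below is illustrative only:
#
#   sql_to_db(
#       "INSERT INTO skills (id, user_id, timestamp, content, title, file_name) VALUES (?, ?, ?, ?, ?, ?)",
#       (str(uuid.uuid4()), "default", datetime.datetime.now().isoformat(),
#        "def my_tool():\n    pass", "my_tool", "my_tool.py")
#   )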
AutoGroq\utils\file_utils.py
import re


def sanitize_text(text):
    # Remove non-ASCII characters
    text = re.sub(r'[^\x00-\x7F]+', '', text)
    # Remove non-alphanumeric characters except for standard punctuation
    text = re.sub(r'[^a-zA-Z0-9\s.,!?:;\'"-]+', '', text)
    return text
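# Example (sketch): sanitize_text first strips non-ASCII characters, then anything
# outside the allowed alphanumeric/punctuation set:
#
#   sanitize_text("Café Strategist #1!")   # -> "Caf Strategist 1!"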
AutoGroq\utils\tool_utils.py
import datetime
import importlib
import os
import re
import sqlite3
import streamlit as st
import uuid

from config import FRAMEWORK_DB_PATH
from models.tool_base_model import ToolBaseModel
from prompts import get_generate_tool_prompt
from utils.auth_utils import get_api_key
from utils.db_utils import sql_to_db
from utils.file_utils import regenerate_zip_files
from utils.ui_utils import get_llm_provider


def create_tool_data(python_code):
    # Extract the function name from the Python code
    function_name_match = re.search(r"def\s+(\w+)\(", python_code)
    if function_name_match:
        function_name = function_name_match.group(1)
    else:
        function_name = "unnamed_function"

    # Extract the tool description from the docstring
    docstring_match = re.search(r'"""(.*?)"""', python_code, re.DOTALL)
    if docstring_match:
        tool_description = docstring_match.group(1).strip()
    else:
        tool_description = "No description available"

    # Get the current timestamp
    current_timestamp = datetime.datetime.now().isoformat()

    # Update st.session_state.tool_model with the tool data
    st.session_state.tool_model.name = function_name
    st.session_state.tool_model.description = tool_description
    st.session_state.tool_model.title = function_name
    st.session_state.tool_model.file_name = f"{function_name}.py"
    st.session_state.tool_model.content = python_code
    st.session_state.tool_model.user_id = "default"
    st.session_state.tool_model.timestamp = current_timestamp


def export_tool_as_skill(tool_name: str, edited_skill: str):
    print(f"Exporting skill '{tool_name}'...")
    try:
        create_tool_data(edited_skill)
        print(f"Skill data: {st.session_state.tool_model.dict()}")  # Use dict() to get the dictionary representation
        skill_tuple = (
            str(uuid.uuid4()),  # Generate a unique ID for the skill
            'default',  # Set the user ID to 'default'
            datetime.datetime.now().isoformat(),
            edited_skill,
            st.session_state.tool_model.title,
            st.session_state.tool_model.file_name
        )
        print(f"Inserting skill data: {skill_tuple}")
        sql = "INSERT INTO skills (id, user_id, timestamp, content, title, file_name) VALUES (?, ?, ?, ?, ?, ?)"
        sql_to_db(sql, skill_tuple)
        st.success(f"Skill '{tool_name}' exported successfully!")
        st.experimental_rerun()
    except sqlite3.Error as e:
        st.error(f"Error exporting skill: {str(e)}")
        print(f"Error exporting skill: {str(e)}")
def generate_tool(rephrased_tool_request):
    temperature_value = st.session_state.get('temperature', 0.1)
    max_tokens_value = st.session_state.get('max_tokens', 100)
    top_p_value = st.session_state.get('top_p', 1)
    llm_request_data = {
        "model": st.session_state.model,
        "temperature": temperature_value,
        "max_tokens": max_tokens_value,
        "top_p": top_p_value,
        "stop": "TERMINATE",
        "messages": [
            {
                "role": "user",
                "content": get_generate_tool_prompt(rephrased_tool_request)
            }
        ]
    }
    api_key = get_api_key()
    llm_provider = get_llm_provider(api_key=api_key)
    response = llm_provider.send_request(llm_request_data)
    if response.status_code == 200:
        response_data = llm_provider.process_response(response)
        print(f"Response data: {response_data}")
        if "choices" in response_data and response_data["choices"]:
            proposed_tool = response_data["choices"][0]["message"]["content"].strip()
            match = re.search(r"def\s+(\w+)\(", proposed_tool)
            if match:
                tool_name = match.group(1)
                # Update st.session_state.tool_model with the proposed tool data
                create_tool_data(proposed_tool)
                return proposed_tool, tool_name
            else:
                print("Error: Failed to extract tool name from the proposed tool.")
                return None, None
    return None, None


def extract_tool_description(proposed_tool):
    docstring_match = re.search(r'"""(.*?)"""', proposed_tool, re.DOTALL)
    if docstring_match:
        return docstring_match.group(1).strip()
    else:
        return "No description available"


def load_tool_functions():
    # Get the parent directory of the current script
    parent_directory = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
    # Define the path to the 'tools' folder in the parent directory
    tools_folder_path = os.path.join(parent_directory, 'tools')
    # List all files in the 'tools' folder
    tool_files = [f for f in os.listdir(tools_folder_path) if f.endswith('.py')]
    tool_functions = {}
    for tool_file in tool_files:
        tool_name = os.path.splitext(tool_file)[0]
        tool_module = importlib.import_module(f"tools.{tool_name}")
        if hasattr(tool_module, tool_name):
            tool_functions[tool_name] = getattr(tool_module, tool_name)
    st.session_state.tool_functions = tool_functions


def populate_tool_models():
    project_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
    tool_folder = os.path.join(project_root, "tools")
    tool_files = [f for f in os.listdir(tool_folder) if f.endswith(".py")]
    tool_models = []
    for tool_file in tool_files:
        tool_name = os.path.splitext(tool_file)[0]
        tool_file_path = os.path.join(tool_folder, tool_file)
        with open(tool_file_path, 'r') as file:
            tool_data = file.read()
        create_tool_data(tool_data)
        tool_model = ToolBaseModel(
            name=st.session_state.tool_model.name,
            description=st.session_state.tool_model.description,
            title=st.session_state.tool_model.title,
            file_name=st.session_state.tool_model.file_name,
            content=st.session_state.tool_model.content,
            id=len(tool_models) + 1,
            created_at=datetime.datetime.now().isoformat(),
            updated_at=datetime.datetime.now().isoformat(),
            user_id=st.session_state.tool_model.user_id,
            secrets=st.session_state.tool_model.secrets,
            libraries=st.session_state.tool_model.libraries,
            timestamp=st.session_state.tool_model.timestamp
        )
        tool_models.append(tool_model)
    st.session_state.tool_models = tool_models


def process_tool_request():
    if st.session_state.tool_request and not st.session_state.get('tool_processed', False):
        tool_request = st.session_state.tool_request
        parent_directory = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
        tool_folder = os.path.join(parent_directory, "tools")
        print(f"Tool Request: {tool_request}")
        rephrased_tool_request = rephrase_tool(tool_request)
        if rephrased_tool_request:
            print("Generating proposed tool...")
            proposed_tool, tool_name = generate_tool(rephrased_tool_request)  # Unpack the tuple
            print(f"Proposed tool: {proposed_tool}")
            if proposed_tool:
                match = re.search(r"def\s+(\w+(?:_\w+)*)\(", proposed_tool)  # Updated regex pattern
                print(f"Match: {match}")
                if match:
                    tool_name = match.group(1)
                    st.write(f"Proposed tool: {tool_name}")
                    st.code(proposed_tool)
                    with st.form(key=f"export_form_{tool_name}"):
                        submit_export = st.form_submit_button("Export/Write")
                        if submit_export:
                            print(f"Exporting tool {tool_name}")
                            export_tool_as_skill(tool_name, proposed_tool)
                            st.success(f"Tool {tool_name} exported to Autogen successfully!")
                            # Clear the tool_request input and hide the input field
                            st.session_state.show_tool_input = False
                            st.session_state.tool_request = ""
                            # Clear the 'proposed_tool' and 'tool_name' from the session state
                            st.session_state.proposed_tool = None
                            st.session_state.tool_name = None
                            st.session_state.tool_processed = True  # Set the flag to indicate processing is complete
                            st.experimental_rerun()
                    with st.form(key=f"discard_form_{tool_name}"):
                        submit_discard = st.form_submit_button("Clear")
                        if submit_discard:
                            st.warning("Tool discarded.")
                            # Clear the tool_request input and hide the input field
                            st.session_state.show_tool_input = False
                            st.session_state.tool_request = ""
                            # Clear the 'proposed_tool' and 'tool_name' from the session state
                            st.session_state.proposed_tool = None
                            st.session_state.tool_name = None
                            st.session_state.tool_processed = True  # Set the flag to indicate processing is complete
                            st.experimental_rerun()
                else:
                    st.error("Failed to extract tool name from the proposed tool.")
            else:
                st.error("No proposed tool generated.")
def rephrase_tool(tool_request):
    print("Debug: Rephrasing tool: ", tool_request)
    temperature_value = st.session_state.get('temperature', 0.1)
    llm_request_data = {
        "model": st.session_state.model,
        "temperature": temperature_value,
        "max_tokens": st.session_state.max_tokens,
        "top_p": 1,
        "stop": "TERMINATE",
        "messages": [
            {
                "role": "user",
                "content": f"""
                Act as a professional tool creator and rephrase the following tool request into an optimized prompt:

                tool request: "{tool_request}"

                Rephrased:
                """
            }
        ]
    }
    api_key = get_api_key()
    llm_provider = get_llm_provider(api_key=api_key)
    response = llm_provider.send_request(llm_request_data)
    if response.status_code == 200:
        response_data = llm_provider.process_response(response)
        if "choices" in response_data and response_data["choices"]:
            rephrased = response_data["choices"][0]["message"]["content"].strip()
            print(f"Debug: Rephrased tool: {rephrased}")
            return rephrased
    return None


def show_tools():
    with st.expander("Tools"):
        selected_tools = []
        select_all = st.checkbox("Select All", key="select_all_tools")
        for idx, tool_model in enumerate(st.session_state.tool_models):
            tool_name = tool_model.name
            if select_all:
                tool_checkbox = st.checkbox(f"Add {tool_name} tool to all agents", value=True, key=f"tool_{tool_name}_{idx}")
            else:
                tool_checkbox = st.checkbox(f"Add {tool_name} tool to all agents", value=False, key=f"tool_{tool_name}_{idx}")
            if tool_checkbox:
                selected_tools.append(tool_name)

        if select_all:
            st.session_state.selected_tools = [tool_model.name for tool_model in st.session_state.tool_models]
        else:
            st.session_state.selected_tools = selected_tools

        # Update the 'tools' property of each agent with the selected tools
        for agent in st.session_state.agents:
            agent['tools'] = [tool_model.name for tool_model in st.session_state.tool_models if tool_model.name in st.session_state.selected_tools]
        regenerate_zip_files()

        if st.button("Add tool", key="add_tool_button"):
            st.session_state.show_tool_input = True
            st.session_state.tool_request = ""
            st.session_state.tool_processed = False

        if st.session_state.get('show_tool_input'):
            tool_request = st.text_input("Need a new tool? Describe what it should do:", key="tool_request_input")
            if tool_request:
                st.session_state.tool_request = tool_request  # Store in a separate session state variable
                process_tool_request()  # Process the stored tool_request

        if selected_tools or 'proposed_tool' in st.session_state:
            if st.button("Attempt to Export tool to Autogen (experimental)", key=f"export_button_{st.session_state.tool_name}"):
                tool_name = st.session_state.tool_name
                proposed_tool = st.session_state.proposed_tool
                print(f"Exporting tool {tool_name} to Autogen")
                export_tool_as_skill(tool_name, proposed_tool)
                st.success(f"Tool {tool_name} exported to Autogen successfully!")
                # Clear the tool_request input and hide the input field
                st.session_state.show_tool_input = False
                st.session_state.tool_request = ""
                st.experimental_rerun()
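# Example (sketch): create_tool_data derives all tool metadata from the code
# itself; the first "def" supplies the name, the first docstring the description.
# Requires st.session_state.tool_model to already hold a ToolBaseModel instance:
#
#   code = 'def add_numbers(a, b):\n    """Return the sum of two numbers."""\n    return a + b'
#   create_tool_data(code)
#   st.session_state.tool_model.name         # -> "add_numbers"
#   st.session_state.tool_model.description  # -> "Return the sum of two numbers."
#   st.session_state.tool_model.file_name    # -> "add_numbers.py"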
AutoGroq\utils\ui_utils.py
import datetime
import io
import json
import os
import pandas as pd
import re
import streamlit as st
import time
import zipfile

from config import API_URL, DEBUG, LLM_PROVIDER, MAX_RETRIES, MODEL_CHOICES, MODEL_TOKEN_LIMITS, RETRY_DELAY
from current_project import Current_Project
from datetime import date
from models.agent_base_model import AgentBaseModel
from models.tool_base_model import ToolBaseModel
from models.workflow_base_model import WorkflowBaseModel
from prompts import create_project_manager_prompt, get_agents_prompt, get_moderator_prompt, get_rephrased_user_prompt
from tools.fetch_web_content import fetch_web_content
from utils.api_utils import get_llm_provider
from utils.auth_utils import get_api_key
from utils.db_utils import export_to_autogen
from utils.file_utils import zip_files_in_memory
from utils.workflow_utils import get_workflow_from_agents


def create_project_manager(rephrased_text, api_url):
    print(f"Creating Project Manager; API_URL: {api_url}")
    temperature_value = st.session_state.get('temperature', 0.1)
    llm_request_data = {
        "model": st.session_state.model,
        "temperature": temperature_value,
        "max_tokens": st.session_state.max_tokens,
        "top_p": 1,
        "stop": "TERMINATE",
        "messages": [
            {
                "role": "user",
                "content": create_project_manager_prompt(rephrased_text)
            }
        ]
    }
    api_key = get_api_key()
    llm_provider = get_llm_provider(api_key=api_key)
    response = llm_provider.send_request(llm_request_data)
    if response.status_code == 200:
        response_data = llm_provider.process_response(response)
        if "choices" in response_data and response_data["choices"]:
            content = response_data["choices"][0]["message"]["content"]
            return content.strip()
    return None


def display_api_key_input():
    llm = LLM_PROVIDER.upper()
    api_key = st.text_input(f"Enter your {llm}_API_KEY:", type="password", value="", key="api_key_input")
    if api_key:
        st.session_state[f"{LLM_PROVIDER.upper()}_API_KEY"] = api_key
        st.success("API Key entered successfully.")
    return api_key


def display_discussion_and_whiteboard():
    discussion_history = get_discussion_history()
    tabs = st.tabs(["Most Recent Comment", "Whiteboard", "Discussion History", "Deliverables", "Downloads", "Debug"])
    with tabs[0]:
        st.text_area("Most Recent Comment", value=st.session_state.last_comment, height=400, key="discussion")
    with tabs[1]:
        st.text_area("Whiteboard", value=st.session_state.whiteboard, height=400, key="whiteboard")
    with tabs[2]:
        st.write(discussion_history)
    with tabs[3]:
        if "current_project" in st.session_state:
            current_project = st.session_state.current_project
            for index, deliverable in enumerate(current_project.deliverables):
                if deliverable["text"].strip():  # Check if the deliverable text is not empty
                    checkbox_key = f"deliverable_{index}"
                    done = st.checkbox(deliverable["text"], value=deliverable["done"], key=checkbox_key)
                    if done != deliverable["done"]:
                        if done:
                            current_project.mark_deliverable_done(index)
                        else:
                            current_project.mark_deliverable_undone(index)
    with tabs[4]:
        display_download_button()
        if st.button("Export to Autogen"):
            export_to_autogen()
    with tabs[5]:
        if DEBUG:
            if "project_model" in st.session_state:
                project_model = st.session_state.project_model
                with st.expander("Project Details"):
                    st.write("ID:", project_model.id)
                    st.write("Re-engineered Prompt:", project_model.re_engineered_prompt)
                    st.write("Deliverables:", project_model.deliverables)
                    st.write("Created At:", project_model.created_at)
                    st.write("Updated At:", project_model.updated_at)
                    st.write("User ID:", project_model.user_id)
                    st.write("Name:", project_model.name)
                    st.write("Description:", project_model.description)
                    st.write("Status:", project_model.status)
                    st.write("Due Date:", project_model.due_date)
                    st.write("Priority:", project_model.priority)
                    st.write("Tags:", project_model.tags)
                    st.write("Attachments:", project_model.attachments)
                    st.write("Notes:", project_model.notes)
                    st.write("Collaborators:", project_model.collaborators)
                    st.write("Workflows:", project_model.workflows)
                    st.write("Tools:", project_model.tools)

            if "project_model" in st.session_state and st.session_state.project_model.workflows:
                workflow_data = st.session_state.project_model.workflows[0]
                workflow = WorkflowBaseModel.from_dict({**workflow_data, 'settings': workflow_data.get('settings', {})})
                with st.expander("Workflow Details"):
                    st.write("ID:", workflow.id)
                    st.write("Name:", workflow.name)
                    st.write("Description:", workflow.description)
                    # Display the agents in the workflow
                    st.write("Agents:")
                    for agent in workflow.receiver.groupchat_config["agents"]:
                        st.write(f"- {agent['config']['name']}")
                    st.write("Settings:", workflow.settings)
                    st.write("Created At:", workflow.created_at)
                    st.write("Updated At:", workflow.updated_at)
                    st.write("User ID:", workflow.user_id)
                    st.write("Type:", workflow.type)
                    st.write("Summary Method:", workflow.summary_method)
                    st.write("Sender:", workflow.sender)
                    st.write("Receiver:", workflow.receiver)
                    st.write("Groupchat Config:", workflow.groupchat_config)
                    st.write("Timestamp:", workflow.timestamp)
            else:
                st.warning("No workflow data available.")

            if "project_model" in st.session_state and st.session_state.project_model.tools:
                # Show tools
                tool_data = st.session_state.project_model.tools[0]
                tool = ToolBaseModel.from_dict({**tool_data, 'config': tool_data.get('config', {})})
                with st.expander("Tool Details"):
                    st.write("ID:", tool.id)
                    st.write("Name:", tool.name)
                    st.write("Description:", tool.description)
                    st.write("Settings:", tool.settings)
                    st.write("Created At:", tool.created_at)
                    st.write("Updated At:", tool.updated_at)
                    st.write("User ID:", tool.user_id)
                    st.write("Type:", tool.type)
                    st.write("Config:", tool.config)
            else:
                st.warning("No tool data available.")

            if "agents" in st.session_state:
                with st.expander("Agent Details"):
                    agent_names = ["Select one..."] + [agent.get('name', f"Agent {index + 1}") for index, agent in enumerate(st.session_state.agents)]
                    selected_agent = st.selectbox("Select an agent:", agent_names)
                    if selected_agent != "Select one...":
                        agent_index = agent_names.index(selected_agent) - 1
                        agent = st.session_state.agents[agent_index]
                        st.subheader(selected_agent)
                        st.write("ID:", agent.get('id'))
                        st.write("Name:", agent.get('name'))
                        st.write("Description:", agent.get('description'))
                        # Display the selected tools for the agent
                        st.write("Tools:", ", ".join(agent.get('tools', [])))
                        st.write("Config:", agent.get('config'))
                        st.write("Created At:", agent.get('created_at'))
                        st.write("Updated At:", agent.get('updated_at'))
                        st.write("User ID:", agent.get('user_id'))
                        st.write("Workflows:", agent.get('workflows'))
                        st.write("Type:", agent.get('type'))
                        st.write("Models:", agent.get('models'))
                        st.write("Verbose:", agent.get('verbose'))
                        st.write("Allow Delegation:", agent.get('allow_delegation'))
                        st.write("New Description:", agent.get('new_description'))
                        st.write("Timestamp:", agent.get('timestamp'))
            else:
                st.warning("No agent data available.")

            if "tool" in st.session_state:
                with st.expander("Tool Details"):
                    tool_names = ["Select one..."] + [tool.get('name', f"Tool {index + 1}") for index, tool in enumerate(st.session_state.tools)]
                    selected_tool = st.selectbox("Select a tool:", tool_names)
                    if selected_tool != "Select one...":
                        tool_index = tool_names.index(selected_tool) - 1
                        tool = st.session_state.tools[tool_index]
                        if isinstance(tool, dict):
                            st.subheader(selected_tool)
                            st.write("ID:", tool.get('id'))
                            st.write("Name:", tool.get('name'))
                            st.write("Description:", tool.get('description'))
                            st.write("Created At:", tool.get('created_at'))
                            st.write("Updated At:", tool.get('updated_at'))
                            st.write("User ID:", tool.get('user_id'))
                            st.markdown(tool.get('content', ''))  # Display content as markdown
                            st.write("Secrets:", tool.get('secrets'))
                            st.write("Libraries:", tool.get('libraries'))
                            st.write("File Name:", tool.get('file_name'))
                            st.write("Timestamp:", tool.get('timestamp'))
                            st.write("Title:", tool.get('title'))
                        else:
                            st.warning(f"{selected_tool} is not a dictionary.")
            else:
                st.warning("No tool data available.")
        else:
            st.warning("Debugging disabled.")
def display_download_button():
    col1, col2 = st.columns(2)
    with col1:
        st.download_button(
            label="Download Autogen Files",
            data=st.session_state.autogen_zip_buffer,
            file_name="autogen_files.zip",
            mime="application/zip",
            key=f"autogen_download_button_{int(time.time())}"  # Generate a unique key based on timestamp
        )
    with col2:
        st.download_button(
            label="Download CrewAI Files",
            data=st.session_state.crewai_zip_buffer,
            file_name="crewai_files.zip",
            mime="application/zip",
            key=f"crewai_download_button_{int(time.time())}"  # Generate a unique key based on timestamp
        )
def display_download_and_export_buttons():
    display_download_button()
    if st.button("Export to Autogen"):
        export_to_autogen()


def display_goal():
    if "current_project" in st.session_state:
        current_project = st.session_state.current_project
        if current_project.re_engineered_prompt:
            st.expander("Goal").markdown(f"**OUR CURRENT GOAL:**\n\r{current_project.re_engineered_prompt}")


def display_user_input():
    user_input = st.text_area("Additional Input:", value=st.session_state.get("user_input", ""), key="user_input_widget", height=100, on_change=update_user_input)
    reference_url = st.text_input("URL:", key="reference_url_widget")

    if user_input:
        url_pattern = re.compile(r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\\(\\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+')
        url_match = url_pattern.search(user_input)
        if url_match:
            url = url_match.group()
            if "reference_html" not in st.session_state or url not in st.session_state.reference_html:
                html_content = fetch_web_content(url)
                if html_content:
                    st.session_state.reference_html[url] = html_content
                else:
                    st.warning("Failed to fetch HTML content.")
            else:
                st.session_state.reference_html = {}
        else:
            st.session_state.reference_html = {}
    else:
        st.session_state.reference_html = {}

    return user_input, reference_url


def display_reset_and_upload_buttons():
    col1, col2 = st.columns(2)
    with col1:
        if st.button("Reset", key="reset_button"):
            # Define the keys of session state variables to clear
            keys_to_reset = [
                "rephrased_request", "discussion", "whiteboard", "user_request",
                "user_input", "agents", "zip_buffer", "crewai_zip_buffer",
                "autogen_zip_buffer", "uploaded_file_content", "discussion_history",
                "last_comment", "user_api_key", "reference_url"
            ]
            # Reset each specified key
            for key in keys_to_reset:
                if key in st.session_state:
                    del st.session_state[key]
            # Additionally, explicitly reset user_input to an empty string
            st.session_state.user_input = ""
            st.session_state.show_begin_button = True
            st.experimental_rerun()
    with col2:
        uploaded_file = st.file_uploader("Upload a sample .csv of your data (optional)", type="csv")
        if uploaded_file is not None:
            try:
                # Attempt to read the uploaded file as a DataFrame
                df = pd.read_csv(uploaded_file).head(5)
                # Display the DataFrame in the app
                st.write("Data successfully uploaded and read as DataFrame:")
                st.dataframe(df)
                # Store the DataFrame in the session state
                st.session_state.uploaded_data = df
            except Exception as e:
                st.error(f"Error reading the file: {e}")


def display_user_request_input():
    if st.session_state.show_request_input:
        if st.session_state.get("previous_user_request") != st.session_state.get("user_request", ""):
            st.session_state.previous_user_request = st.session_state.get("user_request", "")
            if st.session_state.get("user_request", ""):
                handle_user_request(st.session_state)
            else:
                st.session_state.agents = []
                st.session_state.show_request_input = False
            st.experimental_rerun()
def extract_code_from_response(response):
    code_pattern = r"```(.*?)```"
    code_blocks = re.findall(code_pattern, response, re.DOTALL)

    html_pattern = r"<html.*?>.*?</html>"
    html_blocks = re.findall(html_pattern, response, re.DOTALL | re.IGNORECASE)

    js_pattern = r"<script.*?>.*?</script>"
    js_blocks = re.findall(js_pattern, response, re.DOTALL | re.IGNORECASE)

    css_pattern = r"<style.*?>.*?</style>"
    css_blocks = re.findall(css_pattern, response, re.DOTALL | re.IGNORECASE)

    all_code_blocks = code_blocks + html_blocks + js_blocks + css_blocks
    unique_code_blocks = list(set(all_code_blocks))
    return "\n\n".join(unique_code_blocks)
def extract_json_objects(json_string):
    objects = []
    stack = []
    start_index = 0
    for i, char in enumerate(json_string):
        if char == "{":
            if not stack:
                start_index = i
            stack.append(char)
        elif char == "}":
            if stack:
                stack.pop()
                if not stack:
                    objects.append(json_string[start_index:i + 1])
    parsed_objects = []
    for obj_str in objects:
        try:
            parsed_obj = json.loads(obj_str)
            parsed_objects.append(parsed_obj)
        except json.JSONDecodeError as e:
            print(f"Error parsing JSON object: {e}")
            print(f"JSON string: {obj_str}")
    return parsed_objects


def get_agents_from_text(text, api_url, max_retries=MAX_RETRIES, retry_delay=RETRY_DELAY):
print("Getting agents from text...")
temperature_value=st.session_state.get('temperature', 0.5)
llm_request_data= {
"model": st.session_state.model,
"temperature": temperature_value,
"max_tokens": st.session_state.max_tokens,
"top_p": 1,
"stop": "TERMINATE",
"messages": [
{
"role": "system",
"content": get_agents_prompt()
},
{
"role": "user",
"content": text
}
]
}
api_key=get_api_key()
llm_provider=get_llm_provider(api_key=api_key)
retry_count=0whileretry_count<max_retries:
try:
response=llm_provider.send_request(llm_request_data)
print(f"Response received. Status Code: {response.status_code}")
ifresponse.status_code==200:
print("Request successful. Parsing response...")
response_data=llm_provider.process_response(response)
print(f"Response Data: {json.dumps(response_data, indent=2)}")
if"choices"inresponse_dataandresponse_data["choices"]:
content=response_data["choices"][0]["message"]["content"]
print(f"Content: {content}")
# Preprocess the JSON stringcontent=content.replace("\\n", "\n").replace('\\"', '"')
try:
json_data=json.loads(content)
ifisinstance(json_data, list):
autogen_agents= []
crewai_agents= []
forindex, agent_datainenumerate(json_data, start=1):
expert_name=agent_data.get('expert_name', '')
ifnotexpert_name:
print("Missing agent name. Retrying...")
retry_count+=1time.sleep(retry_delay)
continuedescription=agent_data.get('description', '')
tools=agent_data.get('tools', [])
agent_tools=st.session_state.selected_toolscurrent_timestamp=datetime.datetime.now().isoformat()
                                autogen_agent_data = {
                                    "id": index,
                                    "name": expert_name,
                                    "type": "assistant",
                                    "config": {
                                        "name": expert_name,
                                        "llm_config": {
                                            "config_list": [
                                                {
                                                    "user_id": "default",
                                                    "timestamp": current_timestamp,
                                                    "model": st.session_state.model,
                                                    "base_url": None,
                                                    "api_type": None,
                                                    "api_version": None,
                                                    "description": "OpenAI model configuration"
                                                }
                                            ],
                                            "temperature": temperature_value,
                                            "cache_seed": 42,
                                            "timeout": 600,
                                            "max_tokens": MODEL_TOKEN_LIMITS.get(st.session_state.model, 4096),
                                            "extra_body": None
                                        },
                                        "human_input_mode": "NEVER",
                                        "max_consecutive_auto_reply": 8,
                                        "system_message": f"You are a helpful assistant that can act as {expert_name} who {description}."
                                    },
                                    "description": description,
                                    "tools": agent_tools,
                                    "created_at": current_timestamp,
                                    "updated_at": current_timestamp,
                                    "user_id": "default",
                                    "models": [model for model in MODEL_CHOICES if model != "default"],
                                    "verbose": False,
                                    "allow_delegation": False,
                                    "timestamp": current_timestamp
                                }
                                crewai_agent_data = {
                                    "name": expert_name,
                                    "description": description,
                                    "tools": agent_tools,
                                    "verbose": True,
                                    "allow_delegation": True
                                }
                                autogen_agents.append(autogen_agent_data)
                                crewai_agents.append(crewai_agent_data)
                            print(f"AutoGen Agents: {autogen_agents}")
                            print(f"CrewAI Agents: {crewai_agents}")
                            st.session_state.workflow.agents = [AgentBaseModel.from_dict(agent) for agent in autogen_agents]
                            return autogen_agents, crewai_agents
                        else:
                            print("Invalid JSON format. Expected a list of agents.")
                            return [], []
                    except json.JSONDecodeError as e:
                        print(f"Error parsing JSON: {e}")
                        print(f"Content: {content}")
                        json_data = extract_json_objects(content)
                        if json_data:
                            autogen_agents = []
                            crewai_agents = []
                            for index, agent_data in enumerate(json_data, start=1):
                                expert_name = agent_data.get('expert_name', '')
                                if not expert_name:
                                    print("Missing agent name. Retrying...")
                                    retry_count += 1
                                    time.sleep(retry_delay)
                                    continue
                                description = agent_data.get('description', '')
                                tools = agent_data.get('tools', [])
                                agent_tools = st.session_state.selected_tools
                                current_timestamp = datetime.datetime.now().isoformat()
                                autogen_agent_data = {
                                    "id": index,
                                    "name": expert_name,
                                    "type": "assistant",
                                    "config": {
                                        "name": expert_name,
                                        "llm_config": {
                                            "config_list": [
                                                {
                                                    "user_id": "default",
                                                    "timestamp": current_timestamp,
                                                    "model": st.session_state.model,
                                                    "base_url": None,
                                                    "api_type": None,
                                                    "api_version": None,
                                                    "description": "OpenAI model configuration"
                                                }
                                            ],
                                            "temperature": temperature_value,
                                            "timeout": 600,
                                            "cache_seed": 42
                                        },
                                        "human_input_mode": "NEVER",
                                        "max_consecutive_auto_reply": 8,
                                        "system_message": f"You are a helpful assistant that can act as {expert_name} who {description}."
                                    },
                                    "description": description,
                                    "tools": agent_tools,
                                    "created_at": current_timestamp,
                                    "updated_at": current_timestamp,
                                    "user_id": "default",
                                    "models": [model for model in MODEL_CHOICES if model != "default"],
                                    "verbose": False,
                                    "allow_delegation": False,
                                    "timestamp": current_timestamp
                                }
                                crewai_agent_data = {
                                    "name": expert_name,
                                    "description": description,
                                    "tools": agent_tools,
                                    "verbose": True,
                                    "allow_delegation": True
                                }
                                autogen_agents.append(autogen_agent_data)
                                crewai_agents.append(crewai_agent_data)
                            print(f"AutoGen Agents: {autogen_agents}")
                            print(f"CrewAI Agents: {crewai_agents}")
                            return autogen_agents, crewai_agents
                        else:
                            print("Failed to extract JSON objects from content.")
                            return [], []
                else:
                    print("No agents data found in response")
            else:
                print(f"API request failed with status code {response.status_code}: {response.text}")
        except Exception as e:
            print(f"Error making API request: {e}")
        retry_count += 1
        time.sleep(retry_delay)
    print(f"Maximum retries ({max_retries}) exceeded. Failed to retrieve valid agent names.")
    return [], []
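# Example (sketch): the LLM is expected to return a JSON list shaped like the one
# below, which get_agents_from_text() turns into Autogen and CrewAI agent records:
#
#   [
#       {"expert_name": "Data Analyst", "description": "analyzes the uploaded data...", "tools": []},
#       {"expert_name": "Report Writer", "description": "summarizes findings...", "tools": []}
#   ]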
def get_discussion_history():
    return st.session_state.discussion_history


def handle_user_request(session_state):
    print("Debug: Handling user request for session state: ", session_state)
    user_request = session_state.user_request
    max_retries = MAX_RETRIES
    retry_delay = RETRY_DELAY

    for retry in range(max_retries):
        try:
            print("Debug: Sending request to rephrase_prompt")
            model = session_state.model
            print(f"Debug: Model: {model}")
            rephrased_text = rephrase_prompt(user_request, model)
            print(f"Debug: Rephrased text: {rephrased_text}")
            if rephrased_text:
                session_state.rephrased_request = rephrased_text
                break
            else:
                print("Error: Failed to rephrase the user request.")
                st.warning("Failed to rephrase the user request. Please try again.")
                return
        except Exception as e:
            print(f"Error occurred in handle_user_request: {str(e)}")
            if retry < max_retries - 1:
                print(f"Retrying in {retry_delay} second(s)...")
                time.sleep(retry_delay)
            else:
                print("Max retries exceeded.")
                st.warning("An error occurred. Please try again.")
                return

    if "rephrased_request" not in session_state:
        st.warning("Failed to rephrase the user request. Please try again.")
        return

    session_state.project_model.description = session_state.user_request
    rephrased_text = session_state.rephrased_request
    session_state.project_model.set_re_engineered_prompt(rephrased_text)

    if "project_manager_output" not in session_state:
        project_manager_output = create_project_manager(rephrased_text, API_URL)
        if not project_manager_output:
            print("Error: Failed to create Project Manager.")
            st.warning("Failed to create Project Manager. Please try again.")
            return
        session_state.project_manager_output = project_manager_output

        current_project = Current_Project()
        current_project.set_re_engineered_prompt(rephrased_text)

        deliverables_patterns = [
            r"(?:Deliverables|Key Deliverables):\n(.*?)(?=Timeline|Team of Experts|$)",
            r"\*\*(?:Deliverables|Key Deliverables):\*\*\n(.*?)(?=\*\*Timeline|\*\*Team of Experts|$)"
        ]
        deliverables_text = None
        for pattern in deliverables_patterns:
            match = re.search(pattern, project_manager_output, re.DOTALL)
            if match:
                deliverables_text = match.group(1).strip()
                break

        if deliverables_text:
            deliverables = re.findall(r'\d+\.\s*(.*)', deliverables_text)
            for deliverable in deliverables:
                current_project.add_deliverable(deliverable.strip())
                session_state.project_model.add_deliverable(deliverable.strip())
        else:
            print("Warning: 'Deliverables' or 'Key Deliverables' section not found in Project Manager's output.")

        session_state.current_project = current_project
        update_discussion_and_whiteboard("Project Manager", project_manager_output, "")
    else:
        project_manager_output = session_state.project_manager_output

    team_of_experts_patterns = [
        r"\*\*Team of Experts:\*\*\n(.*)",
        r"Team of Experts:\n(.*)"
    ]
    team_of_experts_text = None
    for pattern in team_of_experts_patterns:
        match = re.search(pattern, project_manager_output, re.DOTALL)
        if match:
            team_of_experts_text = match.group(1).strip()
            break

    if team_of_experts_text:
        autogen_agents, crewai_agents = get_agents_from_text(team_of_experts_text, API_URL)
        print(f"Debug: AutoGen Agents: {autogen_agents}")
        print(f"Debug: CrewAI Agents: {crewai_agents}")
        if not autogen_agents:
            print("Error: No agents created.")
            st.warning("Failed to create agents. Please try again.")
            return
        session_state.agents = autogen_agents
        session_state.workflow.agents = session_state.agents
        print(f"Debug: session_state.workflow.agents: {session_state.workflow.agents}")

        # Generate the workflow data
        workflow_data, _ = get_workflow_from_agents(autogen_agents)
        workflow_data["created_at"] = datetime.datetime.now().isoformat()
        print(f"Debug: Workflow data: {workflow_data}")
        print(f"Debug: CrewAI agents: {crewai_agents}")

        # Update the project session state with the workflow data
        session_state.project_model.workflows = [workflow_data]

        print("Debug: Agents in session state project workflow:")
        for agent in workflow_data["receiver"]["groupchat_config"]["agents"]:
            print(agent)

        autogen_zip_buffer, crewai_zip_buffer = zip_files_in_memory(workflow_data)
        session_state.autogen_zip_buffer = autogen_zip_buffer
        session_state.crewai_zip_buffer = crewai_zip_buffer
    else:
        print("Error: 'Team of Experts' section not found in Project Manager's output.")
        st.warning("Failed to extract the team of experts from the Project Manager's output. Please try again.")
        return


def key_prompt():
    api_key = get_api_key()
    if api_key is None:
        api_key = display_api_key_input()
        if api_key is None:
            llm = LLM_PROVIDER.upper()
            st.warning(f"{llm}_API_KEY not found. Please enter your API key.")
            return


def rephrase_prompt(user_request, model, max_tokens=None, llm_provider=None, provider=None):
    print("Executing rephrase_prompt()")
    refactoring_prompt = get_rephrased_user_prompt(user_request)
    if llm_provider is None:
        # Use the existing functionality for non-CLI calls
        api_key = get_api_key()
        llm_provider = get_llm_provider(api_key=api_key, provider=provider)
    if max_tokens is None:
        max_tokens = MODEL_TOKEN_LIMITS.get(model, 4096)
    llm_request_data = {
        "model": model,
        "temperature": 0.1,
        "max_tokens": max_tokens,
        "top_p": 1,
        "stop": "TERMINATE",
        "messages": [
            {
                "role": "user",
                "content": refactoring_prompt,
            },
        ],
    }
    try:
        print("Sending request to LLM API...")
        print("Request Details:")
        print(f"Provider: {provider}")
        print(f"llm_provider: {llm_provider}")
        print(f" Model: {model}")
        print(f" Max Tokens: {max_tokens}")
        print(f" Messages: {llm_request_data['messages']}")
        response = llm_provider.send_request(llm_request_data)
        print(f"Response received. Status Code: {response.status_code}")
        print(f"Response Content: {response.text}")
        if response.status_code == 200:
            print("Request successful. Parsing response...")
            response_data = llm_provider.process_response(response)
            print(f"Response Data: {json.dumps(response_data, indent=2)}")
            if "choices" in response_data and len(response_data["choices"]) > 0:
                rephrased = response_data["choices"][0]["message"]["content"]
                return rephrased.strip()
            else:
                print("Error: Unexpected response format. 'choices' field missing or empty.")
                return None
        else:
            print(f"Request failed. Status Code: {response.status_code}")
            print(f"Response Content: {response.text}")
            return None
    except Exception as e:
        print(f"An error occurred: {str(e)}")
        return None


def save_tool(tool_name, edited_tool):
    with open(f"{tool_name}.py", "w") as f:
        f.write(edited_tool)
    st.success(f"Tool {tool_name} saved successfully!")
def select_model():
    selected_model = st.selectbox(
        'Select Model',
        options=list(MODEL_TOKEN_LIMITS.keys()),
        index=0,
        key='model_selection'
    )
    st.session_state.model = selected_model
    st.session_state.max_tokens = MODEL_TOKEN_LIMITS[selected_model]


def set_css():
    parent_directory = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
    css_file = os.path.join(parent_directory, "style.css")
    if os.path.exists(css_file):
        with open(css_file) as f:
            st.markdown(f'<style>{f.read()}</style>', unsafe_allow_html=True)
    else:
        st.error(f"CSS file not found: {os.path.abspath(css_file)}")


def set_temperature():
    temperature = st.slider(
        "Set Temperature",
        min_value=0.0,
        max_value=1.0,
        value=st.session_state.get('temperature', 0.3),
        step=0.01,
        key='temperature'
    )
def show_interfaces():
    st.markdown('<div class="discussion-whiteboard">', unsafe_allow_html=True)
    display_discussion_and_whiteboard()
    st.markdown('</div>', unsafe_allow_html=True)
    st.markdown('<div class="user-input">', unsafe_allow_html=True)
    auto_moderate = st.checkbox("Auto-moderate (slow, eats tokens, but very cool)", key="auto_moderate", on_change=trigger_moderator_agent_if_checked)
    if auto_moderate and not st.session_state.get("user_input"):
        moderator_response = trigger_moderator_agent()
        if moderator_response:
            st.session_state.user_input = moderator_response
    user_input, reference_url = display_user_input()
    st.markdown('</div>', unsafe_allow_html=True)
def trigger_moderator_agent():
    goal = st.session_state.current_project.re_engineered_prompt
    last_speaker = st.session_state.last_agent
    last_comment = st.session_state.last_comment
    discussion_history = st.session_state.discussion_history

    team_members = []
    for agent in st.session_state.agents:
        team_members.append(f"{agent['config']['name']}: {agent['description']}")
    team_members_str = "\n".join(team_members)

    moderator_prompt = get_moderator_prompt(discussion_history, goal, last_comment, last_speaker, team_members_str)

    api_key = get_api_key()
    llm_provider = get_llm_provider(api_key=api_key)
    llm_request_data = {
        "model": st.session_state.model,
        "temperature": st.session_state.get('temperature', 0.3),
        "max_tokens": st.session_state.max_tokens,
        "top_p": 1,
        "stop": "TERMINATE",
        "messages": [
            {
                "role": "user",
                "content": moderator_prompt
            }
        ]
    }
    # Wait for RETRY_DELAY seconds before sending the request
    retry_delay = RETRY_DELAY
    time.sleep(retry_delay)
    response = llm_provider.send_request(llm_request_data)
    if response.status_code == 200:
        response_data = llm_provider.process_response(response)
        if "choices" in response_data and response_data["choices"]:
            content = response_data["choices"][0]["message"]["content"]
            return content.strip()
    return None


def trigger_moderator_agent_if_checked():
    if st.session_state.get("auto_moderate", False):
        trigger_moderator_agent()
def update_discussion_and_whiteboard(agent_name, response, user_input):
    if user_input:
        user_input_text = f"\n\n\n\n{user_input}\n\n"
        st.session_state.discussion_history += user_input_text

    if "last_agent" not in st.session_state or st.session_state.last_agent != agent_name:
        response_text = f"{agent_name}:\n\n{response}\n\n===\n\n"
    else:
        response_text = f"{response}\n\n===\n\n"
    st.session_state.discussion_history += response_text

    code_blocks = extract_code_from_response(response)
    st.session_state.whiteboard = code_blocks
    st.session_state.last_agent = agent_name
    st.session_state.last_comment = response_text

    if st.session_state.get("auto_moderate", False):
        moderator_response = trigger_moderator_agent()
        if moderator_response:
            st.session_state.user_input = moderator_response
        else:
            st.session_state.user_input = ""
        # Update the 'Additional Input:' text area with the moderator response or an empty string
        # st.text_area("Additional Input:", value=st.session_state.user_input, key="user_input_widget_auto_moderate", height=100, on_change=update_user_input)


def update_user_input():
    if st.session_state.get("auto_moderate"):
        st.session_state.user_input = st.session_state.user_input_widget_auto_moderate
    else:
        st.session_state.user_input = st.session_state.user_input_widget
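# Example (sketch): extract_json_objects() above is the fallback parser used when
# the LLM wraps its JSON in prose; it brace-matches top-level objects and parses each:
#
#   text = 'Sure! {"expert_name": "Coder"} and also {"expert_name": "Tester"}'
#   extract_json_objects(text)   # -> [{'expert_name': 'Coder'}, {'expert_name': 'Tester'}]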
AutoGroq\utils\workflow_utils.py
import datetime
import streamlit as st

from config import MODEL_TOKEN_LIMITS
from utils.agent_utils import create_agent_data
from utils.file_utils import sanitize_text


def get_workflow_from_agents(agents):
    current_timestamp = datetime.datetime.now().isoformat()
    temperature_value = st.session_state.get('temperature', 0.3)

    workflow = {
        "name": "AutoGroq Workflow",
        "description": "Workflow auto-generated by AutoGroq.",
        "sender": {
            "type": "userproxy",
            "config": {
                "name": "userproxy",
                "llm_config": False,
                "human_input_mode": "NEVER",
                "max_consecutive_auto_reply": 5,
                "system_message": "You are a helpful assistant.",
                "is_termination_msg": None,
                "code_execution_config": {
                    "work_dir": None,
                    "use_docker": False
                },
                "default_auto_reply": "",
                "description": None
            },
            "timestamp": current_timestamp,
            "user_id": "default",
            "tools": []
        },
        "receiver": {
            "type": "groupchat",
            "config": {
                "name": "group_chat_manager",
                "llm_config": {
                    "config_list": [
                        {
                            "user_id": "default",
                            "timestamp": current_timestamp,
                            "model": st.session_state.model,
                            "base_url": None,
                            "api_type": None,
                            "api_version": None,
                            "description": "OpenAI model configuration"
                        }
                    ],
                    "temperature": temperature_value,
                    "cache_seed": 42,
                    "timeout": 600,
                    "max_tokens": MODEL_TOKEN_LIMITS.get(st.session_state.model, 4096),
                    "extra_body": None
                },
                "human_input_mode": "NEVER",
                "max_consecutive_auto_reply": 10,
                "system_message": "Group chat manager",
                "is_termination_msg": None,
                "code_execution_config": None,
                "default_auto_reply": "",
                "description": None
            },
            "groupchat_config": {
                "agents": [],
                "admin_name": "Admin",
                "messages": [],
                "max_round": 10,
                "speaker_selection_method": "auto",
                "allow_repeat_speaker": True
            },
            "timestamp": current_timestamp,
            "user_id": "default",
            "tools": []
        },
        "type": "groupchat",
        "user_id": "default",
        "timestamp": current_timestamp,
        "summary_method": "last"
    }
    for index, agent in enumerate(agents):
        agent_name = agent["config"]["name"]
        description = agent["description"]
        formatted_agent_name = sanitize_text(agent_name).lower().replace(' ', '_')
        sanitized_description = sanitize_text(description)

        system_message = f"You are a helpful assistant that can act as {agent_name} who {sanitized_description}."
        if index == 0:
            other_agent_names = [sanitize_text(a['config']['name']).lower().replace(' ', '_') for a in agents[1:]]
            system_message += f" You are the primary coordinator who will receive suggestions or advice from all the other agents ({', '.join(other_agent_names)}). You must ensure that the final response integrates the suggestions from other agents or team members. YOUR FINAL RESPONSE MUST OFFER THE COMPLETE RESOLUTION TO THE USER'S REQUEST. When the user's request has been satisfied and all perspectives are integrated, you can respond with TERMINATE."

        agent_config = {
            "type": "assistant",
            "config": {
                "name": formatted_agent_name,
                "llm_config": {
                    "config_list": [
                        {
                            "user_id": "default",
                            "timestamp": current_timestamp,
                            "model": st.session_state.model,
                            "base_url": None,
                            "api_type": None,
                            "api_version": None,
                            "description": "OpenAI model configuration"
                        }
                    ],
                    "temperature": temperature_value,
                    "cache_seed": 42,
                    "timeout": 600,
                    "max_tokens": MODEL_TOKEN_LIMITS.get(st.session_state.model, 4096),
                    "extra_body": None
                },
                "human_input_mode": "NEVER",
                "max_consecutive_auto_reply": 8,
                "system_message": system_message,
                "is_termination_msg": None,
                "code_execution_config": None,
                "default_auto_reply": "",
                "description": None
            },
            "timestamp": current_timestamp,
            "user_id": "default",
            "tools": []  # Set tools to null only in the workflow JSON
        }
        workflow["receiver"]["groupchat_config"]["agents"].append(agent_config)

    print("Debug: Workflow agents assigned:")
    for agent in workflow["receiver"]["groupchat_config"]["agents"]:
        print(agent)

    crewai_agents = []
    for agent in agents:
        _, crewai_agent_data = create_agent_data(agent)
        crewai_agents.append(crewai_agent_data)

    return workflow, crewai_agents
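# Example usage (sketch): given agents shaped like those produced by
# get_agents_from_text() in utils/ui_utils.py:
#
#   workflow, crewai_agents = get_workflow_from_agents(st.session_state.agents)
#   # workflow["receiver"]["groupchat_config"]["agents"] holds one config per agent,
#   # with the first agent designated the primary coordinator.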