-
Notifications
You must be signed in to change notification settings - Fork 0
/
appollama.py
138 lines (116 loc) · 5.2 KB
/
appollama.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
import functools, operator, requests, os, json
from bs4 import BeautifulSoup
from duckduckgo_search import DDGS
from langchain.agents import AgentExecutor, create_openai_tools_agent
from langchain_core.messages import BaseMessage, HumanMessage
from langchain.output_parsers.openai_functions import JsonOutputFunctionsParser
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langgraph.graph import StateGraph, END
from langchain.tools import tool
from langchain_openai import ChatOpenAI
from typing import Annotated, Any, Dict, List, Optional, Sequence, TypedDict
from langchain_experimental.llms.ollama_functions import OllamaFunctions
import gradio as gr
# Set environment variables
#os.environ["LANGCHAIN_TRACING_V2"] = "true"
#os.environ["LANGCHAIN_PROJECT"] = "LangGraph Research Agents"
# Initialize model
#llm = ChatOpenAI(model="gpt-4-turbo-preview")
# Local Ollama model wrapped to speak the OpenAI-functions tool-calling
# protocol; assumes the "mistral" model has been pulled in Ollama.
llm = OllamaFunctions(model="mistral")
# 1. Define custom tools
@tool("internet_search", return_direct=False)
def internet_search(query: str) -> str:
    """Search the internet for *query* using DuckDuckGo.

    Returns up to 5 text results joined into a single string, or the
    message "No results found." when the search comes back empty.

    Fix: the original returned the raw list of result dicts on success,
    contradicting the declared ``-> str`` return type; results are now
    serialized to plain text for the LLM.
    """
    with DDGS() as ddgs:
        results = list(ddgs.text(query, max_results=5))
    if not results:
        return "No results found."
    # One result dict per line so the model sees readable plain text.
    return "\n".join(str(result) for result in results)
@tool("process_content", return_direct=False)
def process_content(url: str) -> str:
    """Fetch the page at *url* and return its visible text.

    Raises ``requests.HTTPError`` for non-2xx responses instead of
    silently parsing an error page.
    """
    # Fix: a timeout prevents the agent from hanging forever on a
    # slow or unresponsive host (requests has no default timeout).
    response = requests.get(url, timeout=30)
    response.raise_for_status()
    soup = BeautifulSoup(response.content, 'html.parser')
    return soup.get_text()
# Toolbox shared by every worker agent: web search + page scraping.
tools = [internet_search, process_content]
# 2. Agents
# Helper function for creating agents
def create_agent(llm: ChatOpenAI, tools: list, system_prompt: str):
    """Build an AgentExecutor for an OpenAI-tools agent.

    The prompt is the given system instruction followed by the running
    conversation and the agent's scratchpad of tool calls.
    """
    chat_prompt = ChatPromptTemplate.from_messages(
        [
            ("system", system_prompt),
            MessagesPlaceholder(variable_name="messages"),
            MessagesPlaceholder(variable_name="agent_scratchpad"),
        ]
    )
    return AgentExecutor(
        agent=create_openai_tools_agent(llm, tools, chat_prompt),
        tools=tools,
    )
# Define agent nodes
def agent_node(state, agent, name):
    """Run *agent* on the graph state; wrap its output as a message from *name*."""
    output_text = agent.invoke(state)["output"]
    return {"messages": [HumanMessage(content=output_text, name=name)]}
# Create Agent Supervisor
# Worker node names the supervisor can route work to.
members = ["Web_Searcher", "Insight_Researcher"]
# {members} is filled in later via .partial() on the prompt.
system_prompt = (
    "As a supervisor, your role is to oversee a dialogue between these"
    " workers: {members}. Based on the user's request,"
    " determine which worker should take the next action. Each worker is responsible for"
    " executing a specific task and reporting back their findings and progress. Once all tasks are complete,"
    " indicate with 'FINISH'."
)
# Routing choices: hand off to a worker, or stop the graph.
options = ["FINISH"] + members
# OpenAI-functions schema that forces the model to pick exactly one of
# `options` as the value of "next".
function_def = {
    "name": "route",
    "description": "Select the next role.",
    "parameters": {
        "title": "routeSchema",
        "type": "object",
        "properties": {"next": {"title": "Next", "anyOf": [{"enum": options}] }},
        "required": ["next"],
    },
}
# Supervisor prompt: conversation so far + an explicit routing question.
prompt = ChatPromptTemplate.from_messages([
    ("system", system_prompt),
    MessagesPlaceholder(variable_name="messages"),
    ("system", "Given the conversation above, who should act next? Or should we FINISH? Select one of: {options}"),
]).partial(options=str(options), members=", ".join(members))
# Chain: prompt -> LLM forced to call the "route" function -> parsed
# {"next": <member or FINISH>} dict consumed by the conditional edges below.
supervisor_chain = (prompt | llm.bind(functions=[function_def], function_call={"name": "route"},) | JsonOutputFunctionsParser())
# Worker agents and their corresponding graph-node callables.
search_agent = create_agent(llm, tools, "You are a web searcher. Search the internet for information.")
search_node = functools.partial(agent_node, agent=search_agent, name="Web_Searcher")
insights_research_agent = create_agent(llm, tools,
"""You are a Insight Researcher. Do step by step.
Based on the provided content first identify the list of topics,
then search internet for each topic one by one
and finally find insights for each topic one by one.
Include the insights and sources in the final response
""")
insights_research_node = functools.partial(agent_node, agent=insights_research_agent, name="Insight_Researcher")
# Define the Agent State, Edges and Graph
class AgentState(TypedDict):
    # Conversation history; operator.add tells LangGraph to append each
    # node's new messages rather than overwrite the list.
    messages: Annotated[Sequence[BaseMessage], operator.add]
    # Name of the next node to run, or "FINISH" (set by the supervisor).
    next: str
workflow = StateGraph(AgentState)
workflow.add_node("Web_Searcher", search_node)
workflow.add_node("Insight_Researcher", insights_research_node)
workflow.add_node("supervisor", supervisor_chain)
# Define edges
# Every worker reports back to the supervisor when it finishes.
for member in members:
    workflow.add_edge(member, "supervisor")
# The supervisor's {"next": ...} output selects the next worker,
# or END when it answers "FINISH".
conditional_map = {k: k for k in members}
conditional_map["FINISH"] = END
workflow.add_conditional_edges("supervisor", lambda x: x["next"], conditional_map)
workflow.set_entry_point("supervisor")
graph = workflow.compile()
# Run the graph
# Stream intermediate updates; each `s` maps a node name to the state
# delta that node produced, so progress prints as the graph executes.
for s in graph.stream({
    "messages": [HumanMessage(content="""Search for the latest AI technology trends in 2024,
summarize the content. After summarise pass it on to insight researcher
to provide insights for each topic""")]
}):
    if "__end__" not in s:
        print(s)
        print("----")
# Alternative: run to completion and print only the final answer.
# final_response = graph.invoke({
#     "messages": [HumanMessage(
#         content="""Search for the latest AI technology trends in 2024,
#         summarize the content
#         and provide insights for each topic.""")]
#     })
# print(final_response['messages'][1].content)