Skip to content

Commit

Permalink
AIOS Modularization Pt.1: Functional Components (agiresearch#199)
Browse files Browse the repository at this point in the history
* feat: pip fallback when conda isn't installed

* feat: hooks

* feat: functional components

* bug: extract partial hooks into global sequence

* style: dev tests passing / remove cache files

* style: remove unused import

* style: ruff checks sigh
  • Loading branch information
BRama10 authored Aug 7, 2024
1 parent fd7e2d0 commit 5dd289e
Show file tree
Hide file tree
Showing 18 changed files with 325 additions and 129 deletions.
Empty file added aios/hooks/__init__.py
Empty file.
98 changes: 98 additions & 0 deletions aios/hooks/llm.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
from concurrent.futures import ThreadPoolExecutor, Future, as_completed
from typing import Any

from aios.llm_core.llms import LLM

from aios.scheduler.fifo_scheduler import FIFOScheduler

from aios.hooks.types.llm import AgentSubmitDeclaration, FactoryParams, LLMParams, SchedulerParams, LLMRequestQueue, QueueGetMessage, QueueAddMessage, QueueCheckEmpty
from aios.hooks.validate import validate

from aios.hooks.stores import queue as QueueStore, processes as ProcessStore

from aios.hooks.utils import generate_random_string

from pyopenagi.agents.agent_factory import AgentFactory
from pyopenagi.agents.agent_process import AgentProcessFactory

@validate(LLMParams)
def useKernel(params: LLMParams) -> LLM:
return LLM(**params.model_dump())

def useLLMRequestQueue() -> tuple[LLMRequestQueue, QueueGetMessage, QueueAddMessage, QueueCheckEmpty]:
r_str = generate_random_string()
_ = LLMRequestQueue()

QueueStore.LLM_REQUEST_QUEUE[r_str] = _

def getMessage():
return QueueStore.getMessage(_)

def addMessage(message: str):
return QueueStore.addMessage(_, message)

def isEmpty():
return QueueStore.isEmpty(_)


return _, getMessage, addMessage, isEmpty

@validate(SchedulerParams)
def useFIFOScheduler(params: SchedulerParams):
if params.get_queue_message is None:

from aios.hooks.stores._global import global_llm_req_queue_get_message

params.get_queue_message = global_llm_req_queue_get_message

scheduler = FIFOScheduler(**params.model_dump())

def startScheduler():
scheduler.start()

def stopScheduler():
scheduler.stop()

return startScheduler, stopScheduler


@validate(FactoryParams)
def useFactory(params: FactoryParams):
process_factory = AgentProcessFactory()

agent_factory = AgentFactory(
agent_process_factory=process_factory,
agent_log_mode=params.log_mode,
)

thread_pool = ThreadPoolExecutor(max_workers=params.max_workers)

@validate(AgentSubmitDeclaration)
def submitAgent(declaration_params: AgentSubmitDeclaration) -> None:
_submitted_agent: Future = thread_pool.submit(
agent_factory.run_agent,
declaration_params.agent_name,
declaration_params.task_input
)

ProcessStore.addProcess(_submitted_agent)

def awaitAgentExecution() -> list[dict[str, Any]]:
res = []

for r in as_completed(ProcessStore.AGENT_PROCESSES):
_ = r.result()
res.append(_)

return res


return submitAgent, awaitAgentExecution








Empty file added aios/hooks/stores/__init__.py
Empty file.
5 changes: 5 additions & 0 deletions aios/hooks/stores/_global.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
#global vars

from aios.hooks.llm import useLLMRequestQueue

global_llm_req_queue, global_llm_req_queue_get_message, global_llm_req_queue_add_message, global_llm_req_queue_is_empty = useLLMRequestQueue()
9 changes: 9 additions & 0 deletions aios/hooks/stores/processes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
from concurrent.futures import Future

AGENT_PROCESSES: list[Future] = []

def addProcess(p: Future) -> None:
AGENT_PROCESSES.append(p)

def clearProcesses() -> None:
AGENT_PROCESSES.clear()
15 changes: 15 additions & 0 deletions aios/hooks/stores/queue.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
from aios.hooks.types.llm import LLMRequestQueue

LLM_REQUEST_QUEUE: dict[str, LLMRequestQueue] = {}

def getMessage(q: LLMRequestQueue):
return q.get(block=True, timeout=1)

def addMessage(q: LLMRequestQueue, message: str):
q.put(message)

return None

def isEmpty(q: LLMRequestQueue):
return q.empty()

1 change: 1 addition & 0 deletions aios/hooks/types/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@

Empty file added aios/hooks/types/hooks.py
Empty file.
35 changes: 35 additions & 0 deletions aios/hooks/types/llm.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
from pydantic import BaseModel
from typing import Any, TypeAlias, Callable

from queue import Queue

from pyopenagi.agents.agent_process import AgentProcess

LLMRequestQueue: TypeAlias = Queue[AgentProcess]

QueueGetMessage: TypeAlias = Callable[[], AgentProcess]
QueueAddMessage: TypeAlias = Callable[[str], None]
QueueCheckEmpty: TypeAlias = Callable[[], bool]

class LLMParams(BaseModel):
llm_name: str
max_gpu_memory: dict | None = None,
eval_device: str | None = None,
max_new_tokens: int = 256,
log_mode: str = "console",
use_backend: str | None = None


class SchedulerParams(BaseModel):
llm: Any
log_mode: str
get_queue_message: QueueGetMessage | None


class FactoryParams(BaseModel):
log_mode: str = "console",
max_workers: int = 500

class AgentSubmitDeclaration(BaseModel):
agent_name: str
task_input: str | int | float | dict | tuple | list
5 changes: 5 additions & 0 deletions aios/hooks/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
import random
import string

def generate_random_string(length: int = 6) -> str:
return ''.join(random.choices(string.ascii_letters, k=length))
20 changes: 20 additions & 0 deletions aios/hooks/validate.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
from pydantic import BaseModel, ValidationError
from typing import Callable, Type

def validate(model_class: Type[BaseModel]):
"""
Decorator factory to validate and parse parameters using a specified Pydantic model.
:param model_class: The Pydantic model class to validate against
"""
def decorator(func: Callable):
def wrapper(*args, **kwargs):
try:
params = model_class(**kwargs)

return func(params)
except ValidationError as e:
print(f"Validation error: {e}")
return None
return wrapper
return decorator
15 changes: 8 additions & 7 deletions aios/scheduler/fifo_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,18 @@
# instead of 0.05 seconds.

from .base import BaseScheduler
from aios.hooks.types.llm import QueueGetMessage

from queue import Queue, Empty

import traceback
import time

from pyopenagi.queues.llm_request_queue import LLMRequestQueue

class FIFOScheduler(BaseScheduler):
def __init__(self, llm, log_mode):
def __init__(self, llm, log_mode, get_queue_message: QueueGetMessage):
super().__init__(llm, log_mode)
self.agent_process_queue = Queue()

self.get_queue_message = get_queue_message

def run(self):
while self.active:
Expand All @@ -23,15 +23,16 @@ def run(self):
wait 1 second between each iteration at the minimum
if there is nothing received in a second, it will raise Empty
"""
# agent_process = self.agent_process_queue.get(block=True, timeout=1)
agent_process = LLMRequestQueue.get_message()
# print("Get the request")

agent_process = self.get_queue_message()
agent_process.set_status("executing")
self.logger.log(f"{agent_process.agent_name} is executing. \n", "execute")
agent_process.set_start_time(time.time())
self.execute_request(agent_process)
except Empty:
pass
except Exception:
traceback.print_exc()

def execute_request(self, agent_process):
self.llm.address_request(
Expand Down
61 changes: 27 additions & 34 deletions main.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,13 @@
# This is a main script that tests the functionality of specific agents.
# It requires no user input.


from aios.scheduler.fifo_scheduler import FIFOScheduler


from aios.utils.utils import (
parse_global_args,
)

from pyopenagi.agents.agent_factory import AgentFactory

from pyopenagi.agents.agent_process import AgentProcessFactory

import warnings

from aios.llm_core import llms

from concurrent.futures import ThreadPoolExecutor, as_completed

from aios.hooks.llm import useFactory, useKernel, useFIFOScheduler

from aios.utils.utils import delete_directories
from dotenv import load_dotenv
Expand All @@ -44,42 +33,44 @@ def main():
max_gpu_memory = args.max_gpu_memory
eval_device = args.eval_device
max_new_tokens = args.max_new_tokens
scheduler_log_mode = args.scheduler_log_mode
agent_log_mode = args.agent_log_mode
llm_kernel_log_mode = args.llm_kernel_log_mode
# scheduler_log_mode = args.scheduler_log_mode
# agent_log_mode = args.agent_log_mode
# llm_kernel_log_mode = args.llm_kernel_log_mode
use_backend = args.use_backend
load_dotenv()

llm = llms.LLM(
llm = useKernel(
llm_name=llm_name,
max_gpu_memory=max_gpu_memory,
eval_device=eval_device,
max_new_tokens=max_new_tokens,
log_mode=llm_kernel_log_mode,
log_mode='console',
use_backend=use_backend
)

# run agents concurrently for maximum efficiency using a scheduler

scheduler = FIFOScheduler(llm=llm, log_mode=scheduler_log_mode)

agent_process_factory = AgentProcessFactory()
# scheduler = FIFOScheduler(llm=llm, log_mode=scheduler_log_mode)

agent_factory = AgentFactory(
agent_process_queue=scheduler.agent_process_queue,
agent_process_factory=agent_process_factory,
agent_log_mode=agent_log_mode,
startScheduler, stopScheduler = useFIFOScheduler(
llm=llm,
log_mode='console',
get_queue_message=None
)

agent_thread_pool = ThreadPoolExecutor(max_workers=500)
submitAgent, awaitAgentExecution = useFactory(
log_mode='console',
max_workers=500
)

scheduler.start()
# scheduler.start()
startScheduler()

academic_agent = agent_thread_pool.submit(
agent_factory.run_agent,
"example/academic_agent",
"Find recent papers on the impact of social media on mental health in adolescents.",
submitAgent(
agent_name="example/academic_agent",
task_input="Find recent papers on the impact of social media on mental health in adolescents."
)

# creation_agent = agent_thread_pool.submit(
# agent_factory.run_agent,
# "example/creation_agent", "Create an Instagram post: Image of a person using a new tech gadget, text highlighting its key features and benefits."
Expand Down Expand Up @@ -157,7 +148,7 @@ def main():
# "example/tech_support_agent", "I want to take a trip to Paris, France from July 4th to July 10th, 2024, and I am traveling from New York City. Help me plan this trip."
# )

agent_tasks = [academic_agent]
# agent_tasks = [academic_agent]
# agent_tasks = [cocktail_mixlogist]
# agent_tasks = [cook_therapist]
# agent_tasks = [creation_agent]
Expand All @@ -176,10 +167,12 @@ def main():
# agent_tasks = [story_teller]
# agent_tasks = [tech_support_agent]

for r in as_completed(agent_tasks):
_res = r.result()
awaitAgentExecution()



scheduler.stop()
# scheduler.stop()
stopScheduler()

clean_cache(root_directory="./")

Expand Down
4 changes: 2 additions & 2 deletions pyopenagi/agents/agent_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,15 @@

class AgentFactory:
def __init__(self,
agent_process_queue,
# agent_process_queue,
agent_process_factory,
agent_log_mode
):
self.max_aid = 256
# self.llm = llm
self.aid_pool = [i for i in range(self.max_aid)]
heapq.heapify(self.aid_pool)
self.agent_process_queue = agent_process_queue
# self.agent_process_queue = agent_process_queue
self.agent_process_factory = agent_process_factory

self.current_agents = {}
Expand Down
Loading

0 comments on commit 5dd289e

Please sign in to comment.