Skip to content

Commit

Permalink
MOD: remove ToolServerMonitor
Browse files Browse the repository at this point in the history
  • Loading branch information
luyaxi committed Nov 25, 2023
1 parent 312c0b2 commit 928a696
Show file tree
Hide file tree
Showing 15 changed files with 106 additions and 361 deletions.
4 changes: 3 additions & 1 deletion ToolServer/ToolServerManager/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import yaml
import logging
from typing import Dict, Any, Union
import uuid

class ManagerConfig:
"""
Expand Down Expand Up @@ -63,4 +64,5 @@ def update(self, new_config: Dict) -> None:

CONFIG = ManagerConfig()
logger = logging.getLogger(CONFIG['logger'])
logger.setLevel(CONFIG['logger_level'])
logger.setLevel(CONFIG['logger_level'])
MANAGER_ID = uuid.uuid4().hex
30 changes: 9 additions & 21 deletions ToolServer/ToolServerManager/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,31 +8,19 @@
from motor.core import AgnosticDatabase
from config import CONFIG, logger

DB_TYPE = 'sqlite3'

# Database connection object
# It may be of two types, either a SQLite Connection (sqlite3.Connection) or
# an AsyncIO MongoDB Client (AgnosticDatabase), depending on the environment variables set.
db: Union[sqlite3.Connection, AgnosticDatabase] = None

# Check if DB_USERNAME environment variable is set.
# If it is set, that means we are using MongoDB, else SQLite.
if os.getenv('DB_USERNAME') is not None:
DB_TYPE = 'mongodb'

db_url = f"mongodb://{os.getenv('DB_USERNAME', '')}:{os.getenv('DB_PASSWORD', '')}@{os.getenv('DB_HOST', 'localhost')}:{os.getenv('DB_PORT', '27017')}/"

# Create an instance of AsyncIOMotorClient for asynchronous MongoDB operations.
# It will be used to connect to the MongoDB database.
mongo_client = AsyncIOMotorClient(db_url)

db: AgnosticDatabase = mongo_client[os.getenv('DB_COLLECTION', 'TSM')]
else:
# Create the directory in the path if it does not exist already.
os.makedirs(os.path.split(CONFIG['database']['sqlite_db'])[0], exist_ok=True)

# Connect to SQLite database as specified by config settings.
db = sqlite3.connect(CONFIG['database']['sqlite_db'])

DB_TYPE = 'mongodb'
db_url = f"mongodb://{os.getenv('DB_USERNAME', '')}:{os.getenv('DB_PASSWORD', '')}@{os.getenv('DB_HOST', 'localhost')}:{os.getenv('DB_PORT', '27017')}/"

# Create an instance of AsyncIOMotorClient for asynchronous MongoDB operations.
# It will be used to connect to the MongoDB database.
mongo_client = AsyncIOMotorClient(db_url)

db: AgnosticDatabase = mongo_client[os.getenv('DB_COLLECTION', 'TSM')]

# Log confirmation message
logger.info("Database connected")
Expand Down
129 changes: 46 additions & 83 deletions ToolServer/ToolServerManager/main.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import os
import uvicorn
import httpx
import sqlite3
Expand All @@ -8,8 +9,9 @@

from fastapi import FastAPI, Cookie,Request,HTTPException,Response
from fastapi.responses import JSONResponse,RedirectResponse
from config import CONFIG,logger
from connections import DB_TYPE,db,docker_client
from config import CONFIG,logger,MANAGER_ID
from connections import db,docker_client
from models import ToolServerNode, NodeChecker

app = FastAPI()

Expand All @@ -20,22 +22,26 @@ async def startup():
like checking and creating table nodes if not exists in databse, creating subprocess
to update node status, and registering path to node.
"""
if DB_TYPE == 'sqlite3':
# check if table nodes exists
db_cursor:sqlite3.Cursor = db.cursor()
db_cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='nodes'")
if db_cursor.fetchone() is None:
db_cursor.execute("CREATE TABLE nodes (node_id text, node_short_id text, node_status text, node_ip text, node_last_req_time text,node_health text)")
db.commit()
logger.info("Table nodes created")
app.db_cursor = db_cursor


from beanie import init_beanie
await init_beanie(database=db,
document_models=[ToolServerNode,NodeChecker],)

# create subprocess to update node status
if CONFIG['builtin_monitor']:
from node_checker import check_nodes_status_loop
asyncio.create_task(check_nodes_status_loop())


checker = await NodeChecker.find_one()
if checker is None:
checker = NodeChecker(
manager_id=MANAGER_ID,
interval=float(CONFIG['node'].get('health_check_interval',1))
)
await checker.insert()

loop = asyncio.get_running_loop()
loop.create_task(check_nodes_status_loop())


# register path to node
for path in CONFIG['redirect_to_node_path']['post']:
app.add_api_route(path, route_to_node, methods=["POST"])
Expand All @@ -44,13 +50,14 @@ async def startup():
app.add_api_route(path, route_to_node, methods=["GET"])

@app.on_event("shutdown")
def shutdown():
async def shutdown():
"""
Event handler on shutdown of the app. Specifically closes the database cursor if
the database type os sqlite3.
"""
if DB_TYPE == 'sqlite3':
app.db_cursor.close()
async for checker in NodeChecker.find(NodeChecker.manager_id == MANAGER_ID):
await checker.delete()
db.client.close()

@app.get('/alive')
async def alive():
Expand All @@ -62,35 +69,6 @@ async def alive():
"""
return "alive"

async def get_node_info(node_id:str):
"""
Fetch node information associated with given node_id from the database.
Args:
node_id (str): The unique identifier of the node whose information is to fetched.
Returns:
dict: A dictionary containing all the key-value pairs of node details,
from the database.
"""
# check if cookie is valid
if DB_TYPE == 'sqlite3':
db_cursor:sqlite3.Cursor = app.db_cursor
db_cursor.execute("SELECT * FROM nodes WHERE node_id = ?",(node_id,))
node = db_cursor.fetchone()
if node is not None:
node = {
'node_id':node[0],
'node_short_id':node[1],
'node_status':node[2],
'node_ip':node[3],
'node_last_req_time':node[4],
'node_health':node[5]
}
if DB_TYPE == 'mongodb':
node = await db['nodes'].find_one({'node_id':node_id})
return node

async def wait_for_node_startup(node_id:str):
"""
Wait for the startup of node with id node_id. It probes the node status every seconds until
Expand All @@ -108,16 +86,16 @@ async def wait_for_node_startup(node_id:str):
MAX_PROBE_TIMES = CONFIG['node']['creation_wait_seconds']
probe_times = 0
while probe_times < MAX_PROBE_TIMES:
node = await get_node_info(node_id)
node = await ToolServerNode.find_one(ToolServerNode.id == node_id)

if node is None:
raise HTTPException(status_code=503, detail="Failed to detect node status! Node not found in db!")

if CONFIG['node']['health_check']:
if node['node_health'] == 'healthy':
if node.health == 'healthy':
return True
else:
if node['node_status'] == "running":
if node.status == "running":
return True

probe_times += 1
Expand Down Expand Up @@ -150,28 +128,16 @@ async def read_cookie_info():
response.set_cookie(key="node_id", value=container.id)
container.reload()

if DB_TYPE == 'sqlite3':
db_cursor:sqlite3.Cursor = app.db_cursor
# add node to db
db_cursor.execute("INSERT INTO nodes (node_id,node_short_id,node_status,node_ip,node_last_req_time,node_health) VALUES (?, ?, ?, ?, ?, ?)",
(container.id,
container.short_id,
container.attrs["State"]["Status"],
container.attrs["NetworkSettings"]["Networks"][CONFIG['node']['creation_kwargs']['network']]["IPAddress"],
datetime.datetime.utcnow().isoformat()),
container.attrs['State']['Health']['Status'])
db.commit()

if DB_TYPE == 'mongodb':
logger.debug(container.attrs['State'])
await db['nodes'].insert_one({
'node_id':container.id,
'node_short_id':container.short_id,
'node_status':container.attrs["State"]["Status"],
'node_ip':container.attrs["NetworkSettings"]["Networks"][CONFIG['node']['creation_kwargs']['network']]["IPAddress"],
'node_last_req_time':datetime.datetime.utcnow().isoformat(),
'node_health':container.attrs['State']['Health']['Status']
})
node = ToolServerNode(
id=container.id,
short_id=container.short_id,
status=container.attrs["State"]["Status"],
ip=container.attrs["NetworkSettings"]["Networks"][CONFIG['node']['creation_kwargs']['network']]["IPAddress"],
port=CONFIG['node'].get('port',31942),
last_req_time=datetime.datetime.utcnow(),
health=container.attrs['State']['Health']['Status'] if CONFIG['node']['health_check'] else None
)
await node.insert()

# probe node status every seconds until creation_wait_seconds reached
if await wait_for_node_startup(container.id):
Expand All @@ -194,7 +160,7 @@ async def reconnect_session(node_id:str = Cookie(None)):
Raises:
HTTPException: If node restart timeout occurs.
"""
node = await get_node_info(node_id)
node = await ToolServerNode.find_one(ToolServerNode.id == node_id)
if node is None:
return "invalid node_id: " + str(node_id)
# restart node
Expand All @@ -220,7 +186,7 @@ async def close_session(node_id:str = Cookie(None)):
Returns:
str: Success message if node stops successfully.
"""
node = await get_node_info(node_id)
node = await ToolServerNode.find_one(ToolServerNode.id == node_id)
if node is None:
return "invalid node_id: " + str(node_id)
# stop node
Expand All @@ -242,7 +208,7 @@ async def release_session(node_id:str = Cookie(None)):
Returns:
str: Success message if node is successfully killed and removed.
"""
node = await get_node_info(node_id)
node = await ToolServerNode.find_one(ToolServerNode.id == node_id)
if node is None:
return "invalid node_id: " + str(node_id)

Expand Down Expand Up @@ -271,25 +237,22 @@ async def route_to_node(requset:Request,*,node_id:str = Cookie(None)):
HTTPException: If node_id is not valid or if the node is not running or not responding.
"""
# logger.info("accept node_id:",node_id)
node = await get_node_info(node_id)
node = await ToolServerNode.find_one(ToolServerNode.id == node_id)
if node is None:
raise HTTPException(status_code=403,detail="invalid node_id: " + str(node_id))

if node['node_status'] != "running":
if node.status != "running":
raise HTTPException(status_code=503,detail="node is not running: " + str(node_id))

# update latest_req_time in db
if DB_TYPE == 'sqlite3':
app.db_cursor.execute("UPDATE nodes SET node_last_req_time = ? WHERE node_id = ?",(datetime.datetime.utcnow().isoformat(),node_id))
db.commit()
if DB_TYPE == 'mongodb':
await db['nodes'].update_one({'node_id':node_id},{'$set':{'node_last_req_time':datetime.datetime.utcnow().isoformat()}})
node.last_req_time = datetime.datetime.utcnow()
await node.replace()

#post request to node
method = requset.method
headers = dict(requset.headers)
body = await requset.body()
url = "http://" + node['node_ip']+":31942" + requset.url.path
url = "http://" + node.ip +":"+str(node.port) + requset.url.path
logger.info("Request to node: " + url)

async with httpx.AsyncClient(timeout=None) as client:
Expand Down
19 changes: 19 additions & 0 deletions ToolServer/ToolServerManager/models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
from beanie import Document
from datetime import datetime


class ToolServerNode(Document):
"""
A class that represents a node in the database.
"""
id: str
short_id: str
status: str
health: str
last_req_time: datetime
ip: str
port: int

class NodeChecker(Document):
manager_id: str
interval: float
Loading

0 comments on commit 928a696

Please sign in to comment.