Skip to content

Commit

Permalink
feat: implement distributed processing across 3 machines
Browse files Browse the repository at this point in the history
- Add support for 6 total endpoints (2 ports × 3 machines)
- Implement endpoint health tracking and failover
- Add robust error handling for network failures
- Increase timeout for network calls
- Improve endpoint health testing and reporting

Distribute processing load across three 16-core machines using
both ports 5004 and 5005 on each machine, with automatic
failover and health monitoring for improved reliability.
  • Loading branch information
ChaseKolozsy committed Jan 14, 2025
1 parent 90b39d1 commit b7f7769
Show file tree
Hide file tree
Showing 2 changed files with 117 additions and 41 deletions.
1 change: 1 addition & 0 deletions src/operations/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -97,3 +97,4 @@ ENV/

# VS Code
.vscode/
last_commit.txt
157 changes: 116 additions & 41 deletions src/operations/app_ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,15 @@
import json
from typing import List

# Update BASE_URL to be a list of endpoints
ENDPOINTS = ["http://localhost:5004", "http://localhost:5005"]
# Update ENDPOINTS to include all machines and ports
ENDPOINTS = [
"http://localhost:5004",
"http://localhost:5005",
"http://10.0.0.138:5004",
"http://10.0.0.138:5005",
"http://10.0.0.115:5004",
"http://10.0.0.115:5005"
]

language_abreviations = {
"Hungarian": "hu",
Expand Down Expand Up @@ -77,9 +84,10 @@
class StanzaClient:
def __init__(self):
self.current_language = None
self.client = httpx.AsyncClient()
self.client = httpx.AsyncClient(timeout=30.0) # Increased timeout for network calls
self.current_endpoint = 0
self.batch_size = 1000 # Maximum batch size
self.batch_size = 1000
self.endpoint_health = {endpoint: True for endpoint in ENDPOINTS}

async def __aenter__(self):
return self
Expand All @@ -91,33 +99,66 @@ def select_language(self, language):
self.current_language = language
return True

async def _try_endpoint(self, endpoint, data, is_batch=False):
"""Attempt to use an endpoint and mark it as unhealthy if it fails"""
try:
endpoint_path = "/batch_process" if is_batch else "/process"
response = await self.client.post(f"{endpoint}{endpoint_path}", json=data)
self.endpoint_health[endpoint] = True
return response
except Exception as e:
self.endpoint_health[endpoint] = False
raise e

async def process_text(self, text: str):
if not self.current_language:
raise ValueError("Language not selected")

endpoint = ENDPOINTS[self.current_endpoint] + "/process"
self.current_endpoint = (self.current_endpoint + 1) % len(ENDPOINTS)

data = {
"language": self.current_language,
"text": text
}
response = await self.client.post(endpoint, json=data)
return response
# Find next healthy endpoint
attempts = 0
while attempts < len(ENDPOINTS):
endpoint = ENDPOINTS[self.current_endpoint]
self.current_endpoint = (self.current_endpoint + 1) % len(ENDPOINTS)

if self.endpoint_health[endpoint]:
try:
data = {
"language": self.current_language,
"text": text
}
return await self._try_endpoint(endpoint, data)
except Exception as e:
attempts += 1
continue
else:
attempts += 1

raise Exception("No healthy endpoints available")

async def process_batch(self, texts: List[str]):
if not self.current_language:
raise ValueError("Language not selected")

endpoint = ENDPOINTS[self.current_endpoint] + "/batch_process"
self.current_endpoint = (self.current_endpoint + 1) % len(ENDPOINTS)

data = {
"language": self.current_language,
"texts": texts
}
response = await self.client.post(endpoint, json=data)
return response
# Find next healthy endpoint
attempts = 0
while attempts < len(ENDPOINTS):
endpoint = ENDPOINTS[self.current_endpoint]
self.current_endpoint = (self.current_endpoint + 1) % len(ENDPOINTS)

if self.endpoint_health[endpoint]:
try:
data = {
"language": self.current_language,
"texts": texts
}
return await self._try_endpoint(endpoint, data, is_batch=True)
except Exception as e:
attempts += 1
continue
else:
attempts += 1

raise Exception("No healthy endpoints available")

# Create a singleton instance
_client = StanzaClient()
Expand Down Expand Up @@ -166,7 +207,43 @@ async def test_processing(mode: str = "single"):
"A fodrász hajat vág.",
"A szabó ruhát varr.",
"A cipész cipőt készít.",
"A kovács vasat kalapál."
"A kovács vasat kalapál.",
"A programozó kódot ír.",
"A művész galériában kiállít.",
"A sportoló edzésen vesz részt.",
"A tudós kísérleteket végez.",
"Az építész házat tervez.",
"A könyvtáros könyveket rendez.",
"A pincér ételt szolgál fel.",
"A boltos árut pakol.",
"A bankár pénzt számol.",
"A sofőr buszt vezet.",
"A vadász erdőben jár.",
"A halász hálót vet.",
"A bíró ítéletet hoz.",
"A farmer földet művel.",
"A méhész mézet gyűjt.",
"A hegymászó csúcsra tör.",
"A búvár mélybe merül.",
"A fotós képeket készít.",
"A cukrász tortát díszít.",
"A villanyszerelő vezetéket szerel.",
"A kéményseprő kéményt tisztít.",
"A varrónő gépet használ.",
"A műszerész alkatrészt cserél.",
"A kozmetikus arcot fest.",
"A masszőr izmokat lazít.",
"A díszlettervező színpadot épít.",
"A tolmács nyelveket beszél.",
"A meteorológus időjárást jósol.",
"A régész leleteket keres.",
"A botanikus növényeket vizsgál.",
"A zoológus állatokat figyel.",
"A geológus kőzeteket elemez.",
"A csillagász távcsövet használ.",
"A koreográfus táncot tanít.",
"A karmester zenekart vezényel.",
"A szobrász követ farag."
]

select_language("hu")
Expand Down Expand Up @@ -213,24 +290,22 @@ async def test_processing(mode: str = "single"):
return batch_results, single_results

async def test_endpoints():
"""Test if both endpoints are accessible"""
try:
# Test single process endpoint with minimal valid payload
test_payload = {"language": "hu", "text": "test"}
response = await _client.client.post(ENDPOINTS[0] + "/process", json=test_payload)
if response.status_code != 200:
print(f"Warning: /process endpoint returned status {response.status_code} on {ENDPOINTS[0]}")

# Test batch process endpoint with minimal valid payload
batch_payload = {"language": "hu", "texts": ["test"]}
response = await _client.client.post(ENDPOINTS[1] + "/batch_process", json=batch_payload)
if response.status_code != 200:
print(f"Warning: /batch_process endpoint returned status {response.status_code} on {ENDPOINTS[1]}")

except Exception as e:
print(f"Error testing endpoints: {str(e)}")
return False
return True
"""Test if all endpoints are accessible"""
healthy_endpoints = 0
for endpoint in ENDPOINTS:
try:
test_payload = {"language": "hu", "text": "test"}
response = await _client.client.post(f"{endpoint}/process", json=test_payload)
if response.status_code == 200:
healthy_endpoints += 1
print(f"✅ Endpoint {endpoint} is healthy")
else:
print(f"⚠️ Endpoint {endpoint} returned status {response.status_code}")
except Exception as e:
print(f"❌ Error testing endpoint {endpoint}: {str(e)}")

print(f"\nFound {healthy_endpoints} healthy endpoints out of {len(ENDPOINTS)}")
return healthy_endpoints > 0

async def main():
# First test if endpoints are available
Expand Down

0 comments on commit b7f7769

Please sign in to comment.