Skip to content

Commit eeef272

Browse files
committed
refactor ruff
1 parent 654af12 commit eeef272

File tree

12 files changed

+546
-208
lines changed

12 files changed

+546
-208
lines changed

agent_telephony/twilio_livekit/agent_twilio/src/workflows/logic.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,9 +43,7 @@ async def run(self, workflow_input: LogicWorkflowInput) -> str:
4343
slow_response: LlmLogicResponse = await workflow.step(
4444
function=llm_logic,
4545
function_input=LlmLogicInput(
46-
messages=[
47-
msg.model_dump() for msg in workflow_input.messages
48-
],
46+
messages=[msg.model_dump() for msg in workflow_input.messages],
4947
documentation=documentation,
5048
),
5149
start_to_close_timeout=timedelta(seconds=60),

agent_telephony/twilio_livekit/livekit_pipeline/entrypoint.sh

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
set -e
33

44
echo "Running download-files..."
5-
uv run python src/pipeline.py download-files
5+
uv run python src/worker.py download-files
66

7-
echo "Starting pipeline..."
8-
exec uv run python src/pipeline.py start
7+
echo "Starting livekit worker..."
8+
exec uv run python src/worker.py start

agent_telephony/twilio_livekit/livekit_pipeline/src/client.py

Lines changed: 0 additions & 19 deletions
This file was deleted.
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
"""Env Check.
2+
3+
This module checks that all required environment variables are set.
4+
It is intended to be used during application startup to warn developers about missing configurations.
5+
"""
6+
7+
import os
8+
from dotenv import load_dotenv
9+
from src.utils import logger # Using the shared logger from utils
10+
11+
REQUIRED_ENVS: dict[str, str] = {
12+
"LIVEKIT_URL": "LiveKit server URL",
13+
"LIVEKIT_API_KEY": "API Key for LiveKit",
14+
"LIVEKIT_API_SECRET": "API Secret for LiveKit",
15+
"DEEPGRAM_API_KEY": "API key for Deepgram (used for STT)",
16+
"ELEVEN_API_KEY": "API key for ElevenLabs (used for TTS)",
17+
}
18+
19+
20+
# Load environment variables from the .env file.
21+
load_dotenv(dotenv_path=".env")
22+
23+
def check_env_vars() -> None:
24+
"""Check required environment variables and log warnings if any are missing."""
25+
try:
26+
for key, description in REQUIRED_ENVS.items():
27+
if not os.environ.get(key):
28+
logger.warning(
29+
"Environment variable '%s' (%s) is not set.",
30+
key,
31+
description,
32+
)
33+
logger.info("Environment variable check complete.")
34+
except Exception as e:
35+
logger.exception(
36+
"Error during environment variable check: %s", e
37+
)
38+
raise
Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
"""Metrics.
2+
3+
Provides functions for handling and sending metrics data from Livekit to Restack.
4+
"""
5+
6+
import asyncio
7+
import json
8+
9+
from livekit.agents import metrics
10+
from livekit.agents.pipeline import VoicePipelineAgent
11+
from src.restack.client import client
12+
from src.utils import logger, track_task
13+
14+
15+
async def send_metrics(
16+
pipeline_metrics: metrics.AgentMetrics,
17+
agent_id: str,
18+
run_id: str,
19+
) -> None:
20+
"""Send metrics data to restack asynchronously.
21+
22+
Args:
23+
pipeline_metrics (metrics.AgentMetrics): The metrics data to be sent.
24+
agent_id (str): The identifier for the agent.
25+
run_id (str): The current execution run identifier.
26+
27+
"""
28+
try:
29+
latencies = []
30+
if isinstance(
31+
pipeline_metrics, metrics.PipelineEOUMetrics
32+
):
33+
total_latency = (
34+
pipeline_metrics.end_of_utterance_delay
35+
)
36+
latencies.append(total_latency * 1000)
37+
elif isinstance(
38+
pipeline_metrics, metrics.PipelineLLMMetrics
39+
):
40+
total_latency = pipeline_metrics.ttft
41+
latencies.append(total_latency * 1000)
42+
elif isinstance(
43+
pipeline_metrics, metrics.PipelineTTSMetrics
44+
):
45+
total_latency = pipeline_metrics.ttfb
46+
latencies.append(total_latency * 1000)
47+
if latencies:
48+
metrics_latencies = str(
49+
json.dumps({"latencies": latencies})
50+
)
51+
logger.info(
52+
"Sending pipeline metrics: %s", metrics_latencies
53+
)
54+
await client.send_agent_event(
55+
event_name="pipeline_metrics",
56+
agent_id=agent_id.replace("local-", ""),
57+
run_id=run_id,
58+
event_input={
59+
"metrics": pipeline_metrics,
60+
"latencies": metrics_latencies,
61+
},
62+
)
63+
except (TypeError, ValueError) as exc:
64+
logger.exception("Error processing metrics data: %s", exc)
65+
raise
66+
except Exception as exc:
67+
logger.exception(
68+
"Unexpected error sending pipeline metrics: %s", exc
69+
)
70+
raise
71+
72+
73+
def setup_pipeline_metrics(
74+
pipeline: VoicePipelineAgent,
75+
agent_id: str,
76+
run_id: str,
77+
usage_collector: metrics.UsageCollector,
78+
) -> None:
79+
"""Configure the pipeline to send metrics when they are collected.
80+
81+
Attaches a callback to the pipeline that logs metrics data and sends it to restack.
82+
83+
Args:
84+
pipeline (VoicePipelineAgent): The pipeline instance.
85+
agent_id (str): The identifier for the agent.
86+
run_id (str): The current run identifier.
87+
usage_collector (metrics.UsageCollector): Collector for aggregating usage metrics.
88+
89+
"""
90+
91+
@pipeline.on("metrics_collected")
92+
def on_metrics_collected(
93+
pipeline_metrics: metrics.AgentMetrics,
94+
) -> None:
95+
try:
96+
metrics.log_metrics(pipeline_metrics)
97+
track_task(
98+
asyncio.create_task(
99+
send_metrics(
100+
pipeline_metrics, agent_id, run_id
101+
)
102+
)
103+
)
104+
usage_collector.collect(pipeline_metrics)
105+
except (TypeError, ValueError) as exc:
106+
logger.exception(
107+
"Error processing collected metrics: %s", exc
108+
)
109+
raise
110+
except Exception as exc:
111+
logger.exception(
112+
"Unexpected error handling collected metrics: %s",
113+
exc,
114+
)
115+
raise

0 commit comments

Comments
 (0)