Skip to content

Commit 456d284

Browse files
authored
Fixed Race Condition in RunResultStreaming.stream_events() Method (#1745)
1 parent 31ed091 commit 456d284

File tree

1 file changed

+30
-23
lines changed

1 file changed

+30
-23
lines changed

src/agents/result.py

Lines changed: 30 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -185,35 +185,42 @@ async def stream_events(self) -> AsyncIterator[StreamEvent]:
185185
- A MaxTurnsExceeded exception if the agent exceeds the max_turns limit.
186186
- A GuardrailTripwireTriggered exception if a guardrail is tripped.
187187
"""
188-
while True:
189-
self._check_errors()
190-
if self._stored_exception:
191-
logger.debug("Breaking due to stored exception")
192-
self.is_complete = True
193-
break
188+
try:
189+
while True:
190+
self._check_errors()
191+
if self._stored_exception:
192+
logger.debug("Breaking due to stored exception")
193+
self.is_complete = True
194+
break
194195

195-
if self.is_complete and self._event_queue.empty():
196-
break
196+
if self.is_complete and self._event_queue.empty():
197+
break
197198

198-
try:
199-
item = await self._event_queue.get()
200-
except asyncio.CancelledError:
201-
break
199+
try:
200+
item = await self._event_queue.get()
201+
except asyncio.CancelledError:
202+
break
202203

203-
if isinstance(item, QueueCompleteSentinel):
204-
# Await input guardrails if they are still running, so late exceptions are captured.
205-
await self._await_task_safely(self._input_guardrails_task)
204+
if isinstance(item, QueueCompleteSentinel):
205+
# Await input guardrails if they are still running, so late
206+
# exceptions are captured.
207+
await self._await_task_safely(self._input_guardrails_task)
206208

207-
self._event_queue.task_done()
209+
self._event_queue.task_done()
208210

209-
# Check for errors, in case the queue was completed due to an exception
210-
self._check_errors()
211-
break
212-
213-
yield item
214-
self._event_queue.task_done()
211+
# Check for errors, in case the queue was completed
212+
# due to an exception
213+
self._check_errors()
214+
break
215215

216-
self._cleanup_tasks()
216+
yield item
217+
self._event_queue.task_done()
218+
finally:
219+
# Ensure main execution completes before cleanup to avoid race conditions
220+
# with session operations
221+
await self._await_task_safely(self._run_impl_task)
222+
# Safely terminate all background tasks after main execution has finished
223+
self._cleanup_tasks()
217224

218225
if self._stored_exception:
219226
raise self._stored_exception

0 commit comments

Comments
 (0)