Skip to content

Conversation

lionelchg
Copy link

Summary

Motivation

  • Some guardrails must run before any downstream LLM/tool activity (e.g., safety tripwires), while others can run
    concurrently (e.g., telemetry or advisory checks). Previously, all input guardrails ran in parallel with the first
    model call, which meant tripwires could trigger after downstream actions had already started.
  • In streaming mode, if a blocking guardrail triggers before the model produces any events, the consumer could hang
    awaiting events that never arrive.

Key Changes

  • Input Guardrails API:
    • src/agents/guardrail.py
    • Add block_downstream_calls: bool = True to InputGuardrail.
    • Extend @input_guardrail decorator with block_downstream_calls (default True for safety/back-compat).
  • Runner (non-streamed):
    • src/agents/run.py
    • On first turn, separate input guardrails into blocking vs non-blocking.
    • If any blocking guardrails exist:
      • Await them to complete before the first model call and tool execution.
      • Run non-blocking guardrails concurrently with the model call/tools.
    • If none blocking:
      • Run all guardrails concurrently with the model call/tools as before.
    • Helpers:
      • _separate_blocking_guardrails(...): returns (blocking, non_blocking).
      • _get_model_response_only(...): obtains the model response without executing tools.
      • _execute_tools_from_model_response(...): runs tool execution/side effects from a model response.
  • Runner (streamed):
    • src/agents/run.py
    • On first turn, run blocking guardrails to completion first and push their results to the guardrail queue. If a
      tripwire triggers, the run task exits and the stream consumer detects the error.
    • Start non-blocking guardrails in the background queue (or all guardrails if none are blocking).
    • Append guardrail results to RunResultStreaming.input_guardrail_results to avoid overwriting.
  • Streaming Hang Fix (important):
    • src/agents/result.py
    • In RunResultStreaming.stream_events(), wrap self._event_queue.get() in a short timeout:
      • item = await asyncio.wait_for(self._event_queue.get(), timeout=0.1)
      • On TimeoutError, loop continues and re-checks for stored exceptions and guardrail tripwires.
    • Why: If a blocking input guardrail triggers before any model events are streamed, the event queue may stay empty
      forever. The timeout wakes the loop to:
      • Drain _input_guardrail_queue and raise InputGuardrailTripwireTriggered.
      • Detect exceptions stored from the background run task (e.g., tripwire raised) and surface them.
    • Effect: Prevents indefinite hangs when there are no model events. Normal streaming behavior is unchanged; the
      consumer still yields events as they arrive.
  • Tests:
    • Add tests covering:
    • Blocking guardrail prevents the first model call.
    • Blocking guardrail delays tool execution until completion.
    • Non-blocking guardrail allows parallel tool execution with tools.
    • Mixed blocking and non-blocking ordering.
    • Streamed: blocking guardrail prevents starting model streaming.
  • Small adjustments in existing tests to attach a model when asserting model behavior.

Behavior

  • Default behavior is unchanged for existing users: guardrails default to block_downstream_calls=True, preserving the
    safest behavior.
  • Opt-in parallelism: set block_downstream_calls=False to allow a guardrail to run concurrently with the first model
    call/tools.
  • Streaming never hangs waiting for model events that never come due to early guardrail tripwires.
    • When idle (no events), the loop wakes 10 times/second to re-check errors, which is lightweight and ensures
      responsiveness.

Tests

  • Unit tests added/updated; run with make tests.
  • Specific reproduction fixed: tests/
    test_agent_runner_streamed.py::test_input_guardrail_tripwire_triggered_causes_exception_streamed previously hung; with
    the timeout it surfaces the tripwire and exits as expected.

Files Touched

  • src/agents/guardrail.py: New flag + decorator param.
  • src/agents/run.py: Guardrail separation, gating logic, streamed behavior, helper methods, accumulate results.
  • src/agents/result.py: Short timeout in stream_events() to avoid hangs.
  • tests/*: New and adjusted tests.

…model events are produced

RunResultStreaming.stream_events() previously awaited indefinitely on . If a blocking input guardrail trips before any model output, no events are ever enqueued, causing the consumer to hang. Adding a small timeout lets the loop periodically re-check the guardrail queue and background task state, surfacing tripwires and exceptions promptly without waiting for model events.
@lionelchg
Copy link
Author

The following python script shows the expected behaviour with blocking and non-blocking guardrails:

import asyncio
from agents import Agent, InputGuardrail, GuardrailFunctionOutput, Runner, ItemHelpers
from agents.exceptions import InputGuardrailTripwireTriggered
from pydantic import BaseModel

class HomeworkOutput(BaseModel):
    is_homework: bool
    reasoning: str

guardrail_agent = Agent(
    name="Guardrail check",
    instructions="Check if the user is asking about homework.",
    output_type=HomeworkOutput,
)

math_tutor_agent = Agent(
    name="Math Tutor",
    handoff_description="Specialist agent for math questions",
    instructions="You provide help with math problems. "
    "Explain your reasoning at each step and include examples",
)

async def homework_guardrail(ctx, agent, input_data):
    result = await Runner.run(guardrail_agent, input_data, context=ctx.context)
    final_output = result.final_output_as(HomeworkOutput)
    return GuardrailFunctionOutput(
        output_info=final_output,
        tripwire_triggered=not final_output.is_homework,
    )

triage_agent_blocking = Agent(
    name="Triage Agent",
    instructions="You determine which agent to use based on the user's homework question",
    handoffs=[math_tutor_agent],
    input_guardrails=[
        InputGuardrail(guardrail_function=homework_guardrail, block_downstream_calls=True),
    ],
)

triage_agent_non_blocking = Agent(
    name="Triage Agent",
    instructions="You determine which agent to use based on the user's homework question",
    handoffs=[math_tutor_agent],
    input_guardrails=[
        InputGuardrail(guardrail_function=homework_guardrail, block_downstream_calls=False),
    ],
)

async def main():
    # Example: General/philosophical question
    try:
        result = await Runner.run(triage_agent_blocking, "What is the meaning of life?")
        print(result.final_output)
    except InputGuardrailTripwireTriggered as e:
        print("Guardrail blocked this input:", e)

    # Example: Non-blocking guardrail
    try:
        result = await Runner.run(triage_agent_non_blocking, "What is the meaning of life?")
        print(result.final_output)
    except InputGuardrailTripwireTriggered as e:
        print("Guardrail blocked this input:", e)

if __name__ == "__main__":
    asyncio.run(main())

Triggering the following traces in OpenAI platform:
Screenshot 2025-08-30 at 23 03 52
which shows the first call being blocked and the second non-blocking where the subsequent call has been run

@lionelchg
Copy link
Author

Happy to get your feedback @seratch and @rm-openai here as I started from your original PR :). Hope that helps!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

FileSearchTool runs despite InputGuardrailTripwireTriggered
1 participant