
fix(weave): Fixes Async integration timing #3653

Open · wants to merge 9 commits into master
Conversation

tssweeney (Collaborator) commented Feb 11, 2025

Bug Observation: Many integrations (notably OpenAI) would report very long runtimes for async methods, specifically inside an eval loop. Initial triage showed that the start event was being emitted long before the actual function started executing.

Bug Root Cause: The root cause is the widespread use of the following pattern inside the integrations: an async function is wrapped with an outer async function, and the op is bound to the outer function:

async def _async_wrapper(*args: Any, **kwargs: Any) -> Any:
    # Possible pre-processing
    res = await fn(*args, **kwargs)  # fn is the underlying client method
    # Possible post-processing
    return res

wrapped_op = weave.op(_async_wrapper)

Explanation: This is particularly bad because the weave tracing engine thinks that the user-defined method is _async_wrapper, while the function of interest is actually fn. We therefore start our wall clock at the entry of _async_wrapper, which immediately suspends back to the event loop. This means the reported time includes all of the event loop's scheduling delay!
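As a minimal, self-contained illustration of the effect (not code from this PR): if the start timestamp is captured when the call is made, but the coroutine only gets to run once the event loop is free, the reported runtime absorbs all of the queueing delay:

import asyncio
import time

async def fn(started_at: float) -> None:
    lag = time.perf_counter() - started_at
    await asyncio.sleep(0.01)  # ~10 ms of actual work
    total = time.perf_counter() - started_at
    print(f"scheduling lag: {lag:.3f}s, reported runtime: {total:.3f}s")

async def eval_loop() -> None:
    tasks = []
    for _ in range(3):
        # The "start event" fires here, but the coroutine has not run yet.
        tasks.append(asyncio.ensure_future(fn(time.perf_counter())))
        time.sleep(0.1)  # stand-in for blocking work elsewhere in the eval
    await asyncio.gather(*tasks)

asyncio.run(eval_loop())

Each call does ~10 ms of real work, but the reported runtime is dominated by the scheduling lag.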

Fix: This is an anti-pattern in our integrations; the pre- and post-processing logic should move to different mechanisms. There are a few classes of changes:

  1. Cases where no additional logic was even needed and this pattern spread via copy-pasta:
    • weave/integrations/anthropic/anthropic_sdk.py
    • weave/integrations/cerebras/cerebras_sdk.py
    • weave/integrations/google_ai_studio/google_ai_studio_sdk.py
    • weave/integrations/instructor/instructor_iterable_utils.py
    • weave/integrations/vertexai/vertexai_sdk.py
  2. Cases where post-processing was used to change the output sent to weave. In these cases we can simply use the new postprocess_output option (see the sketch after this list):
    • weave/integrations/cohere/cohere_sdk.py
  3. Harder cases (OpenAI). Here we do some really weird logic where we inject a param to get the costs out. This required two changes: 1) do the param injection outside the op; 2) use a context var for the skipping logic instead of inspecting inputs. Note: this is definitely not pretty... but it works
    • weave/integrations/openai/openai_sdk.py
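For classes 1 and 2, the shape of the fix is roughly the following (a sketch, not the exact diff; fn stands for the underlying client method, op_kwargs for the op settings, and _strip_raw_fields is a hypothetical post-processor):

from typing import Any

import weave

# Class 1: bind the op directly to fn, so the wall clock starts when fn
# itself starts executing rather than at an outer wrapper's entry.
wrapped_op = weave.op(fn, **op_kwargs)

# Class 2: hand the output transformation to weave instead of awaiting fn
# inside an outer async wrapper.
def _strip_raw_fields(output: Any) -> Any:  # hypothetical post-processor
    return output

wrapped_op = weave.op(fn, postprocess_output=_strip_raw_fields)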

Finally, weave/trace/op_extensions/accumulator.py needed to be enhanced so that on_output can now handle Coroutines (:
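The idea is roughly the following (a sketch with illustrative names, not the actual accumulator code):

import inspect
from typing import Any, Callable

def make_on_output(accumulate: Callable[[Any], Any]) -> Callable[[Any], Any]:
    def on_output(value: Any) -> Any:
        # If the wrapped function handed back a coroutine, defer
        # accumulation until it resolves instead of treating the
        # coroutine object itself as the output.
        if inspect.iscoroutine(value):
            async def _resolved() -> Any:
                return accumulate(await value)
            return _resolved()
        return accumulate(value)
    return on_output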

Testing: The existing integration tests served as a pretty good suite.

@tssweeney tssweeney requested a review from a team as a code owner February 11, 2025 02:59
op_kwargs = settings.model_dump()
op = weave.op(_post_process_response(fn), **op_kwargs)
user_provided_postprocess_output = op_kwargs.pop("postprocess_output", None)
Member: how were we using this before?

@@ -311,32 +312,41 @@ def openai_on_input_handler(
def create_wrapper_sync(settings: OpSettings) -> Callable[[Callable], Callable]:
def wrapper(fn: Callable) -> Callable:
"We need to do this so we can check if `stream` is used"
should_skip_last = contextvars.ContextVar("should_skip_last", default=False)
Member: the comment is now confusing; also, single quotes are weird while we're here.

@gtarpenning gtarpenning (Member) left a comment:

I think @andrewtruong should look at the accumulator bit

@@ -311,32 +312,41 @@ def openai_on_input_handler(
def create_wrapper_sync(settings: OpSettings) -> Callable[[Callable], Callable]:
def wrapper(fn: Callable) -> Callable:
"We need to do this so we can check if `stream` is used"
should_skip_last = contextvars.ContextVar("should_skip_last", default=False)
Collaborator: What does skip_last mean? Maybe a comment here would be helpful.

@@ -347,33 +357,45 @@ def _openai_stream_options_is_set(inputs: dict) -> bool:
def create_wrapper_async(settings: OpSettings) -> Callable[[Callable], Callable]:
def wrapper(fn: Callable) -> Callable:
"We need to do this so we can check if `stream` is used"
should_skip_last = contextvars.ContextVar(
"should_skip_last_async", default=False
Collaborator: Same here; I'm not sure what skip_last means. Implicitly it has something to do with streaming?

Member: This is because OpenAI appends token usage information to the end of the stream. We need that info, so we always try to grab it. If the user asked for it, we don't skip it; but if the user hasn't requested it, we skip that final chunk so the output looks as expected.
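To illustrate the two pieces (a hypothetical sketch, not the PR's exact code): the wrapper injects OpenAI's stream_options={"include_usage": True} when the caller didn't request it, marks the call via the context var, and the stream handler later hides the injected final chunk:

import contextvars
from typing import Any, Callable, Iterator

should_skip_last = contextvars.ContextVar("should_skip_last", default=False)

def call_with_usage(create: Callable[..., Any], **kwargs: Any) -> Any:
    # Inject usage reporting if the caller streams without requesting it,
    # and remember that the final (usage-only) chunk must be hidden.
    if kwargs.get("stream") and "stream_options" not in kwargs:
        kwargs["stream_options"] = {"include_usage": True}
        should_skip_last.set(True)
    return create(**kwargs)

def visible_chunks(chunks: Iterator[Any]) -> Iterator[Any]:
    # Yield everything except the last chunk when we injected it ourselves.
    prev = None
    for chunk in chunks:
        if prev is not None:
            yield prev
        prev = chunk
    if prev is not None and not should_skip_last.get():
        yield prev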
