-
Notifications
You must be signed in to change notification settings - Fork 45
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix: "write /dev/stdout: broken pipe" error during execution #613
base: main
Are you sure you want to change the base?
Conversation
📝 WalkthroughWalkthroughThe changes refactor the cleanup logic within the Changes
Sequence Diagram(s)sequenceDiagram
participant Caller
participant StreamManager as _stream_from_subprocess
participant Subprocess
Caller ->> StreamManager: Initiate subprocess streaming
StreamManager ->> Subprocess: Start subprocess and stream output
alt Process termination (try block)
StreamManager ->> Subprocess: Attempt termination & exit code check
end
StreamManager ->> Subprocess: Finally close stdout
StreamManager -->> Caller: Return stream data or error signal
Would you like to adjust or add more details to the sequence diagram, wdyt? ✨ Finishing Touches
Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
CodeRabbit Configuration File (
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Nitpick comments (2)
airbyte/_executors/base.py (2)
120-148
: Great improvements to the process cleanup! 🎉The new implementation is much more robust, with graceful shutdown, proper exit code handling, and guaranteed resource cleanup. This should effectively address the "broken pipe" errors.
A small suggestion: would you consider making the termination timeout configurable, perhaps as a parameter with a default value of 10 seconds? wdyt?
@contextmanager def _stream_from_subprocess( args: list[str], *, stdin: IO[str] | AirbyteMessageIterator | None = None, log_file: IO[str] | None = None, + termination_timeout: float = 10.0, ) -> Generator[Iterable[str], None, None]: # ... existing code ... try: # Wait for a short period to allow process to terminate gracefully - process.wait(timeout=10) + process.wait(timeout=termination_timeout)
132-144
: Consider enhancing the error message for better debugging.The error message could be more descriptive about what happened during termination. Maybe we could include whether it was a graceful shutdown or force kill? wdyt?
# If the exit code is not 0 or -15 (SIGTERM), raise an exception if exit_code not in {0, -15}: raise exc.AirbyteSubprocessFailedError( run_args=args, exit_code=exit_code, + message=f"Process failed with exit code {exit_code}. Process was {'force killed' if process._was_killed else 'terminated gracefully'}.", original_exception=( exception_holder.exception if not isinstance(exception_holder.exception, BrokenPipeError) else None ), )
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
airbyte/_executors/base.py
(1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (2)
- GitHub Check: Pytest (No Creds)
- GitHub Check: Pytest (Fast)
I have encountered this error many times during my tests.
Summary by CodeRabbit