[FLINK-36585][connector/common] SplitFetcherManager.close() should not chain runnables to the element queue future in a tight loop #26205

Merged
merged 1 commit into from
Feb 28, 2025

becketqin
Contributor

What is the purpose of the change

This is a re-commit of the patch ( #25569 ) after fixing the accidentally changed timeout unit of the SplitFetcherManager close timeout.

Brief change log

Changes in the Original Fix:

This patch fixes an issue introduced in #25130. In SplitFetcherManager.close(), the element queue draining thread was chaining runnables to the element queue availability future in a tight loop. This causes problems (e.g. OOM, high CPU utilization) when the fetcher threads do not shut down quickly.

This patch changes the tight async loop to a blocking loop.
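
For illustration only, the blocking style can be sketched roughly as below. This is not the actual SplitFetcherManager code: the class `DrainSketch`, the method `drainBlocking`, the `fetchersDone` supplier, the 10 ms poll interval, and the use of a plain `BlockingQueue` in place of the connector's element queue are all assumptions made for the example. The point is that each iteration blocks for a bounded time, so nothing accumulates on a future while the fetcher threads are still shutting down.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

// Hypothetical illustration; not the Flink SplitFetcherManager implementation.
final class DrainSketch {

    /**
     * Drains the queue with a blocking poll until a poll comes back empty while the
     * fetchers report that they are done, or until timeoutMs has elapsed.
     * Returns the number of drained elements.
     */
    static <T> int drainBlocking(
            BlockingQueue<T> elementQueue, BooleanSupplier fetchersDone, long timeoutMs) {
        long startTime = System.currentTimeMillis();
        long timeElapsed = 0;
        int drained = 0;
        while (timeElapsed < timeoutMs) {
            try {
                // Block for a bounded time instead of re-registering a callback on a future.
                long waitMs = Math.min(10L, timeoutMs - timeElapsed);
                T element = elementQueue.poll(waitMs, TimeUnit.MILLISECONDS);
                if (element != null) {
                    drained++;
                } else if (fetchersDone.getAsBoolean()) {
                    // The poll found nothing and the fetchers have finished.
                    break;
                }
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop draining.
                Thread.currentThread().interrupt();
                break;
            }
            timeElapsed = System.currentTimeMillis() - startTime;
        }
        return drained;
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.add("record-1");
        queue.add("record-2");
        int drained = drainBlocking(queue, () -> true, 1_000L);
        System.out.println("drained " + drained + " elements");
    }
}
```

With this shape, the loop does a bounded amount of work per iteration regardless of how long the fetchers take to exit, and the timeout check bounds the total time spent draining.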

Changes in the Follow-up Patch:

The follow-up patch fixes the accidentally changed timeout unit of the SplitFetcherManager close timeout.

Verifying this change

Unit tests have been added.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (No)
  • If yes, how is the feature documented? (No)

…t chain runnables to the element queue future in a tight loop.

This is a re-commit of the patch after fixing the accidentally changed timeout unit of the SplitFetcherManager close timeout.
@flinkbot
Collaborator

flinkbot commented Feb 24, 2025

@becketqin
Contributor Author

@PatrickRen Can you take another look? cc @snuyanzin

e);
break;
}
timeElapsed = System.currentTimeMillis() - startTime;
Contributor

Could we log a message if we time out with elements still on the queue, or would the await-termination LOG.warn() catch this?

Contributor Author

Exactly, the await-termination LOG.warn() would catch it.

Contributor

@PatrickRen PatrickRen left a comment

@becketqin Thanks for updating the patch! LGTM

@becketqin becketqin merged commit a71b1e7 into apache:master Feb 28, 2025
becketqin added a commit that referenced this pull request Feb 28, 2025
…timeout unit of the SplitFetcherManager close timeout. And improve the test stability. (#26205)

4 participants