-
Notifications
You must be signed in to change notification settings - Fork 24.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[ML] Fix loss of context in the inference API for streaming APIs #118999
[ML] Fix loss of context in the inference API for streaming APIs #118999
Conversation
Hi @jonathan-buttner, I've created a changelog YAML for you. |
Hi @jonathan-buttner, I've updated the changelog YAML for you. |
var responseConsumer = new AsyncInferenceResponseConsumer(); | ||
request.setOptions(RequestOptions.DEFAULT.toBuilder().setHttpAsyncResponseConsumerFactory(() -> responseConsumer).build()); | ||
var latch = new CountDownLatch(1); | ||
client().performRequestAsync(request, new ResponseListener() { | ||
@Override | ||
public void onSuccess(Response response) { | ||
if (responseConsumerCallback != null) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Adding a way to get the response so we can check the headers
@@ -469,7 +477,7 @@ public void testSupportedStream() throws Exception { | |||
|
|||
var input = IntStream.range(1, 2 + randomInt(8)).mapToObj(i -> randomAlphanumericOfLength(5)).toList(); | |||
try { | |||
var events = streamInferOnMockService(modelId, TaskType.COMPLETION, input); | |||
var events = streamInferOnMockService(modelId, TaskType.COMPLETION, input, VALIDATE_ELASTIC_PRODUCT_HEADER_CONSUMER); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unfortunately, even if I remove my change this test still passes. I opted for keeping these tests just in case something really goes wrong in the future but they wouldn't have caught the original issue 😞
@@ -115,6 +121,12 @@ private void initializeStream(InferenceAction.Response response) { | |||
) | |||
); | |||
}); | |||
|
|||
nextBodyPartListener = ContextPreservingActionListener.wrapPreservingContext( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When I manually test this, it fixes the issue. I always get the header back as expected. Without the fix the _stream
api usually does not return the header. I believe this is because we are create a new listener that maybe gets executed with a different thread context. I think if we responded in the onResponse
implementation of this class it'd probably avoid the issue. But that's not how this was designed so I'm keeping the structure as it was.
…ming-lost-context
Pinging @elastic/ml-core (Team:ML) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. Thanks for finding this fix for this!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
…stic#118999) * Adding context preserving fix * Update docs/changelog/118999.yaml * Update docs/changelog/118999.yaml * Using a setonce and adding a test * Updating the changelog (cherry picked from commit 7ba3cb9) # Conflicts: # x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/InferencePlugin.java
…stic#118999) * Adding context preserving fix * Update docs/changelog/118999.yaml * Update docs/changelog/118999.yaml * Using a setonce and adding a test * Updating the changelog (cherry picked from commit 7ba3cb9) # Conflicts: # x-pack/plugin/inference/qa/inference-service-tests/src/javaRestTest/java/org/elasticsearch/xpack/inference/InferenceBaseRestTest.java # x-pack/plugin/inference/qa/inference-service-tests/src/javaRestTest/java/org/elasticsearch/xpack/inference/InferenceCrudIT.java # x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/InferencePlugin.java # x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/rest/RestUnifiedCompletionInferenceAction.java # x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/rest/RestUnifiedCompletionInferenceActionTests.java
💚 All backports created successfully
Questions ?Please refer to the Backport tool documentation |
…stic#118999) * Adding context preserving fix * Update docs/changelog/118999.yaml * Update docs/changelog/118999.yaml * Using a setonce and adding a test * Updating the changelog (cherry picked from commit 7ba3cb9) # Conflicts: # x-pack/plugin/inference/qa/inference-service-tests/src/javaRestTest/java/org/elasticsearch/xpack/inference/InferenceBaseRestTest.java # x-pack/plugin/inference/qa/inference-service-tests/src/javaRestTest/java/org/elasticsearch/xpack/inference/InferenceCrudIT.java # x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/InferencePlugin.java # x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/rest/RestUnifiedCompletionInferenceAction.java # x-pack/plugin/inference/src/test/java/org/elasticsearch/xpack/inference/rest/RestUnifiedCompletionInferenceActionTests.java
This PR fixes an issue where the
X-elastic-produce
header was not being returned sometimes. I believe this was a concurrency issue in someway because when testing using the_stream
API the header is usually not returned. But when testing with the_unified
API the header is usually returned.I believe the issue relates to the fact that the rest handler's listener doesn't immediately return the response. The
onResponse
method creates a new listener that is called sometime later (maybe on a different thread?).One thing I noticed is that the headers are only returned once which is why I only preserve the context on the first action listener.
TODO
SetOnce
instead of theThreadPool
to avoid a potential issue where thecreateComponents()
call gets moved after the route retrieval. ✅Fixes #119000