
unable to consume specific amount of messages without waiting with a setTimeout #450

Open
danrevah opened this issue Jul 12, 2018 · 8 comments

Comments

danrevah commented Jul 12, 2018

Environment Information

  • OS: RedHat
  • Node Version: 10.6.0
  • NPM Version: 6.1.0
  • C++ Toolchain:
  • node-rdkafka version: 2.3.2
// Non-flowing mode
consumer.connect();

consumer
  .on('ready', function() {
    consumer.subscribe(['librdtesting-01']);

    // Read only 1000 messages
    consumer.consume(1000); // <---------- This is not working...
  })
  .on('data', function(data) {
    console.log('Message found!  Contents below.');
    console.log(data.value.toString());
  });

If I replace the line I've marked above with the following, it does work:

setTimeout(() => consumer.consume(1000), 60000); // <-- this is working..

Could this be related to node-rdkafka, or is it something to do with my Kafka configuration?

@webmakersteve (Contributor)

You need to handle the potential error case in consume through its callback. There is a chance, especially directly after connecting, that the consume will fail because of a timeout or because a rebalance is in progress.

In your version that waits on a setTimeout before consuming, the fetcher is likely already set up by the time the consume runs, so it works coincidentally.
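The advice above can be sketched as follows. This assumes node-rdkafka's non-flowing `consumer.consume(count, cb)` signature, where the callback receives `(err, messages)`; the `handleConsumeResult` helper and its retry-on-empty logic are illustrative, not part of the library.

```javascript
// Hypothetical helper: inspect the result of a non-flowing consume call.
// Retries when the consume failed (e.g. a timeout or rebalance) or when it
// returned no messages because the fetcher was not yet set up.
function handleConsumeResult(err, messages, retry) {
  if (err) {
    console.error('consume failed:', err.message);
    retry(); // transient failure right after connect; try again shortly
    return;
  }
  if (!messages || messages.length === 0) {
    retry(); // fetcher may not be ready yet directly after subscribing
    return;
  }
  messages.forEach((m) => console.log(m.value.toString()));
}

// Usage inside the 'ready' handler (consumer is a node-rdkafka KafkaConsumer):
// consumer.consume(1000, (err, messages) =>
//   handleConsumeResult(err, messages,
//     () => setTimeout(() => consumer.consume(1000), 1000)));
```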


thebigredgeek commented Oct 8, 2018

Is there not an error event emitted? I am seeing something very similar, but the error is null and the messages array is empty. According to the logs when I set `debug` to `consumer`, it seems like `ready` is being fired before the consumer has fetched partition offsets. Also, the offset appears to be incremented even though I am not receiving the messages through the data or consume callbacks.
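To inspect what the consumer is doing at this point, librdkafka's debug logging can be turned on via the consumer config and read through node-rdkafka's `event.log` / `event.error` events. A minimal config sketch, assuming a local broker (the broker address and group id are placeholders):

```javascript
// Config fragment enabling verbose librdkafka debug contexts relevant to
// consuming (consumer, consumer-group, topic, and fetch internals).
const debugConfig = {
  'metadata.broker.list': 'localhost:9092', // placeholder broker
  'group.id': 'debug-example',              // placeholder group id
  'debug': 'consumer,cgrp,topic,fetch',     // librdkafka debug contexts
  'event_cb': true,                         // forward events to Node
};

// With node-rdkafka, the logs arrive as events:
// const consumer = new Kafka.KafkaConsumer(debugConfig, {});
// consumer.on('event.log', (log) => console.log(log.fac, log.message));
// consumer.on('event.error', (err) => console.error('event.error:', err));
```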


noderat commented Nov 5, 2018

@thebigredgeek This is exactly what I was experiencing as well: the error is null, the messages array is empty, and the offset is incremented (unless the consumer is configured not to handle offsets automatically).
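The "configured to not have automatic offset handling" case above corresponds to disabling auto-commit in the consumer config, so offsets only advance when committed explicitly. A config sketch (broker and group id are placeholders):

```javascript
// Config fragment: disable automatic offset commits so offsets do not
// advance unless the application commits them itself.
const consumerConfig = {
  'metadata.broker.list': 'localhost:9092', // placeholder broker
  'group.id': 'manual-commit-example',      // placeholder group id
  'enable.auto.commit': false,              // commit offsets manually
};

// After successfully processing a message m with node-rdkafka:
// consumer.commit(m);
```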


stale bot commented Feb 3, 2019

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs.

@stale stale bot added the stale Stale issues label Feb 3, 2019
@stale stale bot closed this as completed Feb 10, 2019
@rosshadden

Multiple people have the same problem (myself included). This should not be closed.

@philipyoo (Contributor)

I'm also facing this issue at the moment. I'm using a setTimeout as a workaround, but the timeout is not reliable.


Albinzr commented Jan 2, 2020

Same issue. Is there any fix for this? Without it we can't manually consume data and commit.

@talaltoukan

We are experiencing the same issue. Please re-open this issue.

@iradul iradul removed the stale Stale issues label May 25, 2022
@iradul iradul reopened this May 25, 2022

9 participants