How to get last offset Number directly or to filter the offsets #225

Open
HafsaAsif opened this issue Jul 29, 2015 · 1 comment

Comments

@HafsaAsif

Hi,
I am using a simple consumer/producer with Kafka in Node.js (kafka-node). My producer sends messages, and my consumer receives them without problems. The producer and consumer code is below. In the consumer, I was expecting offset.fetch() to give me all of today's offsets, but it does not. Please guide me on how to get results from this method, and also point me to a method that directly gives the last offset number for any partition of a topic. I would also like to know how to filter offsets in the incoming stream, e.g. how can I get only the last 20 messages in my consumer?

My producer is:
var kafka = require('kafka-node');
var Producer = kafka.Producer;
var Client = kafka.Client;
var client = new Client('localhost:2181');
var producer = new Producer(client);
producer.on('ready', function () {
  producer.send([
    { topic: 'test', key: 'key1', partition: 0, messages: ['banana', 'carrot', 'lemon', 'apple', 'melon', 'kiwi', 'mango', 'avacado'], attributes: 0 }
  ], function (err, result) {
    console.log(err || result);
    process.exit();
  });
});

My Consumer is:
var kafka = require('kafka-node');
var Consumer = kafka.Consumer;
var client = new kafka.Client('localhost:2181');
var offset = new kafka.Offset(client);
offset.fetch([
  { topic: 'test' }
], function (err, data) {
  console.log(data);
});
var consumer = new Consumer(
  client,
  [
    { topic: 'test', partition: 0 }
  ],
  { autoCommit: false, autoCommitIntervalMs: 5000, fetchMaxWaitMs: 100, fromOffset: true, fetchMinBytes: 1, fetchMaxBytes: 1024 * 10 }
);

consumer.on('message', function (message) {
  console.log(message);
});

@anandpathak

When you fetch offsets, you can specify the time from which you want them, as well as how many offsets you want returned.

offset.fetch([
        { topic: 't', partition: 0, time: Date.now(), maxNum: 1 }
    ], function (err, data) {
        // data 
        // { 't': { '0': [999] } } 
    });

Here the time given is the current time, so by changing it to 20 minutes earlier you can get the offset of the message that arrived 20 minutes ago. You can then pass this offset value to the Consumer API and get the messages from 20 minutes ago onward.
Keep in mind that the returned JSON uses the partition number as a key, which makes it awkward to read, so store the partition number in a variable as a string and then index with it, like data['t'][partition_number][0].
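As a rough sketch of the "20 minutes earlier" idea, the request entry can be built by subtracting the interval from the current time in milliseconds. The helper name offsetRequestSince and its parameters are made up for illustration and are not part of kafka-node; only the topic, partition, time and maxNum fields come from the example above.

```javascript
// Hypothetical helper: builds one entry for offset.fetch() asking for the
// offset as of `minutesAgo` minutes before `now` (milliseconds since epoch).
function offsetRequestSince(topic, partition, minutesAgo, now) {
  var reference = (now === undefined) ? Date.now() : now;
  return {
    topic: topic,
    partition: partition,
    time: reference - minutesAgo * 60 * 1000, // minutes -> milliseconds
    maxNum: 1                                 // only one offset wanted
  };
}

// Example: request entry for topic 't', partition 0, 20 minutes back.
var request = offsetRequestSince('t', 0, 20);
console.log(request);
```

The resulting object would be passed inside the array given to offset.fetch(), in place of the entry that uses Date.now() directly.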
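To make the lookup concrete, here is a minimal sketch that reads the offset out of a response shaped like the one above and derives a starting offset for "the last 20 messages". The helper names latestOffset and startOffsetForLastN are hypothetical. The sketch also assumes the returned value is the offset of the next message to be written, and it ignores the possibility that older messages were already removed by retention.

```javascript
// Hypothetical helper: reads the first offset from a response shaped like
// { topic: { partition: [offsets] } }, as in the example above.
function latestOffset(data, topic, partition) {
  var key = String(partition); // partition keys in the response are strings
  var offsets = data[topic] && data[topic][key];
  if (!offsets || offsets.length === 0) {
    throw new Error('no offset returned for ' + topic + '/' + key);
  }
  return offsets[0];
}

// Hypothetical helper: starting offset for the last n messages, never below 0.
// Assumes `latest` is the offset the next message will receive.
function startOffsetForLastN(latest, n) {
  return Math.max(0, latest - n);
}

var data = { t: { '0': [999] } }; // sample response from the example above
var latest = latestOffset(data, 't', 0);
var payload = { topic: 't', partition: 0, offset: startOffsetForLastN(latest, 20) };
console.log(payload);
```

A payload like this, with an explicit offset, is what the consumer in the question would need alongside fromOffset: true in order to start reading from that position.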
