Skip to content

Commit

Permalink
Merge pull request apache#1705 from duhenglucky/pull_consumer_offset
Browse files Browse the repository at this point in the history
[ISSUE apache#1706]feat(pull_consumer) refactor the consumer offset update logic
  • Loading branch information
vongosling authored Jan 8, 2020
2 parents 1aa2355 + ad149dd commit 366e6e7
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ public void registerTopicMessageQueueChangeListener(String topic,

@Override
public void commitSync() {
this.defaultLitePullConsumerImpl.commitSync();
this.defaultLitePullConsumerImpl.commitAll();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,10 @@ public void removeAssignedMessageQueue(String topic) {
}
}

public Set<MessageQueue> getAssignedMessageQueues() {
return this.assignedMessageQueueState.keySet();
}

private class MessageQueueState {
private MessageQueue messageQueue;
private ProcessQueue processQueue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -590,37 +590,14 @@ private void removePullTask(final String topic) {
}
}

public synchronized void commitSync() {
public synchronized void commitAll() {
try {
for (MessageQueue messageQueue : assignedMessageQueue.messageQueues()) {
long consumerOffset = assignedMessageQueue.getConsumerOffset(messageQueue);
if (consumerOffset != -1) {
ProcessQueue processQueue = assignedMessageQueue.getProcessQueue(messageQueue);
long preConsumerOffset = this.getOffsetStore().readOffset(messageQueue, ReadOffsetType.READ_FROM_MEMORY);
if (processQueue != null && !processQueue.isDropped() && consumerOffset != preConsumerOffset) {
if (processQueue != null && !processQueue.isDropped()) {
updateConsumeOffset(messageQueue, consumerOffset);
updateConsumeOffsetToBroker(messageQueue, consumerOffset, false);
}
}
}
if (defaultLitePullConsumer.getMessageModel() == MessageModel.BROADCASTING) {
offsetStore.persistAll(assignedMessageQueue.messageQueues());
}
} catch (Exception e) {
log.error("An error occurred when update consume offset synchronously.", e);
}
}

private synchronized void commitAll() {
try {
for (MessageQueue messageQueue : assignedMessageQueue.messageQueues()) {
long consumerOffset = assignedMessageQueue.getConsumerOffset(messageQueue);
if (consumerOffset != -1) {
ProcessQueue processQueue = assignedMessageQueue.getProcessQueue(messageQueue);
long preConsumerOffset = this.getOffsetStore().readOffset(messageQueue, ReadOffsetType.READ_FROM_MEMORY);
if (processQueue != null && !processQueue.isDropped() && consumerOffset != preConsumerOffset) {
updateConsumeOffset(messageQueue, consumerOffset);
updateConsumeOffsetToBroker(messageQueue, consumerOffset, true);
}
}
}
Expand Down Expand Up @@ -927,11 +904,16 @@ public void persistConsumerOffset() {
try {
checkServiceState();
Set<MessageQueue> mqs = new HashSet<MessageQueue>();
Set<MessageQueue> allocateMq = this.rebalanceImpl.getProcessQueueTable().keySet();
mqs.addAll(allocateMq);
if (this.subscriptionType == SubscriptionType.SUBSCRIBE) {
Set<MessageQueue> allocateMq = this.rebalanceImpl.getProcessQueueTable().keySet();
mqs.addAll(allocateMq);
} else if (this.subscriptionType == SubscriptionType.ASSIGN) {
Set<MessageQueue> assignedMessageQueue = this.assignedMessageQueue.getAssignedMessageQueues();
mqs.addAll(assignedMessageQueue);
}
this.offsetStore.persistAll(mqs);
} catch (Exception e) {
log.error("group: " + this.defaultLitePullConsumer.getConsumerGroup() + " persistConsumerOffset exception", e);
log.error("Persist consumer offset error for group: {} ", this.defaultLitePullConsumer.getConsumerGroup(), e);
}
}

Expand Down

0 comments on commit 366e6e7

Please sign in to comment.