Skip to content

Commit

Permalink
Revert "修复kafka消费回滚的bug (alibaba#2521)" (alibaba#2526)
Browse files Browse the repository at this point in the history
This reverts commit 31acfc0.
  • Loading branch information
rewerma authored Jan 2, 2020
1 parent 31acfc0 commit b6333f0
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -174,11 +174,6 @@ protected boolean mqWriteOutData(int retry, long timeout, int i, final boolean f
if (i == retry - 1) {
connector.ack();
logger.error(e.getMessage() + " Error sync but ACK!");
try {
Thread.sleep(500);
} catch (InterruptedException e1) {
// ignore
}
return true;
} else {
connector.rollback();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,11 @@ public KafkaCanalConnector(String servers, String topic, Integer partition, Stri
batchSize = 100;
}
properties.put("max.poll.records", batchSize.toString());
properties.put("key.deserializer", StringDeserializer.class);
properties.put("key.deserializer", StringDeserializer.class.getName());
if (!flatMessage) {
properties.put("value.deserializer", MessageDeserializer.class);
properties.put("value.deserializer", MessageDeserializer.class.getName());
} else {
properties.put("value.deserializer", StringDeserializer.class);
properties.put("value.deserializer", StringDeserializer.class.getName());
}
}

Expand Down Expand Up @@ -183,13 +183,14 @@ public List<Message> getListWithoutAck(Long timeout, TimeUnit unit) throws Canal

ConsumerRecords<String, Message> records = kafkaConsumer.poll(unit.toMillis(timeout));

currentOffsets.clear();
for (TopicPartition topicPartition : records.partitions()) {
currentOffsets.put(topicPartition.partition(), kafkaConsumer.position(topicPartition));
}

if (!records.isEmpty()) {
currentOffsets.clear();
List<Message> messages = new ArrayList<>();
for (ConsumerRecord<String, Message> record : records) {
if (currentOffsets.get(record.partition()) == null) {
currentOffsets.put(record.partition(), record.offset());
}
messages.add(record.value());
}
return messages;
Expand Down Expand Up @@ -220,13 +221,14 @@ public List<FlatMessage> getFlatListWithoutAck(Long timeout, TimeUnit unit) thro

ConsumerRecords<String, String> records = kafkaConsumer2.poll(unit.toMillis(timeout));

currentOffsets.clear();
for (TopicPartition topicPartition : records.partitions()) {
currentOffsets.put(topicPartition.partition(), kafkaConsumer2.position(topicPartition));
}

if (!records.isEmpty()) {
currentOffsets.clear();
List<FlatMessage> flatMessages = new ArrayList<>();
for (ConsumerRecord<String, String> record : records) {
if (currentOffsets.get(record.partition()) == null) {
currentOffsets.put(record.partition(), record.offset());
}
String flatMessageJson = record.value();
FlatMessage flatMessage = JSON.parseObject(flatMessageJson, FlatMessage.class);
flatMessages.add(flatMessage);
Expand All @@ -246,14 +248,12 @@ public void rollback() {
// 回滚所有分区
if (kafkaConsumer != null) {
for (Map.Entry<Integer, Long> entry : currentOffsets.entrySet()) {
kafkaConsumer.seek(new TopicPartition(topic, entry.getKey()), currentOffsets.get(entry.getKey()));
kafkaConsumer.commitSync();
kafkaConsumer.seek(new TopicPartition(topic, entry.getKey()), entry.getValue() - 1);
}
}
if (kafkaConsumer2 != null) {
for (Map.Entry<Integer, Long> entry : currentOffsets.entrySet()) {
kafkaConsumer2.seek(new TopicPartition(topic, entry.getKey()), currentOffsets.get(entry.getKey()));
kafkaConsumer.commitSync();
kafkaConsumer2.seek(new TopicPartition(topic, entry.getKey()), entry.getValue() - 1);
}
}
}
Expand Down

0 comments on commit b6333f0

Please sign in to comment.