KAFKA-16383: Ensure tasks have already polled their consumers before producing verified records in MirrorConnectorsIntegrationBaseTest::testReplicateFromLatest (apache#16598)

Reviewers: Greg Harris <[email protected]>
C0urante authored Jul 16, 2024
1 parent b015a83 commit 177b38a
Showing 1 changed file with 31 additions and 3 deletions.
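
For readers skimming the diff below: the core of the fix is a poll gate built from a sentinel topic. The test repeatedly produces to a sentinel topic on the source cluster and only proceeds once a replicated copy shows up on the target cluster, which proves the MirrorSourceTask has polled its consumer at least once and therefore knows the end offsets of the real test topic. The following is a minimal, self-contained sketch of that gating loop in plain Java; the class and the produceSentinel/replicated parameters are hypothetical stand-ins, not the test's actual waitForCondition/EmbeddedKafkaCluster helpers shown in the diff.

import java.time.Duration;
import java.time.Instant;
import java.util.function.BooleanSupplier;

// Sketch of the sentinel gate used by the fix (hypothetical helpers, not the
// test's real API): keep producing a sentinel record to the source cluster
// until at least one replicated copy is observed on the target cluster, or
// the deadline expires.
public final class SentinelGateSketch {

    // Produces a sentinel, then checks whether any copy has been replicated;
    // repeats once per second until the timeout elapses.
    static void awaitFirstPoll(Runnable produceSentinel,
                               BooleanSupplier replicated,
                               Duration timeout) throws InterruptedException {
        Instant deadline = Instant.now().plus(timeout);
        while (Instant.now().isBefore(deadline)) {
            produceSentinel.run();                  // e.g. produce "sentinel-value" to the sentinel topic
            if (replicated.getAsBoolean()) {        // any copy on the target cluster proves a poll happened
                return;
            }
            Thread.sleep(Duration.ofSeconds(1).toMillis());
        }
        throw new AssertionError("Records were not produced to sentinel topic in time");
    }
}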
@@ -932,25 +932,54 @@ public void testReplicateFromLatest() throws Exception {
         String topic = "test-topic-1";
         produceMessages(primaryProducer, topic, NUM_PARTITIONS);
 
+        String sentinelTopic = "test-topic-sentinel";
+        primary.kafka().createTopic(sentinelTopic);
+
         // consume from the ends of topics when no committed offsets are found
         mm2Props.put(PRIMARY_CLUSTER_ALIAS + ".consumer." + AUTO_OFFSET_RESET_CONFIG, "latest");
         // one way replication from primary to backup
         mm2Props.put(BACKUP_CLUSTER_ALIAS + "->" + PRIMARY_CLUSTER_ALIAS + ".enabled", "false");
         mm2Config = new MirrorMakerConfig(mm2Props);
         waitUntilMirrorMakerIsRunning(backup, CONNECTOR_LIST, mm2Config, PRIMARY_CLUSTER_ALIAS, BACKUP_CLUSTER_ALIAS);
 
+        String backupSentinelTopic = remoteTopicName(sentinelTopic, PRIMARY_CLUSTER_ALIAS);
+        waitForTopicCreated(backup, backupSentinelTopic);
+
+        // wait for proof that the task has managed to poll its consumers at least once;
+        // this should also mean that it knows the proper end offset of the other test topic,
+        // and will consume exactly the expected number of records that we produce after
+        // this assertion passes
+        // NOTE: this assumes that there is a single MirrorSourceTask instance running;
+        // if there are multiple tasks, the logic will need to be updated to ensure that each
+        // task has managed to poll its consumer before continuing
+        waitForCondition(
+                () -> {
+                    primary.kafka().produce(sentinelTopic, "sentinel-value");
+                    int sentinelValues = backup.kafka().consumeAll(1_000, backupSentinelTopic).count();
+                    return sentinelValues > 0;
+                },
+                RECORD_TRANSFER_DURATION_MS,
+                "Records were not produced to sentinel topic in time"
+        );
+
         // produce some more messages to the topic, now that MM2 is running and replication should be taking place
         produceMessages(primaryProducer, topic, NUM_PARTITIONS);
 
         String backupTopic = remoteTopicName(topic, PRIMARY_CLUSTER_ALIAS);
         // wait for at least the expected number of records to be replicated to the backup cluster
-        backup.kafka().consume(NUM_PARTITIONS * NUM_RECORDS_PER_PARTITION, RECORD_TRANSFER_DURATION_MS, backupTopic);
+        backup.kafka().consume(NUM_RECORDS_PRODUCED, RECORD_TRANSFER_DURATION_MS, backupTopic);
 
         // consume all records from backup cluster
         ConsumerRecords<byte[], byte[]> replicatedRecords = backup.kafka().consumeAll(RECORD_TRANSFER_DURATION_MS, backupTopic);
 
         // ensure that we only replicated the records produced after startup
         replicatedRecords.partitions().forEach(topicPartition -> {
             int replicatedCount = replicatedRecords.records(topicPartition).size();
-            assertEquals(NUM_RECORDS_PER_PARTITION, replicatedCount);
+            assertEquals(
+                    NUM_RECORDS_PER_PARTITION,
+                    replicatedCount,
+                    "Unexpected number of replicated records for partition " + topicPartition.partition()
+            );
         });
     }

@@ -1324,7 +1353,6 @@ private static <T> void waitForConsumingAllRecords(Consumer<T, T> consumer, int
     private static Map<String, String> basicMM2Config() {
         Map<String, String> mm2Props = new HashMap<>();
         mm2Props.put("clusters", PRIMARY_CLUSTER_ALIAS + ", " + BACKUP_CLUSTER_ALIAS);
-        mm2Props.put("max.tasks", "10");
         mm2Props.put("groups", "consumer-group-.*");
         mm2Props.put("sync.topic.acls.enabled", "false");
         mm2Props.put("emit.checkpoints.interval.seconds", String.valueOf(CHECKPOINT_INTERVAL_DURATION_MS / 1000));