auto reset option for Kafka Indexing service (apache#3842)
* auto reset option for the Kafka Indexing Service, for the case where the message at the offset being fetched is no longer present on the Kafka brokers (illustrated in the sketch after the changed-files summary below)

* review comments

* review comments

* reverted last change

* review comments

* review comments

* fix typo
pjain1 authored and himanshug committed Feb 2, 2017
1 parent a457cde commit 1aabb45
Showing 26 changed files with 485 additions and 109 deletions.
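The sketch below is not part of the commit; every class, method, and variable name in it is invented for illustration. It summarizes the behavior this change introduces: when a fetch fails because the requested offset has already been dropped by the broker, a task with resetOffsetAutomatically enabled in its tuning config marks the affected partitions for a metadata reset (instead of retrying forever); with the option disabled, the task simply waits and polls again.

import java.util.HashMap;
import java.util.Map;

public class AutoResetSketch
{
  static final long POLL_RETRY_MS = 30_000;

  // Returns the partitions whose stored offsets should be reset; an empty map
  // means the task should just sleep POLL_RETRY_MS and retry the poll.
  static Map<Integer, Long> partitionsToReset(
      boolean resetOffsetAutomatically,
      Map<Integer, Long> requestedOffsets,     // partition -> offset the task tried to fetch
      Map<Integer, Long> earliestBrokerOffsets // partition -> earliest offset still on the broker
  )
  {
    final Map<Integer, Long> toReset = new HashMap<>();
    if (resetOffsetAutomatically) {
      for (Map.Entry<Integer, Long> e : requestedOffsets.entrySet()) {
        final Long earliest = earliestBrokerOffsets.get(e.getKey());
        // The requested offset has fallen off the log (e.g. retention), so the
        // fetch can never succeed: mark this partition for a reset.
        if (earliest != null && earliest > e.getValue()) {
          toReset.put(e.getKey(), e.getValue());
        }
      }
    }
    return toReset;
  }

  public static void main(String[] args)
  {
    Map<Integer, Long> requested = new HashMap<>();
    requested.put(0, 100L);
    Map<Integer, Long> earliest = new HashMap<>();
    earliest.put(0, 5000L); // broker has already dropped offset 100
    System.out.println(partitionsToReset(true, requested, earliest)); // prints {0=100}
  }
}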
@@ -95,6 +95,36 @@ public DataSourceMetadata plus(DataSourceMetadata other)
}
}

@Override
public DataSourceMetadata minus(DataSourceMetadata other)
{
if (!(other instanceof KafkaDataSourceMetadata)) {
throw new IAE(
"Expected instance of %s, got %s",
KafkaDataSourceMetadata.class.getCanonicalName(),
other.getClass().getCanonicalName()
);
}

final KafkaDataSourceMetadata that = (KafkaDataSourceMetadata) other;

if (that.getKafkaPartitions().getTopic().equals(kafkaPartitions.getTopic())) {
// Same topic, remove partitions present in "that" from "this"
final Map<Integer, Long> newMap = Maps.newHashMap();

for (Map.Entry<Integer, Long> entry : kafkaPartitions.getPartitionOffsetMap().entrySet()) {
if (!that.getKafkaPartitions().getPartitionOffsetMap().containsKey(entry.getKey())) {
newMap.put(entry.getKey(), entry.getValue());
}
}

return new KafkaDataSourceMetadata(new KafkaPartitions(kafkaPartitions.getTopic(), newMap));
} else {
// Different topic, prefer "this".
return this;
}
}
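A minimal, self-contained sketch of what the new minus() computes, using plain java.util maps instead of KafkaDataSourceMetadata/KafkaPartitions; the class and method below are invented for illustration and are not part of the commit.

import java.util.HashMap;
import java.util.Map;

public class MinusSketch
{
  // Same-topic case of minus(): keep only the partitions of "this" that do not
  // appear in "that".
  static Map<Integer, Long> minus(Map<Integer, Long> thisOffsets, Map<Integer, Long> thatOffsets)
  {
    final Map<Integer, Long> result = new HashMap<>();
    for (Map.Entry<Integer, Long> entry : thisOffsets.entrySet()) {
      if (!thatOffsets.containsKey(entry.getKey())) {
        result.put(entry.getKey(), entry.getValue());
      }
    }
    return result;
  }

  public static void main(String[] args)
  {
    Map<Integer, Long> stored = new HashMap<>();
    stored.put(0, 100L);
    stored.put(1, 200L);
    stored.put(2, 300L);

    Map<Integer, Long> reset = new HashMap<>();
    reset.put(1, 150L); // only partition 1 is being reset

    // Prints {0=100, 2=300} (order may vary): partition 1 is dropped from the
    // stored metadata, so the supervisor falls back to a fresh offset for it.
    System.out.println(minus(stored, reset));
  }
}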

@Override
public boolean equals(Object o)
{
@@ -32,7 +32,6 @@ public class KafkaIOConfig implements IOConfig
{
private static final boolean DEFAULT_USE_TRANSACTION = true;
private static final boolean DEFAULT_PAUSE_AFTER_READ = false;
private static final boolean DEFAULT_USE_EARLIEST_OFFSET = false;

private final String baseSequenceName;
private final KafkaPartitions startPartitions;
@@ -41,7 +40,6 @@ public class KafkaIOConfig implements IOConfig
private final boolean useTransaction;
private final boolean pauseAfterRead;
private final Optional<DateTime> minimumMessageTime;
private final boolean useEarliestOffset;

@JsonCreator
public KafkaIOConfig(
@@ -51,8 +49,7 @@ public KafkaIOConfig(
@JsonProperty("consumerProperties") Map<String, String> consumerProperties,
@JsonProperty("useTransaction") Boolean useTransaction,
@JsonProperty("pauseAfterRead") Boolean pauseAfterRead,
@JsonProperty("minimumMessageTime") DateTime minimumMessageTime,
@JsonProperty("useEarliestOffset") Boolean useEarliestOffset
@JsonProperty("minimumMessageTime") DateTime minimumMessageTime
)
{
this.baseSequenceName = Preconditions.checkNotNull(baseSequenceName, "baseSequenceName");
@@ -62,7 +59,6 @@ public KafkaIOConfig(
this.useTransaction = useTransaction != null ? useTransaction : DEFAULT_USE_TRANSACTION;
this.pauseAfterRead = pauseAfterRead != null ? pauseAfterRead : DEFAULT_PAUSE_AFTER_READ;
this.minimumMessageTime = Optional.fromNullable(minimumMessageTime);
this.useEarliestOffset = useEarliestOffset != null ? useEarliestOffset : DEFAULT_USE_EARLIEST_OFFSET;

Preconditions.checkArgument(
startPartitions.getTopic().equals(endPartitions.getTopic()),
@@ -126,12 +122,6 @@ public Optional<DateTime> getMinimumMessageTime()
return minimumMessageTime;
}

@JsonProperty
public boolean isUseEarliestOffset()
{
return useEarliestOffset;
}

@Override
public String toString()
{
@@ -143,7 +133,6 @@ public String toString()
", useTransaction=" + useTransaction +
", pauseAfterRead=" + pauseAfterRead +
", minimumMessageTime=" + minimumMessageTime +
", useEarliestOffest=" + useEarliestOffset +
'}';
}
}
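With useEarliestOffset removed from the per-task KafkaIOConfig, the remaining optional flags keep the usual nullable-Boolean-with-default construction pattern seen above. A minimal sketch of that pattern follows; it is illustrative only and not the Druid class.

public class FlagDefaultsSketch
{
  private static final boolean DEFAULT_USE_TRANSACTION = true;

  private final boolean useTransaction;

  public FlagDefaultsSketch(Boolean useTransaction)
  {
    // A null (absent) JSON field falls back to the compile-time default.
    this.useTransaction = useTransaction != null ? useTransaction : DEFAULT_USE_TRANSACTION;
  }

  public boolean isUseTransaction()
  {
    return useTransaction;
  }
}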
@@ -46,6 +46,7 @@
import io.druid.indexing.appenderator.ActionBasedUsedSegmentChecker;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.TaskToolbox;
import io.druid.indexing.common.actions.ResetDataSourceMetadataAction;
import io.druid.indexing.common.actions.SegmentTransactionalInsertAction;
import io.druid.indexing.common.actions.TaskActionClient;
import io.druid.indexing.common.task.AbstractTask;
@@ -386,7 +387,7 @@ public void run()
}
catch (OffsetOutOfRangeException e) {
log.warn("OffsetOutOfRangeException with message [%s]", e.getMessage());
possiblyResetOffsetsOrWait(e.offsetOutOfRangePartitions(), consumer, assignment);
possiblyResetOffsetsOrWait(e.offsetOutOfRangePartitions(), consumer, toolbox);
stillReading = ioConfig.isPauseAfterRead() || !assignment.isEmpty();
}

@@ -995,11 +996,12 @@ private boolean possiblyPause(Set<Integer> assignment) throws InterruptedException
private void possiblyResetOffsetsOrWait(
Map<TopicPartition, Long> outOfRangePartitions,
KafkaConsumer<byte[], byte[]> consumer,
Set<Integer> assignment
) throws InterruptedException
TaskToolbox taskToolbox
) throws InterruptedException, IOException
{
boolean shouldRetry = false;
if(tuningConfig.isResetOffsetAutomatically()) {
final Map<TopicPartition, Long> resetPartitions = Maps.newHashMap();
boolean doReset = false;
if (tuningConfig.isResetOffsetAutomatically()) {
for (Map.Entry<TopicPartition, Long> outOfRangePartition : outOfRangePartitions.entrySet()) {
final TopicPartition topicPartition = outOfRangePartition.getKey();
final long nextOffset = outOfRangePartition.getValue();
@@ -1012,15 +1014,15 @@ private void possiblyResetOffsetsOrWait(
// and the current message offset in the kafka partition is more than the
// next message offset that we are trying to fetch
if (leastAvailableOffset > nextOffset) {
resetOffset(consumer, assignment, topicPartition);
} else {
shouldRetry = true;
doReset = true;
resetPartitions.put(topicPartition, nextOffset);
}
}
} else {
shouldRetry = true;
}
if (shouldRetry) {

if (doReset) {
sendResetRequestAndWait(resetPartitions, taskToolbox);
} else {
log.warn("Retrying in %dms", POLL_RETRY_MS);
pollRetryLock.lockInterruptibly();
try {
@@ -1035,34 +1037,33 @@
}
}

private void resetOffset(
KafkaConsumer<byte[], byte[]> consumer,
Set<Integer> assignment,
TopicPartition topicPartition
)
private void sendResetRequestAndWait(Map<TopicPartition, Long> outOfRangePartitions, TaskToolbox taskToolbox) throws IOException
{
log.warn(
"Resetting consumer offset to [%s] for partition [%d]",
ioConfig.isUseEarliestOffset() ? "earliest" : "latest",
topicPartition.partition()
);
if (ioConfig.isUseEarliestOffset()) {
consumer.seekToBeginning(topicPartition);
} else {
consumer.seekToEnd(topicPartition);
Map<Integer, Long> partitionOffsetMap = Maps.newHashMap();
for (Map.Entry<TopicPartition, Long> outOfRangePartition : outOfRangePartitions.entrySet()) {
partitionOffsetMap.put(outOfRangePartition.getKey().partition(), outOfRangePartition.getValue());
}
nextOffsets.put(topicPartition.partition(), consumer.position(topicPartition));
log.warn("Consumer is now at offset [%d]", nextOffsets.get(topicPartition.partition()));
// check if we seeked passed the endOffset for this partition
if (nextOffsets.get(topicPartition.partition()) >= endOffsets.get(topicPartition.partition())
&& assignment.remove(topicPartition.partition())) {
log.info(
"Finished reading topic[%s], partition[%,d].",
topicPartition.topic(),
topicPartition.partition()
);
boolean result = taskToolbox.getTaskActionClient()
.submit(new ResetDataSourceMetadataAction(
getDataSource(),
new KafkaDataSourceMetadata(new KafkaPartitions(
ioConfig.getStartPartitions()
.getTopic(),
partitionOffsetMap
))
));

if (result) {
log.warn("Successfully sent the reset request for partitions [%s], waiting to be killed", partitionOffsetMap.keySet());
// wait to be killed by the supervisor
try {
Thread.sleep(Long.MAX_VALUE);
}
catch (InterruptedException e) {
throw new RuntimeException("Got interrupted while waiting to be killed");
}
} else {
log.makeAlert("Failed to send reset request for partitions [%s]", partitionOffsetMap.keySet()).emit();
}
// update assignments if something changed
assignPartitions(consumer, topicPartition.topic(), assignment);
}
}
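For illustration (the class below is not part of the commit), the task-side reset payload is just the out-of-range TopicPartitions flattened into a partition-number to offset map, which sendResetRequestAndWait() then wraps in a KafkaDataSourceMetadata and submits via ResetDataSourceMetadataAction before blocking. A sketch of that flattening step, assuming only the kafka-clients TopicPartition class; the topic name in main() is made up.

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.TopicPartition;

public class ResetPayloadSketch
{
  // Flatten TopicPartition -> offset into partition number -> offset, as the
  // reset request expects.
  static Map<Integer, Long> toPartitionOffsetMap(Map<TopicPartition, Long> outOfRange)
  {
    final Map<Integer, Long> partitionOffsetMap = new HashMap<>();
    for (Map.Entry<TopicPartition, Long> e : outOfRange.entrySet()) {
      partitionOffsetMap.put(e.getKey().partition(), e.getValue());
    }
    return partitionOffsetMap;
  }

  public static void main(String[] args)
  {
    Map<TopicPartition, Long> outOfRange = new HashMap<>();
    outOfRange.put(new TopicPartition("metrics", 2), 17500L); // "metrics" is a made-up topic
    // Prints {2=17500}; the real task wraps this map in a KafkaDataSourceMetadata,
    // submits the reset action, and then waits for the supervisor to kill it.
    System.out.println(toPartitionOffsetMap(outOfRange));
  }
}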
@@ -63,6 +63,7 @@
import io.druid.indexing.overlord.TaskStorage;
import io.druid.indexing.overlord.supervisor.Supervisor;
import io.druid.indexing.overlord.supervisor.SupervisorReport;
import io.druid.java.util.common.IAE;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.collect.JavaCompatUtils;
import io.druid.metadata.EntryExistsException;
@@ -74,6 +75,7 @@
import org.joda.time.DateTime;

import javax.annotation.Nullable;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@@ -417,10 +419,10 @@ public SupervisorReport getStatus()
}

@Override
public void reset()
public void reset(DataSourceMetadata dataSourceMetadata)
{
log.info("Posting ResetNotice");
notices.add(new ResetNotice());
notices.add(new ResetNotice(dataSourceMetadata));
}

public void possiblyRegisterListener()
@@ -506,29 +508,113 @@ public void handle() throws InterruptedException, ExecutionException, TimeoutException

private class ResetNotice implements Notice
{
final DataSourceMetadata dataSourceMetadata;

ResetNotice(DataSourceMetadata dataSourceMetadata)
{
this.dataSourceMetadata = dataSourceMetadata;
}

@Override
public void handle()
{
resetInternal();
log.makeAlert("Resetting dataSource [%s]", dataSource).emit();
resetInternal(dataSourceMetadata);
}
}

@VisibleForTesting
void resetInternal()
void resetInternal(DataSourceMetadata dataSourceMetadata)
{
boolean result = indexerMetadataStorageCoordinator.deleteDataSourceMetadata(dataSource);
log.info("Reset dataSource[%s] - dataSource metadata entry deleted? [%s]", dataSource, result);
if (dataSourceMetadata == null) {
// Reset everything
boolean result = indexerMetadataStorageCoordinator.deleteDataSourceMetadata(dataSource);
log.info("Reset dataSource[%s] - dataSource metadata entry deleted? [%s]", dataSource, result);
killTaskGroupForPartitions(JavaCompatUtils.keySet(taskGroups));
} else if (!(dataSourceMetadata instanceof KafkaDataSourceMetadata)) {
throw new IAE("Expected KafkaDataSourceMetadata but found instance of [%s]", dataSourceMetadata.getClass());
} else {
// Reset only the partitions in dataSourceMetadata if it has not been reset yet
final KafkaDataSourceMetadata resetKafkaMetadata = (KafkaDataSourceMetadata) dataSourceMetadata;

if (resetKafkaMetadata.getKafkaPartitions().getTopic().equals(ioConfig.getTopic())) {
// metadata can be null
final DataSourceMetadata metadata = indexerMetadataStorageCoordinator.getDataSourceMetadata(dataSource);
if (metadata != null && !(metadata instanceof KafkaDataSourceMetadata)) {
throw new IAE(
"Expected KafkaDataSourceMetadata from metadata store but found instance of [%s]",
metadata.getClass()
);
}
final KafkaDataSourceMetadata currentMetadata = (KafkaDataSourceMetadata) metadata;

// defend against consecutive reset requests from replicas
// as well as the case where the metadata store does not have an entry for the reset partitions
boolean doReset = false;
for (Map.Entry<Integer, Long> resetPartitionOffset : resetKafkaMetadata.getKafkaPartitions()
.getPartitionOffsetMap()
.entrySet()) {
final Long partitionOffsetInMetadataStore = currentMetadata == null
? null
: currentMetadata.getKafkaPartitions()
.getPartitionOffsetMap()
.get(resetPartitionOffset.getKey());
final TaskGroup partitionTaskGroup = taskGroups.get(getTaskGroupIdForPartition(resetPartitionOffset.getKey()));
if (partitionOffsetInMetadataStore != null ||
(partitionTaskGroup != null && partitionTaskGroup.partitionOffsets.get(resetPartitionOffset.getKey())
.equals(resetPartitionOffset.getValue()))) {
doReset = true;
break;
}
}

for (TaskGroup taskGroup : taskGroups.values()) {
for (Map.Entry<String, TaskData> entry : taskGroup.tasks.entrySet()) {
String taskId = entry.getKey();
log.info("Reset dataSource[%s] - killing task [%s]", dataSource, taskId);
killTask(taskId);
if (!doReset) {
return;
}

boolean metadataUpdateSuccess = false;
if (currentMetadata == null) {
metadataUpdateSuccess = true;
} else {
final DataSourceMetadata newMetadata = currentMetadata.minus(resetKafkaMetadata);
try {
metadataUpdateSuccess = indexerMetadataStorageCoordinator.resetDataSourceMetadata(dataSource, newMetadata);
}
catch (IOException e) {
log.error("Resetting DataSourceMetadata failed [%s]", e.getMessage());
Throwables.propagate(e);
}
}
if (metadataUpdateSuccess) {
killTaskGroupForPartitions(JavaCompatUtils.keySet(resetKafkaMetadata.getKafkaPartitions()
.getPartitionOffsetMap()));
} else {
throw new ISE("Unable to reset metadata");
}
} else {
log.warn(
"Reset metadata topic [%s] and supervisor's topic [%s] do not match",
resetKafkaMetadata.getKafkaPartitions().getTopic(),
ioConfig.getTopic()
);
}
}
}

partitionGroups.clear();
taskGroups.clear();
private void killTaskGroupForPartitions(Set<Integer> partitions)
{
for (Integer partition : partitions) {
TaskGroup taskGroup = taskGroups.get(getTaskGroupIdForPartition(partition));
if (taskGroup != null) {
// kill all tasks in this task group
for (String taskId : JavaCompatUtils.keySet(taskGroup.tasks)) {
log.info("Reset dataSource[%s] - killing task [%s]", dataSource, taskId);
killTask(taskId);
}
}
partitionGroups.remove(getTaskGroupIdForPartition(partition));
taskGroups.remove(getTaskGroupIdForPartition(partition));
}
}
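The duplicate-request guard in resetInternal() above can be summarized with a small standalone sketch (invented names, plain maps, not part of the commit): a reset request is acted on only if the metadata store still records an offset for one of the reset partitions, or a running task group sits at exactly the offset the task reported; otherwise a replica has presumably already triggered the same reset and the notice is ignored.

import java.util.HashMap;
import java.util.Map;

public class ResetDedupSketch
{
  static boolean shouldReset(
      Map<Integer, Long> requestedResets,   // partition -> offset reported by the task
      Map<Integer, Long> storedOffsets,     // partition -> offset in the metadata store (may be empty)
      Map<Integer, Long> runningTaskOffsets // partition -> offset a current task group is at
  )
  {
    for (Map.Entry<Integer, Long> reset : requestedResets.entrySet()) {
      final Long stored = storedOffsets.get(reset.getKey());
      final Long running = runningTaskOffsets.get(reset.getKey());
      // Something still points at the bad offset, so the reset is not a duplicate.
      if (stored != null || reset.getValue().equals(running)) {
        return true;
      }
    }
    return false; // already handled by an earlier (replica) request
  }

  public static void main(String[] args)
  {
    Map<Integer, Long> requested = new HashMap<>();
    requested.put(1, 150L);
    System.out.println(shouldReset(requested, new HashMap<>(), new HashMap<>())); // false
    Map<Integer, Long> stored = new HashMap<>();
    stored.put(1, 150L);
    System.out.println(shouldReset(requested, stored, new HashMap<>()));          // true
  }
}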

@VisibleForTesting
@@ -1287,8 +1373,7 @@ private void createKafkaTasksForGroup(int groupId, int replicas)
consumerProperties,
true,
false,
minimumMessageTime,
ioConfig.isUseEarliestOffset()
minimumMessageTime
);

for (int i = 0; i < replicas; i++) {