Skip to content

Commit

Permalink
option to reset offset automatically in case of OffsetOutOfRangeExcep…
Browse files Browse the repository at this point in the history
…tion (apache#3678)

* option to reset offset automatically in case of OffsetOutOfRangeException
if the next offset is less than the earliest available offset for that partition

* review comments

* refactoring

* refactor

* review comments
  • Loading branch information
pjain1 authored and himanshug committed Nov 21, 2016
1 parent 7c63bee commit 7ee6bb7
Show file tree
Hide file tree
Showing 10 changed files with 217 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ The tuningConfig is optional and default parameters will be used if no tuningCon
|`buildV9Directly`|Boolean|Whether to build a v9 index directly instead of first building a v8 index and then converting it to v9 format.|no (default == true)|
|`reportParseExceptions`|Boolean|If true, exceptions encountered during parsing will be thrown and will halt ingestion; if false, unparseable rows and fields will be skipped.|no (default == false)|
|`handoffConditionTimeout`|Long|Milliseconds to wait for segment handoff. It must be >= 0, where 0 means to wait forever.|no (default == 0)|
|`resetOffsetAutomatically`|Boolean|Whether to reset the consumer offset if the next offset that it is trying to fetch is less than the earliest available offset for that particular partition. The consumer offset will be reset to either the earliest or latest offset depending on the `useEarliestOffset` property of `KafkaSupervisorIOConfig` (see below). This situation typically occurs when messages in Kafka are no longer available for consumption and therefore won't be ingested into Druid. If set to false, ingestion for that particular partition will halt and manual intervention is required to correct the situation; please see the `Reset Supervisor` API below.|no (default == false)|
|`workerThreads`|Integer|The number of threads that will be used by the supervisor for asynchronous operations.|no (default == min(10, taskCount))|
|`chatThreads`|Integer|The number of threads that will be used for communicating with indexing tasks.|no (default == min(10, taskCount * replicas))|
|`chatRetries`|Integer|The number of times HTTP requests to indexing tasks will be retried before considering tasks unresponsive.|no (default == 8)|
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ public class KafkaIOConfig implements IOConfig
{
private static final boolean DEFAULT_USE_TRANSACTION = true;
private static final boolean DEFAULT_PAUSE_AFTER_READ = false;
private static final boolean DEFAULT_USE_EARLIEST_OFFSET = false;

private final String baseSequenceName;
private final KafkaPartitions startPartitions;
Expand All @@ -40,6 +41,7 @@ public class KafkaIOConfig implements IOConfig
private final boolean useTransaction;
private final boolean pauseAfterRead;
private final Optional<DateTime> minimumMessageTime;
private final boolean useEarliestOffset;

@JsonCreator
public KafkaIOConfig(
Expand All @@ -49,7 +51,8 @@ public KafkaIOConfig(
@JsonProperty("consumerProperties") Map<String, String> consumerProperties,
@JsonProperty("useTransaction") Boolean useTransaction,
@JsonProperty("pauseAfterRead") Boolean pauseAfterRead,
@JsonProperty("minimumMessageTime") DateTime minimumMessageTime
@JsonProperty("minimumMessageTime") DateTime minimumMessageTime,
@JsonProperty("useEarliestOffset") Boolean useEarliestOffset
)
{
this.baseSequenceName = Preconditions.checkNotNull(baseSequenceName, "baseSequenceName");
Expand All @@ -59,6 +62,7 @@ public KafkaIOConfig(
this.useTransaction = useTransaction != null ? useTransaction : DEFAULT_USE_TRANSACTION;
this.pauseAfterRead = pauseAfterRead != null ? pauseAfterRead : DEFAULT_PAUSE_AFTER_READ;
this.minimumMessageTime = Optional.fromNullable(minimumMessageTime);
this.useEarliestOffset = useEarliestOffset != null ? useEarliestOffset : DEFAULT_USE_EARLIEST_OFFSET;

Preconditions.checkArgument(
startPartitions.getTopic().equals(endPartitions.getTopic()),
Expand Down Expand Up @@ -122,6 +126,12 @@ public Optional<DateTime> getMinimumMessageTime()
return minimumMessageTime;
}

/**
 * Returns the {@code useEarliestOffset} flag of this IO config.
 *
 * The constructor stores the supplied value, or {@code false} when the JSON property is absent.
 *
 * @return the configured {@code useEarliestOffset} value
 */
@JsonProperty
public boolean isUseEarliestOffset()
{
return useEarliestOffset;
}

@Override
public String toString()
{
Expand All @@ -133,6 +143,7 @@ public String toString()
", useTransaction=" + useTransaction +
", pauseAfterRead=" + pauseAfterRead +
", minimumMessageTime=" + minimumMessageTime +
", useEarliestOffest=" + useEarliestOffset +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.primitives.Ints;

import com.metamx.emitter.EmittingLogger;
import io.druid.data.input.Committer;
import io.druid.data.input.InputRow;
import io.druid.data.input.impl.InputRowParser;
Expand All @@ -52,7 +52,6 @@
import io.druid.indexing.common.task.TaskResource;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.guava.Sequence;
import io.druid.java.util.common.logger.Logger;
import io.druid.java.util.common.parsers.ParseException;
import io.druid.query.DruidMetrics;
import io.druid.query.NoopQueryRunner;
Expand Down Expand Up @@ -116,7 +115,7 @@ public enum Status
PUBLISHING
}

private static final Logger log = new Logger(KafkaIndexTask.class);
private static final EmittingLogger log = new EmittingLogger(KafkaIndexTask.class);
private static final String TYPE = "index_kafka";
private static final Random RANDOM = new Random();
private static final long POLL_TIMEOUT = 100;
Expand Down Expand Up @@ -386,17 +385,9 @@ public void run()
records = consumer.poll(POLL_TIMEOUT);
}
catch (OffsetOutOfRangeException e) {
log.warn("OffsetOutOfRangeException with message [%s], retrying in %dms", e.getMessage(), POLL_RETRY_MS);
pollRetryLock.lockInterruptibly();
try {
long nanos = TimeUnit.MILLISECONDS.toNanos(POLL_RETRY_MS);
while (nanos > 0L && !pauseRequested && !stopRequested) {
nanos = isAwaitingRetry.awaitNanos(nanos);
}
}
finally {
pollRetryLock.unlock();
}
log.warn("OffsetOutOfRangeException with message [%s]", e.getMessage());
possiblyResetOffsetsOrWait(e.offsetOutOfRangePartitions(), consumer, assignment);
stillReading = ioConfig.isPauseAfterRead() || !assignment.isEmpty();
}

for (ConsumerRecord<byte[], byte[]> record : records) {
Expand Down Expand Up @@ -1000,4 +991,78 @@ private boolean possiblyPause(Set<Integer> assignment) throws InterruptedExcepti

return false;
}

/**
 * Reacts to a set of partitions whose requested offsets were reported as out of range.
 *
 * When {@code tuningConfig.isResetOffsetAutomatically()} is true, each partition is probed for its
 * earliest available offset. If that offset is greater than the offset we asked for, the partition
 * is handed to {@link #resetOffset}; otherwise a retry is scheduled. When automatic reset is
 * disabled, a retry is always scheduled.
 *
 * A scheduled retry waits on {@code isAwaitingRetry} until {@code POLL_RETRY_MS} milliseconds have
 * elapsed, or until a pause or stop request is observed.
 *
 * @param outOfRangePartitions partitions mapped to the offset that could not be fetched
 * @param consumer             consumer used to probe and reposition the partitions
 * @param assignment           currently assigned partition ids; passed through to {@code resetOffset}
 *
 * @throws InterruptedException if interrupted while acquiring the retry lock or while waiting
 */
private void possiblyResetOffsetsOrWait(
    Map<TopicPartition, Long> outOfRangePartitions,
    KafkaConsumer<byte[], byte[]> consumer,
    Set<Integer> assignment
) throws InterruptedException
{
  final boolean autoReset = tuningConfig.isResetOffsetAutomatically();
  // With automatic reset disabled the only option is to wait and poll again.
  boolean waitBeforeRetry = !autoReset;

  if (autoReset) {
    for (Map.Entry<TopicPartition, Long> entry : outOfRangePartitions.entrySet()) {
      final TopicPartition partition = entry.getKey();
      final long requestedOffset = entry.getValue();

      // Jump to the start of the partition only to read the earliest available offset,
      // then put the consumer back where it was.
      consumer.seekToBeginning(partition);
      final long earliestOffset = consumer.position(partition);
      consumer.seek(partition, requestedOffset);

      if (earliestOffset > requestedOffset) {
        // The offset we wanted is before the earliest available one: reset this partition.
        resetOffset(consumer, assignment, partition);
      } else {
        waitBeforeRetry = true;
      }
    }
  }

  if (!waitBeforeRetry) {
    return;
  }

  log.warn("Retrying in %dms", POLL_RETRY_MS);
  pollRetryLock.lockInterruptibly();
  try {
    long remainingNanos = TimeUnit.MILLISECONDS.toNanos(POLL_RETRY_MS);
    // awaitNanos returns the time still left, so keep waiting until it runs out
    // or a pause/stop request is seen.
    while (remainingNanos > 0L && !pauseRequested && !stopRequested) {
      remainingNanos = isAwaitingRetry.awaitNanos(remainingNanos);
    }
  }
  finally {
    pollRetryLock.unlock();
  }
}

/**
 * Repositions the consumer for one partition at either the beginning or the end of that partition,
 * as selected by {@code ioConfig.isUseEarliestOffset()}, and records the resulting position in
 * {@code nextOffsets}.
 *
 * If the new position is at or beyond the partition's entry in {@code endOffsets}, the partition is
 * removed from {@code assignment}. {@code assignPartitions} is then always invoked with the
 * (possibly updated) assignment.
 *
 * @param consumer       consumer to reposition
 * @param assignment     currently assigned partition ids; may have this partition removed
 * @param topicPartition partition whose offset is being reset
 */
private void resetOffset(
    KafkaConsumer<byte[], byte[]> consumer,
    Set<Integer> assignment,
    TopicPartition topicPartition
)
{
  final int partitionId = topicPartition.partition();
  final boolean toEarliest = ioConfig.isUseEarliestOffset();

  log.warn(
      "Resetting consumer offset to [%s] for partition [%d]",
      toEarliest ? "earliest" : "latest",
      partitionId
  );

  if (toEarliest) {
    consumer.seekToBeginning(topicPartition);
  } else {
    consumer.seekToEnd(topicPartition);
  }

  // Remember where the consumer ended up so reading continues from there.
  nextOffsets.put(partitionId, consumer.position(topicPartition));
  log.warn("Consumer is now at offset [%d]", nextOffsets.get(partitionId));

  // The seek may have moved us past the end offset for this partition, in which case
  // there is nothing left to read from it.
  final boolean reachedEnd = nextOffsets.get(partitionId) >= endOffsets.get(partitionId);
  if (reachedEnd && assignment.remove(partitionId)) {
    log.info(
        "Finished reading topic[%s], partition[%,d].",
        topicPartition.topic(),
        partitionId
    );
  }

  // Re-apply the assignment in case the partition was dropped above.
  assignPartitions(consumer, topicPartition.topic(), assignment);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
public class KafkaTuningConfig implements TuningConfig, AppenderatorConfig
{
private static final int DEFAULT_MAX_ROWS_PER_SEGMENT = 5_000_000;
private static final boolean DEFAULT_RESET_OFFSET_AUTOMATICALLY = false;

private final int maxRowsInMemory;
private final int maxRowsPerSegment;
Expand All @@ -42,6 +43,7 @@ public class KafkaTuningConfig implements TuningConfig, AppenderatorConfig
private final boolean buildV9Directly;
private final boolean reportParseExceptions;
private final long handoffConditionTimeout;
private final boolean resetOffsetAutomatically;

@JsonCreator
public KafkaTuningConfig(
Expand All @@ -53,7 +55,8 @@ public KafkaTuningConfig(
@JsonProperty("indexSpec") IndexSpec indexSpec,
@JsonProperty("buildV9Directly") Boolean buildV9Directly,
@JsonProperty("reportParseExceptions") Boolean reportParseExceptions,
@JsonProperty("handoffConditionTimeout") Long handoffConditionTimeout
@JsonProperty("handoffConditionTimeout") Long handoffConditionTimeout,
@JsonProperty("resetOffsetAutomatically") Boolean resetOffsetAutomatically
)
{
// Cannot be a static because default basePersistDirectory is unique per-instance
Expand All @@ -74,6 +77,9 @@ public KafkaTuningConfig(
this.handoffConditionTimeout = handoffConditionTimeout == null
? defaults.getHandoffConditionTimeout()
: handoffConditionTimeout;
this.resetOffsetAutomatically = resetOffsetAutomatically == null
? DEFAULT_RESET_OFFSET_AUTOMATICALLY
: resetOffsetAutomatically;
}

public static KafkaTuningConfig copyOf(KafkaTuningConfig config)
Expand All @@ -87,7 +93,8 @@ public static KafkaTuningConfig copyOf(KafkaTuningConfig config)
config.indexSpec,
config.buildV9Directly,
config.reportParseExceptions,
config.handoffConditionTimeout
config.handoffConditionTimeout,
config.resetOffsetAutomatically
);
}

Expand Down Expand Up @@ -145,6 +152,12 @@ public long getHandoffConditionTimeout()
return handoffConditionTimeout;
}

/**
 * Returns the {@code resetOffsetAutomatically} flag of this tuning config.
 *
 * The constructor stores the supplied value, or {@code false} when the JSON property is absent.
 *
 * @return the configured {@code resetOffsetAutomatically} value
 */
@JsonProperty
public boolean isResetOffsetAutomatically()
{
return resetOffsetAutomatically;
}

public KafkaTuningConfig withBasePersistDirectory(File dir)
{
return new KafkaTuningConfig(
Expand All @@ -156,7 +169,8 @@ public KafkaTuningConfig withBasePersistDirectory(File dir)
indexSpec,
buildV9Directly,
reportParseExceptions,
handoffConditionTimeout
handoffConditionTimeout,
resetOffsetAutomatically
);
}

Expand All @@ -171,7 +185,8 @@ public KafkaTuningConfig withMaxRowsInMemory(int rows)
indexSpec,
buildV9Directly,
reportParseExceptions,
handoffConditionTimeout
handoffConditionTimeout,
resetOffsetAutomatically
);
}

Expand Down Expand Up @@ -205,6 +220,9 @@ public boolean equals(Object o)
if (handoffConditionTimeout != that.handoffConditionTimeout) {
return false;
}
if (resetOffsetAutomatically != that.resetOffsetAutomatically) {
return false;
}
if (intermediatePersistPeriod != null
? !intermediatePersistPeriod.equals(that.intermediatePersistPeriod)
: that.intermediatePersistPeriod != null) {
Expand All @@ -215,7 +233,8 @@ public boolean equals(Object o)
: that.basePersistDirectory != null) {
return false;
}
return !(indexSpec != null ? !indexSpec.equals(that.indexSpec) : that.indexSpec != null);
return indexSpec != null ? indexSpec.equals(that.indexSpec) : that.indexSpec == null;

}

@Override
Expand All @@ -230,6 +249,7 @@ public int hashCode()
result = 31 * result + (buildV9Directly ? 1 : 0);
result = 31 * result + (reportParseExceptions ? 1 : 0);
result = 31 * result + (int) (handoffConditionTimeout ^ (handoffConditionTimeout >>> 32));
result = 31 * result + (resetOffsetAutomatically ? 1 : 0);
return result;
}

Expand All @@ -246,6 +266,7 @@ public String toString()
", buildV9Directly=" + buildV9Directly +
", reportParseExceptions=" + reportParseExceptions +
", handoffConditionTimeout=" + handoffConditionTimeout +
", resetOffsetAutomatically=" + resetOffsetAutomatically +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1261,7 +1261,8 @@ private void createKafkaTasksForGroup(int groupId, int replicas)
consumerProperties,
true,
false,
minimumMessageTime
minimumMessageTime,
ioConfig.isUseEarliestOffset()
);

for (int i = 0; i < replicas; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ public KafkaSupervisorSpec(
null,
null,
null,
null,
null
);
this.ioConfig = Preconditions.checkNotNull(ioConfig, "ioConfig");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public KafkaSupervisorTuningConfig(
@JsonProperty("buildV9Directly") Boolean buildV9Directly,
@JsonProperty("reportParseExceptions") Boolean reportParseExceptions,
@JsonProperty("handoffConditionTimeout") Long handoffConditionTimeout,
@JsonProperty("resetOffsetAutomatically") Boolean resetOffsetAutomatically,
@JsonProperty("workerThreads") Integer workerThreads,
@JsonProperty("chatThreads") Integer chatThreads,
@JsonProperty("chatRetries") Long chatRetries,
Expand All @@ -61,7 +62,8 @@ public KafkaSupervisorTuningConfig(
indexSpec,
buildV9Directly,
reportParseExceptions,
handoffConditionTimeout
handoffConditionTimeout,
resetOffsetAutomatically
);

this.workerThreads = workerThreads;
Expand Down Expand Up @@ -114,6 +116,7 @@ public String toString()
", buildV9Directly=" + getBuildV9Directly() +
", reportParseExceptions=" + isReportParseExceptions() +
", handoffConditionTimeout=" + getHandoffConditionTimeout() +
", resetOffsetAutomatically=" + isResetOffsetAutomatically() +
", workerThreads=" + workerThreads +
", chatThreads=" + chatThreads +
", chatRetries=" + chatRetries +
Expand Down
Loading

0 comments on commit 7ee6bb7

Please sign in to comment.