Kafka task minimum message time (apache#3035)
* add KafkaIndexTask support for minimumMessageTime

* add Kafka supervisor support for lateMessageRejectionPeriod
dclim authored and gianm committed May 31, 2016
1 parent e662efa commit f6c39cc
Showing 10 changed files with 850 additions and 76 deletions.
@@ -139,6 +139,7 @@ The tuningConfig is optional and default parameters will be used if no tuningCon
|`period`|ISO8601 Period|How often the supervisor will execute its management logic. Note that the supervisor will also run in response to certain events (such as tasks succeeding, failing, and reaching their taskDuration), so this value specifies the maximum time between iterations.|no (default == PT30S)|
|`useEarliestOffset`|Boolean|If a supervisor is managing a dataSource for the first time, it will obtain a set of starting offsets from Kafka. This flag determines whether it retrieves the earliest or latest offsets in Kafka. Under normal circumstances, subsequent tasks will start from where the previous segments ended, so this flag will only be used on first run.|no (default == false)|
|`completionTimeout`|ISO8601 Period|The length of time to wait before declaring a publishing task as failed and terminating it. If this is set too low, your tasks may never publish. The publishing clock for a task begins roughly after `taskDuration` elapses.|no (default == PT30M)|
|`lateMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject messages with timestamps earlier than this period before the task was created; for example, if this is set to `PT1H` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with timestamps earlier than *2016-01-01T11:00Z* will be dropped. This may help prevent concurrency issues if your data stream has late messages and you have multiple pipelines that need to operate on the same segments (e.g. a realtime and a nightly batch ingestion pipeline).|no (default == none)|
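
As a concrete check of the arithmetic in the `lateMessageRejectionPeriod` row above, the cutoff can be reproduced with Joda-Time, the date/time library used throughout this patch. A minimal sketch using the exact values from the example; the class and variable names are illustrative only:

```java
import org.joda.time.DateTime;
import org.joda.time.Period;

public class LateMessageCutoffSketch
{
  public static void main(String[] args)
  {
    // Values from the doc example: lateMessageRejectionPeriod = PT1H,
    // task created at 2016-01-01T12:00Z.
    DateTime taskCreationTime = new DateTime("2016-01-01T12:00Z");
    DateTime cutoff = taskCreationTime.minus(new Period("PT1H")); // 2016-01-01T11:00Z

    // A message stamped before the cutoff is dropped by the task.
    DateTime messageTime = new DateTime("2016-01-01T10:30Z");
    System.out.println(messageTime.isBefore(cutoff)); // true -> dropped
  }
}
```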

## Supervisor API

@@ -21,8 +21,10 @@

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import io.druid.segment.indexing.IOConfig;
import org.joda.time.DateTime;

import java.util.Map;

@@ -37,6 +39,7 @@ public class KafkaIOConfig implements IOConfig
private final Map<String, String> consumerProperties;
private final boolean useTransaction;
private final boolean pauseAfterRead;
private final Optional<DateTime> minimumMessageTime;

@JsonCreator
public KafkaIOConfig(
@@ -45,7 +48,8 @@ public KafkaIOConfig(
@JsonProperty("endPartitions") KafkaPartitions endPartitions,
@JsonProperty("consumerProperties") Map<String, String> consumerProperties,
@JsonProperty("useTransaction") Boolean useTransaction,
@JsonProperty("pauseAfterRead") Boolean pauseAfterRead
@JsonProperty("pauseAfterRead") Boolean pauseAfterRead,
@JsonProperty("minimumMessageTime") DateTime minimumMessageTime
)
{
this.baseSequenceName = Preconditions.checkNotNull(baseSequenceName, "baseSequenceName");
@@ -54,6 +58,7 @@ public KafkaIOConfig(
this.consumerProperties = Preconditions.checkNotNull(consumerProperties, "consumerProperties");
this.useTransaction = useTransaction != null ? useTransaction : DEFAULT_USE_TRANSACTION;
this.pauseAfterRead = pauseAfterRead != null ? pauseAfterRead : DEFAULT_PAUSE_AFTER_READ;
this.minimumMessageTime = Optional.fromNullable(minimumMessageTime);

Preconditions.checkArgument(
startPartitions.getTopic().equals(endPartitions.getTopic()),
@@ -111,6 +116,12 @@ public boolean isPauseAfterRead()
return pauseAfterRead;
}

@JsonProperty
public Optional<DateTime> getMinimumMessageTime()
{
return minimumMessageTime;
}

@Override
public String toString()
{
@@ -121,6 +132,7 @@ public String toString()
", consumerProperties=" + consumerProperties +
", useTransaction=" + useTransaction +
", pauseAfterRead=" + pauseAfterRead +
", minimumMessageTime=" + minimumMessageTime +
'}';
}
}
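
For context on how the new parameter slots in, here is a hedged sketch of constructing the extended `KafkaIOConfig`. The parameter order and types come from the constructor above; the topic, offsets, and consumer properties are hypothetical placeholders:

```java
import com.google.common.collect.ImmutableMap;
import org.joda.time.DateTime;

public class KafkaIOConfigSketch
{
  static KafkaIOConfig buildExample()
  {
    return new KafkaIOConfig(
        "index_kafka_mydatasource_sequence_0",                     // baseSequenceName
        new KafkaPartitions("mytopic", ImmutableMap.of(0, 0L)),    // startPartitions
        new KafkaPartitions("mytopic", ImmutableMap.of(0, 5000L)), // endPartitions
        ImmutableMap.of("bootstrap.servers", "localhost:9092"),    // consumerProperties
        true,                                                      // useTransaction
        false,                                                     // pauseAfterRead
        new DateTime("2016-01-01T11:00Z")                          // minimumMessageTime (nullable)
    );
  }
}
```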
@@ -51,7 +51,6 @@
import io.druid.indexing.appenderator.ActionBasedUsedSegmentChecker;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.TaskToolbox;
import io.druid.indexing.common.actions.SegmentInsertAction;
import io.druid.indexing.common.actions.SegmentTransactionalInsertAction;
import io.druid.indexing.common.actions.TaskActionClient;
import io.druid.indexing.common.task.AbstractTask;
@@ -411,20 +410,27 @@ public boolean apply(Throwable input)

try {
final InputRow row = Preconditions.checkNotNull(parser.parse(ByteBuffer.wrap(record.value())), "row");
final SegmentIdentifier identifier = driver.add(
row,
sequenceNames.get(record.partition()),
committerSupplier
);

if (identifier == null) {
// Failure to allocate segment puts determinism at risk, bail out to be safe.
// May want configurable behavior here at some point.
// If we allow continuing, then consider blacklisting the interval for a while to avoid constant checks.
throw new ISE("Could not allocate segment for row with timestamp[%s]", row.getTimestamp());
}
if (!ioConfig.getMinimumMessageTime().isPresent() ||
!ioConfig.getMinimumMessageTime().get().isAfter(row.getTimestamp())) {

final SegmentIdentifier identifier = driver.add(
row,
sequenceNames.get(record.partition()),
committerSupplier
);

fireDepartmentMetrics.incrementProcessed();
if (identifier == null) {
// Failure to allocate segment puts determinism at risk, bail out to be safe.
// May want configurable behavior here at some point.
// If we allow continuing, then consider blacklisting the interval for a while to avoid constant checks.
throw new ISE("Could not allocate segment for row with timestamp[%s]", row.getTimestamp());
}

fireDepartmentMetrics.incrementProcessed();
} else {
fireDepartmentMetrics.incrementThrownAway();
}
}
catch (ParseException e) {
if (tuningConfig.isReportParseExceptions()) {
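
The accept/drop condition threaded through the hunk above reduces to a single predicate. A minimal restatement, not part of the patch:

```java
import com.google.common.base.Optional;
import org.joda.time.DateTime;

public class MinimumMessageTimeFilter
{
  // A row is ingested unless minimumMessageTime is present and strictly after the
  // row's timestamp; a row stamped exactly at the cutoff is still accepted.
  static boolean shouldIngest(Optional<DateTime> minimumMessageTime, DateTime rowTimestamp)
  {
    return !minimumMessageTime.isPresent() || !minimumMessageTime.get().isAfter(rowTimestamp);
  }
}
```

Rows that fail the check are counted through `incrementThrownAway()` rather than `incrementProcessed()`, so late-message drops stay visible in the task's metrics.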
@@ -37,8 +37,8 @@
import com.metamx.common.ISE;
import com.metamx.emitter.EmittingLogger;
import io.druid.concurrent.Execs;
import io.druid.indexing.common.TaskLocation;
import io.druid.indexing.common.TaskInfoProvider;
import io.druid.indexing.common.TaskLocation;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.task.Task;
import io.druid.indexing.common.task.TaskResource;
@@ -113,11 +113,13 @@ private class TaskGroup
final Map<Integer, Long> partitionOffsets;

final Map<String, TaskData> tasks = new HashMap<>();
final Optional<DateTime> minimumMessageTime;
DateTime completionTimeout; // is set after signalTasksToFinish(); if not done by timeout, take corrective action

public TaskGroup(Map<Integer, Long> partitionOffsets)
public TaskGroup(Map<Integer, Long> partitionOffsets, Optional<DateTime> minimumMessageTime)
{
this.partitionOffsets = partitionOffsets;
this.minimumMessageTime = minimumMessageTime;
}
}

@@ -450,6 +452,9 @@ String generateSequenceName(int groupId)
}
String partitionOffsetStr = sb.toString().substring(1);

Optional<DateTime> minimumMessageTime = taskGroups.get(groupId).minimumMessageTime;
String minMsgTimeStr = (minimumMessageTime.isPresent() ? String.valueOf(minimumMessageTime.get().getMillis()) : "");

String dataSchema, tuningConfig;
try {
dataSchema = sortingMapper.writeValueAsString(spec.getDataSchema());
@@ -459,7 +464,8 @@ String generateSequenceName(int groupId)
throw Throwables.propagate(e);
}

String hashCode = DigestUtils.sha1Hex(dataSchema + tuningConfig + partitionOffsetStr).substring(0, 15);
String hashCode = DigestUtils.sha1Hex(dataSchema + tuningConfig + partitionOffsetStr + minMsgTimeStr)
.substring(0, 15);

return Joiner.on("_").join("index_kafka", dataSource, hashCode);
}
@@ -599,11 +605,19 @@ private void discoverTasks()
log.debug("Creating new task group [%d]", taskGroupId);
taskGroups.put(
taskGroupId,
new TaskGroup(kafkaTask.getIOConfig().getStartPartitions().getPartitionOffsetMap())
new TaskGroup(
kafkaTask.getIOConfig().getStartPartitions().getPartitionOffsetMap(),
kafkaTask.getIOConfig().getMinimumMessageTime()
)
);
}

taskGroups.get(taskGroupId).tasks.put(taskId, new TaskData());
if (!isTaskCurrent(taskGroupId, taskId)) {
log.info("Stopping task [%s] which does not match the expected parameters and ingestion spec", taskId);
stopTask(taskId, false);
} else {
taskGroups.get(taskGroupId).tasks.put(taskId, new TaskData());
}
}
}
}
@@ -635,7 +649,11 @@ private void addDiscoveredTaskToPendingCompletionTaskGroups(
}

log.info("Creating new pending completion task group for discovered task [%s]", taskId);
TaskGroup newTaskGroup = new TaskGroup(startingPartitions);

// reading the minimumMessageTime from the publishing task and setting it here is not necessary as this task cannot
// change to a state where it will read any more events
TaskGroup newTaskGroup = new TaskGroup(startingPartitions, Optional.<DateTime>absent());

newTaskGroup.tasks.put(taskId, new TaskData());
newTaskGroup.completionTimeout = DateTime.now().plus(ioConfig.getCompletionTimeout());

@@ -889,7 +907,8 @@ private void checkCurrentTaskState()
TaskGroup taskGroup = taskGroupEntry.getValue();

// Iterate the list of known tasks in this group and:
// 1) Kill any tasks which are not "current" (have the partitions and starting offsets in [taskGroups]
// 1) Kill any tasks which are not "current" (have the partitions, starting offsets, and minimumMessageTime
// (if applicable) in [taskGroups])
// 2) Remove any tasks that have failed from the list
// 3) If any task completed successfully, stop all the tasks in this group and move to the next group

@@ -933,7 +952,12 @@ void createNewTasks()
for (Integer groupId : partitionGroups.keySet()) {
if (!taskGroups.containsKey(groupId)) {
log.info("Creating new task group [%d] for partitions %s", groupId, partitionGroups.get(groupId).keySet());
taskGroups.put(groupId, new TaskGroup(generateStartingOffsetsForPartitionGroup(groupId)));

Optional<DateTime> minimumMessageTime = (ioConfig.getLateMessageRejectionPeriod().isPresent() ? Optional.of(
DateTime.now().minus(ioConfig.getLateMessageRejectionPeriod().get())
) : Optional.<DateTime>absent());

taskGroups.put(groupId, new TaskGroup(generateStartingOffsetsForPartitionGroup(groupId), minimumMessageTime));
}
}

@@ -971,14 +995,16 @@ private void createKafkaTasksForGroup(int groupId, int replicas)
String sequenceName = generateSequenceName(groupId);

Map<String, String> consumerProperties = Maps.newHashMap(ioConfig.getConsumerProperties());
DateTime minimumMessageTime = taskGroups.get(groupId).minimumMessageTime.orNull();

KafkaIOConfig kafkaIOConfig = new KafkaIOConfig(
sequenceName,
new KafkaPartitions(ioConfig.getTopic(), startPartitions),
new KafkaPartitions(ioConfig.getTopic(), endPartitions),
consumerProperties,
true,
false
false,
minimumMessageTime
);

for (int i = 0; i < replicas; i++) {
@@ -1098,7 +1124,8 @@ private long getOffsetFromKafkaForPartition(int partition, boolean useEarliestOf

/**
* Compares the sequence name from the task with one generated for the task's group ID and returns false if they do
* not match. The sequence name is generated from a hash of the dataSchema, tuningConfig, and starting offsets.
* not match. The sequence name is generated from a hash of the dataSchema, tuningConfig, starting offsets, and the
* minimumMessageTime if set.
*/
private boolean isTaskCurrent(int taskGroupId, String taskId)
{
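
The sequence-name scheme that `isTaskCurrent()` relies on is compact enough to restate standalone. In the sketch below the inputs are placeholders, but the hashing mirrors the `generateSequenceName()` hunk above:

```java
import com.google.common.base.Joiner;
import org.apache.commons.codec.digest.DigestUtils;

public class SequenceNameSketch
{
  public static void main(String[] args)
  {
    // Placeholder inputs; the real method serializes the spec's dataSchema and
    // tuningConfig and concatenates the task group's partition/offset pairs.
    String dataSchema = "{\"dataSource\":\"mydatasource\"}";
    String tuningConfig = "{}";
    String partitionOffsetStr = "0:100";
    String minMsgTimeStr = "1451646000000"; // epoch millis; empty string when unset

    String hashCode = DigestUtils.sha1Hex(dataSchema + tuningConfig + partitionOffsetStr + minMsgTimeStr)
                                 .substring(0, 15);

    // Because minMsgTimeStr participates in the hash, a task created with a different
    // (or absent) minimumMessageTime gets a different name and is stopped as not current.
    System.out.println(Joiner.on("_").join("index_kafka", "mydatasource", hashCode));
  }
}
```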
@@ -21,6 +21,7 @@

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import org.joda.time.Duration;
import org.joda.time.Period;
@@ -40,6 +41,7 @@ public class KafkaSupervisorIOConfig
private final Duration period;
private final Boolean useEarliestOffset;
private final Duration completionTimeout;
private final Optional<Duration> lateMessageRejectionPeriod;

@JsonCreator
public KafkaSupervisorIOConfig(
@@ -51,7 +53,8 @@ public KafkaSupervisorIOConfig(
@JsonProperty("startDelay") Period startDelay,
@JsonProperty("period") Period period,
@JsonProperty("useEarliestOffset") Boolean useEarliestOffset,
@JsonProperty("completionTimeout") Period completionTimeout
@JsonProperty("completionTimeout") Period completionTimeout,
@JsonProperty("lateMessageRejectionPeriod") Period lateMessageRejectionPeriod
)
{
this.topic = Preconditions.checkNotNull(topic, "topic");
@@ -68,6 +71,9 @@
this.period = defaultDuration(period, "PT30S");
this.useEarliestOffset = (useEarliestOffset != null ? useEarliestOffset : false);
this.completionTimeout = defaultDuration(completionTimeout, "PT30M");
this.lateMessageRejectionPeriod = (lateMessageRejectionPeriod == null
? Optional.<Duration>absent()
: Optional.of(lateMessageRejectionPeriod.toStandardDuration()));
}

@JsonProperty
@@ -124,6 +130,12 @@ public Duration getCompletionTimeout()
return completionTimeout;
}

@JsonProperty
public Optional<Duration> getLateMessageRejectionPeriod()
{
return lateMessageRejectionPeriod;
}

private static Duration defaultDuration(final Period period, final String theDefault)
{
return (period == null ? new Period(theDefault) : period).toStandardDuration();
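
Tying the supervisor-side setting to the task-side cutoff: whenever the supervisor creates a task group, the optional `lateMessageRejectionPeriod` is converted into a concrete `minimumMessageTime`. A condensed restatement of the `createNewTasks()` logic shown earlier, assuming only the getter above:

```java
import com.google.common.base.Optional;
import org.joda.time.DateTime;
import org.joda.time.Duration;

public class CutoffDerivationSketch
{
  // Derives the per-task-group cutoff at task group creation time; absent in, absent out.
  static Optional<DateTime> computeMinimumMessageTime(Optional<Duration> lateMessageRejectionPeriod)
  {
    return lateMessageRejectionPeriod.isPresent()
           ? Optional.of(DateTime.now().minus(lateMessageRejectionPeriod.get()))
           : Optional.<DateTime>absent();
  }
}
```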