diff --git a/common/src/main/java/io/druid/metadata/MetadataStorageConnector.java b/common/src/main/java/io/druid/metadata/MetadataStorageConnector.java
index 497ff75f85a5..3d72fe3496cc 100644
--- a/common/src/main/java/io/druid/metadata/MetadataStorageConnector.java
+++ b/common/src/main/java/io/druid/metadata/MetadataStorageConnector.java
@@ -31,7 +31,6 @@ Void insertOrUpdate(
final byte[] value
) throws Exception;
-
byte[] lookup(
final String tableName,
final String keyColumn,
@@ -39,6 +38,8 @@ byte[] lookup(
final String key
);
+ void createDataSourceTable();
+
void createPendingSegmentsTable();
void createSegmentTable();
diff --git a/common/src/main/java/io/druid/metadata/MetadataStorageTablesConfig.java b/common/src/main/java/io/druid/metadata/MetadataStorageTablesConfig.java
index 8552ea45826a..20c870a8e070 100644
--- a/common/src/main/java/io/druid/metadata/MetadataStorageTablesConfig.java
+++ b/common/src/main/java/io/druid/metadata/MetadataStorageTablesConfig.java
@@ -31,7 +31,7 @@ public class MetadataStorageTablesConfig
{
public static MetadataStorageTablesConfig fromBase(String base)
{
- return new MetadataStorageTablesConfig(base, null, null, null, null, null, null, null, null);
+ return new MetadataStorageTablesConfig(base, null, null, null, null, null, null, null, null, null);
}
public static final String TASK_ENTRY_TYPE = "task";
@@ -45,6 +45,9 @@ public static MetadataStorageTablesConfig fromBase(String base)
@JsonProperty("base")
private final String base;
+ @JsonProperty("dataSource")
+ private final String dataSourceTable;
+
@JsonProperty("pendingSegments")
private final String pendingSegmentsTable;
@@ -72,6 +75,7 @@ public static MetadataStorageTablesConfig fromBase(String base)
@JsonCreator
public MetadataStorageTablesConfig(
@JsonProperty("base") String base,
+ @JsonProperty("dataSource") String dataSourceTable,
@JsonProperty("pendingSegments") String pendingSegmentsTable,
@JsonProperty("segments") String segmentsTable,
@JsonProperty("rules") String rulesTable,
@@ -83,6 +87,7 @@ public MetadataStorageTablesConfig(
)
{
this.base = (base == null) ? DEFAULT_BASE : base;
+ this.dataSourceTable = makeTableName(dataSourceTable, "dataSource");
this.pendingSegmentsTable = makeTableName(pendingSegmentsTable, "pendingSegments");
this.segmentsTable = makeTableName(segmentsTable, "segments");
this.rulesTable = makeTableName(rulesTable, "rules");
@@ -115,6 +120,11 @@ public String getBase()
return base;
}
+ public String getDataSourceTable()
+ {
+ return dataSourceTable;
+ }
+
public String getPendingSegmentsTable()
{
return pendingSegmentsTable;
diff --git a/distribution/pom.xml b/distribution/pom.xml
index eb6d4e20f569..2e089af916c6 100644
--- a/distribution/pom.xml
+++ b/distribution/pom.xml
@@ -81,6 +81,8 @@
 <argument>-c</argument>
 <argument>io.druid.extensions:druid-kafka-extraction-namespace</argument>
 <argument>-c</argument>
+<argument>io.druid.extensions:druid-kafka-indexing-service</argument>
+<argument>-c</argument>
 <argument>io.druid.extensions:mysql-metadata-storage</argument>
 <argument>-c</argument>
 <argument>io.druid.extensions:druid-namespace-lookup</argument>
diff --git a/extensions-core/kafka-indexing-service/pom.xml b/extensions-core/kafka-indexing-service/pom.xml
new file mode 100644
index 000000000000..7b32b5bf48b0
--- /dev/null
+++ b/extensions-core/kafka-indexing-service/pom.xml
@@ -0,0 +1,91 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to Metamarkets Group Inc. (Metamarkets) under one
+  ~ or more contributor license agreements. See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership. Metamarkets licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License. You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied. See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <groupId>io.druid.extensions</groupId>
+  <artifactId>druid-kafka-indexing-service</artifactId>
+  <name>druid-kafka-indexing-service</name>
+  <description>druid-kafka-indexing-service</description>
+
+  <parent>
+    <groupId>io.druid</groupId>
+    <artifactId>druid</artifactId>
+    <version>0.9.0-SNAPSHOT</version>
+    <relativePath>../../pom.xml</relativePath>
+  </parent>
+
+  <dependencies>
+    <dependency>
+      <groupId>io.druid</groupId>
+      <artifactId>druid-api</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>io.druid</groupId>
+      <artifactId>druid-indexing-service</artifactId>
+      <version>0.9.0-SNAPSHOT</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka-clients</artifactId>
+      <version>0.9.0.0</version>
+    </dependency>
+
+    <!-- Tests -->
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka_2.11</artifactId>
+      <version>0.9.0.0</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>io.druid</groupId>
+      <artifactId>druid-server</artifactId>
+      <version>0.9.0-SNAPSHOT</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>io.druid</groupId>
+      <artifactId>druid-indexing-service</artifactId>
+      <version>0.9.0-SNAPSHOT</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.curator</groupId>
+      <artifactId>curator-test</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.easymock</groupId>
+      <artifactId>easymock</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+</project>
diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaDataSourceMetadata.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaDataSourceMetadata.java
new file mode 100644
index 000000000000..84b61dca2b09
--- /dev/null
+++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaDataSourceMetadata.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.indexing.kafka;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.Maps;
+import com.metamx.common.IAE;
+import io.druid.indexing.overlord.DataSourceMetadata;
+
+import java.util.Map;
+import java.util.Objects;
+
+public class KafkaDataSourceMetadata implements DataSourceMetadata
+{
+ private final KafkaPartitions kafkaPartitions;
+
+ @JsonCreator
+ public KafkaDataSourceMetadata(
+ @JsonProperty("partitions") KafkaPartitions kafkaPartitions
+ )
+ {
+ this.kafkaPartitions = kafkaPartitions;
+ }
+
+ @JsonProperty("partitions")
+ public KafkaPartitions getKafkaPartitions()
+ {
+ return kafkaPartitions;
+ }
+
+ @Override
+ public boolean isValidStart()
+ {
+ return true;
+ }
+
+ @Override
+ public boolean matches(DataSourceMetadata other)
+ {
+ if (getClass() != other.getClass()) {
+ return false;
+ }
+
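+ // Two instances "match" when merging them in either order yields the same result, i.e. their
+ // partition offset maps agree on every partition they have in common.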
+ return plus(other).equals(other.plus(this));
+ }
+
+ @Override
+ public DataSourceMetadata plus(DataSourceMetadata other)
+ {
+ if (!(other instanceof KafkaDataSourceMetadata)) {
+ throw new IAE(
+ "Expected instance of %s, got %s",
+ KafkaDataSourceMetadata.class.getCanonicalName(),
+ other.getClass().getCanonicalName()
+ );
+ }
+
+ final KafkaDataSourceMetadata that = (KafkaDataSourceMetadata) other;
+
+ if (that.getKafkaPartitions().getTopic().equals(kafkaPartitions.getTopic())) {
+ // Same topic, merge offsets.
+ final Map<Integer, Long> newMap = Maps.newHashMap();
+
+ for (Map.Entry<Integer, Long> entry : kafkaPartitions.getPartitionOffsetMap().entrySet()) {
+ newMap.put(entry.getKey(), entry.getValue());
+ }
+
+ for (Map.Entry<Integer, Long> entry : that.getKafkaPartitions().getPartitionOffsetMap().entrySet()) {
+ newMap.put(entry.getKey(), entry.getValue());
+ }
+
+ return new KafkaDataSourceMetadata(new KafkaPartitions(kafkaPartitions.getTopic(), newMap));
+ } else {
+ // Different topic, prefer "other".
+ return other;
+ }
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ KafkaDataSourceMetadata that = (KafkaDataSourceMetadata) o;
+ return Objects.equals(kafkaPartitions, that.kafkaPartitions);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(kafkaPartitions);
+ }
+
+ @Override
+ public String toString()
+ {
+ return "KafkaDataSourceMetadata{" +
+ "kafkaPartitions=" + kafkaPartitions +
+ '}';
+ }
+}
diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIOConfig.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIOConfig.java
new file mode 100644
index 000000000000..550465e6ef68
--- /dev/null
+++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIOConfig.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.indexing.kafka;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+import io.druid.segment.indexing.IOConfig;
+
+import java.util.Map;
+
+public class KafkaIOConfig implements IOConfig
+{
+ private static final boolean DEFAULT_USE_TRANSACTION = true;
+
+ private final String sequenceName;
+ private final KafkaPartitions startPartitions;
+ private final KafkaPartitions endPartitions;
+ private final Map<String, String> consumerProperties;
+ private final boolean useTransaction;
+
+ @JsonCreator
+ public KafkaIOConfig(
+ @JsonProperty("sequenceName") String sequenceName,
+ @JsonProperty("startPartitions") KafkaPartitions startPartitions,
+ @JsonProperty("endPartitions") KafkaPartitions endPartitions,
+ @JsonProperty("consumerProperties") Map<String, String> consumerProperties,
+ @JsonProperty("useTransaction") Boolean useTransaction
+ )
+ {
+ this.sequenceName = Preconditions.checkNotNull(sequenceName, "sequenceName");
+ this.startPartitions = Preconditions.checkNotNull(startPartitions, "startPartitions");
+ this.endPartitions = Preconditions.checkNotNull(endPartitions, "endPartitions");
+ this.consumerProperties = Preconditions.checkNotNull(consumerProperties, "consumerProperties");
+ this.useTransaction = useTransaction != null ? useTransaction : DEFAULT_USE_TRANSACTION;
+
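+ // Sanity checks: the start and end descriptors must cover the same topic and the same set of
+ // partitions, and no end offset may be smaller than its corresponding start offset.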
+ Preconditions.checkArgument(
+ startPartitions.getTopic().equals(endPartitions.getTopic()),
+ "start topic and end topic must match"
+ );
+
+ Preconditions.checkArgument(
+ startPartitions.getPartitionOffsetMap().keySet().equals(endPartitions.getPartitionOffsetMap().keySet()),
+ "start partition set and end partition set must match"
+ );
+
+ for (int partition : endPartitions.getPartitionOffsetMap().keySet()) {
+ Preconditions.checkArgument(
+ endPartitions.getPartitionOffsetMap().get(partition) >= startPartitions.getPartitionOffsetMap()
+ .get(partition),
+ "end offset must be >= start offset for partition[%d]",
+ partition
+ );
+ }
+ }
+
+ @JsonProperty
+ public String getSequenceName()
+ {
+ return sequenceName;
+ }
+
+ @JsonProperty
+ public KafkaPartitions getStartPartitions()
+ {
+ return startPartitions;
+ }
+
+ @JsonProperty
+ public KafkaPartitions getEndPartitions()
+ {
+ return endPartitions;
+ }
+
+ @JsonProperty
+ public Map<String, String> getConsumerProperties()
+ {
+ return consumerProperties;
+ }
+
+ @JsonProperty
+ public boolean isUseTransaction()
+ {
+ return useTransaction;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "KafkaIOConfig{" +
+ "sequenceName='" + sequenceName + '\'' +
+ ", startPartitions=" + startPartitions +
+ ", endPartitions=" + endPartitions +
+ ", consumerProperties=" + consumerProperties +
+ ", useTransaction=" + useTransaction +
+ '}';
+ }
+}
diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java
new file mode 100644
index 000000000000..6474ca5cd241
--- /dev/null
+++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java
@@ -0,0 +1,587 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.indexing.kafka;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.base.Supplier;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.google.common.primitives.Ints;
+import com.metamx.common.ISE;
+import com.metamx.common.RetryUtils;
+import com.metamx.common.guava.Sequence;
+import com.metamx.common.logger.Logger;
+import com.metamx.common.parsers.ParseException;
+import io.druid.data.input.Committer;
+import io.druid.data.input.InputRow;
+import io.druid.data.input.impl.InputRowParser;
+import io.druid.indexing.appenderator.ActionBasedSegmentAllocator;
+import io.druid.indexing.appenderator.ActionBasedUsedSegmentChecker;
+import io.druid.indexing.common.TaskStatus;
+import io.druid.indexing.common.TaskToolbox;
+import io.druid.indexing.common.actions.SegmentInsertAction;
+import io.druid.indexing.common.actions.TaskActionClient;
+import io.druid.indexing.common.task.AbstractTask;
+import io.druid.indexing.common.task.TaskResource;
+import io.druid.query.DruidMetrics;
+import io.druid.query.NoopQueryRunner;
+import io.druid.query.Query;
+import io.druid.query.QueryRunner;
+import io.druid.segment.indexing.DataSchema;
+import io.druid.segment.indexing.RealtimeIOConfig;
+import io.druid.segment.realtime.FireDepartment;
+import io.druid.segment.realtime.FireDepartmentMetrics;
+import io.druid.segment.realtime.RealtimeMetricsMonitor;
+import io.druid.segment.realtime.appenderator.Appenderator;
+import io.druid.segment.realtime.appenderator.Appenderators;
+import io.druid.segment.realtime.appenderator.FiniteAppenderatorDriver;
+import io.druid.segment.realtime.appenderator.SegmentIdentifier;
+import io.druid.segment.realtime.appenderator.SegmentsAndMetadata;
+import io.druid.segment.realtime.appenderator.TransactionalSegmentPublisher;
+import io.druid.timeline.DataSegment;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.Callable;
+
+public class KafkaIndexTask extends AbstractTask
+{
+ private static final Logger log = new Logger(KafkaIndexTask.class);
+ private static final String TYPE = "index_kafka";
+ private static final Random RANDOM = new Random();
+ private static final long POLL_TIMEOUT = 100;
+ private static final String METADATA_NEXT_PARTITIONS = "nextPartitions";
+
+ private final DataSchema dataSchema;
+ private final InputRowParser<ByteBuffer> parser;
+ private final KafkaTuningConfig tuningConfig;
+ private final KafkaIOConfig ioConfig;
+
+ private volatile Appenderator appenderator = null;
+ private volatile FireDepartmentMetrics fireDepartmentMetrics = null;
+ private volatile boolean startedReading = false;
+ private volatile boolean stopping = false;
+ private volatile boolean publishing = false;
+ private volatile Thread runThread = null;
+
+ @JsonCreator
+ public KafkaIndexTask(
+ @JsonProperty("id") String id,
+ @JsonProperty("resource") TaskResource taskResource,
+ @JsonProperty("dataSchema") DataSchema dataSchema,
+ @JsonProperty("tuningConfig") KafkaTuningConfig tuningConfig,
+ @JsonProperty("ioConfig") KafkaIOConfig ioConfig,
+ @JsonProperty("context") Map<String, Object> context
+ )
+ {
+ super(
+ id == null ? makeTaskId(dataSchema.getDataSource(), RANDOM.nextInt()) : id,
+ String.format("%s_%s", TYPE, dataSchema.getDataSource()),
+ taskResource,
+ dataSchema.getDataSource(),
+ context
+ );
+
+ this.dataSchema = Preconditions.checkNotNull(dataSchema, "dataSchema");
+ this.parser = Preconditions.checkNotNull((InputRowParser<ByteBuffer>) dataSchema.getParser(), "parser");
+ this.tuningConfig = Preconditions.checkNotNull(tuningConfig, "tuningConfig");
+ this.ioConfig = Preconditions.checkNotNull(ioConfig, "ioConfig");
+ }
+
+ private static String makeTaskId(String dataSource, int randomBits)
+ {
+ final StringBuilder suffix = new StringBuilder(8);
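+ // Encode the 32 random bits as eight characters in the range 'a'..'p' (4 bits per character).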
+ for (int i = 0; i < Ints.BYTES * 2; ++i) {
+ suffix.append((char) ('a' + ((randomBits >>> (i * 4)) & 0x0F)));
+ }
+ return Joiner.on("_").join(TYPE, dataSource, suffix);
+ }
+
+ @Override
+ public String getType()
+ {
+ return TYPE;
+ }
+
+ @Override
+ public boolean isReady(TaskActionClient taskActionClient) throws Exception
+ {
+ return true;
+ }
+
+ @JsonProperty
+ public DataSchema getDataSchema()
+ {
+ return dataSchema;
+ }
+
+ @JsonProperty
+ public KafkaTuningConfig getTuningConfig()
+ {
+ return tuningConfig;
+ }
+
+ @JsonProperty("ioConfig")
+ public KafkaIOConfig getIOConfig()
+ {
+ return ioConfig;
+ }
+
+ /**
+ * Public for tests.
+ */
+ @JsonIgnore
+ public boolean hasStartedReading()
+ {
+ return startedReading;
+ }
+
+ @Override
+ public TaskStatus run(final TaskToolbox toolbox) throws Exception
+ {
+ log.info("Starting up!");
+
+ runThread = Thread.currentThread();
+
+ // Set up FireDepartmentMetrics
+ final FireDepartment fireDepartmentForMetrics = new FireDepartment(
+ dataSchema,
+ new RealtimeIOConfig(null, null, null),
+ null
+ );
+ fireDepartmentMetrics = fireDepartmentForMetrics.getMetrics();
+ toolbox.getMonitorScheduler().addMonitor(
+ new RealtimeMetricsMonitor(
+ ImmutableList.of(fireDepartmentForMetrics),
+ ImmutableMap.of(DruidMetrics.TASK_ID, new String[]{getId()})
+ )
+ );
+
+ try (
+ final Appenderator appenderator0 = newAppenderator(fireDepartmentMetrics, toolbox);
+ final FiniteAppenderatorDriver driver = newDriver(appenderator0, toolbox);
+ final KafkaConsumer<byte[], byte[]> consumer = newConsumer()
+ ) {
+ appenderator = appenderator0;
+
+ final String topic = ioConfig.getStartPartitions().getTopic();
+
+ // Start up, set up initial offsets.
+ final Object restoredMetadata = driver.startJob();
+ final Map<Integer, Long> nextOffsets = Maps.newHashMap();
+ if (restoredMetadata == null) {
+ nextOffsets.putAll(ioConfig.getStartPartitions().getPartitionOffsetMap());
+ } else {
+ final Map<String, Object> restoredMetadataMap = (Map) restoredMetadata;
+ final KafkaPartitions restoredNextPartitions = toolbox.getObjectMapper().convertValue(
+ restoredMetadataMap.get(METADATA_NEXT_PARTITIONS),
+ KafkaPartitions.class
+ );
+ nextOffsets.putAll(restoredNextPartitions.getPartitionOffsetMap());
+
+ // Sanity checks.
+ if (!restoredNextPartitions.getTopic().equals(ioConfig.getStartPartitions().getTopic())) {
+ throw new ISE(
+ "WTF?! Restored topic[%s] but expected topic[%s]",
+ restoredNextPartitions.getTopic(),
+ ioConfig.getStartPartitions().getTopic()
+ );
+ }
+
+ if (!nextOffsets.keySet().equals(ioConfig.getStartPartitions().getPartitionOffsetMap().keySet())) {
+ throw new ISE(
+ "WTF?! Restored partitions[%s] but expected partitions[%s]",
+ nextOffsets.keySet(),
+ ioConfig.getStartPartitions().getPartitionOffsetMap().keySet()
+ );
+ }
+ }
+
+ // Set up committer.
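+ // Each Committer captures an immutable snapshot of nextOffsets at creation time, so the
+ // metadata persisted alongside intermediate data always reflects exactly the records added
+ // up to that point.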
+ final Supplier<Committer> committerSupplier = new Supplier<Committer>()
+ {
+ @Override
+ public Committer get()
+ {
+ final Map<Integer, Long> snapshot = ImmutableMap.copyOf(nextOffsets);
+
+ return new Committer()
+ {
+ @Override
+ public Object getMetadata()
+ {
+ return ImmutableMap.of(
+ METADATA_NEXT_PARTITIONS, new KafkaPartitions(
+ ioConfig.getStartPartitions().getTopic(),
+ snapshot
+ )
+ );
+ }
+
+ @Override
+ public void run()
+ {
+ // Do nothing.
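+ // Offsets are tracked in Druid's own commit metadata rather than committed back to Kafka
+ // (enable.auto.commit is forced to false below).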
+ }
+ };
+ }
+ };
+
+ // Initialize consumer assignment.
+ final Set<Integer> assignment = Sets.newHashSet();
+ for (Map.Entry<Integer, Long> entry : nextOffsets.entrySet()) {
+ final long endOffset = ioConfig.getEndPartitions().getPartitionOffsetMap().get(entry.getKey());
+ if (entry.getValue() < endOffset) {
+ assignment.add(entry.getKey());
+ } else if (entry.getValue() == endOffset) {
+ log.info("Finished reading partition[%d].", entry.getKey());
+ } else {
+ throw new ISE(
+ "WTF?! Cannot start from offset[%,d] > endOffset[%,d]",
+ entry.getValue(),
+ endOffset
+ );
+ }
+ }
+
+ assignPartitions(consumer, topic, assignment);
+
+ // Seek to starting offsets.
+ for (final int partition : assignment) {
+ final long offset = nextOffsets.get(partition);
+ log.info("Seeking partition[%d] to offset[%,d].", partition, offset);
+ consumer.seek(new TopicPartition(topic, partition), offset);
+ }
+
+ // Main loop.
+ // Could eventually support early termination (triggered by a supervisor)
+ // Could eventually support leader/follower mode (for keeping replicas more in sync)
+ boolean stillReading = true;
+ while (stillReading) {
+ if (stopping) {
+ log.info("Stopping early.");
+ break;
+ }
+
+ // The retrying business is because the KafkaConsumer throws OffsetOutOfRangeException if the seeked-to
+ // offset is not present in the topic-partition. This can happen if we're asking a task to read from data
+ // that has not been written yet (which is totally legitimate). So let's wait for it to show up.
+ final ConsumerRecords<byte[], byte[]> records = RetryUtils.retry(
+ new Callable<ConsumerRecords<byte[], byte[]>>()
+ {
+ @Override
+ public ConsumerRecords<byte[], byte[]> call() throws Exception
+ {
+ try {
+ return consumer.poll(POLL_TIMEOUT);
+ }
+ finally {
+ startedReading = true;
+ }
+ }
+ },
+ new Predicate<Throwable>()
+ {
+ @Override
+ public boolean apply(Throwable input)
+ {
+ return input instanceof OffsetOutOfRangeException;
+ }
+ },
+ Integer.MAX_VALUE
+ );
+
+ for (ConsumerRecord<byte[], byte[]> record : records) {
+ if (log.isTraceEnabled()) {
+ log.trace(
+ "Got topic[%s] partition[%d] offset[%,d].",
+ record.topic(),
+ record.partition(),
+ record.offset()
+ );
+ }
+
+ if (record.offset() < ioConfig.getEndPartitions().getPartitionOffsetMap().get(record.partition())) {
+ if (record.offset() != nextOffsets.get(record.partition())) {
+ throw new ISE(
+ "WTF?! Got offset[%,d] after offset[%,d] in partition[%d].",
+ record.offset(),
+ nextOffsets.get(record.partition()),
+ record.partition()
+ );
+ }
+
+ try {
+ final InputRow row = Preconditions.checkNotNull(parser.parse(ByteBuffer.wrap(record.value())), "row");
+ final SegmentIdentifier identifier = driver.add(row, committerSupplier);
+
+ if (identifier == null) {
+ // Failure to allocate segment puts determinism at risk, bail out to be safe.
+ // May want configurable behavior here at some point.
+ // If we allow continuing, then consider blacklisting the interval for a while to avoid constant checks.
+ throw new ISE("Could not allocate segment for row with timestamp[%s]", row.getTimestamp());
+ }
+
+ fireDepartmentMetrics.incrementProcessed();
+ }
+ catch (ParseException e) {
+ if (tuningConfig.isReportParseExceptions()) {
+ throw e;
+ } else {
+ log.debug(
+ e,
+ "Dropping unparseable row from partition[%d] offset[%,d].",
+ record.partition(),
+ record.offset()
+ );
+
+ fireDepartmentMetrics.incrementUnparseable();
+ }
+ }
+
+ final long nextOffset = record.offset() + 1;
+ final long endOffset = ioConfig.getEndPartitions().getPartitionOffsetMap().get(record.partition());
+
+ nextOffsets.put(record.partition(), nextOffset);
+
+ if (nextOffset == endOffset && assignment.remove(record.partition())) {
+ log.info("Finished reading topic[%s], partition[%,d].", record.topic(), record.partition());
+ assignPartitions(consumer, topic, assignment);
+ stillReading = !assignment.isEmpty();
+ }
+ }
+ }
+ }
+
+ // Persist pending data.
+ final Committer finalCommitter = committerSupplier.get();
+ driver.persist(finalCommitter);
+
+ publishing = true;
+ if (stopping) {
+ // Stopped gracefully. Exit code shouldn't matter, so fail to be on the safe side.
+ return TaskStatus.failure(getId());
+ }
+
+ final TransactionalSegmentPublisher publisher = new TransactionalSegmentPublisher()
+ {
+ @Override
+ public boolean publishSegments(Set<DataSegment> segments, Object commitMetadata) throws IOException
+ {
+ // Sanity check, we should only be publishing things that match our desired end state.
+ if (!ioConfig.getEndPartitions().equals(((Map) commitMetadata).get(METADATA_NEXT_PARTITIONS))) {
+ throw new ISE("WTF?! Driver attempted to publish invalid metadata[%s].", commitMetadata);
+ }
+
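+ // When useTransaction is set, the insert action carries the expected start metadata and the
+ // new end metadata so the metadata store can verify-and-swap atomically with the segment
+ // insert; otherwise segments are inserted without any metadata check.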
+ final SegmentInsertAction action;
+
+ if (ioConfig.isUseTransaction()) {
+ action = new SegmentInsertAction(
+ segments,
+ new KafkaDataSourceMetadata(ioConfig.getStartPartitions()),
+ new KafkaDataSourceMetadata(ioConfig.getEndPartitions())
+ );
+ } else {
+ action = new SegmentInsertAction(segments, null, null);
+ }
+
+ log.info("Publishing with isTransaction[%s].", ioConfig.isUseTransaction());
+
+ return toolbox.getTaskActionClient().submit(action).isSuccess();
+ }
+ };
+
+ final SegmentsAndMetadata published = driver.finish(publisher, committerSupplier.get());
+ if (published == null) {
+ throw new ISE("Transaction failure publishing segments, aborting");
+ } else {
+ log.info(
+ "Published segments[%s] with metadata[%s].",
+ Joiner.on(", ").join(
+ Iterables.transform(
+ published.getSegments(),
+ new Function<DataSegment, String>()
+ {
+ @Override
+ public String apply(DataSegment input)
+ {
+ return input.getIdentifier();
+ }
+ }
+ )
+ ),
+ published.getCommitMetadata()
+ );
+ }
+ }
+
+ return success();
+ }
+
+ @Override
+ public boolean canRestore()
+ {
+ return true;
+ }
+
+ @Override
+ public void stopGracefully()
+ {
+ log.info("Stopping gracefully.");
+
+ stopping = true;
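+ // Before publishing begins, the main loop exits on its own when it sees the stopping flag;
+ // once publishing has started, the run thread must be interrupted to abort.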
+ if (publishing && runThread.isAlive()) {
+ log.info("stopGracefully: Run thread started publishing, interrupting it.");
+ runThread.interrupt();
+ }
+ }
+
+ @Override
+ public <T> QueryRunner<T> getQueryRunner(Query<T> query)
+ {
+ if (appenderator == null) {
+ // Not yet initialized, no data yet, just return a noop runner.
+ return new NoopQueryRunner<>();
+ }
+
+ return new QueryRunner<T>()
+ {
+ @Override
+ public Sequence<T> run(final Query<T> query, final Map<String, Object> responseContext)
+ {
+ return query.run(appenderator, responseContext);
+ }
+ };
+ }
+
+ @VisibleForTesting
+ public FireDepartmentMetrics getFireDepartmentMetrics()
+ {
+ return fireDepartmentMetrics;
+ }
+
+ private Appenderator newAppenderator(FireDepartmentMetrics metrics, TaskToolbox toolbox)
+ {
+ return Appenderators.createRealtime(
+ dataSchema,
+ tuningConfig.withBasePersistDirectory(new File(toolbox.getTaskWorkDir(), "persist")),
+ metrics,
+ toolbox.getSegmentPusher(),
+ toolbox.getObjectMapper(),
+ toolbox.getIndexIO(),
+ tuningConfig.getBuildV9Directly() ? toolbox.getIndexMergerV9() : toolbox.getIndexMerger(),
+ toolbox.getQueryRunnerFactoryConglomerate(),
+ toolbox.getSegmentAnnouncer(),
+ toolbox.getEmitter(),
+ toolbox.getQueryExecutorService(),
+ toolbox.getCache(),
+ toolbox.getCacheConfig()
+ );
+ }
+
+ private FiniteAppenderatorDriver newDriver(
+ final Appenderator appenderator,
+ final TaskToolbox toolbox
+ )
+ {
+ return new FiniteAppenderatorDriver(
+ appenderator,
+ new ActionBasedSegmentAllocator(
+ toolbox.getTaskActionClient(),
+ dataSchema,
+ ioConfig.getSequenceName()
+ ),
+ toolbox.getSegmentHandoffNotifierFactory(),
+ new ActionBasedUsedSegmentChecker(toolbox.getTaskActionClient()),
+ toolbox.getObjectMapper(),
+ tuningConfig.getMaxRowsPerSegment(),
+ tuningConfig.getHandoffConditionTimeout()
+ );
+ }
+
+ private KafkaConsumer<byte[], byte[]> newConsumer()
+ {
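+ // The Kafka consumer resolves configured classes (such as the deserializers below) through
+ // the thread context classloader, so temporarily point it at this extension's classloader
+ // while the consumer is constructed, then restore it.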
+ ClassLoader currCtxCl = Thread.currentThread().getContextClassLoader();
+ try {
+ Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
+
+ final Properties props = new Properties();
+
+ for (Map.Entry<String, String> entry : ioConfig.getConsumerProperties().entrySet()) {
+ props.setProperty(entry.getKey(), entry.getValue());
+ }
+
+ props.setProperty("enable.auto.commit", "false");
+ props.setProperty("auto.offset.reset", "none");
+ props.setProperty("key.deserializer", ByteArrayDeserializer.class.getName());
+ props.setProperty("value.deserializer", ByteArrayDeserializer.class.getName());
+
+ return new KafkaConsumer<>(props);
+ }
+ finally {
+ Thread.currentThread().setContextClassLoader(currCtxCl);
+ }
+ }
+
+ private static void assignPartitions(
+ final KafkaConsumer<byte[], byte[]> consumer,
+ final String topic,
+ final Set<Integer> partitions
+ )
+ {
+ consumer.assign(
+ Lists.newArrayList(
+ Iterables.transform(
+ partitions,
+ new Function<Integer, TopicPartition>()
+ {
+ @Override
+ public TopicPartition apply(Integer n)
+ {
+ return new TopicPartition(topic, n);
+ }
+ }
+ )
+ )
+ );
+ }
+}
diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTaskModule.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTaskModule.java
new file mode 100644
index 000000000000..cce67287be50
--- /dev/null
+++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTaskModule.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.indexing.kafka;
+
+import com.fasterxml.jackson.databind.Module;
+import com.fasterxml.jackson.databind.jsontype.NamedType;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.google.common.collect.ImmutableList;
+import com.google.inject.Binder;
+import io.druid.initialization.DruidModule;
+
+import java.util.List;
+
+public class KafkaIndexTaskModule implements DruidModule
+{
+ @Override
+ public List<? extends Module> getJacksonModules()
+ {
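+ // Register Jackson subtypes so JSON task specs can refer to these classes by type name:
+ // "index_kafka" for the task and "kafka" for the metadata, ioConfig, and tuningConfig.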
+ return ImmutableList.of(
+ new SimpleModule(getClass().getSimpleName())
+ .registerSubtypes(
+ new NamedType(KafkaIndexTask.class, "index_kafka"),
+ new NamedType(KafkaDataSourceMetadata.class, "kafka"),
+ new NamedType(KafkaIOConfig.class, "kafka"),
+ new NamedType(KafkaTuningConfig.class, "kafka")
+ )
+ );
+ }
+
+ @Override
+ public void configure(Binder binder)
+ {
+ }
+}
diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaPartitions.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaPartitions.java
new file mode 100644
index 000000000000..f0d7370bfc8f
--- /dev/null
+++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaPartitions.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.indexing.kafka;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+
+import java.util.Map;
+import java.util.Objects;
+
+public class KafkaPartitions
+{
+ private final String topic;
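+ // Maps Kafka partition id to offset. When used as start partitions the offsets are read
+ // inclusively; when used as end partitions, reading stops once a partition reaches its
+ // (exclusive) end offset.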
+ private final Map<Integer, Long> partitionOffsetMap;
+
+ @JsonCreator
+ public KafkaPartitions(
+ @JsonProperty("topic") final String topic,
+ @JsonProperty("partitionOffsetMap") final Map<Integer, Long> partitionOffsetMap
+ )
+ {
+ this.topic = topic;
+ this.partitionOffsetMap = ImmutableMap.copyOf(partitionOffsetMap);
+
+ // Validate partitionOffsetMap
+ for (Map.Entry<Integer, Long> entry : partitionOffsetMap.entrySet()) {
+ Preconditions.checkArgument(
+ entry.getValue() >= 0,
+ String.format(
+ "partition[%d] offset[%d] invalid",
+ entry.getKey(),
+ entry.getValue()
+ )
+ );
+ }
+ }
+
+ @JsonProperty
+ public String getTopic()
+ {
+ return topic;
+ }
+
+ @JsonProperty
+ public Map<Integer, Long> getPartitionOffsetMap()
+ {
+ return partitionOffsetMap;
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ KafkaPartitions that = (KafkaPartitions) o;
+ return Objects.equals(topic, that.topic) &&
+ Objects.equals(partitionOffsetMap, that.partitionOffsetMap);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(topic, partitionOffsetMap);
+ }
+
+ @Override
+ public String toString()
+ {
+ return "KafkaPartitions{" +
+ "topic='" + topic + '\'' +
+ ", partitionOffsetMap=" + partitionOffsetMap +
+ '}';
+ }
+}
diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaTuningConfig.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaTuningConfig.java
new file mode 100644
index 000000000000..374b2dec909c
--- /dev/null
+++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaTuningConfig.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.indexing.kafka;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import io.druid.segment.IndexSpec;
+import io.druid.segment.indexing.RealtimeTuningConfig;
+import io.druid.segment.indexing.TuningConfig;
+import io.druid.segment.realtime.appenderator.AppenderatorConfig;
+import org.joda.time.Period;
+
+import java.io.File;
+
+public class KafkaTuningConfig implements TuningConfig, AppenderatorConfig
+{
+ private static final int DEFAULT_MAX_ROWS_PER_SEGMENT = 5_000_000;
+
+ private final int maxRowsInMemory;
+ private final int maxRowsPerSegment;
+ private final Period intermediatePersistPeriod;
+ private final File basePersistDirectory;
+ private final int maxPendingPersists;
+ private final IndexSpec indexSpec;
+ private final boolean buildV9Directly;
+ private final boolean reportParseExceptions;
+ private final long handoffConditionTimeout;
+
+ @JsonCreator
+ public KafkaTuningConfig(
+ @JsonProperty("maxRowsInMemory") Integer maxRowsInMemory,
+ @JsonProperty("maxRowsPerSegment") Integer maxRowsPerSegment,
+ @JsonProperty("intermediatePersistPeriod") Period intermediatePersistPeriod,
+ @JsonProperty("basePersistDirectory") File basePersistDirectory,
+ @JsonProperty("maxPendingPersists") Integer maxPendingPersists,
+ @JsonProperty("indexSpec") IndexSpec indexSpec,
+ @JsonProperty("buildV9Directly") Boolean buildV9Directly,
+ @JsonProperty("reportParseExceptions") Boolean reportParseExceptions,
+ @JsonProperty("handoffConditionTimeout") Long handoffConditionTimeout
+ )
+ {
+ // Cannot be static: the default basePersistDirectory is unique per instance
+ final RealtimeTuningConfig defaults = RealtimeTuningConfig.makeDefaultTuningConfig(basePersistDirectory);
+
+ this.maxRowsInMemory = maxRowsInMemory == null ? defaults.getMaxRowsInMemory() : maxRowsInMemory;
+ this.maxRowsPerSegment = maxRowsPerSegment == null ? DEFAULT_MAX_ROWS_PER_SEGMENT : maxRowsPerSegment;
+ this.intermediatePersistPeriod = intermediatePersistPeriod == null
+ ? defaults.getIntermediatePersistPeriod()
+ : intermediatePersistPeriod;
+ this.basePersistDirectory = defaults.getBasePersistDirectory();
+ this.maxPendingPersists = maxPendingPersists == null ? defaults.getMaxPendingPersists() : maxPendingPersists;
+ this.indexSpec = indexSpec == null ? defaults.getIndexSpec() : indexSpec;
+ this.buildV9Directly = buildV9Directly == null ? defaults.getBuildV9Directly() : buildV9Directly;
+ this.reportParseExceptions = reportParseExceptions == null
+ ? defaults.isReportParseExceptions()
+ : reportParseExceptions;
+ this.handoffConditionTimeout = handoffConditionTimeout == null
+ ? defaults.getHandoffConditionTimeout()
+ : handoffConditionTimeout;
+ }
+
+ @JsonProperty
+ public int getMaxRowsInMemory()
+ {
+ return maxRowsInMemory;
+ }
+
+ @JsonProperty
+ public int getMaxRowsPerSegment()
+ {
+ return maxRowsPerSegment;
+ }
+
+ @JsonProperty
+ public Period getIntermediatePersistPeriod()
+ {
+ return intermediatePersistPeriod;
+ }
+
+ @JsonProperty
+ public File getBasePersistDirectory()
+ {
+ return basePersistDirectory;
+ }
+
+ @JsonProperty
+ public int getMaxPendingPersists()
+ {
+ return maxPendingPersists;
+ }
+
+ @JsonProperty
+ public IndexSpec getIndexSpec()
+ {
+ return indexSpec;
+ }
+
+ @JsonProperty
+ public boolean getBuildV9Directly()
+ {
+ return buildV9Directly;
+ }
+
+ @JsonProperty
+ public boolean isReportParseExceptions()
+ {
+ return reportParseExceptions;
+ }
+
+ @JsonProperty
+ public long getHandoffConditionTimeout()
+ {
+ return handoffConditionTimeout;
+ }
+
+ public KafkaTuningConfig withBasePersistDirectory(File dir)
+ {
+ return new KafkaTuningConfig(
+ maxRowsInMemory,
+ maxRowsPerSegment,
+ intermediatePersistPeriod,
+ dir,
+ maxPendingPersists,
+ indexSpec,
+ buildV9Directly,
+ reportParseExceptions,
+ handoffConditionTimeout
+ );
+ }
+}
diff --git a/extensions-core/kafka-indexing-service/src/main/resources/META-INF/services/io.druid.initialization.DruidModule b/extensions-core/kafka-indexing-service/src/main/resources/META-INF/services/io.druid.initialization.DruidModule
new file mode 100644
index 000000000000..16aec94a8dfe
--- /dev/null
+++ b/extensions-core/kafka-indexing-service/src/main/resources/META-INF/services/io.druid.initialization.DruidModule
@@ -0,0 +1 @@
+io.druid.indexing.kafka.KafkaIndexTaskModule
diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaDataSourceMetadataTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaDataSourceMetadataTest.java
new file mode 100644
index 000000000000..32e9b08022c1
--- /dev/null
+++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaDataSourceMetadataTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.indexing.kafka;
+
+import com.google.common.collect.ImmutableMap;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Map;
+
+public class KafkaDataSourceMetadataTest
+{
+ private static final KafkaDataSourceMetadata KM0 = KM("foo", ImmutableMap.<Integer, Long>of());
+ private static final KafkaDataSourceMetadata KM1 = KM("foo", ImmutableMap.of(0, 2L, 1, 3L));
+ private static final KafkaDataSourceMetadata KM2 = KM("foo", ImmutableMap.of(0, 2L, 1, 4L, 2, 5L));
+ private static final KafkaDataSourceMetadata KM3 = KM("foo", ImmutableMap.of(0, 2L, 2, 5L));
+
+ @Test
+ public void testMatches()
+ {
+ Assert.assertTrue(KM0.matches(KM0));
+ Assert.assertTrue(KM0.matches(KM1));
+ Assert.assertTrue(KM0.matches(KM2));
+ Assert.assertTrue(KM0.matches(KM3));
+
+ Assert.assertTrue(KM1.matches(KM0));
+ Assert.assertTrue(KM1.matches(KM1));
+ Assert.assertFalse(KM1.matches(KM2));
+ Assert.assertTrue(KM1.matches(KM3));
+
+ Assert.assertTrue(KM2.matches(KM0));
+ Assert.assertFalse(KM2.matches(KM1));
+ Assert.assertTrue(KM2.matches(KM2));
+ Assert.assertTrue(KM2.matches(KM3));
+
+ Assert.assertTrue(KM3.matches(KM0));
+ Assert.assertTrue(KM3.matches(KM1));
+ Assert.assertTrue(KM3.matches(KM2));
+ Assert.assertTrue(KM3.matches(KM3));
+ }
+
+ @Test
+ public void testIsValidStart()
+ {
+ Assert.assertTrue(KM0.isValidStart());
+ Assert.assertTrue(KM1.isValidStart());
+ Assert.assertTrue(KM2.isValidStart());
+ Assert.assertTrue(KM3.isValidStart());
+ }
+
+ @Test
+ public void testPlus()
+ {
+ Assert.assertEquals(
+ KM("foo", ImmutableMap.of(0, 2L, 1, 3L, 2, 5L)),
+ KM1.plus(KM3)
+ );
+
+ Assert.assertEquals(
+ KM("foo", ImmutableMap.of(0, 2L, 1, 4L, 2, 5L)),
+ KM0.plus(KM2)
+ );
+
+ Assert.assertEquals(
+ KM("foo", ImmutableMap.of(0, 2L, 1, 4L, 2, 5L)),
+ KM1.plus(KM2)
+ );
+
+ Assert.assertEquals(
+ KM("foo", ImmutableMap.of(0, 2L, 1, 3L, 2, 5L)),
+ KM2.plus(KM1)
+ );
+
+ Assert.assertEquals(
+ KM("foo", ImmutableMap.of(0, 2L, 1, 4L, 2, 5L)),
+ KM2.plus(KM2)
+ );
+ }
+
+ private static KafkaDataSourceMetadata KM(String topic, Map<Integer, Long> offsets)
+ {
+ return new KafkaDataSourceMetadata(new KafkaPartitions(topic, offsets));
+ }
+}
diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java
new file mode 100644
index 000000000000..70911cc3dd73
--- /dev/null
+++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java
@@ -0,0 +1,1244 @@
+/*
+ * Licensed to Metamarkets Group Inc. (Metamarkets) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Metamarkets licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package io.druid.indexing.kafka;
+
+import com.fasterxml.jackson.databind.Module;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Charsets;
+import com.google.common.base.Function;
+import com.google.common.base.Predicate;
+import com.google.common.base.Predicates;
+import com.google.common.base.Throwables;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.io.Files;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.metamx.common.CompressionUtils;
+import com.metamx.common.Granularity;
+import com.metamx.common.ISE;
+import com.metamx.common.guava.Sequences;
+import com.metamx.common.logger.Logger;
+import com.metamx.emitter.EmittingLogger;
+import com.metamx.emitter.core.LoggingEmitter;
+import com.metamx.emitter.service.ServiceEmitter;
+import com.metamx.metrics.MonitorScheduler;
+import io.druid.client.cache.CacheConfig;
+import io.druid.client.cache.MapCache;
+import io.druid.concurrent.Execs;
+import io.druid.data.input.impl.DimensionsSpec;
+import io.druid.data.input.impl.JSONParseSpec;
+import io.druid.data.input.impl.JSONPathFieldSpec;
+import io.druid.data.input.impl.JSONPathSpec;
+import io.druid.data.input.impl.StringInputRowParser;
+import io.druid.data.input.impl.TimestampSpec;
+import io.druid.granularity.QueryGranularity;
+import io.druid.indexing.common.SegmentLoaderFactory;
+import io.druid.indexing.common.TaskLock;
+import io.druid.indexing.common.TaskStatus;
+import io.druid.indexing.common.TaskToolbox;
+import io.druid.indexing.common.TaskToolboxFactory;
+import io.druid.indexing.common.TestUtils;
+import io.druid.indexing.common.actions.LocalTaskActionClientFactory;
+import io.druid.indexing.common.actions.TaskActionClientFactory;
+import io.druid.indexing.common.actions.TaskActionToolbox;
+import io.druid.indexing.common.config.TaskConfig;
+import io.druid.indexing.common.config.TaskStorageConfig;
+import io.druid.indexing.common.task.Task;
+import io.druid.indexing.kafka.test.TestBroker;
+import io.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
+import io.druid.indexing.overlord.MetadataTaskStorage;
+import io.druid.indexing.overlord.TaskLockbox;
+import io.druid.indexing.overlord.TaskStorage;
+import io.druid.indexing.test.TestDataSegmentAnnouncer;
+import io.druid.indexing.test.TestDataSegmentKiller;
+import io.druid.jackson.DefaultObjectMapper;
+import io.druid.metadata.EntryExistsException;
+import io.druid.metadata.IndexerSQLMetadataStorageCoordinator;
+import io.druid.metadata.SQLMetadataStorageActionHandlerFactory;
+import io.druid.metadata.TestDerbyConnector;
+import io.druid.query.DefaultQueryRunnerFactoryConglomerate;
+import io.druid.query.Druids;
+import io.druid.query.IntervalChunkingQueryRunnerDecorator;
+import io.druid.query.Query;
+import io.druid.query.QueryRunner;
+import io.druid.query.QueryRunnerFactory;
+import io.druid.query.QueryRunnerFactoryConglomerate;
+import io.druid.query.QueryToolChest;
+import io.druid.query.QueryWatcher;
+import io.druid.query.Result;
+import io.druid.query.SegmentDescriptor;
+import io.druid.query.aggregation.AggregatorFactory;
+import io.druid.query.aggregation.CountAggregatorFactory;
+import io.druid.query.aggregation.LongSumAggregatorFactory;
+import io.druid.query.timeseries.TimeseriesQuery;
+import io.druid.query.timeseries.TimeseriesQueryEngine;
+import io.druid.query.timeseries.TimeseriesQueryQueryToolChest;
+import io.druid.query.timeseries.TimeseriesQueryRunnerFactory;
+import io.druid.query.timeseries.TimeseriesResultValue;
+import io.druid.segment.IndexIO;
+import io.druid.segment.QueryableIndex;
+import io.druid.segment.column.DictionaryEncodedColumn;
+import io.druid.segment.indexing.DataSchema;
+import io.druid.segment.indexing.granularity.UniformGranularitySpec;
+import io.druid.segment.loading.DataSegmentPusher;
+import io.druid.segment.loading.LocalDataSegmentPusher;
+import io.druid.segment.loading.LocalDataSegmentPusherConfig;
+import io.druid.segment.loading.SegmentLoaderConfig;
+import io.druid.segment.loading.SegmentLoaderLocalCacheManager;
+import io.druid.segment.loading.StorageLocationConfig;
+import io.druid.segment.realtime.plumber.SegmentHandoffNotifier;
+import io.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory;
+import io.druid.timeline.DataSegment;
+import org.apache.curator.test.TestingCluster;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.easymock.EasyMock;
+import org.joda.time.Interval;
+import org.joda.time.Period;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+@RunWith(Parameterized.class)
+public class KafkaIndexTaskTest
+{
+ private final boolean buildV9Directly;
+ private long handoffConditionTimeout = 0;
+ private boolean reportParseExceptions = false;
+ private boolean doHandoff = true;
+
+ private TestingCluster zkServer;
+ private TestBroker kafkaServer;
+ private ServiceEmitter emitter;
+ private ListeningExecutorService taskExec;
+ private TaskToolboxFactory toolboxFactory;
+ private IndexerMetadataStorageCoordinator metadataStorageCoordinator;
+ private TaskStorage taskStorage;
+ private TaskLockbox taskLockbox;
+ private File directory;
+
+ private final List<Task> runningTasks = Lists.newArrayList();
+
+ private static final Logger log = new Logger(KafkaIndexTaskTest.class);
+
+ private static final DataSchema DATA_SCHEMA;
+
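+ // Test records span 2008-2013 across partitions 0 and 1 of "topic0"; one record is
+ // intentionally unparseable to exercise the reportParseExceptions behavior.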
+ private static final List<ProducerRecord<byte[], byte[]>> RECORDS = ImmutableList.of(
+ new ProducerRecord<byte[], byte[]>("topic0", 0, null, JB("2008", "a", "y", 1.0f)),
+ new ProducerRecord<byte[], byte[]>("topic0", 0, null, JB("2009", "b", "y", 1.0f)),
+ new ProducerRecord<byte[], byte[]>("topic0", 0, null, JB("2010", "c", "y", 1.0f)),
+ new ProducerRecord<byte[], byte[]>("topic0", 0, null, JB("2011", "d", "y", 1.0f)),
+ new ProducerRecord<byte[], byte[]>("topic0", 0, null, JB("2011", "e", "y", 1.0f)),
+ new ProducerRecord<byte[], byte[]>("topic0", 0, null, "unparseable".getBytes()),
+ new ProducerRecord<byte[], byte[]>("topic0", 0, null, JB("2013", "f", "y", 1.0f)),
+ new ProducerRecord<byte[], byte[]>("topic0", 1, null, JB("2012", "g", "y", 1.0f))
+ );
+
+ static {
+ ObjectMapper objectMapper = new DefaultObjectMapper();
+ DATA_SCHEMA = new DataSchema(
+ "test_ds",
+ objectMapper.convertValue(
+ new StringInputRowParser(
+ new JSONParseSpec(
+ new TimestampSpec("timestamp", "iso", null),
+ new DimensionsSpec(
+ ImmutableList.of("dim1", "dim2"),
+ null,
+ null
+ ),
+ new JSONPathSpec(true, ImmutableList.<JSONPathFieldSpec>of()),
+ ImmutableMap.of()
+ ),
+ Charsets.UTF_8.name()
+ ),
+ Map.class
+ ),
+ new AggregatorFactory[]{new CountAggregatorFactory("rows")},
+ new UniformGranularitySpec(Granularity.DAY, QueryGranularity.NONE, null),
+ objectMapper
+ );
+ }
+
+ @Parameterized.Parameters(name = "buildV9Directly = {0}")
+ public static Iterable