diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaDataSourceMetadata.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaDataSourceMetadata.java index e4c69de5ec66..b0b0d7be5c78 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaDataSourceMetadata.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaDataSourceMetadata.java @@ -95,6 +95,36 @@ public DataSourceMetadata plus(DataSourceMetadata other) } } + @Override + public DataSourceMetadata minus(DataSourceMetadata other) + { + if (!(other instanceof KafkaDataSourceMetadata)) { + throw new IAE( + "Expected instance of %s, got %s", + KafkaDataSourceMetadata.class.getCanonicalName(), + other.getClass().getCanonicalName() + ); + } + + final KafkaDataSourceMetadata that = (KafkaDataSourceMetadata) other; + + if (that.getKafkaPartitions().getTopic().equals(kafkaPartitions.getTopic())) { + // Same topic, remove partitions present in "that" from "this" + final Map newMap = Maps.newHashMap(); + + for (Map.Entry entry : kafkaPartitions.getPartitionOffsetMap().entrySet()) { + if(!that.getKafkaPartitions().getPartitionOffsetMap().containsKey(entry.getKey())) { + newMap.put(entry.getKey(), entry.getValue()); + } + } + + return new KafkaDataSourceMetadata(new KafkaPartitions(kafkaPartitions.getTopic(), newMap)); + } else { + // Different topic, prefer "this". + return this; + } + } + @Override public boolean equals(Object o) { diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIOConfig.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIOConfig.java index 4d62437faac8..6307d77ed12b 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIOConfig.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIOConfig.java @@ -32,7 +32,6 @@ public class KafkaIOConfig implements IOConfig { private static final boolean DEFAULT_USE_TRANSACTION = true; private static final boolean DEFAULT_PAUSE_AFTER_READ = false; - private static final boolean DEFAULT_USE_EARLIEST_OFFSET = false; private final String baseSequenceName; private final KafkaPartitions startPartitions; @@ -41,7 +40,6 @@ public class KafkaIOConfig implements IOConfig private final boolean useTransaction; private final boolean pauseAfterRead; private final Optional minimumMessageTime; - private final boolean useEarliestOffset; @JsonCreator public KafkaIOConfig( @@ -51,8 +49,7 @@ public KafkaIOConfig( @JsonProperty("consumerProperties") Map consumerProperties, @JsonProperty("useTransaction") Boolean useTransaction, @JsonProperty("pauseAfterRead") Boolean pauseAfterRead, - @JsonProperty("minimumMessageTime") DateTime minimumMessageTime, - @JsonProperty("useEarliestOffset") Boolean useEarliestOffset + @JsonProperty("minimumMessageTime") DateTime minimumMessageTime ) { this.baseSequenceName = Preconditions.checkNotNull(baseSequenceName, "baseSequenceName"); @@ -62,7 +59,6 @@ public KafkaIOConfig( this.useTransaction = useTransaction != null ? useTransaction : DEFAULT_USE_TRANSACTION; this.pauseAfterRead = pauseAfterRead != null ? pauseAfterRead : DEFAULT_PAUSE_AFTER_READ; this.minimumMessageTime = Optional.fromNullable(minimumMessageTime); - this.useEarliestOffset = useEarliestOffset != null ? useEarliestOffset : DEFAULT_USE_EARLIEST_OFFSET; Preconditions.checkArgument( startPartitions.getTopic().equals(endPartitions.getTopic()), @@ -126,12 +122,6 @@ public Optional getMinimumMessageTime() return minimumMessageTime; } - @JsonProperty - public boolean isUseEarliestOffset() - { - return useEarliestOffset; - } - @Override public String toString() { @@ -143,7 +133,6 @@ public String toString() ", useTransaction=" + useTransaction + ", pauseAfterRead=" + pauseAfterRead + ", minimumMessageTime=" + minimumMessageTime + - ", useEarliestOffest=" + useEarliestOffset + '}'; } } diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java index 89e00051aca8..cf3bc5232c08 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java @@ -46,6 +46,7 @@ import io.druid.indexing.appenderator.ActionBasedUsedSegmentChecker; import io.druid.indexing.common.TaskStatus; import io.druid.indexing.common.TaskToolbox; +import io.druid.indexing.common.actions.ResetDataSourceMetadataAction; import io.druid.indexing.common.actions.SegmentTransactionalInsertAction; import io.druid.indexing.common.actions.TaskActionClient; import io.druid.indexing.common.task.AbstractTask; @@ -386,7 +387,7 @@ public void run() } catch (OffsetOutOfRangeException e) { log.warn("OffsetOutOfRangeException with message [%s]", e.getMessage()); - possiblyResetOffsetsOrWait(e.offsetOutOfRangePartitions(), consumer, assignment); + possiblyResetOffsetsOrWait(e.offsetOutOfRangePartitions(), consumer, toolbox); stillReading = ioConfig.isPauseAfterRead() || !assignment.isEmpty(); } @@ -995,11 +996,12 @@ private boolean possiblyPause(Set assignment) throws InterruptedExcepti private void possiblyResetOffsetsOrWait( Map outOfRangePartitions, KafkaConsumer consumer, - Set assignment - ) throws InterruptedException + TaskToolbox taskToolbox + ) throws InterruptedException, IOException { - boolean shouldRetry = false; - if(tuningConfig.isResetOffsetAutomatically()) { + final Map resetPartitions = Maps.newHashMap(); + boolean doReset = false; + if (tuningConfig.isResetOffsetAutomatically()) { for (Map.Entry outOfRangePartition : outOfRangePartitions.entrySet()) { final TopicPartition topicPartition = outOfRangePartition.getKey(); final long nextOffset = outOfRangePartition.getValue(); @@ -1012,15 +1014,15 @@ private void possiblyResetOffsetsOrWait( // and the current message offset in the kafka partition is more than the // next message offset that we are trying to fetch if (leastAvailableOffset > nextOffset) { - resetOffset(consumer, assignment, topicPartition); - } else { - shouldRetry = true; + doReset = true; + resetPartitions.put(topicPartition, nextOffset); } } - } else { - shouldRetry = true; } - if (shouldRetry) { + + if (doReset) { + sendResetRequestAndWait(resetPartitions, taskToolbox); + } else { log.warn("Retrying in %dms", POLL_RETRY_MS); pollRetryLock.lockInterruptibly(); try { @@ -1035,34 +1037,33 @@ private void possiblyResetOffsetsOrWait( } } - private void resetOffset( - KafkaConsumer consumer, - Set assignment, - TopicPartition topicPartition - ) + private void sendResetRequestAndWait(Map outOfRangePartitions, TaskToolbox taskToolbox) throws IOException { - log.warn( - "Resetting consumer offset to [%s] for partition [%d]", - ioConfig.isUseEarliestOffset() ? "earliest" : "latest", - topicPartition.partition() - ); - if (ioConfig.isUseEarliestOffset()) { - consumer.seekToBeginning(topicPartition); - } else { - consumer.seekToEnd(topicPartition); + Map partitionOffsetMap = Maps.newHashMap(); + for (Map.Entry outOfRangePartition: outOfRangePartitions.entrySet()) { + partitionOffsetMap.put(outOfRangePartition.getKey().partition(), outOfRangePartition.getValue()); } - nextOffsets.put(topicPartition.partition(), consumer.position(topicPartition)); - log.warn("Consumer is now at offset [%d]", nextOffsets.get(topicPartition.partition())); - // check if we seeked passed the endOffset for this partition - if (nextOffsets.get(topicPartition.partition()) >= endOffsets.get(topicPartition.partition()) - && assignment.remove(topicPartition.partition())) { - log.info( - "Finished reading topic[%s], partition[%,d].", - topicPartition.topic(), - topicPartition.partition() - ); + boolean result = taskToolbox.getTaskActionClient() + .submit(new ResetDataSourceMetadataAction( + getDataSource(), + new KafkaDataSourceMetadata(new KafkaPartitions( + ioConfig.getStartPartitions() + .getTopic(), + partitionOffsetMap + )) + )); + + if (result) { + log.warn("Successfully sent the reset request for partitions [%s], waiting to be killed", partitionOffsetMap.keySet()); + // wait for being killed by supervisor + try { + Thread.sleep(Long.MAX_VALUE); + } + catch (InterruptedException e) { + throw new RuntimeException("Got interrupted while waiting to be killed"); + } + } else { + log.makeAlert("Failed to send reset request for partitions [%s]", partitionOffsetMap.keySet()).emit(); } - // update assignments if something changed - assignPartitions(consumer, topicPartition.topic(), assignment); } } diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java index 9a013440de47..2ec0a8b1fad1 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisor.java @@ -63,6 +63,7 @@ import io.druid.indexing.overlord.TaskStorage; import io.druid.indexing.overlord.supervisor.Supervisor; import io.druid.indexing.overlord.supervisor.SupervisorReport; +import io.druid.java.util.common.IAE; import io.druid.java.util.common.ISE; import io.druid.java.util.common.collect.JavaCompatUtils; import io.druid.metadata.EntryExistsException; @@ -74,6 +75,7 @@ import org.joda.time.DateTime; import javax.annotation.Nullable; +import java.io.IOException; import java.util.HashMap; import java.util.Iterator; import java.util.List; @@ -417,10 +419,10 @@ public SupervisorReport getStatus() } @Override - public void reset() + public void reset(DataSourceMetadata dataSourceMetadata) { log.info("Posting ResetNotice"); - notices.add(new ResetNotice()); + notices.add(new ResetNotice(dataSourceMetadata)); } public void possiblyRegisterListener() @@ -506,29 +508,113 @@ public void handle() throws InterruptedException, ExecutionException, TimeoutExc private class ResetNotice implements Notice { + final DataSourceMetadata dataSourceMetadata; + + ResetNotice(DataSourceMetadata dataSourceMetadata) + { + this.dataSourceMetadata = dataSourceMetadata; + } + @Override public void handle() { - resetInternal(); + log.makeAlert("Resetting dataSource [%s]", dataSource).emit(); + resetInternal(dataSourceMetadata); } } @VisibleForTesting - void resetInternal() + void resetInternal(DataSourceMetadata dataSourceMetadata) { - boolean result = indexerMetadataStorageCoordinator.deleteDataSourceMetadata(dataSource); - log.info("Reset dataSource[%s] - dataSource metadata entry deleted? [%s]", dataSource, result); + if (dataSourceMetadata == null) { + // Reset everything + boolean result = indexerMetadataStorageCoordinator.deleteDataSourceMetadata(dataSource); + log.info("Reset dataSource[%s] - dataSource metadata entry deleted? [%s]", dataSource, result); + killTaskGroupForPartitions(JavaCompatUtils.keySet(taskGroups)); + } else if (!(dataSourceMetadata instanceof KafkaDataSourceMetadata)) { + throw new IAE("Expected KafkaDataSourceMetadata but found instance of [%s]", dataSourceMetadata.getClass()); + } else { + // Reset only the partitions in dataSourceMetadata if it has not been reset yet + final KafkaDataSourceMetadata resetKafkaMetadata = (KafkaDataSourceMetadata) dataSourceMetadata; + + if (resetKafkaMetadata.getKafkaPartitions().getTopic().equals(ioConfig.getTopic())) { + // metadata can be null + final DataSourceMetadata metadata = indexerMetadataStorageCoordinator.getDataSourceMetadata(dataSource); + if (metadata != null && !(metadata instanceof KafkaDataSourceMetadata)) { + throw new IAE( + "Expected KafkaDataSourceMetadata from metadata store but found instance of [%s]", + metadata.getClass() + ); + } + final KafkaDataSourceMetadata currentMetadata = (KafkaDataSourceMetadata) metadata; + + // defend against consecutive reset requests from replicas + // as well as the case where the metadata store do not have an entry for the reset partitions + boolean doReset = false; + for (Map.Entry resetPartitionOffset : resetKafkaMetadata.getKafkaPartitions() + .getPartitionOffsetMap() + .entrySet()) { + final Long partitionOffsetInMetadataStore = currentMetadata == null + ? null + : currentMetadata.getKafkaPartitions() + .getPartitionOffsetMap() + .get(resetPartitionOffset.getKey()); + final TaskGroup partitionTaskGroup = taskGroups.get(getTaskGroupIdForPartition(resetPartitionOffset.getKey())); + if (partitionOffsetInMetadataStore != null || + (partitionTaskGroup != null && partitionTaskGroup.partitionOffsets.get(resetPartitionOffset.getKey()) + .equals(resetPartitionOffset.getValue()))) { + doReset = true; + break; + } + } - for (TaskGroup taskGroup : taskGroups.values()) { - for (Map.Entry entry : taskGroup.tasks.entrySet()) { - String taskId = entry.getKey(); - log.info("Reset dataSource[%s] - killing task [%s]", dataSource, taskId); - killTask(taskId); + if (!doReset) { + return; + } + + boolean metadataUpdateSuccess = false; + if (currentMetadata == null) { + metadataUpdateSuccess = true; + } else { + final DataSourceMetadata newMetadata = currentMetadata.minus(resetKafkaMetadata); + try { + metadataUpdateSuccess = indexerMetadataStorageCoordinator.resetDataSourceMetadata(dataSource, newMetadata); + } + catch (IOException e) { + log.error("Resetting DataSourceMetadata failed [%s]", e.getMessage()); + Throwables.propagate(e); + } + } + if (metadataUpdateSuccess) { + killTaskGroupForPartitions(JavaCompatUtils.keySet(resetKafkaMetadata.getKafkaPartitions() + .getPartitionOffsetMap())); + } else { + throw new ISE("Unable to reset metadata"); + } + } else { + log.warn( + "Reset metadata topic [%s] and supervisor's topic [%s] do not match", + resetKafkaMetadata.getKafkaPartitions().getTopic(), + ioConfig.getTopic() + ); } } + } - partitionGroups.clear(); - taskGroups.clear(); + private void killTaskGroupForPartitions(Set partitions) + { + for (Integer partition : partitions) { + TaskGroup taskGroup = taskGroups.get(getTaskGroupIdForPartition(partition)); + if (taskGroup != null) { + // kill all tasks in this task group + for (String taskId : JavaCompatUtils.keySet(taskGroup.tasks)) { + log.info("Reset dataSource[%s] - killing task [%s]", dataSource, taskId); + killTask(taskId); + } + } + partitionGroups.remove(getTaskGroupIdForPartition(partition)); + taskGroups.remove(getTaskGroupIdForPartition(partition)); + } } @VisibleForTesting @@ -1287,8 +1373,7 @@ private void createKafkaTasksForGroup(int groupId, int replicas) consumerProperties, true, false, - minimumMessageTime, - ioConfig.isUseEarliestOffset() + minimumMessageTime ); for (int i = 0; i < replicas; i++) { diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaDataSourceMetadataTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaDataSourceMetadataTest.java index 32e9b08022c1..73cc8ee53126 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaDataSourceMetadataTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaDataSourceMetadataTest.java @@ -94,6 +94,35 @@ public void testPlus() ); } + @Test + public void testMinus() + { + Assert.assertEquals( + KM("foo", ImmutableMap.of(1, 3L)), + KM1.minus(KM3) + ); + + Assert.assertEquals( + KM("foo", ImmutableMap.of()), + KM0.minus(KM2) + ); + + Assert.assertEquals( + KM("foo", ImmutableMap.of()), + KM1.minus(KM2) + ); + + Assert.assertEquals( + KM("foo", ImmutableMap.of(2, 5L)), + KM2.minus(KM1) + ); + + Assert.assertEquals( + KM("foo", ImmutableMap.of()), + KM2.minus(KM2) + ); + } + private static KafkaDataSourceMetadata KM(String topic, Map offsets) { return new KafkaDataSourceMetadata(new KafkaPartitions(topic, offsets)); diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java index 7535fc7c288d..e1a1309867d8 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java @@ -69,6 +69,7 @@ import io.druid.indexing.overlord.MetadataTaskStorage; import io.druid.indexing.overlord.TaskLockbox; import io.druid.indexing.overlord.TaskStorage; +import io.druid.indexing.overlord.supervisor.SupervisorManager; import io.druid.indexing.test.TestDataSegmentAnnouncer; import io.druid.indexing.test.TestDataSegmentKiller; import io.druid.jackson.DefaultObjectMapper; @@ -304,7 +305,6 @@ public void testRunAfterDataInserted() throws Exception kafkaServer.consumerProperties(), true, false, - null, null ), null, @@ -347,7 +347,6 @@ public void testRunBeforeDataInserted() throws Exception kafkaServer.consumerProperties(), true, false, - null, null ), null, @@ -402,8 +401,7 @@ public void testRunWithMinimumMessageTime() throws Exception kafkaServer.consumerProperties(), true, false, - new DateTime("2010"), - null + new DateTime("2010") ), null, null @@ -464,7 +462,6 @@ public void testRunOnNothing() throws Exception kafkaServer.consumerProperties(), true, false, - null, null ), null, @@ -506,7 +503,6 @@ public void testHandoffConditionTimeoutWhenHandoffOccurs() throws Exception kafkaServer.consumerProperties(), true, false, - null, null ), null, @@ -559,7 +555,6 @@ public void testHandoffConditionTimeoutWhenHandoffDoesNotOccur() throws Exceptio kafkaServer.consumerProperties(), true, false, - null, null ), null, @@ -611,7 +606,6 @@ public void testReportParseExceptions() throws Exception kafkaServer.consumerProperties(), true, false, - null, null ), null, @@ -645,7 +639,6 @@ public void testRunReplicas() throws Exception kafkaServer.consumerProperties(), true, false, - null, null ), null, @@ -660,7 +653,6 @@ public void testRunReplicas() throws Exception kafkaServer.consumerProperties(), true, false, - null, null ), null, @@ -715,7 +707,6 @@ public void testRunConflicting() throws Exception kafkaServer.consumerProperties(), true, false, - null, null ), null, @@ -730,7 +721,6 @@ public void testRunConflicting() throws Exception kafkaServer.consumerProperties(), true, false, - null, null ), null, @@ -786,7 +776,6 @@ public void testRunConflictingWithoutTransactions() throws Exception kafkaServer.consumerProperties(), false, false, - null, null ), null, @@ -801,7 +790,6 @@ public void testRunConflictingWithoutTransactions() throws Exception kafkaServer.consumerProperties(), false, false, - null, null ), null, @@ -862,7 +850,6 @@ public void testRunOneTaskTwoPartitions() throws Exception kafkaServer.consumerProperties(), true, false, - null, null ), null, @@ -920,7 +907,6 @@ public void testRunTwoTasksTwoPartitions() throws Exception kafkaServer.consumerProperties(), true, false, - null, null ), null, @@ -935,7 +921,6 @@ public void testRunTwoTasksTwoPartitions() throws Exception kafkaServer.consumerProperties(), true, false, - null, null ), null, @@ -992,7 +977,6 @@ public void testRestore() throws Exception kafkaServer.consumerProperties(), true, false, - null, null ), null, @@ -1028,7 +1012,6 @@ public void testRestore() throws Exception kafkaServer.consumerProperties(), true, false, - null, null ), null, @@ -1081,7 +1064,6 @@ public void testRunWithPauseAndResume() throws Exception kafkaServer.consumerProperties(), true, false, - null, null ), null, @@ -1165,7 +1147,6 @@ public void testRunAndPauseAfterReadWithModifiedEndOffsets() throws Exception kafkaServer.consumerProperties(), true, true, - null, null ), null, @@ -1253,7 +1234,6 @@ public void testRunWithOffsetOutOfRangeExceptionAndPause() throws Exception kafkaServer.consumerProperties(), true, false, - null, null ), null, @@ -1290,7 +1270,6 @@ public void testRunWithOffsetOutOfRangeExceptionAndNextOffsetGreaterThanLeastAva kafkaServer.consumerProperties(), true, false, - null, null ), null, @@ -1468,7 +1447,8 @@ private void makeToolboxFactory() throws IOException final TaskActionToolbox taskActionToolbox = new TaskActionToolbox( taskLockbox, metadataStorageCoordinator, - emitter + emitter, + new SupervisorManager(null) ); final TaskActionClientFactory taskActionClientFactory = new LocalTaskActionClientFactory( taskStorage, diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java index d84fb89a6658..8fbdb1d6cf06 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java @@ -26,7 +26,6 @@ import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; - import io.druid.data.input.impl.DimensionSchema; import io.druid.data.input.impl.DimensionsSpec; import io.druid.data.input.impl.JSONParseSpec; @@ -49,6 +48,7 @@ import io.druid.indexing.kafka.KafkaPartitions; import io.druid.indexing.kafka.KafkaTuningConfig; import io.druid.indexing.kafka.test.TestBroker; +import io.druid.indexing.overlord.DataSourceMetadata; import io.druid.indexing.overlord.IndexerMetadataStorageCoordinator; import io.druid.indexing.overlord.TaskMaster; import io.druid.indexing.overlord.TaskQueue; @@ -1485,9 +1485,86 @@ public void testResetNoTasks() throws Exception expect(indexerMetadataStorageCoordinator.deleteDataSourceMetadata(DATASOURCE)).andReturn(true); replay(indexerMetadataStorageCoordinator); - supervisor.resetInternal(); + supervisor.resetInternal(null); + verifyAll(); + + } + + @Test + public void testResetDataSourceMetadata() throws Exception + { + supervisor = getSupervisor(1, 1, true, "PT1H", null); + expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); + expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); + expect(taskRunner.getRunningTasks()).andReturn(Collections.EMPTY_LIST).anyTimes(); + expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of()).anyTimes(); + taskRunner.registerListener(anyObject(TaskRunnerListener.class), anyObject(Executor.class)); + replayAll(); + + supervisor.start(); + supervisor.runInternal(); + verifyAll(); + + Capture captureDataSource = EasyMock.newCapture(); + Capture captureDataSourceMetadata = EasyMock.newCapture(); + + KafkaDataSourceMetadata kafkaDataSourceMetadata = new KafkaDataSourceMetadata(new KafkaPartitions( + KAFKA_TOPIC, + ImmutableMap.of(0, 1000L, 1, 1000L, 2, 1000L) + )); + + KafkaDataSourceMetadata resetMetadata = new KafkaDataSourceMetadata(new KafkaPartitions( + KAFKA_TOPIC, + ImmutableMap.of(1, 1000L, 2, 1000L) + )); + + KafkaDataSourceMetadata expectedMetadata = new KafkaDataSourceMetadata(new KafkaPartitions( + KAFKA_TOPIC, + ImmutableMap.of(0, 1000L) + )); + + reset(indexerMetadataStorageCoordinator); + expect(indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn(kafkaDataSourceMetadata); + expect(indexerMetadataStorageCoordinator.resetDataSourceMetadata( + EasyMock.capture(captureDataSource), + EasyMock.capture(captureDataSourceMetadata) + )).andReturn(true); + replay(indexerMetadataStorageCoordinator); + + supervisor.resetInternal(resetMetadata); + verifyAll(); + + Assert.assertEquals(captureDataSource.getValue(), DATASOURCE); + Assert.assertEquals(captureDataSourceMetadata.getValue(), expectedMetadata); + } + + @Test + public void testResetNoDataSourceMetadata() throws Exception + { + supervisor = getSupervisor(1, 1, true, "PT1H", null); + expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); + expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); + expect(taskRunner.getRunningTasks()).andReturn(Collections.EMPTY_LIST).anyTimes(); + expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of()).anyTimes(); + taskRunner.registerListener(anyObject(TaskRunnerListener.class), anyObject(Executor.class)); + replayAll(); + + supervisor.start(); + supervisor.runInternal(); verifyAll(); + KafkaDataSourceMetadata resetMetadata = new KafkaDataSourceMetadata(new KafkaPartitions( + KAFKA_TOPIC, + ImmutableMap.of(1, 1000L, 2, 1000L) + )); + + reset(indexerMetadataStorageCoordinator); + // no DataSourceMetadata in metadata store + expect(indexerMetadataStorageCoordinator.getDataSourceMetadata(DATASOURCE)).andReturn(null); + replay(indexerMetadataStorageCoordinator); + + supervisor.resetInternal(resetMetadata); + verifyAll(); } @Test @@ -1566,7 +1643,7 @@ public void testResetRunningTasks() throws Exception taskQueue.shutdown("id3"); replay(taskQueue, indexerMetadataStorageCoordinator); - supervisor.resetInternal(); + supervisor.resetInternal(null); verifyAll(); } @@ -1702,8 +1779,7 @@ private KafkaIndexTask createKafkaIndexTask( ImmutableMap.of(), true, false, - minimumMessageTime, - null + minimumMessageTime ), ImmutableMap.of(), null diff --git a/indexing-service/src/main/java/io/druid/indexing/common/actions/ResetDataSourceMetadataAction.java b/indexing-service/src/main/java/io/druid/indexing/common/actions/ResetDataSourceMetadataAction.java new file mode 100644 index 000000000000..d874e0d4477c --- /dev/null +++ b/indexing-service/src/main/java/io/druid/indexing/common/actions/ResetDataSourceMetadataAction.java @@ -0,0 +1,85 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.indexing.common.actions; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.type.TypeReference; +import io.druid.indexing.common.task.Task; +import io.druid.indexing.overlord.DataSourceMetadata; + +import java.io.IOException; + +public class ResetDataSourceMetadataAction implements TaskAction +{ + private final String dataSource; + private final DataSourceMetadata resetMetadata; + + public ResetDataSourceMetadataAction( + @JsonProperty("dataSource") String dataSource, + @JsonProperty("resetMetadata") DataSourceMetadata resetMetadata + ) + { + this.dataSource = dataSource; + this.resetMetadata = resetMetadata; + } + + @JsonProperty + public String getDataSource() + { + return dataSource; + } + + @JsonProperty + public DataSourceMetadata getResetMetadata() + { + return resetMetadata; + } + + @Override + public TypeReference getReturnTypeReference() + { + return new TypeReference() + { + }; + } + + @Override + public Boolean perform( + Task task, TaskActionToolbox toolbox + ) throws IOException + { + return toolbox.getSupervisorManager().resetSupervisor(dataSource, resetMetadata); + } + + @Override + public boolean isAudited() + { + return true; + } + + @Override + public String toString() + { + return "ResetDataSourceMetadataAction{" + + "dataSource='" + dataSource + '\'' + + ", resetMetadata=" + resetMetadata + + '}'; + } +} diff --git a/indexing-service/src/main/java/io/druid/indexing/common/actions/TaskAction.java b/indexing-service/src/main/java/io/druid/indexing/common/actions/TaskAction.java index 26dc41f1d8a4..c625e68cc801 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/actions/TaskAction.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/actions/TaskAction.java @@ -38,7 +38,8 @@ @JsonSubTypes.Type(name = "segmentListUnused", value = SegmentListUnusedAction.class), @JsonSubTypes.Type(name = "segmentNuke", value = SegmentNukeAction.class), @JsonSubTypes.Type(name = "segmentMetadataUpdate", value = SegmentMetadataUpdateAction.class), - @JsonSubTypes.Type(name = "segmentAllocate", value = SegmentAllocateAction.class) + @JsonSubTypes.Type(name = "segmentAllocate", value = SegmentAllocateAction.class), + @JsonSubTypes.Type(name = "resetDataSourceMetadata", value = ResetDataSourceMetadataAction.class) }) public interface TaskAction { diff --git a/indexing-service/src/main/java/io/druid/indexing/common/actions/TaskActionToolbox.java b/indexing-service/src/main/java/io/druid/indexing/common/actions/TaskActionToolbox.java index 690bc66937b8..c24e1fbb2656 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/actions/TaskActionToolbox.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/actions/TaskActionToolbox.java @@ -27,6 +27,7 @@ import io.druid.indexing.common.task.Task; import io.druid.indexing.overlord.IndexerMetadataStorageCoordinator; import io.druid.indexing.overlord.TaskLockbox; +import io.druid.indexing.overlord.supervisor.SupervisorManager; import io.druid.java.util.common.ISE; import io.druid.timeline.DataSegment; @@ -38,17 +39,20 @@ public class TaskActionToolbox private final TaskLockbox taskLockbox; private final IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator; private final ServiceEmitter emitter; + private final SupervisorManager supervisorManager; @Inject public TaskActionToolbox( TaskLockbox taskLockbox, IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator, - ServiceEmitter emitter + ServiceEmitter emitter, + SupervisorManager supervisorManager ) { this.taskLockbox = taskLockbox; this.indexerMetadataStorageCoordinator = indexerMetadataStorageCoordinator; this.emitter = emitter; + this.supervisorManager = supervisorManager; } public TaskLockbox getTaskLockbox() @@ -66,6 +70,11 @@ public ServiceEmitter getEmitter() return emitter; } + public SupervisorManager getSupervisorManager() + { + return supervisorManager; + } + public void verifyTaskLocks( final Task task, final Set segments diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/supervisor/SupervisorManager.java b/indexing-service/src/main/java/io/druid/indexing/overlord/supervisor/SupervisorManager.java index 505eec19a233..7f4670792153 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/supervisor/SupervisorManager.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/supervisor/SupervisorManager.java @@ -25,12 +25,14 @@ import com.google.inject.Inject; import com.metamx.emitter.EmittingLogger; +import io.druid.indexing.overlord.DataSourceMetadata; import io.druid.java.util.common.Pair; import io.druid.java.util.common.collect.JavaCompatUtils; import io.druid.java.util.common.lifecycle.LifecycleStart; import io.druid.java.util.common.lifecycle.LifecycleStop; import io.druid.metadata.MetadataSupervisorManager; +import javax.annotation.Nullable; import java.util.List; import java.util.Map; import java.util.Set; @@ -141,7 +143,7 @@ public Optional getSupervisorStatus(String id) return supervisor == null ? Optional.absent() : Optional.fromNullable(supervisor.lhs.getStatus()); } - public boolean resetSupervisor(String id) + public boolean resetSupervisor(String id, @Nullable DataSourceMetadata dataSourceMetadata) { Preconditions.checkState(started, "SupervisorManager not started"); Preconditions.checkNotNull(id, "id"); @@ -152,7 +154,7 @@ public boolean resetSupervisor(String id) return false; } - supervisor.lhs.reset(); + supervisor.lhs.reset(dataSourceMetadata); return true; } diff --git a/indexing-service/src/main/java/io/druid/indexing/overlord/supervisor/SupervisorResource.java b/indexing-service/src/main/java/io/druid/indexing/overlord/supervisor/SupervisorResource.java index 018afa992409..a8627965c60e 100644 --- a/indexing-service/src/main/java/io/druid/indexing/overlord/supervisor/SupervisorResource.java +++ b/indexing-service/src/main/java/io/druid/indexing/overlord/supervisor/SupervisorResource.java @@ -217,7 +217,7 @@ public Response reset(@PathParam("id") final String id) @Override public Response apply(SupervisorManager manager) { - if (manager.resetSupervisor(id)) { + if (manager.resetSupervisor(id, null)) { return Response.ok(ImmutableMap.of("id", id)).build(); } else { return Response.status(Response.Status.NOT_FOUND) diff --git a/indexing-service/src/test/java/io/druid/indexing/common/actions/TaskActionTestKit.java b/indexing-service/src/test/java/io/druid/indexing/common/actions/TaskActionTestKit.java index 0178ecfaef5c..3fe74a780eb9 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/actions/TaskActionTestKit.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/actions/TaskActionTestKit.java @@ -26,11 +26,13 @@ import io.druid.indexing.overlord.IndexerMetadataStorageCoordinator; import io.druid.indexing.overlord.TaskLockbox; import io.druid.indexing.overlord.TaskStorage; +import io.druid.indexing.overlord.supervisor.SupervisorManager; import io.druid.metadata.IndexerSQLMetadataStorageCoordinator; import io.druid.metadata.MetadataStorageConnectorConfig; import io.druid.metadata.MetadataStorageTablesConfig; import io.druid.metadata.TestDerbyConnector; import io.druid.server.metrics.NoopServiceEmitter; +import org.easymock.EasyMock; import org.joda.time.Period; import org.junit.rules.ExternalResource; @@ -91,7 +93,8 @@ public void before() taskActionToolbox = new TaskActionToolbox( taskLockbox, metadataStorageCoordinator, - new NoopServiceEmitter() + new NoopServiceEmitter(), + EasyMock.createMock(SupervisorManager.class) ); testDerbyConnector.createDataSourceTable(); testDerbyConnector.createPendingSegmentsTable(); diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java index 14b55be17e11..d959abad0545 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java @@ -60,6 +60,7 @@ import io.druid.indexing.overlord.IndexerMetadataStorageCoordinator; import io.druid.indexing.overlord.TaskLockbox; import io.druid.indexing.overlord.TaskStorage; +import io.druid.indexing.overlord.supervisor.SupervisorManager; import io.druid.indexing.test.TestDataSegmentAnnouncer; import io.druid.indexing.test.TestDataSegmentKiller; import io.druid.indexing.test.TestDataSegmentPusher; @@ -960,7 +961,8 @@ private TaskToolbox makeToolbox( final TaskActionToolbox taskActionToolbox = new TaskActionToolbox( taskLockbox, mdc, - emitter + emitter, + EasyMock.createMock(SupervisorManager.class) ); final TaskActionClientFactory taskActionClientFactory = new LocalTaskActionClientFactory( taskStorage, diff --git a/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java b/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java index 0efce007a62a..96ec7f44d050 100644 --- a/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java @@ -54,6 +54,7 @@ import io.druid.indexing.common.config.TaskStorageConfig; import io.druid.indexing.overlord.HeapMemoryTaskStorage; import io.druid.indexing.overlord.TaskLockbox; +import io.druid.indexing.overlord.supervisor.SupervisorManager; import io.druid.java.util.common.logger.Logger; import io.druid.metadata.IndexerSQLMetadataStorageCoordinator; import io.druid.query.aggregation.AggregatorFactory; @@ -201,7 +202,7 @@ public void deleteSegments(Set segments) }; final LocalTaskActionClientFactory tac = new LocalTaskActionClientFactory( ts, - new TaskActionToolbox(tl, mdc, newMockEmitter()) + new TaskActionToolbox(tl, mdc, newMockEmitter(), EasyMock.createMock(SupervisorManager.class)) ); SegmentHandoffNotifierFactory notifierFactory = EasyMock.createNiceMock(SegmentHandoffNotifierFactory.class); EasyMock.replay(notifierFactory); diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java index 658771872d98..d65177c600bf 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java @@ -66,6 +66,7 @@ import io.druid.indexing.common.task.Task; import io.druid.indexing.common.task.TaskResource; import io.druid.indexing.overlord.config.TaskQueueConfig; +import io.druid.indexing.overlord.supervisor.SupervisorManager; import io.druid.indexing.test.TestIndexerMetadataStorageCoordinator; import io.druid.jackson.DefaultObjectMapper; import io.druid.java.util.common.Granularity; @@ -507,7 +508,8 @@ private TaskToolboxFactory setUpTaskToolboxFactory( Preconditions.checkNotNull(emitter); taskLockbox = new TaskLockbox(taskStorage); - tac = new LocalTaskActionClientFactory(taskStorage, new TaskActionToolbox(taskLockbox, mdc, emitter)); + tac = new LocalTaskActionClientFactory(taskStorage, new TaskActionToolbox(taskLockbox, mdc, emitter, EasyMock.createMock( + SupervisorManager.class))); File tmpDir = temporaryFolder.newFolder(); taskConfig = new TaskConfig(tmpDir.toString(), null, null, 50000, null, false, null, null); diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/supervisor/SupervisorManagerTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/supervisor/SupervisorManagerTest.java index 0861bfb77f89..415c3e4620fe 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/supervisor/SupervisorManagerTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/supervisor/SupervisorManagerTest.java @@ -21,6 +21,7 @@ import com.google.common.base.Optional; import com.google.common.collect.ImmutableMap; +import io.druid.indexing.overlord.DataSourceMetadata; import io.druid.metadata.MetadataSupervisorManager; import org.easymock.EasyMock; import org.easymock.EasyMockRunner; @@ -258,12 +259,12 @@ public void testResetSupervisor() throws Exception EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(existingSpecs); supervisor1.start(); - supervisor1.reset(); + supervisor1.reset(EasyMock.anyObject(DataSourceMetadata.class)); replayAll(); manager.start(); - Assert.assertTrue("resetValidSupervisor", manager.resetSupervisor("id1")); - Assert.assertFalse("resetInvalidSupervisor", manager.resetSupervisor("nobody_home")); + Assert.assertTrue("resetValidSupervisor", manager.resetSupervisor("id1", null)); + Assert.assertFalse("resetInvalidSupervisor", manager.resetSupervisor("nobody_home", null)); verifyAll(); } diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/supervisor/SupervisorResourceTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/supervisor/SupervisorResourceTest.java index 0eff330a801f..398b4603bb3c 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/supervisor/SupervisorResourceTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/supervisor/SupervisorResourceTest.java @@ -24,7 +24,9 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Maps; +import io.druid.indexing.overlord.DataSourceMetadata; import io.druid.indexing.overlord.TaskMaster; +import org.easymock.Capture; import org.easymock.EasyMock; import org.easymock.EasyMockRunner; import org.easymock.EasyMockSupport; @@ -270,9 +272,11 @@ public void testSpecGetHistory() throws Exception @Test public void testReset() throws Exception { + Capture id1 = Capture.newInstance(); + Capture id2 = Capture.newInstance(); EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.of(supervisorManager)).times(2); - EasyMock.expect(supervisorManager.resetSupervisor("my-id")).andReturn(true); - EasyMock.expect(supervisorManager.resetSupervisor("my-id-2")).andReturn(false); + EasyMock.expect(supervisorManager.resetSupervisor(EasyMock.capture(id1), EasyMock.anyObject(DataSourceMetadata.class))).andReturn(true); + EasyMock.expect(supervisorManager.resetSupervisor(EasyMock.capture(id2), EasyMock.anyObject(DataSourceMetadata.class))).andReturn(false); replayAll(); Response response = supervisorResource.reset("my-id"); @@ -283,6 +287,8 @@ public void testReset() throws Exception response = supervisorResource.reset("my-id-2"); Assert.assertEquals(404, response.getStatus()); + Assert.assertEquals("my-id", id1.getValue()); + Assert.assertEquals("my-id-2", id2.getValue()); verifyAll(); resetAll(); diff --git a/indexing-service/src/test/java/io/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java b/indexing-service/src/test/java/io/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java index b6c13997f452..03e08728490b 100644 --- a/indexing-service/src/test/java/io/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java +++ b/indexing-service/src/test/java/io/druid/indexing/test/TestIndexerMetadataStorageCoordinator.java @@ -57,6 +57,14 @@ public boolean deleteDataSourceMetadata(String dataSource) throw new UnsupportedOperationException(); } + @Override + public boolean resetDataSourceMetadata( + String dataSource, DataSourceMetadata dataSourceMetadata + ) throws IOException + { + return false; + } + @Override public List getUsedSegmentsForInterval(String dataSource, Interval interval) throws IOException { diff --git a/server/src/main/java/io/druid/indexing/overlord/DataSourceMetadata.java b/server/src/main/java/io/druid/indexing/overlord/DataSourceMetadata.java index 29551c8734c9..ef3dd2cc5a18 100644 --- a/server/src/main/java/io/druid/indexing/overlord/DataSourceMetadata.java +++ b/server/src/main/java/io/druid/indexing/overlord/DataSourceMetadata.java @@ -71,4 +71,14 @@ public interface DataSourceMetadata * @return merged copy */ DataSourceMetadata plus(DataSourceMetadata other); + + /** + * Returns a copy of this instance with "other" subtracted. + * + * Behavior is undefined if you pass in an instance of a different class from this one. + * + * @param other another instance + * @return subtracted copy + */ + DataSourceMetadata minus(DataSourceMetadata other); } diff --git a/server/src/main/java/io/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java b/server/src/main/java/io/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java index 808bd35506d1..4872110420e7 100644 --- a/server/src/main/java/io/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java +++ b/server/src/main/java/io/druid/indexing/overlord/IndexerMetadataStorageCoordinator.java @@ -128,6 +128,15 @@ SegmentPublishResult announceHistoricalSegments( */ boolean deleteDataSourceMetadata(String dataSource); + /** + * Resets dataSourceMetadata entry for 'dataSource' to the one supplied. + * + * @param dataSource identifier + * @param dataSourceMetadata value to set + * @return true if the entry was reset, false otherwise + */ + boolean resetDataSourceMetadata(String dataSource, DataSourceMetadata dataSourceMetadata) throws IOException; + void updateSegmentMetadata(Set segments) throws IOException; void deleteSegments(Set segments) throws IOException; diff --git a/server/src/main/java/io/druid/indexing/overlord/ObjectMetadata.java b/server/src/main/java/io/druid/indexing/overlord/ObjectMetadata.java index b3989f770731..6cb278f711d4 100644 --- a/server/src/main/java/io/druid/indexing/overlord/ObjectMetadata.java +++ b/server/src/main/java/io/druid/indexing/overlord/ObjectMetadata.java @@ -60,6 +60,12 @@ public DataSourceMetadata plus(DataSourceMetadata other) return other; } + @Override + public DataSourceMetadata minus(DataSourceMetadata other) + { + return this; + } + @Override public boolean equals(Object o) { diff --git a/server/src/main/java/io/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java b/server/src/main/java/io/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java index 837c266655c4..abf506ec6322 100644 --- a/server/src/main/java/io/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java +++ b/server/src/main/java/io/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java @@ -19,6 +19,8 @@ package io.druid.indexing.overlord.supervisor; +import io.druid.indexing.overlord.DataSourceMetadata; + /** * Used as a tombstone marker in the supervisors metadata table to indicate that the supervisor has been removed. */ @@ -48,7 +50,7 @@ public SupervisorReport getStatus() } @Override - public void reset() {} + public void reset(DataSourceMetadata dataSourceMetadata) {} }; } } diff --git a/server/src/main/java/io/druid/indexing/overlord/supervisor/Supervisor.java b/server/src/main/java/io/druid/indexing/overlord/supervisor/Supervisor.java index 6f64d486a4d6..cec68c9674e2 100644 --- a/server/src/main/java/io/druid/indexing/overlord/supervisor/Supervisor.java +++ b/server/src/main/java/io/druid/indexing/overlord/supervisor/Supervisor.java @@ -19,6 +19,8 @@ package io.druid.indexing.overlord.supervisor; +import io.druid.indexing.overlord.DataSourceMetadata; + public interface Supervisor { void start(); @@ -33,5 +35,5 @@ public interface Supervisor SupervisorReport getStatus(); - void reset(); + void reset(DataSourceMetadata dataSourceMetadata); } diff --git a/server/src/main/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinator.java index 9d8cf4d2fc56..a88f597bcbc6 100644 --- a/server/src/main/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinator.java +++ b/server/src/main/java/io/druid/metadata/IndexerSQLMetadataStorageCoordinator.java @@ -818,6 +818,41 @@ public Boolean withHandle(Handle handle) throws Exception ); } + @Override + public boolean resetDataSourceMetadata( + final String dataSource, final DataSourceMetadata dataSourceMetadata + ) throws IOException + { + final byte[] newCommitMetadataBytes = jsonMapper.writeValueAsBytes(dataSourceMetadata); + final String newCommitMetadataSha1 = BaseEncoding.base16().encode( + Hashing.sha1().hashBytes(newCommitMetadataBytes).asBytes() + ); + + return connector.retryWithHandle( + new HandleCallback() + { + @Override + public Boolean withHandle(Handle handle) throws Exception + { + final int numRows = handle.createStatement( + String.format( + "UPDATE %s SET " + + "commit_metadata_payload = :new_commit_metadata_payload, " + + "commit_metadata_sha1 = :new_commit_metadata_sha1 " + + "WHERE dataSource = :dataSource", + dbTables.getDataSourceTable() + ) + ) + .bind("dataSource", dataSource) + .bind("new_commit_metadata_payload", newCommitMetadataBytes) + .bind("new_commit_metadata_sha1", newCommitMetadataSha1) + .execute(); + return numRows == 1; + } + } + ); + } + public void updateSegmentMetadata(final Set segments) throws IOException { connector.getDBI().inTransaction( diff --git a/services/src/main/java/io/druid/cli/CliOverlord.java b/services/src/main/java/io/druid/cli/CliOverlord.java index 39aa998bf0ae..4e9b822607b7 100644 --- a/services/src/main/java/io/druid/cli/CliOverlord.java +++ b/services/src/main/java/io/druid/cli/CliOverlord.java @@ -73,6 +73,7 @@ import io.druid.indexing.overlord.http.OverlordRedirectInfo; import io.druid.indexing.overlord.http.OverlordResource; import io.druid.indexing.overlord.setup.WorkerBehaviorConfig; +import io.druid.indexing.overlord.supervisor.SupervisorManager; import io.druid.indexing.overlord.supervisor.SupervisorResource; import io.druid.indexing.worker.config.WorkerConfig; import io.druid.java.util.common.logger.Logger; @@ -146,6 +147,7 @@ public void configure(Binder binder) binder.bind(TaskActionToolbox.class).in(LazySingleton.class); binder.bind(TaskLockbox.class).in(LazySingleton.class); binder.bind(TaskStorageQueryAdapter.class).in(LazySingleton.class); + binder.bind(SupervisorManager.class).in(LazySingleton.class); binder.bind(ChatHandlerProvider.class).toProvider(Providers.of(null));