diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIOConfig.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIOConfig.java index e4e6fc89a6f0..d1ec3df3d483 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIOConfig.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIOConfig.java @@ -29,12 +29,14 @@ public class KafkaIOConfig implements IOConfig { private static final boolean DEFAULT_USE_TRANSACTION = true; + private static final boolean DEFAULT_PAUSE_AFTER_READ = false; private final String baseSequenceName; private final KafkaPartitions startPartitions; private final KafkaPartitions endPartitions; private final Map consumerProperties; private final boolean useTransaction; + private final boolean pauseAfterRead; @JsonCreator public KafkaIOConfig( @@ -42,7 +44,8 @@ public KafkaIOConfig( @JsonProperty("startPartitions") KafkaPartitions startPartitions, @JsonProperty("endPartitions") KafkaPartitions endPartitions, @JsonProperty("consumerProperties") Map consumerProperties, - @JsonProperty("useTransaction") Boolean useTransaction + @JsonProperty("useTransaction") Boolean useTransaction, + @JsonProperty("pauseAfterRead") Boolean pauseAfterRead ) { this.baseSequenceName = Preconditions.checkNotNull(baseSequenceName, "baseSequenceName"); @@ -50,6 +53,7 @@ public KafkaIOConfig( this.endPartitions = Preconditions.checkNotNull(endPartitions, "endPartitions"); this.consumerProperties = Preconditions.checkNotNull(consumerProperties, "consumerProperties"); this.useTransaction = useTransaction != null ? useTransaction : DEFAULT_USE_TRANSACTION; + this.pauseAfterRead = pauseAfterRead != null ? pauseAfterRead : DEFAULT_PAUSE_AFTER_READ; Preconditions.checkArgument( startPartitions.getTopic().equals(endPartitions.getTopic()), @@ -101,6 +105,12 @@ public boolean isUseTransaction() return useTransaction; } + @JsonProperty + public boolean isPauseAfterRead() + { + return pauseAfterRead; + } + @Override public String toString() { @@ -110,6 +120,7 @@ public String toString() ", endPartitions=" + endPartitions + ", consumerProperties=" + consumerProperties + ", useTransaction=" + useTransaction + + ", pauseAfterRead=" + pauseAfterRead + '}'; } } diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java index 0c28768092bc..1bd9122f33a8 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaIndexTask.java @@ -19,15 +19,19 @@ package io.druid.indexing.kafka; +import com.fasterxml.jackson.annotation.JacksonInject; import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; import com.google.common.base.Joiner; +import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.base.Predicate; import com.google.common.base.Supplier; +import com.google.common.base.Throwables; import 
com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; @@ -66,6 +70,8 @@ import io.druid.segment.realtime.appenderator.SegmentIdentifier; import io.druid.segment.realtime.appenderator.SegmentsAndMetadata; import io.druid.segment.realtime.appenderator.TransactionalSegmentPublisher; +import io.druid.segment.realtime.firehose.ChatHandler; +import io.druid.segment.realtime.firehose.ChatHandlerProvider; import io.druid.timeline.DataSegment; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; @@ -73,7 +79,17 @@ import org.apache.kafka.clients.consumer.OffsetOutOfRangeException; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.ByteArrayDeserializer; - +import org.joda.time.DateTime; + +import javax.ws.rs.Consumes; +import javax.ws.rs.DefaultValue; +import javax.ws.rs.GET; +import javax.ws.rs.POST; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; +import javax.ws.rs.QueryParam; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; @@ -82,9 +98,25 @@ import java.util.Random; import java.util.Set; import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; -public class KafkaIndexTask extends AbstractTask +public class KafkaIndexTask extends AbstractTask implements ChatHandler { + public static final long PAUSE_FOREVER = -1L; + + public enum Status + { + NOT_STARTED, + STARTING, + READING, + PAUSED, + PUBLISHING + } + private static final Logger log = new Logger(KafkaIndexTask.class); private static final String TYPE = "index_kafka"; private static final Random RANDOM = new Random(); @@ -95,13 +127,42 @@ public class KafkaIndexTask extends AbstractTask private final InputRowParser parser; private final KafkaTuningConfig tuningConfig; private final KafkaIOConfig ioConfig; + private final Optional chatHandlerProvider; + + private final Map endOffsets = new ConcurrentHashMap<>(); + private final Map nextOffsets = new ConcurrentHashMap<>(); + + private ObjectMapper mapper; private volatile Appenderator appenderator = null; private volatile FireDepartmentMetrics fireDepartmentMetrics = null; - private volatile boolean startedReading = false; - private volatile boolean stopping = false; - private volatile boolean publishing = false; + private volatile DateTime startTime; + private volatile Status status = Status.NOT_STARTED; // this is only ever set by the task runner thread (runThread) private volatile Thread runThread = null; + private volatile boolean stopRequested = false; + private volatile boolean publishOnStop = false; + + // The pause lock and associated conditions are to support coordination between the Jetty threads and the main + // ingestion loop. The goal is to provide callers of the API a guarantee that if pause() returns successfully + // the ingestion loop has been stopped at the returned offsets and will not ingest any more data until resumed. The + // fields are used as follows (every step requires acquiring [pauseLock]): + // Pausing: + // - In pause(), [pauseRequested] is set to true and then execution waits for [status] to change to PAUSED, with the + // condition checked when [hasPaused] is signalled. 
+ // - In possiblyPause() called from the main loop, if [pauseRequested] is true, [status] is set to PAUSED, + // [hasPaused] is signalled, and execution pauses until [pauseRequested] becomes false, either by being set or by + // the [pauseMillis] timeout elapsing. [pauseRequested] is checked when [shouldResume] is signalled. + // Resuming: + // - In resume(), [pauseRequested] is set to false, [shouldResume] is signalled, and execution waits for [status] to + // change to something other than PAUSED, with the condition checked when [shouldResume] is signalled. + // - In possiblyPause(), when [shouldResume] is signalled, if [pauseRequested] has become false the pause loop ends, + // [status] is changed to STARTING and [shouldResume] is signalled. + + private final Lock pauseLock = new ReentrantLock(); + private final Condition hasPaused = pauseLock.newCondition(); + private final Condition shouldResume = pauseLock.newCondition(); + private volatile boolean pauseRequested = false; + private volatile long pauseMillis = 0; @JsonCreator public KafkaIndexTask( @@ -110,7 +171,8 @@ public KafkaIndexTask( @JsonProperty("dataSchema") DataSchema dataSchema, @JsonProperty("tuningConfig") KafkaTuningConfig tuningConfig, @JsonProperty("ioConfig") KafkaIOConfig ioConfig, - @JsonProperty("context") Map context + @JsonProperty("context") Map context, + @JacksonInject ChatHandlerProvider chatHandlerProvider ) { super( @@ -125,6 +187,9 @@ public KafkaIndexTask( this.parser = Preconditions.checkNotNull((InputRowParser) dataSchema.getParser(), "parser"); this.tuningConfig = Preconditions.checkNotNull(tuningConfig, "tuningConfig"); this.ioConfig = Preconditions.checkNotNull(ioConfig, "ioConfig"); + this.chatHandlerProvider = Optional.fromNullable(chatHandlerProvider); + + this.endOffsets.putAll(ioConfig.getEndPartitions().getPartitionOffsetMap()); } private static String makeTaskId(String dataSource, int randomBits) @@ -166,19 +231,20 @@ public KafkaIOConfig getIOConfig() return ioConfig; } - /** - * Public for tests. - */ - @JsonIgnore - public boolean hasStartedReading() - { - return startedReading; - } - @Override public TaskStatus run(final TaskToolbox toolbox) throws Exception { log.info("Starting up!"); + startTime = DateTime.now(); + mapper = toolbox.getObjectMapper(); + status = Status.STARTING; + + if (chatHandlerProvider.isPresent()) { + log.info("Found chat handler of class[%s]", chatHandlerProvider.get().getClass().getName()); + chatHandlerProvider.get().register(getId(), this); + } else { + log.warn("No chat handler detected"); + } runThread = Thread.currentThread(); @@ -207,7 +273,6 @@ public TaskStatus run(final TaskToolbox toolbox) throws Exception // Start up, set up initial offsets. final Object restoredMetadata = driver.startJob(); - final Map nextOffsets = Maps.newHashMap(); if (restoredMetadata == null) { nextOffsets.putAll(ioConfig.getStartPartitions().getPartitionOffsetMap()); } else { @@ -272,153 +337,142 @@ public void run() } }; - // Initialize consumer assignment. - final Set assignment = Sets.newHashSet(); - for (Map.Entry entry : nextOffsets.entrySet()) { - final long endOffset = ioConfig.getEndPartitions().getPartitionOffsetMap().get(entry.getKey()); - if (entry.getValue() < endOffset) { - assignment.add(entry.getKey()); - } else if (entry.getValue() == endOffset) { - log.info("Finished reading partition[%d].", entry.getKey()); - } else { - throw new ISE( - "WTF?! 
Cannot start from offset[%,d] > endOffset[%,d]", - entry.getValue(), - endOffset - ); - } - } - - assignPartitions(consumer, topic, assignment); - - // Seek to starting offsets. - for (final int partition : assignment) { - final long offset = nextOffsets.get(partition); - log.info("Seeking partition[%d] to offset[%,d].", partition, offset); - consumer.seek(new TopicPartition(topic, partition), offset); - } + Set assignment = assignPartitionsAndSeekToNext(consumer, topic); // Main loop. - // Could eventually support early termination (triggered by a supervisor) // Could eventually support leader/follower mode (for keeping replicas more in sync) boolean stillReading = !assignment.isEmpty(); - while (stillReading) { - if (stopping) { - log.info("Stopping early."); - break; - } + try { + while (stillReading) { + if (possiblyPause(assignment)) { + // The partition assignments may have changed while paused by a call to setEndOffsets() so reassign + // partitions upon resuming. This is safe even if the end offsets have not been modified. + assignment = assignPartitionsAndSeekToNext(consumer, topic); + + if (assignment.isEmpty()) { + log.info("All partitions have been fully read"); + publishOnStop = true; + stopRequested = true; + } + } - // The retrying business is because the KafkaConsumer throws OffsetOutOfRangeException if the seeked-to - // offset is not present in the topic-partition. This can happen if we're asking a task to read from data - // that has not been written yet (which is totally legitimate). So let's wait for it to show up. - final ConsumerRecords records = RetryUtils.retry( - new Callable>() - { - @Override - public ConsumerRecords call() throws Exception + if (stopRequested) { + break; + } + + // The retrying business is because the KafkaConsumer throws OffsetOutOfRangeException if the seeked-to + // offset is not present in the topic-partition. This can happen if we're asking a task to read from data + // that has not been written yet (which is totally legitimate). So let's wait for it to show up. + final ConsumerRecords records = RetryUtils.retry( + new Callable>() { - try { - return consumer.poll(POLL_TIMEOUT); - } - finally { - startedReading = true; + @Override + public ConsumerRecords call() throws Exception + { + try { + return consumer.poll(POLL_TIMEOUT); + } + finally { + status = Status.READING; + } } - } - }, - new Predicate() - { - @Override - public boolean apply(Throwable input) + }, + new Predicate() { - return input instanceof OffsetOutOfRangeException; - } - }, - Integer.MAX_VALUE - ); - - for (ConsumerRecord record : records) { - if (log.isTraceEnabled()) { - log.trace( - "Got topic[%s] partition[%d] offset[%,d].", - record.topic(), - record.partition(), - record.offset() - ); - } + @Override + public boolean apply(Throwable input) + { + return input instanceof OffsetOutOfRangeException; + } + }, + Integer.MAX_VALUE + ); - if (record.offset() < ioConfig.getEndPartitions().getPartitionOffsetMap().get(record.partition())) { - if (record.offset() != nextOffsets.get(record.partition())) { - throw new ISE( - "WTF?! 
Got offset[%,d] after offset[%,d] in partition[%d].", - record.offset(), - nextOffsets.get(record.partition()), - record.partition() + for (ConsumerRecord record : records) { + if (log.isTraceEnabled()) { + log.trace( + "Got topic[%s] partition[%d] offset[%,d].", + record.topic(), + record.partition(), + record.offset() ); } - try { - final InputRow row = Preconditions.checkNotNull(parser.parse(ByteBuffer.wrap(record.value())), "row"); - final SegmentIdentifier identifier = driver.add( - row, - sequenceNames.get(record.partition()), - committerSupplier - ); - - if (identifier == null) { - // Failure to allocate segment puts determinism at risk, bail out to be safe. - // May want configurable behavior here at some point. - // If we allow continuing, then consider blacklisting the interval for a while to avoid constant checks. - throw new ISE("Could not allocate segment for row with timestamp[%s]", row.getTimestamp()); + if (record.offset() < endOffsets.get(record.partition())) { + if (record.offset() != nextOffsets.get(record.partition())) { + throw new ISE( + "WTF?! Got offset[%,d] after offset[%,d] in partition[%d].", + record.offset(), + nextOffsets.get(record.partition()), + record.partition() + ); } - fireDepartmentMetrics.incrementProcessed(); - } - catch (ParseException e) { - if (tuningConfig.isReportParseExceptions()) { - throw e; - } else { - log.debug( - e, - "Dropping unparseable row from partition[%d] offset[%,d].", - record.partition(), - record.offset() + try { + final InputRow row = Preconditions.checkNotNull(parser.parse(ByteBuffer.wrap(record.value())), "row"); + final SegmentIdentifier identifier = driver.add( + row, + sequenceNames.get(record.partition()), + committerSupplier ); - fireDepartmentMetrics.incrementUnparseable(); - } - } + if (identifier == null) { + // Failure to allocate segment puts determinism at risk, bail out to be safe. + // May want configurable behavior here at some point. + // If we allow continuing, then consider blacklisting the interval for a while to avoid constant checks. + throw new ISE("Could not allocate segment for row with timestamp[%s]", row.getTimestamp()); + } - final long nextOffset = record.offset() + 1; - final long endOffset = ioConfig.getEndPartitions().getPartitionOffsetMap().get(record.partition()); + fireDepartmentMetrics.incrementProcessed(); + } + catch (ParseException e) { + if (tuningConfig.isReportParseExceptions()) { + throw e; + } else { + log.debug( + e, + "Dropping unparseable row from partition[%d] offset[%,d].", + record.partition(), + record.offset() + ); + + fireDepartmentMetrics.incrementUnparseable(); + } + } - nextOffsets.put(record.partition(), nextOffset); + nextOffsets.put(record.partition(), record.offset() + 1); + } - if (nextOffset == endOffset && assignment.remove(record.partition())) { + if (nextOffsets.get(record.partition()).equals(endOffsets.get(record.partition())) + && assignment.remove(record.partition())) { log.info("Finished reading topic[%s], partition[%,d].", record.topic(), record.partition()); assignPartitions(consumer, topic, assignment); - stillReading = !assignment.isEmpty(); + stillReading = ioConfig.isPauseAfterRead() || !assignment.isEmpty(); } } } } + finally { + driver.persist(committerSupplier.get()); // persist pending data + } - // Persist pending data. - final Committer finalCommitter = committerSupplier.get(); - driver.persist(finalCommitter); - - publishing = true; - if (stopping) { - // Stopped gracefully. Exit code shouldn't matter, so fail to be on the safe side. 
- return TaskStatus.failure(getId()); + if (stopRequested && !publishOnStop) { + throw new InterruptedException("Stopping without publishing"); } + status = Status.PUBLISHING; final TransactionalSegmentPublisher publisher = new TransactionalSegmentPublisher() { @Override public boolean publishSegments(Set segments, Object commitMetadata) throws IOException { + final KafkaPartitions finalPartitions = toolbox.getObjectMapper().convertValue( + ((Map) commitMetadata).get(METADATA_NEXT_PARTITIONS), + KafkaPartitions.class + ); + // Sanity check, we should only be publishing things that match our desired end state. - if (!ioConfig.getEndPartitions().equals(((Map) commitMetadata).get(METADATA_NEXT_PARTITIONS))) { + if (!endOffsets.equals(finalPartitions.getPartitionOffsetMap())) { throw new ISE("WTF?! Driver attempted to publish invalid metadata[%s].", commitMetadata); } @@ -428,7 +482,7 @@ public boolean publishSegments(Set segments, Object commitMetadata) action = new SegmentInsertAction( segments, new KafkaDataSourceMetadata(ioConfig.getStartPartitions()), - new KafkaDataSourceMetadata(ioConfig.getEndPartitions()) + new KafkaDataSourceMetadata(finalPartitions) ); } else { action = new SegmentInsertAction(segments, null, null); @@ -463,6 +517,15 @@ public String apply(DataSegment input) ); } } + catch (InterruptedException e) { + // if we were interrupted because we were asked to stop, handle the exception and return success, else rethrow + if (!stopRequested) { + Thread.currentThread().interrupt(); + throw e; + } + + log.info("The task was asked to stop before completing"); + } return success(); } @@ -473,14 +536,15 @@ public boolean canRestore() return true; } + @POST + @Path("/stop") @Override public void stopGracefully() { log.info("Stopping gracefully."); - - stopping = true; - if (publishing && runThread.isAlive()) { - log.info("stopGracefully: Run thread started publishing, interrupting it."); + stopRequested = true; + if (runThread.isAlive()) { + log.info("Interrupting run thread (status: [%s])", status); runThread.interrupt(); } } @@ -503,12 +567,183 @@ public Sequence run(final Query query, final Map responseC }; } + @GET + @Path("/status") + @Produces(MediaType.APPLICATION_JSON) + public Status getStatus() + { + return status; + } + + @GET + @Path("/offsets/current") + @Produces(MediaType.APPLICATION_JSON) + public Map getCurrentOffsets() + { + return nextOffsets; + } + + @GET + @Path("/offsets/end") + @Produces(MediaType.APPLICATION_JSON) + public Map getEndOffsets() + { + return endOffsets; + } + + @POST + @Path("/offsets/end") + @Consumes(MediaType.APPLICATION_JSON) + @Produces(MediaType.APPLICATION_JSON) + public Response setEndOffsets( + Map offsets, + @QueryParam("resume") @DefaultValue("false") final boolean resume + ) throws InterruptedException + { + if (offsets == null) { + return Response.status(Response.Status.BAD_REQUEST) + .entity("Request body must contain a map of { partition:endOffset }") + .build(); + } else if (!endOffsets.keySet().containsAll(offsets.keySet())) { + return Response.status(Response.Status.BAD_REQUEST) + .entity( + String.format( + "Request contains partitions not being handled by this task, my partitions: %s", + endOffsets.keySet() + ) + ) + .build(); + } + + pauseLock.lockInterruptibly(); + try { + if (!isPaused()) { + return Response.status(Response.Status.BAD_REQUEST) + .entity("Task must be paused before changing the end offsets") + .build(); + } + + for (Map.Entry entry : offsets.entrySet()) { + if 
(entry.getValue().compareTo(nextOffsets.get(entry.getKey())) < 0) { + return Response.status(Response.Status.BAD_REQUEST) + .entity( + String.format( + "End offset must be >= current offset for partition [%s] (current: %s)", + entry.getKey(), + nextOffsets.get(entry.getKey()) + ) + ) + .build(); + } + } + + endOffsets.putAll(offsets); + log.info("endOffsets changed to %s", endOffsets); + } + finally { + pauseLock.unlock(); + } + + if (resume) { + resume(); + } + + return Response.ok(endOffsets).build(); + } + + /** + * Signals the ingestion loop to pause. + * + * @param timeout how long to pause for before resuming in milliseconds, <= 0 means indefinitely + * + * @return one of the following Responses: 400 Bad Request if the task has started publishing; 202 Accepted if the + * method has timed out and returned before the task has paused; 200 OK with a map of the current partition offsets + * in the response body if the task successfully paused + */ + @POST + @Path("/pause") + @Produces(MediaType.APPLICATION_JSON) + public Response pause(@QueryParam("timeout") @DefaultValue("0") final long timeout) + throws InterruptedException + { + if (!(status == Status.PAUSED || status == Status.READING)) { + return Response.status(Response.Status.BAD_REQUEST) + .entity(String.format("Can't pause, task is not in a pausable state (state: [%s])", status)) + .build(); + } + + pauseLock.lockInterruptibly(); + try { + pauseMillis = timeout <= 0 ? PAUSE_FOREVER : timeout; + pauseRequested = true; + + if (isPaused()) { + shouldResume.signalAll(); // kick the monitor so it re-awaits with the new pauseMillis + } + + long nanos = TimeUnit.SECONDS.toNanos(2); + while (!isPaused()) { + if (nanos <= 0L) { + return Response.status(Response.Status.ACCEPTED) + .entity("Request accepted but task has not yet paused") + .build(); + } + nanos = hasPaused.awaitNanos(nanos); + } + } + finally { + pauseLock.unlock(); + } + + try { + return Response.ok().entity(mapper.writeValueAsString(getCurrentOffsets())).build(); + } + catch (JsonProcessingException e) { + throw Throwables.propagate(e); + } + } + + @POST + @Path("/resume") + public void resume() throws InterruptedException + { + pauseLock.lockInterruptibly(); + try { + pauseRequested = false; + shouldResume.signalAll(); + + long nanos = TimeUnit.SECONDS.toNanos(5); + while (isPaused()) { + if (nanos <= 0L) { + throw new RuntimeException("Resume command was not accepted within 5 seconds"); + } + nanos = shouldResume.awaitNanos(nanos); + } + } + finally { + pauseLock.unlock(); + } + } + + @GET + @Path("/time/start") + @Produces(MediaType.APPLICATION_JSON) + public DateTime getStartTime() + { + return startTime; + } + @VisibleForTesting - public FireDepartmentMetrics getFireDepartmentMetrics() + FireDepartmentMetrics getFireDepartmentMetrics() { return fireDepartmentMetrics; } + private boolean isPaused() + { + return status == Status.PAUSED; + } + private Appenderator newAppenderator(FireDepartmentMetrics metrics, TaskToolbox toolbox) { return Appenderators.createRealtime( @@ -590,4 +825,95 @@ public TopicPartition apply(Integer n) ) ); } + + private Set assignPartitionsAndSeekToNext(KafkaConsumer consumer, String topic) + { + // Initialize consumer assignment. 
+ final Set assignment = Sets.newHashSet(); + for (Map.Entry entry : nextOffsets.entrySet()) { + final long endOffset = endOffsets.get(entry.getKey()); + if (entry.getValue() < endOffset) { + assignment.add(entry.getKey()); + } else if (entry.getValue() == endOffset) { + log.info("Finished reading partition[%d].", entry.getKey()); + } else { + throw new ISE( + "WTF?! Cannot start from offset[%,d] > endOffset[%,d]", + entry.getValue(), + endOffset + ); + } + } + + assignPartitions(consumer, topic, assignment); + + // Seek to starting offsets. + for (final int partition : assignment) { + final long offset = nextOffsets.get(partition); + log.info("Seeking partition[%d] to offset[%,d].", partition, offset); + consumer.seek(new TopicPartition(topic, partition), offset); + } + + return assignment; + } + + /** + * Checks if the pauseRequested flag was set and if so blocks: + * a) if pauseMillis == PAUSE_FOREVER, until pauseRequested is cleared + * b) if pauseMillis != PAUSE_FOREVER, until pauseMillis elapses -or- pauseRequested is cleared + *
<p/>
+ * If pauseMillis is changed while paused, the new pause timeout will be applied. This allows adjustment of the + * pause timeout (making a timed pause into an indefinite pause and vice versa is valid) without having to resume + * and ensures that the loop continues to stay paused without ingesting any new events. You will need to signal + * shouldResume after adjusting pauseMillis for the new value to take effect. + *
<p/>
+ * Sets paused = true and signals paused so callers can be notified when the pause command has been accepted. + *
<p/>
+ * Additionally, pauses if all partitions assignments have been read and pauseAfterRead flag is set. + * + * @return true if a pause request was handled, false otherwise + */ + private boolean possiblyPause(Set assignment) throws InterruptedException + { + pauseLock.lockInterruptibly(); + try { + if (ioConfig.isPauseAfterRead() && assignment.isEmpty()) { + pauseMillis = PAUSE_FOREVER; + pauseRequested = true; + } + + if (pauseRequested) { + status = Status.PAUSED; + long nanos = 0; + hasPaused.signalAll(); + + while (pauseRequested) { + if (pauseMillis == PAUSE_FOREVER) { + log.info("Pausing ingestion until resumed"); + shouldResume.await(); + } else { + if (pauseMillis > 0) { + log.info("Pausing ingestion for [%,d] ms", pauseMillis); + nanos = TimeUnit.MILLISECONDS.toNanos(pauseMillis); + pauseMillis = 0; + } + if (nanos <= 0L) { + pauseRequested = false; // timeout elapsed + } + nanos = shouldResume.awaitNanos(nanos); + } + } + + status = Status.READING; + shouldResume.signalAll(); + log.info("Ingestion loop resumed"); + return true; + } + } + finally { + pauseLock.unlock(); + } + + return false; + } } diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java index aab45e2cc67e..6b5e1b8df48c 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java @@ -19,6 +19,8 @@ package io.druid.indexing.kafka; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.Module; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Charsets; @@ -137,6 +139,7 @@ import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; @RunWith(Parameterized.class) public class KafkaIndexTaskTest @@ -159,6 +162,7 @@ public class KafkaIndexTaskTest private final List runningTasks = Lists.newArrayList(); private static final Logger log = new Logger(KafkaIndexTaskTest.class); + private static final ObjectMapper objectMapper = new DefaultObjectMapper(); private static final DataSchema DATA_SCHEMA; @@ -175,7 +179,6 @@ public class KafkaIndexTaskTest ); static { - ObjectMapper objectMapper = new DefaultObjectMapper(); DATA_SCHEMA = new DataSchema( "test_ds", objectMapper.convertValue( @@ -298,7 +301,8 @@ public void testRunAfterDataInserted() throws Exception new KafkaPartitions("topic0", ImmutableMap.of(0, 2L)), new KafkaPartitions("topic0", ImmutableMap.of(0, 5L)), kafkaServer.consumerProperties(), - true + true, + false ), null ); @@ -337,7 +341,8 @@ public void testRunBeforeDataInserted() throws Exception new KafkaPartitions("topic0", ImmutableMap.of(0, 2L)), new KafkaPartitions("topic0", ImmutableMap.of(0, 5L)), kafkaServer.consumerProperties(), - true + true, + false ), null ); @@ -345,7 +350,7 @@ public void testRunBeforeDataInserted() throws Exception final ListenableFuture future = runTask(task); // Wait for the task to start reading - while (!task.hasStartedReading()) { + while (task.getStatus() != KafkaIndexTask.Status.READING) { Thread.sleep(10); } @@ -395,7 +400,8 @@ public void testRunOnNothing() throws Exception new KafkaPartitions("topic0", ImmutableMap.of(0, 2L)), new 
KafkaPartitions("topic0", ImmutableMap.of(0, 2L)), kafkaServer.consumerProperties(), - true + true, + false ), null ); @@ -433,7 +439,8 @@ public void testHandoffConditionTimeoutWhenHandoffOccurs() throws Exception new KafkaPartitions("topic0", ImmutableMap.of(0, 2L)), new KafkaPartitions("topic0", ImmutableMap.of(0, 5L)), kafkaServer.consumerProperties(), - true + true, + false ), null ); @@ -482,7 +489,8 @@ public void testHandoffConditionTimeoutWhenHandoffDoesNotOccur() throws Exceptio new KafkaPartitions("topic0", ImmutableMap.of(0, 2L)), new KafkaPartitions("topic0", ImmutableMap.of(0, 5L)), kafkaServer.consumerProperties(), - true + true, + false ), null ); @@ -530,7 +538,8 @@ public void testReportParseExceptions() throws Exception new KafkaPartitions("topic0", ImmutableMap.of(0, 2L)), new KafkaPartitions("topic0", ImmutableMap.of(0, 7L)), kafkaServer.consumerProperties(), - true + true, + false ), null ); @@ -560,7 +569,8 @@ public void testRunReplicas() throws Exception new KafkaPartitions("topic0", ImmutableMap.of(0, 2L)), new KafkaPartitions("topic0", ImmutableMap.of(0, 5L)), kafkaServer.consumerProperties(), - true + true, + false ), null ); @@ -571,7 +581,8 @@ public void testRunReplicas() throws Exception new KafkaPartitions("topic0", ImmutableMap.of(0, 2L)), new KafkaPartitions("topic0", ImmutableMap.of(0, 5L)), kafkaServer.consumerProperties(), - true + true, + false ), null ); @@ -622,7 +633,8 @@ public void testRunConflicting() throws Exception new KafkaPartitions("topic0", ImmutableMap.of(0, 2L)), new KafkaPartitions("topic0", ImmutableMap.of(0, 5L)), kafkaServer.consumerProperties(), - true + true, + false ), null ); @@ -633,7 +645,8 @@ public void testRunConflicting() throws Exception new KafkaPartitions("topic0", ImmutableMap.of(0, 3L)), new KafkaPartitions("topic0", ImmutableMap.of(0, 7L)), kafkaServer.consumerProperties(), - true + true, + false ), null ); @@ -685,6 +698,7 @@ public void testRunConflictingWithoutTransactions() throws Exception new KafkaPartitions("topic0", ImmutableMap.of(0, 2L)), new KafkaPartitions("topic0", ImmutableMap.of(0, 5L)), kafkaServer.consumerProperties(), + false, false ), null @@ -696,6 +710,7 @@ public void testRunConflictingWithoutTransactions() throws Exception new KafkaPartitions("topic0", ImmutableMap.of(0, 3L)), new KafkaPartitions("topic0", ImmutableMap.of(0, 7L)), kafkaServer.consumerProperties(), + false, false ), null @@ -753,7 +768,8 @@ public void testRunOneTaskTwoPartitions() throws Exception new KafkaPartitions("topic0", ImmutableMap.of(0, 2L, 1, 0L)), new KafkaPartitions("topic0", ImmutableMap.of(0, 5L, 1, 2L)), kafkaServer.consumerProperties(), - true + true, + false ), null ); @@ -807,7 +823,8 @@ public void testRunTwoTasksTwoPartitions() throws Exception new KafkaPartitions("topic0", ImmutableMap.of(0, 2L)), new KafkaPartitions("topic0", ImmutableMap.of(0, 5L)), kafkaServer.consumerProperties(), - true + true, + false ), null ); @@ -818,7 +835,8 @@ public void testRunTwoTasksTwoPartitions() throws Exception new KafkaPartitions("topic0", ImmutableMap.of(1, 0L)), new KafkaPartitions("topic0", ImmutableMap.of(1, 1L)), kafkaServer.consumerProperties(), - true + true, + false ), null ); @@ -871,7 +889,8 @@ public void testRestore() throws Exception new KafkaPartitions("topic0", ImmutableMap.of(0, 2L)), new KafkaPartitions("topic0", ImmutableMap.of(0, 5L)), kafkaServer.consumerProperties(), - true + true, + false ), null ); @@ -891,9 +910,9 @@ public void testRestore() throws Exception Assert.assertEquals(2, 
countEvents(task1)); - // Stop gracefully + // Stop without publishing segment task1.stopGracefully(); - Assert.assertEquals(TaskStatus.Status.FAILED, future1.get().getStatusCode()); + Assert.assertEquals(TaskStatus.Status.SUCCESS, future1.get().getStatusCode()); // Start a new task final KafkaIndexTask task2 = createTask( @@ -903,7 +922,8 @@ public void testRestore() throws Exception new KafkaPartitions("topic0", ImmutableMap.of(0, 2L)), new KafkaPartitions("topic0", ImmutableMap.of(0, 5L)), kafkaServer.consumerProperties(), - true + true, + false ), null ); @@ -942,6 +962,172 @@ public void testRestore() throws Exception Assert.assertEquals(ImmutableList.of("d", "e"), readSegmentDim1(desc2)); } + @Test(timeout = 60_000L) + public void testRunWithPauseAndResume() throws Exception + { + final KafkaIndexTask task = createTask( + null, + new KafkaIOConfig( + "sequence0", + new KafkaPartitions("topic0", ImmutableMap.of(0, 2L)), + new KafkaPartitions("topic0", ImmutableMap.of(0, 5L)), + kafkaServer.consumerProperties(), + true, + false + ), + null + ); + + final ListenableFuture future = runTask(task); + + // Insert some data, but not enough for the task to finish + try (final KafkaProducer kafkaProducer = kafkaServer.newProducer()) { + for (ProducerRecord record : Iterables.limit(RECORDS, 4)) { + kafkaProducer.send(record).get(); + } + } + + while (countEvents(task) != 2) { + Thread.sleep(25); + } + + Assert.assertEquals(2, countEvents(task)); + Assert.assertEquals(KafkaIndexTask.Status.READING, task.getStatus()); + + Map currentOffsets = objectMapper.readValue( + task.pause(0).getEntity().toString(), + new TypeReference>() + { + } + ); + Assert.assertEquals(KafkaIndexTask.Status.PAUSED, task.getStatus()); + + // Insert remaining data + try (final KafkaProducer kafkaProducer = kafkaServer.newProducer()) { + for (ProducerRecord record : Iterables.skip(RECORDS, 4)) { + kafkaProducer.send(record).get(); + } + } + + try { + future.get(10, TimeUnit.SECONDS); + Assert.fail("Task completed when it should have been paused"); + } + catch (TimeoutException e) { + // carry on.. 
+ } + + Assert.assertEquals(currentOffsets, task.getCurrentOffsets()); + + task.resume(); + + Assert.assertEquals(TaskStatus.Status.SUCCESS, future.get().getStatusCode()); + Assert.assertEquals(task.getEndOffsets(), task.getCurrentOffsets()); + + // Check metrics + Assert.assertEquals(3, task.getFireDepartmentMetrics().processed()); + Assert.assertEquals(0, task.getFireDepartmentMetrics().unparseable()); + Assert.assertEquals(0, task.getFireDepartmentMetrics().thrownAway()); + + // Check published metadata + SegmentDescriptor desc1 = SD(task, "2010/P1D", 0); + SegmentDescriptor desc2 = SD(task, "2011/P1D", 0); + Assert.assertEquals(ImmutableSet.of(desc1, desc2), publishedDescriptors()); + Assert.assertEquals( + new KafkaDataSourceMetadata(new KafkaPartitions("topic0", ImmutableMap.of(0, 5L))), + metadataStorageCoordinator.getDataSourceMetadata(DATA_SCHEMA.getDataSource()) + ); + + // Check segments in deep storage + Assert.assertEquals(ImmutableList.of("c"), readSegmentDim1(desc1)); + Assert.assertEquals(ImmutableList.of("d", "e"), readSegmentDim1(desc2)); + } + + @Test(timeout = 60_000L) + public void testRunAndPauseAfterReadWithModifiedEndOffsets() throws Exception + { + final KafkaIndexTask task = createTask( + null, + new KafkaIOConfig( + "sequence0", + new KafkaPartitions("topic0", ImmutableMap.of(0, 1L)), + new KafkaPartitions("topic0", ImmutableMap.of(0, 3L)), + kafkaServer.consumerProperties(), + true, + true + ), + null + ); + + final ListenableFuture future = runTask(task); + + try (final KafkaProducer kafkaProducer = kafkaServer.newProducer()) { + for (ProducerRecord record : RECORDS) { + kafkaProducer.send(record).get(); + } + } + + while (task.getStatus() != KafkaIndexTask.Status.PAUSED) { + Thread.sleep(25); + } + + // reached the end of the assigned offsets and paused instead of publishing + Assert.assertEquals(task.getEndOffsets(), task.getCurrentOffsets()); + Assert.assertEquals(KafkaIndexTask.Status.PAUSED, task.getStatus()); + + Assert.assertEquals(ImmutableMap.of(0, 3L), task.getEndOffsets()); + Map newEndOffsets = ImmutableMap.of(0, 4L); + task.setEndOffsets(newEndOffsets, false); + Assert.assertEquals(newEndOffsets, task.getEndOffsets()); + Assert.assertEquals(KafkaIndexTask.Status.PAUSED, task.getStatus()); + task.resume(); + + while (task.getStatus() != KafkaIndexTask.Status.PAUSED) { + Thread.sleep(25); + } + + // reached the end of the updated offsets and paused + Assert.assertEquals(newEndOffsets, task.getCurrentOffsets()); + Assert.assertEquals(KafkaIndexTask.Status.PAUSED, task.getStatus()); + + // try again but with resume flag == true + newEndOffsets = ImmutableMap.of(0, 6L); + task.setEndOffsets(newEndOffsets, true); + Assert.assertEquals(newEndOffsets, task.getEndOffsets()); + Assert.assertNotEquals(KafkaIndexTask.Status.PAUSED, task.getStatus()); + + while (task.getStatus() != KafkaIndexTask.Status.PAUSED) { + Thread.sleep(25); + } + + Assert.assertEquals(newEndOffsets, task.getCurrentOffsets()); + Assert.assertEquals(KafkaIndexTask.Status.PAUSED, task.getStatus()); + + task.resume(); + + Assert.assertEquals(TaskStatus.Status.SUCCESS, future.get().getStatusCode()); + + // Check metrics + Assert.assertEquals(4, task.getFireDepartmentMetrics().processed()); + Assert.assertEquals(1, task.getFireDepartmentMetrics().unparseable()); + Assert.assertEquals(0, task.getFireDepartmentMetrics().thrownAway()); + + // Check published metadata + SegmentDescriptor desc1 = SD(task, "2009/P1D", 0); + SegmentDescriptor desc2 = SD(task, "2010/P1D", 0); + SegmentDescriptor 
desc3 = SD(task, "2011/P1D", 0); + Assert.assertEquals(ImmutableSet.of(desc1, desc2, desc3), publishedDescriptors()); + Assert.assertEquals( + new KafkaDataSourceMetadata(new KafkaPartitions("topic0", ImmutableMap.of(0, 6L))), + metadataStorageCoordinator.getDataSourceMetadata(DATA_SCHEMA.getDataSource()) + ); + + // Check segments in deep storage + Assert.assertEquals(ImmutableList.of("b"), readSegmentDim1(desc1)); + Assert.assertEquals(ImmutableList.of("c"), readSegmentDim1(desc2)); + Assert.assertEquals(ImmutableList.of("d", "e"), readSegmentDim1(desc3)); + } + private ListenableFuture runTask(final Task task) { try { @@ -1015,6 +1201,7 @@ private KafkaIndexTask createTask( DATA_SCHEMA, tuningConfig, ioConfig, + null, null ); } diff --git a/indexing-service/src/test/java/io/druid/indexing/common/TestUtils.java b/indexing-service/src/test/java/io/druid/indexing/common/TestUtils.java index b646dfdb2c2b..588609a30dda 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/TestUtils.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/TestUtils.java @@ -30,6 +30,8 @@ import io.druid.segment.IndexMerger; import io.druid.segment.IndexMergerV9; import io.druid.segment.column.ColumnConfig; +import io.druid.segment.realtime.firehose.ChatHandlerProvider; +import io.druid.segment.realtime.firehose.NoopChatHandlerProvider; import java.util.List; import java.util.concurrent.TimeUnit; @@ -70,6 +72,7 @@ public int columnCacheSizeBytes() .addValue(IndexIO.class, indexIO) .addValue(IndexMerger.class, indexMerger) .addValue(ObjectMapper.class, jsonMapper) + .addValue(ChatHandlerProvider.class, new NoopChatHandlerProvider()) ); }
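
A minimal sketch of how an external caller (for example, a future supervisor) could drive the pause/resume cycle added in this patch. Only pause(), getCurrentOffsets(), setEndOffsets(), resume() and their Response semantics come from the diff above; the class name, the choice of partition 0, and the +100 offset adjustment are illustrative assumptions.

import com.google.common.collect.ImmutableMap;
import io.druid.indexing.kafka.KafkaIndexTask;

import javax.ws.rs.core.Response;

import java.util.Map;

public class PauseResumeExample
{
  // Pause the task, extend the end offset of partition 0, and resume in one call.
  public static void extendEndOffsets(KafkaIndexTask task) throws InterruptedException
  {
    // timeout <= 0 pauses indefinitely; 200 OK means the ingestion loop is parked at the
    // returned offsets and will not read any more data until resumed.
    Response paused = task.pause(0);
    if (paused.getStatus() != Response.Status.OK.getStatusCode()) {
      throw new IllegalStateException("Task did not pause, HTTP " + paused.getStatus());
    }

    Map<Integer, Long> current = task.getCurrentOffsets();

    // setEndOffsets() requires the task to be paused and the new end offsets to be >= the
    // current offsets; resume=true resumes ingestion in the same call.
    long newEnd = current.get(0) + 100L; // hypothetical adjustment
    task.setEndOffsets(ImmutableMap.of(0, newEnd), true);

    // With pauseAfterRead=true the task pauses again once the new end offsets are reached
    // instead of publishing, so the offsets can be extended repeatedly before the final publish.
  }
}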
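
Because the task now registers itself with the injected ChatHandlerProvider, the same operations are reachable over HTTP on the peon serving the task. Only the resource paths, query parameters and media types (/status, /offsets/current, /offsets/end?resume=..., /pause?timeout=..., /resume, /time/start, /stop) come from this diff; the /druid/worker/v1/chat/{taskId} prefix and the host/port in the sketch below are assumptions about how the chat handler is exposed, not something this patch establishes.

import java.io.IOException;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class KafkaTaskHttpClientSketch
{
  // e.g. "http://peon-host:8100/druid/worker/v1/chat/index_kafka_mytask" (assumed prefix)
  private final String baseUrl;

  public KafkaTaskHttpClientSketch(String baseUrl)
  {
    this.baseUrl = baseUrl;
  }

  // POST /pause?timeout=... -> 200 with current offsets, 202 if the pause is still pending,
  // 400 if the task is not in a pausable state
  public int pause(long timeoutMillis) throws IOException
  {
    return post(baseUrl + "/pause?timeout=" + timeoutMillis, null);
  }

  // POST /offsets/end?resume=true with a JSON body mapping partition to end offset
  public int setEndOffsets(String offsetsJson) throws IOException
  {
    return post(baseUrl + "/offsets/end?resume=true", offsetsJson);
  }

  // POST /resume
  public int resume() throws IOException
  {
    return post(baseUrl + "/resume", null);
  }

  private static int post(String url, String jsonBody) throws IOException
  {
    HttpURLConnection conn = (HttpURLConnection) new URL(url).openConnection();
    conn.setRequestMethod("POST");
    if (jsonBody != null) {
      conn.setRequestProperty("Content-Type", "application/json");
      conn.setDoOutput(true);
      try (OutputStream os = conn.getOutputStream()) {
        os.write(jsonBody.getBytes(StandardCharsets.UTF_8));
      }
    }
    return conn.getResponseCode();
  }
}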
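
The comment block introducing pauseLock, hasPaused and shouldResume in KafkaIndexTask describes a handshake between the Jetty request threads and the ingestion loop. The class below is a stripped-down distillation of that handshake, meant only to make the locking protocol easier to follow: it drops the pauseMillis timeout, the pauseAfterRead trigger and the Status enum in favor of a plain paused flag, and is not code from this patch.

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class PauseHandshakeSketch
{
  private final Lock pauseLock = new ReentrantLock();
  private final Condition hasPaused = pauseLock.newCondition();     // signalled by the loop once it has parked
  private final Condition shouldResume = pauseLock.newCondition();  // signalled when pauseRequested changes
  private volatile boolean pauseRequested = false;
  private volatile boolean paused = false;

  // HTTP thread: request a pause and wait (bounded) for the loop to park.
  // Returning false corresponds to the 202 Accepted case in pause().
  public boolean pause(long waitMillis) throws InterruptedException
  {
    pauseLock.lockInterruptibly();
    try {
      pauseRequested = true;
      long nanos = TimeUnit.MILLISECONDS.toNanos(waitMillis);
      while (!paused) {
        if (nanos <= 0L) {
          return false;
        }
        nanos = hasPaused.awaitNanos(nanos);
      }
      return true; // the loop is parked; offsets are stable until resume()
    }
    finally {
      pauseLock.unlock();
    }
  }

  // HTTP thread: clear the request, wake the loop, and wait for it to leave the paused state.
  public void resume() throws InterruptedException
  {
    pauseLock.lockInterruptibly();
    try {
      pauseRequested = false;
      shouldResume.signalAll();
      while (paused) {
        shouldResume.await();
      }
    }
    finally {
      pauseLock.unlock();
    }
  }

  // Ingestion loop: called once per iteration, blocks while a pause is requested.
  public void possiblyPause() throws InterruptedException
  {
    pauseLock.lockInterruptibly();
    try {
      if (pauseRequested) {
        paused = true;
        hasPaused.signalAll();
        while (pauseRequested) {
          shouldResume.await();
        }
        paused = false;
        shouldResume.signalAll();
      }
    }
    finally {
      pauseLock.unlock();
    }
  }
}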