Skip to content

Commit

Permalink
ITKafkaIndexingServiceTest fixes (apache#3872)
Browse files Browse the repository at this point in the history
* Remove the wait between sends; wait for ingestion to complete before querying

Send a fixed number of events

more fixes

* Handle InterruptedException

* Remove the while loop

* review comments
  • Loading branch information
pjain1 authored and fjy committed Jan 22, 2017
1 parent efb1b40 commit 515caa8
Showing 1 changed file with 29 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

import com.google.common.base.Throwables;
import com.google.inject.Inject;

import io.druid.java.util.common.ISE;
import io.druid.java.util.common.logger.Logger;
import io.druid.testing.IntegrationTestingConfig;
Expand Down Expand Up @@ -56,12 +55,12 @@
public class ITKafkaIndexingServiceTest extends AbstractIndexerTest
{
private static final Logger LOG = new Logger(ITKafkaIndexingServiceTest.class);
private static final int DELAY_BETWEEN_EVENTS_SECS = 5;
private static final String INDEXER_FILE = "/indexer/kafka_supervisor_spec.json";
private static final String QUERIES_FILE = "/indexer/kafka_index_queries.json";
private static final String DATASOURCE = "kafka_indexing_service_test";
private static final String TOPIC_NAME = "kafka_indexing_service_topic";
private static final int MINUTES_TO_SEND = 4;
private static final int NUM_EVENTS_TO_SEND = 60;
private static final long WAIT_TIME_MILLIS = 2 * 60 * 1000L;

// We'll fill in the current time and numbers for added, deleted and changed
// before sending the event.
Expand Down Expand Up @@ -163,15 +162,13 @@ public void testKafka()
DateTime dt = new DateTime(zone); // timestamp to put on events
dtFirst = dt; // timestamp of 1st event
dtLast = dt; // timestamp of last event
// stop sending events when time passes this
DateTime dtStop = dtFirst.plusMinutes(MINUTES_TO_SEND).plusSeconds(30);

// these are used to compute the expected aggregations
int added = 0;
int num_events = 0;

// send data to kafka
while (dt.compareTo(dtStop) < 0) { // as long as we're within the time span
while (num_events < NUM_EVENTS_TO_SEND) {
num_events++;
added += num_events;
// construct the event to send
Expand All @@ -184,16 +181,22 @@ public void testKafka()
throw Throwables.propagate(ioe);
}

try {
Thread.sleep(DELAY_BETWEEN_EVENTS_SECS * 1000);
}
catch (InterruptedException ex) { /* nothing */ }
dtLast = dt;
dt = new DateTime(zone);
}

producer.close();

LOG.info("Waiting for [%s] millis for Kafka indexing tasks to consume events", WAIT_TIME_MILLIS);
try {
Thread.sleep(WAIT_TIME_MILLIS);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}


InputStream is = ITKafkaIndexingServiceTest.class.getResourceAsStream(QUERIES_FILE);
if (null == is) {
throw new ISE("could not open query file: %s", QUERIES_FILE);
Expand All @@ -214,7 +217,7 @@ public void testKafka()
.replace("%%TIMEBOUNDARY_RESPONSE_MAXTIME%%", TIMESTAMP_FMT.print(dtLast))
.replace("%%TIMEBOUNDARY_RESPONSE_MINTIME%%", TIMESTAMP_FMT.print(dtFirst))
.replace("%%TIMESERIES_QUERY_START%%", INTERVAL_FMT.print(dtFirst))
.replace("%%TIMESERIES_QUERY_END%%", INTERVAL_FMT.print(dtFirst.plusMinutes(MINUTES_TO_SEND + 2)))
.replace("%%TIMESERIES_QUERY_END%%", INTERVAL_FMT.print(dtLast.plusMinutes(2)))
.replace("%%TIMESERIES_RESPONSE_TIMESTAMP%%", TIMESTAMP_FMT.print(dtFirst))
.replace("%%TIMESERIES_ADDED%%", Integer.toString(added))
.replace("%%TIMESERIES_NUMEVENTS%%", Integer.toString(num_events));
Expand All @@ -227,8 +230,23 @@ public void testKafka()
throw Throwables.propagate(e);
}

LOG.info("Shutting down Kafka Supervisor");
indexer.shutdownSupervisor(supervisorId);

// wait for all kafka indexing tasks to finish
LOG.info("Waiting for all kafka indexing tasks to finish");
RetryUtil.retryUntilTrue(
new Callable<Boolean>()
{
@Override
public Boolean call() throws Exception
{
return (indexer.getPendingTasks().size() + indexer.getRunningTasks().size() + indexer.getWaitingTasks()
.size()) == 0;
}
}, "Waiting for Tasks Completion"
);

// wait for segments to be handed off
try {
RetryUtil.retryUntil(
Expand Down

0 comments on commit 515caa8

Please sign in to comment.