diff --git a/integration-tests/docker/Dockerfile b/integration-tests/docker/Dockerfile
index 29f4ef9b8ef9..02cedcc461ac 100644
--- a/integration-tests/docker/Dockerfile
+++ b/integration-tests/docker/Dockerfile
@@ -22,6 +22,13 @@ RUN wget -q -O - http://www.us.apache.org/dist/zookeeper/zookeeper-3.4.6/zookeep
 RUN cp /usr/local/zookeeper-3.4.6/conf/zoo_sample.cfg /usr/local/zookeeper-3.4.6/conf/zoo.cfg
 RUN ln -s /usr/local/zookeeper-3.4.6 /usr/local/zookeeper
 
+# Kafka
+RUN wget -q -O - http://www.us.apache.org/dist/kafka/0.8.2.0/kafka_2.10-0.8.2.0.tgz | tar -xzf - -C /usr/local
+RUN ln -s /usr/local/kafka_2.10-0.8.2.0 /usr/local/kafka
+# unless advertised.host.name is set to docker ip, publishing data fails
+ADD docker_ip docker_ip
+RUN perl -pi -e "s/#advertised.port=.*/advertised.port=9092/; s/#advertised.host.*/advertised.host.name=$(cat docker_ip)/" /usr/local/kafka/config/server.properties
+
 # git
 RUN apt-get install -y git
 
diff --git a/integration-tests/docker/broker.conf b/integration-tests/docker/broker.conf
index 63b6af12e3c7..332796a66e34 100644
--- a/integration-tests/docker/broker.conf
+++ b/integration-tests/docker/broker.conf
@@ -11,7 +11,7 @@ command=java
   -Duser.timezone=UTC
   -Dfile.encoding=UTF-8
   -Ddruid.host=%(ENV_HOST_IP)s
-  -Ddruid.zk.service.host=druid-zookeeper
+  -Ddruid.zk.service.host=druid-zookeeper-kafka
   -Ddruid.processing.buffer.sizeBytes=75000000
   -Ddruid.server.http.numThreads=100
   -Ddruid.processing.numThreads=1
diff --git a/integration-tests/docker/coordinator.conf b/integration-tests/docker/coordinator.conf
index 60cf6b97362d..c8fa1c8f770d 100644
--- a/integration-tests/docker/coordinator.conf
+++ b/integration-tests/docker/coordinator.conf
@@ -13,7 +13,7 @@ command=java
   -Ddruid.metadata.storage.connector.connectURI=jdbc:mysql://druid-metadata-storage/druid
   -Ddruid.metadata.storage.connector.user=druid
   -Ddruid.metadata.storage.connector.password=diurd
-  -Ddruid.zk.service.host=druid-zookeeper
+  -Ddruid.zk.service.host=druid-zookeeper-kafka
   -Ddruid.coordinator.startDelay=PT5S
   -cp /usr/local/druid/lib/*
   io.druid.cli.Main server coordinator
diff --git a/integration-tests/docker/historical.conf b/integration-tests/docker/historical.conf
index ab02bbc664a8..85e32e725001 100644
--- a/integration-tests/docker/historical.conf
+++ b/integration-tests/docker/historical.conf
@@ -11,7 +11,7 @@ command=java
   -Duser.timezone=UTC
   -Dfile.encoding=UTF-8
   -Ddruid.host=%(ENV_HOST_IP)s
-  -Ddruid.zk.service.host=druid-zookeeper
+  -Ddruid.zk.service.host=druid-zookeeper-kafka
   -Ddruid.s3.accessKey=AKIAIMKECRUYKDQGR6YQ
   -Ddruid.s3.secretKey=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
   -Ddruid.processing.buffer.sizeBytes=75000000
diff --git a/integration-tests/docker/kafka.conf b/integration-tests/docker/kafka.conf
new file mode 100644
index 000000000000..861aa36b32cf
--- /dev/null
+++ b/integration-tests/docker/kafka.conf
@@ -0,0 +1,5 @@
+[program:kafka]
+command=/usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties
+user=daemon
+priority=0
+stdout_logfile=/shared/logs/kafka.log
diff --git a/integration-tests/docker/middlemanager.conf b/integration-tests/docker/middlemanager.conf
index cf1436182e76..c32e8a580b36 100644
--- a/integration-tests/docker/middlemanager.conf
+++ b/integration-tests/docker/middlemanager.conf
@@ -9,7 +9,7 @@ command=java
   -Duser.timezone=UTC
   -Dfile.encoding=UTF-8
   -Ddruid.host=%(ENV_HOST_IP)s
-  -Ddruid.zk.service.host=druid-zookeeper
+  -Ddruid.zk.service.host=druid-zookeeper-kafka
   -Ddruid.indexer.logs.directory=/shared/tasklogs
   -Ddruid.storage.storageDirectory=/shared/storage
   -Ddruid.indexer.runner.javaOpts=-server -Xmx256m -Xms256m -XX:NewSize=128m -XX:MaxNewSize=128m -XX:+UseConcMarkSweepGC -XX:+PrintGCDetails -XX:+PrintGCTimeStamps
diff --git a/integration-tests/docker/overlord.conf b/integration-tests/docker/overlord.conf
index a0d436c5a025..1c646b82fbf7 100644
--- a/integration-tests/docker/overlord.conf
+++ b/integration-tests/docker/overlord.conf
@@ -13,7 +13,7 @@ command=java
   -Ddruid.metadata.storage.connector.connectURI=jdbc:mysql://druid-metadata-storage/druid
   -Ddruid.metadata.storage.connector.user=druid
   -Ddruid.metadata.storage.connector.password=diurd
-  -Ddruid.zk.service.host=druid-zookeeper
+  -Ddruid.zk.service.host=druid-zookeeper-kafka
   -Ddruid.indexer.storage.type=metadata
   -Ddruid.indexer.logs.directory=/shared/tasklogs
   -Ddruid.indexer.runner.type=remote
diff --git a/integration-tests/docker/router.conf b/integration-tests/docker/router.conf
index ddd8121f6c9d..17d5146bdf7b 100644
--- a/integration-tests/docker/router.conf
+++ b/integration-tests/docker/router.conf
@@ -8,7 +8,7 @@ command=java
   -Duser.timezone=UTC
   -Dfile.encoding=UTF-8
   -Ddruid.host=%(ENV_HOST_IP)s
-  -Ddruid.zk.service.host=druid-zookeeper
+  -Ddruid.zk.service.host=druid-zookeeper-kafka
   -Ddruid.computation.buffer.size=75000000
   -Ddruid.server.http.numThreads=100
   -Ddruid.processing.numThreads=1
diff --git a/integration-tests/pom.xml b/integration-tests/pom.xml
index 3bbbf9f199f1..6fe8d368b73a 100644
--- a/integration-tests/pom.xml
+++ b/integration-tests/pom.xml
@@ -29,6 +29,11 @@
         <version>0.9.0-SNAPSHOT</version>
     </parent>
 
+    <properties>
+        <apache.kafka.version>0.8.2.1</apache.kafka.version>
+        <zkclient.version>0.4</zkclient.version>
+    </properties>
+
     <dependencies>
         <dependency>
             <groupId>io.druid</groupId>
@@ -40,6 +45,11 @@
             <artifactId>druid-s3-extensions</artifactId>
             <version>${project.parent.version}</version>
         </dependency>
+        <dependency>
+            <groupId>io.druid.extensions</groupId>
+            <artifactId>druid-kafka-eight</artifactId>
+            <version>${project.parent.version}</version>
+        </dependency>
         <dependency>
             <groupId>io.druid.extensions</groupId>
             <artifactId>druid-histogram</artifactId>
@@ -71,6 +81,29 @@
             <artifactId>easymock</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>com.101tec</groupId>
+            <artifactId>zkclient</artifactId>
+            <version>${zkclient.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.zookeeper</groupId>
+                    <artifactId>zookeeper</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka_2.10</artifactId>
+            <version>${apache.kafka.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
     </dependencies>
diff --git a/integration-tests/run_cluster.sh b/integration-tests/run_cluster.sh
index ea543915e8d8..1cccc9228521 100755
--- a/integration-tests/run_cluster.sh
+++ b/integration-tests/run_cluster.sh
@@ -1,5 +1,5 @@
 # cleanup
-for node in druid-historical druid-coordinator druid-overlord druid-router druid-broker druid-middlemanager druid-zookeeper druid-metadata-storage;
+for node in druid-historical druid-coordinator druid-overlord druid-router druid-broker druid-middlemanager druid-zookeeper-kafka druid-metadata-storage;
 do
 docker stop $node
 docker rm $node
@@ -12,6 +12,9 @@ SHARED_DIR=${HOME}/shared
 SUPERVISORDIR=/usr/lib/druid/conf
 RESOURCEDIR=$DIR/src/test/resources
 
+# so docker IP addr will be known during docker build
+echo $DOCKER_IP > $DOCKERDIR/docker_ip
+
 # Make directories if they dont exist
 mkdir -p $SHARED_DIR/logs
 mkdir -p $SHARED_DIR/tasklogs
@@ -24,26 +27,26 @@ mvn dependency:copy-dependencies -DoutputDirectory=$SHARED_DIR/docker/lib
 # Build Druid Cluster Image
 docker build -t druid/cluster $SHARED_DIR/docker
 
-# Start zookeeper
-docker run -d --name druid-zookeeper -p 2181:2181 -v $SHARED_DIR:/shared -v $DOCKERDIR/zookeeper.conf:$SUPERVISORDIR/zookeeper.conf druid/cluster
+# Start zookeeper and kafka
+docker run -d --name druid-zookeeper-kafka -p 2181:2181 -p 9092:9092 -v $SHARED_DIR:/shared -v $DOCKERDIR/zookeeper.conf:$SUPERVISORDIR/zookeeper.conf -v $DOCKERDIR/kafka.conf:$SUPERVISORDIR/kafka.conf druid/cluster
 
 # Start MYSQL
 docker run -d --name druid-metadata-storage -v $SHARED_DIR:/shared -v $DOCKERDIR/metadata-storage.conf:$SUPERVISORDIR/metadata-storage.conf druid/cluster
 
 # Start Overlord
-docker run -d --name druid-overlord -p 8090:8090 -v $SHARED_DIR:/shared -v $DOCKERDIR/overlord.conf:$SUPERVISORDIR/overlord.conf --link druid-metadata-storage:druid-metadata-storage --link druid-zookeeper:druid-zookeeper druid/cluster
+docker run -d --name druid-overlord -p 8090:8090 -v $SHARED_DIR:/shared -v $DOCKERDIR/overlord.conf:$SUPERVISORDIR/overlord.conf --link druid-metadata-storage:druid-metadata-storage --link druid-zookeeper-kafka:druid-zookeeper-kafka druid/cluster
 
 # Start Coordinator
-docker run -d --name druid-coordinator -p 8081:8081 -v $SHARED_DIR:/shared -v $DOCKERDIR/coordinator.conf:$SUPERVISORDIR/coordinator.conf --link druid-overlord:druid-overlord --link druid-metadata-storage:druid-metadata-storage --link druid-zookeeper:druid-zookeeper druid/cluster
+docker run -d --name druid-coordinator -p 8081:8081 -v $SHARED_DIR:/shared -v $DOCKERDIR/coordinator.conf:$SUPERVISORDIR/coordinator.conf --link druid-overlord:druid-overlord --link druid-metadata-storage:druid-metadata-storage --link druid-zookeeper-kafka:druid-zookeeper-kafka druid/cluster
 
 # Start Historical
-docker run -d --name druid-historical -v $SHARED_DIR:/shared -v $DOCKERDIR/historical.conf:$SUPERVISORDIR/historical.conf --link druid-zookeeper:druid-zookeeper druid/cluster
+docker run -d --name druid-historical -v $SHARED_DIR:/shared -v $DOCKERDIR/historical.conf:$SUPERVISORDIR/historical.conf --link druid-zookeeper-kafka:druid-zookeeper-kafka druid/cluster
 
 # Start Middlemanger
-docker run -d --name druid-middlemanager -p 8100:8100 -p 8101:8101 -p 8102:8102 -p 8103:8103 -p 8104:8104 -p 8105:8105 -v $RESOURCEDIR:/resources -v $SHARED_DIR:/shared -v $DOCKERDIR/middlemanager.conf:$SUPERVISORDIR/middlemanager.conf --link druid-zookeeper:druid-zookeeper --link druid-overlord:druid-overlord druid/cluster
+docker run -d --name druid-middlemanager -p 8100:8100 -p 8101:8101 -p 8102:8102 -p 8103:8103 -p 8104:8104 -p 8105:8105 -v $RESOURCEDIR:/resources -v $SHARED_DIR:/shared -v $DOCKERDIR/middlemanager.conf:$SUPERVISORDIR/middlemanager.conf --link druid-zookeeper-kafka:druid-zookeeper-kafka --link druid-overlord:druid-overlord druid/cluster
 
 # Start Broker
-docker run -d --name druid-broker -p 8082:8082 -v $SHARED_DIR:/shared -v $DOCKERDIR/broker.conf:$SUPERVISORDIR/broker.conf --link druid-zookeeper:druid-zookeeper --link druid-middlemanager:druid-middlemanager --link druid-historical:druid-historical druid/cluster
+docker run -d --name druid-broker -p 8082:8082 -v $SHARED_DIR:/shared -v $DOCKERDIR/broker.conf:$SUPERVISORDIR/broker.conf --link druid-zookeeper-kafka:druid-zookeeper-kafka --link druid-middlemanager:druid-middlemanager --link druid-historical:druid-historical druid/cluster
 
 # Start Router
-docker run -d --name druid-router -p 8888:8888 -v $SHARED_DIR:/shared -v $DOCKERDIR/router.conf:$SUPERVISORDIR/router.conf --link druid-zookeeper:druid-zookeeper --link druid-coordinator:druid-coordinator --link druid-broker:druid-broker druid/cluster
+docker run -d --name druid-router -p 8888:8888 -v $SHARED_DIR:/shared -v $DOCKERDIR/router.conf:$SUPERVISORDIR/router.conf --link druid-zookeeper-kafka:druid-zookeeper-kafka --link druid-coordinator:druid-coordinator --link druid-broker:druid-broker druid/cluster
diff --git a/integration-tests/src/main/java/io/druid/testing/ConfigFileConfigProvider.java b/integration-tests/src/main/java/io/druid/testing/ConfigFileConfigProvider.java
index 6975952ee3b3..b181f855ece0 100644
--- a/integration-tests/src/main/java/io/druid/testing/ConfigFileConfigProvider.java
+++ b/integration-tests/src/main/java/io/druid/testing/ConfigFileConfigProvider.java
@@ -42,6 +42,7 @@ public class ConfigFileConfigProvider implements IntegrationTestingConfigProvide
   private String indexerHost = "";
   private String middleManagerHost = "";
   private String zookeeperHosts = "";        // comma-separated list of host:port
+  private String kafkaHost = "";
   private Map props = null;
 
   @JsonCreator
@@ -62,20 +63,26 @@ private void loadProperties(String configFile)
     catch (IOException ex) {
       throw new RuntimeException(ex);
     }
-    routerHost = props.get("router_host") + ":" + props.get("router_port");
+    // there might not be a router; we want routerHost to be null in that case
+    routerHost = props.get("router_host");
+    if (null != routerHost) {
+      routerHost += ":" + props.get("router_port");
+    }
     brokerHost = props.get("broker_host") + ":" + props.get("broker_port");
     historicalHost = props.get("historical_host") + ":" + props.get("historical_port");
     coordinatorHost = props.get("coordinator_host") + ":" + props.get("coordinator_port");
     indexerHost = props.get("indexer_host") + ":" + props.get("indexer_port");
     middleManagerHost = props.get("middlemanager_host");
     zookeeperHosts = props.get("zookeeper_hosts");
+    kafkaHost = props.get("kafka_host") + ":" + props.get ("kafka_port");
 
     LOG.info ("router: [%s]", routerHost);
-    LOG.info ("broker [%s]: ", brokerHost);
+    LOG.info ("broker: [%s]", brokerHost);
     LOG.info ("coordinator: [%s]", coordinatorHost);
     LOG.info ("overlord: [%s]", indexerHost);
     LOG.info ("middle manager: [%s]", middleManagerHost);
     LOG.info ("zookeepers: [%s]", zookeeperHosts);
+    LOG.info ("kafka: [%s]", kafkaHost);
   }
 
   @Override
@@ -125,6 +132,12 @@ public String getZookeeperHosts()
     return zookeeperHosts;
   }
 
+  @Override
+  public String getKafkaHost()
+  {
+    return kafkaHost;
+  }
+
   @Override
   public String getProperty(String keyword)
   {
diff --git a/integration-tests/src/main/java/io/druid/testing/DockerConfigProvider.java b/integration-tests/src/main/java/io/druid/testing/DockerConfigProvider.java
index 1068c0a8f60c..eea79aa1715a 100644
--- a/integration-tests/src/main/java/io/druid/testing/DockerConfigProvider.java
+++ b/integration-tests/src/main/java/io/druid/testing/DockerConfigProvider.java
@@ -76,6 +76,12 @@ public String getZookeeperHosts()
     return dockerIp + ":2181";
   }
 
+  @Override
+  public String getKafkaHost()
+  {
+    return dockerIp + ":9092";
+  }
+
   @Override
   public String getProperty(String prop)
   {
diff --git a/integration-tests/src/main/java/io/druid/testing/IntegrationTestingConfig.java b/integration-tests/src/main/java/io/druid/testing/IntegrationTestingConfig.java
index 162ca9736e02..ace8d8fb866a 100644
--- a/integration-tests/src/main/java/io/druid/testing/IntegrationTestingConfig.java
+++ b/integration-tests/src/main/java/io/druid/testing/IntegrationTestingConfig.java
@@ -35,5 +35,7 @@
 
   public String getZookeeperHosts();
 
+  public String getKafkaHost();
+
   public String getProperty(String prop);
 }
diff --git a/integration-tests/src/main/java/io/druid/testing/IntegrationTestingCuratorConfig.java b/integration-tests/src/main/java/io/druid/testing/IntegrationTestingCuratorConfig.java
new file mode 100644
index 000000000000..752e8a332a58
--- /dev/null
+++ b/integration-tests/src/main/java/io/druid/testing/IntegrationTestingCuratorConfig.java
@@ -0,0 +1,45 @@
+/*
+* Licensed to Metamarkets Group Inc. (Metamarkets) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. Metamarkets licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package io.druid.testing;
+
+import com.google.inject.Inject;
+import io.druid.curator.CuratorConfig;
+
+/**
+ * We will use this instead of druid server's CuratorConfig, because CuratorConfig in
+ * a test cluster environment sees zookeeper at localhost even if zookeeper is elsewhere.
+ * We'll take the zookeeper host from the configuration file instead.
+ */
+public class IntegrationTestingCuratorConfig extends CuratorConfig
+{
+  private IntegrationTestingConfig config;
+
+  @Inject
+  public IntegrationTestingCuratorConfig (IntegrationTestingConfig config)
+  {
+    this.config = config;
+  }
+
+  @Override
+  public String getZkHosts()
+  {
+    return config.getZookeeperHosts();
+  }
+}
diff --git a/integration-tests/src/main/java/io/druid/testing/clients/OverlordResourceTestClient.java b/integration-tests/src/main/java/io/druid/testing/clients/OverlordResourceTestClient.java
index 69c15c095a5f..a4880c4b5743 100644
--- a/integration-tests/src/main/java/io/druid/testing/clients/OverlordResourceTestClient.java
+++ b/integration-tests/src/main/java/io/druid/testing/clients/OverlordResourceTestClient.java
@@ -115,7 +115,7 @@ public String submitTask(String task)
   public TaskStatus.Status getTaskStatus(String taskID)
   {
     try {
-      StatusResponseHolder response = makeRequest(
+      StatusResponseHolder response = makeRequest( HttpMethod.GET,
          String.format(
              "%stask/%s/status",
              getIndexerURL(),
@@ -156,7 +156,7 @@ public List getPendingTasks()
   private List getTasks(String identifier)
   {
     try {
-      StatusResponseHolder response = makeRequest(
+      StatusResponseHolder response = makeRequest( HttpMethod.GET,
          String.format("%s%s", getIndexerURL(), identifier)
       );
       LOG.info("Tasks %s response %s", identifier, response.getContent());
@@ -171,6 +171,26 @@ private List getTasks(String identifier)
     }
   }
 
+  public Map<String, String> shutDownTask(String taskID)
+  {
+    try {
+      StatusResponseHolder response = makeRequest( HttpMethod.POST,
+          String.format("%stask/%s/shutdown", getIndexerURL(),
+              URLEncoder.encode(taskID, "UTF-8")
+          )
+      );
+      LOG.info("Shutdown Task %s response %s", taskID, response.getContent());
+      return jsonMapper.readValue(
+          response.getContent(), new TypeReference<Map<String, String>>()
+          {
+          }
+      );
+    }
+    catch (Exception e) {
+      throw Throwables.propagate(e);
+    }
+  }
+
   public void waitUntilTaskCompletes(final String taskID)
   {
     RetryUtil.retryUntil(
@@ -193,11 +213,11 @@ public Boolean call() throws Exception
     );
   }
 
-  private StatusResponseHolder makeRequest(String url)
+  private StatusResponseHolder makeRequest(HttpMethod method, String url)
   {
     try {
       StatusResponseHolder response = this.httpClient
-          .go(new Request(HttpMethod.GET, new URL(url)), responseHandler).get();
+          .go(new Request(method, new URL(url)), responseHandler).get();
       if (!response.getStatus().equals(HttpResponseStatus.OK)) {
         throw new ISE("Error while making request to indexer [%s %s]", response.getStatus(), response.getContent());
       }
     }
diff --git a/integration-tests/src/main/java/io/druid/testing/guice/DruidTestModule.java b/integration-tests/src/main/java/io/druid/testing/guice/DruidTestModule.java
index 1524e1d7fb33..eb8ddd104645 100644
--- a/integration-tests/src/main/java/io/druid/testing/guice/DruidTestModule.java
+++ b/integration-tests/src/main/java/io/druid/testing/guice/DruidTestModule.java
@@ -30,6 +30,8 @@
 import io.druid.guice.ManageLifecycle;
 import io.druid.testing.IntegrationTestingConfig;
 import io.druid.testing.IntegrationTestingConfigProvider;
+import io.druid.curator.CuratorConfig;
+import io.druid.testing.IntegrationTestingCuratorConfig;
 
 /**
  */
@@ -40,6 +42,8 @@ public void configure(Binder binder)
   {
     binder.bind(IntegrationTestingConfig.class).toProvider(IntegrationTestingConfigProvider.class).in(ManageLifecycle.class);
     JsonConfigProvider.bind(binder, "druid.test.config", IntegrationTestingConfigProvider.class);
+
+    binder.bind(CuratorConfig.class).to(IntegrationTestingCuratorConfig.class);
   }
 
   @Provides
diff --git a/integration-tests/src/main/java/org/testng/DruidTestRunnerFactory.java b/integration-tests/src/main/java/org/testng/DruidTestRunnerFactory.java
index 64b47cdb71b2..e94f9993e4ea 100644
--- a/integration-tests/src/main/java/org/testng/DruidTestRunnerFactory.java
+++ b/integration-tests/src/main/java/org/testng/DruidTestRunnerFactory.java
@@ -90,7 +90,11 @@ public void run()
       ;
       waitUntilInstanceReady(client, config.getCoordinatorHost());
       waitUntilInstanceReady(client, config.getIndexerHost());
-      waitUntilInstanceReady(client, config.getRouterHost());
+      waitUntilInstanceReady(client, config.getBrokerHost());
+      String routerHost = config.getRouterHost();
+      if (null != routerHost) {
+        waitUntilInstanceReady(client, config.getRouterHost());
+      }
       Lifecycle lifecycle = injector.getInstance(Lifecycle.class);
       try {
         lifecycle.start();
diff --git a/integration-tests/src/test/java/io/druid/tests/indexer/ITKafkaTest.java b/integration-tests/src/test/java/io/druid/tests/indexer/ITKafkaTest.java
new file mode 100644
index 000000000000..621df21ded99
--- /dev/null
+++ b/integration-tests/src/test/java/io/druid/tests/indexer/ITKafkaTest.java
@@ -0,0 +1,304 @@
+/*
+* Licensed to Metamarkets Group Inc. (Metamarkets) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. Metamarkets licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+
+package io.druid.tests.indexer;
+
+import com.google.common.base.Throwables;
+import com.google.inject.Inject;
+import com.metamx.common.ISE;
+import com.metamx.common.logger.Logger;
+import io.druid.testing.IntegrationTestingConfig;
+import io.druid.testing.guice.DruidTestModuleFactory;
+import io.druid.testing.utils.RetryUtil;
+import io.druid.testing.utils.TestQueryHelper;
+import kafka.admin.AdminUtils;
+import kafka.common.TopicExistsException;
+import kafka.javaapi.producer.Producer;
+import kafka.producer.KeyedMessage;
+import kafka.producer.ProducerConfig;
+import kafka.utils.ZKStringSerializer$;
+import org.I0Itec.zkclient.ZkClient;
+
+import java.util.concurrent.TimeUnit;
+
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.Guice;
+import org.testng.annotations.Test;
+
+import org.apache.commons.io.IOUtils;
+
+import java.io.InputStream;
+import java.io.IOException;
+
+import java.util.Properties;
+import java.util.concurrent.Callable;
+
+/*
+ * This is a test for the kafka firehose.
+ */
+@Guice(moduleFactory = DruidTestModuleFactory.class)
+public class ITKafkaTest extends AbstractIndexerTest
+{
+  private static final Logger LOG = new Logger(ITKafkaTest.class);
+  private static final int DELAY_BETWEEN_EVENTS_SECS = 5;
+  private static final String INDEXER_FILE = "/indexer/kafka_index_task.json";
+  private static final String QUERIES_FILE = "/indexer/kafka_index_queries.json";
+  private static final String DATASOURCE = "kafka_test";
+  private static final String TOPIC_NAME = "kafkaTopic";
+  private static final int MINUTES_TO_SEND = 2;
+
+  // We'll fill in the current time and numbers for added, deleted and changed
+  // before sending the event.
+  final String event_template =
+      "{\"timestamp\": \"%s\"," +
+      "\"page\": \"Gypsy Danger\"," +
+      "\"language\" : \"en\"," +
+      "\"user\" : \"nuclear\"," +
+      "\"unpatrolled\" : \"true\"," +
+      "\"newPage\" : \"true\"," +
+      "\"robot\": \"false\"," +
+      "\"anonymous\": \"false\"," +
+      "\"namespace\":\"article\"," +
+      "\"continent\":\"North America\"," +
+      "\"country\":\"United States\"," +
+      "\"region\":\"Bay Area\"," +
+      "\"city\":\"San Francisco\"," +
+      "\"added\":%d," +
+      "\"deleted\":%d," +
+      "\"delta\":%d}";
+
+  private String taskID;
+  private ZkClient zkClient;
+  private Boolean segmentsExist;   // to tell if we should remove segments during teardown
+
+  // format for the querying interval
+  private final DateTimeFormatter INTERVAL_FMT = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:'00Z'");
+  // format for the expected timestamp in a query response
+  private final DateTimeFormatter TIMESTAMP_FMT = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss'.000Z'");
+  private DateTime dtFirst;   // timestamp of 1st event
+  private DateTime dtLast;    // timestamp of last event
+
+  @Inject
+  private TestQueryHelper queryHelper;
+  @Inject
+  private IntegrationTestingConfig config;
+
+  @Test
+  public void testKafka()
+  {
+    LOG.info("Starting test: ITKafkaTest");
+
+    // create topic
+    try {
+      int sessionTimeoutMs = 10000;
+      int connectionTimeoutMs = 10000;
+      String zkHosts = config.getZookeeperHosts();
+      zkClient = new ZkClient(
+          zkHosts, sessionTimeoutMs, connectionTimeoutMs,
+          ZKStringSerializer$.MODULE$
+      );
+      int numPartitions = 1;
+      int replicationFactor = 1;
+      Properties topicConfig = new Properties();
+      AdminUtils.createTopic(zkClient, TOPIC_NAME, numPartitions, replicationFactor, topicConfig);
+    }
+    catch (TopicExistsException e) {
+      // it's ok if the topic already exists
+    }
+    catch (Exception e) {
+      throw new ISE(e, "could not create kafka topic");
+    }
+
+    String indexerSpec = "";
+
+    // replace temp strings in indexer file
+    try {
+      LOG.info("indexerFile name: [%s]", INDEXER_FILE);
+      indexerSpec = getTaskAsString(INDEXER_FILE);
+      indexerSpec = indexerSpec.replaceAll("%%TOPIC%%", TOPIC_NAME);
+      indexerSpec = indexerSpec.replaceAll("%%ZOOKEEPER_SERVER%%", config.getZookeeperHosts());
+      indexerSpec = indexerSpec.replaceAll("%%GROUP_ID%%", Long.toString(System.currentTimeMillis()));
+      indexerSpec = indexerSpec.replaceAll(
+          "%%SHUTOFFTIME%%",
+          new DateTime(
+              System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(
+                  2
+                  * MINUTES_TO_SEND
+              )
+          ).toString()
+      );
+      LOG.info("indexerFile: [%s]\n", indexerSpec);
+    }
+    catch (Exception e) {
+      // log here so the message will appear in the console output
+      LOG.error("could not read indexer file [%s]", INDEXER_FILE);
+      throw new ISE(e, "could not read indexer file [%s]", INDEXER_FILE);
+    }
+
+    // start indexing task
+    taskID = indexer.submitTask(indexerSpec);
+    LOG.info("-------------SUBMITTED TASK");
+
+    // set up kafka producer
+    Properties properties = new Properties();
+    properties.put("metadata.broker.list", config.getKafkaHost());
+    LOG.info("kafka host: [%s]", config.getKafkaHost());
+    properties.put("serializer.class", "kafka.serializer.StringEncoder");
+    properties.put("request.required.acks", "1");
+    properties.put("producer.type", "async");
+    ProducerConfig producerConfig = new ProducerConfig(properties);
+    Producer producer = new Producer(producerConfig);
+
+    DateTimeZone zone = DateTimeZone.forID("UTC");
+    // format for putting into events
+    DateTimeFormatter event_fmt = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss'Z'");
+
+    DateTime dt = new DateTime(zone);   // timestamp to put on events
+    dtFirst = dt;   // timestamp of 1st event
+    dtLast = dt;    // timestamp of last event
+    // stop sending events when time passes this
+    DateTime dtStop = dtFirst.plusMinutes(MINUTES_TO_SEND).plusSeconds(30);
+
+    // these are used to compute the expected aggregations
+    int added = 0;
+    int num_events = 0;
+
+    // send data to kafka
+    while (dt.compareTo(dtStop) < 0) {   // as long as we're within the time span
+      LOG.info("sending event at [%s]", event_fmt.print(dt));
+      num_events++;
+      added += num_events;
+      // construct the event to send
+      String event = String.format(
+          event_template,
+          event_fmt.print(dt), num_events, 0, num_events
+      );
+      LOG.debug("event: [%s]", event);
+      try {
+        // Send event to kafka
+        KeyedMessage message = new KeyedMessage(TOPIC_NAME, event);
+        producer.send(message);
+      }
+      catch (Exception ioe) {
+        Throwables.propagate(ioe);
+      }
+
+      try {
+        Thread.sleep(DELAY_BETWEEN_EVENTS_SECS * 1000);
+      }
+      catch (InterruptedException ex) { /* nothing */ }
+      dtLast = dt;
+      dt = new DateTime(zone);
+    }
+
+    producer.close();
+
+    // put the timestamps into the query structure
+    String query_response_template = null;
+    InputStream is = ITKafkaTest.class.getResourceAsStream(QUERIES_FILE);
+    if (null == is) {
+      throw new ISE("could not open query file: %s", QUERIES_FILE);
+    }
+
+    try {
+      query_response_template = IOUtils.toString(is, "UTF-8");
+    } catch (IOException e) {
+      throw new ISE(e, "could not read query file: %s", QUERIES_FILE);
+    }
+
+    String queryStr = query_response_template
+        // time boundary
+        .replace("%%TIMEBOUNDARY_RESPONSE_TIMESTAMP%%", TIMESTAMP_FMT.print(dtFirst))
+        .replace("%%TIMEBOUNDARY_RESPONSE_MAXTIME%%", TIMESTAMP_FMT.print(dtLast))
+        .replace("%%TIMEBOUNDARY_RESPONSE_MINTIME%%", TIMESTAMP_FMT.print(dtFirst))
+        // time series
+        .replace("%%TIMESERIES_QUERY_START%%", INTERVAL_FMT.print(dtFirst))
+        .replace("%%TIMESERIES_QUERY_END%%", INTERVAL_FMT.print(dtFirst.plusMinutes(MINUTES_TO_SEND + 2)))
+        .replace("%%TIMESERIES_RESPONSE_TIMESTAMP%%", TIMESTAMP_FMT.print(dtFirst))
+        .replace("%%TIMESERIES_ADDED%%", Integer.toString(added))
+        .replace("%%TIMESERIES_NUMEVENTS%%", Integer.toString(num_events));
+
+    // this query will probably be answered from the realtime task
+    try {
+      this.queryHelper.testQueriesFromString(queryStr, 2);
+    } catch (Exception e) {
+      Throwables.propagate(e);
+    }
+
+    // wait for segments to be handed off
+    try {
+      RetryUtil.retryUntil(
+          new Callable()
+          {
+            @Override
+            public Boolean call() throws Exception
+            {
+              return coordinator.areSegmentsLoaded(DATASOURCE);
+            }
+          },
+          true,
+          30000,
+          10,
+          "Real-time generated segments loaded"
+      );
+    }
+    catch (Exception e) {
+      Throwables.propagate(e);
+    }
+    LOG.info("segments are present");
+    segmentsExist = true;
+
+    // this query will be answered by historical
+    try {
+      this.queryHelper.testQueriesFromString(queryStr, 2);
+    }
+    catch (Exception e) {
+      Throwables.propagate(e);
+    }
+  }
+
+  @AfterClass
+  public void afterClass()
+  {
+    LOG.info("teardown");
+
+    // wait for the task to complete
+    indexer.waitUntilTaskCompletes(taskID);
+
+    // delete kafka topic
+    AdminUtils.deleteTopic(zkClient, TOPIC_NAME);
+
+    // remove segments
+    if (segmentsExist) {
+      try {
+        String first = DateTimeFormat.forPattern("yyyy-MM-dd'T00:00:00.000Z'").print(dtFirst);
+        String last = DateTimeFormat.forPattern("yyyy-MM-dd'T00:00:00.000Z'").print(dtFirst.plusDays(1));
+        unloadAndKillData(DATASOURCE, first, last);
+      }
+      catch (Exception e) {
+        LOG.warn("exception while removing segments: [%s]", e.getMessage());
+      }
+    }
+  }
+}
+
diff --git a/integration-tests/src/test/resources/indexer/kafka_index_queries.json b/integration-tests/src/test/resources/indexer/kafka_index_queries.json
new file mode 100644
index 000000000000..46c2bf0b56f9
--- /dev/null
+++ b/integration-tests/src/test/resources/indexer/kafka_index_queries.json
@@ -0,0 +1,40 @@
+[
+  {
+    "description": "timeBoundary",
+    "query": {
+      "queryType":"timeBoundary",
+      "dataSource":"kafka_test"
+    },
+    "expectedResults":[
+      {
+        "timestamp":"%%TIMEBOUNDARY_RESPONSE_TIMESTAMP%%",
+        "result": {
+          "maxTime" : "%%TIMEBOUNDARY_RESPONSE_MAXTIME%%",
+          "minTime":"%%TIMEBOUNDARY_RESPONSE_MINTIME%%"
+        }
+      }
+    ]
+  },
+  {
+    "description": "timeseries",
+    "query": {
+      "queryType": "timeseries",
+      "dataSource": "kafka_test",
+      "intervals": [ "%%TIMESERIES_QUERY_START%%/%%TIMESERIES_QUERY_END%%" ],
+      "granularity": "all",
+      "aggregations": [
+        {"type": "longSum", "fieldName": "count", "name": "edit_count"},
+        {"type": "longSum", "fieldName": "added", "name": "chars_added"}
+      ]
+    },
+    "expectedResults": [
+      {
+        "timestamp" : "%%TIMESERIES_RESPONSE_TIMESTAMP%%",
+        "result" : {
+          "chars_added" : %%TIMESERIES_ADDED%%,
+          "edit_count" : %%TIMESERIES_NUMEVENTS%%
+        }
+      }
+    ]
+  }
+]
diff --git a/integration-tests/src/test/resources/indexer/kafka_index_task.json b/integration-tests/src/test/resources/indexer/kafka_index_task.json
new file mode 100644
index 000000000000..626a9403a604
--- /dev/null
+++ b/integration-tests/src/test/resources/indexer/kafka_index_task.json
@@ -0,0 +1,77 @@
+{
+  "type" : "index_realtime",
+  "spec" : {
+    "dataSchema": {
+      "dataSource": "kafka_test",
+      "parser" : {
+        "type" : "string",
+        "parseSpec" : {
+          "format" : "json",
+          "timestampSpec" : {
+            "column" : "timestamp",
+            "format" : "auto"
+          },
+          "dimensionsSpec" : {
+            "dimensions": ["page","language","user","unpatrolled","newPage","robot","anonymous","namespace","continent","country","region","city"],
+            "dimensionExclusions" : [],
+            "spatialDimensions" : []
+          }
+        }
+      },
+      "metricsSpec": [
+        {
+          "type": "count",
+          "name": "count"
+        },
+        {
+          "type": "doubleSum",
+          "name": "added",
+          "fieldName": "added"
+        },
+        {
+          "type": "doubleSum",
+          "name": "deleted",
+          "fieldName": "deleted"
+        },
+        {
+          "type": "doubleSum",
+          "name": "delta",
+          "fieldName": "delta"
+        }
+      ],
+      "granularitySpec": {
+        "type" : "uniform",
+        "segmentGranularity": "MINUTE",
+        "queryGranularity": "NONE"
+      }
+    },
+    "ioConfig" : {
+      "type" : "realtime",
+      "firehose": {
+        "type": "timed",
+        "shutoffTime": "%%SHUTOFFTIME%%",
+        "delegate": {
+          "type": "kafka-0.8",
+          "consumerProps": {
+            "zookeeper.connect": "%%ZOOKEEPER_SERVER%%",
+            "zookeeper.connection.timeout.ms" : "15000",
+            "zookeeper.session.timeout.ms" : "15000",
+            "zookeeper.sync.time.ms" : "5000",
+            "group.id": "%%GROUP_ID%%",
+            "fetch.message.max.bytes" : "1048586",
+            "auto.offset.reset": "smallest",
+            "auto.commit.enable": "false"
+          },
+          "feed": "%%TOPIC%%"
+        }
+      }
+    },
+    "tuningConfig": {
+      "type" : "realtime",
+      "maxRowsInMemory": 500000,
+      "intermediatePersistPeriod": "PT3M",
+      "windowPeriod": "PT1M",
+      "basePersistDirectory": "/home/y/var/druid_state/kafka_test/realtime/basePersist"
+    }
+  }
+}
diff --git a/integration-tests/src/test/resources/queries/wikipedia_editstream_queries.json b/integration-tests/src/test/resources/queries/wikipedia_editstream_queries.json
index 995378aab3d5..8f6ff4c7f4b8 100644
--- a/integration-tests/src/test/resources/queries/wikipedia_editstream_queries.json
+++ b/integration-tests/src/test/resources/queries/wikipedia_editstream_queries.json
@@ -373,7 +373,7 @@
         "context": {
             "useCache": "true",
             "populateCache": "true",
-            "timeout": 180000
+            "timeout": 360000
         }
     },
     "expectedResults": [
diff --git a/integration-tests/stop_cluster.sh b/integration-tests/stop_cluster.sh
index b7bd21bac529..d242cf47c6e6 100755
--- a/integration-tests/stop_cluster.sh
+++ b/integration-tests/stop_cluster.sh
@@ -1,4 +1,4 @@
-for node in druid-historical druid-coordinator druid-overlord druid-router druid-broker druid-middlemanager druid-zookeeper druid-metadata-storage;
+for node in druid-historical druid-coordinator druid-overlord druid-router druid-broker druid-middlemanager druid-zookeeper-kafka druid-metadata-storage;
 do
 docker stop $node
 docker rm $node