From 2e0dd1d792dfcef196bb2d5a4168844948007007 Mon Sep 17 00:00:00 2001
From: Himanshu Gupta
Date: Fri, 31 Jul 2015 11:36:03 -0500
Subject: [PATCH] adding UTs and addressing review comments to firehoseV2
 addition to Realtime[Manager|Plumber], essential segment metadata persist
 support, kafka-simple-consumer-firehose extension patch

---
 .../druid/common/utils/SerializerUtils.java   |  37 +--
 .../kafka-simple-consumer-firehose.md         |   8 +-
 docs/content/toc.textile                      |   1 +
 extensions/kafka-eight-simpleConsumer/pom.xml |   6 +-
 .../KafkaEightSimpleConsumerDruidModule.java  |  41 ++--
 ...fkaEightSimpleConsumerFirehoseFactory.java | 143 ++++++-----
 .../firehose/kafka/KafkaSimpleConsumer.java   | 125 +++++-----
 .../io.druid.initialization.DruidModule      |  21 +-
 .../indexer/LegacyIndexGeneratorJob.java      |   2 +-
 .../common/index/YeOldePlumberSchool.java     |   1 +
 .../indexing/common/TestRealtimeTaskV2.java   |  88 -------
 .../indexing/common/task/TaskSerdeTest.java   |  47 ++--
 .../IngestSegmentFirehoseFactoryTest.java     |   2 +-
 .../overlord/RemoteTaskRunnerTest.java        |  95 --------
 pom.xml                                       |   1 +
 .../main/java/io/druid/segment/IndexIO.java   |  64 +++--
 .../java/io/druid/segment/IndexMaker.java     |  81 ++++---
 .../java/io/druid/segment/IndexMerger.java    | 105 ++++++--
 .../io/druid/segment/IndexableAdapter.java    |   2 -
 .../java/io/druid/segment/QueryableIndex.java |  35 +--
 .../QueryableIndexIndexableAdapter.java       |  38 ++-
 .../segment/RowboatFilteringIndexAdapter.java |  38 ++-
 .../druid/segment/SimpleQueryableIndex.java   |  44 ++--
 .../incremental/IncrementalIndexAdapter.java  |  38 ++-
 .../aggregation/AggregationTestHelper.java    |   2 +-
 .../java/io/druid/segment/EmptyIndexTest.java |   1 +
 .../segment/IndexMakerParameterizedTest.java  |  77 +++++-
 .../java/io/druid/segment/IndexMakerTest.java |  58 ++++-
 .../io/druid/segment/IndexMergerTest.java     | 134 ++++++++--
 .../io/druid/segment/SchemalessIndex.java     |  54 +++--
 .../test/java/io/druid/segment/TestIndex.java |  38 +--
 .../filter/SpatialFilterBonusTest.java        |   8 +-
 .../segment/filter/SpatialFilterTest.java     |   8 +-
 .../segment/indexing/RealtimeIOConfig.java    |  36 +--
 .../segment/realtime/FireDepartment.java      |  35 +--
 .../segment/realtime/RealtimeManager.java     |  68 +++---
 .../firehose/LocalFirehoseFactoryV2.java      | 229 ------------------
 .../realtime/plumber/RealtimePlumber.java     | 163 ++++++++-----
 .../segment/realtime/RealtimeManagerTest.java |  44 ++--
 .../plumber/RealtimePlumberSchoolTest.java    |  99 ++++++--
 40 files changed, 1072 insertions(+), 1045 deletions(-)
 delete mode 100644 indexing-service/src/test/java/io/druid/indexing/common/TestRealtimeTaskV2.java
 delete mode 100644 server/src/main/java/io/druid/segment/realtime/firehose/LocalFirehoseFactoryV2.java

diff --git a/common/src/main/java/io/druid/common/utils/SerializerUtils.java b/common/src/main/java/io/druid/common/utils/SerializerUtils.java
index 0fb8b59af325..ec99c97f411d 100644
--- a/common/src/main/java/io/druid/common/utils/SerializerUtils.java
+++ b/common/src/main/java/io/druid/common/utils/SerializerUtils.java
@@ -1,25 +1,28 @@
 /*
- * Druid - a distributed column store.
- * Copyright 2012 - 2015 Metamarkets Group Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
+* Licensed to Metamarkets Group Inc. (Metamarkets) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. Metamarkets licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/

 package io.druid.common.utils;

 import com.google.common.io.ByteStreams;
 import com.google.common.io.OutputSupplier;
 import com.google.common.primitives.Ints;
+import com.metamx.common.StringUtils;
 import io.druid.collections.IntList;

 import java.io.IOException;
@@ -61,13 +64,13 @@ public String readString(InputStream in) throws IOException
     final int length = readInt(in);
     byte[] stringBytes = new byte[length];
     ByteStreams.readFully(in, stringBytes);
-    return new String(stringBytes, UTF8);
+    return StringUtils.fromUtf8(stringBytes);
   }

   public String readString(ByteBuffer in) throws IOException
   {
     final int length = in.getInt();
-    return new String(readBytes(in, length), UTF8);
+    return StringUtils.fromUtf8(readBytes(in, length));
   }

   public byte[] readBytes(ByteBuffer in, int length) throws IOException
diff --git a/docs/content/development/kafka-simple-consumer-firehose.md b/docs/content/development/kafka-simple-consumer-firehose.md
index ef21535f9c7a..e49074a80ad0 100644
--- a/docs/content/development/kafka-simple-consumer-firehose.md
+++ b/docs/content/development/kafka-simple-consumer-firehose.md
@@ -2,14 +2,14 @@
 layout: doc_page
 ---
 # KafkaSimpleConsumerFirehose
-This firehose acts as a Kafka simple consumer and ingests data from Kafka, currently still in experimental section.
+This is an experimental firehose that ingests data from Kafka using the Kafka simple consumer API. Currently, this firehose only works inside standalone realtime nodes.
 The configuration for KafkaSimpleConsumerFirehose is similar to the KafkaFirehose [Kafka firehose example](realtime-ingestion.html#realtime-specfile), except `firehose` should be replaced with `firehoseV2` like this:

 ```json
 "firehoseV2": {
 "type" : "kafka-0.8-v2",
 "brokerList" : ["localhost:4443"],
 "queueBufferLength":10001,
-"resetBehavior":"latest",
+"resetOffsetToEarliest":"true",
 "partitionIdList" : ["0"],
 "clientId" : "localclient",
 "feed": "wikipedia"
 }
 ```
@@ -21,10 +21,10 @@ The configuration for KafkaSimpleConsumerFirehos
 |type|kafka-0.8-v2|yes|
 |brokerList|list of the kafka brokers|yes|
 |queueBufferLength|the buffer length for kafka message queue|no default(20000)|
-|resetBehavior|in case of kafkaOffsetOutOfRange error happens, consumer should starts from the earliest or latest message available|no default(earliest)|
+|resetOffsetToEarliest|whether the consumer should start from the earliest or the latest available message when a kafkaOffsetOutOfRange error occurs|no default(true)|
 |partitionIdList|list of kafka partition ids|yes|
 |clientId|the clientId for kafka SimpleConsumer|yes|
 |feed|kafka topic|yes|

-For using this firehose at scale and possibly in production, it is recommended to set replication factor to at least three, which means at least three Kafka brokers in the `brokerList`. For a 1*10^4 events per second topic, keeping one partition can work properly, but more partition could be added if higher throughput is required.
+To use this firehose at scale and possibly in production, it is recommended to set the replication factor to at least three, which means at least three Kafka brokers in the `brokerList`. For a Kafka topic carrying 1*10^4 events per second, keeping one partition can work properly, but more partitions could be added if higher throughput is required.
diff --git a/docs/content/toc.textile b/docs/content/toc.textile
index 8f75bd28da2c..6e158fa401e3 100644
--- a/docs/content/toc.textile
+++ b/docs/content/toc.textile
@@ -90,6 +90,7 @@ h2. Development
 ** "Select Query":../development/select-query.html
 ** "Approximate Histograms and Quantiles":../development/approximate-histograms.html
 ** "Router node":../development/router.html
+** "New Kafka Firehose":../development/kafka-simple-consumer-firehose.html

 h2. Misc
 * "Papers & Talks":../misc/papers-and-talks.html
diff --git a/extensions/kafka-eight-simpleConsumer/pom.xml b/extensions/kafka-eight-simpleConsumer/pom.xml
index ee4de3f10358..88f1e6580141 100644
--- a/extensions/kafka-eight-simpleConsumer/pom.xml
+++ b/extensions/kafka-eight-simpleConsumer/pom.xml
@@ -27,7 +27,7 @@
     <parent>
         <groupId>io.druid</groupId>
         <artifactId>druid</artifactId>
-        <version>0.8.0-SNAPSHOT</version>
+        <version>0.8.2-SNAPSHOT</version>
         <relativePath>../../pom.xml</relativePath>
     </parent>
@@ -36,6 +36,10 @@
         <dependency>
            <groupId>io.druid</groupId>
            <artifactId>druid-api</artifactId>
        </dependency>
+        <dependency>
+            <groupId>com.metamx</groupId>
+            <artifactId>emitter</artifactId>
+        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.10</artifactId>
diff --git a/extensions/kafka-eight-simpleConsumer/src/main/java/io/druid/firehose/kafka/KafkaEightSimpleConsumerDruidModule.java b/extensions/kafka-eight-simpleConsumer/src/main/java/io/druid/firehose/kafka/KafkaEightSimpleConsumerDruidModule.java
index 7648b1f5b3e7..7b497b8eeb26 100644
--- a/extensions/kafka-eight-simpleConsumer/src/main/java/io/druid/firehose/kafka/KafkaEightSimpleConsumerDruidModule.java
+++ b/extensions/kafka-eight-simpleConsumer/src/main/java/io/druid/firehose/kafka/KafkaEightSimpleConsumerDruidModule.java
@@ -1,32 +1,33 @@
 /*
- * Druid - a distributed column store.
- * Copyright 2012 - 2015 Metamarkets Group Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
+* Licensed to Metamarkets Group Inc. (Metamarkets) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. Metamarkets licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/

 package io.druid.firehose.kafka;

-import java.util.List;
-
 import com.fasterxml.jackson.databind.Module;
 import com.fasterxml.jackson.databind.jsontype.NamedType;
 import com.fasterxml.jackson.databind.module.SimpleModule;
 import com.google.common.collect.ImmutableList;
 import com.google.inject.Binder;
-
 import io.druid.initialization.DruidModule;

+import java.util.List;
+
 public class KafkaEightSimpleConsumerDruidModule implements DruidModule
 {
   @Override
@@ -35,8 +36,8 @@ public List<? extends Module> getJacksonModules()
     return ImmutableList.of(
         new SimpleModule("KafkaEightSimpleConsumerFirehoseModule").registerSubtypes(
             new NamedType(KafkaEightSimpleConsumerFirehoseFactory.class, "kafka-0.8-v2")
-        )
-    );
+            )
+        );
   }

   @Override
diff --git a/extensions/kafka-eight-simpleConsumer/src/main/java/io/druid/firehose/kafka/KafkaEightSimpleConsumerFirehoseFactory.java b/extensions/kafka-eight-simpleConsumer/src/main/java/io/druid/firehose/kafka/KafkaEightSimpleConsumerFirehoseFactory.java
index a06b4945aa55..482e0faa5819 100644
--- a/extensions/kafka-eight-simpleConsumer/src/main/java/io/druid/firehose/kafka/KafkaEightSimpleConsumerFirehoseFactory.java
+++ b/extensions/kafka-eight-simpleConsumer/src/main/java/io/druid/firehose/kafka/KafkaEightSimpleConsumerFirehoseFactory.java
@@ -1,50 +1,52 @@
 /*
- * Druid - a distributed column store.
- * Copyright 2012 - 2015 Metamarkets Group Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
+* Licensed to Metamarkets Group Inc. (Metamarkets) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. Metamarkets licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/

 package io.druid.firehose.kafka;

+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import com.google.common.io.Closeables;
+import com.metamx.common.logger.Logger;
+import com.metamx.emitter.EmittingLogger;
+import io.druid.data.input.ByteBufferInputRowParser;
+import io.druid.data.input.Committer;
+import io.druid.data.input.FirehoseFactoryV2;
+import io.druid.data.input.FirehoseV2;
+import io.druid.data.input.InputRow;
+import io.druid.firehose.kafka.KafkaSimpleConsumer.BytesMessageWithOffset;
+
 import java.io.Closeable;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;

-import com.google.common.collect.Maps;
-import com.metamx.common.logger.Logger;
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.common.collect.Sets;
-
-import io.druid.data.input.ByteBufferInputRowParser;
-import io.druid.data.input.Committer;
-import io.druid.data.input.FirehoseFactoryV2;
-import io.druid.data.input.FirehoseV2;
-import io.druid.data.input.InputRow;
-import io.druid.firehose.kafka.KafkaSimpleConsumer.BytesMessageWithOffset;
-
 public class KafkaEightSimpleConsumerFirehoseFactory implements
     FirehoseFactoryV2<ByteBufferInputRowParser>
 {
-  private static final Logger log = new Logger(
+  private static final EmittingLogger log = new EmittingLogger(
       KafkaEightSimpleConsumerFirehoseFactory.class
   );
@@ -64,12 +66,12 @@ public class KafkaEightSimpleConsumerFirehoseFactory implements
   private final int queueBufferLength;

   @JsonProperty
-  private boolean earliest;
+  private final boolean earliest;

   private final List<PartitionConsumerWorker> consumerWorkers = new CopyOnWriteArrayList<>();
   private static final int DEFAULT_QUEUE_BUFFER_LENGTH = 20000;
-  private static final String RESET_TO_LATEST = "latest";
   private static final int CONSUMER_FETCH_TIMEOUT = 10000;
+
   @JsonCreator
   public KafkaEightSimpleConsumerFirehoseFactory(
       @JsonProperty("brokerList") List<String> brokerList,
@@ -77,19 +79,43 @@ public KafkaEightSimpleConsumerFirehoseFactory(
       @JsonProperty("clientId") String clientId,
       @JsonProperty("feed") String feed,
       @JsonProperty("queueBufferLength") Integer queueBufferLength,
-      @JsonProperty("resetBehavior") String resetBehavior
+      @JsonProperty("resetOffsetToEarliest") Boolean resetOffsetToEarliest
   )
   {
     this.brokerList = brokerList;
+    Preconditions.checkArgument(
+        brokerList != null && brokerList.size() > 0,
+        "brokerList is null/empty"
+    );
+
     this.partitionIdList = partitionIdList;
+    Preconditions.checkArgument(
+        partitionIdList != null && partitionIdList.size() > 0,
+        "partitionIdList is null/empty"
+    );
+
     this.clientId = clientId;
+    Preconditions.checkArgument(
+        clientId != null && !clientId.isEmpty(),
+        "clientId is null/empty"
+    );
+
     this.feed = feed;
+    Preconditions.checkArgument(
+        feed != null && !feed.isEmpty(),
+        "feed is null/empty"
+    );
+
     this.queueBufferLength = queueBufferLength == null ? DEFAULT_QUEUE_BUFFER_LENGTH : queueBufferLength;
+    Preconditions.checkArgument(queueBufferLength > 0, "queueBufferLength must be positive number");
     log.info("queueBufferLength loaded as[%s]", this.queueBufferLength);
-    this.earliest = RESET_TO_LATEST.equalsIgnoreCase(resetBehavior) ? false : true;
-    log.info("Default behavior of cosumer set to earliest? [%s]", this.earliest);
+    this.earliest = resetOffsetToEarliest == null ? true : resetOffsetToEarliest.booleanValue();
+    log.info(
+        "if old offsets are not known, data from partition will be read from [%s] available offset.",
+        this.earliest ? "earliest" : "latest"
+    );
   }

   private Map<Integer, Long> loadOffsetFromPreviousMetaData(Object lastCommit)
@@ -113,7 +139,7 @@ private Map<Integer, Long> loadOffsetFromPreviousMetaData(Object lastCommit)
       }
       log.info("Loaded offset map[%s]", offsetMap);
     } else {
-      log.error("Unable to cast lastCommit to Map");
+      log.makeAlert("Unable to cast lastCommit to Map for feed [%s]", feed);
     }
     return offsetMap;
   }
@@ -123,20 +149,6 @@ public FirehoseV2 connect(final ByteBufferInputRowParser firehoseParser, Object
   {
     final Map<Integer, Long> lastOffsets = loadOffsetFromPreviousMetaData(lastCommit);

-    Set<String> newDimExclus = Sets.union(
-        firehoseParser.getParseSpec().getDimensionsSpec().getDimensionExclusions(),
-        Sets.newHashSet("feed")
-    );
-    final ByteBufferInputRowParser theParser = firehoseParser.withParseSpec(
-        firehoseParser.getParseSpec()
-                      .withDimensionsSpec(
-                          firehoseParser.getParseSpec()
-                                        .getDimensionsSpec()
-                                        .withDimensionExclusions(
-                                            newDimExclus
-                                        )
-                      )
-    );
     for (Integer partition : partitionIdList) {
       final KafkaSimpleConsumer kafkaSimpleConsumer = new KafkaSimpleConsumer(
           feed, partition, clientId, brokerList, earliest
@@ -148,7 +160,9 @@ public FirehoseV2 connect(final ByteBufferInputRowParser firehoseParser, Object
       consumerWorkers.add(worker);
     }

-    final LinkedBlockingQueue<BytesMessageWithOffset> messageQueue = new LinkedBlockingQueue<BytesMessageWithOffset>(queueBufferLength);
+    final LinkedBlockingQueue<BytesMessageWithOffset> messageQueue = new LinkedBlockingQueue<BytesMessageWithOffset>(
+        queueBufferLength
+    );
     log.info("Kicking off all consumers");
     for (PartitionConsumerWorker worker : consumerWorkers) {
       worker.go(messageQueue);
@@ -157,15 +171,13 @@ public FirehoseV2 connect(final ByteBufferInputRowParser firehoseParser, Object

     return new FirehoseV2()
     {
-      private ConcurrentMap<Integer, Long> lastOffsetPartitions;
-      private volatile boolean stop;
-      private volatile boolean interrupted;
-
+      private Map<Integer, Long> lastOffsetPartitions;
+      private volatile boolean stopped;
       private volatile BytesMessageWithOffset msg = null;
       private volatile InputRow row = null;

       {
-        lastOffsetPartitions = Maps.newConcurrentMap();
+        lastOffsetPartitions = Maps.newHashMap();
         lastOffsetPartitions.putAll(lastOffsets);
       }

@@ -178,7 +190,7 @@ public void start() throws Exception
       @Override
       public boolean advance()
       {
-        if (stop) {
+        if (stopped) {
           return false;
         }

@@ -196,22 +208,22 @@ private void nextMessage()
           }

           msg = messageQueue.take();
-          interrupted = false;

           final byte[] message = msg.message();
-          row = message == null ? null : theParser.parse(ByteBuffer.wrap(message));
+          row = message == null ? null : firehoseParser.parse(ByteBuffer.wrap(message));
         }
       }
       catch (InterruptedException e) {
-        interrupted = true;
-        log.info(e, "Interrupted when taken from queue");
+        //Let the caller decide whether to stop or continue when thread is interrupted.
+        log.warn(e, "Thread Interrupted while taking from queue, propagating the interrupt");
+        Thread.currentThread().interrupt();
       }
     }

     @Override
     public InputRow currRow()
     {
-      if (interrupted) {
+      if (stopped) {
         return null;
       }
       return row;
@@ -242,9 +254,9 @@ public void run()
     public void close() throws IOException
     {
       log.info("Stopping kafka 0.8 simple firehose");
-      stop = true;
+      stopped = true;
       for (PartitionConsumerWorker t : consumerWorkers) {
-        t.close();
+        Closeables.close(t, true);
       }
     }
   };
@@ -268,7 +280,8 @@ private static class PartitionConsumerWorker implements Closeable
       this.startOffset = startOffset;
     }

-    public void go(final LinkedBlockingQueue<BytesMessageWithOffset> messageQueue) {
+    public void go(final LinkedBlockingQueue<BytesMessageWithOffset> messageQueue)
+    {
       thread = new Thread()
       {
         @Override
diff --git a/extensions/kafka-eight-simpleConsumer/src/main/java/io/druid/firehose/kafka/KafkaSimpleConsumer.java b/extensions/kafka-eight-simpleConsumer/src/main/java/io/druid/firehose/kafka/KafkaSimpleConsumer.java
index ad9760ff70df..8686ce5edf54 100644
--- a/extensions/kafka-eight-simpleConsumer/src/main/java/io/druid/firehose/kafka/KafkaSimpleConsumer.java
+++ b/extensions/kafka-eight-simpleConsumer/src/main/java/io/druid/firehose/kafka/KafkaSimpleConsumer.java
@@ -1,39 +1,37 @@
 /*
- * Druid - a distributed column store.
- * Copyright 2012 - 2015 Metamarkets Group Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
+* Licensed to Metamarkets Group Inc. (Metamarkets) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. Metamarkets licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/

 package io.druid.firehose.kafka;

-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
 import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
 import com.google.common.base.Predicate;
+import com.google.common.net.HostAndPort;
 import com.metamx.common.guava.FunctionalIterable;
 import com.metamx.common.logger.Logger;
 import kafka.api.FetchRequest;
 import kafka.api.FetchRequestBuilder;
 import kafka.api.PartitionOffsetRequestInfo;
-import kafka.javaapi.FetchResponse;
 import kafka.cluster.Broker;
 import kafka.common.ErrorMapping;
 import kafka.common.TopicAndPartition;
+import kafka.javaapi.FetchResponse;
 import kafka.javaapi.OffsetRequest;
 import kafka.javaapi.OffsetResponse;
 import kafka.javaapi.PartitionMetadata;
@@ -43,11 +41,16 @@
 import kafka.javaapi.consumer.SimpleConsumer;
 import kafka.message.MessageAndOffset;

+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
 /**
- * refer @{link
- * https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer
- * +Example}
- *
+ * refer @{link https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example}
+ *
  * This class is not thread safe, the caller must ensure all the methods be
  * called from single thread
  */
@@ -58,7 +61,7 @@ public class KafkaSimpleConsumer

   private static final Logger log = new Logger(KafkaSimpleConsumer.class);

-  private final List<KafkaBroker> allBrokers;
+  private final List<HostAndPort> allBrokers;
   private final String topic;
   private final int partitionId;
   private final String clientId;
@@ -66,7 +69,7 @@ public class KafkaSimpleConsumer
   private final boolean earliest;

   private volatile Broker leaderBroker;
-  private List<KafkaBroker> replicaBrokers;
+  private List<HostAndPort> replicaBrokers;
   private SimpleConsumer consumer = null;

   private static final int SO_TIMEOUT = 30000;
@@ -76,21 +79,17 @@ public class KafkaSimpleConsumer

   public KafkaSimpleConsumer(String topic, int partitionId, String clientId, List<String> brokers, boolean earliest)
   {
-    List<KafkaBroker> brokerList = new ArrayList<KafkaBroker>();
+    List<HostAndPort> brokerList = new ArrayList<>();
     for (String broker : brokers) {
-      String[] tokens = broker.split(":");
-      if (tokens.length != 2) {
-        log.warn("wrong broker name [%s], its format should be host:port", broker);
-        continue;
-      }
-
-      try {
-        brokerList.add(new KafkaBroker(tokens[0], Integer.parseInt(tokens[1])));
-      }
-      catch (NumberFormatException e) {
-        log.warn("wrong broker name [%s], its format should be host:port", broker);
-        continue;
-      }
+      HostAndPort brokerHostAndPort = HostAndPort.fromString(broker);
+      Preconditions.checkArgument(
+          brokerHostAndPort.getHostText() != null &&
+          !brokerHostAndPort.getHostText().isEmpty() &&
+          brokerHostAndPort.hasPort(),
+          "kafka broker [%s] is not valid, must be <host>:<port>",
+          broker
+      );
+      brokerList.add(brokerHostAndPort);
     }

     this.allBrokers = Collections.unmodifiableList(brokerList);
@@ -101,7 +100,11 @@ public KafkaSimpleConsumer(String topic, int partitionId, String clientId, List<
     this.replicaBrokers = new ArrayList<>();
     this.replicaBrokers.addAll(this.allBrokers);
     this.earliest = earliest;
-    log.info("KafkaSimpleConsumer initialized with clientId [%s] for message consumption and clientId [%s] for leader lookup", this.clientId, this.leaderLookupClientId);
+    log.info(
+        "KafkaSimpleConsumer initialized with clientId [%s] for message consumption and clientId [%s] for leader lookup",
+        this.clientId,
+        this.leaderLookupClientId
+    );
   }

   private void ensureConsumer(Broker leader) throws InterruptedException
@@ -150,24 +153,6 @@ public long offset()
     }
   }

-  static class KafkaBroker
-  {
-    final String host;
-    final int port;
-
-    KafkaBroker(String host, int port)
-    {
-      this.host = host;
-      this.port = port;
-    }
-
-    @Override
-    public String toString()
-    {
-      return String.format("%s:%d", host, port);
-    }
-  }
-
   private Iterable<BytesMessageWithOffset> filterAndDecode(Iterable<MessageAndOffset> kafkaMessages, final long offset)
   {
     return FunctionalIterable
@@ -265,7 +250,7 @@ public Iterable<BytesMessageWithOffset> fetch(long offset, int timeoutMs) throws
       if (errorCode == ErrorMapping.RequestTimedOutCode()) {
         log.info("kafka request timed out, response[%s]", response);
       } else if (errorCode == ErrorMapping.OffsetOutOfRangeCode()) {
-        long newOffset = getOffset(earliest);
+        long newOffset = getOffset(earliest);
         log.info("got [%s] offset[%s] for [%s][%s]", earliest ? "earliest" : "latest", newOffset, topic, partitionId);
"earliest" : "latest", newOffset, topic, partitionId); if (newOffset < 0) { needNewLeader = true; @@ -307,7 +292,6 @@ private void stopConsumer() } } - // stop the consumer public void stop() { stopConsumer(); @@ -316,11 +300,11 @@ public void stop() private PartitionMetadata findLeader() throws InterruptedException { - for (KafkaBroker broker : replicaBrokers) { + for (HostAndPort broker : replicaBrokers) { SimpleConsumer consumer = null; try { - log.info("Finding new leader from Kafka brokers, try broker %s:%s", broker.host, broker.port); - consumer = new SimpleConsumer(broker.host, broker.port, SO_TIMEOUT, BUFFER_SIZE, leaderLookupClientId); + log.info("Finding new leader from Kafka brokers, try broker [%s]", broker.toString()); + consumer = new SimpleConsumer(broker.getHostText(), broker.getPort(), SO_TIMEOUT, BUFFER_SIZE, leaderLookupClientId); TopicMetadataResponse resp = consumer.send(new TopicMetadataRequest(Collections.singletonList(topic))); List metaData = resp.topicsMetadata(); @@ -358,7 +342,9 @@ private Broker findNewLeader(Broker oldLeader) throws InterruptedException if (metadata != null) { replicaBrokers.clear(); for (Broker replica : metadata.replicas()) { - replicaBrokers.add(new KafkaBroker(replica.host(), replica.port())); + replicaBrokers.add( + HostAndPort.fromParts(replica.host(), replica.port()) + ); } log.debug("Got new Kafka leader metadata : [%s], previous leader : [%s]", metadata, oldLeader); @@ -385,7 +371,8 @@ private Broker findNewLeader(Broker oldLeader) throws InterruptedException } } - private boolean isValidNewLeader(Broker broker) { + private boolean isValidNewLeader(Broker broker) + { // broker is considered valid new leader if it is not the same as old leaderBroker return !(leaderBroker.host().equalsIgnoreCase(broker.host()) && leaderBroker.port() == broker.port()); } @@ -393,7 +380,7 @@ private boolean isValidNewLeader(Broker broker) { private void ensureNotInterrupted(Exception e) throws InterruptedException { if (Thread.interrupted()) { - log.info(e, "Interrupted during fetching for %s - %s", topic, partitionId); + log.error(e, "Interrupted during fetching for %s - %s", topic, partitionId); throw new InterruptedException(); } } diff --git a/extensions/kafka-eight-simpleConsumer/src/main/resources/META-INF/services/io.druid.initialization.DruidModule b/extensions/kafka-eight-simpleConsumer/src/main/resources/META-INF/services/io.druid.initialization.DruidModule index f38689577eb4..2ebfa29f3663 100644 --- a/extensions/kafka-eight-simpleConsumer/src/main/resources/META-INF/services/io.druid.initialization.DruidModule +++ b/extensions/kafka-eight-simpleConsumer/src/main/resources/META-INF/services/io.druid.initialization.DruidModule @@ -1 +1,20 @@ -io.druid.firehose.kafka.KafkaEightSimpleConsumerDruidModule \ No newline at end of file +# +# Licensed to Metamarkets Group Inc. (Metamarkets) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. Metamarkets licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. 
See the License for the +# specific language governing permissions and limitations +# under the License. +# + +io.druid.firehose.kafka.KafkaEightSimpleConsumerDruidModule diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/LegacyIndexGeneratorJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/LegacyIndexGeneratorJob.java index 0ba9b0570571..5a937251765d 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/LegacyIndexGeneratorJob.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/LegacyIndexGeneratorJob.java @@ -68,7 +68,7 @@ protected File persist( IncrementalIndex index, Interval interval, File file, ProgressIndicator progressIndicator ) throws IOException { - return IndexMerger.persist(index, interval, file, config.getIndexSpec(), progressIndicator); + return IndexMerger.persist(index, interval, file, null, config.getIndexSpec(), progressIndicator); } @Override diff --git a/indexing-service/src/main/java/io/druid/indexing/common/index/YeOldePlumberSchool.java b/indexing-service/src/main/java/io/druid/indexing/common/index/YeOldePlumberSchool.java index d0ab4ecce648..cf6450428a4e 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/index/YeOldePlumberSchool.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/index/YeOldePlumberSchool.java @@ -207,6 +207,7 @@ private void spillIfSwappable() IndexMerger.persist( indexToPersist.getIndex(), dirToPersist, + null, config.getIndexSpec() ); diff --git a/indexing-service/src/test/java/io/druid/indexing/common/TestRealtimeTaskV2.java b/indexing-service/src/test/java/io/druid/indexing/common/TestRealtimeTaskV2.java deleted file mode 100644 index 9b65ddc4d084..000000000000 --- a/indexing-service/src/test/java/io/druid/indexing/common/TestRealtimeTaskV2.java +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Druid - a distributed column store. - * Copyright 2012 - 2015 Metamarkets Group Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */
-
-package io.druid.indexing.common;
-
-import java.io.File;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.annotation.JsonTypeName;
-
-import io.druid.indexing.common.task.RealtimeIndexTask;
-import io.druid.indexing.common.task.TaskResource;
-import io.druid.query.aggregation.AggregatorFactory;
-import io.druid.segment.indexing.DataSchema;
-import io.druid.segment.indexing.RealtimeIOConfig;
-import io.druid.segment.indexing.RealtimeTuningConfig;
-import io.druid.segment.realtime.FireDepartment;
-import io.druid.segment.realtime.FireDepartmentMetrics;
-import io.druid.segment.realtime.firehose.LocalFirehoseFactoryV2;
-import io.druid.segment.realtime.plumber.Plumber;
-import io.druid.segment.realtime.plumber.PlumberSchool;
-
-/**
- */
-@JsonTypeName("test_realtime")
-public class TestRealtimeTaskV2 extends RealtimeIndexTask
-{
-  private final TaskStatus status;
-
-  @JsonCreator
-  public TestRealtimeTaskV2(
-      @JsonProperty("id") String id,
-      @JsonProperty("resource") TaskResource taskResource,
-      @JsonProperty("dataSource") String dataSource,
-      @JsonProperty("taskStatus") TaskStatus status
-  )
-  {
-    super(
-        id,
-        taskResource,
-        new FireDepartment(
-            new DataSchema(dataSource, null, new AggregatorFactory[]{}, null),
-            new RealtimeIOConfig(
-                null,
-                new PlumberSchool() {
-                  @Override
-                  public Plumber findPlumber(
-                      DataSchema schema, RealtimeTuningConfig config, FireDepartmentMetrics metrics
-                  )
-                  {
-                    return null;
-                  }
-                },
-                new LocalFirehoseFactoryV2(new File("lol"), "rofl", null)
-            ), null
-        )
-    );
-    this.status = status;
-  }
-
-  @Override
-  @JsonProperty
-  public String getType()
-  {
-    return "test_realtime";
-  }
-
-  @Override
-  public TaskStatus run(TaskToolbox toolbox) throws Exception
-  {
-    return status;
-  }
-}
diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java
index 4fb546406fe3..3704e2a2795b 100644
--- a/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java
+++ b/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java
@@ -1,19 +1,21 @@
 /*
- * Druid - a distributed column store.
- * Copyright 2012 - 2015 Metamarkets Group Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
+* Licensed to Metamarkets Group Inc. (Metamarkets) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. Metamarkets licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/

 package io.druid.indexing.common.task;

@@ -282,7 +284,8 @@ public Plumber findPlumber(
                     return null;
                   }
                 },
-                null),
+                null
+            ),

             new RealtimeTuningConfig(
                 1,
@@ -333,8 +336,10 @@ public Plumber findPlumber(
         task.getRealtimeIngestionSchema().getDataSchema().getGranularitySpec().getSegmentGranularity(),
         task2.getRealtimeIngestionSchema().getDataSchema().getGranularitySpec().getSegmentGranularity()
     );
-    Assert.assertEquals(task.getRealtimeIngestionSchema().getTuningConfig().getAggregationBufferRatio(),
-        task2.getRealtimeIngestionSchema().getTuningConfig().getAggregationBufferRatio(), 0.0f);
+    Assert.assertEquals(
+        task.getRealtimeIngestionSchema().getTuningConfig().getAggregationBufferRatio(),
+        task2.getRealtimeIngestionSchema().getTuningConfig().getAggregationBufferRatio(), 0.0f
+    );
   }

   @Test
@@ -463,9 +468,9 @@ public void testSegmentConvertSerde() throws IOException
     Assert.assertEquals(
         convertSegmentTaskOriginal.getIndexSpec().getBitmapSerdeFactory().getClass().getCanonicalName(),
         convertSegmentTask.getIndexSpec()
-                          .getBitmapSerdeFactory()
-                          .getClass()
-                          .getCanonicalName()
+            .getBitmapSerdeFactory()
+            .getClass()
+            .getCanonicalName()
     );
     Assert.assertEquals(
         convertSegmentTaskOriginal.getIndexSpec().getDimensionCompression(),
diff --git a/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java b/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java
index c632c84ad6d6..bc0dfb547801 100644
--- a/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java
+++ b/indexing-service/src/test/java/io/druid/indexing/firehose/IngestSegmentFirehoseFactoryTest.java
@@ -136,7 +136,7 @@ public static Collection<Object[]> constructorFeeder() throws IOException
     if (!persistDir.mkdirs() && !persistDir.exists()) {
       throw new IOException(String.format("Could not create directory at [%s]", persistDir.getAbsolutePath()));
     }
-    IndexMerger.persist(index, persistDir, indexSpec);
+    IndexMerger.persist(index, persistDir, null, indexSpec);

     final TaskLockbox tl = new TaskLockbox(ts);
     final IndexerSQLMetadataStorageCoordinator mdc = new IndexerSQLMetadataStorageCoordinator(null, null, null)
diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/RemoteTaskRunnerTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/RemoteTaskRunnerTest.java
index bc7d10f73f30..570417577998 100644
--- a/indexing-service/src/test/java/io/druid/indexing/overlord/RemoteTaskRunnerTest.java
+++ b/indexing-service/src/test/java/io/druid/indexing/overlord/RemoteTaskRunnerTest.java
@@ -31,7 +31,6 @@
 import com.metamx.common.lifecycle.Lifecycle;
 import com.metamx.emitter.EmittingLogger;
 import com.metamx.emitter.service.ServiceEmitter;
-
 import io.druid.common.guava.DSuppliers;
 import io.druid.curator.PotentiallyGzippedCompressionProvider;
 import io.druid.curator.cache.SimplePathChildrenCacheFactory;
@@ -39,7 +38,6 @@
 import io.druid.indexing.common.TaskStatus;
 import io.druid.indexing.common.TestMergeTask;
 import io.druid.indexing.common.TestRealtimeTask;
-import io.druid.indexing.common.TestRealtimeTaskV2;
 import io.druid.indexing.common.TestUtils;
 import io.druid.indexing.common.task.Task;
 import io.druid.indexing.common.task.TaskResource;
@@ -50,7 +48,6 @@
 import io.druid.jackson.DefaultObjectMapper;
 import io.druid.server.initialization.IndexerZkConfig;
 import io.druid.server.initialization.ZkPathsConfig;
-
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.retry.ExponentialBackoffRetry;
@@ -234,51 +231,6 @@ public boolean isValid()
     Assert.assertTrue(remoteTaskRunner.getPendingTasks().iterator().next().getTaskId().equals("rt2"));
   }

-  @Test
-  public void testRunSameAvailabilityGroupV2() throws Exception
-  {
-    doSetup();
-
-    TestRealtimeTaskV2 task1 = new TestRealtimeTaskV2("rtV1", new TaskResource("rtV1", 1), "foo", TaskStatus.running("rtV1"));
-    remoteTaskRunner.run(task1);
-    Assert.assertTrue(taskAnnounced(task1.getId()));
-    mockWorkerRunningTask(task1);
-
-    TestRealtimeTaskV2 task2 = new TestRealtimeTaskV2("rtV2", new TaskResource("rtV1", 1), "foo", TaskStatus.running("rtV2"));
-    remoteTaskRunner.run(task2);
-
-    TestRealtimeTaskV2 task3 = new TestRealtimeTaskV2("rtV3", new TaskResource("rtV2", 1), "foo", TaskStatus.running("rtV3"));
-    remoteTaskRunner.run(task3);
-
-    Assert.assertTrue(
-        TestUtils.conditionValid(
-            new IndexingServiceCondition()
-            {
-              @Override
-              public boolean isValid()
-              {
-                return remoteTaskRunner.getRunningTasks().size() == 2;
-              }
-            }
-        )
-    );
-
-    Assert.assertTrue(
-        TestUtils.conditionValid(
-            new IndexingServiceCondition()
-            {
-              @Override
-              public boolean isValid()
-              {
-                return remoteTaskRunner.getPendingTasks().size() == 1;
-              }
-            }
-        )
-    );
-
-    Assert.assertTrue(remoteTaskRunner.getPendingTasks().iterator().next().getTaskId().equals("rtV2"));
-  }
-
   @Test
   public void testRunWithCapacity() throws Exception
   {
@@ -326,53 +278,6 @@ public boolean isValid()
     Assert.assertTrue(remoteTaskRunner.getPendingTasks().iterator().next().getTaskId().equals("rt2"));
   }

-  @Test
-  public void testRunWithCapacityV2() throws Exception
-  {
-    doSetup();
-
-    TestRealtimeTaskV2 task1 = new TestRealtimeTaskV2("rtV1", new TaskResource("rtV1", 1), "foo", TaskStatus.running("rtV1"));
-    remoteTaskRunner.run(task1);
-    Assert.assertTrue(taskAnnounced(task1.getId()));
-    mockWorkerRunningTask(task1);
-
-    TestRealtimeTaskV2 task2 = new TestRealtimeTaskV2("rtV2", new TaskResource("rtV2", 3), "foo", TaskStatus.running("rtV2"));
-    remoteTaskRunner.run(task2);
-
-    TestRealtimeTaskV2 task3 = new TestRealtimeTaskV2("rtV3", new TaskResource("rtV3", 2), "foo", TaskStatus.running("rtV3"));
-    remoteTaskRunner.run(task3);
-    Assert.assertTrue(taskAnnounced(task3.getId()));
-    mockWorkerRunningTask(task3);
-
-    Assert.assertTrue(
-        TestUtils.conditionValid(
-            new IndexingServiceCondition()
-            {
-              @Override
-              public boolean isValid()
-              {
-                return remoteTaskRunner.getRunningTasks().size() == 2;
-              }
-            }
-        )
-    );
-
-    Assert.assertTrue(
-        TestUtils.conditionValid(
-            new IndexingServiceCondition()
-            {
-              @Override
-              public boolean isValid()
-              {
-                return remoteTaskRunner.getPendingTasks().size() == 1;
-              }
-            }
-        )
-    );
-
-    Assert.assertTrue(remoteTaskRunner.getPendingTasks().iterator().next().getTaskId().equals("rtV2"));
-  }
-
   @Test
   public void testStatusRemoved() throws Exception
   {
diff --git a/pom.xml b/pom.xml
index a748fdbab23c..15e298159d34 100644
--- a/pom.xml
+++ b/pom.xml
@@ -104,6 +104,7 @@
         <module>extensions/azure-extensions</module>
         <module>extensions/namespace-lookup</module>
         <module>extensions/kafka-extraction-namespace</module>
+        <module>extensions-distribution</module>

        <module>distribution</module>
diff --git a/processing/src/main/java/io/druid/segment/IndexIO.java b/processing/src/main/java/io/druid/segment/IndexIO.java
index 057520a3983d..592e5bb6cc88 100644
--- a/processing/src/main/java/io/druid/segment/IndexIO.java
+++ b/processing/src/main/java/io/druid/segment/IndexIO.java
@@ -1,22 +1,25 @@
 /*
- * Druid - a distributed column store.
- * Copyright 2012 - 2015 Metamarkets Group Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
+* Licensed to Metamarkets Group Inc. (Metamarkets) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. Metamarkets licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/

 package io.druid.segment;

+import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Predicate;
@@ -236,7 +239,7 @@ public static boolean convertSegment(
       default:
         if (forceIfCurrent) {
           IndexMerger.convert(toConvert, converted, indexSpec);
-          if(validate){
+          if (validate) {
             DefaultIndexIOHandler.validateTwoSegments(toConvert, converted);
           }
           return true;
@@ -472,8 +475,8 @@ public MMappedIndex mapDir(File inDir) throws IOException

     public static void validateTwoSegments(File dir1, File dir2) throws IOException
     {
-      try(QueryableIndex queryableIndex1 = loadIndex(dir1)) {
-        try(QueryableIndex queryableIndex2 = loadIndex(dir2)) {
+      try (QueryableIndex queryableIndex1 = loadIndex(dir1)) {
+        try (QueryableIndex queryableIndex2 = loadIndex(dir2)) {
           validateTwoSegments(
               new QueryableIndexIndexableAdapter(queryableIndex1),
               new QueryableIndexIndexableAdapter(queryableIndex2)
@@ -872,6 +875,11 @@ public boolean apply(String s)
       serializerUtils.writeString(writer, segmentBitmapSerdeFactoryString);
       writer.close();

+      final ByteBuffer metadataBuffer = v8SmooshedFiles.mapFile("metadata.drd");
+      if (metadataBuffer != null) {
+        v9Smoosher.add("metadata.drd", metadataBuffer);
+      }
+
      log.info("Skipped files[%s]", skippedFiles);

      v9Smoosher.close();
@@ -1016,10 +1024,20 @@ public QueryableIndex load(File inDir) throws IOException
         segmentBitmapSerdeFactory = new BitmapSerde.LegacyBitmapSerdeFactory();
       }

-      Object commitMetaData = null;
-      ByteBuffer metadata = smooshedFiles.mapFile("metadata.drd");
-      if (metadata != null) {
-        commitMetaData = mapper.readValue(serializerUtils.readBytes(metadata, metadata.remaining()), Object.class);
+      Map<String, Object> metadata = null;
+      ByteBuffer metadataBB = smooshedFiles.mapFile("metadata.drd");
+      if (metadataBB != null) {
+        try {
+          metadata = mapper.readValue(
+              serializerUtils.readBytes(metadataBB, metadataBB.remaining()),
+              new TypeReference<Map<String, Object>>()
+              {
+              }
+          );
+        }
+        catch (IOException ex) {
+          throw new IOException("Failed to read metadata", ex);
+        }
       }

       Map<String, Column> columns = Maps.newHashMap();
@@ -1031,7 +1049,7 @@ public QueryableIndex load(File inDir) throws IOException
       columns.put(Column.TIME_COLUMN_NAME, deserializeColumn(mapper, smooshedFiles.mapFile("__time")));

       final QueryableIndex index = new SimpleQueryableIndex(
-          dataInterval, cols, dims, segmentBitmapSerdeFactory.getBitmapFactory(), columns, smooshedFiles, commitMetaData
+          dataInterval, cols, dims, segmentBitmapSerdeFactory.getBitmapFactory(), columns, smooshedFiles, metadata
       );

       log.debug("Mapped v9 index[%s] in %,d millis", inDir, System.currentTimeMillis() - startTime);
diff --git a/processing/src/main/java/io/druid/segment/IndexMaker.java b/processing/src/main/java/io/druid/segment/IndexMaker.java
index 14874fca8a70..d7ce6e2583a2 100644
--- a/processing/src/main/java/io/druid/segment/IndexMaker.java
+++ b/processing/src/main/java/io/druid/segment/IndexMaker.java
@@ -1,19 +1,21 @@
 /*
- * Druid - a distributed column store.
- * Copyright 2012 - 2015 Metamarkets Group Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
+* Licensed to Metamarkets Group Inc. (Metamarkets) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. Metamarkets licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/

 package io.druid.segment;

@@ -115,9 +117,14 @@ public class IndexMaker
     mapper = injector.getInstance(ObjectMapper.class);
   }

-  public static File persist(final IncrementalIndex index, File outDir, final Object commitMetaData, final IndexSpec indexSpec) throws IOException
+  public static File persist(
+      final IncrementalIndex index,
+      File outDir,
+      final Map<String, Object> segmentMetadata,
+      final IndexSpec indexSpec
+  ) throws IOException
   {
-    return persist(index, index.getInterval(), outDir, commitMetaData, indexSpec);
+    return persist(index, index.getInterval(), outDir, segmentMetadata, indexSpec);
   }

   /**
@@ -134,12 +141,12 @@ public static File persist(
       final IncrementalIndex index,
       final Interval dataInterval,
       File outDir,
-      final Object commitMetaData,
+      final Map<String, Object> segmentMetadata,
       final IndexSpec indexSpec
   ) throws IOException
   {
     return persist(
-        index, dataInterval, outDir, commitMetaData, indexSpec, new LoggingProgressIndicator(outDir.toString())
+        index, dataInterval, outDir, segmentMetadata, indexSpec, new LoggingProgressIndicator(outDir.toString())
     );
   }

@@ -147,7 +154,7 @@ public static File persist(
       final IncrementalIndex index,
       final Interval dataInterval,
       File outDir,
-      final Object commitMetaData,
+      final Map<String, Object> segmentMetadata,
       final IndexSpec indexSpec,
       ProgressIndicator progress
   ) throws IOException
@@ -185,7 +192,7 @@ public static File persist(
         ),
         index.getMetricAggs(),
         outDir,
-        commitMetaData,
+        segmentMetadata,
         indexSpec,
         progress
     );
@@ -227,11 +234,11 @@ public IndexableAdapter apply(final QueryableIndex input)
   }

   public static File merge(
-      List<IndexableAdapter> adapters, final AggregatorFactory[] metricAggs, File outDir, final String commitMetaData, final IndexSpec indexSpec
+      List<IndexableAdapter> adapters, final AggregatorFactory[] metricAggs, File outDir, final IndexSpec indexSpec
   ) throws IOException
   {
     return merge(
-        adapters, metricAggs, outDir, commitMetaData, indexSpec, new LoggingProgressIndicator(outDir.toString())
+        adapters, metricAggs, outDir, null, indexSpec, new LoggingProgressIndicator(outDir.toString())
     );
   }

@@ -239,7 +246,7 @@ public static File merge(
       List<IndexableAdapter> adapters,
       final AggregatorFactory[] metricAggs,
       File outDir,
-      final Object commitMetaData,
+      final Map<String, Object> segmentMetaData,
       final IndexSpec indexSpec,
       ProgressIndicator progress
   ) throws IOException
@@ -330,7 +337,7 @@ public Iterable<Rowboat> apply(
     };

     return makeIndexFiles(
-        adapters, outDir, progress, mergedDimensions, mergedMetrics, commitMetaData, rowMergerFn, indexSpec
+        adapters, outDir, progress, mergedDimensions, mergedMetrics, segmentMetaData, rowMergerFn, indexSpec
     );
   }

@@ -352,7 +359,7 @@ public static File convert(
         progress,
         Lists.newArrayList(adapter.getDimensionNames()),
         Lists.newArrayList(adapter.getMetricNames()),
-        adapter.getMetaData(),
+        null,
         new Function<ArrayList<Iterable<Rowboat>>, Iterable<Rowboat>>()
         {
           @Nullable
@@ -367,20 +374,18 @@ public Iterable<Rowboat> apply(ArrayList<Iterable<Rowboat>> input)
     }
   }

-
   public static File append(
       final List<IndexableAdapter> adapters,
       final File outDir,
       final IndexSpec indexSpec
   ) throws IOException
   {
-    return append(adapters, outDir, null, new LoggingProgressIndicator(outDir.toString()), indexSpec);
+    return append(adapters, outDir, new LoggingProgressIndicator(outDir.toString()), indexSpec);
   }

   public static File append(
       final List<IndexableAdapter> adapters,
       final File outDir,
-      final String commitMetaData,
       final ProgressIndicator progress,
       final IndexSpec indexSpec
   ) throws IOException
@@ -451,7 +456,7 @@ public Iterable<Rowboat> apply(
       }
     };

-    return makeIndexFiles(adapters, outDir, progress, mergedDimensions, mergedMetrics, commitMetaData, rowMergerFn, indexSpec);
+    return makeIndexFiles(adapters, outDir, progress, mergedDimensions, mergedMetrics, null, rowMergerFn, indexSpec);
   }

   private static File makeIndexFiles(
@@ -460,7 +465,7 @@ private static File makeIndexFiles(
       final ProgressIndicator progress,
       final List<String> mergedDimensions,
       final List<String> mergedMetrics,
-      final Object commitMetaData,
+      final Map<String, Object> segmentMetadata,
       final Function<ArrayList<Iterable<Rowboat>>, Iterable<Rowboat>> rowMergerFn,
       final IndexSpec indexSpec
   ) throws IOException
@@ -556,7 +561,7 @@ private static File makeIndexFiles(
     makeIndexBinary(
         v9Smoosher, adapters, outDir, mergedDimensions, mergedMetrics, skippedDimensions, progress, indexSpec
     );
-    makeMetadataBinary(v9Smoosher, progress, commitMetaData);
+    makeMetadataBinary(v9Smoosher, progress, segmentMetadata);

     v9Smoosher.close();

@@ -1404,7 +1409,7 @@
         + 16
         + serializerUtils.getSerializedStringByteSize(bitmapSerdeFactoryType);

-    final SmooshedWriter writer = v9Smoosher.addWithSmooshedWriter("index.drd", numBytes);
+    final SmooshedWriter writer = v9Smoosher.addWithSmooshedWriter("index.drd", numBytes);

     cols.writeToChannel(writer);
     dims.writeToChannel(writer);
@@ -1433,14 +1438,14 @@
   private static void makeMetadataBinary(
       final FileSmoosher v9Smoosher,
       final ProgressIndicator progress,
-      final Object commitMetadata
+      final Map<String, Object> segmentMetadata
   ) throws IOException
   {
-    progress.startSection("metadata.drd");
-    if (commitMetadata != null) {
-      v9Smoosher.add("metadata.drd", ByteBuffer.wrap(mapper.writeValueAsBytes(commitMetadata)));
+    if (segmentMetadata != null && !segmentMetadata.isEmpty()) {
+      progress.startSection("metadata.drd");
+      v9Smoosher.add("metadata.drd", ByteBuffer.wrap(mapper.writeValueAsBytes(segmentMetadata)));
+      progress.stopSection("metadata.drd");
     }
-    progress.stopSection("metadata.drd");
   }

   private static void writeColumn(
diff --git a/processing/src/main/java/io/druid/segment/IndexMerger.java b/processing/src/main/java/io/druid/segment/IndexMerger.java
index 26eb29f60f57..56fdd4079fc0 100644
--- a/processing/src/main/java/io/druid/segment/IndexMerger.java
+++ b/processing/src/main/java/io/druid/segment/IndexMerger.java
@@ -1,19 +1,21 @@
 /*
- * Druid - a distributed column store.
- * Copyright 2012 - 2015 Metamarkets Group Inc.
- * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ +* Licensed to Metamarkets Group Inc. (Metamarkets) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. Metamarkets licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ package io.druid.segment; @@ -127,9 +129,14 @@ public void configure(Binder binder) } - public static File persist(final IncrementalIndex index, File outDir, IndexSpec indexSpec) throws IOException + public static File persist( + final IncrementalIndex index, + File outDir, + Map segmentMetadata, + IndexSpec indexSpec + ) throws IOException { - return persist(index, index.getInterval(), outDir, indexSpec); + return persist(index, index.getInterval(), outDir, segmentMetadata, indexSpec); } /** @@ -148,16 +155,18 @@ public static File persist( final IncrementalIndex index, final Interval dataInterval, File outDir, + Map segmentMetadata, IndexSpec indexSpec ) throws IOException { - return persist(index, dataInterval, outDir, indexSpec, new BaseProgressIndicator()); + return persist(index, dataInterval, outDir, segmentMetadata, indexSpec, new BaseProgressIndicator()); } public static File persist( final IncrementalIndex index, final Interval dataInterval, File outDir, + Map segmentMetadata, IndexSpec indexSpec, ProgressIndicator progress ) throws IOException @@ -195,6 +204,7 @@ public static File persist( ), index.getMetricAggs(), outDir, + segmentMetadata, indexSpec, progress ); @@ -229,22 +239,28 @@ public IndexableAdapter apply(final QueryableIndex input) ), metricAggs, outDir, + null, indexSpec, progress ); } public static File merge( - List indexes, final AggregatorFactory[] metricAggs, File outDir, IndexSpec indexSpec + List indexes, + final AggregatorFactory[] metricAggs, + File outDir, + Map segmentMetadata, + IndexSpec indexSpec ) throws IOException { - return merge(indexes, metricAggs, outDir, indexSpec, new BaseProgressIndicator()); + return merge(indexes, metricAggs, outDir, segmentMetadata, indexSpec, new BaseProgressIndicator()); } public static File merge( List indexes, final AggregatorFactory[] metricAggs, File outDir, + Map segmentMetadata, IndexSpec indexSpec, ProgressIndicator progress ) throws IOException @@ -333,7 +349,16 @@ public Iterable apply( } }; - return makeIndexFiles(indexes, outDir, progress, mergedDimensions, mergedMetrics, rowMergerFn, indexSpec); + return makeIndexFiles( + indexes, + outDir, + progress, + 
mergedDimensions, + mergedMetrics, + segmentMetadata, + rowMergerFn, + indexSpec + ); } // Faster than IndexMaker @@ -354,6 +379,7 @@ public static File convert( progress, Lists.newArrayList(adapter.getDimensionNames()), Lists.newArrayList(adapter.getMetricNames()), + null, new Function>, Iterable>() { @Nullable @@ -445,7 +471,7 @@ public Iterable apply( } }; - return makeIndexFiles(indexes, outDir, progress, mergedDimensions, mergedMetrics, rowMergerFn, indexSpec); + return makeIndexFiles(indexes, outDir, progress, mergedDimensions, mergedMetrics, null, rowMergerFn, indexSpec); } private static File makeIndexFiles( @@ -454,6 +480,7 @@ private static File makeIndexFiles( final ProgressIndicator progress, final List mergedDimensions, final List mergedMetrics, + final Map segmentMetadata, final Function>, Iterable> rowMergerFn, final IndexSpec indexSpec ) throws IOException @@ -900,6 +927,13 @@ public Rowboat apply(@Nullable Rowboat input) ) ); + if (segmentMetadata != null && !segmentMetadata.isEmpty()) { + writeMetadataToFile( new File(v8OutDir, "metadata.drd"), segmentMetadata); + log.info("wrote metadata.drd in outDir[%s].", v8OutDir); + + expectedFiles.add("metadata.drd"); + } + Map files = Maps.newLinkedHashMap(); for (String fileName : expectedFiles) { files.put(fileName, new File(v8OutDir, fileName)); @@ -1273,4 +1307,31 @@ static boolean isNullColumn(Iterable dimValues) } return true; } + + private static void writeMetadataToFile(File metadataFile, Map metadata) throws IOException + { + FileOutputStream metadataFileOutputStream = null; + FileChannel metadataFilechannel = null; + try { + metadataFileOutputStream = new FileOutputStream(metadataFile); + metadataFilechannel = metadataFileOutputStream.getChannel(); + + byte[] metadataBytes = mapper.writeValueAsBytes(metadata); + if (metadataBytes.length != metadataFilechannel.write(ByteBuffer.wrap(metadataBytes))) { + throw new IOException("Failed to write metadata for file"); + } + } + finally { + if (metadataFilechannel != null) { + metadataFilechannel.close(); + metadataFilechannel = null; + } + + if (metadataFileOutputStream != null) { + metadataFileOutputStream.close(); + metadataFileOutputStream = null; + } + } + IndexIO.checkFileSize(metadataFile); + } } diff --git a/processing/src/main/java/io/druid/segment/IndexableAdapter.java b/processing/src/main/java/io/druid/segment/IndexableAdapter.java index bf13dec6e857..3c09d7a31559 100644 --- a/processing/src/main/java/io/druid/segment/IndexableAdapter.java +++ b/processing/src/main/java/io/druid/segment/IndexableAdapter.java @@ -44,6 +44,4 @@ public interface IndexableAdapter String getMetricType(String metric); ColumnCapabilities getCapabilities(String column); - - Object getMetaData(); } diff --git a/processing/src/main/java/io/druid/segment/QueryableIndex.java b/processing/src/main/java/io/druid/segment/QueryableIndex.java index d68c1565edf4..77a508e12d98 100644 --- a/processing/src/main/java/io/druid/segment/QueryableIndex.java +++ b/processing/src/main/java/io/druid/segment/QueryableIndex.java @@ -1,19 +1,21 @@ /* - * Druid - a distributed column store. - * Copyright 2012 - 2015 Metamarkets Group Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. 
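The v8 path above persists the map as a standalone metadata.drd file and verifies the byte count before adding it to expectedFiles. The same close-and-verify sequence can be written more compactly with try-with-resources (Java 7+); a minimal equivalent sketch, with `mapper` being the class's static Jackson ObjectMapper:

```java
// Serialize the map, write it in one shot, fail if the channel accepted fewer
// bytes than expected, then let IndexIO.checkFileSize() validate the result.
private static void writeMetadataToFile(File metadataFile, Map<String, Object> metadata)
    throws IOException
{
  final byte[] metadataBytes = mapper.writeValueAsBytes(metadata);
  try (FileChannel channel = new FileOutputStream(metadataFile).getChannel()) {
    if (channel.write(ByteBuffer.wrap(metadataBytes)) != metadataBytes.length) {
      throw new IOException(String.format("Failed to write metadata to [%s]", metadataFile));
    }
  }
  IndexIO.checkFileSize(metadataFile);
}
```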
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ +* Licensed to Metamarkets Group Inc. (Metamarkets) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. Metamarkets licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ package io.druid.segment; @@ -23,6 +25,7 @@ import java.io.Closeable; import java.io.IOException; +import java.util.Map; /** */ @@ -33,7 +36,7 @@ public interface QueryableIndex extends ColumnSelector, Closeable public Indexed getColumnNames(); public Indexed getAvailableDimensions(); public BitmapFactory getBitmapFactoryForDimensions(); - public Object getMetaData(); + public Map getMetaData(); /** * The close method shouldn't actually be here as this is nasty. We will adjust it in the future. diff --git a/processing/src/main/java/io/druid/segment/QueryableIndexIndexableAdapter.java b/processing/src/main/java/io/druid/segment/QueryableIndexIndexableAdapter.java index c6ec649913d7..5d64edea45bb 100644 --- a/processing/src/main/java/io/druid/segment/QueryableIndexIndexableAdapter.java +++ b/processing/src/main/java/io/druid/segment/QueryableIndexIndexableAdapter.java @@ -1,19 +1,21 @@ /* - * Druid - a distributed column store. - * Copyright 2012 - 2015 Metamarkets Group Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ +* Licensed to Metamarkets Group Inc. (Metamarkets) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. Metamarkets licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. 
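With QueryableIndex.getMetaData() now returning Map<String, Object> instead of a bare Object, callers get a typed view of whatever map was handed to persist(). A minimal usage sketch (the "key" entry is an illustrative placeholder, not a reserved name):

```java
// Reading segment metadata back from a loaded index; null is the expected
// result for segments that were persisted without metadata.
QueryableIndex index = IndexIO.loadIndex(segmentDir);
try {
  Map<String, Object> metadata = index.getMetaData();
  if (metadata != null) {
    Object value = metadata.get("key");  // hypothetical key set at persist time
  }
}
finally {
  index.close();
}
```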
See the License for the +* specific language governing permissions and limitations +* under the License. +*/ package io.druid.segment; @@ -42,7 +44,6 @@ import io.druid.segment.data.ListIndexed; import org.joda.time.Interval; -import javax.annotation.Nullable; import java.io.Closeable; import java.util.HashSet; import java.util.Iterator; @@ -81,11 +82,6 @@ public QueryableIndexIndexableAdapter(QueryableIndex input) } } - @Override - public Object getMetaData() { - return input.getMetaData(); - } - @Override public Interval getDataInterval() { diff --git a/processing/src/main/java/io/druid/segment/RowboatFilteringIndexAdapter.java b/processing/src/main/java/io/druid/segment/RowboatFilteringIndexAdapter.java index e81106fdf891..bc4059a836f0 100644 --- a/processing/src/main/java/io/druid/segment/RowboatFilteringIndexAdapter.java +++ b/processing/src/main/java/io/druid/segment/RowboatFilteringIndexAdapter.java @@ -1,19 +1,21 @@ /* - * Druid - a distributed column store. - * Copyright 2012 - 2015 Metamarkets Group Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ +* Licensed to Metamarkets Group Inc. (Metamarkets) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. Metamarkets licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. 
+*/ package io.druid.segment; @@ -37,12 +39,6 @@ public RowboatFilteringIndexAdapter(IndexableAdapter baseAdapter, Predicate columns; private final SmooshedFileMapper fileMapper; - private final Object commitMetaData; + private final Map metadata; public SimpleQueryableIndex( Interval dataInterval, @@ -46,7 +48,7 @@ public SimpleQueryableIndex( BitmapFactory bitmapFactory, Map columns, SmooshedFileMapper fileMapper, - Object commitMetaData + Map metadata ) { Preconditions.checkNotNull(columns.get(Column.TIME_COLUMN_NAME)); @@ -56,7 +58,7 @@ public SimpleQueryableIndex( this.bitmapFactory = bitmapFactory; this.columns = columns; this.fileMapper = fileMapper; - this.commitMetaData = commitMetaData; + this.metadata = metadata; } @Override @@ -101,9 +103,9 @@ public void close() throws IOException fileMapper.close(); } - @Override - public Object getMetaData() + @Override + public Map getMetaData() { - return commitMetaData; + return metadata; } } diff --git a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexAdapter.java b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexAdapter.java index 5d09fdb41058..d593a68f1dc1 100644 --- a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexAdapter.java +++ b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexAdapter.java @@ -1,19 +1,21 @@ /* - * Druid - a distributed column store. - * Copyright 2012 - 2015 Metamarkets Group Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ +* Licensed to Metamarkets Group Inc. (Metamarkets) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. Metamarkets licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. 
+*/ package io.druid.segment.incremental; @@ -98,12 +100,6 @@ public IncrementalIndexAdapter( } } - @Override - public Object getMetaData() - { - return null; - } - @Override public Interval getDataInterval() { diff --git a/processing/src/test/java/io/druid/query/aggregation/AggregationTestHelper.java b/processing/src/test/java/io/druid/query/aggregation/AggregationTestHelper.java index afd9d9110364..7f5649c070ae 100644 --- a/processing/src/test/java/io/druid/query/aggregation/AggregationTestHelper.java +++ b/processing/src/test/java/io/druid/query/aggregation/AggregationTestHelper.java @@ -249,7 +249,7 @@ public void createIndex( index.add(parser.parse(row)); } } - IndexMerger.persist(index, outDir, new IndexSpec()); + IndexMerger.persist(index, outDir, null, new IndexSpec()); } } diff --git a/processing/src/test/java/io/druid/segment/EmptyIndexTest.java b/processing/src/test/java/io/druid/segment/EmptyIndexTest.java index 2c3d99777854..b244cdf42336 100644 --- a/processing/src/test/java/io/druid/segment/EmptyIndexTest.java +++ b/processing/src/test/java/io/druid/segment/EmptyIndexTest.java @@ -61,6 +61,7 @@ public void testEmptyIndex() throws Exception Lists.newArrayList(emptyIndexAdapter), new AggregatorFactory[0], tmpDir, + null, new IndexSpec() ); diff --git a/processing/src/test/java/io/druid/segment/IndexMakerParameterizedTest.java b/processing/src/test/java/io/druid/segment/IndexMakerParameterizedTest.java index bf4b350ff362..5486f763158b 100644 --- a/processing/src/test/java/io/druid/segment/IndexMakerParameterizedTest.java +++ b/processing/src/test/java/io/druid/segment/IndexMakerParameterizedTest.java @@ -128,7 +128,16 @@ public void testPersist() throws Exception IncrementalIndexTest.populateIndex(timestamp, toPersist); final File tempDir = temporaryFolder.newFolder(); - QueryableIndex index = closer.closeLater(IndexIO.loadIndex(IndexMaker.persist(toPersist, tempDir, null, indexSpec))); + QueryableIndex index = closer.closeLater( + IndexIO.loadIndex( + IndexMaker.persist( + toPersist, + tempDir, + null, + indexSpec + ) + ) + ); Assert.assertEquals(2, index.getColumn(Column.TIME_COLUMN_NAME).getLength()); Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(index.getAvailableDimensions())); @@ -171,13 +180,31 @@ public void testPersistMerge() throws Exception final File tempDir2 = temporaryFolder.newFolder(); final File mergedDir = temporaryFolder.newFolder(); - QueryableIndex index1 = closer.closeLater(IndexIO.loadIndex(IndexMaker.persist(toPersist1, tempDir1, null, indexSpec))); + QueryableIndex index1 = closer.closeLater( + IndexIO.loadIndex( + IndexMaker.persist( + toPersist1, + tempDir1, + null, + indexSpec + ) + ) + ); Assert.assertEquals(2, index1.getColumn(Column.TIME_COLUMN_NAME).getLength()); Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(index1.getAvailableDimensions())); Assert.assertEquals(3, index1.getColumnNames().size()); - QueryableIndex index2 = closer.closeLater(IndexIO.loadIndex(IndexMaker.persist(toPersist2, tempDir2, null, indexSpec))); + QueryableIndex index2 = closer.closeLater( + IndexIO.loadIndex( + IndexMaker.persist( + toPersist2, + tempDir2, + null, + indexSpec + ) + ) + ); Assert.assertEquals(2, index2.getColumn(Column.TIME_COLUMN_NAME).getLength()); Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(index2.getAvailableDimensions())); @@ -293,7 +320,16 @@ public void testMergeRetainsValues() throws Exception .getBitmapFactory() ); - QueryableIndex index1 = 
closer.closeLater(IndexIO.loadIndex(IndexMaker.persist(toPersist1, tempDir1, null, indexSpec))); + QueryableIndex index1 = closer.closeLater( + IndexIO.loadIndex( + IndexMaker.persist( + toPersist1, + tempDir1, + null, + indexSpec + ) + ) + ); final IndexableAdapter queryableAdapter = new QueryableIndexIndexableAdapter(index1); @@ -396,7 +432,16 @@ public void testMergeSpecChange() throws Exception .getBitmapFactory() ); - QueryableIndex index1 = closer.closeLater(IndexIO.loadIndex(IndexMaker.persist(toPersist1, tempDir1, null, indexSpec))); + QueryableIndex index1 = closer.closeLater( + IndexIO.loadIndex( + IndexMaker.persist( + toPersist1, + tempDir1, + null, + indexSpec + ) + ) + ); final IndexableAdapter queryableAdapter = new QueryableIndexIndexableAdapter(index1); @@ -453,7 +498,16 @@ public void testConvertSame() throws Exception .getBitmapFactory() ); - QueryableIndex index1 = closer.closeLater(IndexIO.loadIndex(IndexMaker.persist(toPersist1, tempDir1, null, indexSpec))); + QueryableIndex index1 = closer.closeLater( + IndexIO.loadIndex( + IndexMaker.persist( + toPersist1, + tempDir1, + null, + indexSpec + ) + ) + ); final IndexableAdapter queryableAdapter = new QueryableIndexIndexableAdapter(index1); @@ -502,7 +556,16 @@ public void testConvertDifferent() throws Exception .getBitmapFactory() ); - QueryableIndex index1 = closer.closeLater(IndexIO.loadIndex(IndexMaker.persist(toPersist1, tempDir1, null, indexSpec))); + QueryableIndex index1 = closer.closeLater( + IndexIO.loadIndex( + IndexMaker.persist( + toPersist1, + tempDir1, + null, + indexSpec + ) + ) + ); final IndexableAdapter queryableAdapter = new QueryableIndexIndexableAdapter(index1); diff --git a/processing/src/test/java/io/druid/segment/IndexMakerTest.java b/processing/src/test/java/io/druid/segment/IndexMakerTest.java index 5da3098f105b..e9bccd2eefa9 100644 --- a/processing/src/test/java/io/druid/segment/IndexMakerTest.java +++ b/processing/src/test/java/io/druid/segment/IndexMakerTest.java @@ -170,7 +170,7 @@ public void setUp() throws IOException } tmpDir = Files.createTempDir(); persistTmpDir = new File(tmpDir, "persistDir"); - IndexMerger.persist(toPersist, persistTmpDir, INDEX_SPEC); + IndexMerger.persist(toPersist, persistTmpDir, null, INDEX_SPEC); } @After @@ -179,10 +179,40 @@ public void tearDown() throws IOException FileUtils.deleteDirectory(tmpDir); } + @Test + public void testPersistWithSegmentMetadata() throws IOException + { + File outDir = Files.createTempDir(); + QueryableIndex index = null; + try { + outDir = Files.createTempDir(); + Map segmentMetadata = ImmutableMap.of("key", "value"); + index = IndexIO.loadIndex(IndexMaker.persist(toPersist, outDir, segmentMetadata, INDEX_SPEC)); + + Assert.assertEquals(segmentMetadata, index.getMetaData()); + } + finally { + if (index != null) { + index.close(); + ; + } + + if (outDir != null) { + FileUtils.deleteDirectory(outDir); + } + } + } + @Test public void testSimpleReprocess() throws IOException { - final IndexableAdapter adapter = new QueryableIndexIndexableAdapter(closer.closeLater(IndexIO.loadIndex(persistTmpDir))); + final IndexableAdapter adapter = new QueryableIndexIndexableAdapter( + closer.closeLater( + IndexIO.loadIndex( + persistTmpDir + ) + ) + ); Assert.assertEquals(events.size(), adapter.getNumRows()); reprocessAndValidate(persistTmpDir, new File(tmpDir, "reprocessed")); } @@ -212,7 +242,13 @@ private File appendAndValidate(File inDir, File tmpDir) throws IOException @Test public void testIdempotentReprocess() throws IOException { - final 
IndexableAdapter adapter = new QueryableIndexIndexableAdapter(closer.closeLater(IndexIO.loadIndex(persistTmpDir))); + final IndexableAdapter adapter = new QueryableIndexIndexableAdapter( + closer.closeLater( + IndexIO.loadIndex( + persistTmpDir + ) + ) + ); Assert.assertEquals(events.size(), adapter.getNumRows()); final File tmpDir1 = new File(tmpDir, "reprocessed1"); reprocessAndValidate(persistTmpDir, tmpDir1); @@ -231,7 +267,13 @@ public void testIdempotentReprocess() throws IOException @Test public void testSimpleAppend() throws IOException { - final IndexableAdapter adapter = new QueryableIndexIndexableAdapter(closer.closeLater(IndexIO.loadIndex(persistTmpDir))); + final IndexableAdapter adapter = new QueryableIndexIndexableAdapter( + closer.closeLater( + IndexIO.loadIndex( + persistTmpDir + ) + ) + ); Assert.assertEquals(events.size(), adapter.getNumRows()); appendAndValidate(persistTmpDir, new File(tmpDir, "reprocessed")); } @@ -239,7 +281,13 @@ public void testSimpleAppend() throws IOException @Test public void testIdempotentAppend() throws IOException { - final IndexableAdapter adapter = new QueryableIndexIndexableAdapter(closer.closeLater(IndexIO.loadIndex(persistTmpDir))); + final IndexableAdapter adapter = new QueryableIndexIndexableAdapter( + closer.closeLater( + IndexIO.loadIndex( + persistTmpDir + ) + ) + ); Assert.assertEquals(events.size(), adapter.getNumRows()); final File tmpDir1 = new File(tmpDir, "reprocessed1"); appendAndValidate(persistTmpDir, tmpDir1); diff --git a/processing/src/test/java/io/druid/segment/IndexMergerTest.java b/processing/src/test/java/io/druid/segment/IndexMergerTest.java index 4b246187dcb2..1f48426317ab 100644 --- a/processing/src/test/java/io/druid/segment/IndexMergerTest.java +++ b/processing/src/test/java/io/druid/segment/IndexMergerTest.java @@ -1,19 +1,21 @@ /* - * Druid - a distributed column store. - * Copyright 2012 - 2015 Metamarkets Group Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ +* Licensed to Metamarkets Group Inc. (Metamarkets) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. Metamarkets licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. 
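The two new testPersistWithSegmentMetadata tests (IndexMaker above, IndexMerger below) assert the same round-trip contract; condensed to its essentials:

```java
// Metadata handed to persist() must come back verbatim from the loaded index.
final Map<String, Object> segmentMetadata = ImmutableMap.<String, Object>of("key", "value");
QueryableIndex index = IndexIO.loadIndex(
    IndexMerger.persist(toPersist, outDir, segmentMetadata, new IndexSpec())
);
try {
  Assert.assertEquals(segmentMetadata, index.getMetaData());
}
finally {
  index.close();
}
```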
+*/ package io.druid.segment; @@ -52,6 +54,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.List; +import java.util.Map; @RunWith(Parameterized.class) public class IndexMergerTest @@ -126,13 +129,53 @@ public void testPersist() throws Exception IncrementalIndexTest.populateIndex(timestamp, toPersist); final File tempDir = temporaryFolder.newFolder(); - QueryableIndex index = closer.closeLater(IndexIO.loadIndex(IndexMerger.persist(toPersist, tempDir, indexSpec))); + QueryableIndex index = closer.closeLater( + IndexIO.loadIndex( + IndexMerger.persist( + toPersist, + tempDir, + null, + indexSpec + ) + ) + ); + + Assert.assertEquals(2, index.getColumn(Column.TIME_COLUMN_NAME).getLength()); + Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(index.getAvailableDimensions())); + Assert.assertEquals(3, index.getColumnNames().size()); + + assertDimCompression(index, indexSpec.getDimensionCompressionStrategy()); + } + + @Test + public void testPersistWithSegmentMetadata() throws Exception + { + final long timestamp = System.currentTimeMillis(); + + IncrementalIndex toPersist = IncrementalIndexTest.createIndex(true, null); + IncrementalIndexTest.populateIndex(timestamp, toPersist); + + Map segmentMetadata = ImmutableMap.of("key", "value"); + + final File tempDir = temporaryFolder.newFolder(); + QueryableIndex index = closer.closeLater( + IndexIO.loadIndex( + IndexMerger.persist( + toPersist, + tempDir, + segmentMetadata, + indexSpec + ) + ) + ); Assert.assertEquals(2, index.getColumn(Column.TIME_COLUMN_NAME).getLength()); Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(index.getAvailableDimensions())); Assert.assertEquals(3, index.getColumnNames().size()); assertDimCompression(index, indexSpec.getDimensionCompressionStrategy()); + + Assert.assertEquals(segmentMetadata, index.getMetaData()); } @Test @@ -169,13 +212,31 @@ public void testPersistMerge() throws Exception final File tempDir2 = temporaryFolder.newFolder(); final File mergedDir = temporaryFolder.newFolder(); - QueryableIndex index1 = closer.closeLater(IndexIO.loadIndex(IndexMerger.persist(toPersist1, tempDir1, indexSpec))); + QueryableIndex index1 = closer.closeLater( + IndexIO.loadIndex( + IndexMerger.persist( + toPersist1, + tempDir1, + null, + indexSpec + ) + ) + ); Assert.assertEquals(2, index1.getColumn(Column.TIME_COLUMN_NAME).getLength()); Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(index1.getAvailableDimensions())); Assert.assertEquals(3, index1.getColumnNames().size()); - QueryableIndex index2 = closer.closeLater(IndexIO.loadIndex(IndexMerger.persist(toPersist2, tempDir2, indexSpec))); + QueryableIndex index2 = closer.closeLater( + IndexIO.loadIndex( + IndexMerger.persist( + toPersist2, + tempDir2, + null, + indexSpec + ) + ) + ); Assert.assertEquals(2, index2.getColumn(Column.TIME_COLUMN_NAME).getLength()); Assert.assertEquals(Arrays.asList("dim1", "dim2"), Lists.newArrayList(index2.getAvailableDimensions())); @@ -240,6 +301,7 @@ public void testPersistEmptyColumn() throws Exception IndexMerger.persist( toPersist1, tmpDir1, + null, indexSpec ) ) @@ -249,6 +311,7 @@ public void testPersistEmptyColumn() throws Exception IndexMerger.persist( toPersist1, tmpDir2, + null, indexSpec ) ) @@ -294,7 +357,16 @@ public void testMergeRetainsValues() throws Exception .getBitmapFactory() ); - QueryableIndex index1 = closer.closeLater(IndexIO.loadIndex(IndexMerger.persist(toPersist1, tempDir1, indexSpec))); + QueryableIndex index1 = closer.closeLater( 
+ IndexIO.loadIndex( + IndexMerger.persist( + toPersist1, + tempDir1, + null, + indexSpec + ) + ) + ); final IndexableAdapter queryableAdapter = new QueryableIndexIndexableAdapter(index1); @@ -397,7 +469,16 @@ public void testMergeSpecChange() throws Exception .getBitmapFactory() ); - QueryableIndex index1 = closer.closeLater(IndexIO.loadIndex(IndexMerger.persist(toPersist1, tempDir1, indexSpec))); + QueryableIndex index1 = closer.closeLater( + IndexIO.loadIndex( + IndexMerger.persist( + toPersist1, + tempDir1, + null, + indexSpec + ) + ) + ); final IndexableAdapter queryableAdapter = new QueryableIndexIndexableAdapter(index1); @@ -464,7 +545,7 @@ public void testConvertSame() throws Exception ); QueryableIndex index1 = closer.closeLater( - IndexIO.loadIndex(IndexMerger.persist(toPersist1, tempDir1, indexSpec)) + IndexIO.loadIndex(IndexMerger.persist(toPersist1, tempDir1, null, indexSpec)) ); final IndexableAdapter queryableAdapter = new QueryableIndexIndexableAdapter(index1); @@ -521,7 +602,16 @@ public void testConvertDifferent() throws Exception .getBitmapFactory() ); - QueryableIndex index1 = closer.closeLater(IndexIO.loadIndex(IndexMerger.persist(toPersist1, tempDir1, indexSpec))); + QueryableIndex index1 = closer.closeLater( + IndexIO.loadIndex( + IndexMerger.persist( + toPersist1, + tempDir1, + null, + indexSpec + ) + ) + ); final IndexableAdapter queryableAdapter = new QueryableIndexIndexableAdapter(index1); diff --git a/processing/src/test/java/io/druid/segment/SchemalessIndex.java b/processing/src/test/java/io/druid/segment/SchemalessIndex.java index 762201914f01..0ef7d5582787 100644 --- a/processing/src/test/java/io/druid/segment/SchemalessIndex.java +++ b/processing/src/test/java/io/druid/segment/SchemalessIndex.java @@ -1,19 +1,21 @@ /* - * Druid - a distributed column store. - * Copyright 2012 - 2015 Metamarkets Group Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ +* Licensed to Metamarkets Group Inc. (Metamarkets) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. Metamarkets licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. 
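The call-site churn above follows one mechanical rule: every existing persist()/merge() caller that has no commit metadata passes null for the new parameter. Before and after, schematically:

```java
// Before this patch:
IndexMerger.persist(index, outDir, indexSpec);
IndexMerger.merge(adapters, metricAggs, outDir, indexSpec);

// After this patch; null means "write no metadata.drd":
IndexMerger.persist(index, outDir, null, indexSpec);
IndexMerger.merge(adapters, metricAggs, outDir, null, indexSpec);
```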
+*/ package io.druid.segment; @@ -27,7 +29,6 @@ import com.google.common.collect.Ordering; import com.google.common.hash.Hashing; import com.metamx.common.Pair; -import com.metamx.common.guava.FunctionalIterable; import com.metamx.common.logger.Logger; import io.druid.data.input.MapBasedInputRow; import io.druid.granularity.QueryGranularity; @@ -149,7 +150,8 @@ public static QueryableIndex getIncrementalIndex(int index1, int index2) try { theIndex.add(new MapBasedInputRow(timestamp, dims, event)); - } catch(IndexSizeExceededException e) { + } + catch (IndexSizeExceededException e) { Throwables.propagate(e); } @@ -186,12 +188,15 @@ public static QueryableIndex getMergedIncrementalIndex() mergedFile.mkdirs(); mergedFile.deleteOnExit(); - IndexMerger.persist(top, topFile, indexSpec); - IndexMerger.persist(bottom, bottomFile, indexSpec); + IndexMerger.persist(top, topFile, null, indexSpec); + IndexMerger.persist(bottom, bottomFile, null, indexSpec); mergedIndex = io.druid.segment.IndexIO.loadIndex( IndexMerger.mergeQueryableIndex( - Arrays.asList(IndexIO.loadIndex(topFile), IndexIO.loadIndex(bottomFile)), METRIC_AGGS, mergedFile, indexSpec + Arrays.asList(IndexIO.loadIndex(topFile), IndexIO.loadIndex(bottomFile)), + METRIC_AGGS, + mergedFile, + indexSpec ) ); @@ -233,7 +238,10 @@ public static QueryableIndex getMergedIncrementalIndex(int index1, int index2) QueryableIndex index = IndexIO.loadIndex( IndexMerger.mergeQueryableIndex( - Arrays.asList(rowPersistedIndexes.get(index1), rowPersistedIndexes.get(index2)), METRIC_AGGS, mergedFile, indexSpec + Arrays.asList(rowPersistedIndexes.get(index1), rowPersistedIndexes.get(index2)), + METRIC_AGGS, + mergedFile, + indexSpec ) ); @@ -350,7 +358,7 @@ private static void makeRowPersistedIndexes() tmpFile.mkdirs(); tmpFile.deleteOnExit(); - IndexMerger.persist(rowIndex, tmpFile, indexSpec); + IndexMerger.persist(rowIndex, tmpFile, null, indexSpec); rowPersistedIndexes.add(IndexIO.loadIndex(tmpFile)); } } @@ -410,7 +418,7 @@ private static List makeFilesToMap(File tmpFile, Iterable * This is a metaphor for a realtime stream (Firehose) and a coordinator of sinks (Plumber). The Firehose provides the * realtime stream of data. The Plumber directs each drop of water from the firehose into the correct sink and makes * sure that the sinks don't overflow. diff --git a/server/src/main/java/io/druid/segment/realtime/RealtimeManager.java b/server/src/main/java/io/druid/segment/realtime/RealtimeManager.java index af42ad7fb21f..79ff660947e5 100644 --- a/server/src/main/java/io/druid/segment/realtime/RealtimeManager.java +++ b/server/src/main/java/io/druid/segment/realtime/RealtimeManager.java @@ -1,19 +1,21 @@ /* - * Druid - a distributed column store. - * Copyright 2012 - 2015 Metamarkets Group Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ +* Licensed to Metamarkets Group Inc. (Metamarkets) under one +* or more contributor license agreements. 
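The RealtimeManager changes that follow hinge on FireDepartment distinguishing the two firehose generations. A hedged sketch of the selection logic in run(), with the synchronized init wrappers (initFirehose/initFirehoseV2) elided:

```java
// startJob() now returns the last committed metadata (or null); a V2 firehose
// resumes from it, while the legacy V1 path stays stateless.
Object metadata = plumber.startJob();
if (fireDepartment.checkFirehoseV2()) {
  FirehoseV2 firehoseV2 = fireDepartment.connect(metadata);  // resume from commit point
  runFirehoseV2(firehoseV2);
} else {
  Firehose firehose = fireDepartment.connect();              // legacy, no commit point
  runFirehose(firehose);
}
```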
See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. Metamarkets licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ package io.druid.segment.realtime; @@ -158,7 +160,8 @@ public QueryRunner apply(String input) Iterable chiefsOfDataSource = chiefs.get(input); return chiefsOfDataSource == null ? new NoopQueryRunner() : factory.getToolchest().mergeResults( factory.mergeRunners( - MoreExecutors.sameThreadExecutor(), // Chaining query runners which wait on submitted chain query runners can make executor pools deadlock + MoreExecutors.sameThreadExecutor(), + // Chaining query runners which wait on submitted chain query runners can make executor pools deadlock Iterables.transform( chiefsOfDataSource, new Function>() { @@ -217,10 +220,10 @@ public Firehose initFirehose() } } - public FirehoseV2 initFirehose(Object metaData) + public FirehoseV2 initFirehoseV2(Object metaData) { synchronized (this) { - if (firehose == null && firehoseV2 == null) { + if (firehoseV2 == null) { try { log.info("Calling the FireDepartment and getting a FirehoseV2."); firehoseV2 = fireDepartment.connect(metaData); @@ -230,7 +233,7 @@ public FirehoseV2 initFirehose(Object metaData) throw Throwables.propagate(e); } } else { - log.warn("Firehose already connected, skipping initFirehoseV2()."); + log.warn("FirehoseV2 already connected, skipping initFirehoseV2()."); } return firehoseV2; @@ -266,7 +269,7 @@ public void run() Object metadata = plumber.startJob(); if (fireDepartment.checkFirehoseV2()) { - firehoseV2 = initFirehose(metadata); + firehoseV2 = initFirehoseV2(metadata); runFirehoseV2(firehoseV2); } else { firehose = initFirehose(); @@ -306,8 +309,8 @@ private void runFirehoseV2(FirehoseV2 firehose) firehose.start(); } catch (Exception e) { - log.error(e, "Failed to start firehoseV2"); - return; + log.error(e, "Failed to start firehoseV2"); + return; } long nextFlush = new DateTime().plus(intermediatePersistPeriod).getMillis(); log.info("FirehoseV2 started with nextFlush [%s]", nextFlush); @@ -322,7 +325,7 @@ private void runFirehoseV2(FirehoseV2 firehose) numRows = plumber.add(inputRow); } catch (IndexSizeExceededException e) { - log.info("Index limit exceeded: %s", e.getMessage()); + log.debug(e, "Index limit exceeded: %s", e.getMessage()); nextFlush = doIncrementalPersist(firehose.makeCommitter(), intermediatePersistPeriod); continue; } @@ -332,10 +335,13 @@ private void runFirehoseV2(FirehoseV2 firehose) } else { metrics.incrementProcessed(); } + } else { + log.debug("thrown away null input row, considering unparseable"); + metrics.incrementUnparseable(); } } catch (Exception e) { - log.makeAlert(e, "Some exception got thrown while processing rows. 
Ignoring and continuing.") + log.makeAlert(e, "Unknown exception, Ignoring and continuing.") .addData("inputRow", inputRow); } @@ -343,18 +349,22 @@ private void runFirehoseV2(FirehoseV2 firehose) haveRow = firehose.advance(); } catch (Exception e) { - log.debug(e, "thrown away line due to exception, considering unparseable"); + log.debug(e, "exception in firehose.advance(), considering unparseable row"); metrics.incrementUnparseable(); continue; } try { - final Sink sink = plumber.getSink(inputRow.getTimestampFromEpoch()); + final Sink sink = inputRow != null ? plumber.getSink(inputRow.getTimestampFromEpoch()) : null; if ((sink != null && !sink.canAppendRow()) || System.currentTimeMillis() > nextFlush) { nextFlush = doIncrementalPersist(firehose.makeCommitter(), intermediatePersistPeriod); } - } catch (Exception e) { - log.makeAlert(e, "An exception happened while queue to persist!? We hope it is transient. Ignore and continue."); + } + catch (Exception e) { + log.makeAlert( + e, + "An exception happened while queue to persist!? We hope it is transient. Ignore and continue." + ); } } } @@ -365,7 +375,6 @@ private long doIncrementalPersist(Committer committer, Period intermediatePersis return new DateTime().plus(intermediatePersistPeriod).getMillis(); } - private void runFirehose(Firehose firehose) { @@ -381,7 +390,6 @@ private void runFirehose(Firehose firehose) if (inputRow == null) { log.debug("thrown away null input row, considering unparseable"); - log.info("thrown away null input row, considering unparseable"); metrics.incrementUnparseable(); continue; } diff --git a/server/src/main/java/io/druid/segment/realtime/firehose/LocalFirehoseFactoryV2.java b/server/src/main/java/io/druid/segment/realtime/firehose/LocalFirehoseFactoryV2.java deleted file mode 100644 index eb7aa59189dc..000000000000 --- a/server/src/main/java/io/druid/segment/realtime/firehose/LocalFirehoseFactoryV2.java +++ /dev/null @@ -1,229 +0,0 @@ -/* - * Druid - a distributed column store. - * Copyright 2012 - 2015 Metamarkets Group Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
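Pulling the alerting out of runFirehoseV2 above, the FirehoseV2 consumption protocol reduces to the skeleton below (sink-capacity checks and metrics pruned to the essentials). Note that a null currRow() is now counted as unparseable, which is why the expected unparseable count in RealtimeManagerTest moves from 1 to 2 further down.

```java
// A condensed, non-authoritative view of the loop: start() is called once,
// each iteration reads currRow(), and advance() moves the cursor.
private void runFirehoseV2Sketch(FirehoseV2 firehose) throws Exception
{
  firehose.start();
  long nextFlush = new DateTime().plus(intermediatePersistPeriod).getMillis();
  boolean haveRow = true;
  while (haveRow) {
    InputRow inputRow = firehose.currRow();
    if (inputRow != null) {
      try {
        plumber.add(inputRow);
      }
      catch (IndexSizeExceededException e) {
        // Sink is full: persist incrementally at the current commit point,
        // then retry the same row.
        plumber.persist(firehose.makeCommitter());
        nextFlush = new DateTime().plus(intermediatePersistPeriod).getMillis();
        continue;
      }
    } else {
      // Null rows are counted as unparseable.
      metrics.incrementUnparseable();
    }
    haveRow = firehose.advance();
    if (System.currentTimeMillis() > nextFlush) {
      // Flush deadline passed: persist together with the committer snapshot.
      plumber.persist(firehose.makeCommitter());
      nextFlush = new DateTime().plus(intermediatePersistPeriod).getMillis();
    }
  }
}
```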
- */ - -package io.druid.segment.realtime.firehose; - -import com.fasterxml.jackson.annotation.JsonCreator; -import com.fasterxml.jackson.annotation.JsonProperty; -import com.google.common.base.Throwables; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.metamx.common.ISE; -import com.metamx.common.parsers.ParseException; -import com.metamx.emitter.EmittingLogger; - -import io.druid.data.input.Committer; -import io.druid.data.input.FirehoseFactoryV2; -import io.druid.data.input.FirehoseV2; -import io.druid.data.input.InputRow; -import io.druid.data.input.impl.StringInputRowParser; -import org.apache.commons.io.FileUtils; -import org.apache.commons.io.LineIterator; -import org.apache.commons.io.filefilter.TrueFileFilter; -import org.apache.commons.io.filefilter.WildcardFileFilter; - -import java.io.File; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.Collection; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.Map; -import java.util.concurrent.ConcurrentMap; - -/** - */ -public class LocalFirehoseFactoryV2 implements FirehoseFactoryV2 -{ - private static final EmittingLogger log = new EmittingLogger(LocalFirehoseFactory.class); - - private final File baseDir; - private final String filter; - private final StringInputRowParser parser; - - @JsonCreator - public LocalFirehoseFactoryV2( - @JsonProperty("baseDir") File baseDir, - @JsonProperty("filter") String filter, - // Backwards compatible - @JsonProperty("parser") StringInputRowParser parser - ) - { - this.baseDir = baseDir; - this.filter = filter; - this.parser = parser; - } - - @JsonProperty - public File getBaseDir() - { - return baseDir; - } - - @JsonProperty - public String getFilter() - { - return filter; - } - - @JsonProperty - public StringInputRowParser getParser() - { - return parser; - } - - @Override - public FirehoseV2 connect(StringInputRowParser firehoseParser, Object metadata) throws IOException, ParseException - { - log.info("Searching for all [%s] in and beneath [%s]", filter, baseDir.getAbsoluteFile()); - - Collection foundFiles = FileUtils.listFiles( - baseDir.getAbsoluteFile(), - new WildcardFileFilter(filter), - TrueFileFilter.INSTANCE - ); - - if (foundFiles == null || foundFiles.isEmpty()) { - throw new ISE("Found no files to ingest! 
Check your schema."); - } - log.info ("Found files: " + foundFiles); - - final LinkedList files = Lists.newLinkedList( - foundFiles - ); - return new FileIteratingFirehoseV2(new Iterator() - { - @Override - public boolean hasNext() - { - return !files.isEmpty(); - } - - @Override - public LineIterator next() - { - try { - return FileUtils.lineIterator(files.poll()); - } - catch (Exception e) { - throw Throwables.propagate(e); - } - } - - @Override - public void remove() - { - throw new UnsupportedOperationException(); - } - }, firehoseParser); - - - - } - class FileIteratingFirehoseV2 implements FirehoseV2 { - private ConcurrentMap lastOffsetPartitions; - private volatile boolean stop; - - private volatile InputRow row = null; - - private final Iterator lineIterators; - private final StringInputRowParser parser; - - private LineIterator lineIterator = null; - - public FileIteratingFirehoseV2( - Iterator lineIterators, - StringInputRowParser parser - ) - { - this.lineIterators = lineIterators; - this.parser = parser; - } - @Override - public void close() throws IOException - { - stop = true; - } - - @Override - public boolean advance() - { - if (stop) { - return false; - } - - nextMessage(); - return true; - } - - @Override - public InputRow currRow() - { - return row; - } - - @Override - public Committer makeCommitter() - { - final Map offsets = Maps.newHashMap(lastOffsetPartitions);//TODO no test on offset - - return new Committer() - { - @Override - public Object getMetadata() - { - return offsets; - } - - @Override - public void run() - { - - } - }; - } - - @Override - public void start() throws Exception - { - nextMessage(); - } - private void nextMessage() - { - while ((lineIterator == null || !lineIterator.hasNext()) && lineIterators.hasNext()) { - lineIterator = lineIterators.next(); - } - - stop = !(lineIterator != null && lineIterator.hasNext()); - try { - if (lineIterator == null || !lineIterator.hasNext()) { - // Close old streams, maybe. - if (lineIterator != null) { - lineIterator.close(); - } - - lineIterator = lineIterators.next(); - } - - row = parser.parse((String)lineIterator.next());//parser.parse(lineIterator.next());TODO - } - catch (Exception e) { - throw Throwables.propagate(e); - } - } - }; -} diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java index 983a5348858e..aef5cef80fa7 100644 --- a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java +++ b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java @@ -1,19 +1,21 @@ /* - * Druid - a distributed column store. - * Copyright 2012 - 2015 Metamarkets Group Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ +* Licensed to Metamarkets Group Inc. (Metamarkets) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. 
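The deleted factory above was the only in-tree sample of FirehoseFactoryV2; the Committer handshake it demonstrated is the part that survives throughout the plumber changes below. The contract, as a minimal sketch:

```java
// makeCommitter() snapshots the stream position at call time; getMetadata()
// is what gets persisted next to the segment data, and run() is invoked by
// the plumber once that persist is durable.
Committer committer = firehose.makeCommitter();
Object offsets = committer.getMetadata();  // e.g. a partition-to-offset map
// ... plumber persists the pending hydrants together with `offsets` ...
committer.run();                           // acknowledge the durable commit
```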
Metamarkets licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ package io.druid.segment.realtime.plumber; @@ -22,6 +24,7 @@ import com.google.common.base.Predicate; import com.google.common.base.Stopwatch; import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -41,8 +44,6 @@ import io.druid.common.guava.ThreadRenamingRunnable; import io.druid.concurrent.Execs; import io.druid.data.input.Committer; -import io.druid.data.input.Firehose; -import io.druid.data.input.FirehoseV2; import io.druid.data.input.InputRow; import io.druid.query.MetricsEmittingQueryRunner; import io.druid.query.Query; @@ -85,9 +86,6 @@ import java.io.File; import java.io.FilenameFilter; import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Paths; -import java.nio.file.attribute.BasicFileAttributes; import java.util.Arrays; import java.util.Comparator; import java.util.List; @@ -127,6 +125,9 @@ public class RealtimePlumber implements Plumber private volatile ExecutorService mergeExecutor = null; private volatile ScheduledExecutorService scheduledExecutor = null; + private static final String COMMIT_METADATA_KEY = "%commitMetadata%"; + private static final String COMMIT_METADATA_TIMESTAMP_KEY = "%commitMetadataTimestamp%"; + public RealtimePlumber( DataSchema schema, RealtimeTuningConfig config, @@ -359,7 +360,7 @@ public void run() } @Override - public void persist(final Committer commitRunnable) + public void persist(final Committer committer) { final List> indexesToPersist = Lists.newArrayList(); for (Sink sink : sinks.values()) { @@ -372,21 +373,55 @@ public void persist(final Committer commitRunnable) final Stopwatch runExecStopwatch = Stopwatch.createStarted(); final Stopwatch persistStopwatch = Stopwatch.createStarted(); + + final Map metadata = committer.getMetadata() == null ? null : + ImmutableMap.of( + COMMIT_METADATA_KEY, + committer.getMetadata(), + COMMIT_METADATA_TIMESTAMP_KEY, + System.currentTimeMillis() + ); + persistExecutor.execute( new ThreadRenamingRunnable(String.format("%s-incremental-persist", schema.getDataSource())) { @Override public void doRun() { + /* Note: + If plumber crashes after storing a subset of all the hydrants then we will lose data and next + time we will start with the commitMetadata stored in those hydrants. + option#1: + maybe it makes sense to store the metadata outside the segments in a separate file. This is because the + commit metadata isn't really associated with an individual segment-- it's associated with a set of segments + that are persisted at the same time or maybe whole datasource. So storing it in the segments is asking for problems. 
+ Sort of like this:
+
+ {
+ "metadata" : {"foo": "bar"},
+ "segments": [
+ {"id": "datasource_2000_2001_2000_1", "hydrant": 10},
+ {"id": "datasource_2001_2002_2001_1", "hydrant": 12},
+ ]
+ }
+ When a realtime node crashes and starts back up, it would delete any hydrants numbered higher than the
+ ones in the commit file.
+
+ option#2:
+ We could also just include the set of segments for the same chunk of metadata in more metadata on each
+ of the segments. We might also have to think about the hand-off in terms of the full set of segments being
+ handed off instead of individual segments being handed off (that is, if one of the set succeeds in handing
+ off and the others fail, the realtime node would believe that it needs to re-ingest the data).
+ */
try {
for (Pair pair : indexesToPersist) {
metrics.incrementRowOutputCount(
persistHydrant(
- pair.lhs, schema, pair.rhs, commitRunnable.getMetadata()
+ pair.lhs, schema, pair.rhs, metadata
)
);
}
- commitRunnable.run();
+ committer.run();
}
catch (Exception e) {
metrics.incrementFailedPersists();
@@ -445,6 +480,11 @@ public void doRun()
return;
}
+ /*
+ Note: if the plumber crashes after persisting a subset of hydrants, we might duplicate data, as those
+ hydrants will be read but the older commitMetadata will be used. Fixing this possibly needs structural
+ changes to the plumber.
+ */
for (FireHydrant hydrant : sink) {
synchronized (hydrant) {
if (!hydrant.hasSwapped()) {
@@ -483,26 +523,19 @@ public void doRun()
QueryableIndex index = IndexIO.loadIndex(mergedFile);
log.info("Pushing [%s] to deep storage", sink.getSegment().getIdentifier());
- try {
- DataSegment segment = dataSegmentPusher.push(
- mergedFile,
- sink.getSegment().withDimensions(Lists.newArrayList(index.getAvailableDimensions()))
- );
- log.info("Inserting [%s] to the metadata store", sink.getSegment().getIdentifier());
- segmentPublisher.publishSegment(segment);
-
- if (!isPushedMarker.createNewFile()) {
- log.makeAlert("Failed to create marker file for [%s]", schema.getDataSource())
- .addData("interval", sink.getInterval())
- .addData("partitionNum", segment.getShardSpec().getPartitionNum())
- .addData("marker", isPushedMarker)
- .emit();
- }
- }
- catch (Throwable e) {
- log.info("Exception happen when pushing to deep storage");
- log.makeAlert(e, "Failed to persist merged index[%s]", schema.getDataSource())
- .addData("interval", interval)
+
+ DataSegment segment = dataSegmentPusher.push(
+ mergedFile,
+ sink.getSegment().withDimensions(Lists.newArrayList(index.getAvailableDimensions()))
+ );
+ log.info("Inserting [%s] to the metadata store", sink.getSegment().getIdentifier());
+ segmentPublisher.publishSegment(segment);
+
+ if (!isPushedMarker.createNewFile()) {
+ log.makeAlert("Failed to create marker file for [%s]", schema.getDataSource())
+ .addData("interval", sink.getInterval())
+ .addData("partitionNum", segment.getShardSpec().getPartitionNum())
+ .addData("marker", isPushedMarker)
+ .emit();
}
}
@@ -683,16 +716,24 @@ public int compare(File o1, File o2)
catch (Exception e1) {
log.error(e1, "Failed to rename %s", segmentDir.getAbsolutePath());
}
+ //Note: skipping a corrupted segment might lead to dropping some data. This strategy should be changed
+ //at some point.
continue; } - BasicFileAttributes attr = Files.readAttributes(segmentDir.toPath(), BasicFileAttributes.class); - if (attr.creationTime().toMillis() > latestCommitTime) { - log.info( - "Found metaData [%s] with latestCommitTime [%s] greater than previous recorded [%s]", - queryableIndex.getMetaData(), attr.creationTime().toMillis(), latestCommitTime - ); - latestCommitTime = attr.creationTime().toMillis(); - metadata = queryableIndex.getMetaData(); + Map segmentMetadata = queryableIndex.getMetaData(); + if (segmentMetadata != null) { + Object timestampObj = segmentMetadata.get(COMMIT_METADATA_TIMESTAMP_KEY); + if (timestampObj != null) { + long timestamp = ((Long) timestampObj).longValue(); + if (timestamp > latestCommitTime) { + log.info( + "Found metaData [%s] with latestCommitTime [%s] greater than previous recorded [%s]", + queryableIndex.getMetaData(), timestamp, latestCommitTime + ); + latestCommitTime = timestamp; + metadata = queryableIndex.getMetaData().get(COMMIT_METADATA_KEY); + } + } } hydrants.add( new FireHydrant( @@ -870,9 +911,12 @@ protected File computeBaseDir(DataSchema schema) protected File computeCorruptedFileDumpDir(File persistDir, DataSchema schema) { - return new File(persistDir.getAbsolutePath().replace(schema.getDataSource(), "corrupted/"+schema.getDataSource())); + return new File( + persistDir.getAbsolutePath() + .replace(schema.getDataSource(), "corrupted" + File.pathSeparator + schema.getDataSource()) + ); } - + protected File computePersistDir(DataSchema schema, Interval interval) { return new File(computeBaseDir(schema), interval.toString().replace("/", "_")); @@ -887,7 +931,12 @@ protected File computePersistDir(DataSchema schema, Interval interval) * * @return the number of rows persisted */ - protected int persistHydrant(FireHydrant indexToPersist, DataSchema schema, Interval interval, Object commitMetaData) + protected int persistHydrant( + FireHydrant indexToPersist, + DataSchema schema, + Interval interval, + Map metaData + ) { synchronized (indexToPersist) { if (indexToPersist.hasSwapped()) { @@ -902,7 +951,7 @@ protected int persistHydrant(FireHydrant indexToPersist, DataSchema schema, Inte "DataSource[%s], Interval[%s], Metadata [%s] persisting Hydrant[%s]", schema.getDataSource(), interval, - commitMetaData, + metaData, indexToPersist ); try { @@ -910,17 +959,19 @@ protected int persistHydrant(FireHydrant indexToPersist, DataSchema schema, Inte final File persistedFile; final IndexSpec indexSpec = config.getIndexSpec(); + if (config.isPersistInHeap()) { persistedFile = IndexMaker.persist( indexToPersist.getIndex(), new File(computePersistDir(schema, interval), String.valueOf(indexToPersist.getCount())), - commitMetaData, + metaData, indexSpec ); } else { persistedFile = IndexMerger.persist( indexToPersist.getIndex(), new File(computePersistDir(schema, interval), String.valueOf(indexToPersist.getCount())), + metaData, indexSpec ); } diff --git a/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java b/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java index cb072e1a48b6..fcb8a039f85c 100644 --- a/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java +++ b/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java @@ -1,19 +1,21 @@ /* - * Druid - a distributed column store. - * Copyright 2012 - 2015 Metamarkets Group Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. 
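To summarize the RealtimePlumber mechanics above: persist() wraps the committer's opaque payload in a small envelope keyed by the two new constants, and bootstrapSinksFromDisk() later selects the payload with the newest embedded timestamp instead of trusting file creation times. Schematically (`latestCommitTime` and `metadata` are the bootstrap method's accumulators; the Number cast is defensive, and holds because currentTimeMillis() values deserialize as Long):

```java
// Write side (persist): stamp the committer's payload with wall-clock time.
Map<String, Object> metadataEnvelope = ImmutableMap.of(
    COMMIT_METADATA_KEY, committer.getMetadata(),
    COMMIT_METADATA_TIMESTAMP_KEY, System.currentTimeMillis()
);

// Read side (bootstrap): keep the payload from the newest envelope seen.
Map<String, Object> segmentMetadata = queryableIndex.getMetaData();
if (segmentMetadata != null && segmentMetadata.get(COMMIT_METADATA_TIMESTAMP_KEY) != null) {
  long timestamp = ((Number) segmentMetadata.get(COMMIT_METADATA_TIMESTAMP_KEY)).longValue();
  if (timestamp > latestCommitTime) {
    latestCommitTime = timestamp;
    metadata = segmentMetadata.get(COMMIT_METADATA_KEY);
  }
}
```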
- * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ +* Licensed to Metamarkets Group Inc. (Metamarkets) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. Metamarkets licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ package io.druid.segment.realtime; @@ -22,11 +24,11 @@ import com.metamx.common.Granularity; import com.metamx.common.ISE; import com.metamx.common.parsers.ParseException; +import io.druid.data.input.Committer; import io.druid.data.input.Firehose; import io.druid.data.input.FirehoseFactory; -import io.druid.data.input.Committer; -import io.druid.data.input.FirehoseV2; import io.druid.data.input.FirehoseFactoryV2; +import io.druid.data.input.FirehoseV2; import io.druid.data.input.InputRow; import io.druid.data.input.Row; import io.druid.data.input.impl.InputRowParser; @@ -44,7 +46,6 @@ import io.druid.segment.realtime.plumber.PlumberSchool; import io.druid.segment.realtime.plumber.Sink; import io.druid.utils.Runnables; - import org.joda.time.DateTime; import org.joda.time.Interval; import org.joda.time.Period; @@ -210,7 +211,7 @@ public void testRunV2() throws Exception Assert.assertEquals(1, realtimeManager2.getMetrics("testV2").processed()); Assert.assertEquals(1, realtimeManager2.getMetrics("testV2").thrownAway()); - Assert.assertEquals(1, realtimeManager2.getMetrics("testV2").unparseable()); + Assert.assertEquals(2, realtimeManager2.getMetrics("testV2").unparseable()); Assert.assertTrue(plumber2.isStartedJob()); Assert.assertTrue(plumber2.isFinishedJob()); Assert.assertEquals(1, plumber2.getPersistCount()); @@ -339,11 +340,14 @@ private static class TestFirehoseV2 implements FirehoseV2 private final Iterator rows; private InputRow currRow; private boolean stop; + private TestFirehoseV2(Iterator rows) { this.rows = rows; } - private void nextMessage() { + + private void nextMessage() + { currRow = null; while (currRow == null) { final TestInputRowHolder holder = rows.next(); diff --git a/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java b/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java index d1bc50e12e5e..03f8c78437c7 100644 --- a/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java +++ b/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java @@ -1,19 +1,21 @@ /* - * Druid - a distributed column store. - * Copyright 2012 - 2015 Metamarkets Group Inc. 
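RealtimePlumberSchoolTest below now exercises the full commit loop: persist with a Committer carrying metadata, then verify that a freshly built plumber's startJob() hands the same metadata back. The contract in miniature, using the test's dummy metadata:

```java
// Persist with commit metadata attached...
final Object commitMetadata = "dummyCommitMetadata";
plumber.persist(
    new Committer()
    {
      @Override
      public Object getMetadata()
      {
        return commitMetadata;
      }

      @Override
      public void run()
      {
        // invoked once the persist is durable
      }
    }
);

// ...then a new plumber instance recovers it on startup.
plumber = (RealtimePlumber) realtimePlumberSchool.findPlumber(schema, tuningConfig, metrics);
Assert.assertEquals(commitMetadata, plumber.startJob());
```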
diff --git a/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java b/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java
index d1bc50e12e5e..03f8c78437c7 100644
--- a/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java
+++ b/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java
@@ -1,19 +1,21 @@
 /*
- * Druid - a distributed column store.
- * Copyright 2012 - 2015 Metamarkets Group Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
+* Licensed to Metamarkets Group Inc. (Metamarkets) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. Metamarkets licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
 
 package io.druid.segment.realtime.plumber;
 
@@ -25,6 +27,7 @@
 import com.metamx.emitter.service.ServiceEmitter;
 import io.druid.client.FilteredServerView;
 import io.druid.client.ServerView;
+import io.druid.data.input.Committer;
 import io.druid.data.input.InputRow;
 import io.druid.data.input.impl.DimensionsSpec;
 import io.druid.data.input.impl.InputRowParser;
@@ -45,12 +48,13 @@
 import io.druid.segment.realtime.SegmentPublisher;
 import io.druid.server.coordination.DataSegmentAnnouncer;
 import io.druid.timeline.DataSegment;
-import junit.framework.Assert;
+import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang.mutable.MutableBoolean;
 import org.easymock.EasyMock;
 import org.joda.time.DateTime;
 import org.joda.time.Interval;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -71,6 +75,7 @@ public class RealtimePlumberSchoolTest
 {
   private final RejectionPolicyFactory rejectionPolicy;
   private RealtimePlumber plumber;
+  private RealtimePlumberSchool realtimePlumberSchool;
   private DataSegmentAnnouncer announcer;
   private SegmentPublisher segmentPublisher;
   private DataSegmentPusher dataSegmentPusher;
@@ -169,7 +174,7 @@ public InputRowParser withParseSpec(ParseSpec parseSpec)
         null
     );
 
-    RealtimePlumberSchool realtimePlumberSchool = new RealtimePlumberSchool(
+    realtimePlumberSchool = new RealtimePlumberSchool(
         emitter,
         new DefaultQueryRunnerFactoryConglomerate(Maps.<Class<? extends Query>, QueryRunnerFactory>newHashMap()),
         dataSegmentPusher,
@@ -187,10 +192,31 @@ public InputRowParser withParseSpec(ParseSpec parseSpec)
   public void tearDown() throws Exception
   {
     EasyMock.verify(announcer, segmentPublisher, dataSegmentPusher, serverView, emitter);
+    FileUtils.deleteDirectory(
+        new File(
+            tuningConfig.getBasePersistDirectory(),
+            schema.getDataSource()
+        )
+    );
   }
 
   @Test(timeout = 60000)
   public void testPersist() throws Exception
+  {
+    testPersist(null);
+  }
+
+  @Test(timeout = 60000)
+  public void testPersistWithCommitMetadata() throws Exception
+  {
+    final Object commitMetadata = "dummyCommitMetadata";
+    testPersist(commitMetadata);
+
+    plumber = (RealtimePlumber) realtimePlumberSchool.findPlumber(schema, tuningConfig, metrics);
+    Assert.assertEquals(commitMetadata, plumber.startJob());
+  }
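The assertion above pins down the round trip this patch enables: metadata handed to persist() through a Committer must come back out of startJob() on a freshly built plumber, so an interrupted ingestion can resume from its last commit point. On the consuming side the intended call pattern is roughly the sketch below; the connect(parser, lastCommit) shape is inferred from the FirehoseFactoryV2 usage this patch adds, and the wrapper method itself is hypothetical.

    import io.druid.data.input.FirehoseFactoryV2;
    import io.druid.data.input.FirehoseV2;
    import io.druid.data.input.impl.InputRowParser;
    import io.druid.segment.realtime.plumber.RealtimePlumber;

    // Hypothetical driver method: how a V2 ingestion loop would use the
    // handshake verified above. Treat the signatures as a sketch.
    final class ResumeSketch
    {
      static FirehoseV2 resume(
          RealtimePlumber plumber,
          FirehoseFactoryV2 firehoseFactoryV2,
          InputRowParser parser
      ) throws Exception
      {
        Object lastCommit = plumber.startJob();              // null on a cold start
        FirehoseV2 firehose = firehoseFactoryV2.connect(parser, lastCommit);
        firehose.start();                                    // implementation seeks past lastCommit
        return firehose;
      }
    }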
+
+  private void testPersist(final Object commitMetadata) throws Exception
   {
     final MutableBoolean committed = new MutableBoolean(false);
     plumber.getSinks()
            .put(
                0L,
                new Sink(
                    new Interval(0, TimeUnit.HOURS.toMillis(1)),
                    schema,
                    tuningConfig,
                    new DateTime("2014-12-01T12:34:56.789").toString()
                )
            );
-    plumber.startJob();
+    Assert.assertNull(plumber.startJob());
+
     final InputRow row = EasyMock.createNiceMock(InputRow.class);
     EasyMock.expect(row.getTimestampFromEpoch()).andReturn(0L);
     EasyMock.expect(row.getDimensions()).andReturn(new ArrayList<String>());
     EasyMock.replay(row);
     plumber.add(row);
-    plumber.persist(
-        new Runnable()
-        {
-          @Override
-          public void run()
+
+    if (commitMetadata != null) {
+      plumber.persist(
+          new Committer()
           {
-            committed.setValue(true);
+            @Override
+            public Object getMetadata()
+            {
+              return commitMetadata;
+            }
+
+            @Override
+            public void run()
+            {
+              committed.setValue(true);
+            }
           }
-        }
-    );
+      );
+    } else {
+      plumber.persist(
+          new Runnable()
+          {
+            @Override
+            public void run()
+            {
+              committed.setValue(true);
+            }
+          }
+      );
+    }
 
     while (!committed.booleanValue()) {
       Thread.sleep(100);