Commit 6a64162

finished merging into druid-0.7.x; derby not working (to be fixed)
jisookim0513 authored and jisookim0513 committed Sep 26, 2014
1 parent 43cc628 commit 6a64162
Showing 188 changed files with 1,344 additions and 767 deletions.
@@ -34,13 +34,13 @@
 /**
  * An OrderedMergeIterator is an iterator that merges together multiple sorted iterators. It is written assuming
  * that the input Iterators are provided in order. That is, it places an extra restriction in the input iterators.
- * <p/>
+ *
  * Normally a merge operation could operate with the actual input iterators in any order as long as the actual values
  * in the iterators are sorted. This requires that not only the individual values be sorted, but that the iterators
  * be provided in the order of the first element of each iterator.
- * <p/>
+ *
  * If this doesn't make sense, check out OrderedMergeIteratorTest.testScrewsUpOnOutOfOrderBeginningOfList()
- * <p/>
+ *
  * It places this extra restriction on the input data in order to implement an optimization that allows it to
  * remain as lazy as possible in the face of a common case where the iterators are just appended one after the other.
 */
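
The ordering restriction described in the javadoc above is easy to state in code. The following is a minimal, self-contained sketch in plain Java (not Druid's actual classes or API) of a check that sorted inputs are also ordered by their first elements:

import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

public class OrderedMergeInputCheck
{
  // True when the inputs, each assumed sorted, are also ordered by their
  // first elements, which is the extra restriction the javadoc describes.
  static <T> boolean providedInOrder(List<List<T>> inputs, Comparator<T> cmp)
  {
    T prevHead = null;
    for (List<T> input : inputs) {
      if (input.isEmpty()) {
        continue; // an empty input imposes no constraint
      }
      T head = input.get(0);
      if (prevHead != null && cmp.compare(prevHead, head) > 0) {
        return false; // a head moved backwards, so the restriction is violated
      }
      prevHead = head;
    }
    return true;
  }

  public static void main(String[] args)
  {
    Comparator<Integer> cmp = Comparator.naturalOrder();
    // true: heads 1, 2, 9 are non-decreasing, so a merge can stay lazy
    System.out.println(providedInOrder(
        Arrays.asList(Arrays.asList(1, 3, 5), Arrays.asList(2, 4, 6), Arrays.asList(9, 10)), cmp));
    // false: the last head (0) jumps backwards, the case the referenced
    // testScrewsUpOnOutOfOrderBeginningOfList() exercises
    System.out.println(providedInOrder(
        Arrays.asList(Arrays.asList(1, 3, 5), Arrays.asList(2, 4, 6), Arrays.asList(0, 10)), cmp));
  }
}
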
@@ -22,7 +22,6 @@
 import com.google.common.base.Function;
 import com.google.common.base.Throwables;
 import com.google.common.collect.Ordering;
-import com.google.common.io.Closeables;
 import com.metamx.common.guava.Accumulator;
 import com.metamx.common.guava.CloseQuietly;
 import com.metamx.common.guava.Sequence;
@@ -38,13 +37,13 @@
 /**
  * An OrderedMergeIterator is an iterator that merges together multiple sorted iterators. It is written assuming
  * that the input Iterators are provided in order. That is, it places an extra restriction in the input iterators.
- * <p/>
+ *
  * Normally a merge operation could operate with the actual input iterators in any order as long as the actual values
  * in the iterators are sorted. This requires that not only the individual values be sorted, but that the iterators
  * be provided in the order of the first element of each iterator.
- * <p/>
+ *
  * If this doesn't make sense, check out OrderedMergeSequenceTest.testScrewsUpOnOutOfOrderBeginningOfList()
- * <p/>
+ *
  * It places this extra restriction on the input data in order to implement an optimization that allows it to
  * remain as lazy as possible in the face of a common case where the iterators are just appended one after the other.
 */
12 changes: 6 additions & 6 deletions common/src/main/java/io/druid/common/config/ConfigManager.java
@@ -48,7 +48,7 @@ public class ConfigManager
   private final Object lock = new Object();
   private boolean started = false;
 
-  private final MetadataDbConnector metadataDbConnector;
+  private final MetadataDbConnector dbConnector;
   private final Supplier<ConfigManagerConfig> config;
 
   private final ScheduledExecutorService exec;
@@ -58,9 +58,9 @@ public class ConfigManager
   private volatile ConfigManager.PollingCallable poller;
 
   @Inject
-  public ConfigManager(MetadataDbConnector metadataDbConnector, Supplier<MetadataTablesConfig> dbTables, Supplier<ConfigManagerConfig> config)
+  public ConfigManager(MetadataDbConnector dbConnector, Supplier<MetadataTablesConfig> dbTables, Supplier<ConfigManagerConfig> config)
   {
-    this.metadataDbConnector = metadataDbConnector;
+    this.dbConnector = dbConnector;
     this.config = config;
 
     this.exec = ScheduledExecutors.fixed(1, "config-manager-%s");
@@ -105,7 +105,7 @@ private void poll()
   {
     for (Map.Entry<String, ConfigHolder> entry : watchedConfigs.entrySet()) {
       try {
-        if (entry.getValue().swapIfNew(metadataDbConnector.lookup(configTable, "name", "payload", entry.getKey()))) {
+        if (entry.getValue().swapIfNew(dbConnector.lookup(configTable, "name", "payload", entry.getKey()))) {
           log.info("New value for key[%s] seen.", entry.getKey());
         }
       }
@@ -137,7 +137,7 @@ public ConfigHolder<T> call() throws Exception
       // Multiple of these callables can be submitted at the same time, but the callables themselves
       // are executed serially, so double check that it hasn't already been populated.
       if (!watchedConfigs.containsKey(key)) {
-        byte[] value = metadataDbConnector.lookup(configTable, "name", "payload", key);
+        byte[] value = dbConnector.lookup(configTable, "name", "payload", key);
         ConfigHolder<T> holder = new ConfigHolder<T>(value, serde);
         watchedConfigs.put(key, holder);
       }
@@ -181,7 +181,7 @@ public <T> boolean set(final String key, final ConfigSerde<T> serde, final T obj
       @Override
       public Boolean call() throws Exception
       {
-        metadataDbConnector.insertOrUpdate(configTable, "name", "payload", key, newBytes);
+        dbConnector.insertOrUpdate(configTable, "name", "payload", key, newBytes);
 
         final ConfigHolder configHolder = watchedConfigs.get(key);
         if (configHolder != null) {
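
The ConfigManager hunks above are a straight field rename, but the poll-and-swap pattern visible in poll() is worth isolating. A minimal sketch under assumed shapes: Lookup stands in for MetadataDbConnector.lookup(configTable, "name", "payload", key), and Holder for ConfigHolder.swapIfNew().

import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;

public class PollAndSwapSketch
{
  // Stand-in for the MetadataDbConnector lookup seen in the diff.
  interface Lookup
  {
    byte[] lookup(String key);
  }

  // Stand-in for ConfigHolder: swaps only when the stored bytes changed.
  static final class Holder
  {
    private final AtomicReference<byte[]> current = new AtomicReference<>();

    boolean swapIfNew(byte[] newBytes)
    {
      if (!Arrays.equals(current.get(), newBytes)) {
        current.set(newBytes);
        return true;
      }
      return false;
    }
  }

  private final Map<String, Holder> watched = new ConcurrentHashMap<>();

  // Periodically re-read every watched key; act only on an actual change,
  // mirroring the "New value for key[%s] seen." log line in the diff.
  void poll(Lookup db)
  {
    for (Map.Entry<String, Holder> e : watched.entrySet()) {
      if (e.getValue().swapIfNew(db.lookup(e.getKey()))) {
        System.out.printf("New value for key[%s] seen.%n", e.getKey());
      }
    }
  }
}
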
@@ -262,4 +262,4 @@ public float[] readFloats(InputStream in) throws IOException
 
     return retVal;
   }
-}
\ No newline at end of file
+}
@@ -43,7 +43,7 @@ public void configure(Binder binder)
 
   @Provides @ManageLifecycle
   public ConfigManager getConfigManager(
-      final MetadataDbConnector metadataDbConnector,
+      final MetadataDbConnector dbConnector,
       final Supplier<MetadataTablesConfig> dbTables,
       final Supplier<ConfigManagerConfig> config,
       final Lifecycle lifecycle
@@ -55,7 +55,7 @@ public ConfigManager getConfigManager(
           @Override
           public void start() throws Exception
           {
-            metadataDbConnector.createConfigTable();
+            dbConnector.createConfigTable();
           }
 
           @Override
@@ -66,6 +66,6 @@ public void stop()
         }
     );
 
-    return new ConfigManager(metadataDbConnector, dbTables, config);
+    return new ConfigManager(dbConnector, dbTables, config);
   }
 }
@@ -42,17 +42,17 @@
 
 /**
  * VersionedIntervalTimeline is a data structure that manages objects on a specific timeline.
- * <p/>
+ *
  * It associates a jodatime Interval and a generically-typed version with the object that is being stored.
- * <p/>
+ *
  * In the event of overlapping timeline entries, timeline intervals may be chunked. The underlying data associated
  * with a timeline entry remains unchanged when chunking occurs.
- * <p/>
+ *
  * After loading objects via the add() method, the lookup(Interval) method can be used to get the list of the most
 * recent objects (according to the version) that match the given interval. The intent is that objects represent
 * a certain time period and when you do a lookup(), you are asking for all of the objects that you need to look
 * at in order to get a correct answer about that time period.
- * <p/>
+ *
 * The findOvershadowed() method returns a list of objects that will never be returned by a call to lookup() because
 * they are overshadowed by some other object. This can be used in conjunction with the add() and remove() methods
 * to achieve "atomic" updates. First add new items, then check if those items caused anything to be overshadowed, if
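
The javadoc above describes an "atomic" update recipe built from add(), findOvershadowed(), and remove(). A hedged sketch against a hypothetical minimal interface; the real class keys on jodatime Intervals and versioned partition chunks, and every name below is illustrative:

import java.util.List;

public class TimelineSwapSketch
{
  // Hypothetical stand-in for VersionedIntervalTimeline, not its real signatures.
  interface Timeline<V, T>
  {
    void add(String interval, V version, T object);
    void remove(String interval, V version, T object);
    List<Entry<V, T>> findOvershadowed(); // entries lookup() can never return
  }

  static final class Entry<V, T>
  {
    final String interval;
    final V version;
    final T object;

    Entry(String interval, V version, T object)
    {
      this.interval = interval;
      this.version = version;
      this.object = object;
    }
  }

  // Add the replacement object first; only then drop whatever it overshadows.
  // A reader doing lookup() between the two steps sees either the old objects
  // or the new one, never a hole in the timeline.
  static <V, T> void atomicReplace(Timeline<V, T> timeline, String interval, V version, T newObject)
  {
    timeline.add(interval, version, newObject);
    for (Entry<V, T> e : timeline.findOvershadowed()) {
      timeline.remove(e.interval, e.version, e.object);
    }
  }
}
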
@@ -28,8 +28,6 @@
 import com.metamx.common.guava.SequenceTestHelper;
 import com.metamx.common.guava.Sequences;
 import com.metamx.common.guava.TestSequence;
-import com.metamx.common.guava.Yielder;
-import com.metamx.common.guava.YieldingAccumulator;
 import junit.framework.Assert;
 import org.junit.Test;
 
@@ -17,7 +17,7 @@
   },
   "pathSpec" : {
     "type" : "static",
-    "paths" : "examples/bin/examples/indexing/wikipedia_data.json"
+    "paths" : "examples/indexing/wikipedia_data.json"
   },
   "targetPartitionSize" : 5000000,
   "rollupSpec" : {
2 changes: 1 addition & 1 deletion examples/bin/examples/indexing/wikipedia_index_task.json
@@ -24,7 +24,7 @@
   }],
   "firehose" : {
     "type" : "local",
-    "baseDir" : "examples/bin/examples/indexing",
+    "baseDir" : "examples/indexing/",
     "filter" : "wikipedia_data.json",
     "parser" : {
       "timestampSpec" : {
@@ -81,13 +81,13 @@
 /**
  * Determines appropriate ShardSpecs for a job by determining whether or not partitioning is necessary, and if so,
  * choosing the best dimension that satisfies the criteria:
- * <p/>
+ *
  * <ul>
 * <li>Must have exactly one value per row.</li>
 * <li>Must not generate oversized partitions. A dimension with N rows having the same value will necessarily
 * put all those rows in the same partition, and that partition may be much larger than the target size.</li>
 * </ul>
- * <p/>
+ *
 * "Best" means a very high cardinality dimension, or, if none exist, the dimension that minimizes variation of
 * segment size relative to the target.
 */
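
A rough illustration of the selection rule in the javadoc above: disqualify any dimension that would create an oversized partition, then prefer the highest cardinality. The inputs and scoring below are assumptions for illustration; the real job derives its statistics from the data it scans:

import java.util.List;
import java.util.Map;

public class PickPartitionDimension
{
  /**
   * @param partitionSizesByDim for each candidate dimension (already filtered to
   *                            those with exactly one value per row), the row
   *                            count of each distinct value
   * @param maxPartitionSize    any single value exceeding this disqualifies the dimension
   * @return the highest-cardinality qualifying dimension, or null if none qualifies
   */
  static String choose(Map<String, List<Long>> partitionSizesByDim, long maxPartitionSize)
  {
    String best = null;
    int bestCardinality = -1;
    for (Map.Entry<String, List<Long>> e : partitionSizesByDim.entrySet()) {
      boolean oversized = false;
      for (long size : e.getValue()) {
        if (size > maxPartitionSize) {
          oversized = true; // one hot value would blow past the target size
          break;
        }
      }
      // The real job additionally falls back to minimizing size variation
      // relative to the target; cardinality alone is used here for brevity.
      if (!oversized && e.getValue().size() > bestCardinality) {
        bestCardinality = e.getValue().size();
        best = e.getKey();
      }
    }
    return best;
  }
}
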
@@ -45,9 +45,9 @@ public HadoopDruidIndexerJob(
     this.config = config;
 
     if (config.isUpdaterJobSpecSet()) {
-      this.dbUpdaterJob = new DbUpdaterJob(config);
+      dbUpdaterJob = new DbUpdaterJob(config);
     } else {
-      this.dbUpdaterJob = null;
+      dbUpdaterJob = null;
     }
   }
 
@@ -23,12 +23,12 @@
 import com.google.common.base.Throwables;
 import com.google.common.collect.Lists;
 import io.druid.db.MetadataDbConnectorConfig;
-import io.druid.segment.indexing.granularity.UniformGranularitySpec;
 import io.druid.indexer.partitions.PartitionsSpec;
 import io.druid.indexer.partitions.RandomPartitionsSpec;
 import io.druid.indexer.partitions.SingleDimensionPartitionsSpec;
 import io.druid.indexer.updater.DbUpdaterJobSpec;
 import io.druid.jackson.DefaultObjectMapper;
+import io.druid.segment.indexing.granularity.UniformGranularitySpec;
 import org.joda.time.Interval;
 import org.junit.Assert;
 import org.junit.Test;
@@ -20,7 +20,6 @@
 package io.druid.indexer.partitions;
 
 import com.google.common.base.Throwables;
-import io.druid.indexer.HadoopDruidIndexerConfig;
 import io.druid.indexer.HadoopDruidIndexerConfigTest;
 import org.junit.Assert;
 import org.junit.Test;
@@ -20,7 +20,6 @@
 package io.druid.indexer.partitions;
 
 import com.google.common.base.Throwables;
-import io.druid.indexer.HadoopDruidIndexerConfig;
 import io.druid.indexer.HadoopDruidIndexerConfigTest;
 import org.junit.Assert;
 import org.junit.Test;
@@ -28,7 +28,7 @@
 /**
  * Represents the status of a task from the perspective of the coordinator. The task may be ongoing
  * ({@link #isComplete()} false) or it may be complete ({@link #isComplete()} true).
- * <p/>
+ *
  * TaskStatus objects are immutable.
  */
 public class TaskStatus
@@ -101,6 +101,8 @@ public long getDuration()
   /**
    * Signals that a task is not yet complete, and is still runnable on a worker. Exactly one of isRunnable,
    * isSuccess, or isFailure will be true at any one time.
+   *
+   * @return whether the task is runnable.
    */
   @JsonIgnore
   public boolean isRunnable()
@@ -110,6 +112,8 @@ public boolean isRunnable()
 
   /**
    * Inverse of {@link #isRunnable}.
+   *
+   * @return whether the task is complete.
    */
   @JsonIgnore
   public boolean isComplete()
@@ -120,6 +124,8 @@ public boolean isComplete()
   /**
    * Returned by tasks when they spawn subtasks. Exactly one of isRunnable, isSuccess, or isFailure will
    * be true at any one time.
+   *
+   * @return whether the task succeeded.
    */
   @JsonIgnore
   public boolean isSuccess()
@@ -130,6 +136,8 @@ public boolean isSuccess()
   /**
    * Returned by tasks when they complete unsuccessfully. Exactly one of isRunnable, isSuccess, or
    * isFailure will be true at any one time.
+   *
+   * @return whether the task failed
    */
   @JsonIgnore
   public boolean isFailure()
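
The invariant repeated in these javadocs (exactly one of isRunnable, isSuccess, and isFailure holds at any time, and isComplete is the inverse of isRunnable) can be restated compactly. A sketch with an assumed Status enum, not the class's actual fields:

public class TaskStatusSketch
{
  enum Status { RUNNING, SUCCESS, FAILED }

  private final Status status;

  TaskStatusSketch(Status status)
  {
    this.status = status;
  }

  // Exactly one of the three predicates is true because status holds
  // exactly one enum value at a time.
  boolean isRunnable() { return status == Status.RUNNING; }
  boolean isSuccess()  { return status == Status.SUCCESS; }
  boolean isFailure()  { return status == Status.FAILED; }

  // Complete is defined as "no longer runnable".
  boolean isComplete() { return !isRunnable(); }
}
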
@@ -29,7 +29,6 @@
 import io.druid.indexing.common.config.TaskConfig;
 import io.druid.indexing.common.task.Task;
 import io.druid.query.QueryRunnerFactoryConglomerate;
-import io.druid.segment.column.ColumnConfig;
 import io.druid.segment.loading.DataSegmentArchiver;
 import io.druid.segment.loading.DataSegmentKiller;
 import io.druid.segment.loading.DataSegmentMover;
@@ -81,7 +81,7 @@ public Set<DataSegment> perform(Task task, TaskActionToolbox toolbox) throws IOE
 {
   toolbox.verifyTaskLocksAndSinglePartitionSettitude(task, segments, true);
 
-  final Set<DataSegment> retVal = toolbox.getIndexerMetadataCoordinator().announceHistoricalSegments(segments);
+  final Set<DataSegment> retVal = toolbox.getIndexerDBCoordinator().announceHistoricalSegments(segments);
 
   // Emit metrics
   final ServiceMetricEvent.Builder metricBuilder = new ServiceMetricEvent.Builder()
@@ -68,7 +68,7 @@ public TypeReference<List<DataSegment>> getReturnTypeReference()
   @Override
   public List<DataSegment> perform(Task task, TaskActionToolbox toolbox) throws IOException
   {
-    return toolbox.getIndexerMetadataCoordinator().getUnusedSegmentsForInterval(dataSource, interval);
+    return toolbox.getIndexerDBCoordinator().getUnusedSegmentsForInterval(dataSource, interval);
   }
 
   @Override
@@ -68,7 +68,7 @@ public TypeReference<List<DataSegment>> getReturnTypeReference()
   @Override
   public List<DataSegment> perform(Task task, TaskActionToolbox toolbox) throws IOException
   {
-    return toolbox.getIndexerMetadataCoordinator().getUsedSegmentsForInterval(dataSource, interval);
+    return toolbox.getIndexerDBCoordinator().getUsedSegmentsForInterval(dataSource, interval);
   }
 
   @Override
@@ -42,7 +42,7 @@ public Void perform(
   ) throws IOException
   {
     toolbox.verifyTaskLocksAndSinglePartitionSettitude(task, segments, true);
-    toolbox.getIndexerMetadataCoordinator().updateSegmentMetadata(segments);
+    toolbox.getIndexerDBCoordinator().updateSegmentMetadata(segments);
 
     // Emit metrics
     final ServiceMetricEvent.Builder metricBuilder = new ServiceMetricEvent.Builder()
@@ -59,7 +59,7 @@ public TypeReference<Void> getReturnTypeReference()
   public Void perform(Task task, TaskActionToolbox toolbox) throws IOException
   {
     toolbox.verifyTaskLocksAndSinglePartitionSettitude(task, segments, true);
-    toolbox.getIndexerMetadataCoordinator().deleteSegments(segments);
+    toolbox.getIndexerDBCoordinator().deleteSegments(segments);
 
     // Emit metrics
     final ServiceMetricEvent.Builder metricBuilder = new ServiceMetricEvent.Builder()
@@ -37,18 +37,18 @@
 public class TaskActionToolbox
 {
   private final TaskLockbox taskLockbox;
-  private final IndexerMetadataCoordinator indexerMetadataCoordinator;
+  private final IndexerMetadataCoordinator indexerDBCoordinator;
   private final ServiceEmitter emitter;
 
   @Inject
   public TaskActionToolbox(
       TaskLockbox taskLockbox,
-      IndexerMetadataCoordinator indexerMetadataCoordinator,
+      IndexerMetadataCoordinator indexerDBCoordinator,
       ServiceEmitter emitter
   )
   {
     this.taskLockbox = taskLockbox;
-    this.indexerMetadataCoordinator = indexerMetadataCoordinator;
+    this.indexerDBCoordinator = indexerDBCoordinator;
     this.emitter = emitter;
   }
 
@@ -57,9 +57,9 @@ public TaskLockbox getTaskLockbox()
     return taskLockbox;
   }
 
-  public IndexerMetadataCoordinator getIndexerMetadataCoordinator()
+  public IndexerMetadataCoordinator getIndexerDBCoordinator()
   {
-    return indexerMetadataCoordinator;
+    return indexerDBCoordinator;
   }
 
   public ServiceEmitter getEmitter()
@@ -107,6 +107,12 @@ public <T> QueryRunner<T> getQueryRunner(Query<T> query)
     return null;
   }
 
+  @Override
+  public String getClasspathPrefix()
+  {
+    return null;
+  }
+
   @Override
   public String toString()
   {
@@ -118,7 +124,10 @@ public String toString()
   }
 
   /**
-   * Start helper methods *
+   * Start helper methods
+   *
+   * @param objects objects to join
+   * @return string of joined objects
    */
   public static String joinId(Object... objects)
   {
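
The new javadoc documents joinId's contract, but the diff does not show its body. A guess at the behavior; the underscore delimiter and null-skipping are assumptions, not confirmed by this commit:

import com.google.common.base.Joiner;

public class JoinIdSketch
{
  // Assumed: task ids are the pieces joined with "_", nulls dropped.
  private static final Joiner ID_JOINER = Joiner.on("_").skipNulls();

  public static String joinId(Object... objects)
  {
    return ID_JOINER.join(objects);
  }

  public static void main(String[] args)
  {
    // Prints: index_wikipedia_2014-09-26 (under the assumptions above)
    System.out.println(joinId("index", "wikipedia", "2014-09-26"));
  }
}
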