Use timestamp in millis as Map key instead of DateTime object (apache#3674)

* Use Long timestamp as key instead of DateTime.

A DateTime key is unreliable: DateTime.equals() and hashCode() include the chronology (time zone) as well as the millisecond instant, so an entry stored under one DateTime object cannot be looked up with a different DateTime object that represents the same instant in another zone.

For example, the lookup below returns null when DateTime is used as the key:
```
import java.util.HashMap;
import org.joda.time.DateTime;
import org.joda.time.DateTimeUtils;
import org.joda.time.DateTimeZone;

DateTime odt = DateTime.now(DateTimeUtils.getZone(DateTimeZone.forID("America/Los_Angeles")));
HashMap<DateTime, String> map = new HashMap<>();
map.put(odt, "abc");
DateTime dt = new DateTime(odt.getMillis()); // same instant, but the JVM's default time zone
System.out.println(map.get(dt));             // prints "null" unless the default zone is America/Los_Angeles
```
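
For comparison, here is a minimal sketch of the same lookup keyed by the millisecond instant instead, which is the approach this commit applies to the indexer's shard-spec maps (the class name is hypothetical; only joda-time is required):

```
import java.util.HashMap;

import org.joda.time.DateTime;
import org.joda.time.DateTimeUtils;
import org.joda.time.DateTimeZone;

public class MillisKeyExample
{
  public static void main(String[] args)
  {
    DateTime odt = DateTime.now(DateTimeUtils.getZone(DateTimeZone.forID("America/Los_Angeles")));

    // Key by the instant itself: Long compares and hashes by value only.
    HashMap<Long, String> map = new HashMap<>();
    map.put(odt.getMillis(), "abc");

    // Recreating the instant in another zone changes the chronology but not the millis.
    DateTime dt = new DateTime(odt.getMillis(), DateTimeZone.UTC);
    System.out.println(map.get(dt.getMillis())); // prints "abc"
  }
}
```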

* Respect the timezone when creating the segment partition info file.

* Update docs with timezone caveat in granularity spec

* Remove unused imports
praveev authored and fjy committed Nov 11, 2016
1 parent fd54514 commit 52a74cf
Showing 13 changed files with 51 additions and 50 deletions.
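
The change repeated across the files below is mechanical: shard-spec maps keyed by `DateTime` become maps keyed by the bucket's start in epoch millis (`interval.getStartMillis()` instead of `interval.getStart()`), and natural `Long` ordering suffices where the explicit `DateTimeComparator` was dropped. A condensed, hypothetical sketch of the resulting shape, with `String` values standing in for Druid's `HadoopyShardSpec` so it runs standalone:

```
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

import org.joda.time.Interval;

public class MillisKeyedShardSpecs
{
  public static void main(String[] args)
  {
    Interval bucket = Interval.parse("2016-11-01T00:00:00Z/2016-11-02T00:00:00Z");

    // Keyed by the bucket's start in millis; natural Long ordering keeps
    // buckets chronological without a DateTimeComparator.
    Map<Long, List<String>> shardSpecs = new TreeMap<>();
    shardSpecs.put(bucket.getStartMillis(), Arrays.asList("shardSpec-0"));

    // A key computed independently from the same instant finds the entry.
    System.out.println(shardSpecs.get(bucket.getStart().getMillis()));
  }
}
```
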
4 changes: 2 additions & 2 deletions docs/content/ingestion/index.md
@@ -188,7 +188,7 @@ This spec is used to generate segments with uniform intervals.
| queryGranularity | string | The minimum granularity to be able to query results at and the granularity of the data inside the segment. E.g. a value of "minute" will mean that data is aggregated at minutely granularity. That is, if there are collisions in the tuple (minute(timestamp), dimensions), then it will aggregate values together using the aggregators instead of storing individual rows. | no (default == 'NONE') |
| rollup | boolean | rollup or not | no (default == true) |
| intervals | string | A list of intervals for the raw data being ingested. Ignored for real-time ingestion. | yes for batch, no for real-time |
| timezone | string | The timezone to represent the interval offsets in. | no (default == 'UTC')
| timezone | string | The timezone to represent the interval offsets in. Only valid if intervals are explicitly specified for batch ingestion. Will not be valid for Kafka-based ingestion. | no (default == 'UTC') |

### Arbitrary Granularity Spec

@@ -200,7 +200,7 @@ This spec is used to generate segments with arbitrary intervals (it tries to cre
| queryGranularity | string | The minimum granularity to be able to query results at and the granularity of the data inside the segment. E.g. a value of "minute" will mean that data is aggregated at minutely granularity. That is, if there are collisions in the tuple (minute(timestamp), dimensions), then it will aggregate values together using the aggregators instead of storing individual rows. | no (default == 'NONE') |
| rollup | boolean | rollup or not | no (default == true) |
| intervals | string | A list of intervals for the raw data being ingested. Ignored for real-time ingestion. | yes for batch, no for real-time |
| timezone | string | The timezone to represent the interval offsets in. | no (default == 'UTC')
| timezone | string | The timezone to represent the interval offsets in. Only valid if intervals are explicitly specified for batch ingestion. Will not be valid for Kafka-based ingestion. | no (default == 'UTC') |

# IO Config

@@ -302,11 +302,11 @@ private void verifyJob(IndexGeneratorJob job) throws IOException
}
}

private Map<DateTime, List<HadoopyShardSpec>> loadShardSpecs(
private Map<Long, List<HadoopyShardSpec>> loadShardSpecs(
Integer[][][] shardInfoForEachShard
)
{
Map<DateTime, List<HadoopyShardSpec>> shardSpecs = Maps.newTreeMap(DateTimeComparator.getInstance());
Map<Long, List<HadoopyShardSpec>> shardSpecs = Maps.newTreeMap(DateTimeComparator.getInstance());
int shardCount = 0;
int segmentNum = 0;
for (Interval segmentGranularity : config.getSegmentGranularIntervals().get()) {
@@ -319,7 +319,7 @@ private Map<DateTime, List<HadoopyShardSpec>> loadShardSpecs(
actualSpecs.add(new HadoopyShardSpec(spec, shardCount++));
}

shardSpecs.put(segmentGranularity.getStart(), actualSpecs);
shardSpecs.put(segmentGranularity.getStartMillis(), actualSpecs);
}

return shardSpecs;
@@ -149,7 +149,7 @@ public boolean run()
);
log.info("Determined Intervals for Job [%s].", config.getSegmentGranularIntervals());
}
Map<DateTime, List<HadoopyShardSpec>> shardSpecs = Maps.newTreeMap(DateTimeComparator.getInstance());
Map<Long, List<HadoopyShardSpec>> shardSpecs = Maps.newTreeMap(DateTimeComparator.getInstance());
int shardCount = 0;
for (Interval segmentGranularity : config.getSegmentGranularIntervals().get()) {
DateTime bucket = segmentGranularity.getStart();
@@ -191,7 +191,7 @@ public boolean run()
}
}

shardSpecs.put(bucket, actualSpecs);
shardSpecs.put(bucket.getMillis(), actualSpecs);

} else {
log.info("Path[%s] didn't exist!?", partitionInfoPath);
@@ -323,7 +323,12 @@ protected void reduce(
HyperLogLogCollector.makeCollector(ByteBuffer.wrap(value.getBytes(), 0, value.getLength()))
);
}
Interval interval = config.getGranularitySpec().getSegmentGranularity().bucket(new DateTime(key.get()));
Optional<Interval> intervalOptional = config.getGranularitySpec().bucketInterval(new DateTime(key.get()));

if (!intervalOptional.isPresent()) {
throw new ISE("WTF?! No bucket found for timestamp: %s", key.get());
}
Interval interval = intervalOptional.get();
intervals.add(interval);
final Path outPath = config.makeSegmentPartitionInfoPath(interval);
final OutputStream out = Utils.makePathAndOutputStream(
@@ -65,7 +65,6 @@
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.joda.time.DateTime;
import org.joda.time.DateTimeComparator;
import org.joda.time.Interval;

import java.io.IOException;
@@ -218,7 +217,7 @@ public boolean run()

log.info("Job completed, loading up partitions for intervals[%s].", config.getSegmentGranularIntervals());
FileSystem fileSystem = null;
Map<DateTime, List<HadoopyShardSpec>> shardSpecs = Maps.newTreeMap(DateTimeComparator.getInstance());
Map<Long, List<HadoopyShardSpec>> shardSpecs = Maps.newTreeMap();
int shardCount = 0;
for (Interval segmentGranularity : config.getSegmentGranularIntervals().get()) {
final Path partitionInfoPath = config.makeSegmentPartitionInfoPath(segmentGranularity);
@@ -238,7 +237,7 @@ public boolean run()
log.info("DateTime[%s], partition[%d], spec[%s]", segmentGranularity, i, actualSpecs.get(i));
}

shardSpecs.put(segmentGranularity.getStart(), actualSpecs);
shardSpecs.put(segmentGranularity.getStartMillis(), actualSpecs);
} else {
log.info("Path[%s] didn't exist!?", partitionInfoPath);
}
@@ -370,17 +369,17 @@ public static class DeterminePartitionsDimSelectionMapperHelper
{
private final HadoopDruidIndexerConfig config;
private final String partitionDimension;
private final Map<DateTime, Integer> intervalIndexes;
private final Map<Long, Integer> intervalIndexes;

public DeterminePartitionsDimSelectionMapperHelper(HadoopDruidIndexerConfig config, String partitionDimension)
{
this.config = config;
this.partitionDimension = partitionDimension;

final ImmutableMap.Builder<DateTime, Integer> timeIndexBuilder = ImmutableMap.builder();
final ImmutableMap.Builder<Long, Integer> timeIndexBuilder = ImmutableMap.builder();
int idx = 0;
for (final Interval bucketInterval : config.getGranularitySpec().bucketIntervals().get()) {
timeIndexBuilder.put(bucketInterval.getStart(), idx);
timeIndexBuilder.put(bucketInterval.getStartMillis(), idx);
idx++;
}

@@ -400,7 +399,7 @@ public void emitDimValueCounts(
}

final Interval interval = maybeInterval.get();
final int intervalIndex = intervalIndexes.get(interval.getStart());
final int intervalIndex = intervalIndexes.get(interval.getStartMillis());

final ByteBuffer buf = ByteBuffer.allocate(4 + 8);
buf.putInt(intervalIndex);
@@ -27,7 +27,6 @@
import io.druid.timeline.partition.HashBasedNumberedShardSpec;
import io.druid.timeline.partition.NoneShardSpec;
import org.joda.time.DateTime;
import org.joda.time.DateTimeComparator;
import org.joda.time.Interval;

import java.util.List;
@@ -59,7 +58,7 @@ public boolean run()
jobs.add(config.getPartitionsSpec().getPartitionJob(config));
} else {
int shardsPerInterval = config.getPartitionsSpec().getNumShards();
Map<DateTime, List<HadoopyShardSpec>> shardSpecs = Maps.newTreeMap(DateTimeComparator.getInstance());
Map<Long, List<HadoopyShardSpec>> shardSpecs = Maps.newTreeMap();
int shardCount = 0;
for (Interval segmentGranularity : config.getSegmentGranularIntervals().get()) {
DateTime bucket = segmentGranularity.getStart();
@@ -78,11 +77,11 @@ )
)
);
}
shardSpecs.put(bucket, specs);
shardSpecs.put(bucket.getMillis(), specs);
log.info("DateTime[%s], spec[%s]", bucket, specs);
} else {
final HadoopyShardSpec spec = new HadoopyShardSpec(NoneShardSpec.instance(), shardCount++);
shardSpecs.put(bucket, Lists.newArrayList(spec));
shardSpecs.put(bucket.getMillis(), Lists.newArrayList(spec));
log.info("DateTime[%s], spec[%s]", bucket, spec);
}
}
@@ -215,8 +215,8 @@ public static HadoopDruidIndexerConfig fromConfiguration(Configuration conf)

private volatile HadoopIngestionSpec schema;
private volatile PathSpec pathSpec;
private final Map<DateTime, ShardSpecLookup> shardSpecLookups = Maps.newHashMap();
private final Map<DateTime, Map<ShardSpec, HadoopyShardSpec>> hadoopShardSpecLookup = Maps.newHashMap();
private final Map<Long, ShardSpecLookup> shardSpecLookups = Maps.newHashMap();
private final Map<Long, Map<ShardSpec, HadoopyShardSpec>> hadoopShardSpecLookup = Maps.newHashMap();
private final QueryGranularity rollupGran;

@JsonCreator
@@ -226,7 +226,7 @@ public HadoopDruidIndexerConfig(
{
this.schema = spec;
this.pathSpec = JSON_MAPPER.convertValue(spec.getIOConfig().getPathSpec(), PathSpec.class);
for (Map.Entry<DateTime, List<HadoopyShardSpec>> entry : spec.getTuningConfig().getShardSpecs().entrySet()) {
for (Map.Entry<Long, List<HadoopyShardSpec>> entry : spec.getTuningConfig().getShardSpecs().entrySet()) {
if (entry.getValue() == null || entry.getValue().isEmpty()) {
continue;
}
@@ -310,7 +310,7 @@ public void setVersion(String version)
this.pathSpec = JSON_MAPPER.convertValue(schema.getIOConfig().getPathSpec(), PathSpec.class);
}

public void setShardSpecs(Map<DateTime, List<HadoopyShardSpec>> shardSpecs)
public void setShardSpecs(Map<Long, List<HadoopyShardSpec>> shardSpecs)
{
this.schema = schema.withTuningConfig(schema.getTuningConfig().withShardSpecs(shardSpecs));
this.pathSpec = JSON_MAPPER.convertValue(schema.getIOConfig().getPathSpec(), PathSpec.class);
@@ -363,12 +363,12 @@ public InputRowParser getParser()

public HadoopyShardSpec getShardSpec(Bucket bucket)
{
return schema.getTuningConfig().getShardSpecs().get(bucket.time).get(bucket.partitionNum);
return schema.getTuningConfig().getShardSpecs().get(bucket.time.getMillis()).get(bucket.partitionNum);
}

public int getShardSpecCount(Bucket bucket)
{
return schema.getTuningConfig().getShardSpecs().get(bucket.time).size();
return schema.getTuningConfig().getShardSpecs().get(bucket.time.getMillis()).size();
}

public boolean isBuildV9Directly()
@@ -411,12 +411,12 @@ public Optional<Bucket> getBucket(InputRow inputRow)
return Optional.absent();
}
final DateTime bucketStart = timeBucket.get().getStart();
final ShardSpec actualSpec = shardSpecLookups.get(bucketStart)
final ShardSpec actualSpec = shardSpecLookups.get(bucketStart.getMillis())
.getShardSpec(
rollupGran.truncate(inputRow.getTimestampFromEpoch()),
inputRow
);
final HadoopyShardSpec hadoopyShardSpec = hadoopShardSpecLookup.get(bucketStart).get(actualSpec);
final HadoopyShardSpec hadoopyShardSpec = hadoopShardSpecLookup.get(bucketStart.getMillis()).get(actualSpec);

return Optional.of(
new Bucket(
@@ -452,7 +452,7 @@ public Optional<Iterable<Bucket>> getAllBuckets()
public Iterable<Bucket> apply(Interval input)
{
final DateTime bucketTime = input.getStart();
final List<HadoopyShardSpec> specs = schema.getTuningConfig().getShardSpecs().get(bucketTime);
final List<HadoopyShardSpec> specs = schema.getTuningConfig().getShardSpecs().get(bucketTime.getMillis());
if (specs == null) {
return ImmutableList.of();
}
@@ -39,7 +39,7 @@
public class HadoopTuningConfig implements TuningConfig
{
private static final PartitionsSpec DEFAULT_PARTITIONS_SPEC = HashedPartitionsSpec.makeDefaultHashedPartitionsSpec();
private static final Map<DateTime, List<HadoopyShardSpec>> DEFAULT_SHARD_SPECS = ImmutableMap.of();
private static final Map<Long, List<HadoopyShardSpec>> DEFAULT_SHARD_SPECS = ImmutableMap.of();
private static final IndexSpec DEFAULT_INDEX_SPEC = new IndexSpec();
private static final int DEFAULT_ROW_FLUSH_BOUNDARY = 75000;
private static final boolean DEFAULT_USE_COMBINER = false;
@@ -73,7 +73,7 @@ public static HadoopTuningConfig makeDefaultTuningConfig()
private final String workingPath;
private final String version;
private final PartitionsSpec partitionsSpec;
private final Map<DateTime, List<HadoopyShardSpec>> shardSpecs;
private final Map<Long, List<HadoopyShardSpec>> shardSpecs;
private final IndexSpec indexSpec;
private final int rowFlushBoundary;
private final boolean leaveIntermediate;
@@ -93,7 +93,7 @@ public HadoopTuningConfig(
final @JsonProperty("workingPath") String workingPath,
final @JsonProperty("version") String version,
final @JsonProperty("partitionsSpec") PartitionsSpec partitionsSpec,
final @JsonProperty("shardSpecs") Map<DateTime, List<HadoopyShardSpec>> shardSpecs,
final @JsonProperty("shardSpecs") Map<Long, List<HadoopyShardSpec>> shardSpecs,
final @JsonProperty("indexSpec") IndexSpec indexSpec,
final @JsonProperty("maxRowsInMemory") Integer maxRowsInMemory,
final @JsonProperty("leaveIntermediate") boolean leaveIntermediate,
@@ -156,7 +156,7 @@ public PartitionsSpec getPartitionsSpec()
}

@JsonProperty
public Map<DateTime, List<HadoopyShardSpec>> getShardSpecs()
public Map<Long, List<HadoopyShardSpec>> getShardSpecs()
{
return shardSpecs;
}
@@ -287,7 +287,7 @@ public HadoopTuningConfig withVersion(String ver)
);
}

public HadoopTuningConfig withShardSpecs(Map<DateTime, List<HadoopyShardSpec>> specs)
public HadoopTuningConfig withShardSpecs(Map<Long, List<HadoopyShardSpec>> specs)
{
return new HadoopTuningConfig(
workingPath,
@@ -74,7 +74,7 @@ public Job addInputPaths(HadoopDruidIndexerConfig config, Job job) throws IOExce
final FileSystem fs = betaInput.getFileSystem(job.getConfiguration());
final Granularity segmentGranularity = config.getGranularitySpec().getSegmentGranularity();

Map<DateTime, Long> inputModifiedTimes = new TreeMap<>(
Map<Long, Long> inputModifiedTimes = new TreeMap<>(
Comparators.inverse(Comparators.comparable())
);

@@ -83,12 +83,12 @@ public Job addInputPaths(HadoopDruidIndexerConfig config, Job job) throws IOExce
final Long currVal = inputModifiedTimes.get(key);
final long mTime = status.getModificationTime();

inputModifiedTimes.put(key, currVal == null ? mTime : Math.max(currVal, mTime));
inputModifiedTimes.put(key.getMillis(), currVal == null ? mTime : Math.max(currVal, mTime));
}

Set<Interval> bucketsToRun = Sets.newTreeSet(Comparators.intervals());
for (Map.Entry<DateTime, Long> entry : inputModifiedTimes.entrySet()) {
DateTime timeBucket = entry.getKey();
for (Map.Entry<Long, Long> entry : inputModifiedTimes.entrySet()) {
DateTime timeBucket = new DateTime(entry.getKey());
long mTime = entry.getValue();

String bucketOutput = String.format(
@@ -392,8 +392,8 @@ private HadoopDruidIndexerConfig makeHadoopDruidIndexerConfig(Map<String, Object
);

config.setShardSpecs(
ImmutableMap.<DateTime, List<HadoopyShardSpec>>of(
INTERVAL_FULL.getStart(),
ImmutableMap.<Long, List<HadoopyShardSpec>>of(
INTERVAL_FULL.getStartMillis(),
ImmutableList.of(
new HadoopyShardSpec(
new HashBasedNumberedShardSpec(0, 1, null, HadoopDruidIndexerConfig.JSON_MAPPER),
@@ -33,7 +33,6 @@
import io.druid.query.aggregation.DoubleSumAggregatorFactory;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Test;
@@ -175,13 +174,13 @@ public DetermineHashedPartitionsJobTest(String dataFilePath, long targetPartitio
public void testDetermineHashedPartitions(){
DetermineHashedPartitionsJob determineHashedPartitionsJob = new DetermineHashedPartitionsJob(indexerConfig);
determineHashedPartitionsJob.run();
Map<DateTime, List<HadoopyShardSpec>> shardSpecs = indexerConfig.getSchema().getTuningConfig().getShardSpecs();
Map<Long, List<HadoopyShardSpec>> shardSpecs = indexerConfig.getSchema().getTuningConfig().getShardSpecs();
Assert.assertEquals(
expectedNumTimeBuckets,
shardSpecs.entrySet().size()
);
int i=0;
for(Map.Entry<DateTime, List<HadoopyShardSpec>> entry : shardSpecs.entrySet()) {
for(Map.Entry<Long, List<HadoopyShardSpec>> entry : shardSpecs.entrySet()) {
Assert.assertEquals(
expectedNumOfShards[i++],
entry.getValue().size(),
@@ -35,7 +35,6 @@
import io.druid.segment.indexing.granularity.UniformGranularitySpec;
import io.druid.timeline.partition.SingleDimensionShardSpec;
import org.apache.commons.io.FileUtils;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.junit.After;
import org.junit.Assert;
@@ -285,7 +284,7 @@ public void testPartitionJob()
int segmentNum = 0;
Assert.assertEquals(expectedNumOfSegments, config.getSchema().getTuningConfig().getShardSpecs().size());

for (Map.Entry<DateTime, List<HadoopyShardSpec>> entry : config.getSchema()
for (Map.Entry<Long, List<HadoopyShardSpec>> entry : config.getSchema()
.getTuningConfig()
.getShardSpecs()
.entrySet()) {
@@ -214,7 +214,7 @@ public void testHashedBucketSelection()
null,
null,
null,
ImmutableMap.of(new DateTime("2010-01-01T01:00:00"), specs),
ImmutableMap.of(new DateTime("2010-01-01T01:00:00").getMillis(), specs),
null,
null,
false,
@@ -276,12 +276,12 @@ public void testNoneShardSpecBucketSelection()
null,
null,
null,
ImmutableMap.<DateTime, List<HadoopyShardSpec>>of(new DateTime("2010-01-01T01:00:00"),
ImmutableMap.<Long, List<HadoopyShardSpec>>of(new DateTime("2010-01-01T01:00:00").getMillis(),
Lists.newArrayList(new HadoopyShardSpec(
NoneShardSpec.instance(),
1
)),
new DateTime("2010-01-01T02:00:00"),
new DateTime("2010-01-01T02:00:00").getMillis(),
Lists.newArrayList(new HadoopyShardSpec(
NoneShardSpec.instance(),
2