attempt to fix-2906 (apache#2985)
* attempt to fix-2984

* review comments

* Add test
nishantmonu51 authored and himanshug committed May 18, 2016
1 parent a8c2f07 commit 778f97a
Showing 3 changed files with 84 additions and 7 deletions.
7 changes: 7 additions & 0 deletions indexing-hadoop/src/main/java/io/druid/indexer/Bucket.java
@@ -19,6 +19,7 @@

package io.druid.indexer;

+import com.google.common.annotations.VisibleForTesting;
import com.metamx.common.Pair;
import org.joda.time.DateTime;

@@ -121,4 +122,10 @@ public static final Pair<Bucket, byte[]> fromGroupKey(byte[] keyBytes)

return Pair.of(bucket, bytesLeft);
}

+  @VisibleForTesting
+  protected int getShardNum()
+  {
+    return shardNum;
+  }
}
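The new accessor simply exposes Bucket's shardNum to tests. The test added below exercises it roughly like this (sketch; config, ts1, dims, and values come from the test setup further down):

    Bucket bucket = config.getBucket(new MapBasedInputRow(ts1, dims, values)).get();
    Assert.assertEquals(bucket.getShardNum(), 1); // shardNum of the HadoopyShardSpec registered for that time bucket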
indexing-hadoop/src/main/java/io/druid/indexer/HadoopDruidIndexerConfig.java
@@ -212,8 +212,8 @@ public static HadoopDruidIndexerConfig fromConfiguration(Configuration conf)

private volatile HadoopIngestionSpec schema;
private volatile PathSpec pathSpec;
-  private volatile Map<DateTime, ShardSpecLookup> shardSpecLookups = Maps.newHashMap();
-  private volatile Map<ShardSpec, HadoopyShardSpec> hadoopShardSpecLookup = Maps.newHashMap();
+  private final Map<DateTime, ShardSpecLookup> shardSpecLookups = Maps.newHashMap();
+  private final Map<DateTime, Map<ShardSpec, HadoopyShardSpec>> hadoopShardSpecLookup = Maps.newHashMap();
private final QueryGranularity rollupGran;

@JsonCreator
@@ -242,9 +242,13 @@ public ShardSpec apply(HadoopyShardSpec input)
)
)
);

+      Map<ShardSpec, HadoopyShardSpec> innerHadoopShardSpecLookup = Maps.newHashMap();
      for (HadoopyShardSpec hadoopyShardSpec : entry.getValue()) {
-        hadoopShardSpecLookup.put(hadoopyShardSpec.getActualSpec(), hadoopyShardSpec);
+        innerHadoopShardSpecLookup.put(hadoopyShardSpec.getActualSpec(), hadoopyShardSpec);
      }
+      hadoopShardSpecLookup.put(entry.getKey(), innerHadoopShardSpecLookup);

}
this.rollupGran = spec.getDataSchema().getGranularitySpec().getQueryGranularity();
}
@@ -393,18 +397,18 @@ public Optional<Bucket> getBucket(InputRow inputRow)
if (!timeBucket.isPresent()) {
return Optional.absent();
}

-    final ShardSpec actualSpec = shardSpecLookups.get(timeBucket.get().getStart())
+    final DateTime bucketStart = timeBucket.get().getStart();
+    final ShardSpec actualSpec = shardSpecLookups.get(bucketStart)
.getShardSpec(
rollupGran.truncate(inputRow.getTimestampFromEpoch()),
inputRow
);
-    final HadoopyShardSpec hadoopyShardSpec = hadoopShardSpecLookup.get(actualSpec);
+    final HadoopyShardSpec hadoopyShardSpec = hadoopShardSpecLookup.get(bucketStart).get(actualSpec);

return Optional.of(
new Bucket(
hadoopyShardSpec.getShardNum(),
-            timeBucket.get().getStart(),
+            bucketStart,
actualSpec.getPartitionNum()
)
);
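Why the extra map level matters: NoneShardSpec.instance() hands back a shared instance (and NoneShardSpecs compare equal regardless), so every time bucket that uses it presents the same ShardSpec key. In the old flat Map<ShardSpec, HadoopyShardSpec>, each bucket's put overwrote the previous one, and getBucket could then return another bucket's shardNum. A minimal, self-contained sketch of the collision in plain Java (stand-in key and value types, not the real Druid classes):

    import java.util.HashMap;
    import java.util.Map;

    public class FlatLookupCollision
    {
      // Stand-in for NoneShardSpec.instance(): one shared key object, equal across buckets.
      private static final Object NONE_SPEC = new Object();

      public static void main(String[] args)
      {
        // Old layout: keyed by the spec alone, so the second put clobbers the first.
        Map<Object, Integer> flat = new HashMap<>();
        flat.put(NONE_SPEC, 1); // bucket 2010-01-01T01:00, shardNum 1
        flat.put(NONE_SPEC, 2); // bucket 2010-01-01T02:00 overwrites it
        System.out.println(flat.get(NONE_SPEC)); // prints 2; shardNum 1 is unreachable

        // New layout: keyed by bucket start first, mirroring the fix above.
        Map<String, Map<Object, Integer>> nested = new HashMap<>();
        nested.computeIfAbsent("2010-01-01T01:00", k -> new HashMap<>()).put(NONE_SPEC, 1);
        nested.computeIfAbsent("2010-01-01T02:00", k -> new HashMap<>()).put(NONE_SPEC, 2);
        System.out.println(nested.get("2010-01-01T01:00").get(NONE_SPEC)); // prints 1
        System.out.println(nested.get("2010-01-01T02:00").get(NONE_SPEC)); // prints 2
      }
    }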
indexing-hadoop/src/test/java/io/druid/indexer/HadoopDruidIndexerConfigTest.java
@@ -34,6 +34,7 @@
import io.druid.segment.indexing.granularity.UniformGranularitySpec;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.HashBasedNumberedShardSpec;
+import io.druid.timeline.partition.NoneShardSpec;
import io.druid.timeline.partition.NumberedShardSpec;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
@@ -252,4 +253,69 @@ public void testHashedBucketSelection()
}

}

+  @Test
+  public void testNoneShardSpecBucketSelection()
+  {
+    HadoopIngestionSpec spec = new HadoopIngestionSpec(
+        new DataSchema(
+            "foo",
+            null,
+            new AggregatorFactory[0],
+            new UniformGranularitySpec(
+                Granularity.MINUTE,
+                QueryGranularities.MINUTE,
+                ImmutableList.of(new Interval("2010-01-01/P1D"))
+            ),
+            jsonMapper
+        ),
+        new HadoopIOConfig(ImmutableMap.<String, Object>of("paths", "bar", "type", "static"), null, null),
+        new HadoopTuningConfig(
+            null,
+            null,
+            null,
+            ImmutableMap.<DateTime, List<HadoopyShardSpec>>of(
+                new DateTime("2010-01-01T01:00:00"),
+                Lists.newArrayList(new HadoopyShardSpec(
+                    NoneShardSpec.instance(),
+                    1
+                )),
+                new DateTime("2010-01-01T02:00:00"),
+                Lists.newArrayList(new HadoopyShardSpec(
+                    NoneShardSpec.instance(),
+                    2
+                ))
+            ),
+            null,
+            null,
+            false,
+            false,
+            false,
+            false,
+            null,
+            false,
+            false,
+            null,
+            null,
+            null
+        )
+    );
+    HadoopDruidIndexerConfig config = HadoopDruidIndexerConfig.fromSpec(spec);
+    final List<String> dims = Arrays.asList("diM1", "dIM2");
+    final ImmutableMap<String, Object> values = ImmutableMap.<String, Object>of(
+        "Dim1",
+        "1",
+        "DiM2",
+        "2",
+        "dim1",
+        "3",
+        "dim2",
+        "4"
+    );
+    final long ts1 = new DateTime("2010-01-01T01:00:01").getMillis();
+    Assert.assertEquals(config.getBucket(new MapBasedInputRow(ts1, dims, values)).get().getShardNum(), 1);
+
+    final long ts2 = new DateTime("2010-01-01T02:00:01").getMillis();
+    Assert.assertEquals(config.getBucket(new MapBasedInputRow(ts2, dims, values)).get().getShardNum(), 2);
+  }
}
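For reference: the spec above uses MINUTE segment granularity, so the row at 2010-01-01T01:00:01 falls into the bucket starting 2010-01-01T01:00:00 and the row at 02:00:01 into the 02:00:00 bucket, and each assertion therefore reads a different inner map of the new lookup. Before this change, both rows resolved through the same NoneShardSpec key in the flat map and came back with the same shardNum (whichever bucket's HadoopyShardSpec was registered last).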
