Skip to content

Commit

Permalink
SpecificSegmentQueryRunner misses missing segments from toYielder() (a…
Browse files Browse the repository at this point in the history
  • Loading branch information
navis authored and fjy committed Oct 30, 2016
1 parent 23a8e22 commit 3fca3be
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.druid.java.util.common.guava.Accumulator;
import io.druid.java.util.common.guava.Sequence;
import io.druid.java.util.common.guava.Yielder;
import io.druid.java.util.common.guava.Yielders;
import io.druid.java.util.common.guava.YieldingAccumulator;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
Expand Down Expand Up @@ -87,12 +88,7 @@ public OutType call() throws Exception
return baseSequence.accumulate(initValue, accumulator);
}
catch (SegmentMissingException e) {
List<SegmentDescriptor> missingSegments = (List<SegmentDescriptor>) responseContext.get(Result.MISSING_SEGMENTS_KEY);
if (missingSegments == null) {
missingSegments = Lists.newArrayList();
responseContext.put(Result.MISSING_SEGMENTS_KEY, missingSegments);
}
missingSegments.add(specificSpec.getDescriptor());
appendMissingSegment(responseContext);
return initValue;
}
}
Expand All @@ -112,7 +108,13 @@ public <OutType> Yielder<OutType> toYielder(
@Override
public Yielder<OutType> call() throws Exception
{
return makeYielder(baseSequence.toYielder(initValue, accumulator));
try {
return makeYielder(baseSequence.toYielder(initValue, accumulator));
}
catch (SegmentMissingException e) {
appendMissingSegment(responseContext);
return Yielders.done(initValue, null);
}
}
}
);
Expand Down Expand Up @@ -164,6 +166,16 @@ private <RetType> RetType doItNamed(Callable<RetType> toRun)
};
}

private void appendMissingSegment(Map<String, Object> responseContext)
{
List<SegmentDescriptor> missingSegments = (List<SegmentDescriptor>) responseContext.get(Result.MISSING_SEGMENTS_KEY);
if (missingSegments == null) {
missingSegments = Lists.newArrayList();
responseContext.put(Result.MISSING_SEGMENTS_KEY, missingSegments);
}
missingSegments.add(specificSpec.getDescriptor());
}

private <RetType> RetType doNamed(Thread currThread, String currName, String newName, Callable<RetType> toRun)
{
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public Yielder<Object> toYielder(
Object initValue, YieldingAccumulator accumulator
)
{
return null;
throw new SegmentMissingException("FAILSAUCE");
}
};

Expand All @@ -94,7 +94,8 @@ public Yielder<Object> toYielder(
)
);

final Map<String, Object> responseContext = Maps.newHashMap();
// from accumulate
Map<String, Object> responseContext = Maps.newHashMap();
TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
.dataSource("foo")
.granularity(QueryGranularities.ALL)
Expand All @@ -105,24 +106,26 @@ public Yielder<Object> toYielder(
)
)
.build();
Sequence results = queryRunner.run(
query,
responseContext
);
Sequence results = queryRunner.run(query, responseContext);
Sequences.toList(results, Lists.newArrayList());
validate(mapper, descriptor, responseContext);

Object missingSegments = responseContext.get(Result.MISSING_SEGMENTS_KEY);

Assert.assertTrue(missingSegments != null);
Assert.assertTrue(missingSegments instanceof List);

Object segmentDesc = ((List) missingSegments).get(0);

Assert.assertTrue(segmentDesc instanceof SegmentDescriptor);

SegmentDescriptor newDesc = mapper.readValue(mapper.writeValueAsString(segmentDesc), SegmentDescriptor.class);

Assert.assertEquals(descriptor, newDesc);
// from toYielder
responseContext = Maps.newHashMap();
results = queryRunner.run(query, responseContext);
results.toYielder(
null, new YieldingAccumulator()
{
final List lists = Lists.newArrayList();
@Override
public Object accumulate(Object accumulated, Object in)
{
lists.add(in);
return in;
}
}
);
validate(mapper, descriptor, responseContext);
}

@SuppressWarnings("unchecked")
Expand Down Expand Up @@ -195,6 +198,12 @@ public void run()

Assert.assertTrue(1L == theVal.getValue().getLongMetric("rows"));

validate(mapper, descriptor, responseContext);
}

private void validate(ObjectMapper mapper, SegmentDescriptor descriptor, Map<String, Object> responseContext)
throws java.io.IOException
{
Object missingSegments = responseContext.get(Result.MISSING_SEGMENTS_KEY);

Assert.assertTrue(missingSegments != null);
Expand Down

0 comments on commit 3fca3be

Please sign in to comment.