in coordinator db polling for available segments, ignore corrupted entries in the segments table so that the coordinator continues to load new segments even if there are a few corrupted segment entries
himanshug committed Mar 7, 2016
1 parent 4fa08a1 commit 1288784
Showing 3 changed files with 80 additions and 16 deletions.
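The heart of the change is in SQLMetadataSegmentManager's JDBI row mapper: instead of rethrowing a deserialization failure as a SQLException, which aborted the whole poll, a corrupt row now maps to null and is filtered out by the caller, so one bad payload no longer blocks every other segment from loading. Condensed from the diff below:

  @Override
  public DataSegment map(int index, ResultSet r, StatementContext ctx) throws SQLException
  {
    try {
      // the payload column holds a JSON-serialized DataSegment
      return jsonMapper.readValue(r.getBytes("payload"), DataSegment.class);
    }
    catch (IOException e) {
      // alert instead of failing the poll; the caller drops the null
      log.makeAlert(e, "Failed to read segment from db.");
      return null;
    }
  }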
io/druid/metadata/SQLMetadataSegmentManager.java
@@ -20,8 +20,10 @@
 package io.druid.metadata;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Predicates;
 import com.google.common.base.Supplier;
 import com.google.common.base.Throwables;
+import com.google.common.collect.Collections2;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Ordering;
@@ -431,7 +433,7 @@ public void poll()
 
     log.debug("Starting polling of segment table");
 
-    List<DataSegment> segments = dbi.withHandle(
+    final List<DataSegment> segments = dbi.withHandle(
         new HandleCallback<List<DataSegment>>()
         {
          @Override
@@ -451,7 +453,8 @@ public DataSegment map(int index, ResultSet r, StatementContext ctx)
               return jsonMapper.readValue(r.getBytes("payload"), DataSegment.class);
             }
             catch (IOException e) {
-              throw new SQLException(e);
+              log.makeAlert(e, "Failed to read segment from db.");
+              return null;
             }
           }
         }
@@ -466,9 +469,13 @@ public DataSegment map(int index, ResultSet r, StatementContext ctx)
       return;
     }
 
+    final Collection<DataSegment> segmentsFinal = Collections2.filter(
+        segments, Predicates.notNull()
+    );
+
     log.info("Polled and found %,d segments in the database", segments.size());
 
-    for (final DataSegment segment : segments) {
+    for (final DataSegment segment : segmentsFinal) {
       String datasourceName = segment.getDataSource();
 
       DruidDataSource dataSource = newDataSources.get(datasourceName);
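For reference, Guava's Collections2.filter used above returns a live, lazily evaluated view rather than a copy, so dropping the null (corrupt) entries adds no per-poll allocation. A standalone illustration of the pattern, with made-up data rather than anything from the commit:

  import com.google.common.base.Predicates;
  import com.google.common.collect.Collections2;

  import java.util.Arrays;
  import java.util.Collection;
  import java.util.List;

  public class FilterNullsExample
  {
    public static void main(String[] args)
    {
      // a corrupt row deserializes to null; healthy rows pass the filter
      List<String> polled = Arrays.asList("segment-1", null, "segment-3");
      Collection<String> usable = Collections2.filter(polled, Predicates.notNull());
      System.out.println(usable);        // [segment-1, segment-3]
      System.out.println(usable.size()); // 2
    }
  }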
io/druid/metadata/SQLMetadataSegmentPublisher.java
@@ -22,6 +22,7 @@
 package io.druid.metadata;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.inject.Inject;
 import com.metamx.common.logger.Logger;
 import io.druid.timeline.DataSegment;
@@ -63,6 +64,32 @@ public SQLMetadataSegmentPublisher(
 
   @Override
   public void publishSegment(final DataSegment segment) throws IOException
   {
+    publishSegment(
+        segment.getIdentifier(),
+        segment.getDataSource(),
+        new DateTime().toString(),
+        segment.getInterval().getStart().toString(),
+        segment.getInterval().getEnd().toString(),
+        (segment.getShardSpec() instanceof NoneShardSpec) ? false : true,
+        segment.getVersion(),
+        true,
+        jsonMapper.writeValueAsBytes(segment)
+    );
+  }
+
+  @VisibleForTesting
+  void publishSegment(
+      final String identifier,
+      final String dataSource,
+      final String createdDate,
+      final String start,
+      final String end,
+      final boolean partitioned,
+      final String version,
+      final boolean used,
+      final byte[] payload
+  )
+  {
     try {
       final DBI dbi = connector.getDBI();
@@ -75,14 +102,14 @@ public List<Map<String, Object>> withHandle(Handle handle) throws Exception
             return handle.createQuery(
                 String.format("SELECT id FROM %s WHERE id=:id", config.getSegmentsTable())
             )
-                .bind("id", segment.getIdentifier())
+                .bind("id", identifier)
                 .list();
           }
         }
     );
 
     if (!exists.isEmpty()) {
-      log.info("Found [%s] in DB, not updating DB", segment.getIdentifier());
+      log.info("Found [%s] in DB, not updating DB", identifier);
       return;
     }
 
@@ -93,15 +120,15 @@ public List<Map<String, Object>> withHandle(Handle handle) throws Exception
           public Void withHandle(Handle handle) throws Exception
           {
             handle.createStatement(statement)
-                .bind("id", segment.getIdentifier())
-                .bind("dataSource", segment.getDataSource())
-                .bind("created_date", new DateTime().toString())
-                .bind("start", segment.getInterval().getStart().toString())
-                .bind("end", segment.getInterval().getEnd().toString())
-                .bind("partitioned", (segment.getShardSpec() instanceof NoneShardSpec) ? false : true)
-                .bind("version", segment.getVersion())
-                .bind("used", true)
-                .bind("payload", jsonMapper.writeValueAsBytes(segment))
+                .bind("id", identifier)
+                .bind("dataSource", dataSource)
+                .bind("created_date", createdDate)
+                .bind("start", start)
+                .bind("end", end)
+                .bind("partitioned", partitioned)
+                .bind("version", version)
+                .bind("used", used)
+                .bind("payload", payload)
                 .execute();
 
             return null;
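The publisher refactor above changes no behavior: the public publishSegment(DataSegment) now delegates to a package-private overload marked @VisibleForTesting that takes raw column values, which is what lets a test insert a row whose payload bytes are not valid JSON. A sketch of such a call, mirroring the new test below:

  // the payload is deliberately not a JSON-serialized DataSegment, so the
  // coordinator's poll must skip this row instead of failing
  publisher.publishSegment(
      "corrupt-segment-id",
      "corrupt-datasource",
      "corrupt-create-date",
      "corrupt-start-date",
      "corrupt-end-date",
      true,              // partitioned
      "corrupt-version",
      true,              // used
      "corrupt-payload".getBytes()
  );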
io/druid/metadata/MetadataSegmentManagerTest.java
@@ -24,7 +24,10 @@
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.metamx.emitter.EmittingLogger;
 import io.druid.jackson.DefaultObjectMapper;
+import io.druid.server.metrics.NoopServiceEmitter;
 import io.druid.timeline.DataSegment;
 import io.druid.timeline.partition.NoneShardSpec;
 import org.joda.time.Interval;
@@ -40,6 +43,7 @@ public class MetadataSegmentManagerTest
   public final TestDerbyConnector.DerbyConnectorRule derbyConnectorRule = new TestDerbyConnector.DerbyConnectorRule();
 
   private SQLMetadataSegmentManager manager;
+  private SQLMetadataSegmentPublisher publisher;
   private final ObjectMapper jsonMapper = new DefaultObjectMapper();
 
   private final DataSegment segment1 = new DataSegment(
@@ -78,15 +82,14 @@ public class MetadataSegmentManagerTest
   public void setUp() throws Exception
   {
     TestDerbyConnector connector = derbyConnectorRule.getConnector();
-
     manager = new SQLMetadataSegmentManager(
         jsonMapper,
         Suppliers.ofInstance(new MetadataSegmentManagerConfig()),
         derbyConnectorRule.metadataTablesConfigSupplier(),
         connector
     );
 
-    SQLMetadataSegmentPublisher publisher = new SQLMetadataSegmentPublisher(
+    publisher = new SQLMetadataSegmentPublisher(
         jsonMapper,
         derbyConnectorRule.metadataTablesConfigSupplier().get(),
         connector
@@ -114,6 +117,33 @@ public void testPoll()
     manager.stop();
   }
 
+  @Test
+  public void testPollWithCorruptedSegment()
+  {
+    // create a corrupted segment entry in the segments table to verify that
+    // loading segments from the database continues to work even if one of
+    // the entries is corrupted
+    publisher.publishSegment(
+        "corrupt-segment-id",
+        "corrupt-datasource",
+        "corrupt-create-date",
+        "corrupt-start-date",
+        "corrupt-end-date",
+        true,
+        "corrupt-version",
+        true,
+        "corrupt-payload".getBytes()
+    );
+
+    EmittingLogger.registerEmitter(new NoopServiceEmitter());
+    manager.start();
+    manager.poll();
+
+    Assert.assertEquals(
+        "wikipedia", Iterables.getOnlyElement(manager.getInventory()).getName()
+    );
+  }
+
   @Test
   public void testGetUnusedSegmentsForInterval() throws Exception
   {
