Skip to content

Commit

Permalink
Merge pull request apache#2505 from gianm/rt-exceptions
Browse files Browse the repository at this point in the history
Harmonize realtime indexing loop across the task and standalone nodes.
  • Loading branch information
fjy committed Feb 19, 2016
2 parents 44e925c + 243ac53 commit a3c29b9
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import io.druid.segment.realtime.plumber.Committers;
import io.druid.segment.realtime.plumber.Plumber;
import io.druid.segment.realtime.plumber.PlumberSchool;
import io.druid.segment.realtime.plumber.Plumbers;
import io.druid.segment.realtime.plumber.RealtimePlumberSchool;
import io.druid.segment.realtime.plumber.VersioningPolicy;
import io.druid.server.coordination.DataSegmentAnnouncer;
Expand Down Expand Up @@ -336,31 +337,7 @@ public String getVersion(final Interval interval)

// Time to read data!
while (firehose != null && (!gracefullyStopped || firehoseDrainableByClosing) && firehose.hasMore()) {
final InputRow inputRow;

try {
inputRow = firehose.nextRow();

if (inputRow == null) {
log.debug("thrown away null input row, considering unparseable");
fireDepartment.getMetrics().incrementUnparseable();
continue;
}
}
catch (ParseException e) {
log.debug(e, "thrown away line due to exception, considering unparseable");
fireDepartment.getMetrics().incrementUnparseable();
continue;
}

int numRows = plumber.add(inputRow, committerSupplier);
if (numRows == -1) {
fireDepartment.getMetrics().incrementThrownAway();
log.debug("Throwing away event[%s]", inputRow);
continue;
}

fireDepartment.getMetrics().incrementProcessed();
Plumbers.addNextRow(committerSupplier, firehose, plumber, fireDepartment.getMetrics());
}
}
catch (Throwable e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import io.druid.segment.indexing.RealtimeTuningConfig;
import io.druid.segment.realtime.plumber.Committers;
import io.druid.segment.realtime.plumber.Plumber;
import io.druid.segment.realtime.plumber.Plumbers;
import org.joda.time.Interval;

import java.io.Closeable;
Expand Down Expand Up @@ -339,42 +340,7 @@ private void runFirehose(Firehose firehose)
{
final Supplier<Committer> committerSupplier = Committers.supplierFromFirehose(firehose);
while (firehose.hasMore()) {
final InputRow inputRow;
try {
inputRow = firehose.nextRow();

if (inputRow == null) {
log.debug("thrown away null input row, considering unparseable");
metrics.incrementUnparseable();
continue;
}
}
catch (ParseException e) {
log.debug(e, "thrown away line due to exception, considering unparseable");
metrics.incrementUnparseable();
continue;
}

boolean lateEvent = false;
boolean indexLimitExceeded = false;
try {
lateEvent = plumber.add(inputRow, committerSupplier) == -1;
}
catch (IndexSizeExceededException e) {
log.info("Index limit exceeded: %s", e.getMessage());
indexLimitExceeded = true;
}
if (indexLimitExceeded || lateEvent) {
metrics.incrementThrownAway();
log.debug("Throwing away event[%s]", inputRow);

if (indexLimitExceeded) {
plumber.persist(committerSupplier.get());
}

continue;
}
metrics.incrementProcessed();
Plumbers.addNextRow(committerSupplier, firehose, plumber, metrics);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.druid.segment.realtime.plumber;

import com.google.common.base.Supplier;
import com.metamx.common.ISE;
import com.metamx.common.logger.Logger;
import com.metamx.common.parsers.ParseException;
import io.druid.data.input.Committer;
import io.druid.data.input.Firehose;
import io.druid.data.input.InputRow;
import io.druid.segment.incremental.IndexSizeExceededException;
import io.druid.segment.realtime.FireDepartmentMetrics;

public class Plumbers
{
private static final Logger log = new Logger(Plumbers.class);

private Plumbers()
{
// No instantiation
}

public static void addNextRow(
final Supplier<Committer> committerSupplier,
final Firehose firehose,
final Plumber plumber,
final FireDepartmentMetrics metrics
)
{
try {
final InputRow inputRow = firehose.nextRow();

if (inputRow == null) {
log.debug("Discarded null input row, considering unparseable.");
metrics.incrementUnparseable();
return;
}

// Included in ParseException try/catch, as additional parsing can be done during indexing.
int numRows = plumber.add(inputRow, committerSupplier);

if (numRows == -1) {
metrics.incrementThrownAway();
log.debug("Discarded row[%s], considering thrownAway.", inputRow);
return;
}

metrics.incrementProcessed();
}
catch (ParseException e) {
log.debug(e, "Discarded row due to exception, considering unparseable.");
metrics.incrementUnparseable();
}
catch (IndexSizeExceededException e) {
// Shouldn't happen if this is only being called by a single thread.
// plumber.add should be swapping out indexes before they fill up.
throw new ISE(e, "WTF?! Index size exceeded, this shouldn't happen. Bad Plumber!");
}
}
}

0 comments on commit a3c29b9

Please sign in to comment.