Simple improvement for realtime manager (apache#4377)
* Simple improvement for realtime manager

* Address comments

* tmp

* Address comments and add more tests

* Add catch for InterruptedException

* Address comments
jihoonson authored and fjy committed Jun 12, 2017
1 parent 113b800 commit 073695d
Showing 2 changed files with 308 additions and 227 deletions.
235 changes: 112 additions & 123 deletions server/src/main/java/io/druid/segment/realtime/RealtimeManager.java
@@ -22,18 +22,21 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.inject.Inject;
import com.metamx.emitter.EmittingLogger;
import io.druid.concurrent.Execs;
import io.druid.data.input.Committer;
import io.druid.data.input.Firehose;
import io.druid.data.input.FirehoseV2;
import io.druid.data.input.InputRow;
import io.druid.java.util.common.guava.CloseQuietly;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.io.Closer;
import io.druid.java.util.common.lifecycle.LifecycleStart;
import io.druid.java.util.common.lifecycle.LifecycleStop;
import io.druid.query.FinalizeResultsQueryRunner;
@@ -54,11 +57,12 @@
import io.druid.server.coordination.DataSegmentServerAnnouncer;
import org.joda.time.Interval;

import java.io.Closeable;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

/**
*/
@@ -75,6 +79,9 @@ public class RealtimeManager implements QuerySegmentWalker
*/
private final Map<String, Map<Integer, FireChief>> chiefs;

private ExecutorService fireChiefExecutor;
private boolean stopping;

@Inject
public RealtimeManager(
List<FireDepartment> fireDepartments,
@@ -99,43 +106,46 @@ public RealtimeManager(
this.chiefs = chiefs == null ? Maps.newHashMap() : Maps.newHashMap(chiefs);
}

@VisibleForTesting
Map<Integer, FireChief> getFireChiefs(String dataSource)
{
return chiefs.get(dataSource);
}

@LifecycleStart
public void start() throws IOException
{
serverAnnouncer.announce();

fireChiefExecutor = Execs.multiThreaded(fireDepartments.size(), "chief-%d");

for (final FireDepartment fireDepartment : fireDepartments) {
final DataSchema schema = fireDepartment.getDataSchema();

final FireChief chief = new FireChief(fireDepartment, conglomerate);
Map<Integer, FireChief> partitionChiefs = chiefs.get(schema.getDataSource());
if (partitionChiefs == null) {
partitionChiefs = new HashMap<>();
chiefs.put(schema.getDataSource(), partitionChiefs);
}
partitionChiefs.put(fireDepartment.getTuningConfig().getShardSpec().getPartitionNum(), chief);

chief.setName(
String.format(
"chief-%s[%s]",
schema.getDataSource(),
fireDepartment.getTuningConfig().getShardSpec().getPartitionNum()
)
);
chief.setDaemon(true);
chief.start();
chiefs.computeIfAbsent(schema.getDataSource(), k -> new HashMap<>())
.put(fireDepartment.getTuningConfig().getShardSpec().getPartitionNum(), chief);

fireChiefExecutor.submit(chief);
}
}

@LifecycleStop
public void stop()
{
for (Map<Integer, FireChief> chiefs : this.chiefs.values()) {
for (FireChief chief : chiefs.values()) {
CloseQuietly.close(chief);
stopping = true;
try {
if (fireChiefExecutor != null) {
fireChiefExecutor.shutdownNow();
Preconditions.checkState(
fireChiefExecutor.awaitTermination(10, TimeUnit.SECONDS),
"persistExecutor not terminated"
);
}
}

catch (InterruptedException e) {
throw new ISE(e, "Failed to shutdown fireChiefExecutor during stop()");
}
serverAnnouncer.unannounce();
}

@@ -211,79 +221,55 @@ public QueryRunner<T> apply(SegmentDescriptor spec)
);
}

static class FireChief extends Thread implements Closeable
class FireChief implements Runnable
{
private final FireDepartment fireDepartment;
private final FireDepartmentMetrics metrics;
private final RealtimeTuningConfig config;
private final QueryRunnerFactoryConglomerate conglomerate;

private volatile Firehose firehose = null;
private volatile FirehoseV2 firehoseV2 = null;
private volatile Plumber plumber = null;
private volatile boolean normalExit = true;
private Plumber plumber;

public FireChief(FireDepartment fireDepartment, QueryRunnerFactoryConglomerate conglomerate)
FireChief(FireDepartment fireDepartment, QueryRunnerFactoryConglomerate conglomerate)
{
this.fireDepartment = fireDepartment;
this.conglomerate = conglomerate;
this.config = fireDepartment.getTuningConfig();
this.metrics = fireDepartment.getMetrics();
}

public Firehose initFirehose()
private Firehose initFirehose()
{
synchronized (this) {
if (firehose == null) {
try {
log.info("Calling the FireDepartment and getting a Firehose.");
firehose = fireDepartment.connect();
log.info("Firehose acquired!");
}
catch (IOException e) {
throw Throwables.propagate(e);
}
} else {
log.warn("Firehose already connected, skipping initFirehose().");
}

return firehose;
try {
log.info("Calling the FireDepartment and getting a Firehose.");
return fireDepartment.connect();
}
catch (IOException e) {
throw Throwables.propagate(e);
}
}

public FirehoseV2 initFirehoseV2(Object metaData)
private FirehoseV2 initFirehoseV2(Object metaData)
{
synchronized (this) {
if (firehoseV2 == null) {
try {
log.info("Calling the FireDepartment and getting a FirehoseV2.");
firehoseV2 = fireDepartment.connect(metaData);
log.info("FirehoseV2 acquired!");
}
catch (IOException e) {
throw Throwables.propagate(e);
}
} else {
log.warn("FirehoseV2 already connected, skipping initFirehoseV2().");
}

return firehoseV2;
try {
log.info("Calling the FireDepartment and getting a FirehoseV2.");
return fireDepartment.connect(metaData);
}
catch (IOException e) {
throw Throwables.propagate(e);
}
}

public Plumber initPlumber()
private void initPlumber()
{
synchronized (this) {
if (plumber == null) {
log.info("Someone get us a plumber!");
plumber = fireDepartment.findPlumber();
log.info("We have our plumber!");
} else {
log.warn("Plumber already trained, skipping initPlumber().");
}
log.info("Someone get us a plumber!");
plumber = fireDepartment.findPlumber();
}

return plumber;
}
@VisibleForTesting
Plumber getPlumber()
{
return plumber;
}

public FireDepartmentMetrics getMetrics()
@@ -294,60 +280,69 @@ public FireDepartmentMetrics getMetrics()
@Override
public void run()
{
plumber = initPlumber();
initPlumber();

try {
Object metadata = plumber.startJob();

if (fireDepartment.checkFirehoseV2()) {
firehoseV2 = initFirehoseV2(metadata);
runFirehoseV2(firehoseV2);
} else {
firehose = initFirehose();
runFirehose(firehose);
}
final Closer closer = Closer.create();

try {
Object metadata = plumber.startJob();

Firehose firehose;
FirehoseV2 firehoseV2;
final boolean success;
if (fireDepartment.checkFirehoseV2()) {
firehoseV2 = initFirehoseV2(metadata);
closer.register(firehoseV2);
success = runFirehoseV2(firehoseV2);
} else {
firehose = initFirehose();
closer.register(firehose);
success = runFirehose(firehose);
}
if (success) {
// plumber.finishJob() is called only when all processing has finished successfully.
closer.register(() -> plumber.finishJob());
}
}
catch (InterruptedException e) {
log.warn("Interrupted while running a firehose");
throw closer.rethrow(e);
}
catch (Exception e) {
log.makeAlert(
e,
"[%s] aborted realtime processing[%s]",
e.getClass().getSimpleName(),
fireDepartment.getDataSchema().getDataSource()
).emit();
throw closer.rethrow(e);
}
catch (Error e) {
log.makeAlert(e, "Error aborted realtime processing[%s]", fireDepartment.getDataSchema().getDataSource())
.emit();
throw closer.rethrow(e);
}
finally {
closer.close();
}
}
catch (RuntimeException e) {
log.makeAlert(
e,
"[%s] aborted realtime processing[%s]",
e.getClass().getSimpleName(),
fireDepartment.getDataSchema().getDataSource()
).emit();
normalExit = false;
catch (IOException e) {
throw Throwables.propagate(e);
}
catch (Error e) {
log.makeAlert(e, "Error aborted realtime processing[%s]", fireDepartment.getDataSchema().getDataSource())
.emit();
normalExit = false;
throw e;
}
finally {
CloseQuietly.close(firehose);
if (normalExit) {
plumber.finishJob();
plumber = null;
firehose = null;
}
}
}

private void runFirehoseV2(FirehoseV2 firehose)
private boolean runFirehoseV2(FirehoseV2 firehose) throws Exception
{
try {
firehose.start();
}
catch (Exception e) {
log.error(e, "Failed to start firehoseV2");
return;
}
firehose.start();

log.info("FirehoseV2 started");
final Supplier<Committer> committerSupplier = Committers.supplierFromFirehoseV2(firehose);
boolean haveRow = true;
while (haveRow) {
if (Thread.interrupted() || stopping) {
return false;
}
InputRow inputRow = null;
int numRows = 0;
try {
@@ -378,14 +373,19 @@ private void runFirehoseV2(FirehoseV2 firehose)
metrics.incrementUnparseable();
}
}
return true;
}

private void runFirehose(Firehose firehose)
private boolean runFirehose(Firehose firehose)
{
final Supplier<Committer> committerSupplier = Committers.supplierFromFirehose(firehose);
while (firehose.hasMore()) {
if (Thread.interrupted() || stopping) {
return false;
}
Plumbers.addNextRow(committerSupplier, firehose, plumber, config.isReportParseExceptions(), metrics);
}
return true;
}

public <T> QueryRunner<T> getQueryRunner(Query<T> query)
@@ -395,16 +395,5 @@ public <T> QueryRunner<T> getQueryRunner(Query<T> query)

return new FinalizeResultsQueryRunner<T>(plumber.getQueryRunner(query), toolChest);
}

@Override
public void close() throws IOException
{
synchronized (this) {
if (firehose != null) {
normalExit = false;
firehose.close();
}
}
}
}
}
(Diff for the second changed file is not shown.)
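
The main change visible in this diff is running each FireChief on a shared ExecutorService (instead of a per-chief daemon Thread), shutting it down cooperatively via a stopping flag plus shutdownNow()/awaitTermination(), and letting a Closer own the firehose and plumber cleanup. The sketch below is a minimal, self-contained illustration of that lifecycle pattern, not the Druid code itself; LifecycleSketch, loopTask, runWithCloser, and drain are invented names for the example, and it uses plain java.util.concurrent and Guava's com.google.common.io.Closer rather than Druid's Execs and Closer wrappers.

import com.google.common.io.Closer;

import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class LifecycleSketch
{
  private final List<Runnable> tasks;
  private ExecutorService executor;
  private volatile boolean stopping;

  LifecycleSketch(List<Runnable> tasks)
  {
    this.tasks = tasks;
  }

  public void start()
  {
    // One worker per task, mirroring Execs.multiThreaded(fireDepartments.size(), "chief-%d").
    executor = Executors.newFixedThreadPool(tasks.size());
    for (Runnable task : tasks) {
      executor.submit(loopTask(task));
    }
  }

  public void stop() throws InterruptedException
  {
    stopping = true;          // ask workers to leave their loops cooperatively
    executor.shutdownNow();   // interrupt anything blocked on I/O or sleep
    if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
      throw new IllegalStateException("executor not terminated");
    }
  }

  private Runnable loopTask(Runnable unitOfWork)
  {
    // Analogous to runFirehose(): the worker checks the stopping flag and its own interrupt
    // status, so nobody has to close the firehose out from under it.
    return () -> {
      while (!stopping && !Thread.currentThread().isInterrupted()) {
        unitOfWork.run();
      }
    };
  }

  // Closer-based cleanup, analogous to FireChief.run(): resources are registered as they are
  // acquired, closed in reverse order in the finally block, and exceptions are funneled
  // through rethrow() so failures from close() are not lost.
  static void runWithCloser(Closeable firehose, Closeable finishJob) throws IOException
  {
    final Closer closer = Closer.create();
    try {
      closer.register(firehose);
      boolean success = drain(firehose);   // hypothetical stand-in for runFirehose(V2)
      if (success) {
        closer.register(finishJob);        // finish the job only on a clean exit
      }
    }
    catch (Throwable t) {
      throw closer.rethrow(t);
    }
    finally {
      closer.close();
    }
  }

  private static boolean drain(Closeable firehose)
  {
    return true;  // placeholder: real code would pull rows until the firehose is exhausted
  }
}

Compared with the old Thread-per-chief approach, stop() in this pattern never closes a firehose from another thread; each worker observes the stopping flag or the interrupt and unwinds through its own cleanup path.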
