Skip to content

Commit

Permalink
Add futures to SQLMetadataSegmentManager and SQLMetadataRuleManager
Browse files Browse the repository at this point in the history
  • Loading branch information
drcrallen committed May 15, 2015
1 parent 3c3db72 commit 051c3cc
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 47 deletions.
83 changes: 47 additions & 36 deletions server/src/main/java/io/druid/metadata/SQLMetadataRuleManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,11 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.inject.Inject;
import com.metamx.common.Pair;
import com.metamx.common.concurrent.ScheduledExecutors;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.common.logger.Logger;
Expand All @@ -41,7 +43,6 @@
import io.druid.server.coordinator.rules.ForeverLoadRule;
import io.druid.server.coordinator.rules.Rule;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.skife.jdbi.v2.FoldController;
import org.skife.jdbi.v2.Folder3;
import org.skife.jdbi.v2.Handle;
Expand All @@ -58,7 +59,7 @@
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

/**
Expand Down Expand Up @@ -134,12 +135,13 @@ public Void withHandle(Handle handle) throws Exception
private final AtomicReference<ImmutableMap<String, List<Rule>>> rules;
private final AuditManager auditManager;

private volatile ScheduledExecutorService exec;

private final Object lock = new Object();

private volatile boolean started = false;

private volatile ListeningScheduledExecutorService exec = null;
private volatile ListenableFuture<?> future = null;

@Inject
public SQLMetadataRuleManager(
@Json ObjectMapper jsonMapper,
Expand Down Expand Up @@ -168,21 +170,26 @@ public void start()
return;
}

this.exec = Execs.scheduledSingleThreaded("DatabaseRuleManager-Exec--%d");
exec = MoreExecutors.listeningDecorator(Execs.scheduledSingleThreaded("DatabaseRuleManager-Exec--%d"));

createDefaultRule(dbi, getRulesTable(), config.get().getDefaultRule(), jsonMapper);
ScheduledExecutors.scheduleWithFixedDelay(
exec,
new Duration(0),
config.get().getPollDuration().toStandardDuration(),
future = exec.scheduleWithFixedDelay(
new Runnable()
{
@Override
public void run()
{
poll();
try {
poll();
}
catch (Exception e) {
log.error(e, "uncaught exception in rule manager polling thread");
}
}
}
},
0,
config.get().getPollDuration().toStandardDuration().getMillis(),
TimeUnit.MILLISECONDS
);

started = true;
Expand All @@ -199,6 +206,8 @@ public void stop()

rules.set(ImmutableMap.<String, List<Rule>>of());

future.cancel(false);
future = null;
started = false;
exec.shutdownNow();
exec = null;
Expand Down Expand Up @@ -235,7 +244,9 @@ public Pair<String, List<Rule>> map(int index, ResultSet r, StatementContext ctx
return Pair.of(
r.getString("dataSource"),
jsonMapper.<List<Rule>>readValue(
r.getBytes("payload"), new TypeReference<List<Rule>>(){}
r.getBytes("payload"), new TypeReference<List<Rule>>()
{
}
)
);
}
Expand All @@ -245,29 +256,29 @@ public Pair<String, List<Rule>> map(int index, ResultSet r, StatementContext ctx
}
}
)
.fold(
Maps.<String, List<Rule>>newHashMap(),
new Folder3<Map<String, List<Rule>>, Pair<String, List<Rule>>>()
{
@Override
public Map<String, List<Rule>> fold(
Map<String, List<Rule>> retVal,
Pair<String, List<Rule>> stringObjectMap,
FoldController foldController,
StatementContext statementContext
) throws SQLException
{
try {
String dataSource = stringObjectMap.lhs;
retVal.put(dataSource, stringObjectMap.rhs);
return retVal;
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
}
);
.fold(
Maps.<String, List<Rule>>newHashMap(),
new Folder3<Map<String, List<Rule>>, Pair<String, List<Rule>>>()
{
@Override
public Map<String, List<Rule>> fold(
Map<String, List<Rule>> retVal,
Pair<String, List<Rule>> stringObjectMap,
FoldController foldController,
StatementContext statementContext
) throws SQLException
{
try {
String dataSource = stringObjectMap.lhs;
retVal.put(dataSource, stringObjectMap.rhs);
return retVal;
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
}
);
}
}
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.inject.Inject;
import com.metamx.common.MapUtils;
import com.metamx.common.concurrent.ScheduledExecutors;
import com.metamx.common.lifecycle.LifecycleStart;
import com.metamx.common.lifecycle.LifecycleStop;
import com.metamx.common.logger.Logger;
Expand Down Expand Up @@ -56,7 +58,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

/**
Expand All @@ -74,7 +76,8 @@ public class SQLMetadataSegmentManager implements MetadataSegmentManager
private final AtomicReference<ConcurrentHashMap<String, DruidDataSource>> dataSources;
private final IDBI dbi;

private volatile ScheduledExecutorService exec;
private volatile ListeningScheduledExecutorService exec = null;
private volatile ListenableFuture<?> future = null;

private volatile boolean started = false;

Expand Down Expand Up @@ -103,23 +106,27 @@ public void start()
return;
}

this.exec = Execs.scheduledSingleThreaded("DatabaseSegmentManager-Exec--%d");
exec = MoreExecutors.listeningDecorator(Execs.scheduledSingleThreaded("DatabaseSegmentManager-Exec--%d"));

final Duration delay = config.get().getPollDuration().toStandardDuration();
ScheduledExecutors.scheduleWithFixedDelay(
exec,
new Duration(0),
delay,
future = exec.scheduleWithFixedDelay(
new Runnable()
{
@Override
public void run()
{
poll();
try {
poll();
}
catch (Exception e) {
log.error(e, "uncaught exception in segment manager polling thread");
}
}
}
},
0,
delay.getMillis(),
TimeUnit.MILLISECONDS
);

started = true;
}
}
Expand All @@ -134,6 +141,8 @@ public void stop()

started = false;
dataSources.set(new ConcurrentHashMap<String, DruidDataSource>());
future.cancel(false);
future = null;
exec.shutdownNow();
exec = null;
}
Expand Down

0 comments on commit 051c3cc

Please sign in to comment.