Skip to content

Commit fb6e0f0

Browse files
daschlMichael Nitschinger
authored andcommitted
SPY-132: Adding Callbacks to Futures.
This changeset adds a onComplete callback to futures and executes the callback in a configurable ExecutorService. By default, this is a fixed size thread pool (threads = num of processors), but configurable through the Factory. Change-Id: I516af74918e57521542a0bad2ff7142b75ab7b13 Reviewed-on: http://review.couchbase.org/28025 Tested-by: Michael Nitschinger <[email protected]> Reviewed-by: Matt Ingenthron <[email protected]>
1 parent 17d412d commit fb6e0f0

17 files changed

+813
-29
lines changed

src/main/java/net/spy/memcached/ConnectionFactory.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import java.util.Collection;
3131
import java.util.List;
3232
import java.util.concurrent.BlockingQueue;
33+
import java.util.concurrent.ExecutorService;
3334

3435
import net.spy.memcached.auth.AuthDescriptor;
3536
import net.spy.memcached.metrics.MetricCollector;
@@ -82,6 +83,12 @@ MemcachedNode createMemcachedNode(SocketAddress sa, SocketChannel c,
8283
*/
8384
long getOpQueueMaxBlockTime();
8485

86+
/**
87+
* Get the ExecutorService which is used to asynchronously execute listeners
88+
* on futures.
89+
*/
90+
ExecutorService getListenerExecutorService();
91+
8592
/**
8693
* Create a NodeLocator instance for the given list of nodes.
8794
*/

src/main/java/net/spy/memcached/DefaultConnectionFactory.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@
3232
import java.util.List;
3333
import java.util.concurrent.ArrayBlockingQueue;
3434
import java.util.concurrent.BlockingQueue;
35+
import java.util.concurrent.ExecutorService;
36+
import java.util.concurrent.Executors;
3537
import java.util.concurrent.LinkedBlockingQueue;
3638
import java.util.concurrent.TimeUnit;
3739

@@ -114,6 +116,12 @@ public class DefaultConnectionFactory extends SpyObject implements
114116
*/
115117
public static final MetricType DEFAULT_METRIC_TYPE = MetricType.OFF;
116118

119+
/**
120+
* The ExecutorService in which the listener callbacks will be executed.
121+
*/
122+
public static final ExecutorService DEFAULT_LISTENER_EXECUTOR_SERVICE =
123+
Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
124+
117125
protected final int opQueueLen;
118126
private final int readBufSize;
119127
private final HashAlgorithm hashAlg;
@@ -249,6 +257,10 @@ public long getOpQueueMaxBlockTime() {
249257
return DEFAULT_OP_QUEUE_MAX_BLOCK_TIME;
250258
}
251259

260+
public ExecutorService getListenerExecutorService() {
261+
return DEFAULT_LISTENER_EXECUTOR_SERVICE;
262+
}
263+
252264
/*
253265
* (non-Javadoc)
254266
*

src/main/java/net/spy/memcached/MemcachedClient.java

Lines changed: 21 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
/**
22
* Copyright (C) 2006-2009 Dustin Sallings
3-
* Copyright (C) 2009-2011 Couchbase, Inc.
3+
* Copyright (C) 2009-2013 Couchbase, Inc.
44
*
55
* Permission is hereby granted, free of charge, to any person obtaining a copy
66
* of this software and associated documentation files (the "Software"), to deal
@@ -41,6 +41,7 @@
4141
import java.util.concurrent.ConcurrentMap;
4242
import java.util.concurrent.CountDownLatch;
4343
import java.util.concurrent.ExecutionException;
44+
import java.util.concurrent.ExecutorService;
4445
import java.util.concurrent.Future;
4546
import java.util.concurrent.TimeUnit;
4647
import java.util.concurrent.TimeoutException;
@@ -154,6 +155,8 @@ public class MemcachedClient extends SpyObject implements MemcachedClientIF,
154155

155156
protected final AuthThreadMonitor authMonitor = new AuthThreadMonitor();
156157

158+
protected final ExecutorService executorService;
159+
157160
/**
158161
* Get a memcache client operating on the specified memcached locations.
159162
*
@@ -205,6 +208,7 @@ public MemcachedClient(ConnectionFactory cf, List<InetSocketAddress> addrs)
205208
assert mconn != null : "Connection factory failed to make a connection";
206209
operationTimeout = cf.getOperationTimeout();
207210
authDescriptor = cf.getAuthDescriptor();
211+
executorService = cf.getListenerExecutorService();
208212
if (authDescriptor != null) {
209213
addObserver(this);
210214
}
@@ -292,7 +296,8 @@ private <T> OperationFuture<Boolean> asyncStore(StoreType storeType,
292296
CachedData co = tc.encode(value);
293297
final CountDownLatch latch = new CountDownLatch(1);
294298
final OperationFuture<Boolean> rv =
295-
new OperationFuture<Boolean>(key, latch, operationTimeout);
299+
new OperationFuture<Boolean>(key, latch, operationTimeout,
300+
executorService);
296301
Operation op = opFact.store(storeType, key, co.getFlags(), exp,
297302
co.getData(), new StoreOperation.Callback() {
298303
public void receivedStatus(OperationStatus val) {
@@ -321,7 +326,7 @@ private <T> OperationFuture<Boolean> asyncCat(ConcatenationType catType,
321326
CachedData co = tc.encode(value);
322327
final CountDownLatch latch = new CountDownLatch(1);
323328
final OperationFuture<Boolean> rv = new OperationFuture<Boolean>(key,
324-
latch, operationTimeout);
329+
latch, operationTimeout, executorService);
325330
Operation op = opFact.cat(catType, cas, key, co.getData(),
326331
new OperationCallback() {
327332
public void receivedStatus(OperationStatus val) {
@@ -367,7 +372,8 @@ public <T> OperationFuture<Boolean> touch(final String key, final int exp,
367372
final Transcoder<T> tc) {
368373
final CountDownLatch latch = new CountDownLatch(1);
369374
final OperationFuture<Boolean> rv =
370-
new OperationFuture<Boolean>(key, latch, operationTimeout);
375+
new OperationFuture<Boolean>(key, latch, operationTimeout,
376+
executorService);
371377

372378
Operation op = opFact.touch(key, exp, new OperationCallback() {
373379
public void receivedStatus(OperationStatus status) {
@@ -587,7 +593,8 @@ public <T> OperationFuture<Boolean> prepend(String key, T val,
587593
CachedData co = tc.encode(value);
588594
final CountDownLatch latch = new CountDownLatch(1);
589595
final OperationFuture<CASResponse> rv =
590-
new OperationFuture<CASResponse>(key, latch, operationTimeout);
596+
new OperationFuture<CASResponse>(key, latch, operationTimeout,
597+
executorService);
591598
Operation op = opFact.cas(StoreType.set, key, casId, co.getFlags(), exp,
592599
co.getData(), new StoreOperation.Callback() {
593600
public void receivedStatus(OperationStatus val) {
@@ -947,7 +954,8 @@ public OperationFuture<Boolean> replace(String key, int exp, Object o) {
947954
public <T> GetFuture<T> asyncGet(final String key, final Transcoder<T> tc) {
948955

949956
final CountDownLatch latch = new CountDownLatch(1);
950-
final GetFuture<T> rv = new GetFuture<T>(latch, operationTimeout, key);
957+
final GetFuture<T> rv = new GetFuture<T>(latch, operationTimeout, key,
958+
executorService);
951959
Operation op = opFact.get(key, new GetOperation.Callback() {
952960
private Future<T> val = null;
953961

@@ -997,7 +1005,8 @@ public <T> OperationFuture<CASValue<T>> asyncGets(final String key,
9971005

9981006
final CountDownLatch latch = new CountDownLatch(1);
9991007
final OperationFuture<CASValue<T>> rv =
1000-
new OperationFuture<CASValue<T>>(key, latch, operationTimeout);
1008+
new OperationFuture<CASValue<T>>(key, latch, operationTimeout,
1009+
executorService);
10011010

10021011
Operation op = opFact.gets(key, new GetsOperation.Callback() {
10031012
private CASValue<T> val = null;
@@ -1228,7 +1237,7 @@ public <T> BulkFuture<Map<String, T>> asyncGetBulk(Iterator<String> keyIter,
12281237

12291238
final CountDownLatch latch = new CountDownLatch(chunks.size());
12301239
final Collection<Operation> ops = new ArrayList<Operation>(chunks.size());
1231-
final BulkGetFuture<T> rv = new BulkGetFuture<T>(m, ops, latch);
1240+
final BulkGetFuture<T> rv = new BulkGetFuture<T>(m, ops, latch, executorService);
12321241

12331242
GetOperation.Callback cb = new GetOperation.Callback() {
12341243
@SuppressWarnings("synthetic-access")
@@ -1396,7 +1405,7 @@ public <T> OperationFuture<CASValue<T>> asyncGetAndTouch(final String key,
13961405
final int exp, final Transcoder<T> tc) {
13971406
final CountDownLatch latch = new CountDownLatch(1);
13981407
final OperationFuture<CASValue<T>> rv = new OperationFuture<CASValue<T>>(
1399-
key, latch, operationTimeout);
1408+
key, latch, operationTimeout, executorService);
14001409

14011410
Operation op = opFact.getAndTouch(key, exp,
14021411
new GetAndTouchOperation.Callback() {
@@ -1843,7 +1852,7 @@ private OperationFuture<Long> asyncMutate(Mutator m, String key, long by,
18431852
long def, int exp) {
18441853
final CountDownLatch latch = new CountDownLatch(1);
18451854
final OperationFuture<Long> rv =
1846-
new OperationFuture<Long>(key, latch, operationTimeout);
1855+
new OperationFuture<Long>(key, latch, operationTimeout, executorService);
18471856
Operation op = opFact.mutate(m, key, by, def, exp,
18481857
new OperationCallback() {
18491858
public void receivedStatus(OperationStatus s) {
@@ -2023,7 +2032,7 @@ public OperationFuture<Boolean> delete(String key) {
20232032
public OperationFuture<Boolean> delete(String key, long cas) {
20242033
final CountDownLatch latch = new CountDownLatch(1);
20252034
final OperationFuture<Boolean> rv = new OperationFuture<Boolean>(key,
2026-
latch, operationTimeout);
2035+
latch, operationTimeout, executorService);
20272036

20282037
DeleteOperation.Callback callback = new DeleteOperation.Callback() {
20292038
public void receivedStatus(OperationStatus s) {
@@ -2082,7 +2091,7 @@ public void complete() {
20822091
});
20832092

20842093
return new OperationFuture<Boolean>(null, blatch, flushResult,
2085-
operationTimeout) {
2094+
operationTimeout, executorService) {
20862095
@Override
20872096
public boolean cancel(boolean ign) {
20882097
boolean rv = false;

src/main/java/net/spy/memcached/MemcachedConnection.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
import java.util.TreeMap;
5050
import java.util.concurrent.ConcurrentLinkedQueue;
5151
import java.util.concurrent.CountDownLatch;
52+
import java.util.concurrent.ExecutorService;
5253
import java.util.concurrent.TimeUnit;
5354

5455
import net.spy.memcached.compat.SpyThread;
@@ -131,6 +132,7 @@ public class MemcachedConnection extends SpyThread {
131132
private final Collection<Operation> retryOps;
132133
protected final ConcurrentLinkedQueue<MemcachedNode> nodesToShutdown;
133134
private final boolean verifyAliveOnConnect;
135+
private final ExecutorService listenerExecutorService;
134136

135137
protected final MetricCollector metrics;
136138
protected final MetricType metricType;
@@ -158,6 +160,7 @@ public MemcachedConnection(int bufSize, ConnectionFactory f,
158160
selector = Selector.open();
159161
retryOps = new ArrayList<Operation>();
160162
nodesToShutdown = new ConcurrentLinkedQueue<MemcachedNode>();
163+
listenerExecutorService = f.getListenerExecutorService();
161164
this.bufSize = bufSize;
162165
this.connectionFactory = f;
163166

@@ -485,7 +488,8 @@ private void handleIO(SelectionKey sk) {
485488
// Test to see if it's truly alive, could be a hung process, OS
486489
final CountDownLatch latch = new CountDownLatch(1);
487490
final OperationFuture<Boolean> rv =
488-
new OperationFuture<Boolean>("noop", latch, 2500);
491+
new OperationFuture<Boolean>("noop", latch, 2500,
492+
listenerExecutorService);
489493
NoopOperation testOp = opFact.noop(new OperationCallback() {
490494
public void receivedStatus(OperationStatus status) {
491495
rv.set(status.isSuccess(), status);

0 commit comments

Comments
 (0)