Skip to content

Commit 0e8c594

Browse files
daschlMichael Nitschinger
authored andcommitted
SPY-161: Cascade op cancellations.
Motivation ---------- When an operation is redistributed (be it because of reconnects and/or "not my vbucket" responses), it gets cloned. The problem is that the initial callback always points to the original operation, not the cloned one. While the clone gets the callback, the other direction (such as a cancellation from user code) never trickles through to the cloned ops. Modifications ------------- To fix this issue, code has been added which connects the original op with its clones by keeping a list of cloned operations in the original op. To prevent an overly long stack in malicious scenarios, an overall clone cap of 100 clones is introduced. If this cap is reached, the operation will not be cloned further but cancelled instead. Result ------ With these modifications, timeouts and cancellations from user code trickle through to the cloned operations, preventing issues where cloned operations stay around "forever". Change-Id: I304534a62202b705d78ca5f775f47afcd6be238a Reviewed-on: http://review.couchbase.org/34883 Reviewed-by: Michael Nitschinger <[email protected]> Tested-by: Michael Nitschinger <[email protected]>
1 parent 60b83de commit 0e8c594

File tree

3 files changed

+81
-0
lines changed

3 files changed

+81
-0
lines changed

src/main/java/net/spy/memcached/MemcachedConnection.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,12 @@ public class MemcachedConnection extends SpyThread {
9393
*/
9494
private static final int EXCESSIVE_EMPTY = 0x1000000;
9595

96+
/**
97+
* If an operation gets cloned more than this ceiling, cancel it for
98+
* safety reasons.
99+
*/
100+
private static final int MAX_CLONE_COUNT = 100;
101+
96102
private static final String RECON_QUEUE_METRIC =
97103
"[MEM] Reconnecting Nodes (ReconnectQueue)";
98104
private static final String SHUTD_QUEUE_METRIC =
@@ -992,6 +998,13 @@ public void redistributeOperation(Operation op) {
992998
return;
993999
}
9941000

1001+
if (op.getCloneCount() >= MAX_CLONE_COUNT) {
1002+
getLogger().warn("Cancelling operation " + op + "because it has been "
1003+
+ "retried (cloned) more than " + MAX_CLONE_COUNT + "times.");
1004+
op.cancel();
1005+
return;
1006+
}
1007+
9951008
// The operation gets redistributed but has never been actually written,
9961009
// so we just re-add it directly without cloning.
9971010
if (op.getState() == OperationState.WRITE_QUEUED) {
@@ -1011,6 +1024,8 @@ public void redistributeOperation(Operation op) {
10111024
KeyedOperation newKeyedOp = (KeyedOperation) newop;
10121025
for (String k : newKeyedOp.getKeys()) {
10131026
addOperation(k, newop);
1027+
op.addClone(newop);
1028+
newop.setCloneCount(op.getCloneCount()+1);
10141029
}
10151030
} else {
10161031
newop.cancel();

src/main/java/net/spy/memcached/ops/Operation.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -165,4 +165,23 @@ public interface Operation {
165165
* @return the raw error message content.
166166
*/
167167
byte[] getErrorMsg();
168+
169+
/**
170+
* Add a clone of this operation.
171+
*
172+
* @param op the cloned operation.
173+
*/
174+
void addClone(Operation op);
175+
176+
/**
177+
* Returns the number of times this operation has been cloned.
178+
*
179+
* @return the number of clones.
180+
*/
181+
int getCloneCount();
182+
183+
/**
184+
* Sets the clone count for this operation.
185+
*/
186+
void setCloneCount(int count);
168187
}

src/main/java/net/spy/memcached/protocol/BaseOperationImpl.java

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,13 @@
2525

2626
import java.io.IOException;
2727
import java.nio.ByteBuffer;
28+
import java.util.ArrayList;
2829
import java.util.Collection;
30+
import java.util.Collections;
2931
import java.util.HashSet;
32+
import java.util.Iterator;
33+
import java.util.List;
34+
import java.util.concurrent.atomic.AtomicLong;
3035

3136
import net.spy.memcached.MemcachedNode;
3237
import net.spy.memcached.compat.SpyObject;
@@ -65,6 +70,18 @@ public abstract class BaseOperationImpl extends SpyObject implements Operation {
6570
new HashSet<MemcachedNode>();
6671
private long writeCompleteTimestamp;
6772

73+
/**
74+
* If the operation gets cloned, the reference is used to cascade cancellations
75+
* and timeouts.
76+
*/
77+
private List<Operation> clones =
78+
Collections.synchronizedList(new ArrayList<Operation>());
79+
80+
/**
81+
* Number of clones for this operation.
82+
*/
83+
private volatile int cloneCount;
84+
6885
public BaseOperationImpl() {
6986
super();
7087
creationTime = System.nanoTime();
@@ -98,6 +115,14 @@ public final OperationException getException() {
98115

99116
public final synchronized void cancel() {
100117
cancelled = true;
118+
119+
synchronized (clones) {
120+
Iterator<Operation> i = clones.iterator();
121+
while(i.hasNext()) {
122+
i.next().cancel();
123+
}
124+
}
125+
101126
wasCancelled();
102127
callback.receivedStatus(CANCELLED);
103128
callback.complete();
@@ -193,6 +218,14 @@ public void setHandlingNode(MemcachedNode to) {
193218
@Override
194219
public synchronized void timeOut() {
195220
timedout = true;
221+
222+
synchronized (clones) {
223+
Iterator<Operation> i = clones.iterator();
224+
while(i.hasNext()) {
225+
i.next().timeOut();
226+
}
227+
}
228+
196229
callback.receivedStatus(TIMED_OUT);
197230
callback.complete();
198231
}
@@ -228,4 +261,18 @@ public long getWriteCompleteTimestamp() {
228261
return writeCompleteTimestamp;
229262
}
230263

264+
@Override
265+
public void addClone(Operation op) {
266+
clones.add(op);
267+
}
268+
269+
@Override
270+
public int getCloneCount() {
271+
return cloneCount;
272+
}
273+
274+
@Override
275+
public void setCloneCount(int count) {
276+
cloneCount = count;
277+
}
231278
}

0 commit comments

Comments
 (0)