Skip to content

Commit a9035c9

Browse files
Daniel MartinMichael Wiederhold
authored andcommitted
Fix concurrent access to operations objects, especially near timeouts
We discovered that having one thread call get() on futures with a timeout much shorter than the timeout built into the client could (after a while) reliably kill off the IO thread with a NullPointerException. After restarting with assertions enabled, we found many other ways to make the IO thread die relating mostly to operations that were canceled or timed out in one thread while they were being used in another thread. The solution to this is to make "timing out" an operation something similar to "cancel" - that is, not a state of the operation's state machine, but just a flag. Then, since this flag (as with the cancelled flag) can be read and set from multiple threads, synchronize the methods that do that. Ideally, transitionState and getState would never be called outside the IO thread; however, since at least getState is currently, both getState and transitionState should then also be synchronized. With these changes, we don't end up killing off the IO thread anymore. Change-Id: Ice7bac2ba1e2dbcd2004f5d7920944eef6bbbcb8 Reviewed-on: http://review.couchbase.org/9431 Reviewed-by: Michael Wiederhold <[email protected]> Tested-by: Michael Wiederhold <[email protected]>
1 parent 60a1e05 commit a9035c9

File tree

4 files changed

+73
-74
lines changed

4 files changed

+73
-74
lines changed

src/main/java/net/spy/memcached/MemcachedConnection.java

Lines changed: 23 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -501,27 +501,27 @@ private void handleReads(SelectionKey sk, MemcachedNode qa)
501501
if(currentOp == null) {
502502
throw new IllegalStateException("No read operation.");
503503
}
504-
currentOp.readFromBuffer(rbuf);
505-
if(currentOp.getState() == OperationState.COMPLETE) {
506-
getLogger().debug(
507-
"Completed read op: %s and giving the next %d bytes",
508-
currentOp, rbuf.remaining());
509-
Operation op=qa.removeCurrentReadOp();
510-
assert op == currentOp
511-
: "Expected to pop " + currentOp + " got " + op;
512-
currentOp=qa.getCurrentReadOp();
513-
} else if (currentOp.getState() == OperationState.RETRY) {
514-
getLogger().warn(
515-
"Reschedule read op due to NOT_MY_VBUCKET error: %s ",
516-
currentOp);
517-
((VBucketAware) currentOp).addNotMyVbucketNode(currentOp.getHandlingNode());
518-
Operation op=qa.removeCurrentReadOp();
519-
assert op == currentOp
520-
: "Expected to pop " + currentOp + " got " + op;
521-
retryOps.add(currentOp);
522-
currentOp=qa.getCurrentReadOp();
523-
504+
synchronized(currentOp) {
505+
currentOp.readFromBuffer(rbuf);
506+
if(currentOp.getState() == OperationState.COMPLETE) {
507+
getLogger().debug(
508+
"Completed read op: %s and giving the next %d bytes",
509+
currentOp, rbuf.remaining());
510+
Operation op=qa.removeCurrentReadOp();
511+
assert op == currentOp
512+
: "Expected to pop " + currentOp + " got " + op;
513+
} else if (currentOp.getState() == OperationState.RETRY) {
514+
getLogger().warn(
515+
"Reschedule read op due to NOT_MY_VBUCKET error: %s ",
516+
currentOp);
517+
((VBucketAware) currentOp).addNotMyVbucketNode(currentOp.getHandlingNode());
518+
Operation op=qa.removeCurrentReadOp();
519+
assert op == currentOp
520+
: "Expected to pop " + currentOp + " got " + op;
521+
retryOps.add(currentOp);
522+
}
524523
}
524+
currentOp=qa.getCurrentReadOp();
525525
}
526526
rbuf.clear();
527527
read=channel.read(rbuf);
@@ -597,6 +597,9 @@ private void cancelOperations(Collection<Operation> ops) {
597597

598598
private void redistributeOperations(Collection<Operation> ops) {
599599
for(Operation op : ops) {
600+
if (op.isCancelled() || op.isTimedOut()) {
601+
continue;
602+
}
600603
if(op instanceof KeyedOperation) {
601604
KeyedOperation ko = (KeyedOperation)op;
602605
int added = 0;

src/main/java/net/spy/memcached/ops/OperationState.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,6 @@ public enum OperationState {
2020
* State indicating this operation is complete.
2121
*/
2222
COMPLETE,
23-
/**
24-
* State indicating this operation timed out without completing.
25-
*/
26-
TIMEDOUT,
2723
/**
2824
* State indicating this operation needs to be resent. Typically
2925
* this means vbucket hashing and there is a topology change.

src/main/java/net/spy/memcached/protocol/BaseOperationImpl.java

Lines changed: 11 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ protected void setCallback(OperationCallback to) {
5555
callback=to;
5656
}
5757

58-
public final boolean isCancelled() {
58+
public final synchronized boolean isCancelled() {
5959
return cancelled;
6060
}
6161

@@ -67,7 +67,7 @@ public final OperationException getException() {
6767
return exception;
6868
}
6969

70-
public final void cancel() {
70+
public final synchronized void cancel() {
7171
cancelled=true;
7272
wasCancelled();
7373
callback.complete();
@@ -80,18 +80,18 @@ protected void wasCancelled() {
8080
getLogger().debug("was cancelled.");
8181
}
8282

83-
public final OperationState getState() {
83+
public final synchronized OperationState getState() {
8484
return state;
8585
}
8686

87-
public final ByteBuffer getBuffer() {
87+
public final synchronized ByteBuffer getBuffer() {
8888
return cmd;
8989
}
9090

9191
/**
9292
* Set the write buffer for this operation.
9393
*/
94-
protected final void setBuffer(ByteBuffer to) {
94+
protected final synchronized void setBuffer(ByteBuffer to) {
9595
assert to != null : "Trying to set buffer to null";
9696
cmd=to;
9797
cmd.mark();
@@ -100,7 +100,7 @@ protected final void setBuffer(ByteBuffer to) {
100100
/**
101101
* Transition the state of this operation to the given state.
102102
*/
103-
protected final void transitionState(OperationState newState) {
103+
protected final synchronized void transitionState(OperationState newState) {
104104
getLogger().debug("Transitioned state from %s to %s", state, newState);
105105
state=newState;
106106
// Discard our buffer when we no longer need it.
@@ -110,10 +110,6 @@ protected final void transitionState(OperationState newState) {
110110
if(state == OperationState.COMPLETE) {
111111
callback.complete();
112112
}
113-
if(state == OperationState.TIMEDOUT) {
114-
cmd = null;
115-
callback.complete();
116-
}
117113
}
118114

119115
public final void writing() {
@@ -160,26 +156,26 @@ public void setHandlingNode(MemcachedNode to) {
160156
}
161157

162158
@Override
163-
public void timeOut() {
159+
public synchronized void timeOut() {
164160
assert (state != OperationState.READING || state != OperationState.COMPLETE);
165-
this.transitionState(OperationState.TIMEDOUT);
166161
timedout = true;
162+
callback.complete();
167163
}
168164

169165
@Override
170-
public boolean isTimedOut() {
166+
public synchronized boolean isTimedOut() {
171167
return timedout;
172168
}
173169

174170
@Override
175-
public boolean isTimedOut(long ttlMillis) {
171+
public synchronized boolean isTimedOut(long ttlMillis) {
176172
long elapsed = System.nanoTime();
177173
long ttlNanos = ttlMillis * 1000 * 1000;
178174
if (elapsed - creationTime > ttlNanos) {
179175
assert (state != OperationState.READING || state != OperationState.COMPLETE);
180-
this.transitionState(OperationState.TIMEDOUT);
181176
timedOutUnsent = true;
182177
timedout = true;
178+
callback.complete();
183179
} else {
184180
// timedout would be false, but we cannot allow you to untimeout an operation
185181
if (timedout) {

src/main/java/net/spy/memcached/protocol/TCPMemcachedNodeImpl.java

Lines changed: 39 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -159,29 +159,31 @@ public final void fillWriteBuffer(boolean shouldOptimize) {
159159
Operation o=getNextWritableOp();
160160

161161
while(o != null && toWrite < getWbuf().capacity()) {
162-
assert o.getState() == OperationState.WRITING;
163-
164-
ByteBuffer obuf=o.getBuffer();
165-
assert obuf != null : "Didn't get a write buffer from " + o;
166-
int bytesToCopy=Math.min(getWbuf().remaining(),
167-
obuf.remaining());
168-
byte b[]=new byte[bytesToCopy];
169-
obuf.get(b);
170-
getWbuf().put(b);
171-
getLogger().debug("After copying stuff from %s: %s",
172-
o, getWbuf());
173-
if(!o.getBuffer().hasRemaining()) {
174-
o.writeComplete();
175-
transitionWriteItem();
176-
177-
preparePending();
178-
if(shouldOptimize) {
179-
optimize();
162+
synchronized(o) {
163+
assert o.getState() == OperationState.WRITING;
164+
165+
ByteBuffer obuf=o.getBuffer();
166+
assert obuf != null : "Didn't get a write buffer from " + o;
167+
int bytesToCopy=Math.min(getWbuf().remaining(),
168+
obuf.remaining());
169+
byte b[]=new byte[bytesToCopy];
170+
obuf.get(b);
171+
getWbuf().put(b);
172+
getLogger().debug("After copying stuff from %s: %s",
173+
o, getWbuf());
174+
if(!o.getBuffer().hasRemaining()) {
175+
o.writeComplete();
176+
transitionWriteItem();
177+
178+
preparePending();
179+
if(shouldOptimize) {
180+
optimize();
181+
}
182+
183+
o=getNextWritableOp();
180184
}
181-
182-
o=getNextWritableOp();
185+
toWrite += bytesToCopy;
183186
}
184-
toWrite += bytesToCopy;
185187
}
186188
getWbuf().flip();
187189
assert toWrite <= getWbuf().capacity()
@@ -197,22 +199,24 @@ public final void fillWriteBuffer(boolean shouldOptimize) {
197199
private Operation getNextWritableOp() {
198200
Operation o = getCurrentWriteOp();
199201
while (o != null && o.getState() == OperationState.WRITE_QUEUED) {
200-
if (o.isCancelled()) {
201-
getLogger().debug("Not writing cancelled op.");
202-
Operation cancelledOp = removeCurrentWriteOp();
203-
assert o == cancelledOp;
204-
} else if (o.isTimedOut(defaultOpTimeout)) {
205-
getLogger().debug("Not writing timed out op.");
206-
Operation timedOutOp = removeCurrentWriteOp();
207-
assert o == timedOutOp;
208-
} else {
209-
o.writing();
210-
if (!(o instanceof TapAckOperationImpl)) {
211-
readQ.add(o);
202+
synchronized(o) {
203+
if (o.isCancelled()) {
204+
getLogger().debug("Not writing cancelled op.");
205+
Operation cancelledOp = removeCurrentWriteOp();
206+
assert o == cancelledOp;
207+
} else if (o.isTimedOut(defaultOpTimeout)) {
208+
getLogger().debug("Not writing timed out op.");
209+
Operation timedOutOp = removeCurrentWriteOp();
210+
assert o == timedOutOp;
211+
} else {
212+
o.writing();
213+
if (!(o instanceof TapAckOperationImpl)) {
214+
readQ.add(o);
215+
}
216+
return o;
212217
}
213-
return o;
218+
o = getCurrentWriteOp();
214219
}
215-
o = getCurrentWriteOp();
216220
}
217221
return o;
218222
}

0 commit comments

Comments
 (0)