Skip to content

Commit e69df0c

Browse files
Mike WiederholdMichael Wiederhold
authored andcommitted
Operations can't timeout when writing to the write buffer.
We implemented a mechanism for operations to be able to timeout while waiting to be written to a socket, but failed to take into account the fact that an item can timeout while in the middle of writing to that buffer. This commit adds a new OperationState called WRITE_QUEUED that specifies that the operation is waiting to be written to the write buffer. When an operation begins writing to the write buffer it goes into the WRITING state. When an operation is in the WRITING state it cannot time out Change-Id: Ib4ed9e1764c31e9270f79206b11574d98638e989 Reviewed-on: http://review.couchbase.org/9424 Reviewed-by: Michael Wiederhold <[email protected]> Tested-by: Michael Wiederhold <[email protected]>
1 parent 051d39e commit e69df0c

11 files changed

+42
-24
lines changed

src/main/java/net/spy/memcached/MemcachedClient.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1941,7 +1941,7 @@ public boolean cancel(boolean ign) {
19411941
boolean rv=false;
19421942
for(Operation op : ops) {
19431943
op.cancel();
1944-
rv |= op.getState() == OperationState.WRITING;
1944+
rv |= op.getState() == OperationState.WRITE_QUEUED;
19451945
}
19461946
return rv;
19471947
}

src/main/java/net/spy/memcached/internal/BulkGetFuture.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ public BulkGetFuture(Map<String, Future<T>> m,
4343
public boolean cancel(boolean ign) {
4444
boolean rv=false;
4545
for(Operation op : ops) {
46-
rv |= op.getState() == OperationState.WRITING;
46+
rv |= op.getState() == OperationState.WRITE_QUEUED;
4747
op.cancel();
4848
}
4949
for (Future<T> v : rvMap.values()) {
@@ -174,4 +174,4 @@ public boolean isDone() {
174174
public boolean isTimeout() {
175175
return timeout;
176176
}
177-
}
177+
}

src/main/java/net/spy/memcached/internal/OperationFuture.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ public boolean cancel(boolean ign) {
4848
op.cancel();
4949
// This isn't exactly correct, but it's close enough. If we're in
5050
// a writing state, we *probably* haven't started.
51-
return op.getState() == OperationState.WRITING;
51+
return op.getState() == OperationState.WRITE_QUEUED;
5252
}
5353

5454
public T get() throws InterruptedException, ExecutionException {

src/main/java/net/spy/memcached/ops/BaseOperationFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ private String first(Collection<String> keys) {
2121
}
2222

2323
public Collection<Operation> clone(KeyedOperation op) {
24-
assert (op.getState() == OperationState.WRITING || op.getState() == OperationState.RETRY)
24+
assert (op.getState() == OperationState.WRITE_QUEUED || op.getState() == OperationState.RETRY)
2525
: "Who passed me an operation in the " + op.getState() + "state?";
2626
assert !op.isCancelled() : "Attempted to clone a canceled op";
2727
assert !op.hasErrored() : "Attempted to clone an errored op";

src/main/java/net/spy/memcached/ops/Operation.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,12 @@ public interface Operation {
4646
*/
4747
ByteBuffer getBuffer();
4848

49+
/**
50+
* Invoked when we start writing all of the bytes from this operation to
51+
* the sockets write buffer.
52+
*/
53+
void writing();
54+
4955
/**
5056
* Invoked after having written all of the bytes from the supplied output
5157
* buffer.

src/main/java/net/spy/memcached/ops/OperationState.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,10 @@
44
* State of this operation.
55
*/
66
public enum OperationState {
7+
/**
8+
* State indicating this operation is waiting to be written to the server.
9+
*/
10+
WRITE_QUEUED,
711
/**
812
* State indicating this operation is writing data to the server.
913
*/

src/main/java/net/spy/memcached/protocol/BaseOperationImpl.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ public abstract class BaseOperationImpl extends SpyObject implements Operation {
2525
*/
2626
public static final OperationStatus CANCELLED =
2727
new CancelledOperationStatus();
28-
private OperationState state = OperationState.WRITING;
28+
private OperationState state = OperationState.WRITE_QUEUED;
2929
private ByteBuffer cmd = null;
3030
private boolean cancelled = false;
3131
private OperationException exception = null;
@@ -104,7 +104,7 @@ protected final void transitionState(OperationState newState) {
104104
getLogger().debug("Transitioned state from %s to %s", state, newState);
105105
state=newState;
106106
// Discard our buffer when we no longer need it.
107-
if(state != OperationState.WRITING) {
107+
if(state != OperationState.WRITE_QUEUED && state != OperationState.WRITING) {
108108
cmd=null;
109109
}
110110
if(state == OperationState.COMPLETE) {
@@ -116,6 +116,10 @@ protected final void transitionState(OperationState newState) {
116116
}
117117
}
118118

119+
public final void writing() {
120+
transitionState(OperationState.WRITING);
121+
}
122+
119123
public final void writeComplete() {
120124
transitionState(OperationState.READING);
121125
}

src/main/java/net/spy/memcached/protocol/TCPMemcachedNodeImpl.java

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -157,17 +157,21 @@ public final void fillWriteBuffer(boolean shouldOptimize) {
157157
if(toWrite == 0 && readQ.remainingCapacity() > 0) {
158158
getWbuf().clear();
159159
Operation o=getCurrentWriteOp();
160-
if (o != null && (o.isCancelled())) {
161-
getLogger().debug("Not writing cancelled op.");
162-
Operation cancelledOp = removeCurrentWriteOp();
163-
assert o == cancelledOp;
164-
return;
165-
}
166-
if (o != null && o.isTimedOut(defaultOpTimeout)) {
167-
getLogger().debug("Not writing timed out op.");
168-
Operation timedOutOp = removeCurrentWriteOp();
169-
assert o == timedOutOp;
170-
return;
160+
161+
if (o != null && o.getState() == OperationState.WRITE_QUEUED) {
162+
if (o.isCancelled()) {
163+
getLogger().debug("Not writing cancelled op.");
164+
Operation cancelledOp = removeCurrentWriteOp();
165+
assert o == cancelledOp;
166+
return;
167+
} else if (o.isTimedOut(defaultOpTimeout)) {
168+
getLogger().debug("Not writing timed out op.");
169+
Operation timedOutOp = removeCurrentWriteOp();
170+
assert o == timedOutOp;
171+
return;
172+
} else {
173+
o.writing();
174+
}
171175
}
172176
while(o != null && toWrite < getWbuf().capacity()) {
173177
assert o.getState() == OperationState.WRITING;
@@ -199,6 +203,9 @@ public final void fillWriteBuffer(boolean shouldOptimize) {
199203
}
200204

201205
o=getCurrentWriteOp();
206+
if (o != null) {
207+
o.writing();
208+
}
202209
}
203210
toWrite += bytesToCopy;
204211
}
@@ -517,5 +524,4 @@ public final void setupForAuth() {
517524
authLatch = new CountDownLatch(0);
518525
}
519526
}
520-
521527
}

src/main/java/net/spy/memcached/protocol/ascii/AsciiMemcachedNodeImpl.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,12 +41,11 @@ protected void optimize() {
4141

4242
// Initialize the new mega get
4343
optimizedOp.initialize();
44-
assert optimizedOp.getState() == OperationState.WRITING;
44+
assert optimizedOp.getState() == OperationState.WRITE_QUEUED;
4545
ProxyCallback pcb=(ProxyCallback) og.getCallback();
4646
getLogger().debug("Set up %s with %s keys and %s callbacks",
4747
this, pcb.numKeys(), pcb.numCallbacks());
4848
}
4949
}
5050
}
51-
5251
}

src/main/java/net/spy/memcached/protocol/binary/BinaryMemcachedNodeImpl.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ private void optimizeGets() {
5656

5757
// Initialize the new mega get
5858
optimizedOp.initialize();
59-
assert optimizedOp.getState() == OperationState.WRITING;
59+
assert optimizedOp.getState() == OperationState.WRITE_QUEUED;
6060
ProxyCallback pcb=(ProxyCallback) og.getCallback();
6161
getLogger().debug("Set up %s with %s keys and %s callbacks",
6262
this, pcb.numKeys(), pcb.numCallbacks());
@@ -83,7 +83,7 @@ private void optimizeSets() {
8383

8484
// Initialize the new mega set
8585
optimizedOp.initialize();
86-
assert optimizedOp.getState() == OperationState.WRITING;
86+
assert optimizedOp.getState() == OperationState.WRITE_QUEUED;
8787
}
8888
}
8989
}

0 commit comments

Comments
 (0)