Skip to content

Commit 60a1e05

Browse files
Mike WiederholdMichael Wiederhold
authored andcommitted
Improved performance of write queue processing during timeouts
There were two major issues here. First if there are a lot of timeouts in a row then we processing the write queue we removed them only one at a time. We now are able to process the entire write queue each time the handleIO function is called. The second issue was that it is possible to send timed out requests to the server. This would happen if there was one not timed out request followed by some timed out requests. Since in the while loop we don't check to see if an operation is timed out (or cancelled) we would send it anyways. This commit addresses both of these issues by adding a getNextWritable() op which only operations that should be written. If the write queue is empty it returns null. Change-Id: I427e5a47cf2e8ad89e3f59c895f3ef8e0ad36550 Reviewed-on: http://review.couchbase.org/9429 Reviewed-by: Michael Wiederhold <[email protected]> Tested-by: Michael Wiederhold <[email protected]>
1 parent c02c042 commit 60a1e05

File tree

1 file changed

+25
-26
lines changed

1 file changed

+25
-26
lines changed

src/main/java/net/spy/memcached/protocol/TCPMemcachedNodeImpl.java

Lines changed: 25 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -156,26 +156,8 @@ private boolean preparePending() {
156156
public final void fillWriteBuffer(boolean shouldOptimize) {
157157
if(toWrite == 0 && readQ.remainingCapacity() > 0) {
158158
getWbuf().clear();
159-
Operation o=getCurrentWriteOp();
159+
Operation o=getNextWritableOp();
160160

161-
if (o != null && o.getState() == OperationState.WRITE_QUEUED) {
162-
if (o.isCancelled()) {
163-
getLogger().debug("Not writing cancelled op.");
164-
Operation cancelledOp = removeCurrentWriteOp();
165-
assert o == cancelledOp;
166-
return;
167-
} else if (o.isTimedOut(defaultOpTimeout)) {
168-
getLogger().debug("Not writing timed out op.");
169-
Operation timedOutOp = removeCurrentWriteOp();
170-
assert o == timedOutOp;
171-
return;
172-
} else {
173-
o.writing();
174-
if (!(o instanceof TapAckOperationImpl)) {
175-
readQ.add(o);
176-
}
177-
}
178-
}
179161
while(o != null && toWrite < getWbuf().capacity()) {
180162
assert o.getState() == OperationState.WRITING;
181163

@@ -197,13 +179,7 @@ public final void fillWriteBuffer(boolean shouldOptimize) {
197179
optimize();
198180
}
199181

200-
o=getCurrentWriteOp();
201-
if (o != null) {
202-
o.writing();
203-
if (!(o instanceof TapAckOperationImpl)) {
204-
readQ.add(o);
205-
}
206-
}
182+
o=getNextWritableOp();
207183
}
208184
toWrite += bytesToCopy;
209185
}
@@ -218,6 +194,29 @@ public final void fillWriteBuffer(boolean shouldOptimize) {
218194
}
219195
}
220196

197+
private Operation getNextWritableOp() {
198+
Operation o = getCurrentWriteOp();
199+
while (o != null && o.getState() == OperationState.WRITE_QUEUED) {
200+
if (o.isCancelled()) {
201+
getLogger().debug("Not writing cancelled op.");
202+
Operation cancelledOp = removeCurrentWriteOp();
203+
assert o == cancelledOp;
204+
} else if (o.isTimedOut(defaultOpTimeout)) {
205+
getLogger().debug("Not writing timed out op.");
206+
Operation timedOutOp = removeCurrentWriteOp();
207+
assert o == timedOutOp;
208+
} else {
209+
o.writing();
210+
if (!(o instanceof TapAckOperationImpl)) {
211+
readQ.add(o);
212+
}
213+
return o;
214+
}
215+
o = getCurrentWriteOp();
216+
}
217+
return o;
218+
}
219+
221220
/* (non-Javadoc)
222221
* @see net.spy.memcached.MemcachedNode#transitionWriteItem()
223222
*/

0 commit comments

Comments
 (0)