Skip to content

Commit

Permalink
ZOOKEEPER-4573: Encapsulate request bytebuffer in Request
Browse files Browse the repository at this point in the history
This patch is based on apache#1903.

This closes apache#1903.

Author: tison <[email protected]>

Reviewers: Andor Molnar <[email protected]>, Mate Szalay-Beko <[email protected]>

Closes apache#1904 from tisonkun/encapsulate-request-bytebuffer
  • Loading branch information
tisonkun authored and Mate Szalay-Beko committed Jul 12, 2022
1 parent de7c586 commit a7e4dea
Show file tree
Hide file tree
Showing 18 changed files with 156 additions and 166 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,27 +47,27 @@ public class BinaryInputArchive implements InputArchive {
}
}

private DataInput in;
private int maxBufferSize;
private int extraMaxBufferSize;
private final DataInput in;
private final int maxBufferSize;
private final int extraMaxBufferSize;

public static BinaryInputArchive getArchive(InputStream strm) {
return new BinaryInputArchive(new DataInputStream(strm));
public static BinaryInputArchive getArchive(InputStream stream) {
return new BinaryInputArchive(new DataInputStream(stream));
}

private static class BinaryIndex implements Index {
private int nelems;
private int n;

BinaryIndex(int nelems) {
this.nelems = nelems;
this.n = nelems;
}

public boolean done() {
return (nelems <= 0);
return (n <= 0);
}

public void incr() {
nelems--;
n--;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ void readConnectResult() throws IOException {
ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);
BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
ConnectResponse conRsp = protocolManager.deserializeConnectResponse(bbia);
if (protocolManager.isReadonlyAvailable()) {
if (!protocolManager.isReadonlyAvailable()) {
LOG.warn("Connected to an old server; r-o mode will be unavailable");
}
this.sessionId = conRsp.getSessionId();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import org.apache.zookeeper.proto.DeleteRequest;
import org.apache.zookeeper.proto.SetACLRequest;
import org.apache.zookeeper.proto.SetDataRequest;
import org.apache.zookeeper.server.ByteBufferInputStream;
import org.apache.zookeeper.server.DataTree.ProcessTxnResult;
import org.apache.zookeeper.server.Request;
import org.slf4j.Logger;
Expand Down Expand Up @@ -127,8 +126,7 @@ public static void addAuditLog(Request request, ProcessTxnResult txnResult, bool
}

private static void deserialize(Request request, Record record) throws IOException {
request.request.rewind();
ByteBufferInputStream.byteBuffer2Record(request.request.slice(), record);
request.readRequestRecord(record);
}

private static Result getResult(ProcessTxnResult rc, boolean failedTxn) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,13 @@
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import javax.annotation.Nonnull;
import org.apache.jute.BinaryInputArchive;
import org.apache.jute.Record;

public class ByteBufferInputStream extends InputStream {

ByteBuffer bb;
private final ByteBuffer bb;

public ByteBufferInputStream(ByteBuffer bb) {
this.bb = bb;
Expand All @@ -46,7 +47,7 @@ public int available() throws IOException {
}

@Override
public int read(byte[] b, int off, int len) throws IOException {
public int read(@Nonnull byte[] b, int off, int len) throws IOException {
if (bb.remaining() == 0) {
return -1;
}
Expand All @@ -58,7 +59,7 @@ public int read(byte[] b, int off, int len) throws IOException {
}

@Override
public int read(byte[] b) throws IOException {
public int read(@Nonnull byte[] b) throws IOException {
return read(b, 0, b.length);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,27 +21,33 @@
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import javax.annotation.Nonnull;
import org.apache.jute.BinaryOutputArchive;
import org.apache.jute.Record;

public class ByteBufferOutputStream extends OutputStream {

ByteBuffer bb;
private final ByteBuffer bb;

public ByteBufferOutputStream(ByteBuffer bb) {
this.bb = bb;
}

@Override
public void write(int b) throws IOException {
bb.put((byte) b);
}

@Override
public void write(byte[] b) throws IOException {
public void write(@Nonnull byte[] b) throws IOException {
bb.put(b);
}

@Override
public void write(byte[] b, int off, int len) throws IOException {
public void write(@Nonnull byte[] b, int off, int len) throws IOException {
bb.put(b, off, len);
}

public static void record2ByteBuffer(Record record, ByteBuffer bb) throws IOException {
BinaryOutputArchive oa;
oa = BinaryOutputArchive.getArchive(new ByteBufferOutputStream(bb));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import static java.nio.charset.StandardCharsets.UTF_8;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -271,7 +270,7 @@ public void processRequest(Request request) {
case OpCode.multiRead: {
lastOp = "MLTR";
MultiOperationRecord multiReadRecord = new MultiOperationRecord();
ByteBufferInputStream.byteBuffer2Record(request.request, multiReadRecord);
request.readRequestRecord(multiReadRecord);
rsp = new MultiResponse();
OpResult subResult;
for (Op readOp : multiReadRecord) {
Expand Down Expand Up @@ -350,7 +349,7 @@ public void processRequest(Request request) {
case OpCode.sync: {
lastOp = "SYNC";
SyncRequest syncRequest = new SyncRequest();
ByteBufferInputStream.byteBuffer2Record(request.request, syncRequest);
request.readRequestRecord(syncRequest);
rsp = new SyncResponse(syncRequest.getPath());
requestPathMetricsCollector.registerRequest(request.type, syncRequest.getPath());
break;
Expand All @@ -365,7 +364,7 @@ public void processRequest(Request request) {
lastOp = "EXIS";
// TODO we need to figure out the security requirement for this!
ExistsRequest existsRequest = new ExistsRequest();
ByteBufferInputStream.byteBuffer2Record(request.request, existsRequest);
request.readRequestRecord(existsRequest);
path = existsRequest.getPath();
if (path.indexOf('\0') != -1) {
throw new KeeperException.BadArgumentsException();
Expand All @@ -378,7 +377,7 @@ public void processRequest(Request request) {
case OpCode.getData: {
lastOp = "GETD";
GetDataRequest getDataRequest = new GetDataRequest();
ByteBufferInputStream.byteBuffer2Record(request.request, getDataRequest);
request.readRequestRecord(getDataRequest);
path = getDataRequest.getPath();
rsp = handleGetDataRequest(getDataRequest, cnxn, request.authInfo);
requestPathMetricsCollector.registerRequest(request.type, path);
Expand All @@ -387,9 +386,7 @@ public void processRequest(Request request) {
case OpCode.setWatches: {
lastOp = "SETW";
SetWatches setWatches = new SetWatches();
// TODO we really should not need this
request.request.rewind();
ByteBufferInputStream.byteBuffer2Record(request.request, setWatches);
request.readRequestRecord(setWatches);
long relativeZxid = setWatches.getRelativeZxid();
zks.getZKDatabase()
.setWatches(
Expand All @@ -405,9 +402,7 @@ public void processRequest(Request request) {
case OpCode.setWatches2: {
lastOp = "STW2";
SetWatches2 setWatches = new SetWatches2();
// TODO we really should not need this
request.request.rewind();
ByteBufferInputStream.byteBuffer2Record(request.request, setWatches);
request.readRequestRecord(setWatches);
long relativeZxid = setWatches.getRelativeZxid();
zks.getZKDatabase().setWatches(relativeZxid,
setWatches.getDataWatches(),
Expand All @@ -421,16 +416,15 @@ public void processRequest(Request request) {
case OpCode.addWatch: {
lastOp = "ADDW";
AddWatchRequest addWatcherRequest = new AddWatchRequest();
ByteBufferInputStream.byteBuffer2Record(request.request,
addWatcherRequest);
request.readRequestRecord(addWatcherRequest);
zks.getZKDatabase().addWatch(addWatcherRequest.getPath(), cnxn, addWatcherRequest.getMode());
rsp = new ErrorResponse(0);
break;
}
case OpCode.getACL: {
lastOp = "GETA";
GetACLRequest getACLRequest = new GetACLRequest();
ByteBufferInputStream.byteBuffer2Record(request.request, getACLRequest);
request.readRequestRecord(getACLRequest);
path = getACLRequest.getPath();
DataNode n = zks.getZKDatabase().getNode(path);
if (n == null) {
Expand Down Expand Up @@ -473,7 +467,7 @@ public void processRequest(Request request) {
case OpCode.getChildren: {
lastOp = "GETC";
GetChildrenRequest getChildrenRequest = new GetChildrenRequest();
ByteBufferInputStream.byteBuffer2Record(request.request, getChildrenRequest);
request.readRequestRecord(getChildrenRequest);
path = getChildrenRequest.getPath();
rsp = handleGetChildrenRequest(getChildrenRequest, cnxn, request.authInfo);
requestPathMetricsCollector.registerRequest(request.type, path);
Expand All @@ -482,7 +476,7 @@ public void processRequest(Request request) {
case OpCode.getAllChildrenNumber: {
lastOp = "GETACN";
GetAllChildrenNumberRequest getAllChildrenNumberRequest = new GetAllChildrenNumberRequest();
ByteBufferInputStream.byteBuffer2Record(request.request, getAllChildrenNumberRequest);
request.readRequestRecord(getAllChildrenNumberRequest);
path = getAllChildrenNumberRequest.getPath();
DataNode n = zks.getZKDatabase().getNode(path);
if (n == null) {
Expand All @@ -502,7 +496,7 @@ public void processRequest(Request request) {
case OpCode.getChildren2: {
lastOp = "GETC";
GetChildren2Request getChildren2Request = new GetChildren2Request();
ByteBufferInputStream.byteBuffer2Record(request.request, getChildren2Request);
request.readRequestRecord(getChildren2Request);
Stat stat = new Stat();
path = getChildren2Request.getPath();
DataNode n = zks.getZKDatabase().getNode(path);
Expand All @@ -524,7 +518,7 @@ public void processRequest(Request request) {
case OpCode.checkWatches: {
lastOp = "CHKW";
CheckWatchesRequest checkWatches = new CheckWatchesRequest();
ByteBufferInputStream.byteBuffer2Record(request.request, checkWatches);
request.readRequestRecord(checkWatches);
WatcherType type = WatcherType.fromInt(checkWatches.getType());
path = checkWatches.getPath();
boolean containsWatcher = zks.getZKDatabase().containsWatcher(path, type, cnxn);
Expand All @@ -538,7 +532,7 @@ public void processRequest(Request request) {
case OpCode.removeWatches: {
lastOp = "REMW";
RemoveWatchesRequest removeWatches = new RemoveWatchesRequest();
ByteBufferInputStream.byteBuffer2Record(request.request, removeWatches);
request.readRequestRecord(removeWatches);
WatcherType type = WatcherType.fromInt(removeWatches.getType());
path = removeWatches.getPath();
boolean removed = zks.getZKDatabase().removeWatch(path, type, cnxn);
Expand All @@ -557,7 +551,7 @@ public void processRequest(Request request) {
case OpCode.getEphemerals: {
lastOp = "GETE";
GetEphemeralsRequest getEphemerals = new GetEphemeralsRequest();
ByteBufferInputStream.byteBuffer2Record(request.request, getEphemerals);
request.readRequestRecord(getEphemerals);
String prefixPath = getEphemerals.getPrefixPath();
Set<String> allEphems = zks.getZKDatabase().getDataTree().getEphemerals(request.sessionId);
List<String> ephemerals = new ArrayList<>();
Expand Down Expand Up @@ -592,10 +586,13 @@ public void processRequest(Request request) {
// error to the user
LOG.error("Failed to process {}", request, e);
StringBuilder sb = new StringBuilder();
ByteBuffer bb = request.request;
bb.rewind();
while (bb.hasRemaining()) {
sb.append(String.format("%02x", (0xff & bb.get())));
byte[] payload = request.readRequestBytes();
if (payload != null) {
for (byte b : payload) {
sb.append(String.format("%02x", (0xff & b)));
}
} else {
sb.append("request buffer is null");
}
LOG.error("Dumping request buffer for request type {}: 0x{}", Request.op2String(request.type), sb);
err = Code.MARSHALLINGERROR;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.Id;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.proto.ConnectRequest;
import org.apache.zookeeper.proto.ReplyHeader;
import org.apache.zookeeper.proto.WatcherEvent;
import org.apache.zookeeper.server.NIOServerCnxnFactory.SelectorThread;
Expand Down Expand Up @@ -427,11 +428,13 @@ public void enableRecv() {
}
}

private void readConnectRequest() throws IOException, InterruptedException, ClientCnxnLimitException {
private void readConnectRequest() throws IOException, ClientCnxnLimitException {
if (!isZKServerRunning()) {
throw new IOException("ZooKeeperServer not running");
}
zkServer.processConnectRequest(this, incomingBuffer);
BinaryInputArchive bia = BinaryInputArchive.getArchive(new ByteBufferInputStream(incomingBuffer));
ConnectRequest request = protocolManager.deserializeConnectRequest(bia);
zkServer.processConnectRequest(this, request);
initialized = true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.data.Id;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.proto.ConnectRequest;
import org.apache.zookeeper.proto.ReplyHeader;
import org.apache.zookeeper.proto.WatcherEvent;
import org.apache.zookeeper.server.command.CommandExecutor;
Expand Down Expand Up @@ -482,7 +483,9 @@ private void receiveMessage(ByteBuf message) {
zks.processPacket(this, bb);
} else {
LOG.debug("got conn req request from {}", getRemoteSocketAddress());
zks.processConnectRequest(this, bb);
BinaryInputArchive bia = BinaryInputArchive.getArchive(new ByteBufferInputStream(bb));
ConnectRequest request = protocolManager.deserializeConnectRequest(bia);
zks.processConnectRequest(this, request);
initialized = true;
}
bb = null;
Expand Down
Loading

0 comments on commit a7e4dea

Please sign in to comment.