From a7e4dea7abccff018d123d54dd5d3ccc1544484e Mon Sep 17 00:00:00 2001 From: tison Date: Tue, 12 Jul 2022 13:03:07 +0200 Subject: [PATCH] ZOOKEEPER-4573: Encapsulate request bytebuffer in Request This patch is based on #1903. This closes #1903. Author: tison Reviewers: Andor Molnar , Mate Szalay-Beko Closes #1904 from tisonkun/encapsulate-request-bytebuffer --- .../org/apache/jute/BinaryInputArchive.java | 18 +++---- .../apache/zookeeper/ClientCnxnSocket.java | 2 +- .../apache/zookeeper/audit/AuditHelper.java | 4 +- .../server/ByteBufferInputStream.java | 7 +-- .../server/ByteBufferOutputStream.java | 12 +++-- .../server/FinalRequestProcessor.java | 45 ++++++++--------- .../zookeeper/server/NIOServerCnxn.java | 7 ++- .../zookeeper/server/NettyServerCnxn.java | 5 +- .../server/PrepRequestProcessor.java | 42 +++++++--------- .../org/apache/zookeeper/server/Request.java | 25 +++++++++- .../zookeeper/server/ZooKeeperServer.java | 49 ++++++++----------- .../zookeeper/server/quorum/Learner.java | 10 ++-- .../server/quorum/QuorumZooKeeperServer.java | 9 +--- .../zookeeper/server/CreateContainerTest.java | 14 ++---- .../server/ZooKeeperServerCreationTest.java | 8 +-- .../zookeeper/server/ZooKeeperServerTest.java | 22 ++++----- .../quorum/ReadOnlyZooKeeperServerTest.java | 36 +++++++------- .../quorum/SessionUpgradeQuorumTest.java | 7 +-- 18 files changed, 156 insertions(+), 166 deletions(-) diff --git a/zookeeper-jute/src/main/java/org/apache/jute/BinaryInputArchive.java b/zookeeper-jute/src/main/java/org/apache/jute/BinaryInputArchive.java index c4d1bbebf9d..ae1310af03d 100644 --- a/zookeeper-jute/src/main/java/org/apache/jute/BinaryInputArchive.java +++ b/zookeeper-jute/src/main/java/org/apache/jute/BinaryInputArchive.java @@ -47,27 +47,27 @@ public class BinaryInputArchive implements InputArchive { } } - private DataInput in; - private int maxBufferSize; - private int extraMaxBufferSize; + private final DataInput in; + private final int maxBufferSize; + private final int extraMaxBufferSize; - public static BinaryInputArchive getArchive(InputStream strm) { - return new BinaryInputArchive(new DataInputStream(strm)); + public static BinaryInputArchive getArchive(InputStream stream) { + return new BinaryInputArchive(new DataInputStream(stream)); } private static class BinaryIndex implements Index { - private int nelems; + private int n; BinaryIndex(int nelems) { - this.nelems = nelems; + this.n = nelems; } public boolean done() { - return (nelems <= 0); + return (n <= 0); } public void incr() { - nelems--; + n--; } } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocket.java b/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocket.java index 35af4a2f121..17c0ad27925 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocket.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocket.java @@ -141,7 +141,7 @@ void readConnectResult() throws IOException { ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer); BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis); ConnectResponse conRsp = protocolManager.deserializeConnectResponse(bbia); - if (protocolManager.isReadonlyAvailable()) { + if (!protocolManager.isReadonlyAvailable()) { LOG.warn("Connected to an old server; r-o mode will be unavailable"); } this.sessionId = conRsp.getSessionId(); diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/audit/AuditHelper.java b/zookeeper-server/src/main/java/org/apache/zookeeper/audit/AuditHelper.java index b98c42d2588..5aca1711f7a 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/audit/AuditHelper.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/audit/AuditHelper.java @@ -32,7 +32,6 @@ import org.apache.zookeeper.proto.DeleteRequest; import org.apache.zookeeper.proto.SetACLRequest; import org.apache.zookeeper.proto.SetDataRequest; -import org.apache.zookeeper.server.ByteBufferInputStream; import org.apache.zookeeper.server.DataTree.ProcessTxnResult; import org.apache.zookeeper.server.Request; import org.slf4j.Logger; @@ -127,8 +126,7 @@ public static void addAuditLog(Request request, ProcessTxnResult txnResult, bool } private static void deserialize(Request request, Record record) throws IOException { - request.request.rewind(); - ByteBufferInputStream.byteBuffer2Record(request.request.slice(), record); + request.readRequestRecord(record); } private static Result getResult(ProcessTxnResult rc, boolean failedTxn) { diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ByteBufferInputStream.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ByteBufferInputStream.java index 9a93abd6115..8fe8b6fc94e 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ByteBufferInputStream.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ByteBufferInputStream.java @@ -21,12 +21,13 @@ import java.io.IOException; import java.io.InputStream; import java.nio.ByteBuffer; +import javax.annotation.Nonnull; import org.apache.jute.BinaryInputArchive; import org.apache.jute.Record; public class ByteBufferInputStream extends InputStream { - ByteBuffer bb; + private final ByteBuffer bb; public ByteBufferInputStream(ByteBuffer bb) { this.bb = bb; @@ -46,7 +47,7 @@ public int available() throws IOException { } @Override - public int read(byte[] b, int off, int len) throws IOException { + public int read(@Nonnull byte[] b, int off, int len) throws IOException { if (bb.remaining() == 0) { return -1; } @@ -58,7 +59,7 @@ public int read(byte[] b, int off, int len) throws IOException { } @Override - public int read(byte[] b) throws IOException { + public int read(@Nonnull byte[] b) throws IOException { return read(b, 0, b.length); } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ByteBufferOutputStream.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ByteBufferOutputStream.java index 2531cbaffee..35a528cdb86 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ByteBufferOutputStream.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ByteBufferOutputStream.java @@ -21,27 +21,33 @@ import java.io.IOException; import java.io.OutputStream; import java.nio.ByteBuffer; +import javax.annotation.Nonnull; import org.apache.jute.BinaryOutputArchive; import org.apache.jute.Record; public class ByteBufferOutputStream extends OutputStream { - ByteBuffer bb; + private final ByteBuffer bb; + public ByteBufferOutputStream(ByteBuffer bb) { this.bb = bb; } + @Override public void write(int b) throws IOException { bb.put((byte) b); } + @Override - public void write(byte[] b) throws IOException { + public void write(@Nonnull byte[] b) throws IOException { bb.put(b); } + @Override - public void write(byte[] b, int off, int len) throws IOException { + public void write(@Nonnull byte[] b, int off, int len) throws IOException { bb.put(b, off, len); } + public static void record2ByteBuffer(Record record, ByteBuffer bb) throws IOException { BinaryOutputArchive oa; oa = BinaryOutputArchive.getArchive(new ByteBufferOutputStream(bb)); diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/FinalRequestProcessor.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/FinalRequestProcessor.java index e73f85d7cc7..bc5b019f812 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/FinalRequestProcessor.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/FinalRequestProcessor.java @@ -20,7 +20,6 @@ import static java.nio.charset.StandardCharsets.UTF_8; import java.io.IOException; -import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -271,7 +270,7 @@ public void processRequest(Request request) { case OpCode.multiRead: { lastOp = "MLTR"; MultiOperationRecord multiReadRecord = new MultiOperationRecord(); - ByteBufferInputStream.byteBuffer2Record(request.request, multiReadRecord); + request.readRequestRecord(multiReadRecord); rsp = new MultiResponse(); OpResult subResult; for (Op readOp : multiReadRecord) { @@ -350,7 +349,7 @@ public void processRequest(Request request) { case OpCode.sync: { lastOp = "SYNC"; SyncRequest syncRequest = new SyncRequest(); - ByteBufferInputStream.byteBuffer2Record(request.request, syncRequest); + request.readRequestRecord(syncRequest); rsp = new SyncResponse(syncRequest.getPath()); requestPathMetricsCollector.registerRequest(request.type, syncRequest.getPath()); break; @@ -365,7 +364,7 @@ public void processRequest(Request request) { lastOp = "EXIS"; // TODO we need to figure out the security requirement for this! ExistsRequest existsRequest = new ExistsRequest(); - ByteBufferInputStream.byteBuffer2Record(request.request, existsRequest); + request.readRequestRecord(existsRequest); path = existsRequest.getPath(); if (path.indexOf('\0') != -1) { throw new KeeperException.BadArgumentsException(); @@ -378,7 +377,7 @@ public void processRequest(Request request) { case OpCode.getData: { lastOp = "GETD"; GetDataRequest getDataRequest = new GetDataRequest(); - ByteBufferInputStream.byteBuffer2Record(request.request, getDataRequest); + request.readRequestRecord(getDataRequest); path = getDataRequest.getPath(); rsp = handleGetDataRequest(getDataRequest, cnxn, request.authInfo); requestPathMetricsCollector.registerRequest(request.type, path); @@ -387,9 +386,7 @@ public void processRequest(Request request) { case OpCode.setWatches: { lastOp = "SETW"; SetWatches setWatches = new SetWatches(); - // TODO we really should not need this - request.request.rewind(); - ByteBufferInputStream.byteBuffer2Record(request.request, setWatches); + request.readRequestRecord(setWatches); long relativeZxid = setWatches.getRelativeZxid(); zks.getZKDatabase() .setWatches( @@ -405,9 +402,7 @@ public void processRequest(Request request) { case OpCode.setWatches2: { lastOp = "STW2"; SetWatches2 setWatches = new SetWatches2(); - // TODO we really should not need this - request.request.rewind(); - ByteBufferInputStream.byteBuffer2Record(request.request, setWatches); + request.readRequestRecord(setWatches); long relativeZxid = setWatches.getRelativeZxid(); zks.getZKDatabase().setWatches(relativeZxid, setWatches.getDataWatches(), @@ -421,8 +416,7 @@ public void processRequest(Request request) { case OpCode.addWatch: { lastOp = "ADDW"; AddWatchRequest addWatcherRequest = new AddWatchRequest(); - ByteBufferInputStream.byteBuffer2Record(request.request, - addWatcherRequest); + request.readRequestRecord(addWatcherRequest); zks.getZKDatabase().addWatch(addWatcherRequest.getPath(), cnxn, addWatcherRequest.getMode()); rsp = new ErrorResponse(0); break; @@ -430,7 +424,7 @@ public void processRequest(Request request) { case OpCode.getACL: { lastOp = "GETA"; GetACLRequest getACLRequest = new GetACLRequest(); - ByteBufferInputStream.byteBuffer2Record(request.request, getACLRequest); + request.readRequestRecord(getACLRequest); path = getACLRequest.getPath(); DataNode n = zks.getZKDatabase().getNode(path); if (n == null) { @@ -473,7 +467,7 @@ public void processRequest(Request request) { case OpCode.getChildren: { lastOp = "GETC"; GetChildrenRequest getChildrenRequest = new GetChildrenRequest(); - ByteBufferInputStream.byteBuffer2Record(request.request, getChildrenRequest); + request.readRequestRecord(getChildrenRequest); path = getChildrenRequest.getPath(); rsp = handleGetChildrenRequest(getChildrenRequest, cnxn, request.authInfo); requestPathMetricsCollector.registerRequest(request.type, path); @@ -482,7 +476,7 @@ public void processRequest(Request request) { case OpCode.getAllChildrenNumber: { lastOp = "GETACN"; GetAllChildrenNumberRequest getAllChildrenNumberRequest = new GetAllChildrenNumberRequest(); - ByteBufferInputStream.byteBuffer2Record(request.request, getAllChildrenNumberRequest); + request.readRequestRecord(getAllChildrenNumberRequest); path = getAllChildrenNumberRequest.getPath(); DataNode n = zks.getZKDatabase().getNode(path); if (n == null) { @@ -502,7 +496,7 @@ public void processRequest(Request request) { case OpCode.getChildren2: { lastOp = "GETC"; GetChildren2Request getChildren2Request = new GetChildren2Request(); - ByteBufferInputStream.byteBuffer2Record(request.request, getChildren2Request); + request.readRequestRecord(getChildren2Request); Stat stat = new Stat(); path = getChildren2Request.getPath(); DataNode n = zks.getZKDatabase().getNode(path); @@ -524,7 +518,7 @@ public void processRequest(Request request) { case OpCode.checkWatches: { lastOp = "CHKW"; CheckWatchesRequest checkWatches = new CheckWatchesRequest(); - ByteBufferInputStream.byteBuffer2Record(request.request, checkWatches); + request.readRequestRecord(checkWatches); WatcherType type = WatcherType.fromInt(checkWatches.getType()); path = checkWatches.getPath(); boolean containsWatcher = zks.getZKDatabase().containsWatcher(path, type, cnxn); @@ -538,7 +532,7 @@ public void processRequest(Request request) { case OpCode.removeWatches: { lastOp = "REMW"; RemoveWatchesRequest removeWatches = new RemoveWatchesRequest(); - ByteBufferInputStream.byteBuffer2Record(request.request, removeWatches); + request.readRequestRecord(removeWatches); WatcherType type = WatcherType.fromInt(removeWatches.getType()); path = removeWatches.getPath(); boolean removed = zks.getZKDatabase().removeWatch(path, type, cnxn); @@ -557,7 +551,7 @@ public void processRequest(Request request) { case OpCode.getEphemerals: { lastOp = "GETE"; GetEphemeralsRequest getEphemerals = new GetEphemeralsRequest(); - ByteBufferInputStream.byteBuffer2Record(request.request, getEphemerals); + request.readRequestRecord(getEphemerals); String prefixPath = getEphemerals.getPrefixPath(); Set allEphems = zks.getZKDatabase().getDataTree().getEphemerals(request.sessionId); List ephemerals = new ArrayList<>(); @@ -592,10 +586,13 @@ public void processRequest(Request request) { // error to the user LOG.error("Failed to process {}", request, e); StringBuilder sb = new StringBuilder(); - ByteBuffer bb = request.request; - bb.rewind(); - while (bb.hasRemaining()) { - sb.append(String.format("%02x", (0xff & bb.get()))); + byte[] payload = request.readRequestBytes(); + if (payload != null) { + for (byte b : payload) { + sb.append(String.format("%02x", (0xff & b))); + } + } else { + sb.append("request buffer is null"); } LOG.error("Dumping request buffer for request type {}: 0x{}", Request.op2String(request.type), sb); err = Code.MARSHALLINGERROR; diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxn.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxn.java index dd62154fc87..83e7491e51b 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxn.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxn.java @@ -40,6 +40,7 @@ import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.data.Id; import org.apache.zookeeper.data.Stat; +import org.apache.zookeeper.proto.ConnectRequest; import org.apache.zookeeper.proto.ReplyHeader; import org.apache.zookeeper.proto.WatcherEvent; import org.apache.zookeeper.server.NIOServerCnxnFactory.SelectorThread; @@ -427,11 +428,13 @@ public void enableRecv() { } } - private void readConnectRequest() throws IOException, InterruptedException, ClientCnxnLimitException { + private void readConnectRequest() throws IOException, ClientCnxnLimitException { if (!isZKServerRunning()) { throw new IOException("ZooKeeperServer not running"); } - zkServer.processConnectRequest(this, incomingBuffer); + BinaryInputArchive bia = BinaryInputArchive.getArchive(new ByteBufferInputStream(incomingBuffer)); + ConnectRequest request = protocolManager.deserializeConnectRequest(bia); + zkServer.processConnectRequest(this, request); initialized = true; } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java index 8937039bca7..ae482ce2b5e 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java @@ -45,6 +45,7 @@ import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.data.Id; import org.apache.zookeeper.data.Stat; +import org.apache.zookeeper.proto.ConnectRequest; import org.apache.zookeeper.proto.ReplyHeader; import org.apache.zookeeper.proto.WatcherEvent; import org.apache.zookeeper.server.command.CommandExecutor; @@ -482,7 +483,9 @@ private void receiveMessage(ByteBuf message) { zks.processPacket(this, bb); } else { LOG.debug("got conn req request from {}", getRemoteSocketAddress()); - zks.processConnectRequest(this, bb); + BinaryInputArchive bia = BinaryInputArchive.getArchive(new ByteBufferInputStream(bb)); + ConnectRequest request = protocolManager.deserializeConnectRequest(bia); + zks.processConnectRequest(this, request); initialized = true; } bb = null; diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/PrepRequestProcessor.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/PrepRequestProcessor.java index f836927236d..9733a48aca1 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/PrepRequestProcessor.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/PrepRequestProcessor.java @@ -332,7 +332,7 @@ protected void pRequest2Txn(int type, long zxid, Request request, Record record, break; } case OpCode.deleteContainer: { - String path = new String(request.request.array(), UTF_8); + String path = new String(request.readRequestBytes(), UTF_8); String parentPath = getParentPathAndValidate(path); ChangeRecord nodeRecord = getRecordForPath(path); if (nodeRecord.childCount > 0) { @@ -360,7 +360,7 @@ protected void pRequest2Txn(int type, long zxid, Request request, Record record, zks.sessionTracker.checkSession(request.sessionId, request.getOwner()); DeleteRequest deleteRequest = (DeleteRequest) record; if (deserialize) { - ByteBufferInputStream.byteBuffer2Record(request.request, deleteRequest); + request.readRequestRecord(deleteRequest); } String path = deleteRequest.getPath(); String parentPath = getParentPathAndValidate(path); @@ -388,7 +388,7 @@ protected void pRequest2Txn(int type, long zxid, Request request, Record record, zks.sessionTracker.checkSession(request.sessionId, request.getOwner()); SetDataRequest setDataRequest = (SetDataRequest) record; if (deserialize) { - ByteBufferInputStream.byteBuffer2Record(request.request, setDataRequest); + request.readRequestRecord(setDataRequest); } path = setDataRequest.getPath(); validatePath(path, request.sessionId); @@ -560,7 +560,7 @@ protected void pRequest2Txn(int type, long zxid, Request request, Record record, zks.sessionTracker.checkSession(request.sessionId, request.getOwner()); SetACLRequest setAclRequest = (SetACLRequest) record; if (deserialize) { - ByteBufferInputStream.byteBuffer2Record(request.request, setAclRequest); + request.readRequestRecord(setAclRequest); } path = setAclRequest.getPath(); validatePath(path, request.sessionId); @@ -577,12 +577,11 @@ protected void pRequest2Txn(int type, long zxid, Request request, Record record, addChangeRecord(nodeRecord); break; case OpCode.createSession: - request.request.rewind(); - int to = request.request.getInt(); - request.setTxn(new CreateSessionTxn(to)); - request.request.rewind(); + CreateSessionTxn createSessionTxn = new CreateSessionTxn(); + request.readRequestRecord(createSessionTxn); + request.setTxn(createSessionTxn); // only add the global session tracker but not to ZKDb - zks.sessionTracker.trackSession(request.sessionId, to); + zks.sessionTracker.trackSession(request.sessionId, createSessionTxn.getTimeOut()); zks.setOwner(request.sessionId, request.getOwner()); break; case OpCode.closeSession: @@ -632,7 +631,7 @@ protected void pRequest2Txn(int type, long zxid, Request request, Record record, zks.sessionTracker.checkSession(request.sessionId, request.getOwner()); CheckVersionRequest checkVersionRequest = (CheckVersionRequest) record; if (deserialize) { - ByteBufferInputStream.byteBuffer2Record(request.request, checkVersionRequest); + request.readRequestRecord(checkVersionRequest); } path = checkVersionRequest.getPath(); validatePath(path, request.sessionId); @@ -656,7 +655,7 @@ protected void pRequest2Txn(int type, long zxid, Request request, Record record, private void pRequest2TxnCreate(int type, Request request, Record record, boolean deserialize) throws IOException, KeeperException { if (deserialize) { - ByteBufferInputStream.byteBuffer2Record(request.request, record); + request.readRequestRecord(record); } int flags; @@ -786,10 +785,8 @@ protected void pRequest(Request request) throws RequestProcessorException { /** * This method is a helper to pRequest method - * - * @param request */ - private void pRequestHelper(Request request) throws RequestProcessorException { + private void pRequestHelper(Request request) { try { switch (request.type) { case OpCode.createContainer: @@ -813,7 +810,7 @@ private void pRequestHelper(Request request) throws RequestProcessorException { break; case OpCode.reconfig: ReconfigRequest reconfigRequest = new ReconfigRequest(); - ByteBufferInputStream.byteBuffer2Record(request.request, reconfigRequest); + request.readRequestRecord(reconfigRequest); pRequest2Txn(request.type, zks.getNextZxid(), request, reconfigRequest, true); break; case OpCode.setACL: @@ -827,12 +824,12 @@ private void pRequestHelper(Request request) throws RequestProcessorException { case OpCode.multi: MultiOperationRecord multiRequest = new MultiOperationRecord(); try { - ByteBufferInputStream.byteBuffer2Record(request.request, multiRequest); + request.readRequestRecord(multiRequest); } catch (IOException e) { request.setHdr(new TxnHeader(request.sessionId, request.cxid, zks.getNextZxid(), Time.currentWallTime(), OpCode.multi)); throw e; } - List txns = new ArrayList(); + List txns = new ArrayList<>(); //Each op in a multi-op must have the same zxid! long zxid = zks.getNextZxid(); KeeperException ke = null; @@ -947,18 +944,15 @@ private void pRequestHelper(Request request) throws RequestProcessorException { // log at error level as we are returning a marshalling // error to the user LOG.error("Failed to process {}", request, e); - StringBuilder sb = new StringBuilder(); - ByteBuffer bb = request.request; - if (bb != null) { - bb.rewind(); - while (bb.hasRemaining()) { - sb.append(String.format("%02x", (0xff & bb.get()))); + byte[] payload = request.readRequestBytes(); + if (payload != null) { + for (byte b : payload) { + sb.append(String.format("%02x", (0xff & b))); } } else { sb.append("request buffer is null"); } - LOG.error("Dumping request buffer for request type {}: 0x{}", Request.op2String(request.type), sb); if (request.getHdr() != null) { request.getHdr().setType(OpCode.error); diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java index 41e3d7fcd36..1aee6aee24f 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java @@ -19,6 +19,7 @@ package org.apache.zookeeper.server; import static java.nio.charset.StandardCharsets.UTF_8; +import java.io.IOException; import java.nio.ByteBuffer; import java.util.List; import org.apache.jute.Record; @@ -78,7 +79,29 @@ public Request(long sessionId, int xid, int type, TxnHeader hdr, Record txn, lon public final int type; - public final ByteBuffer request; + private final ByteBuffer request; + + public void readRequestRecord(Record record) throws IOException { + if (request != null) { + request.rewind(); + ByteBufferInputStream.byteBuffer2Record(request, record); + request.rewind(); + return; + } + throw new IOException(new NullPointerException("request")); + } + + public byte[] readRequestBytes() { + if (request != null) { + request.rewind(); + int len = request.remaining(); + byte[] b = new byte[len]; + request.get(b); + request.rewind(); + return b; + } + return null; + } public final ServerCnxn cnxn; diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java index 4260913b796..0303ca645bd 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java @@ -1375,17 +1375,13 @@ public double getConnectionDropChance() { } @SuppressFBWarnings(value = "IS2_INCONSISTENT_SYNC", justification = "the value won't change after startup") - public void processConnectRequest(ServerCnxn cnxn, ByteBuffer incomingBuffer) - throws IOException, ClientCnxnLimitException { - - BinaryInputArchive bia = BinaryInputArchive.getArchive(new ByteBufferInputStream(incomingBuffer)); - ConnectRequest connReq = cnxn.protocolManager.deserializeConnectRequest(bia); + public void processConnectRequest(ServerCnxn cnxn, ConnectRequest request) throws IOException, ClientCnxnLimitException { LOG.debug( "Session establishment request from client {} client's lastZxid is 0x{}", cnxn.getRemoteSocketAddress(), - Long.toHexString(connReq.getLastZxidSeen())); + Long.toHexString(request.getLastZxidSeen())); - long sessionId = connReq.getSessionId(); + long sessionId = request.getSessionId(); int tokensNeeded = 1; if (connThrottle.isConnectionWeightEnabled()) { if (sessionId == 0) { @@ -1405,22 +1401,22 @@ public void processConnectRequest(ServerCnxn cnxn, ByteBuffer incomingBuffer) ServerMetrics.getMetrics().CONNECTION_TOKEN_DEFICIT.add(connThrottle.getDeficit()); ServerMetrics.getMetrics().CONNECTION_REQUEST_COUNT.add(1); - if (cnxn.protocolManager.isReadonlyAvailable()) { + if (!cnxn.protocolManager.isReadonlyAvailable()) { LOG.warn( "Connection request from old client {}; will be dropped if server is in r-o mode", cnxn.getRemoteSocketAddress()); } - if (!connReq.getReadOnly() && this instanceof ReadOnlyZooKeeperServer) { + if (!request.getReadOnly() && this instanceof ReadOnlyZooKeeperServer) { String msg = "Refusing session request for not-read-only client " + cnxn.getRemoteSocketAddress(); LOG.info(msg); throw new CloseRequestException(msg, ServerCnxn.DisconnectReason.NOT_READ_ONLY_CLIENT); } - if (connReq.getLastZxidSeen() > zkDb.dataTree.lastProcessedZxid) { + if (request.getLastZxidSeen() > zkDb.dataTree.lastProcessedZxid) { String msg = "Refusing session request for client " + cnxn.getRemoteSocketAddress() + " as it has seen zxid 0x" - + Long.toHexString(connReq.getLastZxidSeen()) + + Long.toHexString(request.getLastZxidSeen()) + " our last zxid is 0x" + Long.toHexString(getZKDatabase().getDataTreeLastProcessedZxid()) + " client must try another server"; @@ -1428,8 +1424,8 @@ public void processConnectRequest(ServerCnxn cnxn, ByteBuffer incomingBuffer) LOG.info(msg); throw new CloseRequestException(msg, ServerCnxn.DisconnectReason.CLIENT_ZXID_AHEAD); } - int sessionTimeout = connReq.getTimeOut(); - byte[] passwd = connReq.getPasswd(); + int sessionTimeout = request.getTimeOut(); + byte[] passwd = request.getPasswd(); int minSessionTimeout = getMinSessionTimeout(); if (sessionTimeout < minSessionTimeout) { sessionTimeout = minSessionTimeout; @@ -1447,16 +1443,16 @@ public void processConnectRequest(ServerCnxn cnxn, ByteBuffer incomingBuffer) LOG.debug( "Client attempting to establish new session: session = 0x{}, zxid = 0x{}, timeout = {}, address = {}", Long.toHexString(id), - Long.toHexString(connReq.getLastZxidSeen()), - connReq.getTimeOut(), + Long.toHexString(request.getLastZxidSeen()), + request.getTimeOut(), cnxn.getRemoteSocketAddress()); } else { validateSession(cnxn, sessionId); LOG.debug( "Client attempting to renew session: session = 0x{}, zxid = 0x{}, timeout = {}, address = {}", Long.toHexString(sessionId), - Long.toHexString(connReq.getLastZxidSeen()), - connReq.getTimeOut(), + Long.toHexString(request.getLastZxidSeen()), + request.getTimeOut(), cnxn.getRemoteSocketAddress()); if (serverCnxnFactory != null) { serverCnxnFactory.closeSession(sessionId, ServerCnxn.DisconnectReason.CLIENT_RECONNECT); @@ -2182,7 +2178,7 @@ private String effectiveACLPath(Request request) throws KeeperException.BadArgum case OpCode.create: case OpCode.create2: { CreateRequest req = new CreateRequest(); - if (buffer2Record(request.request, req)) { + if (readRequestRecord(request, req)) { mustCheckACL = true; acl = req.getAcl(); path = parentPath(req.getPath()); @@ -2191,21 +2187,21 @@ private String effectiveACLPath(Request request) throws KeeperException.BadArgum } case OpCode.delete: { DeleteRequest req = new DeleteRequest(); - if (buffer2Record(request.request, req)) { + if (readRequestRecord(request, req)) { path = parentPath(req.getPath()); } break; } case OpCode.setData: { SetDataRequest req = new SetDataRequest(); - if (buffer2Record(request.request, req)) { + if (readRequestRecord(request, req)) { path = req.getPath(); } break; } case OpCode.setACL: { SetACLRequest req = new SetACLRequest(); - if (buffer2Record(request.request, req)) { + if (readRequestRecord(request, req)) { mustCheckACL = true; acl = req.getAcl(); path = req.getPath(); @@ -2299,16 +2295,13 @@ public boolean authWriteRequest(Request request) { return err == KeeperException.Code.OK.intValue(); } - private boolean buffer2Record(ByteBuffer request, Record record) { - boolean rv = false; + private boolean readRequestRecord(Request request, Record record) { try { - ByteBufferInputStream.byteBuffer2Record(request, record); - request.rewind(); - rv = true; + request.readRequestRecord(record); + return true; } catch (IOException ex) { + return false; } - - return rv; } public int getOutstandingHandshakeNum() { diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java index 594c87fb90d..b6eeb758ac9 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java @@ -254,13 +254,9 @@ void request(Request request) throws IOException { oa.writeLong(request.sessionId); oa.writeInt(request.cxid); oa.writeInt(request.type); - if (request.request != null) { - request.request.rewind(); - int len = request.request.remaining(); - byte[] b = new byte[len]; - request.request.get(b); - request.request.rewind(); - oa.write(b); + byte[] payload = request.readRequestBytes(); + if (payload != null) { + oa.write(payload); } oa.close(); QuorumPacket qp = new QuorumPacket(Leader.REQUEST, -1, baos.toByteArray(), request.authInfo); diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumZooKeeperServer.java index f27ce827421..2f24347b778 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumZooKeeperServer.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumZooKeeperServer.java @@ -31,7 +31,6 @@ import org.apache.zookeeper.ZooDefs.OpCode; import org.apache.zookeeper.metrics.MetricsContext; import org.apache.zookeeper.proto.CreateRequest; -import org.apache.zookeeper.server.ByteBufferInputStream; import org.apache.zookeeper.server.Request; import org.apache.zookeeper.server.ServerMetrics; import org.apache.zookeeper.server.ZKDatabase; @@ -78,9 +77,7 @@ public Request checkUpgradeSession(Request request) throws IOException, KeeperEx if (OpCode.multi == request.type) { MultiOperationRecord multiTransactionRecord = new MultiOperationRecord(); - request.request.rewind(); - ByteBufferInputStream.byteBuffer2Record(request.request, multiTransactionRecord); - request.request.rewind(); + request.readRequestRecord(multiTransactionRecord); boolean containsEphemeralCreate = false; for (Op op : multiTransactionRecord) { if (op.getType() == OpCode.create || op.getType() == OpCode.create2) { @@ -97,9 +94,7 @@ public Request checkUpgradeSession(Request request) throws IOException, KeeperEx } } else { CreateRequest createRequest = new CreateRequest(); - request.request.rewind(); - ByteBufferInputStream.byteBuffer2Record(request.request, createRequest); - request.request.rewind(); + request.readRequestRecord(createRequest); CreateMode createMode = CreateMode.fromFlag(createRequest.getFlags()); if (!createMode.isEphemeral()) { return null; diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/CreateContainerTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/CreateContainerTest.java index 6722473f2a5..f9fd6d8b588 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/CreateContainerTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/CreateContainerTest.java @@ -27,7 +27,6 @@ import java.util.Collection; import java.util.Collections; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; @@ -221,11 +220,11 @@ public void testFalseEmpty() throws KeeperException, InterruptedException { @Test @Timeout(value = 30) public void testMaxPerMinute() throws InterruptedException { - final BlockingQueue queue = new LinkedBlockingQueue(); + final BlockingQueue queue = new LinkedBlockingQueue<>(); RequestProcessor processor = new RequestProcessor() { @Override public void processRequest(Request request) { - queue.add(new String(request.request.array())); + queue.add(new String(request.readRequestBytes())); } @Override @@ -243,12 +242,9 @@ protected Collection getCandidates() { return Arrays.asList("/one", "/two", "/three", "/four"); } }; - Executors.newSingleThreadExecutor().submit(new Callable() { - @Override - public Void call() throws Exception { - containerManager.checkContainers(); - return null; - } + Executors.newSingleThreadExecutor().submit(() -> { + containerManager.checkContainers(); + return null; }); assertEquals(queue.poll(5, TimeUnit.SECONDS), "/one"); assertEquals(queue.poll(5, TimeUnit.SECONDS), "/two"); diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/ZooKeeperServerCreationTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/ZooKeeperServerCreationTest.java index ac46b4e0f29..6c22091f426 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/ZooKeeperServerCreationTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/ZooKeeperServerCreationTest.java @@ -18,10 +18,7 @@ package org.apache.zookeeper.server; -import java.io.ByteArrayOutputStream; import java.io.File; -import java.nio.ByteBuffer; -import org.apache.jute.BinaryOutputArchive; import org.apache.zookeeper.proto.ConnectRequest; import org.apache.zookeeper.server.persistence.FileTxnSnapLog; import org.apache.zookeeper.test.ClientBase; @@ -51,10 +48,7 @@ public void submitRequest(Request si) { ServerCnxn cnxn = new MockServerCnxn(); ConnectRequest connReq = new ConnectRequest(); - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos); - connReq.serialize(boa, "connect"); - zks.processConnectRequest(cnxn, ByteBuffer.wrap(baos.toByteArray())); + zks.processConnectRequest(cnxn, connReq); } } diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/ZooKeeperServerTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/ZooKeeperServerTest.java index 4d41dac1a9b..e9f9b7db544 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/ZooKeeperServerTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/ZooKeeperServerTest.java @@ -26,12 +26,12 @@ import static org.mockito.Mockito.mock; import java.io.File; import java.io.IOException; -import java.nio.ByteBuffer; import java.util.List; import java.util.Map; import java.util.UUID; import org.apache.zookeeper.ZKTestCase; import org.apache.zookeeper.metrics.MetricsUtils; +import org.apache.zookeeper.proto.ConnectRequest; import org.apache.zookeeper.server.persistence.FileTxnLog; import org.apache.zookeeper.server.persistence.FileTxnSnapLog; import org.apache.zookeeper.server.persistence.SnapStream; @@ -150,21 +150,17 @@ public void testClientZxidAhead() { final ZKDatabase zkDatabase = new ZKDatabase(mock(FileTxnSnapLog.class)); zooKeeperServer.setZKDatabase(zkDatabase); - final ByteBuffer output = ByteBuffer.allocate(30); - // serialize a connReq - output.putInt(1); - // lastZxid - output.putLong(99L); - output.putInt(500); - output.putLong(123L); - output.putInt(1); - output.put((byte) 1); - output.put((byte) 1); - output.flip(); + final ConnectRequest request = new ConnectRequest(); + request.setProtocolVersion(1); + request.setLastZxidSeen(99L); + request.setTimeOut(500); + request.setSessionId(123L); + request.setPasswd(new byte[]{ 1 }); + request.setReadOnly(true); ServerCnxn.CloseRequestException e = assertThrows( ServerCnxn.CloseRequestException.class, - () -> zooKeeperServer.processConnectRequest(new MockServerCnxn(), output)); + () -> zooKeeperServer.processConnectRequest(new MockServerCnxn(), request)); assertEquals(e.getReason(), ServerCnxn.DisconnectReason.CLIENT_ZXID_AHEAD); } diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/ReadOnlyZooKeeperServerTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/ReadOnlyZooKeeperServerTest.java index 3ee6016f086..298b5d1d95f 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/ReadOnlyZooKeeperServerTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/ReadOnlyZooKeeperServerTest.java @@ -21,7 +21,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.Mockito.mock; -import java.nio.ByteBuffer; +import org.apache.zookeeper.proto.ConnectRequest; import org.apache.zookeeper.server.MockServerCnxn; import org.apache.zookeeper.server.ServerCnxn; import org.apache.zookeeper.server.ZKDatabase; @@ -35,28 +35,26 @@ public class ReadOnlyZooKeeperServerTest { /** - * test method {@link ZooKeeperServer#processConnectRequest(org.apache.zookeeper.server.ServerCnxn, java.nio.ByteBuffer)} + * test method {@link ZooKeeperServer#processConnectRequest(ServerCnxn, ConnectRequest)} */ @Test public void testReadOnlyZookeeperServer() { ReadOnlyZooKeeperServer readOnlyZooKeeperServer = new ReadOnlyZooKeeperServer( - mock(FileTxnSnapLog.class), mock(QuorumPeer.class), mock(ZKDatabase.class)); - - final ByteBuffer output = ByteBuffer.allocate(30); - // serialize a connReq - output.putInt(1); - output.putLong(1L); - output.putInt(500); - output.putLong(123L); - output.putInt(1); - output.put((byte) 1); - // set readOnly false - output.put((byte) 0); - output.flip(); - - ServerCnxn.CloseRequestException e = assertThrows(ServerCnxn.CloseRequestException.class, () -> { - readOnlyZooKeeperServer.processConnectRequest(new MockServerCnxn(), output); - }); + mock(FileTxnSnapLog.class), + mock(QuorumPeer.class), + mock(ZKDatabase.class)); + + final ConnectRequest request = new ConnectRequest(); + request.setProtocolVersion(1); + request.setLastZxidSeen(99L); + request.setTimeOut(500); + request.setSessionId(123L); + request.setPasswd(new byte[]{ 1 }); + request.setReadOnly(false); + + ServerCnxn.CloseRequestException e = assertThrows( + ServerCnxn.CloseRequestException.class, + () -> readOnlyZooKeeperServer.processConnectRequest(new MockServerCnxn(), request)); assertEquals(e.getReason(), ServerCnxn.DisconnectReason.NOT_READ_ONLY_CLIENT); } diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/SessionUpgradeQuorumTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/SessionUpgradeQuorumTest.java index 2131158d811..8803a73cbab 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/SessionUpgradeQuorumTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/SessionUpgradeQuorumTest.java @@ -39,7 +39,6 @@ import org.apache.zookeeper.ZooKeeper.States; import org.apache.zookeeper.data.Id; import org.apache.zookeeper.proto.CreateRequest; -import org.apache.zookeeper.server.ByteBufferInputStream; import org.apache.zookeeper.server.Request; import org.apache.zookeeper.server.persistence.FileTxnSnapLog; import org.apache.zookeeper.test.ClientBase; @@ -319,15 +318,13 @@ protected void request(Request request) throws IOException { if (request.type == ZooDefs.OpCode.create && request.cnxn != null) { CreateRequest createRequest = new CreateRequest(); - request.request.rewind(); - ByteBufferInputStream.byteBuffer2Record(request.request, createRequest); - request.request.rewind(); + request.readRequestRecord(createRequest); try { CreateMode createMode = CreateMode.fromFlag(createRequest.getFlags()); if (createMode.isEphemeral()) { request.cnxn.sendCloseSession(); } - } catch (KeeperException e) { + } catch (KeeperException ignore) { } return; }