Skip to content

Commit

Permalink
ZOOKEEPER-2816: Code refactoring for ZK_SERVER module
Browse files Browse the repository at this point in the history
* Fix spell issues
* Merge exceptions with `|` character
* Remove unnecessary boxing
* Remove unused import
* Using enhanced `for` loop
* Using `LinkedList` for removing duplicates ACL

Author: asdf2014 <[email protected]>

Reviewers: Michael Han <[email protected]>

Closes apache#288 from asdf2014/zk_quorum
  • Loading branch information
asdf2014 authored and hanm committed Jun 25, 2017
1 parent 8ec4c58 commit a0eba7a
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,7 @@ public void processRequest(Request request) {
checkWatches.getPath(), type, cnxn);
if (!containsWatcher) {
String msg = String.format(Locale.ENGLISH, "%s (type: %s)",
new Object[] { checkWatches.getPath(), type });
checkWatches.getPath(), type);
throw new KeeperException.NoWatcherException(msg);
}
break;
Expand All @@ -414,7 +414,7 @@ public void processRequest(Request request) {
removeWatches.getPath(), type, cnxn);
if (!removed) {
String msg = String.format(Locale.ENGLISH, "%s (type: %s)",
new Object[] { removeWatches.getPath(), type });
removeWatches.getPath(), type);
throw new KeeperException.NoWatcherException(msg);
}
break;
Expand Down
26 changes: 10 additions & 16 deletions src/java/main/org/apache/zookeeper/server/PrepRequestProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
import org.apache.zookeeper.proto.SetACLRequest;
import org.apache.zookeeper.proto.SetDataRequest;
import org.apache.zookeeper.server.ZooKeeperServer.ChangeRecord;
import org.apache.zookeeper.server.auth.AuthenticationProvider;
import org.apache.zookeeper.server.auth.ProviderRegistry;
import org.apache.zookeeper.server.auth.ServerAuthenticationProvider;
import org.apache.zookeeper.server.quorum.Leader.XidRolloverException;
Expand Down Expand Up @@ -73,7 +72,6 @@
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.ListIterator;
Expand Down Expand Up @@ -104,7 +102,7 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements

/**
* this is only for testing purposes.
* should never be useed otherwise
* should never be used otherwise
*/
private static boolean failCreate = false;

Expand Down Expand Up @@ -454,7 +452,8 @@ protected void pRequest2Txn(int type, long zxid, Request request,
long configId = reconfigRequest.getCurConfigId();

if (configId != -1 && configId!=lzks.self.getLastSeenQuorumVerifier().getVersion()){
String msg = "Reconfiguration from version " + configId + " failed -- last seen version is " + lzks.self.getLastSeenQuorumVerifier().getVersion();
String msg = "Reconfiguration from version " + configId + " failed -- last seen version is " +
lzks.self.getLastSeenQuorumVerifier().getVersion();
throw new KeeperException.BadVersionException(msg);
}

Expand All @@ -471,11 +470,9 @@ protected void pRequest2Txn(int type, long zxid, Request request,
props.load(new StringReader(newMembers));
request.qv = QuorumPeerConfig.parseDynamicConfig(props, lzks.self.getElectionType(), true, false);
request.qv.setVersion(request.getHdr().getZxid());
} catch (IOException e) {
} catch (IOException | ConfigException e) {
throw new KeeperException.BadArgumentsException(e.getMessage());
} catch (ConfigException e) {
throw new KeeperException.BadArgumentsException(e.getMessage());
}
}
} else { //incremental change - must be a majority quorum system
LOG.info("Incremental reconfig");

Expand Down Expand Up @@ -529,7 +526,7 @@ protected void pRequest2Txn(int type, long zxid, Request request,
}

nextServers.remove(qs.id);
nextServers.put(Long.valueOf(qs.id), qs);
nextServers.put(qs.id, qs);
}
}
} catch (ConfigException e){
Expand Down Expand Up @@ -914,11 +911,9 @@ protected void pRequest(Request request) throws RequestProcessorException {

private List<ACL> removeDuplicates(List<ACL> acl) {

ArrayList<ACL> retval = new ArrayList<ACL>();
Iterator<ACL> it = acl.iterator();
while (it.hasNext()) {
ACL a = it.next();
if (retval.contains(a) == false) {
LinkedList<ACL> retval = new LinkedList<ACL>();
for (ACL a : acl) {
if (!retval.contains(a)) {
retval.add(a);
}
}
Expand All @@ -930,8 +925,7 @@ private void validateCreateRequest(String path, CreateMode createMode, Request r
try {
EphemeralType.validateTTL(createMode, ttl);
} catch (IllegalArgumentException e) {
BadArgumentsException bae = new BadArgumentsException(path);
throw bae;
throw new BadArgumentsException(path);
}
if (createMode.isEphemeral()) {
// Exception is set when local session failed to upgrade
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@
import org.apache.zookeeper.server.ServerCnxn.CloseRequestException;
import org.apache.zookeeper.server.SessionTracker.Session;
import org.apache.zookeeper.server.SessionTracker.SessionExpirer;
import org.apache.zookeeper.server.auth.AuthenticationProvider;
import org.apache.zookeeper.server.auth.ProviderRegistry;
import org.apache.zookeeper.server.auth.ServerAuthenticationProvider;
import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
Expand Down Expand Up @@ -104,7 +103,7 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
protected volatile State state = State.INITIAL;

protected enum State {
INITIAL, RUNNING, SHUTDOWN, ERROR;
INITIAL, RUNNING, SHUTDOWN, ERROR
}

/**
Expand Down Expand Up @@ -229,7 +228,7 @@ public ZooKeeperServer(File snapDir, File logDir, int tickTime)
}

/**
* Default constructor, relies on the config for its agrument values
* Default constructor, relies on the config for its argument values
*
* @throws IOException
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -354,24 +354,24 @@ public void run() {
QuorumPacket qp = new QuorumPacket();
ia.readRecord(qp, "packet");
if(qp.getType() != Leader.FOLLOWERINFO && qp.getType() != Leader.OBSERVERINFO){
LOG.error("First packet " + qp.toString()
LOG.error("First packet " + qp.toString()
+ " is not FOLLOWERINFO or OBSERVERINFO!");
return;
}

byte learnerInfoData[] = qp.getData();
if (learnerInfoData != null) {
ByteBuffer bbsid = ByteBuffer.wrap(learnerInfoData);
if (learnerInfoData.length >= 8) {
this.sid = bbsid.getLong();
}
}
if (learnerInfoData.length >= 12) {
this.version = bbsid.getInt(); // protocolVersion
}
if (learnerInfoData.length >= 20) {
long configVersion = bbsid.getLong();
if (configVersion > leader.self.getQuorumVerifier().getVersion()) {
throw new IOException("Follower is ahead of the leader (has a later activated configuration)");
throw new IOException("Follower is ahead of the leader (has a later activated configuration)");
}
}
} else {
Expand Down Expand Up @@ -566,7 +566,8 @@ public void run() {
// owns the session
leader.zk.setOwner(id, this);
} catch (SessionExpiredException e) {
LOG.error("Somehow session " + Long.toHexString(id) + " expired right after being renewed! (impossible)", e);
LOG.error("Somehow session " + Long.toHexString(id) +
" expired right after being renewed! (impossible)", e);
}
}
if (LOG.isTraceEnabled()) {
Expand Down Expand Up @@ -690,7 +691,7 @@ public boolean syncFollower(long peerLastZxid, ZKDatabase db, Leader leader) {

if (db.getCommittedLog().isEmpty()) {
/*
* It is possible that commitedLog is empty. In that case
* It is possible that committedLog is empty. In that case
* setting these value to the latest txn in leader db
* will reduce the case that we need to handle
*
Expand All @@ -710,7 +711,7 @@ public boolean syncFollower(long peerLastZxid, ZKDatabase db, Leader leader) {
* 2. Peer and leader is already sync, send empty diff
* 3. Follower has txn that we haven't seen. This may be old leader
* so we need to send TRUNC. However, if peer has newEpochZxid,
* we cannot send TRUC since the follower has no txnlog
* we cannot send TRUNC since the follower has no txnlog
* 4. Follower is within committedLog range or already in-sync.
* We may need to send DIFF or TRUNC depending on follower's zxid
* We always send empty DIFF if follower is already in-sync
Expand All @@ -730,7 +731,7 @@ public boolean syncFollower(long peerLastZxid, ZKDatabase db, Leader leader) {
needOpPacket = false;
needSnap = false;
} else if (peerLastZxid > maxCommittedLog && !isPeerNewEpochZxid) {
// Newer than commitedLog, send trunc and done
// Newer than committedLog, send trunc and done
LOG.debug("Sending TRUNC to follower zxidToSend=0x" +
Long.toHexString(maxCommittedLog) +
" for peer sid:" + getSid());
Expand Down Expand Up @@ -799,12 +800,12 @@ public boolean syncFollower(long peerLastZxid, ZKDatabase db, Leader leader) {
* @param itr iterator point to the proposals
* @param peerLastZxid last zxid seen by the follower
* @param maxZxid max zxid of the proposal to queue, null if no limit
* @param lastCommitedZxid when sending diff, we need to send lastCommitedZxid
* @param lastCommittedZxid when sending diff, we need to send lastCommittedZxid
* on the leader to follow Zab 1.0 protocol.
* @return last zxid of the queued proposal
*/
protected long queueCommittedProposals(Iterator<Proposal> itr,
long peerLastZxid, Long maxZxid, Long lastCommitedZxid) {
long peerLastZxid, Long maxZxid, Long lastCommittedZxid) {
boolean isPeerNewEpochZxid = (peerLastZxid & 0xffffffffL) == 0;
long queuedZxid = peerLastZxid;
// as we look through proposals, this variable keeps track of previous
Expand Down Expand Up @@ -832,19 +833,19 @@ protected long queueCommittedProposals(Iterator<Proposal> itr,
// Send diff when we see the follower's zxid in our history
if (packetZxid == peerLastZxid) {
LOG.info("Sending DIFF zxid=0x" +
Long.toHexString(lastCommitedZxid) +
Long.toHexString(lastCommittedZxid) +
" for peer sid: " + getSid());
queueOpPacket(Leader.DIFF, lastCommitedZxid);
queueOpPacket(Leader.DIFF, lastCommittedZxid);
needOpPacket = false;
continue;
}

if (isPeerNewEpochZxid) {
// Send diff and fall through if zxid is of a new-epoch
LOG.info("Sending DIFF zxid=0x" +
Long.toHexString(lastCommitedZxid) +
Long.toHexString(lastCommittedZxid) +
" for peer sid: " + getSid());
queueOpPacket(Leader.DIFF, lastCommitedZxid);
queueOpPacket(Leader.DIFF, lastCommittedZxid);
needOpPacket = false;
} else if (packetZxid > peerLastZxid ) {
// Peer have some proposals that the leader hasn't seen yet
Expand Down Expand Up @@ -886,9 +887,9 @@ protected long queueCommittedProposals(Iterator<Proposal> itr,
// is the catch when our history older than learner and there is
// no new txn since then. So we need an empty diff
LOG.info("Sending DIFF zxid=0x" +
Long.toHexString(lastCommitedZxid) +
Long.toHexString(lastCommittedZxid) +
" for peer sid: " + getSid());
queueOpPacket(Leader.DIFF, lastCommitedZxid);
queueOpPacket(Leader.DIFF, lastCommittedZxid);
needOpPacket = false;
}

Expand Down

0 comments on commit a0eba7a

Please sign in to comment.