Skip to content

Commit

Permalink
ZOOKEEPER-2853: Update lastZxidSeen in FileTxnLog
Browse files Browse the repository at this point in the history
Author: Fangmin Lyu <[email protected]>

Reviewers: Michael Han <[email protected]>, maoling <[email protected]>

Closes apache#322 from lvfangmin/ZOOKEEPER-2853
  • Loading branch information
Fangmin Lyu authored and hanm committed Aug 3, 2017
1 parent 69c8cbe commit 5c4e443
Showing 1 changed file with 63 additions and 61 deletions.
124 changes: 63 additions & 61 deletions src/java/main/org/apache/zookeeper/server/persistence/FileTxnLog.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,39 +53,39 @@
* <blockquote><pre>
* LogFile:
* FileHeader TxnList ZeroPad
*
*
* FileHeader: {
* magic 4bytes (ZKLG)
* version 4bytes
* dbid 8bytes
* }
*
*
* TxnList:
* Txn || Txn TxnList
*
*
* Txn:
* checksum Txnlen TxnHeader Record 0x42
*
*
* checksum: 8bytes Adler32 is currently used
* calculated across payload -- Txnlen, TxnHeader, Record and 0x42
*
*
* Txnlen:
* len 4bytes
*
*
* TxnHeader: {
* sessionid 8bytes
* cxid 4bytes
* zxid 8bytes
* time 8bytes
* type 4bytes
* }
*
*
* Record:
* See Jute definition file for details on the various record types
*
*
* ZeroPad:
* 0 padded to EOF (filled during preallocation stage)
* </pre></blockquote>
* </pre></blockquote>
*/
public class FileTxnLog implements TxnLog {
private static final Logger LOG;
Expand Down Expand Up @@ -175,7 +175,7 @@ public synchronized void rollLog() throws IOException {
/**
* close all the open file handles
* @throws IOException
*/
*/
public synchronized void close() throws IOException {
if (logStream != null) {
logStream.close();
Expand All @@ -184,54 +184,56 @@ public synchronized void close() throws IOException {
log.close();
}
}

/**
* append an entry to the transaction log
* @param hdr the header of the transaction
* @param txn the transaction part of the entry
* returns true iff something appended, otw false
* returns true iff something appended, otw false
*/
public synchronized boolean append(TxnHeader hdr, Record txn)
throws IOException
{
if (hdr != null) {
if (hdr.getZxid() <= lastZxidSeen) {
LOG.warn("Current zxid " + hdr.getZxid()
+ " is <= " + lastZxidSeen + " for "
+ hdr.getType());
}
if (logStream==null) {
if(LOG.isInfoEnabled()){
LOG.info("Creating new log file: log." +
Long.toHexString(hdr.getZxid()));
}

logFileWrite = new File(logDir, ("log." +
Long.toHexString(hdr.getZxid())));
fos = new FileOutputStream(logFileWrite);
logStream=new BufferedOutputStream(fos);
oa = BinaryOutputArchive.getArchive(logStream);
FileHeader fhdr = new FileHeader(TXNLOG_MAGIC,VERSION, dbId);
fhdr.serialize(oa, "fileheader");
// Make sure that the magic number is written before padding.
logStream.flush();
currentSize = fos.getChannel().position();
streamsToFlush.add(fos);
}
padFile(fos);
byte[] buf = Util.marshallTxnEntry(hdr, txn);
if (buf == null || buf.length == 0) {
throw new IOException("Faulty serialization for header " +
"and txn");
}
Checksum crc = makeChecksumAlgorithm();
crc.update(buf, 0, buf.length);
oa.writeLong(crc.getValue(), "txnEntryCRC");
Util.writeTxnBytes(oa, buf);

return true;
if (hdr == null) {
return false;
}
return false;
if (hdr.getZxid() <= lastZxidSeen) {
LOG.warn("Current zxid " + hdr.getZxid()
+ " is <= " + lastZxidSeen + " for "
+ hdr.getType());
} else {
lastZxidSeen = hdr.getZxid();
}
if (logStream==null) {
if(LOG.isInfoEnabled()){
LOG.info("Creating new log file: log." +
Long.toHexString(hdr.getZxid()));
}

logFileWrite = new File(logDir, ("log." +
Long.toHexString(hdr.getZxid())));
fos = new FileOutputStream(logFileWrite);
logStream=new BufferedOutputStream(fos);
oa = BinaryOutputArchive.getArchive(logStream);
FileHeader fhdr = new FileHeader(TXNLOG_MAGIC,VERSION, dbId);
fhdr.serialize(oa, "fileheader");
// Make sure that the magic number is written before padding.
logStream.flush();
currentSize = fos.getChannel().position();
streamsToFlush.add(fos);
}
padFile(fos);
byte[] buf = Util.marshallTxnEntry(hdr, txn);
if (buf == null || buf.length == 0) {
throw new IOException("Faulty serialization for header " +
"and txn");
}
Checksum crc = makeChecksumAlgorithm();
crc.update(buf, 0, buf.length);
oa.writeLong(crc.getValue(), "txnEntryCRC");
Util.writeTxnBytes(oa, buf);

return true;
}

/**
Expand Down Expand Up @@ -456,10 +458,10 @@ public boolean isForceSync() {
}

/**
* a class that keeps track of the position
* a class that keeps track of the position
* in the input stream. The position points to offset
* that has been consumed by the applications. It can
* wrap buffered input streams to provide the right offset
* that has been consumed by the applications. It can
* wrap buffered input streams to provide the right offset
* for the application.
*/
static class PositionInputStream extends FilterInputStream {
Expand All @@ -468,7 +470,7 @@ protected PositionInputStream(InputStream in) {
super(in);
position = 0;
}

@Override
public int read() throws IOException {
int rc = super.read();
Expand All @@ -483,9 +485,9 @@ public int read(byte[] b) throws IOException {
if (rc > 0) {
position += rc;
}
return rc;
return rc;
}

@Override
public int read(byte[] b, int off, int len) throws IOException {
int rc = super.read(b, off, len);
Expand All @@ -494,7 +496,7 @@ public int read(byte[] b, int off, int len) throws IOException {
}
return rc;
}

@Override
public long skip(long n) throws IOException {
long rc = super.skip(n);
Expand Down Expand Up @@ -522,7 +524,7 @@ public void reset() {
throw new UnsupportedOperationException("reset");
}
}

/**
* this class implements the txnlog iterator interface
* which is used for reading the transaction logs
Expand All @@ -535,7 +537,7 @@ public static class FileTxnIterator implements TxnLog.TxnIterator {
File logFile;
InputArchive ia;
static final String CRC_ERROR="CRC check failed";

PositionInputStream inputStream=null;
//stored files is the list of files greater than
//the zxid we are looking for.
Expand Down Expand Up @@ -564,7 +566,7 @@ public FileTxnIterator(File logDir, long zxid, boolean fastForward)
}
}
}

/**
* create an iterator over a transaction database directory
* @param logDir the transaction database directory
Expand Down Expand Up @@ -596,7 +598,7 @@ else if (Util.getZxidFromName(f.getName(), "log") < zxid) {
goToNextLog();
next();
}

/**
* Return total storage size of txnlog that will return by this iterator.
*/
Expand Down Expand Up @@ -634,7 +636,7 @@ protected void inStreamCreated(InputArchive ia, InputStream is)
FileHeader header= new FileHeader();
header.deserialize(ia, "fileheader");
if (header.getMagic() != FileTxnLog.TXNLOG_MAGIC) {
throw new IOException("Transaction log: " + this.logFile + " has invalid magic number "
throw new IOException("Transaction log: " + this.logFile + " has invalid magic number "
+ header.getMagic()
+ " != " + FileTxnLog.TXNLOG_MAGIC);
}
Expand Down

0 comments on commit 5c4e443

Please sign in to comment.