From dfaa19a0402b27c3fd711e57cd941bb011c4da70 Mon Sep 17 00:00:00 2001 From: dennis zhuang Date: Tue, 27 Oct 2020 20:32:04 +0800 Subject: [PATCH] (fix) revisiting segment log storage and fix some corner cases (#523) * (fix) revisiting segment log storage and fix some corner cases * (fix) test * (feat) ensure the value size is in file range * (feat) Adds log when checking election priority returns false * (fix) CR * (fix) by CR * (feat) ensure buffer's visibility * (feat) ensure buffer's visibility * (fix) blank segment may fail to save header, we must fix it * (fix) format * (fix) fail to mmap files are collected into corruptedHeaderSegments * (feat) delete the file when fail to create new segment --- .../com/alipay/sofa/jraft/core/NodeImpl.java | 4 + .../sofa/jraft/storage/log/AbortFile.java | 23 +- .../storage/log/RocksDBSegmentLogStorage.java | 236 +++++++++++++----- .../sofa/jraft/storage/log/SegmentFile.java | 89 ++++--- .../sofa/jraft/core/CliServiceTest.java | 8 +- .../com/alipay/sofa/jraft/core/NodeTest.java | 4 +- .../storage/impl/BaseLogStorageTest.java | 7 +- .../impl/RocksDBSegmentLogStorageTest.java | 121 ++++++++- .../PriorityElectionNode.java | 20 +- 9 files changed, 386 insertions(+), 126 deletions(-) diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/core/NodeImpl.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/core/NodeImpl.java index 6c41d916c..21d4fe509 100644 --- a/jraft-core/src/main/java/com/alipay/sofa/jraft/core/NodeImpl.java +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/core/NodeImpl.java @@ -647,6 +647,8 @@ private boolean allowLaunchElection() { // Priority 0 is a special value so that a node will never participate in election. if (this.serverId.isPriorityNotElected()) { + LOG.warn("Node {} will never participate in election, because it's priority={}.", getNodeId(), + this.serverId.getPriority()); return false; } @@ -668,6 +670,8 @@ private boolean allowLaunchElection() { } if (this.electionTimeoutCounter == 1) { + LOG.debug("Node {} does not initiate leader election and waits for the next election timeout.", + getNodeId()); return false; } } diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/storage/log/AbortFile.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/storage/log/AbortFile.java index a39db9c6b..84f5f6c00 100644 --- a/jraft-core/src/main/java/com/alipay/sofa/jraft/storage/log/AbortFile.java +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/storage/log/AbortFile.java @@ -40,25 +40,26 @@ public AbortFile(final String path) { } public boolean create() throws IOException { - return writeDate(); - } - - @SuppressWarnings("deprecation") - private boolean writeDate() throws IOException { final File file = new File(this.path); if (file.createNewFile()) { - try (FileWriter writer = new FileWriter(file, false)) { - writer.write(new Date().toGMTString()); - writer.write(System.lineSeparator()); - } + writeDate(); return true; } else { return false; } } - public boolean touch() throws IOException { - return writeDate(); + @SuppressWarnings("deprecation") + private void writeDate() throws IOException { + final File file = new File(this.path); + try (final FileWriter writer = new FileWriter(file, false)) { + writer.write(new Date().toGMTString()); + writer.write(System.lineSeparator()); + } + } + + public void touch() throws IOException { + writeDate(); } public boolean exists() { diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/storage/log/RocksDBSegmentLogStorage.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/storage/log/RocksDBSegmentLogStorage.java index 0b3fc7185..0e1077f5c 100644 --- a/jraft-core/src/main/java/com/alipay/sofa/jraft/storage/log/RocksDBSegmentLogStorage.java +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/storage/log/RocksDBSegmentLogStorage.java @@ -293,8 +293,8 @@ public RocksDBSegmentLogStorage(final String path, final RaftOptions raftOptions public RocksDBSegmentLogStorage(final String path, final RaftOptions raftOptions, final int valueSizeThreshold, final int maxSegmentFileSize) { - this(path, raftOptions, DEFAULT_VALUE_SIZE_THRESHOLD, maxSegmentFileSize, PRE_ALLOCATE_SEGMENT_COUNT, - MEM_SEGMENT_COUNT, DEFAULT_CHECKPOINT_INTERVAL_MS, createDefaultWriteExecutor()); + this(path, raftOptions, valueSizeThreshold, maxSegmentFileSize, PRE_ALLOCATE_SEGMENT_COUNT, MEM_SEGMENT_COUNT, + DEFAULT_CHECKPOINT_INTERVAL_MS, createDefaultWriteExecutor()); } private static ThreadPoolExecutor createDefaultWriteExecutor() { @@ -338,7 +338,7 @@ private SegmentFile getLastSegmentFile(final long logIndex, final int waitToWrot if (!this.segments.isEmpty()) { segmentCount = this.segments.size(); - final SegmentFile currLastFile = this.segments.get(this.segments.size() - 1); + final SegmentFile currLastFile = getLastSegmentWithoutLock(); if (waitToWroteSize <= 0 || !currLastFile.reachesFileEndBy(waitToWroteSize)) { lastFile = currLastFile; } @@ -370,7 +370,7 @@ private SegmentFile createNewSegmentFile(final long logIndex, final int oldSegme } if (!this.segments.isEmpty()) { // Sync current last file and correct it's lastLogIndex. - final SegmentFile currLastFile = this.segments.get(this.segments.size() - 1); + final SegmentFile currLastFile = getLastSegmentWithoutLock(); currLastFile.setLastLogIndex(logIndex - 1); ctx.startJob(); // Attach a finish hook to set last segment file to be read-only. @@ -416,7 +416,8 @@ private SegmentFile allocateSegmentFile(final long index) throws InterruptedExce } private SegmentFile allocateNewSegmentFile() throws IOException { - SegmentFile segmentFile = new SegmentFile(this.maxSegmentFileSize, getNewSegmentFilePath(), this.writeExecutor); + final String newSegPath = getNewSegmentFilePath(); + SegmentFile segmentFile = new SegmentFile(this.maxSegmentFileSize, newSegPath, this.writeExecutor); final SegmentFileOptions opts = SegmentFileOptions.builder() // .setSync(false) // .setRecover(false) // @@ -424,12 +425,18 @@ private SegmentFile allocateNewSegmentFile() throws IOException { .setNewFile(true) // .setPos(0).build(); - if (!segmentFile.init(opts)) { - throw new IOException("Fail to create new segment file"); + try { + if (!segmentFile.init(opts)) { + throw new IOException("Fail to create new segment file"); + } + segmentFile.hintLoad(); + LOG.info("Create a new segment file {}.", segmentFile.getPath()); + return segmentFile; + } catch (IOException e) { + // Delete the file if fails + FileUtils.deleteQuietly(new File(newSegPath)); + throw e; } - segmentFile.hintLoad(); - LOG.info("Create a new segment file {}.", segmentFile.getPath()); - return segmentFile; } private String getNewSegmentFilePath() { @@ -468,6 +475,7 @@ protected boolean onInitLoaded() { } this.segments = new ArrayList<>(segmentFiles == null ? 10 : segmentFiles.length); this.blankSegments = new ArrayDeque<>(); + List corruptedHeaderSegments = new ArrayList<>(); if (segmentFiles != null && segmentFiles.length > 0) { // Sort by sequences. @@ -483,72 +491,34 @@ protected boolean onInitLoaded() { this.writeExecutor); if (!segmentFile.mmapFile(false)) { - LOG.error("Fail to mmap segment file {}.", segFile.getAbsoluteFile()); - return false; + assert (segmentFile.isHeaderCorrupted()); + corruptedHeaderSegments.add(segFile); + continue; } if (segmentFile.isBlank()) { - this.blankSegments.add(new AllocatedResult(segmentFile)); + this.blankSegments.add(new AllocatedResult(segmentFile)); + } else if (segmentFile.isHeaderCorrupted()) { + corruptedHeaderSegments.add(segFile); } else { - this.segments.add(segmentFile); + this.segments.add(segmentFile); } } + // Processing corrupted header files + //TODO(boyan) maybe we can find a better solution for such case that new allocated segment file is corrupted when power failure etc. + if(!processCorruptedHeaderFiles(corruptedHeaderSegments)) { + return false; + } + // init blank segments - for (AllocatedResult ret : this.blankSegments) { - final SegmentFile segmentFile = ret.segmentFile; - final SegmentFileOptions opts = SegmentFileOptions.builder().setSync(false).build(); - if (!segmentFile.init(opts)) { - LOG.error("Fail to load blank segment file {}.", segmentFile.getPath()); - segmentFile.shutdown(); - return false; - } + if(!initBlankFiles()) { + return false; } // try to recover segments - boolean needRecover = false; - SegmentFile prevFile = null; - for (int i = 0; i < this.segments.size(); i++) { - final boolean isLastFile = i == this.segments.size() - 1; - SegmentFile segmentFile = this.segments.get(i); - int pos = segmentFile.getSize(); - if (StringUtil.equalsIgnoreCase(checkpointSegFile, segmentFile.getFilename())) { - needRecover = true; - assert (checkpoint != null); - pos = checkpoint.committedPos; - } else { - if (needRecover) { - pos = 0; - } - } - - final SegmentFileOptions opts = SegmentFileOptions.builder() // - .setSync(isSync()) // - .setRecover(needRecover && !normalExit) // - .setLastFile(isLastFile) // - .setNewFile(false) // - .setPos(pos).build(); - - if (!segmentFile.init(opts)) { - LOG.error("Fail to load segment file {}.", segmentFile.getPath()); - segmentFile.shutdown(); - return false; - } - /** - * It's wrote position is from start(HEADER_SIZE) but it's not the last file, SHOULD not happen. - */ - if (segmentFile.getWrotePos() == SegmentFile.HEADER_SIZE && !isLastFile) { - LOG.error("Detected corrupted segment file {}.", segmentFile.getPath()); - return false; - } - - if (prevFile != null) { - prevFile.setLastLogIndex(segmentFile.getFirstLogIndex() - 1); - } - prevFile = segmentFile; - } - if (getLastLogIndex() > 0 && prevFile != null) { - prevFile.setLastLogIndex(getLastLogIndex()); + if(!recoverFiles(checkpoint, normalExit, checkpointSegFile)) { + return false; } } else { if (checkpoint != null) { @@ -584,6 +554,95 @@ protected boolean onInitLoaded() { } } + private boolean recoverFiles(final Checkpoint checkpoint, final boolean normalExit, final String checkpointSegFile) { + boolean needRecover = false; + SegmentFile prevFile = null; + for (int i = 0; i < this.segments.size(); i++) { + final boolean isLastFile = i == this.segments.size() - 1; + SegmentFile segmentFile = this.segments.get(i); + int pos = segmentFile.getSize(); + if (StringUtil.equalsIgnoreCase(checkpointSegFile, segmentFile.getFilename())) { + needRecover = true; + assert (checkpoint != null); + pos = checkpoint.committedPos; + } else { + if (needRecover) { + pos = 0; + } + } + + final SegmentFileOptions opts = SegmentFileOptions.builder() // + .setSync(isSync()) // + .setRecover(needRecover && !normalExit) // + .setLastFile(isLastFile) // + .setNewFile(false) // + .setPos(pos).build(); + + if (!segmentFile.init(opts)) { + LOG.error("Fail to load segment file {}.", segmentFile.getPath()); + segmentFile.shutdown(); + return false; + } + /** + * It's wrote position is from start(HEADER_SIZE) but it's not the last file, SHOULD not happen. + */ + if (segmentFile.getWrotePos() == SegmentFile.HEADER_SIZE && !isLastFile) { + LOG.error("Detected corrupted segment file {}.", segmentFile.getPath()); + return false; + } + + if (prevFile != null) { + prevFile.setLastLogIndex(segmentFile.getFirstLogIndex() - 1); + } + prevFile = segmentFile; + } + if (getLastLogIndex() > 0 && prevFile != null) { + prevFile.setLastLogIndex(getLastLogIndex()); + } + return true; + } + + private boolean initBlankFiles() { + for (AllocatedResult ret : this.blankSegments) { + final SegmentFile segmentFile = ret.segmentFile; + final SegmentFileOptions opts = SegmentFileOptions.builder() // + .setSync(false) // + .setRecover(false) // + .setLastFile(true) // + .build(); + + if (!segmentFile.init(opts)) { + LOG.error("Fail to load blank segment file {}.", segmentFile.getPath()); + segmentFile.shutdown(); + return false; + } + } + return true; + } + + private boolean processCorruptedHeaderFiles(final List corruptedHeaderSegments) throws IOException { + if (corruptedHeaderSegments.size() == 1) { + final File corruptedFile = corruptedHeaderSegments.get(0); + if (getFileSequenceFromFileName(corruptedFile) != this.nextFileSequence.get() - 1) { + LOG.error("Detected corrupted header segment file {}.", corruptedFile); + return false; + } else { + // The file is the last file,it's the new blank segment but fail to save header, we can + // remove it safely. + LOG.warn("Truncate the last segment file {} which it's header is corrupted.", + corruptedFile.getAbsolutePath()); + // We don't want to delete it, but rename it for safety. + FileUtils.moveFile(corruptedFile, new File(corruptedFile.getAbsolutePath() + ".corrupted")); + } + } else if (corruptedHeaderSegments.size() > 1) { + // FATAL: it should not happen. + LOG.error("Detected corrupted header segment files: {}.", corruptedHeaderSegments); + return false; + } + + return true; + } + private void startSegmentAllocator() throws IOException { // Warmup if (this.blankSegments.isEmpty()) { @@ -794,7 +853,7 @@ private void doCheckpoint() { } } - private SegmentFile getLastSegmentFileForRead() throws IOException, InterruptedException { + public SegmentFile getLastSegmentFileForRead() throws IOException, InterruptedException { return getLastSegmentFile(-1, 0, false, null); } @@ -815,6 +874,10 @@ protected void onReset(final long nextLogIndex) { } } + private SegmentFile getLastSegmentWithoutLock() { + return this.segments.get(this.segments.size() - 1); + } + @Override protected void onTruncatePrefix(final long startIndex, final long firstIndexKept) throws RocksDBException, IOException { @@ -822,13 +885,25 @@ protected void onTruncatePrefix(final long startIndex, final long firstIndexKept this.writeLock.lock(); try { int fromIndex = binarySearchFileIndexByLogIndex(startIndex); - final int toIndex = binarySearchFileIndexByLogIndex(firstIndexKept); + int toIndex = binarySearchFileIndexByLogIndex(firstIndexKept); if (fromIndex < 0) { fromIndex = 0; } if (toIndex < 0) { - return; + // When all the segments contain logs that index is smaller than firstIndexKept, + // truncate all segments. + do { + if (!this.segments.isEmpty()) { + if (getLastSegmentWithoutLock().getLastLogIndex() < firstIndexKept) { + toIndex = this.segments.size(); + break; + } + } + LOG.warn("Segment file not found by logIndex={} to be truncate_prefix, current segments:\n{}.", + firstIndexKept, descSegments()); + return; + } while (false); } final List removedFiles = this.segments.subList(fromIndex, toIndex); @@ -854,6 +929,10 @@ private boolean isMetadata(final byte[] data) { return true; } + private SegmentFile getFirstSegmentWithoutLock() { + return this.segments.get(0); + } + @Override protected void onTruncateSuffix(final long lastIndexKept) throws RocksDBException, IOException { List destroyedFiles = null; @@ -863,6 +942,20 @@ protected void onTruncateSuffix(final long lastIndexKept) throws RocksDBExceptio int toIndex = binarySearchFileIndexByLogIndex(getLastLogIndex()); if (keptFileIndex < 0) { + // When all the segments contain logs that index is greater than lastIndexKept, + // truncate all segments. + if (!this.segments.isEmpty()) { + final long firstLogIndex = getFirstSegmentWithoutLock().getFirstLogIndex(); + if (firstLogIndex > lastIndexKept) { + final List removedFiles = this.segments.subList(0, this.segments.size()); + destroyedFiles = new ArrayList<>(removedFiles); + removedFiles.clear(); + } + LOG.info( + "Truncating all segments in {} because the first log index {} is greater than lastIndexKept={}", + this.segmentsPath, firstLogIndex, lastIndexKept); + } + LOG.warn("Segment file not found by logIndex={} to be truncate_suffix, current segments:\n{}.", lastIndexKept, descSegments()); return; @@ -985,7 +1078,12 @@ protected WriteContext newWriteContext() { @Override protected byte[] onDataAppend(final long logIndex, final byte[] value, final WriteContext ctx) throws IOException, InterruptedException { - SegmentFile lastSegmentFile = getLastSegmentFile(logIndex, SegmentFile.getWriteBytes(value), true, ctx); + final int waitToWroteBytes = SegmentFile.getWriteBytes(value); + SegmentFile lastSegmentFile = getLastSegmentFile(logIndex, waitToWroteBytes, true, ctx); + if (lastSegmentFile.reachesFileEndBy(waitToWroteBytes)) { + throw new IOException("Too large value size: " + value.length + ", maxSegmentFileSize=" + + this.maxSegmentFileSize); + } if (value.length < this.valueSizeThreshold) { // Small value will be stored in rocksdb directly. lastSegmentFile.setLastLogIndex(logIndex); diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/storage/log/SegmentFile.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/storage/log/SegmentFile.java index 8dfc79abe..2086930aa 100644 --- a/jraft-core/src/main/java/com/alipay/sofa/jraft/storage/log/SegmentFile.java +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/storage/log/SegmentFile.java @@ -42,6 +42,7 @@ import com.alipay.sofa.jraft.storage.log.SegmentFile.SegmentFileOptions; import com.alipay.sofa.jraft.util.Bits; import com.alipay.sofa.jraft.util.BytesUtil; +import com.alipay.sofa.jraft.util.OnlyForTest; import com.alipay.sofa.jraft.util.Utils; import com.sun.jna.NativeLong; import com.sun.jna.Pointer; @@ -78,7 +79,7 @@ public class SegmentFile implements Lifecycle { * @author boyan(boyan@antfin.com) * */ - private static class SegmentHeader { + public static class SegmentHeader { private static final long RESERVED_FLAG = 0L; // The file first log index(inclusive) @@ -250,7 +251,8 @@ long getLastLogIndex() { return this.lastLogIndex; } - int getWrotePos() { + @OnlyForTest + public int getWrotePos() { return this.wrotePos; } @@ -282,6 +284,10 @@ boolean isBlank() { return this.header.firstLogIndex == BLANK_LOG_INDEX; } + boolean isHeaderCorrupted() { + return this.header == null; + } + String getPath() { return this.path; } @@ -302,9 +308,10 @@ private void swapIn() { if (!this.swappedOut) { return; } + long startMs = Utils.monotonicMs(); mmapFile(false); this.swappedOut = false; - LOG.info("Swapped in segment file {}", this.path); + LOG.info("Swapped in segment file {} cost {} ms.", this.path, Utils.monotonicMs() - startMs); } finally { this.writeLock.unlock(); } @@ -350,7 +357,7 @@ public void swapOut() { unmap(this.buffer); this.buffer = null; this.swappedOutTimestamp = now; - LOG.info("Swapped out segment file {}", this.path); + LOG.info("Swapped out segment file {} cost {} ms.", this.path, Utils.monotonicMs() - now); } finally { this.writeLock.unlock(); } @@ -366,6 +373,9 @@ public void swapOut() { public void truncateSuffix(final int wrotePos, final long logIndex, final boolean sync) { this.writeLock.lock(); try { + if (wrotePos >= this.wrotePos) { + return; + } swapInIfNeed(); final int oldPos = this.wrotePos; clear(wrotePos, sync); @@ -411,7 +421,7 @@ public void clear(final int startPos, final boolean sync) { this.buffer.put(i, (byte) 0); } if (sync) { - fsync(); + fsync(this.buffer); } LOG.info("Segment file {} cleared data in [{}, {}).", this.path, startPos, endPos); } finally { @@ -519,8 +529,10 @@ boolean mmapFile(final boolean create) { if (file.exists()) { this.size = (int) file.length(); } else { - LOG.error("File {} is not exists.", this.path); - return false; + if (!create) { + LOG.error("File {} is not exists.", this.path); + return false; + } } try (FileChannel fc = openFileChannel(create)) { this.buffer = fc.map(MapMode.READ_WRITE, 0, this.size); @@ -563,7 +575,7 @@ void saveHeader(final boolean sync) { assert (headerBuf.remaining() == HEADER_SIZE); this.buffer.put(headerBuf); if (sync) { - fsync(); + fsync(this.buffer); } } finally { this.buffer.position(oldPos); @@ -622,19 +634,34 @@ private boolean recover(final SegmentFileOptions opts) throws IOException { } if (this.buffer.remaining() < RECORD_DATA_LENGTH_SIZE) { - LOG.error("Corrupted data length in segment file {} at pos={}, will truncate it.", this.path, - this.buffer.position()); - truncateFile(opts.sync); - break; + if (opts.isLastFile) { + LOG.error("Corrupted data length in segment file {} at pos={}, will truncate it.", this.path, + this.buffer.position()); + truncateFile(opts.sync); + break; + } else { + LOG.error( + "Fail to recover segment file {}, invalid data length remaining: {}, expected {} at pos={}.", + this.path, this.buffer.remaining(), RECORD_DATA_LENGTH_SIZE, this.wrotePos); + return false; + } } final int dataLen = this.buffer.getInt(); if (this.buffer.remaining() < dataLen) { - LOG.error( - "Corrupted data in segment file {} at pos={}, expectDataLength={}, but remaining is {}, will truncate it.", - this.path, this.buffer.position(), dataLen, this.buffer.remaining()); - truncateFile(opts.sync); - break; + if (opts.isLastFile) { + LOG.error( + "Corrupted data in segment file {} at pos={}, expectDataLength={}, but remaining is {}, will truncate it.", + this.path, this.buffer.position(), dataLen, this.buffer.remaining()); + truncateFile(opts.sync); + break; + } else { + LOG.error( + "Fail to recover segment file {}, invalid data: expected {} bytes in buf but actual {} at pos={}.", + this.path, dataLen, this.buffer.remaining(), this.wrotePos); + return false; + } + } // Skip data this.buffer.position(this.buffer.position() + dataLen); @@ -678,9 +705,11 @@ static int getWriteBytes(final byte[] data) { @SuppressWarnings("NonAtomicOperationOnVolatileField") public int write(final long logIndex, final byte[] data, final WriteContext ctx) { int pos = -1; + MappedByteBuffer buf = null; this.writeLock.lock(); try { assert (this.wrotePos == this.buffer.position()); + buf = this.buffer; pos = this.wrotePos; this.wrotePos += RECORD_MAGIC_BYTES_SIZE + RECORD_DATA_LENGTH_SIZE + data.length; this.buffer.position(this.wrotePos); @@ -695,11 +724,12 @@ public int write(final long logIndex, final byte[] data, final WriteContext ctx) } finally { this.writeLock.unlock(); final int wroteIndex = pos; + final MappedByteBuffer buffer = buf; this.writeExecutor.execute(() -> { try { - put(wroteIndex, RECORD_MAGIC_BYTES); - putInt(wroteIndex + RECORD_MAGIC_BYTES_SIZE, data.length); - put(wroteIndex + RECORD_MAGIC_BYTES_SIZE + RECORD_DATA_LENGTH_SIZE, data); + put(buffer, wroteIndex, RECORD_MAGIC_BYTES); + putInt(buffer, wroteIndex + RECORD_MAGIC_BYTES_SIZE, data.length); + put(buffer, wroteIndex + RECORD_MAGIC_BYTES_SIZE + RECORD_DATA_LENGTH_SIZE, data); } catch (final Exception e) { ctx.setError(e); } finally { @@ -709,17 +739,17 @@ public int write(final long logIndex, final byte[] data, final WriteContext ctx) } } - private void putInt(final int index, final int n) { + private static void putInt(final MappedByteBuffer buffer, final int index, final int n) { byte[] bs = new byte[RECORD_DATA_LENGTH_SIZE]; Bits.putInt(bs, 0, n); for (int i = 0; i < bs.length; i++) { - this.buffer.put(index + i, bs[i]); + buffer.put(index + i, bs[i]); } } - private void put(final int index, final byte[] data) { + private static void put(final MappedByteBuffer buffer, final int index, final byte[] data) { for (int i = 0; i < data.length; i++) { - this.buffer.put(index + i, data[i]); + buffer.put(index + i, data[i]); } } @@ -754,6 +784,7 @@ public byte[] read(final long logIndex, final int pos) throws IOException { } readBuffer.position(pos + RECORD_MAGIC_BYTES_SIZE); final int dataLen = readBuffer.getInt(); + //TODO(boyan) reuse data array? final byte[] data = new byte[dataLen]; readBuffer.get(data); return data; @@ -773,25 +804,27 @@ private void swapInIfNeed() { * storage device containing the mapped file. */ public void sync(final boolean sync) throws IOException { + MappedByteBuffer buf = null; this.writeLock.lock(); try { if (this.committedPos >= this.wrotePos) { return; } this.committedPos = this.wrotePos; + buf = this.buffer; LOG.debug("Commit segment file {} at pos {}.", this.path, this.committedPos); } finally { this.writeLock.unlock(); } if (sync) { - fsync(); + fsync(buf); } } - private void fsync() { - if (this.buffer != null) { + private void fsync(final MappedByteBuffer buffer) { + if (buffer != null) { long startMs = Utils.monotonicMs(); - this.buffer.force(); + buffer.force(); final long cost = Utils.monotonicMs() - startMs; if (cost >= FSYNC_COST_MS_THRESHOLD) { LOG.warn("Call fsync on file {} cost {} ms.", this.path, cost); diff --git a/jraft-core/src/test/java/com/alipay/sofa/jraft/core/CliServiceTest.java b/jraft-core/src/test/java/com/alipay/sofa/jraft/core/CliServiceTest.java index 377f0962f..d51258c0b 100644 --- a/jraft-core/src/test/java/com/alipay/sofa/jraft/core/CliServiceTest.java +++ b/jraft-core/src/test/java/com/alipay/sofa/jraft/core/CliServiceTest.java @@ -163,11 +163,11 @@ public void testLearnerServices() throws Exception { // Add learner3 this.cliService.addLearners(this.groupId, this.conf, Arrays.asList(learner3)); - Thread.sleep(100); + Thread.sleep(1000); assertEquals(10, this.cluster.getFsmByPeer(learner3).getLogs().size()); sendTestTaskAndWait(this.cluster.getLeader(), 0); - Thread.sleep(500); + Thread.sleep(1000); for (final MockStateMachine fsm : this.cluster.getFsms()) { assertEquals(20, fsm.getLogs().size()); @@ -180,7 +180,7 @@ public void testLearnerServices() throws Exception { // Remove 3 this.cliService.removeLearners(this.groupId, this.conf, Arrays.asList(learner3)); sendTestTaskAndWait(this.cluster.getLeader(), 0); - Thread.sleep(500); + Thread.sleep(1000); for (final MockStateMachine fsm : this.cluster.getFsms()) { if (!fsm.getAddress().equals(learner3.getEndpoint())) { assertEquals(30, fsm.getLogs().size()); @@ -197,7 +197,7 @@ public void testLearnerServices() throws Exception { assertEquals(30, this.cluster.getFsmByPeer(learner3).getLogs().size()); sendTestTaskAndWait(this.cluster.getLeader(), 0); - Thread.sleep(500); + Thread.sleep(1000); // Latest 10 logs are not replicated to learner1 and learner2, because they were removed by resetting learners set. for (final MockStateMachine fsm : this.cluster.getFsms()) { if (!oldLearners.contains(new PeerId(fsm.getAddress(), 0))) { diff --git a/jraft-core/src/test/java/com/alipay/sofa/jraft/core/NodeTest.java b/jraft-core/src/test/java/com/alipay/sofa/jraft/core/NodeTest.java index b3196de1c..b1fc1edd6 100644 --- a/jraft-core/src/test/java/com/alipay/sofa/jraft/core/NodeTest.java +++ b/jraft-core/src/test/java/com/alipay/sofa/jraft/core/NodeTest.java @@ -1550,8 +1550,8 @@ public void run(final Status status, final long theIndex, final byte[] reqCtx) { assertArrayEquals(requestContext, reqCtx); success.set(true); } else { - assertTrue(status.getErrorMsg().contains("RPC exception:Check connection[")); - assertTrue(status.getErrorMsg().contains("] fail and try to create new one")); + assertTrue(status.getErrorMsg(), status.getErrorMsg().contains("RPC exception:Check connection[")); + assertTrue(status.getErrorMsg(), status.getErrorMsg().contains("] fail and try to create new one")); } latch.countDown(); } diff --git a/jraft-core/src/test/java/com/alipay/sofa/jraft/storage/impl/BaseLogStorageTest.java b/jraft-core/src/test/java/com/alipay/sofa/jraft/storage/impl/BaseLogStorageTest.java index 26e550637..2d4532b38 100644 --- a/jraft-core/src/test/java/com/alipay/sofa/jraft/storage/impl/BaseLogStorageTest.java +++ b/jraft-core/src/test/java/com/alipay/sofa/jraft/storage/impl/BaseLogStorageTest.java @@ -46,7 +46,7 @@ import com.alipay.sofa.jraft.util.Utils; public abstract class BaseLogStorageTest extends BaseStorageTest { - private LogStorage logStorage; + protected LogStorage logStorage; private ConfigurationManager confManager; private LogEntryCodecFactory logEntryCodecFactory; @@ -65,7 +65,7 @@ public void setup() throws Exception { protected abstract LogStorage newLogStorage(); - private LogStorageOptions newLogStorageOptions() { + protected LogStorageOptions newLogStorageOptions() { final LogStorageOptions opts = new LogStorageOptions(); opts.setConfigurationManager(this.confManager); opts.setLogEntryCodecFactory(this.logEntryCodecFactory); @@ -182,9 +182,6 @@ public void testTruncatePrefix() { @Test public void testAppendMantyLargeEntries() { - - appendLargeEntries(10000, 1024, 10); - final long start = Utils.monotonicMs(); final int totalLogs = 100000; final int logSize = 16 * 1024; diff --git a/jraft-core/src/test/java/com/alipay/sofa/jraft/storage/impl/RocksDBSegmentLogStorageTest.java b/jraft-core/src/test/java/com/alipay/sofa/jraft/storage/impl/RocksDBSegmentLogStorageTest.java index ca12e2d30..818a1326a 100644 --- a/jraft-core/src/test/java/com/alipay/sofa/jraft/storage/impl/RocksDBSegmentLogStorageTest.java +++ b/jraft-core/src/test/java/com/alipay/sofa/jraft/storage/impl/RocksDBSegmentLogStorageTest.java @@ -16,15 +16,134 @@ */ package com.alipay.sofa.jraft.storage.impl; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import java.io.File; +import java.util.Arrays; +import java.util.concurrent.ThreadLocalRandom; +import org.apache.commons.io.FileUtils; +import org.junit.Test; +import com.alipay.sofa.jraft.option.LogStorageOptions; import com.alipay.sofa.jraft.option.RaftOptions; import com.alipay.sofa.jraft.storage.LogStorage; import com.alipay.sofa.jraft.storage.log.RocksDBSegmentLogStorage; +import com.alipay.sofa.jraft.test.TestUtils; public class RocksDBSegmentLogStorageTest extends BaseLogStorageTest { @Override protected LogStorage newLogStorage() { - return new RocksDBSegmentLogStorage(this.path, new RaftOptions()); + return new RocksDBSegmentLogStorage(this.path, new RaftOptions(), 0, 1024 * 1024); + } + + @Test + public void testTruncateChaos() throws Exception { + int times = 100; + int n = 100; + + for (int i = 0; i < times; i++) { + this.logStorage.shutdown(); + FileUtils.deleteDirectory(new File(this.path)); + this.path = TestUtils.mkTempDir(); + FileUtils.forceMkdir(new File(this.path)); + this.logStorage = new RocksDBSegmentLogStorage(this.path, new RaftOptions(), 32, 256); + final LogStorageOptions opts = newLogStorageOptions(); + this.logStorage.init(opts); + + for (int j = 0; j < n; j++) { + this.logStorage.appendEntries(Arrays.asList(TestUtils.mockEntry(j, 1, ThreadLocalRandom.current() + .nextInt(180)))); + } + + int index = ThreadLocalRandom.current().nextInt(n); + boolean truncatePrefix = ThreadLocalRandom.current().nextBoolean(); + if (truncatePrefix) { + this.logStorage.truncatePrefix(index); + + for (int j = 0; j < n; j++) { + if (j < index) { + assertNull(this.logStorage.getEntry(j)); + } else { + assertNotNull(this.logStorage.getEntry(j)); + } + } + } else { + this.logStorage.truncateSuffix(index); + + for (int j = 0; j < n; j++) { + if (j <= index) { + assertNotNull(this.logStorage.getEntry(j)); + } else { + assertNull(this.logStorage.getEntry(j)); + } + } + } + } + } + + @Test + public void testTruncateSuffixWithDifferentValueSize() throws Exception { + // shutdown the old one + this.logStorage.shutdown(); + // Set value threshold to be 32 bytes. + this.logStorage = new RocksDBSegmentLogStorage(this.path, new RaftOptions(), 32, 64); + final LogStorageOptions opts = newLogStorageOptions(); + this.logStorage.init(opts); + int term = 1; + + for (int i = 0; i < 10; i++) { + this.logStorage.appendEntries(Arrays.asList(TestUtils.mockEntry(i, term, i))); + } + + this.logStorage.appendEntries(Arrays.asList(TestUtils.mockEntry(10, term, 64))); + + for (int i = 11; i < 20; i++) { + this.logStorage.appendEntries(Arrays.asList(TestUtils.mockEntry(i, term, i))); + } + + for (int i = 0; i < 20; i++) { + assertNotNull(this.logStorage.getEntry(i)); + } + + assertEquals(((RocksDBSegmentLogStorage) this.logStorage).getLastSegmentFileForRead().getWrotePos(), 179); + + this.logStorage.truncateSuffix(15); + + for (int i = 0; i < 20; i++) { + if (i <= 15) { + assertNotNull(this.logStorage.getEntry(i)); + } else { + assertNull(this.logStorage.getEntry(i)); + } + } + + assertEquals(((RocksDBSegmentLogStorage) this.logStorage).getLastSegmentFileForRead().getWrotePos(), 102); + + this.logStorage.truncateSuffix(13); + + for (int i = 0; i < 20; i++) { + if (i <= 13) { + assertNotNull(this.logStorage.getEntry(i)); + } else { + assertNull(this.logStorage.getEntry(i)); + } + } + + assertEquals(((RocksDBSegmentLogStorage) this.logStorage).getLastSegmentFileForRead().getWrotePos(), 102); + + this.logStorage.truncateSuffix(5); + for (int i = 0; i < 20; i++) { + if (i <= 5) { + assertNotNull(this.logStorage.getEntry(i)); + } else { + assertNull(this.logStorage.getEntry(i)); + } + } + + assertNull(((RocksDBSegmentLogStorage) this.logStorage).getLastSegmentFileForRead()); + this.logStorage.appendEntries(Arrays.asList(TestUtils.mockEntry(20, term, 10))); + assertNotNull(this.logStorage.getEntry(20)); } } diff --git a/jraft-example/src/main/java/com/alipay/sofa/jraft/example/priorityelection/PriorityElectionNode.java b/jraft-example/src/main/java/com/alipay/sofa/jraft/example/priorityelection/PriorityElectionNode.java index 98e51fb41..86f2c8d18 100644 --- a/jraft-example/src/main/java/com/alipay/sofa/jraft/example/priorityelection/PriorityElectionNode.java +++ b/jraft-example/src/main/java/com/alipay/sofa/jraft/example/priorityelection/PriorityElectionNode.java @@ -21,11 +21,9 @@ import java.nio.file.Paths; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; - import org.apache.commons.io.FileUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import com.alipay.sofa.jraft.Lifecycle; import com.alipay.sofa.jraft.Node; import com.alipay.sofa.jraft.RaftGroupService; @@ -89,7 +87,17 @@ public boolean init(final PriorityElectionNodeOptions opts) { if (!serverId.parse(opts.getServerAddress())) { throw new IllegalArgumentException("Fail to parse serverId: " + opts.getServerAddress()); } - // Set priority value, required + + /** + * Set priority value, required for priority-based election, it must be a positive value when + * enable the feature, some special value meaning: + *
    + *
  • -1 : disable priority-based election.
  • + *
  • 0: will never participate in election.
  • + *
  • 1: minimum value
  • + *
+ * value. + */ nodeOpts.setElectionPriority(serverId.getPriority()); final RpcServer rpcServer = RaftRpcServerFactory.createRaftRpcServer(serverId.getEndpoint()); @@ -119,15 +127,15 @@ public void shutdown() { } public Node getNode() { - return node; + return this.node; } public PriorityElectionOnlyStateMachine getFsm() { - return fsm; + return this.fsm; } public boolean isStarted() { - return started; + return this.started; } public boolean isLeader() {