Skip to content

Commit

Permalink
(fix) revisiting segment log storage and fix some corner cases (sofas…
Browse files Browse the repository at this point in the history
…tack#523)

* (fix) revisiting segment log storage and fix some corner cases

* (fix) test

* (feat) ensure the value size is in file range

* (feat) Adds log when checking election priority returns false

* (fix) CR

* (fix) by CR

* (feat) ensure buffer's visibility

* (feat) ensure buffer's visibility

* (fix) blank segment may fail to save header, we must fix it

* (fix) format

* (fix) fail to mmap files are collected into corruptedHeaderSegments

* (feat) delete the file when fail to create new segment
  • Loading branch information
killme2008 authored Oct 27, 2020
1 parent 0d9f17c commit dfaa19a
Show file tree
Hide file tree
Showing 9 changed files with 386 additions and 126 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -647,6 +647,8 @@ private boolean allowLaunchElection() {

// Priority 0 is a special value so that a node will never participate in election.
if (this.serverId.isPriorityNotElected()) {
LOG.warn("Node {} will never participate in election, because it's priority={}.", getNodeId(),
this.serverId.getPriority());
return false;
}

Expand All @@ -668,6 +670,8 @@ private boolean allowLaunchElection() {
}

if (this.electionTimeoutCounter == 1) {
LOG.debug("Node {} does not initiate leader election and waits for the next election timeout.",
getNodeId());
return false;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,25 +40,26 @@ public AbortFile(final String path) {
}

public boolean create() throws IOException {
return writeDate();
}

@SuppressWarnings("deprecation")
private boolean writeDate() throws IOException {
final File file = new File(this.path);
if (file.createNewFile()) {
try (FileWriter writer = new FileWriter(file, false)) {
writer.write(new Date().toGMTString());
writer.write(System.lineSeparator());
}
writeDate();
return true;
} else {
return false;
}
}

public boolean touch() throws IOException {
return writeDate();
@SuppressWarnings("deprecation")
private void writeDate() throws IOException {
final File file = new File(this.path);
try (final FileWriter writer = new FileWriter(file, false)) {
writer.write(new Date().toGMTString());
writer.write(System.lineSeparator());
}
}

public void touch() throws IOException {
writeDate();
}

public boolean exists() {
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import com.alipay.sofa.jraft.storage.log.SegmentFile.SegmentFileOptions;
import com.alipay.sofa.jraft.util.Bits;
import com.alipay.sofa.jraft.util.BytesUtil;
import com.alipay.sofa.jraft.util.OnlyForTest;
import com.alipay.sofa.jraft.util.Utils;
import com.sun.jna.NativeLong;
import com.sun.jna.Pointer;
Expand Down Expand Up @@ -78,7 +79,7 @@ public class SegmentFile implements Lifecycle<SegmentFileOptions> {
* @author boyan([email protected])
*
*/
private static class SegmentHeader {
public static class SegmentHeader {

private static final long RESERVED_FLAG = 0L;
// The file first log index(inclusive)
Expand Down Expand Up @@ -250,7 +251,8 @@ long getLastLogIndex() {
return this.lastLogIndex;
}

int getWrotePos() {
@OnlyForTest
public int getWrotePos() {
return this.wrotePos;
}

Expand Down Expand Up @@ -282,6 +284,10 @@ boolean isBlank() {
return this.header.firstLogIndex == BLANK_LOG_INDEX;
}

boolean isHeaderCorrupted() {
return this.header == null;
}

String getPath() {
return this.path;
}
Expand All @@ -302,9 +308,10 @@ private void swapIn() {
if (!this.swappedOut) {
return;
}
long startMs = Utils.monotonicMs();
mmapFile(false);
this.swappedOut = false;
LOG.info("Swapped in segment file {}", this.path);
LOG.info("Swapped in segment file {} cost {} ms.", this.path, Utils.monotonicMs() - startMs);
} finally {
this.writeLock.unlock();
}
Expand Down Expand Up @@ -350,7 +357,7 @@ public void swapOut() {
unmap(this.buffer);
this.buffer = null;
this.swappedOutTimestamp = now;
LOG.info("Swapped out segment file {}", this.path);
LOG.info("Swapped out segment file {} cost {} ms.", this.path, Utils.monotonicMs() - now);
} finally {
this.writeLock.unlock();
}
Expand All @@ -366,6 +373,9 @@ public void swapOut() {
public void truncateSuffix(final int wrotePos, final long logIndex, final boolean sync) {
this.writeLock.lock();
try {
if (wrotePos >= this.wrotePos) {
return;
}
swapInIfNeed();
final int oldPos = this.wrotePos;
clear(wrotePos, sync);
Expand Down Expand Up @@ -411,7 +421,7 @@ public void clear(final int startPos, final boolean sync) {
this.buffer.put(i, (byte) 0);
}
if (sync) {
fsync();
fsync(this.buffer);
}
LOG.info("Segment file {} cleared data in [{}, {}).", this.path, startPos, endPos);
} finally {
Expand Down Expand Up @@ -519,8 +529,10 @@ boolean mmapFile(final boolean create) {
if (file.exists()) {
this.size = (int) file.length();
} else {
LOG.error("File {} is not exists.", this.path);
return false;
if (!create) {
LOG.error("File {} is not exists.", this.path);
return false;
}
}
try (FileChannel fc = openFileChannel(create)) {
this.buffer = fc.map(MapMode.READ_WRITE, 0, this.size);
Expand Down Expand Up @@ -563,7 +575,7 @@ void saveHeader(final boolean sync) {
assert (headerBuf.remaining() == HEADER_SIZE);
this.buffer.put(headerBuf);
if (sync) {
fsync();
fsync(this.buffer);
}
} finally {
this.buffer.position(oldPos);
Expand Down Expand Up @@ -622,19 +634,34 @@ private boolean recover(final SegmentFileOptions opts) throws IOException {
}

if (this.buffer.remaining() < RECORD_DATA_LENGTH_SIZE) {
LOG.error("Corrupted data length in segment file {} at pos={}, will truncate it.", this.path,
this.buffer.position());
truncateFile(opts.sync);
break;
if (opts.isLastFile) {
LOG.error("Corrupted data length in segment file {} at pos={}, will truncate it.", this.path,
this.buffer.position());
truncateFile(opts.sync);
break;
} else {
LOG.error(
"Fail to recover segment file {}, invalid data length remaining: {}, expected {} at pos={}.",
this.path, this.buffer.remaining(), RECORD_DATA_LENGTH_SIZE, this.wrotePos);
return false;
}
}

final int dataLen = this.buffer.getInt();
if (this.buffer.remaining() < dataLen) {
LOG.error(
"Corrupted data in segment file {} at pos={}, expectDataLength={}, but remaining is {}, will truncate it.",
this.path, this.buffer.position(), dataLen, this.buffer.remaining());
truncateFile(opts.sync);
break;
if (opts.isLastFile) {
LOG.error(
"Corrupted data in segment file {} at pos={}, expectDataLength={}, but remaining is {}, will truncate it.",
this.path, this.buffer.position(), dataLen, this.buffer.remaining());
truncateFile(opts.sync);
break;
} else {
LOG.error(
"Fail to recover segment file {}, invalid data: expected {} bytes in buf but actual {} at pos={}.",
this.path, dataLen, this.buffer.remaining(), this.wrotePos);
return false;
}

}
// Skip data
this.buffer.position(this.buffer.position() + dataLen);
Expand Down Expand Up @@ -678,9 +705,11 @@ static int getWriteBytes(final byte[] data) {
@SuppressWarnings("NonAtomicOperationOnVolatileField")
public int write(final long logIndex, final byte[] data, final WriteContext ctx) {
int pos = -1;
MappedByteBuffer buf = null;
this.writeLock.lock();
try {
assert (this.wrotePos == this.buffer.position());
buf = this.buffer;
pos = this.wrotePos;
this.wrotePos += RECORD_MAGIC_BYTES_SIZE + RECORD_DATA_LENGTH_SIZE + data.length;
this.buffer.position(this.wrotePos);
Expand All @@ -695,11 +724,12 @@ public int write(final long logIndex, final byte[] data, final WriteContext ctx)
} finally {
this.writeLock.unlock();
final int wroteIndex = pos;
final MappedByteBuffer buffer = buf;
this.writeExecutor.execute(() -> {
try {
put(wroteIndex, RECORD_MAGIC_BYTES);
putInt(wroteIndex + RECORD_MAGIC_BYTES_SIZE, data.length);
put(wroteIndex + RECORD_MAGIC_BYTES_SIZE + RECORD_DATA_LENGTH_SIZE, data);
put(buffer, wroteIndex, RECORD_MAGIC_BYTES);
putInt(buffer, wroteIndex + RECORD_MAGIC_BYTES_SIZE, data.length);
put(buffer, wroteIndex + RECORD_MAGIC_BYTES_SIZE + RECORD_DATA_LENGTH_SIZE, data);
} catch (final Exception e) {
ctx.setError(e);
} finally {
Expand All @@ -709,17 +739,17 @@ public int write(final long logIndex, final byte[] data, final WriteContext ctx)
}
}

private void putInt(final int index, final int n) {
private static void putInt(final MappedByteBuffer buffer, final int index, final int n) {
byte[] bs = new byte[RECORD_DATA_LENGTH_SIZE];
Bits.putInt(bs, 0, n);
for (int i = 0; i < bs.length; i++) {
this.buffer.put(index + i, bs[i]);
buffer.put(index + i, bs[i]);
}
}

private void put(final int index, final byte[] data) {
private static void put(final MappedByteBuffer buffer, final int index, final byte[] data) {
for (int i = 0; i < data.length; i++) {
this.buffer.put(index + i, data[i]);
buffer.put(index + i, data[i]);
}
}

Expand Down Expand Up @@ -754,6 +784,7 @@ public byte[] read(final long logIndex, final int pos) throws IOException {
}
readBuffer.position(pos + RECORD_MAGIC_BYTES_SIZE);
final int dataLen = readBuffer.getInt();
//TODO(boyan) reuse data array?
final byte[] data = new byte[dataLen];
readBuffer.get(data);
return data;
Expand All @@ -773,25 +804,27 @@ private void swapInIfNeed() {
* storage device containing the mapped file.
*/
public void sync(final boolean sync) throws IOException {
MappedByteBuffer buf = null;
this.writeLock.lock();
try {
if (this.committedPos >= this.wrotePos) {
return;
}
this.committedPos = this.wrotePos;
buf = this.buffer;
LOG.debug("Commit segment file {} at pos {}.", this.path, this.committedPos);
} finally {
this.writeLock.unlock();
}
if (sync) {
fsync();
fsync(buf);
}
}

private void fsync() {
if (this.buffer != null) {
private void fsync(final MappedByteBuffer buffer) {
if (buffer != null) {
long startMs = Utils.monotonicMs();
this.buffer.force();
buffer.force();
final long cost = Utils.monotonicMs() - startMs;
if (cost >= FSYNC_COST_MS_THRESHOLD) {
LOG.warn("Call fsync on file {} cost {} ms.", this.path, cost);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,11 +163,11 @@ public void testLearnerServices() throws Exception {

// Add learner3
this.cliService.addLearners(this.groupId, this.conf, Arrays.asList(learner3));
Thread.sleep(100);
Thread.sleep(1000);
assertEquals(10, this.cluster.getFsmByPeer(learner3).getLogs().size());

sendTestTaskAndWait(this.cluster.getLeader(), 0);
Thread.sleep(500);
Thread.sleep(1000);
for (final MockStateMachine fsm : this.cluster.getFsms()) {
assertEquals(20, fsm.getLogs().size());

Expand All @@ -180,7 +180,7 @@ public void testLearnerServices() throws Exception {
// Remove 3
this.cliService.removeLearners(this.groupId, this.conf, Arrays.asList(learner3));
sendTestTaskAndWait(this.cluster.getLeader(), 0);
Thread.sleep(500);
Thread.sleep(1000);
for (final MockStateMachine fsm : this.cluster.getFsms()) {
if (!fsm.getAddress().equals(learner3.getEndpoint())) {
assertEquals(30, fsm.getLogs().size());
Expand All @@ -197,7 +197,7 @@ public void testLearnerServices() throws Exception {
assertEquals(30, this.cluster.getFsmByPeer(learner3).getLogs().size());

sendTestTaskAndWait(this.cluster.getLeader(), 0);
Thread.sleep(500);
Thread.sleep(1000);
// Latest 10 logs are not replicated to learner1 and learner2, because they were removed by resetting learners set.
for (final MockStateMachine fsm : this.cluster.getFsms()) {
if (!oldLearners.contains(new PeerId(fsm.getAddress(), 0))) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1550,8 +1550,8 @@ public void run(final Status status, final long theIndex, final byte[] reqCtx) {
assertArrayEquals(requestContext, reqCtx);
success.set(true);
} else {
assertTrue(status.getErrorMsg().contains("RPC exception:Check connection["));
assertTrue(status.getErrorMsg().contains("] fail and try to create new one"));
assertTrue(status.getErrorMsg(), status.getErrorMsg().contains("RPC exception:Check connection["));
assertTrue(status.getErrorMsg(), status.getErrorMsg().contains("] fail and try to create new one"));
}
latch.countDown();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
import com.alipay.sofa.jraft.util.Utils;

public abstract class BaseLogStorageTest extends BaseStorageTest {
private LogStorage logStorage;
protected LogStorage logStorage;
private ConfigurationManager confManager;
private LogEntryCodecFactory logEntryCodecFactory;

Expand All @@ -65,7 +65,7 @@ public void setup() throws Exception {

protected abstract LogStorage newLogStorage();

private LogStorageOptions newLogStorageOptions() {
protected LogStorageOptions newLogStorageOptions() {
final LogStorageOptions opts = new LogStorageOptions();
opts.setConfigurationManager(this.confManager);
opts.setLogEntryCodecFactory(this.logEntryCodecFactory);
Expand Down Expand Up @@ -182,9 +182,6 @@ public void testTruncatePrefix() {

@Test
public void testAppendMantyLargeEntries() {

appendLargeEntries(10000, 1024, 10);

final long start = Utils.monotonicMs();
final int totalLogs = 100000;
final int logSize = 16 * 1024;
Expand Down
Loading

0 comments on commit dfaa19a

Please sign in to comment.