Feature/segment storage improvement (sofastack#399)
* (fix) RocksDBSegmentLogStorage can't be read when sync is false

* (feat) Impl concurrent writing for segment log storage

* (feat) use segment log storage as default log storage

* (fix) forgot to reset diskId after loading snapshot

* (feat) tweak RocksDBSegmentLogStorage

* (feat) refactor RocksDBSegmentLogStorage: use a new data format and pre-allocate segments.

* (feat) Adds LogManagerWithSegmentLogStorageTest

* (feat) Named SegmentAllocator thread

* (fix) example log

* (feat) tweak test parameters and warn when using RocksDBSegmentLogStorage on Mac

* (feat) make write executor configurable

* (feat) warmup segments for test

* (fix) stack overflow when FSM caller is overloaded

* (feat) Impl segment swap in/out

* (fix) panic in RocksDBSegmentLogStorage#onTruncateSuffix when the log is not found.

* (feat) remove mlock, because it has OOM risk

* (feat) use filename in checkpoint instead of absolute path

* (feat) use RocksDBLogStorage as default log storage

* (feat) Destroy segment file outside the lock

* (feat) minor changes by CR.

* (feat) Introduce WriteContext to wrap CountDownEvents

* (fix) typo

* (feat) minor changes by CR.
killme2008 authored Mar 4, 2020
1 parent 0afdb00 commit 94c2ece
Showing 23 changed files with 1,492 additions and 387 deletions.
5 changes: 5 additions & 0 deletions jraft-core/pom.xml
@@ -36,6 +36,11 @@
         <groupId>org.rocksdb</groupId>
         <artifactId>rocksdbjni</artifactId>
     </dependency>
+    <!-- jna -->
+    <dependency>
+        <groupId>net.java.dev.jna</groupId>
+        <artifactId>jna</artifactId>
+    </dependency>
     <!-- jctools -->
     <dependency>
         <groupId>org.jctools</groupId>
jraft-core/src/main/java/com/alipay/sofa/jraft/core/FSMCallerImpl.java
@@ -233,7 +233,7 @@ private boolean enqueueTask(final EventTranslator<ApplyTask> tpl) {
             return false;
         }
         if (!this.taskQueue.tryPublishEvent(tpl)) {
-            onError(new RaftException(ErrorType.ERROR_TYPE_STATE_MACHINE, new Status(RaftError.EBUSY,
+            setError(new RaftException(ErrorType.ERROR_TYPE_STATE_MACHINE, new Status(RaftError.EBUSY,
                 "FSMCaller is overload.")));
             return false;
         }
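Why this one-line change fixes the "stack overflow when FSM caller is overloaded" item above: onError reports the error back through the same overloaded FSMCaller, which tries to publish yet another event to the already-full disruptor queue, fails, and re-enters the error path until the stack overflows; setError merely records the RaftException on the caller. A minimal self-contained sketch of the failure mode — this is illustrative, not the actual jraft call chain:

    // Illustrative sketch (not jraft internals): why the old error path could recurse.
    final class OverloadSketch {
        interface TaskQueue {
            boolean tryPublish(Runnable task);
        }

        private final TaskQueue  taskQueue;
        private volatile Exception error;

        OverloadSketch(final TaskQueue taskQueue) {
            this.taskQueue = taskQueue;
        }

        // Old behavior: reporting the error publishes to the same full queue.
        void onError(final Exception e) {
            if (!this.taskQueue.tryPublish(() -> apply(e))) {
                onError(e); // queue still full: unbounded recursion -> StackOverflowError
            }
        }

        // New behavior: just record the error; nothing is re-published.
        void setError(final Exception e) {
            this.error = e;
        }

        private void apply(final Exception e) {
            // would hand the error to the state machine
        }
    }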
jraft-core/src/main/java/com/alipay/sofa/jraft/storage/impl/LogManagerImpl.java
@@ -620,6 +620,9 @@ public void setSnapshot(final SnapshotMeta meta) {
         if (this.lastSnapshotId.compareTo(this.appliedId) > 0) {
             this.appliedId = this.lastSnapshotId.copy();
         }
+        if (this.lastSnapshotId.compareTo(this.diskId) > 0) {
+            this.diskId = this.lastSnapshotId.copy();
+        }

         if (term == 0) {
             // last_included_index is larger than last_index
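The three added lines are the "forgot to reset diskId after loading snapshot" fix from the commit message: diskId tracks the id of the last log entry flushed to disk, and once an installed snapshot covers a higher id, the entries below it are about to be truncated, so diskId must jump forward exactly as appliedId does in the block above. A worked example, assuming jraft's LogId(index, term) with ordering by index, then term:

    LogId diskId         = new LogId(90, 2);    // last entry flushed to disk
    LogId lastSnapshotId = new LogId(100, 3);   // snapshot just installed

    if (lastSnapshotId.compareTo(diskId) > 0) { // 100 > 90, the guard fires
        diskId = lastSnapshotId.copy();         // logs up to 100 are covered by the snapshot,
    }                                           // so disk bookkeeping restarts after it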
jraft-core/src/main/java/com/alipay/sofa/jraft/storage/impl/RocksDBLogStorage.java
@@ -84,7 +84,48 @@ public class RocksDBLogStorage implements LogStorage, Describer {
      */
     private interface WriteBatchTemplate {

-        void execute(WriteBatch batch) throws RocksDBException, IOException;
+        void execute(WriteBatch batch) throws RocksDBException, IOException, InterruptedException;
     }

+    /**
+     * A write context.
+     *
+     * @author boyan([email protected])
+     */
+    public interface WriteContext {
+
+        /**
+         * Start a sub job.
+         */
+        default void startJob() {
+        }
+
+        /**
+         * Finish a sub job.
+         */
+        default void finishJob() {
+        }
+
+        /**
+         * Set an exception to the context.
+         *
+         * @param e the exception
+         */
+        default void setError(final Exception e) {
+        }
+
+        /**
+         * Wait for all sub jobs to finish.
+         */
+        default void joinAll() throws InterruptedException, IOException {
+        }
+    }
+
+    /**
+     * An empty write context.
+     *
+     * @author boyan([email protected])
+     */
+    protected static class EmptyWriteContext implements WriteContext {
+        static EmptyWriteContext INSTANCE = new EmptyWriteContext();
+    }
+
     private final String path;
@@ -122,11 +163,11 @@ public static DBOptions createDBOptions() {

     public static ColumnFamilyOptions createColumnFamilyOptions() {
         final BlockBasedTableConfig tConfig = StorageOptionsFactory
-            .getRocksDBTableFormatConfig(RocksDBLogStorage.class);
+                .getRocksDBTableFormatConfig(RocksDBLogStorage.class);
         return StorageOptionsFactory.getRocksDBColumnFamilyOptions(RocksDBLogStorage.class) //
-            .useFixedLengthPrefixExtractor(8) //
-            .setTableFormatConfig(tConfig) //
-            .setMergeOperator(new StringAppendOperator());
+                .useFixedLengthPrefixExtractor(8) //
+                .setTableFormatConfig(tConfig) //
+                .setMergeOperator(new StringAppendOperator());
     }

@Override
@@ -289,6 +330,10 @@ private boolean executeBatch(final WriteBatchTemplate template) {
         } catch (final IOException e) {
             LOG.error("Execute batch failed with io exception.", e);
             return false;
+        } catch (final InterruptedException e) {
+            LOG.error("Execute batch failed with interrupt.", e);
+            Thread.currentThread().interrupt();
+            return false;
         } finally {
             this.readLock.unlock();
         }
@@ -430,10 +475,11 @@ private void addConfBatch(final LogEntry entry, final WriteBatch batch) throws RocksDBException {
         batch.put(this.confHandle, ks, content);
     }

-    private void addDataBatch(final LogEntry entry, final WriteBatch batch) throws RocksDBException, IOException {
+    private void addDataBatch(final LogEntry entry, final WriteBatch batch,
+                              final WriteContext ctx) throws RocksDBException, IOException, InterruptedException {
         final long logIndex = entry.getId().getIndex();
         final byte[] content = this.logEntryEncoder.encode(entry);
-        batch.put(this.defaultHandle, getKeyBytes(logIndex), onDataAppend(logIndex, content));
+        batch.put(this.defaultHandle, getKeyBytes(logIndex), onDataAppend(logIndex, content, ctx));
     }
@Override
@@ -447,27 +493,31 @@ public boolean appendEntry(final LogEntry entry) {
                 LOG.warn("DB not initialized or destroyed.");
                 return false;
             }
+            final WriteContext writeCtx = newWriteContext();
             final long logIndex = entry.getId().getIndex();
             final byte[] valueBytes = this.logEntryEncoder.encode(entry);
-            final byte[] newValueBytes = onDataAppend(logIndex, valueBytes);
+            final byte[] newValueBytes = onDataAppend(logIndex, valueBytes, writeCtx);
+            writeCtx.startJob();
             this.db.put(this.defaultHandle, this.writeOptions, getKeyBytes(logIndex), newValueBytes);
+            writeCtx.joinAll();
             if (newValueBytes != valueBytes) {
                 doSync();
             }
             return true;
         } catch (final RocksDBException | IOException e) {
             LOG.error("Fail to append entry.", e);
             return false;
+        } catch (final InterruptedException e) {
+            Thread.currentThread().interrupt();
+            return false;
         } finally {
             this.readLock.unlock();
         }
     }

-    private void doSync() throws IOException {
-        if (this.sync) {
-            onSync();
-        }
+    private void doSync() throws IOException, InterruptedException {
+        onSync();
     }

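Note the semantics change in doSync() above: the this.sync check moves out of the base class, and the new protected isSync() accessor (added further down in this file) lets subclasses decide for themselves. That matters for a segment-based subclass whose payloads live outside RocksDB and must be fsynced separately. A hedged sketch of such an override — currentSegmentFile and its sync() are assumed names, not actual RocksDBSegmentLogStorage members:

    @Override
    protected void onSync() throws IOException, InterruptedException {
        if (isSync()) {
            this.currentSegmentFile.sync(); // assumed: flush pending segment bytes to disk
        }
    }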
@Override
@@ -477,14 +527,17 @@ public int appendEntries(final List<LogEntry> entries) {
         }
         final int entriesCount = entries.size();
         final boolean ret = executeBatch(batch -> {
+            final WriteContext writeCtx = newWriteContext();
             for (int i = 0; i < entriesCount; i++) {
                 final LogEntry entry = entries.get(i);
                 if (entry.getType() == EntryType.ENTRY_TYPE_CONFIGURATION) {
                     addConfBatch(entry, batch);
                 } else {
-                    addDataBatch(entry, batch);
+                    writeCtx.startJob();
+                    addDataBatch(entry, batch, writeCtx);
                 }
             }
+            writeCtx.joinAll();
             doSync();
         });

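This hunk shows the concurrent-write protocol end to end: startJob() is called once per data entry, and joinAll() blocks before doSync(), so the RocksDB batch is never synced until every asynchronous segment write has landed. A hedged sketch of the subclass side of the contract — writeExecutor, writeToSegment and encodeLocator are assumed names for illustration, not the real RocksDBSegmentLogStorage members:

    @Override
    protected byte[] onDataAppend(final long logIndex, final byte[] value,
                                  final WriteContext ctx) throws IOException {
        // The caller has already done ctx.startJob() for this entry.
        this.writeExecutor.execute(() -> {
            try {
                writeToSegment(logIndex, value); // assumed: append the payload to a segment file
                ctx.finishJob();                 // sub job done; joinAll() may now return
            } catch (final Exception e) {
                ctx.setError(e);                 // surfaced to the caller inside joinAll()
            }
        });
        // Assumed design: RocksDB keeps a small locator (segment file + offset), not the payload.
        return encodeLocator(logIndex);
    }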
@@ -614,13 +667,17 @@ protected void onReset(final long nextLogIndex) {
      * @param firstIndexKept the first index to keep
      */
     protected void onTruncatePrefix(final long startIndex, final long firstIndexKept) throws RocksDBException,
-        IOException {
+                                                                                     IOException {
     }

     /**
      * Called when syncing data into the file system.
      */
-    protected void onSync() throws IOException {
+    protected void onSync() throws IOException, InterruptedException {
     }

+    protected boolean isSync() {
+        return this.sync;
+    }

@@ -631,14 +688,20 @@ protected void onSync() throws IOException {
     /**
      * @param lastIndexKept the last index to keep
      */
     protected void onTruncateSuffix(final long lastIndexKept) throws RocksDBException, IOException {
     }

+    protected WriteContext newWriteContext() {
+        return EmptyWriteContext.INSTANCE;
+    }
+
     /**
      * Called before appending data entry.
      *
      * @param logIndex the log index
      * @param value    the data value in log entry.
      * @return the new value
      */
-    protected byte[] onDataAppend(final long logIndex, final byte[] value) throws IOException {
+    protected byte[] onDataAppend(final long logIndex, final byte[] value,
+                                  final WriteContext ctx) throws IOException, InterruptedException {
+        ctx.finishJob();
         return value;
     }
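The "Introduce WriteContext to wrap CountDownEvents" item shows up here only as no-op hooks plus EmptyWriteContext; a real context returned from a newWriteContext() override has to count sub jobs and block in joinAll(). A minimal monitor-based sketch of such a context (the class actually used by RocksDBSegmentLogStorage wraps jraft's CountDownEvent and may differ in detail):

    static class CountingWriteContext implements RocksDBLogStorage.WriteContext {

        private int       pending; // sub jobs started but not yet finished
        private Exception error;

        @Override
        public synchronized void startJob() {
            this.pending++;
        }

        @Override
        public synchronized void finishJob() {
            if (--this.pending <= 0) {
                notifyAll();
            }
        }

        @Override
        public synchronized void setError(final Exception e) {
            this.error = e;
            notifyAll();
        }

        @Override
        public synchronized void joinAll() throws InterruptedException, IOException {
            while (this.pending > 0 && this.error == null) {
                wait();
            }
            if (this.error != null) {
                throw new IOException("Fail to append entries", this.error);
            }
        }
    }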
jraft-core/src/main/java/com/alipay/sofa/jraft/storage/log/AbortFile.java
@@ -17,9 +17,9 @@
 package com.alipay.sofa.jraft.storage.log;

 import java.io.File;
+import java.io.FileWriter;
 import java.io.IOException;
-
-import com.alipay.sofa.jraft.util.Utils;
+import java.util.Date;

 /**
  * Abort file
@@ -40,13 +40,25 @@ public AbortFile(final String path) {
     }

     public boolean create() throws IOException {
-        return new File(this.path) //
-            .createNewFile();
+        return writeDate();
     }

-    public boolean touch() {
-        return new File(this.path) //
-            .setLastModified(Utils.nowMs());
+    @SuppressWarnings("deprecation")
+    private boolean writeDate() throws IOException {
+        final File file = new File(this.path);
+        if (file.createNewFile()) {
+            try (FileWriter writer = new FileWriter(file, false)) {
+                writer.write(new Date().toGMTString());
+                writer.write(System.lineSeparator());
+            }
+            return true;
+        } else {
+            return false;
+        }
+    }
+
+    public boolean touch() throws IOException {
+        return writeDate();
     }

     public boolean exists() {
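For orientation: the abort file is a crash marker for the segment storage. It is created on startup and deleted on clean shutdown, so finding it on the next boot means the previous process died and segment data must be re-verified. Writing a GMT date line (instead of creating an empty file and bumping its mtime with Utils.nowMs()) makes the marker human-readable when debugging. A hedged usage sketch; recoverSegments() is an illustrative placeholder and the shutdown-time cleanup is not part of this diff:

    final AbortFile abortFile = new AbortFile(storagePath + File.separator + "abort"); // name is illustrative
    if (abortFile.exists()) {
        recoverSegments();   // placeholder: previous run crashed, re-check the segment files
    } else {
        abortFile.create();  // writes e.g. "4 Mar 2020 07:00:00 GMT"; false if the file already exists
    }
    // A clean shutdown deletes the marker, so exists() returns false on the next boot.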
jraft-core/src/main/java/com/alipay/sofa/jraft/storage/log/CheckpointFile.java
@@ -23,6 +23,7 @@

 import com.alipay.sofa.jraft.entity.LocalFileMetaOutter.LocalFileMeta;
 import com.alipay.sofa.jraft.storage.io.ProtoBufFile;
+import com.alipay.sofa.jraft.util.AsciiStringUtil;
 import com.alipay.sofa.jraft.util.Bits;
 import com.google.protobuf.ZeroByteStringHelper;

@@ -32,32 +33,48 @@
  * @author boyan([email protected])
  */
 public class CheckpointFile {

-    /**
-     * firstLogIndex(8 B) + commitPos (4 B)
-     */
-    private static final int CHECKPOINT_METADATA_SIZE = 12;
-
     /**
      * Checkpoint metadata info.
      *
      * @author boyan([email protected])
      */
     public static final class Checkpoint {
-        // Segment file start offset
-        public final long firstLogIndex;
+        // Segment file name
+        public String segFilename;
         // Segment file current commit position.
-        public final int committedPos;
+        public int    committedPos;

-        public Checkpoint(final long firstLogIndex, final int committedPos) {
+        public Checkpoint(final String segFilename, final int committedPos) {
             super();
-            this.firstLogIndex = firstLogIndex;
+            this.segFilename = segFilename;
             this.committedPos = committedPos;
         }

+        /**
+         * commitPos (4 bytes) + path (4-byte len + string bytes)
+         */
+        byte[] encode() {
+            byte[] ps = AsciiStringUtil.unsafeEncode(this.segFilename);
+            byte[] bs = new byte[8 + ps.length];
+            Bits.putInt(bs, 0, this.committedPos);
+            Bits.putInt(bs, 4, ps.length);
+            System.arraycopy(ps, 0, bs, 8, ps.length);
+            return bs;
+        }
+
+        boolean decode(final byte[] bs) {
+            if (bs.length < 8) {
+                return false;
+            }
+            this.committedPos = Bits.getInt(bs, 0);
+            int len = Bits.getInt(bs, 4);
+            this.segFilename = AsciiStringUtil.unsafeDecode(bs, 8, len);
+            return this.committedPos >= 0 && !this.segFilename.isEmpty();
+        }
+
         @Override
         public String toString() {
-            return "Checkpoint [firstLogIndex=" + this.firstLogIndex + ", committedPos=" + this.committedPos + "]";
+            return "Checkpoint [segFilename=" + this.segFilename + ", committedPos=" + this.committedPos + "]";
         }
     }
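The encode/decode pair above implements the "use filename in checkpoint instead of absolute path" item: the old fixed 12-byte (firstLogIndex, committedPos) record becomes committedPos (4 bytes) + filename length (4 bytes) + ASCII filename, so a checkpoint keeps pointing at the right segment file even if the storage directory is relocated. A round-trip example, assuming segment files are named by a zero-padded first log index:

    final Checkpoint cp = new Checkpoint("00000000000000000001", 4096);
    final byte[] bs = cp.encode();
    // bs[0..3]  -> 4096                   (committed position inside the segment)
    // bs[4..7]  -> 20                     (filename length)
    // bs[8..27] -> "00000000000000000001" (ASCII segment file name)

    final Checkpoint restored = new Checkpoint(null, -1);
    final boolean ok = restored.decode(bs); // true: segFilename and committedPos now match cp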

@@ -78,9 +95,7 @@ public CheckpointFile(final String path) {

     public synchronized boolean save(final Checkpoint checkpoint) throws IOException {
         final ProtoBufFile file = new ProtoBufFile(this.path);
-        final byte[] data = new byte[CHECKPOINT_METADATA_SIZE];
-        Bits.putLong(data, 0, checkpoint.firstLogIndex);
-        Bits.putInt(data, 8, checkpoint.committedPos);
+        final byte[] data = checkpoint.encode();

         final LocalFileMeta meta = LocalFileMeta.newBuilder() //
             .setUserMeta(ZeroByteStringHelper.wrap(data)) //
@@ -94,8 +109,10 @@ public Checkpoint load() throws IOException {
         final LocalFileMeta meta = file.load();
         if (meta != null) {
             final byte[] data = meta.getUserMeta().toByteArray();
-            assert (data.length == CHECKPOINT_METADATA_SIZE);
-            return new Checkpoint(Bits.getLong(data, 0), Bits.getInt(data, 8));
+            Checkpoint checkpoint = new Checkpoint(null, -1);
+            if (checkpoint.decode(data)) {
+                return checkpoint;
+            }
         }
         return null;
     }