Skip to content

Commit

Permalink
(feat) Set segment file to be read-only after write and sync (sofasta…
Browse files Browse the repository at this point in the history
…ck#409)

* (feat) Set segment file to be read-only after write and sync

* (feat) use CopyOnWriteArrayList for hooks
  • Loading branch information
killme2008 authored Mar 30, 2020
1 parent 2d04955 commit e565e16
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,13 @@ default void startJob() {
default void finishJob() {
}

/**
* Adds a callback that will be invoked after all sub jobs finish.
*/
default void addFinishHook(final Runnable r) {

}

/**
* Set an exception to context.
* @param e
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
Expand Down Expand Up @@ -83,14 +84,23 @@ public AllocatedResult(final IOException ie) {
}

public static class BarrierWriteContext implements WriteContext {
private final CountDownEvent events = new CountDownEvent();
private volatile Exception e;
private final CountDownEvent events = new CountDownEvent();
private volatile Exception e;
private volatile List<Runnable> hooks;

@Override
public void startJob() {
this.events.incrementAndGet();
}

@Override
public synchronized void addFinishHook(final Runnable r) {
if (this.hooks == null) {
this.hooks = new CopyOnWriteArrayList<>();
}
this.hooks.add(r);
}

@Override
public void finishJob() {
this.events.countDown();
Expand All @@ -104,6 +114,11 @@ public void setError(final Exception e) {
@Override
public void joinAll() throws InterruptedException, IOException {
this.events.await();
if (this.hooks != null) {
for (Runnable r : this.hooks) {
r.run();
}
}
if (this.e != null) {
throw new IOException("Fail to apppend entries", this.e);
}
Expand Down Expand Up @@ -358,10 +373,11 @@ private SegmentFile createNewSegmentFile(final long logIndex, final int oldSegme
final SegmentFile currLastFile = this.segments.get(this.segments.size() - 1);
currLastFile.setLastLogIndex(logIndex - 1);
ctx.startJob();
// Attach a finish hook to set last segment file to be read-only.
ctx.addFinishHook(() -> currLastFile.setReadOnly(true));
// Run sync in parallel
this.writeExecutor.execute(() -> {
try {
currLastFile.setReadOnly(true);
currLastFile.sync(isSync());
} catch (final IOException e) {
ctx.setError(e);
Expand Down

0 comments on commit e565e16

Please sign in to comment.