Skip to content

Commit

Permalink
Memory optimize at log manager. (sofastack#629)
Browse files Browse the repository at this point in the history
* memory optimize at log manager.
  • Loading branch information
horizonzy authored Jul 1, 2021
1 parent 4cbb722 commit 511ee01
Show file tree
Hide file tree
Showing 5 changed files with 29 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,7 @@ private long runApplyTask(final ApplyTask task, long maxCommittedIndex, final bo
if (task.committedIndex > maxCommittedIndex) {
maxCommittedIndex = task.committedIndex;
}
task.reset();
} else {
if (maxCommittedIndex >= 0) {
this.currTask = TaskType.COMMITTED;
Expand Down Expand Up @@ -432,6 +433,7 @@ private long runApplyTask(final ApplyTask task, long maxCommittedIndex, final bo
}
} finally {
this.nodeMetrics.recordLatency(task.type.metricName(), Utils.monotonicMs() - startMs);
task.reset();
}
}
try {
Expand Down
21 changes: 16 additions & 5 deletions jraft-core/src/main/java/com/alipay/sofa/jraft/core/NodeImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.stream.Collectors;

import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
Expand Down Expand Up @@ -298,7 +299,7 @@ public void onEvent(final LogEntryAndClosure event, final long sequence, final b
if (event.shutdownLatch != null) {
if (!this.tasks.isEmpty()) {
executeApplyingTasks(this.tasks);
this.tasks.clear();
reset();
}
final int num = GLOBAL_NUM_NODES.decrementAndGet();
LOG.info("The number of active nodes decrement to {}.", num);
Expand All @@ -309,9 +310,16 @@ public void onEvent(final LogEntryAndClosure event, final long sequence, final b
this.tasks.add(event);
if (this.tasks.size() >= NodeImpl.this.raftOptions.getApplyBatch() || endOfBatch) {
executeApplyingTasks(this.tasks);
this.tasks.clear();
reset();
}
}

private void reset() {
for (final LogEntryAndClosure task : tasks) {
task.reset();
}
this.tasks.clear();
}
}

/**
Expand Down Expand Up @@ -1357,10 +1365,10 @@ private void executeApplyingTasks(final List<LogEntryAndClosure> tasks) {
st.setError(RaftError.EBUSY, "Is transferring leadership.");
}
LOG.debug("Node {} can't apply, status={}.", getNodeId(), st);
final List<LogEntryAndClosure> savedTasks = new ArrayList<>(tasks);
final List<Closure> dones = tasks.stream().map(ele -> ele.done).collect(Collectors.toList());
Utils.runInThread(() -> {
for (int i = 0; i < size; i++) {
savedTasks.get(i).done.run(st);
for (final Closure done : dones) {
done.run(st);
}
});
return;
Expand All @@ -1375,18 +1383,21 @@ private void executeApplyingTasks(final List<LogEntryAndClosure> tasks) {
final Status st = new Status(RaftError.EPERM, "expected_term=%d doesn't match current_term=%d",
task.expectedTerm, this.currTerm);
Utils.runClosureInThread(task.done, st);
task.reset();
}
continue;
}
if (!this.ballotBox.appendPendingTask(this.conf.getConf(),
this.conf.isStable() ? null : this.conf.getOldConf(), task.done)) {
Utils.runClosureInThread(task.done, new Status(RaftError.EINTERNAL, "Fail to append task."));
task.reset();
continue;
}
// set task entry info before adding to list.
task.entry.getId().setTerm(this.currTerm);
task.entry.setType(EnumOutter.EntryType.ENTRY_TYPE_DATA);
entries.add(task.entry);
task.reset();
}
this.logManager.appendEntries(entries, new LeaderStableClosure(entries));
// update conf.first
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -506,16 +506,20 @@ public void onEvent(final StableClosureEvent event, final long sequence, final b
this.lastId = this.ab.flush();
setDiskId(this.lastId);
LogManagerImpl.this.shutDownLatch.countDown();
event.reset();
return;
}
final StableClosure done = event.done;
final EventType eventType = event.type;

event.reset();

if (done.getEntries() != null && !done.getEntries().isEmpty()) {
this.ab.append(done);
} else {
this.lastId = this.ab.flush();
boolean ret = true;
switch (event.type) {
switch (eventType) {
case LAST_LOG_ID:
((LastLogIdClosure) done).setLastLogId(this.lastId.copy());
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ public T getMessage() {
return message;
}

public void reset() {
this.message = null;
}

public void setMessage(T message) {
this.message = message;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,13 @@ public class TaskHandler implements EventHandler<MessageEvent<Runnable>>, WorkHa
public void onEvent(final MessageEvent<Runnable> event, final long sequence, final boolean endOfBatch)
throws Exception {
event.getMessage().run();
event.reset();
}

@Override
public void onEvent(final MessageEvent<Runnable> event) throws Exception {
event.getMessage().run();
event.reset();
}

@Override
Expand Down

0 comments on commit 511ee01

Please sign in to comment.