Skip to content

Commit

Permalink
Fix memory leak. (sofastack#619)
Browse files Browse the repository at this point in the history
* memory leak fix.

* code clean.

* log info format fix.

* reset should after event process.
  • Loading branch information
horizonzy authored Jun 28, 2021
1 parent bbd97f1 commit 4cbb722
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,13 @@ private static class ReadIndexEvent {
ReadIndexClosure done;
CountDownLatch shutdownLatch;
long startTime;

private void reset() {
this.requestContext = null;
this.done = null;
this.shutdownLatch = null;
this.startTime = 0L;
}
}

private static class ReadIndexEventFactory implements EventFactory<ReadIndexEvent> {
Expand All @@ -116,16 +123,23 @@ public void onEvent(final ReadIndexEvent newEvent, final long sequence, final bo
throws Exception {
if (newEvent.shutdownLatch != null) {
executeReadIndexEvents(this.events);
this.events.clear();
reset();
newEvent.shutdownLatch.countDown();
return;
}

this.events.add(newEvent);
if (this.events.size() >= ReadOnlyServiceImpl.this.raftOptions.getApplyBatch() || endOfBatch) {
executeReadIndexEvents(this.events);
this.events.clear();
reset();
}
}

private void reset() {
for (final ReadIndexEvent event : this.events) {
event.reset();
}
this.events.clear();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,19 +51,19 @@ public LogExceptionHandler(String name, OnEventException<T> onEventException) {

@Override
public void handleOnStartException(Throwable ex) {
LOG.error("Fail to start {} disruptor, {}", this.name, ex);
LOG.error("Fail to start {} disruptor", this.name, ex);
}

@Override
public void handleOnShutdownException(Throwable ex) {
LOG.error("Fail to shutdown {} disruptor, {}", this.name, ex);
LOG.error("Fail to shutdown {} disruptor", this.name, ex);
}

@Override
public void handleEventException(Throwable ex, long sequence, T event) {
LOG.error("Handle {} disruptor event error, event is {}, {}", this.name, event, ex);
LOG.error("Handle {} disruptor event error, event is {}", this.name, event, ex);
if (this.onEventException != null) {
this.onEventException.onException(event, ex);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1763,12 +1763,12 @@ public void onEvent(final KeyEvent event, final long sequence, final boolean end
}

if (size == 1) {
reset();
try {
get(event.key, this.readOnlySafe, event.future, false);
} catch (final Throwable t) {
exceptionally(t, event.future);
}
reset();
} else {
final List<byte[]> keys = Lists.newArrayListWithCapacity(size);
final CompletableFuture<byte[]>[] futures = new CompletableFuture[size];
Expand Down Expand Up @@ -1813,13 +1813,13 @@ public void onEvent(final KVEvent event, final long sequence, final boolean endO
}

if (size == 1) {
reset();
final KVEntry kv = event.kvEntry;
try {
put(kv.getKey(), kv.getValue(), event.future, false);
} catch (final Throwable t) {
exceptionally(t, event.future);
}
reset();
} else {
final List<KVEntry> entries = Lists.newArrayListWithCapacity(size);
final CompletableFuture<Boolean>[] futures = new CompletableFuture[size];
Expand All @@ -1846,7 +1846,7 @@ public void onEvent(final KVEvent event, final long sequence, final boolean endO
}
}

private abstract class AbstractBatchingHandler<T> implements EventHandler<T> {
private abstract class AbstractBatchingHandler<T extends Event> implements EventHandler<T> {

protected final Histogram histogramWithKeys;
protected final Histogram histogramWithBytes;
Expand All @@ -1869,27 +1869,37 @@ public void reset() {
this.histogramWithKeys.update(this.events.size());
this.histogramWithBytes.update(this.cachedBytes);

for (final T event : events) {
event.reset();
}
this.events.clear();
this.cachedBytes = 0;
}

}

private interface Event {
void reset();
}

private static class KeyEvent {
private static class KeyEvent implements Event {

private byte[] key;
private CompletableFuture<byte[]> future;

@Override
public void reset() {
this.key = null;
this.future = null;
}
}

private static class KVEvent {
private static class KVEvent implements Event {

private KVEntry kvEntry;
private CompletableFuture<Boolean> future;

@Override
public void reset() {
this.kvEntry = null;
this.future = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public void run(final Status status) {
if (done != null) {
done.run(status);
}
clear();
reset();
}

@Override
Expand Down Expand Up @@ -104,7 +104,7 @@ public void setData(Object data) {
}
}

private void clear() {
private void reset() {
done = null;
operation = null;
}
Expand Down

0 comments on commit 4cbb722

Please sign in to comment.