Skip to content

Commit

Permalink
[FLINK-18581] Do not try to run GC phantom cleaners for jdk < 8u72
Browse files Browse the repository at this point in the history
The private JVM method Reference#tryHandlePending was introduced at Java 8u72.
The explicit processing of queued phantom GC cleaners was exposed before 8u72, also is was not used while reserving JVM direct memory.
Therefore, we can only hope that the GC will be triggered and the cleaners get processed in GC after some timeout.
This is suboptimal, therefore the PR changes Flink to not fail if the method is unavailable but logs a warning to upgrade Java.

This closes apache#12981.
  • Loading branch information
azagrebin committed Jul 28, 2020
1 parent 3d056c8 commit 2f03841
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@ private static CleanerProvider createLegacyCleanerProvider() {
"clean"),
new PendingCleanersRunnerProvider(
name,
reflectionUtils,
"tryHandlePending",
LEGACY_WAIT_FOR_REFERENCE_PROCESSING_ARGS,
LEGACY_WAIT_FOR_REFERENCE_PROCESSING_ARG_TYPES));
Expand Down Expand Up @@ -126,7 +125,6 @@ private static CleanerProvider createJava9CleanerProvider() {
"clean"),
new PendingCleanersRunnerProvider(
name,
reflectionUtils,
"waitForReferenceProcessing",
JAVA9_WAIT_FOR_REFERENCE_PROCESSING_ARGS,
JAVA9_WAIT_FOR_REFERENCE_PROCESSING_ARG_TYPES));
Expand Down Expand Up @@ -189,12 +187,13 @@ public String toString() {
private static class CleanerManager {
private final String cleanerName;
private final CleanerFactory cleanerFactory;
@Nullable
private final PendingCleanersRunner pendingCleanersRunner;

private CleanerManager(
String cleanerName,
CleanerFactory cleanerFactory,
PendingCleanersRunner pendingCleanersRunner) {
@Nullable PendingCleanersRunner pendingCleanersRunner) {
this.cleanerName = cleanerName;
this.cleanerFactory = cleanerFactory;
this.pendingCleanersRunner = pendingCleanersRunner;
Expand All @@ -205,7 +204,7 @@ private Runnable create(Object owner, Runnable cleanOperation) {
}

private boolean tryRunPendingCleaners() throws InterruptedException {
return pendingCleanersRunner.tryRunPendingCleaners();
return pendingCleanersRunner != null && pendingCleanersRunner.tryRunPendingCleaners();
}

@Override
Expand Down Expand Up @@ -303,32 +302,38 @@ private Runnable create(Object owner, Runnable cleanupOperation) {
private static class PendingCleanersRunnerProvider {
private static final String REFERENCE_CLASS = "java.lang.ref.Reference";
private final String cleanerName;
private final ReflectionUtils reflectionUtils;
private final String waitForReferenceProcessingName;
private final Object[] waitForReferenceProcessingArgs;
private final Class<?>[] waitForReferenceProcessingArgTypes;

private PendingCleanersRunnerProvider(
String cleanerName,
ReflectionUtils reflectionUtils,
String waitForReferenceProcessingName,
Object[] waitForReferenceProcessingArgs,
Class<?>[] waitForReferenceProcessingArgTypes) {
this.cleanerName = cleanerName;
this.reflectionUtils = reflectionUtils;
this.waitForReferenceProcessingName = waitForReferenceProcessingName;
this.waitForReferenceProcessingArgs = waitForReferenceProcessingArgs;
this.waitForReferenceProcessingArgTypes = waitForReferenceProcessingArgTypes;
}

@Nullable
private PendingCleanersRunner createPendingCleanersRunner() {
Class<?> referenceClass = reflectionUtils.findClass(REFERENCE_CLASS);
Method waitForReferenceProcessingMethod = reflectionUtils.findMethod(
referenceClass,
waitForReferenceProcessingName,
waitForReferenceProcessingArgTypes);
waitForReferenceProcessingMethod.setAccessible(true);
return new PendingCleanersRunner(waitForReferenceProcessingMethod, waitForReferenceProcessingArgs);
try {
Class<?> referenceClass = Class.forName(REFERENCE_CLASS);
Method waitForReferenceProcessingMethod = referenceClass.getDeclaredMethod(
waitForReferenceProcessingName,
waitForReferenceProcessingArgTypes);
waitForReferenceProcessingMethod.setAccessible(true);
return new PendingCleanersRunner(waitForReferenceProcessingMethod, waitForReferenceProcessingArgs);
} catch (ClassNotFoundException | NoSuchMethodException e) {
LOG.warn(
"Cannot run pending GC phantom cleaners. " +
"This can result in suboptimal memory management or failures. " +
"Try to upgrade to Java 8u72 or higher.",
e);
return null;
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,13 @@ void reserveMemory(long size, int maxSleeps) throws MemoryReservationException {
}

// no luck
throw new MemoryReservationException(
String.format("Could not allocate %d bytes, only %d bytes are remaining", size, availableOrReserved));
throw new MemoryReservationException(String.format(
"Could not allocate %d bytes, only %d bytes are remaining. This usually indicates " +
"that you are requesting more memory than you have reserved. " +
"However, when running an old JVM version it can also be caused by slow garbage collection. " +
"Try to upgrade to Java 8u72 or higher if running on an old Java version.",
size,
availableOrReserved));

} finally {
if (interrupted) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,11 @@ private void verifyAllManagedMemoryIsReleasedAfter(CompletableFuture<Void> after
after.thenRunAsync(
() -> {
if (!memoryManager.verifyEmpty()) {
LOG.warn("Not all slot memory is freed, potential memory leak at {}", this);
LOG.warn(
"Not all slot managed memory is freed at {}. This usually indicates memory leak. " +
"However, when running an old JVM version it can also be caused by slow garbage collection. " +
"Try to upgrade to Java 8u72 or higher if running on an old Java version.",
this);
}
},
asyncExecutor);
Expand Down

0 comments on commit 2f03841

Please sign in to comment.