Support --only-ticks-over with async-profiler #470

Merged (4 commits) on Apr 13, 2025

File: SamplerBuilder.java

@@ -112,22 +112,21 @@ public Sampler start(SparkPlatform platform) throws UnsupportedOperationExceptio
             throw new IllegalArgumentException("samplingInterval = " + this.samplingInterval);
         }

-        boolean canUseAsyncProfiler = AsyncProfilerAccess.getInstance(platform).checkSupported(platform);
+        AsyncProfilerAccess asyncProfiler = AsyncProfilerAccess.getInstance(platform);

         boolean onlyTicksOverMode = this.ticksOver != -1 && this.tickHook != null;
+        boolean canUseAsyncProfiler = asyncProfiler.checkSupported(platform) && (!onlyTicksOverMode || platform.getTickReporter() != null);

         if (this.mode == SamplerMode.ALLOCATION) {
-            if (!canUseAsyncProfiler || !AsyncProfilerAccess.getInstance(platform).checkAllocationProfilingSupported(platform)) {
+            if (!canUseAsyncProfiler || !asyncProfiler.checkAllocationProfilingSupported(platform)) {
                 throw new UnsupportedOperationException("Allocation profiling is not supported on your system. Check the console for more info.");
             }
             if (this.ignoreSleeping) {
                 platform.getPlugin().log(Level.WARNING, "Ignoring sleeping threads is not supported in allocation profiling mode. Sleeping threads will be included in the results.");
             }
-            if (onlyTicksOverMode) {
-                platform.getPlugin().log(Level.WARNING, "'Only-ticks-over' is not supported in allocation profiling mode.");
-            }
         }

-        if (onlyTicksOverMode || this.forceJavaSampler) {
+        if (this.forceJavaSampler) {
             canUseAsyncProfiler = false;
         }

@@ -139,14 +138,17 @@ public Sampler start(SparkPlatform platform) throws UnsupportedOperationExceptio
         SamplerSettings settings = new SamplerSettings(interval, this.threadDumper, this.threadGrouper.get(), this.autoEndTime, this.background, this.ignoreSleeping);

         Sampler sampler;
-        if (this.mode == SamplerMode.ALLOCATION) {
-            sampler = new AsyncSampler(platform, settings, new SampleCollector.Allocation(interval, this.allocLiveOnly));
-        } else if (canUseAsyncProfiler) {
-            sampler = new AsyncSampler(platform, settings, new SampleCollector.Execution(interval));
-        } else if (onlyTicksOverMode) {
-            sampler = new JavaSampler(platform, settings, this.tickHook, this.ticksOver);
+        if (canUseAsyncProfiler) {
+            SampleCollector<?> collector = this.mode == SamplerMode.ALLOCATION
+                    ? new SampleCollector.Allocation(interval, this.allocLiveOnly)
+                    : new SampleCollector.Execution(interval);
+            sampler = onlyTicksOverMode
+                    ? new AsyncSampler(platform, settings, collector, this.ticksOver)
+                    : new AsyncSampler(platform, settings, collector);
         } else {
-            sampler = new JavaSampler(platform, settings);
+            sampler = onlyTicksOverMode
+                    ? new JavaSampler(platform, settings, this.tickHook, this.ticksOver)
+                    : new JavaSampler(platform, settings);
         }

         sampler.start();
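
Note: read together, the two hunks above change the builder so that only-ticks-over profiling no longer forces the Java sampler. A minimal sketch of the resulting selection logic (illustrative only; the method and parameter names below are invented and are not code from the PR):

    // Which sampler does the builder pick after this change? (sketch, not spark code)
    static String chooseSampler(boolean asyncSupported, boolean hasTickReporter,
                                boolean onlyTicksOver, boolean forceJavaSampler) {
        // async-profiler can now handle only-ticks-over, but only if the platform exposes a TickReporter
        boolean canUseAsync = asyncSupported && (!onlyTicksOver || hasTickReporter);
        if (forceJavaSampler) {
            canUseAsync = false;
        }
        if (canUseAsync) {
            return onlyTicksOver ? "AsyncSampler with TickedAsyncDataAggregator" : "AsyncSampler";
        }
        return onlyTicksOver ? "JavaSampler with tick hook" : "JavaSampler";
    }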

File: AsyncDataAggregator.java

@@ -29,7 +29,7 @@
 /**
  * Data aggregator for {@link AsyncSampler}.
  */
-public class AsyncDataAggregator extends AbstractDataAggregator {
+public class AsyncDataAggregator extends AbstractDataAggregator implements AutoCloseable {

     /** A describer for async-profiler stack trace elements. */
     private static final StackTraceNode.Describer<AsyncStackTraceElement> STACK_TRACE_DESCRIBER = (element, parent) ->
@@ -79,4 +79,8 @@ private static boolean isSleeping(ProfileSegment element)
         return false;
     }

+    @Override
+    public void close() {
+
+    }
 }

File: AsyncProfilerJob.java

@@ -84,6 +84,8 @@ static AsyncProfilerJob createNew(AsyncProfilerAccess access, AsyncProfiler prof
     private int window;
     /** If the profiler should run in quiet mode */
     private boolean quiet;
+    /** If the profiler needs to use the same clock as {@link System#nanoTime()} */
+    private boolean forceNanoTime;

     /** The file used by async-profiler to output data */
     private Path outputFile;
@@ -117,12 +119,13 @@ private void checkActive() {
     }

     // Initialise the job
-    public void init(SparkPlatform platform, SampleCollector<?> collector, ThreadDumper threadDumper, int window, boolean quiet) {
+    public void init(SparkPlatform platform, SampleCollector<?> collector, ThreadDumper threadDumper, int window, boolean quiet, boolean forceNanoTime) {
         this.platform = platform;
         this.sampleCollector = collector;
         this.threadDumper = threadDumper;
         this.window = window;
         this.quiet = quiet;
+        this.forceNanoTime = forceNanoTime;
     }

     /**
@@ -151,6 +154,9 @@ public void start() {
         if (this.threadDumper instanceof ThreadDumper.Specific) {
             command.add("filter");
         }
+        if (this.forceNanoTime) {
+            command.add("clock=monotonic");
+        }

         // start the profiler
         String resp = execute(command.build()).trim();

File: AsyncSampler.java

@@ -27,6 +27,7 @@
 import me.lucko.spark.common.sampler.SamplerSettings;
 import me.lucko.spark.common.sampler.SamplerType;
 import me.lucko.spark.common.sampler.window.ProfilingWindowUtils;
+import me.lucko.spark.common.sampler.window.WindowStatisticsCollector;
 import me.lucko.spark.common.tick.TickHook;
 import me.lucko.spark.common.util.SparkThreadFactory;
 import me.lucko.spark.common.ws.ViewerSocket;
@@ -53,6 +54,9 @@ public class AsyncSampler extends AbstractSampler {
     /** Responsible for aggregating and then outputting collected sampling data */
     private final AsyncDataAggregator dataAggregator;

+    /** Whether to force the sampler to use monotonic/nano time */
+    private final boolean forceNanoTime;
+
     /** Mutex for the current profiler job */
     private final Object[] currentJobMutex = new Object[0];

@@ -66,10 +70,19 @@ public class AsyncSampler extends AbstractSampler {
     private ScheduledFuture<?> socketStatisticsTask;

     public AsyncSampler(SparkPlatform platform, SamplerSettings settings, SampleCollector<?> collector) {
+        this(platform, settings, collector, new AsyncDataAggregator(settings.threadGrouper(), settings.ignoreSleeping()), false);
+    }
+
+    public AsyncSampler(SparkPlatform platform, SamplerSettings settings, SampleCollector<?> collector, int tickLengthThreshold) {
+        this(platform, settings, collector, new TickedAsyncDataAggregator(settings.threadGrouper(), settings.ignoreSleeping(), platform.getTickReporter(), tickLengthThreshold), true);
+    }
+
+    private AsyncSampler(SparkPlatform platform, SamplerSettings settings, SampleCollector<?> collector, AsyncDataAggregator dataAggregator, boolean forceNanoTime) {
         super(platform, settings);
         this.sampleCollector = collector;
+        this.dataAggregator = dataAggregator;
+        this.forceNanoTime = forceNanoTime;
         this.profilerAccess = AsyncProfilerAccess.getInstance(platform);
-        this.dataAggregator = new AsyncDataAggregator(settings.threadGrouper(), settings.ignoreSleeping());
         this.scheduler = Executors.newSingleThreadScheduledExecutor(
                 new ThreadFactoryBuilder()
                         .setNameFormat("spark-async-sampler-worker-thread")
@@ -93,7 +106,7 @@ public void start() {
         int window = ProfilingWindowUtils.windowNow();

         AsyncProfilerJob job = this.profilerAccess.startNewProfilerJob();
-        job.init(this.platform, this.sampleCollector, this.threadDumper, window, this.background);
+        job.init(this.platform, this.sampleCollector, this.threadDumper, window, this.background, this.forceNanoTime);
         job.start();
         this.windowStatisticsCollector.recordWindowStartTime(window);
         this.currentJob = job;
@@ -131,7 +144,7 @@ private void rotateProfilerJob() {
            // start a new job
            int window = previousJob.getWindow() + 1;
            AsyncProfilerJob newJob = this.profilerAccess.startNewProfilerJob();
-           newJob.init(this.platform, this.sampleCollector, this.threadDumper, window, this.background);
+           newJob.init(this.platform, this.sampleCollector, this.threadDumper, window, this.background, this.forceNanoTime);
            newJob.start();
            this.windowStatisticsCollector.recordWindowStartTime(window);
            this.currentJob = newJob;
@@ -204,6 +217,7 @@ public void stop(boolean cancelled) {
            this.scheduler.shutdown();
            this.scheduler = null;
        }
+        this.dataAggregator.close();
     }

     @Override

File: ExceedingTicksFilter.java (new file)

@@ -0,0 +1,97 @@
/*
* This file is part of spark.
*
* Copyright (c) lucko (Luck) <[email protected]>
* Copyright (c) contributors
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

package me.lucko.spark.common.sampler.async;

import me.lucko.spark.common.tick.TickReporter;

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.LongSupplier;

class ExceedingTicksFilter implements TickReporter.Callback {

/** The ticks that exceeded the threshold, cleared one-by-one when inserting data */
private final Queue<ExceededTick> ticksOver = new ConcurrentLinkedQueue<>();

/** Counts the number of ticks aggregated */
private final AtomicInteger tickCounter = new AtomicInteger();

/** Tick durations under this threshold will not be inserted, measured in milliseconds */
private final int tickLengthThreshold;

/** The source to get the current nano time from */
private final LongSupplier nanoTimeSource;

ExceedingTicksFilter(int tickLengthThreshold, LongSupplier nanoTimeSource) {
this.tickLengthThreshold = tickLengthThreshold;
this.nanoTimeSource = nanoTimeSource;
}

public ExceedingTicksFilter(int tickLengthThreshold) {
this(tickLengthThreshold, System::nanoTime);
}

@Override
public void onTick(double duration) {
if (duration > this.tickLengthThreshold) {
long end = this.nanoTimeSource.getAsLong();
long start = (long) (end - (duration * 1_000_000)); // ms to ns
this.ticksOver.add(new ExceededTick(start, end));
this.tickCounter.getAndIncrement();
}
}

public int exceedingTicksCount() {
return this.tickCounter.get();
}

public boolean duringExceedingTick(long time) {
while (true) {
ExceededTick earliestExceeding = this.ticksOver.peek();
if (earliestExceeding == null) {
// no tick over threshold anymore
return false;
} else if (time - earliestExceeding.start < 0) {
// segment happened before current exceeding
return false;
} else if (earliestExceeding.end - time < 0) {
// segment happened after current exceeding,
// but it might fall into the next one
this.ticksOver.remove();
} else {
// segment falls exactly into exceeding, record it
return true;
}
}
}

private static final class ExceededTick {
// times are in nanoseconds from System.nanoTime()
private final long start;
private final long end;

ExceededTick(long start, long end) {
this.start = start;
this.end = end;
}
}
}
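
The class above is the core of the new mode: onTick records a [start, end] window (in System.nanoTime() terms) for every tick that exceeds the threshold, and duringExceedingTick walks the queue with overflow-safe comparisons (time - start < 0 rather than time < start), discarding windows that ended before the queried time. A small deterministic usage sketch, assuming same-package access and a fixed nano-time source (not part of the PR):

    package me.lucko.spark.common.sampler.async;

    // Not part of the PR: a deterministic sketch of how ExceedingTicksFilter classifies sample times.
    public class ExceedingTicksFilterSketch {
        public static void main(String[] args) {
            long tickEnd = 1_000_000_000L; // pretend System.nanoTime() value at the end of a tick
            ExceedingTicksFilter filter = new ExceedingTicksFilter(50, () -> tickEnd);

            filter.onTick(75.0); // a 75ms tick exceeds the 50ms threshold, so [tickEnd - 75ms, tickEnd] is recorded
            long tickStart = tickEnd - 75_000_000L;

            System.out.println(filter.exceedingTicksCount());                        // 1
            System.out.println(filter.duringExceedingTick(tickStart + 10_000_000L)); // true: sample fell inside the slow tick
            System.out.println(filter.duringExceedingTick(tickStart - 10_000_000L)); // false: sample predates the slow tick
        }
    }

The overflow-safe form matters because System.nanoTime() values may be negative and can wrap; only differences between two readings are meaningful.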

File: ProfileSegment.java

@@ -45,13 +45,16 @@ public class ProfileSegment {
     private final long value;
     /** The state of the thread. {@value #UNKNOWN_THREAD_STATE} if state is unknown */
     private final String threadState;
+    /** The time at which this segment was recorded, as if it was produced by {@link System#nanoTime()} */
+    private final long time;

-    private ProfileSegment(int nativeThreadId, String threadName, AsyncStackTraceElement[] stackTrace, long value, String threadState) {
+    private ProfileSegment(int nativeThreadId, String threadName, AsyncStackTraceElement[] stackTrace, long value, String threadState, long time) {
         this.nativeThreadId = nativeThreadId;
         this.threadName = threadName;
         this.stackTrace = stackTrace;
         this.value = value;
         this.threadState = threadState;
+        this.time = time;
     }

     public int getNativeThreadId() {
@@ -74,6 +77,10 @@ public String getThreadState() {
         return this.threadState;
     }

+    public long getTime() {
+        return this.time;
+    }
+
     public static ProfileSegment parseSegment(JfrReader reader, JfrReader.Event sample, String threadName, long value) {
         JfrReader.StackTrace stackTrace = reader.stackTraces.get(sample.stackTraceId);
         int len = stackTrace != null ? stackTrace.methods.length : 0;
@@ -90,7 +97,7 @@ public static ProfileSegment parseSegment(JfrReader reader, JfrReader.Event samp
             threadState = threadStateLookup.getOrDefault(executionSample.threadState, UNKNOWN_THREAD_STATE);
         }

-        return new ProfileSegment(sample.tid, threadName, stack, value, threadState);
+        return new ProfileSegment(sample.tid, threadName, stack, value, threadState, sample.time);
     }

     private static AsyncStackTraceElement parseStackFrame(JfrReader reader, long methodId) {

File: TickedAsyncDataAggregator.java (new file)

@@ -0,0 +1,74 @@
/*
* This file is part of spark.
*
* Copyright (c) lucko (Luck) <[email protected]>
* Copyright (c) contributors
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/

package me.lucko.spark.common.sampler.async;

import me.lucko.spark.common.sampler.ThreadGrouper;
import me.lucko.spark.common.tick.TickReporter;
import me.lucko.spark.proto.SparkSamplerProtos;

import java.util.concurrent.TimeUnit;

public class TickedAsyncDataAggregator extends AsyncDataAggregator {

/** The callback called when this aggregator is closed, to clean up resources */
private final Runnable closeCallback;

/** Tick durations under this threshold will not be inserted, measured in milliseconds */
private final long tickLengthThreshold;

private final ExceedingTicksFilter filter;

protected TickedAsyncDataAggregator(ThreadGrouper threadGrouper, boolean ignoreSleeping, TickReporter tickReporter, int tickLengthThreshold) {
super(threadGrouper, ignoreSleeping);
this.tickLengthThreshold = TimeUnit.MILLISECONDS.toMicros(tickLengthThreshold);
this.filter = new ExceedingTicksFilter(tickLengthThreshold);
tickReporter.addCallback(this.filter);
this.closeCallback = () -> tickReporter.removeCallback(this.filter);
}

@Override
public void insertData(ProfileSegment element, int window) {
// with async-profiler clock=monotonic, the event time uses the same clock
// as System.nanoTime(), so we can compare it directly
long time = element.getTime();
if (!this.filter.duringExceedingTick(time)) {
return;
}
super.insertData(element, window);
}

@Override
public SparkSamplerProtos.SamplerMetadata.DataAggregator getMetadata() {
return SparkSamplerProtos.SamplerMetadata.DataAggregator.newBuilder()
.setType(SparkSamplerProtos.SamplerMetadata.DataAggregator.Type.TICKED)
.setThreadGrouper(this.threadGrouper.asProto())
.setTickLengthThreshold(this.tickLengthThreshold)
.setNumberOfIncludedTicks(this.filter.exceedingTicksCount())
.build();

}

@Override
public void close() {
this.closeCallback.run();
}

}
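
One detail worth calling out in the aggregator above: the filter keeps its threshold in milliseconds (the onTick callback reports tick durations in ms, per the "ms to ns" conversion in ExceedingTicksFilter), while the copy stored on this class is converted to microseconds before being written into the sampler metadata. A quick illustrative check of that conversion (not part of the PR):

    import java.util.concurrent.TimeUnit;

    // Sketch: the value reported via setTickLengthThreshold for e.g. "--only-ticks-over 50"
    public class ThresholdUnitsSketch {
        public static void main(String[] args) {
            int thresholdMillis = 50; // the filter compares tick durations against this, in milliseconds
            long metadataMicros = TimeUnit.MILLISECONDS.toMicros(thresholdMillis);
            System.out.println(metadataMicros); // 50000 microseconds, the value stored in the metadata
        }
    }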