Spark Executor Tasks Instrumentation (DataDog#6515)
Add a new instrumentation, `spark-executor`, that traces tasks running on Spark executors, generating `spark.task` traces to which underlying spans (like S3 or database calls) are linked.

The existing spark instrumentation runs on the Spark driver, capturing Spark internals (spark.sql, spark.job, spark.stage, ...), while this new instrumentation traces the tasks run on Spark executors.

Additional notes:
- each `spark.task` is a distinct trace, not part of the existing spark.application / spark.job traces:
  - a Spark job/application can have millions of tasks, making it impractical to put everything in one trace
  - sampling should be applied to spark.task, while all spark.application and spark.job traces are kept
  - span links will be used to link the spark.task traces to the other Spark traces
- task errors are not captured for now: Spark catches all throwables during task execution, making them complex to retrieve from the instrumentation
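
Since the integration is disabled by default (`defaultEnabled()` returns false in the instrumentation below), it has to be switched on explicitly. A minimal launch sketch, assuming the agent jar is attached to the executor JVMs via the standard `spark.executor.extraJavaOptions` mechanism (paths and jar names are illustrative):

spark-submit \
  --conf "spark.executor.extraJavaOptions=-javaagent:/path/to/dd-java-agent.jar -Ddd.integration.spark-executor.enabled=true" \
  my-spark-job.jar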
paul-laffon-dd authored Feb 27, 2024
1 parent b2a77cb commit 2c1fe7c
Showing 6 changed files with 347 additions and 0 deletions.
4 changes: 4 additions & 0 deletions .github/CODEOWNERS
@@ -47,3 +47,7 @@ dd-java-agent/instrumentation/spring-security-5/ @DataDog/asm-java
**/iast/ @DataDog/asm-java
**/Iast*.java @DataDog/asm-java
**/Iast*.groovy @DataDog/asm-java

# @DataDog/data-jobs-monitoring
dd-java-agent/instrumentation/spark/ @DataDog/data-jobs-monitoring
dd-java-agent/instrumentation/spark-executor/ @DataDog/data-jobs-monitoring
45 changes: 45 additions & 0 deletions dd-java-agent/instrumentation/spark-executor/build.gradle
@@ -0,0 +1,45 @@
muzzle {
  pass {
    group = "org.apache.spark"
    module = "spark-sql_2.12"
    versions = "[2.4.0,)"
    assertInverse = true
  }
  pass {
    group = "org.apache.spark"
    module = "spark-sql_2.13"
    versions = "[3.2.0,)"
    assertInverse = true
  }
}

apply from: "$rootDir/gradle/java.gradle"

// Since we are using different Scala versions for different test suites,
// the test classes are placed in the baseTest suite so that the Scala
// version is not inherited
addTestSuite('baseTest')
addTestSuiteForDir('latest212DepTest', 'baseTest')
addTestSuiteForDir('latest213DepTest', 'baseTest')

ext {
  // Hadoop does not behave correctly with OpenJ9 https://issues.apache.org/jira/browse/HADOOP-18174
  excludeJdk = ['SEMERU8', 'SEMERU11']

  // Spark does not support Java > 11 until 3.3.0 https://issues.apache.org/jira/browse/SPARK-33772
  maxJavaVersionForTests = JavaVersion.VERSION_11
}

dependencies {
  compileOnly group: 'org.apache.spark', name: 'spark-core_2.12', version: '2.4.0'
  compileOnly group: 'org.apache.spark', name: 'spark-sql_2.12', version: '2.4.0'

  baseTestImplementation group: 'org.apache.spark', name: "spark-core_2.12", version: "2.4.0"
  baseTestImplementation group: 'org.apache.spark', name: "spark-sql_2.12", version: "2.4.0"

  latest212DepTestImplementation group: 'org.apache.spark', name: "spark-core_2.12", version: '+'
  latest212DepTestImplementation group: 'org.apache.spark', name: "spark-sql_2.12", version: '+'

  latest213DepTestImplementation group: 'org.apache.spark', name: "spark-core_2.13", version: '+'
  latest213DepTestImplementation group: 'org.apache.spark', name: "spark-sql_2.13", version: '+'
}
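
Presumably each declared suite maps onto a Gradle test task named after it; a hypothetical invocation of the latest-dependency Scala 2.13 suite (task name derived from the suite name above):

./gradlew :dd-java-agent:instrumentation:spark-executor:latest213DepTest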
@@ -0,0 +1,128 @@
import datadog.trace.agent.test.AgentTestRunner
import datadog.trace.bootstrap.instrumentation.api.Tags
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.Row
import org.apache.spark.sql.RowFactory
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StructType

class SparkExecutorTest extends AgentTestRunner {

  @Override
  void configurePreAgent() {
    super.configurePreAgent()
    injectSysConfig("dd.integration.spark-executor.enabled", "true")
  }

  private Dataset<Row> generateSampleDataframe(SparkSession spark) {
    def structType = new StructType()
    structType = structType.add("col", "String", false)

    def rows = new ArrayList<Row>()
    rows.add(RowFactory.create("value"))
    spark.createDataFrame(rows, structType)
  }

  def "generate spark task run spans"() {
    setup:
    def sparkSession = SparkSession.builder()
      .config("spark.master", "local[2]")
      .config("spark.sql.shuffle.partitions", "2")
      .appName("test-app")
      .getOrCreate()

    def df = generateSampleDataframe(sparkSession)
    df.count()
    sparkSession.stop()

    expect:
    assertTraces(2) {
      trace(1) {
        span {
          operationName "spark.task"
          resourceName "spark.task"
          parent()
          tags {
            "$Tags.COMPONENT" "spark"
            "task_id" 0
            "task_thread_name" String
            "stage_id" 0
            "stage_attempt_id" 0
            "job_id" 0
            "app_id" String
            "application_name" "test-app"

            // Spark metrics
            "spark.executor_deserialize_time" Long
            "spark.executor_deserialize_cpu_time" Long
            "spark.executor_run_time" Long
            "spark.executor_cpu_time" Long
            "spark.result_size" Long
            "spark.jvm_gc_time" Long
            "spark.result_serialization_time" Long
            "spark.memory_bytes_spilled" Long
            "spark.disk_bytes_spilled" Long
            "spark.peak_execution_memory" Long
            "spark.input_bytes" Long
            "spark.input_records" Long
            "spark.output_bytes" Long
            "spark.output_records" Long
            "spark.shuffle_read_bytes" Long
            "spark.shuffle_read_bytes_local" Long
            "spark.shuffle_read_bytes_remote" Long
            "spark.shuffle_read_bytes_remote_to_disk" Long
            "spark.shuffle_read_fetch_wait_time" Long
            "spark.shuffle_read_records" Long
            "spark.shuffle_write_bytes" Long
            "spark.shuffle_write_records" Long
            "spark.shuffle_write_time" Long
            defaultTags()
          }
        }
      }
      trace(1) {
        span {
          operationName "spark.task"
          resourceName "spark.task"
          parent()
          tags {
            "$Tags.COMPONENT" "spark"
            "task_id" 1
            "task_thread_name" String
            "stage_id" Integer
            "stage_attempt_id" 0
            "job_id" Integer
            "app_id" String
            "application_name" "test-app"

            // Spark metrics
            "spark.executor_deserialize_time" Long
            "spark.executor_deserialize_cpu_time" Long
            "spark.executor_run_time" Long
            "spark.executor_cpu_time" Long
            "spark.result_size" Long
            "spark.jvm_gc_time" Long
            "spark.result_serialization_time" Long
            "spark.memory_bytes_spilled" Long
            "spark.disk_bytes_spilled" Long
            "spark.peak_execution_memory" Long
            "spark.input_bytes" Long
            "spark.input_records" Long
            "spark.output_bytes" Long
            "spark.output_records" Long
            "spark.shuffle_read_bytes" Long
            "spark.shuffle_read_bytes_local" Long
            "spark.shuffle_read_bytes_remote" Long
            "spark.shuffle_read_bytes_remote_to_disk" Long
            "spark.shuffle_read_fetch_wait_time" Long
            "spark.shuffle_read_records" Long
            "spark.shuffle_write_bytes" Long
            "spark.shuffle_write_records" Long
            "spark.shuffle_write_time" Long
            defaultTags()
          }
        }
      }
    }
  }
}
@@ -0,0 +1,87 @@
package datadog.trace.instrumentation.spark;

import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString;
import datadog.trace.bootstrap.instrumentation.decorator.BaseDecorator;
import org.apache.spark.executor.Executor;
import org.apache.spark.executor.TaskMetrics;

public class SparkExecutorDecorator extends BaseDecorator {
  public static final CharSequence SPARK_TASK = UTF8BytesString.create("spark.task");
  public static final CharSequence SPARK = UTF8BytesString.create("spark");
  public static SparkExecutorDecorator DECORATE = new SparkExecutorDecorator();

  @Override
  protected String[] instrumentationNames() {
    return new String[] {"spark-executor"};
  }

  @Override
  protected CharSequence spanType() {
    return null;
  }

  @Override
  protected CharSequence component() {
    return SPARK;
  }

  public void onTaskStart(AgentSpan span, Executor.TaskRunner taskRunner) {
    span.setTag("task_id", taskRunner.taskId());
    span.setTag("task_thread_name", taskRunner.threadName());
  }

  public void onTaskEnd(AgentSpan span, Executor.TaskRunner taskRunner) {
    // task is set by Spark in run() by deserializing the task binary coming from the driver
    if (taskRunner.task() == null) {
      return;
    }

    span.setTag("stage_id", taskRunner.task().stageId());
    span.setTag("stage_attempt_id", taskRunner.task().stageAttemptId());

    if (taskRunner.task().jobId().isDefined()) {
      span.setTag("job_id", taskRunner.task().jobId().get());
    }
    if (taskRunner.task().appId().isDefined()) {
      span.setTag("app_id", taskRunner.task().appId().get());
    }
    if (taskRunner.task().appAttemptId().isDefined()) {
      span.setTag("app_attempt_id", taskRunner.task().appAttemptId().get());
    }
    span.setTag(
        "application_name", taskRunner.task().localProperties().getProperty("spark.app.name"));

    TaskMetrics metrics = taskRunner.task().metrics();
    span.setMetric("spark.executor_deserialize_time", metrics.executorDeserializeTime());
    span.setMetric("spark.executor_deserialize_cpu_time", metrics.executorDeserializeCpuTime());
    span.setMetric("spark.executor_run_time", metrics.executorRunTime());
    span.setMetric("spark.executor_cpu_time", metrics.executorCpuTime());
    span.setMetric("spark.result_size", metrics.resultSize());
    span.setMetric("spark.jvm_gc_time", metrics.jvmGCTime());
    span.setMetric("spark.result_serialization_time", metrics.resultSerializationTime());
    span.setMetric("spark.memory_bytes_spilled", metrics.memoryBytesSpilled());
    span.setMetric("spark.disk_bytes_spilled", metrics.diskBytesSpilled());
    span.setMetric("spark.peak_execution_memory", metrics.peakExecutionMemory());

    span.setMetric("spark.input_bytes", metrics.inputMetrics().bytesRead());
    span.setMetric("spark.input_records", metrics.inputMetrics().recordsRead());
    span.setMetric("spark.output_bytes", metrics.outputMetrics().bytesWritten());
    span.setMetric("spark.output_records", metrics.outputMetrics().recordsWritten());

    span.setMetric("spark.shuffle_read_bytes", metrics.shuffleReadMetrics().totalBytesRead());
    span.setMetric("spark.shuffle_read_bytes_local", metrics.shuffleReadMetrics().localBytesRead());
    span.setMetric(
        "spark.shuffle_read_bytes_remote", metrics.shuffleReadMetrics().remoteBytesRead());
    span.setMetric(
        "spark.shuffle_read_bytes_remote_to_disk",
        metrics.shuffleReadMetrics().remoteBytesReadToDisk());
    span.setMetric(
        "spark.shuffle_read_fetch_wait_time", metrics.shuffleReadMetrics().fetchWaitTime());
    span.setMetric("spark.shuffle_read_records", metrics.shuffleReadMetrics().recordsRead());

    span.setMetric("spark.shuffle_write_bytes", metrics.shuffleWriteMetrics().bytesWritten());
    span.setMetric("spark.shuffle_write_records", metrics.shuffleWriteMetrics().recordsWritten());
    span.setMetric("spark.shuffle_write_time", metrics.shuffleWriteMetrics().writeTime());
  }
}
@@ -0,0 +1,82 @@
package datadog.trace.instrumentation.spark;

import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.startSpan;
import static datadog.trace.instrumentation.spark.SparkExecutorDecorator.DECORATE;
import static datadog.trace.instrumentation.spark.SparkExecutorDecorator.SPARK_TASK;
import static net.bytebuddy.matcher.ElementMatchers.isDeclaredBy;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;

import com.google.auto.service.AutoService;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.agent.tooling.InstrumenterModule;
import datadog.trace.bootstrap.instrumentation.api.AgentScope;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import net.bytebuddy.asm.Advice;
import org.apache.spark.executor.Executor;

@AutoService(Instrumenter.class)
public class SparkExecutorInstrumentation extends InstrumenterModule.Tracing
    implements Instrumenter.ForSingleType {

  public SparkExecutorInstrumentation() {
    super("spark-executor");
  }

  @Override
  public boolean defaultEnabled() {
    return false;
  }

  @Override
  public String instrumentedType() {
    return "org.apache.spark.executor.Executor$TaskRunner";
  }

  @Override
  public String[] helperClassNames() {
    return new String[] {
      packageName + ".SparkExecutorDecorator",
    };
  }

  @Override
  public void methodAdvice(MethodTransformer transformer) {
    transformer.applyAdvice(
        isMethod()
            .and(named("run"))
            .and(isDeclaredBy(named("org.apache.spark.executor.Executor$TaskRunner"))),
        SparkExecutorInstrumentation.class.getName() + "$RunAdvice");
  }

  public static final class RunAdvice {
    @Advice.OnMethodEnter(suppress = Throwable.class)
    public static AgentScope enter(@Advice.This Executor.TaskRunner taskRunner) {
      final AgentSpan span = startSpan("spark-executor", SPARK_TASK);

      DECORATE.afterStart(span);
      DECORATE.onTaskStart(span, taskRunner);

      return activateSpan(span);
    }

    @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
    public static void exit(
        @Advice.Enter final AgentScope scope, @Advice.This final Executor.TaskRunner taskRunner) {
      if (scope == null) {
        return;
      }

      final AgentSpan span = scope.span();

      try {
        DECORATE.onTaskEnd(span, taskRunner);
        DECORATE.beforeFinish(scope);
      } finally {
        scope.close();
        span.finish();
      }
    }
  }
}
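
For context, a minimal sketch of hypothetical user code (not part of this commit) whose executor-side work runs inside `Executor$TaskRunner.run()`; class name, path, and workload are illustrative, and the nesting behavior follows from the advice above, which activates a `spark.task` span around that method:

import org.apache.spark.api.java.function.ForeachPartitionFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class ExampleJob {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().appName("example").getOrCreate();
    Dataset<Row> df = spark.read().parquet("s3a://bucket/input"); // illustrative path

    // The lambda below is serialized to the executors and runs inside
    // Executor$TaskRunner.run(), i.e. while a spark.task span is active.
    df.foreachPartition((ForeachPartitionFunction<Row>) rows -> {
      while (rows.hasNext()) {
        Row row = rows.next();
        // Any auto-instrumented call made here (JDBC, HTTP client, S3, ...)
        // would produce spans under the surrounding spark.task.
      }
    });

    spark.stop();
  }
}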
1 change: 1 addition & 0 deletions settings.gradle
@@ -390,6 +390,7 @@ include ':dd-java-agent:instrumentation:slick'
include ':dd-java-agent:instrumentation:spark'
include ':dd-java-agent:instrumentation:spark:spark_2.12'
include ':dd-java-agent:instrumentation:spark:spark_2.13'
include ':dd-java-agent:instrumentation:spark-executor'
include ':dd-java-agent:instrumentation:sparkjava-2.3'
include ':dd-java-agent:instrumentation:spray-1.3'
include ':dd-java-agent:instrumentation:spring-beans'