Enforce new ingestion reason for spark traces (DataDog#6310)

# Motivation

It is critical to keep all Spark traces, as customers closely monitor their job runs. The new ingestion reason will allow tracking of ingested bytes for billing.

# Additional Notes

Added the method `AgentSpan setSamplingPriority(final int newPriority, int samplingMechanism)` to the `AgentSpan` interface so that it can be called from an instrumentation.
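
The same pattern is applied at every point where the Spark listener starts a span. As a minimal sketch of that pattern — the helper body mirrors the `setDataJobsSamplingPriority` method added in this commit, while the surrounding class is invented for illustration:

```java
import datadog.trace.api.sampling.PrioritySampling;
import datadog.trace.api.sampling.SamplingMechanism;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;

// Hypothetical scaffolding; the helper body matches the diff below.
class SparkSpanSamplingSketch {

  // Force-keep the span and record DATA_JOBS as the ingestion reason, so the
  // trace survives sampling and its ingested bytes can be attributed for billing.
  static void setDataJobsSamplingPriority(AgentSpan span) {
    span.setSamplingPriority(PrioritySampling.USER_KEEP, SamplingMechanism.DATA_JOBS);
  }
}
```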
paul-laffon-dd authored Dec 11, 2023
1 parent 00358aa commit 163ba8a
Showing 8 changed files with 51 additions and 1 deletion.
@@ -5,6 +5,8 @@
 import datadog.trace.api.Config;
 import datadog.trace.api.DDTags;
 import datadog.trace.api.DDTraceId;
+import datadog.trace.api.sampling.PrioritySampling;
+import datadog.trace.api.sampling.SamplingMechanism;
 import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
 import datadog.trace.bootstrap.instrumentation.api.AgentTracer;
 import de.thetaphi.forbiddenapis.SuppressForbidden;
@@ -135,6 +137,7 @@ private void initApplicationSpanIfNotInitialized() {
     captureApplicationParameters(builder);

     applicationSpan = builder.start();
+    setDataJobsSamplingPriority(applicationSpan);
     applicationSpan.setMeasured(true);
   }

@@ -203,6 +206,7 @@ private AgentSpan getOrCreateStreamingBatchSpan(
     }

     batchSpan = builder.start();
+    setDataJobsSamplingPriority(batchSpan);
     streamingBatchSpans.put(batchKey, batchSpan);
     return batchSpan;
   }
@@ -267,6 +271,7 @@ private AgentSpan getOrCreateSqlSpan(
     }

     AgentSpan sqlSpan = spanBuilder.start();
+    setDataJobsSamplingPriority(sqlSpan);
     sqlSpans.put(sqlExecutionId, sqlSpan);
     return sqlSpan;
   }
@@ -321,6 +326,7 @@ public synchronized void onJobStart(SparkListenerJobStart jobStart) {
     captureJobParameters(jobSpanBuilder, jobStart.properties());

     AgentSpan jobSpan = jobSpanBuilder.start();
+    setDataJobsSamplingPriority(jobSpan);
     jobSpan.setMeasured(true);

     for (int stageId : getSparkJobStageIds(jobStart)) {
@@ -404,6 +410,7 @@ public synchronized void onStageSubmitted(SparkListenerStageSubmitted stageSubmitted) {
         .withTag(DDTags.RESOURCE_NAME, stageSubmitted.stageInfo().name())
         .start();

+    setDataJobsSamplingPriority(stageSpan);
     stageSpan.setMeasured(true);

     stageSpans.put(stageSpanKey(stageId, stageAttemptId), stageSpan);
@@ -551,6 +558,7 @@ private void sendTaskSpan(
       taskSpan.setTag("count_towards_task_failures", reason.countTowardsTaskFailures());
     }

+    setDataJobsSamplingPriority(taskSpan);
     taskSpan.finish(taskEnd.taskInfo().finishTime() * 1000);
   }

@@ -753,6 +761,10 @@ private synchronized void onStreamingQueryProgressEvent(
     }
   }

+  private void setDataJobsSamplingPriority(AgentSpan span) {
+    span.setSamplingPriority(PrioritySampling.USER_KEEP, SamplingMechanism.DATA_JOBS);
+  }
+
   private AgentTracer.SpanBuilder buildSparkSpan(String spanName, Properties properties) {
     AgentTracer.SpanBuilder builder =
         tracer.buildSpan(spanName).withSpanType("spark").withTag("app_id", appId);
@@ -81,7 +81,7 @@ public AgentTrace getTrace() {

   @Override
   public int getSamplingPriority() {
-    return PrioritySampling.SAMPLER_KEEP;
+    return PrioritySampling.UNSET;
   }

   @Override
@@ -2,6 +2,8 @@ package datadog.trace.instrumentation.spark

 import datadog.trace.agent.test.AgentTestRunner
 import datadog.trace.api.Platform
+import datadog.trace.api.sampling.PrioritySampling
+import datadog.trace.api.sampling.SamplingMechanism
 import org.apache.spark.sql.Encoders
 import org.apache.spark.sql.execution.streaming.MemoryStream
 import org.apache.spark.sql.SparkSession
@@ -69,6 +71,8 @@ class AbstractSparkStructuredStreamingTest extends AgentTestRunner {
           resourceName "test-query"
           spanType "spark"
           parent()
+          assert span.context().getSamplingPriority() == PrioritySampling.USER_KEEP
+          assert span.context().getPropagationTags().createTagMap()["_dd.p.dm"] == (-SamplingMechanism.DATA_JOBS).toString()
           tags {
             defaultTags()
             // Streaming tags
@@ -174,6 +178,8 @@ class AbstractSparkStructuredStreamingTest extends AgentTestRunner {
           operationName "spark.streaming_batch"
           spanType "spark"
           assert span.tags["streaming_query.batch_id"] == 1
+          assert span.context().getSamplingPriority() == PrioritySampling.USER_KEEP
+          assert span.context().getPropagationTags().createTagMap()["_dd.p.dm"] == (-SamplingMechanism.DATA_JOBS).toString()
           parent()
         }
         span {
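
Aside on the new assertions: `_dd.p.dm` is the propagated "decision maker" tag, whose value is the negated id of the sampling mechanism, so spans kept through `DATA_JOBS` (id 10 in this commit) are expected to carry `"-10"`. A standalone sketch of just that arithmetic:

```java
// Standalone sketch of the expected "_dd.p.dm" value; not tracer code.
public class DecisionMakerTagSketch {
  public static void main(String[] args) {
    final byte DATA_JOBS = 10; // matches SamplingMechanism.DATA_JOBS in this diff
    System.out.println(-DATA_JOBS); // prints -10, the value the tests assert
  }
}
```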
@@ -4,6 +4,8 @@ import datadog.trace.agent.test.AgentTestRunner
 import datadog.trace.api.DDSpanId
 import datadog.trace.api.DDTraceId
 import datadog.trace.api.Platform
+import datadog.trace.api.sampling.PrioritySampling
+import datadog.trace.api.sampling.SamplingMechanism
 import datadog.trace.test.util.Flaky
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus
 import org.apache.hadoop.yarn.conf.YarnConfiguration
@@ -51,6 +53,8 @@ abstract class AbstractSparkTest extends AgentTestRunner {
           resourceName "spark.application"
           spanType "spark"
           errored false
+          assert span.context().getSamplingPriority() == PrioritySampling.USER_KEEP
+          assert span.context().getPropagationTags().createTagMap()["_dd.p.dm"] == (-SamplingMechanism.DATA_JOBS).toString()
           parent()
         }
         span {
@@ -254,6 +258,8 @@ abstract class AbstractSparkTest extends AgentTestRunner {
           spanType "spark"
           traceId 8944764253919609482G
           parentSpanId 15104224823446433673G
+          assert span.context().getSamplingPriority() == PrioritySampling.USER_KEEP
+          assert span.context().getPropagationTags().createTagMap()["_dd.p.dm"] == (-SamplingMechanism.DATA_JOBS).toString()
           assert span.tags["databricks_job_id"] == "1234"
           assert span.tags["databricks_job_run_id"] == "5678"
           assert span.tags["databricks_task_run_id"] == "9012"
@@ -275,6 +281,8 @@ abstract class AbstractSparkTest extends AgentTestRunner {
           spanType "spark"
           traceId 5240384461065211484G
           parentSpanId 14128229261586201946G
+          assert span.context().getSamplingPriority() == PrioritySampling.USER_KEEP
+          assert span.context().getPropagationTags().createTagMap()["_dd.p.dm"] == (-SamplingMechanism.DATA_JOBS).toString()
           assert span.tags["databricks_job_id"] == "3456"
           assert span.tags["databricks_job_run_id"] == "901"
           assert span.tags["databricks_task_run_id"] == "7890"
@@ -296,6 +304,8 @@ abstract class AbstractSparkTest extends AgentTestRunner {
           spanType "spark"
           traceId 2235374731114184741G
           parentSpanId 8956125882166502063G
+          assert span.context().getSamplingPriority() == PrioritySampling.USER_KEEP
+          assert span.context().getPropagationTags().createTagMap()["_dd.p.dm"] == (-SamplingMechanism.DATA_JOBS).toString()
           assert span.tags["databricks_job_id"] == "123"
           assert span.tags["databricks_job_run_id"] == "8765"
           assert span.tags["databricks_task_run_id"] == "456"
@@ -316,6 +326,8 @@ abstract class AbstractSparkTest extends AgentTestRunner {
           operationName "spark.job"
           spanType "spark"
           parent()
+          assert span.context().getSamplingPriority() == PrioritySampling.USER_KEEP
+          assert span.context().getPropagationTags().createTagMap()["_dd.p.dm"] == (-SamplingMechanism.DATA_JOBS).toString()
           assert span.tags["databricks_job_id"] == null
           assert span.tags["databricks_job_run_id"] == "8765"
           assert span.tags["databricks_task_run_id"] == null
@@ -429,6 +441,8 @@ abstract class AbstractSparkTest extends AgentTestRunner {
           spanType "spark"
           traceId 8944764253919609482G
           parentSpanId 15104224823446433673G
+          assert span.context().getSamplingPriority() == PrioritySampling.USER_KEEP
+          assert span.context().getPropagationTags().createTagMap()["_dd.p.dm"] == (-SamplingMechanism.DATA_JOBS).toString()
         }
         span {
           operationName "spark.job"
@@ -21,6 +21,8 @@ public class SamplingMechanism {
   public static final byte REMOTE_USER_RATE = 6;
   /** Span Sampling Rate (single span sampled on account of a span sampling rule) */
   public static final byte SPAN_SAMPLING_RATE = 8;
+  /** Data Jobs */
+  public static final byte DATA_JOBS = 10;
   /** Force override sampling decision from external source, like W3C traceparent. */
   public static final byte EXTERNAL_OVERRIDE = Byte.MIN_VALUE;

@@ -40,6 +42,7 @@ public static boolean validateWithSamplingPriority(int mechanism, int priority)
       return priority == USER_DROP || priority == USER_KEEP;

     case APPSEC:
+    case DATA_JOBS:
       return priority == PrioritySampling.USER_KEEP;

     case EXTERNAL_OVERRIDE:
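
The new `case DATA_JOBS` shares the `APPSEC` branch, so the mechanism validates only when paired with the `USER_KEEP` priority — exactly what the test table at the end of this diff asserts. A minimal sketch of the invariant, assuming the `dd-trace-api` classes from this diff are on the classpath (run with `-ea` to enable assertions):

```java
import datadog.trace.api.sampling.PrioritySampling;
import datadog.trace.api.sampling.SamplingMechanism;

public class DataJobsValidationSketch {
  public static void main(String[] args) {
    // DATA_JOBS is valid only with USER_KEEP...
    assert SamplingMechanism.validateWithSamplingPriority(
        SamplingMechanism.DATA_JOBS, PrioritySampling.USER_KEEP);
    // ...and rejected with any other priority, e.g. SAMPLER_KEEP.
    assert !SamplingMechanism.validateWithSamplingPriority(
        SamplingMechanism.DATA_JOBS, PrioritySampling.SAMPLER_KEEP);
  }
}
```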
@@ -135,6 +135,8 @@ public interface AgentSpan extends MutableSpan, IGSpanInfo {

   Integer forceSamplingDecision();

+  AgentSpan setSamplingPriority(final int newPriority, int samplingMechanism);
+
   TraceConfig traceConfig();

   void addLink(AgentSpanLink link);
@@ -657,6 +657,11 @@ public Integer forceSamplingDecision() {
     return null;
   }

+  @Override
+  public AgentSpan setSamplingPriority(int newPriority, int samplingMechanism) {
+    return this;
+  }
+
   @Override
   public Integer getSamplingPriority() {
     return (int) PrioritySampling.UNSET;
@@ -79,6 +79,14 @@ class SamplingMechanismTest extends Specification {
    APPSEC            | userDropX    | false
    APPSEC            | userKeepX    | false

+   DATA_JOBS         | UNSET        | false
+   DATA_JOBS         | SAMPLER_DROP | false
+   DATA_JOBS         | SAMPLER_KEEP | false
+   DATA_JOBS         | USER_DROP    | false
+   DATA_JOBS         | USER_KEEP    | true
+   DATA_JOBS         | userDropX    | false
+   DATA_JOBS         | userKeepX    | false
+
    EXTERNAL_OVERRIDE | UNSET        | false
    EXTERNAL_OVERRIDE | SAMPLER_DROP | false
    EXTERNAL_OVERRIDE | SAMPLER_KEEP | false
