Wrap direct runner PipelineResult in DefaultPipelineLauncher
- Make DefaultPipelineLauncher support the direct runner

- Support pipeline-defined metrics for both the direct runner and the Dataflow runner

- Port the existing IO load test to use DefaultPipelineLauncher

PiperOrigin-RevId: 523723380
cloud-teleport committed Apr 12, 2023
1 parent d8e6159 commit 95609e3
Showing 6 changed files with 440 additions and 79 deletions.
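
Before the diff, a minimal hypothetical sketch of what this change enables: launching a Beam pipeline with the direct runner through DefaultPipelineLauncher. The LaunchConfig builder calls are assumptions inferred from the accessors the new launch() code reads (pipeline(), jobName(), getParameter("runner")); they are not verbatim from this repository.

import com.google.cloud.teleport.it.common.PipelineLauncher;
import com.google.cloud.teleport.it.gcp.IOLoadTestBase;
// import for DefaultPipelineLauncher omitted; its package is not shown in this diff.
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.ParDo;

// Hypothetical usage sketch (not part of this commit); the LaunchConfig builder
// calls mirror the accessors launch() reads below, but their exact shape is assumed.
public class DirectRunnerLaunchExample {
  public static void main(String[] args) throws Exception {
    Pipeline pipeline = Pipeline.create();
    pipeline
        .apply(Create.of("a", "b", "c"))
        // CountingFn's one-argument constructor comes from the IOLoadTestBase diff below.
        .apply("Count records", ParDo.of(new IOLoadTestBase.CountingFn<>("numRecords")));

    PipelineLauncher.LaunchConfig options =
        PipelineLauncher.LaunchConfig.builder("direct-runner-test") // builder shape assumed
            .setPipeline(pipeline)
            .addParameter("runner", "DirectRunner")
            .build();

    // project/region are required by the launcher API; the direct runner ignores them.
    PipelineLauncher.LaunchInfo info =
        DefaultPipelineLauncher.builder().build().launch("my-project", "us-central1", options);
    System.out.println("jobId = " + info.jobId());
  }
}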
12 changes: 6 additions & 6 deletions it/google-cloud-platform/pom.xml
@@ -47,7 +47,12 @@
<version>${guava.version}</version>
</dependency>
<!-- Beam. Try to minimize Beam dependencies, since this is about testing
pipelines, not writing them. -->
<dependency>
<!-- Needed by DataflowPipelineOptions which extends BigQueryOptions -->
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
</dependency>
<dependency>
<groupId>com.google.apis</groupId>
<artifactId>google-api-services-dataflow</artifactId>
@@ -99,11 +104,6 @@
<version>${autovalue.version}</version>
</dependency>
<!-- TODO(pranavbhandari): Remove the following dependencies after BigQueryIOLT is moved to the right directory -->
- <dependency>
-   <groupId>org.apache.beam</groupId>
-   <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
-   <scope>test</scope>
- </dependency>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-synthetic</artifactId>
@@ -55,17 +55,37 @@ PipelineLauncher launcher() {
}

/** A utility DoFn that counts the elements passed to it. */
- protected static final class CountingFn<T> extends DoFn<T, Void> {
+ public static final class CountingFn<T> extends DoFn<T, Void> {

private final Counter elementCounter;

- public CountingFn(String namespace, String name) {
-   elementCounter = Metrics.counter(namespace, name);
+ public CountingFn(String name) {
+   elementCounter = Metrics.counter(BEAM_METRICS_NAMESPACE, name);
}

@ProcessElement
public void processElement() {
elementCounter.inc(1L);
}
}

// To make PipelineLauncher.getMetric work in a unified way for both runner-provided and
// pipeline-defined metrics, Beam-provided metrics are wrapped under a pre-defined metric name
// of the form [namespace:metric_type:metric_name], which the getMetric method recognizes.
public enum PipelineMetricsType {
COUNTER,
STARTTIME,
ENDTIME,
RUNTIME,
}

/** Namespace for Beam-provided pipeline metrics (set up via the Metrics API). */
public static final String BEAM_METRICS_NAMESPACE = "BEAM_METRICS";

/** Given a metric name, returns the namespaced Beam metrics name. */
public static String getBeamMetricsName(PipelineMetricsType metricsType, String metricsName) {
return BEAM_METRICS_NAMESPACE + ":" + metricsType + ":" + metricsName;
}
}
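
Putting the pieces above together: a short hypothetical sketch (not part of this commit), written as if inside an IOLoadTestBase subclass, of registering a counter and later reading it back through a launcher by the wrapped name that getBeamMetricsName produces. The method names and parameters here are illustrative assumptions.

// The one-argument constructor registers the counter under BEAM_METRICS_NAMESPACE.
void addRecordCounter(PCollection<String> records) {
  records.apply("Count records", ParDo.of(new CountingFn<>("numRecords")));
}

// After the job finishes, read the counter back through the launcher's unified
// getMetric entry point using the wrapped name "BEAM_METRICS:COUNTER:numRecords".
Double readRecordCounter(PipelineLauncher launcher, String project, String region, String jobId)
    throws IOException {
  return launcher.getMetric(
      project, region, jobId, getBeamMetricsName(PipelineMetricsType.COUNTER, "numRecords"));
}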
@@ -25,14 +25,31 @@
import com.google.auth.http.HttpCredentialsAdapter;
import com.google.cloud.teleport.it.common.AbstractPipelineLauncher;
import com.google.cloud.teleport.it.common.PipelineLauncher;
import com.google.cloud.teleport.it.common.utils.PipelineUtils;
import com.google.cloud.teleport.it.gcp.IOLoadTestBase;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.re2j.Matcher;
import com.google.re2j.Pattern;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.stream.StreamSupport;
import org.apache.beam.runners.dataflow.DataflowPipelineJob;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.metrics.DistributionResult;
import org.apache.beam.sdk.metrics.MetricNameFilter;
import org.apache.beam.sdk.metrics.MetricQueryResults;
import org.apache.beam.sdk.metrics.MetricResult;
import org.apache.beam.sdk.metrics.MetricsFilter;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -41,6 +58,29 @@ public class DefaultPipelineLauncher extends AbstractPipelineLauncher {
private static final Logger LOG = LoggerFactory.getLogger(DefaultPipelineLauncher.class);
private static final Pattern JOB_ID_PATTERN = Pattern.compile("Submitted job: (\\S+)");

// For unsupported runners (i.e. anything other than Dataflow), launcher methods are
// implemented by operating on the stored PipelineResult.
private static final Map<String, PipelineResult> MANAGED_JOBS = new HashMap<>();

// For supported runners (e.g. DataflowRunner), a PipelineResult is still kept for
// pipeline-specific usages such as polling custom metrics.
private static final Map<String, PipelineResult> UNMANAGED_JOBS = new HashMap<>();

private static final long UNKNOWN_METRIC_VALUE = -1L;

private static final Map<PipelineResult.State, JobState> PIPELINE_STATE_TRANSLATE =
ImmutableMap.<PipelineResult.State, JobState>builder()
.put(PipelineResult.State.CANCELLED, JobState.CANCELLED)
.put(PipelineResult.State.RUNNING, JobState.RUNNING)
.put(PipelineResult.State.DONE, JobState.DONE)
.put(PipelineResult.State.FAILED, JobState.FAILED)
.put(PipelineResult.State.STOPPED, JobState.STOPPED)
.put(PipelineResult.State.UNKNOWN, JobState.UNKNOWN)
.put(PipelineResult.State.UPDATED, JobState.UPDATED)
.put(PipelineResult.State.UNRECOGNIZED, JobState.UNKNOWN)
.build();

private DefaultPipelineLauncher(DefaultPipelineLauncher.Builder builder) {
super(
new Dataflow(
Expand All @@ -55,6 +95,174 @@ public static DefaultPipelineLauncher.Builder builder() {
return new DefaultPipelineLauncher.Builder();
}

@Override
public JobState getJobStatus(String project, String region, String jobId) throws IOException {
if (MANAGED_JOBS.containsKey(jobId)) {
return PIPELINE_STATE_TRANSLATE.get(MANAGED_JOBS.get(jobId).getState());
} else {
return super.handleJobState(getJob(project, region, jobId));
}
}

@Override
public Job cancelJob(String project, String region, String jobId) {
if (MANAGED_JOBS.containsKey(jobId)) {
try {
MANAGED_JOBS.get(jobId).cancel();
} catch (IOException e) {
throw new RuntimeException(e);
}
return new Job().setId(jobId).setRequestedState(JobState.CANCELLED.toString());
} else {
return super.cancelJob(project, region, jobId);
}
}

@Override
public Job getJob(String project, String region, String jobId) throws IOException {
if (MANAGED_JOBS.containsKey(jobId)) {
return new Job()
.setId(jobId)
.setRequestedState(
PIPELINE_STATE_TRANSLATE.get(MANAGED_JOBS.get(jobId).getState()).toString());
} else {
return super.getJob(project, region, jobId);
}
}

@Override
public Job drainJob(String project, String region, String jobId) {
if (MANAGED_JOBS.containsKey(jobId)) {
// Draining is unsupported for these runners; fall back to cancel.
Job job = new Job().setId(jobId).setRequestedState(JobState.DRAINED.toString());
cancelJob(project, region, jobId);
return job;
} else {
return super.drainJob(project, region, jobId);
}
}

private static <T> void checkIfMetricResultIsUnique(
String name, Iterable<MetricResult<T>> metricResult) throws IllegalStateException {
int resultCount = Iterables.size(metricResult);
Preconditions.checkState(
resultCount <= 1,
"More than one metric result matches name: %s in namespace %s. Metric results count: %s",
name,
IOLoadTestBase.BEAM_METRICS_NAMESPACE,
resultCount);
}

private static Iterable<MetricResult<DistributionResult>> getDistributions(
PipelineResult result, String metricName) {
MetricQueryResults metrics =
result
.metrics()
.queryMetrics(
MetricsFilter.builder()
.addNameFilter(
MetricNameFilter.named(IOLoadTestBase.BEAM_METRICS_NAMESPACE, metricName))
.build());
return metrics.getDistributions();
}

/** Pulls a Beam pipeline-defined metric for the given jobId. */
public Long getBeamMetric(
String jobId, IOLoadTestBase.PipelineMetricsType metricType, String metricName) {
PipelineResult pipelineResult =
MANAGED_JOBS.getOrDefault(jobId, UNMANAGED_JOBS.getOrDefault(jobId, null));
if (pipelineResult != null) {
MetricQueryResults metrics =
pipelineResult
.metrics()
.queryMetrics(
MetricsFilter.builder()
.addNameFilter(
MetricNameFilter.named(IOLoadTestBase.BEAM_METRICS_NAMESPACE, metricName))
.build());

switch (metricType) {
case COUNTER:
Iterable<MetricResult<Long>> counters = metrics.getCounters();
checkIfMetricResultIsUnique(metricName, counters);
try {
MetricResult<Long> metricResult = counters.iterator().next();
return metricResult.getAttempted();
} catch (NoSuchElementException e) {
LOG.error(
"Failed to get metric {}, from namespace {}",
metricName,
IOLoadTestBase.BEAM_METRICS_NAMESPACE);
}
return UNKNOWN_METRIC_VALUE;
case STARTTIME:
case ENDTIME:
case RUNTIME:
Iterable<MetricResult<DistributionResult>> distributions =
getDistributions(pipelineResult, metricName);
Long lowestMin =
StreamSupport.stream(distributions.spliterator(), true)
.map(element -> Objects.requireNonNull(element.getAttempted()).getMin())
.min(Long::compareTo)
.orElse(UNKNOWN_METRIC_VALUE);
Long greatestMax =
StreamSupport.stream(distributions.spliterator(), true)
.map(element -> Objects.requireNonNull(element.getAttempted()).getMax())
.max(Long::compareTo)
.orElse(UNKNOWN_METRIC_VALUE);
if (metricType == IOLoadTestBase.PipelineMetricsType.STARTTIME) {
return lowestMin;
} else if (metricType == IOLoadTestBase.PipelineMetricsType.ENDTIME) {
return greatestMax;
} else {
if (lowestMin != UNKNOWN_METRIC_VALUE && greatestMax != UNKNOWN_METRIC_VALUE) {
return greatestMax - lowestMin;
} else {
return UNKNOWN_METRIC_VALUE;
}
}
default:
throw new IllegalArgumentException(
String.format("Unexpected metric type %s.", metricType));
}
} else {
LOG.warn(
    "Querying pipeline-defined metrics is currently unsupported for this SDK or runner.");
return UNKNOWN_METRIC_VALUE;
}
}

@Override
public Double getMetric(String project, String region, String jobId, String metricName)
throws IOException {
if (metricName.startsWith(IOLoadTestBase.BEAM_METRICS_NAMESPACE)) {
String[] nameSpacedMetrics = metricName.split(":", 3);
Preconditions.checkState(
nameSpacedMetrics.length == 3,
String.format(
"Invalid Beam metrics name: %s, expected: '%s:metric_type:metric_name'",
metricName, IOLoadTestBase.BEAM_METRICS_NAMESPACE));
IOLoadTestBase.PipelineMetricsType metricType =
IOLoadTestBase.PipelineMetricsType.valueOf(nameSpacedMetrics[1]);

// Pipeline-defined metrics are long values; cast to double to match the signature
// defined by the base class.
return getBeamMetric(jobId, metricType, nameSpacedMetrics[2]).doubleValue();
} else {
return super.getMetric(project, region, jobId, metricName);
}
}

@Override
public Map<String, Double> getMetrics(String project, String region, String jobId)
throws IOException {
if (MANAGED_JOBS.containsKey(jobId)) {
// Unsupported for managed (non-Dataflow) jobs; just return an empty map.
return new HashMap<>();
} else {
return super.getMetrics(project, region, jobId);
}
}

@Override
public LaunchInfo launch(String project, String region, LaunchConfig options) throws IOException {
checkState(
@@ -72,10 +280,40 @@ public LaunchInfo launch(String project, String region, LaunchConfig options) throws IOException {
options.pipeline() != null,
"Cannot launch a dataflow job "
+ "without pipeline specified. Please specify pipeline and try again!");
- cmd = extractOptions(project, region, options);
- DataflowPipelineJob job =
-     (DataflowPipelineJob) options.pipeline().runWithAdditionalOptionArgs(cmd);
- jobId = job.getJobId();
// We expect the options to be mostly ready, except those provided as arguments
// (project, region, runner).
PipelineOptions pipelineOptions = options.pipeline().getOptions();
pipelineOptions.setRunner(PipelineUtils.getRunnerClass(options.getParameter("runner")));
pipelineOptions.setJobName(options.jobName());
if (Objects.equals(options.getParameter("runner"), "DataflowRunner")) {
pipelineOptions.as(DataflowPipelineOptions.class).setProject(project);
pipelineOptions.as(DataflowPipelineOptions.class).setRegion(region);
}
PipelineResult pipelineResult = options.pipeline().run();
if (Objects.equals(options.getParameter("runner"), "DataflowRunner")) {
// The Dataflow runner generates a jobId in a specific format for each job.
DataflowPipelineJob job = (DataflowPipelineJob) pipelineResult;
jobId = job.getJobId();
UNMANAGED_JOBS.put(jobId, pipelineResult);
} else {
// For unsupported runners (e.g. the direct runner), set jobId to the same value as jobName.
jobId = options.jobName();
MANAGED_JOBS.put(jobId, pipelineResult);

// For unsupported runners (e.g. the direct runner), return a wrapped LaunchInfo.
return LaunchInfo.builder()
.setJobId(jobId)
.setProjectId(project)
.setRegion(region)
.setCreateTime("")
.setSdk("DirectBeam")
.setVersion("0.0.1")
.setJobType("JOB_TYPE_BATCH")
.setRunner(options.getParameter("runner"))
.setParameters(options.parameters())
.setState(JobState.RUNNING)
.build();
}
break;
case PYTHON:
checkState(
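
End to end, the flow the new code supports might look like the following hypothetical sketch (not part of this commit). JobState and LaunchInfo are assumed to be nested types of PipelineLauncher, as the unqualified uses above suggest, and the sleep-based polling is illustrative only.

// Hypothetical end-to-end flow: launch, poll state, then pull a pipeline-defined metric.
void launchPollAndRead(String project, String region, PipelineLauncher.LaunchConfig options)
    throws Exception {
  DefaultPipelineLauncher launcher = DefaultPipelineLauncher.builder().build();
  PipelineLauncher.LaunchInfo info = launcher.launch(project, region, options);

  // Note: the direct runner may block inside run() until the pipeline finishes,
  // so for MANAGED_JOBS this loop may never spin.
  while (launcher.getJobStatus(project, region, info.jobId())
      == PipelineLauncher.JobState.RUNNING) {
    Thread.sleep(5_000L); // illustrative poll interval
  }

  // Pipeline-defined metrics flow through the same getMetric entry point as
  // runner metrics, using the BEAM_METRICS:<TYPE>:<name> convention parsed above.
  Double numRecords =
      launcher.getMetric(
          project,
          region,
          info.jobId(),
          IOLoadTestBase.getBeamMetricsName(
              IOLoadTestBase.PipelineMetricsType.COUNTER, "numRecords"));
  System.out.println("numRecords = " + numRecords);
}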
