Skip to content

Commit

Permalink
Replace WorkerPlane with Micronaut environments (#17286)
Browse files Browse the repository at this point in the history
* Replace WorkerPlane with Micronaut environments

* Set proper environment for airbyte-workers
  • Loading branch information
jdpgrailsdev authored Sep 28, 2022
1 parent dd95d3a commit 0ace816
Show file tree
Hide file tree
Showing 36 changed files with 109 additions and 170 deletions.
2 changes: 1 addition & 1 deletion .env
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ LOG_LEVEL=INFO

### APPLICATIONS ###
# Worker #
WORKERS_MICRONAUT_ENVIRONMENTS=control
WORKERS_MICRONAUT_ENVIRONMENTS=control-plane
# Relevant to scaling.
MAX_SYNC_WORKERS=5
MAX_SPEC_WORKERS=5
Expand Down
2 changes: 1 addition & 1 deletion .env.dev
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ API_URL=/api/v1/
INTERNAL_API_HOST=airbyte-server:8001
SYNC_JOB_MAX_ATTEMPTS=3
SYNC_JOB_MAX_TIMEOUT_DAYS=3
WORKERS_MICRONAUT_ENVIRONMENTS=control
WORKERS_MICRONAUT_ENVIRONMENTS=control-plane

# Sentry
SENTRY_DSN=""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -568,15 +568,6 @@ public interface Configs {
*/
boolean shouldRunConnectionManagerWorkflows();

/**
* Define if the worker is operating within Airbyte's Control Plane, or within an external Data
* Plane. - Workers in the Control Plane process tasks related to control-flow, like scheduling and
* routing, as well as data syncing tasks that are enqueued for the Control Plane's default task
* queue. - Workers in a Data Plane process only tasks related to data syncing that are specifically
* enqueued for that worker's particular Data Plane.
*/
WorkerPlane getWorkerPlane();

// Worker - Control Plane configs

/**
Expand Down Expand Up @@ -717,9 +708,4 @@ enum SecretPersistenceType {
VAULT
}

enum WorkerPlane {
CONTROL_PLANE,
DATA_PLANE
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,6 @@ public class EnvConfigs implements Configs {
private static final String SHOULD_RUN_DISCOVER_WORKFLOWS = "SHOULD_RUN_DISCOVER_WORKFLOWS";
private static final String SHOULD_RUN_SYNC_WORKFLOWS = "SHOULD_RUN_SYNC_WORKFLOWS";
private static final String SHOULD_RUN_CONNECTION_MANAGER_WORKFLOWS = "SHOULD_RUN_CONNECTION_MANAGER_WORKFLOWS";
private static final String WORKER_PLANE = "WORKER_PLANE";

// Worker - Control plane configs
private static final String DEFAULT_DATA_SYNC_TASK_QUEUES = "SYNC"; // should match TemporalJobType.SYNC.name()
Expand Down Expand Up @@ -229,8 +228,6 @@ public EnvConfigs(final Map<String, String> envMap) {
this.getAllEnvKeys = envMap::keySet;
this.logConfigs = new LogConfigs(getLogConfiguration());
this.stateStorageCloudConfigs = getStateStorageConfiguration().orElse(null);

validateSyncWorkflowConfigs();
}

private Optional<CloudStorageConfigs> getLogConfiguration() {
Expand Down Expand Up @@ -931,11 +928,6 @@ public boolean shouldRunConnectionManagerWorkflows() {
return getEnvOrDefault(SHOULD_RUN_CONNECTION_MANAGER_WORKFLOWS, true);
}

@Override
public WorkerPlane getWorkerPlane() {
return getEnvOrDefault(WORKER_PLANE, WorkerPlane.CONTROL_PLANE, s -> WorkerPlane.valueOf(s.toUpperCase()));
}

// Worker - Control plane

@Override
Expand Down Expand Up @@ -973,22 +965,6 @@ public String getDataPlaneServiceAccountEmail() {
return getEnvOrDefault(DATA_PLANE_SERVICE_ACCOUNT_EMAIL, "");
}

/**
* Ensures the user hasn't configured themselves into a corner by making sure that the worker is set
* up to properly process sync workflows. With sensible defaults, it should be hard to fail this
* validation, but this provides a safety net regardless.
*/
private void validateSyncWorkflowConfigs() {
if (shouldRunSyncWorkflows()) {
if (getWorkerPlane().equals(WorkerPlane.DATA_PLANE) && getDataSyncTaskQueues().isEmpty()) {
throw new IllegalArgumentException(String.format(
"When %s is true, the worker must either be configured as a Control Plane worker, or %s must be non-empty.",
SHOULD_RUN_SYNC_WORKFLOWS,
DATA_SYNC_TASK_QUEUES));
}
}
}

@Override
public Set<Integer> getTemporalWorkerPorts() {
final var ports = getEnvOrDefault(TEMPORAL_WORKER_PORTS, "");
Expand Down
2 changes: 1 addition & 1 deletion airbyte-workers/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ run {

environment 'AIRBYTE_ROLE', System.getenv('AIRBYTE_ROLE')
environment 'AIRBYTE_VERSION', env.VERSION
environment 'MICRONAUT_ENVIRONMENTS', 'local,control'
environment 'MICRONAUT_ENVIRONMENTS', 'control-plane'
}

task cloudStorageIntegrationTest(type: Test) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import io.airbyte.config.Configs.DeploymentMode;
import io.airbyte.config.Configs.TrackingStrategy;
import io.airbyte.config.Configs.WorkerEnvironment;
import io.airbyte.config.Configs.WorkerPlane;
import io.airbyte.config.MaxWorkersConfig;
import io.airbyte.config.helpers.LogClientSingleton;
import io.airbyte.config.helpers.LogConfigs;
Expand All @@ -23,6 +22,7 @@
import io.airbyte.metrics.lib.MetricClientFactory;
import io.airbyte.metrics.lib.MetricEmittingApps;
import io.airbyte.persistence.job.JobPersistence;
import io.airbyte.workers.config.WorkerMode;
import io.airbyte.workers.process.KubePortManagerSingleton;
import io.airbyte.workers.temporal.check.connection.CheckConnectionWorkflowImpl;
import io.airbyte.workers.temporal.discover.catalog.DiscoverCatalogWorkflowImpl;
Expand Down Expand Up @@ -138,21 +138,21 @@ public class ApplicationInitializer implements ApplicationEventListener<ServiceR
private WorkerEnvironment workerEnvironment;
@Inject
private WorkerFactory workerFactory;
@Inject
private WorkerPlane workerPlane;
@Value("${airbyte.workspace.root}")
private String workspaceRoot;
@Value("${temporal.cloud.namespace}")
private String temporalCloudNamespace;
@Value("${airbyte.data.sync.task-queue}")
private String syncTaskQueue;
@Inject
private Environment environment;

@Override
public void onApplicationEvent(final ServiceReadyEvent event) {
try {
initializeCommonDependencies();

if (WorkerPlane.CONTROL_PLANE.equals(workerPlane)) {
if (environment.getActiveNames().contains(WorkerMode.CONTROL_PLANE)) {
initializeControlPlaneDependencies();
} else {
log.info("Skipping Control Plane dependency initialization.");
Expand All @@ -163,7 +163,7 @@ public void onApplicationEvent(final ServiceReadyEvent event) {
log.info("Starting worker factory...");
workerFactory.start();

log.info("Application initialized (mode = {}).", workerPlane);
log.info("Application initialized.");
} catch (final DatabaseCheckException | ExecutionException | InterruptedException | IOException | TimeoutException e) {
log.error("Unable to initialize application.", e);
throw new IllegalStateException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,17 +43,15 @@
public class ActivityBeanFactory {

@Singleton
@Requires(property = "airbyte.worker.plane",
pattern = "(?i)^(?!data_plane).*")
@Requires(env = WorkerMode.CONTROL_PLANE)
@Named("checkConnectionActivities")
public List<Object> checkConnectionActivities(
final CheckConnectionActivity checkConnectionActivity) {
return List.of(checkConnectionActivity);
}

@Singleton
@Requires(property = "airbyte.worker.plane",
pattern = "(?i)^(?!data_plane).*")
@Requires(env = WorkerMode.CONTROL_PLANE)
@Named("connectionManagerActivities")
public List<Object> connectionManagerActivities(
final GenerateInputActivity generateInputActivity,
Expand All @@ -79,17 +77,15 @@ public List<Object> connectionManagerActivities(
}

@Singleton
@Requires(property = "airbyte.worker.plane",
pattern = "(?i)^(?!data_plane).*")
@Requires(env = WorkerMode.CONTROL_PLANE)
@Named("discoverActivities")
public List<Object> discoverActivities(
final DiscoverCatalogActivity discoverCatalogActivity) {
return List.of(discoverCatalogActivity);
}

@Singleton
@Requires(property = "airbyte.worker.plane",
pattern = "(?i)^(?!data_plane).*")
@Requires(env = WorkerMode.CONTROL_PLANE)
@Named("specActivities")
public List<Object> specActivities(
final SpecActivity specActivity) {
Expand Down Expand Up @@ -154,8 +150,7 @@ public ActivityOptions shortActivityOptions(@Property(name = "airbyte.activity.m
}

@Singleton
@Requires(property = "airbyte.worker.plane",
pattern = "(?i)^(?!data_plane).*")
@Requires(env = WorkerMode.CONTROL_PLANE)
@Named("specActivityOptions")
public ActivityOptions specActivityOptions() {
return ActivityOptions.newBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@
import com.auth0.jwt.algorithms.Algorithm;
import com.google.auth.oauth2.ServiceAccountCredentials;
import io.airbyte.api.client.AirbyteApiClient;
import io.airbyte.config.Configs.WorkerPlane;
import io.micronaut.context.BeanProvider;
import io.micronaut.context.annotation.Factory;
import io.micronaut.context.annotation.Prototype;
import io.micronaut.context.annotation.Value;
import io.micronaut.context.env.Environment;
import jakarta.inject.Named;
import jakarta.inject.Singleton;
import java.io.FileInputStream;
Expand Down Expand Up @@ -58,10 +58,10 @@ public AirbyteApiClient airbyteApiClient(

@Singleton
@Named("internalApiScheme")
public String internalApiScheme(final WorkerPlane workerPlane) {
public String internalApiScheme(final Environment environment) {
// control plane workers communicate with the Airbyte API within their internal network, so https
// isn't needed
return WorkerPlane.CONTROL_PLANE.equals(workerPlane) ? "http" : "https";
return environment.getActiveNames().contains(WorkerMode.CONTROL_PLANE) ? "http" : "https";
}

/**
Expand All @@ -81,12 +81,12 @@ public String internalApiAuthToken(
@Value("${airbyte.control.plane.auth-endpoint}") final String controlPlaneAuthEndpoint,
@Value("${airbyte.data.plane.service-account.email}") final String dataPlaneServiceAccountEmail,
@Value("${airbyte.data.plane.service-account.credentials-path}") final String dataPlaneServiceAccountCredentialsPath,
final WorkerPlane workerPlane) {
if (WorkerPlane.CONTROL_PLANE.equals(workerPlane)) {
final Environment environment) {
if (environment.getActiveNames().contains(WorkerMode.CONTROL_PLANE)) {
// control plane workers communicate with the Airbyte API within their internal network, so a signed
// JWT isn't needed
return airbyteApiAuthHeaderValue;
} else if (WorkerPlane.DATA_PLANE.equals(workerPlane)) {
} else if (environment.getActiveNames().contains(WorkerMode.DATA_PLANE)) {
try {
final Date now = new Date();
final Date expTime = new Date(System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(JWT_TTL_MINUTES));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import io.airbyte.config.Configs.SecretPersistenceType;
import io.airbyte.config.Configs.TrackingStrategy;
import io.airbyte.config.Configs.WorkerEnvironment;
import io.airbyte.config.Configs.WorkerPlane;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.config.persistence.StatePersistence;
import io.airbyte.config.persistence.split_secrets.JsonSecretsProcessor;
Expand Down Expand Up @@ -73,11 +72,6 @@ public WorkerEnvironment workerEnvironment(@Value("${airbyte.worker.env}") final
return convertToEnum(workerEnv, WorkerEnvironment::valueOf, WorkerEnvironment.DOCKER);
}

@Singleton
public WorkerPlane workerPlane(@Value("${airbyte.worker.plane}") final String workerPlane) {
return convertToEnum(workerPlane, WorkerPlane::valueOf, WorkerPlane.CONTROL_PLANE);
}

@Singleton
@Named("workspaceRoot")
public Path workspaceRoot(@Value("${airbyte.workspace.root}") final String workspaceRoot) {
Expand Down Expand Up @@ -106,8 +100,7 @@ public FeatureFlags featureFlags() {
}

@Singleton
@Requires(property = "airbyte.worker.plane",
pattern = "(?i)^(?!data_plane).*")
@Requires(env = WorkerMode.CONTROL_PLANE)
public JobNotifier jobNotifier(
final ConfigRepository configRepository,
final TrackingClient trackingClient,
Expand All @@ -121,8 +114,7 @@ public JobNotifier jobNotifier(
}

@Singleton
@Requires(property = "airbyte.worker.plane",
pattern = "(?i)^(?!data_plane).*")
@Requires(env = WorkerMode.CONTROL_PLANE)
public JobTracker jobTracker(
final ConfigRepository configRepository,
final JobPersistence jobPersistence,
Expand All @@ -131,8 +123,7 @@ public JobTracker jobTracker(
}

@Singleton
@Requires(property = "airbyte.worker.plane",
pattern = "(?i)^(?!data_plane).*")
@Requires(env = WorkerMode.CONTROL_PLANE)
public JsonSecretsProcessor jsonSecretsProcessor(final FeatureFlags featureFlags) {
return JsonSecretsProcessor.builder()
.maskSecrets(!featureFlags.exposeSecretsInExport())
Expand All @@ -141,15 +132,13 @@ public JsonSecretsProcessor jsonSecretsProcessor(final FeatureFlags featureFlags
}

@Singleton
@Requires(property = "airbyte.worker.plane",
pattern = "(?i)^(?!data_plane).*")
@Requires(env = WorkerMode.CONTROL_PLANE)
public WebUrlHelper webUrlHelper(@Value("${airbyte.web-app.url}") final String webAppUrl) {
return new WebUrlHelper(webAppUrl);
}

@Singleton
@Requires(property = "airbyte.worker.plane",
pattern = "(?i)^(?!data_plane).*")
@Requires(env = WorkerMode.CONTROL_PLANE)
public WorkspaceHelper workspaceHelper(
final ConfigRepository configRepository,
final JobPersistence jobPersistence) {
Expand Down
Loading

0 comments on commit 0ace816

Please sign in to comment.