diff --git a/airbyte-commons-worker/build.gradle.kts b/airbyte-commons-worker/build.gradle.kts
index 534600bfb23..afaf4f90e53 100644
--- a/airbyte-commons-worker/build.gradle.kts
+++ b/airbyte-commons-worker/build.gradle.kts
@@ -12,6 +12,8 @@ dependencies {
   ksp(platform(libs.micronaut.platform))
   ksp(libs.bundles.micronaut.annotation.processor)
 
+  api(libs.google.cloud.pubsub)
+
   implementation(platform(libs.micronaut.platform))
   implementation(libs.bundles.micronaut)
   implementation(libs.bundles.micronaut.metrics)
diff --git a/airbyte-commons-worker/src/main/kotlin/io/airbyte/workers/config/StateCheckSumEventPubSubPublisherFactory.kt b/airbyte-commons-worker/src/main/kotlin/io/airbyte/workers/config/StateCheckSumEventPubSubPublisherFactory.kt
new file mode 100644
index 00000000000..ee700a96ba4
--- /dev/null
+++ b/airbyte-commons-worker/src/main/kotlin/io/airbyte/workers/config/StateCheckSumEventPubSubPublisherFactory.kt
@@ -0,0 +1,56 @@
+package io.airbyte.workers.config
+
+import com.google.api.gax.batching.BatchingSettings
+import com.google.cloud.pubsub.v1.Publisher
+import com.google.pubsub.v1.TopicName
+import io.micronaut.context.annotation.Factory
+import io.micronaut.context.annotation.Requires
+import io.micronaut.context.annotation.Value
+import jakarta.inject.Named
+import jakarta.inject.Singleton
+import org.threeten.bp.Duration
+import java.util.function.Supplier
+
+// Upper bounds applied to the configured batching values.
+private const val MAX_BATCH_SIZE = 50L
+private const val MAX_BYTES_THRESHOLD = 5000L
+private const val MAX_DELAY_THRESHOLD = 100L
+
+/**
+ * Micronaut factory for the Pub/Sub publisher used to emit state checksum events.
+ */
+@Factory
+class StateCheckSumEventPubSubPublisherFactory {
+  /**
+   * Provides a [Supplier] that builds a [Publisher] for [topicName] each time it is invoked.
+   *
+   * The bean is only registered when `airbyte.cloud.pubsub.enabled` is `true`. Every configured
+   * batching value is capped at its `MAX_*` constant before being applied.
+   *
+   * @param topicName topic name, parsed with [TopicName.parse] when the supplier is invoked.
+   * @param requestBytesThreshold configured request byte threshold.
+   * @param messageCountBatchSize configured element count threshold.
+   * @param publishDelayThreshold configured delay threshold, in milliseconds.
+   * @return a supplier that builds the publisher on demand.
+   */
+  @Singleton
+  @Requires(property = "airbyte.cloud.pubsub.enabled", value = "true", defaultValue = "false")
+  @Named("pubSubPublisherSupplier")
+  fun pubSubPublisherSupplier(
+    @Value("\${airbyte.cloud.pubsub.topic}") topicName: String,
+    @Value("\${airbyte.cloud.pubsub.request-bytes-threshold}") requestBytesThreshold: Int,
+    @Value("\${airbyte.cloud.pubsub.message-count-batch-size}") messageCountBatchSize: Int,
+    @Value("\${airbyte.cloud.pubsub.publish-delay-threshold-ms}") publishDelayThreshold: Int,
+  ): Supplier<Publisher> {
+    val batchingSettings =
+      BatchingSettings.newBuilder()
+        .setElementCountThreshold(minOf(messageCountBatchSize.toLong(), MAX_BATCH_SIZE))
+        .setRequestByteThreshold(minOf(requestBytesThreshold.toLong(), MAX_BYTES_THRESHOLD))
+        .setDelayThreshold(Duration.ofMillis(minOf(publishDelayThreshold.toLong(), MAX_DELAY_THRESHOLD)))
+        .build()
+
+    return Supplier {
+      Publisher.newBuilder(TopicName.parse(topicName)).setBatchingSettings(batchingSettings).build()
+    }
+  }
+}
diff --git a/airbyte-commons-worker/src/main/kotlin/io/airbyte/workers/general/StateCheckSumEventPubSubWriter.kt b/airbyte-commons-worker/src/main/kotlin/io/airbyte/workers/general/StateCheckSumEventPubSubWriter.kt
new file mode 100644
index 00000000000..77e62bf19f3
--- /dev/null
+++ b/airbyte-commons-worker/src/main/kotlin/io/airbyte/workers/general/StateCheckSumEventPubSubWriter.kt
@@ -0,0 +1,77 @@
+package io.airbyte.workers.general
+
+import com.google.cloud.pubsub.v1.Publisher
+import com.google.protobuf.ByteString
+import com.google.pubsub.v1.PubsubMessage
+import io.airbyte.commons.json.Jsons
+import io.airbyte.workers.models.StateCheckSumCountEvent
+import io.github.oshai.kotlinlogging.KotlinLogging
+import io.micronaut.context.annotation.Requires
+import jakarta.inject.Named
+import jakarta.inject.Singleton
+import java.util.concurrent.TimeUnit
+import java.util.function.Supplier
+
+private val logger = KotlinLogging.logger { }
+
+/**
+ * Publishes [StateCheckSumCountEvent]s to Pub/Sub as JSON messages.
+ *
+ * Exceptions are logged and swallowed rather than propagated to the caller.
+ */
+@Singleton
+@Requires(property = "airbyte.cloud.pubsub.enabled", value = "true", defaultValue = "false")
+@Named("pubSubWriter")
+class StateCheckSumEventPubSubWriter(
+  @Named("pubSubPublisherSupplier") private val publisherSupplier: Supplier<Publisher>,
+) {
+  // Obtained from the supplier on first access rather than at construction time.
+  private val publisher: Publisher by lazy { publisherSupplier.get() }
+
+  /**
+   * Serializes each event to JSON and publishes it as its own message.
+   *
+   * @param event events to publish; events that serialize to `null` are skipped.
+   */
+  fun publishEvent(event: List<StateCheckSumCountEvent>) {
+    try {
+      event.forEach { stateCheckSumEvent ->
+        Jsons.serialize(stateCheckSumEvent)?.let { writeMessageToPubSub(it) }
+      }
+    } catch (e: Exception) {
+      logger.error(e) { "Swallowed exception during publishing pubsub message" }
+    }
+  }
+
+  /**
+   * Hands a single message to the publisher. The future returned by `publish` is not inspected.
+   */
+  private fun writeMessageToPubSub(message: String) {
+    try {
+      val data = ByteString.copyFromUtf8(message)
+      val pubsubMessage: PubsubMessage =
+        PubsubMessage
+          .newBuilder()
+          .setData(data)
+          // Filter in subscription is set to -> attributes.event = "state_checksum_metrics"
+          .putAllAttributes(mapOf("event" to "state_checksum_metrics"))
+          .build()
+      publisher.publish(pubsubMessage)
+    } catch (e: Exception) {
+      logger.error(e) { "Swallowed exception during writing pubsub message" }
+    }
+  }
+
+  /**
+   * Shuts the publisher down and waits up to one minute for it to terminate.
+   */
+  fun close() {
+    try {
+      logger.info { "Closing StateCheckSumEventPubSubWriter" }
+      publisher.shutdown()
+      publisher.awaitTermination(1, TimeUnit.MINUTES)
+    } catch (e: Exception) {
+      logger.error(e) { "Swallowed exception during closing pubsub class" }
+    }
+  }
+}
diff --git a/airbyte-commons-worker/src/main/kotlin/io/airbyte/workers/models/StateCheckSumCountEvent.kt b/airbyte-commons-worker/src/main/kotlin/io/airbyte/workers/models/StateCheckSumCountEvent.kt
new file mode 100644
index 00000000000..4ccb3922e3a
--- /dev/null
+++ b/airbyte-commons-worker/src/main/kotlin/io/airbyte/workers/models/StateCheckSumCountEvent.kt
@@ -0,0 +1,31 @@
+package io.airbyte.workers.models
+
+import com.fasterxml.jackson.annotation.JsonCreator
+import com.fasterxml.jackson.annotation.JsonProperty
+
+/**
+ * Payload describing a state checksum / record count observation.
+ *
+ * The [JsonProperty] names define the snake_case field names of the serialized form.
+ */
+data class StateCheckSumCountEvent
+  @JsonCreator
+  constructor(
+    @JsonProperty("airbyte_version") val airbyteVersion: String?,
+    @JsonProperty("attempt_number") val attemptNumber: Long,
+    @JsonProperty("connection_id") val connectionId: String,
+    @JsonProperty("deployment_id") val deploymentId: String?,
+    @JsonProperty("deployment_mode") val deploymentMode: String?,
+    @JsonProperty("email") val email: String?,
+    @JsonProperty("id") val id: String,
+    @JsonProperty("job_id") val jobId: String,
+    @JsonProperty("record_count") val recordCount: Long,
+    @JsonProperty("state_hash") val stateHash: String,
+    @JsonProperty("state_id") val stateId: String,
+    @JsonProperty("state_origin") val stateOrigin: String,
+    @JsonProperty("state_type") val stateType: String,
+    @JsonProperty("stream_name") val streamName: String?,
+    @JsonProperty("stream_namespace") val streamNamespace: String?,
+    @JsonProperty("timestamp") val timestamp: Long,
+    @JsonProperty("valid_data") val validData: Boolean,
+  )
diff --git a/airbyte-commons-worker/src/main/kotlin/io/airbyte/workers/sync/OrchestratorConstants.kt b/airbyte-commons-worker/src/main/kotlin/io/airbyte/workers/sync/OrchestratorConstants.kt
index a74b04fff06..c07033a8706 100644
--- a/airbyte-commons-worker/src/main/kotlin/io/airbyte/workers/sync/OrchestratorConstants.kt
+++ b/airbyte-commons-worker/src/main/kotlin/io/airbyte/workers/sync/OrchestratorConstants.kt
@@ -111,6 +111,8 @@ object OrchestratorConstants {
           EnvVar.MINIO_ENDPOINT,
           EnvVar.OTEL_COLLECTOR_ENDPOINT,
           EnvVar.PUBLISH_METRICS,
+          EnvVar.PUB_SUB_ENABLED,
+          EnvVar.PUB_SUB_TOPIC_NAME,
           EnvVar.ROOTLESS_WORKLOAD,
           EnvVar.SOCAT_KUBE_CPU_LIMIT,
           EnvVar.SOCAT_KUBE_CPU_REQUEST,
diff --git a/airbyte-commons-worker/src/test/kotlin/io/airbyte/workers/general/StateCheckSumEventPubSubWriterTest.kt b/airbyte-commons-worker/src/test/kotlin/io/airbyte/workers/general/StateCheckSumEventPubSubWriterTest.kt
new file mode 100644
index 00000000000..86069bbb337
--- /dev/null
+++ b/airbyte-commons-worker/src/test/kotlin/io/airbyte/workers/general/StateCheckSumEventPubSubWriterTest.kt
@@ -0,0 +1,114 @@
+package io.airbyte.workers.general
+
+import com.google.api.core.ApiFuture
+import com.google.cloud.pubsub.v1.Publisher
+import com.google.protobuf.ByteString
+import com.google.pubsub.v1.PubsubMessage
+import io.airbyte.commons.json.Jsons
+import io.airbyte.workers.models.StateCheckSumCountEvent
+import io.micronaut.context.annotation.Property
+import io.micronaut.test.extensions.junit5.annotation.MicronautTest
+import io.mockk.Runs
+import io.mockk.clearAllMocks
+import io.mockk.every
+import io.mockk.just
+import io.mockk.mockk
+import io.mockk.verify
+import jakarta.inject.Inject
+import org.junit.jupiter.api.AfterEach
+import org.junit.jupiter.api.BeforeEach
+import org.junit.jupiter.api.Test
+import java.time.Instant
+import java.util.Random
+import java.util.UUID
+import java.util.function.Supplier
+
+/**
+ * Tests for [StateCheckSumEventPubSubWriter]: one instance wired to mocks for behaviour checks and
+ * one injected by Micronaut to confirm the bean is created when Pub/Sub is enabled.
+ */
+@MicronautTest
+@Property(name = "airbyte.cloud.pubsub.enabled", value = "true")
+@Property(name = "airbyte.cloud.pubsub.topic", value = "dummy-topic")
+@Property(name = "airbyte.cloud.pubsub.message-count-batch-size", value = "10")
+@Property(name = "airbyte.cloud.pubsub.request-bytes-threshold", value = "10")
+@Property(name = "airbyte.cloud.pubsub.publish-delay-threshold-ms", value = "10")
+class StateCheckSumEventPubSubWriterTest {
+  private val publisherSupplier = mockk<Supplier<Publisher>>(relaxed = true)
+
+  private val publisher = mockk<Publisher>(relaxed = true)
+
+  private lateinit var pubSubWriter: StateCheckSumEventPubSubWriter
+
+  @Inject
+  lateinit var pubSubWriterMicronautBean: StateCheckSumEventPubSubWriter
+
+  @BeforeEach
+  fun setUp() {
+    every { publisherSupplier.get() } returns publisher
+    pubSubWriter = StateCheckSumEventPubSubWriter(publisherSupplier)
+  }
+
+  @AfterEach
+  fun tearDown() {
+    clearAllMocks()
+  }
+
+  @Test
+  fun `bean creation should work`() {
+    pubSubWriterMicronautBean.publishEvent(listOf(getDummyCheckSumEvent()))
+    pubSubWriterMicronautBean.close()
+  }
+
+  @Test
+  fun `events should be written to pubsub`() {
+    val event = getDummyCheckSumEvent()
+    val expectedMessage: PubsubMessage =
+      PubsubMessage
+        .newBuilder()
+        .setData(ByteString.copyFromUtf8(Jsons.serialize(event)))
+        // Filter in subscription is set to -> attributes.event = "state_checksum_metrics"
+        .putAllAttributes(mapOf("event" to "state_checksum_metrics"))
+        .build()
+    every { publisher.shutdown() } just Runs
+    every { publisher.awaitTermination(any(), any()) } returns true
+    every { publisher.publish(expectedMessage) } returns mockk<ApiFuture<String>>()
+
+    pubSubWriter.publishEvent(listOf(event, event, event, event, event, event))
+    pubSubWriter.close()
+
+    verify(exactly = 6) { publisher.publish(expectedMessage) }
+  }
+
+  @Test
+  fun `close should shutdown publisher`() {
+    every { publisher.shutdown() } just Runs
+    every { publisher.awaitTermination(any(), any()) } returns true
+
+    pubSubWriter.close()
+
+    verify(exactly = 1) { publisher.shutdown() }
+  }
+
+  /** Builds an event populated with random / placeholder values. */
+  private fun getDummyCheckSumEvent(): StateCheckSumCountEvent =
+    StateCheckSumCountEvent(
+      airbyteVersion = UUID.randomUUID().toString(),
+      attemptNumber = Random().nextLong(),
+      connectionId = UUID.randomUUID().toString(),
+      deploymentId = "LOCAL_MACHINE",
+      deploymentMode = "LOCAL",
+      email = "nope@nope.com",
+      id = UUID.randomUUID().toString(),
+      jobId = Random().nextLong().toString(),
+      recordCount = Random().nextLong(),
+      stateHash = UUID.randomUUID().toString(),
+      stateId = Random().nextLong().toString(),
+      stateOrigin = "SOURCE",
+      stateType = "STREAM",
+      streamName = "stream",
+      streamNamespace = null,
+      timestamp = Instant.now().toEpochMilli() * 1000L,
+      validData = Random().nextBoolean(),
+    )
+}
diff --git a/airbyte-commons/src/main/kotlin/io/airbyte/commons/envvar/EnvVar.kt b/airbyte-commons/src/main/kotlin/io/airbyte/commons/envvar/EnvVar.kt
index acb43bd7aa5..6f174d42458 100644
--- a/airbyte-commons/src/main/kotlin/io/airbyte/commons/envvar/EnvVar.kt
+++ b/airbyte-commons/src/main/kotlin/io/airbyte/commons/envvar/EnvVar.kt
@@ -75,6 +75,8 @@ enum class EnvVar {
 
   PATH_TO_CONNECTORS,
   PUBLISH_METRICS,
+  PUB_SUB_ENABLED,
+  PUB_SUB_TOPIC_NAME,
 
   REMOTE_DATAPLANE_SERVICEACCOUNTS,
   ROOTLESS_WORKLOAD,
diff --git a/airbyte-container-orchestrator/src/main/resources/application.yml b/airbyte-container-orchestrator/src/main/resources/application.yml
index cbcfb050349..fd8af6b481b 100644
--- a/airbyte-container-orchestrator/src/main/resources/application.yml
+++ b/airbyte-container-orchestrator/src/main/resources/application.yml
@@ -42,6 +42,12 @@ airbyte:
     test:
       enabled: ${ACCEPTANCE_TEST_ENABLED:false}
   cloud:
+    pubsub:
+      enabled: ${PUB_SUB_ENABLED:false}
+      topic: ${PUB_SUB_TOPIC_NAME:}
+      request-bytes-threshold: ${PUB_SUB_REQUEST_BYTES_THRESHOLD:5000}
+      message-count-batch-size: ${PUB_SUB_MESSAGE_COUNT_BATCH_SIZE:50}
+      publish-delay-threshold-ms: ${PUB_SUB_PUBLISH_DELAY_THRESHOLD_MS:100}
     storage:
       type: ${STORAGE_TYPE}
       bucket:
diff --git a/charts/airbyte-workload-launcher/templates/deployment.yaml b/charts/airbyte-workload-launcher/templates/deployment.yaml
index 8ed4d8010fc..79ffb050322 100644
--- a/charts/airbyte-workload-launcher/templates/deployment.yaml
+++ b/charts/airbyte-workload-launcher/templates/deployment.yaml
@@ -70,6 +70,16 @@ spec:
               key: AIRBYTE_VERSION
         - name: DATA_PLANE_ID
           value: "local"
+        - name: PUB_SUB_ENABLED
+          valueFrom:
+            configMapKeyRef:
+              name: {{ .Release.Name }}-airbyte-env
+              key: PUB_SUB_ENABLED
+        - name: PUB_SUB_TOPIC_NAME
+          valueFrom:
+            configMapKeyRef:
+              name: {{ .Release.Name }}-airbyte-env
+              key: PUB_SUB_TOPIC_NAME
         - name: CONFIG_ROOT
           valueFrom:
             configMapKeyRef:
diff --git a/charts/airbyte/templates/env-configmap.yaml b/charts/airbyte/templates/env-configmap.yaml
index 51001017f50..cc9bcdddb3d 100644
--- a/charts/airbyte/templates/env-configmap.yaml
+++ b/charts/airbyte/templates/env-configmap.yaml
@@ -160,4 +160,6 @@ data:
   KUBERNETES_CLIENT_MAX_IDLE_CONNECTIONS: ""
   WORKLOAD_LAUNCHER_PARALLELISM: "10"
   CONNECTOR_BUILDER_SERVER_API_HOST: http://{{ .Release.Name }}-airbyte-connector-builder-server-svc:{{ index .Values "connector-builder-server" "service" "port" }}
+  PUB_SUB_ENABLED: "false"
+  PUB_SUB_TOPIC_NAME: ""
 {{- end }}
diff --git a/charts/helm-tests/tests/basic_template_test.go b/charts/helm-tests/tests/basic_template_test.go
index 37e7161601b..d19acb73224 100644
--- a/charts/helm-tests/tests/basic_template_test.go
+++ b/charts/helm-tests/tests/basic_template_test.go
@@ -80,6 +80,8 @@ var commonConfigMapKeys = toStringSet(
 	"WORKLOAD_LAUNCHER_PARALLELISM",
 	"WORKSPACE_DOCKER_MOUNT",
 	"WORKSPACE_ROOT",
+	"PUB_SUB_ENABLED",
+	"PUB_SUB_TOPIC_NAME",
 )
 
 var proEditionConfigMapKeys = toStringSet(
diff --git a/deps.toml b/deps.toml
index b8d505ff2dd..5507bdfe243 100644
--- a/deps.toml
+++ b/deps.toml
@@ -87,6 +87,7 @@ glassfish = { module = "org.glassfish.jersey:jackson-bom", version.ref = "glassf
 google-auth-library-oauth2-http = { module = "com.google.auth:google-auth-library-oauth2-http", version = "1.20.0" }
 google-cloud-storage = { module = "com.google.cloud:google-cloud-storage", version = "2.17.2" }
 google-cloud-storage-secretmanager = { module = "com.google.cloud:google-cloud-secretmanager", version = "2.0.5" }
+google-cloud-pubsub = { module = "com.google.cloud:google-cloud-pubsub", version = "1.130.0" }
 google-cloud-sqladmin = { module = "com.google.apis:google-api-services-sqladmin", version = "v1-rev20240317-2.0.0" }
 google-cloud-api-client = { module = "com.google.api-client:google-api-client", version = "2.4.0" }
 grpc-inprocess = { module = "io.grpc:grpc-inprocess", version.ref = "grpc_version" }