Skip to content

Commit

Permalink
feat: implement pub-sub writer (#12812)
Browse files Browse the repository at this point in the history
  • Loading branch information
subodh1810 committed Jun 18, 2024
1 parent d77f3f9 commit 7ad6444
Show file tree
Hide file tree
Showing 12 changed files with 262 additions and 0 deletions.
2 changes: 2 additions & 0 deletions airbyte-commons-worker/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ dependencies {
ksp(platform(libs.micronaut.platform))
ksp(libs.bundles.micronaut.annotation.processor)

api(libs.google.cloud.pubsub)

implementation(platform(libs.micronaut.platform))
implementation(libs.bundles.micronaut)
implementation(libs.bundles.micronaut.metrics)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package io.airbyte.workers.config

import com.google.api.gax.batching.BatchingSettings
import com.google.cloud.pubsub.v1.Publisher
import com.google.pubsub.v1.TopicName
import io.micronaut.context.annotation.Factory
import io.micronaut.context.annotation.Requires
import io.micronaut.context.annotation.Value
import jakarta.inject.Named
import jakarta.inject.Singleton
import org.threeten.bp.Duration
import java.util.function.Supplier

// Upper bounds applied to the configured batching values in StateCheckSumEventPubSubPublisherFactory:
// each configured value is capped at the matching constant via min(configured, MAX_*).

// Largest element-count threshold (messages per batch) that will be passed to BatchingSettings.
private const val MAX_BATCH_SIZE = 50L
// Largest request-byte threshold that will be passed to BatchingSettings.
private const val MAX_BYTES_THRESHOLD = 5000L
// Largest publish delay threshold, in milliseconds (it is wrapped with Duration.ofMillis).
private const val MAX_DELAY_THRESHOLD = 100L

/**
 * Micronaut factory that exposes a [Supplier] of Pub/Sub [Publisher]s for state checksum events.
 *
 * The bean is only registered when `airbyte.cloud.pubsub.enabled` is `true`.
 */
@Factory
class StateCheckSumEventPubSubPublisherFactory {
  /**
   * Creates the `pubSubPublisherSupplier` bean.
   *
   * Each configured batching value is capped at its file-level maximum before being handed to
   * [BatchingSettings]. The batching settings are built once, when this method runs; the topic
   * name is parsed and a [Publisher] is built every time the returned supplier is invoked.
   *
   * @param topicName topic name, parsed with [TopicName.parse] when the supplier is invoked.
   * @param requestBytesThreshold configured request byte threshold (capped at MAX_BYTES_THRESHOLD).
   * @param messageCountBatchSize configured element count threshold (capped at MAX_BATCH_SIZE).
   * @param publishDelayThreshold configured delay threshold in milliseconds (capped at MAX_DELAY_THRESHOLD).
   * @return a supplier that builds a [Publisher] with the capped batching settings.
   */
  @Singleton
  @Requires(property = "airbyte.cloud.pubsub.enabled", value = "true", defaultValue = "false")
  @Named("pubSubPublisherSupplier")
  fun pubSubPublisherSupplier(
    @Value("\${airbyte.cloud.pubsub.topic}") topicName: String,
    @Value("\${airbyte.cloud.pubsub.request-bytes-threshold}") requestBytesThreshold: Int,
    @Value("\${airbyte.cloud.pubsub.message-count-batch-size}") messageCountBatchSize: Int,
    @Value("\${airbyte.cloud.pubsub.publish-delay-threshold-ms}") publishDelayThreshold: Int,
  ): Supplier<Publisher> {
    // Never exceed the hard caps, whatever the configuration says.
    val cappedElementCount = minOf(messageCountBatchSize.toLong(), MAX_BATCH_SIZE)
    val cappedRequestBytes = minOf(requestBytesThreshold.toLong(), MAX_BYTES_THRESHOLD)
    val cappedDelay = Duration.ofMillis(minOf(publishDelayThreshold.toLong(), MAX_DELAY_THRESHOLD))

    val settings =
      BatchingSettings.newBuilder()
        .setElementCountThreshold(cappedElementCount)
        .setRequestByteThreshold(cappedRequestBytes)
        .setDelayThreshold(cappedDelay)
        .build()

    return Supplier {
      val topic = TopicName.parse(topicName)
      Publisher.newBuilder(topic).setBatchingSettings(settings).build()
    }
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package io.airbyte.workers.general

import com.google.cloud.pubsub.v1.Publisher
import com.google.protobuf.ByteString
import com.google.pubsub.v1.PubsubMessage
import io.airbyte.commons.json.Jsons
import io.airbyte.workers.models.StateCheckSumCountEvent
import io.github.oshai.kotlinlogging.KotlinLogging
import io.micronaut.context.annotation.Requires
import jakarta.inject.Named
import jakarta.inject.Singleton
import java.util.concurrent.TimeUnit
import java.util.function.Supplier

private val logger = KotlinLogging.logger { }

/**
 * Best-effort writer that publishes [StateCheckSumCountEvent]s to Pub/Sub.
 *
 * Every failure is logged and swallowed: neither [publishEvent] nor [close] propagates an
 * exception to the caller. The bean is only registered when `airbyte.cloud.pubsub.enabled`
 * is `true`.
 *
 * @param publisherSupplier supplies the [Publisher]; it is invoked at most once, on first use.
 */
@Singleton
@Requires(property = "airbyte.cloud.pubsub.enabled", value = "true", defaultValue = "false")
@Named("pubSubWriter")
class StateCheckSumEventPubSubWriter(
  @Named("pubSubPublisherSupplier") private val publisherSupplier: Supplier<Publisher>,
) {
  // Resolved on first access (either the first publish or close()).
  private val publisher: Publisher by lazy { publisherSupplier.get() }

  /**
   * Serializes each event to JSON and hands it to the publisher.
   *
   * Events are handled independently: a failure while serializing or publishing one event is
   * logged and does not prevent the remaining events from being published.
   *
   * @param event the events to publish; an empty list is a no-op.
   */
  fun publishEvent(event: List<StateCheckSumCountEvent>) {
    event.forEach { stateCheckSumEvent ->
      try {
        Jsons.serialize(stateCheckSumEvent)?.let { writeMessageToPubSub(it) }
      } catch (e: Exception) {
        logger.error(e) { "Swallowed exception during publishing pubsub message" }
      }
    }
  }

  /**
   * Wraps [message] in a [PubsubMessage] tagged with the `event` attribute and publishes it.
   *
   * The value returned by [Publisher.publish] is not inspected here; only exceptions thrown
   * synchronously are caught and logged.
   */
  private fun writeMessageToPubSub(message: String) {
    try {
      val data = ByteString.copyFromUtf8(message)
      val pubsubMessage: PubsubMessage =
        PubsubMessage
          .newBuilder()
          .setData(data)
          // Filter in subscription is set to -> attributes.event = "state_checksum_metrics"
          .putAllAttributes(mapOf("event" to "state_checksum_metrics"))
          .build()
      publisher.publish(pubsubMessage)
    } catch (e: Exception) {
      logger.error(e) { "Swallowed exception during writing pubsub message" }
    }
  }

  /**
   * Shuts the publisher down and waits up to one minute for it to terminate.
   *
   * Never throws. If the wait times out a warning is logged; if the calling thread is
   * interrupted while waiting, its interrupt flag is restored so callers can still observe it.
   */
  fun close() {
    try {
      logger.info { "Closing StateCheckSumEventPubSubWriter" }
      publisher.shutdown()
      val terminated = publisher.awaitTermination(1, TimeUnit.MINUTES)
      if (!terminated) {
        logger.warn { "Timed out waiting for pubsub publisher to terminate" }
      }
    } catch (e: InterruptedException) {
      // Swallowing an interrupt would hide it from the caller's shutdown logic; re-assert it.
      Thread.currentThread().interrupt()
      logger.error(e) { "Swallowed exception during closing pubsub class" }
    } catch (e: Exception) {
      logger.error(e) { "Swallowed exception during closing pubsub class" }
    }
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package io.airbyte.workers.models

import com.fasterxml.jackson.annotation.JsonCreator
import com.fasterxml.jackson.annotation.JsonProperty

/**
 * Event describing a state checksum / record count observation.
 *
 * The primary constructor is the Jackson creator; each property is bound to the snake_case JSON
 * key named in its `@JsonProperty` annotation. Properties declared with a nullable type
 * (`airbyteVersion`, `deploymentId`, `deploymentMode`, `email`, `streamName`, `streamNamespace`)
 * may be null; all others are required.
 *
 * The constructor parameter order is part of the contract for positional callers, so new
 * properties should not be inserted in the middle of the list.
 *
 * NOTE(review): the unit of `timestamp` is not established by this declaration - confirm with
 * the producers/consumers of the event before relying on it.
 */
data class StateCheckSumCountEvent
@JsonCreator
constructor(
@JsonProperty("airbyte_version") val airbyteVersion: String?,
@JsonProperty("attempt_number") val attemptNumber: Long,
@JsonProperty("connection_id") val connectionId: String,
@JsonProperty("deployment_id") val deploymentId: String?,
@JsonProperty("deployment_mode") val deploymentMode: String?,
@JsonProperty("email") val email: String?,
@JsonProperty("id") val id: String,
@JsonProperty("job_id") val jobId: String,
@JsonProperty("record_count") val recordCount: Long,
@JsonProperty("state_hash") val stateHash: String,
@JsonProperty("state_id") val stateId: String,
@JsonProperty("state_origin") val stateOrigin: String,
@JsonProperty("state_type") val stateType: String,
@JsonProperty("stream_name") val streamName: String?,
@JsonProperty("stream_namespace") val streamNamespace: String?,
@JsonProperty("timestamp") val timestamp: Long,
@JsonProperty("valid_data") val validData: Boolean,
)
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,8 @@ object OrchestratorConstants {
EnvVar.MINIO_ENDPOINT,
EnvVar.OTEL_COLLECTOR_ENDPOINT,
EnvVar.PUBLISH_METRICS,
EnvVar.PUB_SUB_ENABLED,
EnvVar.PUB_SUB_TOPIC_NAME,
EnvVar.ROOTLESS_WORKLOAD,
EnvVar.SOCAT_KUBE_CPU_LIMIT,
EnvVar.SOCAT_KUBE_CPU_REQUEST,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
package io.airbyte.workers.general

import com.google.api.core.ApiFuture
import com.google.cloud.pubsub.v1.Publisher
import com.google.protobuf.ByteString
import com.google.pubsub.v1.PubsubMessage
import io.airbyte.commons.json.Jsons
import io.airbyte.workers.models.StateCheckSumCountEvent
import io.micronaut.context.annotation.Property
import io.micronaut.test.extensions.junit5.annotation.MicronautTest
import io.mockk.Runs
import io.mockk.clearAllMocks
import io.mockk.every
import io.mockk.just
import io.mockk.mockk
import io.mockk.verify
import org.junit.jupiter.api.AfterEach
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import java.time.Instant
import java.util.Random
import java.util.UUID
import java.util.function.Supplier
import javax.inject.Inject

/**
 * Tests for [StateCheckSumEventPubSubWriter].
 *
 * Two instances are exercised: one injected by Micronaut (to prove the bean graph resolves with
 * pubsub enabled) and one built by hand around a mocked [Publisher].
 */
@MicronautTest
@Property(name = "airbyte.cloud.pubsub.enabled", value = "true")
@Property(name = "airbyte.cloud.pubsub.topic", value = "dummy-topic")
@Property(name = "airbyte.cloud.pubsub.request-bytes-threshold", value = "10")
@Property(name = "airbyte.cloud.pubsub.message-count-batch-size", value = "10")
@Property(name = "airbyte.cloud.pubsub.publish-delay-threshold-ms", value = "10")
class StateCheckSumEventPubSubWriterTest {
  private val publisherSupplier = mockk<Supplier<Publisher>>(relaxed = true)

  private val publisher = mockk<Publisher>(relaxed = true)

  // Single source of randomness for the dummy event fields.
  private val random = Random()

  private lateinit var pubSubWriter: StateCheckSumEventPubSubWriter

  @Inject
  lateinit var pubSubWriterMicronautBean: StateCheckSumEventPubSubWriter

  @BeforeEach
  fun setUp() {
    every { publisherSupplier.get() } returns publisher
    pubSubWriter = StateCheckSumEventPubSubWriter(publisherSupplier)
  }

  @AfterEach
  fun tearDown() {
    clearAllMocks()
  }

  // Smoke test: the injected bean must exist and neither call may throw.
  @Test
  fun `bean creation should work`() {
    pubSubWriterMicronautBean.publishEvent(listOf(getDummyCheckSumEvent()))
    pubSubWriterMicronautBean.close()
  }

  @Test
  fun `events should be written to pubsub`() {
    val event = getDummyCheckSumEvent()
    val expectedMessage: PubsubMessage =
      PubsubMessage
        .newBuilder()
        .setData(ByteString.copyFromUtf8(Jsons.serialize(event)))
        // Filter in subscription is set to -> attributes.event = "state_checksum_metrics"
        .putAllAttributes(mapOf("event" to "state_checksum_metrics"))
        .build()
    every { publisher.shutdown() } just Runs
    every { publisher.awaitTermination(any(), any()) } returns true
    every { publisher.publish(expectedMessage) } returns mockk<ApiFuture<String>>()

    pubSubWriter.publishEvent(List(6) { event })
    pubSubWriter.close()

    verify(exactly = 6) { publisher.publish(expectedMessage) }
  }

  @Test
  fun `close should shutdown publisher`() {
    every { publisher.shutdown() } just Runs
    every { publisher.awaitTermination(any(), any()) } returns true

    pubSubWriter.close()

    verify(exactly = 1) { publisher.shutdown() }
    // close() must also wait for in-flight work, not just request the shutdown.
    verify(exactly = 1) { publisher.awaitTermination(any(), any()) }
  }

  // Named arguments keep the many same-typed fields from being silently transposed.
  private fun getDummyCheckSumEvent(): StateCheckSumCountEvent =
    StateCheckSumCountEvent(
      airbyteVersion = UUID.randomUUID().toString(),
      attemptNumber = random.nextLong(),
      connectionId = UUID.randomUUID().toString(),
      deploymentId = "LOCAL_MACHINE",
      deploymentMode = "LOCAL",
      email = "[email protected]",
      id = UUID.randomUUID().toString(),
      jobId = random.nextLong().toString(),
      recordCount = random.nextLong(),
      stateHash = UUID.randomUUID().toString(),
      stateId = random.nextLong().toString(),
      stateOrigin = "SOURCE",
      stateType = "STREAM",
      streamName = "stream",
      streamNamespace = null,
      timestamp = Instant.now().toEpochMilli() * 1000L,
      validData = random.nextBoolean(),
    )
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ enum class EnvVar {
PATH_TO_CONNECTORS,

PUBLISH_METRICS,
PUB_SUB_ENABLED,
PUB_SUB_TOPIC_NAME,

REMOTE_DATAPLANE_SERVICEACCOUNTS,
ROOTLESS_WORKLOAD,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,12 @@ airbyte:
test:
enabled: ${ACCEPTANCE_TEST_ENABLED:false}
cloud:
pubsub:
enabled: ${PUB_SUB_ENABLED:false}
topic: ${PUB_SUB_TOPIC_NAME:}
request-bytes-threshold: ${PUB_SUB_REQUEST_BYTES_THRESHOLD:5000}
message-count-batch-size: ${PUB_SUB_MESSAGE_COUNT_BATCH_SIZE:50}
publish-delay-threshold-ms: ${PUB_SUB_PUBLISH_DELAY_THRESHOLD_MS:100}
storage:
type: ${STORAGE_TYPE}
bucket:
Expand Down
10 changes: 10 additions & 0 deletions charts/airbyte-workload-launcher/templates/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,16 @@ spec:
key: AIRBYTE_VERSION
- name: DATA_PLANE_ID
value: "local"
- name: PUB_SUB_ENABLED
valueFrom:
configMapKeyRef:
name: {{ .Release.Name }}-airbyte-env
key: PUB_SUB_ENABLED
- name: PUB_SUB_TOPIC_NAME
valueFrom:
configMapKeyRef:
name: {{ .Release.Name }}-airbyte-env
key: PUB_SUB_TOPIC_NAME
- name: CONFIG_ROOT
valueFrom:
configMapKeyRef:
Expand Down
2 changes: 2 additions & 0 deletions charts/airbyte/templates/env-configmap.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -160,4 +160,6 @@ data:
KUBERNETES_CLIENT_MAX_IDLE_CONNECTIONS: ""
WORKLOAD_LAUNCHER_PARALLELISM: "10"
CONNECTOR_BUILDER_SERVER_API_HOST: http://{{ .Release.Name }}-airbyte-connector-builder-server-svc:{{ index .Values "connector-builder-server" "service" "port" }}
PUB_SUB_ENABLED: "false"
PUB_SUB_TOPIC_NAME: ""
{{- end }}
2 changes: 2 additions & 0 deletions charts/helm-tests/tests/basic_template_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ var commonConfigMapKeys = toStringSet(
"WORKLOAD_LAUNCHER_PARALLELISM",
"WORKSPACE_DOCKER_MOUNT",
"WORKSPACE_ROOT",
"PUB_SUB_ENABLED",
"PUB_SUB_TOPIC_NAME",
)

var proEditionConfigMapKeys = toStringSet(
Expand Down
1 change: 1 addition & 0 deletions deps.toml
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ glassfish = { module = "org.glassfish.jersey:jackson-bom", version.ref = "glassf
google-auth-library-oauth2-http = { module = "com.google.auth:google-auth-library-oauth2-http", version = "1.20.0" }
google-cloud-storage = { module = "com.google.cloud:google-cloud-storage", version = "2.17.2" }
google-cloud-storage-secretmanager = { module = "com.google.cloud:google-cloud-secretmanager", version = "2.0.5" }
google-cloud-pubsub = { module = "com.google.cloud:google-cloud-pubsub", version = "1.130.0" }
google-cloud-sqladmin = { module = "com.google.apis:google-api-services-sqladmin", version = "v1-rev20240317-2.0.0" }
google-cloud-api-client = { module = "com.google.api-client:google-api-client", version = "2.4.0" }
grpc-inprocess = { module = "io.grpc:grpc-inprocess", version.ref = "grpc_version" }
Expand Down

0 comments on commit 7ad6444

Please sign in to comment.