Skip to content

Commit

Permalink
Enable the use of temporal as a generic queue (#9571)
Browse files Browse the repository at this point in the history
  • Loading branch information
gosusnp committed Nov 1, 2023
1 parent 2ec6399 commit d51b9b3
Show file tree
Hide file tree
Showing 18 changed files with 260 additions and 13 deletions.
1 change: 1 addition & 0 deletions airbyte-commons-temporal-core/build.gradle
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
plugins {
id "io.airbyte.gradle.jvm.lib"
id "io.airbyte.gradle.publish"
id "org.jetbrains.kotlin.jvm"
}

dependencies {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.micronaut.temporal.annotations;
package io.airbyte.commons.temporal.annotations;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
package io.airbyte.commons.temporal.queue

import com.fasterxml.jackson.annotation.JsonTypeInfo
import com.fasterxml.jackson.databind.annotation.JsonDeserialize
import com.google.common.annotations.VisibleForTesting
import io.airbyte.commons.temporal.annotations.TemporalActivityStub
import io.temporal.activity.ActivityInterface
import io.temporal.activity.ActivityMethod
import io.temporal.workflow.WorkflowInterface
import io.temporal.workflow.WorkflowMethod

/**
* Message abstraction.
*
* We wrap the actual message to be able to pass metadata around.
*/
@JsonDeserialize(builder = Message.Builder::class)
class Message<T : Any> constructor(data: T) {
// TODO this should be a data class, however, need to make the JsonTypeInfo annotation work

// This enables passing T around
@JsonTypeInfo(include = JsonTypeInfo.As.WRAPPER_ARRAY, use = JsonTypeInfo.Id.CLASS, property = "@bodyClass")
val data: T

init {
this.data = data
}

/**
* Builder for messages.
*
* A builder is needed in order to have data as non-nullable because the deserializer requires a no-arg constructor.
*/
class Builder<T : Any>
@JvmOverloads
constructor(data: T? = null) {
@JsonTypeInfo(include = JsonTypeInfo.As.WRAPPER_ARRAY, use = JsonTypeInfo.Id.CLASS, property = "@bodyClass")
var data: T?

init {
this.data = data
}

fun data(data: T) = apply { this.data = data }

fun build() = Message(data = data!!)
}

override fun equals(other: Any?): Boolean {
if (this === other) return true
if (javaClass != other?.javaClass) return false

other as Message<*>

if (data != other.data) return false

return true
}

override fun hashCode(): Int {
return data.hashCode()
}
}

/**
* Generic queue activity.
*/
@ActivityInterface
interface QueueActivity<T : Any> {
@ActivityMethod
fun consume(message: Message<T>)
}

/**
* Generic temporal workflow interface for a message queue.
*/
@WorkflowInterface
interface QueueWorkflow<T : Any> {
/**
* Submits a message to the queue.
*/
@WorkflowMethod
fun publish(message: Message<T>)
}

/**
* Generic temporal queue activity implementation.
*/
class QueueActivityImpl<T : Any>(private val messageConsumer: MessageConsumer<T>) : QueueActivity<T> {
override fun consume(message: Message<T>) {
messageConsumer.consume(message.data)
}
}

/**
* Generic temporal queue workflow implementation.
*
* This is open to simplify initialization when starting a workflow because temporal requires a type reference.
*/
open class QueueWorkflowImpl<T : Any> : QueueWorkflow<T> {
/**
* The consumer activity.
*
* This is lateinit because the TemporalActivityStub will initialize the value post creation using activities
* from the dependency injection container.
*/
@VisibleForTesting
@TemporalActivityStub(activityOptionsBeanName = "queueActivityOptions")
protected lateinit var activity: QueueActivity<T>

override fun publish(message: Message<T>) {
activity.consume(message)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package io.airbyte.commons.temporal.queue

/**
* MessageConsumer interface for a temporal queue.
*/
interface MessageConsumer<T : Any> {
fun consume(input: T)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package io.airbyte.commons.temporal.queue

import io.airbyte.commons.temporal.WorkflowClientWrapped
import io.temporal.client.WorkflowOptions

/**
* Generic message producer for a temporal based queue.
*/
class TemporalMessageProducer<T : Any>(private val workflowClientWrapped: WorkflowClientWrapped) {
/**
* Publish a message to the subject.
*/
fun publish(
subject: String,
message: T,
) {
doPublish<QueueWorkflow<T>>(subject, message)
}

// This is a workaround to get a class with a generic.
// Temporal newWorkflowStub call requires a Class<T>, reified enables this.
// This is added as a private fun because of the visibility constraint from inline on member access.
private inline fun <reified W : QueueWorkflow<T>> doPublish(
subject: String,
message: T,
) {
val workflowOptions = WorkflowOptions.newBuilder().setTaskQueue(subject).build()
val workflow = workflowClientWrapped.newWorkflowStub(W::class.java, workflowOptions)
workflow.publish(Message(message))
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package io.airbyte.commons.temporal.queue

import com.fasterxml.jackson.databind.annotation.JsonDeserialize
import io.airbyte.commons.temporal.WorkflowClientWrapped
import io.airbyte.metrics.lib.MetricClient
import io.temporal.activity.ActivityOptions
import io.temporal.client.WorkflowClient
import io.temporal.testing.TestWorkflowEnvironment
import io.temporal.worker.Worker
import io.temporal.workflow.Workflow
import org.junit.jupiter.api.AfterAll
import org.junit.jupiter.api.BeforeAll
import org.junit.jupiter.api.Test
import org.mockito.Mockito.mock
import org.mockito.Mockito.spy
import org.mockito.Mockito.verify
import java.time.Duration

// Payload for the Queue
@JsonDeserialize(builder = TestQueueInput.Builder::class)
data class TestQueueInput(val input: String) {
// Using a builder here to prove that we can use a payload with non-nullable fields.
data class Builder(var input: String? = null) {
fun input(input: String) = apply { this.input = input }

fun build() = TestQueueInput(input = input!!)
}
}

// The actual consumer
class TestConsumer : MessageConsumer<TestQueueInput> {
override fun consume(input: TestQueueInput) {
println(input)
}
}

// Test implementation, this is required to map the activities and for registering an implementation with temporal.
class TestWorkflowImpl : QueueWorkflowImpl<TestQueueInput>() {
init {
initializeActivity<QueueActivity<TestQueueInput>>()
}

// reified is a trick to be able to retrieve the type reference on a generic class to pass it to temporal.
private inline fun <reified W : QueueActivity<TestQueueInput>> initializeActivity() {
// Initializing the activity the official temporal way to be able to run the temporal standard tests.
// For an actual workflow, we'd want to inject the activities so this init wouldn't be needed.
this.activity =
Workflow.newActivityStub(
W::class.java,
ActivityOptions.newBuilder().setStartToCloseTimeout(Duration.ofSeconds(10)).build(),
)
}
}

class BasicQueueTest {
companion object {
val QUEUE_NAME = "testQueue"

lateinit var activity: QueueActivityImpl<TestQueueInput>

lateinit var testEnv: TestWorkflowEnvironment
lateinit var worker: Worker
lateinit var client: WorkflowClient

@JvmStatic
@BeforeAll
fun setUp() {
testEnv = TestWorkflowEnvironment.newInstance()
worker = testEnv.newWorker(QUEUE_NAME)
worker.registerWorkflowImplementationTypes(TestWorkflowImpl::class.java)
client = testEnv.workflowClient

activity = spy(QueueActivityImpl(TestConsumer()))
worker.registerActivitiesImplementations(activity)
testEnv.start()
}

@JvmStatic
@AfterAll
fun tearDown() {
testEnv.close()
}
}

@Test
fun testRoundTrip() {
val producer = TemporalMessageProducer<TestQueueInput>(WorkflowClientWrapped(client, mock(MetricClient::class.java)))
val message = TestQueueInput("boom!")
producer.publish(QUEUE_NAME, message)

verify(activity).consume(Message(message))
}
}
2 changes: 1 addition & 1 deletion airbyte-micronaut-temporal/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ dependencies {
exclude module: 'guava'
}

testImplementation project(':airbyte-commons-temporal-core')
implementation project(':airbyte-commons-temporal-core')

testImplementation libs.assertj.core
testImplementation libs.bundles.junit
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
package io.airbyte.micronaut.temporal;

import com.google.common.annotations.VisibleForTesting;
import io.airbyte.micronaut.temporal.annotations.TemporalActivityStub;
import io.airbyte.commons.temporal.annotations.TemporalActivityStub;
import io.micronaut.context.BeanRegistration;
import io.temporal.activity.ActivityOptions;
import io.temporal.workflow.Workflow;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@

package io.airbyte.micronaut.temporal.stubs;

import io.airbyte.commons.temporal.annotations.TemporalActivityStub;
import io.airbyte.commons.temporal.exception.RetryableException;
import io.airbyte.micronaut.temporal.annotations.TemporalActivityStub;

@SuppressWarnings("PMD.UnusedPrivateField")
public class InvalidTestWorkflowImpl implements TestWorkflow {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

package io.airbyte.micronaut.temporal.stubs;

import io.airbyte.micronaut.temporal.annotations.TemporalActivityStub;
import io.airbyte.commons.temporal.annotations.TemporalActivityStub;

public class ValidTestWorkflowImpl implements TestWorkflow {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import static io.airbyte.metrics.lib.ApmTraceConstants.WORKFLOW_TRACE_OPERATION_NAME;

import datadog.trace.api.Trace;
import io.airbyte.commons.temporal.annotations.TemporalActivityStub;
import io.airbyte.commons.temporal.scheduling.CheckConnectionWorkflow;
import io.airbyte.config.ActorType;
import io.airbyte.config.ConnectorJobOutput;
Expand All @@ -18,7 +19,6 @@
import io.airbyte.config.StandardCheckConnectionInput;
import io.airbyte.config.StandardCheckConnectionOutput;
import io.airbyte.metrics.lib.ApmTraceUtils;
import io.airbyte.micronaut.temporal.annotations.TemporalActivityStub;
import io.airbyte.persistence.job.models.IntegrationLauncherConfig;
import io.airbyte.persistence.job.models.JobRunConfig;
import io.airbyte.workers.temporal.check.connection.CheckConnectionActivity.CheckConnectionInput;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@
import static io.airbyte.metrics.lib.ApmTraceConstants.WORKFLOW_TRACE_OPERATION_NAME;

import datadog.trace.api.Trace;
import io.airbyte.commons.temporal.annotations.TemporalActivityStub;
import io.airbyte.commons.temporal.scheduling.DiscoverCatalogWorkflow;
import io.airbyte.config.ConnectorJobOutput;
import io.airbyte.config.StandardDiscoverCatalogInput;
import io.airbyte.metrics.lib.ApmTraceUtils;
import io.airbyte.micronaut.temporal.annotations.TemporalActivityStub;
import io.airbyte.persistence.job.models.IntegrationLauncherConfig;
import io.airbyte.persistence.job.models.JobRunConfig;
import java.util.Map;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import datadog.trace.api.Trace;
import io.airbyte.commons.constants.WorkerConstants;
import io.airbyte.commons.temporal.TemporalWorkflowUtils;
import io.airbyte.commons.temporal.annotations.TemporalActivityStub;
import io.airbyte.commons.temporal.exception.RetryableException;
import io.airbyte.commons.temporal.scheduling.CheckConnectionWorkflow;
import io.airbyte.commons.temporal.scheduling.ConnectionManagerWorkflow;
Expand All @@ -35,7 +36,6 @@
import io.airbyte.metrics.lib.MetricAttribute;
import io.airbyte.metrics.lib.MetricTags;
import io.airbyte.metrics.lib.OssMetricsRegistry;
import io.airbyte.micronaut.temporal.annotations.TemporalActivityStub;
import io.airbyte.persistence.job.models.IntegrationLauncherConfig;
import io.airbyte.persistence.job.models.JobRunConfig;
import io.airbyte.workers.helper.FailureHelper;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@

package io.airbyte.workers.temporal.scheduling;

import io.airbyte.commons.temporal.annotations.TemporalActivityStub;
import io.airbyte.commons.temporal.scheduling.NotificationWorkflow;
import io.airbyte.micronaut.temporal.annotations.TemporalActivityStub;
import io.airbyte.notification.NotificationEvent;
import io.airbyte.workers.temporal.scheduling.activities.NotifyActivity;
import java.util.UUID;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@
import static io.airbyte.metrics.lib.ApmTraceConstants.WORKFLOW_TRACE_OPERATION_NAME;

import datadog.trace.api.Trace;
import io.airbyte.commons.temporal.annotations.TemporalActivityStub;
import io.airbyte.commons.temporal.scheduling.SpecWorkflow;
import io.airbyte.config.ConnectorJobOutput;
import io.airbyte.metrics.lib.ApmTraceUtils;
import io.airbyte.micronaut.temporal.annotations.TemporalActivityStub;
import io.airbyte.persistence.job.models.IntegrationLauncherConfig;
import io.airbyte.persistence.job.models.JobRunConfig;
import java.util.Map;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

import datadog.trace.api.Trace;
import io.airbyte.api.client.model.generated.ConnectionStatus;
import io.airbyte.commons.temporal.annotations.TemporalActivityStub;
import io.airbyte.commons.temporal.scheduling.SyncWorkflow;
import io.airbyte.config.NormalizationInput;
import io.airbyte.config.NormalizationSummary;
Expand All @@ -31,7 +32,6 @@
import io.airbyte.metrics.lib.MetricClientFactory;
import io.airbyte.metrics.lib.MetricTags;
import io.airbyte.metrics.lib.OssMetricsRegistry;
import io.airbyte.micronaut.temporal.annotations.TemporalActivityStub;
import io.airbyte.persistence.job.models.IntegrationLauncherConfig;
import io.airbyte.persistence.job.models.JobRunConfig;
import io.airbyte.workers.models.RefreshSchemaActivityInput;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@

package io.airbyte.workers.temporal.stubs;

import io.airbyte.commons.temporal.annotations.TemporalActivityStub;
import io.airbyte.commons.temporal.exception.RetryableException;
import io.airbyte.micronaut.temporal.annotations.TemporalActivityStub;

@SuppressWarnings("PMD.UnusedPrivateField")
public class InvalidTestWorkflowImpl implements TestWorkflow {
Expand Down
Loading

0 comments on commit d51b9b3

Please sign in to comment.