Skip to content

Commit

Permalink
feat: make it possible to filter message by origin
Browse files Browse the repository at this point in the history
  • Loading branch information
octonato committed Feb 17, 2025
1 parent 6c531a7 commit 16457cb
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import akka.javasdk.CloudEvent;
import akka.javasdk.MetadataContext;
import akka.javasdk.Tracing;
import io.opentelemetry.api.trace.Tracer;

import java.util.Optional;

Expand All @@ -22,4 +21,21 @@ public interface MessageContext extends MetadataContext {

/** Access to tracing for custom app specific tracing. */
Tracing tracing();
/**
* Returns true if this message was originated on the same region that is being processed.
*
* <p>For messages coming from Akka entities:
*
* <ul>
* <li>If the message is an event from an Event Sourced Entity, it returns true if the event was
* originated in the region where this consumer is running. Otherwise, it returns false.
* <li>If the message is a change update from a KeyValueEntity, it returns true if the update
* was originated in the region where this consumer is running. Otherwise, it returns false.
* <li>For messages coming from any broker (e.g., Kafka or Google PubSub), this method always
* returns false.
* </ul>
*
* @return true if the message was originated in the same region, false otherwise.
*/
boolean hasLocalOrigin();
}
8 changes: 5 additions & 3 deletions akka-javasdk/src/main/scala/akka/javasdk/impl/SdkRunner.scala
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ import akka.runtime.sdk.spi.ComponentClients
import akka.runtime.sdk.spi.ConsumerDescriptor
import akka.runtime.sdk.spi.EventSourcedEntityDescriptor
import akka.runtime.sdk.spi.HttpEndpointConstructionContext
import akka.runtime.sdk.spi.RegionInfo
import akka.runtime.sdk.spi.RemoteIdentification
import akka.runtime.sdk.spi.SpiComponents
import akka.runtime.sdk.spi.SpiDevModeSettings
Expand Down Expand Up @@ -166,12 +167,12 @@ class SdkRunner private (dependencyProvider: Option[DependencyProvider], disable
ApplicationConfig(startContext.system).overrideConfig(applicationConfig)
val app = new Sdk(
startContext.system,
startContext.sdkDispatcherName,
startContext.executionContext,
startContext.materializer,
startContext.componentClients,
startContext.remoteIdentification,
startContext.tracerFactory,
startContext.regionInfo,
dependencyProvider,
disabledComponents,
startedPromise,
Expand Down Expand Up @@ -297,12 +298,12 @@ private[javasdk] object Sdk {
@InternalApi
private final class Sdk(
system: ActorSystem[_],
sdkDispatcherName: String,
sdkExecutionContext: ExecutionContext,
sdkMaterializer: Materializer,
runtimeComponentClients: ComponentClients,
remoteIdentification: Option[RemoteIdentification],
tracerFactory: String => Tracer,
regionInfo: RegionInfo,
dependencyProviderOverride: Option[DependencyProvider],
disabledComponents: Set[Class[_]],
startedPromise: Promise[StartupContext],
Expand Down Expand Up @@ -551,7 +552,8 @@ private final class Sdk(
sdkTracerFactory,
serializer,
ComponentDescriptorFactory.findIgnore(consumerClass),
ComponentDescriptor.descriptorFor(consumerClass, serializer))
ComponentDescriptor.descriptorFor(consumerClass, serializer),
regionInfo)
consumerDescriptors :+=
new ConsumerDescriptor(
componentId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import akka.javasdk.impl.timer.TimerSchedulerImpl
import akka.javasdk.timer.TimerScheduler
import akka.runtime.sdk.spi.BytesPayload
import akka.runtime.sdk.spi.ConsumerDestination
import akka.runtime.sdk.spi.RegionInfo
import akka.runtime.sdk.spi.SpiConsumer
import akka.runtime.sdk.spi.SpiConsumer.Effect
import akka.runtime.sdk.spi.SpiConsumer.Message
Expand All @@ -58,7 +59,8 @@ private[impl] final class ConsumerImpl[C <: Consumer](
tracerFactory: () => Tracer,
serializer: JsonSerializer,
ignoreUnknown: Boolean,
componentDescriptor: ComponentDescriptor)
componentDescriptor: ComponentDescriptor,
regionInfo: RegionInfo)
extends SpiConsumer {

private val log: Logger = LoggerFactory.getLogger(consumerClass)
Expand All @@ -76,12 +78,21 @@ private[impl] final class ConsumerImpl[C <: Consumer](
// FIXME would be good if we could record the chosen method in the span
val span: Option[Span] =
traceInstrumentation.buildSpan(ComponentType.Consumer, componentId, metadata.subjectScala, message.metadata)

val updatedMetadata = span.map(metadata.withTracing).getOrElse(metadata)

span.foreach(s => MDC.put(Telemetry.TRACE_ID, s.getSpanContext.getTraceId))
val fut =
try {
val messageContext = new MessageContextImpl(updatedMetadata, timerClient, tracerFactory, span)
val messageContext =
new MessageContextImpl(
updatedMetadata,
timerClient,
tracerFactory,
span,
regionInfo.selfRegion,
message.originRegion)

val payload: BytesPayload = message.payload.getOrElse(throw new IllegalArgumentException("No message payload"))
val effect = createRouter()
.handleCommand(MessageEnvelope.of(payload, messageContext.metadata), messageContext)
Expand Down Expand Up @@ -153,7 +164,9 @@ private[impl] final class MessageContextImpl(
override val metadata: Metadata,
timerClient: TimerClient,
tracerFactory: () => Tracer,
val span: Option[Span])
val span: Option[Span],
selfRegion: String,
originRegion: Option[String])
extends AbstractContext
with MessageContext {

Expand All @@ -175,4 +188,12 @@ private[impl] final class MessageContextImpl(
}

override def tracing(): Tracing = new SpanTracingImpl(span, tracerFactory)

override def hasLocalOrigin: Boolean = {
// empty means we are consuming from a broker
// for non-empty origins, it needs to match selfRegion
// in dev-mode, both will be "" and therefore always considered as local
originRegion.contains(selfRegion)
}

}

0 comments on commit 16457cb

Please sign in to comment.