Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: make it possible to filter message by origin #223

Merged
merged 4 commits into from
Feb 26, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@
package akka.javasdk.testkit;

import akka.javasdk.Metadata;
import akka.javasdk.timedaction.TimedAction;
import akka.javasdk.testkit.impl.TimedActionResultImpl;
import akka.javasdk.testkit.impl.TestKitCommandContextTimed;
import akka.javasdk.testkit.impl.TimedActionResultImpl;
import akka.javasdk.timedaction.TimedAction;

import java.util.Optional;
import java.util.function.Function;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,5 @@ class AbstractTestKitContext(mockRegistry: MockRegistry) extends Context with In
.getOrElse(throw new NoSuchElementException(
s"Could not find mock for component of type $serviceClass. Hint: use ${classOf[MockRegistry].getName} to provide an instance when testing services calling other components."))

override def selfRegion(): String = ""
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ final class TestKitEventSourcedEntityCommandContext(
override val commandName: String = "stubCommandName",
override val sequenceNumber: Long = 0L,
override val isDeleted: Boolean = false,
override val selfRegion: String = "",
override val metadata: Metadata = Metadata.EMPTY)
extends CommandContext
with InternalContext {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,5 @@ import akka.javasdk.eventsourcedentity.EventContext
final class TestKitEventSourcedEntityEventContext extends EventContext {
override def entityId = "testkit-entity-id"
override def sequenceNumber = 0L
override def selfRegion: String = ""
}
6 changes: 5 additions & 1 deletion akka-javasdk/src/main/java/akka/javasdk/Context.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,8 @@
package akka.javasdk;

/** Root class of all contexts. */
public interface Context {}
public interface Context {

/** Returns the region where this instance is running. */
String selfRegion();
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Context is ubiquitous. All contexts inherit from it. By adding the selfRegion here we make it possible to always check 'where' we are.

This includes component creation contexts, command contexts, http request context and consumer messages (both consumers and views).

}
54 changes: 54 additions & 0 deletions akka-javasdk/src/main/java/akka/javasdk/OriginAwareContext.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Copyright (C) 2021-2024 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.javasdk;

import akka.javasdk.annotations.Consume;
import akka.javasdk.eventsourcedentity.EventSourcedEntity;
import akka.javasdk.keyvalueentity.KeyValueEntity;

import java.util.Optional;

public interface OriginAwareContext extends Context {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MessageContext (Consumer) and UpdateContext (Views) now extends OriginAwareContext which gives access to the originRegion and method hasLocalOrigin.



/**
* When available, this method returns the region where a message was first created.
*
* <ul>
* <li>It returns a non-empty Optional when consuming events from an {@link EventSourcedEntity}
* or a change updates from a {@link KeyValueEntity}</li>
* <li>It returns an empty Optional when consuming messages from a topic (see {@link Consume.FromTopic})
* or from another service (see {@link Consume.FromServiceStream})</li>
* </ul>
*
* @return the region where a message was first created. When not applicable, it returns an empty Optional.
*/
Optional<String> originRegion();
Copy link
Contributor

@aludwiko aludwiko Feb 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mentioned this on the runtime side, but maybe I should do it here. In my opinion, it feels more natural to get the originRegion information from the metadata. We can add a helper method extract it, similar to subject, time, etc., I guess there was a reason to make it this way, what was that?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hasLocalOrigin is nice, but if that is the only reason, I think users can create such a method themselves if they need it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the implementation with meta will also be much simpler because we are passing this information through all the layers from the runtime to the sdk.

Copy link
Member Author

@octonato octonato Feb 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reason why I didn't add it to the metadata is because we used to also have JWT and ACL there and we stepped away from it. JWT and ACL is now only present on the endpoints request, which makes more sense.

Also, metadata is available in places where it doesn't make sense to have the originRegion, for example in commands. We should have it only when consuming messages.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

About hasLocalOrigin, the method impl is quite simple and indeed anyone can implement this logic, but there is some hidden knowledge in there that otherwise we will need to document.

  1. empty means we are consuming from a broker or service-to-service, so not local
    (we will have to document this)
  2. for non-empty origins, it needs to match selfRegion, so it depends
    (this is easy and kind of obvious)
  3. in dev-mode, both will be "" and therefore local by definition
    (this is also hidden logic that this method is encapsulating)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, metadata is available in places where it doesn't make sense to have the originRegion, for example in commands. We should have it only when consuming messages.

Good point, but that implies that we have a leaking abstraction because I can run metadata.asCloudEvent in my command handler, which doesn't make any sense. Maybe we should have a separate CommandMetadata, Message/EventMetadata ?


/**
* Returns {@code true} if this message originated in the same region where it is currently being processed.
* A message is considered to have originated in a region if it was created in that region.
* In all other regions, the same message is treated as a replication.
*
* <p>For messages coming from Akka entities:
*
* <ul>
* <li>If the message is an event from an {@link EventSourcedEntity} or a change update from a
* {@link KeyValueEntity}, it returns {@code true} if it was originated in the region where this consumer is
* running. Otherwise, it returns {@code false}.
* <li>This method will always return {@code false} when consuming messages from another service
* (see {@link Consume.FromServiceStream}) or from a topic (see {@link Consume.FromTopic}).
* </ul>
*
* @return {@code true} if the message originated in the current processing region, {@code false} otherwise
*/
default boolean hasLocalOrigin() {
// empty means we are consuming from a broker or service-to-service
// for non-empty origins, it needs to match selfRegion
// in dev-mode, both will be "" and therefore always considered as local
return originRegion().stream().allMatch(orig -> orig.equals(selfRegion()));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@

import akka.javasdk.CloudEvent;
import akka.javasdk.MetadataContext;
import akka.javasdk.OriginAwareContext;
import akka.javasdk.Tracing;
import io.opentelemetry.api.trace.Tracer;

import java.util.Optional;

/** Context for an incoming message. */
public interface MessageContext extends MetadataContext {
public interface MessageContext extends MetadataContext, OriginAwareContext {

/**
* The origin subject of the {@link CloudEvent}. For example, the entity id when the event was
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@

import akka.javasdk.CloudEvent;
import akka.javasdk.MetadataContext;
import akka.javasdk.OriginAwareContext;

import java.util.Optional;

/** Context for view update calls. */
public interface UpdateContext extends MetadataContext {
public interface UpdateContext extends MetadataContext, OriginAwareContext {

/**
* The origin subject of the {@link CloudEvent}. For example, the entity id when the event was
Expand Down
18 changes: 14 additions & 4 deletions akka-javasdk/src/main/scala/akka/javasdk/impl/SdkRunner.scala
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ import akka.runtime.sdk.spi.ConsumerDescriptor
import akka.runtime.sdk.spi.EventSourcedEntityDescriptor
import akka.runtime.sdk.spi.GrpcEndpointRequestConstructionContext
import akka.runtime.sdk.spi.HttpEndpointConstructionContext
import akka.runtime.sdk.spi.RegionInfo
import akka.runtime.sdk.spi.RemoteIdentification
import akka.runtime.sdk.spi.SpiComponents
import akka.runtime.sdk.spi.SpiDevModeSettings
Expand Down Expand Up @@ -172,12 +173,12 @@ class SdkRunner private (dependencyProvider: Option[DependencyProvider], disable
ApplicationConfig(startContext.system).overrideConfig(applicationConfig)
val app = new Sdk(
startContext.system,
startContext.sdkDispatcherName,
startContext.executionContext,
startContext.materializer,
startContext.componentClients,
startContext.remoteIdentification,
startContext.tracerFactory,
startContext.regionInfo,
dependencyProvider,
disabledComponents,
startedPromise,
Expand Down Expand Up @@ -318,12 +319,12 @@ private[javasdk] object Sdk {
@InternalApi
private final class Sdk(
system: ActorSystem[_],
sdkDispatcherName: String,
sdkExecutionContext: ExecutionContext,
sdkMaterializer: Materializer,
runtimeComponentClients: ComponentClients,
remoteIdentification: Option[RemoteIdentification],
tracerFactory: String => Tracer,
regionInfo: RegionInfo,
dependencyProviderOverride: Option[DependencyProvider],
disabledComponents: Set[Class[_]],
startedPromise: Promise[StartupContext],
Expand Down Expand Up @@ -411,6 +412,7 @@ private final class Sdk(
timerClient = runtimeComponentClients.timerClient,
sdkExecutionContext,
sdkTracerFactory,
regionInfo,
{ context =>

val workflow = wiredInstance(clz) {
Expand Down Expand Up @@ -486,6 +488,7 @@ private final class Sdk(
serializer,
ComponentDescriptor.descriptorFor(clz, serializer),
entityStateType,
regionInfo,
context =>
wiredInstance(clz.asInstanceOf[Class[EventSourcedEntity[AnyRef, AnyRef]]]) {
// remember to update component type API doc and docs if changing the set of injectables
Expand Down Expand Up @@ -522,6 +525,7 @@ private final class Sdk(
serializer,
ComponentDescriptor.descriptorFor(clz, serializer),
entityStateType,
regionInfo,
context =>
wiredInstance(clz.asInstanceOf[Class[KeyValueEntity[AnyRef]]]) {
// remember to update component type API doc and docs if changing the set of injectables
Expand Down Expand Up @@ -567,6 +571,7 @@ private final class Sdk(
sdkExecutionContext,
sdkTracerFactory,
serializer,
regionInfo,
ComponentDescriptor.descriptorFor(timedActionClass, serializer))
timedActionDescriptors :+=
new TimedActionDescriptor(componentId, clz.getName, timedActionSpi)
Expand All @@ -587,7 +592,8 @@ private final class Sdk(
sdkTracerFactory,
serializer,
ComponentDescriptorFactory.findIgnore(consumerClass),
ComponentDescriptor.descriptorFor(consumerClass, serializer))
ComponentDescriptor.descriptorFor(consumerClass, serializer),
regionInfo)
consumerDescriptors :+=
new ConsumerDescriptor(
componentId,
Expand All @@ -597,7 +603,7 @@ private final class Sdk(
consumerSpi)

case clz if classOf[View].isAssignableFrom(clz) =>
viewDescriptors :+= ViewDescriptorFactory(clz, serializer, sdkExecutionContext)
viewDescriptors :+= ViewDescriptorFactory(clz, serializer, regionInfo, sdkExecutionContext)

case clz if Reflect.isRestEndpoint(clz) =>
// handled separately because ComponentId is not mandatory
Expand Down Expand Up @@ -746,6 +752,8 @@ private final class Sdk(
context.requestHeaders.allHeaders.asInstanceOf[Seq[HttpHeader]].asJava

override def tracing(): Tracing = new SpanTracingImpl(context.openTelemetrySpan, sdkTracerFactory)

override def selfRegion(): String = regionInfo.selfRegion
}
val instance = wiredInstance(httpEndpointClass) {
sideEffectingComponentInjects(context.openTelemetrySpan).orElse {
Expand Down Expand Up @@ -777,6 +785,8 @@ private final class Sdk(
override def metadata(): Metadata = new JavaMetadataImpl(context.metadata)

override def tracing(): Tracing = new SpanTracingImpl(context.openTelemetrySpan, sdkTracerFactory)

override def selfRegion(): String = regionInfo.selfRegion
}

val instance = wiredInstance(grpcEndpointClass) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import java.util.Optional

import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.jdk.OptionConverters.RichOption
import scala.util.control.NonFatal

import akka.actor.ActorSystem
Expand All @@ -34,6 +35,7 @@ import akka.javasdk.impl.timer.TimerSchedulerImpl
import akka.javasdk.timer.TimerScheduler
import akka.runtime.sdk.spi.BytesPayload
import akka.runtime.sdk.spi.ConsumerDestination
import akka.runtime.sdk.spi.RegionInfo
import akka.runtime.sdk.spi.SpiConsumer
import akka.runtime.sdk.spi.SpiConsumer.Effect
import akka.runtime.sdk.spi.SpiConsumer.Message
Expand All @@ -58,7 +60,8 @@ private[impl] final class ConsumerImpl[C <: Consumer](
tracerFactory: () => Tracer,
serializer: JsonSerializer,
ignoreUnknown: Boolean,
componentDescriptor: ComponentDescriptor)
componentDescriptor: ComponentDescriptor,
regionInfo: RegionInfo)
extends SpiConsumer {

private val log: Logger = LoggerFactory.getLogger(consumerClass)
Expand All @@ -76,12 +79,21 @@ private[impl] final class ConsumerImpl[C <: Consumer](
// FIXME would be good if we could record the chosen method in the span
val span: Option[Span] =
traceInstrumentation.buildSpan(ComponentType.Consumer, componentId, metadata.subjectScala, message.metadata)

val updatedMetadata = span.map(metadata.withTracing).getOrElse(metadata)

span.foreach(s => MDC.put(Telemetry.TRACE_ID, s.getSpanContext.getTraceId))
val fut =
try {
val messageContext = new MessageContextImpl(updatedMetadata, timerClient, tracerFactory, span)
val messageContext =
new MessageContextImpl(
updatedMetadata,
timerClient,
tracerFactory,
span,
regionInfo.selfRegion,
message.originRegion.toJava)

val payload: BytesPayload = message.payload.getOrElse(throw new IllegalArgumentException("No message payload"))
val effect = createRouter()
.handleCommand(MessageEnvelope.of(payload, messageContext.metadata), messageContext)
Expand Down Expand Up @@ -153,7 +165,9 @@ private[impl] final class MessageContextImpl(
override val metadata: Metadata,
timerClient: TimerClient,
tracerFactory: () => Tracer,
val span: Option[Span])
val span: Option[Span],
override val selfRegion: String,
override val originRegion: Optional[String])
extends AbstractContext
with MessageContext {

Expand All @@ -175,4 +189,5 @@ private[impl] final class MessageContextImpl(
}

override def tracing(): Tracing = new SpanTracingImpl(span, tracerFactory)

}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import akka.javasdk.impl.telemetry.SpanTracingImpl
import akka.javasdk.impl.telemetry.Telemetry
import akka.javasdk.impl.telemetry.TraceInstrumentation
import akka.runtime.sdk.spi.BytesPayload
import akka.runtime.sdk.spi.RegionInfo
import akka.runtime.sdk.spi.SpiEntity
import akka.runtime.sdk.spi.SpiEventSourcedEntity
import akka.runtime.sdk.spi.SpiMetadata
Expand All @@ -53,6 +54,7 @@ private[impl] object EventSourcedEntityImpl {
override val sequenceNumber: Long,
override val commandName: String,
override val isDeleted: Boolean,
override val selfRegion: String,
override val metadata: Metadata,
span: Option[Span],
tracerFactory: () => Tracer)
Expand All @@ -64,12 +66,15 @@ private[impl] object EventSourcedEntityImpl {
override def commandId(): Long = 0
}

private class EventSourcedEntityContextImpl(override final val entityId: String)
private class EventSourcedEntityContextImpl(override final val entityId: String, override val selfRegion: String)
extends AbstractContext
with EventSourcedEntityContext

private final class EventContextImpl(entityId: String, override val sequenceNumber: Long)
extends EventSourcedEntityContextImpl(entityId)
private final class EventContextImpl(
entityId: String,
override val sequenceNumber: Long,
override val selfRegion: String)
extends EventSourcedEntityContextImpl(entityId, selfRegion)
with EventContext

}
Expand All @@ -86,14 +91,15 @@ private[impl] final class EventSourcedEntityImpl[S, E, ES <: EventSourcedEntity[
serializer: JsonSerializer,
componentDescriptor: ComponentDescriptor,
entityStateType: Class[S],
regionInfo: RegionInfo,
factory: EventSourcedEntityContext => ES)
extends SpiEventSourcedEntity {
import EventSourcedEntityImpl._

private val traceInstrumentation = new TraceInstrumentation(componentId, EventSourcedEntityCategory, tracerFactory)

private val router: ReflectiveEventSourcedEntityRouter[AnyRef, AnyRef, EventSourcedEntity[AnyRef, AnyRef]] = {
val context = new EventSourcedEntityContextImpl(entityId)
val context = new EventSourcedEntityContextImpl(entityId, regionInfo.selfRegion)
new ReflectiveEventSourcedEntityRouter[S, E, ES](factory(context), componentDescriptor.methodInvokers, serializer)
.asInstanceOf[ReflectiveEventSourcedEntityRouter[AnyRef, AnyRef, EventSourcedEntity[AnyRef, AnyRef]]]
}
Expand All @@ -120,6 +126,7 @@ private[impl] final class EventSourcedEntityImpl[S, E, ES <: EventSourcedEntity[
command.sequenceNumber,
command.name,
command.isDeleted,
regionInfo.selfRegion,
metadata,
span,
tracerFactory)
Expand Down Expand Up @@ -221,7 +228,7 @@ private[impl] final class EventSourcedEntityImpl[S, E, ES <: EventSourcedEntity[
state: SpiEventSourcedEntity.State,
event: AnyRef,
sequenceNumber: Long): SpiEventSourcedEntity.State = {
val eventContext = new EventContextImpl(entityId, sequenceNumber)
val eventContext = new EventContextImpl(entityId, sequenceNumber, regionInfo.selfRegion)
entity._internalSetEventContext(Optional.of(eventContext))
val clearState = entity._internalSetCurrentState(state, false)
try {
Expand Down
Loading