Skip to content

Commit

Permalink
feat: add stashing and refactor code base (Tochemey#113)
Browse files Browse the repository at this point in the history
  • Loading branch information
Tochemey authored Sep 9, 2023
1 parent 9831db4 commit 9df7abf
Show file tree
Hide file tree
Showing 45 changed files with 1,324 additions and 690 deletions.
22 changes: 6 additions & 16 deletions Earthfile
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ RUN go install connectrpc.com/connect/cmd/protoc-gen-connect-go@latest
RUN apk --no-cache add git ca-certificates gcc musl-dev libc-dev binutils-gold

pbs:
BUILD +internal-pb
BUILD +protogen
BUILD +sample-pb

Expand Down Expand Up @@ -65,19 +64,6 @@ local-test:

SAVE ARTIFACT coverage.out AS LOCAL coverage.out

internal-pb:
# copy the proto files to generate
COPY --dir protos/ ./
COPY buf.work.yaml buf.gen.yaml ./

# generate the pbs
RUN buf generate \
--template buf.gen.yaml \
--path protos/internal/goakt

# save artifact to
SAVE ARTIFACT gen/goakt AS LOCAL internal/goakt

protogen:
# copy the proto files to generate
COPY --dir protos/ ./
Expand All @@ -86,10 +72,14 @@ protogen:
# generate the pbs
RUN buf generate \
--template buf.gen.yaml \
--path protos/public/address
--path protos/goakt/address \
--path protos/goakt/messages \
--path protos/goakt/internal

# save artifact to
SAVE ARTIFACT gen/address AS LOCAL pb
SAVE ARTIFACT gen/address AS LOCAL pb/address
SAVE ARTIFACT gen/messages AS LOCAL pb/messages
SAVE ARTIFACT gen/internal AS LOCAL internal

testprotogen:
# copy the proto files to generate
Expand Down
39 changes: 21 additions & 18 deletions actors/actor_system.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@ import (
"github.com/pkg/errors"
"github.com/tochemey/goakt/cluster"
"github.com/tochemey/goakt/discovery"
goaktpb "github.com/tochemey/goakt/internal/goakt/v1"
"github.com/tochemey/goakt/internal/goakt/v1/goaktv1connect"
internalpb "github.com/tochemey/goakt/internal/v1"
"github.com/tochemey/goakt/internal/v1/internalpbconnect"
"github.com/tochemey/goakt/log"
pb "github.com/tochemey/goakt/pb/v1"
addresspb "github.com/tochemey/goakt/pb/address/v1"
"github.com/tochemey/goakt/pkg/resync"
"github.com/tochemey/goakt/telemetry"
"go.opentelemetry.io/otel/attribute"
Expand Down Expand Up @@ -58,12 +58,12 @@ type ActorSystem interface {
// RemoteActor returns the address of a remote actor when cluster is enabled
// When the cluster mode is not enabled an actor not found error will be returned
// One can always check whether cluster is enabled before calling this method or just use the ActorOf method.
RemoteActor(ctx context.Context, actorName string) (addr *pb.Address, err error)
RemoteActor(ctx context.Context, actorName string) (addr *addresspb.Address, err error)
// ActorOf returns an existing actor in the local system or in the cluster when clustering is enabled
// When cluster mode is activated, the PID will be nil.
// When remoting is enabled this method will return and error
// An actor not found error is return when the actor is not found.
ActorOf(ctx context.Context, actorName string) (addr *pb.Address, pid PID, err error)
ActorOf(ctx context.Context, actorName string) (addr *addresspb.Address, pid PID, err error)
// InCluster states whether the actor system is running within a cluster of nodes
InCluster() bool
// GetPartition returns the partition where a given actor is located
Expand All @@ -78,7 +78,7 @@ type ActorSystem interface {
// ActorSystem represent a collection of actors on a given node
// Only a single instance of the ActorSystem can be created on a given node
type actorSystem struct {
goaktv1connect.UnimplementedRemoteMessagingServiceHandler
internalpbconnect.UnimplementedRemoteMessagingServiceHandler

// map of actors in the system
actors cmp.ConcurrentMap[string, PID]
Expand Down Expand Up @@ -127,14 +127,16 @@ type actorSystem struct {
partitionsCount uint64
// cluster mode
cluster cluster.Interface
clusterChan chan *goaktpb.WireActor
clusterChan chan *internalpb.WireActor

// help protect some the fields to set
mu sync.Mutex
// specifies actors mailbox size
mailboxSize uint64
// specifies the mailbox to use for the actors
mailbox Mailbox
// specifies the stash buffer
stashBuffer uint64
}

// enforce compilation error when all methods of the ActorSystem interface are not implemented
Expand All @@ -159,7 +161,7 @@ func NewActorSystem(name string, opts ...Option) (ActorSystem, error) {
actors: cmp.New[PID](),
hasStarted: atomic.NewBool(false),
typesLoader: NewTypesLoader(),
clusterChan: make(chan *goaktpb.WireActor, 10),
clusterChan: make(chan *internalpb.WireActor, 10),
name: name,
logger: log.DefaultLogger,
expireActorAfter: DefaultPassivationTimeout,
Expand Down Expand Up @@ -250,6 +252,7 @@ func (x *actorSystem) Spawn(ctx context.Context, name string, actor Actor) PID {
withSupervisorStrategy(x.supervisorStrategy),
withMailboxSize(x.mailboxSize),
withMailbox(x.mailbox),
withStash(x.stashBuffer),
withTelemetry(x.telemetry))

// add the given actor to the actor map
Expand All @@ -261,7 +264,7 @@ func (x *actorSystem) Spawn(ctx context.Context, name string, actor Actor) PID {
// when cluster is enabled replicate the actor metadata across the cluster
if x.clusterEnabled.Load() {
// send it to the cluster channel a wire actor
x.clusterChan <- &goaktpb.WireActor{
x.clusterChan <- &internalpb.WireActor{
ActorName: name,
ActorAddress: actorPath.RemoteAddress(),
ActorPath: actorPath.String(),
Expand Down Expand Up @@ -358,7 +361,7 @@ func (x *actorSystem) Actors() []PID {
// When cluster mode is activated, the PID will be nil.
// When remoting is enabled this method will return and error
// An actor not found error is return when the actor is not found.
func (x *actorSystem) ActorOf(ctx context.Context, actorName string) (addr *pb.Address, pid PID, err error) {
func (x *actorSystem) ActorOf(ctx context.Context, actorName string) (addr *addresspb.Address, pid PID, err error) {
// acquire the lock
x.mu.Lock()
// release the lock
Expand Down Expand Up @@ -431,7 +434,7 @@ func (x *actorSystem) LocalActor(ctx context.Context, actorName string) (PID, er
// RemoteActor returns the address of a remote actor when cluster is enabled
// When the cluster mode is not enabled an actor not found error will be returned
// One can always check whether cluster is enabled before calling this method or just use the ActorOf method.
func (x *actorSystem) RemoteActor(ctx context.Context, actorName string) (addr *pb.Address, err error) {
func (x *actorSystem) RemoteActor(ctx context.Context, actorName string) (addr *addresspb.Address, err error) {
// acquire the lock
x.mu.Lock()
// release the lock
Expand Down Expand Up @@ -553,7 +556,7 @@ func (x *actorSystem) Stop(ctx context.Context) error {
}

// RemoteLookup for an actor on a remote host.
func (x *actorSystem) RemoteLookup(ctx context.Context, request *connect.Request[goaktpb.RemoteLookupRequest]) (*connect.Response[goaktpb.RemoteLookupResponse], error) {
func (x *actorSystem) RemoteLookup(ctx context.Context, request *connect.Request[internalpb.RemoteLookupRequest]) (*connect.Response[internalpb.RemoteLookupResponse], error) {
// add a span context
ctx, span := telemetry.SpanContext(ctx, "RemoteLookup")
defer span.End()
Expand Down Expand Up @@ -597,13 +600,13 @@ func (x *actorSystem) RemoteLookup(ctx context.Context, request *connect.Request
// let us construct the address
addr := pid.ActorPath().RemoteAddress()

return connect.NewResponse(&goaktpb.RemoteLookupResponse{Address: addr}), nil
return connect.NewResponse(&internalpb.RemoteLookupResponse{Address: addr}), nil
}

// RemoteAsk is used to send a message to an actor remotely and expect a response
// immediately. With this type of message the receiver cannot communicate back to Sender
// except reply the message with a response. This one-way communication
func (x *actorSystem) RemoteAsk(ctx context.Context, request *connect.Request[goaktpb.RemoteAskRequest]) (*connect.Response[goaktpb.RemoteAskResponse], error) {
func (x *actorSystem) RemoteAsk(ctx context.Context, request *connect.Request[internalpb.RemoteAskRequest]) (*connect.Response[internalpb.RemoteAskResponse], error) {
// add a span context
ctx, span := telemetry.SpanContext(ctx, "RemoteAsk")
defer span.End()
Expand Down Expand Up @@ -659,13 +662,13 @@ func (x *actorSystem) RemoteAsk(ctx context.Context, request *connect.Request[go
}
// let us marshal the reply
marshaled, _ := anypb.New(reply)
return connect.NewResponse(&goaktpb.RemoteAskResponse{Message: marshaled}), nil
return connect.NewResponse(&internalpb.RemoteAskResponse{Message: marshaled}), nil
}

// RemoteTell is used to send a message to an actor remotely by another actor
// This is the only way remote actors can interact with each other. The actor on the
// other line can reply to the sender by using the Sender in the message
func (x *actorSystem) RemoteTell(ctx context.Context, request *connect.Request[goaktpb.RemoteTellRequest]) (*connect.Response[goaktpb.RemoteTellResponse], error) {
func (x *actorSystem) RemoteTell(ctx context.Context, request *connect.Request[internalpb.RemoteTellRequest]) (*connect.Response[internalpb.RemoteTellResponse], error) {
// add a span context
ctx, span := telemetry.SpanContext(ctx, "RemoteTell")
defer span.End()
Expand Down Expand Up @@ -722,7 +725,7 @@ func (x *actorSystem) RemoteTell(ctx context.Context, request *connect.Request[g
logger.Error(ErrRemoteSendFailure(err))
return nil, ErrRemoteSendFailure(err)
}
return connect.NewResponse(new(goaktpb.RemoteTellResponse)), nil
return connect.NewResponse(new(internalpb.RemoteTellResponse)), nil
}

// registerMetrics register the PID metrics with OTel instrumentation.
Expand Down Expand Up @@ -832,7 +835,7 @@ func (x *actorSystem) enableRemoting(ctx context.Context) {
// create a http service mux
mux := http.NewServeMux()
// create the resource and handler
path, handler := goaktv1connect.NewRemoteMessagingServiceHandler(
path, handler := internalpbconnect.NewRemoteMessagingServiceHandler(
x,
connect.WithInterceptors(interceptor(x.telemetry.TracerProvider, x.telemetry.MeterProvider)),
)
Expand Down
2 changes: 1 addition & 1 deletion actors/actor_system_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/tochemey/goakt/discovery"
"github.com/tochemey/goakt/log"
addresspb "github.com/tochemey/goakt/pb/v1"
addresspb "github.com/tochemey/goakt/pb/address/v1"
testpb "github.com/tochemey/goakt/test/data/pb/v1"
testkit "github.com/tochemey/goakt/testkit/discovery"
"github.com/travisjeffery/go-dynaport"
Expand Down
38 changes: 38 additions & 0 deletions actors/actor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,3 +196,41 @@ func (e *Exchanger) PostStop(context.Context) error {
}

var _ Actor = &Exchanger{}

type Stasher struct{}

func (x *Stasher) PreStart(context.Context) error {
return nil
}

func (x *Stasher) Receive(ctx ReceiveContext) {
switch ctx.Message().(type) {
case *testspb.TestStash:
ctx.Become(x.Ready)
ctx.Stash()
case *testspb.TestLogin:
case *testspb.TestBye:
_ = ctx.Self().Shutdown(ctx.Context())
}
}

func (x *Stasher) Ready(ctx ReceiveContext) {
switch ctx.Message().(type) {
case *testspb.TestStash:
case *testspb.TestLogin:
ctx.Stash()
case *testspb.TestSend:
// do nothing
case *testspb.TestUnstashAll:
ctx.UnBecome()
ctx.UnstashAll()
case *testspb.TestUnstash:
ctx.Unstash()
}
}

func (x *Stasher) PostStop(context.Context) error {
return nil
}

var _ Actor = &Stasher{}
32 changes: 16 additions & 16 deletions actors/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ import (

"connectrpc.com/connect"
"connectrpc.com/otelconnect"
goaktpb "github.com/tochemey/goakt/internal/goakt/v1"
"github.com/tochemey/goakt/internal/goakt/v1/goaktv1connect"
pb "github.com/tochemey/goakt/pb/v1"
internalpb "github.com/tochemey/goakt/internal/v1"
"github.com/tochemey/goakt/internal/v1/internalpbconnect"
addresspb "github.com/tochemey/goakt/pb/address/v1"
"github.com/tochemey/goakt/pkg/http"
"github.com/tochemey/goakt/telemetry"
"google.golang.org/protobuf/proto"
Expand Down Expand Up @@ -38,7 +38,7 @@ func Ask(ctx context.Context, to PID, message proto.Message, timeout time.Durati

// set the actual message
switch msg := message.(type) {
case *goaktpb.RemoteMessage:
case *internalpb.RemoteMessage:
// define the actual message variable
var actual proto.Message
// unmarshal the message and handle the error
Expand Down Expand Up @@ -99,7 +99,7 @@ func Tell(ctx context.Context, to PID, message proto.Message) error {

// set the actual message
switch msg := message.(type) {
case *goaktpb.RemoteMessage:
case *internalpb.RemoteMessage:
var (
// define the actual message variable
actual proto.Message
Expand Down Expand Up @@ -134,7 +134,7 @@ func Tell(ctx context.Context, to PID, message proto.Message) error {
}

// RemoteTell sends a message to an actor remotely without expecting any reply
func RemoteTell(ctx context.Context, to *pb.Address, message proto.Message) error {
func RemoteTell(ctx context.Context, to *addresspb.Address, message proto.Message) error {
// add a span context
ctx, span := telemetry.SpanContext(ctx, "RemoteTell")
defer span.End()
Expand All @@ -146,15 +146,15 @@ func RemoteTell(ctx context.Context, to *pb.Address, message proto.Message) erro
}

// create an instance of remote client service
remoteClient := goaktv1connect.NewRemoteMessagingServiceClient(
remoteClient := internalpbconnect.NewRemoteMessagingServiceClient(
http.Client(),
http.URL(to.GetHost(), int(to.GetPort())),
connect.WithInterceptors(otelconnect.NewInterceptor()),
connect.WithGRPC(),
)
// prepare the rpcRequest to send
request := connect.NewRequest(&goaktpb.RemoteTellRequest{
RemoteMessage: &goaktpb.RemoteMessage{
request := connect.NewRequest(&internalpb.RemoteTellRequest{
RemoteMessage: &internalpb.RemoteMessage{
Sender: RemoteNoSender,
Receiver: to,
Message: marshaled,
Expand All @@ -168,7 +168,7 @@ func RemoteTell(ctx context.Context, to *pb.Address, message proto.Message) erro
}

// RemoteAsk sends a synchronous message to another actor remotely and expect a response.
func RemoteAsk(ctx context.Context, to *pb.Address, message proto.Message) (response *anypb.Any, err error) {
func RemoteAsk(ctx context.Context, to *addresspb.Address, message proto.Message) (response *anypb.Any, err error) {
// add a span context
ctx, span := telemetry.SpanContext(ctx, "RemoteAsk")
defer span.End()
Expand All @@ -180,15 +180,15 @@ func RemoteAsk(ctx context.Context, to *pb.Address, message proto.Message) (resp
}

// create an instance of remote client service
remoteClient := goaktv1connect.NewRemoteMessagingServiceClient(
remoteClient := internalpbconnect.NewRemoteMessagingServiceClient(
http.Client(),
http.URL(to.GetHost(), int(to.GetPort())),
connect.WithInterceptors(otelconnect.NewInterceptor()),
connect.WithGRPC(),
)
// prepare the rpcRequest to send
rpcRequest := connect.NewRequest(&goaktpb.RemoteAskRequest{
RemoteMessage: &goaktpb.RemoteMessage{
rpcRequest := connect.NewRequest(&internalpb.RemoteAskRequest{
RemoteMessage: &internalpb.RemoteMessage{
Sender: RemoteNoSender,
Receiver: to,
Message: marshaled,
Expand All @@ -205,21 +205,21 @@ func RemoteAsk(ctx context.Context, to *pb.Address, message proto.Message) (resp
}

// RemoteLookup look for an actor address on a remote node.
func RemoteLookup(ctx context.Context, host string, port int, name string) (addr *pb.Address, err error) {
func RemoteLookup(ctx context.Context, host string, port int, name string) (addr *addresspb.Address, err error) {
// add a span context
ctx, span := telemetry.SpanContext(ctx, "RemoteLookup")
defer span.End()

// create an instance of remote client service
remoteClient := goaktv1connect.NewRemoteMessagingServiceClient(
remoteClient := internalpbconnect.NewRemoteMessagingServiceClient(
http.Client(),
http.URL(host, port),
connect.WithInterceptors(otelconnect.NewInterceptor()),
connect.WithGRPC(),
)

// prepare the request to send
request := connect.NewRequest(&goaktpb.RemoteLookupRequest{
request := connect.NewRequest(&internalpb.RemoteLookupRequest{
Host: host,
Port: int32(port),
Name: name,
Expand Down
Loading

0 comments on commit 9df7abf

Please sign in to comment.