Skip to content

Commit

Permalink
Merge pull request #34 from ytake/fixed/sender-deleted-when-shapshot
Browse files Browse the repository at this point in the history
fixed: the sender is deleted when the snapshot is taken
  • Loading branch information
ytake authored Dec 26, 2024
2 parents fe8f2c7 + 04ef3e8 commit 97bb399
Show file tree
Hide file tree
Showing 4 changed files with 103 additions and 6 deletions.
19 changes: 19 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,25 @@ PHP 8.3 と swooleが必要です。
メッセージのシリアライズには、Protocol Buffersを利用します。
他のシリアライズフォーマットはまだサポートされていません。

```mermaid
sequenceDiagram
participant A as Actor A
participant B as Actor B
participant C as Actor C
A->>B: Message(Data X)
Note over B: B processes the received message, updates internal state,<br> and prepares the next message
B->>C: Message(Data Y)
Note over C: C processes the message from B.<br>Meanwhile, A or other actors can progress with separate tasks.
par Concurrent Execution
A->>C: Another Message(Data Z)
C->>B: Result Message(Computed W)
and
B->>A: State Report(State S)
end
```

## Installation

```bash
Expand Down
9 changes: 7 additions & 2 deletions src/Phluxor/ActorSystem/ActorContext.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
use Phluxor\ActorSystem\Message\MessageEnvelope;
use Phluxor\Metrics\ActorMetrics;
use Phluxor\Metrics\PhluxorMetrics;
use Phluxor\Persistence\Message\RequestSnapshot;
use Phluxor\Stack\SinglyLinkedList;
use Phluxor\Value\ContextExtensionId;
use Phluxor\Value\ExtensionInterface;
Expand Down Expand Up @@ -324,8 +325,12 @@ public function receive(?MessageEnvelope $envelope): void
{
$this->messageOrEnvelope = $envelope;
$this->defaultReceive();
// release message
$this->messageOrEnvelope = null;
if ($envelope !== null) {
$messageType = $envelope->getMessage();
if (!$messageType instanceof RequestSnapshot) {
$this->messageOrEnvelope = null;
}
}
}

public function defaultReceive(): void
Expand Down
18 changes: 15 additions & 3 deletions src/Phluxor/Persistence/Mixin.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@
namespace Phluxor\Persistence;

use Google\Protobuf\Internal\Message;
use Phluxor\ActorSystem\ActorContext;
use Phluxor\ActorSystem\Context\ContextInterface;
use Phluxor\ActorSystem\Context\ReceiverInterface;
use Phluxor\ActorSystem\Context\ReceiverPartInterface;
use Phluxor\ActorSystem\Message\MessageEnvelope;
use Phluxor\ActorSystem\Ref;
use Phluxor\Persistence\Message\OfferSnapshot;
use Phluxor\Persistence\Message\ReplayCompleted;
use Phluxor\Persistence\Message\RequestSnapshot;
Expand Down Expand Up @@ -85,9 +87,19 @@ public function persistenceReceive(Message $message): void
{
$this->providerState?->persistenceEvent($this->name(), $this->eventIndex, $message);
if ($this->eventIndex % $this->providerState?->getSnapshotInterval() === 0) {
$this->receiver->receive(
new MessageEnvelope(header: null, message: new RequestSnapshot())
);
$envelope = new MessageEnvelope(header: null, message: new RequestSnapshot());
if($this->receiver instanceof ActorContext) {
$sender = $this->receiver->sender();
// if the sender is set in the context, do not rewrite the sender
if ($sender instanceof Ref) {
$envelope = new MessageEnvelope(
header: null,
message: new RequestSnapshot(),
sender: $sender
);
}
};
$this->receiver->receive($envelope);
}
$this->eventIndex++;
}
Expand Down
63 changes: 62 additions & 1 deletion tests/Test/Persistence/PersistenceTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public function testReceiveRecovery(): void
});
$props = ActorSystem\Props::fromProducer(fn() => $this->receiveRecoverActor(),
ActorSystem\Props::withReceiverMiddleware(
new EventSourcedBehavior(new InMemoryStateProvider(new InMemoryProvider(2)))
new EventSourcedBehavior(new InMemoryStateProvider(new InMemoryProvider(1)))
));
$ref = $system->root()->spawnNamed($props, 'test.actor');
$this->assertNull($ref->isError());
Expand Down Expand Up @@ -160,4 +160,65 @@ public function receiveRecover(mixed $message): void
}
};
}

// snapshot取得を実行してもSenderが上書きされないことを確認
public function testReceiveRecoverySender(): void
{
run(function () {
go(function () {
$system = ActorSystem::create();
$props = ActorSystem\Props::fromProducer(fn() => $this->receiveRecoverActorSender(),
ActorSystem\Props::withReceiverMiddleware(
new EventSourcedBehavior(new InMemoryStateProvider(new InMemoryProvider(1)))
));
$ref = $system->root()->spawnNamed($props, 'test.actor');
$system->root()->requestFuture($ref->getRef(), new TestMessage(['message' => 'hello3']), 1);
$system->root()->requestFuture($ref->getRef(), new TestMessage(['message' => 'hello3']), 1);
$system->root()->requestFuture($ref->getRef(), new TestMessage(['message' => 'hello3']), 1);
$system->root()->requestFuture($ref->getRef(), new TestMessage(['message' => 'hello3']), 1);
$system->root()->requestFuture($ref->getRef(), new TestMessage(['message' => 'hello3']), 1);
$s = $system->root()->poisonFuture($ref->getRef())?->wait();
$ref = $system->root()->spawnNamed($props, 'test.actor');
$r = $system->root()->requestFuture($ref->getRef(), new TestMessage(['message' => 'hello3']), 1);
$this->assertSame('ok', $r->result()->value());
$r = $system->root()->requestFuture($ref->getRef(), new TestMessage(['message' => 'hello3']), 1);
$this->assertSame('ok', $r->result()->value());
});
});
}

private function receiveRecoverActorSender(): ActorSystem\Message\ActorInterface
{
return new class() implements ActorSystem\Message\ActorInterface, PersistentInterface {
use Mixin;

private string $state = '';
/** @var string[] */
private array $receiveRecoverMessages = [];

public function receive(ActorSystem\Context\ContextInterface $context): void
{
$msg = $context->message();
switch (true) {
case $msg instanceof RequestSnapshot:
$this->persistenceSnapshot(new TestSnapshot(['message' => $this->state]));
break;
case $msg instanceof TestMessage:
if (!$this->recovering()) {
$this->persistenceReceive($msg);
}
$context->respond('ok');
break;
case $msg instanceof Messages:
$context->respond($this->receiveRecoverMessages);
break;
}
}

public function receiveRecover(mixed $message): void
{
$this->receiveRecoverMessages[] = get_debug_type($message);
}
};
}
}

0 comments on commit 97bb399

Please sign in to comment.