Skip to content

Commit

Permalink
Added ID encoding based message repositories. (EventSaucePHP#22)
Browse files Browse the repository at this point in the history
* Added ID encoding based message repository for Doctrine v3.

* Implement other IdEncoder based repositories and prep for a v1 release

* Only test on pushes on main.

* Renamed property.

* Renamed property

* Renamed base test cases.

* Renamed base test case.

* Deprecate old repository implementations with uuid-encoding.

* Narrowed return type
  • Loading branch information
frankdejonge authored May 27, 2023
1 parent 957dc16 commit 0e81bff
Show file tree
Hide file tree
Showing 43 changed files with 850 additions and 93 deletions.
8 changes: 3 additions & 5 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ name: Tests

on:
push:
branches:
- main
pull_request:
schedule:
- cron: '0 0 * * *'
Expand Down Expand Up @@ -33,7 +35,7 @@ jobs:
fail-fast: true
matrix:
php: [8.0, 8.1, 8.2]
eventsauce: ['^1.1', '^2.0', '^3.0']
eventsauce: ['^3.0']
stability: [prefer-lowest, prefer-stable]

name: PHP ${{ matrix.php }} - EventSauce ${{ matrix.eventsauce }} - ${{ matrix.stability }}
Expand All @@ -57,10 +59,6 @@ jobs:
run: |
composer require "doctrine/dbal:^3.1" -w --prefer-dist --no-interaction --no-update
- name: Install specific illuminate version
run: |
composer require "illuminate/database:^8.35" -w --prefer-dist --no-interaction --no-update
- name: Install dependencies
run: composer update --${{ matrix.stability }} --prefer-dist --no-interaction --no-progress

Expand Down
1 change: 1 addition & 0 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
},
"autoload-dev": {
"psr-4": {
"EventSauce\\IdEncoding\\": "src/IdEncoding",
"EventSauce\\UuidEncoding\\": "src/UuidEncoding",
"EventSauce\\MessageOutbox\\TestTooling\\": "src/TestTooling",
"EventSauce\\MessageOutbox\\": "src/MessageOutbox",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
<?php

namespace EventSauce\MessageRepository\DoctrineMessageRepository;

use EventSauce\IdEncoding\StringIdEncoder;
use EventSauce\EventSourcing\MessageRepository;
use EventSauce\EventSourcing\Serialization\ConstructingMessageSerializer;
use EventSauce\MessageRepository\TableSchema\DefaultTableSchema;
use function getenv;

class DefaultDoctrineMessageRepositoryForPostgresTest extends DoctrineMessageRepositoryTestCase
{
protected string $tableName = 'domain_messages_uuid';

protected function messageRepository(): MessageRepository
{
return new DoctrineMessageRepository(
connection: $this->connection,
tableName: $this->tableName,
serializer: new ConstructingMessageSerializer(),
tableSchema: new DefaultTableSchema(),
aggregateRootIdEncoder: new StringIdEncoder(),
);
}

protected function formatDsn(): string
{
$host = getenv('EVENTSAUCE_TESTING_PGSQL_HOST') ?: '127.0.0.1';
$port = getenv('EVENTSAUCE_TESTING_PGSQL_PORT') ?: '5432';
$dsn = "pgsql://username:password@$host:$port/outbox_messages";

return $dsn;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
<?php

namespace EventSauce\MessageRepository\DoctrineMessageRepository;

use EventSauce\IdEncoding\BinaryUuidIdEncoder;
use EventSauce\EventSourcing\Serialization\ConstructingMessageSerializer;
use EventSauce\MessageRepository\TableSchema\DefaultTableSchema;

class DefaultDoctrineMessageRepositoryTest extends DoctrineMessageRepositoryTestCase
{
protected string $tableName = 'domain_messages_uuid';

protected function messageRepository(): DoctrineMessageRepository
{
return new DoctrineMessageRepository(
connection: $this->connection,
tableName: $this->tableName,
serializer: new ConstructingMessageSerializer(),
tableSchema: new DefaultTableSchema(),
aggregateRootIdEncoder: new BinaryUuidIdEncoder(),
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

use function getenv;

class DefaultDoctrineUuidV4MessageRepositoryForPostgresTest extends DoctrineUuidV4MessageRepositoryTestCase
class DefaultDoctrineUuidV4MessageRepositoryForPostgresTest extends DoctrineMessageRepositoryTestCase
{
protected string $tableName = 'domain_messages_uuid';

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
use EventSauce\MessageRepository\TableSchema\DefaultTableSchema;
use EventSauce\UuidEncoding\BinaryUuidEncoder;

class DefaultDoctrineUuidV4MessageRepositoryTest extends DoctrineUuidV4MessageRepositoryTestCase
class DefaultDoctrineUuidV4MessageRepositoryTest extends DoctrineMessageRepositoryTestCase
{
protected string $tableName = 'domain_messages_uuid';

Expand Down
200 changes: 200 additions & 0 deletions src/DoctrineMessageRepository/DoctrineMessageRepository.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
<?php

namespace EventSauce\MessageRepository\DoctrineMessageRepository;

use Doctrine\DBAL\Connection;
use Doctrine\DBAL\Query\QueryBuilder;
use EventSauce\IdEncoding\BinaryUuidIdEncoder;
use EventSauce\IdEncoding\IdEncoder;
use EventSauce\EventSourcing\AggregateRootId;
use EventSauce\EventSourcing\Header;
use EventSauce\EventSourcing\Message;
use EventSauce\EventSourcing\MessageRepository;
use EventSauce\EventSourcing\OffsetCursor;
use EventSauce\EventSourcing\PaginationCursor;
use EventSauce\EventSourcing\Serialization\MessageSerializer;
use EventSauce\EventSourcing\UnableToPersistMessages;
use EventSauce\EventSourcing\UnableToRetrieveMessages;
use EventSauce\MessageRepository\TableSchema\DefaultTableSchema;
use EventSauce\MessageRepository\TableSchema\TableSchema;
use Generator;
use LogicException;
use Ramsey\Uuid\Uuid;
use Throwable;
use function array_keys;
use function array_map;
use function array_merge;
use function count;
use function get_class;
use function implode;
use function json_decode;
use function json_encode;
use function sprintf;

class DoctrineMessageRepository implements MessageRepository
{
private TableSchema $tableSchema;
private IdEncoder $aggregateRootIdEncoder;
private IdEncoder $eventIdEncoder;

public function __construct(
private Connection $connection,
private string $tableName,
private MessageSerializer $serializer,
private int $jsonEncodeOptions = 0,
?TableSchema $tableSchema = null,
?IdEncoder $aggregateRootIdEncoder = null,
?IdEncoder $eventIdEncoder = null,
)
{
$this->tableSchema = $tableSchema ?? new DefaultTableSchema();
$this->aggregateRootIdEncoder = $aggregateRootIdEncoder ?? new BinaryUuidIdEncoder();
$this->eventIdEncoder = $eventIdEncoder ?? $this->aggregateRootIdEncoder;
}

public function persist(Message ...$messages): void
{
if (count($messages) === 0) {
return;
}

$insertColumns = [
$this->tableSchema->eventIdColumn(),
$this->tableSchema->aggregateRootIdColumn(),
$this->tableSchema->versionColumn(),
$this->tableSchema->payloadColumn(),
...array_keys($additionalColumns = $this->tableSchema->additionalColumns()),
];

$insertValues = [];
$insertParameters = [];

foreach ($messages as $index => $message) {
$payload = $this->serializer->serializeMessage($message);
$payload['headers'][Header::EVENT_ID] ??= Uuid::uuid4()->toString();

$messageParameters = [
$this->indexParameter('event_id', $index) => $this->eventIdEncoder->encodeId($payload['headers'][Header::EVENT_ID]),
$this->indexParameter('aggregate_root_id', $index) => $this->aggregateRootIdEncoder->encodeId($message->aggregateRootId()),
$this->indexParameter('version', $index) => $payload['headers'][Header::AGGREGATE_ROOT_VERSION] ?? 0,
$this->indexParameter('payload', $index) => json_encode($payload, $this->jsonEncodeOptions),
];

foreach ($additionalColumns as $column => $header) {
$messageParameters[$this->indexParameter($column, $index)] = $payload['headers'][$header];
}

// Creates a values line like: (:event_id_1, :aggregate_root_id_1, ...)
$insertValues[] = implode(', ', $this->formatNamedParameters(array_keys($messageParameters)));

// Flatten the message parameters into the query parameters
$insertParameters = array_merge($insertParameters, $messageParameters);
}

$insertQuery = sprintf(
"INSERT INTO %s (%s) VALUES\n(%s)",
$this->tableName,
implode(', ', $insertColumns),
implode("),\n(", $insertValues),
);

try {
$this->connection->executeStatement($insertQuery, $insertParameters);
} catch (Throwable $exception) {
throw UnableToPersistMessages::dueTo('', $exception);
}
}

private function indexParameter(string $name, int $index): string
{
return $name . '_' . $index;
}

private function formatNamedParameters(array $parameters): array
{
return array_map(static fn(string $name) => ':' . $name, $parameters);
}

public function retrieveAll(AggregateRootId $id): Generator
{
$builder = $this->createQueryBuilder();
$builder->where(sprintf('%s = :aggregate_root_id', $this->tableSchema->aggregateRootIdColumn()));
$builder->setParameter('aggregate_root_id', $this->aggregateRootIdEncoder->encodeId($id));

try {
return $this->yieldMessagesFromPayloads($builder->executeQuery()->iterateColumn());
} catch (Throwable $exception) {
throw UnableToRetrieveMessages::dueTo('', $exception);
}
}

/**
* @psalm-return Generator<Message>
*/
public function retrieveAllAfterVersion(AggregateRootId $id, int $aggregateRootVersion): Generator
{
$builder = $this->createQueryBuilder();
$builder->where(sprintf('%s = :aggregate_root_id', $this->tableSchema->aggregateRootIdColumn()));
$builder->andWhere(sprintf('%s > :version', $this->tableSchema->versionColumn()));
$builder->setParameter('aggregate_root_id', $this->aggregateRootIdEncoder->encodeId($id));
$builder->setParameter('version', $aggregateRootVersion);

try {
return $this->yieldMessagesFromPayloads($builder->executeQuery()->iterateColumn());
} catch (Throwable $exception) {
throw UnableToRetrieveMessages::dueTo('', $exception);
}
}

private function createQueryBuilder(): QueryBuilder
{
$builder = $this->connection->createQueryBuilder();
$builder->select($this->tableSchema->payloadColumn());
$builder->from($this->tableName);
$builder->orderBy($this->tableSchema->versionColumn(), 'ASC');

return $builder;
}

/**
* @psalm-return Generator<Message>
*/
private function yieldMessagesFromPayloads(iterable $payloads): Generator
{
foreach ($payloads as $payload) {
yield $message = $this->serializer->unserializePayload(json_decode($payload, true));
}

return isset($message)
? $message->header(Header::AGGREGATE_ROOT_VERSION) ?: 0
: 0;
}

public function paginate(PaginationCursor $cursor): Generator
{
if (!$cursor instanceof OffsetCursor) {
throw new LogicException(sprintf('Wrong cursor type used, expected %s, received %s', OffsetCursor::class, get_class($cursor)));
}

$numberOfMessages = 0;
$builder = $this->connection->createQueryBuilder();
$builder->select($this->tableSchema->payloadColumn());
$builder->from($this->tableName);
$incrementalIdColumn = $this->tableSchema->incrementalIdColumn();
$builder->orderBy($incrementalIdColumn, 'ASC');
$builder->setMaxResults($cursor->limit());
$builder->where($incrementalIdColumn . ' > :id');
$builder->setParameter('id', $cursor->offset());

try {
foreach ($builder->executeQuery()->iterateColumn() as $payload) {
$numberOfMessages++;
yield $this->serializer->unserializePayload(json_decode($payload, true));
}
} catch (Throwable $exception) {
throw UnableToRetrieveMessages::dueTo($exception->getMessage(), $exception);
}

return $cursor->plusOffset($numberOfMessages);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
use function getenv;
use function str_starts_with;

abstract class DoctrineUuidV4MessageRepositoryTestCase extends MessageRepositoryTestCase
abstract class DoctrineMessageRepositoryTestCase extends MessageRepositoryTestCase
{
protected Connection $connection;

Expand Down Expand Up @@ -47,8 +47,6 @@ protected function truncateTable(): void
}
}

abstract protected function messageRepository(): DoctrineUuidV4MessageRepository;

protected function aggregateRootId(): AggregateRootId
{
return DummyAggregateRootId::generate();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,11 @@
use function json_decode;
use function json_encode;
use function sprintf;
use function var_dump;

/**
* @deprecated Will be removed in 2.0.0
* @see DoctrineMessageRepository
*/
class DoctrineUuidV4MessageRepository implements MessageRepository
{
private TableSchema $tableSchema;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

use function getenv;

class LegacyDoctrineUuidV4MessageRepositoryForPostgresTest extends DoctrineUuidV4MessageRepositoryTestCase
class LegacyDoctrineUuidV4MessageRepositoryForPostgresTest extends DoctrineMessageRepositoryTestCase
{
protected string $tableName = 'legacy_domain_messages_uuid';

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

use function getenv;

class LegacyDoctrineUuidV4MessageRepositoryTest extends DoctrineUuidV4MessageRepositoryTestCase
class LegacyDoctrineUuidV4MessageRepositoryTest extends DoctrineMessageRepositoryTestCase
{
protected string $tableName = 'legacy_domain_messages_uuid';

Expand Down
12 changes: 4 additions & 8 deletions src/DoctrineMessageRepository/composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,15 @@
],
"require": {
"doctrine/dbal": "^3.1",
"eventsauce/eventsauce": "^1.0||^2.0||^3.0",
"eventsauce/message-repository-table-schema": "^0.2||^0.3||^0.4",
"eventsauce/uuid-encoding": "^0.2||^0.3||^0.4",
"eventsauce/eventsauce": "^3.0",
"eventsauce/message-repository-table-schema": "^1.0",
"eventsauce/uuid-encoding": "^1.0",
"eventsauce/id-encoding": "^1.0",
"ramsey/uuid": "^4.1"
},
"autoload": {
"psr-4": {
"EventSauce\\MessageRepository\\DoctrineMessageRepository\\": "./"
}
},
"extra": {
"branch-alias": {
"dev-main": "0.2-dev"
}
}
}
Loading

0 comments on commit 0e81bff

Please sign in to comment.