Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

added propagation #38

Merged
merged 2 commits into from
Jan 6, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
added propagation
  • Loading branch information
ytake committed Jan 6, 2025
commit 2364fa889d86e83df06b7af3bd7b6bfcafd77f17
11 changes: 6 additions & 5 deletions src/Phluxor/ActorSystem/ActorContext.php
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,12 @@ public function __construct(
public function ensureExtras(): ActorContextExtras
{
if ($this->extras == null) {
$ctxd = $this;
$ctx = $this;
if ($this->props != null && $this->props->contextDecoratorChain() != null) {
$c = $this->props->contextDecoratorChain();
$ctxd = $c($this);
$ctx = $c($this);
}
$this->extras = new ActorContextExtras($ctxd);
$this->extras = new ActorContextExtras($ctx);
}
return $this->extras;
}
Expand Down Expand Up @@ -391,8 +391,9 @@ public function spawnNamed(Props $props, string $name): SpawnResult
if ($this->self != null) {
$id = $this->self->protobufPid()->getId();
}
if ($this->props->spawnMiddlewareChain() != null) {
$r = $this->props->spawnMiddlewareChain()($this->actorSystem, sprintf("%s/%s", $id, $name), $props, $this);
$chain = $this->props->spawnMiddlewareChain();
if ($chain != null) {
$r = $chain($this->actorSystem, sprintf("%s/%s", $id, $name), $props, $this);
} else {
$r = $props->spawn($this->actorSystem, sprintf("%s/%s", $id, $name), $this);
}
Expand Down
10 changes: 0 additions & 10 deletions src/Phluxor/ActorSystem/ConcurrentMapResult.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,4 @@ public function __construct(
public bool $exists
) {
}

public function getValue(): mixed
{
return $this->value;
}

public function exists(): bool
{
return $this->exists;
}
}
5 changes: 3 additions & 2 deletions src/Phluxor/ActorSystem/Config.php
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,9 @@ public function setLoggerFactory(ActorSystem\Logger\LoggerInterface $loggerFacto
* @param ActorSystem\Metrics\ProviderInterface $metricsProvider
* @return Config
*/
public function setMetricsProvider(ActorSystem\Metrics\ProviderInterface $metricsProvider): Config
{
public function setMetricsProvider(
ActorSystem\Metrics\ProviderInterface $metricsProvider
): Config {
$this->metricsProvider = $metricsProvider->provide();
return $this;
}
Expand Down
3 changes: 1 addition & 2 deletions src/Phluxor/ActorSystem/Metrics/HttpJsonMeterProvider.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
use OpenTelemetry\API\Globals;
use OpenTelemetry\API\Instrumentation\Configurator;
use OpenTelemetry\Context\Context;
use OpenTelemetry\Context\ContextStorage;
use OpenTelemetry\Contrib\Context\Swoole\SwooleContextStorage;
use OpenTelemetry\Contrib\Otlp\ContentTypes;
use OpenTelemetry\Contrib\Otlp\MetricExporter;
Expand Down Expand Up @@ -71,7 +70,7 @@ protected function resource(): ResourceInfo
public function provide(): MeterProviderInterface
{
// Use Swoole context storage
Context::setStorage(new SwooleContextStorage(new ContextStorage()));
Context::setStorage(new SwooleContextStorage(Context::storage()));
$transport = new PsrTransportFactory(
$this->transportClient(),
Psr17FactoryDiscovery::findRequestFactory(),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
<?php

declare(strict_types=1);

namespace Phluxor\ActorSystem\Middleware\Propagator;

use Closure;
use Phluxor\ActorSystem\Props\ContextDecoratorInterface;
use Phluxor\ActorSystem\Props\ReceiverMiddlewareInterface;
use Phluxor\ActorSystem\Props\SenderMiddlewareInterface;
use Phluxor\ActorSystem\Props\SpawnMiddlewareInterface;

class MiddlewarePropagation
{
/** @var Closure|SpawnMiddlewareInterface[] */
private array $spawnMiddleware = [];

/** @var Closure|SenderMiddlewareInterface[] */
private array $senderMiddleware = [];

/** @var Closure|ReceiverMiddlewareInterface[] */
private array $receiverMiddleware = [];

/** @var Closure|ContextDecoratorInterface[] */
private array $contextDecorators = [];

public function __construct(
private SpawnMiddleware $spawnMiddlewareProcess = new SpawnMiddleware()
){ }

public function setSpawnMiddleware(Closure|SpawnMiddlewareInterface ...$middleware): MiddlewarePropagation
{
$this->spawnMiddleware = array_merge($this->spawnMiddleware, $middleware);
return $this;
}

public function setItselfForwarded(): MiddlewarePropagation
{
return $this->setSpawnMiddleware($this->spawnMiddleware());
}

public function setSenderMiddleware(Closure|SenderMiddlewareInterface ...$middleware): MiddlewarePropagation
{
$this->senderMiddleware = array_merge($this->senderMiddleware, $middleware);
return $this;
}

public function setReceiverMiddleware(Closure|ReceiverMiddlewareInterface ...$middleware): MiddlewarePropagation
{
$this->receiverMiddleware = array_merge($this->receiverMiddleware, $middleware);
return $this;
}

public function setContextDecorator(Closure|ContextDecoratorInterface ...$decorators): MiddlewarePropagation
{
$this->contextDecorators = array_merge($this->contextDecorators, $decorators);
return $this;
}

/**
* @return SpawnMiddlewareInterface
*/
public function spawnMiddleware(): SpawnMiddlewareInterface
{
return $this->spawnMiddlewareProcess
->setSpawnMiddleware(...$this->spawnMiddleware)
->setSenderMiddleware(...$this->senderMiddleware)
->setReceiverMiddleware(...$this->receiverMiddleware)
->setContextDecorator(...$this->contextDecorators);
}
}
63 changes: 63 additions & 0 deletions src/Phluxor/ActorSystem/Middleware/Propagator/SpawnMiddleware.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
<?php

declare(strict_types=1);

namespace Phluxor\ActorSystem\Middleware\Propagator;

use Closure;
use Phluxor\ActorSystem\Props\ContextDecoratorInterface;
use Phluxor\ActorSystem\Props\ReceiverMiddlewareInterface;
use Phluxor\ActorSystem\Props\SenderMiddlewareInterface;
use Phluxor\ActorSystem\Props\SpawnMiddlewareInterface;
use Phluxor\ActorSystem\SpawnFunctionInterface;

class SpawnMiddleware implements SpawnMiddlewareInterface
{
/** @var SpawnMiddlewareInterface[] */
private array $spawnMiddleware = [];
/** @var SenderMiddlewareInterface[] */
private array $senderMiddleware = [];
/** @var ReceiverMiddlewareInterface[] */
private array $receiverMiddleware = [];
/** @var ContextDecoratorInterface[] */
private array $contextDecorators = [];

public function setSpawnMiddleware(SpawnMiddlewareInterface ...$middleware): self
{
$this->spawnMiddleware = array_merge($this->spawnMiddleware, $middleware);
return $this;
}

public function setSenderMiddleware(SenderMiddlewareInterface ...$middleware): self
{
$this->senderMiddleware = array_merge($this->senderMiddleware, $middleware);
return $this;
}

public function setReceiverMiddleware(ReceiverMiddlewareInterface ...$middleware): self
{
$this->receiverMiddleware = array_merge($this->receiverMiddleware, $middleware);
return $this;
}

public function setContextDecorator(ContextDecoratorInterface ...$decorators): self
{
$this->contextDecorators = array_merge($this->contextDecorators, $decorators);
return $this;
}

/**
* @param Closure|SpawnFunctionInterface $next
* @return Closure|SpawnFunctionInterface
*/
public function __invoke(Closure|SpawnFunctionInterface $next): Closure|SpawnFunctionInterface
{
return new SpawnMiddlewareProcess(
$next,
$this->spawnMiddleware,
$this->senderMiddleware,
$this->receiverMiddleware,
$this->contextDecorators
);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
<?php

declare(strict_types=1);

namespace Phluxor\ActorSystem\Middleware\Propagator;

use Closure;
use Phluxor\ActorSystem;
use Phluxor\ActorSystem\Context;
use Phluxor\ActorSystem\Props;
use Phluxor\ActorSystem\Props\ContextDecoratorInterface;
use Phluxor\ActorSystem\Props\ReceiverMiddlewareInterface;
use Phluxor\ActorSystem\Props\SenderMiddlewareInterface;
use Phluxor\ActorSystem\Props\SpawnMiddlewareInterface;
use Phluxor\ActorSystem\SpawnFunctionInterface;
use Phluxor\ActorSystem\SpawnResult;

readonly class SpawnMiddlewareProcess implements SpawnFunctionInterface
{
/**
* @param Closure|SpawnFunctionInterface $next
* @param SpawnMiddlewareInterface[] $spawnMiddleware
* @param SenderMiddlewareInterface[] $senderMiddleware
* @param ReceiverMiddlewareInterface[] $receiverMiddleware
* @param ContextDecoratorInterface[] $contextDecorators
*/
public function __construct(
private Closure|SpawnFunctionInterface $next,
private array $spawnMiddleware = [],
private array $senderMiddleware = [],
private array $receiverMiddleware = [],
private array $contextDecorators = []
) {
}

/**
* {@inheritDoc}
*/
public function __invoke(
ActorSystem $actorSystem,
string $id,
Props $props,
Context\SpawnerInterface $parentContext
): SpawnResult {
if (!empty($this->spawnMiddleware)) {
$props = $props->configure(
Props::withSpawnMiddleware(...$this->spawnMiddleware)
);
}
if (!empty($this->senderMiddleware)) {
$props = $props->configure(
Props::withSenderMiddleware(...$this->senderMiddleware)
);
}
if (!empty($this->receiverMiddleware)) {
$props = $props->configure(
Props::withReceiverMiddleware(...$this->receiverMiddleware)
);
}
if (!empty($this->contextDecorators)) {
$props = $props->configure(
Props::withContextDecorator(...$this->contextDecorators)
);
}
$next = $this->next;
return $next($actorSystem, $id, $props, $parentContext);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
<?php

declare(strict_types=1);

namespace Phluxor\ActorSystem\Middleware\Trace;

use Exception;

interface HeaderHandlerInterface
{
/**
* @param string $key
* @param string $val
* @return Exception|null
*/
public function __invoke(string $key, string $val): ?Exception;
}
33 changes: 33 additions & 0 deletions src/Phluxor/ActorSystem/Middleware/Trace/MessageHeaderReader.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
<?php

declare(strict_types=1);

namespace Phluxor\ActorSystem\Middleware\Trace;

use OpenTelemetry\Context\Propagation\PropagationGetterInterface;
use Phluxor\ActorSystem\Message\MessageHeader;

readonly class MessageHeaderReader implements PropagationGetterInterface
{
public function __construct(
private MessageHeader $header
) {
}

/**
* {@inheritdoc}
*/
public function keys($carrier): array
{
return $this->header->keys();
}

/**
* {@inheritdoc}
*/
public function get($carrier, string $key): ?string
{
$value = $carrier[$key] ??= null;
return $this->header->get($key) ?? $value;
}
}
25 changes: 25 additions & 0 deletions src/Phluxor/ActorSystem/Middleware/Trace/MessageHeaderWriter.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
<?php

declare(strict_types=1);

namespace Phluxor\ActorSystem\Middleware\Trace;

use OpenTelemetry\Context\Propagation\PropagationSetterInterface;
use Phluxor\ActorSystem\Message\MessageHeader;

readonly class MessageHeaderWriter implements PropagationSetterInterface
{
public function __construct(
private MessageHeader $header
) {
}

/**
* {@inheritdoc}
*/
public function set(&$carrier, string $key, string $value): void
{
$this->header->set($key, $value);
$carrier[$key] = $value;
}
}
Loading
Loading