From 372731cee95a97a6d4e14008f2ce6936688803c3 Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Wed, 17 Jan 2018 09:57:24 +0200 Subject: [PATCH 01/56] 0.9 --- composer.json | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/composer.json b/composer.json index a25a00a..5623c2b 100644 --- a/composer.json +++ b/composer.json @@ -7,17 +7,17 @@ "license": "MIT", "require": { "php": ">=5.6", - "enqueue/enqueue": "^0.8@dev", + "enqueue/enqueue": "^0.9@dev", "symfony/dependency-injection": "^2.8|^3|^4", "symfony/config": "^2.8|^3|^4", "symfony/console": "^2.8|^3|^4" }, "require-dev": { "phpunit/phpunit": "~5.5", - "enqueue/test": "^0.8@dev", - "enqueue/amqp-ext": "^0.8@dev", - "enqueue/fs": "^0.8@dev", - "enqueue/null": "^0.8@dev" + "enqueue/test": "^0.9@dev", + "enqueue/amqp-ext": "^0.9@dev", + "enqueue/fs": "^0.9@dev", + "enqueue/null": "^0.9@dev" }, "support": { "email": "opensource@forma-pro.com", @@ -35,7 +35,7 @@ "minimum-stability": "dev", "extra": { "branch-alias": { - "dev-master": "0.8.x-dev" + "dev-master": "0.9.x-dev" } } } From bcd28d6f8d733f595876d099b44ed5905720f0da Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Thu, 5 Apr 2018 10:25:20 +0300 Subject: [PATCH 02/56] Drop PHP5 support, Require Queue Interop with PHP7 features. --- composer.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/composer.json b/composer.json index 5623c2b..7e844eb 100644 --- a/composer.json +++ b/composer.json @@ -6,7 +6,7 @@ "homepage": "https://enqueue.forma-pro.com/", "license": "MIT", "require": { - "php": ">=5.6", + "php": "^7.1.3", "enqueue/enqueue": "^0.9@dev", "symfony/dependency-injection": "^2.8|^3|^4", "symfony/config": "^2.8|^3|^4", From fa05ac0fc6e572988a344453b168adc765daf975 Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Thu, 5 Apr 2018 10:30:01 +0300 Subject: [PATCH 03/56] Drop Symfony 2.x support, set Symfony 3.4 minimum supported version. --- composer.json | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/composer.json b/composer.json index 7e844eb..e900bb9 100644 --- a/composer.json +++ b/composer.json @@ -8,9 +8,9 @@ "require": { "php": "^7.1.3", "enqueue/enqueue": "^0.9@dev", - "symfony/dependency-injection": "^2.8|^3|^4", - "symfony/config": "^2.8|^3|^4", - "symfony/console": "^2.8|^3|^4" + "symfony/dependency-injection": "^3.4|^4", + "symfony/config": "^3.4|^4", + "symfony/console": "^3.4|^4" }, "require-dev": { "phpunit/phpunit": "~5.5", From e4a09858ef65f583f950aa70994b5254e83dc903 Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Wed, 8 Aug 2018 17:32:42 +0300 Subject: [PATCH 04/56] fix req. --- composer.json | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/composer.json b/composer.json index e900bb9..e133d58 100644 --- a/composer.json +++ b/composer.json @@ -7,17 +7,17 @@ "license": "MIT", "require": { "php": "^7.1.3", - "enqueue/enqueue": "^0.9@dev", + "enqueue/enqueue": "0.9.x-dev", "symfony/dependency-injection": "^3.4|^4", "symfony/config": "^3.4|^4", "symfony/console": "^3.4|^4" }, "require-dev": { "phpunit/phpunit": "~5.5", - "enqueue/test": "^0.9@dev", - "enqueue/amqp-ext": "^0.9@dev", - "enqueue/fs": "^0.9@dev", - "enqueue/null": "^0.9@dev" + "enqueue/test": "0.9.x-dev", + "enqueue/amqp-ext": "0.9.x-dev", + "enqueue/fs": "0.9.x-dev", + "enqueue/null": "0.9.x-dev" }, "support": { "email": "opensource@forma-pro.com", From a41e1082cb6ae2ee7d0875adfe3d795d682dfbb9 Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Tue, 14 Aug 2018 10:38:52 +0300 Subject: [PATCH 05/56] update pkgs travis files. --- .travis.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.travis.yml b/.travis.yml index 566e0af..408d8b7 100644 --- a/.travis.yml +++ b/.travis.yml @@ -6,8 +6,8 @@ git: language: php php: - - '5.6' - - '7.0' + - '7.1' + - '7.2' cache: directories: From 44e3544b58cc68579fef7656f88b2cf6248d0b86 Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Tue, 14 Aug 2018 15:36:24 +0300 Subject: [PATCH 06/56] [Consumption] Add QueueConsumerInterface, make QueueConsumer final. --- SimpleClient.php | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/SimpleClient.php b/SimpleClient.php index 8114de2..916f0db 100644 --- a/SimpleClient.php +++ b/SimpleClient.php @@ -15,7 +15,7 @@ use Enqueue\Client\RouterProcessor; use Enqueue\Consumption\CallbackProcessor; use Enqueue\Consumption\ExtensionInterface; -use Enqueue\Consumption\QueueConsumer; +use Enqueue\Consumption\QueueConsumerInterface; use Enqueue\Dbal\DbalConnectionFactory; use Enqueue\Dbal\Symfony\DbalTransportFactory; use Enqueue\Fs\FsConnectionFactory; @@ -194,10 +194,7 @@ public function getContext() return $this->container->get('enqueue.transport.context'); } - /** - * @return QueueConsumer - */ - public function getQueueConsumer() + public function getQueueConsumer(): QueueConsumerInterface { return $this->container->get('enqueue.client.queue_consumer'); } From cdb50360fde648e0341d45e8c3a8907ababab2d1 Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Wed, 15 Aug 2018 12:40:50 +0300 Subject: [PATCH 07/56] Remove deprecated code. --- SimpleClient.php | 16 ---------------- Tests/Functional/SimpleClientTest.php | 4 ++-- 2 files changed, 2 insertions(+), 18 deletions(-) diff --git a/SimpleClient.php b/SimpleClient.php index 916f0db..1f1d1d0 100644 --- a/SimpleClient.php +++ b/SimpleClient.php @@ -148,22 +148,6 @@ public function sendEvent($topic, $message) $this->getProducer()->sendEvent($topic, $message); } - /** - * @deprecated since 0.8.18 and will be removed in 0.9. Use sendEvent method instead - * - * @param string $topic - * @param string|array $message - * @param bool $setupBroker - */ - public function send($topic, $message, $setupBroker = false) - { - if ($setupBroker) { - $this->setupBroker(); - } - - $this->sendEvent($topic, $message); - } - /** * @param ExtensionInterface|null $runtimeExtension */ diff --git a/Tests/Functional/SimpleClientTest.php b/Tests/Functional/SimpleClientTest.php index 87d6003..cb47282 100644 --- a/Tests/Functional/SimpleClientTest.php +++ b/Tests/Functional/SimpleClientTest.php @@ -112,7 +112,7 @@ public function testProduceAndConsumeOneMessage($config) return Result::ACK; }); - $client->send('foo_topic', 'Hello there!', true); + $client->sendEvent('foo_topic', 'Hello there!', true); $client->consume(new ChainExtension([ new LimitConsumptionTimeExtension(new \DateTime('+5sec')), @@ -144,7 +144,7 @@ public function testProduceAndRouteToTwoConsumes($config) return Result::ACK; }); - $client->send('foo_topic', 'Hello there!', true); + $client->sendEvent('foo_topic', 'Hello there!', true); $client->consume(new ChainExtension([ new LimitConsumptionTimeExtension(new \DateTime('+5sec')), From 6c24e9954caf03b64000c83f028db462cd7ce00e Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Wed, 15 Aug 2018 14:59:58 +0300 Subject: [PATCH 08/56] fix simple client tests . --- Tests/Functional/SimpleClientTest.php | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/Tests/Functional/SimpleClientTest.php b/Tests/Functional/SimpleClientTest.php index cb47282..723ee3a 100644 --- a/Tests/Functional/SimpleClientTest.php +++ b/Tests/Functional/SimpleClientTest.php @@ -112,7 +112,9 @@ public function testProduceAndConsumeOneMessage($config) return Result::ACK; }); - $client->sendEvent('foo_topic', 'Hello there!', true); + $client->setupBroker(); + + $client->sendEvent('foo_topic', 'Hello there!'); $client->consume(new ChainExtension([ new LimitConsumptionTimeExtension(new \DateTime('+5sec')), @@ -144,7 +146,9 @@ public function testProduceAndRouteToTwoConsumes($config) return Result::ACK; }); - $client->sendEvent('foo_topic', 'Hello there!', true); + $client->setupBroker(); + + $client->sendEvent('foo_topic', 'Hello there!'); $client->consume(new ChainExtension([ new LimitConsumptionTimeExtension(new \DateTime('+5sec')), From 9504f95b3bd6c2d894e3c4a86ca2c14d1259e43b Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Mon, 20 Aug 2018 13:51:56 +0300 Subject: [PATCH 09/56] add tests, remove functions. --- SimpleClientContainerExtension.php | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/SimpleClientContainerExtension.php b/SimpleClientContainerExtension.php index 7bcb849..fba04d1 100644 --- a/SimpleClientContainerExtension.php +++ b/SimpleClientContainerExtension.php @@ -7,10 +7,12 @@ use Enqueue\Client\ConsumptionExtension\DelayRedeliveredMessageExtension; use Enqueue\Client\ConsumptionExtension\SetRouterPropertiesExtension; use Enqueue\Client\DelegateProcessor; +use Enqueue\Client\DriverFactory; use Enqueue\Client\Meta\QueueMetaRegistry; use Enqueue\Client\Meta\TopicMetaRegistry; use Enqueue\Client\Producer; use Enqueue\Client\RouterProcessor; +use Enqueue\ConnectionFactoryFactory; use Enqueue\Consumption\ChainExtension as ConsumptionChainExtension; use Enqueue\Consumption\QueueConsumer; use Enqueue\Rpc\RpcFactory; @@ -91,18 +93,27 @@ public function load(array $configs, ContainerBuilder $container) $transportConfig, ]); + $container->register('enqueue.connection_factory_factory', ConnectionFactoryFactory::class); + + $container->register('enqueue.client.driver_factory', DriverFactory::class) + ->addArgument(new Reference('enqueue.client.config')) + ->addArgument(new Reference('enqueue.client.meta.queue_meta_registry')) + ; + $container->register('enqueue.client.rpc_factory', RpcFactory::class) ->setPublic(true) ->setArguments([ new Reference('enqueue.transport.context'), - ]); + ]) + ; $container->register('enqueue.client.producer', Producer::class) ->setPublic(true) ->setArguments([ new Reference('enqueue.client.driver'), new Reference('enqueue.client.rpc_factory'), - ]); + ]) + ; $container->setAlias('enqueue.client.producer_v2', new Alias('enqueue.client.producer', true)); From bdea721af1197371c654d21c24c91f6b6526007c Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Sat, 1 Sep 2018 20:48:13 +0300 Subject: [PATCH 10/56] explisitly add queue interop dev package as dep --- composer.json | 2 ++ 1 file changed, 2 insertions(+) diff --git a/composer.json b/composer.json index e133d58..aea1a20 100644 --- a/composer.json +++ b/composer.json @@ -9,6 +9,8 @@ "php": "^7.1.3", "enqueue/enqueue": "0.9.x-dev", "symfony/dependency-injection": "^3.4|^4", + "queue-interop/amqp-interop": "0.8.x-dev", + "queue-interop/queue-interop": "0.7.x-dev", "symfony/config": "^3.4|^4", "symfony/console": "^3.4|^4" }, From 3e5b1086b96b7ba888389498239fdd4d8720eb57 Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Tue, 11 Sep 2018 15:59:21 +0300 Subject: [PATCH 11/56] fix simple client. --- SimpleClient.php | 282 +++++--------------------- SimpleClientContainerExtension.php | 121 ++++------- Tests/Functional/SimpleClientTest.php | 114 +++++------ Tests/SimpleClientTest.php | 117 ----------- 4 files changed, 141 insertions(+), 493 deletions(-) delete mode 100644 Tests/SimpleClientTest.php diff --git a/SimpleClient.php b/SimpleClient.php index 1f1d1d0..18f1887 100644 --- a/SimpleClient.php +++ b/SimpleClient.php @@ -2,43 +2,20 @@ namespace Enqueue\SimpleClient; -use Enqueue\AmqpBunny\AmqpConnectionFactory as AmqpBunnyConnectionFactory; -use Enqueue\AmqpExt\AmqpConnectionFactory as AmqpExtConnectionFactory; -use Enqueue\AmqpLib\AmqpConnectionFactory as AmqpLibConnectionFactory; use Enqueue\Client\ArrayProcessorRegistry; use Enqueue\Client\Config; use Enqueue\Client\DelegateProcessor; use Enqueue\Client\DriverInterface; +use Enqueue\Client\Message; use Enqueue\Client\Meta\QueueMetaRegistry; use Enqueue\Client\Meta\TopicMetaRegistry; +use Enqueue\Client\ProcessorRegistryInterface; use Enqueue\Client\ProducerInterface; use Enqueue\Client\RouterProcessor; use Enqueue\Consumption\CallbackProcessor; use Enqueue\Consumption\ExtensionInterface; use Enqueue\Consumption\QueueConsumerInterface; -use Enqueue\Dbal\DbalConnectionFactory; -use Enqueue\Dbal\Symfony\DbalTransportFactory; -use Enqueue\Fs\FsConnectionFactory; -use Enqueue\Fs\Symfony\FsTransportFactory; -use Enqueue\Gps\GpsConnectionFactory; -use Enqueue\Gps\Symfony\GpsTransportFactory; -use Enqueue\Mongodb\MongodbConnectionFactory; -use Enqueue\Mongodb\Symfony\MongodbTransportFactory; -use Enqueue\Null\Symfony\NullTransportFactory; -use Enqueue\RdKafka\RdKafkaConnectionFactory; -use Enqueue\RdKafka\Symfony\RdKafkaTransportFactory; -use Enqueue\Redis\RedisConnectionFactory; -use Enqueue\Redis\Symfony\RedisTransportFactory; use Enqueue\Rpc\Promise; -use Enqueue\Sqs\SqsConnectionFactory; -use Enqueue\Sqs\Symfony\SqsTransportFactory; -use Enqueue\Stomp\StompConnectionFactory; -use Enqueue\Stomp\Symfony\RabbitMqStompTransportFactory; -use Enqueue\Stomp\Symfony\StompTransportFactory; -use Enqueue\Symfony\AmqpTransportFactory; -use Enqueue\Symfony\DefaultTransportFactory; -use Enqueue\Symfony\MissingTransportFactory; -use Enqueue\Symfony\RabbitMqAmqpTransportFactory; use Interop\Queue\PsrContext; use Interop\Queue\PsrProcessor; use Symfony\Component\DependencyInjection\ContainerBuilder; @@ -59,28 +36,34 @@ final class SimpleClient /** * The config could be a transport DSN (string) or an array, here's an example of a few DSNs:. * - * amqp: - * amqp://guest:guest@localhost:5672/%2f?lazy=1&persisted=1 - * file://foo/bar/ - * null: + * $config = amqp: + * $config = amqp://guest:guest@localhost:5672/%2f?lazy=1&persisted=1 + * $config = file://foo/bar/ + * $config = null: * - * or an array, the most simple: + * or an array: * - *$config = [ + * $config = [ * 'transport' => [ - * 'default' => 'amqp', - * 'amqp' => [], // amqp options here - * ], + * 'dsn' => 'amqps://guest:guest@localhost:5672/%2f', + * 'ssl_cacert' => '/a/dir/cacert.pem', + * 'ssl_cert' => '/a/dir/cert.pem', + * 'ssl_key' => '/a/dir/key.pem', * ] * - * or a with all details: + * with custom connection factory class * * $config = [ * 'transport' => [ - * 'default' => 'amqp', - * 'amqp' => [], - * .... - * ], + * 'dsn' => 'amqps://guest:guest@localhost:5672/%2f', + * 'connection_factory_class' => 'aCustomConnectionFactory', + * // other options available options are factory_class and factory_service + * ] + * + * The client config + * + * $config = [ + * 'transport' => 'null:', * 'client' => [ * 'prefix' => 'enqueue', * 'app_name' => 'app', @@ -105,11 +88,9 @@ public function __construct($config, ContainerBuilder $container = null) } /** - * @param string $topic - * @param string $processorName * @param callable|PsrProcessor $processor */ - public function bind($topic, $processorName, $processor) + public function bind(string $topic, string $processorName, $processor): void { if (is_callable($processor)) { $processor = new CallbackProcessor($processor); @@ -121,48 +102,41 @@ public function bind($topic, $processorName, $processor) $queueName = $this->getConfig()->getDefaultProcessorQueueName(); + $this->getRouterProcessor()->add($topic, $queueName, $processorName); $this->getTopicMetaRegistry()->addProcessor($topic, $processorName); $this->getQueueMetaRegistry()->addProcessor($queueName, $processorName); $this->getProcessorRegistry()->add($processorName, $processor); - $this->getRouterProcessor()->add($topic, $queueName, $processorName); } /** - * @param string $command - * @param mixed $message - * @param bool $needReply - * - * @return Promise|null + * @param string|array|\JsonSerializable|Message $message */ - public function sendCommand($command, $message, $needReply = false) + public function sendCommand(string $command, $message, bool $needReply = false): ?Promise { return $this->getProducer()->sendCommand($command, $message, $needReply); } /** - * @param string $topic - * @param string|array $message + * @param string|array|Message $message */ - public function sendEvent($topic, $message) + public function sendEvent(string $topic, $message): void { $this->getProducer()->sendEvent($topic, $message); } - /** - * @param ExtensionInterface|null $runtimeExtension - */ - public function consume(ExtensionInterface $runtimeExtension = null) + public function consume(ExtensionInterface $runtimeExtension = null): void { $this->setupBroker(); $processor = $this->getDelegateProcessor(); $queueConsumer = $this->getQueueConsumer(); $defaultQueueName = $this->getConfig()->getDefaultProcessorQueueName(); - $defaultTransportQueueName = $this->getConfig()->createTransportQueueName($defaultQueueName); - + $defaultTransportQueueName = $this->getDriver()->createQueue($defaultQueueName); $queueConsumer->bind($defaultTransportQueueName, $processor); - if ($this->getConfig()->getRouterQueueName() != $defaultQueueName) { - $routerTransportQueueName = $this->getConfig()->createTransportQueueName($this->getConfig()->getRouterQueueName()); + + $routerQueueName = $this->getConfig()->getRouterQueueName(); + if ($routerQueueName != $defaultQueueName) { + $routerTransportQueueName = $this->getDriver()->createQueue($routerQueueName); $queueConsumer->bind($routerTransportQueueName, $processor); } @@ -170,10 +144,7 @@ public function consume(ExtensionInterface $runtimeExtension = null) $queueConsumer->consume($runtimeExtension); } - /** - * @return PsrContext - */ - public function getContext() + public function getContext(): PsrContext { return $this->container->get('enqueue.transport.context'); } @@ -183,51 +154,34 @@ public function getQueueConsumer(): QueueConsumerInterface return $this->container->get('enqueue.client.queue_consumer'); } - /** - * @return Config - */ - public function getConfig() + public function getConfig(): Config { return $this->container->get('enqueue.client.config'); } - /** - * @return DriverInterface - */ - public function getDriver() + public function getDriver(): DriverInterface { - return $this->container->get('enqueue.client.driver'); + return $this->container->get('enqueue.client.default.driver'); } - /** - * @return TopicMetaRegistry - */ - public function getTopicMetaRegistry() + public function getTopicMetaRegistry(): TopicMetaRegistry { return $this->container->get('enqueue.client.meta.topic_meta_registry'); } - /** - * @return QueueMetaRegistry - */ - public function getQueueMetaRegistry() + public function getQueueMetaRegistry(): QueueMetaRegistry { return $this->container->get('enqueue.client.meta.queue_meta_registry'); } - /** - * @param bool $setupBroker - * - * @return ProducerInterface - */ - public function getProducer($setupBroker = false) + public function getProducer(bool $setupBroker = false): ProducerInterface { $setupBroker && $this->setupBroker(); return $this->container->get('enqueue.client.producer'); } - public function setupBroker() + public function setupBroker(): void { $this->getDriver()->setupBroker(); } @@ -235,38 +189,24 @@ public function setupBroker() /** * @return ArrayProcessorRegistry */ - public function getProcessorRegistry() + public function getProcessorRegistry(): ProcessorRegistryInterface { return $this->container->get('enqueue.client.processor_registry'); } - /** - * @return DelegateProcessor - */ - public function getDelegateProcessor() + public function getDelegateProcessor(): DelegateProcessor { return $this->container->get('enqueue.client.delegate_processor'); } - /** - * @return RouterProcessor - */ - public function getRouterProcessor() + public function getRouterProcessor(): RouterProcessor { return $this->container->get('enqueue.client.router_processor'); } - /** - * @param array|string $config - * @param ContainerBuilder $container - * - * @return ContainerInterface - */ - private function buildContainer($config, ContainerBuilder $container) + private function buildContainer($config, ContainerBuilder $container): ContainerInterface { - $config = $this->buildConfig($config); - $extension = $this->buildContainerExtension(); - + $extension = new SimpleClientContainerExtension(); $container->registerExtension($extension); $container->loadFromExtension($extension->getAlias(), $config); @@ -274,132 +214,4 @@ private function buildContainer($config, ContainerBuilder $container) return $container; } - - /** - * @return SimpleClientContainerExtension - */ - private function buildContainerExtension() - { - $extension = new SimpleClientContainerExtension(); - - $extension->addTransportFactory(new DefaultTransportFactory('default')); - $extension->addTransportFactory(new NullTransportFactory('null')); - - if (class_exists(StompConnectionFactory::class)) { - $extension->addTransportFactory(new StompTransportFactory('stomp')); - $extension->addTransportFactory(new RabbitMqStompTransportFactory('rabbitmq_stomp')); - } else { - $extension->addTransportFactory(new MissingTransportFactory('stomp', ['enqueue/stomp'])); - $extension->addTransportFactory(new MissingTransportFactory('rabbitmq_stomp', ['enqueue/stomp'])); - } - - if ( - class_exists(AmqpBunnyConnectionFactory::class) || - class_exists(AmqpExtConnectionFactory::class) || - class_exists(AmqpLibConnectionFactory::class) - ) { - $extension->addTransportFactory(new AmqpTransportFactory('amqp')); - $extension->addTransportFactory(new RabbitMqAmqpTransportFactory('rabbitmq_amqp')); - } else { - $amppPackages = ['enqueue/amqp-ext', 'enqueue/amqp-bunny', 'enqueue/amqp-lib']; - $extension->addTransportFactory(new MissingTransportFactory('amqp', $amppPackages)); - $extension->addTransportFactory(new MissingTransportFactory('rabbitmq_amqp', $amppPackages)); - } - - if (class_exists(FsConnectionFactory::class)) { - $extension->addTransportFactory(new FsTransportFactory('fs')); - } else { - $extension->addTransportFactory(new MissingTransportFactory('fs', ['enqueue/fs'])); - } - - if (class_exists(RedisConnectionFactory::class)) { - $extension->addTransportFactory(new RedisTransportFactory('redis')); - } else { - $extension->addTransportFactory(new MissingTransportFactory('redis', ['enqueue/redis'])); - } - - if (class_exists(DbalConnectionFactory::class)) { - $extension->addTransportFactory(new DbalTransportFactory('dbal')); - } else { - $extension->addTransportFactory(new MissingTransportFactory('dbal', ['enqueue/dbal'])); - } - - if (class_exists(SqsConnectionFactory::class)) { - $extension->addTransportFactory(new SqsTransportFactory('sqs')); - } else { - $extension->addTransportFactory(new MissingTransportFactory('sqs', ['enqueue/sqs'])); - } - - if (class_exists(GpsConnectionFactory::class)) { - $extension->addTransportFactory(new GpsTransportFactory('gps')); - } else { - $extension->addTransportFactory(new MissingTransportFactory('gps', ['enqueue/gps'])); - } - - if (class_exists(RdKafkaConnectionFactory::class)) { - $extension->addTransportFactory(new RdKafkaTransportFactory('rdkafka')); - } else { - $extension->addTransportFactory(new MissingTransportFactory('rdkafka', ['enqueue/rdkafka'])); - } - - if (class_exists(MongodbConnectionFactory::class)) { - $extension->addTransportFactory(new MongodbTransportFactory('mongodb')); - } else { - $extension->addTransportFactory(new MissingTransportFactory('mongodb', ['enqueue/mongodb'])); - } - - return $extension; - } - - /** - * @param array|string $config - * - * @return array - */ - private function buildConfig($config) - { - if (is_string($config) && false !== strpos($config, ':')) { - $extConfig = [ - 'client' => [], - 'transport' => [ - 'default' => $config, - ], - ]; - } elseif (is_string($config)) { - $extConfig = [ - 'client' => [], - 'transport' => [ - 'default' => $config, - $config => [], - ], - ]; - } elseif (is_array($config)) { - $extConfig = array_replace_recursive([ - 'client' => [], - 'transport' => [], - ], $config); - } else { - throw new \LogicException('Expects config is string or array'); - } - - if (empty($extConfig['transport']['default'])) { - $defaultTransport = null; - foreach ($extConfig['transport'] as $transport => $config) { - if ('default' === $transport) { - continue; - } - - $defaultTransport = $transport; - break; - } - - if (false == $defaultTransport) { - throw new \LogicException('There is no transport configured'); - } - - $extConfig['transport']['default'] = $defaultTransport; - } - - return $extConfig; - } } diff --git a/SimpleClientContainerExtension.php b/SimpleClientContainerExtension.php index fba04d1..6c862a6 100644 --- a/SimpleClientContainerExtension.php +++ b/SimpleClientContainerExtension.php @@ -16,71 +16,40 @@ use Enqueue\Consumption\ChainExtension as ConsumptionChainExtension; use Enqueue\Consumption\QueueConsumer; use Enqueue\Rpc\RpcFactory; -use Enqueue\Symfony\TransportFactoryInterface; +use Enqueue\Symfony\DependencyInjection\TransportFactory; use Symfony\Component\Config\Definition\Builder\TreeBuilder; use Symfony\Component\Config\Definition\NodeInterface; use Symfony\Component\Config\Definition\Processor; -use Symfony\Component\DependencyInjection\Alias; use Symfony\Component\DependencyInjection\ContainerBuilder; use Symfony\Component\DependencyInjection\Extension\Extension; use Symfony\Component\DependencyInjection\Reference; class SimpleClientContainerExtension extends Extension { - /** - * @var TransportFactoryInterface[] - */ - private $factories; - - public function __construct() - { - $this->factories = []; - } - - /** - * {@inheritdoc} - */ - public function getAlias() + public function getAlias(): string { return 'enqueue'; } - /** - * @param TransportFactoryInterface $transportFactory - */ - public function addTransportFactory(TransportFactoryInterface $transportFactory) - { - $name = $transportFactory->getName(); - - if (empty($name)) { - throw new \LogicException('Transport factory name cannot be empty'); - } - if (array_key_exists($name, $this->factories)) { - throw new \LogicException(sprintf('Transport factory with such name already added. Name %s', $name)); - } - - $this->factories[$name] = $transportFactory; - } - - /** - * {@inheritdoc} - */ - public function load(array $configs, ContainerBuilder $container) + public function load(array $configs, ContainerBuilder $container): void { $configProcessor = new Processor(); $config = $configProcessor->process($this->createConfiguration(), $configs); - foreach ($config['transport'] as $name => $transportConfig) { - $this->factories[$name]->createConnectionFactory($container, $transportConfig); - $this->factories[$name]->createContext($container, $transportConfig); - $this->factories[$name]->createDriver($container, $transportConfig); - } + $container->register('enqueue.connection_factory_factory', ConnectionFactoryFactory::class); - $transportConfig = isset($config['transport']['default']['alias']) ? - $config['transport'][$config['transport']['default']['alias']] : - [] + $container->register('enqueue.client.driver_factory', DriverFactory::class) + ->addArgument(new Reference('enqueue.client.config')) + ->addArgument(new Reference('enqueue.client.meta.queue_meta_registry')) ; + $transportFactory = (new TransportFactory('default')); + $transportFactory->createConnectionFactory($container, $config['transport']); + $transportFactory->createContext($container, $config['transport']); + + $driverId = $transportFactory->createDriver($container, $config['transport']); + $container->getDefinition($driverId)->setPublic(true); + $container->register('enqueue.client.config', Config::class) ->setPublic(true) ->setArguments([ @@ -90,33 +59,25 @@ public function load(array $configs, ContainerBuilder $container) $config['client']['router_queue'], $config['client']['default_processor_queue'], 'enqueue.client.router_processor', - $transportConfig, - ]); - - $container->register('enqueue.connection_factory_factory', ConnectionFactoryFactory::class); - - $container->register('enqueue.client.driver_factory', DriverFactory::class) - ->addArgument(new Reference('enqueue.client.config')) - ->addArgument(new Reference('enqueue.client.meta.queue_meta_registry')) + $config['transport'], + ]) ; $container->register('enqueue.client.rpc_factory', RpcFactory::class) ->setPublic(true) ->setArguments([ - new Reference('enqueue.transport.context'), + new Reference('enqueue.transport.default.context'), ]) ; $container->register('enqueue.client.producer', Producer::class) ->setPublic(true) ->setArguments([ - new Reference('enqueue.client.driver'), + new Reference('enqueue.client.default.driver'), new Reference('enqueue.client.rpc_factory'), ]) ; - $container->setAlias('enqueue.client.producer_v2', new Alias('enqueue.client.producer', true)); - $container->register('enqueue.client.meta.topic_meta_registry', TopicMetaRegistry::class) ->setPublic(true) ->setArguments([[]]); @@ -136,14 +97,14 @@ public function load(array $configs, ContainerBuilder $container) $container->register('enqueue.client.queue_consumer', QueueConsumer::class) ->setPublic(true) ->setArguments([ - new Reference('enqueue.transport.context'), + new Reference('enqueue.transport.default.context'), new Reference('enqueue.consumption.extensions'), ]); // router $container->register('enqueue.client.router_processor', RouterProcessor::class) ->setPublic(true) - ->setArguments([new Reference('enqueue.client.driver'), []]); + ->setArguments([new Reference('enqueue.client.default.driver'), []]); $container->getDefinition('enqueue.client.processor_registry') ->addMethodCall('add', ['enqueue.client.router_processor', new Reference('enqueue.client.router_processor')]); $container->getDefinition('enqueue.client.meta.queue_meta_registry') @@ -155,7 +116,7 @@ public function load(array $configs, ContainerBuilder $container) $container->register('enqueue.client.delay_redelivered_message_extension', DelayRedeliveredMessageExtension::class) ->setPublic(true) ->setArguments([ - new Reference('enqueue.client.driver'), + new Reference('enqueue.client.default.driver'), $config['client']['redelivered_delay_time'], ]); @@ -164,7 +125,7 @@ public function load(array $configs, ContainerBuilder $container) $container->register('enqueue.client.extension.set_router_properties', SetRouterPropertiesExtension::class) ->setPublic(true) - ->setArguments([new Reference('enqueue.client.driver')]); + ->setArguments([new Reference('enqueue.client.default.driver')]); $extensions[] = new Reference('enqueue.client.extension.set_router_properties'); @@ -173,32 +134,32 @@ public function load(array $configs, ContainerBuilder $container) ->setArguments([$extensions]); } - /** - * @return NodeInterface - */ - private function createConfiguration() + private function createConfiguration(): NodeInterface { $tb = new TreeBuilder(); $rootNode = $tb->root('enqueue'); - $transportChildren = $rootNode->children() - ->arrayNode('transport')->isRequired()->children(); + $rootNode + ->beforeNormalization() + ->ifEmpty()->then(function () { + return ['transport' => ['dsn' => 'null:']]; + }); - foreach ($this->factories as $factory) { - $factory->addConfiguration( - $transportChildren->arrayNode($factory->getName()) - ); - } + $transportNode = $rootNode->children()->arrayNode('transport'); + (new TransportFactory('default'))->addConfiguration($transportNode); $rootNode->children() - ->arrayNode('client')->children() - ->scalarNode('prefix')->defaultValue('enqueue')->end() - ->scalarNode('app_name')->defaultValue('app')->end() - ->scalarNode('router_topic')->defaultValue(Config::DEFAULT_PROCESSOR_QUEUE_NAME)->cannotBeEmpty()->end() - ->scalarNode('router_queue')->defaultValue(Config::DEFAULT_PROCESSOR_QUEUE_NAME)->cannotBeEmpty()->end() - ->scalarNode('default_processor_queue')->defaultValue(Config::DEFAULT_PROCESSOR_QUEUE_NAME)->cannotBeEmpty()->end() - ->integerNode('redelivered_delay_time')->min(0)->defaultValue(0)->end() - ->end()->end() + ->arrayNode('client') + ->addDefaultsIfNotSet() + ->children() + ->scalarNode('prefix')->defaultValue('enqueue')->end() + ->scalarNode('app_name')->defaultValue('app')->end() + ->scalarNode('router_topic')->defaultValue(Config::DEFAULT_PROCESSOR_QUEUE_NAME)->cannotBeEmpty()->end() + ->scalarNode('router_queue')->defaultValue(Config::DEFAULT_PROCESSOR_QUEUE_NAME)->cannotBeEmpty()->end() + ->scalarNode('default_processor_queue')->defaultValue(Config::DEFAULT_PROCESSOR_QUEUE_NAME)->cannotBeEmpty()->end() + ->integerNode('redelivered_delay_time')->min(0)->defaultValue(0)->end() + ->end() + ->end() ->arrayNode('extensions')->addDefaultsIfNotSet()->children() ->booleanNode('signal_extension')->defaultValue(function_exists('pcntl_signal_dispatch'))->end() ->end()->end() diff --git a/Tests/Functional/SimpleClientTest.php b/Tests/Functional/SimpleClientTest.php index 723ee3a..29216e6 100644 --- a/Tests/Functional/SimpleClientTest.php +++ b/Tests/Functional/SimpleClientTest.php @@ -7,9 +7,8 @@ use Enqueue\Consumption\Extension\LimitConsumptionTimeExtension; use Enqueue\Consumption\Result; use Enqueue\SimpleClient\SimpleClient; -use Enqueue\Test\RabbitManagementExtensionTrait; -use Enqueue\Test\RabbitmqAmqpExtension; use Interop\Queue\PsrMessage; +use Interop\Queue\PurgeQueueNotSupportedException; use PHPUnit\Framework\TestCase; /** @@ -17,82 +16,43 @@ */ class SimpleClientTest extends TestCase { - use RabbitmqAmqpExtension; - use RabbitManagementExtensionTrait; - - public function setUp() - { - if (false == getenv('RABBITMQ_HOST')) { - throw new \PHPUnit_Framework_SkippedTestError('Functional tests are not allowed in this environment'); - } - - $this->removeQueue('enqueue.app.default'); - } - public function transportConfigDataProvider() { - yield 'amqp' => [[ - 'transport' => [ - 'default' => 'amqp', - 'amqp' => [ - 'driver' => 'ext', - 'host' => getenv('RABBITMQ_HOST'), - 'port' => getenv('RABBITMQ_AMQP__PORT'), - 'user' => getenv('RABBITMQ_USER'), - 'pass' => getenv('RABBITMQ_PASSWORD'), - 'vhost' => getenv('RABBITMQ_VHOST'), - ], - ], + yield 'amqp_dsn' => [[ + 'transport' => getenv('AMQP_DSN'), ]]; - yield 'config_as_dsn_string' => [getenv('AMQP_DSN')]; + yield 'dbal_dsn' => [[ + 'transport' => getenv('DOCTRINE_DSN'), + ]]; - yield 'amqp_dsn' => [[ + yield 'rabbitmq_stomp' => [[ 'transport' => [ - 'default' => 'amqp', - 'amqp' => getenv('AMQP_DSN'), + 'dsn' => getenv('RABITMQ_STOMP_DSN'), + 'lazy' => false, + 'management_plugin_installed' => true, ], ]]; - yield 'default_amqp_as_dsn' => [[ + yield 'predis_dsn' => [[ 'transport' => [ - 'default' => getenv('AMQP_DSN'), + 'dsn' => getenv('PREDIS_DSN'), + 'lazy' => false, ], ]]; - yield [[ - 'transport' => [ - 'default' => 'rabbitmq_amqp', - 'rabbitmq_amqp' => [ - 'driver' => 'ext', - 'host' => getenv('RABBITMQ_HOST'), - 'port' => getenv('RABBITMQ_AMQP__PORT'), - 'user' => getenv('RABBITMQ_USER'), - 'pass' => getenv('RABBITMQ_PASSWORD'), - 'vhost' => getenv('RABBITMQ_VHOST'), - ], - ], + yield 'fs_dsn' => [[ + 'transport' => 'file://'.sys_get_temp_dir(), ]]; - yield [[ + yield 'sqs' => [[ 'transport' => [ - 'default' => 'rabbitmq_amqp', - 'rabbitmq_amqp' => [ - 'driver' => 'ext', - 'host' => getenv('RABBITMQ_HOST'), - 'port' => getenv('RABBITMQ_AMQP__PORT'), - 'user' => getenv('RABBITMQ_USER'), - 'pass' => getenv('RABBITMQ_PASSWORD'), - 'vhost' => getenv('RABBITMQ_VHOST'), - ], + 'dsn' => getenv('SQS_DSN'), ], ]]; yield 'mongodb_dsn' => [[ - 'transport' => [ - 'default' => 'mongodb', - 'mongodb' => getenv('MONGO_DSN'), - ], + 'transport' => getenv('MONGO_DSN'), ]]; } @@ -101,11 +61,20 @@ public function transportConfigDataProvider() * * @param mixed $config */ - public function testProduceAndConsumeOneMessage($config) + public function testProduceAndConsumeOneMessage(array $config) { $actualMessage = null; + $config['client'] = [ + 'prefix' => str_replace('.', '', uniqid('enqueue', true)), + 'app_name' => 'simple_client', + 'router_topic' => 'test', + 'router_queue' => 'test', + 'default_processor_queue' => 'test', + ]; + $client = new SimpleClient($config); + $client->bind('foo_topic', 'foo_processor', function (PsrMessage $message) use (&$actualMessage) { $actualMessage = $message; @@ -113,11 +82,12 @@ public function testProduceAndConsumeOneMessage($config) }); $client->setupBroker(); + $this->purgeQueue($client); $client->sendEvent('foo_topic', 'Hello there!'); $client->consume(new ChainExtension([ - new LimitConsumptionTimeExtension(new \DateTime('+5sec')), + new LimitConsumptionTimeExtension(new \DateTime('+30sec')), new LimitConsumedMessagesExtension(2), ])); @@ -134,7 +104,16 @@ public function testProduceAndRouteToTwoConsumes($config) { $received = 0; + $config['client'] = [ + 'prefix' => str_replace('.', '', uniqid('enqueue', true)), + 'app_name' => 'simple_client', + 'router_topic' => 'test', + 'router_queue' => 'test', + 'default_processor_queue' => 'test', + ]; + $client = new SimpleClient($config); + $client->bind('foo_topic', 'foo_processor1', function () use (&$received) { ++$received; @@ -147,14 +126,27 @@ public function testProduceAndRouteToTwoConsumes($config) }); $client->setupBroker(); + $this->purgeQueue($client); $client->sendEvent('foo_topic', 'Hello there!'); $client->consume(new ChainExtension([ - new LimitConsumptionTimeExtension(new \DateTime('+5sec')), + new LimitConsumptionTimeExtension(new \DateTime('+2sec')), new LimitConsumedMessagesExtension(3), ])); $this->assertSame(2, $received); } + + protected function purgeQueue(SimpleClient $client): void + { + $driver = $client->getDriver(); + + $queue = $driver->createQueue($driver->getConfig()->getDefaultProcessorQueueName()); + + try { + $client->getContext()->purgeQueue($queue); + } catch (PurgeQueueNotSupportedException $e) { + } + } } diff --git a/Tests/SimpleClientTest.php b/Tests/SimpleClientTest.php deleted file mode 100644 index 3ff8bad..0000000 --- a/Tests/SimpleClientTest.php +++ /dev/null @@ -1,117 +0,0 @@ -removeQueue('enqueue.app.default'); - } - - public function transportConfigDataProvider() - { - yield 'amqp' => [[ - 'transport' => [ - 'default' => 'amqp', - 'amqp' => [ - 'driver' => 'ext', - 'host' => getenv('RABBITMQ_HOST'), - 'port' => getenv('RABBITMQ_AMQP__PORT'), - 'user' => getenv('RABBITMQ_USER'), - 'pass' => getenv('RABBITMQ_PASSWORD'), - 'vhost' => getenv('RABBITMQ_VHOST'), - ], - ], - ]]; - - yield 'config_as_dsn_string' => [getenv('AMQP_DSN')]; - - yield 'config_as_dsn_without_host' => ['amqp:?lazy=1']; - - yield 'amqp_dsn' => [[ - 'transport' => [ - 'default' => 'amqp', - 'amqp' => getenv('AMQP_DSN'), - ], - ]]; - - yield 'default_amqp_as_dsn' => [[ - 'transport' => [ - 'default' => getenv('AMQP_DSN'), - ], - ]]; - - yield [[ - 'transport' => [ - 'default' => 'rabbitmq_amqp', - 'rabbitmq_amqp' => [ - 'driver' => 'ext', - 'host' => getenv('RABBITMQ_HOST'), - 'port' => getenv('RABBITMQ_AMQP__PORT'), - 'user' => getenv('RABBITMQ_USER'), - 'pass' => getenv('RABBITMQ_PASSWORD'), - 'vhost' => getenv('RABBITMQ_VHOST'), - ], - ], - ]]; - - yield [[ - 'transport' => [ - 'default' => 'rabbitmq_amqp', - 'rabbitmq_amqp' => [ - 'driver' => 'ext', - 'host' => getenv('RABBITMQ_HOST'), - 'port' => getenv('RABBITMQ_AMQP__PORT'), - 'user' => getenv('RABBITMQ_USER'), - 'pass' => getenv('RABBITMQ_PASSWORD'), - 'vhost' => getenv('RABBITMQ_VHOST'), - ], - ], - ]]; - - yield 'mongodb_dsn' => [[ - 'transport' => [ - 'default' => 'mongodb', - 'mongodb' => getenv('MONGO_DSN'), - ], - ]]; - } - - /** - * @dataProvider transportConfigDataProvider - * - * @param mixed $config - */ - public function testProduceAndConsumeOneMessage($config) - { - $actualMessage = null; - - $client = new SimpleClient($config); - $client->bind('foo_topic', 'foo_processor', function (PsrMessage $message) use (&$actualMessage) { - $actualMessage = $message; - - return Result::ACK; - }); - - $this->assertInstanceOf(PsrContext::class, $client->getContext()); - } -} From 47d65bc9613c39ad8cebf2c664f50a99268f7c8e Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Fri, 21 Sep 2018 12:46:49 +0300 Subject: [PATCH 12/56] upd simple client. --- SimpleClient.php | 34 +++++++++-- SimpleClientContainerExtension.php | 8 ++- Tests/Functional/SimpleClientTest.php | 86 +++++++++++++-------------- 3 files changed, 79 insertions(+), 49 deletions(-) diff --git a/SimpleClient.php b/SimpleClient.php index 18f1887..719631a 100644 --- a/SimpleClient.php +++ b/SimpleClient.php @@ -11,6 +11,8 @@ use Enqueue\Client\Meta\TopicMetaRegistry; use Enqueue\Client\ProcessorRegistryInterface; use Enqueue\Client\ProducerInterface; +use Enqueue\Client\Route; +use Enqueue\Client\RouteCollection; use Enqueue\Client\RouterProcessor; use Enqueue\Consumption\CallbackProcessor; use Enqueue\Consumption\ExtensionInterface; @@ -90,7 +92,7 @@ public function __construct($config, ContainerBuilder $container = null) /** * @param callable|PsrProcessor $processor */ - public function bind(string $topic, string $processorName, $processor): void + public function bindTopic(string $topic, $processor, string $processorName = null): void { if (is_callable($processor)) { $processor = new CallbackProcessor($processor); @@ -100,11 +102,28 @@ public function bind(string $topic, string $processorName, $processor): void throw new \LogicException('The processor must be either callable or instance of PsrProcessor'); } - $queueName = $this->getConfig()->getDefaultProcessorQueueName(); + $processorName = $processorName ?: uniqid(get_class($processor)); - $this->getRouterProcessor()->add($topic, $queueName, $processorName); - $this->getTopicMetaRegistry()->addProcessor($topic, $processorName); - $this->getQueueMetaRegistry()->addProcessor($queueName, $processorName); + $this->getRouteCollection()->add(new Route($topic, Route::TOPIC, $processorName)); + $this->getProcessorRegistry()->add($processorName, $processor); + } + + /** + * @param callable|PsrProcessor $processor + */ + public function bindCommand(string $command, $processor, string $processorName = null): void + { + if (is_callable($processor)) { + $processor = new CallbackProcessor($processor); + } + + if (false == $processor instanceof PsrProcessor) { + throw new \LogicException('The processor must be either callable or instance of PsrProcessor'); + } + + $processorName = $processorName ?: uniqid(get_class($processor)); + + $this->getRouteCollection()->add(new Route($command, Route::COMMAND, $processorName)); $this->getProcessorRegistry()->add($processorName, $processor); } @@ -204,6 +223,11 @@ public function getRouterProcessor(): RouterProcessor return $this->container->get('enqueue.client.router_processor'); } + private function getRouteCollection(): RouteCollection + { + return $this->container->get('enqueue.client.route_collection'); + } + private function buildContainer($config, ContainerBuilder $container): ContainerInterface { $extension = new SimpleClientContainerExtension(); diff --git a/SimpleClientContainerExtension.php b/SimpleClientContainerExtension.php index 6c862a6..ec5146c 100644 --- a/SimpleClientContainerExtension.php +++ b/SimpleClientContainerExtension.php @@ -11,6 +11,7 @@ use Enqueue\Client\Meta\QueueMetaRegistry; use Enqueue\Client\Meta\TopicMetaRegistry; use Enqueue\Client\Producer; +use Enqueue\Client\RouteCollection; use Enqueue\Client\RouterProcessor; use Enqueue\ConnectionFactoryFactory; use Enqueue\Consumption\ChainExtension as ConsumptionChainExtension; @@ -40,7 +41,7 @@ public function load(array $configs, ContainerBuilder $container): void $container->register('enqueue.client.driver_factory', DriverFactory::class) ->addArgument(new Reference('enqueue.client.config')) - ->addArgument(new Reference('enqueue.client.meta.queue_meta_registry')) + ->addArgument(new Reference('enqueue.client.route_collection')) ; $transportFactory = (new TransportFactory('default')); @@ -63,6 +64,11 @@ public function load(array $configs, ContainerBuilder $container): void ]) ; + $container->register('enqueue.client.route_collection', RouteCollection::class) + ->setPublic(true) + ->addArgument([]) + ; + $container->register('enqueue.client.rpc_factory', RpcFactory::class) ->setPublic(true) ->setArguments([ diff --git a/Tests/Functional/SimpleClientTest.php b/Tests/Functional/SimpleClientTest.php index 29216e6..b5615ac 100644 --- a/Tests/Functional/SimpleClientTest.php +++ b/Tests/Functional/SimpleClientTest.php @@ -75,7 +75,7 @@ public function testProduceAndConsumeOneMessage(array $config) $client = new SimpleClient($config); - $client->bind('foo_topic', 'foo_processor', function (PsrMessage $message) use (&$actualMessage) { + $client->bindTopic('foo_topic', function (PsrMessage $message) use (&$actualMessage) { $actualMessage = $message; return Result::ACK; @@ -95,48 +95,48 @@ public function testProduceAndConsumeOneMessage(array $config) $this->assertSame('Hello there!', $actualMessage->getBody()); } - /** - * @dataProvider transportConfigDataProvider - * - * @param mixed $config - */ - public function testProduceAndRouteToTwoConsumes($config) - { - $received = 0; - - $config['client'] = [ - 'prefix' => str_replace('.', '', uniqid('enqueue', true)), - 'app_name' => 'simple_client', - 'router_topic' => 'test', - 'router_queue' => 'test', - 'default_processor_queue' => 'test', - ]; - - $client = new SimpleClient($config); - - $client->bind('foo_topic', 'foo_processor1', function () use (&$received) { - ++$received; - - return Result::ACK; - }); - $client->bind('foo_topic', 'foo_processor2', function () use (&$received) { - ++$received; - - return Result::ACK; - }); - - $client->setupBroker(); - $this->purgeQueue($client); - - $client->sendEvent('foo_topic', 'Hello there!'); - - $client->consume(new ChainExtension([ - new LimitConsumptionTimeExtension(new \DateTime('+2sec')), - new LimitConsumedMessagesExtension(3), - ])); - - $this->assertSame(2, $received); - } +// /** +// * @dataProvider transportConfigDataProvider +// * +// * @param mixed $config +// */ +// public function testProduceAndRouteToTwoConsumes($config) +// { +// $received = 0; +// +// $config['client'] = [ +// 'prefix' => str_replace('.', '', uniqid('enqueue', true)), +// 'app_name' => 'simple_client', +// 'router_topic' => 'test', +// 'router_queue' => 'test', +// 'default_processor_queue' => 'test', +// ]; +// +// $client = new SimpleClient($config); +// +// $client->bindTopic('foo_topic', function () use (&$received) { +// ++$received; +// +// return Result::ACK; +// }); +// $client->bindTopic('foo_topic', function () use (&$received) { +// ++$received; +// +// return Result::ACK; +// }); +// +// $client->setupBroker(); +// $this->purgeQueue($client); +// +// $client->sendEvent('foo_topic', 'Hello there!'); +// +// $client->consume(new ChainExtension([ +// new LimitConsumptionTimeExtension(new \DateTime('+2sec')), +// new LimitConsumedMessagesExtension(3), +// ])); +// +// $this->assertSame(2, $received); +// } protected function purgeQueue(SimpleClient $client): void { From c77abb49cee36f1442798d07b02ea79558eaf1eb Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Fri, 21 Sep 2018 23:22:37 +0300 Subject: [PATCH 13/56] [client] migrate simple client to new concept. adopt route collection. --- SimpleClient.php | 39 ++++--- SimpleClientContainerExtension.php | 12 --- Tests/Functional/SimpleClientTest.php | 141 ++++++++++++++++---------- 3 files changed, 107 insertions(+), 85 deletions(-) diff --git a/SimpleClient.php b/SimpleClient.php index 719631a..7112ad3 100644 --- a/SimpleClient.php +++ b/SimpleClient.php @@ -7,8 +7,6 @@ use Enqueue\Client\DelegateProcessor; use Enqueue\Client\DriverInterface; use Enqueue\Client\Message; -use Enqueue\Client\Meta\QueueMetaRegistry; -use Enqueue\Client\Meta\TopicMetaRegistry; use Enqueue\Client\ProcessorRegistryInterface; use Enqueue\Client\ProducerInterface; use Enqueue\Client\Route; @@ -146,21 +144,28 @@ public function sendEvent(string $topic, $message): void public function consume(ExtensionInterface $runtimeExtension = null): void { $this->setupBroker(); + $processor = $this->getDelegateProcessor(); - $queueConsumer = $this->getQueueConsumer(); + $consumer = $this->getQueueConsumer(); + + $boundQueues = []; + + $routerQueue = $this->getDriver()->createQueue($this->getConfig()->getRouterQueueName()); + $consumer->bind($routerQueue, $processor); + $boundQueues[$routerQueue->getQueueName()] = true; - $defaultQueueName = $this->getConfig()->getDefaultProcessorQueueName(); - $defaultTransportQueueName = $this->getDriver()->createQueue($defaultQueueName); - $queueConsumer->bind($defaultTransportQueueName, $processor); + foreach ($this->getRouteCollection()->all() as $route) { + $queue = $this->getDriver()->createRouteQueue($route); + if (array_key_exists($queue->getQueueName(), $boundQueues)) { + continue; + } - $routerQueueName = $this->getConfig()->getRouterQueueName(); - if ($routerQueueName != $defaultQueueName) { - $routerTransportQueueName = $this->getDriver()->createQueue($routerQueueName); + $consumer->bind($queue, $processor); - $queueConsumer->bind($routerTransportQueueName, $processor); + $boundQueues[$queue->getQueueName()] = true; } - $queueConsumer->consume($runtimeExtension); + $consumer->consume($runtimeExtension); } public function getContext(): PsrContext @@ -183,16 +188,6 @@ public function getDriver(): DriverInterface return $this->container->get('enqueue.client.default.driver'); } - public function getTopicMetaRegistry(): TopicMetaRegistry - { - return $this->container->get('enqueue.client.meta.topic_meta_registry'); - } - - public function getQueueMetaRegistry(): QueueMetaRegistry - { - return $this->container->get('enqueue.client.meta.queue_meta_registry'); - } - public function getProducer(bool $setupBroker = false): ProducerInterface { $setupBroker && $this->setupBroker(); @@ -223,7 +218,7 @@ public function getRouterProcessor(): RouterProcessor return $this->container->get('enqueue.client.router_processor'); } - private function getRouteCollection(): RouteCollection + public function getRouteCollection(): RouteCollection { return $this->container->get('enqueue.client.route_collection'); } diff --git a/SimpleClientContainerExtension.php b/SimpleClientContainerExtension.php index ec5146c..6ca86b8 100644 --- a/SimpleClientContainerExtension.php +++ b/SimpleClientContainerExtension.php @@ -8,8 +8,6 @@ use Enqueue\Client\ConsumptionExtension\SetRouterPropertiesExtension; use Enqueue\Client\DelegateProcessor; use Enqueue\Client\DriverFactory; -use Enqueue\Client\Meta\QueueMetaRegistry; -use Enqueue\Client\Meta\TopicMetaRegistry; use Enqueue\Client\Producer; use Enqueue\Client\RouteCollection; use Enqueue\Client\RouterProcessor; @@ -84,14 +82,6 @@ public function load(array $configs, ContainerBuilder $container): void ]) ; - $container->register('enqueue.client.meta.topic_meta_registry', TopicMetaRegistry::class) - ->setPublic(true) - ->setArguments([[]]); - - $container->register('enqueue.client.meta.queue_meta_registry', QueueMetaRegistry::class) - ->setPublic(true) - ->setArguments([new Reference('enqueue.client.config'), []]); - $container->register('enqueue.client.processor_registry', ArrayProcessorRegistry::class) ->setPublic(true) ; @@ -113,8 +103,6 @@ public function load(array $configs, ContainerBuilder $container): void ->setArguments([new Reference('enqueue.client.default.driver'), []]); $container->getDefinition('enqueue.client.processor_registry') ->addMethodCall('add', ['enqueue.client.router_processor', new Reference('enqueue.client.router_processor')]); - $container->getDefinition('enqueue.client.meta.queue_meta_registry') - ->addMethodCall('addProcessor', [$config['client']['router_queue'], 'enqueue.client.router_processor']); // extensions $extensions = []; diff --git a/Tests/Functional/SimpleClientTest.php b/Tests/Functional/SimpleClientTest.php index b5615ac..25fdc43 100644 --- a/Tests/Functional/SimpleClientTest.php +++ b/Tests/Functional/SimpleClientTest.php @@ -20,11 +20,11 @@ public function transportConfigDataProvider() { yield 'amqp_dsn' => [[ 'transport' => getenv('AMQP_DSN'), - ]]; + ], '+1sec']; yield 'dbal_dsn' => [[ 'transport' => getenv('DOCTRINE_DSN'), - ]]; + ], '+1sec']; yield 'rabbitmq_stomp' => [[ 'transport' => [ @@ -32,28 +32,28 @@ public function transportConfigDataProvider() 'lazy' => false, 'management_plugin_installed' => true, ], - ]]; + ], '+1sec']; yield 'predis_dsn' => [[ 'transport' => [ 'dsn' => getenv('PREDIS_DSN'), 'lazy' => false, ], - ]]; + ], '+1sec']; yield 'fs_dsn' => [[ 'transport' => 'file://'.sys_get_temp_dir(), - ]]; + ], '+1sec']; yield 'sqs' => [[ 'transport' => [ 'dsn' => getenv('SQS_DSN'), ], - ]]; + ], '+1sec']; yield 'mongodb_dsn' => [[ 'transport' => getenv('MONGO_DSN'), - ]]; + ], '+1sec']; } /** @@ -61,7 +61,7 @@ public function transportConfigDataProvider() * * @param mixed $config */ - public function testProduceAndConsumeOneMessage(array $config) + public function testSendEventWithOneSubscriber(array $config, string $timeLimit) { $actualMessage = null; @@ -86,8 +86,9 @@ public function testProduceAndConsumeOneMessage(array $config) $client->sendEvent('foo_topic', 'Hello there!'); + $client->getQueueConsumer()->setReceiveTimeout(200); $client->consume(new ChainExtension([ - new LimitConsumptionTimeExtension(new \DateTime('+30sec')), + new LimitConsumptionTimeExtension(new \DateTime($timeLimit)), new LimitConsumedMessagesExtension(2), ])); @@ -95,48 +96,86 @@ public function testProduceAndConsumeOneMessage(array $config) $this->assertSame('Hello there!', $actualMessage->getBody()); } -// /** -// * @dataProvider transportConfigDataProvider -// * -// * @param mixed $config -// */ -// public function testProduceAndRouteToTwoConsumes($config) -// { -// $received = 0; -// -// $config['client'] = [ -// 'prefix' => str_replace('.', '', uniqid('enqueue', true)), -// 'app_name' => 'simple_client', -// 'router_topic' => 'test', -// 'router_queue' => 'test', -// 'default_processor_queue' => 'test', -// ]; -// -// $client = new SimpleClient($config); -// -// $client->bindTopic('foo_topic', function () use (&$received) { -// ++$received; -// -// return Result::ACK; -// }); -// $client->bindTopic('foo_topic', function () use (&$received) { -// ++$received; -// -// return Result::ACK; -// }); -// -// $client->setupBroker(); -// $this->purgeQueue($client); -// -// $client->sendEvent('foo_topic', 'Hello there!'); -// -// $client->consume(new ChainExtension([ -// new LimitConsumptionTimeExtension(new \DateTime('+2sec')), -// new LimitConsumedMessagesExtension(3), -// ])); -// -// $this->assertSame(2, $received); -// } + /** + * @dataProvider transportConfigDataProvider + * + * @param mixed $config + */ + public function testSendEventWithTwoSubscriber(array $config, string $timeLimit) + { + $received = 0; + + $config['client'] = [ + 'prefix' => str_replace('.', '', uniqid('enqueue', true)), + 'app_name' => 'simple_client', + 'router_topic' => 'test', + 'router_queue' => 'test', + 'default_processor_queue' => 'test', + ]; + + $client = new SimpleClient($config); + + $client->bindTopic('foo_topic', function () use (&$received) { + ++$received; + + return Result::ACK; + }); + $client->bindTopic('foo_topic', function () use (&$received) { + ++$received; + + return Result::ACK; + }); + + $client->setupBroker(); + $this->purgeQueue($client); + + $client->sendEvent('foo_topic', 'Hello there!'); + $client->getQueueConsumer()->setReceiveTimeout(200); + $client->consume(new ChainExtension([ + new LimitConsumptionTimeExtension(new \DateTime($timeLimit)), + new LimitConsumedMessagesExtension(3), + ])); + + $this->assertSame(2, $received); + } + + /** + * @dataProvider transportConfigDataProvider + * + * @param mixed $config + */ + public function testSendCommand(array $config, string $timeLimit) + { + $received = 0; + + $config['client'] = [ + 'prefix' => str_replace('.', '', uniqid('enqueue', true)), + 'app_name' => 'simple_client', + 'router_topic' => 'test', + 'router_queue' => 'test', + 'default_processor_queue' => 'test', + ]; + + $client = new SimpleClient($config); + + $client->bindCommand('foo_command', function () use (&$received) { + ++$received; + + return Result::ACK; + }); + + $client->setupBroker(); + $this->purgeQueue($client); + + $client->sendCommand('foo_command', 'Hello there!'); + $client->getQueueConsumer()->setReceiveTimeout(200); + $client->consume(new ChainExtension([ + new LimitConsumptionTimeExtension(new \DateTime($timeLimit)), + new LimitConsumedMessagesExtension(1), + ])); + + $this->assertSame(1, $received); + } protected function purgeQueue(SimpleClient $client): void { From a9adba1c9fbf96cf3646ceca1e7eb1e1ffa2dc15 Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Thu, 27 Sep 2018 14:58:22 +0300 Subject: [PATCH 14/56] [simple-client] Do not use Symfony Container --- SimpleClient.php | 207 ++++++++++++++++++-------- SimpleClientContainerExtension.php | 164 -------------------- Tests/Functional/SimpleClientTest.php | 2 +- composer.json | 4 +- 4 files changed, 144 insertions(+), 233 deletions(-) delete mode 100644 SimpleClientContainerExtension.php diff --git a/SimpleClient.php b/SimpleClient.php index 7112ad3..bdcd9a6 100644 --- a/SimpleClient.php +++ b/SimpleClient.php @@ -3,35 +3,59 @@ namespace Enqueue\SimpleClient; use Enqueue\Client\ArrayProcessorRegistry; +use Enqueue\Client\ChainExtension as ClientChainExtensions; use Enqueue\Client\Config; +use Enqueue\Client\ConsumptionExtension\DelayRedeliveredMessageExtension; +use Enqueue\Client\ConsumptionExtension\SetRouterPropertiesExtension; use Enqueue\Client\DelegateProcessor; +use Enqueue\Client\DriverFactory; use Enqueue\Client\DriverInterface; use Enqueue\Client\Message; -use Enqueue\Client\ProcessorRegistryInterface; +use Enqueue\Client\Producer; use Enqueue\Client\ProducerInterface; use Enqueue\Client\Route; use Enqueue\Client\RouteCollection; use Enqueue\Client\RouterProcessor; +use Enqueue\ConnectionFactoryFactory; use Enqueue\Consumption\CallbackProcessor; +use Enqueue\Consumption\ChainExtension as ConsumptionChainExtension; use Enqueue\Consumption\ExtensionInterface; +use Enqueue\Consumption\QueueConsumer; use Enqueue\Consumption\QueueConsumerInterface; use Enqueue\Rpc\Promise; -use Interop\Queue\PsrContext; +use Enqueue\Rpc\RpcFactory; +use Enqueue\Symfony\DependencyInjection\TransportFactory; use Interop\Queue\PsrProcessor; -use Symfony\Component\DependencyInjection\ContainerBuilder; -use Symfony\Component\DependencyInjection\ContainerInterface; +use Symfony\Component\Config\Definition\Builder\TreeBuilder; +use Symfony\Component\Config\Definition\NodeInterface; +use Symfony\Component\Config\Definition\Processor; final class SimpleClient { /** - * @var ContainerInterface + * @var DriverInterface */ - private $container; + private $driver; /** - * @var array|string + * @var Producer */ - private $config; + private $producer; + + /** + * @var QueueConsumer + */ + private $queueConsumer; + + /** + * @var ArrayProcessorRegistry + */ + private $processorRegistry; + + /** + * @var DelegateProcessor + */ + private $delegateProcessor; /** * The config could be a transport DSN (string) or an array, here's an example of a few DSNs:. @@ -78,13 +102,11 @@ final class SimpleClient * ] * * - * @param string|array $config - * @param ContainerBuilder|null $container + * @param string|array $config */ - public function __construct($config, ContainerBuilder $container = null) + public function __construct($config) { - $this->container = $this->buildContainer($config, $container ?: new ContainerBuilder()); - $this->config = $config; + $this->build(['enqueue' => $config]); } /** @@ -102,8 +124,8 @@ public function bindTopic(string $topic, $processor, string $processorName = nul $processorName = $processorName ?: uniqid(get_class($processor)); - $this->getRouteCollection()->add(new Route($topic, Route::TOPIC, $processorName)); - $this->getProcessorRegistry()->add($processorName, $processor); + $this->driver->getRouteCollection()->add(new Route($topic, Route::TOPIC, $processorName)); + $this->processorRegistry->add($processorName, $processor); } /** @@ -121,8 +143,8 @@ public function bindCommand(string $command, $processor, string $processorName = $processorName = $processorName ?: uniqid(get_class($processor)); - $this->getRouteCollection()->add(new Route($command, Route::COMMAND, $processorName)); - $this->getProcessorRegistry()->add($processorName, $processor); + $this->driver->getRouteCollection()->add(new Route($command, Route::COMMAND, $processorName)); + $this->processorRegistry->add($processorName, $processor); } /** @@ -130,7 +152,7 @@ public function bindCommand(string $command, $processor, string $processorName = */ public function sendCommand(string $command, $message, bool $needReply = false): ?Promise { - return $this->getProducer()->sendCommand($command, $message, $needReply); + return $this->producer->sendCommand($command, $message, $needReply); } /** @@ -138,61 +160,48 @@ public function sendCommand(string $command, $message, bool $needReply = false): */ public function sendEvent(string $topic, $message): void { - $this->getProducer()->sendEvent($topic, $message); + $this->producer->sendEvent($topic, $message); } public function consume(ExtensionInterface $runtimeExtension = null): void { $this->setupBroker(); - $processor = $this->getDelegateProcessor(); - $consumer = $this->getQueueConsumer(); - $boundQueues = []; - $routerQueue = $this->getDriver()->createQueue($this->getConfig()->getRouterQueueName()); - $consumer->bind($routerQueue, $processor); + $routerQueue = $this->getDriver()->createQueue($this->getDriver()->getConfig()->getRouterQueueName()); + $this->queueConsumer->bind($routerQueue, $this->delegateProcessor); $boundQueues[$routerQueue->getQueueName()] = true; - foreach ($this->getRouteCollection()->all() as $route) { + foreach ($this->driver->getRouteCollection()->all() as $route) { $queue = $this->getDriver()->createRouteQueue($route); if (array_key_exists($queue->getQueueName(), $boundQueues)) { continue; } - $consumer->bind($queue, $processor); + $this->queueConsumer->bind($queue, $this->delegateProcessor); $boundQueues[$queue->getQueueName()] = true; } - $consumer->consume($runtimeExtension); - } - - public function getContext(): PsrContext - { - return $this->container->get('enqueue.transport.context'); + $this->queueConsumer->consume($runtimeExtension); } public function getQueueConsumer(): QueueConsumerInterface { - return $this->container->get('enqueue.client.queue_consumer'); - } - - public function getConfig(): Config - { - return $this->container->get('enqueue.client.config'); + return $this->queueConsumer; } public function getDriver(): DriverInterface { - return $this->container->get('enqueue.client.default.driver'); + return $this->driver; } public function getProducer(bool $setupBroker = false): ProducerInterface { $setupBroker && $this->setupBroker(); - return $this->container->get('enqueue.client.producer'); + return $this->producer; } public function setupBroker(): void @@ -200,37 +209,105 @@ public function setupBroker(): void $this->getDriver()->setupBroker(); } - /** - * @return ArrayProcessorRegistry - */ - public function getProcessorRegistry(): ProcessorRegistryInterface + public function build(array $configs): void { - return $this->container->get('enqueue.client.processor_registry'); - } + $configProcessor = new Processor(); + $simpleClientConfig = $configProcessor->process($this->createConfiguration(), $configs); - public function getDelegateProcessor(): DelegateProcessor - { - return $this->container->get('enqueue.client.delegate_processor'); - } + if (isset($simpleClientConfig['transport']['factory_service'])) { + throw new \LogicException('transport.factory_service option is not supported by simple client'); + } + if (isset($simpleClientConfig['transport']['factory_class'])) { + throw new \LogicException('transport.factory_class option is not supported by simple client'); + } + if (isset($simpleClientConfig['transport']['connection_factory_class'])) { + throw new \LogicException('transport.connection_factory_class option is not supported by simple client'); + } - public function getRouterProcessor(): RouterProcessor - { - return $this->container->get('enqueue.client.router_processor'); - } + $connectionFactoryFactory = new ConnectionFactoryFactory(); + $connection = $connectionFactoryFactory->create($simpleClientConfig['transport']); - public function getRouteCollection(): RouteCollection - { - return $this->container->get('enqueue.client.route_collection'); - } + $clientExtensions = new ClientChainExtensions([]); - private function buildContainer($config, ContainerBuilder $container): ContainerInterface - { - $extension = new SimpleClientContainerExtension(); - $container->registerExtension($extension); - $container->loadFromExtension($extension->getAlias(), $config); + $config = new Config( + $simpleClientConfig['client']['prefix'], + $simpleClientConfig['client']['app_name'], + $simpleClientConfig['client']['router_topic'], + $simpleClientConfig['client']['router_queue'], + $simpleClientConfig['client']['default_processor_queue'], + 'enqueue.client.router_processor', + $simpleClientConfig['transport'] + ); + $routeCollection = new RouteCollection([]); + $driverFactory = new DriverFactory($config, $routeCollection); + + $driver = $driverFactory->create( + $connection, + $simpleClientConfig['transport']['dsn'], + $simpleClientConfig['transport'] + ); + + $rpcFactory = new RpcFactory($driver->getContext()); + + $producer = new Producer($driver, $rpcFactory, $clientExtensions); - $container->compile(); + $processorRegistry = new ArrayProcessorRegistry([]); - return $container; + $delegateProcessor = new DelegateProcessor($processorRegistry); + + // consumption extensions + $consumptionExtensions = []; + if ($simpleClientConfig['client']['redelivered_delay_time']) { + $consumptionExtensions[] = new DelayRedeliveredMessageExtension($driver, $simpleClientConfig['client']['redelivered_delay_time']); + } + + $consumptionExtensions[] = new SetRouterPropertiesExtension($driver); + + $consumptionChainExtension = new ConsumptionChainExtension($consumptionExtensions); + $queueConsumer = new QueueConsumer($driver->getContext(), $consumptionChainExtension); + + $routerProcessor = new RouterProcessor($driver); + + $processorRegistry->add($config->getRouterProcessorName(), $routerProcessor); + + $this->driver = $driver; + $this->producer = $producer; + $this->queueConsumer = $queueConsumer; + $this->delegateProcessor = $delegateProcessor; + $this->processorRegistry = $processorRegistry; + } + + private function createConfiguration(): NodeInterface + { + $tb = new TreeBuilder(); + $rootNode = $tb->root('enqueue'); + + $rootNode + ->beforeNormalization() + ->ifEmpty()->then(function () { + return ['transport' => ['dsn' => 'null:']]; + }); + + $transportNode = $rootNode->children()->arrayNode('transport'); + (new TransportFactory('default'))->addConfiguration($transportNode); + + $rootNode->children() + ->arrayNode('client') + ->addDefaultsIfNotSet() + ->children() + ->scalarNode('prefix')->defaultValue('enqueue')->end() + ->scalarNode('app_name')->defaultValue('app')->end() + ->scalarNode('router_topic')->defaultValue(Config::DEFAULT_PROCESSOR_QUEUE_NAME)->cannotBeEmpty()->end() + ->scalarNode('router_queue')->defaultValue(Config::DEFAULT_PROCESSOR_QUEUE_NAME)->cannotBeEmpty()->end() + ->scalarNode('default_processor_queue')->defaultValue(Config::DEFAULT_PROCESSOR_QUEUE_NAME)->cannotBeEmpty()->end() + ->integerNode('redelivered_delay_time')->min(0)->defaultValue(0)->end() + ->end() + ->end() + ->arrayNode('extensions')->addDefaultsIfNotSet()->children() + ->booleanNode('signal_extension')->defaultValue(function_exists('pcntl_signal_dispatch'))->end() + ->end()->end() + ; + + return $tb->buildTree(); } } diff --git a/SimpleClientContainerExtension.php b/SimpleClientContainerExtension.php deleted file mode 100644 index 6ca86b8..0000000 --- a/SimpleClientContainerExtension.php +++ /dev/null @@ -1,164 +0,0 @@ -process($this->createConfiguration(), $configs); - - $container->register('enqueue.connection_factory_factory', ConnectionFactoryFactory::class); - - $container->register('enqueue.client.driver_factory', DriverFactory::class) - ->addArgument(new Reference('enqueue.client.config')) - ->addArgument(new Reference('enqueue.client.route_collection')) - ; - - $transportFactory = (new TransportFactory('default')); - $transportFactory->createConnectionFactory($container, $config['transport']); - $transportFactory->createContext($container, $config['transport']); - - $driverId = $transportFactory->createDriver($container, $config['transport']); - $container->getDefinition($driverId)->setPublic(true); - - $container->register('enqueue.client.config', Config::class) - ->setPublic(true) - ->setArguments([ - $config['client']['prefix'], - $config['client']['app_name'], - $config['client']['router_topic'], - $config['client']['router_queue'], - $config['client']['default_processor_queue'], - 'enqueue.client.router_processor', - $config['transport'], - ]) - ; - - $container->register('enqueue.client.route_collection', RouteCollection::class) - ->setPublic(true) - ->addArgument([]) - ; - - $container->register('enqueue.client.rpc_factory', RpcFactory::class) - ->setPublic(true) - ->setArguments([ - new Reference('enqueue.transport.default.context'), - ]) - ; - - $container->register('enqueue.client.producer', Producer::class) - ->setPublic(true) - ->setArguments([ - new Reference('enqueue.client.default.driver'), - new Reference('enqueue.client.rpc_factory'), - ]) - ; - - $container->register('enqueue.client.processor_registry', ArrayProcessorRegistry::class) - ->setPublic(true) - ; - - $container->register('enqueue.client.delegate_processor', DelegateProcessor::class) - ->setPublic(true) - ->setArguments([new Reference('enqueue.client.processor_registry')]); - - $container->register('enqueue.client.queue_consumer', QueueConsumer::class) - ->setPublic(true) - ->setArguments([ - new Reference('enqueue.transport.default.context'), - new Reference('enqueue.consumption.extensions'), - ]); - - // router - $container->register('enqueue.client.router_processor', RouterProcessor::class) - ->setPublic(true) - ->setArguments([new Reference('enqueue.client.default.driver'), []]); - $container->getDefinition('enqueue.client.processor_registry') - ->addMethodCall('add', ['enqueue.client.router_processor', new Reference('enqueue.client.router_processor')]); - - // extensions - $extensions = []; - if ($config['client']['redelivered_delay_time']) { - $container->register('enqueue.client.delay_redelivered_message_extension', DelayRedeliveredMessageExtension::class) - ->setPublic(true) - ->setArguments([ - new Reference('enqueue.client.default.driver'), - $config['client']['redelivered_delay_time'], - ]); - - $extensions[] = new Reference('enqueue.client.delay_redelivered_message_extension'); - } - - $container->register('enqueue.client.extension.set_router_properties', SetRouterPropertiesExtension::class) - ->setPublic(true) - ->setArguments([new Reference('enqueue.client.default.driver')]); - - $extensions[] = new Reference('enqueue.client.extension.set_router_properties'); - - $container->register('enqueue.consumption.extensions', ConsumptionChainExtension::class) - ->setPublic(true) - ->setArguments([$extensions]); - } - - private function createConfiguration(): NodeInterface - { - $tb = new TreeBuilder(); - $rootNode = $tb->root('enqueue'); - - $rootNode - ->beforeNormalization() - ->ifEmpty()->then(function () { - return ['transport' => ['dsn' => 'null:']]; - }); - - $transportNode = $rootNode->children()->arrayNode('transport'); - (new TransportFactory('default'))->addConfiguration($transportNode); - - $rootNode->children() - ->arrayNode('client') - ->addDefaultsIfNotSet() - ->children() - ->scalarNode('prefix')->defaultValue('enqueue')->end() - ->scalarNode('app_name')->defaultValue('app')->end() - ->scalarNode('router_topic')->defaultValue(Config::DEFAULT_PROCESSOR_QUEUE_NAME)->cannotBeEmpty()->end() - ->scalarNode('router_queue')->defaultValue(Config::DEFAULT_PROCESSOR_QUEUE_NAME)->cannotBeEmpty()->end() - ->scalarNode('default_processor_queue')->defaultValue(Config::DEFAULT_PROCESSOR_QUEUE_NAME)->cannotBeEmpty()->end() - ->integerNode('redelivered_delay_time')->min(0)->defaultValue(0)->end() - ->end() - ->end() - ->arrayNode('extensions')->addDefaultsIfNotSet()->children() - ->booleanNode('signal_extension')->defaultValue(function_exists('pcntl_signal_dispatch'))->end() - ->end()->end() - ; - - return $tb->buildTree(); - } -} diff --git a/Tests/Functional/SimpleClientTest.php b/Tests/Functional/SimpleClientTest.php index 25fdc43..d486ccf 100644 --- a/Tests/Functional/SimpleClientTest.php +++ b/Tests/Functional/SimpleClientTest.php @@ -184,7 +184,7 @@ protected function purgeQueue(SimpleClient $client): void $queue = $driver->createQueue($driver->getConfig()->getDefaultProcessorQueueName()); try { - $client->getContext()->purgeQueue($queue); + $client->getDriver()->getContext()->purgeQueue($queue); } catch (PurgeQueueNotSupportedException $e) { } } diff --git a/composer.json b/composer.json index aea1a20..4f6c07a 100644 --- a/composer.json +++ b/composer.json @@ -8,11 +8,9 @@ "require": { "php": "^7.1.3", "enqueue/enqueue": "0.9.x-dev", - "symfony/dependency-injection": "^3.4|^4", "queue-interop/amqp-interop": "0.8.x-dev", "queue-interop/queue-interop": "0.7.x-dev", - "symfony/config": "^3.4|^4", - "symfony/console": "^3.4|^4" + "symfony/config": "^3.4|^4" }, "require-dev": { "phpunit/phpunit": "~5.5", From e6a12f9ae96e56d186207c8f4730590e8bdd3bae Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Fri, 28 Sep 2018 14:58:31 +0300 Subject: [PATCH 15/56] [simple-client] Do not use deprecated classes. --- SimpleClient.php | 18 +++++++++--------- Tests/Functional/SimpleClientTest.php | 8 ++++---- 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/SimpleClient.php b/SimpleClient.php index bdcd9a6..a748364 100644 --- a/SimpleClient.php +++ b/SimpleClient.php @@ -25,10 +25,10 @@ use Enqueue\Rpc\Promise; use Enqueue\Rpc\RpcFactory; use Enqueue\Symfony\DependencyInjection\TransportFactory; -use Interop\Queue\PsrProcessor; +use Interop\Queue\Processor; use Symfony\Component\Config\Definition\Builder\TreeBuilder; use Symfony\Component\Config\Definition\NodeInterface; -use Symfony\Component\Config\Definition\Processor; +use Symfony\Component\Config\Definition\Processor as ConfigProcessor; final class SimpleClient { @@ -110,7 +110,7 @@ public function __construct($config) } /** - * @param callable|PsrProcessor $processor + * @param callable|Processor $processor */ public function bindTopic(string $topic, $processor, string $processorName = null): void { @@ -118,8 +118,8 @@ public function bindTopic(string $topic, $processor, string $processorName = nul $processor = new CallbackProcessor($processor); } - if (false == $processor instanceof PsrProcessor) { - throw new \LogicException('The processor must be either callable or instance of PsrProcessor'); + if (false == $processor instanceof Processor) { + throw new \LogicException('The processor must be either callable or instance of Processor'); } $processorName = $processorName ?: uniqid(get_class($processor)); @@ -129,7 +129,7 @@ public function bindTopic(string $topic, $processor, string $processorName = nul } /** - * @param callable|PsrProcessor $processor + * @param callable|Processor $processor */ public function bindCommand(string $command, $processor, string $processorName = null): void { @@ -137,8 +137,8 @@ public function bindCommand(string $command, $processor, string $processorName = $processor = new CallbackProcessor($processor); } - if (false == $processor instanceof PsrProcessor) { - throw new \LogicException('The processor must be either callable or instance of PsrProcessor'); + if (false == $processor instanceof Processor) { + throw new \LogicException('The processor must be either callable or instance of Processor'); } $processorName = $processorName ?: uniqid(get_class($processor)); @@ -211,7 +211,7 @@ public function setupBroker(): void public function build(array $configs): void { - $configProcessor = new Processor(); + $configProcessor = new ConfigProcessor(); $simpleClientConfig = $configProcessor->process($this->createConfiguration(), $configs); if (isset($simpleClientConfig['transport']['factory_service'])) { diff --git a/Tests/Functional/SimpleClientTest.php b/Tests/Functional/SimpleClientTest.php index d486ccf..61e0618 100644 --- a/Tests/Functional/SimpleClientTest.php +++ b/Tests/Functional/SimpleClientTest.php @@ -7,8 +7,8 @@ use Enqueue\Consumption\Extension\LimitConsumptionTimeExtension; use Enqueue\Consumption\Result; use Enqueue\SimpleClient\SimpleClient; -use Interop\Queue\PsrMessage; -use Interop\Queue\PurgeQueueNotSupportedException; +use Interop\Queue\Exception\PurgeQueueNotSupportedException; +use Interop\Queue\Message; use PHPUnit\Framework\TestCase; /** @@ -75,7 +75,7 @@ public function testSendEventWithOneSubscriber(array $config, string $timeLimit) $client = new SimpleClient($config); - $client->bindTopic('foo_topic', function (PsrMessage $message) use (&$actualMessage) { + $client->bindTopic('foo_topic', function (Message $message) use (&$actualMessage) { $actualMessage = $message; return Result::ACK; @@ -92,7 +92,7 @@ public function testSendEventWithOneSubscriber(array $config, string $timeLimit) new LimitConsumedMessagesExtension(2), ])); - $this->assertInstanceOf(PsrMessage::class, $actualMessage); + $this->assertInstanceOf(Message::class, $actualMessage); $this->assertSame('Hello there!', $actualMessage->getBody()); } From 871a8db31d0b9d9152da5d73e924c24f1030a9d2 Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Fri, 28 Sep 2018 19:00:31 +0300 Subject: [PATCH 16/56] Merge master --- SimpleClient.php | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/SimpleClient.php b/SimpleClient.php index a748364..7a4cff3 100644 --- a/SimpleClient.php +++ b/SimpleClient.php @@ -297,9 +297,9 @@ private function createConfiguration(): NodeInterface ->children() ->scalarNode('prefix')->defaultValue('enqueue')->end() ->scalarNode('app_name')->defaultValue('app')->end() - ->scalarNode('router_topic')->defaultValue(Config::DEFAULT_PROCESSOR_QUEUE_NAME)->cannotBeEmpty()->end() - ->scalarNode('router_queue')->defaultValue(Config::DEFAULT_PROCESSOR_QUEUE_NAME)->cannotBeEmpty()->end() - ->scalarNode('default_processor_queue')->defaultValue(Config::DEFAULT_PROCESSOR_QUEUE_NAME)->cannotBeEmpty()->end() + ->scalarNode('router_topic')->defaultValue('default')->cannotBeEmpty()->end() + ->scalarNode('router_queue')->defaultValue('default')->cannotBeEmpty()->end() + ->scalarNode('default_processor_queue')->defaultValue('default')->cannotBeEmpty()->end() ->integerNode('redelivered_delay_time')->min(0)->defaultValue(0)->end() ->end() ->end() From 8bd159d76878669db7a2255c60067d39f674a139 Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Mon, 1 Oct 2018 18:33:11 +0300 Subject: [PATCH 17/56] [client] Rename config options. --- SimpleClient.php | 4 ++-- Tests/Functional/SimpleClientTest.php | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/SimpleClient.php b/SimpleClient.php index 7a4cff3..1f0d1c1 100644 --- a/SimpleClient.php +++ b/SimpleClient.php @@ -169,7 +169,7 @@ public function consume(ExtensionInterface $runtimeExtension = null): void $boundQueues = []; - $routerQueue = $this->getDriver()->createQueue($this->getDriver()->getConfig()->getRouterQueueName()); + $routerQueue = $this->getDriver()->createQueue($this->getDriver()->getConfig()->getRouterQueue()); $this->queueConsumer->bind($routerQueue, $this->delegateProcessor); $boundQueues[$routerQueue->getQueueName()] = true; @@ -268,7 +268,7 @@ public function build(array $configs): void $routerProcessor = new RouterProcessor($driver); - $processorRegistry->add($config->getRouterProcessorName(), $routerProcessor); + $processorRegistry->add($config->getRouterProcessor(), $routerProcessor); $this->driver = $driver; $this->producer = $producer; diff --git a/Tests/Functional/SimpleClientTest.php b/Tests/Functional/SimpleClientTest.php index 61e0618..63699c2 100644 --- a/Tests/Functional/SimpleClientTest.php +++ b/Tests/Functional/SimpleClientTest.php @@ -181,7 +181,7 @@ protected function purgeQueue(SimpleClient $client): void { $driver = $client->getDriver(); - $queue = $driver->createQueue($driver->getConfig()->getDefaultProcessorQueueName()); + $queue = $driver->createQueue($driver->getConfig()->getDefaultQueue()); try { $client->getDriver()->getContext()->purgeQueue($queue); From 85eb709ea438175cd90047ed442c9fe4b86f7cf2 Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Tue, 2 Oct 2018 15:16:49 +0300 Subject: [PATCH 18/56] [consumption] Add ability to consume from multiple transports. --- SimpleClient.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/SimpleClient.php b/SimpleClient.php index 1f0d1c1..f1adc3b 100644 --- a/SimpleClient.php +++ b/SimpleClient.php @@ -2,7 +2,7 @@ namespace Enqueue\SimpleClient; -use Enqueue\Client\ArrayProcessorRegistry; +use Enqueue\ArrayProcessorRegistry; use Enqueue\Client\ChainExtension as ClientChainExtensions; use Enqueue\Client\Config; use Enqueue\Client\ConsumptionExtension\DelayRedeliveredMessageExtension; From 7d108c064a6596e90d95e8ec901d413d388f58cc Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Thu, 4 Oct 2018 13:58:47 +0300 Subject: [PATCH 19/56] Make cli commands multi transport\client. --- SimpleClient.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/SimpleClient.php b/SimpleClient.php index f1adc3b..ea7f3c7 100644 --- a/SimpleClient.php +++ b/SimpleClient.php @@ -289,7 +289,7 @@ private function createConfiguration(): NodeInterface }); $transportNode = $rootNode->children()->arrayNode('transport'); - (new TransportFactory('default'))->addConfiguration($transportNode); + (new TransportFactory('default'))->addTransportConfiguration($transportNode); $rootNode->children() ->arrayNode('client') From 2c4e7a1553cbd00fe7dfe7bebefa390c89987170 Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Mon, 8 Oct 2018 19:06:16 +0300 Subject: [PATCH 20/56] [simple-client] Register log extension. --- SimpleClient.php | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/SimpleClient.php b/SimpleClient.php index ea7f3c7..de5922d 100644 --- a/SimpleClient.php +++ b/SimpleClient.php @@ -6,6 +6,7 @@ use Enqueue\Client\ChainExtension as ClientChainExtensions; use Enqueue\Client\Config; use Enqueue\Client\ConsumptionExtension\DelayRedeliveredMessageExtension; +use Enqueue\Client\ConsumptionExtension\LogExtension; use Enqueue\Client\ConsumptionExtension\SetRouterPropertiesExtension; use Enqueue\Client\DelegateProcessor; use Enqueue\Client\DriverFactory; @@ -26,6 +27,8 @@ use Enqueue\Rpc\RpcFactory; use Enqueue\Symfony\DependencyInjection\TransportFactory; use Interop\Queue\Processor; +use Psr\Log\LoggerInterface; +use Psr\Log\NullLogger; use Symfony\Component\Config\Definition\Builder\TreeBuilder; use Symfony\Component\Config\Definition\NodeInterface; use Symfony\Component\Config\Definition\Processor as ConfigProcessor; @@ -57,6 +60,11 @@ final class SimpleClient */ private $delegateProcessor; + /** + * @var LoggerInterface + */ + private $logger; + /** * The config could be a transport DSN (string) or an array, here's an example of a few DSNs:. * @@ -104,9 +112,11 @@ final class SimpleClient * * @param string|array $config */ - public function __construct($config) + public function __construct($config, LoggerInterface $logger = null) { $this->build(['enqueue' => $config]); + + $this->logger = $logger ?: new NullLogger(); } /** @@ -262,9 +272,10 @@ public function build(array $configs): void } $consumptionExtensions[] = new SetRouterPropertiesExtension($driver); + $consumptionExtensions[] = new LogExtension(); $consumptionChainExtension = new ConsumptionChainExtension($consumptionExtensions); - $queueConsumer = new QueueConsumer($driver->getContext(), $consumptionChainExtension); + $queueConsumer = new QueueConsumer($driver->getContext(), $consumptionChainExtension, [], $this->logger); $routerProcessor = new RouterProcessor($driver); From bd8142848b2ba69e3cc3a16bf9c9fd7729219435 Mon Sep 17 00:00:00 2001 From: webmake Date: Sat, 27 Oct 2018 10:56:55 +0000 Subject: [PATCH 21/56] Do not export non source files --- .gitattributes | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .gitattributes diff --git a/.gitattributes b/.gitattributes new file mode 100644 index 0000000..bdf2dcb --- /dev/null +++ b/.gitattributes @@ -0,0 +1,5 @@ +/Tests export-ignore +.gitattributes export-ignore +.gitignore export-ignore +.travis.yml export-ignore +phpunit.xml.dist export-ignore From f451600335c9c04bc0fead696cb0ed477a46d685 Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Tue, 6 Nov 2018 18:06:13 +0200 Subject: [PATCH 22/56] [doc][skip ci] Add sponsoring section. --- README.md | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/README.md b/README.md index 2368b31..c85444e 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,12 @@ +

Supporting Enqueue

+ +Enqueue is an MIT-licensed open source project with its ongoing development made possible entirely by the support of community and our customers. If you'd like to join them, please consider: + +- [Become a sponsor](https://www.patreon.com/makasim). +- [Become our client](http://forma-pro.com/) + +--- + # Message Queue. Simple client [![Gitter](https://badges.gitter.im/php-enqueue/Lobby.svg)](https://gitter.im/php-enqueue/Lobby) From 0f15559982c0fcd4b144c29ff92bbf8de82602f9 Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Tue, 6 Nov 2018 18:12:08 +0200 Subject: [PATCH 23/56] [docs][skip ci] Add sponsoring to docs. --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index c85444e..d7054f1 100644 --- a/README.md +++ b/README.md @@ -2,7 +2,7 @@ Enqueue is an MIT-licensed open source project with its ongoing development made possible entirely by the support of community and our customers. If you'd like to join them, please consider: -- [Become a sponsor](https://www.patreon.com/makasim). +- [Become a sponsor](https://www.patreon.com/makasim) - [Become our client](http://forma-pro.com/) --- From 903a7e57c42e8f810e46f9b829299a3349b48016 Mon Sep 17 00:00:00 2001 From: Alexander Kozienko Date: Tue, 13 Nov 2018 13:25:39 +0200 Subject: [PATCH 24/56] multi client configuration --- SimpleClient.php | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/SimpleClient.php b/SimpleClient.php index de5922d..118a435 100644 --- a/SimpleClient.php +++ b/SimpleClient.php @@ -299,8 +299,7 @@ private function createConfiguration(): NodeInterface return ['transport' => ['dsn' => 'null:']]; }); - $transportNode = $rootNode->children()->arrayNode('transport'); - (new TransportFactory('default'))->addTransportConfiguration($transportNode); + $rootNode->children()->append(TransportFactory::getConfiguration()); $rootNode->children() ->arrayNode('client') From 90b4fd79bd6b49bfd2be459280db9093d48e59ce Mon Sep 17 00:00:00 2001 From: Alexander Kozienko Date: Fri, 16 Nov 2018 13:52:04 +0200 Subject: [PATCH 25/56] multi client configuration --- composer.json | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/composer.json b/composer.json index 4f6c07a..e204c05 100644 --- a/composer.json +++ b/composer.json @@ -8,8 +8,8 @@ "require": { "php": "^7.1.3", "enqueue/enqueue": "0.9.x-dev", - "queue-interop/amqp-interop": "0.8.x-dev", - "queue-interop/queue-interop": "0.7.x-dev", + "queue-interop/amqp-interop": "^0.8", + "queue-interop/queue-interop": "^0.7", "symfony/config": "^3.4|^4" }, "require-dev": { From a7992b1779a4d16aa42129827668e016dd1388ee Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Wed, 21 Nov 2018 13:55:20 +0200 Subject: [PATCH 26/56] Rework DriverFactory, add separator option to Client Config. --- SimpleClient.php | 50 ++++++++++++++++----------- Tests/Functional/SimpleClientTest.php | 6 ++-- 2 files changed, 33 insertions(+), 23 deletions(-) diff --git a/SimpleClient.php b/SimpleClient.php index 118a435..4e3d8e9 100644 --- a/SimpleClient.php +++ b/SimpleClient.php @@ -20,11 +20,14 @@ use Enqueue\ConnectionFactoryFactory; use Enqueue\Consumption\CallbackProcessor; use Enqueue\Consumption\ChainExtension as ConsumptionChainExtension; +use Enqueue\Consumption\Extension\ReplyExtension; +use Enqueue\Consumption\Extension\SignalExtension; use Enqueue\Consumption\ExtensionInterface; use Enqueue\Consumption\QueueConsumer; use Enqueue\Consumption\QueueConsumerInterface; use Enqueue\Rpc\Promise; use Enqueue\Rpc\RpcFactory; +use Enqueue\Symfony\Client\DependencyInjection\ClientFactory; use Enqueue\Symfony\DependencyInjection\TransportFactory; use Interop\Queue\Processor; use Psr\Log\LoggerInterface; @@ -98,14 +101,16 @@ final class SimpleClient * 'transport' => 'null:', * 'client' => [ * 'prefix' => 'enqueue', + * 'separator' => '.', * 'app_name' => 'app', * 'router_topic' => 'router', * 'router_queue' => 'default', - * 'default_processor_queue' => 'default', + * 'default_queue' => 'default', * 'redelivered_delay_time' => 0 * ], * 'extensions' => [ * 'signal_extension' => true, + * 'reply_extension' => true, * ] * ] * @@ -241,20 +246,23 @@ public function build(array $configs): void $config = new Config( $simpleClientConfig['client']['prefix'], + $simpleClientConfig['client']['separator'], $simpleClientConfig['client']['app_name'], $simpleClientConfig['client']['router_topic'], $simpleClientConfig['client']['router_queue'], - $simpleClientConfig['client']['default_processor_queue'], + $simpleClientConfig['client']['default_queue'], 'enqueue.client.router_processor', - $simpleClientConfig['transport'] + $simpleClientConfig['transport'], + [] ); + $routeCollection = new RouteCollection([]); - $driverFactory = new DriverFactory($config, $routeCollection); + $driverFactory = new DriverFactory(); $driver = $driverFactory->create( $connection, - $simpleClientConfig['transport']['dsn'], - $simpleClientConfig['transport'] + $config, + $routeCollection ); $rpcFactory = new RpcFactory($driver->getContext()); @@ -271,6 +279,14 @@ public function build(array $configs): void $consumptionExtensions[] = new DelayRedeliveredMessageExtension($driver, $simpleClientConfig['client']['redelivered_delay_time']); } + if ($simpleClientConfig['extensions']['signal_extension']) { + $consumptionExtensions[] = new SignalExtension(); + } + + if ($simpleClientConfig['extensions']['reply_extension']) { + $consumptionExtensions[] = new ReplyExtension(); + } + $consumptionExtensions[] = new SetRouterPropertiesExtension($driver); $consumptionExtensions[] = new LogExtension(); @@ -299,23 +315,17 @@ private function createConfiguration(): NodeInterface return ['transport' => ['dsn' => 'null:']]; }); - $rootNode->children()->append(TransportFactory::getConfiguration()); + $rootNode + ->append(TransportFactory::getConfiguration()) + ->append(TransportFactory::getQueueConsumerConfiguration()) + ->append(ClientFactory::getConfiguration(false)) + ; $rootNode->children() - ->arrayNode('client') - ->addDefaultsIfNotSet() - ->children() - ->scalarNode('prefix')->defaultValue('enqueue')->end() - ->scalarNode('app_name')->defaultValue('app')->end() - ->scalarNode('router_topic')->defaultValue('default')->cannotBeEmpty()->end() - ->scalarNode('router_queue')->defaultValue('default')->cannotBeEmpty()->end() - ->scalarNode('default_processor_queue')->defaultValue('default')->cannotBeEmpty()->end() - ->integerNode('redelivered_delay_time')->min(0)->defaultValue(0)->end() - ->end() - ->end() - ->arrayNode('extensions')->addDefaultsIfNotSet()->children() + ->arrayNode('extensions')->addDefaultsIfNotSet()->children() ->booleanNode('signal_extension')->defaultValue(function_exists('pcntl_signal_dispatch'))->end() - ->end()->end() + ->booleanNode('reply_extension')->defaultTrue()->end() + ->end() ; return $tb->buildTree(); diff --git a/Tests/Functional/SimpleClientTest.php b/Tests/Functional/SimpleClientTest.php index 63699c2..4a4c832 100644 --- a/Tests/Functional/SimpleClientTest.php +++ b/Tests/Functional/SimpleClientTest.php @@ -70,7 +70,7 @@ public function testSendEventWithOneSubscriber(array $config, string $timeLimit) 'app_name' => 'simple_client', 'router_topic' => 'test', 'router_queue' => 'test', - 'default_processor_queue' => 'test', + 'default_queue' => 'test', ]; $client = new SimpleClient($config); @@ -110,7 +110,7 @@ public function testSendEventWithTwoSubscriber(array $config, string $timeLimit) 'app_name' => 'simple_client', 'router_topic' => 'test', 'router_queue' => 'test', - 'default_processor_queue' => 'test', + 'default_queue' => 'test', ]; $client = new SimpleClient($config); @@ -153,7 +153,7 @@ public function testSendCommand(array $config, string $timeLimit) 'app_name' => 'simple_client', 'router_topic' => 'test', 'router_queue' => 'test', - 'default_processor_queue' => 'test', + 'default_queue' => 'test', ]; $client = new SimpleClient($config); From 9e0af08693e4ffedea732304c9544a51db754e9c Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Tue, 27 Nov 2018 19:54:18 +0200 Subject: [PATCH 27/56] Allow installing stable dependencies. --- composer.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/composer.json b/composer.json index e204c05..a4bbc47 100644 --- a/composer.json +++ b/composer.json @@ -7,7 +7,7 @@ "license": "MIT", "require": { "php": "^7.1.3", - "enqueue/enqueue": "0.9.x-dev", + "enqueue/enqueue": "^0.9", "queue-interop/amqp-interop": "^0.8", "queue-interop/queue-interop": "^0.7", "symfony/config": "^3.4|^4" From 25fd82753ffb79a3c5a0159d32973d65d5bc5f8f Mon Sep 17 00:00:00 2001 From: Kotlyar Maksim Date: Thu, 13 Dec 2018 17:31:01 +0200 Subject: [PATCH 28/56] Allow queue interop 0.8 in packages --- composer.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/composer.json b/composer.json index a4bbc47..571cca7 100644 --- a/composer.json +++ b/composer.json @@ -9,7 +9,7 @@ "php": "^7.1.3", "enqueue/enqueue": "^0.9", "queue-interop/amqp-interop": "^0.8", - "queue-interop/queue-interop": "^0.7", + "queue-interop/queue-interop": "^0.7|^0.8", "symfony/config": "^3.4|^4" }, "require-dev": { From c6c04d7c3e34ab87155bab0782b9fb451a603f1d Mon Sep 17 00:00:00 2001 From: Jo Carter Date: Fri, 14 Dec 2018 12:15:30 +0000 Subject: [PATCH 29/56] Fix TreeBuilder in Symfony 4.2 A tree builder without a root node is deprecated since Symfony 4.2 and will not be supported anymore in 5.0. --- SimpleClient.php | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/SimpleClient.php b/SimpleClient.php index 4e3d8e9..fcaa52d 100644 --- a/SimpleClient.php +++ b/SimpleClient.php @@ -306,8 +306,8 @@ public function build(array $configs): void private function createConfiguration(): NodeInterface { - $tb = new TreeBuilder(); - $rootNode = $tb->root('enqueue'); + $tb = new TreeBuilder('enqueue'); + $rootNode = $tb->getRootNode(); $rootNode ->beforeNormalization() From 3ded0bb56480e8d6ebb970378bdbf18cedeeb8c9 Mon Sep 17 00:00:00 2001 From: Jo Carter Date: Fri, 14 Dec 2018 12:27:16 +0000 Subject: [PATCH 30/56] BC fix. --- SimpleClient.php | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/SimpleClient.php b/SimpleClient.php index fcaa52d..dd0eb74 100644 --- a/SimpleClient.php +++ b/SimpleClient.php @@ -307,7 +307,12 @@ public function build(array $configs): void private function createConfiguration(): NodeInterface { $tb = new TreeBuilder('enqueue'); - $rootNode = $tb->getRootNode(); + + if (method_exists($tb, 'getRootNode')) { + $rootNode = $tb->getRootNode(); + } else { + $rootNode = $tb->root('enqueue'); + } $rootNode ->beforeNormalization() From e4273e482dcb8051f285baa42eb4fba1dfe96a09 Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Fri, 14 Dec 2018 14:39:47 +0200 Subject: [PATCH 31/56] Add commands for single transport\client with typed arguments. --- SimpleClient.php | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/SimpleClient.php b/SimpleClient.php index 4e3d8e9..b4c146e 100644 --- a/SimpleClient.php +++ b/SimpleClient.php @@ -219,6 +219,11 @@ public function getProducer(bool $setupBroker = false): ProducerInterface return $this->producer; } + public function getDelegateProcessor(): DelegateProcessor + { + return $this->delegateProcessor; + } + public function setupBroker(): void { $this->getDriver()->setupBroker(); From 2798aaef6c7431c2b1bbd62eeac5580182ad527c Mon Sep 17 00:00:00 2001 From: Jo Carter Date: Fri, 14 Dec 2018 13:03:59 +0000 Subject: [PATCH 32/56] Further BC fix --- SimpleClient.php | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/SimpleClient.php b/SimpleClient.php index dd0eb74..0e03da7 100644 --- a/SimpleClient.php +++ b/SimpleClient.php @@ -306,11 +306,11 @@ public function build(array $configs): void private function createConfiguration(): NodeInterface { - $tb = new TreeBuilder('enqueue'); - - if (method_exists($tb, 'getRootNode')) { + if (method_exists(TreeBuilder::class, 'getRootNode')) { + $tb = new TreeBuilder('enqueue'); $rootNode = $tb->getRootNode(); } else { + $tb = new TreeBuilder(); $rootNode = $tb->root('enqueue'); } From 68c581089f4e5b72693f269f767de075824e430d Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Fri, 14 Dec 2018 15:07:46 +0200 Subject: [PATCH 33/56] [simple-client] Simple Client should accept string DSN as constructor argument. --- SimpleClient.php | 7 ++++++ Tests/Functional/SimpleClientTest.php | 33 ++++++++++++++++++++++++--- 2 files changed, 37 insertions(+), 3 deletions(-) diff --git a/SimpleClient.php b/SimpleClient.php index b4c146e..ea07ef0 100644 --- a/SimpleClient.php +++ b/SimpleClient.php @@ -119,6 +119,13 @@ final class SimpleClient */ public function __construct($config, LoggerInterface $logger = null) { + if (is_string($config)) { + $config = [ + 'transport' => $config, + 'client' => true, + ]; + } + $this->build(['enqueue' => $config]); $this->logger = $logger ?: new NullLogger(); diff --git a/Tests/Functional/SimpleClientTest.php b/Tests/Functional/SimpleClientTest.php index 4a4c832..13320fb 100644 --- a/Tests/Functional/SimpleClientTest.php +++ b/Tests/Functional/SimpleClientTest.php @@ -56,12 +56,39 @@ public function transportConfigDataProvider() ], '+1sec']; } + public function testShouldWorkWithStringDsnConstructorArgument() + { + $actualMessage = null; + + $client = new SimpleClient(getenv('AMQP_DSN')); + + $client->bindTopic('foo_topic', function (Message $message) use (&$actualMessage) { + $actualMessage = $message; + + return Result::ACK; + }); + + $client->setupBroker(); + $this->purgeQueue($client); + + $client->sendEvent('foo_topic', 'Hello there!'); + + $client->getQueueConsumer()->setReceiveTimeout(200); + $client->consume(new ChainExtension([ + new LimitConsumptionTimeExtension(new \DateTime('+1sec')), + new LimitConsumedMessagesExtension(2), + ])); + + $this->assertInstanceOf(Message::class, $actualMessage); + $this->assertSame('Hello there!', $actualMessage->getBody()); + } + /** * @dataProvider transportConfigDataProvider * * @param mixed $config */ - public function testSendEventWithOneSubscriber(array $config, string $timeLimit) + public function testSendEventWithOneSubscriber($config, string $timeLimit) { $actualMessage = null; @@ -101,7 +128,7 @@ public function testSendEventWithOneSubscriber(array $config, string $timeLimit) * * @param mixed $config */ - public function testSendEventWithTwoSubscriber(array $config, string $timeLimit) + public function testSendEventWithTwoSubscriber($config, string $timeLimit) { $received = 0; @@ -144,7 +171,7 @@ public function testSendEventWithTwoSubscriber(array $config, string $timeLimit) * * @param mixed $config */ - public function testSendCommand(array $config, string $timeLimit) + public function testSendCommand($config, string $timeLimit) { $received = 0; From 9a2dccbb1e6b70015c6f0ecd3e7998211f9e3f2a Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Fri, 14 Dec 2018 21:01:02 +0200 Subject: [PATCH 34/56] fix extensions version in tests. --- Tests/fix_composer_json.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Tests/fix_composer_json.php b/Tests/fix_composer_json.php index fc430e2..f025f60 100644 --- a/Tests/fix_composer_json.php +++ b/Tests/fix_composer_json.php @@ -4,6 +4,6 @@ $composerJson = json_decode(file_get_contents(__DIR__.'/../composer.json'), true); -$composerJson['config']['platform']['ext-amqp'] = '1.7'; +$composerJson['config']['platform']['ext-amqp'] = '1.9.3'; file_put_contents(__DIR__.'/../composer.json', json_encode($composerJson, JSON_PRETTY_PRINT)); From 89c0f7b7756f32d3d2750a68d699726abcd6ec4c Mon Sep 17 00:00:00 2001 From: Aaron Bonner Date: Fri, 1 Feb 2019 21:39:55 +0000 Subject: [PATCH 35/56] Fix typo --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index d7054f1..ddc024f 100644 --- a/README.md +++ b/README.md @@ -14,7 +14,7 @@ Enqueue is an MIT-licensed open source project with its ongoing development made [![Total Downloads](https://poser.pugx.org/enqueue/simple-client/d/total.png)](https://packagist.org/packages/enqueue/simple-client) [![Latest Stable Version](https://poser.pugx.org/enqueue/simple-client/version.png)](https://packagist.org/packages/enqueue/simple-client) -The simple client takes Enqueue client classes and Symfony components and combines it to easy to use facade called `SimpleCLient`. +The simple client takes Enqueue client classes and Symfony components and combines it to easy to use facade called `SimpleClient`. ## Resources From 86bcbe289faff4b5c74297cec4b8505280299b1a Mon Sep 17 00:00:00 2001 From: Aaron Bonner Date: Fri, 1 Feb 2019 21:40:26 +0000 Subject: [PATCH 36/56] Ensure logger assigned before initialising objects If the logger isn't assigned in the SimpleClient's constructor before the object graph is initialised, dependencies will get a NullLogger rather than the LoggerInterface passed in. --- SimpleClient.php | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/SimpleClient.php b/SimpleClient.php index 52329a6..9158364 100644 --- a/SimpleClient.php +++ b/SimpleClient.php @@ -126,9 +126,9 @@ public function __construct($config, LoggerInterface $logger = null) ]; } - $this->build(['enqueue' => $config]); - $this->logger = $logger ?: new NullLogger(); + + $this->build(['enqueue' => $config]); } /** From 1e13bddb44ed422871bdff8a077daea3b271a883 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Niedzielski?= Date: Mon, 27 May 2019 15:12:22 +0200 Subject: [PATCH 37/56] Prefer github pages in packages' readme files --- README.md | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index ddc024f..0c4dc45 100644 --- a/README.md +++ b/README.md @@ -13,20 +13,20 @@ Enqueue is an MIT-licensed open source project with its ongoing development made [![Build Status](https://travis-ci.org/php-enqueue/simple-client.png?branch=master)](https://travis-ci.org/php-enqueue/simple-client) [![Total Downloads](https://poser.pugx.org/enqueue/simple-client/d/total.png)](https://packagist.org/packages/enqueue/simple-client) [![Latest Stable Version](https://poser.pugx.org/enqueue/simple-client/version.png)](https://packagist.org/packages/enqueue/simple-client) - -The simple client takes Enqueue client classes and Symfony components and combines it to easy to use facade called `SimpleClient`. + +The simple client takes Enqueue client classes and Symfony components and combines it to easy to use facade called `SimpleClient`. ## Resources * [Site](https://enqueue.forma-pro.com/) -* [Documentation](https://github.com/php-enqueue/enqueue-dev/blob/master/docs/index.md) +* [Documentation](https://php-enqueue.github.com/) * [Questions](https://gitter.im/php-enqueue/Lobby) * [Issue Tracker](https://github.com/php-enqueue/enqueue-dev/issues) ## Developed by Forma-Pro -Forma-Pro is a full stack development company which interests also spread to open source development. -Being a team of strong professionals we have an aim an ability to help community by developing cutting edge solutions in the areas of e-commerce, docker & microservice oriented architecture where we have accumulated a huge many-years experience. +Forma-Pro is a full stack development company which interests also spread to open source development. +Being a team of strong professionals we have an aim an ability to help community by developing cutting edge solutions in the areas of e-commerce, docker & microservice oriented architecture where we have accumulated a huge many-years experience. Our main specialization is Symfony framework based solution, but we are always looking to the technologies that allow us to do our job the best way. We are committed to creating solutions that revolutionize the way how things are developed in aspects of architecture & scalability. If you have any questions and inquires about our open source development, this product particularly or any other matter feel free to contact at opensource@forma-pro.com From 6001604ad775df0d43327e3b658a4d9b52081db1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Niedzielski?= Date: Wed, 29 May 2019 17:08:39 +0200 Subject: [PATCH 38/56] Fix documentation links Whoops. --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 0c4dc45..c9e0b4f 100644 --- a/README.md +++ b/README.md @@ -19,7 +19,7 @@ The simple client takes Enqueue client classes and Symfony components and combin ## Resources * [Site](https://enqueue.forma-pro.com/) -* [Documentation](https://php-enqueue.github.com/) +* [Documentation](https://php-enqueue.github.io/) * [Questions](https://gitter.im/php-enqueue/Lobby) * [Issue Tracker](https://github.com/php-enqueue/enqueue-dev/issues) From 94c673ba06255c1272579e4449c0f6ce939edf2e Mon Sep 17 00:00:00 2001 From: Jeroeny Date: Sat, 23 Nov 2019 11:31:39 +0100 Subject: [PATCH 39/56] allow symfony 5 --- composer.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/composer.json b/composer.json index 571cca7..604eb69 100644 --- a/composer.json +++ b/composer.json @@ -10,7 +10,7 @@ "enqueue/enqueue": "^0.9", "queue-interop/amqp-interop": "^0.8", "queue-interop/queue-interop": "^0.7|^0.8", - "symfony/config": "^3.4|^4" + "symfony/config": "^3.4|^4|^5" }, "require-dev": { "phpunit/phpunit": "~5.5", From 2fe0edc73cfeea32c8b83b97fe3f8466324692ae Mon Sep 17 00:00:00 2001 From: Mathieu Lemoine Date: Wed, 11 Dec 2019 16:32:12 +0100 Subject: [PATCH 40/56] Reduced dependency to voryx/Thruway --- composer.json | 5 +++-- phpunit.xml.dist | 1 - 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/composer.json b/composer.json index 604eb69..3f9e175 100644 --- a/composer.json +++ b/composer.json @@ -13,11 +13,12 @@ "symfony/config": "^3.4|^4|^5" }, "require-dev": { - "phpunit/phpunit": "~5.5", + "phpunit/phpunit": "~7.5", "enqueue/test": "0.9.x-dev", "enqueue/amqp-ext": "0.9.x-dev", "enqueue/fs": "0.9.x-dev", - "enqueue/null": "0.9.x-dev" + "enqueue/null": "0.9.x-dev", + "symfony/yaml": "^3.4|^4|^5" }, "support": { "email": "opensource@forma-pro.com", diff --git a/phpunit.xml.dist b/phpunit.xml.dist index e86476d..57d353f 100644 --- a/phpunit.xml.dist +++ b/phpunit.xml.dist @@ -8,7 +8,6 @@ convertWarningsToExceptions="true" processIsolation="false" stopOnFailure="false" - syntaxCheck="false" bootstrap="./vendor/autoload.php" > From 0a894c0b5848b11dcbceafb264b7e45af0235921 Mon Sep 17 00:00:00 2001 From: Mathieu Lemoine Date: Thu, 12 Dec 2019 11:56:38 +0100 Subject: [PATCH 41/56] drop support for Symfony < 4.3 --- composer.json | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/composer.json b/composer.json index 3f9e175..18e87e3 100644 --- a/composer.json +++ b/composer.json @@ -10,7 +10,7 @@ "enqueue/enqueue": "^0.9", "queue-interop/amqp-interop": "^0.8", "queue-interop/queue-interop": "^0.7|^0.8", - "symfony/config": "^3.4|^4|^5" + "symfony/config": "^4.3|^5" }, "require-dev": { "phpunit/phpunit": "~7.5", @@ -18,7 +18,7 @@ "enqueue/amqp-ext": "0.9.x-dev", "enqueue/fs": "0.9.x-dev", "enqueue/null": "0.9.x-dev", - "symfony/yaml": "^3.4|^4|^5" + "symfony/yaml": "^4.3|^5" }, "support": { "email": "opensource@forma-pro.com", From 5add8e9dbaa14d596e407acdefa1812ae26e5f08 Mon Sep 17 00:00:00 2001 From: Maksim Kotlyar Date: Thu, 19 Dec 2019 09:01:09 +0200 Subject: [PATCH 42/56] master is 0.10 --- composer.json | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/composer.json b/composer.json index 18e87e3..3b525bf 100644 --- a/composer.json +++ b/composer.json @@ -7,17 +7,17 @@ "license": "MIT", "require": { "php": "^7.1.3", - "enqueue/enqueue": "^0.9", + "enqueue/enqueue": "^0.10", "queue-interop/amqp-interop": "^0.8", - "queue-interop/queue-interop": "^0.7|^0.8", + "queue-interop/queue-interop": "^0.8", "symfony/config": "^4.3|^5" }, "require-dev": { "phpunit/phpunit": "~7.5", - "enqueue/test": "0.9.x-dev", - "enqueue/amqp-ext": "0.9.x-dev", - "enqueue/fs": "0.9.x-dev", - "enqueue/null": "0.9.x-dev", + "enqueue/test": "0.10.x-dev", + "enqueue/amqp-ext": "0.10.x-dev", + "enqueue/fs": "0.10.x-dev", + "enqueue/null": "0.10.x-dev", "symfony/yaml": "^4.3|^5" }, "support": { @@ -36,7 +36,7 @@ "minimum-stability": "dev", "extra": { "branch-alias": { - "dev-master": "0.9.x-dev" + "dev-master": "0.10.x-dev" } } } From bf000a21aa602fadd42a6287a77170a4a24efca5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20Niedzielski?= Date: Mon, 27 Jan 2020 00:45:10 +0100 Subject: [PATCH 43/56] Add schema declaration to phpunit files. Remove parameters set to phpunit defaults. --- phpunit.xml.dist | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/phpunit.xml.dist b/phpunit.xml.dist index 57d353f..81d59cf 100644 --- a/phpunit.xml.dist +++ b/phpunit.xml.dist @@ -1,15 +1,11 @@ - + From 14a56ee22bb4ad71bf65b31ec5c11dc2ba9e0da0 Mon Sep 17 00:00:00 2001 From: Witold Wasiczko Date: Thu, 30 Jul 2020 20:38:01 +0200 Subject: [PATCH 44/56] Add php 7.3 and 7.4 travis env to test --- .travis.yml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.travis.yml b/.travis.yml index 408d8b7..ebe7146 100644 --- a/.travis.yml +++ b/.travis.yml @@ -8,6 +8,8 @@ language: php php: - '7.1' - '7.2' + - '7.3' + - '7.4' cache: directories: From e6614ddc4e4bc9406402cbe2e110ffb750130e1d Mon Sep 17 00:00:00 2001 From: Andrew M-Y Date: Mon, 25 Jan 2021 09:51:30 +0200 Subject: [PATCH 45/56] Update PHP and PHPUnit across all packages; fix ext-rdkafka requirement --- composer.json | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/composer.json b/composer.json index 3b525bf..e06d880 100644 --- a/composer.json +++ b/composer.json @@ -6,14 +6,14 @@ "homepage": "https://enqueue.forma-pro.com/", "license": "MIT", "require": { - "php": "^7.1.3", + "php": "^7.3", "enqueue/enqueue": "^0.10", "queue-interop/amqp-interop": "^0.8", "queue-interop/queue-interop": "^0.8", "symfony/config": "^4.3|^5" }, "require-dev": { - "phpunit/phpunit": "~7.5", + "phpunit/phpunit": "^9.5", "enqueue/test": "0.10.x-dev", "enqueue/amqp-ext": "0.10.x-dev", "enqueue/fs": "0.10.x-dev", From c02c0ebb519e35357096103c87a6c6ed70b6eddc Mon Sep 17 00:00:00 2001 From: Andrew M-Y Date: Tue, 9 Feb 2021 02:14:42 +0200 Subject: [PATCH 46/56] Fix package CI - Add push to master trigger; - Replace the clunky cache + composer install steps with an Action; - Replace Travis with GH WF in packages; - Add the missing CI to amqp-tools; - Replace the shields. --- .github/workflows/ci.yml | 31 +++++++++++++++++++++++++++++++ .travis.yml | 24 ------------------------ README.md | 2 +- 3 files changed, 32 insertions(+), 25 deletions(-) create mode 100644 .github/workflows/ci.yml delete mode 100644 .travis.yml diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 0000000..c9683ac --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,31 @@ +name: CI +on: + pull_request: + push: + branches: + - master +jobs: + tests: + runs-on: ubuntu-latest + strategy: + fail-fast: false + matrix: + php: ['7.3', '7.4'] + + name: PHP ${{ matrix.php }} tests + + steps: + - uses: actions/checkout@v2 + + - uses: shivammathur/setup-php@v2 + with: + php-version: ${{ matrix.php }} + coverage: none + + - run: php Tests/fix_composer_json.php + + - uses: "ramsey/composer-install@v1" + with: + composer-options: "--prefer-source" + + - run: vendor/bin/phpunit --exlude-group=functional diff --git a/.travis.yml b/.travis.yml deleted file mode 100644 index ebe7146..0000000 --- a/.travis.yml +++ /dev/null @@ -1,24 +0,0 @@ -sudo: false - -git: - depth: 10 - -language: php - -php: - - '7.1' - - '7.2' - - '7.3' - - '7.4' - -cache: - directories: - - $HOME/.composer/cache - -install: - - php Tests/fix_composer_json.php - - composer self-update - - composer install --prefer-source - -script: - - vendor/bin/phpunit --exclude-group=functional diff --git a/README.md b/README.md index c9e0b4f..b3e5a3f 100644 --- a/README.md +++ b/README.md @@ -10,7 +10,7 @@ Enqueue is an MIT-licensed open source project with its ongoing development made # Message Queue. Simple client [![Gitter](https://badges.gitter.im/php-enqueue/Lobby.svg)](https://gitter.im/php-enqueue/Lobby) -[![Build Status](https://travis-ci.org/php-enqueue/simple-client.png?branch=master)](https://travis-ci.org/php-enqueue/simple-client) +[![Build Status](https://img.shields.io/github/workflow/status/php-enqueue/simple-client/CI)](https://github.com/php-enqueue/simple-client/actions?query=workflow%3ACI) [![Total Downloads](https://poser.pugx.org/enqueue/simple-client/d/total.png)](https://packagist.org/packages/enqueue/simple-client) [![Latest Stable Version](https://poser.pugx.org/enqueue/simple-client/version.png)](https://packagist.org/packages/enqueue/simple-client) From 204ec5442065458d01b35b7749f7635e3b775251 Mon Sep 17 00:00:00 2001 From: Andrew M-Y Date: Mon, 25 Jan 2021 10:26:59 +0200 Subject: [PATCH 47/56] Allow php8 in all composer.json files --- composer.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/composer.json b/composer.json index e06d880..6b277c6 100644 --- a/composer.json +++ b/composer.json @@ -6,7 +6,7 @@ "homepage": "https://enqueue.forma-pro.com/", "license": "MIT", "require": { - "php": "^7.3", + "php": "^7.3|^8.0", "enqueue/enqueue": "^0.10", "queue-interop/amqp-interop": "^0.8", "queue-interop/queue-interop": "^0.8", From b80705430e35ec4b1d414a19dba9e20ef70da296 Mon Sep 17 00:00:00 2001 From: Andrew M-Y Date: Tue, 9 Feb 2021 14:01:28 +0200 Subject: [PATCH 48/56] Add php 8 to packages' build matrices; normalize composer reqs --- .github/workflows/ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index c9683ac..7fcc096 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -10,7 +10,7 @@ jobs: strategy: fail-fast: false matrix: - php: ['7.3', '7.4'] + php: ['7.3', '7.4', '8.0'] name: PHP ${{ matrix.php }} tests From 7f1ec132fd9a3db71497f1943b6dd5c246484d2e Mon Sep 17 00:00:00 2001 From: Andrew M-Y Date: Wed, 17 Feb 2021 20:57:43 +0200 Subject: [PATCH 49/56] Bump amqp-interop and queue-interop --- composer.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/composer.json b/composer.json index 6b277c6..02e8a93 100644 --- a/composer.json +++ b/composer.json @@ -8,7 +8,7 @@ "require": { "php": "^7.3|^8.0", "enqueue/enqueue": "^0.10", - "queue-interop/amqp-interop": "^0.8", + "queue-interop/amqp-interop": "^0.8.2", "queue-interop/queue-interop": "^0.8", "symfony/config": "^4.3|^5" }, From 1242cf0360bb057e7ec88437c869b60d6e7af93a Mon Sep 17 00:00:00 2001 From: Andrew M-Y Date: Thu, 10 Feb 2022 18:49:44 +0200 Subject: [PATCH 50/56] Add support for Symfony 6 --- composer.json | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/composer.json b/composer.json index 02e8a93..037bf3f 100644 --- a/composer.json +++ b/composer.json @@ -10,7 +10,7 @@ "enqueue/enqueue": "^0.10", "queue-interop/amqp-interop": "^0.8.2", "queue-interop/queue-interop": "^0.8", - "symfony/config": "^4.3|^5" + "symfony/config": "^4.3|^5|^6.0" }, "require-dev": { "phpunit/phpunit": "^9.5", @@ -18,7 +18,7 @@ "enqueue/amqp-ext": "0.10.x-dev", "enqueue/fs": "0.10.x-dev", "enqueue/null": "0.10.x-dev", - "symfony/yaml": "^4.3|^5" + "symfony/yaml": "^4.3|^5|^6.0" }, "support": { "email": "opensource@forma-pro.com", From 5ba58c6e514a9cd1a27fb79325f229885348b06d Mon Sep 17 00:00:00 2001 From: Andrew M-Y Date: Wed, 16 Feb 2022 13:14:49 +0200 Subject: [PATCH 51/56] =?UTF-8?q?Drop=20Symfony=20<=205.1=20=F0=9F=94=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- composer.json | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/composer.json b/composer.json index 037bf3f..f12215e 100644 --- a/composer.json +++ b/composer.json @@ -10,7 +10,7 @@ "enqueue/enqueue": "^0.10", "queue-interop/amqp-interop": "^0.8.2", "queue-interop/queue-interop": "^0.8", - "symfony/config": "^4.3|^5|^6.0" + "symfony/config": "^5.1|^6.0" }, "require-dev": { "phpunit/phpunit": "^9.5", @@ -18,7 +18,7 @@ "enqueue/amqp-ext": "0.10.x-dev", "enqueue/fs": "0.10.x-dev", "enqueue/null": "0.10.x-dev", - "symfony/yaml": "^4.3|^5|^6.0" + "symfony/yaml": "^5.1|^6.0" }, "support": { "email": "opensource@forma-pro.com", From d9607b557e7efce1566878b0359b42922a0f5610 Mon Sep 17 00:00:00 2001 From: Alex Mayer Date: Sat, 25 Feb 2023 22:18:02 -0500 Subject: [PATCH 52/56] Fix Shield URLs in READMEs --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index b3e5a3f..8ecb670 100644 --- a/README.md +++ b/README.md @@ -10,7 +10,7 @@ Enqueue is an MIT-licensed open source project with its ongoing development made # Message Queue. Simple client [![Gitter](https://badges.gitter.im/php-enqueue/Lobby.svg)](https://gitter.im/php-enqueue/Lobby) -[![Build Status](https://img.shields.io/github/workflow/status/php-enqueue/simple-client/CI)](https://github.com/php-enqueue/simple-client/actions?query=workflow%3ACI) +[![Build Status](https://img.shields.io/github/actions/workflow/status/php-enqueue/simple-client/ci.yml?branch=master)](https://github.com/php-enqueue/simple-client/actions?query=workflow%3ACI) [![Total Downloads](https://poser.pugx.org/enqueue/simple-client/d/total.png)](https://packagist.org/packages/enqueue/simple-client) [![Latest Stable Version](https://poser.pugx.org/enqueue/simple-client/version.png)](https://packagist.org/packages/enqueue/simple-client) From 1a2375ef64bf8e3885618d32feff41faeaa573ea Mon Sep 17 00:00:00 2001 From: Andrew M-Y Date: Sun, 19 Feb 2023 15:13:41 +0200 Subject: [PATCH 53/56] Properly drop old PHP and Symfony versions --- composer.json | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/composer.json b/composer.json index f12215e..7cb07b6 100644 --- a/composer.json +++ b/composer.json @@ -6,11 +6,11 @@ "homepage": "https://enqueue.forma-pro.com/", "license": "MIT", "require": { - "php": "^7.3|^8.0", + "php": "^7.4|^8.0", "enqueue/enqueue": "^0.10", "queue-interop/amqp-interop": "^0.8.2", "queue-interop/queue-interop": "^0.8", - "symfony/config": "^5.1|^6.0" + "symfony/config": "^5.4|^6.0" }, "require-dev": { "phpunit/phpunit": "^9.5", @@ -18,7 +18,7 @@ "enqueue/amqp-ext": "0.10.x-dev", "enqueue/fs": "0.10.x-dev", "enqueue/null": "0.10.x-dev", - "symfony/yaml": "^5.1|^6.0" + "symfony/yaml": "^5.4|^6.0" }, "support": { "email": "opensource@forma-pro.com", From f8a5e00d1648559748bfa79485168c10acb075be Mon Sep 17 00:00:00 2001 From: Witold Wasiczko Date: Thu, 23 Mar 2023 10:50:55 +0100 Subject: [PATCH 54/56] Update and fix pkg test with 8.1 and 8.2 PHP support --- .github/workflows/ci.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 7fcc096..6b24b0f 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -10,7 +10,7 @@ jobs: strategy: fail-fast: false matrix: - php: ['7.3', '7.4', '8.0'] + php: ['7.4', '8.0', '8.1', '8.2'] name: PHP ${{ matrix.php }} tests @@ -28,4 +28,4 @@ jobs: with: composer-options: "--prefer-source" - - run: vendor/bin/phpunit --exlude-group=functional + - run: vendor/bin/phpunit --exclude-group=functional From 94a7c620b0db5dc49decab737d41c464e176b7b8 Mon Sep 17 00:00:00 2001 From: James Read Date: Wed, 15 Jan 2025 22:51:32 +0000 Subject: [PATCH 55/56] Running php-cs-fixer Running php-cs-fixer to fix CS drift --- SimpleClient.php | 13 ++++++------- Tests/Functional/SimpleClientTest.php | 6 ------ Tests/fix_composer_json.php | 2 +- 3 files changed, 7 insertions(+), 14 deletions(-) diff --git a/SimpleClient.php b/SimpleClient.php index 9158364..bbfd8b9 100644 --- a/SimpleClient.php +++ b/SimpleClient.php @@ -114,10 +114,9 @@ final class SimpleClient * ] * ] * - * * @param string|array $config */ - public function __construct($config, LoggerInterface $logger = null) + public function __construct($config, ?LoggerInterface $logger = null) { if (is_string($config)) { $config = [ @@ -134,7 +133,7 @@ public function __construct($config, LoggerInterface $logger = null) /** * @param callable|Processor $processor */ - public function bindTopic(string $topic, $processor, string $processorName = null): void + public function bindTopic(string $topic, $processor, ?string $processorName = null): void { if (is_callable($processor)) { $processor = new CallbackProcessor($processor); @@ -144,7 +143,7 @@ public function bindTopic(string $topic, $processor, string $processorName = nul throw new \LogicException('The processor must be either callable or instance of Processor'); } - $processorName = $processorName ?: uniqid(get_class($processor)); + $processorName = $processorName ?: uniqid($processor::class); $this->driver->getRouteCollection()->add(new Route($topic, Route::TOPIC, $processorName)); $this->processorRegistry->add($processorName, $processor); @@ -153,7 +152,7 @@ public function bindTopic(string $topic, $processor, string $processorName = nul /** * @param callable|Processor $processor */ - public function bindCommand(string $command, $processor, string $processorName = null): void + public function bindCommand(string $command, $processor, ?string $processorName = null): void { if (is_callable($processor)) { $processor = new CallbackProcessor($processor); @@ -163,7 +162,7 @@ public function bindCommand(string $command, $processor, string $processorName = throw new \LogicException('The processor must be either callable or instance of Processor'); } - $processorName = $processorName ?: uniqid(get_class($processor)); + $processorName = $processorName ?: uniqid($processor::class); $this->driver->getRouteCollection()->add(new Route($command, Route::COMMAND, $processorName)); $this->processorRegistry->add($processorName, $processor); @@ -185,7 +184,7 @@ public function sendEvent(string $topic, $message): void $this->producer->sendEvent($topic, $message); } - public function consume(ExtensionInterface $runtimeExtension = null): void + public function consume(?ExtensionInterface $runtimeExtension = null): void { $this->setupBroker(); diff --git a/Tests/Functional/SimpleClientTest.php b/Tests/Functional/SimpleClientTest.php index 13320fb..ce75457 100644 --- a/Tests/Functional/SimpleClientTest.php +++ b/Tests/Functional/SimpleClientTest.php @@ -85,8 +85,6 @@ public function testShouldWorkWithStringDsnConstructorArgument() /** * @dataProvider transportConfigDataProvider - * - * @param mixed $config */ public function testSendEventWithOneSubscriber($config, string $timeLimit) { @@ -125,8 +123,6 @@ public function testSendEventWithOneSubscriber($config, string $timeLimit) /** * @dataProvider transportConfigDataProvider - * - * @param mixed $config */ public function testSendEventWithTwoSubscriber($config, string $timeLimit) { @@ -168,8 +164,6 @@ public function testSendEventWithTwoSubscriber($config, string $timeLimit) /** * @dataProvider transportConfigDataProvider - * - * @param mixed $config */ public function testSendCommand($config, string $timeLimit) { diff --git a/Tests/fix_composer_json.php b/Tests/fix_composer_json.php index f025f60..01f73c9 100644 --- a/Tests/fix_composer_json.php +++ b/Tests/fix_composer_json.php @@ -6,4 +6,4 @@ $composerJson['config']['platform']['ext-amqp'] = '1.9.3'; -file_put_contents(__DIR__.'/../composer.json', json_encode($composerJson, JSON_PRETTY_PRINT)); +file_put_contents(__DIR__.'/../composer.json', json_encode($composerJson, \JSON_PRETTY_PRINT)); From de4c4a4244252e4d1407afbc7f4702ad4968523f Mon Sep 17 00:00:00 2001 From: James Read Date: Sun, 27 Apr 2025 14:30:31 +0100 Subject: [PATCH 56/56] Updating composer Updating composer.json in pkg directory to correctly reflect the supported versions of PHP --- composer.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/composer.json b/composer.json index 7cb07b6..2d2bd37 100644 --- a/composer.json +++ b/composer.json @@ -6,7 +6,7 @@ "homepage": "https://enqueue.forma-pro.com/", "license": "MIT", "require": { - "php": "^7.4|^8.0", + "php": "^8.1", "enqueue/enqueue": "^0.10", "queue-interop/amqp-interop": "^0.8.2", "queue-interop/queue-interop": "^0.8",