diff --git a/.gitattributes b/.gitattributes new file mode 100644 index 0000000..bdf2dcb --- /dev/null +++ b/.gitattributes @@ -0,0 +1,5 @@ +/Tests export-ignore +.gitattributes export-ignore +.gitignore export-ignore +.travis.yml export-ignore +phpunit.xml.dist export-ignore diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 0000000..6b24b0f --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,31 @@ +name: CI +on: + pull_request: + push: + branches: + - master +jobs: + tests: + runs-on: ubuntu-latest + strategy: + fail-fast: false + matrix: + php: ['7.4', '8.0', '8.1', '8.2'] + + name: PHP ${{ matrix.php }} tests + + steps: + - uses: actions/checkout@v2 + + - uses: shivammathur/setup-php@v2 + with: + php-version: ${{ matrix.php }} + coverage: none + + - run: php Tests/fix_composer_json.php + + - uses: "ramsey/composer-install@v1" + with: + composer-options: "--prefer-source" + + - run: vendor/bin/phpunit --exclude-group=functional diff --git a/.travis.yml b/.travis.yml deleted file mode 100644 index 566e0af..0000000 --- a/.travis.yml +++ /dev/null @@ -1,22 +0,0 @@ -sudo: false - -git: - depth: 10 - -language: php - -php: - - '5.6' - - '7.0' - -cache: - directories: - - $HOME/.composer/cache - -install: - - php Tests/fix_composer_json.php - - composer self-update - - composer install --prefer-source - -script: - - vendor/bin/phpunit --exclude-group=functional diff --git a/README.md b/README.md index 2368b31..8ecb670 100644 --- a/README.md +++ b/README.md @@ -1,23 +1,32 @@ +

Supporting Enqueue

+ +Enqueue is an MIT-licensed open source project with its ongoing development made possible entirely by the support of community and our customers. If you'd like to join them, please consider: + +- [Become a sponsor](https://www.patreon.com/makasim) +- [Become our client](http://forma-pro.com/) + +--- + # Message Queue. Simple client [![Gitter](https://badges.gitter.im/php-enqueue/Lobby.svg)](https://gitter.im/php-enqueue/Lobby) -[![Build Status](https://travis-ci.org/php-enqueue/simple-client.png?branch=master)](https://travis-ci.org/php-enqueue/simple-client) +[![Build Status](https://img.shields.io/github/actions/workflow/status/php-enqueue/simple-client/ci.yml?branch=master)](https://github.com/php-enqueue/simple-client/actions?query=workflow%3ACI) [![Total Downloads](https://poser.pugx.org/enqueue/simple-client/d/total.png)](https://packagist.org/packages/enqueue/simple-client) [![Latest Stable Version](https://poser.pugx.org/enqueue/simple-client/version.png)](https://packagist.org/packages/enqueue/simple-client) - -The simple client takes Enqueue client classes and Symfony components and combines it to easy to use facade called `SimpleCLient`. + +The simple client takes Enqueue client classes and Symfony components and combines it to easy to use facade called `SimpleClient`. ## Resources * [Site](https://enqueue.forma-pro.com/) -* [Documentation](https://github.com/php-enqueue/enqueue-dev/blob/master/docs/index.md) +* [Documentation](https://php-enqueue.github.io/) * [Questions](https://gitter.im/php-enqueue/Lobby) * [Issue Tracker](https://github.com/php-enqueue/enqueue-dev/issues) ## Developed by Forma-Pro -Forma-Pro is a full stack development company which interests also spread to open source development. -Being a team of strong professionals we have an aim an ability to help community by developing cutting edge solutions in the areas of e-commerce, docker & microservice oriented architecture where we have accumulated a huge many-years experience. +Forma-Pro is a full stack development company which interests also spread to open source development. +Being a team of strong professionals we have an aim an ability to help community by developing cutting edge solutions in the areas of e-commerce, docker & microservice oriented architecture where we have accumulated a huge many-years experience. Our main specialization is Symfony framework based solution, but we are always looking to the technologies that allow us to do our job the best way. We are committed to creating solutions that revolutionize the way how things are developed in aspects of architecture & scalability. If you have any questions and inquires about our open source development, this product particularly or any other matter feel free to contact at opensource@forma-pro.com diff --git a/SimpleClient.php b/SimpleClient.php index 6402117..bbfd8b9 100644 --- a/SimpleClient.php +++ b/SimpleClient.php @@ -2,407 +2,348 @@ namespace Enqueue\SimpleClient; -use Enqueue\AmqpBunny\AmqpConnectionFactory as AmqpBunnyConnectionFactory; -use Enqueue\AmqpExt\AmqpConnectionFactory as AmqpExtConnectionFactory; -use Enqueue\AmqpLib\AmqpConnectionFactory as AmqpLibConnectionFactory; -use Enqueue\Client\ArrayProcessorRegistry; +use Enqueue\ArrayProcessorRegistry; +use Enqueue\Client\ChainExtension as ClientChainExtensions; use Enqueue\Client\Config; +use Enqueue\Client\ConsumptionExtension\DelayRedeliveredMessageExtension; +use Enqueue\Client\ConsumptionExtension\LogExtension; +use Enqueue\Client\ConsumptionExtension\SetRouterPropertiesExtension; use Enqueue\Client\DelegateProcessor; +use Enqueue\Client\DriverFactory; use Enqueue\Client\DriverInterface; -use Enqueue\Client\Meta\QueueMetaRegistry; -use Enqueue\Client\Meta\TopicMetaRegistry; +use Enqueue\Client\Message; +use Enqueue\Client\Producer; use Enqueue\Client\ProducerInterface; +use Enqueue\Client\Route; +use Enqueue\Client\RouteCollection; use Enqueue\Client\RouterProcessor; +use Enqueue\ConnectionFactoryFactory; use Enqueue\Consumption\CallbackProcessor; +use Enqueue\Consumption\ChainExtension as ConsumptionChainExtension; +use Enqueue\Consumption\Extension\ReplyExtension; +use Enqueue\Consumption\Extension\SignalExtension; use Enqueue\Consumption\ExtensionInterface; use Enqueue\Consumption\QueueConsumer; -use Enqueue\Dbal\DbalConnectionFactory; -use Enqueue\Dbal\Symfony\DbalTransportFactory; -use Enqueue\Fs\FsConnectionFactory; -use Enqueue\Fs\Symfony\FsTransportFactory; -use Enqueue\Gps\GpsConnectionFactory; -use Enqueue\Gps\Symfony\GpsTransportFactory; -use Enqueue\Null\Symfony\NullTransportFactory; -use Enqueue\Redis\RedisConnectionFactory; -use Enqueue\Redis\Symfony\RedisTransportFactory; +use Enqueue\Consumption\QueueConsumerInterface; use Enqueue\Rpc\Promise; -use Enqueue\Sqs\SqsConnectionFactory; -use Enqueue\Sqs\Symfony\SqsTransportFactory; -use Enqueue\Stomp\StompConnectionFactory; -use Enqueue\Stomp\Symfony\RabbitMqStompTransportFactory; -use Enqueue\Stomp\Symfony\StompTransportFactory; -use Enqueue\Symfony\AmqpTransportFactory; -use Enqueue\Symfony\DefaultTransportFactory; -use Enqueue\Symfony\MissingTransportFactory; -use Enqueue\Symfony\RabbitMqAmqpTransportFactory; -use Interop\Queue\PsrContext; -use Interop\Queue\PsrProcessor; -use Symfony\Component\DependencyInjection\ContainerBuilder; -use Symfony\Component\DependencyInjection\ContainerInterface; +use Enqueue\Rpc\RpcFactory; +use Enqueue\Symfony\Client\DependencyInjection\ClientFactory; +use Enqueue\Symfony\DependencyInjection\TransportFactory; +use Interop\Queue\Processor; +use Psr\Log\LoggerInterface; +use Psr\Log\NullLogger; +use Symfony\Component\Config\Definition\Builder\TreeBuilder; +use Symfony\Component\Config\Definition\NodeInterface; +use Symfony\Component\Config\Definition\Processor as ConfigProcessor; final class SimpleClient { /** - * @var ContainerInterface + * @var DriverInterface */ - private $container; + private $driver; /** - * @var array|string + * @var Producer */ - private $config; + private $producer; + + /** + * @var QueueConsumer + */ + private $queueConsumer; + + /** + * @var ArrayProcessorRegistry + */ + private $processorRegistry; + + /** + * @var DelegateProcessor + */ + private $delegateProcessor; + + /** + * @var LoggerInterface + */ + private $logger; /** * The config could be a transport DSN (string) or an array, here's an example of a few DSNs:. * - * amqp: - * amqp://guest:guest@localhost:5672/%2f?lazy=1&persisted=1 - * file://foo/bar/ - * null: + * $config = amqp: + * $config = amqp://guest:guest@localhost:5672/%2f?lazy=1&persisted=1 + * $config = file://foo/bar/ + * $config = null: * - * or an array, the most simple: + * or an array: * - *$config = [ + * $config = [ * 'transport' => [ - * 'default' => 'amqp', - * 'amqp' => [], // amqp options here - * ], + * 'dsn' => 'amqps://guest:guest@localhost:5672/%2f', + * 'ssl_cacert' => '/a/dir/cacert.pem', + * 'ssl_cert' => '/a/dir/cert.pem', + * 'ssl_key' => '/a/dir/key.pem', * ] * - * or a with all details: + * with custom connection factory class * * $config = [ * 'transport' => [ - * 'default' => 'amqp', - * 'amqp' => [], - * .... - * ], + * 'dsn' => 'amqps://guest:guest@localhost:5672/%2f', + * 'connection_factory_class' => 'aCustomConnectionFactory', + * // other options available options are factory_class and factory_service + * ] + * + * The client config + * + * $config = [ + * 'transport' => 'null:', * 'client' => [ * 'prefix' => 'enqueue', + * 'separator' => '.', * 'app_name' => 'app', * 'router_topic' => 'router', * 'router_queue' => 'default', - * 'default_processor_queue' => 'default', + * 'default_queue' => 'default', * 'redelivered_delay_time' => 0 * ], * 'extensions' => [ * 'signal_extension' => true, + * 'reply_extension' => true, * ] * ] * - * - * @param string|array $config - * @param ContainerBuilder|null $container + * @param string|array $config */ - public function __construct($config, ContainerBuilder $container = null) + public function __construct($config, ?LoggerInterface $logger = null) { - $this->container = $this->buildContainer($config, $container ?: new ContainerBuilder()); - $this->config = $config; + if (is_string($config)) { + $config = [ + 'transport' => $config, + 'client' => true, + ]; + } + + $this->logger = $logger ?: new NullLogger(); + + $this->build(['enqueue' => $config]); } /** - * @param string $topic - * @param string $processorName - * @param callable|PsrProcessor $processor + * @param callable|Processor $processor */ - public function bind($topic, $processorName, $processor) + public function bindTopic(string $topic, $processor, ?string $processorName = null): void { if (is_callable($processor)) { $processor = new CallbackProcessor($processor); } - if (false == $processor instanceof PsrProcessor) { - throw new \LogicException('The processor must be either callable or instance of PsrProcessor'); + if (false == $processor instanceof Processor) { + throw new \LogicException('The processor must be either callable or instance of Processor'); } - $queueName = $this->getConfig()->getDefaultProcessorQueueName(); + $processorName = $processorName ?: uniqid($processor::class); - $this->getTopicMetaRegistry()->addProcessor($topic, $processorName); - $this->getQueueMetaRegistry()->addProcessor($queueName, $processorName); - $this->getProcessorRegistry()->add($processorName, $processor); - $this->getRouterProcessor()->add($topic, $queueName, $processorName); + $this->driver->getRouteCollection()->add(new Route($topic, Route::TOPIC, $processorName)); + $this->processorRegistry->add($processorName, $processor); } /** - * @param string $command - * @param mixed $message - * @param bool $needReply - * - * @return Promise|null + * @param callable|Processor $processor */ - public function sendCommand($command, $message, $needReply = false) + public function bindCommand(string $command, $processor, ?string $processorName = null): void { - return $this->getProducer()->sendCommand($command, $message, $needReply); + if (is_callable($processor)) { + $processor = new CallbackProcessor($processor); + } + + if (false == $processor instanceof Processor) { + throw new \LogicException('The processor must be either callable or instance of Processor'); + } + + $processorName = $processorName ?: uniqid($processor::class); + + $this->driver->getRouteCollection()->add(new Route($command, Route::COMMAND, $processorName)); + $this->processorRegistry->add($processorName, $processor); } /** - * @param string $topic - * @param string|array $message + * @param string|array|\JsonSerializable|Message $message */ - public function sendEvent($topic, $message) + public function sendCommand(string $command, $message, bool $needReply = false): ?Promise { - $this->getProducer()->sendEvent($topic, $message); + return $this->producer->sendCommand($command, $message, $needReply); } /** - * @deprecated since 0.8.18 and will be removed in 0.9. Use sendEvent method instead - * - * @param string $topic - * @param string|array $message - * @param bool $setupBroker + * @param string|array|Message $message */ - public function send($topic, $message, $setupBroker = false) + public function sendEvent(string $topic, $message): void { - if ($setupBroker) { - $this->setupBroker(); - } - - $this->sendEvent($topic, $message); + $this->producer->sendEvent($topic, $message); } - /** - * @param ExtensionInterface|null $runtimeExtension - */ - public function consume(ExtensionInterface $runtimeExtension = null) + public function consume(?ExtensionInterface $runtimeExtension = null): void { $this->setupBroker(); - $processor = $this->getDelegateProcessor(); - $queueConsumer = $this->getQueueConsumer(); - $defaultQueueName = $this->getConfig()->getDefaultProcessorQueueName(); - $defaultTransportQueueName = $this->getConfig()->createTransportQueueName($defaultQueueName); + $boundQueues = []; - $queueConsumer->bind($defaultTransportQueueName, $processor); - if ($this->getConfig()->getRouterQueueName() != $defaultQueueName) { - $routerTransportQueueName = $this->getConfig()->createTransportQueueName($this->getConfig()->getRouterQueueName()); + $routerQueue = $this->getDriver()->createQueue($this->getDriver()->getConfig()->getRouterQueue()); + $this->queueConsumer->bind($routerQueue, $this->delegateProcessor); + $boundQueues[$routerQueue->getQueueName()] = true; - $queueConsumer->bind($routerTransportQueueName, $processor); - } + foreach ($this->driver->getRouteCollection()->all() as $route) { + $queue = $this->getDriver()->createRouteQueue($route); + if (array_key_exists($queue->getQueueName(), $boundQueues)) { + continue; + } - $queueConsumer->consume($runtimeExtension); - } + $this->queueConsumer->bind($queue, $this->delegateProcessor); - /** - * @return PsrContext - */ - public function getContext() - { - return $this->container->get('enqueue.transport.context'); - } + $boundQueues[$queue->getQueueName()] = true; + } - /** - * @return QueueConsumer - */ - public function getQueueConsumer() - { - return $this->container->get('enqueue.client.queue_consumer'); + $this->queueConsumer->consume($runtimeExtension); } - /** - * @return Config - */ - public function getConfig() + public function getQueueConsumer(): QueueConsumerInterface { - return $this->container->get('enqueue.client.config'); + return $this->queueConsumer; } - /** - * @return DriverInterface - */ - public function getDriver() + public function getDriver(): DriverInterface { - return $this->container->get('enqueue.client.driver'); + return $this->driver; } - /** - * @return TopicMetaRegistry - */ - public function getTopicMetaRegistry() + public function getProducer(bool $setupBroker = false): ProducerInterface { - return $this->container->get('enqueue.client.meta.topic_meta_registry'); - } + $setupBroker && $this->setupBroker(); - /** - * @return QueueMetaRegistry - */ - public function getQueueMetaRegistry() - { - return $this->container->get('enqueue.client.meta.queue_meta_registry'); + return $this->producer; } - /** - * @param bool $setupBroker - * - * @return ProducerInterface - */ - public function getProducer($setupBroker = false) + public function getDelegateProcessor(): DelegateProcessor { - $setupBroker && $this->setupBroker(); - - return $this->container->get('enqueue.client.producer'); + return $this->delegateProcessor; } - public function setupBroker() + public function setupBroker(): void { $this->getDriver()->setupBroker(); } - /** - * @return ArrayProcessorRegistry - */ - public function getProcessorRegistry() + public function build(array $configs): void { - return $this->container->get('enqueue.client.processor_registry'); - } + $configProcessor = new ConfigProcessor(); + $simpleClientConfig = $configProcessor->process($this->createConfiguration(), $configs); - /** - * @return DelegateProcessor - */ - public function getDelegateProcessor() - { - return $this->container->get('enqueue.client.delegate_processor'); - } + if (isset($simpleClientConfig['transport']['factory_service'])) { + throw new \LogicException('transport.factory_service option is not supported by simple client'); + } + if (isset($simpleClientConfig['transport']['factory_class'])) { + throw new \LogicException('transport.factory_class option is not supported by simple client'); + } + if (isset($simpleClientConfig['transport']['connection_factory_class'])) { + throw new \LogicException('transport.connection_factory_class option is not supported by simple client'); + } - /** - * @return RouterProcessor - */ - public function getRouterProcessor() - { - return $this->container->get('enqueue.client.router_processor'); - } + $connectionFactoryFactory = new ConnectionFactoryFactory(); + $connection = $connectionFactoryFactory->create($simpleClientConfig['transport']); - /** - * @param array|string $config - * @param ContainerBuilder $container - * - * @return ContainerInterface - */ - private function buildContainer($config, ContainerBuilder $container) - { - $config = $this->buildConfig($config); - $extension = $this->buildContainerExtension(); + $clientExtensions = new ClientChainExtensions([]); - $container->registerExtension($extension); - $container->loadFromExtension($extension->getAlias(), $config); + $config = new Config( + $simpleClientConfig['client']['prefix'], + $simpleClientConfig['client']['separator'], + $simpleClientConfig['client']['app_name'], + $simpleClientConfig['client']['router_topic'], + $simpleClientConfig['client']['router_queue'], + $simpleClientConfig['client']['default_queue'], + 'enqueue.client.router_processor', + $simpleClientConfig['transport'], + [] + ); - $container->compile(); + $routeCollection = new RouteCollection([]); + $driverFactory = new DriverFactory(); - return $container; - } + $driver = $driverFactory->create( + $connection, + $config, + $routeCollection + ); - /** - * @return SimpleClientContainerExtension - */ - private function buildContainerExtension() - { - $extension = new SimpleClientContainerExtension(); + $rpcFactory = new RpcFactory($driver->getContext()); - $extension->addTransportFactory(new DefaultTransportFactory('default')); - $extension->addTransportFactory(new NullTransportFactory('null')); + $producer = new Producer($driver, $rpcFactory, $clientExtensions); - if (class_exists(StompConnectionFactory::class)) { - $extension->addTransportFactory(new StompTransportFactory('stomp')); - $extension->addTransportFactory(new RabbitMqStompTransportFactory('rabbitmq_stomp')); - } else { - $extension->addTransportFactory(new MissingTransportFactory('stomp', ['enqueue/stomp'])); - $extension->addTransportFactory(new MissingTransportFactory('rabbitmq_stomp', ['enqueue/stomp'])); - } + $processorRegistry = new ArrayProcessorRegistry([]); - if ( - class_exists(AmqpBunnyConnectionFactory::class) || - class_exists(AmqpExtConnectionFactory::class) || - class_exists(AmqpLibConnectionFactory::class) - ) { - $extension->addTransportFactory(new AmqpTransportFactory('amqp')); - $extension->addTransportFactory(new RabbitMqAmqpTransportFactory('rabbitmq_amqp')); - } else { - $amppPackages = ['enqueue/amqp-ext', 'enqueue/amqp-bunny', 'enqueue/amqp-lib']; - $extension->addTransportFactory(new MissingTransportFactory('amqp', $amppPackages)); - $extension->addTransportFactory(new MissingTransportFactory('rabbitmq_amqp', $amppPackages)); - } + $delegateProcessor = new DelegateProcessor($processorRegistry); - if (class_exists(FsConnectionFactory::class)) { - $extension->addTransportFactory(new FsTransportFactory('fs')); - } else { - $extension->addTransportFactory(new MissingTransportFactory('fs', ['enqueue/fs'])); + // consumption extensions + $consumptionExtensions = []; + if ($simpleClientConfig['client']['redelivered_delay_time']) { + $consumptionExtensions[] = new DelayRedeliveredMessageExtension($driver, $simpleClientConfig['client']['redelivered_delay_time']); } - if (class_exists(RedisConnectionFactory::class)) { - $extension->addTransportFactory(new RedisTransportFactory('redis')); - } else { - $extension->addTransportFactory(new MissingTransportFactory('redis', ['enqueue/redis'])); + if ($simpleClientConfig['extensions']['signal_extension']) { + $consumptionExtensions[] = new SignalExtension(); } - if (class_exists(DbalConnectionFactory::class)) { - $extension->addTransportFactory(new DbalTransportFactory('dbal')); - } else { - $extension->addTransportFactory(new MissingTransportFactory('dbal', ['enqueue/dbal'])); + if ($simpleClientConfig['extensions']['reply_extension']) { + $consumptionExtensions[] = new ReplyExtension(); } - if (class_exists(SqsConnectionFactory::class)) { - $extension->addTransportFactory(new SqsTransportFactory('sqs')); - } else { - $extension->addTransportFactory(new MissingTransportFactory('sqs', ['enqueue/sqs'])); - } + $consumptionExtensions[] = new SetRouterPropertiesExtension($driver); + $consumptionExtensions[] = new LogExtension(); - if (class_exists(GpsConnectionFactory::class)) { - $extension->addTransportFactory(new GpsTransportFactory('gps')); - } else { - $extension->addTransportFactory(new MissingTransportFactory('gps', ['enqueue/gps'])); - } + $consumptionChainExtension = new ConsumptionChainExtension($consumptionExtensions); + $queueConsumer = new QueueConsumer($driver->getContext(), $consumptionChainExtension, [], $this->logger); + + $routerProcessor = new RouterProcessor($driver); - return $extension; + $processorRegistry->add($config->getRouterProcessor(), $routerProcessor); + + $this->driver = $driver; + $this->producer = $producer; + $this->queueConsumer = $queueConsumer; + $this->delegateProcessor = $delegateProcessor; + $this->processorRegistry = $processorRegistry; } - /** - * @param array|string $config - * - * @return array - */ - private function buildConfig($config) + private function createConfiguration(): NodeInterface { - if (is_string($config) && false !== strpos($config, '://')) { - $extConfig = [ - 'client' => [], - 'transport' => [ - 'default' => $config, - ], - ]; - } elseif (is_string($config)) { - $extConfig = [ - 'client' => [], - 'transport' => [ - 'default' => $config, - $config => [], - ], - ]; - } elseif (is_array($config)) { - $extConfig = array_replace_recursive([ - 'client' => [], - 'transport' => [], - ], $config); + if (method_exists(TreeBuilder::class, 'getRootNode')) { + $tb = new TreeBuilder('enqueue'); + $rootNode = $tb->getRootNode(); } else { - throw new \LogicException('Expects config is string or array'); - } - - if (empty($extConfig['transport']['default'])) { - $defaultTransport = null; - foreach ($extConfig['transport'] as $transport => $config) { - if ('default' === $transport) { - continue; - } - - $defaultTransport = $transport; - break; - } - - if (false == $defaultTransport) { - throw new \LogicException('There is no transport configured'); - } - - $extConfig['transport']['default'] = $defaultTransport; + $tb = new TreeBuilder(); + $rootNode = $tb->root('enqueue'); } - return $extConfig; + $rootNode + ->beforeNormalization() + ->ifEmpty()->then(function () { + return ['transport' => ['dsn' => 'null:']]; + }); + + $rootNode + ->append(TransportFactory::getConfiguration()) + ->append(TransportFactory::getQueueConsumerConfiguration()) + ->append(ClientFactory::getConfiguration(false)) + ; + + $rootNode->children() + ->arrayNode('extensions')->addDefaultsIfNotSet()->children() + ->booleanNode('signal_extension')->defaultValue(function_exists('pcntl_signal_dispatch'))->end() + ->booleanNode('reply_extension')->defaultTrue()->end() + ->end() + ; + + return $tb->buildTree(); } } diff --git a/SimpleClientContainerExtension.php b/SimpleClientContainerExtension.php deleted file mode 100644 index 7bcb849..0000000 --- a/SimpleClientContainerExtension.php +++ /dev/null @@ -1,198 +0,0 @@ -factories = []; - } - - /** - * {@inheritdoc} - */ - public function getAlias() - { - return 'enqueue'; - } - - /** - * @param TransportFactoryInterface $transportFactory - */ - public function addTransportFactory(TransportFactoryInterface $transportFactory) - { - $name = $transportFactory->getName(); - - if (empty($name)) { - throw new \LogicException('Transport factory name cannot be empty'); - } - if (array_key_exists($name, $this->factories)) { - throw new \LogicException(sprintf('Transport factory with such name already added. Name %s', $name)); - } - - $this->factories[$name] = $transportFactory; - } - - /** - * {@inheritdoc} - */ - public function load(array $configs, ContainerBuilder $container) - { - $configProcessor = new Processor(); - $config = $configProcessor->process($this->createConfiguration(), $configs); - - foreach ($config['transport'] as $name => $transportConfig) { - $this->factories[$name]->createConnectionFactory($container, $transportConfig); - $this->factories[$name]->createContext($container, $transportConfig); - $this->factories[$name]->createDriver($container, $transportConfig); - } - - $transportConfig = isset($config['transport']['default']['alias']) ? - $config['transport'][$config['transport']['default']['alias']] : - [] - ; - - $container->register('enqueue.client.config', Config::class) - ->setPublic(true) - ->setArguments([ - $config['client']['prefix'], - $config['client']['app_name'], - $config['client']['router_topic'], - $config['client']['router_queue'], - $config['client']['default_processor_queue'], - 'enqueue.client.router_processor', - $transportConfig, - ]); - - $container->register('enqueue.client.rpc_factory', RpcFactory::class) - ->setPublic(true) - ->setArguments([ - new Reference('enqueue.transport.context'), - ]); - - $container->register('enqueue.client.producer', Producer::class) - ->setPublic(true) - ->setArguments([ - new Reference('enqueue.client.driver'), - new Reference('enqueue.client.rpc_factory'), - ]); - - $container->setAlias('enqueue.client.producer_v2', new Alias('enqueue.client.producer', true)); - - $container->register('enqueue.client.meta.topic_meta_registry', TopicMetaRegistry::class) - ->setPublic(true) - ->setArguments([[]]); - - $container->register('enqueue.client.meta.queue_meta_registry', QueueMetaRegistry::class) - ->setPublic(true) - ->setArguments([new Reference('enqueue.client.config'), []]); - - $container->register('enqueue.client.processor_registry', ArrayProcessorRegistry::class) - ->setPublic(true) - ; - - $container->register('enqueue.client.delegate_processor', DelegateProcessor::class) - ->setPublic(true) - ->setArguments([new Reference('enqueue.client.processor_registry')]); - - $container->register('enqueue.client.queue_consumer', QueueConsumer::class) - ->setPublic(true) - ->setArguments([ - new Reference('enqueue.transport.context'), - new Reference('enqueue.consumption.extensions'), - ]); - - // router - $container->register('enqueue.client.router_processor', RouterProcessor::class) - ->setPublic(true) - ->setArguments([new Reference('enqueue.client.driver'), []]); - $container->getDefinition('enqueue.client.processor_registry') - ->addMethodCall('add', ['enqueue.client.router_processor', new Reference('enqueue.client.router_processor')]); - $container->getDefinition('enqueue.client.meta.queue_meta_registry') - ->addMethodCall('addProcessor', [$config['client']['router_queue'], 'enqueue.client.router_processor']); - - // extensions - $extensions = []; - if ($config['client']['redelivered_delay_time']) { - $container->register('enqueue.client.delay_redelivered_message_extension', DelayRedeliveredMessageExtension::class) - ->setPublic(true) - ->setArguments([ - new Reference('enqueue.client.driver'), - $config['client']['redelivered_delay_time'], - ]); - - $extensions[] = new Reference('enqueue.client.delay_redelivered_message_extension'); - } - - $container->register('enqueue.client.extension.set_router_properties', SetRouterPropertiesExtension::class) - ->setPublic(true) - ->setArguments([new Reference('enqueue.client.driver')]); - - $extensions[] = new Reference('enqueue.client.extension.set_router_properties'); - - $container->register('enqueue.consumption.extensions', ConsumptionChainExtension::class) - ->setPublic(true) - ->setArguments([$extensions]); - } - - /** - * @return NodeInterface - */ - private function createConfiguration() - { - $tb = new TreeBuilder(); - $rootNode = $tb->root('enqueue'); - - $transportChildren = $rootNode->children() - ->arrayNode('transport')->isRequired()->children(); - - foreach ($this->factories as $factory) { - $factory->addConfiguration( - $transportChildren->arrayNode($factory->getName()) - ); - } - - $rootNode->children() - ->arrayNode('client')->children() - ->scalarNode('prefix')->defaultValue('enqueue')->end() - ->scalarNode('app_name')->defaultValue('app')->end() - ->scalarNode('router_topic')->defaultValue(Config::DEFAULT_PROCESSOR_QUEUE_NAME)->cannotBeEmpty()->end() - ->scalarNode('router_queue')->defaultValue(Config::DEFAULT_PROCESSOR_QUEUE_NAME)->cannotBeEmpty()->end() - ->scalarNode('default_processor_queue')->defaultValue(Config::DEFAULT_PROCESSOR_QUEUE_NAME)->cannotBeEmpty()->end() - ->integerNode('redelivered_delay_time')->min(0)->defaultValue(0)->end() - ->end()->end() - ->arrayNode('extensions')->addDefaultsIfNotSet()->children() - ->booleanNode('signal_extension')->defaultValue(function_exists('pcntl_signal_dispatch'))->end() - ->end()->end() - ; - - return $tb->buildTree(); - } -} diff --git a/Tests/Functional/SimpleClientTest.php b/Tests/Functional/SimpleClientTest.php index 75d508c..ce75457 100644 --- a/Tests/Functional/SimpleClientTest.php +++ b/Tests/Functional/SimpleClientTest.php @@ -7,9 +7,8 @@ use Enqueue\Consumption\Extension\LimitConsumptionTimeExtension; use Enqueue\Consumption\Result; use Enqueue\SimpleClient\SimpleClient; -use Enqueue\Test\RabbitmqAmqpExtension; -use Enqueue\Test\RabbitManagementExtensionTrait; -use Interop\Queue\PsrMessage; +use Interop\Queue\Exception\PurgeQueueNotSupportedException; +use Interop\Queue\Message; use PHPUnit\Framework\TestCase; /** @@ -17,119 +16,197 @@ */ class SimpleClientTest extends TestCase { - use RabbitmqAmqpExtension; - use RabbitManagementExtensionTrait; - - public function setUp() - { - if (false == getenv('RABBITMQ_HOST')) { - throw new \PHPUnit_Framework_SkippedTestError('Functional tests are not allowed in this environment'); - } - - $this->removeQueue('enqueue.app.default'); - } - public function transportConfigDataProvider() { - yield 'amqp' => [[ - 'transport' => [ - 'default' => 'amqp', - 'amqp' => [ - 'driver' => 'ext', - 'host' => getenv('RABBITMQ_HOST'), - 'port' => getenv('RABBITMQ_AMQP__PORT'), - 'user' => getenv('RABBITMQ_USER'), - 'pass' => getenv('RABBITMQ_PASSWORD'), - 'vhost' => getenv('RABBITMQ_VHOST'), - ], - ], - ]]; + yield 'amqp_dsn' => [[ + 'transport' => getenv('AMQP_DSN'), + ], '+1sec']; - yield 'config_as_dsn_string' => [getenv('AMQP_DSN')]; + yield 'dbal_dsn' => [[ + 'transport' => getenv('DOCTRINE_DSN'), + ], '+1sec']; - yield 'amqp_dsn' => [[ + yield 'rabbitmq_stomp' => [[ 'transport' => [ - 'default' => 'amqp', - 'amqp' => getenv('AMQP_DSN'), + 'dsn' => getenv('RABITMQ_STOMP_DSN'), + 'lazy' => false, + 'management_plugin_installed' => true, ], - ]]; + ], '+1sec']; - yield 'default_amqp_as_dsn' => [[ + yield 'predis_dsn' => [[ 'transport' => [ - 'default' => getenv('AMQP_DSN'), + 'dsn' => getenv('PREDIS_DSN'), + 'lazy' => false, ], - ]]; + ], '+1sec']; - yield [[ + yield 'fs_dsn' => [[ + 'transport' => 'file://'.sys_get_temp_dir(), + ], '+1sec']; + + yield 'sqs' => [[ 'transport' => [ - 'default' => 'rabbitmq_amqp', - 'rabbitmq_amqp' => [ - 'driver' => 'ext', - 'host' => getenv('RABBITMQ_HOST'), - 'port' => getenv('RABBITMQ_AMQP__PORT'), - 'user' => getenv('RABBITMQ_USER'), - 'pass' => getenv('RABBITMQ_PASSWORD'), - 'vhost' => getenv('RABBITMQ_VHOST'), - ], + 'dsn' => getenv('SQS_DSN'), ], - ]]; + ], '+1sec']; + + yield 'mongodb_dsn' => [[ + 'transport' => getenv('MONGO_DSN'), + ], '+1sec']; + } + + public function testShouldWorkWithStringDsnConstructorArgument() + { + $actualMessage = null; + + $client = new SimpleClient(getenv('AMQP_DSN')); + + $client->bindTopic('foo_topic', function (Message $message) use (&$actualMessage) { + $actualMessage = $message; + + return Result::ACK; + }); + + $client->setupBroker(); + $this->purgeQueue($client); + + $client->sendEvent('foo_topic', 'Hello there!'); + + $client->getQueueConsumer()->setReceiveTimeout(200); + $client->consume(new ChainExtension([ + new LimitConsumptionTimeExtension(new \DateTime('+1sec')), + new LimitConsumedMessagesExtension(2), + ])); + + $this->assertInstanceOf(Message::class, $actualMessage); + $this->assertSame('Hello there!', $actualMessage->getBody()); } /** * @dataProvider transportConfigDataProvider - * - * @param mixed $config */ - public function testProduceAndConsumeOneMessage($config) + public function testSendEventWithOneSubscriber($config, string $timeLimit) { $actualMessage = null; + $config['client'] = [ + 'prefix' => str_replace('.', '', uniqid('enqueue', true)), + 'app_name' => 'simple_client', + 'router_topic' => 'test', + 'router_queue' => 'test', + 'default_queue' => 'test', + ]; + $client = new SimpleClient($config); - $client->bind('foo_topic', 'foo_processor', function (PsrMessage $message) use (&$actualMessage) { + + $client->bindTopic('foo_topic', function (Message $message) use (&$actualMessage) { $actualMessage = $message; return Result::ACK; }); - $client->send('foo_topic', 'Hello there!', true); + $client->setupBroker(); + $this->purgeQueue($client); + + $client->sendEvent('foo_topic', 'Hello there!'); + $client->getQueueConsumer()->setReceiveTimeout(200); $client->consume(new ChainExtension([ - new LimitConsumptionTimeExtension(new \DateTime('+5sec')), + new LimitConsumptionTimeExtension(new \DateTime($timeLimit)), new LimitConsumedMessagesExtension(2), ])); - $this->assertInstanceOf(PsrMessage::class, $actualMessage); + $this->assertInstanceOf(Message::class, $actualMessage); $this->assertSame('Hello there!', $actualMessage->getBody()); } /** * @dataProvider transportConfigDataProvider - * - * @param mixed $config */ - public function testProduceAndRouteToTwoConsumes($config) + public function testSendEventWithTwoSubscriber($config, string $timeLimit) { $received = 0; + $config['client'] = [ + 'prefix' => str_replace('.', '', uniqid('enqueue', true)), + 'app_name' => 'simple_client', + 'router_topic' => 'test', + 'router_queue' => 'test', + 'default_queue' => 'test', + ]; + $client = new SimpleClient($config); - $client->bind('foo_topic', 'foo_processor1', function () use (&$received) { + + $client->bindTopic('foo_topic', function () use (&$received) { ++$received; return Result::ACK; }); - $client->bind('foo_topic', 'foo_processor2', function () use (&$received) { + $client->bindTopic('foo_topic', function () use (&$received) { ++$received; return Result::ACK; }); - $client->send('foo_topic', 'Hello there!', true); + $client->setupBroker(); + $this->purgeQueue($client); + $client->sendEvent('foo_topic', 'Hello there!'); + $client->getQueueConsumer()->setReceiveTimeout(200); $client->consume(new ChainExtension([ - new LimitConsumptionTimeExtension(new \DateTime('+5sec')), + new LimitConsumptionTimeExtension(new \DateTime($timeLimit)), new LimitConsumedMessagesExtension(3), ])); $this->assertSame(2, $received); } + + /** + * @dataProvider transportConfigDataProvider + */ + public function testSendCommand($config, string $timeLimit) + { + $received = 0; + + $config['client'] = [ + 'prefix' => str_replace('.', '', uniqid('enqueue', true)), + 'app_name' => 'simple_client', + 'router_topic' => 'test', + 'router_queue' => 'test', + 'default_queue' => 'test', + ]; + + $client = new SimpleClient($config); + + $client->bindCommand('foo_command', function () use (&$received) { + ++$received; + + return Result::ACK; + }); + + $client->setupBroker(); + $this->purgeQueue($client); + + $client->sendCommand('foo_command', 'Hello there!'); + $client->getQueueConsumer()->setReceiveTimeout(200); + $client->consume(new ChainExtension([ + new LimitConsumptionTimeExtension(new \DateTime($timeLimit)), + new LimitConsumedMessagesExtension(1), + ])); + + $this->assertSame(1, $received); + } + + protected function purgeQueue(SimpleClient $client): void + { + $driver = $client->getDriver(); + + $queue = $driver->createQueue($driver->getConfig()->getDefaultQueue()); + + try { + $client->getDriver()->getContext()->purgeQueue($queue); + } catch (PurgeQueueNotSupportedException $e) { + } + } } diff --git a/Tests/fix_composer_json.php b/Tests/fix_composer_json.php index fc430e2..01f73c9 100644 --- a/Tests/fix_composer_json.php +++ b/Tests/fix_composer_json.php @@ -4,6 +4,6 @@ $composerJson = json_decode(file_get_contents(__DIR__.'/../composer.json'), true); -$composerJson['config']['platform']['ext-amqp'] = '1.7'; +$composerJson['config']['platform']['ext-amqp'] = '1.9.3'; -file_put_contents(__DIR__.'/../composer.json', json_encode($composerJson, JSON_PRETTY_PRINT)); +file_put_contents(__DIR__.'/../composer.json', json_encode($composerJson, \JSON_PRETTY_PRINT)); diff --git a/composer.json b/composer.json index e900bb9..2d2bd37 100644 --- a/composer.json +++ b/composer.json @@ -6,18 +6,19 @@ "homepage": "https://enqueue.forma-pro.com/", "license": "MIT", "require": { - "php": "^7.1.3", - "enqueue/enqueue": "^0.9@dev", - "symfony/dependency-injection": "^3.4|^4", - "symfony/config": "^3.4|^4", - "symfony/console": "^3.4|^4" + "php": "^8.1", + "enqueue/enqueue": "^0.10", + "queue-interop/amqp-interop": "^0.8.2", + "queue-interop/queue-interop": "^0.8", + "symfony/config": "^5.4|^6.0" }, "require-dev": { - "phpunit/phpunit": "~5.5", - "enqueue/test": "^0.9@dev", - "enqueue/amqp-ext": "^0.9@dev", - "enqueue/fs": "^0.9@dev", - "enqueue/null": "^0.9@dev" + "phpunit/phpunit": "^9.5", + "enqueue/test": "0.10.x-dev", + "enqueue/amqp-ext": "0.10.x-dev", + "enqueue/fs": "0.10.x-dev", + "enqueue/null": "0.10.x-dev", + "symfony/yaml": "^5.4|^6.0" }, "support": { "email": "opensource@forma-pro.com", @@ -35,7 +36,7 @@ "minimum-stability": "dev", "extra": { "branch-alias": { - "dev-master": "0.9.x-dev" + "dev-master": "0.10.x-dev" } } } diff --git a/phpunit.xml.dist b/phpunit.xml.dist index e86476d..81d59cf 100644 --- a/phpunit.xml.dist +++ b/phpunit.xml.dist @@ -1,16 +1,11 @@ - +