diff --git a/.gitattributes b/.gitattributes new file mode 100644 index 0000000..bdf2dcb --- /dev/null +++ b/.gitattributes @@ -0,0 +1,5 @@ +/Tests export-ignore +.gitattributes export-ignore +.gitignore export-ignore +.travis.yml export-ignore +phpunit.xml.dist export-ignore diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 0000000..65cfbbb --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,29 @@ +name: CI +on: + pull_request: + push: + branches: + - master +jobs: + tests: + runs-on: ubuntu-latest + strategy: + fail-fast: false + matrix: + php: ['7.4', '8.0', '8.1', '8.2'] + + name: PHP ${{ matrix.php }} tests + + steps: + - uses: actions/checkout@v2 + + - uses: shivammathur/setup-php@v2 + with: + php-version: ${{ matrix.php }} + coverage: none + + - uses: "ramsey/composer-install@v1" + with: + composer-options: "--prefer-source" + + - run: SYMFONY_DEPRECATIONS_HELPER=weak vendor/bin/phpunit --exclude-group=functional diff --git a/.travis.yml b/.travis.yml deleted file mode 100644 index 1a44d0c..0000000 --- a/.travis.yml +++ /dev/null @@ -1,21 +0,0 @@ -sudo: false - -git: - depth: 10 - -language: php - -php: - - '5.6' - - '7.0' - -cache: - directories: - - $HOME/.composer/cache - -install: - - composer self-update - - composer install --prefer-source - -script: - - SYMFONY_DEPRECATIONS_HELPER=weak vendor/bin/phpunit --exclude-group=functional diff --git a/CannotObtainLockException.php b/CannotObtainLockException.php new file mode 100644 index 0000000..2788ded --- /dev/null +++ b/CannotObtainLockException.php @@ -0,0 +1,11 @@ +context = $context; - $this->config = $config; - $this->queueMetaRegistry = $queueMetaRegistry; - } - - /** - * {@inheritdoc} - */ - public function sendToRouter(Message $message) - { - if (false == $message->getProperty(Config::PARAMETER_TOPIC_NAME)) { - throw new \LogicException('Topic name parameter is required but is not set'); - } - - $topic = $this->createRouterTopic(); - $transportMessage = $this->createTransportMessage($message); - - $this->context->createProducer()->send($topic, $transportMessage); - } - - /** - * {@inheritdoc} - */ - public function sendToProcessor(Message $message) - { - if (false == $message->getProperty(Config::PARAMETER_PROCESSOR_NAME)) { - throw new \LogicException('Processor name parameter is required but is not set'); - } - - if (false == $queueName = $message->getProperty(Config::PARAMETER_PROCESSOR_QUEUE_NAME)) { - throw new \LogicException('Queue name parameter is required but is not set'); - } - - $transportMessage = $this->createTransportMessage($message); - $destination = $this->createQueue($queueName); - - $this->context->createProducer()->send($destination, $transportMessage); - } - - /** - * {@inheritdoc} - */ - public function setupBroker(LoggerInterface $logger = null) - { - $logger = $logger ?: new NullLogger(); - $log = function ($text, ...$args) use ($logger) { - $logger->debug(sprintf('[FsDriver] '.$text, ...$args)); - }; - - // setup router - $routerTopic = $this->createRouterTopic(); - $routerQueue = $this->createQueue($this->config->getRouterQueueName()); - - $log('Declare router exchange "%s" file: %s', $routerTopic->getTopicName(), $routerTopic->getFileInfo()); - $this->context->declareDestination($routerTopic); - - $log('Declare router queue "%s" file: %s', $routerQueue->getQueueName(), $routerTopic->getFileInfo()); - $this->context->declareDestination($routerQueue); - - // setup queues - foreach ($this->queueMetaRegistry->getQueuesMeta() as $meta) { - $queue = $this->createQueue($meta->getClientName()); - - $log('Declare processor queue "%s" file: %s', $queue->getQueueName(), $queue->getFileInfo()); - $this->context->declareDestination($queue); - } - } - - /** - * {@inheritdoc} - * - * @return FsDestination - */ - public function createQueue($queueName) - { - $transportName = $this->queueMetaRegistry->getQueueMeta($queueName)->getTransportName(); - - return $this->context->createQueue($transportName); - } - - /** - * {@inheritdoc} - * - * @return FsMessage - */ - public function createTransportMessage(Message $message) - { - $properties = $message->getProperties(); - - $headers = $message->getHeaders(); - $headers['content_type'] = $message->getContentType(); - - $transportMessage = $this->context->createMessage(); - $transportMessage->setBody($message->getBody()); - $transportMessage->setHeaders($headers); - $transportMessage->setProperties($properties); - $transportMessage->setMessageId($message->getMessageId()); - $transportMessage->setTimestamp($message->getTimestamp()); - $transportMessage->setReplyTo($message->getReplyTo()); - $transportMessage->setCorrelationId($message->getCorrelationId()); - - return $transportMessage; - } - - /** - * @param FsMessage $message - * - * {@inheritdoc} - */ - public function createClientMessage(PsrMessage $message) - { - $clientMessage = new Message(); - - $clientMessage->setBody($message->getBody()); - $clientMessage->setHeaders($message->getHeaders()); - $clientMessage->setProperties($message->getProperties()); - - $clientMessage->setContentType($message->getHeader('content_type')); - $clientMessage->setMessageId($message->getMessageId()); - $clientMessage->setTimestamp($message->getTimestamp()); - $clientMessage->setPriority(MessagePriority::NORMAL); - $clientMessage->setReplyTo($message->getReplyTo()); - $clientMessage->setCorrelationId($message->getCorrelationId()); - - return $clientMessage; - } - - /** - * @return Config - */ - public function getConfig() - { - return $this->config; - } - - /** - * @return FsDestination - */ - private function createRouterTopic() - { - return $this->context->createTopic( - $this->config->createTransportQueueName($this->config->getRouterTopicName()) - ); - } -} diff --git a/FsConnectionFactory.php b/FsConnectionFactory.php index 8627edb..9c4ba17 100644 --- a/FsConnectionFactory.php +++ b/FsConnectionFactory.php @@ -1,10 +1,14 @@ sys_get_temp_dir().'/enqueue']; + $config = $this->parseDsn('file://'.sys_get_temp_dir().'/enqueue'); } elseif (is_string($config)) { - $config = $this->parseDsn($config); + if ('/' === $config[0]) { + $config = $this->parseDsn('file://'.$config); + } else { + $config = $this->parseDsn($config); + } } elseif (is_array($config)) { } else { throw new \LogicException('The config must be either an array of options, a DSN string or null'); } $this->config = array_replace($this->defaultConfig(), $config); + + if (empty($this->config['path'])) { + throw new \LogicException('The path option must be set.'); + } } /** - * {@inheritdoc} - * * @return FsContext */ - public function createContext() + public function createContext(): Context { return new FsContext( $this->config['path'], @@ -58,49 +68,24 @@ public function createContext() ); } - /** - * @param string $dsn - * - * @return array - */ - private function parseDsn($dsn) + private function parseDsn(string $dsn): array { - if ($dsn && '/' === $dsn[0]) { - return ['path' => $dsn]; - } + $dsn = Dsn::parseFirst($dsn); - if (false === strpos($dsn, 'file:')) { - throw new \LogicException(sprintf('The given DSN "%s" is not supported. Must start with "file:".', $dsn)); + $supportedSchemes = ['file']; + if (false == in_array($dsn->getSchemeProtocol(), $supportedSchemes, true)) { + throw new \LogicException(sprintf('The given scheme protocol "%s" is not supported. It must be one of "%s"', $dsn->getSchemeProtocol(), implode('", "', $supportedSchemes))); } - $dsn = substr($dsn, 7); - - $path = parse_url($dsn, PHP_URL_PATH); - $query = parse_url($dsn, PHP_URL_QUERY); - - if ('/' != $path[0]) { - throw new \LogicException(sprintf('Failed to parse DSN path "%s". The path must start with "/"', $path)); - } - - if ($query) { - $config = []; - parse_str($query, $config); - } - - if (isset($config['pre_fetch_count'])) { - $config['pre_fetch_count'] = (int) $config['pre_fetch_count']; - } - - if (isset($config['chmod'])) { - $config['chmod'] = intval($config['chmod'], 8); - } - - $config['path'] = $path; - - return $config; + return array_filter(array_replace($dsn->getQuery(), [ + 'path' => $dsn->getPath(), + 'pre_fetch_count' => $dsn->getDecimal('pre_fetch_count'), + 'chmod' => $dsn->getOctal('chmod'), + 'polling_interval' => $dsn->getDecimal('polling_interval'), + ]), function ($value) { return null !== $value; }); } - private function defaultConfig() + private function defaultConfig(): array { return [ 'path' => null, diff --git a/FsConsumer.php b/FsConsumer.php index c42ea4d..614461e 100644 --- a/FsConsumer.php +++ b/FsConsumer.php @@ -1,12 +1,15 @@ context = $context; $this->destination = $destination; @@ -49,40 +49,32 @@ public function __construct(FsContext $context, FsDestination $destination, $pre /** * Set polling interval in milliseconds. - * - * @param int $msec */ - public function setPollingInterval($msec) + public function setPollingInterval(int $msec): void { - $this->pollingInterval = $msec * 1000; + $this->pollingInterval = $msec; } /** * Get polling interval in milliseconds. - * - * @return int */ - public function getPollingInterval() + public function getPollingInterval(): int { - return (int) $this->pollingInterval / 1000; + return $this->pollingInterval; } /** - * {@inheritdoc} - * * @return FsDestination */ - public function getQueue() + public function getQueue(): Queue { return $this->destination; } /** - * {@inheritdoc} - * - * @return FsMessage|null + * @return FsMessage */ - public function receive($timeout = 0) + public function receive(int $timeout = 0): ?Message { $timeout /= 1000; $startAt = microtime(true); @@ -95,21 +87,21 @@ public function receive($timeout = 0) } if ($timeout && (microtime(true) - $startAt) >= $timeout) { - return; + return null; } - usleep($this->pollingInterval); + usleep($this->pollingInterval * 1000); if ($timeout && (microtime(true) - $startAt) >= $timeout) { - return; + return null; } } } /** - * {@inheritdoc} + * @return FsMessage */ - public function receiveNoWait() + public function receiveNoWait(): ?Message { if ($this->preFetchedMessages) { return array_shift($this->preFetchedMessages); @@ -120,7 +112,7 @@ public function receiveNoWait() while ($count) { $frame = $this->readFrame($file, 1); - //guards + // guards if ($frame && false == ('|' == $frame[0] || ' ' == $frame[0])) { throw new \LogicException(sprintf('The frame could start from either " " or "|". The malformed frame starts with "%s".', $frame[0])); } @@ -131,7 +123,8 @@ public function receiveNoWait() ftruncate($file, fstat($file)['size'] - strlen($frame)); rewind($file); - $rawMessage = substr(trim($frame), 1); + $rawMessage = str_replace('\|\{', '|{', $frame); + $rawMessage = substr(trim($rawMessage), 1); if ($rawMessage) { try { @@ -139,15 +132,15 @@ public function receiveNoWait() $expireAt = $fetchedMessage->getHeader('x-expire-at'); if ($expireAt && $expireAt - microtime(true) < 0) { // message has expired, just drop it. - return; + return null; } $this->preFetchedMessages[] = $fetchedMessage; } catch (\Exception $e) { - throw new \LogicException(sprintf("Cannot decode json message '%s'", $rawMessage), null, $e); + throw new \LogicException(sprintf("Cannot decode json message '%s'", $rawMessage), 0, $e); } } else { - return; + return null; } --$count; @@ -157,20 +150,16 @@ public function receiveNoWait() if ($this->preFetchedMessages) { return array_shift($this->preFetchedMessages); } + + return null; } - /** - * {@inheritdoc} - */ - public function acknowledge(PsrMessage $message) + public function acknowledge(Message $message): void { // do nothing. fs transport always works in auto ack mode } - /** - * {@inheritdoc} - */ - public function reject(PsrMessage $message, $requeue = false) + public function reject(Message $message, bool $requeue = false): void { InvalidMessageException::assertMessageInstanceOf($message, FsMessage::class); @@ -181,40 +170,31 @@ public function reject(PsrMessage $message, $requeue = false) } } - /** - * @return int - */ - public function getPreFetchCount() + public function getPreFetchCount(): int { return $this->preFetchCount; } - /** - * @param int $preFetchCount - */ - public function setPreFetchCount($preFetchCount) + public function setPreFetchCount(int $preFetchCount): void { $this->preFetchCount = $preFetchCount; } /** * @param resource $file - * @param int $frameNumber - * - * @return string */ - private function readFrame($file, $frameNumber) + private function readFrame($file, int $frameNumber): string { $frameSize = 64; $offset = $frameNumber * $frameSize; - fseek($file, -$offset, SEEK_END); + fseek($file, -$offset, \SEEK_END); $frame = fread($file, $frameSize); if ('' == $frame) { return ''; } - if (false !== strpos($frame, '|{')) { + if (str_contains($frame, '|{')) { return $frame; } diff --git a/FsContext.php b/FsContext.php index 6662240..c735e13 100644 --- a/FsContext.php +++ b/FsContext.php @@ -1,17 +1,23 @@ mkdir($storeDir); @@ -54,122 +54,98 @@ public function __construct($storeDir, $preFetchCount, $chmod, $pollingInterval $this->chmod = $chmod; $this->pollingInterval = $pollingInterval; - $this->lockHandlers = []; + $this->lock = new LegacyFilesystemLock(); } /** - * {@inheritdoc} - * * @return FsMessage */ - public function createMessage($body = '', array $properties = [], array $headers = []) + public function createMessage(string $body = '', array $properties = [], array $headers = []): Message { return new FsMessage($body, $properties, $headers); } /** - * {@inheritdoc} - * * @return FsDestination */ - public function createTopic($topicName) + public function createTopic(string $topicName): Topic { return $this->createQueue($topicName); } /** - * {@inheritdoc} - * * @return FsDestination */ - public function createQueue($queueName) + public function createQueue(string $queueName): Queue { return new FsDestination(new \SplFileInfo($this->getStoreDir().'/'.$queueName)); } - /** - * @param PsrDestination|FsDestination $destination - */ - public function declareDestination(PsrDestination $destination) + public function declareDestination(FsDestination $destination): void { - InvalidDestinationException::assertDestinationInstanceOf($destination, FsDestination::class); + // InvalidDestinationException::assertDestinationInstanceOf($destination, FsDestination::class); set_error_handler(function ($severity, $message, $file, $line) { - // do not throw on a deprecation notice. - if (E_USER_DEPRECATED === $severity && false !== strpos($message, LockHandler::class)) { - return; - } - throw new \ErrorException($message, 0, $severity, $file, $line); }); try { - if (false == file_exists($destination->getFileInfo())) { - touch($destination->getFileInfo()); - chmod($destination->getFileInfo(), $this->chmod); + if (false == file_exists((string) $destination->getFileInfo())) { + touch((string) $destination->getFileInfo()); + chmod((string) $destination->getFileInfo(), $this->chmod); } } finally { restore_error_handler(); } } - public function workWithFile(FsDestination $destination, $mode, callable $callback) + public function workWithFile(FsDestination $destination, string $mode, callable $callback) { $this->declareDestination($destination); set_error_handler(function ($severity, $message, $file, $line) { throw new \ErrorException($message, 0, $severity, $file, $line); - }); + }, \E_ALL & ~\E_USER_DEPRECATED); try { - $file = fopen($destination->getFileInfo(), $mode); - $lockHandler = $this->getLockHandler($destination); - - if (false == $lockHandler->lock(true)) { - throw new \LogicException(sprintf('Cannot obtain the lock for destination %s', $destination->getName())); - } + $file = fopen((string) $destination->getFileInfo(), $mode); + $this->lock->lock($destination); return call_user_func($callback, $destination, $file); } finally { if (isset($file)) { fclose($file); } - if (isset($lockHandler)) { - $lockHandler->release(); - } + $this->lock->release($destination); restore_error_handler(); } } /** - * {@inheritdoc} - * * @return FsDestination */ - public function createTemporaryQueue() + public function createTemporaryQueue(): Queue { - return new FsDestination(new TempFile($this->getStoreDir().'/'.uniqid('tmp-q-', true))); + return new FsDestination( + new TempFile($this->getStoreDir().'/'.uniqid('tmp-q-', true)) + ); } /** - * {@inheritdoc} - * * @return FsProducer */ - public function createProducer() + public function createProducer(): Producer { return new FsProducer($this); } /** - * {@inheritdoc} - * - * @param FsDestination|PsrDestination $destination + * @param FsDestination $destination * * @return FsConsumer */ - public function createConsumer(PsrDestination $destination) + public function createConsumer(Destination $destination): Consumer { InvalidDestinationException::assertDestinationInstanceOf($destination, FsDestination::class); @@ -182,19 +158,20 @@ public function createConsumer(PsrDestination $destination) return $consumer; } - public function close() + public function close(): void { - foreach ($this->lockHandlers as $lockHandler) { - $lockHandler->release(); - } + $this->lock->releaseAll(); + } - $this->lockHandlers = []; + public function createSubscriptionConsumer(): SubscriptionConsumer + { + throw SubscriptionConsumerNotSupportedException::providerDoestNotSupportIt(); } /** - * @param PsrQueue|FsDestination $queue + * @param FsDestination $queue */ - public function purge(PsrQueue $queue) + public function purgeQueue(Queue $queue): void { InvalidDestinationException::assertDestinationInstanceOf($queue, FsDestination::class); @@ -203,26 +180,17 @@ public function purge(PsrQueue $queue) }); } - /** - * @return int - */ - public function getPreFetchCount() + public function getPreFetchCount(): int { return $this->preFetchCount; } - /** - * @param int $preFetchCount - */ - public function setPreFetchCount($preFetchCount) + public function setPreFetchCount(int $preFetchCount): void { $this->preFetchCount = $preFetchCount; } - /** - * @return string - */ - private function getStoreDir() + private function getStoreDir(): string { if (false == is_dir($this->storeDir)) { throw new \LogicException(sprintf('The directory %s does not exist', $this->storeDir)); @@ -234,18 +202,4 @@ private function getStoreDir() return $this->storeDir; } - - /** - * @param FsDestination $destination - * - * @return LockHandler - */ - private function getLockHandler(FsDestination $destination) - { - if (false == isset($this->lockHandlers[$destination->getName()])) { - $this->lockHandlers[$destination->getName()] = new LockHandler($destination->getName(), $this->storeDir); - } - - return $this->lockHandlers[$destination->getName()]; - } } diff --git a/FsDestination.php b/FsDestination.php index 47fd47f..5593917 100644 --- a/FsDestination.php +++ b/FsDestination.php @@ -1,54 +1,41 @@ file = $file; } - /** - * @return \SplFileInfo - */ - public function getFileInfo() + public function getFileInfo(): \SplFileInfo { return $this->file; } - /** - * @return string - */ - public function getName() + public function getName(): string { return $this->file->getFilename(); } - /** - * {@inheritdoc} - */ - public function getQueueName() + public function getQueueName(): string { - return $this->getName(); + return $this->file->getFilename(); } - /** - * {@inheritdoc} - */ - public function getTopicName() + public function getTopicName(): string { - return $this->getName(); + return $this->file->getFilename(); } } diff --git a/FsMessage.php b/FsMessage.php index ccd58f2..45312e5 100644 --- a/FsMessage.php +++ b/FsMessage.php @@ -1,10 +1,12 @@ body = $body; $this->properties = $properties; @@ -39,172 +36,109 @@ public function __construct($body = '', array $properties = [], array $headers = $this->redelivered = false; } - /** - * @param string $body - */ - public function setBody($body) + public function setBody(string $body): void { $this->body = $body; } - /** - * {@inheritdoc} - */ - public function getBody() + public function getBody(): string { return $this->body; } - /** - * @param array $properties - */ - public function setProperties(array $properties) + public function setProperties(array $properties): void { $this->properties = $properties; } - /** - * {@inheritdoc} - */ - public function getProperties() + public function getProperties(): array { return $this->properties; } - /** - * {@inheritdoc} - */ - public function setProperty($name, $value) + public function setProperty(string $name, $value): void { $this->properties[$name] = $value; } - /** - * {@inheritdoc} - */ - public function getProperty($name, $default = null) + public function getProperty(string $name, $default = null) { return array_key_exists($name, $this->properties) ? $this->properties[$name] : $default; } - /** - * @param array $headers - */ - public function setHeaders(array $headers) + public function setHeaders(array $headers): void { $this->headers = $headers; } - /** - * {@inheritdoc} - */ - public function getHeaders() + public function getHeaders(): array { return $this->headers; } - /** - * {@inheritdoc} - */ - public function setHeader($name, $value) + public function setHeader(string $name, $value): void { $this->headers[$name] = $value; } - /** - * {@inheritdoc} - */ - public function getHeader($name, $default = null) + public function getHeader(string $name, $default = null) { return array_key_exists($name, $this->headers) ? $this->headers[$name] : $default; } - /** - * @return bool - */ - public function isRedelivered() + public function isRedelivered(): bool { return $this->redelivered; } - /** - * @param bool $redelivered - */ - public function setRedelivered($redelivered) + public function setRedelivered(bool $redelivered): void { $this->redelivered = $redelivered; } - /** - * {@inheritdoc} - */ - public function setCorrelationId($correlationId) + public function setCorrelationId(?string $correlationId = null): void { $this->setHeader('correlation_id', (string) $correlationId); } - /** - * {@inheritdoc} - */ - public function getCorrelationId() + public function getCorrelationId(): ?string { return $this->getHeader('correlation_id'); } - /** - * {@inheritdoc} - */ - public function setMessageId($messageId) + public function setMessageId(?string $messageId = null): void { $this->setHeader('message_id', (string) $messageId); } - /** - * {@inheritdoc} - */ - public function getMessageId() + public function getMessageId(): ?string { return $this->getHeader('message_id'); } - /** - * {@inheritdoc} - */ - public function getTimestamp() + public function getTimestamp(): ?int { $value = $this->getHeader('timestamp'); - return $value === null ? null : (int) $value; + return null === $value ? null : (int) $value; } - /** - * {@inheritdoc} - */ - public function setTimestamp($timestamp) + public function setTimestamp(?int $timestamp = null): void { $this->setHeader('timestamp', $timestamp); } - /** - * @param string|null $replyTo - */ - public function setReplyTo($replyTo) + public function setReplyTo(?string $replyTo = null): void { $this->setHeader('reply_to', $replyTo); } - /** - * @return string|null - */ - public function getReplyTo() + public function getReplyTo(): ?string { return $this->getHeader('reply_to'); } - /** - * {@inheritdoc} - */ - public function jsonSerialize() + public function jsonSerialize(): array { return [ 'body' => $this->getBody(), @@ -213,20 +147,11 @@ public function jsonSerialize() ]; } - /** - * @param string $json - * - * @return FsMessage - */ - public static function jsonUnserialize($json) + public static function jsonUnserialize(string $json): self { $data = json_decode($json, true); - if (JSON_ERROR_NONE !== json_last_error()) { - throw new \InvalidArgumentException(sprintf( - 'The malformed json given. Error %s and message %s', - json_last_error(), - json_last_error_msg() - )); + if (\JSON_ERROR_NONE !== json_last_error()) { + throw new \InvalidArgumentException(sprintf('The malformed json given. Error %s and message %s', json_last_error(), json_last_error_msg())); } return new self($data['body'], $data['properties'], $data['headers']); diff --git a/FsProducer.php b/FsProducer.php index a8dbac9..067e54b 100644 --- a/FsProducer.php +++ b/FsProducer.php @@ -1,17 +1,19 @@ context = $context; } /** - * {@inheritdoc} - * * @param FsDestination $destination * @param FsMessage $message */ - public function send(PsrDestination $destination, PsrMessage $message) + public function send(Destination $destination, Message $message): void { InvalidDestinationException::assertDestinationInstanceOf($destination, FsDestination::class); InvalidMessageException::assertMessageInstanceOf($message, FsMessage::class); $this->context->workWithFile($destination, 'a+', function (FsDestination $destination, $file) use ($message) { $fileInfo = $destination->getFileInfo(); - if ($fileInfo instanceof TempFile && false == file_exists($fileInfo)) { + if ($fileInfo instanceof TempFile && false == file_exists((string) $fileInfo)) { return; } @@ -52,14 +49,12 @@ public function send(PsrDestination $destination, PsrMessage $message) $message->setHeader('x-expire-at', microtime(true) + ($this->timeToLive / 1000)); } - $rawMessage = '|'.json_encode($message); + $rawMessage = json_encode($message); + $rawMessage = str_replace('|{', '\|\{', $rawMessage); + $rawMessage = '|'.$rawMessage; - if (JSON_ERROR_NONE !== json_last_error()) { - throw new \InvalidArgumentException(sprintf( - 'Could not encode value into json. Error %s and message %s', - json_last_error(), - json_last_error_msg() - )); + if (\JSON_ERROR_NONE !== json_last_error()) { + throw new \InvalidArgumentException(sprintf('Could not encode value into json. Error %s and message %s', json_last_error(), json_last_error_msg())); } $rawMessage = str_repeat(' ', 64 - (strlen($rawMessage) % 64)).$rawMessage; @@ -68,60 +63,42 @@ public function send(PsrDestination $destination, PsrMessage $message) }); } - /** - * {@inheritdoc} - */ - public function setDeliveryDelay($deliveryDelay) + public function setDeliveryDelay(?int $deliveryDelay = null): Producer { if (null === $deliveryDelay) { - return; + return $this; } throw DeliveryDelayNotSupportedException::providerDoestNotSupportIt(); } - /** - * {@inheritdoc} - */ - public function getDeliveryDelay() + public function getDeliveryDelay(): ?int { return null; } - /** - * {@inheritdoc} - */ - public function setPriority($priority) + public function setPriority(?int $priority = null): Producer { if (null === $priority) { - return; + return $this; } throw PriorityNotSupportedException::providerDoestNotSupportIt(); } - /** - * {@inheritdoc} - */ - public function getPriority() + public function getPriority(): ?int { return null; } - /** - * {@inheritdoc} - */ - public function setTimeToLive($timeToLive) + public function setTimeToLive(?int $timeToLive = null): Producer { $this->timeToLive = $timeToLive; return $this; } - /** - * {@inheritdoc} - */ - public function getTimeToLive() + public function getTimeToLive(): ?int { return null; } diff --git a/LegacyFilesystemLock.php b/LegacyFilesystemLock.php new file mode 100644 index 0000000..328fb70 --- /dev/null +++ b/LegacyFilesystemLock.php @@ -0,0 +1,177 @@ +lockHandlers = []; + } + + public function lock(FsDestination $destination) + { + $lockHandler = $this->getLockHandler($destination); + + if (false == $lockHandler->lock(true)) { + throw new CannotObtainLockException(sprintf('Cannot obtain the lock for destination %s', $destination->getName())); + } + } + + public function release(FsDestination $destination) + { + $lockHandler = $this->getLockHandler($destination); + + $lockHandler->release(); + } + + public function releaseAll() + { + foreach ($this->lockHandlers as $lockHandler) { + $lockHandler->release(); + } + + $this->lockHandlers = []; + } + + /** + * @return LockHandler + */ + private function getLockHandler(FsDestination $destination) + { + if (false == isset($this->lockHandlers[$destination->getName()])) { + $this->lockHandlers[$destination->getName()] = new LockHandler( + $destination->getName(), + $destination->getFileInfo()->getPath() + ); + } + + return $this->lockHandlers[$destination->getName()]; + } +} + +// symfony/lock component works only with 3.x and 4.x Symfony +// For symfony 2.x we should use LockHandler from symfony/component which was removed from 4.x +// because we cannot use both at the same time. I copied and pasted the lock handler here + +/* + * This file is part of the Symfony package. + * + * (c) Fabien Potencier + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +/** + * LockHandler class provides a simple abstraction to lock anything by means of + * a file lock. + * + * A locked file is created based on the lock name when calling lock(). Other + * lock handlers will not be able to lock the same name until it is released + * (explicitly by calling release() or implicitly when the instance holding the + * lock is destroyed). + * + * @author Grégoire Pineau + * @author Romain Neutron + * @author Nicolas Grekas + * + * @deprecated since version 3.4, to be removed in 4.0. Use Symfony\Component\Lock\Store\SemaphoreStore or Symfony\Component\Lock\Store\FlockStore instead. + */ +class LockHandler +{ + private $file; + private $handle; + + /** + * @param string $name The lock name + * @param string|null $lockPath The directory to store the lock. Default values will use temporary directory + * + * @throws IOException If the lock directory could not be created or is not writable + */ + public function __construct($name, $lockPath = null) + { + $lockPath = $lockPath ?: sys_get_temp_dir(); + + if (!is_dir($lockPath)) { + $fs = new Filesystem(); + $fs->mkdir($lockPath); + } + + if (!is_writable($lockPath)) { + throw new IOException(sprintf('The directory "%s" is not writable.', $lockPath), 0, null, $lockPath); + } + + $this->file = sprintf('%s/sf.%s.%s.lock', $lockPath, preg_replace('/[^a-z0-9\._-]+/i', '-', $name), hash('sha256', $name)); + } + + /** + * Lock the resource. + * + * @param bool $blocking Wait until the lock is released + * + * @throws IOException If the lock file could not be created or opened + * + * @return bool Returns true if the lock was acquired, false otherwise + */ + public function lock($blocking = false) + { + if ($this->handle) { + return true; + } + + $error = null; + + // Silence error reporting + set_error_handler(function ($errno, $msg) use (&$error) { + $error = $msg; + }); + + if (!$this->handle = fopen($this->file, 'r')) { + if ($this->handle = fopen($this->file, 'x')) { + chmod($this->file, 0444); + } elseif (!$this->handle = fopen($this->file, 'r')) { + usleep(100); // Give some time for chmod() to complete + $this->handle = fopen($this->file, 'r'); + } + } + restore_error_handler(); + + if (!$this->handle) { + throw new IOException($error, 0, null, $this->file); + } + + // On Windows, even if PHP doc says the contrary, LOCK_NB works, see + // https://bugs.php.net/54129 + if (!flock($this->handle, \LOCK_EX | ($blocking ? 0 : \LOCK_NB))) { + fclose($this->handle); + $this->handle = null; + + return false; + } + + return true; + } + + /** + * Release the resource. + */ + public function release() + { + if ($this->handle) { + flock($this->handle, \LOCK_UN | \LOCK_NB); + fclose($this->handle); + $this->handle = null; + } + } +} diff --git a/Lock.php b/Lock.php new file mode 100644 index 0000000..16349f2 --- /dev/null +++ b/Lock.php @@ -0,0 +1,20 @@ +Supporting Enqueue + +Enqueue is an MIT-licensed open source project with its ongoing development made possible entirely by the support of community and our customers. If you'd like to join them, please consider: + +- [Become a sponsor](https://www.patreon.com/makasim) +- [Become our client](http://forma-pro.com/) + +--- + # Enqueue Filesystem Transport [![Gitter](https://badges.gitter.im/php-enqueue/Lobby.svg)](https://gitter.im/php-enqueue/Lobby) -[![Build Status](https://travis-ci.org/php-enqueue/fs.png?branch=master)](https://travis-ci.org/php-enqueue/fs) +[![Build Status](https://img.shields.io/github/actions/workflow/status/php-enqueue/fs/ci.yml?branch=master)](https://github.com/php-enqueue/fs/actions?query=workflow%3ACI) [![Total Downloads](https://poser.pugx.org/enqueue/fs/d/total.png)](https://packagist.org/packages/enqueue/fs) [![Latest Stable Version](https://poser.pugx.org/enqueue/fs/version.png)](https://packagist.org/packages/enqueue/fs) - -This is an implementation of PSR queue specification. It allows you to send and consume message stored locally in files. + +This is an implementation of Queue Interop specification. It allows you to send and consume message stored locally in files. ## Resources -* [Documentation](https://github.com/php-enqueue/enqueue-dev/blob/master/docs/index.md) +* [Site](https://enqueue.forma-pro.com/) +* [Documentation](https://php-enqueue.github.io/transport/filesystem/) * [Questions](https://gitter.im/php-enqueue/Lobby) * [Issue Tracker](https://github.com/php-enqueue/enqueue-dev/issues) ## Developed by Forma-Pro -Forma-Pro is a full stack development company which interests also spread to open source development. -Being a team of strong professionals we have an aim an ability to help community by developing cutting edge solutions in the areas of e-commerce, docker & microservice oriented architecture where we have accumulated a huge many-years experience. +Forma-Pro is a full stack development company which interests also spread to open source development. +Being a team of strong professionals we have an aim an ability to help community by developing cutting edge solutions in the areas of e-commerce, docker & microservice oriented architecture where we have accumulated a huge many-years experience. Our main specialization is Symfony framework based solution, but we are always looking to the technologies that allow us to do our job the best way. We are committed to creating solutions that revolutionize the way how things are developed in aspects of architecture & scalability. If you have any questions and inquires about our open source development, this product particularly or any other matter feel free to contact at opensource@forma-pro.com diff --git a/Symfony/FsTransportFactory.php b/Symfony/FsTransportFactory.php deleted file mode 100644 index 9908f89..0000000 --- a/Symfony/FsTransportFactory.php +++ /dev/null @@ -1,122 +0,0 @@ -name = $name; - } - - /** - * {@inheritdoc} - */ - public function addConfiguration(ArrayNodeDefinition $builder) - { - $builder - ->beforeNormalization() - ->ifString() - ->then(function ($v) { - return ['dsn' => $v]; - }) - ->end() - ->children() - ->scalarNode('dsn') - ->info('The path to a directory where to store messages given as DSN. For example file://tmp/foo') - ->end() - ->scalarNode('path') - ->cannotBeEmpty() - ->info('The store directory where all queue\topics files will be created and messages are stored') - ->end() - ->integerNode('pre_fetch_count') - ->min(1) - ->defaultValue(1) - ->info('The option tells how many messages should be read from file at once. The feature save resources but could lead to bigger messages lose.') - ->end() - ->integerNode('chmod') - ->defaultValue(0600) - ->info('The queue files are created with this given permissions if not exist.') - ->end() - ->integerNode('polling_interval') - ->defaultValue(100) - ->min(50) - ->info('How often query for new messages.') - ->end() - ; - } - - /** - * {@inheritdoc} - */ - public function createConnectionFactory(ContainerBuilder $container, array $config) - { - $factory = new Definition(FsConnectionFactory::class); - $factory->setArguments(isset($config['dsn']) ? [$config['dsn']] : [$config]); - - $factoryId = sprintf('enqueue.transport.%s.connection_factory', $this->getName()); - $container->setDefinition($factoryId, $factory); - - return $factoryId; - } - - /** - * {@inheritdoc} - */ - public function createContext(ContainerBuilder $container, array $config) - { - $factoryId = sprintf('enqueue.transport.%s.connection_factory', $this->getName()); - - $context = new Definition(FsContext::class); - $context->setFactory([new Reference($factoryId), 'createContext']); - - $contextId = sprintf('enqueue.transport.%s.context', $this->getName()); - $container->setDefinition($contextId, $context); - - return $contextId; - } - - /** - * {@inheritdoc} - */ - public function createDriver(ContainerBuilder $container, array $config) - { - $driver = new Definition(FsDriver::class); - $driver->setArguments([ - new Reference(sprintf('enqueue.transport.%s.context', $this->getName())), - new Reference('enqueue.client.config'), - new Reference('enqueue.client.meta.queue_meta_registry'), - ]); - - $driverId = sprintf('enqueue.client.%s.driver', $this->getName()); - $container->setDefinition($driverId, $driver); - - return $driverId; - } - - /** - * {@inheritdoc} - */ - public function getName() - { - return $this->name; - } -} diff --git a/Tests/Client/FsDriverTest.php b/Tests/Client/FsDriverTest.php deleted file mode 100644 index 14dfcf0..0000000 --- a/Tests/Client/FsDriverTest.php +++ /dev/null @@ -1,392 +0,0 @@ -assertClassImplements(DriverInterface::class, FsDriver::class); - } - - public function testCouldBeConstructedWithRequiredArguments() - { - new FsDriver( - $this->createPsrContextMock(), - $this->createDummyConfig(), - $this->createDummyQueueMetaRegistry() - ); - } - - public function testShouldReturnConfigObject() - { - $config = $this->createDummyConfig(); - - $driver = new FsDriver($this->createPsrContextMock(), $config, $this->createDummyQueueMetaRegistry()); - - $this->assertSame($config, $driver->getConfig()); - } - - public function testShouldCreateAndReturnQueueInstance() - { - $expectedQueue = new FsDestination(new TempFile(sys_get_temp_dir().'/queue-name')); - - $context = $this->createPsrContextMock(); - $context - ->expects($this->once()) - ->method('createQueue') - ->with('aprefix.afooqueue') - ->willReturn($expectedQueue) - ; - - $driver = new FsDriver($context, $this->createDummyConfig(), $this->createDummyQueueMetaRegistry()); - - $queue = $driver->createQueue('aFooQueue'); - - $this->assertSame($expectedQueue, $queue); - } - - public function testShouldCreateAndReturnQueueInstanceWithHardcodedTransportName() - { - $expectedQueue = new FsDestination(new TempFile(sys_get_temp_dir().'/queue-name')); - - $context = $this->createPsrContextMock(); - $context - ->expects($this->once()) - ->method('createQueue') - ->with('aBarQueue') - ->willReturn($expectedQueue) - ; - - $driver = new FsDriver($context, $this->createDummyConfig(), $this->createDummyQueueMetaRegistry()); - - $queue = $driver->createQueue('aBarQueue'); - - $this->assertSame($expectedQueue, $queue); - } - - public function testShouldConvertTransportMessageToClientMessage() - { - $transportMessage = new FsMessage(); - $transportMessage->setBody('body'); - $transportMessage->setHeaders(['hkey' => 'hval']); - $transportMessage->setProperties(['key' => 'val']); - $transportMessage->setHeader('content_type', 'ContentType'); - $transportMessage->setMessageId('MessageId'); - $transportMessage->setTimestamp(1000); - $transportMessage->setReplyTo('theReplyTo'); - $transportMessage->setCorrelationId('theCorrelationId'); - - $driver = new FsDriver( - $this->createPsrContextMock(), - $this->createDummyConfig(), - $this->createDummyQueueMetaRegistry() - ); - - $clientMessage = $driver->createClientMessage($transportMessage); - - $this->assertInstanceOf(Message::class, $clientMessage); - $this->assertSame('body', $clientMessage->getBody()); - $this->assertSame([ - 'hkey' => 'hval', - 'content_type' => 'ContentType', - 'message_id' => 'MessageId', - 'timestamp' => 1000, - 'reply_to' => 'theReplyTo', - 'correlation_id' => 'theCorrelationId', - ], $clientMessage->getHeaders()); - $this->assertSame([ - 'key' => 'val', - ], $clientMessage->getProperties()); - $this->assertSame('MessageId', $clientMessage->getMessageId()); - $this->assertSame('ContentType', $clientMessage->getContentType()); - $this->assertSame(1000, $clientMessage->getTimestamp()); - $this->assertSame('theReplyTo', $clientMessage->getReplyTo()); - $this->assertSame('theCorrelationId', $clientMessage->getCorrelationId()); - - $this->assertNull($clientMessage->getExpire()); - $this->assertSame(MessagePriority::NORMAL, $clientMessage->getPriority()); - } - - public function testShouldConvertClientMessageToTransportMessage() - { - $clientMessage = new Message(); - $clientMessage->setBody('body'); - $clientMessage->setHeaders(['hkey' => 'hval']); - $clientMessage->setProperties(['key' => 'val']); - $clientMessage->setContentType('ContentType'); - $clientMessage->setExpire(123); - $clientMessage->setPriority(MessagePriority::VERY_HIGH); - $clientMessage->setMessageId('MessageId'); - $clientMessage->setTimestamp(1000); - $clientMessage->setReplyTo('theReplyTo'); - $clientMessage->setCorrelationId('theCorrelationId'); - - $context = $this->createPsrContextMock(); - $context - ->expects($this->once()) - ->method('createMessage') - ->willReturn(new FsMessage()) - ; - - $driver = new FsDriver( - $context, - $this->createDummyConfig(), - $this->createDummyQueueMetaRegistry() - ); - - $transportMessage = $driver->createTransportMessage($clientMessage); - - $this->assertInstanceOf(FsMessage::class, $transportMessage); - $this->assertSame('body', $transportMessage->getBody()); - $this->assertSame([ - 'hkey' => 'hval', - 'content_type' => 'ContentType', - 'message_id' => 'MessageId', - 'timestamp' => 1000, - 'reply_to' => 'theReplyTo', - 'correlation_id' => 'theCorrelationId', - ], $transportMessage->getHeaders()); - $this->assertSame([ - 'key' => 'val', - ], $transportMessage->getProperties()); - $this->assertSame('MessageId', $transportMessage->getMessageId()); - $this->assertSame(1000, $transportMessage->getTimestamp()); - $this->assertSame('theReplyTo', $transportMessage->getReplyTo()); - $this->assertSame('theCorrelationId', $transportMessage->getCorrelationId()); - } - - public function testShouldSendMessageToRouter() - { - $topic = new FsDestination(TempFile::generate()); - $transportMessage = new FsMessage(); - $config = $this->createDummyConfig(); - - $producer = $this->createPsrProducerMock(); - $producer - ->expects($this->once()) - ->method('send') - ->with($this->identicalTo($topic), $this->identicalTo($transportMessage)) - ; - $context = $this->createPsrContextMock(); - $context - ->expects($this->once()) - ->method('createTopic') - ->with('aprefix.router') - ->willReturn($topic) - ; - $context - ->expects($this->once()) - ->method('createProducer') - ->willReturn($producer) - ; - $context - ->expects($this->once()) - ->method('createMessage') - ->willReturn($transportMessage) - ; - - $driver = new FsDriver( - $context, - $config, - $this->createDummyQueueMetaRegistry() - ); - - $message = new Message(); - $message->setProperty(Config::PARAMETER_TOPIC_NAME, 'topic'); - - $driver->sendToRouter($message); - } - - public function testShouldThrowExceptionIfTopicParameterIsNotSet() - { - $driver = new FsDriver( - $this->createPsrContextMock(), - $this->createDummyConfig(), - $this->createDummyQueueMetaRegistry() - ); - - $this->expectException(\LogicException::class); - $this->expectExceptionMessage('Topic name parameter is required but is not set'); - - $driver->sendToRouter(new Message()); - } - - public function testShouldSendMessageToProcessor() - { - $queue = new FsDestination(TempFile::generate()); - $transportMessage = new FsMessage(); - - $producer = $this->createPsrProducerMock(); - $producer - ->expects($this->once()) - ->method('send') - ->with($this->identicalTo($queue), $this->identicalTo($transportMessage)) - ; - $context = $this->createPsrContextMock(); - $context - ->expects($this->once()) - ->method('createQueue') - ->willReturn($queue) - ; - $context - ->expects($this->once()) - ->method('createProducer') - ->willReturn($producer) - ; - $context - ->expects($this->once()) - ->method('createMessage') - ->willReturn($transportMessage) - ; - - $driver = new FsDriver( - $context, - $this->createDummyConfig(), - $this->createDummyQueueMetaRegistry() - ); - - $message = new Message(); - $message->setProperty(Config::PARAMETER_PROCESSOR_NAME, 'processor'); - $message->setProperty(Config::PARAMETER_PROCESSOR_QUEUE_NAME, 'aFooQueue'); - - $driver->sendToProcessor($message); - } - - public function testShouldThrowExceptionIfProcessorNameParameterIsNotSet() - { - $driver = new FsDriver( - $this->createPsrContextMock(), - $this->createDummyConfig(), - $this->createDummyQueueMetaRegistry() - ); - - $this->expectException(\LogicException::class); - $this->expectExceptionMessage('Processor name parameter is required but is not set'); - - $driver->sendToProcessor(new Message()); - } - - public function testShouldThrowExceptionIfProcessorQueueNameParameterIsNotSet() - { - $driver = new FsDriver( - $this->createPsrContextMock(), - $this->createDummyConfig(), - $this->createDummyQueueMetaRegistry() - ); - - $this->expectException(\LogicException::class); - $this->expectExceptionMessage('Queue name parameter is required but is not set'); - - $message = new Message(); - $message->setProperty(Config::PARAMETER_PROCESSOR_NAME, 'processor'); - - $driver->sendToProcessor($message); - } - - public function testShouldSetupBroker() - { - $routerTopic = new FsDestination(TempFile::generate()); - $routerQueue = new FsDestination(TempFile::generate()); - - $processorQueue = new FsDestination(TempFile::generate()); - - $context = $this->createPsrContextMock(); - // setup router - $context - ->expects($this->at(0)) - ->method('createTopic') - ->willReturn($routerTopic) - ; - $context - ->expects($this->at(1)) - ->method('createQueue') - ->willReturn($routerQueue) - ; - $context - ->expects($this->at(2)) - ->method('declareDestination') - ->with($this->identicalTo($routerTopic)) - ; - $context - ->expects($this->at(3)) - ->method('declareDestination') - ->with($this->identicalTo($routerQueue)) - ; - // setup processor queue - $context - ->expects($this->at(4)) - ->method('createQueue') - ->willReturn($processorQueue) - ; - $context - ->expects($this->at(5)) - ->method('declareDestination') - ->with($this->identicalTo($processorQueue)) - ; - - $meta = new QueueMetaRegistry($this->createDummyConfig(), [ - 'default' => [], - ]); - - $driver = new FsDriver( - $context, - $this->createDummyConfig(), - $meta - ); - - $driver->setupBroker(); - } - - /** - * @return \PHPUnit_Framework_MockObject_MockObject|FsContext - */ - private function createPsrContextMock() - { - return $this->createMock(FsContext::class); - } - - /** - * @return \PHPUnit_Framework_MockObject_MockObject|PsrProducer - */ - private function createPsrProducerMock() - { - return $this->createMock(PsrProducer::class); - } - - /** - * @return QueueMetaRegistry - */ - private function createDummyQueueMetaRegistry() - { - $registry = new QueueMetaRegistry($this->createDummyConfig(), []); - $registry->add('default'); - $registry->add('aFooQueue'); - $registry->add('aBarQueue', 'aBarQueue'); - - return $registry; - } - - /** - * @return Config - */ - private function createDummyConfig() - { - return Config::create('aPrefix'); - } -} diff --git a/Tests/FsConnectionFactoryConfigTest.php b/Tests/FsConnectionFactoryConfigTest.php index c79581c..0b3411f 100644 --- a/Tests/FsConnectionFactoryConfigTest.php +++ b/Tests/FsConnectionFactoryConfigTest.php @@ -4,6 +4,7 @@ use Enqueue\Fs\FsConnectionFactory; use Enqueue\Test\ClassExtensionTrait; +use Enqueue\Test\ReadAttributeTrait; use PHPUnit\Framework\TestCase; /** @@ -12,6 +13,7 @@ class FsConnectionFactoryConfigTest extends TestCase { use ClassExtensionTrait; + use ReadAttributeTrait; public function testThrowNeitherArrayStringNorNullGivenAsConfig() { @@ -21,10 +23,10 @@ public function testThrowNeitherArrayStringNorNullGivenAsConfig() new FsConnectionFactory(new \stdClass()); } - public function testThrowIfSchemeIsNotAmqp() + public function testThrowIfSchemeIsNotFileScheme() { $this->expectException(\LogicException::class); - $this->expectExceptionMessage('The given DSN "http://example.com" is not supported. Must start with "file:'); + $this->expectExceptionMessage('The given scheme protocol "http" is not supported. It must be one of "file"'); new FsConnectionFactory('http://example.com'); } @@ -32,16 +34,23 @@ public function testThrowIfSchemeIsNotAmqp() public function testThrowIfDsnCouldNotBeParsed() { $this->expectException(\LogicException::class); - $this->expectExceptionMessage('Failed to parse DSN path ":@/". The path must start with "/"'); + $this->expectExceptionMessage('The DSN is invalid.'); - new FsConnectionFactory('file://:@/'); + new FsConnectionFactory('foo'); + } + + public function testThrowIfArrayConfigGivenWithEmptyPath() + { + $this->expectException(\LogicException::class); + $this->expectExceptionMessage('The path option must be set.'); + + new FsConnectionFactory([ + 'path' => null, + ]); } /** * @dataProvider provideConfigs - * - * @param mixed $config - * @param mixed $expectedConfig */ public function testShouldParseConfigurationAsExpected($config, $expectedConfig) { diff --git a/Tests/FsConnectionFactoryTest.php b/Tests/FsConnectionFactoryTest.php index aa7c511..2df4423 100644 --- a/Tests/FsConnectionFactoryTest.php +++ b/Tests/FsConnectionFactoryTest.php @@ -5,21 +5,23 @@ use Enqueue\Fs\FsConnectionFactory; use Enqueue\Fs\FsContext; use Enqueue\Test\ClassExtensionTrait; -use Interop\Queue\PsrConnectionFactory; +use Enqueue\Test\ReadAttributeTrait; +use Interop\Queue\ConnectionFactory; class FsConnectionFactoryTest extends \PHPUnit\Framework\TestCase { use ClassExtensionTrait; + use ReadAttributeTrait; public function testShouldImplementConnectionFactoryInterface() { - $this->assertClassImplements(PsrConnectionFactory::class, FsConnectionFactory::class); + $this->assertClassImplements(ConnectionFactory::class, FsConnectionFactory::class); } public function testShouldCreateContext() { $factory = new FsConnectionFactory([ - 'path' => 'theDir', + 'path' => __DIR__, 'pre_fetch_count' => 123, 'chmod' => 0765, ]); @@ -28,7 +30,7 @@ public function testShouldCreateContext() $this->assertInstanceOf(FsContext::class, $context); - $this->assertAttributeSame('theDir', 'storeDir', $context); + $this->assertAttributeSame(__DIR__, 'storeDir', $context); $this->assertAttributeSame(123, 'preFetchCount', $context); $this->assertAttributeSame(0765, 'chmod', $context); } diff --git a/Tests/FsConsumerTest.php b/Tests/FsConsumerTest.php index b3f1426..67f03ae 100644 --- a/Tests/FsConsumerTest.php +++ b/Tests/FsConsumerTest.php @@ -8,7 +8,7 @@ use Enqueue\Fs\FsMessage; use Enqueue\Fs\FsProducer; use Enqueue\Test\ClassExtensionTrait; -use Interop\Queue\PsrConsumer; +use Interop\Queue\Consumer; use Makasim\File\TempFile; class FsConsumerTest extends \PHPUnit\Framework\TestCase @@ -17,12 +17,7 @@ class FsConsumerTest extends \PHPUnit\Framework\TestCase public function testShouldImplementConsumerInterface() { - $this->assertClassImplements(PsrConsumer::class, FsConsumer::class); - } - - public function testCouldBeConstructedWithContextAndDestinationAndPreFetchCountAsArguments() - { - new FsConsumer($this->createContextMock(), new FsDestination(TempFile::generate()), 1); + $this->assertClassImplements(Consumer::class, FsConsumer::class); } public function testShouldReturnDestinationSetInConstructorOnGetQueue() @@ -50,6 +45,9 @@ public function testShouldAllowGetPreviouslySetPreFetchCount() $this->assertSame(456, $consumer->getPreFetchCount()); } + /** + * @doesNotPerformAssertions + */ public function testShouldDoNothingOnAcknowledge() { $consumer = new FsConsumer($this->createContextMock(), new FsDestination(TempFile::generate()), 123); @@ -57,6 +55,9 @@ public function testShouldDoNothingOnAcknowledge() $consumer->acknowledge(new FsMessage()); } + /** + * @doesNotPerformAssertions + */ public function testShouldDoNothingOnReject() { $consumer = new FsConsumer($this->createContextMock(), new FsDestination(TempFile::generate()), 123); @@ -134,7 +135,7 @@ public function testShouldWaitTwoSecondsForMessageAndExitOnReceive() } /** - * @return \PHPUnit_Framework_MockObject_MockObject|FsProducer + * @return \PHPUnit\Framework\MockObject\MockObject|FsProducer */ private function createProducerMock() { @@ -142,7 +143,7 @@ private function createProducerMock() } /** - * @return \PHPUnit_Framework_MockObject_MockObject|FsContext + * @return \PHPUnit\Framework\MockObject\MockObject|FsContext */ private function createContextMock() { diff --git a/Tests/FsContextTest.php b/Tests/FsContextTest.php index 5ce324a..9d5a5f1 100644 --- a/Tests/FsContextTest.php +++ b/Tests/FsContextTest.php @@ -9,27 +9,24 @@ use Enqueue\Fs\FsProducer; use Enqueue\Null\NullQueue; use Enqueue\Test\ClassExtensionTrait; -use Interop\Queue\InvalidDestinationException; -use Interop\Queue\PsrContext; +use Enqueue\Test\ReadAttributeTrait; +use Interop\Queue\Context; +use Interop\Queue\Exception\InvalidDestinationException; use Makasim\File\TempFile; class FsContextTest extends \PHPUnit\Framework\TestCase { use ClassExtensionTrait; + use ReadAttributeTrait; public function testShouldImplementContextInterface() { - $this->assertClassImplements(PsrContext::class, FsContext::class); - } - - public function testCouldBeConstructedWithExpectedArguments() - { - new FsContext(sys_get_temp_dir(), 1, 0666); + $this->assertClassImplements(Context::class, FsContext::class); } public function testShouldAllowCreateEmptyMessage() { - $context = new FsContext(sys_get_temp_dir(), 1, 0666); + $context = new FsContext(sys_get_temp_dir(), 1, 0666, 100); $message = $context->createMessage(); @@ -42,7 +39,7 @@ public function testShouldAllowCreateEmptyMessage() public function testShouldAllowCreateCustomMessage() { - $context = new FsContext(sys_get_temp_dir(), 1, 0666); + $context = new FsContext(sys_get_temp_dir(), 1, 0666, 100); $message = $context->createMessage('theBody', ['aProp' => 'aPropVal'], ['aHeader' => 'aHeaderVal']); @@ -57,7 +54,7 @@ public function testShouldCreateQueue() { $tmpFile = new TempFile(sys_get_temp_dir().'/foo'); - $context = new FsContext(sys_get_temp_dir(), 1, 0666); + $context = new FsContext(sys_get_temp_dir(), 1, 0666, 100); $queue = $context->createQueue($tmpFile->getFilename()); @@ -72,7 +69,7 @@ public function testShouldAllowCreateTopic() { $tmpFile = new TempFile(sys_get_temp_dir().'/foo'); - $context = new FsContext(sys_get_temp_dir(), 1, 0666); + $context = new FsContext(sys_get_temp_dir(), 1, 0666, 100); $topic = $context->createTopic($tmpFile->getFilename()); @@ -87,7 +84,7 @@ public function testShouldAllowCreateTmpQueue() { $tmpFile = new TempFile(sys_get_temp_dir().'/foo'); - $context = new FsContext(sys_get_temp_dir(), 1, 0666); + $context = new FsContext(sys_get_temp_dir(), 1, 0666, 100); $queue = $context->createTemporaryQueue(); @@ -100,7 +97,7 @@ public function testShouldCreateProducer() { $tmpFile = new TempFile(sys_get_temp_dir().'/foo'); - $context = new FsContext(sys_get_temp_dir(), 1, 0666); + $context = new FsContext(sys_get_temp_dir(), 1, 0666, 100); $producer = $context->createProducer(); @@ -111,7 +108,7 @@ public function testShouldThrowIfNotFsDestinationGivenOnCreateConsumer() { $tmpFile = new TempFile(sys_get_temp_dir().'/foo'); - $context = new FsContext(sys_get_temp_dir(), 1, 0666); + $context = new FsContext(sys_get_temp_dir(), 1, 0666, 100); $this->expectException(InvalidDestinationException::class); $this->expectExceptionMessage('The destination must be an instance of Enqueue\Fs\FsDestination but got Enqueue\Null\NullQueue.'); @@ -120,11 +117,14 @@ public function testShouldThrowIfNotFsDestinationGivenOnCreateConsumer() $this->assertInstanceOf(FsConsumer::class, $consumer); } + /** + * @doesNotPerformAssertions + */ public function testShouldCreateConsumer() { $tmpFile = new TempFile(sys_get_temp_dir().'/foo'); - $context = new FsContext(sys_get_temp_dir(), 1, 0666); + $context = new FsContext(sys_get_temp_dir(), 1, 0666, 100); $queue = $context->createQueue($tmpFile->getFilename()); @@ -135,7 +135,7 @@ public function testShouldPropagatePreFetchCountToCreatedConsumer() { $tmpFile = new TempFile(sys_get_temp_dir().'/foo'); - $context = new FsContext(sys_get_temp_dir(), 123, 0666); + $context = new FsContext(sys_get_temp_dir(), 123, 0666, 100); $queue = $context->createQueue($tmpFile->getFilename()); @@ -151,7 +151,7 @@ public function testShouldAllowGetPreFetchCountSetInConstructor() { $tmpFile = new TempFile(sys_get_temp_dir().'/foo'); - $context = new FsContext(sys_get_temp_dir(), 123, 0666); + $context = new FsContext(sys_get_temp_dir(), 123, 0666, 100); $this->assertSame(123, $context->getPreFetchCount()); } @@ -160,7 +160,7 @@ public function testShouldAllowGetPreviouslySetPreFetchCount() { $tmpFile = new TempFile(sys_get_temp_dir().'/foo'); - $context = new FsContext(sys_get_temp_dir(), 1, 0666); + $context = new FsContext(sys_get_temp_dir(), 1, 0666, 100); $context->setPreFetchCount(456); @@ -173,52 +173,24 @@ public function testShouldAllowPurgeMessagesFromQueue() file_put_contents($tmpFile, 'foo'); - $context = new FsContext(sys_get_temp_dir(), 1, 0666); + $context = new FsContext(sys_get_temp_dir(), 1, 0666, 100); $queue = $context->createQueue($tmpFile->getFilename()); - $context->purge($queue); + $context->purgeQueue($queue); $this->assertEmpty(file_get_contents($tmpFile)); } - public function testShouldReleaseAllLocksOnClose() - { - new TempFile(sys_get_temp_dir().'/foo'); - new TempFile(sys_get_temp_dir().'/bar'); - - $context = new FsContext(sys_get_temp_dir(), 1, 0666); - - $fooQueue = $context->createQueue('foo'); - $barQueue = $context->createTopic('bar'); - - $this->assertAttributeCount(0, 'lockHandlers', $context); - - $context->workWithFile($fooQueue, 'r+', function () { - }); - $context->workWithFile($barQueue, 'r+', function () { - }); - $context->workWithFile($fooQueue, 'c+', function () { - }); - $context->workWithFile($barQueue, 'c+', function () { - }); - - $this->assertAttributeCount(2, 'lockHandlers', $context); - - $context->close(); - - $this->assertAttributeCount(0, 'lockHandlers', $context); - } - public function testShouldCreateFileOnFilesystemIfNotExistOnDeclareDestination() { $tmpFile = new TempFile(sys_get_temp_dir().'/'.uniqid()); - $context = new FsContext(sys_get_temp_dir(), 1, 0666); + $context = new FsContext(sys_get_temp_dir(), 1, 0666, 100); $queue = $context->createQueue($tmpFile->getFilename()); - $this->assertFileNotExists((string) $tmpFile); + $this->assertFileDoesNotExist((string) $tmpFile); $context->declareDestination($queue); diff --git a/Tests/FsDestinationTest.php b/Tests/FsDestinationTest.php index cfe9e1a..6e5753f 100644 --- a/Tests/FsDestinationTest.php +++ b/Tests/FsDestinationTest.php @@ -4,8 +4,8 @@ use Enqueue\Fs\FsDestination; use Enqueue\Test\ClassExtensionTrait; -use Interop\Queue\PsrQueue; -use Interop\Queue\PsrTopic; +use Interop\Queue\Queue; +use Interop\Queue\Topic; use Makasim\File\TempFile; class FsDestinationTest extends \PHPUnit\Framework\TestCase @@ -14,8 +14,8 @@ class FsDestinationTest extends \PHPUnit\Framework\TestCase public function testShouldImplementsTopicAndQueueInterfaces() { - $this->assertClassImplements(PsrTopic::class, FsDestination::class); - $this->assertClassImplements(PsrQueue::class, FsDestination::class); + $this->assertClassImplements(Topic::class, FsDestination::class); + $this->assertClassImplements(Queue::class, FsDestination::class); } public function testCouldBeConstructedWithSplFileAsFirstArgument() diff --git a/Tests/FsMessageTest.php b/Tests/FsMessageTest.php index c2f788d..90655b6 100644 --- a/Tests/FsMessageTest.php +++ b/Tests/FsMessageTest.php @@ -89,7 +89,7 @@ public function testCouldBeUnserializedFromJson() $json = json_encode($message); - //guard + // guard $this->assertNotEmpty($json); $unserializedMessage = FsMessage::jsonUnserialize($json); diff --git a/Tests/FsProducerTest.php b/Tests/FsProducerTest.php index 201cac3..266854c 100644 --- a/Tests/FsProducerTest.php +++ b/Tests/FsProducerTest.php @@ -9,9 +9,9 @@ use Enqueue\Null\NullMessage; use Enqueue\Null\NullQueue; use Enqueue\Test\ClassExtensionTrait; -use Interop\Queue\InvalidDestinationException; -use Interop\Queue\InvalidMessageException; -use Interop\Queue\PsrProducer; +use Interop\Queue\Exception\InvalidDestinationException; +use Interop\Queue\Exception\InvalidMessageException; +use Interop\Queue\Producer; use Makasim\File\TempFile; class FsProducerTest extends \PHPUnit\Framework\TestCase @@ -20,12 +20,7 @@ class FsProducerTest extends \PHPUnit\Framework\TestCase public function testShouldImplementProducerInterface() { - $this->assertClassImplements(PsrProducer::class, FsProducer::class); - } - - public function testCouldBeConstructedWithContextAsFirstArgument() - { - new FsProducer($this->createContextMock()); + $this->assertClassImplements(Producer::class, FsProducer::class); } public function testThrowIfDestinationNotFsOnSend() @@ -63,7 +58,7 @@ public function testShouldCallContextWorkWithFileAndCallbackToItOnSend() } /** - * @return \PHPUnit_Framework_MockObject_MockObject|FsContext + * @return \PHPUnit\Framework\MockObject\MockObject|FsContext */ private function createContextMock() { diff --git a/Tests/Functional/FsCommonUseCasesTest.php b/Tests/Functional/FsCommonUseCasesTest.php index ee7e9e4..b96091e 100644 --- a/Tests/Functional/FsCommonUseCasesTest.php +++ b/Tests/Functional/FsCommonUseCasesTest.php @@ -17,14 +17,14 @@ class FsCommonUseCasesTest extends \PHPUnit\Framework\TestCase */ private $fsContext; - public function setUp() + protected function setUp(): void { $this->fsContext = (new FsConnectionFactory(['path' => sys_get_temp_dir()]))->createContext(); new TempFile(sys_get_temp_dir().'/fs_test_queue'); } - public function tearDown() + protected function tearDown(): void { $this->fsContext->close(); } @@ -111,7 +111,7 @@ public function testConsumerReceiveMessageWithZeroTimeout() $topic = $this->fsContext->createTopic('fs_test_queue_exchange'); $consumer = $this->fsContext->createConsumer($topic); - //guard + // guard $this->assertNull($consumer->receive(1000)); $message = $this->fsContext->createMessage(__METHOD__); @@ -139,7 +139,7 @@ public function testPurgeMessagesFromQueue() $producer->send($queue, $message); $producer->send($queue, $message); - $this->fsContext->purge($queue); + $this->fsContext->purgeQueue($queue); $this->assertNull($consumer->receive(1)); } diff --git a/Tests/Functional/FsConsumerTest.php b/Tests/Functional/FsConsumerTest.php index f3a6411..3be009b 100644 --- a/Tests/Functional/FsConsumerTest.php +++ b/Tests/Functional/FsConsumerTest.php @@ -15,14 +15,14 @@ class FsConsumerTest extends TestCase */ private $fsContext; - public function setUp() + protected function setUp(): void { $this->fsContext = (new FsConnectionFactory(['path' => sys_get_temp_dir()]))->createContext(); - $this->fsContext->purge($this->fsContext->createQueue('fs_test_queue')); + $this->fsContext->purgeQueue($this->fsContext->createQueue('fs_test_queue')); } - public function tearDown() + protected function tearDown(): void { $this->fsContext->close(); } @@ -76,7 +76,7 @@ public function testShouldNotFailOnSpecificMessageSize() { $context = $this->fsContext; $queue = $context->createQueue('fs_test_queue'); - $context->purge($queue); + $context->purgeQueue($queue); $consumer = $context->createConsumer($queue); $producer = $context->createProducer(); @@ -102,7 +102,7 @@ public function testShouldNotCorruptFrameSize() { $context = $this->fsContext; $queue = $context->createQueue('fs_test_queue'); - $context->purge($queue); + $context->purgeQueue($queue); $consumer = $context->createConsumer($queue); $producer = $context->createProducer(); @@ -134,7 +134,7 @@ public function testShouldThrowExceptionForTheCorruptedQueueFile() { $context = $this->fsContext; $queue = $context->createQueue('fs_test_queue'); - $context->purge($queue); + $context->purgeQueue($queue); $context->workWithFile($queue, 'a+', function (FsDestination $destination, $file) { fwrite($file, '|{"body":"{\"path\":\"\\\/p\\\/r\\\/pr_swoppad_6_4910_red_1.jpg\",\"filters\":null,\"force\":false}","properties":{"enqueue.topic_name":"liip_imagine_resolve_cache"},"headers":{"content_type":"application\/json","message_id":"46fdc345-5d0c-426e-95ac-227c7e657839","timestamp":1505379216,"reply_to":null,"correlation_id":""}} |{"body":"{\"path\":\"\\\/p\\\/r\\\/pr_swoppad_6_4910_black_1.jpg\",\"filters\":null,\"force\":false}","properties":{"enqueue.topic_name":"liip_imagine_resolve_cache"},"headers":{"content_type":"application\/json","message_id":"c4d60e39-3a8c-42df-b536-c8b7c13e006d","timestamp":1505379216,"reply_to":null,"correlation_id":""}} |{"body":"{\"path\":\"\\\/p\\\/r\\\/pr_swoppad_6_4910_green_1.jpg\",\"filters\":null,\"force\":false}","properties":{"enqueue.topic_name":"liip_imagine_resolve_cache"},"headers":{"content_type":"application\/json","message_id":"3a6aa176-c879-4435-9626-c48e0643defa","timestamp":1505379216,"reply_to":null,"correlation_id":""}}'); @@ -155,11 +155,11 @@ public function testShouldThrowExceptionWhenFrameSizeNotDivideExactly() { $context = $this->fsContext; $queue = $context->createQueue('fs_test_queue'); - $context->purge($queue); + $context->purgeQueue($queue); $context->workWithFile($queue, 'a+', function (FsDestination $destination, $file) { $msg = '|{"body":""}'; - //guard + // guard $this->assertNotSame(0, strlen($msg) % 64); fwrite($file, $msg); @@ -171,4 +171,31 @@ public function testShouldThrowExceptionWhenFrameSizeNotDivideExactly() $this->expectExceptionMessage('The frame size is "12" and it must divide exactly to 64 but it leaves a reminder "12".'); $consumer->receiveNoWait(); } + + /** + * @group bug + * @group bug390 + */ + public function testShouldUnEscapeDelimiterSymbolsInMessageBody() + { + $context = $this->fsContext; + $queue = $context->createQueue('fs_test_queue'); + $context->purgeQueue($queue); + + $message = $this->fsContext->createMessage(' |{"body":"aMessageData","properties":{"enqueue.topic_name":"user_updated"},"headers":{"content_type":"text\/plain","message_id":"90979b6c-d9ff-4b39-9938-878b83a95360","timestamp":1519899428,"reply_to":null,"correlation_id":""}}'); + + $this->fsContext->createProducer()->send($queue, $message); + + $this->assertSame(0, strlen(file_get_contents(sys_get_temp_dir().'/fs_test_queue')) % 64); + $this->assertSame( + ' |{"body":" \|\{\"body\":\"aMessageData\",\"properties\":{\"enqueue.topic_name\":\"user_updated\"},\"headers\":{\"content_type\":\"text\\\\\/plain\",\"message_id\":\"90979b6c-d9ff-4b39-9938-878b83a95360\",\"timestamp\":1519899428,\"reply_to\":null,\"correlation_id\":\"\"}}","properties":[],"headers":[]}', + file_get_contents(sys_get_temp_dir().'/fs_test_queue') + ); + + $consumer = $context->createConsumer($queue); + + $message = $consumer->receiveNoWait(); + + $this->assertSame(' |{"body":"aMessageData","properties":{"enqueue.topic_name":"user_updated"},"headers":{"content_type":"text\/plain","message_id":"90979b6c-d9ff-4b39-9938-878b83a95360","timestamp":1519899428,"reply_to":null,"correlation_id":""}}', $message->getBody()); + } } diff --git a/Tests/Functional/FsConsumptionUseCasesTest.php b/Tests/Functional/FsConsumptionUseCasesTest.php index 6662f2b..334a8fe 100644 --- a/Tests/Functional/FsConsumptionUseCasesTest.php +++ b/Tests/Functional/FsConsumptionUseCasesTest.php @@ -10,9 +10,9 @@ use Enqueue\Consumption\Result; use Enqueue\Fs\FsConnectionFactory; use Enqueue\Fs\FsContext; -use Interop\Queue\PsrContext; -use Interop\Queue\PsrMessage; -use Interop\Queue\PsrProcessor; +use Interop\Queue\Context; +use Interop\Queue\Message; +use Interop\Queue\Processor; use Makasim\File\TempFile; /** @@ -25,14 +25,14 @@ class FsConsumptionUseCasesTest extends \PHPUnit\Framework\TestCase */ private $fsContext; - public function setUp() + protected function setUp(): void { $this->fsContext = (new FsConnectionFactory(['path' => sys_get_temp_dir()]))->createContext(); new TempFile(sys_get_temp_dir().'/fs_test_queue'); } - public function tearDown() + protected function tearDown(): void { $this->fsContext->close(); } @@ -54,7 +54,7 @@ public function testConsumeOneMessageAndExit() $queueConsumer->consume(); - $this->assertInstanceOf(PsrMessage::class, $processor->lastProcessedMessage); + $this->assertInstanceOf(Message::class, $processor->lastProcessedMessage); $this->assertEquals(__METHOD__, $processor->lastProcessedMessage->getBody()); } @@ -85,22 +85,22 @@ public function testConsumeOneMessageAndSendReplyExit() $queueConsumer->bind($replyQueue, $replyProcessor); $queueConsumer->consume(); - $this->assertInstanceOf(PsrMessage::class, $processor->lastProcessedMessage); + $this->assertInstanceOf(Message::class, $processor->lastProcessedMessage); $this->assertEquals(__METHOD__, $processor->lastProcessedMessage->getBody()); - $this->assertInstanceOf(PsrMessage::class, $replyProcessor->lastProcessedMessage); + $this->assertInstanceOf(Message::class, $replyProcessor->lastProcessedMessage); $this->assertEquals(__METHOD__.'.reply', $replyProcessor->lastProcessedMessage->getBody()); } } -class StubProcessor implements PsrProcessor +class StubProcessor implements Processor { public $result = self::ACK; - /** @var PsrMessage */ + /** @var Message */ public $lastProcessedMessage; - public function process(PsrMessage $message, PsrContext $context) + public function process(Message $message, Context $context) { $this->lastProcessedMessage = $message; diff --git a/Tests/Functional/FsContextTest.php b/Tests/Functional/FsContextTest.php index bdcfb34..806b9f5 100644 --- a/Tests/Functional/FsContextTest.php +++ b/Tests/Functional/FsContextTest.php @@ -14,7 +14,7 @@ class FsContextTest extends TestCase */ private $fsContext; - public function tearDown() + protected function tearDown(): void { $fs = new Filesystem(); $fs->remove(sys_get_temp_dir().'/enqueue'); diff --git a/Tests/Functional/FsProducerTest.php b/Tests/Functional/FsProducerTest.php index 634e098..75625cf 100644 --- a/Tests/Functional/FsProducerTest.php +++ b/Tests/Functional/FsProducerTest.php @@ -14,7 +14,7 @@ class FsProducerTest extends TestCase */ private $fsContext; - public function setUp() + protected function setUp(): void { $this->fsContext = (new FsConnectionFactory(['path' => sys_get_temp_dir()]))->createContext(); @@ -22,7 +22,7 @@ public function setUp() file_put_contents(sys_get_temp_dir().'/fs_test_queue', ''); } - public function tearDown() + protected function tearDown(): void { $this->fsContext->close(); } diff --git a/Tests/Functional/FsRpcUseCasesTest.php b/Tests/Functional/FsRpcUseCasesTest.php index 91c40f4..3a0327d 100644 --- a/Tests/Functional/FsRpcUseCasesTest.php +++ b/Tests/Functional/FsRpcUseCasesTest.php @@ -20,7 +20,7 @@ class FsRpcUseCasesTest extends TestCase */ private $fsContext; - public function setUp() + protected function setUp(): void { $this->fsContext = (new FsConnectionFactory(['path' => sys_get_temp_dir()]))->createContext(); @@ -28,7 +28,7 @@ public function setUp() new TempFile(sys_get_temp_dir().'/fs_reply_queue'); } - public function tearDown() + protected function tearDown(): void { $this->fsContext->close(); } diff --git a/Tests/LegacyFilesystemLockTest.php b/Tests/LegacyFilesystemLockTest.php new file mode 100644 index 0000000..5197128 --- /dev/null +++ b/Tests/LegacyFilesystemLockTest.php @@ -0,0 +1,52 @@ +assertClassImplements(Lock::class, LegacyFilesystemLock::class); + } + + public function testShouldReleaseAllLocksOnClose() + { + $context = new FsContext(sys_get_temp_dir(), 1, 0666, 100); + $fooQueue = $context->createQueue('foo'); + $barQueue = $context->createTopic('bar'); + + new TempFile(sys_get_temp_dir().'/foo'); + new TempFile(sys_get_temp_dir().'/bar'); + + $lock = new LegacyFilesystemLock(); + + $this->assertAttributeCount(0, 'lockHandlers', $lock); + + $lock->lock($fooQueue); + $this->assertAttributeCount(1, 'lockHandlers', $lock); + + $lock->release($fooQueue); + $this->assertAttributeCount(1, 'lockHandlers', $lock); + + $lock->lock($barQueue); + $lock->lock($fooQueue); + $lock->lock($barQueue); + + $this->assertAttributeCount(2, 'lockHandlers', $lock); + + $lock->releaseAll(); + + $this->assertAttributeCount(0, 'lockHandlers', $lock); + } +} diff --git a/Tests/Spec/FsMessageTest.php b/Tests/Spec/FsMessageTest.php index da34ab6..f1ece8e 100644 --- a/Tests/Spec/FsMessageTest.php +++ b/Tests/Spec/FsMessageTest.php @@ -3,13 +3,10 @@ namespace Enqueue\Fs\Tests\Spec; use Enqueue\Fs\FsMessage; -use Interop\Queue\Spec\PsrMessageSpec; +use Interop\Queue\Spec\MessageSpec; -class FsMessageTest extends PsrMessageSpec +class FsMessageTest extends MessageSpec { - /** - * {@inheritdoc} - */ protected function createMessage() { return new FsMessage(); diff --git a/Tests/Spec/FsSendAndReceiveTimeToLiveMessagesFromQueueTest.php b/Tests/Spec/FsSendAndReceiveTimeToLiveMessagesFromQueueTest.php index 3dd1697..d6a76ca 100644 --- a/Tests/Spec/FsSendAndReceiveTimeToLiveMessagesFromQueueTest.php +++ b/Tests/Spec/FsSendAndReceiveTimeToLiveMessagesFromQueueTest.php @@ -9,8 +9,6 @@ class FsSendAndReceiveTimeToLiveMessagesFromQueueTest extends SendAndReceiveTimeToLiveMessagesFromQueueSpec { /** - * {@inheritdoc} - * * @return FsContext */ protected function createContext() diff --git a/Tests/Symfony/FsTransportFactoryTest.php b/Tests/Symfony/FsTransportFactoryTest.php deleted file mode 100644 index 1fccf15..0000000 --- a/Tests/Symfony/FsTransportFactoryTest.php +++ /dev/null @@ -1,162 +0,0 @@ -assertClassImplements(TransportFactoryInterface::class, FsTransportFactory::class); - } - - public function testCouldBeConstructedWithDefaultName() - { - $transport = new FsTransportFactory(); - - $this->assertEquals('fs', $transport->getName()); - } - - public function testCouldBeConstructedWithCustomName() - { - $transport = new FsTransportFactory('theCustomName'); - - $this->assertEquals('theCustomName', $transport->getName()); - } - - public function testShouldAllowAddConfiguration() - { - $transport = new FsTransportFactory(); - $tb = new TreeBuilder(); - $rootNode = $tb->root('foo'); - - $transport->addConfiguration($rootNode); - $processor = new Processor(); - $config = $processor->process($tb->buildTree(), [[ - 'path' => sys_get_temp_dir(), - ]]); - - $this->assertEquals([ - 'path' => sys_get_temp_dir(), - 'pre_fetch_count' => 1, - 'chmod' => 0600, - 'polling_interval' => 100, - ], $config); - } - - public function testShouldAllowAddConfigurationAsString() - { - $transport = new FsTransportFactory(); - $tb = new TreeBuilder(); - $rootNode = $tb->root('foo'); - - $transport->addConfiguration($rootNode); - $processor = new Processor(); - $config = $processor->process($tb->buildTree(), ['fileDSN']); - - $this->assertEquals([ - 'dsn' => 'fileDSN', - 'pre_fetch_count' => 1, - 'chmod' => 0600, - 'polling_interval' => 100, - ], $config); - } - - public function testShouldCreateConnectionFactory() - { - $container = new ContainerBuilder(); - - $transport = new FsTransportFactory(); - - $serviceId = $transport->createConnectionFactory($container, [ - 'path' => sys_get_temp_dir(), - 'pre_fetch_count' => 1, - 'chmod' => 0600, - 'polling_interval' => 100, - ]); - - $this->assertTrue($container->hasDefinition($serviceId)); - $factory = $container->getDefinition($serviceId); - $this->assertEquals(FsConnectionFactory::class, $factory->getClass()); - $this->assertSame([[ - 'path' => sys_get_temp_dir(), - 'pre_fetch_count' => 1, - 'chmod' => 0600, - 'polling_interval' => 100, - ]], $factory->getArguments()); - } - - public function testShouldCreateConnectionFactoryFromDsnString() - { - $container = new ContainerBuilder(); - - $transport = new FsTransportFactory(); - - $serviceId = $transport->createConnectionFactory($container, [ - 'dsn' => 'theFileDSN', - ]); - - $this->assertTrue($container->hasDefinition($serviceId)); - $factory = $container->getDefinition($serviceId); - $this->assertEquals(FsConnectionFactory::class, $factory->getClass()); - $this->assertSame(['theFileDSN'], $factory->getArguments()); - } - - public function testShouldCreateContext() - { - $container = new ContainerBuilder(); - - $transport = new FsTransportFactory(); - - $serviceId = $transport->createContext($container, [ - 'path' => sys_get_temp_dir(), - 'pre_fetch_count' => 1, - 'chmod' => 0600, - 'polling_interval' => 100, - ]); - - $this->assertEquals('enqueue.transport.fs.context', $serviceId); - $this->assertTrue($container->hasDefinition($serviceId)); - - $context = $container->getDefinition('enqueue.transport.fs.context'); - $this->assertInstanceOf(Reference::class, $context->getFactory()[0]); - $this->assertEquals('enqueue.transport.fs.connection_factory', (string) $context->getFactory()[0]); - $this->assertEquals('createContext', $context->getFactory()[1]); - } - - public function testShouldCreateDriver() - { - $container = new ContainerBuilder(); - - $transport = new FsTransportFactory(); - - $serviceId = $transport->createDriver($container, []); - - $this->assertEquals('enqueue.client.fs.driver', $serviceId); - $this->assertTrue($container->hasDefinition($serviceId)); - - $driver = $container->getDefinition($serviceId); - $this->assertSame(FsDriver::class, $driver->getClass()); - - $this->assertInstanceOf(Reference::class, $driver->getArgument(0)); - $this->assertEquals('enqueue.transport.fs.context', (string) $driver->getArgument(0)); - - $this->assertInstanceOf(Reference::class, $driver->getArgument(1)); - $this->assertEquals('enqueue.client.config', (string) $driver->getArgument(1)); - - $this->assertInstanceOf(Reference::class, $driver->getArgument(2)); - $this->assertEquals('enqueue.client.meta.queue_meta_registry', (string) $driver->getArgument(2)); - } -} diff --git a/composer.json b/composer.json index b9a94b7..4dd2ff8 100644 --- a/composer.json +++ b/composer.json @@ -3,22 +3,29 @@ "type": "library", "description": "Enqueue Filesystem based transport", "keywords": ["messaging", "queue", "filesystem", "local"], + "homepage": "https://enqueue.forma-pro.com/", "license": "MIT", "require": { - "php": ">=5.6", - "queue-interop/queue-interop": "^0.6", - "symfony/filesystem": "^2.8|^3@stable", + "php": "^8.1", + "queue-interop/queue-interop": "^0.8", + "enqueue/dsn": "^0.10", + "symfony/filesystem": "^5.4|^6.0", "makasim/temp-file": "^0.2@stable" }, "require-dev": { - "phpunit/phpunit": "~5.5", - "enqueue/enqueue": "^0.7", - "enqueue/null": "^0.7", - "enqueue/test": "^0.7", - "queue-interop/queue-spec": "^0.5", - "symfony/dependency-injection": "^2.8|^3@stable", - "symfony/config": "^2.8|^3@stable", - "symfony/phpunit-bridge": "^2.8|^3@stable" + "phpunit/phpunit": "^9.5", + "enqueue/null": "0.10.x-dev", + "enqueue/test": "0.10.x-dev", + "queue-interop/queue-spec": "^0.6.2", + "symfony/dependency-injection": "^5.4|^6.0", + "symfony/yaml": "^5.4|^6.0" + }, + "support": { + "email": "opensource@forma-pro.com", + "issues": "https://github.com/php-enqueue/enqueue-dev/issues", + "forum": "https://gitter.im/php-enqueue/Lobby", + "source": "https://github.com/php-enqueue/enqueue-dev", + "docs": "https://github.com/php-enqueue/enqueue-dev/blob/master/docs/index.md" }, "autoload": { "psr-4": { "Enqueue\\Fs\\": "" }, @@ -29,7 +36,7 @@ "minimum-stability": "dev", "extra": { "branch-alias": { - "dev-master": "0.8.x-dev" + "dev-master": "0.10.x-dev" } } } diff --git a/phpunit.xml.dist b/phpunit.xml.dist index 9754bd4..79088ae 100644 --- a/phpunit.xml.dist +++ b/phpunit.xml.dist @@ -1,16 +1,11 @@ - +