Skip to content

Commit 49e3936

Browse files
authored
Merge pull request #1091 from atrauzzi/feature/stomp-artemis-support
Add first pass for Apache ActiveMQ Artemis support
2 parents caf0afa + e3d07c9 commit 49e3936

11 files changed

+139
-79
lines changed

pkg/enqueue/Tests/Client/Driver/RabbitMqStompDriverTest.php

+9-8
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
use Enqueue\Client\MessagePriority;
1313
use Enqueue\Client\Route;
1414
use Enqueue\Client\RouteCollection;
15+
use Enqueue\Stomp\ExtensionType;
1516
use Enqueue\Stomp\StompContext;
1617
use Enqueue\Stomp\StompDestination;
1718
use Enqueue\Stomp\StompMessage;
@@ -47,7 +48,7 @@ public function testShouldBeSubClassOfStompDriver()
4748

4849
public function testShouldCreateAndReturnStompQueueInstance()
4950
{
50-
$expectedQueue = new StompDestination();
51+
$expectedQueue = new StompDestination(ExtensionType::RABBITMQ);
5152

5253
$context = $this->createContextMock();
5354
$context
@@ -185,10 +186,10 @@ public function testShouldInitDeliveryDelayIfDelayPropertyOnSendToProcessor()
185186

186187
public function shouldSendMessageToDelayExchangeIfDelaySet()
187188
{
188-
$queue = new StompDestination();
189+
$queue = new StompDestination(ExtensionType::RABBITMQ);
189190
$queue->setStompName('queueName');
190191

191-
$delayTopic = new StompDestination();
192+
$delayTopic = new StompDestination(ExtensionType::RABBITMQ);
192193
$delayTopic->setStompName('delayTopic');
193194

194195
$transportMessage = new StompMessage();
@@ -339,7 +340,7 @@ public function testShouldSetupBroker()
339340
->expects($this->any())
340341
->method('createQueue')
341342
->willReturnCallback(function (string $name) {
342-
$destination = new StompDestination();
343+
$destination = new StompDestination(ExtensionType::RABBITMQ);
343344
$destination->setType(StompDestination::TYPE_QUEUE);
344345
$destination->setStompName($name);
345346

@@ -431,7 +432,7 @@ public function testSetupBrokerShouldCreateDelayExchangeIfEnabled()
431432
->expects($this->any())
432433
->method('createQueue')
433434
->willReturnCallback(function (string $name) {
434-
$destination = new StompDestination();
435+
$destination = new StompDestination(ExtensionType::RABBITMQ);
435436
$destination->setType(StompDestination::TYPE_QUEUE);
436437
$destination->setStompName($name);
437438

@@ -442,7 +443,7 @@ public function testSetupBrokerShouldCreateDelayExchangeIfEnabled()
442443
->expects($this->any())
443444
->method('createTopic')
444445
->willReturnCallback(function (string $name) {
445-
$destination = new StompDestination();
446+
$destination = new StompDestination(ExtensionType::RABBITMQ);
446447
$destination->setType(StompDestination::TYPE_TOPIC);
447448
$destination->setStompName($name);
448449

@@ -503,7 +504,7 @@ protected function createProducerMock(): InteropProducer
503504
*/
504505
protected function createQueue(string $name): InteropQueue
505506
{
506-
$destination = new StompDestination();
507+
$destination = new StompDestination(ExtensionType::RABBITMQ);
507508
$destination->setType(StompDestination::TYPE_QUEUE);
508509
$destination->setStompName($name);
509510

@@ -515,7 +516,7 @@ protected function createQueue(string $name): InteropQueue
515516
*/
516517
protected function createTopic(string $name): InteropTopic
517518
{
518-
$destination = new StompDestination();
519+
$destination = new StompDestination(ExtensionType::RABBITMQ);
519520
$destination->setType(StompDestination::TYPE_TOPIC);
520521
$destination->setStompName($name);
521522

pkg/enqueue/Tests/Client/Driver/StompDriverTest.php

+3-2
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
use Enqueue\Client\Message;
1010
use Enqueue\Client\MessagePriority;
1111
use Enqueue\Client\RouteCollection;
12+
use Enqueue\Stomp\ExtensionType;
1213
use Enqueue\Stomp\StompContext;
1314
use Enqueue\Stomp\StompDestination;
1415
use Enqueue\Stomp\StompMessage;
@@ -127,7 +128,7 @@ protected function createProducerMock(): InteropProducer
127128
*/
128129
protected function createQueue(string $name): InteropQueue
129130
{
130-
$destination = new StompDestination();
131+
$destination = new StompDestination(ExtensionType::RABBITMQ);
131132
$destination->setType(StompDestination::TYPE_QUEUE);
132133
$destination->setStompName($name);
133134

@@ -139,7 +140,7 @@ protected function createQueue(string $name): InteropQueue
139140
*/
140141
protected function createTopic(string $name): InteropTopic
141142
{
142-
$destination = new StompDestination();
143+
$destination = new StompDestination(ExtensionType::RABBITMQ);
143144
$destination->setType(StompDestination::TYPE_TOPIC);
144145
$destination->setStompName($name);
145146

pkg/stomp/ExtensionType.php

+12
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Enqueue\Stomp;
6+
7+
class ExtensionType
8+
{
9+
const ACTIVEMQ = 'activemq';
10+
const RABBITMQ = 'rabbitmq';
11+
const ARTEMIS = 'artemis';
12+
}

pkg/stomp/StompConnectionFactory.php

+15-12
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,11 @@
1313

1414
class StompConnectionFactory implements ConnectionFactory
1515
{
16-
const SCHEME_EXT_ACTIVEMQ = 'activemq';
17-
const SCHEME_EXT_RABBITMQ = 'rabbitmq';
16+
const SUPPORTED_SCHEMES = [
17+
ExtensionType::ACTIVEMQ,
18+
ExtensionType::RABBITMQ,
19+
ExtensionType::ARTEMIS,
20+
];
1821

1922
/**
2023
* @var array
@@ -71,15 +74,14 @@ public function __construct($config = 'stomp:')
7174
*/
7275
public function createContext(): Context
7376
{
74-
$useExchangePrefix = self::SCHEME_EXT_RABBITMQ === $this->config['target'] ? true : false;
75-
7677
if ($this->config['lazy']) {
77-
return new StompContext(function () {
78-
return $this->establishConnection();
79-
}, $useExchangePrefix);
78+
return new StompContext(
79+
function () { return $this->establishConnection(); },
80+
$this->config['target']
81+
);
8082
}
8183

82-
return new StompContext($this->establishConnection(), $useExchangePrefix);
84+
return new StompContext($this->establishConnection(), $this->config['target']);
8385
}
8486

8587
private function establishConnection(): BufferedStompClient
@@ -123,10 +125,11 @@ private function parseDsn(string $dsn): array
123125

124126
$schemeExtension = current($dsn->getSchemeExtensions());
125127
if (false === $schemeExtension) {
126-
$schemeExtension = self::SCHEME_EXT_RABBITMQ;
128+
$schemeExtension = ExtensionType::RABBITMQ;
127129
}
128-
if (self::SCHEME_EXT_ACTIVEMQ !== $schemeExtension && self::SCHEME_EXT_RABBITMQ !== $schemeExtension) {
129-
throw new \LogicException(sprintf('The given DSN is not supported. The scheme extension "%s" provided is invalid. It must be one of "%s" or "%s".', $schemeExtension, self::SCHEME_EXT_ACTIVEMQ, self::SCHEME_EXT_RABBITMQ));
130+
131+
if (false === in_array($schemeExtension, self::SUPPORTED_SCHEMES, true)) {
132+
throw new \LogicException(sprintf('The given DSN is not supported. The scheme extension "%s" provided is not supported. It must be one of %s.', $schemeExtension, implode(', ', self::SUPPORTED_SCHEMES)));
130133
}
131134

132135
return array_filter(array_replace($dsn->getQuery(), [
@@ -151,7 +154,7 @@ private function parseDsn(string $dsn): array
151154
private function defaultConfig(): array
152155
{
153156
return [
154-
'target' => self::SCHEME_EXT_RABBITMQ,
157+
'target' => ExtensionType::RABBITMQ,
155158
'host' => 'localhost',
156159
'port' => 61613,
157160
'login' => 'guest',

pkg/stomp/StompConsumer.php

+38-16
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,12 @@
55
namespace Enqueue\Stomp;
66

77
use Interop\Queue\Consumer;
8+
use Interop\Queue\Exception\Exception;
89
use Interop\Queue\Exception\InvalidMessageException;
910
use Interop\Queue\Message;
1011
use Interop\Queue\Queue;
1112
use Stomp\Client;
13+
use Stomp\Exception\ErrorFrameException;
1214
use Stomp\Transport\Frame;
1315

1416
class StompConsumer implements Consumer
@@ -96,16 +98,20 @@ public function receive(int $timeout = 0): ?Message
9698
{
9799
$this->subscribe();
98100

99-
if (0 === $timeout) {
100-
while (true) {
101-
if ($message = $this->stomp->readMessageFrame($this->subscriptionId, 100)) {
101+
try {
102+
if (0 === $timeout) {
103+
while (true) {
104+
if ($message = $this->stomp->readMessageFrame($this->subscriptionId, 100)) {
105+
return $this->convertMessage($message);
106+
}
107+
}
108+
} else {
109+
if ($message = $this->stomp->readMessageFrame($this->subscriptionId, $timeout)) {
102110
return $this->convertMessage($message);
103111
}
104112
}
105-
} else {
106-
if ($message = $this->stomp->readMessageFrame($this->subscriptionId, $timeout)) {
107-
return $this->convertMessage($message);
108-
}
113+
} catch (ErrorFrameException $e) {
114+
throw new Exception($e->getMessage()."\n".$e->getFrame()->getBody(), null, $e);
109115
}
110116

111117
return null;
@@ -143,10 +149,11 @@ public function reject(Message $message, bool $requeue = false): void
143149

144150
$nackFrame = $this->stomp->getProtocol()->getNackFrame($message->getFrame());
145151

146-
// rabbitmq STOMP protocol extension
147-
$nackFrame->addHeaders([
148-
'requeue' => $requeue ? 'true' : 'false',
149-
]);
152+
if (ExtensionType::RABBITMQ === $this->queue->getExtensionType()) {
153+
$nackFrame->addHeaders([
154+
'requeue' => $requeue ? 'true' : 'false',
155+
]);
156+
}
150157

151158
$this->stomp->sendFrame($nackFrame);
152159
}
@@ -168,13 +175,28 @@ private function subscribe(): void
168175
$this->ackMode
169176
);
170177

171-
// rabbitmq STOMP protocol extension
172178
$headers = $this->queue->getHeaders();
173-
$headers['prefetch-count'] = $this->prefetchCount;
174-
$headers = StompHeadersEncoder::encode($headers);
175179

176-
foreach ($headers as $key => $value) {
177-
$frame[$key] = $value;
180+
if (ExtensionType::RABBITMQ === $this->queue->getExtensionType()) {
181+
$headers['prefetch-count'] = $this->prefetchCount;
182+
$headers = StompHeadersEncoder::encode($headers);
183+
184+
foreach ($headers as $key => $value) {
185+
$frame[$key] = $value;
186+
}
187+
} elseif (ExtensionType::ARTEMIS === $this->queue->getExtensionType()) {
188+
$subscriptionName = $this->subscriptionId.'-'.$this->queue->getStompName();
189+
190+
$artemisHeaders = [];
191+
192+
$artemisHeaders['client-id'] = true ? $this->subscriptionId : null;
193+
$artemisHeaders['durable-subscription-name'] = true ? $subscriptionName : null;
194+
195+
$artemisHeaders = StompHeadersEncoder::encode(array_filter($artemisHeaders));
196+
197+
foreach ($artemisHeaders as $key => $value) {
198+
$frame[$key] = $value;
199+
}
178200
}
179201

180202
$this->stomp->sendFrame($frame);

pkg/stomp/StompContext.php

+12-10
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,11 @@ class StompContext implements Context
2323
*/
2424
private $stomp;
2525

26+
/**
27+
* @var string
28+
*/
29+
private $extensionType;
30+
2631
/**
2732
* @var bool
2833
*/
@@ -35,9 +40,8 @@ class StompContext implements Context
3540

3641
/**
3742
* @param BufferedStompClient|callable $stomp
38-
* @param bool $useExchangePrefix
3943
*/
40-
public function __construct($stomp, $useExchangePrefix = true)
44+
public function __construct($stomp, string $extensionType)
4145
{
4246
if ($stomp instanceof BufferedStompClient) {
4347
$this->stomp = $stomp;
@@ -47,7 +51,8 @@ public function __construct($stomp, $useExchangePrefix = true)
4751
throw new \InvalidArgumentException('The stomp argument must be either BufferedStompClient or callable that return BufferedStompClient.');
4852
}
4953

50-
$this->useExchangePrefix = $useExchangePrefix;
54+
$this->extensionType = $extensionType;
55+
$this->useExchangePrefix = ExtensionType::RABBITMQ === $extensionType;
5156
}
5257

5358
/**
@@ -64,7 +69,7 @@ public function createMessage(string $body = '', array $properties = [], array $
6469
public function createQueue(string $name): Queue
6570
{
6671
if (0 !== strpos($name, '/')) {
67-
$destination = new StompDestination();
72+
$destination = new StompDestination($this->extensionType);
6873
$destination->setType(StompDestination::TYPE_QUEUE);
6974
$destination->setStompName($name);
7075

@@ -91,7 +96,7 @@ public function createTemporaryQueue(): Queue
9196
public function createTopic(string $name): Topic
9297
{
9398
if (0 !== strpos($name, '/')) {
94-
$destination = new StompDestination();
99+
$destination = new StompDestination($this->extensionType);
95100
$destination->setType($this->useExchangePrefix ? StompDestination::TYPE_EXCHANGE : StompDestination::TYPE_TOPIC);
96101
$destination->setStompName($name);
97102

@@ -151,7 +156,7 @@ public function createDestination(string $destination): StompDestination
151156
$routingKey = $pieces[1];
152157
}
153158

154-
$destination = new StompDestination();
159+
$destination = new StompDestination($this->extensionType);
155160
$destination->setType($type);
156161
$destination->setStompName($name);
157162
$destination->setRoutingKey($routingKey);
@@ -199,10 +204,7 @@ public function getStomp(): BufferedStompClient
199204
if (false == $this->stomp) {
200205
$stomp = call_user_func($this->stompFactory);
201206
if (false == $stomp instanceof BufferedStompClient) {
202-
throw new \LogicException(sprintf(
203-
'The factory must return instance of BufferedStompClient. It returns %s',
204-
is_object($stomp) ? get_class($stomp) : gettype($stomp)
205-
));
207+
throw new \LogicException(sprintf('The factory must return instance of BufferedStompClient. It returns %s', is_object($stomp) ? get_class($stomp) : gettype($stomp)));
206208
}
207209

208210
$this->stomp = $stomp;

pkg/stomp/StompDestination.php

+16-1
Original file line numberDiff line numberDiff line change
@@ -39,14 +39,25 @@ class StompDestination implements Topic, Queue
3939
* @var array
4040
*/
4141
private $headers;
42+
/**
43+
* @var string
44+
*/
45+
private $extensionType;
4246

43-
public function __construct()
47+
public function __construct(string $extensionType)
4448
{
4549
$this->headers = [
4650
self::HEADER_DURABLE => false,
4751
self::HEADER_AUTO_DELETE => true,
4852
self::HEADER_EXCLUSIVE => false,
4953
];
54+
55+
$this->extensionType = $extensionType;
56+
}
57+
58+
public function getExtensionType(): string
59+
{
60+
return $this->extensionType;
5061
}
5162

5263
public function getStompName(): string
@@ -65,6 +76,10 @@ public function getQueueName(): string
6576
throw new \LogicException('Destination name is not set');
6677
}
6778

79+
if (ExtensionType::ARTEMIS === $this->extensionType) {
80+
return $this->getStompName();
81+
}
82+
6883
$name = '/'.$this->getType().'/'.$this->getStompName();
6984

7085
if ($this->getRoutingKey()) {

pkg/stomp/Tests/StompConsumerTest.php

+2-1
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
namespace Enqueue\Stomp\Tests;
44

55
use Enqueue\Stomp\BufferedStompClient;
6+
use Enqueue\Stomp\ExtensionType;
67
use Enqueue\Stomp\StompConsumer;
78
use Enqueue\Stomp\StompDestination;
89
use Enqueue\Stomp\StompMessage;
@@ -557,7 +558,7 @@ private function createStompClientMock()
557558

558559
private function createDummyDestination(): StompDestination
559560
{
560-
$destination = new StompDestination();
561+
$destination = new StompDestination(ExtensionType::RABBITMQ);
561562
$destination->setStompName('aName');
562563
$destination->setType(StompDestination::TYPE_QUEUE);
563564

0 commit comments

Comments
 (0)