Skip to content

Commit 7fc8cca

Browse files
committed
[consumption] Implement only required extension interfaces. Remvoe EmptyExtensionTrait.
1 parent c31f8b4 commit 7fc8cca

26 files changed

+212
-197
lines changed

docs/bundle/consumption_extension.md

+2-5
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,11 @@ Let's first create an extension itself:
88
// src/AppBundle/Enqueue;
99
namespace AppBundle\Enqueue;
1010

11-
use Enqueue\Consumption\ExtensionInterface;
12-
use Enqueue\Consumption\EmptyExtensionTrait;
11+
use Enqueue\Consumption\PostMessageReceivedExtensionInterface;
1312
use Enqueue\Consumption\Context\PostMessageReceived;
1413

15-
class CountProcessedMessagesExtension implements ExtensionInterface
14+
class CountProcessedMessagesExtension implements PostMessageReceivedExtensionInterface
1615
{
17-
use EmptyExtensionTrait;
18-
1916
private $processedMessages = 0;
2017

2118
public function onPostMessageReceived(PostMessageReceived $context): void

pkg/enqueue-bundle/Consumption/Extension/DoctrineClearIdentityMapExtension.php

+2-5
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,11 @@
33
namespace Enqueue\Bundle\Consumption\Extension;
44

55
use Enqueue\Consumption\Context\MessageReceived;
6-
use Enqueue\Consumption\EmptyExtensionTrait;
7-
use Enqueue\Consumption\ExtensionInterface;
6+
use Enqueue\Consumption\MessageReceivedExtensionInterface;
87
use Symfony\Bridge\Doctrine\RegistryInterface;
98

10-
class DoctrineClearIdentityMapExtension implements ExtensionInterface
9+
class DoctrineClearIdentityMapExtension implements MessageReceivedExtensionInterface
1110
{
12-
use EmptyExtensionTrait;
13-
1411
/**
1512
* @var RegistryInterface
1613
*/

pkg/enqueue-bundle/Consumption/Extension/DoctrinePingConnectionExtension.php

+2-5
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,11 @@
44

55
use Doctrine\DBAL\Connection;
66
use Enqueue\Consumption\Context\MessageReceived;
7-
use Enqueue\Consumption\EmptyExtensionTrait;
8-
use Enqueue\Consumption\ExtensionInterface;
7+
use Enqueue\Consumption\MessageReceivedExtensionInterface;
98
use Symfony\Bridge\Doctrine\RegistryInterface;
109

11-
class DoctrinePingConnectionExtension implements ExtensionInterface
10+
class DoctrinePingConnectionExtension implements MessageReceivedExtensionInterface
1211
{
13-
use EmptyExtensionTrait;
14-
1512
/**
1613
* @var RegistryInterface
1714
*/

pkg/enqueue/Client/ConsumptionExtension/DelayRedeliveredMessageExtension.php

+2-5
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,11 @@
44

55
use Enqueue\Client\DriverInterface;
66
use Enqueue\Consumption\Context\MessageReceived;
7-
use Enqueue\Consumption\EmptyExtensionTrait;
8-
use Enqueue\Consumption\ExtensionInterface;
7+
use Enqueue\Consumption\MessageReceivedExtensionInterface;
98
use Enqueue\Consumption\Result;
109

11-
class DelayRedeliveredMessageExtension implements ExtensionInterface
10+
class DelayRedeliveredMessageExtension implements MessageReceivedExtensionInterface
1211
{
13-
use EmptyExtensionTrait;
14-
1512
const PROPERTY_REDELIVER_COUNT = 'enqueue.redelivery_count';
1613

1714
/**

pkg/enqueue/Client/ConsumptionExtension/ExclusiveCommandExtension.php

+2-5
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,10 @@
66
use Enqueue\Client\DriverInterface;
77
use Enqueue\Client\Route;
88
use Enqueue\Consumption\Context\MessageReceived;
9-
use Enqueue\Consumption\EmptyExtensionTrait as ConsumptionEmptyExtensionTrait;
10-
use Enqueue\Consumption\ExtensionInterface as ConsumptionExtensionInterface;
9+
use Enqueue\Consumption\MessageReceivedExtensionInterface;
1110

12-
final class ExclusiveCommandExtension implements ConsumptionExtensionInterface
11+
final class ExclusiveCommandExtension implements MessageReceivedExtensionInterface
1312
{
14-
use ConsumptionEmptyExtensionTrait;
15-
1613
/**
1714
* @var DriverInterface
1815
*/

pkg/enqueue/Client/ConsumptionExtension/FlushSpoolProducerExtension.php

+3-5
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,11 @@
55
use Enqueue\Client\SpoolProducer;
66
use Enqueue\Consumption\Context\End;
77
use Enqueue\Consumption\Context\PostMessageReceived;
8-
use Enqueue\Consumption\EmptyExtensionTrait;
9-
use Enqueue\Consumption\ExtensionInterface;
8+
use Enqueue\Consumption\EndExtensionInterface;
9+
use Enqueue\Consumption\PostMessageReceivedExtensionInterface;
1010

11-
class FlushSpoolProducerExtension implements ExtensionInterface
11+
class FlushSpoolProducerExtension implements PostMessageReceivedExtensionInterface, EndExtensionInterface
1212
{
13-
use EmptyExtensionTrait;
14-
1513
/**
1614
* @var SpoolProducer
1715
*/

pkg/enqueue/Client/ConsumptionExtension/SetRouterPropertiesExtension.php

+2-5
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,10 @@
55
use Enqueue\Client\Config;
66
use Enqueue\Client\DriverInterface;
77
use Enqueue\Consumption\Context\MessageReceived;
8-
use Enqueue\Consumption\EmptyExtensionTrait;
9-
use Enqueue\Consumption\ExtensionInterface;
8+
use Enqueue\Consumption\MessageReceivedExtensionInterface;
109

11-
class SetRouterPropertiesExtension implements ExtensionInterface
10+
class SetRouterPropertiesExtension implements MessageReceivedExtensionInterface
1211
{
13-
use EmptyExtensionTrait;
14-
1512
/**
1613
* @var DriverInterface
1714
*/

pkg/enqueue/Client/ConsumptionExtension/SetupBrokerExtension.php

+2-5
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,10 @@
44

55
use Enqueue\Client\DriverInterface;
66
use Enqueue\Consumption\Context\Start;
7-
use Enqueue\Consumption\EmptyExtensionTrait;
8-
use Enqueue\Consumption\ExtensionInterface;
7+
use Enqueue\Consumption\StartExtensionInterface;
98

10-
class SetupBrokerExtension implements ExtensionInterface
9+
class SetupBrokerExtension implements StartExtensionInterface
1110
{
12-
use EmptyExtensionTrait;
13-
1411
/**
1512
* @var DriverInterface
1613
*/

pkg/enqueue/Consumption/ChainExtension.php

+101-18
Original file line numberDiff line numberDiff line change
@@ -14,83 +14,166 @@
1414

1515
class ChainExtension implements ExtensionInterface
1616
{
17-
use EmptyExtensionTrait;
18-
19-
/**
20-
* @var ExtensionInterface[]
21-
*/
22-
private $extensions;
17+
private $startExtensions;
18+
private $preSubscribeExtensions;
19+
private $preConsumeExtensions;
20+
private $messageReceivedExtensions;
21+
private $messageResultExtensions;
22+
private $postMessageReceivedExtensions;
23+
private $processorExceptionExtensions;
24+
private $postConsumeExtensions;
25+
private $endExtensions;
2326

2427
/**
2528
* @param ExtensionInterface[] $extensions
2629
*/
2730
public function __construct(array $extensions)
2831
{
29-
$this->extensions = [];
30-
array_walk($extensions, function (ExtensionInterface $extension) {
31-
$this->extensions[] = $extension;
32+
$this->startExtensions = [];
33+
$this->preSubscribeExtensions = [];
34+
$this->preConsumeExtensions = [];
35+
$this->messageReceivedExtensions = [];
36+
$this->messageResultExtensions = [];
37+
$this->postMessageReceivedExtensions = [];
38+
$this->processorExceptionExtensions = [];
39+
$this->postConsumeExtensions = [];
40+
$this->endExtensions = [];
41+
42+
array_walk($extensions, function ($extension) {
43+
if ($extension instanceof ExtensionInterface) {
44+
$this->startExtensions[] = $extension;
45+
$this->preSubscribeExtensions[] = $extension;
46+
$this->preConsumeExtensions[] = $extension;
47+
$this->messageReceivedExtensions[] = $extension;
48+
$this->messageResultExtensions[] = $extension;
49+
$this->postMessageReceivedExtensions[] = $extension;
50+
$this->processorExceptionExtensions[] = $extension;
51+
$this->postConsumeExtensions[] = $extension;
52+
$this->endExtensions[] = $extension;
53+
54+
return;
55+
}
56+
57+
$extensionValid = false;
58+
if ($extension instanceof StartExtensionInterface) {
59+
$this->startExtensions[] = $extension;
60+
61+
$extensionValid = true;
62+
}
63+
64+
if ($extension instanceof PreSubscribeExtensionInterface) {
65+
$this->preSubscribeExtensions[] = $extension;
66+
67+
$extensionValid = true;
68+
}
69+
70+
if ($extension instanceof PreConsumeExtensionInterface) {
71+
$this->preConsumeExtensions[] = $extension;
72+
73+
$extensionValid = true;
74+
}
75+
76+
if ($extension instanceof MessageReceivedExtensionInterface) {
77+
$this->messageReceivedExtensions[] = $extension;
78+
79+
$extensionValid = true;
80+
}
81+
82+
if ($extension instanceof MessageResultExtensionInterface) {
83+
$this->messageResultExtensions[] = $extension;
84+
85+
$extensionValid = true;
86+
}
87+
88+
if ($extension instanceof ProcessorExceptionExtensionInterface) {
89+
$this->processorExceptionExtensions[] = $extension;
90+
91+
$extensionValid = true;
92+
}
93+
94+
if ($extension instanceof PostMessageReceivedExtensionInterface) {
95+
$this->postMessageReceivedExtensions[] = $extension;
96+
97+
$extensionValid = true;
98+
}
99+
100+
if ($extension instanceof PostConsumeExtensionInterface) {
101+
$this->postConsumeExtensions[] = $extension;
102+
103+
$extensionValid = true;
104+
}
105+
106+
if ($extension instanceof EndExtensionInterface) {
107+
$this->endExtensions[] = $extension;
108+
109+
$extensionValid = true;
110+
}
111+
112+
if (false == $extensionValid) {
113+
throw new \LogicException('Invalid extension given');
114+
}
32115
});
33116
}
34117

35118
public function onStart(Start $context): void
36119
{
37-
foreach ($this->extensions as $extension) {
120+
foreach ($this->startExtensions as $extension) {
38121
$extension->onStart($context);
39122
}
40123
}
41124

42125
public function onPreSubscribe(PreSubscribe $context): void
43126
{
44-
foreach ($this->extensions as $extension) {
127+
foreach ($this->preSubscribeExtensions as $extension) {
45128
$extension->onPreSubscribe($context);
46129
}
47130
}
48131

49132
public function onPreConsume(PreConsume $context): void
50133
{
51-
foreach ($this->extensions as $extension) {
134+
foreach ($this->preConsumeExtensions as $extension) {
52135
$extension->onPreConsume($context);
53136
}
54137
}
55138

56139
public function onMessageReceived(MessageReceived $context): void
57140
{
58-
foreach ($this->extensions as $extension) {
141+
foreach ($this->messageReceivedExtensions as $extension) {
59142
$extension->onMessageReceived($context);
60143
}
61144
}
62145

63146
public function onResult(MessageResult $context): void
64147
{
65-
foreach ($this->extensions as $extension) {
148+
foreach ($this->messageResultExtensions as $extension) {
66149
$extension->onResult($context);
67150
}
68151
}
69152

70153
public function onProcessorException(ProcessorException $context): void
71154
{
72-
foreach ($this->extensions as $extension) {
155+
foreach ($this->processorExceptionExtensions as $extension) {
73156
$extension->onProcessorException($context);
74157
}
75158
}
76159

77160
public function onPostMessageReceived(PostMessageReceived $context): void
78161
{
79-
foreach ($this->extensions as $extension) {
162+
foreach ($this->postMessageReceivedExtensions as $extension) {
80163
$extension->onPostMessageReceived($context);
81164
}
82165
}
83166

84167
public function onPostConsume(PostConsume $context): void
85168
{
86-
foreach ($this->extensions as $extension) {
169+
foreach ($this->postConsumeExtensions as $extension) {
87170
$extension->onPostConsume($context);
88171
}
89172
}
90173

91174
public function onEnd(End $context): void
92175
{
93-
foreach ($this->extensions as $extension) {
176+
foreach ($this->endExtensions as $extension) {
94177
$extension->onEnd($context);
95178
}
96179
}

pkg/enqueue/Consumption/EmptyExtensionTrait.php

-52
This file was deleted.

pkg/enqueue/Consumption/Extension/LimitConsumedMessagesExtension.php

+3-5
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,12 @@
44

55
use Enqueue\Consumption\Context\PostMessageReceived;
66
use Enqueue\Consumption\Context\PreConsume;
7-
use Enqueue\Consumption\EmptyExtensionTrait;
8-
use Enqueue\Consumption\ExtensionInterface;
7+
use Enqueue\Consumption\PostMessageReceivedExtensionInterface;
8+
use Enqueue\Consumption\PreConsumeExtensionInterface;
99
use Psr\Log\LoggerInterface;
1010

11-
class LimitConsumedMessagesExtension implements ExtensionInterface
11+
class LimitConsumedMessagesExtension implements PreConsumeExtensionInterface, PostMessageReceivedExtensionInterface
1212
{
13-
use EmptyExtensionTrait;
14-
1513
/**
1614
* @var int
1715
*/

0 commit comments

Comments
 (0)