File tree Expand file tree Collapse file tree 2 files changed +44
-10
lines changed Expand file tree Collapse file tree 2 files changed +44
-10
lines changed Original file line number Diff line number Diff line change @@ -45,6 +45,11 @@ class RdKafkaContext implements Context
45
45
*/
46
46
private $ kafkaConsumers ;
47
47
48
+ /**
49
+ * @var RdKafkaConsumer[]
50
+ */
51
+ private $ rdKafkaConsumers ;
52
+
48
53
/**
49
54
* @param array $config
50
55
*/
@@ -102,20 +107,26 @@ public function createConsumer(Destination $destination): Consumer
102
107
{
103
108
InvalidDestinationException::assertDestinationInstanceOf ($ destination , RdKafkaTopic::class);
104
109
105
- $ this -> kafkaConsumers [] = $ kafkaConsumer = new KafkaConsumer ( $ this -> getConf () );
110
+ $ queueName = $ destination -> getQueueName ( );
106
111
107
- $ consumer = new RdKafkaConsumer (
108
- $ kafkaConsumer ,
109
- $ this ,
110
- $ destination ,
111
- $ this ->getSerializer ()
112
- );
112
+ if (!isset ($ this ->rdKafkaConsumers [$ queueName ])) {
113
+ $ this ->kafkaConsumers [] = $ kafkaConsumer = new KafkaConsumer ($ this ->getConf ());
114
+
115
+ $ consumer = new RdKafkaConsumer (
116
+ $ kafkaConsumer ,
117
+ $ this ,
118
+ $ destination ,
119
+ $ this ->getSerializer ()
120
+ );
121
+
122
+ if (isset ($ this ->config ['commit_async ' ])) {
123
+ $ consumer ->setCommitAsync ($ this ->config ['commit_async ' ]);
124
+ }
113
125
114
- if (isset ($ this ->config ['commit_async ' ])) {
115
- $ consumer ->setCommitAsync ($ this ->config ['commit_async ' ]);
126
+ $ this ->rdKafkaConsumers [$ queueName ] = $ consumer ;
116
127
}
117
128
118
- return $ consumer ;
129
+ return $ this -> rdKafkaConsumers [ $ queueName ] ;
119
130
}
120
131
121
132
public function close (): void
Original file line number Diff line number Diff line change @@ -69,4 +69,27 @@ public function testShouldInjectItsSerializerToConsumer()
69
69
70
70
$ this ->assertSame ($ context ->getSerializer (), $ producer ->getSerializer ());
71
71
}
72
+
73
+ public function testShouldNotCreateConsumerTwice ()
74
+ {
75
+ $ context = new RdKafkaContext ([]);
76
+ $ queue = $ context ->createQueue ('aQueue ' );
77
+
78
+ $ consumer = $ context ->createConsumer ($ queue );
79
+ $ consumer2 = $ context ->createConsumer ($ queue );
80
+
81
+ $ this ->assertSame ($ consumer , $ consumer2 );
82
+ }
83
+
84
+ public function testShouldCreateTwoConsumers ()
85
+ {
86
+ $ context = new RdKafkaContext ([]);
87
+ $ queueA = $ context ->createQueue ('aQueue ' );
88
+ $ queueB = $ context ->createQueue ('aQueue ' );
89
+
90
+ $ consumer = $ context ->createConsumer ($ queueA );
91
+ $ consumer2 = $ context ->createConsumer ($ queueB );
92
+
93
+ $ this ->assertNotSame ($ consumer , $ consumer2 );
94
+ }
72
95
}
You can’t perform that action at this time.
0 commit comments