Skip to content

Commit caf0afa

Browse files
authored
Merge pull request #959 from Steveb-p/phprdkafka-4
Compatibility with Phprdkafka 4.0
2 parents 61177d7 + 05a5073 commit caf0afa

8 files changed

+46
-29
lines changed

.travis.yml

+1-1
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ script:
7878
- if [ "$PHPSTAN" = true ] && [ ! -z "${PKG_PHP_CHANGED_FILES}" ]; then docker run --workdir="/mqdev" -v "`pwd`:/mqdev" --rm enqueue/dev:latest php -d memory_limit=1024M bin/phpstan analyse -l 1 -c phpstan.neon -- ${PKG_PHP_CHANGED_FILES[@]} ; fi
7979
- if [ "$UNIT_TESTS" = true ]; then bin/phpunit --exclude-group=functional; fi
8080
- if [ "$FUNCTIONAL_TESTS" = true ]; then bin/test.sh --exclude-group=rdkafka; fi
81-
- if [ "RDKAFKA_TESTS" = true ]; then bin/test.sh --group=rdkafka; fi
81+
- if [ "$RDKAFKA_TESTS" = true ]; then bin/test.sh --group=rdkafka; fi
8282

8383
notifications:
8484
webhooks:

composer.json

+1-1
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@
5959
"doctrine/doctrine-bundle": "~1.2|^2",
6060
"doctrine/mongodb-odm-bundle": "^3.5|^4",
6161
"alcaeus/mongo-php-adapter": "^1.0",
62-
"kwn/php-rdkafka-stubs": "^1.0.2",
62+
"kwn/php-rdkafka-stubs": "^1.0.2 | ^2.0",
6363
"friendsofphp/php-cs-fixer": "^2"
6464
},
6565
"autoload": {

docker-compose.yml

+3-3
Original file line numberDiff line numberDiff line change
@@ -99,13 +99,13 @@ services:
9999
- '2181:2181'
100100

101101
kafka:
102-
image: 'wurstmeister/kafka:0.10.2.1'
102+
image: 'wurstmeister/kafka'
103103
ports:
104104
- '9092:9092'
105105
environment:
106106
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
107-
volumes:
108-
- '/var/run/docker.sock:/var/run/docker.sock'
107+
KAFKA_ADVERTISED_HOST_NAME: kafka
108+
KAFKA_ADVERTISED_PORT: 9092
109109

110110
google-pubsub:
111111
image: 'google/cloud-sdk:latest'

pkg/rdkafka/RdKafkaConnectionFactory.php

+1
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ class RdKafkaConnectionFactory implements ConnectionFactory
2828
* 'partitioner' => null, // https://arnaud-lb.github.io/php-rdkafka/phpdoc/rdkafka-topicconf.setpartitioner.html
2929
* 'log_level' => null,
3030
* 'commit_async' => false,
31+
* 'shutdown_timeout' => -1, // https://github.com/arnaud-lb/php-rdkafka#proper-shutdown
3132
* ]
3233
*
3334
* or

pkg/rdkafka/RdKafkaContext.php

+23-18
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ class RdKafkaContext implements Context
3636
private $conf;
3737

3838
/**
39-
* @var Producer
39+
* @var RdKafkaProducer
4040
*/
4141
private $producer;
4242

@@ -50,9 +50,6 @@ class RdKafkaContext implements Context
5050
*/
5151
private $rdKafkaConsumers;
5252

53-
/**
54-
* @param array $config
55-
*/
5653
public function __construct(array $config)
5754
{
5855
$this->config = $config;
@@ -96,7 +93,23 @@ public function createTemporaryQueue(): Queue
9693
*/
9794
public function createProducer(): Producer
9895
{
99-
return new RdKafkaProducer($this->getProducer(), $this->getSerializer());
96+
if (!isset($this->producer)) {
97+
$producer = new VendorProducer($this->getConf());
98+
99+
if (isset($this->config['log_level'])) {
100+
$producer->setLogLevel($this->config['log_level']);
101+
}
102+
103+
$this->producer = new RdKafkaProducer($producer, $this->getSerializer());
104+
105+
// Once created RdKafkaProducer can store messages internally that need to be delivered before PHP shuts
106+
// down. Otherwise, we are bound to lose messages in transit.
107+
// Note that it is generally preferable to call "close" method explicitly before shutdown starts, since
108+
// otherwise we might not have access to some objects, like database connections.
109+
register_shutdown_function([$this->producer, 'flush'], $this->config['shutdown_timeout'] ?? -1);
110+
}
111+
112+
return $this->producer;
100113
}
101114

102115
/**
@@ -139,6 +152,11 @@ public function close(): void
139152
foreach ($kafkaConsumers as $kafkaConsumer) {
140153
$kafkaConsumer->unsubscribe();
141154
}
155+
156+
// Compatibility with phprdkafka 4.0.
157+
if (isset($this->producer)) {
158+
$this->producer->flush($this->config['shutdown_timeout'] ?? -1);
159+
}
142160
}
143161

144162
public function createSubscriptionConsumer(): SubscriptionConsumer
@@ -163,19 +181,6 @@ public static function getLibrdKafkaVersion(): string
163181
return "$major.$minor.$patch";
164182
}
165183

166-
private function getProducer(): VendorProducer
167-
{
168-
if (null === $this->producer) {
169-
$this->producer = new VendorProducer($this->getConf());
170-
171-
if (isset($this->config['log_level'])) {
172-
$this->producer->setLogLevel($this->config['log_level']);
173-
}
174-
}
175-
176-
return $this->producer;
177-
}
178-
179184
private function getConf(): Conf
180185
{
181186
if (null === $this->conf) {

pkg/rdkafka/RdKafkaProducer.php

+8
Original file line numberDiff line numberDiff line change
@@ -111,4 +111,12 @@ public function getTimeToLive(): ?int
111111
{
112112
return null;
113113
}
114+
115+
public function flush(int $timeout): void
116+
{
117+
// Flush method is exposed in phprdkafka 4.0
118+
if (method_exists($this->producer, 'flush')) {
119+
$this->producer->flush($timeout);
120+
}
121+
}
114122
}

pkg/rdkafka/Tests/Spec/RdKafkaSendToAndReceiveFromTopicTest.php

+7-4
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,14 @@ public function test()
1919

2020
$topic = $this->createTopic($context, uniqid('', true));
2121

22-
$consumer = $context->createConsumer($topic);
23-
2422
$expectedBody = __CLASS__.time();
23+
$producer = $context->createProducer();
24+
$producer->send($topic, $context->createMessage($expectedBody));
25+
26+
// Calling close causes Producer to flush (wait for messages to be delivered to Kafka)
27+
$context->close();
28+
29+
$consumer = $context->createConsumer($topic);
2530

2631
$context->createProducer()->send($topic, $context->createMessage($expectedBody));
2732

@@ -48,8 +53,6 @@ protected function createContext()
4853

4954
$context = (new RdKafkaConnectionFactory($config))->createContext();
5055

51-
sleep(3);
52-
5356
return $context;
5457
}
5558
}

pkg/rdkafka/composer.json

+2-2
Original file line numberDiff line numberDiff line change
@@ -7,15 +7,15 @@
77
"license": "MIT",
88
"require": {
99
"php": "^7.1.3",
10-
"ext-rdkafka": "^3.0.3",
10+
"ext-rdkafka": "^3.0.3|^4.0",
1111
"queue-interop/queue-interop": "^0.8"
1212
},
1313
"require-dev": {
1414
"phpunit/phpunit": "~7.5",
1515
"enqueue/test": "0.10.x-dev",
1616
"enqueue/null": "0.10.x-dev",
1717
"queue-interop/queue-spec": "^0.6",
18-
"kwn/php-rdkafka-stubs": "^1.0.2"
18+
"kwn/php-rdkafka-stubs": "^1.0.2 | ^2.0"
1919
},
2020
"support": {
2121
"email": "[email protected]",

0 commit comments

Comments
 (0)