Skip to content

Commit b477eb9

Browse files
authored
Merge pull request RasaHQ#4715 from RasaHQ/rabbit-app-id-property
RabbitMQ app_id message property
2 parents 6b9016c + e57760c commit b477eb9

File tree

3 files changed

+70
-12
lines changed

3 files changed

+70
-12
lines changed

CHANGELOG.rst

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,13 @@ This project adheres to `Semantic Versioning`_ starting with version 1.0.
1212

1313
Added
1414
-----
15+
- ``PikaEventProducer`` adds the RabbitMQ ``App ID`` message property to published
16+
messages with the value of the ``RASA_ENVIRONMENT`` environment variable. The
17+
message property will not be assigned if this environment variable isn't set.
1518

1619
Changed
1720
-------
18-
- updated mattermost connector documentation to be more clear.
21+
- Updated Mattermost connector documentation to be more clear.
1922
- Updated format strings to f-strings where appropriate.
2023

2124
Removed

rasa/core/brokers/pika.py

Lines changed: 47 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,11 @@
1212
from rasa.constants import ENV_LOG_LEVEL_LIBRARIES, DEFAULT_LOG_LEVEL_LIBRARIES
1313
from rasa.core.brokers.event_channel import EventChannel
1414
from rasa.utils.endpoints import EndpointConfig
15+
from rasa.utils.io import DEFAULT_ENCODING
1516

1617
if typing.TYPE_CHECKING:
1718
from pika.adapters.blocking_connection import BlockingChannel
18-
from pika import SelectConnection, BlockingConnection
19+
from pika import SelectConnection, BlockingConnection, BasicProperties
1920
from pika.channel import Channel
2021
from pika.connection import Parameters, Connection
2122

@@ -139,16 +140,16 @@ def initialise_pika_channel(
139140
"""Initialise a Pika channel with a durable queue.
140141
141142
Args:
142-
host: Pika host
143-
queue: Pika queue to declare
144-
username: username for authentication with Pika host
145-
password: password for authentication with Pika host
146-
port: port of the Pika host
147-
connection_attempts: number of channel attempts before giving up
148-
retry_delay_in_seconds: delay in seconds between channel attempts
143+
host: Pika host.
144+
queue: Pika queue to declare.
145+
username: Username for authentication with Pika host.
146+
password: Password for authentication with Pika host.
147+
port: port of the Pika host.
148+
connection_attempts: Number of channel attempts before giving up.
149+
retry_delay_in_seconds: Delay in seconds between channel attempts.
149150
150151
Returns:
151-
Pika `BlockingChannel` with declared queue
152+
Pika `BlockingChannel` with declared queue.
152153
153154
"""
154155

@@ -206,6 +207,17 @@ def __init__(
206207
ENV_LOG_LEVEL_LIBRARIES, DEFAULT_LOG_LEVEL_LIBRARIES
207208
),
208209
):
210+
"""RabbitMQ event producer.
211+
212+
Args:
213+
host: Pika host.
214+
username: Username for authentication with Pika host.
215+
password: Password for authentication with Pika host.
216+
port: port of the Pika host.
217+
queue: Pika queue to declare.
218+
loglevel: Logging level.
219+
220+
"""
209221
logging.getLogger("pika").setLevel(loglevel)
210222

211223
self.queue = queue
@@ -224,6 +236,10 @@ def __del__(self) -> None:
224236
close_pika_channel(self.channel)
225237
close_pika_connection(self.channel.connection)
226238

239+
@property
240+
def rasa_environment(self) -> Optional[Text]:
241+
return os.environ.get("RASA_ENVIRONMENT")
242+
227243
@classmethod
228244
def from_endpoint_config(
229245
cls, broker_config: Optional["EndpointConfig"]
@@ -285,7 +301,6 @@ def publish(
285301
body = json.dumps(event)
286302

287303
while retries:
288-
# noinspection PyBroadException
289304
try:
290305
self._publish(body)
291306
return
@@ -304,6 +319,22 @@ def publish(
304319
"'{}':\n{}".format(self.queue, self.host, body)
305320
)
306321

322+
@property
323+
def _message_properties(self) -> "BasicProperties":
324+
"""Create RabbitMQ message properties.
325+
326+
Returns:
327+
pika.spec.BasicProperties with the `RASA_ENVIRONMENT` environment
328+
variable as the properties' `app_id` value. If this variable is unset, empty
329+
pika.spec.BasicProperties.
330+
331+
"""
332+
from pika.spec import BasicProperties
333+
334+
kwargs = {"app_id": self.rasa_environment} if self.rasa_environment else {}
335+
336+
return BasicProperties(**kwargs)
337+
307338
def _publish(self, body: Text) -> None:
308339
if self._pika_connection.is_closed:
309340
# Try to reset connection
@@ -317,7 +348,12 @@ def _publish(self, body: Text) -> None:
317348
)
318349
self._unpublished_messages.append(body)
319350
else:
320-
self.channel.basic_publish("", self.queue, body)
351+
self.channel.basic_publish(
352+
"",
353+
self.queue,
354+
body.encode(DEFAULT_ENCODING),
355+
properties=self._message_properties,
356+
)
321357

322358
logger.debug(
323359
f"Published Pika events to queue '{self.queue}' on host "

tests/core/test_broker.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
11
import json
2+
from unittest.mock import patch
3+
4+
from _pytest.monkeypatch import MonkeyPatch
25

36
import rasa.core.brokers.utils as broker_utils
47
from rasa.core.brokers.file_producer import FileProducer
@@ -28,6 +31,22 @@ def test_pika_broker_from_config():
2831
assert actual.queue == "queue"
2932

3033

34+
# noinspection PyProtectedMember
35+
def test_pika_message_property_app_id(monkeypatch: MonkeyPatch):
36+
# patch PikaProducer so it doesn't try to connect to RabbitMQ on init
37+
with patch.object(PikaProducer, "_run_pika", lambda _: None):
38+
pika_producer = PikaProducer("", "", "")
39+
40+
# unset RASA_ENVIRONMENT env var results in empty App ID
41+
monkeypatch.delenv("RASA_ENVIRONMENT", raising=False)
42+
assert not pika_producer._message_properties.app_id
43+
44+
# setting it to some value results in that value as the App ID
45+
rasa_environment = "some-test-environment"
46+
monkeypatch.setenv("RASA_ENVIRONMENT", rasa_environment)
47+
assert pika_producer._message_properties.app_id == rasa_environment
48+
49+
3150
def test_no_broker_in_config():
3251
cfg = read_endpoint_config(DEFAULT_ENDPOINTS_FILE, "event_broker")
3352

0 commit comments

Comments
 (0)