Skip to content

Commit 0202fe9

Browse files
authored
Merge pull request RasaHQ#5492 from RasaHQ/pika-old-queue-arg-fix
fix RabbitMQ 'queue' argument usage
2 parents f16d517 + 985052e commit 0202fe9

File tree

3 files changed

+21
-19
lines changed

3 files changed

+21
-19
lines changed

changelog/5492.bugfix.rst

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
Fix an issue where the deprecated ``queue`` parameter for the :ref:`event-brokers-pika`
2+
was ignored and Rasa Open Source published the events to the ``rasa_core_events``
3+
queue instead. Note that this does not change the fact that the ``queue`` argument
4+
is deprecated in favor of the ``queues`` argument.

rasa/core/brokers/pika.py

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
logger = logging.getLogger(__name__)
2828

2929
RABBITMQ_EXCHANGE = "rasa-exchange"
30+
DEFAULT_QUEUE_NAME = "rasa_core_events"
3031

3132

3233
def initialise_pika_connection(
@@ -224,7 +225,7 @@ def __init__(
224225
username: Text,
225226
password: Text,
226227
port: Union[int, Text] = 5672,
227-
queues: Union[List[Text], Tuple[Text], Text, None] = ("rasa_core_events",),
228+
queues: Union[List[Text], Tuple[Text], Text, None] = None,
228229
should_keep_unpublished_messages: bool = True,
229230
raise_on_failure: bool = False,
230231
log_level: Union[Text, int] = os.environ.get(
@@ -278,7 +279,7 @@ def rasa_environment(self) -> Optional[Text]:
278279

279280
@staticmethod
280281
def _get_queues_from_args(
281-
queues_arg: Union[List[Text], Tuple[Text], Text, None], kwargs: Any,
282+
queues_arg: Union[List[Text], Tuple[Text], Text, None], kwargs: Any
282283
) -> Union[List[Text], Tuple[Text]]:
283284
"""Get queues for this event broker.
284285
@@ -326,12 +327,15 @@ def _get_queues_from_args(
326327
if queue_arg:
327328
return queue_arg # pytype: disable=bad-return-type
328329

329-
raise ValueError(
330-
f"Could not initialise `PikaEventBroker` due to invalid "
331-
f"`queues` or `queue` argument in constructor. See "
332-
f"{DOCS_URL_PIKA_EVENT_BROKER}."
330+
raise_warning(
331+
f"No `queues` or `queue` argument provided. It is suggested to "
332+
f"explicitly specify a queue as described in "
333+
f"{DOCS_URL_PIKA_EVENT_BROKER}. "
334+
f"Using the default queue '{DEFAULT_QUEUE_NAME}' for now."
333335
)
334336

337+
return [DEFAULT_QUEUE_NAME]
338+
335339
@classmethod
336340
def from_endpoint_config(
337341
cls, broker_config: Optional["EndpointConfig"]
@@ -399,7 +403,7 @@ def _run_pika_io_loop(self) -> None:
399403
self._pika_connection.ioloop.start()
400404

401405
def is_ready(
402-
self, attempts: int = 1000, wait_time_between_attempts_in_seconds: float = 0.01,
406+
self, attempts: int = 1000, wait_time_between_attempts_in_seconds: float = 0.01
403407
) -> bool:
404408
"""Spin until the pika channel is open.
405409

tests/core/test_broker.py

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
from rasa.core.brokers.broker import EventBroker
99
from rasa.core.brokers.file import FileEventBroker
1010
from rasa.core.brokers.kafka import KafkaEventBroker
11-
from rasa.core.brokers.pika import PikaEventBroker
11+
from rasa.core.brokers.pika import PikaEventBroker, DEFAULT_QUEUE_NAME
1212
from rasa.core.brokers.sql import SQLEventBroker
1313
from rasa.core.events import Event, Restarted, SlotSet, UserUttered
1414
from rasa.utils.endpoints import EndpointConfig, read_endpoint_config
@@ -53,9 +53,9 @@ def test_pika_message_property_app_id(monkeypatch: MonkeyPatch):
5353
"queue_arg,queues_arg,expected,warning",
5454
[
5555
# default case
56-
(None, ["rasa_core_events"], ["rasa_core_events"], None),
56+
(None, ["q1"], ["q1"], None),
5757
# only provide `queue`
58-
("rasa_core_events", None, ["rasa_core_events"], FutureWarning),
58+
("q1", None, ["q1"], FutureWarning),
5959
# supplying a list for `queue` works too
6060
(["q1", "q2"], None, ["q1", "q2"], FutureWarning),
6161
# `queues` arg supplied, takes precedence
@@ -64,11 +64,13 @@ def test_pika_message_property_app_id(monkeypatch: MonkeyPatch):
6464
("q1", ["q2", "q3"], ["q2", "q3"], FutureWarning),
6565
# only supplying `queues` works, and queues is a string
6666
(None, "q1", ["q1"], None),
67+
# no queues provided. Use default queue and print warning.
68+
(None, None, [DEFAULT_QUEUE_NAME], UserWarning),
6769
],
6870
)
6971
def test_pika_queues_from_args(
70-
queues_arg: Union[Text, List[Text], None],
7172
queue_arg: Union[Text, List[Text], None],
73+
queues_arg: Union[Text, List[Text], None],
7274
expected: List[Text],
7375
warning: Optional[Type[Warning]],
7476
monkeypatch: MonkeyPatch,
@@ -82,14 +84,6 @@ def test_pika_queues_from_args(
8284
assert pika_producer.queues == expected
8385

8486

85-
def test_pika_invalid_queues_argument(monkeypatch: MonkeyPatch):
86-
# patch PikaEventBroker so it doesn't try to connect to RabbitMQ on init
87-
monkeypatch.setattr(PikaEventBroker, "_run_pika", lambda _: None)
88-
89-
with pytest.raises(ValueError):
90-
PikaEventBroker("", "", "", queues=None, queue=None)
91-
92-
9387
def test_no_broker_in_config():
9488
cfg = read_endpoint_config(DEFAULT_ENDPOINTS_FILE, "event_broker")
9589

0 commit comments

Comments
 (0)