Skip to content

Commit

Permalink
Introduce feature flag message_containers_store_amqp_v1
Browse files Browse the repository at this point in the history
Prior to this commit test case
```
bazel test //deps/rabbit:amqp_client_SUITE-mixed -t- \
    --test_sharding_strategy=disabled --test_env \
    FOCUS="-group [cluster_size_3] -case quorum_queue_on_old_node"
```
was failing because `mc_amqp:size(#v1{})` was called on the old node
which doesn't understand the new AMQP on disk message format.

Even though the old 3.13 node never stored mc_amqp messages in classic
or quorum queues,
1. it will either need to understand the new mc_amqp message format, or
2. we should prevent the new format being sent to 3.13. nodes.

In this commit we decide for the 2nd solution.
In `mc:prepare(store, Msg)`, we convert the new mc_amqp format to
mc_amqpl which is guaranteed to be understood by the old node.
Note that `mc:prepare(store, Msg)` is not only stored before actual
storage, but already before the message is sent to the queue process
(which might be hosted by the old node).
The 2nd solution is easier to reason about over the 1st solution
because:
a) We don't have to backport code meant to be for 4.0 to 3.13, and
b) 3.13 is guaranteed to never store mc_amqp messages in classic or
   quorum queues, even in mixed version clusters.

The disadvantage of the 2nd solution is that messages are converted from
mc_amqp to mc_amqpl and back to mc_amqp if there is an AMQP sender and
AMQP receiver. However, this only happens while the new feature flag
is disabled during the rolling upgrade. In a certain sense, this is a
hybrid to how the AMQP 1.0 plugin worked in 3.13: Even though we don't
proxy via AMQP 0.9.1 anymore, we still convert to AMQP 0.9.1 (mc_amqpl)
messages when feature flag message_containers_store_amqp_v1 is disabled.
  • Loading branch information
ansd committed May 2, 2024
1 parent fc7f458 commit 0b51b8d
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 0 deletions.
8 changes: 8 additions & 0 deletions deps/rabbit/src/mc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -440,6 +440,14 @@ last_death(BasicMsg) ->
mc_compat:last_death(BasicMsg).

-spec prepare(read | store, state()) -> state().
prepare(store, #?MODULE{protocol = mc_amqp} = State) ->
case rabbit_feature_flags:is_enabled(message_containers_store_amqp_v1) of
true ->
State#?MODULE{data = mc_amqp:prepare(store, State#?MODULE.data)};
false ->
State1 = convert(mc_amqpl, State),
State1#?MODULE{data = mc_amqpl:prepare(store, State1#?MODULE.data)}
end;
prepare(For, #?MODULE{protocol = Proto,
data = Data} = State) ->
State#?MODULE{data = Proto:prepare(For, Data)};
Expand Down
7 changes: 7 additions & 0 deletions deps/rabbit/src/rabbit_core_ff.erl
Original file line number Diff line number Diff line change
Expand Up @@ -170,3 +170,10 @@
#{desc => "Credit API v2 between queue clients and queue processes",
stability => stable
}}).

-rabbit_feature_flag(
{message_containers_store_amqp_v1,
#{desc => "Support storing messages in message containers AMQP 1.0 disk format v1",
stability => stable,
depends_on => [message_containers]
}}).

0 comments on commit 0b51b8d

Please sign in to comment.