Skip to content

Commit

Permalink
Maintain footer
Browse files Browse the repository at this point in the history
Ensure footer gets deliverd to AMQP client as received from AMQP client
when feature flag message_containers_store_amqp_v1 is disabled.

Fixes test
```
bazel test //deps/rabbit:amqp_system_SUITE-mixed  -t- --test_sharding_strategy=disabled --test_env FOCUS="-group [dotnet] -case footer"
```
  • Loading branch information
ansd committed May 2, 2024
1 parent 81709d9 commit 2e15704
Showing 1 changed file with 39 additions and 27 deletions.
66 changes: 39 additions & 27 deletions deps/rabbit/src/mc_amqpl.erl
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
-define(AMQP10_PROPERTIES_HEADER, <<"x-amqp-1.0-properties">>).
-define(AMQP10_APP_PROPERTIES_HEADER, <<"x-amqp-1.0-app-properties">>).
-define(AMQP10_MESSAGE_ANNOTATIONS_HEADER, <<"x-amqp-1.0-message-annotations">>).
-define(AMQP10_FOOTER, <<"x-amqp-1.0-footer">>).
-define(PROTOMOD, rabbit_framing_amqp_0_9_1).
-define(CLASS_ID, 60).

Expand All @@ -56,30 +57,29 @@ init(#content{} = Content0) ->
{strip_header(Content, ?DELETED_HEADER), Anns}.

convert_from(mc_amqp, Sections, Env) ->
{H, MAnn, Prop, AProp, BodyRev} =
lists:foldl(
fun
(#'v1_0.header'{} = S, Acc) ->
setelement(1, Acc, S);
(#'v1_0.message_annotations'{} = S, Acc) ->
setelement(2, Acc, S);
(#'v1_0.properties'{} = S, Acc) ->
setelement(3, Acc, S);
(#'v1_0.application_properties'{} = S, Acc) ->
setelement(4, Acc, S);
(#'v1_0.delivery_annotations'{}, Acc) ->
%% delivery annotations not currently used
Acc;
(#'v1_0.footer'{}, Acc) ->
%% footer not currently used
Acc;
(undefined, Acc) ->
Acc;
(BodySection, Acc) ->
Body = element(5, Acc),
setelement(5, Acc, [BodySection | Body])
end, {undefined, undefined, undefined, undefined, []},
Sections),
{H, MAnn, Prop, AProp, BodyRev, Footer} =
lists:foldl(
fun(#'v1_0.header'{} = S, Acc) ->
setelement(1, Acc, S);
(#'v1_0.message_annotations'{} = S, Acc) ->
setelement(2, Acc, S);
(#'v1_0.properties'{} = S, Acc) ->
setelement(3, Acc, S);
(#'v1_0.application_properties'{} = S, Acc) ->
setelement(4, Acc, S);
(#'v1_0.delivery_annotations'{}, Acc) ->
%% delivery annotations not currently used
Acc;
(#'v1_0.footer'{} = S, Acc) ->
setelement(6, Acc, S);
(undefined, Acc) ->
Acc;
(BodySection, Acc) ->
Body = element(5, Acc),
setelement(5, Acc, [BodySection | Body])
end,
{undefined, undefined, undefined, undefined, [], undefined},
Sections),

{PayloadRev, Type0} =
case BodyRev of
Expand Down Expand Up @@ -188,7 +188,14 @@ convert_from(mc_amqp, Sections, Env) ->
MAnnBin = iolist_to_binary(amqp10_framing:encode_bin(MAnn)),
[{?AMQP10_MESSAGE_ANNOTATIONS_HEADER, longstr, MAnnBin} | Headers4]
end,
Headers5;
Headers6 = case Footer of
undefined ->
Headers5;
#'v1_0.footer'{} ->
FootBin = iolist_to_binary(amqp10_framing:encode_bin(Footer)),
[{?AMQP10_FOOTER, longstr, FootBin} | Headers5]
end,
Headers6;
_ ->
Headers2
end,
Expand Down Expand Up @@ -405,16 +412,21 @@ convert_to(mc_amqp, #content{payload_fragments_rev = Payload} = Content, Env) ->
Section ->
Section
end,

BodySections = case Type of
?AMQP10_TYPE ->
amqp10_framing:decode_bin(
iolist_to_binary(lists:reverse(Payload)));
_ ->
[#'v1_0.data'{content = lists:reverse(Payload)}]
end,
Tail = case amqp10_section_header(?AMQP10_FOOTER, Headers) of
undefined ->
BodySections;
#'v1_0.footer'{} = Footer ->
BodySections ++ [Footer]
end,

Sections = [H, MA, P, AP | BodySections],
Sections = [H, MA, P, AP | Tail],
mc_amqp:convert_from(mc_amqp, Sections, Env);
convert_to(_TargetProto, _Content, _Env) ->
not_implemented.
Expand Down

0 comments on commit 2e15704

Please sign in to comment.