summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKarl Nilsson <kjnilsson@gmail.com>2021-10-13 10:27:00 +0100
committerKarl Nilsson <kjnilsson@gmail.com>2021-10-13 10:27:00 +0100
commit5520c6cafe715e324a810e715d00c8b61ed42190 (patch)
treeed1be0a9897977688f4cbb30e47d581d5562e052
parent49a47586b075c19e5af521a3f1d333ed8fab4654 (diff)
downloadrabbitmq-server-git-stream-queue-handle-unsupported-header-values.tar.gz
Stream queue: handle unsupported header value typesstream-queue-handle-unsupported-header-values
As AMQP 0.9.1 headers are translated into AMQP 1.0 application properties they are not able to contain complex values such as arrays or tables. RabbitMQ federation does use array and table values so to avoid crashing when delivering a federated message to a stream queue we drop them. These header values should be considered internal however so dropping them before a final queue deliver should not be a huge problem.
-rw-r--r--deps/rabbit/src/rabbit_msg_record.erl9
-rw-r--r--deps/rabbit/test/rabbit_msg_record_SUITE.erl17
2 files changed, 25 insertions, 1 deletions
diff --git a/deps/rabbit/src/rabbit_msg_record.erl b/deps/rabbit/src/rabbit_msg_record.erl
index 247a9d8677..7520788d28 100644
--- a/deps/rabbit/src/rabbit_msg_record.erl
+++ b/deps/rabbit/src/rabbit_msg_record.erl
@@ -195,7 +195,7 @@ from_amqp091(#'P_basic'{message_id = MsgId,
<- case Headers of
undefined -> [];
_ -> Headers
- end],
+ end, not unsupported_header_value_type(T)],
%% properties that do not map directly to AMQP 1.0 properties are stored
%% in application properties
APC = map_add(utf8, <<"x-basic-type">>, utf8, Type,
@@ -395,6 +395,13 @@ message_id({utf8, S}, HKey, H0) ->
message_id(MsgId, _, H) ->
{H, unwrap(MsgId)}.
+ unsupported_header_value_type(array) ->
+ true;
+ unsupported_header_value_type(table) ->
+ true;
+ unsupported_header_value_type(_) ->
+ false.
+
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
-endif.
diff --git a/deps/rabbit/test/rabbit_msg_record_SUITE.erl b/deps/rabbit/test/rabbit_msg_record_SUITE.erl
index 0c86ce646e..7a8447ed02 100644
--- a/deps/rabbit/test/rabbit_msg_record_SUITE.erl
+++ b/deps/rabbit/test/rabbit_msg_record_SUITE.erl
@@ -24,6 +24,7 @@ all() ->
all_tests() ->
[
ampq091_roundtrip,
+ unsupported_091_header_is_dropped,
message_id_ulong,
message_id_uuid,
message_id_binary,
@@ -90,6 +91,22 @@ ampq091_roundtrip(_Config) ->
test_amqp091_roundtrip(#'P_basic'{}, Payload),
ok.
+unsupported_091_header_is_dropped(_Config) ->
+ Props = #'P_basic'{
+ headers = [
+ {<<"x-received-from">>, array, []}
+ ]
+ },
+ MsgRecord0 = rabbit_msg_record:from_amqp091(Props, <<"payload">>),
+ MsgRecord = rabbit_msg_record:init(
+ iolist_to_binary(rabbit_msg_record:to_iodata(MsgRecord0))),
+ % meck:unload(),
+ {PropsOut, <<"payload">>} = rabbit_msg_record:to_amqp091(MsgRecord),
+
+ ?assertMatch(#'P_basic'{headers = undefined}, PropsOut),
+
+ ok.
+
message_id_ulong(_Config) ->
Num = 9876789,
ULong = erlang:integer_to_binary(Num),