diff options
| -rw-r--r-- | src/rabbit_basic.erl | 10 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 7 | ||||
| -rw-r--r-- | src/rabbit_quorum_queue.erl | 6 | ||||
| -rw-r--r-- | test/quorum_queue_SUITE.erl | 97 |
4 files changed, 116 insertions, 4 deletions
diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl index d474e9cad3..22ceefb85f 100644 --- a/src/rabbit_basic.erl +++ b/src/rabbit_basic.erl @@ -23,6 +23,7 @@ extract_headers/1, extract_timestamp/1, map_headers/2, delivery/4, header_routes/1, parse_expiration/1, header/2, header/3]). -export([build_content/2, from_content/1, msg_size/1, maybe_gc_large_msg/1]). +-export([add_header/4]). %%---------------------------------------------------------------------------- @@ -300,3 +301,12 @@ maybe_gc_large_msg(Content) -> msg_size(Content) -> rabbit_writer:msg_size(Content). + +add_header(Name, Type, Value, #basic_message{content = Content0} = Msg) -> + Content = rabbit_basic:map_headers( + fun(undefined) -> + rabbit_misc:set_table_value([], Name, Type, Value); + (Headers) -> + rabbit_misc:set_table_value(Headers, Name, Type, Value) + end, Content0), + Msg#basic_message{content = Content}. diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 1b74b655f5..bc273bf100 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -649,8 +649,9 @@ handle_info({ra_event, {Name, _} = From, _} = Evt, State = lists:foldl( fun({MsgId, {MsgHeader, Msg}}, Acc) -> IsDelivered = maps:is_key(delivery_count, MsgHeader), + Msg1 = add_delivery_count_header(MsgHeader, Msg), handle_deliver(CTag, AckRequired, - {QName, From, MsgId, IsDelivered, Msg}, + {QName, From, MsgId, IsDelivered, Msg1}, Acc) end, State0#ch{queue_states = maps:put(Name, QState2, QueueStates)}, Msgs), noreply(State); @@ -2488,3 +2489,7 @@ maybe_monitor(_, QMons) -> maybe_monitor_all([], S) -> S; %% optimisation maybe_monitor_all([Item], S) -> maybe_monitor(Item, S); %% optimisation maybe_monitor_all(Items, S) -> lists:foldl(fun maybe_monitor/2, S, Items). + +add_delivery_count_header(MsgHeader, Msg) -> + Count = maps:get(delivery_count, MsgHeader, 0), + rabbit_basic:add_header(<<"x-delivery-count">>, long, Count, Msg). diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl index 684f9d91d4..57053081f1 100644 --- a/src/rabbit_quorum_queue.erl +++ b/src/rabbit_quorum_queue.erl @@ -335,8 +335,10 @@ basic_get(#amqqueue{name = QName, pid = {Name, _} = Id, type = quorum}, NoAck, case rabbit_fifo_client:dequeue(CTag, Settlement, QState0) of {ok, empty, QState} -> {ok, empty, QState}; - {ok, {MsgId, {MsgHeader, Msg}}, QState} -> - IsDelivered = maps:is_key(delivery_count, MsgHeader), + {ok, {MsgId, {MsgHeader, Msg0}}, QState} -> + Count = maps:get(delivery_count, MsgHeader, 0), + IsDelivered = Count > 0, + Msg = rabbit_basic:add_header(<<"x-delivery-count">>, long, Count, Msg0), {ok, quorum_messages(Name), {QName, Id, MsgId, IsDelivered, Msg}, QState}; {timeout, _} -> {error, timeout} diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl index 26357c3fee..88c79acf79 100644 --- a/test/quorum_queue_SUITE.erl +++ b/test/quorum_queue_SUITE.erl @@ -113,7 +113,9 @@ all_tests() -> cancel_sync_queue, basic_recover, idempotent_recover, - vhost_with_quorum_queue_is_deleted + vhost_with_quorum_queue_is_deleted, + consume_redelivery_count, + subscribe_redelivery_count ]. %% ------------------------------------------------------------------- @@ -1820,6 +1822,99 @@ basic_recover(Config) -> amqp_channel:cast(Ch, #'basic.recover'{requeue = true}), wait_for_messages_ready(Servers, RaName, 1), wait_for_messages_pending_ack(Servers, RaName, 0). + +subscribe_redelivery_count(Config) -> + [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + QQ = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + + RaName = ra_name(QQ), + publish(Ch, QQ), + wait_for_messages_ready(Servers, RaName, 1), + wait_for_messages_pending_ack(Servers, RaName, 0), + subscribe(Ch, QQ, false), + + DTag = <<"x-delivery-count">>, + receive + {#'basic.deliver'{delivery_tag = DeliveryTag, + redelivered = false}, + #amqp_msg{props = #'P_basic'{headers = H0}}} -> + ?assertMatch({DTag, _, 0}, rabbit_basic:header(DTag, H0)), + amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, + multiple = false, + requeue = true}) + end, + + receive + {#'basic.deliver'{delivery_tag = DeliveryTag1, + redelivered = true}, + #amqp_msg{props = #'P_basic'{headers = H1}}} -> + ?assertMatch({DTag, _, 1}, rabbit_basic:header(DTag, H1)), + amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag1, + multiple = false, + requeue = true}) + end, + + receive + {#'basic.deliver'{delivery_tag = DeliveryTag2, + redelivered = true}, + #amqp_msg{props = #'P_basic'{headers = H2}}} -> + ?assertMatch({DTag, _, 2}, rabbit_basic:header(DTag, H2)), + amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag2, + multiple = false}), + wait_for_messages_ready(Servers, RaName, 0), + wait_for_messages_pending_ack(Servers, RaName, 0) + end. + +consume_redelivery_count(Config) -> + [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + QQ = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + + RaName = ra_name(QQ), + publish(Ch, QQ), + wait_for_messages_ready(Servers, RaName, 1), + wait_for_messages_pending_ack(Servers, RaName, 0), + + DTag = <<"x-delivery-count">>, + + {#'basic.get_ok'{delivery_tag = DeliveryTag, + redelivered = false}, + #amqp_msg{props = #'P_basic'{headers = H0}}} = + amqp_channel:call(Ch, #'basic.get'{queue = QQ, + no_ack = false}), + ?assertMatch({DTag, _, 0}, rabbit_basic:header(DTag, H0)), + amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, + multiple = false, + requeue = true}), + + {#'basic.get_ok'{delivery_tag = DeliveryTag1, + redelivered = true}, + #amqp_msg{props = #'P_basic'{headers = H1}}} = + amqp_channel:call(Ch, #'basic.get'{queue = QQ, + no_ack = false}), + ?assertMatch({DTag, _, 1}, rabbit_basic:header(DTag, H1)), + amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag1, + multiple = false, + requeue = true}), + + {#'basic.get_ok'{delivery_tag = DeliveryTag2, + redelivered = true}, + #amqp_msg{props = #'P_basic'{headers = H2}}} = + amqp_channel:call(Ch, #'basic.get'{queue = QQ, + no_ack = false}), + ?assertMatch({DTag, _, 2}, rabbit_basic:header(DTag, H2)), + amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag2, + multiple = false, + requeue = true}). + + %%---------------------------------------------------------------------------- declare(Ch, Q) -> |
