summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_basic.erl10
-rw-r--r--src/rabbit_channel.erl7
-rw-r--r--src/rabbit_quorum_queue.erl6
-rw-r--r--test/quorum_queue_SUITE.erl97
4 files changed, 116 insertions, 4 deletions
diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl
index d474e9cad3..22ceefb85f 100644
--- a/src/rabbit_basic.erl
+++ b/src/rabbit_basic.erl
@@ -23,6 +23,7 @@
extract_headers/1, extract_timestamp/1, map_headers/2, delivery/4,
header_routes/1, parse_expiration/1, header/2, header/3]).
-export([build_content/2, from_content/1, msg_size/1, maybe_gc_large_msg/1]).
+-export([add_header/4]).
%%----------------------------------------------------------------------------
@@ -300,3 +301,12 @@ maybe_gc_large_msg(Content) ->
msg_size(Content) ->
rabbit_writer:msg_size(Content).
+
+add_header(Name, Type, Value, #basic_message{content = Content0} = Msg) ->
+ Content = rabbit_basic:map_headers(
+ fun(undefined) ->
+ rabbit_misc:set_table_value([], Name, Type, Value);
+ (Headers) ->
+ rabbit_misc:set_table_value(Headers, Name, Type, Value)
+ end, Content0),
+ Msg#basic_message{content = Content}.
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 1b74b655f5..2671a1b76e 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -649,8 +649,9 @@ handle_info({ra_event, {Name, _} = From, _} = Evt,
State = lists:foldl(
fun({MsgId, {MsgHeader, Msg}}, Acc) ->
IsDelivered = maps:is_key(delivery_count, MsgHeader),
+ Msg1 = add_delivery_count_header(MsgHeader, Msg),
handle_deliver(CTag, AckRequired,
- {QName, From, MsgId, IsDelivered, Msg},
+ {QName, From, MsgId, IsDelivered, Msg1},
Acc)
end, State0#ch{queue_states = maps:put(Name, QState2, QueueStates)}, Msgs),
noreply(State);
@@ -2488,3 +2489,7 @@ maybe_monitor(_, QMons) ->
maybe_monitor_all([], S) -> S; %% optimisation
maybe_monitor_all([Item], S) -> maybe_monitor(Item, S); %% optimisation
maybe_monitor_all(Items, S) -> lists:foldl(fun maybe_monitor/2, S, Items).
+
+add_delivery_count_header(MsgHeader, Msg) ->
+ Count = maps:get(delivery_count, MsgHeader, 0),
+ rabbit_basic:add_header(<<"x-redelivery-count">>, long, Count, Msg).
diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl
index e24b907600..ab28fadf53 100644
--- a/src/rabbit_quorum_queue.erl
+++ b/src/rabbit_quorum_queue.erl
@@ -330,8 +330,10 @@ basic_get(#amqqueue{name = QName, pid = {Name, _} = Id, type = quorum}, NoAck,
case rabbit_fifo_client:dequeue(CTag, Settlement, QState0) of
{ok, empty, QState} ->
{ok, empty, QState};
- {ok, {MsgId, {MsgHeader, Msg}}, QState} ->
- IsDelivered = maps:is_key(delivery_count, MsgHeader),
+ {ok, {MsgId, {MsgHeader, Msg0}}, QState} ->
+ Count = maps:get(delivery_count, MsgHeader, 0),
+ IsDelivered = Count > 0,
+ Msg = rabbit_basic:add_header(<<"x-redelivery-count">>, long, Count, Msg0),
{ok, quorum_messages(Name), {QName, Id, MsgId, IsDelivered, Msg}, QState};
{timeout, _} ->
{error, timeout}
diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl
index 19f3a66a1d..4a1fb08dce 100644
--- a/test/quorum_queue_SUITE.erl
+++ b/test/quorum_queue_SUITE.erl
@@ -108,7 +108,9 @@ all_tests() ->
cancel_sync_queue,
basic_recover,
idempotent_recover,
- vhost_with_quorum_queue_is_deleted
+ vhost_with_quorum_queue_is_deleted,
+ consume_redelivery_count,
+ subscribe_redelivery_count
].
%% -------------------------------------------------------------------
@@ -1633,6 +1635,99 @@ basic_recover(Config) ->
amqp_channel:cast(Ch, #'basic.recover'{requeue = true}),
wait_for_messages_ready(Servers, RaName, 1),
wait_for_messages_pending_ack(Servers, RaName, 0).
+
+subscribe_redelivery_count(Config) ->
+ [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+
+ RaName = ra_name(QQ),
+ publish(Ch, QQ),
+ wait_for_messages_ready(Servers, RaName, 1),
+ wait_for_messages_pending_ack(Servers, RaName, 0),
+ subscribe(Ch, QQ, false),
+
+ DTag = <<"x-redelivery-count">>,
+ receive
+ {#'basic.deliver'{delivery_tag = DeliveryTag,
+ redelivered = false},
+ #amqp_msg{props = #'P_basic'{headers = H0}}} ->
+ ?assertMatch({DTag, _, 0}, rabbit_basic:header(DTag, H0)),
+ amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag,
+ multiple = false,
+ requeue = true})
+ end,
+
+ receive
+ {#'basic.deliver'{delivery_tag = DeliveryTag1,
+ redelivered = true},
+ #amqp_msg{props = #'P_basic'{headers = H1}}} ->
+ ?assertMatch({DTag, _, 1}, rabbit_basic:header(DTag, H1)),
+ amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag1,
+ multiple = false,
+ requeue = true})
+ end,
+
+ receive
+ {#'basic.deliver'{delivery_tag = DeliveryTag2,
+ redelivered = true},
+ #amqp_msg{props = #'P_basic'{headers = H2}}} ->
+ ?assertMatch({DTag, _, 2}, rabbit_basic:header(DTag, H2)),
+ amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag2,
+ multiple = false}),
+ wait_for_messages_ready(Servers, RaName, 0),
+ wait_for_messages_pending_ack(Servers, RaName, 0)
+ end.
+
+consume_redelivery_count(Config) ->
+ [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+
+ RaName = ra_name(QQ),
+ publish(Ch, QQ),
+ wait_for_messages_ready(Servers, RaName, 1),
+ wait_for_messages_pending_ack(Servers, RaName, 0),
+
+ DTag = <<"x-redelivery-count">>,
+
+ {#'basic.get_ok'{delivery_tag = DeliveryTag,
+ redelivered = false},
+ #amqp_msg{props = #'P_basic'{headers = H0}}} =
+ amqp_channel:call(Ch, #'basic.get'{queue = QQ,
+ no_ack = false}),
+ ?assertMatch({DTag, _, 0}, rabbit_basic:header(DTag, H0)),
+ amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag,
+ multiple = false,
+ requeue = true}),
+
+ {#'basic.get_ok'{delivery_tag = DeliveryTag1,
+ redelivered = true},
+ #amqp_msg{props = #'P_basic'{headers = H1}}} =
+ amqp_channel:call(Ch, #'basic.get'{queue = QQ,
+ no_ack = false}),
+ ?assertMatch({DTag, _, 1}, rabbit_basic:header(DTag, H1)),
+ amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag1,
+ multiple = false,
+ requeue = true}),
+
+ {#'basic.get_ok'{delivery_tag = DeliveryTag2,
+ redelivered = true},
+ #amqp_msg{props = #'P_basic'{headers = H2}}} =
+ amqp_channel:call(Ch, #'basic.get'{queue = QQ,
+ no_ack = false}),
+ ?assertMatch({DTag, _, 2}, rabbit_basic:header(DTag, H2)),
+ amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag2,
+ multiple = false,
+ requeue = true}).
+
+
%%----------------------------------------------------------------------------
declare(Ch, Q) ->