diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_basic.erl | 10 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 7 | ||||
| -rw-r--r-- | src/rabbit_quorum_queue.erl | 6 |
3 files changed, 20 insertions, 3 deletions
diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl index d474e9cad3..22ceefb85f 100644 --- a/src/rabbit_basic.erl +++ b/src/rabbit_basic.erl @@ -23,6 +23,7 @@ extract_headers/1, extract_timestamp/1, map_headers/2, delivery/4, header_routes/1, parse_expiration/1, header/2, header/3]). -export([build_content/2, from_content/1, msg_size/1, maybe_gc_large_msg/1]). +-export([add_header/4]). %%---------------------------------------------------------------------------- @@ -300,3 +301,12 @@ maybe_gc_large_msg(Content) -> msg_size(Content) -> rabbit_writer:msg_size(Content). + +add_header(Name, Type, Value, #basic_message{content = Content0} = Msg) -> + Content = rabbit_basic:map_headers( + fun(undefined) -> + rabbit_misc:set_table_value([], Name, Type, Value); + (Headers) -> + rabbit_misc:set_table_value(Headers, Name, Type, Value) + end, Content0), + Msg#basic_message{content = Content}. diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 1b74b655f5..2671a1b76e 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -649,8 +649,9 @@ handle_info({ra_event, {Name, _} = From, _} = Evt, State = lists:foldl( fun({MsgId, {MsgHeader, Msg}}, Acc) -> IsDelivered = maps:is_key(delivery_count, MsgHeader), + Msg1 = add_delivery_count_header(MsgHeader, Msg), handle_deliver(CTag, AckRequired, - {QName, From, MsgId, IsDelivered, Msg}, + {QName, From, MsgId, IsDelivered, Msg1}, Acc) end, State0#ch{queue_states = maps:put(Name, QState2, QueueStates)}, Msgs), noreply(State); @@ -2488,3 +2489,7 @@ maybe_monitor(_, QMons) -> maybe_monitor_all([], S) -> S; %% optimisation maybe_monitor_all([Item], S) -> maybe_monitor(Item, S); %% optimisation maybe_monitor_all(Items, S) -> lists:foldl(fun maybe_monitor/2, S, Items). + +add_delivery_count_header(MsgHeader, Msg) -> + Count = maps:get(delivery_count, MsgHeader, 0), + rabbit_basic:add_header(<<"x-redelivery-count">>, long, Count, Msg). diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl index e24b907600..ab28fadf53 100644 --- a/src/rabbit_quorum_queue.erl +++ b/src/rabbit_quorum_queue.erl @@ -330,8 +330,10 @@ basic_get(#amqqueue{name = QName, pid = {Name, _} = Id, type = quorum}, NoAck, case rabbit_fifo_client:dequeue(CTag, Settlement, QState0) of {ok, empty, QState} -> {ok, empty, QState}; - {ok, {MsgId, {MsgHeader, Msg}}, QState} -> - IsDelivered = maps:is_key(delivery_count, MsgHeader), + {ok, {MsgId, {MsgHeader, Msg0}}, QState} -> + Count = maps:get(delivery_count, MsgHeader, 0), + IsDelivered = Count > 0, + Msg = rabbit_basic:add_header(<<"x-redelivery-count">>, long, Count, Msg0), {ok, quorum_messages(Name), {QName, Id, MsgId, IsDelivered, Msg}, QState}; {timeout, _} -> {error, timeout} |
