summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_channel.erl7
-rw-r--r--src/rabbit_fifo.erl199
-rw-r--r--src/rabbit_fifo.hrl6
-rw-r--r--src/rabbit_quorum_queue.erl5
-rw-r--r--test/rabbit_fifo_SUITE.erl8
5 files changed, 155 insertions, 70 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 8b5fe91cc9..2148a435b6 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -762,7 +762,12 @@ handle_info({ra_event, {Name, _} = From, _} = Evt,
end,
State = lists:foldl(
fun({MsgId, {MsgHeader, Msg}}, Acc) ->
- IsDelivered = maps:is_key(delivery_count, MsgHeader),
+ IsDelivered = case MsgHeader of
+ #{delivery_count := _} ->
+ true;
+ _ ->
+ false
+ end,
Msg1 = add_delivery_count_header(MsgHeader, Msg),
handle_deliver(CTag, AckRequired,
{QName, From, MsgId, IsDelivered, Msg1},
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl
index 062fb7eee1..37afefef7e 100644
--- a/src/rabbit_fifo.erl
+++ b/src/rabbit_fifo.erl
@@ -588,8 +588,6 @@ tick(_Ts, #?MODULE{cfg = #cfg{name = Name,
query_consumer_count(State), % Consumers
EnqueueBytes,
CheckoutBytes},
- %% TODO: call a handler that works out if any known nodes need to be
- %% purged and emit a command effect to append this to the log
[{mod_call, rabbit_quorum_queue,
handle_tick, [QName, Metrics, all_nodes(State)]}, {aux, emit}].
@@ -759,7 +757,8 @@ messages_ready(#?MODULE{messages = M,
prefix_msgs = {PreR, PreM},
returns = R}) ->
- %% TODO: optimise to avoid length/1 call
+ %% prefix messages will rarely have anything in them during normal
+ %% operations so length/1 is fine here
maps:size(M) + lqueue:len(R) + length(PreR) + length(PreM).
messages_total(#?MODULE{ra_indexes = I,
@@ -795,9 +794,9 @@ moving_average(Time, HalfLife, Next, Current) ->
Next * (1 - Weight) + Current * Weight.
num_checked_out(#?MODULE{consumers = Cons}) ->
- lists:foldl(fun (#consumer{checked_out = C}, Acc) ->
- maps:size(C) + Acc
- end, 0, maps:values(Cons)).
+ maps:fold(fun (_, #consumer{checked_out = C}, Acc) ->
+ maps:size(C) + Acc
+ end, 0, Cons).
cancel_consumer(ConsumerId,
#?MODULE{cfg = #cfg{consumer_strategy = competing}} = State,
@@ -949,13 +948,16 @@ drop_head(#?MODULE{ra_indexes = Indexes0} = State0, Effects0) ->
end.
enqueue(RaftIdx, RawMsg, #?MODULE{messages = Messages,
- low_msg_num = LowMsgNum,
- next_msg_num = NextMsgNum} = State0) ->
- Header = #{size => message_size(RawMsg)},
+ low_msg_num = LowMsgNum,
+ next_msg_num = NextMsgNum} = State0) ->
+ %% the initial header is an integer only - it will get expanded to a map
+ %% when the next required key is added
+ Header = message_size(RawMsg),
{State1, Msg} =
case evaluate_memory_limit(Header, State0) of
true ->
- {State0, {RaftIdx, {Header, 'empty'}}}; % indexed message with header map
+ % indexed message with header map
+ {State0, {RaftIdx, {Header, 'empty'}}};
false ->
{add_in_memory_counts(Header, State0),
{RaftIdx, {Header, RawMsg}}} % indexed message with header map
@@ -975,17 +977,30 @@ append_to_master_index(RaftIdx,
incr_enqueue_count(#?MODULE{enqueue_count = C,
- cfg = #cfg{release_cursor_interval = C}} = State0) ->
- % this will trigger a dehydrated version of the state to be stored
- % at this raft index for potential future snapshot generation
+ cfg = #cfg{release_cursor_interval = {_Base, C}}
+ } = State0) ->
+ %% this will trigger a dehydrated version of the state to be stored
+ %% at this raft index for potential future snapshot generation
+ %% Q: Why don't we just stash the release cursor here?
+ %% A: Because it needs to be the very last thing we do and we
+ %% first needs to run the checkout logic.
State0#?MODULE{enqueue_count = 0};
+incr_enqueue_count(#?MODULE{cfg = #cfg{release_cursor_interval = C} = Cfg}
+ = State0)
+ when is_integer(C) ->
+ %% conversion to new release cursor interval format
+ State = State0#?MODULE{cfg = Cfg#cfg{release_cursor_interval = {C, C}}},
+ incr_enqueue_count(State);
incr_enqueue_count(#?MODULE{enqueue_count = C} = State) ->
State#?MODULE{enqueue_count = C + 1}.
maybe_store_dehydrated_state(RaftIdx,
- #?MODULE{ra_indexes = Indexes,
+ #?MODULE{cfg =
+ #cfg{release_cursor_interval = {Base, _}}
+ = Cfg,
+ ra_indexes = Indexes,
enqueue_count = 0,
- release_cursors = Cursors} = State) ->
+ release_cursors = Cursors0} = State) ->
case rabbit_fifo_index:exists(RaftIdx, Indexes) of
false ->
%% the incoming enqueue must already have been dropped
@@ -993,8 +1008,20 @@ maybe_store_dehydrated_state(RaftIdx,
true ->
Dehydrated = dehydrate_state(State),
Cursor = {release_cursor, RaftIdx, Dehydrated},
- State#?MODULE{release_cursors = lqueue:in(Cursor, Cursors)}
+ Cursors = lqueue:in(Cursor, Cursors0),
+ Interval = lqueue:len(Cursors) * Base,
+ State#?MODULE{release_cursors = Cursors,
+ cfg = Cfg#cfg{release_cursor_interval =
+ {Base, Interval}}}
end;
+maybe_store_dehydrated_state(RaftIdx,
+ #?MODULE{cfg =
+ #cfg{release_cursor_interval = C} = Cfg}
+ = State0)
+ when is_integer(C) ->
+ %% convert to new format
+ State = State0#?MODULE{cfg = Cfg#cfg{release_cursor_interval = {C, C}}},
+ maybe_store_dehydrated_state(RaftIdx, State);
maybe_store_dehydrated_state(_RaftIdx, State) ->
State.
@@ -1065,15 +1092,17 @@ complete(ConsumerId, Discarded,
#consumer{checked_out = Checked} = Con0, Effects0,
#?MODULE{consumers = Cons0, service_queue = SQ0,
ra_indexes = Indexes0} = State0) ->
+ %% TODO optimise use of Discarded map here
MsgRaftIdxs = [RIdx || {_, {RIdx, _}} <- maps:values(Discarded)],
%% credit_mode = simple_prefetch should automatically top-up credit
%% as messages are simple_prefetch or otherwise returned
Con = Con0#consumer{checked_out = maps:without(maps:keys(Discarded), Checked),
- credit = increase_credit(Con0, maps:size(Discarded))},
+ credit = increase_credit(Con0, map_size(Discarded))},
{Cons, SQ, Effects} = update_or_remove_sub(ConsumerId, Con, Cons0,
SQ0, Effects0),
Indexes = lists:foldl(fun rabbit_fifo_index:delete/2, Indexes0,
MsgRaftIdxs),
+ %% TODO: use maps:fold instead
State1 = lists:foldl(fun({_, {_, {Header, _}}}, Acc) ->
add_bytes_settle(Header, Acc);
({'$prefix_msg', Header}, Acc) ->
@@ -1162,14 +1191,21 @@ find_next_cursor(Smallest, Cursors0, Potential) ->
{Potential, Cursors0}
end.
+update_header(Key, UpdateFun, Default, Header)
+ when is_integer(Header) ->
+ update_header(Key, UpdateFun, Default, #{size => Header});
+update_header(Key, UpdateFun, Default, Header) ->
+ maps:update_with(Key, UpdateFun, Default, Header).
+
+
return_one(MsgId, 0, {Tag, Header0},
#?MODULE{returns = Returns,
consumers = Consumers,
cfg = #cfg{delivery_limit = DeliveryLimit}} = State0,
- Effects0, ConsumerId) when Tag == '$prefix_msg'; Tag == '$empty_msg' ->
+ Effects0, ConsumerId)
+ when Tag == '$prefix_msg'; Tag == '$empty_msg' ->
#consumer{checked_out = Checked} = Con0 = maps:get(ConsumerId, Consumers),
- Header = maps:update_with(delivery_count, fun (C) -> C+1 end,
- 1, Header0),
+ Header = update_header(delivery_count, fun (C) -> C+1 end, 1, Header0),
Msg0 = {Tag, Header},
case maps:get(delivery_count, Header) of
DeliveryCount when DeliveryCount > DeliveryLimit ->
@@ -1198,8 +1234,7 @@ return_one(MsgId, MsgNum, {RaftId, {Header0, RawMsg}},
cfg = #cfg{delivery_limit = DeliveryLimit}} = State0,
Effects0, ConsumerId) ->
#consumer{checked_out = Checked} = Con0 = maps:get(ConsumerId, Consumers),
- Header = maps:update_with(delivery_count, fun (C) -> C+1 end,
- 1, Header0),
+ Header = update_header(delivery_count, fun (C) -> C+1 end, 1, Header0),
Msg0 = {RaftId, {Header, RawMsg}},
case maps:get(delivery_count, Header) of
DeliveryCount when DeliveryCount > DeliveryLimit ->
@@ -1211,13 +1246,15 @@ return_one(MsgId, MsgNum, {RaftId, {Header0, RawMsg}},
Con = Con0#consumer{checked_out = maps:remove(MsgId, Checked)},
%% this should not affect the release cursor in any way
{Msg, State1} = case RawMsg of
- 'empty' -> {Msg0, State0};
- _ -> case evaluate_memory_limit(Header, State0) of
- true ->
- {{RaftId, {Header, 'empty'}}, State0};
- false ->
- {Msg0, add_in_memory_counts(Header, State0)}
- end
+ 'empty' ->
+ {Msg0, State0};
+ _ ->
+ case evaluate_memory_limit(Header, State0) of
+ true ->
+ {{RaftId, {Header, 'empty'}}, State0};
+ false ->
+ {Msg0, add_in_memory_counts(Header, State0)}
+ end
end,
{add_bytes_return(
Header,
@@ -1289,13 +1326,18 @@ evaluate_limit(Result, State0, Effects0) ->
{State0, Result, Effects0}
end.
-evaluate_memory_limit(_Header, #?MODULE{cfg = #cfg{max_in_memory_length = undefined,
- max_in_memory_bytes = undefined}}) ->
+evaluate_memory_limit(_Header,
+ #?MODULE{cfg = #cfg{max_in_memory_length = undefined,
+ max_in_memory_bytes = undefined}}) ->
false;
-evaluate_memory_limit(#{size := Size}, #?MODULE{cfg = #cfg{max_in_memory_length = MaxLength,
- max_in_memory_bytes = MaxBytes},
- msg_bytes_in_memory = Bytes,
- msgs_ready_in_memory = Length}) ->
+evaluate_memory_limit(#{size := Size}, State) ->
+ evaluate_memory_limit(Size, State);
+evaluate_memory_limit(Size,
+ #?MODULE{cfg = #cfg{max_in_memory_length = MaxLength,
+ max_in_memory_bytes = MaxBytes},
+ msg_bytes_in_memory = Bytes,
+ msgs_ready_in_memory = Length})
+ when is_integer(Size) ->
(Length >= MaxLength) orelse ((Bytes + Size) > MaxBytes).
append_send_msg_effects(Effects, AccMap) when map_size(AccMap) == 0 ->
@@ -1643,49 +1685,82 @@ make_purge_nodes(Nodes) ->
make_update_config(Config) ->
#update_config{config = Config}.
-add_bytes_enqueue(#{size := Bytes}, #?MODULE{msg_bytes_enqueue = Enqueue} = State) ->
- State#?MODULE{msg_bytes_enqueue = Enqueue + Bytes}.
-
-add_bytes_drop(#{size := Bytes}, #?MODULE{msg_bytes_enqueue = Enqueue} = State) ->
- State#?MODULE{msg_bytes_enqueue = Enqueue - Bytes}.
-
-add_bytes_checkout(#{size := Bytes}, #?MODULE{msg_bytes_checkout = Checkout,
- msg_bytes_enqueue = Enqueue } = State) ->
+add_bytes_enqueue(Bytes,
+ #?MODULE{msg_bytes_enqueue = Enqueue} = State)
+ when is_integer(Bytes) ->
+ State#?MODULE{msg_bytes_enqueue = Enqueue + Bytes};
+add_bytes_enqueue(#{size := Bytes}, State) ->
+ add_bytes_enqueue(Bytes, State).
+
+add_bytes_drop(Bytes,
+ #?MODULE{msg_bytes_enqueue = Enqueue} = State)
+ when is_integer(Bytes) ->
+ State#?MODULE{msg_bytes_enqueue = Enqueue - Bytes};
+add_bytes_drop(#{size := Bytes}, State) ->
+ add_bytes_drop(Bytes, State).
+
+add_bytes_checkout(Bytes,
+ #?MODULE{msg_bytes_checkout = Checkout,
+ msg_bytes_enqueue = Enqueue } = State)
+ when is_integer(Bytes) ->
State#?MODULE{msg_bytes_checkout = Checkout + Bytes,
- msg_bytes_enqueue = Enqueue - Bytes}.
-
-add_bytes_settle(#{size := Bytes}, #?MODULE{msg_bytes_checkout = Checkout} = State) ->
- State#?MODULE{msg_bytes_checkout = Checkout - Bytes}.
-
-add_bytes_return(#{size := Bytes}, #?MODULE{msg_bytes_checkout = Checkout,
- msg_bytes_enqueue = Enqueue} = State) ->
+ msg_bytes_enqueue = Enqueue - Bytes};
+add_bytes_checkout(#{size := Bytes}, State) ->
+ add_bytes_checkout(Bytes, State).
+
+add_bytes_settle(Bytes,
+ #?MODULE{msg_bytes_checkout = Checkout} = State)
+ when is_integer(Bytes) ->
+ State#?MODULE{msg_bytes_checkout = Checkout - Bytes};
+add_bytes_settle(#{size := Bytes}, State) ->
+ add_bytes_settle(Bytes, State).
+
+add_bytes_return(Bytes,
+ #?MODULE{msg_bytes_checkout = Checkout,
+ msg_bytes_enqueue = Enqueue} = State)
+ when is_integer(Bytes) ->
State#?MODULE{msg_bytes_checkout = Checkout - Bytes,
- msg_bytes_enqueue = Enqueue + Bytes}.
-
-add_in_memory_counts(#{size := Bytes}, #?MODULE{msg_bytes_in_memory = InMemoryBytes,
- msgs_ready_in_memory = InMemoryCount} = State) ->
+ msg_bytes_enqueue = Enqueue + Bytes};
+add_bytes_return(#{size := Bytes}, State) ->
+ add_bytes_return(Bytes, State).
+
+add_in_memory_counts(Bytes,
+ #?MODULE{msg_bytes_in_memory = InMemoryBytes,
+ msgs_ready_in_memory = InMemoryCount} = State)
+ when is_integer(Bytes) ->
State#?MODULE{msg_bytes_in_memory = InMemoryBytes + Bytes,
- msgs_ready_in_memory = InMemoryCount + 1}.
+ msgs_ready_in_memory = InMemoryCount + 1};
+add_in_memory_counts(#{size := Bytes}, State) ->
+ add_in_memory_counts(Bytes, State).
-subtract_in_memory_counts(#{size := Bytes},
+subtract_in_memory_counts(Bytes,
#?MODULE{msg_bytes_in_memory = InMemoryBytes,
- msgs_ready_in_memory = InMemoryCount} = State) ->
+ msgs_ready_in_memory = InMemoryCount} = State)
+ when is_integer(Bytes) ->
State#?MODULE{msg_bytes_in_memory = InMemoryBytes - Bytes,
- msgs_ready_in_memory = InMemoryCount - 1}.
+ msgs_ready_in_memory = InMemoryCount - 1};
+subtract_in_memory_counts(#{size := Bytes}, State) ->
+ subtract_in_memory_counts(Bytes, State).
message_size(#basic_message{content = Content}) ->
#content{payload_fragments_rev = PFR} = Content,
iolist_size(PFR);
-message_size({'$prefix_msg', #{size := B}}) ->
- B;
-message_size({'$empty_msg', #{size := B}}) ->
- B;
+message_size({'$prefix_msg', H}) ->
+ get_size_from_header(H);
+message_size({'$empty_msg', H}) ->
+ get_size_from_header(H);
message_size(B) when is_binary(B) ->
byte_size(B);
message_size(Msg) ->
%% probably only hit this for testing so ok to use erts_debug
erts_debug:size(Msg).
+get_size_from_header(Size) when is_integer(Size) ->
+ Size;
+get_size_from_header(#{size := B}) ->
+ B.
+
+
all_nodes(#?MODULE{consumers = Cons0,
enqueuers = Enqs0,
waiting_consumers = WaitingConsumers0}) ->
diff --git a/src/rabbit_fifo.hrl b/src/rabbit_fifo.hrl
index 0e9de0fb10..0d1d5ed2d1 100644
--- a/src/rabbit_fifo.hrl
+++ b/src/rabbit_fifo.hrl
@@ -18,11 +18,13 @@
%% in enqueue messages. Used to ensure ordering of messages send from the
%% same process
--type msg_header() :: #{size := msg_size(),
+-type msg_header() :: msg_size() |
+ #{size := msg_size(),
delivery_count => non_neg_integer()}.
-%% The message header map:
+%% The message header:
%% delivery_count: the number of unsuccessful delivery attempts.
%% A non-zero value indicates a previous attempt.
+%% If it only contains the size it can be condensed to an integer only
-type msg() :: {msg_header(), raw_msg()}.
%% message with a header map.
diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl
index 4f8c129291..b52678605b 100644
--- a/src/rabbit_quorum_queue.erl
+++ b/src/rabbit_quorum_queue.erl
@@ -473,7 +473,10 @@ basic_get(Q, NoAck, CTag0, QState0) when ?amqqueue_is_quorum(Q) ->
{ok, empty, QState} ->
{ok, empty, QState};
{ok, {{MsgId, {MsgHeader, Msg0}}, MsgsReady}, QState} ->
- Count = maps:get(delivery_count, MsgHeader, 0),
+ Count = case MsgHeader of
+ #{delivery_count := C} -> C;
+ _ -> 0
+ end,
IsDelivered = Count > 0,
Msg = rabbit_basic:add_header(<<"x-delivery-count">>, long, Count, Msg0),
{ok, MsgsReady, {QName, Id, MsgId, IsDelivered, Msg}, QState};
diff --git a/test/rabbit_fifo_SUITE.erl b/test/rabbit_fifo_SUITE.erl
index 0a0ac94e63..0d9acfa1fa 100644
--- a/test/rabbit_fifo_SUITE.erl
+++ b/test/rabbit_fifo_SUITE.erl
@@ -444,12 +444,12 @@ discarded_message_without_dead_letter_handler_is_removed_test(_) ->
{State0, [_, _]} = enq(1, 1, first, test_init(test)),
{State1, Effects1} = check_n(Cid, 2, 10, State0),
?ASSERT_EFF({send_msg, _,
- {delivery, _, [{0, {#{}, first}}]}, _},
+ {delivery, _, [{0, {_, first}}]}, _},
Effects1),
{_State2, _, Effects2} = apply(meta(1),
rabbit_fifo:make_discard(Cid, [0]), State1),
?assertNoEffect({send_msg, _,
- {delivery, _, [{0, {#{}, first}}]}, _},
+ {delivery, _, [{0, {_, first}}]}, _},
Effects2),
ok.
@@ -462,7 +462,7 @@ discarded_message_with_dead_letter_handler_emits_mod_call_effect_test(_) ->
{State0, [_, _]} = enq(1, 1, first, State00),
{State1, Effects1} = check_n(Cid, 2, 10, State0),
?ASSERT_EFF({send_msg, _,
- {delivery, _, [{0, {#{}, first}}]}, _},
+ {delivery, _, [{0, {_, first}}]}, _},
Effects1),
{_State2, _, Effects2} = apply(meta(1), rabbit_fifo:make_discard(Cid, [0]), State1),
% assert mod call effect with appended reason and message
@@ -502,7 +502,7 @@ delivery_query_returns_deliveries_test(_) ->
Entries = lists:zip(Indexes, Commands),
{State, _Effects} = run_log(test_init(help), Entries),
% 3 deliveries are returned
- [{0, {#{}, one}}] = rabbit_fifo:get_checked_out(Cid, 0, 0, State),
+ [{0, {_, one}}] = rabbit_fifo:get_checked_out(Cid, 0, 0, State),
[_, _, _] = rabbit_fifo:get_checked_out(Cid, 1, 3, State),
ok.