summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_channel.erl7
-rw-r--r--src/rabbit_fifo.erl236
-rw-r--r--src/rabbit_fifo.hrl10
-rw-r--r--src/rabbit_quorum_queue.erl5
4 files changed, 181 insertions, 77 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 8b5fe91cc9..2148a435b6 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -762,7 +762,12 @@ handle_info({ra_event, {Name, _} = From, _} = Evt,
end,
State = lists:foldl(
fun({MsgId, {MsgHeader, Msg}}, Acc) ->
- IsDelivered = maps:is_key(delivery_count, MsgHeader),
+ IsDelivered = case MsgHeader of
+ #{delivery_count := _} ->
+ true;
+ _ ->
+ false
+ end,
Msg1 = add_delivery_count_header(MsgHeader, Msg),
handle_deliver(CTag, AckRequired,
{QName, From, MsgId, IsDelivered, Msg1},
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl
index 062fb7eee1..891a6827dc 100644
--- a/src/rabbit_fifo.erl
+++ b/src/rabbit_fifo.erl
@@ -143,7 +143,14 @@ update_config(Conf, State) ->
competing
end,
Cfg = State#?MODULE.cfg,
- State#?MODULE{cfg = Cfg#cfg{release_cursor_interval = SHI,
+ SHICur = case State#?MODULE.cfg of
+ #cfg{release_cursor_interval = {_, C}} ->
+ C;
+ #cfg{release_cursor_interval = C} ->
+ C
+ end,
+
+ State#?MODULE{cfg = Cfg#cfg{release_cursor_interval = {SHI, SHICur},
dead_letter_handler = DLH,
become_leader_handler = BLH,
max_length = MaxLength,
@@ -588,8 +595,6 @@ tick(_Ts, #?MODULE{cfg = #cfg{name = Name,
query_consumer_count(State), % Consumers
EnqueueBytes,
CheckoutBytes},
- %% TODO: call a handler that works out if any known nodes need to be
- %% purged and emit a command effect to append this to the log
[{mod_call, rabbit_quorum_queue,
handle_tick, [QName, Metrics, all_nodes(State)]}, {aux, emit}].
@@ -759,7 +764,8 @@ messages_ready(#?MODULE{messages = M,
prefix_msgs = {PreR, PreM},
returns = R}) ->
- %% TODO: optimise to avoid length/1 call
+ %% prefix messages will rarely have anything in them during normal
+ %% operations so length/1 is fine here
maps:size(M) + lqueue:len(R) + length(PreR) + length(PreM).
messages_total(#?MODULE{ra_indexes = I,
@@ -795,9 +801,9 @@ moving_average(Time, HalfLife, Next, Current) ->
Next * (1 - Weight) + Current * Weight.
num_checked_out(#?MODULE{consumers = Cons}) ->
- lists:foldl(fun (#consumer{checked_out = C}, Acc) ->
- maps:size(C) + Acc
- end, 0, maps:values(Cons)).
+ maps:fold(fun (_, #consumer{checked_out = C}, Acc) ->
+ maps:size(C) + Acc
+ end, 0, Cons).
cancel_consumer(ConsumerId,
#?MODULE{cfg = #cfg{consumer_strategy = competing}} = State,
@@ -949,13 +955,16 @@ drop_head(#?MODULE{ra_indexes = Indexes0} = State0, Effects0) ->
end.
enqueue(RaftIdx, RawMsg, #?MODULE{messages = Messages,
- low_msg_num = LowMsgNum,
- next_msg_num = NextMsgNum} = State0) ->
- Header = #{size => message_size(RawMsg)},
+ low_msg_num = LowMsgNum,
+ next_msg_num = NextMsgNum} = State0) ->
+ %% the initial header is an integer only - it will get expanded to a map
+ %% when the next required key is added
+ Header = message_size(RawMsg),
{State1, Msg} =
case evaluate_memory_limit(Header, State0) of
true ->
- {State0, {RaftIdx, {Header, 'empty'}}}; % indexed message with header map
+ % indexed message with header map
+ {State0, {RaftIdx, {Header, 'empty'}}};
false ->
{add_in_memory_counts(Header, State0),
{RaftIdx, {Header, RawMsg}}} % indexed message with header map
@@ -975,17 +984,30 @@ append_to_master_index(RaftIdx,
incr_enqueue_count(#?MODULE{enqueue_count = C,
- cfg = #cfg{release_cursor_interval = C}} = State0) ->
- % this will trigger a dehydrated version of the state to be stored
- % at this raft index for potential future snapshot generation
+ cfg = #cfg{release_cursor_interval = {_Base, C}}
+ } = State0) ->
+ %% this will trigger a dehydrated version of the state to be stored
+ %% at this raft index for potential future snapshot generation
+ %% Q: Why don't we just stash the release cursor here?
+ %% A: Because it needs to be the very last thing we do and we
+ %% first needs to run the checkout logic.
State0#?MODULE{enqueue_count = 0};
+incr_enqueue_count(#?MODULE{cfg = #cfg{release_cursor_interval = C} = Cfg}
+ = State0)
+ when is_integer(C) ->
+ %% conversion to new release cursor interval format
+ State = State0#?MODULE{cfg = Cfg#cfg{release_cursor_interval = {C, C}}},
+ incr_enqueue_count(State);
incr_enqueue_count(#?MODULE{enqueue_count = C} = State) ->
State#?MODULE{enqueue_count = C + 1}.
maybe_store_dehydrated_state(RaftIdx,
- #?MODULE{ra_indexes = Indexes,
+ #?MODULE{cfg =
+ #cfg{release_cursor_interval = {Base, _}}
+ = Cfg,
+ ra_indexes = Indexes,
enqueue_count = 0,
- release_cursors = Cursors} = State) ->
+ release_cursors = Cursors0} = State) ->
case rabbit_fifo_index:exists(RaftIdx, Indexes) of
false ->
%% the incoming enqueue must already have been dropped
@@ -993,8 +1015,20 @@ maybe_store_dehydrated_state(RaftIdx,
true ->
Dehydrated = dehydrate_state(State),
Cursor = {release_cursor, RaftIdx, Dehydrated},
- State#?MODULE{release_cursors = lqueue:in(Cursor, Cursors)}
+ Cursors = lqueue:in(Cursor, Cursors0),
+ Interval = lqueue:len(Cursors) * Base,
+ State#?MODULE{release_cursors = Cursors,
+ cfg = Cfg#cfg{release_cursor_interval =
+ {Base, Interval}}}
end;
+maybe_store_dehydrated_state(RaftIdx,
+ #?MODULE{cfg =
+ #cfg{release_cursor_interval = C} = Cfg}
+ = State0)
+ when is_integer(C) ->
+ %% convert to new format
+ State = State0#?MODULE{cfg = Cfg#cfg{release_cursor_interval = {C, C}}},
+ maybe_store_dehydrated_state(RaftIdx, State);
maybe_store_dehydrated_state(_RaftIdx, State) ->
State.
@@ -1065,15 +1099,17 @@ complete(ConsumerId, Discarded,
#consumer{checked_out = Checked} = Con0, Effects0,
#?MODULE{consumers = Cons0, service_queue = SQ0,
ra_indexes = Indexes0} = State0) ->
+ %% TODO optimise use of Discarded map here
MsgRaftIdxs = [RIdx || {_, {RIdx, _}} <- maps:values(Discarded)],
%% credit_mode = simple_prefetch should automatically top-up credit
%% as messages are simple_prefetch or otherwise returned
Con = Con0#consumer{checked_out = maps:without(maps:keys(Discarded), Checked),
- credit = increase_credit(Con0, maps:size(Discarded))},
+ credit = increase_credit(Con0, map_size(Discarded))},
{Cons, SQ, Effects} = update_or_remove_sub(ConsumerId, Con, Cons0,
SQ0, Effects0),
Indexes = lists:foldl(fun rabbit_fifo_index:delete/2, Indexes0,
MsgRaftIdxs),
+ %% TODO: use maps:fold instead
State1 = lists:foldl(fun({_, {_, {Header, _}}}, Acc) ->
add_bytes_settle(Header, Acc);
({'$prefix_msg', Header}, Acc) ->
@@ -1162,14 +1198,21 @@ find_next_cursor(Smallest, Cursors0, Potential) ->
{Potential, Cursors0}
end.
+update_header(Key, UpdateFun, Default, Header)
+ when is_integer(Header) ->
+ update_header(Key, UpdateFun, Default, #{size => Header});
+update_header(Key, UpdateFun, Default, Header) ->
+ maps:update_with(Key, UpdateFun, Default, Header).
+
+
return_one(MsgId, 0, {Tag, Header0},
#?MODULE{returns = Returns,
consumers = Consumers,
cfg = #cfg{delivery_limit = DeliveryLimit}} = State0,
- Effects0, ConsumerId) when Tag == '$prefix_msg'; Tag == '$empty_msg' ->
+ Effects0, ConsumerId)
+ when Tag == '$prefix_msg'; Tag == '$empty_msg' ->
#consumer{checked_out = Checked} = Con0 = maps:get(ConsumerId, Consumers),
- Header = maps:update_with(delivery_count, fun (C) -> C+1 end,
- 1, Header0),
+ Header = update_header(delivery_count, fun (C) -> C+1 end, 1, Header0),
Msg0 = {Tag, Header},
case maps:get(delivery_count, Header) of
DeliveryCount when DeliveryCount > DeliveryLimit ->
@@ -1198,8 +1241,7 @@ return_one(MsgId, MsgNum, {RaftId, {Header0, RawMsg}},
cfg = #cfg{delivery_limit = DeliveryLimit}} = State0,
Effects0, ConsumerId) ->
#consumer{checked_out = Checked} = Con0 = maps:get(ConsumerId, Consumers),
- Header = maps:update_with(delivery_count, fun (C) -> C+1 end,
- 1, Header0),
+ Header = update_header(delivery_count, fun (C) -> C+1 end, 1, Header0),
Msg0 = {RaftId, {Header, RawMsg}},
case maps:get(delivery_count, Header) of
DeliveryCount when DeliveryCount > DeliveryLimit ->
@@ -1211,13 +1253,15 @@ return_one(MsgId, MsgNum, {RaftId, {Header0, RawMsg}},
Con = Con0#consumer{checked_out = maps:remove(MsgId, Checked)},
%% this should not affect the release cursor in any way
{Msg, State1} = case RawMsg of
- 'empty' -> {Msg0, State0};
- _ -> case evaluate_memory_limit(Header, State0) of
- true ->
- {{RaftId, {Header, 'empty'}}, State0};
- false ->
- {Msg0, add_in_memory_counts(Header, State0)}
- end
+ 'empty' ->
+ {Msg0, State0};
+ _ ->
+ case evaluate_memory_limit(Header, State0) of
+ true ->
+ {{RaftId, {Header, 'empty'}}, State0};
+ false ->
+ {Msg0, add_in_memory_counts(Header, State0)}
+ end
end,
{add_bytes_return(
Header,
@@ -1289,13 +1333,18 @@ evaluate_limit(Result, State0, Effects0) ->
{State0, Result, Effects0}
end.
-evaluate_memory_limit(_Header, #?MODULE{cfg = #cfg{max_in_memory_length = undefined,
- max_in_memory_bytes = undefined}}) ->
+evaluate_memory_limit(_Header,
+ #?MODULE{cfg = #cfg{max_in_memory_length = undefined,
+ max_in_memory_bytes = undefined}}) ->
false;
-evaluate_memory_limit(#{size := Size}, #?MODULE{cfg = #cfg{max_in_memory_length = MaxLength,
- max_in_memory_bytes = MaxBytes},
- msg_bytes_in_memory = Bytes,
- msgs_ready_in_memory = Length}) ->
+evaluate_memory_limit(#{size := Size}, State) ->
+ evaluate_memory_limit(Size, State);
+evaluate_memory_limit(Size,
+ #?MODULE{cfg = #cfg{max_in_memory_length = MaxLength,
+ max_in_memory_bytes = MaxBytes},
+ msg_bytes_in_memory = Bytes,
+ msgs_ready_in_memory = Length})
+ when is_integer(Size) ->
(Length >= MaxLength) orelse ((Bytes + Size) > MaxBytes).
append_send_msg_effects(Effects, AccMap) when map_size(AccMap) == 0 ->
@@ -1538,12 +1587,13 @@ maybe_queue_consumer(ConsumerId, #consumer{credit = Credit},
ServiceQueue0
end.
-
%% creates a dehydrated version of the current state to be cached and
%% potentially used to for a snaphot at a later point
dehydrate_state(#?MODULE{messages = Messages,
consumers = Consumers,
returns = Returns,
+ low_msg_num = Low,
+ next_msg_num = Next,
prefix_msgs = {PrefRet0, PrefMsg0},
waiting_consumers = Waiting0} = State) ->
%% TODO: optimise this function as far as possible
@@ -1558,13 +1608,10 @@ dehydrate_state(#?MODULE{messages = Messages,
end,
lists:reverse(PrefRet0),
lqueue:to_list(Returns)),
- PrefMsgs = lists:foldl(fun ({_, {_RaftIdx, {_, 'empty'} = Msg}}, Acc) ->
- [Msg | Acc];
- ({_, {_RaftIdx, {Header, _}}}, Acc) ->
- [Header | Acc]
- end,
- lists:reverse(PrefMsg0),
- lists:sort(maps:to_list(Messages))),
+ PrefMsgsSuff = dehydrate_messages(Low, Next - 1, Messages, []),
+ %% prefix messages are not populated in normal operation only after
+ %% recovering from a snapshot
+ PrefMsgs = PrefMsg0 ++ PrefMsgsSuff,
Waiting = [{Cid, dehydrate_consumer(C)} || {Cid, C} <- Waiting0],
State#?MODULE{messages = #{},
ra_indexes = rabbit_fifo_index:empty(),
@@ -1575,9 +1622,21 @@ dehydrate_state(#?MODULE{messages = Messages,
end, Consumers),
returns = lqueue:new(),
prefix_msgs = {lists:reverse(PrefRet),
- lists:reverse(PrefMsgs)},
+ PrefMsgs},
waiting_consumers = Waiting}.
+dehydrate_messages(Low, Next, _Msgs, Acc)
+ when Next < Low ->
+ Acc;
+dehydrate_messages(Low, Next, Msgs, Acc0) ->
+ Acc = case maps:get(Next, Msgs) of
+ {_RaftIdx, {_, 'empty'} = Msg} ->
+ [Msg | Acc0];
+ {_RaftIdx, {Header, _}} ->
+ [Header | Acc0]
+ end,
+ dehydrate_messages(Low, Next - 1, Msgs, Acc).
+
dehydrate_consumer(#consumer{checked_out = Checked0} = Con) ->
Checked = maps:map(fun (_, {'$prefix_msg', _} = M) ->
M;
@@ -1643,49 +1702,82 @@ make_purge_nodes(Nodes) ->
make_update_config(Config) ->
#update_config{config = Config}.
-add_bytes_enqueue(#{size := Bytes}, #?MODULE{msg_bytes_enqueue = Enqueue} = State) ->
- State#?MODULE{msg_bytes_enqueue = Enqueue + Bytes}.
-
-add_bytes_drop(#{size := Bytes}, #?MODULE{msg_bytes_enqueue = Enqueue} = State) ->
- State#?MODULE{msg_bytes_enqueue = Enqueue - Bytes}.
-
-add_bytes_checkout(#{size := Bytes}, #?MODULE{msg_bytes_checkout = Checkout,
- msg_bytes_enqueue = Enqueue } = State) ->
+add_bytes_enqueue(Bytes,
+ #?MODULE{msg_bytes_enqueue = Enqueue} = State)
+ when is_integer(Bytes) ->
+ State#?MODULE{msg_bytes_enqueue = Enqueue + Bytes};
+add_bytes_enqueue(#{size := Bytes}, State) ->
+ add_bytes_enqueue(Bytes, State).
+
+add_bytes_drop(Bytes,
+ #?MODULE{msg_bytes_enqueue = Enqueue} = State)
+ when is_integer(Bytes) ->
+ State#?MODULE{msg_bytes_enqueue = Enqueue - Bytes};
+add_bytes_drop(#{size := Bytes}, State) ->
+ add_bytes_drop(Bytes, State).
+
+add_bytes_checkout(Bytes,
+ #?MODULE{msg_bytes_checkout = Checkout,
+ msg_bytes_enqueue = Enqueue } = State)
+ when is_integer(Bytes) ->
State#?MODULE{msg_bytes_checkout = Checkout + Bytes,
- msg_bytes_enqueue = Enqueue - Bytes}.
-
-add_bytes_settle(#{size := Bytes}, #?MODULE{msg_bytes_checkout = Checkout} = State) ->
- State#?MODULE{msg_bytes_checkout = Checkout - Bytes}.
-
-add_bytes_return(#{size := Bytes}, #?MODULE{msg_bytes_checkout = Checkout,
- msg_bytes_enqueue = Enqueue} = State) ->
+ msg_bytes_enqueue = Enqueue - Bytes};
+add_bytes_checkout(#{size := Bytes}, State) ->
+ add_bytes_checkout(Bytes, State).
+
+add_bytes_settle(Bytes,
+ #?MODULE{msg_bytes_checkout = Checkout} = State)
+ when is_integer(Bytes) ->
+ State#?MODULE{msg_bytes_checkout = Checkout - Bytes};
+add_bytes_settle(#{size := Bytes}, State) ->
+ add_bytes_settle(Bytes, State).
+
+add_bytes_return(Bytes,
+ #?MODULE{msg_bytes_checkout = Checkout,
+ msg_bytes_enqueue = Enqueue} = State)
+ when is_integer(Bytes) ->
State#?MODULE{msg_bytes_checkout = Checkout - Bytes,
- msg_bytes_enqueue = Enqueue + Bytes}.
-
-add_in_memory_counts(#{size := Bytes}, #?MODULE{msg_bytes_in_memory = InMemoryBytes,
- msgs_ready_in_memory = InMemoryCount} = State) ->
+ msg_bytes_enqueue = Enqueue + Bytes};
+add_bytes_return(#{size := Bytes}, State) ->
+ add_bytes_return(Bytes, State).
+
+add_in_memory_counts(Bytes,
+ #?MODULE{msg_bytes_in_memory = InMemoryBytes,
+ msgs_ready_in_memory = InMemoryCount} = State)
+ when is_integer(Bytes) ->
State#?MODULE{msg_bytes_in_memory = InMemoryBytes + Bytes,
- msgs_ready_in_memory = InMemoryCount + 1}.
+ msgs_ready_in_memory = InMemoryCount + 1};
+add_in_memory_counts(#{size := Bytes}, State) ->
+ add_in_memory_counts(Bytes, State).
-subtract_in_memory_counts(#{size := Bytes},
+subtract_in_memory_counts(Bytes,
#?MODULE{msg_bytes_in_memory = InMemoryBytes,
- msgs_ready_in_memory = InMemoryCount} = State) ->
+ msgs_ready_in_memory = InMemoryCount} = State)
+ when is_integer(Bytes) ->
State#?MODULE{msg_bytes_in_memory = InMemoryBytes - Bytes,
- msgs_ready_in_memory = InMemoryCount - 1}.
+ msgs_ready_in_memory = InMemoryCount - 1};
+subtract_in_memory_counts(#{size := Bytes}, State) ->
+ subtract_in_memory_counts(Bytes, State).
message_size(#basic_message{content = Content}) ->
#content{payload_fragments_rev = PFR} = Content,
iolist_size(PFR);
-message_size({'$prefix_msg', #{size := B}}) ->
- B;
-message_size({'$empty_msg', #{size := B}}) ->
- B;
+message_size({'$prefix_msg', H}) ->
+ get_size_from_header(H);
+message_size({'$empty_msg', H}) ->
+ get_size_from_header(H);
message_size(B) when is_binary(B) ->
byte_size(B);
message_size(Msg) ->
%% probably only hit this for testing so ok to use erts_debug
erts_debug:size(Msg).
+get_size_from_header(Size) when is_integer(Size) ->
+ Size;
+get_size_from_header(#{size := B}) ->
+ B.
+
+
all_nodes(#?MODULE{consumers = Cons0,
enqueuers = Enqs0,
waiting_consumers = WaitingConsumers0}) ->
diff --git a/src/rabbit_fifo.hrl b/src/rabbit_fifo.hrl
index 0e9de0fb10..16e665f9df 100644
--- a/src/rabbit_fifo.hrl
+++ b/src/rabbit_fifo.hrl
@@ -18,11 +18,13 @@
%% in enqueue messages. Used to ensure ordering of messages send from the
%% same process
--type msg_header() :: #{size := msg_size(),
+-type msg_header() :: msg_size() |
+ #{size := msg_size(),
delivery_count => non_neg_integer()}.
-%% The message header map:
+%% The message header:
%% delivery_count: the number of unsuccessful delivery attempts.
%% A non-zero value indicates a previous attempt.
+%% If it only contains the size it can be condensed to an integer only
-type msg() :: {msg_header(), raw_msg()}.
%% message with a header map.
@@ -102,7 +104,9 @@
-record(cfg,
{name :: atom(),
resource :: rabbit_types:r('queue'),
- release_cursor_interval = ?RELEASE_CURSOR_EVERY :: non_neg_integer(),
+ release_cursor_interval =
+ {?RELEASE_CURSOR_EVERY, ?RELEASE_CURSOR_EVERY} ::
+ non_neg_integer() | {non_neg_integer(), non_neg_integer()},
dead_letter_handler :: option(applied_mfa()),
become_leader_handler :: option(applied_mfa()),
max_length :: option(non_neg_integer()),
diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl
index 4f8c129291..b52678605b 100644
--- a/src/rabbit_quorum_queue.erl
+++ b/src/rabbit_quorum_queue.erl
@@ -473,7 +473,10 @@ basic_get(Q, NoAck, CTag0, QState0) when ?amqqueue_is_quorum(Q) ->
{ok, empty, QState} ->
{ok, empty, QState};
{ok, {{MsgId, {MsgHeader, Msg0}}, MsgsReady}, QState} ->
- Count = maps:get(delivery_count, MsgHeader, 0),
+ Count = case MsgHeader of
+ #{delivery_count := C} -> C;
+ _ -> 0
+ end,
IsDelivered = Count > 0,
Msg = rabbit_basic:add_header(<<"x-delivery-count">>, long, Count, Msg0),
{ok, MsgsReady, {QName, Id, MsgId, IsDelivered, Msg}, QState};