summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorArnaud Cogoluègnes <acogoluegnes@gmail.com>2019-01-25 17:41:12 +0100
committerArnaud Cogoluègnes <acogoluegnes@gmail.com>2019-01-25 17:41:12 +0100
commitdcf663fcf99eeefb1c0de07e7389b47988cfbd92 (patch)
treeeff2f2f31d572615bc2453378f3b760d1323bb9f /src
parent9eaa79d5f80ec3025ce0dbbac5e81a60437dec7c (diff)
parenta4b602567081b28c4bc53ac5995b5c054a305da9 (diff)
downloadrabbitmq-server-git-dcf663fcf99eeefb1c0de07e7389b47988cfbd92.tar.gz
Merge branch 'master' into rabbitmq-server-1838-active-field-for-consumers
Conflicts: src/rabbit_fifo.erl
Diffstat (limited to 'src')
-rw-r--r--src/rabbit.erl6
-rw-r--r--src/rabbit_fifo.erl581
-rw-r--r--src/rabbit_fifo_index.erl26
-rw-r--r--src/rabbit_quorum_queue.erl16
4 files changed, 376 insertions, 253 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl
index c8c6968ab0..44a044a4dc 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -31,7 +31,7 @@
-export([start/2, stop/1, prep_stop/1]).
-export([start_apps/1, start_apps/2, stop_apps/1]).
-export([log_locations/0, config_files/0, decrypt_config/2]). %% for testing and mgmt-agent
--export([is_booted/1]).
+-export([is_booted/1, is_booted/0, is_booting/1, is_booting/0]).
-ifdef(TEST).
@@ -696,6 +696,8 @@ await_startup(Node) ->
end
end.
+is_booting() -> is_booting(node()).
+
is_booting(Node) ->
case rpc:call(Node, erlang, whereis, [rabbit_boot]) of
{badrpc, _} = Err -> Err;
@@ -790,6 +792,8 @@ is_running() -> is_running(node()).
is_running(Node) -> rabbit_nodes:is_process_running(Node, rabbit).
+is_booted() -> is_booted(node()).
+
is_booted(Node) ->
case is_booting(Node) of
false ->
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl
index ad02205e02..612cd52dcc 100644
--- a/src/rabbit_fifo.erl
+++ b/src/rabbit_fifo.erl
@@ -38,6 +38,7 @@
% queries
query_messages_ready/1,
query_messages_checked_out/1,
+ query_messages_total/1,
query_processes/1,
query_ra_indexes/1,
query_consumer_count/1,
@@ -87,8 +88,13 @@
-type msg() :: {msg_header(), raw_msg()}.
%% message with a header map.
+-type msg_size() :: non_neg_integer().
+%% the size in bytes of the msg payload
+
-type indexed_msg() :: {ra_index(), msg()}.
+-type prefix_msg() :: {'$prefix_msg', msg_size()}.
+
-type delivery_msg() :: {msg_id(), msg()}.
%% A tuple consisting of the message id and the headered message.
@@ -157,7 +163,7 @@
-type applied_mfa() :: {module(), atom(), list()}.
% represents a partially applied module call
--define(SHADOW_COPY_INTERVAL, 4096 * 4).
+-define(SHADOW_COPY_INTERVAL, 4096 * 8).
-define(USE_AVG_HALF_LIFE, 10000.0).
-record(consumer,
@@ -202,7 +208,8 @@
next_msg_num = 1 :: msg_in_id(),
% list of returned msg_in_ids - when checking out it picks from
% this list first before taking low_msg_num
- returns = lqueue:new() :: lqueue:lqueue('$prefix_msg' | msg_in_id()),
+ returns = lqueue:new() :: lqueue:lqueue(prefix_msg() |
+ {msg_in_id(), indexed_msg()}),
% a counter of enqueues - used to trigger shadow copy points
enqueue_count = 0 :: non_neg_integer(),
% a map containing all the live processes that have ever enqueued
@@ -224,19 +231,20 @@
dead_letter_handler :: maybe(applied_mfa()),
become_leader_handler :: maybe(applied_mfa()),
%% This is a special field that is only used for snapshots
- %% It represents the number of queued messages at the time the
+ %% It represents the queued messages at the time the
%% dehydrated snapshot state was cached.
%% As release_cursors are only emitted for raft indexes where all
%% prior messages no longer contribute to the current state we can
- %% replace all message payloads at some index with a single integer
- %% to be decremented during `checkout_one' until it's 0 after which
- %% it instead takes messages from the `messages' map.
+ %% replace all message payloads with their sizes (to be used for
+ %% overflow calculations).
%% This is done so that consumers are still served in a deterministic
%% order on recovery.
- prefix_msg_counts = {0, 0} :: {Return :: non_neg_integer(),
- PrefixMsgs :: non_neg_integer()},
+ prefix_msgs = {[], []} :: {Return :: [msg_size()],
+ PrefixMsgs :: [msg_size()]},
msg_bytes_enqueue = 0 :: non_neg_integer(),
msg_bytes_checkout = 0 :: non_neg_integer(),
+ max_length :: maybe(non_neg_integer()),
+ max_bytes :: maybe(non_neg_integer()),
%% whether single active consumer is on or not for this queue
consumer_strategy = default :: default | single_active,
%% waiting consumers, one is picked active consumer is cancelled or dies
@@ -251,6 +259,8 @@
dead_letter_handler => applied_mfa(),
become_leader_handler => applied_mfa(),
shadow_copy_interval => non_neg_integer(),
+ max_length => non_neg_integer(),
+ max_bytes => non_neg_integer(),
single_active_consumer_on => boolean()}.
-export_type([protocol/0,
@@ -272,12 +282,14 @@
init(#{name := Name,
queue_resource := Resource} = Conf) ->
update_config(Conf, #state{name = Name,
- queue_resource = Resource}).
+ queue_resource = Resource}).
update_config(Conf, State) ->
DLH = maps:get(dead_letter_handler, Conf, undefined),
BLH = maps:get(become_leader_handler, Conf, undefined),
SHI = maps:get(shadow_copy_interval, Conf, ?SHADOW_COPY_INTERVAL),
+ MaxLength = maps:get(max_length, Conf, undefined),
+ MaxBytes = maps:get(max_bytes, Conf, undefined),
ConsumerStrategy = case maps:get(single_active_consumer_on, Conf, false) of
true ->
single_active;
@@ -287,6 +299,8 @@ update_config(Conf, State) ->
State#state{dead_letter_handler = DLH,
become_leader_handler = BLH,
shadow_copy_interval = SHI,
+ max_length = MaxLength,
+ max_bytes = MaxBytes,
consumer_strategy = ConsumerStrategy}.
zero(_) ->
@@ -294,59 +308,49 @@ zero(_) ->
% msg_ids are scoped per consumer
% ra_indexes holds all raft indexes for enqueues currently on queue
--spec apply(ra_machine:command_meta_data(), command(),
- state()) ->
- {state(), Reply :: term(), ra_machine:effects()}.
-apply(#{index := RaftIdx}, #enqueue{pid = From, seq = Seq,
- msg = RawMsg}, State00) ->
- case maybe_enqueue(RaftIdx, From, Seq, RawMsg, [], State00) of
- {ok, State0, Effects1} ->
- %% need to checkout before capturing the shadow copy else
- %% snapshots may not be complete
- {State, ok, Effects} = checkout(
- add_bytes_enqueue(RawMsg, State0),
- Effects1),
- append_to_master_index(RaftIdx, Effects, State);
- {duplicate, State, Effects} ->
- {State, ok, lists:reverse(Effects)}
- end;
-apply(#{index := RaftIdx},
+-spec apply(ra_machine:command_meta_data(), command(), state()) ->
+ {state(), Reply :: term(), ra_machine:effects()} |
+ {state(), Reply :: term()}.
+apply(Metadata, #enqueue{pid = From, seq = Seq,
+ msg = RawMsg}, State00) ->
+ apply_enqueue(Metadata, From, Seq, RawMsg, State00);
+apply(Meta,
#settle{msg_ids = MsgIds, consumer_id = ConsumerId},
#state{consumers = Cons0} = State) ->
case Cons0 of
#{ConsumerId := Con0} ->
% need to increment metrics before completing as any snapshot
% states taken need to includ them
- complete_and_checkout(RaftIdx, MsgIds, ConsumerId,
+ complete_and_checkout(Meta, MsgIds, ConsumerId,
Con0, [], State);
_ ->
{State, ok}
end;
-apply(#{index := RaftIdx}, #discard{msg_ids = MsgIds, consumer_id = ConsumerId},
+apply(Meta, #discard{msg_ids = MsgIds, consumer_id = ConsumerId},
#state{consumers = Cons0} = State0) ->
case Cons0 of
#{ConsumerId := Con0} ->
Discarded = maps:with(MsgIds, Con0#consumer.checked_out),
Effects = dead_letter_effects(Discarded, State0, []),
- complete_and_checkout(RaftIdx, MsgIds, ConsumerId, Con0,
+ complete_and_checkout(Meta, MsgIds, ConsumerId, Con0,
Effects, State0);
_ ->
{State0, ok}
end;
-apply(_, #return{msg_ids = MsgIds, consumer_id = ConsumerId},
+apply(Meta, #return{msg_ids = MsgIds, consumer_id = ConsumerId},
#state{consumers = Cons0} = State) ->
case Cons0 of
#{ConsumerId := Con0 = #consumer{checked_out = Checked0}} ->
Checked = maps:without(MsgIds, Checked0),
Returned = maps:with(MsgIds, Checked0),
MsgNumMsgs = maps:values(Returned),
- return(ConsumerId, MsgNumMsgs, Con0, Checked, [], State);
+ return(Meta, ConsumerId, MsgNumMsgs, Con0, Checked, [], State);
_ ->
{State, ok}
end;
-apply(_, #credit{credit = NewCredit, delivery_count = RemoteDelCnt,
- drain = Drain, consumer_id = ConsumerId},
+apply(Meta, #credit{credit = NewCredit, delivery_count = RemoteDelCnt,
+ drain = Drain, consumer_id = ConsumerId},
#state{consumers = Cons0,
service_queue = ServiceQueue0} = State0) ->
case Cons0 of
@@ -359,7 +363,7 @@ apply(_, #credit{credit = NewCredit, delivery_count = RemoteDelCnt,
ServiceQueue0),
Cons = maps:put(ConsumerId, Con1, Cons0),
{State1, ok, Effects} =
- checkout(State0#state{service_queue = ServiceQueue,
+ checkout(Meta, State0#state{service_queue = ServiceQueue,
consumers = Cons}, []),
Response = {send_credit_reply, maps:size(State1#state.messages)},
%% by this point all checkouts for the updated credit value
@@ -389,59 +393,63 @@ apply(_, #credit{credit = NewCredit, delivery_count = RemoteDelCnt,
%% credit for unknown consumer - just ignore
{State0, ok}
end;
-apply(_, #checkout{spec = {dequeue, _}},
- #state{messages = M,
- prefix_msg_counts = {0, 0}} = State0) when map_size(M) == 0 ->
- %% FIXME: also check if there are returned messages
- %% TODO do we need metric visibility of empty get requests?
- {State0, {dequeue, empty}};
-apply(Meta, #checkout{spec = {dequeue, settled}, meta = ConsumerMeta,
+apply(Meta, #checkout{spec = {dequeue, Settlement},
+ meta = ConsumerMeta,
consumer_id = ConsumerId},
- State0) ->
- % TODO: this clause could probably be optimised
- State1 = update_consumer(ConsumerId, ConsumerMeta,
- {once, 1, simple_prefetch}, State0),
- % turn send msg effect into reply
- {success, _, MsgId, Msg, State2} = checkout_one(State1),
- % immediately settle
- {State, _, Effects} = apply(Meta, make_settle(ConsumerId, [MsgId]), State2),
- {State, {dequeue, {MsgId, Msg}}, Effects};
-apply(_, #checkout{spec = {dequeue, unsettled},
- meta = ConsumerMeta, consumer_id = {_, Pid} = ConsumerId},
- State0) ->
- State1 = update_consumer(ConsumerId, ConsumerMeta,
- {once, 1, simple_prefetch}, State0),
- case checkout_one(State1) of
- {success, _, MsgId, Msg, S} ->
- {S, {dequeue, {MsgId, Msg}}, [{monitor, process, Pid}]};
- {inactive, S} ->
- {S, {dequeue, empty}, [{aux, inactive}]};
- S ->
- {S, {dequeue, empty}}
+ #state{consumers = Consumers} = State0) ->
+ Exists = maps:is_key(ConsumerId, Consumers),
+ case messages_ready(State0) of
+ 0 ->
+ {State0, {dequeue, empty}};
+ _ when Exists ->
+ %% a dequeue using the same consumer_id isn't possible at this point
+ {State0, {dequeue, empty}};
+ _ ->
+ State1 = update_consumer(ConsumerId, ConsumerMeta,
+ {once, 1, simple_prefetch}, State0),
+ {success, _, MsgId, Msg, State2} = checkout_one(State1),
+ case Settlement of
+ unsettled ->
+ {_, Pid} = ConsumerId,
+ {State2, {dequeue, {MsgId, Msg}},
+ [{monitor, process, Pid}]};
+ settled ->
+ %% immediately settle the checkout
+ {State, _, Effects} = apply(Meta,
+ make_settle(ConsumerId, [MsgId]),
+ State2),
+ {State, {dequeue, {MsgId, Msg}}, Effects}
+ end
end;
-apply(_, #checkout{spec = cancel, consumer_id = ConsumerId}, State0) ->
+apply(Meta, #checkout{spec = cancel, consumer_id = ConsumerId}, State0) ->
{State, Effects} = cancel_consumer(ConsumerId, State0, []),
% TODO: here we should really demonitor the pid but _only_ if it has no
- % other consumers or enqueuers.
- checkout(State, Effects);
-apply(_, #checkout{spec = Spec, meta = Meta,
- consumer_id = {_, Pid} = ConsumerId},
+ % other consumers or enqueuers. leaving a monitor in place isn't harmful
+ % however
+ checkout(Meta, State, Effects);
+apply(Meta, #checkout{spec = Spec, meta = ConsumerMeta,
+ consumer_id = {_, Pid} = ConsumerId},
State0) ->
- State1 = update_consumer(ConsumerId, Meta, Spec, State0),
- checkout(State1, [{monitor, process, Pid}]);
+ State1 = update_consumer(ConsumerId, ConsumerMeta, Spec, State0),
+ checkout(Meta, State1, [{monitor, process, Pid}]);
apply(#{index := RaftIdx}, #purge{},
#state{ra_indexes = Indexes0,
+ returns = Returns,
messages = Messages} = State0) ->
- Total = maps:size(Messages),
- Indexes = lists:foldl(fun rabbit_fifo_index:delete/2,
+ Total = messages_ready(State0),
+ Indexes1 = lists:foldl(fun rabbit_fifo_index:delete/2,
Indexes0,
[I || {I, _} <- lists:sort(maps:values(Messages))]),
+ Indexes = lists:foldl(fun rabbit_fifo_index:delete/2,
+ Indexes1,
+ [I || {_, {I, _}} <- lqueue:to_list(Returns)]),
{State, _, Effects} =
update_smallest_raft_index(RaftIdx, Indexes0,
State0#state{ra_indexes = Indexes,
messages = #{},
returns = lqueue:new(),
msg_bytes_enqueue = 0,
+ prefix_msgs = {[], []},
low_msg_num = undefined},
[]),
%% as we're not checking out after a purge (no point) we have to
@@ -461,8 +469,6 @@ apply(_, {down, ConsumerPid, noconnection},
#consumer{checked_out = Checked0} = C,
{Co, St0, Eff}) when node(P) =:= Node ->
St = return_all(St0, Checked0),
- %% TODO: need to increment credit here
- %% with the size of the Checked map
Credit = increase_credit(C, maps:size(Checked0)),
Eff1 = ConsumerUpdateActiveFun(St, K, C, false, suspected_down, Eff),
{maps:put(K, C#consumer{suspected_down = true,
@@ -488,8 +494,8 @@ apply(_, {down, ConsumerPid, noconnection},
%% TODO: should we run a checkout here?
{State#state{consumers = Cons, enqueuers = Enqs,
waiting_consumers = WaitingConsumers}, ok, Effects2};
-apply(_, {down, Pid, _Info}, #state{consumers = Cons0,
- enqueuers = Enqs0} = State0) ->
+apply(Meta, {down, Pid, _Info}, #state{consumers = Cons0,
+ enqueuers = Enqs0} = State0) ->
% Remove any enqueuer for the same pid and enqueue any pending messages
% This should be ok as we won't see any more enqueues from this pid
State1 = case maps:take(Pid, Enqs0) of
@@ -508,8 +514,8 @@ apply(_, {down, Pid, _Info}, #state{consumers = Cons0,
{State, Effects} = lists:foldl(fun(ConsumerId, {S, E}) ->
cancel_consumer(ConsumerId, S, E)
end, {State2, Effects1}, DownConsumers),
- checkout(State, Effects);
-apply(_, {nodeup, Node}, #state{consumers = Cons0,
+ checkout(Meta, State, Effects);
+apply(Meta, {nodeup, Node}, #state{consumers = Cons0,
enqueuers = Enqs0,
service_queue = SQ0} = State0) ->
%% A node we are monitoring has come back.
@@ -538,14 +544,14 @@ apply(_, {nodeup, Node}, #state{consumers = Cons0,
(_, _, Acc) ->
Acc
end, {Cons0, SQ0, Monitors}, Cons0),
- % TODO: avoid list concat
- checkout(State0#state{consumers = Cons1, enqueuers = Enqs1,
- service_queue = SQ,
- waiting_consumers = WaitingConsumers}, Effects);
+
+ checkout(Meta, State0#state{consumers = Cons1, enqueuers = Enqs1,
+ service_queue = SQ,
+ waiting_consumers = WaitingConsumers}, Effects);
apply(_, {nodedown, _Node}, State) ->
{State, ok};
-apply(_, #update_config{config = Conf}, State) ->
- {update_config(Conf, State), ok}.
+apply(Meta, #update_config{config = Conf}, State) ->
+ checkout(Meta, update_config(Conf, State), []).
consumer_active_flag_update_function(#state{consumer_strategy = default}) ->
fun(State, ConsumerId, Consumer, Active, ActivityStatus, Effects) ->
@@ -605,7 +611,7 @@ state_enter(leader, #state{consumers = Cons,
enqueuers = Enqs,
waiting_consumers = WaitingConsumers,
name = Name,
- prefix_msg_counts = {0, 0},
+ prefix_msgs = {[], []},
become_leader_handler = BLH}) ->
% return effects to monitor all current consumers and enqueuers
Pids = lists:usort(maps:keys(Enqs)
@@ -621,10 +627,10 @@ state_enter(leader, #state{consumers = Cons,
{Mod, Fun, Args} ->
[{mod_call, Mod, Fun, Args ++ [Name]} | Effects]
end;
-state_enter(recovered, #state{prefix_msg_counts = PrefixMsgCounts})
- when PrefixMsgCounts =/= {0, 0} ->
+state_enter(recovered, #state{prefix_msgs = PrefixMsgCounts})
+ when PrefixMsgCounts =/= {[], []} ->
%% TODO: remove assertion?
- exit({rabbit_fifo, unexpected_prefix_msg_counts, PrefixMsgCounts});
+ exit({rabbit_fifo, unexpected_prefix_msgs, PrefixMsgCounts});
state_enter(eol, #state{enqueuers = Enqs,
consumers = Custs0,
waiting_consumers = WaitingConsumers0}) ->
@@ -642,14 +648,12 @@ state_enter(_, _) ->
-spec tick(non_neg_integer(), state()) -> ra_machine:effects().
tick(_Ts, #state{name = Name,
queue_resource = QName,
- messages = Messages,
- ra_indexes = Indexes,
msg_bytes_enqueue = EnqueueBytes,
msg_bytes_checkout = CheckoutBytes} = State) ->
Metrics = {Name,
- maps:size(Messages), % Ready
+ messages_ready(State),
num_checked_out(State), % checked out
- rabbit_fifo_index:size(Indexes), %% Total
+ messages_total(State),
query_consumer_count(State), % Consumers
EnqueueBytes,
CheckoutBytes},
@@ -659,17 +663,14 @@ tick(_Ts, #state{name = Name,
-spec overview(state()) -> map().
overview(#state{consumers = Cons,
enqueuers = Enqs,
- messages = Messages,
- ra_indexes = Indexes,
msg_bytes_enqueue = EnqueueBytes,
- msg_bytes_checkout = CheckoutBytes
- } = State) ->
+ msg_bytes_checkout = CheckoutBytes} = State) ->
#{type => ?MODULE,
num_consumers => maps:size(Cons),
num_checked_out => num_checked_out(State),
num_enqueuers => maps:size(Enqs),
- num_ready_messages => maps:size(Messages),
- num_messages => rabbit_fifo_index:size(Indexes),
+ num_ready_messages => messages_ready(State),
+ num_messages => messages_total(State),
enqueue_message_bytes => EnqueueBytes,
checkout_message_bytes => CheckoutBytes}.
@@ -706,13 +707,16 @@ handle_aux(_, cast, Cmd, {Name, Use0}, Log, _) ->
%%% Queries
-query_messages_ready(#state{messages = M}) ->
- M.
+query_messages_ready(State) ->
+ messages_ready(State).
query_messages_checked_out(#state{consumers = Consumers}) ->
maps:fold(fun (_, #consumer{checked_out = C}, S) ->
- maps:merge(S, maps:from_list(maps:values(C)))
- end, #{}, Consumers).
+ maps:size(C) + S
+ end, 0, Consumers).
+
+query_messages_total(State) ->
+ messages_total(State).
query_processes(#state{enqueuers = Enqs, consumers = Cons0}) ->
Cons = maps:fold(fun({_, P}, V, S) -> S#{P => V} end, #{}, Cons0),
@@ -802,6 +806,18 @@ usage(Name) when is_atom(Name) ->
%%% Internal
+messages_ready(#state{messages = M,
+ prefix_msgs = {PreR, PreM},
+ returns = R}) ->
+
+ %% TODO: optimise to avoid length/1 call
+ maps:size(M) + lqueue:len(R) + length(PreR) + length(PreM).
+
+messages_total(#state{ra_indexes = I,
+ prefix_msgs = {PreR, PreM}}) ->
+
+ rabbit_fifo_index:size(I) + length(PreR) + length(PreM).
+
update_use({inactive, _, _, _} = CUInfo, inactive) ->
CUInfo;
update_use({active, _, _} = CUInfo, active) ->
@@ -913,6 +929,35 @@ cancel_consumer0(ConsumerId,
{S0, Effects0}
end.
+apply_enqueue(#{index := RaftIdx} = Meta, From, Seq, RawMsg, State0) ->
+ Bytes = message_size(RawMsg),
+ case maybe_enqueue(RaftIdx, From, Seq, RawMsg, [], State0) of
+ {ok, State1, Effects1} ->
+ State2 = append_to_master_index(RaftIdx,
+ add_bytes_enqueue(Bytes, State1)),
+ {State, ok, Effects} = checkout(Meta, State2, Effects1),
+ {maybe_store_dehydrated_state(RaftIdx, State), ok, Effects};
+ {duplicate, State, Effects} ->
+ {State, ok, Effects}
+ end.
+
+drop_head(#state{ra_indexes = Indexes0} = State0, Effects0) ->
+ case take_next_msg(State0) of
+ {FullMsg = {_MsgId, {RaftIdxToDrop, {_Header, Msg}}},
+ State1} ->
+ Indexes = rabbit_fifo_index:delete(RaftIdxToDrop, Indexes0),
+ Bytes = message_size(Msg),
+ State = add_bytes_drop(Bytes, State1#state{ra_indexes = Indexes}),
+ Effects = dead_letter_effects(maps:put(none, FullMsg, #{}),
+ State, Effects0),
+ {State, Effects};
+ {{'$prefix_msg', Bytes}, State1} ->
+ State = add_bytes_drop(Bytes, State1),
+ {State, Effects0};
+ empty ->
+ {State0, Effects0}
+ end.
+
enqueue(RaftIdx, RawMsg, #state{messages = Messages,
low_msg_num = LowMsgNum,
next_msg_num = NextMsgNum} = State0) ->
@@ -923,19 +968,28 @@ enqueue(RaftIdx, RawMsg, #state{messages = Messages,
low_msg_num = min(LowMsgNum, NextMsgNum),
next_msg_num = NextMsgNum + 1}.
-append_to_master_index(RaftIdx, Effects,
+append_to_master_index(RaftIdx,
#state{ra_indexes = Indexes0} = State0) ->
- {State, Shadow} = incr_enqueue_count(State0),
- Indexes = rabbit_fifo_index:append(RaftIdx, Shadow, Indexes0),
- {State#state{ra_indexes = Indexes}, ok, Effects}.
+ State = incr_enqueue_count(State0),
+ Indexes = rabbit_fifo_index:append(RaftIdx, undefined, Indexes0),
+ State#state{ra_indexes = Indexes}.
incr_enqueue_count(#state{enqueue_count = C,
shadow_copy_interval = C} = State0) ->
- % time to stash a dehydrated state version
- State = State0#state{enqueue_count = 0},
- {State, dehydrate_state(State)};
+ % this will trigger a dehydrated version of the state to be stored
+ % at this raft index for potential future snapshot generation
+ State0#state{enqueue_count = 0};
incr_enqueue_count(#state{enqueue_count = C} = State) ->
- {State#state{enqueue_count = C + 1}, undefined}.
+ State#state{enqueue_count = C + 1}.
+
+maybe_store_dehydrated_state(RaftIdx, #state{enqueue_count = 0,
+ ra_indexes = Indexes} = State) ->
+ Dehydrated = dehydrate_state(State),
+ State#state{ra_indexes =
+ rabbit_fifo_index:update_if_present(RaftIdx, Dehydrated,
+ Indexes)};
+maybe_store_dehydrated_state(_RaftIdx, State) ->
+ State.
enqueue_pending(From,
@@ -950,7 +1004,8 @@ enqueue_pending(From, Enq, #state{enqueuers = Enqueuers0} = State) ->
maybe_enqueue(RaftIdx, undefined, undefined, RawMsg, Effects, State0) ->
% direct enqueue without tracking
- {ok, enqueue(RaftIdx, RawMsg, State0), Effects};
+ State = enqueue(RaftIdx, RawMsg, State0),
+ {ok, State, Effects};
maybe_enqueue(RaftIdx, From, MsgSeqNo, RawMsg, Effects0,
#state{enqueuers = Enqueuers0} = State0) ->
case maps:get(From, Enqueuers0, undefined) of
@@ -981,19 +1036,19 @@ maybe_enqueue(RaftIdx, From, MsgSeqNo, RawMsg, Effects0,
snd(T) ->
element(2, T).
-return(ConsumerId, MsgNumMsgs, Con0, Checked,
+return(Meta, ConsumerId, MsgNumMsgs, Con0, Checked,
Effects0, #state{consumers = Cons0, service_queue = SQ0} = State0) ->
Con = Con0#consumer{checked_out = Checked,
credit = increase_credit(Con0, length(MsgNumMsgs))},
{Cons, SQ, Effects} = update_or_remove_sub(ConsumerId, Con, Cons0,
SQ0, Effects0),
- State1 = lists:foldl(fun('$prefix_msg' = Msg, S0) ->
+ State1 = lists:foldl(fun({'$prefix_msg', _} = Msg, S0) ->
return_one(0, Msg, S0);
({MsgNum, Msg}, S0) ->
return_one(MsgNum, Msg, S0)
end, State0, MsgNumMsgs),
- checkout(State1#state{consumers = Cons,
- service_queue = SQ},
+ checkout(Meta, State1#state{consumers = Cons,
+ service_queue = SQ},
Effects).
% used to processes messages that are finished
@@ -1024,7 +1079,7 @@ increase_credit(#consumer{lifetime = auto,
increase_credit(#consumer{credit = Current}, Credit) ->
Current + Credit.
-complete_and_checkout(IncomingRaftIdx, MsgIds, ConsumerId,
+complete_and_checkout(#{index := IncomingRaftIdx} = Meta, MsgIds, ConsumerId,
#consumer{checked_out = Checked0} = Con0,
Effects0, #state{ra_indexes = Indexes0} = State0) ->
Checked = maps:without(MsgIds, Checked0),
@@ -1032,15 +1087,15 @@ complete_and_checkout(IncomingRaftIdx, MsgIds, ConsumerId,
MsgRaftIdxs = [RIdx || {_, {RIdx, _}} <- maps:values(Discarded)],
State1 = lists:foldl(fun({_, {_, {_, RawMsg}}}, Acc) ->
add_bytes_settle(RawMsg, Acc);
- (_, Acc) ->
- Acc
+ ({'$prefix_msg', _} = M, Acc) ->
+ add_bytes_settle(M, Acc)
end, State0, maps:values(Discarded)),
%% need to pass the length of discarded as $prefix_msgs would be filtered
%% by the above list comprehension
{State2, Effects1} = complete(ConsumerId, MsgRaftIdxs,
maps:size(Discarded),
Con0, Checked, Effects0, State1),
- {State, ok, Effects} = checkout(State2, Effects1),
+ {State, ok, Effects} = checkout(Meta, State2, Effects1),
% settle metrics are incremented separately
update_smallest_raft_index(IncomingRaftIdx, Indexes0, State, Effects).
@@ -1050,7 +1105,7 @@ dead_letter_effects(_Discarded,
Effects;
dead_letter_effects(Discarded,
#state{dead_letter_handler = {Mod, Fun, Args}}, Effects) ->
- DeadLetters = maps:fold(fun(_, {_, {_, {_, Msg}}},
+ DeadLetters = maps:fold(fun(_, {_, {_, {_Header, Msg}}},
% MsgId, MsgIdID, RaftId, Header
Acc) -> [{rejected, Msg} | Acc]
end, [], Discarded),
@@ -1062,7 +1117,6 @@ cancel_consumer_effects(ConsumerId, #state{queue_resource = QName}, Effects) ->
update_smallest_raft_index(IncomingRaftIdx, OldIndexes,
#state{ra_indexes = Indexes,
- % prefix_msg_count = 0,
messages = Messages} = State, Effects) ->
case rabbit_fifo_index:size(Indexes) of
0 when map_size(Messages) =:= 0 ->
@@ -1080,46 +1134,50 @@ update_smallest_raft_index(IncomingRaftIdx, OldIndexes,
% effects
{State, ok, Effects};
{_, {Smallest, Shadow}} when Shadow =/= undefined ->
- % ?INFO("RELEASE ~w ~w ~w~n", [IncomingRaftIdx, Smallest,
- % Shadow]),
{State, ok, [{release_cursor, Smallest, Shadow}]};
_ -> % smallest
- % no shadow taken for this index,
% no release cursor increase
{State, ok, Effects}
end
end.
-% TODO update message then update messages and returns in single operations
-return_one(0, '$prefix_msg',
+return_one(0, {'$prefix_msg', _} = Msg,
#state{returns = Returns} = State0) ->
- State0#state{returns = lqueue:in('$prefix_msg', Returns)};
+ add_bytes_return(Msg,
+ State0#state{returns = lqueue:in(Msg, Returns)});
return_one(MsgNum, {RaftId, {Header0, RawMsg}},
- #state{messages = Messages,
- returns = Returns} = State0) ->
+ #state{returns = Returns} = State0) ->
Header = maps:update_with(delivery_count,
fun (C) -> C+1 end,
1, Header0),
Msg = {RaftId, {Header, RawMsg}},
% this should not affect the release cursor in any way
add_bytes_return(RawMsg,
- State0#state{messages = maps:put(MsgNum, Msg, Messages),
- returns = lqueue:in(MsgNum, Returns)}).
+ State0#state{returns = lqueue:in({MsgNum, Msg}, Returns)}).
-return_all(State, Checked0) ->
+return_all(State0, Checked0) ->
%% need to sort the list so that we return messages in the order
%% they were checked out
Checked = lists:sort(maps:to_list(Checked0)),
- lists:foldl(fun ({_, '$prefix_msg'}, S) ->
- return_one(0, '$prefix_msg', S);
+ lists:foldl(fun ({_, {'$prefix_msg', _} = Msg}, S) ->
+ return_one(0, Msg, S);
({_, {MsgNum, Msg}}, S) ->
return_one(MsgNum, Msg, S)
- end, State, Checked).
+ end, State0, Checked).
%% checkout new messages to consumers
%% reverses the effects list
-checkout(State, Effects) ->
- checkout0(checkout_one(State), Effects, #{}).
+checkout(#{index := Index}, State0, Effects0) ->
+ {State1, _Result, Effects1} = checkout0(checkout_one(State0),
+ Effects0, #{}),
+ case evaluate_limit(State0#state.ra_indexes, false,
+ State1, Effects1) of
+ {State, true, Effects} ->
+ update_smallest_raft_index(Index, State0#state.ra_indexes,
+ State, Effects);
+ {State, false, Effects} ->
+ {State, ok, Effects}
+ end.
checkout0({success, ConsumerId, MsgId, Msg, State}, Effects, Acc0) ->
DelMsg = {MsgId, Msg},
@@ -1127,12 +1185,30 @@ checkout0({success, ConsumerId, MsgId, Msg, State}, Effects, Acc0) ->
fun (M) -> [DelMsg | M] end,
[DelMsg], Acc0),
checkout0(checkout_one(State), Effects, Acc);
-checkout0({inactive, State}, Effects0, Acc) ->
- Effects = append_send_msg_effects(Effects0, Acc),
- {State, ok, lists:reverse([{aux, inactive} | Effects])};
-checkout0(State, Effects0, Acc) ->
- Effects = append_send_msg_effects(Effects0, Acc),
- {State, ok, lists:reverse(Effects)}.
+checkout0({Activity, State0}, Effects0, Acc) ->
+ Effects1 = case Activity of
+ nochange ->
+ append_send_msg_effects(Effects0, Acc);
+ inactive ->
+ [{aux, inactive}
+ | append_send_msg_effects(Effects0, Acc)]
+ end,
+ {State0, ok, lists:reverse(Effects1)}.
+
+evaluate_limit(_OldIndexes, Result,
+ #state{max_length = undefined,
+ max_bytes = undefined} = State,
+ Effects) ->
+ {State, Result, Effects};
+evaluate_limit(OldIndexes, Result,
+ State0, Effects0) ->
+ case is_over_limit(State0) of
+ true ->
+ {State, Effects} = drop_head(State0, Effects0),
+ evaluate_limit(OldIndexes, true, State, Effects);
+ false ->
+ {State0, Result, Effects0}
+ end.
append_send_msg_effects(Effects, AccMap) when map_size(AccMap) == 0 ->
Effects;
@@ -1142,57 +1218,50 @@ append_send_msg_effects(Effects0, AccMap) ->
end, Effects0, AccMap),
[{aux, active} | Effects].
-next_checkout_message(#state{prefix_msg_counts = {PReturns, P}} = State)
- when PReturns > 0 ->
+%% next message is determined as follows:
+%% First we check if there are are prefex returns
+%% Then we check if there are current returns
+%% then we check prefix msgs
+%% then we check current messages
+%%
+%% When we return it is always done to the current return queue
+%% for both prefix messages and current messages
+take_next_msg(#state{prefix_msgs = {[Bytes | Rem], P}} = State) ->
%% there are prefix returns, these should be served first
- {'$prefix_msg', State#state{prefix_msg_counts = {PReturns - 1, P}}};
-next_checkout_message(#state{returns = Returns,
- low_msg_num = Low0,
- prefix_msg_counts = {R, P},
- next_msg_num = NextMsgNum} = State) ->
+ {{'$prefix_msg', Bytes},
+ State#state{prefix_msgs = {Rem, P}}};
+take_next_msg(#state{returns = Returns,
+ low_msg_num = Low0,
+ messages = Messages0,
+ prefix_msgs = {R, P}} = State) ->
%% use peek rather than out there as the most likely case is an empty
%% queue
case lqueue:peek(Returns) of
- {value, Next} ->
- {Next, State#state{returns = lqueue:drop(Returns)}};
- empty when P == 0 ->
+ {value, NextMsg} ->
+ {NextMsg,
+ State#state{returns = lqueue:drop(Returns)}};
+ empty when P == [] ->
case Low0 of
undefined ->
- {undefined, State};
+ empty;
_ ->
- case Low0 + 1 of
- NextMsgNum ->
- %% the map will be empty after this item is removed
- {Low0, State#state{low_msg_num = undefined}};
- Low ->
- {Low0, State#state{low_msg_num = Low}}
+ {Msg, Messages} = maps:take(Low0, Messages0),
+ case maps:size(Messages) of
+ 0 ->
+ {{Low0, Msg},
+ State#state{messages = Messages,
+ low_msg_num = undefined}};
+ _ ->
+ {{Low0, Msg},
+ State#state{messages = Messages,
+ low_msg_num = Low0 + 1}}
end
end;
empty ->
+ [Bytes | Rem] = P,
%% There are prefix msgs
- {'$prefix_msg', State#state{prefix_msg_counts = {R, P - 1}}}
- end.
-
-%% next message is determined as follows:
-%% First we check if there are are prefex returns
-%% Then we check if there are current returns
-%% then we check prefix msgs
-%% then we check current messages
-%%
-%% When we return it is always done to the current return queue
-%% for both prefix messages and current messages
-take_next_msg(#state{messages = Messages0} = State0) ->
- case next_checkout_message(State0) of
- {'$prefix_msg', State} ->
- {'$prefix_msg', State, Messages0};
- {NextMsgInId, State} ->
- %% messages are available
- case maps:take(NextMsgInId, Messages0) of
- {IdxMsg, Messages} ->
- {{NextMsgInId, IdxMsg}, State, Messages};
- error ->
- error
- end
+ {{'$prefix_msg', Bytes},
+ State#state{prefix_msgs = {R, Rem}}}
end.
send_msg_effect({CTag, CPid}, Msgs) ->
@@ -1204,7 +1273,7 @@ checkout_one(#state{service_queue = SQ0,
case queue:peek(SQ0) of
{value, ConsumerId} ->
case take_next_msg(InitState) of
- {ConsumerMsg, State0, Messages} ->
+ {ConsumerMsg, State0} ->
SQ1 = queue:drop(SQ0),
%% there are consumers waiting to be serviced
%% process consumer checkout
@@ -1229,31 +1298,31 @@ checkout_one(#state{service_queue = SQ0,
update_or_remove_sub(ConsumerId, Con,
Cons0, SQ1, []),
State1 = State0#state{service_queue = SQ,
- messages = Messages,
consumers = Cons},
{State, Msg} =
case ConsumerMsg of
- '$prefix_msg' ->
- {State1, '$prefix_msg'};
+ {'$prefix_msg', _} ->
+ {add_bytes_checkout(ConsumerMsg, State1),
+ ConsumerMsg};
{_, {_, {_, RawMsg} = M}} ->
- {add_bytes_checkout(RawMsg, State1), M}
+ {add_bytes_checkout(RawMsg, State1),
+ M}
end,
{success, ConsumerId, Next, Msg, State};
error ->
%% consumer did not exist but was queued, recurse
checkout_one(InitState#state{service_queue = SQ1})
end;
- error ->
- InitState
+ empty ->
+ {nochange, InitState}
end;
empty ->
case maps:size(Messages0) of
- 0 -> InitState;
+ 0 -> {nochange, InitState};
_ -> {inactive, InitState}
end
end.
-
update_or_remove_sub(ConsumerId, #consumer{lifetime = auto,
credit = 0} = Con,
Cons, ServiceQueue, Effects) ->
@@ -1296,14 +1365,14 @@ update_consumer(ConsumerId, Meta, Spec,
%% general case, single active consumer off
update_consumer0(ConsumerId, Meta, Spec, State0);
update_consumer(ConsumerId, Meta, Spec,
- #state{consumers = Cons0,
+ #state{consumers = Cons0,
consumer_strategy = single_active} = State0)
when map_size(Cons0) == 0 ->
%% single active consumer on, no one is consuming yet
update_consumer0(ConsumerId, Meta, Spec, State0);
update_consumer(ConsumerId, Meta, {Life, Credit, Mode},
- #state{consumer_strategy = single_active,
- waiting_consumers = WaitingConsumers0} = State0) ->
+ #state{consumer_strategy = single_active,
+ waiting_consumers = WaitingConsumers0} = State0) ->
%% single active consumer on and one active consumer already
%% adding the new consumer to the waiting list
Consumer = #consumer{lifetime = Life, meta = Meta,
@@ -1323,11 +1392,10 @@ update_consumer0(ConsumerId, Meta, {Life, Credit, Mode},
%% the credit update
N = maps:size(S#consumer.checked_out),
C = max(0, Credit - N),
- S#consumer{lifetime = Life,
- credit = C}
+ S#consumer{lifetime = Life, credit = C}
end, Init, Cons0),
ServiceQueue = maybe_queue_consumer(ConsumerId, maps:get(ConsumerId, Cons),
- ServiceQueue0),
+ ServiceQueue0),
State0#state{consumers = Cons, service_queue = ServiceQueue}.
@@ -1347,13 +1415,20 @@ maybe_queue_consumer(ConsumerId, #consumer{credit = Credit},
dehydrate_state(#state{messages = Messages,
consumers = Consumers,
returns = Returns,
- prefix_msg_counts = {PrefRetCnt, MsgCount}} = State) ->
- %% TODO: optimise to avoid having to iterate the queue to get the number
- %% of current returned messages
- RetLen = lqueue:len(Returns), % O(1)
- CurReturns = length([R || R <- lqueue:to_list(Returns),
- R =/= '$prefix_msg']),
- PrefixMsgCnt = MsgCount + maps:size(Messages) - CurReturns,
+ prefix_msgs = {PrefRet0, PrefMsg0}} = State) ->
+ %% TODO: optimise this function as far as possible
+ PrefRet = lists:foldl(fun ({'$prefix_msg', Bytes}, Acc) ->
+ [Bytes | Acc];
+ ({_, {_, {_, Raw}}}, Acc) ->
+ [message_size(Raw) | Acc]
+ end,
+ lists:reverse(PrefRet0),
+ lqueue:to_list(Returns)),
+ PrefMsgs = lists:foldl(fun ({_, {_RaftIdx, {_H, Raw}}}, Acc) ->
+ [message_size(Raw) | Acc]
+ end,
+ lists:reverse(PrefMsg0),
+ lists:sort(maps:to_list(Messages))),
State#state{messages = #{},
ra_indexes = rabbit_fifo_index:empty(),
low_msg_num = undefined,
@@ -1361,14 +1436,26 @@ dehydrate_state(#state{messages = Messages,
dehydrate_consumer(C)
end, Consumers),
returns = lqueue:new(),
- %% messages include returns
- prefix_msg_counts = {RetLen + PrefRetCnt,
- PrefixMsgCnt}}.
+ prefix_msgs = {lists:reverse(PrefRet),
+ lists:reverse(PrefMsgs)}}.
dehydrate_consumer(#consumer{checked_out = Checked0} = Con) ->
- Checked = maps:map(fun (_, _) -> '$prefix_msg' end, Checked0),
+ Checked = maps:map(fun (_, {'$prefix_msg', _} = M) ->
+ M;
+ (_, {_, {_, {_, Raw}}}) ->
+ {'$prefix_msg', message_size(Raw)}
+ end, Checked0),
Con#consumer{checked_out = Checked}.
+is_over_limit(#state{max_length = undefined,
+ max_bytes = undefined}) ->
+ false;
+is_over_limit(#state{max_length = MaxLength,
+ max_bytes = MaxBytes,
+ msg_bytes_enqueue = BytesEnq} = State) ->
+
+ messages_ready(State) > MaxLength orelse (BytesEnq > MaxBytes).
+
-spec make_enqueue(maybe(pid()), maybe(msg_seqno()), raw_msg()) -> protocol().
make_enqueue(Pid, Seq, Msg) ->
#enqueue{pid = Pid, seq = Seq, msg = Msg}.
@@ -1405,10 +1492,12 @@ make_purge() -> #purge{}.
make_update_config(Config) ->
#update_config{config = Config}.
-add_bytes_enqueue(Msg, #state{msg_bytes_enqueue = Enqueue} = State) ->
- Bytes = message_size(Msg),
+add_bytes_enqueue(Bytes, #state{msg_bytes_enqueue = Enqueue} = State) ->
State#state{msg_bytes_enqueue = Enqueue + Bytes}.
+add_bytes_drop(Bytes, #state{msg_bytes_enqueue = Enqueue} = State) ->
+ State#state{msg_bytes_enqueue = Enqueue - Bytes}.
+
add_bytes_checkout(Msg, #state{msg_bytes_checkout = Checkout,
msg_bytes_enqueue = Enqueue } = State) ->
Bytes = message_size(Msg),
@@ -1428,6 +1517,8 @@ add_bytes_return(Msg, #state{msg_bytes_checkout = Checkout,
message_size(#basic_message{content = Content}) ->
#content{payload_fragments_rev = PFR} = Content,
iolist_size(PFR);
+message_size({'$prefix_msg', B}) ->
+ B;
message_size(B) when is_binary(B) ->
byte_size(B);
message_size(Msg) ->
@@ -1588,7 +1679,7 @@ enq_enq_deq_deq_settle_test() ->
{State3, {dequeue, {0, {_, first}}}, [{monitor, _, _}]} =
apply(meta(3), make_checkout(Cid, {dequeue, unsettled}, #{}),
State2),
- {_State4, {dequeue, empty}, _} =
+ {_State4, {dequeue, empty}} =
apply(meta(4), make_checkout(Cid, {dequeue, unsettled}, #{}),
State3),
ok.
@@ -1633,7 +1724,7 @@ release_cursor_test() ->
checkout_enq_settle_test() ->
Cid = {?FUNCTION_NAME, self()},
- {State1, [{monitor, _, _}]} = check(Cid, 1, test_init(test)),
+ {State1, [{monitor, _, _} | _]} = check(Cid, 1, test_init(test)),
{State2, Effects0} = enq(2, 1, first, State1),
?ASSERT_EFF({send_msg, _,
{delivery, ?FUNCTION_NAME,
@@ -1648,7 +1739,7 @@ checkout_enq_settle_test() ->
out_of_order_enqueue_test() ->
Cid = {?FUNCTION_NAME, self()},
- {State1, [{monitor, _, _}]} = check_n(Cid, 5, 5, test_init(test)),
+ {State1, [{monitor, _, _} | _]} = check_n(Cid, 5, 5, test_init(test)),
{State2, Effects2} = enq(2, 1, first, State1),
?ASSERT_EFF({send_msg, _, {delivery, _, [{_, {_, first}}]}, _}, Effects2),
% assert monitor was set up
@@ -1678,7 +1769,7 @@ out_of_order_first_enqueue_test() ->
duplicate_enqueue_test() ->
Cid = {<<"duplicate_enqueue_test">>, self()},
- {State1, [{monitor, _, _}]} = check_n(Cid, 5, 5, test_init(test)),
+ {State1, [{monitor, _, _} | _]} = check_n(Cid, 5, 5, test_init(test)),
{State2, Effects2} = enq(2, 1, first, State1),
?ASSERT_EFF({send_msg, _, {delivery, _, [{_, {_, first}}]}, _}, Effects2),
{_State3, Effects3} = enq(3, 1, first, State2),
@@ -1697,10 +1788,10 @@ return_checked_out_test() ->
{State0, [_, _]} = enq(1, 1, first, test_init(test)),
{State1, [_Monitor,
{send_msg, _, {delivery, _, [{MsgId, _}]}, ra_event},
- {aux, active}
+ {aux, active} | _
]} = check(Cid, 2, State0),
% return
- {_State2, _, [_, _]} = apply(meta(3), make_return(Cid, [MsgId]), State1),
+ {_State2, _, [_]} = apply(meta(3), make_return(Cid, [MsgId]), State1),
ok.
return_auto_checked_out_test() ->
@@ -1729,7 +1820,8 @@ cancelled_checkout_out_test() ->
{State1, _} = check_auto(Cid, 2, State0),
% cancelled checkout should return all pending messages to queue
{State2, _, _} = apply(meta(3), make_checkout(Cid, cancel, #{}), State1),
- ?assertEqual(2, maps:size(State2#state.messages)),
+ ?assertEqual(1, maps:size(State2#state.messages)),
+ ?assertEqual(1, lqueue:len(State2#state.returns)),
{State3, {dequeue, {0, {_, first}}}, _} =
apply(meta(3), make_checkout(Cid, {dequeue, settled}, #{}), State2),
@@ -1782,14 +1874,14 @@ down_with_noconnection_returns_unack_test() ->
?assertEqual(0, maps:size(State1#state.messages)),
?assertEqual(0, lqueue:len(State1#state.returns)),
{State2a, _, _} = apply(meta(3), {down, Pid, noconnection}, State1),
- ?assertEqual(1, maps:size(State2a#state.messages)),
+ ?assertEqual(0, maps:size(State2a#state.messages)),
?assertEqual(1, lqueue:len(State2a#state.returns)),
ok.
down_with_noproc_enqueuer_is_cleaned_up_test() ->
State00 = test_init(test),
Pid = spawn(fun() -> ok end),
- {State0, _, Effects0} = apply(meta(1), {enqueue, Pid, 1, first}, State00),
+ {State0, _, Effects0} = apply(meta(1), make_enqueue(Pid, 1, first), State00),
?ASSERT_EFF({monitor, process, _}, Effects0),
{State1, _, _} = apply(meta(3), {down, Pid, noproc}, State0),
% ensure there are no enqueuers
@@ -2150,7 +2242,7 @@ single_active_consumer_test() ->
% adding some consumers
AddConsumer = fun(CTag, State) ->
{NewState, _, _} = apply(
- #{},
+ meta(1),
#checkout{spec = {once, 1, simple_prefetch},
meta = #{},
consumer_id = {CTag, self()}},
@@ -2168,7 +2260,8 @@ single_active_consumer_test() ->
?assertNotEqual(false, lists:keyfind({<<"ctag4">>, self()}, 1, State1#state.waiting_consumers)),
% cancelling a waiting consumer
- {State2, _, Effects1} = apply(#{}, #checkout{spec = cancel, consumer_id = {<<"ctag3">>, self()}}, State1),
+ {State2, _, Effects1} = apply(meta(2),
+ #checkout{spec = cancel, consumer_id = {<<"ctag3">>, self()}}, State1),
% the active consumer should still be in place
?assertEqual(1, map_size(State2#state.consumers)),
?assert(maps:is_key({<<"ctag1">>, self()}, State2#state.consumers)),
@@ -2180,7 +2273,7 @@ single_active_consumer_test() ->
?assertEqual(1, length(Effects1)),
% cancelling the active consumer
- {State3, _, Effects2} = apply(#{}, #checkout{spec = cancel, consumer_id = {<<"ctag1">>, self()}}, State2),
+ {State3, _, Effects2} = apply(meta(3), #checkout{spec = cancel, consumer_id = {<<"ctag1">>, self()}}, State2),
% the second registered consumer is now the active one
?assertEqual(1, map_size(State3#state.consumers)),
?assert(maps:is_key({<<"ctag2">>, self()}, State3#state.consumers)),
@@ -2191,7 +2284,7 @@ single_active_consumer_test() ->
?assertEqual(2, length(Effects2)),
% cancelling the active consumer
- {State4, _, Effects3} = apply(#{}, #checkout{spec = cancel, consumer_id = {<<"ctag2">>, self()}}, State3),
+ {State4, _, Effects3} = apply(meta(4), #checkout{spec = cancel, consumer_id = {<<"ctag2">>, self()}}, State3),
% the last waiting consumer became the active one
?assertEqual(1, map_size(State4#state.consumers)),
?assert(maps:is_key({<<"ctag4">>, self()}, State4#state.consumers)),
@@ -2201,7 +2294,7 @@ single_active_consumer_test() ->
?assertEqual(2, length(Effects3)),
% cancelling the last consumer
- {State5, _, Effects4} = apply(#{}, #checkout{spec = cancel, consumer_id = {<<"ctag4">>, self()}}, State4),
+ {State5, _, Effects4} = apply(meta(5), #checkout{spec = cancel, consumer_id = {<<"ctag4">>, self()}}, State4),
% no active consumer anymore
?assertEqual(0, map_size(State5#state.consumers)),
% still nothing in the waiting list
@@ -2226,7 +2319,7 @@ single_active_consumer_cancel_consumer_when_channel_is_down_test() ->
% adding some consumers
AddConsumer = fun({CTag, ChannelId}, State) ->
{NewState, _, _} = apply(
- #{},
+ #{index => 1},
#checkout{spec = {once, 1, simple_prefetch},
meta = #{},
consumer_id = {CTag, ChannelId}},
@@ -2237,16 +2330,17 @@ single_active_consumer_cancel_consumer_when_channel_is_down_test() ->
[{<<"ctag1">>, Pid1}, {<<"ctag2">>, Pid2}, {<<"ctag3">>, Pid2}, {<<"ctag4">>, Pid3}]),
% the channel of the active consumer goes down
- {State2, _, Effects} = apply(#{}, {down, Pid1, doesnotmatter}, State1),
+ {State2, _, Effects} = apply(#{index => 2}, {down, Pid1, doesnotmatter}, State1),
% fell back to another consumer
?assertEqual(1, map_size(State2#state.consumers)),
% there are still waiting consumers
?assertEqual(2, length(State2#state.waiting_consumers)),
- % effects to unregister the consumer and to update the new active one (metrics) are there
+ % effects to unregister the consumer and
+ % to update the new active one (metrics) are there
?assertEqual(2, length(Effects)),
% the channel of the active consumer and a waiting consumer goes down
- {State3, _, Effects2} = apply(#{}, {down, Pid2, doesnotmatter}, State2),
+ {State3, _, Effects2} = apply(#{index => 3}, {down, Pid2, doesnotmatter}, State2),
% fell back to another consumer
?assertEqual(1, map_size(State3#state.consumers)),
% no more waiting consumer
@@ -2255,7 +2349,7 @@ single_active_consumer_cancel_consumer_when_channel_is_down_test() ->
?assertEqual(3, length(Effects2)),
% the last channel goes down
- {State4, _, Effects3} = apply(#{}, {down, Pid3, doesnotmatter}, State3),
+ {State4, _, Effects3} = apply(#{index => 4}, {down, Pid3, doesnotmatter}, State3),
% no more consumers
?assertEqual(0, map_size(State4#state.consumers)),
?assertEqual(0, length(State4#state.waiting_consumers)),
@@ -2271,10 +2365,11 @@ single_active_consumer_mark_waiting_consumers_as_suspected_when_down_noconnnecti
shadow_copy_interval => 0,
single_active_consumer_on => true}),
+ Meta = #{index => 1},
% adding some consumers
AddConsumer = fun(CTag, State) ->
{NewState, _, _} = apply(
- #{},
+ Meta,
#checkout{spec = {once, 1, simple_prefetch},
meta = #{},
consumer_id = {CTag, self()}},
@@ -2293,7 +2388,7 @@ single_active_consumer_mark_waiting_consumers_as_suspected_when_down_noconnnecti
end, State2#state.waiting_consumers),
% simulate node goes back up
- {State3, _, _} = apply(#{}, {nodeup, node(self())}, State2),
+ {State3, _, _} = apply(#{index => 2}, {nodeup, node(self())}, State2),
% all the waiting consumers should be un-suspected
?assertEqual(3, length(State3#state.waiting_consumers)),
@@ -2315,10 +2410,11 @@ single_active_consumer_state_enter_leader_include_waiting_consumers_test() ->
Pid2 = spawn(DummyFunction),
Pid3 = spawn(DummyFunction),
+ Meta = #{index => 1},
% adding some consumers
AddConsumer = fun({CTag, ChannelId}, State) ->
{NewState, _, _} = apply(
- #{},
+ Meta,
#checkout{spec = {once, 1, simple_prefetch},
meta = #{},
consumer_id = {CTag, ChannelId}},
@@ -2344,10 +2440,11 @@ single_active_consumer_state_enter_eol_include_waiting_consumers_test() ->
Pid2 = spawn(DummyFunction),
Pid3 = spawn(DummyFunction),
+ Meta = #{index => 1},
% adding some consumers
AddConsumer = fun({CTag, ChannelId}, State) ->
{NewState, _, _} = apply(
- #{},
+ Meta,
#checkout{spec = {once, 1, simple_prefetch},
meta = #{},
consumer_id = {CTag, ChannelId}},
@@ -2371,7 +2468,7 @@ query_consumers_test() ->
% adding some consumers
AddConsumer = fun(CTag, State) ->
{NewState, _, _} = apply(
- #{},
+ #{index => 1},
#checkout{spec = {once, 1, simple_prefetch},
meta = #{},
consumer_id = {CTag, self()}},
@@ -2405,11 +2502,11 @@ query_consumers_when_single_active_consumer_is_on_test() ->
atom_to_binary(?FUNCTION_NAME, utf8)),
shadow_copy_interval => 0,
single_active_consumer_on => true}),
-
+ Meta = #{index => 1},
% adding some consumers
AddConsumer = fun(CTag, State) ->
{NewState, _, _} = apply(
- #{},
+ Meta,
#checkout{spec = {once, 1, simple_prefetch},
meta = #{},
consumer_id = {CTag, self()}},
@@ -2448,7 +2545,7 @@ active_flag_updated_when_consumer_suspected_unsuspected_test() ->
% adding some consumers
AddConsumer = fun({CTag, ChannelId}, State) ->
{NewState, _, _} = apply(
- #{},
+ #{index => 1},
#checkout{spec = {once, 1, simple_prefetch},
meta = #{},
consumer_id = {CTag, ChannelId}},
@@ -2458,11 +2555,11 @@ active_flag_updated_when_consumer_suspected_unsuspected_test() ->
State1 = lists:foldl(AddConsumer, State0,
[{<<"ctag1">>, Pid1}, {<<"ctag2">>, Pid2}, {<<"ctag3">>, Pid2}, {<<"ctag4">>, Pid3}]),
- {State2, _, Effects2} = apply(doesnotmatter, {down, Pid1, noconnection}, State1),
+ {State2, _, Effects2} = apply(#{}, {down, Pid1, noconnection}, State1),
% 1 effect to update the metrics of each consumer (they belong to the same node), 1 more effect to monitor the node
?assertEqual(4 + 1, length(Effects2)),
- {_, _, Effects3} = apply(doesnotmatter, {nodeup, node(self())}, State2),
+ {_, _, Effects3} = apply(#{index => 1}, {nodeup, node(self())}, State2),
% for each consumer: 1 effect to update the metrics, 1 effect to monitor the consumer PID
?assertEqual(4 + 4, length(Effects3)).
@@ -2481,7 +2578,7 @@ active_flag_not_updated_when_consumer_suspected_unsuspected_and_single_active_co
% adding some consumers
AddConsumer = fun({CTag, ChannelId}, State) ->
{NewState, _, _} = apply(
- #{},
+ #{index => 1},
#checkout{spec = {once, 1, simple_prefetch},
meta = #{},
consumer_id = {CTag, ChannelId}},
@@ -2491,11 +2588,11 @@ active_flag_not_updated_when_consumer_suspected_unsuspected_and_single_active_co
State1 = lists:foldl(AddConsumer, State0,
[{<<"ctag1">>, Pid1}, {<<"ctag2">>, Pid2}, {<<"ctag3">>, Pid2}, {<<"ctag4">>, Pid3}]),
- {State2, _, Effects2} = apply(doesnotmatter, {down, Pid1, noconnection}, State1),
+ {State2, _, Effects2} = apply(#{}, {down, Pid1, noconnection}, State1),
% only 1 effect to monitor the node
?assertEqual(1, length(Effects2)),
- {_, _, Effects3} = apply(doesnotmatter, {nodeup, node(self())}, State2),
+ {_, _, Effects3} = apply(#{index => 1}, {nodeup, node(self())}, State2),
% for each consumer: 1 effect to monitor the consumer PID
?assertEqual(4, length(Effects3)).
diff --git a/src/rabbit_fifo_index.erl b/src/rabbit_fifo_index.erl
index f8f414f453..184002611e 100644
--- a/src/rabbit_fifo_index.erl
+++ b/src/rabbit_fifo_index.erl
@@ -4,12 +4,14 @@
empty/0,
fetch/2,
append/3,
+ update_if_present/3,
return/3,
delete/2,
size/1,
smallest/1,
next_key_after/2,
- map/2
+ map/2,
+ to_map/1
]).
-include_lib("ra/include/ra.hrl").
@@ -36,12 +38,22 @@ fetch(Key, #?MODULE{data = Data}) ->
-spec append(integer(), term(), state()) -> state().
append(Key, Value,
#?MODULE{data = Data,
- smallest = Smallest,
- largest = Largest} = State)
+ smallest = Smallest,
+ largest = Largest} = State)
when Key > Largest orelse Largest =:= undefined ->
State#?MODULE{data = maps:put(Key, Value, Data),
- smallest = ra_lib:default(Smallest, Key),
- largest = Key}.
+ smallest = ra_lib:default(Smallest, Key),
+ largest = Key}.
+
+-spec update_if_present(integer(), term(), state()) -> state().
+update_if_present(Key, Value, #?MODULE{data = Data} = State) ->
+ case Data of
+ #{Key := _} ->
+ State#?MODULE{data = maps:put(Key, Value, Data)};
+ _ ->
+ State
+ end.
+
-spec return(integer(), term(), state()) -> state().
return(Key, Value, #?MODULE{data = Data, smallest = Smallest} = State)
@@ -76,6 +88,10 @@ delete(Key, #?MODULE{data = Data} = State) ->
size(#?MODULE{data = Data}) ->
maps:size(Data).
+-spec to_map(state()) -> #{integer() => term()}.
+to_map(#?MODULE{data = Data}) ->
+ Data.
+
-spec smallest(state()) -> undefined | {integer(), term()}.
smallest(#?MODULE{smallest = undefined}) ->
undefined;
diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl
index 6c8c99516d..114af4fdfb 100644
--- a/src/rabbit_quorum_queue.erl
+++ b/src/rabbit_quorum_queue.erl
@@ -152,10 +152,15 @@ ra_machine(Q) ->
ra_machine_config(Q = #amqqueue{name = QName,
pid = {Name, _}}) ->
+ %% take the minimum value of the policy and the queue arg if present
+ MaxLength = args_policy_lookup(<<"max-length">>, fun min/2, Q),
+ MaxBytes = args_policy_lookup(<<"max-length-bytes">>, fun min/2, Q),
#{name => Name,
queue_resource => QName,
dead_letter_handler => dlx_mfa(Q),
become_leader_handler => {?MODULE, become_leader, [QName]},
+ max_length => MaxLength,
+ max_bytes => MaxBytes,
single_active_consumer_on => single_active_consumer_on(Q)}.
single_active_consumer_on(#amqqueue{arguments = QArguments}) ->
@@ -641,8 +646,10 @@ delete_member(#amqqueue{pid = {RaName, _}, name = QName}, Node) ->
%%----------------------------------------------------------------------------
dlx_mfa(Q) ->
- DLX = init_dlx(args_policy_lookup(<<"dead-letter-exchange">>, fun res_arg/2, Q), Q),
- DLXRKey = args_policy_lookup(<<"dead-letter-routing-key">>, fun res_arg/2, Q),
+ DLX = init_dlx(args_policy_lookup(<<"dead-letter-exchange">>,
+ fun res_arg/2, Q), Q),
+ DLXRKey = args_policy_lookup(<<"dead-letter-routing-key">>,
+ fun res_arg/2, Q),
{?MODULE, dead_letter_publish, [DLX, DLXRKey, Q#amqqueue.name]}.
init_dlx(undefined, _Q) ->
@@ -834,9 +841,8 @@ qnode({_, Node}) ->
Node.
check_invalid_arguments(QueueName, Args) ->
- Keys = [<<"x-expires">>, <<"x-message-ttl">>, <<"x-max-length">>,
- <<"x-max-length-bytes">>, <<"x-max-priority">>, <<"x-overflow">>,
- <<"x-queue-mode">>],
+ Keys = [<<"x-expires">>, <<"x-message-ttl">>,
+ <<"x-max-priority">>, <<"x-queue-mode">>, <<"x-overflow">>],
[case rabbit_misc:table_lookup(Args, Key) of
undefined -> ok;
_TypeVal -> rabbit_misc:protocol_error(