summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorkjnilsson <knilsson@pivotal.io>2018-12-20 16:21:39 +0000
committerkjnilsson <knilsson@pivotal.io>2018-12-20 16:21:39 +0000
commitb20ad12d7b2f9769493fbbbb35eb672b0bd75343 (patch)
tree4582665f80f20ae6881126d8f301b1993ca2e77a /src
parent794ef8de24ec261d1f3217f0a1ccfae98c8c6873 (diff)
downloadrabbitmq-server-git-b20ad12d7b2f9769493fbbbb35eb672b0bd75343.tar.gz
rabbit_fifo: apply/4 -> apply/3
Support the latest Ra api changes.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_fifo.erl359
-rw-r--r--src/rabbit_quorum_queue.erl4
2 files changed, 186 insertions, 177 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl
index 1fc467913e..579af38bb3 100644
--- a/src/rabbit_fifo.erl
+++ b/src/rabbit_fifo.erl
@@ -20,13 +20,14 @@
-compile(inline_list_funcs).
-compile(inline).
+-compile({no_auto_import, [apply/3]}).
-include_lib("ra/include/ra.hrl").
-include_lib("rabbit_common/include/rabbit.hrl").
-export([
init/1,
- apply/4,
+ apply/3,
state_enter/2,
tick/2,
overview/1,
@@ -277,55 +278,58 @@ zero(_) ->
% msg_ids are scoped per consumer
% ra_indexes holds all raft indexes for enqueues currently on queue
-spec apply(ra_machine:command_meta_data(), command(),
- ra_machine:effects(), state()) ->
- {state(), ra_machine:effects(), Reply :: term()}.
+ state()) ->
+ {state(), Reply :: term(), ra_machine:effects()}.
apply(#{index := RaftIdx}, #enqueue{pid = From, seq = Seq,
- msg = RawMsg}, Effects0, State00) ->
- case maybe_enqueue(RaftIdx, From, Seq, RawMsg, Effects0, State00) of
+ msg = RawMsg}, State00) ->
+ case maybe_enqueue(RaftIdx, From, Seq, RawMsg, [], State00) of
{ok, State0, Effects1} ->
- {State, Effects, ok} = checkout(add_bytes_enqueue(RawMsg, State0),
- Effects1),
- {append_to_master_index(RaftIdx, State), Effects, ok};
+ %% need to checkout before capturing the shadow copy else
+ %% snapshots may not be complete
+ {State, ok, Effects} = checkout(
+ add_bytes_enqueue(RawMsg, State0),
+ Effects1),
+ append_to_master_index(RaftIdx, Effects, State);
{duplicate, State, Effects} ->
- {State, Effects, ok}
+ {State, ok, lists:reverse(Effects)}
end;
apply(#{index := RaftIdx},
- #settle{msg_ids = MsgIds, consumer_id = ConsumerId}, Effects0,
+ #settle{msg_ids = MsgIds, consumer_id = ConsumerId},
#state{consumers = Cons0} = State) ->
case Cons0 of
#{ConsumerId := Con0} ->
% need to increment metrics before completing as any snapshot
% states taken need to includ them
complete_and_checkout(RaftIdx, MsgIds, ConsumerId,
- Con0, Effects0, State);
+ Con0, [], State);
_ ->
- {State, Effects0, ok}
+ {State, ok}
+
end;
apply(#{index := RaftIdx}, #discard{msg_ids = MsgIds, consumer_id = ConsumerId},
- Effects0, #state{consumers = Cons0} = State0) ->
+ #state{consumers = Cons0} = State0) ->
case Cons0 of
#{ConsumerId := Con0} ->
- {State, Effects, Res} = complete_and_checkout(RaftIdx, MsgIds,
- ConsumerId, Con0,
- Effects0, State0),
Discarded = maps:with(MsgIds, Con0#consumer.checked_out),
- {State, dead_letter_effects(Discarded, State, Effects), Res};
+ Effects = dead_letter_effects(Discarded, State0, []),
+ complete_and_checkout(RaftIdx, MsgIds, ConsumerId, Con0,
+ Effects, State0);
_ ->
- {State0, Effects0, ok}
+ {State0, ok}
end;
-apply(_, #return{msg_ids = MsgIds, consumer_id = ConsumerId}, Effects0,
+apply(_, #return{msg_ids = MsgIds, consumer_id = ConsumerId},
#state{consumers = Cons0} = State) ->
case Cons0 of
#{ConsumerId := Con0 = #consumer{checked_out = Checked0}} ->
Checked = maps:without(MsgIds, Checked0),
Returned = maps:with(MsgIds, Checked0),
MsgNumMsgs = maps:values(Returned),
- return(ConsumerId, MsgNumMsgs, Con0, Checked, Effects0, State);
+ return(ConsumerId, MsgNumMsgs, Con0, Checked, [], State);
_ ->
- {State, Effects0, ok}
+ {State, ok}
end;
apply(_, #credit{credit = NewCredit, delivery_count = RemoteDelCnt,
- drain = Drain, consumer_id = ConsumerId}, Effects0,
+ drain = Drain, consumer_id = ConsumerId},
#state{consumers = Cons0,
service_queue = ServiceQueue0} = State0) ->
case Cons0 of
@@ -337,16 +341,16 @@ apply(_, #credit{credit = NewCredit, delivery_count = RemoteDelCnt,
ServiceQueue = maybe_queue_consumer(ConsumerId, Con1,
ServiceQueue0),
Cons = maps:put(ConsumerId, Con1, Cons0),
- {State1, Effects, ok} =
+ {State1, ok, Effects} =
checkout(State0#state{service_queue = ServiceQueue,
- consumers = Cons}, Effects0),
+ consumers = Cons}, []),
Response = {send_credit_reply, maps:size(State1#state.messages)},
%% by this point all checkouts for the updated credit value
%% should be processed so we can evaluate the drain
case Drain of
false ->
%% just return the result of the checkout
- {State1, Effects, Response};
+ {State1, Response, Effects};
true ->
Con = #consumer{credit = PostCred} =
maps:get(ConsumerId, State1#state.consumers),
@@ -359,71 +363,68 @@ apply(_, #credit{credit = NewCredit, delivery_count = RemoteDelCnt,
Drained = Con#consumer.credit,
{CTag, _} = ConsumerId,
{State1#state{consumers = Consumers},
- Effects,
%% returning a multi response with two client actions
%% for the channel to execute
- {multi, [Response, {send_drained, [{CTag, Drained}]}]}}
+ {multi, [Response, {send_drained, [{CTag, Drained}]}]},
+ Effects}
end;
_ ->
%% credit for unknown consumer - just ignore
- {State0, Effects0, ok}
+ {State0, ok}
end;
-apply(_, #checkout{spec = {dequeue, _}}, Effects0,
+apply(_, #checkout{spec = {dequeue, _}},
#state{messages = M,
prefix_msg_counts = {0, 0}} = State0) when map_size(M) == 0 ->
%% FIXME: also check if there are returned messages
%% TODO do we need metric visibility of empty get requests?
- {State0, Effects0, {dequeue, empty}};
+ {State0, {dequeue, empty}};
apply(Meta, #checkout{spec = {dequeue, settled}, meta = ConsumerMeta,
consumer_id = ConsumerId},
- Effects0, State0) ->
+ State0) ->
% TODO: this clause could probably be optimised
State1 = update_consumer(ConsumerId, ConsumerMeta,
{once, 1, simple_prefetch}, State0),
% turn send msg effect into reply
{success, _, MsgId, Msg, State2} = checkout_one(State1),
% immediately settle
- {State, Effects, _} = apply(Meta, make_settle(ConsumerId, [MsgId]),
- Effects0, State2),
- {State, Effects, {dequeue, {MsgId, Msg}}};
+ {State, _, Effects} = apply(Meta, make_settle(ConsumerId, [MsgId]), State2),
+ {State, {dequeue, {MsgId, Msg}}, Effects};
apply(_, #checkout{spec = {dequeue, unsettled},
meta = ConsumerMeta, consumer_id = {_, Pid} = ConsumerId},
- Effects0, State0) ->
+ State0) ->
State1 = update_consumer(ConsumerId, ConsumerMeta,
{once, 1, simple_prefetch}, State0),
- Effects1 = [{monitor, process, Pid} | Effects0],
- {State, Reply, Effects} = case checkout_one(State1) of
- {success, _, MsgId, Msg, S} ->
- {S, {MsgId, Msg}, Effects1};
- {inactive, S} ->
- {S, empty, [{aux, inactive} | Effects1]};
- S ->
- {S, empty, Effects1}
- end,
- {State, Effects, {dequeue, Reply}};
-apply(_, #checkout{spec = cancel, consumer_id = ConsumerId}, Effects0, State0) ->
- {CancelEffects, State1} = cancel_consumer(ConsumerId, {Effects0, State0}),
+ case checkout_one(State1) of
+ {success, _, MsgId, Msg, S} ->
+ {S, {dequeue, {MsgId, Msg}}, [{monitor, process, Pid}]};
+ {inactive, S} ->
+ {S, {dequeue, empty}, [{aux, inactive}]};
+ S ->
+ {S, {dequeue, empty}}
+ end;
+apply(_, #checkout{spec = cancel, consumer_id = ConsumerId}, State0) ->
+ {CancelEffects, State1} = cancel_consumer(ConsumerId, {[], State0}),
% TODO: here we should really demonitor the pid but _only_ if it has no
% other consumers or enqueuers.
checkout(State1, CancelEffects);
-apply(_, #checkout{spec = Spec, meta = Meta, consumer_id = {_, Pid} = ConsumerId},
- Effects0, State0) ->
+apply(_, #checkout{spec = Spec, meta = Meta,
+ consumer_id = {_, Pid} = ConsumerId},
+ State0) ->
State1 = update_consumer(ConsumerId, Meta, Spec, State0),
- {State, Effects, Res} = checkout(State1, Effects0),
- {State, [{monitor, process, Pid} | Effects], Res};
-apply(#{index := RaftIdx}, #purge{}, Effects0,
+ checkout(State1, [{monitor, process, Pid}]);
+apply(#{index := RaftIdx}, #purge{},
#state{consumers = Cons0, ra_indexes = Indexes } = State0) ->
Total = rabbit_fifo_index:size(Indexes),
- {State1, Effects1, _} =
+ {State1, Effects1} =
maps:fold(
fun(ConsumerId, C = #consumer{checked_out = Checked0},
- {StateAcc0, EffectsAcc0, ok}) ->
+ {StateAcc0, EffectsAcc0}) ->
MsgRaftIdxs = [RIdx || {_MsgInId, {RIdx, _}}
<- maps:values(Checked0)],
complete(ConsumerId, MsgRaftIdxs, maps:size(Checked0), C,
#{}, EffectsAcc0, StateAcc0)
- end, {State0, Effects0, ok}, Cons0),
- {State, Effects, _} =
+ end, {State0, []}, Cons0),
+ {State, _, Effects} =
update_smallest_raft_index(
RaftIdx, Indexes,
State1#state{ra_indexes = rabbit_fifo_index:empty(),
@@ -432,9 +433,12 @@ apply(#{index := RaftIdx}, #purge{}, Effects0,
msg_bytes_enqueue = 0,
msg_bytes_checkout = 0,
low_msg_num = undefined}, Effects1),
- {State, [garbage_collection | Effects], {purge, Total}};
+ %% as we're not checking out after a purge (no point) we have to
+ %% reverse the effects ourselves
+ {State, {purge, Total},
+ lists:reverse([garbage_collection | Effects])};
apply(_, {down, ConsumerPid, noconnection},
- Effects0, #state{consumers = Cons0,
+ #state{consumers = Cons0,
enqueuers = Enqs0} = State0) ->
Node = node(ConsumerPid),
% mark all consumers and enqueuers as suspected down
@@ -461,14 +465,14 @@ apply(_, {down, ConsumerPid, noconnection},
end, Enqs0),
Effects = case maps:size(Cons) of
0 ->
- [{aux, inactive}, {monitor, node, Node} | Effects0];
+ [{aux, inactive}, {monitor, node, Node}];
_ ->
- [{monitor, node, Node} | Effects0]
+ [{monitor, node, Node}]
end,
- {State#state{consumers = Cons, enqueuers = Enqs}, Effects, ok};
-apply(_, {down, Pid, _Info}, Effects0,
- #state{consumers = Cons0,
- enqueuers = Enqs0} = State0) ->
+ %% TODO: should we run a checkout here?
+ {State#state{consumers = Cons, enqueuers = Enqs}, ok, Effects};
+apply(_, {down, Pid, _Info}, #state{consumers = Cons0,
+ enqueuers = Enqs0} = State0) ->
% Remove any enqueuer for the same pid and enqueue any pending messages
% This should be ok as we won't see any more enqueues from this pid
State1 = case maps:take(Pid, Enqs0) of
@@ -483,13 +487,12 @@ apply(_, {down, Pid, _Info}, Effects0,
% Find the consumers for the down pid
DownConsumers = maps:keys(
maps:filter(fun({_, P}, _) -> P =:= Pid end, Cons0)),
- {Effects1, State2} = lists:foldl(fun cancel_consumer/2, {Effects0, State1},
+ {Effects1, State2} = lists:foldl(fun cancel_consumer/2, {[], State1},
DownConsumers),
checkout(State2, Effects1);
-apply(_, {nodeup, Node}, Effects0,
- #state{consumers = Cons0,
- enqueuers = Enqs0,
- service_queue = SQ0} = State0) ->
+apply(_, {nodeup, Node}, #state{consumers = Cons0,
+ enqueuers = Enqs0,
+ service_queue = SQ0} = State0) ->
%% A node we are monitoring has come back.
%% If we have suspected any processes of being
%% down we should now re-issue the monitors for them to detect if they're
@@ -517,14 +520,14 @@ apply(_, {nodeup, Node}, Effects0,
CAcc, SQAcc, EAcc);
(_, _, Acc) ->
Acc
- end, {Cons0, SQ0, Effects0}, Cons0),
+ end, {Cons0, SQ0, Monitors}, Cons0),
% TODO: avoid list concat
checkout(State0#state{consumers = Cons1, enqueuers = Enqs1,
- service_queue = SQ}, Monitors ++ Effects);
-apply(_, {nodedown, _Node}, Effects, State) ->
- {State, Effects, ok};
-apply(_, #update_config{config = Conf}, Effects, State) ->
- {update_config(Conf, State), Effects, ok}.
+ service_queue = SQ}, Effects);
+apply(_, {nodedown, _Node}, State) ->
+ {State, ok};
+apply(_, #update_config{config = Conf}, State) ->
+ {update_config(Conf, State), ok}.
-spec state_enter(ra_server:ra_state(), state()) -> ra_machine:effects().
state_enter(leader, #state{consumers = Cons,
@@ -711,14 +714,6 @@ cancel_consumer(ConsumerId,
{Effects0, S0}
end.
-incr_enqueue_count(#state{enqueue_count = C,
- shadow_copy_interval = C} = State0) ->
- % time to stash a dehydrated state version
- State = State0#state{enqueue_count = 0},
- {State, dehydrate_state(State)};
-incr_enqueue_count(#state{enqueue_count = C} = State) ->
- {State#state{enqueue_count = C + 1}, undefined}.
-
enqueue(RaftIdx, RawMsg, #state{messages = Messages,
low_msg_num = LowMsgNum,
next_msg_num = NextMsgNum} = State0) ->
@@ -729,11 +724,20 @@ enqueue(RaftIdx, RawMsg, #state{messages = Messages,
low_msg_num = min(LowMsgNum, NextMsgNum),
next_msg_num = NextMsgNum + 1}.
-append_to_master_index(RaftIdx,
+append_to_master_index(RaftIdx, Effects,
#state{ra_indexes = Indexes0} = State0) ->
{State, Shadow} = incr_enqueue_count(State0),
Indexes = rabbit_fifo_index:append(RaftIdx, Shadow, Indexes0),
- State#state{ra_indexes = Indexes}.
+ {State#state{ra_indexes = Indexes}, ok, Effects}.
+
+incr_enqueue_count(#state{enqueue_count = C,
+ shadow_copy_interval = C} = State0) ->
+ % time to stash a dehydrated state version
+ State = State0#state{enqueue_count = 0},
+ {State, dehydrate_state(State)};
+incr_enqueue_count(#state{enqueue_count = C} = State) ->
+ {State#state{enqueue_count = C + 1}, undefined}.
+
enqueue_pending(From,
#enqueuer{next_seqno = Next,
@@ -808,7 +812,7 @@ complete(ConsumerId, MsgRaftIdxs, NumDiscarded,
Indexes = lists:foldl(fun rabbit_fifo_index:delete/2, Indexes0, MsgRaftIdxs),
{State0#state{consumers = Cons,
ra_indexes = Indexes,
- service_queue = SQ}, Effects, ok}.
+ service_queue = SQ}, Effects}.
increase_credit(#consumer{lifetime = once,
credit = Credit}, _) ->
@@ -835,10 +839,10 @@ complete_and_checkout(IncomingRaftIdx, MsgIds, ConsumerId,
end, State0, maps:values(Discarded)),
%% need to pass the length of discarded as $prefix_msgs would be filtered
%% by the above list comprehension
- {State2, Effects1, _} = complete(ConsumerId, MsgRaftIdxs,
- maps:size(Discarded),
- Con0, Checked, Effects0, State1),
- {State, Effects, _} = checkout(State2, Effects1),
+ {State2, Effects1} = complete(ConsumerId, MsgRaftIdxs,
+ maps:size(Discarded),
+ Con0, Checked, Effects0, State1),
+ {State, ok, Effects} = checkout(State2, Effects1),
% settle metrics are incremented separately
update_smallest_raft_index(IncomingRaftIdx, Indexes0, State, Effects).
@@ -867,7 +871,7 @@ update_smallest_raft_index(IncomingRaftIdx, OldIndexes,
% there are no messages on queue anymore and no pending enqueues
% we can forward release_cursor all the way until
% the last received command
- {State, [{release_cursor, IncomingRaftIdx, State} | Effects], ok};
+ {State, ok, [{release_cursor, IncomingRaftIdx, State} | Effects]};
_ ->
NewSmallest = rabbit_fifo_index:smallest(Indexes),
% Take the smallest raft index available in the index when starting
@@ -876,15 +880,15 @@ update_smallest_raft_index(IncomingRaftIdx, OldIndexes,
{{Smallest, _}, {Smallest, _}} ->
% smallest has not changed, do not issue release cursor
% effects
- {State, Effects, ok};
+ {State, ok, Effects};
{_, {Smallest, Shadow}} when Shadow =/= undefined ->
% ?INFO("RELEASE ~w ~w ~w~n", [IncomingRaftIdx, Smallest,
% Shadow]),
- {State, [{release_cursor, Smallest, Shadow} | Effects], ok};
+ {State, ok, [{release_cursor, Smallest, Shadow}]};
_ -> % smallest
% no shadow taken for this index,
% no release cursor increase
- {State, Effects, ok}
+ {State, ok, Effects}
end
end.
@@ -914,6 +918,8 @@ return_all(State, Checked0) ->
return_one(MsgNum, Msg, S)
end, State, Checked).
+%% checkout new messages to consumers
+%% reverses the effects list
checkout(State, Effects) ->
checkout0(checkout_one(State), Effects, #{}).
@@ -925,10 +931,10 @@ checkout0({success, ConsumerId, MsgId, Msg, State}, Effects, Acc0) ->
checkout0(checkout_one(State), Effects, Acc);
checkout0({inactive, State}, Effects0, Acc) ->
Effects = append_send_msg_effects(Effects0, Acc),
- {State, [{aux, inactive} | Effects], ok};
+ {State, ok, lists:reverse([{aux, inactive} | Effects])};
checkout0(State, Effects0, Acc) ->
Effects = append_send_msg_effects(Effects0, Acc),
- {State, Effects, ok}.
+ {State, ok, lists:reverse(Effects)}.
append_send_msg_effects(Effects, AccMap) when map_size(AccMap) == 0 ->
Effects;
@@ -1241,10 +1247,10 @@ enq_enq_checkout_test() ->
Cid = {<<"enq_enq_checkout_test">>, self()},
{State1, _} = enq(1, 1, first, test_init(test)),
{State2, _} = enq(2, 2, second, State1),
- {_State3, Effects, _} =
+ {_State3, _, Effects} =
apply(meta(3),
make_checkout(Cid, {once, 2, simple_prefetch}, #{}),
- [], State2),
+ State2),
?ASSERT_EFF({monitor, _, _}, Effects),
?ASSERT_EFF({send_msg, _, {delivery, _, _}, _}, Effects),
ok.
@@ -1253,9 +1259,8 @@ credit_enq_enq_checkout_settled_credit_test() ->
Cid = {?FUNCTION_NAME, self()},
{State1, _} = enq(1, 1, first, test_init(test)),
{State2, _} = enq(2, 2, second, State1),
- {State3, Effects, _} =
- apply(meta(3), make_checkout(Cid, {auto, 1, credited}, #{}),
- [], State2),
+ {State3, _, Effects} =
+ apply(meta(3), make_checkout(Cid, {auto, 1, credited}, #{}), State2),
?ASSERT_EFF({monitor, _, _}, Effects),
Deliveries = lists:filter(fun ({send_msg, _, {delivery, _, _}, _}) -> true;
(_) -> false
@@ -1286,12 +1291,12 @@ credit_with_drained_test() ->
%% checkout with a single credit
{State1, _, _} =
apply(meta(1), make_checkout(Cid, {auto, 1, credited},#{}),
- [], State0),
+ State0),
?assertMatch(#state{consumers = #{Cid := #consumer{credit = 1,
delivery_count = 0}}},
State1),
- {State, _Effs, Result} =
- apply(meta(3), make_credit(Cid, 0, 5, true), [], State1),
+ {State, Result, _} =
+ apply(meta(3), make_credit(Cid, 0, 5, true), State1),
?assertMatch(#state{consumers = #{Cid := #consumer{credit = 0,
delivery_count = 5}}},
State),
@@ -1305,14 +1310,14 @@ credit_and_drain_test() ->
{State1, _} = enq(1, 1, first, test_init(test)),
{State2, _} = enq(2, 2, second, State1),
%% checkout without any initial credit (like AMQP 1.0 would)
- {State3, CheckEffs, _} =
+ {State3, _, CheckEffs} =
apply(meta(3), make_checkout(Cid, {auto, 0, credited}, #{}),
- [], State2),
+ State2),
?ASSERT_NO_EFF({send_msg, _, {delivery, _, _}}, CheckEffs),
- {State4, Effects, {multi, [{send_credit_reply, 0},
- {send_drained, [{?FUNCTION_NAME, 2}]}]}} =
- apply(meta(4), make_credit(Cid, 4, 0, true), [], State3),
+ {State4, {multi, [{send_credit_reply, 0},
+ {send_drained, [{?FUNCTION_NAME, 2}]}]},
+ Effects} = apply(meta(4), make_credit(Cid, 4, 0, true), State3),
?assertMatch(#state{consumers = #{Cid := #consumer{credit = 0,
delivery_count = 4}}},
State4),
@@ -1330,9 +1335,9 @@ enq_enq_deq_test() ->
{State1, _} = enq(1, 1, first, test_init(test)),
{State2, _} = enq(2, 2, second, State1),
% get returns a reply value
- {_State3, [{monitor, _, _}], {dequeue, {0, {_, first}}}} =
+ {_State3, {dequeue, {0, {_, first}}}, [{monitor, _, _}]} =
apply(meta(3), make_checkout(Cid, {dequeue, unsettled}, #{}),
- [], State2),
+ State2),
ok.
enq_enq_deq_deq_settle_test() ->
@@ -1340,39 +1345,38 @@ enq_enq_deq_deq_settle_test() ->
{State1, _} = enq(1, 1, first, test_init(test)),
{State2, _} = enq(2, 2, second, State1),
% get returns a reply value
- {State3, [{monitor, _, _}], {dequeue, {0, {_, first}}}} =
+ {State3, {dequeue, {0, {_, first}}}, [{monitor, _, _}]} =
apply(meta(3), make_checkout(Cid, {dequeue, unsettled}, #{}),
- [], State2),
- {_State4, _Effects4, {dequeue, empty}} =
+ State2),
+ {_State4, {dequeue, empty}, _} =
apply(meta(4), make_checkout(Cid, {dequeue, unsettled}, #{}),
- [], State3),
+ State3),
ok.
enq_enq_checkout_get_settled_test() ->
Cid = {?FUNCTION_NAME, self()},
{State1, _} = enq(1, 1, first, test_init(test)),
% get returns a reply value
- {_State2, _Effects, {dequeue, {0, {_, first}}}} =
+ {_State2, {dequeue, {0, {_, first}}}, _Effs} =
apply(meta(3), make_checkout(Cid, {dequeue, settled}, #{}),
- [], State1),
+ State1),
ok.
checkout_get_empty_test() ->
Cid = {?FUNCTION_NAME, self()},
State = test_init(test),
- {_State2, [], {dequeue, empty}} =
- apply(meta(1), make_checkout(Cid, {dequeue, unsettled}, #{}),
- [], State),
+ {_State2, {dequeue, empty}} =
+ apply(meta(1), make_checkout(Cid, {dequeue, unsettled}, #{}), State),
ok.
untracked_enq_deq_test() ->
Cid = {?FUNCTION_NAME, self()},
State0 = test_init(test),
{State1, _, _} = apply(meta(1),
- make_enqueue(undefined, undefined, first), [], State0),
- {_State2, _, {dequeue, {0, {_, first}}}} =
- apply(meta(3), make_checkout(Cid, {dequeue, settled}, #{}),
- [], State1),
+ make_enqueue(undefined, undefined, first),
+ State0),
+ {_State2, {dequeue, {0, {_, first}}}, _} =
+ apply(meta(3), make_checkout(Cid, {dequeue, settled}, #{}), State1),
ok.
release_cursor_test() ->
@@ -1445,18 +1449,18 @@ return_non_existent_test() ->
Cid = {<<"cid">>, self()},
{State0, [_, _Inactive]} = enq(1, 1, second, test_init(test)),
% return non-existent
- {_State2, [], _} = apply(meta(3), make_return(Cid, [99]), [], State0),
+ {_State2, _} = apply(meta(3), make_return(Cid, [99]), State0),
ok.
return_checked_out_test() ->
Cid = {<<"cid">>, self()},
{State0, [_, _]} = enq(1, 1, first, test_init(test)),
- {State1, [_Monitor, {aux, active},
- {send_msg, _, {delivery, _, [{MsgId, _}]}, _}]} =
- check(Cid, 2, State0),
+ {State1, [_Monitor,
+ {send_msg, _, {delivery, _, [{MsgId, _}]}, ra_event},
+ {aux, active}
+ ]} = check(Cid, 2, State0),
% return
- {_State2, [_, _], _} = apply(meta(3), make_return(Cid, [MsgId]),
- [], State1),
+ {_State2, _, [_, _]} = apply(meta(3), make_return(Cid, [MsgId]), State1),
ok.
return_auto_checked_out_test() ->
@@ -1465,11 +1469,13 @@ return_auto_checked_out_test() ->
{State0, [_]} = enq(2, 2, second, State00),
% it first active then inactive as the consumer took on but cannot take
% any more
- {State1, [_Monitor, {aux, inactive}, {aux, active},
- {send_msg, _, {delivery, _, [{MsgId, _}]}, _} | _]} =
- check_auto(Cid, 2, State0),
+ {State1, [_Monitor,
+ {send_msg, _, {delivery, _, [{MsgId, _}]}, _},
+ {aux, active},
+ {aux, inactive}
+ ]} = check_auto(Cid, 2, State0),
% return should include another delivery
- {_State2, Effects, _} = apply(meta(3), make_return(Cid, [MsgId]), [], State1),
+ {_State2, _, Effects} = apply(meta(3), make_return(Cid, [MsgId]), State1),
?ASSERT_EFF({send_msg, _,
{delivery, _, [{_, {#{delivery_count := 1}, first}}]}, _},
Effects),
@@ -1482,22 +1488,21 @@ cancelled_checkout_out_test() ->
{State0, [_]} = enq(2, 2, second, State00),
{State1, _} = check_auto(Cid, 2, State0),
% cancelled checkout should return all pending messages to queue
- {State2, _, _} = apply(meta(3), make_checkout(Cid, cancel, #{}),
- [], State1),
+ {State2, _, _} = apply(meta(3), make_checkout(Cid, cancel, #{}), State1),
?assertEqual(2, maps:size(State2#state.messages)),
- {State3, _, {dequeue, {0, {_, first}}}} =
- apply(meta(3), make_checkout(Cid, {dequeue, settled}, #{}), [], State2),
+ {State3, {dequeue, {0, {_, first}}}, _} =
+ apply(meta(3), make_checkout(Cid, {dequeue, settled}, #{}), State2),
?debugFmt("State3 ~p", [State3]),
- {_State, _, {dequeue, {_, {_, second}}}} =
- apply(meta(4), make_checkout(Cid, {dequeue, settled}, #{}), [], State3),
+ {_State, {dequeue, {_, {_, second}}}, _} =
+ apply(meta(4), make_checkout(Cid, {dequeue, settled}, #{}), State3),
ok.
down_with_noproc_consumer_returns_unsettled_test() ->
Cid = {<<"down_consumer_returns_unsettled_test">>, self()},
{State0, [_, _]} = enq(1, 1, second, test_init(test)),
{State1, [{monitor, process, Pid} | _]} = check(Cid, 2, State0),
- {State2, _, _} = apply(meta(3), {down, Pid, noproc}, [], State1),
+ {State2, _, _} = apply(meta(3), {down, Pid, noproc}, State1),
{_State, Effects} = check(Cid, 4, State2),
?ASSERT_EFF({monitor, process, _}, Effects),
ok.
@@ -1514,15 +1519,15 @@ down_with_noconnection_marks_suspect_and_node_is_monitored_test() ->
?ASSERT_EFF({monitor, process, P}, P =:= Pid, Effects1),
% monitor both enqueuer and consumer
% because we received a noconnection we now need to monitor the node
- {State2a, _Effects2a, _} = apply(meta(3), {down, Pid, noconnection}, [], State1),
+ {State2a, _, _} = apply(meta(3), {down, Pid, noconnection}, State1),
#consumer{credit = 1} = maps:get(Cid, State2a#state.consumers),
%% validate consumer has credit
- {State2, Effects2, _} = apply(meta(3), {down, Self, noconnection}, [], State2a),
+ {State2, _, Effects2} = apply(meta(3), {down, Self, noconnection}, State2a),
?ASSERT_EFF({monitor, node, _}, Effects2),
?assertNoEffect({demonitor, process, _}, Effects2),
% when the node comes up we need to retry the process monitors for the
% disconnected processes
- {_State3, Effects3, _} = apply(meta(3), {nodeup, Node}, [], State2),
+ {_State3, _, Effects3} = apply(meta(3), {nodeup, Node}, State2),
% try to re-monitor the suspect processes
?ASSERT_EFF({monitor, process, P}, P =:= Pid, Effects3),
?ASSERT_EFF({monitor, process, P}, P =:= Self, Effects3),
@@ -1537,7 +1542,7 @@ down_with_noconnection_returns_unack_test() ->
{State1, {_, _}} = deq(2, Cid, unsettled, State0),
?assertEqual(0, maps:size(State1#state.messages)),
?assertEqual(0, lqueue:len(State1#state.returns)),
- {State2a, _, _} = apply(meta(3), {down, Pid, noconnection}, [], State1),
+ {State2a, _, _} = apply(meta(3), {down, Pid, noconnection}, State1),
?assertEqual(1, maps:size(State2a#state.messages)),
?assertEqual(1, lqueue:len(State2a#state.returns)),
ok.
@@ -1545,9 +1550,9 @@ down_with_noconnection_returns_unack_test() ->
down_with_noproc_enqueuer_is_cleaned_up_test() ->
State00 = test_init(test),
Pid = spawn(fun() -> ok end),
- {State0, Effects0, _} = apply(meta(1), {enqueue, Pid, 1, first}, [], State00),
+ {State0, _, Effects0} = apply(meta(1), {enqueue, Pid, 1, first}, State00),
?ASSERT_EFF({monitor, process, _}, Effects0),
- {State1, _Effects1, _} = apply(meta(3), {down, Pid, noproc}, [], State0),
+ {State1, _, _} = apply(meta(3), {down, Pid, noproc}, State0),
% ensure there are no enqueuers
?assert(0 =:= maps:size(State1#state.enqueuers)),
ok.
@@ -1569,7 +1574,7 @@ discarded_message_without_dead_letter_handler_is_removed_test() ->
?ASSERT_EFF({send_msg, _,
{delivery, _, [{0, {#{}, first}}]}, _},
Effects1),
- {_State2, Effects2, _} = apply(meta(1), make_discard(Cid, [0]), [], State1),
+ {_State2, _, Effects2} = apply(meta(1), make_discard(Cid, [0]), State1),
?assertNoEffect({send_msg, _,
{delivery, _, [{0, {#{}, first}}]}, _},
Effects2),
@@ -1586,8 +1591,7 @@ discarded_message_with_dead_letter_handler_emits_mod_call_effect_test() ->
?ASSERT_EFF({send_msg, _,
{delivery, _, [{0, {#{}, first}}]}, _},
Effects1),
- {_State2, Effects2, _} = apply(meta(1), make_discard(Cid, [0]),
- [], State1),
+ {_State2, _, Effects2} = apply(meta(1), make_discard(Cid, [0]), State1),
% assert mod call effect with appended reason and message
?ASSERT_EFF({mod_call, somemod, somefun, [somearg, [{rejected, first}]]},
Effects2),
@@ -1600,7 +1604,7 @@ tick_test() ->
{S1, _} = enq(2, 2, <<"snd">>, S0),
{S2, {MsgId, _}} = deq(3, Cid, unsettled, S1),
{S3, {_, _}} = deq(4, Cid2, unsettled, S2),
- {S4, _, _} = apply(meta(5), make_return(Cid, [MsgId]), [], S3),
+ {S4, _, _} = apply(meta(5), make_return(Cid, [MsgId]), S3),
[{mod_call, _, _,
[#resource{},
@@ -1776,8 +1780,8 @@ run_snapshot_test0(Name, Commands) ->
?debugFmt("running from snapshot: ~b", [SnapIdx]),
{S, _} = run_log(SnapState, Filtered),
% assert log can be restored from any release cursor index
- % ?debugFmt("Name ~p Idx ~p S~p~nState~p~nSnapState ~p~nFiltered ~p~n",
- % [Name, SnapIdx, S, State, SnapState, Filtered]),
+ ?debugFmt("Name ~p~nS~p~nState~p~nn",
+ [Name, S, State]),
?assertEqual(State, S)
end || {release_cursor, SnapIdx, SnapState} <- Effects],
ok.
@@ -1810,10 +1814,9 @@ pending_enqueue_is_enqueued_on_down_test() ->
Cid = {<<"cid">>, self()},
Pid = self(),
{State0, _} = enq(1, 2, first, test_init(test)),
- {State1, _, _} = apply(meta(2), {down, Pid, noproc}, [], State0),
- {_State2, _, {dequeue, {0, {_, first}}}} =
- apply(meta(3), make_checkout(Cid, {dequeue, settled}, #{}),
- [], State1),
+ {State1, _, _} = apply(meta(2), {down, Pid, noproc}, State0),
+ {_State2, {dequeue, {0, {_, first}}}, _} =
+ apply(meta(3), make_checkout(Cid, {dequeue, settled}, #{}), State1),
ok.
duplicate_delivery_test() ->
@@ -1858,11 +1861,11 @@ state_enter_montors_and_notifications_test() ->
purge_test() ->
Cid = {<<"purge_test">>, self()},
{State1, _} = enq(1, 1, first, test_init(test)),
- {State2, _, {purge, 1}} = apply(meta(2), make_purge(), [], State1),
+ {State2, {purge, 1}, _} = apply(meta(2), make_purge(), State1),
{State3, _} = enq(3, 2, second, State2),
% get returns a reply value
- {_State4, [{monitor, _, _}], {dequeue, {0, {_, second}}}} =
- apply(meta(4), make_checkout(Cid, {dequeue, unsettled}, #{}), [], State3),
+ {_State4, {dequeue, {0, {_, second}}}, [{monitor, _, _}]} =
+ apply(meta(4), make_checkout(Cid, {dequeue, unsettled}, #{}), State3),
ok.
purge_with_checkout_test() ->
@@ -1873,7 +1876,7 @@ purge_with_checkout_test() ->
%% assert message bytes are non zero
?assert(State2#state.msg_bytes_checkout > 0),
?assert(State2#state.msg_bytes_enqueue > 0),
- {State3, _, {purge, 2}} = apply(meta(2), make_purge(), [], State2),
+ {State3, {purge, 2}, _} = apply(meta(2), make_purge(), State2),
?assertEqual(0, State3#state.msg_bytes_checkout),
?assertEqual(0, State3#state.msg_bytes_enqueue),
#consumer{checked_out = Checked} = maps:get(Cid, State3#state.consumers),
@@ -1893,7 +1896,7 @@ down_returns_checked_out_in_order_test() ->
#consumer{checked_out = Checked} = maps:get(Cid, S2#state.consumers),
?assertEqual(100, maps:size(Checked)),
%% simulate down
- {S, _, _} = apply(meta(102), {down, self(), noproc}, [], S2),
+ {S, _, _} = apply(meta(102), {down, self(), noproc}, S2),
Returns = lqueue:to_list(S#state.returns),
?assertEqual(100, length(Returns)),
%% validate returns are in order
@@ -1905,54 +1908,58 @@ meta(Idx) ->
enq(Idx, MsgSeq, Msg, State) ->
strip_reply(
- apply(meta(Idx), make_enqueue(self(), MsgSeq, Msg), [], State)).
+ apply(meta(Idx), make_enqueue(self(), MsgSeq, Msg), State)).
deq(Idx, Cid, Settlement, State0) ->
- {State, _, {dequeue, Msg}} =
+ {State, {dequeue, Msg}, _} =
apply(meta(Idx),
- make_checkout(Cid, {dequeue, Settlement}, #{}),
- [], State0),
+ make_checkout(Cid, {dequeue, Settlement}, #{}),
+ State0),
{State, Msg}.
check_n(Cid, Idx, N, State) ->
strip_reply(
apply(meta(Idx),
make_checkout(Cid, {auto, N, simple_prefetch}, #{}),
- [], State)).
+ State)).
check(Cid, Idx, State) ->
strip_reply(
apply(meta(Idx),
make_checkout(Cid, {once, 1, simple_prefetch}, #{}),
- [], State)).
+ State)).
check_auto(Cid, Idx, State) ->
strip_reply(
apply(meta(Idx),
make_checkout(Cid, {auto, 1, simple_prefetch}, #{}),
- [], State)).
+ State)).
check(Cid, Idx, Num, State) ->
strip_reply(
apply(meta(Idx),
make_checkout(Cid, {auto, Num, simple_prefetch}, #{}),
- [], State)).
+ State)).
settle(Cid, Idx, MsgId, State) ->
- strip_reply(apply(meta(Idx), make_settle(Cid, [MsgId]), [], State)).
+ strip_reply(apply(meta(Idx), make_settle(Cid, [MsgId]), State)).
credit(Cid, Idx, Credit, DelCnt, Drain, State) ->
strip_reply(apply(meta(Idx), make_credit(Cid, Credit, DelCnt, Drain),
- [], State)).
+ State)).
-strip_reply({State, Effects, _Reply}) ->
+strip_reply({State, _, Effects}) ->
{State, Effects}.
run_log(InitState, Entries) ->
lists:foldl(fun ({Idx, E}, {Acc0, Efx0}) ->
- case apply(meta(Idx), E, Efx0, Acc0) of
- {Acc, Efx, _} ->
- {Acc, Efx}
+ case apply(meta(Idx), E, Acc0) of
+ {Acc, _, Efx} when is_list(Efx) ->
+ {Acc, Efx0 ++ Efx};
+ {Acc, _, Efx} ->
+ {Acc, Efx0 ++ [Efx]};
+ {Acc, _} ->
+ {Acc, Efx0}
end
end, {InitState, []}, Entries).
diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl
index 53dd2e2a7d..c43ef4dfd4 100644
--- a/src/rabbit_quorum_queue.erl
+++ b/src/rabbit_quorum_queue.erl
@@ -157,13 +157,15 @@ cancel_consumer_handler(QName, {ConsumerTag, ChPid}) ->
case Node == node() of
true -> cancel_consumer(QName, ChPid, ConsumerTag);
false ->
+ %% this could potentially block for a while if the node is
+ %% in disconnected state or tcp buffers are full
rpc:cast(Node, rabbit_quorum_queue,
cancel_consumer,
[QName, ChPid, ConsumerTag])
end.
cancel_consumer(QName, ChPid, ConsumerTag) ->
- rabbit_core_metrics:consumer_deleted(ChPid, ConsumerTag, QName),
+ catch rabbit_core_metrics:consumer_deleted(ChPid, ConsumerTag, QName),
rabbit_event:notify(consumer_deleted,
[{consumer_tag, ConsumerTag},
{channel, ChPid},