diff options
| author | kjnilsson <knilsson@pivotal.io> | 2018-12-20 16:21:39 +0000 |
|---|---|---|
| committer | kjnilsson <knilsson@pivotal.io> | 2018-12-20 16:21:39 +0000 |
| commit | b20ad12d7b2f9769493fbbbb35eb672b0bd75343 (patch) | |
| tree | 4582665f80f20ae6881126d8f301b1993ca2e77a /src | |
| parent | 794ef8de24ec261d1f3217f0a1ccfae98c8c6873 (diff) | |
| download | rabbitmq-server-git-b20ad12d7b2f9769493fbbbb35eb672b0bd75343.tar.gz | |
rabbit_fifo: apply/4 -> apply/3
Support the latest Ra api changes.
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_fifo.erl | 359 | ||||
| -rw-r--r-- | src/rabbit_quorum_queue.erl | 4 |
2 files changed, 186 insertions, 177 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl index 1fc467913e..579af38bb3 100644 --- a/src/rabbit_fifo.erl +++ b/src/rabbit_fifo.erl @@ -20,13 +20,14 @@ -compile(inline_list_funcs). -compile(inline). +-compile({no_auto_import, [apply/3]}). -include_lib("ra/include/ra.hrl"). -include_lib("rabbit_common/include/rabbit.hrl"). -export([ init/1, - apply/4, + apply/3, state_enter/2, tick/2, overview/1, @@ -277,55 +278,58 @@ zero(_) -> % msg_ids are scoped per consumer % ra_indexes holds all raft indexes for enqueues currently on queue -spec apply(ra_machine:command_meta_data(), command(), - ra_machine:effects(), state()) -> - {state(), ra_machine:effects(), Reply :: term()}. + state()) -> + {state(), Reply :: term(), ra_machine:effects()}. apply(#{index := RaftIdx}, #enqueue{pid = From, seq = Seq, - msg = RawMsg}, Effects0, State00) -> - case maybe_enqueue(RaftIdx, From, Seq, RawMsg, Effects0, State00) of + msg = RawMsg}, State00) -> + case maybe_enqueue(RaftIdx, From, Seq, RawMsg, [], State00) of {ok, State0, Effects1} -> - {State, Effects, ok} = checkout(add_bytes_enqueue(RawMsg, State0), - Effects1), - {append_to_master_index(RaftIdx, State), Effects, ok}; + %% need to checkout before capturing the shadow copy else + %% snapshots may not be complete + {State, ok, Effects} = checkout( + add_bytes_enqueue(RawMsg, State0), + Effects1), + append_to_master_index(RaftIdx, Effects, State); {duplicate, State, Effects} -> - {State, Effects, ok} + {State, ok, lists:reverse(Effects)} end; apply(#{index := RaftIdx}, - #settle{msg_ids = MsgIds, consumer_id = ConsumerId}, Effects0, + #settle{msg_ids = MsgIds, consumer_id = ConsumerId}, #state{consumers = Cons0} = State) -> case Cons0 of #{ConsumerId := Con0} -> % need to increment metrics before completing as any snapshot % states taken need to includ them complete_and_checkout(RaftIdx, MsgIds, ConsumerId, - Con0, Effects0, State); + Con0, [], State); _ -> - {State, Effects0, ok} + {State, ok} + end; apply(#{index := RaftIdx}, #discard{msg_ids = MsgIds, consumer_id = ConsumerId}, - Effects0, #state{consumers = Cons0} = State0) -> + #state{consumers = Cons0} = State0) -> case Cons0 of #{ConsumerId := Con0} -> - {State, Effects, Res} = complete_and_checkout(RaftIdx, MsgIds, - ConsumerId, Con0, - Effects0, State0), Discarded = maps:with(MsgIds, Con0#consumer.checked_out), - {State, dead_letter_effects(Discarded, State, Effects), Res}; + Effects = dead_letter_effects(Discarded, State0, []), + complete_and_checkout(RaftIdx, MsgIds, ConsumerId, Con0, + Effects, State0); _ -> - {State0, Effects0, ok} + {State0, ok} end; -apply(_, #return{msg_ids = MsgIds, consumer_id = ConsumerId}, Effects0, +apply(_, #return{msg_ids = MsgIds, consumer_id = ConsumerId}, #state{consumers = Cons0} = State) -> case Cons0 of #{ConsumerId := Con0 = #consumer{checked_out = Checked0}} -> Checked = maps:without(MsgIds, Checked0), Returned = maps:with(MsgIds, Checked0), MsgNumMsgs = maps:values(Returned), - return(ConsumerId, MsgNumMsgs, Con0, Checked, Effects0, State); + return(ConsumerId, MsgNumMsgs, Con0, Checked, [], State); _ -> - {State, Effects0, ok} + {State, ok} end; apply(_, #credit{credit = NewCredit, delivery_count = RemoteDelCnt, - drain = Drain, consumer_id = ConsumerId}, Effects0, + drain = Drain, consumer_id = ConsumerId}, #state{consumers = Cons0, service_queue = ServiceQueue0} = State0) -> case Cons0 of @@ -337,16 +341,16 @@ apply(_, #credit{credit = NewCredit, delivery_count = RemoteDelCnt, ServiceQueue = maybe_queue_consumer(ConsumerId, Con1, ServiceQueue0), Cons = maps:put(ConsumerId, Con1, Cons0), - {State1, Effects, ok} = + {State1, ok, Effects} = checkout(State0#state{service_queue = ServiceQueue, - consumers = Cons}, Effects0), + consumers = Cons}, []), Response = {send_credit_reply, maps:size(State1#state.messages)}, %% by this point all checkouts for the updated credit value %% should be processed so we can evaluate the drain case Drain of false -> %% just return the result of the checkout - {State1, Effects, Response}; + {State1, Response, Effects}; true -> Con = #consumer{credit = PostCred} = maps:get(ConsumerId, State1#state.consumers), @@ -359,71 +363,68 @@ apply(_, #credit{credit = NewCredit, delivery_count = RemoteDelCnt, Drained = Con#consumer.credit, {CTag, _} = ConsumerId, {State1#state{consumers = Consumers}, - Effects, %% returning a multi response with two client actions %% for the channel to execute - {multi, [Response, {send_drained, [{CTag, Drained}]}]}} + {multi, [Response, {send_drained, [{CTag, Drained}]}]}, + Effects} end; _ -> %% credit for unknown consumer - just ignore - {State0, Effects0, ok} + {State0, ok} end; -apply(_, #checkout{spec = {dequeue, _}}, Effects0, +apply(_, #checkout{spec = {dequeue, _}}, #state{messages = M, prefix_msg_counts = {0, 0}} = State0) when map_size(M) == 0 -> %% FIXME: also check if there are returned messages %% TODO do we need metric visibility of empty get requests? - {State0, Effects0, {dequeue, empty}}; + {State0, {dequeue, empty}}; apply(Meta, #checkout{spec = {dequeue, settled}, meta = ConsumerMeta, consumer_id = ConsumerId}, - Effects0, State0) -> + State0) -> % TODO: this clause could probably be optimised State1 = update_consumer(ConsumerId, ConsumerMeta, {once, 1, simple_prefetch}, State0), % turn send msg effect into reply {success, _, MsgId, Msg, State2} = checkout_one(State1), % immediately settle - {State, Effects, _} = apply(Meta, make_settle(ConsumerId, [MsgId]), - Effects0, State2), - {State, Effects, {dequeue, {MsgId, Msg}}}; + {State, _, Effects} = apply(Meta, make_settle(ConsumerId, [MsgId]), State2), + {State, {dequeue, {MsgId, Msg}}, Effects}; apply(_, #checkout{spec = {dequeue, unsettled}, meta = ConsumerMeta, consumer_id = {_, Pid} = ConsumerId}, - Effects0, State0) -> + State0) -> State1 = update_consumer(ConsumerId, ConsumerMeta, {once, 1, simple_prefetch}, State0), - Effects1 = [{monitor, process, Pid} | Effects0], - {State, Reply, Effects} = case checkout_one(State1) of - {success, _, MsgId, Msg, S} -> - {S, {MsgId, Msg}, Effects1}; - {inactive, S} -> - {S, empty, [{aux, inactive} | Effects1]}; - S -> - {S, empty, Effects1} - end, - {State, Effects, {dequeue, Reply}}; -apply(_, #checkout{spec = cancel, consumer_id = ConsumerId}, Effects0, State0) -> - {CancelEffects, State1} = cancel_consumer(ConsumerId, {Effects0, State0}), + case checkout_one(State1) of + {success, _, MsgId, Msg, S} -> + {S, {dequeue, {MsgId, Msg}}, [{monitor, process, Pid}]}; + {inactive, S} -> + {S, {dequeue, empty}, [{aux, inactive}]}; + S -> + {S, {dequeue, empty}} + end; +apply(_, #checkout{spec = cancel, consumer_id = ConsumerId}, State0) -> + {CancelEffects, State1} = cancel_consumer(ConsumerId, {[], State0}), % TODO: here we should really demonitor the pid but _only_ if it has no % other consumers or enqueuers. checkout(State1, CancelEffects); -apply(_, #checkout{spec = Spec, meta = Meta, consumer_id = {_, Pid} = ConsumerId}, - Effects0, State0) -> +apply(_, #checkout{spec = Spec, meta = Meta, + consumer_id = {_, Pid} = ConsumerId}, + State0) -> State1 = update_consumer(ConsumerId, Meta, Spec, State0), - {State, Effects, Res} = checkout(State1, Effects0), - {State, [{monitor, process, Pid} | Effects], Res}; -apply(#{index := RaftIdx}, #purge{}, Effects0, + checkout(State1, [{monitor, process, Pid}]); +apply(#{index := RaftIdx}, #purge{}, #state{consumers = Cons0, ra_indexes = Indexes } = State0) -> Total = rabbit_fifo_index:size(Indexes), - {State1, Effects1, _} = + {State1, Effects1} = maps:fold( fun(ConsumerId, C = #consumer{checked_out = Checked0}, - {StateAcc0, EffectsAcc0, ok}) -> + {StateAcc0, EffectsAcc0}) -> MsgRaftIdxs = [RIdx || {_MsgInId, {RIdx, _}} <- maps:values(Checked0)], complete(ConsumerId, MsgRaftIdxs, maps:size(Checked0), C, #{}, EffectsAcc0, StateAcc0) - end, {State0, Effects0, ok}, Cons0), - {State, Effects, _} = + end, {State0, []}, Cons0), + {State, _, Effects} = update_smallest_raft_index( RaftIdx, Indexes, State1#state{ra_indexes = rabbit_fifo_index:empty(), @@ -432,9 +433,12 @@ apply(#{index := RaftIdx}, #purge{}, Effects0, msg_bytes_enqueue = 0, msg_bytes_checkout = 0, low_msg_num = undefined}, Effects1), - {State, [garbage_collection | Effects], {purge, Total}}; + %% as we're not checking out after a purge (no point) we have to + %% reverse the effects ourselves + {State, {purge, Total}, + lists:reverse([garbage_collection | Effects])}; apply(_, {down, ConsumerPid, noconnection}, - Effects0, #state{consumers = Cons0, + #state{consumers = Cons0, enqueuers = Enqs0} = State0) -> Node = node(ConsumerPid), % mark all consumers and enqueuers as suspected down @@ -461,14 +465,14 @@ apply(_, {down, ConsumerPid, noconnection}, end, Enqs0), Effects = case maps:size(Cons) of 0 -> - [{aux, inactive}, {monitor, node, Node} | Effects0]; + [{aux, inactive}, {monitor, node, Node}]; _ -> - [{monitor, node, Node} | Effects0] + [{monitor, node, Node}] end, - {State#state{consumers = Cons, enqueuers = Enqs}, Effects, ok}; -apply(_, {down, Pid, _Info}, Effects0, - #state{consumers = Cons0, - enqueuers = Enqs0} = State0) -> + %% TODO: should we run a checkout here? + {State#state{consumers = Cons, enqueuers = Enqs}, ok, Effects}; +apply(_, {down, Pid, _Info}, #state{consumers = Cons0, + enqueuers = Enqs0} = State0) -> % Remove any enqueuer for the same pid and enqueue any pending messages % This should be ok as we won't see any more enqueues from this pid State1 = case maps:take(Pid, Enqs0) of @@ -483,13 +487,12 @@ apply(_, {down, Pid, _Info}, Effects0, % Find the consumers for the down pid DownConsumers = maps:keys( maps:filter(fun({_, P}, _) -> P =:= Pid end, Cons0)), - {Effects1, State2} = lists:foldl(fun cancel_consumer/2, {Effects0, State1}, + {Effects1, State2} = lists:foldl(fun cancel_consumer/2, {[], State1}, DownConsumers), checkout(State2, Effects1); -apply(_, {nodeup, Node}, Effects0, - #state{consumers = Cons0, - enqueuers = Enqs0, - service_queue = SQ0} = State0) -> +apply(_, {nodeup, Node}, #state{consumers = Cons0, + enqueuers = Enqs0, + service_queue = SQ0} = State0) -> %% A node we are monitoring has come back. %% If we have suspected any processes of being %% down we should now re-issue the monitors for them to detect if they're @@ -517,14 +520,14 @@ apply(_, {nodeup, Node}, Effects0, CAcc, SQAcc, EAcc); (_, _, Acc) -> Acc - end, {Cons0, SQ0, Effects0}, Cons0), + end, {Cons0, SQ0, Monitors}, Cons0), % TODO: avoid list concat checkout(State0#state{consumers = Cons1, enqueuers = Enqs1, - service_queue = SQ}, Monitors ++ Effects); -apply(_, {nodedown, _Node}, Effects, State) -> - {State, Effects, ok}; -apply(_, #update_config{config = Conf}, Effects, State) -> - {update_config(Conf, State), Effects, ok}. + service_queue = SQ}, Effects); +apply(_, {nodedown, _Node}, State) -> + {State, ok}; +apply(_, #update_config{config = Conf}, State) -> + {update_config(Conf, State), ok}. -spec state_enter(ra_server:ra_state(), state()) -> ra_machine:effects(). state_enter(leader, #state{consumers = Cons, @@ -711,14 +714,6 @@ cancel_consumer(ConsumerId, {Effects0, S0} end. -incr_enqueue_count(#state{enqueue_count = C, - shadow_copy_interval = C} = State0) -> - % time to stash a dehydrated state version - State = State0#state{enqueue_count = 0}, - {State, dehydrate_state(State)}; -incr_enqueue_count(#state{enqueue_count = C} = State) -> - {State#state{enqueue_count = C + 1}, undefined}. - enqueue(RaftIdx, RawMsg, #state{messages = Messages, low_msg_num = LowMsgNum, next_msg_num = NextMsgNum} = State0) -> @@ -729,11 +724,20 @@ enqueue(RaftIdx, RawMsg, #state{messages = Messages, low_msg_num = min(LowMsgNum, NextMsgNum), next_msg_num = NextMsgNum + 1}. -append_to_master_index(RaftIdx, +append_to_master_index(RaftIdx, Effects, #state{ra_indexes = Indexes0} = State0) -> {State, Shadow} = incr_enqueue_count(State0), Indexes = rabbit_fifo_index:append(RaftIdx, Shadow, Indexes0), - State#state{ra_indexes = Indexes}. + {State#state{ra_indexes = Indexes}, ok, Effects}. + +incr_enqueue_count(#state{enqueue_count = C, + shadow_copy_interval = C} = State0) -> + % time to stash a dehydrated state version + State = State0#state{enqueue_count = 0}, + {State, dehydrate_state(State)}; +incr_enqueue_count(#state{enqueue_count = C} = State) -> + {State#state{enqueue_count = C + 1}, undefined}. + enqueue_pending(From, #enqueuer{next_seqno = Next, @@ -808,7 +812,7 @@ complete(ConsumerId, MsgRaftIdxs, NumDiscarded, Indexes = lists:foldl(fun rabbit_fifo_index:delete/2, Indexes0, MsgRaftIdxs), {State0#state{consumers = Cons, ra_indexes = Indexes, - service_queue = SQ}, Effects, ok}. + service_queue = SQ}, Effects}. increase_credit(#consumer{lifetime = once, credit = Credit}, _) -> @@ -835,10 +839,10 @@ complete_and_checkout(IncomingRaftIdx, MsgIds, ConsumerId, end, State0, maps:values(Discarded)), %% need to pass the length of discarded as $prefix_msgs would be filtered %% by the above list comprehension - {State2, Effects1, _} = complete(ConsumerId, MsgRaftIdxs, - maps:size(Discarded), - Con0, Checked, Effects0, State1), - {State, Effects, _} = checkout(State2, Effects1), + {State2, Effects1} = complete(ConsumerId, MsgRaftIdxs, + maps:size(Discarded), + Con0, Checked, Effects0, State1), + {State, ok, Effects} = checkout(State2, Effects1), % settle metrics are incremented separately update_smallest_raft_index(IncomingRaftIdx, Indexes0, State, Effects). @@ -867,7 +871,7 @@ update_smallest_raft_index(IncomingRaftIdx, OldIndexes, % there are no messages on queue anymore and no pending enqueues % we can forward release_cursor all the way until % the last received command - {State, [{release_cursor, IncomingRaftIdx, State} | Effects], ok}; + {State, ok, [{release_cursor, IncomingRaftIdx, State} | Effects]}; _ -> NewSmallest = rabbit_fifo_index:smallest(Indexes), % Take the smallest raft index available in the index when starting @@ -876,15 +880,15 @@ update_smallest_raft_index(IncomingRaftIdx, OldIndexes, {{Smallest, _}, {Smallest, _}} -> % smallest has not changed, do not issue release cursor % effects - {State, Effects, ok}; + {State, ok, Effects}; {_, {Smallest, Shadow}} when Shadow =/= undefined -> % ?INFO("RELEASE ~w ~w ~w~n", [IncomingRaftIdx, Smallest, % Shadow]), - {State, [{release_cursor, Smallest, Shadow} | Effects], ok}; + {State, ok, [{release_cursor, Smallest, Shadow}]}; _ -> % smallest % no shadow taken for this index, % no release cursor increase - {State, Effects, ok} + {State, ok, Effects} end end. @@ -914,6 +918,8 @@ return_all(State, Checked0) -> return_one(MsgNum, Msg, S) end, State, Checked). +%% checkout new messages to consumers +%% reverses the effects list checkout(State, Effects) -> checkout0(checkout_one(State), Effects, #{}). @@ -925,10 +931,10 @@ checkout0({success, ConsumerId, MsgId, Msg, State}, Effects, Acc0) -> checkout0(checkout_one(State), Effects, Acc); checkout0({inactive, State}, Effects0, Acc) -> Effects = append_send_msg_effects(Effects0, Acc), - {State, [{aux, inactive} | Effects], ok}; + {State, ok, lists:reverse([{aux, inactive} | Effects])}; checkout0(State, Effects0, Acc) -> Effects = append_send_msg_effects(Effects0, Acc), - {State, Effects, ok}. + {State, ok, lists:reverse(Effects)}. append_send_msg_effects(Effects, AccMap) when map_size(AccMap) == 0 -> Effects; @@ -1241,10 +1247,10 @@ enq_enq_checkout_test() -> Cid = {<<"enq_enq_checkout_test">>, self()}, {State1, _} = enq(1, 1, first, test_init(test)), {State2, _} = enq(2, 2, second, State1), - {_State3, Effects, _} = + {_State3, _, Effects} = apply(meta(3), make_checkout(Cid, {once, 2, simple_prefetch}, #{}), - [], State2), + State2), ?ASSERT_EFF({monitor, _, _}, Effects), ?ASSERT_EFF({send_msg, _, {delivery, _, _}, _}, Effects), ok. @@ -1253,9 +1259,8 @@ credit_enq_enq_checkout_settled_credit_test() -> Cid = {?FUNCTION_NAME, self()}, {State1, _} = enq(1, 1, first, test_init(test)), {State2, _} = enq(2, 2, second, State1), - {State3, Effects, _} = - apply(meta(3), make_checkout(Cid, {auto, 1, credited}, #{}), - [], State2), + {State3, _, Effects} = + apply(meta(3), make_checkout(Cid, {auto, 1, credited}, #{}), State2), ?ASSERT_EFF({monitor, _, _}, Effects), Deliveries = lists:filter(fun ({send_msg, _, {delivery, _, _}, _}) -> true; (_) -> false @@ -1286,12 +1291,12 @@ credit_with_drained_test() -> %% checkout with a single credit {State1, _, _} = apply(meta(1), make_checkout(Cid, {auto, 1, credited},#{}), - [], State0), + State0), ?assertMatch(#state{consumers = #{Cid := #consumer{credit = 1, delivery_count = 0}}}, State1), - {State, _Effs, Result} = - apply(meta(3), make_credit(Cid, 0, 5, true), [], State1), + {State, Result, _} = + apply(meta(3), make_credit(Cid, 0, 5, true), State1), ?assertMatch(#state{consumers = #{Cid := #consumer{credit = 0, delivery_count = 5}}}, State), @@ -1305,14 +1310,14 @@ credit_and_drain_test() -> {State1, _} = enq(1, 1, first, test_init(test)), {State2, _} = enq(2, 2, second, State1), %% checkout without any initial credit (like AMQP 1.0 would) - {State3, CheckEffs, _} = + {State3, _, CheckEffs} = apply(meta(3), make_checkout(Cid, {auto, 0, credited}, #{}), - [], State2), + State2), ?ASSERT_NO_EFF({send_msg, _, {delivery, _, _}}, CheckEffs), - {State4, Effects, {multi, [{send_credit_reply, 0}, - {send_drained, [{?FUNCTION_NAME, 2}]}]}} = - apply(meta(4), make_credit(Cid, 4, 0, true), [], State3), + {State4, {multi, [{send_credit_reply, 0}, + {send_drained, [{?FUNCTION_NAME, 2}]}]}, + Effects} = apply(meta(4), make_credit(Cid, 4, 0, true), State3), ?assertMatch(#state{consumers = #{Cid := #consumer{credit = 0, delivery_count = 4}}}, State4), @@ -1330,9 +1335,9 @@ enq_enq_deq_test() -> {State1, _} = enq(1, 1, first, test_init(test)), {State2, _} = enq(2, 2, second, State1), % get returns a reply value - {_State3, [{monitor, _, _}], {dequeue, {0, {_, first}}}} = + {_State3, {dequeue, {0, {_, first}}}, [{monitor, _, _}]} = apply(meta(3), make_checkout(Cid, {dequeue, unsettled}, #{}), - [], State2), + State2), ok. enq_enq_deq_deq_settle_test() -> @@ -1340,39 +1345,38 @@ enq_enq_deq_deq_settle_test() -> {State1, _} = enq(1, 1, first, test_init(test)), {State2, _} = enq(2, 2, second, State1), % get returns a reply value - {State3, [{monitor, _, _}], {dequeue, {0, {_, first}}}} = + {State3, {dequeue, {0, {_, first}}}, [{monitor, _, _}]} = apply(meta(3), make_checkout(Cid, {dequeue, unsettled}, #{}), - [], State2), - {_State4, _Effects4, {dequeue, empty}} = + State2), + {_State4, {dequeue, empty}, _} = apply(meta(4), make_checkout(Cid, {dequeue, unsettled}, #{}), - [], State3), + State3), ok. enq_enq_checkout_get_settled_test() -> Cid = {?FUNCTION_NAME, self()}, {State1, _} = enq(1, 1, first, test_init(test)), % get returns a reply value - {_State2, _Effects, {dequeue, {0, {_, first}}}} = + {_State2, {dequeue, {0, {_, first}}}, _Effs} = apply(meta(3), make_checkout(Cid, {dequeue, settled}, #{}), - [], State1), + State1), ok. checkout_get_empty_test() -> Cid = {?FUNCTION_NAME, self()}, State = test_init(test), - {_State2, [], {dequeue, empty}} = - apply(meta(1), make_checkout(Cid, {dequeue, unsettled}, #{}), - [], State), + {_State2, {dequeue, empty}} = + apply(meta(1), make_checkout(Cid, {dequeue, unsettled}, #{}), State), ok. untracked_enq_deq_test() -> Cid = {?FUNCTION_NAME, self()}, State0 = test_init(test), {State1, _, _} = apply(meta(1), - make_enqueue(undefined, undefined, first), [], State0), - {_State2, _, {dequeue, {0, {_, first}}}} = - apply(meta(3), make_checkout(Cid, {dequeue, settled}, #{}), - [], State1), + make_enqueue(undefined, undefined, first), + State0), + {_State2, {dequeue, {0, {_, first}}}, _} = + apply(meta(3), make_checkout(Cid, {dequeue, settled}, #{}), State1), ok. release_cursor_test() -> @@ -1445,18 +1449,18 @@ return_non_existent_test() -> Cid = {<<"cid">>, self()}, {State0, [_, _Inactive]} = enq(1, 1, second, test_init(test)), % return non-existent - {_State2, [], _} = apply(meta(3), make_return(Cid, [99]), [], State0), + {_State2, _} = apply(meta(3), make_return(Cid, [99]), State0), ok. return_checked_out_test() -> Cid = {<<"cid">>, self()}, {State0, [_, _]} = enq(1, 1, first, test_init(test)), - {State1, [_Monitor, {aux, active}, - {send_msg, _, {delivery, _, [{MsgId, _}]}, _}]} = - check(Cid, 2, State0), + {State1, [_Monitor, + {send_msg, _, {delivery, _, [{MsgId, _}]}, ra_event}, + {aux, active} + ]} = check(Cid, 2, State0), % return - {_State2, [_, _], _} = apply(meta(3), make_return(Cid, [MsgId]), - [], State1), + {_State2, _, [_, _]} = apply(meta(3), make_return(Cid, [MsgId]), State1), ok. return_auto_checked_out_test() -> @@ -1465,11 +1469,13 @@ return_auto_checked_out_test() -> {State0, [_]} = enq(2, 2, second, State00), % it first active then inactive as the consumer took on but cannot take % any more - {State1, [_Monitor, {aux, inactive}, {aux, active}, - {send_msg, _, {delivery, _, [{MsgId, _}]}, _} | _]} = - check_auto(Cid, 2, State0), + {State1, [_Monitor, + {send_msg, _, {delivery, _, [{MsgId, _}]}, _}, + {aux, active}, + {aux, inactive} + ]} = check_auto(Cid, 2, State0), % return should include another delivery - {_State2, Effects, _} = apply(meta(3), make_return(Cid, [MsgId]), [], State1), + {_State2, _, Effects} = apply(meta(3), make_return(Cid, [MsgId]), State1), ?ASSERT_EFF({send_msg, _, {delivery, _, [{_, {#{delivery_count := 1}, first}}]}, _}, Effects), @@ -1482,22 +1488,21 @@ cancelled_checkout_out_test() -> {State0, [_]} = enq(2, 2, second, State00), {State1, _} = check_auto(Cid, 2, State0), % cancelled checkout should return all pending messages to queue - {State2, _, _} = apply(meta(3), make_checkout(Cid, cancel, #{}), - [], State1), + {State2, _, _} = apply(meta(3), make_checkout(Cid, cancel, #{}), State1), ?assertEqual(2, maps:size(State2#state.messages)), - {State3, _, {dequeue, {0, {_, first}}}} = - apply(meta(3), make_checkout(Cid, {dequeue, settled}, #{}), [], State2), + {State3, {dequeue, {0, {_, first}}}, _} = + apply(meta(3), make_checkout(Cid, {dequeue, settled}, #{}), State2), ?debugFmt("State3 ~p", [State3]), - {_State, _, {dequeue, {_, {_, second}}}} = - apply(meta(4), make_checkout(Cid, {dequeue, settled}, #{}), [], State3), + {_State, {dequeue, {_, {_, second}}}, _} = + apply(meta(4), make_checkout(Cid, {dequeue, settled}, #{}), State3), ok. down_with_noproc_consumer_returns_unsettled_test() -> Cid = {<<"down_consumer_returns_unsettled_test">>, self()}, {State0, [_, _]} = enq(1, 1, second, test_init(test)), {State1, [{monitor, process, Pid} | _]} = check(Cid, 2, State0), - {State2, _, _} = apply(meta(3), {down, Pid, noproc}, [], State1), + {State2, _, _} = apply(meta(3), {down, Pid, noproc}, State1), {_State, Effects} = check(Cid, 4, State2), ?ASSERT_EFF({monitor, process, _}, Effects), ok. @@ -1514,15 +1519,15 @@ down_with_noconnection_marks_suspect_and_node_is_monitored_test() -> ?ASSERT_EFF({monitor, process, P}, P =:= Pid, Effects1), % monitor both enqueuer and consumer % because we received a noconnection we now need to monitor the node - {State2a, _Effects2a, _} = apply(meta(3), {down, Pid, noconnection}, [], State1), + {State2a, _, _} = apply(meta(3), {down, Pid, noconnection}, State1), #consumer{credit = 1} = maps:get(Cid, State2a#state.consumers), %% validate consumer has credit - {State2, Effects2, _} = apply(meta(3), {down, Self, noconnection}, [], State2a), + {State2, _, Effects2} = apply(meta(3), {down, Self, noconnection}, State2a), ?ASSERT_EFF({monitor, node, _}, Effects2), ?assertNoEffect({demonitor, process, _}, Effects2), % when the node comes up we need to retry the process monitors for the % disconnected processes - {_State3, Effects3, _} = apply(meta(3), {nodeup, Node}, [], State2), + {_State3, _, Effects3} = apply(meta(3), {nodeup, Node}, State2), % try to re-monitor the suspect processes ?ASSERT_EFF({monitor, process, P}, P =:= Pid, Effects3), ?ASSERT_EFF({monitor, process, P}, P =:= Self, Effects3), @@ -1537,7 +1542,7 @@ down_with_noconnection_returns_unack_test() -> {State1, {_, _}} = deq(2, Cid, unsettled, State0), ?assertEqual(0, maps:size(State1#state.messages)), ?assertEqual(0, lqueue:len(State1#state.returns)), - {State2a, _, _} = apply(meta(3), {down, Pid, noconnection}, [], State1), + {State2a, _, _} = apply(meta(3), {down, Pid, noconnection}, State1), ?assertEqual(1, maps:size(State2a#state.messages)), ?assertEqual(1, lqueue:len(State2a#state.returns)), ok. @@ -1545,9 +1550,9 @@ down_with_noconnection_returns_unack_test() -> down_with_noproc_enqueuer_is_cleaned_up_test() -> State00 = test_init(test), Pid = spawn(fun() -> ok end), - {State0, Effects0, _} = apply(meta(1), {enqueue, Pid, 1, first}, [], State00), + {State0, _, Effects0} = apply(meta(1), {enqueue, Pid, 1, first}, State00), ?ASSERT_EFF({monitor, process, _}, Effects0), - {State1, _Effects1, _} = apply(meta(3), {down, Pid, noproc}, [], State0), + {State1, _, _} = apply(meta(3), {down, Pid, noproc}, State0), % ensure there are no enqueuers ?assert(0 =:= maps:size(State1#state.enqueuers)), ok. @@ -1569,7 +1574,7 @@ discarded_message_without_dead_letter_handler_is_removed_test() -> ?ASSERT_EFF({send_msg, _, {delivery, _, [{0, {#{}, first}}]}, _}, Effects1), - {_State2, Effects2, _} = apply(meta(1), make_discard(Cid, [0]), [], State1), + {_State2, _, Effects2} = apply(meta(1), make_discard(Cid, [0]), State1), ?assertNoEffect({send_msg, _, {delivery, _, [{0, {#{}, first}}]}, _}, Effects2), @@ -1586,8 +1591,7 @@ discarded_message_with_dead_letter_handler_emits_mod_call_effect_test() -> ?ASSERT_EFF({send_msg, _, {delivery, _, [{0, {#{}, first}}]}, _}, Effects1), - {_State2, Effects2, _} = apply(meta(1), make_discard(Cid, [0]), - [], State1), + {_State2, _, Effects2} = apply(meta(1), make_discard(Cid, [0]), State1), % assert mod call effect with appended reason and message ?ASSERT_EFF({mod_call, somemod, somefun, [somearg, [{rejected, first}]]}, Effects2), @@ -1600,7 +1604,7 @@ tick_test() -> {S1, _} = enq(2, 2, <<"snd">>, S0), {S2, {MsgId, _}} = deq(3, Cid, unsettled, S1), {S3, {_, _}} = deq(4, Cid2, unsettled, S2), - {S4, _, _} = apply(meta(5), make_return(Cid, [MsgId]), [], S3), + {S4, _, _} = apply(meta(5), make_return(Cid, [MsgId]), S3), [{mod_call, _, _, [#resource{}, @@ -1776,8 +1780,8 @@ run_snapshot_test0(Name, Commands) -> ?debugFmt("running from snapshot: ~b", [SnapIdx]), {S, _} = run_log(SnapState, Filtered), % assert log can be restored from any release cursor index - % ?debugFmt("Name ~p Idx ~p S~p~nState~p~nSnapState ~p~nFiltered ~p~n", - % [Name, SnapIdx, S, State, SnapState, Filtered]), + ?debugFmt("Name ~p~nS~p~nState~p~nn", + [Name, S, State]), ?assertEqual(State, S) end || {release_cursor, SnapIdx, SnapState} <- Effects], ok. @@ -1810,10 +1814,9 @@ pending_enqueue_is_enqueued_on_down_test() -> Cid = {<<"cid">>, self()}, Pid = self(), {State0, _} = enq(1, 2, first, test_init(test)), - {State1, _, _} = apply(meta(2), {down, Pid, noproc}, [], State0), - {_State2, _, {dequeue, {0, {_, first}}}} = - apply(meta(3), make_checkout(Cid, {dequeue, settled}, #{}), - [], State1), + {State1, _, _} = apply(meta(2), {down, Pid, noproc}, State0), + {_State2, {dequeue, {0, {_, first}}}, _} = + apply(meta(3), make_checkout(Cid, {dequeue, settled}, #{}), State1), ok. duplicate_delivery_test() -> @@ -1858,11 +1861,11 @@ state_enter_montors_and_notifications_test() -> purge_test() -> Cid = {<<"purge_test">>, self()}, {State1, _} = enq(1, 1, first, test_init(test)), - {State2, _, {purge, 1}} = apply(meta(2), make_purge(), [], State1), + {State2, {purge, 1}, _} = apply(meta(2), make_purge(), State1), {State3, _} = enq(3, 2, second, State2), % get returns a reply value - {_State4, [{monitor, _, _}], {dequeue, {0, {_, second}}}} = - apply(meta(4), make_checkout(Cid, {dequeue, unsettled}, #{}), [], State3), + {_State4, {dequeue, {0, {_, second}}}, [{monitor, _, _}]} = + apply(meta(4), make_checkout(Cid, {dequeue, unsettled}, #{}), State3), ok. purge_with_checkout_test() -> @@ -1873,7 +1876,7 @@ purge_with_checkout_test() -> %% assert message bytes are non zero ?assert(State2#state.msg_bytes_checkout > 0), ?assert(State2#state.msg_bytes_enqueue > 0), - {State3, _, {purge, 2}} = apply(meta(2), make_purge(), [], State2), + {State3, {purge, 2}, _} = apply(meta(2), make_purge(), State2), ?assertEqual(0, State3#state.msg_bytes_checkout), ?assertEqual(0, State3#state.msg_bytes_enqueue), #consumer{checked_out = Checked} = maps:get(Cid, State3#state.consumers), @@ -1893,7 +1896,7 @@ down_returns_checked_out_in_order_test() -> #consumer{checked_out = Checked} = maps:get(Cid, S2#state.consumers), ?assertEqual(100, maps:size(Checked)), %% simulate down - {S, _, _} = apply(meta(102), {down, self(), noproc}, [], S2), + {S, _, _} = apply(meta(102), {down, self(), noproc}, S2), Returns = lqueue:to_list(S#state.returns), ?assertEqual(100, length(Returns)), %% validate returns are in order @@ -1905,54 +1908,58 @@ meta(Idx) -> enq(Idx, MsgSeq, Msg, State) -> strip_reply( - apply(meta(Idx), make_enqueue(self(), MsgSeq, Msg), [], State)). + apply(meta(Idx), make_enqueue(self(), MsgSeq, Msg), State)). deq(Idx, Cid, Settlement, State0) -> - {State, _, {dequeue, Msg}} = + {State, {dequeue, Msg}, _} = apply(meta(Idx), - make_checkout(Cid, {dequeue, Settlement}, #{}), - [], State0), + make_checkout(Cid, {dequeue, Settlement}, #{}), + State0), {State, Msg}. check_n(Cid, Idx, N, State) -> strip_reply( apply(meta(Idx), make_checkout(Cid, {auto, N, simple_prefetch}, #{}), - [], State)). + State)). check(Cid, Idx, State) -> strip_reply( apply(meta(Idx), make_checkout(Cid, {once, 1, simple_prefetch}, #{}), - [], State)). + State)). check_auto(Cid, Idx, State) -> strip_reply( apply(meta(Idx), make_checkout(Cid, {auto, 1, simple_prefetch}, #{}), - [], State)). + State)). check(Cid, Idx, Num, State) -> strip_reply( apply(meta(Idx), make_checkout(Cid, {auto, Num, simple_prefetch}, #{}), - [], State)). + State)). settle(Cid, Idx, MsgId, State) -> - strip_reply(apply(meta(Idx), make_settle(Cid, [MsgId]), [], State)). + strip_reply(apply(meta(Idx), make_settle(Cid, [MsgId]), State)). credit(Cid, Idx, Credit, DelCnt, Drain, State) -> strip_reply(apply(meta(Idx), make_credit(Cid, Credit, DelCnt, Drain), - [], State)). + State)). -strip_reply({State, Effects, _Reply}) -> +strip_reply({State, _, Effects}) -> {State, Effects}. run_log(InitState, Entries) -> lists:foldl(fun ({Idx, E}, {Acc0, Efx0}) -> - case apply(meta(Idx), E, Efx0, Acc0) of - {Acc, Efx, _} -> - {Acc, Efx} + case apply(meta(Idx), E, Acc0) of + {Acc, _, Efx} when is_list(Efx) -> + {Acc, Efx0 ++ Efx}; + {Acc, _, Efx} -> + {Acc, Efx0 ++ [Efx]}; + {Acc, _} -> + {Acc, Efx0} end end, {InitState, []}, Entries). diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl index 53dd2e2a7d..c43ef4dfd4 100644 --- a/src/rabbit_quorum_queue.erl +++ b/src/rabbit_quorum_queue.erl @@ -157,13 +157,15 @@ cancel_consumer_handler(QName, {ConsumerTag, ChPid}) -> case Node == node() of true -> cancel_consumer(QName, ChPid, ConsumerTag); false -> + %% this could potentially block for a while if the node is + %% in disconnected state or tcp buffers are full rpc:cast(Node, rabbit_quorum_queue, cancel_consumer, [QName, ChPid, ConsumerTag]) end. cancel_consumer(QName, ChPid, ConsumerTag) -> - rabbit_core_metrics:consumer_deleted(ChPid, ConsumerTag, QName), + catch rabbit_core_metrics:consumer_deleted(ChPid, ConsumerTag, QName), rabbit_event:notify(consumer_deleted, [{consumer_tag, ConsumerTag}, {channel, ChPid}, |
