diff options
| author | Michael Klishin <michael@novemberain.com> | 2019-03-01 21:09:22 +0300 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2019-03-01 21:09:22 +0300 |
| commit | 27607074e9ac6bca20db423b0f62a465648a2bdc (patch) | |
| tree | e27254e341d493c62b2710ff2bd1e1245b8546f4 | |
| parent | 564bca6b67e7d4a9f15d343e19fe41e7ec1787c5 (diff) | |
| parent | 933c176c95be6cec082389b501787797690f51ba (diff) | |
| download | rabbitmq-server-git-27607074e9ac6bca20db423b0f62a465648a2bdc.tar.gz | |
Merge pull request #1903 from rabbitmq/single-active-noconnection
rabbit_fifo: change single active consumer on noconnection
| -rw-r--r-- | src/rabbit_fifo.erl | 248 | ||||
| -rw-r--r-- | src/rabbit_fifo.hrl | 7 | ||||
| -rw-r--r-- | src/rabbit_quorum_queue.erl | 6 | ||||
| -rw-r--r-- | test/quorum_queue_SUITE.erl | 4 | ||||
| -rw-r--r-- | test/rabbit_fifo_SUITE.erl | 330 | ||||
| -rw-r--r-- | test/rabbit_fifo_prop_SUITE.erl | 17 | ||||
| -rw-r--r-- | test/single_active_consumer_SUITE.erl | 8 | ||||
| -rw-r--r-- | test/test_util.erl | 17 |
8 files changed, 438 insertions, 199 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl index b06d34e83a..97e5f6e901 100644 --- a/src/rabbit_fifo.erl +++ b/src/rabbit_fifo.erl @@ -94,6 +94,7 @@ #credit{} | #purge{} | #update_config{}. + -type command() :: protocol() | ra_machine:builtin_command(). %% all the command types supported by ra fifo @@ -134,7 +135,7 @@ update_config(Conf, State) -> true -> single_active; false -> - default + competing end, Cfg = State#?MODULE.cfg, State#?MODULE{cfg = Cfg#cfg{release_cursor_interval = SHI, @@ -194,7 +195,8 @@ apply(Meta, #return{msg_ids = MsgIds, consumer_id = ConsumerId}, apply(Meta, #credit{credit = NewCredit, delivery_count = RemoteDelCnt, drain = Drain, consumer_id = ConsumerId}, #?MODULE{consumers = Cons0, - service_queue = ServiceQueue0} = State0) -> + service_queue = ServiceQueue0, + waiting_consumers = Waiting0} = State0) -> case Cons0 of #{ConsumerId := #consumer{delivery_count = DelCnt} = Con0} -> %% this can go below 0 when credit is reduced @@ -207,7 +209,7 @@ apply(Meta, #credit{credit = NewCredit, delivery_count = RemoteDelCnt, {State1, ok, Effects} = checkout(Meta, State0#?MODULE{service_queue = ServiceQueue, consumers = Cons}, []), - Response = {send_credit_reply, maps:size(State1#?MODULE.messages)}, + Response = {send_credit_reply, messages_ready(State1)}, %% by this point all checkouts for the updated credit value %% should be processed so we can evaluate the drain case Drain of @@ -231,6 +233,20 @@ apply(Meta, #credit{credit = NewCredit, delivery_count = RemoteDelCnt, {multi, [Response, {send_drained, [{CTag, Drained}]}]}, Effects} end; + _ when Waiting0 /= [] -> + %% there are waiting consuemrs + case lists:keytake(ConsumerId, 1, Waiting0) of + {value, {_, Con0 = #consumer{delivery_count = DelCnt}}, Waiting} -> + %% the consumer is a waiting one + %% grant the credit + C = max(0, RemoteDelCnt + NewCredit - DelCnt), + Con = Con0#consumer{credit = C}, + State = State0#?MODULE{waiting_consumers = + [{ConsumerId, Con} | Waiting]}, + {State, {send_credit_reply, messages_ready(State)}}; + false -> + {State0, ok} + end; _ -> %% credit for unknown consumer - just ignore {State0, ok} @@ -248,7 +264,8 @@ apply(Meta, #checkout{spec = {dequeue, Settlement}, {State0, {dequeue, empty}}; Ready -> State1 = update_consumer(ConsumerId, ConsumerMeta, - {once, 1, simple_prefetch}, State0), + {once, 1, simple_prefetch}, + State0), {success, _, MsgId, Msg, State2} = checkout_one(State1), case Settlement of unsettled -> @@ -257,10 +274,9 @@ apply(Meta, #checkout{spec = {dequeue, Settlement}, [{monitor, process, Pid}]}; settled -> %% immediately settle the checkout - {State, _, Effects} = apply(Meta, - make_settle(ConsumerId, - [MsgId]), - State2), + {State, _, Effects} = + apply(Meta, make_settle(ConsumerId, [MsgId]), + State2), {State, {dequeue, {MsgId, Msg}, Ready-1}, Effects} end end; @@ -294,27 +310,64 @@ apply(#{index := RaftIdx}, #purge{}, %% reverse the effects ourselves {State, {purge, Total}, lists:reverse([garbage_collection | Effects])}; -apply(_, {down, ConsumerPid, noconnection}, +apply(_, {down, Pid, noconnection}, + #?MODULE{consumers = Cons0, + cfg = #cfg{consumer_strategy = single_active}, + waiting_consumers = Waiting0, + enqueuers = Enqs0} = State0) -> + Node = node(Pid), + %% if the pid refers to the active consumer, mark it as suspected and return + %% it to the waiting queue + {State1, Effects0} = case maps:to_list(Cons0) of + [{{_, P} = Cid, C}] when node(P) =:= Node -> + %% the consumer should be returned to waiting + %% + Effs = consumer_update_active_effects( + State0, Cid, C, false, suspected_down, []), + {State0#?MODULE{consumers = #{}, + waiting_consumers = Waiting0 ++ [{Cid, C}]}, + Effs}; + _ -> {State0, []} + end, + WaitingConsumers = update_waiting_consumer_status(Node, State1, + suspected_down), + + %% select a new consumer from the waiting queue and run a checkout + State2 = State1#?MODULE{waiting_consumers = WaitingConsumers}, + {State, Effects1} = activate_next_consumer(State2, Effects0), + + %% mark any enquers as suspected + Enqs = maps:map(fun(P, E) when node(P) =:= Node -> + E#enqueuer{status = suspected_down}; + (_, E) -> E + end, Enqs0), + Effects = [{monitor, node, Node} | Effects1], + {State#?MODULE{enqueuers = Enqs}, ok, Effects}; +apply(_, {down, Pid, noconnection}, #?MODULE{consumers = Cons0, enqueuers = Enqs0} = State0) -> - Node = node(ConsumerPid), + %% A node has been disconnected. This doesn't necessarily mean that + %% any processes on this node are down, they _may_ come back so here + %% we just mark them as suspected (effectively deactivated) + %% and return all checked out messages to the main queue for delivery to any + %% live consumers + %% + %% all pids for the disconnected node will be marked as suspected not just + %% the one we got the `down' command for + Node = node(Pid), ConsumerUpdateActiveFun = consumer_active_flag_update_function(State0), - % mark all consumers and enqueuers as suspected down - % and monitor the node so that we can find out the final state of the - % process at some later point + {Cons, State, Effects1} = - maps:fold(fun({_, P} = K, - #consumer{checked_out = Checked0} = C, - {Co, St0, Eff}) when (node(P) =:= Node) and - (C#consumer.status =/= cancelled)-> + maps:fold(fun({_, P} = K, #consumer{checked_out = Checked0, + status = up} = C, + {Co, St0, Eff}) when node(P) =:= Node -> {St, Eff0} = return_all(St0, Checked0, Eff, K, C), Credit = increase_credit(C, maps:size(Checked0)), Eff1 = ConsumerUpdateActiveFun(St, K, C, false, suspected_down, Eff0), - {maps:put(K, - C#consumer{status = suspected_down, - credit = Credit, - checked_out = #{}}, Co), + {maps:put(K, C#consumer{status = suspected_down, + credit = Credit, + checked_out = #{}}, Co), St, Eff1}; (K, C, {Co, St, Eff}) -> {maps:put(K, C, Co), St, Eff} @@ -323,10 +376,10 @@ apply(_, {down, ConsumerPid, noconnection}, E#enqueuer{status = suspected_down}; (_, E) -> E end, Enqs0), - % mark waiting consumers as suspected if necessary - WaitingConsumers = update_waiting_consumer_status(Node, State0, - suspected_down), + % Monitor the node so that we can "unsuspect" these processes when the node + % comes back, then re-issue all monitors and discover the final fate of + % these processes Effects2 = case maps:size(Cons) of 0 -> [{aux, inactive}, {monitor, node, Node}]; @@ -334,8 +387,7 @@ apply(_, {down, ConsumerPid, noconnection}, [{monitor, node, Node}] end ++ Effects1, %% TODO: should we run a checkout here? - {State#?MODULE{consumers = Cons, enqueuers = Enqs, - waiting_consumers = WaitingConsumers}, ok, Effects2}; + {State#?MODULE{consumers = Cons, enqueuers = Enqs}, ok, Effects2}; apply(Meta, {down, Pid, _Info}, #?MODULE{consumers = Cons0, enqueuers = Enqs0} = State0) -> % Remove any enqueuer for the same pid and enqueue any pending messages @@ -367,36 +419,36 @@ apply(Meta, {nodeup, Node}, #?MODULE{consumers = Cons0, Monitors = [{monitor, process, P} || P <- suspected_pids_for(Node, State0)], - % un-suspect waiting consumers when necessary - WaitingConsumers = update_waiting_consumer_status(Node, State0, up), - Enqs1 = maps:map(fun(P, E) when node(P) =:= Node -> E#enqueuer{status = up}; (_, E) -> E end, Enqs0), ConsumerUpdateActiveFun = consumer_active_flag_update_function(State0), - {Cons1, SQ, Effects} = - maps:fold(fun({_, P} = ConsumerId, C, {CAcc, SQAcc, EAcc}) - when (node(P) =:= Node) and - (C#consumer.status =/= cancelled) -> - EAcc1 = ConsumerUpdateActiveFun(State0, ConsumerId, C, - true, up, EAcc), - update_or_remove_sub( - ConsumerId, C#consumer{status = up}, - CAcc, SQAcc, EAcc1); - (_, _, Acc) -> - Acc - end, {Cons0, SQ0, Monitors}, Cons0), - - checkout(Meta, State0#?MODULE{consumers = Cons1, enqueuers = Enqs1, - service_queue = SQ, - waiting_consumers = WaitingConsumers}, Effects); + %% mark all consumers as up + {Cons1, SQ, Effects1} = + maps:fold(fun({_, P} = ConsumerId, C, {CAcc, SQAcc, EAcc}) + when (node(P) =:= Node) and + (C#consumer.status =/= cancelled) -> + EAcc1 = ConsumerUpdateActiveFun(State0, ConsumerId, + C, true, up, EAcc), + update_or_remove_sub(ConsumerId, + C#consumer{status = up}, CAcc, + SQAcc, EAcc1); + (_, _, Acc) -> + Acc + end, {Cons0, SQ0, Monitors}, Cons0), + Waiting = update_waiting_consumer_status(Node, State0, up), + State1 = State0#?MODULE{consumers = Cons1, enqueuers = Enqs1, + service_queue = SQ, + waiting_consumers = Waiting}, + {State, Effects} = activate_next_consumer(State1, Effects1), + checkout(Meta, State, Effects); apply(_, {nodedown, _Node}, State) -> {State, ok}; apply(Meta, #update_config{config = Conf}, State) -> checkout(Meta, update_config(Conf, State), []). -consumer_active_flag_update_function(#?MODULE{cfg = #cfg{consumer_strategy = default}}) -> +consumer_active_flag_update_function(#?MODULE{cfg = #cfg{consumer_strategy = competing}}) -> fun(State, ConsumerId, Consumer, Active, ActivityStatus, Effects) -> consumer_update_active_effects(State, ConsumerId, Consumer, Active, ActivityStatus, Effects) @@ -407,7 +459,7 @@ consumer_active_flag_update_function(#?MODULE{cfg = #cfg{consumer_strategy = sin end. handle_waiting_consumer_down(_Pid, - #?MODULE{cfg = #cfg{consumer_strategy = default}} = State) -> + #?MODULE{cfg = #cfg{consumer_strategy = competing}} = State) -> {[], State}; handle_waiting_consumer_down(_Pid, #?MODULE{cfg = #cfg{consumer_strategy = single_active}, @@ -429,27 +481,18 @@ handle_waiting_consumer_down(Pid, State = State0#?MODULE{waiting_consumers = StillUp}, {Effects, State}. -update_waiting_consumer_status(_Node, #?MODULE{cfg = #cfg{consumer_strategy = default}}, - _Status) -> - []; -update_waiting_consumer_status(_Node, - #?MODULE{cfg = #cfg{consumer_strategy = single_active}, - waiting_consumers = []}, - _Status) -> - []; update_waiting_consumer_status(Node, - #?MODULE{cfg = #cfg{consumer_strategy = single_active}, - waiting_consumers = WaitingConsumers}, + #?MODULE{waiting_consumers = WaitingConsumers}, Status) -> [begin - case node(P) of + case node(Pid) of Node -> {ConsumerId, Consumer#consumer{status = Status}}; _ -> {ConsumerId, Consumer} end - end || {{_, P} = ConsumerId, Consumer} <- WaitingConsumers, - Consumer#consumer.status =/= cancelled]. + end || {{_, Pid} = ConsumerId, Consumer} <- WaitingConsumers, + Consumer#consumer.status =/= cancelled]. -spec state_enter(ra_server:ra_state(), state()) -> ra_machine:effects(). state_enter(leader, #?MODULE{consumers = Cons, @@ -583,7 +626,7 @@ query_consumers(#?MODULE{consumers = Consumers, cfg = #cfg{consumer_strategy = ConsumerStrategy}} = State) -> ActiveActivityStatusFun = case ConsumerStrategy of - default -> + competing -> fun(_ConsumerId, #consumer{status = Status}) -> case Status of @@ -709,8 +752,8 @@ num_checked_out(#?MODULE{consumers = Cons}) -> end, 0, maps:values(Cons)). cancel_consumer(ConsumerId, - #?MODULE{cfg = #cfg{consumer_strategy = default}} = State, Effects, Reason) -> - %% general case, single active consumer off + #?MODULE{cfg = #cfg{consumer_strategy = competing}} = State, + Effects, Reason) -> cancel_consumer0(ConsumerId, State, Effects, Reason); cancel_consumer(ConsumerId, #?MODULE{cfg = #cfg{consumer_strategy = single_active}, @@ -721,41 +764,23 @@ cancel_consumer(ConsumerId, cancel_consumer(ConsumerId, #?MODULE{consumers = Cons0, cfg = #cfg{consumer_strategy = single_active}, - waiting_consumers = WaitingConsumers0} = State0, + waiting_consumers = Waiting0} = State0, Effects0, Reason) -> %% single active consumer on, consumers are waiting - case maps:take(ConsumerId, Cons0) of - {Consumer, Cons1} -> + case maps:is_key(ConsumerId, Cons0) of + true -> % The active consumer is to be removed - % Cancel it - {State1, Effects1} = maybe_return_all(ConsumerId, Consumer, Cons1, - State0, Effects0, Reason), - Effects2 = cancel_consumer_effects(ConsumerId, State1, Effects1), - % Take another one from the waiting consumers and put it in consumers - [{NewActiveConsumerId, NewActiveConsumer} - | RemainingWaitingConsumers] = WaitingConsumers0, - #?MODULE{service_queue = ServiceQueue} = State1, - ServiceQueue1 = maybe_queue_consumer(NewActiveConsumerId, - NewActiveConsumer, - ServiceQueue), - State = State1#?MODULE{consumers = maps:put(NewActiveConsumerId, - NewActiveConsumer, - State1#?MODULE.consumers), - service_queue = ServiceQueue1, - waiting_consumers = RemainingWaitingConsumers}, - Effects = consumer_update_active_effects(State, NewActiveConsumerId, - NewActiveConsumer, true, - single_active, Effects2), - {State, Effects}; - error -> + {State1, Effects1} = cancel_consumer0(ConsumerId, State0, + Effects0, Reason), + activate_next_consumer(State1, Effects1); + false -> % The cancelled consumer is not the active one % Just remove it from idle_consumers - WaitingConsumers = lists:keydelete(ConsumerId, 1, - WaitingConsumers0), + Waiting = lists:keydelete(ConsumerId, 1, Waiting0), Effects = cancel_consumer_effects(ConsumerId, State0, Effects0), % A waiting consumer isn't supposed to have any checked out messages, % so nothing special to do here - {State0#?MODULE{waiting_consumers = WaitingConsumers}, Effects} + {State0#?MODULE{waiting_consumers = Waiting}, Effects} end. consumer_update_active_effects(#?MODULE{cfg = #cfg{resource = QName}}, @@ -765,9 +790,7 @@ consumer_update_active_effects(#?MODULE{cfg = #cfg{resource = QName}}, Ack = maps:get(ack, Meta, undefined), Prefetch = maps:get(prefetch, Meta, undefined), Args = maps:get(args, Meta, []), - [{mod_call, - rabbit_quorum_queue, - update_consumer_handler, + [{mod_call, rabbit_quorum_queue, update_consumer_handler, [QName, ConsumerId, false, Ack, Prefetch, Active, ActivityStatus, Args]} | Effects]. @@ -776,6 +799,10 @@ cancel_consumer0(ConsumerId, #?MODULE{consumers = C0} = S0, Effects0, Reason) -> {Consumer, Cons1} -> {S, Effects2} = maybe_return_all(ConsumerId, Consumer, Cons1, S0, Effects0, Reason), + %% The effects are emitted before the consumer is actually removed + %% if the consumer has unacked messages. This is a bit weird but + %% in line with what classic queues do (from an external point of + %% view) Effects = cancel_consumer_effects(ConsumerId, S, Effects2), case maps:size(S#?MODULE.consumers) of 0 -> @@ -788,6 +815,38 @@ cancel_consumer0(ConsumerId, #?MODULE{consumers = C0} = S0, Effects0, Reason) -> {S0, Effects0} end. +activate_next_consumer(#?MODULE{consumers = Cons, + waiting_consumers = Waiting0} = State0, + Effects0) -> + case maps:filter(fun (_, #consumer{status = S}) -> S == up end, Cons) of + Up when map_size(Up) == 0 -> + %% there are no active consumer in the consumer map + case lists:filter(fun ({_, #consumer{status = Status}}) -> + Status == up + end, Waiting0) of + [{NextConsumerId, NextConsumer} | _] -> + %% there is a potential next active consumer + Remaining = lists:keydelete(NextConsumerId, 1, Waiting0), + #?MODULE{service_queue = ServiceQueue} = State0, + ServiceQueue1 = maybe_queue_consumer(NextConsumerId, + NextConsumer, + ServiceQueue), + State = State0#?MODULE{consumers = Cons#{NextConsumerId => NextConsumer}, + service_queue = ServiceQueue1, + waiting_consumers = Remaining}, + Effects = consumer_update_active_effects(State, NextConsumerId, + NextConsumer, true, + single_active, Effects0), + {State, Effects}; + [] -> + {State0, [{aux, inactive} | Effects0]} + end; + _ -> + {State0, Effects0} + end. + + + maybe_return_all(ConsumerId, #consumer{checked_out = Checked0} = Consumer, Cons1, #?MODULE{consumers = C0, service_queue = SQ0} = S0, @@ -1296,7 +1355,7 @@ uniq_queue_in(Key, Queue) -> end. update_consumer(ConsumerId, Meta, Spec, - #?MODULE{cfg = #cfg{consumer_strategy = default}} = State0) -> + #?MODULE{cfg = #cfg{consumer_strategy = competing}} = State0) -> %% general case, single active consumer off update_consumer0(ConsumerId, Meta, Spec, State0); update_consumer(ConsumerId, Meta, Spec, @@ -1331,7 +1390,6 @@ update_consumer0(ConsumerId, Meta, {Life, Credit, Mode}, end, Init, Cons0), ServiceQueue = maybe_queue_consumer(ConsumerId, maps:get(ConsumerId, Cons), ServiceQueue0), - State0#?MODULE{consumers = Cons, service_queue = ServiceQueue}. maybe_queue_consumer(ConsumerId, #consumer{credit = Credit}, diff --git a/src/rabbit_fifo.hrl b/src/rabbit_fifo.hrl index ebe5f3328a..968ae07739 100644 --- a/src/rabbit_fifo.hrl +++ b/src/rabbit_fifo.hrl @@ -88,6 +88,8 @@ -type consumer() :: #consumer{}. +-type consumer_strategy() :: competing | single_active. + -record(enqueuer, {next_seqno = 1 :: msg_seqno(), % out of order enqueues - sorted list @@ -104,7 +106,8 @@ max_length :: maybe(non_neg_integer()), max_bytes :: maybe(non_neg_integer()), %% whether single active consumer is on or not for this queue - consumer_strategy = default :: default | single_active, + consumer_strategy = competing :: consumer_strategy(), + %% the maximum number of unsuccessful delivery attempts permitted delivery_limit :: maybe(non_neg_integer()) }). @@ -114,7 +117,7 @@ messages = #{} :: #{msg_in_id() => indexed_msg()}, % defines the lowest message in id available in the messages map % that isn't a return - low_msg_num :: msg_in_id() | undefined, + low_msg_num :: maybe(msg_in_id()), % defines the next message in id to be added to the messages map next_msg_num = 1 :: msg_in_id(), % list of returned msg_in_ids - when checking out it picks from diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl index e811bfffb3..c685785d0d 100644 --- a/src/rabbit_quorum_queue.erl +++ b/src/rabbit_quorum_queue.erl @@ -120,6 +120,7 @@ declare(Q) when ?amqqueue_is_quorum(Q) -> RaMachine = ra_machine(NewQ), ServerIds = [{RaName, Node} || Node <- Nodes], ClusterName = RaName, + TickTimeout = application:get_env(rabbit, quorum_tick_interval, ?TICK_TIMEOUT), RaConfs = [begin UId = ra:new_uid(ra_lib:to_binary(ClusterName)), FName = rabbit_misc:rs(QName), @@ -129,7 +130,7 @@ declare(Q) when ?amqqueue_is_quorum(Q) -> friendly_name => FName, initial_members => ServerIds, log_init_args => #{uid => UId}, - tick_timeout => ?TICK_TIMEOUT, + tick_timeout => TickTimeout, machine => RaMachine} end || ServerId <- ServerIds], @@ -190,7 +191,8 @@ update_consumer(QName, ChPid, ConsumerTag, Exclusive, AckRequired, Prefetch, Act QName, Prefetch, Active, ActivityStatus, Args). cancel_consumer_handler(QName, {ConsumerTag, ChPid}) -> - local_or_remote_handler(ChPid, rabbit_quorum_queue, cancel_consumer, [QName, ChPid, ConsumerTag]). + local_or_remote_handler(ChPid, rabbit_quorum_queue, cancel_consumer, + [QName, ChPid, ConsumerTag]). cancel_consumer(QName, ChPid, ConsumerTag) -> catch rabbit_core_metrics:consumer_deleted(ChPid, ConsumerTag, QName), diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl index 0ec65c31b8..bab28ba1ff 100644 --- a/test/quorum_queue_SUITE.erl +++ b/test/quorum_queue_SUITE.erl @@ -126,8 +126,10 @@ memory_tests() -> %% Testsuite setup/teardown. %% ------------------------------------------------------------------- -init_per_suite(Config) -> +init_per_suite(Config0) -> rabbit_ct_helpers:log_environment(), + Config = rabbit_ct_helpers:merge_app_env( + Config0, {rabbit, [{quorum_tick_interval, 1000}]}), rabbit_ct_helpers:run_setup_steps( Config, [fun rabbit_ct_broker_helpers:enable_dist_proxy_manager/1]). diff --git a/test/rabbit_fifo_SUITE.erl b/test/rabbit_fifo_SUITE.erl index 72f09c3c64..cf14a68d9d 100644 --- a/test/rabbit_fifo_SUITE.erl +++ b/test/rabbit_fifo_SUITE.erl @@ -27,8 +27,7 @@ all() -> %% replicate eunit like test resultion all_tests() -> [F || {F, _} <- ?MODULE:module_info(functions), - re:run(atom_to_list(F), "_test$") /= nomatch] - . + re:run(atom_to_list(F), "_test$") /= nomatch]. groups() -> [ @@ -588,76 +587,89 @@ single_active_consumer_test(_) -> % adding some consumers AddConsumer = fun(CTag, State) -> - {NewState, _, _} = apply( - meta(1), - make_checkout({CTag, self()}, - {once, 1, simple_prefetch}, - #{}), - State), - NewState + {NewState, _, _} = apply( + meta(1), + make_checkout({CTag, self()}, + {once, 1, simple_prefetch}, + #{}), + State), + NewState end, - State1 = lists:foldl(AddConsumer, State0, [<<"ctag1">>, <<"ctag2">>, <<"ctag3">>, <<"ctag4">>]), + State1 = lists:foldl(AddConsumer, State0, + [<<"ctag1">>, <<"ctag2">>, <<"ctag3">>, <<"ctag4">>]), + C1 = {<<"ctag1">>, self()}, + C2 = {<<"ctag2">>, self()}, + C3 = {<<"ctag3">>, self()}, + C4 = {<<"ctag4">>, self()}, % the first registered consumer is the active one, the others are waiting ?assertEqual(1, map_size(State1#rabbit_fifo.consumers)), - ?assert(maps:is_key({<<"ctag1">>, self()}, State1#rabbit_fifo.consumers)), + ?assertMatch(#{C1 := _}, State1#rabbit_fifo.consumers), ?assertEqual(3, length(State1#rabbit_fifo.waiting_consumers)), - ?assertNotEqual(false, lists:keyfind({<<"ctag2">>, self()}, 1, State1#rabbit_fifo.waiting_consumers)), - ?assertNotEqual(false, lists:keyfind({<<"ctag3">>, self()}, 1, State1#rabbit_fifo.waiting_consumers)), - ?assertNotEqual(false, lists:keyfind({<<"ctag4">>, self()}, 1, State1#rabbit_fifo.waiting_consumers)), + ?assertNotEqual(false, lists:keyfind(C2, 1, State1#rabbit_fifo.waiting_consumers)), + ?assertNotEqual(false, lists:keyfind(C3, 1, State1#rabbit_fifo.waiting_consumers)), + ?assertNotEqual(false, lists:keyfind(C4, 1, State1#rabbit_fifo.waiting_consumers)), % cancelling a waiting consumer {State2, _, Effects1} = apply(meta(2), - make_checkout({<<"ctag3">>, self()}, - cancel, #{}), State1), + make_checkout(C3, cancel, #{}), + State1), % the active consumer should still be in place ?assertEqual(1, map_size(State2#rabbit_fifo.consumers)), - ?assert(maps:is_key({<<"ctag1">>, self()}, State2#rabbit_fifo.consumers)), + ?assertMatch(#{C1 := _}, State2#rabbit_fifo.consumers), % the cancelled consumer has been removed from waiting consumers ?assertEqual(2, length(State2#rabbit_fifo.waiting_consumers)), - ?assertNotEqual(false, lists:keyfind({<<"ctag2">>, self()}, 1, State2#rabbit_fifo.waiting_consumers)), - ?assertNotEqual(false, lists:keyfind({<<"ctag4">>, self()}, 1, State2#rabbit_fifo.waiting_consumers)), + ?assertNotEqual(false, lists:keyfind(C2, 1, State2#rabbit_fifo.waiting_consumers)), + ?assertNotEqual(false, lists:keyfind(C4, 1, State2#rabbit_fifo.waiting_consumers)), % there are some effects to unregister the consumer - ?assertEqual(1, length(Effects1)), + ?ASSERT_EFF({mod_call, rabbit_quorum_queue, + cancel_consumer_handler, [_, C]}, C == C3, Effects1), % cancelling the active consumer {State3, _, Effects2} = apply(meta(3), - make_checkout({<<"ctag1">>, self()}, - cancel, #{}), + make_checkout(C1, cancel, #{}), State2), % the second registered consumer is now the active one ?assertEqual(1, map_size(State3#rabbit_fifo.consumers)), - ?assert(maps:is_key({<<"ctag2">>, self()}, State3#rabbit_fifo.consumers)), + ?assertMatch(#{C2 := _}, State3#rabbit_fifo.consumers), % the new active consumer is no longer in the waiting list ?assertEqual(1, length(State3#rabbit_fifo.waiting_consumers)), - ?assertNotEqual(false, lists:keyfind({<<"ctag4">>, self()}, 1, State3#rabbit_fifo.waiting_consumers)), - % there are some effects to unregister the consumer and to update the new active one (metrics) - ?assertEqual(2, length(Effects2)), + ?assertNotEqual(false, lists:keyfind(C4, 1, + State3#rabbit_fifo.waiting_consumers)), + %% should have a cancel consumer handler mod_call effect and + %% an active new consumer effect + ?ASSERT_EFF({mod_call, rabbit_quorum_queue, + cancel_consumer_handler, [_, C]}, C == C1, Effects2), + ?ASSERT_EFF({mod_call, rabbit_quorum_queue, + update_consumer_handler, _}, Effects2), % cancelling the active consumer {State4, _, Effects3} = apply(meta(4), - make_checkout({<<"ctag2">>, self()}, - cancel, #{}), + make_checkout(C2, cancel, #{}), State3), % the last waiting consumer became the active one ?assertEqual(1, map_size(State4#rabbit_fifo.consumers)), - ?assert(maps:is_key({<<"ctag4">>, self()}, State4#rabbit_fifo.consumers)), + ?assertMatch(#{C4 := _}, State4#rabbit_fifo.consumers), % the waiting consumer list is now empty ?assertEqual(0, length(State4#rabbit_fifo.waiting_consumers)), - % there are some effects to unregister the consumer and to update the new active one (metrics) - ?assertEqual(2, length(Effects3)), + % there are some effects to unregister the consumer and + % to update the new active one (metrics) + ?ASSERT_EFF({mod_call, rabbit_quorum_queue, + cancel_consumer_handler, [_, C]}, C == C2, Effects3), + ?ASSERT_EFF({mod_call, rabbit_quorum_queue, + update_consumer_handler, _}, Effects3), % cancelling the last consumer {State5, _, Effects4} = apply(meta(5), - make_checkout({<<"ctag4">>, self()}, - cancel, #{}), + make_checkout(C4, cancel, #{}), State4), % no active consumer anymore ?assertEqual(0, map_size(State5#rabbit_fifo.consumers)), % still nothing in the waiting list ?assertEqual(0, length(State5#rabbit_fifo.waiting_consumers)), % there is an effect to unregister the consumer + queue inactive effect - ?assertEqual(1 + 1, length(Effects4)), + ?ASSERT_EFF({mod_call, rabbit_quorum_queue, + cancel_consumer_handler, _}, Effects4), ok. @@ -673,6 +685,9 @@ single_active_consumer_cancel_consumer_when_channel_is_down_test(_) -> Pid2 = spawn(DummyFunction), Pid3 = spawn(DummyFunction), + [C1, C2, C3, C4] = Consumers = + [{<<"ctag1">>, Pid1}, {<<"ctag2">>, Pid2}, + {<<"ctag3">>, Pid2}, {<<"ctag4">>, Pid3}], % adding some consumers AddConsumer = fun({CTag, ChannelId}, State) -> {NewState, _, _} = apply( @@ -681,27 +696,34 @@ single_active_consumer_cancel_consumer_when_channel_is_down_test(_) -> State), NewState end, - State1 = lists:foldl(AddConsumer, State0, - [{<<"ctag1">>, Pid1}, {<<"ctag2">>, Pid2}, {<<"ctag3">>, Pid2}, {<<"ctag4">>, Pid3}]), + State1 = lists:foldl(AddConsumer, State0, Consumers), % the channel of the active consumer goes down - {State2, _, Effects} = apply(#{index => 2}, {down, Pid1, doesnotmatter}, State1), + {State2, _, Effects} = apply(#{index => 2}, {down, Pid1, noproc}, State1), % fell back to another consumer ?assertEqual(1, map_size(State2#rabbit_fifo.consumers)), % there are still waiting consumers ?assertEqual(2, length(State2#rabbit_fifo.waiting_consumers)), % effects to unregister the consumer and % to update the new active one (metrics) are there - ?assertEqual(2, length(Effects)), + ?ASSERT_EFF({mod_call, rabbit_quorum_queue, + cancel_consumer_handler, [_, C]}, C == C1, Effects), + ?ASSERT_EFF({mod_call, rabbit_quorum_queue, + update_consumer_handler, _}, Effects), % the channel of the active consumer and a waiting consumer goes down - {State3, _, Effects2} = apply(#{index => 3}, {down, Pid2, doesnotmatter}, State2), + {State3, _, Effects2} = apply(#{index => 3}, {down, Pid2, noproc}, State2), % fell back to another consumer ?assertEqual(1, map_size(State3#rabbit_fifo.consumers)), % no more waiting consumer ?assertEqual(0, length(State3#rabbit_fifo.waiting_consumers)), % effects to cancel both consumers of this channel + effect to update the new active one (metrics) - ?assertEqual(3, length(Effects2)), + ?ASSERT_EFF({mod_call, rabbit_quorum_queue, + cancel_consumer_handler, [_, C]}, C == C2, Effects2), + ?ASSERT_EFF({mod_call, rabbit_quorum_queue, + cancel_consumer_handler, [_, C]}, C == C3, Effects2), + ?ASSERT_EFF({mod_call, rabbit_quorum_queue, + update_consumer_handler, _}, Effects2), % the last channel goes down {State4, _, Effects3} = apply(#{index => 4}, {down, Pid3, doesnotmatter}, State3), @@ -709,48 +731,107 @@ single_active_consumer_cancel_consumer_when_channel_is_down_test(_) -> ?assertEqual(0, map_size(State4#rabbit_fifo.consumers)), ?assertEqual(0, length(State4#rabbit_fifo.waiting_consumers)), % there is an effect to unregister the consumer + queue inactive effect - ?assertEqual(1 + 1, length(Effects3)), + ?ASSERT_EFF({mod_call, rabbit_quorum_queue, + cancel_consumer_handler, [_, C]}, C == C4, Effects3), ok. -single_active_consumer_mark_waiting_consumers_as_suspected_when_down_noconnnection_test(_) -> +single_active_consumer_replaces_consumer_when_down_noconnection_test(_) -> + R = rabbit_misc:r("/", queue, atom_to_binary(?FUNCTION_NAME, utf8)), State0 = init(#{name => ?FUNCTION_NAME, - queue_resource => rabbit_misc:r("/", queue, - atom_to_binary(?FUNCTION_NAME, utf8)), - release_cursor_interval => 0, - single_active_consumer_on => true}), - + queue_resource => R, + release_cursor_interval => 0, + single_active_consumer_on => true}), Meta = #{index => 1}, + Nodes = [n1, n2, node()], + ConsumerIds = [C1 = {_, DownPid}, C2, _C3] = + [begin + B = atom_to_binary(N, utf8), + {<<"ctag_", B/binary>>, + test_util:fake_pid(N)} + end || N <- Nodes], % adding some consumers - AddConsumer = fun(CTag, State) -> - {NewState, _, _} = apply( - Meta, - make_checkout({CTag, self()}, - {once, 1, simple_prefetch}, #{}), - State), - NewState - end, - State1 = lists:foldl(AddConsumer, State0, - [<<"ctag1">>, <<"ctag2">>, <<"ctag3">>, <<"ctag4">>]), + State1 = lists:foldl( + fun(CId, Acc0) -> + {Acc, _, _} = + apply(Meta, + make_checkout(CId, + {once, 1, simple_prefetch}, #{}), + Acc0), + Acc + end, State0, ConsumerIds), + + %% assert the consumer is up + ?assertMatch(#{C1 := #consumer{status = up}}, + State1#rabbit_fifo.consumers), % simulate node goes down - {State2, _, _} = apply(#{}, {down, self(), noconnection}, State1), + {State2, _, _} = apply(meta(5), {down, DownPid, noconnection}, State1), - % all the waiting consumers should be suspected down - ?assertEqual(3, length(State2#rabbit_fifo.waiting_consumers)), - lists:foreach(fun({_, #consumer{status = Status}}) -> - ?assert(Status == suspected_down) - end, State2#rabbit_fifo.waiting_consumers), + %% assert a new consumer is in place and it is up + ?assertMatch([{C2, #consumer{status = up}}], + maps:to_list(State2#rabbit_fifo.consumers)), + + %% the disconnected consumer has been returned to waiting + ?assert(lists:any(fun ({C,_}) -> C =:= C1 end, + State2#rabbit_fifo.waiting_consumers)), + ?assertEqual(2, length(State2#rabbit_fifo.waiting_consumers)), - % simulate node goes back up - {State3, _, _} = apply(#{index => 2}, {nodeup, node(self())}, State2), + % simulate node comes back up + {State3, _, _} = apply(#{index => 2}, {nodeup, node(DownPid)}, State2), - % all the waiting consumers should be un-suspected - ?assertEqual(3, length(State3#rabbit_fifo.waiting_consumers)), + %% the consumer is still active and the same as before + ?assertMatch([{C2, #consumer{status = up}}], + maps:to_list(State3#rabbit_fifo.consumers)), + % the waiting consumers should be un-suspected + ?assertEqual(2, length(State3#rabbit_fifo.waiting_consumers)), lists:foreach(fun({_, #consumer{status = Status}}) -> ?assert(Status /= suspected_down) end, State3#rabbit_fifo.waiting_consumers), + ok. +single_active_consumer_all_disconnected_test(_) -> + R = rabbit_misc:r("/", queue, atom_to_binary(?FUNCTION_NAME, utf8)), + State0 = init(#{name => ?FUNCTION_NAME, + queue_resource => R, + release_cursor_interval => 0, + single_active_consumer_on => true}), + Meta = #{index => 1}, + Nodes = [n1, n2], + ConsumerIds = [C1 = {_, C1Pid}, C2 = {_, C2Pid}] = + [begin + B = atom_to_binary(N, utf8), + {<<"ctag_", B/binary>>, + test_util:fake_pid(N)} + end || N <- Nodes], + % adding some consumers + State1 = lists:foldl( + fun(CId, Acc0) -> + {Acc, _, _} = + apply(Meta, + make_checkout(CId, + {once, 1, simple_prefetch}, #{}), + Acc0), + Acc + end, State0, ConsumerIds), + + %% assert the consumer is up + ?assertMatch(#{C1 := #consumer{status = up}}, State1#rabbit_fifo.consumers), + + % simulate node goes down + {State2, _, _} = apply(meta(5), {down, C1Pid, noconnection}, State1), + %% assert the consumer fails over to the consumer on n2 + ?assertMatch(#{C2 := #consumer{status = up}}, State2#rabbit_fifo.consumers), + {State3, _, _} = apply(meta(6), {down, C2Pid, noconnection}, State2), + %% assert these no active consumer after both nodes are maked as down + ?assertMatch([], maps:to_list(State3#rabbit_fifo.consumers)), + %% n2 comes back + {State4, _, _} = apply(meta(7), {nodeup, node(C2Pid)}, State3), + %% ensure n2 is the active consumer as this node as been registered + %% as up again + ?assertMatch([{{<<"ctag_n2">>, _}, #consumer{status = up, + credit = 1}}], + maps:to_list(State4#rabbit_fifo.consumers)), ok. single_active_consumer_state_enter_leader_include_waiting_consumers_test(_) -> @@ -783,11 +864,11 @@ single_active_consumer_state_enter_leader_include_waiting_consumers_test(_) -> ?assertEqual(2 * 3 + 1, length(Effects)). single_active_consumer_state_enter_eol_include_waiting_consumers_test(_) -> + Resource = rabbit_misc:r("/", queue, atom_to_binary(?FUNCTION_NAME, utf8)), State0 = init(#{name => ?FUNCTION_NAME, - queue_resource => rabbit_misc:r("/", queue, - atom_to_binary(?FUNCTION_NAME, utf8)), - release_cursor_interval => 0, - single_active_consumer_on => true}), + queue_resource => Resource, + release_cursor_interval => 0, + single_active_consumer_on => true}), DummyFunction = fun() -> ok end, Pid1 = spawn(DummyFunction), @@ -805,7 +886,8 @@ single_active_consumer_state_enter_eol_include_waiting_consumers_test(_) -> NewState end, State1 = lists:foldl(AddConsumer, State0, - [{<<"ctag1">>, Pid1}, {<<"ctag2">>, Pid2}, {<<"ctag3">>, Pid2}, {<<"ctag4">>, Pid3}]), + [{<<"ctag1">>, Pid1}, {<<"ctag2">>, Pid2}, + {<<"ctag3">>, Pid2}, {<<"ctag4">>, Pid3}]), Effects = rabbit_fifo:state_enter(eol, State1), % 1 effect for each consumer process (channel process) @@ -918,10 +1000,10 @@ active_flag_updated_when_consumer_suspected_unsuspected_test(_) -> active_flag_not_updated_when_consumer_suspected_unsuspected_and_single_active_consumer_is_on_test(_) -> State0 = init(#{name => ?FUNCTION_NAME, - queue_resource => rabbit_misc:r("/", queue, - atom_to_binary(?FUNCTION_NAME, utf8)), - release_cursor_interval => 0, - single_active_consumer_on => true}), + queue_resource => rabbit_misc:r("/", queue, + atom_to_binary(?FUNCTION_NAME, utf8)), + release_cursor_interval => 0, + single_active_consumer_on => true}), DummyFunction = fun() -> ok end, Pid1 = spawn(DummyFunction), @@ -938,15 +1020,103 @@ active_flag_not_updated_when_consumer_suspected_unsuspected_and_single_active_co NewState end, State1 = lists:foldl(AddConsumer, State0, - [{<<"ctag1">>, Pid1}, {<<"ctag2">>, Pid2}, {<<"ctag3">>, Pid2}, {<<"ctag4">>, Pid3}]), + [{<<"ctag1">>, Pid1}, {<<"ctag2">>, Pid2}, + {<<"ctag3">>, Pid2}, {<<"ctag4">>, Pid3}]), {State2, _, Effects2} = apply(#{}, {down, Pid1, noconnection}, State1), - % only 1 effect to monitor the node - ?assertEqual(1, length(Effects2)), + % one monitor and one consumer status update (deactivated) + ?assertEqual(3, length(Effects2)), {_, _, Effects3} = apply(#{index => 1}, {nodeup, node(self())}, State2), % for each consumer: 1 effect to monitor the consumer PID - ?assertEqual(4, length(Effects3)). + ?assertEqual(5, length(Effects3)). + +single_active_cancelled_with_unacked_test(_) -> + State0 = init(#{name => ?FUNCTION_NAME, + queue_resource => rabbit_misc:r("/", queue, + atom_to_binary(?FUNCTION_NAME, utf8)), + release_cursor_interval => 0, + single_active_consumer_on => true}), + + C1 = {<<"ctag1">>, self()}, + C2 = {<<"ctag2">>, self()}, + % adding some consumers + AddConsumer = fun(C, S0) -> + {S, _, _} = apply( + meta(1), + make_checkout(C, + {auto, 1, simple_prefetch}, + #{}), + S0), + S + end, + State1 = lists:foldl(AddConsumer, State0, [C1, C2]), + + %% enqueue 2 messages + {State2, _Effects2} = enq(3, 1, msg1, State1), + {State3, _Effects3} = enq(4, 2, msg2, State2), + %% one should be checked ou to C1 + %% cancel C1 + {State4, _, _} = apply(meta(5), + make_checkout(C1, cancel, #{}), + State3), + %% C2 should be the active consumer + ?assertMatch(#{C2 := #consumer{status = up, + checked_out = #{0 := _}}}, + State4#rabbit_fifo.consumers), + %% C1 should be a cancelled consumer + ?assertMatch(#{C1 := #consumer{status = cancelled, + lifetime = once, + checked_out = #{0 := _}}}, + State4#rabbit_fifo.consumers), + ?assertMatch([], State4#rabbit_fifo.waiting_consumers), + + %% Ack both messages + {State5, _Effects5} = settle(C1, 1, 0, State4), + %% C1 should now be cancelled + {State6, _Effects6} = settle(C2, 2, 0, State5), + + %% C2 should remain + ?assertMatch(#{C2 := #consumer{status = up}}, + State6#rabbit_fifo.consumers), + %% C1 should be gone + ?assertNotMatch(#{C1 := _}, + State6#rabbit_fifo.consumers), + ?assertMatch([], State6#rabbit_fifo.waiting_consumers), + ok. + +single_active_with_credited_test(_) -> + State0 = init(#{name => ?FUNCTION_NAME, + queue_resource => rabbit_misc:r("/", queue, + atom_to_binary(?FUNCTION_NAME, utf8)), + release_cursor_interval => 0, + single_active_consumer_on => true}), + + C1 = {<<"ctag1">>, self()}, + C2 = {<<"ctag2">>, self()}, + % adding some consumers + AddConsumer = fun(C, S0) -> + {S, _, _} = apply( + meta(1), + make_checkout(C, + {auto, 0, credited}, + #{}), + S0), + S + end, + State1 = lists:foldl(AddConsumer, State0, [C1, C2]), + + %% add some credit + C1Cred = rabbit_fifo:make_credit(C1, 5, 0, false), + {State2, _, _Effects2} = apply(meta(3), C1Cred, State1), + C2Cred = rabbit_fifo:make_credit(C2, 4, 0, false), + {State3, _} = apply(meta(4), C2Cred, State2), + %% both consumers should have credit + ?assertMatch(#{C1 := #consumer{credit = 5}}, + State3#rabbit_fifo.consumers), + ?assertMatch([{C2, #consumer{credit = 4}}], + State3#rabbit_fifo.waiting_consumers), + ok. meta(Idx) -> #{index => Idx, term => 1}. diff --git a/test/rabbit_fifo_prop_SUITE.erl b/test/rabbit_fifo_prop_SUITE.erl index da72c030cd..c61d85fbff 100644 --- a/test/rabbit_fifo_prop_SUITE.erl +++ b/test/rabbit_fifo_prop_SUITE.erl @@ -271,21 +271,6 @@ scenario16(_Config) -> delivery_limit => 1}, Commands), ok. -fake_pid(_Config) -> - Pid = fake_external_pid(<<"mynode@banana">>), - ?assertNotEqual(node(Pid), node()), - ?assert(is_pid(Pid)), - ok. - -fake_external_pid(Node) when is_binary(Node) -> - ThisNodeSize = size(term_to_binary(node())) + 1, - Pid = spawn(fun () -> ok end), - %% drop the local node data from a local pid - <<_:ThisNodeSize/binary, LocalPidData/binary>> = term_to_binary(Pid), - S = size(Node), - %% replace it with the incoming node binary - Final = <<131,103, 100, 0, S, Node/binary, LocalPidData/binary>>, - binary_to_term(Final). snapshots(_Config) -> run_proper( @@ -352,7 +337,7 @@ log_gen(Size) -> pid_gen(Nodes) -> ?LET(Node, oneof(Nodes), - fake_external_pid(atom_to_binary(Node, utf8))). + test_util:fake_pid(atom_to_binary(Node, utf8))). down_gen(Pid) -> ?LET(E, {down, Pid, oneof([noconnection, noproc])}, E). diff --git a/test/single_active_consumer_SUITE.erl b/test/single_active_consumer_SUITE.erl index 0b12f54c0b..c22a13c744 100644 --- a/test/single_active_consumer_SUITE.erl +++ b/test/single_active_consumer_SUITE.erl @@ -45,12 +45,14 @@ groups() -> ]} ]. -init_per_suite(Config) -> +init_per_suite(Config0) -> rabbit_ct_helpers:log_environment(), - Config1 = rabbit_ct_helpers:set_config(Config, [ + Config1 = rabbit_ct_helpers:set_config(Config0, [ {rmq_nodename_suffix, ?MODULE} ]), - rabbit_ct_helpers:run_setup_steps(Config1, + Config = rabbit_ct_helpers:merge_app_env( + Config1, {rabbit, [{quorum_tick_interval, 1000}]}), + rabbit_ct_helpers:run_setup_steps(Config, rabbit_ct_broker_helpers:setup_steps() ++ rabbit_ct_client_helpers:setup_steps()). diff --git a/test/test_util.erl b/test/test_util.erl new file mode 100644 index 0000000000..7fcf247898 --- /dev/null +++ b/test/test_util.erl @@ -0,0 +1,17 @@ +-module(test_util). + +-export([ + fake_pid/1 + ]). + + +fake_pid(Node) -> + NodeBin = rabbit_data_coercion:to_binary(Node), + ThisNodeSize = size(term_to_binary(node())) + 1, + Pid = spawn(fun () -> ok end), + %% drop the local node data from a local pid + <<_:ThisNodeSize/binary, LocalPidData/binary>> = term_to_binary(Pid), + S = size(NodeBin), + %% replace it with the incoming node binary + Final = <<131,103, 100, S:16/unsigned, NodeBin/binary, LocalPidData/binary>>, + binary_to_term(Final). |
