diff options
| author | kjnilsson <knilsson@pivotal.io> | 2019-02-12 15:38:30 +0000 |
|---|---|---|
| committer | kjnilsson <knilsson@pivotal.io> | 2019-02-12 15:38:30 +0000 |
| commit | fb97086d7bd1c309803515789eb91b2e4e254de5 (patch) | |
| tree | e92d415f74f6d10af7e904ac15891965e0b09db7 /src | |
| parent | f3cd4af7924765b6210dbac064dc4ecd1a5f6c95 (diff) | |
| download | rabbitmq-server-git-fb97086d7bd1c309803515789eb91b2e4e254de5.tar.gz | |
rabbif_fifo: refactor and fix tests
Rename rabbit_fifo consumer suspected_down field to status.
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_fifo.erl | 164 |
1 files changed, 84 insertions, 80 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl index 2580c1bfc0..4f548568be 100644 --- a/src/rabbit_fifo.erl +++ b/src/rabbit_fifo.erl @@ -184,7 +184,7 @@ %% command: `{consumer_credit, ReceiverDeliveryCount, Credit}' credit_mode = simple_prefetch :: credit_mode(), % part of snapshot data lifetime = once :: once | auto, - suspected_down = false :: 'cancel' | boolean() + status = up :: up | suspected_down | cancelled }). -type consumer() :: #consumer{}. @@ -193,7 +193,7 @@ {next_seqno = 1 :: msg_seqno(), % out of order enqueues - sorted list pending = [] :: [{msg_seqno(), ra_index(), raw_msg()}], - suspected_down = false :: boolean() + status = up :: up | suspected_down }). -record(state, @@ -463,27 +463,30 @@ apply(_, {down, ConsumerPid, noconnection}, % mark all consumers and enqueuers as suspected down % and monitor the node so that we can find out the final state of the % process at some later point - {Cons, State, Effects1} = maps:fold( - fun({_, P} = K, - #consumer{checked_out = Checked0} = C, - {Co, St0, Eff}) when (node(P) =:= Node) and - (C#consumer.suspected_down =/= cancel)-> - St = return_all(St0, Checked0), - Credit = increase_credit(C, maps:size(Checked0)), - Eff1 = ConsumerUpdateActiveFun(St, K, C, false, suspected_down, Eff), - {maps:put(K, C#consumer{suspected_down = true, - credit = Credit, - checked_out = #{}}, Co), - St, Eff1}; - (K, C, {Co, St, Eff}) -> - {maps:put(K, C, Co), St, Eff} - end, {#{}, State0, []}, Cons0), + {Cons, State, Effects1} = + maps:fold(fun({_, P} = K, + #consumer{checked_out = Checked0} = C, + {Co, St0, Eff}) when (node(P) =:= Node) and + (C#consumer.status =/= cancelled)-> + St = return_all(St0, Checked0), + Credit = increase_credit(C, maps:size(Checked0)), + Eff1 = ConsumerUpdateActiveFun(St, K, C, false, + suspected_down, Eff), + {maps:put(K, + C#consumer{status = suspected_down, + credit = Credit, + checked_out = #{}}, Co), + St, Eff1}; + (K, C, {Co, St, Eff}) -> + {maps:put(K, C, Co), St, Eff} + end, {#{}, State0, []}, Cons0), Enqs = maps:map(fun(P, E) when node(P) =:= Node -> - E#enqueuer{suspected_down = true}; + E#enqueuer{status = suspected_down}; (_, E) -> E end, Enqs0), % mark waiting consumers as suspected if necessary - WaitingConsumers = maybe_mark_suspect_waiting_consumers(Node, State0, true), + WaitingConsumers = update_waiting_consumer_status(Node, State0, + suspected_down), Effects2 = case maps:size(Cons) of 0 -> @@ -516,8 +519,8 @@ apply(Meta, {down, Pid, _Info}, #state{consumers = Cons0, end, {State2, Effects1}, DownConsumers), checkout(Meta, State, Effects); apply(Meta, {nodeup, Node}, #state{consumers = Cons0, - enqueuers = Enqs0, - service_queue = SQ0} = State0) -> + enqueuers = Enqs0, + service_queue = SQ0} = State0) -> %% A node we are monitoring has come back. %% If we have suspected any processes of being %% down we should now re-issue the monitors for them to detect if they're @@ -526,20 +529,20 @@ apply(Meta, {nodeup, Node}, #state{consumers = Cons0, || P <- suspected_pids_for(Node, State0)], % un-suspect waiting consumers when necessary - WaitingConsumers = maybe_mark_suspect_waiting_consumers(Node, State0, - false), + WaitingConsumers = update_waiting_consumer_status(Node, State0, up), Enqs1 = maps:map(fun(P, E) when node(P) =:= Node -> - E#enqueuer{suspected_down = false}; + E#enqueuer{status = up}; (_, E) -> E end, Enqs0), ConsumerUpdateActiveFun = consumer_active_flag_update_function(State0), {Cons1, SQ, Effects} = maps:fold(fun({_, P} = ConsumerId, C, {CAcc, SQAcc, EAcc}) - when (node(P) =:= Node) and (C#consumer.suspected_down =/= cancel) -> + when (node(P) =:= Node) and + (C#consumer.status =/= cancelled) -> EAcc1 = ConsumerUpdateActiveFun(State0, ConsumerId, C, true, up, EAcc), update_or_remove_sub( - ConsumerId, C#consumer{suspected_down = false}, + ConsumerId, C#consumer{status = up}, CAcc, SQAcc, EAcc1); (_, _, Acc) -> Acc @@ -585,27 +588,27 @@ handle_waiting_consumer_down(Pid, State = State0#state{waiting_consumers = StillUp}, {Effects, State}. -maybe_mark_suspect_waiting_consumers(_Node, #state{consumer_strategy = default}, - _Suspected) -> +update_waiting_consumer_status(_Node, #state{consumer_strategy = default}, + _Status) -> []; -maybe_mark_suspect_waiting_consumers(_Node, - #state{consumer_strategy = single_active, - waiting_consumers = []}, - _Suspected) -> +update_waiting_consumer_status(_Node, + #state{consumer_strategy = single_active, + waiting_consumers = []}, + _Status) -> []; -maybe_mark_suspect_waiting_consumers(Node, - #state{consumer_strategy = single_active, - waiting_consumers = WaitingConsumers}, - Suspected) -> +update_waiting_consumer_status(Node, + #state{consumer_strategy = single_active, + waiting_consumers = WaitingConsumers}, + Status) -> [begin case node(P) of Node -> - {ConsumerId, Consumer#consumer{suspected_down = Suspected}}; + {ConsumerId, Consumer#consumer{status = Status}}; _ -> {ConsumerId, Consumer} end end || {{_, P} = ConsumerId, Consumer} <- WaitingConsumers, - Consumer#consumer.suspected_down =/= cancel]. + Consumer#consumer.status =/= cancelled]. -spec state_enter(ra_server:ra_state(), state()) -> ra_machine:effects(). state_enter(leader, #state{consumers = Cons, @@ -738,12 +741,13 @@ query_consumers(#state{consumers = Consumers, consumer_strategy = ConsumerStrategy } = State) -> ActiveActivityStatusFun = case ConsumerStrategy of default -> - fun(_ConsumerId, #consumer{suspected_down = SuspectedDown}) -> - case SuspectedDown of - true -> - {false, suspected_down}; - false -> - {true, up} + fun(_ConsumerId, + #consumer{status = Status}) -> + case Status of + suspected_down -> + {false, Status}; + _ -> + {true, Status} end end; single_active -> @@ -757,7 +761,7 @@ query_consumers(#state{consumers = Consumers, end end end, - FromConsumers = maps:fold(fun (_, #consumer{suspected_down = cancel}, Acc) -> + FromConsumers = maps:fold(fun (_, #consumer{status = cancelled}, Acc) -> Acc; ({Tag, Pid}, #consumer{meta = Meta} = Consumer, Acc) -> {Active, ActivityStatus} = ActiveActivityStatusFun({Tag, Pid}, Consumer), @@ -772,7 +776,7 @@ query_consumers(#state{consumers = Consumers, Acc) end, #{}, Consumers), FromWaitingConsumers = - lists:foldl(fun ({_, #consumer{suspected_down = cancel}}, Acc) -> + lists:foldl(fun ({_, #consumer{status = cancelled}}, Acc) -> Acc; ({{Tag, Pid}, #consumer{meta = Meta} = Consumer}, Acc) -> {Active, ActivityStatus} = ActiveActivityStatusFun({Tag, Pid}, Consumer), @@ -944,7 +948,7 @@ maybe_return_all(ConsumerId, #consumer{checked_out = Checked0} = Consumer, Cons1 update_or_remove_sub(ConsumerId, Consumer#consumer{lifetime = once, credit = 0, - suspected_down = cancel}, + status = cancelled}, C0, SQ0, Effects0), {S0#state{consumers = Cons, service_queue = SQ}, Effects1}; down -> @@ -1324,9 +1328,9 @@ checkout_one(#state{service_queue = SQ0, %% can happen when draining %% recurse without consumer on queue checkout_one(InitState#state{service_queue = SQ1}); - {ok, #consumer{suspected_down = cancel}} -> + {ok, #consumer{status = cancelled}} -> checkout_one(InitState#state{service_queue = SQ1}); - {ok, #consumer{suspected_down = true}} -> + {ok, #consumer{status = suspected_down}} -> checkout_one(InitState#state{service_queue = SQ1}); {ok, #consumer{checked_out = Checked0, next_msg_id = Next, @@ -1381,8 +1385,9 @@ update_or_remove_sub(ConsumerId, #consumer{lifetime = once, case maps:size(Checked) of 0 -> % we're done with this consumer - {maps:remove(ConsumerId, Cons), ServiceQueue, - [{demonitor, process, ConsumerId} | Effects]}; + % TODO: demonitor consumer pid but _only_ if there are no other + % monitors for this pid + {maps:remove(ConsumerId, Cons), ServiceQueue, Effects}; _ -> % there are unsettled items so need to keep around {maps:put(ConsumerId, Con, Cons), ServiceQueue, Effects} @@ -1402,7 +1407,6 @@ uniq_queue_in(Key, Queue) -> queue:in(Key, Queue) end. - update_consumer(ConsumerId, Meta, Spec, #state{consumer_strategy = default} = State0) -> %% general case, single active consumer off @@ -1576,18 +1580,18 @@ message_size(Msg) -> suspected_pids_for(Node, #state{consumers = Cons0, enqueuers = Enqs0, waiting_consumers = WaitingConsumers0}) -> - Cons = maps:fold(fun({_, P}, #consumer{suspected_down = true}, Acc) + Cons = maps:fold(fun({_, P}, #consumer{status = suspected_down}, Acc) when node(P) =:= Node -> [P | Acc]; (_, _, Acc) -> Acc end, [], Cons0), - Enqs = maps:fold(fun(P, #enqueuer{suspected_down = true}, Acc) + Enqs = maps:fold(fun(P, #enqueuer{status = suspected_down}, Acc) when node(P) =:= Node -> [P | Acc]; (_, _, Acc) -> Acc end, Cons, Enqs0), lists:foldl(fun({{_, P}, - #consumer{suspected_down = true}}, Acc) + #consumer{status = suspected_down}}, Acc) when node(P) =:= Node -> [P | Acc]; (_, Acc) -> Acc @@ -1837,10 +1841,11 @@ return_checked_out_test() -> {State0, [_, _]} = enq(1, 1, first, test_init(test)), {State1, [_Monitor, {send_msg, _, {delivery, _, [{MsgId, _}]}, ra_event}, - {aux, active} | _ - ]} = check(Cid, 2, State0), - % return - {_State2, _, [_]} = apply(meta(3), make_return(Cid, [MsgId]), State1), + {aux, active} | _ ]} = check_auto(Cid, 2, State0), + % returning immediately checks out the same message again + {_, ok, [{send_msg, _, {delivery, _, [{_, _}]}, ra_event}, + {aux, active}]} = + apply(meta(3), make_return(Cid, [MsgId]), State1), ok. return_auto_checked_out_test() -> @@ -1867,15 +1872,19 @@ cancelled_checkout_out_test() -> {State00, [_, _]} = enq(1, 1, first, test_init(test)), {State0, [_]} = enq(2, 2, second, State00), {State1, _} = check_auto(Cid, 2, State0), - % cancelled checkout should return all pending messages to queue + % cancelled checkout should not return pending messages to queue {State2, _, _} = apply(meta(3), make_checkout(Cid, cancel, #{}), State1), ?assertEqual(1, maps:size(State2#state.messages)), - ?assertEqual(1, lqueue:len(State2#state.returns)), + ?assertEqual(0, lqueue:len(State2#state.returns)), - {State3, {dequeue, {0, {_, first}}, _}, _} = + {State3, {dequeue, empty}} = apply(meta(3), make_checkout(Cid, {dequeue, settled}, #{}), State2), + %% settle + {State4, ok, _} = + apply(meta(4), make_settle(Cid, [0]), State3), + {_State, {dequeue, {_, {_, second}}, _}, _} = - apply(meta(4), make_checkout(Cid, {dequeue, settled}, #{}), State3), + apply(meta(5), make_checkout(Cid, {dequeue, settled}, #{}), State4), ok. down_with_noproc_consumer_returns_unsettled_test() -> @@ -1937,16 +1946,6 @@ down_with_noproc_enqueuer_is_cleaned_up_test() -> ?assert(0 =:= maps:size(State1#state.enqueuers)), ok. -completed_consumer_yields_demonitor_effect_test() -> - Cid = {<<"completed_consumer_yields_demonitor_effect_test">>, self()}, - {State0, [_, _]} = enq(1, 1, second, test_init(test)), - {State1, [{monitor, process, _} | _]} = check(Cid, 2, State0), - {_, Effects} = settle(Cid, 3, 0, State1), - ?ASSERT_EFF({demonitor, _, _}, Effects), - % release cursor for empty queue - ?ASSERT_EFF({release_cursor, 3, _}, Effects), - ok. - discarded_message_without_dead_letter_handler_is_removed_test() -> Cid = {<<"completed_consumer_yields_demonitor_effect_test">>, self()}, {State0, [_, _]} = enq(1, 1, first, test_init(test)), @@ -2322,7 +2321,10 @@ single_active_consumer_test() -> ?assertEqual(1, length(Effects1)), % cancelling the active consumer - {State3, _, Effects2} = apply(meta(3), #checkout{spec = cancel, consumer_id = {<<"ctag1">>, self()}}, State2), + {State3, _, Effects2} = apply(meta(3), + #checkout{spec = cancel, + consumer_id = {<<"ctag1">>, self()}}, + State2), % the second registered consumer is now the active one ?assertEqual(1, map_size(State3#state.consumers)), ?assert(maps:is_key({<<"ctag2">>, self()}, State3#state.consumers)), @@ -2425,15 +2427,16 @@ single_active_consumer_mark_waiting_consumers_as_suspected_when_down_noconnnecti State), NewState end, - State1 = lists:foldl(AddConsumer, State0, [<<"ctag1">>, <<"ctag2">>, <<"ctag3">>, <<"ctag4">>]), + State1 = lists:foldl(AddConsumer, State0, + [<<"ctag1">>, <<"ctag2">>, <<"ctag3">>, <<"ctag4">>]), % simulate node goes down {State2, _, _} = apply(#{}, {down, self(), noconnection}, State1), % all the waiting consumers should be suspected down ?assertEqual(3, length(State2#state.waiting_consumers)), - lists:foreach(fun({_, #consumer{suspected_down = SuspectedDown}}) -> - ?assert(SuspectedDown) + lists:foreach(fun({_, #consumer{status = Status}}) -> + ?assert(Status == suspected_down) end, State2#state.waiting_consumers), % simulate node goes back up @@ -2441,8 +2444,8 @@ single_active_consumer_mark_waiting_consumers_as_suspected_when_down_noconnnecti % all the waiting consumers should be un-suspected ?assertEqual(3, length(State3#state.waiting_consumers)), - lists:foreach(fun({_, #consumer{suspected_down = SuspectedDown}}) -> - ?assertNot(SuspectedDown) + lists:foreach(fun({_, #consumer{status = Status}}) -> + ?assert(Status /= suspected_down) end, State3#state.waiting_consumers), ok. @@ -2527,7 +2530,8 @@ query_consumers_test() -> State1 = lists:foldl(AddConsumer, State0, [<<"ctag1">>, <<"ctag2">>, <<"ctag3">>, <<"ctag4">>]), Consumers0 = State1#state.consumers, Consumer = maps:get({<<"ctag2">>, self()}, Consumers0), - Consumers1 = maps:put({<<"ctag2">>, self()}, Consumer#consumer{suspected_down = true}, Consumers0), + Consumers1 = maps:put({<<"ctag2">>, self()}, + Consumer#consumer{status = suspected_down}, Consumers0), State2 = State1#state{consumers = Consumers1}, ?assertEqual(4, query_consumer_count(State2)), |
