summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_fifo.erl164
1 files changed, 84 insertions, 80 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl
index 2580c1bfc0..4f548568be 100644
--- a/src/rabbit_fifo.erl
+++ b/src/rabbit_fifo.erl
@@ -184,7 +184,7 @@
%% command: `{consumer_credit, ReceiverDeliveryCount, Credit}'
credit_mode = simple_prefetch :: credit_mode(), % part of snapshot data
lifetime = once :: once | auto,
- suspected_down = false :: 'cancel' | boolean()
+ status = up :: up | suspected_down | cancelled
}).
-type consumer() :: #consumer{}.
@@ -193,7 +193,7 @@
{next_seqno = 1 :: msg_seqno(),
% out of order enqueues - sorted list
pending = [] :: [{msg_seqno(), ra_index(), raw_msg()}],
- suspected_down = false :: boolean()
+ status = up :: up | suspected_down
}).
-record(state,
@@ -463,27 +463,30 @@ apply(_, {down, ConsumerPid, noconnection},
% mark all consumers and enqueuers as suspected down
% and monitor the node so that we can find out the final state of the
% process at some later point
- {Cons, State, Effects1} = maps:fold(
- fun({_, P} = K,
- #consumer{checked_out = Checked0} = C,
- {Co, St0, Eff}) when (node(P) =:= Node) and
- (C#consumer.suspected_down =/= cancel)->
- St = return_all(St0, Checked0),
- Credit = increase_credit(C, maps:size(Checked0)),
- Eff1 = ConsumerUpdateActiveFun(St, K, C, false, suspected_down, Eff),
- {maps:put(K, C#consumer{suspected_down = true,
- credit = Credit,
- checked_out = #{}}, Co),
- St, Eff1};
- (K, C, {Co, St, Eff}) ->
- {maps:put(K, C, Co), St, Eff}
- end, {#{}, State0, []}, Cons0),
+ {Cons, State, Effects1} =
+ maps:fold(fun({_, P} = K,
+ #consumer{checked_out = Checked0} = C,
+ {Co, St0, Eff}) when (node(P) =:= Node) and
+ (C#consumer.status =/= cancelled)->
+ St = return_all(St0, Checked0),
+ Credit = increase_credit(C, maps:size(Checked0)),
+ Eff1 = ConsumerUpdateActiveFun(St, K, C, false,
+ suspected_down, Eff),
+ {maps:put(K,
+ C#consumer{status = suspected_down,
+ credit = Credit,
+ checked_out = #{}}, Co),
+ St, Eff1};
+ (K, C, {Co, St, Eff}) ->
+ {maps:put(K, C, Co), St, Eff}
+ end, {#{}, State0, []}, Cons0),
Enqs = maps:map(fun(P, E) when node(P) =:= Node ->
- E#enqueuer{suspected_down = true};
+ E#enqueuer{status = suspected_down};
(_, E) -> E
end, Enqs0),
% mark waiting consumers as suspected if necessary
- WaitingConsumers = maybe_mark_suspect_waiting_consumers(Node, State0, true),
+ WaitingConsumers = update_waiting_consumer_status(Node, State0,
+ suspected_down),
Effects2 = case maps:size(Cons) of
0 ->
@@ -516,8 +519,8 @@ apply(Meta, {down, Pid, _Info}, #state{consumers = Cons0,
end, {State2, Effects1}, DownConsumers),
checkout(Meta, State, Effects);
apply(Meta, {nodeup, Node}, #state{consumers = Cons0,
- enqueuers = Enqs0,
- service_queue = SQ0} = State0) ->
+ enqueuers = Enqs0,
+ service_queue = SQ0} = State0) ->
%% A node we are monitoring has come back.
%% If we have suspected any processes of being
%% down we should now re-issue the monitors for them to detect if they're
@@ -526,20 +529,20 @@ apply(Meta, {nodeup, Node}, #state{consumers = Cons0,
|| P <- suspected_pids_for(Node, State0)],
% un-suspect waiting consumers when necessary
- WaitingConsumers = maybe_mark_suspect_waiting_consumers(Node, State0,
- false),
+ WaitingConsumers = update_waiting_consumer_status(Node, State0, up),
Enqs1 = maps:map(fun(P, E) when node(P) =:= Node ->
- E#enqueuer{suspected_down = false};
+ E#enqueuer{status = up};
(_, E) -> E
end, Enqs0),
ConsumerUpdateActiveFun = consumer_active_flag_update_function(State0),
{Cons1, SQ, Effects} =
maps:fold(fun({_, P} = ConsumerId, C, {CAcc, SQAcc, EAcc})
- when (node(P) =:= Node) and (C#consumer.suspected_down =/= cancel) ->
+ when (node(P) =:= Node) and
+ (C#consumer.status =/= cancelled) ->
EAcc1 = ConsumerUpdateActiveFun(State0, ConsumerId, C, true, up, EAcc),
update_or_remove_sub(
- ConsumerId, C#consumer{suspected_down = false},
+ ConsumerId, C#consumer{status = up},
CAcc, SQAcc, EAcc1);
(_, _, Acc) ->
Acc
@@ -585,27 +588,27 @@ handle_waiting_consumer_down(Pid,
State = State0#state{waiting_consumers = StillUp},
{Effects, State}.
-maybe_mark_suspect_waiting_consumers(_Node, #state{consumer_strategy = default},
- _Suspected) ->
+update_waiting_consumer_status(_Node, #state{consumer_strategy = default},
+ _Status) ->
[];
-maybe_mark_suspect_waiting_consumers(_Node,
- #state{consumer_strategy = single_active,
- waiting_consumers = []},
- _Suspected) ->
+update_waiting_consumer_status(_Node,
+ #state{consumer_strategy = single_active,
+ waiting_consumers = []},
+ _Status) ->
[];
-maybe_mark_suspect_waiting_consumers(Node,
- #state{consumer_strategy = single_active,
- waiting_consumers = WaitingConsumers},
- Suspected) ->
+update_waiting_consumer_status(Node,
+ #state{consumer_strategy = single_active,
+ waiting_consumers = WaitingConsumers},
+ Status) ->
[begin
case node(P) of
Node ->
- {ConsumerId, Consumer#consumer{suspected_down = Suspected}};
+ {ConsumerId, Consumer#consumer{status = Status}};
_ ->
{ConsumerId, Consumer}
end
end || {{_, P} = ConsumerId, Consumer} <- WaitingConsumers,
- Consumer#consumer.suspected_down =/= cancel].
+ Consumer#consumer.status =/= cancelled].
-spec state_enter(ra_server:ra_state(), state()) -> ra_machine:effects().
state_enter(leader, #state{consumers = Cons,
@@ -738,12 +741,13 @@ query_consumers(#state{consumers = Consumers,
consumer_strategy = ConsumerStrategy } = State) ->
ActiveActivityStatusFun = case ConsumerStrategy of
default ->
- fun(_ConsumerId, #consumer{suspected_down = SuspectedDown}) ->
- case SuspectedDown of
- true ->
- {false, suspected_down};
- false ->
- {true, up}
+ fun(_ConsumerId,
+ #consumer{status = Status}) ->
+ case Status of
+ suspected_down ->
+ {false, Status};
+ _ ->
+ {true, Status}
end
end;
single_active ->
@@ -757,7 +761,7 @@ query_consumers(#state{consumers = Consumers,
end
end
end,
- FromConsumers = maps:fold(fun (_, #consumer{suspected_down = cancel}, Acc) ->
+ FromConsumers = maps:fold(fun (_, #consumer{status = cancelled}, Acc) ->
Acc;
({Tag, Pid}, #consumer{meta = Meta} = Consumer, Acc) ->
{Active, ActivityStatus} = ActiveActivityStatusFun({Tag, Pid}, Consumer),
@@ -772,7 +776,7 @@ query_consumers(#state{consumers = Consumers,
Acc)
end, #{}, Consumers),
FromWaitingConsumers =
- lists:foldl(fun ({_, #consumer{suspected_down = cancel}}, Acc) ->
+ lists:foldl(fun ({_, #consumer{status = cancelled}}, Acc) ->
Acc;
({{Tag, Pid}, #consumer{meta = Meta} = Consumer}, Acc) ->
{Active, ActivityStatus} = ActiveActivityStatusFun({Tag, Pid}, Consumer),
@@ -944,7 +948,7 @@ maybe_return_all(ConsumerId, #consumer{checked_out = Checked0} = Consumer, Cons1
update_or_remove_sub(ConsumerId,
Consumer#consumer{lifetime = once,
credit = 0,
- suspected_down = cancel},
+ status = cancelled},
C0, SQ0, Effects0),
{S0#state{consumers = Cons, service_queue = SQ}, Effects1};
down ->
@@ -1324,9 +1328,9 @@ checkout_one(#state{service_queue = SQ0,
%% can happen when draining
%% recurse without consumer on queue
checkout_one(InitState#state{service_queue = SQ1});
- {ok, #consumer{suspected_down = cancel}} ->
+ {ok, #consumer{status = cancelled}} ->
checkout_one(InitState#state{service_queue = SQ1});
- {ok, #consumer{suspected_down = true}} ->
+ {ok, #consumer{status = suspected_down}} ->
checkout_one(InitState#state{service_queue = SQ1});
{ok, #consumer{checked_out = Checked0,
next_msg_id = Next,
@@ -1381,8 +1385,9 @@ update_or_remove_sub(ConsumerId, #consumer{lifetime = once,
case maps:size(Checked) of
0 ->
% we're done with this consumer
- {maps:remove(ConsumerId, Cons), ServiceQueue,
- [{demonitor, process, ConsumerId} | Effects]};
+ % TODO: demonitor consumer pid but _only_ if there are no other
+ % monitors for this pid
+ {maps:remove(ConsumerId, Cons), ServiceQueue, Effects};
_ ->
% there are unsettled items so need to keep around
{maps:put(ConsumerId, Con, Cons), ServiceQueue, Effects}
@@ -1402,7 +1407,6 @@ uniq_queue_in(Key, Queue) ->
queue:in(Key, Queue)
end.
-
update_consumer(ConsumerId, Meta, Spec,
#state{consumer_strategy = default} = State0) ->
%% general case, single active consumer off
@@ -1576,18 +1580,18 @@ message_size(Msg) ->
suspected_pids_for(Node, #state{consumers = Cons0,
enqueuers = Enqs0,
waiting_consumers = WaitingConsumers0}) ->
- Cons = maps:fold(fun({_, P}, #consumer{suspected_down = true}, Acc)
+ Cons = maps:fold(fun({_, P}, #consumer{status = suspected_down}, Acc)
when node(P) =:= Node ->
[P | Acc];
(_, _, Acc) -> Acc
end, [], Cons0),
- Enqs = maps:fold(fun(P, #enqueuer{suspected_down = true}, Acc)
+ Enqs = maps:fold(fun(P, #enqueuer{status = suspected_down}, Acc)
when node(P) =:= Node ->
[P | Acc];
(_, _, Acc) -> Acc
end, Cons, Enqs0),
lists:foldl(fun({{_, P},
- #consumer{suspected_down = true}}, Acc)
+ #consumer{status = suspected_down}}, Acc)
when node(P) =:= Node ->
[P | Acc];
(_, Acc) -> Acc
@@ -1837,10 +1841,11 @@ return_checked_out_test() ->
{State0, [_, _]} = enq(1, 1, first, test_init(test)),
{State1, [_Monitor,
{send_msg, _, {delivery, _, [{MsgId, _}]}, ra_event},
- {aux, active} | _
- ]} = check(Cid, 2, State0),
- % return
- {_State2, _, [_]} = apply(meta(3), make_return(Cid, [MsgId]), State1),
+ {aux, active} | _ ]} = check_auto(Cid, 2, State0),
+ % returning immediately checks out the same message again
+ {_, ok, [{send_msg, _, {delivery, _, [{_, _}]}, ra_event},
+ {aux, active}]} =
+ apply(meta(3), make_return(Cid, [MsgId]), State1),
ok.
return_auto_checked_out_test() ->
@@ -1867,15 +1872,19 @@ cancelled_checkout_out_test() ->
{State00, [_, _]} = enq(1, 1, first, test_init(test)),
{State0, [_]} = enq(2, 2, second, State00),
{State1, _} = check_auto(Cid, 2, State0),
- % cancelled checkout should return all pending messages to queue
+ % cancelled checkout should not return pending messages to queue
{State2, _, _} = apply(meta(3), make_checkout(Cid, cancel, #{}), State1),
?assertEqual(1, maps:size(State2#state.messages)),
- ?assertEqual(1, lqueue:len(State2#state.returns)),
+ ?assertEqual(0, lqueue:len(State2#state.returns)),
- {State3, {dequeue, {0, {_, first}}, _}, _} =
+ {State3, {dequeue, empty}} =
apply(meta(3), make_checkout(Cid, {dequeue, settled}, #{}), State2),
+ %% settle
+ {State4, ok, _} =
+ apply(meta(4), make_settle(Cid, [0]), State3),
+
{_State, {dequeue, {_, {_, second}}, _}, _} =
- apply(meta(4), make_checkout(Cid, {dequeue, settled}, #{}), State3),
+ apply(meta(5), make_checkout(Cid, {dequeue, settled}, #{}), State4),
ok.
down_with_noproc_consumer_returns_unsettled_test() ->
@@ -1937,16 +1946,6 @@ down_with_noproc_enqueuer_is_cleaned_up_test() ->
?assert(0 =:= maps:size(State1#state.enqueuers)),
ok.
-completed_consumer_yields_demonitor_effect_test() ->
- Cid = {<<"completed_consumer_yields_demonitor_effect_test">>, self()},
- {State0, [_, _]} = enq(1, 1, second, test_init(test)),
- {State1, [{monitor, process, _} | _]} = check(Cid, 2, State0),
- {_, Effects} = settle(Cid, 3, 0, State1),
- ?ASSERT_EFF({demonitor, _, _}, Effects),
- % release cursor for empty queue
- ?ASSERT_EFF({release_cursor, 3, _}, Effects),
- ok.
-
discarded_message_without_dead_letter_handler_is_removed_test() ->
Cid = {<<"completed_consumer_yields_demonitor_effect_test">>, self()},
{State0, [_, _]} = enq(1, 1, first, test_init(test)),
@@ -2322,7 +2321,10 @@ single_active_consumer_test() ->
?assertEqual(1, length(Effects1)),
% cancelling the active consumer
- {State3, _, Effects2} = apply(meta(3), #checkout{spec = cancel, consumer_id = {<<"ctag1">>, self()}}, State2),
+ {State3, _, Effects2} = apply(meta(3),
+ #checkout{spec = cancel,
+ consumer_id = {<<"ctag1">>, self()}},
+ State2),
% the second registered consumer is now the active one
?assertEqual(1, map_size(State3#state.consumers)),
?assert(maps:is_key({<<"ctag2">>, self()}, State3#state.consumers)),
@@ -2425,15 +2427,16 @@ single_active_consumer_mark_waiting_consumers_as_suspected_when_down_noconnnecti
State),
NewState
end,
- State1 = lists:foldl(AddConsumer, State0, [<<"ctag1">>, <<"ctag2">>, <<"ctag3">>, <<"ctag4">>]),
+ State1 = lists:foldl(AddConsumer, State0,
+ [<<"ctag1">>, <<"ctag2">>, <<"ctag3">>, <<"ctag4">>]),
% simulate node goes down
{State2, _, _} = apply(#{}, {down, self(), noconnection}, State1),
% all the waiting consumers should be suspected down
?assertEqual(3, length(State2#state.waiting_consumers)),
- lists:foreach(fun({_, #consumer{suspected_down = SuspectedDown}}) ->
- ?assert(SuspectedDown)
+ lists:foreach(fun({_, #consumer{status = Status}}) ->
+ ?assert(Status == suspected_down)
end, State2#state.waiting_consumers),
% simulate node goes back up
@@ -2441,8 +2444,8 @@ single_active_consumer_mark_waiting_consumers_as_suspected_when_down_noconnnecti
% all the waiting consumers should be un-suspected
?assertEqual(3, length(State3#state.waiting_consumers)),
- lists:foreach(fun({_, #consumer{suspected_down = SuspectedDown}}) ->
- ?assertNot(SuspectedDown)
+ lists:foreach(fun({_, #consumer{status = Status}}) ->
+ ?assert(Status /= suspected_down)
end, State3#state.waiting_consumers),
ok.
@@ -2527,7 +2530,8 @@ query_consumers_test() ->
State1 = lists:foldl(AddConsumer, State0, [<<"ctag1">>, <<"ctag2">>, <<"ctag3">>, <<"ctag4">>]),
Consumers0 = State1#state.consumers,
Consumer = maps:get({<<"ctag2">>, self()}, Consumers0),
- Consumers1 = maps:put({<<"ctag2">>, self()}, Consumer#consumer{suspected_down = true}, Consumers0),
+ Consumers1 = maps:put({<<"ctag2">>, self()},
+ Consumer#consumer{status = suspected_down}, Consumers0),
State2 = State1#state{consumers = Consumers1},
?assertEqual(4, query_consumer_count(State2)),