diff options
| -rw-r--r-- | src/rabbit_fifo.erl | 89 | ||||
| -rw-r--r-- | src/rabbit_fifo_client.erl | 26 |
2 files changed, 89 insertions, 26 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl index f96f59639c..a455264560 100644 --- a/src/rabbit_fifo.erl +++ b/src/rabbit_fifo.erl @@ -456,20 +456,26 @@ apply(_, {update_state, Conf}, Effects, State) -> {update_state(Conf, State), Effects, ok}. -spec state_enter(ra_server:ra_state(), state()) -> ra_machine:effects(). -state_enter(leader, #state{consumers = Custs, +state_enter(leader, #state{consumers = Cons, enqueuers = Enqs, name = Name, + prefix_msg_count = 0, become_leader_handler = BLH}) -> % return effects to monitor all current consumers and enqueuers - ConMons = [{monitor, process, P} || {_, P} <- maps:keys(Custs)], - EnqMons = [{monitor, process, P} || P <- maps:keys(Enqs)], - Effects = ConMons ++ EnqMons, + Pids = lists:usort(maps:keys(Enqs) ++ [P || {_, P} <- maps:keys(Cons)]), + Mons = [{monitor, process, P} || P <- Pids], + Nots = [{send_msg, P, leader_change, ra_event} || P <- Pids], + Effects = Mons ++ Nots, case BLH of undefined -> Effects; {Mod, Fun, Args} -> [{mod_call, Mod, Fun, Args ++ [Name]} | Effects] end; +state_enter(recovered, #state{prefix_msg_count = PrefixMsgCount}) + when PrefixMsgCount =/= 0 -> + %% TODO: remove assertion? + exit({rabbit_fifo, unexpected_prefix_msg_count, PrefixMsgCount}); state_enter(eol, #state{enqueuers = Enqs, consumers = Custs0}) -> Custs = maps:fold(fun({_, P}, V, S) -> S#{P => V} end, #{}, Custs0), [{send_msg, P, eol, ra_event} || P <- maps:keys(maps:merge(Enqs, Custs))]; @@ -1020,6 +1026,7 @@ dehydrate_state(#state{messages = Messages0, C#consumer{checked_out = #{}} end, Consumers), returns = queue:new(), + %% messages include returns prefix_msg_count = maps:size(Messages0) + MsgCount}. @@ -1456,22 +1463,49 @@ snapshot_recover_test() -> ], run_snapshot_test(?FUNCTION_NAME, Commands). -enq_deq_return_snapshot_recover_test() -> +enq_deq_return_settle_snapshot_test() -> Tag = atom_to_binary(?FUNCTION_NAME, utf8), Cid = {Tag, self()}, - % OthPid = spawn(fun () -> ok end), - % Oth = {<<"oth">>, OthPid}, Commands = [ {enqueue, self(), 1, one}, %% to Cid {checkout, {auto, 1, simple_prefetch}, Cid}, - % {checkout, {auto, 1, simple_prefetch}, Oth}, - {return, [0], Cid}, %% should be re-delivered to Oth - {enqueue, self(), 2, two}, %% Cid prefix_msg_count: 1 - % {enqueue, self(), 3, three}, %% Queued: prefetch_msg_count: 2? - % {settle, [0], Oth}, + {return, [0], Cid}, %% should be re-delivered to Cid + {enqueue, self(), 2, two}, %% Cid prefix_msg_count: 2 {settle, [1], Cid}, {settle, [2], Cid} - % purge + ], + run_snapshot_test(?FUNCTION_NAME, Commands). + +return_prefix_msg_count_test() -> + Tag = atom_to_binary(?FUNCTION_NAME, utf8), + Cid = {Tag, self()}, + Commands = [ + {enqueue, self(), 1, one}, + {checkout, {auto, 1, simple_prefetch}, Cid}, + {checkout, cancel, Cid}, + {enqueue, self(), 2, two} %% Cid prefix_msg_count: 2 + ], + Indexes = lists:seq(1, length(Commands)), + Entries = lists:zip(Indexes, Commands), + {State, _Effects} = run_log(test_init(?FUNCTION_NAME), Entries), + ?debugFmt("return_prefix_msg_count_test state ~n~p~n", [State]), + ok. + + +return_settle_snapshot_test() -> + Tag = atom_to_binary(?FUNCTION_NAME, utf8), + Cid = {Tag, self()}, + Commands = [ + {enqueue, self(), 1, one}, %% to Cid + {checkout, {auto, 1, simple_prefetch}, Cid}, + {return, [0], Cid}, %% should be re-delivered to Oth + {enqueue, self(), 2, two}, %% Cid prefix_msg_count: 2 + {settle, [1], Cid}, + {return, [2], Cid}, + {settle, [3], Cid}, + {enqueue, self(), 3, three}, + purge, + {enqueue, self(), 4, four} ], run_snapshot_test(?FUNCTION_NAME, Commands). @@ -1577,19 +1611,30 @@ state_enter_test() -> [{mod_call, m, f, [a, the_name]}] = state_enter(leader, S0), ok. -leader_monitors_on_state_enter_test() -> - Cid = {<<"cid">>, self()}, - {State0, [_, _]} = enq(1, 1, first, test_init(test)), - {State1, _} = check_auto(Cid, 2, State0), +state_enter_montors_and_notifications_test() -> + Oth = spawn(fun () -> ok end), + {State0, _} = enq(1, 1, first, test_init(test)), + Cid = {<<"adf">>, self()}, + OthCid = {<<"oth">>, Oth}, + {State1, _} = check(Cid, 2, State0), + {State, _} = check(OthCid, 3, State1), Self = self(), - %% as we have an enqueuer _and_ a consumer we chould - %% get two monitor effects in total, even if they are for the same - %% processs + Effects = state_enter(leader, State), + + %% monitor all enqueuers and consumers [{monitor, process, Self}, - {monitor, process, Self}] = state_enter(leader, State1), + {monitor, process, Oth}] = + lists:filter(fun ({monitor, process, _}) -> true; + (_) -> false + end, Effects), + [{send_msg, Self, leader_change, ra_event}, + {send_msg, Oth, leader_change, ra_event}] = + lists:filter(fun ({send_msg, _, leader_change, ra_event}) -> true; + (_) -> false + end, Effects), + ?ASSERT_EFF({monitor, process, _}, Effects), ok. - purge_test() -> Cid = {<<"purge_test">>, self()}, {State1, _} = enq(1, 1, first, test_init(test)), diff --git a/src/rabbit_fifo_client.erl b/src/rabbit_fifo_client.erl index 8742d74b3b..fc35d26e0f 100644 --- a/src/rabbit_fifo_client.erl +++ b/src/rabbit_fifo_client.erl @@ -442,9 +442,8 @@ handle_ra_event(From, {applied, Seqs}, CurLeader -> ok; _ -> ?INFO("rabbit_fifo_client: leader change from ~w to ~w~n" - "applying ~w last ~w~n" - "STate1 ~p~n", - [CurLeader, From, Seqs, _Last, State1]) + "applying ~w last ~w~n", + [CurLeader, From, Seqs, _Last]) end, case maps:size(State1#state.pending) < SftLmt of true when State1#state.slow == true -> @@ -476,6 +475,18 @@ handle_ra_event(From, {applied, Seqs}, end; handle_ra_event(Leader, {machine, {delivery, _ConsumerTag, _} = Del}, State0) -> handle_delivery(Leader, Del, State0); +handle_ra_event(Leader, {machine, leader_change}, + #state{leader = Leader} = State) -> + %% leader already known + {internal, [], [], State}; +handle_ra_event(Leader, {machine, leader_change}, + #state{leader = OldLeader} = State0) -> + ?INFO("rabbit_fifo_client: leader changed from ~w to ~w~n", + [OldLeader, Leader]), + %% we need to update leader + %% and resend any pending commands + State = resend_all_pending(State0#state{leader = Leader}), + {internal, [], [], State}; handle_ra_event(_From, {rejected, {not_leader, undefined, _Seq}}, State0) -> % TODO: how should these be handled? re-sent on timer or try random {internal, [], [], State0}; @@ -541,7 +552,9 @@ seq_applied({Seq, MaybeAction}, error -> ?INFO("rabbit_fifo_client: pending not found ~w", [Seq]), % must have already been resent or removed for some other reason - {Corrs, Actions, State} + % still need to update last_applied or we may inadvertently resend + % stuff later + {Corrs, Actions, State#state{last_applied = Seq}} end; seq_applied(_Seq, Acc) -> ?INFO("rabbit_fifo_client: dropping seq ~b", [_Seq]), @@ -581,6 +594,11 @@ resend(OldSeq, #state{pending = Pending0, leader = Leader} = State) -> State end. +resend_all_pending(#state{pending = Pend} = State) -> + Seqs = lists:sort(maps:keys(Pend)), + ?INFO ("rabbit_fifo_client: resending all ~w~n", [Seqs]), + lists:foldl(fun resend/2, State, Seqs). + handle_delivery(Leader, {delivery, Tag, [{FstId, _} | _] = IdMsgs} = Del0, #state{consumer_deliveries = CDels0} = State0) -> {LastId, _} = lists:last(IdMsgs), |
