summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorkjnilsson <knilsson@pivotal.io>2018-11-29 13:57:45 +0000
committerkjnilsson <knilsson@pivotal.io>2018-12-04 17:15:33 +0000
commita4c00475eb6cf261ac7775244e518fc63ea8fa94 (patch)
tree190d4bac24e6fa98b1c3532747741a9331f23ea1
parent735810edf529605143fcc9b3544e1703f55b0d90 (diff)
downloadrabbitmq-server-git-a4c00475eb6cf261ac7775244e518fc63ea8fa94.tar.gz
Notify all enqueuers and consumers on leader change
This should ensure liveness and avoid missed confirms when the leader changes.
-rw-r--r--src/rabbit_fifo.erl89
-rw-r--r--src/rabbit_fifo_client.erl26
2 files changed, 89 insertions, 26 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl
index f96f59639c..a455264560 100644
--- a/src/rabbit_fifo.erl
+++ b/src/rabbit_fifo.erl
@@ -456,20 +456,26 @@ apply(_, {update_state, Conf}, Effects, State) ->
{update_state(Conf, State), Effects, ok}.
-spec state_enter(ra_server:ra_state(), state()) -> ra_machine:effects().
-state_enter(leader, #state{consumers = Custs,
+state_enter(leader, #state{consumers = Cons,
enqueuers = Enqs,
name = Name,
+ prefix_msg_count = 0,
become_leader_handler = BLH}) ->
% return effects to monitor all current consumers and enqueuers
- ConMons = [{monitor, process, P} || {_, P} <- maps:keys(Custs)],
- EnqMons = [{monitor, process, P} || P <- maps:keys(Enqs)],
- Effects = ConMons ++ EnqMons,
+ Pids = lists:usort(maps:keys(Enqs) ++ [P || {_, P} <- maps:keys(Cons)]),
+ Mons = [{monitor, process, P} || P <- Pids],
+ Nots = [{send_msg, P, leader_change, ra_event} || P <- Pids],
+ Effects = Mons ++ Nots,
case BLH of
undefined ->
Effects;
{Mod, Fun, Args} ->
[{mod_call, Mod, Fun, Args ++ [Name]} | Effects]
end;
+state_enter(recovered, #state{prefix_msg_count = PrefixMsgCount})
+ when PrefixMsgCount =/= 0 ->
+ %% TODO: remove assertion?
+ exit({rabbit_fifo, unexpected_prefix_msg_count, PrefixMsgCount});
state_enter(eol, #state{enqueuers = Enqs, consumers = Custs0}) ->
Custs = maps:fold(fun({_, P}, V, S) -> S#{P => V} end, #{}, Custs0),
[{send_msg, P, eol, ra_event} || P <- maps:keys(maps:merge(Enqs, Custs))];
@@ -1020,6 +1026,7 @@ dehydrate_state(#state{messages = Messages0,
C#consumer{checked_out = #{}}
end, Consumers),
returns = queue:new(),
+ %% messages include returns
prefix_msg_count = maps:size(Messages0) + MsgCount}.
@@ -1456,22 +1463,49 @@ snapshot_recover_test() ->
],
run_snapshot_test(?FUNCTION_NAME, Commands).
-enq_deq_return_snapshot_recover_test() ->
+enq_deq_return_settle_snapshot_test() ->
Tag = atom_to_binary(?FUNCTION_NAME, utf8),
Cid = {Tag, self()},
- % OthPid = spawn(fun () -> ok end),
- % Oth = {<<"oth">>, OthPid},
Commands = [
{enqueue, self(), 1, one}, %% to Cid
{checkout, {auto, 1, simple_prefetch}, Cid},
- % {checkout, {auto, 1, simple_prefetch}, Oth},
- {return, [0], Cid}, %% should be re-delivered to Oth
- {enqueue, self(), 2, two}, %% Cid prefix_msg_count: 1
- % {enqueue, self(), 3, three}, %% Queued: prefetch_msg_count: 2?
- % {settle, [0], Oth},
+ {return, [0], Cid}, %% should be re-delivered to Cid
+ {enqueue, self(), 2, two}, %% Cid prefix_msg_count: 2
{settle, [1], Cid},
{settle, [2], Cid}
- % purge
+ ],
+ run_snapshot_test(?FUNCTION_NAME, Commands).
+
+return_prefix_msg_count_test() ->
+ Tag = atom_to_binary(?FUNCTION_NAME, utf8),
+ Cid = {Tag, self()},
+ Commands = [
+ {enqueue, self(), 1, one},
+ {checkout, {auto, 1, simple_prefetch}, Cid},
+ {checkout, cancel, Cid},
+ {enqueue, self(), 2, two} %% Cid prefix_msg_count: 2
+ ],
+ Indexes = lists:seq(1, length(Commands)),
+ Entries = lists:zip(Indexes, Commands),
+ {State, _Effects} = run_log(test_init(?FUNCTION_NAME), Entries),
+ ?debugFmt("return_prefix_msg_count_test state ~n~p~n", [State]),
+ ok.
+
+
+return_settle_snapshot_test() ->
+ Tag = atom_to_binary(?FUNCTION_NAME, utf8),
+ Cid = {Tag, self()},
+ Commands = [
+ {enqueue, self(), 1, one}, %% to Cid
+ {checkout, {auto, 1, simple_prefetch}, Cid},
+ {return, [0], Cid}, %% should be re-delivered to Cid
+ {enqueue, self(), 2, two}, %% Cid prefix_msg_count: 2
+ {settle, [1], Cid},
+ {return, [2], Cid},
+ {settle, [3], Cid},
+ {enqueue, self(), 3, three},
+ purge,
+ {enqueue, self(), 4, four}
],
run_snapshot_test(?FUNCTION_NAME, Commands).
@@ -1577,19 +1611,30 @@ state_enter_test() ->
[{mod_call, m, f, [a, the_name]}] = state_enter(leader, S0),
ok.
-leader_monitors_on_state_enter_test() ->
- Cid = {<<"cid">>, self()},
- {State0, [_, _]} = enq(1, 1, first, test_init(test)),
- {State1, _} = check_auto(Cid, 2, State0),
+state_enter_montors_and_notifications_test() ->
+ Oth = spawn(fun () -> ok end),
+ {State0, _} = enq(1, 1, first, test_init(test)),
+ Cid = {<<"adf">>, self()},
+ OthCid = {<<"oth">>, Oth},
+ {State1, _} = check(Cid, 2, State0),
+ {State, _} = check(OthCid, 3, State1),
Self = self(),
- %% as we have an enqueuer _and_ a consumer we chould
- %% get two monitor effects in total, even if they are for the same
- %% processs
+ Effects = state_enter(leader, State),
+
+ %% monitor all enqueuers and consumers
[{monitor, process, Self},
- {monitor, process, Self}] = state_enter(leader, State1),
+ {monitor, process, Oth}] =
+ lists:filter(fun ({monitor, process, _}) -> true;
+ (_) -> false
+ end, Effects),
+ [{send_msg, Self, leader_change, ra_event},
+ {send_msg, Oth, leader_change, ra_event}] =
+ lists:filter(fun ({send_msg, _, leader_change, ra_event}) -> true;
+ (_) -> false
+ end, Effects),
+ ?ASSERT_EFF({monitor, process, _}, Effects),
ok.
-
purge_test() ->
Cid = {<<"purge_test">>, self()},
{State1, _} = enq(1, 1, first, test_init(test)),
diff --git a/src/rabbit_fifo_client.erl b/src/rabbit_fifo_client.erl
index 8742d74b3b..fc35d26e0f 100644
--- a/src/rabbit_fifo_client.erl
+++ b/src/rabbit_fifo_client.erl
@@ -442,9 +442,8 @@ handle_ra_event(From, {applied, Seqs},
CurLeader -> ok;
_ ->
?INFO("rabbit_fifo_client: leader change from ~w to ~w~n"
- "applying ~w last ~w~n"
- "STate1 ~p~n",
- [CurLeader, From, Seqs, _Last, State1])
+ "applying ~w last ~w~n",
+ [CurLeader, From, Seqs, _Last])
end,
case maps:size(State1#state.pending) < SftLmt of
true when State1#state.slow == true ->
@@ -476,6 +475,18 @@ handle_ra_event(From, {applied, Seqs},
end;
handle_ra_event(Leader, {machine, {delivery, _ConsumerTag, _} = Del}, State0) ->
handle_delivery(Leader, Del, State0);
+handle_ra_event(Leader, {machine, leader_change},
+ #state{leader = Leader} = State) ->
+ %% leader already known
+ {internal, [], [], State};
+handle_ra_event(Leader, {machine, leader_change},
+ #state{leader = OldLeader} = State0) ->
+ ?INFO("rabbit_fifo_client: leader changed from ~w to ~w~n",
+ [OldLeader, Leader]),
+ %% we need to update leader
+ %% and resend any pending commands
+ State = resend_all_pending(State0#state{leader = Leader}),
+ {internal, [], [], State};
handle_ra_event(_From, {rejected, {not_leader, undefined, _Seq}}, State0) ->
% TODO: how should these be handled? re-sent on timer or try random
{internal, [], [], State0};
@@ -541,7 +552,9 @@ seq_applied({Seq, MaybeAction},
error ->
?INFO("rabbit_fifo_client: pending not found ~w", [Seq]),
% must have already been resent or removed for some other reason
- {Corrs, Actions, State}
+ % still need to update last_applied or we may inadvertently resend
+ % stuff later
+ {Corrs, Actions, State#state{last_applied = Seq}}
end;
seq_applied(_Seq, Acc) ->
?INFO("rabbit_fifo_client: dropping seq ~b", [_Seq]),
@@ -581,6 +594,11 @@ resend(OldSeq, #state{pending = Pending0, leader = Leader} = State) ->
State
end.
+resend_all_pending(#state{pending = Pend} = State) ->
+ Seqs = lists:sort(maps:keys(Pend)),
+ ?INFO ("rabbit_fifo_client: resending all ~w~n", [Seqs]),
+ lists:foldl(fun resend/2, State, Seqs).
+
handle_delivery(Leader, {delivery, Tag, [{FstId, _} | _] = IdMsgs} = Del0,
#state{consumer_deliveries = CDels0} = State0) ->
{LastId, _} = lists:last(IdMsgs),