diff options
| author | Gerhard Lazu <gerhard@users.noreply.github.com> | 2020-04-02 12:18:38 +0100 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2020-04-02 12:18:38 +0100 |
| commit | ee4b615fa84a5969aabcef1f7a2c7cc50a9f73ba (patch) | |
| tree | 9f582cb726adbc95bceb628773f719dcc9da37b7 | |
| parent | ebd54924e9628da5172fb91dfa519b5fe07993f8 (diff) | |
| parent | d289b7c4553b0467068af11021f21bb36dc0cb6f (diff) | |
| download | rabbitmq-server-git-ee4b615fa84a5969aabcef1f7a2c7cc50a9f73ba.tar.gz | |
Merge pull request #2295 from rabbitmq/rabbit-fifo-fix
rabbit_fifo: set timer when publish rejected
| -rw-r--r-- | src/rabbit_fifo_client.erl | 18 |
1 files changed, 9 insertions, 9 deletions
diff --git a/src/rabbit_fifo_client.erl b/src/rabbit_fifo_client.erl index 07068c006b..918714bbf9 100644 --- a/src/rabbit_fifo_client.erl +++ b/src/rabbit_fifo_client.erl @@ -534,8 +534,8 @@ handle_ra_event(From, {applied, Seqs}, _ -> {internal, lists:reverse(Corrs), lists:reverse(Actions), State1} end; -handle_ra_event(Leader, {machine, {delivery, _ConsumerTag, _} = Del}, State0) -> - handle_delivery(Leader, Del, State0); +handle_ra_event(From, {machine, {delivery, _ConsumerTag, _} = Del}, State0) -> + handle_delivery(From, Del, State0); handle_ra_event(Leader, {machine, leader_change}, #state{leader = Leader} = State) -> %% leader already known @@ -546,11 +546,9 @@ handle_ra_event(Leader, {machine, leader_change}, State0) -> State = resend_all_pending(State0#state{leader = Leader}), {internal, [], [], State}; handle_ra_event(_From, {rejected, {not_leader, undefined, _Seq}}, State0) -> - % TODO: how should these be handled? re-sent on timer or try random - {internal, [], [], State0}; + % set timer to try find leder and resend + {internal, [], [], set_timer(State0)}; handle_ra_event(_From, {rejected, {not_leader, Leader, Seq}}, State0) -> - % ?INFO("rabbit_fifo_client: rejected ~b not leader ~w leader: ~w~n", - % [Seq, From, Leader]), State1 = State0#state{leader = Leader}, State = resend(Seq, State1), {internal, [], [], State}; @@ -658,9 +656,11 @@ resend_all_pending(#state{pending = Pend} = State) -> Seqs = lists:sort(maps:keys(Pend)), lists:foldl(fun resend/2, State, Seqs). -handle_delivery(Leader, {delivery, Tag, [{FstId, _} | _] = IdMsgs} = Del0, +handle_delivery(From, {delivery, Tag, [{FstId, _} | _] = IdMsgs} = Del0, #state{consumer_deliveries = CDels0} = State0) -> {LastId, _} = lists:last(IdMsgs), + %% NB: deliveries may not be from the leader so we will not update the + %% tracked leader id here %% TODO: remove potential default allocation case maps:get(Tag, CDels0, #consumer{last_msg_id = -1}) of #consumer{last_msg_id = Prev} = C @@ -677,7 +677,7 @@ handle_delivery(Leader, {delivery, Tag, [{FstId, _} | _] = IdMsgs} = Del0, %% When the node is disconnected the leader will return all checked %% out messages to the main queue to ensure they don't get stuck in %% case the node never comes back. - Missing = get_missing_deliveries(Leader, Prev+1, FstId-1, Tag), + Missing = get_missing_deliveries(From, Prev+1, FstId-1, Tag), Del = {delivery, Tag, Missing ++ IdMsgs}, {Del, State0#state{consumer_deliveries = update_consumer(Tag, LastId, @@ -689,7 +689,7 @@ handle_delivery(Leader, {delivery, Tag, [{FstId, _} | _] = IdMsgs} = Del0, [] -> {internal, [], [], State0}; IdMsgs2 -> - handle_delivery(Leader, {delivery, Tag, IdMsgs2}, State0) + handle_delivery(From, {delivery, Tag, IdMsgs2}, State0) end; _ when FstId =:= 0 -> % the very first delivery |
