summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGerhard Lazu <gerhard@users.noreply.github.com>2020-04-02 12:18:38 +0100
committerGitHub <noreply@github.com>2020-04-02 12:18:38 +0100
commitee4b615fa84a5969aabcef1f7a2c7cc50a9f73ba (patch)
tree9f582cb726adbc95bceb628773f719dcc9da37b7
parentebd54924e9628da5172fb91dfa519b5fe07993f8 (diff)
parentd289b7c4553b0467068af11021f21bb36dc0cb6f (diff)
downloadrabbitmq-server-git-ee4b615fa84a5969aabcef1f7a2c7cc50a9f73ba.tar.gz
Merge pull request #2295 from rabbitmq/rabbit-fifo-fix
rabbit_fifo: set timer when publish rejected
-rw-r--r--src/rabbit_fifo_client.erl18
1 files changed, 9 insertions, 9 deletions
diff --git a/src/rabbit_fifo_client.erl b/src/rabbit_fifo_client.erl
index 07068c006b..918714bbf9 100644
--- a/src/rabbit_fifo_client.erl
+++ b/src/rabbit_fifo_client.erl
@@ -534,8 +534,8 @@ handle_ra_event(From, {applied, Seqs},
_ ->
{internal, lists:reverse(Corrs), lists:reverse(Actions), State1}
end;
-handle_ra_event(Leader, {machine, {delivery, _ConsumerTag, _} = Del}, State0) ->
- handle_delivery(Leader, Del, State0);
+handle_ra_event(From, {machine, {delivery, _ConsumerTag, _} = Del}, State0) ->
+ handle_delivery(From, Del, State0);
handle_ra_event(Leader, {machine, leader_change},
#state{leader = Leader} = State) ->
%% leader already known
@@ -546,11 +546,9 @@ handle_ra_event(Leader, {machine, leader_change}, State0) ->
State = resend_all_pending(State0#state{leader = Leader}),
{internal, [], [], State};
handle_ra_event(_From, {rejected, {not_leader, undefined, _Seq}}, State0) ->
- % TODO: how should these be handled? re-sent on timer or try random
- {internal, [], [], State0};
+ % set timer to try find leder and resend
+ {internal, [], [], set_timer(State0)};
handle_ra_event(_From, {rejected, {not_leader, Leader, Seq}}, State0) ->
- % ?INFO("rabbit_fifo_client: rejected ~b not leader ~w leader: ~w~n",
- % [Seq, From, Leader]),
State1 = State0#state{leader = Leader},
State = resend(Seq, State1),
{internal, [], [], State};
@@ -658,9 +656,11 @@ resend_all_pending(#state{pending = Pend} = State) ->
Seqs = lists:sort(maps:keys(Pend)),
lists:foldl(fun resend/2, State, Seqs).
-handle_delivery(Leader, {delivery, Tag, [{FstId, _} | _] = IdMsgs} = Del0,
+handle_delivery(From, {delivery, Tag, [{FstId, _} | _] = IdMsgs} = Del0,
#state{consumer_deliveries = CDels0} = State0) ->
{LastId, _} = lists:last(IdMsgs),
+ %% NB: deliveries may not be from the leader so we will not update the
+ %% tracked leader id here
%% TODO: remove potential default allocation
case maps:get(Tag, CDels0, #consumer{last_msg_id = -1}) of
#consumer{last_msg_id = Prev} = C
@@ -677,7 +677,7 @@ handle_delivery(Leader, {delivery, Tag, [{FstId, _} | _] = IdMsgs} = Del0,
%% When the node is disconnected the leader will return all checked
%% out messages to the main queue to ensure they don't get stuck in
%% case the node never comes back.
- Missing = get_missing_deliveries(Leader, Prev+1, FstId-1, Tag),
+ Missing = get_missing_deliveries(From, Prev+1, FstId-1, Tag),
Del = {delivery, Tag, Missing ++ IdMsgs},
{Del, State0#state{consumer_deliveries =
update_consumer(Tag, LastId,
@@ -689,7 +689,7 @@ handle_delivery(Leader, {delivery, Tag, [{FstId, _} | _] = IdMsgs} = Del0,
[] ->
{internal, [], [], State0};
IdMsgs2 ->
- handle_delivery(Leader, {delivery, Tag, IdMsgs2}, State0)
+ handle_delivery(From, {delivery, Tag, IdMsgs2}, State0)
end;
_ when FstId =:= 0 ->
% the very first delivery