diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_fifo_client.erl | 18 |
1 files changed, 9 insertions, 9 deletions
diff --git a/src/rabbit_fifo_client.erl b/src/rabbit_fifo_client.erl index 07068c006b..918714bbf9 100644 --- a/src/rabbit_fifo_client.erl +++ b/src/rabbit_fifo_client.erl @@ -534,8 +534,8 @@ handle_ra_event(From, {applied, Seqs}, _ -> {internal, lists:reverse(Corrs), lists:reverse(Actions), State1} end; -handle_ra_event(Leader, {machine, {delivery, _ConsumerTag, _} = Del}, State0) -> - handle_delivery(Leader, Del, State0); +handle_ra_event(From, {machine, {delivery, _ConsumerTag, _} = Del}, State0) -> + handle_delivery(From, Del, State0); handle_ra_event(Leader, {machine, leader_change}, #state{leader = Leader} = State) -> %% leader already known @@ -546,11 +546,9 @@ handle_ra_event(Leader, {machine, leader_change}, State0) -> State = resend_all_pending(State0#state{leader = Leader}), {internal, [], [], State}; handle_ra_event(_From, {rejected, {not_leader, undefined, _Seq}}, State0) -> - % TODO: how should these be handled? re-sent on timer or try random - {internal, [], [], State0}; + % set timer to try find leder and resend + {internal, [], [], set_timer(State0)}; handle_ra_event(_From, {rejected, {not_leader, Leader, Seq}}, State0) -> - % ?INFO("rabbit_fifo_client: rejected ~b not leader ~w leader: ~w~n", - % [Seq, From, Leader]), State1 = State0#state{leader = Leader}, State = resend(Seq, State1), {internal, [], [], State}; @@ -658,9 +656,11 @@ resend_all_pending(#state{pending = Pend} = State) -> Seqs = lists:sort(maps:keys(Pend)), lists:foldl(fun resend/2, State, Seqs). -handle_delivery(Leader, {delivery, Tag, [{FstId, _} | _] = IdMsgs} = Del0, +handle_delivery(From, {delivery, Tag, [{FstId, _} | _] = IdMsgs} = Del0, #state{consumer_deliveries = CDels0} = State0) -> {LastId, _} = lists:last(IdMsgs), + %% NB: deliveries may not be from the leader so we will not update the + %% tracked leader id here %% TODO: remove potential default allocation case maps:get(Tag, CDels0, #consumer{last_msg_id = -1}) of #consumer{last_msg_id = Prev} = C @@ -677,7 +677,7 @@ handle_delivery(Leader, {delivery, Tag, [{FstId, _} | _] = IdMsgs} = Del0, %% When the node is disconnected the leader will return all checked %% out messages to the main queue to ensure they don't get stuck in %% case the node never comes back. - Missing = get_missing_deliveries(Leader, Prev+1, FstId-1, Tag), + Missing = get_missing_deliveries(From, Prev+1, FstId-1, Tag), Del = {delivery, Tag, Missing ++ IdMsgs}, {Del, State0#state{consumer_deliveries = update_consumer(Tag, LastId, @@ -689,7 +689,7 @@ handle_delivery(Leader, {delivery, Tag, [{FstId, _} | _] = IdMsgs} = Del0, [] -> {internal, [], [], State0}; IdMsgs2 -> - handle_delivery(Leader, {delivery, Tag, IdMsgs2}, State0) + handle_delivery(From, {delivery, Tag, IdMsgs2}, State0) end; _ when FstId =:= 0 -> % the very first delivery |
