diff options
| author | Diana Corbacho <diana@rabbitmq.com> | 2019-03-15 15:59:51 +0000 |
|---|---|---|
| committer | Diana Corbacho <diana@rabbitmq.com> | 2019-03-15 15:59:51 +0000 |
| commit | a81ce662d0c06c15cb7d7d3ebd9aa1c328861768 (patch) | |
| tree | 0e6637527ecd6527eae94ebbca7d930e7f2586bb | |
| parent | ed70fa442346c5b026b559a6bfbb19a2120bd2e3 (diff) | |
| download | rabbitmq-server-git-a81ce662d0c06c15cb7d7d3ebd9aa1c328861768.tar.gz | |
Resend messages if first messages are lost
last_applied is now initialised to -1 and resend always happens if
we have an out of order sequence
rabbitmq-server #1906
[#164375485]
| -rw-r--r-- | src/rabbit_fifo_client.erl | 12 |
1 files changed, 5 insertions, 7 deletions
diff --git a/src/rabbit_fifo_client.erl b/src/rabbit_fifo_client.erl index a1bf82367b..ba60b9d01c 100644 --- a/src/rabbit_fifo_client.erl +++ b/src/rabbit_fifo_client.erl @@ -49,6 +49,8 @@ -define(TIMER_TIME, 10000). -type seq() :: non_neg_integer(). +%% last_applied is initialised to -1 +-type maybe_seq() :: integer(). -type action() :: {send_credit_reply, Available :: non_neg_integer()} | {send_drained, CTagCredit :: {rabbit_fifo:consumer_tag(), non_neg_integer()}}. @@ -63,7 +65,7 @@ servers = [] :: [ra_server_id()], leader :: maybe(ra_server_id()), next_seq = 0 :: seq(), - last_applied :: maybe(seq()), + last_applied = -1 :: maybe_seq(), next_enqueue_seq = 1 :: seq(), %% indicates that we've exceeded the soft limit slow = false :: boolean(), @@ -578,12 +580,8 @@ try_process_command([Server | Rem], Cmd, State) -> seq_applied({Seq, MaybeAction}, {Corrs, Actions0, #state{last_applied = Last} = State0}) - when Seq > Last orelse Last =:= undefined -> - State1 = case Last of - undefined -> State0; - _ -> - do_resends(Last+1, Seq-1, State0) - end, + when Seq > Last -> + State1 = do_resends(Last+1, Seq-1, State0), {Actions, State} = maybe_add_action(MaybeAction, Actions0, State1), case maps:take(Seq, State#state.pending) of {{undefined, _}, Pending} -> |
