diff options
| author | Arnaud Cogoluègnes <acogoluegnes@gmail.com> | 2019-03-22 11:59:57 +0100 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2019-03-22 11:59:57 +0100 |
| commit | db82465f8e9c7e79303f0cbc7d10c9227b6876c7 (patch) | |
| tree | d91debb2d443e83b1f611879cfb6ee9064f444f0 /src | |
| parent | 58ca7b99a1092c5b6d73a9263712391eef1b5955 (diff) | |
| parent | 2a26daab5997b9aa41087ea94edf02d510469521 (diff) | |
| download | rabbitmq-server-git-db82465f8e9c7e79303f0cbc7d10c9227b6876c7.tar.gz | |
Merge pull request #1917 from rabbitmq/rabbitmq-server-1906
Quorum Queues: resend messages if first batch is lost
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_fifo_client.erl | 15 |
1 files changed, 8 insertions, 7 deletions
diff --git a/src/rabbit_fifo_client.erl b/src/rabbit_fifo_client.erl index a2608bdbc0..a3c241aff2 100644 --- a/src/rabbit_fifo_client.erl +++ b/src/rabbit_fifo_client.erl @@ -49,6 +49,8 @@ -define(TIMER_TIME, 10000). -type seq() :: non_neg_integer(). +%% last_applied is initialised to -1 +-type maybe_seq() :: integer(). -type action() :: {send_credit_reply, Available :: non_neg_integer()} | {send_drained, CTagCredit :: {rabbit_fifo:consumer_tag(), non_neg_integer()}}. @@ -63,7 +65,10 @@ servers = [] :: [ra_server_id()], leader :: maybe(ra_server_id()), next_seq = 0 :: seq(), - last_applied :: maybe(seq()), + %% Last applied is initialise to -1 to note that no command has yet been + %% applied, but allowing to resend messages if the first ones on the sequence + %% are lost (messages are sent from last_applied + 1) + last_applied = -1 :: maybe_seq(), next_enqueue_seq = 1 :: seq(), %% indicates that we've exceeded the soft limit slow = false :: boolean(), @@ -578,12 +583,8 @@ try_process_command([Server | Rem], Cmd, State) -> seq_applied({Seq, MaybeAction}, {Corrs, Actions0, #state{last_applied = Last} = State0}) - when Seq > Last orelse Last =:= undefined -> - State1 = case Last of - undefined -> State0; - _ -> - do_resends(Last+1, Seq-1, State0) - end, + when Seq > Last -> + State1 = do_resends(Last+1, Seq-1, State0), {Actions, State} = maybe_add_action(MaybeAction, Actions0, State1), case maps:take(Seq, State#state.pending) of {{undefined, _}, Pending} -> |
