summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_fifo_client.erl15
1 files changed, 8 insertions, 7 deletions
diff --git a/src/rabbit_fifo_client.erl b/src/rabbit_fifo_client.erl
index a2608bdbc0..a3c241aff2 100644
--- a/src/rabbit_fifo_client.erl
+++ b/src/rabbit_fifo_client.erl
@@ -49,6 +49,8 @@
-define(TIMER_TIME, 10000).
-type seq() :: non_neg_integer().
+%% last_applied is initialised to -1
+-type maybe_seq() :: integer().
-type action() :: {send_credit_reply, Available :: non_neg_integer()} |
{send_drained, CTagCredit ::
{rabbit_fifo:consumer_tag(), non_neg_integer()}}.
@@ -63,7 +65,10 @@
servers = [] :: [ra_server_id()],
leader :: maybe(ra_server_id()),
next_seq = 0 :: seq(),
- last_applied :: maybe(seq()),
+ %% Last applied is initialise to -1 to note that no command has yet been
+ %% applied, but allowing to resend messages if the first ones on the sequence
+ %% are lost (messages are sent from last_applied + 1)
+ last_applied = -1 :: maybe_seq(),
next_enqueue_seq = 1 :: seq(),
%% indicates that we've exceeded the soft limit
slow = false :: boolean(),
@@ -578,12 +583,8 @@ try_process_command([Server | Rem], Cmd, State) ->
seq_applied({Seq, MaybeAction},
{Corrs, Actions0, #state{last_applied = Last} = State0})
- when Seq > Last orelse Last =:= undefined ->
- State1 = case Last of
- undefined -> State0;
- _ ->
- do_resends(Last+1, Seq-1, State0)
- end,
+ when Seq > Last ->
+ State1 = do_resends(Last+1, Seq-1, State0),
{Actions, State} = maybe_add_action(MaybeAction, Actions0, State1),
case maps:take(Seq, State#state.pending) of
{{undefined, _}, Pending} ->