summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDiana Corbacho <diana@rabbitmq.com>2019-03-15 15:59:51 +0000
committerDiana Corbacho <diana@rabbitmq.com>2019-03-15 15:59:51 +0000
commita81ce662d0c06c15cb7d7d3ebd9aa1c328861768 (patch)
tree0e6637527ecd6527eae94ebbca7d930e7f2586bb
parented70fa442346c5b026b559a6bfbb19a2120bd2e3 (diff)
downloadrabbitmq-server-git-a81ce662d0c06c15cb7d7d3ebd9aa1c328861768.tar.gz
Resend messages if first messages are lost
last_applied is now initialised to -1 and resend always happens if we have an out of order sequence rabbitmq-server #1906 [#164375485]
-rw-r--r--src/rabbit_fifo_client.erl12
1 files changed, 5 insertions, 7 deletions
diff --git a/src/rabbit_fifo_client.erl b/src/rabbit_fifo_client.erl
index a1bf82367b..ba60b9d01c 100644
--- a/src/rabbit_fifo_client.erl
+++ b/src/rabbit_fifo_client.erl
@@ -49,6 +49,8 @@
-define(TIMER_TIME, 10000).
-type seq() :: non_neg_integer().
+%% last_applied is initialised to -1
+-type maybe_seq() :: integer().
-type action() :: {send_credit_reply, Available :: non_neg_integer()} |
{send_drained, CTagCredit ::
{rabbit_fifo:consumer_tag(), non_neg_integer()}}.
@@ -63,7 +65,7 @@
servers = [] :: [ra_server_id()],
leader :: maybe(ra_server_id()),
next_seq = 0 :: seq(),
- last_applied :: maybe(seq()),
+ last_applied = -1 :: maybe_seq(),
next_enqueue_seq = 1 :: seq(),
%% indicates that we've exceeded the soft limit
slow = false :: boolean(),
@@ -578,12 +580,8 @@ try_process_command([Server | Rem], Cmd, State) ->
seq_applied({Seq, MaybeAction},
{Corrs, Actions0, #state{last_applied = Last} = State0})
- when Seq > Last orelse Last =:= undefined ->
- State1 = case Last of
- undefined -> State0;
- _ ->
- do_resends(Last+1, Seq-1, State0)
- end,
+ when Seq > Last ->
+ State1 = do_resends(Last+1, Seq-1, State0),
{Actions, State} = maybe_add_action(MaybeAction, Actions0, State1),
case maps:take(Seq, State#state.pending) of
{{undefined, _}, Pending} ->