summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_mirror_queue_slave.erl24
1 files changed, 10 insertions, 14 deletions
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 1f6567e0e4..b28ff6e27b 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -806,21 +806,17 @@ process_instruction({fetch, AckRequired, MsgId, Remaining},
State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
QLen = BQ:len(BQS),
- {State1, Delta} =
- case QLen - 1 of
- Remaining ->
- {{#basic_message{id = MsgId}, _IsDelivered,
- AckTag, Remaining}, BQS1} = BQ:fetch(AckRequired, BQS),
- {maybe_store_ack(AckRequired, MsgId, AckTag,
- State #state { backing_queue_state = BQS1 }),
- 0};
+ {ok, case QLen - 1 of
+ Remaining ->
+ {{#basic_message{id = MsgId}, _IsDelivered,
+ AckTag, Remaining}, BQS1} = BQ:fetch(AckRequired, BQS),
+ maybe_store_ack(AckRequired, MsgId, AckTag,
+ State #state { backing_queue_state = BQS1 });
+ _ when QLen =< Remaining andalso AckRequired ->
+ State;
_ when QLen =< Remaining ->
- {State, case AckRequired of
- true -> 0;
- false -> -1
- end}
- end,
- {ok, set_synchronised(Delta, State1)};
+ set_synchronised(-1, State)
+ end};
process_instruction({ack, MsgIds},
State = #state { backing_queue = BQ,
backing_queue_state = BQS,