diff options
| -rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 24 |
1 files changed, 10 insertions, 14 deletions
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 1f6567e0e4..b28ff6e27b 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -806,21 +806,17 @@ process_instruction({fetch, AckRequired, MsgId, Remaining}, State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> QLen = BQ:len(BQS), - {State1, Delta} = - case QLen - 1 of - Remaining -> - {{#basic_message{id = MsgId}, _IsDelivered, - AckTag, Remaining}, BQS1} = BQ:fetch(AckRequired, BQS), - {maybe_store_ack(AckRequired, MsgId, AckTag, - State #state { backing_queue_state = BQS1 }), - 0}; + {ok, case QLen - 1 of + Remaining -> + {{#basic_message{id = MsgId}, _IsDelivered, + AckTag, Remaining}, BQS1} = BQ:fetch(AckRequired, BQS), + maybe_store_ack(AckRequired, MsgId, AckTag, + State #state { backing_queue_state = BQS1 }); + _ when QLen =< Remaining andalso AckRequired -> + State; _ when QLen =< Remaining -> - {State, case AckRequired of - true -> 0; - false -> -1 - end} - end, - {ok, set_synchronised(Delta, State1)}; + set_synchronised(-1, State) + end}; process_instruction({ack, MsgIds}, State = #state { backing_queue = BQ, backing_queue_state = BQS, |
