diff options
| author | Francesco Mazzoli <francesco@rabbitmq.com> | 2012-04-19 18:56:44 +0100 |
|---|---|---|
| committer | Francesco Mazzoli <francesco@rabbitmq.com> | 2012-04-19 18:56:44 +0100 |
| commit | ce7ca70ce09c354973d6efca23cecd96d73a57d4 (patch) | |
| tree | d750291c1cd8b4efad53592c77b651ca1bd1a4f4 | |
| parent | 24c18e4d3621efe7f2f348c0572def0509362729 (diff) | |
| parent | e3ee22fecc5c582ecd45e81cc458018bdaa7dc81 (diff) | |
| download | rabbitmq-server-git-ce7ca70ce09c354973d6efca23cecd96d73a57d4.tar.gz | |
merge bug24885
| -rw-r--r-- | src/rabbit_mirror_queue_master.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 4 |
2 files changed, 4 insertions, 4 deletions
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index 3afa5b6022..ee9dc0bf18 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -134,7 +134,7 @@ delete_and_terminate(Reason, State = #state { gm = GM, purge(State = #state { gm = GM, backing_queue = BQ, backing_queue_state = BQS }) -> - ok = gm:broadcast(GM, {set_length, 0}), + ok = gm:broadcast(GM, {set_length, 0, false}), {Count, BQS1} = BQ:purge(BQS), {Count, State #state { backing_queue_state = BQS1, set_delivered = 0 }}. @@ -176,7 +176,7 @@ dropwhile(Pred, AckMsgs, Len = BQ:len(BQS), {Msgs, BQS1} = BQ:dropwhile(Pred, AckMsgs, BQS), Len1 = BQ:len(BQS1), - ok = gm:broadcast(GM, {set_length, Len1}), + ok = gm:broadcast(GM, {set_length, Len1, AckMsgs}), Dropped = Len - Len1, SetDelivered1 = lists:max([0, SetDelivered - Dropped]), {Msgs, State #state { backing_queue_state = BQS1, diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index a7a1273d91..2df5c82c06 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -793,7 +793,7 @@ process_instruction({discard, ChPid, Msg = #basic_message { id = MsgId }}, {ok, State1 #state { sender_queues = SQ1, msg_id_status = MS1, backing_queue_state = BQS1 }}; -process_instruction({set_length, Length}, +process_instruction({set_length, Length, AckRequired}, State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> QLen = BQ:len(BQS), @@ -803,7 +803,7 @@ process_instruction({set_length, Length}, lists:foldl( fun (const, BQSN) -> {{_Msg, _IsDelivered, _AckTag, _Remaining}, - BQSN1} = BQ:fetch(false, BQSN), + BQSN1} = BQ:fetch(AckRequired, BQSN), BQSN1 end, BQS, lists:duplicate(ToDrop, const)), set_synchronised( |
