summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFrancesco Mazzoli <francesco@rabbitmq.com>2012-04-19 18:56:44 +0100
committerFrancesco Mazzoli <francesco@rabbitmq.com>2012-04-19 18:56:44 +0100
commitce7ca70ce09c354973d6efca23cecd96d73a57d4 (patch)
treed750291c1cd8b4efad53592c77b651ca1bd1a4f4
parent24c18e4d3621efe7f2f348c0572def0509362729 (diff)
parente3ee22fecc5c582ecd45e81cc458018bdaa7dc81 (diff)
downloadrabbitmq-server-git-ce7ca70ce09c354973d6efca23cecd96d73a57d4.tar.gz
merge bug24885
-rw-r--r--src/rabbit_mirror_queue_master.erl4
-rw-r--r--src/rabbit_mirror_queue_slave.erl4
2 files changed, 4 insertions, 4 deletions
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 3afa5b6022..ee9dc0bf18 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -134,7 +134,7 @@ delete_and_terminate(Reason, State = #state { gm = GM,
purge(State = #state { gm = GM,
backing_queue = BQ,
backing_queue_state = BQS }) ->
- ok = gm:broadcast(GM, {set_length, 0}),
+ ok = gm:broadcast(GM, {set_length, 0, false}),
{Count, BQS1} = BQ:purge(BQS),
{Count, State #state { backing_queue_state = BQS1,
set_delivered = 0 }}.
@@ -176,7 +176,7 @@ dropwhile(Pred, AckMsgs,
Len = BQ:len(BQS),
{Msgs, BQS1} = BQ:dropwhile(Pred, AckMsgs, BQS),
Len1 = BQ:len(BQS1),
- ok = gm:broadcast(GM, {set_length, Len1}),
+ ok = gm:broadcast(GM, {set_length, Len1, AckMsgs}),
Dropped = Len - Len1,
SetDelivered1 = lists:max([0, SetDelivered - Dropped]),
{Msgs, State #state { backing_queue_state = BQS1,
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index a7a1273d91..2df5c82c06 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -793,7 +793,7 @@ process_instruction({discard, ChPid, Msg = #basic_message { id = MsgId }},
{ok, State1 #state { sender_queues = SQ1,
msg_id_status = MS1,
backing_queue_state = BQS1 }};
-process_instruction({set_length, Length},
+process_instruction({set_length, Length, AckRequired},
State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
QLen = BQ:len(BQS),
@@ -803,7 +803,7 @@ process_instruction({set_length, Length},
lists:foldl(
fun (const, BQSN) ->
{{_Msg, _IsDelivered, _AckTag, _Remaining},
- BQSN1} = BQ:fetch(false, BQSN),
+ BQSN1} = BQ:fetch(AckRequired, BQSN),
BQSN1
end, BQS, lists:duplicate(ToDrop, const)),
set_synchronised(