summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFrancesco Mazzoli <francesco@rabbitmq.com>2012-08-31 16:03:38 +0100
committerFrancesco Mazzoli <francesco@rabbitmq.com>2012-08-31 16:03:38 +0100
commite47c4fc77fa87478635417ac1ee4032a8c2cce35 (patch)
treefd00bc4da6876ee29210a5378c719648a12757b6
parentc32bfbf56428c3a2d6b12eb252034cca113c7524 (diff)
downloadrabbitmq-server-git-e47c4fc77fa87478635417ac1ee4032a8c2cce35.tar.gz
do not track external pendings until we receive `length'
Otherwise, we might break some assertions.
-rw-r--r--src/rabbit_mirror_queue_slave.erl9
1 files changed, 7 insertions, 2 deletions
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 38bbf59f3c..4e153ca1a1 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -133,7 +133,7 @@ init(#amqqueue { name = QueueName } = Q) ->
known_senders = pmon:new(),
synchronised = false,
- external_pending = 0
+ external_pending = undefined
},
rabbit_event:notify(queue_slave_created,
infos(?CREATION_EVENT_KEYS, State)),
@@ -849,7 +849,10 @@ process_instruction({fetch, AckRequired, MsgId, Remaining},
{_, false} when QLen =< Remaining ->
set_synchronised(Remaining, State);
{_, true} when QLen =< Remaining ->
- State #state { external_pending = ExtPending + 1}
+ State #state { external_pending = case ExtPending of
+ undefined -> undefined;
+ _ -> ExtPending + 1
+ end }
end};
process_instruction({ack, MsgIds, Length},
State = #state { backing_queue = BQ,
@@ -915,6 +918,8 @@ maybe_store_ack(true, MsgId, AckTag, State = #state { msg_id_ack = MA,
State #state { msg_id_ack = dict:store(MsgId, {Num, AckTag}, MA),
ack_num = Num + 1 }.
+set_synchronised(_, _, _, State = #state { external_pending = undefined }) ->
+ State;
set_synchronised(LocalPending, RemotePending, Length,
State = #state { backing_queue = BQ,
backing_queue_state = BQS,