summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_mirror_queue_master.erl8
-rw-r--r--src/rabbit_mirror_queue_slave.erl115
2 files changed, 65 insertions, 58 deletions
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 477449e3e9..094b83c973 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -145,7 +145,7 @@ monitor_wait([MRef | MRefs]) ->
purge(State = #state { gm = GM,
backing_queue = BQ,
backing_queue_state = BQS }) ->
- ok = gm:broadcast(GM, {set_length, 0, false}),
+ ok = gm:broadcast(GM, {set_length, 0, BQ:len(BQS), false}),
{Count, BQS1} = BQ:purge(BQS),
{Count, State #state { backing_queue_state = BQS1,
set_delivered = 0 }}.
@@ -187,8 +187,8 @@ dropwhile(Pred, AckRequired,
Len = BQ:len(BQS),
{Next, Msgs, BQS1} = BQ:dropwhile(Pred, AckRequired, BQS),
Len1 = BQ:len(BQS1),
- ok = gm:broadcast(GM, {set_length, Len1, AckRequired}),
Dropped = Len - Len1,
+ ok = gm:broadcast(GM, {set_length, Len1, Dropped, AckRequired}),
SetDelivered1 = lists:max([0, SetDelivered - Dropped]),
{Next, Msgs, State #state { backing_queue_state = BQS1,
set_delivered = SetDelivered1 } }.
@@ -251,7 +251,7 @@ ack(AckTags, State = #state { gm = GM,
{MsgIds, BQS1} = BQ:ack(AckTags, BQS),
case MsgIds of
[] -> ok;
- _ -> ok = gm:broadcast(GM, {ack, MsgIds})
+ _ -> ok = gm:broadcast(GM, {ack, MsgIds, BQ:len(BQS1)})
end,
AM1 = lists:foldl(fun dict:erase/2, AM, AckTags),
{MsgIds, State #state { backing_queue_state = BQS1,
@@ -265,7 +265,7 @@ requeue(AckTags, State = #state { gm = GM,
backing_queue = BQ,
backing_queue_state = BQS }) ->
{MsgIds, BQS1} = BQ:requeue(AckTags, BQS),
- ok = gm:broadcast(GM, {requeue, MsgIds}),
+ ok = gm:broadcast(GM, {requeue, MsgIds, BQ:len(BQS1)}),
{MsgIds, State #state { backing_queue_state = BQS1 }}.
len(#state { backing_queue = BQ, backing_queue_state = BQS }) ->
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index ef43d96e5f..2c60acf061 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -77,7 +77,8 @@
msg_id_status,
known_senders,
- synchronised
+ synchronised,
+ external_pending
}).
start_link(Q) ->
@@ -131,7 +132,8 @@ init(#amqqueue { name = QueueName } = Q) ->
msg_id_status = dict:new(),
known_senders = pmon:new(),
- synchronised = false
+ synchronised = false,
+ external_pending = 0
},
rabbit_event:notify(queue_slave_created,
infos(?CREATION_EVENT_KEYS, State)),
@@ -809,71 +811,67 @@ process_instruction({discard, ChPid, Msg = #basic_message { id = MsgId }},
{ok, State1 #state { sender_queues = SQ1,
msg_id_status = MS1,
backing_queue_state = BQS1 }};
-process_instruction({set_length, Length, AckRequired},
+process_instruction({set_length, Length, Dropped, AckRequired},
State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
QLen = BQ:len(BQS),
ToDrop = QLen - Length,
{ok,
- case ToDrop >= 0 of
- true ->
- State1 =
- lists:foldl(
- fun (const, StateN = #state {backing_queue_state = BQSN}) ->
- {{#basic_message{id = MsgId}, _IsDelivered, AckTag,
- _Remaining}, BQSN1} = BQ:fetch(AckRequired, BQSN),
- maybe_store_ack(
- AckRequired, MsgId, AckTag,
- StateN #state { backing_queue_state = BQSN1 })
- end, State, lists:duplicate(ToDrop, const)),
- set_synchronised(true, State1);
- false ->
- State
- end};
+ set_synchronised(
+ Length,
+ case ToDrop >= 0 of
+ true ->
+ State1 =
+ lists:foldl(
+ fun (const, StateN = #state{backing_queue_state = BQSN}) ->
+ {{#basic_message{id = MsgId}, _, AckTag, _},
+ BQSN1} = BQ:fetch(AckRequired, BQSN),
+ maybe_store_ack(
+ AckRequired, MsgId, AckTag,
+ StateN #state { backing_queue_state = BQSN1 })
+ end, State, lists:duplicate(ToDrop, const)),
+ case AckRequired of
+ true -> set_synchronised(ToDrop, Dropped, Length, State1);
+ false -> State1
+ end;
+ false ->
+ State
+ end)};
process_instruction({fetch, AckRequired, MsgId, Remaining},
State = #state { backing_queue = BQ,
- backing_queue_state = BQS }) ->
+ backing_queue_state = BQS,
+ external_pending = ExtPending }) ->
QLen = BQ:len(BQS),
- {ok, case QLen - 1 of
- Remaining ->
+ {ok, case {QLen - 1, AckRequired} of
+ {Remaining, _} ->
{{#basic_message{id = MsgId}, _IsDelivered,
AckTag, Remaining}, BQS1} = BQ:fetch(AckRequired, BQS),
maybe_store_ack(AckRequired, MsgId, AckTag,
State #state { backing_queue_state = BQS1 });
- Other when Other + 1 =:= Remaining ->
- set_synchronised(true, State);
- Other when Other < Remaining ->
- %% we must be shorter than the master
- State
+ {_, false} when QLen =< Remaining ->
+ set_synchronised(Remaining, State);
+ {_, true} when QLen =< Remaining ->
+ State #state { external_pending = ExtPending + 1}
end};
-process_instruction({ack, MsgIds},
+process_instruction({ack, MsgIds, Length},
State = #state { backing_queue = BQ,
backing_queue_state = BQS,
msg_id_ack = MA }) ->
{AckTags, MA1} = msg_ids_to_acktags(MsgIds, MA),
{MsgIds1, BQS1} = BQ:ack(AckTags, BQS),
[] = MsgIds1 -- MsgIds, %% ASSERTION
- {ok, State #state { msg_id_ack = MA1,
- backing_queue_state = BQS1 }};
-process_instruction({requeue, MsgIds},
+ {ok, set_synchronised(length(AckTags), length(MsgIds), Length,
+ State #state { msg_id_ack = MA1,
+ backing_queue_state = BQS1 })};
+process_instruction({requeue, MsgIds, Length},
State = #state { backing_queue = BQ,
backing_queue_state = BQS,
msg_id_ack = MA }) ->
{AckTags, MA1} = msg_ids_to_acktags(MsgIds, MA),
- {ok, case length(AckTags) =:= length(MsgIds) of
- true ->
- {MsgIds, BQS1} = BQ:requeue(AckTags, BQS),
- State #state { msg_id_ack = MA1,
- backing_queue_state = BQS1 };
- false ->
- %% The only thing we can safely do is nuke out our BQ
- %% and MA. The interaction between this and confirms
- %% doesn't really bear thinking about...
- {_Count, BQS1} = BQ:purge(BQS),
- {_MsgIds, BQS2} = ack_all(BQ, MA, BQS1),
- State #state { msg_id_ack = dict:new(),
- backing_queue_state = BQS2 }
- end};
+ {_MsgIds, BQS1} = BQ:requeue(AckTags, BQS),
+ {ok, set_synchronised(length(AckTags), length(MsgIds), Length,
+ State #state { msg_id_ack = MA1,
+ backing_queue_state = BQS1 })};
process_instruction({sender_death, ChPid},
State = #state { sender_queues = SQ,
msg_id_status = MS,
@@ -891,10 +889,8 @@ process_instruction({sender_death, ChPid},
msg_id_status = MS1,
known_senders = pmon:demonitor(ChPid, KS) }
end};
-process_instruction({length, Length},
- State = #state { backing_queue = BQ,
- backing_queue_state = BQS }) ->
- {ok, set_synchronised(Length =:= BQ:len(BQS), State)};
+process_instruction({length, Length}, State) ->
+ {ok, set_synchronised(Length, State)};
process_instruction({delete_and_terminate, Reason},
State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
@@ -913,9 +909,6 @@ msg_ids_to_acktags(MsgIds, MA) ->
end, {[], MA}, MsgIds),
{lists:reverse(AckTags), MA1}.
-ack_all(BQ, MA, BQS) ->
- BQ:ack([AckTag || {_MsgId, {_Num, AckTag}} <- dict:to_list(MA)], BQS).
-
maybe_store_ack(false, _MsgId, _AckTag, State) ->
State;
maybe_store_ack(true, MsgId, AckTag, State = #state { msg_id_ack = MA,
@@ -923,9 +916,23 @@ maybe_store_ack(true, MsgId, AckTag, State = #state { msg_id_ack = MA,
State #state { msg_id_ack = dict:store(MsgId, {Num, AckTag}, MA),
ack_num = Num + 1 }.
+set_synchronised(LocalPending, RemotePending, Length,
+ State = #state { backing_queue = BQ,
+ backing_queue_state = BQS,
+ external_pending = ExtPending }) ->
+ ExtPending1 = ExtPending - (RemotePending - LocalPending),
+ State1 = State #state { external_pending = ExtPending1 },
+ case ExtPending1 =:= 0 andalso Length =:= BQ:len(BQS) of
+ true -> set_synchronised1(true, State1);
+ false when ExtPending1 >= 0 -> set_synchronised1(false, State1)
+ end.
+
+set_synchronised(Length, State) ->
+ set_synchronised(0, 0, Length, State).
+
%% We intentionally leave out the head where a slave becomes
%% unsynchronised: we assert that can never happen.
-set_synchronised(true, State = #state { q = #amqqueue { name = QName },
+set_synchronised1(true, State = #state { q = #amqqueue { name = QName },
synchronised = false }) ->
Self = self(),
rabbit_misc:execute_mnesia_transaction(
@@ -939,7 +946,7 @@ set_synchronised(true, State = #state { q = #amqqueue { name = QName },
end
end),
State #state { synchronised = true };
-set_synchronised(true, State) ->
+set_synchronised1(true, State) ->
State;
-set_synchronised(false, State = #state { synchronised = false }) ->
+set_synchronised1(false, State = #state { synchronised = false }) ->
State.