summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFrancesco Mazzoli <francesco@rabbitmq.com>2012-08-30 17:05:54 +0100
committerFrancesco Mazzoli <francesco@rabbitmq.com>2012-08-30 17:05:54 +0100
commit9578c4896171a1bef58499542fd3e2a7310557e2 (patch)
tree8b51e6922856ae41e93521a8297855ba9fade59c
parent75a5461907ee3b03e6d7ce6a8adc749b401a7757 (diff)
downloadrabbitmq-server-git-9578c4896171a1bef58499542fd3e2a7310557e2.tar.gz
take into account requeues when setting synch state for slaves
To do this, keep count all the fetches we've seen that require an ack for messages we don't have (e.g. when the queue we have is shorter than on the master). We then decrease this counter appropriately when requeueing, acking, and set_length'ing. Given this, we can deem the slave synced only when the length is the same *and* the counter described above is 9 - there are no pending acks on the master for messages we don't have. I might have missed something (I barely tested this) but it seems to do the trick.
-rw-r--r--src/rabbit_mirror_queue_master.erl8
-rw-r--r--src/rabbit_mirror_queue_slave.erl115
2 files changed, 65 insertions, 58 deletions
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 477449e3e9..094b83c973 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -145,7 +145,7 @@ monitor_wait([MRef | MRefs]) ->
purge(State = #state { gm = GM,
backing_queue = BQ,
backing_queue_state = BQS }) ->
- ok = gm:broadcast(GM, {set_length, 0, false}),
+ ok = gm:broadcast(GM, {set_length, 0, BQ:len(BQS), false}),
{Count, BQS1} = BQ:purge(BQS),
{Count, State #state { backing_queue_state = BQS1,
set_delivered = 0 }}.
@@ -187,8 +187,8 @@ dropwhile(Pred, AckRequired,
Len = BQ:len(BQS),
{Next, Msgs, BQS1} = BQ:dropwhile(Pred, AckRequired, BQS),
Len1 = BQ:len(BQS1),
- ok = gm:broadcast(GM, {set_length, Len1, AckRequired}),
Dropped = Len - Len1,
+ ok = gm:broadcast(GM, {set_length, Len1, Dropped, AckRequired}),
SetDelivered1 = lists:max([0, SetDelivered - Dropped]),
{Next, Msgs, State #state { backing_queue_state = BQS1,
set_delivered = SetDelivered1 } }.
@@ -251,7 +251,7 @@ ack(AckTags, State = #state { gm = GM,
{MsgIds, BQS1} = BQ:ack(AckTags, BQS),
case MsgIds of
[] -> ok;
- _ -> ok = gm:broadcast(GM, {ack, MsgIds})
+ _ -> ok = gm:broadcast(GM, {ack, MsgIds, BQ:len(BQS1)})
end,
AM1 = lists:foldl(fun dict:erase/2, AM, AckTags),
{MsgIds, State #state { backing_queue_state = BQS1,
@@ -265,7 +265,7 @@ requeue(AckTags, State = #state { gm = GM,
backing_queue = BQ,
backing_queue_state = BQS }) ->
{MsgIds, BQS1} = BQ:requeue(AckTags, BQS),
- ok = gm:broadcast(GM, {requeue, MsgIds}),
+ ok = gm:broadcast(GM, {requeue, MsgIds, BQ:len(BQS1)}),
{MsgIds, State #state { backing_queue_state = BQS1 }}.
len(#state { backing_queue = BQ, backing_queue_state = BQS }) ->
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index ef43d96e5f..2c60acf061 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -77,7 +77,8 @@
msg_id_status,
known_senders,
- synchronised
+ synchronised,
+ external_pending
}).
start_link(Q) ->
@@ -131,7 +132,8 @@ init(#amqqueue { name = QueueName } = Q) ->
msg_id_status = dict:new(),
known_senders = pmon:new(),
- synchronised = false
+ synchronised = false,
+ external_pending = 0
},
rabbit_event:notify(queue_slave_created,
infos(?CREATION_EVENT_KEYS, State)),
@@ -809,71 +811,67 @@ process_instruction({discard, ChPid, Msg = #basic_message { id = MsgId }},
{ok, State1 #state { sender_queues = SQ1,
msg_id_status = MS1,
backing_queue_state = BQS1 }};
-process_instruction({set_length, Length, AckRequired},
+process_instruction({set_length, Length, Dropped, AckRequired},
State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
QLen = BQ:len(BQS),
ToDrop = QLen - Length,
{ok,
- case ToDrop >= 0 of
- true ->
- State1 =
- lists:foldl(
- fun (const, StateN = #state {backing_queue_state = BQSN}) ->
- {{#basic_message{id = MsgId}, _IsDelivered, AckTag,
- _Remaining}, BQSN1} = BQ:fetch(AckRequired, BQSN),
- maybe_store_ack(
- AckRequired, MsgId, AckTag,
- StateN #state { backing_queue_state = BQSN1 })
- end, State, lists:duplicate(ToDrop, const)),
- set_synchronised(true, State1);
- false ->
- State
- end};
+ set_synchronised(
+ Length,
+ case ToDrop >= 0 of
+ true ->
+ State1 =
+ lists:foldl(
+ fun (const, StateN = #state{backing_queue_state = BQSN}) ->
+ {{#basic_message{id = MsgId}, _, AckTag, _},
+ BQSN1} = BQ:fetch(AckRequired, BQSN),
+ maybe_store_ack(
+ AckRequired, MsgId, AckTag,
+ StateN #state { backing_queue_state = BQSN1 })
+ end, State, lists:duplicate(ToDrop, const)),
+ case AckRequired of
+ true -> set_synchronised(ToDrop, Dropped, Length, State1);
+ false -> State1
+ end;
+ false ->
+ State
+ end)};
process_instruction({fetch, AckRequired, MsgId, Remaining},
State = #state { backing_queue = BQ,
- backing_queue_state = BQS }) ->
+ backing_queue_state = BQS,
+ external_pending = ExtPending }) ->
QLen = BQ:len(BQS),
- {ok, case QLen - 1 of
- Remaining ->
+ {ok, case {QLen - 1, AckRequired} of
+ {Remaining, _} ->
{{#basic_message{id = MsgId}, _IsDelivered,
AckTag, Remaining}, BQS1} = BQ:fetch(AckRequired, BQS),
maybe_store_ack(AckRequired, MsgId, AckTag,
State #state { backing_queue_state = BQS1 });
- Other when Other + 1 =:= Remaining ->
- set_synchronised(true, State);
- Other when Other < Remaining ->
- %% we must be shorter than the master
- State
+ {_, false} when QLen =< Remaining ->
+ set_synchronised(Remaining, State);
+ {_, true} when QLen =< Remaining ->
+ State #state { external_pending = ExtPending + 1}
end};
-process_instruction({ack, MsgIds},
+process_instruction({ack, MsgIds, Length},
State = #state { backing_queue = BQ,
backing_queue_state = BQS,
msg_id_ack = MA }) ->
{AckTags, MA1} = msg_ids_to_acktags(MsgIds, MA),
{MsgIds1, BQS1} = BQ:ack(AckTags, BQS),
[] = MsgIds1 -- MsgIds, %% ASSERTION
- {ok, State #state { msg_id_ack = MA1,
- backing_queue_state = BQS1 }};
-process_instruction({requeue, MsgIds},
+ {ok, set_synchronised(length(AckTags), length(MsgIds), Length,
+ State #state { msg_id_ack = MA1,
+ backing_queue_state = BQS1 })};
+process_instruction({requeue, MsgIds, Length},
State = #state { backing_queue = BQ,
backing_queue_state = BQS,
msg_id_ack = MA }) ->
{AckTags, MA1} = msg_ids_to_acktags(MsgIds, MA),
- {ok, case length(AckTags) =:= length(MsgIds) of
- true ->
- {MsgIds, BQS1} = BQ:requeue(AckTags, BQS),
- State #state { msg_id_ack = MA1,
- backing_queue_state = BQS1 };
- false ->
- %% The only thing we can safely do is nuke out our BQ
- %% and MA. The interaction between this and confirms
- %% doesn't really bear thinking about...
- {_Count, BQS1} = BQ:purge(BQS),
- {_MsgIds, BQS2} = ack_all(BQ, MA, BQS1),
- State #state { msg_id_ack = dict:new(),
- backing_queue_state = BQS2 }
- end};
+ {_MsgIds, BQS1} = BQ:requeue(AckTags, BQS),
+ {ok, set_synchronised(length(AckTags), length(MsgIds), Length,
+ State #state { msg_id_ack = MA1,
+ backing_queue_state = BQS1 })};
process_instruction({sender_death, ChPid},
State = #state { sender_queues = SQ,
msg_id_status = MS,
@@ -891,10 +889,8 @@ process_instruction({sender_death, ChPid},
msg_id_status = MS1,
known_senders = pmon:demonitor(ChPid, KS) }
end};
-process_instruction({length, Length},
- State = #state { backing_queue = BQ,
- backing_queue_state = BQS }) ->
- {ok, set_synchronised(Length =:= BQ:len(BQS), State)};
+process_instruction({length, Length}, State) ->
+ {ok, set_synchronised(Length, State)};
process_instruction({delete_and_terminate, Reason},
State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
@@ -913,9 +909,6 @@ msg_ids_to_acktags(MsgIds, MA) ->
end, {[], MA}, MsgIds),
{lists:reverse(AckTags), MA1}.
-ack_all(BQ, MA, BQS) ->
- BQ:ack([AckTag || {_MsgId, {_Num, AckTag}} <- dict:to_list(MA)], BQS).
-
maybe_store_ack(false, _MsgId, _AckTag, State) ->
State;
maybe_store_ack(true, MsgId, AckTag, State = #state { msg_id_ack = MA,
@@ -923,9 +916,23 @@ maybe_store_ack(true, MsgId, AckTag, State = #state { msg_id_ack = MA,
State #state { msg_id_ack = dict:store(MsgId, {Num, AckTag}, MA),
ack_num = Num + 1 }.
+set_synchronised(LocalPending, RemotePending, Length,
+ State = #state { backing_queue = BQ,
+ backing_queue_state = BQS,
+ external_pending = ExtPending }) ->
+ ExtPending1 = ExtPending - (RemotePending - LocalPending),
+ State1 = State #state { external_pending = ExtPending1 },
+ case ExtPending1 =:= 0 andalso Length =:= BQ:len(BQS) of
+ true -> set_synchronised1(true, State1);
+ false when ExtPending1 >= 0 -> set_synchronised1(false, State1)
+ end.
+
+set_synchronised(Length, State) ->
+ set_synchronised(0, 0, Length, State).
+
%% We intentionally leave out the head where a slave becomes
%% unsynchronised: we assert that can never happen.
-set_synchronised(true, State = #state { q = #amqqueue { name = QName },
+set_synchronised1(true, State = #state { q = #amqqueue { name = QName },
synchronised = false }) ->
Self = self(),
rabbit_misc:execute_mnesia_transaction(
@@ -939,7 +946,7 @@ set_synchronised(true, State = #state { q = #amqqueue { name = QName },
end
end),
State #state { synchronised = true };
-set_synchronised(true, State) ->
+set_synchronised1(true, State) ->
State;
-set_synchronised(false, State = #state { synchronised = false }) ->
+set_synchronised1(false, State = #state { synchronised = false }) ->
State.