summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2010-12-20 15:48:31 +0000
committerMatthew Sackman <matthew@rabbitmq.com>2010-12-20 15:48:31 +0000
commitc1cc82f75d702329ef8007ab6ecaaade645edbd7 (patch)
tree842eb6d4815bd63aa43226c953cf804ace60ee5b /src
parentb60e1c47fd36139cf5f1f4b4fff13b5110728b6c (diff)
downloadrabbitmq-server-git-c1cc82f75d702329ef8007ab6ecaaade645edbd7.tar.gz
Give in and have the master put the pub msgs themselves on the gm. Avoiding this proves far too complex in all the failure cases (the worst being when the publishing node crashes - the master can receive the msg, but not the slaves. Worse, because of complexities like delegates, it's not even straightforward to monitor the publishers in order to be sure we're not going to receive more messages from them). We continue to have all msgs directly routed to all queues. Yes, this means that normally every slave receives every message twice, but this is genuinely the simplest and most secure route and protects against failures the best.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_mirror_queue_master.erl62
-rw-r--r--src/rabbit_mirror_queue_slave.erl246
2 files changed, 159 insertions, 149 deletions
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 0d64ab8e77..94e93b3e80 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -26,7 +26,7 @@
-export([start/1, stop/0]).
--export([promote_backing_queue_state/4]).
+-export([promote_backing_queue_state/5]).
-behaviour(rabbit_backing_queue).
@@ -36,7 +36,8 @@
coordinator,
backing_queue,
backing_queue_state,
- set_delivered
+ set_delivered,
+ seen
}).
%% ---------------------------------------------------------------------------
@@ -64,14 +65,16 @@ init(#amqqueue { arguments = Args } = Q, Recover) ->
coordinator = CPid,
backing_queue = BQ,
backing_queue_state = BQS,
- set_delivered = 0 }.
+ set_delivered = 0,
+ seen = sets:new() }.
-promote_backing_queue_state(CPid, BQ, BQS, GM) ->
+promote_backing_queue_state(CPid, BQ, BQS, GM, Seen) ->
#state { gm = GM,
coordinator = CPid,
backing_queue = BQ,
backing_queue_state = BQS,
- set_delivered = BQ:len(BQS) }.
+ set_delivered = BQ:len(BQS),
+ seen = Seen }.
terminate(State = #state { backing_queue = BQ, backing_queue_state = BQS }) ->
%% Backing queue termination. The queue is going down but
@@ -94,22 +97,31 @@ purge(State = #state { gm = GM,
{Count, State #state { backing_queue_state = BQS1,
set_delivered = 0 }}.
-publish(Msg = #basic_message { guid = Guid },
- MsgProps, ChPid, State = #state { gm = GM,
+publish(Msg = #basic_message { guid = Guid }, MsgProps, ChPid,
+ State = #state { gm = GM,
+ backing_queue = BQ,
+ backing_queue_state = BQS,
+ seen = Seen }) ->
+ case sets:is_element(Guid, Seen) of
+ true -> State #state { seen = sets:del_element(Guid, Seen) };
+ false -> ok = gm:broadcast(GM, {publish, false, ChPid, MsgProps, Msg}),
+ BQS1 = BQ:publish(Msg, MsgProps, ChPid, BQS),
+ State #state { backing_queue_state = BQS1 }
+ end.
+
+publish_delivered(AckRequired, Msg = #basic_message { guid = Guid }, MsgProps,
+ ChPid, State = #state { gm = GM,
backing_queue = BQ,
- backing_queue_state = BQS }) ->
- ok = gm:broadcast(GM, {publish, false, Guid, MsgProps, ChPid}),
- BQS1 = BQ:publish(Msg, MsgProps, ChPid, BQS),
- State #state { backing_queue_state = BQS1 }.
-
-publish_delivered(AckRequired, Msg = #basic_message { guid = Guid },
- MsgProps, ChPid,
- State = #state { gm = GM,
- backing_queue = BQ,
- backing_queue_state = BQS }) ->
- ok = gm:broadcast(GM, {publish, {true, AckRequired}, Guid, MsgProps, ChPid}),
- {AckTag, BQS1} = BQ:publish_delivered(AckRequired, Msg, MsgProps, ChPid, BQS),
- {AckTag, State #state { backing_queue_state = BQS1 }}.
+ backing_queue_state = BQS,
+ seen = Seen }) ->
+ case sets:is_element(Guid, Seen) of
+ true -> State #state { seen = sets:del_element(Guid, Seen) };
+ false -> ok = gm:broadcast(GM, {publish, {true, AckRequired}, ChPid,
+ MsgProps, Msg}),
+ {AckTag, BQS1} = BQ:publish_delivered(AckRequired, Msg,
+ MsgProps, ChPid, BQS),
+ {AckTag, State #state { backing_queue_state = BQS1 }}
+ end.
dropwhile(Fun, State = #state { gm = GM,
backing_queue = BQ,
@@ -126,7 +138,8 @@ dropwhile(Fun, State = #state { gm = GM,
fetch(AckRequired, State = #state { gm = GM,
backing_queue = BQ,
backing_queue_state = BQS,
- set_delivered = SetDelivered }) ->
+ set_delivered = SetDelivered,
+ seen = Seen }) ->
{Result, BQS1} = BQ:fetch(AckRequired, BQS),
State1 = State #state { backing_queue_state = BQS1 },
case Result of
@@ -137,8 +150,13 @@ fetch(AckRequired, State = #state { gm = GM,
ok = gm:broadcast(GM, {fetch, AckRequired, Guid, Remaining}),
IsDelivered1 = IsDelivered orelse SetDelivered > 0,
SetDelivered1 = lists:max([0, SetDelivered - 1]),
+ Seen1 = case SetDelivered + SetDelivered1 of
+ 1 -> sets:new(); %% transition to empty
+ _ -> Seen
+ end,
{{Message, IsDelivered1, AckTag, Remaining},
- State1 #state { set_delivered = SetDelivered1 }}
+ State1 #state { set_delivered = SetDelivered1,
+ seen = Seen1 }}
end.
ack(AckTags, State = #state { gm = GM,
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index f124bc9eb0..deb1cc66dd 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -86,7 +86,7 @@
sender_queues, %% :: Pid -> MsgQ
guid_ack, %% :: Guid -> AckTag
- instructions, %% :: InstrQ
+ seen, %% Set Guid
guid_to_channel %% for confirms
}).
@@ -140,7 +140,7 @@ init([#amqqueue { name = QueueName } = Q]) ->
sender_queues = dict:new(),
guid_ack = dict:new(),
- instructions = queue:new(),
+ seen = sets:new(),
guid_to_channel = dict:new()
}, hibernate,
@@ -153,12 +153,12 @@ init([#amqqueue { name = QueueName } = Q]) ->
handle_call({deliver_immediately, Delivery = #delivery {}}, From, State) ->
%% Synchronous, "immediate" delivery mode
gen_server2:reply(From, false), %% master may deliver it, not us
- handle_process_result(enqueue_message(Delivery, State));
+ noreply(maybe_enqueue_message(Delivery, State));
handle_call({deliver, Delivery = #delivery {}}, From, State) ->
%% Synchronous, "mandatory" delivery mode
gen_server2:reply(From, true), %% amqqueue throws away the result anyway
- handle_process_result(enqueue_message(Delivery, State));
+ noreply(maybe_enqueue_message(Delivery, State));
handle_call({gm_deaths, Deaths}, From,
State = #state { q = #amqqueue { name = QueueName },
@@ -187,16 +187,12 @@ handle_call({maybe_run_queue_via_backing_queue, Fun}, _From, State) ->
handle_cast({maybe_run_queue_via_backing_queue, Fun}, State) ->
noreply(maybe_run_queue_via_backing_queue(Fun, State));
-handle_cast({gm, Instruction}, State = #state { instructions = InstrQ }) ->
- State1 = State #state { instructions = queue:in(Instruction, InstrQ) },
- case queue:is_empty(InstrQ) of
- true -> handle_process_result(process_instructions(State1));
- false -> noreply(State1)
- end;
+handle_cast({gm, Instruction}, State) ->
+ handle_process_result(process_instruction(Instruction, State));
handle_cast({deliver, Delivery = #delivery {}}, State) ->
%% Asynchronous, non-"mandatory", non-"immediate" deliver mode.
- handle_process_result(enqueue_message(Delivery, State));
+ noreply(maybe_enqueue_message(Delivery, State));
handle_cast({set_maximum_since_use, Age}, State) ->
ok = file_handle_cache:set_maximum_since_use(Age),
@@ -317,8 +313,8 @@ maybe_confirm_message(Guid, GTC) ->
GTC
end.
-handle_process_result({continue, State}) -> noreply(State);
-handle_process_result({stop, State}) -> {stop, normal, State}.
+handle_process_result({ok, State}) -> noreply(State);
+handle_process_result({stop, State}) -> {stop, normal, State}.
promote_me(From, #state { q = Q,
gm = GM,
@@ -326,6 +322,7 @@ promote_me(From, #state { q = Q,
backing_queue_state = BQS,
rate_timer_ref = RateTRef,
sender_queues = SQ,
+ seen = Seen,
guid_ack = GA }) ->
rabbit_log:info("Promoting slave ~p for queue ~p~n",
[self(), Q #amqqueue.name]),
@@ -334,7 +331,7 @@ promote_me(From, #state { q = Q,
gen_server2:reply(From, {promote, CPid}),
ok = gm:confirmed_broadcast(GM, heartbeat),
MasterState = rabbit_mirror_queue_master:promote_backing_queue_state(
- CPid, BQ, BQS, GM),
+ CPid, BQ, BQS, GM, Seen),
%% We have to do the requeue via this init because otherwise we
%% don't have access to the relevent MsgPropsFun. Also, we are
%% already in mnesia as the master queue pid. Thus we cannot just
@@ -378,115 +375,111 @@ stop_rate_timer(State = #state { rate_timer_ref = TRef }) ->
{ok, cancel} = timer:cancel(TRef),
State #state { rate_timer_ref = undefined }.
-enqueue_message(Delivery = #delivery { sender = ChPid },
- State = #state { sender_queues = SQ }) ->
- Q = case dict:find(ChPid, SQ) of
- {ok, Q1} -> Q1;
- error -> queue:new()
- end,
- SQ1 = dict:store(ChPid, queue:in(Delivery, Q), SQ),
- State1 = State #state { sender_queues = SQ1 },
- case queue:is_empty(Q) of
- true -> process_instructions(State1);
- false -> {continue, State1}
- end.
-
-process_instructions(State = #state { instructions = InstrQ }) ->
- case queue:out(InstrQ) of
- {empty, _InstrQ} ->
- {continue, State};
- {{value, Instr}, InstrQ1} ->
- case process_instruction(Instr, State) of
- {processed, State1} ->
- process_instructions(
- State1 #state { instructions = InstrQ1 });
- {stop, State1} ->
- {stop, State1 #state { instructions = InstrQ1 }};
- blocked ->
- {continue, State}
- end
+maybe_enqueue_message(
+ Delivery = #delivery { message = #basic_message { guid = Guid },
+ sender = ChPid },
+ State = #state { q = Q,
+ sender_queues = SQ,
+ seen = Seen,
+ guid_to_channel = GTC }) ->
+ case sets:is_element(Guid, Seen) of
+ true ->
+ GTC1 = record_confirm_or_confirm(Delivery, Q, GTC),
+ State #state { guid_to_channel = GTC1,
+ seen = sets:del_element(Guid, Seen) };
+ false ->
+ MQ = case dict:find(ChPid, SQ) of
+ {ok, MQ1} -> MQ1;
+ error -> queue:new()
+ end,
+ SQ1 = dict:store(ChPid, queue:in(Delivery, MQ), SQ),
+ State #state { sender_queues = SQ1 }
end.
-process_instruction({publish, Deliver, Guid, MsgProps, ChPid},
- State = #state { q = Q,
- sender_queues = SQ,
- backing_queue = BQ,
- backing_queue_state = BQS,
- guid_ack = GA,
- guid_to_channel = GTC }) ->
- case dict:find(ChPid, SQ) of
- error ->
- blocked;
- {ok, MQ} ->
- case queue:out(MQ) of
- {empty, _MQ} ->
- blocked;
- {{value, Delivery = #delivery {
- message = Msg = #basic_message { guid = Guid } }},
- MQ1} ->
- State1 = State #state { sender_queues =
- dict:store(ChPid, MQ1, SQ) },
- GTC1 = record_confirm_or_confirm(Delivery, Q, GTC),
- {processed,
- case Deliver of
- false ->
- BQS1 = BQ:publish(Msg, MsgProps, ChPid, BQS),
- State1 #state { backing_queue_state = BQS1,
- guid_to_channel = GTC1 };
- {true, AckRequired} ->
- {AckTag, BQS1} = BQ:publish_delivered(
- AckRequired, Msg, MsgProps,
- ChPid, BQS),
- {GA1, GTC2} =
- case AckRequired of
- true ->
- {dict:store(Guid, AckTag, GA), GTC1};
- false ->
- {GA, maybe_confirm_message(Guid, GTC1)}
- end,
- State1 #state { backing_queue_state = BQS1,
- guid_ack = GA1,
- guid_to_channel = GTC2 }
- end};
- {{value, #delivery {}}, _MQ1} ->
- %% throw away the instruction: we'll never receive
- %% the message to which it corresponds.
- {processed, State}
- end
- end;
+process_instruction(
+ {publish, Deliver, ChPid, MsgProps, Msg = #basic_message { guid = Guid }},
+ State = #state { q = Q,
+ sender_queues = SQ,
+ backing_queue = BQ,
+ backing_queue_state = BQS,
+ guid_ack = GA,
+ seen = Seen,
+ guid_to_channel = GTC }) ->
+ {SQ1, Seen1, GTC1} =
+ case dict:find(ChPid, SQ) of
+ error ->
+ {SQ, sets:add_element(Guid, Seen), GTC};
+ {ok, MQ} ->
+ case queue:out(MQ) of
+ {empty, _MQ} ->
+ {SQ, sets:add_element(Guid, Seen), GTC};
+ {{value, Delivery = #delivery {
+ message = #basic_message { guid = Guid } }},
+ MQ1} ->
+ GTC2 = record_confirm_or_confirm(Delivery, Q, GTC),
+ {dict:store(ChPid, MQ1, SQ), Seen, GTC2};
+ {{value, #delivery {}}, _MQ1} ->
+ %% The instruction was sent to us before we
+ %% were within the mirror_pids within the
+ %% amqqueue record. We'll never receive the
+ %% message directly.
+ {SQ, Seen, GTC}
+ end
+ end,
+ State1 = State #state { sender_queues = SQ1,
+ seen = Seen1,
+ guid_to_channel = GTC1 },
+ {ok,
+ case Deliver of
+ false ->
+ BQS1 = BQ:publish(Msg, MsgProps, ChPid, BQS),
+ State1 #state { backing_queue_state = BQS1 };
+ {true, AckRequired} ->
+ {AckTag, BQS1} = BQ:publish_delivered(AckRequired, Msg, MsgProps,
+ ChPid, BQS),
+ {GA1, GTC3} = case AckRequired of
+ true -> {dict:store(Guid, AckTag, GA), GTC1};
+ false -> {GA, maybe_confirm_message(Guid, GTC1)}
+ end,
+ State1 #state { backing_queue_state = BQS1,
+ guid_ack = GA1,
+ guid_to_channel = GTC3 }
+ end};
process_instruction({set_length, Length},
State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
QLen = BQ:len(BQS),
ToDrop = QLen - Length,
- {processed,
- case ToDrop > 0 of
- true -> BQS1 = lists:foldl(
- fun (const, BQSN) -> BQ:fetch(false, BQSN) end,
- BQS, lists:duplicate(ToDrop, const)),
- State #state { backing_queue_state = BQS1 };
- false -> State
- end};
+ {ok, case ToDrop > 0 of
+ true -> BQS1 =
+ lists:foldl(
+ fun (const, BQSN) ->
+ {{_Msg, _IsDelivered, _AckTag, _Remaining},
+ BQSN1} = BQ:fetch(false, BQSN),
+ BQSN1
+ end, BQS, lists:duplicate(ToDrop, const)),
+ State #state { backing_queue_state = BQS1 };
+ false -> State
+ end};
process_instruction({fetch, AckRequired, Guid, Remaining},
State = #state { backing_queue = BQ,
backing_queue_state = BQS,
guid_ack = GA }) ->
QLen = BQ:len(BQS),
- {processed,
- case QLen - 1 of
- Remaining ->
- {{_Msg, _IsDelivered, AckTag, Remaining}, BQS1} =
- BQ:fetch(AckRequired, BQS),
- GA1 = case AckRequired of
- true -> dict:store(Guid, AckTag, GA);
- false -> GA
- end,
- State #state { backing_queue_state = BQS1,
- guid_ack = GA1 };
- Other when Other < Remaining ->
- %% we must be shorter than the master
- State
- end};
+ {ok, case QLen - 1 of
+ Remaining ->
+ {{_Msg, _IsDelivered, AckTag, Remaining}, BQS1} =
+ BQ:fetch(AckRequired, BQS),
+ GA1 = case AckRequired of
+ true -> dict:store(Guid, AckTag, GA);
+ false -> GA
+ end,
+ State #state { backing_queue_state = BQS1,
+ guid_ack = GA1 };
+ Other when Other < Remaining ->
+ %% we must be shorter than the master
+ State
+ end};
process_instruction({ack, Guids},
State = #state { backing_queue = BQ,
backing_queue_state = BQS,
@@ -494,27 +487,26 @@ process_instruction({ack, Guids},
{AckTags, GA1} = guids_to_acktags(Guids, GA),
{Guids1, BQS1} = BQ:ack(AckTags, BQS),
[] = Guids1 -- Guids, %% ASSERTION
- {processed, State #state { guid_ack = GA1,
- backing_queue_state = BQS1 }};
+ {ok, State #state { guid_ack = GA1,
+ backing_queue_state = BQS1 }};
process_instruction({requeue, MsgPropsFun, Guids},
State = #state { backing_queue = BQ,
backing_queue_state = BQS,
guid_ack = GA }) ->
{AckTags, GA1} = guids_to_acktags(Guids, GA),
- {processed,
- case length(AckTags) =:= length(Guids) of
- true ->
- {Guids, BQS1} = BQ:requeue(AckTags, MsgPropsFun, BQS),
- State #state { guid_ack = GA1,
- backing_queue_state = BQS1 };
- false ->
- %% the only thing we can safely do is nuke out our BQ and
- %% GA
- {_Count, BQS1} = BQ:purge(BQS),
- {Guids, BQS2} = ack_all(BQ, GA, BQS1),
- State #state { guid_ack = dict:new(),
- backing_queue_state = BQS2 }
- end};
+ {ok, case length(AckTags) =:= length(Guids) of
+ true ->
+ {Guids, BQS1} = BQ:requeue(AckTags, MsgPropsFun, BQS),
+ State #state { guid_ack = GA1,
+ backing_queue_state = BQS1 };
+ false ->
+ %% the only thing we can safely do is nuke out our BQ
+ %% and GA
+ {_Count, BQS1} = BQ:purge(BQS),
+ {Guids, BQS2} = ack_all(BQ, GA, BQS1),
+ State #state { guid_ack = dict:new(),
+ backing_queue_state = BQS2 }
+ end};
process_instruction(delete_and_terminate,
State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->