diff options
| author | Matthew Sackman <matthew@rabbitmq.com> | 2010-12-20 15:48:31 +0000 |
|---|---|---|
| committer | Matthew Sackman <matthew@rabbitmq.com> | 2010-12-20 15:48:31 +0000 |
| commit | c1cc82f75d702329ef8007ab6ecaaade645edbd7 (patch) | |
| tree | 842eb6d4815bd63aa43226c953cf804ace60ee5b /src | |
| parent | b60e1c47fd36139cf5f1f4b4fff13b5110728b6c (diff) | |
| download | rabbitmq-server-git-c1cc82f75d702329ef8007ab6ecaaade645edbd7.tar.gz | |
Give in and have the master put the pub msgs themselves on the gm. Avoiding this proves far too complex in all the failure cases (the worst being when the publishing node crashes - the master can receive the msg, but not the slaves. Worse, because of complexities like delegates, it's not even straightforward to monitor the publishers in order to be sure we're not going to receive more messages from them). We continue to have all msgs directly routed to all queues. Yes, this means that normally every slave receives every message twice, but this is genuinely the simplest and most secure route and protects against failures the best.
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_mirror_queue_master.erl | 62 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 246 |
2 files changed, 159 insertions, 149 deletions
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index 0d64ab8e77..94e93b3e80 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -26,7 +26,7 @@ -export([start/1, stop/0]). --export([promote_backing_queue_state/4]). +-export([promote_backing_queue_state/5]). -behaviour(rabbit_backing_queue). @@ -36,7 +36,8 @@ coordinator, backing_queue, backing_queue_state, - set_delivered + set_delivered, + seen }). %% --------------------------------------------------------------------------- @@ -64,14 +65,16 @@ init(#amqqueue { arguments = Args } = Q, Recover) -> coordinator = CPid, backing_queue = BQ, backing_queue_state = BQS, - set_delivered = 0 }. + set_delivered = 0, + seen = sets:new() }. -promote_backing_queue_state(CPid, BQ, BQS, GM) -> +promote_backing_queue_state(CPid, BQ, BQS, GM, Seen) -> #state { gm = GM, coordinator = CPid, backing_queue = BQ, backing_queue_state = BQS, - set_delivered = BQ:len(BQS) }. + set_delivered = BQ:len(BQS), + seen = Seen }. terminate(State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> %% Backing queue termination. The queue is going down but @@ -94,22 +97,31 @@ purge(State = #state { gm = GM, {Count, State #state { backing_queue_state = BQS1, set_delivered = 0 }}. -publish(Msg = #basic_message { guid = Guid }, - MsgProps, ChPid, State = #state { gm = GM, +publish(Msg = #basic_message { guid = Guid }, MsgProps, ChPid, + State = #state { gm = GM, + backing_queue = BQ, + backing_queue_state = BQS, + seen = Seen }) -> + case sets:is_element(Guid, Seen) of + true -> State #state { seen = sets:del_element(Guid, Seen) }; + false -> ok = gm:broadcast(GM, {publish, false, ChPid, MsgProps, Msg}), + BQS1 = BQ:publish(Msg, MsgProps, ChPid, BQS), + State #state { backing_queue_state = BQS1 } + end. + +publish_delivered(AckRequired, Msg = #basic_message { guid = Guid }, MsgProps, + ChPid, State = #state { gm = GM, backing_queue = BQ, - backing_queue_state = BQS }) -> - ok = gm:broadcast(GM, {publish, false, Guid, MsgProps, ChPid}), - BQS1 = BQ:publish(Msg, MsgProps, ChPid, BQS), - State #state { backing_queue_state = BQS1 }. - -publish_delivered(AckRequired, Msg = #basic_message { guid = Guid }, - MsgProps, ChPid, - State = #state { gm = GM, - backing_queue = BQ, - backing_queue_state = BQS }) -> - ok = gm:broadcast(GM, {publish, {true, AckRequired}, Guid, MsgProps, ChPid}), - {AckTag, BQS1} = BQ:publish_delivered(AckRequired, Msg, MsgProps, ChPid, BQS), - {AckTag, State #state { backing_queue_state = BQS1 }}. + backing_queue_state = BQS, + seen = Seen }) -> + case sets:is_element(Guid, Seen) of + true -> State #state { seen = sets:del_element(Guid, Seen) }; + false -> ok = gm:broadcast(GM, {publish, {true, AckRequired}, ChPid, + MsgProps, Msg}), + {AckTag, BQS1} = BQ:publish_delivered(AckRequired, Msg, + MsgProps, ChPid, BQS), + {AckTag, State #state { backing_queue_state = BQS1 }} + end. dropwhile(Fun, State = #state { gm = GM, backing_queue = BQ, @@ -126,7 +138,8 @@ dropwhile(Fun, State = #state { gm = GM, fetch(AckRequired, State = #state { gm = GM, backing_queue = BQ, backing_queue_state = BQS, - set_delivered = SetDelivered }) -> + set_delivered = SetDelivered, + seen = Seen }) -> {Result, BQS1} = BQ:fetch(AckRequired, BQS), State1 = State #state { backing_queue_state = BQS1 }, case Result of @@ -137,8 +150,13 @@ fetch(AckRequired, State = #state { gm = GM, ok = gm:broadcast(GM, {fetch, AckRequired, Guid, Remaining}), IsDelivered1 = IsDelivered orelse SetDelivered > 0, SetDelivered1 = lists:max([0, SetDelivered - 1]), + Seen1 = case SetDelivered + SetDelivered1 of + 1 -> sets:new(); %% transition to empty + _ -> Seen + end, {{Message, IsDelivered1, AckTag, Remaining}, - State1 #state { set_delivered = SetDelivered1 }} + State1 #state { set_delivered = SetDelivered1, + seen = Seen1 }} end. ack(AckTags, State = #state { gm = GM, diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index f124bc9eb0..deb1cc66dd 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -86,7 +86,7 @@ sender_queues, %% :: Pid -> MsgQ guid_ack, %% :: Guid -> AckTag - instructions, %% :: InstrQ + seen, %% Set Guid guid_to_channel %% for confirms }). @@ -140,7 +140,7 @@ init([#amqqueue { name = QueueName } = Q]) -> sender_queues = dict:new(), guid_ack = dict:new(), - instructions = queue:new(), + seen = sets:new(), guid_to_channel = dict:new() }, hibernate, @@ -153,12 +153,12 @@ init([#amqqueue { name = QueueName } = Q]) -> handle_call({deliver_immediately, Delivery = #delivery {}}, From, State) -> %% Synchronous, "immediate" delivery mode gen_server2:reply(From, false), %% master may deliver it, not us - handle_process_result(enqueue_message(Delivery, State)); + noreply(maybe_enqueue_message(Delivery, State)); handle_call({deliver, Delivery = #delivery {}}, From, State) -> %% Synchronous, "mandatory" delivery mode gen_server2:reply(From, true), %% amqqueue throws away the result anyway - handle_process_result(enqueue_message(Delivery, State)); + noreply(maybe_enqueue_message(Delivery, State)); handle_call({gm_deaths, Deaths}, From, State = #state { q = #amqqueue { name = QueueName }, @@ -187,16 +187,12 @@ handle_call({maybe_run_queue_via_backing_queue, Fun}, _From, State) -> handle_cast({maybe_run_queue_via_backing_queue, Fun}, State) -> noreply(maybe_run_queue_via_backing_queue(Fun, State)); -handle_cast({gm, Instruction}, State = #state { instructions = InstrQ }) -> - State1 = State #state { instructions = queue:in(Instruction, InstrQ) }, - case queue:is_empty(InstrQ) of - true -> handle_process_result(process_instructions(State1)); - false -> noreply(State1) - end; +handle_cast({gm, Instruction}, State) -> + handle_process_result(process_instruction(Instruction, State)); handle_cast({deliver, Delivery = #delivery {}}, State) -> %% Asynchronous, non-"mandatory", non-"immediate" deliver mode. - handle_process_result(enqueue_message(Delivery, State)); + noreply(maybe_enqueue_message(Delivery, State)); handle_cast({set_maximum_since_use, Age}, State) -> ok = file_handle_cache:set_maximum_since_use(Age), @@ -317,8 +313,8 @@ maybe_confirm_message(Guid, GTC) -> GTC end. -handle_process_result({continue, State}) -> noreply(State); -handle_process_result({stop, State}) -> {stop, normal, State}. +handle_process_result({ok, State}) -> noreply(State); +handle_process_result({stop, State}) -> {stop, normal, State}. promote_me(From, #state { q = Q, gm = GM, @@ -326,6 +322,7 @@ promote_me(From, #state { q = Q, backing_queue_state = BQS, rate_timer_ref = RateTRef, sender_queues = SQ, + seen = Seen, guid_ack = GA }) -> rabbit_log:info("Promoting slave ~p for queue ~p~n", [self(), Q #amqqueue.name]), @@ -334,7 +331,7 @@ promote_me(From, #state { q = Q, gen_server2:reply(From, {promote, CPid}), ok = gm:confirmed_broadcast(GM, heartbeat), MasterState = rabbit_mirror_queue_master:promote_backing_queue_state( - CPid, BQ, BQS, GM), + CPid, BQ, BQS, GM, Seen), %% We have to do the requeue via this init because otherwise we %% don't have access to the relevent MsgPropsFun. Also, we are %% already in mnesia as the master queue pid. Thus we cannot just @@ -378,115 +375,111 @@ stop_rate_timer(State = #state { rate_timer_ref = TRef }) -> {ok, cancel} = timer:cancel(TRef), State #state { rate_timer_ref = undefined }. -enqueue_message(Delivery = #delivery { sender = ChPid }, - State = #state { sender_queues = SQ }) -> - Q = case dict:find(ChPid, SQ) of - {ok, Q1} -> Q1; - error -> queue:new() - end, - SQ1 = dict:store(ChPid, queue:in(Delivery, Q), SQ), - State1 = State #state { sender_queues = SQ1 }, - case queue:is_empty(Q) of - true -> process_instructions(State1); - false -> {continue, State1} - end. - -process_instructions(State = #state { instructions = InstrQ }) -> - case queue:out(InstrQ) of - {empty, _InstrQ} -> - {continue, State}; - {{value, Instr}, InstrQ1} -> - case process_instruction(Instr, State) of - {processed, State1} -> - process_instructions( - State1 #state { instructions = InstrQ1 }); - {stop, State1} -> - {stop, State1 #state { instructions = InstrQ1 }}; - blocked -> - {continue, State} - end +maybe_enqueue_message( + Delivery = #delivery { message = #basic_message { guid = Guid }, + sender = ChPid }, + State = #state { q = Q, + sender_queues = SQ, + seen = Seen, + guid_to_channel = GTC }) -> + case sets:is_element(Guid, Seen) of + true -> + GTC1 = record_confirm_or_confirm(Delivery, Q, GTC), + State #state { guid_to_channel = GTC1, + seen = sets:del_element(Guid, Seen) }; + false -> + MQ = case dict:find(ChPid, SQ) of + {ok, MQ1} -> MQ1; + error -> queue:new() + end, + SQ1 = dict:store(ChPid, queue:in(Delivery, MQ), SQ), + State #state { sender_queues = SQ1 } end. -process_instruction({publish, Deliver, Guid, MsgProps, ChPid}, - State = #state { q = Q, - sender_queues = SQ, - backing_queue = BQ, - backing_queue_state = BQS, - guid_ack = GA, - guid_to_channel = GTC }) -> - case dict:find(ChPid, SQ) of - error -> - blocked; - {ok, MQ} -> - case queue:out(MQ) of - {empty, _MQ} -> - blocked; - {{value, Delivery = #delivery { - message = Msg = #basic_message { guid = Guid } }}, - MQ1} -> - State1 = State #state { sender_queues = - dict:store(ChPid, MQ1, SQ) }, - GTC1 = record_confirm_or_confirm(Delivery, Q, GTC), - {processed, - case Deliver of - false -> - BQS1 = BQ:publish(Msg, MsgProps, ChPid, BQS), - State1 #state { backing_queue_state = BQS1, - guid_to_channel = GTC1 }; - {true, AckRequired} -> - {AckTag, BQS1} = BQ:publish_delivered( - AckRequired, Msg, MsgProps, - ChPid, BQS), - {GA1, GTC2} = - case AckRequired of - true -> - {dict:store(Guid, AckTag, GA), GTC1}; - false -> - {GA, maybe_confirm_message(Guid, GTC1)} - end, - State1 #state { backing_queue_state = BQS1, - guid_ack = GA1, - guid_to_channel = GTC2 } - end}; - {{value, #delivery {}}, _MQ1} -> - %% throw away the instruction: we'll never receive - %% the message to which it corresponds. - {processed, State} - end - end; +process_instruction( + {publish, Deliver, ChPid, MsgProps, Msg = #basic_message { guid = Guid }}, + State = #state { q = Q, + sender_queues = SQ, + backing_queue = BQ, + backing_queue_state = BQS, + guid_ack = GA, + seen = Seen, + guid_to_channel = GTC }) -> + {SQ1, Seen1, GTC1} = + case dict:find(ChPid, SQ) of + error -> + {SQ, sets:add_element(Guid, Seen), GTC}; + {ok, MQ} -> + case queue:out(MQ) of + {empty, _MQ} -> + {SQ, sets:add_element(Guid, Seen), GTC}; + {{value, Delivery = #delivery { + message = #basic_message { guid = Guid } }}, + MQ1} -> + GTC2 = record_confirm_or_confirm(Delivery, Q, GTC), + {dict:store(ChPid, MQ1, SQ), Seen, GTC2}; + {{value, #delivery {}}, _MQ1} -> + %% The instruction was sent to us before we + %% were within the mirror_pids within the + %% amqqueue record. We'll never receive the + %% message directly. + {SQ, Seen, GTC} + end + end, + State1 = State #state { sender_queues = SQ1, + seen = Seen1, + guid_to_channel = GTC1 }, + {ok, + case Deliver of + false -> + BQS1 = BQ:publish(Msg, MsgProps, ChPid, BQS), + State1 #state { backing_queue_state = BQS1 }; + {true, AckRequired} -> + {AckTag, BQS1} = BQ:publish_delivered(AckRequired, Msg, MsgProps, + ChPid, BQS), + {GA1, GTC3} = case AckRequired of + true -> {dict:store(Guid, AckTag, GA), GTC1}; + false -> {GA, maybe_confirm_message(Guid, GTC1)} + end, + State1 #state { backing_queue_state = BQS1, + guid_ack = GA1, + guid_to_channel = GTC3 } + end}; process_instruction({set_length, Length}, State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> QLen = BQ:len(BQS), ToDrop = QLen - Length, - {processed, - case ToDrop > 0 of - true -> BQS1 = lists:foldl( - fun (const, BQSN) -> BQ:fetch(false, BQSN) end, - BQS, lists:duplicate(ToDrop, const)), - State #state { backing_queue_state = BQS1 }; - false -> State - end}; + {ok, case ToDrop > 0 of + true -> BQS1 = + lists:foldl( + fun (const, BQSN) -> + {{_Msg, _IsDelivered, _AckTag, _Remaining}, + BQSN1} = BQ:fetch(false, BQSN), + BQSN1 + end, BQS, lists:duplicate(ToDrop, const)), + State #state { backing_queue_state = BQS1 }; + false -> State + end}; process_instruction({fetch, AckRequired, Guid, Remaining}, State = #state { backing_queue = BQ, backing_queue_state = BQS, guid_ack = GA }) -> QLen = BQ:len(BQS), - {processed, - case QLen - 1 of - Remaining -> - {{_Msg, _IsDelivered, AckTag, Remaining}, BQS1} = - BQ:fetch(AckRequired, BQS), - GA1 = case AckRequired of - true -> dict:store(Guid, AckTag, GA); - false -> GA - end, - State #state { backing_queue_state = BQS1, - guid_ack = GA1 }; - Other when Other < Remaining -> - %% we must be shorter than the master - State - end}; + {ok, case QLen - 1 of + Remaining -> + {{_Msg, _IsDelivered, AckTag, Remaining}, BQS1} = + BQ:fetch(AckRequired, BQS), + GA1 = case AckRequired of + true -> dict:store(Guid, AckTag, GA); + false -> GA + end, + State #state { backing_queue_state = BQS1, + guid_ack = GA1 }; + Other when Other < Remaining -> + %% we must be shorter than the master + State + end}; process_instruction({ack, Guids}, State = #state { backing_queue = BQ, backing_queue_state = BQS, @@ -494,27 +487,26 @@ process_instruction({ack, Guids}, {AckTags, GA1} = guids_to_acktags(Guids, GA), {Guids1, BQS1} = BQ:ack(AckTags, BQS), [] = Guids1 -- Guids, %% ASSERTION - {processed, State #state { guid_ack = GA1, - backing_queue_state = BQS1 }}; + {ok, State #state { guid_ack = GA1, + backing_queue_state = BQS1 }}; process_instruction({requeue, MsgPropsFun, Guids}, State = #state { backing_queue = BQ, backing_queue_state = BQS, guid_ack = GA }) -> {AckTags, GA1} = guids_to_acktags(Guids, GA), - {processed, - case length(AckTags) =:= length(Guids) of - true -> - {Guids, BQS1} = BQ:requeue(AckTags, MsgPropsFun, BQS), - State #state { guid_ack = GA1, - backing_queue_state = BQS1 }; - false -> - %% the only thing we can safely do is nuke out our BQ and - %% GA - {_Count, BQS1} = BQ:purge(BQS), - {Guids, BQS2} = ack_all(BQ, GA, BQS1), - State #state { guid_ack = dict:new(), - backing_queue_state = BQS2 } - end}; + {ok, case length(AckTags) =:= length(Guids) of + true -> + {Guids, BQS1} = BQ:requeue(AckTags, MsgPropsFun, BQS), + State #state { guid_ack = GA1, + backing_queue_state = BQS1 }; + false -> + %% the only thing we can safely do is nuke out our BQ + %% and GA + {_Count, BQS1} = BQ:purge(BQS), + {Guids, BQS2} = ack_all(BQ, GA, BQS1), + State #state { guid_ack = dict:new(), + backing_queue_state = BQS2 } + end}; process_instruction(delete_and_terminate, State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> |
