diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_mirror_queue_master.erl | 62 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 246 |
2 files changed, 159 insertions, 149 deletions
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index 0d64ab8e77..94e93b3e80 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -26,7 +26,7 @@ -export([start/1, stop/0]). --export([promote_backing_queue_state/4]). +-export([promote_backing_queue_state/5]). -behaviour(rabbit_backing_queue). @@ -36,7 +36,8 @@ coordinator, backing_queue, backing_queue_state, - set_delivered + set_delivered, + seen }). %% --------------------------------------------------------------------------- @@ -64,14 +65,16 @@ init(#amqqueue { arguments = Args } = Q, Recover) -> coordinator = CPid, backing_queue = BQ, backing_queue_state = BQS, - set_delivered = 0 }. + set_delivered = 0, + seen = sets:new() }. -promote_backing_queue_state(CPid, BQ, BQS, GM) -> +promote_backing_queue_state(CPid, BQ, BQS, GM, Seen) -> #state { gm = GM, coordinator = CPid, backing_queue = BQ, backing_queue_state = BQS, - set_delivered = BQ:len(BQS) }. + set_delivered = BQ:len(BQS), + seen = Seen }. terminate(State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> %% Backing queue termination. The queue is going down but @@ -94,22 +97,31 @@ purge(State = #state { gm = GM, {Count, State #state { backing_queue_state = BQS1, set_delivered = 0 }}. -publish(Msg = #basic_message { guid = Guid }, - MsgProps, ChPid, State = #state { gm = GM, +publish(Msg = #basic_message { guid = Guid }, MsgProps, ChPid, + State = #state { gm = GM, + backing_queue = BQ, + backing_queue_state = BQS, + seen = Seen }) -> + case sets:is_element(Guid, Seen) of + true -> State #state { seen = sets:del_element(Guid, Seen) }; + false -> ok = gm:broadcast(GM, {publish, false, ChPid, MsgProps, Msg}), + BQS1 = BQ:publish(Msg, MsgProps, ChPid, BQS), + State #state { backing_queue_state = BQS1 } + end. + +publish_delivered(AckRequired, Msg = #basic_message { guid = Guid }, MsgProps, + ChPid, State = #state { gm = GM, backing_queue = BQ, - backing_queue_state = BQS }) -> - ok = gm:broadcast(GM, {publish, false, Guid, MsgProps, ChPid}), - BQS1 = BQ:publish(Msg, MsgProps, ChPid, BQS), - State #state { backing_queue_state = BQS1 }. - -publish_delivered(AckRequired, Msg = #basic_message { guid = Guid }, - MsgProps, ChPid, - State = #state { gm = GM, - backing_queue = BQ, - backing_queue_state = BQS }) -> - ok = gm:broadcast(GM, {publish, {true, AckRequired}, Guid, MsgProps, ChPid}), - {AckTag, BQS1} = BQ:publish_delivered(AckRequired, Msg, MsgProps, ChPid, BQS), - {AckTag, State #state { backing_queue_state = BQS1 }}. + backing_queue_state = BQS, + seen = Seen }) -> + case sets:is_element(Guid, Seen) of + true -> State #state { seen = sets:del_element(Guid, Seen) }; + false -> ok = gm:broadcast(GM, {publish, {true, AckRequired}, ChPid, + MsgProps, Msg}), + {AckTag, BQS1} = BQ:publish_delivered(AckRequired, Msg, + MsgProps, ChPid, BQS), + {AckTag, State #state { backing_queue_state = BQS1 }} + end. dropwhile(Fun, State = #state { gm = GM, backing_queue = BQ, @@ -126,7 +138,8 @@ dropwhile(Fun, State = #state { gm = GM, fetch(AckRequired, State = #state { gm = GM, backing_queue = BQ, backing_queue_state = BQS, - set_delivered = SetDelivered }) -> + set_delivered = SetDelivered, + seen = Seen }) -> {Result, BQS1} = BQ:fetch(AckRequired, BQS), State1 = State #state { backing_queue_state = BQS1 }, case Result of @@ -137,8 +150,13 @@ fetch(AckRequired, State = #state { gm = GM, ok = gm:broadcast(GM, {fetch, AckRequired, Guid, Remaining}), IsDelivered1 = IsDelivered orelse SetDelivered > 0, SetDelivered1 = lists:max([0, SetDelivered - 1]), + Seen1 = case SetDelivered + SetDelivered1 of + 1 -> sets:new(); %% transition to empty + _ -> Seen + end, {{Message, IsDelivered1, AckTag, Remaining}, - State1 #state { set_delivered = SetDelivered1 }} + State1 #state { set_delivered = SetDelivered1, + seen = Seen1 }} end. ack(AckTags, State = #state { gm = GM, diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index f124bc9eb0..deb1cc66dd 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -86,7 +86,7 @@ sender_queues, %% :: Pid -> MsgQ guid_ack, %% :: Guid -> AckTag - instructions, %% :: InstrQ + seen, %% Set Guid guid_to_channel %% for confirms }). @@ -140,7 +140,7 @@ init([#amqqueue { name = QueueName } = Q]) -> sender_queues = dict:new(), guid_ack = dict:new(), - instructions = queue:new(), + seen = sets:new(), guid_to_channel = dict:new() }, hibernate, @@ -153,12 +153,12 @@ init([#amqqueue { name = QueueName } = Q]) -> handle_call({deliver_immediately, Delivery = #delivery {}}, From, State) -> %% Synchronous, "immediate" delivery mode gen_server2:reply(From, false), %% master may deliver it, not us - handle_process_result(enqueue_message(Delivery, State)); + noreply(maybe_enqueue_message(Delivery, State)); handle_call({deliver, Delivery = #delivery {}}, From, State) -> %% Synchronous, "mandatory" delivery mode gen_server2:reply(From, true), %% amqqueue throws away the result anyway - handle_process_result(enqueue_message(Delivery, State)); + noreply(maybe_enqueue_message(Delivery, State)); handle_call({gm_deaths, Deaths}, From, State = #state { q = #amqqueue { name = QueueName }, @@ -187,16 +187,12 @@ handle_call({maybe_run_queue_via_backing_queue, Fun}, _From, State) -> handle_cast({maybe_run_queue_via_backing_queue, Fun}, State) -> noreply(maybe_run_queue_via_backing_queue(Fun, State)); -handle_cast({gm, Instruction}, State = #state { instructions = InstrQ }) -> - State1 = State #state { instructions = queue:in(Instruction, InstrQ) }, - case queue:is_empty(InstrQ) of - true -> handle_process_result(process_instructions(State1)); - false -> noreply(State1) - end; +handle_cast({gm, Instruction}, State) -> + handle_process_result(process_instruction(Instruction, State)); handle_cast({deliver, Delivery = #delivery {}}, State) -> %% Asynchronous, non-"mandatory", non-"immediate" deliver mode. - handle_process_result(enqueue_message(Delivery, State)); + noreply(maybe_enqueue_message(Delivery, State)); handle_cast({set_maximum_since_use, Age}, State) -> ok = file_handle_cache:set_maximum_since_use(Age), @@ -317,8 +313,8 @@ maybe_confirm_message(Guid, GTC) -> GTC end. -handle_process_result({continue, State}) -> noreply(State); -handle_process_result({stop, State}) -> {stop, normal, State}. +handle_process_result({ok, State}) -> noreply(State); +handle_process_result({stop, State}) -> {stop, normal, State}. promote_me(From, #state { q = Q, gm = GM, @@ -326,6 +322,7 @@ promote_me(From, #state { q = Q, backing_queue_state = BQS, rate_timer_ref = RateTRef, sender_queues = SQ, + seen = Seen, guid_ack = GA }) -> rabbit_log:info("Promoting slave ~p for queue ~p~n", [self(), Q #amqqueue.name]), @@ -334,7 +331,7 @@ promote_me(From, #state { q = Q, gen_server2:reply(From, {promote, CPid}), ok = gm:confirmed_broadcast(GM, heartbeat), MasterState = rabbit_mirror_queue_master:promote_backing_queue_state( - CPid, BQ, BQS, GM), + CPid, BQ, BQS, GM, Seen), %% We have to do the requeue via this init because otherwise we %% don't have access to the relevent MsgPropsFun. Also, we are %% already in mnesia as the master queue pid. Thus we cannot just @@ -378,115 +375,111 @@ stop_rate_timer(State = #state { rate_timer_ref = TRef }) -> {ok, cancel} = timer:cancel(TRef), State #state { rate_timer_ref = undefined }. -enqueue_message(Delivery = #delivery { sender = ChPid }, - State = #state { sender_queues = SQ }) -> - Q = case dict:find(ChPid, SQ) of - {ok, Q1} -> Q1; - error -> queue:new() - end, - SQ1 = dict:store(ChPid, queue:in(Delivery, Q), SQ), - State1 = State #state { sender_queues = SQ1 }, - case queue:is_empty(Q) of - true -> process_instructions(State1); - false -> {continue, State1} - end. - -process_instructions(State = #state { instructions = InstrQ }) -> - case queue:out(InstrQ) of - {empty, _InstrQ} -> - {continue, State}; - {{value, Instr}, InstrQ1} -> - case process_instruction(Instr, State) of - {processed, State1} -> - process_instructions( - State1 #state { instructions = InstrQ1 }); - {stop, State1} -> - {stop, State1 #state { instructions = InstrQ1 }}; - blocked -> - {continue, State} - end +maybe_enqueue_message( + Delivery = #delivery { message = #basic_message { guid = Guid }, + sender = ChPid }, + State = #state { q = Q, + sender_queues = SQ, + seen = Seen, + guid_to_channel = GTC }) -> + case sets:is_element(Guid, Seen) of + true -> + GTC1 = record_confirm_or_confirm(Delivery, Q, GTC), + State #state { guid_to_channel = GTC1, + seen = sets:del_element(Guid, Seen) }; + false -> + MQ = case dict:find(ChPid, SQ) of + {ok, MQ1} -> MQ1; + error -> queue:new() + end, + SQ1 = dict:store(ChPid, queue:in(Delivery, MQ), SQ), + State #state { sender_queues = SQ1 } end. -process_instruction({publish, Deliver, Guid, MsgProps, ChPid}, - State = #state { q = Q, - sender_queues = SQ, - backing_queue = BQ, - backing_queue_state = BQS, - guid_ack = GA, - guid_to_channel = GTC }) -> - case dict:find(ChPid, SQ) of - error -> - blocked; - {ok, MQ} -> - case queue:out(MQ) of - {empty, _MQ} -> - blocked; - {{value, Delivery = #delivery { - message = Msg = #basic_message { guid = Guid } }}, - MQ1} -> - State1 = State #state { sender_queues = - dict:store(ChPid, MQ1, SQ) }, - GTC1 = record_confirm_or_confirm(Delivery, Q, GTC), - {processed, - case Deliver of - false -> - BQS1 = BQ:publish(Msg, MsgProps, ChPid, BQS), - State1 #state { backing_queue_state = BQS1, - guid_to_channel = GTC1 }; - {true, AckRequired} -> - {AckTag, BQS1} = BQ:publish_delivered( - AckRequired, Msg, MsgProps, - ChPid, BQS), - {GA1, GTC2} = - case AckRequired of - true -> - {dict:store(Guid, AckTag, GA), GTC1}; - false -> - {GA, maybe_confirm_message(Guid, GTC1)} - end, - State1 #state { backing_queue_state = BQS1, - guid_ack = GA1, - guid_to_channel = GTC2 } - end}; - {{value, #delivery {}}, _MQ1} -> - %% throw away the instruction: we'll never receive - %% the message to which it corresponds. - {processed, State} - end - end; +process_instruction( + {publish, Deliver, ChPid, MsgProps, Msg = #basic_message { guid = Guid }}, + State = #state { q = Q, + sender_queues = SQ, + backing_queue = BQ, + backing_queue_state = BQS, + guid_ack = GA, + seen = Seen, + guid_to_channel = GTC }) -> + {SQ1, Seen1, GTC1} = + case dict:find(ChPid, SQ) of + error -> + {SQ, sets:add_element(Guid, Seen), GTC}; + {ok, MQ} -> + case queue:out(MQ) of + {empty, _MQ} -> + {SQ, sets:add_element(Guid, Seen), GTC}; + {{value, Delivery = #delivery { + message = #basic_message { guid = Guid } }}, + MQ1} -> + GTC2 = record_confirm_or_confirm(Delivery, Q, GTC), + {dict:store(ChPid, MQ1, SQ), Seen, GTC2}; + {{value, #delivery {}}, _MQ1} -> + %% The instruction was sent to us before we + %% were within the mirror_pids within the + %% amqqueue record. We'll never receive the + %% message directly. + {SQ, Seen, GTC} + end + end, + State1 = State #state { sender_queues = SQ1, + seen = Seen1, + guid_to_channel = GTC1 }, + {ok, + case Deliver of + false -> + BQS1 = BQ:publish(Msg, MsgProps, ChPid, BQS), + State1 #state { backing_queue_state = BQS1 }; + {true, AckRequired} -> + {AckTag, BQS1} = BQ:publish_delivered(AckRequired, Msg, MsgProps, + ChPid, BQS), + {GA1, GTC3} = case AckRequired of + true -> {dict:store(Guid, AckTag, GA), GTC1}; + false -> {GA, maybe_confirm_message(Guid, GTC1)} + end, + State1 #state { backing_queue_state = BQS1, + guid_ack = GA1, + guid_to_channel = GTC3 } + end}; process_instruction({set_length, Length}, State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> QLen = BQ:len(BQS), ToDrop = QLen - Length, - {processed, - case ToDrop > 0 of - true -> BQS1 = lists:foldl( - fun (const, BQSN) -> BQ:fetch(false, BQSN) end, - BQS, lists:duplicate(ToDrop, const)), - State #state { backing_queue_state = BQS1 }; - false -> State - end}; + {ok, case ToDrop > 0 of + true -> BQS1 = + lists:foldl( + fun (const, BQSN) -> + {{_Msg, _IsDelivered, _AckTag, _Remaining}, + BQSN1} = BQ:fetch(false, BQSN), + BQSN1 + end, BQS, lists:duplicate(ToDrop, const)), + State #state { backing_queue_state = BQS1 }; + false -> State + end}; process_instruction({fetch, AckRequired, Guid, Remaining}, State = #state { backing_queue = BQ, backing_queue_state = BQS, guid_ack = GA }) -> QLen = BQ:len(BQS), - {processed, - case QLen - 1 of - Remaining -> - {{_Msg, _IsDelivered, AckTag, Remaining}, BQS1} = - BQ:fetch(AckRequired, BQS), - GA1 = case AckRequired of - true -> dict:store(Guid, AckTag, GA); - false -> GA - end, - State #state { backing_queue_state = BQS1, - guid_ack = GA1 }; - Other when Other < Remaining -> - %% we must be shorter than the master - State - end}; + {ok, case QLen - 1 of + Remaining -> + {{_Msg, _IsDelivered, AckTag, Remaining}, BQS1} = + BQ:fetch(AckRequired, BQS), + GA1 = case AckRequired of + true -> dict:store(Guid, AckTag, GA); + false -> GA + end, + State #state { backing_queue_state = BQS1, + guid_ack = GA1 }; + Other when Other < Remaining -> + %% we must be shorter than the master + State + end}; process_instruction({ack, Guids}, State = #state { backing_queue = BQ, backing_queue_state = BQS, @@ -494,27 +487,26 @@ process_instruction({ack, Guids}, {AckTags, GA1} = guids_to_acktags(Guids, GA), {Guids1, BQS1} = BQ:ack(AckTags, BQS), [] = Guids1 -- Guids, %% ASSERTION - {processed, State #state { guid_ack = GA1, - backing_queue_state = BQS1 }}; + {ok, State #state { guid_ack = GA1, + backing_queue_state = BQS1 }}; process_instruction({requeue, MsgPropsFun, Guids}, State = #state { backing_queue = BQ, backing_queue_state = BQS, guid_ack = GA }) -> {AckTags, GA1} = guids_to_acktags(Guids, GA), - {processed, - case length(AckTags) =:= length(Guids) of - true -> - {Guids, BQS1} = BQ:requeue(AckTags, MsgPropsFun, BQS), - State #state { guid_ack = GA1, - backing_queue_state = BQS1 }; - false -> - %% the only thing we can safely do is nuke out our BQ and - %% GA - {_Count, BQS1} = BQ:purge(BQS), - {Guids, BQS2} = ack_all(BQ, GA, BQS1), - State #state { guid_ack = dict:new(), - backing_queue_state = BQS2 } - end}; + {ok, case length(AckTags) =:= length(Guids) of + true -> + {Guids, BQS1} = BQ:requeue(AckTags, MsgPropsFun, BQS), + State #state { guid_ack = GA1, + backing_queue_state = BQS1 }; + false -> + %% the only thing we can safely do is nuke out our BQ + %% and GA + {_Count, BQS1} = BQ:purge(BQS), + {Guids, BQS2} = ack_all(BQ, GA, BQS1), + State #state { guid_ack = dict:new(), + backing_queue_state = BQS2 } + end}; process_instruction(delete_and_terminate, State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> |
