diff options
| author | Matthew Sackman <matthew@rabbitmq.com> | 2010-12-20 13:56:21 +0000 |
|---|---|---|
| committer | Matthew Sackman <matthew@rabbitmq.com> | 2010-12-20 13:56:21 +0000 |
| commit | b60e1c47fd36139cf5f1f4b4fff13b5110728b6c (patch) | |
| tree | 43aa82c43a1ba386190ff68c7e3831653f683311 /src | |
| parent | b88e8aac5cfd38cf7380984565c1f4e134772f51 (diff) | |
| download | rabbitmq-server-git-b60e1c47fd36139cf5f1f4b4fff13b5110728b6c.tar.gz | |
Revert the previous changeset as I've decided to solve this differently
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_mirror_queue_master.erl | 63 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 125 |
2 files changed, 53 insertions, 135 deletions
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index 4628796f57..0d64ab8e77 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -26,7 +26,7 @@ -export([start/1, stop/0]). --export([promote_backing_queue_state/5]). +-export([promote_backing_queue_state/4]). -behaviour(rabbit_backing_queue). @@ -36,8 +36,7 @@ coordinator, backing_queue, backing_queue_state, - set_delivered, - fakes + set_delivered }). %% --------------------------------------------------------------------------- @@ -65,16 +64,14 @@ init(#amqqueue { arguments = Args } = Q, Recover) -> coordinator = CPid, backing_queue = BQ, backing_queue_state = BQS, - set_delivered = 0, - fakes = sets:new() }. + set_delivered = 0 }. -promote_backing_queue_state(CPid, BQ, BQS, GM, Fakes) -> +promote_backing_queue_state(CPid, BQ, BQS, GM) -> #state { gm = GM, coordinator = CPid, backing_queue = BQ, backing_queue_state = BQS, - set_delivered = BQ:len(BQS), - fakes = Fakes }. + set_delivered = BQ:len(BQS) }. terminate(State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> %% Backing queue termination. The queue is going down but @@ -129,54 +126,30 @@ dropwhile(Fun, State = #state { gm = GM, fetch(AckRequired, State = #state { gm = GM, backing_queue = BQ, backing_queue_state = BQS, - set_delivered = SetDelivered, - fakes = Fakes }) -> + set_delivered = SetDelivered }) -> {Result, BQS1} = BQ:fetch(AckRequired, BQS), + State1 = State #state { backing_queue_state = BQS1 }, case Result of empty -> - {Result, State #state { backing_queue_state = BQS1 }}; + {Result, State1}; {#basic_message { guid = Guid } = Message, IsDelivered, AckTag, Remaining} -> + ok = gm:broadcast(GM, {fetch, AckRequired, Guid, Remaining}), + IsDelivered1 = IsDelivered orelse SetDelivered > 0, SetDelivered1 = lists:max([0, SetDelivered - 1]), - case sets:is_element(Guid, Fakes) of - true -> - {BQS2, Fakes1} = - case AckRequired of - true -> {[Guid], BQS3} = BQ:ack([AckTag], BQS1), - {BQS3, Fakes}; - false -> {BQS1, sets:del_element(Guid, Fakes)} - end, - ok = gm:broadcast(GM, {fetch, false, Guid, Remaining}), - fetch(AckRequired, - State #state { backing_queue_state = BQS2, - set_delivered = SetDelivered1, - fakes = Fakes1 }); - false -> - ok = gm:broadcast(GM, - {fetch, AckRequired, Guid, Remaining}), - IsDelivered1 = IsDelivered orelse SetDelivered > 0, - Fakes1 = case SetDelivered + SetDelivered1 of - 1 -> sets:new(); %% transition to 0 - _ -> Fakes - end, - {{Message, IsDelivered1, AckTag, Remaining}, - State #state { backing_queue_state = BQS1, - set_delivered = SetDelivered1, - fakes = Fakes1 }} - end + {{Message, IsDelivered1, AckTag, Remaining}, + State1 #state { set_delivered = SetDelivered1 }} end. ack(AckTags, State = #state { gm = GM, backing_queue = BQ, - backing_queue_state = BQS, - fakes = Fakes }) -> + backing_queue_state = BQS }) -> {Guids, BQS1} = BQ:ack(AckTags, BQS), - Fakes1 = case Guids of - [] -> Fakes; - _ -> ok = gm:broadcast(GM, {ack, Guids}), - sets:difference(Fakes, sets:from_list(Guids)) - end, - {Guids, State #state { backing_queue_state = BQS1, fakes = Fakes1 }}. + case Guids of + [] -> ok; + _ -> ok = gm:broadcast(GM, {ack, Guids}) + end, + {Guids, State #state { backing_queue_state = BQS1 }}. tx_publish(Txn, Msg, MsgProps, ChPid, #state {} = State) -> %% gm:broadcast(GM, {tx_publish, Txn, Guid, MsgProps, ChPid}) diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 0134787cd0..f124bc9eb0 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -75,7 +75,6 @@ -behaviour(gm). -include("rabbit.hrl"). --include("rabbit_framing.hrl"). -include("gm_specs.hrl"). -record(state, { q, @@ -88,7 +87,6 @@ sender_queues, %% :: Pid -> MsgQ guid_ack, %% :: Guid -> AckTag instructions, %% :: InstrQ - fakes, %% :: Set Guid guid_to_channel %% for confirms }). @@ -143,7 +141,6 @@ init([#amqqueue { name = QueueName } = Q]) -> sender_queues = dict:new(), guid_ack = dict:new(), instructions = queue:new(), - fakes = sets:new(), guid_to_channel = dict:new() }, hibernate, @@ -193,7 +190,7 @@ handle_cast({maybe_run_queue_via_backing_queue, Fun}, State) -> handle_cast({gm, Instruction}, State = #state { instructions = InstrQ }) -> State1 = State #state { instructions = queue:in(Instruction, InstrQ) }, case queue:is_empty(InstrQ) of - true -> handle_process_result(process_instructions(false, State1)); + true -> handle_process_result(process_instructions(State1)); false -> noreply(State1) end; @@ -323,25 +320,21 @@ maybe_confirm_message(Guid, GTC) -> handle_process_result({continue, State}) -> noreply(State); handle_process_result({stop, State}) -> {stop, normal, State}. -promote_me(From, State = #state { q = Q }) -> +promote_me(From, #state { q = Q, + gm = GM, + backing_queue = BQ, + backing_queue_state = BQS, + rate_timer_ref = RateTRef, + sender_queues = SQ, + guid_ack = GA }) -> rabbit_log:info("Promoting slave ~p for queue ~p~n", [self(), Q #amqqueue.name]), - #state { gm = GM, - backing_queue = BQ, - backing_queue_state = BQS, - rate_timer_ref = RateTRef, - sender_queues = SQ, - guid_ack = GA, - instructions = Instr, - fakes = Fakes } = - process_instructions(true, State), - true = queue:is_empty(Instr), %% ASSERTION {ok, CPid} = rabbit_mirror_queue_coordinator:start_link(Q, GM), true = unlink(GM), gen_server2:reply(From, {promote, CPid}), ok = gm:confirmed_broadcast(GM, heartbeat), MasterState = rabbit_mirror_queue_master:promote_backing_queue_state( - CPid, BQ, BQS, GM, Fakes), + CPid, BQ, BQS, GM), %% We have to do the requeue via this init because otherwise we %% don't have access to the relevent MsgPropsFun. Also, we are %% already in mnesia as the master queue pid. Thus we cannot just @@ -394,19 +387,19 @@ enqueue_message(Delivery = #delivery { sender = ChPid }, SQ1 = dict:store(ChPid, queue:in(Delivery, Q), SQ), State1 = State #state { sender_queues = SQ1 }, case queue:is_empty(Q) of - true -> process_instructions(false, State1); + true -> process_instructions(State1); false -> {continue, State1} end. -process_instructions(Flush, State = #state { instructions = InstrQ }) -> +process_instructions(State = #state { instructions = InstrQ }) -> case queue:out(InstrQ) of {empty, _InstrQ} -> {continue, State}; {{value, Instr}, InstrQ1} -> - case process_instruction(Flush, Instr, State) of + case process_instruction(Instr, State) of {processed, State1} -> process_instructions( - Flush, State1 #state { instructions = InstrQ1 }); + State1 #state { instructions = InstrQ1 }); {stop, State1} -> {stop, State1 #state { instructions = InstrQ1 }}; blocked -> @@ -414,38 +407,20 @@ process_instructions(Flush, State = #state { instructions = InstrQ }) -> end end. -process_instruction(Flush, {publish, Deliver, Guid, MsgProps, ChPid} = Instr, +process_instruction({publish, Deliver, Guid, MsgProps, ChPid}, State = #state { q = Q, sender_queues = SQ, backing_queue = BQ, backing_queue_state = BQS, guid_ack = GA, - guid_to_channel = GTC, - fakes = Fakes }) -> + guid_to_channel = GTC }) -> case dict:find(ChPid, SQ) of error -> - case Flush of - true -> MQ = queue:from_list([fake_delivery(Q, Guid, ChPid)]), - State1 = State #state { - sender_queues = dict:store(ChPid, MQ, SQ), - fakes = sets:add_element(Guid, Fakes) }, - process_instruction(Flush, Instr, State1); - false -> blocked - end; + blocked; {ok, MQ} -> case queue:out(MQ) of {empty, _MQ} -> - case Flush of - true -> - MQ1 = queue:in_r(fake_delivery(Q, Guid, ChPid), MQ), - SQ1 = dict:store(ChPid, MQ1, SQ), - State1 = State #state { - sender_queues = SQ1, - fakes = sets:add_element(Guid, Fakes) }, - process_instruction(Flush, Instr, State1); - false -> - blocked - end; + blocked; {{value, Delivery = #delivery { message = Msg = #basic_message { guid = Guid } }}, MQ1} -> @@ -474,41 +449,28 @@ process_instruction(Flush, {publish, Deliver, Guid, MsgProps, ChPid} = Instr, guid_to_channel = GTC2 } end}; {{value, #delivery {}}, _MQ1} -> - MQ1 = queue:in_r(fake_delivery(Q, Guid, ChPid), MQ), - State1 = State #state { - sender_queues = dict:store(ChPid, MQ1, SQ), - fakes = sets:add_element(Guid, Fakes) }, - process_instruction(Flush, Instr, State1) + %% throw away the instruction: we'll never receive + %% the message to which it corresponds. + {processed, State} end end; -process_instruction(_Flush, {set_length, Length}, +process_instruction({set_length, Length}, State = #state { backing_queue = BQ, - backing_queue_state = BQS, - fakes = Fakes }) -> + backing_queue_state = BQS }) -> QLen = BQ:len(BQS), ToDrop = QLen - Length, {processed, case ToDrop > 0 of - true -> - {Guids, BQS1} = - lists:foldl( - fun (const, {GuidsN, BQSN}) -> - {{#basic_message { guid = Guid }, _IsDelivered, - _AckTag, _Remaining}, BQSN1} = - BQ:fetch(false, BQSN), - {[Guid | GuidsN], BQSN1} - end, BQS, lists:duplicate(ToDrop, const)), - Fakes1 = sets:difference(Fakes, sets:from_list(Guids)), - State #state { backing_queue_state = BQS1, - fakes = Fakes1 }; - false -> - State + true -> BQS1 = lists:foldl( + fun (const, BQSN) -> BQ:fetch(false, BQSN) end, + BQS, lists:duplicate(ToDrop, const)), + State #state { backing_queue_state = BQS1 }; + false -> State end}; -process_instruction(_Flush, {fetch, AckRequired, Guid, Remaining}, +process_instruction({fetch, AckRequired, Guid, Remaining}, State = #state { backing_queue = BQ, backing_queue_state = BQS, - guid_ack = GA, - fakes = Fakes }) -> + guid_ack = GA }) -> QLen = BQ:len(BQS), {processed, case QLen - 1 of @@ -519,28 +481,22 @@ process_instruction(_Flush, {fetch, AckRequired, Guid, Remaining}, true -> dict:store(Guid, AckTag, GA); false -> GA end, - Fakes1 = sets:del_element(Guid, Fakes), State #state { backing_queue_state = BQS1, - guid_ack = GA1, - fakes = Fakes1 }; + guid_ack = GA1 }; Other when Other < Remaining -> %% we must be shorter than the master - false = sets:is_element(Guid, Fakes), %% ASSERTION State end}; -process_instruction(_Flush, {ack, Guids}, +process_instruction({ack, Guids}, State = #state { backing_queue = BQ, backing_queue_state = BQS, - guid_ack = GA, - fakes = Fakes }) -> + guid_ack = GA }) -> {AckTags, GA1} = guids_to_acktags(Guids, GA), {Guids1, BQS1} = BQ:ack(AckTags, BQS), [] = Guids1 -- Guids, %% ASSERTION - Fakes1 = sets:difference(Fakes, sets:from_list(Guids)), {processed, State #state { guid_ack = GA1, - backing_queue_state = BQS1, - fakes = Fakes1 }}; -process_instruction(_Flush, {requeue, MsgPropsFun, Guids}, + backing_queue_state = BQS1 }}; +process_instruction({requeue, MsgPropsFun, Guids}, State = #state { backing_queue = BQ, backing_queue_state = BQS, guid_ack = GA }) -> @@ -559,7 +515,7 @@ process_instruction(_Flush, {requeue, MsgPropsFun, Guids}, State #state { guid_ack = dict:new(), backing_queue_state = BQS2 } end}; -process_instruction(_Flush, delete_and_terminate, +process_instruction(delete_and_terminate, State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> BQ:delete_and_terminate(BQS), @@ -578,14 +534,3 @@ guids_to_acktags(Guids, GA) -> ack_all(BQ, GA, BQS) -> BQ:ack([AckTag || {_Guid, AckTag} <- dict:to_list(GA)], BQS). - -fake_delivery(#amqqueue { name = QueueName }, Guid, ChPid) -> - ExchangeName = rabbit_misc:r(QueueName, exchange, <<>>), - Msg = (rabbit_basic:message(ExchangeName, <<>>, #'P_basic'{}, <<>>)) - #basic_message { guid = Guid }, - #delivery { mandatory = false, - immediate = false, - txn = none, - sender = ChPid, - message = Msg, - msg_seq_no = undefined }. |
