diff options
| author | Matthew Sackman <matthew@rabbitmq.com> | 2011-04-08 10:27:19 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@rabbitmq.com> | 2011-04-08 10:27:19 +0100 |
| commit | 79bbb0a7c29bb4edbcc316c3bb993bca78d2aaf2 (patch) | |
| tree | d831423e4c63525fc742c700c248d831c737cc33 | |
| parent | 8855feacf6d188872a2abcc1eaa52d7a7a5c280c (diff) | |
| download | rabbitmq-server-git-79bbb0a7c29bb4edbcc316c3bb993bca78d2aaf2.tar.gz | |
Sod it - transactions are too hard to do in mirror queues so don't bother. I know how to do it, but it's horrifically messy, and the margin is too small
| -rw-r--r-- | src/rabbit_mirror_queue_master.erl | 96 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 183 |
2 files changed, 44 insertions, 235 deletions
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index a59d64d4cc..387dfbc481 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -26,7 +26,7 @@ -export([start/1, stop/0]). --export([promote_backing_queue_state/6]). +-export([promote_backing_queue_state/5]). -behaviour(rabbit_backing_queue). @@ -39,8 +39,7 @@ set_delivered, seen_status, confirmed, - ack_msg_id, - abandoned_txns + ack_msg_id }). %% --------------------------------------------------------------------------- @@ -78,7 +77,7 @@ init(#amqqueue { arguments = Args, name = QName } = Q, Recover, confirmed = [], ack_msg_id = dict:new() }. -promote_backing_queue_state(CPid, BQ, BQS, GM, SeenStatus, AbandonedTxns) -> +promote_backing_queue_state(CPid, BQ, BQS, GM, SeenStatus) -> #state { gm = GM, coordinator = CPid, backing_queue = BQ, @@ -86,8 +85,7 @@ promote_backing_queue_state(CPid, BQ, BQS, GM, SeenStatus, AbandonedTxns) -> set_delivered = BQ:len(BQS), seen_status = SeenStatus, confirmed = [], - ack_msg_id = dict:new(), - abandoned_txns = AbandonedTxns }. + ack_msg_id = dict:new() }. terminate(State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> %% Backing queue termination. The queue is going down but @@ -214,68 +212,20 @@ ack(AckTags, State = #state { gm = GM, {MsgIds, State #state { backing_queue_state = BQS1, ack_msg_id = AM1 }}. -tx_publish(Txn, Msg, MsgProps, ChPid, - State = #state { gm = GM, - backing_queue = BQ, - backing_queue_state = BQS, - abandoned_txns = AbandonedTxns }) -> - case sets:is_element(Txn, AbandonedTxns) of - true -> State; - false -> ok = gm:broadcast(GM, {tx_publish, Txn, ChPid, MsgProps, Msg}), - BQS1 = BQ:tx_publish(Txn, Msg, MsgProps, ChPid, State), - State #state { backing_queue_state = BQS1 } - end. +tx_publish(_Txn, _Msg, _MsgProps, _ChPid, State) -> + %% We don't support txns in mirror queues + State. -tx_ack(Txn, AckTags, State = #state { gm = GM, - backing_queue = BQ, - backing_queue_state = BQS, - ack_msg_id = AM, - abandoned_txns = AbandonedTxns }) -> - case sets:is_element(Txn, AbandonedTxns) of - true -> - State; - false -> - MsgIds = lists:foldl( - fun (AckTag, Acc) -> [dict:fetch(AckTag, AM) | Acc] end, - [], AckTags), - ok = gm:broadcast(GM, {tx_ack, Txn, MsgIds}), - BQS1 = BQ:tx_ack(Txn, AckTags, BQS), - State #state { backing_queue_state = BQS1 } - end. +tx_ack(_Txn, _AckTags, State) -> + %% We don't support txns in mirror queues + State. -tx_rollback(Txn, State = #state { gm = GM, - backing_queue = BQ, - backing_queue_state = BQS, - abandoned_txns = AbandonedTxns }) -> - case sets:is_element(Txn, AbandonedTxns) of - true -> {[], State}; - false -> {AckTags, BQS1} = BQ:tx_rollback(Txn, BQS), - ok = gm:confirmed_broadcast(GM, {tx_rollback, Txn}), - {AckTags, State #state { backing_queue_state = BQS1 }} - end. +tx_rollback(_Txn, State) -> + {[], State}. -tx_commit(Txn, PostCommitFun, MsgPropsFun, - State = #state { gm = GM, - backing_queue = BQ, - backing_queue_state = BQS, - ack_msg_id = AM }) -> - case sets:is_element(Txn, AbandonedTxns) of - true -> - %% Don't worry - the channel will explode as it'll still - %% try to commit on the old master. - {[], State}; - false -> - {AckTags, BQS1} = BQ:tx_commit(Txn, PostCommitFun, MsgPropsFun, BQS), - {MsgIds, AM1} = lists:foldl( - fun (AckTag, {MsgIdsN, AMN}) -> - MsgId = dict:fetch(AckTag, AMN), - {[MsgId|MsgIdsN], dict:erase(AckTag, AMN)} - end, {[], AM}, AckTags), - ok = gm:confirmed_broadcast( - GM, {tx_commit, Txn, MsgPropsFun, MsgIds}), - {AckTags, State #state { backing_queue_state = BQS, - ack_msg_id = AM }} - end. +tx_commit(_Txn, PostCommitFun, _MsgPropsFun, State) -> + PostCommitFun(), %% Probably must run it to avoid deadlocks + {[], State}. requeue(AckTags, MsgPropsFun, State = #state { gm = GM, backing_queue = BQ, @@ -361,13 +311,10 @@ is_duplicate(none, Message = #basic_message { id = MsgId }, %% be called and we need to be able to detect this case {discarded, State} end; -is_duplicate(Txn, _Msg, State = #state { abandoned_txns = AbandonedTxns }) -> - %% There will be nothing in seen_status for any transactions that - %% are still in flight. - case sets:is_element(Txn, AbandonedTxns) of - true -> {published, State}; - false -> {false, State} - end. +is_duplicate(_Txn, _Msg, State) -> + %% In a transaction. We don't support txns in mirror queues. But + %% it's probably not a duplicate... + {false, State}. discard(Msg = #basic_message { id = MsgId }, ChPid, State = #state { gm = GM, @@ -376,13 +323,14 @@ discard(Msg = #basic_message { id = MsgId }, ChPid, seen_status = SS }) -> %% It's a massive error if we get told to discard something that's %% already been published or published-and-confirmed. To do that - %% would require non FIFO access... + %% would require non FIFO access. Hence we should not find + %% 'published' or 'confirmed' in this dict:find. case dict:find(MsgId, SS) of error -> ok = gm:broadcast(GM, {discard, ChPid, Msg}), State #state { backing_queue_state = BQ:discard(Msg, ChPid, BQS), seen_status = dict:erase(MsgId, SS) }; - discarded -> + {ok, discarded} -> State end. diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index a61abbd727..8ca82fa1fe 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -55,8 +55,7 @@ sender_queues, %% :: Pid -> MsgQ msg_id_ack, %% :: MsgId -> AckTag - msg_id_status, - open_transactions + msg_id_status }). -define(SYNC_INTERVAL, 25). %% milliseconds @@ -106,8 +105,7 @@ init([#amqqueue { name = QueueName } = Q]) -> sender_queues = dict:new(), msg_id_ack = dict:new(), - msg_id_status = dict:new(), - open_transactions = dict:new() + msg_id_status = dict:new() }, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. @@ -159,28 +157,9 @@ handle_call({gm_deaths, Deaths}, From, handle_call({run_backing_queue, Mod, Fun}, _From, State) -> reply(ok, run_backing_queue(Mod, Fun, State)); -handle_call({commit, Txn, ChPid}, From, - State = #state { open_transactions = OT }) -> - case dict:find(Txn, OT) of - error -> - %% We've not received anything about this txn so far via - %% gm! - OT1 = dict:store(Txn, {undefined, {committed, From}}, OT), - noreply(State #state { open_transactions = OT1 }); - {ok, {open, undefined}} -> - %% Save who we're from, but we're still waiting for the - %% commit to arrive via GM - OT1 = dict:store(Txn, {open, {committed, From}}, OT), - noreply(State #state { open_transactions = OT1 }); - {ok, {committed, undefined}} -> - %% We've already finished via GM (our BQ has actually - %% replied back to us in the case of commit), so just - %% reply and tidy up. - reply(ok, State #state { open_transactions = dict:erase(Txn, OT) }); - {ok, {abandoned, undefined}} -> - %% GM must have told us to roll back. - reply(ok, State #state { open_transactions = dict:erase(Txn, OT) }) - end. +handle_call({commit, _Txn, _ChPid}, _From, State) -> + %% We don't support transactions in mirror queues + reply(ok, State). handle_cast({run_backing_queue, Mod, Fun}, State) -> noreply(run_backing_queue(Mod, Fun, State)); @@ -216,23 +195,9 @@ handle_cast(sync_timeout, State) -> noreply(backing_queue_idle_timeout( State #state { sync_timer_ref = undefined })); -handle_cast({rollback, Txn, ChPid}, - State #state { open_transactions = OT }) -> - %% Will never see {'committed', _} or {_, 'abandoned'} or - %% {_, {'committed', From}} here - case dict:find(Txn, OT) of - error -> - %% We've not received anything from GM about this. - OT1 = dict:store(Txn, {undefined, abandoned}, OT), - noreply(State #state { open_transactions = OT1 }); - {ok, {open, undefined}} -> - %% The rollback is yet to arrive via GM. - OT1 = dict:store(Txn, {open, abandoned}, OT), - noreply(State #state { open_transactions = OT1 }); - {ok, {abandoned, undefined}} -> - %% GM has already rolled back. Tidy up. - noreply(State #state { open_transactions = dict:erase(Txn, OT) }) - end. +handle_cast({rollback, _Txn, _ChPid}, State) -> + %% We don't support transactions in mirror queues + noreply(State). handle_info(timeout, State) -> noreply(backing_queue_idle_timeout(State)); @@ -405,8 +370,7 @@ promote_me(From, #state { q = Q, rate_timer_ref = RateTRef, sender_queues = SQ, msg_id_ack = MA, - msg_id_status = MS, - open_transactions = OT }) -> + msg_id_status = MS }) -> rabbit_log:info("Promoting slave ~p for ~s~n", [self(), rabbit_misc:rs(Q #amqqueue.name)]), {ok, CPid} = rabbit_mirror_queue_coordinator:start_link(Q, GM), @@ -414,16 +378,6 @@ promote_me(From, #state { q = Q, gen_server2:reply(From, {promote, CPid}), ok = gm:confirmed_broadcast(GM, heartbeat), - %% Start by rolling back all open transactions - AbandonedTxns = [Txn || {Txn, {open, _TxnStatusByChannel}} - <- dict:to_list(OT)], - BQS1 = lists:foldl( - fun (Txn, BQSN) -> - ok = gm:confirmed_broadcast(GM, {tx_rollback, Txn}), - {_AckTags, BQSN1} = BQ:tx_rollback(Txn, BQSN), - BQSN1 - end, BQS, AbandonedTxns), - %% We find all the messages that we've received from channels but %% not from gm, and if they're due to be enqueued on promotion %% then we pass them to the @@ -495,7 +449,7 @@ promote_me(From, #state { q = Q, Status =:= published orelse Status =:= confirmed]), MasterState = rabbit_mirror_queue_master:promote_backing_queue_state( - CPid, BQ, BQS1, GM, SS, sets:from_list(AbandonedTxns)), + CPid, BQ, BQS, GM, SS), MTC = dict:from_list( [{MsgId, {ChPid, MsgSeqNo}} || @@ -604,9 +558,8 @@ maybe_enqueue_message( %% discarded. We won't see this again. State #state { msg_id_status = dict:erase(MsgId, MS) } end; -maybe_enqueue_message(_Delivery, State) -> - %% In a txn. Txns are completely driven by gm for simplicity, so - %% we're not going to do anything here. +maybe_enqueue_message(_Delivery, _EnqueueOnPromotion, State) -> + %% We don't support txns in mirror queues. State. process_instruction( @@ -755,7 +708,7 @@ process_instruction({ack, MsgIds}, State = #state { backing_queue = BQ, backing_queue_state = BQS, msg_id_ack = MA }) -> - {AckTags, MA1} = msg_ids_to_acktags(MsgIds, MA, remove), + {AckTags, MA1} = msg_ids_to_acktags(MsgIds, MA), {MsgIds1, BQS1} = BQ:ack(AckTags, BQS), [] = MsgIds1 -- MsgIds, %% ASSERTION {ok, State #state { msg_id_ack = MA1, @@ -764,7 +717,7 @@ process_instruction({requeue, MsgPropsFun, MsgIds}, State = #state { backing_queue = BQ, backing_queue_state = BQS, msg_id_ack = MA }) -> - {AckTags, MA1} = msg_ids_to_acktags(MsgIds, MA, remove), + {AckTags, MA1} = msg_ids_to_acktags(MsgIds, MA), {ok, case length(AckTags) =:= length(MsgIds) of true -> {MsgIds, BQS1} = BQ:requeue(AckTags, MsgPropsFun, BQS), @@ -779,113 +732,21 @@ process_instruction({requeue, MsgPropsFun, MsgIds}, State #state { msg_id_ack = dict:new(), backing_queue_state = BQS2 } end}; -process_instruction({tx_publish, Txn, ChPid, MsgProps, Msg}, - State = #state { backing_queue = BQ, - backing_queue_state = BQS, - open_transactions = OT }) -> - %% Will never see abandoned or committed in the LHS - OT1 = case dict:find(Txn, OT) of - error -> - dict:store(Txn, {open, undefined}, OT); - {ok, {open, _TxnStatusByChannel}} -> - OT - end, - BQS1 = BQ:tx_publish(Txn, Msg, MsgProps, ChPid, BQS), - {ok, State #state { backing_queue_state = BQS1, - open_transactions = OT1 }}; -process_instruction({tx_ack, Txn, MsgIds}, - State = #state { backing_queue = BQ, - backing_queue_state = BQS, - open_transactions = OT, - msg_id_ack = MA }) -> - %% Will never see abandoned or committed in the LHS - OT1 = case dict:find(Txn, OT) of - error -> - dict:store(Txn, {open, undefined}, OT); - {ok, {open, _TxnStatusByChannel}} -> - OT - end, - %% Remember, rollback of a txn with acks simply undoes the ack - - %% the msg itself is not requeued or anything. Thus we make sure - %% msg_ids_to_acktags does not remove the entry from MQ, and we - %% will do the remove when we commit. - {AckTags, MA1} = msg_ids_to_acktags(MsgIds, MA, keep), - BQS1 = BQ:tx_ack(Txn, AckTags, BQS), - {ok, State #store { backing_queue_state = BQS1, - open_transactions = OT1, - msg_id_ack = MA1 }}; -process_instruction({tx_commit, Txn, MsgPropsFun, MsgIds}, - State = #state { backing_queue = BQ, - backing_queue_state = BQS, - open_transactions = OT, - msg_id_ack = MA }) -> - %% We must remove the ack tags from MQ at this point - {AckTags, MA1} = msg_ids_to_acktags(MsgIds, MA, remove), - %% We won't adjust open_transactions until we get the post_commit - %% callback, unless we've already seen the commit from the channel - case dict:find(Txn, OT) of - {open, {committed, From}} -> - {AckTags1, BQS1} = - BQ:tx_commit(Txn, fun () -> gen_server2:reply(From, ok) end, - MsgPropsFun, BQS), - OT1 = dict:erase(Txn, OT), - true = lists:usort(AckTags) =:= lists:usort(AckTags1), %% ASSERTION - {ok, State #state { backing_queue_state = BQS, - open_transactions = OT1, - msg_id_ack = MA1 }}; - Status -> - %% We have to cope with the possibility that we'll get - %% promoted before the txn finishes, and rely on slight - %% magic if we do complete here. - Me = self(), - F = fun () -> rabbit_amqqueue:run_backing_queue_async( - Me, rabbit_mirror_queue_master, - fun (rabbit_mirror_queue_master, - State1 = #state { open_transactions = OT2 }) -> - OT3 = case dict:find(Txn, OT2) of - {committing, undefined} -> - dict:store( - Txn, {committed, undefined}, - OT2); - {committing, {committed, From}} -> - gen_server2:reply(From, ok), - dict:erase(Txn, OT2) - end, - State1 #state { open_transactions = OT3 } - end) - end, - {AckTags1, BQS1} = BQ:tx_commit(Txn, F, MsgPropsFun, BQS), - true = lists:usort(AckTags) =:= lists:usort(AckTags1), %% ASSERTION - OT1 = case Status of - error -> - dict:store(Txn, {committing, undefined}, OT); - {open, TxnStatusByChannel} -> - dict:store(Txn, {committing, TxnStatusByChannel}, OT) - end, - {ok, State #state { backing_queue_state = BQS, - open_transactions = OT1, - msg_id_ack = MA1 }}} - end; - process_instruction(delete_and_terminate, State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> BQ:delete_and_terminate(BQS), {stop, State #state { backing_queue_state = undefined }}. -msg_ids_to_acktags(MsgIds, MA, RemoveOrKeep) -> +msg_ids_to_acktags(MsgIds, MA) -> {AckTags, MA1} = - lists:foldl(fun (MsgId, {AckTagsN, MAN}) -> - case dict:find(MsgId, MA) of - error -> - {AckTagsN, MAN}; - {ok, AckTag} when RemoveOrKeep =:= remove -> - {[AckTag | AckTagsN], - dict:erase(MsgId, MAN)}; - {ok, AckTag} when RemoveOrKeep =:= keep -> - {[AckTag | AckTagsN], MAN} - end - end, {[], MA}, MsgIds), + lists:foldl( + fun (MsgId, {Acc, MAN}) -> + case dict:find(MsgId, MA) of + error -> {Acc, MAN}; + {ok, AckTag} -> {[AckTag | Acc], dict:erase(MsgId, MAN)} + end + end, {[], MA}, MsgIds), {lists:reverse(AckTags), MA1}. ack_all(BQ, MA, BQS) -> |
