diff options
| author | Matthew Sackman <matthew@rabbitmq.com> | 2011-04-08 00:11:23 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@rabbitmq.com> | 2011-04-08 00:11:23 +0100 |
| commit | 8855feacf6d188872a2abcc1eaa52d7a7a5c280c (patch) | |
| tree | 03ac0bc25e932ab5ccd136a7f9cf48120ec26654 | |
| parent | 44efb9f3fb70eb5bf779a87314bd3f9752cecd52 (diff) | |
| download | rabbitmq-server-git-8855feacf6d188872a2abcc1eaa52d7a7a5c280c.tar.gz | |
Well, getting closer. But it's not done yet, and I may have discovered a rather fatal problem with the whole idea of supporting txns in mirrors anyway in that because of the coalescing going on, there is absolutely no indication of when the BQ finally completes adding the msgs to the queue. Thus the only solution here might be to ban coalescing in this case
| -rw-r--r-- | src/rabbit_mirror_queue_master.erl | 16 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 133 |
2 files changed, 126 insertions, 23 deletions
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index 8714c44db9..a59d64d4cc 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -239,7 +239,8 @@ tx_ack(Txn, AckTags, State = #state { gm = GM, fun (AckTag, Acc) -> [dict:fetch(AckTag, AM) | Acc] end, [], AckTags), ok = gm:broadcast(GM, {tx_ack, Txn, MsgIds}), - State + BQS1 = BQ:tx_ack(Txn, AckTags, BQS), + State #state { backing_queue_state = BQS1 } end. tx_rollback(Txn, State = #state { gm = GM, @@ -248,8 +249,8 @@ tx_rollback(Txn, State = #state { gm = GM, abandoned_txns = AbandonedTxns }) -> case sets:is_element(Txn, AbandonedTxns) of true -> {[], State}; - false -> ok = gm:confirmed_broadcast(GM, {tx_rollback, Txn}), - {AckTags, BQS1} = BQ:tx_rollback(Txn, BQS), + false -> {AckTags, BQS1} = BQ:tx_rollback(Txn, BQS), + ok = gm:confirmed_broadcast(GM, {tx_rollback, Txn}), {AckTags, State #state { backing_queue_state = BQS1 }} end. @@ -264,9 +265,14 @@ tx_commit(Txn, PostCommitFun, MsgPropsFun, %% try to commit on the old master. {[], State}; false -> - ok = gm:confirmed_broadcast(GM, {tx_commit, Txn, MsgPropsFun}), {AckTags, BQS1} = BQ:tx_commit(Txn, PostCommitFun, MsgPropsFun, BQS), - AM1 = lists:foldl(fun dict:erase/2, AM, AckTags), + {MsgIds, AM1} = lists:foldl( + fun (AckTag, {MsgIdsN, AMN}) -> + MsgId = dict:fetch(AckTag, AMN), + {[MsgId|MsgIdsN], dict:erase(AckTag, AMN)} + end, {[], AM}, AckTags), + ok = gm:confirmed_broadcast( + GM, {tx_commit, Txn, MsgPropsFun, MsgIds}), {AckTags, State #state { backing_queue_state = BQS, ack_msg_id = AM }} end. diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 34ec510947..a61abbd727 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -163,22 +163,20 @@ handle_call({commit, Txn, ChPid}, From, State = #state { open_transactions = OT }) -> case dict:find(Txn, OT) of error -> - %% curious. We've not received _anything_ about this txn - %% so far via gm! + %% We've not received anything about this txn so far via + %% gm! OT1 = dict:store(Txn, {undefined, {committed, From}}, OT), noreply(State #state { open_transactions = OT1 }); - {ok, {committed, undefined}} -> - %% We've already finished via GM (our BQ has actually - %% replied back to us in the case of commit), so just - %% reply and tidy up. Note that because no one can every - %% consume from a slave, there are never going to be any - %% acks to return. - reply(ok, State #state { open_transactions = dict:erase(Txn, OT) }); {ok, {open, undefined}} -> %% Save who we're from, but we're still waiting for the %% commit to arrive via GM OT1 = dict:store(Txn, {open, {committed, From}}, OT), noreply(State #state { open_transactions = OT1 }); + {ok, {committed, undefined}} -> + %% We've already finished via GM (our BQ has actually + %% replied back to us in the case of commit), so just + %% reply and tidy up. + reply(ok, State #state { open_transactions = dict:erase(Txn, OT) }); {ok, {abandoned, undefined}} -> %% GM must have told us to roll back. reply(ok, State #state { open_transactions = dict:erase(Txn, OT) }) @@ -224,7 +222,7 @@ handle_cast({rollback, Txn, ChPid}, %% {_, {'committed', From}} here case dict:find(Txn, OT) of error -> - %% odd. We've not received anything from GM about this. + %% We've not received anything from GM about this. OT1 = dict:store(Txn, {undefined, abandoned}, OT), noreply(State #state { open_transactions = OT1 }); {ok, {open, undefined}} -> @@ -292,6 +290,7 @@ prioritise_cast(Msg, _State) -> {run_backing_queue, _Mod, _Fun} -> 6; sync_timeout -> 6; {gm, _Msg} -> 5; + {post_commit, _Txn, _AckTags} -> 4; _ -> 0 end. @@ -340,6 +339,10 @@ bq_init(BQ, Q, Recover) -> end) end). +run_backing_queue(rabbit_mirror_queue_master, Fun, State) -> + %% Yes, this might look a little crazy, but see comments around + %% process_instruction({tx_commit,...}, State). + Fun(rabbit_mirror_queue_master, State); run_backing_queue(Mod, Fun, State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> State #state { backing_queue_state = BQ:invoke(Mod, Fun, BQS) }. @@ -412,12 +415,14 @@ promote_me(From, #state { q = Q, ok = gm:confirmed_broadcast(GM, heartbeat), %% Start by rolling back all open transactions + AbandonedTxns = [Txn || {Txn, {open, _TxnStatusByChannel}} + <- dict:to_list(OT)], BQS1 = lists:foldl( fun (Txn, BQSN) -> ok = gm:confirmed_broadcast(GM, {tx_rollback, Txn}), {_AckTags, BQSN1} = BQ:tx_rollback(Txn, BQSN), BQSN1 - end, BQS, dict:fetch_keys(OT)), + end, BQS, AbandonedTxns), %% We find all the messages that we've received from channels but %% not from gm, and if they're due to be enqueued on promotion @@ -490,7 +495,7 @@ promote_me(From, #state { q = Q, Status =:= published orelse Status =:= confirmed]), MasterState = rabbit_mirror_queue_master:promote_backing_queue_state( - CPid, BQ, BQS1, GM, SS, OT), + CPid, BQ, BQS1, GM, SS, sets:from_list(AbandonedTxns)), MTC = dict:from_list( [{MsgId, {ChPid, MsgSeqNo}} || @@ -750,7 +755,7 @@ process_instruction({ack, MsgIds}, State = #state { backing_queue = BQ, backing_queue_state = BQS, msg_id_ack = MA }) -> - {AckTags, MA1} = msg_ids_to_acktags(MsgIds, MA), + {AckTags, MA1} = msg_ids_to_acktags(MsgIds, MA, remove), {MsgIds1, BQS1} = BQ:ack(AckTags, BQS), [] = MsgIds1 -- MsgIds, %% ASSERTION {ok, State #state { msg_id_ack = MA1, @@ -759,7 +764,7 @@ process_instruction({requeue, MsgPropsFun, MsgIds}, State = #state { backing_queue = BQ, backing_queue_state = BQS, msg_id_ack = MA }) -> - {AckTags, MA1} = msg_ids_to_acktags(MsgIds, MA), + {AckTags, MA1} = msg_ids_to_acktags(MsgIds, MA, remove), {ok, case length(AckTags) =:= length(MsgIds) of true -> {MsgIds, BQS1} = BQ:requeue(AckTags, MsgPropsFun, BQS), @@ -774,19 +779,111 @@ process_instruction({requeue, MsgPropsFun, MsgIds}, State #state { msg_id_ack = dict:new(), backing_queue_state = BQS2 } end}; +process_instruction({tx_publish, Txn, ChPid, MsgProps, Msg}, + State = #state { backing_queue = BQ, + backing_queue_state = BQS, + open_transactions = OT }) -> + %% Will never see abandoned or committed in the LHS + OT1 = case dict:find(Txn, OT) of + error -> + dict:store(Txn, {open, undefined}, OT); + {ok, {open, _TxnStatusByChannel}} -> + OT + end, + BQS1 = BQ:tx_publish(Txn, Msg, MsgProps, ChPid, BQS), + {ok, State #state { backing_queue_state = BQS1, + open_transactions = OT1 }}; +process_instruction({tx_ack, Txn, MsgIds}, + State = #state { backing_queue = BQ, + backing_queue_state = BQS, + open_transactions = OT, + msg_id_ack = MA }) -> + %% Will never see abandoned or committed in the LHS + OT1 = case dict:find(Txn, OT) of + error -> + dict:store(Txn, {open, undefined}, OT); + {ok, {open, _TxnStatusByChannel}} -> + OT + end, + %% Remember, rollback of a txn with acks simply undoes the ack - + %% the msg itself is not requeued or anything. Thus we make sure + %% msg_ids_to_acktags does not remove the entry from MQ, and we + %% will do the remove when we commit. + {AckTags, MA1} = msg_ids_to_acktags(MsgIds, MA, keep), + BQS1 = BQ:tx_ack(Txn, AckTags, BQS), + {ok, State #store { backing_queue_state = BQS1, + open_transactions = OT1, + msg_id_ack = MA1 }}; +process_instruction({tx_commit, Txn, MsgPropsFun, MsgIds}, + State = #state { backing_queue = BQ, + backing_queue_state = BQS, + open_transactions = OT, + msg_id_ack = MA }) -> + %% We must remove the ack tags from MQ at this point + {AckTags, MA1} = msg_ids_to_acktags(MsgIds, MA, remove), + %% We won't adjust open_transactions until we get the post_commit + %% callback, unless we've already seen the commit from the channel + case dict:find(Txn, OT) of + {open, {committed, From}} -> + {AckTags1, BQS1} = + BQ:tx_commit(Txn, fun () -> gen_server2:reply(From, ok) end, + MsgPropsFun, BQS), + OT1 = dict:erase(Txn, OT), + true = lists:usort(AckTags) =:= lists:usort(AckTags1), %% ASSERTION + {ok, State #state { backing_queue_state = BQS, + open_transactions = OT1, + msg_id_ack = MA1 }}; + Status -> + %% We have to cope with the possibility that we'll get + %% promoted before the txn finishes, and rely on slight + %% magic if we do complete here. + Me = self(), + F = fun () -> rabbit_amqqueue:run_backing_queue_async( + Me, rabbit_mirror_queue_master, + fun (rabbit_mirror_queue_master, + State1 = #state { open_transactions = OT2 }) -> + OT3 = case dict:find(Txn, OT2) of + {committing, undefined} -> + dict:store( + Txn, {committed, undefined}, + OT2); + {committing, {committed, From}} -> + gen_server2:reply(From, ok), + dict:erase(Txn, OT2) + end, + State1 #state { open_transactions = OT3 } + end) + end, + {AckTags1, BQS1} = BQ:tx_commit(Txn, F, MsgPropsFun, BQS), + true = lists:usort(AckTags) =:= lists:usort(AckTags1), %% ASSERTION + OT1 = case Status of + error -> + dict:store(Txn, {committing, undefined}, OT); + {open, TxnStatusByChannel} -> + dict:store(Txn, {committing, TxnStatusByChannel}, OT) + end, + {ok, State #state { backing_queue_state = BQS, + open_transactions = OT1, + msg_id_ack = MA1 }}} + end; + process_instruction(delete_and_terminate, State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> BQ:delete_and_terminate(BQS), {stop, State #state { backing_queue_state = undefined }}. -msg_ids_to_acktags(MsgIds, MA) -> +msg_ids_to_acktags(MsgIds, MA, RemoveOrKeep) -> {AckTags, MA1} = lists:foldl(fun (MsgId, {AckTagsN, MAN}) -> case dict:find(MsgId, MA) of - error -> {AckTagsN, MAN}; - {ok, AckTag} -> {[AckTag | AckTagsN], - dict:erase(MsgId, MAN)} + error -> + {AckTagsN, MAN}; + {ok, AckTag} when RemoveOrKeep =:= remove -> + {[AckTag | AckTagsN], + dict:erase(MsgId, MAN)}; + {ok, AckTag} when RemoveOrKeep =:= keep -> + {[AckTag | AckTagsN], MAN} end end, {[], MA}, MsgIds), {lists:reverse(AckTags), MA1}. |
