diff options
| -rw-r--r-- | src/rabbit_mirror_queue_master.erl | 16 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 133 |
2 files changed, 126 insertions, 23 deletions
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index 8714c44db9..a59d64d4cc 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -239,7 +239,8 @@ tx_ack(Txn, AckTags, State = #state { gm = GM, fun (AckTag, Acc) -> [dict:fetch(AckTag, AM) | Acc] end, [], AckTags), ok = gm:broadcast(GM, {tx_ack, Txn, MsgIds}), - State + BQS1 = BQ:tx_ack(Txn, AckTags, BQS), + State #state { backing_queue_state = BQS1 } end. tx_rollback(Txn, State = #state { gm = GM, @@ -248,8 +249,8 @@ tx_rollback(Txn, State = #state { gm = GM, abandoned_txns = AbandonedTxns }) -> case sets:is_element(Txn, AbandonedTxns) of true -> {[], State}; - false -> ok = gm:confirmed_broadcast(GM, {tx_rollback, Txn}), - {AckTags, BQS1} = BQ:tx_rollback(Txn, BQS), + false -> {AckTags, BQS1} = BQ:tx_rollback(Txn, BQS), + ok = gm:confirmed_broadcast(GM, {tx_rollback, Txn}), {AckTags, State #state { backing_queue_state = BQS1 }} end. @@ -264,9 +265,14 @@ tx_commit(Txn, PostCommitFun, MsgPropsFun, %% try to commit on the old master. {[], State}; false -> - ok = gm:confirmed_broadcast(GM, {tx_commit, Txn, MsgPropsFun}), {AckTags, BQS1} = BQ:tx_commit(Txn, PostCommitFun, MsgPropsFun, BQS), - AM1 = lists:foldl(fun dict:erase/2, AM, AckTags), + {MsgIds, AM1} = lists:foldl( + fun (AckTag, {MsgIdsN, AMN}) -> + MsgId = dict:fetch(AckTag, AMN), + {[MsgId|MsgIdsN], dict:erase(AckTag, AMN)} + end, {[], AM}, AckTags), + ok = gm:confirmed_broadcast( + GM, {tx_commit, Txn, MsgPropsFun, MsgIds}), {AckTags, State #state { backing_queue_state = BQS, ack_msg_id = AM }} end. diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 34ec510947..a61abbd727 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -163,22 +163,20 @@ handle_call({commit, Txn, ChPid}, From, State = #state { open_transactions = OT }) -> case dict:find(Txn, OT) of error -> - %% curious. We've not received _anything_ about this txn - %% so far via gm! + %% We've not received anything about this txn so far via + %% gm! OT1 = dict:store(Txn, {undefined, {committed, From}}, OT), noreply(State #state { open_transactions = OT1 }); - {ok, {committed, undefined}} -> - %% We've already finished via GM (our BQ has actually - %% replied back to us in the case of commit), so just - %% reply and tidy up. Note that because no one can every - %% consume from a slave, there are never going to be any - %% acks to return. - reply(ok, State #state { open_transactions = dict:erase(Txn, OT) }); {ok, {open, undefined}} -> %% Save who we're from, but we're still waiting for the %% commit to arrive via GM OT1 = dict:store(Txn, {open, {committed, From}}, OT), noreply(State #state { open_transactions = OT1 }); + {ok, {committed, undefined}} -> + %% We've already finished via GM (our BQ has actually + %% replied back to us in the case of commit), so just + %% reply and tidy up. + reply(ok, State #state { open_transactions = dict:erase(Txn, OT) }); {ok, {abandoned, undefined}} -> %% GM must have told us to roll back. reply(ok, State #state { open_transactions = dict:erase(Txn, OT) }) @@ -224,7 +222,7 @@ handle_cast({rollback, Txn, ChPid}, %% {_, {'committed', From}} here case dict:find(Txn, OT) of error -> - %% odd. We've not received anything from GM about this. + %% We've not received anything from GM about this. OT1 = dict:store(Txn, {undefined, abandoned}, OT), noreply(State #state { open_transactions = OT1 }); {ok, {open, undefined}} -> @@ -292,6 +290,7 @@ prioritise_cast(Msg, _State) -> {run_backing_queue, _Mod, _Fun} -> 6; sync_timeout -> 6; {gm, _Msg} -> 5; + {post_commit, _Txn, _AckTags} -> 4; _ -> 0 end. @@ -340,6 +339,10 @@ bq_init(BQ, Q, Recover) -> end) end). +run_backing_queue(rabbit_mirror_queue_master, Fun, State) -> + %% Yes, this might look a little crazy, but see comments around + %% process_instruction({tx_commit,...}, State). + Fun(rabbit_mirror_queue_master, State); run_backing_queue(Mod, Fun, State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> State #state { backing_queue_state = BQ:invoke(Mod, Fun, BQS) }. @@ -412,12 +415,14 @@ promote_me(From, #state { q = Q, ok = gm:confirmed_broadcast(GM, heartbeat), %% Start by rolling back all open transactions + AbandonedTxns = [Txn || {Txn, {open, _TxnStatusByChannel}} + <- dict:to_list(OT)], BQS1 = lists:foldl( fun (Txn, BQSN) -> ok = gm:confirmed_broadcast(GM, {tx_rollback, Txn}), {_AckTags, BQSN1} = BQ:tx_rollback(Txn, BQSN), BQSN1 - end, BQS, dict:fetch_keys(OT)), + end, BQS, AbandonedTxns), %% We find all the messages that we've received from channels but %% not from gm, and if they're due to be enqueued on promotion @@ -490,7 +495,7 @@ promote_me(From, #state { q = Q, Status =:= published orelse Status =:= confirmed]), MasterState = rabbit_mirror_queue_master:promote_backing_queue_state( - CPid, BQ, BQS1, GM, SS, OT), + CPid, BQ, BQS1, GM, SS, sets:from_list(AbandonedTxns)), MTC = dict:from_list( [{MsgId, {ChPid, MsgSeqNo}} || @@ -750,7 +755,7 @@ process_instruction({ack, MsgIds}, State = #state { backing_queue = BQ, backing_queue_state = BQS, msg_id_ack = MA }) -> - {AckTags, MA1} = msg_ids_to_acktags(MsgIds, MA), + {AckTags, MA1} = msg_ids_to_acktags(MsgIds, MA, remove), {MsgIds1, BQS1} = BQ:ack(AckTags, BQS), [] = MsgIds1 -- MsgIds, %% ASSERTION {ok, State #state { msg_id_ack = MA1, @@ -759,7 +764,7 @@ process_instruction({requeue, MsgPropsFun, MsgIds}, State = #state { backing_queue = BQ, backing_queue_state = BQS, msg_id_ack = MA }) -> - {AckTags, MA1} = msg_ids_to_acktags(MsgIds, MA), + {AckTags, MA1} = msg_ids_to_acktags(MsgIds, MA, remove), {ok, case length(AckTags) =:= length(MsgIds) of true -> {MsgIds, BQS1} = BQ:requeue(AckTags, MsgPropsFun, BQS), @@ -774,19 +779,111 @@ process_instruction({requeue, MsgPropsFun, MsgIds}, State #state { msg_id_ack = dict:new(), backing_queue_state = BQS2 } end}; +process_instruction({tx_publish, Txn, ChPid, MsgProps, Msg}, + State = #state { backing_queue = BQ, + backing_queue_state = BQS, + open_transactions = OT }) -> + %% Will never see abandoned or committed in the LHS + OT1 = case dict:find(Txn, OT) of + error -> + dict:store(Txn, {open, undefined}, OT); + {ok, {open, _TxnStatusByChannel}} -> + OT + end, + BQS1 = BQ:tx_publish(Txn, Msg, MsgProps, ChPid, BQS), + {ok, State #state { backing_queue_state = BQS1, + open_transactions = OT1 }}; +process_instruction({tx_ack, Txn, MsgIds}, + State = #state { backing_queue = BQ, + backing_queue_state = BQS, + open_transactions = OT, + msg_id_ack = MA }) -> + %% Will never see abandoned or committed in the LHS + OT1 = case dict:find(Txn, OT) of + error -> + dict:store(Txn, {open, undefined}, OT); + {ok, {open, _TxnStatusByChannel}} -> + OT + end, + %% Remember, rollback of a txn with acks simply undoes the ack - + %% the msg itself is not requeued or anything. Thus we make sure + %% msg_ids_to_acktags does not remove the entry from MQ, and we + %% will do the remove when we commit. + {AckTags, MA1} = msg_ids_to_acktags(MsgIds, MA, keep), + BQS1 = BQ:tx_ack(Txn, AckTags, BQS), + {ok, State #store { backing_queue_state = BQS1, + open_transactions = OT1, + msg_id_ack = MA1 }}; +process_instruction({tx_commit, Txn, MsgPropsFun, MsgIds}, + State = #state { backing_queue = BQ, + backing_queue_state = BQS, + open_transactions = OT, + msg_id_ack = MA }) -> + %% We must remove the ack tags from MQ at this point + {AckTags, MA1} = msg_ids_to_acktags(MsgIds, MA, remove), + %% We won't adjust open_transactions until we get the post_commit + %% callback, unless we've already seen the commit from the channel + case dict:find(Txn, OT) of + {open, {committed, From}} -> + {AckTags1, BQS1} = + BQ:tx_commit(Txn, fun () -> gen_server2:reply(From, ok) end, + MsgPropsFun, BQS), + OT1 = dict:erase(Txn, OT), + true = lists:usort(AckTags) =:= lists:usort(AckTags1), %% ASSERTION + {ok, State #state { backing_queue_state = BQS, + open_transactions = OT1, + msg_id_ack = MA1 }}; + Status -> + %% We have to cope with the possibility that we'll get + %% promoted before the txn finishes, and rely on slight + %% magic if we do complete here. + Me = self(), + F = fun () -> rabbit_amqqueue:run_backing_queue_async( + Me, rabbit_mirror_queue_master, + fun (rabbit_mirror_queue_master, + State1 = #state { open_transactions = OT2 }) -> + OT3 = case dict:find(Txn, OT2) of + {committing, undefined} -> + dict:store( + Txn, {committed, undefined}, + OT2); + {committing, {committed, From}} -> + gen_server2:reply(From, ok), + dict:erase(Txn, OT2) + end, + State1 #state { open_transactions = OT3 } + end) + end, + {AckTags1, BQS1} = BQ:tx_commit(Txn, F, MsgPropsFun, BQS), + true = lists:usort(AckTags) =:= lists:usort(AckTags1), %% ASSERTION + OT1 = case Status of + error -> + dict:store(Txn, {committing, undefined}, OT); + {open, TxnStatusByChannel} -> + dict:store(Txn, {committing, TxnStatusByChannel}, OT) + end, + {ok, State #state { backing_queue_state = BQS, + open_transactions = OT1, + msg_id_ack = MA1 }}} + end; + process_instruction(delete_and_terminate, State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> BQ:delete_and_terminate(BQS), {stop, State #state { backing_queue_state = undefined }}. -msg_ids_to_acktags(MsgIds, MA) -> +msg_ids_to_acktags(MsgIds, MA, RemoveOrKeep) -> {AckTags, MA1} = lists:foldl(fun (MsgId, {AckTagsN, MAN}) -> case dict:find(MsgId, MA) of - error -> {AckTagsN, MAN}; - {ok, AckTag} -> {[AckTag | AckTagsN], - dict:erase(MsgId, MAN)} + error -> + {AckTagsN, MAN}; + {ok, AckTag} when RemoveOrKeep =:= remove -> + {[AckTag | AckTagsN], + dict:erase(MsgId, MAN)}; + {ok, AckTag} when RemoveOrKeep =:= keep -> + {[AckTag | AckTagsN], MAN} end end, {[], MA}, MsgIds), {lists:reverse(AckTags), MA1}. |
