summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_mirror_queue_master.erl16
-rw-r--r--src/rabbit_mirror_queue_slave.erl133
2 files changed, 126 insertions, 23 deletions
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 8714c44db9..a59d64d4cc 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -239,7 +239,8 @@ tx_ack(Txn, AckTags, State = #state { gm = GM,
fun (AckTag, Acc) -> [dict:fetch(AckTag, AM) | Acc] end,
[], AckTags),
ok = gm:broadcast(GM, {tx_ack, Txn, MsgIds}),
- State
+ BQS1 = BQ:tx_ack(Txn, AckTags, BQS),
+ State #state { backing_queue_state = BQS1 }
end.
tx_rollback(Txn, State = #state { gm = GM,
@@ -248,8 +249,8 @@ tx_rollback(Txn, State = #state { gm = GM,
abandoned_txns = AbandonedTxns }) ->
case sets:is_element(Txn, AbandonedTxns) of
true -> {[], State};
- false -> ok = gm:confirmed_broadcast(GM, {tx_rollback, Txn}),
- {AckTags, BQS1} = BQ:tx_rollback(Txn, BQS),
+ false -> {AckTags, BQS1} = BQ:tx_rollback(Txn, BQS),
+ ok = gm:confirmed_broadcast(GM, {tx_rollback, Txn}),
{AckTags, State #state { backing_queue_state = BQS1 }}
end.
@@ -264,9 +265,14 @@ tx_commit(Txn, PostCommitFun, MsgPropsFun,
%% try to commit on the old master.
{[], State};
false ->
- ok = gm:confirmed_broadcast(GM, {tx_commit, Txn, MsgPropsFun}),
{AckTags, BQS1} = BQ:tx_commit(Txn, PostCommitFun, MsgPropsFun, BQS),
- AM1 = lists:foldl(fun dict:erase/2, AM, AckTags),
+ {MsgIds, AM1} = lists:foldl(
+ fun (AckTag, {MsgIdsN, AMN}) ->
+ MsgId = dict:fetch(AckTag, AMN),
+ {[MsgId|MsgIdsN], dict:erase(AckTag, AMN)}
+ end, {[], AM}, AckTags),
+ ok = gm:confirmed_broadcast(
+ GM, {tx_commit, Txn, MsgPropsFun, MsgIds}),
{AckTags, State #state { backing_queue_state = BQS,
ack_msg_id = AM }}
end.
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 34ec510947..a61abbd727 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -163,22 +163,20 @@ handle_call({commit, Txn, ChPid}, From,
State = #state { open_transactions = OT }) ->
case dict:find(Txn, OT) of
error ->
- %% curious. We've not received _anything_ about this txn
- %% so far via gm!
+ %% We've not received anything about this txn so far via
+ %% gm!
OT1 = dict:store(Txn, {undefined, {committed, From}}, OT),
noreply(State #state { open_transactions = OT1 });
- {ok, {committed, undefined}} ->
- %% We've already finished via GM (our BQ has actually
- %% replied back to us in the case of commit), so just
- %% reply and tidy up. Note that because no one can every
- %% consume from a slave, there are never going to be any
- %% acks to return.
- reply(ok, State #state { open_transactions = dict:erase(Txn, OT) });
{ok, {open, undefined}} ->
%% Save who we're from, but we're still waiting for the
%% commit to arrive via GM
OT1 = dict:store(Txn, {open, {committed, From}}, OT),
noreply(State #state { open_transactions = OT1 });
+ {ok, {committed, undefined}} ->
+ %% We've already finished via GM (our BQ has actually
+ %% replied back to us in the case of commit), so just
+ %% reply and tidy up.
+ reply(ok, State #state { open_transactions = dict:erase(Txn, OT) });
{ok, {abandoned, undefined}} ->
%% GM must have told us to roll back.
reply(ok, State #state { open_transactions = dict:erase(Txn, OT) })
@@ -224,7 +222,7 @@ handle_cast({rollback, Txn, ChPid},
%% {_, {'committed', From}} here
case dict:find(Txn, OT) of
error ->
- %% odd. We've not received anything from GM about this.
+ %% We've not received anything from GM about this.
OT1 = dict:store(Txn, {undefined, abandoned}, OT),
noreply(State #state { open_transactions = OT1 });
{ok, {open, undefined}} ->
@@ -292,6 +290,7 @@ prioritise_cast(Msg, _State) ->
{run_backing_queue, _Mod, _Fun} -> 6;
sync_timeout -> 6;
{gm, _Msg} -> 5;
+ {post_commit, _Txn, _AckTags} -> 4;
_ -> 0
end.
@@ -340,6 +339,10 @@ bq_init(BQ, Q, Recover) ->
end)
end).
+run_backing_queue(rabbit_mirror_queue_master, Fun, State) ->
+ %% Yes, this might look a little crazy, but see comments around
+ %% process_instruction({tx_commit,...}, State).
+ Fun(rabbit_mirror_queue_master, State);
run_backing_queue(Mod, Fun, State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
State #state { backing_queue_state = BQ:invoke(Mod, Fun, BQS) }.
@@ -412,12 +415,14 @@ promote_me(From, #state { q = Q,
ok = gm:confirmed_broadcast(GM, heartbeat),
%% Start by rolling back all open transactions
+ AbandonedTxns = [Txn || {Txn, {open, _TxnStatusByChannel}}
+ <- dict:to_list(OT)],
BQS1 = lists:foldl(
fun (Txn, BQSN) ->
ok = gm:confirmed_broadcast(GM, {tx_rollback, Txn}),
{_AckTags, BQSN1} = BQ:tx_rollback(Txn, BQSN),
BQSN1
- end, BQS, dict:fetch_keys(OT)),
+ end, BQS, AbandonedTxns),
%% We find all the messages that we've received from channels but
%% not from gm, and if they're due to be enqueued on promotion
@@ -490,7 +495,7 @@ promote_me(From, #state { q = Q,
Status =:= published orelse Status =:= confirmed]),
MasterState = rabbit_mirror_queue_master:promote_backing_queue_state(
- CPid, BQ, BQS1, GM, SS, OT),
+ CPid, BQ, BQS1, GM, SS, sets:from_list(AbandonedTxns)),
MTC = dict:from_list(
[{MsgId, {ChPid, MsgSeqNo}} ||
@@ -750,7 +755,7 @@ process_instruction({ack, MsgIds},
State = #state { backing_queue = BQ,
backing_queue_state = BQS,
msg_id_ack = MA }) ->
- {AckTags, MA1} = msg_ids_to_acktags(MsgIds, MA),
+ {AckTags, MA1} = msg_ids_to_acktags(MsgIds, MA, remove),
{MsgIds1, BQS1} = BQ:ack(AckTags, BQS),
[] = MsgIds1 -- MsgIds, %% ASSERTION
{ok, State #state { msg_id_ack = MA1,
@@ -759,7 +764,7 @@ process_instruction({requeue, MsgPropsFun, MsgIds},
State = #state { backing_queue = BQ,
backing_queue_state = BQS,
msg_id_ack = MA }) ->
- {AckTags, MA1} = msg_ids_to_acktags(MsgIds, MA),
+ {AckTags, MA1} = msg_ids_to_acktags(MsgIds, MA, remove),
{ok, case length(AckTags) =:= length(MsgIds) of
true ->
{MsgIds, BQS1} = BQ:requeue(AckTags, MsgPropsFun, BQS),
@@ -774,19 +779,111 @@ process_instruction({requeue, MsgPropsFun, MsgIds},
State #state { msg_id_ack = dict:new(),
backing_queue_state = BQS2 }
end};
+process_instruction({tx_publish, Txn, ChPid, MsgProps, Msg},
+ State = #state { backing_queue = BQ,
+ backing_queue_state = BQS,
+ open_transactions = OT }) ->
+ %% Will never see abandoned or committed in the LHS
+ OT1 = case dict:find(Txn, OT) of
+ error ->
+ dict:store(Txn, {open, undefined}, OT);
+ {ok, {open, _TxnStatusByChannel}} ->
+ OT
+ end,
+ BQS1 = BQ:tx_publish(Txn, Msg, MsgProps, ChPid, BQS),
+ {ok, State #state { backing_queue_state = BQS1,
+ open_transactions = OT1 }};
+process_instruction({tx_ack, Txn, MsgIds},
+ State = #state { backing_queue = BQ,
+ backing_queue_state = BQS,
+ open_transactions = OT,
+ msg_id_ack = MA }) ->
+ %% Will never see abandoned or committed in the LHS
+ OT1 = case dict:find(Txn, OT) of
+ error ->
+ dict:store(Txn, {open, undefined}, OT);
+ {ok, {open, _TxnStatusByChannel}} ->
+ OT
+ end,
+ %% Remember, rollback of a txn with acks simply undoes the ack -
+ %% the msg itself is not requeued or anything. Thus we make sure
+ %% msg_ids_to_acktags does not remove the entry from MQ, and we
+ %% will do the remove when we commit.
+ {AckTags, MA1} = msg_ids_to_acktags(MsgIds, MA, keep),
+ BQS1 = BQ:tx_ack(Txn, AckTags, BQS),
+ {ok, State #store { backing_queue_state = BQS1,
+ open_transactions = OT1,
+ msg_id_ack = MA1 }};
+process_instruction({tx_commit, Txn, MsgPropsFun, MsgIds},
+ State = #state { backing_queue = BQ,
+ backing_queue_state = BQS,
+ open_transactions = OT,
+ msg_id_ack = MA }) ->
+ %% We must remove the ack tags from MQ at this point
+ {AckTags, MA1} = msg_ids_to_acktags(MsgIds, MA, remove),
+ %% We won't adjust open_transactions until we get the post_commit
+ %% callback, unless we've already seen the commit from the channel
+ case dict:find(Txn, OT) of
+ {open, {committed, From}} ->
+ {AckTags1, BQS1} =
+ BQ:tx_commit(Txn, fun () -> gen_server2:reply(From, ok) end,
+ MsgPropsFun, BQS),
+ OT1 = dict:erase(Txn, OT),
+ true = lists:usort(AckTags) =:= lists:usort(AckTags1), %% ASSERTION
+ {ok, State #state { backing_queue_state = BQS,
+ open_transactions = OT1,
+ msg_id_ack = MA1 }};
+ Status ->
+ %% We have to cope with the possibility that we'll get
+ %% promoted before the txn finishes, and rely on slight
+ %% magic if we do complete here.
+ Me = self(),
+ F = fun () -> rabbit_amqqueue:run_backing_queue_async(
+ Me, rabbit_mirror_queue_master,
+ fun (rabbit_mirror_queue_master,
+ State1 = #state { open_transactions = OT2 }) ->
+ OT3 = case dict:find(Txn, OT2) of
+ {committing, undefined} ->
+ dict:store(
+ Txn, {committed, undefined},
+ OT2);
+ {committing, {committed, From}} ->
+ gen_server2:reply(From, ok),
+ dict:erase(Txn, OT2)
+ end,
+ State1 #state { open_transactions = OT3 }
+ end)
+ end,
+ {AckTags1, BQS1} = BQ:tx_commit(Txn, F, MsgPropsFun, BQS),
+ true = lists:usort(AckTags) =:= lists:usort(AckTags1), %% ASSERTION
+ OT1 = case Status of
+ error ->
+ dict:store(Txn, {committing, undefined}, OT);
+ {open, TxnStatusByChannel} ->
+ dict:store(Txn, {committing, TxnStatusByChannel}, OT)
+ end,
+ {ok, State #state { backing_queue_state = BQS,
+ open_transactions = OT1,
+ msg_id_ack = MA1 }}}
+ end;
+
process_instruction(delete_and_terminate,
State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
BQ:delete_and_terminate(BQS),
{stop, State #state { backing_queue_state = undefined }}.
-msg_ids_to_acktags(MsgIds, MA) ->
+msg_ids_to_acktags(MsgIds, MA, RemoveOrKeep) ->
{AckTags, MA1} =
lists:foldl(fun (MsgId, {AckTagsN, MAN}) ->
case dict:find(MsgId, MA) of
- error -> {AckTagsN, MAN};
- {ok, AckTag} -> {[AckTag | AckTagsN],
- dict:erase(MsgId, MAN)}
+ error ->
+ {AckTagsN, MAN};
+ {ok, AckTag} when RemoveOrKeep =:= remove ->
+ {[AckTag | AckTagsN],
+ dict:erase(MsgId, MAN)};
+ {ok, AckTag} when RemoveOrKeep =:= keep ->
+ {[AckTag | AckTagsN], MAN}
end
end, {[], MA}, MsgIds),
{lists:reverse(AckTags), MA1}.