summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2011-04-08 00:11:23 +0100
committerMatthew Sackman <matthew@rabbitmq.com>2011-04-08 00:11:23 +0100
commit8855feacf6d188872a2abcc1eaa52d7a7a5c280c (patch)
tree03ac0bc25e932ab5ccd136a7f9cf48120ec26654
parent44efb9f3fb70eb5bf779a87314bd3f9752cecd52 (diff)
downloadrabbitmq-server-git-8855feacf6d188872a2abcc1eaa52d7a7a5c280c.tar.gz
Well, getting closer. But it's not done yet, and I may have discovered a rather fatal problem with the whole idea of supporting txns in mirrors anyway in that because of the coalescing going on, there is absolutely no indication of when the BQ finally completes adding the msgs to the queue. Thus the only solution here might be to ban coalescing in this case
-rw-r--r--src/rabbit_mirror_queue_master.erl16
-rw-r--r--src/rabbit_mirror_queue_slave.erl133
2 files changed, 126 insertions, 23 deletions
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 8714c44db9..a59d64d4cc 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -239,7 +239,8 @@ tx_ack(Txn, AckTags, State = #state { gm = GM,
fun (AckTag, Acc) -> [dict:fetch(AckTag, AM) | Acc] end,
[], AckTags),
ok = gm:broadcast(GM, {tx_ack, Txn, MsgIds}),
- State
+ BQS1 = BQ:tx_ack(Txn, AckTags, BQS),
+ State #state { backing_queue_state = BQS1 }
end.
tx_rollback(Txn, State = #state { gm = GM,
@@ -248,8 +249,8 @@ tx_rollback(Txn, State = #state { gm = GM,
abandoned_txns = AbandonedTxns }) ->
case sets:is_element(Txn, AbandonedTxns) of
true -> {[], State};
- false -> ok = gm:confirmed_broadcast(GM, {tx_rollback, Txn}),
- {AckTags, BQS1} = BQ:tx_rollback(Txn, BQS),
+ false -> {AckTags, BQS1} = BQ:tx_rollback(Txn, BQS),
+ ok = gm:confirmed_broadcast(GM, {tx_rollback, Txn}),
{AckTags, State #state { backing_queue_state = BQS1 }}
end.
@@ -264,9 +265,14 @@ tx_commit(Txn, PostCommitFun, MsgPropsFun,
%% try to commit on the old master.
{[], State};
false ->
- ok = gm:confirmed_broadcast(GM, {tx_commit, Txn, MsgPropsFun}),
{AckTags, BQS1} = BQ:tx_commit(Txn, PostCommitFun, MsgPropsFun, BQS),
- AM1 = lists:foldl(fun dict:erase/2, AM, AckTags),
+ {MsgIds, AM1} = lists:foldl(
+ fun (AckTag, {MsgIdsN, AMN}) ->
+ MsgId = dict:fetch(AckTag, AMN),
+ {[MsgId|MsgIdsN], dict:erase(AckTag, AMN)}
+ end, {[], AM}, AckTags),
+ ok = gm:confirmed_broadcast(
+ GM, {tx_commit, Txn, MsgPropsFun, MsgIds}),
{AckTags, State #state { backing_queue_state = BQS,
ack_msg_id = AM }}
end.
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 34ec510947..a61abbd727 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -163,22 +163,20 @@ handle_call({commit, Txn, ChPid}, From,
State = #state { open_transactions = OT }) ->
case dict:find(Txn, OT) of
error ->
- %% curious. We've not received _anything_ about this txn
- %% so far via gm!
+ %% We've not received anything about this txn so far via
+ %% gm!
OT1 = dict:store(Txn, {undefined, {committed, From}}, OT),
noreply(State #state { open_transactions = OT1 });
- {ok, {committed, undefined}} ->
- %% We've already finished via GM (our BQ has actually
- %% replied back to us in the case of commit), so just
- %% reply and tidy up. Note that because no one can every
- %% consume from a slave, there are never going to be any
- %% acks to return.
- reply(ok, State #state { open_transactions = dict:erase(Txn, OT) });
{ok, {open, undefined}} ->
%% Save who we're from, but we're still waiting for the
%% commit to arrive via GM
OT1 = dict:store(Txn, {open, {committed, From}}, OT),
noreply(State #state { open_transactions = OT1 });
+ {ok, {committed, undefined}} ->
+ %% We've already finished via GM (our BQ has actually
+ %% replied back to us in the case of commit), so just
+ %% reply and tidy up.
+ reply(ok, State #state { open_transactions = dict:erase(Txn, OT) });
{ok, {abandoned, undefined}} ->
%% GM must have told us to roll back.
reply(ok, State #state { open_transactions = dict:erase(Txn, OT) })
@@ -224,7 +222,7 @@ handle_cast({rollback, Txn, ChPid},
%% {_, {'committed', From}} here
case dict:find(Txn, OT) of
error ->
- %% odd. We've not received anything from GM about this.
+ %% We've not received anything from GM about this.
OT1 = dict:store(Txn, {undefined, abandoned}, OT),
noreply(State #state { open_transactions = OT1 });
{ok, {open, undefined}} ->
@@ -292,6 +290,7 @@ prioritise_cast(Msg, _State) ->
{run_backing_queue, _Mod, _Fun} -> 6;
sync_timeout -> 6;
{gm, _Msg} -> 5;
+ {post_commit, _Txn, _AckTags} -> 4;
_ -> 0
end.
@@ -340,6 +339,10 @@ bq_init(BQ, Q, Recover) ->
end)
end).
+run_backing_queue(rabbit_mirror_queue_master, Fun, State) ->
+ %% Yes, this might look a little crazy, but see comments around
+ %% process_instruction({tx_commit,...}, State).
+ Fun(rabbit_mirror_queue_master, State);
run_backing_queue(Mod, Fun, State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
State #state { backing_queue_state = BQ:invoke(Mod, Fun, BQS) }.
@@ -412,12 +415,14 @@ promote_me(From, #state { q = Q,
ok = gm:confirmed_broadcast(GM, heartbeat),
%% Start by rolling back all open transactions
+ AbandonedTxns = [Txn || {Txn, {open, _TxnStatusByChannel}}
+ <- dict:to_list(OT)],
BQS1 = lists:foldl(
fun (Txn, BQSN) ->
ok = gm:confirmed_broadcast(GM, {tx_rollback, Txn}),
{_AckTags, BQSN1} = BQ:tx_rollback(Txn, BQSN),
BQSN1
- end, BQS, dict:fetch_keys(OT)),
+ end, BQS, AbandonedTxns),
%% We find all the messages that we've received from channels but
%% not from gm, and if they're due to be enqueued on promotion
@@ -490,7 +495,7 @@ promote_me(From, #state { q = Q,
Status =:= published orelse Status =:= confirmed]),
MasterState = rabbit_mirror_queue_master:promote_backing_queue_state(
- CPid, BQ, BQS1, GM, SS, OT),
+ CPid, BQ, BQS1, GM, SS, sets:from_list(AbandonedTxns)),
MTC = dict:from_list(
[{MsgId, {ChPid, MsgSeqNo}} ||
@@ -750,7 +755,7 @@ process_instruction({ack, MsgIds},
State = #state { backing_queue = BQ,
backing_queue_state = BQS,
msg_id_ack = MA }) ->
- {AckTags, MA1} = msg_ids_to_acktags(MsgIds, MA),
+ {AckTags, MA1} = msg_ids_to_acktags(MsgIds, MA, remove),
{MsgIds1, BQS1} = BQ:ack(AckTags, BQS),
[] = MsgIds1 -- MsgIds, %% ASSERTION
{ok, State #state { msg_id_ack = MA1,
@@ -759,7 +764,7 @@ process_instruction({requeue, MsgPropsFun, MsgIds},
State = #state { backing_queue = BQ,
backing_queue_state = BQS,
msg_id_ack = MA }) ->
- {AckTags, MA1} = msg_ids_to_acktags(MsgIds, MA),
+ {AckTags, MA1} = msg_ids_to_acktags(MsgIds, MA, remove),
{ok, case length(AckTags) =:= length(MsgIds) of
true ->
{MsgIds, BQS1} = BQ:requeue(AckTags, MsgPropsFun, BQS),
@@ -774,19 +779,111 @@ process_instruction({requeue, MsgPropsFun, MsgIds},
State #state { msg_id_ack = dict:new(),
backing_queue_state = BQS2 }
end};
+process_instruction({tx_publish, Txn, ChPid, MsgProps, Msg},
+ State = #state { backing_queue = BQ,
+ backing_queue_state = BQS,
+ open_transactions = OT }) ->
+ %% Will never see abandoned or committed in the LHS
+ OT1 = case dict:find(Txn, OT) of
+ error ->
+ dict:store(Txn, {open, undefined}, OT);
+ {ok, {open, _TxnStatusByChannel}} ->
+ OT
+ end,
+ BQS1 = BQ:tx_publish(Txn, Msg, MsgProps, ChPid, BQS),
+ {ok, State #state { backing_queue_state = BQS1,
+ open_transactions = OT1 }};
+process_instruction({tx_ack, Txn, MsgIds},
+ State = #state { backing_queue = BQ,
+ backing_queue_state = BQS,
+ open_transactions = OT,
+ msg_id_ack = MA }) ->
+ %% Will never see abandoned or committed in the LHS
+ OT1 = case dict:find(Txn, OT) of
+ error ->
+ dict:store(Txn, {open, undefined}, OT);
+ {ok, {open, _TxnStatusByChannel}} ->
+ OT
+ end,
+ %% Remember, rollback of a txn with acks simply undoes the ack -
+ %% the msg itself is not requeued or anything. Thus we make sure
+ %% msg_ids_to_acktags does not remove the entry from MQ, and we
+ %% will do the remove when we commit.
+ {AckTags, MA1} = msg_ids_to_acktags(MsgIds, MA, keep),
+ BQS1 = BQ:tx_ack(Txn, AckTags, BQS),
+ {ok, State #store { backing_queue_state = BQS1,
+ open_transactions = OT1,
+ msg_id_ack = MA1 }};
+process_instruction({tx_commit, Txn, MsgPropsFun, MsgIds},
+ State = #state { backing_queue = BQ,
+ backing_queue_state = BQS,
+ open_transactions = OT,
+ msg_id_ack = MA }) ->
+ %% We must remove the ack tags from MQ at this point
+ {AckTags, MA1} = msg_ids_to_acktags(MsgIds, MA, remove),
+ %% We won't adjust open_transactions until we get the post_commit
+ %% callback, unless we've already seen the commit from the channel
+ case dict:find(Txn, OT) of
+ {open, {committed, From}} ->
+ {AckTags1, BQS1} =
+ BQ:tx_commit(Txn, fun () -> gen_server2:reply(From, ok) end,
+ MsgPropsFun, BQS),
+ OT1 = dict:erase(Txn, OT),
+ true = lists:usort(AckTags) =:= lists:usort(AckTags1), %% ASSERTION
+ {ok, State #state { backing_queue_state = BQS,
+ open_transactions = OT1,
+ msg_id_ack = MA1 }};
+ Status ->
+ %% We have to cope with the possibility that we'll get
+ %% promoted before the txn finishes, and rely on slight
+ %% magic if we do complete here.
+ Me = self(),
+ F = fun () -> rabbit_amqqueue:run_backing_queue_async(
+ Me, rabbit_mirror_queue_master,
+ fun (rabbit_mirror_queue_master,
+ State1 = #state { open_transactions = OT2 }) ->
+ OT3 = case dict:find(Txn, OT2) of
+ {committing, undefined} ->
+ dict:store(
+ Txn, {committed, undefined},
+ OT2);
+ {committing, {committed, From}} ->
+ gen_server2:reply(From, ok),
+ dict:erase(Txn, OT2)
+ end,
+ State1 #state { open_transactions = OT3 }
+ end)
+ end,
+ {AckTags1, BQS1} = BQ:tx_commit(Txn, F, MsgPropsFun, BQS),
+ true = lists:usort(AckTags) =:= lists:usort(AckTags1), %% ASSERTION
+ OT1 = case Status of
+ error ->
+ dict:store(Txn, {committing, undefined}, OT);
+ {open, TxnStatusByChannel} ->
+ dict:store(Txn, {committing, TxnStatusByChannel}, OT)
+ end,
+ {ok, State #state { backing_queue_state = BQS,
+ open_transactions = OT1,
+ msg_id_ack = MA1 }}}
+ end;
+
process_instruction(delete_and_terminate,
State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
BQ:delete_and_terminate(BQS),
{stop, State #state { backing_queue_state = undefined }}.
-msg_ids_to_acktags(MsgIds, MA) ->
+msg_ids_to_acktags(MsgIds, MA, RemoveOrKeep) ->
{AckTags, MA1} =
lists:foldl(fun (MsgId, {AckTagsN, MAN}) ->
case dict:find(MsgId, MA) of
- error -> {AckTagsN, MAN};
- {ok, AckTag} -> {[AckTag | AckTagsN],
- dict:erase(MsgId, MAN)}
+ error ->
+ {AckTagsN, MAN};
+ {ok, AckTag} when RemoveOrKeep =:= remove ->
+ {[AckTag | AckTagsN],
+ dict:erase(MsgId, MAN)};
+ {ok, AckTag} when RemoveOrKeep =:= keep ->
+ {[AckTag | AckTagsN], MAN}
end
end, {[], MA}, MsgIds),
{lists:reverse(AckTags), MA1}.