summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_mirror_queue_master.erl105
-rw-r--r--src/rabbit_mirror_queue_slave.erl20
2 files changed, 79 insertions, 46 deletions
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index b0a22edd21..a61c32e0b2 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -26,7 +26,7 @@
-export([start/1, stop/0]).
--export([promote_backing_queue_state/5]).
+-export([promote_backing_queue_state/6]).
-behaviour(rabbit_backing_queue).
@@ -38,7 +38,9 @@
backing_queue_state,
set_delivered,
seen_status,
- confirmed
+ confirmed,
+ ack_msg_id,
+ abandoned_txns
}).
%% ---------------------------------------------------------------------------
@@ -73,16 +75,19 @@ init(#amqqueue { arguments = Args, name = QName } = Q, Recover,
backing_queue_state = BQS,
set_delivered = 0,
seen_status = dict:new(),
- confirmed = [] }.
+ confirmed = [],
+ ack_msg_id = dict:new() }.
-promote_backing_queue_state(CPid, BQ, BQS, GM, SeenStatus) ->
+promote_backing_queue_state(CPid, BQ, BQS, GM, SeenStatus, AbandonedTxns) ->
#state { gm = GM,
coordinator = CPid,
backing_queue = BQ,
backing_queue_state = BQS,
set_delivered = BQ:len(BQS),
seen_status = SeenStatus,
- confirmed = [] }.
+ confirmed = [],
+ ack_msg_id = dict:new(),
+ abandoned_txns = AbandonedTxns }.
terminate(State = #state { backing_queue = BQ, backing_queue_state = BQS }) ->
%% Backing queue termination. The queue is going down but
@@ -119,7 +124,8 @@ publish_delivered(AckRequired, Msg = #basic_message { id = MsgId }, MsgProps,
ChPid, State = #state { gm = GM,
seen_status = SS,
backing_queue = BQ,
- backing_queue_state = BQS }) ->
+ backing_queue_state = BQS,
+ ack_msg_id = AM }) ->
false = dict:is_key(MsgId, SS), %% ASSERTION
%% Must use confirmed_broadcast here in order to guarantee that
%% all slaves are forced to interpret this publish_delivered at
@@ -128,7 +134,9 @@ publish_delivered(AckRequired, Msg = #basic_message { id = MsgId }, MsgProps,
GM, {publish, {true, AckRequired}, ChPid, MsgProps, Msg}),
{AckTag, BQS1} =
BQ:publish_delivered(AckRequired, Msg, MsgProps, ChPid, BQS),
- {AckTag, State #state { backing_queue_state = BQS1 }}.
+ AM1 = maybe_store_acktag(AckTag, MsgId, AM),
+ {AckTag, State #state { backing_queue_state = BQS1,
+ ack_msg_id = AM1 }}.
dropwhile(Fun, State = #state { gm = GM,
backing_queue = BQ,
@@ -175,7 +183,8 @@ drain_confirmed(State = #state { backing_queue = BQ,
fetch(AckRequired, State = #state { gm = GM,
backing_queue = BQ,
backing_queue_state = BQS,
- set_delivered = SetDelivered }) ->
+ set_delivered = SetDelivered,
+ ack_msg_id = AM }) ->
{Result, BQS1} = BQ:fetch(AckRequired, BQS),
State1 = State #state { backing_queue_state = BQS1 },
case Result of
@@ -186,53 +195,60 @@ fetch(AckRequired, State = #state { gm = GM,
ok = gm:broadcast(GM, {fetch, AckRequired, MsgId, Remaining}),
IsDelivered1 = IsDelivered orelse SetDelivered > 0,
SetDelivered1 = lists:max([0, SetDelivered - 1]),
+ AM1 = maybe_store_acktag(AckTag, MsgId, AM),
{{Message, IsDelivered1, AckTag, Remaining},
- State1 #state { set_delivered = SetDelivered1 }}
+ State1 #state { set_delivered = SetDelivered1,
+ ack_msg_id = AM1 }}
end.
ack(AckTags, State = #state { gm = GM,
backing_queue = BQ,
- backing_queue_state = BQS }) ->
+ backing_queue_state = BQS,
+ ack_msg_id = AM }) ->
{MsgIds, BQS1} = BQ:ack(AckTags, BQS),
+ AM1 = lists:foldl(fun dict:erase/2, AM, AckTags),
case MsgIds of
[] -> ok;
_ -> ok = gm:broadcast(GM, {ack, MsgIds})
end,
- {MsgIds, State #state { backing_queue_state = BQS1 }}.
-
-tx_publish(Txn, Msg, MsgProps, ChPid, #state {} = State) ->
- %% gm:broadcast(GM, {tx_publish, Txn, MsgId, MsgProps, ChPid})
- State.
+ {MsgIds, State #state { backing_queue_state = BQS1,
+ ack_msg_id = AM1 }}.
+
+tx_publish(Txn, Msg, MsgProps, ChPid,
+ State = #state { gm = GM,
+ backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ ok = gm:broadcast(GM, {tx_publish, Txn, ChPid, MsgProps, Msg}),
+    BQS1 = BQ:tx_publish(Txn, Msg, MsgProps, ChPid, BQS),
+ State #state { backing_queue_state = BQS1 }.
-tx_ack(Txn, AckTags, #state {} = State) ->
- %% gm:broadcast(GM, {tx_ack, Txn, MsgIds})
+tx_ack(Txn, AckTags, State = #state { gm = GM,
+                                      backing_queue = BQ,
+                                      backing_queue_state = BQS,
+                                      ack_msg_id = AM }) ->
+    MsgIds = lists:foldl(
+               fun (AckTag, Acc) -> [dict:fetch(AckTag, AM) | Acc] end,
+               [], AckTags),
+    ok = gm:broadcast(GM, {tx_ack, Txn, MsgIds}),
+    BQS1 = BQ:tx_ack(Txn, AckTags, BQS),
+    State #state { backing_queue_state = BQS1 }.
-tx_rollback(Txn, #state {} = State) ->
- %% gm:broadcast(GM, {tx_rollback, Txn})
- {[], State}.
-
-tx_commit(Txn, PostCommitFun, MsgPropsFun, #state {} = State) ->
- %% Maybe don't want to transmit the MsgPropsFun but what choice do
- %% we have? OTOH, on the slaves, things won't be expiring on their
- %% own (props are interpreted by amqqueue, not vq), so if the msg
- %% props aren't quite the same, that doesn't matter.
- %%
- %% The PostCommitFun is actually worse - we need to prevent that
- %% from being invoked until we have confirmation from all the
- %% slaves that they've done everything up to there.
- %%
- %% In fact, transactions are going to need work seeing as it's at
- %% this point that VQ mentions amqqueue, which will thus not work
- %% on the slaves - we need to make sure that all the slaves do the
- %% tx_commit_post_msg_store at the same point, and then when they
- %% all confirm that (scatter/gather), we can finally invoke the
- %% PostCommitFun.
- %%
- %% Another idea is that the slaves are actually driven with
- %% pubacks and thus only the master needs to support txns
- %% directly.
- {[], State}.
+tx_rollback(Txn, State = #state { gm = GM,
+ backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ ok = gm:confirmed_broadcast(GM, {tx_rollback, Txn}),
+ {AckTags, BQS1} = BQ:tx_rollback(Txn, BQS),
+ {AckTags, State #state { backing_queue_state = BQS1 }}.
+
+tx_commit(Txn, PostCommitFun, MsgPropsFun,
+ State = #state { gm = GM,
+ backing_queue = BQ,
+ backing_queue_state = BQS,
+ ack_msg_id = AM }) ->
+ ok = gm:confirmed_broadcast(GM, {tx_commit, Txn, MsgPropsFun}),
+ {AckTags, BQS1} = BQ:tx_commit(Txn, PostCommitFun, MsgPropsFun, BQS),
+ AM1 = lists:foldl(fun dict:erase/2, AM, AckTags),
+    {AckTags, State #state { backing_queue_state = BQS1,
+                             ack_msg_id = AM1 }}.
requeue(AckTags, MsgPropsFun, State = #state { gm = GM,
backing_queue = BQ,
@@ -323,3 +339,8 @@ discard(Msg = #basic_message {}, ChPid,
backing_queue_state = BQS }) ->
ok = gm:broadcast(GM, {discard, ChPid, Msg}),
State#state{backing_queue_state = BQ:discard(Msg, ChPid, BQS)}.
+
+maybe_store_acktag(undefined, _MsgId, AM) ->
+ AM;
+maybe_store_acktag(AckTag, MsgId, AM) ->
+ dict:store(AckTag, MsgId, AM).
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 628135b141..21a33341df 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -55,7 +55,8 @@
sender_queues, %% :: Pid -> MsgQ
msg_id_ack, %% :: MsgId -> AckTag
- msg_id_status
+ msg_id_status,
+ open_transactions
}).
-define(SYNC_INTERVAL, 25). %% milliseconds
@@ -105,7 +106,8 @@ init([#amqqueue { name = QueueName } = Q]) ->
sender_queues = dict:new(),
msg_id_ack = dict:new(),
- msg_id_status = dict:new()
+ msg_id_status = dict:new(),
+ open_transactions = sets:new()
}, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
@@ -358,7 +360,8 @@ promote_me(From, #state { q = Q,
rate_timer_ref = RateTRef,
sender_queues = SQ,
msg_id_ack = MA,
- msg_id_status = MS }) ->
+ msg_id_status = MS,
+ open_transactions = OT }) ->
rabbit_log:info("Promoting slave ~p for ~s~n",
[self(), rabbit_misc:rs(Q #amqqueue.name)]),
{ok, CPid} = rabbit_mirror_queue_coordinator:start_link(Q, GM),
@@ -366,6 +369,11 @@ promote_me(From, #state { q = Q,
gen_server2:reply(From, {promote, CPid}),
ok = gm:confirmed_broadcast(GM, heartbeat),
+ %% Start by rolling back all open transactions
+
+ [ok = gm:confirmed_broadcast(GM, {tx_rollback, Txn})
+ || Txn <- sets:to_list(OT)],
+
%% We find all the messages that we've received from channels but
%% not from gm, and if they're due to be enqueued on promotion
%% then we pass them to the
@@ -380,7 +388,7 @@ promote_me(From, #state { q = Q,
%% affect confirmations: if the message was previously pending a
%% confirmation then it still will be, under the same msg_id. So
%% as a master, we need to be prepared to filter out the
- %% publication of said messages from the channel (validate_message
+ %% publication of said messages from the channel (is_duplicate
%% (thus such requeued messages must remain in the msg_id_status
%% (MS) which becomes seen_status (SS) in the master)).
%%
@@ -424,6 +432,10 @@ promote_me(From, #state { q = Q,
%% those messages are then requeued. However, as discussed above,
%% this does not affect MS, nor which bits go through to SS in
%% Master, or MTC in queue_process.
+ %%
+ %% Everything that's in MA gets requeued. Consequently the new
+ %% master should start with a fresh AM as there are no messages
+ %% pending acks (txns will have been rolled back).
MSList = dict:to_list(MS),
SS = dict:from_list(