diff options
| -rw-r--r-- | src/rabbit_mirror_queue_master.erl | 105 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 20 |
2 files changed, 79 insertions, 46 deletions
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index b0a22edd21..a61c32e0b2 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -26,7 +26,7 @@ -export([start/1, stop/0]). --export([promote_backing_queue_state/5]). +-export([promote_backing_queue_state/6]). -behaviour(rabbit_backing_queue). @@ -38,7 +38,9 @@ backing_queue_state, set_delivered, seen_status, - confirmed + confirmed, + ack_msg_id, + abandoned_txns }). %% --------------------------------------------------------------------------- @@ -73,16 +75,19 @@ init(#amqqueue { arguments = Args, name = QName } = Q, Recover, backing_queue_state = BQS, set_delivered = 0, seen_status = dict:new(), - confirmed = [] }. + confirmed = [], + ack_msg_id = dict:new() }. -promote_backing_queue_state(CPid, BQ, BQS, GM, SeenStatus) -> +promote_backing_queue_state(CPid, BQ, BQS, GM, SeenStatus, AbandonedTxns) -> #state { gm = GM, coordinator = CPid, backing_queue = BQ, backing_queue_state = BQS, set_delivered = BQ:len(BQS), seen_status = SeenStatus, - confirmed = [] }. + confirmed = [], + ack_msg_id = dict:new(), + abandoned_txns = AbandonedTxns }. terminate(State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> %% Backing queue termination. The queue is going down but @@ -119,7 +124,8 @@ publish_delivered(AckRequired, Msg = #basic_message { id = MsgId }, MsgProps, ChPid, State = #state { gm = GM, seen_status = SS, backing_queue = BQ, - backing_queue_state = BQS }) -> + backing_queue_state = BQS, + ack_msg_id = AM }) -> false = dict:is_key(MsgId, SS), %% ASSERTION %% Must use confirmed_broadcast here in order to guarantee that %% all slaves are forced to interpret this publish_delivered at @@ -128,7 +134,9 @@ publish_delivered(AckRequired, Msg = #basic_message { id = MsgId }, MsgProps, GM, {publish, {true, AckRequired}, ChPid, MsgProps, Msg}), {AckTag, BQS1} = BQ:publish_delivered(AckRequired, Msg, MsgProps, ChPid, BQS), - {AckTag, State #state { backing_queue_state = BQS1 }}. + AM1 = maybe_store_acktag(AckTag, MsgId, AM), + {AckTag, State #state { backing_queue_state = BQS1, + ack_msg_id = AM1 }}. dropwhile(Fun, State = #state { gm = GM, backing_queue = BQ, @@ -175,7 +183,8 @@ drain_confirmed(State = #state { backing_queue = BQ, fetch(AckRequired, State = #state { gm = GM, backing_queue = BQ, backing_queue_state = BQS, - set_delivered = SetDelivered }) -> + set_delivered = SetDelivered, + ack_msg_id = AM }) -> {Result, BQS1} = BQ:fetch(AckRequired, BQS), State1 = State #state { backing_queue_state = BQS1 }, case Result of @@ -186,53 +195,60 @@ fetch(AckRequired, State = #state { gm = GM, ok = gm:broadcast(GM, {fetch, AckRequired, MsgId, Remaining}), IsDelivered1 = IsDelivered orelse SetDelivered > 0, SetDelivered1 = lists:max([0, SetDelivered - 1]), + AM1 = maybe_store_acktag(AckTag, MsgId, AM), {{Message, IsDelivered1, AckTag, Remaining}, - State1 #state { set_delivered = SetDelivered1 }} + State1 #state { set_delivered = SetDelivered1, + ack_msg_id = AM1 }} end. ack(AckTags, State = #state { gm = GM, backing_queue = BQ, - backing_queue_state = BQS }) -> + backing_queue_state = BQS, + ack_msg_id = AM }) -> {MsgIds, BQS1} = BQ:ack(AckTags, BQS), + AM1 = lists:foldl(fun dict:erase/2, AM, AckTags), case MsgIds of [] -> ok; _ -> ok = gm:broadcast(GM, {ack, MsgIds}) end, - {MsgIds, State #state { backing_queue_state = BQS1 }}. - -tx_publish(Txn, Msg, MsgProps, ChPid, #state {} = State) -> - %% gm:broadcast(GM, {tx_publish, Txn, MsgId, MsgProps, ChPid}) - State. + {MsgIds, State #state { backing_queue_state = BQS1, + ack_msg_id = AM1 }}. + +tx_publish(Txn, Msg, MsgProps, ChPid, + State = #state { gm = GM, + backing_queue = BQ, + backing_queue_state = BQS }) -> + ok = gm:broadcast(GM, {tx_publish, Txn, ChPid, MsgProps, Msg}), + BQS1 = BQ:tx_publish(Txn, Msg, MsgProps, ChPid, State), + State #state { backing_queue_state = BQS1 }. -tx_ack(Txn, AckTags, #state {} = State) -> - %% gm:broadcast(GM, {tx_ack, Txn, MsgIds}) +tx_ack(Txn, AckTags, State = #state { gm = GM, + backing_queue = BQ, + backing_queue_state = BQS, + ack_msg_id = AM }) -> + MsgIds = lists:foldl( + fun (AckTag, Acc) -> [dict:fetch(AckTag, AM) | Acc] end, + [], AckTags), + ok = gm:broadcast(GM, {tx_ack, Txn, MsgIds}) State. -tx_rollback(Txn, #state {} = State) -> - %% gm:broadcast(GM, {tx_rollback, Txn}) - {[], State}. - -tx_commit(Txn, PostCommitFun, MsgPropsFun, #state {} = State) -> - %% Maybe don't want to transmit the MsgPropsFun but what choice do - %% we have? OTOH, on the slaves, things won't be expiring on their - %% own (props are interpreted by amqqueue, not vq), so if the msg - %% props aren't quite the same, that doesn't matter. - %% - %% The PostCommitFun is actually worse - we need to prevent that - %% from being invoked until we have confirmation from all the - %% slaves that they've done everything up to there. - %% - %% In fact, transactions are going to need work seeing as it's at - %% this point that VQ mentions amqqueue, which will thus not work - %% on the slaves - we need to make sure that all the slaves do the - %% tx_commit_post_msg_store at the same point, and then when they - %% all confirm that (scatter/gather), we can finally invoke the - %% PostCommitFun. - %% - %% Another idea is that the slaves are actually driven with - %% pubacks and thus only the master needs to support txns - %% directly. - {[], State}. +tx_rollback(Txn, State = #state { gm = GM, + backing_queue = BQ, + backing_queue_state = BQS }) -> + ok = gm:confirmed_broadcast(GM, {tx_rollback, Txn}), + {AckTags, BQS1} = BQ:tx_rollback(Txn, BQS), + {AckTags, State #state { backing_queue_state = BQS1 }}. + +tx_commit(Txn, PostCommitFun, MsgPropsFun, + State = #state { gm = GM, + backing_queue = BQ, + backing_queue_state = BQS, + ack_msg_id = AM }) -> + ok = gm:confirmed_broadcast(GM, {tx_commit, Txn, MsgPropsFun}), + {AckTags, BQS1} = BQ:tx_commit(Txn, PostCommitFun, MsgPropsFun, BQS), + AM1 = lists:foldl(fun dict:erase/2, AM, AckTags), + {AckTags, State #state { backing_queue_state = BQS, + ack_msg_id = AM }}. requeue(AckTags, MsgPropsFun, State = #state { gm = GM, backing_queue = BQ, @@ -323,3 +339,8 @@ discard(Msg = #basic_message {}, ChPid, backing_queue_state = BQS }) -> ok = gm:broadcast(GM, {discard, ChPid, Msg}), State#state{backing_queue_state = BQ:discard(Msg, ChPid, BQS)}. + +maybe_store_acktag(undefined, _MsgId, AM) -> + AM; +maybe_store_acktag(AckTag, MsgId, AM) -> + dict:store(AckTag, MsgId, AM). diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 628135b141..21a33341df 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -55,7 +55,8 @@ sender_queues, %% :: Pid -> MsgQ msg_id_ack, %% :: MsgId -> AckTag - msg_id_status + msg_id_status, + open_transactions }). -define(SYNC_INTERVAL, 25). %% milliseconds @@ -105,7 +106,8 @@ init([#amqqueue { name = QueueName } = Q]) -> sender_queues = dict:new(), msg_id_ack = dict:new(), - msg_id_status = dict:new() + msg_id_status = dict:new(), + open_transactions = sets:new() }, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. @@ -358,7 +360,8 @@ promote_me(From, #state { q = Q, rate_timer_ref = RateTRef, sender_queues = SQ, msg_id_ack = MA, - msg_id_status = MS }) -> + msg_id_status = MS, + open_transactions = OT }) -> rabbit_log:info("Promoting slave ~p for ~s~n", [self(), rabbit_misc:rs(Q #amqqueue.name)]), {ok, CPid} = rabbit_mirror_queue_coordinator:start_link(Q, GM), @@ -366,6 +369,11 @@ promote_me(From, #state { q = Q, gen_server2:reply(From, {promote, CPid}), ok = gm:confirmed_broadcast(GM, heartbeat), + %% Start by rolling back all open transactions + + [ok = gm:confirmed_broadcast(GM, {tx_rollback, Txn}) + || Txn <- sets:to_list(OT)], + %% We find all the messages that we've received from channels but %% not from gm, and if they're due to be enqueued on promotion %% then we pass them to the @@ -380,7 +388,7 @@ promote_me(From, #state { q = Q, %% affect confirmations: if the message was previously pending a %% confirmation then it still will be, under the same msg_id. So %% as a master, we need to be prepared to filter out the - %% publication of said messages from the channel (validate_message + %% publication of said messages from the channel (is_duplicate %% (thus such requeued messages must remain in the msg_id_status %% (MS) which becomes seen_status (SS) in the master)). %% @@ -424,6 +432,10 @@ promote_me(From, #state { q = Q, %% those messages are then requeued. However, as discussed above, %% this does not affect MS, nor which bits go through to SS in %% Master, or MTC in queue_process. + %% + %% Everything that's in MA gets requeued. Consequently the new + %% master should start with a fresh AM as there are no messages + %% pending acks (txns will have been rolled back). MSList = dict:to_list(MS), SS = dict:from_list( |
