diff options
| author | Matthew Sackman <matthew@rabbitmq.com> | 2011-04-07 18:19:16 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@rabbitmq.com> | 2011-04-07 18:19:16 +0100 |
| commit | 44efb9f3fb70eb5bf779a87314bd3f9752cecd52 (patch) | |
| tree | 5f849e09cc8f888709d39c193e2d1be3e67a60ad | |
| parent | 4b4e4bfb306d79cb46d0e465a7d2b87e79ad6832 (diff) | |
| download | rabbitmq-server-git-44efb9f3fb70eb5bf779a87314bd3f9752cecd52.tar.gz | |
Well txns are still only half in. But I want to go home.
| -rw-r--r-- | include/rabbit_backing_queue_spec.hrl | 3 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_backing_queue.erl | 7 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_master.erl | 92 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 69 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 4 |
6 files changed, 135 insertions, 44 deletions
diff --git a/include/rabbit_backing_queue_spec.hrl b/include/rabbit_backing_queue_spec.hrl index b0c5f13b03..d9296bf631 100644 --- a/include/rabbit_backing_queue_spec.hrl +++ b/include/rabbit_backing_queue_spec.hrl @@ -71,6 +71,7 @@ -spec(handle_pre_hibernate/1 :: (state()) -> state()). -spec(status/1 :: (state()) -> [{atom(), any()}]). -spec(invoke/3 :: (atom(), fun ((atom(), A) -> A), state()) -> state()). --spec(is_duplicate/2 :: (rabbit_types:basic_message(), state()) -> +-spec(is_duplicate/3 :: + (rabbit_types:txn(), rabbit_types:basic_message(), state()) -> {'false'|'published'|'discarded', state()}). -spec(discard/3 :: (rabbit_types:basic_message(), pid(), state()) -> state()). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 79f6472db7..d9be4909c5 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -523,7 +523,7 @@ attempt_delivery(Delivery = #delivery{txn = none, immediately -> rabbit_channel:confirm(ChPid, [MsgSeqNo]); _ -> ok end, - case BQ:is_duplicate(Message, BQS) of + case BQ:is_duplicate(none, Message, BQS) of {false, BQS1} -> PredFun = fun (IsEmpty, _State) -> not IsEmpty end, DeliverFun = @@ -561,7 +561,7 @@ attempt_delivery(Delivery = #delivery{txn = Txn, message = Message}, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> Confirm = should_confirm_message(Delivery, State), - case BQ:is_duplicate(Message, BQS) of + case BQ:is_duplicate(Txn, Message, BQS) of {false, BQS1} -> store_ch_record((ch_record(ChPid))#cr{txn = Txn}), BQS2 = BQ:tx_publish(Txn, Message, ?BASE_MESSAGE_PROPERTIES, ChPid, diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index 0bbbd559d3..0955a0804b 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -175,12 +175,13 @@ behaviour_info(callbacks) -> %% the BQ to signal that it's already seen this message (and in %% what capacity - i.e. 
was it published previously or discarded %% previously) and thus the message should be dropped. - {is_duplicate, 2}, + {is_duplicate, 3}, %% Called to inform the BQ about messages which have reached the %% queue, but are not going to be further passed to BQ for some - %% reason. Note that this is not invoked for messages for which - %% BQ:is_duplicate/2 has already returned {true, BQS}. + %% reason. Note that this may be invoked for messages for + %% which BQ:is_duplicate/3 has already returned {'published' | + %% 'discarded', BQS}. {discard, 3} ]; behaviour_info(_Other) -> diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index a61c32e0b2..8714c44db9 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -22,7 +22,7 @@ requeue/3, len/1, is_empty/1, drain_confirmed/1, dropwhile/2, set_ram_duration_target/2, ram_duration/1, needs_idle_timeout/1, idle_timeout/1, handle_pre_hibernate/1, - status/1, invoke/3, is_duplicate/2, discard/3]). + status/1, invoke/3, is_duplicate/3, discard/3]). -export([start/1, stop/0]). @@ -217,38 +217,59 @@ ack(AckTags, State = #state { gm = GM, tx_publish(Txn, Msg, MsgProps, ChPid, State = #state { gm = GM, backing_queue = BQ, - backing_queue_state = BQS }) -> - ok = gm:broadcast(GM, {tx_publish, Txn, ChPid, MsgProps, Msg}), - BQS1 = BQ:tx_publish(Txn, Msg, MsgProps, ChPid, State), - State #state { backing_queue_state = BQS1 }. + backing_queue_state = BQS, + abandoned_txns = AbandonedTxns }) -> + case sets:is_element(Txn, AbandonedTxns) of + true -> State; + false -> ok = gm:broadcast(GM, {tx_publish, Txn, ChPid, MsgProps, Msg}), + BQS1 = BQ:tx_publish(Txn, Msg, MsgProps, ChPid, State), + State #state { backing_queue_state = BQS1 } + end. 
tx_ack(Txn, AckTags, State = #state { gm = GM, backing_queue = BQ, backing_queue_state = BQS, - ack_msg_id = AM }) -> - MsgIds = lists:foldl( - fun (AckTag, Acc) -> [dict:fetch(AckTag, AM) | Acc] end, - [], AckTags), - ok = gm:broadcast(GM, {tx_ack, Txn, MsgIds}) - State. + ack_msg_id = AM, + abandoned_txns = AbandonedTxns }) -> + case sets:is_element(Txn, AbandonedTxns) of + true -> + State; + false -> + MsgIds = lists:foldl( + fun (AckTag, Acc) -> [dict:fetch(AckTag, AM) | Acc] end, + [], AckTags), + ok = gm:broadcast(GM, {tx_ack, Txn, MsgIds}), + State + end. tx_rollback(Txn, State = #state { gm = GM, backing_queue = BQ, - backing_queue_state = BQS }) -> - ok = gm:confirmed_broadcast(GM, {tx_rollback, Txn}), - {AckTags, BQS1} = BQ:tx_rollback(Txn, BQS), - {AckTags, State #state { backing_queue_state = BQS1 }}. + backing_queue_state = BQS, + abandoned_txns = AbandonedTxns }) -> + case sets:is_element(Txn, AbandonedTxns) of + true -> {[], State}; + false -> ok = gm:confirmed_broadcast(GM, {tx_rollback, Txn}), + {AckTags, BQS1} = BQ:tx_rollback(Txn, BQS), + {AckTags, State #state { backing_queue_state = BQS1 }} + end. tx_commit(Txn, PostCommitFun, MsgPropsFun, State = #state { gm = GM, backing_queue = BQ, backing_queue_state = BQS, ack_msg_id = AM }) -> - ok = gm:confirmed_broadcast(GM, {tx_commit, Txn, MsgPropsFun}), - {AckTags, BQS1} = BQ:tx_commit(Txn, PostCommitFun, MsgPropsFun, BQS), - AM1 = lists:foldl(fun dict:erase/2, AM, AckTags), - {AckTags, State #state { backing_queue_state = BQS, - ack_msg_id = AM }}. + case sets:is_element(Txn, AbandonedTxns) of + true -> + %% Don't worry - the channel will explode as it'll still + %% try to commit on the old master. + {[], State}; + false -> + ok = gm:confirmed_broadcast(GM, {tx_commit, Txn, MsgPropsFun}), + {AckTags, BQS1} = BQ:tx_commit(Txn, PostCommitFun, MsgPropsFun, BQS), + AM1 = lists:foldl(fun dict:erase/2, AM, AckTags), + {AckTags, State #state { backing_queue_state = BQS, + ack_msg_id = AM }} + end. 
requeue(AckTags, MsgPropsFun, State = #state { gm = GM, backing_queue = BQ, @@ -291,7 +312,7 @@ invoke(Mod, Fun, State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> State #state { backing_queue_state = BQ:invoke(Mod, Fun, BQS) }. -is_duplicate(Message = #basic_message { id = MsgId }, +is_duplicate(none, Message = #basic_message { id = MsgId }, State = #state { seen_status = SS, backing_queue = BQ, backing_queue_state = BQS, @@ -330,15 +351,34 @@ is_duplicate(Message = #basic_message { id = MsgId }, {published, State #state { seen_status = dict:erase(MsgId, SS), confirmed = [MsgId | Confirmed] }}; {ok, discarded} -> - {discarded, State #state { seen_status = dict:erase(MsgId, SS) }} + %% Don't erase from SS here because discard/3 is about to + %% be called and we need to be able to detect this case + {discarded, State} + end; +is_duplicate(Txn, _Msg, State = #state { abandoned_txns = AbandonedTxns }) -> + %% There will be nothing in seen_status for any transactions that + %% are still in flight. + case sets:is_element(Txn, AbandonedTxns) of + true -> {published, State}; + false -> {false, State} end. -discard(Msg = #basic_message {}, ChPid, +discard(Msg = #basic_message { id = MsgId }, ChPid, State = #state { gm = GM, backing_queue = BQ, - backing_queue_state = BQS }) -> - ok = gm:broadcast(GM, {discard, ChPid, Msg}), - State#state{backing_queue_state = BQ:discard(Msg, ChPid, BQS)}. + backing_queue_state = BQS, + seen_status = SS }) -> + %% It's a massive error if we get told to discard something that's + %% already been published or published-and-confirmed. To do that + %% would require non FIFO access... + case dict:find(MsgId, SS) of + error -> + ok = gm:broadcast(GM, {discard, ChPid, Msg}), + State #state { backing_queue_state = BQ:discard(Msg, ChPid, BQS), + seen_status = dict:erase(MsgId, SS) }; + discarded -> + State + end. 
maybe_store_acktag(undefined, _MsgId, AM) -> AM; diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 21a33341df..34ec510947 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -107,7 +107,7 @@ init([#amqqueue { name = QueueName } = Q]) -> sender_queues = dict:new(), msg_id_ack = dict:new(), msg_id_status = dict:new(), - open_transactions = sets:new() + open_transactions = dict:new() }, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. @@ -157,8 +157,32 @@ handle_call({gm_deaths, Deaths}, From, end; handle_call({run_backing_queue, Mod, Fun}, _From, State) -> - reply(ok, run_backing_queue(Mod, Fun, State)). + reply(ok, run_backing_queue(Mod, Fun, State)); +handle_call({commit, Txn, ChPid}, From, + State = #state { open_transactions = OT }) -> + case dict:find(Txn, OT) of + error -> + %% curious. We've not received _anything_ about this txn + %% so far via gm! + OT1 = dict:store(Txn, {undefined, {committed, From}}, OT), + noreply(State #state { open_transactions = OT1 }); + {ok, {committed, undefined}} -> + %% We've already finished via GM (our BQ has actually + %% replied back to us in the case of commit), so just + %% reply and tidy up. Note that because no one can ever + %% consume from a slave, there are never going to be any + %% acks to return. + reply(ok, State #state { open_transactions = dict:erase(Txn, OT) }); + {ok, {open, undefined}} -> + %% Save who we're from, but we're still waiting for the + %% commit to arrive via GM + OT1 = dict:store(Txn, {open, {committed, From}}, OT), + noreply(State #state { open_transactions = OT1 }); + {ok, {abandoned, undefined}} -> + %% GM must have told us to roll back. + reply(ok, State #state { open_transactions = dict:erase(Txn, OT) }) + end. 
handle_cast({run_backing_queue, Mod, Fun}, State) -> noreply(run_backing_queue(Mod, Fun, State)); @@ -192,7 +216,25 @@ handle_cast(update_ram_duration, handle_cast(sync_timeout, State) -> noreply(backing_queue_idle_timeout( - State #state { sync_timer_ref = undefined })). + State #state { sync_timer_ref = undefined })); + +handle_cast({rollback, Txn, ChPid}, + State #state { open_transactions = OT }) -> + %% Will never see {'committed', _} or {_, 'abandoned'} or + %% {_, {'committed', From}} here + case dict:find(Txn, OT) of + error -> + %% odd. We've not received anything from GM about this. + OT1 = dict:store(Txn, {undefined, abandoned}, OT), + noreply(State #state { open_transactions = OT1 }); + {ok, {open, undefined}} -> + %% The rollback is yet to arrive via GM. + OT1 = dict:store(Txn, {open, abandoned}, OT), + noreply(State #state { open_transactions = OT1 }); + {ok, {abandoned, undefined}} -> + %% GM has already rolled back. Tidy up. + noreply(State #state { open_transactions = dict:erase(Txn, OT) }) + end. 
handle_info(timeout, State) -> noreply(backing_queue_idle_timeout(State)); @@ -370,9 +412,12 @@ promote_me(From, #state { q = Q, ok = gm:confirmed_broadcast(GM, heartbeat), %% Start by rolling back all open transactions - - [ok = gm:confirmed_broadcast(GM, {tx_rollback, Txn}) - || Txn <- sets:to_list(OT)], + BQS1 = lists:foldl( + fun (Txn, BQSN) -> + ok = gm:confirmed_broadcast(GM, {tx_rollback, Txn}), + {_AckTags, BQSN1} = BQ:tx_rollback(Txn, BQSN), + BQSN1 + end, BQS, dict:fetch_keys(OT)), %% We find all the messages that we've received from channels but %% not from gm, and if they're due to be enqueued on promotion @@ -445,8 +490,7 @@ promote_me(From, #state { q = Q, Status =:= published orelse Status =:= confirmed]), MasterState = rabbit_mirror_queue_master:promote_backing_queue_state( - CPid, BQ, BQS, GM, SS), - + CPid, BQ, BQS1, GM, SS, OT), MTC = dict:from_list( [{MsgId, {ChPid, MsgSeqNo}} || @@ -516,7 +560,8 @@ stop_rate_timer(State = #state { rate_timer_ref = TRef }) -> maybe_enqueue_message( Delivery = #delivery { message = #basic_message { id = MsgId }, msg_seq_no = MsgSeqNo, - sender = ChPid }, + sender = ChPid, + txn = none }, EnqueueOnPromotion, State = #state { sender_queues = SQ, msg_id_status = MS }) -> @@ -553,7 +598,11 @@ maybe_enqueue_message( %% We've already heard from GM that the msg is to be %% discarded. We won't see this again. State #state { msg_id_status = dict:erase(MsgId, MS) } - end. + end; +maybe_enqueue_message(_Delivery, State) -> + %% In a txn. Txns are completely driven by gm for simplicity, so + %% we're not going to do anything here. + State. 
process_instruction( {publish, Deliver, ChPid, MsgProps, Msg = #basic_message { id = MsgId }}, diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 84987c8849..7a3c17a29c 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -22,7 +22,7 @@ requeue/3, len/1, is_empty/1, dropwhile/2, set_ram_duration_target/2, ram_duration/1, needs_idle_timeout/1, idle_timeout/1, handle_pre_hibernate/1, - status/1, invoke/3, is_duplicate/2, discard/3, + status/1, invoke/3, is_duplicate/3, discard/3, multiple_routing_keys/0]). -export([start/1, stop/0]). @@ -887,7 +887,7 @@ status(#vqstate { invoke(?MODULE, Fun, State) -> Fun(?MODULE, State). -is_duplicate(_Msg, State) -> {false, State}. +is_duplicate(_Txn, _Msg, State) -> {false, State}. discard(_Msg, _ChPid, State) -> State. |
