summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2011-04-07 18:19:16 +0100
committerMatthew Sackman <matthew@rabbitmq.com>2011-04-07 18:19:16 +0100
commit44efb9f3fb70eb5bf779a87314bd3f9752cecd52 (patch)
tree5f849e09cc8f888709d39c193e2d1be3e67a60ad
parent4b4e4bfb306d79cb46d0e465a7d2b87e79ad6832 (diff)
downloadrabbitmq-server-git-44efb9f3fb70eb5bf779a87314bd3f9752cecd52.tar.gz
Well txns are still only half in. But I want to go home.
-rw-r--r--include/rabbit_backing_queue_spec.hrl3
-rw-r--r--src/rabbit_amqqueue_process.erl4
-rw-r--r--src/rabbit_backing_queue.erl7
-rw-r--r--src/rabbit_mirror_queue_master.erl92
-rw-r--r--src/rabbit_mirror_queue_slave.erl69
-rw-r--r--src/rabbit_variable_queue.erl4
6 files changed, 135 insertions, 44 deletions
diff --git a/include/rabbit_backing_queue_spec.hrl b/include/rabbit_backing_queue_spec.hrl
index b0c5f13b03..d9296bf631 100644
--- a/include/rabbit_backing_queue_spec.hrl
+++ b/include/rabbit_backing_queue_spec.hrl
@@ -71,6 +71,7 @@
-spec(handle_pre_hibernate/1 :: (state()) -> state()).
-spec(status/1 :: (state()) -> [{atom(), any()}]).
-spec(invoke/3 :: (atom(), fun ((atom(), A) -> A), state()) -> state()).
--spec(is_duplicate/2 :: (rabbit_types:basic_message(), state()) ->
+-spec(is_duplicate/3 ::
+ (rabbit_types:txn(), rabbit_types:basic_message(), state()) ->
{'false'|'published'|'discarded', state()}).
-spec(discard/3 :: (rabbit_types:basic_message(), pid(), state()) -> state()).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 79f6472db7..d9be4909c5 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -523,7 +523,7 @@ attempt_delivery(Delivery = #delivery{txn = none,
immediately -> rabbit_channel:confirm(ChPid, [MsgSeqNo]);
_ -> ok
end,
- case BQ:is_duplicate(Message, BQS) of
+ case BQ:is_duplicate(none, Message, BQS) of
{false, BQS1} ->
PredFun = fun (IsEmpty, _State) -> not IsEmpty end,
DeliverFun =
@@ -561,7 +561,7 @@ attempt_delivery(Delivery = #delivery{txn = Txn,
message = Message},
State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
Confirm = should_confirm_message(Delivery, State),
- case BQ:is_duplicate(Message, BQS) of
+ case BQ:is_duplicate(Txn, Message, BQS) of
{false, BQS1} ->
store_ch_record((ch_record(ChPid))#cr{txn = Txn}),
BQS2 = BQ:tx_publish(Txn, Message, ?BASE_MESSAGE_PROPERTIES, ChPid,
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index 0bbbd559d3..0955a0804b 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -175,12 +175,13 @@ behaviour_info(callbacks) ->
%% the BQ to signal that it's already seen this message (and in
%% what capacity - i.e. was it published previously or discarded
%% previously) and thus the message should be dropped.
- {is_duplicate, 2},
+ {is_duplicate, 3},
%% Called to inform the BQ about messages which have reached the
%% queue, but are not going to be further passed to BQ for some
- %% reason. Note that this is not invoked for messages for which
- %% BQ:is_duplicate/2 has already returned {true, BQS}.
+ %% reason. Note that this is may be invoked for messages for
+ %% which BQ:is_duplicate/2 has already returned {'published' |
+ %% 'discarded', BQS}.
{discard, 3}
];
behaviour_info(_Other) ->
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index a61c32e0b2..8714c44db9 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -22,7 +22,7 @@
requeue/3, len/1, is_empty/1, drain_confirmed/1, dropwhile/2,
set_ram_duration_target/2, ram_duration/1,
needs_idle_timeout/1, idle_timeout/1, handle_pre_hibernate/1,
- status/1, invoke/3, is_duplicate/2, discard/3]).
+ status/1, invoke/3, is_duplicate/3, discard/3]).
-export([start/1, stop/0]).
@@ -217,38 +217,59 @@ ack(AckTags, State = #state { gm = GM,
tx_publish(Txn, Msg, MsgProps, ChPid,
State = #state { gm = GM,
backing_queue = BQ,
- backing_queue_state = BQS }) ->
- ok = gm:broadcast(GM, {tx_publish, Txn, ChPid, MsgProps, Msg}),
- BQS1 = BQ:tx_publish(Txn, Msg, MsgProps, ChPid, State),
- State #state { backing_queue_state = BQS1 }.
+ backing_queue_state = BQS,
+ abandoned_txns = AbandonedTxns }) ->
+ case sets:is_element(Txn, AbandonedTxns) of
+ true -> State;
+ false -> ok = gm:broadcast(GM, {tx_publish, Txn, ChPid, MsgProps, Msg}),
+ BQS1 = BQ:tx_publish(Txn, Msg, MsgProps, ChPid, State),
+ State #state { backing_queue_state = BQS1 }
+ end.
tx_ack(Txn, AckTags, State = #state { gm = GM,
backing_queue = BQ,
backing_queue_state = BQS,
- ack_msg_id = AM }) ->
- MsgIds = lists:foldl(
- fun (AckTag, Acc) -> [dict:fetch(AckTag, AM) | Acc] end,
- [], AckTags),
- ok = gm:broadcast(GM, {tx_ack, Txn, MsgIds})
- State.
+ ack_msg_id = AM,
+ abandoned_txns = AbandonedTxns }) ->
+ case sets:is_element(Txn, AbandonedTxns) of
+ true ->
+ State;
+ false ->
+ MsgIds = lists:foldl(
+ fun (AckTag, Acc) -> [dict:fetch(AckTag, AM) | Acc] end,
+ [], AckTags),
+ ok = gm:broadcast(GM, {tx_ack, Txn, MsgIds}),
+ State
+ end.
tx_rollback(Txn, State = #state { gm = GM,
backing_queue = BQ,
- backing_queue_state = BQS }) ->
- ok = gm:confirmed_broadcast(GM, {tx_rollback, Txn}),
- {AckTags, BQS1} = BQ:tx_rollback(Txn, BQS),
- {AckTags, State #state { backing_queue_state = BQS1 }}.
+ backing_queue_state = BQS,
+ abandoned_txns = AbandonedTxns }) ->
+ case sets:is_element(Txn, AbandonedTxns) of
+ true -> {[], State};
+ false -> ok = gm:confirmed_broadcast(GM, {tx_rollback, Txn}),
+ {AckTags, BQS1} = BQ:tx_rollback(Txn, BQS),
+ {AckTags, State #state { backing_queue_state = BQS1 }}
+ end.
tx_commit(Txn, PostCommitFun, MsgPropsFun,
State = #state { gm = GM,
backing_queue = BQ,
backing_queue_state = BQS,
ack_msg_id = AM }) ->
- ok = gm:confirmed_broadcast(GM, {tx_commit, Txn, MsgPropsFun}),
- {AckTags, BQS1} = BQ:tx_commit(Txn, PostCommitFun, MsgPropsFun, BQS),
- AM1 = lists:foldl(fun dict:erase/2, AM, AckTags),
- {AckTags, State #state { backing_queue_state = BQS,
- ack_msg_id = AM }}.
+ case sets:is_element(Txn, AbandonedTxns) of
+ true ->
+ %% Don't worry - the channel will explode as it'll still
+ %% try to commit on the old master.
+ {[], State};
+ false ->
+ ok = gm:confirmed_broadcast(GM, {tx_commit, Txn, MsgPropsFun}),
+ {AckTags, BQS1} = BQ:tx_commit(Txn, PostCommitFun, MsgPropsFun, BQS),
+ AM1 = lists:foldl(fun dict:erase/2, AM, AckTags),
+ {AckTags, State #state { backing_queue_state = BQS,
+ ack_msg_id = AM }}
+ end.
requeue(AckTags, MsgPropsFun, State = #state { gm = GM,
backing_queue = BQ,
@@ -291,7 +312,7 @@ invoke(Mod, Fun, State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
State #state { backing_queue_state = BQ:invoke(Mod, Fun, BQS) }.
-is_duplicate(Message = #basic_message { id = MsgId },
+is_duplicate(none, Message = #basic_message { id = MsgId },
State = #state { seen_status = SS,
backing_queue = BQ,
backing_queue_state = BQS,
@@ -330,15 +351,34 @@ is_duplicate(Message = #basic_message { id = MsgId },
{published, State #state { seen_status = dict:erase(MsgId, SS),
confirmed = [MsgId | Confirmed] }};
{ok, discarded} ->
- {discarded, State #state { seen_status = dict:erase(MsgId, SS) }}
+ %% Don't erase from SS here because discard/2 is about to
+ %% be called and we need to be able to detect this case
+ {discarded, State}
+ end;
+is_duplicate(Txn, _Msg, State = #state { abandoned_txns = AbandonedTxns }) ->
+ %% There will be nothing in seen_status for any transactions that
+ %% are still in flight.
+ case sets:is_element(Txn, AbandonedTxns) of
+ true -> {published, State};
+ false -> {false, State}
end.
-discard(Msg = #basic_message {}, ChPid,
+discard(Msg = #basic_message { id = MsgId }, ChPid,
State = #state { gm = GM,
backing_queue = BQ,
- backing_queue_state = BQS }) ->
- ok = gm:broadcast(GM, {discard, ChPid, Msg}),
- State#state{backing_queue_state = BQ:discard(Msg, ChPid, BQS)}.
+ backing_queue_state = BQS,
+ seen_status = SS }) ->
+ %% It's a massive error if we get told to discard something that's
+ %% already been published or published-and-confirmed. To do that
+ %% would require non FIFO access...
+ case dict:find(MsgId, SS) of
+ error ->
+ ok = gm:broadcast(GM, {discard, ChPid, Msg}),
+ State #state { backing_queue_state = BQ:discard(Msg, ChPid, BQS),
+ seen_status = dict:erase(MsgId, SS) };
+ discarded ->
+ State
+ end.
maybe_store_acktag(undefined, _MsgId, AM) ->
AM;
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 21a33341df..34ec510947 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -107,7 +107,7 @@ init([#amqqueue { name = QueueName } = Q]) ->
sender_queues = dict:new(),
msg_id_ack = dict:new(),
msg_id_status = dict:new(),
- open_transactions = sets:new()
+ open_transactions = dict:new()
}, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
@@ -157,8 +157,32 @@ handle_call({gm_deaths, Deaths}, From,
end;
handle_call({run_backing_queue, Mod, Fun}, _From, State) ->
- reply(ok, run_backing_queue(Mod, Fun, State)).
+ reply(ok, run_backing_queue(Mod, Fun, State));
+handle_call({commit, Txn, ChPid}, From,
+ State = #state { open_transactions = OT }) ->
+ case dict:find(Txn, OT) of
+ error ->
+ %% curious. We've not received _anything_ about this txn
+ %% so far via gm!
+ OT1 = dict:store(Txn, {undefined, {committed, From}}, OT),
+ noreply(State #state { open_transactions = OT1 });
+ {ok, {committed, undefined}} ->
+ %% We've already finished via GM (our BQ has actually
+ %% replied back to us in the case of commit), so just
+ %% reply and tidy up. Note that because no one can every
+ %% consume from a slave, there are never going to be any
+ %% acks to return.
+ reply(ok, State #state { open_transactions = dict:erase(Txn, OT) });
+ {ok, {open, undefined}} ->
+ %% Save who we're from, but we're still waiting for the
+ %% commit to arrive via GM
+ OT1 = dict:store(Txn, {open, {committed, From}}, OT),
+ noreply(State #state { open_transactions = OT1 });
+ {ok, {abandoned, undefined}} ->
+ %% GM must have told us to roll back.
+ reply(ok, State #state { open_transactions = dict:erase(Txn, OT) })
+ end.
handle_cast({run_backing_queue, Mod, Fun}, State) ->
noreply(run_backing_queue(Mod, Fun, State));
@@ -192,7 +216,25 @@ handle_cast(update_ram_duration,
handle_cast(sync_timeout, State) ->
noreply(backing_queue_idle_timeout(
- State #state { sync_timer_ref = undefined })).
+ State #state { sync_timer_ref = undefined }));
+
+handle_cast({rollback, Txn, ChPid},
+ State #state { open_transactions = OT }) ->
+ %% Will never see {'committed', _} or {_, 'abandoned'} or
+ %% {_, {'committed', From}} here
+ case dict:find(Txn, OT) of
+ error ->
+ %% odd. We've not received anything from GM about this.
+ OT1 = dict:store(Txn, {undefined, abandoned}, OT),
+ noreply(State #state { open_transactions = OT1 });
+ {ok, {open, undefined}} ->
+ %% The rollback is yet to arrive via GM.
+ OT1 = dict:store(Txn, {open, abandoned}, OT),
+ noreply(State #state { open_transactions = OT1 });
+ {ok, {abandoned, undefined}} ->
+ %% GM has already rolled back. Tidy up.
+ noreply(State #state { open_transactions = dict:erase(Txn, OT) })
+ end.
handle_info(timeout, State) ->
noreply(backing_queue_idle_timeout(State));
@@ -370,9 +412,12 @@ promote_me(From, #state { q = Q,
ok = gm:confirmed_broadcast(GM, heartbeat),
%% Start by rolling back all open transactions
-
- [ok = gm:confirmed_broadcast(GM, {tx_rollback, Txn})
- || Txn <- sets:to_list(OT)],
+ BQS1 = lists:foldl(
+ fun (Txn, BQSN) ->
+ ok = gm:confirmed_broadcast(GM, {tx_rollback, Txn}),
+ {_AckTags, BQSN1} = BQ:tx_rollback(Txn, BQSN),
+ BQSN1
+ end, BQS, dict:fetch_keys(OT)),
%% We find all the messages that we've received from channels but
%% not from gm, and if they're due to be enqueued on promotion
@@ -445,8 +490,7 @@ promote_me(From, #state { q = Q,
Status =:= published orelse Status =:= confirmed]),
MasterState = rabbit_mirror_queue_master:promote_backing_queue_state(
- CPid, BQ, BQS, GM, SS),
-
+ CPid, BQ, BQS1, GM, SS, OT),
MTC = dict:from_list(
[{MsgId, {ChPid, MsgSeqNo}} ||
@@ -516,7 +560,8 @@ stop_rate_timer(State = #state { rate_timer_ref = TRef }) ->
maybe_enqueue_message(
Delivery = #delivery { message = #basic_message { id = MsgId },
msg_seq_no = MsgSeqNo,
- sender = ChPid },
+ sender = ChPid,
+ txn = none },
EnqueueOnPromotion,
State = #state { sender_queues = SQ,
msg_id_status = MS }) ->
@@ -553,7 +598,11 @@ maybe_enqueue_message(
%% We've already heard from GM that the msg is to be
%% discarded. We won't see this again.
State #state { msg_id_status = dict:erase(MsgId, MS) }
- end.
+ end;
+maybe_enqueue_message(_Delivery, State) ->
+ %% In a txn. Txns are completely driven by gm for simplicity, so
+ %% we're not going to do anything here.
+ State.
process_instruction(
{publish, Deliver, ChPid, MsgProps, Msg = #basic_message { id = MsgId }},
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 84987c8849..7a3c17a29c 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -22,7 +22,7 @@
requeue/3, len/1, is_empty/1, dropwhile/2,
set_ram_duration_target/2, ram_duration/1,
needs_idle_timeout/1, idle_timeout/1, handle_pre_hibernate/1,
- status/1, invoke/3, is_duplicate/2, discard/3,
+ status/1, invoke/3, is_duplicate/3, discard/3,
multiple_routing_keys/0]).
-export([start/1, stop/0]).
@@ -887,7 +887,7 @@ status(#vqstate {
invoke(?MODULE, Fun, State) ->
Fun(?MODULE, State).
-is_duplicate(_Msg, State) -> {false, State}.
+is_duplicate(_Txn, _Msg, State) -> {false, State}.
discard(_Msg, _ChPid, State) -> State.