summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_mirror_queue_master.erl96
-rw-r--r--src/rabbit_mirror_queue_slave.erl183
2 files changed, 44 insertions, 235 deletions
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index a59d64d4cc..387dfbc481 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -26,7 +26,7 @@
-export([start/1, stop/0]).
--export([promote_backing_queue_state/6]).
+-export([promote_backing_queue_state/5]).
-behaviour(rabbit_backing_queue).
@@ -39,8 +39,7 @@
set_delivered,
seen_status,
confirmed,
- ack_msg_id,
- abandoned_txns
+ ack_msg_id
}).
%% ---------------------------------------------------------------------------
@@ -78,7 +77,7 @@ init(#amqqueue { arguments = Args, name = QName } = Q, Recover,
confirmed = [],
ack_msg_id = dict:new() }.
-promote_backing_queue_state(CPid, BQ, BQS, GM, SeenStatus, AbandonedTxns) ->
+promote_backing_queue_state(CPid, BQ, BQS, GM, SeenStatus) ->
#state { gm = GM,
coordinator = CPid,
backing_queue = BQ,
@@ -86,8 +85,7 @@ promote_backing_queue_state(CPid, BQ, BQS, GM, SeenStatus, AbandonedTxns) ->
set_delivered = BQ:len(BQS),
seen_status = SeenStatus,
confirmed = [],
- ack_msg_id = dict:new(),
- abandoned_txns = AbandonedTxns }.
+ ack_msg_id = dict:new() }.
terminate(State = #state { backing_queue = BQ, backing_queue_state = BQS }) ->
%% Backing queue termination. The queue is going down but
@@ -214,68 +212,20 @@ ack(AckTags, State = #state { gm = GM,
{MsgIds, State #state { backing_queue_state = BQS1,
ack_msg_id = AM1 }}.
-tx_publish(Txn, Msg, MsgProps, ChPid,
- State = #state { gm = GM,
- backing_queue = BQ,
- backing_queue_state = BQS,
- abandoned_txns = AbandonedTxns }) ->
- case sets:is_element(Txn, AbandonedTxns) of
- true -> State;
- false -> ok = gm:broadcast(GM, {tx_publish, Txn, ChPid, MsgProps, Msg}),
- BQS1 = BQ:tx_publish(Txn, Msg, MsgProps, ChPid, State),
- State #state { backing_queue_state = BQS1 }
- end.
+tx_publish(_Txn, _Msg, _MsgProps, _ChPid, State) ->
+ %% We don't support txns in mirror queues
+ State.
-tx_ack(Txn, AckTags, State = #state { gm = GM,
- backing_queue = BQ,
- backing_queue_state = BQS,
- ack_msg_id = AM,
- abandoned_txns = AbandonedTxns }) ->
- case sets:is_element(Txn, AbandonedTxns) of
- true ->
- State;
- false ->
- MsgIds = lists:foldl(
- fun (AckTag, Acc) -> [dict:fetch(AckTag, AM) | Acc] end,
- [], AckTags),
- ok = gm:broadcast(GM, {tx_ack, Txn, MsgIds}),
- BQS1 = BQ:tx_ack(Txn, AckTags, BQS),
- State #state { backing_queue_state = BQS1 }
- end.
+tx_ack(_Txn, _AckTags, State) ->
+ %% We don't support txns in mirror queues
+ State.
-tx_rollback(Txn, State = #state { gm = GM,
- backing_queue = BQ,
- backing_queue_state = BQS,
- abandoned_txns = AbandonedTxns }) ->
- case sets:is_element(Txn, AbandonedTxns) of
- true -> {[], State};
- false -> {AckTags, BQS1} = BQ:tx_rollback(Txn, BQS),
- ok = gm:confirmed_broadcast(GM, {tx_rollback, Txn}),
- {AckTags, State #state { backing_queue_state = BQS1 }}
- end.
+tx_rollback(_Txn, State) ->
+ {[], State}.
-tx_commit(Txn, PostCommitFun, MsgPropsFun,
- State = #state { gm = GM,
- backing_queue = BQ,
- backing_queue_state = BQS,
- ack_msg_id = AM }) ->
- case sets:is_element(Txn, AbandonedTxns) of
- true ->
- %% Don't worry - the channel will explode as it'll still
- %% try to commit on the old master.
- {[], State};
- false ->
- {AckTags, BQS1} = BQ:tx_commit(Txn, PostCommitFun, MsgPropsFun, BQS),
- {MsgIds, AM1} = lists:foldl(
- fun (AckTag, {MsgIdsN, AMN}) ->
- MsgId = dict:fetch(AckTag, AMN),
- {[MsgId|MsgIdsN], dict:erase(AckTag, AMN)}
- end, {[], AM}, AckTags),
- ok = gm:confirmed_broadcast(
- GM, {tx_commit, Txn, MsgPropsFun, MsgIds}),
- {AckTags, State #state { backing_queue_state = BQS,
- ack_msg_id = AM }}
- end.
+tx_commit(_Txn, PostCommitFun, _MsgPropsFun, State) ->
+ PostCommitFun(), %% Probably must run it to avoid deadlocks
+ {[], State}.
requeue(AckTags, MsgPropsFun, State = #state { gm = GM,
backing_queue = BQ,
@@ -361,13 +311,10 @@ is_duplicate(none, Message = #basic_message { id = MsgId },
%% be called and we need to be able to detect this case
{discarded, State}
end;
-is_duplicate(Txn, _Msg, State = #state { abandoned_txns = AbandonedTxns }) ->
- %% There will be nothing in seen_status for any transactions that
- %% are still in flight.
- case sets:is_element(Txn, AbandonedTxns) of
- true -> {published, State};
- false -> {false, State}
- end.
+is_duplicate(_Txn, _Msg, State) ->
+ %% In a transaction. We don't support txns in mirror queues. But
+ %% it's probably not a duplicate...
+ {false, State}.
discard(Msg = #basic_message { id = MsgId }, ChPid,
State = #state { gm = GM,
@@ -376,13 +323,14 @@ discard(Msg = #basic_message { id = MsgId }, ChPid,
seen_status = SS }) ->
%% It's a massive error if we get told to discard something that's
%% already been published or published-and-confirmed. To do that
- %% would require non FIFO access...
+ %% would require non FIFO access. Hence we should not find
+ %% 'published' or 'confirmed' in this dict:find.
case dict:find(MsgId, SS) of
error ->
ok = gm:broadcast(GM, {discard, ChPid, Msg}),
State #state { backing_queue_state = BQ:discard(Msg, ChPid, BQS),
seen_status = dict:erase(MsgId, SS) };
- discarded ->
+ {ok, discarded} ->
State
end.
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index a61abbd727..8ca82fa1fe 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -55,8 +55,7 @@
sender_queues, %% :: Pid -> MsgQ
msg_id_ack, %% :: MsgId -> AckTag
- msg_id_status,
- open_transactions
+ msg_id_status
}).
-define(SYNC_INTERVAL, 25). %% milliseconds
@@ -106,8 +105,7 @@ init([#amqqueue { name = QueueName } = Q]) ->
sender_queues = dict:new(),
msg_id_ack = dict:new(),
- msg_id_status = dict:new(),
- open_transactions = dict:new()
+ msg_id_status = dict:new()
}, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
@@ -159,28 +157,9 @@ handle_call({gm_deaths, Deaths}, From,
handle_call({run_backing_queue, Mod, Fun}, _From, State) ->
reply(ok, run_backing_queue(Mod, Fun, State));
-handle_call({commit, Txn, ChPid}, From,
- State = #state { open_transactions = OT }) ->
- case dict:find(Txn, OT) of
- error ->
- %% We've not received anything about this txn so far via
- %% gm!
- OT1 = dict:store(Txn, {undefined, {committed, From}}, OT),
- noreply(State #state { open_transactions = OT1 });
- {ok, {open, undefined}} ->
- %% Save who we're from, but we're still waiting for the
- %% commit to arrive via GM
- OT1 = dict:store(Txn, {open, {committed, From}}, OT),
- noreply(State #state { open_transactions = OT1 });
- {ok, {committed, undefined}} ->
- %% We've already finished via GM (our BQ has actually
- %% replied back to us in the case of commit), so just
- %% reply and tidy up.
- reply(ok, State #state { open_transactions = dict:erase(Txn, OT) });
- {ok, {abandoned, undefined}} ->
- %% GM must have told us to roll back.
- reply(ok, State #state { open_transactions = dict:erase(Txn, OT) })
- end.
+handle_call({commit, _Txn, _ChPid}, _From, State) ->
+ %% We don't support transactions in mirror queues
+ reply(ok, State).
handle_cast({run_backing_queue, Mod, Fun}, State) ->
noreply(run_backing_queue(Mod, Fun, State));
@@ -216,23 +195,9 @@ handle_cast(sync_timeout, State) ->
noreply(backing_queue_idle_timeout(
State #state { sync_timer_ref = undefined }));
-handle_cast({rollback, Txn, ChPid},
- State #state { open_transactions = OT }) ->
- %% Will never see {'committed', _} or {_, 'abandoned'} or
- %% {_, {'committed', From}} here
- case dict:find(Txn, OT) of
- error ->
- %% We've not received anything from GM about this.
- OT1 = dict:store(Txn, {undefined, abandoned}, OT),
- noreply(State #state { open_transactions = OT1 });
- {ok, {open, undefined}} ->
- %% The rollback is yet to arrive via GM.
- OT1 = dict:store(Txn, {open, abandoned}, OT),
- noreply(State #state { open_transactions = OT1 });
- {ok, {abandoned, undefined}} ->
- %% GM has already rolled back. Tidy up.
- noreply(State #state { open_transactions = dict:erase(Txn, OT) })
- end.
+handle_cast({rollback, _Txn, _ChPid}, State) ->
+ %% We don't support transactions in mirror queues
+ noreply(State).
handle_info(timeout, State) ->
noreply(backing_queue_idle_timeout(State));
@@ -405,8 +370,7 @@ promote_me(From, #state { q = Q,
rate_timer_ref = RateTRef,
sender_queues = SQ,
msg_id_ack = MA,
- msg_id_status = MS,
- open_transactions = OT }) ->
+ msg_id_status = MS }) ->
rabbit_log:info("Promoting slave ~p for ~s~n",
[self(), rabbit_misc:rs(Q #amqqueue.name)]),
{ok, CPid} = rabbit_mirror_queue_coordinator:start_link(Q, GM),
@@ -414,16 +378,6 @@ promote_me(From, #state { q = Q,
gen_server2:reply(From, {promote, CPid}),
ok = gm:confirmed_broadcast(GM, heartbeat),
- %% Start by rolling back all open transactions
- AbandonedTxns = [Txn || {Txn, {open, _TxnStatusByChannel}}
- <- dict:to_list(OT)],
- BQS1 = lists:foldl(
- fun (Txn, BQSN) ->
- ok = gm:confirmed_broadcast(GM, {tx_rollback, Txn}),
- {_AckTags, BQSN1} = BQ:tx_rollback(Txn, BQSN),
- BQSN1
- end, BQS, AbandonedTxns),
-
%% We find all the messages that we've received from channels but
%% not from gm, and if they're due to be enqueued on promotion
%% then we pass them to the
@@ -495,7 +449,7 @@ promote_me(From, #state { q = Q,
Status =:= published orelse Status =:= confirmed]),
MasterState = rabbit_mirror_queue_master:promote_backing_queue_state(
- CPid, BQ, BQS1, GM, SS, sets:from_list(AbandonedTxns)),
+ CPid, BQ, BQS, GM, SS),
MTC = dict:from_list(
[{MsgId, {ChPid, MsgSeqNo}} ||
@@ -604,9 +558,8 @@ maybe_enqueue_message(
%% discarded. We won't see this again.
State #state { msg_id_status = dict:erase(MsgId, MS) }
end;
-maybe_enqueue_message(_Delivery, State) ->
- %% In a txn. Txns are completely driven by gm for simplicity, so
- %% we're not going to do anything here.
+maybe_enqueue_message(_Delivery, _EnqueueOnPromotion, State) ->
+ %% We don't support txns in mirror queues.
State.
process_instruction(
@@ -755,7 +708,7 @@ process_instruction({ack, MsgIds},
State = #state { backing_queue = BQ,
backing_queue_state = BQS,
msg_id_ack = MA }) ->
- {AckTags, MA1} = msg_ids_to_acktags(MsgIds, MA, remove),
+ {AckTags, MA1} = msg_ids_to_acktags(MsgIds, MA),
{MsgIds1, BQS1} = BQ:ack(AckTags, BQS),
[] = MsgIds1 -- MsgIds, %% ASSERTION
{ok, State #state { msg_id_ack = MA1,
@@ -764,7 +717,7 @@ process_instruction({requeue, MsgPropsFun, MsgIds},
State = #state { backing_queue = BQ,
backing_queue_state = BQS,
msg_id_ack = MA }) ->
- {AckTags, MA1} = msg_ids_to_acktags(MsgIds, MA, remove),
+ {AckTags, MA1} = msg_ids_to_acktags(MsgIds, MA),
{ok, case length(AckTags) =:= length(MsgIds) of
true ->
{MsgIds, BQS1} = BQ:requeue(AckTags, MsgPropsFun, BQS),
@@ -779,113 +732,21 @@ process_instruction({requeue, MsgPropsFun, MsgIds},
State #state { msg_id_ack = dict:new(),
backing_queue_state = BQS2 }
end};
-process_instruction({tx_publish, Txn, ChPid, MsgProps, Msg},
- State = #state { backing_queue = BQ,
- backing_queue_state = BQS,
- open_transactions = OT }) ->
- %% Will never see abandoned or committed in the LHS
- OT1 = case dict:find(Txn, OT) of
- error ->
- dict:store(Txn, {open, undefined}, OT);
- {ok, {open, _TxnStatusByChannel}} ->
- OT
- end,
- BQS1 = BQ:tx_publish(Txn, Msg, MsgProps, ChPid, BQS),
- {ok, State #state { backing_queue_state = BQS1,
- open_transactions = OT1 }};
-process_instruction({tx_ack, Txn, MsgIds},
- State = #state { backing_queue = BQ,
- backing_queue_state = BQS,
- open_transactions = OT,
- msg_id_ack = MA }) ->
- %% Will never see abandoned or committed in the LHS
- OT1 = case dict:find(Txn, OT) of
- error ->
- dict:store(Txn, {open, undefined}, OT);
- {ok, {open, _TxnStatusByChannel}} ->
- OT
- end,
- %% Remember, rollback of a txn with acks simply undoes the ack -
- %% the msg itself is not requeued or anything. Thus we make sure
- %% msg_ids_to_acktags does not remove the entry from MQ, and we
- %% will do the remove when we commit.
- {AckTags, MA1} = msg_ids_to_acktags(MsgIds, MA, keep),
- BQS1 = BQ:tx_ack(Txn, AckTags, BQS),
- {ok, State #store { backing_queue_state = BQS1,
- open_transactions = OT1,
- msg_id_ack = MA1 }};
-process_instruction({tx_commit, Txn, MsgPropsFun, MsgIds},
- State = #state { backing_queue = BQ,
- backing_queue_state = BQS,
- open_transactions = OT,
- msg_id_ack = MA }) ->
- %% We must remove the ack tags from MQ at this point
- {AckTags, MA1} = msg_ids_to_acktags(MsgIds, MA, remove),
- %% We won't adjust open_transactions until we get the post_commit
- %% callback, unless we've already seen the commit from the channel
- case dict:find(Txn, OT) of
- {open, {committed, From}} ->
- {AckTags1, BQS1} =
- BQ:tx_commit(Txn, fun () -> gen_server2:reply(From, ok) end,
- MsgPropsFun, BQS),
- OT1 = dict:erase(Txn, OT),
- true = lists:usort(AckTags) =:= lists:usort(AckTags1), %% ASSERTION
- {ok, State #state { backing_queue_state = BQS,
- open_transactions = OT1,
- msg_id_ack = MA1 }};
- Status ->
- %% We have to cope with the possibility that we'll get
- %% promoted before the txn finishes, and rely on slight
- %% magic if we do complete here.
- Me = self(),
- F = fun () -> rabbit_amqqueue:run_backing_queue_async(
- Me, rabbit_mirror_queue_master,
- fun (rabbit_mirror_queue_master,
- State1 = #state { open_transactions = OT2 }) ->
- OT3 = case dict:find(Txn, OT2) of
- {committing, undefined} ->
- dict:store(
- Txn, {committed, undefined},
- OT2);
- {committing, {committed, From}} ->
- gen_server2:reply(From, ok),
- dict:erase(Txn, OT2)
- end,
- State1 #state { open_transactions = OT3 }
- end)
- end,
- {AckTags1, BQS1} = BQ:tx_commit(Txn, F, MsgPropsFun, BQS),
- true = lists:usort(AckTags) =:= lists:usort(AckTags1), %% ASSERTION
- OT1 = case Status of
- error ->
- dict:store(Txn, {committing, undefined}, OT);
- {open, TxnStatusByChannel} ->
- dict:store(Txn, {committing, TxnStatusByChannel}, OT)
- end,
- {ok, State #state { backing_queue_state = BQS,
- open_transactions = OT1,
- msg_id_ack = MA1 }}}
- end;
-
process_instruction(delete_and_terminate,
State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
BQ:delete_and_terminate(BQS),
{stop, State #state { backing_queue_state = undefined }}.
-msg_ids_to_acktags(MsgIds, MA, RemoveOrKeep) ->
+msg_ids_to_acktags(MsgIds, MA) ->
{AckTags, MA1} =
- lists:foldl(fun (MsgId, {AckTagsN, MAN}) ->
- case dict:find(MsgId, MA) of
- error ->
- {AckTagsN, MAN};
- {ok, AckTag} when RemoveOrKeep =:= remove ->
- {[AckTag | AckTagsN],
- dict:erase(MsgId, MAN)};
- {ok, AckTag} when RemoveOrKeep =:= keep ->
- {[AckTag | AckTagsN], MAN}
- end
- end, {[], MA}, MsgIds),
+ lists:foldl(
+ fun (MsgId, {Acc, MAN}) ->
+ case dict:find(MsgId, MA) of
+ error -> {Acc, MAN};
+ {ok, AckTag} -> {[AckTag | Acc], dict:erase(MsgId, MAN)}
+ end
+ end, {[], MA}, MsgIds),
{lists:reverse(AckTags), MA1}.
ack_all(BQ, MA, BQS) ->