summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_mirror_queue_master.erl62
-rw-r--r--src/rabbit_mirror_queue_slave.erl246
2 files changed, 159 insertions, 149 deletions
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 0d64ab8e77..94e93b3e80 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -26,7 +26,7 @@
-export([start/1, stop/0]).
--export([promote_backing_queue_state/4]).
+-export([promote_backing_queue_state/5]).
-behaviour(rabbit_backing_queue).
@@ -36,7 +36,8 @@
coordinator,
backing_queue,
backing_queue_state,
- set_delivered
+ set_delivered,
+ seen
}).
%% ---------------------------------------------------------------------------
@@ -64,14 +65,16 @@ init(#amqqueue { arguments = Args } = Q, Recover) ->
coordinator = CPid,
backing_queue = BQ,
backing_queue_state = BQS,
- set_delivered = 0 }.
+ set_delivered = 0,
+ seen = sets:new() }.
-promote_backing_queue_state(CPid, BQ, BQS, GM) ->
+promote_backing_queue_state(CPid, BQ, BQS, GM, Seen) ->
#state { gm = GM,
coordinator = CPid,
backing_queue = BQ,
backing_queue_state = BQS,
- set_delivered = BQ:len(BQS) }.
+ set_delivered = BQ:len(BQS),
+ seen = Seen }.
terminate(State = #state { backing_queue = BQ, backing_queue_state = BQS }) ->
%% Backing queue termination. The queue is going down but
@@ -94,22 +97,31 @@ purge(State = #state { gm = GM,
{Count, State #state { backing_queue_state = BQS1,
set_delivered = 0 }}.
-publish(Msg = #basic_message { guid = Guid },
- MsgProps, ChPid, State = #state { gm = GM,
+publish(Msg = #basic_message { guid = Guid }, MsgProps, ChPid,
+ State = #state { gm = GM,
+ backing_queue = BQ,
+ backing_queue_state = BQS,
+ seen = Seen }) ->
+ case sets:is_element(Guid, Seen) of
+ true -> State #state { seen = sets:del_element(Guid, Seen) };
+ false -> ok = gm:broadcast(GM, {publish, false, ChPid, MsgProps, Msg}),
+ BQS1 = BQ:publish(Msg, MsgProps, ChPid, BQS),
+ State #state { backing_queue_state = BQS1 }
+ end.
+
+publish_delivered(AckRequired, Msg = #basic_message { guid = Guid }, MsgProps,
+ ChPid, State = #state { gm = GM,
backing_queue = BQ,
- backing_queue_state = BQS }) ->
- ok = gm:broadcast(GM, {publish, false, Guid, MsgProps, ChPid}),
- BQS1 = BQ:publish(Msg, MsgProps, ChPid, BQS),
- State #state { backing_queue_state = BQS1 }.
-
-publish_delivered(AckRequired, Msg = #basic_message { guid = Guid },
- MsgProps, ChPid,
- State = #state { gm = GM,
- backing_queue = BQ,
- backing_queue_state = BQS }) ->
- ok = gm:broadcast(GM, {publish, {true, AckRequired}, Guid, MsgProps, ChPid}),
- {AckTag, BQS1} = BQ:publish_delivered(AckRequired, Msg, MsgProps, ChPid, BQS),
- {AckTag, State #state { backing_queue_state = BQS1 }}.
+ backing_queue_state = BQS,
+ seen = Seen }) ->
+ case sets:is_element(Guid, Seen) of
+ true -> State #state { seen = sets:del_element(Guid, Seen) };
+ false -> ok = gm:broadcast(GM, {publish, {true, AckRequired}, ChPid,
+ MsgProps, Msg}),
+ {AckTag, BQS1} = BQ:publish_delivered(AckRequired, Msg,
+ MsgProps, ChPid, BQS),
+ {AckTag, State #state { backing_queue_state = BQS1 }}
+ end.
dropwhile(Fun, State = #state { gm = GM,
backing_queue = BQ,
@@ -126,7 +138,8 @@ dropwhile(Fun, State = #state { gm = GM,
fetch(AckRequired, State = #state { gm = GM,
backing_queue = BQ,
backing_queue_state = BQS,
- set_delivered = SetDelivered }) ->
+ set_delivered = SetDelivered,
+ seen = Seen }) ->
{Result, BQS1} = BQ:fetch(AckRequired, BQS),
State1 = State #state { backing_queue_state = BQS1 },
case Result of
@@ -137,8 +150,13 @@ fetch(AckRequired, State = #state { gm = GM,
ok = gm:broadcast(GM, {fetch, AckRequired, Guid, Remaining}),
IsDelivered1 = IsDelivered orelse SetDelivered > 0,
SetDelivered1 = lists:max([0, SetDelivered - 1]),
+ Seen1 = case SetDelivered + SetDelivered1 of
+ 1 -> sets:new(); %% transition to empty
+ _ -> Seen
+ end,
{{Message, IsDelivered1, AckTag, Remaining},
- State1 #state { set_delivered = SetDelivered1 }}
+ State1 #state { set_delivered = SetDelivered1,
+ seen = Seen1 }}
end.
ack(AckTags, State = #state { gm = GM,
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index f124bc9eb0..deb1cc66dd 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -86,7 +86,7 @@
sender_queues, %% :: Pid -> MsgQ
guid_ack, %% :: Guid -> AckTag
- instructions, %% :: InstrQ
+ seen, %% Set Guid
guid_to_channel %% for confirms
}).
@@ -140,7 +140,7 @@ init([#amqqueue { name = QueueName } = Q]) ->
sender_queues = dict:new(),
guid_ack = dict:new(),
- instructions = queue:new(),
+ seen = sets:new(),
guid_to_channel = dict:new()
}, hibernate,
@@ -153,12 +153,12 @@ init([#amqqueue { name = QueueName } = Q]) ->
handle_call({deliver_immediately, Delivery = #delivery {}}, From, State) ->
%% Synchronous, "immediate" delivery mode
gen_server2:reply(From, false), %% master may deliver it, not us
- handle_process_result(enqueue_message(Delivery, State));
+ noreply(maybe_enqueue_message(Delivery, State));
handle_call({deliver, Delivery = #delivery {}}, From, State) ->
%% Synchronous, "mandatory" delivery mode
gen_server2:reply(From, true), %% amqqueue throws away the result anyway
- handle_process_result(enqueue_message(Delivery, State));
+ noreply(maybe_enqueue_message(Delivery, State));
handle_call({gm_deaths, Deaths}, From,
State = #state { q = #amqqueue { name = QueueName },
@@ -187,16 +187,12 @@ handle_call({maybe_run_queue_via_backing_queue, Fun}, _From, State) ->
handle_cast({maybe_run_queue_via_backing_queue, Fun}, State) ->
noreply(maybe_run_queue_via_backing_queue(Fun, State));
-handle_cast({gm, Instruction}, State = #state { instructions = InstrQ }) ->
- State1 = State #state { instructions = queue:in(Instruction, InstrQ) },
- case queue:is_empty(InstrQ) of
- true -> handle_process_result(process_instructions(State1));
- false -> noreply(State1)
- end;
+handle_cast({gm, Instruction}, State) ->
+ handle_process_result(process_instruction(Instruction, State));
handle_cast({deliver, Delivery = #delivery {}}, State) ->
%% Asynchronous, non-"mandatory", non-"immediate" deliver mode.
- handle_process_result(enqueue_message(Delivery, State));
+ noreply(maybe_enqueue_message(Delivery, State));
handle_cast({set_maximum_since_use, Age}, State) ->
ok = file_handle_cache:set_maximum_since_use(Age),
@@ -317,8 +313,8 @@ maybe_confirm_message(Guid, GTC) ->
GTC
end.
-handle_process_result({continue, State}) -> noreply(State);
-handle_process_result({stop, State}) -> {stop, normal, State}.
+handle_process_result({ok, State}) -> noreply(State);
+handle_process_result({stop, State}) -> {stop, normal, State}.
promote_me(From, #state { q = Q,
gm = GM,
@@ -326,6 +322,7 @@ promote_me(From, #state { q = Q,
backing_queue_state = BQS,
rate_timer_ref = RateTRef,
sender_queues = SQ,
+ seen = Seen,
guid_ack = GA }) ->
rabbit_log:info("Promoting slave ~p for queue ~p~n",
[self(), Q #amqqueue.name]),
@@ -334,7 +331,7 @@ promote_me(From, #state { q = Q,
gen_server2:reply(From, {promote, CPid}),
ok = gm:confirmed_broadcast(GM, heartbeat),
MasterState = rabbit_mirror_queue_master:promote_backing_queue_state(
- CPid, BQ, BQS, GM),
+ CPid, BQ, BQS, GM, Seen),
%% We have to do the requeue via this init because otherwise we
%% don't have access to the relevent MsgPropsFun. Also, we are
%% already in mnesia as the master queue pid. Thus we cannot just
@@ -378,115 +375,111 @@ stop_rate_timer(State = #state { rate_timer_ref = TRef }) ->
{ok, cancel} = timer:cancel(TRef),
State #state { rate_timer_ref = undefined }.
-enqueue_message(Delivery = #delivery { sender = ChPid },
- State = #state { sender_queues = SQ }) ->
- Q = case dict:find(ChPid, SQ) of
- {ok, Q1} -> Q1;
- error -> queue:new()
- end,
- SQ1 = dict:store(ChPid, queue:in(Delivery, Q), SQ),
- State1 = State #state { sender_queues = SQ1 },
- case queue:is_empty(Q) of
- true -> process_instructions(State1);
- false -> {continue, State1}
- end.
-
-process_instructions(State = #state { instructions = InstrQ }) ->
- case queue:out(InstrQ) of
- {empty, _InstrQ} ->
- {continue, State};
- {{value, Instr}, InstrQ1} ->
- case process_instruction(Instr, State) of
- {processed, State1} ->
- process_instructions(
- State1 #state { instructions = InstrQ1 });
- {stop, State1} ->
- {stop, State1 #state { instructions = InstrQ1 }};
- blocked ->
- {continue, State}
- end
+maybe_enqueue_message(
+ Delivery = #delivery { message = #basic_message { guid = Guid },
+ sender = ChPid },
+ State = #state { q = Q,
+ sender_queues = SQ,
+ seen = Seen,
+ guid_to_channel = GTC }) ->
+ case sets:is_element(Guid, Seen) of
+ true ->
+ GTC1 = record_confirm_or_confirm(Delivery, Q, GTC),
+ State #state { guid_to_channel = GTC1,
+ seen = sets:del_element(Guid, Seen) };
+ false ->
+ MQ = case dict:find(ChPid, SQ) of
+ {ok, MQ1} -> MQ1;
+ error -> queue:new()
+ end,
+ SQ1 = dict:store(ChPid, queue:in(Delivery, MQ), SQ),
+ State #state { sender_queues = SQ1 }
end.
-process_instruction({publish, Deliver, Guid, MsgProps, ChPid},
- State = #state { q = Q,
- sender_queues = SQ,
- backing_queue = BQ,
- backing_queue_state = BQS,
- guid_ack = GA,
- guid_to_channel = GTC }) ->
- case dict:find(ChPid, SQ) of
- error ->
- blocked;
- {ok, MQ} ->
- case queue:out(MQ) of
- {empty, _MQ} ->
- blocked;
- {{value, Delivery = #delivery {
- message = Msg = #basic_message { guid = Guid } }},
- MQ1} ->
- State1 = State #state { sender_queues =
- dict:store(ChPid, MQ1, SQ) },
- GTC1 = record_confirm_or_confirm(Delivery, Q, GTC),
- {processed,
- case Deliver of
- false ->
- BQS1 = BQ:publish(Msg, MsgProps, ChPid, BQS),
- State1 #state { backing_queue_state = BQS1,
- guid_to_channel = GTC1 };
- {true, AckRequired} ->
- {AckTag, BQS1} = BQ:publish_delivered(
- AckRequired, Msg, MsgProps,
- ChPid, BQS),
- {GA1, GTC2} =
- case AckRequired of
- true ->
- {dict:store(Guid, AckTag, GA), GTC1};
- false ->
- {GA, maybe_confirm_message(Guid, GTC1)}
- end,
- State1 #state { backing_queue_state = BQS1,
- guid_ack = GA1,
- guid_to_channel = GTC2 }
- end};
- {{value, #delivery {}}, _MQ1} ->
- %% throw away the instruction: we'll never receive
- %% the message to which it corresponds.
- {processed, State}
- end
- end;
+process_instruction(
+ {publish, Deliver, ChPid, MsgProps, Msg = #basic_message { guid = Guid }},
+ State = #state { q = Q,
+ sender_queues = SQ,
+ backing_queue = BQ,
+ backing_queue_state = BQS,
+ guid_ack = GA,
+ seen = Seen,
+ guid_to_channel = GTC }) ->
+ {SQ1, Seen1, GTC1} =
+ case dict:find(ChPid, SQ) of
+ error ->
+ {SQ, sets:add_element(Guid, Seen), GTC};
+ {ok, MQ} ->
+ case queue:out(MQ) of
+ {empty, _MQ} ->
+ {SQ, sets:add_element(Guid, Seen), GTC};
+ {{value, Delivery = #delivery {
+ message = #basic_message { guid = Guid } }},
+ MQ1} ->
+ GTC2 = record_confirm_or_confirm(Delivery, Q, GTC),
+ {dict:store(ChPid, MQ1, SQ), Seen, GTC2};
+ {{value, #delivery {}}, _MQ1} ->
+ %% The instruction was sent to us before we
+ %% were within the mirror_pids within the
+ %% amqqueue record. We'll never receive the
+ %% message directly.
+ {SQ, Seen, GTC}
+ end
+ end,
+ State1 = State #state { sender_queues = SQ1,
+ seen = Seen1,
+ guid_to_channel = GTC1 },
+ {ok,
+ case Deliver of
+ false ->
+ BQS1 = BQ:publish(Msg, MsgProps, ChPid, BQS),
+ State1 #state { backing_queue_state = BQS1 };
+ {true, AckRequired} ->
+ {AckTag, BQS1} = BQ:publish_delivered(AckRequired, Msg, MsgProps,
+ ChPid, BQS),
+ {GA1, GTC3} = case AckRequired of
+ true -> {dict:store(Guid, AckTag, GA), GTC1};
+ false -> {GA, maybe_confirm_message(Guid, GTC1)}
+ end,
+ State1 #state { backing_queue_state = BQS1,
+ guid_ack = GA1,
+ guid_to_channel = GTC3 }
+ end};
process_instruction({set_length, Length},
State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
QLen = BQ:len(BQS),
ToDrop = QLen - Length,
- {processed,
- case ToDrop > 0 of
- true -> BQS1 = lists:foldl(
- fun (const, BQSN) -> BQ:fetch(false, BQSN) end,
- BQS, lists:duplicate(ToDrop, const)),
- State #state { backing_queue_state = BQS1 };
- false -> State
- end};
+ {ok, case ToDrop > 0 of
+ true -> BQS1 =
+ lists:foldl(
+ fun (const, BQSN) ->
+ {{_Msg, _IsDelivered, _AckTag, _Remaining},
+ BQSN1} = BQ:fetch(false, BQSN),
+ BQSN1
+ end, BQS, lists:duplicate(ToDrop, const)),
+ State #state { backing_queue_state = BQS1 };
+ false -> State
+ end};
process_instruction({fetch, AckRequired, Guid, Remaining},
State = #state { backing_queue = BQ,
backing_queue_state = BQS,
guid_ack = GA }) ->
QLen = BQ:len(BQS),
- {processed,
- case QLen - 1 of
- Remaining ->
- {{_Msg, _IsDelivered, AckTag, Remaining}, BQS1} =
- BQ:fetch(AckRequired, BQS),
- GA1 = case AckRequired of
- true -> dict:store(Guid, AckTag, GA);
- false -> GA
- end,
- State #state { backing_queue_state = BQS1,
- guid_ack = GA1 };
- Other when Other < Remaining ->
- %% we must be shorter than the master
- State
- end};
+ {ok, case QLen - 1 of
+ Remaining ->
+ {{_Msg, _IsDelivered, AckTag, Remaining}, BQS1} =
+ BQ:fetch(AckRequired, BQS),
+ GA1 = case AckRequired of
+ true -> dict:store(Guid, AckTag, GA);
+ false -> GA
+ end,
+ State #state { backing_queue_state = BQS1,
+ guid_ack = GA1 };
+ Other when Other < Remaining ->
+ %% we must be shorter than the master
+ State
+ end};
process_instruction({ack, Guids},
State = #state { backing_queue = BQ,
backing_queue_state = BQS,
@@ -494,27 +487,26 @@ process_instruction({ack, Guids},
{AckTags, GA1} = guids_to_acktags(Guids, GA),
{Guids1, BQS1} = BQ:ack(AckTags, BQS),
[] = Guids1 -- Guids, %% ASSERTION
- {processed, State #state { guid_ack = GA1,
- backing_queue_state = BQS1 }};
+ {ok, State #state { guid_ack = GA1,
+ backing_queue_state = BQS1 }};
process_instruction({requeue, MsgPropsFun, Guids},
State = #state { backing_queue = BQ,
backing_queue_state = BQS,
guid_ack = GA }) ->
{AckTags, GA1} = guids_to_acktags(Guids, GA),
- {processed,
- case length(AckTags) =:= length(Guids) of
- true ->
- {Guids, BQS1} = BQ:requeue(AckTags, MsgPropsFun, BQS),
- State #state { guid_ack = GA1,
- backing_queue_state = BQS1 };
- false ->
- %% the only thing we can safely do is nuke out our BQ and
- %% GA
- {_Count, BQS1} = BQ:purge(BQS),
- {Guids, BQS2} = ack_all(BQ, GA, BQS1),
- State #state { guid_ack = dict:new(),
- backing_queue_state = BQS2 }
- end};
+ {ok, case length(AckTags) =:= length(Guids) of
+ true ->
+ {Guids, BQS1} = BQ:requeue(AckTags, MsgPropsFun, BQS),
+ State #state { guid_ack = GA1,
+ backing_queue_state = BQS1 };
+ false ->
+ %% the only thing we can safely do is nuke out our BQ
+ %% and GA
+ {_Count, BQS1} = BQ:purge(BQS),
+ {Guids, BQS2} = ack_all(BQ, GA, BQS1),
+ State #state { guid_ack = dict:new(),
+ backing_queue_state = BQS2 }
+ end};
process_instruction(delete_and_terminate,
State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->