summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_mirror_queue_master.erl63
-rw-r--r--src/rabbit_mirror_queue_slave.erl125
2 files changed, 135 insertions, 53 deletions
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 0d64ab8e77..4628796f57 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -26,7 +26,7 @@
-export([start/1, stop/0]).
--export([promote_backing_queue_state/4]).
+-export([promote_backing_queue_state/5]).
-behaviour(rabbit_backing_queue).
@@ -36,7 +36,8 @@
coordinator,
backing_queue,
backing_queue_state,
- set_delivered
+ set_delivered,
+ fakes
}).
%% ---------------------------------------------------------------------------
@@ -64,14 +65,16 @@ init(#amqqueue { arguments = Args } = Q, Recover) ->
coordinator = CPid,
backing_queue = BQ,
backing_queue_state = BQS,
- set_delivered = 0 }.
+ set_delivered = 0,
+ fakes = sets:new() }.
-promote_backing_queue_state(CPid, BQ, BQS, GM) ->
+promote_backing_queue_state(CPid, BQ, BQS, GM, Fakes) ->
#state { gm = GM,
coordinator = CPid,
backing_queue = BQ,
backing_queue_state = BQS,
- set_delivered = BQ:len(BQS) }.
+ set_delivered = BQ:len(BQS),
+ fakes = Fakes }.
terminate(State = #state { backing_queue = BQ, backing_queue_state = BQS }) ->
%% Backing queue termination. The queue is going down but
@@ -126,30 +129,54 @@ dropwhile(Fun, State = #state { gm = GM,
fetch(AckRequired, State = #state { gm = GM,
backing_queue = BQ,
backing_queue_state = BQS,
- set_delivered = SetDelivered }) ->
+ set_delivered = SetDelivered,
+ fakes = Fakes }) ->
{Result, BQS1} = BQ:fetch(AckRequired, BQS),
- State1 = State #state { backing_queue_state = BQS1 },
case Result of
empty ->
- {Result, State1};
+ {Result, State #state { backing_queue_state = BQS1 }};
{#basic_message { guid = Guid } = Message, IsDelivered, AckTag,
Remaining} ->
- ok = gm:broadcast(GM, {fetch, AckRequired, Guid, Remaining}),
- IsDelivered1 = IsDelivered orelse SetDelivered > 0,
SetDelivered1 = lists:max([0, SetDelivered - 1]),
- {{Message, IsDelivered1, AckTag, Remaining},
- State1 #state { set_delivered = SetDelivered1 }}
+ case sets:is_element(Guid, Fakes) of
+ true ->
+ {BQS2, Fakes1} =
+ case AckRequired of
+ true -> {[Guid], BQS3} = BQ:ack([AckTag], BQS1),
+ {BQS3, Fakes};
+ false -> {BQS1, sets:del_element(Guid, Fakes)}
+ end,
+ ok = gm:broadcast(GM, {fetch, false, Guid, Remaining}),
+ fetch(AckRequired,
+ State #state { backing_queue_state = BQS2,
+ set_delivered = SetDelivered1,
+ fakes = Fakes1 });
+ false ->
+ ok = gm:broadcast(GM,
+ {fetch, AckRequired, Guid, Remaining}),
+ IsDelivered1 = IsDelivered orelse SetDelivered > 0,
+ Fakes1 = case SetDelivered + SetDelivered1 of
+ 1 -> sets:new(); %% transition to 0
+ _ -> Fakes
+ end,
+ {{Message, IsDelivered1, AckTag, Remaining},
+ State #state { backing_queue_state = BQS1,
+ set_delivered = SetDelivered1,
+ fakes = Fakes1 }}
+ end
end.
ack(AckTags, State = #state { gm = GM,
backing_queue = BQ,
- backing_queue_state = BQS }) ->
+ backing_queue_state = BQS,
+ fakes = Fakes }) ->
{Guids, BQS1} = BQ:ack(AckTags, BQS),
- case Guids of
- [] -> ok;
- _ -> ok = gm:broadcast(GM, {ack, Guids})
- end,
- {Guids, State #state { backing_queue_state = BQS1 }}.
+ Fakes1 = case Guids of
+ [] -> Fakes;
+ _ -> ok = gm:broadcast(GM, {ack, Guids}),
+ sets:difference(Fakes, sets:from_list(Guids))
+ end,
+ {Guids, State #state { backing_queue_state = BQS1, fakes = Fakes1 }}.
tx_publish(Txn, Msg, MsgProps, ChPid, #state {} = State) ->
%% gm:broadcast(GM, {tx_publish, Txn, Guid, MsgProps, ChPid})
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index f124bc9eb0..0134787cd0 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -75,6 +75,7 @@
-behaviour(gm).
-include("rabbit.hrl").
+-include("rabbit_framing.hrl").
-include("gm_specs.hrl").
-record(state, { q,
@@ -87,6 +88,7 @@
sender_queues, %% :: Pid -> MsgQ
guid_ack, %% :: Guid -> AckTag
instructions, %% :: InstrQ
+ fakes, %% :: Set Guid
guid_to_channel %% for confirms
}).
@@ -141,6 +143,7 @@ init([#amqqueue { name = QueueName } = Q]) ->
sender_queues = dict:new(),
guid_ack = dict:new(),
instructions = queue:new(),
+ fakes = sets:new(),
guid_to_channel = dict:new()
}, hibernate,
@@ -190,7 +193,7 @@ handle_cast({maybe_run_queue_via_backing_queue, Fun}, State) ->
handle_cast({gm, Instruction}, State = #state { instructions = InstrQ }) ->
State1 = State #state { instructions = queue:in(Instruction, InstrQ) },
case queue:is_empty(InstrQ) of
- true -> handle_process_result(process_instructions(State1));
+ true -> handle_process_result(process_instructions(false, State1));
false -> noreply(State1)
end;
@@ -320,21 +323,25 @@ maybe_confirm_message(Guid, GTC) ->
handle_process_result({continue, State}) -> noreply(State);
handle_process_result({stop, State}) -> {stop, normal, State}.
-promote_me(From, #state { q = Q,
- gm = GM,
- backing_queue = BQ,
- backing_queue_state = BQS,
- rate_timer_ref = RateTRef,
- sender_queues = SQ,
- guid_ack = GA }) ->
+promote_me(From, State = #state { q = Q }) ->
rabbit_log:info("Promoting slave ~p for queue ~p~n",
[self(), Q #amqqueue.name]),
+ #state { gm = GM,
+ backing_queue = BQ,
+ backing_queue_state = BQS,
+ rate_timer_ref = RateTRef,
+ sender_queues = SQ,
+ guid_ack = GA,
+ instructions = Instr,
+ fakes = Fakes } =
+ process_instructions(true, State),
+ true = queue:is_empty(Instr), %% ASSERTION
{ok, CPid} = rabbit_mirror_queue_coordinator:start_link(Q, GM),
true = unlink(GM),
gen_server2:reply(From, {promote, CPid}),
ok = gm:confirmed_broadcast(GM, heartbeat),
MasterState = rabbit_mirror_queue_master:promote_backing_queue_state(
- CPid, BQ, BQS, GM),
+ CPid, BQ, BQS, GM, Fakes),
%% We have to do the requeue via this init because otherwise we
%% don't have access to the relevent MsgPropsFun. Also, we are
%% already in mnesia as the master queue pid. Thus we cannot just
@@ -387,19 +394,19 @@ enqueue_message(Delivery = #delivery { sender = ChPid },
SQ1 = dict:store(ChPid, queue:in(Delivery, Q), SQ),
State1 = State #state { sender_queues = SQ1 },
case queue:is_empty(Q) of
- true -> process_instructions(State1);
+ true -> process_instructions(false, State1);
false -> {continue, State1}
end.
-process_instructions(State = #state { instructions = InstrQ }) ->
+process_instructions(Flush, State = #state { instructions = InstrQ }) ->
case queue:out(InstrQ) of
{empty, _InstrQ} ->
{continue, State};
{{value, Instr}, InstrQ1} ->
- case process_instruction(Instr, State) of
+ case process_instruction(Flush, Instr, State) of
{processed, State1} ->
process_instructions(
- State1 #state { instructions = InstrQ1 });
+ Flush, State1 #state { instructions = InstrQ1 });
{stop, State1} ->
{stop, State1 #state { instructions = InstrQ1 }};
blocked ->
@@ -407,20 +414,38 @@ process_instructions(State = #state { instructions = InstrQ }) ->
end
end.
-process_instruction({publish, Deliver, Guid, MsgProps, ChPid},
+process_instruction(Flush, {publish, Deliver, Guid, MsgProps, ChPid} = Instr,
State = #state { q = Q,
sender_queues = SQ,
backing_queue = BQ,
backing_queue_state = BQS,
guid_ack = GA,
- guid_to_channel = GTC }) ->
+ guid_to_channel = GTC,
+ fakes = Fakes }) ->
case dict:find(ChPid, SQ) of
error ->
- blocked;
+ case Flush of
+ true -> MQ = queue:from_list([fake_delivery(Q, Guid, ChPid)]),
+ State1 = State #state {
+ sender_queues = dict:store(ChPid, MQ, SQ),
+ fakes = sets:add_element(Guid, Fakes) },
+ process_instruction(Flush, Instr, State1);
+ false -> blocked
+ end;
{ok, MQ} ->
case queue:out(MQ) of
{empty, _MQ} ->
- blocked;
+ case Flush of
+ true ->
+ MQ1 = queue:in_r(fake_delivery(Q, Guid, ChPid), MQ),
+ SQ1 = dict:store(ChPid, MQ1, SQ),
+ State1 = State #state {
+ sender_queues = SQ1,
+ fakes = sets:add_element(Guid, Fakes) },
+ process_instruction(Flush, Instr, State1);
+ false ->
+ blocked
+ end;
{{value, Delivery = #delivery {
message = Msg = #basic_message { guid = Guid } }},
MQ1} ->
@@ -449,28 +474,41 @@ process_instruction({publish, Deliver, Guid, MsgProps, ChPid},
guid_to_channel = GTC2 }
end};
{{value, #delivery {}}, _MQ1} ->
- %% throw away the instruction: we'll never receive
- %% the message to which it corresponds.
- {processed, State}
+ MQ1 = queue:in_r(fake_delivery(Q, Guid, ChPid), MQ),
+ State1 = State #state {
+ sender_queues = dict:store(ChPid, MQ1, SQ),
+ fakes = sets:add_element(Guid, Fakes) },
+ process_instruction(Flush, Instr, State1)
end
end;
-process_instruction({set_length, Length},
+process_instruction(_Flush, {set_length, Length},
State = #state { backing_queue = BQ,
- backing_queue_state = BQS }) ->
+ backing_queue_state = BQS,
+ fakes = Fakes }) ->
QLen = BQ:len(BQS),
ToDrop = QLen - Length,
{processed,
case ToDrop > 0 of
- true -> BQS1 = lists:foldl(
- fun (const, BQSN) -> BQ:fetch(false, BQSN) end,
- BQS, lists:duplicate(ToDrop, const)),
- State #state { backing_queue_state = BQS1 };
- false -> State
+ true ->
+ {Guids, BQS1} =
+ lists:foldl(
+ fun (const, {GuidsN, BQSN}) ->
+ {{#basic_message { guid = Guid }, _IsDelivered,
+ _AckTag, _Remaining}, BQSN1} =
+ BQ:fetch(false, BQSN),
+ {[Guid | GuidsN], BQSN1}
+ end, BQS, lists:duplicate(ToDrop, const)),
+ Fakes1 = sets:difference(Fakes, sets:from_list(Guids)),
+ State #state { backing_queue_state = BQS1,
+ fakes = Fakes1 };
+ false ->
+ State
end};
-process_instruction({fetch, AckRequired, Guid, Remaining},
+process_instruction(_Flush, {fetch, AckRequired, Guid, Remaining},
State = #state { backing_queue = BQ,
backing_queue_state = BQS,
- guid_ack = GA }) ->
+ guid_ack = GA,
+ fakes = Fakes }) ->
QLen = BQ:len(BQS),
{processed,
case QLen - 1 of
@@ -481,22 +519,28 @@ process_instruction({fetch, AckRequired, Guid, Remaining},
true -> dict:store(Guid, AckTag, GA);
false -> GA
end,
+ Fakes1 = sets:del_element(Guid, Fakes),
State #state { backing_queue_state = BQS1,
- guid_ack = GA1 };
+ guid_ack = GA1,
+ fakes = Fakes1 };
Other when Other < Remaining ->
%% we must be shorter than the master
+ false = sets:is_element(Guid, Fakes), %% ASSERTION
State
end};
-process_instruction({ack, Guids},
+process_instruction(_Flush, {ack, Guids},
State = #state { backing_queue = BQ,
backing_queue_state = BQS,
- guid_ack = GA }) ->
+ guid_ack = GA,
+ fakes = Fakes }) ->
{AckTags, GA1} = guids_to_acktags(Guids, GA),
{Guids1, BQS1} = BQ:ack(AckTags, BQS),
[] = Guids1 -- Guids, %% ASSERTION
+ Fakes1 = sets:difference(Fakes, sets:from_list(Guids)),
{processed, State #state { guid_ack = GA1,
- backing_queue_state = BQS1 }};
-process_instruction({requeue, MsgPropsFun, Guids},
+ backing_queue_state = BQS1,
+ fakes = Fakes1 }};
+process_instruction(_Flush, {requeue, MsgPropsFun, Guids},
State = #state { backing_queue = BQ,
backing_queue_state = BQS,
guid_ack = GA }) ->
@@ -515,7 +559,7 @@ process_instruction({requeue, MsgPropsFun, Guids},
State #state { guid_ack = dict:new(),
backing_queue_state = BQS2 }
end};
-process_instruction(delete_and_terminate,
+process_instruction(_Flush, delete_and_terminate,
State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
BQ:delete_and_terminate(BQS),
@@ -534,3 +578,14 @@ guids_to_acktags(Guids, GA) ->
ack_all(BQ, GA, BQS) ->
BQ:ack([AckTag || {_Guid, AckTag} <- dict:to_list(GA)], BQS).
+
+fake_delivery(#amqqueue { name = QueueName }, Guid, ChPid) ->
+ ExchangeName = rabbit_misc:r(QueueName, exchange, <<>>),
+ Msg = (rabbit_basic:message(ExchangeName, <<>>, #'P_basic'{}, <<>>))
+ #basic_message { guid = Guid },
+ #delivery { mandatory = false,
+ immediate = false,
+ txn = none,
+ sender = ChPid,
+ message = Msg,
+ msg_seq_no = undefined }.