summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2010-12-20 13:56:21 +0000
committerMatthew Sackman <matthew@rabbitmq.com>2010-12-20 13:56:21 +0000
commitb60e1c47fd36139cf5f1f4b4fff13b5110728b6c (patch)
tree43aa82c43a1ba386190ff68c7e3831653f683311 /src
parentb88e8aac5cfd38cf7380984565c1f4e134772f51 (diff)
downloadrabbitmq-server-git-b60e1c47fd36139cf5f1f4b4fff13b5110728b6c.tar.gz
Revert the previous changeset as I've decided to solve this differently
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_mirror_queue_master.erl63
-rw-r--r--src/rabbit_mirror_queue_slave.erl125
2 files changed, 53 insertions, 135 deletions
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 4628796f57..0d64ab8e77 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -26,7 +26,7 @@
-export([start/1, stop/0]).
--export([promote_backing_queue_state/5]).
+-export([promote_backing_queue_state/4]).
-behaviour(rabbit_backing_queue).
@@ -36,8 +36,7 @@
coordinator,
backing_queue,
backing_queue_state,
- set_delivered,
- fakes
+ set_delivered
}).
%% ---------------------------------------------------------------------------
@@ -65,16 +64,14 @@ init(#amqqueue { arguments = Args } = Q, Recover) ->
coordinator = CPid,
backing_queue = BQ,
backing_queue_state = BQS,
- set_delivered = 0,
- fakes = sets:new() }.
+ set_delivered = 0 }.
-promote_backing_queue_state(CPid, BQ, BQS, GM, Fakes) ->
+promote_backing_queue_state(CPid, BQ, BQS, GM) ->
#state { gm = GM,
coordinator = CPid,
backing_queue = BQ,
backing_queue_state = BQS,
- set_delivered = BQ:len(BQS),
- fakes = Fakes }.
+ set_delivered = BQ:len(BQS) }.
terminate(State = #state { backing_queue = BQ, backing_queue_state = BQS }) ->
%% Backing queue termination. The queue is going down but
@@ -129,54 +126,30 @@ dropwhile(Fun, State = #state { gm = GM,
fetch(AckRequired, State = #state { gm = GM,
backing_queue = BQ,
backing_queue_state = BQS,
- set_delivered = SetDelivered,
- fakes = Fakes }) ->
+ set_delivered = SetDelivered }) ->
{Result, BQS1} = BQ:fetch(AckRequired, BQS),
+ State1 = State #state { backing_queue_state = BQS1 },
case Result of
empty ->
- {Result, State #state { backing_queue_state = BQS1 }};
+ {Result, State1};
{#basic_message { guid = Guid } = Message, IsDelivered, AckTag,
Remaining} ->
+ ok = gm:broadcast(GM, {fetch, AckRequired, Guid, Remaining}),
+ IsDelivered1 = IsDelivered orelse SetDelivered > 0,
SetDelivered1 = lists:max([0, SetDelivered - 1]),
- case sets:is_element(Guid, Fakes) of
- true ->
- {BQS2, Fakes1} =
- case AckRequired of
- true -> {[Guid], BQS3} = BQ:ack([AckTag], BQS1),
- {BQS3, Fakes};
- false -> {BQS1, sets:del_element(Guid, Fakes)}
- end,
- ok = gm:broadcast(GM, {fetch, false, Guid, Remaining}),
- fetch(AckRequired,
- State #state { backing_queue_state = BQS2,
- set_delivered = SetDelivered1,
- fakes = Fakes1 });
- false ->
- ok = gm:broadcast(GM,
- {fetch, AckRequired, Guid, Remaining}),
- IsDelivered1 = IsDelivered orelse SetDelivered > 0,
- Fakes1 = case SetDelivered + SetDelivered1 of
- 1 -> sets:new(); %% transition to 0
- _ -> Fakes
- end,
- {{Message, IsDelivered1, AckTag, Remaining},
- State #state { backing_queue_state = BQS1,
- set_delivered = SetDelivered1,
- fakes = Fakes1 }}
- end
+ {{Message, IsDelivered1, AckTag, Remaining},
+ State1 #state { set_delivered = SetDelivered1 }}
end.
ack(AckTags, State = #state { gm = GM,
backing_queue = BQ,
- backing_queue_state = BQS,
- fakes = Fakes }) ->
+ backing_queue_state = BQS }) ->
{Guids, BQS1} = BQ:ack(AckTags, BQS),
- Fakes1 = case Guids of
- [] -> Fakes;
- _ -> ok = gm:broadcast(GM, {ack, Guids}),
- sets:difference(Fakes, sets:from_list(Guids))
- end,
- {Guids, State #state { backing_queue_state = BQS1, fakes = Fakes1 }}.
+ case Guids of
+ [] -> ok;
+ _ -> ok = gm:broadcast(GM, {ack, Guids})
+ end,
+ {Guids, State #state { backing_queue_state = BQS1 }}.
tx_publish(Txn, Msg, MsgProps, ChPid, #state {} = State) ->
%% gm:broadcast(GM, {tx_publish, Txn, Guid, MsgProps, ChPid})
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 0134787cd0..f124bc9eb0 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -75,7 +75,6 @@
-behaviour(gm).
-include("rabbit.hrl").
--include("rabbit_framing.hrl").
-include("gm_specs.hrl").
-record(state, { q,
@@ -88,7 +87,6 @@
sender_queues, %% :: Pid -> MsgQ
guid_ack, %% :: Guid -> AckTag
instructions, %% :: InstrQ
- fakes, %% :: Set Guid
guid_to_channel %% for confirms
}).
@@ -143,7 +141,6 @@ init([#amqqueue { name = QueueName } = Q]) ->
sender_queues = dict:new(),
guid_ack = dict:new(),
instructions = queue:new(),
- fakes = sets:new(),
guid_to_channel = dict:new()
}, hibernate,
@@ -193,7 +190,7 @@ handle_cast({maybe_run_queue_via_backing_queue, Fun}, State) ->
handle_cast({gm, Instruction}, State = #state { instructions = InstrQ }) ->
State1 = State #state { instructions = queue:in(Instruction, InstrQ) },
case queue:is_empty(InstrQ) of
- true -> handle_process_result(process_instructions(false, State1));
+ true -> handle_process_result(process_instructions(State1));
false -> noreply(State1)
end;
@@ -323,25 +320,21 @@ maybe_confirm_message(Guid, GTC) ->
handle_process_result({continue, State}) -> noreply(State);
handle_process_result({stop, State}) -> {stop, normal, State}.
-promote_me(From, State = #state { q = Q }) ->
+promote_me(From, #state { q = Q,
+ gm = GM,
+ backing_queue = BQ,
+ backing_queue_state = BQS,
+ rate_timer_ref = RateTRef,
+ sender_queues = SQ,
+ guid_ack = GA }) ->
rabbit_log:info("Promoting slave ~p for queue ~p~n",
[self(), Q #amqqueue.name]),
- #state { gm = GM,
- backing_queue = BQ,
- backing_queue_state = BQS,
- rate_timer_ref = RateTRef,
- sender_queues = SQ,
- guid_ack = GA,
- instructions = Instr,
- fakes = Fakes } =
- process_instructions(true, State),
- true = queue:is_empty(Instr), %% ASSERTION
{ok, CPid} = rabbit_mirror_queue_coordinator:start_link(Q, GM),
true = unlink(GM),
gen_server2:reply(From, {promote, CPid}),
ok = gm:confirmed_broadcast(GM, heartbeat),
MasterState = rabbit_mirror_queue_master:promote_backing_queue_state(
- CPid, BQ, BQS, GM, Fakes),
+ CPid, BQ, BQS, GM),
%% We have to do the requeue via this init because otherwise we
%% don't have access to the relevent MsgPropsFun. Also, we are
%% already in mnesia as the master queue pid. Thus we cannot just
@@ -394,19 +387,19 @@ enqueue_message(Delivery = #delivery { sender = ChPid },
SQ1 = dict:store(ChPid, queue:in(Delivery, Q), SQ),
State1 = State #state { sender_queues = SQ1 },
case queue:is_empty(Q) of
- true -> process_instructions(false, State1);
+ true -> process_instructions(State1);
false -> {continue, State1}
end.
-process_instructions(Flush, State = #state { instructions = InstrQ }) ->
+process_instructions(State = #state { instructions = InstrQ }) ->
case queue:out(InstrQ) of
{empty, _InstrQ} ->
{continue, State};
{{value, Instr}, InstrQ1} ->
- case process_instruction(Flush, Instr, State) of
+ case process_instruction(Instr, State) of
{processed, State1} ->
process_instructions(
- Flush, State1 #state { instructions = InstrQ1 });
+ State1 #state { instructions = InstrQ1 });
{stop, State1} ->
{stop, State1 #state { instructions = InstrQ1 }};
blocked ->
@@ -414,38 +407,20 @@ process_instructions(Flush, State = #state { instructions = InstrQ }) ->
end
end.
-process_instruction(Flush, {publish, Deliver, Guid, MsgProps, ChPid} = Instr,
+process_instruction({publish, Deliver, Guid, MsgProps, ChPid},
State = #state { q = Q,
sender_queues = SQ,
backing_queue = BQ,
backing_queue_state = BQS,
guid_ack = GA,
- guid_to_channel = GTC,
- fakes = Fakes }) ->
+ guid_to_channel = GTC }) ->
case dict:find(ChPid, SQ) of
error ->
- case Flush of
- true -> MQ = queue:from_list([fake_delivery(Q, Guid, ChPid)]),
- State1 = State #state {
- sender_queues = dict:store(ChPid, MQ, SQ),
- fakes = sets:add_element(Guid, Fakes) },
- process_instruction(Flush, Instr, State1);
- false -> blocked
- end;
+ blocked;
{ok, MQ} ->
case queue:out(MQ) of
{empty, _MQ} ->
- case Flush of
- true ->
- MQ1 = queue:in_r(fake_delivery(Q, Guid, ChPid), MQ),
- SQ1 = dict:store(ChPid, MQ1, SQ),
- State1 = State #state {
- sender_queues = SQ1,
- fakes = sets:add_element(Guid, Fakes) },
- process_instruction(Flush, Instr, State1);
- false ->
- blocked
- end;
+ blocked;
{{value, Delivery = #delivery {
message = Msg = #basic_message { guid = Guid } }},
MQ1} ->
@@ -474,41 +449,28 @@ process_instruction(Flush, {publish, Deliver, Guid, MsgProps, ChPid} = Instr,
guid_to_channel = GTC2 }
end};
{{value, #delivery {}}, _MQ1} ->
- MQ1 = queue:in_r(fake_delivery(Q, Guid, ChPid), MQ),
- State1 = State #state {
- sender_queues = dict:store(ChPid, MQ1, SQ),
- fakes = sets:add_element(Guid, Fakes) },
- process_instruction(Flush, Instr, State1)
+ %% throw away the instruction: we'll never receive
+ %% the message to which it corresponds.
+ {processed, State}
end
end;
-process_instruction(_Flush, {set_length, Length},
+process_instruction({set_length, Length},
State = #state { backing_queue = BQ,
- backing_queue_state = BQS,
- fakes = Fakes }) ->
+ backing_queue_state = BQS }) ->
QLen = BQ:len(BQS),
ToDrop = QLen - Length,
{processed,
case ToDrop > 0 of
- true ->
- {Guids, BQS1} =
- lists:foldl(
- fun (const, {GuidsN, BQSN}) ->
- {{#basic_message { guid = Guid }, _IsDelivered,
- _AckTag, _Remaining}, BQSN1} =
- BQ:fetch(false, BQSN),
- {[Guid | GuidsN], BQSN1}
- end, BQS, lists:duplicate(ToDrop, const)),
- Fakes1 = sets:difference(Fakes, sets:from_list(Guids)),
- State #state { backing_queue_state = BQS1,
- fakes = Fakes1 };
- false ->
- State
+ true -> BQS1 = lists:foldl(
+ fun (const, BQSN) -> BQ:fetch(false, BQSN) end,
+ BQS, lists:duplicate(ToDrop, const)),
+ State #state { backing_queue_state = BQS1 };
+ false -> State
end};
-process_instruction(_Flush, {fetch, AckRequired, Guid, Remaining},
+process_instruction({fetch, AckRequired, Guid, Remaining},
State = #state { backing_queue = BQ,
backing_queue_state = BQS,
- guid_ack = GA,
- fakes = Fakes }) ->
+ guid_ack = GA }) ->
QLen = BQ:len(BQS),
{processed,
case QLen - 1 of
@@ -519,28 +481,22 @@ process_instruction(_Flush, {fetch, AckRequired, Guid, Remaining},
true -> dict:store(Guid, AckTag, GA);
false -> GA
end,
- Fakes1 = sets:del_element(Guid, Fakes),
State #state { backing_queue_state = BQS1,
- guid_ack = GA1,
- fakes = Fakes1 };
+ guid_ack = GA1 };
Other when Other < Remaining ->
%% we must be shorter than the master
- false = sets:is_element(Guid, Fakes), %% ASSERTION
State
end};
-process_instruction(_Flush, {ack, Guids},
+process_instruction({ack, Guids},
State = #state { backing_queue = BQ,
backing_queue_state = BQS,
- guid_ack = GA,
- fakes = Fakes }) ->
+ guid_ack = GA }) ->
{AckTags, GA1} = guids_to_acktags(Guids, GA),
{Guids1, BQS1} = BQ:ack(AckTags, BQS),
[] = Guids1 -- Guids, %% ASSERTION
- Fakes1 = sets:difference(Fakes, sets:from_list(Guids)),
{processed, State #state { guid_ack = GA1,
- backing_queue_state = BQS1,
- fakes = Fakes1 }};
-process_instruction(_Flush, {requeue, MsgPropsFun, Guids},
+ backing_queue_state = BQS1 }};
+process_instruction({requeue, MsgPropsFun, Guids},
State = #state { backing_queue = BQ,
backing_queue_state = BQS,
guid_ack = GA }) ->
@@ -559,7 +515,7 @@ process_instruction(_Flush, {requeue, MsgPropsFun, Guids},
State #state { guid_ack = dict:new(),
backing_queue_state = BQS2 }
end};
-process_instruction(_Flush, delete_and_terminate,
+process_instruction(delete_and_terminate,
State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
BQ:delete_and_terminate(BQS),
@@ -578,14 +534,3 @@ guids_to_acktags(Guids, GA) ->
ack_all(BQ, GA, BQS) ->
BQ:ack([AckTag || {_Guid, AckTag} <- dict:to_list(GA)], BQS).
-
-fake_delivery(#amqqueue { name = QueueName }, Guid, ChPid) ->
- ExchangeName = rabbit_misc:r(QueueName, exchange, <<>>),
- Msg = (rabbit_basic:message(ExchangeName, <<>>, #'P_basic'{}, <<>>))
- #basic_message { guid = Guid },
- #delivery { mandatory = false,
- immediate = false,
- txn = none,
- sender = ChPid,
- message = Msg,
- msg_seq_no = undefined }.