summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2010-12-17 00:31:15 +0000
committerMatthew Sackman <matthew@rabbitmq.com>2010-12-17 00:31:15 +0000
commit91e9438fb63cab946e8dde6af72cdc2d81c9bb1b (patch)
treeb35d877f260a35aa8673592c90a0368c3373ec9a /src
parentfaa35a799468acc38a014d781e535d08f1b97f35 (diff)
downloadrabbitmq-server-git-91e9438fb63cab946e8dde6af72cdc2d81c9bb1b.tar.gz
Support maybe_run_queue_via_backing_queue in the slaves, and add some comments about where to deal with confirmations. I think. Assuming my understanding of pubacks is right.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_mirror_queue_master.erl2
-rw-r--r--src/rabbit_mirror_queue_slave.erl25
2 files changed, 23 insertions, 4 deletions
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 2299c3d17b..0d64ab8e77 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -52,7 +52,7 @@ stop() ->
%% Same as start/1.
exit({not_valid_for_generic_backing_queue, ?MODULE}).
-init(#amqqueue { arguments = Args, durable = false } = Q, Recover) ->
+init(#amqqueue { arguments = Args } = Q, Recover) ->
{ok, CPid} = rabbit_mirror_queue_coordinator:start_link(Q, undefined),
GM = rabbit_mirror_queue_coordinator:get_gm(CPid),
{_Type, Nodes} = rabbit_misc:table_lookup(Args, <<"x-mirror">>),
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index a9429ab80f..ac49b10b29 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -86,7 +86,9 @@
sender_queues, %% :: Pid -> MsgQ
guid_ack, %% :: Guid -> AckTag
- instructions %% :: InstrQ
+ instructions, %% :: InstrQ
+
+ guid_to_channel %% for confirms
}).
-define(RAM_DURATION_UPDATE_INTERVAL, 5000).
@@ -138,7 +140,9 @@ init([#amqqueue { name = QueueName } = Q]) ->
sender_queues = dict:new(),
guid_ack = dict:new(),
- instructions = queue:new()
+ instructions = queue:new(),
+
+ guid_to_channel = dict:new()
}, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN,
?DESIRED_HIBERNATE}};
@@ -172,8 +176,14 @@ handle_call({gm_deaths, Deaths}, From,
gen_server2:reply(From, ok),
ok = gm:broadcast(GM, heartbeat),
noreply(State #state { master_node = MNode1 })
- end.
+ end;
+handle_call({maybe_run_queue_via_backing_queue, Fun}, _From, State) ->
+ reply(ok, maybe_run_queue_via_backing_queue(Fun, State)).
+
+
+handle_cast({maybe_run_queue_via_backing_queue, Fun}, State) ->
+ noreply(maybe_run_queue_via_backing_queue(Fun, State));
handle_cast({gm, Instruction}, State = #state { instructions = InstrQ }) ->
State1 = State #state { instructions = queue:in(Instruction, InstrQ) },
@@ -271,6 +281,12 @@ handle_msg([SPid], _From, Msg) ->
%% Others
%% ---------------------------------------------------------------------------
+maybe_run_queue_via_backing_queue(
+ Fun, State = #state { backing_queue_state = BQS }) ->
+ %% TODO: some CONFIRM-like thing with these Guids
+ {_Guids, BQS1} = Fun(BQS),
+ State #state { backing_queue_state = BQS1 }.
+
handle_process_result({continue, State}) -> noreply(State);
handle_process_result({stop, State}) -> {stop, normal, State}.
@@ -380,6 +396,7 @@ process_instruction({publish, Deliver, Guid, MsgProps, ChPid},
{processed,
case Deliver of
false ->
+ %% RECORD CONFIRM - modify MsgProps
BQS1 = BQ:publish(Msg, MsgProps, ChPid, BQS),
State1 #state {backing_queue_state = BQS1 };
{true, AckRequired} ->
@@ -439,6 +456,7 @@ process_instruction({ack, Guids},
{AckTags, GA1} = guids_to_acktags(Guids, GA),
{Guids1, BQS1} = BQ:ack(AckTags, BQS),
[] = Guids1 -- Guids, %% ASSERTION
+ %% CONFIRM - persistent but delivered faster than disk sync
{processed, State #state { guid_ack = GA1,
backing_queue_state = BQS1 }};
process_instruction({requeue, MsgPropsFun, Guids},
@@ -457,6 +475,7 @@ process_instruction({requeue, MsgPropsFun, Guids},
%% GA
{_Count, BQS1} = BQ:purge(BQS),
{Guids, BQS2} = ack_all(BQ, GA, BQS1),
+ %% CONFIRM these Guids
State #state { guid_ack = dict:new(),
backing_queue_state = BQS2 }
end};