summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue_process.erl65
-rw-r--r--src/rabbit_mirror_queue_coordinator.erl69
-rw-r--r--src/rabbit_mirror_queue_master.erl28
-rw-r--r--src/rabbit_mirror_queue_slave.erl110
4 files changed, 206 insertions, 66 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 05de48d653..c537671eb0 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -73,8 +73,8 @@
messages,
consumers,
memory,
- backing_queue_status,
- slave_pids
+ slave_pids,
+ backing_queue_status
]).
-define(CREATION_EVENT_KEYS,
@@ -84,10 +84,13 @@
auto_delete,
arguments,
owner_pid,
- mirror_nodes
+ mirror_nodes,
+ slave_pids,
+ synchronised_slave_pids
]).
--define(INFO_KEYS, ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid]).
+-define(INFO_KEYS,
+ ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid, slave_pids]).
%%----------------------------------------------------------------------------
@@ -703,7 +706,40 @@ ensure_ttl_timer(State) ->
now_micros() -> timer:now_diff(now(), {0,0,0}).
-infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
+infos(Items, State) ->
+ {Prefix, Items1} =
+ case lists:member(synchronised_slave_pids, Items) of
+ true -> Prefix1 = slaves_status(State),
+ case lists:member(slave_pids, Items) of
+ true -> {Prefix1, Items -- [slave_pids]};
+ false -> {proplists:delete(slave_pids, Prefix1), Items}
+ end;
+ false -> {[], Items}
+ end,
+ Prefix ++ [{Item, i(Item, State)}
+ || Item <- (Items1 -- [synchronised_slave_pids])].
+
+slaves_status(#q{q = #amqqueue{name = Name}}) ->
+ {ok, #amqqueue{mirror_nodes = MNodes, slave_pids = SPids}} =
+ rabbit_amqqueue:lookup(Name),
+ case MNodes of
+ undefined ->
+ [{slave_pids, ''}, {synchronised_slave_pids, ''}];
+ _ ->
+ {Results, _Bad} =
+ delegate:invoke(
+ SPids, fun (Pid) -> rabbit_mirror_queue_slave:info(Pid) end),
+ {SPids1, SSPids} =
+ lists:foldl(
+ fun ({Pid, Infos}, {SPidsN, SSPidsN}) ->
+ {[Pid | SPidsN],
+ case proplists:get_bool(is_synchronised, Infos) of
+ true -> [Pid | SSPidsN];
+ false -> SSPidsN
+ end}
+ end, {[], []}, Results),
+ [{slave_pids, SPids1}, {synchronised_slave_pids, SSPids}]
+ end.
i(name, #q{q = #amqqueue{name = Name}}) -> Name;
i(durable, #q{q = #amqqueue{durable = Durable}}) -> Durable;
@@ -735,14 +771,21 @@ i(consumers, State) ->
i(memory, _) ->
{memory, M} = process_info(self(), memory),
M;
-i(backing_queue_status, #q{backing_queue_state = BQS, backing_queue = BQ}) ->
- BQ:status(BQS);
-i(slave_pids, #q{q = #amqqueue{name = Name}}) ->
- {ok, #amqqueue{slave_pids = SPids}} = rabbit_amqqueue:lookup(Name),
- SPids;
i(mirror_nodes, #q{q = #amqqueue{name = Name}}) ->
{ok, #amqqueue{mirror_nodes = MNodes}} = rabbit_amqqueue:lookup(Name),
- MNodes;
+ case MNodes of
+ undefined -> '';
+ _ -> MNodes
+ end;
+i(slave_pids, #q{q = #amqqueue{name = Name}}) ->
+ {ok, #amqqueue{mirror_nodes = MNodes,
+ slave_pids = SPids}} = rabbit_amqqueue:lookup(Name),
+ case MNodes of
+ undefined -> '';
+ _ -> SPids
+ end;
+i(backing_queue_status, #q{backing_queue_state = BQS, backing_queue = BQ}) ->
+ BQ:status(BQS);
i(Item, _) ->
throw({bad_argument, Item}).
diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl
index f6664a2722..0b9f053f29 100644
--- a/src/rabbit_mirror_queue_coordinator.erl
+++ b/src/rabbit_mirror_queue_coordinator.erl
@@ -16,7 +16,7 @@
-module(rabbit_mirror_queue_coordinator).
--export([start_link/3, get_gm/1, ensure_monitoring/2]).
+-export([start_link/4, get_gm/1, ensure_monitoring/2]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3]).
@@ -32,15 +32,17 @@
-record(state, { q,
gm,
monitors,
- death_fun
+ death_fun,
+ length_fun
}).
-define(ONE_SECOND, 1000).
-ifdef(use_specs).
--spec(start_link/3 :: (rabbit_types:amqqueue(), pid() | 'undefined',
- rabbit_mirror_queue_master:death_fun()) ->
+-spec(start_link/4 :: (rabbit_types:amqqueue(), pid() | 'undefined',
+ rabbit_mirror_queue_master:death_fun(),
+ rabbit_mirror_queue_master:length_fun()) ->
rabbit_types:ok_pid_or_error()).
-spec(get_gm/1 :: (pid()) -> pid()).
-spec(ensure_monitoring/2 :: (pid(), [pid()]) -> 'ok').
@@ -138,9 +140,28 @@
%% state of the master. The detection of the sync-status of a slave is
%% done entirely based on length: if the slave and the master both
%% agree on the length of the queue after the fetch of the head of the
-%% queue, then the queues must be in sync. The only other possibility
-%% is that the slave's queue is shorter, and thus the fetch should be
-%% ignored.
+%% queue (or a 'set_length' results in a slave having to drop some
+%% messages from the head of its queue), then the queues must be in
+%% sync. The only other possibility is that the slave's queue is
+%% shorter, and thus the fetch should be ignored. In case slaves are
+%% joined to an empty queue which only goes on to receive publishes,
+%% they start by asking the master to broadcast its length. This is
+%% enough for slaves to always be able to work out when their head
+%% does not differ from the master (and is much simpler and cheaper
+%% than getting the master to hang on to the guid of the msg at the
+%% head of its queue). When a slave is promoted to a master, it
+%% unilaterally broadcasts its length, in order to solve the problem
+%% of length requests from new slaves being unanswered by a dead
+%% master.
+%%
+%% Obviously, due to the async nature of communication across gm, the
+%% slaves can fall behind. This does not matter from a sync pov: if
+%% they fall behind and the master dies then a) no publishes are lost
+%% because all publishes go to all mirrors anyway; b) the worst that
+%% happens is that acks get lost and so messages come back to
+%% life. This is no worse than normal given you never get confirmation
+%% that an ack has been received (not quite true with QoS-prefetch,
+%% but close enough for jazz).
%%
%% Because acktags are issued by the bq independently, and because
%% there is no requirement for the master and all slaves to use the
@@ -279,8 +300,8 @@
%%
%%----------------------------------------------------------------------------
-start_link(Queue, GM, DeathFun) ->
- gen_server2:start_link(?MODULE, [Queue, GM, DeathFun], []).
+start_link(Queue, GM, DeathFun, LengthFun) ->
+ gen_server2:start_link(?MODULE, [Queue, GM, DeathFun, LengthFun], []).
get_gm(CPid) ->
gen_server2:call(CPid, get_gm, infinity).
@@ -292,7 +313,7 @@ ensure_monitoring(CPid, Pids) ->
%% gen_server
%% ---------------------------------------------------------------------------
-init([#amqqueue { name = QueueName } = Q, GM, DeathFun]) ->
+init([#amqqueue { name = QueueName } = Q, GM, DeathFun, LengthFun]) ->
GM1 = case GM of
undefined ->
{ok, GM2} = gm:start_link(QueueName, ?MODULE, [self()]),
@@ -306,10 +327,11 @@ init([#amqqueue { name = QueueName } = Q, GM, DeathFun]) ->
end,
{ok, _TRef} =
timer:apply_interval(?ONE_SECOND, gm, broadcast, [GM1, heartbeat]),
- {ok, #state { q = Q,
- gm = GM1,
- monitors = dict:new(),
- death_fun = DeathFun },
+ {ok, #state { q = Q,
+ gm = GM1,
+ monitors = dict:new(),
+ death_fun = DeathFun,
+ length_fun = LengthFun },
hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
@@ -329,6 +351,10 @@ handle_cast({gm_deaths, Deaths},
{stop, normal, State}
end;
+handle_cast(request_length, State = #state { length_fun = LengthFun }) ->
+ ok = LengthFun(),
+ noreply(State);
+
handle_cast({ensure_monitoring, Pids},
State = #state { monitors = Monitors }) ->
Monitors1 =
@@ -343,13 +369,12 @@ handle_cast({ensure_monitoring, Pids},
handle_info({'DOWN', _MonitorRef, process, Pid, _Reason},
State = #state { monitors = Monitors,
- death_fun = Fun }) ->
- noreply(
- case dict:is_key(Pid, Monitors) of
- false -> State;
- true -> ok = Fun(Pid),
- State #state { monitors = dict:erase(Pid, Monitors) }
- end);
+ death_fun = DeathFun }) ->
+ noreply(case dict:is_key(Pid, Monitors) of
+ false -> State;
+ true -> ok = DeathFun(Pid),
+ State #state { monitors = dict:erase(Pid, Monitors) }
+ end);
handle_info(Msg, State) ->
{stop, {unexpected_info, Msg}, State}.
@@ -379,6 +404,8 @@ members_changed([CPid], _Births, Deaths) ->
handle_msg([_CPid], _From, heartbeat) ->
ok;
+handle_msg([CPid], _From, request_length = Msg) ->
+ ok = gen_server2:cast(CPid, Msg);
handle_msg([CPid], _From, {ensure_monitoring, _Pids} = Msg) ->
ok = gen_server2:cast(CPid, Msg);
handle_msg([_CPid], _From, _Msg) ->
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 532911f265..ad5fd28f83 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -25,7 +25,7 @@
-export([start/1, stop/0]).
--export([promote_backing_queue_state/6, sender_death_fun/0]).
+-export([promote_backing_queue_state/6, sender_death_fun/0, length_fun/0]).
-behaviour(rabbit_backing_queue).
@@ -44,9 +44,10 @@
-ifdef(use_specs).
--export_type([death_fun/0]).
+-export_type([death_fun/0, length_fun/0]).
-type(death_fun() :: fun ((pid()) -> 'ok')).
+-type(length_fun() :: fun (() -> 'ok')).
-type(master_state() :: #state { gm :: pid(),
coordinator :: pid(),
backing_queue :: atom(),
@@ -61,6 +62,7 @@
-spec(promote_backing_queue_state/6 ::
(pid(), atom(), any(), pid(), dict(), [pid()]) -> master_state()).
-spec(sender_death_fun/0 :: () -> death_fun()).
+-spec(length_fun/0 :: () -> length_fun()).
-endif.
@@ -83,7 +85,7 @@ stop() ->
init(#amqqueue { name = QName, mirror_nodes = MNodes } = Q, Recover,
AsyncCallback) ->
{ok, CPid} = rabbit_mirror_queue_coordinator:start_link(
- Q, undefined, sender_death_fun()),
+ Q, undefined, sender_death_fun(), length_fun()),
GM = rabbit_mirror_queue_coordinator:get_gm(CPid),
MNodes1 =
(case MNodes of
@@ -94,6 +96,7 @@ init(#amqqueue { name = QName, mirror_nodes = MNodes } = Q, Recover,
[rabbit_mirror_queue_misc:add_mirror(QName, Node) || Node <- MNodes1],
{ok, BQ} = application:get_env(backing_queue_module),
BQS = BQ:init(Q, Recover, AsyncCallback),
+ ok = gm:broadcast(GM, {length, BQ:len(BQS)}),
#state { gm = GM,
coordinator = CPid,
backing_queue = BQ,
@@ -349,11 +352,13 @@ discard(Msg = #basic_message { id = MsgId }, ChPid,
%% ---------------------------------------------------------------------------
promote_backing_queue_state(CPid, BQ, BQS, GM, SeenStatus, KS) ->
+ Len = BQ:len(BQS),
+ ok = gm:broadcast(GM, {length, Len}),
#state { gm = GM,
coordinator = CPid,
backing_queue = BQ,
backing_queue_state = BQS,
- set_delivered = BQ:len(BQS),
+ set_delivered = Len,
seen_status = SeenStatus,
confirmed = [],
ack_msg_id = dict:new(),
@@ -371,9 +376,18 @@ sender_death_fun() ->
end)
end.
-%% ---------------------------------------------------------------------------
-%% Helpers
-%% ---------------------------------------------------------------------------
+length_fun() ->
+ Self = self(),
+ fun () ->
+ rabbit_amqqueue:run_backing_queue(
+ Self, ?MODULE,
+ fun (?MODULE, State = #state { gm = GM,
+ backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ ok = gm:broadcast(GM, {length, BQ:len(BQS)}),
+ State
+ end)
+ end.
maybe_store_acktag(undefined, _MsgId, AM) ->
AM;
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index c918f388ea..fa46ad1157 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -33,7 +33,7 @@
%% All instructions from the GM group must be processed in the order
%% in which they're received.
--export([start_link/1, set_maximum_since_use/2]).
+-export([start_link/1, set_maximum_since_use/2, info/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3, handle_pre_hibernate/1, prioritise_call/3,
@@ -47,6 +47,15 @@
-include("rabbit.hrl").
-include("gm_specs.hrl").
+-define(CREATION_EVENT_KEYS,
+ [pid,
+ name,
+ master_pid,
+ is_synchronised
+ ]).
+
+-define(INFO_KEYS, ?CREATION_EVENT_KEYS).
+
-define(SYNC_INTERVAL, 25). %% milliseconds
-define(RAM_DURATION_UPDATE_INTERVAL, 5000).
-define(DEATH_TIMEOUT, 20000). %% 20 seconds
@@ -64,7 +73,9 @@
ack_num,
msg_id_status,
- known_senders
+ known_senders,
+
+ synchronised
}).
start_link(Q) ->
@@ -73,6 +84,9 @@ start_link(Q) ->
set_maximum_since_use(QPid, Age) ->
gen_server2:cast(QPid, {set_maximum_since_use, Age}).
+info(QPid) ->
+ gen_server2:call(QPid, info, infinity).
+
init([#amqqueue { name = QueueName } = Q]) ->
process_flag(trap_exit, true), %% amqqueue_process traps exits too.
{ok, GM} = gm:start_link(QueueName, ?MODULE, [self()]),
@@ -95,26 +109,32 @@ init([#amqqueue { name = QueueName } = Q]) ->
end),
erlang:monitor(process, MPid),
ok = file_handle_cache:register_callback(
- rabbit_amqqueue, set_maximum_since_use, [self()]),
+ rabbit_amqqueue, set_maximum_since_use, [Self]),
ok = rabbit_memory_monitor:register(
- self(), {rabbit_amqqueue, set_ram_duration_target, [self()]}),
+ Self, {rabbit_amqqueue, set_ram_duration_target, [Self]}),
{ok, BQ} = application:get_env(backing_queue_module),
BQS = bq_init(BQ, Q, false),
- {ok, #state { q = Q,
- gm = GM,
- master_pid = MPid,
- backing_queue = BQ,
- backing_queue_state = BQS,
- rate_timer_ref = undefined,
- sync_timer_ref = undefined,
-
- sender_queues = dict:new(),
- msg_id_ack = dict:new(),
- ack_num = 0,
-
- msg_id_status = dict:new(),
- known_senders = dict:new()
- }, hibernate,
+ State = #state { q = Q,
+ gm = GM,
+ master_pid = MPid,
+ backing_queue = BQ,
+ backing_queue_state = BQS,
+ rate_timer_ref = undefined,
+ sync_timer_ref = undefined,
+
+ sender_queues = dict:new(),
+ msg_id_ack = dict:new(),
+ ack_num = 0,
+
+ msg_id_status = dict:new(),
+ known_senders = dict:new(),
+
+ synchronised = false
+ },
+ rabbit_event:notify(queue_slave_created,
+ infos(?CREATION_EVENT_KEYS, State)),
+ ok = gm:broadcast(GM, request_length),
+ {ok, State, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
handle_call({deliver_immediately, Delivery = #delivery {}}, From, State) ->
@@ -166,7 +186,10 @@ handle_call({gm_deaths, Deaths}, From,
{error, not_found} ->
gen_server2:reply(From, ok),
{stop, normal, State}
- end.
+ end;
+
+handle_call(info, _From, State) ->
+ reply(infos(?INFO_KEYS, State), State).
handle_cast({run_backing_queue, Mod, Fun}, State) ->
noreply(run_backing_queue(Mod, Fun, State));
@@ -295,6 +318,9 @@ members_changed([SPid], _Births, Deaths) ->
handle_msg([_SPid], _From, heartbeat) ->
ok;
+handle_msg([_SPid], _From, request_length) ->
+ %% This is only of value to the master
+ ok;
handle_msg([_SPid], _From, {ensure_monitoring, _Pid}) ->
%% This is only of value to the master
ok;
@@ -319,6 +345,14 @@ inform_deaths(SPid, Deaths) ->
%% Others
%% ---------------------------------------------------------------------------
+infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
+
+i(pid, _State) -> self();
+i(name, #state { q = #amqqueue { name = Name } }) -> Name;
+i(master_pid, #state { master_pid = MPid }) -> MPid;
+i(is_synchronised, #state { synchronised = Synchronised }) -> Synchronised;
+i(Item, _State) -> throw({bad_argument, Item}).
+
bq_init(BQ, Q, Recover) ->
Self = self(),
BQ:init(Q, Recover,
@@ -384,7 +418,7 @@ gb_trees_cons(Key, Value, Tree) ->
handle_process_result({ok, State}) -> noreply(State);
handle_process_result({stop, State}) -> {stop, normal, State}.
-promote_me(From, #state { q = Q,
+promote_me(From, #state { q = Q = #amqqueue { name = QName },
gm = GM,
backing_queue = BQ,
backing_queue_state = BQS,
@@ -392,13 +426,16 @@ promote_me(From, #state { q = Q,
sender_queues = SQ,
msg_id_ack = MA,
msg_id_status = MS,
- known_senders = KS }) ->
+ known_senders = KS,
+ master_pid = MPid}) ->
+ rabbit_event:notify(queue_slave_promoted, [{pid, self()},
+ {old_pid, MPid}]),
rabbit_log:info("Mirrored-queue (~s): Promoting slave ~s to master~n",
- [rabbit_misc:rs(Q #amqqueue.name),
- rabbit_misc:pid_to_string(self())]),
+ [rabbit_misc:rs(QName), rabbit_misc:pid_to_string(self())]),
Q1 = Q #amqqueue { pid = self() },
{ok, CPid} = rabbit_mirror_queue_coordinator:start_link(
- Q1, GM, rabbit_mirror_queue_master:sender_death_fun()),
+ Q1, GM, rabbit_mirror_queue_master:sender_death_fun(),
+ rabbit_mirror_queue_master:length_fun()),
true = unlink(GM),
gen_server2:reply(From, {promote, CPid}),
ok = gm:confirmed_broadcast(GM, heartbeat),
@@ -749,7 +786,7 @@ process_instruction({set_length, Length},
backing_queue_state = BQS }) ->
QLen = BQ:len(BQS),
ToDrop = QLen - Length,
- {ok, case ToDrop > 0 of
+ {ok, case ToDrop >= 0 of
true -> BQS1 =
lists:foldl(
fun (const, BQSN) ->
@@ -757,7 +794,8 @@ process_instruction({set_length, Length},
BQSN1} = BQ:fetch(false, BQSN),
BQSN1
end, BQS, lists:duplicate(ToDrop, const)),
- State #state { backing_queue_state = BQS1 };
+ set_synchronised(
+ true, State #state { backing_queue_state = BQS1 });
false -> State
end};
process_instruction({fetch, AckRequired, MsgId, Remaining},
@@ -770,6 +808,8 @@ process_instruction({fetch, AckRequired, MsgId, Remaining},
AckTag, Remaining}, BQS1} = BQ:fetch(AckRequired, BQS),
maybe_store_ack(AckRequired, MsgId, AckTag,
State #state { backing_queue_state = BQS1 });
+ Other when Other + 1 =:= Remaining ->
+ set_synchronised(true, State);
Other when Other < Remaining ->
%% we must be shorter than the master
State
@@ -822,6 +862,10 @@ process_instruction({sender_death, ChPid},
msg_id_status = MS1,
known_senders = dict:erase(ChPid, KS) }
end};
+process_instruction({length, Length},
+ State = #state { backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ {ok, set_synchronised(Length =:= BQ:len(BQS), State)};
process_instruction({delete_and_terminate, Reason},
State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
@@ -849,3 +893,15 @@ maybe_store_ack(true, MsgId, AckTag, State = #state { msg_id_ack = MA,
ack_num = Num }) ->
State #state { msg_id_ack = dict:store(MsgId, {Num, AckTag}, MA),
ack_num = Num + 1 }.
+
+%% We intentionally leave out the head where a slave becomes
+%% unsynchronised: we assert that can never happen.
+set_synchronised(true, State = #state { master_pid = MasterPid,
+ synchronised = false }) ->
+ rabbit_event:notify(queue_slave_synchronised, [{master_pid, MasterPid},
+ {pid, self()}]),
+ State #state { synchronised = true };
+set_synchronised(true, State) ->
+ State;
+set_synchronised(false, State = #state { synchronised = false }) ->
+ State.