summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--docs/rabbitmqctl.1.xml10
-rw-r--r--src/rabbit_amqqueue_process.erl55
-rw-r--r--src/rabbit_control.erl5
-rw-r--r--src/rabbit_mirror_queue_master.erl4
-rw-r--r--src/rabbit_mirror_queue_slave.erl4
5 files changed, 54 insertions, 24 deletions
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml
index 6bc9a4be82..74041969c7 100644
--- a/docs/rabbitmqctl.1.xml
+++ b/docs/rabbitmqctl.1.xml
@@ -862,11 +862,15 @@
</varlistentry>
<varlistentry>
<term>mirror_nodes</term>
- <listitem><para>If the queue is mirrored, the nodes upon which mirrors will be present if the nodes are part of the current cluster.</para></listitem>
+ <listitem><para>If the queue is mirrored, this provides the names of the nodes upon which mirrors will be present should those nodes be part of the current cluster (i.e. it may contain node names that are not currently part of the cluster).</para></listitem>
</varlistentry>
<varlistentry>
- <term>slaves</term>
- <listitem><para>If the queue is mirrored, this gives the status of slaves.</para></listitem>
+ <term>slave_pids</term>
+ <listitem><para>If the queue is mirrored, this gives the IDs of the current slaves.</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>synchronised_slave_pids</term>
+ <listitem><para>If the queue is mirrored, this gives the IDs of the current slaves which are synchronised with the master.</para></listitem>
</varlistentry>
</variablelist>
<para>
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 9fe13d0da8..a8bb83121d 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -73,7 +73,7 @@
messages,
consumers,
memory,
- slaves,
+ slave_pids,
backing_queue_status
]).
@@ -84,10 +84,13 @@
auto_delete,
arguments,
owner_pid,
- mirror_nodes
+ mirror_nodes,
+ slave_pids,
+ synchronised_slave_pids
]).
--define(INFO_KEYS, ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid]).
+-define(INFO_KEYS,
+ ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid, slave_pids]).
%%----------------------------------------------------------------------------
@@ -709,7 +712,40 @@ ensure_ttl_timer(State) ->
now_micros() -> timer:now_diff(now(), {0,0,0}).
-infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
+infos(Items, State) ->
+ {Prefix, Items1} =
+ case lists:member(synchronised_slave_pids, Items) of
+ true -> Prefix1 = slaves_status(State),
+ case lists:member(slave_pids, Items) of
+ true -> {Prefix1, Items -- [slave_pids]};
+ false -> {proplists:delete(slave_pids, Prefix1), Items}
+ end;
+ false -> {[], Items}
+ end,
+ Prefix ++ [{Item, i(Item, State)}
+ || Item <- (Items1 -- [synchronised_slave_pids])].
+
+slaves_status(#q{q = #amqqueue{name = Name}}) ->
+ {ok, #amqqueue{mirror_nodes = MNodes, slave_pids = SPids}} =
+ rabbit_amqqueue:lookup(Name),
+ case MNodes of
+ undefined ->
+ [{slave_pids, ''}, {synchronised_slave_pids, ''}];
+ _ ->
+ {Results, _Bad} =
+ delegate:invoke(
+ SPids, fun (Pid) -> rabbit_mirror_queue_slave:info(Pid) end),
+ {SPids1, SSPids} =
+ lists:foldl(
+ fun ({Pid, Infos}, {SPidsN, SSPidsN}) ->
+ {[Pid | SPidsN],
+ case proplists:get_bool(is_synchronised, Infos) of
+ true -> [Pid | SSPidsN];
+ false -> SSPidsN
+ end}
+ end, {[], []}, Results),
+ [{slave_pids, SPids1}, {synchronised_slave_pids, SSPids}]
+ end.
i(name, #q{q = #amqqueue{name = Name}}) -> Name;
i(durable, #q{q = #amqqueue{durable = Durable}}) -> Durable;
@@ -747,17 +783,12 @@ i(mirror_nodes, #q{q = #amqqueue{name = Name}}) ->
undefined -> '';
_ -> MNodes
end;
-i(slaves, #q{q = #amqqueue{name = Name}}) ->
+i(slave_pids, #q{q = #amqqueue{name = Name}}) ->
{ok, #amqqueue{mirror_nodes = MNodes,
slave_pids = SPids}} = rabbit_amqqueue:lookup(Name),
case MNodes of
- undefined ->
- '';
- _ ->
- {Results, _Bad} =
- delegate:invoke(
- SPids, fun (Pid) -> rabbit_mirror_queue_slave:info(Pid) end),
- [Result || {_Pid, Result} <- Results]
+ undefined -> '';
+ _ -> SPids
end;
i(backing_queue_status, #q{backing_queue_state = BQS, backing_queue = BQ}) ->
BQ:status(BQS);
diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl
index 548ad7fa9a..6eb1aaba9a 100644
--- a/src/rabbit_control.erl
+++ b/src/rabbit_control.erl
@@ -419,11 +419,6 @@ format_info_item([T | _] = Value)
"[" ++
lists:nthtail(2, lists:append(
[", " ++ format_info_item(E) || E <- Value])) ++ "]";
-format_info_item(Value) when is_tuple(Value) ->
- List = tuple_to_list(Value),
- "{" ++
- lists:nthtail(2, lists:append(
- [", " ++ format_info_item(E) || E <- List])) ++ "}";
format_info_item(Value) ->
io_lib:format("~w", [Value]).
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index a9bfe731b3..c4bb43cb81 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -301,7 +301,7 @@ is_duplicate(Message = #basic_message { id = MsgId },
error ->
%% We permit the underlying BQ to have a peek at it, but
%% only if we ourselves are not filtering out the msg.
- {Result, BQS1} = BQ:is_duplicate(none, Message, BQS),
+ {Result, BQS1} = BQ:is_duplicate(Message, BQS),
{Result, State #state { backing_queue_state = BQS1 }};
{ok, published} ->
%% It already got published when we were a slave and no
@@ -379,7 +379,7 @@ sender_death_fun() ->
length_fun() ->
Self = self(),
fun () ->
- rabbit_amqqueue:run_backing_queue_async(
+ rabbit_amqqueue:run_backing_queue(
Self, ?MODULE,
fun (?MODULE, State = #state { gm = GM,
backing_queue = BQ,
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 89790c1797..45899d5020 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -47,12 +47,12 @@
-include("rabbit.hrl").
-include("gm_specs.hrl").
--define(STATISTICS_KEYS,
+-define(CREATION_EVENT_KEYS,
[pid,
is_synchronised
]).
--define(INFO_KEYS, ?STATISTICS_KEYS).
+-define(INFO_KEYS, ?CREATION_EVENT_KEYS).
-define(SYNC_INTERVAL, 25). %% milliseconds
-define(RAM_DURATION_UPDATE_INTERVAL, 5000).