summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2011-07-14 13:30:30 +0100
committerMatthew Sackman <matthew@rabbitmq.com>2011-07-14 13:30:30 +0100
commit21ff1e85d2a49ae8ab2fb899954fe6340658bc33 (patch)
tree1018c5e57571a506b3194c4edfe635784b683b46
parent07485be0b7890d6645239d2cd1fc8794e9232390 (diff)
downloadrabbitmq-server-git-21ff1e85d2a49ae8ab2fb899954fe6340658bc33.tar.gz
rework
-rw-r--r--docs/rabbitmqctl.1.xml10
-rw-r--r--src/rabbit_amqqueue_process.erl55
-rw-r--r--src/rabbit_control.erl5
-rw-r--r--src/rabbit_mirror_queue_master.erl4
-rw-r--r--src/rabbit_mirror_queue_slave.erl4
5 files changed, 54 insertions, 24 deletions
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml
index 6bc9a4be82..74041969c7 100644
--- a/docs/rabbitmqctl.1.xml
+++ b/docs/rabbitmqctl.1.xml
@@ -862,11 +862,15 @@
</varlistentry>
<varlistentry>
<term>mirror_nodes</term>
- <listitem><para>If the queue is mirrored, the nodes upon which mirrors will be present if the nodes are part of the current cluster.</para></listitem>
+ <listitem><para>If the queue is mirrored, this provides the names of the nodes upon which mirrors will be present should those nodes be part of the current cluster (i.e. it may contain node names that are not currently part of the cluster).</para></listitem>
</varlistentry>
<varlistentry>
- <term>slaves</term>
- <listitem><para>If the queue is mirrored, this gives the status of slaves.</para></listitem>
+ <term>slave_pids</term>
+ <listitem><para>If the queue is mirrored, this gives the IDs of the current slaves.</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>synchronised_slave_pids</term>
+ <listitem><para>If the queue is mirrored, this gives the IDs of the current slaves which are synchronised with the master.</para></listitem>
</varlistentry>
</variablelist>
<para>
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 9fe13d0da8..a8bb83121d 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -73,7 +73,7 @@
messages,
consumers,
memory,
- slaves,
+ slave_pids,
backing_queue_status
]).
@@ -84,10 +84,13 @@
auto_delete,
arguments,
owner_pid,
- mirror_nodes
+ mirror_nodes,
+ slave_pids,
+ synchronised_slave_pids
]).
--define(INFO_KEYS, ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid]).
+-define(INFO_KEYS,
+ ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid, slave_pids]).
%%----------------------------------------------------------------------------
@@ -709,7 +712,40 @@ ensure_ttl_timer(State) ->
now_micros() -> timer:now_diff(now(), {0,0,0}).
-infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
+infos(Items, State) ->
+ {Prefix, Items1} =
+ case lists:member(synchronised_slave_pids, Items) of
+ true -> Prefix1 = slaves_status(State),
+ case lists:member(slave_pids, Items) of
+ true -> {Prefix1, Items -- [slave_pids]};
+ false -> {proplists:delete(slave_pids, Prefix1), Items}
+ end;
+ false -> {[], Items}
+ end,
+ Prefix ++ [{Item, i(Item, State)}
+ || Item <- (Items1 -- [synchronised_slave_pids])].
+
+slaves_status(#q{q = #amqqueue{name = Name}}) ->
+ {ok, #amqqueue{mirror_nodes = MNodes, slave_pids = SPids}} =
+ rabbit_amqqueue:lookup(Name),
+ case MNodes of
+ undefined ->
+ [{slave_pids, ''}, {synchronised_slave_pids, ''}];
+ _ ->
+ {Results, _Bad} =
+ delegate:invoke(
+ SPids, fun (Pid) -> rabbit_mirror_queue_slave:info(Pid) end),
+ {SPids1, SSPids} =
+ lists:foldl(
+ fun ({Pid, Infos}, {SPidsN, SSPidsN}) ->
+ {[Pid | SPidsN],
+ case proplists:get_bool(is_synchronised, Infos) of
+ true -> [Pid | SSPidsN];
+ false -> SSPidsN
+ end}
+ end, {[], []}, Results),
+ [{slave_pids, SPids1}, {synchronised_slave_pids, SSPids}]
+ end.
i(name, #q{q = #amqqueue{name = Name}}) -> Name;
i(durable, #q{q = #amqqueue{durable = Durable}}) -> Durable;
@@ -747,17 +783,12 @@ i(mirror_nodes, #q{q = #amqqueue{name = Name}}) ->
undefined -> '';
_ -> MNodes
end;
-i(slaves, #q{q = #amqqueue{name = Name}}) ->
+i(slave_pids, #q{q = #amqqueue{name = Name}}) ->
{ok, #amqqueue{mirror_nodes = MNodes,
slave_pids = SPids}} = rabbit_amqqueue:lookup(Name),
case MNodes of
- undefined ->
- '';
- _ ->
- {Results, _Bad} =
- delegate:invoke(
- SPids, fun (Pid) -> rabbit_mirror_queue_slave:info(Pid) end),
- [Result || {_Pid, Result} <- Results]
+ undefined -> '';
+ _ -> SPids
end;
i(backing_queue_status, #q{backing_queue_state = BQS, backing_queue = BQ}) ->
BQ:status(BQS);
diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl
index 548ad7fa9a..6eb1aaba9a 100644
--- a/src/rabbit_control.erl
+++ b/src/rabbit_control.erl
@@ -419,11 +419,6 @@ format_info_item([T | _] = Value)
"[" ++
lists:nthtail(2, lists:append(
[", " ++ format_info_item(E) || E <- Value])) ++ "]";
-format_info_item(Value) when is_tuple(Value) ->
- List = tuple_to_list(Value),
- "{" ++
- lists:nthtail(2, lists:append(
- [", " ++ format_info_item(E) || E <- List])) ++ "}";
format_info_item(Value) ->
io_lib:format("~w", [Value]).
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index a9bfe731b3..c4bb43cb81 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -301,7 +301,7 @@ is_duplicate(Message = #basic_message { id = MsgId },
error ->
%% We permit the underlying BQ to have a peek at it, but
%% only if we ourselves are not filtering out the msg.
- {Result, BQS1} = BQ:is_duplicate(none, Message, BQS),
+ {Result, BQS1} = BQ:is_duplicate(Message, BQS),
{Result, State #state { backing_queue_state = BQS1 }};
{ok, published} ->
%% It already got published when we were a slave and no
@@ -379,7 +379,7 @@ sender_death_fun() ->
length_fun() ->
Self = self(),
fun () ->
- rabbit_amqqueue:run_backing_queue_async(
+ rabbit_amqqueue:run_backing_queue(
Self, ?MODULE,
fun (?MODULE, State = #state { gm = GM,
backing_queue = BQ,
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 89790c1797..45899d5020 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -47,12 +47,12 @@
-include("rabbit.hrl").
-include("gm_specs.hrl").
--define(STATISTICS_KEYS,
+-define(CREATION_EVENT_KEYS,
[pid,
is_synchronised
]).
--define(INFO_KEYS, ?STATISTICS_KEYS).
+-define(INFO_KEYS, ?CREATION_EVENT_KEYS).
-define(SYNC_INTERVAL, 25). %% milliseconds
-define(RAM_DURATION_UPDATE_INTERVAL, 5000).