summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2011-07-14 13:30:30 +0100
committerMatthew Sackman <matthew@rabbitmq.com>2011-07-14 13:30:30 +0100
commit21ff1e85d2a49ae8ab2fb899954fe6340658bc33 (patch)
tree1018c5e57571a506b3194c4edfe635784b683b46 /src
parent07485be0b7890d6645239d2cd1fc8794e9232390 (diff)
downloadrabbitmq-server-git-21ff1e85d2a49ae8ab2fb899954fe6340658bc33.tar.gz
rework
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue_process.erl55
-rw-r--r--src/rabbit_control.erl5
-rw-r--r--src/rabbit_mirror_queue_master.erl4
-rw-r--r--src/rabbit_mirror_queue_slave.erl4
4 files changed, 47 insertions, 21 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 9fe13d0da8..a8bb83121d 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -73,7 +73,7 @@
messages,
consumers,
memory,
- slaves,
+ slave_pids,
backing_queue_status
]).
@@ -84,10 +84,13 @@
auto_delete,
arguments,
owner_pid,
- mirror_nodes
+ mirror_nodes,
+ slave_pids,
+ synchronised_slave_pids
]).
--define(INFO_KEYS, ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid]).
+-define(INFO_KEYS,
+ ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid, slave_pids]).
%%----------------------------------------------------------------------------
@@ -709,7 +712,40 @@ ensure_ttl_timer(State) ->
now_micros() -> timer:now_diff(now(), {0,0,0}).
-infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
+infos(Items, State) ->
+ {Prefix, Items1} =
+ case lists:member(synchronised_slave_pids, Items) of
+ true -> Prefix1 = slaves_status(State),
+ case lists:member(slave_pids, Items) of
+ true -> {Prefix1, Items -- [slave_pids]};
+ false -> {proplists:delete(slave_pids, Prefix1), Items}
+ end;
+ false -> {[], Items}
+ end,
+ Prefix ++ [{Item, i(Item, State)}
+ || Item <- (Items1 -- [synchronised_slave_pids])].
+
+slaves_status(#q{q = #amqqueue{name = Name}}) ->
+ {ok, #amqqueue{mirror_nodes = MNodes, slave_pids = SPids}} =
+ rabbit_amqqueue:lookup(Name),
+ case MNodes of
+ undefined ->
+ [{slave_pids, ''}, {synchronised_slave_pids, ''}];
+ _ ->
+ {Results, _Bad} =
+ delegate:invoke(
+ SPids, fun (Pid) -> rabbit_mirror_queue_slave:info(Pid) end),
+ {SPids1, SSPids} =
+ lists:foldl(
+ fun ({Pid, Infos}, {SPidsN, SSPidsN}) ->
+ {[Pid | SPidsN],
+ case proplists:get_bool(is_synchronised, Infos) of
+ true -> [Pid | SSPidsN];
+ false -> SSPidsN
+ end}
+ end, {[], []}, Results),
+ [{slave_pids, SPids1}, {synchronised_slave_pids, SSPids}]
+ end.
i(name, #q{q = #amqqueue{name = Name}}) -> Name;
i(durable, #q{q = #amqqueue{durable = Durable}}) -> Durable;
@@ -747,17 +783,12 @@ i(mirror_nodes, #q{q = #amqqueue{name = Name}}) ->
undefined -> '';
_ -> MNodes
end;
-i(slaves, #q{q = #amqqueue{name = Name}}) ->
+i(slave_pids, #q{q = #amqqueue{name = Name}}) ->
{ok, #amqqueue{mirror_nodes = MNodes,
slave_pids = SPids}} = rabbit_amqqueue:lookup(Name),
case MNodes of
- undefined ->
- '';
- _ ->
- {Results, _Bad} =
- delegate:invoke(
- SPids, fun (Pid) -> rabbit_mirror_queue_slave:info(Pid) end),
- [Result || {_Pid, Result} <- Results]
+ undefined -> '';
+ _ -> SPids
end;
i(backing_queue_status, #q{backing_queue_state = BQS, backing_queue = BQ}) ->
BQ:status(BQS);
diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl
index 548ad7fa9a..6eb1aaba9a 100644
--- a/src/rabbit_control.erl
+++ b/src/rabbit_control.erl
@@ -419,11 +419,6 @@ format_info_item([T | _] = Value)
"[" ++
lists:nthtail(2, lists:append(
[", " ++ format_info_item(E) || E <- Value])) ++ "]";
-format_info_item(Value) when is_tuple(Value) ->
- List = tuple_to_list(Value),
- "{" ++
- lists:nthtail(2, lists:append(
- [", " ++ format_info_item(E) || E <- List])) ++ "}";
format_info_item(Value) ->
io_lib:format("~w", [Value]).
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index a9bfe731b3..c4bb43cb81 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -301,7 +301,7 @@ is_duplicate(Message = #basic_message { id = MsgId },
error ->
%% We permit the underlying BQ to have a peek at it, but
%% only if we ourselves are not filtering out the msg.
- {Result, BQS1} = BQ:is_duplicate(none, Message, BQS),
+ {Result, BQS1} = BQ:is_duplicate(Message, BQS),
{Result, State #state { backing_queue_state = BQS1 }};
{ok, published} ->
%% It already got published when we were a slave and no
@@ -379,7 +379,7 @@ sender_death_fun() ->
length_fun() ->
Self = self(),
fun () ->
- rabbit_amqqueue:run_backing_queue_async(
+ rabbit_amqqueue:run_backing_queue(
Self, ?MODULE,
fun (?MODULE, State = #state { gm = GM,
backing_queue = BQ,
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 89790c1797..45899d5020 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -47,12 +47,12 @@
-include("rabbit.hrl").
-include("gm_specs.hrl").
--define(STATISTICS_KEYS,
+-define(CREATION_EVENT_KEYS,
[pid,
is_synchronised
]).
--define(INFO_KEYS, ?STATISTICS_KEYS).
+-define(INFO_KEYS, ?CREATION_EVENT_KEYS).
-define(SYNC_INTERVAL, 25). %% milliseconds
-define(RAM_DURATION_UPDATE_INTERVAL, 5000).