diff options
| author | Matthew Sackman <matthew@rabbitmq.com> | 2011-07-14 13:30:30 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@rabbitmq.com> | 2011-07-14 13:30:30 +0100 |
| commit | 21ff1e85d2a49ae8ab2fb899954fe6340658bc33 (patch) | |
| tree | 1018c5e57571a506b3194c4edfe635784b683b46 /src | |
| parent | 07485be0b7890d6645239d2cd1fc8794e9232390 (diff) | |
| download | rabbitmq-server-git-21ff1e85d2a49ae8ab2fb899954fe6340658bc33.tar.gz | |
rework
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 55 | ||||
| -rw-r--r-- | src/rabbit_control.erl | 5 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_master.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 4 |
4 files changed, 47 insertions, 21 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 9fe13d0da8..a8bb83121d 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -73,7 +73,7 @@ messages, consumers, memory, - slaves, + slave_pids, backing_queue_status ]). @@ -84,10 +84,13 @@ auto_delete, arguments, owner_pid, - mirror_nodes + mirror_nodes, + slave_pids, + synchronised_slave_pids ]). --define(INFO_KEYS, ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid]). +-define(INFO_KEYS, + ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid, slave_pids]). %%---------------------------------------------------------------------------- @@ -709,7 +712,40 @@ ensure_ttl_timer(State) -> now_micros() -> timer:now_diff(now(), {0,0,0}). -infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items]. +infos(Items, State) -> + {Prefix, Items1} = + case lists:member(synchronised_slave_pids, Items) of + true -> Prefix1 = slaves_status(State), + case lists:member(slave_pids, Items) of + true -> {Prefix1, Items -- [slave_pids]}; + false -> {proplists:delete(slave_pids, Prefix1), Items} + end; + false -> {[], Items} + end, + Prefix ++ [{Item, i(Item, State)} + || Item <- (Items1 -- [synchronised_slave_pids])]. + +slaves_status(#q{q = #amqqueue{name = Name}}) -> + {ok, #amqqueue{mirror_nodes = MNodes, slave_pids = SPids}} = + rabbit_amqqueue:lookup(Name), + case MNodes of + undefined -> + [{slave_pids, ''}, {synchronised_slave_pids, ''}]; + _ -> + {Results, _Bad} = + delegate:invoke( + SPids, fun (Pid) -> rabbit_mirror_queue_slave:info(Pid) end), + {SPids1, SSPids} = + lists:foldl( + fun ({Pid, Infos}, {SPidsN, SSPidsN}) -> + {[Pid | SPidsN], + case proplists:get_bool(is_synchronised, Infos) of + true -> [Pid | SSPidsN]; + false -> SSPidsN + end} + end, {[], []}, Results), + [{slave_pids, SPids1}, {synchronised_slave_pids, SSPids}] + end. i(name, #q{q = #amqqueue{name = Name}}) -> Name; i(durable, #q{q = #amqqueue{durable = Durable}}) -> Durable; @@ -747,17 +783,12 @@ i(mirror_nodes, #q{q = #amqqueue{name = Name}}) -> undefined -> ''; _ -> MNodes end; -i(slaves, #q{q = #amqqueue{name = Name}}) -> +i(slave_pids, #q{q = #amqqueue{name = Name}}) -> {ok, #amqqueue{mirror_nodes = MNodes, slave_pids = SPids}} = rabbit_amqqueue:lookup(Name), case MNodes of - undefined -> - ''; - _ -> - {Results, _Bad} = - delegate:invoke( - SPids, fun (Pid) -> rabbit_mirror_queue_slave:info(Pid) end), - [Result || {_Pid, Result} <- Results] + undefined -> ''; + _ -> SPids end; i(backing_queue_status, #q{backing_queue_state = BQS, backing_queue = BQ}) -> BQ:status(BQS); diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index 548ad7fa9a..6eb1aaba9a 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -419,11 +419,6 @@ format_info_item([T | _] = Value) "[" ++ lists:nthtail(2, lists:append( [", " ++ format_info_item(E) || E <- Value])) ++ "]"; -format_info_item(Value) when is_tuple(Value) -> - List = tuple_to_list(Value), - "{" ++ - lists:nthtail(2, lists:append( - [", " ++ format_info_item(E) || E <- List])) ++ "}"; format_info_item(Value) -> io_lib:format("~w", [Value]). diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index a9bfe731b3..c4bb43cb81 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -301,7 +301,7 @@ is_duplicate(Message = #basic_message { id = MsgId }, error -> %% We permit the underlying BQ to have a peek at it, but %% only if we ourselves are not filtering out the msg. - {Result, BQS1} = BQ:is_duplicate(none, Message, BQS), + {Result, BQS1} = BQ:is_duplicate(Message, BQS), {Result, State #state { backing_queue_state = BQS1 }}; {ok, published} -> %% It already got published when we were a slave and no @@ -379,7 +379,7 @@ sender_death_fun() -> length_fun() -> Self = self(), fun () -> - rabbit_amqqueue:run_backing_queue_async( + rabbit_amqqueue:run_backing_queue( Self, ?MODULE, fun (?MODULE, State = #state { gm = GM, backing_queue = BQ, diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 89790c1797..45899d5020 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -47,12 +47,12 @@ -include("rabbit.hrl"). -include("gm_specs.hrl"). --define(STATISTICS_KEYS, +-define(CREATION_EVENT_KEYS, [pid, is_synchronised ]). --define(INFO_KEYS, ?STATISTICS_KEYS). +-define(INFO_KEYS, ?CREATION_EVENT_KEYS). -define(SYNC_INTERVAL, 25). %% milliseconds -define(RAM_DURATION_UPDATE_INTERVAL, 5000). |
