summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_amqqueue.erl8
-rw-r--r--src/rabbit_amqqueue_process.erl18
-rw-r--r--src/rabbit_channel.erl24
-rw-r--r--src/rabbit_networking.erl3
-rw-r--r--src/rabbit_reader.erl11
-rw-r--r--src/rabbit_trace.erl2
6 files changed, 32 insertions, 34 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 0aa8921f86..fd7f86f5ab 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -361,6 +361,9 @@ check_ha_policy_argument({longstr, Policy}, _Args) ->
check_ha_policy_argument({Type, _}, _Args) ->
{error, {unacceptable_type, Type}}.
+list() ->
+ mnesia:dirty_match_object(rabbit_queue, #amqqueue{_ = '_'}).
+
list(VHostPath) ->
mnesia:dirty_match_object(
rabbit_queue,
@@ -384,9 +387,8 @@ info_all(VHostPath) -> map(VHostPath, fun (Q) -> info(Q) end).
info_all(VHostPath, Items) -> map(VHostPath, fun (Q) -> info(Q, Items) end).
force_event_refresh() ->
- [map(VHost, fun(Q) -> delegate_cast(Q#amqqueue.pid,
- force_event_refresh) end) ||
- VHost <- rabbit_vhost:list()].
+ [gen_server2:cast(Q#amqqueue.pid, force_event_refresh) || Q <- list()],
+ ok.
consumers(#amqqueue{ pid = QPid }) ->
delegate_call(QPid, consumers).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index e67787aa41..06ba1177bf 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -760,14 +760,6 @@ emit_stats(State) ->
emit_stats(State, Extra) ->
rabbit_event:notify(queue_stats, Extra ++ infos(?STATISTICS_KEYS, State)).
-emit_consumer_created(ChPid, ConsumerTag, Exclusive, AckRequired) ->
- emit_consumer_event(ChPid, ConsumerTag, Exclusive, AckRequired,
- consumer_created).
-
-emit_consumer_exists(ChPid, ConsumerTag, Exclusive, AckRequired) ->
- emit_consumer_event(ChPid, ConsumerTag, Exclusive, AckRequired,
- consumer_exists).
-
emit_consumer_event(ChPid, ConsumerTag, Exclusive, AckRequired, Type) ->
rabbit_event:notify(Type,
[{consumer_tag, ConsumerTag},
@@ -943,8 +935,8 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid,
add_consumer(ChPid, Consumer,
State1#q.active_consumers)})
end,
- emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume,
- not NoAck),
+ emit_consumer_event(ChPid, ConsumerTag, ExclusiveConsume,
+ not NoAck, consumer_created),
reply(ok, State2)
end;
@@ -1098,10 +1090,12 @@ handle_cast({set_maximum_since_use, Age}, State) ->
handle_cast(force_event_refresh, State = #q{exclusive_consumer = Exclusive}) ->
rabbit_event:notify(queue_exists, infos(?CREATION_EVENT_KEYS, State)),
case Exclusive of
- none -> [emit_consumer_exists(Ch, CTag, false, AckRequired) ||
+ none -> [emit_consumer_event(Ch, CTag, false, AckRequired,
+ consumer_exists) ||
{Ch, CTag, AckRequired} <- consumers(State)];
{Ch, CTag} -> [{Ch, CTag, AckRequired}] = consumers(State),
- emit_consumer_exists(Ch, CTag, true, AckRequired)
+ emit_consumer_event(Ch, CTag, true, AckRequired,
+ consumer_exists)
end,
noreply(State).
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index f49f2e20e1..f887afec5c 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -22,13 +22,15 @@
-export([start_link/10, do/2, do/3, flush/1, shutdown/1]).
-export([send_command/2, deliver/4, flushed/2, confirm/2]).
--export([list_local/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]).
--export([refresh_config_all/0, ready_for_close/1]).
+-export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]).
+-export([refresh_config_local/0, ready_for_close/1]).
-export([force_event_refresh/0]).
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
handle_info/2, handle_pre_hibernate/1, prioritise_call/3,
prioritise_cast/2, prioritise_info/2, format_message_queue/2]).
+%% Internal
+-export([list_local/0]).
-record(ch, {state, protocol, channel, reader_pid, writer_pid, conn_pid,
limiter_pid, start_limiter_fun, tx_status, next_tag,
@@ -85,13 +87,14 @@
-> 'ok').
-spec(flushed/2 :: (pid(), pid()) -> 'ok').
-spec(confirm/2 ::(pid(), [non_neg_integer()]) -> 'ok').
+-spec(list/0 :: () -> [pid()]).
-spec(list_local/0 :: () -> [pid()]).
-spec(info_keys/0 :: () -> rabbit_types:info_keys()).
-spec(info/1 :: (pid()) -> rabbit_types:infos()).
-spec(info/2 :: (pid(), rabbit_types:info_keys()) -> rabbit_types:infos()).
-spec(info_all/0 :: () -> [rabbit_types:infos()]).
-spec(info_all/1 :: (rabbit_types:info_keys()) -> [rabbit_types:infos()]).
--spec(refresh_config_all/0 :: () -> 'ok').
+-spec(refresh_config_local/0 :: () -> 'ok').
-spec(ready_for_close/1 :: (pid()) -> 'ok').
-spec(force_event_refresh/0 :: () -> 'ok').
@@ -129,12 +132,12 @@ flushed(Pid, QPid) ->
confirm(Pid, MsgSeqNos) ->
gen_server2:cast(Pid, {confirm, MsgSeqNos, self()}).
-list_local() ->
- pg_local:get_members(rabbit_channels).
-
list() ->
rabbit_misc:rpc_list_all_nodes(rabbit_channel, list_local, []).
+list_local() ->
+ pg_local:get_members(rabbit_channels).
+
info_keys() -> ?INFO_KEYS.
info(Pid) ->
@@ -152,7 +155,7 @@ info_all() ->
info_all(Items) ->
rabbit_misc:filter_exit_map(fun (C) -> info(C, Items) end, list()).
-refresh_config_all() ->
+refresh_config_local() ->
rabbit_misc:upmap(
fun (C) -> gen_server2:call(C, refresh_config) end, list_local()),
ok.
@@ -161,10 +164,9 @@ ready_for_close(Pid) ->
gen_server2:cast(Pid, ready_for_close).
force_event_refresh() ->
- rabbit_misc:filter_exit_map(fun (C) -> force_event_refresh(C) end, list()).
-
-force_event_refresh(Pid) ->
- gen_server2:cast(Pid, force_event_refresh).
+ rabbit_misc:filter_exit_map(
+ fun (C) -> gen_server2:cast(C, force_event_refresh) end, list()),
+ ok.
%%---------------------------------------------------------------------------
diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl
index a75a5fc0e0..a463649791 100644
--- a/src/rabbit_networking.erl
+++ b/src/rabbit_networking.erl
@@ -292,7 +292,8 @@ close_connection(Pid, Explanation) ->
end.
force_connection_event_refresh() ->
- cmap(fun (C) -> rabbit_reader:force_event_refresh(C) end).
+ [rabbit_reader:force_event_refresh(C) || C <- connections()],
+ ok.
%%--------------------------------------------------------------------
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 2fc0b9cdc5..8a1a77686b 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -18,7 +18,8 @@
-include("rabbit_framing.hrl").
-include("rabbit.hrl").
--export([start_link/3, info_keys/0, info/1, info/2, shutdown/2]).
+-export([start_link/3, info_keys/0, info/1, info/2, force_event_refresh/1,
+ shutdown/2]).
-export([system_continue/3, system_terminate/4, system_code_change/4]).
@@ -27,7 +28,6 @@
-export([conserve_memory/2, server_properties/1]).
-export([process_channel_frame/5]). %% used by erlang-client
--export([force_event_refresh/1]).
-define(HANDSHAKE_TIMEOUT, 10).
-define(NORMAL_TIMEOUT, 3).
@@ -322,13 +322,12 @@ handle_other({'$gen_call', From, {info, Items}}, Deb, State) ->
catch Error -> {error, Error}
end),
mainloop(Deb, State);
-handle_other(emit_stats, Deb, State) ->
- mainloop(Deb, emit_stats(State));
handle_other({'$gen_cast', force_event_refresh}, Deb, State) ->
rabbit_event:notify(connection_exists,
- [{type, network} |
- infos(?CREATION_EVENT_KEYS, State)]),
+ [{type, network} | infos(?CREATION_EVENT_KEYS, State)]),
mainloop(Deb, State);
+handle_other(emit_stats, Deb, State) ->
+ mainloop(Deb, emit_stats(State));
handle_other({system, From, Request}, Deb, State = #v1{parent = Parent}) ->
sys:handle_system_msg(Request, From, Parent, ?MODULE, Deb, State);
handle_other(Other, _Deb, _State) ->
diff --git a/src/rabbit_trace.erl b/src/rabbit_trace.erl
index 7d36856a9d..f9632324be 100644
--- a/src/rabbit_trace.erl
+++ b/src/rabbit_trace.erl
@@ -76,7 +76,7 @@ update_config(Fun) ->
{ok, VHosts0} = application:get_env(rabbit, ?TRACE_VHOSTS),
VHosts = Fun(VHosts0),
application:set_env(rabbit, ?TRACE_VHOSTS, VHosts),
- rabbit_channel:refresh_config_all(),
+ rabbit_channel:refresh_config_local(),
ok.
%%----------------------------------------------------------------------------