summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorJean-Sébastien Pédron <jean-sebastien@rabbitmq.com>2018-12-14 15:59:12 +0100
committerJean-Sébastien Pédron <jean-sebastien@rabbitmq.com>2019-02-01 11:23:16 +0100
commit7341d336304c7d5903052ca4d2bd8ad70708abf7 (patch)
treee75f4783740861be1d5f8171a065c957920af318 /src
parent166991462cb362ffb771dd731a13c8e7ad2e7be0 (diff)
downloadrabbitmq-server-git-7341d336304c7d5903052ca4d2bd8ad70708abf7.tar.gz
Restore the `rabbit:force_refresh_event()` feature
This is just to make the node compatible with a 3.7.x cluster. All involved functions are marked as deprecated. [#159298729]
Diffstat (limited to 'src')
-rw-r--r--src/rabbit.erl13
-rw-r--r--src/rabbit_amqqueue.erl9
-rw-r--r--src/rabbit_amqqueue_process.erl21
-rw-r--r--src/rabbit_channel.erl12
-rw-r--r--src/rabbit_direct.erl8
-rw-r--r--src/rabbit_networking.erl9
-rw-r--r--src/rabbit_reader.erl19
7 files changed, 86 insertions, 5 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl
index c0ecee6423..e06e50561d 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -25,7 +25,7 @@
-export([start/0, boot/0, stop/0,
stop_and_halt/0, await_startup/0, await_startup/1, await_startup/3,
status/0, is_running/0, alarms/0,
- is_running/1, environment/0, rotate_logs/0,
+ is_running/1, environment/0, rotate_logs/0, force_event_refresh/1,
start_fhc/0]).
-export([start/2, stop/1, prep_stop/1]).
@@ -272,6 +272,8 @@
-spec is_running(node()) -> boolean().
-spec environment() -> [{param(), term()}].
-spec rotate_logs() -> rabbit_types:ok_or_error(any()).
+-deprecated([{force_event_refresh, 1, eventually}]).
+-spec force_event_refresh(reference()) -> 'ok'.
-spec log_locations() -> [log_location()].
@@ -1045,6 +1047,15 @@ start_logger() ->
log_locations() ->
rabbit_lager:log_locations().
+%% This feature was used by the management API up-to and including
+%% RabbitMQ 3.7.x. It is unused in 3.8.x and thus deprecated. We keep it
+%% to support in-place upgrades to 3.8.x (i.e. mixed-version clusters).
+force_event_refresh(Ref) ->
+ rabbit_direct:force_event_refresh(Ref),
+ rabbit_networking:force_connection_event_refresh(Ref),
+ rabbit_channel:force_event_refresh(Ref),
+ rabbit_amqqueue:force_event_refresh(Ref).
+
%%---------------------------------------------------------------------------
%% misc
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index b41751a267..118a3d0fab 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -33,7 +33,7 @@
-export([list_down/1, count/1, list_names/0, list_names/1, list_local_names/0,
list_with_possible_retry/1]).
-export([list_by_type/1]).
--export([notify_policy_changed/1]).
+-export([force_event_refresh/1, notify_policy_changed/1]).
-export([consumers/1, consumers_all/1, emit_consumers_all/4, consumer_info_keys/0]).
-export([basic_get/6, basic_consume/12, basic_cancel/6, notify_decorators/1]).
-export([notify_sent/2, notify_sent_queue_down/1, resume/2]).
@@ -140,6 +140,8 @@
-spec info_all(rabbit_types:vhost()) -> [rabbit_types:infos()].
-spec info_all(rabbit_types:vhost(), rabbit_types:info_keys()) ->
[rabbit_types:infos()].
+-deprecated([{force_event_refresh, 1, eventually}]).
+-spec force_event_refresh(reference()) -> 'ok'.
-spec notify_policy_changed(amqqueue:amqqueue()) -> 'ok'.
-spec consumers(amqqueue:amqqueue()) ->
[{pid(), rabbit_types:ctag(), boolean(), non_neg_integer(),
@@ -1036,6 +1038,11 @@ list_local(VHostPath) ->
[Q || Q <- list(VHostPath),
amqqueue:get_state(Q) =/= crashed, is_local_to_node(amqqueue:get_pid(Q), node())].
+force_event_refresh(Ref) ->
+ [gen_server2:cast(amqqueue:get_pid(Q),
+ {force_event_refresh, Ref}) || Q <- list()],
+ ok.
+
notify_policy_changed(Q) when ?amqqueue_is_classic(Q) ->
QPid = amqqueue:get_pid(Q),
gen_server2:cast(QPid, policy_changed);
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 80230a3fa8..3644f2cf90 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -1568,6 +1568,27 @@ handle_cast({credit, ChPid, CTag, Credit, Drain},
{unblocked, Consumers1} -> State1 = State#q{consumers = Consumers1},
run_message_queue(true, State1)
end);
+
+handle_cast({force_event_refresh, Ref},
+ State = #q{consumers = Consumers,
+ active_consumer = Holder}) ->
+ rabbit_event:notify(queue_created, infos(?CREATION_EVENT_KEYS, State), Ref),
+ QName = qname(State),
+ AllConsumers = rabbit_queue_consumers:all(Consumers),
+ case Holder of
+ none ->
+ [emit_consumer_created(
+ Ch, CTag, false, AckRequired, QName, Prefetch,
+ Args, Ref, ActingUser) ||
+ {Ch, CTag, AckRequired, Prefetch, Args, ActingUser}
+ <- AllConsumers];
+ {Ch, CTag} ->
+ [{Ch, CTag, AckRequired, Prefetch, Args, ActingUser}] = AllConsumers,
+ emit_consumer_created(
+ Ch, CTag, true, AckRequired, QName, Prefetch, Args, Ref, ActingUser)
+ end,
+ noreply(rabbit_event:init_stats_timer(State, #q.stats_timer));
+
handle_cast(notify_decorators, State) ->
notify_decorators(State),
noreply(State);
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index dac3037bff..2491316304 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -62,6 +62,7 @@
emit_info_all/4, info_local/1]).
-export([refresh_config_local/0, ready_for_close/1]).
-export([refresh_interceptors/0]).
+-export([force_event_refresh/1]).
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
handle_info/2, handle_pre_hibernate/1, prioritise_call/4,
@@ -253,6 +254,8 @@
-spec info_all(rabbit_types:info_keys()) -> [rabbit_types:infos()].
-spec refresh_config_local() -> 'ok'.
-spec ready_for_close(pid()) -> 'ok'.
+-deprecated([{force_event_refresh, 1, eventually}]).
+-spec force_event_refresh(reference()) -> 'ok'.
%%----------------------------------------------------------------------------
@@ -414,6 +417,10 @@ refresh_interceptors() ->
ready_for_close(Pid) ->
rabbit_channel_common:ready_for_close(Pid).
+force_event_refresh(Ref) ->
+ [gen_server2:cast(C, {force_event_refresh, Ref}) || C <- list()],
+ ok.
+
list_queue_states(Pid) ->
gen_server2:call(Pid, list_queue_states).
@@ -631,6 +638,11 @@ handle_cast({send_drained, CTagCredit}, State = #ch{writer_pid = WriterPid}) ->
|| {ConsumerTag, CreditDrained} <- CTagCredit],
noreply(State);
+handle_cast({force_event_refresh, Ref}, State) ->
+ rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State),
+ Ref),
+ noreply(rabbit_event:init_stats_timer(State, #ch.stats_timer));
+
handle_cast({reject_publish, MsgSeqNo, _QPid}, State = #ch{unconfirmed = UC}) ->
%% It does not matter which queue rejected the message,
%% if any queue rejected it - it should not be confirmed.
diff --git a/src/rabbit_direct.erl b/src/rabbit_direct.erl
index f45ba3b1ca..cc2a9f1026 100644
--- a/src/rabbit_direct.erl
+++ b/src/rabbit_direct.erl
@@ -16,7 +16,7 @@
-module(rabbit_direct).
--export([boot/0, list/0, connect/5,
+-export([boot/0, force_event_refresh/1, list/0, connect/5,
start_channel/9, disconnect/2]).
%% Internal
-export([list_local/0]).
@@ -29,6 +29,8 @@
%%----------------------------------------------------------------------------
-spec boot() -> 'ok'.
+-deprecated([{force_event_refresh, 1, eventually}]).
+-spec force_event_refresh(reference()) -> 'ok'.
-spec list() -> [pid()].
-spec list_local() -> [pid()].
-spec connect
@@ -54,6 +56,10 @@ boot() -> rabbit_sup:start_supervisor_child(
[{local, rabbit_direct_client_sup},
{rabbit_channel_sup, start_link, []}]).
+force_event_refresh(Ref) ->
+ [Pid ! {force_event_refresh, Ref} || Pid <- list()],
+ ok.
+
list_local() ->
pg_local:get_members(rabbit_direct).
diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl
index 6131e2f294..79de7ce180 100644
--- a/src/rabbit_networking.erl
+++ b/src/rabbit_networking.erl
@@ -35,7 +35,8 @@
connection_info/1, connection_info/2,
connection_info_all/0, connection_info_all/1,
emit_connection_info_all/4, emit_connection_info_local/3,
- close_connection/2, handshake/2, tcp_host/1]).
+ close_connection/2, force_connection_event_refresh/1,
+ handshake/2, tcp_host/1]).
%% Used by TCP-based transports, e.g. STOMP adapter
-export([tcp_listener_addresses/1, tcp_listener_spec/9,
@@ -86,6 +87,8 @@
-spec connection_info_all(rabbit_types:info_keys()) ->
[rabbit_types:infos()].
-spec close_connection(pid(), string()) -> 'ok'.
+-deprecated([{force_connection_event_refresh, 1, eventually}]).
+-spec force_connection_event_refresh(reference()) -> 'ok'.
-spec on_node_down(node()) -> 'ok'.
-spec tcp_listener_addresses(listener_config()) -> [address()].
@@ -356,6 +359,10 @@ close_connection(Pid, Explanation) ->
ok
end.
+force_connection_event_refresh(Ref) ->
+ [rabbit_reader:force_event_refresh(C, Ref) || C <- connections()],
+ ok.
+
handshake(Ref, ProxyProtocol) ->
case ProxyProtocol of
true ->
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 9a4c143c54..20acec77ae 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -57,7 +57,7 @@
-include("rabbit_framing.hrl").
-include("rabbit.hrl").
--export([start_link/2, info_keys/0, info/1, info/2,
+-export([start_link/2, info_keys/0, info/1, info/2, force_event_refresh/2,
shutdown/2]).
-export([system_continue/3, system_terminate/4, system_code_change/4]).
@@ -66,6 +66,8 @@
-export([conserve_resources/3, server_properties/1]).
+-deprecated([{force_event_refresh, 2, eventually}]).
+
-define(NORMAL_TIMEOUT, 3).
-define(CLOSING_TIMEOUT, 30).
-define(CHANNEL_TERMINATION_TIMEOUT, 3).
@@ -161,6 +163,7 @@
-spec info_keys() -> rabbit_types:info_keys().
-spec info(pid()) -> rabbit_types:infos().
-spec info(pid(), rabbit_types:info_keys()) -> rabbit_types:infos().
+-spec force_event_refresh(pid(), reference()) -> 'ok'.
-spec shutdown(pid(), string()) -> 'ok'.
-type resource_alert() :: {WasAlarmSetForNode :: boolean(),
IsThereAnyAlarmsWithSameSourceInTheCluster :: boolean(),
@@ -216,6 +219,9 @@ info(Pid, Items) ->
{error, Error} -> throw(Error)
end.
+force_event_refresh(Pid, Ref) ->
+ gen_server:cast(Pid, {force_event_refresh, Ref}).
+
conserve_resources(Pid, Source, {_, Conserve, _}) ->
Pid ! {conserve_resources, Source, Conserve},
ok.
@@ -615,6 +621,17 @@ handle_other({'$gen_call', From, {info, Items}}, State) ->
catch Error -> {error, Error}
end),
State;
+handle_other({'$gen_cast', {force_event_refresh, Ref}}, State)
+ when ?IS_RUNNING(State) ->
+ rabbit_event:notify(
+ connection_created,
+ augment_infos_with_user_provided_connection_name(
+ [{type, network} | infos(?CREATION_EVENT_KEYS, State)], State),
+ Ref),
+ rabbit_event:init_stats_timer(State, #v1.stats_timer);
+handle_other({'$gen_cast', {force_event_refresh, _Ref}}, State) ->
+ %% Ignore, we will emit a created event once we start running.
+ State;
handle_other(ensure_stats, State) ->
ensure_stats_timer(State);
handle_other(emit_stats, State) ->