diff options
| author | Jean-Sébastien Pédron <jean-sebastien@rabbitmq.com> | 2018-12-14 15:59:12 +0100 |
|---|---|---|
| committer | Jean-Sébastien Pédron <jean-sebastien@rabbitmq.com> | 2019-02-01 11:23:16 +0100 |
| commit | 7341d336304c7d5903052ca4d2bd8ad70708abf7 (patch) | |
| tree | e75f4783740861be1d5f8171a065c957920af318 /src | |
| parent | 166991462cb362ffb771dd731a13c8e7ad2e7be0 (diff) | |
| download | rabbitmq-server-git-7341d336304c7d5903052ca4d2bd8ad70708abf7.tar.gz | |
Restore the `rabbit:force_refresh_event()` feature
This is just to make the node compatible with a 3.7.x cluster. All
involved functions are marked as deprecated.
[#159298729]
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit.erl | 13 | ||||
| -rw-r--r-- | src/rabbit_amqqueue.erl | 9 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 21 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 12 | ||||
| -rw-r--r-- | src/rabbit_direct.erl | 8 | ||||
| -rw-r--r-- | src/rabbit_networking.erl | 9 | ||||
| -rw-r--r-- | src/rabbit_reader.erl | 19 |
7 files changed, 86 insertions, 5 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl index c0ecee6423..e06e50561d 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -25,7 +25,7 @@ -export([start/0, boot/0, stop/0, stop_and_halt/0, await_startup/0, await_startup/1, await_startup/3, status/0, is_running/0, alarms/0, - is_running/1, environment/0, rotate_logs/0, + is_running/1, environment/0, rotate_logs/0, force_event_refresh/1, start_fhc/0]). -export([start/2, stop/1, prep_stop/1]). @@ -272,6 +272,8 @@ -spec is_running(node()) -> boolean(). -spec environment() -> [{param(), term()}]. -spec rotate_logs() -> rabbit_types:ok_or_error(any()). +-deprecated([{force_event_refresh, 1, eventually}]). +-spec force_event_refresh(reference()) -> 'ok'. -spec log_locations() -> [log_location()]. @@ -1045,6 +1047,15 @@ start_logger() -> log_locations() -> rabbit_lager:log_locations(). +%% This feature was used by the management API up-to and including +%% RabbitMQ 3.7.x. It is unused in 3.8.x and thus deprecated. We keep it +%% to support in-place upgrades to 3.8.x (i.e. mixed-version clusters). +force_event_refresh(Ref) -> + rabbit_direct:force_event_refresh(Ref), + rabbit_networking:force_connection_event_refresh(Ref), + rabbit_channel:force_event_refresh(Ref), + rabbit_amqqueue:force_event_refresh(Ref). + %%--------------------------------------------------------------------------- %% misc diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index b41751a267..118a3d0fab 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -33,7 +33,7 @@ -export([list_down/1, count/1, list_names/0, list_names/1, list_local_names/0, list_with_possible_retry/1]). -export([list_by_type/1]). --export([notify_policy_changed/1]). +-export([force_event_refresh/1, notify_policy_changed/1]). -export([consumers/1, consumers_all/1, emit_consumers_all/4, consumer_info_keys/0]). -export([basic_get/6, basic_consume/12, basic_cancel/6, notify_decorators/1]). -export([notify_sent/2, notify_sent_queue_down/1, resume/2]). @@ -140,6 +140,8 @@ -spec info_all(rabbit_types:vhost()) -> [rabbit_types:infos()]. -spec info_all(rabbit_types:vhost(), rabbit_types:info_keys()) -> [rabbit_types:infos()]. +-deprecated([{force_event_refresh, 1, eventually}]). +-spec force_event_refresh(reference()) -> 'ok'. -spec notify_policy_changed(amqqueue:amqqueue()) -> 'ok'. -spec consumers(amqqueue:amqqueue()) -> [{pid(), rabbit_types:ctag(), boolean(), non_neg_integer(), @@ -1036,6 +1038,11 @@ list_local(VHostPath) -> [Q || Q <- list(VHostPath), amqqueue:get_state(Q) =/= crashed, is_local_to_node(amqqueue:get_pid(Q), node())]. +force_event_refresh(Ref) -> + [gen_server2:cast(amqqueue:get_pid(Q), + {force_event_refresh, Ref}) || Q <- list()], + ok. + notify_policy_changed(Q) when ?amqqueue_is_classic(Q) -> QPid = amqqueue:get_pid(Q), gen_server2:cast(QPid, policy_changed); diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 80230a3fa8..3644f2cf90 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -1568,6 +1568,27 @@ handle_cast({credit, ChPid, CTag, Credit, Drain}, {unblocked, Consumers1} -> State1 = State#q{consumers = Consumers1}, run_message_queue(true, State1) end); + +handle_cast({force_event_refresh, Ref}, + State = #q{consumers = Consumers, + active_consumer = Holder}) -> + rabbit_event:notify(queue_created, infos(?CREATION_EVENT_KEYS, State), Ref), + QName = qname(State), + AllConsumers = rabbit_queue_consumers:all(Consumers), + case Holder of + none -> + [emit_consumer_created( + Ch, CTag, false, AckRequired, QName, Prefetch, + Args, Ref, ActingUser) || + {Ch, CTag, AckRequired, Prefetch, Args, ActingUser} + <- AllConsumers]; + {Ch, CTag} -> + [{Ch, CTag, AckRequired, Prefetch, Args, ActingUser}] = AllConsumers, + emit_consumer_created( + Ch, CTag, true, AckRequired, QName, Prefetch, Args, Ref, ActingUser) + end, + noreply(rabbit_event:init_stats_timer(State, #q.stats_timer)); + handle_cast(notify_decorators, State) -> notify_decorators(State), noreply(State); diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index dac3037bff..2491316304 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -62,6 +62,7 @@ emit_info_all/4, info_local/1]). -export([refresh_config_local/0, ready_for_close/1]). -export([refresh_interceptors/0]). +-export([force_event_refresh/1]). -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2, handle_pre_hibernate/1, prioritise_call/4, @@ -253,6 +254,8 @@ -spec info_all(rabbit_types:info_keys()) -> [rabbit_types:infos()]. -spec refresh_config_local() -> 'ok'. -spec ready_for_close(pid()) -> 'ok'. +-deprecated([{force_event_refresh, 1, eventually}]). +-spec force_event_refresh(reference()) -> 'ok'. %%---------------------------------------------------------------------------- @@ -414,6 +417,10 @@ refresh_interceptors() -> ready_for_close(Pid) -> rabbit_channel_common:ready_for_close(Pid). +force_event_refresh(Ref) -> + [gen_server2:cast(C, {force_event_refresh, Ref}) || C <- list()], + ok. + list_queue_states(Pid) -> gen_server2:call(Pid, list_queue_states). @@ -631,6 +638,11 @@ handle_cast({send_drained, CTagCredit}, State = #ch{writer_pid = WriterPid}) -> || {ConsumerTag, CreditDrained} <- CTagCredit], noreply(State); +handle_cast({force_event_refresh, Ref}, State) -> + rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State), + Ref), + noreply(rabbit_event:init_stats_timer(State, #ch.stats_timer)); + handle_cast({reject_publish, MsgSeqNo, _QPid}, State = #ch{unconfirmed = UC}) -> %% It does not matter which queue rejected the message, %% if any queue rejected it - it should not be confirmed. diff --git a/src/rabbit_direct.erl b/src/rabbit_direct.erl index f45ba3b1ca..cc2a9f1026 100644 --- a/src/rabbit_direct.erl +++ b/src/rabbit_direct.erl @@ -16,7 +16,7 @@ -module(rabbit_direct). --export([boot/0, list/0, connect/5, +-export([boot/0, force_event_refresh/1, list/0, connect/5, start_channel/9, disconnect/2]). %% Internal -export([list_local/0]). @@ -29,6 +29,8 @@ %%---------------------------------------------------------------------------- -spec boot() -> 'ok'. +-deprecated([{force_event_refresh, 1, eventually}]). +-spec force_event_refresh(reference()) -> 'ok'. -spec list() -> [pid()]. -spec list_local() -> [pid()]. -spec connect @@ -54,6 +56,10 @@ boot() -> rabbit_sup:start_supervisor_child( [{local, rabbit_direct_client_sup}, {rabbit_channel_sup, start_link, []}]). +force_event_refresh(Ref) -> + [Pid ! {force_event_refresh, Ref} || Pid <- list()], + ok. + list_local() -> pg_local:get_members(rabbit_direct). diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl index 6131e2f294..79de7ce180 100644 --- a/src/rabbit_networking.erl +++ b/src/rabbit_networking.erl @@ -35,7 +35,8 @@ connection_info/1, connection_info/2, connection_info_all/0, connection_info_all/1, emit_connection_info_all/4, emit_connection_info_local/3, - close_connection/2, handshake/2, tcp_host/1]). + close_connection/2, force_connection_event_refresh/1, + handshake/2, tcp_host/1]). %% Used by TCP-based transports, e.g. STOMP adapter -export([tcp_listener_addresses/1, tcp_listener_spec/9, @@ -86,6 +87,8 @@ -spec connection_info_all(rabbit_types:info_keys()) -> [rabbit_types:infos()]. -spec close_connection(pid(), string()) -> 'ok'. +-deprecated([{force_connection_event_refresh, 1, eventually}]). +-spec force_connection_event_refresh(reference()) -> 'ok'. -spec on_node_down(node()) -> 'ok'. -spec tcp_listener_addresses(listener_config()) -> [address()]. @@ -356,6 +359,10 @@ close_connection(Pid, Explanation) -> ok end. +force_connection_event_refresh(Ref) -> + [rabbit_reader:force_event_refresh(C, Ref) || C <- connections()], + ok. + handshake(Ref, ProxyProtocol) -> case ProxyProtocol of true -> diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 9a4c143c54..20acec77ae 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -57,7 +57,7 @@ -include("rabbit_framing.hrl"). -include("rabbit.hrl"). --export([start_link/2, info_keys/0, info/1, info/2, +-export([start_link/2, info_keys/0, info/1, info/2, force_event_refresh/2, shutdown/2]). -export([system_continue/3, system_terminate/4, system_code_change/4]). @@ -66,6 +66,8 @@ -export([conserve_resources/3, server_properties/1]). +-deprecated([{force_event_refresh, 2, eventually}]). + -define(NORMAL_TIMEOUT, 3). -define(CLOSING_TIMEOUT, 30). -define(CHANNEL_TERMINATION_TIMEOUT, 3). @@ -161,6 +163,7 @@ -spec info_keys() -> rabbit_types:info_keys(). -spec info(pid()) -> rabbit_types:infos(). -spec info(pid(), rabbit_types:info_keys()) -> rabbit_types:infos(). +-spec force_event_refresh(pid(), reference()) -> 'ok'. -spec shutdown(pid(), string()) -> 'ok'. -type resource_alert() :: {WasAlarmSetForNode :: boolean(), IsThereAnyAlarmsWithSameSourceInTheCluster :: boolean(), @@ -216,6 +219,9 @@ info(Pid, Items) -> {error, Error} -> throw(Error) end. +force_event_refresh(Pid, Ref) -> + gen_server:cast(Pid, {force_event_refresh, Ref}). + conserve_resources(Pid, Source, {_, Conserve, _}) -> Pid ! {conserve_resources, Source, Conserve}, ok. @@ -615,6 +621,17 @@ handle_other({'$gen_call', From, {info, Items}}, State) -> catch Error -> {error, Error} end), State; +handle_other({'$gen_cast', {force_event_refresh, Ref}}, State) + when ?IS_RUNNING(State) -> + rabbit_event:notify( + connection_created, + augment_infos_with_user_provided_connection_name( + [{type, network} | infos(?CREATION_EVENT_KEYS, State)], State), + Ref), + rabbit_event:init_stats_timer(State, #v1.stats_timer); +handle_other({'$gen_cast', {force_event_refresh, _Ref}}, State) -> + %% Ignore, we will emit a created event once we start running. + State; handle_other(ensure_stats, State) -> ensure_stats_timer(State); handle_other(emit_stats, State) -> |
