summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorkjnilsson <knilsson@pivotal.io>2017-07-04 11:55:37 +0100
committerkjnilsson <knilsson@pivotal.io>2017-07-04 14:45:11 +0100
commit0e9b333b64928ec17b5054ad4c9aece1539f0037 (patch)
treebd25cef8e5d7bba59deb0301dff836b484764942
parent744e6991d226e87904672b227230732de59cc322 (diff)
downloadrabbitmq-server-git-0e9b333b64928ec17b5054ad4c9aece1539f0037.tar.gz
Clean up queue metrics on queue termination.
This change handles all non-crash termination cases. The assumption here is that once an amqqueue_process terminates the master is no longer on the current node. [#147753285]
-rw-r--r--src/rabbit_amqqueue_process.erl2
-rw-r--r--src/rabbit_core_metrics_gc.erl8
-rw-r--r--src/rabbit_mirror_queue_sync.erl2
-rw-r--r--test/rabbit_core_metrics_gc_SUITE.erl28
4 files changed, 19 insertions, 21 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index cfae4cbaf4..16a5a70e13 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -258,11 +258,13 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS,
State3.
terminate(shutdown = R, State = #q{backing_queue = BQ}) ->
+ rabbit_core_metrics:queue_deleted(qname(State)),
terminate_shutdown(fun (BQS) -> BQ:terminate(R, BQS) end, State);
terminate({shutdown, missing_owner} = Reason, State) ->
%% if the owner was missing then there will be no queue, so don't emit stats
terminate_shutdown(terminate_delete(false, Reason, State), State);
terminate({shutdown, _} = R, State = #q{backing_queue = BQ}) ->
+ rabbit_core_metrics:queue_deleted(qname(State)),
terminate_shutdown(fun (BQS) -> BQ:terminate(R, BQS) end, State);
terminate(normal, State) -> %% delete case
terminate_shutdown(terminate_delete(true, normal, State), State);
diff --git a/src/rabbit_core_metrics_gc.erl b/src/rabbit_core_metrics_gc.erl
index 3321f2b5de..7f39a4eea9 100644
--- a/src/rabbit_core_metrics_gc.erl
+++ b/src/rabbit_core_metrics_gc.erl
@@ -54,9 +54,13 @@ terminate(_Reason, #state{timer = TRef}) ->
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
+start_timer(Interval, #state{timer = TRef0} = St) ->
+ timer:cancel(TRef0),
+ TRef1 = erlang:send_after(Interval, self(), start_gc),
+ St#state{timer = TRef1}.
+
start_timer(#state{interval = Interval} = St) ->
- TRef = erlang:send_after(Interval, self(), start_gc),
- St#state{timer = TRef}.
+ start_timer(Interval, St).
gc_connections() ->
gc_process(connection_created),
diff --git a/src/rabbit_mirror_queue_sync.erl b/src/rabbit_mirror_queue_sync.erl
index c5acd852e7..9e6182488e 100644
--- a/src/rabbit_mirror_queue_sync.erl
+++ b/src/rabbit_mirror_queue_sync.erl
@@ -16,7 +16,7 @@
-module(rabbit_mirror_queue_sync).
--include("rabbit.hrl").
+-include_lib("rabbit_common/include/rabbit.hrl").
-export([master_prepare/4, master_go/8, slave/7, conserve_resources/3]).
diff --git a/test/rabbit_core_metrics_gc_SUITE.erl b/test/rabbit_core_metrics_gc_SUITE.erl
index 2540e5cd3e..d63c9ab2fb 100644
--- a/test/rabbit_core_metrics_gc_SUITE.erl
+++ b/test/rabbit_core_metrics_gc_SUITE.erl
@@ -343,11 +343,6 @@ cluster_queue_metrics(Config) ->
Node0 = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
Node1 = rabbit_ct_broker_helpers:get_node_config(Config, 1, nodename),
- rabbit_ct_broker_helpers:rpc(Config, Node0, erlang, send, [rabbit_core_metrics_gc, start_gc]),
- rabbit_ct_broker_helpers:rpc(Config, Node0, gen_server, call, [rabbit_core_metrics_gc, test]),
- rabbit_ct_broker_helpers:rpc(Config, Node1, erlang, send, [rabbit_core_metrics_gc, start_gc]),
- rabbit_ct_broker_helpers:rpc(Config, Node1, gen_server, call, [rabbit_core_metrics_gc, test]),
-
Ch = rabbit_ct_client_helpers:open_channel(Config, Node0),
Node0Name = rabbit_data_coercion:to_binary(Node0),
@@ -370,24 +365,21 @@ cluster_queue_metrics(Config) ->
% Synchronize
Name = rabbit_misc:r(VHost, queue, QueueName),
[#amqqueue{pid = QPid}] = rabbit_ct_broker_helpers:rpc(Config, Node0,
- ets, lookup, [rabbit_queue, Name]),
- ok = rabbit_ct_broker_helpers:rpc(Config, Node0, rabbit_amqqueue, sync_mirrors, [QPid]),
-
- timer:sleep(1000),
+ ets, lookup,
+ [rabbit_queue, Name]),
+ ok = rabbit_ct_broker_helpers:rpc(Config, Node0, rabbit_amqqueue,
+ sync_mirrors, [QPid]),
- rabbit_ct_broker_helpers:rpc(Config, Node0, erlang, send, [rabbit_core_metrics_gc, start_gc]),
- rabbit_ct_broker_helpers:rpc(Config, Node0, gen_server, call, [rabbit_core_metrics_gc, test]),
- rabbit_ct_broker_helpers:rpc(Config, Node1, erlang, send, [rabbit_core_metrics_gc, start_gc]),
- rabbit_ct_broker_helpers:rpc(Config, Node1, gen_server, call, [rabbit_core_metrics_gc, test]),
+ timer:sleep(1500),
% Check ETS table for data
% rabbit_core_metrics:queue_stats
- % {Name, MessagesReady, MessagesUnacknowledge, Messages, Reductions}
- % [{{resource,<<"/">>,queue,<<"cluster_queue_metrics">>}, 1,0,1,10524}]
- [] = rabbit_ct_broker_helpers:rpc(Config, Node0, ets, tab2list, [queue_coarse_metrics]),
+ [] = rabbit_ct_broker_helpers:rpc(Config, Node0, ets, tab2list,
+ [queue_coarse_metrics]),
- EtsData1_0 = rabbit_ct_broker_helpers:rpc(Config, Node1, ets, tab2list, [queue_coarse_metrics]),
- [{Name, 1, 0, 1, _}] = EtsData1_0,
+ [{Name, 1, 0, 1, _}] = rabbit_ct_broker_helpers:rpc(Config, Node1, ets,
+ tab2list,
+ [queue_coarse_metrics]),
amqp_channel:call(Ch, #'queue.delete'{queue=QueueName}),
rabbit_ct_client_helpers:close_channel(Ch),