summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Klishin <michael@clojurewerkz.org>2017-03-08 20:50:40 +0300
committerMichael Klishin <michael@clojurewerkz.org>2017-03-08 20:50:40 +0300
commit1f6d6d4fcc2107a69bb5fd91601f9168adc97d54 (patch)
treeee84abc078dcad96b5e8e63f09688f3deb4d5f3d
parentbf74a923c59a6ec6bc8f17be803f3281a3047ce0 (diff)
parentb1ce3f16a605ad9f06fe98c5e7ff49ba23f4b1f4 (diff)
downloadrabbitmq-server-git-1f6d6d4fcc2107a69bb5fd91601f9168adc97d54.tar.gz
Merge branch 'stable' into rabbitmq-server-1122
-rw-r--r--rabbitmq-components.mk2
-rw-r--r--src/rabbit.erl2
-rw-r--r--src/rabbit_alarm.erl10
-rw-r--r--src/rabbit_cli.erl2
-rw-r--r--src/rabbit_core_metrics_gc.erl27
-rw-r--r--src/rabbit_queue_index.erl41
-rw-r--r--test/metrics_SUITE.erl63
-rw-r--r--test/partitions_SUITE.erl13
-rw-r--r--test/rabbit_core_metrics_gc_SUITE.erl34
-rw-r--r--test/unit_inbroker_SUITE.erl48
10 files changed, 176 insertions, 66 deletions
diff --git a/rabbitmq-components.mk b/rabbitmq-components.mk
index f4db6812bc..79ea3c7a3c 100644
--- a/rabbitmq-components.mk
+++ b/rabbitmq-components.mk
@@ -104,6 +104,8 @@ dep_rabbitmq_public_umbrella = git_rmq rabbitmq-public-umbrella $(curre
dep_cowboy_commit = 1.0.4
dep_mochiweb = git git://github.com/basho/mochiweb.git v2.9.0p2
+# Last commit of PropEr supporting Erlang R16B03.
+dep_proper_commit = 735d972758d8bd85b12483626fe1b66450d6a6fe
dep_ranch_commit = 1.3.1
dep_webmachine_commit = 1.10.8p2
diff --git a/src/rabbit.erl b/src/rabbit.erl
index bed7058057..c1f89af574 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -475,7 +475,7 @@ start_apps(Apps) ->
prompt ->
IoDevice = get_input_iodevice(),
io:setopts(IoDevice, [{echo, false}]),
- PP = lists:droplast(io:get_line(IoDevice,
+ PP = rabbit_misc:lists_droplast(io:get_line(IoDevice,
"\nPlease enter the passphrase to unlock encrypted "
"configuration entries.\n\nPassphrase: ")),
io:setopts(IoDevice, [{echo, true}]),
diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl
index daf2c167fa..9466dd0eeb 100644
--- a/src/rabbit_alarm.erl
+++ b/src/rabbit_alarm.erl
@@ -128,8 +128,12 @@ handle_call(_Request, State) ->
handle_event({set_alarm, {{resource_limit, Source, Node}, []}}, State) ->
case is_node_alarmed(Source, Node, State) of
- true -> {ok, State};
- false -> handle_set_resource_alarm(Source, Node, State)
+ true ->
+ {ok, State};
+ false ->
+ rabbit_event:notify(alarm_set, [{source, Source},
+ {node, Node}]),
+ handle_set_resource_alarm(Source, Node, State)
end;
handle_event({set_alarm, Alarm}, State = #alarms{alarms = Alarms}) ->
case lists:member(Alarm, Alarms) of
@@ -141,6 +145,8 @@ handle_event({set_alarm, Alarm}, State = #alarms{alarms = Alarms}) ->
handle_event({clear_alarm, {resource_limit, Source, Node}}, State) ->
case is_node_alarmed(Source, Node, State) of
true ->
+ rabbit_event:notify(alarm_cleared, [{source, Source},
+ {node, Node}]),
handle_clear_resource_alarm(Source, Node, State);
false ->
{ok, State}
diff --git a/src/rabbit_cli.erl b/src/rabbit_cli.erl
index 1feda43b6e..fb4ce328ee 100644
--- a/src/rabbit_cli.erl
+++ b/src/rabbit_cli.erl
@@ -277,7 +277,7 @@ mutually_exclusive_flags(CurrentOptionValues, Default, FlagsAndValues) ->
{ok, Value};
_ ->
Names = [ [$', N, $'] || {N, _} <- PresentFlags ],
- CommaSeparated = string:join(lists:droplast(Names), ", "),
+ CommaSeparated = string:join(rabbit_misc:lists_droplast(Names), ", "),
AndOneMore = lists:last(Names),
Msg = io_lib:format("Options ~s and ~s are mutually exclusive", [CommaSeparated, AndOneMore]),
{error, lists:flatten(Msg)}
diff --git a/src/rabbit_core_metrics_gc.erl b/src/rabbit_core_metrics_gc.erl
index e7f848bbc3..3141fdc301 100644
--- a/src/rabbit_core_metrics_gc.erl
+++ b/src/rabbit_core_metrics_gc.erl
@@ -64,7 +64,6 @@ gc_connections() ->
gc_process(connection_coarse_metrics).
gc_channels() ->
- %% TODO channel stats
gc_process(channel_created),
gc_process(channel_metrics),
gc_process(channel_process_metrics),
@@ -96,16 +95,17 @@ gc_gen_server2() ->
gc_process(Table) ->
ets:foldl(fun({Pid = Key, _}, none) ->
gc_process(Pid, Table, Key);
+ ({Pid = Key, _, _, _, _}, none) ->
+ gc_process(Pid, Table, Key);
({Pid = Key, _, _, _}, none) ->
gc_process(Pid, Table, Key)
end, none, Table).
gc_process(Pid, Table, Key) ->
- case erlang:is_process_alive(Pid) of
+ case rabbit_misc:is_process_alive(Pid) of
true ->
none;
false ->
- %% TODO catch?
ets:delete(Table, Key),
none
end.
@@ -115,6 +115,8 @@ gc_entity(Table, GbSet) ->
gc_entity(Id, Table, Key, GbSet);
({Id = Key, _}, none) ->
gc_entity(Id, Table, Key, GbSet);
+ ({Id = Key, _, _}, none) ->
+ gc_entity(Id, Table, Key, GbSet);
({Id = Key, _, _, _, _}, none) ->
gc_entity(Id, Table, Key, GbSet)
end, none, Table).
@@ -124,40 +126,35 @@ gc_entity(Id, Table, Key, GbSet) ->
true ->
none;
false ->
- %% TODO catch?
ets:delete(Table, Key),
none
end.
gc_process_and_entity(Table, GbSet) ->
- ets:foldl(fun({{Pid, Id} = Key, _, _, _, _, _, _}, none)
+ ets:foldl(fun({{Pid, Id} = Key, _, _, _, _, _, _, _}, none)
when Table == channel_queue_metrics ->
- gc_entity(Id, Table, Key, GbSet),
- gc_process(Pid, Table, Key);
- ({{Pid, Id} = Key, _, _, _}, none)
+ gc_process_and_entity(Id, Pid, Table, Key, GbSet);
+ ({{Pid, Id} = Key, _, _, _, _}, none)
when Table == channel_exchange_metrics ->
- gc_entity(Id, Table, Key, GbSet),
- gc_process(Pid, Table, Key);
+ gc_process_and_entity(Id, Pid, Table, Key, GbSet);
({{Id, Pid, _} = Key, _, _, _, _}, none)
when Table == consumer_created ->
- gc_entity(Id, Table, Key, GbSet),
- gc_process(Pid, Table, Key);
+ gc_process_and_entity(Id, Pid, Table, Key, GbSet);
({{{Pid, Id}, _} = Key, _, _, _, _}, none) ->
gc_process_and_entity(Id, Pid, Table, Key, GbSet)
end, none, Table).
gc_process_and_entity(Id, Pid, Table, Key, GbSet) ->
- case erlang:is_process_alive(Pid) orelse gb_sets:is_member(Id, GbSet) of
+ case rabbit_misc:is_process_alive(Pid) andalso gb_sets:is_member(Id, GbSet) of
true ->
none;
false ->
- %% TODO catch?
ets:delete(Table, Key),
none
end.
gc_process_and_entities(Table, QueueGbSet, ExchangeGbSet) ->
- ets:foldl(fun({{Pid, {Q, X}} = Key, _}, none) ->
+ ets:foldl(fun({{Pid, {Q, X}} = Key, _, _}, none) ->
gc_process(Pid, Table, Key),
gc_entity(Q, Table, Key, QueueGbSet),
gc_entity(X, Table, Key, ExchangeGbSet)
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 6560d61625..f1a240eed8 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -176,13 +176,40 @@
%%----------------------------------------------------------------------------
--record(qistate, {dir, segments, journal_handle, dirty_count,
- max_journal_entries, on_sync, on_sync_msg,
- unconfirmed, unconfirmed_msg,
- pre_publish_cache, delivered_cache}).
-
--record(segment, {num, path, journal_entries,
- entries_to_segment, unacked}).
+-record(qistate, {
+ %% queue directory where segment and journal files are stored
+ dir,
+ %% map of #segment records
+ segments,
+ %% journal file handle obtained from/used by file_handle_cache
+ journal_handle,
+ %% how many not yet flushed entries are there
+ dirty_count,
+ %% this many not yet flushed journal entries will force a flush
+ max_journal_entries,
+ %% callback function invoked when a message is "handled"
+ %% by the index and potentially can be confirmed to the publisher
+ on_sync,
+ on_sync_msg,
+ %% set of IDs of unconfirmed [to publishers] messages
+ unconfirmed,
+ unconfirmed_msg,
+ %% optimisation
+ pre_publish_cache,
+ %% optimisation
+ delivered_cache}).
+
+-record(segment, {
+ %% segment ID (an integer)
+ num,
+ %% segment file path (see also ?SEGMENT_EXTENSION)
+ path,
+ %% index operation log entries in this segment
+ journal_entries,
+ entries_to_segment,
+ %% counter of unacknowledged messages
+ unacked
+}).
-include("rabbit.hrl").
diff --git a/test/metrics_SUITE.erl b/test/metrics_SUITE.erl
index b2b0fe3560..a66e9cec3c 100644
--- a/test/metrics_SUITE.erl
+++ b/test/metrics_SUITE.erl
@@ -20,6 +20,7 @@
-include_lib("proper/include/proper.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("amqp_client/include/amqp_client.hrl").
+-include_lib("rabbit_common/include/rabbit_core_metrics.hrl").
all() ->
@@ -75,6 +76,7 @@ end_per_group(_, Config) ->
Config.
init_per_testcase(Testcase, Config) ->
+ clean_core_metrics(Config),
rabbit_ct_helpers:testcase_started(Config, Testcase).
end_per_testcase(Testcase, Config) ->
@@ -144,7 +146,7 @@ connection_metric_idemp(Config, {N, R}) ->
Table2 = [ Pid || {Pid, _} <- read_table_rpc(Config, connection_coarse_metrics)],
% referesh stats 'R' times
[[Pid ! emit_stats || Pid <- Table] || _ <- lists:seq(1, R)],
- timer:sleep(100),
+ force_metric_gc(Config),
TableAfter = [ Pid || {Pid, _} <- read_table_rpc(Config, connection_metrics)],
TableAfter2 = [ Pid || {Pid, _} <- read_table_rpc(Config, connection_coarse_metrics)],
[rabbit_ct_client_helpers:close_connection(Conn) || Conn <- Conns],
@@ -158,7 +160,7 @@ channel_metric_idemp(Config, {N, R}) ->
Table2 = [ Pid || {Pid, _} <- read_table_rpc(Config, channel_process_metrics)],
% referesh stats 'R' times
[[Pid ! emit_stats || Pid <- Table] || _ <- lists:seq(1, R)],
- timer:sleep(100),
+ force_metric_gc(Config),
TableAfter = [ Pid || {Pid, _} <- read_table_rpc(Config, channel_metrics)],
TableAfter2 = [ Pid || {Pid, _} <- read_table_rpc(Config, channel_process_metrics)],
rabbit_ct_client_helpers:close_connection(Conn),
@@ -166,6 +168,7 @@ channel_metric_idemp(Config, {N, R}) ->
(N == length(Table)) and (N == length(TableAfter)).
queue_metric_idemp(Config, {N, R}) ->
+ clean_core_metrics(Config),
Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config),
{ok, Chan} = amqp_connection:open_channel(Conn),
Queues =
@@ -175,14 +178,15 @@ queue_metric_idemp(Config, {N, R}) ->
ensure_channel_queue_metrics_populated(Chan, Queue),
Queue
end || _ <- lists:seq(1, N)],
- Table = [ Pid || {Pid, _} <- read_table_rpc(Config, queue_metrics)],
- Table2 = [ Pid || {Pid, _} <- read_table_rpc(Config, queue_coarse_metrics)],
+
+ Table = [ Pid || {Pid, _, _} <- read_table_rpc(Config, queue_metrics)],
+ Table2 = [ Pid || {Pid, _, _} <- read_table_rpc(Config, queue_coarse_metrics)],
% referesh stats 'R' times
ChanTable = read_table_rpc(Config, channel_created),
- [[Pid ! emit_stats || {Pid, _} <- ChanTable ] || _ <- lists:seq(1, R)],
- timer:sleep(100),
- TableAfter = [ Pid || {Pid, _} <- read_table_rpc(Config, queue_metrics)],
- TableAfter2 = [ Pid || {Pid, _} <- read_table_rpc(Config, queue_coarse_metrics)],
+ [[Pid ! emit_stats || {Pid, _, _} <- ChanTable ] || _ <- lists:seq(1, R)],
+ force_metric_gc(Config),
+ TableAfter = [ Pid || {Pid, _, _} <- read_table_rpc(Config, queue_metrics)],
+ TableAfter2 = [ Pid || {Pid, _, _} <- read_table_rpc(Config, queue_coarse_metrics)],
[ delete_queue(Chan, Q) || Q <- Queues],
rabbit_ct_client_helpers:close_connection(Conn),
(Table2 == TableAfter2) and (Table == TableAfter) and
@@ -191,7 +195,9 @@ queue_metric_idemp(Config, {N, R}) ->
connection_metric_count(Config, Ops) ->
add_rem_counter(Config, Ops,
{fun rabbit_ct_client_helpers:open_unmanaged_connection/1,
- fun rabbit_ct_client_helpers:close_connection/1},
+ fun(Cfg) ->
+ rabbit_ct_client_helpers:close_connection(Cfg)
+ end},
[ connection_created,
connection_metrics,
connection_coarse_metrics ]).
@@ -222,9 +228,10 @@ queue_metric_count(Config, Ops) ->
end,
Result = add_rem_counter(Config, Ops,
{AddFun,
- fun (Q) -> delete_queue(Chan, Q) end},
- [ channel_queue_metrics,
- channel_queue_exchange_metrics ]),
+ fun (Q) -> delete_queue(Chan, Q),
+ force_metric_gc(Config)
+ end}, [channel_queue_metrics,
+ channel_queue_exchange_metrics ]),
ok = rabbit_ct_client_helpers:close_connection(Conn),
Result.
@@ -240,7 +247,10 @@ queue_metric_count_channel_per_queue(Config, Ops) ->
end,
Result = add_rem_counter(Config, Ops,
{AddFun,
- fun ({Chan, Q}) -> delete_queue(Chan, Q) end},
+ fun ({Chan, Q}) ->
+ delete_queue(Chan, Q),
+ force_metric_gc(Config)
+ end},
[ channel_queue_metrics,
channel_queue_exchange_metrics ]),
ok = rabbit_ct_client_helpers:close_connection(Conn),
@@ -258,8 +268,10 @@ add_rem_counter(Config, {Initial, Ops}, {AddFun, RemFun}, Tables) ->
(_, S) -> S end,
{Initial, Things},
Ops),
+ force_metric_gc(Config),
TabLens = lists:map(fun(T) ->
- length(read_table_rpc(Config, T)) end, Tables),
+ length(read_table_rpc(Config, T))
+ end, Tables),
[RemFun(Thing) || Thing <- Things1],
[FinalLen] == lists:usort(TabLens).
@@ -270,9 +282,11 @@ connection(Config) ->
[_] = read_table_rpc(Config, connection_metrics),
[_] = read_table_rpc(Config, connection_coarse_metrics),
ok = rabbit_ct_client_helpers:close_connection(Conn),
+ force_metric_gc(Config),
[] = read_table_rpc(Config, connection_created),
[] = read_table_rpc(Config, connection_metrics),
- [] = read_table_rpc(Config, connection_coarse_metrics).
+ [] = read_table_rpc(Config, connection_coarse_metrics),
+ ok.
channel(Config) ->
Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config),
@@ -308,10 +322,12 @@ channel_queue_delete_queue(Config) ->
[_] = read_table_rpc(Config, channel_queue_exchange_metrics),
delete_queue(Chan, Queue),
+ force_metric_gc(Config),
% ensure removal of queue cleans up channel_queue metrics
[] = read_table_rpc(Config, channel_queue_exchange_metrics),
[] = read_table_rpc(Config, channel_queue_metrics),
- ok = rabbit_ct_client_helpers:close_connection(Conn).
+ ok = rabbit_ct_client_helpers:close_connection(Conn),
+ ok.
channel_queue_exchange_consumer_close_connection(Config) ->
Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config),
@@ -335,10 +351,13 @@ channel_queue_exchange_consumer_close_connection(Config) ->
ok = rabbit_ct_client_helpers:close_connection(Conn),
% ensure cleanup happened
+ force_metric_gc(Config),
[] = read_table_rpc(Config, channel_exchange_metrics),
[] = read_table_rpc(Config, channel_queue_exchange_metrics),
[] = read_table_rpc(Config, channel_queue_metrics),
- [] = read_table_rpc(Config, consumer_created).
+ [] = read_table_rpc(Config, consumer_created),
+ ok.
+
%% -------------------------------------------------------------------
@@ -371,6 +390,16 @@ force_channel_stats(Config) ->
read_table_rpc(Config, Table) ->
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, read_table, [Table]).
+clean_core_metrics(Config) ->
+ [ rabbit_ct_broker_helpers:rpc(Config, 0, ets, delete_all_objects, [Table])
+ || {Table, _} <- ?CORE_TABLES].
+
read_table(Table) ->
ets:tab2list(Table).
+force_metric_gc(Config) ->
+ timer:sleep(300),
+ rabbit_ct_broker_helpers:rpc(Config, 0, erlang, send,
+ [rabbit_core_metrics_gc, start_gc]),
+ rabbit_ct_broker_helpers:rpc(Config, 0, gen_server, call,
+ [rabbit_core_metrics_gc, test]).
diff --git a/test/partitions_SUITE.erl b/test/partitions_SUITE.erl
index e00c015d02..8c8a772987 100644
--- a/test/partitions_SUITE.erl
+++ b/test/partitions_SUITE.erl
@@ -11,7 +11,7 @@
%% The Original Code is RabbitMQ.
%%
%% The Initial Developer of the Original Code is GoPivotal, Inc.
-%% Copyright (c) 2011-2016 Pivotal Software, Inc. All rights reserved.
+%% Copyright (c) 2011-2017 Pivotal Software, Inc. All rights reserved.
%%
-module(partitions_SUITE).
@@ -21,10 +21,6 @@
-compile(export_all).
--import(rabbit_ct_broker_helpers, [enable_dist_proxy_manager/1,
- enable_dist_proxy/1,
- enable_dist_proxy_on_node/3]).
-
%% We set ticktime to 1s and setuptime is 7s so to make sure it
%% passes...
-define(DELAY, 8000).
@@ -33,6 +29,9 @@
%% It's a lot, but still better than timetrap_timeout
-define(AWAIT_TIMEOUT, 300000).
+suite() ->
+ [{timetrap, 5 * 60000}].
+
all() ->
[
{group, net_ticktime_1},
@@ -49,8 +48,8 @@ groups() ->
{cluster_size_3, [], [
autoheal,
autoheal_after_pause_if_all_down,
- autoheal_multiple_partial_partitions,
- autoheal_unexpected_finish,
+ autoheal_multiple_partial_partitions,
+ autoheal_unexpected_finish,
ignore,
pause_if_all_down_on_blocked,
pause_if_all_down_on_down,
diff --git a/test/rabbit_core_metrics_gc_SUITE.erl b/test/rabbit_core_metrics_gc_SUITE.erl
index b7311463d5..6e7607eea6 100644
--- a/test/rabbit_core_metrics_gc_SUITE.erl
+++ b/test/rabbit_core_metrics_gc_SUITE.erl
@@ -34,7 +34,8 @@ groups() ->
connection_metrics,
channel_metrics,
node_metrics,
- gen_server2_metrics
+ gen_server2_metrics,
+ consumer_metrics
]
}
].
@@ -106,7 +107,6 @@ queue_metrics(Config) ->
[queue_metrics, Q]),
[_] = rabbit_ct_broker_helpers:rpc(Config, A, ets, lookup,
[queue_coarse_metrics, Q]),
-
%% Trigger gc. When the gen_server:call returns, the gc has already finished.
rabbit_ct_broker_helpers:rpc(Config, A, erlang, send, [rabbit_core_metrics_gc, start_gc]),
rabbit_ct_broker_helpers:rpc(Config, A, gen_server, call, [rabbit_core_metrics_gc, test]),
@@ -287,6 +287,36 @@ gen_server2_metrics(Config) ->
ok.
+consumer_metrics(Config) ->
+ A = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
+ Ch = rabbit_ct_client_helpers:open_channel(Config, A),
+
+ amqp_channel:call(Ch, #'queue.declare'{queue = <<"queue_metrics">>}),
+ amqp_channel:call(Ch, #'basic.consume'{queue = <<"queue_metrics">>}),
+ timer:sleep(200),
+
+ DeadPid = rabbit_ct_broker_helpers:rpc(Config, A, ?MODULE, dead_pid, []),
+
+ QName = q(<<"queue_metrics">>),
+ CTag = <<"tag">>,
+ rabbit_ct_broker_helpers:rpc(Config, A, rabbit_core_metrics,
+ consumer_created, [DeadPid, CTag, true, true,
+ QName, 1, []]),
+ Id = {QName, DeadPid, CTag},
+ [_] = rabbit_ct_broker_helpers:rpc(Config, A, ets, lookup, [consumer_created, Id]),
+
+ %% Trigger gc. When the gen_server:call returns, the gc has already finished.
+ rabbit_ct_broker_helpers:rpc(Config, A, erlang, send, [rabbit_core_metrics_gc, start_gc]),
+ rabbit_ct_broker_helpers:rpc(Config, A, gen_server, call, [rabbit_core_metrics_gc, test]),
+
+ [_|_] = rabbit_ct_broker_helpers:rpc(Config, A, ets, tab2list, [consumer_created]),
+ [] = rabbit_ct_broker_helpers:rpc(Config, A, ets, lookup, [consumer_created, Id]),
+
+ amqp_channel:call(Ch, #'queue.delete'{queue = <<"queue_metrics">>}),
+ rabbit_ct_client_helpers:close_channel(Ch),
+
+ ok.
+
dead_pid() ->
spawn(fun() -> ok end).
diff --git a/test/unit_inbroker_SUITE.erl b/test/unit_inbroker_SUITE.erl
index 9d0b84e733..32738e1133 100644
--- a/test/unit_inbroker_SUITE.erl
+++ b/test/unit_inbroker_SUITE.erl
@@ -2929,34 +2929,54 @@ channel_statistics1(_Config) ->
%% Check the stats reflect that
Check2 = fun() ->
- [{{Ch, QRes}, 1, 0, 0, 0, 0, 0}] = ets:lookup(
- channel_queue_metrics,
- {Ch, QRes}),
- [{{Ch, X}, 1, 0, 0}] = ets:lookup(
- channel_exchange_metrics,
- {Ch, X}),
- [{{Ch, {QRes, X}}, 1}] = ets:lookup(
- channel_queue_exchange_metrics,
- {Ch, {QRes, X}})
+ [{{Ch, QRes}, 1, 0, 0, 0, 0, 0, 0}] = ets:lookup(
+ channel_queue_metrics,
+ {Ch, QRes}),
+ [{{Ch, X}, 1, 0, 0, 0}] = ets:lookup(
+ channel_exchange_metrics,
+ {Ch, X}),
+ [{{Ch, {QRes, X}}, 1, 0}] = ets:lookup(
+ channel_queue_exchange_metrics,
+ {Ch, {QRes, X}})
end,
test_ch_metrics(Check2, ?TIMEOUT),
- %% Check the stats remove stuff on queue deletion
+ %% Check the stats are marked for removal on queue deletion.
rabbit_channel:do(Ch, #'queue.delete'{queue = QName}),
Check3 = fun() ->
+ [{{Ch, QRes}, 1, 0, 0, 0, 0, 0, 1}] = ets:lookup(
+ channel_queue_metrics,
+ {Ch, QRes}),
+ [{{Ch, X}, 1, 0, 0, 0}] = ets:lookup(
+ channel_exchange_metrics,
+ {Ch, X}),
+ [{{Ch, {QRes, X}}, 1, 1}] = ets:lookup(
+ channel_queue_exchange_metrics,
+ {Ch, {QRes, X}})
+ end,
+ test_ch_metrics(Check3, ?TIMEOUT),
+
+ %% Check the garbage collection removes stuff.
+ force_metric_gc(),
+ Check4 = fun() ->
[] = ets:lookup(channel_queue_metrics, {Ch, QRes}),
- [{{Ch, X}, 1, 0, 0}] = ets:lookup(
- channel_exchange_metrics,
- {Ch, X}),
+ [{{Ch, X}, 1, 0, 0, 0}] = ets:lookup(
+ channel_exchange_metrics,
+ {Ch, X}),
[] = ets:lookup(channel_queue_exchange_metrics,
{Ch, {QRes, X}})
end,
- test_ch_metrics(Check3, ?TIMEOUT),
+ test_ch_metrics(Check4, ?TIMEOUT),
rabbit_channel:shutdown(Ch),
dummy_event_receiver:stop(),
passed.
+force_metric_gc() ->
+ timer:sleep(300),
+ rabbit_core_metrics_gc ! start_gc,
+ gen_server:call(rabbit_core_metrics_gc, test).
+
test_ch_metrics(Fun, Timeout) when Timeout =< 0 ->
Fun();
test_ch_metrics(Fun, Timeout) ->