summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Klishin <mklishin@pivotal.io>2017-03-03 23:15:05 +0300
committerMichael Klishin <mklishin@pivotal.io>2017-03-03 23:15:05 +0300
commit07d3878f702b676f9e4c4744ce5444dd12290ec4 (patch)
treeed72ef253309dcd3a7357351baad9c1beb747d44
parentbcc24749ac0091877ab80de1ce9bc335e7469c52 (diff)
parent8d48dc496316121d808c87e06c8b117b140fb605 (diff)
downloadrabbitmq-server-git-07d3878f702b676f9e4c4744ce5444dd12290ec4.tar.gz
Merge branch 'master' into rabbitmq-server-1131
-rw-r--r--src/rabbit_core_metrics_gc.erl10
-rw-r--r--src/rabbit_queue_index.erl13
-rw-r--r--test/metrics_SUITE.erl57
-rw-r--r--test/rabbit_core_metrics_gc_SUITE.erl1
4 files changed, 61 insertions, 20 deletions
diff --git a/src/rabbit_core_metrics_gc.erl b/src/rabbit_core_metrics_gc.erl
index e7f848bbc3..e0d7781796 100644
--- a/src/rabbit_core_metrics_gc.erl
+++ b/src/rabbit_core_metrics_gc.erl
@@ -96,6 +96,8 @@ gc_gen_server2() ->
gc_process(Table) ->
ets:foldl(fun({Pid = Key, _}, none) ->
gc_process(Pid, Table, Key);
+ ({Pid = Key, _, _, _, _}, none) ->
+ gc_process(Pid, Table, Key);
({Pid = Key, _, _, _}, none) ->
gc_process(Pid, Table, Key)
end, none, Table).
@@ -115,6 +117,8 @@ gc_entity(Table, GbSet) ->
gc_entity(Id, Table, Key, GbSet);
({Id = Key, _}, none) ->
gc_entity(Id, Table, Key, GbSet);
+ ({Id = Key, _, _}, none) ->
+ gc_entity(Id, Table, Key, GbSet);
({Id = Key, _, _, _, _}, none) ->
gc_entity(Id, Table, Key, GbSet)
end, none, Table).
@@ -130,11 +134,11 @@ gc_entity(Id, Table, Key, GbSet) ->
end.
gc_process_and_entity(Table, GbSet) ->
- ets:foldl(fun({{Pid, Id} = Key, _, _, _, _, _, _}, none)
+ ets:foldl(fun({{Pid, Id} = Key, _, _, _, _, _, _, _}, none)
when Table == channel_queue_metrics ->
gc_entity(Id, Table, Key, GbSet),
gc_process(Pid, Table, Key);
- ({{Pid, Id} = Key, _, _, _}, none)
+ ({{Pid, Id} = Key, _, _, _, _}, none)
when Table == channel_exchange_metrics ->
gc_entity(Id, Table, Key, GbSet),
gc_process(Pid, Table, Key);
@@ -157,7 +161,7 @@ gc_process_and_entity(Id, Pid, Table, Key, GbSet) ->
end.
gc_process_and_entities(Table, QueueGbSet, ExchangeGbSet) ->
- ets:foldl(fun({{Pid, {Q, X}} = Key, _}, none) ->
+ ets:foldl(fun({{Pid, {Q, X}} = Key, _, _}, none) ->
gc_process(Pid, Table, Key),
gc_entity(Q, Table, Key, QueueGbSet),
gc_entity(X, Table, Key, ExchangeGbSet)
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index e101e4aef4..52ec8d9e00 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -203,8 +203,17 @@
%% optimisation
delivered_cache}).
--record(segment, {num, path, journal_entries,
- entries_to_segment, unacked}).
+-record(segment, {
+ %% segment ID (an integer)
+ num,
+ %% segment file path (see also ?SEGMENT_EXTENSION)
+ path,
+ %% index operation log entries in this segment
+ journal_entries,
+ entries_to_segment,
+ %% counter of unacknowledged messages
+ unacked
+}).
-include("rabbit.hrl").
diff --git a/test/metrics_SUITE.erl b/test/metrics_SUITE.erl
index b2b0fe3560..14955d127d 100644
--- a/test/metrics_SUITE.erl
+++ b/test/metrics_SUITE.erl
@@ -20,6 +20,7 @@
-include_lib("proper/include/proper.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("amqp_client/include/amqp_client.hrl").
+-include_lib("rabbit_common/include/rabbit_core_metrics.hrl").
all() ->
@@ -75,6 +76,7 @@ end_per_group(_, Config) ->
Config.
init_per_testcase(Testcase, Config) ->
+ clean_core_metrics(Config),
rabbit_ct_helpers:testcase_started(Config, Testcase).
end_per_testcase(Testcase, Config) ->
@@ -166,6 +168,7 @@ channel_metric_idemp(Config, {N, R}) ->
(N == length(Table)) and (N == length(TableAfter)).
queue_metric_idemp(Config, {N, R}) ->
+ clean_core_metrics(Config),
Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config),
{ok, Chan} = amqp_connection:open_channel(Conn),
Queues =
@@ -175,14 +178,15 @@ queue_metric_idemp(Config, {N, R}) ->
ensure_channel_queue_metrics_populated(Chan, Queue),
Queue
end || _ <- lists:seq(1, N)],
- Table = [ Pid || {Pid, _} <- read_table_rpc(Config, queue_metrics)],
- Table2 = [ Pid || {Pid, _} <- read_table_rpc(Config, queue_coarse_metrics)],
+
+ Table = [ Pid || {Pid, _, _} <- read_table_rpc(Config, queue_metrics)],
+ Table2 = [ Pid || {Pid, _, _} <- read_table_rpc(Config, queue_coarse_metrics)],
% referesh stats 'R' times
ChanTable = read_table_rpc(Config, channel_created),
- [[Pid ! emit_stats || {Pid, _} <- ChanTable ] || _ <- lists:seq(1, R)],
+ [[Pid ! emit_stats || {Pid, _, _} <- ChanTable ] || _ <- lists:seq(1, R)],
timer:sleep(100),
- TableAfter = [ Pid || {Pid, _} <- read_table_rpc(Config, queue_metrics)],
- TableAfter2 = [ Pid || {Pid, _} <- read_table_rpc(Config, queue_coarse_metrics)],
+ TableAfter = [ Pid || {Pid, _, _} <- read_table_rpc(Config, queue_metrics)],
+ TableAfter2 = [ Pid || {Pid, _, _} <- read_table_rpc(Config, queue_coarse_metrics)],
[ delete_queue(Chan, Q) || Q <- Queues],
rabbit_ct_client_helpers:close_connection(Conn),
(Table2 == TableAfter2) and (Table == TableAfter) and
@@ -191,7 +195,9 @@ queue_metric_idemp(Config, {N, R}) ->
connection_metric_count(Config, Ops) ->
add_rem_counter(Config, Ops,
{fun rabbit_ct_client_helpers:open_unmanaged_connection/1,
- fun rabbit_ct_client_helpers:close_connection/1},
+ fun(Cfg) ->
+ rabbit_ct_client_helpers:close_connection(Cfg)
+ end},
[ connection_created,
connection_metrics,
connection_coarse_metrics ]).
@@ -222,9 +228,10 @@ queue_metric_count(Config, Ops) ->
end,
Result = add_rem_counter(Config, Ops,
{AddFun,
- fun (Q) -> delete_queue(Chan, Q) end},
- [ channel_queue_metrics,
- channel_queue_exchange_metrics ]),
+ fun (Q) -> delete_queue(Chan, Q),
+ force_metric_gc(Config)
+ end}, [channel_queue_metrics,
+ channel_queue_exchange_metrics ]),
ok = rabbit_ct_client_helpers:close_connection(Conn),
Result.
@@ -240,7 +247,10 @@ queue_metric_count_channel_per_queue(Config, Ops) ->
end,
Result = add_rem_counter(Config, Ops,
{AddFun,
- fun ({Chan, Q}) -> delete_queue(Chan, Q) end},
+ fun ({Chan, Q}) ->
+ delete_queue(Chan, Q),
+ force_metric_gc(Config)
+ end},
[ channel_queue_metrics,
channel_queue_exchange_metrics ]),
ok = rabbit_ct_client_helpers:close_connection(Conn),
@@ -258,8 +268,10 @@ add_rem_counter(Config, {Initial, Ops}, {AddFun, RemFun}, Tables) ->
(_, S) -> S end,
{Initial, Things},
Ops),
+ force_metric_gc(Config),
TabLens = lists:map(fun(T) ->
- length(read_table_rpc(Config, T)) end, Tables),
+ length(read_table_rpc(Config, T))
+ end, Tables),
[RemFun(Thing) || Thing <- Things1],
[FinalLen] == lists:usort(TabLens).
@@ -270,9 +282,11 @@ connection(Config) ->
[_] = read_table_rpc(Config, connection_metrics),
[_] = read_table_rpc(Config, connection_coarse_metrics),
ok = rabbit_ct_client_helpers:close_connection(Conn),
+ force_metric_gc(Config),
[] = read_table_rpc(Config, connection_created),
[] = read_table_rpc(Config, connection_metrics),
- [] = read_table_rpc(Config, connection_coarse_metrics).
+ [] = read_table_rpc(Config, connection_coarse_metrics),
+ ok.
channel(Config) ->
Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config),
@@ -308,10 +322,12 @@ channel_queue_delete_queue(Config) ->
[_] = read_table_rpc(Config, channel_queue_exchange_metrics),
delete_queue(Chan, Queue),
+ force_metric_gc(Config),
% ensure removal of queue cleans up channel_queue metrics
[] = read_table_rpc(Config, channel_queue_exchange_metrics),
[] = read_table_rpc(Config, channel_queue_metrics),
- ok = rabbit_ct_client_helpers:close_connection(Conn).
+ ok = rabbit_ct_client_helpers:close_connection(Conn),
+ ok.
channel_queue_exchange_consumer_close_connection(Config) ->
Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config),
@@ -335,10 +351,13 @@ channel_queue_exchange_consumer_close_connection(Config) ->
ok = rabbit_ct_client_helpers:close_connection(Conn),
% ensure cleanup happened
+ force_metric_gc(Config),
[] = read_table_rpc(Config, channel_exchange_metrics),
[] = read_table_rpc(Config, channel_queue_exchange_metrics),
[] = read_table_rpc(Config, channel_queue_metrics),
- [] = read_table_rpc(Config, consumer_created).
+ [] = read_table_rpc(Config, consumer_created),
+ ok.
+
%% -------------------------------------------------------------------
@@ -371,6 +390,16 @@ force_channel_stats(Config) ->
read_table_rpc(Config, Table) ->
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, read_table, [Table]).
+clean_core_metrics(Config) ->
+ [ rabbit_ct_broker_helpers:rpc(Config, 0, ets, delete_all_objects, [Table])
+ || {Table, _} <- ?CORE_TABLES].
+
read_table(Table) ->
ets:tab2list(Table).
+force_metric_gc(Config) ->
+ timer:sleep(100),
+ rabbit_ct_broker_helpers:rpc(Config, 0, erlang, send,
+ [rabbit_core_metrics_gc, start_gc]),
+ rabbit_ct_broker_helpers:rpc(Config, 0, gen_server, call,
+ [rabbit_core_metrics_gc, test]).
diff --git a/test/rabbit_core_metrics_gc_SUITE.erl b/test/rabbit_core_metrics_gc_SUITE.erl
index b7311463d5..c3997f31e2 100644
--- a/test/rabbit_core_metrics_gc_SUITE.erl
+++ b/test/rabbit_core_metrics_gc_SUITE.erl
@@ -106,7 +106,6 @@ queue_metrics(Config) ->
[queue_metrics, Q]),
[_] = rabbit_ct_broker_helpers:rpc(Config, A, ets, lookup,
[queue_coarse_metrics, Q]),
-
%% Trigger gc. When the gen_server:call returns, the gc has already finished.
rabbit_ct_broker_helpers:rpc(Config, A, erlang, send, [rabbit_core_metrics_gc, start_gc]),
rabbit_ct_broker_helpers:rpc(Config, A, gen_server, call, [rabbit_core_metrics_gc, test]),