diff options
| author | Michael Klishin <mklishin@pivotal.io> | 2017-03-03 23:15:05 +0300 |
|---|---|---|
| committer | Michael Klishin <mklishin@pivotal.io> | 2017-03-03 23:15:05 +0300 |
| commit | 07d3878f702b676f9e4c4744ce5444dd12290ec4 (patch) | |
| tree | ed72ef253309dcd3a7357351baad9c1beb747d44 | |
| parent | bcc24749ac0091877ab80de1ce9bc335e7469c52 (diff) | |
| parent | 8d48dc496316121d808c87e06c8b117b140fb605 (diff) | |
| download | rabbitmq-server-git-07d3878f702b676f9e4c4744ce5444dd12290ec4.tar.gz | |
Merge branch 'master' into rabbitmq-server-1131
| -rw-r--r-- | src/rabbit_core_metrics_gc.erl | 10 | ||||
| -rw-r--r-- | src/rabbit_queue_index.erl | 13 | ||||
| -rw-r--r-- | test/metrics_SUITE.erl | 57 | ||||
| -rw-r--r-- | test/rabbit_core_metrics_gc_SUITE.erl | 1 |
4 files changed, 61 insertions, 20 deletions
diff --git a/src/rabbit_core_metrics_gc.erl b/src/rabbit_core_metrics_gc.erl index e7f848bbc3..e0d7781796 100644 --- a/src/rabbit_core_metrics_gc.erl +++ b/src/rabbit_core_metrics_gc.erl @@ -96,6 +96,8 @@ gc_gen_server2() -> gc_process(Table) -> ets:foldl(fun({Pid = Key, _}, none) -> gc_process(Pid, Table, Key); + ({Pid = Key, _, _, _, _}, none) -> + gc_process(Pid, Table, Key); ({Pid = Key, _, _, _}, none) -> gc_process(Pid, Table, Key) end, none, Table). @@ -115,6 +117,8 @@ gc_entity(Table, GbSet) -> gc_entity(Id, Table, Key, GbSet); ({Id = Key, _}, none) -> gc_entity(Id, Table, Key, GbSet); + ({Id = Key, _, _}, none) -> + gc_entity(Id, Table, Key, GbSet); ({Id = Key, _, _, _, _}, none) -> gc_entity(Id, Table, Key, GbSet) end, none, Table). @@ -130,11 +134,11 @@ gc_entity(Id, Table, Key, GbSet) -> end. gc_process_and_entity(Table, GbSet) -> - ets:foldl(fun({{Pid, Id} = Key, _, _, _, _, _, _}, none) + ets:foldl(fun({{Pid, Id} = Key, _, _, _, _, _, _, _}, none) when Table == channel_queue_metrics -> gc_entity(Id, Table, Key, GbSet), gc_process(Pid, Table, Key); - ({{Pid, Id} = Key, _, _, _}, none) + ({{Pid, Id} = Key, _, _, _, _}, none) when Table == channel_exchange_metrics -> gc_entity(Id, Table, Key, GbSet), gc_process(Pid, Table, Key); @@ -157,7 +161,7 @@ gc_process_and_entity(Id, Pid, Table, Key, GbSet) -> end. gc_process_and_entities(Table, QueueGbSet, ExchangeGbSet) -> - ets:foldl(fun({{Pid, {Q, X}} = Key, _}, none) -> + ets:foldl(fun({{Pid, {Q, X}} = Key, _, _}, none) -> gc_process(Pid, Table, Key), gc_entity(Q, Table, Key, QueueGbSet), gc_entity(X, Table, Key, ExchangeGbSet) diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index e101e4aef4..52ec8d9e00 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -203,8 +203,17 @@ %% optimisation delivered_cache}). --record(segment, {num, path, journal_entries, - entries_to_segment, unacked}). +-record(segment, { + %% segment ID (an integer) + num, + %% segment file path (see also ?SEGMENT_EXTENSION) + path, + %% index operation log entries in this segment + journal_entries, + entries_to_segment, + %% counter of unacknowledged messages + unacked +}). -include("rabbit.hrl"). diff --git a/test/metrics_SUITE.erl b/test/metrics_SUITE.erl index b2b0fe3560..14955d127d 100644 --- a/test/metrics_SUITE.erl +++ b/test/metrics_SUITE.erl @@ -20,6 +20,7 @@ -include_lib("proper/include/proper.hrl"). -include_lib("eunit/include/eunit.hrl"). -include_lib("amqp_client/include/amqp_client.hrl"). +-include_lib("rabbit_common/include/rabbit_core_metrics.hrl"). all() -> @@ -75,6 +76,7 @@ end_per_group(_, Config) -> Config. init_per_testcase(Testcase, Config) -> + clean_core_metrics(Config), rabbit_ct_helpers:testcase_started(Config, Testcase). end_per_testcase(Testcase, Config) -> @@ -166,6 +168,7 @@ channel_metric_idemp(Config, {N, R}) -> (N == length(Table)) and (N == length(TableAfter)). queue_metric_idemp(Config, {N, R}) -> + clean_core_metrics(Config), Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config), {ok, Chan} = amqp_connection:open_channel(Conn), Queues = @@ -175,14 +178,15 @@ queue_metric_idemp(Config, {N, R}) -> ensure_channel_queue_metrics_populated(Chan, Queue), Queue end || _ <- lists:seq(1, N)], - Table = [ Pid || {Pid, _} <- read_table_rpc(Config, queue_metrics)], - Table2 = [ Pid || {Pid, _} <- read_table_rpc(Config, queue_coarse_metrics)], + + Table = [ Pid || {Pid, _, _} <- read_table_rpc(Config, queue_metrics)], + Table2 = [ Pid || {Pid, _, _} <- read_table_rpc(Config, queue_coarse_metrics)], % referesh stats 'R' times ChanTable = read_table_rpc(Config, channel_created), - [[Pid ! emit_stats || {Pid, _} <- ChanTable ] || _ <- lists:seq(1, R)], + [[Pid ! emit_stats || {Pid, _, _} <- ChanTable ] || _ <- lists:seq(1, R)], timer:sleep(100), - TableAfter = [ Pid || {Pid, _} <- read_table_rpc(Config, queue_metrics)], - TableAfter2 = [ Pid || {Pid, _} <- read_table_rpc(Config, queue_coarse_metrics)], + TableAfter = [ Pid || {Pid, _, _} <- read_table_rpc(Config, queue_metrics)], + TableAfter2 = [ Pid || {Pid, _, _} <- read_table_rpc(Config, queue_coarse_metrics)], [ delete_queue(Chan, Q) || Q <- Queues], rabbit_ct_client_helpers:close_connection(Conn), (Table2 == TableAfter2) and (Table == TableAfter) and @@ -191,7 +195,9 @@ queue_metric_idemp(Config, {N, R}) -> connection_metric_count(Config, Ops) -> add_rem_counter(Config, Ops, {fun rabbit_ct_client_helpers:open_unmanaged_connection/1, - fun rabbit_ct_client_helpers:close_connection/1}, + fun(Cfg) -> + rabbit_ct_client_helpers:close_connection(Cfg) + end}, [ connection_created, connection_metrics, connection_coarse_metrics ]). @@ -222,9 +228,10 @@ queue_metric_count(Config, Ops) -> end, Result = add_rem_counter(Config, Ops, {AddFun, - fun (Q) -> delete_queue(Chan, Q) end}, - [ channel_queue_metrics, - channel_queue_exchange_metrics ]), + fun (Q) -> delete_queue(Chan, Q), + force_metric_gc(Config) + end}, [channel_queue_metrics, + channel_queue_exchange_metrics ]), ok = rabbit_ct_client_helpers:close_connection(Conn), Result. @@ -240,7 +247,10 @@ queue_metric_count_channel_per_queue(Config, Ops) -> end, Result = add_rem_counter(Config, Ops, {AddFun, - fun ({Chan, Q}) -> delete_queue(Chan, Q) end}, + fun ({Chan, Q}) -> + delete_queue(Chan, Q), + force_metric_gc(Config) + end}, [ channel_queue_metrics, channel_queue_exchange_metrics ]), ok = rabbit_ct_client_helpers:close_connection(Conn), @@ -258,8 +268,10 @@ add_rem_counter(Config, {Initial, Ops}, {AddFun, RemFun}, Tables) -> (_, S) -> S end, {Initial, Things}, Ops), + force_metric_gc(Config), TabLens = lists:map(fun(T) -> - length(read_table_rpc(Config, T)) end, Tables), + length(read_table_rpc(Config, T)) + end, Tables), [RemFun(Thing) || Thing <- Things1], [FinalLen] == lists:usort(TabLens). @@ -270,9 +282,11 @@ connection(Config) -> [_] = read_table_rpc(Config, connection_metrics), [_] = read_table_rpc(Config, connection_coarse_metrics), ok = rabbit_ct_client_helpers:close_connection(Conn), + force_metric_gc(Config), [] = read_table_rpc(Config, connection_created), [] = read_table_rpc(Config, connection_metrics), - [] = read_table_rpc(Config, connection_coarse_metrics). + [] = read_table_rpc(Config, connection_coarse_metrics), + ok. channel(Config) -> Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config), @@ -308,10 +322,12 @@ channel_queue_delete_queue(Config) -> [_] = read_table_rpc(Config, channel_queue_exchange_metrics), delete_queue(Chan, Queue), + force_metric_gc(Config), % ensure removal of queue cleans up channel_queue metrics [] = read_table_rpc(Config, channel_queue_exchange_metrics), [] = read_table_rpc(Config, channel_queue_metrics), - ok = rabbit_ct_client_helpers:close_connection(Conn). + ok = rabbit_ct_client_helpers:close_connection(Conn), + ok. channel_queue_exchange_consumer_close_connection(Config) -> Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config), @@ -335,10 +351,13 @@ channel_queue_exchange_consumer_close_connection(Config) -> ok = rabbit_ct_client_helpers:close_connection(Conn), % ensure cleanup happened + force_metric_gc(Config), [] = read_table_rpc(Config, channel_exchange_metrics), [] = read_table_rpc(Config, channel_queue_exchange_metrics), [] = read_table_rpc(Config, channel_queue_metrics), - [] = read_table_rpc(Config, consumer_created). + [] = read_table_rpc(Config, consumer_created), + ok. + %% ------------------------------------------------------------------- @@ -371,6 +390,16 @@ force_channel_stats(Config) -> read_table_rpc(Config, Table) -> rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, read_table, [Table]). +clean_core_metrics(Config) -> + [ rabbit_ct_broker_helpers:rpc(Config, 0, ets, delete_all_objects, [Table]) + || {Table, _} <- ?CORE_TABLES]. + read_table(Table) -> ets:tab2list(Table). +force_metric_gc(Config) -> + timer:sleep(100), + rabbit_ct_broker_helpers:rpc(Config, 0, erlang, send, + [rabbit_core_metrics_gc, start_gc]), + rabbit_ct_broker_helpers:rpc(Config, 0, gen_server, call, + [rabbit_core_metrics_gc, test]). diff --git a/test/rabbit_core_metrics_gc_SUITE.erl b/test/rabbit_core_metrics_gc_SUITE.erl index b7311463d5..c3997f31e2 100644 --- a/test/rabbit_core_metrics_gc_SUITE.erl +++ b/test/rabbit_core_metrics_gc_SUITE.erl @@ -106,7 +106,6 @@ queue_metrics(Config) -> [queue_metrics, Q]), [_] = rabbit_ct_broker_helpers:rpc(Config, A, ets, lookup, [queue_coarse_metrics, Q]), - %% Trigger gc. When the gen_server:call returns, the gc has already finished. rabbit_ct_broker_helpers:rpc(Config, A, erlang, send, [rabbit_core_metrics_gc, start_gc]), rabbit_ct_broker_helpers:rpc(Config, A, gen_server, call, [rabbit_core_metrics_gc, test]), |
