diff options
| author | kjnilsson <knilsson@pivotal.io> | 2018-12-12 10:50:50 +0000 |
|---|---|---|
| committer | kjnilsson <knilsson@pivotal.io> | 2018-12-12 10:57:01 +0000 |
| commit | 0a21df17d97ead045bedd42bf2fd867687eeec17 (patch) | |
| tree | 46ddeb73a9a63b60c97ae2306999356fddfde647 | |
| parent | 2eb31a5f39bc4d31b595db5674c93649bcda2844 (diff) | |
| download | rabbitmq-server-git-0a21df17d97ead045bedd42bf2fd867687eeec17.tar.gz | |
Ensure quorum queue consumers are cleaned up
Fixes a bug where the consumer_created metrics table wasn't cleared when
a consumer was cancelled.
Also removing some injected handlers from rabbit_fifo
[#162583758]
| -rw-r--r-- | src/rabbit_fifo.erl | 64 | ||||
| -rw-r--r-- | src/rabbit_quorum_queue.erl | 9 | ||||
| -rw-r--r-- | test/quorum_queue_SUITE.erl | 5 |
3 files changed, 29 insertions, 49 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl index 00d0db0b8a..1f047027b9 100644 --- a/src/rabbit_fifo.erl +++ b/src/rabbit_fifo.erl @@ -45,12 +45,6 @@ dehydrate_state/1 ]). --ifdef(TEST). --export([ - metrics_handler/1 - ]). --endif. - -type raw_msg() :: term(). %% The raw message. It is opaque to rabbit_fifo. @@ -153,6 +147,7 @@ -record(state, {name :: atom(), + queue_resource :: rabbit_types:r('queue'), shadow_copy_interval = ?SHADOW_COPY_INTERVAL :: non_neg_integer(), % unassigned messages messages = #{} :: #{msg_in_id() => indexed_msg()}, @@ -183,9 +178,7 @@ % needs to be part of snapshot service_queue = queue:new() :: queue:queue(consumer_id()), dead_letter_handler :: maybe(applied_mfa()), - cancel_consumer_handler :: maybe(applied_mfa()), become_leader_handler :: maybe(applied_mfa()), - metrics_handler :: maybe(applied_mfa()), %% This is a special field that is only used for snapshots %% It represents the number of queued messages at the time the %% dehydrated snapshot state was cached. @@ -205,8 +198,6 @@ -type config() :: #{name := atom(), dead_letter_handler => applied_mfa(), become_leader_handler => applied_mfa(), - cancel_consumer_handler => applied_mfa(), - metrics_handler => applied_mfa(), shadow_copy_interval => non_neg_integer()}. -export_type([protocol/0, @@ -223,19 +214,17 @@ config/0]). -spec init(config()) -> {state(), ra_machine:effects()}. -init(#{name := Name} = Conf) -> - update_state(Conf, #state{name = Name}). +init(#{name := Name, + queue_resource := Resource} = Conf) -> + update_state(Conf, #state{name = Name, + queue_resource = Resource}). update_state(Conf, State) -> DLH = maps:get(dead_letter_handler, Conf, undefined), - CCH = maps:get(cancel_consumer_handler, Conf, undefined), BLH = maps:get(become_leader_handler, Conf, undefined), - MH = maps:get(metrics_handler, Conf, undefined), SHI = maps:get(shadow_copy_interval, Conf, ?SHADOW_COPY_INTERVAL), State#state{dead_letter_handler = DLH, - cancel_consumer_handler = CCH, become_leader_handler = BLH, - metrics_handler = MH, shadow_copy_interval = SHI}. % msg_ids are scoped per consumer @@ -504,21 +493,17 @@ state_enter(_, _) -> -spec tick(non_neg_integer(), state()) -> ra_machine:effects(). tick(_Ts, #state{name = Name, + queue_resource = QName, messages = Messages, ra_indexes = Indexes, - metrics_handler = MH, consumers = Cons} = State) -> Metrics = {Name, maps:size(Messages), % Ready num_checked_out(State), % checked out rabbit_fifo_index:size(Indexes), %% Total maps:size(Cons)}, % Consumers - case MH of - undefined -> - [{aux, emit}]; - {Mod, Fun, Args} -> - [{mod_call, Mod, Fun, Args ++ [Metrics]}, {aux, emit}] - end. + [{mod_call, rabbit_quorum_queue, + update_metrics, [QName, Metrics]}, {aux, emit}]. -spec overview(state()) -> map(). overview(#state{consumers = Cons, @@ -627,11 +612,11 @@ num_checked_out(#state{consumers = Cons}) -> end, 0, maps:values(Cons)). cancel_consumer(ConsumerId, - {Effects0, #state{consumers = C0, name = Name} = S0}) -> + {Effects0, #state{consumers = C0} = S0}) -> case maps:take(ConsumerId, C0) of {#consumer{checked_out = Checked0}, Cons} -> S = return_all(S0, Checked0), - Effects = cancel_consumer_effects(ConsumerId, Name, S, Effects0), + Effects = cancel_consumer_effects(ConsumerId, S, Effects0), case maps:size(Cons) of 0 -> {[{aux, inactive} | Effects], S#state{consumers = Cons}}; @@ -787,13 +772,9 @@ dead_letter_effects(Discarded, end, [], Discarded), [{mod_call, Mod, Fun, Args ++ [DeadLetters]} | Effects]. -cancel_consumer_effects(_, _, #state{cancel_consumer_handler = undefined}, - Effects) -> - Effects; -cancel_consumer_effects(Pid, Name, - #state{cancel_consumer_handler = {Mod, Fun, Args}}, - Effects) -> - [{mod_call, Mod, Fun, Args ++ [Pid, Name]} | Effects]. +cancel_consumer_effects(ConsumerId, #state{queue_resource = QName}, Effects) -> + [{mod_call, rabbit_quorum_queue, + cancel_consumer_handler, [QName, ConsumerId]} | Effects]. update_smallest_raft_index(IncomingRaftIdx, OldIndexes, #state{ra_indexes = Indexes, @@ -1098,11 +1079,8 @@ dehydrate_consumer(#consumer{checked_out = Checked0} = Con) -> test_init(Name) -> init(#{name => Name, - shadow_copy_interval => 0, - metrics_handler => {?MODULE, metrics_handler, []}}). - -metrics_handler(_) -> - ok. + queue_resource => queue_resource, + shadow_copy_interval => 0}). enq_enq_checkout_test() -> Cid = {<<"enq_enq_checkout_test">>, self()}, @@ -1347,7 +1325,7 @@ down_with_noproc_consumer_returns_unsettled_test() -> Cid = {<<"down_consumer_returns_unsettled_test">>, self()}, {State0, [_, _]} = enq(1, 1, second, test_init(test)), {State1, [{monitor, process, Pid} | _]} = check(Cid, 2, State0), - {State2, [_, _], _} = apply(meta(3), {down, Pid, noproc}, [], State1), + {State2, _, _} = apply(meta(3), {down, Pid, noproc}, [], State1), {_State, Effects} = check(Cid, 4, State2), ?ASSERT_EFF({monitor, process, _}, Effects), ok. @@ -1425,6 +1403,7 @@ discarded_message_without_dead_letter_handler_is_removed_test() -> discarded_message_with_dead_letter_handler_emits_mod_call_effect_test() -> Cid = {<<"completed_consumer_yields_demonitor_effect_test">>, self()}, State00 = init(#{name => test, + queue_resource => rabbit_misc:r(<<"/">>, queue, <<"test">>), dead_letter_handler => {somemod, somefun, [somearg]}}), {State0, [_, _]} = enq(1, 1, first, State00), @@ -1447,14 +1426,12 @@ tick_test() -> {S3, {_, _}} = deq(4, Cid2, unsettled, S2), {S4, _, _} = apply(meta(5), {return, [MsgId], Cid}, [], S3), - [{mod_call, _, _, [{test, 1, 1, 2, 1}]}, {aux, emit}] = tick(1, S4), + [{mod_call, _, _, [_, {test, 1, 1, 2, 1}]}, {aux, emit}] = tick(1, S4), ok. enq_deq_snapshot_recover_test() -> Tag = atom_to_binary(?FUNCTION_NAME, utf8), Cid = {Tag, self()}, - % OthPid = spawn(fun () -> ok end), - % Oth = {<<"oth">>, OthPid}, Commands = [ {enqueue, self(), 1, one}, {enqueue, self(), 2, two}, @@ -1690,6 +1667,7 @@ duplicate_delivery_test() -> state_enter_test() -> S0 = init(#{name => the_name, + queue_resource => rabbit_misc:r(<<"/">>, queue, <<"test">>), become_leader_handler => {m, f, [a]}}), [{mod_call, m, f, [a, the_name]}] = state_enter(leader, S0), ok. @@ -1789,7 +1767,9 @@ run_log(InitState, Entries) -> aux_test() -> _ = ra_machine_ets:start_link(), Aux0 = init_aux(aux_test), - MacState = init(#{name => aux_test}), + MacState = init(#{name => aux_test, + queue_resource => + rabbit_misc:r(<<"/">>, queue, <<"test">>)}), Log = undefined, {no_reply, Aux, undefined} = handle_aux(leader, cast, active, Aux0, Log, MacState), diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl index 5e854a4657..47b8a6f643 100644 --- a/src/rabbit_quorum_queue.erl +++ b/src/rabbit_quorum_queue.erl @@ -26,7 +26,7 @@ -export([dead_letter_publish/4]). -export([queue_name/1]). -export([cluster_state/1, status/2]). --export([cancel_consumer_handler/3, cancel_consumer/3]). +-export([cancel_consumer_handler/2, cancel_consumer/3]). -export([become_leader/2, update_metrics/2]). -export([rpc_delete_metrics/1]). -export([format/1]). @@ -150,20 +150,17 @@ declare(#amqqueue{name = QName, Ex end. - - ra_machine(Q) -> {module, rabbit_fifo, ra_machine_config(Q)}. ra_machine_config(Q = #amqqueue{name = QName}) -> #{dead_letter_handler => dlx_mfa(Q), - cancel_consumer_handler => {?MODULE, cancel_consumer, [QName]}, + queue_resource => QName, become_leader_handler => {?MODULE, become_leader, [QName]}, metrics_handler => {?MODULE, update_metrics, [QName]}}. -cancel_consumer_handler(QName, {ConsumerTag, ChPid}, _Name) -> +cancel_consumer_handler(QName, {ConsumerTag, ChPid}) -> Node = node(ChPid), - % QName = queue_name(Name), case Node == node() of true -> cancel_consumer(QName, ChPid, ConsumerTag); false -> diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl index 4190903409..3464494cef 100644 --- a/test/quorum_queue_SUITE.erl +++ b/test/quorum_queue_SUITE.erl @@ -1503,7 +1503,10 @@ basic_cancel(Config) -> wait_for_messages_pending_ack(Servers, RaName, 1), amqp_channel:call(Ch, #'basic.cancel'{consumer_tag = <<"ctag">>}), wait_for_messages_ready(Servers, RaName, 1), - wait_for_messages_pending_ack(Servers, RaName, 0) + wait_for_messages_pending_ack(Servers, RaName, 0), + [] = rpc:call(Server, ets, tab2list, [consumer_created]) + after 5000 -> + exit(basic_deliver_timeout) end. purge(Config) -> |
