summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorkjnilsson <knilsson@pivotal.io>2018-12-12 10:50:50 +0000
committerkjnilsson <knilsson@pivotal.io>2018-12-12 10:57:01 +0000
commit0a21df17d97ead045bedd42bf2fd867687eeec17 (patch)
tree46ddeb73a9a63b60c97ae2306999356fddfde647
parent2eb31a5f39bc4d31b595db5674c93649bcda2844 (diff)
downloadrabbitmq-server-git-0a21df17d97ead045bedd42bf2fd867687eeec17.tar.gz
Ensure quorum queue consumers are cleaned up
Fixes a bug where the consumer_created metrics table wasn't cleared when a consumer was cancelled. Also removing some injected handlers from rabbit_fifo [#162583758]
-rw-r--r--src/rabbit_fifo.erl64
-rw-r--r--src/rabbit_quorum_queue.erl9
-rw-r--r--test/quorum_queue_SUITE.erl5
3 files changed, 29 insertions, 49 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl
index 00d0db0b8a..1f047027b9 100644
--- a/src/rabbit_fifo.erl
+++ b/src/rabbit_fifo.erl
@@ -45,12 +45,6 @@
dehydrate_state/1
]).
--ifdef(TEST).
--export([
- metrics_handler/1
- ]).
--endif.
-
-type raw_msg() :: term().
%% The raw message. It is opaque to rabbit_fifo.
@@ -153,6 +147,7 @@
-record(state,
{name :: atom(),
+ queue_resource :: rabbit_types:r('queue'),
shadow_copy_interval = ?SHADOW_COPY_INTERVAL :: non_neg_integer(),
% unassigned messages
messages = #{} :: #{msg_in_id() => indexed_msg()},
@@ -183,9 +178,7 @@
% needs to be part of snapshot
service_queue = queue:new() :: queue:queue(consumer_id()),
dead_letter_handler :: maybe(applied_mfa()),
- cancel_consumer_handler :: maybe(applied_mfa()),
become_leader_handler :: maybe(applied_mfa()),
- metrics_handler :: maybe(applied_mfa()),
%% This is a special field that is only used for snapshots
%% It represents the number of queued messages at the time the
%% dehydrated snapshot state was cached.
@@ -205,8 +198,6 @@
-type config() :: #{name := atom(),
dead_letter_handler => applied_mfa(),
become_leader_handler => applied_mfa(),
- cancel_consumer_handler => applied_mfa(),
- metrics_handler => applied_mfa(),
shadow_copy_interval => non_neg_integer()}.
-export_type([protocol/0,
@@ -223,19 +214,17 @@
config/0]).
-spec init(config()) -> {state(), ra_machine:effects()}.
-init(#{name := Name} = Conf) ->
- update_state(Conf, #state{name = Name}).
+init(#{name := Name,
+ queue_resource := Resource} = Conf) ->
+ update_state(Conf, #state{name = Name,
+ queue_resource = Resource}).
update_state(Conf, State) ->
DLH = maps:get(dead_letter_handler, Conf, undefined),
- CCH = maps:get(cancel_consumer_handler, Conf, undefined),
BLH = maps:get(become_leader_handler, Conf, undefined),
- MH = maps:get(metrics_handler, Conf, undefined),
SHI = maps:get(shadow_copy_interval, Conf, ?SHADOW_COPY_INTERVAL),
State#state{dead_letter_handler = DLH,
- cancel_consumer_handler = CCH,
become_leader_handler = BLH,
- metrics_handler = MH,
shadow_copy_interval = SHI}.
% msg_ids are scoped per consumer
@@ -504,21 +493,17 @@ state_enter(_, _) ->
-spec tick(non_neg_integer(), state()) -> ra_machine:effects().
tick(_Ts, #state{name = Name,
+ queue_resource = QName,
messages = Messages,
ra_indexes = Indexes,
- metrics_handler = MH,
consumers = Cons} = State) ->
Metrics = {Name,
maps:size(Messages), % Ready
num_checked_out(State), % checked out
rabbit_fifo_index:size(Indexes), %% Total
maps:size(Cons)}, % Consumers
- case MH of
- undefined ->
- [{aux, emit}];
- {Mod, Fun, Args} ->
- [{mod_call, Mod, Fun, Args ++ [Metrics]}, {aux, emit}]
- end.
+ [{mod_call, rabbit_quorum_queue,
+ update_metrics, [QName, Metrics]}, {aux, emit}].
-spec overview(state()) -> map().
overview(#state{consumers = Cons,
@@ -627,11 +612,11 @@ num_checked_out(#state{consumers = Cons}) ->
end, 0, maps:values(Cons)).
cancel_consumer(ConsumerId,
- {Effects0, #state{consumers = C0, name = Name} = S0}) ->
+ {Effects0, #state{consumers = C0} = S0}) ->
case maps:take(ConsumerId, C0) of
{#consumer{checked_out = Checked0}, Cons} ->
S = return_all(S0, Checked0),
- Effects = cancel_consumer_effects(ConsumerId, Name, S, Effects0),
+ Effects = cancel_consumer_effects(ConsumerId, S, Effects0),
case maps:size(Cons) of
0 ->
{[{aux, inactive} | Effects], S#state{consumers = Cons}};
@@ -787,13 +772,9 @@ dead_letter_effects(Discarded,
end, [], Discarded),
[{mod_call, Mod, Fun, Args ++ [DeadLetters]} | Effects].
-cancel_consumer_effects(_, _, #state{cancel_consumer_handler = undefined},
- Effects) ->
- Effects;
-cancel_consumer_effects(Pid, Name,
- #state{cancel_consumer_handler = {Mod, Fun, Args}},
- Effects) ->
- [{mod_call, Mod, Fun, Args ++ [Pid, Name]} | Effects].
+cancel_consumer_effects(ConsumerId, #state{queue_resource = QName}, Effects) ->
+ [{mod_call, rabbit_quorum_queue,
+ cancel_consumer_handler, [QName, ConsumerId]} | Effects].
update_smallest_raft_index(IncomingRaftIdx, OldIndexes,
#state{ra_indexes = Indexes,
@@ -1098,11 +1079,8 @@ dehydrate_consumer(#consumer{checked_out = Checked0} = Con) ->
test_init(Name) ->
init(#{name => Name,
- shadow_copy_interval => 0,
- metrics_handler => {?MODULE, metrics_handler, []}}).
-
-metrics_handler(_) ->
- ok.
+ queue_resource => queue_resource,
+ shadow_copy_interval => 0}).
enq_enq_checkout_test() ->
Cid = {<<"enq_enq_checkout_test">>, self()},
@@ -1347,7 +1325,7 @@ down_with_noproc_consumer_returns_unsettled_test() ->
Cid = {<<"down_consumer_returns_unsettled_test">>, self()},
{State0, [_, _]} = enq(1, 1, second, test_init(test)),
{State1, [{monitor, process, Pid} | _]} = check(Cid, 2, State0),
- {State2, [_, _], _} = apply(meta(3), {down, Pid, noproc}, [], State1),
+ {State2, _, _} = apply(meta(3), {down, Pid, noproc}, [], State1),
{_State, Effects} = check(Cid, 4, State2),
?ASSERT_EFF({monitor, process, _}, Effects),
ok.
@@ -1425,6 +1403,7 @@ discarded_message_without_dead_letter_handler_is_removed_test() ->
discarded_message_with_dead_letter_handler_emits_mod_call_effect_test() ->
Cid = {<<"completed_consumer_yields_demonitor_effect_test">>, self()},
State00 = init(#{name => test,
+ queue_resource => rabbit_misc:r(<<"/">>, queue, <<"test">>),
dead_letter_handler =>
{somemod, somefun, [somearg]}}),
{State0, [_, _]} = enq(1, 1, first, State00),
@@ -1447,14 +1426,12 @@ tick_test() ->
{S3, {_, _}} = deq(4, Cid2, unsettled, S2),
{S4, _, _} = apply(meta(5), {return, [MsgId], Cid}, [], S3),
- [{mod_call, _, _, [{test, 1, 1, 2, 1}]}, {aux, emit}] = tick(1, S4),
+ [{mod_call, _, _, [_, {test, 1, 1, 2, 1}]}, {aux, emit}] = tick(1, S4),
ok.
enq_deq_snapshot_recover_test() ->
Tag = atom_to_binary(?FUNCTION_NAME, utf8),
Cid = {Tag, self()},
- % OthPid = spawn(fun () -> ok end),
- % Oth = {<<"oth">>, OthPid},
Commands = [
{enqueue, self(), 1, one},
{enqueue, self(), 2, two},
@@ -1690,6 +1667,7 @@ duplicate_delivery_test() ->
state_enter_test() ->
S0 = init(#{name => the_name,
+ queue_resource => rabbit_misc:r(<<"/">>, queue, <<"test">>),
become_leader_handler => {m, f, [a]}}),
[{mod_call, m, f, [a, the_name]}] = state_enter(leader, S0),
ok.
@@ -1789,7 +1767,9 @@ run_log(InitState, Entries) ->
aux_test() ->
_ = ra_machine_ets:start_link(),
Aux0 = init_aux(aux_test),
- MacState = init(#{name => aux_test}),
+ MacState = init(#{name => aux_test,
+ queue_resource =>
+ rabbit_misc:r(<<"/">>, queue, <<"test">>)}),
Log = undefined,
{no_reply, Aux, undefined} = handle_aux(leader, cast, active, Aux0,
Log, MacState),
diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl
index 5e854a4657..47b8a6f643 100644
--- a/src/rabbit_quorum_queue.erl
+++ b/src/rabbit_quorum_queue.erl
@@ -26,7 +26,7 @@
-export([dead_letter_publish/4]).
-export([queue_name/1]).
-export([cluster_state/1, status/2]).
--export([cancel_consumer_handler/3, cancel_consumer/3]).
+-export([cancel_consumer_handler/2, cancel_consumer/3]).
-export([become_leader/2, update_metrics/2]).
-export([rpc_delete_metrics/1]).
-export([format/1]).
@@ -150,20 +150,17 @@ declare(#amqqueue{name = QName,
Ex
end.
-
-
ra_machine(Q) ->
{module, rabbit_fifo, ra_machine_config(Q)}.
ra_machine_config(Q = #amqqueue{name = QName}) ->
#{dead_letter_handler => dlx_mfa(Q),
- cancel_consumer_handler => {?MODULE, cancel_consumer, [QName]},
+ queue_resource => QName,
become_leader_handler => {?MODULE, become_leader, [QName]},
metrics_handler => {?MODULE, update_metrics, [QName]}}.
-cancel_consumer_handler(QName, {ConsumerTag, ChPid}, _Name) ->
+cancel_consumer_handler(QName, {ConsumerTag, ChPid}) ->
Node = node(ChPid),
- % QName = queue_name(Name),
case Node == node() of
true -> cancel_consumer(QName, ChPid, ConsumerTag);
false ->
diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl
index 4190903409..3464494cef 100644
--- a/test/quorum_queue_SUITE.erl
+++ b/test/quorum_queue_SUITE.erl
@@ -1503,7 +1503,10 @@ basic_cancel(Config) ->
wait_for_messages_pending_ack(Servers, RaName, 1),
amqp_channel:call(Ch, #'basic.cancel'{consumer_tag = <<"ctag">>}),
wait_for_messages_ready(Servers, RaName, 1),
- wait_for_messages_pending_ack(Servers, RaName, 0)
+ wait_for_messages_pending_ack(Servers, RaName, 0),
+ [] = rpc:call(Server, ets, tab2list, [consumer_created])
+ after 5000 ->
+ exit(basic_deliver_timeout)
end.
purge(Config) ->