summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMichael Klishin <michael@clojurewerkz.org>2020-05-12 18:27:04 +0300
committerMichael Klishin <michael@clojurewerkz.org>2020-05-12 18:27:04 +0300
commit76af44a781cc74c96ee0d63e301d3ddc17c1c80c (patch)
tree729e0004fb17c5748883b7238262290ca2408fe5 /src
parent776278c05cdf88b7561f828e794ab06083ca2b79 (diff)
downloadrabbitmq-server-git-76af44a781cc74c96ee0d63e301d3ddc17c1c80c.tar.gz
Emit a consumer.created event for quorum queues
Closes #2341
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_quorum_queue.erl43
1 files changed, 30 insertions, 13 deletions
diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl
index 62ed6e3ef9..e6f2b4ebe8 100644
--- a/src/rabbit_quorum_queue.erl
+++ b/src/rabbit_quorum_queue.erl
@@ -198,11 +198,7 @@ cancel_consumer_handler(QName, {ConsumerTag, ChPid}) ->
cancel_consumer(QName, ChPid, ConsumerTag) ->
catch rabbit_core_metrics:consumer_deleted(ChPid, ConsumerTag, QName),
- rabbit_event:notify(consumer_deleted,
- [{consumer_tag, ConsumerTag},
- {channel, ChPid},
- {queue, QName},
- {user_who_performed_action, ?INTERNAL_USER}]).
+ emit_consumer_deleted(ChPid, ConsumerTag, QName, ?INTERNAL_USER).
local_or_remote_handler(ChPid, Module, Function, Args) ->
Node = node(ChPid),
@@ -637,7 +633,8 @@ basic_consume(Q, NoAck, ChPid,
Other -> Other
end,
%% consumer info is used to describe the consumer properties
- ConsumerMeta = #{ack => not NoAck,
+ AckRequired = not NoAck,
+ ConsumerMeta = #{ack => AckRequired,
prefetch => ConsumerPrefetchCount,
args => Args,
username => ActingUser},
@@ -648,7 +645,6 @@ basic_consume(Q, NoAck, ChPid,
case ra:local_query(QPid,
fun rabbit_fifo:query_single_active_consumer/1) of
{ok, {_, SacResult}, _} ->
-
SingleActiveConsumerOn = single_active_consumer_on(Q),
{IsSingleActiveConsumer, ActivityStatus} = case {SingleActiveConsumerOn, SacResult} of
{false, _} ->
@@ -658,12 +654,14 @@ basic_consume(Q, NoAck, ChPid,
_ ->
{false, waiting}
end,
-
- %% TODO: emit as rabbit_fifo effect
- rabbit_core_metrics:consumer_created(ChPid, ConsumerTag, ExclusiveConsume,
- not NoAck, QName,
- ConsumerPrefetchCount, IsSingleActiveConsumer,
- ActivityStatus, Args),
+ rabbit_core_metrics:consumer_created(
+ ChPid, ConsumerTag, ExclusiveConsume,
+ AckRequired, QName,
+ ConsumerPrefetchCount, IsSingleActiveConsumer,
+ ActivityStatus, Args),
+ emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume,
+ AckRequired, QName, Prefetch,
+ Args, none, ActingUser),
{ok, QState};
{error, Error} ->
Error;
@@ -678,6 +676,25 @@ basic_cancel(ConsumerTag, ChPid, OkMsg, QState0) ->
maybe_send_reply(ChPid, OkMsg),
rabbit_fifo_client:cancel_checkout(quorum_ctag(ConsumerTag), QState0).
+emit_consumer_created(ChPid, CTag, Exclusive, AckRequired, QName, PrefetchCount, Args, Ref, ActingUser) ->
+ rabbit_event:notify(consumer_created,
+ [{consumer_tag, CTag},
+ {exclusive, Exclusive},
+ {ack_required, AckRequired},
+ {channel, ChPid},
+ {queue, QName},
+ {prefetch_count, PrefetchCount},
+ {arguments, Args},
+ {user_who_performed_action, ActingUser}],
+ Ref).
+
+emit_consumer_deleted(ChPid, ConsumerTag, QName, ActingUser) ->
+ rabbit_event:notify(consumer_deleted,
+ [{consumer_tag, ConsumerTag},
+ {channel, ChPid},
+ {queue, QName},
+ {user_who_performed_action, ActingUser}]).
+
-spec stateless_deliver(amqqueue:ra_server_id(), rabbit_types:delivery()) -> 'ok'.
stateless_deliver(ServerId, Delivery) ->