diff options
| author | Michael Klishin <michael@clojurewerkz.org> | 2020-05-12 18:27:04 +0300 |
|---|---|---|
| committer | Michael Klishin <michael@clojurewerkz.org> | 2020-05-12 18:27:04 +0300 |
| commit | 76af44a781cc74c96ee0d63e301d3ddc17c1c80c (patch) | |
| tree | 729e0004fb17c5748883b7238262290ca2408fe5 /src | |
| parent | 776278c05cdf88b7561f828e794ab06083ca2b79 (diff) | |
| download | rabbitmq-server-git-76af44a781cc74c96ee0d63e301d3ddc17c1c80c.tar.gz | |
Emit a consumer.created event for quorum queues
Closes #2341
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_quorum_queue.erl | 43 |
1 files changed, 30 insertions, 13 deletions
diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl index 62ed6e3ef9..e6f2b4ebe8 100644 --- a/src/rabbit_quorum_queue.erl +++ b/src/rabbit_quorum_queue.erl @@ -198,11 +198,7 @@ cancel_consumer_handler(QName, {ConsumerTag, ChPid}) -> cancel_consumer(QName, ChPid, ConsumerTag) -> catch rabbit_core_metrics:consumer_deleted(ChPid, ConsumerTag, QName), - rabbit_event:notify(consumer_deleted, - [{consumer_tag, ConsumerTag}, - {channel, ChPid}, - {queue, QName}, - {user_who_performed_action, ?INTERNAL_USER}]). + emit_consumer_deleted(ChPid, ConsumerTag, QName, ?INTERNAL_USER). local_or_remote_handler(ChPid, Module, Function, Args) -> Node = node(ChPid), @@ -637,7 +633,8 @@ basic_consume(Q, NoAck, ChPid, Other -> Other end, %% consumer info is used to describe the consumer properties - ConsumerMeta = #{ack => not NoAck, + AckRequired = not NoAck, + ConsumerMeta = #{ack => AckRequired, prefetch => ConsumerPrefetchCount, args => Args, username => ActingUser}, @@ -648,7 +645,6 @@ basic_consume(Q, NoAck, ChPid, case ra:local_query(QPid, fun rabbit_fifo:query_single_active_consumer/1) of {ok, {_, SacResult}, _} -> - SingleActiveConsumerOn = single_active_consumer_on(Q), {IsSingleActiveConsumer, ActivityStatus} = case {SingleActiveConsumerOn, SacResult} of {false, _} -> @@ -658,12 +654,14 @@ basic_consume(Q, NoAck, ChPid, _ -> {false, waiting} end, - - %% TODO: emit as rabbit_fifo effect - rabbit_core_metrics:consumer_created(ChPid, ConsumerTag, ExclusiveConsume, - not NoAck, QName, - ConsumerPrefetchCount, IsSingleActiveConsumer, - ActivityStatus, Args), + rabbit_core_metrics:consumer_created( + ChPid, ConsumerTag, ExclusiveConsume, + AckRequired, QName, + ConsumerPrefetchCount, IsSingleActiveConsumer, + ActivityStatus, Args), + emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume, + AckRequired, QName, Prefetch, + Args, none, ActingUser), {ok, QState}; {error, Error} -> Error; @@ -678,6 +676,25 @@ basic_cancel(ConsumerTag, ChPid, OkMsg, QState0) -> maybe_send_reply(ChPid, OkMsg), rabbit_fifo_client:cancel_checkout(quorum_ctag(ConsumerTag), QState0). +emit_consumer_created(ChPid, CTag, Exclusive, AckRequired, QName, PrefetchCount, Args, Ref, ActingUser) -> + rabbit_event:notify(consumer_created, + [{consumer_tag, CTag}, + {exclusive, Exclusive}, + {ack_required, AckRequired}, + {channel, ChPid}, + {queue, QName}, + {prefetch_count, PrefetchCount}, + {arguments, Args}, + {user_who_performed_action, ActingUser}], + Ref). + +emit_consumer_deleted(ChPid, ConsumerTag, QName, ActingUser) -> + rabbit_event:notify(consumer_deleted, + [{consumer_tag, ConsumerTag}, + {channel, ChPid}, + {queue, QName}, + {user_who_performed_action, ActingUser}]). + -spec stateless_deliver(amqqueue:ra_server_id(), rabbit_types:delivery()) -> 'ok'. stateless_deliver(ServerId, Delivery) -> |
