summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDiana Corbacho <diana@rabbitmq.com>2017-01-10 14:59:58 +0100
committerDiana Corbacho <diana@rabbitmq.com>2017-01-15 20:25:37 +0000
commit7b1f550c17d9cd7da4dc8394df7941bd7dd9fd1b (patch)
tree6f8b7e2ea8bc0cd2c35d14002941d238e59063e9
parent3d7e53723c3855d1e874be8a32f3258347d95518 (diff)
downloadrabbitmq-server-git-7b1f550c17d9cd7da4dc8394df7941bd7dd9fd1b.tar.gz
Add user ID to consumers
-rw-r--r--src/rabbit_queue_consumers.erl18
1 files changed, 10 insertions, 8 deletions
diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl
index cd58de95dd..2bd3ebf75e 100644
--- a/src/rabbit_queue_consumers.erl
+++ b/src/rabbit_queue_consumers.erl
@@ -17,7 +17,7 @@
-module(rabbit_queue_consumers).
-export([new/0, max_active_priority/1, inactive/1, all/1, count/0,
- unacknowledged_message_count/0, add/9, remove/3, erase_ch/2,
+ unacknowledged_message_count/0, add/10, remove/3, erase_ch/2,
send_drained/0, deliver/3, record_ack/3, subtract_acks/3,
possibly_unblock/3,
resume_fun/0, notify_sent_fun/1, activate_limit_fun/0,
@@ -32,7 +32,7 @@
-record(state, {consumers, use}).
--record(consumer, {tag, ack_required, prefetch, args}).
+-record(consumer, {tag, ack_required, prefetch, args, user}).
%% These are held in our process dictionary
-record(cr, {ch_pid,
@@ -68,7 +68,8 @@
-spec count() -> non_neg_integer().
-spec unacknowledged_message_count() -> non_neg_integer().
-spec add(ch(), rabbit_types:ctag(), boolean(), pid(), boolean(),
- non_neg_integer(), rabbit_framing:amqp_table(), boolean(), state())
+ non_neg_integer(), rabbit_framing:amqp_table(), boolean(),
+ rabbit_types:username(), state())
-> state().
-spec remove(ch(), rabbit_types:ctag(), state()) ->
'not_found' | state().
@@ -113,8 +114,8 @@ consumers(Consumers, Acc) ->
priority_queue:fold(
fun ({ChPid, Consumer}, _P, Acc1) ->
#consumer{tag = CTag, ack_required = Ack, prefetch = Prefetch,
- args = Args} = Consumer,
- [{ChPid, CTag, Ack, Prefetch, Args} | Acc1]
+ args = Args, user = Username} = Consumer,
+ [{ChPid, CTag, Ack, Prefetch, Args, Username} | Acc1]
end, Acc, Consumers).
count() -> lists:sum([Count || #cr{consumer_count = Count} <- all_ch_record()]).
@@ -123,8 +124,8 @@ unacknowledged_message_count() ->
lists:sum([queue:len(C#cr.acktags) || C <- all_ch_record()]).
add(ChPid, CTag, NoAck, LimiterPid, LimiterActive, Prefetch, Args, IsEmpty,
- State = #state{consumers = Consumers,
- use = CUInfo}) ->
+ Username, State = #state{consumers = Consumers,
+ use = CUInfo}) ->
C = #cr{consumer_count = Count,
limiter = Limiter} = ch_record(ChPid, LimiterPid),
Limiter1 = case LimiterActive of
@@ -142,7 +143,8 @@ add(ChPid, CTag, NoAck, LimiterPid, LimiterActive, Prefetch, Args, IsEmpty,
Consumer = #consumer{tag = CTag,
ack_required = not NoAck,
prefetch = Prefetch,
- args = Args},
+ args = Args,
+ user = Username},
State#state{consumers = add_consumer({ChPid, Consumer}, Consumers),
use = update_use(CUInfo, active)}.