diff options
| author | Diana Corbacho <diana@rabbitmq.com> | 2017-01-10 14:59:58 +0100 |
|---|---|---|
| committer | Diana Corbacho <diana@rabbitmq.com> | 2017-01-15 20:25:37 +0000 |
| commit | 7b1f550c17d9cd7da4dc8394df7941bd7dd9fd1b (patch) | |
| tree | 6f8b7e2ea8bc0cd2c35d14002941d238e59063e9 | |
| parent | 3d7e53723c3855d1e874be8a32f3258347d95518 (diff) | |
| download | rabbitmq-server-git-7b1f550c17d9cd7da4dc8394df7941bd7dd9fd1b.tar.gz | |
Add user ID to consumers
| -rw-r--r-- | src/rabbit_queue_consumers.erl | 18 |
1 files changed, 10 insertions, 8 deletions
diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl index cd58de95dd..2bd3ebf75e 100644 --- a/src/rabbit_queue_consumers.erl +++ b/src/rabbit_queue_consumers.erl @@ -17,7 +17,7 @@ -module(rabbit_queue_consumers). -export([new/0, max_active_priority/1, inactive/1, all/1, count/0, - unacknowledged_message_count/0, add/9, remove/3, erase_ch/2, + unacknowledged_message_count/0, add/10, remove/3, erase_ch/2, send_drained/0, deliver/3, record_ack/3, subtract_acks/3, possibly_unblock/3, resume_fun/0, notify_sent_fun/1, activate_limit_fun/0, @@ -32,7 +32,7 @@ -record(state, {consumers, use}). --record(consumer, {tag, ack_required, prefetch, args}). +-record(consumer, {tag, ack_required, prefetch, args, user}). %% These are held in our process dictionary -record(cr, {ch_pid, @@ -68,7 +68,8 @@ -spec count() -> non_neg_integer(). -spec unacknowledged_message_count() -> non_neg_integer(). -spec add(ch(), rabbit_types:ctag(), boolean(), pid(), boolean(), - non_neg_integer(), rabbit_framing:amqp_table(), boolean(), state()) + non_neg_integer(), rabbit_framing:amqp_table(), boolean(), + rabbit_types:username(), state()) -> state(). -spec remove(ch(), rabbit_types:ctag(), state()) -> 'not_found' | state(). @@ -113,8 +114,8 @@ consumers(Consumers, Acc) -> priority_queue:fold( fun ({ChPid, Consumer}, _P, Acc1) -> #consumer{tag = CTag, ack_required = Ack, prefetch = Prefetch, - args = Args} = Consumer, - [{ChPid, CTag, Ack, Prefetch, Args} | Acc1] + args = Args, user = Username} = Consumer, + [{ChPid, CTag, Ack, Prefetch, Args, Username} | Acc1] end, Acc, Consumers). count() -> lists:sum([Count || #cr{consumer_count = Count} <- all_ch_record()]). @@ -123,8 +124,8 @@ unacknowledged_message_count() -> lists:sum([queue:len(C#cr.acktags) || C <- all_ch_record()]). add(ChPid, CTag, NoAck, LimiterPid, LimiterActive, Prefetch, Args, IsEmpty, - State = #state{consumers = Consumers, - use = CUInfo}) -> + Username, State = #state{consumers = Consumers, + use = CUInfo}) -> C = #cr{consumer_count = Count, limiter = Limiter} = ch_record(ChPid, LimiterPid), Limiter1 = case LimiterActive of @@ -142,7 +143,8 @@ add(ChPid, CTag, NoAck, LimiterPid, LimiterActive, Prefetch, Args, IsEmpty, Consumer = #consumer{tag = CTag, ack_required = not NoAck, prefetch = Prefetch, - args = Args}, + args = Args, + user = Username}, State#state{consumers = add_consumer({ChPid, Consumer}, Consumers), use = update_use(CUInfo, active)}. |
