diff options
| author | Simon MacMullen <simon@rabbitmq.com> | 2014-09-25 11:22:37 +0100 |
|---|---|---|
| committer | Simon MacMullen <simon@rabbitmq.com> | 2014-09-25 11:22:37 +0100 |
| commit | 66d7135950732631d2063dda2628a37d03c0be99 (patch) | |
| tree | 7531d216a6ef7c3c07b52769f9d566f85adfa2e0 | |
| parent | 8e4209055392d6537c369ff574a48d6724dff456 (diff) | |
| download | rabbitmq-server-git-66d7135950732631d2063dda2628a37d03c0be99.tar.gz | |
When we add a new consumer we might be inactive; make sure we transition to active since consumers by definition start out unblocked.
| -rw-r--r-- | src/rabbit_queue_consumers.erl | 6 |
1 files changed, 4 insertions, 2 deletions
diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl index 4b1f07de0d..c60adb5b58 100644 --- a/src/rabbit_queue_consumers.erl +++ b/src/rabbit_queue_consumers.erl @@ -125,7 +125,8 @@ unacknowledged_message_count() -> lists:sum([queue:len(C#cr.acktags) || C <- all_ch_record()]). add(ChPid, CTag, NoAck, LimiterPid, LimiterActive, Prefetch, Args, IsEmpty, - State = #state{consumers = Consumers}) -> + State = #state{consumers = Consumers, + use = CUInfo}) -> C = #cr{consumer_count = Count, limiter = Limiter} = ch_record(ChPid, LimiterPid), Limiter1 = case LimiterActive of @@ -144,7 +145,8 @@ add(ChPid, CTag, NoAck, LimiterPid, LimiterActive, Prefetch, Args, IsEmpty, ack_required = not NoAck, prefetch = Prefetch, args = Args}, - State#state{consumers = add_consumer({ChPid, Consumer}, Consumers)}. + State#state{consumers = add_consumer({ChPid, Consumer}, Consumers), + use = update_use(CUInfo, active)}. remove(ChPid, CTag, State = #state{consumers = Consumers}) -> case lookup_ch(ChPid) of |
