diff options
| author | Matthias Radestock <matthias@rabbitmq.com> | 2011-09-18 00:46:16 +0100 |
|---|---|---|
| committer | Matthias Radestock <matthias@rabbitmq.com> | 2011-09-18 00:46:16 +0100 |
| commit | 7678f8aeb43c9a3d08229efaa217027548296a13 (patch) | |
| tree | 7f5b4694482643eb622271af66dec7a85a07fcf8 /src | |
| parent | 739ee88e529aa0ffcd5e37d24a2d58b92f3d177d (diff) | |
| parent | ff8bf657ab87743b19d872b8766b42c9844dc1dd (diff) | |
| download | rabbitmq-server-git-7678f8aeb43c9a3d08229efaa217027548296a13.tar.gz | |
merge from default
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 95 |
1 files changed, 47 insertions, 48 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 0cd03daf48..e4b48c4706 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -353,7 +353,8 @@ update_ch_record(C = #cr{consumer_count = ConsumerCount, case {sets:size(ChAckTags), ConsumerCount, UnsentMessageCount} of {0, 0, 0} -> ok = erase_ch_record(C); _ -> ok = store_ch_record(C) - end. + end, + C. store_ch_record(C = #cr{ch_pid = ChPid}) -> put({ch, ChPid}, C), @@ -367,6 +368,16 @@ erase_ch_record(#cr{ch_pid = ChPid, erase({ch, ChPid}), ok. +update_consumer_count(C = #cr{consumer_count = 0, limiter = Limiter}, +1) -> + ok = rabbit_limiter:register(Limiter, self()), + update_ch_record(C#cr{consumer_count = 1}); +update_consumer_count(C = #cr{consumer_count = 1, limiter = Limiter}, -1) -> + ok = rabbit_limiter:unregister(Limiter, self()), + update_ch_record(C#cr{consumer_count = 0, + limiter = rabbit_limiter:make_token()}); +update_consumer_count(C = #cr{consumer_count = Count}, Delta) -> + update_ch_record(C#cr{consumer_count = Count + Delta}). + all_ch_record() -> [C || {{ch, _}, C} <- get()]. block_consumer(C = #cr{blocked_consumers = Blocked}, QEntry) -> @@ -564,15 +575,6 @@ requeue_and_run(AckTags, State = #q{backing_queue = BQ, ttl=TTL}) -> BQS1 end, State). -process_acks(ChPid, AckTags, State, Fun) -> - case lookup_ch(ChPid) of - not_found -> - noreply(State); - C = #cr{acktags = ChAckTags} -> - update_ch_record(C#cr{acktags = subtract_acks(ChAckTags, AckTags)}), - noreply(Fun(State)) - end. - fetch(AckRequired, State = #q{backing_queue_state = BQS, backing_queue = BQ}) -> {Result, BQS1} = BQ:fetch(AckRequired, BQS), @@ -674,8 +676,16 @@ run_backing_queue(Mod, Fun, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> run_message_queue(State#q{backing_queue_state = BQ:invoke(Mod, Fun, BQS)}). -subtract_acks(A, B) when is_list(B) -> - lists:foldl(fun sets:del_element/2, A, B). +subtract_acks(ChPid, AckTags, State, Fun) -> + case lookup_ch(ChPid) of + not_found -> + State; + C = #cr{acktags = ChAckTags} -> + update_ch_record( + C#cr{acktags = lists:foldl(fun sets:del_element/2, + ChAckTags, AckTags)}), + Fun(State) + end. discard_delivery(#delivery{sender = ChPid, message = Message}, @@ -959,15 +969,10 @@ handle_call({basic_consume, NoAck, ChPid, Limiter, in_use -> reply({error, exclusive_consume_unavailable}, State); ok -> - C = #cr{consumer_count = ConsumerCount} = ch_record(ChPid), + C = ch_record(ChPid), + C1 = update_consumer_count(C#cr{limiter = Limiter}, +1), Consumer = #consumer{tag = ConsumerTag, ack_required = not NoAck}, - C1 = C#cr{consumer_count = ConsumerCount +1, - limiter = Limiter}, - ok = case ConsumerCount of - 0 -> rabbit_limiter:register(Limiter, self()); - _ -> ok - end, ExclusiveConsumer = if ExclusiveConsume -> {ChPid, ConsumerTag}; true -> ExistingHolder end, @@ -976,7 +981,7 @@ handle_call({basic_consume, NoAck, ChPid, Limiter, ok = maybe_send_reply(ChPid, OkMsg), E = {ChPid, Consumer}, State2 = - case is_ch_blocked(C) of + case is_ch_blocked(C1) of true -> block_consumer(C1, E), State1; false -> update_ch_record(C1), @@ -994,15 +999,8 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From, not_found -> ok = maybe_send_reply(ChPid, OkMsg), reply(ok, State); - C = #cr{consumer_count = ConsumerCount, - limiter = Limiter} -> - C1 = C#cr{consumer_count = ConsumerCount -1}, - update_ch_record( - case ConsumerCount of - 1 -> ok = rabbit_limiter:unregister(Limiter, self()), - C1#cr{limiter = rabbit_limiter:make_token()}; - _ -> C1 - end), + C -> + update_consumer_count(C, -1), emit_consumer_deleted(ChPid, ConsumerTag), ok = maybe_send_reply(ChPid, OkMsg), NewState = @@ -1047,8 +1045,9 @@ handle_call(purge, _From, State = #q{backing_queue = BQ, handle_call({requeue, AckTags, ChPid}, From, State) -> gen_server2:reply(From, ok), - noreply(process_acks(ChPid, AckTags, State, - fun (State1) -> requeue_and_run(AckTags, State1) end)). + noreply(subtract_acks( + ChPid, AckTags, State, + fun (State1) -> requeue_and_run(AckTags, State1) end)). handle_cast({run_backing_queue, Mod, Fun}, State) -> noreply(run_backing_queue(Mod, Fun, State)); @@ -1058,25 +1057,25 @@ handle_cast({deliver, Delivery}, State) -> noreply(deliver_or_enqueue(Delivery, State)); handle_cast({ack, AckTags, ChPid}, State) -> - noreply(process_acks(ChPid, AckTags, State, - fun (State1 = #q{backing_queue = BQ, - backing_queue_state = BQS}) -> - {_Guids, BQS1} = BQ:ack(AckTags, BQS), - State1#q{backing_queue_state = BQS1} - end)); + noreply(subtract_acks( + ChPid, AckTags, State, + fun (State1 = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> + {_Guids, BQS1} = BQ:ack(AckTags, BQS), + State1#q{backing_queue_state = BQS1} + end)); handle_cast({reject, AckTags, Requeue, ChPid}, State) -> - noreply(process_acks(ChPid, AckTags, State, - fun (State1 = #q{backing_queue = BQ, - backing_queue_state = BQS}) -> - case Requeue of - true -> requeue_and_run(AckTags, State1); - false -> {_Guids, BQS1} = - BQ:ack(AckTags, BQS), - State1#q{ - backing_queue_state = BQS1} - end - end)); + noreply(subtract_acks( + ChPid, AckTags, State, + fun (State1 = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> + case Requeue of + true -> requeue_and_run(AckTags, State1); + false -> {_Guids, BQS1} = BQ:ack(AckTags, BQS), + State1#q{backing_queue_state = BQS1} + end + end)); handle_cast(delete_immediately, State) -> {stop, normal, State}; |
