diff options
| author | Jerry Kuch <jerryk@vmware.com> | 2010-11-11 09:54:43 +0000 |
|---|---|---|
| committer | Jerry Kuch <jerryk@vmware.com> | 2010-11-11 09:54:43 +0000 |
| commit | 60b28b903d118ca5302b948f25f88f5b5b775c3e (patch) | |
| tree | a0778ea3e89045b7a153caed169a4e8c4dadfd84 | |
| parent | 477bcb7e42b7cac12ca0379e8b8cf580c0088e34 (diff) | |
| download | rabbitmq-server-git-60b28b903d118ca5302b948f25f88f5b5b775c3e.tar.gz | |
basic_cancel now unregisters limiter whenever consumer count zeroes; ch record updates now require zero unsent message count as condition for erasure.
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 32 |
1 files changed, 21 insertions, 11 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 48a2287f60..6d212157a6 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -320,16 +320,17 @@ store_ch_record(C = #cr{ch_pid = ChPid}) -> %% a transaction, we should delete its record rather than storing it. %% If the ch record was stored, this function returns true; otherwise, %% the ch record is erased and this function returns false. -maybe_store_ch_record(C = #cr{consumer_count = ConsumerCount, - limiter_pid = LimiterPid, - acktags = ChAckTags, - txn = Txn}) -> - case {sets:size(ChAckTags), ConsumerCount, Txn} of - {0, 0, none} -> demonitor_and_erase_ch_record(C), - ok = rabbit_limiter:unregister(LimiterPid, self()), - false; - _ -> store_ch_record(C), - true +maybe_store_ch_record(C = #cr{consumer_count = ConsumerCount, + limiter_pid = LimiterPid, + acktags = ChAckTags, + txn = Txn, + unsent_message_count = UnsentMessageCount}) -> + case {sets:size(ChAckTags), ConsumerCount, UnsentMessageCount, Txn} of + {0, 0, 0, none} -> demonitor_and_erase_ch_record(C), + ok = rabbit_limiter:unregister(LimiterPid, self()), + false; + _ -> store_ch_record(C), + true end. all_ch_record() -> @@ -841,8 +842,17 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From, not_found -> ok = maybe_send_reply(ChPid, OkMsg), reply(ok, State); - C = #cr{consumer_count = ConsumerCount} -> + C = #cr{consumer_count = ConsumerCount, + limiter_pid = LimiterPid} -> C1 = C#cr{consumer_count = ConsumerCount-1}, + %% The consumer count falling to zero is a sufficient + %% condition for unregistering the limiter regardless of + %% whether or not we can dispose of the ch record + %% entirely. + case ConsumerCount-1 of + 0 -> ok = rabbit_limiter:unregister(LimiterPid, self()); + _ -> ok + end, maybe_store_ch_record(C1), ok = maybe_send_reply(ChPid, OkMsg), NewState = |
