summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJerry Kuch <jerryk@vmware.com>2010-11-11 09:54:43 +0000
committerJerry Kuch <jerryk@vmware.com>2010-11-11 09:54:43 +0000
commit60b28b903d118ca5302b948f25f88f5b5b775c3e (patch)
treea0778ea3e89045b7a153caed169a4e8c4dadfd84
parent477bcb7e42b7cac12ca0379e8b8cf580c0088e34 (diff)
downloadrabbitmq-server-git-60b28b903d118ca5302b948f25f88f5b5b775c3e.tar.gz
basic_cancel now unregisters limiter whenever consumer count zeroes; ch record updates now require zero unsent message count as condition for erasure.
-rw-r--r--src/rabbit_amqqueue_process.erl32
1 files changed, 21 insertions, 11 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 48a2287f60..6d212157a6 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -320,16 +320,17 @@ store_ch_record(C = #cr{ch_pid = ChPid}) ->
%% a transaction, we should delete its record rather than storing it.
%% If the ch record was stored, this function returns true; otherwise,
%% the ch record is erased and this function returns false.
-maybe_store_ch_record(C = #cr{consumer_count = ConsumerCount,
- limiter_pid = LimiterPid,
- acktags = ChAckTags,
- txn = Txn}) ->
- case {sets:size(ChAckTags), ConsumerCount, Txn} of
- {0, 0, none} -> demonitor_and_erase_ch_record(C),
- ok = rabbit_limiter:unregister(LimiterPid, self()),
- false;
- _ -> store_ch_record(C),
- true
+maybe_store_ch_record(C = #cr{consumer_count = ConsumerCount,
+ limiter_pid = LimiterPid,
+ acktags = ChAckTags,
+ txn = Txn,
+ unsent_message_count = UnsentMessageCount}) ->
+ case {sets:size(ChAckTags), ConsumerCount, UnsentMessageCount, Txn} of
+ {0, 0, 0, none} -> demonitor_and_erase_ch_record(C),
+ ok = rabbit_limiter:unregister(LimiterPid, self()),
+ false;
+ _ -> store_ch_record(C),
+ true
end.
all_ch_record() ->
@@ -841,8 +842,17 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
not_found ->
ok = maybe_send_reply(ChPid, OkMsg),
reply(ok, State);
- C = #cr{consumer_count = ConsumerCount} ->
+ C = #cr{consumer_count = ConsumerCount,
+ limiter_pid = LimiterPid} ->
C1 = C#cr{consumer_count = ConsumerCount-1},
+ %% The consumer count falling to zero is a sufficient
+ %% condition for unregistering the limiter regardless of
+ %% whether or not we can dispose of the ch record
+ %% entirely.
+ case ConsumerCount-1 of
+ 0 -> ok = rabbit_limiter:unregister(LimiterPid, self());
+ _ -> ok
+ end,
maybe_store_ch_record(C1),
ok = maybe_send_reply(ChPid, OkMsg),
NewState =