summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJerry Kuch <jerryk@vmware.com>2010-11-10 10:34:25 +0000
committerJerry Kuch <jerryk@vmware.com>2010-11-10 10:34:25 +0000
commit4ff2f25b20ed4c461ec3bb41b2e99bc9c0a8222e (patch)
treed765edad7fffa67e36b9efa9e5c9de5df4f57f8d
parentbf3b2a931b0260653aac0967dced37312a58e194 (diff)
downloadrabbitmq-server-git-4ff2f25b20ed4c461ec3bb41b2e99bc9c0a8222e.tar.gz
Improve function naming and handle other potential leak spots in channel record updates.
-rw-r--r--src/rabbit_amqqueue_process.erl19
1 files changed, 8 insertions, 11 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 1d4ac94ff9..94cbef8648 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -323,7 +323,7 @@ update_ch_record(C = #cr{consumer_count = ConsumerCount,
acktags = ChAckTags,
txn = Txn}) ->
case {sets:size(ChAckTags), ConsumerCount, Txn} of
- {0, 0, none} -> demonitor_and_erase_ch(C),
+ {0, 0, none} -> demonitor_and_erase_ch_record(C),
ok = rabbit_limiter:unregister(LimiterPid, self());
_ -> store_ch_record(C)
end.
@@ -515,7 +515,7 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder}) ->
not_found ->
{ok, State};
C = #cr{ch_pid = ChPid, txn = Txn, acktags = ChAckTags} ->
- demonitor_and_erase_ch(C),
+ demonitor_and_erase_ch_record(C),
State1 = State#q{
exclusive_consumer = case Holder of
{ChPid, _} -> none;
@@ -574,7 +574,7 @@ commit_transaction(Txn, From, ChPid, State = #q{backing_queue = BQ,
%% by the channel.
C = #cr{acktags = ChAckTags} = lookup_ch(ChPid),
ChAckTags1 = subtract_acks(ChAckTags, AckTags),
- store_ch_record(C#cr{acktags = ChAckTags1, txn = none}),
+ update_ch_record(C#cr{acktags = ChAckTags1, txn = none}),
State#q{backing_queue_state = BQS1}.
rollback_transaction(Txn, ChPid, State = #q{backing_queue = BQ,
@@ -889,7 +889,7 @@ handle_call({requeue, AckTags, ChPid}, From, State) ->
noreply(State);
C = #cr{acktags = ChAckTags} ->
ChAckTags1 = subtract_acks(ChAckTags, AckTags),
- store_ch_record(C#cr{acktags = ChAckTags1}),
+ update_ch_record(C#cr{acktags = ChAckTags1}),
noreply(requeue_and_run(AckTags, State))
end;
@@ -913,7 +913,7 @@ handle_cast({ack, Txn, AckTags, ChPid},
{C#cr{acktags = ChAckTags1}, BQ:ack(AckTags, BQS)};
_ -> {C#cr{txn = Txn}, BQ:tx_ack(Txn, AckTags, BQS)}
end,
- store_ch_record(C1),
+ update_ch_record(C1),
noreply(State#q{backing_queue_state = BQS1})
end;
@@ -924,7 +924,7 @@ handle_cast({reject, AckTags, Requeue, ChPid},
noreply(State);
C = #cr{acktags = ChAckTags} ->
ChAckTags1 = subtract_acks(ChAckTags, AckTags),
- store_ch_record(C#cr{acktags = ChAckTags1}),
+ update_ch_record(C#cr{acktags = ChAckTags1}),
noreply(case Requeue of
true -> requeue_and_run(AckTags, State);
false -> BQS1 = BQ:ack(AckTags, BQS),
@@ -1046,10 +1046,7 @@ handle_pre_hibernate(State = #q{backing_queue = BQ,
backing_queue_state = BQS2},
{hibernate, stop_rate_timer(State1)}.
-demonitor_and_erase_ch(#cr{ch_pid = ChPid,
- monitor_ref = MonitorRef}) ->
+demonitor_and_erase_ch_record(#cr{ch_pid = ChPid,
+ monitor_ref = MonitorRef}) ->
erlang:demonitor(MonitorRef),
erase({ch, ChPid}).
-
-
-