diff options
| author | Jerry Kuch <jerryk@vmware.com> | 2010-11-10 10:34:25 +0000 |
|---|---|---|
| committer | Jerry Kuch <jerryk@vmware.com> | 2010-11-10 10:34:25 +0000 |
| commit | 4ff2f25b20ed4c461ec3bb41b2e99bc9c0a8222e (patch) | |
| tree | d765edad7fffa67e36b9efa9e5c9de5df4f57f8d | |
| parent | bf3b2a931b0260653aac0967dced37312a58e194 (diff) | |
| download | rabbitmq-server-git-4ff2f25b20ed4c461ec3bb41b2e99bc9c0a8222e.tar.gz | |
Improve function naming and handle other potential leak spots in channel record updates.
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 19 |
1 files changed, 8 insertions, 11 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 1d4ac94ff9..94cbef8648 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -323,7 +323,7 @@ update_ch_record(C = #cr{consumer_count = ConsumerCount, acktags = ChAckTags, txn = Txn}) -> case {sets:size(ChAckTags), ConsumerCount, Txn} of - {0, 0, none} -> demonitor_and_erase_ch(C), + {0, 0, none} -> demonitor_and_erase_ch_record(C), ok = rabbit_limiter:unregister(LimiterPid, self()); _ -> store_ch_record(C) end. @@ -515,7 +515,7 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder}) -> not_found -> {ok, State}; C = #cr{ch_pid = ChPid, txn = Txn, acktags = ChAckTags} -> - demonitor_and_erase_ch(C), + demonitor_and_erase_ch_record(C), State1 = State#q{ exclusive_consumer = case Holder of {ChPid, _} -> none; @@ -574,7 +574,7 @@ commit_transaction(Txn, From, ChPid, State = #q{backing_queue = BQ, %% by the channel. C = #cr{acktags = ChAckTags} = lookup_ch(ChPid), ChAckTags1 = subtract_acks(ChAckTags, AckTags), - store_ch_record(C#cr{acktags = ChAckTags1, txn = none}), + update_ch_record(C#cr{acktags = ChAckTags1, txn = none}), State#q{backing_queue_state = BQS1}. rollback_transaction(Txn, ChPid, State = #q{backing_queue = BQ, @@ -889,7 +889,7 @@ handle_call({requeue, AckTags, ChPid}, From, State) -> noreply(State); C = #cr{acktags = ChAckTags} -> ChAckTags1 = subtract_acks(ChAckTags, AckTags), - store_ch_record(C#cr{acktags = ChAckTags1}), + update_ch_record(C#cr{acktags = ChAckTags1}), noreply(requeue_and_run(AckTags, State)) end; @@ -913,7 +913,7 @@ handle_cast({ack, Txn, AckTags, ChPid}, {C#cr{acktags = ChAckTags1}, BQ:ack(AckTags, BQS)}; _ -> {C#cr{txn = Txn}, BQ:tx_ack(Txn, AckTags, BQS)} end, - store_ch_record(C1), + update_ch_record(C1), noreply(State#q{backing_queue_state = BQS1}) end; @@ -924,7 +924,7 @@ handle_cast({reject, AckTags, Requeue, ChPid}, noreply(State); C = #cr{acktags = ChAckTags} -> ChAckTags1 = subtract_acks(ChAckTags, AckTags), - store_ch_record(C#cr{acktags = ChAckTags1}), + update_ch_record(C#cr{acktags = ChAckTags1}), noreply(case Requeue of true -> requeue_and_run(AckTags, State); false -> BQS1 = BQ:ack(AckTags, BQS), @@ -1046,10 +1046,7 @@ handle_pre_hibernate(State = #q{backing_queue = BQ, backing_queue_state = BQS2}, {hibernate, stop_rate_timer(State1)}. -demonitor_and_erase_ch(#cr{ch_pid = ChPid, - monitor_ref = MonitorRef}) -> +demonitor_and_erase_ch_record(#cr{ch_pid = ChPid, + monitor_ref = MonitorRef}) -> erlang:demonitor(MonitorRef), erase({ch, ChPid}). - - - |
