diff options
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 21 |
1 files changed, 21 insertions, 0 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index fe2c975b4e..d118ddc8af 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -1036,3 +1036,24 @@ handle_pre_hibernate(State = #q{backing_queue = BQ, State1 = State#q{stats_timer = rabbit_event:stop_stats_timer(StatsTimer), backing_queue_state = BQS2}, {hibernate, stop_rate_timer(State1)}. + +demonitor_and_erase_ch(#cr{ch_pid = ChPid, + monitor_ref = MonitorRef}) -> + erlang:demonitor(MonitorRef), + erase({ch, ChPid}). + +%% If the channel record we're considering submitting to the process dictionary +%% has no consumers, has no pending acks, and doesn't have a transaction, we +%% should delete its record rather than storing it. +replace_or_erase_ch(C = #cr{consumer_count = ConsumerCount, + limiter_pid = LimiterPid, + acktags = ChAckTags, + txn = Txn}) -> + case {sets:size(ChAckTags), ConsumerCount, Txn} of + {0, 0, undefined} -> demonitor_and_erase_ch(C), + ok = rabbit_limiter:unregister(LimiterPid, self()); + _ -> store_ch_record(C) + end. + + + |
