summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_amqqueue_process.erl21
1 files changed, 21 insertions, 0 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index fe2c975b4e..d118ddc8af 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -1036,3 +1036,24 @@ handle_pre_hibernate(State = #q{backing_queue = BQ,
State1 = State#q{stats_timer = rabbit_event:stop_stats_timer(StatsTimer),
backing_queue_state = BQS2},
{hibernate, stop_rate_timer(State1)}.
+
+demonitor_and_erase_ch(#cr{ch_pid = ChPid,
+ monitor_ref = MonitorRef}) ->
+ erlang:demonitor(MonitorRef),
+ erase({ch, ChPid}).
+
+%% If the channel record we're considering submitting to the process dictionary
+%% has no consumers, has no pending acks, and doesn't have a transaction, we
+%% should delete its record rather than storing it.
+replace_or_erase_ch(C = #cr{consumer_count = ConsumerCount,
+ limiter_pid = LimiterPid,
+ acktags = ChAckTags,
+ txn = Txn}) ->
+ case {sets:size(ChAckTags), ConsumerCount, Txn} of
+ {0, 0, undefined} -> demonitor_and_erase_ch(C),
+ ok = rabbit_limiter:unregister(LimiterPid, self());
+ _ -> store_ch_record(C)
+ end.
+
+
+