summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAlexandru Scvortov <alexandru@rabbitmq.com>2012-01-31 12:52:09 +0000
committerAlexandru Scvortov <alexandru@rabbitmq.com>2012-01-31 12:52:09 +0000
commit151c88acf1ae7fd010c9287c33e2d5842c6788db (patch)
treef59a2e28263d6095148223f09118c514bfd3c491 /src
parentf49fc3133a7a785374293d7f9beab2ee7df122f2 (diff)
downloadrabbitmq-server-git-151c88acf1ae7fd010c9287c33e2d5842c6788db.tar.gz
don't leak queue monitors
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue_process.erl44
1 files changed, 41 insertions, 3 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 5d3d939a73..3c1aa0e314 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -52,6 +52,7 @@
ttl_timer_ref,
publish_seqno,
unconfirmed,
+ unconfirmed_qm,
blocked_op,
queue_monitors,
dlx,
@@ -139,6 +140,7 @@ init(Q) ->
dlx_routing_key = undefined,
publish_seqno = 1,
unconfirmed = gb_trees:empty(),
+ unconfirmed_qm = gb_trees:empty(),
blocked_op = undefined,
queue_monitors = dict:new(),
msg_id_to_channel = gb_trees:empty()},
@@ -165,6 +167,7 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS,
ttl = undefined,
publish_seqno = 1,
unconfirmed = gb_trees:empty(),
+ unconfirmed_qm = gb_trees:empty(),
blocked_op = undefined,
queue_monitors = dict:new(),
msg_id_to_channel = MTC},
@@ -768,7 +771,23 @@ dead_letter_msg(Msg, AckTag, Reason,
case QPids of
[] -> {_, BQS1} = BQ:ack([AckTag], undefined, BQS),
cleanup_after_confirm(State2#q{backing_queue_state = BQS1});
- _ -> noreply(State2#q{
+ _ -> State3 =
+ lists:foldl(
+ fun(QPid, State0 = #q{unconfirmed_qm = UQM}) ->
+ case gb_trees:lookup(QPid, UQM) of
+ {value, MsgSeqNos} ->
+ MsgSeqNos1 = gb_sets:insert(MsgSeqNo,
+ MsgSeqNos),
+ UQM1 = gb_trees:update(QPid, MsgSeqNos1,
+ UQM),
+ State0#q{unconfirmed_qm = UQM1};
+ none ->
+ S = gb_sets:singleton(MsgSeqNo),
+ UQM1 = gb_trees:insert(QPid, S, UQM),
+ State0#q{unconfirmed_qm = UQM1}
+ end
+ end, State2, QPids),
+ noreply(State3#q{
unconfirmed = gb_trees:insert(
MsgSeqNo, {gb_sets:from_list(QPids),
AckTag}, UC)})
@@ -782,6 +801,13 @@ monitor_queue(QPid, State = #q{queue_monitors = QMons}) ->
QMons)}
end.
+demonitor_queue(QPid, State = #q{queue_monitors = QMons}) ->
+ case dict:find(QPid, QMons) of
+ {ok, MRef} -> erlang:demonitor(MRef),
+ State#q{queue_monitors = dict:erase(QPid, QMons)};
+ error -> State
+ end.
+
handle_queue_down(QPid, State = #q{queue_monitors = QMons,
unconfirmed = UC}) ->
case dict:find(QPid, QMons) of
@@ -798,6 +824,7 @@ handle_queue_down(QPid, State = #q{queue_monitors = QMons,
end.
handle_confirm(MsgSeqNos, QPid, State = #q{unconfirmed = UC,
+ unconfirmed_qm = UQM,
backing_queue = BQ,
backing_queue_state = BQS}) ->
{BQS3, UC3} =
@@ -813,8 +840,19 @@ handle_confirm(MsgSeqNos, QPid, State = #q{unconfirmed = UC,
{QPids1, AckTag}, UC1)}
end
end, {BQS, UC}, MsgSeqNos),
- cleanup_after_confirm(State#q{unconfirmed = UC3,
- backing_queue_state = BQS3}).
+ MsgSeqNos1 = gb_sets:difference(gb_trees:get(QPid, UQM),
+ gb_sets:from_list(MsgSeqNos)),
+ State1 = case gb_sets:is_empty(MsgSeqNos1) of
+ false -> State#q{
+ unconfirmed_qm =
+ gb_trees:update(QPid, MsgSeqNos1, UQM)};
+ true -> demonitor_queue(
+ QPid, State#q{
+ unconfirmed_qm =
+ gb_trees:delete(QPid, UQM)})
+ end,
+ cleanup_after_confirm(State1#q{unconfirmed = UC3,
+ backing_queue_state = BQS3}).
cleanup_after_confirm(State = #q{blocked_op = Op,
unconfirmed = UC}) ->