diff options
| author | Alexandru Scvortov <alexandru@rabbitmq.com> | 2012-01-31 12:52:09 +0000 |
|---|---|---|
| committer | Alexandru Scvortov <alexandru@rabbitmq.com> | 2012-01-31 12:52:09 +0000 |
| commit | 151c88acf1ae7fd010c9287c33e2d5842c6788db (patch) | |
| tree | f59a2e28263d6095148223f09118c514bfd3c491 /src | |
| parent | f49fc3133a7a785374293d7f9beab2ee7df122f2 (diff) | |
| download | rabbitmq-server-git-151c88acf1ae7fd010c9287c33e2d5842c6788db.tar.gz | |
don't leak queue monitors
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 44 |
1 files changed, 41 insertions, 3 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 5d3d939a73..3c1aa0e314 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -52,6 +52,7 @@ ttl_timer_ref, publish_seqno, unconfirmed, + unconfirmed_qm, blocked_op, queue_monitors, dlx, @@ -139,6 +140,7 @@ init(Q) -> dlx_routing_key = undefined, publish_seqno = 1, unconfirmed = gb_trees:empty(), + unconfirmed_qm = gb_trees:empty(), blocked_op = undefined, queue_monitors = dict:new(), msg_id_to_channel = gb_trees:empty()}, @@ -165,6 +167,7 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS, ttl = undefined, publish_seqno = 1, unconfirmed = gb_trees:empty(), + unconfirmed_qm = gb_trees:empty(), blocked_op = undefined, queue_monitors = dict:new(), msg_id_to_channel = MTC}, @@ -768,7 +771,23 @@ dead_letter_msg(Msg, AckTag, Reason, case QPids of [] -> {_, BQS1} = BQ:ack([AckTag], undefined, BQS), cleanup_after_confirm(State2#q{backing_queue_state = BQS1}); - _ -> noreply(State2#q{ + _ -> State3 = + lists:foldl( + fun(QPid, State0 = #q{unconfirmed_qm = UQM}) -> + case gb_trees:lookup(QPid, UQM) of + {value, MsgSeqNos} -> + MsgSeqNos1 = gb_sets:insert(MsgSeqNo, + MsgSeqNos), + UQM1 = gb_trees:update(QPid, MsgSeqNos1, + UQM), + State0#q{unconfirmed_qm = UQM1}; + none -> + S = gb_sets:singleton(MsgSeqNo), + UQM1 = gb_trees:insert(QPid, S, UQM), + State0#q{unconfirmed_qm = UQM1} + end + end, State2, QPids), + noreply(State3#q{ unconfirmed = gb_trees:insert( MsgSeqNo, {gb_sets:from_list(QPids), AckTag}, UC)}) @@ -782,6 +801,13 @@ monitor_queue(QPid, State = #q{queue_monitors = QMons}) -> QMons)} end. +demonitor_queue(QPid, State = #q{queue_monitors = QMons}) -> + case dict:find(QPid, QMons) of + {ok, MRef} -> erlang:demonitor(MRef), + State#q{queue_monitors = dict:erase(QPid, QMons)}; + error -> State + end. + handle_queue_down(QPid, State = #q{queue_monitors = QMons, unconfirmed = UC}) -> case dict:find(QPid, QMons) of @@ -798,6 +824,7 @@ handle_queue_down(QPid, State = #q{queue_monitors = QMons, end. handle_confirm(MsgSeqNos, QPid, State = #q{unconfirmed = UC, + unconfirmed_qm = UQM, backing_queue = BQ, backing_queue_state = BQS}) -> {BQS3, UC3} = @@ -813,8 +840,19 @@ handle_confirm(MsgSeqNos, QPid, State = #q{unconfirmed = UC, {QPids1, AckTag}, UC1)} end end, {BQS, UC}, MsgSeqNos), - cleanup_after_confirm(State#q{unconfirmed = UC3, - backing_queue_state = BQS3}). + MsgSeqNos1 = gb_sets:difference(gb_trees:get(QPid, UQM), + gb_sets:from_list(MsgSeqNos)), + State1 = case gb_sets:is_empty(MsgSeqNos1) of + false -> State#q{ + unconfirmed_qm = + gb_trees:update(QPid, MsgSeqNos1, UQM)}; + true -> demonitor_queue( + QPid, State#q{ + unconfirmed_qm = + gb_trees:delete(QPid, UQM)}) + end, + cleanup_after_confirm(State1#q{unconfirmed = UC3, + backing_queue_state = BQS3}). cleanup_after_confirm(State = #q{blocked_op = Op, unconfirmed = UC}) -> |
