diff options
| author | Matthias Radestock <matthias@rabbitmq.com> | 2012-03-23 10:56:50 +0000 |
|---|---|---|
| committer | Matthias Radestock <matthias@rabbitmq.com> | 2012-03-23 10:56:50 +0000 |
| commit | 7eae68ac5add562edb07951055bfb0be33cd13c6 (patch) | |
| tree | 1c3fd07b1c1c7dceeee078a234b883e4481470de /src | |
| parent | e9ae5eeb0b90c5b0f3ac0782b8547f2458137022 (diff) | |
| download | rabbitmq-server-git-7eae68ac5add562edb07951055bfb0be33cd13c6.tar.gz | |
use pmon in amqqueue_process
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 78 |
1 files changed, 32 insertions, 46 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 41fc173b2b..456fc77c21 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -141,7 +141,7 @@ init(Q) -> unconfirmed_mq = gb_trees:empty(), unconfirmed_qm = gb_trees:empty(), delayed_stop = undefined, - queue_monitors = dict:new(), + queue_monitors = pmon:new(), msg_id_to_channel = gb_trees:empty()}, {ok, rabbit_event:init_stats_timer(State, #q.stats_timer), hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. @@ -167,7 +167,7 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS, unconfirmed_mq = gb_trees:empty(), unconfirmed_qm = gb_trees:empty(), delayed_stop = undefined, - queue_monitors = dict:new(), + queue_monitors = pmon:new(), msg_id_to_channel = MTC}, State1 = requeue_and_run(AckTags, process_args( rabbit_event:init_stats_timer( @@ -752,61 +752,46 @@ dead_letter_msg_existing_dlx(Msg, AckTag, Reason, rabbit_basic:delivery( false, false, make_dead_letter_msg(DLX, Reason, Msg, State), MsgSeqNo)), - State1 = lists:foldl(fun monitor_queue/2, State, QPids), - State2 = State1#q{publish_seqno = MsgSeqNo + 1}, + State1 = State#q{queue_monitors = pmon:monitor_all( + QPids, State#q.queue_monitors), + publish_seqno = MsgSeqNo + 1}, case QPids of [] -> {_Guids, BQS1} = BQ:ack([AckTag], BQS), - cleanup_after_confirm(State2#q{backing_queue_state = BQS1}); - _ -> State3 = + cleanup_after_confirm(State1#q{backing_queue_state = BQS1}); + _ -> State2 = lists:foldl( fun(QPid, State0 = #q{unconfirmed_qm = UQM}) -> UQM1 = rabbit_misc:gb_trees_set_insert( QPid, MsgSeqNo, UQM), State0#q{unconfirmed_qm = UQM1} - end, State2, QPids), - noreply(State3#q{ + end, State1, QPids), + noreply(State2#q{ unconfirmed_mq = gb_trees:insert( MsgSeqNo, {gb_sets:from_list(QPids), AckTag}, UMQ)}) end. -monitor_queue(QPid, State = #q{queue_monitors = QMons}) -> - case dict:is_key(QPid, QMons) of - true -> State; - false -> State#q{queue_monitors = - dict:store(QPid, erlang:monitor(process, QPid), - QMons)} - end. - -demonitor_queue(QPid, State = #q{queue_monitors = QMons}) -> - case dict:find(QPid, QMons) of - {ok, MRef} -> erlang:demonitor(MRef), - State#q{queue_monitors = dict:erase(QPid, QMons)}; - error -> State - end. - handle_queue_down(QPid, Reason, State = #q{queue_monitors = QMons, unconfirmed_qm = UQM}) -> - case dict:find(QPid, QMons) of - error -> - noreply(State); - {ok, _} -> - rabbit_log:info("DLQ ~p (for ~s) died~n", - [QPid, rabbit_misc:rs(qname(State))]), - State1 = State#q{queue_monitors = dict:erase(QPid, QMons)}, - case gb_trees:lookup(QPid, UQM) of - none -> - noreply(State1); - {value, MsgSeqNosSet} -> - case rabbit_misc:is_abnormal_termination(Reason) of - true -> rabbit_log:warning( - "Dead queue lost ~p messages~n", - [gb_sets:size(MsgSeqNosSet)]); - false -> ok - end, - handle_confirm(gb_sets:to_list(MsgSeqNosSet), QPid, State1) - end + case pmon:is_monitored(QPid, QMons) of + false -> noreply(State); + true -> rabbit_log:info("DLQ ~p (for ~s) died~n", + [QPid, rabbit_misc:rs(qname(State))]), + State1 = State#q{queue_monitors = pmon:erase(QPid, QMons)}, + case gb_trees:lookup(QPid, UQM) of + none -> + noreply(State1); + {value, MsgSeqNosSet} -> + case rabbit_misc:is_abnormal_termination(Reason) of + true -> rabbit_log:warning( + "Dead queue lost ~p messages~n", + [gb_sets:size(MsgSeqNosSet)]); + false -> ok + end, + handle_confirm(gb_sets:to_list(MsgSeqNosSet), QPid, + State1) + end end. handle_confirm(MsgSeqNos, QPid, State = #q{unconfirmed_mq = UMQ, @@ -832,10 +817,11 @@ handle_confirm(MsgSeqNos, QPid, State = #q{unconfirmed_mq = UMQ, false -> State#q{ unconfirmed_qm = gb_trees:update(QPid, MsgSeqNos1, UQM)}; - true -> demonitor_queue( - QPid, State#q{ - unconfirmed_qm = - gb_trees:delete(QPid, UQM)}) + true -> State#q{ + queue_monitors = + pmon:demonitor(QPid, State#q.queue_monitors), + unconfirmed_qm = + gb_trees:delete(QPid, UQM)} end, cleanup_after_confirm(State1#q{unconfirmed_mq = UMQ3, backing_queue_state = BQS1}). |
