summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2012-03-23 10:56:50 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2012-03-23 10:56:50 +0000
commit7eae68ac5add562edb07951055bfb0be33cd13c6 (patch)
tree1c3fd07b1c1c7dceeee078a234b883e4481470de
parente9ae5eeb0b90c5b0f3ac0782b8547f2458137022 (diff)
downloadrabbitmq-server-git-7eae68ac5add562edb07951055bfb0be33cd13c6.tar.gz
use pmon in amqqueue_process
-rw-r--r--src/rabbit_amqqueue_process.erl78
1 files changed, 32 insertions, 46 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 41fc173b2b..456fc77c21 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -141,7 +141,7 @@ init(Q) ->
unconfirmed_mq = gb_trees:empty(),
unconfirmed_qm = gb_trees:empty(),
delayed_stop = undefined,
- queue_monitors = dict:new(),
+ queue_monitors = pmon:new(),
msg_id_to_channel = gb_trees:empty()},
{ok, rabbit_event:init_stats_timer(State, #q.stats_timer), hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
@@ -167,7 +167,7 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS,
unconfirmed_mq = gb_trees:empty(),
unconfirmed_qm = gb_trees:empty(),
delayed_stop = undefined,
- queue_monitors = dict:new(),
+ queue_monitors = pmon:new(),
msg_id_to_channel = MTC},
State1 = requeue_and_run(AckTags, process_args(
rabbit_event:init_stats_timer(
@@ -752,61 +752,46 @@ dead_letter_msg_existing_dlx(Msg, AckTag, Reason,
rabbit_basic:delivery(
false, false, make_dead_letter_msg(DLX, Reason, Msg, State),
MsgSeqNo)),
- State1 = lists:foldl(fun monitor_queue/2, State, QPids),
- State2 = State1#q{publish_seqno = MsgSeqNo + 1},
+ State1 = State#q{queue_monitors = pmon:monitor_all(
+ QPids, State#q.queue_monitors),
+ publish_seqno = MsgSeqNo + 1},
case QPids of
[] -> {_Guids, BQS1} = BQ:ack([AckTag], BQS),
- cleanup_after_confirm(State2#q{backing_queue_state = BQS1});
- _ -> State3 =
+ cleanup_after_confirm(State1#q{backing_queue_state = BQS1});
+ _ -> State2 =
lists:foldl(
fun(QPid, State0 = #q{unconfirmed_qm = UQM}) ->
UQM1 = rabbit_misc:gb_trees_set_insert(
QPid, MsgSeqNo, UQM),
State0#q{unconfirmed_qm = UQM1}
- end, State2, QPids),
- noreply(State3#q{
+ end, State1, QPids),
+ noreply(State2#q{
unconfirmed_mq =
gb_trees:insert(
MsgSeqNo, {gb_sets:from_list(QPids),
AckTag}, UMQ)})
end.
-monitor_queue(QPid, State = #q{queue_monitors = QMons}) ->
- case dict:is_key(QPid, QMons) of
- true -> State;
- false -> State#q{queue_monitors =
- dict:store(QPid, erlang:monitor(process, QPid),
- QMons)}
- end.
-
-demonitor_queue(QPid, State = #q{queue_monitors = QMons}) ->
- case dict:find(QPid, QMons) of
- {ok, MRef} -> erlang:demonitor(MRef),
- State#q{queue_monitors = dict:erase(QPid, QMons)};
- error -> State
- end.
-
handle_queue_down(QPid, Reason, State = #q{queue_monitors = QMons,
unconfirmed_qm = UQM}) ->
- case dict:find(QPid, QMons) of
- error ->
- noreply(State);
- {ok, _} ->
- rabbit_log:info("DLQ ~p (for ~s) died~n",
- [QPid, rabbit_misc:rs(qname(State))]),
- State1 = State#q{queue_monitors = dict:erase(QPid, QMons)},
- case gb_trees:lookup(QPid, UQM) of
- none ->
- noreply(State1);
- {value, MsgSeqNosSet} ->
- case rabbit_misc:is_abnormal_termination(Reason) of
- true -> rabbit_log:warning(
- "Dead queue lost ~p messages~n",
- [gb_sets:size(MsgSeqNosSet)]);
- false -> ok
- end,
- handle_confirm(gb_sets:to_list(MsgSeqNosSet), QPid, State1)
- end
+ case pmon:is_monitored(QPid, QMons) of
+ false -> noreply(State);
+ true -> rabbit_log:info("DLQ ~p (for ~s) died~n",
+ [QPid, rabbit_misc:rs(qname(State))]),
+ State1 = State#q{queue_monitors = pmon:erase(QPid, QMons)},
+ case gb_trees:lookup(QPid, UQM) of
+ none ->
+ noreply(State1);
+ {value, MsgSeqNosSet} ->
+ case rabbit_misc:is_abnormal_termination(Reason) of
+ true -> rabbit_log:warning(
+ "Dead queue lost ~p messages~n",
+ [gb_sets:size(MsgSeqNosSet)]);
+ false -> ok
+ end,
+ handle_confirm(gb_sets:to_list(MsgSeqNosSet), QPid,
+ State1)
+ end
end.
handle_confirm(MsgSeqNos, QPid, State = #q{unconfirmed_mq = UMQ,
@@ -832,10 +817,11 @@ handle_confirm(MsgSeqNos, QPid, State = #q{unconfirmed_mq = UMQ,
false -> State#q{
unconfirmed_qm =
gb_trees:update(QPid, MsgSeqNos1, UQM)};
- true -> demonitor_queue(
- QPid, State#q{
- unconfirmed_qm =
- gb_trees:delete(QPid, UQM)})
+ true -> State#q{
+ queue_monitors =
+ pmon:demonitor(QPid, State#q.queue_monitors),
+ unconfirmed_qm =
+ gb_trees:delete(QPid, UQM)}
end,
cleanup_after_confirm(State1#q{unconfirmed_mq = UMQ3,
backing_queue_state = BQS1}).