summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlexandru Scvortov <alexandru@rabbitmq.com>2011-11-15 14:54:32 +0000
committerAlexandru Scvortov <alexandru@rabbitmq.com>2011-11-15 14:54:32 +0000
commit210753283340f608726431a2e24975d1834791fb (patch)
tree05fb26e89eb694c5c791f2525c07fc0f11e1b4d6
parent4233681bbc79c022e8bf5f2114c407fe2d744fef (diff)
downloadrabbitmq-server-git-210753283340f608726431a2e24975d1834791fb.tar.gz
monitor the DLQs
-rw-r--r--src/rabbit_amqqueue_process.erl36
1 files changed, 28 insertions, 8 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 25e5135766..265f4d1701 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -53,6 +53,7 @@
publish_seqno,
unconfirmed,
blocked_op,
+ queue_monitors,
dlx
}).
@@ -137,6 +138,7 @@ init(Q) ->
publish_seqno = 1,
unconfirmed = gb_trees:empty(),
blocked_op = undefined,
+ queue_monitors = dict:new(),
msg_id_to_channel = gb_trees:empty()},
{ok, rabbit_event:init_stats_timer(State, #q.stats_timer), hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
@@ -162,6 +164,7 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS,
publish_seqno = 1,
unconfirmed = gb_trees:empty(),
blocked_op = undefined,
+ queue_monitors = dict:new(),
msg_id_to_channel = MTC},
State1 = requeue_and_run(AckTags, process_args(
rabbit_event:init_stats_timer(
@@ -745,12 +748,29 @@ dead_letter_msg(Msg, Extra, Reason, State = #q{publish_seqno = MsgSeqNo,
dlx = DLX}) ->
rabbit_exchange:lookup_or_die(DLX),
- rabbit_basic:publish(
- rabbit_basic:delivery(
- false, false, make_dead_letter_msg(DLX, Reason, Msg, State),
- MsgSeqNo)),
- State#q{publish_seqno = MsgSeqNo + 1,
- unconfirmed = gb_trees:insert(MsgSeqNo, {Reason, Extra}, UC)}.
+ {ok, _, QPids} =
+ rabbit_basic:publish(
+ rabbit_basic:delivery(
+ false, false, make_dead_letter_msg(DLX, Reason, Msg, State),
+ MsgSeqNo)),
+ State1 = lists:foldl(fun monitor_queue/2, State, QPids),
+ State1#q{publish_seqno = MsgSeqNo + 1,
+ unconfirmed = gb_trees:insert(MsgSeqNo, {Reason, Extra}, UC)}.
+
+monitor_queue(QPid, State = #q{queue_monitors = QMons}) ->
+ case dict:is_key(QPid, QMons) of
+ true -> State;
+ false -> State#q{queue_monitors =
+ dict:store(QPid, erlang:monitor(process, QPid),
+ QMons)}
+ end.
+
+handle_queue_down(QPid, State = #q{queue_monitors = QMons}) ->
+ case dict:find(QPid, QMons) of
+ error -> State;
+ {ok, _} -> rabbit_log:info("DLQ ~p died~n", [QPid]),
+ State#q{queue_monitors = dict:erase(QPid, QMons)}
+ end.
make_dead_letter_msg(DLX, Reason, Msg = #basic_message{content = Content},
State) ->
@@ -1259,8 +1279,8 @@ handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason},
{stop, normal, State};
handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) ->
case handle_ch_down(DownPid, State) of
- {ok, NewState} -> noreply(NewState);
- {stop, NewState} -> {stop, normal, NewState}
+ {ok, State1} -> noreply(handle_queue_down(DownPid, State1));
+ {stop, State1} -> {stop, normal, State1}
end;
handle_info(update_ram_duration, State = #q{backing_queue = BQ,