diff options
| author | Alexandru Scvortov <alexandru@rabbitmq.com> | 2011-11-15 14:54:32 +0000 |
|---|---|---|
| committer | Alexandru Scvortov <alexandru@rabbitmq.com> | 2011-11-15 14:54:32 +0000 |
| commit | 210753283340f608726431a2e24975d1834791fb (patch) | |
| tree | 05fb26e89eb694c5c791f2525c07fc0f11e1b4d6 | |
| parent | 4233681bbc79c022e8bf5f2114c407fe2d744fef (diff) | |
| download | rabbitmq-server-git-210753283340f608726431a2e24975d1834791fb.tar.gz | |
monitor the DLQs
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 36 |
1 files changed, 28 insertions, 8 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 25e5135766..265f4d1701 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -53,6 +53,7 @@ publish_seqno, unconfirmed, blocked_op, + queue_monitors, dlx }). @@ -137,6 +138,7 @@ init(Q) -> publish_seqno = 1, unconfirmed = gb_trees:empty(), blocked_op = undefined, + queue_monitors = dict:new(), msg_id_to_channel = gb_trees:empty()}, {ok, rabbit_event:init_stats_timer(State, #q.stats_timer), hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. @@ -162,6 +164,7 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS, publish_seqno = 1, unconfirmed = gb_trees:empty(), blocked_op = undefined, + queue_monitors = dict:new(), msg_id_to_channel = MTC}, State1 = requeue_and_run(AckTags, process_args( rabbit_event:init_stats_timer( @@ -745,12 +748,29 @@ dead_letter_msg(Msg, Extra, Reason, State = #q{publish_seqno = MsgSeqNo, dlx = DLX}) -> rabbit_exchange:lookup_or_die(DLX), - rabbit_basic:publish( - rabbit_basic:delivery( - false, false, make_dead_letter_msg(DLX, Reason, Msg, State), - MsgSeqNo)), - State#q{publish_seqno = MsgSeqNo + 1, - unconfirmed = gb_trees:insert(MsgSeqNo, {Reason, Extra}, UC)}. + {ok, _, QPids} = + rabbit_basic:publish( + rabbit_basic:delivery( + false, false, make_dead_letter_msg(DLX, Reason, Msg, State), + MsgSeqNo)), + State1 = lists:foldl(fun monitor_queue/2, State, QPids), + State1#q{publish_seqno = MsgSeqNo + 1, + unconfirmed = gb_trees:insert(MsgSeqNo, {Reason, Extra}, UC)}. + +monitor_queue(QPid, State = #q{queue_monitors = QMons}) -> + case dict:is_key(QPid, QMons) of + true -> State; + false -> State#q{queue_monitors = + dict:store(QPid, erlang:monitor(process, QPid), + QMons)} + end. + +handle_queue_down(QPid, State = #q{queue_monitors = QMons}) -> + case dict:find(QPid, QMons) of + error -> State; + {ok, _} -> rabbit_log:info("DLQ ~p died~n", [QPid]), + State#q{queue_monitors = dict:erase(QPid, QMons)} + end. make_dead_letter_msg(DLX, Reason, Msg = #basic_message{content = Content}, State) -> @@ -1259,8 +1279,8 @@ handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, {stop, normal, State}; handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) -> case handle_ch_down(DownPid, State) of - {ok, NewState} -> noreply(NewState); - {stop, NewState} -> {stop, normal, NewState} + {ok, State1} -> noreply(handle_queue_down(DownPid, State1)); + {stop, State1} -> {stop, normal, State1} end; handle_info(update_ram_duration, State = #q{backing_queue = BQ, |
