diff options
| author | Alexandru Scvortov <alexandru@rabbitmq.com> | 2011-11-15 16:14:59 +0000 |
|---|---|---|
| committer | Alexandru Scvortov <alexandru@rabbitmq.com> | 2011-11-15 16:14:59 +0000 |
| commit | e041f9d892a3250acb273dccfb365693456a4fcc (patch) | |
| tree | fcd1fe1e9f24cc4247665abe4e9e0c01cf8b70ee /src | |
| parent | 210753283340f608726431a2e24975d1834791fb (diff) | |
| download | rabbitmq-server-git-e041f9d892a3250acb273dccfb365693456a4fcc.tar.gz | |
only remove messages when ALL DLQs have confirmed
Also, simulate confirms when a DLQ dies.
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 107 |
1 files changed, 59 insertions, 48 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 265f4d1701..13ed0671d2 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -718,11 +718,11 @@ ensure_ttl_timer(State) -> State. mk_dead_letter_fun(_Reason, #q{dlx = undefined}) -> - fun(_MsgLookupFun, _Extra, BQS) -> BQS end; + fun(_MsgLookupFun, _AckTag, BQS) -> BQS end; mk_dead_letter_fun(Reason, _State) -> - fun(MsgLookupFun, Extra, BQS) -> + fun(MsgLookupFun, AckTag, BQS) -> {Msg, BQS1} = MsgLookupFun(BQS), - gen_server2:cast(self(), {dead_letter, {Msg, Extra}, Reason}), + gen_server2:cast(self(), {dead_letter, {Msg, AckTag}, Reason}), BQS1 end. @@ -743,9 +743,9 @@ dead_letter_deleted_queue(From, State = #q{backing_queue_state = BQS, backing_queue_state = BQS1}) end. -dead_letter_msg(Msg, Extra, Reason, State = #q{publish_seqno = MsgSeqNo, - unconfirmed = UC, - dlx = DLX}) -> +dead_letter_msg(Msg, AckTag, Reason, State = #q{publish_seqno = MsgSeqNo, + unconfirmed = UC, + dlx = DLX}) -> rabbit_exchange:lookup_or_die(DLX), {ok, _, QPids} = @@ -754,8 +754,9 @@ dead_letter_msg(Msg, Extra, Reason, State = #q{publish_seqno = MsgSeqNo, false, false, make_dead_letter_msg(DLX, Reason, Msg, State), MsgSeqNo)), State1 = lists:foldl(fun monitor_queue/2, State, QPids), + UC1 = gb_trees:insert(MsgSeqNo, {gb_sets:from_list(QPids), AckTag}, UC), State1#q{publish_seqno = MsgSeqNo + 1, - unconfirmed = gb_trees:insert(MsgSeqNo, {Reason, Extra}, UC)}. + unconfirmed = UC1}. monitor_queue(QPid, State = #q{queue_monitors = QMons}) -> case dict:is_key(QPid, QMons) of @@ -765,11 +766,53 @@ monitor_queue(QPid, State = #q{queue_monitors = QMons}) -> QMons)} end. -handle_queue_down(QPid, State = #q{queue_monitors = QMons}) -> +handle_queue_down(QPid, State = #q{queue_monitors = QMons, + unconfirmed = UC}) -> case dict:find(QPid, QMons) of - error -> State; - {ok, _} -> rabbit_log:info("DLQ ~p died~n", [QPid]), - State#q{queue_monitors = dict:erase(QPid, QMons)} + error -> + noreply(State); + {ok, _} -> + #resource{name = QName} = qname(State), + rabbit_log:info("DLQ ~p (for ~p) died~n", [QPid, QName]), + MsgSeqNos = [MsgSeqNo || + {MsgSeqNo, {QPids, _, _}} <- gb_trees:to_list(UC), + gb_sets:is_member(QPid, QPids)], + handle_confirm(MsgSeqNos, QPid, + State#q{queue_monitors = dict:erase(QPid, QMons)}) + end. + +handle_confirm(MsgSeqNos, QPid, State = #q{unconfirmed = UC, + backing_queue = BQ, + backing_queue_state = BQS, + blocked_op = Op}) -> + {BQS3, UC3} = + lists:foldl( + fun (MsgSeqNo, {BQS1, UC1}) -> + {QPids, AckTag} = gb_trees:get(MsgSeqNo, UC1), + QPids1 = gb_sets:delete(QPid, QPids), + case gb_sets:is_empty(QPids1) of + true -> {_Guids, BQS2} = + BQ:ack([AckTag], undefined, BQS1), + {BQS2, gb_trees:delete(MsgSeqNo, UC1)}; + false -> {BQS1, gb_trees:update(MsgSeqNo, + {QPids1, AckTag}, UC1)} + end + end, {BQS, UC}, MsgSeqNos), + State1 = State#q{unconfirmed = UC3, + backing_queue_state = BQS3, + blocked_op = undefined}, + case {gb_trees:is_empty(UC3), Op} of + {true, {purge, {From, Count}}} -> + gen_server2:reply(From, {ok, Count}), + noreply(State1); + {true, {delete, {From, Count}}} -> + case From of + undefined -> ok; + _ -> gen_server2:reply(From, {ok, Count}) + end, + {stop, normal, State1}; + _ -> + noreply(State1#q{blocked_op = Op}) end. make_dead_letter_msg(DLX, Reason, Msg = #basic_message{content = Content}, @@ -1213,43 +1256,11 @@ handle_cast(force_event_refresh, State = #q{exclusive_consumer = Exclusive}) -> end, noreply(State); -handle_cast({confirm, MsgSeqNos, _From}, - State = #q{unconfirmed = UC, - backing_queue = BQ, - backing_queue_state = BQS, - blocked_op = Op}) -> - {BQS3, UC3} = - lists:foldl( - fun (MsgSeqNo, {BQS1, UC1}) -> - {_Guids, BQS2} = - case gb_trees:get(MsgSeqNo, UC1) of - {Reason, AckTag} when Reason =:= expired; - Reason =:= rejected; - Reason =:= queue_deleted; - Reason =:= queue_purged -> - BQ:ack([AckTag], undefined, BQS1) - end, - {BQS2, gb_trees:delete(MsgSeqNo, UC1)} - end, {BQS, UC}, MsgSeqNos), - State1 = State#q{unconfirmed = UC3, - backing_queue_state = BQS3, - blocked_op = undefined}, - case {gb_trees:is_empty(UC3), Op} of - {true, {purge, {From, Count}}} -> - gen_server2:reply(From, {ok, Count}), - noreply(State1); - {true, {delete, {From, Count}}} -> - case From of - undefined -> ok; - _ -> gen_server2:reply(From, {ok, Count}) - end, - {stop, normal, State1}; - _ -> - noreply(State1#q{blocked_op = Op}) - end; +handle_cast({confirm, MsgSeqNos, QPid}, State) -> + handle_confirm(MsgSeqNos, QPid, State); -handle_cast({dead_letter, {Msg, Extra}, Reason}, State) -> - noreply(dead_letter_msg(Msg, Extra, Reason, State)). +handle_cast({dead_letter, {Msg, AckTag}, Reason}, State) -> + noreply(dead_letter_msg(Msg, AckTag, Reason, State)). handle_info(maybe_expire, State) -> case is_unused(State) of @@ -1279,7 +1290,7 @@ handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, {stop, normal, State}; handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) -> case handle_ch_down(DownPid, State) of - {ok, State1} -> noreply(handle_queue_down(DownPid, State1)); + {ok, State1} -> handle_queue_down(DownPid, State1); {stop, State1} -> {stop, normal, State1} end; |
