summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAlexandru Scvortov <alexandru@rabbitmq.com>2011-11-15 16:14:59 +0000
committerAlexandru Scvortov <alexandru@rabbitmq.com>2011-11-15 16:14:59 +0000
commite041f9d892a3250acb273dccfb365693456a4fcc (patch)
treefcd1fe1e9f24cc4247665abe4e9e0c01cf8b70ee /src
parent210753283340f608726431a2e24975d1834791fb (diff)
downloadrabbitmq-server-git-e041f9d892a3250acb273dccfb365693456a4fcc.tar.gz
only remove messages when ALL DLQs have confirmed
Also, simulate confirms when a DLQ dies.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue_process.erl107
1 files changed, 59 insertions, 48 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 265f4d1701..13ed0671d2 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -718,11 +718,11 @@ ensure_ttl_timer(State) ->
State.
mk_dead_letter_fun(_Reason, #q{dlx = undefined}) ->
- fun(_MsgLookupFun, _Extra, BQS) -> BQS end;
+ fun(_MsgLookupFun, _AckTag, BQS) -> BQS end;
mk_dead_letter_fun(Reason, _State) ->
- fun(MsgLookupFun, Extra, BQS) ->
+ fun(MsgLookupFun, AckTag, BQS) ->
{Msg, BQS1} = MsgLookupFun(BQS),
- gen_server2:cast(self(), {dead_letter, {Msg, Extra}, Reason}),
+ gen_server2:cast(self(), {dead_letter, {Msg, AckTag}, Reason}),
BQS1
end.
@@ -743,9 +743,9 @@ dead_letter_deleted_queue(From, State = #q{backing_queue_state = BQS,
backing_queue_state = BQS1})
end.
-dead_letter_msg(Msg, Extra, Reason, State = #q{publish_seqno = MsgSeqNo,
- unconfirmed = UC,
- dlx = DLX}) ->
+dead_letter_msg(Msg, AckTag, Reason, State = #q{publish_seqno = MsgSeqNo,
+ unconfirmed = UC,
+ dlx = DLX}) ->
rabbit_exchange:lookup_or_die(DLX),
{ok, _, QPids} =
@@ -754,8 +754,9 @@ dead_letter_msg(Msg, Extra, Reason, State = #q{publish_seqno = MsgSeqNo,
false, false, make_dead_letter_msg(DLX, Reason, Msg, State),
MsgSeqNo)),
State1 = lists:foldl(fun monitor_queue/2, State, QPids),
+ UC1 = gb_trees:insert(MsgSeqNo, {gb_sets:from_list(QPids), AckTag}, UC),
State1#q{publish_seqno = MsgSeqNo + 1,
- unconfirmed = gb_trees:insert(MsgSeqNo, {Reason, Extra}, UC)}.
+ unconfirmed = UC1}.
monitor_queue(QPid, State = #q{queue_monitors = QMons}) ->
case dict:is_key(QPid, QMons) of
@@ -765,11 +766,53 @@ monitor_queue(QPid, State = #q{queue_monitors = QMons}) ->
QMons)}
end.
-handle_queue_down(QPid, State = #q{queue_monitors = QMons}) ->
+handle_queue_down(QPid, State = #q{queue_monitors = QMons,
+ unconfirmed = UC}) ->
case dict:find(QPid, QMons) of
- error -> State;
- {ok, _} -> rabbit_log:info("DLQ ~p died~n", [QPid]),
- State#q{queue_monitors = dict:erase(QPid, QMons)}
+ error ->
+ noreply(State);
+ {ok, _} ->
+ #resource{name = QName} = qname(State),
+ rabbit_log:info("DLQ ~p (for ~p) died~n", [QPid, QName]),
+ MsgSeqNos = [MsgSeqNo ||
+ {MsgSeqNo, {QPids, _, _}} <- gb_trees:to_list(UC),
+ gb_sets:is_member(QPid, QPids)],
+ handle_confirm(MsgSeqNos, QPid,
+ State#q{queue_monitors = dict:erase(QPid, QMons)})
+ end.
+
+handle_confirm(MsgSeqNos, QPid, State = #q{unconfirmed = UC,
+ backing_queue = BQ,
+ backing_queue_state = BQS,
+ blocked_op = Op}) ->
+ {BQS3, UC3} =
+ lists:foldl(
+ fun (MsgSeqNo, {BQS1, UC1}) ->
+ {QPids, AckTag} = gb_trees:get(MsgSeqNo, UC1),
+ QPids1 = gb_sets:delete(QPid, QPids),
+ case gb_sets:is_empty(QPids1) of
+ true -> {_Guids, BQS2} =
+ BQ:ack([AckTag], undefined, BQS1),
+ {BQS2, gb_trees:delete(MsgSeqNo, UC1)};
+ false -> {BQS1, gb_trees:update(MsgSeqNo,
+ {QPids1, AckTag}, UC1)}
+ end
+ end, {BQS, UC}, MsgSeqNos),
+ State1 = State#q{unconfirmed = UC3,
+ backing_queue_state = BQS3,
+ blocked_op = undefined},
+ case {gb_trees:is_empty(UC3), Op} of
+ {true, {purge, {From, Count}}} ->
+ gen_server2:reply(From, {ok, Count}),
+ noreply(State1);
+ {true, {delete, {From, Count}}} ->
+ case From of
+ undefined -> ok;
+ _ -> gen_server2:reply(From, {ok, Count})
+ end,
+ {stop, normal, State1};
+ _ ->
+ noreply(State1#q{blocked_op = Op})
end.
make_dead_letter_msg(DLX, Reason, Msg = #basic_message{content = Content},
@@ -1213,43 +1256,11 @@ handle_cast(force_event_refresh, State = #q{exclusive_consumer = Exclusive}) ->
end,
noreply(State);
-handle_cast({confirm, MsgSeqNos, _From},
- State = #q{unconfirmed = UC,
- backing_queue = BQ,
- backing_queue_state = BQS,
- blocked_op = Op}) ->
- {BQS3, UC3} =
- lists:foldl(
- fun (MsgSeqNo, {BQS1, UC1}) ->
- {_Guids, BQS2} =
- case gb_trees:get(MsgSeqNo, UC1) of
- {Reason, AckTag} when Reason =:= expired;
- Reason =:= rejected;
- Reason =:= queue_deleted;
- Reason =:= queue_purged ->
- BQ:ack([AckTag], undefined, BQS1)
- end,
- {BQS2, gb_trees:delete(MsgSeqNo, UC1)}
- end, {BQS, UC}, MsgSeqNos),
- State1 = State#q{unconfirmed = UC3,
- backing_queue_state = BQS3,
- blocked_op = undefined},
- case {gb_trees:is_empty(UC3), Op} of
- {true, {purge, {From, Count}}} ->
- gen_server2:reply(From, {ok, Count}),
- noreply(State1);
- {true, {delete, {From, Count}}} ->
- case From of
- undefined -> ok;
- _ -> gen_server2:reply(From, {ok, Count})
- end,
- {stop, normal, State1};
- _ ->
- noreply(State1#q{blocked_op = Op})
- end;
+handle_cast({confirm, MsgSeqNos, QPid}, State) ->
+ handle_confirm(MsgSeqNos, QPid, State);
-handle_cast({dead_letter, {Msg, Extra}, Reason}, State) ->
- noreply(dead_letter_msg(Msg, Extra, Reason, State)).
+handle_cast({dead_letter, {Msg, AckTag}, Reason}, State) ->
+ noreply(dead_letter_msg(Msg, AckTag, Reason, State)).
handle_info(maybe_expire, State) ->
case is_unused(State) of
@@ -1279,7 +1290,7 @@ handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason},
{stop, normal, State};
handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) ->
case handle_ch_down(DownPid, State) of
- {ok, State1} -> noreply(handle_queue_down(DownPid, State1));
+ {ok, State1} -> handle_queue_down(DownPid, State1);
{stop, State1} -> {stop, normal, State1}
end;