diff options
| author | Alexandru Scvortov <alexandru@rabbitmq.com> | 2011-11-14 16:22:26 +0000 |
|---|---|---|
| committer | Alexandru Scvortov <alexandru@rabbitmq.com> | 2011-11-14 16:22:26 +0000 |
| commit | 11c0813c08a8c6281a7b55ceecdbaa7d13e2fba8 (patch) | |
| tree | 1bf243e81a7d3df3e6e7dae74bfb8285dc5657fb /src | |
| parent | aa0c973fbb6a456bc1b22031be6b9cf8d95a8f1f (diff) | |
| download | rabbitmq-server-git-11c0813c08a8c6281a7b55ceecdbaa7d13e2fba8.tar.gz | |
don't remove purged messages until the DLQ has confirmed them
Also, block the queue.purge_ok until the messages have reached the DLQ safely.
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 77 |
1 files changed, 50 insertions, 27 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index d28a39689c..618555e883 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -52,6 +52,7 @@ ttl_timer_ref, publish_seqno, unconfirmed, + blocked_purge, dlx }). @@ -135,6 +136,7 @@ init(Q) -> dlx = undefined, publish_seqno = 1, unconfirmed = gb_trees:empty(), + blocked_purge = undefined, msg_id_to_channel = gb_trees:empty()}, {ok, rabbit_event:init_stats_timer(State, #q.stats_timer), hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. @@ -159,6 +161,7 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS, ttl = undefined, publish_seqno = 1, unconfirmed = gb_trees:empty(), + blocked_purge = undefined, msg_id_to_channel = MTC}, State1 = requeue_and_run(AckTags, process_args( rabbit_event:init_stats_timer( @@ -737,7 +740,7 @@ maybe_dead_letter_queue(Reason, State = #q{ end. dead_letter_msg(Msg, Extra, Reason, State = #q{publish_seqno = MsgSeqNo, - unconfirmed = Unconfirmed, + unconfirmed = UC, dlx = DLX}) -> rabbit_exchange:lookup_or_die(DLX), @@ -746,8 +749,7 @@ dead_letter_msg(Msg, Extra, Reason, State = #q{publish_seqno = MsgSeqNo, false, false, make_dead_letter_msg(DLX, Reason, Msg, State), MsgSeqNo)), State#q{publish_seqno = MsgSeqNo + 1, - unconfirmed = gb_trees:insert(MsgSeqNo, {Reason, Extra}, - Unconfirmed)}. + unconfirmed = gb_trees:insert(MsgSeqNo, {Reason, Extra}, UC)}. make_dead_letter_msg(DLX, Reason, Msg = #basic_message{content = Content}, State) -> @@ -1084,11 +1086,20 @@ handle_call({delete, IfUnused, IfEmpty}, _From, {stop, normal, {ok, BQ:len(BQS)}, State} end; -handle_call(purge, _From, State = #q{backing_queue = BQ}) -> - State1 = #q{backing_queue_state = BQS} = - maybe_dead_letter_queue(queue_purged, State), +handle_call(purge, _From, State = #q{backing_queue = BQ, + backing_queue_state = BQS, + dlx = undefined}) -> {Count, BQS1} = BQ:purge(BQS), - reply({ok, Count}, State1#q{backing_queue_state = BQS1}); + reply({ok, Count}, State#q{backing_queue_state = BQS1}); + +handle_call(purge, From, State = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> + BQS1 = BQ:dropwhile( + fun (_) -> true end, + mk_dead_letter_fun(queue_purged, State), + BQS), + noreply(State#q{backing_queue_state = BQS1, + blocked_purge = {From, BQ:len(BQS)}}); handle_call({requeue, AckTags, ChPid}, From, State) -> gen_server2:reply(From, ok), @@ -1181,26 +1192,38 @@ handle_cast(force_event_refresh, State = #q{exclusive_consumer = Exclusive}) -> end, noreply(State); -handle_cast({confirm, MsgSeqNos, _From}, State) -> - noreply(lists:foldl( - fun (MsgSeqNo, - State1 = #q{unconfirmed = Unconfirmed, - backing_queue = BQ, - backing_queue_state = BQS}) -> - Reason = gb_trees:get(MsgSeqNo, Unconfirmed), - case Reason of - {expired, AckTag} -> - BQ:ack([AckTag], undefined, BQS); - {rejected, AckTag} -> - BQ:ack([AckTag], undefined, BQS); - {queue_deleted, _} -> - ok; - {queue_purged, _} -> - ok - end, - State1#q{unconfirmed = gb_trees:delete(MsgSeqNo, - Unconfirmed)} - end, State, MsgSeqNos)); +handle_cast({confirm, MsgSeqNos, _From}, + State = #q{unconfirmed = UC, + backing_queue = BQ, + backing_queue_state = BQS, + blocked_purge = Purge}) -> + {BQS3, UC3} = lists:foldl( + fun (MsgSeqNo, {BQS1, UC1}) -> + Reason = gb_trees:get(MsgSeqNo, UC1), + %% FIXME Forward confirms to channels + {_, BQS2} = + case Reason of + {expired, AckTag} -> + BQ:ack([AckTag], undefined, BQS1); + {rejected, AckTag} -> + BQ:ack([AckTag], undefined, BQS1); + {queue_deleted, _} -> + BQS; + {queue_purged, AckTag} -> + BQ:ack([AckTag], undefined, BQS1) + end, + {BQS2, gb_trees:delete(MsgSeqNo, UC1)} + end, {BQS, UC}, MsgSeqNos), + Purge1 = case {gb_trees:is_empty(UC3), Purge} of + {true, {From, Count}} -> + gen_server2:reply(From, {ok, Count}), + undefined; + _ -> + Purge + end, + noreply(State#q{unconfirmed = UC3, + blocked_purge = Purge1, + backing_queue_state = BQS3}); handle_cast({dead_letter, {Msg, Extra}, Reason}, State) -> noreply(dead_letter_msg(Msg, Extra, Reason, State)). |
