summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAlexandru Scvortov <alexandru@rabbitmq.com>2011-11-14 16:22:26 +0000
committerAlexandru Scvortov <alexandru@rabbitmq.com>2011-11-14 16:22:26 +0000
commit11c0813c08a8c6281a7b55ceecdbaa7d13e2fba8 (patch)
tree1bf243e81a7d3df3e6e7dae74bfb8285dc5657fb /src
parentaa0c973fbb6a456bc1b22031be6b9cf8d95a8f1f (diff)
downloadrabbitmq-server-git-11c0813c08a8c6281a7b55ceecdbaa7d13e2fba8.tar.gz
don't remove purged messages until the DLQ has confirmed them
Also, block the queue.purge_ok until the messages have reached the DLQ safely.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue_process.erl77
1 files changed, 50 insertions, 27 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index d28a39689c..618555e883 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -52,6 +52,7 @@
ttl_timer_ref,
publish_seqno,
unconfirmed,
+ blocked_purge,
dlx
}).
@@ -135,6 +136,7 @@ init(Q) ->
dlx = undefined,
publish_seqno = 1,
unconfirmed = gb_trees:empty(),
+ blocked_purge = undefined,
msg_id_to_channel = gb_trees:empty()},
{ok, rabbit_event:init_stats_timer(State, #q.stats_timer), hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
@@ -159,6 +161,7 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS,
ttl = undefined,
publish_seqno = 1,
unconfirmed = gb_trees:empty(),
+ blocked_purge = undefined,
msg_id_to_channel = MTC},
State1 = requeue_and_run(AckTags, process_args(
rabbit_event:init_stats_timer(
@@ -737,7 +740,7 @@ maybe_dead_letter_queue(Reason, State = #q{
end.
dead_letter_msg(Msg, Extra, Reason, State = #q{publish_seqno = MsgSeqNo,
- unconfirmed = Unconfirmed,
+ unconfirmed = UC,
dlx = DLX}) ->
rabbit_exchange:lookup_or_die(DLX),
@@ -746,8 +749,7 @@ dead_letter_msg(Msg, Extra, Reason, State = #q{publish_seqno = MsgSeqNo,
false, false, make_dead_letter_msg(DLX, Reason, Msg, State),
MsgSeqNo)),
State#q{publish_seqno = MsgSeqNo + 1,
- unconfirmed = gb_trees:insert(MsgSeqNo, {Reason, Extra},
- Unconfirmed)}.
+ unconfirmed = gb_trees:insert(MsgSeqNo, {Reason, Extra}, UC)}.
make_dead_letter_msg(DLX, Reason, Msg = #basic_message{content = Content},
State) ->
@@ -1084,11 +1086,20 @@ handle_call({delete, IfUnused, IfEmpty}, _From,
{stop, normal, {ok, BQ:len(BQS)}, State}
end;
-handle_call(purge, _From, State = #q{backing_queue = BQ}) ->
- State1 = #q{backing_queue_state = BQS} =
- maybe_dead_letter_queue(queue_purged, State),
+handle_call(purge, _From, State = #q{backing_queue = BQ,
+ backing_queue_state = BQS,
+ dlx = undefined}) ->
{Count, BQS1} = BQ:purge(BQS),
- reply({ok, Count}, State1#q{backing_queue_state = BQS1});
+ reply({ok, Count}, State#q{backing_queue_state = BQS1});
+
+handle_call(purge, From, State = #q{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ BQS1 = BQ:dropwhile(
+ fun (_) -> true end,
+ mk_dead_letter_fun(queue_purged, State),
+ BQS),
+ noreply(State#q{backing_queue_state = BQS1,
+ blocked_purge = {From, BQ:len(BQS)}});
handle_call({requeue, AckTags, ChPid}, From, State) ->
gen_server2:reply(From, ok),
@@ -1181,26 +1192,38 @@ handle_cast(force_event_refresh, State = #q{exclusive_consumer = Exclusive}) ->
end,
noreply(State);
-handle_cast({confirm, MsgSeqNos, _From}, State) ->
- noreply(lists:foldl(
- fun (MsgSeqNo,
- State1 = #q{unconfirmed = Unconfirmed,
- backing_queue = BQ,
- backing_queue_state = BQS}) ->
- Reason = gb_trees:get(MsgSeqNo, Unconfirmed),
- case Reason of
- {expired, AckTag} ->
- BQ:ack([AckTag], undefined, BQS);
- {rejected, AckTag} ->
- BQ:ack([AckTag], undefined, BQS);
- {queue_deleted, _} ->
- ok;
- {queue_purged, _} ->
- ok
- end,
- State1#q{unconfirmed = gb_trees:delete(MsgSeqNo,
- Unconfirmed)}
- end, State, MsgSeqNos));
+handle_cast({confirm, MsgSeqNos, _From},
+ State = #q{unconfirmed = UC,
+ backing_queue = BQ,
+ backing_queue_state = BQS,
+ blocked_purge = Purge}) ->
+ {BQS3, UC3} = lists:foldl(
+ fun (MsgSeqNo, {BQS1, UC1}) ->
+ Reason = gb_trees:get(MsgSeqNo, UC1),
+ %% FIXME Forward confirms to channels
+ {_, BQS2} =
+ case Reason of
+ {expired, AckTag} ->
+ BQ:ack([AckTag], undefined, BQS1);
+ {rejected, AckTag} ->
+ BQ:ack([AckTag], undefined, BQS1);
+ {queue_deleted, _} ->
+ BQS;
+ {queue_purged, AckTag} ->
+ BQ:ack([AckTag], undefined, BQS1)
+ end,
+ {BQS2, gb_trees:delete(MsgSeqNo, UC1)}
+ end, {BQS, UC}, MsgSeqNos),
+ Purge1 = case {gb_trees:is_empty(UC3), Purge} of
+ {true, {From, Count}} ->
+ gen_server2:reply(From, {ok, Count}),
+ undefined;
+ _ ->
+ Purge
+ end,
+ noreply(State#q{unconfirmed = UC3,
+ blocked_purge = Purge1,
+ backing_queue_state = BQS3});
handle_cast({dead_letter, {Msg, Extra}, Reason}, State) ->
noreply(dead_letter_msg(Msg, Extra, Reason, State)).