summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAlexandru Scvortov <alexandru@rabbitmq.com>2011-11-14 15:19:05 +0000
committerAlexandru Scvortov <alexandru@rabbitmq.com>2011-11-14 15:19:05 +0000
commitaa0c973fbb6a456bc1b22031be6b9cf8d95a8f1f (patch)
treea756095890209f7f35335b63321a45ff223e8122 /src
parente56430b201a41ccd11a8852d1491c6496b9bfcd2 (diff)
downloadrabbitmq-server-git-aa0c973fbb6a456bc1b22031be6b9cf8d95a8f1f.tar.gz
don't remove expired messages until the DLQ has confirmed them
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue_process.erl4
-rw-r--r--src/rabbit_variable_queue.erl14
2 files changed, 11 insertions, 7 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index f7756232cf..d28a39689c 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -1189,8 +1189,8 @@ handle_cast({confirm, MsgSeqNos, _From}, State) ->
backing_queue_state = BQS}) ->
Reason = gb_trees:get(MsgSeqNo, Unconfirmed),
case Reason of
- {expired, _} ->
- ok;
+ {expired, AckTag} ->
+ BQ:ack([AckTag], undefined, BQS);
{rejected, AckTag} ->
BQ:ack([AckTag], undefined, BQS);
{queue_deleted, _} ->
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 811017d969..92c650272f 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -586,12 +586,16 @@ dropwhile(Pred, MsgFun, State) ->
{empty, State1} ->
a(State1);
{{value, MsgStatus = #msg_status { msg_props = MsgProps }}, State1} ->
- case Pred(MsgProps) of
- true ->
- State2 = MsgFun(read_msg_callback(MsgStatus), undefined, State1),
- {_, State3} = internal_fetch(false, MsgStatus, State2),
+ case {Pred(MsgProps), MsgFun} of
+ {true, undefined} ->
+ {_, State2} = internal_fetch(false, MsgStatus, State1),
+ dropwhile(Pred, MsgFun, State2);
+ {true, _} ->
+ {{_, _, AckTag, _}, State2} =
+ internal_fetch(true, MsgStatus, State1),
+ State3 = MsgFun(read_msg_callback(MsgStatus), AckTag, State2),
dropwhile(Pred, MsgFun, State3);
- false ->
+ {false, _} ->
a(in_r(MsgStatus, State1))
end
end.