summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_amqqueue_process.erl4
-rw-r--r--src/rabbit_variable_queue.erl14
2 files changed, 11 insertions, 7 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index f7756232cf..d28a39689c 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -1189,8 +1189,8 @@ handle_cast({confirm, MsgSeqNos, _From}, State) ->
backing_queue_state = BQS}) ->
Reason = gb_trees:get(MsgSeqNo, Unconfirmed),
case Reason of
- {expired, _} ->
- ok;
+ {expired, AckTag} ->
+ BQ:ack([AckTag], undefined, BQS);
{rejected, AckTag} ->
BQ:ack([AckTag], undefined, BQS);
{queue_deleted, _} ->
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 811017d969..92c650272f 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -586,12 +586,16 @@ dropwhile(Pred, MsgFun, State) ->
{empty, State1} ->
a(State1);
{{value, MsgStatus = #msg_status { msg_props = MsgProps }}, State1} ->
- case Pred(MsgProps) of
- true ->
- State2 = MsgFun(read_msg_callback(MsgStatus), undefined, State1),
- {_, State3} = internal_fetch(false, MsgStatus, State2),
+ case {Pred(MsgProps), MsgFun} of
+ {true, undefined} ->
+ {_, State2} = internal_fetch(false, MsgStatus, State1),
+ dropwhile(Pred, MsgFun, State2);
+ {true, _} ->
+ {{_, _, AckTag, _}, State2} =
+ internal_fetch(true, MsgStatus, State1),
+ State3 = MsgFun(read_msg_callback(MsgStatus), AckTag, State2),
dropwhile(Pred, MsgFun, State3);
- false ->
+ {false, _} ->
a(in_r(MsgStatus, State1))
end
end.