diff options
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 14 |
2 files changed, 11 insertions, 7 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index f7756232cf..d28a39689c 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -1189,8 +1189,8 @@ handle_cast({confirm, MsgSeqNos, _From}, State) -> backing_queue_state = BQS}) -> Reason = gb_trees:get(MsgSeqNo, Unconfirmed), case Reason of - {expired, _} -> - ok; + {expired, AckTag} -> + BQ:ack([AckTag], undefined, BQS); {rejected, AckTag} -> BQ:ack([AckTag], undefined, BQS); {queue_deleted, _} -> diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 811017d969..92c650272f 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -586,12 +586,16 @@ dropwhile(Pred, MsgFun, State) -> {empty, State1} -> a(State1); {{value, MsgStatus = #msg_status { msg_props = MsgProps }}, State1} -> - case Pred(MsgProps) of - true -> - State2 = MsgFun(read_msg_callback(MsgStatus), undefined, State1), - {_, State3} = internal_fetch(false, MsgStatus, State2), + case {Pred(MsgProps), MsgFun} of + {true, undefined} -> + {_, State2} = internal_fetch(false, MsgStatus, State1), + dropwhile(Pred, MsgFun, State2); + {true, _} -> + {{_, _, AckTag, _}, State2} = + internal_fetch(true, MsgStatus, State1), + State3 = MsgFun(read_msg_callback(MsgStatus), AckTag, State2), dropwhile(Pred, MsgFun, State3); - false -> + {false, _} -> a(in_r(MsgStatus, State1)) end end. |
