diff options
| author | Alexandru Scvortov <alexandru@rabbitmq.com> | 2011-11-14 15:19:05 +0000 |
|---|---|---|
| committer | Alexandru Scvortov <alexandru@rabbitmq.com> | 2011-11-14 15:19:05 +0000 |
| commit | aa0c973fbb6a456bc1b22031be6b9cf8d95a8f1f (patch) | |
| tree | a756095890209f7f35335b63321a45ff223e8122 /src | |
| parent | e56430b201a41ccd11a8852d1491c6496b9bfcd2 (diff) | |
| download | rabbitmq-server-git-aa0c973fbb6a456bc1b22031be6b9cf8d95a8f1f.tar.gz | |
don't remove expired messages until the DLQ has confirmed them
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 14 |
2 files changed, 11 insertions, 7 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index f7756232cf..d28a39689c 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -1189,8 +1189,8 @@ handle_cast({confirm, MsgSeqNos, _From}, State) -> backing_queue_state = BQS}) -> Reason = gb_trees:get(MsgSeqNo, Unconfirmed), case Reason of - {expired, _} -> - ok; + {expired, AckTag} -> + BQ:ack([AckTag], undefined, BQS); {rejected, AckTag} -> BQ:ack([AckTag], undefined, BQS); {queue_deleted, _} -> diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 811017d969..92c650272f 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -586,12 +586,16 @@ dropwhile(Pred, MsgFun, State) -> {empty, State1} -> a(State1); {{value, MsgStatus = #msg_status { msg_props = MsgProps }}, State1} -> - case Pred(MsgProps) of - true -> - State2 = MsgFun(read_msg_callback(MsgStatus), undefined, State1), - {_, State3} = internal_fetch(false, MsgStatus, State2), + case {Pred(MsgProps), MsgFun} of + {true, undefined} -> + {_, State2} = internal_fetch(false, MsgStatus, State1), + dropwhile(Pred, MsgFun, State2); + {true, _} -> + {{_, _, AckTag, _}, State2} = + internal_fetch(true, MsgStatus, State1), + State3 = MsgFun(read_msg_callback(MsgStatus), AckTag, State2), dropwhile(Pred, MsgFun, State3); - false -> + {false, _} -> a(in_r(MsgStatus, State1)) end end. |
