diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 50 |
1 files changed, 20 insertions, 30 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 8d7535f8fc..b679cb0870 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -705,17 +705,18 @@ calculate_msg_expiry(#basic_message{content = Content}, TTL) -> T -> now_micros() + T * 1000 end. -drop_expired_messages(State = #q{backing_queue_state = BQS, +drop_expired_messages(State = #q{dlx = DLX, + backing_queue_state = BQS, backing_queue = BQ }) -> Now = now_micros(), - DLXFun = dead_letter_fun(expired, State), ExpirePred = fun (#message_properties{expiry = Exp}) -> Now >= Exp end, - {Props, BQS1} = case DLXFun of + {Props, BQS1} = case DLX of undefined -> {Next, undefined, BQS2} = BQ:dropwhile(ExpirePred, false, BQS), {Next, BQS2}; _ -> {Next, Msgs, BQS2} = BQ:dropwhile(ExpirePred, true, BQS), + DLXFun = dead_letter_fun(expired), DLXFun(Msgs), {Next, BQS2} end, @@ -743,17 +744,7 @@ ensure_ttl_timer(Expiry, State = #q{ttl_timer_ref = TRef, ensure_ttl_timer(_Expiry, State) -> State. -ack_if_no_dlx(AckTags, State = #q{dlx = undefined, - backing_queue = BQ, - backing_queue_state = BQS }) -> - {_Guids, BQS1} = BQ:ack(AckTags, BQS), - State#q{backing_queue_state = BQS1}; -ack_if_no_dlx(_AckTags, State) -> - State. - -dead_letter_fun(_Reason, #q{dlx = undefined}) -> - undefined; -dead_letter_fun(Reason, _State) -> +dead_letter_fun(Reason) -> fun(Msgs) -> gen_server2:cast(self(), {dead_letter, Msgs, Reason}) end. dead_letter_publish(Msg, Reason, X, State = #q{publish_seqno = MsgSeqNo}) -> @@ -1217,24 +1208,23 @@ handle_cast({ack, AckTags, ChPid}, State) -> State1#q{backing_queue_state = BQS1} end)); -handle_cast({reject, AckTags, Requeue, ChPid}, State) -> +handle_cast({reject, AckTags, true, ChPid}, State) -> noreply(subtract_acks( ChPid, AckTags, State, - case Requeue of - true -> fun (State1) -> requeue_and_run(AckTags, State1) end; - false -> fun (State1 = #q{backing_queue = BQ, - backing_queue_state = BQS}) -> - Fun = - case dead_letter_fun(rejected, State1) of - undefined -> undefined; - F -> fun(M, A) -> F([{M, A}]) - end - end, - BQS1 = BQ:fold(Fun, BQS, AckTags), - ack_if_no_dlx( - AckTags, - State1#q{backing_queue_state = BQS1}) - end + fun (State1) -> requeue_and_run(AckTags, State1) end)); + +handle_cast({reject, AckTags, false, ChPid}, State = #q{dlx = undefined}) -> + handle_cast({ack, AckTags, ChPid}, State); + +handle_cast({reject, AckTags, false, ChPid}, State) -> + DLXFun = dead_letter_fun(rejected), + noreply(subtract_acks( + ChPid, AckTags, State, + fun (State1 = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> + BQS1 = BQ:fold(fun(M, A) -> DLXFun([{M, A}]) end, + BQS, AckTags), + State1#q{backing_queue_state = BQS1} end)); handle_cast(delete_immediately, State) -> |
