diff options
| author | Matthias Radestock <matthias@rabbitmq.com> | 2013-01-01 22:48:49 +0000 |
|---|---|---|
| committer | Matthias Radestock <matthias@rabbitmq.com> | 2013-01-01 22:48:49 +0000 |
| commit | ada42944f5931ca278ff48009181baf6c1c68398 (patch) | |
| tree | d6f8790901cbf6bf0a71df4e40fe6c65d0d3fed9 | |
| parent | a94b5a7efa82d2bab165cdc416be905616e3f4f2 (diff) | |
| download | rabbitmq-server-git-ada42944f5931ca278ff48009181baf6c1c68398.tar.gz | |
refactor: extract dead lettering commonality
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 80 |
1 files changed, 35 insertions, 45 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 8e20f4e1a6..66e48024fd 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -733,52 +733,42 @@ drop_expired_msgs(State = #q{dlx = DLX, #message_properties{expiry = Exp} -> Exp end, State1). -dead_letter_expired_msgs(ExpirePred, X, State = #q{dlx_routing_key = RK, - publish_seqno = SeqNo0, - unconfirmed = UC0, - queue_monitors = QMons0, - backing_queue_state = BQS, - backing_queue = BQ}) -> +dead_letter_expired_msgs(ExpirePred, X, State = #q{backing_queue = BQ}) -> + dead_letter_msgs(fun (DLFun, Acc, BQS1) -> + BQ:fetchwhile(ExpirePred, DLFun, Acc, BQS1) + end, expired, X, State). + +dead_letter_rejected_msgs(AckTags, X, State = #q{backing_queue = BQ}) -> + {ok, State1} = + dead_letter_msgs( + fun (DLFun, Acc, BQS) -> + {Acc1, BQS1} = BQ:ackfold(DLFun, Acc, BQS, AckTags), + {ok, Acc1, BQS1} + end, rejected, X, State), + State1. + +dead_letter_msgs(Fun, Reason, X, State = #q{dlx_routing_key = RK, + publish_seqno = SeqNo0, + unconfirmed = UC0, + queue_monitors = QMons0, + backing_queue_state = BQS, + backing_queue = BQ}) -> QName = qname(State), - {Next, {ConfirmImm1, SeqNo1, UC1, QMons1}, BQS1} = - BQ:fetchwhile( - ExpirePred, - fun (Msg, AckTag, {ConfirmImm, SeqNo, UC, QMons}) -> - case dead_letter_publish(Msg, expired, X, RK, SeqNo, QName) of - [] -> {[AckTag | ConfirmImm], SeqNo, UC, QMons}; - QPids -> {ConfirmImm, SeqNo + 1, - dtree:insert(SeqNo, QPids, AckTag, UC), - pmon:monitor_all(QPids, QMons)} - end - end, {[], SeqNo0, UC0, QMons0}, BQS), - {_Guids, BQS2} = BQ:ack(ConfirmImm1, BQS1), - {Next, State#q{publish_seqno = SeqNo1, - unconfirmed = UC1, - queue_monitors = QMons1, - backing_queue_state = BQS2}}. - -dead_letter_rejected_msgs(AckTags, X, State = #q{dlx_routing_key = RK, - publish_seqno = SeqNo0, - unconfirmed = UC0, - queue_monitors = QMons0, - backing_queue_state = BQS, - backing_queue = BQ}) -> - QName = qname(State), - {{ConfirmImm1, SeqNo1, UC1, QMons1}, BQS1} = - BQ:ackfold( - fun (Msg, AckTag, {ConfirmImm, SeqNo, UC, QMons}) -> - case dead_letter_publish(Msg, rejected, X, RK, SeqNo, QName) of - [] -> {[AckTag | ConfirmImm], SeqNo, UC, QMons}; - QPids -> {ConfirmImm, SeqNo + 1, - dtree:insert(SeqNo, QPids, AckTag, UC), - pmon:monitor_all(QPids, QMons)} - end - end, {[], SeqNo0, UC0, QMons0}, BQS, AckTags), - {_Guids, BQS2} = BQ:ack(ConfirmImm1, BQS1), - State#q{publish_seqno = SeqNo1, - unconfirmed = UC1, - queue_monitors = QMons1, - backing_queue_state = BQS2}. + {Res, {AckImm1, SeqNo1, UC1, QMons1}, BQS1} = + Fun(fun (Msg, AckTag, {AckImm, SeqNo, UC, QMons}) -> + case dead_letter_publish(Msg, Reason, + X, RK, SeqNo, QName) of + [] -> {[AckTag | AckImm], SeqNo, UC, QMons}; + QPids -> {AckImm, SeqNo + 1, + dtree:insert(SeqNo, QPids, AckTag, UC), + pmon:monitor_all(QPids, QMons)} + end + end, {[], SeqNo0, UC0, QMons0}, BQS), + {_Guids, BQS2} = BQ:ack(AckImm1, BQS1), + {Res, State#q{publish_seqno = SeqNo1, + unconfirmed = UC1, + queue_monitors = QMons1, + backing_queue_state = BQS2}}. ensure_ttl_timer(undefined, State) -> State; |
