summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2013-01-01 22:48:49 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2013-01-01 22:48:49 +0000
commitada42944f5931ca278ff48009181baf6c1c68398 (patch)
treed6f8790901cbf6bf0a71df4e40fe6c65d0d3fed9
parenta94b5a7efa82d2bab165cdc416be905616e3f4f2 (diff)
downloadrabbitmq-server-git-ada42944f5931ca278ff48009181baf6c1c68398.tar.gz
refactor: extract dead lettering commonality
-rw-r--r--src/rabbit_amqqueue_process.erl80
1 files changed, 35 insertions, 45 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 8e20f4e1a6..66e48024fd 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -733,52 +733,42 @@ drop_expired_msgs(State = #q{dlx = DLX,
#message_properties{expiry = Exp} -> Exp
end, State1).
-dead_letter_expired_msgs(ExpirePred, X, State = #q{dlx_routing_key = RK,
- publish_seqno = SeqNo0,
- unconfirmed = UC0,
- queue_monitors = QMons0,
- backing_queue_state = BQS,
- backing_queue = BQ}) ->
+dead_letter_expired_msgs(ExpirePred, X, State = #q{backing_queue = BQ}) ->
+ dead_letter_msgs(fun (DLFun, Acc, BQS1) ->
+ BQ:fetchwhile(ExpirePred, DLFun, Acc, BQS1)
+ end, expired, X, State).
+
+dead_letter_rejected_msgs(AckTags, X, State = #q{backing_queue = BQ}) ->
+ {ok, State1} =
+ dead_letter_msgs(
+ fun (DLFun, Acc, BQS) ->
+ {Acc1, BQS1} = BQ:ackfold(DLFun, Acc, BQS, AckTags),
+ {ok, Acc1, BQS1}
+ end, rejected, X, State),
+ State1.
+
+dead_letter_msgs(Fun, Reason, X, State = #q{dlx_routing_key = RK,
+ publish_seqno = SeqNo0,
+ unconfirmed = UC0,
+ queue_monitors = QMons0,
+ backing_queue_state = BQS,
+ backing_queue = BQ}) ->
QName = qname(State),
- {Next, {ConfirmImm1, SeqNo1, UC1, QMons1}, BQS1} =
- BQ:fetchwhile(
- ExpirePred,
- fun (Msg, AckTag, {ConfirmImm, SeqNo, UC, QMons}) ->
- case dead_letter_publish(Msg, expired, X, RK, SeqNo, QName) of
- [] -> {[AckTag | ConfirmImm], SeqNo, UC, QMons};
- QPids -> {ConfirmImm, SeqNo + 1,
- dtree:insert(SeqNo, QPids, AckTag, UC),
- pmon:monitor_all(QPids, QMons)}
- end
- end, {[], SeqNo0, UC0, QMons0}, BQS),
- {_Guids, BQS2} = BQ:ack(ConfirmImm1, BQS1),
- {Next, State#q{publish_seqno = SeqNo1,
- unconfirmed = UC1,
- queue_monitors = QMons1,
- backing_queue_state = BQS2}}.
-
-dead_letter_rejected_msgs(AckTags, X, State = #q{dlx_routing_key = RK,
- publish_seqno = SeqNo0,
- unconfirmed = UC0,
- queue_monitors = QMons0,
- backing_queue_state = BQS,
- backing_queue = BQ}) ->
- QName = qname(State),
- {{ConfirmImm1, SeqNo1, UC1, QMons1}, BQS1} =
- BQ:ackfold(
- fun (Msg, AckTag, {ConfirmImm, SeqNo, UC, QMons}) ->
- case dead_letter_publish(Msg, rejected, X, RK, SeqNo, QName) of
- [] -> {[AckTag | ConfirmImm], SeqNo, UC, QMons};
- QPids -> {ConfirmImm, SeqNo + 1,
- dtree:insert(SeqNo, QPids, AckTag, UC),
- pmon:monitor_all(QPids, QMons)}
- end
- end, {[], SeqNo0, UC0, QMons0}, BQS, AckTags),
- {_Guids, BQS2} = BQ:ack(ConfirmImm1, BQS1),
- State#q{publish_seqno = SeqNo1,
- unconfirmed = UC1,
- queue_monitors = QMons1,
- backing_queue_state = BQS2}.
+ {Res, {AckImm1, SeqNo1, UC1, QMons1}, BQS1} =
+ Fun(fun (Msg, AckTag, {AckImm, SeqNo, UC, QMons}) ->
+ case dead_letter_publish(Msg, Reason,
+ X, RK, SeqNo, QName) of
+ [] -> {[AckTag | AckImm], SeqNo, UC, QMons};
+ QPids -> {AckImm, SeqNo + 1,
+ dtree:insert(SeqNo, QPids, AckTag, UC),
+ pmon:monitor_all(QPids, QMons)}
+ end
+ end, {[], SeqNo0, UC0, QMons0}, BQS),
+ {_Guids, BQS2} = BQ:ack(AckImm1, BQS1),
+ {Res, State#q{publish_seqno = SeqNo1,
+ unconfirmed = UC1,
+ queue_monitors = QMons1,
+ backing_queue_state = BQS2}}.
ensure_ttl_timer(undefined, State) ->
State;