summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2012-11-16 02:23:45 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2012-11-16 02:23:45 +0000
commita1e21bf18db6563f3e45178cfa4da34bf1446615 (patch)
treead0d7a5478ef1524f2d352b6db2d938ec20873fd /src
parent3b432ffe15bf40647e2c63352614703cc1e34fef (diff)
downloadrabbitmq-server-git-a1e21bf18db6563f3e45178cfa4da34bf1446615.tar.gz
refactor: simplify dlx logic top half
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue_process.erl50
1 files changed, 20 insertions, 30 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 8d7535f8fc..b679cb0870 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -705,17 +705,18 @@ calculate_msg_expiry(#basic_message{content = Content}, TTL) ->
T -> now_micros() + T * 1000
end.
-drop_expired_messages(State = #q{backing_queue_state = BQS,
+drop_expired_messages(State = #q{dlx = DLX,
+ backing_queue_state = BQS,
backing_queue = BQ }) ->
Now = now_micros(),
- DLXFun = dead_letter_fun(expired, State),
ExpirePred = fun (#message_properties{expiry = Exp}) -> Now >= Exp end,
- {Props, BQS1} = case DLXFun of
+ {Props, BQS1} = case DLX of
undefined -> {Next, undefined, BQS2} =
BQ:dropwhile(ExpirePred, false, BQS),
{Next, BQS2};
_ -> {Next, Msgs, BQS2} =
BQ:dropwhile(ExpirePred, true, BQS),
+ DLXFun = dead_letter_fun(expired),
DLXFun(Msgs),
{Next, BQS2}
end,
@@ -743,17 +744,7 @@ ensure_ttl_timer(Expiry, State = #q{ttl_timer_ref = TRef,
ensure_ttl_timer(_Expiry, State) ->
State.
-ack_if_no_dlx(AckTags, State = #q{dlx = undefined,
- backing_queue = BQ,
- backing_queue_state = BQS }) ->
- {_Guids, BQS1} = BQ:ack(AckTags, BQS),
- State#q{backing_queue_state = BQS1};
-ack_if_no_dlx(_AckTags, State) ->
- State.
-
-dead_letter_fun(_Reason, #q{dlx = undefined}) ->
- undefined;
-dead_letter_fun(Reason, _State) ->
+dead_letter_fun(Reason) ->
fun(Msgs) -> gen_server2:cast(self(), {dead_letter, Msgs, Reason}) end.
dead_letter_publish(Msg, Reason, X, State = #q{publish_seqno = MsgSeqNo}) ->
@@ -1217,24 +1208,23 @@ handle_cast({ack, AckTags, ChPid}, State) ->
State1#q{backing_queue_state = BQS1}
end));
-handle_cast({reject, AckTags, Requeue, ChPid}, State) ->
+handle_cast({reject, AckTags, true, ChPid}, State) ->
noreply(subtract_acks(
ChPid, AckTags, State,
- case Requeue of
- true -> fun (State1) -> requeue_and_run(AckTags, State1) end;
- false -> fun (State1 = #q{backing_queue = BQ,
- backing_queue_state = BQS}) ->
- Fun =
- case dead_letter_fun(rejected, State1) of
- undefined -> undefined;
- F -> fun(M, A) -> F([{M, A}])
- end
- end,
- BQS1 = BQ:fold(Fun, BQS, AckTags),
- ack_if_no_dlx(
- AckTags,
- State1#q{backing_queue_state = BQS1})
- end
+ fun (State1) -> requeue_and_run(AckTags, State1) end));
+
+handle_cast({reject, AckTags, false, ChPid}, State = #q{dlx = undefined}) ->
+ handle_cast({ack, AckTags, ChPid}, State);
+
+handle_cast({reject, AckTags, false, ChPid}, State) ->
+ DLXFun = dead_letter_fun(rejected),
+ noreply(subtract_acks(
+ ChPid, AckTags, State,
+ fun (State1 = #q{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ BQS1 = BQ:fold(fun(M, A) -> DLXFun([{M, A}]) end,
+ BQS, AckTags),
+ State1#q{backing_queue_state = BQS1}
end));
handle_cast(delete_immediately, State) ->