summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-06-17 14:54:46 +0100
committerMatthew Sackman <matthew@lshift.net>2009-06-17 14:54:46 +0100
commitdbfc0120b8738ecb283a104a57a12dbb4fb64866 (patch)
tree2908848c463227402ee2d582ccbf33f2bde170f8
parentd0343eeae8640130339cc65fb647b13e4204241b (diff)
downloadrabbitmq-server-git-dbfc0120b8738ecb283a104a57a12dbb4fb64866.tar.gz
added batching for autoacks for general run_message_queue
-rw-r--r--src/rabbit_amqqueue_process.erl31
1 files changed, 15 insertions, 16 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 0ab44a532d..593746a710 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -237,30 +237,29 @@ deliver_queue(Funs = {PredFun, DeliverFun}, FunAcc0,
{FunAcc0, State}
end.
-deliver_from_queue_pred(IsEmpty, _State) ->
+deliver_from_queue_pred({IsEmpty, _AutoAcks}, _State) ->
not IsEmpty.
-deliver_from_queue_deliver(AckRequired, false,
+deliver_from_queue_deliver(AckRequired, {false, AutoAcks},
State = #q { mixed_state = MS }) ->
- {Res, MS2} = rabbit_mixed_queue:deliver(MS),
- {Res2, MS3, IsEmpty} =
- case Res of
- empty -> {empty, MS2, true};
- {Msg, IsDelivered, AckTag, Remaining} ->
- {ok, MS4} = case AckRequired of
- true -> {ok, MS2};
- false -> rabbit_mixed_queue:ack([AckTag], MS2)
- end,
- {{Msg, IsDelivered, AckTag}, MS4, 0 == Remaining}
+ {{Msg, IsDelivered, AckTag, Remaining}, MS2} =
+ rabbit_mixed_queue:deliver(MS),
+ AutoAcks2 =
+ case AckRequired of
+ true -> AutoAcks;
+ false -> [AckTag | AutoAcks]
end,
- {Res2, IsEmpty, State #q { mixed_state = MS3 }}.
+ {{Msg, IsDelivered, AckTag}, {0 == Remaining, AutoAcks2},
+ State #q { mixed_state = MS2 }}.
run_message_queue(State = #q { mixed_state = MS }) ->
Funs = { fun deliver_from_queue_pred/2,
fun deliver_from_queue_deliver/3 },
IsEmpty = rabbit_mixed_queue:is_empty(MS),
- {_IsEmpty2, State2} =
- deliver_queue(Funs, IsEmpty, State),
- State2.
+ {{_IsEmpty2, AutoAcks}, State2} =
+ deliver_queue(Funs, {IsEmpty, []}, State),
+ {ok, MS2} =
+ rabbit_mixed_queue:ack(lists:reverse(AutoAcks), State2 #q.mixed_state),
+ State2 #q { mixed_state = MS2 }.
attempt_immediate_delivery(none, _ChPid, Msg, State) ->
PredFun = fun (IsEmpty, _State) -> not IsEmpty end,