diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-06-17 14:54:46 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-06-17 14:54:46 +0100 |
| commit | dbfc0120b8738ecb283a104a57a12dbb4fb64866 (patch) | |
| tree | 2908848c463227402ee2d582ccbf33f2bde170f8 | |
| parent | d0343eeae8640130339cc65fb647b13e4204241b (diff) | |
| download | rabbitmq-server-git-dbfc0120b8738ecb283a104a57a12dbb4fb64866.tar.gz | |
added batching for autoacks for general run_message_queue
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 31 |
1 files changed, 15 insertions, 16 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 0ab44a532d..593746a710 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -237,30 +237,29 @@ deliver_queue(Funs = {PredFun, DeliverFun}, FunAcc0, {FunAcc0, State} end. -deliver_from_queue_pred(IsEmpty, _State) -> +deliver_from_queue_pred({IsEmpty, _AutoAcks}, _State) -> not IsEmpty. -deliver_from_queue_deliver(AckRequired, false, +deliver_from_queue_deliver(AckRequired, {false, AutoAcks}, State = #q { mixed_state = MS }) -> - {Res, MS2} = rabbit_mixed_queue:deliver(MS), - {Res2, MS3, IsEmpty} = - case Res of - empty -> {empty, MS2, true}; - {Msg, IsDelivered, AckTag, Remaining} -> - {ok, MS4} = case AckRequired of - true -> {ok, MS2}; - false -> rabbit_mixed_queue:ack([AckTag], MS2) - end, - {{Msg, IsDelivered, AckTag}, MS4, 0 == Remaining} + {{Msg, IsDelivered, AckTag, Remaining}, MS2} = + rabbit_mixed_queue:deliver(MS), + AutoAcks2 = + case AckRequired of + true -> AutoAcks; + false -> [AckTag | AutoAcks] end, - {Res2, IsEmpty, State #q { mixed_state = MS3 }}. + {{Msg, IsDelivered, AckTag}, {0 == Remaining, AutoAcks2}, + State #q { mixed_state = MS2 }}. run_message_queue(State = #q { mixed_state = MS }) -> Funs = { fun deliver_from_queue_pred/2, fun deliver_from_queue_deliver/3 }, IsEmpty = rabbit_mixed_queue:is_empty(MS), - {_IsEmpty2, State2} = - deliver_queue(Funs, IsEmpty, State), - State2. + {{_IsEmpty2, AutoAcks}, State2} = + deliver_queue(Funs, {IsEmpty, []}, State), + {ok, MS2} = + rabbit_mixed_queue:ack(lists:reverse(AutoAcks), State2 #q.mixed_state), + State2 #q { mixed_state = MS2 }. attempt_immediate_delivery(none, _ChPid, Msg, State) -> PredFun = fun (IsEmpty, _State) -> not IsEmpty end, |
