diff options
| author | Michael Klishin <michael@novemberain.com> | 2015-09-17 07:46:48 -0700 |
|---|---|---|
| committer | Michael Klishin <michael@novemberain.com> | 2015-09-17 07:46:48 -0700 |
| commit | 47f3e12953892d58426c38a43aa99fdbd59415de (patch) | |
| tree | 04df5479b901a789680a9152214109326361eeff /src | |
| parent | 0c9bf69dc9d96cfe4b100c289e7ef259caa74253 (diff) | |
| parent | 522d7649503ecef10cd215ef8d02c908f9619645 (diff) | |
| download | rabbitmq-server-git-47f3e12953892d58426c38a43aa99fdbd59415de.tar.gz | |
Merge pull request #309 from rabbitmq/rabbitmq-server-308
Improves dropwhile performance
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_variable_queue.erl | 135 |
1 files changed, 91 insertions, 44 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index db95b8c844..995282b344 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -616,16 +616,9 @@ drain_confirmed(State = #vqstate { confirmed = C }) -> end. dropwhile(Pred, State) -> - case queue_out(State) of - {empty, State1} -> - {undefined, a(State1)}; - {{value, MsgStatus = #msg_status { msg_props = MsgProps }}, State1} -> - case Pred(MsgProps) of - true -> {_, State2} = remove(false, MsgStatus, State1), - dropwhile(Pred, State2); - false -> {MsgProps, a(in_r(MsgStatus, State1))} - end - end. + {MsgProps, State1} = + remove_by_predicate(Pred, State), + {MsgProps, a(State1)}. fetchwhile(Pred, Fun, Acc, State) -> case queue_out(State) of @@ -1271,48 +1264,102 @@ msg_size(#msg_status{msg_props = #message_properties{size = Size}}) -> Size. msg_in_ram(#msg_status{msg = Msg}) -> Msg =/= undefined. -remove(AckRequired, MsgStatus = #msg_status { - seq_id = SeqId, - msg_id = MsgId, - is_persistent = IsPersistent, - is_delivered = IsDelivered, - msg_in_store = MsgInStore, - index_on_disk = IndexOnDisk }, +%% first param: AckRequired +remove(true, MsgStatus = #msg_status { + seq_id = SeqId, + is_delivered = IsDelivered, + index_on_disk = IndexOnDisk }, + State = #vqstate {out_counter = OutCount, + index_state = IndexState}) -> + %% Mark it delivered if necessary + IndexState1 = maybe_write_delivered( + IndexOnDisk andalso not IsDelivered, + SeqId, IndexState), + + State1 = record_pending_ack( + MsgStatus #msg_status { + is_delivered = true }, State), + + State2 = stats({-1, 1}, {MsgStatus, MsgStatus}, State1), + + {SeqId, maybe_update_rates( + State2 #vqstate {out_counter = OutCount + 1, + index_state = IndexState1})}; + +%% This function body has the same behaviour as remove_queue_entries/3 +%% but instead of removing messages based on a ?QUEUE, this removes +%% just one message, the one referenced by the MsgStatus provided. +remove(false, MsgStatus = #msg_status { + seq_id = SeqId, + msg_id = MsgId, + is_persistent = IsPersistent, + is_delivered = IsDelivered, + msg_in_store = MsgInStore, + index_on_disk = IndexOnDisk }, State = #vqstate {out_counter = OutCount, index_state = IndexState, msg_store_clients = MSCState}) -> - %% 1. Mark it delivered if necessary + %% Mark it delivered if necessary IndexState1 = maybe_write_delivered( IndexOnDisk andalso not IsDelivered, SeqId, IndexState), - %% 2. Remove from msg_store and queue index, if necessary - Rem = fun () -> - ok = msg_store_remove(MSCState, IsPersistent, [MsgId]) - end, - Ack = fun () -> rabbit_queue_index:ack([SeqId], IndexState1) end, - IndexState2 = case {AckRequired, MsgInStore, IndexOnDisk} of - {false, true, false} -> Rem(), IndexState1; - {false, true, true} -> Rem(), Ack(); - {false, false, true} -> Ack(); - _ -> IndexState1 - end, + %% Remove from msg_store and queue index, if necessary + case MsgInStore of + true -> ok = msg_store_remove(MSCState, IsPersistent, [MsgId]); + false -> ok + end, - %% 3. If an ack is required, add something sensible to PA - {AckTag, State1} = case AckRequired of - true -> StateN = record_pending_ack( - MsgStatus #msg_status { - is_delivered = true }, State), - {SeqId, StateN}; - false -> {undefined, State} - end, - State2 = case AckRequired of - false -> stats({-1, 0}, {MsgStatus, none}, State1); - true -> stats({-1, 1}, {MsgStatus, MsgStatus}, State1) - end, - {AckTag, maybe_update_rates( - State2 #vqstate {out_counter = OutCount + 1, - index_state = IndexState2})}. + IndexState2 = + case IndexOnDisk of + true -> rabbit_queue_index:ack([SeqId], IndexState1); + false -> IndexState1 + end, + + State1 = stats({-1, 0}, {MsgStatus, none}, State), + + {undefined, maybe_update_rates( + State1 #vqstate {out_counter = OutCount + 1, + index_state = IndexState2})}. + +%% This function exists as a way to improve dropwhile/2 +%% performance. The idea of having this function is to optimise calls +%% to rabbit_queue_index by batching delivers and acks, instead of +%% sending them one by one. +%% +%% Instead of removing every message as their are popped from the +%% queue, it first accumulates them and then removes them by calling +%% remove_queue_entries/3, since the behaviour of +%% remove_queue_entries/3 when used with +%% process_delivers_and_acks_fun(deliver_and_ack) is the same as +%% calling remove(false, MsgStatus, State). +%% +%% remove/3 also updates the out_counter in every call, but here we do +%% it just once at the end. +remove_by_predicate(Pred, State = #vqstate {out_counter = OutCount}) -> + {MsgProps, QAcc, State1} = + collect_by_predicate(Pred, ?QUEUE:new(), State), + State2 = + remove_queue_entries( + QAcc, process_delivers_and_acks_fun(deliver_and_ack), State1), + %% maybe_update_rates/1 is called in remove/2 for every + %% message. Since we update out_counter only once, we call it just + %% there. + {MsgProps, maybe_update_rates( + State2 #vqstate { + out_counter = OutCount + ?QUEUE:len(QAcc)})}. + +collect_by_predicate(Pred, QAcc, State) -> + case queue_out(State) of + {empty, State1} -> + {undefined, QAcc, State1}; + {{value, MsgStatus = #msg_status { msg_props = MsgProps }}, State1} -> + case Pred(MsgProps) of + true -> collect_by_predicate(Pred, ?QUEUE:in(MsgStatus, QAcc), + State1); + false -> {MsgProps, QAcc, in_r(MsgStatus, State1)} + end + end. %%---------------------------------------------------------------------------- %% Helpers for Public API purge/1 function |
