summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMichael Klishin <michael@novemberain.com>2015-09-17 07:46:48 -0700
committerMichael Klishin <michael@novemberain.com>2015-09-17 07:46:48 -0700
commit47f3e12953892d58426c38a43aa99fdbd59415de (patch)
tree04df5479b901a789680a9152214109326361eeff /src
parent0c9bf69dc9d96cfe4b100c289e7ef259caa74253 (diff)
parent522d7649503ecef10cd215ef8d02c908f9619645 (diff)
downloadrabbitmq-server-git-47f3e12953892d58426c38a43aa99fdbd59415de.tar.gz
Merge pull request #309 from rabbitmq/rabbitmq-server-308
Improves dropwhile performance
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_variable_queue.erl135
1 files changed, 91 insertions, 44 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index db95b8c844..995282b344 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -616,16 +616,9 @@ drain_confirmed(State = #vqstate { confirmed = C }) ->
end.
dropwhile(Pred, State) ->
- case queue_out(State) of
- {empty, State1} ->
- {undefined, a(State1)};
- {{value, MsgStatus = #msg_status { msg_props = MsgProps }}, State1} ->
- case Pred(MsgProps) of
- true -> {_, State2} = remove(false, MsgStatus, State1),
- dropwhile(Pred, State2);
- false -> {MsgProps, a(in_r(MsgStatus, State1))}
- end
- end.
+ {MsgProps, State1} =
+ remove_by_predicate(Pred, State),
+ {MsgProps, a(State1)}.
fetchwhile(Pred, Fun, Acc, State) ->
case queue_out(State) of
@@ -1271,48 +1264,102 @@ msg_size(#msg_status{msg_props = #message_properties{size = Size}}) -> Size.
msg_in_ram(#msg_status{msg = Msg}) -> Msg =/= undefined.
-remove(AckRequired, MsgStatus = #msg_status {
- seq_id = SeqId,
- msg_id = MsgId,
- is_persistent = IsPersistent,
- is_delivered = IsDelivered,
- msg_in_store = MsgInStore,
- index_on_disk = IndexOnDisk },
+%% first param: AckRequired
+remove(true, MsgStatus = #msg_status {
+ seq_id = SeqId,
+ is_delivered = IsDelivered,
+ index_on_disk = IndexOnDisk },
+ State = #vqstate {out_counter = OutCount,
+ index_state = IndexState}) ->
+ %% Mark it delivered if necessary
+ IndexState1 = maybe_write_delivered(
+ IndexOnDisk andalso not IsDelivered,
+ SeqId, IndexState),
+
+ State1 = record_pending_ack(
+ MsgStatus #msg_status {
+ is_delivered = true }, State),
+
+ State2 = stats({-1, 1}, {MsgStatus, MsgStatus}, State1),
+
+ {SeqId, maybe_update_rates(
+ State2 #vqstate {out_counter = OutCount + 1,
+ index_state = IndexState1})};
+
+%% This function body has the same behaviour as remove_queue_entries/3
+%% but instead of removing messages based on a ?QUEUE, this removes
+%% just one message, the one referenced by the MsgStatus provided.
+remove(false, MsgStatus = #msg_status {
+ seq_id = SeqId,
+ msg_id = MsgId,
+ is_persistent = IsPersistent,
+ is_delivered = IsDelivered,
+ msg_in_store = MsgInStore,
+ index_on_disk = IndexOnDisk },
State = #vqstate {out_counter = OutCount,
index_state = IndexState,
msg_store_clients = MSCState}) ->
- %% 1. Mark it delivered if necessary
+ %% Mark it delivered if necessary
IndexState1 = maybe_write_delivered(
IndexOnDisk andalso not IsDelivered,
SeqId, IndexState),
- %% 2. Remove from msg_store and queue index, if necessary
- Rem = fun () ->
- ok = msg_store_remove(MSCState, IsPersistent, [MsgId])
- end,
- Ack = fun () -> rabbit_queue_index:ack([SeqId], IndexState1) end,
- IndexState2 = case {AckRequired, MsgInStore, IndexOnDisk} of
- {false, true, false} -> Rem(), IndexState1;
- {false, true, true} -> Rem(), Ack();
- {false, false, true} -> Ack();
- _ -> IndexState1
- end,
+ %% Remove from msg_store and queue index, if necessary
+ case MsgInStore of
+ true -> ok = msg_store_remove(MSCState, IsPersistent, [MsgId]);
+ false -> ok
+ end,
- %% 3. If an ack is required, add something sensible to PA
- {AckTag, State1} = case AckRequired of
- true -> StateN = record_pending_ack(
- MsgStatus #msg_status {
- is_delivered = true }, State),
- {SeqId, StateN};
- false -> {undefined, State}
- end,
- State2 = case AckRequired of
- false -> stats({-1, 0}, {MsgStatus, none}, State1);
- true -> stats({-1, 1}, {MsgStatus, MsgStatus}, State1)
- end,
- {AckTag, maybe_update_rates(
- State2 #vqstate {out_counter = OutCount + 1,
- index_state = IndexState2})}.
+ IndexState2 =
+ case IndexOnDisk of
+ true -> rabbit_queue_index:ack([SeqId], IndexState1);
+ false -> IndexState1
+ end,
+
+ State1 = stats({-1, 0}, {MsgStatus, none}, State),
+
+ {undefined, maybe_update_rates(
+ State1 #vqstate {out_counter = OutCount + 1,
+ index_state = IndexState2})}.
+
+%% This function exists as a way to improve dropwhile/2
+%% performance. The idea of having this function is to optimise calls
+%% to rabbit_queue_index by batching delivers and acks, instead of
+%% sending them one by one.
+%%
+%% Instead of removing every message as their are popped from the
+%% queue, it first accumulates them and then removes them by calling
+%% remove_queue_entries/3, since the behaviour of
+%% remove_queue_entries/3 when used with
+%% process_delivers_and_acks_fun(deliver_and_ack) is the same as
+%% calling remove(false, MsgStatus, State).
+%%
+%% remove/3 also updates the out_counter in every call, but here we do
+%% it just once at the end.
+remove_by_predicate(Pred, State = #vqstate {out_counter = OutCount}) ->
+ {MsgProps, QAcc, State1} =
+ collect_by_predicate(Pred, ?QUEUE:new(), State),
+ State2 =
+ remove_queue_entries(
+ QAcc, process_delivers_and_acks_fun(deliver_and_ack), State1),
+ %% maybe_update_rates/1 is called in remove/2 for every
+ %% message. Since we update out_counter only once, we call it just
+ %% there.
+ {MsgProps, maybe_update_rates(
+ State2 #vqstate {
+ out_counter = OutCount + ?QUEUE:len(QAcc)})}.
+
+collect_by_predicate(Pred, QAcc, State) ->
+ case queue_out(State) of
+ {empty, State1} ->
+ {undefined, QAcc, State1};
+ {{value, MsgStatus = #msg_status { msg_props = MsgProps }}, State1} ->
+ case Pred(MsgProps) of
+ true -> collect_by_predicate(Pred, ?QUEUE:in(MsgStatus, QAcc),
+ State1);
+ false -> {MsgProps, QAcc, in_r(MsgStatus, State1)}
+ end
+ end.
%%----------------------------------------------------------------------------
%% Helpers for Public API purge/1 function