summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAlvaro Videla <videlalvaro@gmail.com>2015-09-28 04:32:20 +0200
committerAlvaro Videla <videlalvaro@gmail.com>2015-09-28 04:32:20 +0200
commit1df75264be00f3582e1012b6b95544060d30621c (patch)
treef7b6273b3ffad6cea2f54c4ef34d44366c8298f1 /src
parent31a4b251e4ce303458031d09f29730a8c27aff6d (diff)
downloadrabbitmq-server-git-1df75264be00f3582e1012b6b95544060d30621c.tar.gz
improves fetchwhile performance
Fixes #316
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_variable_queue.erl67
1 files changed, 56 insertions, 11 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index f50c1bde7e..c15defc790 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -621,17 +621,9 @@ dropwhile(Pred, State) ->
{MsgProps, a(State1)}.
fetchwhile(Pred, Fun, Acc, State) ->
- case queue_out(State) of
- {empty, State1} ->
- {undefined, Acc, a(State1)};
- {{value, MsgStatus = #msg_status { msg_props = MsgProps }}, State1} ->
- case Pred(MsgProps) of
- true -> {Msg, State2} = read_msg(MsgStatus, State1),
- {AckTag, State3} = remove(true, MsgStatus, State2),
- fetchwhile(Pred, Fun, Fun(Msg, AckTag, Acc), State3);
- false -> {MsgProps, Acc, a(in_r(MsgStatus, State1))}
- end
- end.
+ {MsgProps, Acc1, State1} =
+ fetch_by_predicate(Pred, Fun, Acc, State),
+ {MsgProps, Acc1, a(State1)}.
fetch(AckRequired, State) ->
case queue_out(State) of
@@ -1349,6 +1341,59 @@ remove_by_predicate(Pred, State = #vqstate {out_counter = OutCount}) ->
State2 #vqstate {
out_counter = OutCount + ?QUEUE:len(QAcc)})}.
+%% This function exists as a way to improve fetchwhile/4
+%% performance. The idea of having this function is to optimise calls
+%% to rabbit_queue_index by batching delivers, instead of sending them
+%% one by one.
+%%
+%% Fun is the function passed to fetchwhile/4 that's
+%% applied to every fetched message and used to build the fetchwhile/4
+%% result accumulator FetchAcc.
+fetch_by_predicate(Pred, Fun, FetchAcc,
+ State = #vqstate {
+ index_state = IndexState,
+ out_counter = OutCount}) ->
+ {MsgProps, QAcc, State1} =
+ collect_by_predicate(Pred, ?QUEUE:new(), State),
+
+ {Delivers, FetchAcc1, State2} =
+ process_queue_entries(QAcc, Fun, FetchAcc, State1),
+
+ IndexState1 = rabbit_queue_index:deliver(Delivers, IndexState),
+
+ {MsgProps, FetchAcc1, maybe_update_rates(
+ State2 #vqstate {
+ index_state = IndexState1,
+ out_counter = OutCount + ?QUEUE:len(QAcc)})}.
+
+%% We try to do here the same as what remove(true, State) does but
+%% processing several messages at the same time. The idea is to
+%% optimize rabbit_queue_index:deliver/2 calls by sending a list of
+%% SeqIds instead of one by one, thus process_queue_entries1 will
+%% accumulate the required deliveries, will record_pending_ack for
+%% each message, and will update stats, like remove/2 does.
+%%
+%% For the meaning of Fun and FetchAcc arguments see
+%% fetch_by_predicate/4 above.
+process_queue_entries(Q, Fun, FetchAcc, State) ->
+ ?QUEUE:foldl(fun (MsgStatus, Acc) ->
+ process_queue_entries1(MsgStatus, Fun, Acc)
+ end,
+ {[], FetchAcc, State}, Q).
+
+process_queue_entries1(
+ #msg_status { seq_id = SeqId, is_delivered = IsDelivered,
+ index_on_disk = IndexOnDisk} = MsgStatus,
+ Fun,
+ {Delivers, FetchAcc, State}) ->
+ {Msg, State1} = read_msg(MsgStatus, State),
+ State2 = record_pending_ack(
+ MsgStatus #msg_status {
+ is_delivered = true }, State1),
+ {cons_if(IndexOnDisk andalso not IsDelivered, SeqId, Delivers),
+ Fun(Msg, SeqId, FetchAcc),
+ stats({-1, 1}, {MsgStatus, MsgStatus}, State2)}.
+
collect_by_predicate(Pred, QAcc, State) ->
case queue_out(State) of
{empty, State1} ->