summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAlvaro Videla <videlalvaro@gmail.com>2015-08-27 00:39:33 +0200
committerAlvaro Videla <videlalvaro@gmail.com>2015-08-27 00:39:33 +0200
commita8d40f06ee8c91cb83fcbb912a1f36d8d9e707ce (patch)
tree99586d842e25b0238a7e62537dcc04296b4ce1ab /src
parent1167e5a5808f47ee743634406a40f1050507d8c2 (diff)
downloadrabbitmq-server-git-a8d40f06ee8c91cb83fcbb912a1f36d8d9e707ce.tar.gz
improves push_betas_to_deltas performance
When messages are persisted to the index, instead of pushing one message at a time to the FHC, we now first accumulate them in a list that acts as a cache, and then push them all at once, once push_betas_to_deltas finishes recursing. The same is done when marking the message as delivered.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_queue_index.erl72
-rw-r--r--src/rabbit_variable_queue.erl84
2 files changed, 143 insertions, 13 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 9bd917ee96..bfa449b2a0 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -18,6 +18,7 @@
-export([erase/1, init/3, recover/6,
terminate/2, delete_and_terminate/1,
+ pre_publish/6, flush_pre_publish_cache/2,
publish/6, deliver/2, ack/2, sync/1, needs_sync/1, flush/1,
read/3, next_segment_boundary/1, bounds/1, start/1, stop/0]).
@@ -177,7 +178,8 @@
-record(qistate, {dir, segments, journal_handle, dirty_count,
max_journal_entries, on_sync, on_sync_msg,
- unconfirmed, unconfirmed_msg}).
+ unconfirmed, unconfirmed_msg,
+ pre_publish_cache, delivered_cache}).
-record(segment, {num, path, journal_entries, unacked}).
@@ -210,7 +212,9 @@
on_sync :: on_sync_fun(),
on_sync_msg :: on_sync_fun(),
unconfirmed :: gb_sets:set(),
- unconfirmed_msg :: gb_sets:set()
+ unconfirmed_msg :: gb_sets:set(),
+ pre_publish_cache :: list(),
+ delivered_cache :: list()
}).
-type(contains_predicate() :: fun ((rabbit_types:msg_id()) -> boolean())).
-type(walker(A) :: fun ((A) -> 'finished' |
@@ -289,6 +293,66 @@ delete_and_terminate(State) ->
ok = rabbit_file:recursive_delete([Dir]),
State1.
+pre_publish(MsgOrId, SeqId, MsgProps, IsPersistent, IsDelivered,
+ State = #qistate{unconfirmed = UC,
+ unconfirmed_msg = UCM,
+ pre_publish_cache = PPC,
+ delivered_cache = DC}) ->
+ MsgId = case MsgOrId of
+ #basic_message{id = Id} -> Id;
+ Id when is_binary(Id) -> Id
+ end,
+ ?MSG_ID_BYTES = size(MsgId),
+
+ State1 =
+ case {MsgProps#message_properties.needs_confirming, MsgOrId} of
+ {true, MsgId} -> UC1 = gb_sets:add_element(MsgId, UC),
+ State#qistate{unconfirmed = UC1};
+ {true, _} -> UCM1 = gb_sets:add_element(MsgId, UCM),
+ State#qistate{unconfirmed_msg = UCM1};
+ {false, _} -> State
+ end,
+
+ {Bin, MsgBin} = create_pub_record_body(MsgOrId, MsgProps),
+
+ PPC1 =
+ [[<<(case IsPersistent of
+ true -> ?PUB_PERSIST_JPREFIX;
+ false -> ?PUB_TRANS_JPREFIX
+ end):?JPREFIX_BITS,
+ SeqId:?SEQ_BITS, Bin/binary,
+ (size(MsgBin)):?EMBEDDED_SIZE_BITS>>, MsgBin], PPC],
+
+ DC1 =
+ case IsDelivered of
+ true ->
+ [SeqId | DC];
+ false ->
+ DC
+ end,
+
+ add_to_journal(SeqId, {IsPersistent, Bin, MsgBin},
+ State1#qistate{pre_publish_cache = PPC1, delivered_cache = DC1}).
+
+flush_pre_publish_cache(JournalSizeHint, State) ->
+ State1 = flush_pre_publish_cache(State),
+ State2 = flush_delivered_cache(State1),
+ maybe_flush_journal(JournalSizeHint, State2).
+
+flush_pre_publish_cache(#qistate{pre_publish_cache = []} = State) ->
+ State;
+flush_pre_publish_cache(State = #qistate{pre_publish_cache = PPC}) ->
+ {JournalHdl, State1} = get_journal_handle(State),
+ file_handle_cache_stats:update(queue_index_journal_write),
+ ok = file_handle_cache:append(JournalHdl, lists:reverse(PPC)),
+ State1#qistate{pre_publish_cache = []}.
+
+flush_delivered_cache(#qistate{delivered_cache = []} = State) ->
+ State;
+flush_delivered_cache(State = #qistate{delivered_cache = DC}) ->
+ State1 = deliver(lists:reverse(DC), State),
+ State1#qistate{delivered_cache = []}.
+
publish(MsgOrId, SeqId, MsgProps, IsPersistent, JournalSizeHint,
State = #qistate{unconfirmed = UC,
unconfirmed_msg = UCM}) ->
@@ -444,7 +508,9 @@ blank_state_dir(Dir) ->
on_sync = fun (_) -> ok end,
on_sync_msg = fun (_) -> ok end,
unconfirmed = gb_sets:new(),
- unconfirmed_msg = gb_sets:new() }.
+ unconfirmed_msg = gb_sets:new(),
+ pre_publish_cache = [],
+ delivered_cache = [] }.
init_clean(RecoveredCounts, State) ->
%% Load the journal. Since this is a clean recovery this (almost)
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 691e4ce2e2..657215327e 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -1367,6 +1367,34 @@ maybe_write_msg_to_disk(Force, MsgStatus = #msg_status {
maybe_write_msg_to_disk(_Force, MsgStatus, State) ->
{MsgStatus, State}.
+%% Due to certain optimizations made inside
+%% rabbit_queue_index:pre_publish/6 we need to have two separate
+%% functions for index persistence. This one is only used when paging
+%% during memory pressure.
+write_index_to_disk_paging(MsgStatus = #msg_status {
+ index_on_disk = true }, State) ->
+ {MsgStatus, State};
+write_index_to_disk_paging(MsgStatus = #msg_status {
+ msg = Msg,
+ msg_id = MsgId,
+ seq_id = SeqId,
+ is_persistent = IsPersistent,
+ is_delivered = IsDelivered,
+ msg_props = MsgProps},
+ State = #vqstate { disk_write_count = DiskWriteCount,
+ index_state = IndexState }) ->
+ {MsgOrId, DiskWriteCount1} =
+ case persist_to(MsgStatus) of
+ msg_store -> {MsgId, DiskWriteCount};
+ queue_index -> {prepare_to_store(Msg), DiskWriteCount + 1}
+ end,
+ IndexState1 = rabbit_queue_index:pre_publish(
+ MsgOrId, SeqId, MsgProps, IsPersistent, IsDelivered,
+ IndexState),
+ {MsgStatus#msg_status{index_on_disk = true},
+ State#vqstate{index_state = IndexState1,
+ disk_write_count = DiskWriteCount1}}.
+
maybe_write_index_to_disk(_Force, MsgStatus = #msg_status {
index_on_disk = true }, State) ->
{MsgStatus, State};
@@ -1971,7 +1999,11 @@ push_betas_to_deltas(Quota, State = #vqstate { q2 = Q2,
delta = Delta1,
q3 = Q3a }.
-push_betas_to_deltas(Generator, LimitFun, Q, PushState) ->
+push_betas_to_deltas(Generator, LimitFun, Q,
+ {_Quota, _Delta,
+ #vqstate{
+ ram_msg_count = CurrRamReadyCount,
+ ram_bytes = CurrRamBytes}} = PushState) ->
case ?QUEUE:is_empty(Q) of
true ->
{Q, PushState};
@@ -1981,28 +2013,60 @@ push_betas_to_deltas(Generator, LimitFun, Q, PushState) ->
Limit = LimitFun(MinSeqId),
case MaxSeqId < Limit of
true -> {Q, PushState};
- false -> push_betas_to_deltas1(Generator, Limit, Q, PushState)
+ false -> push_betas_to_deltas1(Generator, Limit, Q,
+ CurrRamReadyCount, CurrRamBytes,
+ PushState)
end
end.
-push_betas_to_deltas1(_Generator, _Limit, Q, {0, _Delta, _State} = PushState) ->
- {Q, PushState};
-push_betas_to_deltas1(Generator, Limit, Q, {Quota, Delta, State} = PushState) ->
+push_betas_to_deltas1(_Generator, _Limit, Q,
+ CurrRamReadyCount, CurrRamBytes,
+ {0, Delta, State =
+ #vqstate{index_state = IndexState,
+ target_ram_count = TargetRamCount}}) ->
+ IndexState1 = rabbit_queue_index:flush_pre_publish_cache(
+ TargetRamCount, IndexState),
+ {Q, {0, Delta, State#vqstate{index_state = IndexState1,
+ ram_msg_count = CurrRamReadyCount,
+ ram_bytes = CurrRamBytes}}};
+push_betas_to_deltas1(Generator, Limit, Q,
+ CurrRamReadyCount, CurrRamBytes,
+ {Quota, Delta, State =
+ #vqstate{index_state = IndexState,
+ target_ram_count = TargetRamCount}}) ->
case Generator(Q) of
{empty, _Q} ->
- {Q, PushState};
+ IndexState1 = rabbit_queue_index:flush_pre_publish_cache(
+ TargetRamCount, IndexState),
+ {Q, {Quota, Delta, State#vqstate{index_state = IndexState1,
+ ram_msg_count = CurrRamReadyCount,
+ ram_bytes = CurrRamBytes}}};
{{value, #msg_status { seq_id = SeqId }}, _Qa}
when SeqId < Limit ->
- {Q, PushState};
+ IndexState1 = rabbit_queue_index:flush_pre_publish_cache(
+ TargetRamCount, IndexState),
+ {Q, {Quota, Delta, State#vqstate{index_state = IndexState1,
+ ram_msg_count = CurrRamReadyCount,
+ ram_bytes = CurrRamBytes}}};
{{value, MsgStatus = #msg_status { seq_id = SeqId }}, Qa} ->
{#msg_status { index_on_disk = true }, State1} =
- maybe_write_index_to_disk(true, MsgStatus, State),
- State2 = stats(ready0, {MsgStatus, none}, State1),
+ write_index_to_disk_paging(MsgStatus, State),
+ {Size, DeltaRam} = size_and_delta_ram(MsgStatus),
Delta1 = expand_delta(SeqId, Delta),
push_betas_to_deltas1(Generator, Limit, Qa,
- {Quota - 1, Delta1, State2})
+ CurrRamReadyCount + DeltaRam,
+ CurrRamBytes + DeltaRam * Size,
+ {Quota - 1, Delta1, State1})
end.
+%% Optimised version for paging only, based on stats/3 being called
+%% like this: stats(ready0, {MsgStatus, none}, State1).
+size_and_delta_ram(#msg_status{msg_props = #message_properties{size = Size},
+ msg = undefined}) ->
+ {Size, 0};
+size_and_delta_ram(#msg_status{msg_props = #message_properties{size = Size}}) ->
+ {Size, -1}.
+
%%----------------------------------------------------------------------------
%% Upgrading
%%----------------------------------------------------------------------------