summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_queue_index.erl84
-rw-r--r--src/rabbit_variable_queue.erl75
2 files changed, 143 insertions, 16 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 6f36d4f0dc..7c8fdf38c0 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -18,6 +18,7 @@
-export([erase/1, init/3, recover/6,
terminate/2, delete_and_terminate/1,
+ pre_publish/7, flush_pre_publish_cache/2,
publish/6, deliver/2, ack/2, sync/1, needs_sync/1, flush/1,
read/3, next_segment_boundary/1, bounds/1, start/1, stop/0]).
@@ -177,7 +178,8 @@
-record(qistate, {dir, segments, journal_handle, dirty_count,
max_journal_entries, on_sync, on_sync_msg,
- unconfirmed, unconfirmed_msg}).
+ unconfirmed, unconfirmed_msg,
+ pre_publish_cache, delivered_cache}).
-record(segment, {num, path, journal_entries,
entries_to_segment, unacked}).
@@ -212,7 +214,9 @@
on_sync :: on_sync_fun(),
on_sync_msg :: on_sync_fun(),
unconfirmed :: gb_sets:set(),
- unconfirmed_msg :: gb_sets:set()
+ unconfirmed_msg :: gb_sets:set(),
+ pre_publish_cache :: list(),
+ delivered_cache :: list()
}).
-type(contains_predicate() :: fun ((rabbit_types:msg_id()) -> boolean())).
-type(walker(A) :: fun ((A) -> 'finished' |
@@ -291,6 +295,78 @@ delete_and_terminate(State) ->
ok = rabbit_file:recursive_delete([Dir]),
State1.
+pre_publish(MsgOrId, SeqId, MsgProps, IsPersistent, IsDelivered, JournalSizeHint,
+ State = #qistate{unconfirmed = UC,
+ unconfirmed_msg = UCM,
+ pre_publish_cache = PPC,
+ delivered_cache = DC}) ->
+ MsgId = case MsgOrId of
+ #basic_message{id = Id} -> Id;
+ Id when is_binary(Id) -> Id
+ end,
+ ?MSG_ID_BYTES = size(MsgId),
+
+ State1 =
+ case {MsgProps#message_properties.needs_confirming, MsgOrId} of
+ {true, MsgId} -> UC1 = gb_sets:add_element(MsgId, UC),
+ State#qistate{unconfirmed = UC1};
+ {true, _} -> UCM1 = gb_sets:add_element(MsgId, UCM),
+ State#qistate{unconfirmed_msg = UCM1};
+ {false, _} -> State
+ end,
+
+ {Bin, MsgBin} = create_pub_record_body(MsgOrId, MsgProps),
+
+ PPC1 =
+ [[<<(case IsPersistent of
+ true -> ?PUB_PERSIST_JPREFIX;
+ false -> ?PUB_TRANS_JPREFIX
+ end):?JPREFIX_BITS,
+ SeqId:?SEQ_BITS, Bin/binary,
+ (size(MsgBin)):?EMBEDDED_SIZE_BITS>>, MsgBin], PPC],
+
+ DC1 =
+ case IsDelivered of
+ true ->
+ [SeqId | DC];
+ false ->
+ DC
+ end,
+
+ add_to_journal(SeqId, {IsPersistent, Bin, MsgBin},
+ maybe_flush_pre_publish_cache(
+ JournalSizeHint,
+ State1#qistate{pre_publish_cache = PPC1,
+ delivered_cache = DC1})).
+
+%% pre_publish_cache is the entry with the most elements when compared
+%% to delivered_cache, so we only check the former in the guard.
+maybe_flush_pre_publish_cache(JournalSizeHint,
+ #qistate{pre_publish_cache = PPC} = State)
+ when length(PPC) >= ?SEGMENT_ENTRY_COUNT ->
+ flush_pre_publish_cache(JournalSizeHint, State);
+maybe_flush_pre_publish_cache(_JournalSizeHint, State) ->
+ State.
+
+flush_pre_publish_cache(JournalSizeHint, State) ->
+ State1 = flush_pre_publish_cache(State),
+ State2 = flush_delivered_cache(State1),
+ maybe_flush_journal(JournalSizeHint, State2).
+
+flush_pre_publish_cache(#qistate{pre_publish_cache = []} = State) ->
+ State;
+flush_pre_publish_cache(State = #qistate{pre_publish_cache = PPC}) ->
+ {JournalHdl, State1} = get_journal_handle(State),
+ file_handle_cache_stats:update(queue_index_journal_write),
+ ok = file_handle_cache:append(JournalHdl, lists:reverse(PPC)),
+ State1#qistate{pre_publish_cache = []}.
+
+flush_delivered_cache(#qistate{delivered_cache = []} = State) ->
+ State;
+flush_delivered_cache(State = #qistate{delivered_cache = DC}) ->
+ State1 = deliver(lists:reverse(DC), State),
+ State1#qistate{delivered_cache = []}.
+
publish(MsgOrId, SeqId, MsgProps, IsPersistent, JournalSizeHint,
State = #qistate{unconfirmed = UC,
unconfirmed_msg = UCM}) ->
@@ -446,7 +522,9 @@ blank_state_dir(Dir) ->
on_sync = fun (_) -> ok end,
on_sync_msg = fun (_) -> ok end,
unconfirmed = gb_sets:new(),
- unconfirmed_msg = gb_sets:new() }.
+ unconfirmed_msg = gb_sets:new(),
+ pre_publish_cache = [],
+ delivered_cache = [] }.
init_clean(RecoveredCounts, State) ->
%% Load the journal. Since this is a clean recovery this (almost)
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 859dc6051c..41a13ac9b6 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -1440,6 +1440,43 @@ maybe_write_msg_to_disk(Force, MsgStatus = #msg_status {
maybe_write_msg_to_disk(_Force, MsgStatus, State) ->
{MsgStatus, State}.
+%% Due to certain optimizations made inside
+%% rabbit_queue_index:pre_publish/7 we need to have two separate
+%% functions for index persistence. This one is only used when paging
+%% during memory pressure. We didn't want to modify
+%% maybe_write_index_to_disk/3 because that function is used in other
+%% places.
+maybe_batch_write_index_to_disk(_Force,
+ MsgStatus = #msg_status {
+ index_on_disk = true }, State) ->
+ {MsgStatus, State};
+maybe_batch_write_index_to_disk(Force,
+ MsgStatus = #msg_status {
+ msg = Msg,
+ msg_id = MsgId,
+ seq_id = SeqId,
+ is_persistent = IsPersistent,
+ is_delivered = IsDelivered,
+ msg_props = MsgProps},
+ State = #vqstate {
+ target_ram_count = TargetRamCount,
+ disk_write_count = DiskWriteCount,
+ index_state = IndexState})
+ when Force orelse IsPersistent ->
+ {MsgOrId, DiskWriteCount1} =
+ case persist_to(MsgStatus) of
+ msg_store -> {MsgId, DiskWriteCount};
+ queue_index -> {prepare_to_store(Msg), DiskWriteCount + 1}
+ end,
+ IndexState1 = rabbit_queue_index:pre_publish(
+ MsgOrId, SeqId, MsgProps, IsPersistent, IsDelivered,
+ TargetRamCount, IndexState),
+ {MsgStatus#msg_status{index_on_disk = true},
+ State#vqstate{index_state = IndexState1,
+ disk_write_count = DiskWriteCount1}};
+maybe_batch_write_index_to_disk(_Force, MsgStatus, State) ->
+ {MsgStatus, State}.
+
maybe_write_index_to_disk(_Force, MsgStatus = #msg_status {
index_on_disk = true }, State) ->
{MsgStatus, State};
@@ -1474,6 +1511,10 @@ maybe_write_to_disk(ForceMsg, ForceIndex, MsgStatus, State) ->
{MsgStatus1, State1} = maybe_write_msg_to_disk(ForceMsg, MsgStatus, State),
maybe_write_index_to_disk(ForceIndex, MsgStatus1, State1).
+maybe_prepare_write_to_disk(ForceMsg, ForceIndex, MsgStatus, State) ->
+ {MsgStatus1, State1} = maybe_write_msg_to_disk(ForceMsg, MsgStatus, State),
+ maybe_batch_write_index_to_disk(ForceIndex, MsgStatus1, State1).
+
determine_persist_to(#basic_message{
content = #content{properties = Props,
properties_bin = PropsBin}},
@@ -1861,16 +1902,16 @@ reduce_memory_use(State = #vqstate {
end.
limit_ram_acks(0, State) ->
- {0, State};
+ {0, ui(State)};
limit_ram_acks(Quota, State = #vqstate { ram_pending_ack = RPA,
disk_pending_ack = DPA }) ->
case gb_trees:is_empty(RPA) of
true ->
- {Quota, State};
+ {Quota, ui(State)};
false ->
{SeqId, MsgStatus, RPA1} = gb_trees:take_largest(RPA),
{MsgStatus1, State1} =
- maybe_write_to_disk(true, false, MsgStatus, State),
+ maybe_prepare_write_to_disk(true, false, MsgStatus, State),
MsgStatus2 = m(trim_msg_status(MsgStatus1)),
DPA1 = gb_trees:insert(SeqId, MsgStatus2, DPA),
limit_ram_acks(Quota - 1,
@@ -2008,16 +2049,17 @@ push_alphas_to_betas(_Generator, _Consumer, Quota, _Q,
when Quota =:= 0 orelse
TargetRamCount =:= infinity orelse
TargetRamCount >= RamMsgCount ->
- {Quota, State};
+ {Quota, ui(State)};
push_alphas_to_betas(Generator, Consumer, Quota, Q, State) ->
case credit_flow:blocked() of
- true -> {Quota, State};
+ true -> {Quota, ui(State)};
false -> case Generator(Q) of
{empty, _Q} ->
- {Quota, State};
+ {Quota, ui(State)};
{{value, MsgStatus}, Qa} ->
{MsgStatus1, State1} =
- maybe_write_to_disk(true, false, MsgStatus, State),
+ maybe_prepare_write_to_disk(true, false, MsgStatus,
+ State),
MsgStatus2 = m(trim_msg_status(MsgStatus1)),
State2 = stats(
ready0, {MsgStatus, MsgStatus2}, State1),
@@ -2058,24 +2100,31 @@ push_betas_to_deltas(Generator, LimitFun, Q, PushState) ->
end
end.
-push_betas_to_deltas1(_Generator, _Limit, Q, {0, _Delta, _State} = PushState) ->
- {Q, PushState};
-push_betas_to_deltas1(Generator, Limit, Q, {Quota, Delta, State} = PushState) ->
+push_betas_to_deltas1(_Generator, _Limit, Q, {0, Delta, State}) ->
+ {Q, {0, Delta, ui(State)}};
+push_betas_to_deltas1(Generator, Limit, Q, {Quota, Delta, State}) ->
case Generator(Q) of
{empty, _Q} ->
- {Q, PushState};
+ {Q, {Quota, Delta, ui(State)}};
{{value, #msg_status { seq_id = SeqId }}, _Qa}
when SeqId < Limit ->
- {Q, PushState};
+ {Q, {Quota, Delta, ui(State)}};
{{value, MsgStatus = #msg_status { seq_id = SeqId }}, Qa} ->
{#msg_status { index_on_disk = true }, State1} =
- maybe_write_index_to_disk(true, MsgStatus, State),
+ maybe_batch_write_index_to_disk(true, MsgStatus, State),
State2 = stats(ready0, {MsgStatus, none}, State1),
Delta1 = expand_delta(SeqId, Delta),
push_betas_to_deltas1(Generator, Limit, Qa,
{Quota - 1, Delta1, State2})
end.
+%% Flushes queue index batch caches and updates queue index state.
+ui(#vqstate{index_state = IndexState,
+ target_ram_count = TargetRamCount} = State) ->
+ IndexState1 = rabbit_queue_index:flush_pre_publish_cache(
+ TargetRamCount, IndexState),
+ State#vqstate{index_state = IndexState1}.
+
%%----------------------------------------------------------------------------
%% Upgrading
%%----------------------------------------------------------------------------