summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--include/rabbit_msg_store.hrl3
-rw-r--r--src/rabbit_msg_store.erl262
-rw-r--r--src/rabbit_msg_store_gc.erl28
-rw-r--r--src/rabbit_tests.erl39
-rw-r--r--src/rabbit_variable_queue.erl25
5 files changed, 282 insertions, 75 deletions
diff --git a/include/rabbit_msg_store.hrl b/include/rabbit_msg_store.hrl
index 0e9a0408cb..a094454a78 100644
--- a/include/rabbit_msg_store.hrl
+++ b/include/rabbit_msg_store.hrl
@@ -34,7 +34,7 @@
-record(file_summary,
{file, valid_total_size, contiguous_top, left, right, file_size,
- locked}).
+ locked, readers}).
-define(BINARY_MODE, [raw, binary]).
-define(READ_MODE, [read]).
@@ -52,3 +52,4 @@
-define(FILE_SUMMARY_ETS_NAME, rabbit_msg_store_file_summary).
-define(CACHE_ETS_NAME, rabbit_msg_store_cache).
+-define(FILE_HANDLES_ETS_NAME, rabbit_msg_store_file_handles).
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 8acd9149aa..3a6450596f 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -33,8 +33,8 @@
-behaviour(gen_server2).
--export([start_link/3, write/2, read/1, contains/1, remove/1, release/1,
- sync/2]).
+-export([start_link/3, write/2, read/2, contains/1, remove/1, release/1,
+ sync/2, client_init/0, client_terminate/1]).
-export([sync/0, gc_done/3]). %% internal
@@ -49,45 +49,62 @@
%%----------------------------------------------------------------------------
+-record(msstate,
+ { dir, %% store directory
+ index_module, %% the module for index ops
+ index_state, %% where are messages?
+ current_file, %% current file name as number
+ current_file_handle, %% current file handle
+ %% since the last fsync?
+ file_handle_cache, %% file handle cache
+ on_sync, %% pending sync requests
+ sync_timer_ref, %% TRef for our interval timer
+ sum_valid_data, %% sum of valid data in all files
+ sum_file_size, %% sum of file sizes
+ pending_gc_completion, %% things to do once GC completes
+ gc_active %% is the GC currently working?
+ }).
+
+-record(client_msstate,
+ { file_handle_cache,
+ index_state,
+ index_module,
+ dir
+ }).
+
+%%----------------------------------------------------------------------------
+
-ifdef(use_specs).
-type(msg_id() :: binary()).
-type(msg() :: any()).
-type(file_path() :: any()).
-type(file_num() :: non_neg_integer()).
+-type(client_msstate() :: #client_msstate { file_handle_cache :: dict(),
+ index_state :: any(),
+ index_module :: atom(),
+ dir :: file_path() }).
-spec(start_link/3 ::
(file_path(),
(fun ((A) -> 'finished' | {msg_id(), non_neg_integer(), A})), A) ->
{'ok', pid()} | 'ignore' | {'error', any()}).
-spec(write/2 :: (msg_id(), msg()) -> 'ok').
--spec(read/1 :: (msg_id()) -> {'ok', msg()} | 'not_found').
+%% -spec(read/1 :: (msg_id()) -> {'ok', msg()} | 'not_found').
+-spec(read/2 :: (msg_id(), client_msstate()) ->
+ {{'ok', msg()} | 'not_found', client_msstate()}).
-spec(contains/1 :: (msg_id()) -> boolean()).
-spec(remove/1 :: ([msg_id()]) -> 'ok').
-spec(release/1 :: ([msg_id()]) -> 'ok').
-spec(sync/2 :: ([msg_id()], fun (() -> any())) -> 'ok').
-spec(gc_done/3 :: (non_neg_integer(), file_num(), file_num()) -> 'ok').
+-spec(client_init/0 :: () -> client_msstate()).
+-spec(client_terminate/1 :: (client_msstate()) -> 'ok').
-endif.
%%----------------------------------------------------------------------------
--record(msstate,
- {dir, %% store directory
- index_module, %% the module for index ops
- index_state, %% where are messages?
- current_file, %% current file name as number
- current_file_handle, %% current file handle
- %% since the last fsync?
- file_handle_cache, %% file handle cache
- on_sync, %% pending sync requests
- sync_timer_ref, %% TRef for our interval timer
- sum_valid_data, %% sum of valid data in all files
- sum_file_size, %% sum of file sizes
- pending_gc_completion, %% things to do once GC completes
- gc_active %% is the GC currently working?
- }).
-
-include("rabbit_msg_store.hrl").
%% We run GC whenever (garbage / sum_file_size) > ?GARBAGE_FRACTION
@@ -221,16 +238,120 @@ start_link(Dir, MsgRefDeltaGen, MsgRefDeltaGenInit) ->
[Dir, MsgRefDeltaGen, MsgRefDeltaGenInit],
[{timeout, infinity}]).
-write(MsgId, Msg) -> gen_server2:cast(?SERVER, {write, MsgId, Msg}).
-read(MsgId) -> gen_server2:call(?SERVER, {read, MsgId}, infinity).
-contains(MsgId) -> gen_server2:call(?SERVER, {contains, MsgId}, infinity).
-remove(MsgIds) -> gen_server2:cast(?SERVER, {remove, MsgIds}).
-release(MsgIds) -> gen_server2:cast(?SERVER, {release, MsgIds}).
-sync(MsgIds, K) -> gen_server2:cast(?SERVER, {sync, MsgIds, K}).
-sync() -> gen_server2:pcast(?SERVER, 9, sync). %% internal
+write(MsgId, Msg) -> gen_server2:cast(?SERVER, {write, MsgId, Msg}).
+
+%% Client-side read: consult the (shared) index and cache first and only
+%% fall back to a gen_server call when the message's file is the current
+%% file (right =:= undefined) or locked by the GC.  Threads the client
+%% state (private file-handle cache) through and returns it updated.
+read(MsgId, CState) ->
+    case index_lookup(MsgId, CState) of
+        not_found ->
+            {gen_server2:call(?SERVER, {read, MsgId}, infinity), CState};
+        #msg_location { ref_count = RefCount,
+                        file = File,
+                        offset = Offset,
+                        total_size = TotalSize } ->
+            case fetch_and_increment_cache(MsgId) of
+                not_found ->
+                    [#file_summary { locked = Locked, right = Right }] =
+                        ets:lookup(?FILE_SUMMARY_ETS_NAME, File),
+                    case Right =:= undefined orelse Locked =:= true of
+                        true ->
+                            {gen_server2:call(?SERVER, {read, MsgId}, infinity),
+                             CState};
+                        false ->
+                            ets:update_counter(?FILE_SUMMARY_ETS_NAME, File,
+                                               {#file_summary.readers, 1}),
+                            %% need to check again to see if we've
+                            %% been locked in the meantime
+                            [#file_summary { locked = Locked2 }] =
+                                ets:lookup(?FILE_SUMMARY_ETS_NAME, File),
+                            case Locked2 of
+                                true ->
+                                    %% we lost the race with the GC: we
+                                    %% must surrender the readers claim
+                                    %% we just took, otherwise the GC
+                                    %% (which waits for readers =:= 0)
+                                    %% would wait on us forever
+                                    ets:update_counter(
+                                      ?FILE_SUMMARY_ETS_NAME, File,
+                                      {#file_summary.readers, -1}),
+                                    {gen_server2:call(?SERVER, {read, MsgId},
+                                                      infinity), CState};
+                                false ->
+                                    %% ok, we're definitely safe to
+                                    %% continue - a GC can't start up
+                                    %% now
+                                    Self = self(),
+                                    CState1 =
+                                        case ets:lookup(?FILE_HANDLES_ETS_NAME,
+                                                        {File, Self}) of
+                                            [{Key, close}] ->
+                                                %% GC asked us to close this
+                                                %% handle; reopen afresh
+                                                CState2 =
+                                                    close_handle(File, CState),
+                                                true = ets:insert(
+                                                         ?FILE_HANDLES_ETS_NAME,
+                                                         {Key, open}),
+                                                CState2;
+                                            [{_Key, open}] ->
+                                                CState;
+                                            [] ->
+                                                true = ets:insert_new(
+                                                         ?FILE_HANDLES_ETS_NAME,
+                                                         {{File, Self}, open}),
+                                                CState
+                                        end,
+                                    {Hdl, CState3} =
+                                        get_read_handle(File, CState1),
+                                    {ok, Offset} =
+                                        file_handle_cache:position(Hdl, Offset),
+                                    {ok, {MsgId, Msg}} =
+                                        case rabbit_msg_file:read(Hdl, TotalSize) of
+                                            {ok, {MsgId, _}} = Obj -> Obj;
+                                            Rest ->
+                                                %% NOTE(review): throwing here
+                                                %% abandons the readers
+                                                %% increment; the caller dies
+                                                %% anyway, but the GC may then
+                                                %% stall on this file
+                                                throw({error,
+                                                       {misread,
+                                                        [{old_cstate, CState1},
+                                                         {file_num, File},
+                                                         {offset, Offset},
+                                                         {read, Rest},
+                                                         {proc_dict, get()}
+                                                        ]}})
+                                        end,
+                                    ets:update_counter(
+                                      ?FILE_SUMMARY_ETS_NAME, File,
+                                      {#file_summary.readers, -1}),
+                                    ok = case RefCount > 1 of
+                                             true ->
+                                                 insert_into_cache(MsgId, Msg);
+                                             false ->
+                                                 %% it's not in the
+                                                 %% cache and we only
+                                                 %% have one reference
+                                                 %% to the message. So
+                                                 %% don't bother
+                                                 %% putting it in the
+                                                 %% cache.
+                                                 ok
+                                         end,
+                                    {{ok, Msg}, CState3}
+                            end
+                    end;
+                Msg ->
+                    {{ok, Msg}, CState}
+            end
+    end.
+
+contains(MsgId) -> gen_server2:call(?SERVER, {contains, MsgId}, infinity).
+remove(MsgIds) -> gen_server2:cast(?SERVER, {remove, MsgIds}).
+release(MsgIds) -> gen_server2:cast(?SERVER, {release, MsgIds}).
+sync(MsgIds, K) -> gen_server2:cast(?SERVER, {sync, MsgIds, K}).
+sync() -> gen_server2:pcast(?SERVER, 9, sync). %% internal
+
gc_done(Reclaimed, Source, Destination) ->
gen_server2:pcast(?SERVER, 9, {gc_done, Reclaimed, Source, Destination}).
+%% Obtain a per-client read state: the server's index module/state and
+%% store directory plus a fresh private file-handle cache, so that
+%% read/2 can mostly avoid gen_server round trips.
+client_init() ->
+    {IState, IModule, Dir} =
+        gen_server2:call(?SERVER, new_client_state, infinity),
+    #client_msstate { file_handle_cache = dict:new(),
+                      index_state = IState,
+                      index_module = IModule,
+                      dir = Dir }.
+
+%% Close and deregister every file handle held by this client.
+client_terminate(CState) ->
+    close_all_handles(CState),
+    ok.
+
%%----------------------------------------------------------------------------
%% gen_server callbacks
%%----------------------------------------------------------------------------
@@ -250,6 +371,8 @@ init([Dir, MsgRefDeltaGen, MsgRefDeltaGenInit]) ->
[ordered_set, public, named_table,
{keypos, #file_summary.file}]),
?CACHE_ETS_NAME = ets:new(?CACHE_ETS_NAME, [set, public, named_table]),
+ ?FILE_HANDLES_ETS_NAME = ets:new(?FILE_HANDLES_ETS_NAME,
+ [ordered_set, public, named_table]),
State =
#msstate { dir = Dir,
index_module = IndexModule,
@@ -295,7 +418,12 @@ handle_call({read, MsgId}, From, State) ->
handle_call({contains, MsgId}, From, State) ->
State1 = contains_message(MsgId, From, State),
- noreply(State1).
+ noreply(State1);
+
+handle_call(new_client_state, _From,
+ State = #msstate { index_state = IndexState, dir = Dir,
+ index_module = IndexModule }) ->
+ reply({IndexState, IndexModule, Dir}, State).
handle_cast({write, MsgId, Msg},
State = #msstate { current_file_handle = CurHdl,
@@ -414,6 +542,8 @@ terminate(_Reason, State = #msstate { index_state = IndexState,
end,
State3 = close_all_handles(State1),
ets:delete(?FILE_SUMMARY_ETS_NAME),
+ ets:delete(?CACHE_ETS_NAME),
+ ets:delete(?FILE_HANDLES_ETS_NAME),
IndexModule:terminate(IndexState),
State3 #msstate { index_state = undefined,
current_file_handle = undefined }.
@@ -599,33 +729,56 @@ run_pending({contains, MsgId, From}, State) ->
run_pending({remove, MsgId}, State) ->
remove_message(MsgId, State).
+close_handle(Key, CState = #client_msstate { file_handle_cache = FHC }) ->
+ CState #client_msstate { file_handle_cache = close_handle(Key, FHC) };
+
close_handle(Key, State = #msstate { file_handle_cache = FHC }) ->
+ State #msstate { file_handle_cache = close_handle(Key, FHC) };
+
+close_handle(Key, FHC) ->
case dict:find(Key, FHC) of
{ok, Hdl} ->
ok = file_handle_cache:close(Hdl),
- State #msstate { file_handle_cache = dict:erase(Key, FHC) };
- error -> State
+ dict:erase(Key, FHC);
+ error -> FHC
end.
+close_all_handles(CState = #client_msstate { file_handle_cache = FHC }) ->
+ Self = self(),
+ ok = dict:fold(fun (Key, Hdl, ok) ->
+ true =
+ ets:delete(?FILE_HANDLES_ETS_NAME, {Key, Self}),
+ file_handle_cache:close(Hdl)
+ end, ok, FHC),
+ CState #client_msstate { file_handle_cache = dict:new() };
+
close_all_handles(State = #msstate { file_handle_cache = FHC }) ->
ok = dict:fold(fun (_Key, Hdl, ok) ->
file_handle_cache:close(Hdl)
end, ok, FHC),
State #msstate { file_handle_cache = dict:new() }.
-get_read_handle(FileNum, State = #msstate { file_handle_cache = FHC }) ->
+get_read_handle(FileNum, CState = #client_msstate { file_handle_cache = FHC,
+ dir = Dir }) ->
+ {Hdl, FHC2} = get_read_handle(FileNum, FHC, Dir),
+ {Hdl, CState #client_msstate { file_handle_cache = FHC2 }};
+
+get_read_handle(FileNum, State = #msstate { file_handle_cache = FHC,
+ dir = Dir }) ->
+ {Hdl, FHC2} = get_read_handle(FileNum, FHC, Dir),
+ {Hdl, State #msstate { file_handle_cache = FHC2 }}.
+
+get_read_handle(FileNum, FHC, Dir) ->
case dict:find(FileNum, FHC) of
- {ok, Hdl} -> {Hdl, State};
- error -> new_handle(FileNum,
- rabbit_msg_store_misc:filenum_to_name(FileNum),
- [read | ?BINARY_MODE], State)
+ {ok, Hdl} ->
+ {Hdl, FHC};
+ error ->
+ {ok, Hdl} = rabbit_msg_store_misc:open_file(
+ Dir, rabbit_msg_store_misc:filenum_to_name(FileNum),
+ [read | ?BINARY_MODE]),
+ {Hdl, dict:store(FileNum, Hdl, FHC) }
end.
-new_handle(Key, FileName, Mode, State = #msstate { file_handle_cache = FHC,
- dir = Dir }) ->
- {ok, Hdl} = rabbit_msg_store_misc:open_file(Dir, FileName, Mode),
- {Hdl, State #msstate { file_handle_cache = dict:store(Key, Hdl, FHC) }}.
-
%%----------------------------------------------------------------------------
%% message cache helper functions
%%----------------------------------------------------------------------------
@@ -676,6 +829,9 @@ insert_into_cache(MsgId, Msg) ->
%% index
%%----------------------------------------------------------------------------
+index_lookup(Key, #client_msstate { index_module = Index, index_state = State }) ->
+ Index:lookup(Key, State);
+
index_lookup(Key, #msstate { index_module = Index, index_state = State }) ->
Index:lookup(Key, State).
@@ -901,7 +1057,8 @@ build_index(Left, [File|Files],
ets:insert_new(?FILE_SUMMARY_ETS_NAME, #file_summary {
file = File, valid_total_size = ValidTotalSize,
contiguous_top = ContiguousTop, locked = false,
- left = Left, right = Right, file_size = FileSize1 }),
+ left = Left, right = Right, file_size = FileSize1,
+ readers = 0 }),
build_index(File, Files,
State #msstate { sum_valid_data = SumValid + ValidTotalSize,
sum_file_size = SumFileSize + FileSize1 }).
@@ -925,7 +1082,7 @@ maybe_roll_to_new_file(Offset,
?FILE_SUMMARY_ETS_NAME, #file_summary {
file = NextFile, valid_total_size = 0, contiguous_top = 0,
left = CurFile, right = undefined, file_size = 0,
- locked = false }),
+ locked = false, readers = 0 }),
true = ets:update_element(?FILE_SUMMARY_ETS_NAME, CurFile,
{#file_summary.right, NextFile}),
State1 #msstate { current_file_handle = NextHdl,
@@ -948,12 +1105,30 @@ maybe_compact(State = #msstate { sum_valid_data = SumValid,
{#file_summary.locked, true}),
true = ets:update_element(?FILE_SUMMARY_ETS_NAME, Dest,
{#file_summary.locked, true}),
+ %% now that they're locked, we know no queue will touch
+ %% them (not even add to the ets table for these files),
+ %% so now ensure that we ask the queues to close handles
+ %% to these files
+ true = mark_handle_to_close(Source),
+ true = mark_handle_to_close(Dest),
ok = rabbit_msg_store_gc:gc(Source, Dest),
State1 #msstate { gc_active = {Source, Dest} }
end;
maybe_compact(State) ->
State.
+%% Flip every {File, Pid} handle entry from 'open' to 'close' so that
+%% clients reopen the file on their next read.  NB: read/2 registers
+%% handles with the atom 'open' (never 'opened'), so both the match
+%% pattern and the fun head must use 'open' - matching 'opened' would
+%% silently never select any entry.
+mark_handle_to_close(File) ->
+    lists:foldl(
+      fun ({Key, open}, true) ->
+              try
+                  true = ets:update_element(?FILE_HANDLES_ETS_NAME,
+                                            Key, {2, close})
+              catch error:badarg -> %% client has deleted concurrently, no prob
+                      true
+              end
+      end,
+      true, ets:match_object(?FILE_HANDLES_ETS_NAME, {{File, '_'}, open})).
+
find_files_to_gc(_N, '$end_of_table') ->
undefined;
find_files_to_gc(N, First) ->
@@ -995,8 +1170,8 @@ delete_file_if_empty(File, State = #msstate { current_file = File }) ->
delete_file_if_empty(File, State =
#msstate { dir = Dir, sum_file_size = SumFileSize }) ->
[#file_summary { valid_total_size = ValidData, file_size = FileSize,
- left = Left, right = Right, locked = false }] =
- ets:lookup(?FILE_SUMMARY_ETS_NAME, File),
+ left = Left, right = Right, locked = false }]
+ = ets:lookup(?FILE_SUMMARY_ETS_NAME, File),
case ValidData of
%% we should NEVER find the current file in here hence right
%% should always be a file, not undefined
@@ -1012,6 +1187,7 @@ delete_file_if_empty(File, State =
true = ets:update_element(?FILE_SUMMARY_ETS_NAME, Left,
{#file_summary.right, Right})
end,
+ true = mark_handle_to_close(File),
true = ets:delete(?FILE_SUMMARY_ETS_NAME, File),
State1 = close_handle(File, State),
ok = file:delete(rabbit_msg_store_misc:form_filename(
diff --git a/src/rabbit_msg_store_gc.erl b/src/rabbit_msg_store_gc.erl
index 1866e6297a..d4c572c108 100644
--- a/src/rabbit_msg_store_gc.erl
+++ b/src/rabbit_msg_store_gc.erl
@@ -91,23 +91,33 @@ code_change(_OldVsn, State, _Extra) ->
adjust_meta_and_combine(SourceFile, DestFile, State) ->
    [SourceObj = #file_summary {
+            readers = SourceReaders,
        valid_total_size = SourceValidData, left = DestFile,
        file_size = SourceFileSize, locked = true }] =
        ets:lookup(?FILE_SUMMARY_ETS_NAME, SourceFile),
    [DestObj = #file_summary {
+            readers = DestReaders,
        valid_total_size = DestValidData, right = SourceFile,
        file_size = DestFileSize, locked = true }] =
        ets:lookup(?FILE_SUMMARY_ETS_NAME, DestFile),
-    TotalValidData = DestValidData + SourceValidData,
-    ok = combine_files(SourceObj, DestObj, State),
-    %% don't update dest.right, because it could be changing at the same time
-    true =
-        ets:update_element(?FILE_SUMMARY_ETS_NAME, DestFile,
-                           [{#file_summary.valid_total_size, TotalValidData},
-                            {#file_summary.contiguous_top, TotalValidData},
-                            {#file_summary.file_size, TotalValidData}]),
-    SourceFileSize + DestFileSize - TotalValidData.
+    case SourceReaders =:= 0 andalso DestReaders =:= 0 of
+        true ->
+            TotalValidData = DestValidData + SourceValidData,
+            ok = combine_files(SourceObj, DestObj, State),
+            %% don't update dest.right, because it could be changing
+            %% at the same time
+            true = ets:update_element(
+                     ?FILE_SUMMARY_ETS_NAME, DestFile,
+                     [{#file_summary.valid_total_size, TotalValidData},
+                      {#file_summary.contiguous_top, TotalValidData},
+                      {#file_summary.file_size, TotalValidData}]),
+            SourceFileSize + DestFileSize - TotalValidData;
+        false ->
+            %% readers are still active on the locked files; back off
+            %% briefly and retry until they drain (no debug output in
+            %% this production path)
+            timer:sleep(100),
+            adjust_meta_and_combine(SourceFile, DestFile, State)
+    end.
combine_files(#file_summary { file = Source,
valid_total_size = SourceValid,
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 62a4792cd4..856a8c4647 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -911,11 +911,13 @@ msg_store_sync(MsgIds) ->
throw(timeout)
end.
-msg_store_read(MsgIds) ->
- ok =
- lists:foldl(
- fun (MsgId, ok) -> {ok, MsgId} = rabbit_msg_store:read(MsgId), ok end,
- ok, MsgIds).
+%% Read every MsgId via the client-side read path, asserting that each
+%% payload equals its id, and thread the client state through the fold.
+msg_store_read(MsgIds, MSCState) ->
+    lists:foldl(
+        fun (MsgId, MSCStateM) ->
+            {{ok, MsgId}, MSCStateN} = rabbit_msg_store:read(MsgId, MSCStateM),
+            MSCStateN
+        end,
+        MSCState, MsgIds).
msg_store_write(MsgIds) ->
ok = lists:foldl(
@@ -966,9 +968,10 @@ test_msg_store() ->
%% should hit a different code path
ok = msg_store_sync(MsgIds1stHalf),
%% read them all
- ok = msg_store_read(MsgIds),
+ MSCState = rabbit_msg_store:client_init(),
+ MSCState1 = msg_store_read(MsgIds, MSCState),
%% read them all again - this will hit the cache, not disk
- ok = msg_store_read(MsgIds),
+ MSCState2 = msg_store_read(MsgIds, MSCState1),
%% remove them all
ok = rabbit_msg_store:remove(MsgIds),
%% check first half doesn't exist
@@ -976,11 +979,12 @@ test_msg_store() ->
%% check second half does exist
true = msg_store_contains(true, MsgIds2ndHalf),
%% read the second half again
- ok = msg_store_read(MsgIds2ndHalf),
+ MSCState3 = msg_store_read(MsgIds2ndHalf, MSCState2),
%% release the second half, just for fun (aka code coverage)
ok = rabbit_msg_store:release(MsgIds2ndHalf),
%% read the second half again, just for fun (aka code coverage)
- ok = msg_store_read(MsgIds2ndHalf),
+ MSCState4 = msg_store_read(MsgIds2ndHalf, MSCState3),
+ ok = rabbit_msg_store:client_terminate(MSCState4),
%% stop and restart, preserving every other msg in 2nd half
ok = stop_msg_store(),
ok = start_msg_store(fun ([]) -> finished;
@@ -1003,19 +1007,28 @@ test_msg_store() ->
%% publish the first half again
ok = msg_store_write(MsgIds1stHalf),
%% this should force some sort of sync internally otherwise misread
- ok = msg_store_read(MsgIds1stHalf),
+ ok = rabbit_msg_store:client_terminate(
+ msg_store_read(MsgIds1stHalf, rabbit_msg_store:client_init())),
ok = rabbit_msg_store:remove(MsgIds1stHalf),
%% restart empty
ok = stop_msg_store(),
ok = start_msg_store_empty(), %% now safe to reuse msg_ids
%% push a lot of msgs in...
BigCount = 100000,
- MsgIdsBig = lists:seq(1, BigCount),
+ MsgIdsBig = [msg_id_bin(X) || X <- lists:seq(1, BigCount)],
Payload = << 0:65536 >>,
ok = lists:foldl(
fun (MsgId, ok) ->
- rabbit_msg_store:write(msg_id_bin(MsgId), Payload)
+ rabbit_msg_store:write(MsgId, Payload)
end, ok, MsgIdsBig),
+ %% now read them to ensure we hit the fast client-side reading
+ ok = rabbit_msg_store:client_terminate(
+ lists:foldl(
+ fun (MsgId, MSCStateM) ->
+ {{ok, Payload}, MSCStateN} =
+ rabbit_msg_store:read(MsgId, MSCStateM),
+ MSCStateN
+ end, rabbit_msg_store:client_init(), MsgIdsBig)),
%% .., then 3s by 1...
ok = lists:foldl(
fun (MsgId, ok) ->
@@ -1034,7 +1047,7 @@ test_msg_store() ->
rabbit_msg_store:remove([msg_id_bin(MsgId)])
end, ok, lists:seq(BigCount-2, 1, -3)),
%% ensure empty
- false = msg_store_contains(false, [msg_id_bin(M) || M <- MsgIdsBig]),
+ false = msg_store_contains(false, MsgIdsBig),
%% restart empty
ok = stop_msg_store(),
ok = start_msg_store_empty(),
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index f2d45700f0..7c1ef6875d 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -61,7 +61,8 @@
avg_ingress_rate,
rate_timestamp,
len,
- on_sync
+ on_sync,
+ msg_store_read_state
}).
-include("rabbit.hrl").
@@ -124,7 +125,8 @@
avg_ingress_rate :: float(),
rate_timestamp :: {integer(), integer(), integer()},
len :: non_neg_integer(),
- on_sync :: {[ack()], [msg_id()], [{pid(), any()}]}
+ on_sync :: {[ack()], [msg_id()], [{pid(), any()}]},
+ msg_store_read_state :: any()
}).
-spec(init/1 :: (queue_name()) -> vqstate()).
@@ -198,11 +200,14 @@ init(QueueName) ->
avg_ingress_rate = 0,
rate_timestamp = Now,
len = GammaCount,
- on_sync = {[], [], []}
+ on_sync = {[], [], []},
+ msg_store_read_state = rabbit_msg_store:client_init()
},
maybe_gammas_to_betas(State).
-terminate(State = #vqstate { index_state = IndexState }) ->
+terminate(State = #vqstate { index_state = IndexState,
+ msg_store_read_state = MSCState }) ->
+ rabbit_msg_store:client_terminate(MSCState),
State #vqstate { index_state = rabbit_queue_index:terminate(IndexState) }.
publish(Msg, State) ->
@@ -618,7 +623,8 @@ remove_queue_entries(Q, IndexState) ->
fetch_from_q3_or_gamma(State = #vqstate {
q1 = Q1, q2 = Q2, gamma = #gamma { count = GammaCount },
- q3 = Q3, q4 = Q4, ram_msg_count = RamMsgCount }) ->
+ q3 = Q3, q4 = Q4, ram_msg_count = RamMsgCount,
+ msg_store_read_state = MSCState }) ->
case queue:out(Q3) of
{empty, _Q3} ->
0 = GammaCount, %% ASSERTION
@@ -629,15 +635,16 @@ fetch_from_q3_or_gamma(State = #vqstate {
#beta { msg_id = MsgId, seq_id = SeqId, is_delivered = IsDelivered,
is_persistent = IsPersistent, index_on_disk = IndexOnDisk }},
Q3a} ->
- {ok, Msg = #basic_message { is_persistent = IsPersistent,
- guid = MsgId }} =
- rabbit_msg_store:read(MsgId),
+ {{ok, Msg = #basic_message { is_persistent = IsPersistent,
+ guid = MsgId }}, MSCState1} =
+ rabbit_msg_store:read(MsgId, MSCState),
Q4a = queue:in(
#alpha { msg = Msg, seq_id = SeqId,
is_delivered = IsDelivered, msg_on_disk = true,
index_on_disk = IndexOnDisk }, Q4),
State1 = State #vqstate { q3 = Q3a, q4 = Q4a,
- ram_msg_count = RamMsgCount + 1 },
+ ram_msg_count = RamMsgCount + 1,
+ msg_store_read_state = MSCState1 },
State2 =
case {queue:is_empty(Q3a), 0 == GammaCount} of
{true, true} ->