diff options
| -rw-r--r-- | include/rabbit_msg_store.hrl | 3 | ||||
| -rw-r--r-- | src/rabbit_msg_store.erl | 262 | ||||
| -rw-r--r-- | src/rabbit_msg_store_gc.erl | 28 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 39 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 25 |
5 files changed, 282 insertions, 75 deletions
diff --git a/include/rabbit_msg_store.hrl b/include/rabbit_msg_store.hrl index 0e9a0408cb..a094454a78 100644 --- a/include/rabbit_msg_store.hrl +++ b/include/rabbit_msg_store.hrl @@ -34,7 +34,7 @@ -record(file_summary, {file, valid_total_size, contiguous_top, left, right, file_size, - locked}). + locked, readers}). -define(BINARY_MODE, [raw, binary]). -define(READ_MODE, [read]). @@ -52,3 +52,4 @@ -define(FILE_SUMMARY_ETS_NAME, rabbit_msg_store_file_summary). -define(CACHE_ETS_NAME, rabbit_msg_store_cache). +-define(FILE_HANDLES_ETS_NAME, rabbit_msg_store_file_handles). diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 8acd9149aa..3a6450596f 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -33,8 +33,8 @@ -behaviour(gen_server2). --export([start_link/3, write/2, read/1, contains/1, remove/1, release/1, - sync/2]). +-export([start_link/3, write/2, read/2, contains/1, remove/1, release/1, + sync/2, client_init/0, client_terminate/1]). -export([sync/0, gc_done/3]). %% internal @@ -49,45 +49,62 @@ %%---------------------------------------------------------------------------- +-record(msstate, + { dir, %% store directory + index_module, %% the module for index ops + index_state, %% where are messages? + current_file, %% current file name as number + current_file_handle, %% current file handle + %% since the last fsync? + file_handle_cache, %% file handle cache + on_sync, %% pending sync requests + sync_timer_ref, %% TRef for our interval timer + sum_valid_data, %% sum of valid data in all files + sum_file_size, %% sum of file sizes + pending_gc_completion, %% things to do once GC completes + gc_active %% is the GC currently working? + }). + +-record(client_msstate, + { file_handle_cache, + index_state, + index_module, + dir + }). + +%%---------------------------------------------------------------------------- + -ifdef(use_specs). -type(msg_id() :: binary()). -type(msg() :: any()). -type(file_path() :: any()). -type(file_num() :: non_neg_integer()). +-type(client_msstate() :: #client_msstate { file_handle_cache :: dict(), + index_state :: any(), + index_module :: atom(), + dir :: file_path() }). -spec(start_link/3 :: (file_path(), (fun ((A) -> 'finished' | {msg_id(), non_neg_integer(), A})), A) -> {'ok', pid()} | 'ignore' | {'error', any()}). -spec(write/2 :: (msg_id(), msg()) -> 'ok'). --spec(read/1 :: (msg_id()) -> {'ok', msg()} | 'not_found'). +%% -spec(read/1 :: (msg_id()) -> {'ok', msg()} | 'not_found'). +-spec(read/2 :: (msg_id(), client_msstate()) -> + {{'ok', msg()} | 'not_found', client_msstate()}). -spec(contains/1 :: (msg_id()) -> boolean()). -spec(remove/1 :: ([msg_id()]) -> 'ok'). -spec(release/1 :: ([msg_id()]) -> 'ok'). -spec(sync/2 :: ([msg_id()], fun (() -> any())) -> 'ok'). -spec(gc_done/3 :: (non_neg_integer(), file_num(), file_num()) -> 'ok'). +-spec(client_init/0 :: () -> client_msstate()). +-spec(client_terminate/1 :: (client_msstate()) -> 'ok'). -endif. %%---------------------------------------------------------------------------- --record(msstate, - {dir, %% store directory - index_module, %% the module for index ops - index_state, %% where are messages? - current_file, %% current file name as number - current_file_handle, %% current file handle - %% since the last fsync? - file_handle_cache, %% file handle cache - on_sync, %% pending sync requests - sync_timer_ref, %% TRef for our interval timer - sum_valid_data, %% sum of valid data in all files - sum_file_size, %% sum of file sizes - pending_gc_completion, %% things to do once GC completes - gc_active %% is the GC currently working? - }). - -include("rabbit_msg_store.hrl"). %% We run GC whenever (garbage / sum_file_size) > ?GARBAGE_FRACTION @@ -221,16 +238,120 @@ start_link(Dir, MsgRefDeltaGen, MsgRefDeltaGenInit) -> [Dir, MsgRefDeltaGen, MsgRefDeltaGenInit], [{timeout, infinity}]). -write(MsgId, Msg) -> gen_server2:cast(?SERVER, {write, MsgId, Msg}). -read(MsgId) -> gen_server2:call(?SERVER, {read, MsgId}, infinity). -contains(MsgId) -> gen_server2:call(?SERVER, {contains, MsgId}, infinity). -remove(MsgIds) -> gen_server2:cast(?SERVER, {remove, MsgIds}). -release(MsgIds) -> gen_server2:cast(?SERVER, {release, MsgIds}). -sync(MsgIds, K) -> gen_server2:cast(?SERVER, {sync, MsgIds, K}). -sync() -> gen_server2:pcast(?SERVER, 9, sync). %% internal +write(MsgId, Msg) -> gen_server2:cast(?SERVER, {write, MsgId, Msg}). + +read(MsgId, CState) -> + case index_lookup(MsgId, CState) of + not_found -> + {gen_server2:call(?SERVER, {read, MsgId}, infinity), CState}; + #msg_location { ref_count = RefCount, + file = File, + offset = Offset, + total_size = TotalSize } -> + case fetch_and_increment_cache(MsgId) of + not_found -> + [#file_summary { locked = Locked, right = Right }] = + ets:lookup(?FILE_SUMMARY_ETS_NAME, File), + case Right =:= undefined orelse Locked =:= true of + true -> + {gen_server2:call(?SERVER, {read, MsgId}, infinity), + CState}; + false -> + ets:update_counter(?FILE_SUMMARY_ETS_NAME, File, + {#file_summary.readers, 1}), + %% need to check again to see if we've + %% been locked in the meantime + [#file_summary { locked = Locked2 }] = + ets:lookup(?FILE_SUMMARY_ETS_NAME, File), + case Locked2 of + true -> + {gen_server2:call(?SERVER, {read, MsgId}, + infinity), CState}; + false -> + %% ok, we're definitely safe to + %% continue - a GC can't start up + %% now + Self = self(), + CState1 = + case ets:lookup(?FILE_HANDLES_ETS_NAME, + {File, self()}) of + [{Key, close}] -> + CState2 = + close_handle(File, CState), + true = ets:insert( + ?FILE_HANDLES_ETS_NAME, + {Key, open}), + CState2; + [{_Key, open}] -> + CState; + [] -> + true = ets:insert_new( + ?FILE_HANDLES_ETS_NAME, + {{File, Self}, open}), + CState + end, + {Hdl, CState3} = + get_read_handle(File, CState1), + {ok, Offset} = + file_handle_cache:position(Hdl, Offset), + {ok, {MsgId, Msg}} = + case rabbit_msg_file:read(Hdl, TotalSize) of + {ok, {MsgId, _}} = Obj -> Obj; + Rest -> + throw({error, + {misread, + [{old_cstate, CState1}, + {file_num, File}, + {offset, Offset}, + {read, Rest}, + {proc_dict, get()} + ]}}) + end, + ets:update_counter( + ?FILE_SUMMARY_ETS_NAME, File, + {#file_summary.readers, -1}), + ok = case RefCount > 1 of + true -> + insert_into_cache(MsgId, Msg); + false -> + %% it's not in the + %% cache and we only + %% have one reference + %% to the message. So + %% don't bother + %% putting it in the + %% cache. + ok + end, + {{ok, Msg}, CState3} + end + end; + Msg -> + {{ok, Msg}, CState} + end + end. + +contains(MsgId) -> gen_server2:call(?SERVER, {contains, MsgId}, infinity). +remove(MsgIds) -> gen_server2:cast(?SERVER, {remove, MsgIds}). +release(MsgIds) -> gen_server2:cast(?SERVER, {release, MsgIds}). +sync(MsgIds, K) -> gen_server2:cast(?SERVER, {sync, MsgIds, K}). +sync() -> gen_server2:pcast(?SERVER, 9, sync). %% internal + gc_done(Reclaimed, Source, Destination) -> gen_server2:pcast(?SERVER, 9, {gc_done, Reclaimed, Source, Destination}). +client_init() -> + {IState, IModule, Dir} = + gen_server2:call(?SERVER, new_client_state, infinity), + #client_msstate { file_handle_cache = dict:new(), + index_state = IState, + index_module = IModule, + dir = Dir }. + +client_terminate(CState) -> + close_all_handles(CState), + ok. + %%---------------------------------------------------------------------------- %% gen_server callbacks %%---------------------------------------------------------------------------- @@ -250,6 +371,8 @@ init([Dir, MsgRefDeltaGen, MsgRefDeltaGenInit]) -> [ordered_set, public, named_table, {keypos, #file_summary.file}]), ?CACHE_ETS_NAME = ets:new(?CACHE_ETS_NAME, [set, public, named_table]), + ?FILE_HANDLES_ETS_NAME = ets:new(?FILE_HANDLES_ETS_NAME, + [ordered_set, public, named_table]), State = #msstate { dir = Dir, index_module = IndexModule, @@ -295,7 +418,12 @@ handle_call({read, MsgId}, From, State) -> handle_call({contains, MsgId}, From, State) -> State1 = contains_message(MsgId, From, State), - noreply(State1). + noreply(State1); + +handle_call(new_client_state, _From, + State = #msstate { index_state = IndexState, dir = Dir, + index_module = IndexModule }) -> + reply({IndexState, IndexModule, Dir}, State). handle_cast({write, MsgId, Msg}, State = #msstate { current_file_handle = CurHdl, @@ -414,6 +542,8 @@ terminate(_Reason, State = #msstate { index_state = IndexState, end, State3 = close_all_handles(State1), ets:delete(?FILE_SUMMARY_ETS_NAME), + ets:delete(?CACHE_ETS_NAME), + ets:delete(?FILE_HANDLES_ETS_NAME), IndexModule:terminate(IndexState), State3 #msstate { index_state = undefined, current_file_handle = undefined }. @@ -599,33 +729,56 @@ run_pending({contains, MsgId, From}, State) -> run_pending({remove, MsgId}, State) -> remove_message(MsgId, State). +close_handle(Key, CState = #client_msstate { file_handle_cache = FHC }) -> + CState #client_msstate { file_handle_cache = close_handle(Key, FHC) }; + close_handle(Key, State = #msstate { file_handle_cache = FHC }) -> + State #msstate { file_handle_cache = close_handle(Key, FHC) }; + +close_handle(Key, FHC) -> case dict:find(Key, FHC) of {ok, Hdl} -> ok = file_handle_cache:close(Hdl), - State #msstate { file_handle_cache = dict:erase(Key, FHC) }; - error -> State + dict:erase(Key, FHC); + error -> FHC end. +close_all_handles(CState = #client_msstate { file_handle_cache = FHC }) -> + Self = self(), + ok = dict:fold(fun (Key, Hdl, ok) -> + true = + ets:delete(?FILE_HANDLES_ETS_NAME, {Key, Self}), + file_handle_cache:close(Hdl) + end, ok, FHC), + CState #client_msstate { file_handle_cache = dict:new() }; + close_all_handles(State = #msstate { file_handle_cache = FHC }) -> ok = dict:fold(fun (_Key, Hdl, ok) -> file_handle_cache:close(Hdl) end, ok, FHC), State #msstate { file_handle_cache = dict:new() }. -get_read_handle(FileNum, State = #msstate { file_handle_cache = FHC }) -> +get_read_handle(FileNum, CState = #client_msstate { file_handle_cache = FHC, + dir = Dir }) -> + {Hdl, FHC2} = get_read_handle(FileNum, FHC, Dir), + {Hdl, CState #client_msstate { file_handle_cache = FHC2 }}; + +get_read_handle(FileNum, State = #msstate { file_handle_cache = FHC, + dir = Dir }) -> + {Hdl, FHC2} = get_read_handle(FileNum, FHC, Dir), + {Hdl, State #msstate { file_handle_cache = FHC2 }}. + +get_read_handle(FileNum, FHC, Dir) -> case dict:find(FileNum, FHC) of - {ok, Hdl} -> {Hdl, State}; - error -> new_handle(FileNum, - rabbit_msg_store_misc:filenum_to_name(FileNum), - [read | ?BINARY_MODE], State) + {ok, Hdl} -> + {Hdl, FHC}; + error -> + {ok, Hdl} = rabbit_msg_store_misc:open_file( + Dir, rabbit_msg_store_misc:filenum_to_name(FileNum), + [read | ?BINARY_MODE]), + {Hdl, dict:store(FileNum, Hdl, FHC) } end. -new_handle(Key, FileName, Mode, State = #msstate { file_handle_cache = FHC, - dir = Dir }) -> - {ok, Hdl} = rabbit_msg_store_misc:open_file(Dir, FileName, Mode), - {Hdl, State #msstate { file_handle_cache = dict:store(Key, Hdl, FHC) }}. - %%---------------------------------------------------------------------------- %% message cache helper functions %%---------------------------------------------------------------------------- @@ -676,6 +829,9 @@ insert_into_cache(MsgId, Msg) -> %% index %%---------------------------------------------------------------------------- +index_lookup(Key, #client_msstate { index_module = Index, index_state = State }) -> + Index:lookup(Key, State); + index_lookup(Key, #msstate { index_module = Index, index_state = State }) -> Index:lookup(Key, State). @@ -901,7 +1057,8 @@ build_index(Left, [File|Files], ets:insert_new(?FILE_SUMMARY_ETS_NAME, #file_summary { file = File, valid_total_size = ValidTotalSize, contiguous_top = ContiguousTop, locked = false, - left = Left, right = Right, file_size = FileSize1 }), + left = Left, right = Right, file_size = FileSize1, + readers = 0 }), build_index(File, Files, State #msstate { sum_valid_data = SumValid + ValidTotalSize, sum_file_size = SumFileSize + FileSize1 }). @@ -925,7 +1082,7 @@ maybe_roll_to_new_file(Offset, ?FILE_SUMMARY_ETS_NAME, #file_summary { file = NextFile, valid_total_size = 0, contiguous_top = 0, left = CurFile, right = undefined, file_size = 0, - locked = false }), + locked = false, readers = 0 }), true = ets:update_element(?FILE_SUMMARY_ETS_NAME, CurFile, {#file_summary.right, NextFile}), State1 #msstate { current_file_handle = NextHdl, @@ -948,12 +1105,30 @@ maybe_compact(State = #msstate { sum_valid_data = SumValid, {#file_summary.locked, true}), true = ets:update_element(?FILE_SUMMARY_ETS_NAME, Dest, {#file_summary.locked, true}), + %% now that they're locked, we know no queue will touch + %% them (not even add to the ets table for these files), + %% so now ensure that we ask the queues to close handles + %% to these files + true = mark_handle_to_close(Source), + true = mark_handle_to_close(Dest), ok = rabbit_msg_store_gc:gc(Source, Dest), State1 #msstate { gc_active = {Source, Dest} } end; maybe_compact(State) -> State. +mark_handle_to_close(File) -> + lists:foldl( + fun ({Key, opened}, true) -> + try + true = ets:update_element(?FILE_HANDLES_ETS_NAME, + Key, {2, close}) + catch error:badarg -> %% client has deleted concurrently, no prob + true + end + end, + true, ets:match_object(?FILE_HANDLES_ETS_NAME, {{File, '_'}, opened})). + find_files_to_gc(_N, '$end_of_table') -> undefined; find_files_to_gc(N, First) -> @@ -995,8 +1170,8 @@ delete_file_if_empty(File, State = #msstate { current_file = File }) -> delete_file_if_empty(File, State = #msstate { dir = Dir, sum_file_size = SumFileSize }) -> [#file_summary { valid_total_size = ValidData, file_size = FileSize, - left = Left, right = Right, locked = false }] = - ets:lookup(?FILE_SUMMARY_ETS_NAME, File), + left = Left, right = Right, locked = false }] + = ets:lookup(?FILE_SUMMARY_ETS_NAME, File), case ValidData of %% we should NEVER find the current file in here hence right %% should always be a file, not undefined @@ -1012,6 +1187,7 @@ delete_file_if_empty(File, State = true = ets:update_element(?FILE_SUMMARY_ETS_NAME, Left, {#file_summary.right, Right}) end, + true = mark_handle_to_close(File), true = ets:delete(?FILE_SUMMARY_ETS_NAME, File), State1 = close_handle(File, State), ok = file:delete(rabbit_msg_store_misc:form_filename( diff --git a/src/rabbit_msg_store_gc.erl b/src/rabbit_msg_store_gc.erl index 1866e6297a..d4c572c108 100644 --- a/src/rabbit_msg_store_gc.erl +++ b/src/rabbit_msg_store_gc.erl @@ -91,23 +91,33 @@ code_change(_OldVsn, State, _Extra) -> adjust_meta_and_combine(SourceFile, DestFile, State) -> [SourceObj = #file_summary { + readers = SourceReaders, valid_total_size = SourceValidData, left = DestFile, file_size = SourceFileSize, locked = true }] = ets:lookup(?FILE_SUMMARY_ETS_NAME, SourceFile), [DestObj = #file_summary { + readers = DestReaders, valid_total_size = DestValidData, right = SourceFile, file_size = DestFileSize, locked = true }] = ets:lookup(?FILE_SUMMARY_ETS_NAME, DestFile), - TotalValidData = DestValidData + SourceValidData, - ok = combine_files(SourceObj, DestObj, State), - %% don't update dest.right, because it could be changing at the same time - true = - ets:update_element(?FILE_SUMMARY_ETS_NAME, DestFile, - [{#file_summary.valid_total_size, TotalValidData}, - {#file_summary.contiguous_top, TotalValidData}, - {#file_summary.file_size, TotalValidData}]), - SourceFileSize + DestFileSize - TotalValidData. + case SourceReaders =:= 0 andalso DestReaders =:= 0 of + true -> + TotalValidData = DestValidData + SourceValidData, + ok = combine_files(SourceObj, DestObj, State), + %% don't update dest.right, because it could be changing + %% at the same time + true = ets:update_element( + ?FILE_SUMMARY_ETS_NAME, DestFile, + [{#file_summary.valid_total_size, TotalValidData}, + {#file_summary.contiguous_top, TotalValidData}, + {#file_summary.file_size, TotalValidData}]), + SourceFileSize + DestFileSize - TotalValidData; + false -> + io:format("sleeping!~n"), + timer:sleep(100), + adjust_meta_and_combine(SourceFile, DestFile, State) + end. combine_files(#file_summary { file = Source, valid_total_size = SourceValid, diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 62a4792cd4..856a8c4647 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -911,11 +911,13 @@ msg_store_sync(MsgIds) -> throw(timeout) end. -msg_store_read(MsgIds) -> - ok = - lists:foldl( - fun (MsgId, ok) -> {ok, MsgId} = rabbit_msg_store:read(MsgId), ok end, - ok, MsgIds). +msg_store_read(MsgIds, MSCState) -> + lists:foldl( + fun (MsgId, MSCStateM) -> + {{ok, MsgId}, MSCStateN} = rabbit_msg_store:read(MsgId, MSCStateM), + MSCStateN + end, + MSCState, MsgIds). msg_store_write(MsgIds) -> ok = lists:foldl( @@ -966,9 +968,10 @@ test_msg_store() -> %% should hit a different code path ok = msg_store_sync(MsgIds1stHalf), %% read them all - ok = msg_store_read(MsgIds), + MSCState = rabbit_msg_store:client_init(), + MSCState1 = msg_store_read(MsgIds, MSCState), %% read them all again - this will hit the cache, not disk - ok = msg_store_read(MsgIds), + MSCState2 = msg_store_read(MsgIds, MSCState1), %% remove them all ok = rabbit_msg_store:remove(MsgIds), %% check first half doesn't exist @@ -976,11 +979,12 @@ test_msg_store() -> %% check second half does exist true = msg_store_contains(true, MsgIds2ndHalf), %% read the second half again - ok = msg_store_read(MsgIds2ndHalf), + MSCState3 = msg_store_read(MsgIds2ndHalf, MSCState2), %% release the second half, just for fun (aka code coverage) ok = rabbit_msg_store:release(MsgIds2ndHalf), %% read the second half again, just for fun (aka code coverage) - ok = msg_store_read(MsgIds2ndHalf), + MSCState4 = msg_store_read(MsgIds2ndHalf, MSCState3), + ok = rabbit_msg_store:client_terminate(MSCState4), %% stop and restart, preserving every other msg in 2nd half ok = stop_msg_store(), ok = start_msg_store(fun ([]) -> finished; @@ -1003,19 +1007,28 @@ test_msg_store() -> %% publish the first half again ok = msg_store_write(MsgIds1stHalf), %% this should force some sort of sync internally otherwise misread - ok = msg_store_read(MsgIds1stHalf), + ok = rabbit_msg_store:client_terminate( + msg_store_read(MsgIds1stHalf, rabbit_msg_store:client_init())), ok = rabbit_msg_store:remove(MsgIds1stHalf), %% restart empty ok = stop_msg_store(), ok = start_msg_store_empty(), %% now safe to reuse msg_ids %% push a lot of msgs in... BigCount = 100000, - MsgIdsBig = lists:seq(1, BigCount), + MsgIdsBig = [msg_id_bin(X) || X <- lists:seq(1, BigCount)], Payload = << 0:65536 >>, ok = lists:foldl( fun (MsgId, ok) -> - rabbit_msg_store:write(msg_id_bin(MsgId), Payload) + rabbit_msg_store:write(MsgId, Payload) end, ok, MsgIdsBig), + %% now read them to ensure we hit the fast client-side reading + ok = rabbit_msg_store:client_terminate( + lists:foldl( + fun (MsgId, MSCStateM) -> + {{ok, Payload}, MSCStateN} = + rabbit_msg_store:read(MsgId, MSCStateM), + MSCStateN + end, rabbit_msg_store:client_init(), MsgIdsBig)), %% .., then 3s by 1... ok = lists:foldl( fun (MsgId, ok) -> @@ -1034,7 +1047,7 @@ test_msg_store() -> rabbit_msg_store:remove([msg_id_bin(MsgId)]) end, ok, lists:seq(BigCount-2, 1, -3)), %% ensure empty - false = msg_store_contains(false, [msg_id_bin(M) || M <- MsgIdsBig]), + false = msg_store_contains(false, MsgIdsBig), %% restart empty ok = stop_msg_store(), ok = start_msg_store_empty(), diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index f2d45700f0..7c1ef6875d 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -61,7 +61,8 @@ avg_ingress_rate, rate_timestamp, len, - on_sync + on_sync, + msg_store_read_state }). -include("rabbit.hrl"). @@ -124,7 +125,8 @@ avg_ingress_rate :: float(), rate_timestamp :: {integer(), integer(), integer()}, len :: non_neg_integer(), - on_sync :: {[ack()], [msg_id()], [{pid(), any()}]} + on_sync :: {[ack()], [msg_id()], [{pid(), any()}]}, + msg_store_read_state :: any() }). -spec(init/1 :: (queue_name()) -> vqstate()). @@ -198,11 +200,14 @@ init(QueueName) -> avg_ingress_rate = 0, rate_timestamp = Now, len = GammaCount, - on_sync = {[], [], []} + on_sync = {[], [], []}, + msg_store_read_state = rabbit_msg_store:client_init() }, maybe_gammas_to_betas(State). -terminate(State = #vqstate { index_state = IndexState }) -> +terminate(State = #vqstate { index_state = IndexState, + msg_store_read_state = MSCState }) -> + rabbit_msg_store:client_terminate(MSCState), State #vqstate { index_state = rabbit_queue_index:terminate(IndexState) }. publish(Msg, State) -> @@ -618,7 +623,8 @@ remove_queue_entries(Q, IndexState) -> fetch_from_q3_or_gamma(State = #vqstate { q1 = Q1, q2 = Q2, gamma = #gamma { count = GammaCount }, - q3 = Q3, q4 = Q4, ram_msg_count = RamMsgCount }) -> + q3 = Q3, q4 = Q4, ram_msg_count = RamMsgCount, + msg_store_read_state = MSCState }) -> case queue:out(Q3) of {empty, _Q3} -> 0 = GammaCount, %% ASSERTION @@ -629,15 +635,16 @@ fetch_from_q3_or_gamma(State = #vqstate { #beta { msg_id = MsgId, seq_id = SeqId, is_delivered = IsDelivered, is_persistent = IsPersistent, index_on_disk = IndexOnDisk }}, Q3a} -> - {ok, Msg = #basic_message { is_persistent = IsPersistent, - guid = MsgId }} = - rabbit_msg_store:read(MsgId), + {{ok, Msg = #basic_message { is_persistent = IsPersistent, + guid = MsgId }}, MSCState1} = + rabbit_msg_store:read(MsgId, MSCState), Q4a = queue:in( #alpha { msg = Msg, seq_id = SeqId, is_delivered = IsDelivered, msg_on_disk = true, index_on_disk = IndexOnDisk }, Q4), State1 = State #vqstate { q3 = Q3a, q4 = Q4a, - ram_msg_count = RamMsgCount + 1 }, + ram_msg_count = RamMsgCount + 1, + msg_store_read_state = MSCState1 }, State2 = case {queue:is_empty(Q3a), 0 == GammaCount} of {true, true} -> |
