diff options
| author | Matthew Sackman <matthew@lshift.net> | 2010-01-08 16:58:54 +0000 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2010-01-08 16:58:54 +0000 |
| commit | 003bcc5a106b5de17318c8a475ce0c9837b8923e (patch) | |
| tree | 4b756ce9ab8bee750d7bcef8db0daa3d3a9830f6 /src | |
| parent | 3274299179812bfc5be48886ecee4d29d02d22c4 (diff) | |
| download | rabbitmq-server-git-003bcc5a106b5de17318c8a475ce0c9837b8923e.tar.gz | |
Msg_store now supports concurrent reads when it is safe, directly from the queue. This means that even if the msg_store process is flooded with writes or acks, it won't (necessarily) block queues. This is extremely useful and has substantial benefit when memory has been exhausted and the queue is operating off γ only (effectively, no message content held in ram).
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_msg_store.erl | 262 | ||||
| -rw-r--r-- | src/rabbit_msg_store_gc.erl | 28 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 39 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 25 |
4 files changed, 280 insertions, 74 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 8acd9149aa..3a6450596f 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -33,8 +33,8 @@ -behaviour(gen_server2). --export([start_link/3, write/2, read/1, contains/1, remove/1, release/1, - sync/2]). +-export([start_link/3, write/2, read/2, contains/1, remove/1, release/1, + sync/2, client_init/0, client_terminate/1]). -export([sync/0, gc_done/3]). %% internal @@ -49,45 +49,62 @@ %%---------------------------------------------------------------------------- +-record(msstate, + { dir, %% store directory + index_module, %% the module for index ops + index_state, %% where are messages? + current_file, %% current file name as number + current_file_handle, %% current file handle + %% since the last fsync? + file_handle_cache, %% file handle cache + on_sync, %% pending sync requests + sync_timer_ref, %% TRef for our interval timer + sum_valid_data, %% sum of valid data in all files + sum_file_size, %% sum of file sizes + pending_gc_completion, %% things to do once GC completes + gc_active %% is the GC currently working? + }). + +-record(client_msstate, + { file_handle_cache, + index_state, + index_module, + dir + }). + +%%---------------------------------------------------------------------------- + -ifdef(use_specs). -type(msg_id() :: binary()). -type(msg() :: any()). -type(file_path() :: any()). -type(file_num() :: non_neg_integer()). +-type(client_msstate() :: #client_msstate { file_handle_cache :: dict(), + index_state :: any(), + index_module :: atom(), + dir :: file_path() }). -spec(start_link/3 :: (file_path(), (fun ((A) -> 'finished' | {msg_id(), non_neg_integer(), A})), A) -> {'ok', pid()} | 'ignore' | {'error', any()}). -spec(write/2 :: (msg_id(), msg()) -> 'ok'). --spec(read/1 :: (msg_id()) -> {'ok', msg()} | 'not_found'). +%% -spec(read/1 :: (msg_id()) -> {'ok', msg()} | 'not_found'). +-spec(read/2 :: (msg_id(), client_msstate()) -> + {{'ok', msg()} | 'not_found', client_msstate()}). -spec(contains/1 :: (msg_id()) -> boolean()). -spec(remove/1 :: ([msg_id()]) -> 'ok'). -spec(release/1 :: ([msg_id()]) -> 'ok'). -spec(sync/2 :: ([msg_id()], fun (() -> any())) -> 'ok'). -spec(gc_done/3 :: (non_neg_integer(), file_num(), file_num()) -> 'ok'). +-spec(client_init/0 :: () -> client_msstate()). +-spec(client_terminate/1 :: (client_msstate()) -> 'ok'). -endif. %%---------------------------------------------------------------------------- --record(msstate, - {dir, %% store directory - index_module, %% the module for index ops - index_state, %% where are messages? - current_file, %% current file name as number - current_file_handle, %% current file handle - %% since the last fsync? - file_handle_cache, %% file handle cache - on_sync, %% pending sync requests - sync_timer_ref, %% TRef for our interval timer - sum_valid_data, %% sum of valid data in all files - sum_file_size, %% sum of file sizes - pending_gc_completion, %% things to do once GC completes - gc_active %% is the GC currently working? - }). - -include("rabbit_msg_store.hrl"). %% We run GC whenever (garbage / sum_file_size) > ?GARBAGE_FRACTION @@ -221,16 +238,120 @@ start_link(Dir, MsgRefDeltaGen, MsgRefDeltaGenInit) -> [Dir, MsgRefDeltaGen, MsgRefDeltaGenInit], [{timeout, infinity}]). -write(MsgId, Msg) -> gen_server2:cast(?SERVER, {write, MsgId, Msg}). -read(MsgId) -> gen_server2:call(?SERVER, {read, MsgId}, infinity). -contains(MsgId) -> gen_server2:call(?SERVER, {contains, MsgId}, infinity). -remove(MsgIds) -> gen_server2:cast(?SERVER, {remove, MsgIds}). -release(MsgIds) -> gen_server2:cast(?SERVER, {release, MsgIds}). -sync(MsgIds, K) -> gen_server2:cast(?SERVER, {sync, MsgIds, K}). -sync() -> gen_server2:pcast(?SERVER, 9, sync). %% internal +write(MsgId, Msg) -> gen_server2:cast(?SERVER, {write, MsgId, Msg}). + +read(MsgId, CState) -> + case index_lookup(MsgId, CState) of + not_found -> + {gen_server2:call(?SERVER, {read, MsgId}, infinity), CState}; + #msg_location { ref_count = RefCount, + file = File, + offset = Offset, + total_size = TotalSize } -> + case fetch_and_increment_cache(MsgId) of + not_found -> + [#file_summary { locked = Locked, right = Right }] = + ets:lookup(?FILE_SUMMARY_ETS_NAME, File), + case Right =:= undefined orelse Locked =:= true of + true -> + {gen_server2:call(?SERVER, {read, MsgId}, infinity), + CState}; + false -> + ets:update_counter(?FILE_SUMMARY_ETS_NAME, File, + {#file_summary.readers, 1}), + %% need to check again to see if we've + %% been locked in the meantime + [#file_summary { locked = Locked2 }] = + ets:lookup(?FILE_SUMMARY_ETS_NAME, File), + case Locked2 of + true -> + {gen_server2:call(?SERVER, {read, MsgId}, + infinity), CState}; + false -> + %% ok, we're definitely safe to + %% continue - a GC can't start up + %% now + Self = self(), + CState1 = + case ets:lookup(?FILE_HANDLES_ETS_NAME, + {File, self()}) of + [{Key, close}] -> + CState2 = + close_handle(File, CState), + true = ets:insert( + ?FILE_HANDLES_ETS_NAME, + {Key, open}), + CState2; + [{_Key, open}] -> + CState; + [] -> + true = ets:insert_new( + ?FILE_HANDLES_ETS_NAME, + {{File, Self}, open}), + CState + end, + {Hdl, CState3} = + get_read_handle(File, CState1), + {ok, Offset} = + file_handle_cache:position(Hdl, Offset), + {ok, {MsgId, Msg}} = + case rabbit_msg_file:read(Hdl, TotalSize) of + {ok, {MsgId, _}} = Obj -> Obj; + Rest -> + throw({error, + {misread, + [{old_cstate, CState1}, + {file_num, File}, + {offset, Offset}, + {read, Rest}, + {proc_dict, get()} + ]}}) + end, + ets:update_counter( + ?FILE_SUMMARY_ETS_NAME, File, + {#file_summary.readers, -1}), + ok = case RefCount > 1 of + true -> + insert_into_cache(MsgId, Msg); + false -> + %% it's not in the + %% cache and we only + %% have one reference + %% to the message. So + %% don't bother + %% putting it in the + %% cache. + ok + end, + {{ok, Msg}, CState3} + end + end; + Msg -> + {{ok, Msg}, CState} + end + end. + +contains(MsgId) -> gen_server2:call(?SERVER, {contains, MsgId}, infinity). +remove(MsgIds) -> gen_server2:cast(?SERVER, {remove, MsgIds}). +release(MsgIds) -> gen_server2:cast(?SERVER, {release, MsgIds}). +sync(MsgIds, K) -> gen_server2:cast(?SERVER, {sync, MsgIds, K}). +sync() -> gen_server2:pcast(?SERVER, 9, sync). %% internal + gc_done(Reclaimed, Source, Destination) -> gen_server2:pcast(?SERVER, 9, {gc_done, Reclaimed, Source, Destination}). +client_init() -> + {IState, IModule, Dir} = + gen_server2:call(?SERVER, new_client_state, infinity), + #client_msstate { file_handle_cache = dict:new(), + index_state = IState, + index_module = IModule, + dir = Dir }. + +client_terminate(CState) -> + close_all_handles(CState), + ok. + %%---------------------------------------------------------------------------- %% gen_server callbacks %%---------------------------------------------------------------------------- @@ -250,6 +371,8 @@ init([Dir, MsgRefDeltaGen, MsgRefDeltaGenInit]) -> [ordered_set, public, named_table, {keypos, #file_summary.file}]), ?CACHE_ETS_NAME = ets:new(?CACHE_ETS_NAME, [set, public, named_table]), + ?FILE_HANDLES_ETS_NAME = ets:new(?FILE_HANDLES_ETS_NAME, + [ordered_set, public, named_table]), State = #msstate { dir = Dir, index_module = IndexModule, @@ -295,7 +418,12 @@ handle_call({read, MsgId}, From, State) -> handle_call({contains, MsgId}, From, State) -> State1 = contains_message(MsgId, From, State), - noreply(State1). + noreply(State1); + +handle_call(new_client_state, _From, + State = #msstate { index_state = IndexState, dir = Dir, + index_module = IndexModule }) -> + reply({IndexState, IndexModule, Dir}, State). handle_cast({write, MsgId, Msg}, State = #msstate { current_file_handle = CurHdl, @@ -414,6 +542,8 @@ terminate(_Reason, State = #msstate { index_state = IndexState, end, State3 = close_all_handles(State1), ets:delete(?FILE_SUMMARY_ETS_NAME), + ets:delete(?CACHE_ETS_NAME), + ets:delete(?FILE_HANDLES_ETS_NAME), IndexModule:terminate(IndexState), State3 #msstate { index_state = undefined, current_file_handle = undefined }. @@ -599,33 +729,56 @@ run_pending({contains, MsgId, From}, State) -> run_pending({remove, MsgId}, State) -> remove_message(MsgId, State). +close_handle(Key, CState = #client_msstate { file_handle_cache = FHC }) -> + CState #client_msstate { file_handle_cache = close_handle(Key, FHC) }; + close_handle(Key, State = #msstate { file_handle_cache = FHC }) -> + State #msstate { file_handle_cache = close_handle(Key, FHC) }; + +close_handle(Key, FHC) -> case dict:find(Key, FHC) of {ok, Hdl} -> ok = file_handle_cache:close(Hdl), - State #msstate { file_handle_cache = dict:erase(Key, FHC) }; - error -> State + dict:erase(Key, FHC); + error -> FHC end. +close_all_handles(CState = #client_msstate { file_handle_cache = FHC }) -> + Self = self(), + ok = dict:fold(fun (Key, Hdl, ok) -> + true = + ets:delete(?FILE_HANDLES_ETS_NAME, {Key, Self}), + file_handle_cache:close(Hdl) + end, ok, FHC), + CState #client_msstate { file_handle_cache = dict:new() }; + close_all_handles(State = #msstate { file_handle_cache = FHC }) -> ok = dict:fold(fun (_Key, Hdl, ok) -> file_handle_cache:close(Hdl) end, ok, FHC), State #msstate { file_handle_cache = dict:new() }. -get_read_handle(FileNum, State = #msstate { file_handle_cache = FHC }) -> +get_read_handle(FileNum, CState = #client_msstate { file_handle_cache = FHC, + dir = Dir }) -> + {Hdl, FHC2} = get_read_handle(FileNum, FHC, Dir), + {Hdl, CState #client_msstate { file_handle_cache = FHC2 }}; + +get_read_handle(FileNum, State = #msstate { file_handle_cache = FHC, + dir = Dir }) -> + {Hdl, FHC2} = get_read_handle(FileNum, FHC, Dir), + {Hdl, State #msstate { file_handle_cache = FHC2 }}. + +get_read_handle(FileNum, FHC, Dir) -> case dict:find(FileNum, FHC) of - {ok, Hdl} -> {Hdl, State}; - error -> new_handle(FileNum, - rabbit_msg_store_misc:filenum_to_name(FileNum), - [read | ?BINARY_MODE], State) + {ok, Hdl} -> + {Hdl, FHC}; + error -> + {ok, Hdl} = rabbit_msg_store_misc:open_file( + Dir, rabbit_msg_store_misc:filenum_to_name(FileNum), + [read | ?BINARY_MODE]), + {Hdl, dict:store(FileNum, Hdl, FHC) } end. -new_handle(Key, FileName, Mode, State = #msstate { file_handle_cache = FHC, - dir = Dir }) -> - {ok, Hdl} = rabbit_msg_store_misc:open_file(Dir, FileName, Mode), - {Hdl, State #msstate { file_handle_cache = dict:store(Key, Hdl, FHC) }}. - %%---------------------------------------------------------------------------- %% message cache helper functions %%---------------------------------------------------------------------------- @@ -676,6 +829,9 @@ insert_into_cache(MsgId, Msg) -> %% index %%---------------------------------------------------------------------------- +index_lookup(Key, #client_msstate { index_module = Index, index_state = State }) -> + Index:lookup(Key, State); + index_lookup(Key, #msstate { index_module = Index, index_state = State }) -> Index:lookup(Key, State). @@ -901,7 +1057,8 @@ build_index(Left, [File|Files], ets:insert_new(?FILE_SUMMARY_ETS_NAME, #file_summary { file = File, valid_total_size = ValidTotalSize, contiguous_top = ContiguousTop, locked = false, - left = Left, right = Right, file_size = FileSize1 }), + left = Left, right = Right, file_size = FileSize1, + readers = 0 }), build_index(File, Files, State #msstate { sum_valid_data = SumValid + ValidTotalSize, sum_file_size = SumFileSize + FileSize1 }). @@ -925,7 +1082,7 @@ maybe_roll_to_new_file(Offset, ?FILE_SUMMARY_ETS_NAME, #file_summary { file = NextFile, valid_total_size = 0, contiguous_top = 0, left = CurFile, right = undefined, file_size = 0, - locked = false }), + locked = false, readers = 0 }), true = ets:update_element(?FILE_SUMMARY_ETS_NAME, CurFile, {#file_summary.right, NextFile}), State1 #msstate { current_file_handle = NextHdl, @@ -948,12 +1105,30 @@ maybe_compact(State = #msstate { sum_valid_data = SumValid, {#file_summary.locked, true}), true = ets:update_element(?FILE_SUMMARY_ETS_NAME, Dest, {#file_summary.locked, true}), + %% now that they're locked, we know no queue will touch + %% them (not even add to the ets table for these files), + %% so now ensure that we ask the queues to close handles + %% to these files + true = mark_handle_to_close(Source), + true = mark_handle_to_close(Dest), ok = rabbit_msg_store_gc:gc(Source, Dest), State1 #msstate { gc_active = {Source, Dest} } end; maybe_compact(State) -> State. +mark_handle_to_close(File) -> + lists:foldl( + fun ({Key, opened}, true) -> + try + true = ets:update_element(?FILE_HANDLES_ETS_NAME, + Key, {2, close}) + catch error:badarg -> %% client has deleted concurrently, no prob + true + end + end, + true, ets:match_object(?FILE_HANDLES_ETS_NAME, {{File, '_'}, opened})). + find_files_to_gc(_N, '$end_of_table') -> undefined; find_files_to_gc(N, First) -> @@ -995,8 +1170,8 @@ delete_file_if_empty(File, State = #msstate { current_file = File }) -> delete_file_if_empty(File, State = #msstate { dir = Dir, sum_file_size = SumFileSize }) -> [#file_summary { valid_total_size = ValidData, file_size = FileSize, - left = Left, right = Right, locked = false }] = - ets:lookup(?FILE_SUMMARY_ETS_NAME, File), + left = Left, right = Right, locked = false }] + = ets:lookup(?FILE_SUMMARY_ETS_NAME, File), case ValidData of %% we should NEVER find the current file in here hence right %% should always be a file, not undefined @@ -1012,6 +1187,7 @@ delete_file_if_empty(File, State = true = ets:update_element(?FILE_SUMMARY_ETS_NAME, Left, {#file_summary.right, Right}) end, + true = mark_handle_to_close(File), true = ets:delete(?FILE_SUMMARY_ETS_NAME, File), State1 = close_handle(File, State), ok = file:delete(rabbit_msg_store_misc:form_filename( diff --git a/src/rabbit_msg_store_gc.erl b/src/rabbit_msg_store_gc.erl index 1866e6297a..d4c572c108 100644 --- a/src/rabbit_msg_store_gc.erl +++ b/src/rabbit_msg_store_gc.erl @@ -91,23 +91,33 @@ code_change(_OldVsn, State, _Extra) -> adjust_meta_and_combine(SourceFile, DestFile, State) -> [SourceObj = #file_summary { + readers = SourceReaders, valid_total_size = SourceValidData, left = DestFile, file_size = SourceFileSize, locked = true }] = ets:lookup(?FILE_SUMMARY_ETS_NAME, SourceFile), [DestObj = #file_summary { + readers = DestReaders, valid_total_size = DestValidData, right = SourceFile, file_size = DestFileSize, locked = true }] = ets:lookup(?FILE_SUMMARY_ETS_NAME, DestFile), - TotalValidData = DestValidData + SourceValidData, - ok = combine_files(SourceObj, DestObj, State), - %% don't update dest.right, because it could be changing at the same time - true = - ets:update_element(?FILE_SUMMARY_ETS_NAME, DestFile, - [{#file_summary.valid_total_size, TotalValidData}, - {#file_summary.contiguous_top, TotalValidData}, - {#file_summary.file_size, TotalValidData}]), - SourceFileSize + DestFileSize - TotalValidData. + case SourceReaders =:= 0 andalso DestReaders =:= 0 of + true -> + TotalValidData = DestValidData + SourceValidData, + ok = combine_files(SourceObj, DestObj, State), + %% don't update dest.right, because it could be changing + %% at the same time + true = ets:update_element( + ?FILE_SUMMARY_ETS_NAME, DestFile, + [{#file_summary.valid_total_size, TotalValidData}, + {#file_summary.contiguous_top, TotalValidData}, + {#file_summary.file_size, TotalValidData}]), + SourceFileSize + DestFileSize - TotalValidData; + false -> + io:format("sleeping!~n"), + timer:sleep(100), + adjust_meta_and_combine(SourceFile, DestFile, State) + end. combine_files(#file_summary { file = Source, valid_total_size = SourceValid, diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 62a4792cd4..856a8c4647 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -911,11 +911,13 @@ msg_store_sync(MsgIds) -> throw(timeout) end. -msg_store_read(MsgIds) -> - ok = - lists:foldl( - fun (MsgId, ok) -> {ok, MsgId} = rabbit_msg_store:read(MsgId), ok end, - ok, MsgIds). +msg_store_read(MsgIds, MSCState) -> + lists:foldl( + fun (MsgId, MSCStateM) -> + {{ok, MsgId}, MSCStateN} = rabbit_msg_store:read(MsgId, MSCStateM), + MSCStateN + end, + MSCState, MsgIds). msg_store_write(MsgIds) -> ok = lists:foldl( @@ -966,9 +968,10 @@ test_msg_store() -> %% should hit a different code path ok = msg_store_sync(MsgIds1stHalf), %% read them all - ok = msg_store_read(MsgIds), + MSCState = rabbit_msg_store:client_init(), + MSCState1 = msg_store_read(MsgIds, MSCState), %% read them all again - this will hit the cache, not disk - ok = msg_store_read(MsgIds), + MSCState2 = msg_store_read(MsgIds, MSCState1), %% remove them all ok = rabbit_msg_store:remove(MsgIds), %% check first half doesn't exist @@ -976,11 +979,12 @@ test_msg_store() -> %% check second half does exist true = msg_store_contains(true, MsgIds2ndHalf), %% read the second half again - ok = msg_store_read(MsgIds2ndHalf), + MSCState3 = msg_store_read(MsgIds2ndHalf, MSCState2), %% release the second half, just for fun (aka code coverage) ok = rabbit_msg_store:release(MsgIds2ndHalf), %% read the second half again, just for fun (aka code coverage) - ok = msg_store_read(MsgIds2ndHalf), + MSCState4 = msg_store_read(MsgIds2ndHalf, MSCState3), + ok = rabbit_msg_store:client_terminate(MSCState4), %% stop and restart, preserving every other msg in 2nd half ok = stop_msg_store(), ok = start_msg_store(fun ([]) -> finished; @@ -1003,19 +1007,28 @@ test_msg_store() -> %% publish the first half again ok = msg_store_write(MsgIds1stHalf), %% this should force some sort of sync internally otherwise misread - ok = msg_store_read(MsgIds1stHalf), + ok = rabbit_msg_store:client_terminate( + msg_store_read(MsgIds1stHalf, rabbit_msg_store:client_init())), ok = rabbit_msg_store:remove(MsgIds1stHalf), %% restart empty ok = stop_msg_store(), ok = start_msg_store_empty(), %% now safe to reuse msg_ids %% push a lot of msgs in... BigCount = 100000, - MsgIdsBig = lists:seq(1, BigCount), + MsgIdsBig = [msg_id_bin(X) || X <- lists:seq(1, BigCount)], Payload = << 0:65536 >>, ok = lists:foldl( fun (MsgId, ok) -> - rabbit_msg_store:write(msg_id_bin(MsgId), Payload) + rabbit_msg_store:write(MsgId, Payload) end, ok, MsgIdsBig), + %% now read them to ensure we hit the fast client-side reading + ok = rabbit_msg_store:client_terminate( + lists:foldl( + fun (MsgId, MSCStateM) -> + {{ok, Payload}, MSCStateN} = + rabbit_msg_store:read(MsgId, MSCStateM), + MSCStateN + end, rabbit_msg_store:client_init(), MsgIdsBig)), %% .., then 3s by 1... ok = lists:foldl( fun (MsgId, ok) -> @@ -1034,7 +1047,7 @@ test_msg_store() -> rabbit_msg_store:remove([msg_id_bin(MsgId)]) end, ok, lists:seq(BigCount-2, 1, -3)), %% ensure empty - false = msg_store_contains(false, [msg_id_bin(M) || M <- MsgIdsBig]), + false = msg_store_contains(false, MsgIdsBig), %% restart empty ok = stop_msg_store(), ok = start_msg_store_empty(), diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index f2d45700f0..7c1ef6875d 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -61,7 +61,8 @@ avg_ingress_rate, rate_timestamp, len, - on_sync + on_sync, + msg_store_read_state }). -include("rabbit.hrl"). @@ -124,7 +125,8 @@ avg_ingress_rate :: float(), rate_timestamp :: {integer(), integer(), integer()}, len :: non_neg_integer(), - on_sync :: {[ack()], [msg_id()], [{pid(), any()}]} + on_sync :: {[ack()], [msg_id()], [{pid(), any()}]}, + msg_store_read_state :: any() }). -spec(init/1 :: (queue_name()) -> vqstate()). @@ -198,11 +200,14 @@ init(QueueName) -> avg_ingress_rate = 0, rate_timestamp = Now, len = GammaCount, - on_sync = {[], [], []} + on_sync = {[], [], []}, + msg_store_read_state = rabbit_msg_store:client_init() }, maybe_gammas_to_betas(State). -terminate(State = #vqstate { index_state = IndexState }) -> +terminate(State = #vqstate { index_state = IndexState, + msg_store_read_state = MSCState }) -> + rabbit_msg_store:client_terminate(MSCState), State #vqstate { index_state = rabbit_queue_index:terminate(IndexState) }. publish(Msg, State) -> @@ -618,7 +623,8 @@ remove_queue_entries(Q, IndexState) -> fetch_from_q3_or_gamma(State = #vqstate { q1 = Q1, q2 = Q2, gamma = #gamma { count = GammaCount }, - q3 = Q3, q4 = Q4, ram_msg_count = RamMsgCount }) -> + q3 = Q3, q4 = Q4, ram_msg_count = RamMsgCount, + msg_store_read_state = MSCState }) -> case queue:out(Q3) of {empty, _Q3} -> 0 = GammaCount, %% ASSERTION @@ -629,15 +635,16 @@ fetch_from_q3_or_gamma(State = #vqstate { #beta { msg_id = MsgId, seq_id = SeqId, is_delivered = IsDelivered, is_persistent = IsPersistent, index_on_disk = IndexOnDisk }}, Q3a} -> - {ok, Msg = #basic_message { is_persistent = IsPersistent, - guid = MsgId }} = - rabbit_msg_store:read(MsgId), + {{ok, Msg = #basic_message { is_persistent = IsPersistent, + guid = MsgId }}, MSCState1} = + rabbit_msg_store:read(MsgId, MSCState), Q4a = queue:in( #alpha { msg = Msg, seq_id = SeqId, is_delivered = IsDelivered, msg_on_disk = true, index_on_disk = IndexOnDisk }, Q4), State1 = State #vqstate { q3 = Q3a, q4 = Q4a, - ram_msg_count = RamMsgCount + 1 }, + ram_msg_count = RamMsgCount + 1, + msg_store_read_state = MSCState1 }, State2 = case {queue:is_empty(Q3a), 0 == GammaCount} of {true, true} -> |
