diff options
| author | Daniil Fedotov <dfedotov@pivotal.io> | 2017-03-07 15:47:26 +0000 |
|---|---|---|
| committer | Daniil Fedotov <dfedotov@pivotal.io> | 2017-03-07 15:47:26 +0000 |
| commit | 1a7a5571ff369831b7af5ce2c24a4cf3d4e58fe1 (patch) | |
| tree | 1be38aa069c47651c397d4f8d637400e2a5b8222 | |
| parent | f94b9ed329b87f3def32948e763a398e48865401 (diff) | |
| download | rabbitmq-server-git-1a7a5571ff369831b7af5ce2c24a4cf3d4e58fe1.tar.gz | |
Call msg_store_index operations via helper functions.
This makes reasoning about message store index concurrency and
usage easier. One can see which operations on index are called from
which actor in the message store.
| -rw-r--r-- | src/rabbit_msg_store.erl | 28 |
1 files changed, 18 insertions, 10 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 52d0b5f0fc..6b16c7eabf 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -1517,6 +1517,10 @@ index_lookup_positive_ref_count(Key, State) -> index_update_ref_count(Key, RefCount, State) -> index_update_fields(Key, {#msg_location.ref_count, RefCount}, State). +index_lookup(Key, #gc_state { index_module = Index, + index_state = State }) -> + Index:lookup(Key, State); + index_lookup(Key, #client_msstate { index_module = Index, index_state = State }) -> Index:lookup(Key, State); @@ -1530,13 +1534,20 @@ index_insert(Obj, #msstate { index_module = Index, index_state = State }) -> index_update(Obj, #msstate { index_module = Index, index_state = State }) -> Index:update(Obj, State). -index_update_fields(Key, Updates, #msstate { index_module = Index, +index_update_fields(Key, Updates, #msstate{ index_module = Index, + index_state = State }) -> + Index:update_fields(Key, Updates, State); +index_update_fields(Key, Updates, #gc_state{ index_module = Index, index_state = State }) -> Index:update_fields(Key, Updates, State). index_delete(Key, #msstate { index_module = Index, index_state = State }) -> Index:delete(Key, State). +index_delete_object(Obj, #gc_state{ index_module = Index, + index_state = State }) -> + Index:delete_object(Obj, State). + index_clean_up_temporary_reference_count_entries( #msstate { index_module = Index, index_state = State }) -> @@ -2037,19 +2048,17 @@ delete_file(File, State = #gc_state { file_summary_ets = FileSummaryEts, gen_server2:cast(Server, {delete_file, File, FileSize}), safe_file_delete_fun(File, Dir, FileHandlesEts). -load_and_vacuum_message_file(File, #gc_state { dir = Dir, - index_module = Index, - index_state = IndexState }) -> +load_and_vacuum_message_file(File, State = #gc_state { dir = Dir }) -> %% Messages here will be end-of-file at start-of-list {ok, Messages, _FileSize} = scan_file_for_valid_messages(Dir, filenum_to_name(File)), %% foldl will reverse so will end up with msgs in ascending offset order lists:foldl( fun ({MsgId, TotalSize, Offset}, Acc = {List, Size}) -> - case Index:lookup(MsgId, IndexState) of + case index_lookup(MsgId, State) of #msg_location { file = File, total_size = TotalSize, offset = Offset, ref_count = 0 } = Entry -> - ok = Index:delete_object(Entry, IndexState), + ok = index_delete_object(Entry, State), Acc; #msg_location { file = File, total_size = TotalSize, offset = Offset } = Entry -> @@ -2060,8 +2069,7 @@ load_and_vacuum_message_file(File, #gc_state { dir = Dir, end, {[], 0}, Messages). copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl, - Destination, #gc_state { index_module = Index, - index_state = IndexState }) -> + Destination, State) -> Copy = fun ({BlockStart, BlockEnd}) -> BSize = BlockEnd - BlockStart, {ok, BlockStart} = @@ -2077,10 +2085,10 @@ copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl, %% CurOffset is in the DestinationFile. %% Offset, BlockStart and BlockEnd are in the SourceFile %% update MsgLocation to reflect change of file and offset - ok = Index:update_fields(MsgId, + ok = index_update_fields(MsgId, [{#msg_location.file, Destination}, {#msg_location.offset, CurOffset}], - IndexState), + State), {CurOffset + TotalSize, case BlockEnd of undefined -> |
