summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDaniil Fedotov <dfedotov@pivotal.io>2017-03-07 15:47:26 +0000
committerDaniil Fedotov <dfedotov@pivotal.io>2017-03-07 15:47:26 +0000
commit1a7a5571ff369831b7af5ce2c24a4cf3d4e58fe1 (patch)
tree1be38aa069c47651c397d4f8d637400e2a5b8222
parentf94b9ed329b87f3def32948e763a398e48865401 (diff)
downloadrabbitmq-server-git-1a7a5571ff369831b7af5ce2c24a4cf3d4e58fe1.tar.gz
Call msg_store_index operations via helper functions.
This makes reasoning about message store index concurrency and usage easier. One can see which operations on index are called from which actor in the message store.
-rw-r--r--src/rabbit_msg_store.erl28
1 files changed, 18 insertions, 10 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 52d0b5f0fc..6b16c7eabf 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -1517,6 +1517,10 @@ index_lookup_positive_ref_count(Key, State) ->
index_update_ref_count(Key, RefCount, State) ->
index_update_fields(Key, {#msg_location.ref_count, RefCount}, State).
+index_lookup(Key, #gc_state { index_module = Index,
+ index_state = State }) ->
+ Index:lookup(Key, State);
+
index_lookup(Key, #client_msstate { index_module = Index,
index_state = State }) ->
Index:lookup(Key, State);
@@ -1530,13 +1534,20 @@ index_insert(Obj, #msstate { index_module = Index, index_state = State }) ->
index_update(Obj, #msstate { index_module = Index, index_state = State }) ->
Index:update(Obj, State).
-index_update_fields(Key, Updates, #msstate { index_module = Index,
+index_update_fields(Key, Updates, #msstate{ index_module = Index,
+ index_state = State }) ->
+ Index:update_fields(Key, Updates, State);
+index_update_fields(Key, Updates, #gc_state{ index_module = Index,
index_state = State }) ->
Index:update_fields(Key, Updates, State).
index_delete(Key, #msstate { index_module = Index, index_state = State }) ->
Index:delete(Key, State).
+index_delete_object(Obj, #gc_state{ index_module = Index,
+ index_state = State }) ->
+ Index:delete_object(Obj, State).
+
index_clean_up_temporary_reference_count_entries(
#msstate { index_module = Index,
index_state = State }) ->
@@ -2037,19 +2048,17 @@ delete_file(File, State = #gc_state { file_summary_ets = FileSummaryEts,
gen_server2:cast(Server, {delete_file, File, FileSize}),
safe_file_delete_fun(File, Dir, FileHandlesEts).
-load_and_vacuum_message_file(File, #gc_state { dir = Dir,
- index_module = Index,
- index_state = IndexState }) ->
+load_and_vacuum_message_file(File, State = #gc_state { dir = Dir }) ->
%% Messages here will be end-of-file at start-of-list
{ok, Messages, _FileSize} =
scan_file_for_valid_messages(Dir, filenum_to_name(File)),
%% foldl will reverse so will end up with msgs in ascending offset order
lists:foldl(
fun ({MsgId, TotalSize, Offset}, Acc = {List, Size}) ->
- case Index:lookup(MsgId, IndexState) of
+ case index_lookup(MsgId, State) of
#msg_location { file = File, total_size = TotalSize,
offset = Offset, ref_count = 0 } = Entry ->
- ok = Index:delete_object(Entry, IndexState),
+ ok = index_delete_object(Entry, State),
Acc;
#msg_location { file = File, total_size = TotalSize,
offset = Offset } = Entry ->
@@ -2060,8 +2069,7 @@ load_and_vacuum_message_file(File, #gc_state { dir = Dir,
end, {[], 0}, Messages).
copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl,
- Destination, #gc_state { index_module = Index,
- index_state = IndexState }) ->
+ Destination, State) ->
Copy = fun ({BlockStart, BlockEnd}) ->
BSize = BlockEnd - BlockStart,
{ok, BlockStart} =
@@ -2077,10 +2085,10 @@ copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl,
%% CurOffset is in the DestinationFile.
%% Offset, BlockStart and BlockEnd are in the SourceFile
%% update MsgLocation to reflect change of file and offset
- ok = Index:update_fields(MsgId,
+ ok = index_update_fields(MsgId,
[{#msg_location.file, Destination},
{#msg_location.offset, CurOffset}],
- IndexState),
+ State),
{CurOffset + TotalSize,
case BlockEnd of
undefined ->