summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@lshift.net>2010-05-06 07:20:04 +0100
committerMatthias Radestock <matthias@lshift.net>2010-05-06 07:20:04 +0100
commit9664f07c47a2adb0128b9bcef0615abd8ea022ab (patch)
tree0f24d55ab922ca876cdef5a03e112230cee3b1c5 /src
parent3946353d89493d26bddcef75a7c7008998529cce (diff)
downloadrabbitmq-server-git-9664f07c47a2adb0128b9bcef0615abd8ea022ab.tar.gz
cosmetic
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_msg_store.erl168
1 files changed, 92 insertions, 76 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 4dc390f66c..d92d7aa3e5 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -310,7 +310,7 @@ write(Server, Guid, Msg,
{gen_server2:cast(Server, {write, Guid, Msg}), CState}.
read(Server, Guid,
- CState = #client_msstate { dedup_cache_ets = DedupCacheEts,
+ CState = #client_msstate { dedup_cache_ets = DedupCacheEts,
cur_file_cache_ets = CurFileCacheEts }) ->
%% 1. Check the dedup cache
case fetch_and_increment_cache(DedupCacheEts, Guid) of
@@ -389,14 +389,6 @@ safe_ets_update_counter(Tab, Key, UpdateOp, SuccessFun, FailThunk) ->
safe_ets_update_counter_ok(Tab, Key, UpdateOp, FailThunk) ->
safe_ets_update_counter(Tab, Key, UpdateOp, fun (_) -> ok end, FailThunk).
-update_msg_cache(CacheEts, Guid, Msg) ->
- case ets:insert_new(CacheEts, {Guid, Msg, 1}) of
- true -> ok;
- false -> safe_ets_update_counter_ok(
- CacheEts, Guid, {3, +1},
- fun () -> update_msg_cache(CacheEts, Guid, Msg) end)
- end.
-
client_read1(Server,
#msg_location { guid = Guid, file = File } = MsgLocation,
Defer,
@@ -539,7 +531,6 @@ init([Server, BaseDir, ClientRefs, {MsgRefDeltaGen, MsgRefDeltaGenInit}]) ->
end
end,
- InitFile = 0,
{FileSummaryRecovered, FileSummaryEts} =
recover_file_summary(AllCleanShutdown, Dir),
DedupCacheEts = ets:new(rabbit_msg_store_dedup_cache, [set, public]),
@@ -550,7 +541,7 @@ init([Server, BaseDir, ClientRefs, {MsgRefDeltaGen, MsgRefDeltaGenInit}]) ->
State = #msstate { dir = Dir,
index_module = IndexModule,
index_state = IndexState,
- current_file = InitFile,
+ current_file = 0,
current_file_handle = undefined,
file_handle_cache = dict:new(),
on_sync = [],
@@ -641,10 +632,10 @@ handle_cast({write, Guid, Msg},
offset = CurOffset, total_size = TotalSize },
State),
[#file_summary { valid_total_size = ValidTotalSize,
- contiguous_top = ContiguousTop,
- right = undefined,
- locked = false,
- file_size = FileSize }] =
+ contiguous_top = ContiguousTop,
+ right = undefined,
+ locked = false,
+ file_size = FileSize }] =
ets:lookup(FileSummaryEts, CurFile),
ValidTotalSize1 = ValidTotalSize + TotalSize,
ContiguousTop1 = if CurOffset =:= ContiguousTop ->
@@ -664,7 +655,7 @@ handle_cast({write, Guid, Msg},
maybe_roll_to_new_file(
NextOffset, State #msstate {
sum_valid_data = SumValid + TotalSize,
- sum_file_size = SumFileSize + TotalSize })));
+ sum_file_size = SumFileSize + TotalSize })));
#msg_location { ref_count = RefCount } ->
%% We already know about it, just update counter. Only
%% update field otherwise bad interaction with concurrent GC
@@ -705,8 +696,8 @@ handle_cast(sync, State) ->
noreply(internal_sync(State));
handle_cast({gc_done, Reclaimed, Source, Dest},
- State = #msstate { sum_file_size = SumFileSize,
- gc_active = {Source, Dest},
+ State = #msstate { sum_file_size = SumFileSize,
+ gc_active = {Source, Dest},
file_handles_ets = FileHandlesEts,
file_summary_ets = FileSummaryEts }) ->
%% GC done, so now ensure that any clients that have open fhs to
@@ -721,9 +712,10 @@ handle_cast({gc_done, Reclaimed, Source, Dest},
%% we always move data left, so Source has gone and was on the
%% right, so need to make dest = source.right.left, and also
%% dest.right = source.right
- [#file_summary { left = Dest, right = SourceRight, locked = true,
- readers = 0 }] =
- ets:lookup(FileSummaryEts, Source),
+ [#file_summary { left = Dest,
+ right = SourceRight,
+ locked = true,
+ readers = 0 }] = ets:lookup(FileSummaryEts, Source),
%% this could fail if SourceRight == undefined
ets:update_element(FileSummaryEts, SourceRight,
{#file_summary.left, Dest}),
@@ -733,7 +725,7 @@ handle_cast({gc_done, Reclaimed, Source, Dest},
true = ets:delete(FileSummaryEts, Source),
noreply(run_pending(
State #msstate { sum_file_size = SumFileSize - Reclaimed,
- gc_active = false }));
+ gc_active = false }));
handle_cast({set_maximum_since_use, Age}, State) ->
ok = file_handle_cache:set_maximum_since_use(Age),
@@ -819,7 +811,7 @@ sort_file_names(FileNames) ->
FileNames).
internal_sync(State = #msstate { current_file_handle = CurHdl,
- on_sync = Syncs }) ->
+ on_sync = Syncs }) ->
State1 = stop_sync_timer(State),
case Syncs of
[] -> State1;
@@ -829,8 +821,8 @@ internal_sync(State = #msstate { current_file_handle = CurHdl,
State1 #msstate { on_sync = [] }
end.
-read_message(Guid, From, State =
- #msstate { dedup_cache_ets = DedupCacheEts }) ->
+read_message(Guid, From,
+ State = #msstate { dedup_cache_ets = DedupCacheEts }) ->
case index_lookup(Guid, State) of
not_found -> gen_server2:reply(From, not_found),
State;
@@ -846,11 +838,11 @@ read_message(Guid, From, State =
read_message1(From, #msg_location { guid = Guid, ref_count = RefCount,
file = File, offset = Offset } = MsgLoc,
- State = #msstate { current_file = CurFile,
+ State = #msstate { current_file = CurFile,
current_file_handle = CurHdl,
- file_summary_ets = FileSummaryEts,
- dedup_cache_ets = DedupCacheEts,
- cur_file_cache_ets = CurFileCacheEts }) ->
+ file_summary_ets = FileSummaryEts,
+ dedup_cache_ets = DedupCacheEts,
+ cur_file_cache_ets = CurFileCacheEts }) ->
case File =:= CurFile of
true ->
{Msg, State1} =
@@ -913,6 +905,14 @@ maybe_insert_into_cache(DedupCacheEts, RefCount, Guid, Msg)
maybe_insert_into_cache(_DedupCacheEts, _RefCount, _Guid, _Msg) ->
ok.
+update_msg_cache(CacheEts, Guid, Msg) ->
+ case ets:insert_new(CacheEts, {Guid, Msg, 1}) of
+ true -> ok;
+ false -> safe_ets_update_counter_ok(
+ CacheEts, Guid, {3, +1},
+ fun () -> update_msg_cache(CacheEts, Guid, Msg) end)
+ end.
+
contains_message(Guid, From, State = #msstate { gc_active = GCActive }) ->
case index_lookup(Guid, State) of
not_found ->
@@ -929,9 +929,9 @@ contains_message(Guid, From, State = #msstate { gc_active = GCActive }) ->
end
end.
-remove_message(Guid, State = #msstate { sum_valid_data = SumValid,
+remove_message(Guid, State = #msstate { sum_valid_data = SumValid,
file_summary_ets = FileSummaryEts,
- dedup_cache_ets = DedupCacheEts }) ->
+ dedup_cache_ets = DedupCacheEts }) ->
#msg_location { ref_count = RefCount, file = File,
offset = Offset, total_size = TotalSize } =
index_lookup(Guid, State),
@@ -942,8 +942,8 @@ remove_message(Guid, State = #msstate { sum_valid_data = SumValid,
%% msg.
ok = remove_cache_entry(DedupCacheEts, Guid),
[#file_summary { valid_total_size = ValidTotalSize,
- contiguous_top = ContiguousTop,
- locked = Locked }] =
+ contiguous_top = ContiguousTop,
+ locked = Locked }] =
ets:lookup(FileSummaryEts, File),
case Locked of
true ->
@@ -1063,7 +1063,6 @@ store_file_summary(Tid, Dir) ->
[{extended_info, [object_count]}]),
ets:delete(Tid).
-
preallocate(Hdl, FileSizeLimit, FinalPos) ->
{ok, FileSizeLimit} = file_handle_cache:position(Hdl, FileSizeLimit),
ok = file_handle_cache:truncate(Hdl),
@@ -1130,7 +1129,7 @@ decrement_cache(DedupCacheEts, Guid) ->
%%----------------------------------------------------------------------------
index_lookup(Key, #client_msstate { index_module = Index,
- index_state = State }) ->
+ index_state = State }) ->
Index:lookup(Key, State);
index_lookup(Key, #msstate { index_module = Index, index_state = State }) ->
@@ -1143,14 +1142,14 @@ index_update(Obj, #msstate { index_module = Index, index_state = State }) ->
Index:update(Obj, State).
index_update_fields(Key, Updates, #msstate { index_module = Index,
- index_state = State }) ->
+ index_state = State }) ->
Index:update_fields(Key, Updates, State).
index_delete(Key, #msstate { index_module = Index, index_state = State }) ->
Index:delete(Key, State).
index_delete_by_file(File, #msstate { index_module = Index,
- index_state = State }) ->
+ index_state = State }) ->
Index:delete_by_file(File, State).
%%----------------------------------------------------------------------------
@@ -1315,13 +1314,14 @@ build_index(true, _Files, State =
#msstate { file_summary_ets = FileSummaryEts }) ->
ets:foldl(
fun (#file_summary { valid_total_size = ValidTotalSize,
- file_size = FileSize, file = File },
+ file_size = FileSize,
+ file = File },
{_Offset, State1 = #msstate { sum_valid_data = SumValid,
- sum_file_size = SumFileSize }}) ->
+ sum_file_size = SumFileSize }}) ->
{FileSize, State1 #msstate {
sum_valid_data = SumValid + ValidTotalSize,
- sum_file_size = SumFileSize + FileSize,
- current_file = File }}
+ sum_file_size = SumFileSize + FileSize,
+ current_file = File }}
end, {0, State}, FileSummaryEts);
build_index(false, Files, State) ->
{ok, Pid} = gatherer:start_link(),
@@ -1333,8 +1333,8 @@ build_index(false, Files, State) ->
build_index(Gatherer, Left, [],
State = #msstate { file_summary_ets = FileSummaryEts,
- sum_valid_data = SumValid,
- sum_file_size = SumFileSize }) ->
+ sum_valid_data = SumValid,
+ sum_file_size = SumFileSize }) ->
case gatherer:fetch(Gatherer) of
finished ->
ok = rabbit_misc:unlink_and_capture_exit(Gatherer),
@@ -1351,7 +1351,7 @@ build_index(Gatherer, Left, [],
build_index(Gatherer, Left, [],
State #msstate {
sum_valid_data = SumValid + ValidTotalSize,
- sum_file_size = SumFileSize + FileSize })
+ sum_file_size = SumFileSize + FileSize })
end;
build_index(Gatherer, Left, [File|Files], State) ->
Child = make_ref(),
@@ -1395,10 +1395,14 @@ build_index_worker(
[F|_] -> {F, FileSize}
end,
ok = gatherer:produce(Gatherer, #file_summary {
- file = File, valid_total_size = ValidTotalSize,
- contiguous_top = ContiguousTop, locked = false,
- left = Left, right = Right, file_size = FileSize1,
- readers = 0 }),
+ file = File,
+ valid_total_size = ValidTotalSize,
+ contiguous_top = ContiguousTop,
+ left = Left,
+ right = Right,
+ file_size = FileSize1,
+ locked = false,
+ readers = 0 }),
ok = gatherer:finished(Gatherer, Ref).
%%----------------------------------------------------------------------------
@@ -1419,11 +1423,15 @@ maybe_roll_to_new_file(
{ok, NextHdl} = open_file(
Dir, filenum_to_name(NextFile),
?WRITE_MODE),
- true = ets:insert_new(
- FileSummaryEts, #file_summary {
- file = NextFile, valid_total_size = 0, contiguous_top = 0,
- left = CurFile, right = undefined, file_size = 0,
- locked = false, readers = 0 }),
+ true = ets:insert_new(FileSummaryEts, #file_summary {
+ file = NextFile,
+ valid_total_size = 0,
+ contiguous_top = 0,
+ left = CurFile,
+ right = undefined,
+ file_size = 0,
+ locked = false,
+ readers = 0 }),
true = ets:update_element(FileSummaryEts, CurFile,
{#file_summary.right, NextFile}),
true = ets:match_delete(CurFileCacheEts, {'_', '_', 0}),
@@ -1481,11 +1489,13 @@ find_files_to_gc(FileSummaryEts, N, First) ->
find_files_to_gc(_FileSummaryEts, _N, #file_summary {}, [], Pairs) ->
lists:reverse(Pairs);
find_files_to_gc(FileSummaryEts, N,
- #file_summary { right = Source, file = Dest,
+ #file_summary { right = Source,
+ file = Dest,
valid_total_size = DestValid },
- [SourceObj = #file_summary { left = Dest, right = SourceRight,
+ [SourceObj = #file_summary { left = Dest,
+ right = SourceRight,
valid_total_size = SourceValid,
- file = Source }],
+ file = Source }],
Pairs) when DestValid + SourceValid =< ?FILE_SIZE_LIMIT andalso
not is_atom(SourceRight) ->
Pair = {Source, Dest},
@@ -1502,13 +1512,17 @@ find_files_to_gc(FileSummaryEts, N, _Left,
delete_file_if_empty(File, State = #msstate { current_file = File }) ->
State;
-delete_file_if_empty(File, State =
- #msstate { dir = Dir, sum_file_size = SumFileSize,
- file_handles_ets = FileHandlesEts,
- file_summary_ets = FileSummaryEts }) ->
- [#file_summary { valid_total_size = ValidData, file_size = FileSize,
- left = Left, right = Right, locked = false }]
- = ets:lookup(FileSummaryEts, File),
+delete_file_if_empty(File, State = #msstate {
+ dir = Dir,
+ sum_file_size = SumFileSize,
+ file_handles_ets = FileHandlesEts,
+ file_summary_ets = FileSummaryEts }) ->
+ [#file_summary { valid_total_size = ValidData,
+ left = Left,
+ right = Right,
+ file_size = FileSize,
+ locked = false }] =
+ ts:lookup(FileSummaryEts, File),
case ValidData of
%% we should NEVER find the current file in here hence right
%% should always be a file, not undefined
@@ -1540,15 +1554,17 @@ delete_file_if_empty(File, State =
gc(SourceFile, DestFile, State = {FileSummaryEts, _Dir, _Index, _IndexState}) ->
[SourceObj = #file_summary {
- readers = SourceReaders,
- valid_total_size = SourceValidData, left = DestFile,
- file_size = SourceFileSize, locked = true }] =
- ets:lookup(FileSummaryEts, SourceFile),
+ readers = SourceReaders,
+ valid_total_size = SourceValidData,
+ left = DestFile,
+ file_size = SourceFileSize,
+ locked = true }] = ets:lookup(FileSummaryEts, SourceFile),
[DestObj = #file_summary {
- readers = DestReaders,
- valid_total_size = DestValidData, right = SourceFile,
- file_size = DestFileSize, locked = true }] =
- ets:lookup(FileSummaryEts, DestFile),
+ readers = DestReaders,
+ valid_total_size = DestValidData,
+ right = SourceFile,
+ file_size = DestFileSize,
+ locked = true }] = ets:lookup(FileSummaryEts, DestFile),
case SourceReaders =:= 0 andalso DestReaders =:= 0 of
true -> TotalValidData = DestValidData + SourceValidData,
@@ -1565,13 +1581,13 @@ gc(SourceFile, DestFile, State = {FileSummaryEts, _Dir, _Index, _IndexState}) ->
gc(SourceFile, DestFile, State)
end.
-combine_files(#file_summary { file = Source,
+combine_files(#file_summary { file = Source,
valid_total_size = SourceValid,
- left = Destination },
- #file_summary { file = Destination,
+ left = Destination },
+ #file_summary { file = Destination,
valid_total_size = DestinationValid,
- contiguous_top = DestinationContiguousTop,
- right = Source },
+ contiguous_top = DestinationContiguousTop,
+ right = Source },
State = {_FileSummaryEts, Dir, _Index, _IndexState}) ->
SourceName = filenum_to_name(Source),
DestinationName = filenum_to_name(Destination),