diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-12-20 19:04:58 +0000 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-12-20 19:04:58 +0000 |
| commit | 95f466aa10e9ae29bf0d8b6b3f81846bb55bfbd0 (patch) | |
| tree | 54d9fe9d64ea6befe46051e0f7b3de145ca1f242 | |
| parent | 1e2f77364d773a726d775b1a5b76cd599e32e6af (diff) | |
| parent | 57baa1db68397e308c57a738a29ec136065eee8c (diff) | |
| download | rabbitmq-server-git-95f466aa10e9ae29bf0d8b6b3f81846bb55bfbd0.tar.gz | |
merged bug 22161 into bug 21673. Lazy, concurrent msg_store GC landed.
| -rw-r--r-- | include/rabbit_msg_store.hrl | 51 | ||||
| -rw-r--r-- | src/gen_server2.erl | 13 | ||||
| -rw-r--r-- | src/rabbit.erl | 10 | ||||
| -rw-r--r-- | src/rabbit_misc.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_msg_store.erl | 740 | ||||
| -rw-r--r-- | src/rabbit_msg_store_ets_index.erl | 71 | ||||
| -rw-r--r-- | src/rabbit_msg_store_gc.erl | 249 | ||||
| -rw-r--r-- | src/rabbit_msg_store_misc.erl | 74 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 25 | ||||
| -rw-r--r-- | src/random_distributions.erl | 38 |
10 files changed, 842 insertions, 435 deletions
diff --git a/include/rabbit_msg_store.hrl b/include/rabbit_msg_store.hrl new file mode 100644 index 0000000000..925d5d8e71 --- /dev/null +++ b/include/rabbit_msg_store.hrl @@ -0,0 +1,51 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2009 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-record(msg_location, + {msg_id, ref_count, file, offset, total_size}). + +-record(file_summary, + {file, valid_total_size, contiguous_top, left, right, file_size, + locked}). + +-define(BINARY_MODE, [raw, binary]). +-define(READ_MODE, [read]). +-define(READ_AHEAD_MODE, [read_ahead | ?READ_MODE]). +-define(WRITE_MODE, [write]). + +-define(HIBERNATE_AFTER_MIN, 1000). +-define(DESIRED_HIBERNATE, 10000). +-define(FILE_EXTENSION, ".rdq"). +-define(FILE_EXTENSION_TMP, ".rdt"). + +-define(FILE_SIZE_LIMIT, (16*1024*1024)). + +-define(HANDLE_CACHE_BUFFER_SIZE, 1048576). %% 1MB diff --git a/src/gen_server2.erl b/src/gen_server2.erl index 53edf8deef..c48061518d 100644 --- a/src/gen_server2.erl +++ b/src/gen_server2.erl @@ -25,8 +25,11 @@ %% handle_pre_hibernate/1 and handle_post_hibernate/1. These will be %% called immediately prior to and post hibernation, respectively. If %% handle_pre_hibernate returns {hibernate, NewState} then the process -%% will hibernate. If the module does not implement -%% handle_pre_hibernate/1 then the default action is to hibernate. +%% will hibernate. If handle_pre_hibernate returns {insomniate, +%% NewState} then the process will go around again, trying to receive +%% for up to the current timeout value before attempting to hibernate +%% again. If the module does not implement handle_pre_hibernate/1 then +%% the default action is to hibernate. %% %% 6) init can return a 4th arg, {backoff, InitialTimeout, %% MinimumTimeout, DesiredHibernatePeriod} (all in @@ -36,7 +39,7 @@ %% InitialTimeout supplied from init). After this timeout has %% occurred, hibernation will occur as normal. Upon awaking, a new %% current timeout value will be calculated. -%% +%% %% The purpose is that the gen_server2 takes care of adjusting the %% current timeout value such that the process will increase the %% timeout value repeatedly if it is unable to sleep for the @@ -126,6 +129,7 @@ %%% handle_pre_hibernate(State) %%% %%% ==> {hibernate, State} +%%% {insomniate, State} %%% {stop, Reason, State} %%% Reason = normal | shutdown | Term, terminate(State) is called %%% @@ -545,6 +549,9 @@ pre_hibernate(Parent, Name, State, Mod, TimeoutState, Queue, Debug) -> {hibernate, NState} -> hibernate(Parent, Name, NState, Mod, TimeoutState, Queue, Debug); + {insomniate, NState} -> + process_next_msg(Parent, Name, NState, Mod, hibernate, + TimeoutState, Queue, Debug); Reply -> handle_common_termination(Reply, Name, pre_hibernate, Mod, State, Debug) diff --git a/src/rabbit.erl b/src/rabbit.erl index 2aa58fc02a..fe1be7c292 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -150,12 +150,11 @@ start(normal, []) -> start_child(vm_memory_monitor, [MemoryWatermark]) end, - ok = rabbit_amqqueue:start(), + ok = start_child(rabbit_memory_monitor), + ok = start_child(rabbit_guid), ok = start_child(rabbit_router), - ok = start_child(rabbit_guid), - ok = start_child(rabbit_node_monitor), - ok = start_child(rabbit_memory_monitor) + ok = start_child(rabbit_node_monitor) end}, {"recovery", fun () -> @@ -163,6 +162,9 @@ start(normal, []) -> ok = rabbit_exchange:recover(), DurableQueues = rabbit_amqqueue:find_durable_queues(), ok = rabbit_queue_index:start_msg_store(DurableQueues), + + ok = rabbit_amqqueue:start(), + {ok, _RealDurableQueues} = rabbit_amqqueue:recover(DurableQueues) %% TODO - RealDurableQueues is a subset of %% DurableQueues. It may have queues removed which diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 21764fce6d..23666a5f3d 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -485,7 +485,7 @@ unfold(Fun, Acc, Init) -> ceil(N) -> T = trunc(N), - case N - T of - 0 -> N; - _ -> 1 + T + case N == T of + true -> T; + false -> 1 + T end. diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index f139fc4581..c060c8d4c6 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -36,16 +36,16 @@ -export([start_link/3, write/2, read/1, contains/1, remove/1, release/1, sync/2]). --export([sync/0]). %% internal +-export([sync/0, gc_done/3]). %% internal -export([init/1, handle_call/3, handle_cast/2, handle_info/2, - terminate/2, code_change/3]). + terminate/2, code_change/3, handle_pre_hibernate/1]). -define(SERVER, ?MODULE). --define(FILE_SIZE_LIMIT, (16*1024*1024)). -define(SYNC_INTERVAL, 5). %% milliseconds --define(HANDLE_CACHE_BUFFER_SIZE, 1048576). %% 1MB + +-define(GEOMETRIC_P, 0.3). %% parameter to geometric distribution rng %%---------------------------------------------------------------------------- @@ -54,6 +54,7 @@ -type(msg_id() :: binary()). -type(msg() :: any()). -type(file_path() :: any()). +-type(file_num() :: non_neg_integer()). -spec(start_link/3 :: (file_path(), @@ -65,6 +66,7 @@ -spec(remove/1 :: ([msg_id()]) -> 'ok'). -spec(release/1 :: ([msg_id()]) -> 'ok'). -spec(sync/2 :: ([msg_id()], fun (() -> any())) -> 'ok'). +-spec(gc_done/3 :: (non_neg_integer(), file_num(), file_num()) -> 'ok'). -endif. @@ -72,36 +74,29 @@ -record(msstate, {dir, %% store directory - msg_locations, %% where are messages? + index_module, %% the module for index ops + index_state, %% where are messages? file_summary, %% what's in the files? current_file, %% current file name as number current_file_handle, %% current file handle %% since the last fsync? - file_size_limit, %% how big can our files get? file_handle_cache, %% file handle cache on_sync, %% pending sync requests sync_timer_ref, %% TRef for our interval timer message_cache, %% ets message cache sum_valid_data, %% sum of valid data in all files - sum_file_size %% sum of file sizes - }). - --record(msg_location, - {msg_id, ref_count, file, offset, total_size}). - --record(file_summary, - {file, valid_total_size, contiguous_top, left, right, file_size}). + sum_file_size, %% sum of file sizes + pending_gc_completion, %% things to do once GC completes + gc_active %% is the GC currently working? + }). --define(MSG_LOC_NAME, rabbit_disk_queue_msg_location). --define(FILE_SUMMARY_ETS_NAME, rabbit_disk_queue_file_summary). --define(FILE_EXTENSION, ".rdq"). --define(FILE_EXTENSION_TMP, ".rdt"). --define(CACHE_ETS_NAME, rabbit_disk_queue_cache). +-include("rabbit_msg_store.hrl"). --define(BINARY_MODE, [raw, binary]). --define(READ_MODE, [read]). --define(READ_AHEAD_MODE, [read_ahead | ?READ_MODE]). --define(WRITE_MODE, [write]). +-define(FILE_SUMMARY_ETS_NAME, rabbit_msg_store_file_summary). +-define(CACHE_ETS_NAME, rabbit_msg_store_cache). +%% We run GC whenever (garbage / sum_file_size) > ?GARBAGE_FRACTION +%% It is not recommended to set this to < 0.5 +-define(GARBAGE_FRACTION, 0.5). %% The components: %% @@ -237,6 +232,8 @@ remove(MsgIds) -> gen_server2:cast(?SERVER, {remove, MsgIds}). release(MsgIds) -> gen_server2:cast(?SERVER, {release, MsgIds}). sync(MsgIds, K) -> gen_server2:cast(?SERVER, {sync, MsgIds, K}). sync() -> gen_server2:pcast(?SERVER, 9, sync). %% internal +gc_done(Reclaimed, Source, Destination) -> + gen_server2:pcast(?SERVER, 9, {gc_done, Reclaimed, Source, Destination}). %%---------------------------------------------------------------------------- %% gen_server callbacks @@ -247,26 +244,29 @@ init([Dir, MsgRefDeltaGen, MsgRefDeltaGenInit]) -> ok = filelib:ensure_dir(filename:join(Dir, "nothing")), - MsgLocations = ets:new(?MSG_LOC_NAME, - [set, private, {keypos, #msg_location.msg_id}]), + IndexModule = rabbit_msg_store_ets_index, + IndexState = IndexModule:init(), InitFile = 0, FileSummary = ets:new(?FILE_SUMMARY_ETS_NAME, - [set, private, {keypos, #file_summary.file}]), + [ordered_set, public, + {keypos, #file_summary.file}]), MessageCache = ets:new(?CACHE_ETS_NAME, [set, private]), State = #msstate { dir = Dir, - msg_locations = MsgLocations, + index_module = IndexModule, + index_state = IndexState, file_summary = FileSummary, current_file = InitFile, current_file_handle = undefined, - file_size_limit = ?FILE_SIZE_LIMIT, file_handle_cache = dict:new(), on_sync = [], sync_timer_ref = undefined, message_cache = MessageCache, sum_valid_data = 0, - sum_file_size = 0 + sum_file_size = 0, + pending_gc_completion = [], + gc_active = false }, ok = count_msg_refs(MsgRefDeltaGen, MsgRefDeltaGenInit, State), @@ -282,65 +282,25 @@ init([Dir, MsgRefDeltaGen, MsgRefDeltaGenInit]) -> build_index(Files, State), %% read is only needed so that we can seek - {ok, FileHdl} = open_file(Dir, filenum_to_name(CurFile), - [read | ?WRITE_MODE]), + {ok, FileHdl} = rabbit_msg_store_misc:open_file( + Dir, rabbit_msg_store_misc:filenum_to_name(CurFile), + [read | ?WRITE_MODE]), {ok, Offset} = file_handle_cache:position(FileHdl, Offset), ok = file_handle_cache:truncate(FileHdl), - {ok, State1 #msstate { current_file_handle = FileHdl }}. - -handle_call({read, MsgId}, _From, State = - #msstate { current_file = CurFile, - current_file_handle = CurHdl }) -> - {Result, State1} = - case index_lookup(MsgId, State) of - not_found -> {not_found, State}; - #msg_location { ref_count = RefCount, - file = File, - offset = Offset, - total_size = TotalSize } -> - case fetch_and_increment_cache(MsgId, State) of - not_found -> - ok = case CurFile =:= File andalso {ok, Offset} >= - file_handle_cache:current_raw_offset(CurHdl) of - true -> file_handle_cache:flush(CurHdl); - false -> ok - end, - {Hdl, State2} = get_read_handle(File, State), - {ok, Offset} = file_handle_cache:position(Hdl, Offset), - {ok, {MsgId, Msg}} = - case rabbit_msg_file:read(Hdl, TotalSize) of - {ok, {MsgId, _}} = Obj -> Obj; - Rest -> - throw({error, {misread, [{old_state, State}, - {file_num, File}, - {offset, Offset}, - {read, Rest}, - {proc_dict, get()} - ]}}) - end, - ok = case RefCount > 1 of - true -> - insert_into_cache(MsgId, Msg, State2); - false -> - %% it's not in the cache and we - %% only have one reference to the - %% message. So don't bother - %% putting it in the cache. - ok - end, - {{ok, Msg}, State2}; - {Msg, _RefCount} -> - {{ok, Msg}, State} - end - end, - reply(Result, State1); + {ok, _Pid} = rabbit_msg_store_gc:start_link( + Dir, IndexState, FileSummary, IndexModule), + + {ok, State1 #msstate { current_file_handle = FileHdl }, hibernate, + {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. -handle_call({contains, MsgId}, _From, State) -> - reply(case index_lookup(MsgId, State) of - not_found -> false; - #msg_location {} -> true - end, State). +handle_call({read, MsgId}, From, State) -> + State1 = read_message(MsgId, From, State), + noreply(State1); + +handle_call({contains, MsgId}, From, State) -> + State1 = contains_message(MsgId, From, State), + noreply(State1). handle_cast({write, MsgId, Msg}, State = #msstate { current_file_handle = CurHdl, @@ -360,6 +320,7 @@ handle_cast({write, MsgId, Msg}, [FSEntry = #file_summary { valid_total_size = ValidTotalSize, contiguous_top = ContiguousTop, right = undefined, + locked = false, file_size = FileSize }] = ets:lookup(FileSummary, CurFile), ValidTotalSize1 = ValidTotalSize + TotalSize, @@ -373,31 +334,25 @@ handle_cast({write, MsgId, Msg}, contiguous_top = ContiguousTop1, file_size = FileSize + TotalSize }), NextOffset = CurOffset + TotalSize, - noreply(maybe_roll_to_new_file( - NextOffset, - State #msstate { sum_valid_data = SumValid + TotalSize, - sum_file_size = SumFileSize + TotalSize })); - StoreEntry = #msg_location { ref_count = RefCount } -> - %% We already know about it, just update counter - ok = index_update(StoreEntry #msg_location { - ref_count = RefCount + 1 }, State), + noreply(maybe_compact(maybe_roll_to_new_file( + NextOffset, State #msstate + { sum_valid_data = SumValid + TotalSize, + sum_file_size = SumFileSize + TotalSize } + ))); + #msg_location { ref_count = RefCount } -> + %% We already know about it, just update counter. Only + %% update field otherwise bad interaction with concurrent GC + ok = index_update_fields(MsgId, + {#msg_location.ref_count, RefCount + 1}, + State), noreply(State) end; -handle_cast({remove, MsgIds}, State = #msstate { current_file = CurFile }) -> - {Files, State1} = - lists:foldl( - fun (MsgId, {Files1, State2}) -> - case remove_message(MsgId, State2) of - {compact, File, State3} -> - {if CurFile =:= File -> Files1; - true -> sets:add_element(File, Files1) - end, State3}; - {no_compact, State3} -> - {Files1, State3} - end - end, {sets:new(), State}, MsgIds), - noreply(compact(sets:to_list(Files), State1)); +handle_cast({remove, MsgIds}, State) -> + State1 = lists:foldl( + fun (MsgId, State2) -> remove_message(MsgId, State2) end, + State, MsgIds), + noreply(maybe_compact(State1)); handle_cast({release, MsgIds}, State) -> lists:foreach(fun (MsgId) -> decrement_cache(MsgId, State) end, MsgIds), @@ -419,7 +374,27 @@ handle_cast({sync, MsgIds, K}, end; handle_cast(sync, State) -> - noreply(sync(State)). + noreply(sync(State)); + +handle_cast({gc_done, Reclaimed, Source, Dest}, + State = #msstate { sum_file_size = SumFileSize, + gc_active = {Source, Dest}, + file_summary = FileSummary }) -> + %% we always move data left, so Source has gone and was on the + %% right, so need to make dest = source.right.left, and also + %% dest.right = source.right + [#file_summary { left = Dest, right = SourceRight, locked = true }] = + ets:lookup(FileSummary, Source), + %% this could fail if SourceRight == undefined + ets:update_element(FileSummary, SourceRight, + {#file_summary.left, Dest}), + true = ets:update_element(FileSummary, Dest, + [{#file_summary.locked, false}, + {#file_summary.right, SourceRight}]), + true = ets:delete(FileSummary, Source), + noreply(run_pending( + State #msstate { sum_file_size = SumFileSize - Reclaimed, + gc_active = false })). handle_info(timeout, State) -> noreply(sync(State)); @@ -431,9 +406,13 @@ handle_info({file_handle_cache, maximum_eldest_since_use, Age}, State) -> handle_info({'EXIT', _Pid, Reason}, State) -> {stop, Reason, State}. -terminate(_Reason, State = #msstate { msg_locations = MsgLocations, +terminate(_Reason, State = #msstate { index_state = IndexState, + index_module = IndexModule, file_summary = FileSummary, current_file_handle = FileHdl }) -> + %% stop the gc first, otherwise it could be working and we pull + %% out the ets tables from under it. + ok = rabbit_msg_store_gc:stop(), State1 = case FileHdl of undefined -> State; _ -> State2 = sync(State), @@ -441,15 +420,18 @@ terminate(_Reason, State = #msstate { msg_locations = MsgLocations, State2 end, State3 = close_all_handles(State1), - ets:delete(MsgLocations), ets:delete(FileSummary), - State3 #msstate { msg_locations = undefined, + IndexModule:terminate(IndexState), + State3 #msstate { index_state = undefined, file_summary = undefined, current_file_handle = undefined }. code_change(_OldVsn, State, _Extra) -> {ok, State}. +handle_pre_hibernate(State) -> + {hibernate, maybe_compact(State)}. + %%---------------------------------------------------------------------------- %% general helper functions %%---------------------------------------------------------------------------- @@ -463,11 +445,11 @@ reply(Reply, State) -> {reply, Reply, State1, Timeout}. next_state(State = #msstate { on_sync = [], sync_timer_ref = undefined }) -> - {State, infinity}; + {State, hibernate}; next_state(State = #msstate { sync_timer_ref = undefined }) -> {start_sync_timer(State), 0}; next_state(State = #msstate { on_sync = [] }) -> - {stop_sync_timer(State), infinity}; + {stop_sync_timer(State), hibernate}; next_state(State) -> {State, 0}. @@ -481,27 +463,12 @@ stop_sync_timer(State = #msstate { sync_timer_ref = TRef }) -> {ok, cancel} = timer:cancel(TRef), State #msstate { sync_timer_ref = undefined }. -form_filename(Dir, Name) -> filename:join(Dir, Name). - -filenum_to_name(File) -> integer_to_list(File) ++ ?FILE_EXTENSION. - filename_to_num(FileName) -> list_to_integer(filename:rootname(FileName)). sort_file_names(FileNames) -> lists:sort(fun (A, B) -> filename_to_num(A) < filename_to_num(B) end, FileNames). -preallocate(Hdl, FileSizeLimit, FinalPos) -> - {ok, FileSizeLimit} = file_handle_cache:position(Hdl, FileSizeLimit), - ok = file_handle_cache:truncate(Hdl), - {ok, FinalPos} = file_handle_cache:position(Hdl, FinalPos), - ok. - -truncate_and_extend_file(FileHdl, Lowpoint, Highpoint) -> - {ok, Lowpoint} = file_handle_cache:position(FileHdl, Lowpoint), - ok = file_handle_cache:truncate(FileHdl), - ok = preallocate(FileHdl, Highpoint, Lowpoint). - sync(State = #msstate { current_file_handle = CurHdl, on_sync = Syncs }) -> State1 = stop_sync_timer(State), @@ -513,32 +480,135 @@ sync(State = #msstate { current_file_handle = CurHdl, State1 #msstate { on_sync = [] } end. +read_message(MsgId, From, State = + #msstate { current_file = CurFile, + current_file_handle = CurHdl, + file_summary = FileSummary }) -> + case index_lookup(MsgId, State) of + not_found -> gen_server2:reply(From, not_found), + State; + #msg_location { ref_count = RefCount, + file = File, + offset = Offset, + total_size = TotalSize } -> + case fetch_and_increment_cache(MsgId, State) of + not_found -> + [#file_summary { locked = Locked }] = + ets:lookup(FileSummary, File), + case Locked of + true -> + add_to_pending_gc_completion({read, MsgId, From}, + State); + false -> + ok = case CurFile =:= File andalso {ok, Offset} >= + file_handle_cache:current_raw_offset( + CurHdl) of + true -> file_handle_cache:flush(CurHdl); + false -> ok + end, + {Hdl, State1} = get_read_handle(File, State), + {ok, Offset} = + file_handle_cache:position(Hdl, Offset), + {ok, {MsgId, Msg}} = + case rabbit_msg_file:read(Hdl, TotalSize) of + {ok, {MsgId, _}} = Obj -> Obj; + Rest -> + throw({error, {misread, + [{old_state, State}, + {file_num, File}, + {offset, Offset}, + {read, Rest}, + {proc_dict, get()} + ]}}) + end, + ok = case RefCount > 1 of + true -> + insert_into_cache(MsgId, Msg, State1); + false -> + %% it's not in the cache and + %% we only have one reference + %% to the message. So don't + %% bother putting it in the + %% cache. + ok + end, + gen_server2:reply(From, {ok, Msg}), + State1 + end; + {Msg, _RefCount} -> + gen_server2:reply(From, {ok, Msg}), + State + end + end. + +contains_message(MsgId, From, State = #msstate { gc_active = GCActive }) -> + case index_lookup(MsgId, State) of + not_found -> + gen_server2:reply(From, false), + State; + #msg_location { file = File } -> + case GCActive of + {A, B} when File == A orelse File == B -> + add_to_pending_gc_completion( + {contains, MsgId, From}, State); + _ -> + gen_server2:reply(From, true), + State + end + end. + remove_message(MsgId, State = #msstate { file_summary = FileSummary, sum_valid_data = SumValid }) -> - StoreEntry = #msg_location { ref_count = RefCount, file = File, - offset = Offset, total_size = TotalSize } = + #msg_location { ref_count = RefCount, file = File, + offset = Offset, total_size = TotalSize } = index_lookup(MsgId, State), case RefCount of 1 -> - ok = index_delete(MsgId, State), ok = remove_cache_entry(MsgId, State), [FSEntry = #file_summary { valid_total_size = ValidTotalSize, - contiguous_top = ContiguousTop }] = + contiguous_top = ContiguousTop, + locked = Locked }] = ets:lookup(FileSummary, File), - ContiguousTop1 = lists:min([ContiguousTop, Offset]), - ValidTotalSize1 = ValidTotalSize - TotalSize, - true = ets:insert(FileSummary, FSEntry #file_summary { - valid_total_size = ValidTotalSize1, - contiguous_top = ContiguousTop1 }), - {compact, File, State #msstate { - sum_valid_data = SumValid - TotalSize }}; + case Locked of + true -> + add_to_pending_gc_completion({remove, MsgId}, State); + false -> + ok = index_delete(MsgId, State), + ContiguousTop1 = lists:min([ContiguousTop, Offset]), + ValidTotalSize1 = ValidTotalSize - TotalSize, + true = ets:insert( + FileSummary, FSEntry #file_summary { + valid_total_size = ValidTotalSize1, + contiguous_top = ContiguousTop1 }), + State1 = delete_file_if_empty(File, State), + State1 #msstate { sum_valid_data = SumValid - TotalSize } + end; _ when 1 < RefCount -> ok = decrement_cache(MsgId, State), - ok = index_update(StoreEntry #msg_location { - ref_count = RefCount - 1 }, State), - {no_compact, State} + %% only update field, otherwise bad interaction with concurrent GC + ok = index_update_fields(MsgId, + {#msg_location.ref_count, RefCount - 1}, + State), + State end. +add_to_pending_gc_completion( + Op, State = #msstate { pending_gc_completion = Pending }) -> + State #msstate { pending_gc_completion = [Op | Pending] }. + +run_pending(State = #msstate { pending_gc_completion = [] }) -> + State; +run_pending(State = #msstate { pending_gc_completion = Pending }) -> + State1 = State #msstate { pending_gc_completion = [] }, + lists:foldl(fun run_pending/2, State1, lists:reverse(Pending)). + +run_pending({read, MsgId, From}, State) -> + read_message(MsgId, From, State); +run_pending({contains, MsgId, From}, State) -> + contains_message(MsgId, From, State); +run_pending({remove, MsgId}, State) -> + remove_message(MsgId, State). + close_handle(Key, State = #msstate { file_handle_cache = FHC }) -> case dict:find(Key, FHC) of {ok, Hdl} -> @@ -556,19 +626,16 @@ close_all_handles(State = #msstate { file_handle_cache = FHC }) -> get_read_handle(FileNum, State = #msstate { file_handle_cache = FHC }) -> case dict:find(FileNum, FHC) of {ok, Hdl} -> {Hdl, State}; - error -> new_handle(FileNum, filenum_to_name(FileNum), + error -> new_handle(FileNum, + rabbit_msg_store_misc:filenum_to_name(FileNum), [read | ?BINARY_MODE], State) end. new_handle(Key, FileName, Mode, State = #msstate { file_handle_cache = FHC, dir = Dir }) -> - {ok, Hdl} = open_file(Dir, FileName, Mode), + {ok, Hdl} = rabbit_msg_store_misc:open_file(Dir, FileName, Mode), {Hdl, State #msstate { file_handle_cache = dict:store(Key, Hdl, FHC) }}. -open_file(Dir, FileName, Mode) -> - file_handle_cache:open(form_filename(Dir, FileName), ?BINARY_MODE ++ Mode, - [{write_buffer, ?HANDLE_CACHE_BUFFER_SIZE}]). - %%---------------------------------------------------------------------------- %% message cache helper functions %%---------------------------------------------------------------------------- @@ -607,28 +674,25 @@ insert_into_cache(MsgId, Msg, #msstate { message_cache = Cache }) -> %% index %%---------------------------------------------------------------------------- -index_lookup(Key, #msstate { msg_locations = MsgLocations }) -> - case ets:lookup(MsgLocations, Key) of - [] -> not_found; - [Entry] -> Entry - end. +index_lookup(Key, #msstate { index_module = Index, index_state = State }) -> + Index:lookup(Key, State). -index_insert(Obj, #msstate { msg_locations = MsgLocations }) -> - true = ets:insert_new(MsgLocations, Obj), - ok. +index_insert(Obj, #msstate { index_module = Index, index_state = State }) -> + Index:insert(Obj, State). -index_update(Obj, #msstate { msg_locations = MsgLocations }) -> - true = ets:insert(MsgLocations, Obj), - ok. +index_update(Obj, #msstate { index_module = Index, index_state = State }) -> + Index:update(Obj, State). -index_delete(Key, #msstate { msg_locations = MsgLocations }) -> - true = ets:delete(MsgLocations, Key), - ok. +index_update_fields(Key, Updates, + #msstate { index_module = Index, index_state = State }) -> + Index:update_fields(Key, Updates, State). -index_delete_by_file(File, #msstate { msg_locations = MsgLocations }) -> - MatchHead = #msg_location { file = File, _ = '_' }, - ets:select_delete(MsgLocations, [{MatchHead, [], [true]}]), - ok. +index_delete(Key, #msstate { index_module = Index, index_state = State }) -> + Index:delete(Key, State). + +index_delete_by_file(File, #msstate { index_module = Index, + index_state = State }) -> + Index:delete_by_file(File, State). %%---------------------------------------------------------------------------- %% recovery @@ -696,7 +760,7 @@ recover_crashed_compactions1(Dir, FileNames, TmpFileName) -> %% consist only of valid messages. Plan: Truncate the main file %% back to before any of the files in the tmp file and copy %% them over again - TmpPath = form_filename(Dir, TmpFileName), + TmpPath = rabbit_msg_store_misc:form_filename(Dir, TmpFileName), case is_sublist(MsgIdsTmp, MsgIds) of true -> %% we're in case 1, 2 or 3 above. Just delete the tmp file %% note this also catches the case when the tmp file @@ -728,8 +792,8 @@ recover_crashed_compactions1(Dir, FileNames, TmpFileName) -> %% are in the tmp file true = is_disjoint(MsgIds1, MsgIdsTmp), %% must open with read flag, otherwise will stomp over contents - {ok, MainHdl} = open_file(Dir, NonTmpRelatedFileName, - [read | ?WRITE_MODE]), + {ok, MainHdl} = rabbit_msg_store_misc:open_file( + Dir, NonTmpRelatedFileName, [read | ?WRITE_MODE]), %% Wipe out any rubbish at the end of the file. Remember %% the head of the list will be the highest entry in the %% file. @@ -738,8 +802,10 @@ recover_crashed_compactions1(Dir, FileNames, TmpFileName) -> %% Extend the main file as big as necessary in a single %% move. If we run out of disk space, this truncate could %% fail, but we still aren't risking losing data - ok = truncate_and_extend_file(MainHdl, Top, Top + TmpSize), - {ok, TmpHdl} = open_file(Dir, TmpFileName, ?READ_AHEAD_MODE), + ok = rabbit_msg_store_misc:truncate_and_extend_file( + MainHdl, Top, Top + TmpSize), + {ok, TmpHdl} = rabbit_msg_store_misc:open_file( + Dir, TmpFileName, ?READ_AHEAD_MODE), {ok, TmpSize} = file_handle_cache:copy(TmpHdl, MainHdl, TmpSize), ok = file_handle_cache:close(MainHdl), ok = file_handle_cache:delete(TmpHdl), @@ -761,22 +827,10 @@ is_disjoint(SmallerL, BiggerL) -> lists:all(fun (Item) -> not lists:member(Item, BiggerL) end, SmallerL). scan_file_for_valid_messages_msg_ids(Dir, FileName) -> - {ok, Messages, _FileSize} = scan_file_for_valid_messages(Dir, FileName), + {ok, Messages, _FileSize} = + rabbit_msg_store_misc:scan_file_for_valid_messages(Dir, FileName), {ok, Messages, [MsgId || {MsgId, _TotalSize, _FileOffset} <- Messages]}. -scan_file_for_valid_messages(Dir, FileName) -> - case open_file(Dir, FileName, ?READ_MODE) of - {ok, Hdl} -> - Valid = rabbit_msg_file:scan(Hdl), - %% if something really bad's happened, the close could fail, - %% but ignore - file_handle_cache:close(Hdl), - Valid; - {error, enoent} -> {ok, [], 0}; - {error, Reason} -> throw({error, - {unable_to_scan_file, FileName, Reason}}) - end. - %% Takes the list in *ascending* order (i.e. eldest message %% first). This is the opposite of what scan_file_for_valid_messages %% produces. The list of msgs that is produced is youngest first. @@ -794,25 +848,25 @@ find_contiguous_block_prefix([_MsgAfterGap | _Tail], ExpectedOffset, MsgIds) -> {ExpectedOffset, MsgIds}. build_index([], State) -> - build_index(undefined, [State #msstate.current_file], [], State); + build_index(undefined, [State #msstate.current_file], State); build_index(Files, State) -> - build_index(undefined, Files, [], State). + {Offset, State1} = build_index(undefined, Files, State), + {Offset, lists:foldl(fun delete_file_if_empty/2, State1, Files)}. -build_index(Left, [], FilesToCompact, State = - #msstate { file_summary = FileSummary }) -> +build_index(Left, [], State = #msstate { file_summary = FileSummary }) -> ok = index_delete_by_file(undefined, State), Offset = case ets:lookup(FileSummary, Left) of [] -> 0; [#file_summary { file_size = FileSize }] -> FileSize end, - {Offset, compact(FilesToCompact, %% this never includes the current file - State #msstate { current_file = Left })}; -build_index(Left, [File|Files], FilesToCompact, + {Offset, State #msstate { current_file = Left }}; +build_index(Left, [File|Files], State = #msstate { dir = Dir, file_summary = FileSummary, sum_valid_data = SumValid, sum_file_size = SumFileSize }) -> {ok, Messages, FileSize} = - scan_file_for_valid_messages(Dir, filenum_to_name(File)), + rabbit_msg_store_misc:scan_file_for_valid_messages( + Dir, rabbit_msg_store_misc:filenum_to_name(File)), {ValidMessages, ValidTotalSize} = lists:foldl( fun (Obj = {MsgId, TotalSize, Offset}, {VMAcc, VTSAcc}) -> @@ -845,14 +899,9 @@ build_index(Left, [File|Files], FilesToCompact, true = ets:insert_new(FileSummary, #file_summary { file = File, valid_total_size = ValidTotalSize, - contiguous_top = ContiguousTop, + contiguous_top = ContiguousTop, locked = false, left = Left, right = Right, file_size = FileSize1 }), - FilesToCompact1 = - case FileSize1 == ContiguousTop orelse Right =:= undefined of - true -> FilesToCompact; - false -> [File | FilesToCompact] - end, - build_index(File, Files, FilesToCompact1, + build_index(File, Files, State #msstate { sum_valid_data = SumValid + ValidTotalSize, sum_file_size = SumFileSize + FileSize1 }). @@ -862,235 +911,93 @@ build_index(Left, [File|Files], FilesToCompact, maybe_roll_to_new_file(Offset, State = #msstate { dir = Dir, - file_size_limit = FileSizeLimit, current_file_handle = CurHdl, current_file = CurFile, file_summary = FileSummary }) - when Offset >= FileSizeLimit -> + when Offset >= ?FILE_SIZE_LIMIT -> State1 = sync(State), ok = file_handle_cache:close(CurHdl), NextFile = CurFile + 1, - {ok, NextHdl} = open_file(Dir, filenum_to_name(NextFile), ?WRITE_MODE), - true = ets:update_element(FileSummary, CurFile, - {#file_summary.right, NextFile}), + {ok, NextHdl} = rabbit_msg_store_misc:open_file( + Dir, rabbit_msg_store_misc:filenum_to_name(NextFile), + ?WRITE_MODE), true = ets:insert_new( FileSummary, #file_summary { file = NextFile, valid_total_size = 0, contiguous_top = 0, - left = CurFile, right = undefined, file_size = 0 }), - State2 = State1 #msstate { current_file_handle = NextHdl, - current_file = NextFile }, - compact([CurFile], State2); + left = CurFile, right = undefined, file_size = 0, + locked = false }), + true = ets:update_element(FileSummary, CurFile, + {#file_summary.right, NextFile}), + State1 #msstate { current_file_handle = NextHdl, + current_file = NextFile }; maybe_roll_to_new_file(_, State) -> State. -compact(Files, State) -> - %% smallest number, hence eldest, hence left-most, first - SortedFiles = lists:sort(Files), - %% foldl reverses, so now youngest/right-most first - {RemainingFiles, State1} = - lists:foldl(fun (File, {Acc, State2}) -> - case delete_file_if_empty(File, State2) of - {true, State3} -> {Acc, State3}; - {false, State3} -> {[File | Acc], State3} - end - end, {[], State}, SortedFiles), - lists:foldl(fun combine_file/2, State1, lists:reverse(RemainingFiles)). - -%% At this stage, we simply know that the file has had msgs removed -%% from it. However, we don't know if we need to merge it left (which -%% is what we would prefer), or merge it right. If we merge left, then -%% this file is the source, and the left file is the destination. If -%% we merge right then this file is the destination and the right file -%% is the source. -combine_file(File, State = #msstate { file_summary = FileSummary, - current_file = CurFile }) -> - %% the file we're looking at may no longer exist as it may have - %% been deleted within the current GC run - case ets:lookup(FileSummary, File) of - [] -> State; - [FSEntry = #file_summary { left = Left, right = Right }] -> - GoRight = - fun() -> - case Right of - undefined -> State; - _ when not (CurFile == Right) -> - [FSRight] = ets:lookup(FileSummary, Right), - {_, State1} = adjust_meta_and_combine( - FSEntry, FSRight, State), - State1; - _ -> State - end - end, - case Left of - undefined -> - GoRight(); - _ -> [FSLeft] = ets:lookup(FileSummary, Left), - case adjust_meta_and_combine(FSLeft, FSEntry, State) of - {true, State1} -> State1; - {false, State} -> GoRight() - end - end - end. +maybe_compact(State = #msstate { sum_valid_data = SumValid, + sum_file_size = SumFileSize, + file_summary = FileSummary, + gc_active = false }) + when (SumFileSize - SumValid) / SumFileSize > ?GARBAGE_FRACTION -> + First = ets:first(FileSummary), + N = random_distributions:geometric(?GEOMETRIC_P), + case find_files_to_gc(FileSummary, N, First) of + undefined -> + State; + {Source, Dest} -> + State1 = close_handle(Source, close_handle(Dest, State)), + true = ets:update_element(FileSummary, Source, + {#file_summary.locked, true}), + true = ets:update_element(FileSummary, Dest, + {#file_summary.locked, true}), + ok = rabbit_msg_store_gc:gc(Source, Dest), + State1 #msstate { gc_active = {Source, Dest} } + end; +maybe_compact(State) -> + State. -adjust_meta_and_combine( - LeftObj = #file_summary { - file = LeftFile, valid_total_size = LeftValidData, right = RightFile, - file_size = LeftFileSize }, - RightObj = #file_summary { - file = RightFile, valid_total_size = RightValidData, left = LeftFile, - right = RightRight, file_size = RightFileSize }, - State = #msstate { file_size_limit = FileSizeLimit, - file_summary = FileSummary, - sum_file_size = SumFileSize }) -> - TotalValidData = LeftValidData + RightValidData, - if FileSizeLimit >= TotalValidData -> - State1 = combine_files(RightObj, LeftObj, State), - %% this could fail if RightRight is undefined - ets:update_element(FileSummary, RightRight, - {#file_summary.left, LeftFile}), - true = ets:insert(FileSummary, LeftObj #file_summary { - valid_total_size = TotalValidData, - contiguous_top = TotalValidData, - file_size = TotalValidData, - right = RightRight }), - true = ets:delete(FileSummary, RightFile), - {true, State1 #msstate { sum_file_size = - SumFileSize - LeftFileSize - RightFileSize - + TotalValidData }}; - true -> {false, State} +find_files_to_gc(_FileSummary, _N, '$end_of_table') -> + undefined; +find_files_to_gc(FileSummary, N, First) -> + [FirstObj = #file_summary { right = Right }] = + ets:lookup(FileSummary, First), + Pairs = + find_files_to_gc(FileSummary, N, FirstObj, + ets:lookup(FileSummary, Right), []), + case Pairs of + [] -> undefined; + [Pair] -> Pair; + _ -> M = 1 + (N rem length(Pairs)), + lists:nth(M, Pairs) end. -combine_files(#file_summary { file = Source, - valid_total_size = SourceValid, - left = Destination }, - #file_summary { file = Destination, - valid_total_size = DestinationValid, - contiguous_top = DestinationContiguousTop, - right = Source }, - State = #msstate { dir = Dir }) -> - State1 = close_handle(Source, close_handle(Destination, State)), - SourceName = filenum_to_name(Source), - DestinationName = filenum_to_name(Destination), - {ok, SourceHdl} = open_file(Dir, SourceName, ?READ_AHEAD_MODE), - {ok, DestinationHdl} = open_file(Dir, DestinationName, - ?READ_AHEAD_MODE ++ ?WRITE_MODE), - ExpectedSize = SourceValid + DestinationValid, - %% if DestinationValid =:= DestinationContiguousTop then we don't - %% need a tmp file - %% if they're not equal, then we need to write out everything past - %% the DestinationContiguousTop to a tmp file then truncate, - %% copy back in, and then copy over from Source - %% otherwise we just truncate straight away and copy over from Source - if DestinationContiguousTop =:= DestinationValid -> - ok = truncate_and_extend_file(DestinationHdl, - DestinationValid, ExpectedSize); - true -> - Worklist = - lists:dropwhile( - fun (#msg_location { offset = Offset }) - when Offset /= DestinationContiguousTop -> - %% it cannot be that Offset == - %% DestinationContiguousTop because if it - %% was then DestinationContiguousTop would - %% have been extended by TotalSize - Offset < DestinationContiguousTop - %% Given expected access patterns, I suspect - %% that the list should be naturally sorted - %% as we require, however, we need to - %% enforce it anyway - end, find_unremoved_messages_in_file(Destination, State1)), - Tmp = filename:rootname(DestinationName) ++ ?FILE_EXTENSION_TMP, - {ok, TmpHdl} = open_file(Dir, Tmp, ?READ_AHEAD_MODE ++ ?WRITE_MODE), - ok = copy_messages( - Worklist, DestinationContiguousTop, DestinationValid, - DestinationHdl, TmpHdl, Destination, State1), - TmpSize = DestinationValid - DestinationContiguousTop, - %% so now Tmp contains everything we need to salvage from - %% Destination, and MsgLocation has been updated to - %% reflect compaction of Destination so truncate - %% Destination and copy from Tmp back to the end - {ok, 0} = file_handle_cache:position(TmpHdl, 0), - ok = truncate_and_extend_file( - DestinationHdl, DestinationContiguousTop, ExpectedSize), - {ok, TmpSize} = - file_handle_cache:copy(TmpHdl, DestinationHdl, TmpSize), - %% position in DestinationHdl should now be DestinationValid - ok = file_handle_cache:sync(DestinationHdl), - ok = file_handle_cache:close(TmpHdl), - ok = file:delete(form_filename(Dir, Tmp)) - end, - SourceWorkList = find_unremoved_messages_in_file(Source, State1), - ok = copy_messages(SourceWorkList, DestinationValid, ExpectedSize, - SourceHdl, DestinationHdl, Destination, State1), - %% tidy up - ok = file_handle_cache:close(SourceHdl), - ok = file_handle_cache:close(DestinationHdl), - ok = file:delete(form_filename(Dir, SourceName)), - State1. - -find_unremoved_messages_in_file(File, State = #msstate { dir = Dir }) -> - %% Msgs here will be end-of-file at start-of-list - {ok, Messages, _FileSize} = - scan_file_for_valid_messages(Dir, filenum_to_name(File)), - %% foldl will reverse so will end up with msgs in ascending offset order - lists:foldl( - fun ({MsgId, _TotalSize, _Offset}, Acc) -> - case index_lookup(MsgId, State) of - Entry = #msg_location { file = File } -> [ Entry | Acc ]; - _ -> Acc - end - end, [], Messages). - -copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl, - Destination, State) -> - {FinalOffset, BlockStart1, BlockEnd1} = - lists:foldl( - fun (StoreEntry = #msg_location { offset = Offset, - total_size = TotalSize }, - {CurOffset, BlockStart, BlockEnd}) -> - %% CurOffset is in the DestinationFile. - %% Offset, BlockStart and BlockEnd are in the SourceFile - %% update MsgLocation to reflect change of file and offset - ok = index_update(StoreEntry #msg_location { - file = Destination, - offset = CurOffset }, State), - NextOffset = CurOffset + TotalSize, - if BlockStart =:= undefined -> - %% base case, called only for the first list elem - {NextOffset, Offset, Offset + TotalSize}; - Offset =:= BlockEnd -> - %% extend the current block because the next - %% msg follows straight on - {NextOffset, BlockStart, BlockEnd + TotalSize}; - true -> - %% found a gap, so actually do the work for - %% the previous block - BSize = BlockEnd - BlockStart, - {ok, BlockStart} = - file_handle_cache:position(SourceHdl, BlockStart), - {ok, BSize} = file_handle_cache:copy( - SourceHdl, DestinationHdl, BSize), - {NextOffset, Offset, Offset + TotalSize} - end - end, {InitOffset, undefined, undefined}, WorkList), - case WorkList of - [] -> - ok; - _ -> - %% do the last remaining block - BSize1 = BlockEnd1 - BlockStart1, - {ok, BlockStart1} = - file_handle_cache:position(SourceHdl, BlockStart1), - {ok, BSize1} = - file_handle_cache:copy(SourceHdl, DestinationHdl, BSize1), - ok = file_handle_cache:sync(DestinationHdl) - end, - ok. +find_files_to_gc(_FileSummary, _N, #file_summary {}, [], Pairs) -> + lists:reverse(Pairs); +find_files_to_gc(FileSummary, N, + #file_summary { right = Source, file = Dest, + valid_total_size = DestValid }, + [SourceObj = #file_summary { left = Dest, right = SourceRight, + valid_total_size = SourceValid, + file = Source }], + Pairs) when DestValid + SourceValid =< ?FILE_SIZE_LIMIT andalso + not is_atom(SourceRight) -> + Pair = {Source, Dest}, + case N == 1 of + true -> [Pair]; + false -> find_files_to_gc(FileSummary, (N - 1), SourceObj, + ets:lookup(FileSummary, SourceRight), + [Pair | Pairs]) + end; +find_files_to_gc(FileSummary, N, _Left, + [Right = #file_summary { right = RightRight }], Pairs) -> + find_files_to_gc(FileSummary, N, Right, + ets:lookup(FileSummary, RightRight), Pairs). +delete_file_if_empty(File, State = #msstate { current_file = File }) -> + State; delete_file_if_empty(File, #msstate { dir = Dir, file_summary = FileSummary, sum_file_size = SumFileSize } = State) -> [#file_summary { valid_total_size = ValidData, file_size = FileSize, - left = Left, right = Right }] = + left = Left, right = Right, locked = false }] = ets:lookup(FileSummary, File), case ValidData of %% we should NEVER find the current file in here hence right @@ -1101,16 +1008,17 @@ delete_file_if_empty(File, #msstate { dir = Dir, file_summary = FileSummary, true = ets:update_element( FileSummary, Right, {#file_summary.left, undefined}); - {_, _} when not (is_atom(Right)) -> + {_, _} when not is_atom(Right) -> true = ets:update_element(FileSummary, Right, {#file_summary.left, Left}), - true = - ets:update_element(FileSummary, Left, - {#file_summary.right, Right}) + true = ets:update_element(FileSummary, Left, + {#file_summary.right, Right}) end, true = ets:delete(FileSummary, File), State1 = close_handle(File, State), - ok = file:delete(form_filename(Dir, filenum_to_name(File))), - {true, State1 #msstate { sum_file_size = SumFileSize - FileSize }}; - _ -> {false, State} + ok = file:delete(rabbit_msg_store_misc:form_filename( + Dir, + rabbit_msg_store_misc:filenum_to_name(File))), + State1 #msstate { sum_file_size = SumFileSize - FileSize }; + _ -> State end. diff --git a/src/rabbit_msg_store_ets_index.erl b/src/rabbit_msg_store_ets_index.erl new file mode 100644 index 0000000000..cb13ed8690 --- /dev/null +++ b/src/rabbit_msg_store_ets_index.erl @@ -0,0 +1,71 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2009 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_msg_store_ets_index). +-export([init/0, lookup/2, insert/2, update/2, update_fields/3, delete/2, + delete_by_file/2, terminate/1]). + +-define(MSG_LOC_NAME, rabbit_msg_store_ets_index). + +-include("rabbit_msg_store.hrl"). + +init() -> + ets:new(?MSG_LOC_NAME, [set, public, {keypos, #msg_location.msg_id}]). + +lookup(Key, MsgLocations) -> + case ets:lookup(MsgLocations, Key) of + [] -> not_found; + [Entry] -> Entry + end. + +insert(Obj, MsgLocations) -> + true = ets:insert_new(MsgLocations, Obj), + ok. + +update(Obj, MsgLocations) -> + true = ets:insert(MsgLocations, Obj), + ok. + +update_fields(Key, Updates, MsgLocations) -> + true = ets:update_element(MsgLocations, Key, Updates), + ok. + +delete(Key, MsgLocations) -> + true = ets:delete(MsgLocations, Key), + ok. + +delete_by_file(File, MsgLocations) -> + MatchHead = #msg_location { file = File, _ = '_' }, + ets:select_delete(MsgLocations, [{MatchHead, [], [true]}]), + ok. + +terminate(MsgLocations) -> + ets:delete(MsgLocations). diff --git a/src/rabbit_msg_store_gc.erl b/src/rabbit_msg_store_gc.erl new file mode 100644 index 0000000000..729cd28715 --- /dev/null +++ b/src/rabbit_msg_store_gc.erl @@ -0,0 +1,249 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2009 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_msg_store_gc). + +-behaviour(gen_server2). + +-export([start_link/4, gc/2, stop/0]). + +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +-record(gcstate, + {dir, + index_state, + file_summary, + index_module + }). + +-include("rabbit_msg_store.hrl"). + +-define(SERVER, ?MODULE). + +%%---------------------------------------------------------------------------- + +start_link(Dir, IndexState, FileSummary, IndexModule) -> + gen_server2:start_link({local, ?SERVER}, ?MODULE, + [Dir, IndexState, FileSummary, IndexModule], + [{timeout, infinity}]). + +gc(Source, Destination) -> + gen_server2:cast(?SERVER, {gc, Source, Destination}). + +stop() -> + gen_server2:call(?SERVER, stop). + +%%---------------------------------------------------------------------------- + +init([Dir, IndexState, FileSummary, IndexModule]) -> + {ok, #gcstate { dir = Dir, index_state = IndexState, + file_summary = FileSummary, index_module = IndexModule }, + hibernate, + {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. + +handle_call(stop, _From, State) -> + {stop, normal, ok, State}. + +handle_cast({gc, Source, Destination}, State) -> + Reclaimed = adjust_meta_and_combine(Source, Destination, State), + ok = rabbit_msg_store:gc_done(Reclaimed, Source, Destination), + {noreply, State, hibernate}. + +handle_info(Info, State) -> + {stop, {unhandled_info, Info}, State}. + +terminate(_Reason, State) -> + State. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%---------------------------------------------------------------------------- + +adjust_meta_and_combine(SourceFile, DestFile, + State = #gcstate { file_summary = FileSummary }) -> + + [SourceObj = #file_summary { + valid_total_size = SourceValidData, left = DestFile, + file_size = SourceFileSize, locked = true }] = + ets:lookup(FileSummary, SourceFile), + [DestObj = #file_summary { + valid_total_size = DestValidData, right = SourceFile, + file_size = DestFileSize, locked = true }] = + ets:lookup(FileSummary, DestFile), + + TotalValidData = DestValidData + SourceValidData, + ok = combine_files(SourceObj, DestObj, State), + %% don't update dest.right, because it could be changing at the same time + true = + ets:update_element(FileSummary, DestFile, + [{#file_summary.valid_total_size, TotalValidData}, + {#file_summary.contiguous_top, TotalValidData}, + {#file_summary.file_size, TotalValidData}]), + SourceFileSize + DestFileSize - TotalValidData. + +combine_files(#file_summary { file = Source, + valid_total_size = SourceValid, + left = Destination }, + #file_summary { file = Destination, + valid_total_size = DestinationValid, + contiguous_top = DestinationContiguousTop, + right = Source }, + State = #gcstate { dir = Dir }) -> + SourceName = rabbit_msg_store_misc:filenum_to_name(Source), + DestinationName = rabbit_msg_store_misc:filenum_to_name(Destination), + {ok, SourceHdl} = + rabbit_msg_store_misc:open_file(Dir, SourceName, ?READ_AHEAD_MODE), + {ok, DestinationHdl} = + rabbit_msg_store_misc:open_file(Dir, DestinationName, + ?READ_AHEAD_MODE ++ ?WRITE_MODE), + ExpectedSize = SourceValid + DestinationValid, + %% if DestinationValid =:= DestinationContiguousTop then we don't + %% need a tmp file + %% if they're not equal, then we need to write out everything past + %% the DestinationContiguousTop to a tmp file then truncate, + %% copy back in, and then copy over from Source + %% otherwise we just truncate straight away and copy over from Source + if DestinationContiguousTop =:= DestinationValid -> + ok = rabbit_msg_store_misc:truncate_and_extend_file( + DestinationHdl, DestinationValid, ExpectedSize); + true -> + Worklist = + lists:dropwhile( + fun (#msg_location { offset = Offset }) + when Offset /= DestinationContiguousTop -> + %% it cannot be that Offset == + %% DestinationContiguousTop because if it + %% was then DestinationContiguousTop would + %% have been extended by TotalSize + Offset < DestinationContiguousTop + %% Given expected access patterns, I suspect + %% that the list should be naturally sorted + %% as we require, however, we need to + %% enforce it anyway + end, + find_unremoved_messages_in_file(Destination, State)), + Tmp = filename:rootname(DestinationName) ++ ?FILE_EXTENSION_TMP, + {ok, TmpHdl} = rabbit_msg_store_misc:open_file( + Dir, Tmp, ?READ_AHEAD_MODE ++ ?WRITE_MODE), + ok = copy_messages( + Worklist, DestinationContiguousTop, DestinationValid, + DestinationHdl, TmpHdl, Destination, State), + TmpSize = DestinationValid - DestinationContiguousTop, + %% so now Tmp contains everything we need to salvage from + %% Destination, and index_state has been updated to + %% reflect the compaction of Destination so truncate + %% Destination and copy from Tmp back to the end + {ok, 0} = file_handle_cache:position(TmpHdl, 0), + ok = rabbit_msg_store_misc:truncate_and_extend_file( + DestinationHdl, DestinationContiguousTop, ExpectedSize), + {ok, TmpSize} = + file_handle_cache:copy(TmpHdl, DestinationHdl, TmpSize), + %% position in DestinationHdl should now be DestinationValid + ok = file_handle_cache:sync(DestinationHdl), + ok = file_handle_cache:close(TmpHdl), + ok = file:delete(rabbit_msg_store_misc:form_filename(Dir, Tmp)) + end, + SourceWorkList = find_unremoved_messages_in_file(Source, State), + ok = copy_messages(SourceWorkList, DestinationValid, ExpectedSize, + SourceHdl, DestinationHdl, Destination, State), + %% tidy up + ok = file_handle_cache:close(SourceHdl), + ok = file_handle_cache:close(DestinationHdl), + ok = file:delete(rabbit_msg_store_misc:form_filename(Dir, SourceName)), + ok. + +find_unremoved_messages_in_file(File, #gcstate { dir = Dir, + index_state = IndexState, + index_module = Index }) -> + %% Msgs here will be end-of-file at start-of-list + {ok, Messages, _FileSize} = + rabbit_msg_store_misc:scan_file_for_valid_messages( + Dir, rabbit_msg_store_misc:filenum_to_name(File)), + %% foldl will reverse so will end up with msgs in ascending offset order + lists:foldl( + fun ({MsgId, _TotalSize, _Offset}, Acc) -> + case Index:lookup(MsgId, IndexState) of + Entry = #msg_location { file = File } -> [ Entry | Acc ]; + _ -> Acc + end + end, [], Messages). + +copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl, + Destination, #gcstate { index_module = Index, + index_state = IndexState }) -> + {FinalOffset, BlockStart1, BlockEnd1} = + lists:foldl( + fun (#msg_location { msg_id = MsgId, offset = Offset, + total_size = TotalSize }, + {CurOffset, BlockStart, BlockEnd}) -> + %% CurOffset is in the DestinationFile. + %% Offset, BlockStart and BlockEnd are in the SourceFile + %% update MsgLocation to reflect change of file and offset + ok = Index:update_fields(MsgId, + [{#msg_location.file, Destination}, + {#msg_location.offset, CurOffset}], + IndexState), + {BlockStart2, BlockEnd2} = + if BlockStart =:= undefined -> + %% base case, called only for the first list elem + {Offset, Offset + TotalSize}; + Offset =:= BlockEnd -> + %% extend the current block because the + %% next msg follows straight on + {BlockStart, BlockEnd + TotalSize}; + true -> + %% found a gap, so actually do the work + %% for the previous block + BSize = BlockEnd - BlockStart, + {ok, BlockStart} = + file_handle_cache:position(SourceHdl, + BlockStart), + {ok, BSize} = file_handle_cache:copy( + SourceHdl, DestinationHdl, BSize), + {Offset, Offset + TotalSize} + end, + {CurOffset + TotalSize, BlockStart2, BlockEnd2} + end, {InitOffset, undefined, undefined}, WorkList), + case WorkList of + [] -> + ok; + _ -> + %% do the last remaining block + BSize1 = BlockEnd1 - BlockStart1, + {ok, BlockStart1} = + file_handle_cache:position(SourceHdl, BlockStart1), + {ok, BSize1} = + file_handle_cache:copy(SourceHdl, DestinationHdl, BSize1), + ok = file_handle_cache:sync(DestinationHdl) + end, + ok. diff --git a/src/rabbit_msg_store_misc.erl b/src/rabbit_msg_store_misc.erl new file mode 100644 index 0000000000..cf76cf21f9 --- /dev/null +++ b/src/rabbit_msg_store_misc.erl @@ -0,0 +1,74 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2009 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_msg_store_misc). + +-export([open_file/3, preallocate/3, truncate_and_extend_file/3, + form_filename/2, filenum_to_name/1, scan_file_for_valid_messages/2]). + +-include("rabbit_msg_store.hrl"). + + +%%---------------------------------------------------------------------------- + +open_file(Dir, FileName, Mode) -> + file_handle_cache:open(form_filename(Dir, FileName), ?BINARY_MODE ++ Mode, + [{write_buffer, ?HANDLE_CACHE_BUFFER_SIZE}]). + +%%---------------------------------------------------------------------------- + +preallocate(Hdl, FileSizeLimit, FinalPos) -> + {ok, FileSizeLimit} = file_handle_cache:position(Hdl, FileSizeLimit), + ok = file_handle_cache:truncate(Hdl), + {ok, FinalPos} = file_handle_cache:position(Hdl, FinalPos), + ok. + +truncate_and_extend_file(FileHdl, Lowpoint, Highpoint) -> + {ok, Lowpoint} = file_handle_cache:position(FileHdl, Lowpoint), + ok = file_handle_cache:truncate(FileHdl), + ok = preallocate(FileHdl, Highpoint, Lowpoint). + +form_filename(Dir, Name) -> filename:join(Dir, Name). + +filenum_to_name(File) -> integer_to_list(File) ++ ?FILE_EXTENSION. + +scan_file_for_valid_messages(Dir, FileName) -> + case open_file(Dir, FileName, ?READ_MODE) of + {ok, Hdl} -> + Valid = rabbit_msg_file:scan(Hdl), + %% if something really bad's happened, the close could fail, + %% but ignore + file_handle_cache:close(Hdl), + Valid; + {error, enoent} -> {ok, [], 0}; + {error, Reason} -> throw({error, + {unable_to_scan_file, FileName, Reason}}) + end. diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 7187e322dd..f5d7978cae 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -921,7 +921,7 @@ msg_store_write(MsgIds) -> ok = lists:foldl( fun (MsgId, ok) -> rabbit_msg_store:write(MsgId, MsgId) end, ok, MsgIds). - + test_msg_store() -> stop_msg_store(), ok = start_msg_store_empty(), @@ -1016,16 +1016,23 @@ test_msg_store() -> fun (MsgId, ok) -> rabbit_msg_store:write(msg_id_bin(MsgId), Payload) end, ok, MsgIdsBig), - %% .., then remove even numbers ascending, and odd numbers - %% descending. This hits the GC. + %% .., then 3s by 1... ok = lists:foldl( fun (MsgId, ok) -> - rabbit_msg_store:remove([msg_id_bin( - case MsgId rem 2 of - 0 -> MsgId; - 1 -> BigCount - MsgId - end)]) - end, ok, MsgIdsBig), + rabbit_msg_store:remove([msg_id_bin(MsgId)]) + end, ok, lists:seq(BigCount, 1, -3)), + %% .., then remove 3s by 2, from the young end first. This hits + %% GC (under 50% good data left, but no empty files. Must GC). + ok = lists:foldl( + fun (MsgId, ok) -> + rabbit_msg_store:remove([msg_id_bin(MsgId)]) + end, ok, lists:seq(BigCount-1, 1, -3)), + %% .., then remove 3s by 3, from the young end first. This hits + %% GC... + ok = lists:foldl( + fun (MsgId, ok) -> + rabbit_msg_store:remove([msg_id_bin(MsgId)]) + end, ok, lists:seq(BigCount-2, 1, -3)), %% ensure empty false = msg_store_contains(false, [msg_id_bin(M) || M <- MsgIdsBig]), %% restart empty diff --git a/src/random_distributions.erl b/src/random_distributions.erl new file mode 100644 index 0000000000..dfcdc834c5 --- /dev/null +++ b/src/random_distributions.erl @@ -0,0 +1,38 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2009 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(random_distributions). + +-export([geometric/1]). + +geometric(P) when 0.0 < P andalso P < 1.0 -> + U = 1.0 - random:uniform(), + rabbit_misc:ceil(math:log(U) / math:log(1.0 - P)). |
