summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-12-20 19:04:58 +0000
committerMatthew Sackman <matthew@lshift.net>2009-12-20 19:04:58 +0000
commit95f466aa10e9ae29bf0d8b6b3f81846bb55bfbd0 (patch)
tree54d9fe9d64ea6befe46051e0f7b3de145ca1f242
parent1e2f77364d773a726d775b1a5b76cd599e32e6af (diff)
parent57baa1db68397e308c57a738a29ec136065eee8c (diff)
downloadrabbitmq-server-git-95f466aa10e9ae29bf0d8b6b3f81846bb55bfbd0.tar.gz
merged bug 22161 into bug 21673. Lazy, concurrent msg_store GC landed.
-rw-r--r--include/rabbit_msg_store.hrl51
-rw-r--r--src/gen_server2.erl13
-rw-r--r--src/rabbit.erl10
-rw-r--r--src/rabbit_misc.erl6
-rw-r--r--src/rabbit_msg_store.erl740
-rw-r--r--src/rabbit_msg_store_ets_index.erl71
-rw-r--r--src/rabbit_msg_store_gc.erl249
-rw-r--r--src/rabbit_msg_store_misc.erl74
-rw-r--r--src/rabbit_tests.erl25
-rw-r--r--src/random_distributions.erl38
10 files changed, 842 insertions, 435 deletions
diff --git a/include/rabbit_msg_store.hrl b/include/rabbit_msg_store.hrl
new file mode 100644
index 0000000000..925d5d8e71
--- /dev/null
+++ b/include/rabbit_msg_store.hrl
@@ -0,0 +1,51 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2009 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-record(msg_location,
+ {msg_id, ref_count, file, offset, total_size}).
+
+-record(file_summary,
+ {file, valid_total_size, contiguous_top, left, right, file_size,
+ locked}).
+
+-define(BINARY_MODE, [raw, binary]).
+-define(READ_MODE, [read]).
+-define(READ_AHEAD_MODE, [read_ahead | ?READ_MODE]).
+-define(WRITE_MODE, [write]).
+
+-define(HIBERNATE_AFTER_MIN, 1000).
+-define(DESIRED_HIBERNATE, 10000).
+-define(FILE_EXTENSION, ".rdq").
+-define(FILE_EXTENSION_TMP, ".rdt").
+
+-define(FILE_SIZE_LIMIT, (16*1024*1024)).
+
+-define(HANDLE_CACHE_BUFFER_SIZE, 1048576). %% 1MB
diff --git a/src/gen_server2.erl b/src/gen_server2.erl
index 53edf8deef..c48061518d 100644
--- a/src/gen_server2.erl
+++ b/src/gen_server2.erl
@@ -25,8 +25,11 @@
%% handle_pre_hibernate/1 and handle_post_hibernate/1. These will be
%% called immediately prior to and post hibernation, respectively. If
%% handle_pre_hibernate returns {hibernate, NewState} then the process
-%% will hibernate. If the module does not implement
-%% handle_pre_hibernate/1 then the default action is to hibernate.
+%% will hibernate. If handle_pre_hibernate returns {insomniate,
+%% NewState} then the process will go around again, trying to receive
+%% for up to the current timeout value before attempting to hibernate
+%% again. If the module does not implement handle_pre_hibernate/1 then
+%% the default action is to hibernate.
%%
%% 6) init can return a 4th arg, {backoff, InitialTimeout,
%% MinimumTimeout, DesiredHibernatePeriod} (all in
@@ -36,7 +39,7 @@
%% InitialTimeout supplied from init). After this timeout has
%% occurred, hibernation will occur as normal. Upon awaking, a new
%% current timeout value will be calculated.
-%%
+%%
%% The purpose is that the gen_server2 takes care of adjusting the
%% current timeout value such that the process will increase the
%% timeout value repeatedly if it is unable to sleep for the
@@ -126,6 +129,7 @@
%%% handle_pre_hibernate(State)
%%%
%%% ==> {hibernate, State}
+%%% {insomniate, State}
%%% {stop, Reason, State}
%%% Reason = normal | shutdown | Term, terminate(State) is called
%%%
@@ -545,6 +549,9 @@ pre_hibernate(Parent, Name, State, Mod, TimeoutState, Queue, Debug) ->
{hibernate, NState} ->
hibernate(Parent, Name, NState, Mod, TimeoutState, Queue,
Debug);
+ {insomniate, NState} ->
+ process_next_msg(Parent, Name, NState, Mod, hibernate,
+ TimeoutState, Queue, Debug);
Reply ->
handle_common_termination(Reply, Name, pre_hibernate,
Mod, State, Debug)
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 2aa58fc02a..fe1be7c292 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -150,12 +150,11 @@ start(normal, []) ->
start_child(vm_memory_monitor, [MemoryWatermark])
end,
- ok = rabbit_amqqueue:start(),
+ ok = start_child(rabbit_memory_monitor),
+ ok = start_child(rabbit_guid),
ok = start_child(rabbit_router),
- ok = start_child(rabbit_guid),
- ok = start_child(rabbit_node_monitor),
- ok = start_child(rabbit_memory_monitor)
+ ok = start_child(rabbit_node_monitor)
end},
{"recovery",
fun () ->
@@ -163,6 +162,9 @@ start(normal, []) ->
ok = rabbit_exchange:recover(),
DurableQueues = rabbit_amqqueue:find_durable_queues(),
ok = rabbit_queue_index:start_msg_store(DurableQueues),
+
+ ok = rabbit_amqqueue:start(),
+
{ok, _RealDurableQueues} = rabbit_amqqueue:recover(DurableQueues)
%% TODO - RealDurableQueues is a subset of
%% DurableQueues. It may have queues removed which
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 21764fce6d..23666a5f3d 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -485,7 +485,7 @@ unfold(Fun, Acc, Init) ->
ceil(N) ->
T = trunc(N),
- case N - T of
- 0 -> N;
- _ -> 1 + T
+ case N == T of
+ true -> T;
+ false -> 1 + T
end.
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index f139fc4581..c060c8d4c6 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -36,16 +36,16 @@
-export([start_link/3, write/2, read/1, contains/1, remove/1, release/1,
sync/2]).
--export([sync/0]). %% internal
+-export([sync/0, gc_done/3]). %% internal
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
- terminate/2, code_change/3]).
+ terminate/2, code_change/3, handle_pre_hibernate/1]).
-define(SERVER, ?MODULE).
--define(FILE_SIZE_LIMIT, (16*1024*1024)).
-define(SYNC_INTERVAL, 5). %% milliseconds
--define(HANDLE_CACHE_BUFFER_SIZE, 1048576). %% 1MB
+
+-define(GEOMETRIC_P, 0.3). %% parameter to geometric distribution rng
%%----------------------------------------------------------------------------
@@ -54,6 +54,7 @@
-type(msg_id() :: binary()).
-type(msg() :: any()).
-type(file_path() :: any()).
+-type(file_num() :: non_neg_integer()).
-spec(start_link/3 ::
(file_path(),
@@ -65,6 +66,7 @@
-spec(remove/1 :: ([msg_id()]) -> 'ok').
-spec(release/1 :: ([msg_id()]) -> 'ok').
-spec(sync/2 :: ([msg_id()], fun (() -> any())) -> 'ok').
+-spec(gc_done/3 :: (non_neg_integer(), file_num(), file_num()) -> 'ok').
-endif.
@@ -72,36 +74,29 @@
-record(msstate,
{dir, %% store directory
- msg_locations, %% where are messages?
+ index_module, %% the module for index ops
+ index_state, %% where are messages?
file_summary, %% what's in the files?
current_file, %% current file name as number
current_file_handle, %% current file handle
%% since the last fsync?
- file_size_limit, %% how big can our files get?
file_handle_cache, %% file handle cache
on_sync, %% pending sync requests
sync_timer_ref, %% TRef for our interval timer
message_cache, %% ets message cache
sum_valid_data, %% sum of valid data in all files
- sum_file_size %% sum of file sizes
- }).
-
--record(msg_location,
- {msg_id, ref_count, file, offset, total_size}).
-
--record(file_summary,
- {file, valid_total_size, contiguous_top, left, right, file_size}).
+ sum_file_size, %% sum of file sizes
+ pending_gc_completion, %% things to do once GC completes
+ gc_active %% is the GC currently working?
+ }).
--define(MSG_LOC_NAME, rabbit_disk_queue_msg_location).
--define(FILE_SUMMARY_ETS_NAME, rabbit_disk_queue_file_summary).
--define(FILE_EXTENSION, ".rdq").
--define(FILE_EXTENSION_TMP, ".rdt").
--define(CACHE_ETS_NAME, rabbit_disk_queue_cache).
+-include("rabbit_msg_store.hrl").
--define(BINARY_MODE, [raw, binary]).
--define(READ_MODE, [read]).
--define(READ_AHEAD_MODE, [read_ahead | ?READ_MODE]).
--define(WRITE_MODE, [write]).
+-define(FILE_SUMMARY_ETS_NAME, rabbit_msg_store_file_summary).
+-define(CACHE_ETS_NAME, rabbit_msg_store_cache).
+%% We run GC whenever (garbage / sum_file_size) > ?GARBAGE_FRACTION
+%% It is not recommended to set this to < 0.5
+-define(GARBAGE_FRACTION, 0.5).
%% The components:
%%
@@ -237,6 +232,8 @@ remove(MsgIds) -> gen_server2:cast(?SERVER, {remove, MsgIds}).
release(MsgIds) -> gen_server2:cast(?SERVER, {release, MsgIds}).
sync(MsgIds, K) -> gen_server2:cast(?SERVER, {sync, MsgIds, K}).
sync() -> gen_server2:pcast(?SERVER, 9, sync). %% internal
+gc_done(Reclaimed, Source, Destination) ->
+ gen_server2:pcast(?SERVER, 9, {gc_done, Reclaimed, Source, Destination}).
%%----------------------------------------------------------------------------
%% gen_server callbacks
@@ -247,26 +244,29 @@ init([Dir, MsgRefDeltaGen, MsgRefDeltaGenInit]) ->
ok = filelib:ensure_dir(filename:join(Dir, "nothing")),
- MsgLocations = ets:new(?MSG_LOC_NAME,
- [set, private, {keypos, #msg_location.msg_id}]),
+ IndexModule = rabbit_msg_store_ets_index,
+ IndexState = IndexModule:init(),
InitFile = 0,
FileSummary = ets:new(?FILE_SUMMARY_ETS_NAME,
- [set, private, {keypos, #file_summary.file}]),
+ [ordered_set, public,
+ {keypos, #file_summary.file}]),
MessageCache = ets:new(?CACHE_ETS_NAME, [set, private]),
State =
#msstate { dir = Dir,
- msg_locations = MsgLocations,
+ index_module = IndexModule,
+ index_state = IndexState,
file_summary = FileSummary,
current_file = InitFile,
current_file_handle = undefined,
- file_size_limit = ?FILE_SIZE_LIMIT,
file_handle_cache = dict:new(),
on_sync = [],
sync_timer_ref = undefined,
message_cache = MessageCache,
sum_valid_data = 0,
- sum_file_size = 0
+ sum_file_size = 0,
+ pending_gc_completion = [],
+ gc_active = false
},
ok = count_msg_refs(MsgRefDeltaGen, MsgRefDeltaGenInit, State),
@@ -282,65 +282,25 @@ init([Dir, MsgRefDeltaGen, MsgRefDeltaGenInit]) ->
build_index(Files, State),
%% read is only needed so that we can seek
- {ok, FileHdl} = open_file(Dir, filenum_to_name(CurFile),
- [read | ?WRITE_MODE]),
+ {ok, FileHdl} = rabbit_msg_store_misc:open_file(
+ Dir, rabbit_msg_store_misc:filenum_to_name(CurFile),
+ [read | ?WRITE_MODE]),
{ok, Offset} = file_handle_cache:position(FileHdl, Offset),
ok = file_handle_cache:truncate(FileHdl),
- {ok, State1 #msstate { current_file_handle = FileHdl }}.
-
-handle_call({read, MsgId}, _From, State =
- #msstate { current_file = CurFile,
- current_file_handle = CurHdl }) ->
- {Result, State1} =
- case index_lookup(MsgId, State) of
- not_found -> {not_found, State};
- #msg_location { ref_count = RefCount,
- file = File,
- offset = Offset,
- total_size = TotalSize } ->
- case fetch_and_increment_cache(MsgId, State) of
- not_found ->
- ok = case CurFile =:= File andalso {ok, Offset} >=
- file_handle_cache:current_raw_offset(CurHdl) of
- true -> file_handle_cache:flush(CurHdl);
- false -> ok
- end,
- {Hdl, State2} = get_read_handle(File, State),
- {ok, Offset} = file_handle_cache:position(Hdl, Offset),
- {ok, {MsgId, Msg}} =
- case rabbit_msg_file:read(Hdl, TotalSize) of
- {ok, {MsgId, _}} = Obj -> Obj;
- Rest ->
- throw({error, {misread, [{old_state, State},
- {file_num, File},
- {offset, Offset},
- {read, Rest},
- {proc_dict, get()}
- ]}})
- end,
- ok = case RefCount > 1 of
- true ->
- insert_into_cache(MsgId, Msg, State2);
- false ->
- %% it's not in the cache and we
- %% only have one reference to the
- %% message. So don't bother
- %% putting it in the cache.
- ok
- end,
- {{ok, Msg}, State2};
- {Msg, _RefCount} ->
- {{ok, Msg}, State}
- end
- end,
- reply(Result, State1);
+ {ok, _Pid} = rabbit_msg_store_gc:start_link(
+ Dir, IndexState, FileSummary, IndexModule),
+
+ {ok, State1 #msstate { current_file_handle = FileHdl }, hibernate,
+ {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
-handle_call({contains, MsgId}, _From, State) ->
- reply(case index_lookup(MsgId, State) of
- not_found -> false;
- #msg_location {} -> true
- end, State).
+handle_call({read, MsgId}, From, State) ->
+ State1 = read_message(MsgId, From, State),
+ noreply(State1);
+
+handle_call({contains, MsgId}, From, State) ->
+ State1 = contains_message(MsgId, From, State),
+ noreply(State1).
handle_cast({write, MsgId, Msg},
State = #msstate { current_file_handle = CurHdl,
@@ -360,6 +320,7 @@ handle_cast({write, MsgId, Msg},
[FSEntry = #file_summary { valid_total_size = ValidTotalSize,
contiguous_top = ContiguousTop,
right = undefined,
+ locked = false,
file_size = FileSize }] =
ets:lookup(FileSummary, CurFile),
ValidTotalSize1 = ValidTotalSize + TotalSize,
@@ -373,31 +334,25 @@ handle_cast({write, MsgId, Msg},
contiguous_top = ContiguousTop1,
file_size = FileSize + TotalSize }),
NextOffset = CurOffset + TotalSize,
- noreply(maybe_roll_to_new_file(
- NextOffset,
- State #msstate { sum_valid_data = SumValid + TotalSize,
- sum_file_size = SumFileSize + TotalSize }));
- StoreEntry = #msg_location { ref_count = RefCount } ->
- %% We already know about it, just update counter
- ok = index_update(StoreEntry #msg_location {
- ref_count = RefCount + 1 }, State),
+ noreply(maybe_compact(maybe_roll_to_new_file(
+ NextOffset, State #msstate
+ { sum_valid_data = SumValid + TotalSize,
+ sum_file_size = SumFileSize + TotalSize }
+ )));
+ #msg_location { ref_count = RefCount } ->
+ %% We already know about it, just update counter. Only
+ %% update field otherwise bad interaction with concurrent GC
+ ok = index_update_fields(MsgId,
+ {#msg_location.ref_count, RefCount + 1},
+ State),
noreply(State)
end;
-handle_cast({remove, MsgIds}, State = #msstate { current_file = CurFile }) ->
- {Files, State1} =
- lists:foldl(
- fun (MsgId, {Files1, State2}) ->
- case remove_message(MsgId, State2) of
- {compact, File, State3} ->
- {if CurFile =:= File -> Files1;
- true -> sets:add_element(File, Files1)
- end, State3};
- {no_compact, State3} ->
- {Files1, State3}
- end
- end, {sets:new(), State}, MsgIds),
- noreply(compact(sets:to_list(Files), State1));
+handle_cast({remove, MsgIds}, State) ->
+ State1 = lists:foldl(
+ fun (MsgId, State2) -> remove_message(MsgId, State2) end,
+ State, MsgIds),
+ noreply(maybe_compact(State1));
handle_cast({release, MsgIds}, State) ->
lists:foreach(fun (MsgId) -> decrement_cache(MsgId, State) end, MsgIds),
@@ -419,7 +374,27 @@ handle_cast({sync, MsgIds, K},
end;
handle_cast(sync, State) ->
- noreply(sync(State)).
+ noreply(sync(State));
+
+handle_cast({gc_done, Reclaimed, Source, Dest},
+ State = #msstate { sum_file_size = SumFileSize,
+ gc_active = {Source, Dest},
+ file_summary = FileSummary }) ->
+ %% we always move data left, so Source has gone and was on the
+ %% right, so need to make dest = source.right.left, and also
+ %% dest.right = source.right
+ [#file_summary { left = Dest, right = SourceRight, locked = true }] =
+ ets:lookup(FileSummary, Source),
+ %% this could fail if SourceRight == undefined
+ ets:update_element(FileSummary, SourceRight,
+ {#file_summary.left, Dest}),
+ true = ets:update_element(FileSummary, Dest,
+ [{#file_summary.locked, false},
+ {#file_summary.right, SourceRight}]),
+ true = ets:delete(FileSummary, Source),
+ noreply(run_pending(
+ State #msstate { sum_file_size = SumFileSize - Reclaimed,
+ gc_active = false })).
handle_info(timeout, State) ->
noreply(sync(State));
@@ -431,9 +406,13 @@ handle_info({file_handle_cache, maximum_eldest_since_use, Age}, State) ->
handle_info({'EXIT', _Pid, Reason}, State) ->
{stop, Reason, State}.
-terminate(_Reason, State = #msstate { msg_locations = MsgLocations,
+terminate(_Reason, State = #msstate { index_state = IndexState,
+ index_module = IndexModule,
file_summary = FileSummary,
current_file_handle = FileHdl }) ->
+ %% stop the gc first, otherwise it could be working and we pull
+ %% out the ets tables from under it.
+ ok = rabbit_msg_store_gc:stop(),
State1 = case FileHdl of
undefined -> State;
_ -> State2 = sync(State),
@@ -441,15 +420,18 @@ terminate(_Reason, State = #msstate { msg_locations = MsgLocations,
State2
end,
State3 = close_all_handles(State1),
- ets:delete(MsgLocations),
ets:delete(FileSummary),
- State3 #msstate { msg_locations = undefined,
+ IndexModule:terminate(IndexState),
+ State3 #msstate { index_state = undefined,
file_summary = undefined,
current_file_handle = undefined }.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
+handle_pre_hibernate(State) ->
+ {hibernate, maybe_compact(State)}.
+
%%----------------------------------------------------------------------------
%% general helper functions
%%----------------------------------------------------------------------------
@@ -463,11 +445,11 @@ reply(Reply, State) ->
{reply, Reply, State1, Timeout}.
next_state(State = #msstate { on_sync = [], sync_timer_ref = undefined }) ->
- {State, infinity};
+ {State, hibernate};
next_state(State = #msstate { sync_timer_ref = undefined }) ->
{start_sync_timer(State), 0};
next_state(State = #msstate { on_sync = [] }) ->
- {stop_sync_timer(State), infinity};
+ {stop_sync_timer(State), hibernate};
next_state(State) ->
{State, 0}.
@@ -481,27 +463,12 @@ stop_sync_timer(State = #msstate { sync_timer_ref = TRef }) ->
{ok, cancel} = timer:cancel(TRef),
State #msstate { sync_timer_ref = undefined }.
-form_filename(Dir, Name) -> filename:join(Dir, Name).
-
-filenum_to_name(File) -> integer_to_list(File) ++ ?FILE_EXTENSION.
-
filename_to_num(FileName) -> list_to_integer(filename:rootname(FileName)).
sort_file_names(FileNames) ->
lists:sort(fun (A, B) -> filename_to_num(A) < filename_to_num(B) end,
FileNames).
-preallocate(Hdl, FileSizeLimit, FinalPos) ->
- {ok, FileSizeLimit} = file_handle_cache:position(Hdl, FileSizeLimit),
- ok = file_handle_cache:truncate(Hdl),
- {ok, FinalPos} = file_handle_cache:position(Hdl, FinalPos),
- ok.
-
-truncate_and_extend_file(FileHdl, Lowpoint, Highpoint) ->
- {ok, Lowpoint} = file_handle_cache:position(FileHdl, Lowpoint),
- ok = file_handle_cache:truncate(FileHdl),
- ok = preallocate(FileHdl, Highpoint, Lowpoint).
-
sync(State = #msstate { current_file_handle = CurHdl,
on_sync = Syncs }) ->
State1 = stop_sync_timer(State),
@@ -513,32 +480,135 @@ sync(State = #msstate { current_file_handle = CurHdl,
State1 #msstate { on_sync = [] }
end.
+read_message(MsgId, From, State =
+ #msstate { current_file = CurFile,
+ current_file_handle = CurHdl,
+ file_summary = FileSummary }) ->
+ case index_lookup(MsgId, State) of
+ not_found -> gen_server2:reply(From, not_found),
+ State;
+ #msg_location { ref_count = RefCount,
+ file = File,
+ offset = Offset,
+ total_size = TotalSize } ->
+ case fetch_and_increment_cache(MsgId, State) of
+ not_found ->
+ [#file_summary { locked = Locked }] =
+ ets:lookup(FileSummary, File),
+ case Locked of
+ true ->
+ add_to_pending_gc_completion({read, MsgId, From},
+ State);
+ false ->
+ ok = case CurFile =:= File andalso {ok, Offset} >=
+ file_handle_cache:current_raw_offset(
+ CurHdl) of
+ true -> file_handle_cache:flush(CurHdl);
+ false -> ok
+ end,
+ {Hdl, State1} = get_read_handle(File, State),
+ {ok, Offset} =
+ file_handle_cache:position(Hdl, Offset),
+ {ok, {MsgId, Msg}} =
+ case rabbit_msg_file:read(Hdl, TotalSize) of
+ {ok, {MsgId, _}} = Obj -> Obj;
+ Rest ->
+ throw({error, {misread,
+ [{old_state, State},
+ {file_num, File},
+ {offset, Offset},
+ {read, Rest},
+ {proc_dict, get()}
+ ]}})
+ end,
+ ok = case RefCount > 1 of
+ true ->
+ insert_into_cache(MsgId, Msg, State1);
+ false ->
+ %% it's not in the cache and
+ %% we only have one reference
+ %% to the message. So don't
+ %% bother putting it in the
+ %% cache.
+ ok
+ end,
+ gen_server2:reply(From, {ok, Msg}),
+ State1
+ end;
+ {Msg, _RefCount} ->
+ gen_server2:reply(From, {ok, Msg}),
+ State
+ end
+ end.
+
+contains_message(MsgId, From, State = #msstate { gc_active = GCActive }) ->
+ case index_lookup(MsgId, State) of
+ not_found ->
+ gen_server2:reply(From, false),
+ State;
+ #msg_location { file = File } ->
+ case GCActive of
+ {A, B} when File == A orelse File == B ->
+ add_to_pending_gc_completion(
+ {contains, MsgId, From}, State);
+ _ ->
+ gen_server2:reply(From, true),
+ State
+ end
+ end.
+
remove_message(MsgId, State = #msstate { file_summary = FileSummary,
sum_valid_data = SumValid }) ->
- StoreEntry = #msg_location { ref_count = RefCount, file = File,
- offset = Offset, total_size = TotalSize } =
+ #msg_location { ref_count = RefCount, file = File,
+ offset = Offset, total_size = TotalSize } =
index_lookup(MsgId, State),
case RefCount of
1 ->
- ok = index_delete(MsgId, State),
ok = remove_cache_entry(MsgId, State),
[FSEntry = #file_summary { valid_total_size = ValidTotalSize,
- contiguous_top = ContiguousTop }] =
+ contiguous_top = ContiguousTop,
+ locked = Locked }] =
ets:lookup(FileSummary, File),
- ContiguousTop1 = lists:min([ContiguousTop, Offset]),
- ValidTotalSize1 = ValidTotalSize - TotalSize,
- true = ets:insert(FileSummary, FSEntry #file_summary {
- valid_total_size = ValidTotalSize1,
- contiguous_top = ContiguousTop1 }),
- {compact, File, State #msstate {
- sum_valid_data = SumValid - TotalSize }};
+ case Locked of
+ true ->
+ add_to_pending_gc_completion({remove, MsgId}, State);
+ false ->
+ ok = index_delete(MsgId, State),
+ ContiguousTop1 = lists:min([ContiguousTop, Offset]),
+ ValidTotalSize1 = ValidTotalSize - TotalSize,
+ true = ets:insert(
+ FileSummary, FSEntry #file_summary {
+ valid_total_size = ValidTotalSize1,
+ contiguous_top = ContiguousTop1 }),
+ State1 = delete_file_if_empty(File, State),
+ State1 #msstate { sum_valid_data = SumValid - TotalSize }
+ end;
_ when 1 < RefCount ->
ok = decrement_cache(MsgId, State),
- ok = index_update(StoreEntry #msg_location {
- ref_count = RefCount - 1 }, State),
- {no_compact, State}
+ %% only update field, otherwise bad interaction with concurrent GC
+ ok = index_update_fields(MsgId,
+ {#msg_location.ref_count, RefCount - 1},
+ State),
+ State
end.
+add_to_pending_gc_completion(
+ Op, State = #msstate { pending_gc_completion = Pending }) ->
+ State #msstate { pending_gc_completion = [Op | Pending] }.
+
+run_pending(State = #msstate { pending_gc_completion = [] }) ->
+ State;
+run_pending(State = #msstate { pending_gc_completion = Pending }) ->
+ State1 = State #msstate { pending_gc_completion = [] },
+ lists:foldl(fun run_pending/2, State1, lists:reverse(Pending)).
+
+run_pending({read, MsgId, From}, State) ->
+ read_message(MsgId, From, State);
+run_pending({contains, MsgId, From}, State) ->
+ contains_message(MsgId, From, State);
+run_pending({remove, MsgId}, State) ->
+ remove_message(MsgId, State).
+
close_handle(Key, State = #msstate { file_handle_cache = FHC }) ->
case dict:find(Key, FHC) of
{ok, Hdl} ->
@@ -556,19 +626,16 @@ close_all_handles(State = #msstate { file_handle_cache = FHC }) ->
get_read_handle(FileNum, State = #msstate { file_handle_cache = FHC }) ->
case dict:find(FileNum, FHC) of
{ok, Hdl} -> {Hdl, State};
- error -> new_handle(FileNum, filenum_to_name(FileNum),
+ error -> new_handle(FileNum,
+ rabbit_msg_store_misc:filenum_to_name(FileNum),
[read | ?BINARY_MODE], State)
end.
new_handle(Key, FileName, Mode, State = #msstate { file_handle_cache = FHC,
dir = Dir }) ->
- {ok, Hdl} = open_file(Dir, FileName, Mode),
+ {ok, Hdl} = rabbit_msg_store_misc:open_file(Dir, FileName, Mode),
{Hdl, State #msstate { file_handle_cache = dict:store(Key, Hdl, FHC) }}.
-open_file(Dir, FileName, Mode) ->
- file_handle_cache:open(form_filename(Dir, FileName), ?BINARY_MODE ++ Mode,
- [{write_buffer, ?HANDLE_CACHE_BUFFER_SIZE}]).
-
%%----------------------------------------------------------------------------
%% message cache helper functions
%%----------------------------------------------------------------------------
@@ -607,28 +674,25 @@ insert_into_cache(MsgId, Msg, #msstate { message_cache = Cache }) ->
%% index
%%----------------------------------------------------------------------------
-index_lookup(Key, #msstate { msg_locations = MsgLocations }) ->
- case ets:lookup(MsgLocations, Key) of
- [] -> not_found;
- [Entry] -> Entry
- end.
+index_lookup(Key, #msstate { index_module = Index, index_state = State }) ->
+ Index:lookup(Key, State).
-index_insert(Obj, #msstate { msg_locations = MsgLocations }) ->
- true = ets:insert_new(MsgLocations, Obj),
- ok.
+index_insert(Obj, #msstate { index_module = Index, index_state = State }) ->
+ Index:insert(Obj, State).
-index_update(Obj, #msstate { msg_locations = MsgLocations }) ->
- true = ets:insert(MsgLocations, Obj),
- ok.
+index_update(Obj, #msstate { index_module = Index, index_state = State }) ->
+ Index:update(Obj, State).
-index_delete(Key, #msstate { msg_locations = MsgLocations }) ->
- true = ets:delete(MsgLocations, Key),
- ok.
+index_update_fields(Key, Updates,
+ #msstate { index_module = Index, index_state = State }) ->
+ Index:update_fields(Key, Updates, State).
-index_delete_by_file(File, #msstate { msg_locations = MsgLocations }) ->
- MatchHead = #msg_location { file = File, _ = '_' },
- ets:select_delete(MsgLocations, [{MatchHead, [], [true]}]),
- ok.
+index_delete(Key, #msstate { index_module = Index, index_state = State }) ->
+ Index:delete(Key, State).
+
+index_delete_by_file(File, #msstate { index_module = Index,
+ index_state = State }) ->
+ Index:delete_by_file(File, State).
%%----------------------------------------------------------------------------
%% recovery
@@ -696,7 +760,7 @@ recover_crashed_compactions1(Dir, FileNames, TmpFileName) ->
%% consist only of valid messages. Plan: Truncate the main file
%% back to before any of the files in the tmp file and copy
%% them over again
- TmpPath = form_filename(Dir, TmpFileName),
+ TmpPath = rabbit_msg_store_misc:form_filename(Dir, TmpFileName),
case is_sublist(MsgIdsTmp, MsgIds) of
true -> %% we're in case 1, 2 or 3 above. Just delete the tmp file
%% note this also catches the case when the tmp file
@@ -728,8 +792,8 @@ recover_crashed_compactions1(Dir, FileNames, TmpFileName) ->
%% are in the tmp file
true = is_disjoint(MsgIds1, MsgIdsTmp),
%% must open with read flag, otherwise will stomp over contents
- {ok, MainHdl} = open_file(Dir, NonTmpRelatedFileName,
- [read | ?WRITE_MODE]),
+ {ok, MainHdl} = rabbit_msg_store_misc:open_file(
+ Dir, NonTmpRelatedFileName, [read | ?WRITE_MODE]),
%% Wipe out any rubbish at the end of the file. Remember
%% the head of the list will be the highest entry in the
%% file.
@@ -738,8 +802,10 @@ recover_crashed_compactions1(Dir, FileNames, TmpFileName) ->
%% Extend the main file as big as necessary in a single
%% move. If we run out of disk space, this truncate could
%% fail, but we still aren't risking losing data
- ok = truncate_and_extend_file(MainHdl, Top, Top + TmpSize),
- {ok, TmpHdl} = open_file(Dir, TmpFileName, ?READ_AHEAD_MODE),
+ ok = rabbit_msg_store_misc:truncate_and_extend_file(
+ MainHdl, Top, Top + TmpSize),
+ {ok, TmpHdl} = rabbit_msg_store_misc:open_file(
+ Dir, TmpFileName, ?READ_AHEAD_MODE),
{ok, TmpSize} = file_handle_cache:copy(TmpHdl, MainHdl, TmpSize),
ok = file_handle_cache:close(MainHdl),
ok = file_handle_cache:delete(TmpHdl),
@@ -761,22 +827,10 @@ is_disjoint(SmallerL, BiggerL) ->
lists:all(fun (Item) -> not lists:member(Item, BiggerL) end, SmallerL).
scan_file_for_valid_messages_msg_ids(Dir, FileName) ->
- {ok, Messages, _FileSize} = scan_file_for_valid_messages(Dir, FileName),
+ {ok, Messages, _FileSize} =
+ rabbit_msg_store_misc:scan_file_for_valid_messages(Dir, FileName),
{ok, Messages, [MsgId || {MsgId, _TotalSize, _FileOffset} <- Messages]}.
-scan_file_for_valid_messages(Dir, FileName) ->
- case open_file(Dir, FileName, ?READ_MODE) of
- {ok, Hdl} ->
- Valid = rabbit_msg_file:scan(Hdl),
- %% if something really bad's happened, the close could fail,
- %% but ignore
- file_handle_cache:close(Hdl),
- Valid;
- {error, enoent} -> {ok, [], 0};
- {error, Reason} -> throw({error,
- {unable_to_scan_file, FileName, Reason}})
- end.
-
%% Takes the list in *ascending* order (i.e. eldest message
%% first). This is the opposite of what scan_file_for_valid_messages
%% produces. The list of msgs that is produced is youngest first.
@@ -794,25 +848,25 @@ find_contiguous_block_prefix([_MsgAfterGap | _Tail], ExpectedOffset, MsgIds) ->
{ExpectedOffset, MsgIds}.
build_index([], State) ->
- build_index(undefined, [State #msstate.current_file], [], State);
+ build_index(undefined, [State #msstate.current_file], State);
build_index(Files, State) ->
- build_index(undefined, Files, [], State).
+ {Offset, State1} = build_index(undefined, Files, State),
+ {Offset, lists:foldl(fun delete_file_if_empty/2, State1, Files)}.
-build_index(Left, [], FilesToCompact, State =
- #msstate { file_summary = FileSummary }) ->
+build_index(Left, [], State = #msstate { file_summary = FileSummary }) ->
ok = index_delete_by_file(undefined, State),
Offset = case ets:lookup(FileSummary, Left) of
[] -> 0;
[#file_summary { file_size = FileSize }] -> FileSize
end,
- {Offset, compact(FilesToCompact, %% this never includes the current file
- State #msstate { current_file = Left })};
-build_index(Left, [File|Files], FilesToCompact,
+ {Offset, State #msstate { current_file = Left }};
+build_index(Left, [File|Files],
State = #msstate { dir = Dir, file_summary = FileSummary,
sum_valid_data = SumValid,
sum_file_size = SumFileSize }) ->
{ok, Messages, FileSize} =
- scan_file_for_valid_messages(Dir, filenum_to_name(File)),
+ rabbit_msg_store_misc:scan_file_for_valid_messages(
+ Dir, rabbit_msg_store_misc:filenum_to_name(File)),
{ValidMessages, ValidTotalSize} =
lists:foldl(
fun (Obj = {MsgId, TotalSize, Offset}, {VMAcc, VTSAcc}) ->
@@ -845,14 +899,9 @@ build_index(Left, [File|Files], FilesToCompact,
true =
ets:insert_new(FileSummary, #file_summary {
file = File, valid_total_size = ValidTotalSize,
- contiguous_top = ContiguousTop,
+ contiguous_top = ContiguousTop, locked = false,
left = Left, right = Right, file_size = FileSize1 }),
- FilesToCompact1 =
- case FileSize1 == ContiguousTop orelse Right =:= undefined of
- true -> FilesToCompact;
- false -> [File | FilesToCompact]
- end,
- build_index(File, Files, FilesToCompact1,
+ build_index(File, Files,
State #msstate { sum_valid_data = SumValid + ValidTotalSize,
sum_file_size = SumFileSize + FileSize1 }).
@@ -862,235 +911,93 @@ build_index(Left, [File|Files], FilesToCompact,
maybe_roll_to_new_file(Offset,
State = #msstate { dir = Dir,
- file_size_limit = FileSizeLimit,
current_file_handle = CurHdl,
current_file = CurFile,
file_summary = FileSummary })
- when Offset >= FileSizeLimit ->
+ when Offset >= ?FILE_SIZE_LIMIT ->
State1 = sync(State),
ok = file_handle_cache:close(CurHdl),
NextFile = CurFile + 1,
- {ok, NextHdl} = open_file(Dir, filenum_to_name(NextFile), ?WRITE_MODE),
- true = ets:update_element(FileSummary, CurFile,
- {#file_summary.right, NextFile}),
+ {ok, NextHdl} = rabbit_msg_store_misc:open_file(
+ Dir, rabbit_msg_store_misc:filenum_to_name(NextFile),
+ ?WRITE_MODE),
true = ets:insert_new(
FileSummary, #file_summary {
file = NextFile, valid_total_size = 0, contiguous_top = 0,
- left = CurFile, right = undefined, file_size = 0 }),
- State2 = State1 #msstate { current_file_handle = NextHdl,
- current_file = NextFile },
- compact([CurFile], State2);
+ left = CurFile, right = undefined, file_size = 0,
+ locked = false }),
+ true = ets:update_element(FileSummary, CurFile,
+ {#file_summary.right, NextFile}),
+ State1 #msstate { current_file_handle = NextHdl,
+ current_file = NextFile };
maybe_roll_to_new_file(_, State) ->
State.
-compact(Files, State) ->
- %% smallest number, hence eldest, hence left-most, first
- SortedFiles = lists:sort(Files),
- %% foldl reverses, so now youngest/right-most first
- {RemainingFiles, State1} =
- lists:foldl(fun (File, {Acc, State2}) ->
- case delete_file_if_empty(File, State2) of
- {true, State3} -> {Acc, State3};
- {false, State3} -> {[File | Acc], State3}
- end
- end, {[], State}, SortedFiles),
- lists:foldl(fun combine_file/2, State1, lists:reverse(RemainingFiles)).
-
-%% At this stage, we simply know that the file has had msgs removed
-%% from it. However, we don't know if we need to merge it left (which
-%% is what we would prefer), or merge it right. If we merge left, then
-%% this file is the source, and the left file is the destination. If
-%% we merge right then this file is the destination and the right file
-%% is the source.
-combine_file(File, State = #msstate { file_summary = FileSummary,
- current_file = CurFile }) ->
- %% the file we're looking at may no longer exist as it may have
- %% been deleted within the current GC run
- case ets:lookup(FileSummary, File) of
- [] -> State;
- [FSEntry = #file_summary { left = Left, right = Right }] ->
- GoRight =
- fun() ->
- case Right of
- undefined -> State;
- _ when not (CurFile == Right) ->
- [FSRight] = ets:lookup(FileSummary, Right),
- {_, State1} = adjust_meta_and_combine(
- FSEntry, FSRight, State),
- State1;
- _ -> State
- end
- end,
- case Left of
- undefined ->
- GoRight();
- _ -> [FSLeft] = ets:lookup(FileSummary, Left),
- case adjust_meta_and_combine(FSLeft, FSEntry, State) of
- {true, State1} -> State1;
- {false, State} -> GoRight()
- end
- end
- end.
+maybe_compact(State = #msstate { sum_valid_data = SumValid,
+ sum_file_size = SumFileSize,
+ file_summary = FileSummary,
+ gc_active = false })
+ when (SumFileSize - SumValid) / SumFileSize > ?GARBAGE_FRACTION ->
+ First = ets:first(FileSummary),
+ N = random_distributions:geometric(?GEOMETRIC_P),
+ case find_files_to_gc(FileSummary, N, First) of
+ undefined ->
+ State;
+ {Source, Dest} ->
+ State1 = close_handle(Source, close_handle(Dest, State)),
+ true = ets:update_element(FileSummary, Source,
+ {#file_summary.locked, true}),
+ true = ets:update_element(FileSummary, Dest,
+ {#file_summary.locked, true}),
+ ok = rabbit_msg_store_gc:gc(Source, Dest),
+ State1 #msstate { gc_active = {Source, Dest} }
+ end;
+maybe_compact(State) ->
+ State.
-adjust_meta_and_combine(
- LeftObj = #file_summary {
- file = LeftFile, valid_total_size = LeftValidData, right = RightFile,
- file_size = LeftFileSize },
- RightObj = #file_summary {
- file = RightFile, valid_total_size = RightValidData, left = LeftFile,
- right = RightRight, file_size = RightFileSize },
- State = #msstate { file_size_limit = FileSizeLimit,
- file_summary = FileSummary,
- sum_file_size = SumFileSize }) ->
- TotalValidData = LeftValidData + RightValidData,
- if FileSizeLimit >= TotalValidData ->
- State1 = combine_files(RightObj, LeftObj, State),
- %% this could fail if RightRight is undefined
- ets:update_element(FileSummary, RightRight,
- {#file_summary.left, LeftFile}),
- true = ets:insert(FileSummary, LeftObj #file_summary {
- valid_total_size = TotalValidData,
- contiguous_top = TotalValidData,
- file_size = TotalValidData,
- right = RightRight }),
- true = ets:delete(FileSummary, RightFile),
- {true, State1 #msstate { sum_file_size =
- SumFileSize - LeftFileSize - RightFileSize
- + TotalValidData }};
- true -> {false, State}
+find_files_to_gc(_FileSummary, _N, '$end_of_table') ->
+ undefined;
+find_files_to_gc(FileSummary, N, First) ->
+ [FirstObj = #file_summary { right = Right }] =
+ ets:lookup(FileSummary, First),
+ Pairs =
+ find_files_to_gc(FileSummary, N, FirstObj,
+ ets:lookup(FileSummary, Right), []),
+ case Pairs of
+ [] -> undefined;
+ [Pair] -> Pair;
+ _ -> M = 1 + (N rem length(Pairs)),
+ lists:nth(M, Pairs)
end.
-combine_files(#file_summary { file = Source,
- valid_total_size = SourceValid,
- left = Destination },
- #file_summary { file = Destination,
- valid_total_size = DestinationValid,
- contiguous_top = DestinationContiguousTop,
- right = Source },
- State = #msstate { dir = Dir }) ->
- State1 = close_handle(Source, close_handle(Destination, State)),
- SourceName = filenum_to_name(Source),
- DestinationName = filenum_to_name(Destination),
- {ok, SourceHdl} = open_file(Dir, SourceName, ?READ_AHEAD_MODE),
- {ok, DestinationHdl} = open_file(Dir, DestinationName,
- ?READ_AHEAD_MODE ++ ?WRITE_MODE),
- ExpectedSize = SourceValid + DestinationValid,
- %% if DestinationValid =:= DestinationContiguousTop then we don't
- %% need a tmp file
- %% if they're not equal, then we need to write out everything past
- %% the DestinationContiguousTop to a tmp file then truncate,
- %% copy back in, and then copy over from Source
- %% otherwise we just truncate straight away and copy over from Source
- if DestinationContiguousTop =:= DestinationValid ->
- ok = truncate_and_extend_file(DestinationHdl,
- DestinationValid, ExpectedSize);
- true ->
- Worklist =
- lists:dropwhile(
- fun (#msg_location { offset = Offset })
- when Offset /= DestinationContiguousTop ->
- %% it cannot be that Offset ==
- %% DestinationContiguousTop because if it
- %% was then DestinationContiguousTop would
- %% have been extended by TotalSize
- Offset < DestinationContiguousTop
- %% Given expected access patterns, I suspect
- %% that the list should be naturally sorted
- %% as we require, however, we need to
- %% enforce it anyway
- end, find_unremoved_messages_in_file(Destination, State1)),
- Tmp = filename:rootname(DestinationName) ++ ?FILE_EXTENSION_TMP,
- {ok, TmpHdl} = open_file(Dir, Tmp, ?READ_AHEAD_MODE ++ ?WRITE_MODE),
- ok = copy_messages(
- Worklist, DestinationContiguousTop, DestinationValid,
- DestinationHdl, TmpHdl, Destination, State1),
- TmpSize = DestinationValid - DestinationContiguousTop,
- %% so now Tmp contains everything we need to salvage from
- %% Destination, and MsgLocation has been updated to
- %% reflect compaction of Destination so truncate
- %% Destination and copy from Tmp back to the end
- {ok, 0} = file_handle_cache:position(TmpHdl, 0),
- ok = truncate_and_extend_file(
- DestinationHdl, DestinationContiguousTop, ExpectedSize),
- {ok, TmpSize} =
- file_handle_cache:copy(TmpHdl, DestinationHdl, TmpSize),
- %% position in DestinationHdl should now be DestinationValid
- ok = file_handle_cache:sync(DestinationHdl),
- ok = file_handle_cache:close(TmpHdl),
- ok = file:delete(form_filename(Dir, Tmp))
- end,
- SourceWorkList = find_unremoved_messages_in_file(Source, State1),
- ok = copy_messages(SourceWorkList, DestinationValid, ExpectedSize,
- SourceHdl, DestinationHdl, Destination, State1),
- %% tidy up
- ok = file_handle_cache:close(SourceHdl),
- ok = file_handle_cache:close(DestinationHdl),
- ok = file:delete(form_filename(Dir, SourceName)),
- State1.
-
-find_unremoved_messages_in_file(File, State = #msstate { dir = Dir }) ->
- %% Msgs here will be end-of-file at start-of-list
- {ok, Messages, _FileSize} =
- scan_file_for_valid_messages(Dir, filenum_to_name(File)),
- %% foldl will reverse so will end up with msgs in ascending offset order
- lists:foldl(
- fun ({MsgId, _TotalSize, _Offset}, Acc) ->
- case index_lookup(MsgId, State) of
- Entry = #msg_location { file = File } -> [ Entry | Acc ];
- _ -> Acc
- end
- end, [], Messages).
-
-copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl,
- Destination, State) ->
- {FinalOffset, BlockStart1, BlockEnd1} =
- lists:foldl(
- fun (StoreEntry = #msg_location { offset = Offset,
- total_size = TotalSize },
- {CurOffset, BlockStart, BlockEnd}) ->
- %% CurOffset is in the DestinationFile.
- %% Offset, BlockStart and BlockEnd are in the SourceFile
- %% update MsgLocation to reflect change of file and offset
- ok = index_update(StoreEntry #msg_location {
- file = Destination,
- offset = CurOffset }, State),
- NextOffset = CurOffset + TotalSize,
- if BlockStart =:= undefined ->
- %% base case, called only for the first list elem
- {NextOffset, Offset, Offset + TotalSize};
- Offset =:= BlockEnd ->
- %% extend the current block because the next
- %% msg follows straight on
- {NextOffset, BlockStart, BlockEnd + TotalSize};
- true ->
- %% found a gap, so actually do the work for
- %% the previous block
- BSize = BlockEnd - BlockStart,
- {ok, BlockStart} =
- file_handle_cache:position(SourceHdl, BlockStart),
- {ok, BSize} = file_handle_cache:copy(
- SourceHdl, DestinationHdl, BSize),
- {NextOffset, Offset, Offset + TotalSize}
- end
- end, {InitOffset, undefined, undefined}, WorkList),
- case WorkList of
- [] ->
- ok;
- _ ->
- %% do the last remaining block
- BSize1 = BlockEnd1 - BlockStart1,
- {ok, BlockStart1} =
- file_handle_cache:position(SourceHdl, BlockStart1),
- {ok, BSize1} =
- file_handle_cache:copy(SourceHdl, DestinationHdl, BSize1),
- ok = file_handle_cache:sync(DestinationHdl)
- end,
- ok.
+find_files_to_gc(_FileSummary, _N, #file_summary {}, [], Pairs) ->
+ lists:reverse(Pairs);
+find_files_to_gc(FileSummary, N,
+ #file_summary { right = Source, file = Dest,
+ valid_total_size = DestValid },
+ [SourceObj = #file_summary { left = Dest, right = SourceRight,
+ valid_total_size = SourceValid,
+ file = Source }],
+ Pairs) when DestValid + SourceValid =< ?FILE_SIZE_LIMIT andalso
+ not is_atom(SourceRight) ->
+ Pair = {Source, Dest},
+ case N == 1 of
+ true -> [Pair];
+ false -> find_files_to_gc(FileSummary, (N - 1), SourceObj,
+ ets:lookup(FileSummary, SourceRight),
+ [Pair | Pairs])
+ end;
+find_files_to_gc(FileSummary, N, _Left,
+ [Right = #file_summary { right = RightRight }], Pairs) ->
+ find_files_to_gc(FileSummary, N, Right,
+ ets:lookup(FileSummary, RightRight), Pairs).
+delete_file_if_empty(File, State = #msstate { current_file = File }) ->
+ State;
delete_file_if_empty(File, #msstate { dir = Dir, file_summary = FileSummary,
sum_file_size = SumFileSize } = State) ->
[#file_summary { valid_total_size = ValidData, file_size = FileSize,
- left = Left, right = Right }] =
+ left = Left, right = Right, locked = false }] =
ets:lookup(FileSummary, File),
case ValidData of
%% we should NEVER find the current file in here hence right
@@ -1101,16 +1008,17 @@ delete_file_if_empty(File, #msstate { dir = Dir, file_summary = FileSummary,
true = ets:update_element(
FileSummary, Right,
{#file_summary.left, undefined});
- {_, _} when not (is_atom(Right)) ->
+ {_, _} when not is_atom(Right) ->
true = ets:update_element(FileSummary, Right,
{#file_summary.left, Left}),
- true =
- ets:update_element(FileSummary, Left,
- {#file_summary.right, Right})
+ true = ets:update_element(FileSummary, Left,
+ {#file_summary.right, Right})
end,
true = ets:delete(FileSummary, File),
State1 = close_handle(File, State),
- ok = file:delete(form_filename(Dir, filenum_to_name(File))),
- {true, State1 #msstate { sum_file_size = SumFileSize - FileSize }};
- _ -> {false, State}
+ ok = file:delete(rabbit_msg_store_misc:form_filename(
+ Dir,
+ rabbit_msg_store_misc:filenum_to_name(File))),
+ State1 #msstate { sum_file_size = SumFileSize - FileSize };
+ _ -> State
end.
diff --git a/src/rabbit_msg_store_ets_index.erl b/src/rabbit_msg_store_ets_index.erl
new file mode 100644
index 0000000000..cb13ed8690
--- /dev/null
+++ b/src/rabbit_msg_store_ets_index.erl
@@ -0,0 +1,71 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2009 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_msg_store_ets_index).
+-export([init/0, lookup/2, insert/2, update/2, update_fields/3, delete/2,
+ delete_by_file/2, terminate/1]).
+
+-define(MSG_LOC_NAME, rabbit_msg_store_ets_index).
+
+-include("rabbit_msg_store.hrl").
+
+init() ->
+ ets:new(?MSG_LOC_NAME, [set, public, {keypos, #msg_location.msg_id}]).
+
+lookup(Key, MsgLocations) ->
+ case ets:lookup(MsgLocations, Key) of
+ [] -> not_found;
+ [Entry] -> Entry
+ end.
+
+insert(Obj, MsgLocations) ->
+ true = ets:insert_new(MsgLocations, Obj),
+ ok.
+
+update(Obj, MsgLocations) ->
+ true = ets:insert(MsgLocations, Obj),
+ ok.
+
+update_fields(Key, Updates, MsgLocations) ->
+ true = ets:update_element(MsgLocations, Key, Updates),
+ ok.
+
+delete(Key, MsgLocations) ->
+ true = ets:delete(MsgLocations, Key),
+ ok.
+
+delete_by_file(File, MsgLocations) ->
+ MatchHead = #msg_location { file = File, _ = '_' },
+ ets:select_delete(MsgLocations, [{MatchHead, [], [true]}]),
+ ok.
+
+terminate(MsgLocations) ->
+ ets:delete(MsgLocations).
diff --git a/src/rabbit_msg_store_gc.erl b/src/rabbit_msg_store_gc.erl
new file mode 100644
index 0000000000..729cd28715
--- /dev/null
+++ b/src/rabbit_msg_store_gc.erl
@@ -0,0 +1,249 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2009 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_msg_store_gc).
+
+-behaviour(gen_server2).
+
+-export([start_link/4, gc/2, stop/0]).
+
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+-record(gcstate,
+ {dir,
+ index_state,
+ file_summary,
+ index_module
+ }).
+
+-include("rabbit_msg_store.hrl").
+
+-define(SERVER, ?MODULE).
+
+%%----------------------------------------------------------------------------
+
+start_link(Dir, IndexState, FileSummary, IndexModule) ->
+ gen_server2:start_link({local, ?SERVER}, ?MODULE,
+ [Dir, IndexState, FileSummary, IndexModule],
+ [{timeout, infinity}]).
+
+gc(Source, Destination) ->
+ gen_server2:cast(?SERVER, {gc, Source, Destination}).
+
+stop() ->
+ gen_server2:call(?SERVER, stop).
+
+%%----------------------------------------------------------------------------
+
+init([Dir, IndexState, FileSummary, IndexModule]) ->
+ {ok, #gcstate { dir = Dir, index_state = IndexState,
+ file_summary = FileSummary, index_module = IndexModule },
+ hibernate,
+ {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
+
+handle_call(stop, _From, State) ->
+ {stop, normal, ok, State}.
+
+handle_cast({gc, Source, Destination}, State) ->
+ Reclaimed = adjust_meta_and_combine(Source, Destination, State),
+ ok = rabbit_msg_store:gc_done(Reclaimed, Source, Destination),
+ {noreply, State, hibernate}.
+
+handle_info(Info, State) ->
+ {stop, {unhandled_info, Info}, State}.
+
+terminate(_Reason, State) ->
+ State.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+%%----------------------------------------------------------------------------
+
+adjust_meta_and_combine(SourceFile, DestFile,
+ State = #gcstate { file_summary = FileSummary }) ->
+
+ [SourceObj = #file_summary {
+ valid_total_size = SourceValidData, left = DestFile,
+ file_size = SourceFileSize, locked = true }] =
+ ets:lookup(FileSummary, SourceFile),
+ [DestObj = #file_summary {
+ valid_total_size = DestValidData, right = SourceFile,
+ file_size = DestFileSize, locked = true }] =
+ ets:lookup(FileSummary, DestFile),
+
+ TotalValidData = DestValidData + SourceValidData,
+ ok = combine_files(SourceObj, DestObj, State),
+ %% don't update dest.right, because it could be changing at the same time
+ true =
+ ets:update_element(FileSummary, DestFile,
+ [{#file_summary.valid_total_size, TotalValidData},
+ {#file_summary.contiguous_top, TotalValidData},
+ {#file_summary.file_size, TotalValidData}]),
+ SourceFileSize + DestFileSize - TotalValidData.
+
+combine_files(#file_summary { file = Source,
+ valid_total_size = SourceValid,
+ left = Destination },
+ #file_summary { file = Destination,
+ valid_total_size = DestinationValid,
+ contiguous_top = DestinationContiguousTop,
+ right = Source },
+ State = #gcstate { dir = Dir }) ->
+ SourceName = rabbit_msg_store_misc:filenum_to_name(Source),
+ DestinationName = rabbit_msg_store_misc:filenum_to_name(Destination),
+ {ok, SourceHdl} =
+ rabbit_msg_store_misc:open_file(Dir, SourceName, ?READ_AHEAD_MODE),
+ {ok, DestinationHdl} =
+ rabbit_msg_store_misc:open_file(Dir, DestinationName,
+ ?READ_AHEAD_MODE ++ ?WRITE_MODE),
+ ExpectedSize = SourceValid + DestinationValid,
+ %% if DestinationValid =:= DestinationContiguousTop then we don't
+ %% need a tmp file
+ %% if they're not equal, then we need to write out everything past
+ %% the DestinationContiguousTop to a tmp file then truncate,
+ %% copy back in, and then copy over from Source
+ %% otherwise we just truncate straight away and copy over from Source
+ if DestinationContiguousTop =:= DestinationValid ->
+ ok = rabbit_msg_store_misc:truncate_and_extend_file(
+ DestinationHdl, DestinationValid, ExpectedSize);
+ true ->
+ Worklist =
+ lists:dropwhile(
+ fun (#msg_location { offset = Offset })
+ when Offset /= DestinationContiguousTop ->
+ %% it cannot be that Offset ==
+ %% DestinationContiguousTop because if it
+ %% was then DestinationContiguousTop would
+ %% have been extended by TotalSize
+ Offset < DestinationContiguousTop
+ %% Given expected access patterns, I suspect
+ %% that the list should be naturally sorted
+ %% as we require, however, we need to
+ %% enforce it anyway
+ end,
+ find_unremoved_messages_in_file(Destination, State)),
+ Tmp = filename:rootname(DestinationName) ++ ?FILE_EXTENSION_TMP,
+ {ok, TmpHdl} = rabbit_msg_store_misc:open_file(
+ Dir, Tmp, ?READ_AHEAD_MODE ++ ?WRITE_MODE),
+ ok = copy_messages(
+ Worklist, DestinationContiguousTop, DestinationValid,
+ DestinationHdl, TmpHdl, Destination, State),
+ TmpSize = DestinationValid - DestinationContiguousTop,
+ %% so now Tmp contains everything we need to salvage from
+ %% Destination, and index_state has been updated to
+ %% reflect the compaction of Destination so truncate
+ %% Destination and copy from Tmp back to the end
+ {ok, 0} = file_handle_cache:position(TmpHdl, 0),
+ ok = rabbit_msg_store_misc:truncate_and_extend_file(
+ DestinationHdl, DestinationContiguousTop, ExpectedSize),
+ {ok, TmpSize} =
+ file_handle_cache:copy(TmpHdl, DestinationHdl, TmpSize),
+ %% position in DestinationHdl should now be DestinationValid
+ ok = file_handle_cache:sync(DestinationHdl),
+ ok = file_handle_cache:close(TmpHdl),
+ ok = file:delete(rabbit_msg_store_misc:form_filename(Dir, Tmp))
+ end,
+ SourceWorkList = find_unremoved_messages_in_file(Source, State),
+ ok = copy_messages(SourceWorkList, DestinationValid, ExpectedSize,
+ SourceHdl, DestinationHdl, Destination, State),
+ %% tidy up
+ ok = file_handle_cache:close(SourceHdl),
+ ok = file_handle_cache:close(DestinationHdl),
+ ok = file:delete(rabbit_msg_store_misc:form_filename(Dir, SourceName)),
+ ok.
+
+find_unremoved_messages_in_file(File, #gcstate { dir = Dir,
+ index_state = IndexState,
+ index_module = Index }) ->
+ %% Msgs here will be end-of-file at start-of-list
+ {ok, Messages, _FileSize} =
+ rabbit_msg_store_misc:scan_file_for_valid_messages(
+ Dir, rabbit_msg_store_misc:filenum_to_name(File)),
+ %% foldl will reverse so will end up with msgs in ascending offset order
+ lists:foldl(
+ fun ({MsgId, _TotalSize, _Offset}, Acc) ->
+ case Index:lookup(MsgId, IndexState) of
+ Entry = #msg_location { file = File } -> [ Entry | Acc ];
+ _ -> Acc
+ end
+ end, [], Messages).
+
+copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl,
+ Destination, #gcstate { index_module = Index,
+ index_state = IndexState }) ->
+ {FinalOffset, BlockStart1, BlockEnd1} =
+ lists:foldl(
+ fun (#msg_location { msg_id = MsgId, offset = Offset,
+ total_size = TotalSize },
+ {CurOffset, BlockStart, BlockEnd}) ->
+ %% CurOffset is in the DestinationFile.
+ %% Offset, BlockStart and BlockEnd are in the SourceFile
+ %% update MsgLocation to reflect change of file and offset
+ ok = Index:update_fields(MsgId,
+ [{#msg_location.file, Destination},
+ {#msg_location.offset, CurOffset}],
+ IndexState),
+ {BlockStart2, BlockEnd2} =
+ if BlockStart =:= undefined ->
+ %% base case, called only for the first list elem
+ {Offset, Offset + TotalSize};
+ Offset =:= BlockEnd ->
+ %% extend the current block because the
+ %% next msg follows straight on
+ {BlockStart, BlockEnd + TotalSize};
+ true ->
+ %% found a gap, so actually do the work
+ %% for the previous block
+ BSize = BlockEnd - BlockStart,
+ {ok, BlockStart} =
+ file_handle_cache:position(SourceHdl,
+ BlockStart),
+ {ok, BSize} = file_handle_cache:copy(
+ SourceHdl, DestinationHdl, BSize),
+ {Offset, Offset + TotalSize}
+ end,
+ {CurOffset + TotalSize, BlockStart2, BlockEnd2}
+ end, {InitOffset, undefined, undefined}, WorkList),
+ case WorkList of
+ [] ->
+ ok;
+ _ ->
+ %% do the last remaining block
+ BSize1 = BlockEnd1 - BlockStart1,
+ {ok, BlockStart1} =
+ file_handle_cache:position(SourceHdl, BlockStart1),
+ {ok, BSize1} =
+ file_handle_cache:copy(SourceHdl, DestinationHdl, BSize1),
+ ok = file_handle_cache:sync(DestinationHdl)
+ end,
+ ok.
diff --git a/src/rabbit_msg_store_misc.erl b/src/rabbit_msg_store_misc.erl
new file mode 100644
index 0000000000..cf76cf21f9
--- /dev/null
+++ b/src/rabbit_msg_store_misc.erl
@@ -0,0 +1,74 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2009 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_msg_store_misc).
+
+-export([open_file/3, preallocate/3, truncate_and_extend_file/3,
+ form_filename/2, filenum_to_name/1, scan_file_for_valid_messages/2]).
+
+-include("rabbit_msg_store.hrl").
+
+
+%%----------------------------------------------------------------------------
+
+open_file(Dir, FileName, Mode) ->
+ file_handle_cache:open(form_filename(Dir, FileName), ?BINARY_MODE ++ Mode,
+ [{write_buffer, ?HANDLE_CACHE_BUFFER_SIZE}]).
+
+%%----------------------------------------------------------------------------
+
+preallocate(Hdl, FileSizeLimit, FinalPos) ->
+ {ok, FileSizeLimit} = file_handle_cache:position(Hdl, FileSizeLimit),
+ ok = file_handle_cache:truncate(Hdl),
+ {ok, FinalPos} = file_handle_cache:position(Hdl, FinalPos),
+ ok.
+
+truncate_and_extend_file(FileHdl, Lowpoint, Highpoint) ->
+ {ok, Lowpoint} = file_handle_cache:position(FileHdl, Lowpoint),
+ ok = file_handle_cache:truncate(FileHdl),
+ ok = preallocate(FileHdl, Highpoint, Lowpoint).
+
+form_filename(Dir, Name) -> filename:join(Dir, Name).
+
+filenum_to_name(File) -> integer_to_list(File) ++ ?FILE_EXTENSION.
+
+scan_file_for_valid_messages(Dir, FileName) ->
+ case open_file(Dir, FileName, ?READ_MODE) of
+ {ok, Hdl} ->
+ Valid = rabbit_msg_file:scan(Hdl),
+ %% if something really bad's happened, the close could fail,
+ %% but ignore
+ file_handle_cache:close(Hdl),
+ Valid;
+ {error, enoent} -> {ok, [], 0};
+ {error, Reason} -> throw({error,
+ {unable_to_scan_file, FileName, Reason}})
+ end.
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 7187e322dd..f5d7978cae 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -921,7 +921,7 @@ msg_store_write(MsgIds) ->
ok = lists:foldl(
fun (MsgId, ok) -> rabbit_msg_store:write(MsgId, MsgId) end,
ok, MsgIds).
-
+
test_msg_store() ->
stop_msg_store(),
ok = start_msg_store_empty(),
@@ -1016,16 +1016,23 @@ test_msg_store() ->
fun (MsgId, ok) ->
rabbit_msg_store:write(msg_id_bin(MsgId), Payload)
end, ok, MsgIdsBig),
- %% .., then remove even numbers ascending, and odd numbers
- %% descending. This hits the GC.
+ %% .., then 3s by 1...
ok = lists:foldl(
fun (MsgId, ok) ->
- rabbit_msg_store:remove([msg_id_bin(
- case MsgId rem 2 of
- 0 -> MsgId;
- 1 -> BigCount - MsgId
- end)])
- end, ok, MsgIdsBig),
+ rabbit_msg_store:remove([msg_id_bin(MsgId)])
+ end, ok, lists:seq(BigCount, 1, -3)),
+ %% .., then remove 3s by 2, from the young end first. This hits
+ %% GC (under 50% good data left, but no empty files. Must GC).
+ ok = lists:foldl(
+ fun (MsgId, ok) ->
+ rabbit_msg_store:remove([msg_id_bin(MsgId)])
+ end, ok, lists:seq(BigCount-1, 1, -3)),
+ %% .., then remove 3s by 3, from the young end first. This hits
+ %% GC...
+ ok = lists:foldl(
+ fun (MsgId, ok) ->
+ rabbit_msg_store:remove([msg_id_bin(MsgId)])
+ end, ok, lists:seq(BigCount-2, 1, -3)),
%% ensure empty
false = msg_store_contains(false, [msg_id_bin(M) || M <- MsgIdsBig]),
%% restart empty
diff --git a/src/random_distributions.erl b/src/random_distributions.erl
new file mode 100644
index 0000000000..dfcdc834c5
--- /dev/null
+++ b/src/random_distributions.erl
@@ -0,0 +1,38 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2009 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(random_distributions).
+
+-export([geometric/1]).
+
+geometric(P) when 0.0 < P andalso P < 1.0 ->
+ U = 1.0 - random:uniform(),
+ rabbit_misc:ceil(math:log(U) / math:log(1.0 - P)).