summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2010-01-17 20:48:11 +0000
committerMatthew Sackman <matthew@lshift.net>2010-01-17 20:48:11 +0000
commit33caa1d97ca81b59ebc44bef84feaaef2cf86e2c (patch)
treebd4e9f2d177838403f4171b90d4f6ad73a42fdb6
parent236aeca90193e83bc46c75d1c29f52f9e876c8eb (diff)
downloadrabbitmq-server-git-33caa1d97ca81b59ebc44bef84feaaef2cf86e2c.tar.gz
Refactored and generally tidied the msg_store. Also added a write-back cache for the current file. This means that the clients don't need to go to the server when reading a msg from the current file. Managed to avoid using any further lines!
-rw-r--r--include/rabbit_msg_store.hrl7
-rw-r--r--src/rabbit_msg_store.erl281
2 files changed, 144 insertions, 144 deletions
diff --git a/include/rabbit_msg_store.hrl b/include/rabbit_msg_store.hrl
index a094454a78..4dff4a01b2 100644
--- a/include/rabbit_msg_store.hrl
+++ b/include/rabbit_msg_store.hrl
@@ -50,6 +50,7 @@
-define(HANDLE_CACHE_BUFFER_SIZE, 1048576). %% 1MB
--define(FILE_SUMMARY_ETS_NAME, rabbit_msg_store_file_summary).
--define(CACHE_ETS_NAME, rabbit_msg_store_cache).
--define(FILE_HANDLES_ETS_NAME, rabbit_msg_store_file_handles).
+-define(FILE_SUMMARY_ETS_NAME, rabbit_msg_store_file_summary).
+-define(CACHE_ETS_NAME, rabbit_msg_store_cache).
+-define(FILE_HANDLES_ETS_NAME, rabbit_msg_store_file_handles).
+-define(CUR_FILE_CACHE_ETS_NAME, rabbit_msg_store_cur_file).
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index f362d15d07..272db825eb 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -247,101 +247,13 @@ read(MsgId, CState) ->
case index_lookup(MsgId, CState) of
not_found ->
Defer();
- #msg_location { ref_count = RefCount,
- file = File,
- offset = Offset,
- total_size = TotalSize } ->
+ MsgLocation ->
case fetch_and_increment_cache(MsgId) of
- not_found ->
- [#file_summary { locked = Locked, right = Right }] =
- ets:lookup(?FILE_SUMMARY_ETS_NAME, File),
- case Right =:= undefined orelse Locked =:= true of
- true ->
- Defer();
- false ->
- ets:update_counter(?FILE_SUMMARY_ETS_NAME, File,
- {#file_summary.readers, 1}),
- Release = fun() ->
- ets:update_counter(
- ?FILE_SUMMARY_ETS_NAME, File,
- {#file_summary.readers, -1})
- end,
- %% If a GC hasn't already started, it
- %% won't start now. Need to check again to
- %% see if we've been locked in the
- %% meantime, between lookup and
- %% update_counter (thus GC actually in
- %% progress).
- [#file_summary { locked = Locked2 }] =
- ets:lookup(?FILE_SUMMARY_ETS_NAME, File),
- case Locked2 of
- true ->
- Release(),
- Defer();
- false ->
- %% Ok, we're definitely safe to
- %% continue - a GC can't start up
- %% now, and isn't running, so
- %% nothing will tell us from now
- %% on to close the handle if it's
- %% already open. (Well, a GC could
- %% start, and could put close
- %% entries into the ets table, but
- %% the GC will wait until we're
- %% done here before doing any real
- %% work.)
-
- %% This is fine to fail (already
- %% exists)
- ets:insert_new(?FILE_HANDLES_ETS_NAME,
- {{self(), File}, open}),
- CState1 = close_all_indicated(CState),
- {Hdl, CState3} =
- get_read_handle(File, CState1),
- {ok, Offset} =
- file_handle_cache:position(Hdl, Offset),
- {ok, {MsgId, Msg}} =
- case rabbit_msg_file:read(Hdl, TotalSize) of
- {ok, {MsgId, _}} = Obj -> Obj;
- Rest ->
- throw({error,
- {misread,
- [{old_cstate, CState1},
- {file_num, File},
- {offset, Offset},
- {read, Rest},
- {proc_dict, get()}
- ]}})
- end,
- Release(),
- ok = case RefCount > 1 of
- true ->
- insert_into_cache(MsgId, Msg);
- false ->
- %% It's not in the
- %% cache and we only
- %% have one reference
- %% to the message. So
- %% don't bother
- %% putting it in the
- %% cache.
- ok
- end,
- {{ok, Msg}, CState3}
- end
- end;
- Msg ->
- {{ok, Msg}, CState}
+ not_found -> client_read1(MsgLocation, Defer, CState);
+ Msg -> {{ok, Msg}, CState}
end
end.
-close_all_indicated(CState) ->
- Objs = ets:match_object(?FILE_HANDLES_ETS_NAME, {{self(), '_'}, close}),
- lists:foldl(fun ({Key = {_Self, File}, close}, CStateM) ->
- true = ets:delete(?FILE_HANDLES_ETS_NAME, Key),
- close_handle(File, CStateM)
- end, CState, Objs).
-
contains(MsgId) -> gen_server2:call(?SERVER, {contains, MsgId}, infinity).
remove(MsgIds) -> gen_server2:cast(?SERVER, {remove, MsgIds}).
release(MsgIds) -> gen_server2:cast(?SERVER, {release, MsgIds}).
@@ -364,6 +276,69 @@ client_terminate(CState) ->
ok.
%%----------------------------------------------------------------------------
+%% Client-side-only helpers
+%%----------------------------------------------------------------------------
+
+client_read1(#msg_location { msg_id = MsgId, ref_count = RefCount, file = File }
+ = MsgLocation, Defer, CState) ->
+ [#file_summary { locked = Locked, right = Right }] =
+ ets:lookup(?FILE_SUMMARY_ETS_NAME, File),
+ case {Right, Locked} of
+ {undefined, false} ->
+ case ets:lookup(?CUR_FILE_CACHE_ETS_NAME, MsgId) of
+ [] ->
+ Defer(); %% may have rolled over
+ [{MsgId, Msg}] ->
+ ok = maybe_insert_into_cache(RefCount, MsgId, Msg),
+ {{ok, Msg}, CState}
+ end;
+ {_, true} ->
+ Defer();
+ _ ->
+ ets:update_counter(?FILE_SUMMARY_ETS_NAME, File,
+ {#file_summary.readers, 1}),
+ Release = fun() ->
+ ets:update_counter(?FILE_SUMMARY_ETS_NAME, File,
+ {#file_summary.readers, -1})
+ end,
+ %% If a GC hasn't already started, it won't start
+ %% now. Need to check again to see if we've been locked in
+ %% the meantime, between lookup and update_counter (thus
+ %% GC actually in progress).
+ [#file_summary { locked = Locked2 }] =
+ ets:lookup(?FILE_SUMMARY_ETS_NAME, File),
+ case Locked2 of
+ true ->
+ Release(),
+ Defer();
+ false ->
+ %% Ok, we're definitely safe to continue - a GC
+ %% can't start up now, and isn't running, so
+ %% nothing will tell us from now on to close the
+ %% handle if it's already open. (Well, a GC could
+ %% start, and could put close entries into the ets
+ %% table, but the GC will wait until we're done
+ %% here before doing any real work.)
+
+ %% This is fine to fail (already exists)
+ ets:insert_new(?FILE_HANDLES_ETS_NAME,
+ {{self(), File}, open}),
+ CState1 = close_all_indicated(CState),
+ {Msg, CState2} = read_from_disk(MsgLocation, CState1),
+ ok = maybe_insert_into_cache(RefCount, MsgId, Msg),
+ Release(),
+ {{ok, Msg}, CState2}
+ end
+ end.
+
+close_all_indicated(CState) ->
+ Objs = ets:match_object(?FILE_HANDLES_ETS_NAME, {{self(), '_'}, close}),
+ lists:foldl(fun ({Key = {_Self, File}, close}, CStateM) ->
+ true = ets:delete(?FILE_HANDLES_ETS_NAME, Key),
+ close_handle(File, CStateM)
+ end, CState, Objs).
+
+%%----------------------------------------------------------------------------
%% gen_server callbacks
%%----------------------------------------------------------------------------
@@ -384,6 +359,8 @@ init([Dir, MsgRefDeltaGen, MsgRefDeltaGenInit]) ->
?CACHE_ETS_NAME = ets:new(?CACHE_ETS_NAME, [set, public, named_table]),
?FILE_HANDLES_ETS_NAME = ets:new(?FILE_HANDLES_ETS_NAME,
[ordered_set, public, named_table]),
+ ?CUR_FILE_CACHE_ETS_NAME = ets:new(?CUR_FILE_CACHE_ETS_NAME,
+ [set, public, named_table]),
State =
#msstate { dir = Dir,
index_module = IndexModule,
@@ -444,6 +421,7 @@ handle_cast({write, MsgId, Msg},
case index_lookup(MsgId, State) of
not_found ->
%% New message, lots to do
+ true = ets:insert_new(?CUR_FILE_CACHE_ETS_NAME, {MsgId, Msg}),
{ok, CurOffset} = file_handle_cache:current_virtual_offset(CurHdl),
{ok, TotalSize} = rabbit_msg_file:append(CurHdl, MsgId, Msg),
ok = index_insert(#msg_location {
@@ -563,6 +541,7 @@ terminate(_Reason, State = #msstate { index_state = IndexState,
ets:delete(?FILE_SUMMARY_ETS_NAME),
ets:delete(?CACHE_ETS_NAME),
ets:delete(?FILE_HANDLES_ETS_NAME),
+ ets:delete(?CUR_FILE_CACHE_ETS_NAME),
IndexModule:terminate(IndexState),
State3 #msstate { index_state = undefined,
current_file_handle = undefined }.
@@ -621,66 +600,80 @@ sync(State = #msstate { current_file_handle = CurHdl,
State1 #msstate { on_sync = [] }
end.
-read_message(MsgId, From, State =
- #msstate { current_file = CurFile,
- current_file_handle = CurHdl }) ->
+read_message(MsgId, From, State) ->
case index_lookup(MsgId, State) of
not_found -> gen_server2:reply(From, not_found),
State;
- #msg_location { ref_count = RefCount,
- file = File,
- offset = Offset,
- total_size = TotalSize } ->
+ MsgLocation ->
case fetch_and_increment_cache(MsgId) of
not_found ->
- [#file_summary { locked = Locked }] =
- ets:lookup(?FILE_SUMMARY_ETS_NAME, File),
- case Locked of
- true ->
- add_to_pending_gc_completion({read, MsgId, From},
- State);
- false ->
- ok = case CurFile =:= File andalso {ok, Offset} >=
- file_handle_cache:current_raw_offset(
- CurHdl) of
- true -> file_handle_cache:flush(CurHdl);
- false -> ok
- end,
- {Hdl, State1} = get_read_handle(File, State),
- {ok, Offset} =
- file_handle_cache:position(Hdl, Offset),
- {ok, {MsgId, Msg}} =
- case rabbit_msg_file:read(Hdl, TotalSize) of
- {ok, {MsgId, _}} = Obj -> Obj;
- Rest ->
- throw({error, {misread,
- [{old_state, State},
- {file_num, File},
- {offset, Offset},
- {read, Rest},
- {proc_dict, get()}
- ]}})
- end,
- ok = case RefCount > 1 of
- true ->
- insert_into_cache(MsgId, Msg);
- false ->
- %% it's not in the cache and
- %% we only have one reference
- %% to the message. So don't
- %% bother putting it in the
- %% cache.
- ok
- end,
- gen_server2:reply(From, {ok, Msg}),
- State1
- end;
+ read_message1(From, MsgLocation, State);
Msg ->
gen_server2:reply(From, {ok, Msg}),
State
end
end.
+read_message1(From, #msg_location { msg_id = MsgId, ref_count = RefCount,
+ file = File, offset = Offset } = MsgLoc,
+ State = #msstate { current_file = CurFile,
+ current_file_handle = CurHdl }) ->
+ case File =:= CurFile of
+ true ->
+ {Msg, State1} =
+ %% can return [] if msg in file existed on startup
+ case ets:lookup(?CUR_FILE_CACHE_ETS_NAME, MsgId) of
+ [] ->
+ ok = case {ok, Offset} >=
+ file_handle_cache:current_raw_offset(CurHdl) of
+ true -> file_handle_cache:flush(CurHdl);
+ false -> ok
+ end,
+ read_from_disk(MsgLoc, State);
+ [{MsgId, Msg1}] ->
+ {Msg1, State}
+ end,
+ ok = maybe_insert_into_cache(RefCount, MsgId, Msg),
+ gen_server2:reply(From, {ok, Msg}),
+ State1;
+ false ->
+ [#file_summary { locked = Locked }] =
+ ets:lookup(?FILE_SUMMARY_ETS_NAME, File),
+ case Locked of
+ true ->
+ add_to_pending_gc_completion({read, MsgId, From}, State);
+ false ->
+ {Msg, State1} = read_from_disk(MsgLoc, State),
+ gen_server2:reply(From, {ok, Msg}),
+ State1
+ end
+ end.
+
+read_from_disk(#msg_location { msg_id = MsgId, ref_count = RefCount,
+ file = File, offset = Offset,
+ total_size = TotalSize }, State) ->
+ {Hdl, State1} = get_read_handle(File, State),
+ {ok, Offset} = file_handle_cache:position(Hdl, Offset),
+ {ok, {MsgId, Msg}} =
+ case rabbit_msg_file:read(Hdl, TotalSize) of
+ {ok, {MsgId, _}} = Obj ->
+ Obj;
+ Rest ->
+ throw({error, {misread, [{old_state, State},
+ {file_num, File},
+ {offset, Offset},
+ {read, Rest},
+ {proc_dict, get()}
+ ]}})
+ end,
+ ok = maybe_insert_into_cache(RefCount, MsgId, Msg),
+ {Msg, State1}.
+
+maybe_insert_into_cache(RefCount, MsgId, Msg) when RefCount > 1 ->
+ insert_into_cache(MsgId, Msg);
+maybe_insert_into_cache(_RefCount, _MsgId, _Msg) ->
+ ok.
+
contains_message(MsgId, From, State = #msstate { gc_active = GCActive }) ->
case index_lookup(MsgId, State) of
not_found ->
@@ -697,10 +690,15 @@ contains_message(MsgId, From, State = #msstate { gc_active = GCActive }) ->
end
end.
-remove_message(MsgId, State = #msstate { sum_valid_data = SumValid }) ->
+remove_message(MsgId, State = #msstate { sum_valid_data = SumValid,
+ current_file = CurFile }) ->
#msg_location { ref_count = RefCount, file = File,
offset = Offset, total_size = TotalSize } =
index_lookup(MsgId, State),
+ true = case File =:= CurFile of
+ true -> ets:delete(?CUR_FILE_CACHE_ETS_NAME, MsgId);
+ false -> true
+ end,
case RefCount of
1 ->
ok = remove_cache_entry(MsgId),
@@ -1104,6 +1102,7 @@ maybe_roll_to_new_file(Offset,
locked = false, readers = 0 }),
true = ets:update_element(?FILE_SUMMARY_ETS_NAME, CurFile,
{#file_summary.right, NextFile}),
+ true = ets:delete_all_objects(?CUR_FILE_CACHE_ETS_NAME),
State1 #msstate { current_file_handle = NextHdl,
current_file = NextFile };
maybe_roll_to_new_file(_, State) ->