summary refs log tree commit diff
path: root/src
diff options
context:
space:
mode:
author Matthew Sackman <matthew@lshift.net> 2009-11-06 14:18:51 +0000
committer Matthew Sackman <matthew@lshift.net> 2009-11-06 14:18:51 +0000
commit 4c8dcae91ec74a85783b0793d0bbc8af39134174 (patch)
tree b7958bbd5b90c5491f57a68d53da466bcb0b7d5e /src
parent 7a636944bcd9c951af1c3ce32f63a6d132106728 (diff)
download rabbitmq-server-git-4c8dcae91ec74a85783b0793d0bbc8af39134174.tar.gz
man 2 write states:
"POSIX requires that a read(2) which can be proved to occur after a write() has returned returns the new data." This means that in the msg_store, when we're reading a msg, we only need to make sure that write() has really been called for that msg; we do not need to sync here. So, add a test that explicitly hits this condition. We were using both buffering in the fhc (file_handle_cache) and delayed_write. This is dangerous: we can think that we've written everything out from the fhc, but even though each write exceeds the 64 KB default limit of delayed_write, we may not be writing a whole number of 64 KB blocks, so there may be something left in the delayed_write buffer. So remove delayed_write. Performance doesn't suffer because we're writing blocks of 1 MB anyway! Also, the current_offset/1 fun in fhc was returning the virtual offset, i.e. where we would be if we flushed out the write buffer. We actually need both variants: getting the real raw offset of the handle, and getting the virtual offset. Finally, as a result of all of this, the sync fun in msg_store no longer needs to do any work unless there are outstanding transactions.
Diffstat (limited to 'src')
-rw-r--r-- src/file_handle_cache.erl 23
-rw-r--r-- src/rabbit_msg_store.erl 38
-rw-r--r-- src/rabbit_tests.erl 5
3 files changed, 42 insertions, 24 deletions
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl
index ae9133b8da..5c1c5a83d4 100644
--- a/src/file_handle_cache.erl
+++ b/src/file_handle_cache.erl
@@ -32,7 +32,8 @@
-module(file_handle_cache).
-export([open/3, close/1, read/2, append/2, sync/1, position/2, truncate/1,
- last_sync_offset/1, current_offset/1, append_write_buffer/1, copy/3]).
+ last_sync_offset/1, current_virtual_offset/1, current_raw_offset/1,
+ append_write_buffer/1, copy/3]).
%%----------------------------------------------------------------------------
@@ -80,7 +81,8 @@
({'ok', non_neg_integer()} | error())).
-spec(truncate/1 :: (ref()) -> ok_or_error()).
-spec(last_sync_offset/1 :: (ref()) -> ({'ok', integer()} | error())).
--spec(current_offset/1 :: (ref()) -> ({'ok', integer()} | error())).
+-spec(current_virtual_offset/1 :: (ref()) -> ({'ok', integer()} | error())).
+-spec(current_raw_offset/1 :: (ref()) -> ({'ok', integer()} | error())).
-spec(append_write_buffer/1 :: (ref()) -> ok_or_error()).
-spec(copy/3 :: (ref(), ref(), non_neg_integer()) ->
({'ok', integer()} | error())).
@@ -109,7 +111,7 @@ open(Path, Mode, Options) ->
true -> RCount + 1;
false -> RCount
end,
- put({Path1, fhc_file},
+ put({GRef, fhc_file},
File #file {
reader_count = RCount1,
has_writer = HasWriter orelse IsWriter }),
@@ -149,9 +151,9 @@ close(Ref) ->
RCount1 = case IsReader of
true -> RCount - 1;
false -> RCount
- end,
+ end,
HasWriter1 = HasWriter andalso not IsWriter,
- case RCount1 == 0 andalso not HasWriter1 of
+ case RCount1 =:= 0 andalso not HasWriter1 of
true -> erase({GRef, fhc_file}),
erase({Path, fhc_path});
false -> put({GRef, fhc_file},
@@ -274,16 +276,21 @@ last_sync_offset(Ref) ->
Error -> Error
end.
-current_offset(Ref) ->
+current_virtual_offset(Ref) ->
case get_or_reopen(Ref) of
{ok, #handle { at_eof = true, is_write = true, offset = Offset,
write_buffer_size = Size }} ->
{ok, Offset + Size};
- {ok, #handle { offset = Offset }} ->
- {ok, Offset};
+ {ok, #handle { offset = Offset }} -> {ok, Offset};
Error -> Error
end.
+current_raw_offset(Ref) ->
+ case get_or_reopen(Ref) of
+ {ok, #handle { offset = Offset }} -> {ok, Offset};
+ Error -> Error
+ end.
+
append_write_buffer(Ref) ->
case get_or_reopen(Ref) of
{ok, Handle} ->
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 89f13c6fd0..591435ba01 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -102,7 +102,7 @@
-define(BINARY_MODE, [raw, binary]).
-define(READ_MODE, [read]).
-define(READ_AHEAD_MODE, [read_ahead | ?READ_MODE]).
--define(WRITE_MODE, [write, delayed_write]).
+-define(WRITE_MODE, [write]).
%% The components:
%%
@@ -306,7 +306,7 @@ handle_cast({write, MsgId, Msg},
case index_lookup(MsgId, State) of
not_found ->
%% New message, lots to do
- {ok, CurOffset} = file_handle_cache:current_offset(CurHdl),
+ {ok, CurOffset} = file_handle_cache:current_virtual_offset(CurHdl),
{ok, TotalSize} = rabbit_msg_file:append(CurHdl, MsgId, Msg),
ok = index_insert(#msg_location {
msg_id = MsgId, ref_count = 1, file = CurFile,
@@ -453,10 +453,13 @@ truncate_and_extend_file(FileHdl, Lowpoint, Highpoint) ->
sync(State = #msstate { current_file_handle = CurHdl,
on_sync = Syncs }) ->
State1 = stop_sync_timer(State),
- %% we depend on this really calling sync, even if [] == Syncs
- ok = file_handle_cache:sync(CurHdl),
- lists:foreach(fun (K) -> K() end, lists:reverse(Syncs)),
- State1 #msstate { on_sync = [] }.
+ case Syncs of
+ [] -> State1;
+ _ ->
+ ok = file_handle_cache:sync(CurHdl),
+ lists:foreach(fun (K) -> K() end, lists:reverse(Syncs)),
+ State1 #msstate { on_sync = [] }
+ end.
remove_message(MsgId, State = #msstate { file_summary = FileSummary }) ->
StoreEntry = #msg_location { ref_count = RefCount, file = File,
@@ -493,13 +496,15 @@ internal_read_message(MsgId,
total_size = TotalSize } ->
case fetch_and_increment_cache(MsgId, State) of
not_found ->
- {ok, SyncOffset} = file_handle_cache:last_sync_offset(CurHdl),
- State1 =
- case CurFile =:= File andalso Offset >= SyncOffset of
- true -> sync(State);
- false -> State
- end,
- {Hdl, State2} = get_read_handle(File, State1),
+ {ok, CurOffset} =
+ file_handle_cache:current_raw_offset(CurHdl),
+ ok = case CurFile =:= File andalso Offset >= CurOffset of
+ true ->
+ file_handle_cache:append_write_buffer(CurHdl);
+ false ->
+ ok
+ end,
+ {Hdl, State1} = get_read_handle(File, State),
{ok, Offset} = file_handle_cache:position(Hdl, Offset),
{ok, {MsgId, Msg}} =
case rabbit_msg_file:read(Hdl, TotalSize) of
@@ -508,17 +513,18 @@ internal_read_message(MsgId,
throw({error, {misread, [{old_state, State},
{file_num, File},
{offset, Offset},
- {read, Rest}]}})
+ {read, Rest},
+ {proc_dict, get()}]}})
end,
ok = if RefCount > 1 ->
- insert_into_cache(MsgId, Msg, State2);
+ insert_into_cache(MsgId, Msg, State1);
true -> ok
%% it's not in the cache and we
%% only have one reference to the
%% message. So don't bother
%% putting it in the cache.
end,
- {{ok, Msg}, State2};
+ {{ok, Msg}, State1};
{Msg, _RefCount} ->
{{ok, Msg}, State}
end
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 9b53334eb7..d618d3e030 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -965,6 +965,11 @@ test_msg_store() ->
ok = start_msg_store_empty(),
%% check we don't contain any of the msgs
false = msg_store_contains(false, MsgIds),
+ %% publish the first half again
+ ok = msg_store_write(MsgIds1stHalf),
+ %% this should force some sort of sync internally otherwise misread
+ ok = msg_store_read(MsgIds1stHalf),
+ ok = rabbit_msg_store:remove(MsgIds1stHalf),
%% push a lot of msgs in...
BigCount = 100000,
MsgIdsBig = lists:seq(1, BigCount),