diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-11-06 14:18:51 +0000 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-11-06 14:18:51 +0000 |
| commit | 4c8dcae91ec74a85783b0793d0bbc8af39134174 (patch) | |
| tree | b7958bbd5b90c5491f57a68d53da466bcb0b7d5e /src | |
| parent | 7a636944bcd9c951af1c3ce32f63a6d132106728 (diff) | |
| download | rabbitmq-server-git-4c8dcae91ec74a85783b0793d0bbc8af39134174.tar.gz | |
man 2 write states:
POSIX requires that a read(2) which can be proved to occur after a write() has returned returns the new data.
This means that in the msg_store, when we're reading a msg, we only need to make sure that write has really been called on the msg. We do not need to sync here. So, add a test, explicitly to hit this condition.
We were using both buffering in the fhc, and also delayed_write. This is dangerous, because we can think that we've written out from the fhc, but even though we're writing > the 64 kbyte default limit in delayed_write, we may not be writing a whole number of 64kb blocks, thus there may be something left in the delayed_write buffer, so remove that. Performance doesn't suffer because we're writing blocks of 1MB anyway!
Also, the current_offset/1 fun in fhc was returning the virtual address - i.e. where we would be if we flushed out the write buffer. We actually need both variants - getting the real raw offset of the handle, and getting the virtual offset.
Finally, as a result of all of this, the sync fun in msg_store now has no need to do any work unless there are outstanding transactions.
Diffstat (limited to 'src')
| -rw-r--r-- | src/file_handle_cache.erl | 23 | ||||
| -rw-r--r-- | src/rabbit_msg_store.erl | 38 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 5 |
3 files changed, 42 insertions, 24 deletions
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl index ae9133b8da..5c1c5a83d4 100644 --- a/src/file_handle_cache.erl +++ b/src/file_handle_cache.erl @@ -32,7 +32,8 @@ -module(file_handle_cache). -export([open/3, close/1, read/2, append/2, sync/1, position/2, truncate/1, - last_sync_offset/1, current_offset/1, append_write_buffer/1, copy/3]). + last_sync_offset/1, current_virtual_offset/1, current_raw_offset/1, + append_write_buffer/1, copy/3]). %%---------------------------------------------------------------------------- @@ -80,7 +81,8 @@ ({'ok', non_neg_integer()} | error())). -spec(truncate/1 :: (ref()) -> ok_or_error()). -spec(last_sync_offset/1 :: (ref()) -> ({'ok', integer()} | error())). --spec(current_offset/1 :: (ref()) -> ({'ok', integer()} | error())). +-spec(current_virtual_offset/1 :: (ref()) -> ({'ok', integer()} | error())). +-spec(current_raw_offset/1 :: (ref()) -> ({'ok', integer()} | error())). -spec(append_write_buffer/1 :: (ref()) -> ok_or_error()). -spec(copy/3 :: (ref(), ref(), non_neg_integer()) -> ({'ok', integer()} | error())). @@ -109,7 +111,7 @@ open(Path, Mode, Options) -> true -> RCount + 1; false -> RCount end, - put({Path1, fhc_file}, + put({GRef, fhc_file}, File #file { reader_count = RCount1, has_writer = HasWriter orelse IsWriter }), @@ -149,9 +151,9 @@ close(Ref) -> RCount1 = case IsReader of true -> RCount - 1; false -> RCount - end, + end, HasWriter1 = HasWriter andalso not IsWriter, - case RCount1 == 0 andalso not HasWriter1 of + case RCount1 =:= 0 andalso not HasWriter1 of true -> erase({GRef, fhc_file}), erase({Path, fhc_path}); false -> put({GRef, fhc_file}, @@ -274,16 +276,21 @@ last_sync_offset(Ref) -> Error -> Error end. -current_offset(Ref) -> +current_virtual_offset(Ref) -> case get_or_reopen(Ref) of {ok, #handle { at_eof = true, is_write = true, offset = Offset, write_buffer_size = Size }} -> {ok, Offset + Size}; - {ok, #handle { offset = Offset }} -> - {ok, Offset}; + {ok, #handle { offset = Offset }} -> {ok, Offset}; Error -> Error end. +current_raw_offset(Ref) -> + case get_or_reopen(Ref) of + {ok, #handle { offset = Offset }} -> {ok, Offset}; + Error -> Error + end. + append_write_buffer(Ref) -> case get_or_reopen(Ref) of {ok, Handle} -> diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 89f13c6fd0..591435ba01 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -102,7 +102,7 @@ -define(BINARY_MODE, [raw, binary]). -define(READ_MODE, [read]). -define(READ_AHEAD_MODE, [read_ahead | ?READ_MODE]). --define(WRITE_MODE, [write, delayed_write]). +-define(WRITE_MODE, [write]). %% The components: %% @@ -306,7 +306,7 @@ handle_cast({write, MsgId, Msg}, case index_lookup(MsgId, State) of not_found -> %% New message, lots to do - {ok, CurOffset} = file_handle_cache:current_offset(CurHdl), + {ok, CurOffset} = file_handle_cache:current_virtual_offset(CurHdl), {ok, TotalSize} = rabbit_msg_file:append(CurHdl, MsgId, Msg), ok = index_insert(#msg_location { msg_id = MsgId, ref_count = 1, file = CurFile, @@ -453,10 +453,13 @@ truncate_and_extend_file(FileHdl, Lowpoint, Highpoint) -> sync(State = #msstate { current_file_handle = CurHdl, on_sync = Syncs }) -> State1 = stop_sync_timer(State), - %% we depend on this really calling sync, even if [] == Syncs - ok = file_handle_cache:sync(CurHdl), - lists:foreach(fun (K) -> K() end, lists:reverse(Syncs)), - State1 #msstate { on_sync = [] }. + case Syncs of + [] -> State1; + _ -> + ok = file_handle_cache:sync(CurHdl), + lists:foreach(fun (K) -> K() end, lists:reverse(Syncs)), + State1 #msstate { on_sync = [] } + end. remove_message(MsgId, State = #msstate { file_summary = FileSummary }) -> StoreEntry = #msg_location { ref_count = RefCount, file = File, @@ -493,13 +496,15 @@ internal_read_message(MsgId, total_size = TotalSize } -> case fetch_and_increment_cache(MsgId, State) of not_found -> - {ok, SyncOffset} = file_handle_cache:last_sync_offset(CurHdl), - State1 = - case CurFile =:= File andalso Offset >= SyncOffset of - true -> sync(State); - false -> State - end, - {Hdl, State2} = get_read_handle(File, State1), + {ok, CurOffset} = + file_handle_cache:current_raw_offset(CurHdl), + ok = case CurFile =:= File andalso Offset >= CurOffset of + true -> + file_handle_cache:append_write_buffer(CurHdl); + false -> + ok + end, + {Hdl, State1} = get_read_handle(File, State), {ok, Offset} = file_handle_cache:position(Hdl, Offset), {ok, {MsgId, Msg}} = case rabbit_msg_file:read(Hdl, TotalSize) of @@ -508,17 +513,18 @@ internal_read_message(MsgId, throw({error, {misread, [{old_state, State}, {file_num, File}, {offset, Offset}, - {read, Rest}]}}) + {read, Rest}, + {proc_dict, get()}]}}) end, ok = if RefCount > 1 -> - insert_into_cache(MsgId, Msg, State2); + insert_into_cache(MsgId, Msg, State1); true -> ok %% it's not in the cache and we %% only have one reference to the %% message. So don't bother %% putting it in the cache. end, - {{ok, Msg}, State2}; + {{ok, Msg}, State1}; {Msg, _RefCount} -> {{ok, Msg}, State} end diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 9b53334eb7..d618d3e030 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -965,6 +965,11 @@ test_msg_store() -> ok = start_msg_store_empty(), %% check we don't contain any of the msgs false = msg_store_contains(false, MsgIds), + %% publish the first half again + ok = msg_store_write(MsgIds1stHalf), + %% this should force some sort of sync internally otherwise misread + ok = msg_store_read(MsgIds1stHalf), + ok = rabbit_msg_store:remove(MsgIds1stHalf), %% push a lot of msgs in... BigCount = 100000, MsgIdsBig = lists:seq(1, BigCount), |
