diff options
| author | Matthias Radestock <matthias@lshift.net> | 2009-08-29 11:15:16 +0100 |
|---|---|---|
| committer | Matthias Radestock <matthias@lshift.net> | 2009-08-29 11:15:16 +0100 |
| commit | 8a5b3cdf7c705d3e8194af0ac32c52e2a9780db5 (patch) | |
| tree | 006e672f82bf44a4e8df6e63be21e50155196775 | |
| parent | 6b3c6014ef7164638dd1658d8251271716163291 (diff) | |
| download | rabbitmq-server-git-8a5b3cdf7c705d3e8194af0ac32c52e2a9780db5.tar.gz | |
refactoring: move all the low level message file i/o into a separate module
The details of how messages are encoded in files are opaque to disk_queue.
| -rw-r--r-- | src/rabbit_disk_queue.erl | 121 | ||||
| -rw-r--r-- | src/rabbit_msg_file.erl | 148 |
2 files changed, 153 insertions, 116 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl index 719ff1a039..84c3b6e345 100644 --- a/src/rabbit_disk_queue.erl +++ b/src/rabbit_disk_queue.erl @@ -54,11 +54,6 @@ -include("rabbit.hrl"). --define(WRITE_OK_SIZE_BITS, 8). --define(WRITE_OK_TRANSIENT, 255). --define(WRITE_OK_PERSISTENT, 254). --define(INTEGER_SIZE_BYTES, 8). --define(INTEGER_SIZE_BITS, (8 * ?INTEGER_SIZE_BYTES)). -define(MSG_LOC_NAME, rabbit_disk_queue_msg_location). -define(FILE_SUMMARY_ETS_NAME, rabbit_disk_queue_file_summary). -define(SEQUENCE_ETS_NAME, rabbit_disk_queue_sequences). @@ -66,7 +61,6 @@ -define(FILE_EXTENSION, ".rdq"). -define(FILE_EXTENSION_TMP, ".rdt"). -define(FILE_EXTENSION_DETS, ".dets"). --define(FILE_PACKING_ADJUSTMENT, (1 + (2* (?INTEGER_SIZE_BYTES)))). -define(BATCH_SIZE, 10000). -define(CACHE_MAX_SIZE, 10485760). -define(MAX_READ_FILE_HANDLES, 256). @@ -913,7 +907,7 @@ read_stored_message(#message_store_entry { msg_id = MsgId, ref_count = RefCount, with_read_handle_at( File, Offset, fun(Hdl) -> - Res = case read_message_from_disk(Hdl, TotalSize) of + Res = case rabbit_msg_file:read(Hdl, TotalSize) of {ok, {_, _, _}} = Obj -> Obj; {ok, Rest} -> throw({error, @@ -1024,8 +1018,9 @@ internal_tx_publish(Message = #basic_message { is_persistent = IsPersistent, case dets_ets_lookup(State, MsgId) of [] -> %% New message, lots to do - {ok, TotalSize} = append_message(CurHdl, MsgId, msg_to_bin(Message), - IsPersistent), + {ok, TotalSize} = rabbit_msg_file:append( + CurHdl, MsgId, msg_to_bin(Message), + IsPersistent), true = dets_ets_insert_new( State, #message_store_entry { msg_id = MsgId, ref_count = 1, file = CurName, @@ -1902,50 +1897,10 @@ get_disk_queue_files() -> DQTFilesSorted = lists:sort(fun file_name_sort/2, DQTFiles), {DQFilesSorted, DQTFilesSorted}. -%%---------------------------------------------------------------------------- -%% raw reading and writing of files -%%---------------------------------------------------------------------------- - -append_message(FileHdl, MsgId, MsgBody, IsPersistent) when is_binary(MsgBody) -> - BodySize = size(MsgBody), - MsgIdBin = term_to_binary(MsgId), - MsgIdBinSize = size(MsgIdBin), - Size = BodySize + MsgIdBinSize, - StopByte = case IsPersistent of - true -> ?WRITE_OK_PERSISTENT; - false -> ?WRITE_OK_TRANSIENT - end, - case file:write(FileHdl, <<Size:?INTEGER_SIZE_BITS, - MsgIdBinSize:?INTEGER_SIZE_BITS, - MsgIdBin:MsgIdBinSize/binary, - MsgBody:BodySize/binary, - StopByte:?WRITE_OK_SIZE_BITS>>) of - ok -> {ok, Size + ?FILE_PACKING_ADJUSTMENT}; - KO -> KO - end. - -read_message_from_disk(FileHdl, TotalSize) -> - Size = TotalSize - ?FILE_PACKING_ADJUSTMENT, - SizeWriteOkBytes = Size + 1, - case file:read(FileHdl, TotalSize) of - {ok, <<Size:?INTEGER_SIZE_BITS, - MsgIdBinSize:?INTEGER_SIZE_BITS, - Rest:SizeWriteOkBytes/binary>>} -> - BodySize = Size - MsgIdBinSize, - <<_MsgId:MsgIdBinSize/binary, MsgBody:BodySize/binary, - StopByte:?WRITE_OK_SIZE_BITS>> = Rest, - Persistent = case StopByte of - ?WRITE_OK_TRANSIENT -> false; - ?WRITE_OK_PERSISTENT -> true - end, - {ok, {MsgBody, Persistent, BodySize}}; - KO -> KO - end. - scan_file_for_valid_messages(File) -> case open_file(File, ?READ_MODE) of {ok, Hdl} -> - Valid = scan_file_for_valid_messages(Hdl, 0, []), + Valid = rabbit_msg_file:scan(Hdl), %% if something really bad's happened, the close could fail, %% but ignore file:close(Hdl), @@ -1953,69 +1908,3 @@ scan_file_for_valid_messages(File) -> {error, enoent} -> {ok, []}; {error, Reason} -> throw({error, {unable_to_scan_file, File, Reason}}) end. - -scan_file_for_valid_messages(FileHdl, Offset, Acc) -> - case read_next_file_entry(FileHdl, Offset) of - eof -> {ok, Acc}; - {corrupted, NextOffset} -> - scan_file_for_valid_messages(FileHdl, NextOffset, Acc); - {ok, {MsgId, IsPersistent, TotalSize, NextOffset}} -> - scan_file_for_valid_messages( - FileHdl, NextOffset, - [{MsgId, IsPersistent, TotalSize, Offset} | Acc]); - _KO -> - %% bad message, but we may still have recovered some valid messages - {ok, Acc} - end. - -read_next_file_entry(FileHdl, Offset) -> - TwoIntegers = 2 * ?INTEGER_SIZE_BYTES, - case file:read(FileHdl, TwoIntegers) of - {ok, - <<Size:?INTEGER_SIZE_BITS, MsgIdBinSize:?INTEGER_SIZE_BITS>>} -> - case {Size, MsgIdBinSize} of - {0, _} -> eof; %% Nothing we can do other than stop - {_, 0} -> - %% current message corrupted, try skipping past it - ExpectedAbsPos = Offset + Size + ?FILE_PACKING_ADJUSTMENT, - case file:position(FileHdl, {cur, Size + 1}) of - {ok, ExpectedAbsPos} -> {corrupted, ExpectedAbsPos}; - {ok, _SomeOtherPos} -> eof; %% seek failed, so give up - KO -> KO - end; - {_, _} -> %% all good, let's continue - case file:read(FileHdl, MsgIdBinSize) of - {ok, <<MsgIdBin:MsgIdBinSize/binary>>} -> - TotalSize = Size + ?FILE_PACKING_ADJUSTMENT, - ExpectedAbsPos = Offset + TotalSize - 1, - case file:position( - FileHdl, {cur, Size - MsgIdBinSize}) of - {ok, ExpectedAbsPos} -> - NextOffset = ExpectedAbsPos + 1, - case read_stop_byte(FileHdl) of - {ok, Persistent} -> - MsgId = binary_to_term(MsgIdBin), - {ok, {MsgId, Persistent, - TotalSize, NextOffset}}; - corrupted -> - {corrupted, NextOffset}; - KO -> KO - end; - {ok, _SomeOtherPos} -> - %% seek failed, so give up - eof; - KO -> KO - end; - Other -> Other - end - end; - Other -> Other - end. - -read_stop_byte(FileHdl) -> - case file:read(FileHdl, 1) of - {ok, <<?WRITE_OK_TRANSIENT:?WRITE_OK_SIZE_BITS>>} -> {ok, false}; - {ok, <<?WRITE_OK_PERSISTENT:?WRITE_OK_SIZE_BITS>>} -> {ok, true}; - {ok, _SomeOtherData} -> corrupted; - KO -> KO - end. diff --git a/src/rabbit_msg_file.erl b/src/rabbit_msg_file.erl new file mode 100644 index 0000000000..6cf11ac871 --- /dev/null +++ b/src/rabbit_msg_file.erl @@ -0,0 +1,148 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2009 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_msg_file). + +-export([append/4, read/2, scan/1]). + +%%---------------------------------------------------------------------------- + +-define(INTEGER_SIZE_BYTES, 8). +-define(INTEGER_SIZE_BITS, (8 * ?INTEGER_SIZE_BYTES)). +-define(WRITE_OK_SIZE_BITS, 8). +-define(WRITE_OK_TRANSIENT, 255). +-define(WRITE_OK_PERSISTENT, 254). +-define(FILE_PACKING_ADJUSTMENT, (1 + (2* (?INTEGER_SIZE_BYTES)))). + +%%---------------------------------------------------------------------------- + +append(FileHdl, MsgId, MsgBody, IsPersistent) when is_binary(MsgBody) -> + BodySize = size(MsgBody), + MsgIdBin = term_to_binary(MsgId), + MsgIdBinSize = size(MsgIdBin), + Size = BodySize + MsgIdBinSize, + StopByte = case IsPersistent of + true -> ?WRITE_OK_PERSISTENT; + false -> ?WRITE_OK_TRANSIENT + end, + case file:write(FileHdl, <<Size:?INTEGER_SIZE_BITS, + MsgIdBinSize:?INTEGER_SIZE_BITS, + MsgIdBin:MsgIdBinSize/binary, + MsgBody:BodySize/binary, + StopByte:?WRITE_OK_SIZE_BITS>>) of + ok -> {ok, Size + ?FILE_PACKING_ADJUSTMENT}; + KO -> KO + end. + +read(FileHdl, TotalSize) -> + Size = TotalSize - ?FILE_PACKING_ADJUSTMENT, + SizeWriteOkBytes = Size + 1, + case file:read(FileHdl, TotalSize) of + {ok, <<Size:?INTEGER_SIZE_BITS, + MsgIdBinSize:?INTEGER_SIZE_BITS, + Rest:SizeWriteOkBytes/binary>>} -> + BodySize = Size - MsgIdBinSize, + <<_MsgId:MsgIdBinSize/binary, MsgBody:BodySize/binary, + StopByte:?WRITE_OK_SIZE_BITS>> = Rest, + Persistent = case StopByte of + ?WRITE_OK_TRANSIENT -> false; + ?WRITE_OK_PERSISTENT -> true + end, + {ok, {MsgBody, Persistent, BodySize}}; + KO -> KO + end. + +scan(FileHdl) -> scan(FileHdl, 0, []). + +scan(FileHdl, Offset, Acc) -> + case read_next(FileHdl, Offset) of + eof -> {ok, Acc}; + {corrupted, NextOffset} -> + scan(FileHdl, NextOffset, Acc); + {ok, {MsgId, IsPersistent, TotalSize, NextOffset}} -> + scan(FileHdl, NextOffset, + [{MsgId, IsPersistent, TotalSize, Offset} | Acc]); + _KO -> + %% bad message, but we may still have recovered some valid messages + {ok, Acc} + end. + +read_next(FileHdl, Offset) -> + TwoIntegers = 2 * ?INTEGER_SIZE_BYTES, + case file:read(FileHdl, TwoIntegers) of + {ok, + <<Size:?INTEGER_SIZE_BITS, MsgIdBinSize:?INTEGER_SIZE_BITS>>} -> + case {Size, MsgIdBinSize} of + {0, _} -> eof; %% Nothing we can do other than stop + {_, 0} -> + %% current message corrupted, try skipping past it + ExpectedAbsPos = Offset + Size + ?FILE_PACKING_ADJUSTMENT, + case file:position(FileHdl, {cur, Size + 1}) of + {ok, ExpectedAbsPos} -> {corrupted, ExpectedAbsPos}; + {ok, _SomeOtherPos} -> eof; %% seek failed, so give up + KO -> KO + end; + {_, _} -> %% all good, let's continue + case file:read(FileHdl, MsgIdBinSize) of + {ok, <<MsgIdBin:MsgIdBinSize/binary>>} -> + TotalSize = Size + ?FILE_PACKING_ADJUSTMENT, + ExpectedAbsPos = Offset + TotalSize - 1, + case file:position( + FileHdl, {cur, Size - MsgIdBinSize}) of + {ok, ExpectedAbsPos} -> + NextOffset = ExpectedAbsPos + 1, + case read_stop_byte(FileHdl) of + {ok, Persistent} -> + MsgId = binary_to_term(MsgIdBin), + {ok, {MsgId, Persistent, + TotalSize, NextOffset}}; + corrupted -> + {corrupted, NextOffset}; + KO -> KO + end; + {ok, _SomeOtherPos} -> + %% seek failed, so give up + eof; + KO -> KO + end; + Other -> Other + end + end; + Other -> Other + end. + +read_stop_byte(FileHdl) -> + case file:read(FileHdl, 1) of + {ok, <<?WRITE_OK_TRANSIENT:?WRITE_OK_SIZE_BITS>>} -> {ok, false}; + {ok, <<?WRITE_OK_PERSISTENT:?WRITE_OK_SIZE_BITS>>} -> {ok, true}; + {ok, _SomeOtherData} -> corrupted; + KO -> KO + end. |
