summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@lshift.net>2009-08-29 11:15:16 +0100
committerMatthias Radestock <matthias@lshift.net>2009-08-29 11:15:16 +0100
commit8a5b3cdf7c705d3e8194af0ac32c52e2a9780db5 (patch)
tree006e672f82bf44a4e8df6e63be21e50155196775
parent6b3c6014ef7164638dd1658d8251271716163291 (diff)
downloadrabbitmq-server-git-8a5b3cdf7c705d3e8194af0ac32c52e2a9780db5.tar.gz
refactoring: move all the low level message file i/o into a separate module
The details of how messages are encoded in files are opaque to disk_queue.
-rw-r--r--src/rabbit_disk_queue.erl121
-rw-r--r--src/rabbit_msg_file.erl148
2 files changed, 153 insertions, 116 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index 719ff1a039..84c3b6e345 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -54,11 +54,6 @@
-include("rabbit.hrl").
--define(WRITE_OK_SIZE_BITS, 8).
--define(WRITE_OK_TRANSIENT, 255).
--define(WRITE_OK_PERSISTENT, 254).
--define(INTEGER_SIZE_BYTES, 8).
--define(INTEGER_SIZE_BITS, (8 * ?INTEGER_SIZE_BYTES)).
-define(MSG_LOC_NAME, rabbit_disk_queue_msg_location).
-define(FILE_SUMMARY_ETS_NAME, rabbit_disk_queue_file_summary).
-define(SEQUENCE_ETS_NAME, rabbit_disk_queue_sequences).
@@ -66,7 +61,6 @@
-define(FILE_EXTENSION, ".rdq").
-define(FILE_EXTENSION_TMP, ".rdt").
-define(FILE_EXTENSION_DETS, ".dets").
--define(FILE_PACKING_ADJUSTMENT, (1 + (2* (?INTEGER_SIZE_BYTES)))).
-define(BATCH_SIZE, 10000).
-define(CACHE_MAX_SIZE, 10485760).
-define(MAX_READ_FILE_HANDLES, 256).
@@ -913,7 +907,7 @@ read_stored_message(#message_store_entry { msg_id = MsgId, ref_count = RefCount,
with_read_handle_at(
File, Offset,
fun(Hdl) ->
- Res = case read_message_from_disk(Hdl, TotalSize) of
+ Res = case rabbit_msg_file:read(Hdl, TotalSize) of
{ok, {_, _, _}} = Obj -> Obj;
{ok, Rest} ->
throw({error,
@@ -1024,8 +1018,9 @@ internal_tx_publish(Message = #basic_message { is_persistent = IsPersistent,
case dets_ets_lookup(State, MsgId) of
[] ->
%% New message, lots to do
- {ok, TotalSize} = append_message(CurHdl, MsgId, msg_to_bin(Message),
- IsPersistent),
+ {ok, TotalSize} = rabbit_msg_file:append(
+ CurHdl, MsgId, msg_to_bin(Message),
+ IsPersistent),
true = dets_ets_insert_new(
State, #message_store_entry
{ msg_id = MsgId, ref_count = 1, file = CurName,
@@ -1902,50 +1897,10 @@ get_disk_queue_files() ->
DQTFilesSorted = lists:sort(fun file_name_sort/2, DQTFiles),
{DQFilesSorted, DQTFilesSorted}.
-%%----------------------------------------------------------------------------
-%% raw reading and writing of files
-%%----------------------------------------------------------------------------
-
-append_message(FileHdl, MsgId, MsgBody, IsPersistent) when is_binary(MsgBody) ->
- BodySize = size(MsgBody),
- MsgIdBin = term_to_binary(MsgId),
- MsgIdBinSize = size(MsgIdBin),
- Size = BodySize + MsgIdBinSize,
- StopByte = case IsPersistent of
- true -> ?WRITE_OK_PERSISTENT;
- false -> ?WRITE_OK_TRANSIENT
- end,
- case file:write(FileHdl, <<Size:?INTEGER_SIZE_BITS,
- MsgIdBinSize:?INTEGER_SIZE_BITS,
- MsgIdBin:MsgIdBinSize/binary,
- MsgBody:BodySize/binary,
- StopByte:?WRITE_OK_SIZE_BITS>>) of
- ok -> {ok, Size + ?FILE_PACKING_ADJUSTMENT};
- KO -> KO
- end.
-
-read_message_from_disk(FileHdl, TotalSize) ->
- Size = TotalSize - ?FILE_PACKING_ADJUSTMENT,
- SizeWriteOkBytes = Size + 1,
- case file:read(FileHdl, TotalSize) of
- {ok, <<Size:?INTEGER_SIZE_BITS,
- MsgIdBinSize:?INTEGER_SIZE_BITS,
- Rest:SizeWriteOkBytes/binary>>} ->
- BodySize = Size - MsgIdBinSize,
- <<_MsgId:MsgIdBinSize/binary, MsgBody:BodySize/binary,
- StopByte:?WRITE_OK_SIZE_BITS>> = Rest,
- Persistent = case StopByte of
- ?WRITE_OK_TRANSIENT -> false;
- ?WRITE_OK_PERSISTENT -> true
- end,
- {ok, {MsgBody, Persistent, BodySize}};
- KO -> KO
- end.
-
scan_file_for_valid_messages(File) ->
case open_file(File, ?READ_MODE) of
{ok, Hdl} ->
- Valid = scan_file_for_valid_messages(Hdl, 0, []),
+ Valid = rabbit_msg_file:scan(Hdl),
%% if something really bad's happened, the close could fail,
%% but ignore
file:close(Hdl),
@@ -1953,69 +1908,3 @@ scan_file_for_valid_messages(File) ->
{error, enoent} -> {ok, []};
{error, Reason} -> throw({error, {unable_to_scan_file, File, Reason}})
end.
-
-scan_file_for_valid_messages(FileHdl, Offset, Acc) ->
- case read_next_file_entry(FileHdl, Offset) of
- eof -> {ok, Acc};
- {corrupted, NextOffset} ->
- scan_file_for_valid_messages(FileHdl, NextOffset, Acc);
- {ok, {MsgId, IsPersistent, TotalSize, NextOffset}} ->
- scan_file_for_valid_messages(
- FileHdl, NextOffset,
- [{MsgId, IsPersistent, TotalSize, Offset} | Acc]);
- _KO ->
- %% bad message, but we may still have recovered some valid messages
- {ok, Acc}
- end.
-
-read_next_file_entry(FileHdl, Offset) ->
- TwoIntegers = 2 * ?INTEGER_SIZE_BYTES,
- case file:read(FileHdl, TwoIntegers) of
- {ok,
- <<Size:?INTEGER_SIZE_BITS, MsgIdBinSize:?INTEGER_SIZE_BITS>>} ->
- case {Size, MsgIdBinSize} of
- {0, _} -> eof; %% Nothing we can do other than stop
- {_, 0} ->
- %% current message corrupted, try skipping past it
- ExpectedAbsPos = Offset + Size + ?FILE_PACKING_ADJUSTMENT,
- case file:position(FileHdl, {cur, Size + 1}) of
- {ok, ExpectedAbsPos} -> {corrupted, ExpectedAbsPos};
- {ok, _SomeOtherPos} -> eof; %% seek failed, so give up
- KO -> KO
- end;
- {_, _} -> %% all good, let's continue
- case file:read(FileHdl, MsgIdBinSize) of
- {ok, <<MsgIdBin:MsgIdBinSize/binary>>} ->
- TotalSize = Size + ?FILE_PACKING_ADJUSTMENT,
- ExpectedAbsPos = Offset + TotalSize - 1,
- case file:position(
- FileHdl, {cur, Size - MsgIdBinSize}) of
- {ok, ExpectedAbsPos} ->
- NextOffset = ExpectedAbsPos + 1,
- case read_stop_byte(FileHdl) of
- {ok, Persistent} ->
- MsgId = binary_to_term(MsgIdBin),
- {ok, {MsgId, Persistent,
- TotalSize, NextOffset}};
- corrupted ->
- {corrupted, NextOffset};
- KO -> KO
- end;
- {ok, _SomeOtherPos} ->
- %% seek failed, so give up
- eof;
- KO -> KO
- end;
- Other -> Other
- end
- end;
- Other -> Other
- end.
-
-read_stop_byte(FileHdl) ->
- case file:read(FileHdl, 1) of
- {ok, <<?WRITE_OK_TRANSIENT:?WRITE_OK_SIZE_BITS>>} -> {ok, false};
- {ok, <<?WRITE_OK_PERSISTENT:?WRITE_OK_SIZE_BITS>>} -> {ok, true};
- {ok, _SomeOtherData} -> corrupted;
- KO -> KO
- end.
diff --git a/src/rabbit_msg_file.erl b/src/rabbit_msg_file.erl
new file mode 100644
index 0000000000..6cf11ac871
--- /dev/null
+++ b/src/rabbit_msg_file.erl
@@ -0,0 +1,148 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2009 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_msg_file).
+
+-export([append/4, read/2, scan/1]).
+
+%%----------------------------------------------------------------------------
+
+-define(INTEGER_SIZE_BYTES, 8).
+-define(INTEGER_SIZE_BITS, (8 * ?INTEGER_SIZE_BYTES)).
+-define(WRITE_OK_SIZE_BITS, 8).
+-define(WRITE_OK_TRANSIENT, 255).
+-define(WRITE_OK_PERSISTENT, 254).
+-define(FILE_PACKING_ADJUSTMENT, (1 + (2* (?INTEGER_SIZE_BYTES)))).
+
+%%----------------------------------------------------------------------------
+
+append(FileHdl, MsgId, MsgBody, IsPersistent) when is_binary(MsgBody) ->
+ BodySize = size(MsgBody),
+ MsgIdBin = term_to_binary(MsgId),
+ MsgIdBinSize = size(MsgIdBin),
+ Size = BodySize + MsgIdBinSize,
+ StopByte = case IsPersistent of
+ true -> ?WRITE_OK_PERSISTENT;
+ false -> ?WRITE_OK_TRANSIENT
+ end,
+ case file:write(FileHdl, <<Size:?INTEGER_SIZE_BITS,
+ MsgIdBinSize:?INTEGER_SIZE_BITS,
+ MsgIdBin:MsgIdBinSize/binary,
+ MsgBody:BodySize/binary,
+ StopByte:?WRITE_OK_SIZE_BITS>>) of
+ ok -> {ok, Size + ?FILE_PACKING_ADJUSTMENT};
+ KO -> KO
+ end.
+
+read(FileHdl, TotalSize) ->
+ Size = TotalSize - ?FILE_PACKING_ADJUSTMENT,
+ SizeWriteOkBytes = Size + 1,
+ case file:read(FileHdl, TotalSize) of
+ {ok, <<Size:?INTEGER_SIZE_BITS,
+ MsgIdBinSize:?INTEGER_SIZE_BITS,
+ Rest:SizeWriteOkBytes/binary>>} ->
+ BodySize = Size - MsgIdBinSize,
+ <<_MsgId:MsgIdBinSize/binary, MsgBody:BodySize/binary,
+ StopByte:?WRITE_OK_SIZE_BITS>> = Rest,
+ Persistent = case StopByte of
+ ?WRITE_OK_TRANSIENT -> false;
+ ?WRITE_OK_PERSISTENT -> true
+ end,
+ {ok, {MsgBody, Persistent, BodySize}};
+ KO -> KO
+ end.
+
+scan(FileHdl) -> scan(FileHdl, 0, []).
+
+scan(FileHdl, Offset, Acc) ->
+ case read_next(FileHdl, Offset) of
+ eof -> {ok, Acc};
+ {corrupted, NextOffset} ->
+ scan(FileHdl, NextOffset, Acc);
+ {ok, {MsgId, IsPersistent, TotalSize, NextOffset}} ->
+ scan(FileHdl, NextOffset,
+ [{MsgId, IsPersistent, TotalSize, Offset} | Acc]);
+ _KO ->
+ %% bad message, but we may still have recovered some valid messages
+ {ok, Acc}
+ end.
+
+read_next(FileHdl, Offset) ->
+ TwoIntegers = 2 * ?INTEGER_SIZE_BYTES,
+ case file:read(FileHdl, TwoIntegers) of
+ {ok,
+ <<Size:?INTEGER_SIZE_BITS, MsgIdBinSize:?INTEGER_SIZE_BITS>>} ->
+ case {Size, MsgIdBinSize} of
+ {0, _} -> eof; %% Nothing we can do other than stop
+ {_, 0} ->
+ %% current message corrupted, try skipping past it
+ ExpectedAbsPos = Offset + Size + ?FILE_PACKING_ADJUSTMENT,
+ case file:position(FileHdl, {cur, Size + 1}) of
+ {ok, ExpectedAbsPos} -> {corrupted, ExpectedAbsPos};
+ {ok, _SomeOtherPos} -> eof; %% seek failed, so give up
+ KO -> KO
+ end;
+ {_, _} -> %% all good, let's continue
+ case file:read(FileHdl, MsgIdBinSize) of
+ {ok, <<MsgIdBin:MsgIdBinSize/binary>>} ->
+ TotalSize = Size + ?FILE_PACKING_ADJUSTMENT,
+ ExpectedAbsPos = Offset + TotalSize - 1,
+ case file:position(
+ FileHdl, {cur, Size - MsgIdBinSize}) of
+ {ok, ExpectedAbsPos} ->
+ NextOffset = ExpectedAbsPos + 1,
+ case read_stop_byte(FileHdl) of
+ {ok, Persistent} ->
+ MsgId = binary_to_term(MsgIdBin),
+ {ok, {MsgId, Persistent,
+ TotalSize, NextOffset}};
+ corrupted ->
+ {corrupted, NextOffset};
+ KO -> KO
+ end;
+ {ok, _SomeOtherPos} ->
+ %% seek failed, so give up
+ eof;
+ KO -> KO
+ end;
+ Other -> Other
+ end
+ end;
+ Other -> Other
+ end.
+
+read_stop_byte(FileHdl) ->
+ case file:read(FileHdl, 1) of
+ {ok, <<?WRITE_OK_TRANSIENT:?WRITE_OK_SIZE_BITS>>} -> {ok, false};
+ {ok, <<?WRITE_OK_PERSISTENT:?WRITE_OK_SIZE_BITS>>} -> {ok, true};
+ {ok, _SomeOtherData} -> corrupted;
+ KO -> KO
+ end.