summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEmile Joubert <emile@rabbitmq.com>2011-02-11 17:28:13 +0000
committerEmile Joubert <emile@rabbitmq.com>2011-02-11 17:28:13 +0000
commit803bbd4354d3d20624abd589aab199d9a7823b7b (patch)
treef573a1322e948ee3f50f8f6bb53d9180b1a8d1c0
parent21f09a4abc9b89aa6649397c9be458eee5e8a38d (diff)
downloadrabbitmq-server-git-803bbd4354d3d20624abd589aab199d9a7823b7b.tar.gz
rabbit_msg_file:scan/4 now looks a bit more like fold
Also ignore garbage at the end of a message store
-rw-r--r--src/rabbit_msg_file.erl20
-rw-r--r--src/rabbit_msg_store.erl31
-rw-r--r--src/rabbit_upgrade_functions.erl2
-rw-r--r--src/rabbit_variable_queue.erl2
4 files changed, 26 insertions, 29 deletions
diff --git a/src/rabbit_msg_file.erl b/src/rabbit_msg_file.erl
index ad87ee161e..9d5953d5c9 100644
--- a/src/rabbit_msg_file.erl
+++ b/src/rabbit_msg_file.erl
@@ -16,7 +16,7 @@
-module(rabbit_msg_file).
--export([append/3, read/2, scan/2, scan/3]).
+-export([append/3, read/2, scan/2, scan/4]).
%%----------------------------------------------------------------------------
@@ -48,9 +48,9 @@
-spec(scan/2 :: (io_device(), file_size()) ->
{'ok', [{rabbit_guid:guid(), msg_size(), position()}],
position()}).
--spec(scan/3 :: (io_device(), file_size(),
- fun ((rabbit_guid:guid(), msg_size(), position(), binary()) -> any())) ->
- {'ok', [any()], position()}).
+-spec(scan/4 :: (io_device(), file_size(),
+ fun (({rabbit_guid:guid(), msg_size(), position(), binary()}, A) -> A),
+ A) -> {'ok', A, position()}).
-endif.
@@ -82,14 +82,14 @@ read(FileHdl, TotalSize) ->
KO -> KO
end.
-scan_fun(Guid, TotalSize, Offset, _Msg) ->
- {Guid, TotalSize, Offset}.
+scan_fun({Guid, TotalSize, Offset, _Msg}, Acc) ->
+ [{Guid, TotalSize, Offset} | Acc].
scan(FileHdl, FileSize) when FileSize >= 0 ->
- scan(FileHdl, FileSize, <<>>, 0, [], 0, fun scan_fun/4).
+ scan(FileHdl, FileSize, <<>>, 0, [], 0, fun scan_fun/2).
-scan(FileHdl, FileSize, Fun) when FileSize >= 0 ->
- scan(FileHdl, FileSize, <<>>, 0, [], 0, Fun).
+scan(FileHdl, FileSize, Fun, Acc) when FileSize >= 0 ->
+ scan(FileHdl, FileSize, <<>>, 0, Acc, 0, Fun).
scan(_FileHdl, FileSize, _Data, FileSize, Acc, ScanOffset, _Fun) ->
{ok, Acc, ScanOffset};
@@ -122,7 +122,7 @@ scanner(<<Size:?INTEGER_SIZE_BITS, GuidAndMsg:Size/binary,
<<GuidNum:?GUID_SIZE_BITS, Msg/binary>> =
<<GuidAndMsg:Size/binary>>,
<<Guid:?GUID_SIZE_BYTES/binary>> = <<GuidNum:?GUID_SIZE_BITS>>,
- scanner(Rest, [Fun(Guid, TotalSize, Offset, Msg) | Acc],
+ scanner(Rest, Fun({Guid, TotalSize, Offset, Msg}, Acc),
Offset + TotalSize, Fun);
_ ->
scanner(Rest, Acc, Offset + TotalSize, Fun)
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index bd8d61e883..b827eba944 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -166,8 +166,7 @@
-spec(delete_file/2 :: (non_neg_integer(), gc_state()) -> deletion_thunk()).
-spec(force_recovery/2 :: (file:filename(), server()) -> 'ok').
-spec(transform_dir/3 :: (file:filename(), server(),
- fun ((binary())->({'ok', msg()} | {error, any()}))) ->
- non_neg_integer()).
+ fun ((binary()) -> ({'ok', msg()} | {error, any()}))) -> 'ok').
-endif.
@@ -1976,21 +1975,19 @@ transform_dir(BaseDir, Server, TransformFun) ->
Dir = filename:join(BaseDir, atom_to_list(Server)),
TmpDir = filename:join(Dir, ?TRANSFORM_TMP),
case filelib:is_dir(TmpDir) of
- true -> throw({error, previously_failed_transform});
+ true -> throw({error, transform_failed_previously});
false ->
- Count = lists:sum(
- [transform_msg_file(filename:join(Dir, File),
- filename:join(TmpDir, File),
- TransformFun) ||
- File <- list_sorted_file_names(Dir, ?FILE_EXTENSION)]),
+ [transform_msg_file(filename:join(Dir, File),
+ filename:join(TmpDir, File),
+ TransformFun) ||
+ File <- list_sorted_file_names(Dir, ?FILE_EXTENSION)],
[file:delete(filename:join(Dir, File)) ||
File <- list_sorted_file_names(Dir, ?FILE_EXTENSION)],
[file:copy(filename:join(TmpDir, File), filename:join(Dir, File)) ||
File <- list_sorted_file_names(TmpDir, ?FILE_EXTENSION)],
[file:delete(filename:join(TmpDir, File)) ||
File <- list_sorted_file_names(TmpDir, ?FILE_EXTENSION)],
- ok = file:del_dir(TmpDir),
- Count
+ ok = file:del_dir(TmpDir)
end.
transform_msg_file(FileOld, FileNew, TransformFun) ->
@@ -2000,21 +1997,21 @@ transform_msg_file(FileOld, FileNew, TransformFun) ->
{ok, RefNew} = file_handle_cache:open(FileNew, [raw, binary, write],
[{write_buffer,
?HANDLE_CACHE_BUFFER_SIZE}]),
- {ok, Acc, Size} =
+ {ok, Acc, _IgnoreSize} =
rabbit_msg_file:scan(
RefOld, Size,
- fun(Guid, _Size, _Offset, BinMsg) ->
+ fun({Guid, _Size, _Offset, BinMsg}, ok) ->
case TransformFun(BinMsg) of
{ok, MsgNew} ->
- rabbit_msg_file:append(RefNew, Guid, MsgNew),
- 1;
+ {ok, _} = rabbit_msg_file:append(RefNew, Guid, MsgNew),
+ ok;
{error, Reason} ->
error_logger:error_msg("Message transform failed: ~p~n",
[Reason]),
- 0
+ ok
end
- end),
+ end, ok),
file_handle_cache:close(RefOld),
file_handle_cache:close(RefNew),
- lists:sum(Acc).
+ ok = Acc.
diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl
index f4e27cc87b..73f59557be 100644
--- a/src/rabbit_upgrade_functions.erl
+++ b/src/rabbit_upgrade_functions.erl
@@ -107,7 +107,7 @@ mnesia(TableName, Fun, FieldList, NewRecordName) ->
%%--------------------------------------------------------------------
multiple_routing_keys() ->
- _UpgradeMsgCount = rabbit_variable_queue:transform_storage(
+ rabbit_variable_queue:transform_storage(
fun (BinMsg) ->
case binary_to_term(BinMsg) of
{basic_message, ExchangeName, Routing_Key, Content, Guid,
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index f2176c0e14..dee6a8e51f 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -1808,7 +1808,7 @@ push_betas_to_deltas(Generator, Limit, Q, Count, RamIndexCount, IndexState) ->
%% Assumes message store is not running
transform_storage(TransformFun) ->
- transform_store(?PERSISTENT_MSG_STORE, TransformFun) +
+ transform_store(?PERSISTENT_MSG_STORE, TransformFun),
transform_store(?TRANSIENT_MSG_STORE, TransformFun).
transform_store(Store, TransformFun) ->