diff options
| author | Emile Joubert <emile@rabbitmq.com> | 2011-02-11 17:28:13 +0000 |
|---|---|---|
| committer | Emile Joubert <emile@rabbitmq.com> | 2011-02-11 17:28:13 +0000 |
| commit | 803bbd4354d3d20624abd589aab199d9a7823b7b (patch) | |
| tree | f573a1322e948ee3f50f8f6bb53d9180b1a8d1c0 | |
| parent | 21f09a4abc9b89aa6649397c9be458eee5e8a38d (diff) | |
| download | rabbitmq-server-git-803bbd4354d3d20624abd589aab199d9a7823b7b.tar.gz | |
rabbit_msg_file:scan/4 now looks a bit more like fold
Also ignore garbage at the end of a message store
| -rw-r--r-- | src/rabbit_msg_file.erl | 20 | ||||
| -rw-r--r-- | src/rabbit_msg_store.erl | 31 | ||||
| -rw-r--r-- | src/rabbit_upgrade_functions.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 2 |
4 files changed, 26 insertions, 29 deletions
diff --git a/src/rabbit_msg_file.erl b/src/rabbit_msg_file.erl index ad87ee161e..9d5953d5c9 100644 --- a/src/rabbit_msg_file.erl +++ b/src/rabbit_msg_file.erl @@ -16,7 +16,7 @@ -module(rabbit_msg_file). --export([append/3, read/2, scan/2, scan/3]). +-export([append/3, read/2, scan/2, scan/4]). %%---------------------------------------------------------------------------- @@ -48,9 +48,9 @@ -spec(scan/2 :: (io_device(), file_size()) -> {'ok', [{rabbit_guid:guid(), msg_size(), position()}], position()}). --spec(scan/3 :: (io_device(), file_size(), - fun ((rabbit_guid:guid(), msg_size(), position(), binary()) -> any())) -> - {'ok', [any()], position()}). +-spec(scan/4 :: (io_device(), file_size(), + fun (({rabbit_guid:guid(), msg_size(), position(), binary()}, A) -> A), + A) -> {'ok', A, position()}). -endif. @@ -82,14 +82,14 @@ read(FileHdl, TotalSize) -> KO -> KO end. -scan_fun(Guid, TotalSize, Offset, _Msg) -> - {Guid, TotalSize, Offset}. +scan_fun({Guid, TotalSize, Offset, _Msg}, Acc) -> + [{Guid, TotalSize, Offset} | Acc]. scan(FileHdl, FileSize) when FileSize >= 0 -> - scan(FileHdl, FileSize, <<>>, 0, [], 0, fun scan_fun/4). + scan(FileHdl, FileSize, <<>>, 0, [], 0, fun scan_fun/2). -scan(FileHdl, FileSize, Fun) when FileSize >= 0 -> - scan(FileHdl, FileSize, <<>>, 0, [], 0, Fun). +scan(FileHdl, FileSize, Fun, Acc) when FileSize >= 0 -> + scan(FileHdl, FileSize, <<>>, 0, Acc, 0, Fun). scan(_FileHdl, FileSize, _Data, FileSize, Acc, ScanOffset, _Fun) -> {ok, Acc, ScanOffset}; @@ -122,7 +122,7 @@ scanner(<<Size:?INTEGER_SIZE_BITS, GuidAndMsg:Size/binary, <<GuidNum:?GUID_SIZE_BITS, Msg/binary>> = <<GuidAndMsg:Size/binary>>, <<Guid:?GUID_SIZE_BYTES/binary>> = <<GuidNum:?GUID_SIZE_BITS>>, - scanner(Rest, [Fun(Guid, TotalSize, Offset, Msg) | Acc], + scanner(Rest, Fun({Guid, TotalSize, Offset, Msg}, Acc), Offset + TotalSize, Fun); _ -> scanner(Rest, Acc, Offset + TotalSize, Fun) diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index bd8d61e883..b827eba944 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -166,8 +166,7 @@ -spec(delete_file/2 :: (non_neg_integer(), gc_state()) -> deletion_thunk()). -spec(force_recovery/2 :: (file:filename(), server()) -> 'ok'). -spec(transform_dir/3 :: (file:filename(), server(), - fun ((binary())->({'ok', msg()} | {error, any()}))) -> - non_neg_integer()). + fun ((binary()) -> ({'ok', msg()} | {error, any()}))) -> 'ok'). -endif. @@ -1976,21 +1975,19 @@ transform_dir(BaseDir, Server, TransformFun) -> Dir = filename:join(BaseDir, atom_to_list(Server)), TmpDir = filename:join(Dir, ?TRANSFORM_TMP), case filelib:is_dir(TmpDir) of - true -> throw({error, previously_failed_transform}); + true -> throw({error, transform_failed_previously}); false -> - Count = lists:sum( - [transform_msg_file(filename:join(Dir, File), - filename:join(TmpDir, File), - TransformFun) || - File <- list_sorted_file_names(Dir, ?FILE_EXTENSION)]), + [transform_msg_file(filename:join(Dir, File), + filename:join(TmpDir, File), + TransformFun) || + File <- list_sorted_file_names(Dir, ?FILE_EXTENSION)], [file:delete(filename:join(Dir, File)) || File <- list_sorted_file_names(Dir, ?FILE_EXTENSION)], [file:copy(filename:join(TmpDir, File), filename:join(Dir, File)) || File <- list_sorted_file_names(TmpDir, ?FILE_EXTENSION)], [file:delete(filename:join(TmpDir, File)) || File <- list_sorted_file_names(TmpDir, ?FILE_EXTENSION)], - ok = file:del_dir(TmpDir), - Count + ok = file:del_dir(TmpDir) end. transform_msg_file(FileOld, FileNew, TransformFun) -> @@ -2000,21 +1997,21 @@ transform_msg_file(FileOld, FileNew, TransformFun) -> {ok, RefNew} = file_handle_cache:open(FileNew, [raw, binary, write], [{write_buffer, ?HANDLE_CACHE_BUFFER_SIZE}]), - {ok, Acc, Size} = + {ok, Acc, _IgnoreSize} = rabbit_msg_file:scan( RefOld, Size, - fun(Guid, _Size, _Offset, BinMsg) -> + fun({Guid, _Size, _Offset, BinMsg}, ok) -> case TransformFun(BinMsg) of {ok, MsgNew} -> - rabbit_msg_file:append(RefNew, Guid, MsgNew), - 1; + {ok, _} = rabbit_msg_file:append(RefNew, Guid, MsgNew), + ok; {error, Reason} -> error_logger:error_msg("Message transform failed: ~p~n", [Reason]), - 0 + ok end - end), + end, ok), file_handle_cache:close(RefOld), file_handle_cache:close(RefNew), - lists:sum(Acc). + ok = Acc. diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl index f4e27cc87b..73f59557be 100644 --- a/src/rabbit_upgrade_functions.erl +++ b/src/rabbit_upgrade_functions.erl @@ -107,7 +107,7 @@ mnesia(TableName, Fun, FieldList, NewRecordName) -> %%-------------------------------------------------------------------- multiple_routing_keys() -> - _UpgradeMsgCount = rabbit_variable_queue:transform_storage( + rabbit_variable_queue:transform_storage( fun (BinMsg) -> case binary_to_term(BinMsg) of {basic_message, ExchangeName, Routing_Key, Content, Guid, diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index f2176c0e14..dee6a8e51f 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -1808,7 +1808,7 @@ push_betas_to_deltas(Generator, Limit, Q, Count, RamIndexCount, IndexState) -> %% Assumes message store is not running transform_storage(TransformFun) -> - transform_store(?PERSISTENT_MSG_STORE, TransformFun) + + transform_store(?PERSISTENT_MSG_STORE, TransformFun), transform_store(?TRANSIENT_MSG_STORE, TransformFun). transform_store(Store, TransformFun) -> |
