diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-04-12 21:13:02 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-04-12 21:13:02 +0100 |
| commit | aac627de27d87a6c8e139243e70e53371d7bdd89 (patch) | |
| tree | df5d2c26803b3b6a11cd9862be09c2ed7270b552 /src | |
| parent | f191d8413b4d0bdcf2f466da1dc5e42304bd241d (diff) | |
| download | rabbitmq-server-git-aac627de27d87a6c8e139243e70e53371d7bdd89.tar.gz | |
And now we have some substantial tests.
This has already led to a good optimisation on reading, and it found and fixed a bug affecting messages published to multiple queues.
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_disk_queue.erl | 41 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 25 |
2 files changed, 36 insertions, 30 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl index b67896ceb2..8de9d22c4a 100644 --- a/src/rabbit_disk_queue.erl +++ b/src/rabbit_disk_queue.erl @@ -144,8 +144,8 @@ handle_call(clean_stop, _From, State) -> true = ets:delete(FileSummary), true = ets:delete(FileDetail), lists:foreach(fun file:delete/1, filelib:wildcard(form_filename("*"))), - {stop, normal, ok, State # dqstate { current_file_handle = undefined, - read_file_handles = {dict:new(), gb_trees:empty()}}}. + {stop, normal, ok, State1 # dqstate { current_file_handle = undefined, + read_file_handles = {dict:new(), gb_trees:empty()}}}. %% gen_server now calls terminate, which then calls shutdown handle_cast({publish, Q, MsgId, MsgBody}, State) -> @@ -197,7 +197,7 @@ internal_deliver(Q, MsgId, State = #dqstate { msg_location = MsgLocation, read_file_handles_limit = ReadFileHandlesLimit, read_file_handles = {ReadHdls, ReadHdlsAge} }) -> - [{MsgId, _RefCount, File, Offset, _TotalSize}] = ets:lookup(MsgLocation, MsgId), + [{MsgId, _RefCount, File, Offset, TotalSize}] = ets:lookup(MsgLocation, MsgId), % so this next bit implements an LRU for file handles. But it's a bit insane, and smells % of premature optimisation. 
So I might remove it and dump it overboard {FileHdl, ReadHdls1, ReadHdlsAge1} @@ -221,7 +221,7 @@ internal_deliver(Q, MsgId, State = #dqstate { msg_location = MsgLocation, gb_trees:enter(Now, File, gb_trees:delete(Then, ReadHdlsAge))} end, % read the message - {ok, {MsgBody, BodySize, _TotalSize}} = read_message_at_offset(FileHdl, Offset), + {ok, {MsgBody, BodySize}} = read_message_at_offset(FileHdl, Offset, TotalSize), [Obj = #dq_msg_loc {is_delivered = Delivered}] = mnesia:dirty_read(rabbit_disk_queue, {MsgId, Q}), if Delivered -> ok; @@ -270,7 +270,7 @@ remove_messages(Q, MsgIds, MnesiaDelete, State = # dqstate { msg_location = MsgL internal_tx_publish(MsgId, MsgBody, State = #dqstate { msg_location = MsgLocation, current_file_handle = CurHdl, current_file_name = CurName, - current_offset = Offset, + current_offset = CurOffset, file_summary = FileSummary, file_detail = FileDetail }) -> @@ -278,21 +278,21 @@ internal_tx_publish(MsgId, MsgBody, State = #dqstate { msg_location = MsgLocatio [] -> % New message, lots to do {ok, TotalSize} = append_message(CurHdl, MsgId, MsgBody), - true = ets:insert_new(MsgLocation, {MsgId, 1, CurName, Offset, TotalSize}), + true = ets:insert_new(MsgLocation, {MsgId, 1, CurName, CurOffset, TotalSize}), [{CurName, FileSum = #dqfile { valid_data = ValidTotalSize, contiguous_prefix = ContiguousTop, right = undefined }}] = ets:lookup(FileSummary, CurName), - true = ets:insert_new(FileDetail, {{CurName, Offset}, TotalSize}), + true = ets:insert_new(FileDetail, {{CurName, CurOffset}, TotalSize}), ValidTotalSize1 = ValidTotalSize + TotalSize + (?FILE_PACKING_ADJUSTMENT), - ContiguousTop1 = if Offset =:= ContiguousTop -> + ContiguousTop1 = if CurOffset =:= ContiguousTop -> ValidTotalSize; % can't be any holes in this file true -> ContiguousTop end, true = ets:insert(FileSummary, {CurName, FileSum #dqfile { valid_data = ValidTotalSize1, contiguous_prefix = ContiguousTop1 }}), - maybe_roll_to_new_file(Offset + TotalSize + 
(?FILE_PACKING_ADJUSTMENT), - State # dqstate {current_offset = Offset + TotalSize + (?FILE_PACKING_ADJUSTMENT)}); + maybe_roll_to_new_file(CurOffset + TotalSize + (?FILE_PACKING_ADJUSTMENT), + State # dqstate {current_offset = CurOffset + TotalSize + (?FILE_PACKING_ADJUSTMENT)}); [{MsgId, RefCount, File, Offset, TotalSize}] -> % We already know about it, just update counter true = ets:insert(MsgLocation, {MsgId, RefCount + 1, File, Offset, TotalSize}), @@ -559,22 +559,15 @@ append_message(FileHdl, MsgId, MsgBody) when is_binary(MsgBody) -> KO -> KO end. -read_message_at_offset(FileHdl, Offset) -> +read_message_at_offset(FileHdl, Offset, TotalSize) -> + TotalSizeWriteOkBytes = TotalSize + 1, case file:position(FileHdl, {bof, Offset}) of {ok, Offset} -> - case file:read(FileHdl, 2 * (?INTEGER_SIZE_BYTES)) of - {ok, <<TotalSize:(?INTEGER_SIZE_BITS), MsgIdBinSize:(?INTEGER_SIZE_BITS)>>} -> - ExpectedAbsPos = Offset + (2 * (?INTEGER_SIZE_BYTES)) + MsgIdBinSize, - case file:position(FileHdl, {cur, MsgIdBinSize}) of - {ok, ExpectedAbsPos} -> - BodySize = TotalSize - MsgIdBinSize, - case file:read(FileHdl, 1 + BodySize) of - {ok, <<MsgBody:BodySize/binary, (?WRITE_OK):(?WRITE_OK_SIZE_BITS)>>} -> - {ok, {MsgBody, BodySize, TotalSize}}; - KO -> KO - end; - KO -> KO - end; + case file:read(FileHdl, TotalSize + (?FILE_PACKING_ADJUSTMENT)) of + {ok, <<TotalSize:(?INTEGER_SIZE_BITS), MsgIdBinSize:(?INTEGER_SIZE_BITS), Rest:TotalSizeWriteOkBytes/binary>>} -> + BodySize = TotalSize - MsgIdBinSize, + <<_MsgId:MsgIdBinSize/binary, MsgBody:BodySize/binary, (?WRITE_OK):(?WRITE_OK_SIZE_BITS)>> = Rest, + {ok, {MsgBody, BodySize}}; KO -> KO end; KO -> KO diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 0114eb2569..a9f546dc99 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -55,6 +55,7 @@ all_tests() -> passed = test_cluster_management(), passed = test_user_management(), passed = test_server_status(), + passed = test_disk_queue(), passed. 
test_parsing() -> @@ -625,21 +626,33 @@ delete_log_handlers(Handlers) -> ok. test_disk_queue() -> - [begin rdq_time_tx_publish_commit(q, MsgCount, MsgSize), timer:sleep(1000) end || % 1000 milliseconds + [begin rdq_time_tx_publish_commit_deliver_ack(Qs, MsgCount, MsgSize), timer:sleep(1000) end || % 1000 milliseconds MsgSize <- [128, 512, 2048, 8192, 32768, 131072], + Qs <- [[1], lists:seq(1,10), lists:seq(1,100), lists:seq(1,1000)], MsgCount <- [1024, 2048, 4096, 8192, 16384] ], - rdq_virgin(). + rdq_virgin(), + passed. -rdq_time_tx_publish_commit(Q, MsgCount, MsgSizeBytes) -> +rdq_time_tx_publish_commit_deliver_ack(Qs, MsgCount, MsgSizeBytes) -> rdq_virgin(), rdq_start(), + QCount = length(Qs), Msg = <<0:(8*MsgSizeBytes)>>, List = lists:seq(1, MsgCount), {Micros, ok} = timer:tc(?MODULE, rdq_time_commands, - [[fun() -> [rabbit_disk_queue:tx_publish(N, Msg) || N <- List] end, - fun() -> rabbit_disk_queue:tx_commit(Q, List) end]]), - io:format("Published ~p ~p-byte messages in ~p microseconds (~p microseconds/msg) (~p microseconds/byte)~n", [MsgCount, MsgSizeBytes, Micros, (Micros / MsgCount), (Micros / MsgCount / MsgSizeBytes)]), + [[fun() -> [rabbit_disk_queue:tx_publish(N, Msg) || N <- List, _ <- Qs] end, + fun() -> [rabbit_disk_queue:tx_commit(Q, List) || Q <- Qs] end + ]]), + io:format("Published ~p ~p-byte messages in ~p microseconds to ~p queues (~p microseconds/msg) (~p microseconds/byte)~n", + [MsgCount, MsgSizeBytes, Micros, QCount, (Micros / (MsgCount * QCount)), (Micros / (MsgCount * QCount * MsgSizeBytes))]), + {Micros2, ok} = timer:tc(?MODULE, rdq_time_commands, + [[fun() -> [begin [begin rabbit_disk_queue:deliver(Q, N), ok end || N <- List], + rabbit_disk_queue:ack(Q, List), + rabbit_disk_queue:tx_commit(Q, []) + end || Q <- Qs] + end]]), + io:format("Delivered ~p ~p-byte messages in ~p microseconds from ~p queues (~p microseconds/msg) (~p microseconds/byte)~n", [MsgCount, MsgSizeBytes, Micros2, QCount, (Micros2 / (MsgCount * QCount)), (Micros2 / 
(MsgCount * QCount * MsgSizeBytes))]), rdq_stop(). rdq_time_commands(Funcs) -> |
