summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-04-12 21:13:02 +0100
committerMatthew Sackman <matthew@lshift.net>2009-04-12 21:13:02 +0100
commitaac627de27d87a6c8e139243e70e53371d7bdd89 (patch)
treedf5d2c26803b3b6a11cd9862be09c2ed7270b552 /src
parentf191d8413b4d0bdcf2f466da1dc5e42304bd241d (diff)
downloadrabbitmq-server-git-aac627de27d87a6c8e139243e70e53371d7bdd89.tar.gz
and now we have some substantial tests.
This has already led to a good optimisation on reading, and found and fixed a bug in messages going to multiple queues.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_disk_queue.erl41
-rw-r--r--src/rabbit_tests.erl25
2 files changed, 36 insertions, 30 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index b67896ceb2..8de9d22c4a 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -144,8 +144,8 @@ handle_call(clean_stop, _From, State) ->
true = ets:delete(FileSummary),
true = ets:delete(FileDetail),
lists:foreach(fun file:delete/1, filelib:wildcard(form_filename("*"))),
- {stop, normal, ok, State # dqstate { current_file_handle = undefined,
- read_file_handles = {dict:new(), gb_trees:empty()}}}.
+ {stop, normal, ok, State1 # dqstate { current_file_handle = undefined,
+ read_file_handles = {dict:new(), gb_trees:empty()}}}.
%% gen_server now calls terminate, which then calls shutdown
handle_cast({publish, Q, MsgId, MsgBody}, State) ->
@@ -197,7 +197,7 @@ internal_deliver(Q, MsgId, State = #dqstate { msg_location = MsgLocation,
read_file_handles_limit = ReadFileHandlesLimit,
read_file_handles = {ReadHdls, ReadHdlsAge}
}) ->
- [{MsgId, _RefCount, File, Offset, _TotalSize}] = ets:lookup(MsgLocation, MsgId),
+ [{MsgId, _RefCount, File, Offset, TotalSize}] = ets:lookup(MsgLocation, MsgId),
% so this next bit implements an LRU for file handles. But it's a bit insane, and smells
% of premature optimisation. So I might remove it and dump it overboard
{FileHdl, ReadHdls1, ReadHdlsAge1}
@@ -221,7 +221,7 @@ internal_deliver(Q, MsgId, State = #dqstate { msg_location = MsgLocation,
gb_trees:enter(Now, File, gb_trees:delete(Then, ReadHdlsAge))}
end,
% read the message
- {ok, {MsgBody, BodySize, _TotalSize}} = read_message_at_offset(FileHdl, Offset),
+ {ok, {MsgBody, BodySize}} = read_message_at_offset(FileHdl, Offset, TotalSize),
[Obj = #dq_msg_loc {is_delivered = Delivered}]
= mnesia:dirty_read(rabbit_disk_queue, {MsgId, Q}),
if Delivered -> ok;
@@ -270,7 +270,7 @@ remove_messages(Q, MsgIds, MnesiaDelete, State = # dqstate { msg_location = MsgL
internal_tx_publish(MsgId, MsgBody, State = #dqstate { msg_location = MsgLocation,
current_file_handle = CurHdl,
current_file_name = CurName,
- current_offset = Offset,
+ current_offset = CurOffset,
file_summary = FileSummary,
file_detail = FileDetail
}) ->
@@ -278,21 +278,21 @@ internal_tx_publish(MsgId, MsgBody, State = #dqstate { msg_location = MsgLocatio
[] ->
% New message, lots to do
{ok, TotalSize} = append_message(CurHdl, MsgId, MsgBody),
- true = ets:insert_new(MsgLocation, {MsgId, 1, CurName, Offset, TotalSize}),
+ true = ets:insert_new(MsgLocation, {MsgId, 1, CurName, CurOffset, TotalSize}),
[{CurName, FileSum = #dqfile { valid_data = ValidTotalSize,
contiguous_prefix = ContiguousTop,
right = undefined }}]
= ets:lookup(FileSummary, CurName),
- true = ets:insert_new(FileDetail, {{CurName, Offset}, TotalSize}),
+ true = ets:insert_new(FileDetail, {{CurName, CurOffset}, TotalSize}),
ValidTotalSize1 = ValidTotalSize + TotalSize + (?FILE_PACKING_ADJUSTMENT),
- ContiguousTop1 = if Offset =:= ContiguousTop ->
+ ContiguousTop1 = if CurOffset =:= ContiguousTop ->
ValidTotalSize; % can't be any holes in this file
true -> ContiguousTop
end,
true = ets:insert(FileSummary, {CurName, FileSum #dqfile { valid_data = ValidTotalSize1,
contiguous_prefix = ContiguousTop1 }}),
- maybe_roll_to_new_file(Offset + TotalSize + (?FILE_PACKING_ADJUSTMENT),
- State # dqstate {current_offset = Offset + TotalSize + (?FILE_PACKING_ADJUSTMENT)});
+ maybe_roll_to_new_file(CurOffset + TotalSize + (?FILE_PACKING_ADJUSTMENT),
+ State # dqstate {current_offset = CurOffset + TotalSize + (?FILE_PACKING_ADJUSTMENT)});
[{MsgId, RefCount, File, Offset, TotalSize}] ->
% We already know about it, just update counter
true = ets:insert(MsgLocation, {MsgId, RefCount + 1, File, Offset, TotalSize}),
@@ -559,22 +559,15 @@ append_message(FileHdl, MsgId, MsgBody) when is_binary(MsgBody) ->
KO -> KO
end.
-read_message_at_offset(FileHdl, Offset) ->
+read_message_at_offset(FileHdl, Offset, TotalSize) ->
+ TotalSizeWriteOkBytes = TotalSize + 1,
case file:position(FileHdl, {bof, Offset}) of
{ok, Offset} ->
- case file:read(FileHdl, 2 * (?INTEGER_SIZE_BYTES)) of
- {ok, <<TotalSize:(?INTEGER_SIZE_BITS), MsgIdBinSize:(?INTEGER_SIZE_BITS)>>} ->
- ExpectedAbsPos = Offset + (2 * (?INTEGER_SIZE_BYTES)) + MsgIdBinSize,
- case file:position(FileHdl, {cur, MsgIdBinSize}) of
- {ok, ExpectedAbsPos} ->
- BodySize = TotalSize - MsgIdBinSize,
- case file:read(FileHdl, 1 + BodySize) of
- {ok, <<MsgBody:BodySize/binary, (?WRITE_OK):(?WRITE_OK_SIZE_BITS)>>} ->
- {ok, {MsgBody, BodySize, TotalSize}};
- KO -> KO
- end;
- KO -> KO
- end;
+ case file:read(FileHdl, TotalSize + (?FILE_PACKING_ADJUSTMENT)) of
+ {ok, <<TotalSize:(?INTEGER_SIZE_BITS), MsgIdBinSize:(?INTEGER_SIZE_BITS), Rest:TotalSizeWriteOkBytes/binary>>} ->
+ BodySize = TotalSize - MsgIdBinSize,
+ <<_MsgId:MsgIdBinSize/binary, MsgBody:BodySize/binary, (?WRITE_OK):(?WRITE_OK_SIZE_BITS)>> = Rest,
+ {ok, {MsgBody, BodySize}};
KO -> KO
end;
KO -> KO
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 0114eb2569..a9f546dc99 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -55,6 +55,7 @@ all_tests() ->
passed = test_cluster_management(),
passed = test_user_management(),
passed = test_server_status(),
+ passed = test_disk_queue(),
passed.
test_parsing() ->
@@ -625,21 +626,33 @@ delete_log_handlers(Handlers) ->
ok.
test_disk_queue() ->
- [begin rdq_time_tx_publish_commit(q, MsgCount, MsgSize), timer:sleep(1000) end || % 1000 milliseconds
+ [begin rdq_time_tx_publish_commit_deliver_ack(Qs, MsgCount, MsgSize), timer:sleep(1000) end || % 1000 milliseconds
MsgSize <- [128, 512, 2048, 8192, 32768, 131072],
+ Qs <- [[1], lists:seq(1,10), lists:seq(1,100), lists:seq(1,1000)],
MsgCount <- [1024, 2048, 4096, 8192, 16384]
],
- rdq_virgin().
+ rdq_virgin(),
+ passed.
-rdq_time_tx_publish_commit(Q, MsgCount, MsgSizeBytes) ->
+rdq_time_tx_publish_commit_deliver_ack(Qs, MsgCount, MsgSizeBytes) ->
rdq_virgin(),
rdq_start(),
+ QCount = length(Qs),
Msg = <<0:(8*MsgSizeBytes)>>,
List = lists:seq(1, MsgCount),
{Micros, ok} = timer:tc(?MODULE, rdq_time_commands,
- [[fun() -> [rabbit_disk_queue:tx_publish(N, Msg) || N <- List] end,
- fun() -> rabbit_disk_queue:tx_commit(Q, List) end]]),
- io:format("Published ~p ~p-byte messages in ~p microseconds (~p microseconds/msg) (~p microseconds/byte)~n", [MsgCount, MsgSizeBytes, Micros, (Micros / MsgCount), (Micros / MsgCount / MsgSizeBytes)]),
+ [[fun() -> [rabbit_disk_queue:tx_publish(N, Msg) || N <- List, _ <- Qs] end,
+ fun() -> [rabbit_disk_queue:tx_commit(Q, List) || Q <- Qs] end
+ ]]),
+ io:format("Published ~p ~p-byte messages in ~p microseconds to ~p queues (~p microseconds/msg) (~p microseconds/byte)~n",
+ [MsgCount, MsgSizeBytes, Micros, QCount, (Micros / (MsgCount * QCount)), (Micros / (MsgCount * QCount * MsgSizeBytes))]),
+ {Micros2, ok} = timer:tc(?MODULE, rdq_time_commands,
+ [[fun() -> [begin [begin rabbit_disk_queue:deliver(Q, N), ok end || N <- List],
+ rabbit_disk_queue:ack(Q, List),
+ rabbit_disk_queue:tx_commit(Q, [])
+ end || Q <- Qs]
+ end]]),
+ io:format("Delivered ~p ~p-byte messages in ~p microseconds from ~p queues (~p microseconds/msg) (~p microseconds/byte)~n", [MsgCount, MsgSizeBytes, Micros2, QCount, (Micros2 / (MsgCount * QCount)), (Micros2 / (MsgCount * QCount * MsgSizeBytes))]),
rdq_stop().
rdq_time_commands(Funcs) ->