summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-04-22 13:01:53 +0100
committerMatthew Sackman <matthew@lshift.net>2009-04-22 13:01:53 +0100
commit8e0df683b31d15039b4751554f9248809d10a752 (patch)
treec8dfe88b1a215d7a9d5c78e1c6d4d8860bdd3f17 /src
parent29d323a6881a508977d7382e15b379695648ed13 (diff)
downloadrabbitmq-server-git-8e0df683b31d15039b4751554f9248809d10a752.tar.gz
a bit more refactoring. Also, drop file size and file handle count in tests to stress those code paths more.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_disk_queue.erl70
-rw-r--r--src/rabbit_tests.erl4
2 files changed, 36 insertions, 38 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index 26f831e343..ecb8c91e76 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -137,7 +137,7 @@ init([FileSizeLimit, ReadFileHandlesLimit]) ->
%% read is only needed so that we can seek
{ok, FileHdl} = file:open(Path, [read, write, raw, binary, delayed_write]),
{ok, Offset} = file:position(FileHdl, {bof, Offset}),
- {ok, State1 # dqstate { current_file_handle = FileHdl }}.
+ {ok, State1 #dqstate { current_file_handle = FileHdl }}.
handle_call({deliver, Q}, _From, State) ->
{ok, Result, State1} = internal_deliver(Q, State),
@@ -156,8 +156,8 @@ handle_call(clean_stop, _From, State) ->
true = ets:delete(Sequences),
lists:foreach(fun file:delete/1, filelib:wildcard(form_filename("*"))),
{stop, normal, ok,
- State1 # dqstate { current_file_handle = undefined,
- read_file_handles = {dict:new(), gb_trees:empty()}}}.
+ State1 #dqstate { current_file_handle = undefined,
+ read_file_handles = {dict:new(), gb_trees:empty()}}}.
%% gen_server now calls terminate, which then calls shutdown
handle_cast({publish, Q, MsgId, MsgBody}, State) ->
@@ -194,8 +194,8 @@ shutdown(State = #dqstate { msg_location = MsgLocation,
dict:fold(fun (_File, Hdl, _Acc) ->
file:close(Hdl)
end, ok, ReadHdls),
- State # dqstate { current_file_handle = undefined,
- read_file_handles = {dict:new(), gb_trees:empty()}}.
+ State #dqstate { current_file_handle = undefined,
+ read_file_handles = {dict:new(), gb_trees:empty()}}.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
@@ -232,26 +232,24 @@ internal_deliver(Q, State =
{ok, Hdl} = file:open(form_filename(File),
[read, raw, binary,
read_ahead]),
- {ReadHdls2, ReadHdlsAge2} =
- case dict:size(ReadHdls) < ReadFileHandlesLimit of
- true ->
- {ReadHdls, ReadHdlsAge};
- _False ->
- {_Then, OldFile, ReadHdlsAge3} =
- gb_trees:take_smallest(ReadHdlsAge),
- {ok, {OldHdl, _Then}} =
- dict:find(OldFile, ReadHdls),
- ok = file:close(OldHdl),
- {dict:erase(OldFile, ReadHdls),
- ReadHdlsAge3}
- end,
- {Hdl, dict:store(File, {Hdl, Now}, ReadHdls2),
- gb_trees:enter(Now, File, ReadHdlsAge2)};
+ case dict:size(ReadHdls) < ReadFileHandlesLimit of
+ true ->
+ {Hdl, ReadHdls, ReadHdlsAge};
+ _False ->
+ {Then, OldFile, ReadHdlsAge3} =
+ gb_trees:take_smallest(ReadHdlsAge),
+ {ok, {OldHdl, Then}} =
+ dict:find(OldFile, ReadHdls),
+ ok = file:close(OldHdl),
+ {Hdl, dict:erase(OldFile, ReadHdls),
+ ReadHdlsAge3}
+ end;
{ok, {Hdl, Then}} ->
- {Hdl, dict:store(File, {Hdl, Now}, ReadHdls),
- gb_trees:enter(Now, File,
- gb_trees:delete(Then, ReadHdlsAge))}
+ {Hdl, ReadHdls,
+ gb_trees:delete(Then, ReadHdlsAge)}
end,
+ ReadHdls2 = dict:store(File, {FileHdl, Now}, ReadHdls1),
+ ReadHdlsAge2 = gb_trees:enter(Now, File, ReadHdlsAge1),
%% read the message
{ok, {MsgBody, BodySize}} =
read_message_at_offset(FileHdl, Offset, TotalSize),
@@ -261,7 +259,7 @@ internal_deliver(Q, State =
end,
true = ets:insert(Sequences, {Q, ReadSeqId + 1, WriteSeqId}),
{ok, {MsgId, MsgBody, BodySize, Delivered, {MsgId, ReadSeqId}},
- State # dqstate { read_file_handles = {ReadHdls1, ReadHdlsAge1} }}
+ State #dqstate { read_file_handles = {ReadHdls2, ReadHdlsAge2} }}
end
end.
@@ -272,10 +270,10 @@ internal_ack(Q, MsgIds, State) ->
%% called from tx_cancel with MnesiaDelete = false
%% called from ack with MnesiaDelete = true
remove_messages(Q, MsgSeqIds, MnesiaDelete,
- State = # dqstate { msg_location = MsgLocation,
- file_summary = FileSummary,
- current_file_name = CurName
- }) ->
+ State = #dqstate { msg_location = MsgLocation,
+ file_summary = FileSummary,
+ current_file_name = CurName
+ }) ->
Files =
lists:foldl(
fun ({MsgId, SeqId}, Files2) ->
@@ -334,7 +332,7 @@ internal_tx_publish(MsgId, MsgBody,
ContiguousTop1, Left, undefined}),
NextOffset = CurOffset + TotalSize + ?FILE_PACKING_ADJUSTMENT,
maybe_roll_to_new_file(NextOffset,
- State # dqstate {current_offset = NextOffset});
+ State #dqstate {current_offset = NextOffset});
[{MsgId, RefCount, File, Offset, TotalSize}] ->
%% We already know about it, just update counter
ok = dets:insert(MsgLocation, {MsgId, RefCount + 1, File,
@@ -417,11 +415,11 @@ maybe_roll_to_new_file(Offset,
[write, raw, binary, delayed_write]),
true = ets:update_element(FileSummary, CurName, {5, NextName}), %% 5 is Right
true = ets:insert_new(FileSummary, {NextName, 0, 0, CurName, undefined}),
- {ok, State # dqstate { current_file_name = NextName,
- current_file_handle = NextHdl,
- current_file_num = NextNum,
- current_offset = 0
- }};
+ {ok, State #dqstate { current_file_name = NextName,
+ current_file_handle = NextHdl,
+ current_file_num = NextNum,
+ current_offset = 0
+ }};
maybe_roll_to_new_file(_, State) ->
{ok, State}.
@@ -745,8 +743,8 @@ load_messages(Left, [], State = #dqstate { msg_location = MsgLocation }) ->
sortMsgLocationsByOffset(false, L),
MaxOffset + TotalSize + ?FILE_PACKING_ADJUSTMENT
end,
- State # dqstate { current_file_num = Num, current_file_name = Left,
- current_offset = Offset };
+ State #dqstate { current_file_num = Num, current_file_name = Left,
+ current_offset = Offset };
load_messages(Left, [File|Files],
State = #dqstate { msg_location = MsgLocation,
file_summary = FileSummary
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index cce9da1a68..08b05da282 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -771,12 +771,12 @@ rdq_time_commands(Funcs) ->
rdq_virgin() ->
{Micros, {ok, _}} =
- timer:tc(rabbit_disk_queue, start_link, [1024*1024*10, 1000]),
+ timer:tc(rabbit_disk_queue, start_link, [1024*1024, 5]),
ok = rabbit_disk_queue:clean_stop(),
Micros.
rdq_start() ->
- {ok, _} = rabbit_disk_queue:start_link(1024*1024*10, 1000).
+ {ok, _} = rabbit_disk_queue:start_link(1024*1024, 5).
rdq_stop() ->
rabbit_disk_queue:stop().