summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-04-10 16:07:21 +0100
committerMatthew Sackman <matthew@lshift.net>2009-04-10 16:07:21 +0100
commita1c140505e6bd32d57fabc37597981bc34f11e53 (patch)
tree639f5875e132e6efc0b385324c08d380fa245cf0
parent41376d3da57588fc5c3536e80edc048bbfaf6434 (diff)
downloadrabbitmq-server-git-a1c140505e6bd32d57fabc37597981bc34f11e53.tar.gz
initial fixes from testing
-rw-r--r--src/rabbit_disk_queue.erl16
1 file changed, 8 insertions, 8 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index 5dc2095334..dbd81215f6 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -92,8 +92,6 @@ tx_cancel(MsgIds) when is_list(MsgIds) ->
init([FileSizeLimit, ReadFileHandlesLimit]) ->
process_flag(trap_exit, true),
- Dir = base_directory(),
- ok = filelib:ensure_dir(Dir),
State = #dqstate { msg_location = ets:new((?MSG_LOC_ETS_NAME), [set, private]),
file_summary = dict:new(),
file_detail = ets:new((?FILE_DETAIL_ETS_NAME), [bag, private]),
@@ -105,7 +103,9 @@ init([FileSizeLimit, ReadFileHandlesLimit]) ->
read_file_handles_limit = ReadFileHandlesLimit
},
{ok, State1 = #dqstate { current_file_name = CurrentName } } = load_from_disk(State),
- {ok, FileHdl} = file:open(form_filename(CurrentName), [append, raw, binary]),
+ Path = form_filename(CurrentName),
+ ok = filelib:ensure_dir(Path),
+ {ok, FileHdl} = file:open(Path, [append, raw, binary]),
{ok, State1 # dqstate { current_file_handle = FileHdl }}.
handle_call({deliver, Q, MsgId}, _From, State) ->
@@ -151,7 +151,7 @@ form_filename(Name) ->
filename:join(base_directory(), Name).
base_directory() ->
- filename:join(mnesia:system_info(directory), "/rabbit_disk_queue/").
+ filename:join(mnesia:system_info(directory), "rabbit_disk_queue/").
file_packing_adjustment_bytes() ->
1 + (2* (?INTEGER_SIZE_BYTES)).
@@ -332,7 +332,7 @@ load_from_disk(State) ->
(State1 = #dqstate{ msg_location = MsgLocation }) = load_messages(undefined, Files, State),
% Finally, check there is nothing in mnesia which we haven't loaded
true = lists:all(fun ({_Q, MsgId}) -> 1 =:= length(ets:lookup(MsgLocation, MsgId)) end,
- mnesia:all_keys(rabbit_disk_queue)),
+ mnesia:async_dirty(fun() -> mnesia:all_keys(rabbit_disk_queue) end)),
{ok, State1}.
load_messages(undefined, [], State) ->
@@ -349,7 +349,7 @@ load_messages(Left, [File|Files],
{ok, Messages} = scan_file_for_valid_messages(form_filename(File)),
{ValidMessagesRev, ValidTotalSize} = lists:foldl(
fun ({MsgId, TotalSize, Offset}, {VMAcc, VTSAcc}) ->
- case length(mnesia:match_object(rabbit_disk_queue, {dq_msg_loc, {'_', MsgId}, '_'}, read)) of
+ case length(mnesia:dirty_match_object(rabbit_disk_queue, {dq_msg_loc, {'_', MsgId}, '_'})) of
0 -> {VMAcc, VTSAcc};
RefCount ->
true = ets:insert_new(MsgLocation, {MsgId, RefCount, File, Offset, TotalSize}),
@@ -379,7 +379,7 @@ recover_crashed_compactions(Files, [TmpFile|TmpFiles]) ->
{ok, UncorruptedMessagesTmp} = scan_file_for_valid_messages(form_filename(TmpFile)),
% all of these messages should appear in the mnesia table, otherwise they wouldn't have been copied out
lists:foreach(fun ({MsgId, _TotalSize, _FileOffset}) ->
- 0 < length(mnesia:match_object(rabbit_disk_queue, {dq_msg_loc, {'_', MsgId}, '_'}, read))
+ 0 < length(mnesia:dirty_match_object(rabbit_disk_queue, {dq_msg_loc, {'_', MsgId}, '_'}))
end, UncorruptedMessagesTmp),
{ok, UncorruptedMessages} = scan_file_for_valid_messages(form_filename(NonTmpRelatedFile)),
%% 1) It's possible that everything in the tmp file is also in the main file
@@ -412,7 +412,7 @@ recover_crashed_compactions(Files, [TmpFile|TmpFiles]) ->
% we're in case 4 above.
% check that everything in the main file is a valid message in mnesia
lists:foreach(fun (MsgId) ->
- 0 < length(mnesia:match_object(rabbit_disk_queue, {dq_msg_loc, {'_', MsgId}, '_'}, read))
+ 0 < length(mnesia:dirty_match_object(rabbit_disk_queue, {dq_msg_loc, {'_', MsgId}, '_'}))
end, MsgIds),
% The main file should be contiguous
{Top, MsgIds} = find_contiguous_block_prefix(UncorruptedMessages),