diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-04-10 16:07:21 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-04-10 16:07:21 +0100 |
| commit | a1c140505e6bd32d57fabc37597981bc34f11e53 (patch) | |
| tree | 639f5875e132e6efc0b385324c08d380fa245cf0 | |
| parent | 41376d3da57588fc5c3536e80edc048bbfaf6434 (diff) | |
| download | rabbitmq-server-git-a1c140505e6bd32d57fabc37597981bc34f11e53.tar.gz | |
initial fixes from testing
| -rw-r--r-- | src/rabbit_disk_queue.erl | 16 |
1 files changed, 8 insertions, 8 deletions
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl index 5dc2095334..dbd81215f6 100644 --- a/src/rabbit_disk_queue.erl +++ b/src/rabbit_disk_queue.erl @@ -92,8 +92,6 @@ tx_cancel(MsgIds) when is_list(MsgIds) -> init([FileSizeLimit, ReadFileHandlesLimit]) -> process_flag(trap_exit, true), - Dir = base_directory(), - ok = filelib:ensure_dir(Dir), State = #dqstate { msg_location = ets:new((?MSG_LOC_ETS_NAME), [set, private]), file_summary = dict:new(), file_detail = ets:new((?FILE_DETAIL_ETS_NAME), [bag, private]), @@ -105,7 +103,9 @@ init([FileSizeLimit, ReadFileHandlesLimit]) -> read_file_handles_limit = ReadFileHandlesLimit }, {ok, State1 = #dqstate { current_file_name = CurrentName } } = load_from_disk(State), - {ok, FileHdl} = file:open(form_filename(CurrentName), [append, raw, binary]), + Path = form_filename(CurrentName), + ok = filelib:ensure_dir(Path), + {ok, FileHdl} = file:open(Path, [append, raw, binary]), {ok, State1 # dqstate { current_file_handle = FileHdl }}. handle_call({deliver, Q, MsgId}, _From, State) -> @@ -151,7 +151,7 @@ form_filename(Name) -> filename:join(base_directory(), Name). base_directory() -> - filename:join(mnesia:system_info(directory), "/rabbit_disk_queue/"). + filename:join(mnesia:system_info(directory), "rabbit_disk_queue/"). file_packing_adjustment_bytes() -> 1 + (2* (?INTEGER_SIZE_BYTES)). @@ -332,7 +332,7 @@ load_from_disk(State) -> (State1 = #dqstate{ msg_location = MsgLocation }) = load_messages(undefined, Files, State), % Finally, check there is nothing in mnesia which we haven't loaded true = lists:all(fun ({_Q, MsgId}) -> 1 =:= length(ets:lookup(MsgLocation, MsgId)) end, - mnesia:all_keys(rabbit_disk_queue)), + mnesia:async_dirty(fun() -> mnesia:all_keys(rabbit_disk_queue) end)), {ok, State1}. load_messages(undefined, [], State) -> @@ -349,7 +349,7 @@ load_messages(Left, [File|Files], {ok, Messages} = scan_file_for_valid_messages(form_filename(File)), {ValidMessagesRev, ValidTotalSize} = lists:foldl( fun ({MsgId, TotalSize, Offset}, {VMAcc, VTSAcc}) -> - case length(mnesia:match_object(rabbit_disk_queue, {dq_msg_loc, {'_', MsgId}, '_'}, read)) of + case length(mnesia:dirty_match_object(rabbit_disk_queue, {dq_msg_loc, {'_', MsgId}, '_'})) of 0 -> {VMAcc, VTSAcc}; RefCount -> true = ets:insert_new(MsgLocation, {MsgId, RefCount, File, Offset, TotalSize}), @@ -379,7 +379,7 @@ recover_crashed_compactions(Files, [TmpFile|TmpFiles]) -> {ok, UncorruptedMessagesTmp} = scan_file_for_valid_messages(form_filename(TmpFile)), % all of these messages should appear in the mnesia table, otherwise they wouldn't have been copied out lists:foreach(fun ({MsgId, _TotalSize, _FileOffset}) -> - 0 < length(mnesia:match_object(rabbit_disk_queue, {dq_msg_loc, {'_', MsgId}, '_'}, read)) + 0 < length(mnesia:dirty_match_object(rabbit_disk_queue, {dq_msg_loc, {'_', MsgId}, '_'})) end, UncorruptedMessagesTmp), {ok, UncorruptedMessages} = scan_file_for_valid_messages(form_filename(NonTmpRelatedFile)), %% 1) It's possible that everything in the tmp file is also in the main file @@ -412,7 +412,7 @@ recover_crashed_compactions(Files, [TmpFile|TmpFiles]) -> % we're in case 4 above. % check that everything in the main file is a valid message in mnesia lists:foreach(fun (MsgId) -> - 0 < length(mnesia:match_object(rabbit_disk_queue, {dq_msg_loc, {'_', MsgId}, '_'}, read)) + 0 < length(mnesia:dirty_match_object(rabbit_disk_queue, {dq_msg_loc, {'_', MsgId}, '_'})) end, MsgIds), % The main file should be contiguous {Top, MsgIds} = find_contiguous_block_prefix(UncorruptedMessages), |
