diff options
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 8 | ||||
| -rw-r--r-- | src/rabbit_disk_queue.erl | 28 |
2 files changed, 18 insertions, 18 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 7c19ea7291..9d97e881ac 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -120,10 +120,10 @@ init(Q = #amqqueue { name = QName, durable = Durable }) -> terminate(_Reason, State) -> %% FIXME: How do we cancel active subscriptions? - QName = qname(State), - rabbit_mixed_queue:delete_queue(State #q.mixed_state), - stop_memory_timer(State), - ok = rabbit_amqqueue:internal_delete(QName). + State1 = stop_memory_timer(State), + QName = qname(State1), + ok = rabbit_amqqueue:internal_delete(QName), + {ok, _MS} = rabbit_mixed_queue:delete_queue(State1 #q.mixed_state). code_change(_OldVsn, State, _Extra) -> {ok, State}. diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl index 5d7c7a358d..0ff4c50e73 100644 --- a/src/rabbit_disk_queue.erl +++ b/src/rabbit_disk_queue.erl @@ -380,15 +380,6 @@ init([FileSizeLimit, ReadFileHandlesLimit]) -> process_flag(trap_exit, true), ok = rabbit_memory_manager:register (self(), true, rabbit_disk_queue, set_mode, []), - Node = node(), - ok = - case mnesia:change_table_copy_type(rabbit_disk_queue, Node, - disc_copies) of - {atomic, ok} -> ok; - {aborted, {already_exists, rabbit_disk_queue, Node, - disc_copies}} -> ok; - E -> E - end, ok = filelib:ensure_dir(form_filename("nothing")), file:delete(form_filename(atom_to_list(?MSG_LOC_NAME) ++ ?FILE_EXTENSION_DETS)), @@ -438,6 +429,15 @@ init([FileSizeLimit, ReadFileHandlesLimit]) -> {ok, State1 = #dqstate { current_file_name = CurrentName, current_offset = Offset } } = load_from_disk(State), + Node = node(), + ok = + case mnesia:change_table_copy_type(rabbit_disk_queue, Node, + disc_copies) of + {atomic, ok} -> ok; + {aborted, {already_exists, rabbit_disk_queue, Node, + disc_copies}} -> ok; + E -> E + end, %% read is only needed so that we can seek {ok, FileHdl} = file:open(form_filename(CurrentName), [read, write, raw, binary, delayed_write]), @@ -1757,11 +1757,12 @@ recover_crashed_compactions1(Files, TmpFile) -> %% consist only of valid messages. Plan: Truncate the main file %% back to before any of the files in the tmp file and copy %% them over again + TmpPath = form_filename(TmpFile), case lists:all(fun (MsgId) -> lists:member(MsgId, MsgIds) end, MsgIdsTmp) of true -> %% we're in case 1, 2 or 3 above. Just delete the tmp file %% note this also catches the case when the tmp file %% is empty - ok = file:delete(TmpFile); + ok = file:delete(TmpPath); false -> %% we're in case 4 above. Check that everything in the %% main file is a valid message in mnesia @@ -1793,13 +1794,12 @@ recover_crashed_compactions1(Files, TmpFile) -> %% single move if we run out of disk space, this truncate %% could fail, but we still aren't risking losing data ok = file:truncate(MainHdl), - {ok, TmpHdl} = file:open(form_filename(TmpFile), - [read, raw, binary, read_ahead]), + {ok, TmpHdl} = file:open(TmpPath, [read, raw, binary, read_ahead]), {ok, TmpSize} = file:copy(TmpHdl, MainHdl, TmpSize), ok = file:sync(MainHdl), ok = file:close(MainHdl), ok = file:close(TmpHdl), - ok = file:delete(TmpFile), + ok = file:delete(TmpPath), {ok, _MainMessages, MsgIdsMain} = scan_file_for_valid_messages_msg_ids(NonTmpRelatedFile), @@ -1910,7 +1910,7 @@ read_next_file_entry(FileHdl, Offset) -> case file:read(FileHdl, TwoIntegers) of {ok, <<TotalSize:?INTEGER_SIZE_BITS, MsgIdBinSize:?INTEGER_SIZE_BITS>>} -> - case {TotalSize =:= 0, MsgIdBinSize =:= 0} of + case {TotalSize =< 0, MsgIdBinSize =< 0} of {true, _} -> eof; %% Nothing we can do other than stop {false, true} -> %% current message corrupted, try skipping past it |
