summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_amqqueue_process.erl8
-rw-r--r--src/rabbit_disk_queue.erl28
2 files changed, 18 insertions, 18 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 7c19ea7291..9d97e881ac 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -120,10 +120,10 @@ init(Q = #amqqueue { name = QName, durable = Durable }) ->
terminate(_Reason, State) ->
%% FIXME: How do we cancel active subscriptions?
- QName = qname(State),
- rabbit_mixed_queue:delete_queue(State #q.mixed_state),
- stop_memory_timer(State),
- ok = rabbit_amqqueue:internal_delete(QName).
+ State1 = stop_memory_timer(State),
+ QName = qname(State1),
+ ok = rabbit_amqqueue:internal_delete(QName),
+ {ok, _MS} = rabbit_mixed_queue:delete_queue(State1 #q.mixed_state).
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index 5d7c7a358d..0ff4c50e73 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -380,15 +380,6 @@ init([FileSizeLimit, ReadFileHandlesLimit]) ->
process_flag(trap_exit, true),
ok = rabbit_memory_manager:register
(self(), true, rabbit_disk_queue, set_mode, []),
- Node = node(),
- ok =
- case mnesia:change_table_copy_type(rabbit_disk_queue, Node,
- disc_copies) of
- {atomic, ok} -> ok;
- {aborted, {already_exists, rabbit_disk_queue, Node,
- disc_copies}} -> ok;
- E -> E
- end,
ok = filelib:ensure_dir(form_filename("nothing")),
file:delete(form_filename(atom_to_list(?MSG_LOC_NAME) ++
?FILE_EXTENSION_DETS)),
@@ -438,6 +429,15 @@ init([FileSizeLimit, ReadFileHandlesLimit]) ->
{ok, State1 = #dqstate { current_file_name = CurrentName,
current_offset = Offset } } =
load_from_disk(State),
+ Node = node(),
+ ok =
+ case mnesia:change_table_copy_type(rabbit_disk_queue, Node,
+ disc_copies) of
+ {atomic, ok} -> ok;
+ {aborted, {already_exists, rabbit_disk_queue, Node,
+ disc_copies}} -> ok;
+ E -> E
+ end,
%% read is only needed so that we can seek
{ok, FileHdl} = file:open(form_filename(CurrentName),
[read, write, raw, binary, delayed_write]),
@@ -1757,11 +1757,12 @@ recover_crashed_compactions1(Files, TmpFile) ->
%% consist only of valid messages. Plan: Truncate the main file
%% back to before any of the files in the tmp file and copy
%% them over again
+ TmpPath = form_filename(TmpFile),
case lists:all(fun (MsgId) -> lists:member(MsgId, MsgIds) end, MsgIdsTmp) of
true -> %% we're in case 1, 2 or 3 above. Just delete the tmp file
%% note this also catches the case when the tmp file
%% is empty
- ok = file:delete(TmpFile);
+ ok = file:delete(TmpPath);
false ->
%% we're in case 4 above. Check that everything in the
%% main file is a valid message in mnesia
@@ -1793,13 +1794,12 @@ recover_crashed_compactions1(Files, TmpFile) ->
%% single move if we run out of disk space, this truncate
%% could fail, but we still aren't risking losing data
ok = file:truncate(MainHdl),
- {ok, TmpHdl} = file:open(form_filename(TmpFile),
- [read, raw, binary, read_ahead]),
+ {ok, TmpHdl} = file:open(TmpPath, [read, raw, binary, read_ahead]),
{ok, TmpSize} = file:copy(TmpHdl, MainHdl, TmpSize),
ok = file:sync(MainHdl),
ok = file:close(MainHdl),
ok = file:close(TmpHdl),
- ok = file:delete(TmpFile),
+ ok = file:delete(TmpPath),
{ok, _MainMessages, MsgIdsMain} =
scan_file_for_valid_messages_msg_ids(NonTmpRelatedFile),
@@ -1910,7 +1910,7 @@ read_next_file_entry(FileHdl, Offset) ->
case file:read(FileHdl, TwoIntegers) of
{ok,
<<TotalSize:?INTEGER_SIZE_BITS, MsgIdBinSize:?INTEGER_SIZE_BITS>>} ->
- case {TotalSize =:= 0, MsgIdBinSize =:= 0} of
+ case {TotalSize =< 0, MsgIdBinSize =< 0} of
{true, _} -> eof; %% Nothing we can do other than stop
{false, true} ->
%% current message corrupted, try skipping past it