summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit.erl16
-rw-r--r--src/rabbit_amqqueue.erl58
-rw-r--r--src/rabbit_disk_queue.erl50
-rw-r--r--src/rabbit_mixed_queue.erl9
-rw-r--r--src/rabbit_tests.erl6
5 files changed, 89 insertions, 50 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl
index e79c7f5936..ce73f6ce6d 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -145,17 +145,21 @@ start(normal, []) ->
ok = start_child(rabbit_router),
ok = start_child(rabbit_node_monitor)
end},
- {"recovery",
- fun () ->
- ok = maybe_insert_default_data(),
- ok = rabbit_exchange:recover(),
- ok = rabbit_amqqueue:recover()
- end},
{"disk queue",
fun () ->
ok = start_child(rabbit_disk_queue),
ok = rabbit_disk_queue:to_ram_disk_mode() %% TODO, CHANGE ME
end},
+ {"recovery",
+ fun () ->
+ ok = maybe_insert_default_data(),
+ ok = rabbit_exchange:recover(),
+ {ok, DurableQueues} = rabbit_amqqueue:recover(),
+ DurableQueueNames =
+ sets:from_list(lists:map(
+ fun(Q) -> Q #amqqueue.name end, DurableQueues)),
+ ok = rabbit_disk_queue:delete_non_durable_queues(DurableQueueNames)
+ end},
{"guid generator",
fun () ->
ok = start_child(rabbit_guid)
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 0316788fe1..c56e51888b 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -119,37 +119,39 @@ start() ->
ok.
recover() ->
- ok = recover_durable_queues(),
- ok.
+ {ok, DurableQueues} = recover_durable_queues(),
+ {ok, DurableQueues}.
recover_durable_queues() ->
Node = node(),
- lists:foreach(
- fun (RecoveredQ) ->
- Q = start_queue_process(RecoveredQ),
- %% We need to catch the case where a client connected to
- %% another node has deleted the queue (and possibly
- %% re-created it).
- case rabbit_misc:execute_mnesia_transaction(
- fun () -> case mnesia:match_object(
- rabbit_durable_queue, RecoveredQ, read) of
- [_] -> ok = store_queue(Q),
- true;
- [] -> false
- end
- end) of
- true -> ok;
- false -> exit(Q#amqqueue.pid, shutdown)
- end
- end,
- %% TODO: use dirty ops instead
- rabbit_misc:execute_mnesia_transaction(
- fun () ->
- qlc:e(qlc:q([Q || Q = #amqqueue{pid = Pid}
- <- mnesia:table(rabbit_durable_queue),
- node(Pid) == Node]))
- end)),
- ok.
+ DurableQueues =
+ lists:foldl(
+ fun (RecoveredQ, Acc) ->
+ Q = start_queue_process(RecoveredQ),
+ %% We need to catch the case where a client connected to
+ %% another node has deleted the queue (and possibly
+ %% re-created it).
+ case rabbit_misc:execute_mnesia_transaction(
+ fun () -> case mnesia:match_object(
+ rabbit_durable_queue, RecoveredQ, read) of
+ [_] -> ok = store_queue(Q),
+ true;
+ [] -> false
+ end
+ end) of
+ true -> [Q|Acc];
+ false -> exit(Q#amqqueue.pid, shutdown),
+ Acc
+ end
+ end, [],
+ %% TODO: use dirty ops instead
+ rabbit_misc:execute_mnesia_transaction(
+ fun () ->
+ qlc:e(qlc:q([Q || Q = #amqqueue{pid = Pid}
+ <- mnesia:table(rabbit_durable_queue),
+ node(Pid) == Node]))
+ end)),
+ {ok, DurableQueues}.
declare(QueueName, Durable, AutoDelete, Args) ->
Q = start_queue_process(#amqqueue{name = QueueName,
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index 2ffb9a754d..b7eca499d6 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -41,7 +41,7 @@
-export([publish/3, publish_with_seq/4, deliver/1, phantom_deliver/1, ack/2,
tx_publish/2, tx_commit/3, tx_commit_with_seqs/3, tx_cancel/1,
requeue/2, requeue_with_seqs/2, purge/1, delete_queue/1,
- dump_queue/1
+ dump_queue/1, delete_non_durable_queues/1
]).
-export([length/1, is_empty/1]).
@@ -248,7 +248,8 @@
-spec(requeue_with_seqs/2 :: (queue_name(), [{{msg_id(), seq_id()}, seq_id_or_next()}]) -> 'ok').
-spec(purge/1 :: (queue_name()) -> non_neg_integer()).
-spec(dump_queue/1 :: (queue_name()) -> [{msg_id(), binary(), non_neg_integer(),
- bool(), {msg_id(), seq_id()}}]).
+ bool(), seq_id()}]).
+-spec(delete_non_durable_queues/1 :: (set()) -> 'ok').
-spec(stop/0 :: () -> 'ok').
-spec(stop_and_obliterate/0 :: () -> 'ok').
-spec(to_ram_disk_mode/0 :: () -> 'ok').
@@ -307,6 +308,9 @@ delete_queue(Q) ->
dump_queue(Q) ->
gen_server2:call(?SERVER, {dump_queue, Q}, infinity).
+delete_non_durable_queues(DurableQueues) ->
+ gen_server2:call(?SERVER, {delete_non_durable_queues, DurableQueues}, infinity).
+
stop() ->
gen_server2:call(?SERVER, stop, infinity).
@@ -346,7 +350,8 @@ init([FileSizeLimit, ReadFileHandlesLimit]) ->
E -> E
end,
ok = filelib:ensure_dir(form_filename("nothing")),
- InitName = "0" ++ ?FILE_EXTENSION,
+ file:delete(form_filename(atom_to_list(?MSG_LOC_NAME) ++
+ ?FILE_EXTENSION_DETS)),
{ok, MsgLocationDets} =
dets:open_file(?MSG_LOC_NAME,
[{file, form_filename(atom_to_list(?MSG_LOC_NAME) ++
@@ -360,6 +365,8 @@ init([FileSizeLimit, ReadFileHandlesLimit]) ->
%% it would be better to have this as private, but dets:from_ets/2
%% seems to blow up if it is set private
MsgLocationEts = ets:new(?MSG_LOC_NAME, [set, protected]),
+
+ InitName = "0" ++ ?FILE_EXTENSION,
State =
#dqstate { msg_location_dets = MsgLocationDets,
msg_location_ets = MsgLocationEts,
@@ -449,7 +456,10 @@ handle_call({length, Q}, _From, State = #dqstate { sequences = Sequences }) ->
end;
handle_call({dump_queue, Q}, _From, State) ->
{Result, State1} = internal_dump_queue(Q, State),
- {reply, Result, State1}.
+ {reply, Result, State1};
+handle_call({delete_non_durable_queues, DurableQueues}, _From, State) ->
+ {ok, State1} = internal_delete_non_durable_queues(DurableQueues, State),
+ {reply, ok, State1}.
handle_cast({publish, Q, MsgId, MsgBody}, State) ->
{ok, State1} = internal_publish(Q, MsgId, next, MsgBody, State),
@@ -928,13 +938,27 @@ internal_dump_queue(Q, State = #dqstate { sequences = Sequences }) ->
fun ({SeqId, _State1}) when SeqId == WriteSeq ->
false;
({SeqId, State1}) ->
- {ok, Result, NextReadSeqId, State2} =
+ {ok, {MsgId, Msg, Size, Delivered, {MsgId, SeqId}}, NextReadSeqId, State2} =
internal_read_message(Q, SeqId, true, true, State1),
- {true, Result, {NextReadSeqId, State2}}
+ {true, {MsgId, Msg, Size, Delivered, SeqId}, {NextReadSeqId, State2}}
end, {ReadSeq, State}),
{lists:reverse(QList), State3}
end.
+internal_delete_non_durable_queues(DurableQueues, State = #dqstate { sequences = Sequences }) ->
+ State3 =
+ ets:foldl(
+ fun ({Q, _Read, _Write, _Length}, State1) ->
+ case sets:is_element(Q, DurableQueues) of
+ true ->
+ State1;
+ false ->
+ {ok, State2} = internal_delete_queue(Q, State1),
+ State2
+ end
+ end, State, Sequences),
+ {ok, State3}.
+
%% ---- ROLLING OVER THE APPEND FILE ----
maybe_roll_to_new_file(Offset,
@@ -1064,10 +1088,10 @@ combine_files({Source, SourceValid, _SourceContiguousTop,
State = close_file(Source, close_file(Destination, State1)),
{ok, SourceHdl} =
file:open(form_filename(Source),
- [read, write, raw, binary, delayed_write, read_ahead]),
+ [read, write, raw, binary, read_ahead, delayed_write]),
{ok, DestinationHdl} =
file:open(form_filename(Destination),
- [read, write, raw, binary, delayed_write, read_ahead]),
+ [read, write, raw, binary, read_ahead, delayed_write]),
ExpectedSize = SourceValid + DestinationValid,
%% if DestinationValid =:= DestinationContiguousTop then we don't need a tmp file
%% if they're not equal, then we need to write out everything past the DestinationContiguousTop to a tmp file
@@ -1080,7 +1104,7 @@ combine_files({Source, SourceValid, _SourceContiguousTop,
Tmp = filename:rootname(Destination) ++ ?FILE_EXTENSION_TMP,
{ok, TmpHdl} =
file:open(form_filename(Tmp),
- [read, write, raw, binary, delayed_write, read_ahead]),
+ [read, write, raw, binary, read_ahead, delayed_write]),
Worklist =
lists:dropwhile(
fun ({_, _, _, Offset, _})
@@ -1262,9 +1286,11 @@ load_from_disk(State) ->
{atomic, true} = mnesia:transaction(
fun() ->
ok = mnesia:read_lock_table(rabbit_disk_queue),
- mnesia:foldl(fun (#dq_msg_loc { msg_id = MsgId }, true) ->
- true = 1 =:=
- erlang:length(dets_ets_lookup(State1, MsgId))
+ mnesia:foldl(fun (#dq_msg_loc { msg_id = MsgId, queue_and_seq_id = {Q, SeqId} }, true) ->
+ case erlang:length(dets_ets_lookup(State1, MsgId)) of
+ 0 -> ok == mnesia:delete(rabbit_disk_queue, {Q, SeqId}, write);
+ 1 -> true
+ end
end,
true, rabbit_disk_queue)
end),
diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl
index d1000c887e..8455bf1c9d 100644
--- a/src/rabbit_mixed_queue.erl
+++ b/src/rabbit_mixed_queue.erl
@@ -48,7 +48,14 @@
).
start_link(Queue, IsDurable, Mode) when Mode =:= disk orelse Mode =:= mixed ->
- {ok, #mqstate { mode = Mode, msg_buf = queue:new(), next_write_seq = 1,
+ QList = rabbit_disk_queue:dump_queue(Queue),
+ {MsgBuf, NextSeq} =
+ lists:foldl(
+ fun ({MsgId, Msg, Size, Delivered, SeqId}, {Buf, NSeq})
+ when SeqId >= NSeq ->
+ {queue:in({SeqId, Msg, Delivered}, Buf), SeqId + 1}
+ end, {queue:new(), 0}, QList),
+ {ok, #mqstate { mode = Mode, msg_buf = MsgBuf, next_write_seq = NextSeq,
queue = Queue, is_durable = IsDurable }}.
msg_to_bin(Msg = #basic_message { content = Content }) ->
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 75b36d6f3d..70fc45e099 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -908,7 +908,7 @@ rdq_test_dump_queue() ->
[rabbit_disk_queue:tx_publish(N, Msg) || N <- All],
rabbit_disk_queue:tx_commit(q, All, []),
io:format("Publish done~n", []),
- QList = [{N, Msg, 256, false, {N, (N-1)}} || N <- All],
+ QList = [{N, Msg, 256, false, (N-1)} || N <- All],
QList = rabbit_disk_queue:dump_queue(q),
rdq_stop(),
io:format("dump ok undelivered~n", []),
@@ -916,13 +916,13 @@ rdq_test_dump_queue() ->
lists:foreach(
fun (N) ->
Remaining = Total - N,
- {N, Msg, 256, false, SeqId, Remaining} = rabbit_disk_queue:deliver(q)
+ {N, Msg, 256, false, _SeqId, Remaining} = rabbit_disk_queue:deliver(q)
end, All),
[] = rabbit_disk_queue:dump_queue(q),
rdq_stop(),
io:format("dump ok post delivery~n", []),
rdq_start(),
- QList2 = [{N, Msg, 256, true, {N, (N-1)}} || N <- All],
+ QList2 = [{N, Msg, 256, true, (N-1)} || N <- All],
QList2 = rabbit_disk_queue:dump_queue(q),
io:format("dump ok post delivery + restart~n", []),
passed.