summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue.erl50
-rw-r--r--src/rabbit_amqqueue_process.erl49
-rw-r--r--src/rabbit_misc.erl24
-rw-r--r--src/rabbit_msg_store.erl7
-rw-r--r--src/rabbit_queue_index.erl51
-rw-r--r--src/rabbit_tests.erl1
6 files changed, 130 insertions, 52 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index f0540c93a2..b6e92e0698 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -128,6 +128,7 @@
%%----------------------------------------------------------------------------
start() ->
+ ok = rabbit_msg_store:clean(?TRANSIENT_MSG_STORE, rabbit_mnesia:dir()),
ok = rabbit_sup:start_child(?TRANSIENT_MSG_STORE, rabbit_msg_store,
[?TRANSIENT_MSG_STORE, rabbit_mnesia:dir(),
fun (ok) -> finished end, ok]),
@@ -152,26 +153,32 @@ find_durable_queues() ->
end).
recover_durable_queues(DurableQueues) ->
- lists:foldl(
- fun (RecoveredQ, Acc) ->
- Q = start_queue_process(RecoveredQ),
- %% We need to catch the case where a client connected to
- %% another node has deleted the queue (and possibly
- %% re-created it).
- case rabbit_misc:execute_mnesia_transaction(
- fun () ->
- case mnesia:match_object(
- rabbit_durable_queue, RecoveredQ, read) of
- [_] -> ok = store_queue(Q),
- true;
- [] -> false
- end
- end) of
- true -> [Q|Acc];
- false -> exit(Q#amqqueue.pid, shutdown),
- Acc
- end
- end, [], DurableQueues).
+ Qs = lists:foldl(
+ fun (RecoveredQ, Acc) ->
+ Q = start_queue_process(RecoveredQ),
+ %% We need to catch the case where a client
+ %% connected to another node has deleted the queue
+ %% (and possibly re-created it).
+ case rabbit_misc:execute_mnesia_transaction(
+ fun () ->
+ case mnesia:match_object(
+ rabbit_durable_queue, RecoveredQ,
+ read) of
+ [_] -> ok = store_queue(Q),
+ true;
+ [] -> false
+ end
+ end) of
+ true ->
+ ok = gen_server2:cast(Q#amqqueue.pid,
+ init_variable_queue),
+ [Q|Acc];
+ false -> exit(Q#amqqueue.pid, shutdown),
+ Acc
+ end
+ end, [], DurableQueues),
+ [ok = gen_server2:call(Q#amqqueue.pid, sync, infinity) || Q <- Qs],
+ Qs.
declare(QueueName, Durable, AutoDelete, Args) ->
Q = start_queue_process(#amqqueue{name = QueueName,
@@ -202,7 +209,8 @@ internal_declare(Q = #amqqueue{name = QueueName}, WantDefaultBinding) ->
end) of
not_found -> exit(Q#amqqueue.pid, shutdown),
rabbit_misc:not_found(QueueName);
- Q -> Q;
+ Q -> ok = gen_server2:cast(Q#amqqueue.pid, init_variable_queue),
+ Q;
ExistingQ -> exit(Q#amqqueue.pid, shutdown),
ExistingQ
end.
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 41435c08c9..e6c8d238f8 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -105,19 +105,18 @@ info_keys() -> ?INFO_KEYS.
%%----------------------------------------------------------------------------
-init(Q = #amqqueue { name = QName }) ->
+init(Q) ->
?LOGDEBUG("Queue starting - ~p~n", [Q]),
process_flag(trap_exit, true),
ok = file_handle_cache:register_callback(
rabbit_amqqueue, set_maximum_since_use, [self()]),
ok = rabbit_memory_monitor:register
(self(), {rabbit_amqqueue, set_queue_duration, [self()]}),
- VQS = rabbit_variable_queue:init(QName),
{ok, #q{q = Q,
owner = none,
exclusive_consumer = none,
has_had_consumers = false,
- variable_queue_state = VQS,
+ variable_queue_state = undefined,
next_msg_id = 1,
active_consumers = queue:new(),
blocked_consumers = queue:new(),
@@ -127,25 +126,37 @@ init(Q = #amqqueue { name = QName }) ->
terminate(shutdown, #q{variable_queue_state = VQS}) ->
ok = rabbit_memory_monitor:deregister(self()),
- _VQS = rabbit_variable_queue:terminate(VQS);
+ case VQS of
+ undefined -> ok;
+ _ -> rabbit_variable_queue:terminate(VQS)
+ end;
terminate({shutdown, _}, #q{variable_queue_state = VQS}) ->
ok = rabbit_memory_monitor:deregister(self()),
- _VQS = rabbit_variable_queue:terminate(VQS);
+ case VQS of
+ undefined -> ok;
+ _ -> rabbit_variable_queue:terminate(VQS)
+ end;
terminate(_Reason, State = #q{variable_queue_state = VQS}) ->
ok = rabbit_memory_monitor:deregister(self()),
%% FIXME: How do we cancel active subscriptions?
%% Ensure that any persisted tx messages are removed.
%% TODO: wait for all in flight tx_commits to complete
- VQS1 = rabbit_variable_queue:tx_rollback(
- lists:concat([PM || #tx { pending_messages = PM } <-
- all_tx_record()]), VQS),
- %% Delete from disk first. If we crash at this point, when a
- %% durable queue, we will be recreated at startup, possibly with
- %% partial content. The alternative is much worse however - if we
- %% called internal_delete first, we would then have a race between
- %% the disk delete and a new queue with the same name being
- %% created and published to.
- _VQS = rabbit_variable_queue:delete_and_terminate(VQS1),
+ case VQS of
+ undefined ->
+ ok;
+ _ ->
+ VQS1 = rabbit_variable_queue:tx_rollback(
+ lists:concat([PM || #tx { pending_messages = PM } <-
+ all_tx_record()]), VQS),
+ %% Delete from disk first. If we crash at this point, when
+ %% a durable queue, we will be recreated at startup,
+ %% possibly with partial content. The alternative is much
+ %% worse however - if we called internal_delete first, we
+ %% would then have a race between the disk delete and a
+ %% new queue with the same name being created and
+ %% published to.
+ rabbit_variable_queue:delete_and_terminate(VQS1)
+ end,
ok = rabbit_amqqueue:internal_delete(qname(State)).
code_change(_OldVsn, State, _Extra) ->
@@ -610,6 +621,9 @@ i(Item, _) ->
%---------------------------------------------------------------------------
+handle_call(sync, _From, State) ->
+ reply(ok, State);
+
handle_call(info, _From, State) ->
reply(infos(?INFO_KEYS, State), State);
@@ -815,6 +829,11 @@ handle_call({claim_queue, ReaderPid}, _From,
reply(locked, State)
end.
+handle_cast(init_variable_queue, #q{variable_queue_state = undefined,
+ q = #amqqueue{name = QName}} = State) ->
+ noreply(
+ State #q { variable_queue_state = rabbit_variable_queue:init(QName) });
+
handle_cast({deliver, Txn, Message, ChPid}, State) ->
%% Asynchronous, non-"mandatory", non-"immediate" deliver mode.
{_Delivered, NewState} = deliver_or_enqueue(Txn, ChPid, Message, State),
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 81cecb38f3..3bc35ca2be 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -59,6 +59,7 @@
-export([sort_field_table/1]).
-export([pid_to_string/1, string_to_pid/1]).
-export([version_compare/2, version_compare/3]).
+-export([recursive_delete/1]).
-import(mnesia).
-import(lists).
@@ -133,6 +134,7 @@
-spec(sort_field_table/1 :: (amqp_table()) -> amqp_table()).
-spec(pid_to_string/1 :: (pid()) -> string()).
-spec(string_to_pid/1 :: (string()) -> pid()).
+-spec(recursive_delete/1 :: (string()) -> 'ok' | {'error', any()}).
-endif.
@@ -601,3 +603,25 @@ version_compare(A, B) ->
ANum < BNum -> lt;
ANum > BNum -> gt
end.
+
+recursive_delete(Path) ->
+ case filelib:is_dir(Path) of
+ false ->
+ case file:delete(Path) of
+ ok -> ok;
+ %% Path doesn't exist anyway
+ {error, enoent} -> ok
+ end;
+ true ->
+ case file:list_dir(Path) of
+ {ok, FileNames} ->
+ lists:foldl(
+ fun (FileName, ok) ->
+ recursive_delete(filename:join(Path, FileName));
+ (_FileName, Error) ->
+ Error
+ end, ok, FileNames);
+ {error, Error} ->
+ {error, {Path, Error}}
+ end
+ end.
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index a33b1a3470..5610b35e34 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -34,7 +34,7 @@
-behaviour(gen_server2).
-export([start_link/4, write/4, read/3, contains/2, remove/2, release/2,
- sync/3, client_init/1, client_terminate/1]).
+ sync/3, client_init/1, client_terminate/1, clean/2]).
-export([sync/1, gc_done/4, set_maximum_since_use/2]). %% internal
@@ -113,6 +113,7 @@
-spec(set_maximum_since_use/2 :: (server(), non_neg_integer()) -> 'ok').
-spec(client_init/1 :: (server()) -> client_msstate()).
-spec(client_terminate/1 :: (client_msstate()) -> 'ok').
+-spec(clean/2 :: (atom(), file_path()) -> 'ok').
-endif.
@@ -340,6 +341,10 @@ client_terminate(CState) ->
close_all_handles(CState),
ok.
+clean(Server, BaseDir) ->
+ Dir = filename:join(BaseDir, atom_to_list(Server)),
+ ok = rabbit_misc:recursive_delete(Dir).
+
%%----------------------------------------------------------------------------
%% Client-side-only helpers
%%----------------------------------------------------------------------------
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index c0c1b40bd5..935f275451 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -37,6 +37,8 @@
find_lowest_seq_id_seg_and_next_seq_id/1,
start_persistent_msg_store/1]).
+-export([queue_index_walker_reader/3]). %% for internal use only
+
-define(CLEAN_FILENAME, "clean.dot").
%%----------------------------------------------------------------------------
@@ -419,30 +421,49 @@ start_persistent_msg_store(DurableQueues) ->
%% Msg Store Startup Delta Function
%%----------------------------------------------------------------------------
-queue_index_walker([]) ->
- finished;
-queue_index_walker([QueueName|QueueNames]) ->
+queue_index_walker(DurableQueues) when is_list(DurableQueues) ->
+ queue_index_walker({DurableQueues, sets:new()});
+
+queue_index_walker({[], Kids}) ->
+ case sets:size(Kids) of
+ 0 -> finished;
+ _ -> receive
+ {found, MsgId, Count} ->
+ {MsgId, Count, {[], Kids}};
+ {finished, Child} ->
+ queue_index_walker({[], sets:del_element(Child, Kids)})
+ end
+ end;
+queue_index_walker({[QueueName | QueueNames], Kids}) ->
+ Child = make_ref(),
+ ok = worker_pool:submit_async({?MODULE, queue_index_walker_reader,
+ [QueueName, self(), Child]}),
+ queue_index_walker({QueueNames, sets:add_element(Child, Kids)}).
+
+queue_index_walker_reader(QueueName, Parent, Guid) ->
State = blank_state(QueueName),
State1 = load_journal(State),
SegNums = all_segment_nums(State1),
- queue_index_walker({SegNums, State1, QueueNames});
+ queue_index_walker_reader(Parent, Guid, State1, SegNums).
-queue_index_walker({[], State, QueueNames}) ->
+queue_index_walker_reader(Parent, Guid, State, []) ->
_State = terminate(false, State),
- queue_index_walker(QueueNames);
-queue_index_walker({[Seg | SegNums], State, QueueNames}) ->
+ Parent ! {finished, Guid};
+queue_index_walker_reader(Parent, Guid, State, [Seg | SegNums]) ->
SeqId = reconstruct_seq_id(Seg, 0),
{Messages, State1} = read_segment_entries(SeqId, State),
- queue_index_walker({Messages, State1, SegNums, QueueNames});
+ queue_index_walker_reader(Parent, Guid, SegNums, State1, Messages).
-queue_index_walker({[], State, SegNums, QueueNames}) ->
- queue_index_walker({SegNums, State, QueueNames});
-queue_index_walker({[{MsgId, _SeqId, IsPersistent, _IsDelivered} | Msgs],
- State, SegNums, QueueNames}) ->
+queue_index_walker_reader(Parent, Guid, SegNums, State, []) ->
+ queue_index_walker_reader(Parent, Guid, State, SegNums);
+queue_index_walker_reader(
+ Parent, Guid, SegNums, State,
+ [{MsgId, _SeqId, IsPersistent, _IsDelivered} | Msgs]) ->
case IsPersistent of
- true -> {MsgId, 1, {Msgs, State, SegNums, QueueNames}};
- false -> queue_index_walker({Msgs, State, SegNums, QueueNames})
- end.
+ true -> Parent ! {found, MsgId, 1};
+ false -> ok
+ end,
+ queue_index_walker_reader(Parent, Guid, SegNums, State, Msgs).
%%----------------------------------------------------------------------------
%% Minors
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 3ccb83b6f4..474afbcafb 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1001,6 +1001,7 @@ start_msg_store(MsgRefDeltaGen, MsgRefDeltaGenInit) ->
start_transient_msg_store().
start_transient_msg_store() ->
+ ok = rabbit_msg_store:clean(?TRANSIENT_MSG_STORE, rabbit_mnesia:dir()),
ok = rabbit_sup:start_child(?TRANSIENT_MSG_STORE, rabbit_msg_store,
[?TRANSIENT_MSG_STORE, rabbit_mnesia:dir(),
fun (ok) -> finished end, ok]).