diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue.erl | 50 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 49 | ||||
| -rw-r--r-- | src/rabbit_misc.erl | 24 | ||||
| -rw-r--r-- | src/rabbit_msg_store.erl | 7 | ||||
| -rw-r--r-- | src/rabbit_queue_index.erl | 51 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 1 |
6 files changed, 130 insertions, 52 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index f0540c93a2..b6e92e0698 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -128,6 +128,7 @@ %%---------------------------------------------------------------------------- start() -> + ok = rabbit_msg_store:clean(?TRANSIENT_MSG_STORE, rabbit_mnesia:dir()), ok = rabbit_sup:start_child(?TRANSIENT_MSG_STORE, rabbit_msg_store, [?TRANSIENT_MSG_STORE, rabbit_mnesia:dir(), fun (ok) -> finished end, ok]), @@ -152,26 +153,32 @@ find_durable_queues() -> end). recover_durable_queues(DurableQueues) -> - lists:foldl( - fun (RecoveredQ, Acc) -> - Q = start_queue_process(RecoveredQ), - %% We need to catch the case where a client connected to - %% another node has deleted the queue (and possibly - %% re-created it). - case rabbit_misc:execute_mnesia_transaction( - fun () -> - case mnesia:match_object( - rabbit_durable_queue, RecoveredQ, read) of - [_] -> ok = store_queue(Q), - true; - [] -> false - end - end) of - true -> [Q|Acc]; - false -> exit(Q#amqqueue.pid, shutdown), - Acc - end - end, [], DurableQueues). + Qs = lists:foldl( + fun (RecoveredQ, Acc) -> + Q = start_queue_process(RecoveredQ), + %% We need to catch the case where a client + %% connected to another node has deleted the queue + %% (and possibly re-created it). + case rabbit_misc:execute_mnesia_transaction( + fun () -> + case mnesia:match_object( + rabbit_durable_queue, RecoveredQ, + read) of + [_] -> ok = store_queue(Q), + true; + [] -> false + end + end) of + true -> + ok = gen_server2:cast(Q#amqqueue.pid, + init_variable_queue), + [Q|Acc]; + false -> exit(Q#amqqueue.pid, shutdown), + Acc + end + end, [], DurableQueues), + [ok = gen_server2:call(Q#amqqueue.pid, sync, infinity) || Q <- Qs], + Qs. declare(QueueName, Durable, AutoDelete, Args) -> Q = start_queue_process(#amqqueue{name = QueueName, @@ -202,7 +209,8 @@ internal_declare(Q = #amqqueue{name = QueueName}, WantDefaultBinding) -> end) of not_found -> exit(Q#amqqueue.pid, shutdown), rabbit_misc:not_found(QueueName); - Q -> Q; + Q -> ok = gen_server2:cast(Q#amqqueue.pid, init_variable_queue), + Q; ExistingQ -> exit(Q#amqqueue.pid, shutdown), ExistingQ end. diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 41435c08c9..e6c8d238f8 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -105,19 +105,18 @@ info_keys() -> ?INFO_KEYS. %%---------------------------------------------------------------------------- -init(Q = #amqqueue { name = QName }) -> +init(Q) -> ?LOGDEBUG("Queue starting - ~p~n", [Q]), process_flag(trap_exit, true), ok = file_handle_cache:register_callback( rabbit_amqqueue, set_maximum_since_use, [self()]), ok = rabbit_memory_monitor:register (self(), {rabbit_amqqueue, set_queue_duration, [self()]}), - VQS = rabbit_variable_queue:init(QName), {ok, #q{q = Q, owner = none, exclusive_consumer = none, has_had_consumers = false, - variable_queue_state = VQS, + variable_queue_state = undefined, next_msg_id = 1, active_consumers = queue:new(), blocked_consumers = queue:new(), @@ -127,25 +126,37 @@ init(Q = #amqqueue { name = QName }) -> terminate(shutdown, #q{variable_queue_state = VQS}) -> ok = rabbit_memory_monitor:deregister(self()), - _VQS = rabbit_variable_queue:terminate(VQS); + case VQS of + undefined -> ok; + _ -> rabbit_variable_queue:terminate(VQS) + end; terminate({shutdown, _}, #q{variable_queue_state = VQS}) -> ok = rabbit_memory_monitor:deregister(self()), - _VQS = rabbit_variable_queue:terminate(VQS); + case VQS of + undefined -> ok; + _ -> rabbit_variable_queue:terminate(VQS) + end; terminate(_Reason, State = #q{variable_queue_state = VQS}) -> ok = rabbit_memory_monitor:deregister(self()), %% FIXME: How do we cancel active subscriptions? %% Ensure that any persisted tx messages are removed. %% TODO: wait for all in flight tx_commits to complete - VQS1 = rabbit_variable_queue:tx_rollback( - lists:concat([PM || #tx { pending_messages = PM } <- - all_tx_record()]), VQS), - %% Delete from disk first. If we crash at this point, when a - %% durable queue, we will be recreated at startup, possibly with - %% partial content. The alternative is much worse however - if we - %% called internal_delete first, we would then have a race between - %% the disk delete and a new queue with the same name being - %% created and published to. - _VQS = rabbit_variable_queue:delete_and_terminate(VQS1), + case VQS of + undefined -> + ok; + _ -> + VQS1 = rabbit_variable_queue:tx_rollback( + lists:concat([PM || #tx { pending_messages = PM } <- + all_tx_record()]), VQS), + %% Delete from disk first. If we crash at this point, when + %% a durable queue, we will be recreated at startup, + %% possibly with partial content. The alternative is much + %% worse however - if we called internal_delete first, we + %% would then have a race between the disk delete and a + %% new queue with the same name being created and + %% published to. + rabbit_variable_queue:delete_and_terminate(VQS1) + end, ok = rabbit_amqqueue:internal_delete(qname(State)). code_change(_OldVsn, State, _Extra) -> @@ -610,6 +621,9 @@ i(Item, _) -> %--------------------------------------------------------------------------- +handle_call(sync, _From, State) -> + reply(ok, State); + handle_call(info, _From, State) -> reply(infos(?INFO_KEYS, State), State); @@ -815,6 +829,11 @@ handle_call({claim_queue, ReaderPid}, _From, reply(locked, State) end. +handle_cast(init_variable_queue, #q{variable_queue_state = undefined, + q = #amqqueue{name = QName}} = State) -> + noreply( + State #q { variable_queue_state = rabbit_variable_queue:init(QName) }); + handle_cast({deliver, Txn, Message, ChPid}, State) -> %% Asynchronous, non-"mandatory", non-"immediate" deliver mode. {_Delivered, NewState} = deliver_or_enqueue(Txn, ChPid, Message, State), diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 81cecb38f3..3bc35ca2be 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -59,6 +59,7 @@ -export([sort_field_table/1]). -export([pid_to_string/1, string_to_pid/1]). -export([version_compare/2, version_compare/3]). +-export([recursive_delete/1]). -import(mnesia). -import(lists). @@ -133,6 +134,7 @@ -spec(sort_field_table/1 :: (amqp_table()) -> amqp_table()). -spec(pid_to_string/1 :: (pid()) -> string()). -spec(string_to_pid/1 :: (string()) -> pid()). +-spec(recursive_delete/1 :: (string()) -> 'ok' | {'error', any()}). -endif. @@ -601,3 +603,25 @@ version_compare(A, B) -> ANum < BNum -> lt; ANum > BNum -> gt end. + +recursive_delete(Path) -> + case filelib:is_dir(Path) of + false -> + case file:delete(Path) of + ok -> ok; + %% Path doesn't exist anyway + {error, enoent} -> ok + end; + true -> + case file:list_dir(Path) of + {ok, FileNames} -> + lists:foldl( + fun (FileName, ok) -> + recursive_delete(filename:join(Path, FileName)); + (_FileName, Error) -> + Error + end, ok, FileNames); + {error, Error} -> + {error, {Path, Error}} + end + end. diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index a33b1a3470..5610b35e34 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -34,7 +34,7 @@ -behaviour(gen_server2). -export([start_link/4, write/4, read/3, contains/2, remove/2, release/2, - sync/3, client_init/1, client_terminate/1]). + sync/3, client_init/1, client_terminate/1, clean/2]). -export([sync/1, gc_done/4, set_maximum_since_use/2]). %% internal @@ -113,6 +113,7 @@ -spec(set_maximum_since_use/2 :: (server(), non_neg_integer()) -> 'ok'). -spec(client_init/1 :: (server()) -> client_msstate()). -spec(client_terminate/1 :: (client_msstate()) -> 'ok'). +-spec(clean/2 :: (atom(), file_path()) -> 'ok'). -endif. @@ -340,6 +341,10 @@ client_terminate(CState) -> close_all_handles(CState), ok. +clean(Server, BaseDir) -> + Dir = filename:join(BaseDir, atom_to_list(Server)), + ok = rabbit_misc:recursive_delete(Dir). + %%---------------------------------------------------------------------------- %% Client-side-only helpers %%---------------------------------------------------------------------------- diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index c0c1b40bd5..935f275451 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -37,6 +37,8 @@ find_lowest_seq_id_seg_and_next_seq_id/1, start_persistent_msg_store/1]). +-export([queue_index_walker_reader/3]). %% for internal use only + -define(CLEAN_FILENAME, "clean.dot"). %%---------------------------------------------------------------------------- @@ -419,30 +421,49 @@ start_persistent_msg_store(DurableQueues) -> %% Msg Store Startup Delta Function %%---------------------------------------------------------------------------- -queue_index_walker([]) -> - finished; -queue_index_walker([QueueName|QueueNames]) -> +queue_index_walker(DurableQueues) when is_list(DurableQueues) -> + queue_index_walker({DurableQueues, sets:new()}); + +queue_index_walker({[], Kids}) -> + case sets:size(Kids) of + 0 -> finished; + _ -> receive + {found, MsgId, Count} -> + {MsgId, Count, {[], Kids}}; + {finished, Child} -> + queue_index_walker({[], sets:del_element(Child, Kids)}) + end + end; +queue_index_walker({[QueueName | QueueNames], Kids}) -> + Child = make_ref(), + ok = worker_pool:submit_async({?MODULE, queue_index_walker_reader, + [QueueName, self(), Child]}), + queue_index_walker({QueueNames, sets:add_element(Child, Kids)}). + +queue_index_walker_reader(QueueName, Parent, Guid) -> State = blank_state(QueueName), State1 = load_journal(State), SegNums = all_segment_nums(State1), - queue_index_walker({SegNums, State1, QueueNames}); + queue_index_walker_reader(Parent, Guid, State1, SegNums). -queue_index_walker({[], State, QueueNames}) -> +queue_index_walker_reader(Parent, Guid, State, []) -> _State = terminate(false, State), - queue_index_walker(QueueNames); -queue_index_walker({[Seg | SegNums], State, QueueNames}) -> + Parent ! {finished, Guid}; +queue_index_walker_reader(Parent, Guid, State, [Seg | SegNums]) -> SeqId = reconstruct_seq_id(Seg, 0), {Messages, State1} = read_segment_entries(SeqId, State), - queue_index_walker({Messages, State1, SegNums, QueueNames}); + queue_index_walker_reader(Parent, Guid, SegNums, State1, Messages). -queue_index_walker({[], State, SegNums, QueueNames}) -> - queue_index_walker({SegNums, State, QueueNames}); -queue_index_walker({[{MsgId, _SeqId, IsPersistent, _IsDelivered} | Msgs], - State, SegNums, QueueNames}) -> +queue_index_walker_reader(Parent, Guid, SegNums, State, []) -> + queue_index_walker_reader(Parent, Guid, State, SegNums); +queue_index_walker_reader( + Parent, Guid, SegNums, State, + [{MsgId, _SeqId, IsPersistent, _IsDelivered} | Msgs]) -> case IsPersistent of - true -> {MsgId, 1, {Msgs, State, SegNums, QueueNames}}; - false -> queue_index_walker({Msgs, State, SegNums, QueueNames}) - end. + true -> Parent ! {found, MsgId, 1}; + false -> ok + end, + queue_index_walker_reader(Parent, Guid, SegNums, State, Msgs). %%---------------------------------------------------------------------------- %% Minors diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 3ccb83b6f4..474afbcafb 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1001,6 +1001,7 @@ start_msg_store(MsgRefDeltaGen, MsgRefDeltaGenInit) -> start_transient_msg_store(). start_transient_msg_store() -> + ok = rabbit_msg_store:clean(?TRANSIENT_MSG_STORE, rabbit_mnesia:dir()), ok = rabbit_sup:start_child(?TRANSIENT_MSG_STORE, rabbit_msg_store, [?TRANSIENT_MSG_STORE, rabbit_mnesia:dir(), fun (ok) -> finished end, ok]). |
