diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_msg_store.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_queue_index.erl | 16 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 96 |
3 files changed, 70 insertions, 46 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 8e2b1c0d49..f033dea5f0 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -718,7 +718,7 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) -> Dir = filename:join(BaseDir, atom_to_list(Server)), - {ok, IndexModule} = application:get_env(msg_store_index_module), + {ok, IndexModule} = application:get_env(rabbit,msg_store_index_module), rabbit_log:info("~w: using ~p to provide index~n", [Server, IndexModule]), AttemptFileSummaryRecovery = @@ -758,7 +758,7 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) -> DyingIndex = ets:new(rabbit_msg_store_dying_client_index, [set, public, {keypos, #dying_client.client_ref}]), - {ok, FileSizeLimit} = application:get_env(msg_store_file_size_limit), + {ok, FileSizeLimit} = application:get_env(rabbit,msg_store_file_size_limit), {ok, GCPid} = rabbit_msg_store_gc:start_link( #gc_state { dir = Dir, diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 8b96bbffbd..3e9d92f5c1 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -23,6 +23,7 @@ read/3, next_segment_boundary/1, bounds/1, start/1, stop/0]). -export([add_queue_ttl/0, avoid_zeroes/0, store_msg_size/0, store_msg/0]). +-export([scan_queue_segments/3]). -define(CLEAN_FILENAME, "clean.dot"). @@ -660,20 +661,19 @@ queue_index_walker({next, Gatherer}) when is_pid(Gatherer) -> end. queue_index_walker_reader(QueueName, Gatherer) -> - State = blank_state(QueueName), - ok = scan_segments( + ok = scan_queue_segments( fun (_SeqId, MsgId, _MsgProps, true, _IsDelivered, no_ack, ok) when is_binary(MsgId) -> gatherer:sync_in(Gatherer, {MsgId, 1}); (_SeqId, _MsgId, _MsgProps, _IsPersistent, _IsDelivered, _IsAcked, Acc) -> Acc - end, ok, State), + end, ok, QueueName), ok = gatherer:finish(Gatherer). -scan_segments(Fun, Acc, State) -> - State1 = #qistate { segments = Segments, dir = Dir } = - recover_journal(State), +scan_queue_segments(Fun, Acc, QueueName) -> + State = #qistate { segments = Segments, dir = Dir } = + recover_journal(blank_state(QueueName)), Result = lists:foldr( fun (Seg, AccN) -> segment_entries_foldr( @@ -682,8 +682,8 @@ scan_segments(Fun, Acc, State) -> Fun(reconstruct_seq_id(Seg, RelSeq), MsgOrId, MsgProps, IsPersistent, IsDelivered, IsAcked, AccM) end, AccN, segment_find_or_new(Seg, Dir, Segments)) - end, Acc, all_segment_nums(State1)), - {_SegmentCounts, _State} = terminate(State1), + end, Acc, all_segment_nums(State)), + {_SegmentCounts, _State} = terminate(State), Result. %%---------------------------------------------------------------------------- diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 01c412fd60..8460e7ffa9 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -33,6 +33,9 @@ %% exported for testing only -export([start_msg_store/2, stop_msg_store/0, init/6]). +-export([move_messages_to_vhost_store/0]). +-include_lib("stdlib/include/qlc.hrl"). + %%---------------------------------------------------------------------------- %% Messages, and their position in the queue, can be in memory or on %% disk, or both. Persistent messages will have both message and @@ -334,8 +337,11 @@ }). -define(HEADER_GUESS_SIZE, 100). %% see determine_persist_to/2 +-define(PERSISTENT_MSG_STORE_SUP, msg_store_persistent_vhost). +-define(TRANSIENT_MSG_STORE_SUP, msg_store_transient_vhost). -define(PERSISTENT_MSG_STORE, msg_store_persistent). -define(TRANSIENT_MSG_STORE, msg_store_transient). + -define(QUEUE, lqueue). -include("rabbit.hrl"). @@ -344,7 +350,8 @@ %%---------------------------------------------------------------------------- -rabbit_upgrade({multiple_routing_keys, local, []}). --rabbit_upgrade({move_messages_to_vhost_store, local, []}). +% -rabbit_upgrade({move_messages_to_vhost_store, local, []}). requires mnesia, requires rabbit_sup, requires worker_pool, requires fhc +-compile(export_all). -type seq_id() :: non_neg_integer(). @@ -452,6 +459,8 @@ %% Public API %%---------------------------------------------------------------------------- + + start(DurableQueues) -> {AllTerms, StartFunState} = rabbit_queue_index:start(DurableQueues), start_msg_store( @@ -470,23 +479,23 @@ stop() -> start_msg_store(Refs, StartFunState) -> VHosts = rabbit_vhost:list(), - ok = rabbit_sup:start_child(?TRANSIENT_MSG_STORE, rabbit_msg_store_vhost_sup, - [?TRANSIENT_MSG_STORE, rabbit_mnesia:dir(), + ok = rabbit_sup:start_child(?TRANSIENT_MSG_STORE_SUP, rabbit_msg_store_vhost_sup, + [?TRANSIENT_MSG_STORE_SUP, rabbit_mnesia:dir(), undefined, {fun (ok) -> finished end, ok}]), - ok = rabbit_sup:start_child(?PERSISTENT_MSG_STORE, rabbit_msg_store_vhost_sup, - [?PERSISTENT_MSG_STORE, rabbit_mnesia:dir(), + ok = rabbit_sup:start_child(?PERSISTENT_MSG_STORE_SUP, rabbit_msg_store_vhost_sup, + [?PERSISTENT_MSG_STORE_SUP, rabbit_mnesia:dir(), Refs, StartFunState]), lists:foreach( fun(VHost) -> - rabbit_msg_store_vhost_sup:add_vhost(?TRANSIENT_MSG_STORE, VHost), - rabbit_msg_store_vhost_sup:add_vhost(?PERSISTENT_MSG_STORE, VHost) + rabbit_msg_store_vhost_sup:add_vhost(?TRANSIENT_MSG_STORE_SUP, VHost), + rabbit_msg_store_vhost_sup:add_vhost(?PERSISTENT_MSG_STORE_SUP, VHost) end, VHosts), ok. stop_msg_store() -> - ok = rabbit_sup:stop_child(?PERSISTENT_MSG_STORE), - ok = rabbit_sup:stop_child(?TRANSIENT_MSG_STORE). + ok = rabbit_sup:stop_child(?PERSISTENT_MSG_STORE_SUP), + ok = rabbit_sup:stop_child(?TRANSIENT_MSG_STORE_SUP). init(Queue, Recover, Callback) -> init( @@ -504,11 +513,11 @@ init(#amqqueue { name = QueueName, durable = IsDurable }, new, VHost = QueueName#resource.virtual_host, init(IsDurable, IndexState, 0, 0, [], case IsDurable of - true -> msg_store_client_init(?PERSISTENT_MSG_STORE, + true -> msg_store_client_init(?PERSISTENT_MSG_STORE_SUP, MsgOnDiskFun, AsyncCallback, VHost); false -> undefined end, - msg_store_client_init(?TRANSIENT_MSG_STORE, undefined, + msg_store_client_init(?TRANSIENT_MSG_STORE_SUP, undefined, AsyncCallback, VHost)); %% We can be recovering a transient queue if it crashed @@ -518,7 +527,7 @@ init(#amqqueue { name = QueueName, durable = IsDurable }, Terms, VHost = QueueName#resource.virtual_host, {PersistentClient, ContainsCheckFun} = case IsDurable of - true -> C = msg_store_client_init(?PERSISTENT_MSG_STORE, PRef, + true -> C = msg_store_client_init(?PERSISTENT_MSG_STORE_SUP, PRef, MsgOnDiskFun, AsyncCallback, VHost), {C, fun (MsgId) when is_binary(MsgId) -> @@ -528,14 +537,14 @@ init(#amqqueue { name = QueueName, durable = IsDurable }, Terms, end}; false -> {undefined, fun(_MsgId) -> false end} end, - TransientClient = msg_store_client_init(?TRANSIENT_MSG_STORE, + TransientClient = msg_store_client_init(?TRANSIENT_MSG_STORE_SUP, undefined, AsyncCallback, VHost), {DeltaCount, DeltaBytes, IndexState} = rabbit_queue_index:recover( QueueName, RecoveryTerms, rabbit_msg_store_vhost_sup:successfully_recovered_state( - ?PERSISTENT_MSG_STORE, VHost), + ?PERSISTENT_MSG_STORE_SUP, VHost), ContainsCheckFun, MsgIdxOnDiskFun, MsgAndIdxOnDiskFun), init(IsDurable, IndexState, DeltaCount, DeltaBytes, RecoveryTerms, PersistentClient, TransientClient). @@ -1208,7 +1217,7 @@ msg_store_client_init(MsgStore, MsgOnDiskFun, Callback, VHost) -> Callback, VHost). msg_store_client_init(MsgStore, Ref, MsgOnDiskFun, Callback, VHost) -> - CloseFDsFun = msg_store_close_fds_fun(MsgStore =:= ?PERSISTENT_MSG_STORE), + CloseFDsFun = msg_store_close_fds_fun(MsgStore =:= ?PERSISTENT_MSG_STORE_SUP), rabbit_msg_store_vhost_sup:client_init(MsgStore, Ref, MsgOnDiskFun, fun () -> Callback(?MODULE, CloseFDsFun) @@ -2666,23 +2675,26 @@ multiple_routing_keys() -> %% Assumes message store is not running transform_storage(TransformFun) -> - transform_store(?PERSISTENT_MSG_STORE, TransformFun), - transform_store(?TRANSIENT_MSG_STORE, TransformFun). + transform_store(?PERSISTENT_MSG_STORE_SUP, TransformFun), + transform_store(?TRANSIENT_MSG_STORE_SUP, TransformFun). transform_store(Store, TransformFun) -> rabbit_msg_store:force_recovery(rabbit_mnesia:dir(), Store), rabbit_msg_store:transform_dir(rabbit_mnesia:dir(), Store, TransformFun). move_messages_to_vhost_store() -> - Queues = list_persistent_queues(), + Queues = rabbit_variable_queue:list_persistent_queues(), % Maybe recover old store. - {RecoveryTerms, StartFunState} = start_recovery_terms(Queues), - OldStore = run_old_persistent_store(RecoveryTerms, StartFunState), - NewStoreSup = start_new_store_sup(), - Migrations = spawn_for_each(fun(Queue) -> + {RecoveryTerms, StartFunState} = rabbit_variable_queue:start_recovery_terms(Queues), + OldStore = rabbit_variable_queue:run_old_persistent_store(RecoveryTerms, StartFunState), + NewStoreSup = rabbit_variable_queue:start_new_store_sup(), + lists:map(fun(Queue) -> migrate_queue(Queue, OldStore, NewStoreSup) end, Queues), - wait(Migrations), + % Migrations = spawn_for_each(fun(Queue) -> + % migrate_queue(Queue, OldStore, NewStoreSup) + % end, Queues), + % wait(Migrations), delete_old_store(OldStore). migrate_queue(Queue, OldStore, NewStoreSup) -> @@ -2697,8 +2709,19 @@ migrate_queue(Queue, OldStore, NewStoreSup) -> _ -> OldC end end, + OldStoreClient, Queue). +walk_queue_index(Fun, Client, #amqqueue{name = QueueName}) -> + % WARNING: State is being recovered and terminated. This can cause side effects! + rabbit_queue_index:scan_queue_segments( + fun (_SeqId, MsgId, _MsgProps, true, _IsDelivered, _IsAcked, ClientState) + when is_binary(MsgId) -> + Fun(MsgId, ClientState); + (_SeqId, _MsgId, _MsgProps, _IsPersistent, _IsDelivered, _IsAcked, ClientState) -> + ClientState + end, Client, QueueName). + spawn_for_each(Fun, List) -> Ref = erlang:make_ref(), Self = self(), @@ -2752,7 +2775,8 @@ list_persistent_queues() -> end). start_recovery_terms(Queues) -> - {AllTerms, StartFunState} = rabbit_queue_index:start(Queues), + QueueNames = [Name || #amqqueue{name = Name} <- Queues], + {AllTerms, StartFunState} = rabbit_queue_index:start(QueueNames), Refs = [Ref || Terms <- AllTerms, Terms /= non_clean_shutdown, begin @@ -2762,30 +2786,30 @@ start_recovery_terms(Queues) -> {Refs, StartFunState}. run_old_persistent_store(Refs, StartFunState) -> - OldStoreName = old_persistent_msg_store, + OldStoreName = ?PERSISTENT_MSG_STORE, ok = rabbit_sup:start_child(OldStoreName, rabbit_msg_store, [OldStoreName, rabbit_mnesia:dir(), Refs, StartFunState]), OldStoreName. -run_persistent_store(Vhost) -> - - - ?PERSISTENT_MSG_STORE. - start_new_store_sup() -> % Start persistent store sup without recovery. - ok = rabbit_sup:start_child(?PERSISTENT_MSG_STORE, rabbit_msg_store_vhost_sup, - [?PERSISTENT_MSG_STORE, rabbit_mnesia:dir(), + ok = rabbit_sup:start_child(?PERSISTENT_MSG_STORE_SUP, rabbit_msg_store_vhost_sup, + [?PERSISTENT_MSG_STORE_SUP, rabbit_mnesia:dir(), undefined, {fun (ok) -> finished end, ok}]), - ?PERSISTENT_MSG_STORE. + ?PERSISTENT_MSG_STORE_SUP. delete_old_store(OldStore) -> gen_server:stop(OldStore), - rabbit_file:recursive_delete( - filename:join([rabbit_mnesia:dir(), ?PERSISTENT_MSG_STORE])). - + rabbit_file:recursive_delete([filename:join([rabbit_mnesia:dir(), ?PERSISTENT_MSG_STORE])]). +setup() -> + application:load(rabbit), + mnesia:start(), + rabbit_sup:start_link(), + rabbit:start_fhc(), + rabbit_sup:start_restartable_child(rabbit_guid), + rabbit_sup:start_supervisor_child(worker_pool_sup). |
