summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_msg_store.erl4
-rw-r--r--src/rabbit_queue_index.erl16
-rw-r--r--src/rabbit_variable_queue.erl96
3 files changed, 70 insertions, 46 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 8e2b1c0d49..f033dea5f0 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -718,7 +718,7 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) ->
Dir = filename:join(BaseDir, atom_to_list(Server)),
- {ok, IndexModule} = application:get_env(msg_store_index_module),
+ {ok, IndexModule} = application:get_env(rabbit,msg_store_index_module),
rabbit_log:info("~w: using ~p to provide index~n", [Server, IndexModule]),
AttemptFileSummaryRecovery =
@@ -758,7 +758,7 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) ->
DyingIndex = ets:new(rabbit_msg_store_dying_client_index,
[set, public, {keypos, #dying_client.client_ref}]),
- {ok, FileSizeLimit} = application:get_env(msg_store_file_size_limit),
+ {ok, FileSizeLimit} = application:get_env(rabbit,msg_store_file_size_limit),
{ok, GCPid} = rabbit_msg_store_gc:start_link(
#gc_state { dir = Dir,
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 8b96bbffbd..3e9d92f5c1 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -23,6 +23,7 @@
read/3, next_segment_boundary/1, bounds/1, start/1, stop/0]).
-export([add_queue_ttl/0, avoid_zeroes/0, store_msg_size/0, store_msg/0]).
+-export([scan_queue_segments/3]).
-define(CLEAN_FILENAME, "clean.dot").
@@ -660,20 +661,19 @@ queue_index_walker({next, Gatherer}) when is_pid(Gatherer) ->
end.
queue_index_walker_reader(QueueName, Gatherer) ->
- State = blank_state(QueueName),
- ok = scan_segments(
+ ok = scan_queue_segments(
fun (_SeqId, MsgId, _MsgProps, true, _IsDelivered, no_ack, ok)
when is_binary(MsgId) ->
gatherer:sync_in(Gatherer, {MsgId, 1});
(_SeqId, _MsgId, _MsgProps, _IsPersistent, _IsDelivered,
_IsAcked, Acc) ->
Acc
- end, ok, State),
+ end, ok, QueueName),
ok = gatherer:finish(Gatherer).
-scan_segments(Fun, Acc, State) ->
- State1 = #qistate { segments = Segments, dir = Dir } =
- recover_journal(State),
+scan_queue_segments(Fun, Acc, QueueName) ->
+ State = #qistate { segments = Segments, dir = Dir } =
+ recover_journal(blank_state(QueueName)),
Result = lists:foldr(
fun (Seg, AccN) ->
segment_entries_foldr(
@@ -682,8 +682,8 @@ scan_segments(Fun, Acc, State) ->
Fun(reconstruct_seq_id(Seg, RelSeq), MsgOrId, MsgProps,
IsPersistent, IsDelivered, IsAcked, AccM)
end, AccN, segment_find_or_new(Seg, Dir, Segments))
- end, Acc, all_segment_nums(State1)),
- {_SegmentCounts, _State} = terminate(State1),
+ end, Acc, all_segment_nums(State)),
+ {_SegmentCounts, _State} = terminate(State),
Result.
%%----------------------------------------------------------------------------
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 01c412fd60..8460e7ffa9 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -33,6 +33,9 @@
%% exported for testing only
-export([start_msg_store/2, stop_msg_store/0, init/6]).
+-export([move_messages_to_vhost_store/0]).
+-include_lib("stdlib/include/qlc.hrl").
+
%%----------------------------------------------------------------------------
%% Messages, and their position in the queue, can be in memory or on
%% disk, or both. Persistent messages will have both message and
@@ -334,8 +337,11 @@
}).
-define(HEADER_GUESS_SIZE, 100). %% see determine_persist_to/2
+-define(PERSISTENT_MSG_STORE_SUP, msg_store_persistent_vhost).
+-define(TRANSIENT_MSG_STORE_SUP, msg_store_transient_vhost).
-define(PERSISTENT_MSG_STORE, msg_store_persistent).
-define(TRANSIENT_MSG_STORE, msg_store_transient).
+
-define(QUEUE, lqueue).
-include("rabbit.hrl").
@@ -344,7 +350,8 @@
%%----------------------------------------------------------------------------
-rabbit_upgrade({multiple_routing_keys, local, []}).
--rabbit_upgrade({move_messages_to_vhost_store, local, []}).
+% -rabbit_upgrade({move_messages_to_vhost_store, local, []}). requires mnesia, requires rabbit_sup, requires worker_pool, requires fhc
+-compile(export_all).
-type seq_id() :: non_neg_integer().
@@ -452,6 +459,8 @@
%% Public API
%%----------------------------------------------------------------------------
+
+
start(DurableQueues) ->
{AllTerms, StartFunState} = rabbit_queue_index:start(DurableQueues),
start_msg_store(
@@ -470,23 +479,23 @@ stop() ->
start_msg_store(Refs, StartFunState) ->
VHosts = rabbit_vhost:list(),
- ok = rabbit_sup:start_child(?TRANSIENT_MSG_STORE, rabbit_msg_store_vhost_sup,
- [?TRANSIENT_MSG_STORE, rabbit_mnesia:dir(),
+ ok = rabbit_sup:start_child(?TRANSIENT_MSG_STORE_SUP, rabbit_msg_store_vhost_sup,
+ [?TRANSIENT_MSG_STORE_SUP, rabbit_mnesia:dir(),
undefined, {fun (ok) -> finished end, ok}]),
- ok = rabbit_sup:start_child(?PERSISTENT_MSG_STORE, rabbit_msg_store_vhost_sup,
- [?PERSISTENT_MSG_STORE, rabbit_mnesia:dir(),
+ ok = rabbit_sup:start_child(?PERSISTENT_MSG_STORE_SUP, rabbit_msg_store_vhost_sup,
+ [?PERSISTENT_MSG_STORE_SUP, rabbit_mnesia:dir(),
Refs, StartFunState]),
lists:foreach(
fun(VHost) ->
- rabbit_msg_store_vhost_sup:add_vhost(?TRANSIENT_MSG_STORE, VHost),
- rabbit_msg_store_vhost_sup:add_vhost(?PERSISTENT_MSG_STORE, VHost)
+ rabbit_msg_store_vhost_sup:add_vhost(?TRANSIENT_MSG_STORE_SUP, VHost),
+ rabbit_msg_store_vhost_sup:add_vhost(?PERSISTENT_MSG_STORE_SUP, VHost)
end,
VHosts),
ok.
stop_msg_store() ->
- ok = rabbit_sup:stop_child(?PERSISTENT_MSG_STORE),
- ok = rabbit_sup:stop_child(?TRANSIENT_MSG_STORE).
+ ok = rabbit_sup:stop_child(?PERSISTENT_MSG_STORE_SUP),
+ ok = rabbit_sup:stop_child(?TRANSIENT_MSG_STORE_SUP).
init(Queue, Recover, Callback) ->
init(
@@ -504,11 +513,11 @@ init(#amqqueue { name = QueueName, durable = IsDurable }, new,
VHost = QueueName#resource.virtual_host,
init(IsDurable, IndexState, 0, 0, [],
case IsDurable of
- true -> msg_store_client_init(?PERSISTENT_MSG_STORE,
+ true -> msg_store_client_init(?PERSISTENT_MSG_STORE_SUP,
MsgOnDiskFun, AsyncCallback, VHost);
false -> undefined
end,
- msg_store_client_init(?TRANSIENT_MSG_STORE, undefined,
+ msg_store_client_init(?TRANSIENT_MSG_STORE_SUP, undefined,
AsyncCallback, VHost));
%% We can be recovering a transient queue if it crashed
@@ -518,7 +527,7 @@ init(#amqqueue { name = QueueName, durable = IsDurable }, Terms,
VHost = QueueName#resource.virtual_host,
{PersistentClient, ContainsCheckFun} =
case IsDurable of
- true -> C = msg_store_client_init(?PERSISTENT_MSG_STORE, PRef,
+ true -> C = msg_store_client_init(?PERSISTENT_MSG_STORE_SUP, PRef,
MsgOnDiskFun, AsyncCallback,
VHost),
{C, fun (MsgId) when is_binary(MsgId) ->
@@ -528,14 +537,14 @@ init(#amqqueue { name = QueueName, durable = IsDurable }, Terms,
end};
false -> {undefined, fun(_MsgId) -> false end}
end,
- TransientClient = msg_store_client_init(?TRANSIENT_MSG_STORE,
+ TransientClient = msg_store_client_init(?TRANSIENT_MSG_STORE_SUP,
undefined, AsyncCallback,
VHost),
{DeltaCount, DeltaBytes, IndexState} =
rabbit_queue_index:recover(
QueueName, RecoveryTerms,
rabbit_msg_store_vhost_sup:successfully_recovered_state(
- ?PERSISTENT_MSG_STORE, VHost),
+ ?PERSISTENT_MSG_STORE_SUP, VHost),
ContainsCheckFun, MsgIdxOnDiskFun, MsgAndIdxOnDiskFun),
init(IsDurable, IndexState, DeltaCount, DeltaBytes, RecoveryTerms,
PersistentClient, TransientClient).
@@ -1208,7 +1217,7 @@ msg_store_client_init(MsgStore, MsgOnDiskFun, Callback, VHost) ->
Callback, VHost).
msg_store_client_init(MsgStore, Ref, MsgOnDiskFun, Callback, VHost) ->
- CloseFDsFun = msg_store_close_fds_fun(MsgStore =:= ?PERSISTENT_MSG_STORE),
+ CloseFDsFun = msg_store_close_fds_fun(MsgStore =:= ?PERSISTENT_MSG_STORE_SUP),
rabbit_msg_store_vhost_sup:client_init(MsgStore, Ref, MsgOnDiskFun,
fun () ->
Callback(?MODULE, CloseFDsFun)
@@ -2666,23 +2675,26 @@ multiple_routing_keys() ->
%% Assumes message store is not running
transform_storage(TransformFun) ->
- transform_store(?PERSISTENT_MSG_STORE, TransformFun),
- transform_store(?TRANSIENT_MSG_STORE, TransformFun).
+ transform_store(?PERSISTENT_MSG_STORE_SUP, TransformFun),
+ transform_store(?TRANSIENT_MSG_STORE_SUP, TransformFun).
transform_store(Store, TransformFun) ->
rabbit_msg_store:force_recovery(rabbit_mnesia:dir(), Store),
rabbit_msg_store:transform_dir(rabbit_mnesia:dir(), Store, TransformFun).
move_messages_to_vhost_store() ->
- Queues = list_persistent_queues(),
+ Queues = rabbit_variable_queue:list_persistent_queues(),
% Maybe recover old store.
- {RecoveryTerms, StartFunState} = start_recovery_terms(Queues),
- OldStore = run_old_persistent_store(RecoveryTerms, StartFunState),
- NewStoreSup = start_new_store_sup(),
- Migrations = spawn_for_each(fun(Queue) ->
+ {RecoveryTerms, StartFunState} = rabbit_variable_queue:start_recovery_terms(Queues),
+ OldStore = rabbit_variable_queue:run_old_persistent_store(RecoveryTerms, StartFunState),
+ NewStoreSup = rabbit_variable_queue:start_new_store_sup(),
+ lists:map(fun(Queue) ->
migrate_queue(Queue, OldStore, NewStoreSup)
end, Queues),
- wait(Migrations),
+ % Migrations = spawn_for_each(fun(Queue) ->
+ % migrate_queue(Queue, OldStore, NewStoreSup)
+ % end, Queues),
+ % wait(Migrations),
delete_old_store(OldStore).
migrate_queue(Queue, OldStore, NewStoreSup) ->
@@ -2697,8 +2709,19 @@ migrate_queue(Queue, OldStore, NewStoreSup) ->
_ -> OldC
end
end,
+ OldStoreClient,
Queue).
+walk_queue_index(Fun, Client, #amqqueue{name = QueueName}) ->
+ % WARNING: State is being recovered and terminated. This can cause side effects!
+ rabbit_queue_index:scan_queue_segments(
+ fun (_SeqId, MsgId, _MsgProps, true, _IsDelivered, _IsAcked, ClientState)
+ when is_binary(MsgId) ->
+ Fun(MsgId, ClientState);
+ (_SeqId, _MsgId, _MsgProps, _IsPersistent, _IsDelivered, _IsAcked, ClientState) ->
+ ClientState
+ end, Client, QueueName).
+
spawn_for_each(Fun, List) ->
Ref = erlang:make_ref(),
Self = self(),
@@ -2752,7 +2775,8 @@ list_persistent_queues() ->
end).
start_recovery_terms(Queues) ->
- {AllTerms, StartFunState} = rabbit_queue_index:start(Queues),
+ QueueNames = [Name || #amqqueue{name = Name} <- Queues],
+ {AllTerms, StartFunState} = rabbit_queue_index:start(QueueNames),
Refs = [Ref || Terms <- AllTerms,
Terms /= non_clean_shutdown,
begin
@@ -2762,30 +2786,30 @@ start_recovery_terms(Queues) ->
{Refs, StartFunState}.
run_old_persistent_store(Refs, StartFunState) ->
- OldStoreName = old_persistent_msg_store,
+ OldStoreName = ?PERSISTENT_MSG_STORE,
ok = rabbit_sup:start_child(OldStoreName, rabbit_msg_store,
[OldStoreName, rabbit_mnesia:dir(),
Refs, StartFunState]),
OldStoreName.
-run_persistent_store(Vhost) ->
-
-
- ?PERSISTENT_MSG_STORE.
-
start_new_store_sup() ->
% Start persistent store sup without recovery.
- ok = rabbit_sup:start_child(?PERSISTENT_MSG_STORE, rabbit_msg_store_vhost_sup,
- [?PERSISTENT_MSG_STORE, rabbit_mnesia:dir(),
+ ok = rabbit_sup:start_child(?PERSISTENT_MSG_STORE_SUP, rabbit_msg_store_vhost_sup,
+ [?PERSISTENT_MSG_STORE_SUP, rabbit_mnesia:dir(),
undefined, {fun (ok) -> finished end, ok}]),
- ?PERSISTENT_MSG_STORE.
+ ?PERSISTENT_MSG_STORE_SUP.
delete_old_store(OldStore) ->
gen_server:stop(OldStore),
- rabbit_file:recursive_delete(
- filename:join([rabbit_mnesia:dir(), ?PERSISTENT_MSG_STORE])).
-
+ rabbit_file:recursive_delete([filename:join([rabbit_mnesia:dir(), ?PERSISTENT_MSG_STORE])]).
+setup() ->
+ application:load(rabbit),
+ mnesia:start(),
+ rabbit_sup:start_link(),
+ rabbit:start_fhc(),
+ rabbit_sup:start_restartable_child(rabbit_guid),
+ rabbit_sup:start_supervisor_child(worker_pool_sup).