summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--docs/rabbitmqctl.849
-rw-r--r--src/rabbit_amqqueue.erl65
-rw-r--r--src/rabbit_control_pbe.erl105
-rw-r--r--src/rabbit_exchange_parameters.erl5
-rw-r--r--src/rabbit_looking_glass.erl2
-rw-r--r--src/rabbit_queue_index.erl111
-rw-r--r--src/rabbit_runtime_parameters.erl15
-rw-r--r--src/rabbit_vhost.erl2
-rw-r--r--test/cluster_SUITE.erl5
-rw-r--r--test/clustering_management_SUITE.erl1
-rw-r--r--test/unit_SUITE.erl36
11 files changed, 217 insertions, 179 deletions
diff --git a/docs/rabbitmqctl.8 b/docs/rabbitmqctl.8
index 6f8d40eaf2..7a8497a89b 100644
--- a/docs/rabbitmqctl.8
+++ b/docs/rabbitmqctl.8
@@ -1811,42 +1811,55 @@ floating point number.
Values lower than 1.0 can be dangerous and should be used carefully.
.El
.\" ------------------------------------
-.It Cm encode Oo Fl -decode Oc Oo Ar value Oc Oo Ar passphrase Oc Oo Fl -list-ciphers Oc Oo Fl -list-hashes Oc Oo Fl -cipher Ar cipher Oc Oo Fl -hash Ar hash Oc Op Fl -iterations Ar iterations
+.It Cm encode Ar value Ar passphrase Oo Fl -cipher Ar cipher Oc Oo Fl -hash Ar hash Oc Op Fl -iterations Ar iterations
.Bl -tag -width Ds
-.It Fl -decode
-Flag to decrypt the input value.
-.Pp
-For example:
-.sp
-.Dl rabbitmqctl encode --decode '{encrypted,<<"...">>}' mypassphrase
.It Ar value Ar passphrase
-Value to encrypt/decrypt and passphrase.
+Value to encrypt and passphrase.
.Pp
For example:
.sp
.Dl rabbitmqctl encode '<<"guest">>' mypassphrase
-.sp
-.Dl rabbitmqctl encode --decode '{encrypted,<<"...">>}' mypassphrase
-.It Fl -list-ciphers
-Flag to list the supported ciphers.
+.It Fl -cipher Ar cipher Fl -hash Ar hash Fl -iterations Ar iterations
+Options to specify the encryption settings.
+They can be used independently.
.Pp
For example:
.sp
-.Dl rabbitmqctl encode --list-ciphers
-.It Fl -list-hashes
-Flag to list the supported hash algorithms.
+.Dl rabbitmqctl encode --cipher blowfish_cfb64 --hash sha256 --iterations 10000 '<<"guest">>' mypassphrase
+.El
+.\" ------------------------------------
+.It Cm decode Ar value Ar passphrase Oo Fl -cipher Ar cipher Oc Oo Fl -hash Ar hash Oc Op Fl -iterations Ar iterations
+.Bl -tag -width Ds
+.It Ar value Ar passphrase
+Value to decrypt (as produced by the encode command) and passphrase.
.Pp
For example:
.sp
-.Dl rabbitmqctl encode --list-hashes
+.Dl rabbitmqctl decode '{encrypted, <<"...">>}' mypassphrase
.It Fl -cipher Ar cipher Fl -hash Ar hash Fl -iterations Ar iterations
-Options to specify the encryption settings.
+Options to specify the decryption settings.
They can be used independently.
.Pp
For example:
.sp
-.Dl rabbitmqctl encode --cipher blowfish_cfb64 --hash sha256 --iterations 10000 '<<"guest">>' mypassphrase
+.Dl rabbitmqctl decode --cipher blowfish_cfb64 --hash sha256 --iterations 10000 '{encrypted,<<"...">>} mypassphrase
.El
+.\" ------------------------------------
+.It Cm list_hashes
+Lists hash functions supported by encoding commands.
+.Pp
+For example, this command instructs the RabbitMQ broker to list all hash
+functions supported by encoding commands:
+.sp
+.Dl rabbitmqctl list_hashes
+.\" ------------------------------------
+.It Cm list_ciphers
+Lists cipher suites supported by encoding commands.
+.Pp
+For example, this command instructs the RabbitMQ broker to list all
+cipher suites supported by encoding commands:
+.sp
+.Dl rabbitmqctl list_ciphers
.El
.\" ------------------------------------------------------------------
.Sh PLUGIN COMMANDS
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 03dac03b7e..bd6199c166 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -633,12 +633,12 @@ info_keys() -> rabbit_amqqueue_process:info_keys().
map(Qs, F) -> rabbit_misc:filter_exit_map(F, Qs).
info(Q = #amqqueue{ state = crashed }) -> info_down(Q, crashed);
-info(#amqqueue{ pid = QPid }) -> delegate:call(QPid, info).
+info(#amqqueue{ pid = QPid }) -> delegate:invoke(QPid, {gen_server2, call, [info, infinity]}).
info(Q = #amqqueue{ state = crashed }, Items) ->
info_down(Q, Items, crashed);
info(#amqqueue{ pid = QPid }, Items) ->
- case delegate:call(QPid, {info, Items}) of
+ case delegate:invoke(QPid, {gen_server2, call, [{info, Items}, infinity]}) of
{ok, Res} -> Res;
{error, Error} -> throw(Error)
end.
@@ -699,7 +699,8 @@ force_event_refresh(Ref) ->
notify_policy_changed(#amqqueue{pid = QPid}) ->
gen_server2:cast(QPid, policy_changed).
-consumers(#amqqueue{ pid = QPid }) -> delegate:call(QPid, consumers).
+consumers(#amqqueue{ pid = QPid }) ->
+ delegate:invoke(QPid, {gen_server2, call, [consumers, infinity]}).
consumer_info_keys() -> ?CONSUMER_INFO_KEYS.
@@ -727,7 +728,7 @@ get_queue_consumer_info(Q, ConsumerInfoKeys) ->
AckRequired, Prefetch, Args]) ||
{ChPid, CTag, AckRequired, Prefetch, Args, _} <- consumers(Q)].
-stat(#amqqueue{pid = QPid}) -> delegate:call(QPid, stat).
+stat(#amqqueue{pid = QPid}) -> delegate:invoke(QPid, {gen_server2, call, [stat, infinity]}).
pid_of(#amqqueue{pid = Pid}) -> Pid.
pid_of(VHost, QueueName) ->
@@ -745,7 +746,7 @@ delete_immediately(QPids) ->
ok.
delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty, ActingUser) ->
- delegate:call(QPid, {delete, IfUnused, IfEmpty, ActingUser}).
+ delegate:invoke(QPid, {gen_server2, call, [{delete, IfUnused, IfEmpty, ActingUser}, infinity]}).
delete_crashed(Q) ->
delete_crashed(Q, ?INTERNAL_USER).
@@ -758,21 +759,24 @@ delete_crashed_internal(Q = #amqqueue{ name = QName }, ActingUser) ->
BQ:delete_crashed(Q),
ok = internal_delete(QName, ActingUser).
-purge(#amqqueue{ pid = QPid }) -> delegate:call(QPid, purge).
+purge(#amqqueue{ pid = QPid }) ->
+ delegate:invoke(QPid, {gen_server2, call, [purge, infinity]}).
-requeue(QPid, MsgIds, ChPid) -> delegate:call(QPid, {requeue, MsgIds, ChPid}).
+requeue(QPid, MsgIds, ChPid) ->
+ delegate:invoke(QPid, {gen_server2, call, [{requeue, MsgIds, ChPid}, infinity]}).
-ack(QPid, MsgIds, ChPid) -> delegate:cast(QPid, {ack, MsgIds, ChPid}).
+ack(QPid, MsgIds, ChPid) ->
+ delegate:invoke_no_result(QPid, {gen_server2, cast, [{ack, MsgIds, ChPid}]}).
reject(QPid, Requeue, MsgIds, ChPid) ->
- delegate:cast(QPid, {reject, Requeue, MsgIds, ChPid}).
+ delegate:invoke_no_result(QPid, {gen_server2, cast, [{reject, Requeue, MsgIds, ChPid}]}).
notify_down_all(QPids, ChPid) ->
notify_down_all(QPids, ChPid, ?CHANNEL_OPERATION_TIMEOUT).
notify_down_all(QPids, ChPid, Timeout) ->
- case rpc:call(node(), delegate, call,
- [QPids, {notify_down, ChPid}], Timeout) of
+ case rpc:call(node(), delegate, invoke,
+ [QPids, {gen_server2, call, [{notify_down, ChPid}, infinity]}], Timeout) of
{badrpc, timeout} -> {error, {channel_operation_timeout, Timeout}};
{badrpc, Reason} -> {error, Reason};
{_, Bads} ->
@@ -788,27 +792,29 @@ notify_down_all(QPids, ChPid, Timeout) ->
end.
activate_limit_all(QPids, ChPid) ->
- delegate:cast(QPids, {activate_limit, ChPid}).
+ delegate:invoke_no_result(QPids, {gen_server2, cast, [{activate_limit, ChPid}]}).
credit(#amqqueue{pid = QPid}, ChPid, CTag, Credit, Drain) ->
- delegate:cast(QPid, {credit, ChPid, CTag, Credit, Drain}).
+ delegate:invoke_no_result(QPid, {gen_server2, cast, [{credit, ChPid, CTag, Credit, Drain}]}).
basic_get(#amqqueue{pid = QPid}, ChPid, NoAck, LimiterPid) ->
- delegate:call(QPid, {basic_get, ChPid, NoAck, LimiterPid}).
+ delegate:invoke(QPid, {gen_server2, call, [{basic_get, ChPid, NoAck, LimiterPid}, infinity]}).
basic_consume(#amqqueue{pid = QPid, name = QName}, NoAck, ChPid, LimiterPid,
LimiterActive, ConsumerPrefetchCount, ConsumerTag,
ExclusiveConsume, Args, OkMsg, ActingUser) ->
ok = check_consume_arguments(QName, Args),
- delegate:call(QPid, {basic_consume, NoAck, ChPid, LimiterPid, LimiterActive,
- ConsumerPrefetchCount, ConsumerTag, ExclusiveConsume,
- Args, OkMsg, ActingUser}).
+ delegate:invoke(QPid, {gen_server2, call,
+ [{basic_consume, NoAck, ChPid, LimiterPid, LimiterActive,
+ ConsumerPrefetchCount, ConsumerTag, ExclusiveConsume,
+ Args, OkMsg, ActingUser}, infinity]}).
basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg, ActingUser) ->
- delegate:call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg, ActingUser}).
+ delegate:invoke(QPid, {gen_server2, call,
+ [{basic_cancel, ChPid, ConsumerTag, OkMsg, ActingUser}, infinity]}).
notify_decorators(#amqqueue{pid = QPid}) ->
- delegate:cast(QPid, notify_decorators).
+ delegate:invoke_no_result(QPid, {gen_server2, cast, [notify_decorators]}).
notify_sent(QPid, ChPid) ->
rabbit_amqqueue_common:notify_sent(QPid, ChPid).
@@ -816,7 +822,7 @@ notify_sent(QPid, ChPid) ->
notify_sent_queue_down(QPid) ->
rabbit_amqqueue_common:notify_sent_queue_down(QPid).
-resume(QPid, ChPid) -> delegate:cast(QPid, {resume, ChPid}).
+resume(QPid, ChPid) -> delegate:invoke_no_result(QPid, {gen_server2, cast, [{resume, ChPid}]}).
internal_delete1(QueueName, OnlyDurable) ->
ok = mnesia:delete({rabbit_queue, QueueName}),
@@ -913,12 +919,17 @@ set_ram_duration_target(QPid, Duration) ->
set_maximum_since_use(QPid, Age) ->
gen_server2:cast(QPid, {set_maximum_since_use, Age}).
-update_mirroring(QPid) -> ok = delegate:cast(QPid, update_mirroring).
+update_mirroring(QPid) ->
+ ok = delegate:invoke_no_result(QPid, {gen_server2, cast, [update_mirroring]}).
-sync_mirrors(#amqqueue{pid = QPid}) -> delegate:call(QPid, sync_mirrors);
-sync_mirrors(QPid) -> delegate:call(QPid, sync_mirrors).
-cancel_sync_mirrors(#amqqueue{pid = QPid}) -> delegate:call(QPid, cancel_sync_mirrors);
-cancel_sync_mirrors(QPid) -> delegate:call(QPid, cancel_sync_mirrors).
+sync_mirrors(#amqqueue{pid = QPid}) ->
+ delegate:invoke(QPid, {gen_server2, call, [sync_mirrors, infinity]});
+sync_mirrors(QPid) ->
+ delegate:invoke(QPid, {gen_server2, call, [sync_mirrors, infinity]}).
+cancel_sync_mirrors(#amqqueue{pid = QPid}) ->
+ delegate:invoke(QPid, {gen_server2, call, [cancel_sync_mirrors, infinity]});
+cancel_sync_mirrors(QPid) ->
+ delegate:invoke(QPid, {gen_server2, call, [cancel_sync_mirrors, infinity]}).
is_mirrored(Q) ->
rabbit_mirror_queue_misc:is_mirrored(Q).
@@ -1037,8 +1048,8 @@ deliver(Qs, Delivery = #delivery{flow = Flow}) ->
%% done with it.
MMsg = {deliver, Delivery, false},
SMsg = {deliver, Delivery, true},
- delegate:cast(MPids, MMsg),
- delegate:cast(SPids, SMsg),
+ delegate:invoke_no_result(MPids, {gen_server2, cast, [MMsg]}),
+ delegate:invoke_no_result(SPids, {gen_server2, cast, [SMsg]}),
QPids.
qpids([]) -> {[], []}; %% optimisation
diff --git a/src/rabbit_control_pbe.erl b/src/rabbit_control_pbe.erl
index ff498ed4aa..40d8741d74 100644
--- a/src/rabbit_control_pbe.erl
+++ b/src/rabbit_control_pbe.erl
@@ -16,61 +16,74 @@
-module(rabbit_control_pbe).
--export([encode/7]).
+-export([decode/4, encode/4, list_ciphers/0, list_hashes/0]).
% for testing purposes
-export([evaluate_input_as_term/1]).
-encode(ListCiphers, _ListHashes, _Decode, _Cipher, _Hash, _Iterations, _Args) when ListCiphers ->
- {ok, io_lib:format("~p", [rabbit_pbe:supported_ciphers()])};
+list_ciphers() ->
+ {ok, io_lib:format("~p", [rabbit_pbe:supported_ciphers()])}.
-encode(_ListCiphers, ListHashes, _Decode, _Cipher, _Hash, _Iterations, _Args) when ListHashes ->
- {ok, io_lib:format("~p", [rabbit_pbe:supported_hashes()])};
+list_hashes() ->
+ {ok, io_lib:format("~p", [rabbit_pbe:supported_hashes()])}.
-encode(_ListCiphers, _ListHashes, Decode, Cipher, Hash, Iterations, Args) ->
- CipherExists = lists:member(Cipher, rabbit_pbe:supported_ciphers()),
- HashExists = lists:member(Hash, rabbit_pbe:supported_hashes()),
- encode_encrypt_decrypt(CipherExists, HashExists, Decode, Cipher, Hash, Iterations, Args).
-
-encode_encrypt_decrypt(CipherExists, _HashExists, _Decode, _Cipher, _Hash, _Iterations, _Args) when CipherExists =:= false ->
- {error, io_lib:format("The requested cipher is not supported", [])};
-
-encode_encrypt_decrypt(_CipherExists, HashExists, _Decode, _Cipher, _Hash, _Iterations, _Args) when HashExists =:= false ->
- {error, io_lib:format("The requested hash is not supported", [])};
-
-encode_encrypt_decrypt(_CipherExists, _HashExists, _Decode, _Cipher, _Hash, Iterations, _Args) when Iterations =< 0 ->
+validate(_Cipher, _Hash, Iterations, _Args) when Iterations =< 0 ->
{error, io_lib:format("The requested number of iterations is incorrect", [])};
+validate(_Cipher, _Hash, _Iterations, Args) when length(Args) < 2 ->
+ {error, io_lib:format("Please provide a value to encode/decode and a passphrase", [])};
+validate(_Cipher, _Hash, _Iterations, Args) when length(Args) > 2 ->
+ {error, io_lib:format("Too many arguments. Please provide a value to encode/decode and a passphrase", [])};
+validate(Cipher, Hash, _Iterations, _Args) ->
+ case lists:member(Cipher, rabbit_pbe:supported_ciphers()) of
+ false ->
+ {error, io_lib:format("The requested cipher is not supported", [])};
+ true ->
+ case lists:member(Hash, rabbit_pbe:supported_hashes()) of
+ false ->
+ {error, io_lib:format("The requested hash is not supported", [])};
+ true -> ok
+ end
+ end.
-encode_encrypt_decrypt(_CipherExists, _HashExists, Decode, Cipher, Hash, Iterations, Args) when length(Args) == 2, Decode =:= false ->
- [Value, PassPhrase] = Args,
- try begin
- TermValue = evaluate_input_as_term(Value),
- Result = rabbit_pbe:encrypt_term(Cipher, Hash, Iterations, list_to_binary(PassPhrase), TermValue),
- {ok, io_lib:format("~p", [{encrypted, Result}])}
- end
- catch
- _:Msg -> {error, io_lib:format("Error during cipher operation: ~p", [Msg])}
- end;
-
-encode_encrypt_decrypt(_CipherExists, _HashExists, Decode, Cipher, Hash, Iterations, Args) when length(Args) == 2, Decode ->
- [Value, PassPhrase] = Args,
- try begin
- TermValue = evaluate_input_as_term(Value),
- TermToDecrypt = case TermValue of
- {encrypted, EncryptedTerm} ->
- EncryptedTerm;
- _ ->
- TermValue
- end,
- Result = rabbit_pbe:decrypt_term(Cipher, Hash, Iterations, list_to_binary(PassPhrase), TermToDecrypt),
- {ok, io_lib:format("~p", [Result])}
- end
- catch
- _:Msg -> {error, io_lib:format("Error during cipher operation: ~p", [Msg])}
- end;
+encode(Cipher, Hash, Iterations, Args) ->
+ case validate(Cipher, Hash, Iterations, Args) of
+ {error, Err} -> {error, Err};
+ ok ->
+ [Value, PassPhrase] = Args,
+ try begin
+ TermValue = evaluate_input_as_term(Value),
+ Result = rabbit_pbe:encrypt_term(Cipher, Hash, Iterations,
+ list_to_binary(PassPhrase),
+ TermValue),
+ {ok, io_lib:format("~p", [{encrypted, Result}])}
+ end
+ catch
+ _:Msg -> {error, io_lib:format("Error during cipher operation: ~p", [Msg])}
+ end
+ end.
-encode_encrypt_decrypt(_CipherExists, _HashExists, _Decode, _Cipher, _Hash, _Iterations, _Args) ->
- {error, io_lib:format("Please provide a value to encode/decode and a passphrase", [])}.
+decode(Cipher, Hash, Iterations, Args) ->
+ case validate(Cipher, Hash, Iterations, Args) of
+ {error, Err} -> {error, Err};
+ ok ->
+ [Value, PassPhrase] = Args,
+ try begin
+ TermValue = evaluate_input_as_term(Value),
+ TermToDecrypt = case TermValue of
+ {encrypted, EncryptedTerm} ->
+ EncryptedTerm;
+ _ ->
+ TermValue
+ end,
+ Result = rabbit_pbe:decrypt_term(Cipher, Hash, Iterations,
+ list_to_binary(PassPhrase),
+ TermToDecrypt),
+ {ok, io_lib:format("~p", [Result])}
+ end
+ catch
+ _:Msg -> {error, io_lib:format("Error during cipher operation: ~p", [Msg])}
+ end
+ end.
evaluate_input_as_term(Input) ->
{ok,Tokens,_EndLine} = erl_scan:string(Input ++ "."),
diff --git a/src/rabbit_exchange_parameters.erl b/src/rabbit_exchange_parameters.erl
index feba5e255b..2c95a823ee 100644
--- a/src/rabbit_exchange_parameters.erl
+++ b/src/rabbit_exchange_parameters.erl
@@ -23,8 +23,6 @@
-export([register/0]).
-export([validate/5, notify/5, notify_clear/4]).
--import(rabbit_misc, [pget/2]).
-
-rabbit_boot_step({?MODULE,
[{description, "exchange parameters"},
{mfa, {rabbit_exchange_parameters, register, []}},
@@ -36,7 +34,8 @@ register() ->
?EXCHANGE_DELETE_IN_PROGRESS_COMPONENT, ?MODULE),
%% ensure there are no leftovers from before node restart/crash
rabbit_runtime_parameters:clear_component(
- ?EXCHANGE_DELETE_IN_PROGRESS_COMPONENT),
+ ?EXCHANGE_DELETE_IN_PROGRESS_COMPONENT,
+ ?INTERNAL_USER),
ok.
validate(_VHost, ?EXCHANGE_DELETE_IN_PROGRESS_COMPONENT, _Name, _Term, _User) ->
diff --git a/src/rabbit_looking_glass.erl b/src/rabbit_looking_glass.erl
index 71d7b067b8..c6c353d552 100644
--- a/src/rabbit_looking_glass.erl
+++ b/src/rabbit_looking_glass.erl
@@ -16,6 +16,8 @@
-module(rabbit_looking_glass).
+-ignore_xref([{lg, trace, 4}]).
+
-export([boot/0]).
-export([connections/0]).
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index ee172d48e5..e9776f47ad 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -116,6 +116,7 @@
%% ---- Journal details ----
-define(JOURNAL_FILENAME, "journal.jif").
+-define(QUEUE_NAME_STUB_FILE, ".queue_name").
-define(PUB_PERSIST_JPREFIX, 2#00).
-define(PUB_TRANS_JPREFIX, 2#01).
@@ -204,7 +205,9 @@
%% optimisation
pre_publish_cache,
%% optimisation
- delivered_cache}).
+ delivered_cache,
+ %% queue name resource record
+ queue_name}).
-record(segment, {
%% segment ID (an integer)
@@ -295,7 +298,8 @@ erase(Name) ->
erase_index_dir(Dir).
%% used during variable queue purge when there are no pending acks
-reset_state(#qistate{ dir = Dir,
+reset_state(#qistate{ queue_name = Name,
+ dir = Dir,
on_sync = OnSyncFun,
on_sync_msg = OnSyncMsgFun,
journal_handle = JournalHdl }) ->
@@ -304,7 +308,7 @@ reset_state(#qistate{ dir = Dir,
_ -> file_handle_cache:close(JournalHdl)
end,
ok = erase_index_dir(Dir),
- blank_state_dir_funs(Dir, OnSyncFun, OnSyncMsgFun).
+ blank_state_name_dir_funs(Name, Dir, OnSyncFun, OnSyncMsgFun).
init(Name, OnSyncFun, OnSyncMsgFun) ->
State = #qistate { dir = Dir } = blank_state(Name),
@@ -520,32 +524,6 @@ start(VHost, DurableQueueNames) ->
{OrderedTerms, {fun queue_index_walker/1, {start, DurableQueueNames}}}.
-read_global_recovery_terms(DurableQueueNames) ->
- ok = rabbit_recovery_terms:open_global_table(),
-
- DurableTerms =
- lists:foldl(
- fun(QName, RecoveryTerms) ->
- DirName = queue_name_to_dir_name(QName),
- RecoveryInfo = case rabbit_recovery_terms:read_global(DirName) of
- {error, _} -> non_clean_shutdown;
- {ok, Terms} -> Terms
- end,
- [RecoveryInfo | RecoveryTerms]
- end, [], DurableQueueNames),
-
- ok = rabbit_recovery_terms:close_global_table(),
- %% The backing queue interface requires that the queue recovery terms
- %% which come back from start/1 are in the same order as DurableQueueNames
- OrderedTerms = lists:reverse(DurableTerms),
- {OrderedTerms, {fun queue_index_walker/1, {start, DurableQueueNames}}}.
-
-cleanup_global_recovery_terms() ->
- rabbit_file:recursive_delete([filename:join([queues_base_dir(), "queues"])]),
- rabbit_recovery_terms:delete_global_table(),
- ok.
-
-
stop(VHost) -> rabbit_recovery_terms:stop(VHost).
all_queue_directory_names(VHost) ->
@@ -567,10 +545,9 @@ erase_index_dir(Dir) ->
end.
blank_state(QueueName) ->
- blank_state_dir(queue_dir(QueueName)).
-
-blank_state_dir(Dir) ->
- blank_state_dir_funs(Dir,
+ Dir = queue_dir(QueueName),
+ blank_state_name_dir_funs(QueueName,
+ Dir,
fun (_) -> ok end,
fun (_) -> ok end).
@@ -581,7 +558,20 @@ queue_dir(#resource{ virtual_host = VHost } = QueueName) ->
QueueDir = queue_name_to_dir_name(QueueName),
filename:join([VHostDir, "queues", QueueDir]).
-blank_state_dir_funs(Dir, OnSyncFun, OnSyncMsgFun) ->
+queue_name_to_dir_name(#resource { kind = queue,
+ virtual_host = VHost,
+ name = QName }) ->
+ <<Num:128>> = erlang:md5(<<"queue", VHost/binary, QName/binary>>),
+ rabbit_misc:format("~.36B", [Num]).
+
+queue_name_to_dir_name_legacy(Name = #resource { kind = queue }) ->
+ <<Num:128>> = erlang:md5(term_to_binary_compat:term_to_binary_1(Name)),
+ rabbit_misc:format("~.36B", [Num]).
+
+queues_base_dir() ->
+ rabbit_mnesia:dir().
+
+blank_state_name_dir_funs(Name, Dir, OnSyncFun, OnSyncMsgFun) ->
{ok, MaxJournal} =
application:get_env(rabbit, queue_index_max_journal_entries),
#qistate { dir = Dir,
@@ -594,7 +584,8 @@ blank_state_dir_funs(Dir, OnSyncFun, OnSyncMsgFun) ->
unconfirmed = gb_sets:new(),
unconfirmed_msg = gb_sets:new(),
pre_publish_cache = [],
- delivered_cache = [] }.
+ delivered_cache = [],
+ queue_name = Name }.
init_clean(RecoveredCounts, State) ->
%% Load the journal. Since this is a clean recovery this (almost)
@@ -690,13 +681,6 @@ recover_message(false, _, no_del, RelSeq, {Segment, DirtyCount}) ->
add_to_journal(RelSeq, del, Segment)),
DirtyCount + 2}.
-queue_name_to_dir_name(Name = #resource { kind = queue }) ->
- <<Num:128>> = erlang:md5(term_to_binary_compat:term_to_binary_1(Name)),
- rabbit_misc:format("~.36B", [Num]).
-
-queues_base_dir() ->
- rabbit_mnesia:dir().
-
%%----------------------------------------------------------------------------
%% msg store startup delta function
%%----------------------------------------------------------------------------
@@ -890,9 +874,11 @@ append_journal_to_segment(#segment { journal_entries = JEntries,
end.
get_journal_handle(State = #qistate { journal_handle = undefined,
- dir = Dir }) ->
+ dir = Dir,
+ queue_name = Name }) ->
Path = filename:join(Dir, ?JOURNAL_FILENAME),
ok = rabbit_file:ensure_dir(Path),
+ ok = ensure_queue_name_stub_file(Dir, Name),
{ok, Hdl} = file_handle_cache:open_with_absolute_path(
Path, ?WRITE_MODE, [{write_buffer, infinity}]),
{Hdl, State #qistate { journal_handle = Hdl }};
@@ -1413,7 +1399,8 @@ store_msg_segment(_) ->
-
+%%----------------------------------------------------------------------------
+%% Migration functions
%%----------------------------------------------------------------------------
foreach_queue_index(Funs) ->
@@ -1467,18 +1454,50 @@ drive_transform_fun(Fun, Hdl, Contents) ->
move_to_per_vhost_stores(#resource{} = QueueName) ->
OldQueueDir = filename:join([queues_base_dir(), "queues",
- queue_name_to_dir_name(QueueName)]),
+ queue_name_to_dir_name_legacy(QueueName)]),
NewQueueDir = queue_dir(QueueName),
case rabbit_file:is_dir(OldQueueDir) of
true ->
ok = rabbit_file:ensure_dir(NewQueueDir),
- ok = rabbit_file:rename(OldQueueDir, NewQueueDir);
+ ok = rabbit_file:rename(OldQueueDir, NewQueueDir),
+ ok = ensure_queue_name_stub_file(NewQueueDir, QueueName);
false ->
rabbit_log:info("Queue index directory not found for queue ~p~n",
[QueueName])
end,
ok.
+ensure_queue_name_stub_file(Dir, #resource{virtual_host = VHost, name = QName}) ->
+ QueueNameFile = filename:join(Dir, ?QUEUE_NAME_STUB_FILE),
+ file:write_file(QueueNameFile, <<"VHOST: ", VHost/binary, "\n",
+ "QUEUE: ", QName/binary, "\n">>).
+
+read_global_recovery_terms(DurableQueueNames) ->
+ ok = rabbit_recovery_terms:open_global_table(),
+
+ DurableTerms =
+ lists:foldl(
+ fun(QName, RecoveryTerms) ->
+ DirName = queue_name_to_dir_name_legacy(QName),
+ RecoveryInfo = case rabbit_recovery_terms:read_global(DirName) of
+ {error, _} -> non_clean_shutdown;
+ {ok, Terms} -> Terms
+ end,
+ [RecoveryInfo | RecoveryTerms]
+ end, [], DurableQueueNames),
+
+ ok = rabbit_recovery_terms:close_global_table(),
+ %% The backing queue interface requires that the queue recovery terms
+ %% which come back from start/1 are in the same order as DurableQueueNames
+ OrderedTerms = lists:reverse(DurableTerms),
+ {OrderedTerms, {fun queue_index_walker/1, {start, DurableQueueNames}}}.
+
+cleanup_global_recovery_terms() ->
+ rabbit_file:recursive_delete([filename:join([queues_base_dir(), "queues"])]),
+ rabbit_recovery_terms:delete_global_table(),
+ ok.
+
+
update_recovery_term(#resource{virtual_host = VHost} = QueueName, Term) ->
Key = queue_name_to_dir_name(QueueName),
rabbit_recovery_terms:store(VHost, Key, Term).
diff --git a/src/rabbit_runtime_parameters.erl b/src/rabbit_runtime_parameters.erl
index 1b3cfb58c6..ab39d86659 100644
--- a/src/rabbit_runtime_parameters.erl
+++ b/src/rabbit_runtime_parameters.erl
@@ -53,7 +53,7 @@
-export([parse_set/5, set/5, set_any/5, clear/4, clear_any/4, list/0, list/1,
list_component/1, list/2, list_formatted/1, list_formatted/3,
- lookup/3, value/3, value/4, info_keys/0, clear_component/1]).
+ lookup/3, value/3, value/4, info_keys/0, clear_component/2]).
-export([parse_set_global/3, set_global/3, value_global/1, value_global/2,
list_global/0, list_global_formatted/0, list_global_formatted/2,
@@ -95,7 +95,7 @@
%%---------------------------------------------------------------------------
--import(rabbit_misc, [pget/2, pset/3]).
+-import(rabbit_misc, [pget/2]).
-define(TABLE, rabbit_runtime_parameters).
@@ -228,14 +228,15 @@ clear_global(Key, ActingUser) ->
end
end.
-clear_component(Component) ->
- case rabbit_runtime_parameters:list_component(Component) of
+clear_component(Component, ActingUser) ->
+ case list_component(Component) of
[] ->
ok;
Xs ->
- [rabbit_runtime_parameters:clear(pget(vhost, X),
- pget(component, X),
- pget(name, X))|| X <- Xs],
+ [clear(pget(vhost, X),
+ pget(component, X),
+ pget(name, X),
+ ActingUser) || X <- Xs],
ok
end.
diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl
index 290762698d..4dc2ec86d0 100644
--- a/src/rabbit_vhost.erl
+++ b/src/rabbit_vhost.erl
@@ -219,7 +219,7 @@ set_limits(VHost = #vhost{}, Limits) ->
dir(Vhost) ->
- <<Num:128>> = erlang:md5(term_to_binary(Vhost)),
+ <<Num:128>> = erlang:md5(Vhost),
rabbit_misc:format("~.36B", [Num]).
msg_store_dir_path(VHost) ->
diff --git a/test/cluster_SUITE.erl b/test/cluster_SUITE.erl
index 3dba65ae1f..4864989b6a 100644
--- a/test/cluster_SUITE.erl
+++ b/test/cluster_SUITE.erl
@@ -123,11 +123,6 @@ delegates_async1(_Config, SecondaryNode) ->
ok = delegate:invoke_no_result(spawn(SecondaryNode, Responder), Sender),
await_response(2),
- LocalPids = spawn_responders(node(), Responder, 10),
- RemotePids = spawn_responders(SecondaryNode, Responder, 10),
- ok = delegate:invoke_no_result(LocalPids ++ RemotePids, Sender),
- await_response(20),
-
passed.
delegates_sync(Config) ->
diff --git a/test/clustering_management_SUITE.erl b/test/clustering_management_SUITE.erl
index 51c0928ba5..2a23c4997e 100644
--- a/test/clustering_management_SUITE.erl
+++ b/test/clustering_management_SUITE.erl
@@ -402,6 +402,7 @@ change_cluster_node_type(Config) ->
assert_cluster_status({[Rabbit, Hare], [Hare], [Hare]},
[Rabbit, Hare]),
change_cluster_node_type(Rabbit, disc),
+
assert_cluster_status({[Rabbit, Hare], [Rabbit, Hare], [Hare]},
[Rabbit, Hare]),
change_cluster_node_type(Rabbit, ram),
diff --git a/test/unit_SUITE.erl b/test/unit_SUITE.erl
index b3ad7e4fc3..29c72eacac 100644
--- a/test/unit_SUITE.erl
+++ b/test/unit_SUITE.erl
@@ -389,29 +389,23 @@ decrypt_start_app_wrong_passphrase(Config) ->
rabbitmqctl_encode(_Config) ->
% list ciphers and hashes
- {ok, _} = rabbit_control_pbe:encode(true, false, undefined, undefined, undefined, undefined, undefined),
- {ok, _} = rabbit_control_pbe:encode(false, true, undefined, undefined, undefined, undefined, undefined),
+ {ok, _} = rabbit_control_pbe:list_ciphers(),
+ {ok, _} = rabbit_control_pbe:list_hashes(),
% incorrect ciphers, hashes and iteration number
- {error, _} = rabbit_control_pbe:encode(false, false, undefined, funny_cipher, undefined, undefined, undefined),
- {error, _} = rabbit_control_pbe:encode(false, false, undefined, undefined, funny_hash, undefined, undefined),
- {error, _} = rabbit_control_pbe:encode(false, false, undefined, undefined, undefined, -1, undefined),
- {error, _} = rabbit_control_pbe:encode(false, false, undefined, undefined, undefined, 0, undefined),
+ {error, _} = rabbit_control_pbe:encode(funny_cipher, undefined, undefined, undefined),
+ {error, _} = rabbit_control_pbe:encode(undefined, funny_hash, undefined, undefined),
+ {error, _} = rabbit_control_pbe:encode(undefined, undefined, -1, undefined),
+ {error, _} = rabbit_control_pbe:encode(undefined, undefined, 0, undefined),
% incorrect number of arguments
{error, _} = rabbit_control_pbe:encode(
- false, false,
- false, % encrypt
rabbit_pbe:default_cipher(), rabbit_pbe:default_hash(), rabbit_pbe:default_iterations(),
[]
),
{error, _} = rabbit_control_pbe:encode(
- false, false,
- false, % encrypt
rabbit_pbe:default_cipher(), rabbit_pbe:default_hash(), rabbit_pbe:default_iterations(),
[undefined]
),
{error, _} = rabbit_control_pbe:encode(
- false, false,
- false, % encrypt
rabbit_pbe:default_cipher(), rabbit_pbe:default_hash(), rabbit_pbe:default_iterations(),
[undefined, undefined, undefined]
),
@@ -429,38 +423,28 @@ rabbitmqctl_encode(_Config) ->
rabbitmqctl_encode_encrypt_decrypt(Secret) ->
PassPhrase = "passphrase",
{ok, Output} = rabbit_control_pbe:encode(
- false, false,
- false, % encrypt
rabbit_pbe:default_cipher(), rabbit_pbe:default_hash(), rabbit_pbe:default_iterations(),
[Secret, PassPhrase]
),
{encrypted, Encrypted} = rabbit_control_pbe:evaluate_input_as_term(lists:flatten(Output)),
- {ok, Result} = rabbit_control_pbe:encode(
- false, false,
- true, % decrypt
+ {ok, Result} = rabbit_control_pbe:decode(
rabbit_pbe:default_cipher(), rabbit_pbe:default_hash(), rabbit_pbe:default_iterations(),
[lists:flatten(io_lib:format("~p", [Encrypted])), PassPhrase]
),
Secret = lists:flatten(Result),
% decrypt with {encrypted, ...} form as input
- {ok, Result} = rabbit_control_pbe:encode(
- false, false,
- true, % decrypt
+ {ok, Result} = rabbit_control_pbe:decode(
rabbit_pbe:default_cipher(), rabbit_pbe:default_hash(), rabbit_pbe:default_iterations(),
[lists:flatten(io_lib:format("~p", [{encrypted, Encrypted}])), PassPhrase]
),
% wrong passphrase
- {error, _} = rabbit_control_pbe:encode(
- false, false,
- true, % decrypt
+ {error, _} = rabbit_control_pbe:decode(
rabbit_pbe:default_cipher(), rabbit_pbe:default_hash(), rabbit_pbe:default_iterations(),
[lists:flatten(io_lib:format("~p", [Encrypted])), PassPhrase ++ " "]
),
- {error, _} = rabbit_control_pbe:encode(
- false, false,
- true, % decrypt
+ {error, _} = rabbit_control_pbe:decode(
rabbit_pbe:default_cipher(), rabbit_pbe:default_hash(), rabbit_pbe:default_iterations(),
[lists:flatten(io_lib:format("~p", [{encrypted, Encrypted}])), PassPhrase ++ " "]
)