diff options
| -rw-r--r-- | docs/rabbitmqctl.8 | 49 | ||||
| -rw-r--r-- | src/rabbit_amqqueue.erl | 65 | ||||
| -rw-r--r-- | src/rabbit_control_pbe.erl | 105 | ||||
| -rw-r--r-- | src/rabbit_exchange_parameters.erl | 5 | ||||
| -rw-r--r-- | src/rabbit_looking_glass.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_queue_index.erl | 111 | ||||
| -rw-r--r-- | src/rabbit_runtime_parameters.erl | 15 | ||||
| -rw-r--r-- | src/rabbit_vhost.erl | 2 | ||||
| -rw-r--r-- | test/cluster_SUITE.erl | 5 | ||||
| -rw-r--r-- | test/clustering_management_SUITE.erl | 1 | ||||
| -rw-r--r-- | test/unit_SUITE.erl | 36 |
11 files changed, 217 insertions, 179 deletions
diff --git a/docs/rabbitmqctl.8 b/docs/rabbitmqctl.8 index 6f8d40eaf2..7a8497a89b 100644 --- a/docs/rabbitmqctl.8 +++ b/docs/rabbitmqctl.8 @@ -1811,42 +1811,55 @@ floating point number. Values lower than 1.0 can be dangerous and should be used carefully. .El .\" ------------------------------------ -.It Cm encode Oo Fl -decode Oc Oo Ar value Oc Oo Ar passphrase Oc Oo Fl -list-ciphers Oc Oo Fl -list-hashes Oc Oo Fl -cipher Ar cipher Oc Oo Fl -hash Ar hash Oc Op Fl -iterations Ar iterations +.It Cm encode Ar value Ar passphrase Oo Fl -cipher Ar cipher Oc Oo Fl -hash Ar hash Oc Op Fl -iterations Ar iterations .Bl -tag -width Ds -.It Fl -decode -Flag to decrypt the input value. -.Pp -For example: -.sp -.Dl rabbitmqctl encode --decode '{encrypted,<<"...">>}' mypassphrase .It Ar value Ar passphrase -Value to encrypt/decrypt and passphrase. +Value to encrypt and passphrase. .Pp For example: .sp .Dl rabbitmqctl encode '<<"guest">>' mypassphrase -.sp -.Dl rabbitmqctl encode --decode '{encrypted,<<"...">>}' mypassphrase -.It Fl -list-ciphers -Flag to list the supported ciphers. +.It Fl -cipher Ar cipher Fl -hash Ar hash Fl -iterations Ar iterations +Options to specify the encryption settings. +They can be used independently. .Pp For example: .sp -.Dl rabbitmqctl encode --list-ciphers -.It Fl -list-hashes -Flag to list the supported hash algorithms. +.Dl rabbitmqctl encode --cipher blowfish_cfb64 --hash sha256 --iterations 10000 '<<"guest">>' mypassphrase +.El +.\" ------------------------------------ +.It Cm decode Ar value Ar passphrase Oo Fl -cipher Ar cipher Oc Oo Fl -hash Ar hash Oc Op Fl -iterations Ar iterations +.Bl -tag -width Ds +.It Ar value Ar passphrase +Value to decrypt (as produced by the encode command) and passphrase. .Pp For example: .sp -.Dl rabbitmqctl encode --list-hashes +.Dl rabbitmqctl decode '{encrypted, <<"...">>}' mypassphrase .It Fl -cipher Ar cipher Fl -hash Ar hash Fl -iterations Ar iterations -Options to specify the encryption settings. +Options to specify the decryption settings. They can be used independently. .Pp For example: .sp -.Dl rabbitmqctl encode --cipher blowfish_cfb64 --hash sha256 --iterations 10000 '<<"guest">>' mypassphrase +.Dl rabbitmqctl decode --cipher blowfish_cfb64 --hash sha256 --iterations 10000 '{encrypted,<<"...">>} mypassphrase .El +.\" ------------------------------------ +.It Cm list_hashes +Lists hash functions supported by encoding commands. +.Pp +For example, this command instructs the RabbitMQ broker to list all hash +functions supported by encoding commands: +.sp +.Dl rabbitmqctl list_hashes +.\" ------------------------------------ +.It Cm list_ciphers +Lists cipher suites supported by encoding commands. +.Pp +For example, this command instructs the RabbitMQ broker to list all +cipher suites supported by encoding commands: +.sp +.Dl rabbitmqctl list_ciphers .El .\" ------------------------------------------------------------------ .Sh PLUGIN COMMANDS diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 03dac03b7e..bd6199c166 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -633,12 +633,12 @@ info_keys() -> rabbit_amqqueue_process:info_keys(). map(Qs, F) -> rabbit_misc:filter_exit_map(F, Qs). info(Q = #amqqueue{ state = crashed }) -> info_down(Q, crashed); -info(#amqqueue{ pid = QPid }) -> delegate:call(QPid, info). +info(#amqqueue{ pid = QPid }) -> delegate:invoke(QPid, {gen_server2, call, [info, infinity]}). info(Q = #amqqueue{ state = crashed }, Items) -> info_down(Q, Items, crashed); info(#amqqueue{ pid = QPid }, Items) -> - case delegate:call(QPid, {info, Items}) of + case delegate:invoke(QPid, {gen_server2, call, [{info, Items}, infinity]}) of {ok, Res} -> Res; {error, Error} -> throw(Error) end. @@ -699,7 +699,8 @@ force_event_refresh(Ref) -> notify_policy_changed(#amqqueue{pid = QPid}) -> gen_server2:cast(QPid, policy_changed). -consumers(#amqqueue{ pid = QPid }) -> delegate:call(QPid, consumers). +consumers(#amqqueue{ pid = QPid }) -> + delegate:invoke(QPid, {gen_server2, call, [consumers, infinity]}). consumer_info_keys() -> ?CONSUMER_INFO_KEYS. @@ -727,7 +728,7 @@ get_queue_consumer_info(Q, ConsumerInfoKeys) -> AckRequired, Prefetch, Args]) || {ChPid, CTag, AckRequired, Prefetch, Args, _} <- consumers(Q)]. -stat(#amqqueue{pid = QPid}) -> delegate:call(QPid, stat). +stat(#amqqueue{pid = QPid}) -> delegate:invoke(QPid, {gen_server2, call, [stat, infinity]}). pid_of(#amqqueue{pid = Pid}) -> Pid. pid_of(VHost, QueueName) -> @@ -745,7 +746,7 @@ delete_immediately(QPids) -> ok. delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty, ActingUser) -> - delegate:call(QPid, {delete, IfUnused, IfEmpty, ActingUser}). + delegate:invoke(QPid, {gen_server2, call, [{delete, IfUnused, IfEmpty, ActingUser}, infinity]}). delete_crashed(Q) -> delete_crashed(Q, ?INTERNAL_USER). @@ -758,21 +759,24 @@ delete_crashed_internal(Q = #amqqueue{ name = QName }, ActingUser) -> BQ:delete_crashed(Q), ok = internal_delete(QName, ActingUser). -purge(#amqqueue{ pid = QPid }) -> delegate:call(QPid, purge). +purge(#amqqueue{ pid = QPid }) -> + delegate:invoke(QPid, {gen_server2, call, [purge, infinity]}). -requeue(QPid, MsgIds, ChPid) -> delegate:call(QPid, {requeue, MsgIds, ChPid}). +requeue(QPid, MsgIds, ChPid) -> + delegate:invoke(QPid, {gen_server2, call, [{requeue, MsgIds, ChPid}, infinity]}). -ack(QPid, MsgIds, ChPid) -> delegate:cast(QPid, {ack, MsgIds, ChPid}). +ack(QPid, MsgIds, ChPid) -> + delegate:invoke_no_result(QPid, {gen_server2, cast, [{ack, MsgIds, ChPid}]}). reject(QPid, Requeue, MsgIds, ChPid) -> - delegate:cast(QPid, {reject, Requeue, MsgIds, ChPid}). + delegate:invoke_no_result(QPid, {gen_server2, cast, [{reject, Requeue, MsgIds, ChPid}]}). notify_down_all(QPids, ChPid) -> notify_down_all(QPids, ChPid, ?CHANNEL_OPERATION_TIMEOUT). notify_down_all(QPids, ChPid, Timeout) -> - case rpc:call(node(), delegate, call, - [QPids, {notify_down, ChPid}], Timeout) of + case rpc:call(node(), delegate, invoke, + [QPids, {gen_server2, call, [{notify_down, ChPid}, infinity]}], Timeout) of {badrpc, timeout} -> {error, {channel_operation_timeout, Timeout}}; {badrpc, Reason} -> {error, Reason}; {_, Bads} -> @@ -788,27 +792,29 @@ notify_down_all(QPids, ChPid, Timeout) -> end. activate_limit_all(QPids, ChPid) -> - delegate:cast(QPids, {activate_limit, ChPid}). + delegate:invoke_no_result(QPids, {gen_server2, cast, [{activate_limit, ChPid}]}). credit(#amqqueue{pid = QPid}, ChPid, CTag, Credit, Drain) -> - delegate:cast(QPid, {credit, ChPid, CTag, Credit, Drain}). + delegate:invoke_no_result(QPid, {gen_server2, cast, [{credit, ChPid, CTag, Credit, Drain}]}). basic_get(#amqqueue{pid = QPid}, ChPid, NoAck, LimiterPid) -> - delegate:call(QPid, {basic_get, ChPid, NoAck, LimiterPid}). + delegate:invoke(QPid, {gen_server2, call, [{basic_get, ChPid, NoAck, LimiterPid}, infinity]}). basic_consume(#amqqueue{pid = QPid, name = QName}, NoAck, ChPid, LimiterPid, LimiterActive, ConsumerPrefetchCount, ConsumerTag, ExclusiveConsume, Args, OkMsg, ActingUser) -> ok = check_consume_arguments(QName, Args), - delegate:call(QPid, {basic_consume, NoAck, ChPid, LimiterPid, LimiterActive, - ConsumerPrefetchCount, ConsumerTag, ExclusiveConsume, - Args, OkMsg, ActingUser}). + delegate:invoke(QPid, {gen_server2, call, + [{basic_consume, NoAck, ChPid, LimiterPid, LimiterActive, + ConsumerPrefetchCount, ConsumerTag, ExclusiveConsume, + Args, OkMsg, ActingUser}, infinity]}). basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg, ActingUser) -> - delegate:call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg, ActingUser}). + delegate:invoke(QPid, {gen_server2, call, + [{basic_cancel, ChPid, ConsumerTag, OkMsg, ActingUser}, infinity]}). notify_decorators(#amqqueue{pid = QPid}) -> - delegate:cast(QPid, notify_decorators). + delegate:invoke_no_result(QPid, {gen_server2, cast, [notify_decorators]}). notify_sent(QPid, ChPid) -> rabbit_amqqueue_common:notify_sent(QPid, ChPid). @@ -816,7 +822,7 @@ notify_sent(QPid, ChPid) -> notify_sent_queue_down(QPid) -> rabbit_amqqueue_common:notify_sent_queue_down(QPid). -resume(QPid, ChPid) -> delegate:cast(QPid, {resume, ChPid}). +resume(QPid, ChPid) -> delegate:invoke_no_result(QPid, {gen_server2, cast, [{resume, ChPid}]}). internal_delete1(QueueName, OnlyDurable) -> ok = mnesia:delete({rabbit_queue, QueueName}), @@ -913,12 +919,17 @@ set_ram_duration_target(QPid, Duration) -> set_maximum_since_use(QPid, Age) -> gen_server2:cast(QPid, {set_maximum_since_use, Age}). -update_mirroring(QPid) -> ok = delegate:cast(QPid, update_mirroring). +update_mirroring(QPid) -> + ok = delegate:invoke_no_result(QPid, {gen_server2, cast, [update_mirroring]}). -sync_mirrors(#amqqueue{pid = QPid}) -> delegate:call(QPid, sync_mirrors); -sync_mirrors(QPid) -> delegate:call(QPid, sync_mirrors). -cancel_sync_mirrors(#amqqueue{pid = QPid}) -> delegate:call(QPid, cancel_sync_mirrors); -cancel_sync_mirrors(QPid) -> delegate:call(QPid, cancel_sync_mirrors). +sync_mirrors(#amqqueue{pid = QPid}) -> + delegate:invoke(QPid, {gen_server2, call, [sync_mirrors, infinity]}); +sync_mirrors(QPid) -> + delegate:invoke(QPid, {gen_server2, call, [sync_mirrors, infinity]}). +cancel_sync_mirrors(#amqqueue{pid = QPid}) -> + delegate:invoke(QPid, {gen_server2, call, [cancel_sync_mirrors, infinity]}); +cancel_sync_mirrors(QPid) -> + delegate:invoke(QPid, {gen_server2, call, [cancel_sync_mirrors, infinity]}). is_mirrored(Q) -> rabbit_mirror_queue_misc:is_mirrored(Q). @@ -1037,8 +1048,8 @@ deliver(Qs, Delivery = #delivery{flow = Flow}) -> %% done with it. MMsg = {deliver, Delivery, false}, SMsg = {deliver, Delivery, true}, - delegate:cast(MPids, MMsg), - delegate:cast(SPids, SMsg), + delegate:invoke_no_result(MPids, {gen_server2, cast, [MMsg]}), + delegate:invoke_no_result(SPids, {gen_server2, cast, [SMsg]}), QPids. qpids([]) -> {[], []}; %% optimisation diff --git a/src/rabbit_control_pbe.erl b/src/rabbit_control_pbe.erl index ff498ed4aa..40d8741d74 100644 --- a/src/rabbit_control_pbe.erl +++ b/src/rabbit_control_pbe.erl @@ -16,61 +16,74 @@ -module(rabbit_control_pbe). --export([encode/7]). +-export([decode/4, encode/4, list_ciphers/0, list_hashes/0]). % for testing purposes -export([evaluate_input_as_term/1]). -encode(ListCiphers, _ListHashes, _Decode, _Cipher, _Hash, _Iterations, _Args) when ListCiphers -> - {ok, io_lib:format("~p", [rabbit_pbe:supported_ciphers()])}; +list_ciphers() -> + {ok, io_lib:format("~p", [rabbit_pbe:supported_ciphers()])}. -encode(_ListCiphers, ListHashes, _Decode, _Cipher, _Hash, _Iterations, _Args) when ListHashes -> - {ok, io_lib:format("~p", [rabbit_pbe:supported_hashes()])}; +list_hashes() -> + {ok, io_lib:format("~p", [rabbit_pbe:supported_hashes()])}. -encode(_ListCiphers, _ListHashes, Decode, Cipher, Hash, Iterations, Args) -> - CipherExists = lists:member(Cipher, rabbit_pbe:supported_ciphers()), - HashExists = lists:member(Hash, rabbit_pbe:supported_hashes()), - encode_encrypt_decrypt(CipherExists, HashExists, Decode, Cipher, Hash, Iterations, Args). - -encode_encrypt_decrypt(CipherExists, _HashExists, _Decode, _Cipher, _Hash, _Iterations, _Args) when CipherExists =:= false -> - {error, io_lib:format("The requested cipher is not supported", [])}; - -encode_encrypt_decrypt(_CipherExists, HashExists, _Decode, _Cipher, _Hash, _Iterations, _Args) when HashExists =:= false -> - {error, io_lib:format("The requested hash is not supported", [])}; - -encode_encrypt_decrypt(_CipherExists, _HashExists, _Decode, _Cipher, _Hash, Iterations, _Args) when Iterations =< 0 -> +validate(_Cipher, _Hash, Iterations, _Args) when Iterations =< 0 -> {error, io_lib:format("The requested number of iterations is incorrect", [])}; +validate(_Cipher, _Hash, _Iterations, Args) when length(Args) < 2 -> + {error, io_lib:format("Please provide a value to encode/decode and a passphrase", [])}; +validate(_Cipher, _Hash, _Iterations, Args) when length(Args) > 2 -> + {error, io_lib:format("Too many arguments. Please provide a value to encode/decode and a passphrase", [])}; +validate(Cipher, Hash, _Iterations, _Args) -> + case lists:member(Cipher, rabbit_pbe:supported_ciphers()) of + false -> + {error, io_lib:format("The requested cipher is not supported", [])}; + true -> + case lists:member(Hash, rabbit_pbe:supported_hashes()) of + false -> + {error, io_lib:format("The requested hash is not supported", [])}; + true -> ok + end + end. -encode_encrypt_decrypt(_CipherExists, _HashExists, Decode, Cipher, Hash, Iterations, Args) when length(Args) == 2, Decode =:= false -> - [Value, PassPhrase] = Args, - try begin - TermValue = evaluate_input_as_term(Value), - Result = rabbit_pbe:encrypt_term(Cipher, Hash, Iterations, list_to_binary(PassPhrase), TermValue), - {ok, io_lib:format("~p", [{encrypted, Result}])} - end - catch - _:Msg -> {error, io_lib:format("Error during cipher operation: ~p", [Msg])} - end; - -encode_encrypt_decrypt(_CipherExists, _HashExists, Decode, Cipher, Hash, Iterations, Args) when length(Args) == 2, Decode -> - [Value, PassPhrase] = Args, - try begin - TermValue = evaluate_input_as_term(Value), - TermToDecrypt = case TermValue of - {encrypted, EncryptedTerm} -> - EncryptedTerm; - _ -> - TermValue - end, - Result = rabbit_pbe:decrypt_term(Cipher, Hash, Iterations, list_to_binary(PassPhrase), TermToDecrypt), - {ok, io_lib:format("~p", [Result])} - end - catch - _:Msg -> {error, io_lib:format("Error during cipher operation: ~p", [Msg])} - end; +encode(Cipher, Hash, Iterations, Args) -> + case validate(Cipher, Hash, Iterations, Args) of + {error, Err} -> {error, Err}; + ok -> + [Value, PassPhrase] = Args, + try begin + TermValue = evaluate_input_as_term(Value), + Result = rabbit_pbe:encrypt_term(Cipher, Hash, Iterations, + list_to_binary(PassPhrase), + TermValue), + {ok, io_lib:format("~p", [{encrypted, Result}])} + end + catch + _:Msg -> {error, io_lib:format("Error during cipher operation: ~p", [Msg])} + end + end. -encode_encrypt_decrypt(_CipherExists, _HashExists, _Decode, _Cipher, _Hash, _Iterations, _Args) -> - {error, io_lib:format("Please provide a value to encode/decode and a passphrase", [])}. +decode(Cipher, Hash, Iterations, Args) -> + case validate(Cipher, Hash, Iterations, Args) of + {error, Err} -> {error, Err}; + ok -> + [Value, PassPhrase] = Args, + try begin + TermValue = evaluate_input_as_term(Value), + TermToDecrypt = case TermValue of + {encrypted, EncryptedTerm} -> + EncryptedTerm; + _ -> + TermValue + end, + Result = rabbit_pbe:decrypt_term(Cipher, Hash, Iterations, + list_to_binary(PassPhrase), + TermToDecrypt), + {ok, io_lib:format("~p", [Result])} + end + catch + _:Msg -> {error, io_lib:format("Error during cipher operation: ~p", [Msg])} + end + end. evaluate_input_as_term(Input) -> {ok,Tokens,_EndLine} = erl_scan:string(Input ++ "."), diff --git a/src/rabbit_exchange_parameters.erl b/src/rabbit_exchange_parameters.erl index feba5e255b..2c95a823ee 100644 --- a/src/rabbit_exchange_parameters.erl +++ b/src/rabbit_exchange_parameters.erl @@ -23,8 +23,6 @@ -export([register/0]). -export([validate/5, notify/5, notify_clear/4]). --import(rabbit_misc, [pget/2]). - -rabbit_boot_step({?MODULE, [{description, "exchange parameters"}, {mfa, {rabbit_exchange_parameters, register, []}}, @@ -36,7 +34,8 @@ register() -> ?EXCHANGE_DELETE_IN_PROGRESS_COMPONENT, ?MODULE), %% ensure there are no leftovers from before node restart/crash rabbit_runtime_parameters:clear_component( - ?EXCHANGE_DELETE_IN_PROGRESS_COMPONENT), + ?EXCHANGE_DELETE_IN_PROGRESS_COMPONENT, + ?INTERNAL_USER), ok. validate(_VHost, ?EXCHANGE_DELETE_IN_PROGRESS_COMPONENT, _Name, _Term, _User) -> diff --git a/src/rabbit_looking_glass.erl b/src/rabbit_looking_glass.erl index 71d7b067b8..c6c353d552 100644 --- a/src/rabbit_looking_glass.erl +++ b/src/rabbit_looking_glass.erl @@ -16,6 +16,8 @@ -module(rabbit_looking_glass). +-ignore_xref([{lg, trace, 4}]). + -export([boot/0]). -export([connections/0]). diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index ee172d48e5..e9776f47ad 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -116,6 +116,7 @@ %% ---- Journal details ---- -define(JOURNAL_FILENAME, "journal.jif"). +-define(QUEUE_NAME_STUB_FILE, ".queue_name"). -define(PUB_PERSIST_JPREFIX, 2#00). -define(PUB_TRANS_JPREFIX, 2#01). @@ -204,7 +205,9 @@ %% optimisation pre_publish_cache, %% optimisation - delivered_cache}). + delivered_cache, + %% queue name resource record + queue_name}). -record(segment, { %% segment ID (an integer) @@ -295,7 +298,8 @@ erase(Name) -> erase_index_dir(Dir). %% used during variable queue purge when there are no pending acks -reset_state(#qistate{ dir = Dir, +reset_state(#qistate{ queue_name = Name, + dir = Dir, on_sync = OnSyncFun, on_sync_msg = OnSyncMsgFun, journal_handle = JournalHdl }) -> @@ -304,7 +308,7 @@ reset_state(#qistate{ dir = Dir, _ -> file_handle_cache:close(JournalHdl) end, ok = erase_index_dir(Dir), - blank_state_dir_funs(Dir, OnSyncFun, OnSyncMsgFun). + blank_state_name_dir_funs(Name, Dir, OnSyncFun, OnSyncMsgFun). init(Name, OnSyncFun, OnSyncMsgFun) -> State = #qistate { dir = Dir } = blank_state(Name), @@ -520,32 +524,6 @@ start(VHost, DurableQueueNames) -> {OrderedTerms, {fun queue_index_walker/1, {start, DurableQueueNames}}}. -read_global_recovery_terms(DurableQueueNames) -> - ok = rabbit_recovery_terms:open_global_table(), - - DurableTerms = - lists:foldl( - fun(QName, RecoveryTerms) -> - DirName = queue_name_to_dir_name(QName), - RecoveryInfo = case rabbit_recovery_terms:read_global(DirName) of - {error, _} -> non_clean_shutdown; - {ok, Terms} -> Terms - end, - [RecoveryInfo | RecoveryTerms] - end, [], DurableQueueNames), - - ok = rabbit_recovery_terms:close_global_table(), - %% The backing queue interface requires that the queue recovery terms - %% which come back from start/1 are in the same order as DurableQueueNames - OrderedTerms = lists:reverse(DurableTerms), - {OrderedTerms, {fun queue_index_walker/1, {start, DurableQueueNames}}}. - -cleanup_global_recovery_terms() -> - rabbit_file:recursive_delete([filename:join([queues_base_dir(), "queues"])]), - rabbit_recovery_terms:delete_global_table(), - ok. - - stop(VHost) -> rabbit_recovery_terms:stop(VHost). all_queue_directory_names(VHost) -> @@ -567,10 +545,9 @@ erase_index_dir(Dir) -> end. blank_state(QueueName) -> - blank_state_dir(queue_dir(QueueName)). - -blank_state_dir(Dir) -> - blank_state_dir_funs(Dir, + Dir = queue_dir(QueueName), + blank_state_name_dir_funs(QueueName, + Dir, fun (_) -> ok end, fun (_) -> ok end). @@ -581,7 +558,20 @@ queue_dir(#resource{ virtual_host = VHost } = QueueName) -> QueueDir = queue_name_to_dir_name(QueueName), filename:join([VHostDir, "queues", QueueDir]). -blank_state_dir_funs(Dir, OnSyncFun, OnSyncMsgFun) -> +queue_name_to_dir_name(#resource { kind = queue, + virtual_host = VHost, + name = QName }) -> + <<Num:128>> = erlang:md5(<<"queue", VHost/binary, QName/binary>>), + rabbit_misc:format("~.36B", [Num]). + +queue_name_to_dir_name_legacy(Name = #resource { kind = queue }) -> + <<Num:128>> = erlang:md5(term_to_binary_compat:term_to_binary_1(Name)), + rabbit_misc:format("~.36B", [Num]). + +queues_base_dir() -> + rabbit_mnesia:dir(). + +blank_state_name_dir_funs(Name, Dir, OnSyncFun, OnSyncMsgFun) -> {ok, MaxJournal} = application:get_env(rabbit, queue_index_max_journal_entries), #qistate { dir = Dir, @@ -594,7 +584,8 @@ blank_state_dir_funs(Dir, OnSyncFun, OnSyncMsgFun) -> unconfirmed = gb_sets:new(), unconfirmed_msg = gb_sets:new(), pre_publish_cache = [], - delivered_cache = [] }. + delivered_cache = [], + queue_name = Name }. init_clean(RecoveredCounts, State) -> %% Load the journal. Since this is a clean recovery this (almost) @@ -690,13 +681,6 @@ recover_message(false, _, no_del, RelSeq, {Segment, DirtyCount}) -> add_to_journal(RelSeq, del, Segment)), DirtyCount + 2}. -queue_name_to_dir_name(Name = #resource { kind = queue }) -> - <<Num:128>> = erlang:md5(term_to_binary_compat:term_to_binary_1(Name)), - rabbit_misc:format("~.36B", [Num]). - -queues_base_dir() -> - rabbit_mnesia:dir(). - %%---------------------------------------------------------------------------- %% msg store startup delta function %%---------------------------------------------------------------------------- @@ -890,9 +874,11 @@ append_journal_to_segment(#segment { journal_entries = JEntries, end. get_journal_handle(State = #qistate { journal_handle = undefined, - dir = Dir }) -> + dir = Dir, + queue_name = Name }) -> Path = filename:join(Dir, ?JOURNAL_FILENAME), ok = rabbit_file:ensure_dir(Path), + ok = ensure_queue_name_stub_file(Dir, Name), {ok, Hdl} = file_handle_cache:open_with_absolute_path( Path, ?WRITE_MODE, [{write_buffer, infinity}]), {Hdl, State #qistate { journal_handle = Hdl }}; @@ -1413,7 +1399,8 @@ store_msg_segment(_) -> - +%%---------------------------------------------------------------------------- +%% Migration functions %%---------------------------------------------------------------------------- foreach_queue_index(Funs) -> @@ -1467,18 +1454,50 @@ drive_transform_fun(Fun, Hdl, Contents) -> move_to_per_vhost_stores(#resource{} = QueueName) -> OldQueueDir = filename:join([queues_base_dir(), "queues", - queue_name_to_dir_name(QueueName)]), + queue_name_to_dir_name_legacy(QueueName)]), NewQueueDir = queue_dir(QueueName), case rabbit_file:is_dir(OldQueueDir) of true -> ok = rabbit_file:ensure_dir(NewQueueDir), - ok = rabbit_file:rename(OldQueueDir, NewQueueDir); + ok = rabbit_file:rename(OldQueueDir, NewQueueDir), + ok = ensure_queue_name_stub_file(NewQueueDir, QueueName); false -> rabbit_log:info("Queue index directory not found for queue ~p~n", [QueueName]) end, ok. +ensure_queue_name_stub_file(Dir, #resource{virtual_host = VHost, name = QName}) -> + QueueNameFile = filename:join(Dir, ?QUEUE_NAME_STUB_FILE), + file:write_file(QueueNameFile, <<"VHOST: ", VHost/binary, "\n", + "QUEUE: ", QName/binary, "\n">>). + +read_global_recovery_terms(DurableQueueNames) -> + ok = rabbit_recovery_terms:open_global_table(), + + DurableTerms = + lists:foldl( + fun(QName, RecoveryTerms) -> + DirName = queue_name_to_dir_name_legacy(QName), + RecoveryInfo = case rabbit_recovery_terms:read_global(DirName) of + {error, _} -> non_clean_shutdown; + {ok, Terms} -> Terms + end, + [RecoveryInfo | RecoveryTerms] + end, [], DurableQueueNames), + + ok = rabbit_recovery_terms:close_global_table(), + %% The backing queue interface requires that the queue recovery terms + %% which come back from start/1 are in the same order as DurableQueueNames + OrderedTerms = lists:reverse(DurableTerms), + {OrderedTerms, {fun queue_index_walker/1, {start, DurableQueueNames}}}. + +cleanup_global_recovery_terms() -> + rabbit_file:recursive_delete([filename:join([queues_base_dir(), "queues"])]), + rabbit_recovery_terms:delete_global_table(), + ok. + + update_recovery_term(#resource{virtual_host = VHost} = QueueName, Term) -> Key = queue_name_to_dir_name(QueueName), rabbit_recovery_terms:store(VHost, Key, Term). diff --git a/src/rabbit_runtime_parameters.erl b/src/rabbit_runtime_parameters.erl index 1b3cfb58c6..ab39d86659 100644 --- a/src/rabbit_runtime_parameters.erl +++ b/src/rabbit_runtime_parameters.erl @@ -53,7 +53,7 @@ -export([parse_set/5, set/5, set_any/5, clear/4, clear_any/4, list/0, list/1, list_component/1, list/2, list_formatted/1, list_formatted/3, - lookup/3, value/3, value/4, info_keys/0, clear_component/1]). + lookup/3, value/3, value/4, info_keys/0, clear_component/2]). -export([parse_set_global/3, set_global/3, value_global/1, value_global/2, list_global/0, list_global_formatted/0, list_global_formatted/2, @@ -95,7 +95,7 @@ %%--------------------------------------------------------------------------- --import(rabbit_misc, [pget/2, pset/3]). +-import(rabbit_misc, [pget/2]). -define(TABLE, rabbit_runtime_parameters). @@ -228,14 +228,15 @@ clear_global(Key, ActingUser) -> end end. -clear_component(Component) -> - case rabbit_runtime_parameters:list_component(Component) of +clear_component(Component, ActingUser) -> + case list_component(Component) of [] -> ok; Xs -> - [rabbit_runtime_parameters:clear(pget(vhost, X), - pget(component, X), - pget(name, X))|| X <- Xs], + [clear(pget(vhost, X), + pget(component, X), + pget(name, X), + ActingUser) || X <- Xs], ok end. diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl index 290762698d..4dc2ec86d0 100644 --- a/src/rabbit_vhost.erl +++ b/src/rabbit_vhost.erl @@ -219,7 +219,7 @@ set_limits(VHost = #vhost{}, Limits) -> dir(Vhost) -> - <<Num:128>> = erlang:md5(term_to_binary(Vhost)), + <<Num:128>> = erlang:md5(Vhost), rabbit_misc:format("~.36B", [Num]). msg_store_dir_path(VHost) -> diff --git a/test/cluster_SUITE.erl b/test/cluster_SUITE.erl index 3dba65ae1f..4864989b6a 100644 --- a/test/cluster_SUITE.erl +++ b/test/cluster_SUITE.erl @@ -123,11 +123,6 @@ delegates_async1(_Config, SecondaryNode) -> ok = delegate:invoke_no_result(spawn(SecondaryNode, Responder), Sender), await_response(2), - LocalPids = spawn_responders(node(), Responder, 10), - RemotePids = spawn_responders(SecondaryNode, Responder, 10), - ok = delegate:invoke_no_result(LocalPids ++ RemotePids, Sender), - await_response(20), - passed. delegates_sync(Config) -> diff --git a/test/clustering_management_SUITE.erl b/test/clustering_management_SUITE.erl index 51c0928ba5..2a23c4997e 100644 --- a/test/clustering_management_SUITE.erl +++ b/test/clustering_management_SUITE.erl @@ -402,6 +402,7 @@ change_cluster_node_type(Config) -> assert_cluster_status({[Rabbit, Hare], [Hare], [Hare]}, [Rabbit, Hare]), change_cluster_node_type(Rabbit, disc), + assert_cluster_status({[Rabbit, Hare], [Rabbit, Hare], [Hare]}, [Rabbit, Hare]), change_cluster_node_type(Rabbit, ram), diff --git a/test/unit_SUITE.erl b/test/unit_SUITE.erl index b3ad7e4fc3..29c72eacac 100644 --- a/test/unit_SUITE.erl +++ b/test/unit_SUITE.erl @@ -389,29 +389,23 @@ decrypt_start_app_wrong_passphrase(Config) -> rabbitmqctl_encode(_Config) -> % list ciphers and hashes - {ok, _} = rabbit_control_pbe:encode(true, false, undefined, undefined, undefined, undefined, undefined), - {ok, _} = rabbit_control_pbe:encode(false, true, undefined, undefined, undefined, undefined, undefined), + {ok, _} = rabbit_control_pbe:list_ciphers(), + {ok, _} = rabbit_control_pbe:list_hashes(), % incorrect ciphers, hashes and iteration number - {error, _} = rabbit_control_pbe:encode(false, false, undefined, funny_cipher, undefined, undefined, undefined), - {error, _} = rabbit_control_pbe:encode(false, false, undefined, undefined, funny_hash, undefined, undefined), - {error, _} = rabbit_control_pbe:encode(false, false, undefined, undefined, undefined, -1, undefined), - {error, _} = rabbit_control_pbe:encode(false, false, undefined, undefined, undefined, 0, undefined), + {error, _} = rabbit_control_pbe:encode(funny_cipher, undefined, undefined, undefined), + {error, _} = rabbit_control_pbe:encode(undefined, funny_hash, undefined, undefined), + {error, _} = rabbit_control_pbe:encode(undefined, undefined, -1, undefined), + {error, _} = rabbit_control_pbe:encode(undefined, undefined, 0, undefined), % incorrect number of arguments {error, _} = rabbit_control_pbe:encode( - false, false, - false, % encrypt rabbit_pbe:default_cipher(), rabbit_pbe:default_hash(), rabbit_pbe:default_iterations(), [] ), {error, _} = rabbit_control_pbe:encode( - false, false, - false, % encrypt rabbit_pbe:default_cipher(), rabbit_pbe:default_hash(), rabbit_pbe:default_iterations(), [undefined] ), {error, _} = rabbit_control_pbe:encode( - false, false, - false, % encrypt rabbit_pbe:default_cipher(), rabbit_pbe:default_hash(), rabbit_pbe:default_iterations(), [undefined, undefined, undefined] ), @@ -429,38 +423,28 @@ rabbitmqctl_encode(_Config) -> rabbitmqctl_encode_encrypt_decrypt(Secret) -> PassPhrase = "passphrase", {ok, Output} = rabbit_control_pbe:encode( - false, false, - false, % encrypt rabbit_pbe:default_cipher(), rabbit_pbe:default_hash(), rabbit_pbe:default_iterations(), [Secret, PassPhrase] ), {encrypted, Encrypted} = rabbit_control_pbe:evaluate_input_as_term(lists:flatten(Output)), - {ok, Result} = rabbit_control_pbe:encode( - false, false, - true, % decrypt + {ok, Result} = rabbit_control_pbe:decode( rabbit_pbe:default_cipher(), rabbit_pbe:default_hash(), rabbit_pbe:default_iterations(), [lists:flatten(io_lib:format("~p", [Encrypted])), PassPhrase] ), Secret = lists:flatten(Result), % decrypt with {encrypted, ...} form as input - {ok, Result} = rabbit_control_pbe:encode( - false, false, - true, % decrypt + {ok, Result} = rabbit_control_pbe:decode( rabbit_pbe:default_cipher(), rabbit_pbe:default_hash(), rabbit_pbe:default_iterations(), [lists:flatten(io_lib:format("~p", [{encrypted, Encrypted}])), PassPhrase] ), % wrong passphrase - {error, _} = rabbit_control_pbe:encode( - false, false, - true, % decrypt + {error, _} = rabbit_control_pbe:decode( rabbit_pbe:default_cipher(), rabbit_pbe:default_hash(), rabbit_pbe:default_iterations(), [lists:flatten(io_lib:format("~p", [Encrypted])), PassPhrase ++ " "] ), - {error, _} = rabbit_control_pbe:encode( - false, false, - true, % decrypt + {error, _} = rabbit_control_pbe:decode( rabbit_pbe:default_cipher(), rabbit_pbe:default_hash(), rabbit_pbe:default_iterations(), [lists:flatten(io_lib:format("~p", [{encrypted, Encrypted}])), PassPhrase ++ " "] ) |
