diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/gm.erl | 19 | ||||
| -rw-r--r-- | src/pg2_fixed.erl | 399 | ||||
| -rw-r--r-- | src/rabbit.app.src | 10 | ||||
| -rw-r--r-- | src/rabbit.erl | 106 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 8 | ||||
| -rw-r--r-- | src/rabbit_control_main.erl | 16 | ||||
| -rw-r--r-- | src/rabbit_control_pbe.erl | 79 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_mode_nodes.erl | 36 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 7 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 149 |
10 files changed, 357 insertions, 472 deletions
diff --git a/src/gm.erl b/src/gm.erl index 74e19ee6fd..41aa01f04d 100644 --- a/src/gm.erl +++ b/src/gm.erl @@ -1175,11 +1175,20 @@ record_new_member_in_group(NewMember, Left, GroupName, TxnFun) -> try Group = #gm_group { members = Members, version = Ver } = check_membership(Left, read_group(GroupName)), - {Prefix, [Left | Suffix]} = - lists:splitwith(fun (M) -> M =/= Left end, Members), - write_group(Group #gm_group { - members = Prefix ++ [Left, NewMember | Suffix], - version = Ver + 1 }) + case lists:member(NewMember, Members) of + true -> + %% This avois duplicates during partial partitions, + %% as inconsistent views might happen during them + rabbit_log:warning("(~p) GM avoiding duplicate of ~p", + [self(), NewMember]), + Group; + false -> + {Prefix, [Left | Suffix]} = + lists:splitwith(fun (M) -> M =/= Left end, Members), + write_group(Group #gm_group { + members = Prefix ++ [Left, NewMember | Suffix], + version = Ver + 1 }) + end catch lost_membership -> %% The transaction must not be abruptly crashed, but diff --git a/src/pg2_fixed.erl b/src/pg2_fixed.erl deleted file mode 100644 index 222a0bc849..0000000000 --- a/src/pg2_fixed.erl +++ /dev/null @@ -1,399 +0,0 @@ -%% This is the version of pg2 from R14B02, which contains the fix -%% described at -%% http://erlang.2086793.n4.nabble.com/pg2-still-busted-in-R13B04-td2230601.html. -%% The changes are a search-and-replace to rename the module and avoid -%% clashes with other versions of pg2, and also a simple rewrite of -%% "andalso" and "orelse" expressions to case statements where the second -%% operand is not a boolean since R12B does not allow this. - -%% -%% %CopyrightBegin% -%% -%% Copyright Ericsson AB 1997-2010. All Rights Reserved. -%% -%% The contents of this file are subject to the Erlang Public License, -%% Version 1.1, (the "License"); you may not use this file except in -%% compliance with the License. You should have received a copy of the -%% Erlang Public License along with this software. If not, it can be -%% retrieved online at http://www.erlang.org/. -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See -%% the License for the specific language governing rights and limitations -%% under the License. -%% -%% %CopyrightEnd% -%% --module(pg2_fixed). - --export([create/1, delete/1, join/2, leave/2]). --export([get_members/1, get_local_members/1]). --export([get_closest_pid/1, which_groups/0]). --export([start/0,start_link/0,init/1,handle_call/3,handle_cast/2,handle_info/2, - terminate/2]). - -%%% As of R13B03 monitors are used instead of links. - -%%% -%%% Exported functions -%%% - --spec start_link() -> {'ok', pid()} | {'error', term()}. - -start_link() -> - gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). - --spec start() -> {'ok', pid()} | {'error', term()}. - -start() -> - ensure_started(). - --spec create(term()) -> 'ok'. - -create(Name) -> - _ = ensure_started(), - case ets:member(pg2_fixed_table, {group, Name}) of - false -> - global:trans({{?MODULE, Name}, self()}, - fun() -> - gen_server:multi_call(?MODULE, {create, Name}) - end), - ok; - true -> - ok - end. - --type name() :: term(). - --spec delete(name()) -> 'ok'. - -delete(Name) -> - _ = ensure_started(), - global:trans({{?MODULE, Name}, self()}, - fun() -> - gen_server:multi_call(?MODULE, {delete, Name}) - end), - ok. - --spec join(name(), pid()) -> 'ok' | {'error', {'no_such_group', term()}}. - -join(Name, Pid) when is_pid(Pid) -> - _ = ensure_started(), - case ets:member(pg2_fixed_table, {group, Name}) of - false -> - {error, {no_such_group, Name}}; - true -> - global:trans({{?MODULE, Name}, self()}, - fun() -> - gen_server:multi_call(?MODULE, - {join, Name, Pid}) - end), - ok - end. - --spec leave(name(), pid()) -> 'ok' | {'error', {'no_such_group', name()}}. - -leave(Name, Pid) when is_pid(Pid) -> - _ = ensure_started(), - case ets:member(pg2_fixed_table, {group, Name}) of - false -> - {error, {no_such_group, Name}}; - true -> - global:trans({{?MODULE, Name}, self()}, - fun() -> - gen_server:multi_call(?MODULE, - {leave, Name, Pid}) - end), - ok - end. - --type get_members_ret() :: [pid()] | {'error', {'no_such_group', name()}}. - --spec get_members(name()) -> get_members_ret(). - -get_members(Name) -> - _ = ensure_started(), - case ets:member(pg2_fixed_table, {group, Name}) of - true -> - group_members(Name); - false -> - {error, {no_such_group, Name}} - end. - --spec get_local_members(name()) -> get_members_ret(). - -get_local_members(Name) -> - _ = ensure_started(), - case ets:member(pg2_fixed_table, {group, Name}) of - true -> - local_group_members(Name); - false -> - {error, {no_such_group, Name}} - end. - --spec which_groups() -> [name()]. - -which_groups() -> - _ = ensure_started(), - all_groups(). - --type gcp_error_reason() :: {'no_process', term()} | {'no_such_group', term()}. - --spec get_closest_pid(term()) -> pid() | {'error', gcp_error_reason()}. - -get_closest_pid(Name) -> - case get_local_members(Name) of - [Pid] -> - Pid; - [] -> - case get_members(Name) of - [] -> {error, {no_process, Name}}; - Members -> - X = time_compat:erlang_system_time(micro_seconds), - lists:nth((X rem length(Members))+1, Members) - end; - Members when is_list(Members) -> - X = time_compat:erlang_system_time(micro_seconds), - lists:nth((X rem length(Members))+1, Members); - Else -> - Else - end. - -%%% -%%% Callback functions from gen_server -%%% - --record(state, {}). - --spec init([]) -> {'ok', #state{}}. - -init([]) -> - Ns = nodes(), - _ = net_kernel:monitor_nodes(true), - lists:foreach(fun(N) -> - {?MODULE, N} ! {new_pg2_fixed, node()}, - self() ! {nodeup, N} - end, Ns), - pg2_fixed_table = ets:new(pg2_fixed_table, [ordered_set, protected, named_table]), - {ok, #state{}}. - --type call() :: {'create', name()} - | {'delete', name()} - | {'join', name(), pid()} - | {'leave', name(), pid()}. - --spec handle_call(call(), _, #state{}) -> - {'reply', 'ok', #state{}}. - -handle_call({create, Name}, _From, S) -> - assure_group(Name), - {reply, ok, S}; -handle_call({join, Name, Pid}, _From, S) -> - case ets:member(pg2_fixed_table, {group, Name}) of - true -> _ = join_group(Name, Pid), - ok; - _ -> ok - end, - {reply, ok, S}; -handle_call({leave, Name, Pid}, _From, S) -> - case ets:member(pg2_fixed_table, {group, Name}) of - true -> leave_group(Name, Pid); - _ -> ok - end, - {reply, ok, S}; -handle_call({delete, Name}, _From, S) -> - delete_group(Name), - {reply, ok, S}; -handle_call(Request, From, S) -> - error_logger:warning_msg("The pg2_fixed server received an unexpected message:\n" - "handle_call(~p, ~p, _)\n", - [Request, From]), - {noreply, S}. - --type all_members() :: [[name(),...]]. --type cast() :: {'exchange', node(), all_members()} - | {'del_member', name(), pid()}. - --spec handle_cast(cast(), #state{}) -> {'noreply', #state{}}. - -handle_cast({exchange, _Node, List}, S) -> - store(List), - {noreply, S}; -handle_cast(_, S) -> - %% Ignore {del_member, Name, Pid}. - {noreply, S}. - --spec handle_info(tuple(), #state{}) -> {'noreply', #state{}}. - -handle_info({'DOWN', MonitorRef, process, _Pid, _Info}, S) -> - member_died(MonitorRef), - {noreply, S}; -handle_info({nodeup, Node}, S) -> - gen_server:cast({?MODULE, Node}, {exchange, node(), all_members()}), - {noreply, S}; -handle_info({new_pg2_fixed, Node}, S) -> - gen_server:cast({?MODULE, Node}, {exchange, node(), all_members()}), - {noreply, S}; -handle_info(_, S) -> - {noreply, S}. - --spec terminate(term(), #state{}) -> 'ok'. - -terminate(_Reason, _S) -> - true = ets:delete(pg2_fixed_table), - ok. - -%%% -%%% Local functions -%%% - -%%% One ETS table, pg2_fixed_table, is used for bookkeeping. The type of the -%%% table is ordered_set, and the fast matching of partially -%%% instantiated keys is used extensively. -%%% -%%% {{group, Name}} -%%% Process group Name. -%%% {{ref, Pid}, RPid, MonitorRef, Counter} -%%% {{ref, MonitorRef}, Pid} -%%% Each process has one monitor. Sometimes a process is spawned to -%%% monitor the pid (RPid). Counter is incremented when the Pid joins -%%% some group. -%%% {{member, Name, Pid}, GroupCounter} -%%% {{local_member, Name, Pid}} -%%% Pid is a member of group Name, GroupCounter is incremented when the -%%% Pid joins the group Name. -%%% {{pid, Pid, Name}} -%%% Pid is a member of group Name. - -store(List) -> - _ = [assure_group(Name) - andalso - [join_group(Name, P) || P <- Members -- group_members(Name)] || - [Name, Members] <- List], - ok. - -assure_group(Name) -> - Key = {group, Name}, - ets:member(pg2_fixed_table, Key) orelse true =:= ets:insert(pg2_fixed_table, {Key}). - -delete_group(Name) -> - _ = [leave_group(Name, Pid) || Pid <- group_members(Name)], - true = ets:delete(pg2_fixed_table, {group, Name}), - ok. - -member_died(Ref) -> - [{{ref, Ref}, Pid}] = ets:lookup(pg2_fixed_table, {ref, Ref}), - Names = member_groups(Pid), - _ = [leave_group(Name, P) || - Name <- Names, - P <- member_in_group(Pid, Name)], - %% Kept for backward compatibility with links. Can be removed, eventually. - _ = [gen_server:abcast(nodes(), ?MODULE, {del_member, Name, Pid}) || - Name <- Names], - ok. - -join_group(Name, Pid) -> - Ref_Pid = {ref, Pid}, - try _ = ets:update_counter(pg2_fixed_table, Ref_Pid, {4, +1}) - catch _:_ -> - {RPid, Ref} = do_monitor(Pid), - true = ets:insert(pg2_fixed_table, {Ref_Pid, RPid, Ref, 1}), - true = ets:insert(pg2_fixed_table, {{ref, Ref}, Pid}) - end, - Member_Name_Pid = {member, Name, Pid}, - try _ = ets:update_counter(pg2_fixed_table, Member_Name_Pid, {2, +1, 1, 1}) - catch _:_ -> - true = ets:insert(pg2_fixed_table, {Member_Name_Pid, 1}), - _ = [ets:insert(pg2_fixed_table, {{local_member, Name, Pid}}) || - node(Pid) =:= node()], - true = ets:insert(pg2_fixed_table, {{pid, Pid, Name}}) - end. - -leave_group(Name, Pid) -> - Member_Name_Pid = {member, Name, Pid}, - try ets:update_counter(pg2_fixed_table, Member_Name_Pid, {2, -1, 0, 0}) of - N -> - if - N =:= 0 -> - true = ets:delete(pg2_fixed_table, {pid, Pid, Name}), - _ = [ets:delete(pg2_fixed_table, {local_member, Name, Pid}) || - node(Pid) =:= node()], - true = ets:delete(pg2_fixed_table, Member_Name_Pid); - true -> - ok - end, - Ref_Pid = {ref, Pid}, - case ets:update_counter(pg2_fixed_table, Ref_Pid, {4, -1}) of - 0 -> - [{Ref_Pid,RPid,Ref,0}] = ets:lookup(pg2_fixed_table, Ref_Pid), - true = ets:delete(pg2_fixed_table, {ref, Ref}), - true = ets:delete(pg2_fixed_table, Ref_Pid), - true = erlang:demonitor(Ref, [flush]), - kill_monitor_proc(RPid, Pid); - _ -> - ok - end - catch _:_ -> - ok - end. - -all_members() -> - [[G, group_members(G)] || G <- all_groups()]. - -group_members(Name) -> - [P || - [P, N] <- ets:match(pg2_fixed_table, {{member, Name, '$1'},'$2'}), - _ <- lists:seq(1, N)]. - -local_group_members(Name) -> - [P || - [Pid] <- ets:match(pg2_fixed_table, {{local_member, Name, '$1'}}), - P <- member_in_group(Pid, Name)]. - -member_in_group(Pid, Name) -> - case ets:lookup(pg2_fixed_table, {member, Name, Pid}) of - [] -> []; - [{{member, Name, Pid}, N}] -> - lists:duplicate(N, Pid) - end. - -member_groups(Pid) -> - [Name || [Name] <- ets:match(pg2_fixed_table, {{pid, Pid, '$1'}})]. - -all_groups() -> - [N || [N] <- ets:match(pg2_fixed_table, {{group,'$1'}})]. - -ensure_started() -> - case whereis(?MODULE) of - undefined -> - C = {pg2_fixed, {?MODULE, start_link, []}, permanent, - 1000, worker, [?MODULE]}, - supervisor:start_child(kernel_safe_sup, C); - Pg2_FixedPid -> - {ok, Pg2_FixedPid} - end. - - -kill_monitor_proc(RPid, Pid) -> - case RPid of - Pid -> ok; - _ -> exit(RPid, kill) - end. - -%% When/if erlang:monitor() returns before trying to connect to the -%% other node this function can be removed. -do_monitor(Pid) -> - case (node(Pid) =:= node()) orelse lists:member(node(Pid), nodes()) of - true -> - %% Assume the node is still up - {Pid, erlang:monitor(process, Pid)}; - false -> - F = fun() -> - Ref = erlang:monitor(process, Pid), - receive - {'DOWN', Ref, process, Pid, _Info} -> - exit(normal) - end - end, - erlang:spawn_monitor(F) - end. diff --git a/src/rabbit.app.src b/src/rabbit.app.src index 872336bd8e..dd38ad4072 100644 --- a/src/rabbit.app.src +++ b/src/rabbit.app.src @@ -99,5 +99,13 @@ {credit_flow_default_credit, {200, 100}}, %% see rabbitmq-server#248 %% and rabbitmq-server#667 - {channel_operation_timeout, 15000} + {channel_operation_timeout, 15000}, + {config_entry_decoder, [ + {cipher, aes_cbc256}, + {hash, sha512}, + {iterations, 1000}, + {passphrase, undefined} + ]}, + %% rabbitmq-server-973 + {lazy_queue_explicit_gc_run_operation_threshold, 250} ]}]}. diff --git a/src/rabbit.erl b/src/rabbit.erl index c9a008c446..f0d66f4110 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -24,7 +24,7 @@ start_fhc/0]). -export([start/2, stop/1, prep_stop/1]). -export([start_apps/1, stop_apps/1]). --export([log_location/1, config_files/0]). %% for testing and mgmt-agent +-export([log_location/1, config_files/0, decrypt_config/2]). %% for testing and mgmt-agent %%--------------------------------------------------------------------------- %% Boot steps. @@ -449,6 +449,38 @@ stop_and_halt() -> start_apps(Apps) -> app_utils:load_applications(Apps), + + ConfigEntryDecoder = case application:get_env(rabbit, config_entry_decoder) of + undefined -> + []; + {ok, Val} -> + Val + end, + PassPhrase = case proplists:get_value(passphrase, ConfigEntryDecoder) of + prompt -> + IoDevice = get_input_iodevice(), + io:setopts(IoDevice, [{echo, false}]), + PP = lists:droplast(io:get_line(IoDevice, + "\nPlease enter the passphrase to unlock encrypted " + "configuration entries.\n\nPassphrase: ")), + io:setopts(IoDevice, [{echo, true}]), + io:format(IoDevice, "~n", []), + PP; + {file, Filename} -> + {ok, File} = file:read_file(Filename), + [PP|_] = binary:split(File, [<<"\r\n">>, <<"\n">>]), + PP; + PP -> + PP + end, + Algo = { + proplists:get_value(cipher, ConfigEntryDecoder, rabbit_pbe:default_cipher()), + proplists:get_value(hash, ConfigEntryDecoder, rabbit_pbe:default_hash()), + proplists:get_value(iterations, ConfigEntryDecoder, rabbit_pbe:default_iterations()), + PassPhrase + }, + decrypt_config(Apps, Algo), + OrderedApps = app_utils:app_dependency_order(Apps, false), case lists:member(rabbit, Apps) of false -> rabbit_boot_steps:run_boot_steps(Apps); %% plugin activation @@ -457,6 +489,78 @@ start_apps(Apps) -> ok = app_utils:start_applications(OrderedApps, handle_app_error(could_not_start)). +%% This function retrieves the correct IoDevice for requesting +%% input. The problem with using the default IoDevice is that +%% the Erlang shell prevents us from getting the input. +%% +%% Instead we therefore look for the io process used by the +%% shell and if it can't be found (because the shell is not +%% started e.g with -noshell) we use the 'user' process. +%% +%% This function will not work when either -oldshell or -noinput +%% options are passed to erl. +get_input_iodevice() -> + case whereis(user) of + undefined -> user; + User -> + case group:interfaces(User) of + [] -> + user; + [{user_drv, Drv}] -> + case user_drv:interfaces(Drv) of + [] -> + user; + [{current_group, IoDevice}] -> + IoDevice + end + end + end. + +decrypt_config([], _) -> + ok; +decrypt_config([App|Apps], Algo) -> + decrypt_app(App, application:get_all_env(App), Algo), + decrypt_config(Apps, Algo). + +decrypt_app(_, [], _) -> + ok; +decrypt_app(App, [{Key, Value}|Tail], Algo) -> + try begin + case decrypt(Value, Algo) of + Value -> + ok; + NewValue -> + application:set_env(App, Key, NewValue) + end + end + catch + exit:{bad_configuration, config_entry_decoder} -> + exit({bad_configuration, config_entry_decoder}); + _:Msg -> + rabbit_log:info("Error while decrypting key '~p'. Please check encrypted value, passphrase, and encryption configuration~n", [Key]), + exit({decryption_error, {key, Key}, Msg}) + end, + decrypt_app(App, Tail, Algo). + +decrypt({encrypted, _}, {_, _, _, undefined}) -> + exit({bad_configuration, config_entry_decoder}); +decrypt({encrypted, EncValue}, {Cipher, Hash, Iterations, Password}) -> + rabbit_pbe:decrypt_term(Cipher, Hash, Iterations, Password, EncValue); +decrypt(List, Algo) when is_list(List) -> + decrypt_list(List, Algo, []); +decrypt(Value, _) -> + Value. + +%% We make no distinction between strings and other lists. +%% When we receive a string, we loop through each element +%% and ultimately return the string unmodified, as intended. +decrypt_list([], _, Acc) -> + lists:reverse(Acc); +decrypt_list([{Key, Value}|Tail], Algo, Acc) when Key =/= encrypted -> + decrypt_list(Tail, Algo, [{Key, decrypt(Value, Algo)}|Acc]); +decrypt_list([Value|Tail], Algo, Acc) -> + decrypt_list(Tail, Algo, [decrypt(Value, Algo)|Acc]). + stop_apps(Apps) -> ok = app_utils:stop_applications( Apps, handle_app_error(error_during_shutdown)), diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 4f8581f78a..b7df5a9a19 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -1310,7 +1310,13 @@ handle_cast(policy_changed, State = #q{q = #amqqueue{name = Name}}) -> %% This also has the side effect of waking us up so we emit a %% stats event - so event consumers see the changed policy. {ok, Q} = rabbit_amqqueue:lookup(Name), - noreply(process_args_policy(State#q{q = Q})). + noreply(process_args_policy(State#q{q = Q})); + +handle_cast({sync_start, _, _}, State = #q{q = #amqqueue{name = Name}}) -> + %% Only a slave should receive this, it means we are a duplicated master + rabbit_mirror_queue_misc:log_warning( + Name, "Stopping after receiving sync_start from another master", []), + stop(State). handle_info({maybe_expire, Vsn}, State = #q{args_policy_version = Vsn}) -> case is_unused(State) of diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl index 92898c2a2c..8c245892b7 100644 --- a/src/rabbit_control_main.erl +++ b/src/rabbit_control_main.erl @@ -92,7 +92,8 @@ {trace_off, [?VHOST_DEF]}, set_vm_memory_high_watermark, set_disk_free_limit, - help + help, + {encode, [?DECODE_DEF, ?CIPHER_DEF, ?HASH_DEF, ?ITERATIONS_DEF, ?LIST_CIPHERS_DEF, ?LIST_HASHES_DEF]} ]). -define(GLOBAL_QUERIES, @@ -114,7 +115,7 @@ [stop, stop_app, start_app, wait, reset, force_reset, rotate_logs, join_cluster, change_cluster_node_type, update_cluster_nodes, forget_cluster_node, rename_cluster_node, cluster_status, status, - environment, eval, force_boot, help, hipe_compile]). + environment, eval, force_boot, help, hipe_compile, encode]). %% [Command | {Command, DefaultTimeoutInMilliSeconds}] -define(COMMANDS_WITH_TIMEOUT, @@ -579,6 +580,17 @@ action(eval, Node, [Expr], _Opts, _Inform) -> action(help, _Node, _Args, _Opts, _Inform) -> io:format("~s", [rabbit_ctl_usage:usage()]); +action(encode, _Node, Args, Opts, _Inform) -> + ListCiphers = lists:member({?LIST_CIPHERS_OPT, true}, Opts), + ListHashes = lists:member({?LIST_HASHES_OPT, true}, Opts), + Decode = lists:member({?DECODE_OPT, true}, Opts), + Cipher = list_to_atom(proplists:get_value(?CIPHER_OPT, Opts)), + Hash = list_to_atom(proplists:get_value(?HASH_OPT, Opts)), + Iterations = list_to_integer(proplists:get_value(?ITERATIONS_OPT, Opts)), + + {_, Msg} = rabbit_control_pbe:encode(ListCiphers, ListHashes, Decode, Cipher, Hash, Iterations, Args), + io:format(Msg ++ "~n"); + action(Command, Node, Args, Opts, Inform) -> %% For backward compatibility, run commands accepting a timeout with %% the default timeout. diff --git a/src/rabbit_control_pbe.erl b/src/rabbit_control_pbe.erl new file mode 100644 index 0000000000..2fa2c90a6e --- /dev/null +++ b/src/rabbit_control_pbe.erl @@ -0,0 +1,79 @@ +% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved. +%% + +-module(rabbit_control_pbe). + +-export([encode/7]). + +% for testing purposes +-export([evaluate_input_as_term/1]). + +encode(ListCiphers, _ListHashes, _Decode, _Cipher, _Hash, _Iterations, _Args) when ListCiphers -> + {ok, io_lib:format("~p", [rabbit_pbe:supported_ciphers()])}; + +encode(_ListCiphers, ListHashes, _Decode, _Cipher, _Hash, _Iterations, _Args) when ListHashes -> + {ok, io_lib:format("~p", [rabbit_pbe:supported_hashes()])}; + +encode(_ListCiphers, _ListHashes, Decode, Cipher, Hash, Iterations, Args) -> + CipherExists = lists:member(Cipher, rabbit_pbe:supported_ciphers()), + HashExists = lists:member(Hash, rabbit_pbe:supported_hashes()), + encode_encrypt_decrypt(CipherExists, HashExists, Decode, Cipher, Hash, Iterations, Args). + +encode_encrypt_decrypt(CipherExists, _HashExists, _Decode, _Cipher, _Hash, _Iterations, _Args) when CipherExists =:= false -> + {error, io_lib:format("The requested cipher is not supported", [])}; + +encode_encrypt_decrypt(_CipherExists, HashExists, _Decode, _Cipher, _Hash, _Iterations, _Args) when HashExists =:= false -> + {error, io_lib:format("The requested hash is not supported", [])}; + +encode_encrypt_decrypt(_CipherExists, _HashExists, _Decode, _Cipher, _Hash, Iterations, _Args) when Iterations =< 0 -> + {error, io_lib:format("The requested number of iterations is incorrect", [])}; + +encode_encrypt_decrypt(_CipherExists, _HashExists, Decode, Cipher, Hash, Iterations, Args) when length(Args) == 2, Decode =:= false -> + [Value, PassPhrase] = Args, + try begin + TermValue = evaluate_input_as_term(Value), + Result = rabbit_pbe:encrypt_term(Cipher, Hash, Iterations, list_to_binary(PassPhrase), TermValue), + {ok, io_lib:format("~p", [{encrypted, Result}])} + end + catch + _:Msg -> {error, io_lib:format("Error during cipher operation: ~p", [Msg])} + end; + +encode_encrypt_decrypt(_CipherExists, _HashExists, Decode, Cipher, Hash, Iterations, Args) when length(Args) == 2, Decode -> + [Value, PassPhrase] = Args, + try begin + TermValue = evaluate_input_as_term(Value), + TermToDecrypt = case TermValue of + {encrypted, EncryptedTerm} -> + EncryptedTerm; + _ -> + TermValue + end, + Result = rabbit_pbe:decrypt_term(Cipher, Hash, Iterations, list_to_binary(PassPhrase), TermToDecrypt), + {ok, io_lib:format("~p", [Result])} + end + catch + _:Msg -> {error, io_lib:format("Error during cipher operation: ~p", [Msg])} + end; + +encode_encrypt_decrypt(_CipherExists, _HashExists, _Decode, _Cipher, _Hash, _Iterations, _Args) -> + {error, io_lib:format("Please provide a value to encode/decode and a passphrase", [])}. + +evaluate_input_as_term(Input) -> + {ok,Tokens,_EndLine} = erl_scan:string(Input ++ "."), + {ok,AbsForm} = erl_parse:parse_exprs(Tokens), + {value,TermValue,_Bs} = erl_eval:exprs(AbsForm, erl_eval:new_bindings()), + TermValue. diff --git a/src/rabbit_mirror_queue_mode_nodes.erl b/src/rabbit_mirror_queue_mode_nodes.erl index e63f340373..31c55722a5 100644 --- a/src/rabbit_mirror_queue_mode_nodes.erl +++ b/src/rabbit_mirror_queue_mode_nodes.erl @@ -32,29 +32,37 @@ description() -> [{description, <<"Mirror queue to specified nodes">>}]. -suggested_queue_nodes(Nodes0, MNode, _SNodes, SSNodes, Poss) -> - Nodes1 = [list_to_atom(binary_to_list(Node)) || Node <- Nodes0], +suggested_queue_nodes(PolicyNodes0, CurrentMaster, _SNodes, SSNodes, NodesRunningRabbitMQ) -> + PolicyNodes1 = [list_to_atom(binary_to_list(Node)) || Node <- PolicyNodes0], %% If the current master is not in the nodes specified, then what we want %% to do depends on whether there are any synchronised slaves. If there %% are then we can just kill the current master - the admin has asked for %% a migration and we should give it to them. If there are not however %% then we must keep the master around so as not to lose messages. - Nodes = case SSNodes of - [] -> lists:usort([MNode | Nodes1]); - _ -> Nodes1 - end, - Unavailable = Nodes -- Poss, - Available = Nodes -- Unavailable, - case Available of + + PolicyNodes = case SSNodes of + [] -> lists:usort([CurrentMaster | PolicyNodes1]); + _ -> PolicyNodes1 + end, + Unavailable = PolicyNodes -- NodesRunningRabbitMQ, + AvailablePolicyNodes = PolicyNodes -- Unavailable, + case AvailablePolicyNodes of [] -> %% We have never heard of anything? Not much we can do but %% keep the master alive. - {MNode, []}; - _ -> case lists:member(MNode, Available) of - true -> {MNode, Available -- [MNode]}; + {CurrentMaster, []}; + _ -> case lists:member(CurrentMaster, AvailablePolicyNodes) of + true -> {CurrentMaster, + AvailablePolicyNodes -- [CurrentMaster]}; false -> %% Make sure the new master is synced! In order to %% get here SSNodes must not be empty. - [NewMNode | _] = SSNodes, - {NewMNode, Available -- [NewMNode]} + SyncPolicyNodes = [Node || + Node <- AvailablePolicyNodes, + lists:member(Node, SSNodes)], + NewMaster = case SyncPolicyNodes of + [Node | _] -> Node; + [] -> erlang:hd(SSNodes) + end, + {NewMaster, AvailablePolicyNodes -- [NewMaster]} end end. diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 4770018f9e..6017e5a028 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -314,7 +314,12 @@ handle_cast({set_ram_duration_target, Duration}, State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> BQS1 = BQ:set_ram_duration_target(Duration, BQS), - noreply(State #state { backing_queue_state = BQS1 }). + noreply(State #state { backing_queue_state = BQS1 }); + +handle_cast(policy_changed, State) -> + %% During partial partitions, we might end up receiving messages expected by a master + %% Ignore them + noreply(State). handle_info(update_ram_duration, State = #state{backing_queue = BQ, backing_queue_state = BQS}) -> diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 297df086ad..dd92256146 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -306,7 +306,11 @@ io_batch_size, %% default queue or lazy queue - mode + mode, + %% number of reduce_memory_usage executions, once it + %% reaches a threshold the queue will manually trigger a runtime GC + %% see: maybe_execute_gc/1 + memory_reduction_run_count }). -record(rates, { in, out, ack_in, ack_out, timestamp }). @@ -402,7 +406,8 @@ disk_write_count :: non_neg_integer(), io_batch_size :: pos_integer(), - mode :: 'default' | 'lazy' }. + mode :: 'default' | 'lazy', + memory_reduction_run_count :: non_neg_integer()}. %% Duplicated from rabbit_backing_queue -spec ack([ack()], state()) -> {[rabbit_guid:guid()], state()}. @@ -427,6 +432,21 @@ %% rabbit_amqqueue_process need fairly fresh rates. -define(MSGS_PER_RATE_CALC, 100). + +%% we define the garbage collector threshold +%% it needs to tune the GC calls inside `reduce_memory_use` +%% see: rabbitmq-server-973 and `maybe_execute_gc` function +-define(DEFAULT_EXPLICIT_GC_RUN_OP_THRESHOLD, 250). +-define(EXPLICIT_GC_RUN_OP_THRESHOLD, + case get(explicit_gc_run_operation_threshold) of + undefined -> + Val = rabbit_misc:get_env(rabbit, lazy_queue_explicit_gc_run_operation_threshold, + ?DEFAULT_EXPLICIT_GC_RUN_OP_THRESHOLD), + put(explicit_gc_run_operation_threshold, Val), + Val; + Val -> Val + end). + %%---------------------------------------------------------------------------- %% Public API %%---------------------------------------------------------------------------- @@ -633,25 +653,28 @@ ack([], State) -> %% optimisation: this head is essentially a partial evaluation of the %% general case below, for the single-ack case. ack([SeqId], State) -> - {#msg_status { msg_id = MsgId, - is_persistent = IsPersistent, - msg_in_store = MsgInStore, - index_on_disk = IndexOnDisk }, - State1 = #vqstate { index_state = IndexState, - msg_store_clients = MSCState, - ack_out_counter = AckOutCount }} = - remove_pending_ack(true, SeqId, State), - IndexState1 = case IndexOnDisk of - true -> rabbit_queue_index:ack([SeqId], IndexState); - false -> IndexState - end, - case MsgInStore of - true -> ok = msg_store_remove(MSCState, IsPersistent, [MsgId]); - false -> ok - end, - {[MsgId], - a(State1 #vqstate { index_state = IndexState1, - ack_out_counter = AckOutCount + 1 })}; + case remove_pending_ack(true, SeqId, State) of + {none, _} -> + State; + {#msg_status { msg_id = MsgId, + is_persistent = IsPersistent, + msg_in_store = MsgInStore, + index_on_disk = IndexOnDisk }, + State1 = #vqstate { index_state = IndexState, + msg_store_clients = MSCState, + ack_out_counter = AckOutCount }} -> + IndexState1 = case IndexOnDisk of + true -> rabbit_queue_index:ack([SeqId], IndexState); + false -> IndexState + end, + case MsgInStore of + true -> ok = msg_store_remove(MSCState, IsPersistent, [MsgId]); + false -> ok + end, + {[MsgId], + a(State1 #vqstate { index_state = IndexState1, + ack_out_counter = AckOutCount + 1 })} + end; ack(AckTags, State) -> {{IndexOnDiskSeqIds, MsgIdsByStore, AllMsgIds}, State1 = #vqstate { index_state = IndexState, @@ -659,8 +682,12 @@ ack(AckTags, State) -> ack_out_counter = AckOutCount }} = lists:foldl( fun (SeqId, {Acc, State2}) -> - {MsgStatus, State3} = remove_pending_ack(true, SeqId, State2), - {accumulate_ack(MsgStatus, Acc), State3} + case remove_pending_ack(true, SeqId, State2) of + {none, _} -> + {Acc, State2}; + {MsgStatus, State3} -> + {accumulate_ack(MsgStatus, Acc), State3} + end end, {accumulate_ack_init(), State}, AckTags), IndexState1 = rabbit_queue_index:ack(IndexOnDiskSeqIds, IndexState), remove_msgs_by_id(MsgIdsByStore, MSCState), @@ -1330,7 +1357,8 @@ init(IsDurable, IndexState, DeltaCount, DeltaBytes, Terms, io_batch_size = IoBatchSize, - mode = default }, + mode = default, + memory_reduction_run_count = 0}, a(maybe_deltas_to_betas(State)). blank_rates(Now) -> @@ -1977,8 +2005,12 @@ lookup_pending_ack(SeqId, #vqstate { ram_pending_ack = RPA, %% First parameter = UpdateStats remove_pending_ack(true, SeqId, State) -> - {MsgStatus, State1} = remove_pending_ack(false, SeqId, State), - {MsgStatus, stats({0, -1}, {MsgStatus, none}, State1)}; + case remove_pending_ack(false, SeqId, State) of + {none, _} -> + {none, State}; + {MsgStatus, State1} -> + {MsgStatus, stats({0, -1}, {MsgStatus, none}, State1)} + end; remove_pending_ack(false, SeqId, State = #vqstate{ram_pending_ack = RPA, disk_pending_ack = DPA, qi_pending_ack = QPA}) -> @@ -1990,9 +2022,13 @@ remove_pending_ack(false, SeqId, State = #vqstate{ram_pending_ack = RPA, DPA1 = gb_trees:delete(SeqId, DPA), {V, State#vqstate{disk_pending_ack = DPA1}}; none -> - QPA1 = gb_trees:delete(SeqId, QPA), - {gb_trees:get(SeqId, QPA), - State#vqstate{qi_pending_ack = QPA1}} + case gb_trees:lookup(SeqId, QPA) of + {value, V} -> + QPA1 = gb_trees:delete(SeqId, QPA), + {V, State#vqstate{qi_pending_ack = QPA1}}; + none -> + {none, State} + end end end. @@ -2143,11 +2179,15 @@ queue_merge([SeqId | Rest] = SeqIds, Q, Front, MsgIds, Limit, PubFun, State); {_, _Q1} -> %% enqueue from the remaining list of sequence ids - {MsgStatus, State1} = msg_from_pending_ack(SeqId, State), - {#msg_status { msg_id = MsgId } = MsgStatus1, State2} = - PubFun(MsgStatus, State1), - queue_merge(Rest, Q, ?QUEUE:in(MsgStatus1, Front), [MsgId | MsgIds], - Limit, PubFun, State2) + case msg_from_pending_ack(SeqId, State) of + {none, _} -> + queue_merge(Rest, Q, Front, MsgIds, Limit, PubFun, State); + {MsgStatus, State1} -> + {#msg_status { msg_id = MsgId } = MsgStatus1, State2} = + PubFun(MsgStatus, State1), + queue_merge(Rest, Q, ?QUEUE:in(MsgStatus1, Front), [MsgId | MsgIds], + Limit, PubFun, State2) + end end; queue_merge(SeqIds, Q, Front, MsgIds, _Limit, _PubFun, State) -> @@ -2156,22 +2196,28 @@ queue_merge(SeqIds, Q, Front, MsgIds, delta_merge([], Delta, MsgIds, State) -> {Delta, MsgIds, State}; delta_merge(SeqIds, Delta, MsgIds, State) -> - lists:foldl(fun (SeqId, {Delta0, MsgIds0, State0}) -> - {#msg_status { msg_id = MsgId } = MsgStatus, State1} = - msg_from_pending_ack(SeqId, State0), - {_MsgStatus, State2} = - maybe_prepare_write_to_disk(true, true, MsgStatus, State1), - {expand_delta(SeqId, Delta0), [MsgId | MsgIds0], - stats({1, -1}, {MsgStatus, none}, State2)} + lists:foldl(fun (SeqId, {Delta0, MsgIds0, State0} = Acc) -> + case msg_from_pending_ack(SeqId, State0) of + {none, _} -> + Acc; + {#msg_status { msg_id = MsgId } = MsgStatus, State1} -> + {_MsgStatus, State2} = + maybe_prepare_write_to_disk(true, true, MsgStatus, State1), + {expand_delta(SeqId, Delta0), [MsgId | MsgIds0], + stats({1, -1}, {MsgStatus, none}, State2)} + end end, {Delta, MsgIds, State}, SeqIds). %% Mostly opposite of record_pending_ack/2 msg_from_pending_ack(SeqId, State) -> - {#msg_status { msg_props = MsgProps } = MsgStatus, State1} = - remove_pending_ack(false, SeqId, State), - {MsgStatus #msg_status { - msg_props = MsgProps #message_properties { needs_confirming = false } }, - State1}. + case remove_pending_ack(false, SeqId, State) of + {none, _} -> + {none, State}; + {#msg_status { msg_props = MsgProps } = MsgStatus, State1} -> + {MsgStatus #msg_status { + msg_props = MsgProps #message_properties { needs_confirming = false } }, + State1} + end. beta_limit(Q) -> case ?QUEUE:peek(Q) of @@ -2264,6 +2310,14 @@ ifold(Fun, Acc, Its, State) -> %% Phase changes %%---------------------------------------------------------------------------- +maybe_execute_gc(State = #vqstate {memory_reduction_run_count = MRedRunCount}) -> + case MRedRunCount >= ?EXPLICIT_GC_RUN_OP_THRESHOLD of + true -> garbage_collect(), + State#vqstate{memory_reduction_run_count = 0}; + false -> State#vqstate{memory_reduction_run_count = MRedRunCount + 1} + + end. + reduce_memory_use(State = #vqstate { target_ram_count = infinity }) -> State; reduce_memory_use(State = #vqstate { @@ -2336,8 +2390,7 @@ reduce_memory_use(State = #vqstate { S2 -> push_betas_to_deltas(S2, State1) end, - garbage_collect(), - State3. + maybe_execute_gc(State3). limit_ram_acks(0, State) -> {0, ui(State)}; |
