diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 92 | ||||
| -rw-r--r-- | src/rabbit_control_main.erl | 167 | ||||
| -rw-r--r-- | src/rabbit_file.erl | 10 | ||||
| -rw-r--r-- | src/rabbit_hipe.erl | 89 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_master.erl | 16 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_misc.erl | 17 | ||||
| -rw-r--r-- | src/rabbit_policy.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_priority_queue.erl | 68 | ||||
| -rw-r--r-- | src/rabbit_upgrade_functions.erl | 19 | ||||
| -rw-r--r-- | src/truncate.erl | 75 |
10 files changed, 325 insertions, 232 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 3ee14e4f7d..2db86391a5 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -54,6 +54,7 @@ max_length, max_bytes, args_policy_version, + mirroring_policy_version = 0, status }). @@ -702,7 +703,7 @@ handle_ch_down(DownPid, State = #q{consumers = Consumers, exclusive_consumer = Holder1}, notify_decorators(State2), case should_auto_delete(State2) of - true -> + true -> log_auto_delete( io_lib:format( "because all of its consumers (~p) were on a channel that was closed", @@ -1071,11 +1072,11 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From, notify_decorators(State1), case should_auto_delete(State1) of false -> reply(ok, ensure_expiry_timer(State1)); - true -> + true -> log_auto_delete( io_lib:format( "because its last consumer with tag '~s' was cancelled", - [ConsumerTag]), + [ConsumerTag]), State), stop(ok, State1) end @@ -1207,22 +1208,15 @@ handle_cast({set_maximum_since_use, Age}, State) -> ok = file_handle_cache:set_maximum_since_use(Age), noreply(State); -handle_cast(start_mirroring, State = #q{backing_queue = BQ, - backing_queue_state = BQS}) -> - %% lookup again to get policy for init_with_existing_bq - {ok, Q} = rabbit_amqqueue:lookup(qname(State)), - true = BQ =/= rabbit_mirror_queue_master, %% assertion - BQ1 = rabbit_mirror_queue_master, - BQS1 = BQ1:init_with_existing_bq(Q, BQ, BQS), - noreply(State#q{backing_queue = BQ1, - backing_queue_state = BQS1}); - -handle_cast(stop_mirroring, State = #q{backing_queue = BQ, - backing_queue_state = BQS}) -> - BQ = rabbit_mirror_queue_master, %% assertion - {BQ1, BQS1} = BQ:stop_mirroring(BQS), - noreply(State#q{backing_queue = BQ1, - backing_queue_state = BQS1}); +handle_cast(update_mirroring, State = #q{q = Q, + mirroring_policy_version = Version}) -> + case needs_update_mirroring(Q, Version) of + false -> + noreply(State); + {Policy, NewVersion} -> + State1 = State#q{mirroring_policy_version = NewVersion}, + noreply(update_mirroring(Policy, State1)) + end; handle_cast({credit, ChPid, CTag, Credit, Drain}, State = #q{consumers = Consumers, @@ -1371,19 +1365,67 @@ log_delete_exclusive({ConPid, _ConRef}, State) -> log_delete_exclusive(ConPid, State); log_delete_exclusive(ConPid, #q{ q = #amqqueue{ name = Resource } }) -> #resource{ name = QName, virtual_host = VHost } = Resource, - rabbit_queue:debug("Deleting exclusive queue '~s' in vhost '~s' " ++ - " because its declaring connection ~p was closed", - [QName, VHost, ConPid]). + rabbit_log_queue:debug("Deleting exclusive queue '~s' in vhost '~s' " ++ + "because its declaring connection ~p was closed", + [QName, VHost, ConPid]). log_auto_delete(Reason, #q{ q = #amqqueue{ name = Resource } }) -> #resource{ name = QName, virtual_host = VHost } = Resource, - rabbit_queue:debug("Deleting auto-delete queue '~s' in vhost '~s' " ++ - Reason, - [QName, VHost]). + rabbit_log_queue:debug("Deleting auto-delete queue '~s' in vhost '~s' " ++ + Reason, + [QName, VHost]). + +needs_update_mirroring(Q, Version) -> + {ok, UpQ} = rabbit_amqqueue:lookup(Q#amqqueue.name), + DBVersion = UpQ#amqqueue.policy_version, + case DBVersion > Version of + true -> {rabbit_policy:get(<<"ha-mode">>, UpQ), DBVersion}; + false -> false + end. +update_mirroring(Policy, State = #q{backing_queue = BQ}) -> + case update_to(Policy, BQ) of + start_mirroring -> + start_mirroring(State); + stop_mirroring -> + stop_mirroring(State); + ignore -> + State; + update_ha_mode -> + update_ha_mode(State) + end. +update_to(undefined, rabbit_mirror_queue_master) -> + stop_mirroring; +update_to(_, rabbit_mirror_queue_master) -> + update_ha_mode; +update_to(undefined, BQ) when BQ =/= rabbit_mirror_queue_master -> + ignore; +update_to(_, BQ) when BQ =/= rabbit_mirror_queue_master -> + start_mirroring. + +start_mirroring(State = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> + %% lookup again to get policy for init_with_existing_bq + {ok, Q} = rabbit_amqqueue:lookup(qname(State)), + true = BQ =/= rabbit_mirror_queue_master, %% assertion + BQ1 = rabbit_mirror_queue_master, + BQS1 = BQ1:init_with_existing_bq(Q, BQ, BQS), + State#q{backing_queue = BQ1, + backing_queue_state = BQS1}. +stop_mirroring(State = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> + BQ = rabbit_mirror_queue_master, %% assertion + {BQ1, BQS1} = BQ:stop_mirroring(BQS), + State#q{backing_queue = BQ1, + backing_queue_state = BQS1}. + +update_ha_mode(State) -> + {ok, Q} = rabbit_amqqueue:lookup(qname(State)), + ok = rabbit_mirror_queue_misc:update_mirrors(Q), + State. diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl index 7f653c3780..bf50fd6c49 100644 --- a/src/rabbit_control_main.erl +++ b/src/rabbit_control_main.erl @@ -23,7 +23,7 @@ sync_queue/1, cancel_sync_queue/1, become/1, purge_queue/1]). --import(rabbit_misc, [rpc_call/4, rpc_call/5, rpc_call/7]). +-import(rabbit_misc, [rpc_call/4, rpc_call/5]). -define(EXTERNAL_CHECK_INTERVAL, 1000). @@ -37,6 +37,7 @@ reset, force_reset, rotate_logs, + hipe_compile, {join_cluster, [?RAM_DEF]}, change_cluster_node_type, @@ -113,7 +114,7 @@ [stop, stop_app, start_app, wait, reset, force_reset, rotate_logs, join_cluster, change_cluster_node_type, update_cluster_nodes, forget_cluster_node, rename_cluster_node, cluster_status, status, - environment, eval, force_boot, help, node_health_check]). + environment, eval, force_boot, help, node_health_check, hipe_compile]). -define(COMMANDS_WITH_TIMEOUT, [list_user_permissions, list_policies, list_queues, list_exchanges, @@ -380,6 +381,16 @@ action(rotate_logs, Node, [], _Opts, Inform) -> Inform("Rotating logs for node ~p", [Node]), call(Node, {rabbit, rotate_logs, []}); +action(hipe_compile, _Node, [TargetDir], _Opts, _Inform) -> + ok = application:load(rabbit), + case rabbit_hipe:can_hipe_compile() of + true -> + {ok, _, _} = rabbit_hipe:compile_to_directory(TargetDir), + ok; + false -> + {error, "HiPE compilation is not supported"} + end; + action(close_connection, Node, [PidStr, Explanation], _Opts, Inform) -> Inform("Closing connection \"~s\"", [PidStr]), rpc_call(Node, rabbit_networking, close_connection, @@ -579,56 +590,74 @@ action(purge_queue, Node, [Q], Opts, Inform, Timeout) -> action(list_users, Node, [], _Opts, Inform, Timeout) -> Inform("Listing users", []), - call(Node, {rabbit_auth_backend_internal, list_users, []}, - rabbit_auth_backend_internal:user_info_keys(), true, Timeout); + call_emitter(Node, {rabbit_auth_backend_internal, list_users, []}, + rabbit_auth_backend_internal:user_info_keys(), + [{timeout, Timeout}, to_bin_utf8]); action(list_permissions, Node, [], Opts, Inform, Timeout) -> VHost = proplists:get_value(?VHOST_OPT, Opts), Inform("Listing permissions in vhost \"~s\"", [VHost]), - call(Node, {rabbit_auth_backend_internal, list_vhost_permissions, [VHost]}, - rabbit_auth_backend_internal:vhost_perms_info_keys(), true, Timeout, - true); + call_emitter(Node, {rabbit_auth_backend_internal, list_vhost_permissions, [VHost]}, + rabbit_auth_backend_internal:vhost_perms_info_keys(), + [{timeout, Timeout}, to_bin_utf8, is_escaped]); action(list_parameters, Node, [], Opts, Inform, Timeout) -> VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), Inform("Listing runtime parameters", []), - call(Node, {rabbit_runtime_parameters, list_formatted, [VHostArg]}, - rabbit_runtime_parameters:info_keys(), Timeout); + call_emitter(Node, {rabbit_runtime_parameters, list_formatted, [VHostArg]}, + rabbit_runtime_parameters:info_keys(), + [{timeout, Timeout}]); action(list_policies, Node, [], Opts, Inform, Timeout) -> VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), Inform("Listing policies", []), - call(Node, {rabbit_policy, list_formatted, [VHostArg]}, - rabbit_policy:info_keys(), Timeout); + call_emitter(Node, {rabbit_policy, list_formatted, [VHostArg]}, + rabbit_policy:info_keys(), + [{timeout, Timeout}]); action(list_vhosts, Node, Args, _Opts, Inform, Timeout) -> Inform("Listing vhosts", []), ArgAtoms = default_if_empty(Args, [name]), - call(Node, {rabbit_vhost, info_all, []}, ArgAtoms, true, Timeout); + call_emitter(Node, {rabbit_vhost, info_all, []}, ArgAtoms, + [{timeout, Timeout}, to_bin_utf8]); action(list_user_permissions, _Node, _Args = [], _Opts, _Inform, _Timeout) -> {error_string, "list_user_permissions expects a username argument, but none provided."}; action(list_user_permissions, Node, Args = [_Username], _Opts, Inform, Timeout) -> Inform("Listing permissions for user ~p", Args), - call(Node, {rabbit_auth_backend_internal, list_user_permissions, Args}, - rabbit_auth_backend_internal:user_perms_info_keys(), true, Timeout, - true); + call_emitter(Node, {rabbit_auth_backend_internal, list_user_permissions, Args}, + rabbit_auth_backend_internal:user_perms_info_keys(), + [{timeout, Timeout}, to_bin_utf8, is_escaped]); action(list_queues, Node, Args, Opts, Inform, Timeout) -> - [Online, Offline] = rabbit_cli:filter_opts(Opts, [?ONLINE_OPT, ?OFFLINE_OPT]), Inform("Listing queues", []), + %% User options + [Online, Offline] = rabbit_cli:filter_opts(Opts, [?ONLINE_OPT, ?OFFLINE_OPT]), VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), ArgAtoms = default_if_empty(Args, [name, messages]), - call(Node, {rabbit_amqqueue, info_all, [VHostArg, ArgAtoms, Online, Offline]}, - ArgAtoms, Timeout); + + %% Data for emission + Nodes = nodes_in_cluster(Node, Timeout), + OnlineChunks = if Online -> length(Nodes); true -> 0 end, + OfflineChunks = if Offline -> 1; true -> 0 end, + ChunksOpt = {chunks, OnlineChunks + OfflineChunks}, + TimeoutOpt = {timeout, Timeout}, + EmissionRef = make_ref(), + EmissionRefOpt = {ref, EmissionRef}, + + _ = Online andalso start_emission(Node, {rabbit_amqqueue, emit_info_all, [Nodes, VHostArg, ArgAtoms]}, + [TimeoutOpt, EmissionRefOpt]), + _ = Offline andalso start_emission(Node, {rabbit_amqqueue, emit_info_down, [VHostArg, ArgAtoms]}, + [TimeoutOpt, EmissionRefOpt]), + display_emission_result(EmissionRef, ArgAtoms, [ChunksOpt, TimeoutOpt]); action(list_exchanges, Node, Args, Opts, Inform, Timeout) -> Inform("Listing exchanges", []), VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), ArgAtoms = default_if_empty(Args, [name, type]), - call(Node, {rabbit_exchange, info_all, [VHostArg, ArgAtoms]}, - ArgAtoms, Timeout); + call_emitter(Node, {rabbit_exchange, info_all, [VHostArg, ArgAtoms]}, + ArgAtoms, [{timeout, Timeout}]); action(list_bindings, Node, Args, Opts, Inform, Timeout) -> Inform("Listing bindings", []), @@ -636,27 +665,31 @@ action(list_bindings, Node, Args, Opts, Inform, Timeout) -> ArgAtoms = default_if_empty(Args, [source_name, source_kind, destination_name, destination_kind, routing_key, arguments]), - call(Node, {rabbit_binding, info_all, [VHostArg, ArgAtoms]}, - ArgAtoms, Timeout); + call_emitter(Node, {rabbit_binding, info_all, [VHostArg, ArgAtoms]}, + ArgAtoms, [{timeout, Timeout}]); action(list_connections, Node, Args, _Opts, Inform, Timeout) -> Inform("Listing connections", []), ArgAtoms = default_if_empty(Args, [user, peer_host, peer_port, state]), - call(Node, {rabbit_networking, connection_info_all, [ArgAtoms]}, - ArgAtoms, Timeout); + Nodes = nodes_in_cluster(Node, Timeout), + call_emitter(Node, {rabbit_networking, emit_connection_info_all, [Nodes, ArgAtoms]}, + ArgAtoms, [{timeout, Timeout}, {chunks, length(Nodes)}]); action(list_channels, Node, Args, _Opts, Inform, Timeout) -> Inform("Listing channels", []), ArgAtoms = default_if_empty(Args, [pid, user, consumer_count, messages_unacknowledged]), - call(Node, {rabbit_channel, info_all, [ArgAtoms]}, - ArgAtoms, Timeout); + Nodes = nodes_in_cluster(Node, Timeout), + call_emitter(Node, {rabbit_channel, emit_info_all, [Nodes, ArgAtoms]}, ArgAtoms, + [{timeout, Timeout}, {chunks, length(Nodes)}]); action(list_consumers, Node, _Args, Opts, Inform, Timeout) -> Inform("Listing consumers", []), VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), - call(Node, {rabbit_amqqueue, consumers_all, [VHostArg]}, - rabbit_amqqueue:consumer_info_keys(), Timeout). + Nodes = nodes_in_cluster(Node, Timeout), + call_emitter(Node, {rabbit_amqqueue, emit_consumers_all, [Nodes, VHostArg]}, + rabbit_amqqueue:consumer_info_keys(), + [{timeout, Timeout}, {chunks, length(Nodes)}]). format_parse_error({_Line, Mod, Err}) -> lists:flatten(Mod:format_error(Err)). @@ -766,17 +799,18 @@ display_info_message_row(IsEscaped, Result, InfoItemKeys) -> {X, Value} -> Value end, IsEscaped) || X <- InfoItemKeys]). -display_info_message(IsEscaped) -> +display_info_message(IsEscaped, InfoItemKeys) -> fun ([], _) -> ok; - ([FirstResult|_] = List, InfoItemKeys) when is_list(FirstResult) -> + ([FirstResult|_] = List, _) when is_list(FirstResult) -> lists:foreach(fun(Result) -> display_info_message_row(IsEscaped, Result, InfoItemKeys) end, List), ok; - (Result, InfoItemKeys) -> - display_info_message_row(IsEscaped, Result, InfoItemKeys) + (Result, _) -> + display_info_message_row(IsEscaped, Result, InfoItemKeys), + ok end. display_info_list(Results, InfoItemKeys) when is_list(Results) -> @@ -833,7 +867,10 @@ display_call_result(Node, MFA) -> end. unsafe_rpc(Node, Mod, Fun, Args) -> - case rpc_call(Node, Mod, Fun, Args) of + unsafe_rpc(Node, Mod, Fun, Args, ?RPC_TIMEOUT). + +unsafe_rpc(Node, Mod, Fun, Args, Timeout) -> + case rpc_call(Node, Mod, Fun, Args, Timeout) of {badrpc, _} = Res -> throw(Res); Normal -> Normal end. @@ -852,33 +889,42 @@ ensure_app_running(Node) -> call(Node, {Mod, Fun, Args}) -> rpc_call(Node, Mod, Fun, lists:map(fun list_to_binary_utf8/1, Args)). -call(Node, {Mod, Fun, Args}, InfoKeys, Timeout) -> - call(Node, {Mod, Fun, Args}, InfoKeys, false, Timeout, false). +call_emitter(Node, {Mod, Fun, Args}, InfoKeys, Opts) -> + Ref = start_emission(Node, {Mod, Fun, Args}, Opts), + display_emission_result(Ref, InfoKeys, Opts). + +start_emission(Node, {Mod, Fun, Args}, Opts) -> + ToBinUtf8 = proplists:get_value(to_bin_utf8, Opts, false), + Timeout = proplists:get_value(timeout, Opts, infinity), + Ref = proplists:get_value(ref, Opts, make_ref()), + rabbit_control_misc:spawn_emitter_caller( + Node, Mod, Fun, prepare_call_args(Args, ToBinUtf8), + Ref, self(), Timeout), + Ref. + +display_emission_result(Ref, InfoKeys, Opts) -> + IsEscaped = proplists:get_value(is_escaped, Opts, false), + Chunks = proplists:get_value(chunks, Opts, 1), + Timeout = proplists:get_value(timeout, Opts, infinity), + EmissionStatus = rabbit_control_misc:wait_for_info_messages( + self(), Ref, display_info_message(IsEscaped, InfoKeys), ok, Timeout, Chunks), + emission_to_action_result(EmissionStatus). + +%% Convert rabbit_control_misc:wait_for_info_messages/6 return value +%% into form expected by rabbit_cli:main/3. +emission_to_action_result({ok, ok}) -> + ok; +emission_to_action_result({error, Error}) -> + Error. -call(Node, {Mod, Fun, Args}, InfoKeys, ToBinUtf8, Timeout) -> - call(Node, {Mod, Fun, Args}, InfoKeys, ToBinUtf8, Timeout, false). +prepare_call_args(Args, ToBinUtf8) -> + case ToBinUtf8 of + true -> valid_utf8_args(Args); + false -> Args + end. -call(Node, {Mod, Fun, Args}, InfoKeys, ToBinUtf8, Timeout, IsEscaped) -> - Args0 = case ToBinUtf8 of - true -> lists:map(fun list_to_binary_utf8/1, Args); - false -> Args - end, - Ref = make_ref(), - Pid = self(), - spawn_link( - fun () -> - case rabbit_cli:rpc_call(Node, Mod, Fun, Args0, - Ref, Pid, Timeout) of - {error, _} = Error -> - Pid ! {error, Error}; - {bad_argument, _} = Error -> - Pid ! {error, Error}; - _ -> - ok - end - end), - rabbit_control_misc:wait_for_info_messages( - Pid, Ref, InfoKeys, display_info_message(IsEscaped), Timeout). +valid_utf8_args(Args) -> + lists:map(fun list_to_binary_utf8/1, Args). list_to_binary_utf8(L) -> B = list_to_binary(L), @@ -928,7 +974,10 @@ split_list([_]) -> exit(even_list_needed); split_list([A, B | T]) -> [{A, B} | split_list(T)]. nodes_in_cluster(Node) -> - unsafe_rpc(Node, rabbit_mnesia, cluster_nodes, [running]). + unsafe_rpc(Node, rabbit_mnesia, cluster_nodes, [running], ?RPC_TIMEOUT). + +nodes_in_cluster(Node, Timeout) -> + unsafe_rpc(Node, rabbit_mnesia, cluster_nodes, [running], Timeout). alarms_by_node(Name) -> Status = unsafe_rpc(Name, rabbit, status, []), diff --git a/src/rabbit_file.erl b/src/rabbit_file.erl index 3dd0421485..d2d37f0ec0 100644 --- a/src/rabbit_file.erl +++ b/src/rabbit_file.erl @@ -24,6 +24,7 @@ -export([rename/2, delete/1, recursive_delete/1, recursive_copy/2]). -export([lock_file/1]). -export([read_file_info/1]). +-export([filename_as_a_directory/1]). -import(file_handle_cache, [with_handle/1, with_handle/2]). @@ -59,6 +60,7 @@ (file:filename(), file:filename()) -> rabbit_types:ok_or_error({file:filename(), file:filename(), any()})). -spec(lock_file/1 :: (file:filename()) -> rabbit_types:ok_or_error('eexist')). +-spec(filename_as_a_directory/1 :: (file:filename()) -> file:filename()). -endif. @@ -306,3 +308,11 @@ lock_file(Path) -> ok = prim_file:close(Lock) end) end. + +filename_as_a_directory(FileName) -> + case lists:last(FileName) of + "/" -> + FileName; + _ -> + FileName ++ "/" + end. diff --git a/src/rabbit_hipe.erl b/src/rabbit_hipe.erl index cbd9181e6a..6957d85cb4 100644 --- a/src/rabbit_hipe.erl +++ b/src/rabbit_hipe.erl @@ -5,15 +5,15 @@ %% practice 2 processes seems just as fast as any other number > 1, %% and keeps the progress bar realistic-ish. -define(HIPE_PROCESSES, 2). --export([maybe_hipe_compile/0, log_hipe_result/1]). -%% HiPE compilation happens before we have log handlers - so we have -%% to io:format/2, it's all we can do. +-export([maybe_hipe_compile/0, log_hipe_result/1]). +-export([compile_to_directory/1]). +-export([can_hipe_compile/0]). +%% Compile and load during server startup sequence maybe_hipe_compile() -> {ok, Want} = application:get_env(rabbit, hipe_compile), - Can = code:which(hipe) =/= non_existing, - case {Want, Can} of + case {Want, can_hipe_compile()} of {true, true} -> hipe_compile(); {true, false} -> false; {false, _} -> {ok, disabled} @@ -33,38 +33,49 @@ log_hipe_result(false) -> rabbit_log:warning( "Not HiPE compiling: HiPE not found in this Erlang installation.~n"). +hipe_compile() -> + hipe_compile(fun compile_and_load/1, false). + +compile_to_directory(Dir0) -> + Dir = rabbit_file:filename_as_a_directory(Dir0), + ok = prepare_ebin_directory(Dir), + hipe_compile(fun (Mod) -> compile_and_save(Mod, Dir) end, true). + +needs_compilation(Mod, Force) -> + Exists = code:which(Mod) =/= non_existing, + %% We skip modules already natively compiled. This + %% happens when RabbitMQ is stopped (just the + %% application, not the entire node) and started + %% again. + NotYetCompiled = not already_hipe_compiled(Mod), + NotVersioned = not compiled_with_version_support(Mod), + Exists andalso (Force orelse (NotYetCompiled andalso NotVersioned)). + %% HiPE compilation happens before we have log handlers and can take a %% long time, so make an exception to our no-stdout policy and display %% progress via stdout. -hipe_compile() -> +hipe_compile(CompileFun, Force) -> {ok, HipeModulesAll} = application:get_env(rabbit, hipe_modules), - HipeModules = [HM || HM <- HipeModulesAll, - code:which(HM) =/= non_existing andalso - %% We skip modules already natively compiled. This - %% happens when RabbitMQ is stopped (just the - %% application, not the entire node) and started - %% again. - already_hipe_compiled(HM) - andalso (not compiled_with_version_support(HM))], + HipeModules = lists:filter(fun(Mod) -> needs_compilation(Mod, Force) end, HipeModulesAll), case HipeModules of [] -> {ok, already_compiled}; - _ -> do_hipe_compile(HipeModules) + _ -> do_hipe_compile(HipeModules, CompileFun) end. already_hipe_compiled(Mod) -> try %% OTP 18.x or later - Mod:module_info(native) =:= false + Mod:module_info(native) =:= true %% OTP prior to 18.x catch error:badarg -> - code:is_module_native(Mod) =:= false + code:is_module_native(Mod) =:= true end. compiled_with_version_support(Mod) -> proplists:get_value(erlang_version_support, Mod:module_info(attributes)) =/= undefined. -do_hipe_compile(HipeModules) -> +do_hipe_compile(HipeModules, CompileFun) -> Count = length(HipeModules), io:format("~nHiPE compiling: |~s|~n |", [string:copies("-", Count)]), @@ -79,11 +90,7 @@ do_hipe_compile(HipeModules) -> %% advanced API does not load automatically the code, except if the %% 'load' option is set. PidMRefs = [spawn_monitor(fun () -> [begin - {M, Beam, _} = - code:get_object_code(M), - {ok, _} = - hipe:compile(M, [], Beam, - [o3, load]), + CompileFun(M), io:format("#") end || M <- Ms] end) || @@ -101,3 +108,39 @@ split(L, N) -> split0(L, [[] || _ <- lists:seq(1, N)]). split0([], Ls) -> Ls; split0([I | Is], [L | Ls]) -> split0(Is, Ls ++ [[I | L]]). + +prepare_ebin_directory(Dir) -> + ok = rabbit_file:ensure_dir(Dir), + ok = delete_beam_files(Dir), + ok. + +delete_beam_files(Dir) -> + {ok, Files} = file:list_dir(Dir), + lists:foreach(fun(File) -> + case filename:extension(File) of + ".beam" -> + ok = file:delete(filename:join([Dir, File])); + _ -> + ok + end + end, + Files). + +compile_and_load(Mod) -> + {Mod, Beam, _} = code:get_object_code(Mod), + {ok, _} = hipe:compile(Mod, [], Beam, [o3, load]). + +compile_and_save(Module, Dir) -> + {Module, BeamCode, _} = code:get_object_code(Module), + BeamName = filename:join([Dir, atom_to_list(Module) ++ ".beam"]), + {ok, {Architecture, NativeCode}} = hipe:compile(Module, [], BeamCode, [o3]), + {ok, _, Chunks0} = beam_lib:all_chunks(BeamCode), + ChunkName = hipe_unified_loader:chunk_name(Architecture), + Chunks1 = lists:keydelete(ChunkName, 1, Chunks0), + Chunks = Chunks1 ++ [{ChunkName,NativeCode}], + {ok, BeamPlusNative} = beam_lib:build_module(Chunks), + ok = file:write_file(BeamName, BeamPlusNative), + BeamName. + +can_hipe_compile() -> + code:which(hipe) =/= non_existing. diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index e447e9de82..9674a4ef2c 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -363,7 +363,7 @@ fetch(AckRequired, State = #state { backing_queue = BQ, State1 = State #state { backing_queue_state = BQS1 }, {Result, case Result of empty -> State1; - {_MsgId, _IsDelivered, AckTag} -> drop_one(AckTag, State1) + {_MsgId, _IsDelivered, _AckTag} -> drop_one(AckRequired, State1) end}. drop(AckRequired, State = #state { backing_queue = BQ, @@ -372,7 +372,7 @@ drop(AckRequired, State = #state { backing_queue = BQ, State1 = State #state { backing_queue_state = BQS1 }, {Result, case Result of empty -> State1; - {_MsgId, AckTag} -> drop_one(AckTag, State1) + {_MsgId, _AckTag} -> drop_one(AckRequired, State1) end}. ack(AckTags, State = #state { gm = GM, @@ -518,6 +518,7 @@ promote_backing_queue_state(QName, CPid, BQ, BQS, GM, AckTags, Seen, KS) -> Depth = BQ:depth(BQS1), true = Len == Depth, %% ASSERTION: everything must have been requeued ok = gm:broadcast(GM, {depth, Depth}), + WaitTimeout = rabbit_misc:get_env(rabbit, slave_wait_timeout, 15000), #state { name = QName, gm = GM, coordinator = CPid, @@ -525,7 +526,8 @@ promote_backing_queue_state(QName, CPid, BQ, BQS, GM, AckTags, Seen, KS) -> backing_queue_state = BQS1, seen_status = Seen, confirmed = [], - known_senders = sets:from_list(KS) }. + known_senders = sets:from_list(KS), + wait_timeout = WaitTimeout }. sender_death_fun() -> Self = self(), @@ -556,10 +558,10 @@ depth_fun() -> %% Helpers %% --------------------------------------------------------------------------- -drop_one(AckTag, State = #state { gm = GM, - backing_queue = BQ, - backing_queue_state = BQS }) -> - ok = gm:broadcast(GM, {drop, BQ:len(BQS), 1, AckTag =/= undefined}), +drop_one(AckRequired, State = #state { gm = GM, + backing_queue = BQ, + backing_queue_state = BQS }) -> + ok = gm:broadcast(GM, {drop, BQ:len(BQS), 1, AckRequired}), State. drop(PrevLen, AckRequired, State = #state { gm = GM, diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index 849efa3611..b188298a9b 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -20,7 +20,7 @@ -export([remove_from_queue/3, on_node_up/0, add_mirrors/3, report_deaths/4, store_updated_slaves/1, initial_queue_node/2, suggested_queue_nodes/1, - is_mirrored/1, update_mirrors/2, validate_policy/1, + is_mirrored/1, update_mirrors/2, update_mirrors/1, validate_policy/1, maybe_auto_sync/1, maybe_drop_master_after_sync/1, sync_batch_size/1, log_info/3, log_warning/3]). @@ -64,6 +64,8 @@ -spec(is_mirrored/1 :: (rabbit_types:amqqueue()) -> boolean()). -spec(update_mirrors/2 :: (rabbit_types:amqqueue(), rabbit_types:amqqueue()) -> 'ok'). +-spec(update_mirrors/1 :: + (rabbit_types:amqqueue()) -> 'ok'). -spec(maybe_drop_master_after_sync/1 :: (rabbit_types:amqqueue()) -> 'ok'). -spec(maybe_auto_sync/1 :: (rabbit_types:amqqueue()) -> 'ok'). -spec(log_info/3 :: (rabbit_amqqueue:name(), string(), [any()]) -> 'ok'). @@ -384,15 +386,12 @@ update_mirrors(OldQ = #amqqueue{pid = QPid}, NewQ = #amqqueue{pid = QPid}) -> case {is_mirrored(OldQ), is_mirrored(NewQ)} of {false, false} -> ok; - {true, false} -> rabbit_amqqueue:stop_mirroring(QPid); - {false, true} -> rabbit_amqqueue:start_mirroring(QPid); - {true, true} -> update_mirrors0(OldQ, NewQ) + _ -> rabbit_amqqueue:update_mirroring(QPid) end. -update_mirrors0(OldQ = #amqqueue{name = QName}, - NewQ = #amqqueue{name = QName}) -> - {OldMNode, OldSNodes, _} = actual_queue_nodes(OldQ), - {NewMNode, NewSNodes} = suggested_queue_nodes(NewQ), +update_mirrors(Q = #amqqueue{name = QName}) -> + {OldMNode, OldSNodes, _} = actual_queue_nodes(Q), + {NewMNode, NewSNodes} = suggested_queue_nodes(Q), OldNodes = [OldMNode | OldSNodes], NewNodes = [NewMNode | NewSNodes], %% When a mirror dies, remove_from_queue/2 might have to add new @@ -406,7 +405,7 @@ update_mirrors0(OldQ = #amqqueue{name = QName}, drop_mirrors(QName, OldNodes -- NewNodes), %% This is for the case where no extra nodes were added but we changed to %% a policy requiring auto-sync. - maybe_auto_sync(NewQ), + maybe_auto_sync(Q), ok. %% The arrival of a newly synced slave may cause the master to die if diff --git a/src/rabbit_policy.erl b/src/rabbit_policy.erl index eb8cf63327..a9caadf972 100644 --- a/src/rabbit_policy.erl +++ b/src/rabbit_policy.erl @@ -276,7 +276,9 @@ update_queue(Q = #amqqueue{name = QName, policy = OldPolicy}, Policies) -> NewPolicy -> case rabbit_amqqueue:update( QName, fun(Q1) -> rabbit_queue_decorator:set( - Q1#amqqueue{policy = NewPolicy}) + Q1#amqqueue{policy = NewPolicy, + policy_version = + Q1#amqqueue.policy_version + 1 }) end) of #amqqueue{} = Q1 -> {Q, Q1}; not_found -> {Q, Q } diff --git a/src/rabbit_priority_queue.erl b/src/rabbit_priority_queue.erl index 6141796f7b..a3bfb5cdfa 100644 --- a/src/rabbit_priority_queue.erl +++ b/src/rabbit_priority_queue.erl @@ -205,8 +205,8 @@ publish(Msg, MsgProps, IsDelivered, ChPid, Flow, State = #passthrough{bq = BQ, bqs = BQS}) -> ?passthrough1(publish(Msg, MsgProps, IsDelivered, ChPid, Flow, BQS)). -batch_publish(Publishes, ChPid, Flow, State = #state{bq = BQ}) -> - PubDict = partition_publish_batch(Publishes), +batch_publish(Publishes, ChPid, Flow, State = #state{bq = BQ, bqss = [{MaxP, _} |_]}) -> + PubDict = partition_publish_batch(Publishes, MaxP), lists:foldl( fun ({Priority, Pubs}, St) -> pick1(fun (_P, BQSN) -> @@ -227,8 +227,8 @@ publish_delivered(Msg, MsgProps, ChPid, Flow, State = #passthrough{bq = BQ, bqs = BQS}) -> ?passthrough2(publish_delivered(Msg, MsgProps, ChPid, Flow, BQS)). -batch_publish_delivered(Publishes, ChPid, Flow, State = #state{bq = BQ}) -> - PubDict = partition_publish_delivered_batch(Publishes), +batch_publish_delivered(Publishes, ChPid, Flow, State = #state{bq = BQ, bqss = [{MaxP, _} |_]}) -> + PubDict = partition_publish_delivered_batch(Publishes, MaxP), {PrioritiesAndAcks, State1} = lists:foldl( fun ({Priority, Pubs}, {PriosAndAcks, St}) -> @@ -404,7 +404,6 @@ msg_rates(#state{bq = BQ, bqss = BQSs}) -> end, {0.0, 0.0}, BQSs); msg_rates(#passthrough{bq = BQ, bqs = BQS}) -> BQ:msg_rates(BQS). - info(backing_queue_status, #state{bq = BQ, bqss = BQSs}) -> fold0(fun (P, BQSN, Acc) -> combine_status(P, BQ:info(backing_queue_status, BQSN), Acc) @@ -433,8 +432,8 @@ set_queue_mode(Mode, State = #state{bq = BQ}) -> set_queue_mode(Mode, State = #passthrough{bq = BQ, bqs = BQS}) -> ?passthrough1(set_queue_mode(Mode, BQS)). -zip_msgs_and_acks(Msgs, AckTags, Accumulator, #state{}) -> - MsgsByPriority = partition_publish_delivered_batch(Msgs), +zip_msgs_and_acks(Msgs, AckTags, Accumulator, #state{bqss = [{MaxP, _} |_]}) -> + MsgsByPriority = partition_publish_delivered_batch(Msgs, MaxP), lists:foldl(fun (Acks, MAs) -> {P, _AckTag} = hd(Acks), Pubs = orddict:fetch(P, MsgsByPriority), @@ -484,13 +483,14 @@ foreach1(_Fun, [], BQSAcc) -> %% For a given thing, just go to its BQ pick1(Fun, Prioritisable, #state{bqss = BQSs} = State) -> - {P, BQSN} = priority(Prioritisable, BQSs), + {P, BQSN} = priority_bq(Prioritisable, BQSs), a(State#state{bqss = bq_store(P, Fun(P, BQSN), BQSs)}). %% Fold over results fold2(Fun, Acc, State = #state{bqss = BQSs}) -> {Res, BQSs1} = fold2(Fun, Acc, BQSs, []), {Res, a(State#state{bqss = BQSs1})}. + fold2(Fun, Acc, [{P, BQSN} | Rest], BQSAcc) -> {Acc1, BQSN1} = Fun(P, BQSN, Acc), fold2(Fun, Acc1, Rest, [{P, BQSN1} | BQSAcc]); @@ -532,7 +532,7 @@ fold_by_acktags2(Fun, AckTags, State) -> %% For a given thing, just go to its BQ pick2(Fun, Prioritisable, #state{bqss = BQSs} = State) -> - {P, BQSN} = priority(Prioritisable, BQSs), + {P, BQSN} = priority_bq(Prioritisable, BQSs), {Res, BQSN1} = Fun(P, BQSN), {Res, a(State#state{bqss = bq_store(P, BQSN1, BQSs)})}. @@ -564,8 +564,7 @@ findfold3(_Fun, Acc, NotFound, [], BQSAcc) -> {NotFound, Acc, lists:reverse(BQSAcc)}. bq_fetch(P, []) -> exit({not_found, P}); -bq_fetch(P, [{P, BQSN} | _]) -> BQSN; -bq_fetch(P, [{P1, BQSN} | _]) when P > P1 -> BQSN; +bq_fetch(P, [{P, BQSN} | _]) -> {P, BQSN}; bq_fetch(P, [{_, _BQSN} | T]) -> bq_fetch(P, T). bq_store(P, BQS, BQSs) -> @@ -583,41 +582,36 @@ a(State = #state{bqss = BQSs}) -> end. %%---------------------------------------------------------------------------- -partition_publish_batch(Publishes) -> +partition_publish_batch(Publishes, MaxP) -> partition_publishes( - Publishes, fun ({Msg, _, _}) -> Msg end). + Publishes, fun ({Msg, _, _}) -> Msg end, MaxP). -partition_publish_delivered_batch(Publishes) -> +partition_publish_delivered_batch(Publishes, MaxP) -> partition_publishes( - Publishes, fun ({Msg, _}) -> Msg end). + Publishes, fun ({Msg, _}) -> Msg end, MaxP). -partition_publishes(Publishes, ExtractMsg) -> +partition_publishes(Publishes, ExtractMsg, MaxP) -> lists:foldl(fun (Pub, Dict) -> Msg = ExtractMsg(Pub), - rabbit_misc:orddict_cons(priority2(Msg), Pub, Dict) + rabbit_misc:orddict_cons(priority(Msg, MaxP), Pub, Dict) end, orddict:new(), Publishes). -priority(P, BQSs) when is_integer(P) -> - {P, bq_fetch(P, BQSs)}; -priority(#basic_message{content = Content}, BQSs) -> - priority1(rabbit_binary_parser:ensure_content_decoded(Content), BQSs). - -priority1(_Content, [{P, BQSN}]) -> - {P, BQSN}; -priority1(Content, [{P, BQSN} | Rest]) -> - case priority2(Content) >= P of - true -> {P, BQSN}; - false -> priority1(Content, Rest) - end. - -priority2(#basic_message{content = Content}) -> - priority2(rabbit_binary_parser:ensure_content_decoded(Content)); -priority2(#content{properties = Props}) -> +priority_bq(Priority, [{MaxP, _} | _] = BQSs) -> + bq_fetch(priority(Priority, MaxP), BQSs). + +%% Messages with a priority which is higher than the queue's maximum are treated +%% as if they were published with the maximum priority. +priority(undefined, _MaxP) -> + 0; +priority(Priority, MaxP) when is_integer(Priority), Priority =< MaxP -> + Priority; +priority(Priority, MaxP) when is_integer(Priority), Priority > MaxP -> + MaxP; +priority(#basic_message{content = Content}, MaxP) -> + priority(rabbit_binary_parser:ensure_content_decoded(Content), MaxP); +priority(#content{properties = Props}, MaxP) -> #'P_basic'{priority = Priority0} = Props, - case Priority0 of - undefined -> 0; - _ when is_integer(Priority0) -> Priority0 - end. + priority(Priority0, MaxP). add_maybe_infinity(infinity, _) -> infinity; add_maybe_infinity(_, infinity) -> infinity; diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl index b99a1d12ee..0f55b9e4a9 100644 --- a/src/rabbit_upgrade_functions.erl +++ b/src/rabbit_upgrade_functions.erl @@ -52,6 +52,7 @@ -rabbit_upgrade({down_slave_nodes, mnesia, [queue_decorators]}). -rabbit_upgrade({queue_state, mnesia, [down_slave_nodes]}). -rabbit_upgrade({recoverable_slaves, mnesia, [queue_state]}). +-rabbit_upgrade({policy_version, mnesia, [recoverable_slaves]}). -rabbit_upgrade({user_password_hashing, mnesia, [hash_passwords]}). %% ------------------------------------------------------------------- @@ -447,6 +448,24 @@ recoverable_slaves(Table) -> sync_slave_pids, recoverable_slaves, policy, gm_pids, decorators, state]). +policy_version() -> + ok = policy_version(rabbit_queue), + ok = policy_version(rabbit_durable_queue). + +policy_version(Table) -> + transform( + Table, + fun ({amqqueue, Name, Durable, AutoDelete, ExclusiveOwner, Arguments, + Pid, SlavePids, SyncSlavePids, DSN, Policy, GmPids, Decorators, + State}) -> + {amqqueue, Name, Durable, AutoDelete, ExclusiveOwner, Arguments, + Pid, SlavePids, SyncSlavePids, DSN, Policy, GmPids, Decorators, + State, 0} + end, + [name, durable, auto_delete, exclusive_owner, arguments, pid, slave_pids, + sync_slave_pids, recoverable_slaves, policy, gm_pids, decorators, state, + policy_version]). + %% Prior to 3.6.0, passwords were hashed using MD5, this populates %% existing records with said default. Users created with 3.6.0+ will %% have internal_user.hashing_algorithm populated by the internal diff --git a/src/truncate.erl b/src/truncate.erl index 1c9b08ed27..a1586b0cb0 100644 --- a/src/truncate.erl +++ b/src/truncate.erl @@ -21,8 +21,10 @@ -record(params, {content, struct, content_dec, struct_dec}). -export([log_event/2, term/2]). -%% exported for testing --export([test/0]). + +-ifdef(TEST). +-export([term_size/3]). +-endif. log_event({Type, GL, {Pid, Format, Args}}, Params) when Type =:= error orelse @@ -123,72 +125,3 @@ tuple_term_size(_T, M, I, S, _W) when I > S -> M; tuple_term_size(T, M, I, S, W) -> tuple_term_size(T, lim(term_size(element(I, T), M, W), 2 * W), I + 1, S, W). - -%%---------------------------------------------------------------------------- - -test() -> - test_short_examples_exactly(), - test_term_limit(), - test_large_examples_for_size(), - ok. - -test_short_examples_exactly() -> - F = fun (Term, Exp) -> - Exp = term(Term, {1, {10, 10, 5, 5}}), - Term = term(Term, {100000, {10, 10, 5, 5}}) - end, - FSmall = fun (Term, Exp) -> - Exp = term(Term, {1, {2, 2, 2, 2}}), - Term = term(Term, {100000, {2, 2, 2, 2}}) - end, - F([], []), - F("h", "h"), - F("hello world", "hello w..."), - F([[h,e,l,l,o,' ',w,o,r,l,d]], [[h,e,l,l,o,'...']]), - F([a|b], [a|b]), - F(<<"hello">>, <<"hello">>), - F([<<"hello world">>], [<<"he...">>]), - F(<<1:1>>, <<1:1>>), - F(<<1:81>>, <<0:56, "...">>), - F({{{{a}}},{b},c,d,e,f,g,h,i,j,k}, {{{'...'}},{b},c,d,e,f,g,h,i,j,'...'}), - FSmall({a,30,40,40,40,40}, {a,30,'...'}), - FSmall([a,30,40,40,40,40], [a,30,'...']), - P = spawn(fun() -> receive die -> ok end end), - F([0, 0.0, <<1:1>>, F, P], [0, 0.0, <<1:1>>, F, P]), - P ! die, - R = make_ref(), - F([R], [R]), - ok. - -test_term_limit() -> - W = erlang:system_info(wordsize), - S = <<"abc">>, - 1 = term_size(S, 4, W), - limit_exceeded = term_size(S, 3, W), - case 100 - term_size([S, S], 100, W) of - 22 -> ok; %% 32 bit - 38 -> ok %% 64 bit - end, - case 100 - term_size([S, [S]], 100, W) of - 30 -> ok; %% ditto - 54 -> ok - end, - limit_exceeded = term_size([S, S], 6, W), - ok. - -test_large_examples_for_size() -> - %% Real world values - Shrink = fun(Term) -> term(Term, {1, {1000, 100, 50, 5}}) end, - TestSize = fun(Term) -> - true = 5000000 < size(term_to_binary(Term)), - true = 500000 > size(term_to_binary(Shrink(Term))) - end, - TestSize(lists:seq(1, 5000000)), - TestSize(recursive_list(1000, 10)), - TestSize(recursive_list(5000, 20)), - TestSize(gb_sets:from_list([I || I <- lists:seq(1, 1000000)])), - TestSize(gb_trees:from_orddict([{I, I} || I <- lists:seq(1, 1000000)])), - ok. - -recursive_list(S, 0) -> lists:seq(1, S); -recursive_list(S, N) -> [recursive_list(S div N, N-1) || _ <- lists:seq(1, S)]. |
