diff options
| author | sylvainhubsch <sylvain.hubsch@gmail.com> | 2016-06-05 15:56:12 -0700 |
|---|---|---|
| committer | sylvainhubsch <sylvain.hubsch@gmail.com> | 2016-06-05 15:56:12 -0700 |
| commit | 60db6956798b8dbbb664ba4c61080faad9f12441 (patch) | |
| tree | ccb7705db4da7be7c4ca6ea2f10b9ea84dff6e34 /src | |
| parent | 8b9a54c5cae6541dadef021f35abc50b84b6cf96 (diff) | |
| parent | 2b4df8a8382b9cd62d23165409754b54c5fab2ca (diff) | |
| download | rabbitmq-server-git-60db6956798b8dbbb664ba4c61080faad9f12441.tar.gz | |
Merge remote-tracking branch 'upstream/master' into rabbitmq-server-xh-notypeandshortcuts
Diffstat (limited to 'src')
35 files changed, 577 insertions, 321 deletions
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl index d5f0cbee6f..e9754821e8 100644 --- a/src/file_handle_cache.erl +++ b/src/file_handle_cache.erl @@ -537,12 +537,12 @@ clear(Ref) -> end). set_maximum_since_use(MaximumAge) -> - Now = time_compat:monotonic_time(), + Now = erlang:monotonic_time(), case lists:foldl( fun ({{Ref, fhc_handle}, Handle = #handle { hdl = Hdl, last_used_at = Then }}, Rep) -> case Hdl =/= closed andalso - time_compat:convert_time_unit(Now - Then, + erlang:convert_time_unit(Now - Then, native, micro_seconds) >= MaximumAge of @@ -715,7 +715,7 @@ get_or_reopen(RefNewOrReopens) -> {ok, [Handle || {_Ref, Handle} <- OpenHdls]}; {OpenHdls, ClosedHdls} -> Oldest = oldest(get_age_tree(), - fun () -> time_compat:monotonic_time() end), + fun () -> erlang:monotonic_time() end), case gen_server2:call(?SERVER, {open, self(), length(ClosedHdls), Oldest}, infinity) of ok -> @@ -751,7 +751,7 @@ reopen([{Ref, NewOrReopen, Handle = #handle { hdl = closed, end, case prim_file:open(Path, Mode) of {ok, Hdl} -> - Now = time_compat:monotonic_time(), + Now = erlang:monotonic_time(), {{ok, _Offset}, Handle1} = maybe_seek(Offset, reset_read_buffer( Handle#handle{hdl = Hdl, @@ -787,7 +787,7 @@ sort_handles([{Ref, _} | RefHdls], RefHdlsA, [{Ref, Handle} | RefHdlsB], Acc) -> sort_handles(RefHdls, RefHdlsA, RefHdlsB, [Handle | Acc]). put_handle(Ref, Handle = #handle { last_used_at = Then }) -> - Now = time_compat:monotonic_time(), + Now = erlang:monotonic_time(), age_tree_update(Then, Now, Ref), put({Ref, fhc_handle}, Handle #handle { last_used_at = Now }). @@ -1429,14 +1429,14 @@ reduce(State = #fhc_state { open_pending = OpenPending, elders = Elders, clients = Clients, timer_ref = TRef }) -> - Now = time_compat:monotonic_time(), + Now = erlang:monotonic_time(), {CStates, Sum, ClientCount} = ets:foldl(fun ({Pid, Eldest}, {CStatesAcc, SumAcc, CountAcc} = Accs) -> [#cstate { pending_closes = PendingCloses, opened = Opened, blocked = Blocked } = CState] = ets:lookup(Clients, Pid), - TimeDiff = time_compat:convert_time_unit( + TimeDiff = erlang:convert_time_unit( Now - Eldest, native, micro_seconds), case Blocked orelse PendingCloses =:= Opened of true -> Accs; diff --git a/src/file_handle_cache_stats.erl b/src/file_handle_cache_stats.erl index ccf1e49662..8f368a8405 100644 --- a/src/file_handle_cache_stats.erl +++ b/src/file_handle_cache_stats.erl @@ -59,8 +59,8 @@ get() -> lists:sort(ets:tab2list(?TABLE)). timer_tc(Thunk) -> - T1 = time_compat:monotonic_time(), + T1 = erlang:monotonic_time(), Res = Thunk(), - T2 = time_compat:monotonic_time(), - Diff = time_compat:convert_time_unit(T2 - T1, native, micro_seconds), + T2 = erlang:monotonic_time(), + Diff = erlang:convert_time_unit(T2 - T1, native, micro_seconds), {Diff, Res}. diff --git a/src/gm.erl b/src/gm.erl index 199cf7c4de..a83d8d1932 100644 --- a/src/gm.erl +++ b/src/gm.erl @@ -552,8 +552,8 @@ forget_group(GroupName) -> init([GroupName, Module, Args, TxnFun]) -> put(process_name, {?MODULE, GroupName}), _ = random:seed(erlang:phash2([node()]), - time_compat:monotonic_time(), - time_compat:unique_integer()), + erlang:monotonic_time(), + erlang:unique_integer()), Self = make_member(GroupName), gen_server2:cast(self(), join), {ok, #state { self = Self, @@ -1338,7 +1338,11 @@ find_common(A, B, Common) -> {{{value, Val}, A1}, {{value, Val}, B1}} -> find_common(A1, B1, queue:in(Val, Common)); {{empty, _A}, _} -> - {Common, B} + {Common, B}; + {_, {_, B1}} -> + find_common(A, B1, Common); + {{_, A1}, _} -> + find_common(A1, B, Common) end. diff --git a/src/pg2_fixed.erl b/src/pg2_fixed.erl index 222a0bc849..73c05819d4 100644 --- a/src/pg2_fixed.erl +++ b/src/pg2_fixed.erl @@ -149,11 +149,11 @@ get_closest_pid(Name) -> case get_members(Name) of [] -> {error, {no_process, Name}}; Members -> - X = time_compat:erlang_system_time(micro_seconds), + X = erlang:system_time(micro_seconds), lists:nth((X rem length(Members))+1, Members) end; Members when is_list(Members) -> - X = time_compat:erlang_system_time(micro_seconds), + X = erlang:system_time(micro_seconds), lists:nth((X rem length(Members))+1, Members); Else -> Else diff --git a/src/rabbit.erl b/src/rabbit.erl index 7816cd53aa..d62a4b6935 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -507,6 +507,7 @@ await_startup(HaveSeenRabbitBoot) -> status() -> S1 = [{pid, list_to_integer(os:getpid())}, + %% The timeout value used is twice that of gen_server:call/2. {running_applications, rabbit_misc:which_applications()}, {os, os:type()}, {erlang_version, erlang:system_info(system_version)}, @@ -563,8 +564,9 @@ is_running() -> is_running(node()). is_running(Node) -> rabbit_nodes:is_process_running(Node, rabbit). environment() -> + %% The timeout value is twice that of gen_server:call/2. [{A, environment(A)} || - {A, _, _} <- lists:keysort(1, application:which_applications())]. + {A, _, _} <- lists:keysort(1, application:which_applications(10000))]. environment(App) -> Ignore = [default_pass, included_applications], @@ -731,7 +733,7 @@ log_broker_started(Plugins) -> erts_version_check() -> ERTSVer = erlang:system_info(version), - OTPRel = erlang:system_info(otp_release), + OTPRel = rabbit_misc:otp_release(), case rabbit_misc:version_compare(?ERTS_MINIMUM, ERTSVer, lte) of true when ?ERTS_MINIMUM =/= ERTSVer -> ok; diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 6d3bf892b0..d9724e0012 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -33,27 +33,64 @@ prioritise_cast/3, prioritise_info/3, format_message_queue/2]). %% Queue's state --record(q, {q, +-record(q, { + %% an #amqqueue record + q, + %% none | {exclusive consumer channel PID, consumer tag} exclusive_consumer, + %% Set to true if a queue has ever had a consumer. + %% This is used to determine when to delete auto-delete queues. has_had_consumers, + %% backing queue module. + %% for mirrored queues, this will be rabbit_mirror_queue_master. + %% for non-priority and non-mirrored queues, rabbit_variable_queue. + %% see rabbit_backing_queue. backing_queue, + %% backing queue state. + %% see rabbit_backing_queue, rabbit_variable_queue. backing_queue_state, + %% consumers state, see rabbit_queue_consumers consumers, + %% queue expiration value expires, + %% timer used to periodically sync (flush) queue index sync_timer_ref, + %% timer used to update ingress/egress rates and queue RAM duration target rate_timer_ref, + %% timer used to clean up this queue due to TTL (on when unused) expiry_timer_ref, + %% stats emission timer stats_timer, + %% maps message IDs to {channel pid, MsgSeqNo} + %% pairs msg_id_to_channel, + %% message TTL value ttl, + %% timer used to delete expired messages ttl_timer_ref, ttl_timer_expiry, + %% Keeps track of channels that publish to this queue. + %% When channel process goes down, queues have to perform + %% certain cleanup. senders, + %% dead letter exchange as a #resource record, if any dlx, dlx_routing_key, + %% max length in messages, if configured max_length, + %% max length in bytes, if configured max_bytes, + %% when policies change, this version helps queue + %% determine what previously scheduled/set up state to ignore, + %% e.g. message expiration messages from previously set up timers + %% that may or may not be still valid args_policy_version, + %% used to discard outdated/superseded policy updates, + %% e.g. when policies are applied concurrently. See + %% https://github.com/rabbitmq/rabbitmq-server/issues/803 for one + %% example. + mirroring_policy_version = 0, + %% running | flow | idle status }). @@ -431,7 +468,7 @@ ensure_ttl_timer(undefined, State) -> State; ensure_ttl_timer(Expiry, State = #q{ttl_timer_ref = undefined, args_policy_version = Version}) -> - After = (case Expiry - time_compat:os_system_time(micro_seconds) of + After = (case Expiry - os:system_time(micro_seconds) of V when V > 0 -> V + 999; %% always fire later _ -> 0 end) div 1000, @@ -702,7 +739,7 @@ handle_ch_down(DownPid, State = #q{consumers = Consumers, exclusive_consumer = Holder1}, notify_decorators(State2), case should_auto_delete(State2) of - true -> + true -> log_auto_delete( io_lib:format( "because all of its consumers (~p) were on a channel that was closed", @@ -757,7 +794,7 @@ calculate_msg_expiry(#basic_message{content = Content}, TTL) -> {ok, MsgTTL} = rabbit_basic:parse_expiration(Props), case lists:min([TTL, MsgTTL]) of undefined -> undefined; - T -> time_compat:os_system_time(micro_seconds) + T * 1000 + T -> os:system_time(micro_seconds) + T * 1000 end. %% Logically this function should invoke maybe_send_drained/2. @@ -768,7 +805,7 @@ calculate_msg_expiry(#basic_message{content = Content}, TTL) -> drop_expired_msgs(State) -> case is_empty(State) of true -> State; - false -> drop_expired_msgs(time_compat:os_system_time(micro_seconds), + false -> drop_expired_msgs(os:system_time(micro_seconds), State) end. @@ -1071,11 +1108,11 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From, notify_decorators(State1), case should_auto_delete(State1) of false -> reply(ok, ensure_expiry_timer(State1)); - true -> + true -> log_auto_delete( io_lib:format( "because its last consumer with tag '~s' was cancelled", - [ConsumerTag]), + [ConsumerTag]), State), stop(ok, State1) end @@ -1207,22 +1244,15 @@ handle_cast({set_maximum_since_use, Age}, State) -> ok = file_handle_cache:set_maximum_since_use(Age), noreply(State); -handle_cast(start_mirroring, State = #q{backing_queue = BQ, - backing_queue_state = BQS}) -> - %% lookup again to get policy for init_with_existing_bq - {ok, Q} = rabbit_amqqueue:lookup(qname(State)), - true = BQ =/= rabbit_mirror_queue_master, %% assertion - BQ1 = rabbit_mirror_queue_master, - BQS1 = BQ1:init_with_existing_bq(Q, BQ, BQS), - noreply(State#q{backing_queue = BQ1, - backing_queue_state = BQS1}); - -handle_cast(stop_mirroring, State = #q{backing_queue = BQ, - backing_queue_state = BQS}) -> - BQ = rabbit_mirror_queue_master, %% assertion - {BQ1, BQS1} = BQ:stop_mirroring(BQS), - noreply(State#q{backing_queue = BQ1, - backing_queue_state = BQS1}); +handle_cast(update_mirroring, State = #q{q = Q, + mirroring_policy_version = Version}) -> + case needs_update_mirroring(Q, Version) of + false -> + noreply(State); + {Policy, NewVersion} -> + State1 = State#q{mirroring_policy_version = NewVersion}, + noreply(update_mirroring(Policy, State1)) + end; handle_cast({credit, ChPid, CTag, Credit, Drain}, State = #q{consumers = Consumers, @@ -1358,7 +1388,7 @@ handle_pre_hibernate(State = #q{backing_queue = BQ, State, #q.stats_timer, fun () -> emit_stats(State, [{idle_since, - time_compat:os_system_time(milli_seconds)}, + os:system_time(milli_seconds)}, {consumer_utilisation, ''}]) end), State1 = rabbit_event:stop_stats_timer(State#q{backing_queue_state = BQS3}, @@ -1371,19 +1401,67 @@ log_delete_exclusive({ConPid, _ConRef}, State) -> log_delete_exclusive(ConPid, State); log_delete_exclusive(ConPid, #q{ q = #amqqueue{ name = Resource } }) -> #resource{ name = QName, virtual_host = VHost } = Resource, - rabbit_queue:debug("Deleting exclusive queue '~s' in vhost '~s' " ++ - " because its declaring connection ~p was closed", - [QName, VHost, ConPid]). + rabbit_log_queue:debug("Deleting exclusive queue '~s' in vhost '~s' " ++ + "because its declaring connection ~p was closed", + [QName, VHost, ConPid]). log_auto_delete(Reason, #q{ q = #amqqueue{ name = Resource } }) -> #resource{ name = QName, virtual_host = VHost } = Resource, - rabbit_queue:debug("Deleting auto-delete queue '~s' in vhost '~s' " ++ - Reason, - [QName, VHost]). + rabbit_log_queue:debug("Deleting auto-delete queue '~s' in vhost '~s' " ++ + Reason, + [QName, VHost]). + +needs_update_mirroring(Q, Version) -> + {ok, UpQ} = rabbit_amqqueue:lookup(Q#amqqueue.name), + DBVersion = UpQ#amqqueue.policy_version, + case DBVersion > Version of + true -> {rabbit_policy:get(<<"ha-mode">>, UpQ), DBVersion}; + false -> false + end. +update_mirroring(Policy, State = #q{backing_queue = BQ}) -> + case update_to(Policy, BQ) of + start_mirroring -> + start_mirroring(State); + stop_mirroring -> + stop_mirroring(State); + ignore -> + State; + update_ha_mode -> + update_ha_mode(State) + end. +update_to(undefined, rabbit_mirror_queue_master) -> + stop_mirroring; +update_to(_, rabbit_mirror_queue_master) -> + update_ha_mode; +update_to(undefined, BQ) when BQ =/= rabbit_mirror_queue_master -> + ignore; +update_to(_, BQ) when BQ =/= rabbit_mirror_queue_master -> + start_mirroring. + +start_mirroring(State = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> + %% lookup again to get policy for init_with_existing_bq + {ok, Q} = rabbit_amqqueue:lookup(qname(State)), + true = BQ =/= rabbit_mirror_queue_master, %% assertion + BQ1 = rabbit_mirror_queue_master, + BQS1 = BQ1:init_with_existing_bq(Q, BQ, BQS), + State#q{backing_queue = BQ1, + backing_queue_state = BQS1}. +stop_mirroring(State = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> + BQ = rabbit_mirror_queue_master, %% assertion + {BQ1, BQS1} = BQ:stop_mirroring(BQS), + State#q{backing_queue = BQ1, + backing_queue_state = BQS1}. + +update_ha_mode(State) -> + {ok, Q} = rabbit_amqqueue:lookup(qname(State)), + ok = rabbit_mirror_queue_misc:update_mirrors(Q), + State. diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl index 299e254c50..8904c1dd74 100644 --- a/src/rabbit_binding.erl +++ b/src/rabbit_binding.erl @@ -100,7 +100,8 @@ -define(INFO_KEYS, [source_name, source_kind, destination_name, destination_kind, - routing_key, arguments]). + routing_key, arguments, + vhost]). recover(XNames, QNames) -> rabbit_misc:table_filter( @@ -272,6 +273,7 @@ infos(Items, B) -> [{Item, i(Item, B)} || Item <- Items]. i(source_name, #binding{source = SrcName}) -> SrcName#resource.name; i(source_kind, #binding{source = SrcName}) -> SrcName#resource.kind; +i(vhost, #binding{source = SrcName}) -> SrcName#resource.virtual_host; i(destination_name, #binding{destination = DstName}) -> DstName#resource.name; i(destination_kind, #binding{destination = DstName}) -> DstName#resource.kind; i(routing_key, #binding{key = RoutingKey}) -> RoutingKey; diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl index 7f653c3780..bf50fd6c49 100644 --- a/src/rabbit_control_main.erl +++ b/src/rabbit_control_main.erl @@ -23,7 +23,7 @@ sync_queue/1, cancel_sync_queue/1, become/1, purge_queue/1]). --import(rabbit_misc, [rpc_call/4, rpc_call/5, rpc_call/7]). +-import(rabbit_misc, [rpc_call/4, rpc_call/5]). -define(EXTERNAL_CHECK_INTERVAL, 1000). @@ -37,6 +37,7 @@ reset, force_reset, rotate_logs, + hipe_compile, {join_cluster, [?RAM_DEF]}, change_cluster_node_type, @@ -113,7 +114,7 @@ [stop, stop_app, start_app, wait, reset, force_reset, rotate_logs, join_cluster, change_cluster_node_type, update_cluster_nodes, forget_cluster_node, rename_cluster_node, cluster_status, status, - environment, eval, force_boot, help, node_health_check]). + environment, eval, force_boot, help, node_health_check, hipe_compile]). -define(COMMANDS_WITH_TIMEOUT, [list_user_permissions, list_policies, list_queues, list_exchanges, @@ -380,6 +381,16 @@ action(rotate_logs, Node, [], _Opts, Inform) -> Inform("Rotating logs for node ~p", [Node]), call(Node, {rabbit, rotate_logs, []}); +action(hipe_compile, _Node, [TargetDir], _Opts, _Inform) -> + ok = application:load(rabbit), + case rabbit_hipe:can_hipe_compile() of + true -> + {ok, _, _} = rabbit_hipe:compile_to_directory(TargetDir), + ok; + false -> + {error, "HiPE compilation is not supported"} + end; + action(close_connection, Node, [PidStr, Explanation], _Opts, Inform) -> Inform("Closing connection \"~s\"", [PidStr]), rpc_call(Node, rabbit_networking, close_connection, @@ -579,56 +590,74 @@ action(purge_queue, Node, [Q], Opts, Inform, Timeout) -> action(list_users, Node, [], _Opts, Inform, Timeout) -> Inform("Listing users", []), - call(Node, {rabbit_auth_backend_internal, list_users, []}, - rabbit_auth_backend_internal:user_info_keys(), true, Timeout); + call_emitter(Node, {rabbit_auth_backend_internal, list_users, []}, + rabbit_auth_backend_internal:user_info_keys(), + [{timeout, Timeout}, to_bin_utf8]); action(list_permissions, Node, [], Opts, Inform, Timeout) -> VHost = proplists:get_value(?VHOST_OPT, Opts), Inform("Listing permissions in vhost \"~s\"", [VHost]), - call(Node, {rabbit_auth_backend_internal, list_vhost_permissions, [VHost]}, - rabbit_auth_backend_internal:vhost_perms_info_keys(), true, Timeout, - true); + call_emitter(Node, {rabbit_auth_backend_internal, list_vhost_permissions, [VHost]}, + rabbit_auth_backend_internal:vhost_perms_info_keys(), + [{timeout, Timeout}, to_bin_utf8, is_escaped]); action(list_parameters, Node, [], Opts, Inform, Timeout) -> VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), Inform("Listing runtime parameters", []), - call(Node, {rabbit_runtime_parameters, list_formatted, [VHostArg]}, - rabbit_runtime_parameters:info_keys(), Timeout); + call_emitter(Node, {rabbit_runtime_parameters, list_formatted, [VHostArg]}, + rabbit_runtime_parameters:info_keys(), + [{timeout, Timeout}]); action(list_policies, Node, [], Opts, Inform, Timeout) -> VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), Inform("Listing policies", []), - call(Node, {rabbit_policy, list_formatted, [VHostArg]}, - rabbit_policy:info_keys(), Timeout); + call_emitter(Node, {rabbit_policy, list_formatted, [VHostArg]}, + rabbit_policy:info_keys(), + [{timeout, Timeout}]); action(list_vhosts, Node, Args, _Opts, Inform, Timeout) -> Inform("Listing vhosts", []), ArgAtoms = default_if_empty(Args, [name]), - call(Node, {rabbit_vhost, info_all, []}, ArgAtoms, true, Timeout); + call_emitter(Node, {rabbit_vhost, info_all, []}, ArgAtoms, + [{timeout, Timeout}, to_bin_utf8]); action(list_user_permissions, _Node, _Args = [], _Opts, _Inform, _Timeout) -> {error_string, "list_user_permissions expects a username argument, but none provided."}; action(list_user_permissions, Node, Args = [_Username], _Opts, Inform, Timeout) -> Inform("Listing permissions for user ~p", Args), - call(Node, {rabbit_auth_backend_internal, list_user_permissions, Args}, - rabbit_auth_backend_internal:user_perms_info_keys(), true, Timeout, - true); + call_emitter(Node, {rabbit_auth_backend_internal, list_user_permissions, Args}, + rabbit_auth_backend_internal:user_perms_info_keys(), + [{timeout, Timeout}, to_bin_utf8, is_escaped]); action(list_queues, Node, Args, Opts, Inform, Timeout) -> - [Online, Offline] = rabbit_cli:filter_opts(Opts, [?ONLINE_OPT, ?OFFLINE_OPT]), Inform("Listing queues", []), + %% User options + [Online, Offline] = rabbit_cli:filter_opts(Opts, [?ONLINE_OPT, ?OFFLINE_OPT]), VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), ArgAtoms = default_if_empty(Args, [name, messages]), - call(Node, {rabbit_amqqueue, info_all, [VHostArg, ArgAtoms, Online, Offline]}, - ArgAtoms, Timeout); + + %% Data for emission + Nodes = nodes_in_cluster(Node, Timeout), + OnlineChunks = if Online -> length(Nodes); true -> 0 end, + OfflineChunks = if Offline -> 1; true -> 0 end, + ChunksOpt = {chunks, OnlineChunks + OfflineChunks}, + TimeoutOpt = {timeout, Timeout}, + EmissionRef = make_ref(), + EmissionRefOpt = {ref, EmissionRef}, + + _ = Online andalso start_emission(Node, {rabbit_amqqueue, emit_info_all, [Nodes, VHostArg, ArgAtoms]}, + [TimeoutOpt, EmissionRefOpt]), + _ = Offline andalso start_emission(Node, {rabbit_amqqueue, emit_info_down, [VHostArg, ArgAtoms]}, + [TimeoutOpt, EmissionRefOpt]), + display_emission_result(EmissionRef, ArgAtoms, [ChunksOpt, TimeoutOpt]); action(list_exchanges, Node, Args, Opts, Inform, Timeout) -> Inform("Listing exchanges", []), VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), ArgAtoms = default_if_empty(Args, [name, type]), - call(Node, {rabbit_exchange, info_all, [VHostArg, ArgAtoms]}, - ArgAtoms, Timeout); + call_emitter(Node, {rabbit_exchange, info_all, [VHostArg, ArgAtoms]}, + ArgAtoms, [{timeout, Timeout}]); action(list_bindings, Node, Args, Opts, Inform, Timeout) -> Inform("Listing bindings", []), @@ -636,27 +665,31 @@ action(list_bindings, Node, Args, Opts, Inform, Timeout) -> ArgAtoms = default_if_empty(Args, [source_name, source_kind, destination_name, destination_kind, routing_key, arguments]), - call(Node, {rabbit_binding, info_all, [VHostArg, ArgAtoms]}, - ArgAtoms, Timeout); + call_emitter(Node, {rabbit_binding, info_all, [VHostArg, ArgAtoms]}, + ArgAtoms, [{timeout, Timeout}]); action(list_connections, Node, Args, _Opts, Inform, Timeout) -> Inform("Listing connections", []), ArgAtoms = default_if_empty(Args, [user, peer_host, peer_port, state]), - call(Node, {rabbit_networking, connection_info_all, [ArgAtoms]}, - ArgAtoms, Timeout); + Nodes = nodes_in_cluster(Node, Timeout), + call_emitter(Node, {rabbit_networking, emit_connection_info_all, [Nodes, ArgAtoms]}, + ArgAtoms, [{timeout, Timeout}, {chunks, length(Nodes)}]); action(list_channels, Node, Args, _Opts, Inform, Timeout) -> Inform("Listing channels", []), ArgAtoms = default_if_empty(Args, [pid, user, consumer_count, messages_unacknowledged]), - call(Node, {rabbit_channel, info_all, [ArgAtoms]}, - ArgAtoms, Timeout); + Nodes = nodes_in_cluster(Node, Timeout), + call_emitter(Node, {rabbit_channel, emit_info_all, [Nodes, ArgAtoms]}, ArgAtoms, + [{timeout, Timeout}, {chunks, length(Nodes)}]); action(list_consumers, Node, _Args, Opts, Inform, Timeout) -> Inform("Listing consumers", []), VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), - call(Node, {rabbit_amqqueue, consumers_all, [VHostArg]}, - rabbit_amqqueue:consumer_info_keys(), Timeout). + Nodes = nodes_in_cluster(Node, Timeout), + call_emitter(Node, {rabbit_amqqueue, emit_consumers_all, [Nodes, VHostArg]}, + rabbit_amqqueue:consumer_info_keys(), + [{timeout, Timeout}, {chunks, length(Nodes)}]). format_parse_error({_Line, Mod, Err}) -> lists:flatten(Mod:format_error(Err)). @@ -766,17 +799,18 @@ display_info_message_row(IsEscaped, Result, InfoItemKeys) -> {X, Value} -> Value end, IsEscaped) || X <- InfoItemKeys]). -display_info_message(IsEscaped) -> +display_info_message(IsEscaped, InfoItemKeys) -> fun ([], _) -> ok; - ([FirstResult|_] = List, InfoItemKeys) when is_list(FirstResult) -> + ([FirstResult|_] = List, _) when is_list(FirstResult) -> lists:foreach(fun(Result) -> display_info_message_row(IsEscaped, Result, InfoItemKeys) end, List), ok; - (Result, InfoItemKeys) -> - display_info_message_row(IsEscaped, Result, InfoItemKeys) + (Result, _) -> + display_info_message_row(IsEscaped, Result, InfoItemKeys), + ok end. display_info_list(Results, InfoItemKeys) when is_list(Results) -> @@ -833,7 +867,10 @@ display_call_result(Node, MFA) -> end. unsafe_rpc(Node, Mod, Fun, Args) -> - case rpc_call(Node, Mod, Fun, Args) of + unsafe_rpc(Node, Mod, Fun, Args, ?RPC_TIMEOUT). + +unsafe_rpc(Node, Mod, Fun, Args, Timeout) -> + case rpc_call(Node, Mod, Fun, Args, Timeout) of {badrpc, _} = Res -> throw(Res); Normal -> Normal end. @@ -852,33 +889,42 @@ ensure_app_running(Node) -> call(Node, {Mod, Fun, Args}) -> rpc_call(Node, Mod, Fun, lists:map(fun list_to_binary_utf8/1, Args)). -call(Node, {Mod, Fun, Args}, InfoKeys, Timeout) -> - call(Node, {Mod, Fun, Args}, InfoKeys, false, Timeout, false). +call_emitter(Node, {Mod, Fun, Args}, InfoKeys, Opts) -> + Ref = start_emission(Node, {Mod, Fun, Args}, Opts), + display_emission_result(Ref, InfoKeys, Opts). + +start_emission(Node, {Mod, Fun, Args}, Opts) -> + ToBinUtf8 = proplists:get_value(to_bin_utf8, Opts, false), + Timeout = proplists:get_value(timeout, Opts, infinity), + Ref = proplists:get_value(ref, Opts, make_ref()), + rabbit_control_misc:spawn_emitter_caller( + Node, Mod, Fun, prepare_call_args(Args, ToBinUtf8), + Ref, self(), Timeout), + Ref. + +display_emission_result(Ref, InfoKeys, Opts) -> + IsEscaped = proplists:get_value(is_escaped, Opts, false), + Chunks = proplists:get_value(chunks, Opts, 1), + Timeout = proplists:get_value(timeout, Opts, infinity), + EmissionStatus = rabbit_control_misc:wait_for_info_messages( + self(), Ref, display_info_message(IsEscaped, InfoKeys), ok, Timeout, Chunks), + emission_to_action_result(EmissionStatus). + +%% Convert rabbit_control_misc:wait_for_info_messages/6 return value +%% into form expected by rabbit_cli:main/3. +emission_to_action_result({ok, ok}) -> + ok; +emission_to_action_result({error, Error}) -> + Error. -call(Node, {Mod, Fun, Args}, InfoKeys, ToBinUtf8, Timeout) -> - call(Node, {Mod, Fun, Args}, InfoKeys, ToBinUtf8, Timeout, false). +prepare_call_args(Args, ToBinUtf8) -> + case ToBinUtf8 of + true -> valid_utf8_args(Args); + false -> Args + end. -call(Node, {Mod, Fun, Args}, InfoKeys, ToBinUtf8, Timeout, IsEscaped) -> - Args0 = case ToBinUtf8 of - true -> lists:map(fun list_to_binary_utf8/1, Args); - false -> Args - end, - Ref = make_ref(), - Pid = self(), - spawn_link( - fun () -> - case rabbit_cli:rpc_call(Node, Mod, Fun, Args0, - Ref, Pid, Timeout) of - {error, _} = Error -> - Pid ! {error, Error}; - {bad_argument, _} = Error -> - Pid ! {error, Error}; - _ -> - ok - end - end), - rabbit_control_misc:wait_for_info_messages( - Pid, Ref, InfoKeys, display_info_message(IsEscaped), Timeout). +valid_utf8_args(Args) -> + lists:map(fun list_to_binary_utf8/1, Args). list_to_binary_utf8(L) -> B = list_to_binary(L), @@ -928,7 +974,10 @@ split_list([_]) -> exit(even_list_needed); split_list([A, B | T]) -> [{A, B} | split_list(T)]. nodes_in_cluster(Node) -> - unsafe_rpc(Node, rabbit_mnesia, cluster_nodes, [running]). + unsafe_rpc(Node, rabbit_mnesia, cluster_nodes, [running], ?RPC_TIMEOUT). + +nodes_in_cluster(Node, Timeout) -> + unsafe_rpc(Node, rabbit_mnesia, cluster_nodes, [running], Timeout). alarms_by_node(Name) -> Status = unsafe_rpc(Name, rabbit, status, []), diff --git a/src/rabbit_dead_letter.erl b/src/rabbit_dead_letter.erl index 252405d62b..b5182ee2e0 100644 --- a/src/rabbit_dead_letter.erl +++ b/src/rabbit_dead_letter.erl @@ -53,7 +53,7 @@ make_msg(Msg = #basic_message{content = Content, _ -> {[RK], fun (H) -> lists:keydelete(<<"CC">>, 1, H) end} end, ReasonBin = list_to_binary(atom_to_list(Reason)), - TimeSec = time_compat:os_system_time(seconds), + TimeSec = os:system_time(seconds), PerMsgTTL = per_msg_ttl_header(Content#content.properties), HeadersFun2 = fun (Headers) -> @@ -139,7 +139,19 @@ update_x_death_header(Info, Headers) -> end, rabbit_misc:set_table_value( Headers, <<"x-death">>, array, - [{table, rabbit_misc:sort_field_table(Info1)} | Others]) + [{table, rabbit_misc:sort_field_table(Info1)} | Others]); + {<<"x-death">>, InvalidType, Header} -> + rabbit_log:warning("Message has invalid x-death header (type: ~p)." + " Resetting header ~p~n", + [InvalidType, Header]), + %% if x-death is something other than an array (list) + %% then we reset it: this happens when some clients consume + %% a message and re-publish is, converting header values + %% to strings, intentionally or not. + %% See rabbitmq/rabbitmq-server#767 for details. + rabbit_misc:set_table_value( + Headers, <<"x-death">>, array, + [{table, [{<<"count">>, long, 1} | Info]}]) end. ensure_xdeath_event_count({table, Info}, InitialVal) when InitialVal >= 1 -> diff --git a/src/rabbit_direct.erl b/src/rabbit_direct.erl index 35d7eb7940..b5970274d4 100644 --- a/src/rabbit_direct.erl +++ b/src/rabbit_direct.erl @@ -76,8 +76,8 @@ connect({Username, none}, VHost, Protocol, Pid, Infos) -> VHost, Protocol, Pid, Infos); connect({Username, Password}, VHost, Protocol, Pid, Infos) -> - connect0(fun () -> rabbit_access_control:check_user_pass_login( - Username, Password) end, + connect0(fun () -> rabbit_access_control:check_user_login( + Username, [{password, Password}, {vhost, VHost}]) end, VHost, Protocol, Pid, Infos). connect0(AuthFun, VHost, Protocol, Pid, Infos) -> diff --git a/src/rabbit_disk_monitor.erl b/src/rabbit_disk_monitor.erl index 88a8096fd4..a56b92b501 100644 --- a/src/rabbit_disk_monitor.erl +++ b/src/rabbit_disk_monitor.erl @@ -213,9 +213,11 @@ get_disk_free(Dir) -> get_disk_free(Dir, {unix, Sun}) when Sun =:= sunos; Sun =:= sunos4; Sun =:= solaris -> - parse_free_unix(rabbit_misc:os_cmd("/usr/bin/df -k " ++ Dir)); + Df = os:find_executable("df"), + parse_free_unix(rabbit_misc:os_cmd(Df ++ " -k " ++ Dir)); get_disk_free(Dir, {unix, _}) -> - parse_free_unix(rabbit_misc:os_cmd("/bin/df -kP " ++ Dir)); + Df = os:find_executable("df"), + parse_free_unix(rabbit_misc:os_cmd(Df ++ " -kP " ++ Dir)); get_disk_free(Dir, {win32, _}) -> parse_free_win32(rabbit_misc:os_cmd("dir /-C /W \"" ++ Dir ++ "\"")). diff --git a/src/rabbit_error_logger.erl b/src/rabbit_error_logger.erl index efe8495299..0103d4e503 100644 --- a/src/rabbit_error_logger.erl +++ b/src/rabbit_error_logger.erl @@ -100,7 +100,7 @@ publish(_Other, _Format, _Data, _State) -> publish1(RoutingKey, Format, Data, LogExch) -> %% 0-9-1 says the timestamp is a "64 bit POSIX timestamp". That's %% second resolution, not millisecond. - Timestamp = time_compat:os_system_time(seconds), + Timestamp = os:system_time(seconds), Args = [truncate:term(A, ?LOG_TRUNC) || A <- Data], Headers = [{<<"node">>, longstr, list_to_binary(atom_to_list(node()))}], diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index 2e9afbfd2e..5d646e0c90 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -346,11 +346,17 @@ i(policy, X) -> case rabbit_policy:name(X) of none -> ''; Policy -> Policy end; -i(Item, _) -> throw({bad_argument, Item}). +i(Item, #exchange{type = Type} = X) -> + case (type_to_module(Type)):info(X, [Item]) of + [{Item, I}] -> I; + [] -> throw({bad_argument, Item}) + end. -info(X = #exchange{}) -> infos(?INFO_KEYS, X). +info(X = #exchange{type = Type}) -> + infos(?INFO_KEYS, X) ++ (type_to_module(Type)):info(X). -info(X = #exchange{}, Items) -> infos(Items, X). +info(X = #exchange{type = _Type}, Items) -> + infos(Items, X). info_all(VHostPath) -> map(VHostPath, fun (X) -> info(X) end). diff --git a/src/rabbit_exchange_type_direct.erl b/src/rabbit_exchange_type_direct.erl index 8a6886e376..ed675b572a 100644 --- a/src/rabbit_exchange_type_direct.erl +++ b/src/rabbit_exchange_type_direct.erl @@ -23,6 +23,7 @@ -export([validate/1, validate_binding/2, create/2, delete/3, policy_changed/2, add_binding/3, remove_bindings/3, assert_args_equivalence/2]). +-export([info/1, info/2]). -rabbit_boot_step({?MODULE, [{description, "exchange type direct"}, @@ -31,6 +32,9 @@ {requires, rabbit_registry}, {enables, kernel_ready}]}). +info(_X) -> []. +info(_X, _) -> []. + description() -> [{description, <<"AMQP direct exchange, as per the AMQP specification">>}]. diff --git a/src/rabbit_exchange_type_fanout.erl b/src/rabbit_exchange_type_fanout.erl index d81e407f8f..3aebc07b41 100644 --- a/src/rabbit_exchange_type_fanout.erl +++ b/src/rabbit_exchange_type_fanout.erl @@ -23,6 +23,7 @@ -export([validate/1, validate_binding/2, create/2, delete/3, policy_changed/2, add_binding/3, remove_bindings/3, assert_args_equivalence/2]). +-export([info/1, info/2]). -rabbit_boot_step({?MODULE, [{description, "exchange type fanout"}, @@ -31,6 +32,9 @@ {requires, rabbit_registry}, {enables, kernel_ready}]}). +info(_X) -> []. +info(_X, _) -> []. + description() -> [{description, <<"AMQP fanout exchange, as per the AMQP specification">>}]. diff --git a/src/rabbit_exchange_type_headers.erl b/src/rabbit_exchange_type_headers.erl index 444d507c7e..cd41b903c2 100644 --- a/src/rabbit_exchange_type_headers.erl +++ b/src/rabbit_exchange_type_headers.erl @@ -24,6 +24,7 @@ -export([validate/1, validate_binding/2, create/2, delete/3, policy_changed/2, add_binding/3, remove_bindings/3, assert_args_equivalence/2]). +-export([info/1, info/2]). -rabbit_boot_step({?MODULE, [{description, "exchange type headers"}, @@ -37,6 +38,9 @@ rabbit_framing:amqp_table()) -> boolean()). -endif. +info(_X) -> []. +info(_X, _) -> []. + description() -> [{description, <<"AMQP headers exchange, as per the AMQP specification">>}]. diff --git a/src/rabbit_exchange_type_invalid.erl b/src/rabbit_exchange_type_invalid.erl index c8ca7ecae4..b2e2798e3a 100644 --- a/src/rabbit_exchange_type_invalid.erl +++ b/src/rabbit_exchange_type_invalid.erl @@ -23,6 +23,10 @@ -export([validate/1, validate_binding/2, create/2, delete/3, policy_changed/2, add_binding/3, remove_bindings/3, assert_args_equivalence/2]). +-export([info/1, info/2]). + +info(_X) -> []. +info(_X, _) -> []. description() -> [{description, diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl index 0eccb66cfd..60be070426 100644 --- a/src/rabbit_exchange_type_topic.erl +++ b/src/rabbit_exchange_type_topic.erl @@ -24,6 +24,7 @@ -export([validate/1, validate_binding/2, create/2, delete/3, policy_changed/2, add_binding/3, remove_bindings/3, assert_args_equivalence/2]). +-export([info/1, info/2]). -rabbit_boot_step({?MODULE, [{description, "exchange type topic"}, @@ -34,6 +35,9 @@ %%---------------------------------------------------------------------------- +info(_X) -> []. +info(_X, _) -> []. + description() -> [{description, <<"AMQP topic exchange, as per the AMQP specification">>}]. diff --git a/src/rabbit_file.erl b/src/rabbit_file.erl index 3dd0421485..d2d37f0ec0 100644 --- a/src/rabbit_file.erl +++ b/src/rabbit_file.erl @@ -24,6 +24,7 @@ -export([rename/2, delete/1, recursive_delete/1, recursive_copy/2]). -export([lock_file/1]). -export([read_file_info/1]). +-export([filename_as_a_directory/1]). -import(file_handle_cache, [with_handle/1, with_handle/2]). @@ -59,6 +60,7 @@ (file:filename(), file:filename()) -> rabbit_types:ok_or_error({file:filename(), file:filename(), any()})). -spec(lock_file/1 :: (file:filename()) -> rabbit_types:ok_or_error('eexist')). +-spec(filename_as_a_directory/1 :: (file:filename()) -> file:filename()). -endif. @@ -306,3 +308,11 @@ lock_file(Path) -> ok = prim_file:close(Lock) end) end. + +filename_as_a_directory(FileName) -> + case lists:last(FileName) of + "/" -> + FileName; + _ -> + FileName ++ "/" + end. diff --git a/src/rabbit_hipe.erl b/src/rabbit_hipe.erl index 05b5f3719d..6957d85cb4 100644 --- a/src/rabbit_hipe.erl +++ b/src/rabbit_hipe.erl @@ -5,15 +5,15 @@ %% practice 2 processes seems just as fast as any other number > 1, %% and keeps the progress bar realistic-ish. -define(HIPE_PROCESSES, 2). --export([maybe_hipe_compile/0, log_hipe_result/1]). -%% HiPE compilation happens before we have log handlers - so we have -%% to io:format/2, it's all we can do. +-export([maybe_hipe_compile/0, log_hipe_result/1]). +-export([compile_to_directory/1]). +-export([can_hipe_compile/0]). +%% Compile and load during server startup sequence maybe_hipe_compile() -> {ok, Want} = application:get_env(rabbit, hipe_compile), - Can = code:which(hipe) =/= non_existing, - case {Want, Can} of + case {Want, can_hipe_compile()} of {true, true} -> hipe_compile(); {true, false} -> false; {false, _} -> {ok, disabled} @@ -33,42 +33,53 @@ log_hipe_result(false) -> rabbit_log:warning( "Not HiPE compiling: HiPE not found in this Erlang installation.~n"). +hipe_compile() -> + hipe_compile(fun compile_and_load/1, false). + +compile_to_directory(Dir0) -> + Dir = rabbit_file:filename_as_a_directory(Dir0), + ok = prepare_ebin_directory(Dir), + hipe_compile(fun (Mod) -> compile_and_save(Mod, Dir) end, true). + +needs_compilation(Mod, Force) -> + Exists = code:which(Mod) =/= non_existing, + %% We skip modules already natively compiled. This + %% happens when RabbitMQ is stopped (just the + %% application, not the entire node) and started + %% again. + NotYetCompiled = not already_hipe_compiled(Mod), + NotVersioned = not compiled_with_version_support(Mod), + Exists andalso (Force orelse (NotYetCompiled andalso NotVersioned)). + %% HiPE compilation happens before we have log handlers and can take a %% long time, so make an exception to our no-stdout policy and display %% progress via stdout. -hipe_compile() -> +hipe_compile(CompileFun, Force) -> {ok, HipeModulesAll} = application:get_env(rabbit, hipe_modules), - HipeModules = [HM || HM <- HipeModulesAll, - code:which(HM) =/= non_existing andalso - %% We skip modules already natively compiled. This - %% happens when RabbitMQ is stopped (just the - %% application, not the entire node) and started - %% again. - already_hipe_compiled(HM) - andalso (not compiled_with_version_support(HM))], + HipeModules = lists:filter(fun(Mod) -> needs_compilation(Mod, Force) end, HipeModulesAll), case HipeModules of [] -> {ok, already_compiled}; - _ -> do_hipe_compile(HipeModules) + _ -> do_hipe_compile(HipeModules, CompileFun) end. already_hipe_compiled(Mod) -> try %% OTP 18.x or later - Mod:module_info(native) =:= false + Mod:module_info(native) =:= true %% OTP prior to 18.x catch error:badarg -> - code:is_module_native(Mod) =:= false + code:is_module_native(Mod) =:= true end. compiled_with_version_support(Mod) -> proplists:get_value(erlang_version_support, Mod:module_info(attributes)) =/= undefined. -do_hipe_compile(HipeModules) -> +do_hipe_compile(HipeModules, CompileFun) -> Count = length(HipeModules), io:format("~nHiPE compiling: |~s|~n |", [string:copies("-", Count)]), - T1 = time_compat:monotonic_time(), + T1 = erlang:monotonic_time(), %% We use code:get_object_code/1 below to get the beam binary, %% instead of letting hipe get it itself, because hipe:c/{1,2} %% expects the given filename to actually exist on disk: it does not @@ -79,11 +90,7 @@ do_hipe_compile(HipeModules) -> %% advanced API does not load automatically the code, except if the %% 'load' option is set. PidMRefs = [spawn_monitor(fun () -> [begin - {M, Beam, _} = - code:get_object_code(M), - {ok, _} = - hipe:compile(M, [], Beam, - [o3, load]), + CompileFun(M), io:format("#") end || M <- Ms] end) || @@ -92,8 +99,8 @@ do_hipe_compile(HipeModules) -> {'DOWN', MRef, process, _, normal} -> ok; {'DOWN', MRef, process, _, Reason} -> exit(Reason) end || {_Pid, MRef} <- PidMRefs], - T2 = time_compat:monotonic_time(), - Duration = time_compat:convert_time_unit(T2 - T1, native, seconds), + T2 = erlang:monotonic_time(), + Duration = erlang:convert_time_unit(T2 - T1, native, seconds), io:format("|~n~nCompiled ~B modules in ~Bs~n", [Count, Duration]), {ok, Count, Duration}. @@ -101,3 +108,39 @@ split(L, N) -> split0(L, [[] || _ <- lists:seq(1, N)]). split0([], Ls) -> Ls; split0([I | Is], [L | Ls]) -> split0(Is, Ls ++ [[I | L]]). + +prepare_ebin_directory(Dir) -> + ok = rabbit_file:ensure_dir(Dir), + ok = delete_beam_files(Dir), + ok. + +delete_beam_files(Dir) -> + {ok, Files} = file:list_dir(Dir), + lists:foreach(fun(File) -> + case filename:extension(File) of + ".beam" -> + ok = file:delete(filename:join([Dir, File])); + _ -> + ok + end + end, + Files). + +compile_and_load(Mod) -> + {Mod, Beam, _} = code:get_object_code(Mod), + {ok, _} = hipe:compile(Mod, [], Beam, [o3, load]). + +compile_and_save(Module, Dir) -> + {Module, BeamCode, _} = code:get_object_code(Module), + BeamName = filename:join([Dir, atom_to_list(Module) ++ ".beam"]), + {ok, {Architecture, NativeCode}} = hipe:compile(Module, [], BeamCode, [o3]), + {ok, _, Chunks0} = beam_lib:all_chunks(BeamCode), + ChunkName = hipe_unified_loader:chunk_name(Architecture), + Chunks1 = lists:keydelete(ChunkName, 1, Chunks0), + Chunks = Chunks1 ++ [{ChunkName,NativeCode}], + {ok, BeamPlusNative} = beam_lib:build_module(Chunks), + ok = file:write_file(BeamName, BeamPlusNative), + BeamName. + +can_hipe_compile() -> + code:which(hipe) =/= non_existing. diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index e447e9de82..9674a4ef2c 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -363,7 +363,7 @@ fetch(AckRequired, State = #state { backing_queue = BQ, State1 = State #state { backing_queue_state = BQS1 }, {Result, case Result of empty -> State1; - {_MsgId, _IsDelivered, AckTag} -> drop_one(AckTag, State1) + {_MsgId, _IsDelivered, _AckTag} -> drop_one(AckRequired, State1) end}. drop(AckRequired, State = #state { backing_queue = BQ, @@ -372,7 +372,7 @@ drop(AckRequired, State = #state { backing_queue = BQ, State1 = State #state { backing_queue_state = BQS1 }, {Result, case Result of empty -> State1; - {_MsgId, AckTag} -> drop_one(AckTag, State1) + {_MsgId, _AckTag} -> drop_one(AckRequired, State1) end}. ack(AckTags, State = #state { gm = GM, @@ -518,6 +518,7 @@ promote_backing_queue_state(QName, CPid, BQ, BQS, GM, AckTags, Seen, KS) -> Depth = BQ:depth(BQS1), true = Len == Depth, %% ASSERTION: everything must have been requeued ok = gm:broadcast(GM, {depth, Depth}), + WaitTimeout = rabbit_misc:get_env(rabbit, slave_wait_timeout, 15000), #state { name = QName, gm = GM, coordinator = CPid, @@ -525,7 +526,8 @@ promote_backing_queue_state(QName, CPid, BQ, BQS, GM, AckTags, Seen, KS) -> backing_queue_state = BQS1, seen_status = Seen, confirmed = [], - known_senders = sets:from_list(KS) }. + known_senders = sets:from_list(KS), + wait_timeout = WaitTimeout }. sender_death_fun() -> Self = self(), @@ -556,10 +558,10 @@ depth_fun() -> %% Helpers %% --------------------------------------------------------------------------- -drop_one(AckTag, State = #state { gm = GM, - backing_queue = BQ, - backing_queue_state = BQS }) -> - ok = gm:broadcast(GM, {drop, BQ:len(BQS), 1, AckTag =/= undefined}), +drop_one(AckRequired, State = #state { gm = GM, + backing_queue = BQ, + backing_queue_state = BQS }) -> + ok = gm:broadcast(GM, {drop, BQ:len(BQS), 1, AckRequired}), State. drop(PrevLen, AckRequired, State = #state { gm = GM, diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index 849efa3611..b188298a9b 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -20,7 +20,7 @@ -export([remove_from_queue/3, on_node_up/0, add_mirrors/3, report_deaths/4, store_updated_slaves/1, initial_queue_node/2, suggested_queue_nodes/1, - is_mirrored/1, update_mirrors/2, validate_policy/1, + is_mirrored/1, update_mirrors/2, update_mirrors/1, validate_policy/1, maybe_auto_sync/1, maybe_drop_master_after_sync/1, sync_batch_size/1, log_info/3, log_warning/3]). @@ -64,6 +64,8 @@ -spec(is_mirrored/1 :: (rabbit_types:amqqueue()) -> boolean()). -spec(update_mirrors/2 :: (rabbit_types:amqqueue(), rabbit_types:amqqueue()) -> 'ok'). +-spec(update_mirrors/1 :: + (rabbit_types:amqqueue()) -> 'ok'). -spec(maybe_drop_master_after_sync/1 :: (rabbit_types:amqqueue()) -> 'ok'). -spec(maybe_auto_sync/1 :: (rabbit_types:amqqueue()) -> 'ok'). -spec(log_info/3 :: (rabbit_amqqueue:name(), string(), [any()]) -> 'ok'). @@ -384,15 +386,12 @@ update_mirrors(OldQ = #amqqueue{pid = QPid}, NewQ = #amqqueue{pid = QPid}) -> case {is_mirrored(OldQ), is_mirrored(NewQ)} of {false, false} -> ok; - {true, false} -> rabbit_amqqueue:stop_mirroring(QPid); - {false, true} -> rabbit_amqqueue:start_mirroring(QPid); - {true, true} -> update_mirrors0(OldQ, NewQ) + _ -> rabbit_amqqueue:update_mirroring(QPid) end. -update_mirrors0(OldQ = #amqqueue{name = QName}, - NewQ = #amqqueue{name = QName}) -> - {OldMNode, OldSNodes, _} = actual_queue_nodes(OldQ), - {NewMNode, NewSNodes} = suggested_queue_nodes(NewQ), +update_mirrors(Q = #amqqueue{name = QName}) -> + {OldMNode, OldSNodes, _} = actual_queue_nodes(Q), + {NewMNode, NewSNodes} = suggested_queue_nodes(Q), OldNodes = [OldMNode | OldSNodes], NewNodes = [NewMNode | NewSNodes], %% When a mirror dies, remove_from_queue/2 might have to add new @@ -406,7 +405,7 @@ update_mirrors0(OldQ = #amqqueue{name = QName}, drop_mirrors(QName, OldNodes -- NewNodes), %% This is for the case where no extra nodes were added but we changed to %% a policy requiring auto-sync. - maybe_auto_sync(NewQ), + maybe_auto_sync(Q), ok. %% The arrival of a newly synced slave may cause the master to die if diff --git a/src/rabbit_mirror_queue_mode_exactly.erl b/src/rabbit_mirror_queue_mode_exactly.erl index 4721ad6136..28ed8ca463 100644 --- a/src/rabbit_mirror_queue_mode_exactly.erl +++ b/src/rabbit_mirror_queue_mode_exactly.erl @@ -46,8 +46,8 @@ suggested_queue_nodes(Count, MNode, SNodes, _SSNodes, Poss) -> shuffle(L) -> random:seed(erlang:phash2([node()]), - time_compat:monotonic_time(), - time_compat:unique_integer()), + erlang:monotonic_time(), + erlang:unique_integer()), {_, L1} = lists:unzip(lists:keysort(1, [{random:uniform(), N} || N <- L])), L1. diff --git a/src/rabbit_mirror_queue_sync.erl b/src/rabbit_mirror_queue_sync.erl index a97a9b50c8..898aa5abcf 100644 --- a/src/rabbit_mirror_queue_sync.erl +++ b/src/rabbit_mirror_queue_sync.erl @@ -108,7 +108,7 @@ master_batch_go0(Args, BatchSize, BQ, BQS) -> false -> {cont, Acc1} end end, - FoldAcc = {[], 0, {0, BQ:depth(BQS)}, time_compat:monotonic_time()}, + FoldAcc = {[], 0, {0, BQ:depth(BQS)}, erlang:monotonic_time()}, bq_fold(FoldFun, FoldAcc, Args, BQ, BQS). master_batch_send({Syncer, Ref, Log, HandleInfo, EmitStats, Parent}, @@ -168,12 +168,12 @@ stop_syncer(Syncer, Msg) -> end. maybe_emit_stats(Last, I, EmitStats, Log) -> - Interval = time_compat:convert_time_unit( - time_compat:monotonic_time() - Last, native, micro_seconds), + Interval = erlang:convert_time_unit( + erlang:monotonic_time() - Last, native, micro_seconds), case Interval > ?SYNC_PROGRESS_INTERVAL of true -> EmitStats({syncing, I}), Log("~p messages", [I]), - time_compat:monotonic_time(); + erlang:monotonic_time(); false -> Last end. diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index afd0508aac..6a57f6bb2c 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -423,6 +423,7 @@ cluster_status(WhichNodes) -> node_info() -> {rabbit_misc:otp_release(), rabbit_misc:version(), + mnesia:system_info(protocol_version), cluster_status_from_mnesia()}. node_type() -> @@ -593,26 +594,37 @@ check_cluster_consistency() -> end. check_cluster_consistency(Node, CheckNodesConsistency) -> - case rpc:call(Node, rabbit_mnesia, node_info, []) of + case remote_node_info(Node) of {badrpc, _Reason} -> {error, not_found}; - {_OTP, _Rabbit, {error, _}} -> + {_OTP, Rabbit, DelegateModuleHash, _Status} when is_binary(DelegateModuleHash) -> + %% when a delegate module .beam file hash is present + %% in the tuple, we are dealing with an old version + rabbit_version:version_error("Rabbit", rabbit_misc:version(), Rabbit); + {_OTP, _Rabbit, _Protocol, {error, _}} -> {error, not_found}; - {OTP, Rabbit, {ok, Status}} when CheckNodesConsistency -> - case check_consistency(OTP, Rabbit, Node, Status) of + {OTP, Rabbit, Protocol, {ok, Status}} when CheckNodesConsistency -> + case check_consistency(Node, OTP, Rabbit, Protocol, Status) of {error, _} = E -> E; {ok, Res} -> {ok, Res} end; - {OTP, Rabbit, {ok, Status}} -> - case check_consistency(OTP, Rabbit) of + {OTP, Rabbit, Protocol, {ok, Status}} -> + case check_consistency(Node, OTP, Rabbit, Protocol) of {error, _} = E -> E; ok -> {ok, Status} - end; - {_OTP, Rabbit, _Hash, _Status} -> - %% delegate hash checking implies version mismatch - rabbit_version:version_error("Rabbit", rabbit_misc:version(), Rabbit) + end + end. + +remote_node_info(Node) -> + case rpc:call(Node, rabbit_mnesia, node_info, []) of + {badrpc, _} = Error -> Error; + %% RabbitMQ prior to 3.6.2 + {OTP, Rabbit, Status} -> {OTP, Rabbit, unsupported, Status}; + %% RabbitMQ 3.6.2 or later + {OTP, Rabbit, Protocol, Status} -> {OTP, Rabbit, Protocol, Status} end. + %%-------------------------------------------------------------------- %% Hooks for `rabbit_node_monitor' %%-------------------------------------------------------------------- @@ -763,14 +775,14 @@ change_extra_db_nodes(ClusterNodes0, CheckOtherNodes) -> Nodes end. -check_consistency(OTP, Rabbit) -> +check_consistency(Node, OTP, Rabbit, ProtocolVersion) -> rabbit_misc:sequence_error( - [rabbit_version:check_otp_consistency(OTP), + [check_mnesia_or_otp_consistency(Node, ProtocolVersion, OTP), check_rabbit_consistency(Rabbit)]). -check_consistency(OTP, Rabbit, Node, Status) -> +check_consistency(Node, OTP, Rabbit, ProtocolVersion, Status) -> rabbit_misc:sequence_error( - [rabbit_version:check_otp_consistency(OTP), + [check_mnesia_or_otp_consistency(Node, ProtocolVersion, OTP), check_rabbit_consistency(Rabbit), check_nodes_consistency(Node, Status)]). @@ -785,6 +797,55 @@ check_nodes_consistency(Node, RemoteStatus = {RemoteAllNodes, _, _}) -> [node(), Node, Node])}} end. +check_mnesia_or_otp_consistency(_Node, unsupported, OTP) -> + rabbit_version:check_otp_consistency(OTP); +check_mnesia_or_otp_consistency(Node, ProtocolVersion, _) -> + check_mnesia_consistency(Node, ProtocolVersion). + +check_mnesia_consistency(Node, ProtocolVersion) -> + % If mnesia is running we will just check protocol version + % If it's not running, we don't want it to join cluster until all checks pass + % so we start it without `dir` env variable to prevent + % joining cluster and/or corrupting data + with_running_or_clean_mnesia(fun() -> + case negotiate_protocol([Node]) of + [Node] -> ok; + [] -> + LocalVersion = mnesia:system_info(protocol_version), + {error, {inconsistent_cluster, + rabbit_misc:format("Mnesia protocol negotiation failed." + " Local version: ~p." + " Remote version ~p", + [LocalVersion, ProtocolVersion])}} + end + end). + +negotiate_protocol([Node]) -> + mnesia_monitor:negotiate_protocol([Node]). + +with_running_or_clean_mnesia(Fun) -> + IsMnesiaRunning = case mnesia:system_info(is_running) of + yes -> true; + no -> false; + stopping -> + ensure_mnesia_not_running(), + false; + starting -> + ensure_mnesia_running(), + true + end, + case IsMnesiaRunning of + true -> Fun(); + false -> + {ok, MnesiaDir} = application:get_env(mnesia, dir), + application:unset_env(mnesia, dir), + mnesia:start(), + Result = Fun(), + application:stop(mnesia), + application:set_env(mnesia, dir, MnesiaDir), + Result + end. + check_rabbit_consistency(Remote) -> rabbit_version:check_version_consistency( rabbit_misc:version(), Remote, "Rabbit", @@ -819,16 +880,20 @@ find_auto_cluster_node([Node | Nodes]) -> "Could not auto-cluster with ~s: " ++ Fmt, [Node | Args]), find_auto_cluster_node(Nodes) end, - case rpc:call(Node, rabbit_mnesia, node_info, []) of - {badrpc, _} = Reason -> Fail("~p~n", [Reason]); + case remote_node_info(Node) of + {badrpc, _} = Reason -> + Fail("~p~n", [Reason]); %% old delegate hash check - {_OTP, RMQ, _Hash, _} -> Fail("version ~s~n", [RMQ]); - {_OTP, _RMQ, {error, _} = E} -> Fail("~p~n", [E]); - {OTP, RMQ, _} -> case check_consistency(OTP, RMQ) of - {error, _} -> Fail("versions ~p~n", - [{OTP, RMQ}]); - ok -> {ok, Node} - end + {_OTP, RMQ, Hash, _} when is_binary(Hash) -> + Fail("version ~s~n", [RMQ]); + {_OTP, _RMQ, _Protocol, {error, _} = E} -> + Fail("~p~n", [E]); + {OTP, RMQ, Protocol, _} -> + case check_consistency(Node, OTP, RMQ, Protocol) of + {error, _} -> Fail("versions ~p~n", + [{OTP, RMQ}]); + ok -> {ok, Node} + end end. is_only_clustered_disc_node() -> diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl index 6f41836b98..976f4a4b2f 100644 --- a/src/rabbit_node_monitor.erl +++ b/src/rabbit_node_monitor.erl @@ -414,7 +414,12 @@ handle_cast({check_partial_partition, Node, Rep, NodeGUID, MyGUID, RepGUID}, fun () -> case rpc:call(Node, rabbit, is_running, []) of {badrpc, _} -> ok; - _ -> cast(Rep, {partial_partition, + _ -> + rabbit_log:warning("Received a 'DOWN' message" + " from ~p but still can" + " communicate with it ~n", + [Node]), + cast(Rep, {partial_partition, Node, node(), RepGUID}) end end); diff --git a/src/rabbit_password.erl b/src/rabbit_password.erl index d5b0945de9..8d5cf8d69e 100644 --- a/src/rabbit_password.erl +++ b/src/rabbit_password.erl @@ -36,8 +36,8 @@ hash(HashingMod, Cleartext) -> generate_salt() -> random:seed(erlang:phash2([node()]), - time_compat:monotonic_time(), - time_compat:unique_integer()), + erlang:monotonic_time(), + erlang:unique_integer()), Salt = random:uniform(16#ffffffff), <<Salt:32>>. diff --git a/src/rabbit_plugins.erl b/src/rabbit_plugins.erl index 8f7319182e..47574a9d55 100644 --- a/src/rabbit_plugins.erl +++ b/src/rabbit_plugins.erl @@ -253,8 +253,7 @@ prepare_plugins(Enabled) -> Wanted = dependencies(false, Enabled, AllPlugins), WantedPlugins = lookup_plugins(Wanted, AllPlugins), {ValidPlugins, Problems} = validate_plugins(WantedPlugins), - %% TODO: error message formatting - rabbit_log:warning(format_invalid_plugins(Problems)), + maybe_warn_about_invalid_plugins(Problems), case filelib:ensure_dir(ExpandDir ++ "/") of ok -> ok; {error, E2} -> throw({error, {cannot_create_plugins_expand_dir, @@ -266,6 +265,13 @@ prepare_plugins(Enabled) -> PluginAppDescPath <- filelib:wildcard(ExpandDir ++ "/*/ebin/*.app")], Wanted. +maybe_warn_about_invalid_plugins([]) -> + ok; +maybe_warn_about_invalid_plugins(InvalidPlugins) -> + %% TODO: error message formatting + rabbit_log:warning(format_invalid_plugins(InvalidPlugins)). + + format_invalid_plugins(InvalidPlugins) -> lists:flatten(["Failed to enable some plugins: \r\n" | [format_invalid_plugin(Plugin) diff --git a/src/rabbit_policy.erl b/src/rabbit_policy.erl index 1f7e521dfd..a9caadf972 100644 --- a/src/rabbit_policy.erl +++ b/src/rabbit_policy.erl @@ -221,11 +221,11 @@ validate(_VHost, <<"policy">>, Name, Term, _User) -> Name, policy_validation(), Term). notify(VHost, <<"policy">>, Name, Term) -> - rabbit_event:notify(policy_set, [{name, Name} | Term]), + rabbit_event:notify(policy_set, [{name, Name}, {vhost, VHost} | Term]), update_policies(VHost). notify_clear(VHost, <<"policy">>, Name) -> - rabbit_event:notify(policy_cleared, [{name, Name}]), + rabbit_event:notify(policy_cleared, [{name, Name}, {vhost, VHost}]), update_policies(VHost). %%---------------------------------------------------------------------------- @@ -242,8 +242,10 @@ update_policies(VHost) -> fun() -> [mnesia:lock({table, T}, write) || T <- Tabs], %% [1] case catch list(VHost) of - {error, {no_such_vhost, _}} -> - ok; %% [2] + {'EXIT', {throw, {error, {no_such_vhost, _}}}} -> + {[], []}; %% [2] + {'EXIT', Exit} -> + exit(Exit); Policies -> {[update_exchange(X, Policies) || X <- rabbit_exchange:list(VHost)], @@ -274,7 +276,9 @@ update_queue(Q = #amqqueue{name = QName, policy = OldPolicy}, Policies) -> NewPolicy -> case rabbit_amqqueue:update( QName, fun(Q1) -> rabbit_queue_decorator:set( - Q1#amqqueue{policy = NewPolicy}) + Q1#amqqueue{policy = NewPolicy, + policy_version = + Q1#amqqueue.policy_version + 1 }) end) of #amqqueue{} = Q1 -> {Q, Q1}; not_found -> {Q, Q } diff --git a/src/rabbit_priority_queue.erl b/src/rabbit_priority_queue.erl index b58a8c535e..a3bfb5cdfa 100644 --- a/src/rabbit_priority_queue.erl +++ b/src/rabbit_priority_queue.erl @@ -205,8 +205,8 @@ publish(Msg, MsgProps, IsDelivered, ChPid, Flow, State = #passthrough{bq = BQ, bqs = BQS}) -> ?passthrough1(publish(Msg, MsgProps, IsDelivered, ChPid, Flow, BQS)). -batch_publish(Publishes, ChPid, Flow, State = #state{bq = BQ}) -> - PubDict = partition_publish_batch(Publishes), +batch_publish(Publishes, ChPid, Flow, State = #state{bq = BQ, bqss = [{MaxP, _} |_]}) -> + PubDict = partition_publish_batch(Publishes, MaxP), lists:foldl( fun ({Priority, Pubs}, St) -> pick1(fun (_P, BQSN) -> @@ -227,8 +227,8 @@ publish_delivered(Msg, MsgProps, ChPid, Flow, State = #passthrough{bq = BQ, bqs = BQS}) -> ?passthrough2(publish_delivered(Msg, MsgProps, ChPid, Flow, BQS)). -batch_publish_delivered(Publishes, ChPid, Flow, State = #state{bq = BQ}) -> - PubDict = partition_publish_delivered_batch(Publishes), +batch_publish_delivered(Publishes, ChPid, Flow, State = #state{bq = BQ, bqss = [{MaxP, _} |_]}) -> + PubDict = partition_publish_delivered_batch(Publishes, MaxP), {PrioritiesAndAcks, State1} = lists:foldl( fun ({Priority, Pubs}, {PriosAndAcks, St}) -> @@ -404,7 +404,6 @@ msg_rates(#state{bq = BQ, bqss = BQSs}) -> end, {0.0, 0.0}, BQSs); msg_rates(#passthrough{bq = BQ, bqs = BQS}) -> BQ:msg_rates(BQS). - info(backing_queue_status, #state{bq = BQ, bqss = BQSs}) -> fold0(fun (P, BQSN, Acc) -> combine_status(P, BQ:info(backing_queue_status, BQSN), Acc) @@ -433,8 +432,8 @@ set_queue_mode(Mode, State = #state{bq = BQ}) -> set_queue_mode(Mode, State = #passthrough{bq = BQ, bqs = BQS}) -> ?passthrough1(set_queue_mode(Mode, BQS)). -zip_msgs_and_acks(Msgs, AckTags, Accumulator, #state{}) -> - MsgsByPriority = partition_publish_delivered_batch(Msgs), +zip_msgs_and_acks(Msgs, AckTags, Accumulator, #state{bqss = [{MaxP, _} |_]}) -> + MsgsByPriority = partition_publish_delivered_batch(Msgs, MaxP), lists:foldl(fun (Acks, MAs) -> {P, _AckTag} = hd(Acks), Pubs = orddict:fetch(P, MsgsByPriority), @@ -484,13 +483,14 @@ foreach1(_Fun, [], BQSAcc) -> %% For a given thing, just go to its BQ pick1(Fun, Prioritisable, #state{bqss = BQSs} = State) -> - {P, BQSN} = priority(Prioritisable, BQSs), + {P, BQSN} = priority_bq(Prioritisable, BQSs), a(State#state{bqss = bq_store(P, Fun(P, BQSN), BQSs)}). %% Fold over results fold2(Fun, Acc, State = #state{bqss = BQSs}) -> {Res, BQSs1} = fold2(Fun, Acc, BQSs, []), {Res, a(State#state{bqss = BQSs1})}. + fold2(Fun, Acc, [{P, BQSN} | Rest], BQSAcc) -> {Acc1, BQSN1} = Fun(P, BQSN, Acc), fold2(Fun, Acc1, Rest, [{P, BQSN1} | BQSAcc]); @@ -532,7 +532,7 @@ fold_by_acktags2(Fun, AckTags, State) -> %% For a given thing, just go to its BQ pick2(Fun, Prioritisable, #state{bqss = BQSs} = State) -> - {P, BQSN} = priority(Prioritisable, BQSs), + {P, BQSN} = priority_bq(Prioritisable, BQSs), {Res, BQSN1} = Fun(P, BQSN), {Res, a(State#state{bqss = bq_store(P, BQSN1, BQSs)})}. @@ -563,8 +563,8 @@ findfold3(Fun, Acc, NotFound, [{P, BQSN} | Rest], BQSAcc) -> findfold3(_Fun, Acc, NotFound, [], BQSAcc) -> {NotFound, Acc, lists:reverse(BQSAcc)}. -bq_fetch(P, []) -> exit({not_found, P}); -bq_fetch(P, [{P, BQSN} | _]) -> BQSN; +bq_fetch(P, []) -> exit({not_found, P}); +bq_fetch(P, [{P, BQSN} | _]) -> {P, BQSN}; bq_fetch(P, [{_, _BQSN} | T]) -> bq_fetch(P, T). bq_store(P, BQS, BQSs) -> @@ -582,41 +582,36 @@ a(State = #state{bqss = BQSs}) -> end. %%---------------------------------------------------------------------------- -partition_publish_batch(Publishes) -> +partition_publish_batch(Publishes, MaxP) -> partition_publishes( - Publishes, fun ({Msg, _, _}) -> Msg end). + Publishes, fun ({Msg, _, _}) -> Msg end, MaxP). -partition_publish_delivered_batch(Publishes) -> +partition_publish_delivered_batch(Publishes, MaxP) -> partition_publishes( - Publishes, fun ({Msg, _}) -> Msg end). + Publishes, fun ({Msg, _}) -> Msg end, MaxP). -partition_publishes(Publishes, ExtractMsg) -> +partition_publishes(Publishes, ExtractMsg, MaxP) -> lists:foldl(fun (Pub, Dict) -> Msg = ExtractMsg(Pub), - rabbit_misc:orddict_cons(priority2(Msg), Pub, Dict) + rabbit_misc:orddict_cons(priority(Msg, MaxP), Pub, Dict) end, orddict:new(), Publishes). -priority(P, BQSs) when is_integer(P) -> - {P, bq_fetch(P, BQSs)}; -priority(#basic_message{content = Content}, BQSs) -> - priority1(rabbit_binary_parser:ensure_content_decoded(Content), BQSs). - -priority1(_Content, [{P, BQSN}]) -> - {P, BQSN}; -priority1(Content, [{P, BQSN} | Rest]) -> - case priority2(Content) >= P of - true -> {P, BQSN}; - false -> priority1(Content, Rest) - end. - -priority2(#basic_message{content = Content}) -> - priority2(rabbit_binary_parser:ensure_content_decoded(Content)); -priority2(#content{properties = Props}) -> +priority_bq(Priority, [{MaxP, _} | _] = BQSs) -> + bq_fetch(priority(Priority, MaxP), BQSs). + +%% Messages with a priority which is higher than the queue's maximum are treated +%% as if they were published with the maximum priority. +priority(undefined, _MaxP) -> + 0; +priority(Priority, MaxP) when is_integer(Priority), Priority =< MaxP -> + Priority; +priority(Priority, MaxP) when is_integer(Priority), Priority > MaxP -> + MaxP; +priority(#basic_message{content = Content}, MaxP) -> + priority(rabbit_binary_parser:ensure_content_decoded(Content), MaxP); +priority(#content{properties = Props}, MaxP) -> #'P_basic'{priority = Priority0} = Props, - case Priority0 of - undefined -> 0; - _ when is_integer(Priority0) -> Priority0 - end. + priority(Priority0, MaxP). add_maybe_infinity(infinity, _) -> infinity; add_maybe_infinity(_, infinity) -> infinity; diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl index 5b5c9b3074..6200f9d2c1 100644 --- a/src/rabbit_queue_consumers.erl +++ b/src/rabbit_queue_consumers.erl @@ -100,7 +100,7 @@ new() -> #state{consumers = priority_queue:new(), use = {active, - time_compat:monotonic_time(micro_seconds), + erlang:monotonic_time(micro_seconds), 1.0}}. max_active_priority(#state{consumers = Consumers}) -> @@ -350,9 +350,9 @@ drain_mode(true) -> drain; drain_mode(false) -> manual. utilisation(#state{use = {active, Since, Avg}}) -> - use_avg(time_compat:monotonic_time(micro_seconds) - Since, 0, Avg); + use_avg(erlang:monotonic_time(micro_seconds) - Since, 0, Avg); utilisation(#state{use = {inactive, Since, Active, Avg}}) -> - use_avg(Active, time_compat:monotonic_time(micro_seconds) - Since, Avg). + use_avg(Active, erlang:monotonic_time(micro_seconds) - Since, Avg). %%---------------------------------------------------------------------------- @@ -459,10 +459,10 @@ update_use({inactive, _, _, _} = CUInfo, inactive) -> update_use({active, _, _} = CUInfo, active) -> CUInfo; update_use({active, Since, Avg}, inactive) -> - Now = time_compat:monotonic_time(micro_seconds), + Now = erlang:monotonic_time(micro_seconds), {inactive, Now, Now - Since, Avg}; update_use({inactive, Since, Active, Avg}, active) -> - Now = time_compat:monotonic_time(micro_seconds), + Now = erlang:monotonic_time(micro_seconds), {active, Now, use_avg(Active, Now - Since, Avg)}. use_avg(0, 0, Avg) -> diff --git a/src/rabbit_queue_location_random.erl b/src/rabbit_queue_location_random.erl index 2579cbb2b1..73d509bf33 100644 --- a/src/rabbit_queue_location_random.erl +++ b/src/rabbit_queue_location_random.erl @@ -39,6 +39,6 @@ description() -> queue_master_location(#amqqueue{}) -> Cluster = rabbit_queue_master_location_misc:all_nodes(), - RandomPos = erlang:phash2(time_compat:monotonic_time(), length(Cluster)), + RandomPos = erlang:phash2(erlang:monotonic_time(), length(Cluster)), MasterNode = lists:nth(RandomPos + 1, Cluster), {ok, MasterNode}. diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl index b99a1d12ee..0f55b9e4a9 100644 --- a/src/rabbit_upgrade_functions.erl +++ b/src/rabbit_upgrade_functions.erl @@ -52,6 +52,7 @@ -rabbit_upgrade({down_slave_nodes, mnesia, [queue_decorators]}). -rabbit_upgrade({queue_state, mnesia, [down_slave_nodes]}). -rabbit_upgrade({recoverable_slaves, mnesia, [queue_state]}). +-rabbit_upgrade({policy_version, mnesia, [recoverable_slaves]}). -rabbit_upgrade({user_password_hashing, mnesia, [hash_passwords]}). %% ------------------------------------------------------------------- @@ -447,6 +448,24 @@ recoverable_slaves(Table) -> sync_slave_pids, recoverable_slaves, policy, gm_pids, decorators, state]). +policy_version() -> + ok = policy_version(rabbit_queue), + ok = policy_version(rabbit_durable_queue). + +policy_version(Table) -> + transform( + Table, + fun ({amqqueue, Name, Durable, AutoDelete, ExclusiveOwner, Arguments, + Pid, SlavePids, SyncSlavePids, DSN, Policy, GmPids, Decorators, + State}) -> + {amqqueue, Name, Durable, AutoDelete, ExclusiveOwner, Arguments, + Pid, SlavePids, SyncSlavePids, DSN, Policy, GmPids, Decorators, + State, 0} + end, + [name, durable, auto_delete, exclusive_owner, arguments, pid, slave_pids, + sync_slave_pids, recoverable_slaves, policy, gm_pids, decorators, state, + policy_version]). + %% Prior to 3.6.0, passwords were hashed using MD5, this populates %% existing records with said default. Users created with 3.6.0+ will %% have internal_user.hashing_algorithm populated by the internal diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index d5b090bed4..5b86cbd3d1 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -688,12 +688,12 @@ requeue(AckTags, #vqstate { mode = default, State2), MsgCount = length(MsgIds2), {MsgIds2, a(reduce_memory_use( - maybe_update_rates( + maybe_update_rates(ui( State3 #vqstate { delta = Delta1, q3 = Q3a, q4 = Q4a, in_counter = InCounter + MsgCount, - len = Len + MsgCount })))}; + len = Len + MsgCount }))))}; requeue(AckTags, #vqstate { mode = lazy, delta = Delta, q3 = Q3, @@ -706,11 +706,11 @@ requeue(AckTags, #vqstate { mode = lazy, State1), MsgCount = length(MsgIds1), {MsgIds1, a(reduce_memory_use( - maybe_update_rates( + maybe_update_rates(ui( State2 #vqstate { delta = Delta1, q3 = Q3a, in_counter = InCounter + MsgCount, - len = Len + MsgCount })))}. + len = Len + MsgCount }))))}. ackfold(MsgFun, Acc, State, AckTags) -> {AccN, StateN} = @@ -774,7 +774,7 @@ update_rates(State = #vqstate{ in_counter = InCount, ack_in = AckInRate, ack_out = AckOutRate, timestamp = TS }}) -> - Now = time_compat:monotonic_time(), + Now = erlang:monotonic_time(), Rates = #rates { in = update_rate(Now, TS, InCount, InRate), out = update_rate(Now, TS, OutCount, OutRate), @@ -789,7 +789,7 @@ update_rates(State = #vqstate{ in_counter = InCount, rates = Rates }. update_rate(Now, TS, Count, Rate) -> - Time = time_compat:convert_time_unit(Now - TS, native, micro_seconds) / + Time = erlang:convert_time_unit(Now - TS, native, micro_seconds) / ?MICROS_PER_SECOND, if Time == 0 -> Rate; @@ -1287,7 +1287,7 @@ init(IsDurable, IndexState, DeltaCount, DeltaBytes, Terms, count = DeltaCount1, end_seq_id = NextSeqId }) end, - Now = time_compat:monotonic_time(), + Now = erlang:monotonic_time(), IoBatchSize = rabbit_misc:get_env(rabbit, msg_store_io_batch_size, ?IO_BATCH_SIZE), @@ -2124,7 +2124,7 @@ publish_alpha(MsgStatus, State) -> {MsgStatus, stats({1, -1}, {MsgStatus, MsgStatus}, State)}. publish_beta(MsgStatus, State) -> - {MsgStatus1, State1} = maybe_write_to_disk(true, false, MsgStatus, State), + {MsgStatus1, State1} = maybe_prepare_write_to_disk(true, false, MsgStatus, State), MsgStatus2 = m(trim_msg_status(MsgStatus1)), {MsgStatus2, stats({1, -1}, {MsgStatus, MsgStatus2}, State1)}. @@ -2161,7 +2161,7 @@ delta_merge(SeqIds, Delta, MsgIds, State) -> {#msg_status { msg_id = MsgId } = MsgStatus, State1} = msg_from_pending_ack(SeqId, State0), {_MsgStatus, State2} = - maybe_write_to_disk(true, true, MsgStatus, State1), + maybe_prepare_write_to_disk(true, true, MsgStatus, State1), {expand_delta(SeqId, Delta0), [MsgId | MsgIds0], stats({1, -1}, {MsgStatus, none}, State2)} end, {Delta, MsgIds, State}, SeqIds). diff --git a/src/truncate.erl b/src/truncate.erl index 1c9b08ed27..a1586b0cb0 100644 --- a/src/truncate.erl +++ b/src/truncate.erl @@ -21,8 +21,10 @@ -record(params, {content, struct, content_dec, struct_dec}). -export([log_event/2, term/2]). -%% exported for testing --export([test/0]). + +-ifdef(TEST). +-export([term_size/3]). +-endif. log_event({Type, GL, {Pid, Format, Args}}, Params) when Type =:= error orelse @@ -123,72 +125,3 @@ tuple_term_size(_T, M, I, S, _W) when I > S -> M; tuple_term_size(T, M, I, S, W) -> tuple_term_size(T, lim(term_size(element(I, T), M, W), 2 * W), I + 1, S, W). - -%%---------------------------------------------------------------------------- - -test() -> - test_short_examples_exactly(), - test_term_limit(), - test_large_examples_for_size(), - ok. - -test_short_examples_exactly() -> - F = fun (Term, Exp) -> - Exp = term(Term, {1, {10, 10, 5, 5}}), - Term = term(Term, {100000, {10, 10, 5, 5}}) - end, - FSmall = fun (Term, Exp) -> - Exp = term(Term, {1, {2, 2, 2, 2}}), - Term = term(Term, {100000, {2, 2, 2, 2}}) - end, - F([], []), - F("h", "h"), - F("hello world", "hello w..."), - F([[h,e,l,l,o,' ',w,o,r,l,d]], [[h,e,l,l,o,'...']]), - F([a|b], [a|b]), - F(<<"hello">>, <<"hello">>), - F([<<"hello world">>], [<<"he...">>]), - F(<<1:1>>, <<1:1>>), - F(<<1:81>>, <<0:56, "...">>), - F({{{{a}}},{b},c,d,e,f,g,h,i,j,k}, {{{'...'}},{b},c,d,e,f,g,h,i,j,'...'}), - FSmall({a,30,40,40,40,40}, {a,30,'...'}), - FSmall([a,30,40,40,40,40], [a,30,'...']), - P = spawn(fun() -> receive die -> ok end end), - F([0, 0.0, <<1:1>>, F, P], [0, 0.0, <<1:1>>, F, P]), - P ! die, - R = make_ref(), - F([R], [R]), - ok. - -test_term_limit() -> - W = erlang:system_info(wordsize), - S = <<"abc">>, - 1 = term_size(S, 4, W), - limit_exceeded = term_size(S, 3, W), - case 100 - term_size([S, S], 100, W) of - 22 -> ok; %% 32 bit - 38 -> ok %% 64 bit - end, - case 100 - term_size([S, [S]], 100, W) of - 30 -> ok; %% ditto - 54 -> ok - end, - limit_exceeded = term_size([S, S], 6, W), - ok. - -test_large_examples_for_size() -> - %% Real world values - Shrink = fun(Term) -> term(Term, {1, {1000, 100, 50, 5}}) end, - TestSize = fun(Term) -> - true = 5000000 < size(term_to_binary(Term)), - true = 500000 > size(term_to_binary(Shrink(Term))) - end, - TestSize(lists:seq(1, 5000000)), - TestSize(recursive_list(1000, 10)), - TestSize(recursive_list(5000, 20)), - TestSize(gb_sets:from_list([I || I <- lists:seq(1, 1000000)])), - TestSize(gb_trees:from_orddict([{I, I} || I <- lists:seq(1, 1000000)])), - ok. - -recursive_list(S, 0) -> lists:seq(1, S); -recursive_list(S, N) -> [recursive_list(S div N, N-1) || _ <- lists:seq(1, S)]. |
