diff options
Diffstat (limited to 'deps/rabbit_common/src/rabbit_misc.erl')
-rw-r--r-- | deps/rabbit_common/src/rabbit_misc.erl | 1434 |
1 files changed, 1434 insertions, 0 deletions
diff --git a/deps/rabbit_common/src/rabbit_misc.erl b/deps/rabbit_common/src/rabbit_misc.erl new file mode 100644 index 0000000000..c5fd86dcbb --- /dev/null +++ b/deps/rabbit_common/src/rabbit_misc.erl @@ -0,0 +1,1434 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. +%% + +-module(rabbit_misc). + +-ignore_xref([{maps, get, 2}]). + +-include("rabbit.hrl"). +-include("rabbit_framing.hrl"). +-include("rabbit_misc.hrl"). + +-ifdef(TEST). +-export([decompose_pid/1, compose_pid/4]). +-endif. + +-export([method_record_type/1, polite_pause/0, polite_pause/1]). +-export([die/1, frame_error/2, amqp_error/4, quit/1, + protocol_error/3, protocol_error/4, protocol_error/1]). +-export([type_class/1, assert_args_equivalence/4, assert_field_equivalence/4]). +-export([dirty_read/1]). +-export([table_lookup/2, set_table_value/4, amqp_table/1, to_amqp_table/1]). +-export([r/3, r/2, r_arg/4, rs/1]). +-export([enable_cover/0, report_cover/0]). +-export([enable_cover/1, report_cover/1]). +-export([start_cover/1]). +-export([throw_on_error/2, with_exit_handler/2, is_abnormal_exit/1, + filter_exit_map/2]). +-export([with_user/2]). +-export([execute_mnesia_transaction/1]). +-export([execute_mnesia_transaction/2]). +-export([execute_mnesia_tx_with_tail/1]). +-export([ensure_ok/2]). +-export([tcp_name/3, format_inet_error/1]). +-export([upmap/2, map_in_order/2, utf8_safe/1]). +-export([table_filter/3]). +-export([dirty_read_all/1, dirty_foreach_key/2, dirty_dump_log/1]). +-export([format/2, format_many/1, format_stderr/2]). +-export([unfold/2, ceil/1, queue_fold/3]). +-export([sort_field_table/1]). +-export([atom_to_binary/1, parse_bool/1, parse_int/1]). +-export([pid_to_string/1, string_to_pid/1, + pid_change_node/2, node_to_fake_pid/1]). +-export([version_compare/2, version_compare/3]). +-export([version_minor_equivalent/2, strict_version_minor_equivalent/2]). +-export([dict_cons/3, orddict_cons/3, maps_cons/3, gb_trees_cons/3]). +-export([gb_trees_fold/3, gb_trees_foreach/2]). +-export([all_module_attributes/1, + rabbitmq_related_module_attributes/1, + module_attributes_from_apps/2, + build_acyclic_graph/3]). +-export([const/1]). +-export([ntoa/1, ntoab/1]). +-export([is_process_alive/1]). +-export([pget/2, pget/3, pupdate/3, pget_or_die/2, pmerge/3, pset/3, plmerge/2]). +-export([format_message_queue/2]). +-export([append_rpc_all_nodes/4, append_rpc_all_nodes/5]). +-export([os_cmd/1]). +-export([is_os_process_alive/1]). +-export([gb_sets_difference/2]). +-export([version/0, otp_release/0, platform_and_version/0, otp_system_version/0, + rabbitmq_and_erlang_versions/0, which_applications/0]). +-export([sequence_error/1]). +-export([check_expiry/1]). +-export([base64url/1]). +-export([interval_operation/5]). +-export([ensure_timer/4, stop_timer/2, send_after/3, cancel_timer/1]). +-export([get_parent/0]). +-export([store_proc_name/1, store_proc_name/2, get_proc_name/0]). +-export([moving_average/4]). +-export([escape_html_tags/1, b64decode_or_throw/1]). +-export([get_env/3]). +-export([get_channel_operation_timeout/0]). +-export([random/1]). +-export([rpc_call/4, rpc_call/5]). +-export([get_gc_info/1]). +-export([group_proplists_by/2]). + +%% Horrible macro to use in guards +-define(IS_BENIGN_EXIT(R), + R =:= noproc; R =:= noconnection; R =:= nodedown; R =:= normal; + R =:= shutdown). + +%%---------------------------------------------------------------------------- + +-export_type([resource_name/0, thunk/1, channel_or_connection_exit/0]). + +-type ok_or_error() :: rabbit_types:ok_or_error(any()). +-type thunk(T) :: fun(() -> T). +-type resource_name() :: binary(). +-type channel_or_connection_exit() + :: rabbit_types:channel_exit() | rabbit_types:connection_exit(). +-type digraph_label() :: term(). +-type graph_vertex_fun() :: + fun (({atom(), [term()]}) -> [{digraph:vertex(), digraph_label()}]). +-type graph_edge_fun() :: + fun (({atom(), [term()]}) -> [{digraph:vertex(), digraph:vertex()}]). +-type tref() :: {'erlang', reference()} | {timer, timer:tref()}. + +-spec method_record_type(rabbit_framing:amqp_method_record()) -> + rabbit_framing:amqp_method_name(). +-spec polite_pause() -> 'done'. +-spec polite_pause(non_neg_integer()) -> 'done'. +-spec die(rabbit_framing:amqp_exception()) -> channel_or_connection_exit(). + +-spec quit(integer()) -> no_return(). + +-spec frame_error(rabbit_framing:amqp_method_name(), binary()) -> + rabbit_types:connection_exit(). +-spec amqp_error + (rabbit_framing:amqp_exception(), string(), [any()], + rabbit_framing:amqp_method_name()) -> + rabbit_types:amqp_error(). +-spec protocol_error(rabbit_framing:amqp_exception(), string(), [any()]) -> + channel_or_connection_exit(). +-spec protocol_error + (rabbit_framing:amqp_exception(), string(), [any()], + rabbit_framing:amqp_method_name()) -> + channel_or_connection_exit(). +-spec protocol_error(rabbit_types:amqp_error()) -> + channel_or_connection_exit(). +-spec type_class(rabbit_framing:amqp_field_type()) -> atom(). +-spec assert_args_equivalence + (rabbit_framing:amqp_table(), rabbit_framing:amqp_table(), + rabbit_types:r(any()), [binary()]) -> + 'ok' | rabbit_types:connection_exit(). +-spec assert_field_equivalence + (any(), any(), rabbit_types:r(any()), atom() | binary()) -> + 'ok' | rabbit_types:connection_exit(). +-spec equivalence_fail + (any(), any(), rabbit_types:r(any()), atom() | binary()) -> + rabbit_types:connection_exit(). +-spec dirty_read({atom(), any()}) -> + rabbit_types:ok_or_error2(any(), 'not_found'). +-spec table_lookup(rabbit_framing:amqp_table(), binary()) -> + 'undefined' | {rabbit_framing:amqp_field_type(), any()}. +-spec set_table_value + (rabbit_framing:amqp_table(), binary(), rabbit_framing:amqp_field_type(), + rabbit_framing:amqp_value()) -> + rabbit_framing:amqp_table(). +-spec r(rabbit_types:vhost(), K) -> + rabbit_types:r3(rabbit_types:vhost(), K, '_') + when is_subtype(K, atom()). +-spec r(rabbit_types:vhost() | rabbit_types:r(atom()), K, resource_name()) -> + rabbit_types:r3(rabbit_types:vhost(), K, resource_name()) + when is_subtype(K, atom()). +-spec r_arg + (rabbit_types:vhost() | rabbit_types:r(atom()), K, + rabbit_framing:amqp_table(), binary()) -> + undefined | + rabbit_types:error( + {invalid_type, rabbit_framing:amqp_field_type()}) | + rabbit_types:r(K) when is_subtype(K, atom()). +-spec rs(rabbit_types:r(atom())) -> string(). +-spec enable_cover() -> ok_or_error(). +-spec start_cover([{string(), string()} | string()]) -> 'ok'. +-spec report_cover() -> 'ok'. +-spec enable_cover([file:filename() | atom()]) -> ok_or_error(). +-spec report_cover([file:filename() | atom()]) -> 'ok'. +-spec throw_on_error + (atom(), thunk(rabbit_types:error(any()) | {ok, A} | A)) -> A. +-spec with_exit_handler(thunk(A), thunk(A)) -> A. +-spec is_abnormal_exit(any()) -> boolean(). +-spec filter_exit_map(fun ((A) -> B), [A]) -> [B]. +-spec with_user(rabbit_types:username(), thunk(A)) -> A. +-spec execute_mnesia_transaction(thunk(A)) -> A. +-spec execute_mnesia_transaction(thunk(A), fun ((A, boolean()) -> B)) -> B. +-spec execute_mnesia_tx_with_tail + (thunk(fun ((boolean()) -> B))) -> B | (fun ((boolean()) -> B)). +-spec ensure_ok(ok_or_error(), atom()) -> 'ok'. +-spec tcp_name(atom(), inet:ip_address(), rabbit_net:ip_port()) -> + atom(). +-spec format_inet_error(atom()) -> string(). +-spec upmap(fun ((A) -> B), [A]) -> [B]. +-spec map_in_order(fun ((A) -> B), [A]) -> [B]. +-spec table_filter + (fun ((A) -> boolean()), fun ((A, boolean()) -> 'ok'), atom()) -> [A]. +-spec dirty_read_all(atom()) -> [any()]. +-spec dirty_foreach_key(fun ((any()) -> any()), atom()) -> + 'ok' | 'aborted'. +-spec dirty_dump_log(file:filename()) -> ok_or_error(). +-spec format(string(), [any()]) -> string(). +-spec format_many([{string(), [any()]}]) -> string(). +-spec format_stderr(string(), [any()]) -> 'ok'. +-spec unfold (fun ((A) -> ({'true', B, A} | 'false')), A) -> {[B], A}. +-spec ceil(number()) -> integer(). +-spec queue_fold(fun ((any(), B) -> B), B, queue:queue()) -> B. +-spec sort_field_table(rabbit_framing:amqp_table()) -> + rabbit_framing:amqp_table(). +-spec pid_to_string(pid()) -> string(). +-spec string_to_pid(string()) -> pid(). +-spec pid_change_node(pid(), node()) -> pid(). +-spec node_to_fake_pid(atom()) -> pid(). +-spec version_compare(string(), string()) -> 'lt' | 'eq' | 'gt'. +-spec version_compare + (rabbit_semver:version_string(), rabbit_semver:version_string(), + ('lt' | 'lte' | 'eq' | 'gte' | 'gt')) -> boolean(). +-spec version_minor_equivalent(rabbit_semver:version_string(), rabbit_semver:version_string()) -> boolean(). +-spec dict_cons(any(), any(), dict:dict()) -> dict:dict(). +-spec orddict_cons(any(), any(), orddict:orddict()) -> orddict:orddict(). +-spec gb_trees_cons(any(), any(), gb_trees:tree()) -> gb_trees:tree(). +-spec gb_trees_fold(fun ((any(), any(), A) -> A), A, gb_trees:tree()) -> A. +-spec gb_trees_foreach(fun ((any(), any()) -> any()), gb_trees:tree()) -> + 'ok'. +-spec all_module_attributes(atom()) -> [{atom(), atom(), [term()]}]. +-spec build_acyclic_graph + (graph_vertex_fun(), graph_edge_fun(), [{atom(), [term()]}]) -> + rabbit_types:ok_or_error2( + digraph:graph(), + {'vertex', 'duplicate', digraph:vertex()} | + {'edge', + ({bad_vertex, digraph:vertex()} | + {bad_edge, [digraph:vertex()]}), + digraph:vertex(), digraph:vertex()}). +-spec const(A) -> thunk(A). +-spec ntoa(inet:ip_address()) -> string(). +-spec ntoab(inet:ip_address()) -> string(). +-spec is_process_alive(pid()) -> boolean(). + +-spec pmerge(term(), term(), [term()]) -> [term()]. +-spec plmerge([term()], [term()]) -> [term()]. +-spec pset(term(), term(), [term()]) -> [term()]. +-spec format_message_queue(any(), priority_queue:q()) -> term(). +-spec os_cmd(string()) -> string(). +-spec is_os_process_alive(non_neg_integer()) -> boolean(). +-spec gb_sets_difference(gb_sets:set(), gb_sets:set()) -> gb_sets:set(). +-spec version() -> string(). +-spec otp_release() -> string(). +-spec otp_system_version() -> string(). +-spec platform_and_version() -> string(). +-spec rabbitmq_and_erlang_versions() -> {string(), string()}. +-spec which_applications() -> [{atom(), string(), string()}]. +-spec sequence_error([({'error', any()} | any())]) -> + {'error', any()} | any(). +-spec check_expiry(integer()) -> rabbit_types:ok_or_error(any()). +-spec base64url(binary()) -> string(). +-spec interval_operation + ({atom(), atom(), any()}, float(), non_neg_integer(), non_neg_integer(), + non_neg_integer()) -> + {any(), non_neg_integer()}. +-spec ensure_timer(A, non_neg_integer(), non_neg_integer(), any()) -> A. +-spec stop_timer(A, non_neg_integer()) -> A. +-spec send_after(non_neg_integer(), pid(), any()) -> tref(). +-spec cancel_timer(tref()) -> 'ok'. +-spec get_parent() -> pid(). +-spec store_proc_name(atom(), rabbit_types:proc_name()) -> ok. +-spec store_proc_name(rabbit_types:proc_type_and_name()) -> ok. +-spec get_proc_name() -> rabbit_types:proc_name(). +-spec moving_average(float(), float(), float(), float() | 'undefined') -> + float(). +-spec get_env(atom(), atom(), term()) -> term(). +-spec get_channel_operation_timeout() -> non_neg_integer(). +-spec random(non_neg_integer()) -> non_neg_integer(). +-spec get_gc_info(pid()) -> [any()]. +-spec group_proplists_by(fun((proplists:proplist()) -> any()), + list(proplists:proplist())) -> list(list(proplists:proplist())). + + +%%---------------------------------------------------------------------------- + +method_record_type(Record) -> + element(1, Record). + +polite_pause() -> + polite_pause(3000). + +polite_pause(N) -> + receive + after N -> done + end. + +die(Error) -> + protocol_error(Error, "~w", [Error]). + +frame_error(MethodName, BinaryFields) -> + protocol_error(frame_error, "cannot decode ~w", [BinaryFields], MethodName). + +amqp_error(Name, ExplanationFormat, Params, Method) -> + Explanation = format(ExplanationFormat, Params), + #amqp_error{name = Name, explanation = Explanation, method = Method}. + +protocol_error(Name, ExplanationFormat, Params) -> + protocol_error(Name, ExplanationFormat, Params, none). + +protocol_error(Name, ExplanationFormat, Params, Method) -> + protocol_error(amqp_error(Name, ExplanationFormat, Params, Method)). + +protocol_error(#amqp_error{} = Error) -> + exit(Error). + +type_class(byte) -> int; +type_class(short) -> int; +type_class(signedint) -> int; +type_class(long) -> int; +type_class(decimal) -> int; +type_class(unsignedbyte) -> int; +type_class(unsignedshort) -> int; +type_class(unsignedint) -> int; +type_class(float) -> float; +type_class(double) -> float; +type_class(Other) -> Other. + +assert_args_equivalence(Orig, New, Name, Keys) -> + [assert_args_equivalence1(Orig, New, Name, Key) || Key <- Keys], + ok. + +assert_args_equivalence1(Orig, New, Name, Key) -> + {Orig1, New1} = {table_lookup(Orig, Key), table_lookup(New, Key)}, + case {Orig1, New1} of + {Same, Same} -> + ok; + {{OrigType, OrigVal}, {NewType, NewVal}} -> + case type_class(OrigType) == type_class(NewType) andalso + OrigVal == NewVal of + true -> ok; + false -> assert_field_equivalence(OrigVal, NewVal, Name, Key) + end; + {OrigTypeVal, NewTypeVal} -> + assert_field_equivalence(OrigTypeVal, NewTypeVal, Name, Key) + end. + +%% Classic queues do not necessarily have an x-queue-type field associated with them +%% so we special-case that scenario here +%% +%% Fixes rabbitmq/rabbitmq-common#341 +%% +assert_field_equivalence(_Orig, _Orig, _Name, _Key) -> + ok; +assert_field_equivalence(undefined, {longstr, <<"classic">>}, _Name, <<"x-queue-type">>) -> + ok; +assert_field_equivalence({longstr, <<"classic">>}, undefined, _Name, <<"x-queue-type">>) -> + ok; +assert_field_equivalence(Orig, New, Name, Key) -> + equivalence_fail(Orig, New, Name, Key). + +equivalence_fail(Orig, New, Name, Key) -> + protocol_error(precondition_failed, "inequivalent arg '~s' " + "for ~s: received ~s but current is ~s", + [Key, rs(Name), val(New), val(Orig)]). + +val(undefined) -> + "none"; +val({Type, Value}) -> + ValFmt = case is_binary(Value) of + true -> "~s"; + false -> "~p" + end, + format("the value '" ++ ValFmt ++ "' of type '~s'", [Value, Type]); +val(Value) -> + format(case is_binary(Value) of + true -> "'~s'"; + false -> "'~p'" + end, [Value]). + +%% Normally we'd call mnesia:dirty_read/1 here, but that is quite +%% expensive due to general mnesia overheads (figuring out table types +%% and locations, etc). We get away with bypassing these because we +%% know that the tables we are looking at here +%% - are not the schema table +%% - have a local ram copy +%% - do not have any indices +dirty_read({Table, Key}) -> + case ets:lookup(Table, Key) of + [Result] -> {ok, Result}; + [] -> {error, not_found} + end. + +%% +%% Attribute Tables +%% + +table_lookup(Table, Key) -> + case lists:keysearch(Key, 1, Table) of + {value, {_, TypeBin, ValueBin}} -> {TypeBin, ValueBin}; + false -> undefined + end. + +set_table_value(Table, Key, Type, Value) -> + sort_field_table( + lists:keystore(Key, 1, Table, {Key, Type, Value})). + +to_amqp_table(M) when is_map(M) -> + lists:reverse(maps:fold(fun(K, V, Acc) -> [to_amqp_table_row(K, V)|Acc] end, + [], M)); +to_amqp_table(L) when is_list(L) -> + L. + +to_amqp_table_row(K, V) -> + {T, V2} = type_val(V), + {K, T, V2}. + +to_amqp_array(L) -> + [type_val(I) || I <- L]. + +type_val(M) when is_map(M) -> {table, to_amqp_table(M)}; +type_val(L) when is_list(L) -> {array, to_amqp_array(L)}; +type_val(X) when is_binary(X) -> {longstr, X}; +type_val(X) when is_integer(X) -> {long, X}; +type_val(X) when is_number(X) -> {double, X}; +type_val(true) -> {bool, true}; +type_val(false) -> {bool, false}; +type_val(null) -> throw({error, null_not_allowed}); +type_val(X) -> throw({error, {unhandled_type, X}}). + +amqp_table(unknown) -> unknown; +amqp_table(undefined) -> amqp_table([]); +amqp_table([]) -> #{}; +amqp_table(#{}) -> #{}; +amqp_table(Table) -> maps:from_list([{Name, amqp_value(Type, Value)} || + {Name, Type, Value} <- Table]). + +amqp_value(array, Vs) -> [amqp_value(T, V) || {T, V} <- Vs]; +amqp_value(table, V) -> amqp_table(V); +amqp_value(decimal, {Before, After}) -> + erlang:list_to_float( + lists:flatten(io_lib:format("~p.~p", [Before, After]))); +amqp_value(_Type, V) when is_binary(V) -> utf8_safe(V); +amqp_value(_Type, V) -> V. + + +%% +%% Resources +%% + +r(#resource{virtual_host = VHostPath}, Kind, Name) -> + #resource{virtual_host = VHostPath, kind = Kind, name = Name}; +r(VHostPath, Kind, Name) -> + #resource{virtual_host = VHostPath, kind = Kind, name = Name}. + +r(VHostPath, Kind) -> + #resource{virtual_host = VHostPath, kind = Kind, name = '_'}. + +r_arg(#resource{virtual_host = VHostPath}, Kind, Table, Key) -> + r_arg(VHostPath, Kind, Table, Key); +r_arg(VHostPath, Kind, Table, Key) -> + case table_lookup(Table, Key) of + {longstr, NameBin} -> r(VHostPath, Kind, NameBin); + undefined -> undefined; + {Type, _} -> {error, {invalid_type, Type}} + end. + +rs(#resource{virtual_host = VHostPath, kind = topic, name = Name}) -> + format("'~s' in vhost '~s'", [Name, VHostPath]); +rs(#resource{virtual_host = VHostPath, kind = Kind, name = Name}) -> + format("~s '~s' in vhost '~s'", [Kind, Name, VHostPath]). + +enable_cover() -> enable_cover(["."]). + +enable_cover(Dirs) -> + lists:foldl(fun (Dir, ok) -> + case cover:compile_beam_directory( + filename:join(lists:concat([Dir]),"ebin")) of + {error, _} = Err -> Err; + _ -> ok + end; + (_Dir, Err) -> + Err + end, ok, Dirs). + +start_cover(NodesS) -> + {ok, _} = cover:start([rabbit_nodes_common:make(N) || N <- NodesS]), + ok. + +report_cover() -> report_cover(["."]). + +report_cover(Dirs) -> [report_cover1(lists:concat([Dir])) || Dir <- Dirs], ok. + +report_cover1(Root) -> + Dir = filename:join(Root, "cover"), + ok = filelib:ensure_dir(filename:join(Dir, "junk")), + lists:foreach(fun (F) -> file:delete(F) end, + filelib:wildcard(filename:join(Dir, "*.html"))), + {ok, SummaryFile} = file:open(filename:join(Dir, "summary.txt"), [write]), + {CT, NCT} = + lists:foldl( + fun (M,{CovTot, NotCovTot}) -> + {ok, {M, {Cov, NotCov}}} = cover:analyze(M, module), + ok = report_coverage_percentage(SummaryFile, + Cov, NotCov, M), + {ok,_} = cover:analyze_to_file( + M, + filename:join(Dir, atom_to_list(M) ++ ".html"), + [html]), + {CovTot+Cov, NotCovTot+NotCov} + end, + {0, 0}, + lists:sort(cover:modules())), + ok = report_coverage_percentage(SummaryFile, CT, NCT, 'TOTAL'), + ok = file:close(SummaryFile), + ok. + +report_coverage_percentage(File, Cov, NotCov, Mod) -> + io:fwrite(File, "~6.2f ~p~n", + [if + Cov+NotCov > 0 -> 100.0*Cov/(Cov+NotCov); + true -> 100.0 + end, + Mod]). + +%% @doc Halts the emulator returning the given status code to the os. +%% On Windows this function will block indefinitely so as to give the io +%% subsystem time to flush stdout completely. +quit(Status) -> + case os:type() of + {unix, _} -> halt(Status); + {win32, _} -> init:stop(Status), + receive + after infinity -> ok + end + end. + +throw_on_error(E, Thunk) -> + case Thunk() of + {error, Reason} -> throw({E, Reason}); + {ok, Res} -> Res; + Res -> Res + end. + +with_exit_handler(Handler, Thunk) -> + try + Thunk() + catch + exit:{R, _} when ?IS_BENIGN_EXIT(R) -> Handler(); + exit:{{R, _}, _} when ?IS_BENIGN_EXIT(R) -> Handler() + end. + +is_abnormal_exit(R) when ?IS_BENIGN_EXIT(R) -> false; +is_abnormal_exit({R, _}) when ?IS_BENIGN_EXIT(R) -> false; +is_abnormal_exit(_) -> true. + +filter_exit_map(F, L) -> + Ref = make_ref(), + lists:filter(fun (R) -> R =/= Ref end, + [with_exit_handler( + fun () -> Ref end, + fun () -> F(I) end) || I <- L]). + + +with_user(Username, Thunk) -> + fun () -> + case mnesia:read({rabbit_user, Username}) of + [] -> + mnesia:abort({no_such_user, Username}); + [_U] -> + Thunk() + end + end. + +execute_mnesia_transaction(TxFun) -> + %% Making this a sync_transaction allows us to use dirty_read + %% elsewhere and get a consistent result even when that read + %% executes on a different node. + case worker_pool:submit( + fun () -> + case mnesia:is_transaction() of + false -> DiskLogBefore = mnesia_dumper:get_log_writes(), + Res = mnesia:sync_transaction(TxFun), + DiskLogAfter = mnesia_dumper:get_log_writes(), + case DiskLogAfter == DiskLogBefore of + true -> file_handle_cache_stats:update( + mnesia_ram_tx), + Res; + false -> file_handle_cache_stats:update( + mnesia_disk_tx), + {sync, Res} + end; + true -> mnesia:sync_transaction(TxFun) + end + end, single) of + {sync, {atomic, Result}} -> mnesia_sync:sync(), Result; + {sync, {aborted, Reason}} -> throw({error, Reason}); + {atomic, Result} -> Result; + {aborted, Reason} -> throw({error, Reason}) + end. + +%% Like execute_mnesia_transaction/1 with additional Pre- and Post- +%% commit function +execute_mnesia_transaction(TxFun, PrePostCommitFun) -> + case mnesia:is_transaction() of + true -> throw(unexpected_transaction); + false -> ok + end, + PrePostCommitFun(execute_mnesia_transaction( + fun () -> + Result = TxFun(), + PrePostCommitFun(Result, true), + Result + end), false). + +%% Like execute_mnesia_transaction/2, but TxFun is expected to return a +%% TailFun which gets called (only) immediately after the tx commit +execute_mnesia_tx_with_tail(TxFun) -> + case mnesia:is_transaction() of + true -> execute_mnesia_transaction(TxFun); + false -> TailFun = execute_mnesia_transaction(TxFun), + TailFun() + end. + +ensure_ok(ok, _) -> ok; +ensure_ok({error, Reason}, ErrorTag) -> throw({error, {ErrorTag, Reason}}). + +tcp_name(Prefix, IPAddress, Port) + when is_atom(Prefix) andalso is_number(Port) -> + list_to_atom( + format("~w_~s:~w", [Prefix, inet_parse:ntoa(IPAddress), Port])). + +format_inet_error(E) -> format("~w (~s)", [E, format_inet_error0(E)]). + +format_inet_error0(address) -> "cannot connect to host/port"; +format_inet_error0(timeout) -> "timed out"; +format_inet_error0(Error) -> inet:format_error(Error). + +%% base64:decode throws lots of weird errors. Catch and convert to one +%% that will cause a bad_request. +b64decode_or_throw(B64) -> + try + base64:decode(B64) + catch error:_ -> + throw({error, {not_base64, B64}}) + end. + +utf8_safe(V) -> + try + _ = xmerl_ucs:from_utf8(V), + V + catch exit:{ucs, _} -> + Enc = split_lines(base64:encode(V)), + <<"Not UTF-8, base64 is: ", Enc/binary>> + end. + +%% MIME enforces a limit on line length of base 64-encoded data to 76 characters. +split_lines(<<Text:76/binary, Rest/binary>>) -> + <<Text/binary, $\n, (split_lines(Rest))/binary>>; +split_lines(Text) -> + Text. + + +%% This is a modified version of Luke Gorrie's pmap - +%% https://lukego.livejournal.com/6753.html - that doesn't care about +%% the order in which results are received. +%% +%% WARNING: This is is deliberately lightweight rather than robust -- if F +%% throws, upmap will hang forever, so make sure F doesn't throw! +upmap(F, L) -> + Parent = self(), + Ref = make_ref(), + [receive {Ref, Result} -> Result end + || _ <- [spawn(fun () -> Parent ! {Ref, F(X)} end) || X <- L]]. + +map_in_order(F, L) -> + lists:reverse( + lists:foldl(fun (E, Acc) -> [F(E) | Acc] end, [], L)). + +%% Apply a pre-post-commit function to all entries in a table that +%% satisfy a predicate, and return those entries. +%% +%% We ignore entries that have been modified or removed. +table_filter(Pred, PrePostCommitFun, TableName) -> + lists:foldl( + fun (E, Acc) -> + case execute_mnesia_transaction( + fun () -> mnesia:match_object(TableName, E, read) =/= [] + andalso Pred(E) end, + fun (false, _Tx) -> false; + (true, Tx) -> PrePostCommitFun(E, Tx), true + end) of + false -> Acc; + true -> [E | Acc] + end + end, [], dirty_read_all(TableName)). + +dirty_read_all(TableName) -> + mnesia:dirty_select(TableName, [{'$1',[],['$1']}]). + +dirty_foreach_key(F, TableName) -> + dirty_foreach_key1(F, TableName, mnesia:dirty_first(TableName)). + +dirty_foreach_key1(_F, _TableName, '$end_of_table') -> + ok; +dirty_foreach_key1(F, TableName, K) -> + case catch mnesia:dirty_next(TableName, K) of + {'EXIT', _} -> + aborted; + NextKey -> + F(K), + dirty_foreach_key1(F, TableName, NextKey) + end. + +dirty_dump_log(FileName) -> + {ok, LH} = disk_log:open([{name, dirty_dump_log}, + {mode, read_only}, + {file, FileName}]), + dirty_dump_log1(LH, disk_log:chunk(LH, start)), + disk_log:close(LH). + +dirty_dump_log1(_LH, eof) -> + io:format("Done.~n"); +dirty_dump_log1(LH, {K, Terms}) -> + io:format("Chunk: ~p~n", [Terms]), + dirty_dump_log1(LH, disk_log:chunk(LH, K)); +dirty_dump_log1(LH, {K, Terms, BadBytes}) -> + io:format("Bad Chunk, ~p: ~p~n", [BadBytes, Terms]), + dirty_dump_log1(LH, disk_log:chunk(LH, K)). + +format(Fmt, Args) -> lists:flatten(io_lib:format(Fmt, Args)). + +format_many(List) -> + lists:flatten([io_lib:format(F ++ "~n", A) || {F, A} <- List]). + +format_stderr(Fmt, Args) -> + io:format(standard_error, Fmt, Args), + ok. + +unfold(Fun, Init) -> + unfold(Fun, [], Init). + +unfold(Fun, Acc, Init) -> + case Fun(Init) of + {true, E, I} -> unfold(Fun, [E|Acc], I); + false -> {Acc, Init} + end. + +ceil(N) -> + T = trunc(N), + case N == T of + true -> T; + false -> 1 + T + end. + +parse_bool(<<"true">>) -> true; +parse_bool(<<"false">>) -> false; +parse_bool(true) -> true; +parse_bool(false) -> false; +parse_bool(undefined) -> undefined; +parse_bool(V) -> throw({error, {not_boolean, V}}). + +parse_int(I) when is_integer(I) -> I; +parse_int(F) when is_number(F) -> trunc(F); +parse_int(S) -> try + list_to_integer(binary_to_list(S)) + catch error:badarg -> + throw({error, {not_integer, S}}) + end. + + +queue_fold(Fun, Init, Q) -> + case queue:out(Q) of + {empty, _Q} -> Init; + {{value, V}, Q1} -> queue_fold(Fun, Fun(V, Init), Q1) + end. + +%% Sorts a list of AMQP 0-9-1 table fields as per the AMQP 0-9-1 spec +sort_field_table([]) -> + []; +sort_field_table(M) when is_map(M) andalso map_size(M) =:= 0 -> + []; +sort_field_table(Arguments) when is_map(Arguments) -> + sort_field_table(maps:to_list(Arguments)); +sort_field_table(Arguments) -> + lists:keysort(1, Arguments). + +atom_to_binary(A) -> + list_to_binary(atom_to_list(A)). + +%% This provides a string representation of a pid that is the same +%% regardless of what node we are running on. The representation also +%% permits easy identification of the pid's node. +pid_to_string(Pid) when is_pid(Pid) -> + {Node, Cre, Id, Ser} = decompose_pid(Pid), + format("<~s.~B.~B.~B>", [Node, Cre, Id, Ser]). + +%% inverse of above +string_to_pid(Str) -> + Err = {error, {invalid_pid_syntax, Str}}, + %% The \ before the trailing $ is only there to keep emacs + %% font-lock from getting confused. + case re:run(Str, "^<(.*)\\.(\\d+)\\.(\\d+)\\.(\\d+)>\$", + [{capture,all_but_first,list}]) of + {match, [NodeStr, CreStr, IdStr, SerStr]} -> + [Cre, Id, Ser] = lists:map(fun list_to_integer/1, + [CreStr, IdStr, SerStr]), + compose_pid(list_to_atom(NodeStr), Cre, Id, Ser); + nomatch -> + throw(Err) + end. + +pid_change_node(Pid, NewNode) -> + {_OldNode, Cre, Id, Ser} = decompose_pid(Pid), + compose_pid(NewNode, Cre, Id, Ser). + +%% node(node_to_fake_pid(Node)) =:= Node. +node_to_fake_pid(Node) -> + compose_pid(Node, 0, 0, 0). + +decompose_pid(Pid) when is_pid(Pid) -> + %% see http://erlang.org/doc/apps/erts/erl_ext_dist.html (8.10 and + %% 8.7) + Node = node(Pid), + BinPid0 = term_to_binary(Pid), + case BinPid0 of + %% NEW_PID_EXT + <<131, 88, BinPid/bits>> -> + NodeByteSize = byte_size(BinPid0) - 14, + <<_NodePrefix:NodeByteSize/binary, Id:32, Ser:32, Cre:32>> = BinPid, + {Node, Cre, Id, Ser}; + %% PID_EXT + <<131, 103, BinPid/bits>> -> + NodeByteSize = byte_size(BinPid0) - 11, + <<_NodePrefix:NodeByteSize/binary, Id:32, Ser:32, Cre:8>> = BinPid, + {Node, Cre, Id, Ser} + end. + +compose_pid(Node, Cre, Id, Ser) -> + <<131,NodeEnc/binary>> = term_to_binary(Node), + binary_to_term(<<131,88,NodeEnc/binary,Id:32,Ser:32,Cre:32>>). + +version_compare(A, B, eq) -> rabbit_semver:eql(A, B); +version_compare(A, B, lt) -> rabbit_semver:lt(A, B); +version_compare(A, B, lte) -> rabbit_semver:lte(A, B); +version_compare(A, B, gt) -> rabbit_semver:gt(A, B); +version_compare(A, B, gte) -> rabbit_semver:gte(A, B). + +version_compare(A, B) -> + case version_compare(A, B, lt) of + true -> lt; + false -> case version_compare(A, B, gt) of + true -> gt; + false -> eq + end + end. + +%% For versions starting from 3.7.x: +%% Versions are considered compatible (except for special cases; see +%% below). The feature flags will determine if they are actually +%% compatible. +%% +%% For versions up-to 3.7.x: +%% a.b.c and a.b.d match, but a.b.c and a.d.e don't. If +%% versions do not match that pattern, just compare them. +%% +%% Special case for 3.6.6 because it introduced a change to the schema. +%% e.g. 3.6.6 is not compatible with 3.6.5 +%% This special case can be removed once 3.6.x reaches EOL +version_minor_equivalent(A, B) -> + {{MajA, MinA, PatchA, _}, _} = rabbit_semver:normalize(rabbit_semver:parse(A)), + {{MajB, MinB, PatchB, _}, _} = rabbit_semver:normalize(rabbit_semver:parse(B)), + + case {MajA, MinA, MajB, MinB} of + {3, 6, 3, 6} -> + if + PatchA >= 6 -> PatchB >= 6; + PatchA < 6 -> PatchB < 6; + true -> false + end; + _ + when (MajA < 3 orelse (MajA =:= 3 andalso MinA =< 6)) + orelse + (MajB < 3 orelse (MajB =:= 3 andalso MinB =< 6)) -> + MajA =:= MajB andalso MinA =:= MinB; + _ -> + %% Starting with RabbitMQ 3.7.x, we consider this + %% minor release series and all subsequent series to + %% be possibly compatible, based on just the version. + %% The real compatibility check is deferred to the + %% rabbit_feature_flags module in rabbitmq-server. + true + end. + +%% This is the same as above except that e.g. 3.7.x and 3.8.x are +%% considered incompatible (as if there were no feature flags). This is +%% useful to check plugin compatibility (`broker_versions_requirement` +%% field in plugins). + +strict_version_minor_equivalent(A, B) -> + {{MajA, MinA, PatchA, _}, _} = rabbit_semver:normalize(rabbit_semver:parse(A)), + {{MajB, MinB, PatchB, _}, _} = rabbit_semver:normalize(rabbit_semver:parse(B)), + + case {MajA, MinA, MajB, MinB} of + {3, 6, 3, 6} -> if + PatchA >= 6 -> PatchB >= 6; + PatchA < 6 -> PatchB < 6; + true -> false + end; + _ -> MajA =:= MajB andalso MinA =:= MinB + end. + +dict_cons(Key, Value, Dict) -> + dict:update(Key, fun (List) -> [Value | List] end, [Value], Dict). + +orddict_cons(Key, Value, Dict) -> + orddict:update(Key, fun (List) -> [Value | List] end, [Value], Dict). + +maps_cons(Key, Value, Map) -> + maps:update_with(Key, fun (List) -> [Value | List] end, [Value], Map). + +gb_trees_cons(Key, Value, Tree) -> + case gb_trees:lookup(Key, Tree) of + {value, Values} -> gb_trees:update(Key, [Value | Values], Tree); + none -> gb_trees:insert(Key, [Value], Tree) + end. + +gb_trees_fold(Fun, Acc, Tree) -> + gb_trees_fold1(Fun, Acc, gb_trees:next(gb_trees:iterator(Tree))). + +gb_trees_fold1(_Fun, Acc, none) -> + Acc; +gb_trees_fold1(Fun, Acc, {Key, Val, It}) -> + gb_trees_fold1(Fun, Fun(Key, Val, Acc), gb_trees:next(It)). + +gb_trees_foreach(Fun, Tree) -> + gb_trees_fold(fun (Key, Val, Acc) -> Fun(Key, Val), Acc end, ok, Tree). + +module_attributes(Module) -> + try + Module:module_info(attributes) + catch + _:undef -> + io:format("WARNING: module ~p not found, so not scanned for boot steps.~n", + [Module]), + [] + end. + +all_module_attributes(Name) -> + Apps = [App || {App, _, _} <- application:loaded_applications()], + module_attributes_from_apps(Name, Apps). + +rabbitmq_related_module_attributes(Name) -> + Apps = rabbitmq_related_apps(), + module_attributes_from_apps(Name, Apps). + +rabbitmq_related_apps() -> + [App + || {App, _, _} <- application:loaded_applications(), + %% Only select RabbitMQ-related applications. + App =:= rabbit_common orelse + App =:= rabbitmq_prelaunch orelse + App =:= rabbit orelse + lists:member( + rabbit, + element(2, application:get_key(App, applications)))]. + +module_attributes_from_apps(Name, Apps) -> + Targets = + lists:usort( + lists:append( + [[{App, Module} || Module <- Modules] || + App <- Apps, + {ok, Modules} <- [application:get_key(App, modules)]])), + lists:foldl( + fun ({App, Module}, Acc) -> + case lists:append([Atts || {N, Atts} <- module_attributes(Module), + N =:= Name]) of + [] -> Acc; + Atts -> [{App, Module, Atts} | Acc] + end + end, [], Targets). + +build_acyclic_graph(VertexFun, EdgeFun, Graph) -> + G = digraph:new([acyclic]), + try + _ = [case digraph:vertex(G, Vertex) of + false -> digraph:add_vertex(G, Vertex, Label); + _ -> ok = throw({graph_error, {vertex, duplicate, Vertex}}) + end || GraphElem <- Graph, + {Vertex, Label} <- VertexFun(GraphElem)], + [case digraph:add_edge(G, From, To) of + {error, E} -> throw({graph_error, {edge, E, From, To}}); + _ -> ok + end || GraphElem <- Graph, + {From, To} <- EdgeFun(GraphElem)], + {ok, G} + catch {graph_error, Reason} -> + true = digraph:delete(G), + {error, Reason} + end. + +const(X) -> fun () -> X end. + +%% Format IPv4-mapped IPv6 addresses as IPv4, since they're what we see +%% when IPv6 is enabled but not used (i.e. 99% of the time). +ntoa({0,0,0,0,0,16#ffff,AB,CD}) -> + inet_parse:ntoa({AB bsr 8, AB rem 256, CD bsr 8, CD rem 256}); +ntoa(IP) -> + inet_parse:ntoa(IP). + +ntoab(IP) -> + Str = ntoa(IP), + case string:str(Str, ":") of + 0 -> Str; + _ -> "[" ++ Str ++ "]" + end. + +%% We try to avoid reconnecting to down nodes here; this is used in a +%% loop in rabbit_amqqueue:on_node_down/1 and any delays we incur +%% would be bad news. +%% +%% See also rabbit_mnesia:is_process_alive/1 which also requires the +%% process be in the same running cluster as us (i.e. not partitioned +%% or some random node). +is_process_alive(Pid) when node(Pid) =:= node() -> + erlang:is_process_alive(Pid); +is_process_alive(Pid) -> + Node = node(Pid), + lists:member(Node, [node() | nodes(connected)]) andalso + rpc:call(Node, erlang, is_process_alive, [Pid]) =:= true. + +-spec pget(term(), list() | map()) -> term(). +pget(K, M) when is_map(M) -> + case maps:find(K, M) of + {ok, V} -> + V; + _ -> + undefined + end; + +pget(K, P) -> + case lists:keyfind(K, 1, P) of + {K, V} -> + V; + _ -> + undefined + end. + +-spec pget(term(), list() | map(), term()) -> term(). +pget(K, M, D) when is_map(M) -> + case maps:find(K, M) of + {ok, V} -> + V; + _ -> + D + end; + +pget(K, P, D) -> + case lists:keyfind(K, 1, P) of + {K, V} -> + V; + _ -> + D + end. + +-spec pget_or_die(term(), list() | map()) -> term() | no_return(). +pget_or_die(K, M) when is_map(M) -> + case maps:find(K, M) of + error -> exit({error, key_missing, K}); + {ok, V} -> V + end; + +pget_or_die(K, P) -> + case proplists:get_value(K, P) of + undefined -> exit({error, key_missing, K}); + V -> V + end. + +pupdate(K, UpdateFun, P) -> + case lists:keyfind(K, 1, P) of + {K, V} -> + pset(K, UpdateFun(V), P); + _ -> + undefined + end. + +%% property merge +pmerge(Key, Val, List) -> + case proplists:is_defined(Key, List) of + true -> List; + _ -> [{Key, Val} | List] + end. + +%% proplists merge +plmerge(P1, P2) -> + %% Value from P1 suppresses value from P2 + maps:to_list(maps:merge(maps:from_list(P2), + maps:from_list(P1))). + +%% groups a list of proplists by a key function +group_proplists_by(KeyFun, ListOfPropLists) -> + Res = lists:foldl(fun(P, Agg) -> + Key = KeyFun(P), + Val = case maps:find(Key, Agg) of + {ok, O} -> [P|O]; + error -> [P] + end, + maps:put(Key, Val, Agg) + end, #{}, ListOfPropLists), + [ X || {_, X} <- maps:to_list(Res)]. + +pset(Key, Value, List) -> [{Key, Value} | proplists:delete(Key, List)]. + +format_message_queue(_Opt, MQ) -> + Len = priority_queue:len(MQ), + {Len, + case Len > 100 of + false -> priority_queue:to_list(MQ); + true -> {summary, + maps:to_list( + lists:foldl( + fun ({P, V}, Counts) -> + maps:update_with( + {P, format_message_queue_entry(V)}, + fun(Old) -> Old + 1 end, 1, Counts) + end, maps:new(), priority_queue:to_list(MQ)))} + end}. + +format_message_queue_entry(V) when is_atom(V) -> + V; +format_message_queue_entry(V) when is_tuple(V) -> + list_to_tuple([format_message_queue_entry(E) || E <- tuple_to_list(V)]); +format_message_queue_entry(_V) -> + '_'. + +%% Same as rpc:multicall/4 but concatenates all results. +%% M, F, A is expected to return a list. If it does not, +%% its return value will be wrapped in a list. +-spec append_rpc_all_nodes([node()], atom(), atom(), [any()]) -> [any()]. +append_rpc_all_nodes(Nodes, M, F, A) -> + do_append_rpc_all_nodes(Nodes, M, F, A, ?RPC_INFINITE_TIMEOUT). + +-spec append_rpc_all_nodes([node()], atom(), atom(), [any()], timeout()) -> [any()]. +append_rpc_all_nodes(Nodes, M, F, A, Timeout) -> + do_append_rpc_all_nodes(Nodes, M, F, A, Timeout). + +do_append_rpc_all_nodes(Nodes, M, F, A, ?RPC_INFINITE_TIMEOUT) -> + {ResL, _} = rpc:multicall(Nodes, M, F, A, ?RPC_INFINITE_TIMEOUT), + process_rpc_multicall_result(ResL); +do_append_rpc_all_nodes(Nodes, M, F, A, Timeout) -> + {ResL, _} = try + rpc:multicall(Nodes, M, F, A, Timeout) + catch + error:internal_error -> {[], Nodes} + end, + process_rpc_multicall_result(ResL). + +process_rpc_multicall_result(ResL) -> + lists:append([case Res of + {badrpc, _} -> []; + Xs when is_list(Xs) -> Xs; + %% wrap it in a list + Other -> [Other] + end || Res <- ResL]). + +os_cmd(Command) -> + case os:type() of + {win32, _} -> + %% Clink workaround; see + %% https://code.google.com/p/clink/issues/detail?id=141 + os:cmd(" " ++ Command); + _ -> + %% Don't just return "/bin/sh: <cmd>: not found" if not found + Exec = hd(string:tokens(Command, " ")), + case os:find_executable(Exec) of + false -> throw({command_not_found, Exec}); + _ -> os:cmd(Command) + end + end. + +is_os_process_alive(Pid) -> + with_os([{unix, fun () -> + run_ps(Pid) =:= 0 + end}, + {win32, fun () -> + PidS = rabbit_data_coercion:to_list(Pid), + case os:find_executable("tasklist.exe") of + false -> + Cmd = + format( + "PowerShell -Command " + "\"(Get-Process -Id ~s).ProcessName\"", + [PidS]), + Res = + os_cmd(Cmd ++ " 2>&1") -- [$\r, $\n], + case Res of + "erl" -> true; + "werl" -> true; + _ -> false + end; + _ -> + Cmd = + "tasklist /nh /fi " + "\"pid eq " ++ PidS ++ "\"", + Res = os_cmd(Cmd ++ " 2>&1"), + match =:= re:run(Res, + "erl\\.exe", + [{capture, none}]) + end + end}]). + +with_os(Handlers) -> + {OsFamily, _} = os:type(), + case proplists:get_value(OsFamily, Handlers) of + undefined -> throw({unsupported_os, OsFamily}); + Handler -> Handler() + end. + +run_ps(Pid) -> + Cmd = "ps -p " ++ rabbit_data_coercion:to_list(Pid), + Port = erlang:open_port({spawn, Cmd}, + [exit_status, {line, 16384}, + use_stdio, stderr_to_stdout]), + exit_loop(Port). + +exit_loop(Port) -> + receive + {Port, {exit_status, Rc}} -> Rc; + {Port, _} -> exit_loop(Port) + end. + +gb_sets_difference(S1, S2) -> + gb_sets:fold(fun gb_sets:delete_any/2, S1, S2). + +version() -> + {ok, VSN} = application:get_key(rabbit, vsn), + VSN. + +%% See https://www.erlang.org/doc/system_principles/versions.html +otp_release() -> + File = filename:join([code:root_dir(), "releases", + erlang:system_info(otp_release), "OTP_VERSION"]), + case file:read_file(File) of + {ok, VerBin} -> + %% 17.0 or later, we need the file for the minor version + string:strip(binary_to_list(VerBin), both, $\n); + {error, _} -> + %% R16B03 or earlier (no file, otp_release is correct) + %% or we couldn't read the file (so this is best we can do) + erlang:system_info(otp_release) + end. + +platform_and_version() -> + string:join(["Erlang/OTP", otp_release()], " "). + +otp_system_version() -> + string:strip(erlang:system_info(system_version), both, $\n). + +rabbitmq_and_erlang_versions() -> + {version(), otp_release()}. + +%% application:which_applications(infinity) is dangerous, since it can +%% cause deadlocks on shutdown. So we have to use a timeout variant, +%% but w/o creating spurious timeout errors. The timeout value is twice +%% that of gen_server:call/2. +which_applications() -> + try + application:which_applications(10000) + catch + exit:{timeout, _} -> [] + end. + +sequence_error([T]) -> T; +sequence_error([{error, _} = Error | _]) -> Error; +sequence_error([_ | Rest]) -> sequence_error(Rest). + +check_expiry(N) when N < 0 -> {error, {value_negative, N}}; +check_expiry(_N) -> ok. + +base64url(In) -> + lists:reverse(lists:foldl(fun ($\+, Acc) -> [$\- | Acc]; + ($\/, Acc) -> [$\_ | Acc]; + ($\=, Acc) -> Acc; + (Chr, Acc) -> [Chr | Acc] + end, [], base64:encode_to_string(In))). + +%% Ideally, you'd want Fun to run every IdealInterval. but you don't +%% want it to take more than MaxRatio of IdealInterval. So if it takes +%% more then you want to run it less often. So we time how long it +%% takes to run, and then suggest how long you should wait before +%% running it again with a user specified max interval. Times are in millis. +interval_operation({M, F, A}, MaxRatio, MaxInterval, IdealInterval, LastInterval) -> + {Micros, Res} = timer:tc(M, F, A), + {Res, case {Micros > 1000 * (MaxRatio * IdealInterval), + Micros > 1000 * (MaxRatio * LastInterval)} of + {true, true} -> lists:min([MaxInterval, + round(LastInterval * 1.5)]); + {true, false} -> LastInterval; + {false, false} -> lists:max([IdealInterval, + round(LastInterval / 1.5)]) + end}. + +ensure_timer(State, Idx, After, Msg) -> + case element(Idx, State) of + undefined -> TRef = send_after(After, self(), Msg), + setelement(Idx, State, TRef); + _ -> State + end. + +stop_timer(State, Idx) -> + case element(Idx, State) of + undefined -> State; + TRef -> cancel_timer(TRef), + setelement(Idx, State, undefined) + end. + +%% timer:send_after/3 goes through a single timer process but allows +%% long delays. erlang:send_after/3 does not have a bottleneck but +%% only allows max 2^32-1 millis. +-define(MAX_ERLANG_SEND_AFTER, 4294967295). +send_after(Millis, Pid, Msg) when Millis > ?MAX_ERLANG_SEND_AFTER -> + {ok, Ref} = timer:send_after(Millis, Pid, Msg), + {timer, Ref}; +send_after(Millis, Pid, Msg) -> + {erlang, erlang:send_after(Millis, Pid, Msg)}. + +cancel_timer({erlang, Ref}) -> _ = erlang:cancel_timer(Ref), + ok; +cancel_timer({timer, Ref}) -> {ok, cancel} = timer:cancel(Ref), + ok. + +store_proc_name(Type, ProcName) -> store_proc_name({Type, ProcName}). +store_proc_name(TypeProcName) -> put(process_name, TypeProcName). + +get_proc_name() -> + case get(process_name) of + undefined -> + undefined; + {_Type, Name} -> + {ok, Name} + end. + +%% application:get_env/3 is only available in R16B01 or later. +get_env(Application, Key, Def) -> + case application:get_env(Application, Key) of + {ok, Val} -> Val; + undefined -> Def + end. + +get_channel_operation_timeout() -> + %% Default channel_operation_timeout set to net_ticktime + 10s to + %% give allowance for any down messages to be received first, + %% whenever it is used for cross-node calls with timeouts. + Default = (net_kernel:get_net_ticktime() + 10) * 1000, + application:get_env(rabbit, channel_operation_timeout, Default). + +moving_average(_Time, _HalfLife, Next, undefined) -> + Next; +%% We want the Weight to decrease as Time goes up (since Weight is the +%% weight for the current sample, not the new one), so that the moving +%% average decays at the same speed regardless of how long the time is +%% between samplings. So we want Weight = math:exp(Something), where +%% Something turns out to be negative. +%% +%% We want to determine Something here in terms of the Time taken +%% since the last measurement, and a HalfLife. So we want Weight = +%% math:exp(Time * Constant / HalfLife). What should Constant be? We +%% want Weight to be 0.5 when Time = HalfLife. +%% +%% Plug those numbers in and you get 0.5 = math:exp(Constant). Take +%% the log of each side and you get math:log(0.5) = Constant. +moving_average(Time, HalfLife, Next, Current) -> + Weight = math:exp(Time * math:log(0.5) / HalfLife), + Next * (1 - Weight) + Current * Weight. + +random(N) -> + rand:uniform(N). + +-spec escape_html_tags(string()) -> binary(). + +escape_html_tags(S) -> + escape_html_tags(rabbit_data_coercion:to_list(S), []). + + +-spec escape_html_tags(string(), string()) -> binary(). + +escape_html_tags([], Acc) -> + rabbit_data_coercion:to_binary(lists:reverse(Acc)); +escape_html_tags("<" ++ Rest, Acc) -> + escape_html_tags(Rest, lists:reverse("<", Acc)); +escape_html_tags(">" ++ Rest, Acc) -> + escape_html_tags(Rest, lists:reverse(">", Acc)); +escape_html_tags("&" ++ Rest, Acc) -> + escape_html_tags(Rest, lists:reverse("&", Acc)); +escape_html_tags([C | Rest], Acc) -> + escape_html_tags(Rest, [C | Acc]). + +%% If the server we are talking to has non-standard net_ticktime, and +%% our connection lasts a while, we could get disconnected because of +%% a timeout unless we set our ticktime to be the same. So let's do +%% that. +%% TODO: do not use an infinite timeout! +-spec rpc_call(node(), atom(), atom(), [any()]) -> any() | {badrpc, term()}. +rpc_call(Node, Mod, Fun, Args) -> + rpc_call(Node, Mod, Fun, Args, ?RPC_INFINITE_TIMEOUT). + +-spec rpc_call(node(), atom(), atom(), [any()], infinity | non_neg_integer()) -> any() | {badrpc, term()}. +rpc_call(Node, Mod, Fun, Args, Timeout) -> + case rpc:call(Node, net_kernel, get_net_ticktime, [], Timeout) of + {badrpc, _} = E -> E; + ignored -> + rpc:call(Node, Mod, Fun, Args, Timeout); + {ongoing_change_to, NewValue} -> + _ = net_kernel:set_net_ticktime(NewValue, 0), + rpc:call(Node, Mod, Fun, Args, Timeout); + Time -> + _ = net_kernel:set_net_ticktime(Time, 0), + rpc:call(Node, Mod, Fun, Args, Timeout) + end. + +get_gc_info(Pid) -> + rabbit_runtime:get_gc_info(Pid). + +%% ------------------------------------------------------------------------- +%% Begin copypasta from gen_server2.erl + +get_parent() -> + case get('$ancestors') of + [Parent | _] when is_pid (Parent) -> Parent; + [Parent | _] when is_atom(Parent) -> name_to_pid(Parent); + _ -> exit(process_was_not_started_by_proc_lib) + end. + +name_to_pid(Name) -> + case whereis(Name) of + undefined -> case whereis_name(Name) of + undefined -> exit(could_not_find_registered_name); + Pid -> Pid + end; + Pid -> Pid + end. + +whereis_name(Name) -> + case ets:lookup(global_names, Name) of + [{_Name, Pid, _Method, _RPid, _Ref}] -> + if node(Pid) == node() -> case erlang:is_process_alive(Pid) of + true -> Pid; + false -> undefined + end; + true -> Pid + end; + [] -> undefined + end. + +%% End copypasta from gen_server2.erl +%% ------------------------------------------------------------------------- |