diff options
| -rw-r--r-- | src/credit_flow.erl | 15 | ||||
| -rw-r--r-- | src/file_handle_cache.erl | 38 | ||||
| -rw-r--r-- | src/file_handle_cache_stats.erl | 7 | ||||
| -rw-r--r-- | src/gen_server2.erl | 15 | ||||
| -rw-r--r-- | src/gm.erl | 5 | ||||
| -rw-r--r-- | src/pg2_fixed.erl | 4 | ||||
| -rw-r--r-- | src/rabbit.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 16 | ||||
| -rw-r--r-- | src/rabbit_auth_backend_internal.erl | 5 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 5 | ||||
| -rw-r--r-- | src/rabbit_dead_letter.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_error_logger.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_event.erl | 5 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_mode_exactly.erl | 5 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_sync.erl | 8 | ||||
| -rw-r--r-- | src/rabbit_misc.erl | 12 | ||||
| -rw-r--r-- | src/rabbit_node_monitor.erl | 26 | ||||
| -rw-r--r-- | src/rabbit_queue_consumers.erl | 14 | ||||
| -rw-r--r-- | src/rabbit_queue_location_random.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_reader.erl | 10 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 7 | ||||
| -rw-r--r-- | src/supervisor2.erl | 17 | ||||
| -rw-r--r-- | src/time_compat.erl | 305 | ||||
| -rw-r--r-- | test/src/gm_soak_test.erl | 13 | ||||
| -rw-r--r-- | test/src/gm_speed_test.erl | 8 |
25 files changed, 440 insertions, 112 deletions
diff --git a/src/credit_flow.erl b/src/credit_flow.erl index 345055a361..d2f2355f03 100644 --- a/src/credit_flow.erl +++ b/src/credit_flow.erl @@ -92,11 +92,15 @@ {process_info, erlang:process_info(SELF)}, {from, FROM}, {from_info, erlang:process_info(FROM)}, - {timestamp, os:timestamp()}])). + {timestamp, + time_compat:os_system_time( + milliseconds)}])). -define(TRACE_UNBLOCKED(SELF, FROM), rabbit_event:notify(credit_flow_unblocked, [{process, SELF}, {from, FROM}, - {timestamp, os:timestamp()}])). + {timestamp, + time_compat:os_system_time( + milliseconds)}])). -else. -define(TRACE_BLOCKED(SELF, FROM), ok). -define(TRACE_UNBLOCKED(SELF, FROM), ok). @@ -150,7 +154,10 @@ state() -> case blocked() of true -> flow; false -> case get(credit_blocked_at) of undefined -> running; - B -> Diff = timer:now_diff(os:timestamp(), B), + B -> Now = time_compat:monotonic_time(), + Diff = time_compat:convert_time_unit(Now - B, + native, + micro_seconds), case Diff < ?STATE_CHANGE_INTERVAL of true -> flow; false -> running @@ -179,7 +186,7 @@ grant(To, Quantity) -> block(From) -> ?TRACE_BLOCKED(self(), From), case blocked() of - false -> put(credit_blocked_at, os:timestamp()); + false -> put(credit_blocked_at, time_compat:monotonic_time()); true -> ok end, ?UPDATE(credit_blocked, [], Blocks, [From | Blocks]). diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl index d7e5abc873..1e97632cf9 100644 --- a/src/file_handle_cache.erl +++ b/src/file_handle_cache.erl @@ -175,6 +175,7 @@ -record(handle, { hdl, + ref, offset, is_dirty, write_buffer_size, @@ -536,12 +537,15 @@ clear(Ref) -> end). set_maximum_since_use(MaximumAge) -> - Now = now(), + Now = time_compat:monotonic_time(), case lists:foldl( fun ({{Ref, fhc_handle}, Handle = #handle { hdl = Hdl, last_used_at = Then }}, Rep) -> case Hdl =/= closed andalso - timer:now_diff(Now, Then) >= MaximumAge of + time_compat:convert_time_unit(Now - Then, + native, + micro_seconds) + >= MaximumAge of true -> soft_close(Ref, Handle) orelse Rep; false -> Rep end; @@ -708,7 +712,8 @@ get_or_reopen(RefNewOrReopens) -> {OpenHdls, []} -> {ok, [Handle || {_Ref, Handle} <- OpenHdls]}; {OpenHdls, ClosedHdls} -> - Oldest = oldest(get_age_tree(), fun () -> now() end), + Oldest = oldest(get_age_tree(), + fun () -> time_compat:monotonic_time() end), case gen_server2:call(?SERVER, {open, self(), length(ClosedHdls), Oldest}, infinity) of ok -> @@ -744,14 +749,14 @@ reopen([{Ref, NewOrReopen, Handle = #handle { hdl = closed, end, case prim_file:open(Path, Mode) of {ok, Hdl} -> - Now = now(), + Now = time_compat:monotonic_time(), {{ok, _Offset}, Handle1} = maybe_seek(Offset, reset_read_buffer( Handle#handle{hdl = Hdl, offset = 0, last_used_at = Now})), put({Ref, fhc_handle}, Handle1), - reopen(RefNewOrReopenHdls, gb_trees:insert(Now, Ref, Tree), + reopen(RefNewOrReopenHdls, gb_trees:insert({Now, Ref}, true, Tree), [{Ref, Handle1} | RefHdls]); Error -> %% NB: none of the handles in ToOpen are in the age tree @@ -780,7 +785,7 @@ sort_handles([{Ref, _} | RefHdls], RefHdlsA, [{Ref, Handle} | RefHdlsB], Acc) -> sort_handles(RefHdls, RefHdlsA, RefHdlsB, [Handle | Acc]). put_handle(Ref, Handle = #handle { last_used_at = Then }) -> - Now = now(), + Now = time_compat:monotonic_time(), age_tree_update(Then, Now, Ref), put({Ref, fhc_handle}, Handle #handle { last_used_at = Now }). @@ -797,13 +802,14 @@ put_age_tree(Tree) -> put(fhc_age_tree, Tree). age_tree_update(Then, Now, Ref) -> with_age_tree( fun (Tree) -> - gb_trees:insert(Now, Ref, gb_trees:delete_any(Then, Tree)) + gb_trees:insert({Now, Ref}, true, + gb_trees:delete_any({Then, Ref}, Tree)) end). -age_tree_delete(Then) -> +age_tree_delete(Then, Ref) -> with_age_tree( fun (Tree) -> - Tree1 = gb_trees:delete_any(Then, Tree), + Tree1 = gb_trees:delete_any({Then, Ref}, Tree), Oldest = oldest(Tree1, fun () -> undefined end), gen_server2:cast(?SERVER, {close, self(), Oldest}), Tree1 @@ -814,7 +820,7 @@ age_tree_change() -> fun (Tree) -> case gb_trees:is_empty(Tree) of true -> Tree; - false -> {Oldest, _Ref} = gb_trees:smallest(Tree), + false -> {{Oldest, _Ref}, _} = gb_trees:smallest(Tree), gen_server2:cast(?SERVER, {update, self(), Oldest}) end, Tree @@ -823,7 +829,7 @@ age_tree_change() -> oldest(Tree, DefaultFun) -> case gb_trees:is_empty(Tree) of true -> DefaultFun(); - false -> {Oldest, _Ref} = gb_trees:smallest(Tree), + false -> {{Oldest, _Ref}, _} = gb_trees:smallest(Tree), Oldest end. @@ -849,6 +855,7 @@ new_closed_handle(Path, Mode, Options) -> end, Ref = make_ref(), put({Ref, fhc_handle}, #handle { hdl = closed, + ref = Ref, offset = 0, is_dirty = false, write_buffer_size = 0, @@ -883,6 +890,7 @@ soft_close(Handle = #handle { hdl = closed }) -> soft_close(Handle) -> case write_buffer(Handle) of {ok, #handle { hdl = Hdl, + ref = Ref, is_dirty = IsDirty, last_used_at = Then } = Handle1 } -> ok = case IsDirty of @@ -890,7 +898,7 @@ soft_close(Handle) -> false -> ok end, ok = prim_file:close(Hdl), - age_tree_delete(Then), + age_tree_delete(Then, Ref), {ok, Handle1 #handle { hdl = closed, is_dirty = false, last_used_at = undefined }}; @@ -1419,17 +1427,19 @@ reduce(State = #fhc_state { open_pending = OpenPending, elders = Elders, clients = Clients, timer_ref = TRef }) -> - Now = now(), + Now = time_compat:monotonic_time(), {CStates, Sum, ClientCount} = ets:foldl(fun ({Pid, Eldest}, {CStatesAcc, SumAcc, CountAcc} = Accs) -> [#cstate { pending_closes = PendingCloses, opened = Opened, blocked = Blocked } = CState] = ets:lookup(Clients, Pid), + TimeDiff = time_compat:convert_time_unit( + Now - Eldest, native, micro_seconds), case Blocked orelse PendingCloses =:= Opened of true -> Accs; false -> {[CState | CStatesAcc], - SumAcc + timer:now_diff(Now, Eldest), + SumAcc + TimeDiff, CountAcc + 1} end end, {[], 0, 0}, Elders), diff --git a/src/file_handle_cache_stats.erl b/src/file_handle_cache_stats.erl index 5f6926b5d2..5dcf19c5f1 100644 --- a/src/file_handle_cache_stats.erl +++ b/src/file_handle_cache_stats.erl @@ -61,7 +61,8 @@ get() -> %% TODO timer:tc/1 was introduced in R14B03; use that function once we %% require that version. timer_tc(Thunk) -> - T1 = os:timestamp(), + T1 = time_compat:monotonic_time(), Res = Thunk(), - T2 = os:timestamp(), - {timer:now_diff(T2, T1), Res}. + T2 = time_compat:monotonic_time(), + Diff = time_compat:convert_time_unit(T2 - T1, native, micro_seconds), + {Diff, Res}. diff --git a/src/gen_server2.erl b/src/gen_server2.erl index fd0e6553b5..56b2720cac 100644 --- a/src/gen_server2.erl +++ b/src/gen_server2.erl @@ -624,7 +624,10 @@ unregister_name(_Name) -> ok. extend_backoff(undefined) -> undefined; extend_backoff({backoff, InitialTimeout, MinimumTimeout, DesiredHibPeriod}) -> - {backoff, InitialTimeout, MinimumTimeout, DesiredHibPeriod, now()}. + {backoff, InitialTimeout, MinimumTimeout, DesiredHibPeriod, + {erlang:phash2([node()]), + time_compat:monotonic_time(), + time_compat:unique_integer()}}. %%%======================================================================== %%% Internal functions @@ -688,7 +691,9 @@ wake_hib(GS2State = #gs2_state { timeout_state = TS }) -> undefined -> undefined; {SleptAt, TimeoutState} -> - adjust_timeout_state(SleptAt, now(), TimeoutState) + adjust_timeout_state(SleptAt, + time_compat:monotonic_time(), + TimeoutState) end, post_hibernate( drain(GS2State #gs2_state { timeout_state = TimeoutState1 })). @@ -696,7 +701,8 @@ wake_hib(GS2State = #gs2_state { timeout_state = TS }) -> hibernate(GS2State = #gs2_state { timeout_state = TimeoutState }) -> TS = case TimeoutState of undefined -> undefined; - {backoff, _, _, _, _} -> {now(), TimeoutState} + {backoff, _, _, _, _} -> {time_compat:monotonic_time(), + TimeoutState} end, proc_lib:hibernate(?MODULE, wake_hib, [GS2State #gs2_state { timeout_state = TS }]). @@ -741,7 +747,8 @@ post_hibernate(GS2State = #gs2_state { state = State, adjust_timeout_state(SleptAt, AwokeAt, {backoff, CurrentTO, MinimumTO, DesiredHibPeriod, RandomState}) -> - NapLengthMicros = timer:now_diff(AwokeAt, SleptAt), + NapLengthMicros = time_compat:convert_time_unit(AwokeAt - SleptAt, + native, micro_seconds), CurrentMicros = CurrentTO * 1000, MinimumMicros = MinimumTO * 1000, DesiredHibMicros = DesiredHibPeriod * 1000, diff --git a/src/gm.erl b/src/gm.erl index dbf9c295f9..3a9f4405be 100644 --- a/src/gm.erl +++ b/src/gm.erl @@ -551,8 +551,9 @@ forget_group(GroupName) -> init([GroupName, Module, Args, TxnFun]) -> put(process_name, {?MODULE, GroupName}), - {MegaSecs, Secs, MicroSecs} = now(), - random:seed(MegaSecs, Secs, MicroSecs), + random:seed(erlang:phash2([node()]), + time_compat:monotonic_time(), + time_compat:unique_integer()), Self = make_member(GroupName), gen_server2:cast(self(), join), {ok, #state { self = Self, diff --git a/src/pg2_fixed.erl b/src/pg2_fixed.erl index 8926b83b77..d6f756ce68 100644 --- a/src/pg2_fixed.erl +++ b/src/pg2_fixed.erl @@ -146,14 +146,14 @@ get_closest_pid(Name) -> [Pid] -> Pid; [] -> - {_,_,X} = erlang:now(), case get_members(Name) of [] -> {error, {no_process, Name}}; Members -> + X = time_compat:erlang_system_time(micro_seconds), lists:nth((X rem length(Members))+1, Members) end; Members when is_list(Members) -> - {_,_,X} = erlang:now(), + X = time_compat:erlang_system_time(micro_seconds), lists:nth((X rem length(Members))+1, Members); Else -> Else diff --git a/src/rabbit.erl b/src/rabbit.erl index d218a07a98..5152c11eeb 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -277,7 +277,7 @@ hipe_compile() -> Count = length(HipeModules), io:format("~nHiPE compiling: |~s|~n |", [string:copies("-", Count)]), - T1 = erlang:now(), + T1 = time_compat:monotonic_time(), PidMRefs = [spawn_monitor(fun () -> [begin {ok, M} = hipe:c(M, [o3]), io:format("#") @@ -288,8 +288,8 @@ hipe_compile() -> {'DOWN', MRef, process, _, normal} -> ok; {'DOWN', MRef, process, _, Reason} -> exit(Reason) end || {_Pid, MRef} <- PidMRefs], - T2 = erlang:now(), - Duration = timer:now_diff(T2, T1) div 1000000, + T2 = time_compat:monotonic_time(), + Duration = time_compat:convert_time_unit(T2 - T1, native, seconds), io:format("|~n~nCompiled ~B modules in ~Bs~n", [Count, Duration]), {ok, Count, Duration}. diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index c5e4206fe3..c1e6e5286e 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -421,7 +421,7 @@ ensure_ttl_timer(undefined, State) -> State; ensure_ttl_timer(Expiry, State = #q{ttl_timer_ref = undefined, args_policy_version = Version}) -> - After = (case Expiry - now_micros() of + After = (case Expiry - time_compat:os_system_time(micro_seconds) of V when V > 0 -> V + 999; %% always fire later _ -> 0 end) div 1000, @@ -729,7 +729,7 @@ calculate_msg_expiry(#basic_message{content = Content}, TTL) -> {ok, MsgTTL} = rabbit_basic:parse_expiration(Props), case lists:min([TTL, MsgTTL]) of undefined -> undefined; - T -> now_micros() + T * 1000 + T -> time_compat:os_system_time(micro_seconds) + T * 1000 end. %% Logically this function should invoke maybe_send_drained/2. @@ -740,7 +740,8 @@ calculate_msg_expiry(#basic_message{content = Content}, TTL) -> drop_expired_msgs(State) -> case is_empty(State) of true -> State; - false -> drop_expired_msgs(now_micros(), State) + false -> drop_expired_msgs(time_compat:os_system_time(micro_seconds), + State) end. drop_expired_msgs(Now, State = #q{backing_queue_state = BQS, @@ -803,8 +804,6 @@ stop(State) -> stop(noreply, State). stop(noreply, State) -> {stop, normal, State}; stop(Reply, State) -> {stop, normal, Reply, State}. -now_micros() -> timer:now_diff(now(), {0,0,0}). - infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items]. i(name, #q{q = #amqqueue{name = Name}}) -> Name; @@ -1306,8 +1305,11 @@ handle_pre_hibernate(State = #q{backing_queue = BQ, BQS3 = BQ:handle_pre_hibernate(BQS2), rabbit_event:if_enabled( State, #q.stats_timer, - fun () -> emit_stats(State, [{idle_since, now()}, - {consumer_utilisation, ''}]) end), + fun () -> emit_stats(State, + [{idle_since, + time_compat:os_system_time(milli_seconds)}, + {consumer_utilisation, ''}]) + end), State1 = rabbit_event:stop_stats_timer(State#q{backing_queue_state = BQS3}, #q.stats_timer), {hibernate, stop_rate_timer(State1)}. diff --git a/src/rabbit_auth_backend_internal.erl b/src/rabbit_auth_backend_internal.erl index e53ce50c22..ced109d3ff 100644 --- a/src/rabbit_auth_backend_internal.erl +++ b/src/rabbit_auth_backend_internal.erl @@ -202,8 +202,9 @@ clear_password(Username) -> R. hash_password(Cleartext) -> - {A1,A2,A3} = now(), - random:seed(A1, A2, A3), + random:seed(erlang:phash2([node()]), + time_compat:monotonic_time(), + time_compat:unique_integer()), Salt = random:uniform(16#ffffffff), SaltBin = <<Salt:32>>, Hash = salted_md5(SaltBin, Cleartext), diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index f7383c87cd..db16817845 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -578,7 +578,10 @@ handle_pre_hibernate(State) -> ok = clear_permission_cache(), rabbit_event:if_enabled( State, #ch.stats_timer, - fun () -> emit_stats(State, [{idle_since, now()}]) end), + fun () -> emit_stats(State, + [{idle_since, + time_compat:os_system_time(milli_seconds)}]) + end), {hibernate, rabbit_event:stop_stats_timer(State, #ch.stats_timer)}. terminate(Reason, State) -> diff --git a/src/rabbit_dead_letter.erl b/src/rabbit_dead_letter.erl index 29032df856..5a4aad10da 100644 --- a/src/rabbit_dead_letter.erl +++ b/src/rabbit_dead_letter.erl @@ -53,7 +53,7 @@ make_msg(Msg = #basic_message{content = Content, _ -> {[RK], fun (H) -> lists:keydelete(<<"CC">>, 1, H) end} end, ReasonBin = list_to_binary(atom_to_list(Reason)), - TimeSec = rabbit_misc:now_ms() div 1000, + TimeSec = time_compat:os_system_time(seconds), PerMsgTTL = per_msg_ttl_header(Content#content.properties), HeadersFun2 = fun (Headers) -> diff --git a/src/rabbit_error_logger.erl b/src/rabbit_error_logger.erl index eecb2d64d9..425d171bae 100644 --- a/src/rabbit_error_logger.erl +++ b/src/rabbit_error_logger.erl @@ -101,7 +101,7 @@ publish(_Other, _Format, _Data, _State) -> publish1(RoutingKey, Format, Data, LogExch) -> %% 0-9-1 says the timestamp is a "64 bit POSIX timestamp". That's %% second resolution, not millisecond. - Timestamp = rabbit_misc:now_ms() div 1000, + Timestamp = time_compat:os_system_time(seconds), Args = [truncate:term(A, ?LOG_TRUNC) || A <- Data], {ok, _DeliveredQPids} = diff --git a/src/rabbit_event.erl b/src/rabbit_event.erl index 13bf6bc6f8..4d62dd079b 100644 --- a/src/rabbit_event.erl +++ b/src/rabbit_event.erl @@ -37,8 +37,7 @@ -type(event_type() :: atom()). -type(event_props() :: term()). --type(event_timestamp() :: - {non_neg_integer(), non_neg_integer(), non_neg_integer()}). +-type(event_timestamp() :: non_neg_integer()). -type(event() :: #event { type :: event_type(), props :: event_props(), @@ -160,5 +159,5 @@ event_cons(Type, Props, Ref) -> #event{type = Type, props = Props, reference = Ref, - timestamp = os:timestamp()}. + timestamp = time_compat:os_system_time(milli_seconds)}. diff --git a/src/rabbit_mirror_queue_mode_exactly.erl b/src/rabbit_mirror_queue_mode_exactly.erl index 0c0b7a10e8..4721ad6136 100644 --- a/src/rabbit_mirror_queue_mode_exactly.erl +++ b/src/rabbit_mirror_queue_mode_exactly.erl @@ -45,8 +45,9 @@ suggested_queue_nodes(Count, MNode, SNodes, _SSNodes, Poss) -> end}. shuffle(L) -> - {A1,A2,A3} = now(), - random:seed(A1, A2, A3), + random:seed(erlang:phash2([node()]), + time_compat:monotonic_time(), + time_compat:unique_integer()), {_, L1} = lists:unzip(lists:keysort(1, [{random:uniform(), N} || N <- L])), L1. diff --git a/src/rabbit_mirror_queue_sync.erl b/src/rabbit_mirror_queue_sync.erl index 9a8d55f94b..b76422ee6b 100644 --- a/src/rabbit_mirror_queue_sync.erl +++ b/src/rabbit_mirror_queue_sync.erl @@ -100,7 +100,7 @@ master_go(Syncer, Ref, Log, HandleInfo, EmitStats, BQ, BQS) -> master_go0(Args, BQ, BQS) -> case BQ:fold(fun (Msg, MsgProps, Unacked, Acc) -> master_send(Msg, MsgProps, Unacked, Args, Acc) - end, {0, erlang:now()}, BQS) of + end, {0, time_compat:monotonic_time()}, BQS) of {{shutdown, Reason}, BQS1} -> {shutdown, Reason, BQS1}; {{sync_died, Reason}, BQS1} -> {sync_died, Reason, BQS1}; {_, BQS1} -> master_done(Args, BQS1) @@ -108,10 +108,12 @@ master_go0(Args, BQ, BQS) -> master_send(Msg, MsgProps, Unacked, {Syncer, Ref, Log, HandleInfo, EmitStats, Parent}, {I, Last}) -> - T = case timer:now_diff(erlang:now(), Last) > ?SYNC_PROGRESS_INTERVAL of + Interval = time_compat:convert_time_unit( + time_compat:monotonic_time() - Last, native, micro_seconds), + T = case Interval > ?SYNC_PROGRESS_INTERVAL of true -> EmitStats({syncing, I}), Log("~p messages", [I]), - erlang:now(); + time_compat:monotonic_time(); false -> Last end, HandleInfo({syncing, I}), diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index ce563bf665..4f65cbfd7a 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -51,7 +51,6 @@ -export([dict_cons/3, orddict_cons/3, gb_trees_cons/3]). -export([gb_trees_fold/3, gb_trees_foreach/2]). -export([all_module_attributes/1, build_acyclic_graph/3]). --export([now_ms/0]). -export([const/1]). -export([ntoa/1, ntoab/1]). -export([is_process_alive/1]). @@ -71,7 +70,6 @@ -export([get_parent/0]). -export([store_proc_name/1, store_proc_name/2]). -export([moving_average/4]). --export([now_to_ms/1]). -export([get_env/3]). %% Horrible macro to use in guards @@ -222,7 +220,6 @@ {'edge', ({bad_vertex, digraph:vertex()} | {bad_edge, [digraph:vertex()]}), digraph:vertex(), digraph:vertex()})). --spec(now_ms/0 :: () -> non_neg_integer()). -spec(const/1 :: (A) -> thunk(A)). -spec(ntoa/1 :: (inet:ip_address()) -> string()). -spec(ntoab/1 :: (inet:ip_address()) -> string()). @@ -259,9 +256,6 @@ -spec(store_proc_name/1 :: (rabbit_types:proc_type_and_name()) -> ok). -spec(moving_average/4 :: (float(), float(), float(), float() | 'undefined') -> float()). --spec(now_to_ms/1 :: ({non_neg_integer(), - non_neg_integer(), - non_neg_integer()}) -> pos_integer()). -spec(get_env/3 :: (atom(), atom(), term()) -> term()). -endif. @@ -804,9 +798,6 @@ gb_trees_fold1(Fun, Acc, {Key, Val, It}) -> gb_trees_foreach(Fun, Tree) -> gb_trees_fold(fun (Key, Val, Acc) -> Fun(Key, Val), Acc end, ok, Tree). -now_ms() -> - timer:now_diff(now(), {0,0,0}) div 1000. - module_attributes(Module) -> case catch Module:module_info(attributes) of {'EXIT', {undef, [{Module, module_info, _} | _]}} -> @@ -1038,9 +1029,6 @@ term_to_json(V) when is_binary(V) orelse is_number(V) orelse V =:= null orelse V =:= true orelse V =:= false -> V. -now_to_ms({Mega, Sec, Micro}) -> - (Mega * 1000000 * 1000000 + Sec * 1000000 + Micro) div 1000. - check_expiry(N) when N < 0 -> {error, {value_negative, N}}; check_expiry(_N) -> ok. diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl index e3960c5c8a..43db5431e0 100644 --- a/src/rabbit_node_monitor.erl +++ b/src/rabbit_node_monitor.erl @@ -288,24 +288,28 @@ workaround_global_hang() -> receive global_sync_done -> ok - after 15000 -> + after 10000 -> find_blocked_global_peers() end. find_blocked_global_peers() -> + Snapshot1 = snapshot_global_dict(), + timer:sleep(10000), + Snapshot2 = snapshot_global_dict(), + find_blocked_global_peers1(Snapshot2, Snapshot1). + +snapshot_global_dict() -> {status, _, _, [Dict | _]} = sys:get_status(global_name_server), - find_blocked_global_peers1(Dict). + [E || {{sync_tag_his, _}, _} = E <- Dict]. -find_blocked_global_peers1([{{sync_tag_his, Peer}, Timestamp} | Rest]) -> - Diff = timer:now_diff(erlang:now(), Timestamp), - if - Diff >= 10000 -> unblock_global_peer(Peer); - true -> ok +find_blocked_global_peers1([{{sync_tag_his, Peer}, _} = Item | Rest], + OlderSnapshot) -> + case lists:member(Item, OlderSnapshot) of + true -> unblock_global_peer(Peer); + false -> ok end, - find_blocked_global_peers1(Rest); -find_blocked_global_peers1([_ | Rest]) -> - find_blocked_global_peers1(Rest); -find_blocked_global_peers1([]) -> + find_blocked_global_peers1(Rest, OlderSnapshot); +find_blocked_global_peers1([], _) -> ok. unblock_global_peer(PeerNode) -> diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl index ae8481aaf8..34e447ccd3 100644 --- a/src/rabbit_queue_consumers.erl +++ b/src/rabbit_queue_consumers.erl @@ -99,7 +99,9 @@ %%---------------------------------------------------------------------------- new() -> #state{consumers = priority_queue:new(), - use = {active, now_micros(), 1.0}}. + use = {active, + time_compat:monotonic_time(micro_seconds), + 1.0}}. max_active_priority(#state{consumers = Consumers}) -> priority_queue:highest(Consumers). @@ -346,9 +348,9 @@ drain_mode(true) -> drain; drain_mode(false) -> manual. utilisation(#state{use = {active, Since, Avg}}) -> - use_avg(now_micros() - Since, 0, Avg); + use_avg(time_compat:monotonic_time(micro_seconds) - Since, 0, Avg); utilisation(#state{use = {inactive, Since, Active, Avg}}) -> - use_avg(Active, now_micros() - Since, Avg). + use_avg(Active, time_compat:monotonic_time(micro_seconds) - Since, Avg). %%---------------------------------------------------------------------------- @@ -455,14 +457,12 @@ update_use({inactive, _, _, _} = CUInfo, inactive) -> update_use({active, _, _} = CUInfo, active) -> CUInfo; update_use({active, Since, Avg}, inactive) -> - Now = now_micros(), + Now = time_compat:monotonic_time(micro_seconds), {inactive, Now, Now - Since, Avg}; update_use({inactive, Since, Active, Avg}, active) -> - Now = now_micros(), + Now = time_compat:monotonic_time(micro_seconds), {active, Now, use_avg(Active, Now - Since, Avg)}. use_avg(Active, Inactive, Avg) -> Time = Inactive + Active, rabbit_misc:moving_average(Time, ?USE_AVG_HALF_LIFE, Active / Time, Avg). - -now_micros() -> timer:now_diff(now(), {0,0,0}). diff --git a/src/rabbit_queue_location_random.erl b/src/rabbit_queue_location_random.erl index b708077cf8..6a3d1fea67 100644 --- a/src/rabbit_queue_location_random.erl +++ b/src/rabbit_queue_location_random.erl @@ -39,6 +39,6 @@ description() -> queue_master_location(#amqqueue{}) -> Cluster = rabbit_queue_master_location_misc:all_nodes(), - RandomPos = erlang:phash(now(), length(Cluster)), + RandomPos = erlang:phash2(time_compat:monotonic_time(), length(Cluster)), MasterNode = lists:nth(RandomPos, Cluster), {ok, MasterNode}. diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index eaa2c0d3bb..c1cfb10c67 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -358,7 +358,8 @@ start_connection(Parent, HelperSup, Deb, Sock, SockTransform) -> capabilities = [], auth_mechanism = none, auth_state = none, - connected_at = rabbit_misc:now_to_ms(os:timestamp())}, + connected_at = time_compat:os_system_time( + milli_seconds)}, callback = uninitialized_callback, recv_len = 0, pending_recv = false, @@ -621,7 +622,8 @@ maybe_block(State = #v1{connection_state = blocking, State1 = State#v1{connection_state = blocked, throttle = update_last_blocked_by( Throttle#throttle{ - last_blocked_at = os:timestamp()})}, + last_blocked_at = + time_compat:monotonic_time()})}, case {blocked_by_alarm(State), blocked_by_alarm(State1)} of {false, true} -> ok = send_blocked(State1); {_, _} -> ok @@ -1307,7 +1309,9 @@ i(state, #v1{connection_state = ConnectionState, (credit_flow:blocked() %% throttled by flow now orelse %% throttled by flow recently (WasBlockedBy =:= flow andalso T =/= never andalso - timer:now_diff(os:timestamp(), T) < 5000000)) of + time_compat:convert_time_unit(time_compat:monotonic_time() - T, + native, + micro_seconds) < 5000000)) of true -> flow; false -> ConnectionState end; diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 213e600ef9..99ea33b973 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -792,7 +792,7 @@ update_rates(State = #vqstate{ in_counter = InCount, ack_in = AckInRate, ack_out = AckOutRate, timestamp = TS }}) -> - Now = erlang:now(), + Now = time_compat:monotonic_time(), Rates = #rates { in = update_rate(Now, TS, InCount, InRate), out = update_rate(Now, TS, OutCount, OutRate), @@ -807,7 +807,8 @@ update_rates(State = #vqstate{ in_counter = InCount, rates = Rates }. update_rate(Now, TS, Count, Rate) -> - Time = timer:now_diff(Now, TS) / ?MICROS_PER_SECOND, + Time = time_compat:convert_time_unit(Now - TS, native, micro_seconds) / + ?MICROS_PER_SECOND, rabbit_misc:moving_average(Time, ?RATE_AVG_HALF_LIFE, Count / Time, Rate). ram_duration(State) -> @@ -1190,7 +1191,7 @@ init(IsDurable, IndexState, DeltaCount, DeltaBytes, Terms, count = DeltaCount1, end_seq_id = NextSeqId }) end, - Now = now(), + Now = time_compat:monotonic_time(), State = #vqstate { q1 = ?QUEUE:new(), q2 = ?QUEUE:new(), diff --git a/src/supervisor2.erl b/src/supervisor2.erl index 7b9421eb3e..c8ffbb12ea 100644 --- a/src/supervisor2.erl +++ b/src/supervisor2.erl @@ -1492,7 +1492,7 @@ add_restart(State) -> I = State#state.intensity, P = State#state.period, R = State#state.restarts, - Now = erlang:now(), + Now = time_compat:monotonic_time(), R1 = add_restart([Now|R], Now, P), State1 = State#state{restarts = R1}, case length(R1) of @@ -1513,26 +1513,13 @@ add_restart([], _, _) -> []. inPeriod(Time, Now, Period) -> - case difference(Time, Now) of + case time_compat:convert_time_unit(Now - Time, native, seconds) of T when T > Period -> false; _ -> true end. -%% -%% Time = {MegaSecs, Secs, MicroSecs} (NOTE: MicroSecs is ignored) -%% Calculate the time elapsed in seconds between two timestamps. -%% If MegaSecs is equal just subtract Secs. -%% Else calculate the Mega difference and add the Secs difference, -%% note that Secs difference can be negative, e.g. -%% {827, 999999, 676} diff {828, 1, 653753} == > 2 secs. -%% -difference({TimeM, TimeS, _}, {CurM, CurS, _}) when CurM > TimeM -> - ((CurM - TimeM) * 1000000) + (CurS - TimeS); -difference({_, TimeS, _}, {_, CurS, _}) -> - CurS - TimeS. - %%% ------------------------------------------------------ %%% Error and progress reporting. %%% ------------------------------------------------------ diff --git a/src/time_compat.erl b/src/time_compat.erl new file mode 100644 index 0000000000..b87c6cc550 --- /dev/null +++ b/src/time_compat.erl @@ -0,0 +1,305 @@ +%% +%% %CopyrightBegin% +%% +%% Copyright Ericsson AB 2014-2015. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%% +%% %CopyrightEnd% +%% + +%% +%% If your code need to be able to execute on ERTS versions both +%% earlier and later than 7.0, the best approach is to use the new +%% time API introduced in ERTS 7.0 and implement a fallback +%% solution using the old primitives to be used on old ERTS +%% versions. This way your code can automatically take advantage +%% of the improvements in the API when available. This is an +%% example of how to implement such an API, but it can be used +%% as is if you want to. Just add (a preferrably renamed version of) +%% this module to your project, and call the API via this module +%% instead of calling the BIFs directly. +%% + +-module(time_compat). + +%% We don't want warnings about the use of erlang:now/0 in +%% this module. +-compile(nowarn_deprecated_function). +%% +%% We don't use +%% -compile({nowarn_deprecated_function, [{erlang, now, 0}]}). +%% since this will produce warnings when compiled on systems +%% where it has not yet been deprecated. +%% + +-export([monotonic_time/0, + monotonic_time/1, + erlang_system_time/0, + erlang_system_time/1, + os_system_time/0, + os_system_time/1, + time_offset/0, + time_offset/1, + convert_time_unit/3, + timestamp/0, + unique_integer/0, + unique_integer/1, + monitor/2, + system_info/1, + system_flag/2]). + +monotonic_time() -> + try + erlang:monotonic_time() + catch + error:undef -> + %% Use Erlang system time as monotonic time + erlang_system_time_fallback() + end. + +monotonic_time(Unit) -> + try + erlang:monotonic_time(Unit) + catch + error:badarg -> + erlang:error(badarg, [Unit]); + error:undef -> + %% Use Erlang system time as monotonic time + STime = erlang_system_time_fallback(), + try + convert_time_unit_fallback(STime, native, Unit) + catch + error:bad_time_unit -> erlang:error(badarg, [Unit]) + end + end. + +erlang_system_time() -> + try + erlang:system_time() + catch + error:undef -> + erlang_system_time_fallback() + end. + +erlang_system_time(Unit) -> + try + erlang:system_time(Unit) + catch + error:badarg -> + erlang:error(badarg, [Unit]); + error:undef -> + STime = erlang_system_time_fallback(), + try + convert_time_unit_fallback(STime, native, Unit) + catch + error:bad_time_unit -> erlang:error(badarg, [Unit]) + end + end. + +os_system_time() -> + try + os:system_time() + catch + error:undef -> + os_system_time_fallback() + end. + +os_system_time(Unit) -> + try + os:system_time(Unit) + catch + error:badarg -> + erlang:error(badarg, [Unit]); + error:undef -> + STime = os_system_time_fallback(), + try + convert_time_unit_fallback(STime, native, Unit) + catch + error:bad_time_unit -> erlang:error(badarg, [Unit]) + end + end. + +time_offset() -> + try + erlang:time_offset() + catch + error:undef -> + %% Erlang system time and Erlang monotonic + %% time are always aligned + 0 + end. + +time_offset(Unit) -> + try + erlang:time_offset(Unit) + catch + error:badarg -> + erlang:error(badarg, [Unit]); + error:undef -> + try + _ = integer_time_unit(Unit) + catch + error:bad_time_unit -> erlang:error(badarg, [Unit]) + end, + %% Erlang system time and Erlang monotonic + %% time are always aligned + 0 + end. + +convert_time_unit(Time, FromUnit, ToUnit) -> + try + erlang:convert_time_unit(Time, FromUnit, ToUnit) + catch + error:undef -> + try + convert_time_unit_fallback(Time, FromUnit, ToUnit) + catch + _:_ -> + erlang:error(badarg, [Time, FromUnit, ToUnit]) + end; + error:Error -> + erlang:error(Error, [Time, FromUnit, ToUnit]) + end. + +timestamp() -> + try + erlang:timestamp() + catch + error:undef -> + erlang:now() + end. + +unique_integer() -> + try + erlang:unique_integer() + catch + error:undef -> + {MS, S, US} = erlang:now(), + (MS*1000000+S)*1000000+US + end. + +unique_integer(Modifiers) -> + try + erlang:unique_integer(Modifiers) + catch + error:badarg -> + erlang:error(badarg, [Modifiers]); + error:undef -> + case is_valid_modifier_list(Modifiers) of + true -> + %% now() converted to an integer + %% fullfill the requirements of + %% all modifiers: unique, positive, + %% and monotonic... + {MS, S, US} = erlang:now(), + (MS*1000000+S)*1000000+US; + false -> + erlang:error(badarg, [Modifiers]) + end + end. + +monitor(Type, Item) -> + try + erlang:monitor(Type, Item) + catch + error:Error -> + case {Error, Type, Item} of + {badarg, time_offset, clock_service} -> + %% Time offset is final and will never change. + %% Return a dummy reference, there will never + %% be any need for 'CHANGE' messages... + make_ref(); + _ -> + erlang:error(Error, [Type, Item]) + end + end. + +system_info(Item) -> + try + erlang:system_info(Item) + catch + error:badarg -> + case Item of + time_correction -> + case erlang:system_info(tolerant_timeofday) of + enabled -> true; + disabled -> false + end; + time_warp_mode -> + no_time_warp; + time_offset -> + final; + NotSupArg when NotSupArg == os_monotonic_time_source; + NotSupArg == os_system_time_source; + NotSupArg == start_time; + NotSupArg == end_time -> + %% Cannot emulate this... + erlang:error(notsup, [NotSupArg]); + _ -> + erlang:error(badarg, [Item]) + end; + error:Error -> + erlang:error(Error, [Item]) + end. + +system_flag(Flag, Value) -> + try + erlang:system_flag(Flag, Value) + catch + error:Error -> + case {Error, Flag, Value} of + {badarg, time_offset, finalize} -> + %% Time offset is final + final; + _ -> + erlang:error(Error, [Flag, Value]) + end + end. + +%% +%% Internal functions +%% + +integer_time_unit(native) -> 1000*1000; +integer_time_unit(nano_seconds) -> 1000*1000*1000; +integer_time_unit(micro_seconds) -> 1000*1000; +integer_time_unit(milli_seconds) -> 1000; +integer_time_unit(seconds) -> 1; +integer_time_unit(I) when is_integer(I), I > 0 -> I; +integer_time_unit(BadRes) -> erlang:error(bad_time_unit, [BadRes]). + +erlang_system_time_fallback() -> + {MS, S, US} = erlang:now(), + (MS*1000000+S)*1000000+US. + +os_system_time_fallback() -> + {MS, S, US} = os:timestamp(), + (MS*1000000+S)*1000000+US. + +convert_time_unit_fallback(Time, FromUnit, ToUnit) -> + FU = integer_time_unit(FromUnit), + TU = integer_time_unit(ToUnit), + case Time < 0 of + true -> TU*Time - (FU - 1); + false -> TU*Time + end div FU. + +is_valid_modifier_list([positive|Ms]) -> + is_valid_modifier_list(Ms); +is_valid_modifier_list([monotonic|Ms]) -> + is_valid_modifier_list(Ms); +is_valid_modifier_list([]) -> + true; +is_valid_modifier_list(_) -> + false. diff --git a/test/src/gm_soak_test.erl b/test/src/gm_soak_test.erl index 32476b56bc..64baa035b7 100644 --- a/test/src/gm_soak_test.erl +++ b/test/src/gm_soak_test.erl @@ -35,9 +35,11 @@ with_state(Fun) -> inc() -> case 1 + get(count) of - 100000 -> Now = now(), + 100000 -> Now = time_compat:monotonic_time(), Start = put(ts, Now), - Diff = timer:now_diff(Now, Start), + Diff = time_compat:convert_time_unit(Now - Start, + native, + micro_seconds), Rate = 100000 / (Diff / 1000000), io:format("~p seeing ~p msgs/sec~n", [self(), Rate]), put(count, 0); @@ -48,7 +50,7 @@ joined([], Members) -> io:format("Joined ~p (~p members)~n", [self(), length(Members)]), put(state, dict:from_list([{Member, empty} || Member <- Members])), put(count, 0), - put(ts, now()), + put(ts, time_compat:monotonic_time()), ok. members_changed([], Births, Deaths) -> @@ -101,8 +103,9 @@ handle_terminate([], Reason) -> spawn_member() -> spawn_link( fun () -> - {MegaSecs, Secs, MicroSecs} = now(), - random:seed(MegaSecs, Secs, MicroSecs), + random:seed(erlang:phash2([node()]), + time_compat:monotonic_time(), + time_compat:unique_integer()), %% start up delay of no more than 10 seconds timer:sleep(random:uniform(10000)), {ok, Pid} = gm:start_link( diff --git a/test/src/gm_speed_test.erl b/test/src/gm_speed_test.erl index f11e7d487b..b0693fc136 100644 --- a/test/src/gm_speed_test.erl +++ b/test/src/gm_speed_test.erl @@ -49,12 +49,14 @@ wile_e_coyote(Time, WriteUnit) -> receive joined -> ok end, timer:sleep(1000), %% wait for all to join timer:send_after(Time, stop), - Start = now(), + Start = time_compat:monotonic_time(), {Sent, Received} = loop(Pid, WriteUnit, 0, 0), - End = now(), + End = time_compat:monotonic_time(), ok = gm:leave(Pid), receive terminated -> ok end, - Elapsed = timer:now_diff(End, Start) / 1000000, + Elapsed = time_compat:convert_time_unit(End - Start, + native, + micro_seconds) / 1000000, io:format("Sending rate: ~p msgs/sec~nReceiving rate: ~p msgs/sec~n~n", [Sent/Elapsed, Received/Elapsed]), ok. |
