summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/credit_flow.erl15
-rw-r--r--src/file_handle_cache.erl38
-rw-r--r--src/file_handle_cache_stats.erl7
-rw-r--r--src/gen_server2.erl15
-rw-r--r--src/gm.erl5
-rw-r--r--src/pg2_fixed.erl4
-rw-r--r--src/rabbit.erl6
-rw-r--r--src/rabbit_amqqueue_process.erl16
-rw-r--r--src/rabbit_auth_backend_internal.erl5
-rw-r--r--src/rabbit_channel.erl5
-rw-r--r--src/rabbit_dead_letter.erl2
-rw-r--r--src/rabbit_error_logger.erl2
-rw-r--r--src/rabbit_event.erl5
-rw-r--r--src/rabbit_mirror_queue_mode_exactly.erl5
-rw-r--r--src/rabbit_mirror_queue_sync.erl8
-rw-r--r--src/rabbit_misc.erl12
-rw-r--r--src/rabbit_node_monitor.erl26
-rw-r--r--src/rabbit_queue_consumers.erl14
-rw-r--r--src/rabbit_queue_location_random.erl2
-rw-r--r--src/rabbit_reader.erl10
-rw-r--r--src/rabbit_variable_queue.erl7
-rw-r--r--src/supervisor2.erl17
-rw-r--r--src/time_compat.erl305
-rw-r--r--test/src/gm_soak_test.erl13
-rw-r--r--test/src/gm_speed_test.erl8
25 files changed, 440 insertions, 112 deletions
diff --git a/src/credit_flow.erl b/src/credit_flow.erl
index 345055a361..d2f2355f03 100644
--- a/src/credit_flow.erl
+++ b/src/credit_flow.erl
@@ -92,11 +92,15 @@
{process_info, erlang:process_info(SELF)},
{from, FROM},
{from_info, erlang:process_info(FROM)},
- {timestamp, os:timestamp()}])).
+ {timestamp,
+ time_compat:os_system_time(
+ milliseconds)}])).
-define(TRACE_UNBLOCKED(SELF, FROM), rabbit_event:notify(credit_flow_unblocked,
[{process, SELF},
{from, FROM},
- {timestamp, os:timestamp()}])).
+ {timestamp,
+ time_compat:os_system_time(
+ milliseconds)}])).
-else.
-define(TRACE_BLOCKED(SELF, FROM), ok).
-define(TRACE_UNBLOCKED(SELF, FROM), ok).
@@ -150,7 +154,10 @@ state() -> case blocked() of
true -> flow;
false -> case get(credit_blocked_at) of
undefined -> running;
- B -> Diff = timer:now_diff(os:timestamp(), B),
+ B -> Now = time_compat:monotonic_time(),
+ Diff = time_compat:convert_time_unit(Now - B,
+ native,
+ micro_seconds),
case Diff < ?STATE_CHANGE_INTERVAL of
true -> flow;
false -> running
@@ -179,7 +186,7 @@ grant(To, Quantity) ->
block(From) ->
?TRACE_BLOCKED(self(), From),
case blocked() of
- false -> put(credit_blocked_at, os:timestamp());
+ false -> put(credit_blocked_at, time_compat:monotonic_time());
true -> ok
end,
?UPDATE(credit_blocked, [], Blocks, [From | Blocks]).
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl
index d7e5abc873..1e97632cf9 100644
--- a/src/file_handle_cache.erl
+++ b/src/file_handle_cache.erl
@@ -175,6 +175,7 @@
-record(handle,
{ hdl,
+ ref,
offset,
is_dirty,
write_buffer_size,
@@ -536,12 +537,15 @@ clear(Ref) ->
end).
set_maximum_since_use(MaximumAge) ->
- Now = now(),
+ Now = time_compat:monotonic_time(),
case lists:foldl(
fun ({{Ref, fhc_handle},
Handle = #handle { hdl = Hdl, last_used_at = Then }}, Rep) ->
case Hdl =/= closed andalso
- timer:now_diff(Now, Then) >= MaximumAge of
+ time_compat:convert_time_unit(Now - Then,
+ native,
+ micro_seconds)
+ >= MaximumAge of
true -> soft_close(Ref, Handle) orelse Rep;
false -> Rep
end;
@@ -708,7 +712,8 @@ get_or_reopen(RefNewOrReopens) ->
{OpenHdls, []} ->
{ok, [Handle || {_Ref, Handle} <- OpenHdls]};
{OpenHdls, ClosedHdls} ->
- Oldest = oldest(get_age_tree(), fun () -> now() end),
+ Oldest = oldest(get_age_tree(),
+ fun () -> time_compat:monotonic_time() end),
case gen_server2:call(?SERVER, {open, self(), length(ClosedHdls),
Oldest}, infinity) of
ok ->
@@ -744,14 +749,14 @@ reopen([{Ref, NewOrReopen, Handle = #handle { hdl = closed,
end,
case prim_file:open(Path, Mode) of
{ok, Hdl} ->
- Now = now(),
+ Now = time_compat:monotonic_time(),
{{ok, _Offset}, Handle1} =
maybe_seek(Offset, reset_read_buffer(
Handle#handle{hdl = Hdl,
offset = 0,
last_used_at = Now})),
put({Ref, fhc_handle}, Handle1),
- reopen(RefNewOrReopenHdls, gb_trees:insert(Now, Ref, Tree),
+ reopen(RefNewOrReopenHdls, gb_trees:insert({Now, Ref}, true, Tree),
[{Ref, Handle1} | RefHdls]);
Error ->
%% NB: none of the handles in ToOpen are in the age tree
@@ -780,7 +785,7 @@ sort_handles([{Ref, _} | RefHdls], RefHdlsA, [{Ref, Handle} | RefHdlsB], Acc) ->
sort_handles(RefHdls, RefHdlsA, RefHdlsB, [Handle | Acc]).
put_handle(Ref, Handle = #handle { last_used_at = Then }) ->
- Now = now(),
+ Now = time_compat:monotonic_time(),
age_tree_update(Then, Now, Ref),
put({Ref, fhc_handle}, Handle #handle { last_used_at = Now }).
@@ -797,13 +802,14 @@ put_age_tree(Tree) -> put(fhc_age_tree, Tree).
age_tree_update(Then, Now, Ref) ->
with_age_tree(
fun (Tree) ->
- gb_trees:insert(Now, Ref, gb_trees:delete_any(Then, Tree))
+ gb_trees:insert({Now, Ref}, true,
+ gb_trees:delete_any({Then, Ref}, Tree))
end).
-age_tree_delete(Then) ->
+age_tree_delete(Then, Ref) ->
with_age_tree(
fun (Tree) ->
- Tree1 = gb_trees:delete_any(Then, Tree),
+ Tree1 = gb_trees:delete_any({Then, Ref}, Tree),
Oldest = oldest(Tree1, fun () -> undefined end),
gen_server2:cast(?SERVER, {close, self(), Oldest}),
Tree1
@@ -814,7 +820,7 @@ age_tree_change() ->
fun (Tree) ->
case gb_trees:is_empty(Tree) of
true -> Tree;
- false -> {Oldest, _Ref} = gb_trees:smallest(Tree),
+ false -> {{Oldest, _Ref}, _} = gb_trees:smallest(Tree),
gen_server2:cast(?SERVER, {update, self(), Oldest})
end,
Tree
@@ -823,7 +829,7 @@ age_tree_change() ->
oldest(Tree, DefaultFun) ->
case gb_trees:is_empty(Tree) of
true -> DefaultFun();
- false -> {Oldest, _Ref} = gb_trees:smallest(Tree),
+ false -> {{Oldest, _Ref}, _} = gb_trees:smallest(Tree),
Oldest
end.
@@ -849,6 +855,7 @@ new_closed_handle(Path, Mode, Options) ->
end,
Ref = make_ref(),
put({Ref, fhc_handle}, #handle { hdl = closed,
+ ref = Ref,
offset = 0,
is_dirty = false,
write_buffer_size = 0,
@@ -883,6 +890,7 @@ soft_close(Handle = #handle { hdl = closed }) ->
soft_close(Handle) ->
case write_buffer(Handle) of
{ok, #handle { hdl = Hdl,
+ ref = Ref,
is_dirty = IsDirty,
last_used_at = Then } = Handle1 } ->
ok = case IsDirty of
@@ -890,7 +898,7 @@ soft_close(Handle) ->
false -> ok
end,
ok = prim_file:close(Hdl),
- age_tree_delete(Then),
+ age_tree_delete(Then, Ref),
{ok, Handle1 #handle { hdl = closed,
is_dirty = false,
last_used_at = undefined }};
@@ -1419,17 +1427,19 @@ reduce(State = #fhc_state { open_pending = OpenPending,
elders = Elders,
clients = Clients,
timer_ref = TRef }) ->
- Now = now(),
+ Now = time_compat:monotonic_time(),
{CStates, Sum, ClientCount} =
ets:foldl(fun ({Pid, Eldest}, {CStatesAcc, SumAcc, CountAcc} = Accs) ->
[#cstate { pending_closes = PendingCloses,
opened = Opened,
blocked = Blocked } = CState] =
ets:lookup(Clients, Pid),
+ TimeDiff = time_compat:convert_time_unit(
+ Now - Eldest, native, micro_seconds),
case Blocked orelse PendingCloses =:= Opened of
true -> Accs;
false -> {[CState | CStatesAcc],
- SumAcc + timer:now_diff(Now, Eldest),
+ SumAcc + TimeDiff,
CountAcc + 1}
end
end, {[], 0, 0}, Elders),
diff --git a/src/file_handle_cache_stats.erl b/src/file_handle_cache_stats.erl
index 5f6926b5d2..5dcf19c5f1 100644
--- a/src/file_handle_cache_stats.erl
+++ b/src/file_handle_cache_stats.erl
@@ -61,7 +61,8 @@ get() ->
%% TODO timer:tc/1 was introduced in R14B03; use that function once we
%% require that version.
timer_tc(Thunk) ->
- T1 = os:timestamp(),
+ T1 = time_compat:monotonic_time(),
Res = Thunk(),
- T2 = os:timestamp(),
- {timer:now_diff(T2, T1), Res}.
+ T2 = time_compat:monotonic_time(),
+ Diff = time_compat:convert_time_unit(T2 - T1, native, micro_seconds),
+ {Diff, Res}.
diff --git a/src/gen_server2.erl b/src/gen_server2.erl
index fd0e6553b5..56b2720cac 100644
--- a/src/gen_server2.erl
+++ b/src/gen_server2.erl
@@ -624,7 +624,10 @@ unregister_name(_Name) -> ok.
extend_backoff(undefined) ->
undefined;
extend_backoff({backoff, InitialTimeout, MinimumTimeout, DesiredHibPeriod}) ->
- {backoff, InitialTimeout, MinimumTimeout, DesiredHibPeriod, now()}.
+ {backoff, InitialTimeout, MinimumTimeout, DesiredHibPeriod,
+ {erlang:phash2([node()]),
+ time_compat:monotonic_time(),
+ time_compat:unique_integer()}}.
%%%========================================================================
%%% Internal functions
@@ -688,7 +691,9 @@ wake_hib(GS2State = #gs2_state { timeout_state = TS }) ->
undefined ->
undefined;
{SleptAt, TimeoutState} ->
- adjust_timeout_state(SleptAt, now(), TimeoutState)
+ adjust_timeout_state(SleptAt,
+ time_compat:monotonic_time(),
+ TimeoutState)
end,
post_hibernate(
drain(GS2State #gs2_state { timeout_state = TimeoutState1 })).
@@ -696,7 +701,8 @@ wake_hib(GS2State = #gs2_state { timeout_state = TS }) ->
hibernate(GS2State = #gs2_state { timeout_state = TimeoutState }) ->
TS = case TimeoutState of
undefined -> undefined;
- {backoff, _, _, _, _} -> {now(), TimeoutState}
+ {backoff, _, _, _, _} -> {time_compat:monotonic_time(),
+ TimeoutState}
end,
proc_lib:hibernate(?MODULE, wake_hib,
[GS2State #gs2_state { timeout_state = TS }]).
@@ -741,7 +747,8 @@ post_hibernate(GS2State = #gs2_state { state = State,
adjust_timeout_state(SleptAt, AwokeAt, {backoff, CurrentTO, MinimumTO,
DesiredHibPeriod, RandomState}) ->
- NapLengthMicros = timer:now_diff(AwokeAt, SleptAt),
+ NapLengthMicros = time_compat:convert_time_unit(AwokeAt - SleptAt,
+ native, micro_seconds),
CurrentMicros = CurrentTO * 1000,
MinimumMicros = MinimumTO * 1000,
DesiredHibMicros = DesiredHibPeriod * 1000,
diff --git a/src/gm.erl b/src/gm.erl
index dbf9c295f9..3a9f4405be 100644
--- a/src/gm.erl
+++ b/src/gm.erl
@@ -551,8 +551,9 @@ forget_group(GroupName) ->
init([GroupName, Module, Args, TxnFun]) ->
put(process_name, {?MODULE, GroupName}),
- {MegaSecs, Secs, MicroSecs} = now(),
- random:seed(MegaSecs, Secs, MicroSecs),
+ random:seed(erlang:phash2([node()]),
+ time_compat:monotonic_time(),
+ time_compat:unique_integer()),
Self = make_member(GroupName),
gen_server2:cast(self(), join),
{ok, #state { self = Self,
diff --git a/src/pg2_fixed.erl b/src/pg2_fixed.erl
index 8926b83b77..d6f756ce68 100644
--- a/src/pg2_fixed.erl
+++ b/src/pg2_fixed.erl
@@ -146,14 +146,14 @@ get_closest_pid(Name) ->
[Pid] ->
Pid;
[] ->
- {_,_,X} = erlang:now(),
case get_members(Name) of
[] -> {error, {no_process, Name}};
Members ->
+ X = time_compat:erlang_system_time(micro_seconds),
lists:nth((X rem length(Members))+1, Members)
end;
Members when is_list(Members) ->
- {_,_,X} = erlang:now(),
+ X = time_compat:erlang_system_time(micro_seconds),
lists:nth((X rem length(Members))+1, Members);
Else ->
Else
diff --git a/src/rabbit.erl b/src/rabbit.erl
index d218a07a98..5152c11eeb 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -277,7 +277,7 @@ hipe_compile() ->
Count = length(HipeModules),
io:format("~nHiPE compiling: |~s|~n |",
[string:copies("-", Count)]),
- T1 = erlang:now(),
+ T1 = time_compat:monotonic_time(),
PidMRefs = [spawn_monitor(fun () -> [begin
{ok, M} = hipe:c(M, [o3]),
io:format("#")
@@ -288,8 +288,8 @@ hipe_compile() ->
{'DOWN', MRef, process, _, normal} -> ok;
{'DOWN', MRef, process, _, Reason} -> exit(Reason)
end || {_Pid, MRef} <- PidMRefs],
- T2 = erlang:now(),
- Duration = timer:now_diff(T2, T1) div 1000000,
+ T2 = time_compat:monotonic_time(),
+ Duration = time_compat:convert_time_unit(T2 - T1, native, seconds),
io:format("|~n~nCompiled ~B modules in ~Bs~n", [Count, Duration]),
{ok, Count, Duration}.
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index c5e4206fe3..c1e6e5286e 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -421,7 +421,7 @@ ensure_ttl_timer(undefined, State) ->
State;
ensure_ttl_timer(Expiry, State = #q{ttl_timer_ref = undefined,
args_policy_version = Version}) ->
- After = (case Expiry - now_micros() of
+ After = (case Expiry - time_compat:os_system_time(micro_seconds) of
V when V > 0 -> V + 999; %% always fire later
_ -> 0
end) div 1000,
@@ -729,7 +729,7 @@ calculate_msg_expiry(#basic_message{content = Content}, TTL) ->
{ok, MsgTTL} = rabbit_basic:parse_expiration(Props),
case lists:min([TTL, MsgTTL]) of
undefined -> undefined;
- T -> now_micros() + T * 1000
+ T -> time_compat:os_system_time(micro_seconds) + T * 1000
end.
%% Logically this function should invoke maybe_send_drained/2.
@@ -740,7 +740,8 @@ calculate_msg_expiry(#basic_message{content = Content}, TTL) ->
drop_expired_msgs(State) ->
case is_empty(State) of
true -> State;
- false -> drop_expired_msgs(now_micros(), State)
+ false -> drop_expired_msgs(time_compat:os_system_time(micro_seconds),
+ State)
end.
drop_expired_msgs(Now, State = #q{backing_queue_state = BQS,
@@ -803,8 +804,6 @@ stop(State) -> stop(noreply, State).
stop(noreply, State) -> {stop, normal, State};
stop(Reply, State) -> {stop, normal, Reply, State}.
-now_micros() -> timer:now_diff(now(), {0,0,0}).
-
infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
i(name, #q{q = #amqqueue{name = Name}}) -> Name;
@@ -1306,8 +1305,11 @@ handle_pre_hibernate(State = #q{backing_queue = BQ,
BQS3 = BQ:handle_pre_hibernate(BQS2),
rabbit_event:if_enabled(
State, #q.stats_timer,
- fun () -> emit_stats(State, [{idle_since, now()},
- {consumer_utilisation, ''}]) end),
+ fun () -> emit_stats(State,
+ [{idle_since,
+ time_compat:os_system_time(milli_seconds)},
+ {consumer_utilisation, ''}])
+ end),
State1 = rabbit_event:stop_stats_timer(State#q{backing_queue_state = BQS3},
#q.stats_timer),
{hibernate, stop_rate_timer(State1)}.
diff --git a/src/rabbit_auth_backend_internal.erl b/src/rabbit_auth_backend_internal.erl
index e53ce50c22..ced109d3ff 100644
--- a/src/rabbit_auth_backend_internal.erl
+++ b/src/rabbit_auth_backend_internal.erl
@@ -202,8 +202,9 @@ clear_password(Username) ->
R.
hash_password(Cleartext) ->
- {A1,A2,A3} = now(),
- random:seed(A1, A2, A3),
+ random:seed(erlang:phash2([node()]),
+ time_compat:monotonic_time(),
+ time_compat:unique_integer()),
Salt = random:uniform(16#ffffffff),
SaltBin = <<Salt:32>>,
Hash = salted_md5(SaltBin, Cleartext),
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index f7383c87cd..db16817845 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -578,7 +578,10 @@ handle_pre_hibernate(State) ->
ok = clear_permission_cache(),
rabbit_event:if_enabled(
State, #ch.stats_timer,
- fun () -> emit_stats(State, [{idle_since, now()}]) end),
+ fun () -> emit_stats(State,
+ [{idle_since,
+ time_compat:os_system_time(milli_seconds)}])
+ end),
{hibernate, rabbit_event:stop_stats_timer(State, #ch.stats_timer)}.
terminate(Reason, State) ->
diff --git a/src/rabbit_dead_letter.erl b/src/rabbit_dead_letter.erl
index 29032df856..5a4aad10da 100644
--- a/src/rabbit_dead_letter.erl
+++ b/src/rabbit_dead_letter.erl
@@ -53,7 +53,7 @@ make_msg(Msg = #basic_message{content = Content,
_ -> {[RK], fun (H) -> lists:keydelete(<<"CC">>, 1, H) end}
end,
ReasonBin = list_to_binary(atom_to_list(Reason)),
- TimeSec = rabbit_misc:now_ms() div 1000,
+ TimeSec = time_compat:os_system_time(seconds),
PerMsgTTL = per_msg_ttl_header(Content#content.properties),
HeadersFun2 =
fun (Headers) ->
diff --git a/src/rabbit_error_logger.erl b/src/rabbit_error_logger.erl
index eecb2d64d9..425d171bae 100644
--- a/src/rabbit_error_logger.erl
+++ b/src/rabbit_error_logger.erl
@@ -101,7 +101,7 @@ publish(_Other, _Format, _Data, _State) ->
publish1(RoutingKey, Format, Data, LogExch) ->
%% 0-9-1 says the timestamp is a "64 bit POSIX timestamp". That's
%% second resolution, not millisecond.
- Timestamp = rabbit_misc:now_ms() div 1000,
+ Timestamp = time_compat:os_system_time(seconds),
Args = [truncate:term(A, ?LOG_TRUNC) || A <- Data],
{ok, _DeliveredQPids} =
diff --git a/src/rabbit_event.erl b/src/rabbit_event.erl
index 13bf6bc6f8..4d62dd079b 100644
--- a/src/rabbit_event.erl
+++ b/src/rabbit_event.erl
@@ -37,8 +37,7 @@
-type(event_type() :: atom()).
-type(event_props() :: term()).
--type(event_timestamp() ::
- {non_neg_integer(), non_neg_integer(), non_neg_integer()}).
+-type(event_timestamp() :: non_neg_integer()).
-type(event() :: #event { type :: event_type(),
props :: event_props(),
@@ -160,5 +159,5 @@ event_cons(Type, Props, Ref) ->
#event{type = Type,
props = Props,
reference = Ref,
- timestamp = os:timestamp()}.
+ timestamp = time_compat:os_system_time(milli_seconds)}.
diff --git a/src/rabbit_mirror_queue_mode_exactly.erl b/src/rabbit_mirror_queue_mode_exactly.erl
index 0c0b7a10e8..4721ad6136 100644
--- a/src/rabbit_mirror_queue_mode_exactly.erl
+++ b/src/rabbit_mirror_queue_mode_exactly.erl
@@ -45,8 +45,9 @@ suggested_queue_nodes(Count, MNode, SNodes, _SSNodes, Poss) ->
end}.
shuffle(L) ->
- {A1,A2,A3} = now(),
- random:seed(A1, A2, A3),
+ random:seed(erlang:phash2([node()]),
+ time_compat:monotonic_time(),
+ time_compat:unique_integer()),
{_, L1} = lists:unzip(lists:keysort(1, [{random:uniform(), N} || N <- L])),
L1.
diff --git a/src/rabbit_mirror_queue_sync.erl b/src/rabbit_mirror_queue_sync.erl
index 9a8d55f94b..b76422ee6b 100644
--- a/src/rabbit_mirror_queue_sync.erl
+++ b/src/rabbit_mirror_queue_sync.erl
@@ -100,7 +100,7 @@ master_go(Syncer, Ref, Log, HandleInfo, EmitStats, BQ, BQS) ->
master_go0(Args, BQ, BQS) ->
case BQ:fold(fun (Msg, MsgProps, Unacked, Acc) ->
master_send(Msg, MsgProps, Unacked, Args, Acc)
- end, {0, erlang:now()}, BQS) of
+ end, {0, time_compat:monotonic_time()}, BQS) of
{{shutdown, Reason}, BQS1} -> {shutdown, Reason, BQS1};
{{sync_died, Reason}, BQS1} -> {sync_died, Reason, BQS1};
{_, BQS1} -> master_done(Args, BQS1)
@@ -108,10 +108,12 @@ master_go0(Args, BQ, BQS) ->
master_send(Msg, MsgProps, Unacked,
{Syncer, Ref, Log, HandleInfo, EmitStats, Parent}, {I, Last}) ->
- T = case timer:now_diff(erlang:now(), Last) > ?SYNC_PROGRESS_INTERVAL of
+ Interval = time_compat:convert_time_unit(
+ time_compat:monotonic_time() - Last, native, micro_seconds),
+ T = case Interval > ?SYNC_PROGRESS_INTERVAL of
true -> EmitStats({syncing, I}),
Log("~p messages", [I]),
- erlang:now();
+ time_compat:monotonic_time();
false -> Last
end,
HandleInfo({syncing, I}),
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index ce563bf665..4f65cbfd7a 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -51,7 +51,6 @@
-export([dict_cons/3, orddict_cons/3, gb_trees_cons/3]).
-export([gb_trees_fold/3, gb_trees_foreach/2]).
-export([all_module_attributes/1, build_acyclic_graph/3]).
--export([now_ms/0]).
-export([const/1]).
-export([ntoa/1, ntoab/1]).
-export([is_process_alive/1]).
@@ -71,7 +70,6 @@
-export([get_parent/0]).
-export([store_proc_name/1, store_proc_name/2]).
-export([moving_average/4]).
--export([now_to_ms/1]).
-export([get_env/3]).
%% Horrible macro to use in guards
@@ -222,7 +220,6 @@
{'edge', ({bad_vertex, digraph:vertex()} |
{bad_edge, [digraph:vertex()]}),
digraph:vertex(), digraph:vertex()})).
--spec(now_ms/0 :: () -> non_neg_integer()).
-spec(const/1 :: (A) -> thunk(A)).
-spec(ntoa/1 :: (inet:ip_address()) -> string()).
-spec(ntoab/1 :: (inet:ip_address()) -> string()).
@@ -259,9 +256,6 @@
-spec(store_proc_name/1 :: (rabbit_types:proc_type_and_name()) -> ok).
-spec(moving_average/4 :: (float(), float(), float(), float() | 'undefined')
-> float()).
--spec(now_to_ms/1 :: ({non_neg_integer(),
- non_neg_integer(),
- non_neg_integer()}) -> pos_integer()).
-spec(get_env/3 :: (atom(), atom(), term()) -> term()).
-endif.
@@ -804,9 +798,6 @@ gb_trees_fold1(Fun, Acc, {Key, Val, It}) ->
gb_trees_foreach(Fun, Tree) ->
gb_trees_fold(fun (Key, Val, Acc) -> Fun(Key, Val), Acc end, ok, Tree).
-now_ms() ->
- timer:now_diff(now(), {0,0,0}) div 1000.
-
module_attributes(Module) ->
case catch Module:module_info(attributes) of
{'EXIT', {undef, [{Module, module_info, _} | _]}} ->
@@ -1038,9 +1029,6 @@ term_to_json(V) when is_binary(V) orelse is_number(V) orelse V =:= null orelse
V =:= true orelse V =:= false ->
V.
-now_to_ms({Mega, Sec, Micro}) ->
- (Mega * 1000000 * 1000000 + Sec * 1000000 + Micro) div 1000.
-
check_expiry(N) when N < 0 -> {error, {value_negative, N}};
check_expiry(_N) -> ok.
diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl
index e3960c5c8a..43db5431e0 100644
--- a/src/rabbit_node_monitor.erl
+++ b/src/rabbit_node_monitor.erl
@@ -288,24 +288,28 @@ workaround_global_hang() ->
receive
global_sync_done ->
ok
- after 15000 ->
+ after 10000 ->
find_blocked_global_peers()
end.
find_blocked_global_peers() ->
+ Snapshot1 = snapshot_global_dict(),
+ timer:sleep(10000),
+ Snapshot2 = snapshot_global_dict(),
+ find_blocked_global_peers1(Snapshot2, Snapshot1).
+
+snapshot_global_dict() ->
{status, _, _, [Dict | _]} = sys:get_status(global_name_server),
- find_blocked_global_peers1(Dict).
+ [E || {{sync_tag_his, _}, _} = E <- Dict].
-find_blocked_global_peers1([{{sync_tag_his, Peer}, Timestamp} | Rest]) ->
- Diff = timer:now_diff(erlang:now(), Timestamp),
- if
- Diff >= 10000 -> unblock_global_peer(Peer);
- true -> ok
+find_blocked_global_peers1([{{sync_tag_his, Peer}, _} = Item | Rest],
+ OlderSnapshot) ->
+ case lists:member(Item, OlderSnapshot) of
+ true -> unblock_global_peer(Peer);
+ false -> ok
end,
- find_blocked_global_peers1(Rest);
-find_blocked_global_peers1([_ | Rest]) ->
- find_blocked_global_peers1(Rest);
-find_blocked_global_peers1([]) ->
+ find_blocked_global_peers1(Rest, OlderSnapshot);
+find_blocked_global_peers1([], _) ->
ok.
unblock_global_peer(PeerNode) ->
diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl
index ae8481aaf8..34e447ccd3 100644
--- a/src/rabbit_queue_consumers.erl
+++ b/src/rabbit_queue_consumers.erl
@@ -99,7 +99,9 @@
%%----------------------------------------------------------------------------
new() -> #state{consumers = priority_queue:new(),
- use = {active, now_micros(), 1.0}}.
+ use = {active,
+ time_compat:monotonic_time(micro_seconds),
+ 1.0}}.
max_active_priority(#state{consumers = Consumers}) ->
priority_queue:highest(Consumers).
@@ -346,9 +348,9 @@ drain_mode(true) -> drain;
drain_mode(false) -> manual.
utilisation(#state{use = {active, Since, Avg}}) ->
- use_avg(now_micros() - Since, 0, Avg);
+ use_avg(time_compat:monotonic_time(micro_seconds) - Since, 0, Avg);
utilisation(#state{use = {inactive, Since, Active, Avg}}) ->
- use_avg(Active, now_micros() - Since, Avg).
+ use_avg(Active, time_compat:monotonic_time(micro_seconds) - Since, Avg).
%%----------------------------------------------------------------------------
@@ -455,14 +457,12 @@ update_use({inactive, _, _, _} = CUInfo, inactive) ->
update_use({active, _, _} = CUInfo, active) ->
CUInfo;
update_use({active, Since, Avg}, inactive) ->
- Now = now_micros(),
+ Now = time_compat:monotonic_time(micro_seconds),
{inactive, Now, Now - Since, Avg};
update_use({inactive, Since, Active, Avg}, active) ->
- Now = now_micros(),
+ Now = time_compat:monotonic_time(micro_seconds),
{active, Now, use_avg(Active, Now - Since, Avg)}.
use_avg(Active, Inactive, Avg) ->
Time = Inactive + Active,
rabbit_misc:moving_average(Time, ?USE_AVG_HALF_LIFE, Active / Time, Avg).
-
-now_micros() -> timer:now_diff(now(), {0,0,0}).
diff --git a/src/rabbit_queue_location_random.erl b/src/rabbit_queue_location_random.erl
index b708077cf8..6a3d1fea67 100644
--- a/src/rabbit_queue_location_random.erl
+++ b/src/rabbit_queue_location_random.erl
@@ -39,6 +39,6 @@ description() ->
queue_master_location(#amqqueue{}) ->
Cluster = rabbit_queue_master_location_misc:all_nodes(),
- RandomPos = erlang:phash(now(), length(Cluster)),
+ RandomPos = erlang:phash2(time_compat:monotonic_time(), length(Cluster)),
MasterNode = lists:nth(RandomPos, Cluster),
{ok, MasterNode}.
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index eaa2c0d3bb..c1cfb10c67 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -358,7 +358,8 @@ start_connection(Parent, HelperSup, Deb, Sock, SockTransform) ->
capabilities = [],
auth_mechanism = none,
auth_state = none,
- connected_at = rabbit_misc:now_to_ms(os:timestamp())},
+ connected_at = time_compat:os_system_time(
+ milli_seconds)},
callback = uninitialized_callback,
recv_len = 0,
pending_recv = false,
@@ -621,7 +622,8 @@ maybe_block(State = #v1{connection_state = blocking,
State1 = State#v1{connection_state = blocked,
throttle = update_last_blocked_by(
Throttle#throttle{
- last_blocked_at = os:timestamp()})},
+ last_blocked_at =
+ time_compat:monotonic_time()})},
case {blocked_by_alarm(State), blocked_by_alarm(State1)} of
{false, true} -> ok = send_blocked(State1);
{_, _} -> ok
@@ -1307,7 +1309,9 @@ i(state, #v1{connection_state = ConnectionState,
(credit_flow:blocked() %% throttled by flow now
orelse %% throttled by flow recently
(WasBlockedBy =:= flow andalso T =/= never andalso
- timer:now_diff(os:timestamp(), T) < 5000000)) of
+ time_compat:convert_time_unit(time_compat:monotonic_time() - T,
+ native,
+ micro_seconds) < 5000000)) of
true -> flow;
false -> ConnectionState
end;
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 213e600ef9..99ea33b973 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -792,7 +792,7 @@ update_rates(State = #vqstate{ in_counter = InCount,
ack_in = AckInRate,
ack_out = AckOutRate,
timestamp = TS }}) ->
- Now = erlang:now(),
+ Now = time_compat:monotonic_time(),
Rates = #rates { in = update_rate(Now, TS, InCount, InRate),
out = update_rate(Now, TS, OutCount, OutRate),
@@ -807,7 +807,8 @@ update_rates(State = #vqstate{ in_counter = InCount,
rates = Rates }.
update_rate(Now, TS, Count, Rate) ->
- Time = timer:now_diff(Now, TS) / ?MICROS_PER_SECOND,
+ Time = time_compat:convert_time_unit(Now - TS, native, micro_seconds) /
+ ?MICROS_PER_SECOND,
rabbit_misc:moving_average(Time, ?RATE_AVG_HALF_LIFE, Count / Time, Rate).
ram_duration(State) ->
@@ -1190,7 +1191,7 @@ init(IsDurable, IndexState, DeltaCount, DeltaBytes, Terms,
count = DeltaCount1,
end_seq_id = NextSeqId })
end,
- Now = now(),
+ Now = time_compat:monotonic_time(),
State = #vqstate {
q1 = ?QUEUE:new(),
q2 = ?QUEUE:new(),
diff --git a/src/supervisor2.erl b/src/supervisor2.erl
index 7b9421eb3e..c8ffbb12ea 100644
--- a/src/supervisor2.erl
+++ b/src/supervisor2.erl
@@ -1492,7 +1492,7 @@ add_restart(State) ->
I = State#state.intensity,
P = State#state.period,
R = State#state.restarts,
- Now = erlang:now(),
+ Now = time_compat:monotonic_time(),
R1 = add_restart([Now|R], Now, P),
State1 = State#state{restarts = R1},
case length(R1) of
@@ -1513,26 +1513,13 @@ add_restart([], _, _) ->
[].
inPeriod(Time, Now, Period) ->
- case difference(Time, Now) of
+ case time_compat:convert_time_unit(Now - Time, native, seconds) of
T when T > Period ->
false;
_ ->
true
end.
-%%
-%% Time = {MegaSecs, Secs, MicroSecs} (NOTE: MicroSecs is ignored)
-%% Calculate the time elapsed in seconds between two timestamps.
-%% If MegaSecs is equal just subtract Secs.
-%% Else calculate the Mega difference and add the Secs difference,
-%% note that Secs difference can be negative, e.g.
-%% {827, 999999, 676} diff {828, 1, 653753} == > 2 secs.
-%%
-difference({TimeM, TimeS, _}, {CurM, CurS, _}) when CurM > TimeM ->
- ((CurM - TimeM) * 1000000) + (CurS - TimeS);
-difference({_, TimeS, _}, {_, CurS, _}) ->
- CurS - TimeS.
-
%%% ------------------------------------------------------
%%% Error and progress reporting.
%%% ------------------------------------------------------
diff --git a/src/time_compat.erl b/src/time_compat.erl
new file mode 100644
index 0000000000..b87c6cc550
--- /dev/null
+++ b/src/time_compat.erl
@@ -0,0 +1,305 @@
+%%
+%% %CopyrightBegin%
+%%
+%% Copyright Ericsson AB 2014-2015. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%
+%% %CopyrightEnd%
+%%
+
+%%
+%% If your code need to be able to execute on ERTS versions both
+%% earlier and later than 7.0, the best approach is to use the new
+%% time API introduced in ERTS 7.0 and implement a fallback
+%% solution using the old primitives to be used on old ERTS
+%% versions. This way your code can automatically take advantage
+%% of the improvements in the API when available. This is an
+%% example of how to implement such an API, but it can be used
+%% as is if you want to. Just add (a preferrably renamed version of)
+%% this module to your project, and call the API via this module
+%% instead of calling the BIFs directly.
+%%
+
+-module(time_compat).
+
+%% We don't want warnings about the use of erlang:now/0 in
+%% this module.
+-compile(nowarn_deprecated_function).
+%%
+%% We don't use
+%% -compile({nowarn_deprecated_function, [{erlang, now, 0}]}).
+%% since this will produce warnings when compiled on systems
+%% where it has not yet been deprecated.
+%%
+
+-export([monotonic_time/0,
+ monotonic_time/1,
+ erlang_system_time/0,
+ erlang_system_time/1,
+ os_system_time/0,
+ os_system_time/1,
+ time_offset/0,
+ time_offset/1,
+ convert_time_unit/3,
+ timestamp/0,
+ unique_integer/0,
+ unique_integer/1,
+ monitor/2,
+ system_info/1,
+ system_flag/2]).
+
+monotonic_time() ->
+ try
+ erlang:monotonic_time()
+ catch
+ error:undef ->
+ %% Use Erlang system time as monotonic time
+ erlang_system_time_fallback()
+ end.
+
+monotonic_time(Unit) ->
+ try
+ erlang:monotonic_time(Unit)
+ catch
+ error:badarg ->
+ erlang:error(badarg, [Unit]);
+ error:undef ->
+ %% Use Erlang system time as monotonic time
+ STime = erlang_system_time_fallback(),
+ try
+ convert_time_unit_fallback(STime, native, Unit)
+ catch
+ error:bad_time_unit -> erlang:error(badarg, [Unit])
+ end
+ end.
+
+erlang_system_time() ->
+ try
+ erlang:system_time()
+ catch
+ error:undef ->
+ erlang_system_time_fallback()
+ end.
+
+erlang_system_time(Unit) ->
+ try
+ erlang:system_time(Unit)
+ catch
+ error:badarg ->
+ erlang:error(badarg, [Unit]);
+ error:undef ->
+ STime = erlang_system_time_fallback(),
+ try
+ convert_time_unit_fallback(STime, native, Unit)
+ catch
+ error:bad_time_unit -> erlang:error(badarg, [Unit])
+ end
+ end.
+
+os_system_time() ->
+ try
+ os:system_time()
+ catch
+ error:undef ->
+ os_system_time_fallback()
+ end.
+
+os_system_time(Unit) ->
+ try
+ os:system_time(Unit)
+ catch
+ error:badarg ->
+ erlang:error(badarg, [Unit]);
+ error:undef ->
+ STime = os_system_time_fallback(),
+ try
+ convert_time_unit_fallback(STime, native, Unit)
+ catch
+ error:bad_time_unit -> erlang:error(badarg, [Unit])
+ end
+ end.
+
+time_offset() ->
+ try
+ erlang:time_offset()
+ catch
+ error:undef ->
+ %% Erlang system time and Erlang monotonic
+ %% time are always aligned
+ 0
+ end.
+
+time_offset(Unit) ->
+ try
+ erlang:time_offset(Unit)
+ catch
+ error:badarg ->
+ erlang:error(badarg, [Unit]);
+ error:undef ->
+ try
+ _ = integer_time_unit(Unit)
+ catch
+ error:bad_time_unit -> erlang:error(badarg, [Unit])
+ end,
+ %% Erlang system time and Erlang monotonic
+ %% time are always aligned
+ 0
+ end.
+
+convert_time_unit(Time, FromUnit, ToUnit) ->
+ try
+ erlang:convert_time_unit(Time, FromUnit, ToUnit)
+ catch
+ error:undef ->
+ try
+ convert_time_unit_fallback(Time, FromUnit, ToUnit)
+ catch
+ _:_ ->
+ erlang:error(badarg, [Time, FromUnit, ToUnit])
+ end;
+ error:Error ->
+ erlang:error(Error, [Time, FromUnit, ToUnit])
+ end.
+
+timestamp() ->
+ try
+ erlang:timestamp()
+ catch
+ error:undef ->
+ erlang:now()
+ end.
+
+unique_integer() ->
+ try
+ erlang:unique_integer()
+ catch
+ error:undef ->
+ {MS, S, US} = erlang:now(),
+ (MS*1000000+S)*1000000+US
+ end.
+
+unique_integer(Modifiers) ->
+ try
+ erlang:unique_integer(Modifiers)
+ catch
+ error:badarg ->
+ erlang:error(badarg, [Modifiers]);
+ error:undef ->
+ case is_valid_modifier_list(Modifiers) of
+ true ->
+ %% now() converted to an integer
+ %% fullfill the requirements of
+ %% all modifiers: unique, positive,
+ %% and monotonic...
+ {MS, S, US} = erlang:now(),
+ (MS*1000000+S)*1000000+US;
+ false ->
+ erlang:error(badarg, [Modifiers])
+ end
+ end.
+
+monitor(Type, Item) ->
+ try
+ erlang:monitor(Type, Item)
+ catch
+ error:Error ->
+ case {Error, Type, Item} of
+ {badarg, time_offset, clock_service} ->
+ %% Time offset is final and will never change.
+ %% Return a dummy reference, there will never
+ %% be any need for 'CHANGE' messages...
+ make_ref();
+ _ ->
+ erlang:error(Error, [Type, Item])
+ end
+ end.
+
+system_info(Item) ->
+ try
+ erlang:system_info(Item)
+ catch
+ error:badarg ->
+ case Item of
+ time_correction ->
+ case erlang:system_info(tolerant_timeofday) of
+ enabled -> true;
+ disabled -> false
+ end;
+ time_warp_mode ->
+ no_time_warp;
+ time_offset ->
+ final;
+ NotSupArg when NotSupArg == os_monotonic_time_source;
+ NotSupArg == os_system_time_source;
+ NotSupArg == start_time;
+ NotSupArg == end_time ->
+ %% Cannot emulate this...
+ erlang:error(notsup, [NotSupArg]);
+ _ ->
+ erlang:error(badarg, [Item])
+ end;
+ error:Error ->
+ erlang:error(Error, [Item])
+ end.
+
+system_flag(Flag, Value) ->
+ try
+ erlang:system_flag(Flag, Value)
+ catch
+ error:Error ->
+ case {Error, Flag, Value} of
+ {badarg, time_offset, finalize} ->
+ %% Time offset is final
+ final;
+ _ ->
+ erlang:error(Error, [Flag, Value])
+ end
+ end.
+
+%%
+%% Internal functions
+%%
+
+integer_time_unit(native) -> 1000*1000;
+integer_time_unit(nano_seconds) -> 1000*1000*1000;
+integer_time_unit(micro_seconds) -> 1000*1000;
+integer_time_unit(milli_seconds) -> 1000;
+integer_time_unit(seconds) -> 1;
+integer_time_unit(I) when is_integer(I), I > 0 -> I;
+integer_time_unit(BadRes) -> erlang:error(bad_time_unit, [BadRes]).
+
+erlang_system_time_fallback() ->
+ {MS, S, US} = erlang:now(),
+ (MS*1000000+S)*1000000+US.
+
+os_system_time_fallback() ->
+ {MS, S, US} = os:timestamp(),
+ (MS*1000000+S)*1000000+US.
+
+convert_time_unit_fallback(Time, FromUnit, ToUnit) ->
+ FU = integer_time_unit(FromUnit),
+ TU = integer_time_unit(ToUnit),
+ case Time < 0 of
+ true -> TU*Time - (FU - 1);
+ false -> TU*Time
+ end div FU.
+
+is_valid_modifier_list([positive|Ms]) ->
+ is_valid_modifier_list(Ms);
+is_valid_modifier_list([monotonic|Ms]) ->
+ is_valid_modifier_list(Ms);
+is_valid_modifier_list([]) ->
+ true;
+is_valid_modifier_list(_) ->
+ false.
diff --git a/test/src/gm_soak_test.erl b/test/src/gm_soak_test.erl
index 32476b56bc..64baa035b7 100644
--- a/test/src/gm_soak_test.erl
+++ b/test/src/gm_soak_test.erl
@@ -35,9 +35,11 @@ with_state(Fun) ->
inc() ->
case 1 + get(count) of
- 100000 -> Now = now(),
+ 100000 -> Now = time_compat:monotonic_time(),
Start = put(ts, Now),
- Diff = timer:now_diff(Now, Start),
+ Diff = time_compat:convert_time_unit(Now - Start,
+ native,
+ micro_seconds),
Rate = 100000 / (Diff / 1000000),
io:format("~p seeing ~p msgs/sec~n", [self(), Rate]),
put(count, 0);
@@ -48,7 +50,7 @@ joined([], Members) ->
io:format("Joined ~p (~p members)~n", [self(), length(Members)]),
put(state, dict:from_list([{Member, empty} || Member <- Members])),
put(count, 0),
- put(ts, now()),
+ put(ts, time_compat:monotonic_time()),
ok.
members_changed([], Births, Deaths) ->
@@ -101,8 +103,9 @@ handle_terminate([], Reason) ->
spawn_member() ->
spawn_link(
fun () ->
- {MegaSecs, Secs, MicroSecs} = now(),
- random:seed(MegaSecs, Secs, MicroSecs),
+ random:seed(erlang:phash2([node()]),
+ time_compat:monotonic_time(),
+ time_compat:unique_integer()),
%% start up delay of no more than 10 seconds
timer:sleep(random:uniform(10000)),
{ok, Pid} = gm:start_link(
diff --git a/test/src/gm_speed_test.erl b/test/src/gm_speed_test.erl
index f11e7d487b..b0693fc136 100644
--- a/test/src/gm_speed_test.erl
+++ b/test/src/gm_speed_test.erl
@@ -49,12 +49,14 @@ wile_e_coyote(Time, WriteUnit) ->
receive joined -> ok end,
timer:sleep(1000), %% wait for all to join
timer:send_after(Time, stop),
- Start = now(),
+ Start = time_compat:monotonic_time(),
{Sent, Received} = loop(Pid, WriteUnit, 0, 0),
- End = now(),
+ End = time_compat:monotonic_time(),
ok = gm:leave(Pid),
receive terminated -> ok end,
- Elapsed = timer:now_diff(End, Start) / 1000000,
+ Elapsed = time_compat:convert_time_unit(End - Start,
+ native,
+ micro_seconds) / 1000000,
io:format("Sending rate: ~p msgs/sec~nReceiving rate: ~p msgs/sec~n~n",
[Sent/Elapsed, Received/Elapsed]),
ok.