summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJean-Sébastien Pédron <jean-sebastien@rabbitmq.com>2015-07-30 17:33:05 +0200
committerJean-Sébastien Pédron <jean-sebastien@rabbitmq.com>2015-08-05 16:16:38 +0200
commitd718d4c5af58418cad3b6e8d7747efef66d5f801 (patch)
treebd848d444fe64c46d7c4a9cd6d931a135fd9c791
parent40b5c82b4b0feeab28fa85c2f1855590b722fc08 (diff)
downloadrabbitmq-server-git-d718d4c5af58418cad3b6e8d7747efef66d5f801.tar.gz
Use monotonic_time() to compute elapsed time
... not `erlang:now()` (deprecated) or `os:timestamp()` (which could go backward). file_handle_cache: While here, change the LRU tree key from `erlang:now()` to a combination of the monotonic time and the handle reference (which was the value in the tree previously). This is required as the monotonic time doesn't increase strictly: two subsequent calls could return the same time, leading to non-unique key. We could use unique_integer([monotonic]) but it's a bit heavy weight just to keep a rough order of opened files. So now, files sharing the same age are arbitrary "sorted" by their reference. References #233.
-rw-r--r--src/credit_flow.erl7
-rw-r--r--src/file_handle_cache.erl38
-rw-r--r--src/file_handle_cache_stats.erl7
-rw-r--r--src/gen_server2.erl10
-rw-r--r--src/rabbit.erl6
-rw-r--r--src/rabbit_mirror_queue_sync.erl8
-rw-r--r--src/rabbit_queue_consumers.erl14
-rw-r--r--src/rabbit_reader.erl7
-rw-r--r--src/rabbit_variable_queue.erl7
-rw-r--r--src/supervisor2.erl17
10 files changed, 66 insertions, 55 deletions
diff --git a/src/credit_flow.erl b/src/credit_flow.erl
index edf9805c53..afd97ce96c 100644
--- a/src/credit_flow.erl
+++ b/src/credit_flow.erl
@@ -150,7 +150,10 @@ state() -> case blocked() of
true -> flow;
false -> case get(credit_blocked_at) of
undefined -> running;
- B -> Diff = timer:now_diff(os:timestamp(), B),
+ B -> Now = time_compat:monotonic_time(),
+ Diff = time_compat:convert_time_unit(Now - B,
+ native,
+ micro_seconds),
case Diff < ?STATE_CHANGE_INTERVAL of
true -> flow;
false -> running
@@ -179,7 +182,7 @@ grant(To, Quantity) ->
block(From) ->
?TRACE_BLOCKED(self(), From),
case blocked() of
- false -> put(credit_blocked_at, os:timestamp());
+ false -> put(credit_blocked_at, time_compat:monotonic_time());
true -> ok
end,
?UPDATE(credit_blocked, [], Blocks, [From | Blocks]).
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl
index d7e5abc873..1e97632cf9 100644
--- a/src/file_handle_cache.erl
+++ b/src/file_handle_cache.erl
@@ -175,6 +175,7 @@
-record(handle,
{ hdl,
+ ref,
offset,
is_dirty,
write_buffer_size,
@@ -536,12 +537,15 @@ clear(Ref) ->
end).
set_maximum_since_use(MaximumAge) ->
- Now = now(),
+ Now = time_compat:monotonic_time(),
case lists:foldl(
fun ({{Ref, fhc_handle},
Handle = #handle { hdl = Hdl, last_used_at = Then }}, Rep) ->
case Hdl =/= closed andalso
- timer:now_diff(Now, Then) >= MaximumAge of
+ time_compat:convert_time_unit(Now - Then,
+ native,
+ micro_seconds)
+ >= MaximumAge of
true -> soft_close(Ref, Handle) orelse Rep;
false -> Rep
end;
@@ -708,7 +712,8 @@ get_or_reopen(RefNewOrReopens) ->
{OpenHdls, []} ->
{ok, [Handle || {_Ref, Handle} <- OpenHdls]};
{OpenHdls, ClosedHdls} ->
- Oldest = oldest(get_age_tree(), fun () -> now() end),
+ Oldest = oldest(get_age_tree(),
+ fun () -> time_compat:monotonic_time() end),
case gen_server2:call(?SERVER, {open, self(), length(ClosedHdls),
Oldest}, infinity) of
ok ->
@@ -744,14 +749,14 @@ reopen([{Ref, NewOrReopen, Handle = #handle { hdl = closed,
end,
case prim_file:open(Path, Mode) of
{ok, Hdl} ->
- Now = now(),
+ Now = time_compat:monotonic_time(),
{{ok, _Offset}, Handle1} =
maybe_seek(Offset, reset_read_buffer(
Handle#handle{hdl = Hdl,
offset = 0,
last_used_at = Now})),
put({Ref, fhc_handle}, Handle1),
- reopen(RefNewOrReopenHdls, gb_trees:insert(Now, Ref, Tree),
+ reopen(RefNewOrReopenHdls, gb_trees:insert({Now, Ref}, true, Tree),
[{Ref, Handle1} | RefHdls]);
Error ->
%% NB: none of the handles in ToOpen are in the age tree
@@ -780,7 +785,7 @@ sort_handles([{Ref, _} | RefHdls], RefHdlsA, [{Ref, Handle} | RefHdlsB], Acc) ->
sort_handles(RefHdls, RefHdlsA, RefHdlsB, [Handle | Acc]).
put_handle(Ref, Handle = #handle { last_used_at = Then }) ->
- Now = now(),
+ Now = time_compat:monotonic_time(),
age_tree_update(Then, Now, Ref),
put({Ref, fhc_handle}, Handle #handle { last_used_at = Now }).
@@ -797,13 +802,14 @@ put_age_tree(Tree) -> put(fhc_age_tree, Tree).
age_tree_update(Then, Now, Ref) ->
with_age_tree(
fun (Tree) ->
- gb_trees:insert(Now, Ref, gb_trees:delete_any(Then, Tree))
+ gb_trees:insert({Now, Ref}, true,
+ gb_trees:delete_any({Then, Ref}, Tree))
end).
-age_tree_delete(Then) ->
+age_tree_delete(Then, Ref) ->
with_age_tree(
fun (Tree) ->
- Tree1 = gb_trees:delete_any(Then, Tree),
+ Tree1 = gb_trees:delete_any({Then, Ref}, Tree),
Oldest = oldest(Tree1, fun () -> undefined end),
gen_server2:cast(?SERVER, {close, self(), Oldest}),
Tree1
@@ -814,7 +820,7 @@ age_tree_change() ->
fun (Tree) ->
case gb_trees:is_empty(Tree) of
true -> Tree;
- false -> {Oldest, _Ref} = gb_trees:smallest(Tree),
+ false -> {{Oldest, _Ref}, _} = gb_trees:smallest(Tree),
gen_server2:cast(?SERVER, {update, self(), Oldest})
end,
Tree
@@ -823,7 +829,7 @@ age_tree_change() ->
oldest(Tree, DefaultFun) ->
case gb_trees:is_empty(Tree) of
true -> DefaultFun();
- false -> {Oldest, _Ref} = gb_trees:smallest(Tree),
+ false -> {{Oldest, _Ref}, _} = gb_trees:smallest(Tree),
Oldest
end.
@@ -849,6 +855,7 @@ new_closed_handle(Path, Mode, Options) ->
end,
Ref = make_ref(),
put({Ref, fhc_handle}, #handle { hdl = closed,
+ ref = Ref,
offset = 0,
is_dirty = false,
write_buffer_size = 0,
@@ -883,6 +890,7 @@ soft_close(Handle = #handle { hdl = closed }) ->
soft_close(Handle) ->
case write_buffer(Handle) of
{ok, #handle { hdl = Hdl,
+ ref = Ref,
is_dirty = IsDirty,
last_used_at = Then } = Handle1 } ->
ok = case IsDirty of
@@ -890,7 +898,7 @@ soft_close(Handle) ->
false -> ok
end,
ok = prim_file:close(Hdl),
- age_tree_delete(Then),
+ age_tree_delete(Then, Ref),
{ok, Handle1 #handle { hdl = closed,
is_dirty = false,
last_used_at = undefined }};
@@ -1419,17 +1427,19 @@ reduce(State = #fhc_state { open_pending = OpenPending,
elders = Elders,
clients = Clients,
timer_ref = TRef }) ->
- Now = now(),
+ Now = time_compat:monotonic_time(),
{CStates, Sum, ClientCount} =
ets:foldl(fun ({Pid, Eldest}, {CStatesAcc, SumAcc, CountAcc} = Accs) ->
[#cstate { pending_closes = PendingCloses,
opened = Opened,
blocked = Blocked } = CState] =
ets:lookup(Clients, Pid),
+ TimeDiff = time_compat:convert_time_unit(
+ Now - Eldest, native, micro_seconds),
case Blocked orelse PendingCloses =:= Opened of
true -> Accs;
false -> {[CState | CStatesAcc],
- SumAcc + timer:now_diff(Now, Eldest),
+ SumAcc + TimeDiff,
CountAcc + 1}
end
end, {[], 0, 0}, Elders),
diff --git a/src/file_handle_cache_stats.erl b/src/file_handle_cache_stats.erl
index 5f6926b5d2..5dcf19c5f1 100644
--- a/src/file_handle_cache_stats.erl
+++ b/src/file_handle_cache_stats.erl
@@ -61,7 +61,8 @@ get() ->
%% TODO timer:tc/1 was introduced in R14B03; use that function once we
%% require that version.
timer_tc(Thunk) ->
- T1 = os:timestamp(),
+ T1 = time_compat:monotonic_time(),
Res = Thunk(),
- T2 = os:timestamp(),
- {timer:now_diff(T2, T1), Res}.
+ T2 = time_compat:monotonic_time(),
+ Diff = time_compat:convert_time_unit(T2 - T1, native, micro_seconds),
+ {Diff, Res}.
diff --git a/src/gen_server2.erl b/src/gen_server2.erl
index 3f09b8d3ef..56b2720cac 100644
--- a/src/gen_server2.erl
+++ b/src/gen_server2.erl
@@ -691,7 +691,9 @@ wake_hib(GS2State = #gs2_state { timeout_state = TS }) ->
undefined ->
undefined;
{SleptAt, TimeoutState} ->
- adjust_timeout_state(SleptAt, now(), TimeoutState)
+ adjust_timeout_state(SleptAt,
+ time_compat:monotonic_time(),
+ TimeoutState)
end,
post_hibernate(
drain(GS2State #gs2_state { timeout_state = TimeoutState1 })).
@@ -699,7 +701,8 @@ wake_hib(GS2State = #gs2_state { timeout_state = TS }) ->
hibernate(GS2State = #gs2_state { timeout_state = TimeoutState }) ->
TS = case TimeoutState of
undefined -> undefined;
- {backoff, _, _, _, _} -> {now(), TimeoutState}
+ {backoff, _, _, _, _} -> {time_compat:monotonic_time(),
+ TimeoutState}
end,
proc_lib:hibernate(?MODULE, wake_hib,
[GS2State #gs2_state { timeout_state = TS }]).
@@ -744,7 +747,8 @@ post_hibernate(GS2State = #gs2_state { state = State,
adjust_timeout_state(SleptAt, AwokeAt, {backoff, CurrentTO, MinimumTO,
DesiredHibPeriod, RandomState}) ->
- NapLengthMicros = timer:now_diff(AwokeAt, SleptAt),
+ NapLengthMicros = time_compat:convert_time_unit(AwokeAt - SleptAt,
+ native, micro_seconds),
CurrentMicros = CurrentTO * 1000,
MinimumMicros = MinimumTO * 1000,
DesiredHibMicros = DesiredHibPeriod * 1000,
diff --git a/src/rabbit.erl b/src/rabbit.erl
index d218a07a98..5152c11eeb 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -277,7 +277,7 @@ hipe_compile() ->
Count = length(HipeModules),
io:format("~nHiPE compiling: |~s|~n |",
[string:copies("-", Count)]),
- T1 = erlang:now(),
+ T1 = time_compat:monotonic_time(),
PidMRefs = [spawn_monitor(fun () -> [begin
{ok, M} = hipe:c(M, [o3]),
io:format("#")
@@ -288,8 +288,8 @@ hipe_compile() ->
{'DOWN', MRef, process, _, normal} -> ok;
{'DOWN', MRef, process, _, Reason} -> exit(Reason)
end || {_Pid, MRef} <- PidMRefs],
- T2 = erlang:now(),
- Duration = timer:now_diff(T2, T1) div 1000000,
+ T2 = time_compat:monotonic_time(),
+ Duration = time_compat:convert_time_unit(T2 - T1, native, seconds),
io:format("|~n~nCompiled ~B modules in ~Bs~n", [Count, Duration]),
{ok, Count, Duration}.
diff --git a/src/rabbit_mirror_queue_sync.erl b/src/rabbit_mirror_queue_sync.erl
index 9a8d55f94b..b76422ee6b 100644
--- a/src/rabbit_mirror_queue_sync.erl
+++ b/src/rabbit_mirror_queue_sync.erl
@@ -100,7 +100,7 @@ master_go(Syncer, Ref, Log, HandleInfo, EmitStats, BQ, BQS) ->
master_go0(Args, BQ, BQS) ->
case BQ:fold(fun (Msg, MsgProps, Unacked, Acc) ->
master_send(Msg, MsgProps, Unacked, Args, Acc)
- end, {0, erlang:now()}, BQS) of
+ end, {0, time_compat:monotonic_time()}, BQS) of
{{shutdown, Reason}, BQS1} -> {shutdown, Reason, BQS1};
{{sync_died, Reason}, BQS1} -> {sync_died, Reason, BQS1};
{_, BQS1} -> master_done(Args, BQS1)
@@ -108,10 +108,12 @@ master_go0(Args, BQ, BQS) ->
master_send(Msg, MsgProps, Unacked,
{Syncer, Ref, Log, HandleInfo, EmitStats, Parent}, {I, Last}) ->
- T = case timer:now_diff(erlang:now(), Last) > ?SYNC_PROGRESS_INTERVAL of
+ Interval = time_compat:convert_time_unit(
+ time_compat:monotonic_time() - Last, native, micro_seconds),
+ T = case Interval > ?SYNC_PROGRESS_INTERVAL of
true -> EmitStats({syncing, I}),
Log("~p messages", [I]),
- erlang:now();
+ time_compat:monotonic_time();
false -> Last
end,
HandleInfo({syncing, I}),
diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl
index ae8481aaf8..34e447ccd3 100644
--- a/src/rabbit_queue_consumers.erl
+++ b/src/rabbit_queue_consumers.erl
@@ -99,7 +99,9 @@
%%----------------------------------------------------------------------------
new() -> #state{consumers = priority_queue:new(),
- use = {active, now_micros(), 1.0}}.
+ use = {active,
+ time_compat:monotonic_time(micro_seconds),
+ 1.0}}.
max_active_priority(#state{consumers = Consumers}) ->
priority_queue:highest(Consumers).
@@ -346,9 +348,9 @@ drain_mode(true) -> drain;
drain_mode(false) -> manual.
utilisation(#state{use = {active, Since, Avg}}) ->
- use_avg(now_micros() - Since, 0, Avg);
+ use_avg(time_compat:monotonic_time(micro_seconds) - Since, 0, Avg);
utilisation(#state{use = {inactive, Since, Active, Avg}}) ->
- use_avg(Active, now_micros() - Since, Avg).
+ use_avg(Active, time_compat:monotonic_time(micro_seconds) - Since, Avg).
%%----------------------------------------------------------------------------
@@ -455,14 +457,12 @@ update_use({inactive, _, _, _} = CUInfo, inactive) ->
update_use({active, _, _} = CUInfo, active) ->
CUInfo;
update_use({active, Since, Avg}, inactive) ->
- Now = now_micros(),
+ Now = time_compat:monotonic_time(micro_seconds),
{inactive, Now, Now - Since, Avg};
update_use({inactive, Since, Active, Avg}, active) ->
- Now = now_micros(),
+ Now = time_compat:monotonic_time(micro_seconds),
{active, Now, use_avg(Active, Now - Since, Avg)}.
use_avg(Active, Inactive, Avg) ->
Time = Inactive + Active,
rabbit_misc:moving_average(Time, ?USE_AVG_HALF_LIFE, Active / Time, Avg).
-
-now_micros() -> timer:now_diff(now(), {0,0,0}).
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index eaa2c0d3bb..4707d47b9c 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -621,7 +621,8 @@ maybe_block(State = #v1{connection_state = blocking,
State1 = State#v1{connection_state = blocked,
throttle = update_last_blocked_by(
Throttle#throttle{
- last_blocked_at = os:timestamp()})},
+ last_blocked_at =
+ time_compat:monotonic_time()})},
case {blocked_by_alarm(State), blocked_by_alarm(State1)} of
{false, true} -> ok = send_blocked(State1);
{_, _} -> ok
@@ -1307,7 +1308,9 @@ i(state, #v1{connection_state = ConnectionState,
(credit_flow:blocked() %% throttled by flow now
orelse %% throttled by flow recently
(WasBlockedBy =:= flow andalso T =/= never andalso
- timer:now_diff(os:timestamp(), T) < 5000000)) of
+ time_compat:convert_time_unit(time_compat:monotonic_time() - T,
+ native,
+ micro_seconds) < 5000000)) of
true -> flow;
false -> ConnectionState
end;
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 213e600ef9..99ea33b973 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -792,7 +792,7 @@ update_rates(State = #vqstate{ in_counter = InCount,
ack_in = AckInRate,
ack_out = AckOutRate,
timestamp = TS }}) ->
- Now = erlang:now(),
+ Now = time_compat:monotonic_time(),
Rates = #rates { in = update_rate(Now, TS, InCount, InRate),
out = update_rate(Now, TS, OutCount, OutRate),
@@ -807,7 +807,8 @@ update_rates(State = #vqstate{ in_counter = InCount,
rates = Rates }.
update_rate(Now, TS, Count, Rate) ->
- Time = timer:now_diff(Now, TS) / ?MICROS_PER_SECOND,
+ Time = time_compat:convert_time_unit(Now - TS, native, micro_seconds) /
+ ?MICROS_PER_SECOND,
rabbit_misc:moving_average(Time, ?RATE_AVG_HALF_LIFE, Count / Time, Rate).
ram_duration(State) ->
@@ -1190,7 +1191,7 @@ init(IsDurable, IndexState, DeltaCount, DeltaBytes, Terms,
count = DeltaCount1,
end_seq_id = NextSeqId })
end,
- Now = now(),
+ Now = time_compat:monotonic_time(),
State = #vqstate {
q1 = ?QUEUE:new(),
q2 = ?QUEUE:new(),
diff --git a/src/supervisor2.erl b/src/supervisor2.erl
index 7b9421eb3e..c8ffbb12ea 100644
--- a/src/supervisor2.erl
+++ b/src/supervisor2.erl
@@ -1492,7 +1492,7 @@ add_restart(State) ->
I = State#state.intensity,
P = State#state.period,
R = State#state.restarts,
- Now = erlang:now(),
+ Now = time_compat:monotonic_time(),
R1 = add_restart([Now|R], Now, P),
State1 = State#state{restarts = R1},
case length(R1) of
@@ -1513,26 +1513,13 @@ add_restart([], _, _) ->
[].
inPeriod(Time, Now, Period) ->
- case difference(Time, Now) of
+ case time_compat:convert_time_unit(Now - Time, native, seconds) of
T when T > Period ->
false;
_ ->
true
end.
-%%
-%% Time = {MegaSecs, Secs, MicroSecs} (NOTE: MicroSecs is ignored)
-%% Calculate the time elapsed in seconds between two timestamps.
-%% If MegaSecs is equal just subtract Secs.
-%% Else calculate the Mega difference and add the Secs difference,
-%% note that Secs difference can be negative, e.g.
-%% {827, 999999, 676} diff {828, 1, 653753} == > 2 secs.
-%%
-difference({TimeM, TimeS, _}, {CurM, CurS, _}) when CurM > TimeM ->
- ((CurM - TimeM) * 1000000) + (CurS - TimeS);
-difference({_, TimeS, _}, {_, CurS, _}) ->
- CurS - TimeS.
-
%%% ------------------------------------------------------
%%% Error and progress reporting.
%%% ------------------------------------------------------