summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--include/rabbit.hrl3
-rw-r--r--src/rabbit_amqqueue_process.erl4
-rw-r--r--src/rabbit_misc.erl25
-rw-r--r--src/rabbit_policies.erl4
4 files changed, 25 insertions, 11 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index 44f0931eef..3629dd439f 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -105,9 +105,6 @@
-define(DESIRED_HIBERNATE, 10000).
-define(CREDIT_DISC_BOUND, {2000, 500}).
-%% This is dictated by `erlang:send_after' on which we depend to implement TTL.
--define(MAX_EXPIRY_TIMER, 4294967295).
-
-define(INVALID_HEADERS_KEY, <<"x-invalid-headers">>).
-define(ROUTING_HEADERS, [<<"CC">>, <<"BCC">>]).
-define(DELETED_HEADER, <<"BCC">>).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 9b785303a4..753d8e15c2 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -385,12 +385,12 @@ ensure_ttl_timer(Expiry, State = #q{ttl_timer_ref = undefined,
V when V > 0 -> V + 999; %% always fire later
_ -> 0
end) div 1000,
- TRef = erlang:send_after(After, self(), {drop_expired, Version}),
+ TRef = rabbit_misc:send_after(After, self(), {drop_expired, Version}),
State#q{ttl_timer_ref = TRef, ttl_timer_expiry = Expiry};
ensure_ttl_timer(Expiry, State = #q{ttl_timer_ref = TRef,
ttl_timer_expiry = TExpiry})
when Expiry + 1000 < TExpiry ->
- case erlang:cancel_timer(TRef) of
+ case rabbit_misc:cancel_timer(TRef) of
false -> State;
_ -> ensure_ttl_timer(Expiry, State#q{ttl_timer_ref = undefined})
end;
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 58e93a3f9e..6f10b43bef 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -67,7 +67,7 @@
-export([check_expiry/1]).
-export([base64url/1]).
-export([interval_operation/4]).
--export([ensure_timer/4, stop_timer/2]).
+-export([ensure_timer/4, stop_timer/2, send_after/3]).
-export([get_parent/0]).
-export([store_proc_name/1, store_proc_name/2]).
-export([moving_average/4]).
@@ -94,6 +94,7 @@
fun ((atom(), [term()]) -> [{digraph:vertex(), digraph_label()}])).
-type(graph_edge_fun() ::
fun ((atom(), [term()]) -> [{digraph:vertex(), digraph:vertex()}])).
+-type(tref() :: {'erlang', reference()} | {timer, timer:tref()}).
-spec(method_record_type/1 :: (rabbit_framing:amqp_method_record())
-> rabbit_framing:amqp_method_name()).
@@ -245,6 +246,8 @@
-> {any(), non_neg_integer()}).
-spec(ensure_timer/4 :: (A, non_neg_integer(), non_neg_integer(), any()) -> A).
-spec(stop_timer/2 :: (A, non_neg_integer()) -> A).
+-spec(send_after/3 :: (non_neg_integer(), pid(), any()) -> tref()).
+-spec(cancel_timer/1 :: (tref()) -> 'ok').
-spec(get_parent/0 :: () -> pid()).
-spec(store_proc_name/2 :: (atom(), rabbit_types:proc_name()) -> ok).
-spec(store_proc_name/1 :: (rabbit_types:proc_type_and_name()) -> ok).
@@ -1012,7 +1015,6 @@ term_to_json(V) when is_binary(V) orelse is_number(V) orelse V =:= null orelse
V =:= true orelse V =:= false ->
V.
-check_expiry(N) when N > ?MAX_EXPIRY_TIMER -> {error, {value_too_big, N}};
check_expiry(N) when N < 0 -> {error, {value_negative, N}};
check_expiry(_N) -> ok.
@@ -1040,7 +1042,7 @@ interval_operation({M, F, A}, MaxRatio, IdealInterval, LastInterval) ->
ensure_timer(State, Idx, After, Msg) ->
case element(Idx, State) of
- undefined -> TRef = erlang:send_after(After, self(), Msg),
+ undefined -> TRef = send_after(After, self(), Msg),
setelement(Idx, State, TRef);
_ -> State
end.
@@ -1048,12 +1050,27 @@ ensure_timer(State, Idx, After, Msg) ->
stop_timer(State, Idx) ->
case element(Idx, State) of
undefined -> State;
- TRef -> case erlang:cancel_timer(TRef) of
+ TRef -> case cancel_timer(TRef) of %% TODO bug 25393 comment 3
false -> State;
_ -> setelement(Idx, State, undefined)
end
end.
+%% timer:send_after/3 goes through a single timer process but allows
+%% long delays. erlang:send_after/3 does not have a bottleneck but
+%% only allows max 2^32-1 millis.
+-define(MAX_ERLANG_SEND_AFTER, 4294967295).
+send_after(Millis, Pid, Msg) when Millis > ?MAX_ERLANG_SEND_AFTER ->
+ {ok, Ref} = timer:send_after(Millis, Pid, Msg),
+ {timer, Ref};
+send_after(Millis, Pid, Msg) ->
+ {erlang, erlang:send_after(Millis, Pid, Msg)}.
+
+cancel_timer({erlang, Ref}) -> erlang:cancel_timer(Ref),
+ ok;
+cancel_timer({timer, Ref}) -> {ok, cancel} = timer:cancel(Ref),
+ ok.
+
store_proc_name(Type, ProcName) -> store_proc_name({Type, ProcName}).
store_proc_name(TypeProcName) -> put(process_name, TypeProcName).
diff --git a/src/rabbit_policies.erl b/src/rabbit_policies.erl
index fe2b766f30..3558cf98df 100644
--- a/src/rabbit_policies.erl
+++ b/src/rabbit_policies.erl
@@ -61,13 +61,13 @@ validate_policy0(<<"dead-letter-routing-key">>, Value) ->
{error, "~p is not a valid dead letter routing key", [Value]};
validate_policy0(<<"message-ttl">>, Value)
- when is_integer(Value), Value >= 0, Value =< ?MAX_EXPIRY_TIMER ->
+ when is_integer(Value), Value >= 0 ->
ok;
validate_policy0(<<"message-ttl">>, Value) ->
{error, "~p is not a valid message TTL", [Value]};
validate_policy0(<<"expires">>, Value)
- when is_integer(Value), Value >= 1, Value =< ?MAX_EXPIRY_TIMER ->
+ when is_integer(Value), Value >= 1 ->
ok;
validate_policy0(<<"expires">>, Value) ->
{error, "~p is not a valid queue expiry", [Value]};