diff options
| -rw-r--r-- | include/rabbit.hrl | 3 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_misc.erl | 29 | ||||
| -rw-r--r-- | src/rabbit_policies.erl | 4 |
4 files changed, 26 insertions, 14 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl index 5ac3197ed9..ee1da6b2ea 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -105,9 +105,6 @@ -define(DESIRED_HIBERNATE, 10000). -define(CREDIT_DISC_BOUND, {2000, 500}). -%% This is dictated by `erlang:send_after' on which we depend to implement TTL. --define(MAX_EXPIRY_TIMER, 4294967295). - -define(INVALID_HEADERS_KEY, <<"x-invalid-headers">>). -define(ROUTING_HEADERS, [<<"CC">>, <<"BCC">>]). -define(DELETED_HEADER, <<"BCC">>). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 9b785303a4..753d8e15c2 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -385,12 +385,12 @@ ensure_ttl_timer(Expiry, State = #q{ttl_timer_ref = undefined, V when V > 0 -> V + 999; %% always fire later _ -> 0 end) div 1000, - TRef = erlang:send_after(After, self(), {drop_expired, Version}), + TRef = rabbit_misc:send_after(After, self(), {drop_expired, Version}), State#q{ttl_timer_ref = TRef, ttl_timer_expiry = Expiry}; ensure_ttl_timer(Expiry, State = #q{ttl_timer_ref = TRef, ttl_timer_expiry = TExpiry}) when Expiry + 1000 < TExpiry -> - case erlang:cancel_timer(TRef) of + case rabbit_misc:cancel_timer(TRef) of false -> State; _ -> ensure_ttl_timer(Expiry, State#q{ttl_timer_ref = undefined}) end; diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 18c07f86f1..cff16040b4 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -67,7 +67,7 @@ -export([check_expiry/1]). -export([base64url/1]). -export([interval_operation/4]). --export([ensure_timer/4, stop_timer/2]). +-export([ensure_timer/4, stop_timer/2, send_after/3, cancel_timer/1]). -export([get_parent/0]). -export([store_proc_name/1, store_proc_name/2]). -export([moving_average/4]). @@ -94,6 +94,7 @@ fun ((atom(), [term()]) -> [{digraph:vertex(), digraph_label()}])). -type(graph_edge_fun() :: fun ((atom(), [term()]) -> [{digraph:vertex(), digraph:vertex()}])). +-type(tref() :: {'erlang', reference()} | {timer, timer:tref()}). -spec(method_record_type/1 :: (rabbit_framing:amqp_method_record()) -> rabbit_framing:amqp_method_name()). @@ -245,6 +246,8 @@ -> {any(), non_neg_integer()}). -spec(ensure_timer/4 :: (A, non_neg_integer(), non_neg_integer(), any()) -> A). -spec(stop_timer/2 :: (A, non_neg_integer()) -> A). +-spec(send_after/3 :: (non_neg_integer(), pid(), any()) -> tref()). +-spec(cancel_timer/1 :: (tref()) -> 'ok'). -spec(get_parent/0 :: () -> pid()). -spec(store_proc_name/2 :: (atom(), rabbit_types:proc_name()) -> ok). -spec(store_proc_name/1 :: (rabbit_types:proc_type_and_name()) -> ok). @@ -1012,7 +1015,6 @@ term_to_json(V) when is_binary(V) orelse is_number(V) orelse V =:= null orelse V =:= true orelse V =:= false -> V. -check_expiry(N) when N > ?MAX_EXPIRY_TIMER -> {error, {value_too_big, N}}; check_expiry(N) when N < 0 -> {error, {value_negative, N}}; check_expiry(_N) -> ok. @@ -1040,7 +1042,7 @@ interval_operation({M, F, A}, MaxRatio, IdealInterval, LastInterval) -> ensure_timer(State, Idx, After, Msg) -> case element(Idx, State) of - undefined -> TRef = erlang:send_after(After, self(), Msg), + undefined -> TRef = send_after(After, self(), Msg), setelement(Idx, State, TRef); _ -> State end. @@ -1048,12 +1050,25 @@ ensure_timer(State, Idx, After, Msg) -> stop_timer(State, Idx) -> case element(Idx, State) of undefined -> State; - TRef -> case erlang:cancel_timer(TRef) of - false -> State; - _ -> setelement(Idx, State, undefined) - end + TRef -> cancel_timer(TRef), + setelement(Idx, State, undefined) end. +%% timer:send_after/3 goes through a single timer process but allows +%% long delays. erlang:send_after/3 does not have a bottleneck but +%% only allows max 2^32-1 millis. +-define(MAX_ERLANG_SEND_AFTER, 4294967295). +send_after(Millis, Pid, Msg) when Millis > ?MAX_ERLANG_SEND_AFTER -> + {ok, Ref} = timer:send_after(Millis, Pid, Msg), + {timer, Ref}; +send_after(Millis, Pid, Msg) -> + {erlang, erlang:send_after(Millis, Pid, Msg)}. + +cancel_timer({erlang, Ref}) -> erlang:cancel_timer(Ref), + ok; +cancel_timer({timer, Ref}) -> {ok, cancel} = timer:cancel(Ref), + ok. + store_proc_name(Type, ProcName) -> store_proc_name({Type, ProcName}). store_proc_name(TypeProcName) -> put(process_name, TypeProcName). diff --git a/src/rabbit_policies.erl b/src/rabbit_policies.erl index fe2b766f30..3558cf98df 100644 --- a/src/rabbit_policies.erl +++ b/src/rabbit_policies.erl @@ -61,13 +61,13 @@ validate_policy0(<<"dead-letter-routing-key">>, Value) -> {error, "~p is not a valid dead letter routing key", [Value]}; validate_policy0(<<"message-ttl">>, Value) - when is_integer(Value), Value >= 0, Value =< ?MAX_EXPIRY_TIMER -> + when is_integer(Value), Value >= 0 -> ok; validate_policy0(<<"message-ttl">>, Value) -> {error, "~p is not a valid message TTL", [Value]}; validate_policy0(<<"expires">>, Value) - when is_integer(Value), Value >= 1, Value =< ?MAX_EXPIRY_TIMER -> + when is_integer(Value), Value >= 1 -> ok; validate_policy0(<<"expires">>, Value) -> {error, "~p is not a valid queue expiry", [Value]}; |
