summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2013-09-06 16:37:20 +0100
committerSimon MacMullen <simon@rabbitmq.com>2013-09-06 16:37:20 +0100
commitbee1ea63ef56dd2382dde0240b4f2bb38edcdf3d (patch)
treeacf557a97c86d22ddd5bf5cbcf40d8e5cc863bc7 /src
parent9d5967ac9e273d757d39f277185474d23f3f08ce (diff)
downloadrabbitmq-server-git-bee1ea63ef56dd2382dde0240b4f2bb38edcdf3d.tar.gz
Hook TTL / max length / expiry into policy system.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue_process.erl61
1 files changed, 33 insertions, 28 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index f5c6cf85b4..9c5b5fa205 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -263,40 +263,45 @@ recovery_barrier(BarrierPid) ->
{'DOWN', MRef, process, _, _} -> ok
end.
-process_args_policy(State0 = #q{q = Q = #amqqueue{arguments = Arguments}}) ->
- State1 = lists:foldl(
- fun({Arg, Fun}, StateN) ->
- case rabbit_policy:get(Arg, Q) of
- undefined -> StateN;
- Val -> Fun(Val, StateN)
- end
- end, State0,
- [{<<"dead-letter-exchange">>, fun init_dlx/2},
- {<<"dead-letter-routing-key">>, fun init_dlx_routing_key/2}]),
+process_args_policy(State = #q{q = Q}) ->
lists:foldl(
- fun({Arg, Fun}, StateN) ->
- case rabbit_misc:table_lookup(Arguments, Arg) of
- {_Type, Val} -> Fun(Val, StateN);
- undefined -> StateN
- end
- end, State1,
- [{<<"x-expires">>, fun init_expires/2},
- {<<"x-dead-letter-exchange">>, fun init_dlx/2},
- {<<"x-dead-letter-routing-key">>, fun init_dlx_routing_key/2},
- {<<"x-message-ttl">>, fun init_ttl/2},
- {<<"x-max-length">>, fun init_max_length/2}]).
-
-init_expires(Expires, State) -> ensure_expiry_timer(State#q{expires = Expires}).
-
-init_ttl(TTL, State) -> drop_expired_msgs(State#q{ttl = TTL}).
+ fun({Name, Resolve, Fun}, StateN) ->
+ Fun(args_policy_lookup(Name, Resolve, Q), StateN)
+ end, State,
+ [{<<"expires">>, fun res_min/2, fun init_exp/2},
+ {<<"dead-letter-exchange">>, fun res_arg/2, fun init_dlx/2},
+ {<<"dead-letter-routing-key">>, fun res_arg/2, fun init_dlx_rkey/2},
+ {<<"message-ttl">>, fun res_min/2, fun init_ttl/2},
+ {<<"max-length">>, fun res_min/2, fun init_max_length/2}]).
+
+args_policy_lookup(Name, Resolve, Q = #amqqueue{arguments = Args}) ->
+ AName = <<"x-", Name/binary>>,
+ case {rabbit_policy:get(Name, Q), rabbit_misc:table_lookup(Args, AName)} of
+ {undefined, undefined} -> undefined;
+ {undefined, {_Type, Val}} -> Val;
+ {Val, undefined} -> Val;
+ {PolVal, {_Type, ArgVal}} -> Resolve(PolVal, ArgVal)
+ end.
+
+res_arg(_PolVal, ArgVal) -> ArgVal.
+res_min(PolVal, ArgVal) -> erlang:min(PolVal, ArgVal).
+
+init_exp(undefined, State) -> stop_expiry_timer(State#q{expires = undefined});
+init_exp(Expires, State) -> ensure_expiry_timer(State#q{expires = Expires}).
+init_ttl(undefined, State) -> stop_ttl_timer(State#q{ttl = undefined});
+init_ttl(TTL, State) -> drop_expired_msgs(State#q{ttl = TTL}).
+
+init_dlx(undefined, State) ->
+ State#q{dlx = undefined};
init_dlx(DLX, State = #q{q = #amqqueue{name = QName}}) ->
State#q{dlx = rabbit_misc:r(QName, exchange, DLX)}.
-init_dlx_routing_key(RoutingKey, State) ->
- State#q{dlx_routing_key = RoutingKey}.
+init_dlx_rkey(RoutingKey, State) -> State#q{dlx_routing_key = RoutingKey}.
-init_max_length(MaxLen, State) -> State#q{max_length = MaxLen}.
+init_max_length(MaxLen, State) ->
+ {_Dropped, State1} = maybe_drop_head(State#q{max_length = MaxLen}),
+ State1.
terminate_shutdown(Fun, State) ->
State1 = #q{backing_queue_state = BQS} =