diff options
| author | Simon MacMullen <simon@rabbitmq.com> | 2013-09-06 16:37:20 +0100 |
|---|---|---|
| committer | Simon MacMullen <simon@rabbitmq.com> | 2013-09-06 16:37:20 +0100 |
| commit | bee1ea63ef56dd2382dde0240b4f2bb38edcdf3d (patch) | |
| tree | acf557a97c86d22ddd5bf5cbcf40d8e5cc863bc7 /src | |
| parent | 9d5967ac9e273d757d39f277185474d23f3f08ce (diff) | |
| download | rabbitmq-server-git-bee1ea63ef56dd2382dde0240b4f2bb38edcdf3d.tar.gz | |
Hook TTL / max length / expiry into policy system.
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 61 |
1 files changed, 33 insertions, 28 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index f5c6cf85b4..9c5b5fa205 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -263,40 +263,45 @@ recovery_barrier(BarrierPid) -> {'DOWN', MRef, process, _, _} -> ok end. -process_args_policy(State0 = #q{q = Q = #amqqueue{arguments = Arguments}}) -> - State1 = lists:foldl( - fun({Arg, Fun}, StateN) -> - case rabbit_policy:get(Arg, Q) of - undefined -> StateN; - Val -> Fun(Val, StateN) - end - end, State0, - [{<<"dead-letter-exchange">>, fun init_dlx/2}, - {<<"dead-letter-routing-key">>, fun init_dlx_routing_key/2}]), +process_args_policy(State = #q{q = Q}) -> lists:foldl( - fun({Arg, Fun}, StateN) -> - case rabbit_misc:table_lookup(Arguments, Arg) of - {_Type, Val} -> Fun(Val, StateN); - undefined -> StateN - end - end, State1, - [{<<"x-expires">>, fun init_expires/2}, - {<<"x-dead-letter-exchange">>, fun init_dlx/2}, - {<<"x-dead-letter-routing-key">>, fun init_dlx_routing_key/2}, - {<<"x-message-ttl">>, fun init_ttl/2}, - {<<"x-max-length">>, fun init_max_length/2}]). - -init_expires(Expires, State) -> ensure_expiry_timer(State#q{expires = Expires}). - -init_ttl(TTL, State) -> drop_expired_msgs(State#q{ttl = TTL}). + fun({Name, Resolve, Fun}, StateN) -> + Fun(args_policy_lookup(Name, Resolve, Q), StateN) + end, State, + [{<<"expires">>, fun res_min/2, fun init_exp/2}, + {<<"dead-letter-exchange">>, fun res_arg/2, fun init_dlx/2}, + {<<"dead-letter-routing-key">>, fun res_arg/2, fun init_dlx_rkey/2}, + {<<"message-ttl">>, fun res_min/2, fun init_ttl/2}, + {<<"max-length">>, fun res_min/2, fun init_max_length/2}]). + +args_policy_lookup(Name, Resolve, Q = #amqqueue{arguments = Args}) -> + AName = <<"x-", Name/binary>>, + case {rabbit_policy:get(Name, Q), rabbit_misc:table_lookup(Args, AName)} of + {undefined, undefined} -> undefined; + {undefined, {_Type, Val}} -> Val; + {Val, undefined} -> Val; + {PolVal, {_Type, ArgVal}} -> Resolve(PolVal, ArgVal) + end. + +res_arg(_PolVal, ArgVal) -> ArgVal. +res_min(PolVal, ArgVal) -> erlang:min(PolVal, ArgVal). + +init_exp(undefined, State) -> stop_expiry_timer(State#q{expires = undefined}); +init_exp(Expires, State) -> ensure_expiry_timer(State#q{expires = Expires}). +init_ttl(undefined, State) -> stop_ttl_timer(State#q{ttl = undefined}); +init_ttl(TTL, State) -> drop_expired_msgs(State#q{ttl = TTL}). + +init_dlx(undefined, State) -> + State#q{dlx = undefined}; init_dlx(DLX, State = #q{q = #amqqueue{name = QName}}) -> State#q{dlx = rabbit_misc:r(QName, exchange, DLX)}. -init_dlx_routing_key(RoutingKey, State) -> - State#q{dlx_routing_key = RoutingKey}. +init_dlx_rkey(RoutingKey, State) -> State#q{dlx_routing_key = RoutingKey}. -init_max_length(MaxLen, State) -> State#q{max_length = MaxLen}. +init_max_length(MaxLen, State) -> + {_Dropped, State1} = maybe_drop_head(State#q{max_length = MaxLen}), + State1. terminate_shutdown(Fun, State) -> State1 = #q{backing_queue_state = BQS} = |
