diff options
| author | Matthew Sackman <matthew@rabbitmq.com> | 2010-07-21 13:16:20 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@rabbitmq.com> | 2010-07-21 13:16:20 +0100 |
| commit | 459a365dd2e23fcc6ba365fd56d5ac9484b22dad (patch) | |
| tree | 78e85af94c66a53095abf92515b900fe5249783d /src | |
| parent | dec6c75f600b30e3cfad8b45b4a5615173c2d24c (diff) | |
| download | rabbitmq-server-git-459a365dd2e23fcc6ba365fd56d5ac9484b22dad.tar.gz | |
Move logic for validating arguments to queue declaration out of the initialisation of the queue process.
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue.erl | 37 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 34 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 1 |
3 files changed, 40 insertions, 32 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 9a05406c80..79c4e34add 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -39,6 +39,7 @@ -export([pseudo_queue/2]). -export([lookup/1, with/2, with_or_die/2, assert_equivalence/5, check_exclusive_access/2, with_exclusive_access_or_die/3, + check_declare_arguments/2, stat/1, deliver/2, requeue/3, ack/4]). -export([list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]). -export([consumers/1, consumers_all/1]). @@ -55,6 +56,8 @@ -include("rabbit.hrl"). -include_lib("stdlib/include/qlc.hrl"). +-define(EXPIRES_TYPE, long). + %%---------------------------------------------------------------------------- -ifdef(use_specs). @@ -86,6 +89,8 @@ rabbit_framing:amqp_table(), rabbit_types:maybe(pid())) -> 'ok' | no_return()). -spec(check_exclusive_access/2 :: (rabbit_types:amqqueue(), pid()) -> 'ok'). +-spec(check_declare_arguments/2 :: (name(), rabbit_framing:amqp_table()) -> + 'ok' | no_return()). -spec(with_exclusive_access_or_die/3 :: (name(), pid(), qfun(A)) -> A). -spec(list/1 :: (rabbit_types:vhost()) -> [rabbit_types:amqqueue()]). -spec(info_keys/0 :: () -> [rabbit_types:info_key()]). @@ -228,12 +233,8 @@ store_queue(Q = #amqqueue{durable = false}) -> ok. start_queue_process(Q) -> - case rabbit_amqqueue_sup:start_child([Q]) of - {ok, Pid} -> Q#amqqueue{pid = Pid}; - {error, Error} -> rabbit_misc:protocol_error( - internal_error, "Queue ~s: ~w", - [rabbit_misc:rs(Q#amqqueue.name), Error]) - end. + {ok, Pid} = rabbit_amqqueue_sup:start_child([Q]), + Q#amqqueue{pid = Pid}. add_default_binding(#amqqueue{name = QueueName}) -> Exchange = rabbit_misc:r(QueueName, exchange, <<>>), @@ -288,6 +289,30 @@ assert_args_equivalence(#amqqueue{name = QueueName, arguments = Args}, rabbit_misc:assert_args_equivalence(Args, RequiredArgs, QueueName, [<<"x-expires">>]). +check_declare_arguments(QueueName, Args) -> + [case Fun(Args) of + ok -> ok; + {error, Error} -> rabbit_misc:protocol_error( + precondition_failed, + "Invalid arguments in declaration of queue ~s: " + "~w (arguments: ~w)", + [rabbit_misc:rs(QueueName), Error, Args]) + end || Fun <- [fun check_expires_argument/1]], + ok. + +check_expires_argument(Args) -> + check_expires_argument1(rabbit_misc:table_lookup(Args, <<"x-expires">>)). + +check_expires_argument1(undefined) -> + ok; +check_expires_argument1({?EXPIRES_TYPE, Expires}) + when is_integer(Expires) andalso Expires > 0 -> + ok; +check_expires_argument1({?EXPIRES_TYPE, _Expires}) -> + {error, expires_zero_or_less}; +check_expires_argument1(_) -> + {error, expires_not_of_type_long}. + list(VHostPath) -> mnesia:dirty_match_object( rabbit_queue, diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index a9c827413f..ccc5577aa3 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -99,26 +99,6 @@ info_keys() -> ?INFO_KEYS. %%---------------------------------------------------------------------------- --define(EXPIRES_TYPE, long). - -check_argument_expires({?EXPIRES_TYPE, Expires}) when not is_integer(Expires) -> - {error, expires_not_of_type_long}; -check_argument_expires({?EXPIRES_TYPE, Expires}) when Expires =< 0 -> - {error, expires_zero_or_less}; -check_argument_expires({?EXPIRES_TYPE, Expires}) -> - {ok, Expires}; -check_argument_expires(undefined) -> - {ok, undefined}; -check_argument_expires(_) -> - {error, expires_not_of_type_long}. - -init_expires(State = #q{q = #amqqueue{arguments = Arguments}}) -> - case check_argument_expires( - rabbit_misc:table_lookup(Arguments, <<"x-expires">>)) of - {error, Error} -> {error, Error}; - {ok, Expires} -> start_expiry_timer(State, Expires) - end. - init(Q) -> ?LOGDEBUG("Queue starting - ~p~n", [Q]), process_flag(trap_exit, true), @@ -135,12 +115,8 @@ init(Q) -> rate_timer_ref = undefined, expiry_timer_ref = undefined}, - case init_expires(State) of - {error, Error} -> {stop, Error}; - NewState -> {ok, NewState, hibernate, - {backoff, ?HIBERNATE_AFTER_MIN, - ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}} - end. + {ok, init_expires(State), hibernate, + {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. terminate(shutdown, State = #q{backing_queue = BQ}) -> terminate_shutdown(fun (BQS) -> BQ:terminate(BQS) end, State); @@ -161,6 +137,12 @@ code_change(_OldVsn, State, _Extra) -> %%---------------------------------------------------------------------------- +init_expires(State = #q{q = #amqqueue{arguments = Arguments}}) -> + case rabbit_misc:table_lookup(Arguments, <<"x-expires">>) of + {long, Expires} -> start_expiry_timer(State, Expires); + undefined -> State + end. + declare(Recover, From, State = #q{q = Q = #amqqueue{name = QName, durable = IsDurable}, backing_queue = BQ, backing_queue_state = undefined}) -> diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index c4db3ace73..2d8146190d 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -726,6 +726,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin, return_queue_declare_ok(QueueName, NoWait, MessageCount, ConsumerCount, State); {error, not_found} -> + rabbit_amqqueue:check_declare_arguments(QueueName, Args), case rabbit_amqqueue:declare(QueueName, Durable, AutoDelete, Args, Owner) of {new, Q = #amqqueue{}} -> |
