summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2010-07-21 13:16:20 +0100
committerMatthew Sackman <matthew@rabbitmq.com>2010-07-21 13:16:20 +0100
commit459a365dd2e23fcc6ba365fd56d5ac9484b22dad (patch)
tree78e85af94c66a53095abf92515b900fe5249783d /src
parentdec6c75f600b30e3cfad8b45b4a5615173c2d24c (diff)
downloadrabbitmq-server-git-459a365dd2e23fcc6ba365fd56d5ac9484b22dad.tar.gz
Move logic for validating arguments to queue declaration out of the initialisation of the queue process.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue.erl37
-rw-r--r--src/rabbit_amqqueue_process.erl34
-rw-r--r--src/rabbit_channel.erl1
3 files changed, 40 insertions, 32 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 9a05406c80..79c4e34add 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -39,6 +39,7 @@
-export([pseudo_queue/2]).
-export([lookup/1, with/2, with_or_die/2, assert_equivalence/5,
check_exclusive_access/2, with_exclusive_access_or_die/3,
+ check_declare_arguments/2,
stat/1, deliver/2, requeue/3, ack/4]).
-export([list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]).
-export([consumers/1, consumers_all/1]).
@@ -55,6 +56,8 @@
-include("rabbit.hrl").
-include_lib("stdlib/include/qlc.hrl").
+-define(EXPIRES_TYPE, long).
+
%%----------------------------------------------------------------------------
-ifdef(use_specs).
@@ -86,6 +89,8 @@
rabbit_framing:amqp_table(), rabbit_types:maybe(pid()))
-> 'ok' | no_return()).
-spec(check_exclusive_access/2 :: (rabbit_types:amqqueue(), pid()) -> 'ok').
+-spec(check_declare_arguments/2 :: (name(), rabbit_framing:amqp_table()) ->
+ 'ok' | no_return()).
-spec(with_exclusive_access_or_die/3 :: (name(), pid(), qfun(A)) -> A).
-spec(list/1 :: (rabbit_types:vhost()) -> [rabbit_types:amqqueue()]).
-spec(info_keys/0 :: () -> [rabbit_types:info_key()]).
@@ -228,12 +233,8 @@ store_queue(Q = #amqqueue{durable = false}) ->
ok.
start_queue_process(Q) ->
- case rabbit_amqqueue_sup:start_child([Q]) of
- {ok, Pid} -> Q#amqqueue{pid = Pid};
- {error, Error} -> rabbit_misc:protocol_error(
- internal_error, "Queue ~s: ~w",
- [rabbit_misc:rs(Q#amqqueue.name), Error])
- end.
+ {ok, Pid} = rabbit_amqqueue_sup:start_child([Q]),
+ Q#amqqueue{pid = Pid}.
add_default_binding(#amqqueue{name = QueueName}) ->
Exchange = rabbit_misc:r(QueueName, exchange, <<>>),
@@ -288,6 +289,30 @@ assert_args_equivalence(#amqqueue{name = QueueName, arguments = Args},
rabbit_misc:assert_args_equivalence(Args, RequiredArgs, QueueName,
[<<"x-expires">>]).
+check_declare_arguments(QueueName, Args) ->
+ [case Fun(Args) of
+ ok -> ok;
+ {error, Error} -> rabbit_misc:protocol_error(
+ precondition_failed,
+ "Invalid arguments in declaration of queue ~s: "
+ "~w (arguments: ~w)",
+ [rabbit_misc:rs(QueueName), Error, Args])
+ end || Fun <- [fun check_expires_argument/1]],
+ ok.
+
+check_expires_argument(Args) ->
+ check_expires_argument1(rabbit_misc:table_lookup(Args, <<"x-expires">>)).
+
+check_expires_argument1(undefined) ->
+ ok;
+check_expires_argument1({?EXPIRES_TYPE, Expires})
+ when is_integer(Expires) andalso Expires > 0 ->
+ ok;
+check_expires_argument1({?EXPIRES_TYPE, _Expires}) ->
+ {error, expires_zero_or_less};
+check_expires_argument1(_) ->
+ {error, expires_not_of_type_long}.
+
list(VHostPath) ->
mnesia:dirty_match_object(
rabbit_queue,
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index a9c827413f..ccc5577aa3 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -99,26 +99,6 @@ info_keys() -> ?INFO_KEYS.
%%----------------------------------------------------------------------------
--define(EXPIRES_TYPE, long).
-
-check_argument_expires({?EXPIRES_TYPE, Expires}) when not is_integer(Expires) ->
- {error, expires_not_of_type_long};
-check_argument_expires({?EXPIRES_TYPE, Expires}) when Expires =< 0 ->
- {error, expires_zero_or_less};
-check_argument_expires({?EXPIRES_TYPE, Expires}) ->
- {ok, Expires};
-check_argument_expires(undefined) ->
- {ok, undefined};
-check_argument_expires(_) ->
- {error, expires_not_of_type_long}.
-
-init_expires(State = #q{q = #amqqueue{arguments = Arguments}}) ->
- case check_argument_expires(
- rabbit_misc:table_lookup(Arguments, <<"x-expires">>)) of
- {error, Error} -> {error, Error};
- {ok, Expires} -> start_expiry_timer(State, Expires)
- end.
-
init(Q) ->
?LOGDEBUG("Queue starting - ~p~n", [Q]),
process_flag(trap_exit, true),
@@ -135,12 +115,8 @@ init(Q) ->
rate_timer_ref = undefined,
expiry_timer_ref = undefined},
- case init_expires(State) of
- {error, Error} -> {stop, Error};
- NewState -> {ok, NewState, hibernate,
- {backoff, ?HIBERNATE_AFTER_MIN,
- ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}
- end.
+ {ok, init_expires(State), hibernate,
+ {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
terminate(shutdown, State = #q{backing_queue = BQ}) ->
terminate_shutdown(fun (BQS) -> BQ:terminate(BQS) end, State);
@@ -161,6 +137,12 @@ code_change(_OldVsn, State, _Extra) ->
%%----------------------------------------------------------------------------
+init_expires(State = #q{q = #amqqueue{arguments = Arguments}}) ->
+ case rabbit_misc:table_lookup(Arguments, <<"x-expires">>) of
+ {long, Expires} -> start_expiry_timer(State, Expires);
+ undefined -> State
+ end.
+
declare(Recover, From,
State = #q{q = Q = #amqqueue{name = QName, durable = IsDurable},
backing_queue = BQ, backing_queue_state = undefined}) ->
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index c4db3ace73..2d8146190d 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -726,6 +726,7 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
return_queue_declare_ok(QueueName, NoWait, MessageCount,
ConsumerCount, State);
{error, not_found} ->
+ rabbit_amqqueue:check_declare_arguments(QueueName, Args),
case rabbit_amqqueue:declare(QueueName, Durable, AutoDelete,
Args, Owner) of
{new, Q = #amqqueue{}} ->