diff options
| author | Simon MacMullen <simon@rabbitmq.com> | 2014-01-15 13:37:42 +0000 |
|---|---|---|
| committer | Simon MacMullen <simon@rabbitmq.com> | 2014-01-15 13:37:42 +0000 |
| commit | 1fe25e1a3b83eef969e1ce38c37742fd846465cf (patch) | |
| tree | e142744cddbd6a125312af763b1dabb8e48a843e /src | |
| parent | c65a78ddee736dd6fedeffa3de88b3d346b08e19 (diff) | |
| download | rabbitmq-server-git-1fe25e1a3b83eef969e1ce38c37742fd846465cf.tar.gz | |
Move credit args parsing into rabbit_queue_consumers, to reduce some nasty arities and increase locality.
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue.erl | 23 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 5 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 16 | ||||
| -rw-r--r-- | src/rabbit_queue_consumers.erl | 24 |
4 files changed, 32 insertions, 36 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index ddeeef98a1..cf94e61cfa 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -26,7 +26,7 @@ -export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]). -export([force_event_refresh/0, notify_policy_changed/1]). -export([consumers/1, consumers_all/1, consumer_info_keys/0]). --export([basic_get/4, basic_consume/10, basic_cancel/4, notify_decorators/1]). +-export([basic_get/4, basic_consume/9, basic_cancel/4, notify_decorators/1]). -export([notify_sent/2, notify_sent_queue_down/1, resume/2, flush_all/2]). -export([notify_down_all/2, activate_limit_all/2, credit/5]). -export([on_node_down/1]). @@ -152,9 +152,9 @@ {'ok', non_neg_integer(), qmsg()} | 'empty'). -spec(credit/5 :: (rabbit_types:amqqueue(), pid(), rabbit_types:ctag(), non_neg_integer(), boolean()) -> 'ok'). --spec(basic_consume/10 :: +-spec(basic_consume/9 :: (rabbit_types:amqqueue(), boolean(), pid(), pid(), boolean(), - rabbit_types:ctag(), boolean(), {non_neg_integer(), boolean()} | 'none', any(), any()) + rabbit_types:ctag(), boolean(), any(), any()) -> rabbit_types:ok_or_error('exclusive_consume_unavailable')). -spec(basic_cancel/4 :: (rabbit_types:amqqueue(), pid(), rabbit_types:ctag(), any()) -> 'ok'). @@ -428,9 +428,10 @@ declare_args() -> [{<<"x-expires">>, fun check_expires_arg/2}, {<<"x-message-ttl">>, fun check_message_ttl_arg/2}, {<<"x-dead-letter-routing-key">>, fun check_dlxrk_arg/2}, - {<<"x-max-length">>, fun check_max_length_arg/2}]. + {<<"x-max-length">>, fun check_non_neg_int_arg/2}]. -consume_args() -> [{<<"x-priority">>, fun check_int_arg/2}]. +consume_args() -> [{<<"x-priority">>, fun check_int_arg/2}, + {<<"x-prefetch">>, fun check_non_neg_int_arg/2}]. check_int_arg({Type, _}, _) -> case lists:member(Type, ?INTEGER_ARG_TYPES) of @@ -438,7 +439,7 @@ check_int_arg({Type, _}, _) -> false -> {error, {unacceptable_type, Type}} end. -check_max_length_arg({Type, Val}, Args) -> +check_non_neg_int_arg({Type, Val}, Args) -> case check_int_arg({Type, Val}, Args) of ok when Val >= 0 -> ok; ok -> {error, {value_negative, Val}}; @@ -575,13 +576,11 @@ credit(#amqqueue{pid = QPid}, ChPid, CTag, Credit, Drain) -> basic_get(#amqqueue{pid = QPid}, ChPid, NoAck, LimiterPid) -> delegate:call(QPid, {basic_get, ChPid, NoAck, LimiterPid}). -basic_consume(#amqqueue{pid = QPid, name = QName}, NoAck, ChPid, - LimiterPid, LimiterActive, - ConsumerTag, ExclusiveConsume, CreditArgs, OtherArgs, OkMsg) -> - ok = check_consume_arguments(QName, OtherArgs), +basic_consume(#amqqueue{pid = QPid, name = QName}, NoAck, ChPid, LimiterPid, + LimiterActive, ConsumerTag, ExclusiveConsume, Args, OkMsg) -> + ok = check_consume_arguments(QName, Args), delegate:call(QPid, {basic_consume, NoAck, ChPid, LimiterPid, LimiterActive, - ConsumerTag, ExclusiveConsume, CreditArgs, OtherArgs, - OkMsg}). + ConsumerTag, ExclusiveConsume, Args, OkMsg}). basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) -> delegate:call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 57c6fb238b..801c48d11f 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -922,7 +922,7 @@ handle_call({basic_get, ChPid, NoAck, LimiterPid}, _From, end; handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive, - ConsumerTag, ExclusiveConsume, CreditArgs, Args, OkMsg}, + ConsumerTag, ExclusiveConsume, Args, OkMsg}, _From, State = #q{consumers = Consumers, exclusive_consumer = Holder}) -> case check_exclusive_access(Holder, ExclusiveConsume, State) of @@ -930,8 +930,7 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive, ok -> Consumers1 = rabbit_queue_consumers:add( ChPid, ConsumerTag, NoAck, LimiterPid, LimiterActive, - CreditArgs, Args, - is_empty(State), Consumers), + Args, is_empty(State), Consumers), ExclusiveConsumer = if ExclusiveConsume -> {ChPid, ConsumerTag}; true -> Holder diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index da62fd21c3..da2ba2676c 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -789,8 +789,7 @@ handle_method(#'basic.consume'{queue = QueueNameBin, Q, NoAck, self(), rabbit_limiter:pid(Limiter), rabbit_limiter:is_active(Limiter), - ActualConsumerTag, ExclusiveConsume, - parse_credit_args(Args), Args, + ActualConsumerTag, ExclusiveConsume, Args, ok_msg(NoWait, #'basic.consume_ok'{ consumer_tag = ActualConsumerTag})), Q} @@ -1280,19 +1279,6 @@ handle_consuming_queue_down(QPid, handle_delivering_queue_down(QPid, State = #ch{delivering_queues = DQ}) -> State#ch{delivering_queues = sets:del_element(QPid, DQ)}. -parse_credit_args(Args) -> - case rabbit_misc:table_lookup(Args, <<"x-credit">>) of - {table, T} -> case {rabbit_misc:table_lookup(T, <<"credit">>), - rabbit_misc:table_lookup(T, <<"drain">>)} of - {{long, C}, {bool, D}} -> {credit, C, D}; - _ -> none - end; - undefined -> case rabbit_misc:table_lookup(Args, <<"x-prefetch">>) of - {_, P} when is_number(P) -> {prefetch, P}; - _ -> none - end - end. - binding_action(Fun, ExchangeNameBin, DestinationType, DestinationNameBin, RoutingKey, Arguments, ReturnMethod, NoWait, State = #ch{virtual_host = VHostPath, diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl index c410a0997c..f864a5ba78 100644 --- a/src/rabbit_queue_consumers.erl +++ b/src/rabbit_queue_consumers.erl @@ -17,7 +17,7 @@ -module(rabbit_queue_consumers). -export([new/0, max_active_priority/1, inactive/1, all/1, count/0, - unacknowledged_message_count/0, add/9, remove/3, erase_ch/2, + unacknowledged_message_count/0, add/8, remove/3, erase_ch/2, send_drained/0, deliver/3, record_ack/3, subtract_acks/4, possibly_unblock/3, resume_fun/0, notify_sent_fun/1, activate_limit_fun/0, @@ -57,7 +57,6 @@ -type ch() :: pid(). -type ack() :: non_neg_integer(). -type cr_fun() :: fun ((#cr{}) -> #cr{}). --type credit_args() :: {non_neg_integer(), boolean()} | 'none'. -type fetch_result() :: {rabbit_types:basic_message(), boolean(), ack()}. -spec new() -> state(). @@ -68,7 +67,7 @@ -spec count() -> non_neg_integer(). -spec unacknowledged_message_count() -> non_neg_integer(). -spec add(ch(), rabbit_types:ctag(), boolean(), pid(), boolean(), - credit_args(), rabbit_framing:amqp_table(), boolean(), + rabbit_framing:amqp_table(), boolean(), state()) -> state(). -spec remove(ch(), rabbit_types:ctag(), state()) -> 'not_found' | state(). @@ -121,8 +120,8 @@ count() -> lists:sum([Count || #cr{consumer_count = Count} <- all_ch_record()]). unacknowledged_message_count() -> lists:sum([queue:len(C#cr.acktags) || C <- all_ch_record()]). -add(ChPid, ConsumerTag, NoAck, LimiterPid, LimiterActive, CreditArgs, Args, - IsEmpty, State = #state{consumers = Consumers}) -> +add(ChPid, ConsumerTag, NoAck, LimiterPid, LimiterActive, Args, IsEmpty, + State = #state{consumers = Consumers}) -> C = #cr{consumer_count = Count, limiter = Limiter} = ch_record(ChPid, LimiterPid), Limiter1 = case LimiterActive of @@ -131,7 +130,7 @@ add(ChPid, ConsumerTag, NoAck, LimiterPid, LimiterActive, CreditArgs, Args, end, C1 = C#cr{consumer_count = Count + 1, limiter = Limiter1}, update_ch_record( - case CreditArgs of + case parse_credit_args(Args) of none -> C1; {credit, Credit, Drain} -> @@ -146,6 +145,19 @@ add(ChPid, ConsumerTag, NoAck, LimiterPid, LimiterActive, CreditArgs, Args, args = Args}, State#state{consumers = add_consumer({ChPid, Consumer}, Consumers)}. +parse_credit_args(Args) -> + case rabbit_misc:table_lookup(Args, <<"x-credit">>) of + {table, T} -> case {rabbit_misc:table_lookup(T, <<"credit">>), + rabbit_misc:table_lookup(T, <<"drain">>)} of + {{long, C}, {bool, D}} -> {credit, C, D}; + _ -> none + end; + undefined -> case rabbit_misc:table_lookup(Args, <<"x-prefetch">>) of + {_, Prefetch} -> {prefetch, Prefetch}; + _ -> none + end + end. + remove(ChPid, ConsumerTag, State = #state{consumers = Consumers}) -> case lookup_ch(ChPid) of not_found -> |
