summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2014-01-15 13:37:42 +0000
committerSimon MacMullen <simon@rabbitmq.com>2014-01-15 13:37:42 +0000
commit1fe25e1a3b83eef969e1ce38c37742fd846465cf (patch)
treee142744cddbd6a125312af763b1dabb8e48a843e /src
parentc65a78ddee736dd6fedeffa3de88b3d346b08e19 (diff)
downloadrabbitmq-server-git-1fe25e1a3b83eef969e1ce38c37742fd846465cf.tar.gz
Move credit args parsing into rabbit_queue_consumers, to reduce some nasty arities and increase locality.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue.erl23
-rw-r--r--src/rabbit_amqqueue_process.erl5
-rw-r--r--src/rabbit_channel.erl16
-rw-r--r--src/rabbit_queue_consumers.erl24
4 files changed, 32 insertions, 36 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index ddeeef98a1..cf94e61cfa 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -26,7 +26,7 @@
-export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]).
-export([force_event_refresh/0, notify_policy_changed/1]).
-export([consumers/1, consumers_all/1, consumer_info_keys/0]).
--export([basic_get/4, basic_consume/10, basic_cancel/4, notify_decorators/1]).
+-export([basic_get/4, basic_consume/9, basic_cancel/4, notify_decorators/1]).
-export([notify_sent/2, notify_sent_queue_down/1, resume/2, flush_all/2]).
-export([notify_down_all/2, activate_limit_all/2, credit/5]).
-export([on_node_down/1]).
@@ -152,9 +152,9 @@
{'ok', non_neg_integer(), qmsg()} | 'empty').
-spec(credit/5 :: (rabbit_types:amqqueue(), pid(), rabbit_types:ctag(),
non_neg_integer(), boolean()) -> 'ok').
--spec(basic_consume/10 ::
+-spec(basic_consume/9 ::
(rabbit_types:amqqueue(), boolean(), pid(), pid(), boolean(),
- rabbit_types:ctag(), boolean(), {non_neg_integer(), boolean()} | 'none', any(), any())
+ rabbit_types:ctag(), boolean(), any(), any())
-> rabbit_types:ok_or_error('exclusive_consume_unavailable')).
-spec(basic_cancel/4 ::
(rabbit_types:amqqueue(), pid(), rabbit_types:ctag(), any()) -> 'ok').
@@ -428,9 +428,10 @@ declare_args() ->
[{<<"x-expires">>, fun check_expires_arg/2},
{<<"x-message-ttl">>, fun check_message_ttl_arg/2},
{<<"x-dead-letter-routing-key">>, fun check_dlxrk_arg/2},
- {<<"x-max-length">>, fun check_max_length_arg/2}].
+ {<<"x-max-length">>, fun check_non_neg_int_arg/2}].
-consume_args() -> [{<<"x-priority">>, fun check_int_arg/2}].
+consume_args() -> [{<<"x-priority">>, fun check_int_arg/2},
+ {<<"x-prefetch">>, fun check_non_neg_int_arg/2}].
check_int_arg({Type, _}, _) ->
case lists:member(Type, ?INTEGER_ARG_TYPES) of
@@ -438,7 +439,7 @@ check_int_arg({Type, _}, _) ->
false -> {error, {unacceptable_type, Type}}
end.
-check_max_length_arg({Type, Val}, Args) ->
+check_non_neg_int_arg({Type, Val}, Args) ->
case check_int_arg({Type, Val}, Args) of
ok when Val >= 0 -> ok;
ok -> {error, {value_negative, Val}};
@@ -575,13 +576,11 @@ credit(#amqqueue{pid = QPid}, ChPid, CTag, Credit, Drain) ->
basic_get(#amqqueue{pid = QPid}, ChPid, NoAck, LimiterPid) ->
delegate:call(QPid, {basic_get, ChPid, NoAck, LimiterPid}).
-basic_consume(#amqqueue{pid = QPid, name = QName}, NoAck, ChPid,
- LimiterPid, LimiterActive,
- ConsumerTag, ExclusiveConsume, CreditArgs, OtherArgs, OkMsg) ->
- ok = check_consume_arguments(QName, OtherArgs),
+basic_consume(#amqqueue{pid = QPid, name = QName}, NoAck, ChPid, LimiterPid,
+ LimiterActive, ConsumerTag, ExclusiveConsume, Args, OkMsg) ->
+ ok = check_consume_arguments(QName, Args),
delegate:call(QPid, {basic_consume, NoAck, ChPid, LimiterPid, LimiterActive,
- ConsumerTag, ExclusiveConsume, CreditArgs, OtherArgs,
- OkMsg}).
+ ConsumerTag, ExclusiveConsume, Args, OkMsg}).
basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) ->
delegate:call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 57c6fb238b..801c48d11f 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -922,7 +922,7 @@ handle_call({basic_get, ChPid, NoAck, LimiterPid}, _From,
end;
handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive,
- ConsumerTag, ExclusiveConsume, CreditArgs, Args, OkMsg},
+ ConsumerTag, ExclusiveConsume, Args, OkMsg},
_From, State = #q{consumers = Consumers,
exclusive_consumer = Holder}) ->
case check_exclusive_access(Holder, ExclusiveConsume, State) of
@@ -930,8 +930,7 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive,
ok -> Consumers1 = rabbit_queue_consumers:add(
ChPid, ConsumerTag, NoAck,
LimiterPid, LimiterActive,
- CreditArgs, Args,
- is_empty(State), Consumers),
+ Args, is_empty(State), Consumers),
ExclusiveConsumer =
if ExclusiveConsume -> {ChPid, ConsumerTag};
true -> Holder
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index da62fd21c3..da2ba2676c 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -789,8 +789,7 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
Q, NoAck, self(),
rabbit_limiter:pid(Limiter),
rabbit_limiter:is_active(Limiter),
- ActualConsumerTag, ExclusiveConsume,
- parse_credit_args(Args), Args,
+ ActualConsumerTag, ExclusiveConsume, Args,
ok_msg(NoWait, #'basic.consume_ok'{
consumer_tag = ActualConsumerTag})),
Q}
@@ -1280,19 +1279,6 @@ handle_consuming_queue_down(QPid,
handle_delivering_queue_down(QPid, State = #ch{delivering_queues = DQ}) ->
State#ch{delivering_queues = sets:del_element(QPid, DQ)}.
-parse_credit_args(Args) ->
- case rabbit_misc:table_lookup(Args, <<"x-credit">>) of
- {table, T} -> case {rabbit_misc:table_lookup(T, <<"credit">>),
- rabbit_misc:table_lookup(T, <<"drain">>)} of
- {{long, C}, {bool, D}} -> {credit, C, D};
- _ -> none
- end;
- undefined -> case rabbit_misc:table_lookup(Args, <<"x-prefetch">>) of
- {_, P} when is_number(P) -> {prefetch, P};
- _ -> none
- end
- end.
-
binding_action(Fun, ExchangeNameBin, DestinationType, DestinationNameBin,
RoutingKey, Arguments, ReturnMethod, NoWait,
State = #ch{virtual_host = VHostPath,
diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl
index c410a0997c..f864a5ba78 100644
--- a/src/rabbit_queue_consumers.erl
+++ b/src/rabbit_queue_consumers.erl
@@ -17,7 +17,7 @@
-module(rabbit_queue_consumers).
-export([new/0, max_active_priority/1, inactive/1, all/1, count/0,
- unacknowledged_message_count/0, add/9, remove/3, erase_ch/2,
+ unacknowledged_message_count/0, add/8, remove/3, erase_ch/2,
send_drained/0, deliver/3, record_ack/3, subtract_acks/4,
possibly_unblock/3,
resume_fun/0, notify_sent_fun/1, activate_limit_fun/0,
@@ -57,7 +57,6 @@
-type ch() :: pid().
-type ack() :: non_neg_integer().
-type cr_fun() :: fun ((#cr{}) -> #cr{}).
--type credit_args() :: {non_neg_integer(), boolean()} | 'none'.
-type fetch_result() :: {rabbit_types:basic_message(), boolean(), ack()}.
-spec new() -> state().
@@ -68,7 +67,7 @@
-spec count() -> non_neg_integer().
-spec unacknowledged_message_count() -> non_neg_integer().
-spec add(ch(), rabbit_types:ctag(), boolean(), pid(), boolean(),
- credit_args(), rabbit_framing:amqp_table(), boolean(),
+ rabbit_framing:amqp_table(), boolean(),
state()) -> state().
-spec remove(ch(), rabbit_types:ctag(), state()) ->
'not_found' | state().
@@ -121,8 +120,8 @@ count() -> lists:sum([Count || #cr{consumer_count = Count} <- all_ch_record()]).
unacknowledged_message_count() ->
lists:sum([queue:len(C#cr.acktags) || C <- all_ch_record()]).
-add(ChPid, ConsumerTag, NoAck, LimiterPid, LimiterActive, CreditArgs, Args,
- IsEmpty, State = #state{consumers = Consumers}) ->
+add(ChPid, ConsumerTag, NoAck, LimiterPid, LimiterActive, Args, IsEmpty,
+ State = #state{consumers = Consumers}) ->
C = #cr{consumer_count = Count,
limiter = Limiter} = ch_record(ChPid, LimiterPid),
Limiter1 = case LimiterActive of
@@ -131,7 +130,7 @@ add(ChPid, ConsumerTag, NoAck, LimiterPid, LimiterActive, CreditArgs, Args,
end,
C1 = C#cr{consumer_count = Count + 1, limiter = Limiter1},
update_ch_record(
- case CreditArgs of
+ case parse_credit_args(Args) of
none ->
C1;
{credit, Credit, Drain} ->
@@ -146,6 +145,19 @@ add(ChPid, ConsumerTag, NoAck, LimiterPid, LimiterActive, CreditArgs, Args,
args = Args},
State#state{consumers = add_consumer({ChPid, Consumer}, Consumers)}.
+parse_credit_args(Args) ->
+ case rabbit_misc:table_lookup(Args, <<"x-credit">>) of
+ {table, T} -> case {rabbit_misc:table_lookup(T, <<"credit">>),
+ rabbit_misc:table_lookup(T, <<"drain">>)} of
+ {{long, C}, {bool, D}} -> {credit, C, D};
+ _ -> none
+ end;
+ undefined -> case rabbit_misc:table_lookup(Args, <<"x-prefetch">>) of
+ {_, Prefetch} -> {prefetch, Prefetch};
+ _ -> none
+ end
+ end.
+
remove(ChPid, ConsumerTag, State = #state{consumers = Consumers}) ->
case lookup_ch(ChPid) of
not_found ->