diff options
| author | Simon MacMullen <simon@rabbitmq.com> | 2014-03-04 10:53:01 +0000 |
|---|---|---|
| committer | Simon MacMullen <simon@rabbitmq.com> | 2014-03-04 10:53:01 +0000 |
| commit | 4a0a415bd3b0a7dae6e1596cb0101fbe0f80a915 (patch) | |
| tree | 3d2b9648fd2876fbce607c184c5c8dd843793935 | |
| parent | d8a0d777bfed7eb45589a7118cb8f3573e917e18 (diff) | |
| download | rabbitmq-server-git-4a0a415bd3b0a7dae6e1596cb0101fbe0f80a915.tar.gz | |
Change the semantics of the basic.qos global flag to switch between per-consumer and per-channel prefetch.
| -rw-r--r-- | src/rabbit_amqqueue.erl | 13 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 17 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 29 | ||||
| -rw-r--r-- | src/rabbit_queue_consumers.erl | 17 |
4 files changed, 45 insertions, 31 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 67bf000db1..2bf5c840ae 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -26,7 +26,7 @@ -export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]). -export([force_event_refresh/1, notify_policy_changed/1]). -export([consumers/1, consumers_all/1, consumer_info_keys/0]). --export([basic_get/4, basic_consume/9, basic_cancel/4, notify_decorators/1]). +-export([basic_get/4, basic_consume/10, basic_cancel/4, notify_decorators/1]). -export([notify_sent/2, notify_sent_queue_down/1, resume/2]). -export([notify_down_all/2, activate_limit_all/2, credit/5]). -export([on_node_down/1]). @@ -149,9 +149,10 @@ {'ok', non_neg_integer(), qmsg()} | 'empty'). -spec(credit/5 :: (rabbit_types:amqqueue(), pid(), rabbit_types:ctag(), non_neg_integer(), boolean()) -> 'ok'). --spec(basic_consume/9 :: +-spec(basic_consume/10 :: (rabbit_types:amqqueue(), boolean(), pid(), pid(), boolean(), - rabbit_types:ctag(), boolean(), rabbit_framing:amqp_table(), any()) + non_neg_integer(), rabbit_types:ctag(), boolean(), + rabbit_framing:amqp_table(), any()) -> rabbit_types:ok_or_error('exclusive_consume_unavailable')). -spec(basic_cancel/4 :: (rabbit_types:amqqueue(), pid(), rabbit_types:ctag(), any()) -> 'ok'). @@ -579,10 +580,12 @@ basic_get(#amqqueue{pid = QPid}, ChPid, NoAck, LimiterPid) -> delegate:call(QPid, {basic_get, ChPid, NoAck, LimiterPid}). basic_consume(#amqqueue{pid = QPid, name = QName}, NoAck, ChPid, LimiterPid, - LimiterActive, ConsumerTag, ExclusiveConsume, Args, OkMsg) -> + LimiterActive, ConsumerPrefetchCount, ConsumerTag, + ExclusiveConsume, Args, OkMsg) -> ok = check_consume_arguments(QName, Args), delegate:call(QPid, {basic_consume, NoAck, ChPid, LimiterPid, LimiterActive, - ConsumerTag, ExclusiveConsume, Args, OkMsg}). + ConsumerPrefetchCount, ConsumerTag, ExclusiveConsume, + Args, OkMsg}). basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) -> delegate:call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index a0c8fb545e..97a9c312a8 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -846,13 +846,13 @@ emit_consumer_deleted(ChPid, ConsumerTag, QName) -> prioritise_call(Msg, _From, _Len, State) -> case Msg of - info -> 9; - {info, _Items} -> 9; - consumers -> 9; - stat -> 7; - {basic_consume, _, _, _, _, _, _, _, _} -> consumer_bias(State); - {basic_cancel, _, _, _} -> consumer_bias(State); - _ -> 0 + info -> 9; + {info, _Items} -> 9; + consumers -> 9; + stat -> 7; + {basic_consume, _, _, _, _, _, _, _, _, _} -> consumer_bias(State); + {basic_cancel, _, _, _} -> consumer_bias(State); + _ -> 0 end. prioritise_cast(Msg, _Len, State) -> @@ -961,7 +961,7 @@ handle_call({basic_get, ChPid, NoAck, LimiterPid}, _From, end; handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive, - ConsumerTag, ExclusiveConsume, Args, OkMsg}, + ConsumerPrefetchCount, ConsumerTag, ExclusiveConsume, Args, OkMsg}, _From, State = #q{consumers = Consumers, exclusive_consumer = Holder}) -> case check_exclusive_access(Holder, ExclusiveConsume, State) of @@ -969,6 +969,7 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive, ok -> Consumers1 = rabbit_queue_consumers:add( ChPid, ConsumerTag, NoAck, LimiterPid, LimiterActive, + ConsumerPrefetchCount, Args, is_empty(State), Consumers), ExclusiveConsumer = if ExclusiveConsume -> {ChPid, ConsumerTag}; diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 7907c96c6f..6421a7655f 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -38,7 +38,8 @@ queue_names, queue_monitors, consumer_mapping, queue_consumers, delivering_queues, queue_collector_pid, stats_timer, confirm_enabled, publish_seqno, - unconfirmed, confirmed, mandatory, capabilities, trace_state}). + unconfirmed, confirmed, mandatory, capabilities, trace_state, + consumer_prefetch}). -define(MAX_PERMISSION_CACHE_SIZE, 12). @@ -52,6 +53,7 @@ messages_uncommitted, acks_uncommitted, prefetch_count, + consumer_prefetch_count, state]). -define(CREATION_EVENT_KEYS, @@ -216,7 +218,8 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, confirmed = [], mandatory = dtree:empty(), capabilities = Capabilities, - trace_state = rabbit_trace:init(VHost)}, + trace_state = rabbit_trace:init(VHost), + consumer_prefetch = 0}, State1 = rabbit_event:init_stats_timer(State, #ch.stats_timer), rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State1)), rabbit_event:if_enabled(State1, #ch.stats_timer, @@ -752,9 +755,10 @@ handle_method(#'basic.consume'{queue = QueueNameBin, exclusive = ExclusiveConsume, nowait = NoWait, arguments = Args}, - _, State = #ch{conn_pid = ConnPid, - limiter = Limiter, - consumer_mapping = ConsumerMapping}) -> + _, State = #ch{conn_pid = ConnPid, + limiter = Limiter, + consumer_prefetch = ConsumerPrefetchCount, + consumer_mapping = ConsumerMapping}) -> case dict:find(ConsumerTag, ConsumerMapping) of error -> QueueName = qbin_to_resource(QueueNameBin, State), @@ -776,6 +780,7 @@ handle_method(#'basic.consume'{queue = QueueNameBin, Q, NoAck, self(), rabbit_limiter:pid(Limiter), rabbit_limiter:is_active(Limiter), + ConsumerPrefetchCount, ActualConsumerTag, ExclusiveConsume, Args, ok_msg(NoWait, #'basic.consume_ok'{ consumer_tag = ActualConsumerTag})), @@ -842,19 +847,22 @@ handle_method(#'basic.cancel'{consumer_tag = ConsumerTag, nowait = NoWait}, end end; -handle_method(#'basic.qos'{global = true}, _, _State) -> - rabbit_misc:protocol_error(not_implemented, "global=true", []); - handle_method(#'basic.qos'{prefetch_size = Size}, _, _State) when Size /= 0 -> rabbit_misc:protocol_error(not_implemented, "prefetch_size!=0 (~w)", [Size]); -handle_method(#'basic.qos'{prefetch_count = 0}, +handle_method(#'basic.qos'{global = false, + prefetch_count = PrefetchCount}, _, State) -> + {reply, #'basic.qos_ok'{}, State#ch{consumer_prefetch = PrefetchCount}}; + +handle_method(#'basic.qos'{global = true, + prefetch_count = 0}, _, State = #ch{limiter = Limiter}) -> Limiter1 = rabbit_limiter:unlimit_prefetch(Limiter), {reply, #'basic.qos_ok'{}, State#ch{limiter = Limiter1}}; -handle_method(#'basic.qos'{prefetch_count = PrefetchCount}, +handle_method(#'basic.qos'{global = true, + prefetch_count = PrefetchCount}, _, State = #ch{limiter = Limiter, unacked_message_q = UAMQ}) -> %% TODO queue:len(UAMQ) is not strictly right since that counts %% unacked messages from basic.get too. Pretty obscure though. @@ -1603,6 +1611,7 @@ i(acks_uncommitted, #ch{tx = {_Msgs, Acks}}) -> ack_len(Acks); i(acks_uncommitted, #ch{}) -> 0; i(state, #ch{state = running}) -> credit_flow:state(); i(state, #ch{state = State}) -> State; +i(consumer_prefetch_count, #ch{consumer_prefetch = C}) -> C; i(prefetch_count, #ch{limiter = Limiter}) -> rabbit_limiter:get_prefetch_limit(Limiter); i(Item, _) -> diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl index ca050f147d..2086c856a8 100644 --- a/src/rabbit_queue_consumers.erl +++ b/src/rabbit_queue_consumers.erl @@ -17,7 +17,7 @@ -module(rabbit_queue_consumers). -export([new/0, max_active_priority/1, inactive/1, all/1, count/0, - unacknowledged_message_count/0, add/8, remove/3, erase_ch/2, + unacknowledged_message_count/0, add/9, remove/3, erase_ch/2, send_drained/0, deliver/3, record_ack/3, subtract_acks/3, possibly_unblock/3, resume_fun/0, notify_sent_fun/1, activate_limit_fun/0, @@ -70,7 +70,8 @@ -spec count() -> non_neg_integer(). -spec unacknowledged_message_count() -> non_neg_integer(). -spec add(ch(), rabbit_types:ctag(), boolean(), pid(), boolean(), - rabbit_framing:amqp_table(), boolean(), state()) -> state(). + non_neg_integer(), rabbit_framing:amqp_table(), boolean(), state()) + -> state(). -spec remove(ch(), rabbit_types:ctag(), state()) -> 'not_found' | state(). -spec erase_ch(ch(), state()) -> @@ -122,8 +123,8 @@ count() -> lists:sum([Count || #cr{consumer_count = Count} <- all_ch_record()]). unacknowledged_message_count() -> lists:sum([queue:len(C#cr.acktags) || C <- all_ch_record()]). -add(ChPid, CTag, NoAck, LimiterPid, LimiterActive, Args, IsEmpty, - State = #state{consumers = Consumers}) -> +add(ChPid, CTag, NoAck, LimiterPid, LimiterActive, ConsumerPrefetchCount, Args, + IsEmpty, State = #state{consumers = Consumers}) -> C = #cr{consumer_count = Count, limiter = Limiter} = ch_record(ChPid, LimiterPid), Limiter1 = case LimiterActive of @@ -132,7 +133,7 @@ add(ChPid, CTag, NoAck, LimiterPid, LimiterActive, Args, IsEmpty, end, C1 = C#cr{consumer_count = Count + 1, limiter = Limiter1}, update_ch_record( - case parse_credit_args(Args) of + case parse_credit_args(ConsumerPrefetchCount, Args) of none -> C1; {0, auto} -> C1; {_Credit, auto} when NoAck -> C1; @@ -347,16 +348,16 @@ utilisation(#state{use = {inactive, Since, Active, Avg}}) -> %%---------------------------------------------------------------------------- -parse_credit_args(Args) -> +parse_credit_args(Default, Args) -> case rabbit_misc:table_lookup(Args, <<"x-credit">>) of {table, T} -> case {rabbit_misc:table_lookup(T, <<"credit">>), rabbit_misc:table_lookup(T, <<"drain">>)} of {{long, C}, {bool, D}} -> {C, drain_mode(D)}; - _ -> none + _ -> {Default, auto} end; undefined -> case rabbit_misc:table_lookup(Args, <<"x-prefetch">>) of {_, Prefetch} -> {Prefetch, auto}; - _ -> none + _ -> {Default, auto} end end. |
