summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2014-03-04 10:53:01 +0000
committerSimon MacMullen <simon@rabbitmq.com>2014-03-04 10:53:01 +0000
commit4a0a415bd3b0a7dae6e1596cb0101fbe0f80a915 (patch)
tree3d2b9648fd2876fbce607c184c5c8dd843793935 /src
parentd8a0d777bfed7eb45589a7118cb8f3573e917e18 (diff)
downloadrabbitmq-server-git-4a0a415bd3b0a7dae6e1596cb0101fbe0f80a915.tar.gz
Change the semantics of the basic.qos global flag to switch between per-consumer and per-channel prefetch.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue.erl13
-rw-r--r--src/rabbit_amqqueue_process.erl17
-rw-r--r--src/rabbit_channel.erl29
-rw-r--r--src/rabbit_queue_consumers.erl17
4 files changed, 45 insertions, 31 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 67bf000db1..2bf5c840ae 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -26,7 +26,7 @@
-export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]).
-export([force_event_refresh/1, notify_policy_changed/1]).
-export([consumers/1, consumers_all/1, consumer_info_keys/0]).
--export([basic_get/4, basic_consume/9, basic_cancel/4, notify_decorators/1]).
+-export([basic_get/4, basic_consume/10, basic_cancel/4, notify_decorators/1]).
-export([notify_sent/2, notify_sent_queue_down/1, resume/2]).
-export([notify_down_all/2, activate_limit_all/2, credit/5]).
-export([on_node_down/1]).
@@ -149,9 +149,10 @@
{'ok', non_neg_integer(), qmsg()} | 'empty').
-spec(credit/5 :: (rabbit_types:amqqueue(), pid(), rabbit_types:ctag(),
non_neg_integer(), boolean()) -> 'ok').
--spec(basic_consume/9 ::
+-spec(basic_consume/10 ::
(rabbit_types:amqqueue(), boolean(), pid(), pid(), boolean(),
- rabbit_types:ctag(), boolean(), rabbit_framing:amqp_table(), any())
+ non_neg_integer(), rabbit_types:ctag(), boolean(),
+ rabbit_framing:amqp_table(), any())
-> rabbit_types:ok_or_error('exclusive_consume_unavailable')).
-spec(basic_cancel/4 ::
(rabbit_types:amqqueue(), pid(), rabbit_types:ctag(), any()) -> 'ok').
@@ -579,10 +580,12 @@ basic_get(#amqqueue{pid = QPid}, ChPid, NoAck, LimiterPid) ->
delegate:call(QPid, {basic_get, ChPid, NoAck, LimiterPid}).
basic_consume(#amqqueue{pid = QPid, name = QName}, NoAck, ChPid, LimiterPid,
- LimiterActive, ConsumerTag, ExclusiveConsume, Args, OkMsg) ->
+ LimiterActive, ConsumerPrefetchCount, ConsumerTag,
+ ExclusiveConsume, Args, OkMsg) ->
ok = check_consume_arguments(QName, Args),
delegate:call(QPid, {basic_consume, NoAck, ChPid, LimiterPid, LimiterActive,
- ConsumerTag, ExclusiveConsume, Args, OkMsg}).
+ ConsumerPrefetchCount, ConsumerTag, ExclusiveConsume,
+ Args, OkMsg}).
basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) ->
delegate:call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index a0c8fb545e..97a9c312a8 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -846,13 +846,13 @@ emit_consumer_deleted(ChPid, ConsumerTag, QName) ->
prioritise_call(Msg, _From, _Len, State) ->
case Msg of
- info -> 9;
- {info, _Items} -> 9;
- consumers -> 9;
- stat -> 7;
- {basic_consume, _, _, _, _, _, _, _, _} -> consumer_bias(State);
- {basic_cancel, _, _, _} -> consumer_bias(State);
- _ -> 0
+ info -> 9;
+ {info, _Items} -> 9;
+ consumers -> 9;
+ stat -> 7;
+ {basic_consume, _, _, _, _, _, _, _, _, _} -> consumer_bias(State);
+ {basic_cancel, _, _, _} -> consumer_bias(State);
+ _ -> 0
end.
prioritise_cast(Msg, _Len, State) ->
@@ -961,7 +961,7 @@ handle_call({basic_get, ChPid, NoAck, LimiterPid}, _From,
end;
handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive,
- ConsumerTag, ExclusiveConsume, Args, OkMsg},
+ ConsumerPrefetchCount, ConsumerTag, ExclusiveConsume, Args, OkMsg},
_From, State = #q{consumers = Consumers,
exclusive_consumer = Holder}) ->
case check_exclusive_access(Holder, ExclusiveConsume, State) of
@@ -969,6 +969,7 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive,
ok -> Consumers1 = rabbit_queue_consumers:add(
ChPid, ConsumerTag, NoAck,
LimiterPid, LimiterActive,
+ ConsumerPrefetchCount,
Args, is_empty(State), Consumers),
ExclusiveConsumer =
if ExclusiveConsume -> {ChPid, ConsumerTag};
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 7907c96c6f..6421a7655f 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -38,7 +38,8 @@
queue_names, queue_monitors, consumer_mapping,
queue_consumers, delivering_queues,
queue_collector_pid, stats_timer, confirm_enabled, publish_seqno,
- unconfirmed, confirmed, mandatory, capabilities, trace_state}).
+ unconfirmed, confirmed, mandatory, capabilities, trace_state,
+ consumer_prefetch}).
-define(MAX_PERMISSION_CACHE_SIZE, 12).
@@ -52,6 +53,7 @@
messages_uncommitted,
acks_uncommitted,
prefetch_count,
+ consumer_prefetch_count,
state]).
-define(CREATION_EVENT_KEYS,
@@ -216,7 +218,8 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
confirmed = [],
mandatory = dtree:empty(),
capabilities = Capabilities,
- trace_state = rabbit_trace:init(VHost)},
+ trace_state = rabbit_trace:init(VHost),
+ consumer_prefetch = 0},
State1 = rabbit_event:init_stats_timer(State, #ch.stats_timer),
rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State1)),
rabbit_event:if_enabled(State1, #ch.stats_timer,
@@ -752,9 +755,10 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
exclusive = ExclusiveConsume,
nowait = NoWait,
arguments = Args},
- _, State = #ch{conn_pid = ConnPid,
- limiter = Limiter,
- consumer_mapping = ConsumerMapping}) ->
+ _, State = #ch{conn_pid = ConnPid,
+ limiter = Limiter,
+ consumer_prefetch = ConsumerPrefetchCount,
+ consumer_mapping = ConsumerMapping}) ->
case dict:find(ConsumerTag, ConsumerMapping) of
error ->
QueueName = qbin_to_resource(QueueNameBin, State),
@@ -776,6 +780,7 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
Q, NoAck, self(),
rabbit_limiter:pid(Limiter),
rabbit_limiter:is_active(Limiter),
+ ConsumerPrefetchCount,
ActualConsumerTag, ExclusiveConsume, Args,
ok_msg(NoWait, #'basic.consume_ok'{
consumer_tag = ActualConsumerTag})),
@@ -842,19 +847,22 @@ handle_method(#'basic.cancel'{consumer_tag = ConsumerTag, nowait = NoWait},
end
end;
-handle_method(#'basic.qos'{global = true}, _, _State) ->
- rabbit_misc:protocol_error(not_implemented, "global=true", []);
-
handle_method(#'basic.qos'{prefetch_size = Size}, _, _State) when Size /= 0 ->
rabbit_misc:protocol_error(not_implemented,
"prefetch_size!=0 (~w)", [Size]);
-handle_method(#'basic.qos'{prefetch_count = 0},
+handle_method(#'basic.qos'{global = false,
+ prefetch_count = PrefetchCount}, _, State) ->
+ {reply, #'basic.qos_ok'{}, State#ch{consumer_prefetch = PrefetchCount}};
+
+handle_method(#'basic.qos'{global = true,
+ prefetch_count = 0},
_, State = #ch{limiter = Limiter}) ->
Limiter1 = rabbit_limiter:unlimit_prefetch(Limiter),
{reply, #'basic.qos_ok'{}, State#ch{limiter = Limiter1}};
-handle_method(#'basic.qos'{prefetch_count = PrefetchCount},
+handle_method(#'basic.qos'{global = true,
+ prefetch_count = PrefetchCount},
_, State = #ch{limiter = Limiter, unacked_message_q = UAMQ}) ->
%% TODO queue:len(UAMQ) is not strictly right since that counts
%% unacked messages from basic.get too. Pretty obscure though.
@@ -1603,6 +1611,7 @@ i(acks_uncommitted, #ch{tx = {_Msgs, Acks}}) -> ack_len(Acks);
i(acks_uncommitted, #ch{}) -> 0;
i(state, #ch{state = running}) -> credit_flow:state();
i(state, #ch{state = State}) -> State;
+i(consumer_prefetch_count, #ch{consumer_prefetch = C}) -> C;
i(prefetch_count, #ch{limiter = Limiter}) ->
rabbit_limiter:get_prefetch_limit(Limiter);
i(Item, _) ->
diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl
index ca050f147d..2086c856a8 100644
--- a/src/rabbit_queue_consumers.erl
+++ b/src/rabbit_queue_consumers.erl
@@ -17,7 +17,7 @@
-module(rabbit_queue_consumers).
-export([new/0, max_active_priority/1, inactive/1, all/1, count/0,
- unacknowledged_message_count/0, add/8, remove/3, erase_ch/2,
+ unacknowledged_message_count/0, add/9, remove/3, erase_ch/2,
send_drained/0, deliver/3, record_ack/3, subtract_acks/3,
possibly_unblock/3,
resume_fun/0, notify_sent_fun/1, activate_limit_fun/0,
@@ -70,7 +70,8 @@
-spec count() -> non_neg_integer().
-spec unacknowledged_message_count() -> non_neg_integer().
-spec add(ch(), rabbit_types:ctag(), boolean(), pid(), boolean(),
- rabbit_framing:amqp_table(), boolean(), state()) -> state().
+ non_neg_integer(), rabbit_framing:amqp_table(), boolean(), state())
+ -> state().
-spec remove(ch(), rabbit_types:ctag(), state()) ->
'not_found' | state().
-spec erase_ch(ch(), state()) ->
@@ -122,8 +123,8 @@ count() -> lists:sum([Count || #cr{consumer_count = Count} <- all_ch_record()]).
unacknowledged_message_count() ->
lists:sum([queue:len(C#cr.acktags) || C <- all_ch_record()]).
-add(ChPid, CTag, NoAck, LimiterPid, LimiterActive, Args, IsEmpty,
- State = #state{consumers = Consumers}) ->
+add(ChPid, CTag, NoAck, LimiterPid, LimiterActive, ConsumerPrefetchCount, Args,
+ IsEmpty, State = #state{consumers = Consumers}) ->
C = #cr{consumer_count = Count,
limiter = Limiter} = ch_record(ChPid, LimiterPid),
Limiter1 = case LimiterActive of
@@ -132,7 +133,7 @@ add(ChPid, CTag, NoAck, LimiterPid, LimiterActive, Args, IsEmpty,
end,
C1 = C#cr{consumer_count = Count + 1, limiter = Limiter1},
update_ch_record(
- case parse_credit_args(Args) of
+ case parse_credit_args(ConsumerPrefetchCount, Args) of
none -> C1;
{0, auto} -> C1;
{_Credit, auto} when NoAck -> C1;
@@ -347,16 +348,16 @@ utilisation(#state{use = {inactive, Since, Active, Avg}}) ->
%%----------------------------------------------------------------------------
-parse_credit_args(Args) ->
+parse_credit_args(Default, Args) ->
case rabbit_misc:table_lookup(Args, <<"x-credit">>) of
{table, T} -> case {rabbit_misc:table_lookup(T, <<"credit">>),
rabbit_misc:table_lookup(T, <<"drain">>)} of
{{long, C}, {bool, D}} -> {C, drain_mode(D)};
- _ -> none
+ _ -> {Default, auto}
end;
undefined -> case rabbit_misc:table_lookup(Args, <<"x-prefetch">>) of
{_, Prefetch} -> {Prefetch, auto};
- _ -> none
+ _ -> {Default, auto}
end
end.