summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2014-02-03 14:23:45 +0000
committerSimon MacMullen <simon@rabbitmq.com>2014-02-03 14:23:45 +0000
commit76658beb9c6f7015b75024753f3fd7808c07ffb8 (patch)
tree323f199daae092409bd70b7701cf7aeaaabede59 /src
parentaa81f3531d6e7d65809d7369adbec57aab490dae (diff)
parent7597e6871940458eccea7592e73a021da0021725 (diff)
downloadrabbitmq-server-git-76658beb9c6f7015b75024753f3fd7808c07ffb8.tar.gz
Merge default
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue.erl22
-rw-r--r--src/rabbit_amqqueue_process.erl37
-rw-r--r--src/rabbit_channel.erl32
-rw-r--r--src/rabbit_limiter.erl86
-rw-r--r--src/rabbit_queue_consumers.erl52
-rw-r--r--src/rabbit_reader.erl1
6 files changed, 142 insertions, 88 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index eeb0e0bf03..8a32f46441 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -22,7 +22,7 @@
-export([lookup/1, not_found_or_absent/1, with/2, with/3, with_or_die/2,
assert_equivalence/5,
check_exclusive_access/2, with_exclusive_access_or_die/3,
- stat/1, deliver/2, deliver_flow/2, requeue/3, ack/3, reject/4]).
+ stat/1, deliver/2, deliver_flow/2, requeue/4, ack/4, reject/5]).
-export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]).
-export([force_event_refresh/0, notify_policy_changed/1]).
-export([consumers/1, consumers_all/1, consumer_info_keys/0]).
@@ -140,9 +140,10 @@
qpids()).
-spec(deliver_flow/2 :: ([rabbit_types:amqqueue()], rabbit_types:delivery()) ->
qpids()).
--spec(requeue/3 :: (pid(), [msg_id()], pid()) -> 'ok').
--spec(ack/3 :: (pid(), [msg_id()], pid()) -> 'ok').
--spec(reject/4 :: (pid(), [msg_id()], boolean(), pid()) -> 'ok').
+-spec(requeue/4 :: (pid(), [msg_id()], rabbit_types:ctag(), pid()) -> 'ok').
+-spec(ack/4 :: (pid(), [msg_id()], rabbit_types:ctag(), pid()) -> 'ok').
+-spec(reject/5 :: (pid(), boolean(), [msg_id()], rabbit_types:ctag(), pid()) ->
+ 'ok').
-spec(notify_down_all/2 :: (qpids(), pid()) -> ok_or_errors()).
-spec(activate_limit_all/2 :: (qpids(), pid()) -> ok_or_errors()).
-spec(basic_get/4 :: (rabbit_types:amqqueue(), pid(), boolean(), pid()) ->
@@ -432,7 +433,8 @@ declare_args() ->
{<<"x-dead-letter-routing-key">>, fun check_dlxrk_arg/2},
{<<"x-max-length">>, fun check_non_neg_int_arg/2}].
-consume_args() -> [{<<"x-priority">>, fun check_int_arg/2}].
+consume_args() -> [{<<"x-priority">>, fun check_int_arg/2},
+ {<<"x-prefetch">>, fun check_non_neg_int_arg/2}].
check_int_arg({Type, _}, _) ->
case lists:member(Type, ?INTEGER_ARG_TYPES) of
@@ -549,12 +551,14 @@ deliver(Qs, Delivery) -> deliver(Qs, Delivery, noflow).
deliver_flow(Qs, Delivery) -> deliver(Qs, Delivery, flow).
-requeue(QPid, MsgIds, ChPid) -> delegate:call(QPid, {requeue, MsgIds, ChPid}).
+requeue(QPid, MsgIds, CTag, ChPid) ->
+ delegate:call(QPid, {requeue, MsgIds, CTag, ChPid}).
-ack(QPid, MsgIds, ChPid) -> delegate:cast(QPid, {ack, MsgIds, ChPid}).
+ack(QPid, MsgIds, CTag, ChPid) ->
+ delegate:cast(QPid, {ack, MsgIds, CTag, ChPid}).
-reject(QPid, Requeue, MsgIds, ChPid) ->
- delegate:cast(QPid, {reject, Requeue, MsgIds, ChPid}).
+reject(QPid, Requeue, MsgIds, CTag, ChPid) ->
+ delegate:cast(QPid, {reject, Requeue, MsgIds, CTag, ChPid}).
notify_down_all(QPids, ChPid) ->
{_, Bads} = delegate:call(QPids, {notify_down, ChPid}),
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 21b6089bc6..5be3bac820 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -577,16 +577,16 @@ fetch(AckRequired, State = #q{backing_queue = BQ,
State1 = drop_expired_msgs(State#q{backing_queue_state = BQS1}),
{Result, maybe_send_drained(Result =:= empty, State1)}.
-ack(AckTags, ChPid, State) ->
- subtract_acks(ChPid, AckTags, State,
+ack(AckTags, CTag, ChPid, State) ->
+ subtract_acks(AckTags, CTag, ChPid, State,
fun (State1 = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
{_Guids, BQS1} = BQ:ack(AckTags, BQS),
State1#q{backing_queue_state = BQS1}
end).
-requeue(AckTags, ChPid, State) ->
- subtract_acks(ChPid, AckTags, State,
+requeue(AckTags, CTag, ChPid, State) ->
+ subtract_acks(AckTags, CTag, ChPid, State,
fun (State1) -> requeue_and_run(AckTags, State1) end).
possibly_unblock(Update, ChPid, State = #q{consumers = Consumers}) ->
@@ -649,10 +649,13 @@ backing_queue_timeout(State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
State#q{backing_queue_state = BQ:timeout(BQS)}.
-subtract_acks(ChPid, AckTags, State, Fun) ->
- case rabbit_queue_consumers:subtract_acks(ChPid, AckTags) of
- not_found -> State;
- ok -> Fun(State)
+subtract_acks(AckTags, CTag, ChPid, State = #q{consumers = Consumers}, Fun) ->
+ case rabbit_queue_consumers:subtract_acks(
+ AckTags, CTag, ChPid, Consumers) of
+ not_found -> State;
+ unchanged -> Fun(State);
+ {unblocked, Consumers1} -> State1 = State#q{consumers = Consumers1},
+ run_message_queue(true, Fun(State1))
end.
message_properties(Message, Confirm, #q{ttl = TTL}) ->
@@ -1008,9 +1011,9 @@ handle_call(purge, _From, State = #q{backing_queue = BQ,
State1 = State#q{backing_queue_state = BQS1},
reply({ok, Count}, maybe_send_drained(Count =:= 0, State1));
-handle_call({requeue, AckTags, ChPid}, From, State) ->
+handle_call({requeue, AckTags, CTag, ChPid}, From, State) ->
gen_server2:reply(From, ok),
- noreply(requeue(AckTags, ChPid, State));
+ noreply(requeue(AckTags, CTag, ChPid, State));
handle_call(sync_mirrors, _From,
State = #q{backing_queue = rabbit_mirror_queue_master,
@@ -1071,21 +1074,21 @@ handle_cast({deliver, Delivery = #delivery{sender = Sender}, Delivered, Flow},
State1 = State#q{senders = Senders1},
noreply(deliver_or_enqueue(Delivery, Delivered, State1));
-handle_cast({ack, AckTags, ChPid}, State) ->
- noreply(ack(AckTags, ChPid, State));
+handle_cast({ack, AckTags, CTag, ChPid}, State) ->
+ noreply(ack(AckTags, CTag, ChPid, State));
-handle_cast({reject, true, AckTags, ChPid}, State) ->
- noreply(requeue(AckTags, ChPid, State));
+handle_cast({reject, true, AckTags, CTag, ChPid}, State) ->
+ noreply(requeue(AckTags, CTag, ChPid, State));
-handle_cast({reject, false, AckTags, ChPid}, State) ->
+handle_cast({reject, false, AckTags, CTag, ChPid}, State) ->
noreply(with_dlx(
State#q.dlx,
- fun (X) -> subtract_acks(ChPid, AckTags, State,
+ fun (X) -> subtract_acks(AckTags, CTag, ChPid, State,
fun (State1) ->
dead_letter_rejected_msgs(
AckTags, X, State1)
end) end,
- fun () -> ack(AckTags, ChPid, State) end));
+ fun () -> ack(AckTags, CTag, ChPid, State) end));
handle_cast(delete_immediately, State) ->
stop(State);
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 4d866908ed..44545e1935 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -872,11 +872,13 @@ handle_method(#'basic.recover_async'{requeue = true},
_, State = #ch{unacked_message_q = UAMQ, limiter = Limiter}) ->
OkFun = fun () -> ok end,
UAMQL = queue:to_list(UAMQ),
- foreach_per_queue(
- fun (QPid, MsgIds) ->
+ foreach_per_consumer(
+ fun ({QPid, CTag}, MsgIds) ->
rabbit_misc:with_exit_handler(
OkFun,
- fun () -> rabbit_amqqueue:requeue(QPid, MsgIds, self()) end)
+ fun () ->
+ rabbit_amqqueue:requeue(QPid, MsgIds, CTag, self())
+ end)
end, lists:reverse(UAMQL)),
ok = notify_limiter(Limiter, UAMQL),
%% No answer required - basic.recover is the newer, synchronous
@@ -1313,9 +1315,9 @@ reject(DeliveryTag, Requeue, Multiple,
%% NB: Acked is in youngest-first order
reject(Requeue, Acked, Limiter) ->
- foreach_per_queue(
- fun (QPid, MsgIds) ->
- rabbit_amqqueue:reject(QPid, Requeue, MsgIds, self())
+ foreach_per_consumer(
+ fun ({QPid, CTag}, MsgIds) ->
+ rabbit_amqqueue:reject(QPid, Requeue, MsgIds, CTag, self())
end, Acked),
ok = notify_limiter(Limiter, Acked).
@@ -1373,9 +1375,9 @@ collect_acks(ToAcc, PrefixAcc, Q, DeliveryTag, Multiple) ->
%% NB: Acked is in youngest-first order
ack(Acked, State = #ch{queue_names = QNames}) ->
- foreach_per_queue(
- fun (QPid, MsgIds) ->
- ok = rabbit_amqqueue:ack(QPid, MsgIds, self()),
+ foreach_per_consumer(
+ fun ({QPid, CTag}, MsgIds) ->
+ ok = rabbit_amqqueue:ack(QPid, MsgIds, CTag, self()),
?INCR_STATS(case dict:find(QPid, QNames) of
{ok, QName} -> Count = length(MsgIds),
[{queue_stats, QName, Count}];
@@ -1406,15 +1408,15 @@ notify_queues(State = #ch{consumer_mapping = Consumers,
sets:union(sets:from_list(consumer_queues(Consumers)), DQ)),
{rabbit_amqqueue:notify_down_all(QPids, self()), State#ch{state = closing}}.
-foreach_per_queue(_F, []) ->
+foreach_per_consumer(_F, []) ->
ok;
-foreach_per_queue(F, [{_DTag, _CTag, {QPid, MsgId}}]) -> %% common case
- F(QPid, [MsgId]);
+foreach_per_consumer(F, [{_DTag, CTag, {QPid, MsgId}}]) -> %% common case
+ F({QPid, CTag}, [MsgId]);
%% NB: UAL should be in youngest-first order; the tree values will
%% then be in oldest-first order
-foreach_per_queue(F, UAL) ->
- T = lists:foldl(fun ({_DTag, _CTag, {QPid, MsgId}}, T) ->
- rabbit_misc:gb_trees_cons(QPid, MsgId, T)
+foreach_per_consumer(F, UAL) ->
+ T = lists:foldl(fun ({_DTag, CTag, {QPid, MsgId}}, T) ->
+ rabbit_misc:gb_trees_cons({QPid, CTag}, MsgId, T)
end, gb_trees:empty(), UAL),
rabbit_misc:gb_trees_foreach(F, T).
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index d37b356cc0..5776fc3f7a 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -17,7 +17,8 @@
%% The purpose of the limiter is to stem the flow of messages from
%% queues to channels, in order to act upon various protocol-level
%% flow control mechanisms, specifically AMQP 0-9-1's basic.qos
-%% prefetch_count and AMQP 1.0's link (aka consumer) credit mechanism.
+%% prefetch_count, our consumer prefetch extension, and AMQP 1.0's
+%% link (aka consumer) credit mechanism.
%%
%% Each channel has an associated limiter process, created with
%% start_link/1, which it passes to queues on consumer creation with
@@ -54,11 +55,15 @@
%% inactive. In practice it is rare for that to happen, though we
%% could optimise this case in the future.
%%
-%% In addition, the consumer credit bookkeeping is local to queues, so
-%% it is not necessary to store information about it in the limiter
-%% process. But for abstraction we hide it from the queue behind the
-%% limiter API, and it therefore becomes part of the queue local
-%% state.
+%% Consumer credit (for AMQP 1.0) and per-consumer prefetch (for AMQP
+%% 0-9-1) are treated as essentially the same thing, but with the
+%% exception that per-consumer prefetch gets an auto-topup when
+%% acknowledgments come in.
+%%
+%% The bookkeeping for this is local to queues, so it is not necessary
+%% to store information about it in the limiter process. But for
+%% abstraction we hide it from the queue behind the limiter API, and
+%% it therefore becomes part of the queue local state.
%%
%% The interactions with the limiter are as follows:
%%
@@ -66,7 +71,8 @@
%% that's what the limit_prefetch/3, unlimit_prefetch/1,
%% get_prefetch_limit/1 API functions are about. They also tell the
%% limiter queue state (via the queue) about consumer credit
-%% changes - that's what credit/5 is for.
+%% changes and message acknowledgement - that's what credit/5 and
+%% ack_from_queue/3 are for.
%%
%% 2. Queues also tell the limiter queue state about the queue
%% becoming empty (via drained/1) and consumers leaving (via
@@ -123,8 +129,8 @@
get_prefetch_limit/1, ack/2, pid/1]).
%% queue API
-export([client/1, activate/1, can_send/3, resume/1, deactivate/1,
- is_suspended/1, is_consumer_blocked/2, credit/5, drained/1,
- forget_consumer/2]).
+ is_suspended/1, is_consumer_blocked/2, credit/5, ack_from_queue/3,
+ drained/1, forget_consumer/2]).
%% callbacks
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
handle_info/2, prioritise_call/4]).
@@ -141,6 +147,8 @@
-type(qstate() :: #qstate{pid :: pid(),
state :: 'dormant' | 'active' | 'suspended'}).
+-type(credit_mode() :: 'manual' | 'drain' | 'auto').
+
-spec(start_link/1 :: (rabbit_types:proc_name()) ->
rabbit_types:ok_pid_or_error()).
-spec(new/1 :: (pid()) -> lstate()).
@@ -161,8 +169,10 @@
-spec(deactivate/1 :: (qstate()) -> qstate()).
-spec(is_suspended/1 :: (qstate()) -> boolean()).
-spec(is_consumer_blocked/2 :: (qstate(), rabbit_types:ctag()) -> boolean()).
--spec(credit/5 :: (qstate(), rabbit_types:ctag(), non_neg_integer(), boolean(),
- boolean()) -> {boolean(), qstate()}).
+-spec(credit/5 :: (qstate(), rabbit_types:ctag(), non_neg_integer(),
+ credit_mode(), boolean()) -> {boolean(), qstate()}).
+-spec(ack_from_queue/3 :: (qstate(), rabbit_types:ctag(), non_neg_integer())
+ -> {boolean(), qstate()}).
-spec(drained/1 :: (qstate())
-> {[{rabbit_types:ctag(), non_neg_integer()}], qstate()}).
-spec(forget_consumer/2 :: (qstate(), rabbit_types:ctag()) -> qstate()).
@@ -179,7 +189,7 @@
%% notified of a change in the limit or volume that may allow it to
%% deliver more messages via the limiter's channel.
--record(credit, {credit = 0, drain = false}).
+-record(credit, {credit = 0, mode}).
%%----------------------------------------------------------------------------
%% API
@@ -256,19 +266,32 @@ is_consumer_blocked(#qstate{credits = Credits}, CTag) ->
{value, #credit{}} -> true
end.
-credit(Limiter = #qstate{credits = Credits}, CTag, Credit, Drain, IsEmpty) ->
- {Res, Cr} = case IsEmpty andalso Drain of
- true -> {true, make_credit(0, false)};
- false -> {false, make_credit(Credit, Drain)}
- end,
- {Res, Limiter#qstate{credits = gb_trees:enter(CTag, Cr, Credits)}}.
+credit(Limiter = #qstate{credits = Credits}, CTag, Crd, Mode, IsEmpty) ->
+ {Res, Cr} =
+ case IsEmpty andalso Mode =:= drain of
+ true -> {true, #credit{credit = 0, mode = manual}};
+ false -> {false, #credit{credit = Crd, mode = Mode}}
+ end,
+ {Res, Limiter#qstate{credits = enter_credit(CTag, Cr, Credits)}}.
+
+ack_from_queue(Limiter = #qstate{credits = Credits}, CTag, Credit) ->
+ {Credits1, Unblocked} =
+ case gb_trees:lookup(CTag, Credits) of
+ {value, C = #credit{mode = auto, credit = C0}} ->
+ {update_credit(CTag, C#credit{credit = C0 + Credit}, Credits),
+ C0 =:= 0 andalso Credit =/= 0};
+ _ ->
+ {Credits, false}
+ end,
+ {Unblocked, Limiter#qstate{credits = Credits1}}.
drained(Limiter = #qstate{credits = Credits}) ->
+ Drain = fun(C) -> C#credit{credit = 0, mode = manual} end,
{CTagCredits, Credits2} =
rabbit_misc:gb_trees_fold(
- fun (CTag, #credit{credit = C, drain = true}, {Acc, Creds0}) ->
- {[{CTag, C} | Acc], update_credit(CTag, 0, false, Creds0)};
- (_CTag, #credit{credit = _C, drain = false}, {Acc, Creds0}) ->
+ fun (CTag, C = #credit{credit = Crd, mode = drain}, {Acc, Creds0}) ->
+ {[{CTag, Crd} | Acc], update_credit(CTag, Drain(C), Creds0)};
+ (_CTag, #credit{credit = _Crd, mode = _Mode}, {Acc, Creds0}) ->
{Acc, Creds0}
end, {[], Credits}, Credits),
{CTagCredits, Limiter#qstate{credits = Credits2}}.
@@ -287,20 +310,25 @@ forget_consumer(Limiter = #qstate{credits = Credits}, CTag) ->
%% state for us (#qstate.credits), and maintain a fiction that the
%% limiter is making the decisions...
-make_credit(Credit, Drain) ->
- %% Using up all credit implies no need to send a 'drained' event
- #credit{credit = Credit, drain = Drain andalso Credit > 0}.
-
decrement_credit(CTag, Credits) ->
case gb_trees:lookup(CTag, Credits) of
- {value, #credit{credit = Credit, drain = Drain}} ->
- update_credit(CTag, Credit - 1, Drain, Credits);
+ {value, C = #credit{credit = Credit}} ->
+ update_credit(CTag, C#credit{credit = Credit - 1}, Credits);
none ->
Credits
end.
-update_credit(CTag, Credit, Drain, Credits) ->
- gb_trees:update(CTag, make_credit(Credit, Drain), Credits).
+enter_credit(CTag, C, Credits) ->
+ gb_trees:enter(CTag, ensure_credit_invariant(C), Credits).
+
+update_credit(CTag, C, Credits) ->
+ gb_trees:update(CTag, ensure_credit_invariant(C), Credits).
+
+ensure_credit_invariant(C = #credit{credit = 0, mode = drain}) ->
+ %% Using up all credit implies no need to send a 'drained' event
+ C#credit{mode = manual};
+ensure_credit_invariant(C) ->
+ C.
%%----------------------------------------------------------------------------
%% gen_server callbacks
diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl
index c9540da818..017a4f9c1a 100644
--- a/src/rabbit_queue_consumers.erl
+++ b/src/rabbit_queue_consumers.erl
@@ -18,7 +18,7 @@
-export([new/0, max_active_priority/1, inactive/1, all/1, count/0,
unacknowledged_message_count/0, add/8, remove/3, erase_ch/2,
- send_drained/0, deliver/3, record_ack/3, subtract_acks/2,
+ send_drained/0, deliver/3, record_ack/3, subtract_acks/4,
possibly_unblock/3,
resume_fun/0, notify_sent_fun/1, activate_limit_fun/0,
credit/6, utilisation/1]).
@@ -82,7 +82,8 @@
{'delivered', boolean(), T, state()} |
{'undelivered', boolean(), state()}.
-spec record_ack(ch(), pid(), ack()) -> 'ok'.
--spec subtract_acks(ch(), [ack()]) -> 'not_found' | 'ok'.
+-spec subtract_acks([ack()], rabbit_types:ctag(), ch(), state()) ->
+ 'not_found' | 'unchanged' | {'unblocked', state()}.
-spec possibly_unblock(cr_fun(), ch(), state()) ->
'unchanged' | {'unblocked', state()}.
-spec resume_fun() -> cr_fun().
@@ -130,11 +131,14 @@ add(ChPid, CTag, NoAck, LimiterPid, LimiterActive, Args, IsEmpty,
false -> Limiter
end,
C1 = C#cr{consumer_count = Count + 1, limiter = Limiter1},
- update_ch_record(case parse_credit_args(Args) of
- none -> C1;
- {Crd, Drain} -> credit_and_drain(
- C1, CTag, Crd, Drain, IsEmpty)
- end),
+ update_ch_record(
+ case parse_credit_args(Args) of
+ none -> C1;
+ {0, auto} -> C1;
+ {_Credit, auto} when NoAck -> C1;
+ {Credit, Mode} -> credit_and_drain(
+ C1, CTag, Credit, Mode, IsEmpty)
+ end),
Consumer = #consumer{tag = CTag,
ack_required = not NoAck,
args = Args},
@@ -238,14 +242,20 @@ record_ack(ChPid, LimiterPid, AckTag) ->
update_ch_record(C#cr{acktags = queue:in(AckTag, ChAckTags)}),
ok.
-subtract_acks(ChPid, AckTags) ->
+subtract_acks(AckTags, CTag, ChPid, State) ->
case lookup_ch(ChPid) of
not_found ->
not_found;
- C = #cr{acktags = ChAckTags} ->
- update_ch_record(
- C#cr{acktags = subtract_acks(AckTags, [], ChAckTags)}),
- ok
+ C = #cr{acktags = ChAckTags, limiter = Lim} ->
+ {Unblocked, Lim2} =
+ rabbit_limiter:ack_from_queue(Lim, CTag, length(AckTags)),
+ AckTags2 = subtract_acks(AckTags, [], ChAckTags),
+ C2 = C#cr{acktags = AckTags2, limiter = Lim2},
+ case Unblocked of
+ true -> unblock(C2, State);
+ false -> update_ch_record(C2),
+ unchanged
+ end
end.
subtract_acks([], [], AckQ) ->
@@ -308,7 +318,7 @@ credit(IsEmpty, Credit, Drain, ChPid, CTag, State) ->
unchanged;
#cr{limiter = Limiter} = C ->
C1 = #cr{limiter = Limiter1} =
- credit_and_drain(C, CTag, Credit, Drain, IsEmpty),
+ credit_and_drain(C, CTag, Credit, drain_mode(Drain), IsEmpty),
case is_ch_blocked(C1) orelse
(not rabbit_limiter:is_consumer_blocked(Limiter, CTag)) orelse
rabbit_limiter:is_consumer_blocked(Limiter1, CTag) of
@@ -318,6 +328,9 @@ credit(IsEmpty, Credit, Drain, ChPid, CTag, State) ->
end
end.
+drain_mode(true) -> drain;
+drain_mode(false) -> manual.
+
utilisation(#state{use = {active, Since, Avg}}) ->
use_avg(now_micros() - Since, 0, Avg);
utilisation(#state{use = {inactive, Since, Active, Avg}}) ->
@@ -329,10 +342,13 @@ parse_credit_args(Args) ->
case rabbit_misc:table_lookup(Args, <<"x-credit">>) of
{table, T} -> case {rabbit_misc:table_lookup(T, <<"credit">>),
rabbit_misc:table_lookup(T, <<"drain">>)} of
- {{long, Credit}, {bool, Drain}} -> {Credit, Drain};
- _ -> none
+ {{long, C}, {bool, D}} -> {C, drain_mode(D)};
+ _ -> none
end;
- undefined -> none
+ undefined -> case rabbit_misc:table_lookup(Args, <<"x-prefetch">>) of
+ {_, Prefetch} -> {Prefetch, auto};
+ _ -> none
+ end
end.
lookup_ch(ChPid) ->
@@ -393,8 +409,8 @@ send_drained(C = #cr{ch_pid = ChPid, limiter = Limiter}) ->
end.
credit_and_drain(C = #cr{ch_pid = ChPid, limiter = Limiter},
- CTag, Credit, Drain, IsEmpty) ->
- case rabbit_limiter:credit(Limiter, CTag, Credit, Drain, IsEmpty) of
+ CTag, Credit, Mode, IsEmpty) ->
+ case rabbit_limiter:credit(Limiter, CTag, Credit, Mode, IsEmpty) of
{true, Limiter1} -> rabbit_channel:send_drained(ChPid,
[{CTag, Credit}]),
C#cr{limiter = Limiter1};
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 122eb30545..f7c4c0a22a 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -176,6 +176,7 @@ server_capabilities(rabbit_framing_amqp_0_9_1) ->
{<<"consumer_cancel_notify">>, bool, true},
{<<"connection.blocked">>, bool, true},
{<<"consumer_priorities">>, bool, true},
+ {<<"consumer_prefetch">>, bool, true},
{<<"authentication_failure_close">>, bool, true}];
server_capabilities(_) ->
[].