diff options
| author | Simon MacMullen <simon@rabbitmq.com> | 2014-02-03 14:23:45 +0000 |
|---|---|---|
| committer | Simon MacMullen <simon@rabbitmq.com> | 2014-02-03 14:23:45 +0000 |
| commit | 76658beb9c6f7015b75024753f3fd7808c07ffb8 (patch) | |
| tree | 323f199daae092409bd70b7701cf7aeaaabede59 /src | |
| parent | aa81f3531d6e7d65809d7369adbec57aab490dae (diff) | |
| parent | 7597e6871940458eccea7592e73a021da0021725 (diff) | |
| download | rabbitmq-server-git-76658beb9c6f7015b75024753f3fd7808c07ffb8.tar.gz | |
Merge default
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue.erl | 22 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 37 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 32 | ||||
| -rw-r--r-- | src/rabbit_limiter.erl | 86 | ||||
| -rw-r--r-- | src/rabbit_queue_consumers.erl | 52 | ||||
| -rw-r--r-- | src/rabbit_reader.erl | 1 |
6 files changed, 142 insertions, 88 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index eeb0e0bf03..8a32f46441 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -22,7 +22,7 @@ -export([lookup/1, not_found_or_absent/1, with/2, with/3, with_or_die/2, assert_equivalence/5, check_exclusive_access/2, with_exclusive_access_or_die/3, - stat/1, deliver/2, deliver_flow/2, requeue/3, ack/3, reject/4]). + stat/1, deliver/2, deliver_flow/2, requeue/4, ack/4, reject/5]). -export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]). -export([force_event_refresh/0, notify_policy_changed/1]). -export([consumers/1, consumers_all/1, consumer_info_keys/0]). @@ -140,9 +140,10 @@ qpids()). -spec(deliver_flow/2 :: ([rabbit_types:amqqueue()], rabbit_types:delivery()) -> qpids()). --spec(requeue/3 :: (pid(), [msg_id()], pid()) -> 'ok'). --spec(ack/3 :: (pid(), [msg_id()], pid()) -> 'ok'). --spec(reject/4 :: (pid(), [msg_id()], boolean(), pid()) -> 'ok'). +-spec(requeue/4 :: (pid(), [msg_id()], rabbit_types:ctag(), pid()) -> 'ok'). +-spec(ack/4 :: (pid(), [msg_id()], rabbit_types:ctag(), pid()) -> 'ok'). +-spec(reject/5 :: (pid(), boolean(), [msg_id()], rabbit_types:ctag(), pid()) -> + 'ok'). -spec(notify_down_all/2 :: (qpids(), pid()) -> ok_or_errors()). -spec(activate_limit_all/2 :: (qpids(), pid()) -> ok_or_errors()). -spec(basic_get/4 :: (rabbit_types:amqqueue(), pid(), boolean(), pid()) -> @@ -432,7 +433,8 @@ declare_args() -> {<<"x-dead-letter-routing-key">>, fun check_dlxrk_arg/2}, {<<"x-max-length">>, fun check_non_neg_int_arg/2}]. -consume_args() -> [{<<"x-priority">>, fun check_int_arg/2}]. +consume_args() -> [{<<"x-priority">>, fun check_int_arg/2}, + {<<"x-prefetch">>, fun check_non_neg_int_arg/2}]. check_int_arg({Type, _}, _) -> case lists:member(Type, ?INTEGER_ARG_TYPES) of @@ -549,12 +551,14 @@ deliver(Qs, Delivery) -> deliver(Qs, Delivery, noflow). deliver_flow(Qs, Delivery) -> deliver(Qs, Delivery, flow). -requeue(QPid, MsgIds, ChPid) -> delegate:call(QPid, {requeue, MsgIds, ChPid}). +requeue(QPid, MsgIds, CTag, ChPid) -> + delegate:call(QPid, {requeue, MsgIds, CTag, ChPid}). -ack(QPid, MsgIds, ChPid) -> delegate:cast(QPid, {ack, MsgIds, ChPid}). +ack(QPid, MsgIds, CTag, ChPid) -> + delegate:cast(QPid, {ack, MsgIds, CTag, ChPid}). -reject(QPid, Requeue, MsgIds, ChPid) -> - delegate:cast(QPid, {reject, Requeue, MsgIds, ChPid}). +reject(QPid, Requeue, MsgIds, CTag, ChPid) -> + delegate:cast(QPid, {reject, Requeue, MsgIds, CTag, ChPid}). notify_down_all(QPids, ChPid) -> {_, Bads} = delegate:call(QPids, {notify_down, ChPid}), diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 21b6089bc6..5be3bac820 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -577,16 +577,16 @@ fetch(AckRequired, State = #q{backing_queue = BQ, State1 = drop_expired_msgs(State#q{backing_queue_state = BQS1}), {Result, maybe_send_drained(Result =:= empty, State1)}. -ack(AckTags, ChPid, State) -> - subtract_acks(ChPid, AckTags, State, +ack(AckTags, CTag, ChPid, State) -> + subtract_acks(AckTags, CTag, ChPid, State, fun (State1 = #q{backing_queue = BQ, backing_queue_state = BQS}) -> {_Guids, BQS1} = BQ:ack(AckTags, BQS), State1#q{backing_queue_state = BQS1} end). -requeue(AckTags, ChPid, State) -> - subtract_acks(ChPid, AckTags, State, +requeue(AckTags, CTag, ChPid, State) -> + subtract_acks(AckTags, CTag, ChPid, State, fun (State1) -> requeue_and_run(AckTags, State1) end). possibly_unblock(Update, ChPid, State = #q{consumers = Consumers}) -> @@ -649,10 +649,13 @@ backing_queue_timeout(State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> State#q{backing_queue_state = BQ:timeout(BQS)}. -subtract_acks(ChPid, AckTags, State, Fun) -> - case rabbit_queue_consumers:subtract_acks(ChPid, AckTags) of - not_found -> State; - ok -> Fun(State) +subtract_acks(AckTags, CTag, ChPid, State = #q{consumers = Consumers}, Fun) -> + case rabbit_queue_consumers:subtract_acks( + AckTags, CTag, ChPid, Consumers) of + not_found -> State; + unchanged -> Fun(State); + {unblocked, Consumers1} -> State1 = State#q{consumers = Consumers1}, + run_message_queue(true, Fun(State1)) end. message_properties(Message, Confirm, #q{ttl = TTL}) -> @@ -1008,9 +1011,9 @@ handle_call(purge, _From, State = #q{backing_queue = BQ, State1 = State#q{backing_queue_state = BQS1}, reply({ok, Count}, maybe_send_drained(Count =:= 0, State1)); -handle_call({requeue, AckTags, ChPid}, From, State) -> +handle_call({requeue, AckTags, CTag, ChPid}, From, State) -> gen_server2:reply(From, ok), - noreply(requeue(AckTags, ChPid, State)); + noreply(requeue(AckTags, CTag, ChPid, State)); handle_call(sync_mirrors, _From, State = #q{backing_queue = rabbit_mirror_queue_master, @@ -1071,21 +1074,21 @@ handle_cast({deliver, Delivery = #delivery{sender = Sender}, Delivered, Flow}, State1 = State#q{senders = Senders1}, noreply(deliver_or_enqueue(Delivery, Delivered, State1)); -handle_cast({ack, AckTags, ChPid}, State) -> - noreply(ack(AckTags, ChPid, State)); +handle_cast({ack, AckTags, CTag, ChPid}, State) -> + noreply(ack(AckTags, CTag, ChPid, State)); -handle_cast({reject, true, AckTags, ChPid}, State) -> - noreply(requeue(AckTags, ChPid, State)); +handle_cast({reject, true, AckTags, CTag, ChPid}, State) -> + noreply(requeue(AckTags, CTag, ChPid, State)); -handle_cast({reject, false, AckTags, ChPid}, State) -> +handle_cast({reject, false, AckTags, CTag, ChPid}, State) -> noreply(with_dlx( State#q.dlx, - fun (X) -> subtract_acks(ChPid, AckTags, State, + fun (X) -> subtract_acks(AckTags, CTag, ChPid, State, fun (State1) -> dead_letter_rejected_msgs( AckTags, X, State1) end) end, - fun () -> ack(AckTags, ChPid, State) end)); + fun () -> ack(AckTags, CTag, ChPid, State) end)); handle_cast(delete_immediately, State) -> stop(State); diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 4d866908ed..44545e1935 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -872,11 +872,13 @@ handle_method(#'basic.recover_async'{requeue = true}, _, State = #ch{unacked_message_q = UAMQ, limiter = Limiter}) -> OkFun = fun () -> ok end, UAMQL = queue:to_list(UAMQ), - foreach_per_queue( - fun (QPid, MsgIds) -> + foreach_per_consumer( + fun ({QPid, CTag}, MsgIds) -> rabbit_misc:with_exit_handler( OkFun, - fun () -> rabbit_amqqueue:requeue(QPid, MsgIds, self()) end) + fun () -> + rabbit_amqqueue:requeue(QPid, MsgIds, CTag, self()) + end) end, lists:reverse(UAMQL)), ok = notify_limiter(Limiter, UAMQL), %% No answer required - basic.recover is the newer, synchronous @@ -1313,9 +1315,9 @@ reject(DeliveryTag, Requeue, Multiple, %% NB: Acked is in youngest-first order reject(Requeue, Acked, Limiter) -> - foreach_per_queue( - fun (QPid, MsgIds) -> - rabbit_amqqueue:reject(QPid, Requeue, MsgIds, self()) + foreach_per_consumer( + fun ({QPid, CTag}, MsgIds) -> + rabbit_amqqueue:reject(QPid, Requeue, MsgIds, CTag, self()) end, Acked), ok = notify_limiter(Limiter, Acked). @@ -1373,9 +1375,9 @@ collect_acks(ToAcc, PrefixAcc, Q, DeliveryTag, Multiple) -> %% NB: Acked is in youngest-first order ack(Acked, State = #ch{queue_names = QNames}) -> - foreach_per_queue( - fun (QPid, MsgIds) -> - ok = rabbit_amqqueue:ack(QPid, MsgIds, self()), + foreach_per_consumer( + fun ({QPid, CTag}, MsgIds) -> + ok = rabbit_amqqueue:ack(QPid, MsgIds, CTag, self()), ?INCR_STATS(case dict:find(QPid, QNames) of {ok, QName} -> Count = length(MsgIds), [{queue_stats, QName, Count}]; @@ -1406,15 +1408,15 @@ notify_queues(State = #ch{consumer_mapping = Consumers, sets:union(sets:from_list(consumer_queues(Consumers)), DQ)), {rabbit_amqqueue:notify_down_all(QPids, self()), State#ch{state = closing}}. -foreach_per_queue(_F, []) -> +foreach_per_consumer(_F, []) -> ok; -foreach_per_queue(F, [{_DTag, _CTag, {QPid, MsgId}}]) -> %% common case - F(QPid, [MsgId]); +foreach_per_consumer(F, [{_DTag, CTag, {QPid, MsgId}}]) -> %% common case + F({QPid, CTag}, [MsgId]); %% NB: UAL should be in youngest-first order; the tree values will %% then be in oldest-first order -foreach_per_queue(F, UAL) -> - T = lists:foldl(fun ({_DTag, _CTag, {QPid, MsgId}}, T) -> - rabbit_misc:gb_trees_cons(QPid, MsgId, T) +foreach_per_consumer(F, UAL) -> + T = lists:foldl(fun ({_DTag, CTag, {QPid, MsgId}}, T) -> + rabbit_misc:gb_trees_cons({QPid, CTag}, MsgId, T) end, gb_trees:empty(), UAL), rabbit_misc:gb_trees_foreach(F, T). diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index d37b356cc0..5776fc3f7a 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -17,7 +17,8 @@ %% The purpose of the limiter is to stem the flow of messages from %% queues to channels, in order to act upon various protocol-level %% flow control mechanisms, specifically AMQP 0-9-1's basic.qos -%% prefetch_count and AMQP 1.0's link (aka consumer) credit mechanism. +%% prefetch_count, our consumer prefetch extension, and AMQP 1.0's +%% link (aka consumer) credit mechanism. %% %% Each channel has an associated limiter process, created with %% start_link/1, which it passes to queues on consumer creation with @@ -54,11 +55,15 @@ %% inactive. In practice it is rare for that to happen, though we %% could optimise this case in the future. %% -%% In addition, the consumer credit bookkeeping is local to queues, so -%% it is not necessary to store information about it in the limiter -%% process. But for abstraction we hide it from the queue behind the -%% limiter API, and it therefore becomes part of the queue local -%% state. +%% Consumer credit (for AMQP 1.0) and per-consumer prefetch (for AMQP +%% 0-9-1) are treated as essentially the same thing, but with the +%% exception that per-consumer prefetch gets an auto-topup when +%% acknowledgments come in. +%% +%% The bookkeeping for this is local to queues, so it is not necessary +%% to store information about it in the limiter process. But for +%% abstraction we hide it from the queue behind the limiter API, and +%% it therefore becomes part of the queue local state. %% %% The interactions with the limiter are as follows: %% @@ -66,7 +71,8 @@ %% that's what the limit_prefetch/3, unlimit_prefetch/1, %% get_prefetch_limit/1 API functions are about. They also tell the %% limiter queue state (via the queue) about consumer credit -%% changes - that's what credit/5 is for. +%% changes and message acknowledgement - that's what credit/5 and +%% ack_from_queue/3 are for. %% %% 2. Queues also tell the limiter queue state about the queue %% becoming empty (via drained/1) and consumers leaving (via @@ -123,8 +129,8 @@ get_prefetch_limit/1, ack/2, pid/1]). %% queue API -export([client/1, activate/1, can_send/3, resume/1, deactivate/1, - is_suspended/1, is_consumer_blocked/2, credit/5, drained/1, - forget_consumer/2]). + is_suspended/1, is_consumer_blocked/2, credit/5, ack_from_queue/3, + drained/1, forget_consumer/2]). %% callbacks -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2, prioritise_call/4]). @@ -141,6 +147,8 @@ -type(qstate() :: #qstate{pid :: pid(), state :: 'dormant' | 'active' | 'suspended'}). +-type(credit_mode() :: 'manual' | 'drain' | 'auto'). + -spec(start_link/1 :: (rabbit_types:proc_name()) -> rabbit_types:ok_pid_or_error()). -spec(new/1 :: (pid()) -> lstate()). @@ -161,8 +169,10 @@ -spec(deactivate/1 :: (qstate()) -> qstate()). -spec(is_suspended/1 :: (qstate()) -> boolean()). -spec(is_consumer_blocked/2 :: (qstate(), rabbit_types:ctag()) -> boolean()). --spec(credit/5 :: (qstate(), rabbit_types:ctag(), non_neg_integer(), boolean(), - boolean()) -> {boolean(), qstate()}). +-spec(credit/5 :: (qstate(), rabbit_types:ctag(), non_neg_integer(), + credit_mode(), boolean()) -> {boolean(), qstate()}). +-spec(ack_from_queue/3 :: (qstate(), rabbit_types:ctag(), non_neg_integer()) + -> {boolean(), qstate()}). -spec(drained/1 :: (qstate()) -> {[{rabbit_types:ctag(), non_neg_integer()}], qstate()}). -spec(forget_consumer/2 :: (qstate(), rabbit_types:ctag()) -> qstate()). @@ -179,7 +189,7 @@ %% notified of a change in the limit or volume that may allow it to %% deliver more messages via the limiter's channel. --record(credit, {credit = 0, drain = false}). +-record(credit, {credit = 0, mode}). %%---------------------------------------------------------------------------- %% API @@ -256,19 +266,32 @@ is_consumer_blocked(#qstate{credits = Credits}, CTag) -> {value, #credit{}} -> true end. -credit(Limiter = #qstate{credits = Credits}, CTag, Credit, Drain, IsEmpty) -> - {Res, Cr} = case IsEmpty andalso Drain of - true -> {true, make_credit(0, false)}; - false -> {false, make_credit(Credit, Drain)} - end, - {Res, Limiter#qstate{credits = gb_trees:enter(CTag, Cr, Credits)}}. +credit(Limiter = #qstate{credits = Credits}, CTag, Crd, Mode, IsEmpty) -> + {Res, Cr} = + case IsEmpty andalso Mode =:= drain of + true -> {true, #credit{credit = 0, mode = manual}}; + false -> {false, #credit{credit = Crd, mode = Mode}} + end, + {Res, Limiter#qstate{credits = enter_credit(CTag, Cr, Credits)}}. + +ack_from_queue(Limiter = #qstate{credits = Credits}, CTag, Credit) -> + {Credits1, Unblocked} = + case gb_trees:lookup(CTag, Credits) of + {value, C = #credit{mode = auto, credit = C0}} -> + {update_credit(CTag, C#credit{credit = C0 + Credit}, Credits), + C0 =:= 0 andalso Credit =/= 0}; + _ -> + {Credits, false} + end, + {Unblocked, Limiter#qstate{credits = Credits1}}. drained(Limiter = #qstate{credits = Credits}) -> + Drain = fun(C) -> C#credit{credit = 0, mode = manual} end, {CTagCredits, Credits2} = rabbit_misc:gb_trees_fold( - fun (CTag, #credit{credit = C, drain = true}, {Acc, Creds0}) -> - {[{CTag, C} | Acc], update_credit(CTag, 0, false, Creds0)}; - (_CTag, #credit{credit = _C, drain = false}, {Acc, Creds0}) -> + fun (CTag, C = #credit{credit = Crd, mode = drain}, {Acc, Creds0}) -> + {[{CTag, Crd} | Acc], update_credit(CTag, Drain(C), Creds0)}; + (_CTag, #credit{credit = _Crd, mode = _Mode}, {Acc, Creds0}) -> {Acc, Creds0} end, {[], Credits}, Credits), {CTagCredits, Limiter#qstate{credits = Credits2}}. @@ -287,20 +310,25 @@ forget_consumer(Limiter = #qstate{credits = Credits}, CTag) -> %% state for us (#qstate.credits), and maintain a fiction that the %% limiter is making the decisions... -make_credit(Credit, Drain) -> - %% Using up all credit implies no need to send a 'drained' event - #credit{credit = Credit, drain = Drain andalso Credit > 0}. - decrement_credit(CTag, Credits) -> case gb_trees:lookup(CTag, Credits) of - {value, #credit{credit = Credit, drain = Drain}} -> - update_credit(CTag, Credit - 1, Drain, Credits); + {value, C = #credit{credit = Credit}} -> + update_credit(CTag, C#credit{credit = Credit - 1}, Credits); none -> Credits end. -update_credit(CTag, Credit, Drain, Credits) -> - gb_trees:update(CTag, make_credit(Credit, Drain), Credits). +enter_credit(CTag, C, Credits) -> + gb_trees:enter(CTag, ensure_credit_invariant(C), Credits). + +update_credit(CTag, C, Credits) -> + gb_trees:update(CTag, ensure_credit_invariant(C), Credits). + +ensure_credit_invariant(C = #credit{credit = 0, mode = drain}) -> + %% Using up all credit implies no need to send a 'drained' event + C#credit{mode = manual}; +ensure_credit_invariant(C) -> + C. %%---------------------------------------------------------------------------- %% gen_server callbacks diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl index c9540da818..017a4f9c1a 100644 --- a/src/rabbit_queue_consumers.erl +++ b/src/rabbit_queue_consumers.erl @@ -18,7 +18,7 @@ -export([new/0, max_active_priority/1, inactive/1, all/1, count/0, unacknowledged_message_count/0, add/8, remove/3, erase_ch/2, - send_drained/0, deliver/3, record_ack/3, subtract_acks/2, + send_drained/0, deliver/3, record_ack/3, subtract_acks/4, possibly_unblock/3, resume_fun/0, notify_sent_fun/1, activate_limit_fun/0, credit/6, utilisation/1]). @@ -82,7 +82,8 @@ {'delivered', boolean(), T, state()} | {'undelivered', boolean(), state()}. -spec record_ack(ch(), pid(), ack()) -> 'ok'. --spec subtract_acks(ch(), [ack()]) -> 'not_found' | 'ok'. +-spec subtract_acks([ack()], rabbit_types:ctag(), ch(), state()) -> + 'not_found' | 'unchanged' | {'unblocked', state()}. -spec possibly_unblock(cr_fun(), ch(), state()) -> 'unchanged' | {'unblocked', state()}. -spec resume_fun() -> cr_fun(). @@ -130,11 +131,14 @@ add(ChPid, CTag, NoAck, LimiterPid, LimiterActive, Args, IsEmpty, false -> Limiter end, C1 = C#cr{consumer_count = Count + 1, limiter = Limiter1}, - update_ch_record(case parse_credit_args(Args) of - none -> C1; - {Crd, Drain} -> credit_and_drain( - C1, CTag, Crd, Drain, IsEmpty) - end), + update_ch_record( + case parse_credit_args(Args) of + none -> C1; + {0, auto} -> C1; + {_Credit, auto} when NoAck -> C1; + {Credit, Mode} -> credit_and_drain( + C1, CTag, Credit, Mode, IsEmpty) + end), Consumer = #consumer{tag = CTag, ack_required = not NoAck, args = Args}, @@ -238,14 +242,20 @@ record_ack(ChPid, LimiterPid, AckTag) -> update_ch_record(C#cr{acktags = queue:in(AckTag, ChAckTags)}), ok. -subtract_acks(ChPid, AckTags) -> +subtract_acks(AckTags, CTag, ChPid, State) -> case lookup_ch(ChPid) of not_found -> not_found; - C = #cr{acktags = ChAckTags} -> - update_ch_record( - C#cr{acktags = subtract_acks(AckTags, [], ChAckTags)}), - ok + C = #cr{acktags = ChAckTags, limiter = Lim} -> + {Unblocked, Lim2} = + rabbit_limiter:ack_from_queue(Lim, CTag, length(AckTags)), + AckTags2 = subtract_acks(AckTags, [], ChAckTags), + C2 = C#cr{acktags = AckTags2, limiter = Lim2}, + case Unblocked of + true -> unblock(C2, State); + false -> update_ch_record(C2), + unchanged + end end. subtract_acks([], [], AckQ) -> @@ -308,7 +318,7 @@ credit(IsEmpty, Credit, Drain, ChPid, CTag, State) -> unchanged; #cr{limiter = Limiter} = C -> C1 = #cr{limiter = Limiter1} = - credit_and_drain(C, CTag, Credit, Drain, IsEmpty), + credit_and_drain(C, CTag, Credit, drain_mode(Drain), IsEmpty), case is_ch_blocked(C1) orelse (not rabbit_limiter:is_consumer_blocked(Limiter, CTag)) orelse rabbit_limiter:is_consumer_blocked(Limiter1, CTag) of @@ -318,6 +328,9 @@ credit(IsEmpty, Credit, Drain, ChPid, CTag, State) -> end end. +drain_mode(true) -> drain; +drain_mode(false) -> manual. + utilisation(#state{use = {active, Since, Avg}}) -> use_avg(now_micros() - Since, 0, Avg); utilisation(#state{use = {inactive, Since, Active, Avg}}) -> @@ -329,10 +342,13 @@ parse_credit_args(Args) -> case rabbit_misc:table_lookup(Args, <<"x-credit">>) of {table, T} -> case {rabbit_misc:table_lookup(T, <<"credit">>), rabbit_misc:table_lookup(T, <<"drain">>)} of - {{long, Credit}, {bool, Drain}} -> {Credit, Drain}; - _ -> none + {{long, C}, {bool, D}} -> {C, drain_mode(D)}; + _ -> none end; - undefined -> none + undefined -> case rabbit_misc:table_lookup(Args, <<"x-prefetch">>) of + {_, Prefetch} -> {Prefetch, auto}; + _ -> none + end end. lookup_ch(ChPid) -> @@ -393,8 +409,8 @@ send_drained(C = #cr{ch_pid = ChPid, limiter = Limiter}) -> end. credit_and_drain(C = #cr{ch_pid = ChPid, limiter = Limiter}, - CTag, Credit, Drain, IsEmpty) -> - case rabbit_limiter:credit(Limiter, CTag, Credit, Drain, IsEmpty) of + CTag, Credit, Mode, IsEmpty) -> + case rabbit_limiter:credit(Limiter, CTag, Credit, Mode, IsEmpty) of {true, Limiter1} -> rabbit_channel:send_drained(ChPid, [{CTag, Credit}]), C#cr{limiter = Limiter1}; diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 122eb30545..f7c4c0a22a 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -176,6 +176,7 @@ server_capabilities(rabbit_framing_amqp_0_9_1) -> {<<"consumer_cancel_notify">>, bool, true}, {<<"connection.blocked">>, bool, true}, {<<"consumer_priorities">>, bool, true}, + {<<"consumer_prefetch">>, bool, true}, {<<"authentication_failure_close">>, bool, true}]; server_capabilities(_) -> []. |
