diff options
| author | Simon MacMullen <simon@rabbitmq.com> | 2014-01-09 10:35:57 +0000 |
|---|---|---|
| committer | Simon MacMullen <simon@rabbitmq.com> | 2014-01-09 10:35:57 +0000 |
| commit | 7aa7642d3231e14341f759f64e578075085763b4 (patch) | |
| tree | e1a37eadf8f481c7b500a0c8887e6972d5b86185 | |
| parent | 3c6b988542c3e1abebf4128eb528a43e3955bc7c (diff) | |
| parent | 2087c70d119e8a90faabf1d61438215679d77a6d (diff) | |
| download | rabbitmq-server-git-7aa7642d3231e14341f759f64e578075085763b4.tar.gz | |
Merge in default.
| -rw-r--r-- | src/rabbit_amqqueue.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 18 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 20 | ||||
| -rw-r--r-- | src/rabbit_limiter.erl | 25 | ||||
| -rw-r--r-- | src/rabbit_queue_consumers.erl | 32 |
5 files changed, 63 insertions, 38 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 6b1e00b7c9..cb2eb635b8 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -22,7 +22,7 @@ -export([lookup/1, not_found_or_absent/1, with/2, with/3, with_or_die/2, assert_equivalence/5, check_exclusive_access/2, with_exclusive_access_or_die/3, - stat/1, deliver/2, deliver_flow/2, requeue/3, ack/3, reject/4]). + stat/1, deliver/2, deliver_flow/2, requeue/3, ack/4, reject/4]). -export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]). -export([force_event_refresh/0, notify_policy_changed/1]). -export([consumers/1, consumers_all/1, consumer_info_keys/0]). @@ -143,7 +143,7 @@ -spec(deliver_flow/2 :: ([rabbit_types:amqqueue()], rabbit_types:delivery()) -> {routing_result(), qpids()}). -spec(requeue/3 :: (pid(), [msg_id()], pid()) -> 'ok'). --spec(ack/3 :: (pid(), [msg_id()], pid()) -> 'ok'). +%%-spec(ack/3 :: (pid(), [msg_id()], pid()) -> 'ok'). -spec(reject/4 :: (pid(), [msg_id()], boolean(), pid()) -> 'ok'). -spec(notify_down_all/2 :: (qpids(), pid()) -> ok_or_errors()). -spec(activate_limit_all/2 :: (qpids(), pid()) -> ok_or_errors()). @@ -548,7 +548,7 @@ deliver_flow(Qs, Delivery) -> deliver(Qs, Delivery, flow). requeue(QPid, MsgIds, ChPid) -> delegate:call(QPid, {requeue, MsgIds, ChPid}). -ack(QPid, MsgIds, ChPid) -> delegate:cast(QPid, {ack, MsgIds, ChPid}). +ack(QPid, CTag, MsgIds, ChPid) -> delegate:cast(QPid, {ack, CTag, MsgIds, ChPid}). reject(QPid, MsgIds, Requeue, ChPid) -> delegate:cast(QPid, {reject, MsgIds, Requeue, ChPid}). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 6391ebe6ab..2bd0523da1 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -561,8 +561,8 @@ fetch(AckRequired, State = #q{backing_queue = BQ, State1 = drop_expired_msgs(State#q{backing_queue_state = BQS1}), {Result, maybe_send_drained(Result =:= empty, State1)}. -ack(AckTags, ChPid, State) -> - subtract_acks(ChPid, AckTags, State, +ack(AckTags, CTag, ChPid, State) -> + subtract_acks(ChPid, CTag, AckTags, State, fun (State1 = #q{backing_queue = BQ, backing_queue_state = BQS}) -> {_Guids, BQS1} = BQ:ack(AckTags, BQS), @@ -570,7 +570,7 @@ ack(AckTags, ChPid, State) -> end). requeue(AckTags, ChPid, State) -> - subtract_acks(ChPid, AckTags, State, + subtract_acks(ChPid, fixme, AckTags, State, fun (State1) -> requeue_and_run(AckTags, State1) end). possibly_unblock(Update, ChPid, State = #q{consumers = Consumers}) -> @@ -634,8 +634,8 @@ backing_queue_timeout(State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> State#q{backing_queue_state = BQ:timeout(BQS)}. -subtract_acks(ChPid, AckTags, State, Fun) -> - case rabbit_queue_consumers:subtract_acks(ChPid, AckTags) of +subtract_acks(ChPid, CTag, AckTags, State, Fun) -> + case rabbit_queue_consumers:subtract_acks(ChPid, CTag, AckTags) of not_found -> State; ok -> Fun(State) end. @@ -1150,8 +1150,8 @@ handle_cast({deliver, Delivery = #delivery{sender = Sender}, Delivered, Flow}, State1 = State#q{senders = Senders1}, noreply(deliver_or_enqueue(Delivery, Delivered, State1)); -handle_cast({ack, AckTags, ChPid}, State) -> - noreply(ack(AckTags, ChPid, State)); +handle_cast({ack, CTag, AckTags, ChPid}, State) -> + noreply(ack(AckTags, CTag, ChPid, State)); handle_cast({reject, AckTags, true, ChPid}, State) -> noreply(requeue(AckTags, ChPid, State)); @@ -1159,12 +1159,12 @@ handle_cast({reject, AckTags, true, ChPid}, State) -> handle_cast({reject, AckTags, false, ChPid}, State) -> noreply(with_dlx( State#q.dlx, - fun (X) -> subtract_acks(ChPid, AckTags, State, + fun (X) -> subtract_acks(ChPid, fixme, AckTags, State, fun (State1) -> dead_letter_rejected_msgs( AckTags, X, State1) end) end, - fun () -> ack(AckTags, ChPid, State) end)); + fun () -> ack(AckTags, fixme, ChPid, State) end)); handle_cast(delete_immediately, State) -> stop(State); diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index eded8a9001..7de9974396 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -1264,9 +1264,11 @@ handle_delivering_queue_down(QPid, State = #ch{delivering_queues = DQ}) -> parse_credit_args(Arguments) -> case rabbit_misc:table_lookup(Arguments, <<"x-credit">>) of {table, T} -> {case {rabbit_misc:table_lookup(T, <<"credit">>), - rabbit_misc:table_lookup(T, <<"drain">>)} of - {{long, Credit}, {bool, Drain}} -> {Credit, Drain}; - _ -> none + rabbit_misc:table_lookup(T, <<"drain">>), + rabbit_misc:table_lookup(T, <<"prefetch">>)} of + {{long, C}, {bool, D}, _} -> {credit, C, D}; + {_, _, {long, P}} -> {prefetch, P}; + _ -> none end, lists:keydelete(<<"x-credit">>, 1, Arguments)}; undefined -> {none, Arguments} end. @@ -1395,8 +1397,8 @@ collect_acks(ToAcc, PrefixAcc, Q, DeliveryTag, Multiple) -> %% NB: Acked is in youngest-first order ack(Acked, State = #ch{queue_names = QNames}) -> foreach_per_queue( - fun (QPid, MsgIds) -> - ok = rabbit_amqqueue:ack(QPid, MsgIds, self()), + fun ({QPid, CTag}, MsgIds) -> + ok = rabbit_amqqueue:ack(QPid, CTag, MsgIds, self()), ?INCR_STATS(case dict:find(QPid, QNames) of {ok, QName} -> Count = length(MsgIds), [{queue_stats, QName, Count}]; @@ -1429,13 +1431,13 @@ notify_queues(State = #ch{consumer_mapping = Consumers, foreach_per_queue(_F, []) -> ok; -foreach_per_queue(F, [{_DTag, _CTag, {QPid, MsgId}}]) -> %% common case - F(QPid, [MsgId]); +foreach_per_queue(F, [{_DTag, CTag, {QPid, MsgId}}]) -> %% common case + F({QPid, CTag}, [MsgId]); %% NB: UAL should be in youngest-first order; the tree values will %% then be in oldest-first order foreach_per_queue(F, UAL) -> - T = lists:foldl(fun ({_DTag, _CTag, {QPid, MsgId}}, T) -> - rabbit_misc:gb_trees_cons(QPid, MsgId, T) + T = lists:foldl(fun ({_DTag, CTag, {QPid, MsgId}}, T) -> + rabbit_misc:gb_trees_cons({QPid, CTag}, MsgId, T) end, gb_trees:empty(), UAL), rabbit_misc:gb_trees_foreach(F, T). diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index 2857ca55b9..afe63b91a6 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -126,8 +126,9 @@ get_prefetch_limit/1, ack/2, pid/1]). %% queue API -export([client/1, activate/1, can_send/3, resume/1, deactivate/1, - is_suspended/1, is_consumer_blocked/2, credit/5, drained/1, - forget_consumer/2]). + is_suspended/1, is_consumer_blocked/2, credit/5, + set_consumer_prefetch/3, ack_from_queue/3, + drained/1, forget_consumer/2]). %% callbacks -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2, prioritise_call/4]). @@ -170,6 +171,8 @@ -spec(is_consumer_blocked/2 :: (qstate(), rabbit_types:ctag()) -> boolean()). -spec(credit/5 :: (qstate(), rabbit_types:ctag(), non_neg_integer(), boolean(), boolean()) -> qstate()). +-spec(ack_from_queue/3 :: (qstate(), rabbit_types:ctag(), non_neg_integer()) + -> qstate()). -spec(drained/1 :: (qstate()) -> {[{rabbit_types:ctag(), non_neg_integer()}], qstate()}). -spec(forget_consumer/2 :: (qstate(), rabbit_types:ctag()) -> qstate()). @@ -187,7 +190,7 @@ %% notified of a change in the limit or volume that may allow it to %% deliver more messages via the limiter's channel. --record(credit, {credit = 0, drain = false}). +-record(credit, {credit = 0, drain = false, mode}). %%---------------------------------------------------------------------------- %% API @@ -281,6 +284,22 @@ credit(Limiter = #qstate{credits = Credits}, CTag, _Credit, true, true) -> credit(Limiter = #qstate{credits = Credits}, CTag, Credit, false, Drain) -> Limiter#qstate{credits = update_credit(CTag, Credit, Drain, Credits)}. +set_consumer_prefetch(Limiter = #qstate{credits = Credits}, CTag, Credit) -> + Credits1 = gb_trees:enter( + CTag, #credit{credit = Credit, mode = auto}, Credits), + Limiter#qstate{credits = Credits1}. + +ack_from_queue(Limiter = #qstate{credits = Credits}, CTag, Credit) -> + Limiter#qstate{ + credits = case gb_trees:lookup(CTag, Credits) of + {value, #credit{mode = auto, + credit = Current, + drain = Drain}} -> + update_credit(CTag, Current + Credit, Drain, Credits); + none -> + Credits + end}. + drained(Limiter = #qstate{credits = Credits}) -> {CTagCredits, Credits2} = rabbit_misc:gb_trees_fold( diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl index f06423f7ce..11c26f92cc 100644 --- a/src/rabbit_queue_consumers.erl +++ b/src/rabbit_queue_consumers.erl @@ -18,7 +18,7 @@ -export([new/0, max_active_priority/1, inactive/1, all/1, count/0, unacknowledged_message_count/0, add/9, remove/3, erase_ch/2, - send_drained/0, deliver/3, record_ack/3, subtract_acks/2, + send_drained/0, deliver/3, record_ack/3, subtract_acks/3, possibly_unblock/3, resume_fun/0, notify_sent_fun/1, activate_limit_fun/0, credit_fun/4, utilisation/1]). @@ -81,7 +81,7 @@ {'delivered', boolean(), T, state()} | {'undelivered', boolean(), state()}. -spec record_ack(ch(), pid(), ack()) -> 'ok'. --spec subtract_acks(ch(), [ack()]) -> 'not_found' | 'ok'. +%% -spec subtract_acks(ch(), [ack()]) -> 'not_found' | 'ok'. -spec possibly_unblock(cr_fun(), ch(), state()) -> 'unchanged' | {'unblocked', state()}. -spec resume_fun() -> cr_fun(). @@ -129,9 +129,11 @@ add(ChPid, ConsumerTag, NoAck, LimiterPid, LimiterActive, CreditArgs, OtherArgs, false -> Limiter end, Limiter2 = case CreditArgs of - none -> Limiter1; - {Crd, Drain} -> rabbit_limiter:credit( - Limiter1, ConsumerTag, Crd, IsEmpty, Drain) + none -> Limiter1; + {credit, C, D} -> rabbit_limiter:credit( + Limiter1, ConsumerTag, C, IsEmpty, D); + {prefetch , P} -> rabbit_limiter:set_consumer_prefetch( + Limiter1, ConsumerTag, P) end, C1 = C#cr{consumer_count = Count + 1, limiter = Limiter2}, @@ -242,24 +244,26 @@ record_ack(ChPid, LimiterPid, AckTag) -> update_ch_record(C#cr{acktags = queue:in(AckTag, ChAckTags)}), ok. -subtract_acks(ChPid, AckTags) -> +subtract_acks(ChPid, CTag, AckTags) -> case lookup_ch(ChPid) of not_found -> not_found; - C = #cr{acktags = ChAckTags} -> - update_ch_record( - C#cr{acktags = subtract_acks(AckTags, [], ChAckTags)}), + C = #cr{acktags = ChAckTags, limiter = Lim} -> + Lim2 = rabbit_limiter:ack_from_queue(Lim, CTag, length(AckTags)), + AckTags2 = subtract_acks0(AckTags, [], ChAckTags), + update_ch_record(C#cr{acktags = AckTags2, + limiter = Lim2}), ok end. -subtract_acks([], [], AckQ) -> +subtract_acks0([], [], AckQ) -> AckQ; -subtract_acks([], Prefix, AckQ) -> +subtract_acks0([], Prefix, AckQ) -> queue:join(queue:from_list(lists:reverse(Prefix)), AckQ); -subtract_acks([T | TL] = AckTags, Prefix, AckQ) -> +subtract_acks0([T | TL] = AckTags, Prefix, AckQ) -> case queue:out(AckQ) of - {{value, T}, QTail} -> subtract_acks(TL, Prefix, QTail); - {{value, AT}, QTail} -> subtract_acks(AckTags, [AT | Prefix], QTail) + {{value, T}, QTail} -> subtract_acks0(TL, Prefix, QTail); + {{value, AT}, QTail} -> subtract_acks0(AckTags, [AT | Prefix], QTail) end. possibly_unblock(Update, ChPid, State) -> |
