summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2014-01-09 10:35:57 +0000
committerSimon MacMullen <simon@rabbitmq.com>2014-01-09 10:35:57 +0000
commit7aa7642d3231e14341f759f64e578075085763b4 (patch)
treee1a37eadf8f481c7b500a0c8887e6972d5b86185
parent3c6b988542c3e1abebf4128eb528a43e3955bc7c (diff)
parent2087c70d119e8a90faabf1d61438215679d77a6d (diff)
downloadrabbitmq-server-git-7aa7642d3231e14341f759f64e578075085763b4.tar.gz
Merge in default.
-rw-r--r--src/rabbit_amqqueue.erl6
-rw-r--r--src/rabbit_amqqueue_process.erl18
-rw-r--r--src/rabbit_channel.erl20
-rw-r--r--src/rabbit_limiter.erl25
-rw-r--r--src/rabbit_queue_consumers.erl32
5 files changed, 63 insertions, 38 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 6b1e00b7c9..cb2eb635b8 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -22,7 +22,7 @@
-export([lookup/1, not_found_or_absent/1, with/2, with/3, with_or_die/2,
assert_equivalence/5,
check_exclusive_access/2, with_exclusive_access_or_die/3,
- stat/1, deliver/2, deliver_flow/2, requeue/3, ack/3, reject/4]).
+ stat/1, deliver/2, deliver_flow/2, requeue/3, ack/4, reject/4]).
-export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]).
-export([force_event_refresh/0, notify_policy_changed/1]).
-export([consumers/1, consumers_all/1, consumer_info_keys/0]).
@@ -143,7 +143,7 @@
-spec(deliver_flow/2 :: ([rabbit_types:amqqueue()], rabbit_types:delivery()) ->
{routing_result(), qpids()}).
-spec(requeue/3 :: (pid(), [msg_id()], pid()) -> 'ok').
--spec(ack/3 :: (pid(), [msg_id()], pid()) -> 'ok').
+%%-spec(ack/3 :: (pid(), [msg_id()], pid()) -> 'ok').
-spec(reject/4 :: (pid(), [msg_id()], boolean(), pid()) -> 'ok').
-spec(notify_down_all/2 :: (qpids(), pid()) -> ok_or_errors()).
-spec(activate_limit_all/2 :: (qpids(), pid()) -> ok_or_errors()).
@@ -548,7 +548,7 @@ deliver_flow(Qs, Delivery) -> deliver(Qs, Delivery, flow).
requeue(QPid, MsgIds, ChPid) -> delegate:call(QPid, {requeue, MsgIds, ChPid}).
-ack(QPid, MsgIds, ChPid) -> delegate:cast(QPid, {ack, MsgIds, ChPid}).
+ack(QPid, CTag, MsgIds, ChPid) -> delegate:cast(QPid, {ack, CTag, MsgIds, ChPid}).
reject(QPid, MsgIds, Requeue, ChPid) ->
delegate:cast(QPid, {reject, MsgIds, Requeue, ChPid}).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 6391ebe6ab..2bd0523da1 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -561,8 +561,8 @@ fetch(AckRequired, State = #q{backing_queue = BQ,
State1 = drop_expired_msgs(State#q{backing_queue_state = BQS1}),
{Result, maybe_send_drained(Result =:= empty, State1)}.
-ack(AckTags, ChPid, State) ->
- subtract_acks(ChPid, AckTags, State,
+ack(AckTags, CTag, ChPid, State) ->
+ subtract_acks(ChPid, CTag, AckTags, State,
fun (State1 = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
{_Guids, BQS1} = BQ:ack(AckTags, BQS),
@@ -570,7 +570,7 @@ ack(AckTags, ChPid, State) ->
end).
requeue(AckTags, ChPid, State) ->
- subtract_acks(ChPid, AckTags, State,
+ subtract_acks(ChPid, fixme, AckTags, State,
fun (State1) -> requeue_and_run(AckTags, State1) end).
possibly_unblock(Update, ChPid, State = #q{consumers = Consumers}) ->
@@ -634,8 +634,8 @@ backing_queue_timeout(State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
State#q{backing_queue_state = BQ:timeout(BQS)}.
-subtract_acks(ChPid, AckTags, State, Fun) ->
- case rabbit_queue_consumers:subtract_acks(ChPid, AckTags) of
+subtract_acks(ChPid, CTag, AckTags, State, Fun) ->
+ case rabbit_queue_consumers:subtract_acks(ChPid, CTag, AckTags) of
not_found -> State;
ok -> Fun(State)
end.
@@ -1150,8 +1150,8 @@ handle_cast({deliver, Delivery = #delivery{sender = Sender}, Delivered, Flow},
State1 = State#q{senders = Senders1},
noreply(deliver_or_enqueue(Delivery, Delivered, State1));
-handle_cast({ack, AckTags, ChPid}, State) ->
- noreply(ack(AckTags, ChPid, State));
+handle_cast({ack, CTag, AckTags, ChPid}, State) ->
+ noreply(ack(AckTags, CTag, ChPid, State));
handle_cast({reject, AckTags, true, ChPid}, State) ->
noreply(requeue(AckTags, ChPid, State));
@@ -1159,12 +1159,12 @@ handle_cast({reject, AckTags, true, ChPid}, State) ->
handle_cast({reject, AckTags, false, ChPid}, State) ->
noreply(with_dlx(
State#q.dlx,
- fun (X) -> subtract_acks(ChPid, AckTags, State,
+ fun (X) -> subtract_acks(ChPid, fixme, AckTags, State,
fun (State1) ->
dead_letter_rejected_msgs(
AckTags, X, State1)
end) end,
- fun () -> ack(AckTags, ChPid, State) end));
+ fun () -> ack(AckTags, fixme, ChPid, State) end));
handle_cast(delete_immediately, State) ->
stop(State);
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index eded8a9001..7de9974396 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -1264,9 +1264,11 @@ handle_delivering_queue_down(QPid, State = #ch{delivering_queues = DQ}) ->
parse_credit_args(Arguments) ->
case rabbit_misc:table_lookup(Arguments, <<"x-credit">>) of
{table, T} -> {case {rabbit_misc:table_lookup(T, <<"credit">>),
- rabbit_misc:table_lookup(T, <<"drain">>)} of
- {{long, Credit}, {bool, Drain}} -> {Credit, Drain};
- _ -> none
+ rabbit_misc:table_lookup(T, <<"drain">>),
+ rabbit_misc:table_lookup(T, <<"prefetch">>)} of
+ {{long, C}, {bool, D}, _} -> {credit, C, D};
+ {_, _, {long, P}} -> {prefetch, P};
+ _ -> none
end, lists:keydelete(<<"x-credit">>, 1, Arguments)};
undefined -> {none, Arguments}
end.
@@ -1395,8 +1397,8 @@ collect_acks(ToAcc, PrefixAcc, Q, DeliveryTag, Multiple) ->
%% NB: Acked is in youngest-first order
ack(Acked, State = #ch{queue_names = QNames}) ->
foreach_per_queue(
- fun (QPid, MsgIds) ->
- ok = rabbit_amqqueue:ack(QPid, MsgIds, self()),
+ fun ({QPid, CTag}, MsgIds) ->
+ ok = rabbit_amqqueue:ack(QPid, CTag, MsgIds, self()),
?INCR_STATS(case dict:find(QPid, QNames) of
{ok, QName} -> Count = length(MsgIds),
[{queue_stats, QName, Count}];
@@ -1429,13 +1431,13 @@ notify_queues(State = #ch{consumer_mapping = Consumers,
foreach_per_queue(_F, []) ->
ok;
-foreach_per_queue(F, [{_DTag, _CTag, {QPid, MsgId}}]) -> %% common case
- F(QPid, [MsgId]);
+foreach_per_queue(F, [{_DTag, CTag, {QPid, MsgId}}]) -> %% common case
+ F({QPid, CTag}, [MsgId]);
%% NB: UAL should be in youngest-first order; the tree values will
%% then be in oldest-first order
foreach_per_queue(F, UAL) ->
- T = lists:foldl(fun ({_DTag, _CTag, {QPid, MsgId}}, T) ->
- rabbit_misc:gb_trees_cons(QPid, MsgId, T)
+ T = lists:foldl(fun ({_DTag, CTag, {QPid, MsgId}}, T) ->
+ rabbit_misc:gb_trees_cons({QPid, CTag}, MsgId, T)
end, gb_trees:empty(), UAL),
rabbit_misc:gb_trees_foreach(F, T).
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index 2857ca55b9..afe63b91a6 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -126,8 +126,9 @@
get_prefetch_limit/1, ack/2, pid/1]).
%% queue API
-export([client/1, activate/1, can_send/3, resume/1, deactivate/1,
- is_suspended/1, is_consumer_blocked/2, credit/5, drained/1,
- forget_consumer/2]).
+ is_suspended/1, is_consumer_blocked/2, credit/5,
+ set_consumer_prefetch/3, ack_from_queue/3,
+ drained/1, forget_consumer/2]).
%% callbacks
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
handle_info/2, prioritise_call/4]).
@@ -170,6 +171,8 @@
-spec(is_consumer_blocked/2 :: (qstate(), rabbit_types:ctag()) -> boolean()).
-spec(credit/5 :: (qstate(), rabbit_types:ctag(), non_neg_integer(), boolean(),
boolean()) -> qstate()).
+-spec(ack_from_queue/3 :: (qstate(), rabbit_types:ctag(), non_neg_integer())
+ -> qstate()).
-spec(drained/1 :: (qstate())
-> {[{rabbit_types:ctag(), non_neg_integer()}], qstate()}).
-spec(forget_consumer/2 :: (qstate(), rabbit_types:ctag()) -> qstate()).
@@ -187,7 +190,7 @@
%% notified of a change in the limit or volume that may allow it to
%% deliver more messages via the limiter's channel.
--record(credit, {credit = 0, drain = false}).
+-record(credit, {credit = 0, drain = false, mode}).
%%----------------------------------------------------------------------------
%% API
@@ -281,6 +284,22 @@ credit(Limiter = #qstate{credits = Credits}, CTag, _Credit, true, true) ->
credit(Limiter = #qstate{credits = Credits}, CTag, Credit, false, Drain) ->
Limiter#qstate{credits = update_credit(CTag, Credit, Drain, Credits)}.
+set_consumer_prefetch(Limiter = #qstate{credits = Credits}, CTag, Credit) ->
+ Credits1 = gb_trees:enter(
+ CTag, #credit{credit = Credit, mode = auto}, Credits),
+ Limiter#qstate{credits = Credits1}.
+
+ack_from_queue(Limiter = #qstate{credits = Credits}, CTag, Credit) ->
+ Limiter#qstate{
+ credits = case gb_trees:lookup(CTag, Credits) of
+ {value, #credit{mode = auto,
+ credit = Current,
+ drain = Drain}} ->
+ update_credit(CTag, Current + Credit, Drain, Credits);
+ none ->
+ Credits
+ end}.
+
drained(Limiter = #qstate{credits = Credits}) ->
{CTagCredits, Credits2} =
rabbit_misc:gb_trees_fold(
diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl
index f06423f7ce..11c26f92cc 100644
--- a/src/rabbit_queue_consumers.erl
+++ b/src/rabbit_queue_consumers.erl
@@ -18,7 +18,7 @@
-export([new/0, max_active_priority/1, inactive/1, all/1, count/0,
unacknowledged_message_count/0, add/9, remove/3, erase_ch/2,
- send_drained/0, deliver/3, record_ack/3, subtract_acks/2,
+ send_drained/0, deliver/3, record_ack/3, subtract_acks/3,
possibly_unblock/3,
resume_fun/0, notify_sent_fun/1, activate_limit_fun/0, credit_fun/4,
utilisation/1]).
@@ -81,7 +81,7 @@
{'delivered', boolean(), T, state()} |
{'undelivered', boolean(), state()}.
-spec record_ack(ch(), pid(), ack()) -> 'ok'.
--spec subtract_acks(ch(), [ack()]) -> 'not_found' | 'ok'.
+%% -spec subtract_acks(ch(), [ack()]) -> 'not_found' | 'ok'.
-spec possibly_unblock(cr_fun(), ch(), state()) ->
'unchanged' | {'unblocked', state()}.
-spec resume_fun() -> cr_fun().
@@ -129,9 +129,11 @@ add(ChPid, ConsumerTag, NoAck, LimiterPid, LimiterActive, CreditArgs, OtherArgs,
false -> Limiter
end,
Limiter2 = case CreditArgs of
- none -> Limiter1;
- {Crd, Drain} -> rabbit_limiter:credit(
- Limiter1, ConsumerTag, Crd, IsEmpty, Drain)
+ none -> Limiter1;
+ {credit, C, D} -> rabbit_limiter:credit(
+ Limiter1, ConsumerTag, C, IsEmpty, D);
+ {prefetch , P} -> rabbit_limiter:set_consumer_prefetch(
+ Limiter1, ConsumerTag, P)
end,
C1 = C#cr{consumer_count = Count + 1,
limiter = Limiter2},
@@ -242,24 +244,26 @@ record_ack(ChPid, LimiterPid, AckTag) ->
update_ch_record(C#cr{acktags = queue:in(AckTag, ChAckTags)}),
ok.
-subtract_acks(ChPid, AckTags) ->
+subtract_acks(ChPid, CTag, AckTags) ->
case lookup_ch(ChPid) of
not_found ->
not_found;
- C = #cr{acktags = ChAckTags} ->
- update_ch_record(
- C#cr{acktags = subtract_acks(AckTags, [], ChAckTags)}),
+ C = #cr{acktags = ChAckTags, limiter = Lim} ->
+ Lim2 = rabbit_limiter:ack_from_queue(Lim, CTag, length(AckTags)),
+ AckTags2 = subtract_acks0(AckTags, [], ChAckTags),
+ update_ch_record(C#cr{acktags = AckTags2,
+ limiter = Lim2}),
ok
end.
-subtract_acks([], [], AckQ) ->
+subtract_acks0([], [], AckQ) ->
AckQ;
-subtract_acks([], Prefix, AckQ) ->
+subtract_acks0([], Prefix, AckQ) ->
queue:join(queue:from_list(lists:reverse(Prefix)), AckQ);
-subtract_acks([T | TL] = AckTags, Prefix, AckQ) ->
+subtract_acks0([T | TL] = AckTags, Prefix, AckQ) ->
case queue:out(AckQ) of
- {{value, T}, QTail} -> subtract_acks(TL, Prefix, QTail);
- {{value, AT}, QTail} -> subtract_acks(AckTags, [AT | Prefix], QTail)
+ {{value, T}, QTail} -> subtract_acks0(TL, Prefix, QTail);
+ {{value, AT}, QTail} -> subtract_acks0(AckTags, [AT | Prefix], QTail)
end.
possibly_unblock(Update, ChPid, State) ->