summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBen Hood <0x6e6562@gmail.com>2008-11-18 19:40:02 +0000
committerBen Hood <0x6e6562@gmail.com>2008-11-18 19:40:02 +0000
commitb9e461fc6d762c99a7a87e1f0d95c446aad03405 (patch)
tree579cefdd0d5f63742a7c94eb73fc3b5feaaef1d2
parent3081245448bc6935438fde4620b5ce0f7a1d8c34 (diff)
downloadrabbitmq-server-git-b9e461fc6d762c99a7a87e1f0d95c446aad03405.tar.gz
First stab at basic.qos
-rw-r--r--src/rabbit_amqqueue.erl12
-rw-r--r--src/rabbit_amqqueue_process.erl82
-rw-r--r--src/rabbit_channel.erl12
3 files changed, 71 insertions, 35 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 56d2c35d94..938182da97 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -30,7 +30,7 @@
-export([lookup/1, with/2, with_or_die/2, list_vhost_queues/1,
stat/1, stat_all/0, deliver/5, redeliver/2, requeue/3, ack/4]).
-export([claim_queue/2]).
--export([basic_get/3, basic_consume/7, basic_cancel/4]).
+-export([basic_get/3, basic_consume/8, basic_cancel/4]).
-export([notify_sent/2]).
-export([commit_all/2, rollback_all/2, notify_down_all/2]).
-export([on_node_down/1]).
@@ -82,8 +82,8 @@
-spec(claim_queue/2 :: (amqqueue(), pid()) -> 'ok' | 'locked').
-spec(basic_get/3 :: (amqqueue(), pid(), bool()) ->
{'ok', non_neg_integer(), msg()} | 'empty').
--spec(basic_consume/7 ::
- (amqqueue(), bool(), pid(), pid(), ctag(), bool(), any()) ->
+-spec(basic_consume/8 ::
+ (amqqueue(), bool(), pid(), pid(), pid(), ctag(), bool(), any()) ->
'ok' | {'error', 'queue_owned_by_another_connection' |
'exclusive_consume_unavailable'}).
-spec(basic_cancel/4 :: (amqqueue(), pid(), ctag(), any()) -> 'ok').
@@ -238,10 +238,10 @@ claim_queue(#amqqueue{pid = QPid}, ReaderPid) ->
basic_get(#amqqueue{pid = QPid}, ChPid, NoAck) ->
gen_server:call(QPid, {basic_get, ChPid, NoAck}).
-basic_consume(#amqqueue{pid = QPid}, NoAck, ReaderPid, ChPid,
+basic_consume(#amqqueue{pid = QPid}, NoAck, ReaderPid, ChPid, LimiterPid,
ConsumerTag, ExclusiveConsume, OkMsg) ->
- gen_server:call(QPid, {basic_consume, NoAck, ReaderPid, ChPid,
- ConsumerTag, ExclusiveConsume, OkMsg}).
+ gen_server:call(QPid, {basic_consume, NoAck, ReaderPid, ChPid,
+ LimiterPid, ConsumerTag, ExclusiveConsume, OkMsg}).
basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) ->
ok = gen_server:call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index f8964e3454..43355a5a74 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -43,6 +43,7 @@
% Queue's state
-record(q, {q,
owner,
+ limiter_mapping,
exclusive_consumer,
has_had_consumers,
next_msg_id,
@@ -75,6 +76,7 @@ init(Q) ->
exclusive_consumer = none,
has_had_consumers = false,
next_msg_id = 1,
+ limiter_mapping = dict:new(),
message_buffer = queue:new(),
round_robin = queue:new()}, ?HIBERNATE_AFTER}.
@@ -141,34 +143,61 @@ update_store_and_maybe_block_ch(
deliver_immediately(Message, Delivered,
State = #q{q = #amqqueue{name = QName},
round_robin = RoundRobin,
+ limiter_mapping = LimiterMapping,
next_msg_id = NextId}) ->
?LOGDEBUG("AMQQUEUE ~p DELIVERY:~n~p~n", [QName, Message]),
case queue:out(RoundRobin) of
- {{value, QEntry = {ChPid, #consumer{tag = ConsumerTag,
- ack_required = AckRequired}}},
+ {{value, QEntry = {ChPid,
+ #consumer{tag = ConsumerTag,
+ ack_required = AckRequired = true}}},
RoundRobinTail} ->
- rabbit_channel:deliver(
- ChPid, ConsumerTag, AckRequired,
- {QName, self(), NextId, Delivered, Message}),
- C = #cr{unsent_message_count = Count,
- unacked_messages = UAM} = ch_record(ChPid),
- NewUAM = case AckRequired of
- true -> dict:store(NextId, Message, UAM);
- false -> UAM
- end,
- NewConsumers =
- case update_store_and_maybe_block_ch(
- C#cr{unsent_message_count = Count + 1,
- unacked_messages = NewUAM}) of
- ok -> queue:in(QEntry, RoundRobinTail);
- block_ch -> block_consumers(ChPid, RoundRobinTail)
- end,
- {offered, AckRequired, State#q{round_robin = NewConsumers,
- next_msg_id = NextId +1}};
+ % Use Qos Limits if an ack is required
+ % Query the limiter to find out if a limit has been breached
+ LimiterPid = dict:fetch(ChPid, LimiterMapping),
+ case rabbit_limiter:can_send(LimiterPid, self()) of
+ true ->
+ really_deliver(AckRequired, ChPid, ConsumerTag,
+ Delivered, Message, NextId, QName,
+ QEntry, RoundRobinTail, State);
+ false ->
+ % Have another go by cycling through the consumer
+ % queue
+ NewConsumers = block_consumers(ChPid, RoundRobinTail),
+ deliver_immediately(Message, Delivered,
+ State#q{round_robin = NewConsumers})
+ end;
+ {{value, QEntry = {ChPid,
+ #consumer{tag = ConsumerTag,
+ ack_required = AckRequired = false}}},
+ RoundRobinTail} ->
+ really_deliver(AckRequired, ChPid, ConsumerTag,
+ Delivered, Message, NextId, QName,
+ QEntry, RoundRobinTail, State);
{empty, _} ->
not_offered
end.
+% TODO The arity of this function seems a bit large :-(
+really_deliver(AckRequired, ChPid, ConsumerTag, Delivered, Message, NextId,
+ QName, QEntry, RoundRobinTail, State) ->
+ rabbit_channel:deliver(ChPid, ConsumerTag, AckRequired,
+ {QName, self(), NextId, Delivered, Message}),
+ C = #cr{unsent_message_count = Count,
+ unacked_messages = UAM} = ch_record(ChPid),
+ NewUAM = case AckRequired of
+ true -> dict:store(NextId, Message, UAM);
+ false -> UAM
+ end,
+ NewConsumers =
+ case update_store_and_maybe_block_ch(
+ C#cr{unsent_message_count = Count + 1,
+ unacked_messages = NewUAM}) of
+ ok -> queue:in(QEntry, RoundRobinTail);
+ block_ch -> block_consumers(ChPid, RoundRobinTail)
+ end,
+ {offered, AckRequired, State#q{round_robin = NewConsumers,
+ next_msg_id = NextId +1}}.
+
attempt_delivery(none, Message, State) ->
case deliver_immediately(Message, false, State) of
{offered, false, State1} ->
@@ -519,11 +548,14 @@ handle_call({basic_get, ChPid, NoAck}, _From,
reply(empty, State)
end;
-handle_call({basic_consume, NoAck, ReaderPid, ChPid, ConsumerTag,
- ExclusiveConsume, OkMsg},
- _From, State = #q{owner = Owner,
- exclusive_consumer = ExistingHolder,
- round_robin = RoundRobin}) ->
+handle_call({basic_consume, NoAck, ReaderPid, ChPid, LimiterPid,
+ ConsumerTag, ExclusiveConsume, OkMsg},
+ _From, _State = #q{owner = Owner,
+ limiter_mapping = Mapping,
+ exclusive_consumer = ExistingHolder,
+ round_robin = RoundRobin}) ->
+ % TODO Remove the underscore in front of the first State variable
+ State = _State#q{limiter_mapping = dict:store(ChPid, LimiterPid, Mapping)},
case check_queue_owner(Owner, ReaderPid) of
mismatch ->
reply({error, queue_owned_by_another_connection}, State);
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 1eb421cad4..7331a34b65 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -36,7 +36,7 @@
-record(ch, {state, proxy_pid, reader_pid, writer_pid,
transaction_id, tx_participants, next_tag,
uncommitted_ack_q, unacked_message_q,
- username, virtual_host,
+ username, virtual_host, limiter,
most_recently_declared_queue, consumer_mapping}).
%%----------------------------------------------------------------------------
@@ -102,6 +102,8 @@ init(ProxyPid, [ReaderPid, WriterPid, Username, VHost]) ->
username = Username,
virtual_host = VHost,
most_recently_declared_queue = <<>>,
+ % TODO See point 3.1.1 of the design - start the limiter lazily
+ limiter = rabbit_limiter:start_link(),
consumer_mapping = dict:new()}.
handle_message({method, Method, Content}, State) ->
@@ -323,6 +325,7 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
nowait = NoWait},
_, State = #ch{ proxy_pid = ProxyPid,
reader_pid = ReaderPid,
+ limiter = LimiterPid,
consumer_mapping = ConsumerMapping }) ->
case dict:find(ConsumerTag, ConsumerMapping) of
error ->
@@ -340,7 +343,7 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
QueueName,
fun (Q) ->
rabbit_amqqueue:basic_consume(
- Q, NoAck, ReaderPid, ProxyPid,
+ Q, NoAck, ReaderPid, ProxyPid, LimiterPid,
ActualConsumerTag, ExclusiveConsume,
ok_msg(NoWait, #'basic.consume_ok'{
consumer_tag = ActualConsumerTag}))
@@ -405,8 +408,9 @@ handle_method(#'basic.cancel'{consumer_tag = ConsumerTag,
end
end;
-handle_method(#'basic.qos'{}, _, State) ->
- %% FIXME: Need to implement QOS
+handle_method(#'basic.qos'{prefetch_count = PrefetchCount},
+ _, State = #ch{limiter = Limiter}) ->
+ Limiter ! {prefetch_count, PrefetchCount},
{reply, #'basic.qos_ok'{}, State};
handle_method(#'basic.recover'{requeue = true},