diff options
| author | Ben Hood <0x6e6562@gmail.com> | 2008-11-18 19:40:02 +0000 |
|---|---|---|
| committer | Ben Hood <0x6e6562@gmail.com> | 2008-11-18 19:40:02 +0000 |
| commit | b9e461fc6d762c99a7a87e1f0d95c446aad03405 (patch) | |
| tree | 579cefdd0d5f63742a7c94eb73fc3b5feaaef1d2 | |
| parent | 3081245448bc6935438fde4620b5ce0f7a1d8c34 (diff) | |
| download | rabbitmq-server-git-b9e461fc6d762c99a7a87e1f0d95c446aad03405.tar.gz | |
First stab at basic.qos
| -rw-r--r-- | src/rabbit_amqqueue.erl | 12 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 82 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 12 |
3 files changed, 71 insertions, 35 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 56d2c35d94..938182da97 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -30,7 +30,7 @@ -export([lookup/1, with/2, with_or_die/2, list_vhost_queues/1, stat/1, stat_all/0, deliver/5, redeliver/2, requeue/3, ack/4]). -export([claim_queue/2]). --export([basic_get/3, basic_consume/7, basic_cancel/4]). +-export([basic_get/3, basic_consume/8, basic_cancel/4]). -export([notify_sent/2]). -export([commit_all/2, rollback_all/2, notify_down_all/2]). -export([on_node_down/1]). @@ -82,8 +82,8 @@ -spec(claim_queue/2 :: (amqqueue(), pid()) -> 'ok' | 'locked'). -spec(basic_get/3 :: (amqqueue(), pid(), bool()) -> {'ok', non_neg_integer(), msg()} | 'empty'). --spec(basic_consume/7 :: - (amqqueue(), bool(), pid(), pid(), ctag(), bool(), any()) -> +-spec(basic_consume/8 :: + (amqqueue(), bool(), pid(), pid(), pid(), ctag(), bool(), any()) -> 'ok' | {'error', 'queue_owned_by_another_connection' | 'exclusive_consume_unavailable'}). -spec(basic_cancel/4 :: (amqqueue(), pid(), ctag(), any()) -> 'ok'). @@ -238,10 +238,10 @@ claim_queue(#amqqueue{pid = QPid}, ReaderPid) -> basic_get(#amqqueue{pid = QPid}, ChPid, NoAck) -> gen_server:call(QPid, {basic_get, ChPid, NoAck}). -basic_consume(#amqqueue{pid = QPid}, NoAck, ReaderPid, ChPid, +basic_consume(#amqqueue{pid = QPid}, NoAck, ReaderPid, ChPid, LimiterPid, ConsumerTag, ExclusiveConsume, OkMsg) -> - gen_server:call(QPid, {basic_consume, NoAck, ReaderPid, ChPid, - ConsumerTag, ExclusiveConsume, OkMsg}). + gen_server:call(QPid, {basic_consume, NoAck, ReaderPid, ChPid, + LimiterPid, ConsumerTag, ExclusiveConsume, OkMsg}). basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) -> ok = gen_server:call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index f8964e3454..43355a5a74 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -43,6 +43,7 @@ % Queue's state -record(q, {q, owner, + limiter_mapping, exclusive_consumer, has_had_consumers, next_msg_id, @@ -75,6 +76,7 @@ init(Q) -> exclusive_consumer = none, has_had_consumers = false, next_msg_id = 1, + limiter_mapping = dict:new(), message_buffer = queue:new(), round_robin = queue:new()}, ?HIBERNATE_AFTER}. @@ -141,34 +143,61 @@ update_store_and_maybe_block_ch( deliver_immediately(Message, Delivered, State = #q{q = #amqqueue{name = QName}, round_robin = RoundRobin, + limiter_mapping = LimiterMapping, next_msg_id = NextId}) -> ?LOGDEBUG("AMQQUEUE ~p DELIVERY:~n~p~n", [QName, Message]), case queue:out(RoundRobin) of - {{value, QEntry = {ChPid, #consumer{tag = ConsumerTag, - ack_required = AckRequired}}}, + {{value, QEntry = {ChPid, + #consumer{tag = ConsumerTag, + ack_required = AckRequired = true}}}, RoundRobinTail} -> - rabbit_channel:deliver( - ChPid, ConsumerTag, AckRequired, - {QName, self(), NextId, Delivered, Message}), - C = #cr{unsent_message_count = Count, - unacked_messages = UAM} = ch_record(ChPid), - NewUAM = case AckRequired of - true -> dict:store(NextId, Message, UAM); - false -> UAM - end, - NewConsumers = - case update_store_and_maybe_block_ch( - C#cr{unsent_message_count = Count + 1, - unacked_messages = NewUAM}) of - ok -> queue:in(QEntry, RoundRobinTail); - block_ch -> block_consumers(ChPid, RoundRobinTail) - end, - {offered, AckRequired, State#q{round_robin = NewConsumers, - next_msg_id = NextId +1}}; + % Use Qos Limits if an ack is required + % Query the limiter to find out if a limit has been breached + LimiterPid = dict:fetch(ChPid, LimiterMapping), + case rabbit_limiter:can_send(LimiterPid, self()) of + true -> + really_deliver(AckRequired, ChPid, ConsumerTag, + Delivered, Message, NextId, QName, + QEntry, RoundRobinTail, State); + false -> + % Have another go by cycling through the consumer + % queue + NewConsumers = block_consumers(ChPid, RoundRobinTail), + deliver_immediately(Message, Delivered, + State#q{round_robin = NewConsumers}) + end; + {{value, QEntry = {ChPid, + #consumer{tag = ConsumerTag, + ack_required = AckRequired = false}}}, + RoundRobinTail} -> + really_deliver(AckRequired, ChPid, ConsumerTag, + Delivered, Message, NextId, QName, + QEntry, RoundRobinTail, State); {empty, _} -> not_offered end. +% TODO The arity of this function seems a bit large :-( +really_deliver(AckRequired, ChPid, ConsumerTag, Delivered, Message, NextId, + QName, QEntry, RoundRobinTail, State) -> + rabbit_channel:deliver(ChPid, ConsumerTag, AckRequired, + {QName, self(), NextId, Delivered, Message}), + C = #cr{unsent_message_count = Count, + unacked_messages = UAM} = ch_record(ChPid), + NewUAM = case AckRequired of + true -> dict:store(NextId, Message, UAM); + false -> UAM + end, + NewConsumers = + case update_store_and_maybe_block_ch( + C#cr{unsent_message_count = Count + 1, + unacked_messages = NewUAM}) of + ok -> queue:in(QEntry, RoundRobinTail); + block_ch -> block_consumers(ChPid, RoundRobinTail) + end, + {offered, AckRequired, State#q{round_robin = NewConsumers, + next_msg_id = NextId +1}}. + attempt_delivery(none, Message, State) -> case deliver_immediately(Message, false, State) of {offered, false, State1} -> @@ -519,11 +548,14 @@ handle_call({basic_get, ChPid, NoAck}, _From, reply(empty, State) end; -handle_call({basic_consume, NoAck, ReaderPid, ChPid, ConsumerTag, - ExclusiveConsume, OkMsg}, - _From, State = #q{owner = Owner, - exclusive_consumer = ExistingHolder, - round_robin = RoundRobin}) -> +handle_call({basic_consume, NoAck, ReaderPid, ChPid, LimiterPid, + ConsumerTag, ExclusiveConsume, OkMsg}, + _From, _State = #q{owner = Owner, + limiter_mapping = Mapping, + exclusive_consumer = ExistingHolder, + round_robin = RoundRobin}) -> + % TODO Remove the underscore in front of the first State variable + State = _State#q{limiter_mapping = dict:store(ChPid, LimiterPid, Mapping)}, case check_queue_owner(Owner, ReaderPid) of mismatch -> reply({error, queue_owned_by_another_connection}, State); diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 1eb421cad4..7331a34b65 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -36,7 +36,7 @@ -record(ch, {state, proxy_pid, reader_pid, writer_pid, transaction_id, tx_participants, next_tag, uncommitted_ack_q, unacked_message_q, - username, virtual_host, + username, virtual_host, limiter, most_recently_declared_queue, consumer_mapping}). %%---------------------------------------------------------------------------- @@ -102,6 +102,8 @@ init(ProxyPid, [ReaderPid, WriterPid, Username, VHost]) -> username = Username, virtual_host = VHost, most_recently_declared_queue = <<>>, + % TODO See point 3.1.1 of the design - start the limiter lazily + limiter = rabbit_limiter:start_link(), consumer_mapping = dict:new()}. handle_message({method, Method, Content}, State) -> @@ -323,6 +325,7 @@ handle_method(#'basic.consume'{queue = QueueNameBin, nowait = NoWait}, _, State = #ch{ proxy_pid = ProxyPid, reader_pid = ReaderPid, + limiter = LimiterPid, consumer_mapping = ConsumerMapping }) -> case dict:find(ConsumerTag, ConsumerMapping) of error -> @@ -340,7 +343,7 @@ handle_method(#'basic.consume'{queue = QueueNameBin, QueueName, fun (Q) -> rabbit_amqqueue:basic_consume( - Q, NoAck, ReaderPid, ProxyPid, + Q, NoAck, ReaderPid, ProxyPid, LimiterPid, ActualConsumerTag, ExclusiveConsume, ok_msg(NoWait, #'basic.consume_ok'{ consumer_tag = ActualConsumerTag})) @@ -405,8 +408,9 @@ handle_method(#'basic.cancel'{consumer_tag = ConsumerTag, end end; -handle_method(#'basic.qos'{}, _, State) -> - %% FIXME: Need to implement QOS +handle_method(#'basic.qos'{prefetch_count = PrefetchCount}, + _, State = #ch{limiter = Limiter}) -> + Limiter ! {prefetch_count, PrefetchCount}, {reply, #'basic.qos_ok'{}, State}; handle_method(#'basic.recover'{requeue = true}, |
