diff options
| author | Ben Hood <0x6e6562@gmail.com> | 2008-11-20 14:29:11 +0000 |
|---|---|---|
| committer | Ben Hood <0x6e6562@gmail.com> | 2008-11-20 14:29:11 +0000 |
| commit | f8b2adc26c76082254d9a78a7ded2eaf61dc5d99 (patch) | |
| tree | 116386ecaed8c5a3067776f4d256800ea3757bbb /src | |
| parent | 9b2a5b83144774f5f154000d2c8a0430bd62b947 (diff) | |
| download | rabbitmq-server-git-f8b2adc26c76082254d9a78a7ded2eaf61dc5d99.tar.gz | |
First cut with some actual load balancing working
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue.erl | 5 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 39 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_limiter.erl | 28 |
4 files changed, 54 insertions, 20 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 938182da97..4e524e3cb9 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -32,6 +32,7 @@ -export([claim_queue/2]). -export([basic_get/3, basic_consume/8, basic_cancel/4]). -export([notify_sent/2]). +-export([unblock/2]). -export([commit_all/2, rollback_all/2, notify_down_all/2]). -export([on_node_down/1]). @@ -88,6 +89,7 @@ 'exclusive_consume_unavailable'}). -spec(basic_cancel/4 :: (amqqueue(), pid(), ctag(), any()) -> 'ok'). -spec(notify_sent/2 :: (pid(), pid()) -> 'ok'). +-spec(unblock/2 :: (pid(), pid()) -> 'ok'). -spec(internal_delete/1 :: (queue_name()) -> 'ok' | not_found()). -spec(on_node_down/1 :: (node()) -> 'ok'). -spec(pseudo_queue/2 :: (binary(), pid()) -> amqqueue()). @@ -249,6 +251,9 @@ basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) -> notify_sent(QPid, ChPid) -> gen_server:cast(QPid, {notify_sent, ChPid}). +unblock(QPid, ChPid) -> + gen_server:cast(QPid, {unblock, ChPid}). + internal_delete(QueueName) -> rabbit_misc:execute_mnesia_transaction( fun () -> diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index c5a6a3437b..6ef5f97071 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -59,6 +59,7 @@ limiter_pid, monitor_ref, unacked_messages, + is_limit_active, is_overload_protection_active, unsent_message_count}). @@ -125,18 +126,22 @@ all_ch_record() -> [C || {{ch, _}, C} <- get()]. update_store_and_maybe_block_ch( - C = #cr{is_overload_protection_active = Active, + C = #cr{is_overload_protection_active = Overloaded, + is_limit_active = Limited, unsent_message_count = Count}) -> - {Result, NewActive} = + {Result, NewOverloaded, NewLimited} = if - not(Active) and (Count > ?UNSENT_MESSAGE_LIMIT) -> - {block_ch, true}; - Active and (Count == 0) -> - {unblock_ch, false}; + not(Overloaded) and (Count > ?UNSENT_MESSAGE_LIMIT) -> + {block_ch, true, Limited}; + Overloaded and (Count == 0) -> + {unblock_ch, false, Limited}; + Limited and (Count < ?UNSENT_MESSAGE_LIMIT) -> + {unblock_ch, Overloaded, false}; true -> - {ok, Active} + {ok, Overloaded, Limited} end, - store_ch_record(C#cr{is_overload_protection_active = NewActive}), + store_ch_record(C#cr{is_overload_protection_active = NewOverloaded, + is_limit_active = NewLimited}), Result. deliver_immediately(Message, Delivered, @@ -160,6 +165,8 @@ deliver_immediately(Message, Delivered, false -> % Have another go by cycling through the consumer % queue + C = ch_record(ChPid), + store_ch_record(C#cr{is_limit_active = true}), NewConsumers = block_consumers(ChPid, RoundRobinTail), deliver_immediately(Message, Delivered, State#q{round_robin = NewConsumers}) @@ -659,7 +666,7 @@ handle_cast({ack, Txn, MsgIds, ChPid}, State) -> not_found -> noreply(State); C = #cr{unacked_messages = UAM, limiter_pid = LimiterPid} -> - rabbit_limiter:decrement_capacity(LimiterPid, qname(State)), + rabbit_limiter:decrement_capacity(LimiterPid, self()), {Acked, Remaining} = collect_messages(MsgIds, UAM), persist_acks(Txn, qname(State), Acked), case Txn of @@ -692,6 +699,20 @@ handle_cast({requeue, MsgIds, ChPid}, State) -> [{Message, true} || Message <- Messages], State)) end; +handle_cast({unblock, ChPid}, State) -> + % TODO Refactor the code duplication + % between this an the notify_sent cast handler + case lookup_ch(ChPid) of + not_found -> + noreply(State); + C = #cr{is_limit_active = true} -> + noreply(possibly_unblock(C, State)); + C -> + rabbit_log:warning("Ignoring unblock for an active ch: ~p~n", + [C]), + noreply(State) + end; + handle_cast({notify_sent, ChPid}, State) -> case lookup_ch(ChPid) of not_found -> noreply(State); diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index ac186cfafe..3306d6f60a 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -103,7 +103,7 @@ init(ProxyPid, [ReaderPid, WriterPid, Username, VHost]) -> virtual_host = VHost, most_recently_declared_queue = <<>>, % TODO See point 3.1.1 of the design - start the limiter lazily - limiter = rabbit_limiter:start_link(self()), + limiter = rabbit_limiter:start_link(ProxyPid), consumer_mapping = dict:new()}. handle_message({method, Method, Content}, State) -> diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index 3dfeb5fe20..1973d3588b 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -51,9 +51,9 @@ handle_call({can_send, QPid}, _From, State) -> % a queue. This allows the limiter to update the the in-use-by-that queue % capacity infromation. handle_cast({decrement_capacity, QPid}, State) -> - NewState = decrement_in_use(QPid, State), - maybe_notify_queues(NewState), - {noreply, NewState}. + State1 = decrement_in_use(QPid, State), + State2 = maybe_notify_queues(State1), + {noreply, State2}. % When the prefetch count has not been set, % e.g. when the channel has not yet been issued a basic.qos @@ -84,10 +84,10 @@ code_change(_, State, _) -> % Internal plumbing %--------------------------------------------------------------------------- +% Reduces the in-use-count of the queue by one decrement_in_use(QPid, State = #lim{in_use = InUse}) -> case dict:find(QPid, InUse) of {ok, Capacity} -> - io:format("capacity ~p~n",[Capacity]), if % Is there a lower bound on capacity? % i.e. what is the zero mark, how much is unlimited? @@ -96,26 +96,35 @@ decrement_in_use(QPid, State = #lim{in_use = InUse}) -> State#lim{in_use = NewInUse}; true -> % TODO How should this be handled? + rabbit_log:warning( + "Ignoring decrement for zero capacity: ~p~n", + [QPid]), State end; error -> % TODO How should this case be handled? + rabbit_log:warning("Ignoring decrement for unknown queue: ~p~n", + [QPid]), State end. +% Works out whether any queues should be notified +% If any notification is required, it propagates a transition +% of the blocked state maybe_notify_queues(State = #lim{ch_pid = ChPid, in_use = InUse}) -> - Capacity = current_capcity(State), + Capacity = current_capacity(State), case should_notify(Capacity, State) of true -> dict:map(fun(Q,_) -> - rabbit_amqqueue:notify_sent(Q, ChPid) + rabbit_amqqueue:unblock(Q, ChPid) end, InUse), State#lim{blocked = false}; false -> - ok + State end. -current_capcity(#lim{in_use = InUse}) -> +% Computes the current aggregrate capacity of all of the in-use queues +current_capacity(#lim{in_use = InUse}) -> % TODO This *seems* expensive to compute this on the fly dict:fold(fun(_, PerQ, Acc) -> PerQ + Acc end, 0, InUse). @@ -135,8 +144,7 @@ maybe_can_send(_, State = #lim{blocked = true}) -> maybe_can_send(QPid, State = #lim{prefetch_count = Limit, in_use = InUse, blocked = false}) -> - Capacity = current_capcity(State), - io:format("Limit was ~p, capacity ~p~n",[Limit, Capacity]), + Capacity = current_capacity(State), if Capacity < Limit -> NewInUse = update_in_use_capacity(QPid, InUse), |
