summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue.erl5
-rw-r--r--src/rabbit_amqqueue_process.erl39
-rw-r--r--src/rabbit_channel.erl2
-rw-r--r--src/rabbit_limiter.erl28
4 files changed, 54 insertions, 20 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 938182da97..4e524e3cb9 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -32,6 +32,7 @@
-export([claim_queue/2]).
-export([basic_get/3, basic_consume/8, basic_cancel/4]).
-export([notify_sent/2]).
+-export([unblock/2]).
-export([commit_all/2, rollback_all/2, notify_down_all/2]).
-export([on_node_down/1]).
@@ -88,6 +89,7 @@
'exclusive_consume_unavailable'}).
-spec(basic_cancel/4 :: (amqqueue(), pid(), ctag(), any()) -> 'ok').
-spec(notify_sent/2 :: (pid(), pid()) -> 'ok').
+-spec(unblock/2 :: (pid(), pid()) -> 'ok').
-spec(internal_delete/1 :: (queue_name()) -> 'ok' | not_found()).
-spec(on_node_down/1 :: (node()) -> 'ok').
-spec(pseudo_queue/2 :: (binary(), pid()) -> amqqueue()).
@@ -249,6 +251,9 @@ basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) ->
notify_sent(QPid, ChPid) ->
gen_server:cast(QPid, {notify_sent, ChPid}).
+unblock(QPid, ChPid) ->
+ gen_server:cast(QPid, {unblock, ChPid}).
+
internal_delete(QueueName) ->
rabbit_misc:execute_mnesia_transaction(
fun () ->
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index c5a6a3437b..6ef5f97071 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -59,6 +59,7 @@
limiter_pid,
monitor_ref,
unacked_messages,
+ is_limit_active,
is_overload_protection_active,
unsent_message_count}).
@@ -125,18 +126,22 @@ all_ch_record() ->
[C || {{ch, _}, C} <- get()].
update_store_and_maybe_block_ch(
- C = #cr{is_overload_protection_active = Active,
+ C = #cr{is_overload_protection_active = Overloaded,
+ is_limit_active = Limited,
unsent_message_count = Count}) ->
- {Result, NewActive} =
+ {Result, NewOverloaded, NewLimited} =
if
- not(Active) and (Count > ?UNSENT_MESSAGE_LIMIT) ->
- {block_ch, true};
- Active and (Count == 0) ->
- {unblock_ch, false};
+ not(Overloaded) and (Count > ?UNSENT_MESSAGE_LIMIT) ->
+ {block_ch, true, Limited};
+ Overloaded and (Count == 0) ->
+ {unblock_ch, false, Limited};
+ Limited and (Count < ?UNSENT_MESSAGE_LIMIT) ->
+ {unblock_ch, Overloaded, false};
true ->
- {ok, Active}
+ {ok, Overloaded, Limited}
end,
- store_ch_record(C#cr{is_overload_protection_active = NewActive}),
+ store_ch_record(C#cr{is_overload_protection_active = NewOverloaded,
+ is_limit_active = NewLimited}),
Result.
deliver_immediately(Message, Delivered,
@@ -160,6 +165,8 @@ deliver_immediately(Message, Delivered,
false ->
% Have another go by cycling through the consumer
% queue
+ C = ch_record(ChPid),
+ store_ch_record(C#cr{is_limit_active = true}),
NewConsumers = block_consumers(ChPid, RoundRobinTail),
deliver_immediately(Message, Delivered,
State#q{round_robin = NewConsumers})
@@ -659,7 +666,7 @@ handle_cast({ack, Txn, MsgIds, ChPid}, State) ->
not_found ->
noreply(State);
C = #cr{unacked_messages = UAM, limiter_pid = LimiterPid} ->
- rabbit_limiter:decrement_capacity(LimiterPid, qname(State)),
+ rabbit_limiter:decrement_capacity(LimiterPid, self()),
{Acked, Remaining} = collect_messages(MsgIds, UAM),
persist_acks(Txn, qname(State), Acked),
case Txn of
@@ -692,6 +699,20 @@ handle_cast({requeue, MsgIds, ChPid}, State) ->
[{Message, true} || Message <- Messages], State))
end;
+handle_cast({unblock, ChPid}, State) ->
+ % TODO Refactor the code duplication
+ % between this an the notify_sent cast handler
+ case lookup_ch(ChPid) of
+ not_found ->
+ noreply(State);
+ C = #cr{is_limit_active = true} ->
+ noreply(possibly_unblock(C, State));
+ C ->
+ rabbit_log:warning("Ignoring unblock for an active ch: ~p~n",
+ [C]),
+ noreply(State)
+ end;
+
handle_cast({notify_sent, ChPid}, State) ->
case lookup_ch(ChPid) of
not_found -> noreply(State);
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index ac186cfafe..3306d6f60a 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -103,7 +103,7 @@ init(ProxyPid, [ReaderPid, WriterPid, Username, VHost]) ->
virtual_host = VHost,
most_recently_declared_queue = <<>>,
% TODO See point 3.1.1 of the design - start the limiter lazily
- limiter = rabbit_limiter:start_link(self()),
+ limiter = rabbit_limiter:start_link(ProxyPid),
consumer_mapping = dict:new()}.
handle_message({method, Method, Content}, State) ->
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index 3dfeb5fe20..1973d3588b 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -51,9 +51,9 @@ handle_call({can_send, QPid}, _From, State) ->
% a queue. This allows the limiter to update the the in-use-by-that queue
% capacity infromation.
handle_cast({decrement_capacity, QPid}, State) ->
- NewState = decrement_in_use(QPid, State),
- maybe_notify_queues(NewState),
- {noreply, NewState}.
+ State1 = decrement_in_use(QPid, State),
+ State2 = maybe_notify_queues(State1),
+ {noreply, State2}.
% When the prefetch count has not been set,
% e.g. when the channel has not yet been issued a basic.qos
@@ -84,10 +84,10 @@ code_change(_, State, _) ->
% Internal plumbing
%---------------------------------------------------------------------------
+% Reduces the in-use-count of the queue by one
decrement_in_use(QPid, State = #lim{in_use = InUse}) ->
case dict:find(QPid, InUse) of
{ok, Capacity} ->
- io:format("capacity ~p~n",[Capacity]),
if
% Is there a lower bound on capacity?
% i.e. what is the zero mark, how much is unlimited?
@@ -96,26 +96,35 @@ decrement_in_use(QPid, State = #lim{in_use = InUse}) ->
State#lim{in_use = NewInUse};
true ->
% TODO How should this be handled?
+ rabbit_log:warning(
+ "Ignoring decrement for zero capacity: ~p~n",
+ [QPid]),
State
end;
error ->
% TODO How should this case be handled?
+ rabbit_log:warning("Ignoring decrement for unknown queue: ~p~n",
+ [QPid]),
State
end.
+% Works out whether any queues should be notified
+% If any notification is required, it propagates a transition
+% of the blocked state
maybe_notify_queues(State = #lim{ch_pid = ChPid, in_use = InUse}) ->
- Capacity = current_capcity(State),
+ Capacity = current_capacity(State),
case should_notify(Capacity, State) of
true ->
dict:map(fun(Q,_) ->
- rabbit_amqqueue:notify_sent(Q, ChPid)
+ rabbit_amqqueue:unblock(Q, ChPid)
end, InUse),
State#lim{blocked = false};
false ->
- ok
+ State
end.
-current_capcity(#lim{in_use = InUse}) ->
+% Computes the current aggregrate capacity of all of the in-use queues
+current_capacity(#lim{in_use = InUse}) ->
% TODO This *seems* expensive to compute this on the fly
dict:fold(fun(_, PerQ, Acc) -> PerQ + Acc end, 0, InUse).
@@ -135,8 +144,7 @@ maybe_can_send(_, State = #lim{blocked = true}) ->
maybe_can_send(QPid, State = #lim{prefetch_count = Limit,
in_use = InUse,
blocked = false}) ->
- Capacity = current_capcity(State),
- io:format("Limit was ~p, capacity ~p~n",[Limit, Capacity]),
+ Capacity = current_capacity(State),
if
Capacity < Limit ->
NewInUse = update_in_use_capacity(QPid, InUse),