summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_amqqueue.erl29
-rw-r--r--src/rabbit_amqqueue_process.erl86
-rw-r--r--src/rabbit_channel.erl10
-rw-r--r--src/rabbit_limiter.erl87
-rw-r--r--src/rabbit_tests.erl19
5 files changed, 123 insertions, 108 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index bd5de23923..be8ab38513 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -26,9 +26,9 @@
-export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]).
-export([force_event_refresh/0, wake_up/1]).
-export([consumers/1, consumers_all/1, consumer_info_keys/0]).
--export([basic_get/3, basic_consume/7, basic_cancel/4]).
--export([notify_sent/2, notify_sent_queue_down/1, unblock/2, flush_all/2]).
--export([notify_down_all/2, limit_all/3]).
+-export([basic_get/3, basic_consume/8, basic_cancel/4]).
+-export([notify_sent/2, notify_sent_queue_down/1, resume/2, flush_all/2]).
+-export([notify_down_all/2, activate_limit_all/2]).
-export([on_node_down/1]).
-export([update/2, store_queue/1, policy_changed/2]).
-export([start_mirroring/1, stop_mirroring/1, sync_mirrors/1,
@@ -144,19 +144,18 @@
-spec(ack/3 :: (pid(), [msg_id()], pid()) -> 'ok').
-spec(reject/4 :: (pid(), [msg_id()], boolean(), pid()) -> 'ok').
-spec(notify_down_all/2 :: (qpids(), pid()) -> ok_or_errors()).
--spec(limit_all/3 :: (qpids(), pid(), rabbit_limiter:lstate()) ->
- ok_or_errors()).
+-spec(activate_limit_all/2 :: (qpids(), pid()) -> ok_or_errors()).
-spec(basic_get/3 :: (rabbit_types:amqqueue(), pid(), boolean()) ->
{'ok', non_neg_integer(), qmsg()} | 'empty').
--spec(basic_consume/7 ::
- (rabbit_types:amqqueue(), boolean(), pid(),
- rabbit_limiter:token(), rabbit_types:ctag(), boolean(), any())
+-spec(basic_consume/8 ::
+ (rabbit_types:amqqueue(), boolean(), pid(), pid(), boolean(),
+ rabbit_types:ctag(), boolean(), any())
-> rabbit_types:ok_or_error('exclusive_consume_unavailable')).
-spec(basic_cancel/4 ::
(rabbit_types:amqqueue(), pid(), rabbit_types:ctag(), any()) -> 'ok').
-spec(notify_sent/2 :: (pid(), pid()) -> 'ok').
-spec(notify_sent_queue_down/1 :: (pid()) -> 'ok').
--spec(unblock/2 :: (pid(), pid()) -> 'ok').
+-spec(resume/2 :: (pid(), pid()) -> 'ok').
-spec(flush_all/2 :: (qpids(), pid()) -> 'ok').
-spec(internal_delete/1 ::
(name()) -> rabbit_types:ok_or_error('not_found') |
@@ -538,16 +537,16 @@ notify_down_all(QPids, ChPid) ->
Bads1 -> {error, Bads1}
end.
-limit_all(QPids, ChPid, Limiter) ->
- delegate:cast(QPids, {limit, ChPid, Limiter}).
+activate_limit_all(QPids, ChPid) ->
+ delegate:cast(QPids, {activate_limit, ChPid}).
basic_get(#amqqueue{pid = QPid}, ChPid, NoAck) ->
delegate:call(QPid, {basic_get, ChPid, NoAck}).
-basic_consume(#amqqueue{pid = QPid}, NoAck, ChPid, Limiter,
+basic_consume(#amqqueue{pid = QPid}, NoAck, ChPid, LimiterPid, LimiterActive,
ConsumerTag, ExclusiveConsume, OkMsg) ->
- delegate:call(QPid, {basic_consume, NoAck, ChPid,
- Limiter, ConsumerTag, ExclusiveConsume, OkMsg}).
+ delegate:call(QPid, {basic_consume, NoAck, ChPid, LimiterPid, LimiterActive,
+ ConsumerTag, ExclusiveConsume, OkMsg}).
basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) ->
delegate:call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}).
@@ -569,7 +568,7 @@ notify_sent_queue_down(QPid) ->
erase({consumer_credit_to, QPid}),
ok.
-unblock(QPid, ChPid) -> delegate:cast(QPid, {unblock, ChPid}).
+resume(QPid, ChPid) -> delegate:cast(QPid, {resume, ChPid}).
flush_all(QPids, ChPid) -> delegate:cast(QPids, {flush, ChPid}).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 0ddc9eba11..37daa0dfca 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -68,7 +68,6 @@
consumer_count,
blocked_consumers,
limiter,
- is_limit_active,
unsent_message_count}).
%%----------------------------------------------------------------------------
@@ -371,7 +370,6 @@ ch_record(ChPid) ->
acktags = queue:new(),
consumer_count = 0,
blocked_consumers = queue:new(),
- is_limit_active = false,
limiter = undefined,
unsent_message_count = 0},
put(Key, C),
@@ -392,30 +390,18 @@ store_ch_record(C = #cr{ch_pid = ChPid}) ->
put({ch, ChPid}, C),
ok.
-erase_ch_record(#cr{ch_pid = ChPid,
- limiter = Limiter,
- monitor_ref = MonitorRef}) ->
- ok = rabbit_limiter:unregister(Limiter),
+erase_ch_record(#cr{ch_pid = ChPid, monitor_ref = MonitorRef}) ->
erlang:demonitor(MonitorRef),
erase({ch, ChPid}),
ok.
-update_consumer_count(C = #cr{consumer_count = 0, limiter = Limiter}, +1) ->
- ok = rabbit_limiter:register(Limiter),
- update_ch_record(C#cr{consumer_count = 1});
-update_consumer_count(C = #cr{consumer_count = 1, limiter = Limiter}, -1) ->
- ok = rabbit_limiter:unregister(Limiter),
- update_ch_record(C#cr{consumer_count = 0, limiter = undefined});
-update_consumer_count(C = #cr{consumer_count = Count}, Delta) ->
- update_ch_record(C#cr{consumer_count = Count + Delta}).
-
all_ch_record() -> [C || {{ch, _}, C} <- get()].
block_consumer(C = #cr{blocked_consumers = Blocked}, QEntry) ->
update_ch_record(C#cr{blocked_consumers = queue:in(QEntry, Blocked)}).
-is_ch_blocked(#cr{unsent_message_count = Count, is_limit_active = Limited}) ->
- Limited orelse Count >= ?UNSENT_MESSAGE_LIMIT.
+is_ch_blocked(#cr{unsent_message_count = Count, limiter = Limiter}) ->
+ Count >= ?UNSENT_MESSAGE_LIMIT orelse rabbit_limiter:is_suspended(Limiter).
ch_record_state_transition(OldCR, NewCR) ->
case {is_ch_blocked(OldCR), is_ch_blocked(NewCR)} of
@@ -439,18 +425,20 @@ deliver_msgs_to_consumers(DeliverFun, false,
end.
deliver_msg_to_consumer(DeliverFun, E = {ChPid, Consumer}, State) ->
- C = ch_record(ChPid),
+ C = lookup_ch(ChPid),
case is_ch_blocked(C) of
true -> block_consumer(C, E),
{false, State};
false -> case rabbit_limiter:can_send(C#cr.limiter,
Consumer#consumer.ack_required) of
- false -> block_consumer(C#cr{is_limit_active = true}, E),
- {false, State};
- true -> AC1 = queue:in(E, State#q.active_consumers),
- deliver_msg_to_consumer(
- DeliverFun, Consumer, C,
- State#q{active_consumers = AC1})
+ {suspend, Limiter} ->
+ block_consumer(C#cr{limiter = Limiter}, E),
+ {false, State};
+ {continue, Limiter} ->
+ AC1 = queue:in(E, State#q.active_consumers),
+ deliver_msg_to_consumer(
+ DeliverFun, Consumer, C#cr{limiter = Limiter},
+ State#q{active_consumers = AC1})
end
end.
@@ -1127,15 +1115,25 @@ handle_call({basic_get, ChPid, NoAck}, _From,
reply({ok, BQ:len(BQS), Msg}, State3)
end;
-handle_call({basic_consume, NoAck, ChPid, Limiter,
+handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive,
ConsumerTag, ExclusiveConsume, OkMsg},
_From, State = #q{exclusive_consumer = Holder}) ->
case check_exclusive_access(Holder, ExclusiveConsume, State) of
in_use ->
reply({error, exclusive_consume_unavailable}, State);
ok ->
- C = ch_record(ChPid),
- update_consumer_count(C#cr{limiter = Limiter}, +1),
+ C = #cr{consumer_count = Count,
+ limiter = Limiter0} = ch_record(ChPid),
+ Limiter = case Limiter0 of
+ undefined -> rabbit_limiter:client(LimiterPid);
+ _ -> Limiter0
+ end,
+ Limiter1 = case LimiterActive of
+ true -> rabbit_limiter:activate(Limiter);
+ false -> Limiter
+ end,
+ update_ch_record(C#cr{consumer_count = Count + 1,
+ limiter = Limiter1}),
Consumer = #consumer{tag = ConsumerTag,
ack_required = not NoAck},
ExclusiveConsumer = if ExclusiveConsume -> {ChPid, ConsumerTag};
@@ -1156,10 +1154,18 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, From,
case lookup_ch(ChPid) of
not_found ->
reply(ok, State);
- C = #cr{blocked_consumers = Blocked} ->
+ C = #cr{consumer_count = Count,
+ limiter = Limiter,
+ blocked_consumers = Blocked} ->
emit_consumer_deleted(ChPid, ConsumerTag, qname(State)),
Blocked1 = remove_consumer(ChPid, ConsumerTag, Blocked),
- update_consumer_count(C#cr{blocked_consumers = Blocked1}, -1),
+ Limiter1 = case Count of
+ 1 -> rabbit_limiter:forget(Limiter);
+ _ -> Limiter
+ end,
+ update_ch_record(C#cr{consumer_count = Count - 1,
+ limiter = Limiter1,
+ blocked_consumers = Blocked1}),
State1 = State#q{
exclusive_consumer = case Holder of
{ChPid, ConsumerTag} -> none;
@@ -1287,10 +1293,13 @@ handle_cast({reject, AckTags, false, ChPid}, State) ->
handle_cast(delete_immediately, State) ->
stop(State);
-handle_cast({unblock, ChPid}, State) ->
+handle_cast({resume, ChPid}, State) ->
noreply(
possibly_unblock(State, ChPid,
- fun (C) -> C#cr{is_limit_active = false} end));
+ fun (C = #cr{limiter = undefined}) -> C;
+ (C = #cr{limiter = Limiter}) ->
+ C#cr{limiter = rabbit_limiter:resume(Limiter)}
+ end));
handle_cast({notify_sent, ChPid, Credit}, State) ->
noreply(
@@ -1299,20 +1308,13 @@ handle_cast({notify_sent, ChPid, Credit}, State) ->
C#cr{unsent_message_count = Count - Credit}
end));
-handle_cast({limit, ChPid, Limiter}, State) ->
+handle_cast({activate_limit, ChPid}, State) ->
noreply(
possibly_unblock(
State, ChPid,
- fun (C = #cr{consumer_count = ConsumerCount,
- limiter = OldLimiter,
- is_limit_active = OldLimited}) ->
- case (ConsumerCount =/= 0 andalso
- not rabbit_limiter:is_active(OldLimiter)) of
- true -> ok = rabbit_limiter:register(Limiter);
- false -> ok
- end,
- Limited = OldLimited andalso rabbit_limiter:is_active(Limiter),
- C#cr{limiter = Limiter, is_limit_active = Limited}
+ fun (C = #cr{limiter = Limiter, consumer_count = Count}) ->
+ true = Limiter =/= undefined andalso Count =/= 0, %% assertion
+ C#cr{limiter = rabbit_limiter:activate(Limiter)}
end));
handle_cast({flush, ChPid}, State) ->
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index cda5747ad4..005200f84c 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -81,8 +81,8 @@
-spec(start_link/11 ::
(channel_number(), pid(), pid(), pid(), string(),
rabbit_types:protocol(), rabbit_types:user(), rabbit_types:vhost(),
- rabbit_framing:amqp_table(),
- pid(), pid()) -> rabbit_types:ok_pid_or_error()).
+ rabbit_framing:amqp_table(), pid(), pid()) ->
+ rabbit_types:ok_pid_or_error()).
-spec(do/2 :: (pid(), rabbit_framing:amqp_method_record()) -> 'ok').
-spec(do/3 :: (pid(), rabbit_framing:amqp_method_record(),
rabbit_types:maybe(rabbit_types:content())) -> 'ok').
@@ -728,7 +728,9 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
QueueName, ConnPid,
fun (Q) ->
{rabbit_amqqueue:basic_consume(
- Q, NoAck, self(), Limiter,
+ Q, NoAck, self(),
+ rabbit_limiter:pid(Limiter),
+ rabbit_limiter:is_active(Limiter),
ActualConsumerTag, ExclusiveConsume,
ok_msg(NoWait, #'basic.consume_ok'{
consumer_tag = ActualConsumerTag})),
@@ -1326,7 +1328,7 @@ maybe_limit_queues(OldLimiter, NewLimiter, State) ->
case ((not rabbit_limiter:is_active(OldLimiter)) andalso
rabbit_limiter:is_active(NewLimiter)) of
true -> Queues = consumer_queues(State#ch.consumer_mapping),
- rabbit_amqqueue:limit_all(Queues, self(), NewLimiter);
+ rabbit_amqqueue:activate_limit_all(Queues, self());
false -> ok
end,
State.
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index ae6563286e..235c69c2a8 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -19,24 +19,27 @@
-behaviour(gen_server2).
-export([start_link/0]).
+%% channel API
-export([new/1, limit/3, unlimit/1, block/1, unblock/1,
- is_limited/1, is_blocked/1, is_active/1, get_limit/1, ack/2]).
--export([can_send/2, register/1, unregister/1]).
-
+ is_limited/1, is_blocked/1, is_active/1, get_limit/1, ack/2, pid/1]).
+%% queue API
+-export([client/1, activate/1, can_send/2, resume/1, forget/1, is_suspended/1]).
+%% callbacks
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
handle_info/2, prioritise_call/3]).
%%----------------------------------------------------------------------------
-record(lstate, {pid, limited, blocked}).
+-record(qstate, {pid, state}).
-ifdef(use_specs).
--export_type([lstate/0]).
-
--opaque(lstate() :: #lstate {pid :: pid(),
- limited :: boolean(),
- blocked :: boolean()}).
+-type(lstate() :: #lstate{pid :: pid(),
+ limited :: boolean(),
+ blocked :: boolean()}).
+-type(qstate() :: #qstate{pid :: pid(),
+ state :: 'dormant' | 'active' | 'suspended'}).
-spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()).
-spec(new/1 :: (pid()) -> lstate()).
@@ -51,10 +54,15 @@
-spec(is_active/1 :: (lstate()) -> boolean()).
-spec(get_limit/1 :: (lstate()) -> non_neg_integer()).
-spec(ack/2 :: (lstate(), non_neg_integer()) -> 'ok').
+-spec(pid/1 :: (lstate()) -> pid()).
--spec(can_send/2 :: (lstate(), boolean()) -> boolean()).
--spec(register/1 :: (lstate()) -> 'ok').
--spec(unregister/1 :: (lstate()) -> 'ok').
+-spec(client/1 :: (pid()) -> qstate()).
+-spec(activate/1 :: (qstate()) -> qstate()).
+-spec(can_send/2 :: (qstate(), boolean()) ->
+ {'continue' | 'suspend', qstate()}).
+-spec(resume/1 :: (qstate()) -> qstate()).
+-spec(forget/1 :: (qstate()) -> undefined).
+-spec(is_suspended/1 :: (qstate()) -> boolean()).
-endif.
@@ -108,29 +116,37 @@ get_limit(L) -> gen_server:call(L#lstate.pid, get_limit).
ack(#lstate{limited = false}, _AckCount) -> ok;
ack(L, AckCount) -> gen_server:cast(L#lstate.pid, {ack, AckCount}).
-%% Ask the limiter whether the queue can deliver a message without
-%% breaching a limit.
-can_send(L, AckRequired) ->
- case is_active(L) of
- false -> true;
- true -> rabbit_misc:with_exit_handler(
- fun () -> true end,
- fun () -> Msg = {can_send, self(), AckRequired},
- gen_server2:call(L#lstate.pid, Msg, infinity)
- end)
- end.
+pid(#lstate{pid = Pid}) -> Pid.
-register(L) ->
- case is_active(L) of
- false -> ok;
- true -> gen_server:cast(L#lstate.pid, {register, self()})
- end.
+client(Pid) -> #qstate{pid = Pid, state = dormant}.
-unregister(L) ->
- case is_active(L) of
- false -> ok;
- true -> gen_server:cast(L#lstate.pid, {unregister, self()})
- end.
+activate(L = #qstate{state = dormant}) ->
+ ok = gen_server:cast(L#qstate.pid, {register, self()}),
+ L#qstate{state = active};
+activate(L) -> L.
+
+%% Ask the limiter whether the queue can deliver a message without
+%% breaching a limit.
+can_send(L = #qstate{state = active}, AckRequired) ->
+ rabbit_misc:with_exit_handler(
+ fun () -> {continue, L} end,
+ fun () -> Msg = {can_send, self(), AckRequired},
+ case gen_server2:call(L#qstate.pid, Msg, infinity) of
+ true -> {continue, L};
+ false -> {suspend, L#qstate{state = suspended}}
+ end
+ end);
+can_send(L, _AckRequired) -> {continue, L}.
+
+resume(L) -> L#qstate{state = active}.
+
+forget(#qstate{state = dormant}) -> undefined;
+forget(L) ->
+ ok = gen_server:cast(L#qstate.pid, {unregister, self()}),
+ undefined.
+
+is_suspended(#qstate{state = suspended}) -> true;
+is_suspended(#qstate{}) -> false.
%%----------------------------------------------------------------------------
%% gen_server callbacks
@@ -220,10 +236,9 @@ remember_queue(QPid, State = #lim{queues = Queues}) ->
true -> State
end.
-forget_queue(QPid, State = #lim{ch_pid = ChPid, queues = Queues}) ->
+forget_queue(QPid, State = #lim{queues = Queues}) ->
case orddict:find(QPid, Queues) of
{ok, {MRef, _}} -> true = erlang:demonitor(MRef),
- ok = rabbit_amqqueue:unblock(QPid, ChPid),
State#lim{queues = orddict:erase(QPid, Queues)};
error -> State
end.
@@ -240,13 +255,13 @@ notify_queues(State = #lim{ch_pid = ChPid, queues = Queues}) ->
end, {[], Queues}, Queues),
case length(QList) of
0 -> ok;
- 1 -> ok = rabbit_amqqueue:unblock(hd(QList), ChPid); %% common case
+ 1 -> ok = rabbit_amqqueue:resume(hd(QList), ChPid); %% common case
L ->
%% We randomly vary the position of queues in the list,
%% thus ensuring that each queue has an equal chance of
%% being notified first.
{L1, L2} = lists:split(random:uniform(L), QList),
- [[ok = rabbit_amqqueue:unblock(Q, ChPid) || Q <- L3]
+ [[ok = rabbit_amqqueue:resume(Q, ChPid) || Q <- L3]
|| L3 <- [L2, L1]],
ok
end,
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 1188c5549a..31a56ac80c 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1100,20 +1100,14 @@ test_policy_validation() ->
test_server_status() ->
%% create a few things so there is some useful information to list
- Writer = spawn(fun test_writer/0),
- {ok, Ch} = rabbit_channel:start_link(
- 1, self(), Writer, self(), "", rabbit_framing_amqp_0_9_1,
- user(<<"user">>), <<"/">>, [], self(),
- rabbit_limiter:make_token(self())),
+ {_Writer, Limiter, Ch} = test_channel(),
[Q, Q2] = [Queue || Name <- [<<"foo">>, <<"bar">>],
{new, Queue = #amqqueue{}} <-
[rabbit_amqqueue:declare(
rabbit_misc:r(<<"/">>, queue, Name),
false, false, [], none)]],
-
ok = rabbit_amqqueue:basic_consume(
- Q, true, Ch, rabbit_limiter:make_token(),
- <<"ctag">>, true, undefined),
+ Q, true, Ch, Limiter, false, <<"ctag">>, true, undefined),
%% list queues
ok = info_action(list_queues, rabbit_amqqueue:info_keys(), true),
@@ -1191,8 +1185,6 @@ find_listener() ->
N =:= node()],
{H, P}.
-test_writer() -> test_writer(none).
-
test_writer(Pid) ->
receive
{'$gen_call', From, flush} -> gen_server:reply(From, ok),
@@ -1202,13 +1194,18 @@ test_writer(Pid) ->
shutdown -> ok
end.
-test_spawn() ->
+test_channel() ->
Me = self(),
Writer = spawn(fun () -> test_writer(Me) end),
+ {ok, Limiter} = rabbit_limiter:start_link(),
{ok, Ch} = rabbit_channel:start_link(
1, Me, Writer, Me, "", rabbit_framing_amqp_0_9_1,
user(<<"guest">>), <<"/">>, [], Me,
rabbit_limiter:make_token(self())),
+ {Writer, Limiter, Ch}.
+
+test_spawn() ->
+ {Writer, _Limiter, Ch} = test_channel(),
ok = rabbit_channel:do(Ch, #'channel.open'{}),
receive #'channel.open_ok'{} -> ok
after ?TIMEOUT -> throw(failed_to_receive_channel_open_ok)