summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2013-03-20 15:05:58 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2013-03-20 15:05:58 +0000
commitbdb5239b384f266424c6f3e1dbad4f28f49fc801 (patch)
treeb68a76385eb71630381a4e318268447af9e141ae /src
parentf3bfe1e4ab43c8317317be4b2159b81704505247 (diff)
downloadrabbitmq-server-git-bdb5239b384f266424c6f3e1dbad4f28f49fc801.tar.gz
always hold a valid limiter in queue's channel records
This is less fiddly, but does mean we have to pass the limiter pid in basic_get.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue.erl8
-rw-r--r--src/rabbit_amqqueue_process.erl35
-rw-r--r--src/rabbit_channel.erl5
-rw-r--r--src/rabbit_limiter.erl34
-rw-r--r--src/rabbit_tests.erl7
5 files changed, 48 insertions, 41 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index be8ab38513..3f0a7f9c41 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -26,7 +26,7 @@
-export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]).
-export([force_event_refresh/0, wake_up/1]).
-export([consumers/1, consumers_all/1, consumer_info_keys/0]).
--export([basic_get/3, basic_consume/8, basic_cancel/4]).
+-export([basic_get/4, basic_consume/8, basic_cancel/4]).
-export([notify_sent/2, notify_sent_queue_down/1, resume/2, flush_all/2]).
-export([notify_down_all/2, activate_limit_all/2]).
-export([on_node_down/1]).
@@ -145,7 +145,7 @@
-spec(reject/4 :: (pid(), [msg_id()], boolean(), pid()) -> 'ok').
-spec(notify_down_all/2 :: (qpids(), pid()) -> ok_or_errors()).
-spec(activate_limit_all/2 :: (qpids(), pid()) -> ok_or_errors()).
--spec(basic_get/3 :: (rabbit_types:amqqueue(), pid(), boolean()) ->
+-spec(basic_get/4 :: (rabbit_types:amqqueue(), pid(), boolean(), pid()) ->
{'ok', non_neg_integer(), qmsg()} | 'empty').
-spec(basic_consume/8 ::
(rabbit_types:amqqueue(), boolean(), pid(), pid(), boolean(),
@@ -540,8 +540,8 @@ notify_down_all(QPids, ChPid) ->
activate_limit_all(QPids, ChPid) ->
delegate:cast(QPids, {activate_limit, ChPid}).
-basic_get(#amqqueue{pid = QPid}, ChPid, NoAck) ->
- delegate:call(QPid, {basic_get, ChPid, NoAck}).
+basic_get(#amqqueue{pid = QPid}, ChPid, NoAck, LimiterPid) ->
+ delegate:call(QPid, {basic_get, ChPid, NoAck, LimiterPid}).
basic_consume(#amqqueue{pid = QPid}, NoAck, ChPid, LimiterPid, LimiterActive,
ConsumerTag, ExclusiveConsume, OkMsg) ->
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index d9264736d0..6fc79dca81 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -361,16 +361,17 @@ lookup_ch(ChPid) ->
C -> C
end.
-ch_record(ChPid) ->
+ch_record(ChPid, LimiterPid) ->
Key = {ch, ChPid},
case get(Key) of
undefined -> MonitorRef = erlang:monitor(process, ChPid),
+ Limiter = rabbit_limiter:client(LimiterPid),
C = #cr{ch_pid = ChPid,
monitor_ref = MonitorRef,
acktags = queue:new(),
consumer_count = 0,
blocked_consumers = queue:new(),
- limiter = undefined,
+ limiter = Limiter,
unsent_message_count = 0},
put(Key, C),
C;
@@ -401,9 +402,7 @@ block_consumer(C = #cr{blocked_consumers = Blocked}, QEntry) ->
update_ch_record(C#cr{blocked_consumers = queue:in(QEntry, Blocked)}).
is_ch_blocked(#cr{unsent_message_count = Count, limiter = Limiter}) ->
- Count >= ?UNSENT_MESSAGE_LIMIT
- orelse (Limiter =/= undefined andalso
- rabbit_limiter:is_suspended(Limiter)).
+ Count >= ?UNSENT_MESSAGE_LIMIT orelse rabbit_limiter:is_suspended(Limiter).
deliver_msgs_to_consumers(_DeliverFun, true, State) ->
{true, State};
@@ -1090,7 +1089,7 @@ handle_call({notify_down, ChPid}, From, State) ->
{stop, State1} -> stop(From, ok, State1)
end;
-handle_call({basic_get, ChPid, NoAck}, _From,
+handle_call({basic_get, ChPid, NoAck, LimiterPid}, _From,
State = #q{q = #amqqueue{name = QName}}) ->
AckRequired = not NoAck,
State1 = ensure_expiry_timer(State),
@@ -1100,7 +1099,8 @@ handle_call({basic_get, ChPid, NoAck}, _From,
{{Message, IsDelivered, AckTag}, State2} ->
State3 = #q{backing_queue = BQ, backing_queue_state = BQS} =
case AckRequired of
- true -> C = #cr{acktags = ChAckTags} = ch_record(ChPid),
+ true -> C = #cr{acktags = ChAckTags} =
+ ch_record(ChPid, LimiterPid),
ChAckTags1 = queue:in(AckTag, ChAckTags),
update_ch_record(C#cr{acktags = ChAckTags1}),
State2;
@@ -1118,11 +1118,7 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive,
reply({error, exclusive_consume_unavailable}, State);
ok ->
C = #cr{consumer_count = Count,
- limiter = Limiter0} = ch_record(ChPid),
- Limiter = case Limiter0 of
- undefined -> rabbit_limiter:client(LimiterPid);
- _ -> Limiter0
- end,
+ limiter = Limiter} = ch_record(ChPid, LimiterPid),
Limiter1 = case LimiterActive of
true -> rabbit_limiter:activate(Limiter);
false -> Limiter
@@ -1155,7 +1151,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, From,
emit_consumer_deleted(ChPid, ConsumerTag, qname(State)),
Blocked1 = remove_consumer(ChPid, ConsumerTag, Blocked),
Limiter1 = case Count of
- 1 -> rabbit_limiter:forget(Limiter);
+ 1 -> rabbit_limiter:deactivate(Limiter);
_ -> Limiter
end,
update_ch_record(C#cr{consumer_count = Count - 1,
@@ -1291,8 +1287,7 @@ handle_cast(delete_immediately, State) ->
handle_cast({resume, ChPid}, State) ->
noreply(
possibly_unblock(State, ChPid,
- fun (C = #cr{limiter = undefined}) -> C;
- (C = #cr{limiter = Limiter}) ->
+ fun (C = #cr{limiter = Limiter}) ->
C#cr{limiter = rabbit_limiter:resume(Limiter)}
end));
@@ -1305,12 +1300,10 @@ handle_cast({notify_sent, ChPid, Credit}, State) ->
handle_cast({activate_limit, ChPid}, State) ->
noreply(
- possibly_unblock(
- State, ChPid,
- fun (C = #cr{limiter = Limiter, consumer_count = Count}) ->
- true = Limiter =/= undefined andalso Count =/= 0, %% assertion
- C#cr{limiter = rabbit_limiter:activate(Limiter)}
- end));
+ possibly_unblock(State, ChPid,
+ fun (C = #cr{limiter = Limiter}) ->
+ C#cr{limiter = rabbit_limiter:activate(Limiter)}
+ end));
handle_cast({flush, ChPid}, State) ->
ok = rabbit_channel:flushed(ChPid, self()),
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 17bf5c8363..67cabcfbfb 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -676,12 +676,15 @@ handle_method(#'basic.get'{queue = QueueNameBin,
no_ack = NoAck},
_, State = #ch{writer_pid = WriterPid,
conn_pid = ConnPid,
+ limiter = Limiter,
next_tag = DeliveryTag}) ->
QueueName = expand_queue_name_shortcut(QueueNameBin, State),
check_read_permitted(QueueName, State),
case rabbit_amqqueue:with_exclusive_access_or_die(
QueueName, ConnPid,
- fun (Q) -> rabbit_amqqueue:basic_get(Q, self(), NoAck) end) of
+ fun (Q) -> rabbit_amqqueue:basic_get(
+ Q, self(), NoAck, rabbit_limiter:pid(Limiter))
+ end) of
{ok, MessageCount,
Msg = {QName, QPid, _MsgId, Redelivered,
#basic_message{exchange_name = ExchangeName,
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index 4059fdb089..b914306bd4 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -21,12 +21,17 @@
%%
%% Each channel has an associated limiter process, created with
%% start_link/1, which it passes to queues on consumer creation with
-%% rabbit_amqqueue:basic_consume/8. This process holds state that is,
-%% in effect, shared between the channel and all queues from which the
-%% channel is consuming. Essentially all these queues are competing
-%% for access to a single, limited resource - the ability to deliver
-%% messages via the channel - and it is the job of the limiter process
-%% to mediate that access.
+%% rabbit_amqqueue:basic_consume/8, and rabbit_amqqueue:basic_get/4.
+%% The latter isn't strictly necessary, since basic.get is not
+%% subject to limiting, but it means that whenever a queue knows about
+%% a channel, it also knows about its limiter, which is less fiddly.
+%%
+%% Th limiter process holds state that is, in effect, shared between
+%% the channel and all queues from which the channel is
+%% consuming. Essentially all these queues are competing for access to
+%% a single, limited resource - the ability to deliver messages via
+%% the channel - and it is the job of the limiter process to mediate
+%% that access.
%%
%% The limiter process is separate from the channel process for two
%% reasons: separation of concerns, and efficiency. Channels can get
@@ -90,8 +95,10 @@
%% described in (5).
%%
%% 9. When a queues has no more consumers associated with a particular
-%% channel, it unregisters with the limiter and forgets about it -
-%% all via forget/1.
+%% channel, it deactivates use of the limiter with deactivate/1,
+%% which alters the local state such that no further interactions
+%% with the limiter process take place until a subsequent
+%% activate/1.
-module(rabbit_limiter).
@@ -102,7 +109,8 @@
-export([new/1, limit/3, unlimit/1, block/1, unblock/1,
is_limited/1, is_blocked/1, is_active/1, get_limit/1, ack/2, pid/1]).
%% queue API
--export([client/1, activate/1, can_send/2, resume/1, forget/1, is_suspended/1]).
+-export([client/1, activate/1, can_send/2, resume/1, deactivate/1,
+ is_suspended/1]).
%% callbacks
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
handle_info/2, prioritise_call/3]).
@@ -140,7 +148,7 @@
-spec(can_send/2 :: (qstate(), boolean()) ->
{'continue' | 'suspend', qstate()}).
-spec(resume/1 :: (qstate()) -> qstate()).
--spec(forget/1 :: (qstate()) -> undefined).
+-spec(deactivate/1 :: (qstate()) -> qstate()).
-spec(is_suspended/1 :: (qstate()) -> boolean()).
-endif.
@@ -219,10 +227,10 @@ can_send(L, _AckRequired) -> {continue, L}.
resume(L) -> L#qstate{state = active}.
-forget(#qstate{state = dormant}) -> undefined;
-forget(L) ->
+deactivate(L = #qstate{state = dormant}) -> L;
+deactivate(L) ->
ok = gen_server:cast(L#qstate.pid, {unregister, self()}),
- undefined.
+ L#qstate{state = dormant}.
is_suspended(#qstate{state = suspended}) -> true;
is_suspended(#qstate{}) -> false.
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index b67be54404..d1ae38bef8 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -2718,12 +2718,13 @@ test_queue_recover() ->
end,
rabbit_amqqueue:stop(),
rabbit_amqqueue:start(rabbit_amqqueue:recover()),
+ {ok, Limiter} = rabbit_limiter:start_link(),
rabbit_amqqueue:with_or_die(
QName,
fun (Q1 = #amqqueue { pid = QPid1 }) ->
CountMinusOne = Count - 1,
{ok, CountMinusOne, {QName, QPid1, _AckTag, true, _Msg}} =
- rabbit_amqqueue:basic_get(Q1, self(), false),
+ rabbit_amqqueue:basic_get(Q1, self(), false, Limiter),
exit(QPid1, shutdown),
VQ1 = variable_queue_init(Q, true),
{{_Msg1, true, _AckTag1}, VQ2} =
@@ -2744,9 +2745,11 @@ test_variable_queue_delete_msg_store_files_callback() ->
rabbit_amqqueue:set_ram_duration_target(QPid, 0),
+ {ok, Limiter} = rabbit_limiter:start_link(),
+
CountMinusOne = Count - 1,
{ok, CountMinusOne, {QName, QPid, _AckTag, false, _Msg}} =
- rabbit_amqqueue:basic_get(Q, self(), true),
+ rabbit_amqqueue:basic_get(Q, self(), true, Limiter),
{ok, CountMinusOne} = rabbit_amqqueue:purge(Q),
%% give the queue a second to receive the close_fds callback msg