summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@lshift.net>2010-02-01 22:09:43 +0000
committerMatthias Radestock <matthias@lshift.net>2010-02-01 22:09:43 +0000
commiteafef585eb6c367d99dafcccb605e517e7c60839 (patch)
tree03aaae27a2895e47cba29622ee2b159451608a77 /src
parente307ae7484eb6615358a47a2d2bd5f737c280eb7 (diff)
downloadrabbitmq-server-git-eafef585eb6c367d99dafcccb605e517e7c60839.tar.gz
add prefetch_count channel info item
This forces us to make the limiter a gen_server*2* at last, since we want to issue info-like calls at higher priority, as elsewhere.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_channel.erl5
-rw-r--r--src/rabbit_limiter.erl18
2 files changed, 19 insertions, 4 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 9899964709..2035d5611c 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -58,7 +58,8 @@
vhost,
transactional,
consumer_count,
- messages_unacknowledged]).
+ messages_unacknowledged,
+ prefetch_count]).
%%----------------------------------------------------------------------------
@@ -1010,5 +1011,7 @@ i(consumer_count, #ch{consumer_mapping = ConsumerMapping}) ->
dict:size(ConsumerMapping);
i(messages_unacknowledged, #ch{unacked_message_q = UAMQ}) ->
queue:len(UAMQ);
+i(prefetch_count, #ch{limiter_pid = LimiterPid}) ->
+ rabbit_limiter:get_limit(LimiterPid);
i(Item, _) ->
throw({bad_argument, Item}).
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index 087a9f64d9..6bd803a27b 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -31,12 +31,13 @@
-module(rabbit_limiter).
--behaviour(gen_server).
+-behaviour(gen_server2).
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
handle_info/2]).
-export([start_link/1, shutdown/1]).
-export([limit/2, can_send/3, ack/2, register/2, unregister/2]).
+-export([get_limit/1]).
%%----------------------------------------------------------------------------
@@ -51,6 +52,7 @@
-spec(ack/2 :: (maybe_pid(), non_neg_integer()) -> 'ok').
-spec(register/2 :: (maybe_pid(), pid()) -> 'ok').
-spec(unregister/2 :: (maybe_pid(), pid()) -> 'ok').
+-spec(get_limit/1 :: (maybe_pid()) -> non_neg_integer()).
-endif.
@@ -69,7 +71,7 @@
%%----------------------------------------------------------------------------
start_link(ChPid) ->
- {ok, Pid} = gen_server:start_link(?MODULE, [ChPid], []),
+ {ok, Pid} = gen_server2:start_link(?MODULE, [ChPid], []),
Pid.
shutdown(undefined) ->
@@ -104,6 +106,13 @@ register(LimiterPid, QPid) -> gen_server2:cast(LimiterPid, {register, QPid}).
unregister(undefined, _QPid) -> ok;
unregister(LimiterPid, QPid) -> gen_server2:cast(LimiterPid, {unregister, QPid}).
+get_limit(undefined) ->
+ 0;
+get_limit(Pid) ->
+ rabbit_misc:with_exit_handler(
+ fun () -> 0 end,
+ fun () -> gen_server2:pcall(Pid, 9, get_limit, infinity) end).
+
%%----------------------------------------------------------------------------
%% gen_server callbacks
%%----------------------------------------------------------------------------
@@ -118,7 +127,10 @@ handle_call({can_send, QPid, AckRequired}, _From,
false -> {reply, true, State#lim{volume = if AckRequired -> Volume + 1;
true -> Volume
end}}
- end.
+ end;
+
+handle_call(get_limit, _From, State = #lim{prefetch_count = PrefetchCount}) ->
+ {reply, PrefetchCount, State}.
handle_cast(shutdown, State) ->
{stop, normal, State};