summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2010-03-02 18:45:17 +0000
committerMatthew Sackman <matthew@lshift.net>2010-03-02 18:45:17 +0000
commit54e3ef0d54a2f93e79213c6fb838aa4e08083c0b (patch)
tree99753fc5a2ca2489e4e3e9ff299ad1baaef50071 /src
parent649bf2f2f04f1479778c1407a5832843953741a4 (diff)
downloadrabbitmq-server-git-54e3ef0d54a2f93e79213c6fb838aa4e08083c0b.tar.gz
Incomplete implementation
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_channel.erl48
-rw-r--r--src/rabbit_limiter.erl35
2 files changed, 65 insertions, 18 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 9b6c87d8cb..38a6a8447f 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -542,23 +542,15 @@ handle_method(#'basic.qos'{prefetch_size = Size}, _, _State) when Size /= 0 ->
"prefetch_size!=0 (~w)", [Size]);
handle_method(#'basic.qos'{prefetch_count = PrefetchCount},
- _, State = #ch{ limiter_pid = LimiterPid,
- unacked_message_q = UAMQ }) ->
+ _, State = #ch{limiter_pid = LimiterPid}) ->
LimiterPid1 = case {LimiterPid, PrefetchCount} of
- {undefined, 0} ->
- undefined;
- {undefined, _} ->
- LPid = rabbit_limiter:start_link(self(),
- queue:len(UAMQ)),
- ok = limit_queues(LPid, State),
- LPid;
- {_, _} ->
- LimiterPid
+ {undefined, 0} -> undefined;
+ {undefined, _} -> start_limiter(State);
+ {_, _} -> LimiterPid
end,
LimiterPid2 = case rabbit_limiter:limit(LimiterPid1, PrefetchCount) of
ok -> LimiterPid1;
- stopped -> ok = limit_queues(undefined, State),
- undefined
+ stopped -> unlimit_queues(State)
end,
{reply, #'basic.qos_ok'{}, State#ch{limiter_pid = LimiterPid2}};
@@ -793,9 +785,24 @@ handle_method(#'tx.rollback'{}, _, #ch{transaction_id = none}) ->
handle_method(#'tx.rollback'{}, _, State) ->
{reply, #'tx.rollback_ok'{}, internal_rollback(State)};
-handle_method(#'channel.flow'{active = _}, _, State) ->
- %% FIXME: implement
- {reply, #'channel.flow_ok'{active = true}, State};
+handle_method(#'channel.flow'{active = true}, _,
+ State = #ch{limiter_pid = LimiterPid}) ->
+ LimiterPid1 = case rabbit_limiter:unblock(LimiterPid) of
+ ok -> LimiterPid;
+ stopped -> unlimit_queues(State)
+ end,
+ {reply, #'channel.flow_ok'{active = true},
+ State#ch{limiter_pid = LimiterPid1}};
+
+handle_method(#'channel.flow'{active = false}, _,
+ State = #ch{limiter_pid = LimiterPid}) ->
+ LimiterPid1 = case LimiterPid =:= undefined of
+ true -> start_limiter(State);
+ false -> LimiterPid
+ end,
+ ok = rabbit_limiter:block(LimiterPid1),
+ {reply, #'channel.flow_ok'{active = false},
+ State#ch{limiter_pid = LimiterPid1}};
handle_method(#'channel.flow_ok'{active = _}, _, State) ->
%% TODO: We may want to correlate this to channel.flow messages we
@@ -942,9 +949,18 @@ fold_per_queue(F, Acc0, UAQ) ->
dict:fold(fun (QPid, MsgIds, Acc) -> F(QPid, MsgIds, Acc) end,
Acc0, D).
+start_limiter(State = #ch{unacked_message_q = UAMQ}) ->
+ LPid = rabbit_limiter:start_link(self(), queue:len(UAMQ)),
+ ok = limit_queues(LPid, State),
+ LPid.
+
notify_queues(#ch{consumer_mapping = Consumers}) ->
rabbit_amqqueue:notify_down_all(consumer_queues(Consumers), self()).
+unlimit_queues(State) ->
+ ok = limit_queues(undefined, State),
+ undefined.
+
limit_queues(LPid, #ch{consumer_mapping = Consumers}) ->
rabbit_amqqueue:limit_all(consumer_queues(Consumers), self(), LPid).
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index d998499d76..43f31511c9 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -37,7 +37,7 @@
handle_info/2]).
-export([start_link/2, shutdown/1]).
-export([limit/2, can_send/3, ack/2, register/2, unregister/2]).
--export([get_limit/1]).
+-export([get_limit/1, block/1, unblock/1]).
%%----------------------------------------------------------------------------
@@ -53,6 +53,8 @@
-spec(register/2 :: (maybe_pid(), pid()) -> 'ok').
-spec(unregister/2 :: (maybe_pid(), pid()) -> 'ok').
-spec(get_limit/1 :: (maybe_pid()) -> non_neg_integer()).
+-spec(block/1 :: (maybe_pid()) -> 'ok').
+-spec(unblock/1 :: (maybe_pid()) -> 'ok' | 'stopped').
-endif.
@@ -60,6 +62,7 @@
-record(lim, {prefetch_count = 0,
ch_pid,
+ blocked = false,
queues = dict:new(), % QPid -> {MonitorRef, Notify}
volume = 0}).
%% 'Notify' is a boolean that indicates whether a queue should be
@@ -112,6 +115,16 @@ get_limit(Pid) ->
fun () -> 0 end,
fun () -> gen_server2:pcall(Pid, 9, get_limit, infinity) end).
+block(undefined) ->
+ ok;
+block(LimiterPid) ->
+ gen_server2:call(LimiterPid, block, infinity).
+
+unblock(undefined) ->
+ ok;
+unblock(LimiterPid) ->
+ gen_server2:call(LimiterPid, unblock, infinity).
+
%%----------------------------------------------------------------------------
%% gen_server callbacks
%%----------------------------------------------------------------------------
@@ -119,6 +132,9 @@ get_limit(Pid) ->
init([ChPid, UnackedMsgCount]) ->
{ok, #lim{ch_pid = ChPid, volume = UnackedMsgCount}}.
+handle_call({can_send, _QPid, _AckRequired}, _From,
+ State = #lim{blocked = true}) ->
+ {reply, false, State};
handle_call({can_send, QPid, AckRequired}, _From,
State = #lim{volume = Volume}) ->
case limit_reached(State) of
@@ -131,11 +147,23 @@ handle_call({can_send, QPid, AckRequired}, _From,
handle_call(get_limit, _From, State = #lim{prefetch_count = PrefetchCount}) ->
{reply, PrefetchCount, State};
+handle_call({limit, PrefetchCount}, _From, State = #lim{blocked = true}) ->
+ {reply, ok, State#lim{prefetch_count = PrefetchCount}};
handle_call({limit, PrefetchCount}, _From, State) ->
State1 = maybe_notify(State, State#lim{prefetch_count = PrefetchCount}),
case PrefetchCount == 0 of
true -> {stop, normal, stopped, State1};
false -> {reply, ok, State1}
+ end;
+
+handle_call(block, _From, State) ->
+ {reply, ok, State#lim{blocked = true}};
+
+handle_call(unblock, _From, State = #lim{prefetch_count = PrefetchCount}) ->
+ State1 = maybe_notify(State, State#lim{blocked = false}),
+ case PrefetchCount == 0 of
+ true -> {stop, normal, stopped, State1};
+ false -> {reply, ok, State1}
end.
handle_cast(shutdown, State) ->
@@ -167,7 +195,8 @@ code_change(_, State, _) ->
%%----------------------------------------------------------------------------
maybe_notify(OldState, NewState) ->
- case limit_reached(OldState) andalso not(limit_reached(NewState)) of
+ case (limit_reached(OldState) andalso not limit_reached(NewState)) orelse
+ (is_blocked(OldState) andalso not is_blocked(NewState)) of
true -> notify_queues(NewState);
false -> NewState
end.
@@ -175,6 +204,8 @@ maybe_notify(OldState, NewState) ->
limit_reached(#lim{prefetch_count = Limit, volume = Volume}) ->
Limit =/= 0 andalso Volume >= Limit.
+is_blocked(#lim{blocked = Blocked}) -> Blocked.
+
remember_queue(QPid, State = #lim{queues = Queues}) ->
case dict:is_key(QPid, Queues) of
false -> MRef = erlang:monitor(process, QPid),