summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2011-02-01 16:28:36 +0000
committerSimon MacMullen <simon@rabbitmq.com>2011-02-01 16:28:36 +0000
commit61c19a4a9abecc79916caf25847dcace23f66f49 (patch)
treeb3599721fa37f70cb2d27da05a673b5cfcfcd497 /src
parent594d856c2c403e69ec921e769dfd3f3c01e8da8f (diff)
downloadrabbitmq-server-git-61c19a4a9abecc79916caf25847dcace23f66f49.tar.gz
Beginning of channel.credit support. Tests pass, except for drain=true.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue_process.erl3
-rw-r--r--src/rabbit_channel.erl54
-rw-r--r--src/rabbit_limiter.erl46
3 files changed, 60 insertions, 43 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 3418c663f4..b74b9034b9 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -1075,7 +1075,8 @@ handle_cast({limit, ChPid, LimiterPid}, State) ->
true ->
ok
end,
- NewLimited = Limited andalso LimiterPid =/= undefined,
+ NewLimited = Limited andalso LimiterPid =/= undefined
+ andalso rabbit_limiter:is_blocked(LimiterPid),
C#cr{limiter_pid = LimiterPid, is_limit_active = NewLimited}
end));
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index a82e5eff3e..eb634cca13 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -462,6 +462,7 @@ check_name(Kind, NameBin = <<"amq.", _/binary>>) ->
check_name(_Kind, NameBin) ->
NameBin.
+%% TODO port this
queue_blocked(QPid, State = #ch{blocking = Blocking}) ->
case dict:find(QPid, Blocking) of
error -> State;
@@ -1003,31 +1004,46 @@ handle_method(#'confirm.select'{nowait = NoWait}, _, State) ->
return_ok(State#ch{confirm_enabled = true},
NoWait, #'confirm.select_ok'{});
-handle_method(#'channel.flow'{active = true}, _,
- State = #ch{limiter_pid = LimiterPid}) ->
- LimiterPid1 = case rabbit_limiter:unblock(LimiterPid) of
- ok -> LimiterPid;
- stopped -> unlimit_queues(State)
- end,
- {reply, #'channel.flow_ok'{active = true},
- State#ch{limiter_pid = LimiterPid1}};
+handle_method(#'channel.flow'{active = true}, Content, State) ->
+ {noreply, State1 = #ch{writer_pid = WriterPid}} =
+ handle_method(#'channel.credit'{credit = -1, drain = true},
+ Content, State),
+ ok = rabbit_writer:send_command(WriterPid,
+ #'channel.flow_ok'{active = true}),
+ {noreply, State1};
-handle_method(#'channel.flow'{active = false}, _,
- State = #ch{limiter_pid = LimiterPid,
+handle_method(#'channel.flow'{active = false}, Content, State) ->
+ {noreply, State1 = #ch{writer_pid = WriterPid}} =
+ handle_method(#'channel.credit'{credit = 0, drain = true},
+ Content, State),
+ ok = rabbit_writer:send_command(WriterPid,
+ #'channel.flow_ok'{active = false}),
+ {noreply, State1};
+
+handle_method(#'channel.credit'{credit = Credit, drain = Drain}, _,
+ State = #ch{limiter_pid = LimiterPid,
consumer_mapping = Consumers}) ->
LimiterPid1 = case LimiterPid of
undefined -> start_limiter(State);
Other -> Other
end,
- State1 = State#ch{limiter_pid = LimiterPid1},
- ok = rabbit_limiter:block(LimiterPid1),
- case consumer_queues(Consumers) of
- [] -> {reply, #'channel.flow_ok'{active = false}, State1};
- QPids -> Queues = [{QPid, erlang:monitor(process, QPid)} ||
- QPid <- QPids],
- ok = rabbit_amqqueue:flush_all(QPids, self()),
- {noreply, State1#ch{blocking = dict:from_list(Queues)}}
- end;
+ LimiterPid2 =
+ case rabbit_limiter:set_credit(LimiterPid1, Credit, Drain) of
+ ok -> limit_queues(LimiterPid1, State),
+ LimiterPid1;
+ stopped -> unlimit_queues(State)
+ end,
+ State1 = State#ch{limiter_pid = LimiterPid2},
+ {noreply, State1};
+
+ %% TODO port this bit
+ %% case consumer_queues(Consumers) of
+ %% [] -> {reply, #'channel.flow_ok'{active = false}, State1};
+ %% QPids -> Queues = [{QPid, erlang:monitor(process, QPid)} ||
+ %% QPid <- QPids],
+ %% ok = rabbit_amqqueue:flush_all(QPids, self()),
+ %% {noreply, State1#ch{blocking = dict:from_list(Queues)}}
+ %% end;
handle_method(_MethodRecord, _Content, _State) ->
rabbit_misc:protocol_error(
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index 86ea7282d9..bf9cf583e4 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -22,7 +22,7 @@
handle_info/2, prioritise_call/3]).
-export([start_link/2]).
-export([limit/2, can_send/3, ack/2, register/2, unregister/2]).
--export([get_limit/1, block/1, unblock/1, is_blocked/1]).
+-export([get_limit/1, set_credit/3, is_blocked/1]).
%%----------------------------------------------------------------------------
@@ -38,8 +38,8 @@
-spec(register/2 :: (maybe_pid(), pid()) -> 'ok').
-spec(unregister/2 :: (maybe_pid(), pid()) -> 'ok').
-spec(get_limit/1 :: (maybe_pid()) -> non_neg_integer()).
--spec(block/1 :: (maybe_pid()) -> 'ok').
--spec(unblock/1 :: (maybe_pid()) -> 'ok' | 'stopped').
+%% -spec(block/1 :: (maybe_pid()) -> 'ok').
+%% -spec(unblock/1 :: (maybe_pid()) -> 'ok' | 'stopped').
-spec(is_blocked/1 :: (maybe_pid()) -> boolean()).
-endif.
@@ -48,7 +48,8 @@
-record(lim, {prefetch_count = 0,
ch_pid,
- blocked = false,
+ credit = unlimited,
+ drain = false,
queues = dict:new(), % QPid -> {MonitorRef, Notify}
volume = 0}).
%% 'Notify' is a boolean that indicates whether a queue should be
@@ -95,15 +96,12 @@ get_limit(Pid) ->
fun () -> 0 end,
fun () -> gen_server2:call(Pid, get_limit, infinity) end).
-block(undefined) ->
+set_credit(undefined, _, _) ->
ok;
-block(LimiterPid) ->
- gen_server2:call(LimiterPid, block, infinity).
-
-unblock(undefined) ->
- ok;
-unblock(LimiterPid) ->
- gen_server2:call(LimiterPid, unblock, infinity).
+set_credit(LimiterPid, -1, Drain) ->
+ gen_server2:call(LimiterPid, {set_credit, unlimited, Drain}, infinity);
+set_credit(LimiterPid, Credit, Drain) ->
+ gen_server2:call(LimiterPid, {set_credit, Credit, Drain}, infinity).
is_blocked(undefined) ->
false;
@@ -121,14 +119,18 @@ prioritise_call(get_limit, _From, _State) -> 9;
prioritise_call(_Msg, _From, _State) -> 0.
handle_call({can_send, _QPid, _AckRequired}, _From,
- State = #lim{blocked = true}) ->
+ State = #lim{credit = 0}) ->
{reply, false, State};
handle_call({can_send, QPid, AckRequired}, _From,
- State = #lim{volume = Volume}) ->
+ State = #lim{volume = Volume, credit = Credit}) ->
case limit_reached(State) of
true -> {reply, false, limit_queue(QPid, State)};
false -> {reply, true, State#lim{volume = if AckRequired -> Volume + 1;
true -> Volume
+ end,
+ credit = case Credit of
+ unlimited -> unlimited;
+ _ -> Credit - 1
end}}
end;
@@ -141,11 +143,8 @@ handle_call({limit, PrefetchCount}, _From, State) ->
{stop, State1} -> {stop, normal, stopped, State1}
end;
-handle_call(block, _From, State) ->
- {reply, ok, State#lim{blocked = true}};
-
-handle_call(unblock, _From, State) ->
- case maybe_notify(State, State#lim{blocked = false}) of
+handle_call({set_credit, Credit, Drain}, _From, State) ->
+ case maybe_notify(State, State#lim{credit = Credit, drain = Drain}) of
{cont, State1} -> {reply, ok, State1};
{stop, State1} -> {stop, normal, stopped, State1}
end;
@@ -183,9 +182,9 @@ maybe_notify(OldState, NewState) ->
case (limit_reached(OldState) orelse blocked(OldState)) andalso
not (limit_reached(NewState) orelse blocked(NewState)) of
true -> NewState1 = notify_queues(NewState),
- {case NewState1#lim.prefetch_count of
- 0 -> stop;
- _ -> cont
+ {case {NewState1#lim.prefetch_count, NewState1#lim.credit} of
+ {0, unlimited} -> stop;
+ _ -> cont
end, NewState1};
false -> {cont, NewState}
end.
@@ -193,7 +192,8 @@ maybe_notify(OldState, NewState) ->
limit_reached(#lim{prefetch_count = Limit, volume = Volume}) ->
Limit =/= 0 andalso Volume >= Limit.
-blocked(#lim{blocked = Blocked}) -> Blocked.
+blocked(#lim{credit = 0}) -> true;
+blocked(_) -> false.
remember_queue(QPid, State = #lim{queues = Queues}) ->
case dict:is_key(QPid, Queues) of