diff options
| author | Matthew Sackman <matthew@lshift.net> | 2010-03-03 18:21:17 +0000 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2010-03-03 18:21:17 +0000 |
| commit | b84db20174d443f9ebdd6848a3e0899a5b678a1d (patch) | |
| tree | 635b2f276b52607e3c8b15bc4a56fed535e02a36 | |
| parent | 75fa8b11634f6ddcb43df825cad68b2225680e3f (diff) | |
| parent | f53aee7bf6cc84caf3521c3945b87666c6574111 (diff) | |
| download | rabbitmq-server-git-b84db20174d443f9ebdd6848a3e0899a5b678a1d.tar.gz | |
Merging bug 16653 into default (now permits client side channel.flow to be used with shovel)
| -rw-r--r-- | packaging/RPMS/Fedora/rabbitmq-server.spec | 8 | ||||
| -rw-r--r-- | src/rabbit_amqqueue.erl | 15 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 103 | ||||
| -rw-r--r-- | src/rabbit_limiter.erl | 77 |
5 files changed, 157 insertions, 52 deletions
diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec index 54d0c8f38b..fa9bb2ee23 100644 --- a/packaging/RPMS/Fedora/rabbitmq-server.spec +++ b/packaging/RPMS/Fedora/rabbitmq-server.spec @@ -65,12 +65,12 @@ mkdir -p %{buildroot}%{_sysconfdir}/rabbitmq rm %{_maindir}/LICENSE %{_maindir}/LICENSE-MPL-RabbitMQ %{_maindir}/INSTALL #Build the list of files -rm -f %{_builddir}/filelist.%{name}.rpm -echo '%defattr(-,root,root, -)' >> %{_builddir}/filelist.%{name}.rpm +rm -f %{_builddir}/%{name}.files +echo '%defattr(-,root,root, -)' >> %{_builddir}/%{name}.files (cd %{buildroot}; \ find . -type f ! -regex '\.%{_sysconfdir}.*' \ ! -regex '\.\(%{_rabbit_erllibdir}\|%{_rabbit_libdir}\).*' \ - | sed -e 's/^\.//' >> %{_builddir}/filelist.%{name}.rpm) + | sed -e 's/^\.//' >> %{_builddir}/%{name}.files) %pre @@ -103,7 +103,7 @@ if [ $1 = 0 ]; then # Leave rabbitmq user and group fi -%files -f ../filelist.%{name}.rpm +%files -f ../%{name}.files %defattr(-,root,root,-) %attr(0750, rabbitmq, rabbitmq) %dir %{_localstatedir}/lib/rabbitmq %attr(0750, rabbitmq, rabbitmq) %dir %{_localstatedir}/log/rabbitmq diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 3f25d72e4f..31787466bb 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -40,7 +40,7 @@ -export([consumers/1, consumers_all/1]). -export([claim_queue/2]). -export([basic_get/3, basic_consume/8, basic_cancel/4]). --export([notify_sent/2, unblock/2]). +-export([notify_sent/2, unblock/2, flush_all/2]). -export([commit_all/2, rollback_all/2, notify_down_all/2, limit_all/3]). -export([on_node_down/1]). @@ -107,6 +107,7 @@ -spec(basic_cancel/4 :: (amqqueue(), pid(), ctag(), any()) -> 'ok'). -spec(notify_sent/2 :: (pid(), pid()) -> 'ok'). -spec(unblock/2 :: (pid(), pid()) -> 'ok'). +-spec(flush_all/2 :: ([pid()], pid()) -> 'ok'). -spec(internal_declare/2 :: (amqqueue(), boolean()) -> amqqueue()). -spec(internal_delete/1 :: (queue_name()) -> 'ok' | not_found()). -spec(on_node_down/1 :: (erlang_node()) -> 'ok'). @@ -284,7 +285,7 @@ requeue(QPid, MsgIds, ChPid) -> gen_server2:cast(QPid, {requeue, MsgIds, ChPid}). ack(QPid, Txn, MsgIds, ChPid) -> - gen_server2:pcast(QPid, 8, {ack, Txn, MsgIds, ChPid}). + gen_server2:pcast(QPid, 7, {ack, Txn, MsgIds, ChPid}). commit_all(QPids, Txn) -> safe_pmap_ok( @@ -329,10 +330,16 @@ basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) -> infinity). notify_sent(QPid, ChPid) -> - gen_server2:pcast(QPid, 8, {notify_sent, ChPid}). + gen_server2:pcast(QPid, 7, {notify_sent, ChPid}). unblock(QPid, ChPid) -> - gen_server2:pcast(QPid, 8, {unblock, ChPid}). + gen_server2:pcast(QPid, 7, {unblock, ChPid}). + +flush_all(QPids, ChPid) -> + safe_pmap_ok( + fun (_) -> ok end, + fun (QPid) -> gen_server2:cast(QPid, {flush, ChPid}) end, + QPids). internal_delete(QueueName) -> rabbit_misc:execute_mnesia_transaction( diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index e4791f9524..19cb5c711f 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -826,7 +826,11 @@ handle_cast({limit, ChPid, LimiterPid}, State) -> end, NewLimited = Limited andalso LimiterPid =/= undefined, C#cr{limiter_pid = LimiterPid, is_limit_active = NewLimited} - end)). + end)); + +handle_cast({flush, ChPid}, State) -> + ok = rabbit_channel:flushed(ChPid, self()), + noreply(State). handle_info({'DOWN', MonitorRef, process, DownPid, _Reason}, State = #q{owner = {DownPid, MonitorRef}}) -> diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 585c59dc9b..41085fb738 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -36,7 +36,7 @@ -behaviour(gen_server2). -export([start_link/5, do/2, do/3, shutdown/1]). --export([send_command/2, deliver/4, conserve_memory/2]). +-export([send_command/2, deliver/4, conserve_memory/2, flushed/2]). -export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]). -export([init/1, terminate/2, code_change/3, @@ -45,8 +45,8 @@ -record(ch, {state, channel, reader_pid, writer_pid, limiter_pid, transaction_id, tx_participants, next_tag, uncommitted_ack_q, unacked_message_q, - username, virtual_host, - most_recently_declared_queue, consumer_mapping}). + username, virtual_host, most_recently_declared_queue, + consumer_mapping, blocking}). -define(HIBERNATE_AFTER_MIN, 1000). -define(DESIRED_HIBERNATE, 10000). @@ -77,6 +77,7 @@ -spec(send_command/2 :: (pid(), amqp_method()) -> 'ok'). -spec(deliver/4 :: (pid(), ctag(), boolean(), msg()) -> 'ok'). -spec(conserve_memory/2 :: (pid(), boolean()) -> 'ok'). +-spec(flushed/2 :: (pid(), pid()) -> 'ok'). -spec(list/0 :: () -> [pid()]). -spec(info_keys/0 :: () -> [info_key()]). -spec(info/1 :: (pid()) -> [info()]). @@ -110,7 +111,10 @@ deliver(Pid, ConsumerTag, AckRequired, Msg) -> gen_server2:cast(Pid, {deliver, ConsumerTag, AckRequired, Msg}). conserve_memory(Pid, Conserve) -> - gen_server2:pcast(Pid, 9, {conserve_memory, Conserve}). + gen_server2:pcast(Pid, 8, {conserve_memory, Conserve}). + +flushed(Pid, QPid) -> + gen_server2:cast(Pid, {flushed, QPid}). list() -> pg_local:get_members(rabbit_channels). @@ -152,7 +156,8 @@ init([Channel, ReaderPid, WriterPid, Username, VHost]) -> username = Username, virtual_host = VHost, most_recently_declared_queue = <<>>, - consumer_mapping = dict:new()}, + consumer_mapping = dict:new(), + blocking = dict:new()}, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. @@ -190,6 +195,9 @@ handle_cast({method, Method, Content}, State) -> {stop, {Reason, erlang:get_stacktrace()}, State} end; +handle_cast({flushed, QPid}, State) -> + {noreply, queue_blocked(QPid, State)}; + handle_cast(terminate, State) -> {stop, normal, State}; @@ -215,7 +223,9 @@ handle_info({'EXIT', WriterPid, Reason = {writer, send_failed, _Error}}, State#ch.reader_pid ! {channel_exit, State#ch.channel, Reason}, {stop, normal, State}; handle_info({'EXIT', _Pid, Reason}, State) -> - {stop, Reason, State}. + {stop, Reason, State}; +handle_info({'DOWN', _MRef, process, QPid, _Reason}, State) -> + {noreply, queue_blocked(QPid, State)}. handle_pre_hibernate(State) -> ok = clear_permission_cache(), @@ -331,6 +341,20 @@ check_name(Kind, NameBin = <<"amq.", _/binary>>) -> check_name(_Kind, NameBin) -> NameBin. +queue_blocked(QPid, State = #ch{blocking = Blocking}) -> + case dict:find(QPid, Blocking) of + error -> State; + {ok, MRef} -> true = erlang:demonitor(MRef), + Blocking1 = dict:erase(QPid, Blocking), + ok = case dict:size(Blocking1) of + 0 -> rabbit_writer:send_command( + State#ch.writer_pid, + #'channel.flow_ok'{active = false}); + _ -> ok + end, + State#ch{blocking = Blocking1} + end. + handle_method(#'channel.open'{}, _, State = #ch{state = starting}) -> {reply, #'channel.open_ok'{}, State#ch{state = running}}; @@ -540,25 +564,17 @@ handle_method(#'basic.qos'{prefetch_size = Size}, _, _State) when Size /= 0 -> "prefetch_size!=0 (~w)", [Size]); handle_method(#'basic.qos'{prefetch_count = PrefetchCount}, - _, State = #ch{ limiter_pid = LimiterPid, - unacked_message_q = UAMQ }) -> - NewLimiterPid = case {LimiterPid, PrefetchCount} of - {undefined, 0} -> - undefined; - {undefined, _} -> - LPid = rabbit_limiter:start_link(self(), - queue:len(UAMQ)), - ok = limit_queues(LPid, State), - LPid; - {_, 0} -> - ok = rabbit_limiter:shutdown(LimiterPid), - ok = limit_queues(undefined, State), - undefined; - {_, _} -> - LimiterPid - end, - ok = rabbit_limiter:limit(NewLimiterPid, PrefetchCount), - {reply, #'basic.qos_ok'{}, State#ch{limiter_pid = NewLimiterPid}}; + _, State = #ch{limiter_pid = LimiterPid}) -> + LimiterPid1 = case {LimiterPid, PrefetchCount} of + {undefined, 0} -> undefined; + {undefined, _} -> start_limiter(State); + {_, _} -> LimiterPid + end, + LimiterPid2 = case rabbit_limiter:limit(LimiterPid1, PrefetchCount) of + ok -> LimiterPid1; + stopped -> unlimit_queues(State) + end, + {reply, #'basic.qos_ok'{}, State#ch{limiter_pid = LimiterPid2}}; handle_method(#'basic.recover'{requeue = true}, _, State = #ch{ transaction_id = none, @@ -791,9 +807,31 @@ handle_method(#'tx.rollback'{}, _, #ch{transaction_id = none}) -> handle_method(#'tx.rollback'{}, _, State) -> {reply, #'tx.rollback_ok'{}, internal_rollback(State)}; -handle_method(#'channel.flow'{active = _}, _, State) -> - %% FIXME: implement - {reply, #'channel.flow_ok'{active = true}, State}; +handle_method(#'channel.flow'{active = true}, _, + State = #ch{limiter_pid = LimiterPid}) -> + LimiterPid1 = case rabbit_limiter:unblock(LimiterPid) of + ok -> LimiterPid; + stopped -> unlimit_queues(State) + end, + {reply, #'channel.flow_ok'{active = true}, + State#ch{limiter_pid = LimiterPid1}}; + +handle_method(#'channel.flow'{active = false}, _, + State = #ch{limiter_pid = LimiterPid, + consumer_mapping = Consumers}) -> + LimiterPid1 = case LimiterPid of + undefined -> start_limiter(State); + Other -> Other + end, + ok = rabbit_limiter:block(LimiterPid1), + QPids = consumer_queues(Consumers), + Queues = [{QPid, erlang:monitor(process, QPid)} || QPid <- QPids], + ok = rabbit_amqqueue:flush_all(QPids, self()), + case Queues of + [] -> {reply, #'channel.flow_ok'{active = false}, State}; + _ -> {noreply, State#ch{limiter_pid = LimiterPid1, + blocking = dict:from_list(Queues)}} + end; handle_method(#'channel.flow_ok'{active = _}, _, State) -> %% TODO: We may want to correlate this to channel.flow messages we @@ -940,9 +978,18 @@ fold_per_queue(F, Acc0, UAQ) -> dict:fold(fun (QPid, MsgIds, Acc) -> F(QPid, MsgIds, Acc) end, Acc0, D). +start_limiter(State = #ch{unacked_message_q = UAMQ}) -> + LPid = rabbit_limiter:start_link(self(), queue:len(UAMQ)), + ok = limit_queues(LPid, State), + LPid. + notify_queues(#ch{consumer_mapping = Consumers}) -> rabbit_amqqueue:notify_down_all(consumer_queues(Consumers), self()). +unlimit_queues(State) -> + ok = limit_queues(undefined, State), + undefined. + limit_queues(LPid, #ch{consumer_mapping = Consumers}) -> rabbit_amqqueue:limit_all(consumer_queues(Consumers), self(), LPid). diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index c9f8183fc9..7d84086108 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -37,7 +37,7 @@ handle_info/2]). -export([start_link/2, shutdown/1]). -export([limit/2, can_send/3, ack/2, register/2, unregister/2]). --export([get_limit/1]). +-export([get_limit/1, block/1, unblock/1]). %%---------------------------------------------------------------------------- @@ -47,12 +47,14 @@ -spec(start_link/2 :: (pid(), non_neg_integer()) -> pid()). -spec(shutdown/1 :: (maybe_pid()) -> 'ok'). --spec(limit/2 :: (maybe_pid(), non_neg_integer()) -> 'ok'). +-spec(limit/2 :: (maybe_pid(), non_neg_integer()) -> 'ok' | 'stopped'). -spec(can_send/3 :: (maybe_pid(), pid(), boolean()) -> boolean()). -spec(ack/2 :: (maybe_pid(), non_neg_integer()) -> 'ok'). -spec(register/2 :: (maybe_pid(), pid()) -> 'ok'). -spec(unregister/2 :: (maybe_pid(), pid()) -> 'ok'). -spec(get_limit/1 :: (maybe_pid()) -> non_neg_integer()). +-spec(block/1 :: (maybe_pid()) -> 'ok'). +-spec(unblock/1 :: (maybe_pid()) -> 'ok' | 'stopped'). -endif. @@ -60,6 +62,7 @@ -record(lim, {prefetch_count = 0, ch_pid, + blocked = false, queues = dict:new(), % QPid -> {MonitorRef, Notify} volume = 0}). %% 'Notify' is a boolean that indicates whether a queue should be @@ -77,13 +80,14 @@ start_link(ChPid, UnackedMsgCount) -> shutdown(undefined) -> ok; shutdown(LimiterPid) -> - unlink(LimiterPid), + true = unlink(LimiterPid), gen_server2:cast(LimiterPid, shutdown). limit(undefined, 0) -> ok; limit(LimiterPid, PrefetchCount) -> - gen_server2:cast(LimiterPid, {limit, PrefetchCount}). + unlink_on_stopped(LimiterPid, + gen_server2:call(LimiterPid, {limit, PrefetchCount})). %% Ask the limiter whether the queue can deliver a message without %% breaching a limit @@ -113,6 +117,17 @@ get_limit(Pid) -> fun () -> 0 end, fun () -> gen_server2:pcall(Pid, 9, get_limit, infinity) end). +block(undefined) -> + ok; +block(LimiterPid) -> + gen_server2:call(LimiterPid, block, infinity). + +unblock(undefined) -> + ok; +unblock(LimiterPid) -> + unlink_on_stopped(LimiterPid, + gen_server2:call(LimiterPid, unblock, infinity)). + %%---------------------------------------------------------------------------- %% gen_server callbacks %%---------------------------------------------------------------------------- @@ -120,29 +135,45 @@ get_limit(Pid) -> init([ChPid, UnackedMsgCount]) -> {ok, #lim{ch_pid = ChPid, volume = UnackedMsgCount}}. +handle_call({can_send, _QPid, _AckRequired}, _From, + State = #lim{blocked = true}) -> + {reply, false, State}; handle_call({can_send, QPid, AckRequired}, _From, State = #lim{volume = Volume}) -> case limit_reached(State) of true -> {reply, false, limit_queue(QPid, State)}; - false -> {reply, true, State#lim{volume = if AckRequired -> Volume + 1; - true -> Volume - end}} + false -> {reply, true, State#lim{volume = if AckRequired -> Volume + 1; + true -> Volume + end}} end; handle_call(get_limit, _From, State = #lim{prefetch_count = PrefetchCount}) -> - {reply, PrefetchCount, State}. + {reply, PrefetchCount, State}; + +handle_call({limit, PrefetchCount}, _From, State) -> + case maybe_notify(State, State#lim{prefetch_count = PrefetchCount}) of + {cont, State1} -> {reply, ok, State1}; + {stop, State1} -> {stop, normal, stopped, State1} + end; + +handle_call(block, _From, State) -> + {reply, ok, State#lim{blocked = true}}; + +handle_call(unblock, _From, State) -> + case maybe_notify(State, State#lim{blocked = false}) of + {cont, State1} -> {reply, ok, State1}; + {stop, State1} -> {stop, normal, stopped, State1} + end. handle_cast(shutdown, State) -> {stop, normal, State}; -handle_cast({limit, PrefetchCount}, State) -> - {noreply, maybe_notify(State, State#lim{prefetch_count = PrefetchCount})}; - handle_cast({ack, Count}, State = #lim{volume = Volume}) -> NewVolume = if Volume == 0 -> 0; true -> Volume - Count end, - {noreply, maybe_notify(State, State#lim{volume = NewVolume})}; + {cont, State1} = maybe_notify(State, State#lim{volume = NewVolume}), + {noreply, State1}; handle_cast({register, QPid}, State) -> {noreply, remember_queue(QPid, State)}; @@ -164,14 +195,21 @@ code_change(_, State, _) -> %%---------------------------------------------------------------------------- maybe_notify(OldState, NewState) -> - case limit_reached(OldState) andalso not(limit_reached(NewState)) of - true -> notify_queues(NewState); - false -> NewState + case (limit_reached(OldState) orelse is_blocked(OldState)) andalso + not (limit_reached(NewState) orelse is_blocked(NewState)) of + true -> NewState1 = notify_queues(NewState), + {case NewState1#lim.prefetch_count of + 0 -> stop; + _ -> cont + end, NewState1}; + false -> {cont, NewState} end. limit_reached(#lim{prefetch_count = Limit, volume = Volume}) -> Limit =/= 0 andalso Volume >= Limit. +is_blocked(#lim{blocked = Blocked}) -> Blocked. + remember_queue(QPid, State = #lim{queues = Queues}) -> case dict:is_key(QPid, Queues) of false -> MRef = erlang:monitor(process, QPid), @@ -209,3 +247,12 @@ notify_queues(State = #lim{ch_pid = ChPid, queues = Queues}) -> ok end, State#lim{queues = NewQueues}. + +unlink_on_stopped(LimiterPid, stopped) -> + true = unlink(LimiterPid), + ok = receive {'EXIT', LimiterPid, _Reason} -> ok + after 0 -> ok + end, + stopped; +unlink_on_stopped(_LimiterPid, Result) -> + Result. |
