diff options
| author | Matthew Sackman <matthew@rabbitmq.com> | 2010-07-21 15:10:12 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@rabbitmq.com> | 2010-07-21 15:10:12 +0100 |
| commit | 942ac6ae6e624a4e0edddd8faf318d142f3af9eb (patch) | |
| tree | 2e333eae2ea5ed591679d28d326e259513d9c442 | |
| parent | 036b6f14a125f5355f300d9068e261f3c96a0572 (diff) | |
| download | rabbitmq-server-git-942ac6ae6e624a4e0edddd8faf318d142f3af9eb.tar.gz | |
Well the rabbit_tests now pass owing to being able to support a very similar API to previously wrt channel, but shutdown seems to be sporadically successful at best.
| -rw-r--r-- | src/rabbit_channel.erl | 26 | ||||
| -rw-r--r-- | src/rabbit_channel_sup.erl | 41 | ||||
| -rw-r--r-- | src/rabbit_heartbeat.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_reader.erl | 8 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 12 | ||||
| -rw-r--r-- | src/rabbit_writer.erl | 6 |
6 files changed, 65 insertions, 34 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 600a267250..e043492a15 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -35,7 +35,7 @@ -behaviour(gen_server2). --export([start_link/5, do/2, do/3, shutdown/1]). +-export([start_link/5, start_link/6, do/2, do/3, shutdown/1]). -export([send_command/2, deliver/4, conserve_memory/2, flushed/2]). -export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]). @@ -78,7 +78,10 @@ -spec(start_link/5 :: (channel_number(), pid(), rabbit_access_control:username(), - rabbit_types:vhost(), pid()) -> {'ok', pid()}). + rabbit_types:vhost(), pid()) -> rabbit_types:ok(pid())). +-spec(start_link/6 :: + (channel_number(), pid(), pid(), rabbit_access_control:username(), + rabbit_types:vhost(), pid()) -> rabbit_types:ok(pid())). -spec(do/2 :: (pid(), rabbit_framing:amqp_method_record()) -> 'ok'). -spec(do/3 :: (pid(), rabbit_framing:amqp_method_record(), rabbit_types:maybe(rabbit_types:content())) -> 'ok'). @@ -106,14 +109,16 @@ start_link(Channel, ReaderPid, Username, VHost, CollectorPid) -> {ok, proc_lib:spawn_link( fun () -> WriterPid = rabbit_channel_sup:writer(Parent), - State = init([Channel, Parent, ReaderPid, WriterPid, - Username, VHost, CollectorPid]), - gen_server2:enter_loop( - ?MODULE, [], State, self(), hibernate, - {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, - ?DESIRED_HIBERNATE}) + init_and_go([Channel, Parent, ReaderPid, WriterPid, + Username, VHost, CollectorPid]) end)}. +start_link(Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid) -> + Parent = self(), + {ok, proc_lib:spawn_link( + fun () -> init_and_go([Channel, Parent, ReaderPid, WriterPid, + Username, VHost, CollectorPid]) end)}. + do(Pid, Method) -> do(Pid, Method, none). @@ -185,6 +190,11 @@ init([Channel, ParentPid, ReaderPid, WriterPid, Username, VHost, flow = #flow{server = true, client = true, pending = none}}. +init_and_go(InitArgs) -> + gen_server2:enter_loop(?MODULE, [], init(InitArgs), self(), hibernate, + {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, + ?DESIRED_HIBERNATE}). + handle_call(info, _From, State) -> reply(infos(?INFO_KEYS, State), State); diff --git a/src/rabbit_channel_sup.erl b/src/rabbit_channel_sup.erl index f8a7a7c6f4..0e716b4808 100644 --- a/src/rabbit_channel_sup.erl +++ b/src/rabbit_channel_sup.erl @@ -33,26 +33,31 @@ -behaviour(supervisor2). --export([start_link/7, writer/1, framing_channel/1, channel/1]). +-export([start_link/7, stop/1, writer/1, framing_channel/1, channel/1]). -export([init/1]). -include("rabbit.hrl"). +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec(start_link/7 :: + (rabbit_net:socket(), rabbit_channel:channel_number(), + non_neg_integer(), pid(), rabbit_access_control:username(), + rabbit_types:vhost(), pid()) -> + ignore | rabbit_types:ok_or_error2(pid(), any())). + +-endif. + +%%---------------------------------------------------------------------------- + start_link(Sock, Channel, FrameMax, ReaderPid, Username, VHost, Collector) -> supervisor2:start_link(?MODULE, [Sock, Channel, FrameMax, ReaderPid, Username, VHost, Collector]). - -init([Sock, Channel, FrameMax, ReaderPid, Username, VHost, Collector]) -> - {ok, {{one_for_all, 0, 1}, - [{channel, {rabbit_channel, start_link, - [Channel, ReaderPid, Username, VHost, Collector]}, - permanent, ?MAX_WAIT, worker, [rabbit_channel]}, - {writer, {rabbit_writer, start_link, [Sock, Channel, FrameMax]}, - permanent, ?MAX_WAIT, worker, [rabbit_writer]}, - {framing_channel, {rabbit_framing_channel, start_link, []}, - permanent, ?MAX_WAIT, worker, [rabbit_framing_channel]} - ]}}. +stop(Pid) -> + supervisor2:stop(Pid). writer(Pid) -> hd(supervisor2:find_child(Pid, writer, worker, [rabbit_writer])). @@ -64,3 +69,15 @@ framing_channel(Pid) -> hd(supervisor2:find_child(Pid, framing_channel, worker, [rabbit_framing_channel])). +%%---------------------------------------------------------------------------- + +init([Sock, Channel, FrameMax, ReaderPid, Username, VHost, Collector]) -> + {ok, {{one_for_all, 0, 1}, + [{channel, {rabbit_channel, start_link, + [Channel, ReaderPid, Username, VHost, Collector]}, + permanent, ?MAX_WAIT, worker, [rabbit_channel]}, + {writer, {rabbit_writer, start_link, [Sock, Channel, FrameMax]}, + permanent, ?MAX_WAIT, worker, [rabbit_writer]}, + {framing_channel, {rabbit_framing_channel, start_link, []}, + permanent, ?MAX_WAIT, worker, [rabbit_framing_channel]} + ]}}. diff --git a/src/rabbit_heartbeat.erl b/src/rabbit_heartbeat.erl index 7da17071e6..b7c73ae7a6 100644 --- a/src/rabbit_heartbeat.erl +++ b/src/rabbit_heartbeat.erl @@ -95,10 +95,8 @@ heartbeater(Sock, TimeoutMillisec, StatName, Threshold, Handler) -> SameCount < Threshold -> F({NewStatVal, SameCount + 1}); true -> - case Handler() of - stop -> ok; - continue -> F({NewStatVal, 0}) - end + continue = Handler(), + F({NewStatVal, 0}) end; {error, einval} -> %% the socket is dead, most diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index ca38b6abf7..542ef32c41 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -144,6 +144,14 @@ -spec(shutdown/2 :: (pid(), string()) -> 'ok'). -spec(server_properties/0 :: () -> rabbit_framing:amqp_table()). +%% These specs only exists to add no_return() to keep dialyzer happy +-spec(init/1 :: (pid()) -> no_return()). +-spec(start_connection/4 :: + (pid(), any(), rabbit_networking:socket(), + fun ((rabbit_networking:socket()) -> + rabbit_types:ok_or_error2( + rabbit_networking:socket(), any()))) -> no_return()). + -endif. %%-------------------------------------------------------------------------- diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 969f33b87c..46ba0b53da 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -794,8 +794,8 @@ test_user_management() -> test_server_status() -> %% create a few things so there is some useful information to list Writer = spawn(fun () -> receive shutdown -> ok end end), - Ch = rabbit_channel:start_link(1, self(), Writer, <<"user">>, <<"/">>, - self()), + {ok, Ch} = rabbit_channel:start_link(1, self(), Writer, <<"user">>, <<"/">>, + self()), [Q, Q2] = [Queue || Name <- [<<"foo">>, <<"bar">>], {new, Queue = #amqqueue{}} <- [rabbit_amqqueue:declare( @@ -926,8 +926,8 @@ test_memory_pressure_sync(Ch, Writer) -> test_memory_pressure_spawn() -> Me = self(), Writer = spawn(fun () -> test_memory_pressure_receiver(Me) end), - Ch = rabbit_channel:start_link(1, self(), Writer, <<"user">>, <<"/">>, - self()), + {ok, Ch} = rabbit_channel:start_link(1, self(), Writer, <<"user">>, <<"/">>, + self()), ok = rabbit_channel:do(Ch, #'channel.open'{}), receive #'channel.open_ok'{} -> ok after 1000 -> throw(failed_to_receive_channel_open_ok) @@ -990,8 +990,8 @@ test_memory_pressure() -> alarm_handler:set_alarm({vm_memory_high_watermark, []}), Me = self(), Writer4 = spawn(fun () -> test_memory_pressure_receiver(Me) end), - Ch4 = rabbit_channel:start_link(1, self(), Writer4, <<"user">>, <<"/">>, - self()), + {ok, Ch4} = rabbit_channel:start_link(1, self(), Writer4, <<"user">>, + <<"/">>, self()), ok = rabbit_channel:do(Ch4, #'channel.open'{}), Writer4 ! sync, receive sync -> ok after 1000 -> throw(failed_to_receive_writer_sync) end, diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl index a5355c5844..581ea428bd 100644 --- a/src/rabbit_writer.erl +++ b/src/rabbit_writer.erl @@ -50,12 +50,10 @@ -spec(start/3 :: (rabbit_net:socket(), rabbit_channel:channel_number(), - non_neg_integer()) - -> {'ok', pid()}). + non_neg_integer()) -> rabbit_types:ok(pid())). -spec(start_link/3 :: (rabbit_net:socket(), rabbit_channel:channel_number(), - non_neg_integer()) - -> {'ok', pid()}). + non_neg_integer()) -> rabbit_types:ok(pid())). -spec(send_command/2 :: (pid(), rabbit_framing:amqp_method_record()) -> 'ok'). -spec(send_command/3 :: |
