summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2010-07-21 15:10:12 +0100
committerMatthew Sackman <matthew@rabbitmq.com>2010-07-21 15:10:12 +0100
commit942ac6ae6e624a4e0edddd8faf318d142f3af9eb (patch)
tree2e333eae2ea5ed591679d28d326e259513d9c442
parent036b6f14a125f5355f300d9068e261f3c96a0572 (diff)
downloadrabbitmq-server-git-942ac6ae6e624a4e0edddd8faf318d142f3af9eb.tar.gz
Well the rabbit_tests now pass owing to being able to support a very similar API to previously wrt channel, but shutdown seems to be sporadically successful at best.
-rw-r--r--src/rabbit_channel.erl26
-rw-r--r--src/rabbit_channel_sup.erl41
-rw-r--r--src/rabbit_heartbeat.erl6
-rw-r--r--src/rabbit_reader.erl8
-rw-r--r--src/rabbit_tests.erl12
-rw-r--r--src/rabbit_writer.erl6
6 files changed, 65 insertions, 34 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 600a267250..e043492a15 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -35,7 +35,7 @@
-behaviour(gen_server2).
--export([start_link/5, do/2, do/3, shutdown/1]).
+-export([start_link/5, start_link/6, do/2, do/3, shutdown/1]).
-export([send_command/2, deliver/4, conserve_memory/2, flushed/2]).
-export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]).
@@ -78,7 +78,10 @@
-spec(start_link/5 ::
(channel_number(), pid(), rabbit_access_control:username(),
- rabbit_types:vhost(), pid()) -> {'ok', pid()}).
+ rabbit_types:vhost(), pid()) -> rabbit_types:ok(pid())).
+-spec(start_link/6 ::
+ (channel_number(), pid(), pid(), rabbit_access_control:username(),
+ rabbit_types:vhost(), pid()) -> rabbit_types:ok(pid())).
-spec(do/2 :: (pid(), rabbit_framing:amqp_method_record()) -> 'ok').
-spec(do/3 :: (pid(), rabbit_framing:amqp_method_record(),
rabbit_types:maybe(rabbit_types:content())) -> 'ok').
@@ -106,14 +109,16 @@ start_link(Channel, ReaderPid, Username, VHost, CollectorPid) ->
{ok, proc_lib:spawn_link(
fun () ->
WriterPid = rabbit_channel_sup:writer(Parent),
- State = init([Channel, Parent, ReaderPid, WriterPid,
- Username, VHost, CollectorPid]),
- gen_server2:enter_loop(
- ?MODULE, [], State, self(), hibernate,
- {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN,
- ?DESIRED_HIBERNATE})
+ init_and_go([Channel, Parent, ReaderPid, WriterPid,
+ Username, VHost, CollectorPid])
end)}.
+start_link(Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid) ->
+ Parent = self(),
+ {ok, proc_lib:spawn_link(
+ fun () -> init_and_go([Channel, Parent, ReaderPid, WriterPid,
+ Username, VHost, CollectorPid]) end)}.
+
do(Pid, Method) ->
do(Pid, Method, none).
@@ -185,6 +190,11 @@ init([Channel, ParentPid, ReaderPid, WriterPid, Username, VHost,
flow = #flow{server = true, client = true,
pending = none}}.
+init_and_go(InitArgs) ->
+ gen_server2:enter_loop(?MODULE, [], init(InitArgs), self(), hibernate,
+ {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN,
+ ?DESIRED_HIBERNATE}).
+
handle_call(info, _From, State) ->
reply(infos(?INFO_KEYS, State), State);
diff --git a/src/rabbit_channel_sup.erl b/src/rabbit_channel_sup.erl
index f8a7a7c6f4..0e716b4808 100644
--- a/src/rabbit_channel_sup.erl
+++ b/src/rabbit_channel_sup.erl
@@ -33,26 +33,31 @@
-behaviour(supervisor2).
--export([start_link/7, writer/1, framing_channel/1, channel/1]).
+-export([start_link/7, stop/1, writer/1, framing_channel/1, channel/1]).
-export([init/1]).
-include("rabbit.hrl").
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-spec(start_link/7 ::
+ (rabbit_net:socket(), rabbit_channel:channel_number(),
+ non_neg_integer(), pid(), rabbit_access_control:username(),
+ rabbit_types:vhost(), pid()) ->
+ ignore | rabbit_types:ok_or_error2(pid(), any())).
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
start_link(Sock, Channel, FrameMax, ReaderPid, Username, VHost, Collector) ->
supervisor2:start_link(?MODULE, [Sock, Channel, FrameMax, ReaderPid,
Username, VHost, Collector]).
-
-init([Sock, Channel, FrameMax, ReaderPid, Username, VHost, Collector]) ->
- {ok, {{one_for_all, 0, 1},
- [{channel, {rabbit_channel, start_link,
- [Channel, ReaderPid, Username, VHost, Collector]},
- permanent, ?MAX_WAIT, worker, [rabbit_channel]},
- {writer, {rabbit_writer, start_link, [Sock, Channel, FrameMax]},
- permanent, ?MAX_WAIT, worker, [rabbit_writer]},
- {framing_channel, {rabbit_framing_channel, start_link, []},
- permanent, ?MAX_WAIT, worker, [rabbit_framing_channel]}
- ]}}.
+stop(Pid) ->
+ supervisor2:stop(Pid).
writer(Pid) ->
hd(supervisor2:find_child(Pid, writer, worker, [rabbit_writer])).
@@ -64,3 +69,15 @@ framing_channel(Pid) ->
hd(supervisor2:find_child(Pid, framing_channel, worker,
[rabbit_framing_channel])).
+%%----------------------------------------------------------------------------
+
+init([Sock, Channel, FrameMax, ReaderPid, Username, VHost, Collector]) ->
+ {ok, {{one_for_all, 0, 1},
+ [{channel, {rabbit_channel, start_link,
+ [Channel, ReaderPid, Username, VHost, Collector]},
+ permanent, ?MAX_WAIT, worker, [rabbit_channel]},
+ {writer, {rabbit_writer, start_link, [Sock, Channel, FrameMax]},
+ permanent, ?MAX_WAIT, worker, [rabbit_writer]},
+ {framing_channel, {rabbit_framing_channel, start_link, []},
+ permanent, ?MAX_WAIT, worker, [rabbit_framing_channel]}
+ ]}}.
diff --git a/src/rabbit_heartbeat.erl b/src/rabbit_heartbeat.erl
index 7da17071e6..b7c73ae7a6 100644
--- a/src/rabbit_heartbeat.erl
+++ b/src/rabbit_heartbeat.erl
@@ -95,10 +95,8 @@ heartbeater(Sock, TimeoutMillisec, StatName, Threshold, Handler) ->
SameCount < Threshold ->
F({NewStatVal, SameCount + 1});
true ->
- case Handler() of
- stop -> ok;
- continue -> F({NewStatVal, 0})
- end
+ continue = Handler(),
+ F({NewStatVal, 0})
end;
{error, einval} ->
%% the socket is dead, most
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index ca38b6abf7..542ef32c41 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -144,6 +144,14 @@
-spec(shutdown/2 :: (pid(), string()) -> 'ok').
-spec(server_properties/0 :: () -> rabbit_framing:amqp_table()).
+%% These specs only exists to add no_return() to keep dialyzer happy
+-spec(init/1 :: (pid()) -> no_return()).
+-spec(start_connection/4 ::
+ (pid(), any(), rabbit_networking:socket(),
+ fun ((rabbit_networking:socket()) ->
+ rabbit_types:ok_or_error2(
+ rabbit_networking:socket(), any()))) -> no_return()).
+
-endif.
%%--------------------------------------------------------------------------
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 969f33b87c..46ba0b53da 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -794,8 +794,8 @@ test_user_management() ->
test_server_status() ->
%% create a few things so there is some useful information to list
Writer = spawn(fun () -> receive shutdown -> ok end end),
- Ch = rabbit_channel:start_link(1, self(), Writer, <<"user">>, <<"/">>,
- self()),
+ {ok, Ch} = rabbit_channel:start_link(1, self(), Writer, <<"user">>, <<"/">>,
+ self()),
[Q, Q2] = [Queue || Name <- [<<"foo">>, <<"bar">>],
{new, Queue = #amqqueue{}} <-
[rabbit_amqqueue:declare(
@@ -926,8 +926,8 @@ test_memory_pressure_sync(Ch, Writer) ->
test_memory_pressure_spawn() ->
Me = self(),
Writer = spawn(fun () -> test_memory_pressure_receiver(Me) end),
- Ch = rabbit_channel:start_link(1, self(), Writer, <<"user">>, <<"/">>,
- self()),
+ {ok, Ch} = rabbit_channel:start_link(1, self(), Writer, <<"user">>, <<"/">>,
+ self()),
ok = rabbit_channel:do(Ch, #'channel.open'{}),
receive #'channel.open_ok'{} -> ok
after 1000 -> throw(failed_to_receive_channel_open_ok)
@@ -990,8 +990,8 @@ test_memory_pressure() ->
alarm_handler:set_alarm({vm_memory_high_watermark, []}),
Me = self(),
Writer4 = spawn(fun () -> test_memory_pressure_receiver(Me) end),
- Ch4 = rabbit_channel:start_link(1, self(), Writer4, <<"user">>, <<"/">>,
- self()),
+ {ok, Ch4} = rabbit_channel:start_link(1, self(), Writer4, <<"user">>,
+ <<"/">>, self()),
ok = rabbit_channel:do(Ch4, #'channel.open'{}),
Writer4 ! sync,
receive sync -> ok after 1000 -> throw(failed_to_receive_writer_sync) end,
diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl
index a5355c5844..581ea428bd 100644
--- a/src/rabbit_writer.erl
+++ b/src/rabbit_writer.erl
@@ -50,12 +50,10 @@
-spec(start/3 ::
(rabbit_net:socket(), rabbit_channel:channel_number(),
- non_neg_integer())
- -> {'ok', pid()}).
+ non_neg_integer()) -> rabbit_types:ok(pid())).
-spec(start_link/3 ::
(rabbit_net:socket(), rabbit_channel:channel_number(),
- non_neg_integer())
- -> {'ok', pid()}).
+ non_neg_integer()) -> rabbit_types:ok(pid())).
-spec(send_command/2 ::
(pid(), rabbit_framing:amqp_method_record()) -> 'ok').
-spec(send_command/3 ::