summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/gen_server2.erl5
-rw-r--r--src/priority_queue.erl44
-rw-r--r--src/rabbit.erl63
-rw-r--r--src/rabbit_alarm.erl54
-rw-r--r--src/rabbit_amqqueue.erl16
-rw-r--r--src/rabbit_basic.erl5
-rw-r--r--src/rabbit_channel.erl42
-rw-r--r--src/rabbit_control.erl118
-rw-r--r--src/rabbit_dialyzer.erl91
-rw-r--r--src/rabbit_exchange.erl8
-rw-r--r--src/rabbit_guid.erl26
-rw-r--r--src/rabbit_heartbeat.erl4
-rw-r--r--src/rabbit_limiter.erl2
-rw-r--r--src/rabbit_load.erl17
-rw-r--r--src/rabbit_memsup.erl142
-rw-r--r--src/rabbit_memsup_darwin.erl88
-rw-r--r--src/rabbit_memsup_linux.erl115
-rw-r--r--src/rabbit_misc.erl83
-rw-r--r--src/rabbit_mnesia.erl99
-rw-r--r--src/rabbit_multi.erl35
-rw-r--r--src/rabbit_net.erl132
-rw-r--r--src/rabbit_networking.erl71
-rw-r--r--src/rabbit_plugin_activator.erl76
-rw-r--r--src/rabbit_reader.erl196
-rw-r--r--src/rabbit_tests.erl119
-rw-r--r--src/rabbit_writer.erl4
-rw-r--r--src/tcp_acceptor.erl27
-rw-r--r--src/tcp_listener.erl29
-rw-r--r--src/tcp_listener_sup.erl14
29 files changed, 1202 insertions, 523 deletions
diff --git a/src/gen_server2.erl b/src/gen_server2.erl
index 36fb4fa8c3..a2d9350c4e 100644
--- a/src/gen_server2.erl
+++ b/src/gen_server2.erl
@@ -437,7 +437,10 @@ unregister_name({local,Name}) ->
unregister_name({global,Name}) ->
_ = global:unregister_name(Name);
unregister_name(Pid) when is_pid(Pid) ->
- Pid.
+ Pid;
+% Under R12 let's just ignore it, as we have a single term as Name.
+% On R13 it will never get here, as we get tuple with 'local/global' atom.
+unregister_name(_Name) -> ok.
extend_backoff(undefined) ->
undefined;
diff --git a/src/priority_queue.erl b/src/priority_queue.erl
index 732757c41c..74b41a910c 100644
--- a/src/priority_queue.erl
+++ b/src/priority_queue.erl
@@ -55,7 +55,8 @@
-module(priority_queue).
--export([new/0, is_queue/1, is_empty/1, len/1, to_list/1, in/2, in/3, out/1]).
+-export([new/0, is_queue/1, is_empty/1, len/1, to_list/1, in/2, in/3,
+ out/1, join/2]).
%%----------------------------------------------------------------------------
@@ -66,13 +67,14 @@
-type(pqueue() :: squeue() | {pqueue, [{priority(), squeue()}]}).
-spec(new/0 :: () -> pqueue()).
--spec(is_queue/1 :: (any()) -> bool()).
--spec(is_empty/1 :: (pqueue()) -> bool()).
+-spec(is_queue/1 :: (any()) -> boolean()).
+-spec(is_empty/1 :: (pqueue()) -> boolean()).
-spec(len/1 :: (pqueue()) -> non_neg_integer()).
-spec(to_list/1 :: (pqueue()) -> [{priority(), any()}]).
-spec(in/2 :: (any(), pqueue()) -> pqueue()).
-spec(in/3 :: (any(), priority(), pqueue()) -> pqueue()).
-spec(out/1 :: (pqueue()) -> {empty | {value, any()}, pqueue()}).
+-spec(join/2 :: (pqueue(), pqueue()) -> pqueue()).
-endif.
@@ -147,6 +149,42 @@ out({pqueue, [{P, Q} | Queues]}) ->
end,
{R, NewQ}.
+join(A, {queue, [], []}) ->
+ A;
+join({queue, [], []}, B) ->
+ B;
+join({queue, AIn, AOut}, {queue, BIn, BOut}) ->
+ {queue, BIn, AOut ++ lists:reverse(AIn, BOut)};
+join(A = {queue, _, _}, {pqueue, BPQ}) ->
+ {Pre, Post} = lists:splitwith(fun ({P, _}) -> P < 0 end, BPQ),
+ Post1 = case Post of
+ [] -> [ {0, A} ];
+ [ {0, ZeroQueue} | Rest ] -> [ {0, join(A, ZeroQueue)} | Rest ];
+ _ -> [ {0, A} | Post ]
+ end,
+ {pqueue, Pre ++ Post1};
+join({pqueue, APQ}, B = {queue, _, _}) ->
+ {Pre, Post} = lists:splitwith(fun ({P, _}) -> P < 0 end, APQ),
+ Post1 = case Post of
+ [] -> [ {0, B} ];
+ [ {0, ZeroQueue} | Rest ] -> [ {0, join(ZeroQueue, B)} | Rest ];
+ _ -> [ {0, B} | Post ]
+ end,
+ {pqueue, Pre ++ Post1};
+join({pqueue, APQ}, {pqueue, BPQ}) ->
+ {pqueue, merge(APQ, BPQ, [])}.
+
+merge([], BPQ, Acc) ->
+ lists:reverse(Acc, BPQ);
+merge(APQ, [], Acc) ->
+ lists:reverse(Acc, APQ);
+merge([{P, A}|As], [{P, B}|Bs], Acc) ->
+ merge(As, Bs, [ {P, join(A, B)} | Acc ]);
+merge([{PA, A}|As], Bs = [{PB, _}|_], Acc) when PA < PB ->
+ merge(As, Bs, [ {PA, A} | Acc ]);
+merge(As = [{_, _}|_], [{PB, B}|Bs], Acc) ->
+ merge(As, Bs, [ {PB, B} | Acc ]).
+
r2f([]) -> {queue, [], []};
r2f([_] = R) -> {queue, [], R};
r2f([X,Y]) -> {queue, [X], [Y]};
diff --git a/src/rabbit.erl b/src/rabbit.erl
index b0d62b5ab8..18fd1b175f 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -33,7 +33,7 @@
-behaviour(application).
--export([start/0, stop/0, stop_and_halt/0, status/0, rotate_logs/1]).
+-export([prepare/0, start/0, stop/0, stop_and_halt/0, status/0, rotate_logs/1]).
-export([start/2, stop/1]).
@@ -57,6 +57,7 @@
-type(log_location() :: 'tty' | 'undefined' | string()).
-type(file_suffix() :: binary()).
+-spec(prepare/0 :: () -> 'ok').
-spec(start/0 :: () -> 'ok').
-spec(stop/0 :: () -> 'ok').
-spec(stop_and_halt/0 :: () -> 'ok').
@@ -71,11 +72,14 @@
%%----------------------------------------------------------------------------
+prepare() ->
+ ok = ensure_working_log_handlers(),
+ ok = rabbit_mnesia:ensure_mnesia_dir().
+
start() ->
try
- ok = ensure_working_log_handlers(),
- ok = rabbit_mnesia:ensure_mnesia_dir(),
- ok = rabbit_misc:start_applications(?APPS)
+ ok = prepare(),
+ ok = rabbit_misc:start_applications(?APPS)
after
%%give the error loggers some time to catch up
timer:sleep(100)
@@ -116,8 +120,6 @@ start(normal, []) ->
print_banner(),
- {ok, ExtraSteps} = application:get_env(extra_startup_steps),
-
lists:foreach(
fun ({Msg, Thunk}) ->
io:format("starting ~-20s ...", [Msg]),
@@ -135,13 +137,13 @@ start(normal, []) ->
ok = start_child(rabbit_log),
ok = rabbit_hooks:start(),
- ok = rabbit_amqqueue:start(),
+ ok = rabbit_binary_generator:
+ check_empty_content_body_frame_size(),
{ok, MemoryAlarms} = application:get_env(memory_alarms),
ok = rabbit_alarm:start(MemoryAlarms),
- ok = rabbit_binary_generator:
- check_empty_content_body_frame_size(),
+ ok = rabbit_amqqueue:start(),
ok = start_child(rabbit_router),
ok = start_child(rabbit_node_monitor)
@@ -170,14 +172,28 @@ start(normal, []) ->
{"TCP listeners",
fun () ->
ok = rabbit_networking:start(),
- {ok, TCPListeners} = application:get_env(tcp_listeners),
+ {ok, TcpListeners} = application:get_env(tcp_listeners),
lists:foreach(
fun ({Host, Port}) ->
ok = rabbit_networking:start_tcp_listener(Host, Port)
end,
- TCPListeners)
- end}]
- ++ ExtraSteps),
+ TcpListeners)
+ end},
+ {"SSL listeners",
+ fun () ->
+ case application:get_env(ssl_listeners) of
+ {ok, []} ->
+ ok;
+ {ok, SslListeners} ->
+ ok = rabbit_misc:start_applications([crypto, ssl]),
+
+ {ok, SslOpts} = application:get_env(ssl_options),
+
+ [rabbit_networking:start_ssl_listener
+ (Host, Port, SslOpts) || {Host, Port} <- SslListeners],
+ ok
+ end
+ end}]),
io:format("~nbroker running~n"),
@@ -203,6 +219,16 @@ log_location(Type) ->
_ -> undefined
end.
+app_location() ->
+ {ok, Application} = application:get_application(),
+ filename:absname(code:where_is_file(atom_to_list(Application) ++ ".app")).
+
+home_dir() ->
+ case init:get_argument(home) of
+ {ok, [[Home]]} -> Home;
+ Other -> Other
+ end.
+
%---------------------------------------------------------------------------
print_banner() ->
@@ -225,10 +251,13 @@ print_banner() ->
[Product, string:right([$v|Version], ProductLen),
?PROTOCOL_VERSION_MAJOR, ?PROTOCOL_VERSION_MINOR,
?COPYRIGHT_MESSAGE, ?INFORMATION_MESSAGE]),
- Settings = [{"node", node()},
- {"log", log_location(kernel)},
- {"sasl log", log_location(sasl)},
- {"database dir", rabbit_mnesia:dir()}],
+ Settings = [{"node", node()},
+ {"app descriptor", app_location()},
+ {"home dir", home_dir()},
+ {"cookie hash", rabbit_misc:cookie_hash()},
+ {"log", log_location(kernel)},
+ {"sasl log", log_location(sasl)},
+ {"database dir", rabbit_mnesia:dir()}],
DescrLen = lists:max([length(K) || {K, _V} <- Settings]),
Format = "~-" ++ integer_to_list(DescrLen) ++ "s: ~s~n",
lists:foreach(fun ({K, V}) -> io:format(Format, [K, V]) end, Settings),
diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl
index 21999f16c3..7a2fbcb826 100644
--- a/src/rabbit_alarm.erl
+++ b/src/rabbit_alarm.erl
@@ -41,7 +41,7 @@
-define(MEMSUP_CHECK_INTERVAL, 1000).
%% OSes on which we know memory alarms to be trustworthy
--define(SUPPORTED_OS, [{unix, linux}]).
+-define(SUPPORTED_OS, [{unix, linux}, {unix, darwin}]).
-record(alarms, {alertees, system_memory_high_watermark = false}).
@@ -50,7 +50,7 @@
-ifdef(use_specs).
-type(mfa_tuple() :: {atom(), atom(), list()}).
--spec(start/1 :: (bool() | 'auto') -> 'ok').
+-spec(start/1 :: (boolean() | 'auto') -> 'ok').
-spec(stop/0 :: () -> 'ok').
-spec(register/2 :: (pid(), mfa_tuple()) -> 'ok').
@@ -136,33 +136,35 @@ code_change(_OldVsn, State, _Extra) ->
%%----------------------------------------------------------------------------
start_memsup() ->
- Mod = case os:type() of
- %% memsup doesn't take account of buffers or cache when
- %% considering "free" memory - therefore on Linux we can
- %% get memory alarms very easily without any pressure
- %% existing on memory at all. Therefore we need to use
- %% our own simple memory monitor.
- %%
- {unix, linux} -> rabbit_memsup_linux;
-
- %% Start memsup programmatically rather than via the
- %% rabbitmq-server script. This is not quite the right
- %% thing to do as os_mon checks to see if memsup is
- %% available before starting it, but as memsup is
- %% available everywhere (even on VXWorks) it should be
- %% ok.
- %%
- %% One benefit of the programmatic startup is that we
- %% can add our alarm_handler before memsup is running,
- %% thus ensuring that we notice memory alarms that go
- %% off on startup.
- %%
- _ -> memsup
- end,
+ {Mod, Args} =
+ case os:type() of
+ %% memsup doesn't take account of buffers or cache when
+ %% considering "free" memory - therefore on Linux we can
+ %% get memory alarms very easily without any pressure
+ %% existing on memory at all. Therefore we need to use
+ %% our own simple memory monitor.
+ %%
+ {unix, linux} -> {rabbit_memsup, [rabbit_memsup_linux]};
+ {unix, darwin} -> {rabbit_memsup, [rabbit_memsup_darwin]};
+
+ %% Start memsup programmatically rather than via the
+ %% rabbitmq-server script. This is not quite the right
+ %% thing to do as os_mon checks to see if memsup is
+ %% available before starting it, but as memsup is
+ %% available everywhere (even on VXWorks) it should be
+ %% ok.
+ %%
+ %% One benefit of the programmatic startup is that we
+ %% can add our alarm_handler before memsup is running,
+ %% thus ensuring that we notice memory alarms that go
+ %% off on startup.
+ %%
+ _ -> {memsup, []}
+ end,
%% This is based on os_mon:childspec(memsup, true)
{ok, _} = supervisor:start_child(
os_mon_sup,
- {memsup, {Mod, start_link, []},
+ {memsup, {Mod, start_link, Args},
permanent, 2000, worker, [Mod]}),
ok.
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 4903c2c57f..1a5e82d714 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -63,7 +63,7 @@
-spec(start/0 :: () -> 'ok').
-spec(recover/0 :: () -> 'ok').
--spec(declare/4 :: (queue_name(), bool(), bool(), amqp_table()) ->
+-spec(declare/4 :: (queue_name(), boolean(), boolean(), amqp_table()) ->
amqqueue()).
-spec(lookup/1 :: (queue_name()) -> {'ok', amqqueue()} | not_found()).
-spec(with/2 :: (queue_name(), qfun(A)) -> A | not_found()).
@@ -83,8 +83,8 @@
{'error', 'in_use'} |
{'error', 'not_empty'}).
-spec(purge/1 :: (amqqueue()) -> qlen()).
--spec(deliver/2 :: (pid(), delivery()) -> bool()).
--spec(redeliver/2 :: (pid(), [{message(), bool()}]) -> 'ok').
+-spec(deliver/2 :: (pid(), delivery()) -> boolean()).
+-spec(redeliver/2 :: (pid(), [{message(), boolean()}]) -> 'ok').
-spec(requeue/3 :: (pid(), [msg_id()], pid()) -> 'ok').
-spec(ack/4 :: (pid(), maybe(txn()), [msg_id()], pid()) -> 'ok').
-spec(commit_all/2 :: ([pid()], txn()) -> ok_or_errors()).
@@ -92,16 +92,16 @@
-spec(notify_down_all/2 :: ([pid()], pid()) -> ok_or_errors()).
-spec(limit_all/3 :: ([pid()], pid(), pid() | 'undefined') -> ok_or_errors()).
-spec(claim_queue/2 :: (amqqueue(), pid()) -> 'ok' | 'locked').
--spec(basic_get/3 :: (amqqueue(), pid(), bool()) ->
+-spec(basic_get/3 :: (amqqueue(), pid(), boolean()) ->
{'ok', non_neg_integer(), msg()} | 'empty').
-spec(basic_consume/8 ::
- (amqqueue(), bool(), pid(), pid(), pid(), ctag(), bool(), any()) ->
+ (amqqueue(), boolean(), pid(), pid(), pid(), ctag(), boolean(), any()) ->
'ok' | {'error', 'queue_owned_by_another_connection' |
'exclusive_consume_unavailable'}).
-spec(basic_cancel/4 :: (amqqueue(), pid(), ctag(), any()) -> 'ok').
-spec(notify_sent/2 :: (pid(), pid()) -> 'ok').
-spec(unblock/2 :: (pid(), pid()) -> 'ok').
--spec(internal_declare/2 :: (amqqueue(), bool()) -> amqqueue()).
+-spec(internal_declare/2 :: (amqqueue(), boolean()) -> amqqueue()).
-spec(internal_delete/1 :: (queue_name()) -> 'ok' | not_found()).
-spec(on_node_down/1 :: (erlang_node()) -> 'ok').
-spec(pseudo_queue/2 :: (binary(), pid()) -> amqqueue()).
@@ -303,10 +303,10 @@ basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) ->
infinity).
notify_sent(QPid, ChPid) ->
- gen_server2:cast(QPid, {notify_sent, ChPid}).
+ gen_server2:pcast(QPid, 8, {notify_sent, ChPid}).
unblock(QPid, ChPid) ->
- gen_server2:cast(QPid, {unblock, ChPid}).
+ gen_server2:pcast(QPid, 8, {unblock, ChPid}).
internal_delete(QueueName) ->
rabbit_misc:execute_mnesia_transaction(
diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl
index 4033aaafda..bec2cd0845 100644
--- a/src/rabbit_basic.erl
+++ b/src/rabbit_basic.erl
@@ -45,13 +45,14 @@
-type(publish_result() :: ({ok, routing_result(), [pid()]} | not_found())).
-spec(publish/1 :: (delivery()) -> publish_result()).
--spec(delivery/4 :: (bool(), bool(), maybe(txn()), message()) -> delivery()).
+-spec(delivery/4 :: (boolean(), boolean(), maybe(txn()), message()) ->
+ delivery()).
-spec(message/4 :: (exchange_name(), routing_key(), properties_input(),
binary()) -> message()).
-spec(properties/1 :: (properties_input()) -> amqp_properties()).
-spec(publish/4 :: (exchange_name(), routing_key(), properties_input(),
binary()) -> publish_result()).
--spec(publish/7 :: (exchange_name(), routing_key(), bool(), bool(),
+-spec(publish/7 :: (exchange_name(), routing_key(), boolean(), boolean(),
maybe(txn()), properties_input(), binary()) ->
publish_result()).
-spec(build_content/2 :: (amqp_properties(), binary()) -> content()).
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 8af85b34c4..26db0777d0 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -60,8 +60,8 @@
-spec(do/3 :: (pid(), amqp_method(), maybe(content())) -> 'ok').
-spec(shutdown/1 :: (pid()) -> 'ok').
-spec(send_command/2 :: (pid(), amqp_method()) -> 'ok').
--spec(deliver/4 :: (pid(), ctag(), bool(), msg()) -> 'ok').
--spec(conserve_memory/2 :: (pid(), bool()) -> 'ok').
+-spec(deliver/4 :: (pid(), ctag(), boolean(), msg()) -> 'ok').
+-spec(conserve_memory/2 :: (pid(), boolean()) -> 'ok').
-endif.
@@ -125,11 +125,11 @@ handle_cast({method, Method, Content}, State) ->
stop ->
{stop, normal, State#ch{state = terminating}}
catch
- exit:{amqp, Error, Explanation, none} ->
- ok = notify_queues(internal_rollback(State)),
- Reason = {amqp, Error, Explanation,
- rabbit_misc:method_record_type(Method)},
- State#ch.reader_pid ! {channel_exit, State#ch.channel, Reason},
+ exit:Reason = #amqp_error{} ->
+ ok = rollback_and_notify(State),
+ MethodName = rabbit_misc:method_record_type(Method),
+ State#ch.reader_pid ! {channel_exit, State#ch.channel,
+ Reason#amqp_error{method = MethodName}},
{stop, normal, State#ch{state = terminating}};
exit:normal ->
{stop, normal, State};
@@ -157,6 +157,10 @@ handle_cast({conserve_memory, Conserve}, State) ->
State#ch.writer_pid, #'channel.flow'{active = not(Conserve)}),
noreply(State).
+handle_info({'EXIT', WriterPid, Reason = {writer, send_failed, _Error}},
+ State = #ch{writer_pid = WriterPid}) ->
+ State#ch.reader_pid ! {channel_exit, State#ch.channel, Reason},
+ {stop, normal, State};
handle_info({'EXIT', _Pid, Reason}, State) ->
{stop, Reason, State};
@@ -171,7 +175,7 @@ terminate(_Reason, #ch{writer_pid = WriterPid, limiter_pid = LimiterPid,
terminate(Reason, State = #ch{writer_pid = WriterPid,
limiter_pid = LimiterPid}) ->
- Res = notify_queues(internal_rollback(State)),
+ Res = rollback_and_notify(State),
case Reason of
normal -> ok = Res;
_ -> ok
@@ -256,9 +260,6 @@ expand_routing_key_shortcut(<<>>, <<>>,
expand_routing_key_shortcut(_QueueNameBin, RoutingKey, _State) ->
RoutingKey.
-die_precondition_failed(Fmt, Params) ->
- rabbit_misc:protocol_error(precondition_failed, Fmt, Params).
-
%% check that an exchange/queue name does not contain the reserved
%% "amq." prefix.
%%
@@ -290,7 +291,7 @@ handle_method(_Method, _, #ch{state = starting}) ->
rabbit_misc:protocol_error(channel_error, "expected 'channel.open'", []);
handle_method(#'channel.close'{}, _, State = #ch{writer_pid = WriterPid}) ->
- ok = notify_queues(internal_rollback(State)),
+ ok = rollback_and_notify(State),
ok = rabbit_writer:send_command(WriterPid, #'channel.close_ok'{}),
stop;
@@ -601,8 +602,8 @@ handle_method(#'exchange.delete'{exchange = ExchangeNameBin,
{error, not_found} ->
rabbit_misc:not_found(ExchangeName);
{error, in_use} ->
- die_precondition_failed(
- "~s in use", [rabbit_misc:rs(ExchangeName)]);
+ rabbit_misc:protocol_error(
+ precondition_failed, "~s in use", [rabbit_misc:rs(ExchangeName)]);
ok ->
return_ok(State, NoWait, #'exchange.delete_ok'{})
end;
@@ -676,11 +677,11 @@ handle_method(#'queue.delete'{queue = QueueNameBin,
QueueName,
fun (Q) -> rabbit_amqqueue:delete(Q, IfUnused, IfEmpty) end) of
{error, in_use} ->
- die_precondition_failed(
- "~s in use", [rabbit_misc:rs(QueueName)]);
+ rabbit_misc:protocol_error(
+ precondition_failed, "~s in use", [rabbit_misc:rs(QueueName)]);
{error, not_empty} ->
- die_precondition_failed(
- "~s not empty", [rabbit_misc:rs(QueueName)]);
+ rabbit_misc:protocol_error(
+ precondition_failed, "~s not empty", [rabbit_misc:rs(QueueName)]);
{ok, PurgedMessageCount} ->
return_ok(State, NoWait,
#'queue.delete_ok'{
@@ -863,6 +864,11 @@ internal_rollback(State = #ch{transaction_id = TxnKey,
internal_error, "rollback failed: ~w", [Errors])
end.
+rollback_and_notify(State = #ch{transaction_id = none}) ->
+ notify_queues(State);
+rollback_and_notify(State) ->
+ notify_queues(internal_rollback(State)).
+
fold_per_queue(F, Acc0, UAQ) ->
D = lists:foldl(
fun ({_DTag, _CTag,
diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl
index 37e4d18993..a53ac289f2 100644
--- a/src/rabbit_control.erl
+++ b/src/rabbit_control.erl
@@ -52,10 +52,12 @@
%%----------------------------------------------------------------------------
start() ->
+ {ok, [[NodeNameStr|_]|_]} = init:get_argument(nodename),
+ NodeName = list_to_atom(NodeNameStr),
FullCommand = init:get_plain_arguments(),
#params{quiet = Quiet, node = Node, command = Command, args = Args} =
parse_args(FullCommand, #params{quiet = false,
- node = rabbit_misc:localnode(rabbit)}),
+ node = rabbit_misc:localnode(NodeName)}),
Inform = case Quiet of
true -> fun(_Format, _Args1) -> ok end;
false -> fun(Format, Args1) ->
@@ -80,13 +82,38 @@ start() ->
{error, Reason} ->
error("~p", [Reason]),
halt(2);
+ {badrpc, Reason} ->
+ error("unable to connect to node ~w: ~w", [Node, Reason]),
+ print_badrpc_diagnostics(Node),
+ halt(2);
Other ->
error("~p", [Other]),
halt(2)
end.
-error(Format, Args) ->
- rabbit_misc:format_stderr("Error: " ++ Format ++ "~n", Args).
+fmt_stderr(Format, Args) -> rabbit_misc:format_stderr(Format ++ "~n", Args).
+
+error(Format, Args) -> fmt_stderr("Error: " ++ Format, Args).
+
+print_badrpc_diagnostics(Node) ->
+ fmt_stderr("diagnostics:", []),
+ NodeHost = rabbit_misc:nodehost(Node),
+ case net_adm:names(NodeHost) of
+ {error, EpmdReason} ->
+ fmt_stderr("- unable to connect to epmd on ~s: ~w",
+ [NodeHost, EpmdReason]);
+ {ok, NamePorts} ->
+ fmt_stderr("- nodes and their ports on ~s: ~p",
+ [NodeHost, [{list_to_atom(Name), Port} ||
+ {Name, Port} <- NamePorts]])
+ end,
+ fmt_stderr("- current node: ~w", [node()]),
+ case init:get_argument(home) of
+ {ok, [[Home]]} -> fmt_stderr("- current node home dir: ~s", [Home]);
+ Other -> fmt_stderr("- no current node home dir: ~p", [Other])
+ end,
+ fmt_stderr("- current node cookie hash: ~s", [rabbit_misc:cookie_hash()]),
+ ok.
parse_args(["-n", NodeS | Args], Params) ->
Node = case lists:member($@, NodeS) of
@@ -164,7 +191,7 @@ exchange name, routing key, queue name and arguments, in that order.
<ConnectionInfoItem> must be a member of the list [node, address, port,
peer_address, peer_port, state, channels, user, vhost, timeout, frame_max,
recv_oct, recv_cnt, send_oct, send_cnt, send_pend]. The default is to display
-user, peer_address and peer_port.
+user, peer_address, peer_port and state.
"),
halt(1).
@@ -197,9 +224,11 @@ action(cluster, Node, ClusterNodeSs, Inform) ->
action(status, Node, [], Inform) ->
Inform("Status of node ~p", [Node]),
- Res = call(Node, {rabbit, status, []}),
- io:format("~p~n", [Res]),
- ok;
+ case call(Node, {rabbit, status, []}) of
+ {badrpc, _} = Res -> Res;
+ Res -> io:format("~p~n", [Res]),
+ ok
+ end;
action(rotate_logs, Node, [], Inform) ->
Inform("Reopening logs for node ~p", [Node]),
@@ -270,8 +299,9 @@ action(list_bindings, Node, Args, Inform) ->
action(list_connections, Node, Args, Inform) ->
Inform("Listing connections", []),
- ArgAtoms = list_replace(node, pid,
- default_if_empty(Args, [user, peer_address, peer_port])),
+ ArgAtoms = list_replace(node, pid,
+ default_if_empty(Args, [user, peer_address,
+ peer_port, state])),
display_info_list(rpc_call(Node, rabbit_networking, connection_info_all,
[ArgAtoms]),
ArgAtoms);
@@ -314,7 +344,7 @@ default_if_empty(List, Default) when is_list(List) ->
end.
display_info_list(Results, InfoItemKeys) when is_list(Results) ->
- lists:foreach(fun (Result) -> display_row([format_info_item(Result, X) ||
+ lists:foreach(fun (Result) -> display_row([format_info_item(X, Result) ||
X <- InfoItemKeys])
end, Results),
ok;
@@ -325,26 +355,29 @@ display_row(Row) ->
io:fwrite(lists:flatten(rabbit_misc:intersperse("\t", Row))),
io:nl().
-format_info_item(Items, Key) ->
- {value, Info = {Key, Value}} = lists:keysearch(Key, 1, Items),
- case Info of
- {_, #resource{name = Name}} ->
- url_encode(Name);
- _ when Key =:= address; Key =:= peer_address andalso is_tuple(Value) ->
+format_info_item(Key, Items) ->
+ case proplists:get_value(Key, Items) of
+ #resource{name = Name} ->
+ escape(Name);
+ Value when Key =:= address; Key =:= peer_address andalso
+ is_tuple(Value) ->
inet_parse:ntoa(Value);
- _ when is_pid(Value) ->
+ Value when is_pid(Value) ->
atom_to_list(node(Value));
- _ when is_binary(Value) ->
- url_encode(Value);
- _ ->
+ Value when is_binary(Value) ->
+ escape(Value);
+ Value when is_atom(Value) ->
+ escape(atom_to_list(Value));
+ Value ->
io_lib:format("~w", [Value])
end.
display_list(L) when is_list(L) ->
lists:foreach(fun (I) when is_binary(I) ->
- io:format("~s~n", [url_encode(I)]);
+ io:format("~s~n", [escape(I)]);
(I) when is_tuple(I) ->
- display_row([url_encode(V) || V <- tuple_to_list(I)])
+ display_row([escape(V)
+ || V <- tuple_to_list(I)])
end,
lists:sort(L)),
ok;
@@ -356,32 +389,25 @@ call(Node, {Mod, Fun, Args}) ->
rpc_call(Node, Mod, Fun, Args) ->
rpc:call(Node, Mod, Fun, Args, ?RPC_TIMEOUT).
-%% url_encode is lifted from ibrowse, modified to preserve some characters
-url_encode(Bin) when binary(Bin) ->
- url_encode_char(lists:reverse(binary_to_list(Bin)), []).
-
-url_encode_char([X | T], Acc) when X >= $a, X =< $z ->
- url_encode_char(T, [X | Acc]);
-url_encode_char([X | T], Acc) when X >= $A, X =< $Z ->
- url_encode_char(T, [X | Acc]);
-url_encode_char([X | T], Acc) when X >= $0, X =< $9 ->
- url_encode_char(T, [X | Acc]);
-url_encode_char([X | T], Acc)
- when X == $-; X == $_; X == $.; X == $~;
- X == $!; X == $*; X == $'; X == $(;
- X == $); X == $;; X == $:; X == $@;
- X == $&; X == $=; X == $+; X == $$;
- X == $,; X == $/; X == $?; X == $%;
- X == $#; X == $[; X == $] ->
- url_encode_char(T, [X | Acc]);
-url_encode_char([X | T], Acc) ->
- url_encode_char(T, [$%, d2h(X bsr 4), d2h(X band 16#0f) | Acc]);
-url_encode_char([], Acc) ->
+%% escape does C-style backslash escaping of non-printable ASCII
+%% characters. We don't escape characters above 127, since they may
+%% form part of UTF-8 strings.
+
+escape(Bin) when binary(Bin) ->
+ escape(binary_to_list(Bin));
+escape(L) when is_list(L) ->
+ escape_char(lists:reverse(L), []).
+
+escape_char([$\\ | T], Acc) ->
+ escape_char(T, [$\\, $\\ | Acc]);
+escape_char([X | T], Acc) when X > 32, X /= 127 ->
+ escape_char(T, [X | Acc]);
+escape_char([X | T], Acc) ->
+ escape_char(T, [$\\, $0 + (X bsr 6), $0 + (X band 8#070 bsr 3),
+ $0 + (X band 7) | Acc]);
+escape_char([], Acc) ->
Acc.
-d2h(N) when N<10 -> N+$0;
-d2h(N) -> N+$a-10.
-
list_replace(Find, Replace, List) ->
[case X of Find -> Replace; _ -> X end || X <- List].
diff --git a/src/rabbit_dialyzer.erl b/src/rabbit_dialyzer.erl
new file mode 100644
index 0000000000..23e6fc4432
--- /dev/null
+++ b/src/rabbit_dialyzer.erl
@@ -0,0 +1,91 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2009 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_dialyzer).
+-include("rabbit.hrl").
+
+-export([create_basic_plt/1, add_to_plt/2, dialyze_files/2, halt_with_code/1]).
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-spec(create_basic_plt/1 :: (string()) -> 'ok').
+-spec(add_to_plt/2 :: (string(), string()) -> 'ok').
+-spec(dialyze_files/2 :: (string(), string()) -> 'ok').
+-spec(halt_with_code/1 :: (atom()) -> no_return()).
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
+create_basic_plt(BasicPltPath) ->
+ OptsRecord = dialyzer_options:build(
+ [{analysis_type, plt_build},
+ {output_plt, BasicPltPath},
+ {files_rec, otp_apps_dependencies_paths()}]),
+ dialyzer_cl:start(OptsRecord),
+ ok.
+
+add_to_plt(PltPath, FilesString) ->
+ {ok, Files} = regexp:split(FilesString, " "),
+ DialyzerWarnings = dialyzer:run([{analysis_type, plt_add},
+ {init_plt, PltPath},
+ {output_plt, PltPath},
+ {files, Files}]),
+ print_warnings(DialyzerWarnings),
+ ok.
+
+dialyze_files(PltPath, ModifiedFiles) ->
+ {ok, Files} = regexp:split(ModifiedFiles, " "),
+ DialyzerWarnings = dialyzer:run([{init_plt, PltPath},
+ {files, Files}]),
+ case DialyzerWarnings of
+ [] -> io:format("~nOk~n"),
+ ok;
+ _ -> io:format("~nFAILED with the following warnings:~n"),
+ print_warnings(DialyzerWarnings),
+ fail
+ end.
+
+print_warnings(Warnings) ->
+ [io:format("~s", [dialyzer:format_warning(W)]) || W <- Warnings],
+ io:format("~n"),
+ ok.
+
+otp_apps_dependencies_paths() ->
+ [code:lib_dir(App, ebin) ||
+ App <- [kernel, stdlib, sasl, mnesia, os_mon, ssl, eunit, tools]].
+
+halt_with_code(ok) ->
+ halt();
+halt_with_code(fail) ->
+ halt(1).
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index f17ee2f530..37a1357d41 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -61,7 +61,7 @@
'exchange_not_found' |
'exchange_and_queue_not_found'}).
-spec(recover/0 :: () -> 'ok').
--spec(declare/4 :: (exchange_name(), exchange_type(), bool(), amqp_table()) -> exchange()).
+-spec(declare/4 :: (exchange_name(), exchange_type(), boolean(), amqp_table()) -> exchange()).
-spec(check_type/1 :: (binary()) -> atom()).
-spec(assert_type/2 :: (exchange(), atom()) -> 'ok').
-spec(lookup/1 :: (exchange_name()) -> {'ok', exchange()} | not_found()).
@@ -82,9 +82,9 @@
[{exchange_name(), queue_name(), routing_key(), amqp_table()}]).
-spec(delete_queue_bindings/1 :: (queue_name()) -> 'ok').
-spec(delete_transient_queue_bindings/1 :: (queue_name()) -> 'ok').
--spec(topic_matches/2 :: (binary(), binary()) -> bool()).
--spec(headers_match/2 :: (amqp_table(), amqp_table()) -> bool()).
--spec(delete/2 :: (exchange_name(), bool()) ->
+-spec(topic_matches/2 :: (binary(), binary()) -> boolean()).
+-spec(headers_match/2 :: (amqp_table(), amqp_table()) -> boolean()).
+-spec(delete/2 :: (exchange_name(), boolean()) ->
'ok' | not_found() | {'error', 'in_use'}).
-spec(list_queue_bindings/1 :: (queue_name()) ->
[{exchange_name(), routing_key(), amqp_table()}]).
diff --git a/src/rabbit_guid.erl b/src/rabbit_guid.erl
index 2be005034e..b789fbd1e0 100644
--- a/src/rabbit_guid.erl
+++ b/src/rabbit_guid.erl
@@ -42,6 +42,7 @@
terminate/2, code_change/3]).
-define(SERVER, ?MODULE).
+-define(SERIAL_FILENAME, "rabbit_serial").
-record(state, {serial}).
@@ -59,17 +60,28 @@
%%----------------------------------------------------------------------------
start_link() ->
- %% The persister can get heavily loaded, and we don't want that to
- %% impact guid generation. We therefore keep the serial in a
- %% separate process rather than calling rabbit_persister:serial/0
- %% directly in the functions below.
gen_server:start_link({local, ?SERVER}, ?MODULE,
- [rabbit_persister:serial()], []).
+ [update_disk_serial()], []).
+
+update_disk_serial() ->
+ Filename = filename:join(rabbit_mnesia:dir(), ?SERIAL_FILENAME),
+ Serial = case rabbit_misc:read_term_file(Filename) of
+ {ok, [Num]} -> Num;
+ {error, enoent} -> rabbit_persister:serial();
+ {error, Reason} ->
+ throw({error, {cannot_read_serial_file, Filename, Reason}})
+ end,
+ case rabbit_misc:write_term_file(Filename, [Serial + 1]) of
+ ok -> ok;
+ {error, Reason1} ->
+ throw({error, {cannot_write_serial_file, Filename, Reason1}})
+ end,
+ Serial.
%% generate a guid that is monotonically increasing per process.
%%
%% The id is only unique within a single cluster and as long as the
-%% persistent message store hasn't been deleted.
+%% serial store hasn't been deleted.
guid() ->
%% We don't use erlang:now() here because a) it may return
%% duplicates when the system clock has been rewound prior to a
@@ -77,7 +89,7 @@ guid() ->
%% now() to move ahead of the system time), and b) it is really
%% slow since it takes a global lock and makes a system call.
%%
- %% rabbit_persister:serial/0, in combination with self/0 (which
+ %% A persisted serial number, in combination with self/0 (which
%% includes the node name) uniquely identifies a process in space
%% and time. We combine that with a process-local counter to give
%% us a GUID that is monotonically increasing per process.
diff --git a/src/rabbit_heartbeat.erl b/src/rabbit_heartbeat.erl
index 0a68c9adad..ed0066fe07 100644
--- a/src/rabbit_heartbeat.erl
+++ b/src/rabbit_heartbeat.erl
@@ -53,7 +53,7 @@ start_heartbeat(Sock, TimeoutSec) ->
spawn_link(fun () -> heartbeater(Sock, TimeoutSec * 1000 div 2,
send_oct, 0,
fun () ->
- catch gen_tcp:send(Sock, rabbit_binary_generator:build_heartbeat_frame()),
+ catch rabbit_net:send(Sock, rabbit_binary_generator:build_heartbeat_frame()),
continue
end,
erlang:monitor(process, Parent)) end),
@@ -73,7 +73,7 @@ heartbeater(Sock, TimeoutMillisec, StatName, Threshold, Handler, MonitorRef) ->
{'DOWN', MonitorRef, process, _Object, _Info} -> ok;
Other -> exit({unexpected_message, Other})
after TimeoutMillisec ->
- case inet:getstat(Sock, [StatName]) of
+ case rabbit_net:getstat(Sock, [StatName]) of
{ok, [{StatName, NewStatVal}]} ->
if NewStatVal =/= StatVal ->
F({NewStatVal, 0});
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index 9f3dcbd071..087a9f64d9 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -47,7 +47,7 @@
-spec(start_link/1 :: (pid()) -> pid()).
-spec(shutdown/1 :: (maybe_pid()) -> 'ok').
-spec(limit/2 :: (maybe_pid(), non_neg_integer()) -> 'ok').
--spec(can_send/3 :: (maybe_pid(), pid(), bool()) -> bool()).
+-spec(can_send/3 :: (maybe_pid(), pid(), boolean()) -> boolean()).
-spec(ack/2 :: (maybe_pid(), non_neg_integer()) -> 'ok').
-spec(register/2 :: (maybe_pid(), pid()) -> 'ok').
-spec(unregister/2 :: (maybe_pid(), pid()) -> 'ok').
diff --git a/src/rabbit_load.erl b/src/rabbit_load.erl
index 7bf85347fb..6ef638cb59 100644
--- a/src/rabbit_load.erl
+++ b/src/rabbit_load.erl
@@ -41,7 +41,7 @@
-ifdef(use_specs).
-type(erlang_node() :: atom()).
--type(load() :: {{non_neg_integer(), float()}, erlang_node()}).
+-type(load() :: {{non_neg_integer(), integer() | 'unknown'}, erlang_node()}).
-spec(local_load/0 :: () -> load()).
-spec(remote_loads/0 :: () -> [load()]).
-spec(pick/0 :: () -> erlang_node()).
@@ -52,8 +52,11 @@
local_load() ->
LoadAvg = case whereis(cpu_sup) of
- undefined -> 0.0;
- _Other -> cpu_sup:avg1()
+ undefined -> unknown;
+ _ -> case cpu_sup:avg1() of
+ L when is_integer(L) -> L;
+ {error, timeout} -> unknown
+ end
end,
{{statistics(run_queue), LoadAvg}, node()}.
@@ -65,8 +68,12 @@ remote_loads() ->
pick() ->
RemoteLoads = remote_loads(),
{{RunQ, LoadAvg}, Node} = local_load(),
- %% add bias towards current node
- AdjustedLoadAvg = LoadAvg * ?FUDGE_FACTOR,
+ %% add bias towards current node; we rely on Erlang's term order
+ %% of SomeFloat < local_unknown < unknown.
+ AdjustedLoadAvg = case LoadAvg of
+ unknown -> local_unknown;
+ _ -> LoadAvg * ?FUDGE_FACTOR
+ end,
Loads = [{{RunQ, AdjustedLoadAvg}, Node} | RemoteLoads],
{_, SelectedNode} = lists:min(Loads),
SelectedNode.
diff --git a/src/rabbit_memsup.erl b/src/rabbit_memsup.erl
new file mode 100644
index 0000000000..b0d57cb27e
--- /dev/null
+++ b/src/rabbit_memsup.erl
@@ -0,0 +1,142 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2009 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_memsup).
+
+-behaviour(gen_server).
+
+-export([start_link/1]).
+
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+-export([update/0]).
+
+-record(state, {memory_fraction,
+ timeout,
+ timer,
+ mod,
+ mod_state,
+ alarmed
+ }).
+
+-define(SERVER, memsup). %% must be the same as the standard memsup
+
+-define(DEFAULT_MEMORY_CHECK_INTERVAL, 1000).
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-spec(start_link/1 :: (atom()) -> {'ok', pid()} | 'ignore' | {'error', any()}).
+-spec(update/0 :: () -> 'ok').
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
+start_link(Args) ->
+ gen_server:start_link({local, ?SERVER}, ?MODULE, [Args], []).
+
+update() ->
+ gen_server:cast(?SERVER, update).
+
+%%----------------------------------------------------------------------------
+
+init([Mod]) ->
+ Fraction = os_mon:get_env(memsup, system_memory_high_watermark),
+ TRef = start_timer(?DEFAULT_MEMORY_CHECK_INTERVAL),
+ InitState = Mod:init(),
+ State = #state { memory_fraction = Fraction,
+ timeout = ?DEFAULT_MEMORY_CHECK_INTERVAL,
+ timer = TRef,
+ mod = Mod,
+ mod_state = InitState,
+ alarmed = false },
+ {ok, internal_update(State)}.
+
+start_timer(Timeout) ->
+ {ok, TRef} = timer:apply_interval(Timeout, ?MODULE, update, []),
+ TRef.
+
+%% Export the same API as the real memsup. Note that
+%% get_sysmem_high_watermark gives an int in the range 0 - 100, while
+%% set_sysmem_high_watermark takes a float in the range 0.0 - 1.0.
+handle_call(get_sysmem_high_watermark, _From, State) ->
+ {reply, trunc(100 * State#state.memory_fraction), State};
+
+handle_call({set_sysmem_high_watermark, Float}, _From, State) ->
+ {reply, ok, State#state{memory_fraction = Float}};
+
+handle_call(get_check_interval, _From, State) ->
+ {reply, State#state.timeout, State};
+
+handle_call({set_check_interval, Timeout}, _From, State) ->
+ {ok, cancel} = timer:cancel(State#state.timer),
+ {reply, ok, State#state{timeout = Timeout, timer = start_timer(Timeout)}};
+
+handle_call(get_memory_data, _From,
+ State = #state { mod = Mod, mod_state = ModState }) ->
+ {reply, Mod:get_memory_data(ModState), State};
+
+handle_call(_Request, _From, State) ->
+ {noreply, State}.
+
+handle_cast(update, State) ->
+ {noreply, internal_update(State)};
+
+handle_cast(_Request, State) ->
+ {noreply, State}.
+
+handle_info(_Info, State) ->
+ {noreply, State}.
+
+terminate(_Reason, _State) ->
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+internal_update(State = #state { memory_fraction = MemoryFraction,
+ alarmed = Alarmed,
+ mod = Mod, mod_state = ModState }) ->
+ ModState1 = Mod:update(ModState),
+ {MemTotal, MemUsed, _BigProc} = Mod:get_memory_data(ModState1),
+ NewAlarmed = MemUsed / MemTotal > MemoryFraction,
+ case {Alarmed, NewAlarmed} of
+ {false, true} ->
+ alarm_handler:set_alarm({system_memory_high_watermark, []});
+ {true, false} ->
+ alarm_handler:clear_alarm(system_memory_high_watermark);
+ _ ->
+ ok
+ end,
+ State #state { mod_state = ModState1, alarmed = NewAlarmed }.
diff --git a/src/rabbit_memsup_darwin.erl b/src/rabbit_memsup_darwin.erl
new file mode 100644
index 0000000000..3de2d8430e
--- /dev/null
+++ b/src/rabbit_memsup_darwin.erl
@@ -0,0 +1,88 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2009 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_memsup_darwin).
+
+-export([init/0, update/1, get_memory_data/1]).
+
+-record(state, {total_memory,
+ allocated_memory}).
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-type(state() :: #state { total_memory :: ('undefined' | non_neg_integer()),
+ allocated_memory :: ('undefined' | non_neg_integer())
+ }).
+
+-spec(init/0 :: () -> state()).
+-spec(update/1 :: (state()) -> state()).
+-spec(get_memory_data/1 :: (state()) -> {non_neg_integer(), non_neg_integer(),
+ ('undefined' | pid())}).
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
+init() ->
+ #state{total_memory = undefined,
+ allocated_memory = undefined}.
+
+update(State) ->
+ File = os:cmd("/usr/bin/vm_stat"),
+ Lines = string:tokens(File, "\n"),
+ Dict = dict:from_list(lists:map(fun parse_line/1, Lines)),
+ [PageSize, Inactive, Active, Free, Wired] =
+ [dict:fetch(Key, Dict) ||
+ Key <- [page_size, 'Pages inactive', 'Pages active', 'Pages free',
+ 'Pages wired down']],
+ MemTotal = PageSize * (Inactive + Active + Free + Wired),
+ MemUsed = PageSize * (Active + Wired),
+ State#state{total_memory = MemTotal, allocated_memory = MemUsed}.
+
+get_memory_data(State) ->
+ {State#state.total_memory, State#state.allocated_memory, undefined}.
+
+%%----------------------------------------------------------------------------
+
+%% A line looks like "Foo bar: 123456."
+parse_line(Line) ->
+ [Name, RHS | _Rest] = string:tokens(Line, ":"),
+ case Name of
+ "Mach Virtual Memory Statistics" ->
+ ["(page", "size", "of", PageSize, "bytes)"] =
+ string:tokens(RHS, " "),
+ {page_size, list_to_integer(PageSize)};
+ _ ->
+ [Value | _Rest1] = string:tokens(RHS, " ."),
+ {list_to_atom(Name), list_to_integer(Value)}
+ end.
diff --git a/src/rabbit_memsup_linux.erl b/src/rabbit_memsup_linux.erl
index ffdc7e9946..ca942d7caa 100644
--- a/src/rabbit_memsup_linux.erl
+++ b/src/rabbit_memsup_linux.erl
@@ -31,104 +31,44 @@
-module(rabbit_memsup_linux).
--behaviour(gen_server).
+-export([init/0, update/1, get_memory_data/1]).
--export([start_link/0]).
-
--export([init/1, handle_call/3, handle_cast/2, handle_info/2,
- terminate/2, code_change/3]).
-
--export([update/0]).
-
--define(SERVER, memsup). %% must be the same as the standard memsup
-
--define(DEFAULT_MEMORY_CHECK_INTERVAL, 1000).
-
--record(state, {memory_fraction, alarmed, timeout, timer}).
+-record(state, {total_memory,
+ allocated_memory}).
%%----------------------------------------------------------------------------
-ifdef(use_specs).
--spec(start_link/0 :: () -> {'ok', pid()} | 'ignore' | {'error', any()}).
--spec(update/0 :: () -> 'ok').
-
--endif.
-
-%%----------------------------------------------------------------------------
-
-start_link() ->
- gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
+-type(state() :: #state { total_memory :: ('undefined' | non_neg_integer()),
+ allocated_memory :: ('undefined' | non_neg_integer())
+ }).
+-spec(init/0 :: () -> state()).
+-spec(update/1 :: (state()) -> state()).
+-spec(get_memory_data/1 :: (state()) -> {non_neg_integer(), non_neg_integer(),
+ ('undefined' | pid())}).
-update() ->
- gen_server:cast(?SERVER, update).
+-endif.
%%----------------------------------------------------------------------------
-init(_Args) ->
- Fraction = os_mon:get_env(memsup, system_memory_high_watermark),
- TRef = start_timer(?DEFAULT_MEMORY_CHECK_INTERVAL),
- {ok, #state{alarmed = false,
- memory_fraction = Fraction,
- timeout = ?DEFAULT_MEMORY_CHECK_INTERVAL,
- timer = TRef}}.
-
-start_timer(Timeout) ->
- {ok, TRef} = timer:apply_interval(Timeout, ?MODULE, update, []),
- TRef.
-
-%% Export the same API as the real memsup. Note that
-%% get_sysmem_high_watermark gives an int in the range 0 - 100, while
-%% set_sysmem_high_watermark takes a float in the range 0.0 - 1.0.
-handle_call(get_sysmem_high_watermark, _From, State) ->
- {reply, trunc(100 * State#state.memory_fraction), State};
-
-handle_call({set_sysmem_high_watermark, Float}, _From, State) ->
- {reply, ok, State#state{memory_fraction = Float}};
+init() ->
+ #state{total_memory = undefined,
+ allocated_memory = undefined}.
-handle_call(get_check_interval, _From, State) ->
- {reply, State#state.timeout, State};
-
-handle_call({set_check_interval, Timeout}, _From, State) ->
- {ok, cancel} = timer:cancel(State#state.timer),
- {reply, ok, State#state{timeout = Timeout, timer = start_timer(Timeout)}};
-
-handle_call(_Request, _From, State) ->
- {noreply, State}.
-
-handle_cast(update, State = #state{alarmed = Alarmed,
- memory_fraction = MemoryFraction}) ->
+update(State) ->
File = read_proc_file("/proc/meminfo"),
Lines = string:tokens(File, "\n"),
Dict = dict:from_list(lists:map(fun parse_line/1, Lines)),
- MemTotal = dict:fetch('MemTotal', Dict),
- MemUsed = MemTotal
- - dict:fetch('MemFree', Dict)
- - dict:fetch('Buffers', Dict)
- - dict:fetch('Cached', Dict),
- NewAlarmed = MemUsed / MemTotal > MemoryFraction,
- case {Alarmed, NewAlarmed} of
- {false, true} ->
- alarm_handler:set_alarm({system_memory_high_watermark, []});
- {true, false} ->
- alarm_handler:clear_alarm(system_memory_high_watermark);
- _ ->
- ok
- end,
- {noreply, State#state{alarmed = NewAlarmed}};
-
-handle_cast(_Request, State) ->
- {noreply, State}.
-
-handle_info(_Info, State) ->
- {noreply, State}.
-
-terminate(_Reason, _State) ->
- ok.
-
-code_change(_OldVsn, State, _Extra) ->
- {ok, State}.
+ [MemTotal, MemFree, Buffers, Cached] =
+ [dict:fetch(Key, Dict) ||
+ Key <- ['MemTotal', 'MemFree', 'Buffers', 'Cached']],
+ MemUsed = MemTotal - MemFree - Buffers - Cached,
+ State#state{total_memory = MemTotal, allocated_memory = MemUsed}.
+
+get_memory_data(State) ->
+ {State#state.total_memory, State#state.allocated_memory, undefined}.
%%----------------------------------------------------------------------------
@@ -152,5 +92,10 @@ read_proc_file(IoDevice, Acc) ->
%% A line looks like "FooBar: 123456 kB"
parse_line(Line) ->
- [Name, Value | _] = string:tokens(Line, ": "),
- {list_to_atom(Name), list_to_integer(Value)}.
+ [Name, RHS | _Rest] = string:tokens(Line, ":"),
+ [Value | UnitsRest] = string:tokens(RHS, " "),
+ Value1 = case UnitsRest of
+ [] -> list_to_integer(Value); %% no units
+ ["kB"] -> list_to_integer(Value) * 1024
+ end,
+ {list_to_atom(Name), Value1}.
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index abf4c7ccfa..b20e9a86b6 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -35,7 +35,8 @@
-include_lib("kernel/include/file.hrl").
-export([method_record_type/1, polite_pause/0, polite_pause/1]).
--export([die/1, frame_error/2, protocol_error/3, protocol_error/4]).
+-export([die/1, frame_error/2, amqp_error/4,
+ protocol_error/3, protocol_error/4]).
-export([not_found/1]).
-export([get_config/1, get_config/2, set_config/2]).
-export([dirty_read/1]).
@@ -46,13 +47,15 @@
-export([with_user/2, with_vhost/2, with_user_and_vhost/3]).
-export([execute_mnesia_transaction/1]).
-export([ensure_ok/2]).
--export([localnode/1, tcp_name/3]).
+-export([localnode/1, nodehost/1, cookie_hash/0, tcp_name/3]).
-export([intersperse/2, upmap/2, map_in_order/2]).
-export([table_foreach/2]).
-export([dirty_read_all/1, dirty_foreach_key/2, dirty_dump_log/1]).
+-export([read_term_file/1, write_term_file/2]).
-export([append_file/2, ensure_parent_dirs_exist/1]).
-export([format_stderr/2]).
-export([start_applications/1, stop_applications/1]).
+-export([unfold/2, ceil/1]).
-import(mnesia).
-import(lists).
@@ -65,15 +68,16 @@
-include_lib("kernel/include/inet.hrl").
+-type(ok_or_error() :: 'ok' | {'error', any()}).
+
-spec(method_record_type/1 :: (tuple()) -> atom()).
-spec(polite_pause/0 :: () -> 'done').
-spec(polite_pause/1 :: (non_neg_integer()) -> 'done').
-spec(die/1 :: (atom()) -> no_return()).
-spec(frame_error/2 :: (atom(), binary()) -> no_return()).
--spec(protocol_error/3 ::
- (atom() | amqp_error(), string(), [any()]) -> no_return()).
--spec(protocol_error/4 ::
- (atom() | amqp_error(), string(), [any()], atom()) -> no_return()).
+-spec(amqp_error/4 :: (atom(), string(), [any()], atom()) -> amqp_error()).
+-spec(protocol_error/3 :: (atom(), string(), [any()]) -> no_return()).
+-spec(protocol_error/4 :: (atom(), string(), [any()], atom()) -> no_return()).
-spec(not_found/1 :: (r(atom())) -> no_return()).
-spec(get_config/1 :: (atom()) -> {'ok', any()} | not_found()).
-spec(get_config/2 :: (atom(), A) -> A).
@@ -88,9 +92,9 @@
-spec(r_arg/4 :: (vhost() | r(atom()), K, amqp_table(), binary()) ->
undefined | r(K) when is_subtype(K, atom())).
-spec(rs/1 :: (r(atom())) -> string()).
--spec(enable_cover/0 :: () -> 'ok' | {'error', any()}).
+-spec(enable_cover/0 :: () -> ok_or_error()).
-spec(report_cover/0 :: () -> 'ok').
--spec(enable_cover/1 :: (string()) -> 'ok' | {'error', any()}).
+-spec(enable_cover/1 :: (string()) -> ok_or_error()).
-spec(report_cover/1 :: (string()) -> 'ok').
-spec(throw_on_error/2 ::
(atom(), thunk({error, any()} | {ok, A} | A)) -> A).
@@ -100,8 +104,10 @@
-spec(with_vhost/2 :: (vhost(), thunk(A)) -> A).
-spec(with_user_and_vhost/3 :: (username(), vhost(), thunk(A)) -> A).
-spec(execute_mnesia_transaction/1 :: (thunk(A)) -> A).
--spec(ensure_ok/2 :: ('ok' | {'error', any()}, atom()) -> 'ok').
+-spec(ensure_ok/2 :: (ok_or_error(), atom()) -> 'ok').
-spec(localnode/1 :: (atom()) -> erlang_node()).
+-spec(nodehost/1 :: (erlang_node()) -> string()).
+-spec(cookie_hash/0 :: () -> string()).
-spec(tcp_name/3 :: (atom(), ip_address(), ip_port()) -> atom()).
-spec(intersperse/2 :: (A, [A]) -> [A]).
-spec(upmap/2 :: (fun ((A) -> B), [A]) -> [B]).
@@ -110,12 +116,16 @@
-spec(dirty_read_all/1 :: (atom()) -> [any()]).
-spec(dirty_foreach_key/2 :: (fun ((any()) -> any()), atom()) ->
'ok' | 'aborted').
--spec(dirty_dump_log/1 :: (string()) -> 'ok' | {'error', any()}).
--spec(append_file/2 :: (string(), string()) -> 'ok' | {'error', any()}).
+-spec(dirty_dump_log/1 :: (string()) -> ok_or_error()).
+-spec(read_term_file/1 :: (string()) -> {'ok', [any()]} | {'error', any()}).
+-spec(write_term_file/2 :: (string(), [any()]) -> ok_or_error()).
+-spec(append_file/2 :: (string(), string()) -> ok_or_error()).
-spec(ensure_parent_dirs_exist/1 :: (string()) -> 'ok').
-spec(format_stderr/2 :: (string(), [any()]) -> 'ok').
-spec(start_applications/1 :: ([atom()]) -> 'ok').
-spec(stop_applications/1 :: ([atom()]) -> 'ok').
+-spec(unfold/2 :: (fun ((A) -> ({'true', B, A} | 'false')), A) -> {[B], A}).
+-spec(ceil/1 :: (number()) -> number()).
-endif.
@@ -136,15 +146,17 @@ die(Error) ->
protocol_error(Error, "~w", [Error]).
frame_error(MethodName, BinaryFields) ->
- protocol_error(frame_error, "cannot decode ~w",
- [BinaryFields], MethodName).
+ protocol_error(frame_error, "cannot decode ~w", [BinaryFields], MethodName).
+
+amqp_error(Name, ExplanationFormat, Params, Method) ->
+ Explanation = lists:flatten(io_lib:format(ExplanationFormat, Params)),
+ #amqp_error{name = Name, explanation = Explanation, method = Method}.
-protocol_error(Error, Explanation, Params) ->
- protocol_error(Error, Explanation, Params, none).
+protocol_error(Name, ExplanationFormat, Params) ->
+ protocol_error(Name, ExplanationFormat, Params, none).
-protocol_error(Error, Explanation, Params, Method) ->
- CompleteExplanation = lists:flatten(io_lib:format(Explanation, Params)),
- exit({amqp, Error, CompleteExplanation, Method}).
+protocol_error(Name, ExplanationFormat, Params, Method) ->
+ exit(amqp_error(Name, ExplanationFormat, Params, Method)).
not_found(R) -> protocol_error(not_found, "no ~s", [rs(R)]).
@@ -297,11 +309,15 @@ ensure_ok(ok, _) -> ok;
ensure_ok({error, Reason}, ErrorTag) -> throw({error, {ErrorTag, Reason}}).
localnode(Name) ->
+ list_to_atom(lists:append([atom_to_list(Name), "@", nodehost(node())])).
+
+nodehost(Node) ->
%% This is horrible, but there doesn't seem to be a way to split a
%% nodename into its constituent parts.
- list_to_atom(lists:append(atom_to_list(Name),
- lists:dropwhile(fun (E) -> E =/= $@ end,
- atom_to_list(node())))).
+ tl(lists:dropwhile(fun (E) -> E =/= $@ end, atom_to_list(Node))).
+
+cookie_hash() ->
+ ssl_base64:encode(erlang:md5(atom_to_list(erlang:get_cookie()))).
tcp_name(Prefix, IPAddress, Port)
when is_atom(Prefix) andalso is_number(Port) ->
@@ -360,7 +376,9 @@ dirty_foreach_key1(F, TableName, K) ->
end.
dirty_dump_log(FileName) ->
- {ok, LH} = disk_log:open([{name, dirty_dump_log}, {mode, read_only}, {file, FileName}]),
+ {ok, LH} = disk_log:open([{name, dirty_dump_log},
+ {mode, read_only},
+ {file, FileName}]),
dirty_dump_log1(LH, disk_log:chunk(LH, start)),
disk_log:close(LH).
@@ -374,6 +392,12 @@ dirty_dump_log1(LH, {K, Terms, BadBytes}) ->
dirty_dump_log1(LH, disk_log:chunk(LH, K)).
+read_term_file(File) -> file:consult(File).
+
+write_term_file(File, Terms) ->
+ file:write_file(File, list_to_binary([io_lib:format("~w.~n", [Term]) ||
+ Term <- Terms])).
+
append_file(File, Suffix) ->
case file:read_file_info(File) of
{ok, FInfo} -> append_file(File, FInfo#file_info.size, Suffix);
@@ -444,3 +468,18 @@ stop_applications(Apps) ->
cannot_stop_application,
Apps).
+unfold(Fun, Init) ->
+ unfold(Fun, [], Init).
+
+unfold(Fun, Acc, Init) ->
+ case Fun(Init) of
+ {true, E, I} -> unfold(Fun, [E|Acc], I);
+ false -> {Acc, Init}
+ end.
+
+ceil(N) ->
+ T = trunc(N),
+ case N - T of
+ 0 -> N;
+ _ -> 1 + T
+ end.
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index 575ecb0adc..c4d5aac684 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -50,7 +50,7 @@
-spec(dir/0 :: () -> string()).
-spec(ensure_mnesia_dir/0 :: () -> 'ok').
-spec(init/0 :: () -> 'ok').
--spec(is_db_empty/0 :: () -> bool()).
+-spec(is_db_empty/0 :: () -> boolean()).
-spec(cluster/1 :: ([erlang_node()]) -> 'ok').
-spec(reset/0 :: () -> 'ok').
-spec(force_reset/0 :: () -> 'ok').
@@ -149,6 +149,11 @@ table_definitions() ->
table_names() ->
[Tab || {Tab, _} <- table_definitions()].
+replicated_table_names() ->
+ [Tab || {Tab, Attrs} <- table_definitions(),
+ not lists:member({local_content, true}, Attrs)
+ ].
+
dir() -> mnesia:system_info(directory).
ensure_mnesia_dir() ->
@@ -192,28 +197,16 @@ cluster_nodes_config_filename() ->
create_cluster_nodes_config(ClusterNodes) ->
FileName = cluster_nodes_config_filename(),
- Handle = case file:open(FileName, [write]) of
- {ok, Device} -> Device;
- {error, Reason} ->
- throw({error, {cannot_create_cluster_nodes_config,
- FileName, Reason}})
- end,
- try
- ok = io:write(Handle, ClusterNodes),
- ok = io:put_chars(Handle, [$.])
- after
- case file:close(Handle) of
- ok -> ok;
- {error, Reason1} ->
- throw({error, {cannot_close_cluster_nodes_config,
- FileName, Reason1}})
- end
- end,
- ok.
+ case rabbit_misc:write_term_file(FileName, [ClusterNodes]) of
+ ok -> ok;
+ {error, Reason} ->
+ throw({error, {cannot_create_cluster_nodes_config,
+ FileName, Reason}})
+ end.
read_cluster_nodes_config() ->
FileName = cluster_nodes_config_filename(),
- case file:consult(FileName) of
+ case rabbit_misc:read_term_file(FileName) of
{ok, [ClusterNodes]} -> ClusterNodes;
{error, enoent} ->
case application:get_env(cluster_config) of
@@ -250,12 +243,10 @@ delete_cluster_nodes_config() ->
%% standalone disk node, or disk or ram node connected to the
%% specified cluster nodes.
init_db(ClusterNodes) ->
- WasDiskNode = mnesia:system_info(use_dir),
- IsDiskNode = ClusterNodes == [] orelse
- lists:member(node(), ClusterNodes),
case mnesia:change_config(extra_db_nodes, ClusterNodes -- [node()]) of
{ok, []} ->
- if WasDiskNode and IsDiskNode ->
+ case mnesia:system_info(use_dir) of
+ true ->
case check_schema_integrity() of
ok ->
ok;
@@ -270,22 +261,18 @@ init_db(ClusterNodes) ->
ok = move_db(),
ok = create_schema()
end;
- WasDiskNode ->
- throw({error, {cannot_convert_disk_node_to_ram_node,
- ClusterNodes}});
- IsDiskNode ->
- ok = create_schema();
- true ->
- throw({error, {unable_to_contact_cluster_nodes,
- ClusterNodes}})
+ false ->
+ ok = create_schema()
end;
{ok, [_|_]} ->
- ok = wait_for_tables(),
- ok = create_local_table_copies(
- case IsDiskNode of
- true -> disc;
- false -> ram
- end);
+ IsDiskNode = ClusterNodes == [] orelse
+ lists:member(node(), ClusterNodes),
+ ok = wait_for_replicated_tables(),
+ ok = create_local_table_copy(schema, disc_copies),
+ ok = create_local_table_copies(case IsDiskNode of
+ true -> disc;
+ false -> ram
+ end);
{error, Reason} ->
%% one reason we may end up here is if we try to join
%% nodes together that are currently running standalone or
@@ -336,40 +323,36 @@ create_tables() ->
table_definitions()),
ok.
+table_has_copy_type(TabDef, DiscType) ->
+ lists:member(node(), proplists:get_value(DiscType, TabDef, [])).
+
create_local_table_copies(Type) ->
- ok = if Type /= ram -> create_local_table_copy(schema, disc_copies);
- true -> ok
- end,
lists:foreach(
fun({Tab, TabDef}) ->
- HasDiscCopies =
- lists:keymember(disc_copies, 1, TabDef),
- HasDiscOnlyCopies =
- lists:keymember(disc_only_copies, 1, TabDef),
+ HasDiscCopies = table_has_copy_type(TabDef, disc_copies),
+ HasDiscOnlyCopies = table_has_copy_type(TabDef, disc_only_copies),
+ LocalTab = proplists:get_bool(local_content, TabDef),
StorageType =
- case Type of
- disc ->
+ if
+ Type =:= disc orelse LocalTab ->
if
- HasDiscCopies -> disc_copies;
+ HasDiscCopies -> disc_copies;
HasDiscOnlyCopies -> disc_only_copies;
- true -> ram_copies
+ true -> ram_copies
end;
%% unused code - commented out to keep dialyzer happy
-%% disc_only ->
+%% Type =:= disc_only ->
%% if
%% HasDiscCopies or HasDiscOnlyCopies ->
%% disc_only_copies;
%% true -> ram_copies
%% end;
- ram ->
+ Type =:= ram ->
ram_copies
end,
ok = create_local_table_copy(Tab, StorageType)
end,
table_definitions()),
- ok = if Type == ram -> create_local_table_copy(schema, ram_copies);
- true -> ok
- end,
ok.
create_local_table_copy(Tab, Type) ->
@@ -384,10 +367,14 @@ create_local_table_copy(Tab, Type) ->
end,
ok.
-wait_for_tables() ->
+wait_for_replicated_tables() -> wait_for_tables(replicated_table_names()).
+
+wait_for_tables() -> wait_for_tables(table_names()).
+
+wait_for_tables(TableNames) ->
case check_schema_integrity() of
ok ->
- case mnesia:wait_for_tables(table_names(), 30000) of
+ case mnesia:wait_for_tables(TableNames, 30000) of
ok -> ok;
{timeout, BadTabs} ->
throw({error, {timeout_waiting_for_tables, BadTabs}});
diff --git a/src/rabbit_multi.erl b/src/rabbit_multi.erl
index d91975359a..b1cc4d028f 100644
--- a/src/rabbit_multi.erl
+++ b/src/rabbit_multi.erl
@@ -114,12 +114,13 @@ action(status, [], RpcTimeout) ->
io:format("Status of all running nodes...~n", []),
call_all_nodes(
fun({Node, Pid}) ->
- Status = rpc:call(Node, rabbit, status, [], RpcTimeout),
+ RabbitRunning =
+ case is_rabbit_running(Node, RpcTimeout) of
+ false -> not_running;
+ true -> running
+ end,
io:format("Node '~p' with Pid ~p: ~p~n",
- [Node, Pid, case parse_status(Status) of
- false -> not_running;
- true -> running
- end])
+ [Node, Pid, RabbitRunning])
end);
action(stop_all, [], RpcTimeout) ->
@@ -197,7 +198,7 @@ start_node(NodeName, NodePort, RpcTimeout) ->
wait_for_rabbit_to_start(_ , RpcTimeout, _) when RpcTimeout < 0 ->
false;
wait_for_rabbit_to_start(Node, RpcTimeout, Port) ->
- case parse_status(rpc:call(Node, rabbit, status, [])) of
+ case is_rabbit_running(Node, RpcTimeout) of
true -> true;
false -> receive
{'EXIT', Port, PosixCode} ->
@@ -211,22 +212,20 @@ wait_for_rabbit_to_start(Node, RpcTimeout, Port) ->
run_cmd(FullPath) ->
erlang:open_port({spawn, FullPath}, [nouse_stdio]).
-parse_status({badrpc, _}) ->
- false;
-
-parse_status(Status) ->
- case lists:keysearch(running_applications, 1, Status) of
- {value, {running_applications, Apps}} ->
- lists:keymember(rabbit, 1, Apps);
- _ ->
- false
+is_rabbit_running(Node, RpcTimeout) ->
+ case rpc:call(Node, rabbit, status, [], RpcTimeout) of
+ {badrpc, _} -> false;
+ Status -> case proplists:get_value(running_applications, Status) of
+ undefined -> false;
+ Apps -> lists:keymember(rabbit, 1, Apps)
+ end
end.
with_os(Handlers) ->
{OsFamily, _} = os:type(),
- case lists:keysearch(OsFamily, 1, Handlers) of
- {value, {_, Handler}} -> Handler();
- false -> throw({unsupported_os, OsFamily})
+ case proplists:get_value(OsFamily, Handlers) of
+ undefined -> throw({unsupported_os, OsFamily});
+ Handler -> Handler()
end.
script_filename() ->
diff --git a/src/rabbit_net.erl b/src/rabbit_net.erl
new file mode 100644
index 0000000000..a5ccc8e9ae
--- /dev/null
+++ b/src/rabbit_net.erl
@@ -0,0 +1,132 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2009 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_net).
+-include("rabbit.hrl").
+-include_lib("kernel/include/inet.hrl").
+
+-export([async_recv/3, close/1, controlling_process/2,
+ getstat/2, peername/1, port_command/2,
+ send/2, sockname/1]).
+%%---------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-type(stat_option() ::
+ 'recv_cnt' | 'recv_max' | 'recv_avg' | 'recv_oct' | 'recv_dvi' |
+ 'send_cnt' | 'send_max' | 'send_avg' | 'send_oct' | 'send_pend').
+-type(error() :: {'error', any()}).
+
+-spec(async_recv/3 :: (socket(), integer(), timeout()) -> {'ok', any()}).
+-spec(close/1 :: (socket()) -> 'ok' | error()).
+-spec(controlling_process/2 :: (socket(), pid()) -> 'ok' | error()).
+-spec(port_command/2 :: (socket(), iolist()) -> 'true').
+-spec(send/2 :: (socket(), binary() | iolist()) -> 'ok' | error()).
+-spec(peername/1 :: (socket()) ->
+ {'ok', {ip_address(), non_neg_integer()}} | error()).
+-spec(sockname/1 :: (socket()) ->
+ {'ok', {ip_address(), non_neg_integer()}} | error()).
+-spec(getstat/2 :: (socket(), [stat_option()]) ->
+ {'ok', [{stat_option(), integer()}]} | error()).
+
+-endif.
+
+%%---------------------------------------------------------------------------
+
+
+async_recv(Sock, Length, Timeout) when is_record(Sock, ssl_socket) ->
+ Pid = self(),
+ Ref = make_ref(),
+
+ spawn(fun() -> Pid ! {inet_async, Sock, Ref,
+ ssl:recv(Sock#ssl_socket.ssl, Length, Timeout)}
+ end),
+
+ {ok, Ref};
+
+async_recv(Sock, Length, infinity) when is_port(Sock) ->
+ prim_inet:async_recv(Sock, Length, -1);
+
+async_recv(Sock, Length, Timeout) when is_port(Sock) ->
+ prim_inet:async_recv(Sock, Length, Timeout).
+
+close(Sock) when is_record(Sock, ssl_socket) ->
+ ssl:close(Sock#ssl_socket.ssl);
+
+close(Sock) when is_port(Sock) ->
+ gen_tcp:close(Sock).
+
+
+controlling_process(Sock, Pid) when is_record(Sock, ssl_socket) ->
+ ssl:controlling_process(Sock#ssl_socket.ssl, Pid);
+
+controlling_process(Sock, Pid) when is_port(Sock) ->
+ gen_tcp:controlling_process(Sock, Pid).
+
+
+getstat(Sock, Stats) when is_record(Sock, ssl_socket) ->
+ inet:getstat(Sock#ssl_socket.tcp, Stats);
+
+getstat(Sock, Stats) when is_port(Sock) ->
+ inet:getstat(Sock, Stats).
+
+
+peername(Sock) when is_record(Sock, ssl_socket) ->
+ ssl:peername(Sock#ssl_socket.ssl);
+
+peername(Sock) when is_port(Sock) ->
+ inet:peername(Sock).
+
+
+port_command(Sock, Data) when is_record(Sock, ssl_socket) ->
+ case ssl:send(Sock#ssl_socket.ssl, Data) of
+ ok ->
+ self() ! {inet_reply, Sock, ok},
+ true;
+ {error, Reason} ->
+ erlang:error(Reason)
+ end;
+
+port_command(Sock, Data) when is_port(Sock) ->
+ erlang:port_command(Sock, Data).
+
+send(Sock, Data) when is_record(Sock, ssl_socket) ->
+ ssl:send(Sock#ssl_socket.ssl, Data);
+
+send(Sock, Data) when is_port(Sock) ->
+ gen_tcp:send(Sock, Data).
+
+
+sockname(Sock) when is_record(Sock, ssl_socket) ->
+ ssl:sockname(Sock#ssl_socket.ssl);
+
+sockname(Sock) when is_port(Sock) ->
+ inet:sockname(Sock).
diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl
index 2dbd5a5af2..1bc17a324c 100644
--- a/src/rabbit_networking.erl
+++ b/src/rabbit_networking.erl
@@ -31,18 +31,28 @@
-module(rabbit_networking).
--export([start/0, start_tcp_listener/2, stop_tcp_listener/2,
- on_node_down/1, active_listeners/0, node_listeners/1,
- connections/0, connection_info/1, connection_info/2,
- connection_info_all/0, connection_info_all/1]).
+-export([start/0, start_tcp_listener/2, start_ssl_listener/3,
+ stop_tcp_listener/2, on_node_down/1, active_listeners/0,
+ node_listeners/1, connections/0, connection_info/1,
+ connection_info/2, connection_info_all/0,
+ connection_info_all/1]).
%%used by TCP-based transports, e.g. STOMP adapter
-export([check_tcp_listener_address/3]).
--export([tcp_listener_started/2, tcp_listener_stopped/2, start_client/1]).
+-export([tcp_listener_started/2, tcp_listener_stopped/2,
+ start_client/1, start_ssl_client/2]).
-include("rabbit.hrl").
-include_lib("kernel/include/inet.hrl").
+-define(RABBIT_TCP_OPTS, [
+ binary,
+ {packet, raw}, % no packaging
+ {reuseaddr, true}, % allow rebind without waiting
+ %% {nodelay, true}, % TCP_NODELAY - disable Nagle's alg.
+ %% {delay_send, true},
+ {exit_on_close, false}
+ ]).
%%----------------------------------------------------------------------------
-ifdef(use_specs).
@@ -52,6 +62,7 @@
-spec(start/0 :: () -> 'ok').
-spec(start_tcp_listener/2 :: (host(), ip_port()) -> 'ok').
+-spec(start_ssl_listener/3 :: (host(), ip_port(), [info()]) -> 'ok').
-spec(stop_tcp_listener/2 :: (host(), ip_port()) -> 'ok').
-spec(active_listeners/0 :: () -> [listener()]).
-spec(node_listeners/1 :: (erlang_node()) -> [listener()]).
@@ -90,27 +101,30 @@ check_tcp_listener_address(NamePrefix, Host, Port) ->
if is_integer(Port) andalso (Port >= 0) andalso (Port =< 65535) -> ok;
true -> error_logger:error_msg("invalid port ~p - not 0..65535~n",
[Port]),
- throw({error, invalid_port, Port})
+ throw({error, {invalid_port, Port}})
end,
Name = rabbit_misc:tcp_name(NamePrefix, IPAddress, Port),
{IPAddress, Name}.
start_tcp_listener(Host, Port) ->
- {IPAddress, Name} = check_tcp_listener_address(rabbit_tcp_listener_sup, Host, Port),
+ start_listener(Host, Port, "TCP Listener",
+ {?MODULE, start_client, []}).
+
+start_ssl_listener(Host, Port, SslOpts) ->
+ start_listener(Host, Port, "SSL Listener",
+ {?MODULE, start_ssl_client, [SslOpts]}).
+
+start_listener(Host, Port, Label, OnConnect) ->
+ {IPAddress, Name} =
+ check_tcp_listener_address(rabbit_tcp_listener_sup, Host, Port),
{ok,_} = supervisor:start_child(
rabbit_sup,
{Name,
{tcp_listener_sup, start_link,
- [IPAddress, Port,
- [binary,
- {packet, raw}, % no packaging
- {reuseaddr, true}, % allow rebind without waiting
- %% {nodelay, true}, % TCP_NODELAY - disable Nagle's alg.
- %% {delay_send, true},
- {exit_on_close, false}],
+ [IPAddress, Port, ?RABBIT_TCP_OPTS ,
{?MODULE, tcp_listener_started, []},
{?MODULE, tcp_listener_stopped, []},
- {?MODULE, start_client, []}]},
+ OnConnect, Label]},
transient, infinity, supervisor, [tcp_listener_sup]}),
ok.
@@ -148,10 +162,35 @@ on_node_down(Node) ->
start_client(Sock) ->
{ok, Child} = supervisor:start_child(rabbit_tcp_client_sup, []),
- ok = gen_tcp:controlling_process(Sock, Child),
+ ok = rabbit_net:controlling_process(Sock, Child),
Child ! {go, Sock},
Child.
+start_ssl_client(SslOpts, Sock) ->
+ case rabbit_net:peername(Sock) of
+ {ok, {PeerAddress, PeerPort}} ->
+ PeerIp = inet_parse:ntoa(PeerAddress),
+ case ssl:ssl_accept(Sock, SslOpts) of
+ {ok, SslSock} ->
+ rabbit_log:info("upgraded TCP connection "
+ "from ~s:~p to SSL~n",
+ [PeerIp, PeerPort]),
+ RabbitSslSock = #ssl_socket{tcp = Sock, ssl = SslSock},
+ start_client(RabbitSslSock);
+ {error, Reason} ->
+ gen_tcp:close(Sock),
+ rabbit_log:error("failed to upgrade TCP connection "
+ "from ~s:~p to SSL: ~n~p~n",
+ [PeerIp, PeerPort, Reason]),
+ {error, Reason}
+ end;
+ {error, Reason} ->
+ gen_tcp:close(Sock),
+ rabbit_log:error("failed to upgrade TCP connection to SSL: ~p~n",
+ [Reason]),
+ {error, Reason}
+ end.
+
connections() ->
[Pid || {_, Pid, _, _} <- supervisor:which_children(
rabbit_tcp_client_sup)].
diff --git a/src/rabbit_plugin_activator.erl b/src/rabbit_plugin_activator.erl
index 71278bfb2a..f28c4a6ec5 100644
--- a/src/rabbit_plugin_activator.erl
+++ b/src/rabbit_plugin_activator.erl
@@ -49,6 +49,8 @@ start() ->
UnpackedPluginDir = get_env(plugins_expand_dir, ?DefaultUnpackedPluginDir),
RabbitEBin = get_env(rabbit_ebin, ?DefaultRabbitEBin),
+ RootName = RabbitEBin ++ "/rabbit",
+
%% Unpack any .ez plugins
unpack_ez_plugins(PluginDir, UnpackedPluginDir),
@@ -60,15 +62,13 @@ start() ->
%% Build the entire set of dependencies - this will load the
%% applications along the way
AllApps = case catch sets:to_list(expand_dependencies(RequiredApps)) of
- {unknown_app, {App, Err}} ->
- io:format("ERROR: Failed to load application " ++
- "~s: ~p~n", [App, Err]),
- halt(1);
+ {failed_to_load_app, App, Err} ->
+ error("failed to load application ~s: ~p", [App, Err]);
AppList ->
AppList
end,
AppVersions = [determine_version(App) || App <- AllApps],
- {value, {rabbit, RabbitVersion}} = lists:keysearch(rabbit, 1, AppVersions),
+ {rabbit, RabbitVersion} = proplists:lookup(rabbit, AppVersions),
%% Build the overall release descriptor
RDesc = {release,
@@ -77,11 +77,11 @@ start() ->
AppVersions},
%% Write it out to ebin/rabbit.rel
- file:write_file(RabbitEBin ++ "/rabbit.rel",
- io_lib:format("~p.~n", [RDesc])),
+ file:write_file(RootName ++ ".rel", io_lib:format("~p.~n", [RDesc])),
%% Compile the script
- case systools:make_script(RabbitEBin ++ "/rabbit", [local, silent]) of
+ ScriptFile = RootName ++ ".script",
+ case systools:make_script(RootName, [local, silent]) of
{ok, Module, Warnings} ->
%% This gets lots of spurious no-source warnings when we
%% have .ez files, so we want to supress them to prevent
@@ -98,9 +98,19 @@ start() ->
end,
ok;
{error, Module, Error} ->
- io:format("Boot file generation failed: ~s~n",
- [Module:format_error(Error)]),
- halt(1)
+ error("generation of boot script file ~s failed: ~w",
+ [ScriptFile, Module:format_error(Error)])
+ end,
+
+ case post_process_script(ScriptFile) of
+ ok -> ok;
+ {error, Reason} ->
+ error("post processing of boot script file ~s failed: ~w",
+ [ScriptFile, Reason])
+ end,
+ case systools:script2boot(RootName) of
+ ok -> ok;
+ error -> error("failed to compile boot script file ~s", [ScriptFile])
end,
halt(),
ok.
@@ -122,10 +132,10 @@ determine_version(App) ->
assert_dir(Dir) ->
case filelib:is_dir(Dir) of
true -> ok;
- false ->
- ok = filelib:ensure_dir(Dir),
- ok = file:make_dir(Dir)
+ false -> ok = filelib:ensure_dir(Dir),
+ ok = file:make_dir(Dir)
end.
+
delete_dir(Dir) ->
case filelib:is_dir(Dir) of
true ->
@@ -143,6 +153,7 @@ delete_dir(Dir) ->
false ->
ok
end.
+
is_symlink(Name) ->
case file:read_link(Name) of
{ok, _} -> true;
@@ -185,14 +196,43 @@ expand_dependencies(Current, [Next|Rest]) ->
expand_dependencies(Current, Rest);
false ->
case application:load(Next) of
- ok ->
+ ok ->
ok;
- {error, {already_loaded, _}} ->
+ {error, {already_loaded, _}} ->
ok;
- X ->
- throw({unknown_app, {Next, X}})
+ {error, Reason} ->
+ throw({failed_to_load_app, Next, Reason})
end,
{ok, Required} = application:get_key(Next, applications),
Unique = [A || A <- Required, not(sets:is_element(A, Current))],
expand_dependencies(sets:add_element(Next, Current), Rest ++ Unique)
end.
+
+post_process_script(ScriptFile) ->
+ case file:consult(ScriptFile) of
+ {ok, [{script, Name, Entries}]} ->
+ NewEntries = process_entries(Entries),
+ case file:open(ScriptFile, [write]) of
+ {ok, Fd} ->
+ io:format(Fd, "%% script generated at ~w ~w~n~p.~n",
+ [date(), time(), {script, Name, NewEntries}]),
+ file:close(Fd),
+ ok;
+ {error, OReason} ->
+ {error, {failed_to_open_script_file_for_writing, OReason}}
+ end;
+ {error, Reason} ->
+ {error, {failed_to_load_script, Reason}}
+ end.
+
+process_entries([]) ->
+ [];
+process_entries([Entry = {apply,{application,start_boot,[stdlib,permanent]}} |
+ Rest]) ->
+ [Entry, {apply,{rabbit,prepare,[]}} | Rest];
+process_entries([Entry|Rest]) ->
+ [Entry | process_entries(Rest)].
+
+error(Fmt, Args) ->
+ io:format("ERROR: " ++ Fmt ++ "~n", Args),
+ halt(1).
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 6a7d68084d..5816ba109b 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -49,7 +49,6 @@
-define(HANDSHAKE_TIMEOUT, 10).
-define(NORMAL_TIMEOUT, 3).
-define(CLOSING_TIMEOUT, 1).
--define(CHANNEL_CLOSING_TIMEOUT, 1).
-define(CHANNEL_TERMINATION_TIMEOUT, 3).
%---------------------------------------------------------------------------
@@ -94,23 +93,19 @@
%% -> log error, wait for channels to terminate forcefully, start
%% terminate_connection timer, send close, *closed*
%% channel exit with soft error
-%% -> log error, start terminate_channel timer, mark channel as
-%% closing, *running*
-%% terminate_channel timeout -> remove 'closing' mark, *running*
+%% -> log error, mark channel as closing, *running*
%% handshake_timeout -> ignore, *running*
%% heartbeat timeout -> *throw*
%% closing:
%% socket close -> *terminate*
%% receive frame -> ignore, *closing*
-%% terminate_channel timeout -> remove 'closing' mark, *closing*
%% handshake_timeout -> ignore, *closing*
%% heartbeat timeout -> *throw*
%% channel exit with hard error
%% -> log error, wait for channels to terminate forcefully, start
%% terminate_connection timer, send close, *closed*
%% channel exit with soft error
-%% -> log error, start terminate_channel timer, mark channel as
-%% closing
+%% -> log error, mark channel as closing
%% if last channel to exit then send connection.close_ok,
%% start terminate_connection timer, *closed*
%% else *closing*
@@ -123,7 +118,6 @@
%% *closed*
%% receive frame -> ignore, *closed*
%% terminate_connection timeout -> *terminate*
-%% terminate_channel timeout -> remove 'closing' mark, *closed*
%% handshake_timeout -> ignore, *closed*
%% heartbeat timeout -> *throw*
%% channel exit -> log error, *closed*
@@ -200,7 +194,7 @@ inet_op(F) -> rabbit_misc:throw_on_error(inet_error, F).
peername(Sock) ->
try
- {Address, Port} = inet_op(fun () -> inet:peername(Sock) end),
+ {Address, Port} = inet_op(fun () -> rabbit_net:peername(Sock) end),
AddressS = inet_parse:ntoa(Address),
{AddressS, Port}
catch
@@ -269,12 +263,10 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) ->
throw({inet_error, Reason});
{'EXIT', Parent, Reason} ->
if State#v1.connection_state =:= running ->
- send_exception(
- State, 0,
- {amqp, connection_forced,
- io_lib:format(
- "broker forced connection closure with reason '~w'",
- [Reason]), none});
+ send_exception(State, 0,
+ rabbit_misc:amqp_error(connection_forced,
+ "broker forced connection closure with reason '~w'",
+ [Reason], none));
true -> ok
end,
%% this is what we are expected to do according to
@@ -286,14 +278,12 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) ->
%% since this termination is initiated by our parent it is
%% probably more important to exit quickly.
exit(Reason);
- {'EXIT', _Pid, E = {writer, send_failed, _Error}} ->
+ {channel_exit, _Chan, E = {writer, send_failed, _Error}} ->
throw(E);
{channel_exit, Channel, Reason} ->
mainloop(Parent, Deb, handle_channel_exit(Channel, Reason, State));
{'EXIT', Pid, Reason} ->
mainloop(Parent, Deb, handle_dependent_exit(Pid, Reason, State));
- {terminate_channel, Channel, Ref1} ->
- mainloop(Parent, Deb, terminate_channel(Channel, Ref1, State));
terminate_connection ->
State;
handshake_timeout ->
@@ -323,8 +313,8 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) ->
end.
switch_callback(OldState, NewCallback, Length) ->
- Ref = inet_op(fun () -> prim_inet:async_recv(
- OldState#v1.sock, Length, -1) end),
+ Ref = inet_op(fun () -> rabbit_net:async_recv(
+ OldState#v1.sock, Length, infinity) end),
OldState#v1{callback = NewCallback,
recv_ref = Ref}.
@@ -341,32 +331,14 @@ close_connection(State = #v1{connection = #connection{
State#v1{connection_state = closed}.
close_channel(Channel, State) ->
- Ref = make_ref(),
- TRef = erlang:send_after(1000 * ?CHANNEL_CLOSING_TIMEOUT,
- self(),
- {terminate_channel, Channel, Ref}),
- put({closing_channel, Channel}, {Ref, TRef}),
- State.
-
-terminate_channel(Channel, Ref, State) ->
- case get({closing_channel, Channel}) of
- undefined -> ok; %% got close_ok in the meantime
- {Ref, _} -> erase({closing_channel, Channel}),
- ok;
- {_Ref, _} -> ok %% got close_ok, and have new closing channel
- end,
+ put({channel, Channel}, closing),
State.
handle_channel_exit(Channel, Reason, State) ->
- %% We remove the channel from the inbound map only. That allows
- %% the channel to be re-opened, but also means the remaining
- %% cleanup, including possibly closing the connection, is deferred
- %% until we get the (normal) exit signal.
- erase({channel, Channel}),
handle_exception(State, Channel, Reason).
handle_dependent_exit(Pid, normal, State) ->
- channel_cleanup(Pid),
+ erase({chpid, Pid}),
maybe_close(State);
handle_dependent_exit(Pid, Reason, State) ->
case channel_cleanup(Pid) of
@@ -376,17 +348,10 @@ handle_dependent_exit(Pid, Reason, State) ->
channel_cleanup(Pid) ->
case get({chpid, Pid}) of
- undefined ->
- case get({closing_chpid, Pid}) of
- undefined -> undefined;
- {channel, Channel} ->
- erase({closing_chpid, Pid}),
- Channel
- end;
- {channel, Channel} ->
- erase({channel, Channel}),
- erase({chpid, Pid}),
- Channel
+ undefined -> undefined;
+ {channel, Channel} -> erase({channel, Channel}),
+ erase({chpid, Pid}),
+ Channel
end.
all_channels() -> [Pid || {{chpid, Pid},_} <- get()].
@@ -451,7 +416,7 @@ handle_frame(_Type, _Channel, _Payload, State = #v1{connection_state = CS})
State;
handle_frame(Type, 0, Payload, State) ->
case analyze_frame(Type, Payload) of
- error -> throw({unknown_frame, Type, Payload});
+ error -> throw({unknown_frame, 0, Type, Payload});
heartbeat -> State;
{method, MethodName, FieldsBin} ->
handle_method0(MethodName, FieldsBin, State);
@@ -459,19 +424,33 @@ handle_frame(Type, 0, Payload, State) ->
end;
handle_frame(Type, Channel, Payload, State) ->
case analyze_frame(Type, Payload) of
- error -> throw({unknown_frame, Type, Payload});
+ error -> throw({unknown_frame, Channel, Type, Payload});
heartbeat -> throw({unexpected_heartbeat_frame, Channel});
AnalyzedFrame ->
%%?LOGDEBUG("Ch ~p Frame ~p~n", [Channel, AnalyzedFrame]),
case get({channel, Channel}) of
{chpid, ChPid} ->
- ok = check_for_close(Channel, ChPid, AnalyzedFrame),
+ case AnalyzedFrame of
+ {method, 'channel.close', _} ->
+ erase({channel, Channel});
+ _ -> ok
+ end,
ok = rabbit_framing_channel:process(ChPid, AnalyzedFrame),
State;
+ closing ->
+ %% According to the spec, after sending a
+ %% channel.close we must ignore all frames except
+ %% channel.close_ok.
+ case AnalyzedFrame of
+ {method, 'channel.close_ok', _} ->
+ erase({channel, Channel});
+ _ -> ok
+ end,
+ State;
undefined ->
case State#v1.connection_state of
- running -> send_to_new_channel(
- Channel, AnalyzedFrame, State),
+ running -> ok = send_to_new_channel(
+ Channel, AnalyzedFrame, State),
State;
Other -> throw({channel_frame_while_starting,
Channel, Other, AnalyzedFrame})
@@ -510,9 +489,8 @@ handle_input(handshake, <<"AMQP",0,ProtocolMajor,ProtocolMinor,ProtocolRevision>
handle_input(handshake, <<"AMQP",1,1,ProtocolMajor,ProtocolMinor>>, State) ->
%% 0-8 and 0-9 style protocol header.
check_protocol_header(ProtocolMajor, ProtocolMinor, 0, State);
-
handle_input(handshake, Other, #v1{sock = Sock}) ->
- ok = inet_op(fun () -> gen_tcp:send(
+ ok = inet_op(fun () -> rabbit_net:send(
Sock, <<"AMQP",1,1,
?PROTOCOL_VERSION_MAJOR,
?PROTOCOL_VERSION_MINOR>>) end),
@@ -570,17 +548,17 @@ handle_method0(MethodName, FieldsBin, State) ->
MethodName, FieldsBin),
State)
catch exit:Reason ->
- CompleteReason =
- case Reason of
- {amqp, Error, Explanation, none} ->
- {amqp, Error, Explanation, MethodName};
- OtherReason -> OtherReason
- end,
+ CompleteReason = case Reason of
+ #amqp_error{method = none} ->
+ Reason#amqp_error{method = MethodName};
+ OtherReason -> OtherReason
+ end,
case State#v1.connection_state of
running -> send_exception(State, 0, CompleteReason);
Other -> throw({channel0_error, Other, CompleteReason})
end
end.
+
handle_method0(#'connection.start_ok'{mechanism = Mechanism,
response = Response},
State = #v1{connection_state = starting,
@@ -646,23 +624,23 @@ infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
i(pid, #v1{}) ->
self();
i(address, #v1{sock = Sock}) ->
- {ok, {A, _}} = inet:sockname(Sock),
+ {ok, {A, _}} = rabbit_net:sockname(Sock),
A;
i(port, #v1{sock = Sock}) ->
- {ok, {_, P}} = inet:sockname(Sock),
+ {ok, {_, P}} = rabbit_net:sockname(Sock),
P;
i(peer_address, #v1{sock = Sock}) ->
- {ok, {A, _}} = inet:peername(Sock),
+ {ok, {A, _}} = rabbit_net:peername(Sock),
A;
i(peer_port, #v1{sock = Sock}) ->
- {ok, {_, P}} = inet:peername(Sock),
+ {ok, {_, P}} = rabbit_net:peername(Sock),
P;
i(SockStat, #v1{sock = Sock}) when SockStat =:= recv_oct;
SockStat =:= recv_cnt;
SockStat =:= send_oct;
SockStat =:= send_cnt;
SockStat =:= send_pend ->
- case inet:getstat(Sock, [SockStat]) of
+ case rabbit_net:getstat(Sock, [SockStat]) of
{ok, [{SockStat, StatVal}]} -> StatVal;
{error, einval} -> undefined;
{error, Error} -> throw({cannot_get_socket_stats, Error})
@@ -674,7 +652,7 @@ i(channels, #v1{}) ->
i(user, #v1{connection = #connection{user = #user{username = Username}}}) ->
Username;
i(user, #v1{connection = #connection{user = none}}) ->
- none;
+ '';
i(vhost, #v1{connection = #connection{vhost = VHost}}) ->
VHost;
i(timeout, #v1{connection = #connection{timeout_sec = Timeout}}) ->
@@ -687,38 +665,17 @@ i(Item, #v1{}) ->
%%--------------------------------------------------------------------------
send_to_new_channel(Channel, AnalyzedFrame, State) ->
- case get({closing_channel, Channel}) of
- undefined ->
- #v1{sock = Sock,
- connection = #connection{
- frame_max = FrameMax,
- user = #user{username = Username},
- vhost = VHost}} = State,
- WriterPid = rabbit_writer:start(Sock, Channel, FrameMax),
- ChPid = rabbit_framing_channel:start_link(
- fun rabbit_channel:start_link/5,
- [Channel, self(), WriterPid, Username, VHost]),
- put({channel, Channel}, {chpid, ChPid}),
- put({chpid, ChPid}, {channel, Channel}),
- ok = rabbit_framing_channel:process(ChPid, AnalyzedFrame);
- {_, TRef} ->
- %% According to the spec, after sending a channel.close we
- %% must ignore all frames except channel.close_ok.
- case AnalyzedFrame of
- {method, 'channel.close_ok', _} ->
- erlang:cancel_timer(TRef),
- erase({closing_channel, Channel}),
- ok;
- _Other -> ok
- end
- end.
-
-check_for_close(Channel, ChPid, {method, 'channel.close', _}) ->
- channel_cleanup(ChPid),
- put({closing_chpid, ChPid}, {channel, Channel}),
- ok;
-check_for_close(_Channel, _ChPid, _Frame) ->
- ok.
+ #v1{sock = Sock, connection = #connection{
+ frame_max = FrameMax,
+ user = #user{username = Username},
+ vhost = VHost}} = State,
+ WriterPid = rabbit_writer:start(Sock, Channel, FrameMax),
+ ChPid = rabbit_framing_channel:start_link(
+ fun rabbit_channel:start_link/5,
+ [Channel, self(), WriterPid, Username, VHost]),
+ put({channel, Channel}, {chpid, ChPid}),
+ put({chpid, ChPid}, {channel, Channel}),
+ ok = rabbit_framing_channel:process(ChPid, AnalyzedFrame).
log_channel_error(ConnectionState, Channel, Reason) ->
rabbit_log:error("connection ~p (~p), channel ~p - error:~n~p~n",
@@ -764,18 +721,27 @@ map_exception(Channel, Reason) ->
end,
{ShouldClose, CloseChannel, CloseMethod}.
-lookup_amqp_exception({amqp, {ShouldClose, Code, Text}, Expl, Method}) ->
- ExplBin = list_to_binary(Expl),
- CompleteTextBin = <<Text/binary, " - ", ExplBin/binary>>,
- SafeTextBin = if size(CompleteTextBin) > 255 ->
- <<CompleteTextBin:252/binary, "...">>;
- true ->
- CompleteTextBin
- end,
- {ShouldClose, Code, SafeTextBin, Method};
-lookup_amqp_exception({amqp, ErrorName, Expl, Method}) ->
- Details = rabbit_framing:lookup_amqp_exception(ErrorName),
- lookup_amqp_exception({amqp, Details, Expl, Method});
+%% FIXME: this clause can go when we move to AMQP spec >=8.1
+lookup_amqp_exception(#amqp_error{name = precondition_failed,
+ explanation = Expl,
+ method = Method}) ->
+ ExplBin = amqp_exception_explanation(<<"PRECONDITION_FAILED">>, Expl),
+ {false, 406, ExplBin, Method};
+lookup_amqp_exception(#amqp_error{name = Name,
+ explanation = Expl,
+ method = Method}) ->
+ {ShouldClose, Code, Text} = rabbit_framing:lookup_amqp_exception(Name),
+ ExplBin = amqp_exception_explanation(Text, Expl),
+ {ShouldClose, Code, ExplBin, Method};
lookup_amqp_exception(Other) ->
rabbit_log:warning("Non-AMQP exit reason '~p'~n", [Other]),
- {true, ?INTERNAL_ERROR, <<"INTERNAL_ERROR">>, none}.
+ {ShouldClose, Code, Text} =
+ rabbit_framing:lookup_amqp_exception(internal_error),
+ {ShouldClose, Code, Text, none}.
+
+amqp_exception_explanation(Text, Expl) ->
+ ExplBin = list_to_binary(Expl),
+ CompleteTextBin = <<Text/binary, " - ", ExplBin/binary>>,
+ if size(CompleteTextBin) > 255 -> <<CompleteTextBin:252/binary, "...">>;
+ true -> CompleteTextBin
+ end.
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index e5100ccd16..5c5c55f1e4 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -49,6 +49,7 @@ test_content_prop_roundtrip(Datum, Binary) ->
all_tests() ->
passed = test_priority_queue(),
+ passed = test_unfold(),
passed = test_parsing(),
passed = test_topic_matching(),
passed = test_log_management(),
@@ -75,7 +76,8 @@ test_priority_queue() ->
%% 1-element priority Q
Q1 = priority_queue:in(foo, 1, priority_queue:new()),
- {true, false, 1, [{1, foo}], [foo]} = test_priority_queue(Q1),
+ {true, false, 1, [{1, foo}], [foo]} =
+ test_priority_queue(Q1),
%% 2-element same-priority Q
Q2 = priority_queue:in(bar, 1, Q1),
@@ -91,6 +93,71 @@ test_priority_queue() ->
Q4 = priority_queue:in(foo, -1, priority_queue:new()),
{true, false, 1, [{-1, foo}], [foo]} = test_priority_queue(Q4),
+ %% merge 2 * 1-element no-priority Qs
+ Q5 = priority_queue:join(priority_queue:in(foo, Q),
+ priority_queue:in(bar, Q)),
+ {true, false, 2, [{0, foo}, {0, bar}], [foo, bar]} =
+ test_priority_queue(Q5),
+
+ %% merge 1-element no-priority Q with 1-element priority Q
+ Q6 = priority_queue:join(priority_queue:in(foo, Q),
+ priority_queue:in(bar, 1, Q)),
+ {true, false, 2, [{1, bar}, {0, foo}], [bar, foo]} =
+ test_priority_queue(Q6),
+
+ %% merge 1-element priority Q with 1-element no-priority Q
+ Q7 = priority_queue:join(priority_queue:in(foo, 1, Q),
+ priority_queue:in(bar, Q)),
+ {true, false, 2, [{1, foo}, {0, bar}], [foo, bar]} =
+ test_priority_queue(Q7),
+
+ %% merge 2 * 1-element same-priority Qs
+ Q8 = priority_queue:join(priority_queue:in(foo, 1, Q),
+ priority_queue:in(bar, 1, Q)),
+ {true, false, 2, [{1, foo}, {1, bar}], [foo, bar]} =
+ test_priority_queue(Q8),
+
+ %% merge 2 * 1-element different-priority Qs
+ Q9 = priority_queue:join(priority_queue:in(foo, 1, Q),
+ priority_queue:in(bar, 2, Q)),
+ {true, false, 2, [{2, bar}, {1, foo}], [bar, foo]} =
+ test_priority_queue(Q9),
+
+ %% merge 2 * 1-element different-priority Qs (other way around)
+ Q10 = priority_queue:join(priority_queue:in(bar, 2, Q),
+ priority_queue:in(foo, 1, Q)),
+ {true, false, 2, [{2, bar}, {1, foo}], [bar, foo]} =
+ test_priority_queue(Q10),
+
+ %% merge 2 * 2-element multi-different-priority Qs
+ Q11 = priority_queue:join(Q6, Q5),
+ {true, false, 4, [{1, bar}, {0, foo}, {0, foo}, {0, bar}],
+ [bar, foo, foo, bar]} = test_priority_queue(Q11),
+
+ %% and the other way around
+ Q12 = priority_queue:join(Q5, Q6),
+ {true, false, 4, [{1, bar}, {0, foo}, {0, bar}, {0, foo}],
+ [bar, foo, bar, foo]} = test_priority_queue(Q12),
+
+ %% merge with negative priorities
+ Q13 = priority_queue:join(Q4, Q5),
+ {true, false, 3, [{0, foo}, {0, bar}, {-1, foo}], [foo, bar, foo]} =
+ test_priority_queue(Q13),
+
+ %% and the other way around
+ Q14 = priority_queue:join(Q5, Q4),
+ {true, false, 3, [{0, foo}, {0, bar}, {-1, foo}], [foo, bar, foo]} =
+ test_priority_queue(Q14),
+
+ %% joins with empty queues:
+ Q1 = priority_queue:join(Q, Q1),
+ Q1 = priority_queue:join(Q1, Q),
+
+ %% insert with priority into non-empty zero-priority queue
+ Q15 = priority_queue:in(baz, 1, Q5),
+ {true, false, 3, [{1, baz}, {0, foo}, {0, bar}], [baz, foo, bar]} =
+ test_priority_queue(Q15),
+
passed.
priority_queue_in_all(Q, L) ->
@@ -116,6 +183,14 @@ test_simple_n_element_queue(N) ->
{true, false, N, ToListRes, Items} = test_priority_queue(Q),
passed.
+test_unfold() ->
+ {[], test} = rabbit_misc:unfold(fun (_V) -> false end, test),
+ List = lists:seq(2,20,2),
+ {List, 0} = rabbit_misc:unfold(fun (0) -> false;
+ (N) -> {true, N*2, N-1}
+ end, 10),
+ passed.
+
test_parsing() ->
passed = test_content_properties(),
passed.
@@ -408,19 +483,17 @@ test_cluster_management() ->
end,
ClusteringSequence),
- %% attempt to convert a disk node into a ram node
+ %% convert a disk node into a ram node
ok = control_action(reset, []),
ok = control_action(start_app, []),
ok = control_action(stop_app, []),
- {error, {cannot_convert_disk_node_to_ram_node, _}} =
- control_action(cluster, ["invalid1@invalid",
- "invalid2@invalid"]),
+ ok = control_action(cluster, ["invalid1@invalid",
+ "invalid2@invalid"]),
- %% attempt to join a non-existing cluster as a ram node
+ %% join a non-existing cluster as a ram node
ok = control_action(reset, []),
- {error, {unable_to_contact_cluster_nodes, _}} =
- control_action(cluster, ["invalid1@invalid",
- "invalid2@invalid"]),
+ ok = control_action(cluster, ["invalid1@invalid",
+ "invalid2@invalid"]),
SecondaryNode = rabbit_misc:localnode(hare),
case net_adm:ping(SecondaryNode) of
@@ -436,11 +509,12 @@ test_cluster_management2(SecondaryNode) ->
NodeS = atom_to_list(node()),
SecondaryNodeS = atom_to_list(SecondaryNode),
- %% attempt to convert a disk node into a ram node
+ %% make a disk node
ok = control_action(reset, []),
ok = control_action(cluster, [NodeS]),
- {error, {unable_to_join_cluster, _, _}} =
- control_action(cluster, [SecondaryNodeS]),
+ %% make a ram node
+ ok = control_action(reset, []),
+ ok = control_action(cluster, [SecondaryNodeS]),
%% join cluster as a ram node
ok = control_action(reset, []),
@@ -453,21 +527,21 @@ test_cluster_management2(SecondaryNode) ->
ok = control_action(start_app, []),
ok = control_action(stop_app, []),
- %% attempt to join non-existing cluster as a ram node
- {error, _} = control_action(cluster, ["invalid1@invalid",
- "invalid2@invalid"]),
-
+ %% join non-existing cluster as a ram node
+ ok = control_action(cluster, ["invalid1@invalid",
+ "invalid2@invalid"]),
%% turn ram node into disk node
+ ok = control_action(reset, []),
ok = control_action(cluster, [SecondaryNodeS, NodeS]),
ok = control_action(start_app, []),
ok = control_action(stop_app, []),
- %% attempt to convert a disk node into a ram node
- {error, {cannot_convert_disk_node_to_ram_node, _}} =
- control_action(cluster, ["invalid1@invalid",
- "invalid2@invalid"]),
+ %% convert a disk node into a ram node
+ ok = control_action(cluster, ["invalid1@invalid",
+ "invalid2@invalid"]),
%% turn a disk node into a ram node
+ ok = control_action(reset, []),
ok = control_action(cluster, [SecondaryNodeS]),
ok = control_action(start_app, []),
ok = control_action(stop_app, []),
@@ -592,7 +666,10 @@ test_server_status() ->
{ok, _} = rabbit_amqqueue:delete(Q, false, false),
%% list connections
- [#listener{host = H, port = P} | _] = rabbit_networking:active_listeners(),
+ [#listener{host = H, port = P} | _] =
+ [L || L = #listener{node = N} <- rabbit_networking:active_listeners(),
+ N =:= node()],
+
{ok, C} = gen_tcp:connect(H, P, []),
timer:sleep(100),
ok = info_action(
diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl
index e338ddfe9d..1679ce7c15 100644
--- a/src/rabbit_writer.erl
+++ b/src/rabbit_writer.erl
@@ -169,7 +169,7 @@ assemble_frames(Channel, MethodRecord, Content, FrameMax) ->
tcp_send(Sock, Data) ->
rabbit_misc:throw_on_error(inet_error,
- fun () -> gen_tcp:send(Sock, Data) end).
+ fun () -> rabbit_net:send(Sock, Data) end).
internal_send_command(Sock, Channel, MethodRecord) ->
ok = tcp_send(Sock, assemble_frames(Channel, MethodRecord)).
@@ -206,6 +206,6 @@ internal_send_command_async(Sock, Channel, MethodRecord, Content, FrameMax) ->
ok.
port_cmd(Sock, Data) ->
- try erlang:port_command(Sock, Data)
+ try rabbit_net:port_command(Sock, Data)
catch error:Error -> exit({writer, send_failed, Error})
end.
diff --git a/src/tcp_acceptor.erl b/src/tcp_acceptor.erl
index aa8b8ad5a7..bc7425613f 100644
--- a/src/tcp_acceptor.erl
+++ b/src/tcp_acceptor.erl
@@ -67,15 +67,20 @@ handle_info({inet_async, LSock, Ref, {ok, Sock}},
{ok, Mod} = inet_db:lookup_socket(LSock),
inet_db:register_socket(Sock, Mod),
- %% report
- {ok, {Address, Port}} = inet:sockname(LSock),
- {ok, {PeerAddress, PeerPort}} = inet:peername(Sock),
- error_logger:info_msg("accepted TCP connection on ~s:~p from ~s:~p~n",
- [inet_parse:ntoa(Address), Port,
- inet_parse:ntoa(PeerAddress), PeerPort]),
-
- %% handle
- apply(M, F, A ++ [Sock]),
+ try
+ %% report
+ {Address, Port} = inet_op(fun () -> inet:sockname(LSock) end),
+ {PeerAddress, PeerPort} = inet_op(fun () -> inet:peername(Sock) end),
+ error_logger:info_msg("accepted TCP connection on ~s:~p from ~s:~p~n",
+ [inet_parse:ntoa(Address), Port,
+ inet_parse:ntoa(PeerAddress), PeerPort]),
+ %% handle
+ apply(M, F, A ++ [Sock])
+ catch {inet_error, Reason} ->
+ gen_tcp:close(Sock),
+ error_logger:error_msg("unable to accept TCP connection: ~p~n",
+ [Reason])
+ end,
%% accept more
case prim_inet:async_accept(LSock, -1) of
@@ -95,3 +100,7 @@ terminate(_Reason, _State) ->
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
+
+%%--------------------------------------------------------------------
+
+inet_op(F) -> rabbit_misc:throw_on_error(inet_error, F).
diff --git a/src/tcp_listener.erl b/src/tcp_listener.erl
index 92a47cf127..4a2e149bb8 100644
--- a/src/tcp_listener.erl
+++ b/src/tcp_listener.erl
@@ -33,28 +33,28 @@
-behaviour(gen_server).
--export([start_link/7]).
+-export([start_link/8]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
--record(state, {sock, on_startup, on_shutdown}).
+-record(state, {sock, on_startup, on_shutdown, label}).
%%--------------------------------------------------------------------
start_link(IPAddress, Port, SocketOpts,
ConcurrentAcceptorCount, AcceptorSup,
- OnStartup, OnShutdown) ->
+ OnStartup, OnShutdown, Label) ->
gen_server:start_link(
?MODULE, {IPAddress, Port, SocketOpts,
ConcurrentAcceptorCount, AcceptorSup,
- OnStartup, OnShutdown}, []).
+ OnStartup, OnShutdown, Label}, []).
%%--------------------------------------------------------------------
init({IPAddress, Port, SocketOpts,
ConcurrentAcceptorCount, AcceptorSup,
- {M,F,A} = OnStartup, OnShutdown}) ->
+ {M,F,A} = OnStartup, OnShutdown, Label}) ->
process_flag(trap_exit, true),
case gen_tcp:listen(Port, SocketOpts ++ [{ip, IPAddress},
{active, false}]) of
@@ -65,15 +65,16 @@ init({IPAddress, Port, SocketOpts,
end,
lists:duplicate(ConcurrentAcceptorCount, dummy)),
{ok, {LIPAddress, LPort}} = inet:sockname(LSock),
- error_logger:info_msg("started TCP listener on ~s:~p~n",
- [inet_parse:ntoa(LIPAddress), LPort]),
+ error_logger:info_msg("started ~s on ~s:~p~n",
+ [Label, inet_parse:ntoa(LIPAddress), LPort]),
apply(M, F, A ++ [IPAddress, Port]),
- {ok, #state{sock=LSock,
- on_startup = OnStartup, on_shutdown = OnShutdown}};
+ {ok, #state{sock = LSock,
+ on_startup = OnStartup, on_shutdown = OnShutdown,
+ label = Label}};
{error, Reason} ->
error_logger:error_msg(
- "failed to start TCP listener on ~s:~p - ~p~n",
- [inet_parse:ntoa(IPAddress), Port, Reason]),
+ "failed to start ~s on ~s:~p - ~p~n",
+ [Label, inet_parse:ntoa(IPAddress), Port, Reason]),
{stop, {cannot_listen, IPAddress, Port, Reason}}
end.
@@ -86,11 +87,11 @@ handle_cast(_Msg, State) ->
handle_info(_Info, State) ->
{noreply, State}.
-terminate(_Reason, #state{sock=LSock, on_shutdown = {M,F,A}}) ->
+terminate(_Reason, #state{sock=LSock, on_shutdown = {M,F,A}, label=Label}) ->
{ok, {IPAddress, Port}} = inet:sockname(LSock),
gen_tcp:close(LSock),
- error_logger:info_msg("stopped TCP listener on ~s:~p~n",
- [inet_parse:ntoa(IPAddress), Port]),
+ error_logger:info_msg("stopped ~s on ~s:~p~n",
+ [Label, inet_parse:ntoa(IPAddress), Port]),
apply(M, F, A ++ [IPAddress, Port]).
code_change(_OldVsn, State, _Extra) ->
diff --git a/src/tcp_listener_sup.erl b/src/tcp_listener_sup.erl
index 901a0da3b7..d6bbac080f 100644
--- a/src/tcp_listener_sup.erl
+++ b/src/tcp_listener_sup.erl
@@ -33,23 +33,23 @@
-behaviour(supervisor).
--export([start_link/6, start_link/7]).
+-export([start_link/7, start_link/8]).
-export([init/1]).
start_link(IPAddress, Port, SocketOpts, OnStartup, OnShutdown,
- AcceptCallback) ->
+ AcceptCallback, Label) ->
start_link(IPAddress, Port, SocketOpts, OnStartup, OnShutdown,
- AcceptCallback, 1).
+ AcceptCallback, 1, Label).
start_link(IPAddress, Port, SocketOpts, OnStartup, OnShutdown,
- AcceptCallback, ConcurrentAcceptorCount) ->
+ AcceptCallback, ConcurrentAcceptorCount, Label) ->
supervisor:start_link(
?MODULE, {IPAddress, Port, SocketOpts, OnStartup, OnShutdown,
- AcceptCallback, ConcurrentAcceptorCount}).
+ AcceptCallback, ConcurrentAcceptorCount, Label}).
init({IPAddress, Port, SocketOpts, OnStartup, OnShutdown,
- AcceptCallback, ConcurrentAcceptorCount}) ->
+ AcceptCallback, ConcurrentAcceptorCount, Label}) ->
%% This is gross. The tcp_listener needs to know about the
%% tcp_acceptor_sup, and the only way I can think of accomplishing
%% that without jumping through hoops is to register the
@@ -62,5 +62,5 @@ init({IPAddress, Port, SocketOpts, OnStartup, OnShutdown,
{tcp_listener, {tcp_listener, start_link,
[IPAddress, Port, SocketOpts,
ConcurrentAcceptorCount, Name,
- OnStartup, OnShutdown]},
+ OnStartup, OnShutdown, Label]},
transient, 100, worker, [tcp_listener]}]}}.