diff options
| author | Michael Bridgen <mikeb@lshift.net> | 2009-10-05 15:31:53 +0100 |
|---|---|---|
| committer | Michael Bridgen <mikeb@lshift.net> | 2009-10-05 15:31:53 +0100 |
| commit | e205a1f1873b19f9dfb4ec934da1b4ee91e461a2 (patch) | |
| tree | 25f878eaf5a22fd1983a48cb09002a70ea631ab5 /src | |
| parent | 88d5f01b0241679fed1b1032dfde1d4760eaeb9d (diff) | |
| parent | 1ff98eb5204ee7edda8b85d74667e3d31e4bc5ac (diff) | |
| download | rabbitmq-server-git-e205a1f1873b19f9dfb4ec934da1b4ee91e461a2.tar.gz | |
Merge from default. Resolved some conflicts due to bool->boolean in
type specs, and a bad automerge in rabbit_reader.erl.
Diffstat (limited to 'src')
29 files changed, 1202 insertions, 523 deletions
diff --git a/src/gen_server2.erl b/src/gen_server2.erl index 36fb4fa8c3..a2d9350c4e 100644 --- a/src/gen_server2.erl +++ b/src/gen_server2.erl @@ -437,7 +437,10 @@ unregister_name({local,Name}) -> unregister_name({global,Name}) -> _ = global:unregister_name(Name); unregister_name(Pid) when is_pid(Pid) -> - Pid. + Pid; +% Under R12 let's just ignore it, as we have a single term as Name. +% On R13 it will never get here, as we get tuple with 'local/global' atom. +unregister_name(_Name) -> ok. extend_backoff(undefined) -> undefined; diff --git a/src/priority_queue.erl b/src/priority_queue.erl index 732757c41c..74b41a910c 100644 --- a/src/priority_queue.erl +++ b/src/priority_queue.erl @@ -55,7 +55,8 @@ -module(priority_queue). --export([new/0, is_queue/1, is_empty/1, len/1, to_list/1, in/2, in/3, out/1]). +-export([new/0, is_queue/1, is_empty/1, len/1, to_list/1, in/2, in/3, + out/1, join/2]). %%---------------------------------------------------------------------------- @@ -66,13 +67,14 @@ -type(pqueue() :: squeue() | {pqueue, [{priority(), squeue()}]}). -spec(new/0 :: () -> pqueue()). --spec(is_queue/1 :: (any()) -> bool()). --spec(is_empty/1 :: (pqueue()) -> bool()). +-spec(is_queue/1 :: (any()) -> boolean()). +-spec(is_empty/1 :: (pqueue()) -> boolean()). -spec(len/1 :: (pqueue()) -> non_neg_integer()). -spec(to_list/1 :: (pqueue()) -> [{priority(), any()}]). -spec(in/2 :: (any(), pqueue()) -> pqueue()). -spec(in/3 :: (any(), priority(), pqueue()) -> pqueue()). -spec(out/1 :: (pqueue()) -> {empty | {value, any()}, pqueue()}). +-spec(join/2 :: (pqueue(), pqueue()) -> pqueue()). -endif. @@ -147,6 +149,42 @@ out({pqueue, [{P, Q} | Queues]}) -> end, {R, NewQ}. +join(A, {queue, [], []}) -> + A; +join({queue, [], []}, B) -> + B; +join({queue, AIn, AOut}, {queue, BIn, BOut}) -> + {queue, BIn, AOut ++ lists:reverse(AIn, BOut)}; +join(A = {queue, _, _}, {pqueue, BPQ}) -> + {Pre, Post} = lists:splitwith(fun ({P, _}) -> P < 0 end, BPQ), + Post1 = case Post of + [] -> [ {0, A} ]; + [ {0, ZeroQueue} | Rest ] -> [ {0, join(A, ZeroQueue)} | Rest ]; + _ -> [ {0, A} | Post ] + end, + {pqueue, Pre ++ Post1}; +join({pqueue, APQ}, B = {queue, _, _}) -> + {Pre, Post} = lists:splitwith(fun ({P, _}) -> P < 0 end, APQ), + Post1 = case Post of + [] -> [ {0, B} ]; + [ {0, ZeroQueue} | Rest ] -> [ {0, join(ZeroQueue, B)} | Rest ]; + _ -> [ {0, B} | Post ] + end, + {pqueue, Pre ++ Post1}; +join({pqueue, APQ}, {pqueue, BPQ}) -> + {pqueue, merge(APQ, BPQ, [])}. + +merge([], BPQ, Acc) -> + lists:reverse(Acc, BPQ); +merge(APQ, [], Acc) -> + lists:reverse(Acc, APQ); +merge([{P, A}|As], [{P, B}|Bs], Acc) -> + merge(As, Bs, [ {P, join(A, B)} | Acc ]); +merge([{PA, A}|As], Bs = [{PB, _}|_], Acc) when PA < PB -> + merge(As, Bs, [ {PA, A} | Acc ]); +merge(As = [{_, _}|_], [{PB, B}|Bs], Acc) -> + merge(As, Bs, [ {PB, B} | Acc ]). + r2f([]) -> {queue, [], []}; r2f([_] = R) -> {queue, [], R}; r2f([X,Y]) -> {queue, [X], [Y]}; diff --git a/src/rabbit.erl b/src/rabbit.erl index b0d62b5ab8..18fd1b175f 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -33,7 +33,7 @@ -behaviour(application). --export([start/0, stop/0, stop_and_halt/0, status/0, rotate_logs/1]). +-export([prepare/0, start/0, stop/0, stop_and_halt/0, status/0, rotate_logs/1]). -export([start/2, stop/1]). @@ -57,6 +57,7 @@ -type(log_location() :: 'tty' | 'undefined' | string()). -type(file_suffix() :: binary()). +-spec(prepare/0 :: () -> 'ok'). -spec(start/0 :: () -> 'ok'). -spec(stop/0 :: () -> 'ok'). -spec(stop_and_halt/0 :: () -> 'ok'). @@ -71,11 +72,14 @@ %%---------------------------------------------------------------------------- +prepare() -> + ok = ensure_working_log_handlers(), + ok = rabbit_mnesia:ensure_mnesia_dir(). + start() -> try - ok = ensure_working_log_handlers(), - ok = rabbit_mnesia:ensure_mnesia_dir(), - ok = rabbit_misc:start_applications(?APPS) + ok = prepare(), + ok = rabbit_misc:start_applications(?APPS) after %%give the error loggers some time to catch up timer:sleep(100) @@ -116,8 +120,6 @@ start(normal, []) -> print_banner(), - {ok, ExtraSteps} = application:get_env(extra_startup_steps), - lists:foreach( fun ({Msg, Thunk}) -> io:format("starting ~-20s ...", [Msg]), @@ -135,13 +137,13 @@ start(normal, []) -> ok = start_child(rabbit_log), ok = rabbit_hooks:start(), - ok = rabbit_amqqueue:start(), + ok = rabbit_binary_generator: + check_empty_content_body_frame_size(), {ok, MemoryAlarms} = application:get_env(memory_alarms), ok = rabbit_alarm:start(MemoryAlarms), - ok = rabbit_binary_generator: - check_empty_content_body_frame_size(), + ok = rabbit_amqqueue:start(), ok = start_child(rabbit_router), ok = start_child(rabbit_node_monitor) @@ -170,14 +172,28 @@ start(normal, []) -> {"TCP listeners", fun () -> ok = rabbit_networking:start(), - {ok, TCPListeners} = application:get_env(tcp_listeners), + {ok, TcpListeners} = application:get_env(tcp_listeners), lists:foreach( fun ({Host, Port}) -> ok = rabbit_networking:start_tcp_listener(Host, Port) end, - TCPListeners) - end}] - ++ ExtraSteps), + TcpListeners) + end}, + {"SSL listeners", + fun () -> + case application:get_env(ssl_listeners) of + {ok, []} -> + ok; + {ok, SslListeners} -> + ok = rabbit_misc:start_applications([crypto, ssl]), + + {ok, SslOpts} = application:get_env(ssl_options), + + [rabbit_networking:start_ssl_listener + (Host, Port, SslOpts) || {Host, Port} <- SslListeners], + ok + end + end}]), io:format("~nbroker running~n"), @@ -203,6 +219,16 @@ log_location(Type) -> _ -> undefined end. +app_location() -> + {ok, Application} = application:get_application(), + filename:absname(code:where_is_file(atom_to_list(Application) ++ ".app")). + +home_dir() -> + case init:get_argument(home) of + {ok, [[Home]]} -> Home; + Other -> Other + end. + %--------------------------------------------------------------------------- print_banner() -> @@ -225,10 +251,13 @@ print_banner() -> [Product, string:right([$v|Version], ProductLen), ?PROTOCOL_VERSION_MAJOR, ?PROTOCOL_VERSION_MINOR, ?COPYRIGHT_MESSAGE, ?INFORMATION_MESSAGE]), - Settings = [{"node", node()}, - {"log", log_location(kernel)}, - {"sasl log", log_location(sasl)}, - {"database dir", rabbit_mnesia:dir()}], + Settings = [{"node", node()}, + {"app descriptor", app_location()}, + {"home dir", home_dir()}, + {"cookie hash", rabbit_misc:cookie_hash()}, + {"log", log_location(kernel)}, + {"sasl log", log_location(sasl)}, + {"database dir", rabbit_mnesia:dir()}], DescrLen = lists:max([length(K) || {K, _V} <- Settings]), Format = "~-" ++ integer_to_list(DescrLen) ++ "s: ~s~n", lists:foreach(fun ({K, V}) -> io:format(Format, [K, V]) end, Settings), diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl index 21999f16c3..7a2fbcb826 100644 --- a/src/rabbit_alarm.erl +++ b/src/rabbit_alarm.erl @@ -41,7 +41,7 @@ -define(MEMSUP_CHECK_INTERVAL, 1000). %% OSes on which we know memory alarms to be trustworthy --define(SUPPORTED_OS, [{unix, linux}]). +-define(SUPPORTED_OS, [{unix, linux}, {unix, darwin}]). -record(alarms, {alertees, system_memory_high_watermark = false}). @@ -50,7 +50,7 @@ -ifdef(use_specs). -type(mfa_tuple() :: {atom(), atom(), list()}). --spec(start/1 :: (bool() | 'auto') -> 'ok'). +-spec(start/1 :: (boolean() | 'auto') -> 'ok'). -spec(stop/0 :: () -> 'ok'). -spec(register/2 :: (pid(), mfa_tuple()) -> 'ok'). @@ -136,33 +136,35 @@ code_change(_OldVsn, State, _Extra) -> %%---------------------------------------------------------------------------- start_memsup() -> - Mod = case os:type() of - %% memsup doesn't take account of buffers or cache when - %% considering "free" memory - therefore on Linux we can - %% get memory alarms very easily without any pressure - %% existing on memory at all. Therefore we need to use - %% our own simple memory monitor. - %% - {unix, linux} -> rabbit_memsup_linux; - - %% Start memsup programmatically rather than via the - %% rabbitmq-server script. This is not quite the right - %% thing to do as os_mon checks to see if memsup is - %% available before starting it, but as memsup is - %% available everywhere (even on VXWorks) it should be - %% ok. - %% - %% One benefit of the programmatic startup is that we - %% can add our alarm_handler before memsup is running, - %% thus ensuring that we notice memory alarms that go - %% off on startup. - %% - _ -> memsup - end, + {Mod, Args} = + case os:type() of + %% memsup doesn't take account of buffers or cache when + %% considering "free" memory - therefore on Linux we can + %% get memory alarms very easily without any pressure + %% existing on memory at all. Therefore we need to use + %% our own simple memory monitor. + %% + {unix, linux} -> {rabbit_memsup, [rabbit_memsup_linux]}; + {unix, darwin} -> {rabbit_memsup, [rabbit_memsup_darwin]}; + + %% Start memsup programmatically rather than via the + %% rabbitmq-server script. This is not quite the right + %% thing to do as os_mon checks to see if memsup is + %% available before starting it, but as memsup is + %% available everywhere (even on VXWorks) it should be + %% ok. + %% + %% One benefit of the programmatic startup is that we + %% can add our alarm_handler before memsup is running, + %% thus ensuring that we notice memory alarms that go + %% off on startup. + %% + _ -> {memsup, []} + end, %% This is based on os_mon:childspec(memsup, true) {ok, _} = supervisor:start_child( os_mon_sup, - {memsup, {Mod, start_link, []}, + {memsup, {Mod, start_link, Args}, permanent, 2000, worker, [Mod]}), ok. diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 4903c2c57f..1a5e82d714 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -63,7 +63,7 @@ -spec(start/0 :: () -> 'ok'). -spec(recover/0 :: () -> 'ok'). --spec(declare/4 :: (queue_name(), bool(), bool(), amqp_table()) -> +-spec(declare/4 :: (queue_name(), boolean(), boolean(), amqp_table()) -> amqqueue()). -spec(lookup/1 :: (queue_name()) -> {'ok', amqqueue()} | not_found()). -spec(with/2 :: (queue_name(), qfun(A)) -> A | not_found()). @@ -83,8 +83,8 @@ {'error', 'in_use'} | {'error', 'not_empty'}). -spec(purge/1 :: (amqqueue()) -> qlen()). --spec(deliver/2 :: (pid(), delivery()) -> bool()). --spec(redeliver/2 :: (pid(), [{message(), bool()}]) -> 'ok'). +-spec(deliver/2 :: (pid(), delivery()) -> boolean()). +-spec(redeliver/2 :: (pid(), [{message(), boolean()}]) -> 'ok'). -spec(requeue/3 :: (pid(), [msg_id()], pid()) -> 'ok'). -spec(ack/4 :: (pid(), maybe(txn()), [msg_id()], pid()) -> 'ok'). -spec(commit_all/2 :: ([pid()], txn()) -> ok_or_errors()). @@ -92,16 +92,16 @@ -spec(notify_down_all/2 :: ([pid()], pid()) -> ok_or_errors()). -spec(limit_all/3 :: ([pid()], pid(), pid() | 'undefined') -> ok_or_errors()). -spec(claim_queue/2 :: (amqqueue(), pid()) -> 'ok' | 'locked'). --spec(basic_get/3 :: (amqqueue(), pid(), bool()) -> +-spec(basic_get/3 :: (amqqueue(), pid(), boolean()) -> {'ok', non_neg_integer(), msg()} | 'empty'). -spec(basic_consume/8 :: - (amqqueue(), bool(), pid(), pid(), pid(), ctag(), bool(), any()) -> + (amqqueue(), boolean(), pid(), pid(), pid(), ctag(), boolean(), any()) -> 'ok' | {'error', 'queue_owned_by_another_connection' | 'exclusive_consume_unavailable'}). -spec(basic_cancel/4 :: (amqqueue(), pid(), ctag(), any()) -> 'ok'). -spec(notify_sent/2 :: (pid(), pid()) -> 'ok'). -spec(unblock/2 :: (pid(), pid()) -> 'ok'). --spec(internal_declare/2 :: (amqqueue(), bool()) -> amqqueue()). +-spec(internal_declare/2 :: (amqqueue(), boolean()) -> amqqueue()). -spec(internal_delete/1 :: (queue_name()) -> 'ok' | not_found()). -spec(on_node_down/1 :: (erlang_node()) -> 'ok'). -spec(pseudo_queue/2 :: (binary(), pid()) -> amqqueue()). @@ -303,10 +303,10 @@ basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) -> infinity). notify_sent(QPid, ChPid) -> - gen_server2:cast(QPid, {notify_sent, ChPid}). + gen_server2:pcast(QPid, 8, {notify_sent, ChPid}). unblock(QPid, ChPid) -> - gen_server2:cast(QPid, {unblock, ChPid}). + gen_server2:pcast(QPid, 8, {unblock, ChPid}). internal_delete(QueueName) -> rabbit_misc:execute_mnesia_transaction( diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl index 4033aaafda..bec2cd0845 100644 --- a/src/rabbit_basic.erl +++ b/src/rabbit_basic.erl @@ -45,13 +45,14 @@ -type(publish_result() :: ({ok, routing_result(), [pid()]} | not_found())). -spec(publish/1 :: (delivery()) -> publish_result()). --spec(delivery/4 :: (bool(), bool(), maybe(txn()), message()) -> delivery()). +-spec(delivery/4 :: (boolean(), boolean(), maybe(txn()), message()) -> + delivery()). -spec(message/4 :: (exchange_name(), routing_key(), properties_input(), binary()) -> message()). -spec(properties/1 :: (properties_input()) -> amqp_properties()). -spec(publish/4 :: (exchange_name(), routing_key(), properties_input(), binary()) -> publish_result()). --spec(publish/7 :: (exchange_name(), routing_key(), bool(), bool(), +-spec(publish/7 :: (exchange_name(), routing_key(), boolean(), boolean(), maybe(txn()), properties_input(), binary()) -> publish_result()). -spec(build_content/2 :: (amqp_properties(), binary()) -> content()). diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 8af85b34c4..26db0777d0 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -60,8 +60,8 @@ -spec(do/3 :: (pid(), amqp_method(), maybe(content())) -> 'ok'). -spec(shutdown/1 :: (pid()) -> 'ok'). -spec(send_command/2 :: (pid(), amqp_method()) -> 'ok'). --spec(deliver/4 :: (pid(), ctag(), bool(), msg()) -> 'ok'). --spec(conserve_memory/2 :: (pid(), bool()) -> 'ok'). +-spec(deliver/4 :: (pid(), ctag(), boolean(), msg()) -> 'ok'). +-spec(conserve_memory/2 :: (pid(), boolean()) -> 'ok'). -endif. @@ -125,11 +125,11 @@ handle_cast({method, Method, Content}, State) -> stop -> {stop, normal, State#ch{state = terminating}} catch - exit:{amqp, Error, Explanation, none} -> - ok = notify_queues(internal_rollback(State)), - Reason = {amqp, Error, Explanation, - rabbit_misc:method_record_type(Method)}, - State#ch.reader_pid ! {channel_exit, State#ch.channel, Reason}, + exit:Reason = #amqp_error{} -> + ok = rollback_and_notify(State), + MethodName = rabbit_misc:method_record_type(Method), + State#ch.reader_pid ! {channel_exit, State#ch.channel, + Reason#amqp_error{method = MethodName}}, {stop, normal, State#ch{state = terminating}}; exit:normal -> {stop, normal, State}; @@ -157,6 +157,10 @@ handle_cast({conserve_memory, Conserve}, State) -> State#ch.writer_pid, #'channel.flow'{active = not(Conserve)}), noreply(State). +handle_info({'EXIT', WriterPid, Reason = {writer, send_failed, _Error}}, + State = #ch{writer_pid = WriterPid}) -> + State#ch.reader_pid ! {channel_exit, State#ch.channel, Reason}, + {stop, normal, State}; handle_info({'EXIT', _Pid, Reason}, State) -> {stop, Reason, State}; @@ -171,7 +175,7 @@ terminate(_Reason, #ch{writer_pid = WriterPid, limiter_pid = LimiterPid, terminate(Reason, State = #ch{writer_pid = WriterPid, limiter_pid = LimiterPid}) -> - Res = notify_queues(internal_rollback(State)), + Res = rollback_and_notify(State), case Reason of normal -> ok = Res; _ -> ok @@ -256,9 +260,6 @@ expand_routing_key_shortcut(<<>>, <<>>, expand_routing_key_shortcut(_QueueNameBin, RoutingKey, _State) -> RoutingKey. -die_precondition_failed(Fmt, Params) -> - rabbit_misc:protocol_error(precondition_failed, Fmt, Params). - %% check that an exchange/queue name does not contain the reserved %% "amq." prefix. %% @@ -290,7 +291,7 @@ handle_method(_Method, _, #ch{state = starting}) -> rabbit_misc:protocol_error(channel_error, "expected 'channel.open'", []); handle_method(#'channel.close'{}, _, State = #ch{writer_pid = WriterPid}) -> - ok = notify_queues(internal_rollback(State)), + ok = rollback_and_notify(State), ok = rabbit_writer:send_command(WriterPid, #'channel.close_ok'{}), stop; @@ -601,8 +602,8 @@ handle_method(#'exchange.delete'{exchange = ExchangeNameBin, {error, not_found} -> rabbit_misc:not_found(ExchangeName); {error, in_use} -> - die_precondition_failed( - "~s in use", [rabbit_misc:rs(ExchangeName)]); + rabbit_misc:protocol_error( + precondition_failed, "~s in use", [rabbit_misc:rs(ExchangeName)]); ok -> return_ok(State, NoWait, #'exchange.delete_ok'{}) end; @@ -676,11 +677,11 @@ handle_method(#'queue.delete'{queue = QueueNameBin, QueueName, fun (Q) -> rabbit_amqqueue:delete(Q, IfUnused, IfEmpty) end) of {error, in_use} -> - die_precondition_failed( - "~s in use", [rabbit_misc:rs(QueueName)]); + rabbit_misc:protocol_error( + precondition_failed, "~s in use", [rabbit_misc:rs(QueueName)]); {error, not_empty} -> - die_precondition_failed( - "~s not empty", [rabbit_misc:rs(QueueName)]); + rabbit_misc:protocol_error( + precondition_failed, "~s not empty", [rabbit_misc:rs(QueueName)]); {ok, PurgedMessageCount} -> return_ok(State, NoWait, #'queue.delete_ok'{ @@ -863,6 +864,11 @@ internal_rollback(State = #ch{transaction_id = TxnKey, internal_error, "rollback failed: ~w", [Errors]) end. +rollback_and_notify(State = #ch{transaction_id = none}) -> + notify_queues(State); +rollback_and_notify(State) -> + notify_queues(internal_rollback(State)). + fold_per_queue(F, Acc0, UAQ) -> D = lists:foldl( fun ({_DTag, _CTag, diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index 37e4d18993..a53ac289f2 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -52,10 +52,12 @@ %%---------------------------------------------------------------------------- start() -> + {ok, [[NodeNameStr|_]|_]} = init:get_argument(nodename), + NodeName = list_to_atom(NodeNameStr), FullCommand = init:get_plain_arguments(), #params{quiet = Quiet, node = Node, command = Command, args = Args} = parse_args(FullCommand, #params{quiet = false, - node = rabbit_misc:localnode(rabbit)}), + node = rabbit_misc:localnode(NodeName)}), Inform = case Quiet of true -> fun(_Format, _Args1) -> ok end; false -> fun(Format, Args1) -> @@ -80,13 +82,38 @@ start() -> {error, Reason} -> error("~p", [Reason]), halt(2); + {badrpc, Reason} -> + error("unable to connect to node ~w: ~w", [Node, Reason]), + print_badrpc_diagnostics(Node), + halt(2); Other -> error("~p", [Other]), halt(2) end. -error(Format, Args) -> - rabbit_misc:format_stderr("Error: " ++ Format ++ "~n", Args). +fmt_stderr(Format, Args) -> rabbit_misc:format_stderr(Format ++ "~n", Args). + +error(Format, Args) -> fmt_stderr("Error: " ++ Format, Args). + +print_badrpc_diagnostics(Node) -> + fmt_stderr("diagnostics:", []), + NodeHost = rabbit_misc:nodehost(Node), + case net_adm:names(NodeHost) of + {error, EpmdReason} -> + fmt_stderr("- unable to connect to epmd on ~s: ~w", + [NodeHost, EpmdReason]); + {ok, NamePorts} -> + fmt_stderr("- nodes and their ports on ~s: ~p", + [NodeHost, [{list_to_atom(Name), Port} || + {Name, Port} <- NamePorts]]) + end, + fmt_stderr("- current node: ~w", [node()]), + case init:get_argument(home) of + {ok, [[Home]]} -> fmt_stderr("- current node home dir: ~s", [Home]); + Other -> fmt_stderr("- no current node home dir: ~p", [Other]) + end, + fmt_stderr("- current node cookie hash: ~s", [rabbit_misc:cookie_hash()]), + ok. parse_args(["-n", NodeS | Args], Params) -> Node = case lists:member($@, NodeS) of @@ -164,7 +191,7 @@ exchange name, routing key, queue name and arguments, in that order. <ConnectionInfoItem> must be a member of the list [node, address, port, peer_address, peer_port, state, channels, user, vhost, timeout, frame_max, recv_oct, recv_cnt, send_oct, send_cnt, send_pend]. The default is to display -user, peer_address and peer_port. +user, peer_address, peer_port and state. "), halt(1). @@ -197,9 +224,11 @@ action(cluster, Node, ClusterNodeSs, Inform) -> action(status, Node, [], Inform) -> Inform("Status of node ~p", [Node]), - Res = call(Node, {rabbit, status, []}), - io:format("~p~n", [Res]), - ok; + case call(Node, {rabbit, status, []}) of + {badrpc, _} = Res -> Res; + Res -> io:format("~p~n", [Res]), + ok + end; action(rotate_logs, Node, [], Inform) -> Inform("Reopening logs for node ~p", [Node]), @@ -270,8 +299,9 @@ action(list_bindings, Node, Args, Inform) -> action(list_connections, Node, Args, Inform) -> Inform("Listing connections", []), - ArgAtoms = list_replace(node, pid, - default_if_empty(Args, [user, peer_address, peer_port])), + ArgAtoms = list_replace(node, pid, + default_if_empty(Args, [user, peer_address, + peer_port, state])), display_info_list(rpc_call(Node, rabbit_networking, connection_info_all, [ArgAtoms]), ArgAtoms); @@ -314,7 +344,7 @@ default_if_empty(List, Default) when is_list(List) -> end. display_info_list(Results, InfoItemKeys) when is_list(Results) -> - lists:foreach(fun (Result) -> display_row([format_info_item(Result, X) || + lists:foreach(fun (Result) -> display_row([format_info_item(X, Result) || X <- InfoItemKeys]) end, Results), ok; @@ -325,26 +355,29 @@ display_row(Row) -> io:fwrite(lists:flatten(rabbit_misc:intersperse("\t", Row))), io:nl(). -format_info_item(Items, Key) -> - {value, Info = {Key, Value}} = lists:keysearch(Key, 1, Items), - case Info of - {_, #resource{name = Name}} -> - url_encode(Name); - _ when Key =:= address; Key =:= peer_address andalso is_tuple(Value) -> +format_info_item(Key, Items) -> + case proplists:get_value(Key, Items) of + #resource{name = Name} -> + escape(Name); + Value when Key =:= address; Key =:= peer_address andalso + is_tuple(Value) -> inet_parse:ntoa(Value); - _ when is_pid(Value) -> + Value when is_pid(Value) -> atom_to_list(node(Value)); - _ when is_binary(Value) -> - url_encode(Value); - _ -> + Value when is_binary(Value) -> + escape(Value); + Value when is_atom(Value) -> + escape(atom_to_list(Value)); + Value -> io_lib:format("~w", [Value]) end. display_list(L) when is_list(L) -> lists:foreach(fun (I) when is_binary(I) -> - io:format("~s~n", [url_encode(I)]); + io:format("~s~n", [escape(I)]); (I) when is_tuple(I) -> - display_row([url_encode(V) || V <- tuple_to_list(I)]) + display_row([escape(V) + || V <- tuple_to_list(I)]) end, lists:sort(L)), ok; @@ -356,32 +389,25 @@ call(Node, {Mod, Fun, Args}) -> rpc_call(Node, Mod, Fun, Args) -> rpc:call(Node, Mod, Fun, Args, ?RPC_TIMEOUT). -%% url_encode is lifted from ibrowse, modified to preserve some characters -url_encode(Bin) when binary(Bin) -> - url_encode_char(lists:reverse(binary_to_list(Bin)), []). - -url_encode_char([X | T], Acc) when X >= $a, X =< $z -> - url_encode_char(T, [X | Acc]); -url_encode_char([X | T], Acc) when X >= $A, X =< $Z -> - url_encode_char(T, [X | Acc]); -url_encode_char([X | T], Acc) when X >= $0, X =< $9 -> - url_encode_char(T, [X | Acc]); -url_encode_char([X | T], Acc) - when X == $-; X == $_; X == $.; X == $~; - X == $!; X == $*; X == $'; X == $(; - X == $); X == $;; X == $:; X == $@; - X == $&; X == $=; X == $+; X == $$; - X == $,; X == $/; X == $?; X == $%; - X == $#; X == $[; X == $] -> - url_encode_char(T, [X | Acc]); -url_encode_char([X | T], Acc) -> - url_encode_char(T, [$%, d2h(X bsr 4), d2h(X band 16#0f) | Acc]); -url_encode_char([], Acc) -> +%% escape does C-style backslash escaping of non-printable ASCII +%% characters. We don't escape characters above 127, since they may +%% form part of UTF-8 strings. + +escape(Bin) when binary(Bin) -> + escape(binary_to_list(Bin)); +escape(L) when is_list(L) -> + escape_char(lists:reverse(L), []). + +escape_char([$\\ | T], Acc) -> + escape_char(T, [$\\, $\\ | Acc]); +escape_char([X | T], Acc) when X > 32, X /= 127 -> + escape_char(T, [X | Acc]); +escape_char([X | T], Acc) -> + escape_char(T, [$\\, $0 + (X bsr 6), $0 + (X band 8#070 bsr 3), + $0 + (X band 7) | Acc]); +escape_char([], Acc) -> Acc. -d2h(N) when N<10 -> N+$0; -d2h(N) -> N+$a-10. - list_replace(Find, Replace, List) -> [case X of Find -> Replace; _ -> X end || X <- List]. diff --git a/src/rabbit_dialyzer.erl b/src/rabbit_dialyzer.erl new file mode 100644 index 0000000000..23e6fc4432 --- /dev/null +++ b/src/rabbit_dialyzer.erl @@ -0,0 +1,91 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2009 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_dialyzer). +-include("rabbit.hrl"). + +-export([create_basic_plt/1, add_to_plt/2, dialyze_files/2, halt_with_code/1]). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec(create_basic_plt/1 :: (string()) -> 'ok'). +-spec(add_to_plt/2 :: (string(), string()) -> 'ok'). +-spec(dialyze_files/2 :: (string(), string()) -> 'ok'). +-spec(halt_with_code/1 :: (atom()) -> no_return()). + +-endif. + +%%---------------------------------------------------------------------------- + +create_basic_plt(BasicPltPath) -> + OptsRecord = dialyzer_options:build( + [{analysis_type, plt_build}, + {output_plt, BasicPltPath}, + {files_rec, otp_apps_dependencies_paths()}]), + dialyzer_cl:start(OptsRecord), + ok. + +add_to_plt(PltPath, FilesString) -> + {ok, Files} = regexp:split(FilesString, " "), + DialyzerWarnings = dialyzer:run([{analysis_type, plt_add}, + {init_plt, PltPath}, + {output_plt, PltPath}, + {files, Files}]), + print_warnings(DialyzerWarnings), + ok. + +dialyze_files(PltPath, ModifiedFiles) -> + {ok, Files} = regexp:split(ModifiedFiles, " "), + DialyzerWarnings = dialyzer:run([{init_plt, PltPath}, + {files, Files}]), + case DialyzerWarnings of + [] -> io:format("~nOk~n"), + ok; + _ -> io:format("~nFAILED with the following warnings:~n"), + print_warnings(DialyzerWarnings), + fail + end. + +print_warnings(Warnings) -> + [io:format("~s", [dialyzer:format_warning(W)]) || W <- Warnings], + io:format("~n"), + ok. + +otp_apps_dependencies_paths() -> + [code:lib_dir(App, ebin) || + App <- [kernel, stdlib, sasl, mnesia, os_mon, ssl, eunit, tools]]. + +halt_with_code(ok) -> + halt(); +halt_with_code(fail) -> + halt(1). diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index f17ee2f530..37a1357d41 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -61,7 +61,7 @@ 'exchange_not_found' | 'exchange_and_queue_not_found'}). -spec(recover/0 :: () -> 'ok'). --spec(declare/4 :: (exchange_name(), exchange_type(), bool(), amqp_table()) -> exchange()). +-spec(declare/4 :: (exchange_name(), exchange_type(), boolean(), amqp_table()) -> exchange()). -spec(check_type/1 :: (binary()) -> atom()). -spec(assert_type/2 :: (exchange(), atom()) -> 'ok'). -spec(lookup/1 :: (exchange_name()) -> {'ok', exchange()} | not_found()). @@ -82,9 +82,9 @@ [{exchange_name(), queue_name(), routing_key(), amqp_table()}]). -spec(delete_queue_bindings/1 :: (queue_name()) -> 'ok'). -spec(delete_transient_queue_bindings/1 :: (queue_name()) -> 'ok'). --spec(topic_matches/2 :: (binary(), binary()) -> bool()). --spec(headers_match/2 :: (amqp_table(), amqp_table()) -> bool()). --spec(delete/2 :: (exchange_name(), bool()) -> +-spec(topic_matches/2 :: (binary(), binary()) -> boolean()). +-spec(headers_match/2 :: (amqp_table(), amqp_table()) -> boolean()). +-spec(delete/2 :: (exchange_name(), boolean()) -> 'ok' | not_found() | {'error', 'in_use'}). -spec(list_queue_bindings/1 :: (queue_name()) -> [{exchange_name(), routing_key(), amqp_table()}]). diff --git a/src/rabbit_guid.erl b/src/rabbit_guid.erl index 2be005034e..b789fbd1e0 100644 --- a/src/rabbit_guid.erl +++ b/src/rabbit_guid.erl @@ -42,6 +42,7 @@ terminate/2, code_change/3]). -define(SERVER, ?MODULE). +-define(SERIAL_FILENAME, "rabbit_serial"). -record(state, {serial}). @@ -59,17 +60,28 @@ %%---------------------------------------------------------------------------- start_link() -> - %% The persister can get heavily loaded, and we don't want that to - %% impact guid generation. We therefore keep the serial in a - %% separate process rather than calling rabbit_persister:serial/0 - %% directly in the functions below. gen_server:start_link({local, ?SERVER}, ?MODULE, - [rabbit_persister:serial()], []). + [update_disk_serial()], []). + +update_disk_serial() -> + Filename = filename:join(rabbit_mnesia:dir(), ?SERIAL_FILENAME), + Serial = case rabbit_misc:read_term_file(Filename) of + {ok, [Num]} -> Num; + {error, enoent} -> rabbit_persister:serial(); + {error, Reason} -> + throw({error, {cannot_read_serial_file, Filename, Reason}}) + end, + case rabbit_misc:write_term_file(Filename, [Serial + 1]) of + ok -> ok; + {error, Reason1} -> + throw({error, {cannot_write_serial_file, Filename, Reason1}}) + end, + Serial. %% generate a guid that is monotonically increasing per process. %% %% The id is only unique within a single cluster and as long as the -%% persistent message store hasn't been deleted. +%% serial store hasn't been deleted. guid() -> %% We don't use erlang:now() here because a) it may return %% duplicates when the system clock has been rewound prior to a @@ -77,7 +89,7 @@ guid() -> %% now() to move ahead of the system time), and b) it is really %% slow since it takes a global lock and makes a system call. %% - %% rabbit_persister:serial/0, in combination with self/0 (which + %% A persisted serial number, in combination with self/0 (which %% includes the node name) uniquely identifies a process in space %% and time. We combine that with a process-local counter to give %% us a GUID that is monotonically increasing per process. diff --git a/src/rabbit_heartbeat.erl b/src/rabbit_heartbeat.erl index 0a68c9adad..ed0066fe07 100644 --- a/src/rabbit_heartbeat.erl +++ b/src/rabbit_heartbeat.erl @@ -53,7 +53,7 @@ start_heartbeat(Sock, TimeoutSec) -> spawn_link(fun () -> heartbeater(Sock, TimeoutSec * 1000 div 2, send_oct, 0, fun () -> - catch gen_tcp:send(Sock, rabbit_binary_generator:build_heartbeat_frame()), + catch rabbit_net:send(Sock, rabbit_binary_generator:build_heartbeat_frame()), continue end, erlang:monitor(process, Parent)) end), @@ -73,7 +73,7 @@ heartbeater(Sock, TimeoutMillisec, StatName, Threshold, Handler, MonitorRef) -> {'DOWN', MonitorRef, process, _Object, _Info} -> ok; Other -> exit({unexpected_message, Other}) after TimeoutMillisec -> - case inet:getstat(Sock, [StatName]) of + case rabbit_net:getstat(Sock, [StatName]) of {ok, [{StatName, NewStatVal}]} -> if NewStatVal =/= StatVal -> F({NewStatVal, 0}); diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index 9f3dcbd071..087a9f64d9 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -47,7 +47,7 @@ -spec(start_link/1 :: (pid()) -> pid()). -spec(shutdown/1 :: (maybe_pid()) -> 'ok'). -spec(limit/2 :: (maybe_pid(), non_neg_integer()) -> 'ok'). --spec(can_send/3 :: (maybe_pid(), pid(), bool()) -> bool()). +-spec(can_send/3 :: (maybe_pid(), pid(), boolean()) -> boolean()). -spec(ack/2 :: (maybe_pid(), non_neg_integer()) -> 'ok'). -spec(register/2 :: (maybe_pid(), pid()) -> 'ok'). -spec(unregister/2 :: (maybe_pid(), pid()) -> 'ok'). diff --git a/src/rabbit_load.erl b/src/rabbit_load.erl index 7bf85347fb..6ef638cb59 100644 --- a/src/rabbit_load.erl +++ b/src/rabbit_load.erl @@ -41,7 +41,7 @@ -ifdef(use_specs). -type(erlang_node() :: atom()). --type(load() :: {{non_neg_integer(), float()}, erlang_node()}). +-type(load() :: {{non_neg_integer(), integer() | 'unknown'}, erlang_node()}). -spec(local_load/0 :: () -> load()). -spec(remote_loads/0 :: () -> [load()]). -spec(pick/0 :: () -> erlang_node()). @@ -52,8 +52,11 @@ local_load() -> LoadAvg = case whereis(cpu_sup) of - undefined -> 0.0; - _Other -> cpu_sup:avg1() + undefined -> unknown; + _ -> case cpu_sup:avg1() of + L when is_integer(L) -> L; + {error, timeout} -> unknown + end end, {{statistics(run_queue), LoadAvg}, node()}. @@ -65,8 +68,12 @@ remote_loads() -> pick() -> RemoteLoads = remote_loads(), {{RunQ, LoadAvg}, Node} = local_load(), - %% add bias towards current node - AdjustedLoadAvg = LoadAvg * ?FUDGE_FACTOR, + %% add bias towards current node; we rely on Erlang's term order + %% of SomeFloat < local_unknown < unknown. + AdjustedLoadAvg = case LoadAvg of + unknown -> local_unknown; + _ -> LoadAvg * ?FUDGE_FACTOR + end, Loads = [{{RunQ, AdjustedLoadAvg}, Node} | RemoteLoads], {_, SelectedNode} = lists:min(Loads), SelectedNode. diff --git a/src/rabbit_memsup.erl b/src/rabbit_memsup.erl new file mode 100644 index 0000000000..b0d57cb27e --- /dev/null +++ b/src/rabbit_memsup.erl @@ -0,0 +1,142 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2009 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_memsup). + +-behaviour(gen_server). + +-export([start_link/1]). + +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +-export([update/0]). + +-record(state, {memory_fraction, + timeout, + timer, + mod, + mod_state, + alarmed + }). + +-define(SERVER, memsup). %% must be the same as the standard memsup + +-define(DEFAULT_MEMORY_CHECK_INTERVAL, 1000). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec(start_link/1 :: (atom()) -> {'ok', pid()} | 'ignore' | {'error', any()}). +-spec(update/0 :: () -> 'ok'). + +-endif. + +%%---------------------------------------------------------------------------- + +start_link(Args) -> + gen_server:start_link({local, ?SERVER}, ?MODULE, [Args], []). + +update() -> + gen_server:cast(?SERVER, update). + +%%---------------------------------------------------------------------------- + +init([Mod]) -> + Fraction = os_mon:get_env(memsup, system_memory_high_watermark), + TRef = start_timer(?DEFAULT_MEMORY_CHECK_INTERVAL), + InitState = Mod:init(), + State = #state { memory_fraction = Fraction, + timeout = ?DEFAULT_MEMORY_CHECK_INTERVAL, + timer = TRef, + mod = Mod, + mod_state = InitState, + alarmed = false }, + {ok, internal_update(State)}. + +start_timer(Timeout) -> + {ok, TRef} = timer:apply_interval(Timeout, ?MODULE, update, []), + TRef. + +%% Export the same API as the real memsup. Note that +%% get_sysmem_high_watermark gives an int in the range 0 - 100, while +%% set_sysmem_high_watermark takes a float in the range 0.0 - 1.0. +handle_call(get_sysmem_high_watermark, _From, State) -> + {reply, trunc(100 * State#state.memory_fraction), State}; + +handle_call({set_sysmem_high_watermark, Float}, _From, State) -> + {reply, ok, State#state{memory_fraction = Float}}; + +handle_call(get_check_interval, _From, State) -> + {reply, State#state.timeout, State}; + +handle_call({set_check_interval, Timeout}, _From, State) -> + {ok, cancel} = timer:cancel(State#state.timer), + {reply, ok, State#state{timeout = Timeout, timer = start_timer(Timeout)}}; + +handle_call(get_memory_data, _From, + State = #state { mod = Mod, mod_state = ModState }) -> + {reply, Mod:get_memory_data(ModState), State}; + +handle_call(_Request, _From, State) -> + {noreply, State}. + +handle_cast(update, State) -> + {noreply, internal_update(State)}; + +handle_cast(_Request, State) -> + {noreply, State}. + +handle_info(_Info, State) -> + {noreply, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +internal_update(State = #state { memory_fraction = MemoryFraction, + alarmed = Alarmed, + mod = Mod, mod_state = ModState }) -> + ModState1 = Mod:update(ModState), + {MemTotal, MemUsed, _BigProc} = Mod:get_memory_data(ModState1), + NewAlarmed = MemUsed / MemTotal > MemoryFraction, + case {Alarmed, NewAlarmed} of + {false, true} -> + alarm_handler:set_alarm({system_memory_high_watermark, []}); + {true, false} -> + alarm_handler:clear_alarm(system_memory_high_watermark); + _ -> + ok + end, + State #state { mod_state = ModState1, alarmed = NewAlarmed }. diff --git a/src/rabbit_memsup_darwin.erl b/src/rabbit_memsup_darwin.erl new file mode 100644 index 0000000000..3de2d8430e --- /dev/null +++ b/src/rabbit_memsup_darwin.erl @@ -0,0 +1,88 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2009 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_memsup_darwin). + +-export([init/0, update/1, get_memory_data/1]). + +-record(state, {total_memory, + allocated_memory}). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-type(state() :: #state { total_memory :: ('undefined' | non_neg_integer()), + allocated_memory :: ('undefined' | non_neg_integer()) + }). + +-spec(init/0 :: () -> state()). +-spec(update/1 :: (state()) -> state()). +-spec(get_memory_data/1 :: (state()) -> {non_neg_integer(), non_neg_integer(), + ('undefined' | pid())}). + +-endif. + +%%---------------------------------------------------------------------------- + +init() -> + #state{total_memory = undefined, + allocated_memory = undefined}. + +update(State) -> + File = os:cmd("/usr/bin/vm_stat"), + Lines = string:tokens(File, "\n"), + Dict = dict:from_list(lists:map(fun parse_line/1, Lines)), + [PageSize, Inactive, Active, Free, Wired] = + [dict:fetch(Key, Dict) || + Key <- [page_size, 'Pages inactive', 'Pages active', 'Pages free', + 'Pages wired down']], + MemTotal = PageSize * (Inactive + Active + Free + Wired), + MemUsed = PageSize * (Active + Wired), + State#state{total_memory = MemTotal, allocated_memory = MemUsed}. + +get_memory_data(State) -> + {State#state.total_memory, State#state.allocated_memory, undefined}. + +%%---------------------------------------------------------------------------- + +%% A line looks like "Foo bar: 123456." +parse_line(Line) -> + [Name, RHS | _Rest] = string:tokens(Line, ":"), + case Name of + "Mach Virtual Memory Statistics" -> + ["(page", "size", "of", PageSize, "bytes)"] = + string:tokens(RHS, " "), + {page_size, list_to_integer(PageSize)}; + _ -> + [Value | _Rest1] = string:tokens(RHS, " ."), + {list_to_atom(Name), list_to_integer(Value)} + end. diff --git a/src/rabbit_memsup_linux.erl b/src/rabbit_memsup_linux.erl index ffdc7e9946..ca942d7caa 100644 --- a/src/rabbit_memsup_linux.erl +++ b/src/rabbit_memsup_linux.erl @@ -31,104 +31,44 @@ -module(rabbit_memsup_linux). --behaviour(gen_server). +-export([init/0, update/1, get_memory_data/1]). --export([start_link/0]). - --export([init/1, handle_call/3, handle_cast/2, handle_info/2, - terminate/2, code_change/3]). - --export([update/0]). - --define(SERVER, memsup). %% must be the same as the standard memsup - --define(DEFAULT_MEMORY_CHECK_INTERVAL, 1000). - --record(state, {memory_fraction, alarmed, timeout, timer}). +-record(state, {total_memory, + allocated_memory}). %%---------------------------------------------------------------------------- -ifdef(use_specs). --spec(start_link/0 :: () -> {'ok', pid()} | 'ignore' | {'error', any()}). --spec(update/0 :: () -> 'ok'). - --endif. - -%%---------------------------------------------------------------------------- - -start_link() -> - gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). +-type(state() :: #state { total_memory :: ('undefined' | non_neg_integer()), + allocated_memory :: ('undefined' | non_neg_integer()) + }). +-spec(init/0 :: () -> state()). +-spec(update/1 :: (state()) -> state()). +-spec(get_memory_data/1 :: (state()) -> {non_neg_integer(), non_neg_integer(), + ('undefined' | pid())}). -update() -> - gen_server:cast(?SERVER, update). +-endif. %%---------------------------------------------------------------------------- -init(_Args) -> - Fraction = os_mon:get_env(memsup, system_memory_high_watermark), - TRef = start_timer(?DEFAULT_MEMORY_CHECK_INTERVAL), - {ok, #state{alarmed = false, - memory_fraction = Fraction, - timeout = ?DEFAULT_MEMORY_CHECK_INTERVAL, - timer = TRef}}. - -start_timer(Timeout) -> - {ok, TRef} = timer:apply_interval(Timeout, ?MODULE, update, []), - TRef. - -%% Export the same API as the real memsup. Note that -%% get_sysmem_high_watermark gives an int in the range 0 - 100, while -%% set_sysmem_high_watermark takes a float in the range 0.0 - 1.0. -handle_call(get_sysmem_high_watermark, _From, State) -> - {reply, trunc(100 * State#state.memory_fraction), State}; - -handle_call({set_sysmem_high_watermark, Float}, _From, State) -> - {reply, ok, State#state{memory_fraction = Float}}; +init() -> + #state{total_memory = undefined, + allocated_memory = undefined}. -handle_call(get_check_interval, _From, State) -> - {reply, State#state.timeout, State}; - -handle_call({set_check_interval, Timeout}, _From, State) -> - {ok, cancel} = timer:cancel(State#state.timer), - {reply, ok, State#state{timeout = Timeout, timer = start_timer(Timeout)}}; - -handle_call(_Request, _From, State) -> - {noreply, State}. - -handle_cast(update, State = #state{alarmed = Alarmed, - memory_fraction = MemoryFraction}) -> +update(State) -> File = read_proc_file("/proc/meminfo"), Lines = string:tokens(File, "\n"), Dict = dict:from_list(lists:map(fun parse_line/1, Lines)), - MemTotal = dict:fetch('MemTotal', Dict), - MemUsed = MemTotal - - dict:fetch('MemFree', Dict) - - dict:fetch('Buffers', Dict) - - dict:fetch('Cached', Dict), - NewAlarmed = MemUsed / MemTotal > MemoryFraction, - case {Alarmed, NewAlarmed} of - {false, true} -> - alarm_handler:set_alarm({system_memory_high_watermark, []}); - {true, false} -> - alarm_handler:clear_alarm(system_memory_high_watermark); - _ -> - ok - end, - {noreply, State#state{alarmed = NewAlarmed}}; - -handle_cast(_Request, State) -> - {noreply, State}. - -handle_info(_Info, State) -> - {noreply, State}. - -terminate(_Reason, _State) -> - ok. - -code_change(_OldVsn, State, _Extra) -> - {ok, State}. + [MemTotal, MemFree, Buffers, Cached] = + [dict:fetch(Key, Dict) || + Key <- ['MemTotal', 'MemFree', 'Buffers', 'Cached']], + MemUsed = MemTotal - MemFree - Buffers - Cached, + State#state{total_memory = MemTotal, allocated_memory = MemUsed}. + +get_memory_data(State) -> + {State#state.total_memory, State#state.allocated_memory, undefined}. %%---------------------------------------------------------------------------- @@ -152,5 +92,10 @@ read_proc_file(IoDevice, Acc) -> %% A line looks like "FooBar: 123456 kB" parse_line(Line) -> - [Name, Value | _] = string:tokens(Line, ": "), - {list_to_atom(Name), list_to_integer(Value)}. + [Name, RHS | _Rest] = string:tokens(Line, ":"), + [Value | UnitsRest] = string:tokens(RHS, " "), + Value1 = case UnitsRest of + [] -> list_to_integer(Value); %% no units + ["kB"] -> list_to_integer(Value) * 1024 + end, + {list_to_atom(Name), Value1}. diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index abf4c7ccfa..b20e9a86b6 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -35,7 +35,8 @@ -include_lib("kernel/include/file.hrl"). -export([method_record_type/1, polite_pause/0, polite_pause/1]). --export([die/1, frame_error/2, protocol_error/3, protocol_error/4]). +-export([die/1, frame_error/2, amqp_error/4, + protocol_error/3, protocol_error/4]). -export([not_found/1]). -export([get_config/1, get_config/2, set_config/2]). -export([dirty_read/1]). @@ -46,13 +47,15 @@ -export([with_user/2, with_vhost/2, with_user_and_vhost/3]). -export([execute_mnesia_transaction/1]). -export([ensure_ok/2]). --export([localnode/1, tcp_name/3]). +-export([localnode/1, nodehost/1, cookie_hash/0, tcp_name/3]). -export([intersperse/2, upmap/2, map_in_order/2]). -export([table_foreach/2]). -export([dirty_read_all/1, dirty_foreach_key/2, dirty_dump_log/1]). +-export([read_term_file/1, write_term_file/2]). -export([append_file/2, ensure_parent_dirs_exist/1]). -export([format_stderr/2]). -export([start_applications/1, stop_applications/1]). +-export([unfold/2, ceil/1]). -import(mnesia). -import(lists). @@ -65,15 +68,16 @@ -include_lib("kernel/include/inet.hrl"). +-type(ok_or_error() :: 'ok' | {'error', any()}). + -spec(method_record_type/1 :: (tuple()) -> atom()). -spec(polite_pause/0 :: () -> 'done'). -spec(polite_pause/1 :: (non_neg_integer()) -> 'done'). -spec(die/1 :: (atom()) -> no_return()). -spec(frame_error/2 :: (atom(), binary()) -> no_return()). --spec(protocol_error/3 :: - (atom() | amqp_error(), string(), [any()]) -> no_return()). --spec(protocol_error/4 :: - (atom() | amqp_error(), string(), [any()], atom()) -> no_return()). +-spec(amqp_error/4 :: (atom(), string(), [any()], atom()) -> amqp_error()). +-spec(protocol_error/3 :: (atom(), string(), [any()]) -> no_return()). +-spec(protocol_error/4 :: (atom(), string(), [any()], atom()) -> no_return()). -spec(not_found/1 :: (r(atom())) -> no_return()). -spec(get_config/1 :: (atom()) -> {'ok', any()} | not_found()). -spec(get_config/2 :: (atom(), A) -> A). @@ -88,9 +92,9 @@ -spec(r_arg/4 :: (vhost() | r(atom()), K, amqp_table(), binary()) -> undefined | r(K) when is_subtype(K, atom())). -spec(rs/1 :: (r(atom())) -> string()). --spec(enable_cover/0 :: () -> 'ok' | {'error', any()}). +-spec(enable_cover/0 :: () -> ok_or_error()). -spec(report_cover/0 :: () -> 'ok'). --spec(enable_cover/1 :: (string()) -> 'ok' | {'error', any()}). +-spec(enable_cover/1 :: (string()) -> ok_or_error()). -spec(report_cover/1 :: (string()) -> 'ok'). -spec(throw_on_error/2 :: (atom(), thunk({error, any()} | {ok, A} | A)) -> A). @@ -100,8 +104,10 @@ -spec(with_vhost/2 :: (vhost(), thunk(A)) -> A). -spec(with_user_and_vhost/3 :: (username(), vhost(), thunk(A)) -> A). -spec(execute_mnesia_transaction/1 :: (thunk(A)) -> A). --spec(ensure_ok/2 :: ('ok' | {'error', any()}, atom()) -> 'ok'). +-spec(ensure_ok/2 :: (ok_or_error(), atom()) -> 'ok'). -spec(localnode/1 :: (atom()) -> erlang_node()). +-spec(nodehost/1 :: (erlang_node()) -> string()). +-spec(cookie_hash/0 :: () -> string()). -spec(tcp_name/3 :: (atom(), ip_address(), ip_port()) -> atom()). -spec(intersperse/2 :: (A, [A]) -> [A]). -spec(upmap/2 :: (fun ((A) -> B), [A]) -> [B]). @@ -110,12 +116,16 @@ -spec(dirty_read_all/1 :: (atom()) -> [any()]). -spec(dirty_foreach_key/2 :: (fun ((any()) -> any()), atom()) -> 'ok' | 'aborted'). --spec(dirty_dump_log/1 :: (string()) -> 'ok' | {'error', any()}). --spec(append_file/2 :: (string(), string()) -> 'ok' | {'error', any()}). +-spec(dirty_dump_log/1 :: (string()) -> ok_or_error()). +-spec(read_term_file/1 :: (string()) -> {'ok', [any()]} | {'error', any()}). +-spec(write_term_file/2 :: (string(), [any()]) -> ok_or_error()). +-spec(append_file/2 :: (string(), string()) -> ok_or_error()). -spec(ensure_parent_dirs_exist/1 :: (string()) -> 'ok'). -spec(format_stderr/2 :: (string(), [any()]) -> 'ok'). -spec(start_applications/1 :: ([atom()]) -> 'ok'). -spec(stop_applications/1 :: ([atom()]) -> 'ok'). +-spec(unfold/2 :: (fun ((A) -> ({'true', B, A} | 'false')), A) -> {[B], A}). +-spec(ceil/1 :: (number()) -> number()). -endif. @@ -136,15 +146,17 @@ die(Error) -> protocol_error(Error, "~w", [Error]). frame_error(MethodName, BinaryFields) -> - protocol_error(frame_error, "cannot decode ~w", - [BinaryFields], MethodName). + protocol_error(frame_error, "cannot decode ~w", [BinaryFields], MethodName). + +amqp_error(Name, ExplanationFormat, Params, Method) -> + Explanation = lists:flatten(io_lib:format(ExplanationFormat, Params)), + #amqp_error{name = Name, explanation = Explanation, method = Method}. -protocol_error(Error, Explanation, Params) -> - protocol_error(Error, Explanation, Params, none). +protocol_error(Name, ExplanationFormat, Params) -> + protocol_error(Name, ExplanationFormat, Params, none). -protocol_error(Error, Explanation, Params, Method) -> - CompleteExplanation = lists:flatten(io_lib:format(Explanation, Params)), - exit({amqp, Error, CompleteExplanation, Method}). +protocol_error(Name, ExplanationFormat, Params, Method) -> + exit(amqp_error(Name, ExplanationFormat, Params, Method)). not_found(R) -> protocol_error(not_found, "no ~s", [rs(R)]). @@ -297,11 +309,15 @@ ensure_ok(ok, _) -> ok; ensure_ok({error, Reason}, ErrorTag) -> throw({error, {ErrorTag, Reason}}). localnode(Name) -> + list_to_atom(lists:append([atom_to_list(Name), "@", nodehost(node())])). + +nodehost(Node) -> %% This is horrible, but there doesn't seem to be a way to split a %% nodename into its constituent parts. - list_to_atom(lists:append(atom_to_list(Name), - lists:dropwhile(fun (E) -> E =/= $@ end, - atom_to_list(node())))). + tl(lists:dropwhile(fun (E) -> E =/= $@ end, atom_to_list(Node))). + +cookie_hash() -> + ssl_base64:encode(erlang:md5(atom_to_list(erlang:get_cookie()))). tcp_name(Prefix, IPAddress, Port) when is_atom(Prefix) andalso is_number(Port) -> @@ -360,7 +376,9 @@ dirty_foreach_key1(F, TableName, K) -> end. dirty_dump_log(FileName) -> - {ok, LH} = disk_log:open([{name, dirty_dump_log}, {mode, read_only}, {file, FileName}]), + {ok, LH} = disk_log:open([{name, dirty_dump_log}, + {mode, read_only}, + {file, FileName}]), dirty_dump_log1(LH, disk_log:chunk(LH, start)), disk_log:close(LH). @@ -374,6 +392,12 @@ dirty_dump_log1(LH, {K, Terms, BadBytes}) -> dirty_dump_log1(LH, disk_log:chunk(LH, K)). +read_term_file(File) -> file:consult(File). + +write_term_file(File, Terms) -> + file:write_file(File, list_to_binary([io_lib:format("~w.~n", [Term]) || + Term <- Terms])). + append_file(File, Suffix) -> case file:read_file_info(File) of {ok, FInfo} -> append_file(File, FInfo#file_info.size, Suffix); @@ -444,3 +468,18 @@ stop_applications(Apps) -> cannot_stop_application, Apps). +unfold(Fun, Init) -> + unfold(Fun, [], Init). + +unfold(Fun, Acc, Init) -> + case Fun(Init) of + {true, E, I} -> unfold(Fun, [E|Acc], I); + false -> {Acc, Init} + end. + +ceil(N) -> + T = trunc(N), + case N - T of + 0 -> N; + _ -> 1 + T + end. diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index 575ecb0adc..c4d5aac684 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -50,7 +50,7 @@ -spec(dir/0 :: () -> string()). -spec(ensure_mnesia_dir/0 :: () -> 'ok'). -spec(init/0 :: () -> 'ok'). --spec(is_db_empty/0 :: () -> bool()). +-spec(is_db_empty/0 :: () -> boolean()). -spec(cluster/1 :: ([erlang_node()]) -> 'ok'). -spec(reset/0 :: () -> 'ok'). -spec(force_reset/0 :: () -> 'ok'). @@ -149,6 +149,11 @@ table_definitions() -> table_names() -> [Tab || {Tab, _} <- table_definitions()]. +replicated_table_names() -> + [Tab || {Tab, Attrs} <- table_definitions(), + not lists:member({local_content, true}, Attrs) + ]. + dir() -> mnesia:system_info(directory). ensure_mnesia_dir() -> @@ -192,28 +197,16 @@ cluster_nodes_config_filename() -> create_cluster_nodes_config(ClusterNodes) -> FileName = cluster_nodes_config_filename(), - Handle = case file:open(FileName, [write]) of - {ok, Device} -> Device; - {error, Reason} -> - throw({error, {cannot_create_cluster_nodes_config, - FileName, Reason}}) - end, - try - ok = io:write(Handle, ClusterNodes), - ok = io:put_chars(Handle, [$.]) - after - case file:close(Handle) of - ok -> ok; - {error, Reason1} -> - throw({error, {cannot_close_cluster_nodes_config, - FileName, Reason1}}) - end - end, - ok. + case rabbit_misc:write_term_file(FileName, [ClusterNodes]) of + ok -> ok; + {error, Reason} -> + throw({error, {cannot_create_cluster_nodes_config, + FileName, Reason}}) + end. read_cluster_nodes_config() -> FileName = cluster_nodes_config_filename(), - case file:consult(FileName) of + case rabbit_misc:read_term_file(FileName) of {ok, [ClusterNodes]} -> ClusterNodes; {error, enoent} -> case application:get_env(cluster_config) of @@ -250,12 +243,10 @@ delete_cluster_nodes_config() -> %% standalone disk node, or disk or ram node connected to the %% specified cluster nodes. init_db(ClusterNodes) -> - WasDiskNode = mnesia:system_info(use_dir), - IsDiskNode = ClusterNodes == [] orelse - lists:member(node(), ClusterNodes), case mnesia:change_config(extra_db_nodes, ClusterNodes -- [node()]) of {ok, []} -> - if WasDiskNode and IsDiskNode -> + case mnesia:system_info(use_dir) of + true -> case check_schema_integrity() of ok -> ok; @@ -270,22 +261,18 @@ init_db(ClusterNodes) -> ok = move_db(), ok = create_schema() end; - WasDiskNode -> - throw({error, {cannot_convert_disk_node_to_ram_node, - ClusterNodes}}); - IsDiskNode -> - ok = create_schema(); - true -> - throw({error, {unable_to_contact_cluster_nodes, - ClusterNodes}}) + false -> + ok = create_schema() end; {ok, [_|_]} -> - ok = wait_for_tables(), - ok = create_local_table_copies( - case IsDiskNode of - true -> disc; - false -> ram - end); + IsDiskNode = ClusterNodes == [] orelse + lists:member(node(), ClusterNodes), + ok = wait_for_replicated_tables(), + ok = create_local_table_copy(schema, disc_copies), + ok = create_local_table_copies(case IsDiskNode of + true -> disc; + false -> ram + end); {error, Reason} -> %% one reason we may end up here is if we try to join %% nodes together that are currently running standalone or @@ -336,40 +323,36 @@ create_tables() -> table_definitions()), ok. +table_has_copy_type(TabDef, DiscType) -> + lists:member(node(), proplists:get_value(DiscType, TabDef, [])). + create_local_table_copies(Type) -> - ok = if Type /= ram -> create_local_table_copy(schema, disc_copies); - true -> ok - end, lists:foreach( fun({Tab, TabDef}) -> - HasDiscCopies = - lists:keymember(disc_copies, 1, TabDef), - HasDiscOnlyCopies = - lists:keymember(disc_only_copies, 1, TabDef), + HasDiscCopies = table_has_copy_type(TabDef, disc_copies), + HasDiscOnlyCopies = table_has_copy_type(TabDef, disc_only_copies), + LocalTab = proplists:get_bool(local_content, TabDef), StorageType = - case Type of - disc -> + if + Type =:= disc orelse LocalTab -> if - HasDiscCopies -> disc_copies; + HasDiscCopies -> disc_copies; HasDiscOnlyCopies -> disc_only_copies; - true -> ram_copies + true -> ram_copies end; %% unused code - commented out to keep dialyzer happy -%% disc_only -> +%% Type =:= disc_only -> %% if %% HasDiscCopies or HasDiscOnlyCopies -> %% disc_only_copies; %% true -> ram_copies %% end; - ram -> + Type =:= ram -> ram_copies end, ok = create_local_table_copy(Tab, StorageType) end, table_definitions()), - ok = if Type == ram -> create_local_table_copy(schema, ram_copies); - true -> ok - end, ok. create_local_table_copy(Tab, Type) -> @@ -384,10 +367,14 @@ create_local_table_copy(Tab, Type) -> end, ok. -wait_for_tables() -> +wait_for_replicated_tables() -> wait_for_tables(replicated_table_names()). + +wait_for_tables() -> wait_for_tables(table_names()). + +wait_for_tables(TableNames) -> case check_schema_integrity() of ok -> - case mnesia:wait_for_tables(table_names(), 30000) of + case mnesia:wait_for_tables(TableNames, 30000) of ok -> ok; {timeout, BadTabs} -> throw({error, {timeout_waiting_for_tables, BadTabs}}); diff --git a/src/rabbit_multi.erl b/src/rabbit_multi.erl index d91975359a..b1cc4d028f 100644 --- a/src/rabbit_multi.erl +++ b/src/rabbit_multi.erl @@ -114,12 +114,13 @@ action(status, [], RpcTimeout) -> io:format("Status of all running nodes...~n", []), call_all_nodes( fun({Node, Pid}) -> - Status = rpc:call(Node, rabbit, status, [], RpcTimeout), + RabbitRunning = + case is_rabbit_running(Node, RpcTimeout) of + false -> not_running; + true -> running + end, io:format("Node '~p' with Pid ~p: ~p~n", - [Node, Pid, case parse_status(Status) of - false -> not_running; - true -> running - end]) + [Node, Pid, RabbitRunning]) end); action(stop_all, [], RpcTimeout) -> @@ -197,7 +198,7 @@ start_node(NodeName, NodePort, RpcTimeout) -> wait_for_rabbit_to_start(_ , RpcTimeout, _) when RpcTimeout < 0 -> false; wait_for_rabbit_to_start(Node, RpcTimeout, Port) -> - case parse_status(rpc:call(Node, rabbit, status, [])) of + case is_rabbit_running(Node, RpcTimeout) of true -> true; false -> receive {'EXIT', Port, PosixCode} -> @@ -211,22 +212,20 @@ wait_for_rabbit_to_start(Node, RpcTimeout, Port) -> run_cmd(FullPath) -> erlang:open_port({spawn, FullPath}, [nouse_stdio]). -parse_status({badrpc, _}) -> - false; - -parse_status(Status) -> - case lists:keysearch(running_applications, 1, Status) of - {value, {running_applications, Apps}} -> - lists:keymember(rabbit, 1, Apps); - _ -> - false +is_rabbit_running(Node, RpcTimeout) -> + case rpc:call(Node, rabbit, status, [], RpcTimeout) of + {badrpc, _} -> false; + Status -> case proplists:get_value(running_applications, Status) of + undefined -> false; + Apps -> lists:keymember(rabbit, 1, Apps) + end end. with_os(Handlers) -> {OsFamily, _} = os:type(), - case lists:keysearch(OsFamily, 1, Handlers) of - {value, {_, Handler}} -> Handler(); - false -> throw({unsupported_os, OsFamily}) + case proplists:get_value(OsFamily, Handlers) of + undefined -> throw({unsupported_os, OsFamily}); + Handler -> Handler() end. script_filename() -> diff --git a/src/rabbit_net.erl b/src/rabbit_net.erl new file mode 100644 index 0000000000..a5ccc8e9ae --- /dev/null +++ b/src/rabbit_net.erl @@ -0,0 +1,132 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2009 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_net). +-include("rabbit.hrl"). +-include_lib("kernel/include/inet.hrl"). + +-export([async_recv/3, close/1, controlling_process/2, + getstat/2, peername/1, port_command/2, + send/2, sockname/1]). +%%--------------------------------------------------------------------------- + +-ifdef(use_specs). + +-type(stat_option() :: + 'recv_cnt' | 'recv_max' | 'recv_avg' | 'recv_oct' | 'recv_dvi' | + 'send_cnt' | 'send_max' | 'send_avg' | 'send_oct' | 'send_pend'). +-type(error() :: {'error', any()}). + +-spec(async_recv/3 :: (socket(), integer(), timeout()) -> {'ok', any()}). +-spec(close/1 :: (socket()) -> 'ok' | error()). +-spec(controlling_process/2 :: (socket(), pid()) -> 'ok' | error()). +-spec(port_command/2 :: (socket(), iolist()) -> 'true'). +-spec(send/2 :: (socket(), binary() | iolist()) -> 'ok' | error()). +-spec(peername/1 :: (socket()) -> + {'ok', {ip_address(), non_neg_integer()}} | error()). +-spec(sockname/1 :: (socket()) -> + {'ok', {ip_address(), non_neg_integer()}} | error()). +-spec(getstat/2 :: (socket(), [stat_option()]) -> + {'ok', [{stat_option(), integer()}]} | error()). + +-endif. + +%%--------------------------------------------------------------------------- + + +async_recv(Sock, Length, Timeout) when is_record(Sock, ssl_socket) -> + Pid = self(), + Ref = make_ref(), + + spawn(fun() -> Pid ! {inet_async, Sock, Ref, + ssl:recv(Sock#ssl_socket.ssl, Length, Timeout)} + end), + + {ok, Ref}; + +async_recv(Sock, Length, infinity) when is_port(Sock) -> + prim_inet:async_recv(Sock, Length, -1); + +async_recv(Sock, Length, Timeout) when is_port(Sock) -> + prim_inet:async_recv(Sock, Length, Timeout). + +close(Sock) when is_record(Sock, ssl_socket) -> + ssl:close(Sock#ssl_socket.ssl); + +close(Sock) when is_port(Sock) -> + gen_tcp:close(Sock). + + +controlling_process(Sock, Pid) when is_record(Sock, ssl_socket) -> + ssl:controlling_process(Sock#ssl_socket.ssl, Pid); + +controlling_process(Sock, Pid) when is_port(Sock) -> + gen_tcp:controlling_process(Sock, Pid). + + +getstat(Sock, Stats) when is_record(Sock, ssl_socket) -> + inet:getstat(Sock#ssl_socket.tcp, Stats); + +getstat(Sock, Stats) when is_port(Sock) -> + inet:getstat(Sock, Stats). + + +peername(Sock) when is_record(Sock, ssl_socket) -> + ssl:peername(Sock#ssl_socket.ssl); + +peername(Sock) when is_port(Sock) -> + inet:peername(Sock). + + +port_command(Sock, Data) when is_record(Sock, ssl_socket) -> + case ssl:send(Sock#ssl_socket.ssl, Data) of + ok -> + self() ! {inet_reply, Sock, ok}, + true; + {error, Reason} -> + erlang:error(Reason) + end; + +port_command(Sock, Data) when is_port(Sock) -> + erlang:port_command(Sock, Data). + +send(Sock, Data) when is_record(Sock, ssl_socket) -> + ssl:send(Sock#ssl_socket.ssl, Data); + +send(Sock, Data) when is_port(Sock) -> + gen_tcp:send(Sock, Data). + + +sockname(Sock) when is_record(Sock, ssl_socket) -> + ssl:sockname(Sock#ssl_socket.ssl); + +sockname(Sock) when is_port(Sock) -> + inet:sockname(Sock). diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl index 2dbd5a5af2..1bc17a324c 100644 --- a/src/rabbit_networking.erl +++ b/src/rabbit_networking.erl @@ -31,18 +31,28 @@ -module(rabbit_networking). --export([start/0, start_tcp_listener/2, stop_tcp_listener/2, - on_node_down/1, active_listeners/0, node_listeners/1, - connections/0, connection_info/1, connection_info/2, - connection_info_all/0, connection_info_all/1]). +-export([start/0, start_tcp_listener/2, start_ssl_listener/3, + stop_tcp_listener/2, on_node_down/1, active_listeners/0, + node_listeners/1, connections/0, connection_info/1, + connection_info/2, connection_info_all/0, + connection_info_all/1]). %%used by TCP-based transports, e.g. STOMP adapter -export([check_tcp_listener_address/3]). --export([tcp_listener_started/2, tcp_listener_stopped/2, start_client/1]). +-export([tcp_listener_started/2, tcp_listener_stopped/2, + start_client/1, start_ssl_client/2]). -include("rabbit.hrl"). -include_lib("kernel/include/inet.hrl"). +-define(RABBIT_TCP_OPTS, [ + binary, + {packet, raw}, % no packaging + {reuseaddr, true}, % allow rebind without waiting + %% {nodelay, true}, % TCP_NODELAY - disable Nagle's alg. + %% {delay_send, true}, + {exit_on_close, false} + ]). %%---------------------------------------------------------------------------- -ifdef(use_specs). @@ -52,6 +62,7 @@ -spec(start/0 :: () -> 'ok'). -spec(start_tcp_listener/2 :: (host(), ip_port()) -> 'ok'). +-spec(start_ssl_listener/3 :: (host(), ip_port(), [info()]) -> 'ok'). -spec(stop_tcp_listener/2 :: (host(), ip_port()) -> 'ok'). -spec(active_listeners/0 :: () -> [listener()]). -spec(node_listeners/1 :: (erlang_node()) -> [listener()]). @@ -90,27 +101,30 @@ check_tcp_listener_address(NamePrefix, Host, Port) -> if is_integer(Port) andalso (Port >= 0) andalso (Port =< 65535) -> ok; true -> error_logger:error_msg("invalid port ~p - not 0..65535~n", [Port]), - throw({error, invalid_port, Port}) + throw({error, {invalid_port, Port}}) end, Name = rabbit_misc:tcp_name(NamePrefix, IPAddress, Port), {IPAddress, Name}. start_tcp_listener(Host, Port) -> - {IPAddress, Name} = check_tcp_listener_address(rabbit_tcp_listener_sup, Host, Port), + start_listener(Host, Port, "TCP Listener", + {?MODULE, start_client, []}). + +start_ssl_listener(Host, Port, SslOpts) -> + start_listener(Host, Port, "SSL Listener", + {?MODULE, start_ssl_client, [SslOpts]}). + +start_listener(Host, Port, Label, OnConnect) -> + {IPAddress, Name} = + check_tcp_listener_address(rabbit_tcp_listener_sup, Host, Port), {ok,_} = supervisor:start_child( rabbit_sup, {Name, {tcp_listener_sup, start_link, - [IPAddress, Port, - [binary, - {packet, raw}, % no packaging - {reuseaddr, true}, % allow rebind without waiting - %% {nodelay, true}, % TCP_NODELAY - disable Nagle's alg. - %% {delay_send, true}, - {exit_on_close, false}], + [IPAddress, Port, ?RABBIT_TCP_OPTS , {?MODULE, tcp_listener_started, []}, {?MODULE, tcp_listener_stopped, []}, - {?MODULE, start_client, []}]}, + OnConnect, Label]}, transient, infinity, supervisor, [tcp_listener_sup]}), ok. @@ -148,10 +162,35 @@ on_node_down(Node) -> start_client(Sock) -> {ok, Child} = supervisor:start_child(rabbit_tcp_client_sup, []), - ok = gen_tcp:controlling_process(Sock, Child), + ok = rabbit_net:controlling_process(Sock, Child), Child ! {go, Sock}, Child. +start_ssl_client(SslOpts, Sock) -> + case rabbit_net:peername(Sock) of + {ok, {PeerAddress, PeerPort}} -> + PeerIp = inet_parse:ntoa(PeerAddress), + case ssl:ssl_accept(Sock, SslOpts) of + {ok, SslSock} -> + rabbit_log:info("upgraded TCP connection " + "from ~s:~p to SSL~n", + [PeerIp, PeerPort]), + RabbitSslSock = #ssl_socket{tcp = Sock, ssl = SslSock}, + start_client(RabbitSslSock); + {error, Reason} -> + gen_tcp:close(Sock), + rabbit_log:error("failed to upgrade TCP connection " + "from ~s:~p to SSL: ~n~p~n", + [PeerIp, PeerPort, Reason]), + {error, Reason} + end; + {error, Reason} -> + gen_tcp:close(Sock), + rabbit_log:error("failed to upgrade TCP connection to SSL: ~p~n", + [Reason]), + {error, Reason} + end. + connections() -> [Pid || {_, Pid, _, _} <- supervisor:which_children( rabbit_tcp_client_sup)]. diff --git a/src/rabbit_plugin_activator.erl b/src/rabbit_plugin_activator.erl index 71278bfb2a..f28c4a6ec5 100644 --- a/src/rabbit_plugin_activator.erl +++ b/src/rabbit_plugin_activator.erl @@ -49,6 +49,8 @@ start() -> UnpackedPluginDir = get_env(plugins_expand_dir, ?DefaultUnpackedPluginDir), RabbitEBin = get_env(rabbit_ebin, ?DefaultRabbitEBin), + RootName = RabbitEBin ++ "/rabbit", + %% Unpack any .ez plugins unpack_ez_plugins(PluginDir, UnpackedPluginDir), @@ -60,15 +62,13 @@ start() -> %% Build the entire set of dependencies - this will load the %% applications along the way AllApps = case catch sets:to_list(expand_dependencies(RequiredApps)) of - {unknown_app, {App, Err}} -> - io:format("ERROR: Failed to load application " ++ - "~s: ~p~n", [App, Err]), - halt(1); + {failed_to_load_app, App, Err} -> + error("failed to load application ~s: ~p", [App, Err]); AppList -> AppList end, AppVersions = [determine_version(App) || App <- AllApps], - {value, {rabbit, RabbitVersion}} = lists:keysearch(rabbit, 1, AppVersions), + {rabbit, RabbitVersion} = proplists:lookup(rabbit, AppVersions), %% Build the overall release descriptor RDesc = {release, @@ -77,11 +77,11 @@ start() -> AppVersions}, %% Write it out to ebin/rabbit.rel - file:write_file(RabbitEBin ++ "/rabbit.rel", - io_lib:format("~p.~n", [RDesc])), + file:write_file(RootName ++ ".rel", io_lib:format("~p.~n", [RDesc])), %% Compile the script - case systools:make_script(RabbitEBin ++ "/rabbit", [local, silent]) of + ScriptFile = RootName ++ ".script", + case systools:make_script(RootName, [local, silent]) of {ok, Module, Warnings} -> %% This gets lots of spurious no-source warnings when we %% have .ez files, so we want to supress them to prevent @@ -98,9 +98,19 @@ start() -> end, ok; {error, Module, Error} -> - io:format("Boot file generation failed: ~s~n", - [Module:format_error(Error)]), - halt(1) + error("generation of boot script file ~s failed: ~w", + [ScriptFile, Module:format_error(Error)]) + end, + + case post_process_script(ScriptFile) of + ok -> ok; + {error, Reason} -> + error("post processing of boot script file ~s failed: ~w", + [ScriptFile, Reason]) + end, + case systools:script2boot(RootName) of + ok -> ok; + error -> error("failed to compile boot script file ~s", [ScriptFile]) end, halt(), ok. @@ -122,10 +132,10 @@ determine_version(App) -> assert_dir(Dir) -> case filelib:is_dir(Dir) of true -> ok; - false -> - ok = filelib:ensure_dir(Dir), - ok = file:make_dir(Dir) + false -> ok = filelib:ensure_dir(Dir), + ok = file:make_dir(Dir) end. + delete_dir(Dir) -> case filelib:is_dir(Dir) of true -> @@ -143,6 +153,7 @@ delete_dir(Dir) -> false -> ok end. + is_symlink(Name) -> case file:read_link(Name) of {ok, _} -> true; @@ -185,14 +196,43 @@ expand_dependencies(Current, [Next|Rest]) -> expand_dependencies(Current, Rest); false -> case application:load(Next) of - ok -> + ok -> ok; - {error, {already_loaded, _}} -> + {error, {already_loaded, _}} -> ok; - X -> - throw({unknown_app, {Next, X}}) + {error, Reason} -> + throw({failed_to_load_app, Next, Reason}) end, {ok, Required} = application:get_key(Next, applications), Unique = [A || A <- Required, not(sets:is_element(A, Current))], expand_dependencies(sets:add_element(Next, Current), Rest ++ Unique) end. + +post_process_script(ScriptFile) -> + case file:consult(ScriptFile) of + {ok, [{script, Name, Entries}]} -> + NewEntries = process_entries(Entries), + case file:open(ScriptFile, [write]) of + {ok, Fd} -> + io:format(Fd, "%% script generated at ~w ~w~n~p.~n", + [date(), time(), {script, Name, NewEntries}]), + file:close(Fd), + ok; + {error, OReason} -> + {error, {failed_to_open_script_file_for_writing, OReason}} + end; + {error, Reason} -> + {error, {failed_to_load_script, Reason}} + end. + +process_entries([]) -> + []; +process_entries([Entry = {apply,{application,start_boot,[stdlib,permanent]}} | + Rest]) -> + [Entry, {apply,{rabbit,prepare,[]}} | Rest]; +process_entries([Entry|Rest]) -> + [Entry | process_entries(Rest)]. + +error(Fmt, Args) -> + io:format("ERROR: " ++ Fmt ++ "~n", Args), + halt(1). diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 6a7d68084d..5816ba109b 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -49,7 +49,6 @@ -define(HANDSHAKE_TIMEOUT, 10). -define(NORMAL_TIMEOUT, 3). -define(CLOSING_TIMEOUT, 1). --define(CHANNEL_CLOSING_TIMEOUT, 1). -define(CHANNEL_TERMINATION_TIMEOUT, 3). %--------------------------------------------------------------------------- @@ -94,23 +93,19 @@ %% -> log error, wait for channels to terminate forcefully, start %% terminate_connection timer, send close, *closed* %% channel exit with soft error -%% -> log error, start terminate_channel timer, mark channel as -%% closing, *running* -%% terminate_channel timeout -> remove 'closing' mark, *running* +%% -> log error, mark channel as closing, *running* %% handshake_timeout -> ignore, *running* %% heartbeat timeout -> *throw* %% closing: %% socket close -> *terminate* %% receive frame -> ignore, *closing* -%% terminate_channel timeout -> remove 'closing' mark, *closing* %% handshake_timeout -> ignore, *closing* %% heartbeat timeout -> *throw* %% channel exit with hard error %% -> log error, wait for channels to terminate forcefully, start %% terminate_connection timer, send close, *closed* %% channel exit with soft error -%% -> log error, start terminate_channel timer, mark channel as -%% closing +%% -> log error, mark channel as closing %% if last channel to exit then send connection.close_ok, %% start terminate_connection timer, *closed* %% else *closing* @@ -123,7 +118,6 @@ %% *closed* %% receive frame -> ignore, *closed* %% terminate_connection timeout -> *terminate* -%% terminate_channel timeout -> remove 'closing' mark, *closed* %% handshake_timeout -> ignore, *closed* %% heartbeat timeout -> *throw* %% channel exit -> log error, *closed* @@ -200,7 +194,7 @@ inet_op(F) -> rabbit_misc:throw_on_error(inet_error, F). peername(Sock) -> try - {Address, Port} = inet_op(fun () -> inet:peername(Sock) end), + {Address, Port} = inet_op(fun () -> rabbit_net:peername(Sock) end), AddressS = inet_parse:ntoa(Address), {AddressS, Port} catch @@ -269,12 +263,10 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) -> throw({inet_error, Reason}); {'EXIT', Parent, Reason} -> if State#v1.connection_state =:= running -> - send_exception( - State, 0, - {amqp, connection_forced, - io_lib:format( - "broker forced connection closure with reason '~w'", - [Reason]), none}); + send_exception(State, 0, + rabbit_misc:amqp_error(connection_forced, + "broker forced connection closure with reason '~w'", + [Reason], none)); true -> ok end, %% this is what we are expected to do according to @@ -286,14 +278,12 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) -> %% since this termination is initiated by our parent it is %% probably more important to exit quickly. exit(Reason); - {'EXIT', _Pid, E = {writer, send_failed, _Error}} -> + {channel_exit, _Chan, E = {writer, send_failed, _Error}} -> throw(E); {channel_exit, Channel, Reason} -> mainloop(Parent, Deb, handle_channel_exit(Channel, Reason, State)); {'EXIT', Pid, Reason} -> mainloop(Parent, Deb, handle_dependent_exit(Pid, Reason, State)); - {terminate_channel, Channel, Ref1} -> - mainloop(Parent, Deb, terminate_channel(Channel, Ref1, State)); terminate_connection -> State; handshake_timeout -> @@ -323,8 +313,8 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) -> end. switch_callback(OldState, NewCallback, Length) -> - Ref = inet_op(fun () -> prim_inet:async_recv( - OldState#v1.sock, Length, -1) end), + Ref = inet_op(fun () -> rabbit_net:async_recv( + OldState#v1.sock, Length, infinity) end), OldState#v1{callback = NewCallback, recv_ref = Ref}. @@ -341,32 +331,14 @@ close_connection(State = #v1{connection = #connection{ State#v1{connection_state = closed}. close_channel(Channel, State) -> - Ref = make_ref(), - TRef = erlang:send_after(1000 * ?CHANNEL_CLOSING_TIMEOUT, - self(), - {terminate_channel, Channel, Ref}), - put({closing_channel, Channel}, {Ref, TRef}), - State. - -terminate_channel(Channel, Ref, State) -> - case get({closing_channel, Channel}) of - undefined -> ok; %% got close_ok in the meantime - {Ref, _} -> erase({closing_channel, Channel}), - ok; - {_Ref, _} -> ok %% got close_ok, and have new closing channel - end, + put({channel, Channel}, closing), State. handle_channel_exit(Channel, Reason, State) -> - %% We remove the channel from the inbound map only. That allows - %% the channel to be re-opened, but also means the remaining - %% cleanup, including possibly closing the connection, is deferred - %% until we get the (normal) exit signal. - erase({channel, Channel}), handle_exception(State, Channel, Reason). handle_dependent_exit(Pid, normal, State) -> - channel_cleanup(Pid), + erase({chpid, Pid}), maybe_close(State); handle_dependent_exit(Pid, Reason, State) -> case channel_cleanup(Pid) of @@ -376,17 +348,10 @@ handle_dependent_exit(Pid, Reason, State) -> channel_cleanup(Pid) -> case get({chpid, Pid}) of - undefined -> - case get({closing_chpid, Pid}) of - undefined -> undefined; - {channel, Channel} -> - erase({closing_chpid, Pid}), - Channel - end; - {channel, Channel} -> - erase({channel, Channel}), - erase({chpid, Pid}), - Channel + undefined -> undefined; + {channel, Channel} -> erase({channel, Channel}), + erase({chpid, Pid}), + Channel end. all_channels() -> [Pid || {{chpid, Pid},_} <- get()]. @@ -451,7 +416,7 @@ handle_frame(_Type, _Channel, _Payload, State = #v1{connection_state = CS}) State; handle_frame(Type, 0, Payload, State) -> case analyze_frame(Type, Payload) of - error -> throw({unknown_frame, Type, Payload}); + error -> throw({unknown_frame, 0, Type, Payload}); heartbeat -> State; {method, MethodName, FieldsBin} -> handle_method0(MethodName, FieldsBin, State); @@ -459,19 +424,33 @@ handle_frame(Type, 0, Payload, State) -> end; handle_frame(Type, Channel, Payload, State) -> case analyze_frame(Type, Payload) of - error -> throw({unknown_frame, Type, Payload}); + error -> throw({unknown_frame, Channel, Type, Payload}); heartbeat -> throw({unexpected_heartbeat_frame, Channel}); AnalyzedFrame -> %%?LOGDEBUG("Ch ~p Frame ~p~n", [Channel, AnalyzedFrame]), case get({channel, Channel}) of {chpid, ChPid} -> - ok = check_for_close(Channel, ChPid, AnalyzedFrame), + case AnalyzedFrame of + {method, 'channel.close', _} -> + erase({channel, Channel}); + _ -> ok + end, ok = rabbit_framing_channel:process(ChPid, AnalyzedFrame), State; + closing -> + %% According to the spec, after sending a + %% channel.close we must ignore all frames except + %% channel.close_ok. + case AnalyzedFrame of + {method, 'channel.close_ok', _} -> + erase({channel, Channel}); + _ -> ok + end, + State; undefined -> case State#v1.connection_state of - running -> send_to_new_channel( - Channel, AnalyzedFrame, State), + running -> ok = send_to_new_channel( + Channel, AnalyzedFrame, State), State; Other -> throw({channel_frame_while_starting, Channel, Other, AnalyzedFrame}) @@ -510,9 +489,8 @@ handle_input(handshake, <<"AMQP",0,ProtocolMajor,ProtocolMinor,ProtocolRevision> handle_input(handshake, <<"AMQP",1,1,ProtocolMajor,ProtocolMinor>>, State) -> %% 0-8 and 0-9 style protocol header. check_protocol_header(ProtocolMajor, ProtocolMinor, 0, State); - handle_input(handshake, Other, #v1{sock = Sock}) -> - ok = inet_op(fun () -> gen_tcp:send( + ok = inet_op(fun () -> rabbit_net:send( Sock, <<"AMQP",1,1, ?PROTOCOL_VERSION_MAJOR, ?PROTOCOL_VERSION_MINOR>>) end), @@ -570,17 +548,17 @@ handle_method0(MethodName, FieldsBin, State) -> MethodName, FieldsBin), State) catch exit:Reason -> - CompleteReason = - case Reason of - {amqp, Error, Explanation, none} -> - {amqp, Error, Explanation, MethodName}; - OtherReason -> OtherReason - end, + CompleteReason = case Reason of + #amqp_error{method = none} -> + Reason#amqp_error{method = MethodName}; + OtherReason -> OtherReason + end, case State#v1.connection_state of running -> send_exception(State, 0, CompleteReason); Other -> throw({channel0_error, Other, CompleteReason}) end end. + handle_method0(#'connection.start_ok'{mechanism = Mechanism, response = Response}, State = #v1{connection_state = starting, @@ -646,23 +624,23 @@ infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items]. i(pid, #v1{}) -> self(); i(address, #v1{sock = Sock}) -> - {ok, {A, _}} = inet:sockname(Sock), + {ok, {A, _}} = rabbit_net:sockname(Sock), A; i(port, #v1{sock = Sock}) -> - {ok, {_, P}} = inet:sockname(Sock), + {ok, {_, P}} = rabbit_net:sockname(Sock), P; i(peer_address, #v1{sock = Sock}) -> - {ok, {A, _}} = inet:peername(Sock), + {ok, {A, _}} = rabbit_net:peername(Sock), A; i(peer_port, #v1{sock = Sock}) -> - {ok, {_, P}} = inet:peername(Sock), + {ok, {_, P}} = rabbit_net:peername(Sock), P; i(SockStat, #v1{sock = Sock}) when SockStat =:= recv_oct; SockStat =:= recv_cnt; SockStat =:= send_oct; SockStat =:= send_cnt; SockStat =:= send_pend -> - case inet:getstat(Sock, [SockStat]) of + case rabbit_net:getstat(Sock, [SockStat]) of {ok, [{SockStat, StatVal}]} -> StatVal; {error, einval} -> undefined; {error, Error} -> throw({cannot_get_socket_stats, Error}) @@ -674,7 +652,7 @@ i(channels, #v1{}) -> i(user, #v1{connection = #connection{user = #user{username = Username}}}) -> Username; i(user, #v1{connection = #connection{user = none}}) -> - none; + ''; i(vhost, #v1{connection = #connection{vhost = VHost}}) -> VHost; i(timeout, #v1{connection = #connection{timeout_sec = Timeout}}) -> @@ -687,38 +665,17 @@ i(Item, #v1{}) -> %%-------------------------------------------------------------------------- send_to_new_channel(Channel, AnalyzedFrame, State) -> - case get({closing_channel, Channel}) of - undefined -> - #v1{sock = Sock, - connection = #connection{ - frame_max = FrameMax, - user = #user{username = Username}, - vhost = VHost}} = State, - WriterPid = rabbit_writer:start(Sock, Channel, FrameMax), - ChPid = rabbit_framing_channel:start_link( - fun rabbit_channel:start_link/5, - [Channel, self(), WriterPid, Username, VHost]), - put({channel, Channel}, {chpid, ChPid}), - put({chpid, ChPid}, {channel, Channel}), - ok = rabbit_framing_channel:process(ChPid, AnalyzedFrame); - {_, TRef} -> - %% According to the spec, after sending a channel.close we - %% must ignore all frames except channel.close_ok. - case AnalyzedFrame of - {method, 'channel.close_ok', _} -> - erlang:cancel_timer(TRef), - erase({closing_channel, Channel}), - ok; - _Other -> ok - end - end. - -check_for_close(Channel, ChPid, {method, 'channel.close', _}) -> - channel_cleanup(ChPid), - put({closing_chpid, ChPid}, {channel, Channel}), - ok; -check_for_close(_Channel, _ChPid, _Frame) -> - ok. + #v1{sock = Sock, connection = #connection{ + frame_max = FrameMax, + user = #user{username = Username}, + vhost = VHost}} = State, + WriterPid = rabbit_writer:start(Sock, Channel, FrameMax), + ChPid = rabbit_framing_channel:start_link( + fun rabbit_channel:start_link/5, + [Channel, self(), WriterPid, Username, VHost]), + put({channel, Channel}, {chpid, ChPid}), + put({chpid, ChPid}, {channel, Channel}), + ok = rabbit_framing_channel:process(ChPid, AnalyzedFrame). log_channel_error(ConnectionState, Channel, Reason) -> rabbit_log:error("connection ~p (~p), channel ~p - error:~n~p~n", @@ -764,18 +721,27 @@ map_exception(Channel, Reason) -> end, {ShouldClose, CloseChannel, CloseMethod}. -lookup_amqp_exception({amqp, {ShouldClose, Code, Text}, Expl, Method}) -> - ExplBin = list_to_binary(Expl), - CompleteTextBin = <<Text/binary, " - ", ExplBin/binary>>, - SafeTextBin = if size(CompleteTextBin) > 255 -> - <<CompleteTextBin:252/binary, "...">>; - true -> - CompleteTextBin - end, - {ShouldClose, Code, SafeTextBin, Method}; -lookup_amqp_exception({amqp, ErrorName, Expl, Method}) -> - Details = rabbit_framing:lookup_amqp_exception(ErrorName), - lookup_amqp_exception({amqp, Details, Expl, Method}); +%% FIXME: this clause can go when we move to AMQP spec >=8.1 +lookup_amqp_exception(#amqp_error{name = precondition_failed, + explanation = Expl, + method = Method}) -> + ExplBin = amqp_exception_explanation(<<"PRECONDITION_FAILED">>, Expl), + {false, 406, ExplBin, Method}; +lookup_amqp_exception(#amqp_error{name = Name, + explanation = Expl, + method = Method}) -> + {ShouldClose, Code, Text} = rabbit_framing:lookup_amqp_exception(Name), + ExplBin = amqp_exception_explanation(Text, Expl), + {ShouldClose, Code, ExplBin, Method}; lookup_amqp_exception(Other) -> rabbit_log:warning("Non-AMQP exit reason '~p'~n", [Other]), - {true, ?INTERNAL_ERROR, <<"INTERNAL_ERROR">>, none}. + {ShouldClose, Code, Text} = + rabbit_framing:lookup_amqp_exception(internal_error), + {ShouldClose, Code, Text, none}. + +amqp_exception_explanation(Text, Expl) -> + ExplBin = list_to_binary(Expl), + CompleteTextBin = <<Text/binary, " - ", ExplBin/binary>>, + if size(CompleteTextBin) > 255 -> <<CompleteTextBin:252/binary, "...">>; + true -> CompleteTextBin + end. diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index e5100ccd16..5c5c55f1e4 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -49,6 +49,7 @@ test_content_prop_roundtrip(Datum, Binary) -> all_tests() -> passed = test_priority_queue(), + passed = test_unfold(), passed = test_parsing(), passed = test_topic_matching(), passed = test_log_management(), @@ -75,7 +76,8 @@ test_priority_queue() -> %% 1-element priority Q Q1 = priority_queue:in(foo, 1, priority_queue:new()), - {true, false, 1, [{1, foo}], [foo]} = test_priority_queue(Q1), + {true, false, 1, [{1, foo}], [foo]} = + test_priority_queue(Q1), %% 2-element same-priority Q Q2 = priority_queue:in(bar, 1, Q1), @@ -91,6 +93,71 @@ test_priority_queue() -> Q4 = priority_queue:in(foo, -1, priority_queue:new()), {true, false, 1, [{-1, foo}], [foo]} = test_priority_queue(Q4), + %% merge 2 * 1-element no-priority Qs + Q5 = priority_queue:join(priority_queue:in(foo, Q), + priority_queue:in(bar, Q)), + {true, false, 2, [{0, foo}, {0, bar}], [foo, bar]} = + test_priority_queue(Q5), + + %% merge 1-element no-priority Q with 1-element priority Q + Q6 = priority_queue:join(priority_queue:in(foo, Q), + priority_queue:in(bar, 1, Q)), + {true, false, 2, [{1, bar}, {0, foo}], [bar, foo]} = + test_priority_queue(Q6), + + %% merge 1-element priority Q with 1-element no-priority Q + Q7 = priority_queue:join(priority_queue:in(foo, 1, Q), + priority_queue:in(bar, Q)), + {true, false, 2, [{1, foo}, {0, bar}], [foo, bar]} = + test_priority_queue(Q7), + + %% merge 2 * 1-element same-priority Qs + Q8 = priority_queue:join(priority_queue:in(foo, 1, Q), + priority_queue:in(bar, 1, Q)), + {true, false, 2, [{1, foo}, {1, bar}], [foo, bar]} = + test_priority_queue(Q8), + + %% merge 2 * 1-element different-priority Qs + Q9 = priority_queue:join(priority_queue:in(foo, 1, Q), + priority_queue:in(bar, 2, Q)), + {true, false, 2, [{2, bar}, {1, foo}], [bar, foo]} = + test_priority_queue(Q9), + + %% merge 2 * 1-element different-priority Qs (other way around) + Q10 = priority_queue:join(priority_queue:in(bar, 2, Q), + priority_queue:in(foo, 1, Q)), + {true, false, 2, [{2, bar}, {1, foo}], [bar, foo]} = + test_priority_queue(Q10), + + %% merge 2 * 2-element multi-different-priority Qs + Q11 = priority_queue:join(Q6, Q5), + {true, false, 4, [{1, bar}, {0, foo}, {0, foo}, {0, bar}], + [bar, foo, foo, bar]} = test_priority_queue(Q11), + + %% and the other way around + Q12 = priority_queue:join(Q5, Q6), + {true, false, 4, [{1, bar}, {0, foo}, {0, bar}, {0, foo}], + [bar, foo, bar, foo]} = test_priority_queue(Q12), + + %% merge with negative priorities + Q13 = priority_queue:join(Q4, Q5), + {true, false, 3, [{0, foo}, {0, bar}, {-1, foo}], [foo, bar, foo]} = + test_priority_queue(Q13), + + %% and the other way around + Q14 = priority_queue:join(Q5, Q4), + {true, false, 3, [{0, foo}, {0, bar}, {-1, foo}], [foo, bar, foo]} = + test_priority_queue(Q14), + + %% joins with empty queues: + Q1 = priority_queue:join(Q, Q1), + Q1 = priority_queue:join(Q1, Q), + + %% insert with priority into non-empty zero-priority queue + Q15 = priority_queue:in(baz, 1, Q5), + {true, false, 3, [{1, baz}, {0, foo}, {0, bar}], [baz, foo, bar]} = + test_priority_queue(Q15), + passed. priority_queue_in_all(Q, L) -> @@ -116,6 +183,14 @@ test_simple_n_element_queue(N) -> {true, false, N, ToListRes, Items} = test_priority_queue(Q), passed. +test_unfold() -> + {[], test} = rabbit_misc:unfold(fun (_V) -> false end, test), + List = lists:seq(2,20,2), + {List, 0} = rabbit_misc:unfold(fun (0) -> false; + (N) -> {true, N*2, N-1} + end, 10), + passed. + test_parsing() -> passed = test_content_properties(), passed. @@ -408,19 +483,17 @@ test_cluster_management() -> end, ClusteringSequence), - %% attempt to convert a disk node into a ram node + %% convert a disk node into a ram node ok = control_action(reset, []), ok = control_action(start_app, []), ok = control_action(stop_app, []), - {error, {cannot_convert_disk_node_to_ram_node, _}} = - control_action(cluster, ["invalid1@invalid", - "invalid2@invalid"]), + ok = control_action(cluster, ["invalid1@invalid", + "invalid2@invalid"]), - %% attempt to join a non-existing cluster as a ram node + %% join a non-existing cluster as a ram node ok = control_action(reset, []), - {error, {unable_to_contact_cluster_nodes, _}} = - control_action(cluster, ["invalid1@invalid", - "invalid2@invalid"]), + ok = control_action(cluster, ["invalid1@invalid", + "invalid2@invalid"]), SecondaryNode = rabbit_misc:localnode(hare), case net_adm:ping(SecondaryNode) of @@ -436,11 +509,12 @@ test_cluster_management2(SecondaryNode) -> NodeS = atom_to_list(node()), SecondaryNodeS = atom_to_list(SecondaryNode), - %% attempt to convert a disk node into a ram node + %% make a disk node ok = control_action(reset, []), ok = control_action(cluster, [NodeS]), - {error, {unable_to_join_cluster, _, _}} = - control_action(cluster, [SecondaryNodeS]), + %% make a ram node + ok = control_action(reset, []), + ok = control_action(cluster, [SecondaryNodeS]), %% join cluster as a ram node ok = control_action(reset, []), @@ -453,21 +527,21 @@ test_cluster_management2(SecondaryNode) -> ok = control_action(start_app, []), ok = control_action(stop_app, []), - %% attempt to join non-existing cluster as a ram node - {error, _} = control_action(cluster, ["invalid1@invalid", - "invalid2@invalid"]), - + %% join non-existing cluster as a ram node + ok = control_action(cluster, ["invalid1@invalid", + "invalid2@invalid"]), %% turn ram node into disk node + ok = control_action(reset, []), ok = control_action(cluster, [SecondaryNodeS, NodeS]), ok = control_action(start_app, []), ok = control_action(stop_app, []), - %% attempt to convert a disk node into a ram node - {error, {cannot_convert_disk_node_to_ram_node, _}} = - control_action(cluster, ["invalid1@invalid", - "invalid2@invalid"]), + %% convert a disk node into a ram node + ok = control_action(cluster, ["invalid1@invalid", + "invalid2@invalid"]), %% turn a disk node into a ram node + ok = control_action(reset, []), ok = control_action(cluster, [SecondaryNodeS]), ok = control_action(start_app, []), ok = control_action(stop_app, []), @@ -592,7 +666,10 @@ test_server_status() -> {ok, _} = rabbit_amqqueue:delete(Q, false, false), %% list connections - [#listener{host = H, port = P} | _] = rabbit_networking:active_listeners(), + [#listener{host = H, port = P} | _] = + [L || L = #listener{node = N} <- rabbit_networking:active_listeners(), + N =:= node()], + {ok, C} = gen_tcp:connect(H, P, []), timer:sleep(100), ok = info_action( diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl index e338ddfe9d..1679ce7c15 100644 --- a/src/rabbit_writer.erl +++ b/src/rabbit_writer.erl @@ -169,7 +169,7 @@ assemble_frames(Channel, MethodRecord, Content, FrameMax) -> tcp_send(Sock, Data) -> rabbit_misc:throw_on_error(inet_error, - fun () -> gen_tcp:send(Sock, Data) end). + fun () -> rabbit_net:send(Sock, Data) end). internal_send_command(Sock, Channel, MethodRecord) -> ok = tcp_send(Sock, assemble_frames(Channel, MethodRecord)). @@ -206,6 +206,6 @@ internal_send_command_async(Sock, Channel, MethodRecord, Content, FrameMax) -> ok. port_cmd(Sock, Data) -> - try erlang:port_command(Sock, Data) + try rabbit_net:port_command(Sock, Data) catch error:Error -> exit({writer, send_failed, Error}) end. diff --git a/src/tcp_acceptor.erl b/src/tcp_acceptor.erl index aa8b8ad5a7..bc7425613f 100644 --- a/src/tcp_acceptor.erl +++ b/src/tcp_acceptor.erl @@ -67,15 +67,20 @@ handle_info({inet_async, LSock, Ref, {ok, Sock}}, {ok, Mod} = inet_db:lookup_socket(LSock), inet_db:register_socket(Sock, Mod), - %% report - {ok, {Address, Port}} = inet:sockname(LSock), - {ok, {PeerAddress, PeerPort}} = inet:peername(Sock), - error_logger:info_msg("accepted TCP connection on ~s:~p from ~s:~p~n", - [inet_parse:ntoa(Address), Port, - inet_parse:ntoa(PeerAddress), PeerPort]), - - %% handle - apply(M, F, A ++ [Sock]), + try + %% report + {Address, Port} = inet_op(fun () -> inet:sockname(LSock) end), + {PeerAddress, PeerPort} = inet_op(fun () -> inet:peername(Sock) end), + error_logger:info_msg("accepted TCP connection on ~s:~p from ~s:~p~n", + [inet_parse:ntoa(Address), Port, + inet_parse:ntoa(PeerAddress), PeerPort]), + %% handle + apply(M, F, A ++ [Sock]) + catch {inet_error, Reason} -> + gen_tcp:close(Sock), + error_logger:error_msg("unable to accept TCP connection: ~p~n", + [Reason]) + end, %% accept more case prim_inet:async_accept(LSock, -1) of @@ -95,3 +100,7 @@ terminate(_Reason, _State) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. + +%%-------------------------------------------------------------------- + +inet_op(F) -> rabbit_misc:throw_on_error(inet_error, F). diff --git a/src/tcp_listener.erl b/src/tcp_listener.erl index 92a47cf127..4a2e149bb8 100644 --- a/src/tcp_listener.erl +++ b/src/tcp_listener.erl @@ -33,28 +33,28 @@ -behaviour(gen_server). --export([start_link/7]). +-export([start_link/8]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --record(state, {sock, on_startup, on_shutdown}). +-record(state, {sock, on_startup, on_shutdown, label}). %%-------------------------------------------------------------------- start_link(IPAddress, Port, SocketOpts, ConcurrentAcceptorCount, AcceptorSup, - OnStartup, OnShutdown) -> + OnStartup, OnShutdown, Label) -> gen_server:start_link( ?MODULE, {IPAddress, Port, SocketOpts, ConcurrentAcceptorCount, AcceptorSup, - OnStartup, OnShutdown}, []). + OnStartup, OnShutdown, Label}, []). %%-------------------------------------------------------------------- init({IPAddress, Port, SocketOpts, ConcurrentAcceptorCount, AcceptorSup, - {M,F,A} = OnStartup, OnShutdown}) -> + {M,F,A} = OnStartup, OnShutdown, Label}) -> process_flag(trap_exit, true), case gen_tcp:listen(Port, SocketOpts ++ [{ip, IPAddress}, {active, false}]) of @@ -65,15 +65,16 @@ init({IPAddress, Port, SocketOpts, end, lists:duplicate(ConcurrentAcceptorCount, dummy)), {ok, {LIPAddress, LPort}} = inet:sockname(LSock), - error_logger:info_msg("started TCP listener on ~s:~p~n", - [inet_parse:ntoa(LIPAddress), LPort]), + error_logger:info_msg("started ~s on ~s:~p~n", + [Label, inet_parse:ntoa(LIPAddress), LPort]), apply(M, F, A ++ [IPAddress, Port]), - {ok, #state{sock=LSock, - on_startup = OnStartup, on_shutdown = OnShutdown}}; + {ok, #state{sock = LSock, + on_startup = OnStartup, on_shutdown = OnShutdown, + label = Label}}; {error, Reason} -> error_logger:error_msg( - "failed to start TCP listener on ~s:~p - ~p~n", - [inet_parse:ntoa(IPAddress), Port, Reason]), + "failed to start ~s on ~s:~p - ~p~n", + [Label, inet_parse:ntoa(IPAddress), Port, Reason]), {stop, {cannot_listen, IPAddress, Port, Reason}} end. @@ -86,11 +87,11 @@ handle_cast(_Msg, State) -> handle_info(_Info, State) -> {noreply, State}. -terminate(_Reason, #state{sock=LSock, on_shutdown = {M,F,A}}) -> +terminate(_Reason, #state{sock=LSock, on_shutdown = {M,F,A}, label=Label}) -> {ok, {IPAddress, Port}} = inet:sockname(LSock), gen_tcp:close(LSock), - error_logger:info_msg("stopped TCP listener on ~s:~p~n", - [inet_parse:ntoa(IPAddress), Port]), + error_logger:info_msg("stopped ~s on ~s:~p~n", + [Label, inet_parse:ntoa(IPAddress), Port]), apply(M, F, A ++ [IPAddress, Port]). code_change(_OldVsn, State, _Extra) -> diff --git a/src/tcp_listener_sup.erl b/src/tcp_listener_sup.erl index 901a0da3b7..d6bbac080f 100644 --- a/src/tcp_listener_sup.erl +++ b/src/tcp_listener_sup.erl @@ -33,23 +33,23 @@ -behaviour(supervisor). --export([start_link/6, start_link/7]). +-export([start_link/7, start_link/8]). -export([init/1]). start_link(IPAddress, Port, SocketOpts, OnStartup, OnShutdown, - AcceptCallback) -> + AcceptCallback, Label) -> start_link(IPAddress, Port, SocketOpts, OnStartup, OnShutdown, - AcceptCallback, 1). + AcceptCallback, 1, Label). start_link(IPAddress, Port, SocketOpts, OnStartup, OnShutdown, - AcceptCallback, ConcurrentAcceptorCount) -> + AcceptCallback, ConcurrentAcceptorCount, Label) -> supervisor:start_link( ?MODULE, {IPAddress, Port, SocketOpts, OnStartup, OnShutdown, - AcceptCallback, ConcurrentAcceptorCount}). + AcceptCallback, ConcurrentAcceptorCount, Label}). init({IPAddress, Port, SocketOpts, OnStartup, OnShutdown, - AcceptCallback, ConcurrentAcceptorCount}) -> + AcceptCallback, ConcurrentAcceptorCount, Label}) -> %% This is gross. The tcp_listener needs to know about the %% tcp_acceptor_sup, and the only way I can think of accomplishing %% that without jumping through hoops is to register the @@ -62,5 +62,5 @@ init({IPAddress, Port, SocketOpts, OnStartup, OnShutdown, {tcp_listener, {tcp_listener, start_link, [IPAddress, Port, SocketOpts, ConcurrentAcceptorCount, Name, - OnStartup, OnShutdown]}, + OnStartup, OnShutdown, Label]}, transient, 100, worker, [tcp_listener]}]}}. |
