diff options
| author | Vlad Ionescu <vlad@lshift.net> | 2009-09-22 16:49:00 +0100 |
|---|---|---|
| committer | Vlad Ionescu <vlad@lshift.net> | 2009-09-22 16:49:00 +0100 |
| commit | ed8b9f138f30749a3e38600f46284a8408c8bb33 (patch) | |
| tree | 29e987b255bb1a4897a7cdcf794bdd1d535412d0 /src | |
| parent | 2bb52b060f3f0f8e0c645927bcf89f67780d27b7 (diff) | |
| parent | 46bb73b94cfb4bb044096ae75ec5234c46ada561 (diff) | |
| download | rabbitmq-server-git-ed8b9f138f30749a3e38600f46284a8408c8bb33.tar.gz | |
merging from default
Diffstat (limited to 'src')
| -rw-r--r-- | src/gen_server2.erl | 5 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 11 | ||||
| -rw-r--r-- | src/rabbit_control.erl | 31 | ||||
| -rw-r--r-- | src/rabbit_load.erl | 17 | ||||
| -rw-r--r-- | src/rabbit_multi.erl | 35 | ||||
| -rw-r--r-- | src/rabbit_plugin_activator.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_reader.erl | 118 |
7 files changed, 99 insertions, 120 deletions
diff --git a/src/gen_server2.erl b/src/gen_server2.erl index 36fb4fa8c3..a2d9350c4e 100644 --- a/src/gen_server2.erl +++ b/src/gen_server2.erl @@ -437,7 +437,10 @@ unregister_name({local,Name}) -> unregister_name({global,Name}) -> _ = global:unregister_name(Name); unregister_name(Pid) when is_pid(Pid) -> - Pid. + Pid; +% Under R12 let's just ignore it, as we have a single term as Name. +% On R13 it will never get here, as we get tuple with 'local/global' atom. +unregister_name(_Name) -> ok. extend_backoff(undefined) -> undefined; diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 16b7c938ca..1285064f43 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -126,7 +126,7 @@ handle_cast({method, Method, Content}, State) -> {stop, normal, State#ch{state = terminating}} catch exit:{amqp, Error, Explanation, none} -> - ok = notify_queues(internal_rollback(State)), + ok = rollback_and_notify(State), Reason = {amqp, Error, Explanation, rabbit_misc:method_record_type(Method)}, State#ch.reader_pid ! {channel_exit, State#ch.channel, Reason}, @@ -175,7 +175,7 @@ terminate(_Reason, #ch{writer_pid = WriterPid, limiter_pid = LimiterPid, terminate(Reason, State = #ch{writer_pid = WriterPid, limiter_pid = LimiterPid}) -> - Res = notify_queues(internal_rollback(State)), + Res = rollback_and_notify(State), case Reason of normal -> ok = Res; _ -> ok @@ -297,7 +297,7 @@ handle_method(_Method, _, #ch{state = starting}) -> rabbit_misc:protocol_error(channel_error, "expected 'channel.open'", []); handle_method(#'channel.close'{}, _, State = #ch{writer_pid = WriterPid}) -> - ok = notify_queues(internal_rollback(State)), + ok = rollback_and_notify(State), ok = rabbit_writer:send_command(WriterPid, #'channel.close_ok'{}), stop; @@ -872,6 +872,11 @@ internal_rollback(State = #ch{transaction_id = TxnKey, internal_error, "rollback failed: ~w", [Errors]) end. +rollback_and_notify(State = #ch{transaction_id = none}) -> + notify_queues(State); +rollback_and_notify(State) -> + notify_queues(internal_rollback(State)). + fold_per_queue(F, Acc0, UAQ) -> D = lists:foldl( fun ({_DTag, _CTag, diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index cf20520eb0..69e91803d6 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -164,7 +164,7 @@ exchange name, routing key, queue name and arguments, in that order. <ConnectionInfoItem> must be a member of the list [node, address, port, peer_address, peer_port, state, channels, user, vhost, timeout, frame_max, recv_oct, recv_cnt, send_oct, send_cnt, send_pend]. The default is to display -user, peer_address and peer_port. +user, peer_address, peer_port and state. "), halt(1). @@ -270,8 +270,9 @@ action(list_bindings, Node, Args, Inform) -> action(list_connections, Node, Args, Inform) -> Inform("Listing connections", []), - ArgAtoms = list_replace(node, pid, - default_if_empty(Args, [user, peer_address, peer_port])), + ArgAtoms = list_replace(node, pid, + default_if_empty(Args, [user, peer_address, + peer_port, state])), display_info_list(rpc_call(Node, rabbit_networking, connection_info_all, [ArgAtoms]), ArgAtoms); @@ -314,7 +315,7 @@ default_if_empty(List, Default) when is_list(List) -> end. display_info_list(Results, InfoItemKeys) when is_list(Results) -> - lists:foreach(fun (Result) -> display_row([format_info_item(Result, X) || + lists:foreach(fun (Result) -> display_row([format_info_item(X, Result) || X <- InfoItemKeys]) end, Results), ok; @@ -325,18 +326,20 @@ display_row(Row) -> io:fwrite(lists:flatten(rabbit_misc:intersperse("\t", Row))), io:nl(). -format_info_item(Items, Key) -> - {value, Info = {Key, Value}} = lists:keysearch(Key, 1, Items), - case Info of - {_, #resource{name = Name}} -> +format_info_item(Key, Items) -> + case proplists:get_value(Key, Items) of + #resource{name = Name} -> escape(Name); - _ when Key =:= address; Key =:= peer_address andalso is_tuple(Value) -> + Value when Key =:= address; Key =:= peer_address andalso + is_tuple(Value) -> inet_parse:ntoa(Value); - _ when is_pid(Value) -> + Value when is_pid(Value) -> atom_to_list(node(Value)); - _ when is_binary(Value) -> + Value when is_binary(Value) -> escape(Value); - _ -> + Value when is_atom(Value) -> + escape(atom_to_list(Value)); + Value -> io_lib:format("~w", [Value]) end. @@ -362,7 +365,9 @@ rpc_call(Node, Mod, Fun, Args) -> %% form part of UTF-8 strings. escape(Bin) when binary(Bin) -> - escape_char(lists:reverse(binary_to_list(Bin)), []). + escape(binary_to_list(Bin)); +escape(L) when is_list(L) -> + escape_char(lists:reverse(L), []). escape_char([$\\ | T], Acc) -> escape_char(T, [$\\, $\\ | Acc]); diff --git a/src/rabbit_load.erl b/src/rabbit_load.erl index 7bf85347fb..6ef638cb59 100644 --- a/src/rabbit_load.erl +++ b/src/rabbit_load.erl @@ -41,7 +41,7 @@ -ifdef(use_specs). -type(erlang_node() :: atom()). --type(load() :: {{non_neg_integer(), float()}, erlang_node()}). +-type(load() :: {{non_neg_integer(), integer() | 'unknown'}, erlang_node()}). -spec(local_load/0 :: () -> load()). -spec(remote_loads/0 :: () -> [load()]). -spec(pick/0 :: () -> erlang_node()). @@ -52,8 +52,11 @@ local_load() -> LoadAvg = case whereis(cpu_sup) of - undefined -> 0.0; - _Other -> cpu_sup:avg1() + undefined -> unknown; + _ -> case cpu_sup:avg1() of + L when is_integer(L) -> L; + {error, timeout} -> unknown + end end, {{statistics(run_queue), LoadAvg}, node()}. @@ -65,8 +68,12 @@ remote_loads() -> pick() -> RemoteLoads = remote_loads(), {{RunQ, LoadAvg}, Node} = local_load(), - %% add bias towards current node - AdjustedLoadAvg = LoadAvg * ?FUDGE_FACTOR, + %% add bias towards current node; we rely on Erlang's term order + %% of SomeFloat < local_unknown < unknown. + AdjustedLoadAvg = case LoadAvg of + unknown -> local_unknown; + _ -> LoadAvg * ?FUDGE_FACTOR + end, Loads = [{{RunQ, AdjustedLoadAvg}, Node} | RemoteLoads], {_, SelectedNode} = lists:min(Loads), SelectedNode. diff --git a/src/rabbit_multi.erl b/src/rabbit_multi.erl index d91975359a..b1cc4d028f 100644 --- a/src/rabbit_multi.erl +++ b/src/rabbit_multi.erl @@ -114,12 +114,13 @@ action(status, [], RpcTimeout) -> io:format("Status of all running nodes...~n", []), call_all_nodes( fun({Node, Pid}) -> - Status = rpc:call(Node, rabbit, status, [], RpcTimeout), + RabbitRunning = + case is_rabbit_running(Node, RpcTimeout) of + false -> not_running; + true -> running + end, io:format("Node '~p' with Pid ~p: ~p~n", - [Node, Pid, case parse_status(Status) of - false -> not_running; - true -> running - end]) + [Node, Pid, RabbitRunning]) end); action(stop_all, [], RpcTimeout) -> @@ -197,7 +198,7 @@ start_node(NodeName, NodePort, RpcTimeout) -> wait_for_rabbit_to_start(_ , RpcTimeout, _) when RpcTimeout < 0 -> false; wait_for_rabbit_to_start(Node, RpcTimeout, Port) -> - case parse_status(rpc:call(Node, rabbit, status, [])) of + case is_rabbit_running(Node, RpcTimeout) of true -> true; false -> receive {'EXIT', Port, PosixCode} -> @@ -211,22 +212,20 @@ wait_for_rabbit_to_start(Node, RpcTimeout, Port) -> run_cmd(FullPath) -> erlang:open_port({spawn, FullPath}, [nouse_stdio]). -parse_status({badrpc, _}) -> - false; - -parse_status(Status) -> - case lists:keysearch(running_applications, 1, Status) of - {value, {running_applications, Apps}} -> - lists:keymember(rabbit, 1, Apps); - _ -> - false +is_rabbit_running(Node, RpcTimeout) -> + case rpc:call(Node, rabbit, status, [], RpcTimeout) of + {badrpc, _} -> false; + Status -> case proplists:get_value(running_applications, Status) of + undefined -> false; + Apps -> lists:keymember(rabbit, 1, Apps) + end end. with_os(Handlers) -> {OsFamily, _} = os:type(), - case lists:keysearch(OsFamily, 1, Handlers) of - {value, {_, Handler}} -> Handler(); - false -> throw({unsupported_os, OsFamily}) + case proplists:get_value(OsFamily, Handlers) of + undefined -> throw({unsupported_os, OsFamily}); + Handler -> Handler() end. script_filename() -> diff --git a/src/rabbit_plugin_activator.erl b/src/rabbit_plugin_activator.erl index 71278bfb2a..0206f73e9f 100644 --- a/src/rabbit_plugin_activator.erl +++ b/src/rabbit_plugin_activator.erl @@ -68,7 +68,7 @@ start() -> AppList end, AppVersions = [determine_version(App) || App <- AllApps], - {value, {rabbit, RabbitVersion}} = lists:keysearch(rabbit, 1, AppVersions), + {rabbit, RabbitVersion} = proplists:lookup(rabbit, AppVersions), %% Build the overall release descriptor RDesc = {release, diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 69dbc008b3..beb5376164 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -49,7 +49,6 @@ -define(HANDSHAKE_TIMEOUT, 10). -define(NORMAL_TIMEOUT, 3). -define(CLOSING_TIMEOUT, 1). --define(CHANNEL_CLOSING_TIMEOUT, 1). -define(CHANNEL_TERMINATION_TIMEOUT, 3). %--------------------------------------------------------------------------- @@ -94,23 +93,19 @@ %% -> log error, wait for channels to terminate forcefully, start %% terminate_connection timer, send close, *closed* %% channel exit with soft error -%% -> log error, start terminate_channel timer, mark channel as -%% closing, *running* -%% terminate_channel timeout -> remove 'closing' mark, *running* +%% -> log error, mark channel as closing, *running* %% handshake_timeout -> ignore, *running* %% heartbeat timeout -> *throw* %% closing: %% socket close -> *terminate* %% receive frame -> ignore, *closing* -%% terminate_channel timeout -> remove 'closing' mark, *closing* %% handshake_timeout -> ignore, *closing* %% heartbeat timeout -> *throw* %% channel exit with hard error %% -> log error, wait for channels to terminate forcefully, start %% terminate_connection timer, send close, *closed* %% channel exit with soft error -%% -> log error, start terminate_channel timer, mark channel as -%% closing +%% -> log error, mark channel as closing %% if last channel to exit then send connection.close_ok, %% start terminate_connection timer, *closed* %% else *closing* @@ -123,7 +118,6 @@ %% *closed* %% receive frame -> ignore, *closed* %% terminate_connection timeout -> *terminate* -%% terminate_channel timeout -> remove 'closing' mark, *closed* %% handshake_timeout -> ignore, *closed* %% heartbeat timeout -> *throw* %% channel exit -> log error, *closed* @@ -292,8 +286,6 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) -> mainloop(Parent, Deb, handle_channel_exit(Channel, Reason, State)); {'EXIT', Pid, Reason} -> mainloop(Parent, Deb, handle_dependent_exit(Pid, Reason, State)); - {terminate_channel, Channel, Ref1} -> - mainloop(Parent, Deb, terminate_channel(Channel, Ref1, State)); terminate_connection -> State; handshake_timeout -> @@ -341,32 +333,14 @@ close_connection(State = #v1{connection = #connection{ State#v1{connection_state = closed}. close_channel(Channel, State) -> - Ref = make_ref(), - TRef = erlang:send_after(1000 * ?CHANNEL_CLOSING_TIMEOUT, - self(), - {terminate_channel, Channel, Ref}), - put({closing_channel, Channel}, {Ref, TRef}), - State. - -terminate_channel(Channel, Ref, State) -> - case get({closing_channel, Channel}) of - undefined -> ok; %% got close_ok in the meantime - {Ref, _} -> erase({closing_channel, Channel}), - ok; - {_Ref, _} -> ok %% got close_ok, and have new closing channel - end, + put({channel, Channel}, closing), State. handle_channel_exit(Channel, Reason, State) -> - %% We remove the channel from the inbound map only. That allows - %% the channel to be re-opened, but also means the remaining - %% cleanup, including possibly closing the connection, is deferred - %% until we get the (normal) exit signal. - erase({channel, Channel}), handle_exception(State, Channel, Reason). handle_dependent_exit(Pid, normal, State) -> - channel_cleanup(Pid), + erase({chpid, Pid}), maybe_close(State); handle_dependent_exit(Pid, Reason, State) -> case channel_cleanup(Pid) of @@ -376,17 +350,10 @@ handle_dependent_exit(Pid, Reason, State) -> channel_cleanup(Pid) -> case get({chpid, Pid}) of - undefined -> - case get({closing_chpid, Pid}) of - undefined -> undefined; - {channel, Channel} -> - erase({closing_chpid, Pid}), - Channel - end; - {channel, Channel} -> - erase({channel, Channel}), - erase({chpid, Pid}), - Channel + undefined -> undefined; + {channel, Channel} -> erase({channel, Channel}), + erase({chpid, Pid}), + Channel end. all_channels() -> [Pid || {{chpid, Pid},_} <- get()]. @@ -451,7 +418,7 @@ handle_frame(_Type, _Channel, _Payload, State = #v1{connection_state = CS}) State; handle_frame(Type, 0, Payload, State) -> case analyze_frame(Type, Payload) of - error -> throw({unknown_frame, Type, Payload}); + error -> throw({unknown_frame, 0, Type, Payload}); heartbeat -> State; trace -> State; {method, MethodName, FieldsBin} -> @@ -460,20 +427,34 @@ handle_frame(Type, 0, Payload, State) -> end; handle_frame(Type, Channel, Payload, State) -> case analyze_frame(Type, Payload) of - error -> throw({unknown_frame, Type, Payload}); + error -> throw({unknown_frame, Channel, Type, Payload}); heartbeat -> throw({unexpected_heartbeat_frame, Channel}); trace -> throw({unexpected_trace_frame, Channel}); AnalyzedFrame -> %%?LOGDEBUG("Ch ~p Frame ~p~n", [Channel, AnalyzedFrame]), case get({channel, Channel}) of {chpid, ChPid} -> - ok = check_for_close(Channel, ChPid, AnalyzedFrame), + case AnalyzedFrame of + {method, 'channel.close', _} -> + erase({channel, Channel}); + _ -> ok + end, ok = rabbit_framing_channel:process(ChPid, AnalyzedFrame), State; + closing -> + %% According to the spec, after sending a + %% channel.close we must ignore all frames except + %% channel.close_ok. + case AnalyzedFrame of + {method, 'channel.close_ok', _} -> + erase({channel, Channel}); + _ -> ok + end, + State; undefined -> case State#v1.connection_state of - running -> send_to_new_channel( - Channel, AnalyzedFrame, State), + running -> ok = send_to_new_channel( + Channel, AnalyzedFrame, State), State; Other -> throw({channel_frame_while_starting, Channel, Other, AnalyzedFrame}) @@ -703,7 +684,7 @@ i(channels, #v1{}) -> i(user, #v1{connection = #connection{user = #user{username = Username}}}) -> Username; i(user, #v1{connection = #connection{user = none}}) -> - none; + ''; i(vhost, #v1{connection = #connection{vhost = VHost}}) -> VHost; i(timeout, #v1{connection = #connection{timeout_sec = Timeout}}) -> @@ -716,38 +697,17 @@ i(Item, #v1{}) -> %%-------------------------------------------------------------------------- send_to_new_channel(Channel, AnalyzedFrame, State) -> - case get({closing_channel, Channel}) of - undefined -> - #v1{sock = Sock, - connection = #connection{ - frame_max = FrameMax, - user = #user{username = Username}, - vhost = VHost}} = State, - WriterPid = rabbit_writer:start(Sock, Channel, FrameMax), - ChPid = rabbit_framing_channel:start_link( - fun rabbit_channel:start_link/5, - [Channel, self(), WriterPid, Username, VHost]), - put({channel, Channel}, {chpid, ChPid}), - put({chpid, ChPid}, {channel, Channel}), - ok = rabbit_framing_channel:process(ChPid, AnalyzedFrame); - {_, TRef} -> - %% According to the spec, after sending a channel.close we - %% must ignore all frames except channel.close_ok. - case AnalyzedFrame of - {method, 'channel.close_ok', _} -> - erlang:cancel_timer(TRef), - erase({closing_channel, Channel}), - ok; - _Other -> ok - end - end. - -check_for_close(Channel, ChPid, {method, 'channel.close', _}) -> - channel_cleanup(ChPid), - put({closing_chpid, ChPid}, {channel, Channel}), - ok; -check_for_close(_Channel, _ChPid, _Frame) -> - ok. + #v1{sock = Sock, connection = #connection{ + frame_max = FrameMax, + user = #user{username = Username}, + vhost = VHost}} = State, + WriterPid = rabbit_writer:start(Sock, Channel, FrameMax), + ChPid = rabbit_framing_channel:start_link( + fun rabbit_channel:start_link/5, + [Channel, self(), WriterPid, Username, VHost]), + put({channel, Channel}, {chpid, ChPid}), + put({chpid, ChPid}, {channel, Channel}), + ok = rabbit_framing_channel:process(ChPid, AnalyzedFrame). log_channel_error(ConnectionState, Channel, Reason) -> rabbit_log:error("connection ~p (~p), channel ~p - error:~n~p~n", |
