summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorVlad Ionescu <vlad@lshift.net>2009-09-22 16:49:00 +0100
committerVlad Ionescu <vlad@lshift.net>2009-09-22 16:49:00 +0100
commited8b9f138f30749a3e38600f46284a8408c8bb33 (patch)
tree29e987b255bb1a4897a7cdcf794bdd1d535412d0 /src
parent2bb52b060f3f0f8e0c645927bcf89f67780d27b7 (diff)
parent46bb73b94cfb4bb044096ae75ec5234c46ada561 (diff)
downloadrabbitmq-server-git-ed8b9f138f30749a3e38600f46284a8408c8bb33.tar.gz
merging from default
Diffstat (limited to 'src')
-rw-r--r--src/gen_server2.erl5
-rw-r--r--src/rabbit_channel.erl11
-rw-r--r--src/rabbit_control.erl31
-rw-r--r--src/rabbit_load.erl17
-rw-r--r--src/rabbit_multi.erl35
-rw-r--r--src/rabbit_plugin_activator.erl2
-rw-r--r--src/rabbit_reader.erl118
7 files changed, 99 insertions, 120 deletions
diff --git a/src/gen_server2.erl b/src/gen_server2.erl
index 36fb4fa8c3..a2d9350c4e 100644
--- a/src/gen_server2.erl
+++ b/src/gen_server2.erl
@@ -437,7 +437,10 @@ unregister_name({local,Name}) ->
unregister_name({global,Name}) ->
_ = global:unregister_name(Name);
unregister_name(Pid) when is_pid(Pid) ->
- Pid.
+ Pid;
+% Under R12 let's just ignore it, as we have a single term as Name.
+% On R13 it will never get here, as we get tuple with 'local/global' atom.
+unregister_name(_Name) -> ok.
extend_backoff(undefined) ->
undefined;
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 16b7c938ca..1285064f43 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -126,7 +126,7 @@ handle_cast({method, Method, Content}, State) ->
{stop, normal, State#ch{state = terminating}}
catch
exit:{amqp, Error, Explanation, none} ->
- ok = notify_queues(internal_rollback(State)),
+ ok = rollback_and_notify(State),
Reason = {amqp, Error, Explanation,
rabbit_misc:method_record_type(Method)},
State#ch.reader_pid ! {channel_exit, State#ch.channel, Reason},
@@ -175,7 +175,7 @@ terminate(_Reason, #ch{writer_pid = WriterPid, limiter_pid = LimiterPid,
terminate(Reason, State = #ch{writer_pid = WriterPid,
limiter_pid = LimiterPid}) ->
- Res = notify_queues(internal_rollback(State)),
+ Res = rollback_and_notify(State),
case Reason of
normal -> ok = Res;
_ -> ok
@@ -297,7 +297,7 @@ handle_method(_Method, _, #ch{state = starting}) ->
rabbit_misc:protocol_error(channel_error, "expected 'channel.open'", []);
handle_method(#'channel.close'{}, _, State = #ch{writer_pid = WriterPid}) ->
- ok = notify_queues(internal_rollback(State)),
+ ok = rollback_and_notify(State),
ok = rabbit_writer:send_command(WriterPid, #'channel.close_ok'{}),
stop;
@@ -872,6 +872,11 @@ internal_rollback(State = #ch{transaction_id = TxnKey,
internal_error, "rollback failed: ~w", [Errors])
end.
+rollback_and_notify(State = #ch{transaction_id = none}) ->
+ notify_queues(State);
+rollback_and_notify(State) ->
+ notify_queues(internal_rollback(State)).
+
fold_per_queue(F, Acc0, UAQ) ->
D = lists:foldl(
fun ({_DTag, _CTag,
diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl
index cf20520eb0..69e91803d6 100644
--- a/src/rabbit_control.erl
+++ b/src/rabbit_control.erl
@@ -164,7 +164,7 @@ exchange name, routing key, queue name and arguments, in that order.
<ConnectionInfoItem> must be a member of the list [node, address, port,
peer_address, peer_port, state, channels, user, vhost, timeout, frame_max,
recv_oct, recv_cnt, send_oct, send_cnt, send_pend]. The default is to display
-user, peer_address and peer_port.
+user, peer_address, peer_port and state.
"),
halt(1).
@@ -270,8 +270,9 @@ action(list_bindings, Node, Args, Inform) ->
action(list_connections, Node, Args, Inform) ->
Inform("Listing connections", []),
- ArgAtoms = list_replace(node, pid,
- default_if_empty(Args, [user, peer_address, peer_port])),
+ ArgAtoms = list_replace(node, pid,
+ default_if_empty(Args, [user, peer_address,
+ peer_port, state])),
display_info_list(rpc_call(Node, rabbit_networking, connection_info_all,
[ArgAtoms]),
ArgAtoms);
@@ -314,7 +315,7 @@ default_if_empty(List, Default) when is_list(List) ->
end.
display_info_list(Results, InfoItemKeys) when is_list(Results) ->
- lists:foreach(fun (Result) -> display_row([format_info_item(Result, X) ||
+ lists:foreach(fun (Result) -> display_row([format_info_item(X, Result) ||
X <- InfoItemKeys])
end, Results),
ok;
@@ -325,18 +326,20 @@ display_row(Row) ->
io:fwrite(lists:flatten(rabbit_misc:intersperse("\t", Row))),
io:nl().
-format_info_item(Items, Key) ->
- {value, Info = {Key, Value}} = lists:keysearch(Key, 1, Items),
- case Info of
- {_, #resource{name = Name}} ->
+format_info_item(Key, Items) ->
+ case proplists:get_value(Key, Items) of
+ #resource{name = Name} ->
escape(Name);
- _ when Key =:= address; Key =:= peer_address andalso is_tuple(Value) ->
+ Value when Key =:= address; Key =:= peer_address andalso
+ is_tuple(Value) ->
inet_parse:ntoa(Value);
- _ when is_pid(Value) ->
+ Value when is_pid(Value) ->
atom_to_list(node(Value));
- _ when is_binary(Value) ->
+ Value when is_binary(Value) ->
escape(Value);
- _ ->
+ Value when is_atom(Value) ->
+ escape(atom_to_list(Value));
+ Value ->
io_lib:format("~w", [Value])
end.
@@ -362,7 +365,9 @@ rpc_call(Node, Mod, Fun, Args) ->
%% form part of UTF-8 strings.
escape(Bin) when binary(Bin) ->
- escape_char(lists:reverse(binary_to_list(Bin)), []).
+ escape(binary_to_list(Bin));
+escape(L) when is_list(L) ->
+ escape_char(lists:reverse(L), []).
escape_char([$\\ | T], Acc) ->
escape_char(T, [$\\, $\\ | Acc]);
diff --git a/src/rabbit_load.erl b/src/rabbit_load.erl
index 7bf85347fb..6ef638cb59 100644
--- a/src/rabbit_load.erl
+++ b/src/rabbit_load.erl
@@ -41,7 +41,7 @@
-ifdef(use_specs).
-type(erlang_node() :: atom()).
--type(load() :: {{non_neg_integer(), float()}, erlang_node()}).
+-type(load() :: {{non_neg_integer(), integer() | 'unknown'}, erlang_node()}).
-spec(local_load/0 :: () -> load()).
-spec(remote_loads/0 :: () -> [load()]).
-spec(pick/0 :: () -> erlang_node()).
@@ -52,8 +52,11 @@
local_load() ->
LoadAvg = case whereis(cpu_sup) of
- undefined -> 0.0;
- _Other -> cpu_sup:avg1()
+ undefined -> unknown;
+ _ -> case cpu_sup:avg1() of
+ L when is_integer(L) -> L;
+ {error, timeout} -> unknown
+ end
end,
{{statistics(run_queue), LoadAvg}, node()}.
@@ -65,8 +68,12 @@ remote_loads() ->
pick() ->
RemoteLoads = remote_loads(),
{{RunQ, LoadAvg}, Node} = local_load(),
- %% add bias towards current node
- AdjustedLoadAvg = LoadAvg * ?FUDGE_FACTOR,
+ %% add bias towards current node; we rely on Erlang's term order
+ %% of SomeFloat < local_unknown < unknown.
+ AdjustedLoadAvg = case LoadAvg of
+ unknown -> local_unknown;
+ _ -> LoadAvg * ?FUDGE_FACTOR
+ end,
Loads = [{{RunQ, AdjustedLoadAvg}, Node} | RemoteLoads],
{_, SelectedNode} = lists:min(Loads),
SelectedNode.
diff --git a/src/rabbit_multi.erl b/src/rabbit_multi.erl
index d91975359a..b1cc4d028f 100644
--- a/src/rabbit_multi.erl
+++ b/src/rabbit_multi.erl
@@ -114,12 +114,13 @@ action(status, [], RpcTimeout) ->
io:format("Status of all running nodes...~n", []),
call_all_nodes(
fun({Node, Pid}) ->
- Status = rpc:call(Node, rabbit, status, [], RpcTimeout),
+ RabbitRunning =
+ case is_rabbit_running(Node, RpcTimeout) of
+ false -> not_running;
+ true -> running
+ end,
io:format("Node '~p' with Pid ~p: ~p~n",
- [Node, Pid, case parse_status(Status) of
- false -> not_running;
- true -> running
- end])
+ [Node, Pid, RabbitRunning])
end);
action(stop_all, [], RpcTimeout) ->
@@ -197,7 +198,7 @@ start_node(NodeName, NodePort, RpcTimeout) ->
wait_for_rabbit_to_start(_ , RpcTimeout, _) when RpcTimeout < 0 ->
false;
wait_for_rabbit_to_start(Node, RpcTimeout, Port) ->
- case parse_status(rpc:call(Node, rabbit, status, [])) of
+ case is_rabbit_running(Node, RpcTimeout) of
true -> true;
false -> receive
{'EXIT', Port, PosixCode} ->
@@ -211,22 +212,20 @@ wait_for_rabbit_to_start(Node, RpcTimeout, Port) ->
run_cmd(FullPath) ->
erlang:open_port({spawn, FullPath}, [nouse_stdio]).
-parse_status({badrpc, _}) ->
- false;
-
-parse_status(Status) ->
- case lists:keysearch(running_applications, 1, Status) of
- {value, {running_applications, Apps}} ->
- lists:keymember(rabbit, 1, Apps);
- _ ->
- false
+is_rabbit_running(Node, RpcTimeout) ->
+ case rpc:call(Node, rabbit, status, [], RpcTimeout) of
+ {badrpc, _} -> false;
+ Status -> case proplists:get_value(running_applications, Status) of
+ undefined -> false;
+ Apps -> lists:keymember(rabbit, 1, Apps)
+ end
end.
with_os(Handlers) ->
{OsFamily, _} = os:type(),
- case lists:keysearch(OsFamily, 1, Handlers) of
- {value, {_, Handler}} -> Handler();
- false -> throw({unsupported_os, OsFamily})
+ case proplists:get_value(OsFamily, Handlers) of
+ undefined -> throw({unsupported_os, OsFamily});
+ Handler -> Handler()
end.
script_filename() ->
diff --git a/src/rabbit_plugin_activator.erl b/src/rabbit_plugin_activator.erl
index 71278bfb2a..0206f73e9f 100644
--- a/src/rabbit_plugin_activator.erl
+++ b/src/rabbit_plugin_activator.erl
@@ -68,7 +68,7 @@ start() ->
AppList
end,
AppVersions = [determine_version(App) || App <- AllApps],
- {value, {rabbit, RabbitVersion}} = lists:keysearch(rabbit, 1, AppVersions),
+ {rabbit, RabbitVersion} = proplists:lookup(rabbit, AppVersions),
%% Build the overall release descriptor
RDesc = {release,
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 69dbc008b3..beb5376164 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -49,7 +49,6 @@
-define(HANDSHAKE_TIMEOUT, 10).
-define(NORMAL_TIMEOUT, 3).
-define(CLOSING_TIMEOUT, 1).
--define(CHANNEL_CLOSING_TIMEOUT, 1).
-define(CHANNEL_TERMINATION_TIMEOUT, 3).
%---------------------------------------------------------------------------
@@ -94,23 +93,19 @@
%% -> log error, wait for channels to terminate forcefully, start
%% terminate_connection timer, send close, *closed*
%% channel exit with soft error
-%% -> log error, start terminate_channel timer, mark channel as
-%% closing, *running*
-%% terminate_channel timeout -> remove 'closing' mark, *running*
+%% -> log error, mark channel as closing, *running*
%% handshake_timeout -> ignore, *running*
%% heartbeat timeout -> *throw*
%% closing:
%% socket close -> *terminate*
%% receive frame -> ignore, *closing*
-%% terminate_channel timeout -> remove 'closing' mark, *closing*
%% handshake_timeout -> ignore, *closing*
%% heartbeat timeout -> *throw*
%% channel exit with hard error
%% -> log error, wait for channels to terminate forcefully, start
%% terminate_connection timer, send close, *closed*
%% channel exit with soft error
-%% -> log error, start terminate_channel timer, mark channel as
-%% closing
+%% -> log error, mark channel as closing
%% if last channel to exit then send connection.close_ok,
%% start terminate_connection timer, *closed*
%% else *closing*
@@ -123,7 +118,6 @@
%% *closed*
%% receive frame -> ignore, *closed*
%% terminate_connection timeout -> *terminate*
-%% terminate_channel timeout -> remove 'closing' mark, *closed*
%% handshake_timeout -> ignore, *closed*
%% heartbeat timeout -> *throw*
%% channel exit -> log error, *closed*
@@ -292,8 +286,6 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) ->
mainloop(Parent, Deb, handle_channel_exit(Channel, Reason, State));
{'EXIT', Pid, Reason} ->
mainloop(Parent, Deb, handle_dependent_exit(Pid, Reason, State));
- {terminate_channel, Channel, Ref1} ->
- mainloop(Parent, Deb, terminate_channel(Channel, Ref1, State));
terminate_connection ->
State;
handshake_timeout ->
@@ -341,32 +333,14 @@ close_connection(State = #v1{connection = #connection{
State#v1{connection_state = closed}.
close_channel(Channel, State) ->
- Ref = make_ref(),
- TRef = erlang:send_after(1000 * ?CHANNEL_CLOSING_TIMEOUT,
- self(),
- {terminate_channel, Channel, Ref}),
- put({closing_channel, Channel}, {Ref, TRef}),
- State.
-
-terminate_channel(Channel, Ref, State) ->
- case get({closing_channel, Channel}) of
- undefined -> ok; %% got close_ok in the meantime
- {Ref, _} -> erase({closing_channel, Channel}),
- ok;
- {_Ref, _} -> ok %% got close_ok, and have new closing channel
- end,
+ put({channel, Channel}, closing),
State.
handle_channel_exit(Channel, Reason, State) ->
- %% We remove the channel from the inbound map only. That allows
- %% the channel to be re-opened, but also means the remaining
- %% cleanup, including possibly closing the connection, is deferred
- %% until we get the (normal) exit signal.
- erase({channel, Channel}),
handle_exception(State, Channel, Reason).
handle_dependent_exit(Pid, normal, State) ->
- channel_cleanup(Pid),
+ erase({chpid, Pid}),
maybe_close(State);
handle_dependent_exit(Pid, Reason, State) ->
case channel_cleanup(Pid) of
@@ -376,17 +350,10 @@ handle_dependent_exit(Pid, Reason, State) ->
channel_cleanup(Pid) ->
case get({chpid, Pid}) of
- undefined ->
- case get({closing_chpid, Pid}) of
- undefined -> undefined;
- {channel, Channel} ->
- erase({closing_chpid, Pid}),
- Channel
- end;
- {channel, Channel} ->
- erase({channel, Channel}),
- erase({chpid, Pid}),
- Channel
+ undefined -> undefined;
+ {channel, Channel} -> erase({channel, Channel}),
+ erase({chpid, Pid}),
+ Channel
end.
all_channels() -> [Pid || {{chpid, Pid},_} <- get()].
@@ -451,7 +418,7 @@ handle_frame(_Type, _Channel, _Payload, State = #v1{connection_state = CS})
State;
handle_frame(Type, 0, Payload, State) ->
case analyze_frame(Type, Payload) of
- error -> throw({unknown_frame, Type, Payload});
+ error -> throw({unknown_frame, 0, Type, Payload});
heartbeat -> State;
trace -> State;
{method, MethodName, FieldsBin} ->
@@ -460,20 +427,34 @@ handle_frame(Type, 0, Payload, State) ->
end;
handle_frame(Type, Channel, Payload, State) ->
case analyze_frame(Type, Payload) of
- error -> throw({unknown_frame, Type, Payload});
+ error -> throw({unknown_frame, Channel, Type, Payload});
heartbeat -> throw({unexpected_heartbeat_frame, Channel});
trace -> throw({unexpected_trace_frame, Channel});
AnalyzedFrame ->
%%?LOGDEBUG("Ch ~p Frame ~p~n", [Channel, AnalyzedFrame]),
case get({channel, Channel}) of
{chpid, ChPid} ->
- ok = check_for_close(Channel, ChPid, AnalyzedFrame),
+ case AnalyzedFrame of
+ {method, 'channel.close', _} ->
+ erase({channel, Channel});
+ _ -> ok
+ end,
ok = rabbit_framing_channel:process(ChPid, AnalyzedFrame),
State;
+ closing ->
+ %% According to the spec, after sending a
+ %% channel.close we must ignore all frames except
+ %% channel.close_ok.
+ case AnalyzedFrame of
+ {method, 'channel.close_ok', _} ->
+ erase({channel, Channel});
+ _ -> ok
+ end,
+ State;
undefined ->
case State#v1.connection_state of
- running -> send_to_new_channel(
- Channel, AnalyzedFrame, State),
+ running -> ok = send_to_new_channel(
+ Channel, AnalyzedFrame, State),
State;
Other -> throw({channel_frame_while_starting,
Channel, Other, AnalyzedFrame})
@@ -703,7 +684,7 @@ i(channels, #v1{}) ->
i(user, #v1{connection = #connection{user = #user{username = Username}}}) ->
Username;
i(user, #v1{connection = #connection{user = none}}) ->
- none;
+ '';
i(vhost, #v1{connection = #connection{vhost = VHost}}) ->
VHost;
i(timeout, #v1{connection = #connection{timeout_sec = Timeout}}) ->
@@ -716,38 +697,17 @@ i(Item, #v1{}) ->
%%--------------------------------------------------------------------------
send_to_new_channel(Channel, AnalyzedFrame, State) ->
- case get({closing_channel, Channel}) of
- undefined ->
- #v1{sock = Sock,
- connection = #connection{
- frame_max = FrameMax,
- user = #user{username = Username},
- vhost = VHost}} = State,
- WriterPid = rabbit_writer:start(Sock, Channel, FrameMax),
- ChPid = rabbit_framing_channel:start_link(
- fun rabbit_channel:start_link/5,
- [Channel, self(), WriterPid, Username, VHost]),
- put({channel, Channel}, {chpid, ChPid}),
- put({chpid, ChPid}, {channel, Channel}),
- ok = rabbit_framing_channel:process(ChPid, AnalyzedFrame);
- {_, TRef} ->
- %% According to the spec, after sending a channel.close we
- %% must ignore all frames except channel.close_ok.
- case AnalyzedFrame of
- {method, 'channel.close_ok', _} ->
- erlang:cancel_timer(TRef),
- erase({closing_channel, Channel}),
- ok;
- _Other -> ok
- end
- end.
-
-check_for_close(Channel, ChPid, {method, 'channel.close', _}) ->
- channel_cleanup(ChPid),
- put({closing_chpid, ChPid}, {channel, Channel}),
- ok;
-check_for_close(_Channel, _ChPid, _Frame) ->
- ok.
+ #v1{sock = Sock, connection = #connection{
+ frame_max = FrameMax,
+ user = #user{username = Username},
+ vhost = VHost}} = State,
+ WriterPid = rabbit_writer:start(Sock, Channel, FrameMax),
+ ChPid = rabbit_framing_channel:start_link(
+ fun rabbit_channel:start_link/5,
+ [Channel, self(), WriterPid, Username, VHost]),
+ put({channel, Channel}, {chpid, ChPid}),
+ put({chpid, ChPid}, {channel, Channel}),
+ ok = rabbit_framing_channel:process(ChPid, AnalyzedFrame).
log_channel_error(ConnectionState, Channel, Reason) ->
rabbit_log:error("connection ~p (~p), channel ~p - error:~n~p~n",