diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-10-26 13:00:00 +0000 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-10-26 13:00:00 +0000 |
| commit | 5c6a614616804771ff86d6a65e686e7e83553982 (patch) | |
| tree | 4d216f5e02b461103e29ae663348a1bc16875391 | |
| parent | d12aed0607ee8d49f5dcfab0f59226b676541f63 (diff) | |
| parent | 6f2e0baadf31ceb6ad37bef0fab2102acb51675a (diff) | |
| download | rabbitmq-server-git-5c6a614616804771ff86d6a65e686e7e83553982.tar.gz | |
merging in from default
| -rw-r--r-- | docs/rabbitmqctl.1.pod | 2 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 27 | ||||
| -rw-r--r-- | src/rabbit_control.erl | 19 | ||||
| -rw-r--r-- | src/rabbit_misc.erl | 26 | ||||
| -rw-r--r-- | src/rabbit_multi.erl | 26 | ||||
| -rw-r--r-- | src/rabbit_plugin_activator.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 2 |
7 files changed, 60 insertions, 48 deletions
diff --git a/docs/rabbitmqctl.1.pod b/docs/rabbitmqctl.1.pod index c43ed2ea25..6b4208725f 100644 --- a/docs/rabbitmqctl.1.pod +++ b/docs/rabbitmqctl.1.pod @@ -279,7 +279,7 @@ exchange arguments =item list_bindings [-p I<vhostpath>] List bindings by virtual host. Each line printed describes a binding, -with the exchange name, routing key, queue name and arguments, +with the exchange name, queue name, routing key and arguments, separated by tab characters. =item list_connections [I<connectioninfoitem> ...] diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 1e37a98ffa..cd94c6e450 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -415,7 +415,7 @@ should_auto_delete(State) -> is_unused(State). handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder}) -> case lookup_ch(DownPid) of - not_found -> noreply(State); + not_found -> {ok, State}; #cr{monitor_ref = MonitorRef, ch_pid = ChPid, txn = Txn, unacked_messages = UAM} -> erlang:demonitor(MonitorRef), @@ -431,16 +431,16 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder}) -> ChPid, State#q.blocked_consumers)}, case should_auto_delete(State1) of true -> - {stop, normal, State1}; + {stop, State1}; false -> State2 = case Txn of none -> State1; _ -> rollback_transaction(Txn, State1) end, - noreply( + {ok, deliver_or_requeue_n( [MsgWithAck || - {_MsgId, MsgWithAck} <- dict:to_list(UAM)], State2)) + {_MsgId, MsgWithAck} <- dict:to_list(UAM)], State2)} end end. @@ -610,10 +610,16 @@ handle_call({commit, Txn}, From, State) -> false -> NewState end); -handle_call({notify_down, ChPid}, From, State) -> - %% optimisation: we reply straight away so the sender can continue - gen_server2:reply(From, ok), - handle_ch_down(ChPid, State); +handle_call({notify_down, ChPid}, _From, State) -> + %% we want to do this synchronously, so that auto_deleted queues + %% are no longer visible by the time we send a response to the + %% client. The queue is ultimately deleted in terminate/2; if we + %% return stop with a reply, terminate/2 will be called by + %% gen_server2 *before* the reply is sent. + case handle_ch_down(ChPid, State) of + {ok, NewState} -> reply(ok, NewState); + {stop, NewState} -> {stop, normal, ok, NewState} + end; handle_call({basic_get, ChPid, NoAck}, _From, State = #q{q = #amqqueue{name = QName}, @@ -857,7 +863,10 @@ handle_info({'DOWN', MonitorRef, process, DownPid, _Reason}, NewState = State#q{owner = none}, {stop, normal, NewState}; handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) -> - handle_ch_down(DownPid, State); + case handle_ch_down(DownPid, State) of + {ok, NewState} -> noreply(NewState); + {stop, NewState} -> {stop, normal, NewState} + end; handle_info(timeout, State = #q{variable_queue_state = VQS}) -> noreply( diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index c7b81d3788..5e6229b138 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -52,12 +52,11 @@ %%---------------------------------------------------------------------------- start() -> - {ok, [[NodeNameStr|_]|_]} = init:get_argument(nodename), - NodeName = list_to_atom(NodeNameStr), + {ok, [[NodeStr|_]|_]} = init:get_argument(nodename), FullCommand = init:get_plain_arguments(), #params{quiet = Quiet, node = Node, command = Command, args = Args} = parse_args(FullCommand, #params{quiet = false, - node = rabbit_misc:localnode(NodeName)}), + node = rabbit_misc:makenode(NodeStr)}), Inform = case Quiet of true -> fun(_Format, _Args1) -> ok end; false -> fun(Format, Args1) -> @@ -97,12 +96,12 @@ error(Format, Args) -> fmt_stderr("Error: " ++ Format, Args). print_badrpc_diagnostics(Node) -> fmt_stderr("diagnostics:", []), - NodeHost = rabbit_misc:nodehost(Node), + {_NodeName, NodeHost} = rabbit_misc:nodeparts(Node), case net_adm:names(NodeHost) of {error, EpmdReason} -> fmt_stderr("- unable to connect to epmd on ~s: ~w", [NodeHost, EpmdReason]); - {ok, NamePorts} -> + {ok, NamePorts} -> fmt_stderr("- nodes and their ports on ~s: ~p", [NodeHost, [{list_to_atom(Name), Port} || {Name, Port} <- NamePorts]]) @@ -116,11 +115,7 @@ print_badrpc_diagnostics(Node) -> ok. parse_args(["-n", NodeS | Args], Params) -> - Node = case lists:member($@, NodeS) of - true -> list_to_atom(NodeS); - false -> rabbit_misc:localnode(list_to_atom(NodeS)) - end, - parse_args(Args, Params#params{node = Node}); + parse_args(Args, Params#params{node = rabbit_misc:makenode(NodeS)}); parse_args(["-q" | Args], Params) -> parse_args(Args, Params#params{quiet = true}); parse_args([Command | Args], Params) -> @@ -186,7 +181,7 @@ default is to display name and (number of) messages. auto_delete, arguments]. The default is to display name and type. The output format for \"list_bindings\" is a list of rows containing -exchange name, routing key, queue name and arguments, in that order. +exchange name, queue name, routing key and arguments, in that order. <ConnectionInfoItem> must be a member of the list [node, address, port, peer_address, peer_port, state, channels, user, vhost, timeout, frame_max, @@ -289,7 +284,7 @@ action(list_exchanges, Node, Args, Inform) -> action(list_bindings, Node, Args, Inform) -> Inform("Listing bindings", []), {VHostArg, _} = parse_vhost_flag_bin(Args), - InfoKeys = [exchange_name, routing_key, queue_name, args], + InfoKeys = [exchange_name, queue_name, routing_key, args], display_info_list( [lists:zip(InfoKeys, tuple_to_list(X)) || X <- rpc_call(Node, rabbit_exchange, list_bindings, [VHostArg])], diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index b20e9a86b6..d84c570b57 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -47,7 +47,7 @@ -export([with_user/2, with_vhost/2, with_user_and_vhost/3]). -export([execute_mnesia_transaction/1]). -export([ensure_ok/2]). --export([localnode/1, nodehost/1, cookie_hash/0, tcp_name/3]). +-export([makenode/1, nodeparts/1, cookie_hash/0, tcp_name/3]). -export([intersperse/2, upmap/2, map_in_order/2]). -export([table_foreach/2]). -export([dirty_read_all/1, dirty_foreach_key/2, dirty_dump_log/1]). @@ -105,8 +105,8 @@ -spec(with_user_and_vhost/3 :: (username(), vhost(), thunk(A)) -> A). -spec(execute_mnesia_transaction/1 :: (thunk(A)) -> A). -spec(ensure_ok/2 :: (ok_or_error(), atom()) -> 'ok'). --spec(localnode/1 :: (atom()) -> erlang_node()). --spec(nodehost/1 :: (erlang_node()) -> string()). +-spec(makenode/1 :: ({string(), string()} | string()) -> erlang_node()). +-spec(nodeparts/1 :: (erlang_node() | string()) -> {string(), string()}). -spec(cookie_hash/0 :: () -> string()). -spec(tcp_name/3 :: (atom(), ip_address(), ip_port()) -> atom()). -spec(intersperse/2 :: (A, [A]) -> [A]). @@ -308,13 +308,19 @@ execute_mnesia_transaction(TxFun) -> ensure_ok(ok, _) -> ok; ensure_ok({error, Reason}, ErrorTag) -> throw({error, {ErrorTag, Reason}}). -localnode(Name) -> - list_to_atom(lists:append([atom_to_list(Name), "@", nodehost(node())])). - -nodehost(Node) -> - %% This is horrible, but there doesn't seem to be a way to split a - %% nodename into its constituent parts. - tl(lists:dropwhile(fun (E) -> E =/= $@ end, atom_to_list(Node))). +makenode({Prefix, Suffix}) -> + list_to_atom(lists:append([Prefix, "@", Suffix])); +makenode(NodeStr) -> + makenode(nodeparts(NodeStr)). + +nodeparts(Node) when is_atom(Node) -> + nodeparts(atom_to_list(Node)); +nodeparts(NodeStr) -> + case lists:splitwith(fun (E) -> E =/= $@ end, NodeStr) of + {Prefix, []} -> {_, Suffix} = nodeparts(node()), + {Prefix, Suffix}; + {Prefix, Suffix} -> {Prefix, tl(Suffix)} + end. cookie_hash() -> ssl_base64:encode(erlang:md5(atom_to_list(erlang:get_cookie()))). diff --git a/src/rabbit_multi.erl b/src/rabbit_multi.erl index b1cc4d028f..f364872eca 100644 --- a/src/rabbit_multi.erl +++ b/src/rabbit_multi.erl @@ -100,10 +100,12 @@ Available commands: action(start_all, [NodeCount], RpcTimeout) -> io:format("Starting all nodes...~n", []), N = list_to_integer(NodeCount), - {NodePids, Running} = start_nodes(N, N, [], true, - getenv("RABBITMQ_NODENAME"), - getenv("RABBITMQ_NODE_PORT"), - RpcTimeout), + {NodePids, Running} = + start_nodes(N, N, [], true, + rabbit_misc:nodeparts( + getenv("RABBITMQ_NODENAME")), + list_to_integer(getenv("RABBITMQ_NODE_PORT")), + RpcTimeout), write_pids_file(NodePids), case Running of true -> ok; @@ -158,24 +160,24 @@ start_nodes(0, _, PNodePid, Running, _, _, _) -> {PNodePid, Running}; start_nodes(N, Total, PNodePid, Running, NodeNameBase, NodePortBase, RpcTimeout) -> + {NodePre, NodeSuff} = NodeNameBase, NodeNumber = Total - N, - NodeName = if NodeNumber == 0 -> + NodePre1 = if NodeNumber == 0 -> %% For compatibility with running a single node - NodeNameBase; + NodePre; true -> - NodeNameBase ++ "_" ++ integer_to_list(NodeNumber) + NodePre ++ "_" ++ integer_to_list(NodeNumber) end, - {NodePid, Started} = start_node(NodeName, - list_to_integer(NodePortBase) + NodeNumber, + {NodePid, Started} = start_node(rabbit_misc:makenode({NodePre1, NodeSuff}), + NodePortBase + NodeNumber, RpcTimeout), start_nodes(N - 1, Total, [NodePid | PNodePid], Started and Running, NodeNameBase, NodePortBase, RpcTimeout). -start_node(NodeName, NodePort, RpcTimeout) -> - os:putenv("RABBITMQ_NODENAME", NodeName), +start_node(Node, NodePort, RpcTimeout) -> + os:putenv("RABBITMQ_NODENAME", atom_to_list(Node)), os:putenv("RABBITMQ_NODE_PORT", integer_to_list(NodePort)), - Node = rabbit_misc:localnode(list_to_atom(NodeName)), io:format("Starting node ~s...~n", [Node]), case rpc:call(Node, os, getpid, []) of {badrpc, _} -> diff --git a/src/rabbit_plugin_activator.erl b/src/rabbit_plugin_activator.erl index f28c4a6ec5..e22d844fdf 100644 --- a/src/rabbit_plugin_activator.erl +++ b/src/rabbit_plugin_activator.erl @@ -63,7 +63,7 @@ start() -> %% applications along the way AllApps = case catch sets:to_list(expand_dependencies(RequiredApps)) of {failed_to_load_app, App, Err} -> - error("failed to load application ~s: ~p", [App, Err]); + error("failed to load application ~s:~n~p", [App, Err]); AppList -> AppList end, @@ -98,14 +98,14 @@ start() -> end, ok; {error, Module, Error} -> - error("generation of boot script file ~s failed: ~w", + error("generation of boot script file ~s failed:~n~s", [ScriptFile, Module:format_error(Error)]) end, case post_process_script(ScriptFile) of ok -> ok; {error, Reason} -> - error("post processing of boot script file ~s failed: ~w", + error("post processing of boot script file ~s failed:~n~w", [ScriptFile, Reason]) end, case systools:script2boot(RootName) of diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 547287dbc9..8114247604 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -498,7 +498,7 @@ test_cluster_management() -> ok = control_action(cluster, ["invalid1@invalid", "invalid2@invalid"]), - SecondaryNode = rabbit_misc:localnode(hare), + SecondaryNode = rabbit_misc:makenode("hare"), case net_adm:ping(SecondaryNode) of pong -> passed = test_cluster_management2(SecondaryNode); pang -> io:format("Skipping clustering tests with node ~p~n", |
