diff options
| author | Matthew Sackman <matthew@rabbitmq.com> | 2010-12-09 17:57:59 +0000 |
|---|---|---|
| committer | Matthew Sackman <matthew@rabbitmq.com> | 2010-12-09 17:57:59 +0000 |
| commit | 6fbaa75b6957e4bde48e7fcc62ad7aab40a6f089 (patch) | |
| tree | ef785995a96325ea6354c2876c64c6cf0bfa7001 /src | |
| parent | 23f063e3caeac2663bb5b3171ec580898217d63b (diff) | |
| parent | fc084ed0e925235f8b3d3e32e9f8ca3854bb55e3 (diff) | |
| download | rabbitmq-server-git-6fbaa75b6957e4bde48e7fcc62ad7aab40a6f089.tar.gz | |
Merging in from default
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_amqqueue.erl | 5 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_binary_generator.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_binary_parser.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 32 | ||||
| -rw-r--r-- | src/rabbit_control.erl | 41 | ||||
| -rw-r--r-- | src/rabbit_log.erl | 3 | ||||
| -rw-r--r-- | src/rabbit_misc.erl | 5 | ||||
| -rw-r--r-- | src/rabbit_mnesia.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_prelaunch.erl (renamed from src/rabbit_plugin_activator.erl) | 46 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_writer.erl | 2 |
13 files changed, 79 insertions, 73 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl index a619ef362d..ace8f2868a 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -170,12 +170,6 @@ %%--------------------------------------------------------------------------- --import(application). --import(mnesia). --import(lists). --import(inet). --import(gen_tcp). - -include("rabbit_framing.hrl"). -include("rabbit.hrl"). diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index f525ffd1ad..94c05f8b88 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -49,11 +49,6 @@ -export([commit_all/3, rollback_all/3, notify_down_all/2, limit_all/3]). -export([on_node_down/1]). --import(mnesia). --import(gen_server2). --import(lists). --import(queue). - -include("rabbit.hrl"). -include_lib("stdlib/include/qlc.hrl"). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index d781cd35e3..c1972c261b 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -50,10 +50,6 @@ -export([init_with_backing_queue_state/4]). --import(queue). --import(erlang). --import(lists). - % Queue's state -record(q, {q, exclusive_consumer, diff --git a/src/rabbit_binary_generator.erl b/src/rabbit_binary_generator.erl index b2997ae259..a5297a707c 100644 --- a/src/rabbit_binary_generator.erl +++ b/src/rabbit_binary_generator.erl @@ -49,8 +49,6 @@ -export([ensure_content_encoded/2, clear_encoded_content/1]). -export([map_exception/3]). --import(lists). - %%---------------------------------------------------------------------------- -ifdef(use_specs). diff --git a/src/rabbit_binary_parser.erl b/src/rabbit_binary_parser.erl index ebf063f031..4b4358b4c7 100644 --- a/src/rabbit_binary_parser.erl +++ b/src/rabbit_binary_parser.erl @@ -36,8 +36,6 @@ -export([parse_table/1, parse_properties/2]). -export([ensure_content_decoded/1, clear_decoded_content/1]). --import(lists). - %%---------------------------------------------------------------------------- -ifdef(use_specs). diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 0c8ad00ae3..a1db2ccfd6 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -260,14 +260,24 @@ handle_cast({command, Msg}, State = #ch{writer_pid = WriterPid}) -> ok = rabbit_writer:send_command(WriterPid, Msg), noreply(State); -handle_cast({deliver, ConsumerTag, AckRequired, Msg}, +handle_cast({deliver, ConsumerTag, AckRequired, + Msg = {_QName, QPid, _MsgId, Redelivered, + #basic_message{exchange_name = ExchangeName, + routing_key = RoutingKey, + content = Content}}}, State = #ch{writer_pid = WriterPid, next_tag = DeliveryTag}) -> State1 = lock_message(AckRequired, ack_record(DeliveryTag, ConsumerTag, Msg), State), - ok = internal_deliver(WriterPid, true, ConsumerTag, DeliveryTag, Msg), - {_QName, QPid, _MsgId, _Redelivered, _Msg} = Msg, + + M = #'basic.deliver'{consumer_tag = ConsumerTag, + delivery_tag = DeliveryTag, + redelivered = Redelivered, + exchange = ExchangeName#resource.name, + routing_key = RoutingKey}, + rabbit_writer:send_command_and_notify(WriterPid, QPid, self(), M, Content), + maybe_incr_stats([{QPid, 1}], case AckRequired of true -> deliver; @@ -1240,22 +1250,6 @@ lock_message(true, MsgStruct, State = #ch{unacked_message_q = UAMQ}) -> lock_message(false, _MsgStruct, State) -> State. -internal_deliver(WriterPid, Notify, ConsumerTag, DeliveryTag, - {_QName, QPid, _MsgId, Redelivered, - #basic_message{exchange_name = ExchangeName, - routing_key = RoutingKey, - content = Content}}) -> - M = #'basic.deliver'{consumer_tag = ConsumerTag, - delivery_tag = DeliveryTag, - redelivered = Redelivered, - exchange = ExchangeName#resource.name, - routing_key = RoutingKey}, - ok = case Notify of - true -> rabbit_writer:send_command_and_notify( - WriterPid, QPid, self(), M, Content); - false -> rabbit_writer:send_command(WriterPid, M, Content) - end. - terminate(State) -> stop_confirm_timer(State), pg_local:leave(rabbit_channels, self()), diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index 72b77b1f07..360217a221 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -32,7 +32,7 @@ -module(rabbit_control). -include("rabbit.hrl"). --export([start/0, stop/0, action/5]). +-export([start/0, stop/0, action/5, diagnostics/1]). -define(RPC_TIMEOUT, infinity). @@ -50,6 +50,7 @@ (atom(), node(), [string()], [{string(), any()}], fun ((string(), [any()]) -> 'ok')) -> 'ok'). +-spec(diagnostics/1 :: (node()) -> [{string(), [any()]}]). -spec(usage/0 :: () -> no_return()). -endif. @@ -116,24 +117,28 @@ fmt_stderr(Format, Args) -> rabbit_misc:format_stderr(Format ++ "~n", Args). print_error(Format, Args) -> fmt_stderr("Error: " ++ Format, Args). print_badrpc_diagnostics(Node) -> - fmt_stderr("diagnostics:", []), + [fmt_stderr(Fmt, Args) || {Fmt, Args} <- diagnostics(Node)]. + +diagnostics(Node) -> {_NodeName, NodeHost} = rabbit_misc:nodeparts(Node), - case net_adm:names(NodeHost) of - {error, EpmdReason} -> - fmt_stderr("- unable to connect to epmd on ~s: ~w", - [NodeHost, EpmdReason]); - {ok, NamePorts} -> - fmt_stderr("- nodes and their ports on ~s: ~p", - [NodeHost, [{list_to_atom(Name), Port} || - {Name, Port} <- NamePorts]]) - end, - fmt_stderr("- current node: ~w", [node()]), - case init:get_argument(home) of - {ok, [[Home]]} -> fmt_stderr("- current node home dir: ~s", [Home]); - Other -> fmt_stderr("- no current node home dir: ~p", [Other]) - end, - fmt_stderr("- current node cookie hash: ~s", [rabbit_misc:cookie_hash()]), - ok. + [ + {"diagnostics:", []}, + case net_adm:names(NodeHost) of + {error, EpmdReason} -> + {"- unable to connect to epmd on ~s: ~w", + [NodeHost, EpmdReason]}; + {ok, NamePorts} -> + {"- nodes and their ports on ~s: ~p", + [NodeHost, [{list_to_atom(Name), Port} || + {Name, Port} <- NamePorts]]} + end, + {"- current node: ~w", [node()]}, + case init:get_argument(home) of + {ok, [[Home]]} -> {"- current node home dir: ~s", [Home]}; + Other -> {"- no current node home dir: ~p", [Other]} + end, + {"- current node cookie hash: ~s", [rabbit_misc:cookie_hash()]} + ]. stop() -> ok. diff --git a/src/rabbit_log.erl b/src/rabbit_log.erl index 863f77e7eb..a1a8364c63 100644 --- a/src/rabbit_log.erl +++ b/src/rabbit_log.erl @@ -41,9 +41,6 @@ -export([debug/1, debug/2, message/4, info/1, info/2, warning/1, warning/2, error/1, error/2]). --import(io). --import(error_logger). - -define(SERVER, ?MODULE). %%---------------------------------------------------------------------------- diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 1a05b7298a..52d76ac48b 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -68,11 +68,6 @@ -export([now_ms/0]). -export([lock_file/1]). --import(mnesia). --import(lists). --import(cover). --import(disk_log). - %%---------------------------------------------------------------------------- -ifdef(use_specs). diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index bdc4d76c98..c97988d0f7 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -402,7 +402,7 @@ init_db(ClusterNodes, Force) -> ensure_version_ok(rabbit_upgrade:read_version()), ensure_schema_ok(); {[], false, _} -> - %% First RAM node in cluster, start from scratch + %% Nothing there at all, start from scratch ok = create_schema(); {[AnotherNode|_], _, _} -> %% Subsequent node in cluster, catch up diff --git a/src/rabbit_plugin_activator.erl b/src/rabbit_prelaunch.erl index 072f297e69..867ecb1257 100644 --- a/src/rabbit_plugin_activator.erl +++ b/src/rabbit_prelaunch.erl @@ -29,11 +29,12 @@ %% Contributor(s): ______________________________________. %% --module(rabbit_plugin_activator). +-module(rabbit_prelaunch). -export([start/0, stop/0]). -define(BaseApps, [rabbit]). +-define(ERROR_CODE, 1). %%---------------------------------------------------------------------------- %% Specs @@ -52,7 +53,7 @@ start() -> io:format("Activating RabbitMQ plugins ...~n"), %% Determine our various directories - [PluginDir, UnpackedPluginDir] = init:get_plain_arguments(), + [PluginDir, UnpackedPluginDir, Node] = init:get_plain_arguments(), RootName = UnpackedPluginDir ++ "/rabbit", %% Unpack any .ez plugins @@ -130,7 +131,10 @@ start() -> [io:format("* ~s-~s~n", [App, proplists:get_value(App, AppVersions)]) || App <- PluginApps], io:nl(), - halt(), + + ok = duplicate_node_check(Node), + + terminate(0), ok. stop() -> @@ -251,6 +255,40 @@ process_entry(Entry = {apply,{application,start_boot,[rabbit,permanent]}}) -> process_entry(Entry) -> [Entry]. +%% Check whether a node with the same name is already running +duplicate_node_check([]) -> + %% Ignore running node while installing windows service + ok; +duplicate_node_check(Node) -> + {NodeName, NodeHost} = rabbit_misc:nodeparts(Node), + case net_adm:names(NodeHost) of + {ok, NamePorts} -> + case proplists:is_defined(NodeName, NamePorts) of + true -> io:format("node with name ~p " + "already running on ~p~n", + [NodeName, NodeHost]), + [io:format(Fmt ++ "~n", Args) || + {Fmt, Args} <- rabbit_control:diagnostics(Node)], + terminate(?ERROR_CODE); + false -> ok + end; + {error, address} -> ok; + {error, EpmdReason} -> terminate("unexpected epmd error: ~p~n", + [EpmdReason]) + end. + terminate(Fmt, Args) -> io:format("ERROR: " ++ Fmt ++ "~n", Args), - halt(1). + terminate(?ERROR_CODE). + +terminate(Status) -> + case os:type() of + {unix, _} -> + halt(Status); + {win32, _} -> + init:stop(Status), + receive + after infinity -> ok + end + end. + diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 1142eb8b6d..3343bb99cd 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -35,8 +35,6 @@ -export([all_tests/0, test_parsing/0]). --import(lists). - -include("rabbit.hrl"). -include("rabbit_framing.hrl"). -include_lib("kernel/include/file.hrl"). diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl index 0159609d9a..068ac18612 100644 --- a/src/rabbit_writer.erl +++ b/src/rabbit_writer.erl @@ -39,8 +39,6 @@ send_command_and_notify/4, send_command_and_notify/5]). -export([internal_send_command/4, internal_send_command/6]). --import(gen_tcp). - -record(wstate, {sock, channel, frame_max, protocol}). -define(HIBERNATE_AFTER, 5000). |
