summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2010-12-09 17:57:59 +0000
committerMatthew Sackman <matthew@rabbitmq.com>2010-12-09 17:57:59 +0000
commit6fbaa75b6957e4bde48e7fcc62ad7aab40a6f089 (patch)
treeef785995a96325ea6354c2876c64c6cf0bfa7001 /src
parent23f063e3caeac2663bb5b3171ec580898217d63b (diff)
parentfc084ed0e925235f8b3d3e32e9f8ca3854bb55e3 (diff)
downloadrabbitmq-server-git-6fbaa75b6957e4bde48e7fcc62ad7aab40a6f089.tar.gz
Merging in from default
Diffstat (limited to 'src')
-rw-r--r--src/rabbit.erl6
-rw-r--r--src/rabbit_amqqueue.erl5
-rw-r--r--src/rabbit_amqqueue_process.erl4
-rw-r--r--src/rabbit_binary_generator.erl2
-rw-r--r--src/rabbit_binary_parser.erl2
-rw-r--r--src/rabbit_channel.erl32
-rw-r--r--src/rabbit_control.erl41
-rw-r--r--src/rabbit_log.erl3
-rw-r--r--src/rabbit_misc.erl5
-rw-r--r--src/rabbit_mnesia.erl2
-rw-r--r--src/rabbit_prelaunch.erl (renamed from src/rabbit_plugin_activator.erl)46
-rw-r--r--src/rabbit_tests.erl2
-rw-r--r--src/rabbit_writer.erl2
13 files changed, 79 insertions, 73 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl
index a619ef362d..ace8f2868a 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -170,12 +170,6 @@
%%---------------------------------------------------------------------------
--import(application).
--import(mnesia).
--import(lists).
--import(inet).
--import(gen_tcp).
-
-include("rabbit_framing.hrl").
-include("rabbit.hrl").
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index f525ffd1ad..94c05f8b88 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -49,11 +49,6 @@
-export([commit_all/3, rollback_all/3, notify_down_all/2, limit_all/3]).
-export([on_node_down/1]).
--import(mnesia).
--import(gen_server2).
--import(lists).
--import(queue).
-
-include("rabbit.hrl").
-include_lib("stdlib/include/qlc.hrl").
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index d781cd35e3..c1972c261b 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -50,10 +50,6 @@
-export([init_with_backing_queue_state/4]).
--import(queue).
--import(erlang).
--import(lists).
-
% Queue's state
-record(q, {q,
exclusive_consumer,
diff --git a/src/rabbit_binary_generator.erl b/src/rabbit_binary_generator.erl
index b2997ae259..a5297a707c 100644
--- a/src/rabbit_binary_generator.erl
+++ b/src/rabbit_binary_generator.erl
@@ -49,8 +49,6 @@
-export([ensure_content_encoded/2, clear_encoded_content/1]).
-export([map_exception/3]).
--import(lists).
-
%%----------------------------------------------------------------------------
-ifdef(use_specs).
diff --git a/src/rabbit_binary_parser.erl b/src/rabbit_binary_parser.erl
index ebf063f031..4b4358b4c7 100644
--- a/src/rabbit_binary_parser.erl
+++ b/src/rabbit_binary_parser.erl
@@ -36,8 +36,6 @@
-export([parse_table/1, parse_properties/2]).
-export([ensure_content_decoded/1, clear_decoded_content/1]).
--import(lists).
-
%%----------------------------------------------------------------------------
-ifdef(use_specs).
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 0c8ad00ae3..a1db2ccfd6 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -260,14 +260,24 @@ handle_cast({command, Msg}, State = #ch{writer_pid = WriterPid}) ->
ok = rabbit_writer:send_command(WriterPid, Msg),
noreply(State);
-handle_cast({deliver, ConsumerTag, AckRequired, Msg},
+handle_cast({deliver, ConsumerTag, AckRequired,
+ Msg = {_QName, QPid, _MsgId, Redelivered,
+ #basic_message{exchange_name = ExchangeName,
+ routing_key = RoutingKey,
+ content = Content}}},
State = #ch{writer_pid = WriterPid,
next_tag = DeliveryTag}) ->
State1 = lock_message(AckRequired,
ack_record(DeliveryTag, ConsumerTag, Msg),
State),
- ok = internal_deliver(WriterPid, true, ConsumerTag, DeliveryTag, Msg),
- {_QName, QPid, _MsgId, _Redelivered, _Msg} = Msg,
+
+ M = #'basic.deliver'{consumer_tag = ConsumerTag,
+ delivery_tag = DeliveryTag,
+ redelivered = Redelivered,
+ exchange = ExchangeName#resource.name,
+ routing_key = RoutingKey},
+ rabbit_writer:send_command_and_notify(WriterPid, QPid, self(), M, Content),
+
maybe_incr_stats([{QPid, 1}],
case AckRequired of
true -> deliver;
@@ -1240,22 +1250,6 @@ lock_message(true, MsgStruct, State = #ch{unacked_message_q = UAMQ}) ->
lock_message(false, _MsgStruct, State) ->
State.
-internal_deliver(WriterPid, Notify, ConsumerTag, DeliveryTag,
- {_QName, QPid, _MsgId, Redelivered,
- #basic_message{exchange_name = ExchangeName,
- routing_key = RoutingKey,
- content = Content}}) ->
- M = #'basic.deliver'{consumer_tag = ConsumerTag,
- delivery_tag = DeliveryTag,
- redelivered = Redelivered,
- exchange = ExchangeName#resource.name,
- routing_key = RoutingKey},
- ok = case Notify of
- true -> rabbit_writer:send_command_and_notify(
- WriterPid, QPid, self(), M, Content);
- false -> rabbit_writer:send_command(WriterPid, M, Content)
- end.
-
terminate(State) ->
stop_confirm_timer(State),
pg_local:leave(rabbit_channels, self()),
diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl
index 72b77b1f07..360217a221 100644
--- a/src/rabbit_control.erl
+++ b/src/rabbit_control.erl
@@ -32,7 +32,7 @@
-module(rabbit_control).
-include("rabbit.hrl").
--export([start/0, stop/0, action/5]).
+-export([start/0, stop/0, action/5, diagnostics/1]).
-define(RPC_TIMEOUT, infinity).
@@ -50,6 +50,7 @@
(atom(), node(), [string()], [{string(), any()}],
fun ((string(), [any()]) -> 'ok'))
-> 'ok').
+-spec(diagnostics/1 :: (node()) -> [{string(), [any()]}]).
-spec(usage/0 :: () -> no_return()).
-endif.
@@ -116,24 +117,28 @@ fmt_stderr(Format, Args) -> rabbit_misc:format_stderr(Format ++ "~n", Args).
print_error(Format, Args) -> fmt_stderr("Error: " ++ Format, Args).
print_badrpc_diagnostics(Node) ->
- fmt_stderr("diagnostics:", []),
+ [fmt_stderr(Fmt, Args) || {Fmt, Args} <- diagnostics(Node)].
+
+diagnostics(Node) ->
{_NodeName, NodeHost} = rabbit_misc:nodeparts(Node),
- case net_adm:names(NodeHost) of
- {error, EpmdReason} ->
- fmt_stderr("- unable to connect to epmd on ~s: ~w",
- [NodeHost, EpmdReason]);
- {ok, NamePorts} ->
- fmt_stderr("- nodes and their ports on ~s: ~p",
- [NodeHost, [{list_to_atom(Name), Port} ||
- {Name, Port} <- NamePorts]])
- end,
- fmt_stderr("- current node: ~w", [node()]),
- case init:get_argument(home) of
- {ok, [[Home]]} -> fmt_stderr("- current node home dir: ~s", [Home]);
- Other -> fmt_stderr("- no current node home dir: ~p", [Other])
- end,
- fmt_stderr("- current node cookie hash: ~s", [rabbit_misc:cookie_hash()]),
- ok.
+ [
+ {"diagnostics:", []},
+ case net_adm:names(NodeHost) of
+ {error, EpmdReason} ->
+ {"- unable to connect to epmd on ~s: ~w",
+ [NodeHost, EpmdReason]};
+ {ok, NamePorts} ->
+ {"- nodes and their ports on ~s: ~p",
+ [NodeHost, [{list_to_atom(Name), Port} ||
+ {Name, Port} <- NamePorts]]}
+ end,
+ {"- current node: ~w", [node()]},
+ case init:get_argument(home) of
+ {ok, [[Home]]} -> {"- current node home dir: ~s", [Home]};
+ Other -> {"- no current node home dir: ~p", [Other]}
+ end,
+ {"- current node cookie hash: ~s", [rabbit_misc:cookie_hash()]}
+ ].
stop() ->
ok.
diff --git a/src/rabbit_log.erl b/src/rabbit_log.erl
index 863f77e7eb..a1a8364c63 100644
--- a/src/rabbit_log.erl
+++ b/src/rabbit_log.erl
@@ -41,9 +41,6 @@
-export([debug/1, debug/2, message/4, info/1, info/2,
warning/1, warning/2, error/1, error/2]).
--import(io).
--import(error_logger).
-
-define(SERVER, ?MODULE).
%%----------------------------------------------------------------------------
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 1a05b7298a..52d76ac48b 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -68,11 +68,6 @@
-export([now_ms/0]).
-export([lock_file/1]).
--import(mnesia).
--import(lists).
--import(cover).
--import(disk_log).
-
%%----------------------------------------------------------------------------
-ifdef(use_specs).
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index bdc4d76c98..c97988d0f7 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -402,7 +402,7 @@ init_db(ClusterNodes, Force) ->
ensure_version_ok(rabbit_upgrade:read_version()),
ensure_schema_ok();
{[], false, _} ->
- %% First RAM node in cluster, start from scratch
+ %% Nothing there at all, start from scratch
ok = create_schema();
{[AnotherNode|_], _, _} ->
%% Subsequent node in cluster, catch up
diff --git a/src/rabbit_plugin_activator.erl b/src/rabbit_prelaunch.erl
index 072f297e69..867ecb1257 100644
--- a/src/rabbit_plugin_activator.erl
+++ b/src/rabbit_prelaunch.erl
@@ -29,11 +29,12 @@
%% Contributor(s): ______________________________________.
%%
--module(rabbit_plugin_activator).
+-module(rabbit_prelaunch).
-export([start/0, stop/0]).
-define(BaseApps, [rabbit]).
+-define(ERROR_CODE, 1).
%%----------------------------------------------------------------------------
%% Specs
@@ -52,7 +53,7 @@ start() ->
io:format("Activating RabbitMQ plugins ...~n"),
%% Determine our various directories
- [PluginDir, UnpackedPluginDir] = init:get_plain_arguments(),
+ [PluginDir, UnpackedPluginDir, Node] = init:get_plain_arguments(),
RootName = UnpackedPluginDir ++ "/rabbit",
%% Unpack any .ez plugins
@@ -130,7 +131,10 @@ start() ->
[io:format("* ~s-~s~n", [App, proplists:get_value(App, AppVersions)])
|| App <- PluginApps],
io:nl(),
- halt(),
+
+ ok = duplicate_node_check(Node),
+
+ terminate(0),
ok.
stop() ->
@@ -251,6 +255,40 @@ process_entry(Entry = {apply,{application,start_boot,[rabbit,permanent]}}) ->
process_entry(Entry) ->
[Entry].
+%% Check whether a node with the same name is already running
+duplicate_node_check([]) ->
+ %% Ignore running node while installing windows service
+ ok;
+duplicate_node_check(Node) ->
+ {NodeName, NodeHost} = rabbit_misc:nodeparts(Node),
+ case net_adm:names(NodeHost) of
+ {ok, NamePorts} ->
+ case proplists:is_defined(NodeName, NamePorts) of
+ true -> io:format("node with name ~p "
+ "already running on ~p~n",
+ [NodeName, NodeHost]),
+ [io:format(Fmt ++ "~n", Args) ||
+ {Fmt, Args} <- rabbit_control:diagnostics(Node)],
+ terminate(?ERROR_CODE);
+ false -> ok
+ end;
+ {error, address} -> ok;
+ {error, EpmdReason} -> terminate("unexpected epmd error: ~p~n",
+ [EpmdReason])
+ end.
+
terminate(Fmt, Args) ->
io:format("ERROR: " ++ Fmt ++ "~n", Args),
- halt(1).
+ terminate(?ERROR_CODE).
+
+terminate(Status) ->
+ case os:type() of
+ {unix, _} ->
+ halt(Status);
+ {win32, _} ->
+ init:stop(Status),
+ receive
+ after infinity -> ok
+ end
+ end.
+
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 1142eb8b6d..3343bb99cd 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -35,8 +35,6 @@
-export([all_tests/0, test_parsing/0]).
--import(lists).
-
-include("rabbit.hrl").
-include("rabbit_framing.hrl").
-include_lib("kernel/include/file.hrl").
diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl
index 0159609d9a..068ac18612 100644
--- a/src/rabbit_writer.erl
+++ b/src/rabbit_writer.erl
@@ -39,8 +39,6 @@
send_command_and_notify/4, send_command_and_notify/5]).
-export([internal_send_command/4, internal_send_command/6]).
--import(gen_tcp).
-
-record(wstate, {sock, channel, frame_max, protocol}).
-define(HIBERNATE_AFTER, 5000).