summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit.erl6
-rw-r--r--src/rabbit_file.erl43
-rw-r--r--src/rabbit_mnesia.erl41
-rw-r--r--src/rabbit_node_monitor.erl27
4 files changed, 77 insertions, 40 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl
index a9af7335b9..7ae6aa25fb 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -199,7 +199,8 @@
rabbit_queue_index, gen, dict, ordsets, file_handle_cache,
rabbit_msg_store, array, rabbit_msg_store_ets_index, rabbit_msg_file,
rabbit_exchange_type_fanout, rabbit_exchange_type_topic, mnesia,
- mnesia_lib, rpc, mnesia_tm, qlc, sofs, proplists, credit_flow, pmon]).
+ mnesia_lib, rpc, mnesia_tm, qlc, sofs, proplists, credit_flow, pmon,
+ ssl_connection, ssl_record, gen_fsm, ssl]).
%% HiPE compilation uses multiple cores anyway, but some bits are
%% IO-bound so we can go faster if we parallelise a bit more. In
@@ -263,7 +264,7 @@ maybe_hipe_compile() ->
hipe_compile() ->
Count = length(?HIPE_WORTHY),
- io:format("HiPE compiling: |~s|~n |",
+ io:format("~nHiPE compiling: |~s|~n |",
[string:copies("-", Count)]),
T1 = erlang:now(),
PidMRefs = [spawn_monitor(fun () -> [begin
@@ -409,6 +410,7 @@ start(normal, []) ->
end.
stop(_State) ->
+ ok = rabbit_mnesia:update_cluster_nodes_status(),
terminated_ok = error_logger:delete_report_handler(rabbit_error_logger),
ok = rabbit_alarm:stop(),
ok = case rabbit_mnesia:is_clustered() of
diff --git a/src/rabbit_file.erl b/src/rabbit_file.erl
index 59df14f318..02487d127c 100644
--- a/src/rabbit_file.erl
+++ b/src/rabbit_file.erl
@@ -19,7 +19,8 @@
-include_lib("kernel/include/file.hrl").
-export([is_file/1, is_dir/1, file_size/1, ensure_dir/1, wildcard/2, list_dir/1]).
--export([read_term_file/1, write_term_file/2, write_file/2, write_file/3]).
+-export([read_term_file/1, write_term_file/2, map_term_file/2, write_file/2,
+ write_file/3]).
-export([append_file/2, ensure_parent_dirs_exist/1]).
-export([rename/2, delete/1, recursive_delete/1, recursive_copy/2]).
-export([lock_file/1]).
@@ -40,6 +41,9 @@
-spec(read_term_file/1 ::
(file:filename()) -> {'ok', [any()]} | rabbit_types:error(any())).
-spec(write_term_file/2 :: (file:filename(), [any()]) -> ok_or_error()).
+-spec(map_term_file/2 ::
+ (fun(([any()]) -> [any()]), file:filename())
+ -> {'ok', [any()]} | rabbit_types:error(any())).
-spec(write_file/2 :: (file:filename(), iodata()) -> ok_or_error()).
-spec(write_file/3 :: (file:filename(), iodata(), [any()]) -> ok_or_error()).
-spec(append_file/2 :: (file:filename(), string()) -> ok_or_error()).
@@ -107,18 +111,13 @@ with_fhc_handle(Fun) ->
after ok = file_handle_cache:release()
end.
-read_term_file(File) ->
- try
- {ok, Data} = with_fhc_handle(fun () -> prim_file:read_file(File) end),
- {ok, Tokens, _} = erl_scan:string(binary_to_list(Data)),
- TokenGroups = group_tokens(Tokens),
- {ok, [begin
- {ok, Term} = erl_parse:parse_term(Tokens1),
- Term
- end || Tokens1 <- TokenGroups]}
- catch
- error:{badmatch, Error} -> Error
- end.
+binary_to_terms(Data) ->
+ {ok, Tokens, _} = erl_scan:string(binary_to_list(Data)),
+ TokenGroups = group_tokens(Tokens),
+ [begin
+ {ok, Term} = erl_parse:parse_term(Tokens1),
+ Term
+ end || Tokens1 <- TokenGroups].
group_tokens(Ts) -> [lists:reverse(G) || G <- group_tokens([], Ts)].
@@ -127,10 +126,28 @@ group_tokens(Cur, []) -> [Cur];
group_tokens(Cur, [T = {dot, _} | Ts]) -> [[T | Cur] | group_tokens([], Ts)];
group_tokens(Cur, [T | Ts]) -> group_tokens([T | Cur], Ts).
+read_term_file(File) ->
+ try
+ {ok, Data} = with_fhc_handle(fun () -> prim_file:read_file(File) end),
+ {ok, binary_to_terms(Data)}
+ catch
+ error:{badmatch, Error} -> Error
+ end.
+
write_term_file(File, Terms) ->
write_file(File, list_to_binary([io_lib:format("~w.~n", [Term]) ||
Term <- Terms])).
+map_term_file(Fun, File) ->
+ try
+ with_fhc_handle(fun () ->
+ {ok, Data} = prim_file:read_file(File),
+ {ok, Fun(binary_to_terms(Data))}
+ end)
+ catch
+ error:{badmatch, Error} -> Error
+ end.
+
write_file(Path, Data) -> write_file(Path, Data, []).
%% write_file/3 and make_binary/1 are both based on corresponding
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index 7aac7f3f63..f8e6cac6a0 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -41,8 +41,9 @@
empty_ram_only_tables/0,
copy_db/1,
wait_for_tables/0,
+ update_cluster_nodes_status/0,
- on_node_up/1,
+ on_node_up/2,
on_node_down/1
]).
@@ -65,7 +66,8 @@
-export_type([node_type/0]).
-type(node_type() :: disc | ram).
--type(node_status() :: {[node()], [node()], [node()]}).
+-type(node_status() :: {[ordsets:ordset(node())], [ordsets:ordset(node())],
+ [ordsets:ordset(node())]}).
%% Main interface
-spec(prepare/0 :: () -> 'ok').
@@ -95,9 +97,10 @@
-spec(create_tables/0 :: () -> 'ok').
-spec(copy_db/1 :: (file:filename()) -> rabbit_types:ok_or_error(any())).
-spec(wait_for_tables/1 :: ([atom()]) -> 'ok').
+-spec(update_cluster_nodes_status/0 :: () -> 'ok').
%% Hooks used in `rabbit_node_monitor'
--spec(on_node_up/1 :: (node()) -> 'ok').
+-spec(on_node_up/2 :: (node(), boolean()) -> 'ok').
-spec(on_node_down/1 :: (node()) -> 'ok').
%% Functions used in internal rpc calls
@@ -409,9 +412,9 @@ running_clustered_disc_nodes() ->
cluster_status_if_running() ->
case mnesia:system_info(is_running) of
no -> error;
- yes -> {ok, {mnesia:system_info(db_nodes),
- mnesia:table_info(schema, disc_copies),
- mnesia:system_info(running_db_nodes)}}
+ yes -> {ok, {ordsets:from_list(mnesia:system_info(db_nodes)),
+ ordsets:from_list(mnesia:table_info(schema, disc_copies)),
+ ordsets:from_list(mnesia:system_info(running_db_nodes))}}
end.
node_info() ->
@@ -703,8 +706,16 @@ update_cluster_nodes_status() ->
%% Hooks for `rabbit_node_monitor'
%%--------------------------------------------------------------------
-on_node_up(Node) ->
- update_cluster_nodes_status(),
+on_node_up(Node, IsDiscNode) ->
+ {ok, _} = rabbit_file:map_term_file(
+ fun ([{AllNodes, DiscNodes, RunningNodes}]) ->
+ [{ordsets:add_element(Node, AllNodes),
+ case IsDiscNode of
+ true -> ordsets:add_element(Node, DiscNodes);
+ false -> DiscNodes
+ end,
+ ordsets:add_element(Node, RunningNodes)}]
+ end, cluster_nodes_status_filename()),
case is_only_disc_node(Node) of
true -> rabbit_log:info("cluster contains disc nodes again~n");
false -> ok
@@ -715,7 +726,13 @@ on_node_down(Node) ->
true -> rabbit_log:info("only running disc node went down~n");
false -> ok
end,
- update_cluster_nodes_status().
+ {ok, _} = rabbit_file:map_term_file(
+ fun ([{AllNodes, DiscNodes, RunningNodes}]) ->
+ [{ordsets:del_element(Node, AllNodes),
+ ordsets:del_element(Node, DiscNodes),
+ ordsets:del_element(Node, RunningNodes)}]
+ end, cluster_nodes_status_filename()),
+ ok.
%%--------------------------------------------------------------------
%% Internal helpers
@@ -1009,11 +1026,9 @@ remove_node_if_mnesia_running(Node) ->
%% propagated to all nodes
case mnesia:del_table_copy(schema, Node) of
{atomic, ok} ->
- update_cluster_nodes_status(),
- io:format("nodes: ~p~n", [running_clustered_disc_nodes()]),
+ on_node_down(Node),
{_, []} = rpc:multicall(running_clustered_nodes(),
- rabbit_mnesia,
- update_cluster_nodes_status, []),
+ rabbit_mnesia, on_node_down, [Node]),
ok;
{aborted, Reason} ->
{error, {failed_to_remove_node, Node, Reason}}
diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl
index 323cf0ce9e..fee7c278b8 100644
--- a/src/rabbit_node_monitor.erl
+++ b/src/rabbit_node_monitor.erl
@@ -22,7 +22,7 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
--export([notify_cluster/0, rabbit_running_on/1]).
+-export([notify_cluster/0, rabbit_running_on/2]).
-define(SERVER, ?MODULE).
-define(RABBIT_UP_RPC_TIMEOUT, 2000).
@@ -32,7 +32,7 @@
-ifdef(use_specs).
-spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()).
--spec(rabbit_running_on/1 :: (node()) -> 'ok').
+-spec(rabbit_running_on/2 :: (node(), boolean()) -> 'ok').
-spec(notify_cluster/0 :: () -> 'ok').
-endif.
@@ -42,20 +42,23 @@
start_link() ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
-rabbit_running_on(Node) ->
- gen_server:cast(rabbit_node_monitor, {rabbit_running_on, Node}).
+rabbit_running_on(Node, IsDiscNode) ->
+ gen_server:cast(rabbit_node_monitor, {rabbit_running_on, Node, IsDiscNode}).
notify_cluster() ->
Node = node(),
- Nodes = rabbit_mnesia:running_clustered_nodes() -- [Node],
+ IsDiscNode = rabbit_mnesia:is_disc_node(),
+ RunningNodes = rabbit_mnesia:running_clustered_nodes() -- [Node],
+ DiscNodes = rabbit_mnesia:all_clustered_disc_nodes(),
%% notify other rabbits of this rabbit
- case rpc:multicall(Nodes, rabbit_node_monitor, rabbit_running_on,
- [Node], ?RABBIT_UP_RPC_TIMEOUT) of
+ case rpc:multicall(RunningNodes, rabbit_node_monitor, rabbit_running_on,
+ [Node, IsDiscNode], ?RABBIT_UP_RPC_TIMEOUT) of
{_, [] } -> ok;
{_, Bad} -> rabbit_log:info("failed to contact nodes ~p~n", [Bad])
end,
%% register other active rabbits with this rabbit
- [ rabbit_running_on(N) || N <- Nodes ],
+ [rabbit_running_on(N, ordsets:is_element(N, DiscNodes)) ||
+ N <- RunningNodes],
ok.
%%--------------------------------------------------------------------
@@ -66,12 +69,12 @@ init([]) ->
handle_call(_Request, _From, State) ->
{noreply, State}.
-handle_cast({rabbit_running_on, Node}, Nodes) ->
+handle_cast({rabbit_running_on, Node, IsDiscNode}, Nodes) ->
case ordsets:is_element(Node, Nodes) of
true -> {noreply, Nodes};
false -> rabbit_log:info("rabbit on node ~p up~n", [Node]),
erlang:monitor(process, {rabbit, Node}),
- ok = handle_live_rabbit(Node),
+ ok = handle_live_rabbit(Node, IsDiscNode),
{noreply, ordsets:add_element(Node, Nodes)}
end;
handle_cast(_Msg, State) ->
@@ -101,6 +104,6 @@ handle_dead_rabbit(Node) ->
ok = rabbit_alarm:on_node_down(Node),
ok = rabbit_mnesia:on_node_down(Node).
-handle_live_rabbit(Node) ->
+handle_live_rabbit(Node, IsDiscNode) ->
ok = rabbit_alarm:on_node_up(Node),
- ok = rabbit_mnesia:on_node_up(Node).
+ ok = rabbit_mnesia:on_node_up(Node, IsDiscNode).