summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit.erl6
-rw-r--r--src/rabbit_file.erl43
-rw-r--r--src/rabbit_mnesia.erl41
-rw-r--r--src/rabbit_node_monitor.erl27
4 files changed, 40 insertions, 77 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 7ae6aa25fb..a9af7335b9 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -199,8 +199,7 @@
rabbit_queue_index, gen, dict, ordsets, file_handle_cache,
rabbit_msg_store, array, rabbit_msg_store_ets_index, rabbit_msg_file,
rabbit_exchange_type_fanout, rabbit_exchange_type_topic, mnesia,
- mnesia_lib, rpc, mnesia_tm, qlc, sofs, proplists, credit_flow, pmon,
- ssl_connection, ssl_record, gen_fsm, ssl]).
+ mnesia_lib, rpc, mnesia_tm, qlc, sofs, proplists, credit_flow, pmon]).
%% HiPE compilation uses multiple cores anyway, but some bits are
%% IO-bound so we can go faster if we parallelise a bit more. In
@@ -264,7 +263,7 @@ maybe_hipe_compile() ->
hipe_compile() ->
Count = length(?HIPE_WORTHY),
- io:format("~nHiPE compiling: |~s|~n |",
+ io:format("HiPE compiling: |~s|~n |",
[string:copies("-", Count)]),
T1 = erlang:now(),
PidMRefs = [spawn_monitor(fun () -> [begin
@@ -410,7 +409,6 @@ start(normal, []) ->
end.
stop(_State) ->
- ok = rabbit_mnesia:update_cluster_nodes_status(),
terminated_ok = error_logger:delete_report_handler(rabbit_error_logger),
ok = rabbit_alarm:stop(),
ok = case rabbit_mnesia:is_clustered() of
diff --git a/src/rabbit_file.erl b/src/rabbit_file.erl
index 02487d127c..59df14f318 100644
--- a/src/rabbit_file.erl
+++ b/src/rabbit_file.erl
@@ -19,8 +19,7 @@
-include_lib("kernel/include/file.hrl").
-export([is_file/1, is_dir/1, file_size/1, ensure_dir/1, wildcard/2, list_dir/1]).
--export([read_term_file/1, write_term_file/2, map_term_file/2, write_file/2,
- write_file/3]).
+-export([read_term_file/1, write_term_file/2, write_file/2, write_file/3]).
-export([append_file/2, ensure_parent_dirs_exist/1]).
-export([rename/2, delete/1, recursive_delete/1, recursive_copy/2]).
-export([lock_file/1]).
@@ -41,9 +40,6 @@
-spec(read_term_file/1 ::
(file:filename()) -> {'ok', [any()]} | rabbit_types:error(any())).
-spec(write_term_file/2 :: (file:filename(), [any()]) -> ok_or_error()).
--spec(map_term_file/2 ::
- (fun(([any()]) -> [any()]), file:filename())
- -> {'ok', [any()]} | rabbit_types:error(any())).
-spec(write_file/2 :: (file:filename(), iodata()) -> ok_or_error()).
-spec(write_file/3 :: (file:filename(), iodata(), [any()]) -> ok_or_error()).
-spec(append_file/2 :: (file:filename(), string()) -> ok_or_error()).
@@ -111,13 +107,18 @@ with_fhc_handle(Fun) ->
after ok = file_handle_cache:release()
end.
-binary_to_terms(Data) ->
- {ok, Tokens, _} = erl_scan:string(binary_to_list(Data)),
- TokenGroups = group_tokens(Tokens),
- [begin
- {ok, Term} = erl_parse:parse_term(Tokens1),
- Term
- end || Tokens1 <- TokenGroups].
+read_term_file(File) ->
+ try
+ {ok, Data} = with_fhc_handle(fun () -> prim_file:read_file(File) end),
+ {ok, Tokens, _} = erl_scan:string(binary_to_list(Data)),
+ TokenGroups = group_tokens(Tokens),
+ {ok, [begin
+ {ok, Term} = erl_parse:parse_term(Tokens1),
+ Term
+ end || Tokens1 <- TokenGroups]}
+ catch
+ error:{badmatch, Error} -> Error
+ end.
group_tokens(Ts) -> [lists:reverse(G) || G <- group_tokens([], Ts)].
@@ -126,28 +127,10 @@ group_tokens(Cur, []) -> [Cur];
group_tokens(Cur, [T = {dot, _} | Ts]) -> [[T | Cur] | group_tokens([], Ts)];
group_tokens(Cur, [T | Ts]) -> group_tokens([T | Cur], Ts).
-read_term_file(File) ->
- try
- {ok, Data} = with_fhc_handle(fun () -> prim_file:read_file(File) end),
- {ok, binary_to_terms(Data)}
- catch
- error:{badmatch, Error} -> Error
- end.
-
write_term_file(File, Terms) ->
write_file(File, list_to_binary([io_lib:format("~w.~n", [Term]) ||
Term <- Terms])).
-map_term_file(Fun, File) ->
- try
- with_fhc_handle(fun () ->
- {ok, Data} = prim_file:read_file(File),
- {ok, Fun(binary_to_terms(Data))}
- end)
- catch
- error:{badmatch, Error} -> Error
- end.
-
write_file(Path, Data) -> write_file(Path, Data, []).
%% write_file/3 and make_binary/1 are both based on corresponding
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index f8e6cac6a0..7aac7f3f63 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -41,9 +41,8 @@
empty_ram_only_tables/0,
copy_db/1,
wait_for_tables/0,
- update_cluster_nodes_status/0,
- on_node_up/2,
+ on_node_up/1,
on_node_down/1
]).
@@ -66,8 +65,7 @@
-export_type([node_type/0]).
-type(node_type() :: disc | ram).
--type(node_status() :: {[ordsets:ordset(node())], [ordsets:ordset(node())],
- [ordsets:ordset(node())]}).
+-type(node_status() :: {[node()], [node()], [node()]}).
%% Main interface
-spec(prepare/0 :: () -> 'ok').
@@ -97,10 +95,9 @@
-spec(create_tables/0 :: () -> 'ok').
-spec(copy_db/1 :: (file:filename()) -> rabbit_types:ok_or_error(any())).
-spec(wait_for_tables/1 :: ([atom()]) -> 'ok').
--spec(update_cluster_nodes_status/0 :: () -> 'ok').
%% Hooks used in `rabbit_node_monitor'
--spec(on_node_up/2 :: (node(), boolean()) -> 'ok').
+-spec(on_node_up/1 :: (node()) -> 'ok').
-spec(on_node_down/1 :: (node()) -> 'ok').
%% Functions used in internal rpc calls
@@ -412,9 +409,9 @@ running_clustered_disc_nodes() ->
cluster_status_if_running() ->
case mnesia:system_info(is_running) of
no -> error;
- yes -> {ok, {ordsets:from_list(mnesia:system_info(db_nodes)),
- ordsets:from_list(mnesia:table_info(schema, disc_copies)),
- ordsets:from_list(mnesia:system_info(running_db_nodes))}}
+ yes -> {ok, {mnesia:system_info(db_nodes),
+ mnesia:table_info(schema, disc_copies),
+ mnesia:system_info(running_db_nodes)}}
end.
node_info() ->
@@ -706,16 +703,8 @@ update_cluster_nodes_status() ->
%% Hooks for `rabbit_node_monitor'
%%--------------------------------------------------------------------
-on_node_up(Node, IsDiscNode) ->
- {ok, _} = rabbit_file:map_term_file(
- fun ([{AllNodes, DiscNodes, RunningNodes}]) ->
- [{ordsets:add_element(Node, AllNodes),
- case IsDiscNode of
- true -> ordsets:add_element(Node, DiscNodes);
- false -> DiscNodes
- end,
- ordsets:add_element(Node, RunningNodes)}]
- end, cluster_nodes_status_filename()),
+on_node_up(Node) ->
+ update_cluster_nodes_status(),
case is_only_disc_node(Node) of
true -> rabbit_log:info("cluster contains disc nodes again~n");
false -> ok
@@ -726,13 +715,7 @@ on_node_down(Node) ->
true -> rabbit_log:info("only running disc node went down~n");
false -> ok
end,
- {ok, _} = rabbit_file:map_term_file(
- fun ([{AllNodes, DiscNodes, RunningNodes}]) ->
- [{ordsets:del_element(Node, AllNodes),
- ordsets:del_element(Node, DiscNodes),
- ordsets:del_element(Node, RunningNodes)}]
- end, cluster_nodes_status_filename()),
- ok.
+ update_cluster_nodes_status().
%%--------------------------------------------------------------------
%% Internal helpers
@@ -1026,9 +1009,11 @@ remove_node_if_mnesia_running(Node) ->
%% propagated to all nodes
case mnesia:del_table_copy(schema, Node) of
{atomic, ok} ->
- on_node_down(Node),
+ update_cluster_nodes_status(),
+ io:format("nodes: ~p~n", [running_clustered_disc_nodes()]),
{_, []} = rpc:multicall(running_clustered_nodes(),
- rabbit_mnesia, on_node_down, [Node]),
+ rabbit_mnesia,
+ update_cluster_nodes_status, []),
ok;
{aborted, Reason} ->
{error, {failed_to_remove_node, Node, Reason}}
diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl
index fee7c278b8..323cf0ce9e 100644
--- a/src/rabbit_node_monitor.erl
+++ b/src/rabbit_node_monitor.erl
@@ -22,7 +22,7 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
--export([notify_cluster/0, rabbit_running_on/2]).
+-export([notify_cluster/0, rabbit_running_on/1]).
-define(SERVER, ?MODULE).
-define(RABBIT_UP_RPC_TIMEOUT, 2000).
@@ -32,7 +32,7 @@
-ifdef(use_specs).
-spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()).
--spec(rabbit_running_on/2 :: (node(), boolean()) -> 'ok').
+-spec(rabbit_running_on/1 :: (node()) -> 'ok').
-spec(notify_cluster/0 :: () -> 'ok').
-endif.
@@ -42,23 +42,20 @@
start_link() ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
-rabbit_running_on(Node, IsDiscNode) ->
- gen_server:cast(rabbit_node_monitor, {rabbit_running_on, Node, IsDiscNode}).
+rabbit_running_on(Node) ->
+ gen_server:cast(rabbit_node_monitor, {rabbit_running_on, Node}).
notify_cluster() ->
Node = node(),
- IsDiscNode = rabbit_mnesia:is_disc_node(),
- RunningNodes = rabbit_mnesia:running_clustered_nodes() -- [Node],
- DiscNodes = rabbit_mnesia:all_clustered_disc_nodes(),
+ Nodes = rabbit_mnesia:running_clustered_nodes() -- [Node],
%% notify other rabbits of this rabbit
- case rpc:multicall(RunningNodes, rabbit_node_monitor, rabbit_running_on,
- [Node, IsDiscNode], ?RABBIT_UP_RPC_TIMEOUT) of
+ case rpc:multicall(Nodes, rabbit_node_monitor, rabbit_running_on,
+ [Node], ?RABBIT_UP_RPC_TIMEOUT) of
{_, [] } -> ok;
{_, Bad} -> rabbit_log:info("failed to contact nodes ~p~n", [Bad])
end,
%% register other active rabbits with this rabbit
- [rabbit_running_on(N, ordsets:is_element(N, DiscNodes)) ||
- N <- RunningNodes],
+ [ rabbit_running_on(N) || N <- Nodes ],
ok.
%%--------------------------------------------------------------------
@@ -69,12 +66,12 @@ init([]) ->
handle_call(_Request, _From, State) ->
{noreply, State}.
-handle_cast({rabbit_running_on, Node, IsDiscNode}, Nodes) ->
+handle_cast({rabbit_running_on, Node}, Nodes) ->
case ordsets:is_element(Node, Nodes) of
true -> {noreply, Nodes};
false -> rabbit_log:info("rabbit on node ~p up~n", [Node]),
erlang:monitor(process, {rabbit, Node}),
- ok = handle_live_rabbit(Node, IsDiscNode),
+ ok = handle_live_rabbit(Node),
{noreply, ordsets:add_element(Node, Nodes)}
end;
handle_cast(_Msg, State) ->
@@ -104,6 +101,6 @@ handle_dead_rabbit(Node) ->
ok = rabbit_alarm:on_node_down(Node),
ok = rabbit_mnesia:on_node_down(Node).
-handle_live_rabbit(Node, IsDiscNode) ->
+handle_live_rabbit(Node) ->
ok = rabbit_alarm:on_node_up(Node),
- ok = rabbit_mnesia:on_node_up(Node, IsDiscNode).
+ ok = rabbit_mnesia:on_node_up(Node).