diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_file.erl | 43 | ||||
| -rw-r--r-- | src/rabbit_mnesia.erl | 41 | ||||
| -rw-r--r-- | src/rabbit_node_monitor.erl | 27 |
4 files changed, 40 insertions, 77 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl index 7ae6aa25fb..a9af7335b9 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -199,8 +199,7 @@ rabbit_queue_index, gen, dict, ordsets, file_handle_cache, rabbit_msg_store, array, rabbit_msg_store_ets_index, rabbit_msg_file, rabbit_exchange_type_fanout, rabbit_exchange_type_topic, mnesia, - mnesia_lib, rpc, mnesia_tm, qlc, sofs, proplists, credit_flow, pmon, - ssl_connection, ssl_record, gen_fsm, ssl]). + mnesia_lib, rpc, mnesia_tm, qlc, sofs, proplists, credit_flow, pmon]). %% HiPE compilation uses multiple cores anyway, but some bits are %% IO-bound so we can go faster if we parallelise a bit more. In @@ -264,7 +263,7 @@ maybe_hipe_compile() -> hipe_compile() -> Count = length(?HIPE_WORTHY), - io:format("~nHiPE compiling: |~s|~n |", + io:format("HiPE compiling: |~s|~n |", [string:copies("-", Count)]), T1 = erlang:now(), PidMRefs = [spawn_monitor(fun () -> [begin @@ -410,7 +409,6 @@ start(normal, []) -> end. stop(_State) -> - ok = rabbit_mnesia:update_cluster_nodes_status(), terminated_ok = error_logger:delete_report_handler(rabbit_error_logger), ok = rabbit_alarm:stop(), ok = case rabbit_mnesia:is_clustered() of diff --git a/src/rabbit_file.erl b/src/rabbit_file.erl index 02487d127c..59df14f318 100644 --- a/src/rabbit_file.erl +++ b/src/rabbit_file.erl @@ -19,8 +19,7 @@ -include_lib("kernel/include/file.hrl"). -export([is_file/1, is_dir/1, file_size/1, ensure_dir/1, wildcard/2, list_dir/1]). --export([read_term_file/1, write_term_file/2, map_term_file/2, write_file/2, - write_file/3]). +-export([read_term_file/1, write_term_file/2, write_file/2, write_file/3]). -export([append_file/2, ensure_parent_dirs_exist/1]). -export([rename/2, delete/1, recursive_delete/1, recursive_copy/2]). -export([lock_file/1]). @@ -41,9 +40,6 @@ -spec(read_term_file/1 :: (file:filename()) -> {'ok', [any()]} | rabbit_types:error(any())). -spec(write_term_file/2 :: (file:filename(), [any()]) -> ok_or_error()). --spec(map_term_file/2 :: - (fun(([any()]) -> [any()]), file:filename()) - -> {'ok', [any()]} | rabbit_types:error(any())). -spec(write_file/2 :: (file:filename(), iodata()) -> ok_or_error()). -spec(write_file/3 :: (file:filename(), iodata(), [any()]) -> ok_or_error()). -spec(append_file/2 :: (file:filename(), string()) -> ok_or_error()). @@ -111,13 +107,18 @@ with_fhc_handle(Fun) -> after ok = file_handle_cache:release() end. -binary_to_terms(Data) -> - {ok, Tokens, _} = erl_scan:string(binary_to_list(Data)), - TokenGroups = group_tokens(Tokens), - [begin - {ok, Term} = erl_parse:parse_term(Tokens1), - Term - end || Tokens1 <- TokenGroups]. +read_term_file(File) -> + try + {ok, Data} = with_fhc_handle(fun () -> prim_file:read_file(File) end), + {ok, Tokens, _} = erl_scan:string(binary_to_list(Data)), + TokenGroups = group_tokens(Tokens), + {ok, [begin + {ok, Term} = erl_parse:parse_term(Tokens1), + Term + end || Tokens1 <- TokenGroups]} + catch + error:{badmatch, Error} -> Error + end. group_tokens(Ts) -> [lists:reverse(G) || G <- group_tokens([], Ts)]. @@ -126,28 +127,10 @@ group_tokens(Cur, []) -> [Cur]; group_tokens(Cur, [T = {dot, _} | Ts]) -> [[T | Cur] | group_tokens([], Ts)]; group_tokens(Cur, [T | Ts]) -> group_tokens([T | Cur], Ts). -read_term_file(File) -> - try - {ok, Data} = with_fhc_handle(fun () -> prim_file:read_file(File) end), - {ok, binary_to_terms(Data)} - catch - error:{badmatch, Error} -> Error - end. - write_term_file(File, Terms) -> write_file(File, list_to_binary([io_lib:format("~w.~n", [Term]) || Term <- Terms])). -map_term_file(Fun, File) -> - try - with_fhc_handle(fun () -> - {ok, Data} = prim_file:read_file(File), - {ok, Fun(binary_to_terms(Data))} - end) - catch - error:{badmatch, Error} -> Error - end. - write_file(Path, Data) -> write_file(Path, Data, []). %% write_file/3 and make_binary/1 are both based on corresponding diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index f8e6cac6a0..7aac7f3f63 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -41,9 +41,8 @@ empty_ram_only_tables/0, copy_db/1, wait_for_tables/0, - update_cluster_nodes_status/0, - on_node_up/2, + on_node_up/1, on_node_down/1 ]). @@ -66,8 +65,7 @@ -export_type([node_type/0]). -type(node_type() :: disc | ram). --type(node_status() :: {[ordsets:ordset(node())], [ordsets:ordset(node())], - [ordsets:ordset(node())]}). +-type(node_status() :: {[node()], [node()], [node()]}). %% Main interface -spec(prepare/0 :: () -> 'ok'). @@ -97,10 +95,9 @@ -spec(create_tables/0 :: () -> 'ok'). -spec(copy_db/1 :: (file:filename()) -> rabbit_types:ok_or_error(any())). -spec(wait_for_tables/1 :: ([atom()]) -> 'ok'). --spec(update_cluster_nodes_status/0 :: () -> 'ok'). %% Hooks used in `rabbit_node_monitor' --spec(on_node_up/2 :: (node(), boolean()) -> 'ok'). +-spec(on_node_up/1 :: (node()) -> 'ok'). -spec(on_node_down/1 :: (node()) -> 'ok'). %% Functions used in internal rpc calls @@ -412,9 +409,9 @@ running_clustered_disc_nodes() -> cluster_status_if_running() -> case mnesia:system_info(is_running) of no -> error; - yes -> {ok, {ordsets:from_list(mnesia:system_info(db_nodes)), - ordsets:from_list(mnesia:table_info(schema, disc_copies)), - ordsets:from_list(mnesia:system_info(running_db_nodes))}} + yes -> {ok, {mnesia:system_info(db_nodes), + mnesia:table_info(schema, disc_copies), + mnesia:system_info(running_db_nodes)}} end. node_info() -> @@ -706,16 +703,8 @@ update_cluster_nodes_status() -> %% Hooks for `rabbit_node_monitor' %%-------------------------------------------------------------------- -on_node_up(Node, IsDiscNode) -> - {ok, _} = rabbit_file:map_term_file( - fun ([{AllNodes, DiscNodes, RunningNodes}]) -> - [{ordsets:add_element(Node, AllNodes), - case IsDiscNode of - true -> ordsets:add_element(Node, DiscNodes); - false -> DiscNodes - end, - ordsets:add_element(Node, RunningNodes)}] - end, cluster_nodes_status_filename()), +on_node_up(Node) -> + update_cluster_nodes_status(), case is_only_disc_node(Node) of true -> rabbit_log:info("cluster contains disc nodes again~n"); false -> ok @@ -726,13 +715,7 @@ on_node_down(Node) -> true -> rabbit_log:info("only running disc node went down~n"); false -> ok end, - {ok, _} = rabbit_file:map_term_file( - fun ([{AllNodes, DiscNodes, RunningNodes}]) -> - [{ordsets:del_element(Node, AllNodes), - ordsets:del_element(Node, DiscNodes), - ordsets:del_element(Node, RunningNodes)}] - end, cluster_nodes_status_filename()), - ok. + update_cluster_nodes_status(). %%-------------------------------------------------------------------- %% Internal helpers @@ -1026,9 +1009,11 @@ remove_node_if_mnesia_running(Node) -> %% propagated to all nodes case mnesia:del_table_copy(schema, Node) of {atomic, ok} -> - on_node_down(Node), + update_cluster_nodes_status(), + io:format("nodes: ~p~n", [running_clustered_disc_nodes()]), {_, []} = rpc:multicall(running_clustered_nodes(), - rabbit_mnesia, on_node_down, [Node]), + rabbit_mnesia, + update_cluster_nodes_status, []), ok; {aborted, Reason} -> {error, {failed_to_remove_node, Node, Reason}} diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl index fee7c278b8..323cf0ce9e 100644 --- a/src/rabbit_node_monitor.erl +++ b/src/rabbit_node_monitor.erl @@ -22,7 +22,7 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --export([notify_cluster/0, rabbit_running_on/2]). +-export([notify_cluster/0, rabbit_running_on/1]). -define(SERVER, ?MODULE). -define(RABBIT_UP_RPC_TIMEOUT, 2000). @@ -32,7 +32,7 @@ -ifdef(use_specs). -spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()). --spec(rabbit_running_on/2 :: (node(), boolean()) -> 'ok'). +-spec(rabbit_running_on/1 :: (node()) -> 'ok'). -spec(notify_cluster/0 :: () -> 'ok'). -endif. @@ -42,23 +42,20 @@ start_link() -> gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). -rabbit_running_on(Node, IsDiscNode) -> - gen_server:cast(rabbit_node_monitor, {rabbit_running_on, Node, IsDiscNode}). +rabbit_running_on(Node) -> + gen_server:cast(rabbit_node_monitor, {rabbit_running_on, Node}). notify_cluster() -> Node = node(), - IsDiscNode = rabbit_mnesia:is_disc_node(), - RunningNodes = rabbit_mnesia:running_clustered_nodes() -- [Node], - DiscNodes = rabbit_mnesia:all_clustered_disc_nodes(), + Nodes = rabbit_mnesia:running_clustered_nodes() -- [Node], %% notify other rabbits of this rabbit - case rpc:multicall(RunningNodes, rabbit_node_monitor, rabbit_running_on, - [Node, IsDiscNode], ?RABBIT_UP_RPC_TIMEOUT) of + case rpc:multicall(Nodes, rabbit_node_monitor, rabbit_running_on, + [Node], ?RABBIT_UP_RPC_TIMEOUT) of {_, [] } -> ok; {_, Bad} -> rabbit_log:info("failed to contact nodes ~p~n", [Bad]) end, %% register other active rabbits with this rabbit - [rabbit_running_on(N, ordsets:is_element(N, DiscNodes)) || - N <- RunningNodes], + [ rabbit_running_on(N) || N <- Nodes ], ok. %%-------------------------------------------------------------------- @@ -69,12 +66,12 @@ init([]) -> handle_call(_Request, _From, State) -> {noreply, State}. -handle_cast({rabbit_running_on, Node, IsDiscNode}, Nodes) -> +handle_cast({rabbit_running_on, Node}, Nodes) -> case ordsets:is_element(Node, Nodes) of true -> {noreply, Nodes}; false -> rabbit_log:info("rabbit on node ~p up~n", [Node]), erlang:monitor(process, {rabbit, Node}), - ok = handle_live_rabbit(Node, IsDiscNode), + ok = handle_live_rabbit(Node), {noreply, ordsets:add_element(Node, Nodes)} end; handle_cast(_Msg, State) -> @@ -104,6 +101,6 @@ handle_dead_rabbit(Node) -> ok = rabbit_alarm:on_node_down(Node), ok = rabbit_mnesia:on_node_down(Node). -handle_live_rabbit(Node, IsDiscNode) -> +handle_live_rabbit(Node) -> ok = rabbit_alarm:on_node_up(Node), - ok = rabbit_mnesia:on_node_up(Node, IsDiscNode). + ok = rabbit_mnesia:on_node_up(Node). |
