diff options
| author | Francesco Mazzoli <francesco@rabbitmq.com> | 2012-06-06 16:39:49 +0100 |
|---|---|---|
| committer | Francesco Mazzoli <francesco@rabbitmq.com> | 2012-06-06 16:39:49 +0100 |
| commit | bc64b13637114e8e3720dc7f4f341cc7880854b0 (patch) | |
| tree | c10b5fcdacba8caa62f343291aca9babdd07de04 | |
| parent | 6e8ef1d7853f3b3e4438f56d6999a348dd4412fe (diff) | |
| download | rabbitmq-server-git-bc64b13637114e8e3720dc7f4f341cc7880854b0.tar.gz | |
add node to existing cluster status instead of creating a new one on on_node_{up,down}
This is because the hooks might be invoked before the mnesia data has been
propagated correctly.
| -rw-r--r-- | src/rabbit.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_file.erl | 43 | ||||
| -rw-r--r-- | src/rabbit_mnesia.erl | 41 | ||||
| -rw-r--r-- | src/rabbit_node_monitor.erl | 27 |
4 files changed, 77 insertions, 40 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl index a9af7335b9..7ae6aa25fb 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -199,7 +199,8 @@ rabbit_queue_index, gen, dict, ordsets, file_handle_cache, rabbit_msg_store, array, rabbit_msg_store_ets_index, rabbit_msg_file, rabbit_exchange_type_fanout, rabbit_exchange_type_topic, mnesia, - mnesia_lib, rpc, mnesia_tm, qlc, sofs, proplists, credit_flow, pmon]). + mnesia_lib, rpc, mnesia_tm, qlc, sofs, proplists, credit_flow, pmon, + ssl_connection, ssl_record, gen_fsm, ssl]). %% HiPE compilation uses multiple cores anyway, but some bits are %% IO-bound so we can go faster if we parallelise a bit more. In @@ -263,7 +264,7 @@ maybe_hipe_compile() -> hipe_compile() -> Count = length(?HIPE_WORTHY), - io:format("HiPE compiling: |~s|~n |", + io:format("~nHiPE compiling: |~s|~n |", [string:copies("-", Count)]), T1 = erlang:now(), PidMRefs = [spawn_monitor(fun () -> [begin @@ -409,6 +410,7 @@ start(normal, []) -> end. stop(_State) -> + ok = rabbit_mnesia:update_cluster_nodes_status(), terminated_ok = error_logger:delete_report_handler(rabbit_error_logger), ok = rabbit_alarm:stop(), ok = case rabbit_mnesia:is_clustered() of diff --git a/src/rabbit_file.erl b/src/rabbit_file.erl index 59df14f318..02487d127c 100644 --- a/src/rabbit_file.erl +++ b/src/rabbit_file.erl @@ -19,7 +19,8 @@ -include_lib("kernel/include/file.hrl"). -export([is_file/1, is_dir/1, file_size/1, ensure_dir/1, wildcard/2, list_dir/1]). --export([read_term_file/1, write_term_file/2, write_file/2, write_file/3]). +-export([read_term_file/1, write_term_file/2, map_term_file/2, write_file/2, + write_file/3]). -export([append_file/2, ensure_parent_dirs_exist/1]). -export([rename/2, delete/1, recursive_delete/1, recursive_copy/2]). -export([lock_file/1]). @@ -40,6 +41,9 @@ -spec(read_term_file/1 :: (file:filename()) -> {'ok', [any()]} | rabbit_types:error(any())). -spec(write_term_file/2 :: (file:filename(), [any()]) -> ok_or_error()). 
+-spec(map_term_file/2 :: + (fun(([any()]) -> [any()]), file:filename()) + -> {'ok', [any()]} | rabbit_types:error(any())). -spec(write_file/2 :: (file:filename(), iodata()) -> ok_or_error()). -spec(write_file/3 :: (file:filename(), iodata(), [any()]) -> ok_or_error()). -spec(append_file/2 :: (file:filename(), string()) -> ok_or_error()). @@ -107,18 +111,13 @@ with_fhc_handle(Fun) -> after ok = file_handle_cache:release() end. -read_term_file(File) -> - try - {ok, Data} = with_fhc_handle(fun () -> prim_file:read_file(File) end), - {ok, Tokens, _} = erl_scan:string(binary_to_list(Data)), - TokenGroups = group_tokens(Tokens), - {ok, [begin - {ok, Term} = erl_parse:parse_term(Tokens1), - Term - end || Tokens1 <- TokenGroups]} - catch - error:{badmatch, Error} -> Error - end. +binary_to_terms(Data) -> + {ok, Tokens, _} = erl_scan:string(binary_to_list(Data)), + TokenGroups = group_tokens(Tokens), + [begin + {ok, Term} = erl_parse:parse_term(Tokens1), + Term + end || Tokens1 <- TokenGroups]. group_tokens(Ts) -> [lists:reverse(G) || G <- group_tokens([], Ts)]. @@ -127,10 +126,28 @@ group_tokens(Cur, []) -> [Cur]; group_tokens(Cur, [T = {dot, _} | Ts]) -> [[T | Cur] | group_tokens([], Ts)]; group_tokens(Cur, [T | Ts]) -> group_tokens([T | Cur], Ts). +read_term_file(File) -> + try + {ok, Data} = with_fhc_handle(fun () -> prim_file:read_file(File) end), + {ok, binary_to_terms(Data)} + catch + error:{badmatch, Error} -> Error + end. + write_term_file(File, Terms) -> write_file(File, list_to_binary([io_lib:format("~w.~n", [Term]) || Term <- Terms])). +map_term_file(Fun, File) -> + try + with_fhc_handle(fun () -> + {ok, Data} = prim_file:read_file(File), + {ok, Fun(binary_to_terms(Data))} + end) + catch + error:{badmatch, Error} -> Error + end. + write_file(Path, Data) -> write_file(Path, Data, []). 
%% write_file/3 and make_binary/1 are both based on corresponding diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index 7aac7f3f63..f8e6cac6a0 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -41,8 +41,9 @@ empty_ram_only_tables/0, copy_db/1, wait_for_tables/0, + update_cluster_nodes_status/0, - on_node_up/1, + on_node_up/2, on_node_down/1 ]). @@ -65,7 +66,8 @@ -export_type([node_type/0]). -type(node_type() :: disc | ram). --type(node_status() :: {[node()], [node()], [node()]}). +-type(node_status() :: {[ordsets:ordset(node())], [ordsets:ordset(node())], + [ordsets:ordset(node())]}). %% Main interface -spec(prepare/0 :: () -> 'ok'). @@ -95,9 +97,10 @@ -spec(create_tables/0 :: () -> 'ok'). -spec(copy_db/1 :: (file:filename()) -> rabbit_types:ok_or_error(any())). -spec(wait_for_tables/1 :: ([atom()]) -> 'ok'). +-spec(update_cluster_nodes_status/0 :: () -> 'ok'). %% Hooks used in `rabbit_node_monitor' --spec(on_node_up/1 :: (node()) -> 'ok'). +-spec(on_node_up/2 :: (node(), boolean()) -> 'ok'). -spec(on_node_down/1 :: (node()) -> 'ok'). %% Functions used in internal rpc calls @@ -409,9 +412,9 @@ running_clustered_disc_nodes() -> cluster_status_if_running() -> case mnesia:system_info(is_running) of no -> error; - yes -> {ok, {mnesia:system_info(db_nodes), - mnesia:table_info(schema, disc_copies), - mnesia:system_info(running_db_nodes)}} + yes -> {ok, {ordsets:from_list(mnesia:system_info(db_nodes)), + ordsets:from_list(mnesia:table_info(schema, disc_copies)), + ordsets:from_list(mnesia:system_info(running_db_nodes))}} end. 
node_info() -> @@ -703,8 +706,16 @@ update_cluster_nodes_status() -> %% Hooks for `rabbit_node_monitor' %%-------------------------------------------------------------------- -on_node_up(Node) -> - update_cluster_nodes_status(), +on_node_up(Node, IsDiscNode) -> + {ok, _} = rabbit_file:map_term_file( + fun ([{AllNodes, DiscNodes, RunningNodes}]) -> + [{ordsets:add_element(Node, AllNodes), + case IsDiscNode of + true -> ordsets:add_element(Node, DiscNodes); + false -> DiscNodes + end, + ordsets:add_element(Node, RunningNodes)}] + end, cluster_nodes_status_filename()), case is_only_disc_node(Node) of true -> rabbit_log:info("cluster contains disc nodes again~n"); false -> ok @@ -715,7 +726,13 @@ on_node_down(Node) -> true -> rabbit_log:info("only running disc node went down~n"); false -> ok end, - update_cluster_nodes_status(). + {ok, _} = rabbit_file:map_term_file( + fun ([{AllNodes, DiscNodes, RunningNodes}]) -> + [{ordsets:del_element(Node, AllNodes), + ordsets:del_element(Node, DiscNodes), + ordsets:del_element(Node, RunningNodes)}] + end, cluster_nodes_status_filename()), + ok. %%-------------------------------------------------------------------- %% Internal helpers @@ -1009,11 +1026,9 @@ remove_node_if_mnesia_running(Node) -> %% propagated to all nodes case mnesia:del_table_copy(schema, Node) of {atomic, ok} -> - update_cluster_nodes_status(), - io:format("nodes: ~p~n", [running_clustered_disc_nodes()]), + on_node_down(Node), {_, []} = rpc:multicall(running_clustered_nodes(), - rabbit_mnesia, - update_cluster_nodes_status, []), + rabbit_mnesia, on_node_down, [Node]), ok; {aborted, Reason} -> {error, {failed_to_remove_node, Node, Reason}} diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl index 323cf0ce9e..fee7c278b8 100644 --- a/src/rabbit_node_monitor.erl +++ b/src/rabbit_node_monitor.erl @@ -22,7 +22,7 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). 
--export([notify_cluster/0, rabbit_running_on/1]). +-export([notify_cluster/0, rabbit_running_on/2]). -define(SERVER, ?MODULE). -define(RABBIT_UP_RPC_TIMEOUT, 2000). @@ -32,7 +32,7 @@ -ifdef(use_specs). -spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()). --spec(rabbit_running_on/1 :: (node()) -> 'ok'). +-spec(rabbit_running_on/2 :: (node(), boolean()) -> 'ok'). -spec(notify_cluster/0 :: () -> 'ok'). -endif. @@ -42,20 +42,23 @@ start_link() -> gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). -rabbit_running_on(Node) -> - gen_server:cast(rabbit_node_monitor, {rabbit_running_on, Node}). +rabbit_running_on(Node, IsDiscNode) -> + gen_server:cast(rabbit_node_monitor, {rabbit_running_on, Node, IsDiscNode}). notify_cluster() -> Node = node(), - Nodes = rabbit_mnesia:running_clustered_nodes() -- [Node], + IsDiscNode = rabbit_mnesia:is_disc_node(), + RunningNodes = rabbit_mnesia:running_clustered_nodes() -- [Node], + DiscNodes = rabbit_mnesia:all_clustered_disc_nodes(), %% notify other rabbits of this rabbit - case rpc:multicall(Nodes, rabbit_node_monitor, rabbit_running_on, - [Node], ?RABBIT_UP_RPC_TIMEOUT) of + case rpc:multicall(RunningNodes, rabbit_node_monitor, rabbit_running_on, + [Node, IsDiscNode], ?RABBIT_UP_RPC_TIMEOUT) of {_, [] } -> ok; {_, Bad} -> rabbit_log:info("failed to contact nodes ~p~n", [Bad]) end, %% register other active rabbits with this rabbit - [ rabbit_running_on(N) || N <- Nodes ], + [rabbit_running_on(N, ordsets:is_element(N, DiscNodes)) || + N <- RunningNodes], ok. %%-------------------------------------------------------------------- @@ -66,12 +69,12 @@ init([]) -> handle_call(_Request, _From, State) -> {noreply, State}. 
-handle_cast({rabbit_running_on, Node}, Nodes) -> +handle_cast({rabbit_running_on, Node, IsDiscNode}, Nodes) -> case ordsets:is_element(Node, Nodes) of true -> {noreply, Nodes}; false -> rabbit_log:info("rabbit on node ~p up~n", [Node]), erlang:monitor(process, {rabbit, Node}), - ok = handle_live_rabbit(Node), + ok = handle_live_rabbit(Node, IsDiscNode), {noreply, ordsets:add_element(Node, Nodes)} end; handle_cast(_Msg, State) -> @@ -101,6 +104,6 @@ handle_dead_rabbit(Node) -> ok = rabbit_alarm:on_node_down(Node), ok = rabbit_mnesia:on_node_down(Node). -handle_live_rabbit(Node) -> +handle_live_rabbit(Node, IsDiscNode) -> ok = rabbit_alarm:on_node_up(Node), - ok = rabbit_mnesia:on_node_up(Node). + ok = rabbit_mnesia:on_node_up(Node, IsDiscNode). |
