diff options
| author | Matthias Radestock <matthias@rabbitmq.com> | 2010-11-15 22:46:45 +0000 |
|---|---|---|
| committer | Matthias Radestock <matthias@rabbitmq.com> | 2010-11-15 22:46:45 +0000 |
| commit | a5c489b17f85c935a41b618936db03125ab819dd (patch) | |
| tree | 6876b27385f734227bafd04bfda5c7aee55bb380 /src | |
| parent | 19d62579c3cf28e05300ed068c95dd1d7976e885 (diff) | |
| parent | 53f90c8adb1a9cfedc3dd552663d596af58dbe94 (diff) | |
| download | rabbitmq-server-git-a5c489b17f85c935a41b618936db03125ab819dd.tar.gz | |
merge bug23495 into default
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit.erl | 87 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 63 | ||||
| -rw-r--r-- | src/rabbit_connection_sup.erl | 22 | ||||
| -rw-r--r-- | src/rabbit_control.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_heartbeat.erl | 54 | ||||
| -rw-r--r-- | src/rabbit_misc.erl | 53 | ||||
| -rw-r--r-- | src/rabbit_mnesia.erl | 93 | ||||
| -rw-r--r-- | src/rabbit_net.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_reader.erl | 31 | ||||
| -rw-r--r-- | src/rabbit_upgrade.erl | 156 | ||||
| -rw-r--r-- | src/rabbit_upgrade_functions.erl | 51 |
11 files changed, 464 insertions, 154 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl index 8c36a9f0a4..a1dd2c2e40 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -301,49 +301,38 @@ run_boot_step({StepName, Attributes}) -> ok end. -module_attributes(Module) -> - case catch Module:module_info(attributes) of - {'EXIT', {undef, [{Module, module_info, _} | _]}} -> - io:format("WARNING: module ~p not found, so not scanned for boot steps.~n", - [Module]), - []; - {'EXIT', Reason} -> - exit(Reason); - V -> - V - end. - boot_steps() -> - AllApps = [App || {App, _, _} <- application:loaded_applications()], - Modules = lists:usort( - lists:append([Modules - || {ok, Modules} <- - [application:get_key(App, modules) - || App <- AllApps]])), - UnsortedSteps = - lists:flatmap(fun (Module) -> - [{StepName, Attributes} - || {rabbit_boot_step, [{StepName, Attributes}]} - <- module_attributes(Module)] - end, Modules), - sort_boot_steps(UnsortedSteps). + sort_boot_steps(rabbit_misc:all_module_attributes(rabbit_boot_step)). + +vertices(_Module, Steps) -> + [{StepName, {StepName, Atts}} || {StepName, Atts} <- Steps]. + +edges(_Module, Steps) -> + [case Key of + requires -> {StepName, OtherStep}; + enables -> {OtherStep, StepName} + end || {StepName, Atts} <- Steps, + {Key, OtherStep} <- Atts, + Key =:= requires orelse Key =:= enables]. + +graph_build_error({vertex, duplicate, StepName}) -> + boot_error("Duplicate boot step name: ~w~n", [StepName]); +graph_build_error({edge, Reason, From, To}) -> + boot_error( + "Could not add boot step dependency of ~w on ~w:~n~s", + [To, From, + case Reason of + {bad_vertex, V} -> + io_lib:format("Boot step not registered: ~w~n", [V]); + {bad_edge, [First | Rest]} -> + [io_lib:format("Cyclic dependency: ~w", [First]), + [io_lib:format(" depends on ~w", [Next]) || Next <- Rest], + io_lib:format(" depends on ~w~n", [First])] + end]). sort_boot_steps(UnsortedSteps) -> - G = digraph:new([acyclic]), - - %% Add vertices, with duplicate checking. - [case digraph:vertex(G, StepName) of - false -> digraph:add_vertex(G, StepName, Step); - _ -> boot_error("Duplicate boot step name: ~w~n", [StepName]) - end || Step = {StepName, _Attrs} <- UnsortedSteps], - - %% Add edges, detecting cycles and missing vertices. - lists:foreach(fun ({StepName, Attributes}) -> - [add_boot_step_dep(G, StepName, PrecedingStepName) - || {requires, PrecedingStepName} <- Attributes], - [add_boot_step_dep(G, SucceedingStepName, StepName) - || {enables, SucceedingStepName} <- Attributes] - end, UnsortedSteps), + G = rabbit_misc:build_acyclic_graph( + fun vertices/2, fun edges/2, fun graph_build_error/1, UnsortedSteps), %% Use topological sort to find a consistent ordering (if there is %% one, otherwise fail). @@ -365,24 +354,6 @@ sort_boot_steps(UnsortedSteps) -> [MissingFunctions]) end. -add_boot_step_dep(G, RunsSecond, RunsFirst) -> - case digraph:add_edge(G, RunsSecond, RunsFirst) of - {error, Reason} -> - boot_error("Could not add boot step dependency of ~w on ~w:~n~s", - [RunsSecond, RunsFirst, - case Reason of - {bad_vertex, V} -> - io_lib:format("Boot step not registered: ~w~n", [V]); - {bad_edge, [First | Rest]} -> - [io_lib:format("Cyclic dependency: ~w", [First]), - [io_lib:format(" depends on ~w", [Next]) - || Next <- Rest], - io_lib:format(" depends on ~w~n", [First])] - end]); - _ -> - ok - end. - %%--------------------------------------------------------------------------- log_location(Type) -> diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index fe2c975b4e..75f285dffb 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -315,6 +315,25 @@ ch_record(ChPid) -> store_ch_record(C = #cr{ch_pid = ChPid}) -> put({ch, ChPid}, C). +maybe_store_ch_record(C = #cr{consumer_count = ConsumerCount, + acktags = ChAckTags, + txn = Txn, + unsent_message_count = UnsentMessageCount}) -> + case {sets:size(ChAckTags), ConsumerCount, UnsentMessageCount, Txn} of + {0, 0, 0, none} -> ok = erase_ch_record(C), + false; + _ -> store_ch_record(C), + true + end. + +erase_ch_record(#cr{ch_pid = ChPid, + limiter_pid = LimiterPid, + monitor_ref = MonitorRef}) -> + ok = rabbit_limiter:unregister(LimiterPid, self()), + erlang:demonitor(MonitorRef), + erase({ch, ChPid}), + ok. + all_ch_record() -> [C || {{ch, _}, C} <- get()]. @@ -361,7 +380,7 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc, end, NewC = C#cr{unsent_message_count = Count + 1, acktags = ChAckTags1}, - store_ch_record(NewC), + true = maybe_store_ch_record(NewC), {NewActiveConsumers, NewBlockedConsumers} = case ch_record_state_transition(C, NewC) of ok -> {queue:in(QEntry, ActiveConsumersTail), @@ -380,7 +399,7 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc, deliver_msgs_to_consumers(Funs, FunAcc1, State2); %% if IsMsgReady then we've hit the limiter false when IsMsgReady -> - store_ch_record(C#cr{is_limit_active = true}), + true = maybe_store_ch_record(C#cr{is_limit_active = true}), {NewActiveConsumers, NewBlockedConsumers} = move_consumers(ChPid, ActiveConsumers, @@ -479,7 +498,7 @@ possibly_unblock(State, ChPid, Update) -> State; C -> NewC = Update(C), - store_ch_record(NewC), + maybe_store_ch_record(NewC), case ch_record_state_transition(C, NewC) of ok -> State; unblock -> {NewBlockedConsumers, NewActiveConsumers} = @@ -500,10 +519,8 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder}) -> case lookup_ch(DownPid) of not_found -> {ok, State}; - #cr{monitor_ref = MonitorRef, ch_pid = ChPid, txn = Txn, - acktags = ChAckTags} -> - erlang:demonitor(MonitorRef), - erase({ch, ChPid}), + C = #cr{ch_pid = ChPid, txn = Txn, acktags = ChAckTags} -> + ok = erase_ch_record(C), State1 = State#q{ exclusive_consumer = case Holder of {ChPid, _} -> none; @@ -562,7 +579,7 @@ commit_transaction(Txn, From, ChPid, State = #q{backing_queue = BQ, %% by the channel. C = #cr{acktags = ChAckTags} = lookup_ch(ChPid), ChAckTags1 = subtract_acks(ChAckTags, AckTags), - store_ch_record(C#cr{acktags = ChAckTags1, txn = none}), + maybe_store_ch_record(C#cr{acktags = ChAckTags1, txn = none}), State#q{backing_queue_state = BQS1}. rollback_transaction(Txn, ChPid, State = #q{backing_queue = BQ, @@ -772,8 +789,9 @@ handle_call({basic_get, ChPid, NoAck}, _From, {{Message, IsDelivered, AckTag, Remaining}, State2} -> case AckRequired of true -> C = #cr{acktags = ChAckTags} = ch_record(ChPid), - store_ch_record( - C#cr{acktags = sets:add_element(AckTag, ChAckTags)}); + true = maybe_store_ch_record( + C#cr{acktags = sets:add_element(AckTag, + ChAckTags)}); false -> ok end, Msg = {QName, self(), AckTag, IsDelivered, Message}, @@ -791,8 +809,8 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, C = #cr{consumer_count = ConsumerCount} = ch_record(ChPid), Consumer = #consumer{tag = ConsumerTag, ack_required = not NoAck}, - store_ch_record(C#cr{consumer_count = ConsumerCount +1, - limiter_pid = LimiterPid}), + true = maybe_store_ch_record(C#cr{consumer_count = ConsumerCount +1, + limiter_pid = LimiterPid}), ok = case ConsumerCount of 0 -> rabbit_limiter:register(LimiterPid, self()); _ -> ok @@ -826,12 +844,15 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From, not_found -> ok = maybe_send_reply(ChPid, OkMsg), reply(ok, State); - C = #cr{consumer_count = ConsumerCount, limiter_pid = LimiterPid} -> - store_ch_record(C#cr{consumer_count = ConsumerCount - 1}), - case ConsumerCount of - 1 -> ok = rabbit_limiter:unregister(LimiterPid, self()); - _ -> ok - end, + C = #cr{consumer_count = ConsumerCount, + limiter_pid = LimiterPid} -> + C1 = C#cr{consumer_count = ConsumerCount -1}, + maybe_store_ch_record( + case ConsumerCount of + 1 -> ok = rabbit_limiter:unregister(LimiterPid, self()), + C1#cr{limiter_pid = undefined}; + _ -> C1 + end), ok = maybe_send_reply(ChPid, OkMsg), NewState = State#q{exclusive_consumer = cancel_holder(ChPid, @@ -880,7 +901,7 @@ handle_call({requeue, AckTags, ChPid}, From, State) -> noreply(State); C = #cr{acktags = ChAckTags} -> ChAckTags1 = subtract_acks(ChAckTags, AckTags), - store_ch_record(C#cr{acktags = ChAckTags1}), + maybe_store_ch_record(C#cr{acktags = ChAckTags1}), noreply(requeue_and_run(AckTags, State)) end; @@ -904,7 +925,7 @@ handle_cast({ack, Txn, AckTags, ChPid}, {C#cr{acktags = ChAckTags1}, BQ:ack(AckTags, BQS)}; _ -> {C#cr{txn = Txn}, BQ:tx_ack(Txn, AckTags, BQS)} end, - store_ch_record(C1), + maybe_store_ch_record(C1), noreply(State#q{backing_queue_state = BQS1}) end; @@ -915,7 +936,7 @@ handle_cast({reject, AckTags, Requeue, ChPid}, noreply(State); C = #cr{acktags = ChAckTags} -> ChAckTags1 = subtract_acks(ChAckTags, AckTags), - store_ch_record(C#cr{acktags = ChAckTags1}), + maybe_store_ch_record(C#cr{acktags = ChAckTags1}), noreply(case Requeue of true -> requeue_and_run(AckTags, State); false -> BQS1 = BQ:ack(AckTags, BQS), diff --git a/src/rabbit_connection_sup.erl b/src/rabbit_connection_sup.erl index 495baeb240..ff3995b54a 100644 --- a/src/rabbit_connection_sup.erl +++ b/src/rabbit_connection_sup.erl @@ -66,7 +66,8 @@ start_link() -> supervisor2:start_child( SupPid, {reader, {rabbit_reader, start_link, - [ChannelSupSupPid, Collector, start_heartbeat_fun(SupPid)]}, + [ChannelSupSupPid, Collector, + rabbit_heartbeat:start_heartbeat_fun(SupPid)]}, intrinsic, ?MAX_WAIT, worker, [rabbit_reader]}), {ok, SupPid, ReaderPid}. @@ -78,22 +79,3 @@ reader(Pid) -> init([]) -> {ok, {{one_for_all, 0, 1}, []}}. -start_heartbeat_fun(SupPid) -> - fun (_Sock, 0) -> - none; - (Sock, TimeoutSec) -> - Parent = self(), - {ok, Sender} = - supervisor2:start_child( - SupPid, {heartbeat_sender, - {rabbit_heartbeat, start_heartbeat_sender, - [Parent, Sock, TimeoutSec]}, - transient, ?MAX_WAIT, worker, [rabbit_heartbeat]}), - {ok, Receiver} = - supervisor2:start_child( - SupPid, {heartbeat_receiver, - {rabbit_heartbeat, start_heartbeat_receiver, - [Parent, Sock, TimeoutSec]}, - transient, ?MAX_WAIT, worker, [rabbit_heartbeat]}), - {Sender, Receiver} - end. diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index 6b21274529..6c0a727bdf 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -346,8 +346,6 @@ format_info_item([{TableEntryKey, TableEntryType, _TableEntryValue} | _] = Value) when is_binary(TableEntryKey) andalso is_atom(TableEntryType) -> io_lib:format("~1000000000000p", [prettify_amqp_table(Value)]); -format_info_item([C|_] = Value) when is_number(C), C >= 32, C =< 255 -> - Value; format_info_item(Value) -> io_lib:format("~w", [Value]). diff --git a/src/rabbit_heartbeat.erl b/src/rabbit_heartbeat.erl index a9945af1d4..589bf7cc9d 100644 --- a/src/rabbit_heartbeat.erl +++ b/src/rabbit_heartbeat.erl @@ -32,7 +32,7 @@ -module(rabbit_heartbeat). -export([start_heartbeat_sender/3, start_heartbeat_receiver/3, - pause_monitor/1, resume_monitor/1]). + start_heartbeat_fun/1, pause_monitor/1, resume_monitor/1]). -include("rabbit.hrl"). @@ -41,16 +41,28 @@ -ifdef(use_specs). -export_type([heartbeaters/0]). +-export_type([start_heartbeat_fun/0]). --type(heartbeaters() :: rabbit_types:maybe({pid(), pid()})). +-type(heartbeaters() :: {rabbit_types:maybe(pid()), rabbit_types:maybe(pid())}). + +-type(heartbeat_callback() :: fun (() -> any())). + +-type(start_heartbeat_fun() :: + fun((rabbit_net:socket(), non_neg_integer(), heartbeat_callback(), + non_neg_integer(), heartbeat_callback()) -> + no_return())). -spec(start_heartbeat_sender/3 :: - (pid(), rabbit_net:socket(), non_neg_integer()) -> + (rabbit_net:socket(), non_neg_integer(), heartbeat_callback()) -> rabbit_types:ok(pid())). -spec(start_heartbeat_receiver/3 :: - (pid(), rabbit_net:socket(), non_neg_integer()) -> + (rabbit_net:socket(), non_neg_integer(), heartbeat_callback()) -> rabbit_types:ok(pid())). +-spec(start_heartbeat_fun/1 :: + (pid()) -> start_heartbeat_fun()). + + -spec(pause_monitor/1 :: (heartbeaters()) -> 'ok'). -spec(resume_monitor/1 :: (heartbeaters()) -> 'ok'). @@ -58,40 +70,60 @@ %%---------------------------------------------------------------------------- -start_heartbeat_sender(_Parent, Sock, TimeoutSec) -> +start_heartbeat_sender(Sock, TimeoutSec, SendFun) -> %% the 'div 2' is there so that we don't end up waiting for nearly %% 2 * TimeoutSec before sending a heartbeat in the boundary case %% where the last message was sent just after a heartbeat. heartbeater( {Sock, TimeoutSec * 1000 div 2, send_oct, 0, fun () -> - catch rabbit_net:send( - Sock, rabbit_binary_generator:build_heartbeat_frame()), + SendFun(), continue end}). -start_heartbeat_receiver(Parent, Sock, TimeoutSec) -> +start_heartbeat_receiver(Sock, TimeoutSec, ReceiveFun) -> %% we check for incoming data every interval, and time out after %% two checks with no change. As a result we will time out between %% 2 and 3 intervals after the last data has been received. heartbeater({Sock, TimeoutSec * 1000, recv_oct, 1, fun () -> - Parent ! timeout, + ReceiveFun(), stop end}). -pause_monitor(none) -> +start_heartbeat_fun(SupPid) -> + fun (Sock, SendTimeoutSec, SendFun, ReceiveTimeoutSec, ReceiveFun) -> + {ok, Sender} = + start_heartbeater(SendTimeoutSec, SupPid, Sock, + SendFun, heartbeat_sender, + start_heartbeat_sender), + {ok, Receiver} = + start_heartbeater(ReceiveTimeoutSec, SupPid, Sock, + ReceiveFun, heartbeat_receiver, + start_heartbeat_receiver), + {Sender, Receiver} + end. + +pause_monitor({_Sender, none}) -> ok; pause_monitor({_Sender, Receiver}) -> Receiver ! pause, ok. -resume_monitor(none) -> +resume_monitor({_Sender, none}) -> ok; resume_monitor({_Sender, Receiver}) -> Receiver ! resume, ok. %%---------------------------------------------------------------------------- +start_heartbeater(0, _SupPid, _Sock, _TimeoutFun, _Name, _Callback) -> + {ok, none}; +start_heartbeater(TimeoutSec, SupPid, Sock, TimeoutFun, Name, Callback) -> + supervisor2:start_child( + SupPid, {Name, + {rabbit_heartbeat, Callback, + [Sock, TimeoutSec, TimeoutFun]}, + transient, ?MAX_WAIT, worker, [rabbit_heartbeat]}). heartbeater(Params) -> {ok, proc_lib:spawn_link(fun () -> heartbeater(Params, {0, 0}) end)}. diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index d5c8bd49cb..0522afdc8d 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -64,6 +64,7 @@ -export([recursive_delete/1, dict_cons/3, orddict_cons/3, unlink_and_capture_exit/1]). -export([get_options/2]). +-export([all_module_attributes/1, build_acyclic_graph/4]). -export([now_ms/0]). -import(mnesia). @@ -83,6 +84,12 @@ -type(optdef() :: {flag, string()} | {option, string(), any()}). -type(channel_or_connection_exit() :: rabbit_types:channel_exit() | rabbit_types:connection_exit()). +-type(digraph_label() :: term()). +-type(graph_vertex_fun() :: + fun ((atom(), [term()]) -> {digraph:vertex(), digraph_label()})). +-type(graph_edge_fun() :: + fun ((atom(), [term()]) -> {digraph:vertex(), digraph:vertex()})). +-type(graph_error_fun() :: fun ((any()) -> any() | no_return())). -spec(method_record_type/1 :: (rabbit_framing:amqp_method_record()) -> rabbit_framing:amqp_method_name()). @@ -183,6 +190,10 @@ -spec(unlink_and_capture_exit/1 :: (pid()) -> 'ok'). -spec(get_options/2 :: ([optdef()], [string()]) -> {[string()], [{string(), any()}]}). +-spec(all_module_attributes/1 :: (atom()) -> [{atom(), [term()]}]). +-spec(build_acyclic_graph/4 :: (graph_vertex_fun(), graph_edge_fun(), + graph_error_fun(), [{atom(), [term()]}]) -> + digraph()). -spec(now_ms/0 :: () -> non_neg_integer()). -endif. @@ -725,3 +736,45 @@ get_flag(_, []) -> now_ms() -> timer:now_diff(now(), {0,0,0}) div 1000. + +module_attributes(Module) -> + case catch Module:module_info(attributes) of + {'EXIT', {undef, [{Module, module_info, _} | _]}} -> + io:format("WARNING: module ~p not found, so not scanned for boot steps.~n", + [Module]), + []; + {'EXIT', Reason} -> + exit(Reason); + V -> + V + end. + +all_module_attributes(Name) -> + Modules = + lists:usort( + lists:append( + [Modules || {App, _, _} <- application:loaded_applications(), + {ok, Modules} <- [application:get_key(App, modules)]])), + lists:foldl( + fun (Module, Acc) -> + case lists:append([Atts || {N, Atts} <- module_attributes(Module), + N =:= Name]) of + [] -> Acc; + Atts -> [{Module, Atts} | Acc] + end + end, [], Modules). + + +build_acyclic_graph(VertexFun, EdgeFun, ErrorFun, Graph) -> + G = digraph:new([acyclic]), + [ case digraph:vertex(G, Vertex) of + false -> digraph:add_vertex(G, Vertex, Label); + _ -> ErrorFun({vertex, duplicate, Vertex}) + end || {Module, Atts} <- Graph, + {Vertex, Label} <- VertexFun(Module, Atts) ], + [ case digraph:add_edge(G, From, To) of + {error, E} -> ErrorFun({edge, E, From, To}); + _ -> ok + end || {Module, Atts} <- Graph, + {From, To} <- EdgeFun(Module, Atts) ], + G. diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index 8de2f0d664..9d17226960 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -44,9 +44,6 @@ -include("rabbit.hrl"). --define(SCHEMA_VERSION_SET, []). --define(SCHEMA_VERSION_FILENAME, "schema_version"). - %%---------------------------------------------------------------------------- -ifdef(use_specs). @@ -94,9 +91,6 @@ init() -> ok = ensure_mnesia_running(), ok = ensure_mnesia_dir(), ok = init_db(read_cluster_nodes_config(), true), - ok = rabbit_misc:write_term_file(filename:join( - dir(), ?SCHEMA_VERSION_FILENAME), - [?SCHEMA_VERSION_SET]), ok. is_db_empty() -> @@ -256,12 +250,12 @@ ensure_mnesia_dir() -> ensure_mnesia_running() -> case mnesia:system_info(is_running) of yes -> ok; - no -> throw({error, mnesia_not_running}) + no -> throw({error, mnesia_not_running}) end. ensure_mnesia_not_running() -> case mnesia:system_info(is_running) of - no -> ok; + no -> ok; yes -> throw({error, mnesia_unexpectedly_running}) end. @@ -378,28 +372,31 @@ init_db(ClusterNodes, Force) -> end; _ -> ok end, - case Nodes of - [] -> - case mnesia:system_info(use_dir) of - true -> - case check_schema_integrity() of - ok -> - ok; - {error, Reason} -> - %% NB: we cannot use rabbit_log here since - %% it may not have been started yet - error_logger:warning_msg( - "schema integrity check failed: ~p~n" - "moving database to backup location " - "and recreating schema from scratch~n", - [Reason]), - ok = move_db(), - ok = create_schema() - end; - false -> - ok = create_schema() + case {Nodes, mnesia:system_info(use_dir), + mnesia:system_info(db_nodes)} of + {[], true, [_]} -> + %% True single disc node, attempt upgrade + wait_for_tables(), + case rabbit_upgrade:maybe_upgrade() of + ok -> + schema_ok_or_exit(); + version_not_available -> + schema_ok_or_move() end; - [_|_] -> + {[], true, _} -> + %% "Master" (i.e. without config) disc node in cluster, + %% verify schema + wait_for_tables(), + version_ok_or_exit(rabbit_upgrade:read_version()), + schema_ok_or_exit(); + {[], false, _} -> + %% First RAM node in cluster, start from scratch + ok = create_schema(); + {[AnotherNode|_], _, _} -> + %% Subsequent node in cluster, catch up + version_ok_or_exit(rabbit_upgrade:read_version()), + version_ok_or_exit( + rpc:call(AnotherNode, rabbit_upgrade, read_version, [])), IsDiskNode = ClusterNodes == [] orelse lists:member(node(), ClusterNodes), ok = wait_for_replicated_tables(), @@ -408,7 +405,7 @@ init_db(ClusterNodes, Force) -> true -> disc; false -> ram end), - ok = ensure_schema_integrity() + schema_ok_or_exit() end; {error, Reason} -> %% one reason we may end up here is if we try to join @@ -418,6 +415,39 @@ init_db(ClusterNodes, Force) -> ClusterNodes, Reason}}) end. +schema_ok_or_move() -> + case check_schema_integrity() of + ok -> + ok; + {error, Reason} -> + %% NB: we cannot use rabbit_log here since it may not have been + %% started yet + error_logger:warning_msg("schema integrity check failed: ~p~n" + "moving database to backup location " + "and recreating schema from scratch~n", + [Reason]), + ok = move_db(), + ok = create_schema() + end. + +version_ok_or_exit({ok, DiscVersion}) -> + case rabbit_upgrade:desired_version() of + DiscVersion -> + ok; + DesiredVersion -> + exit({schema_mismatch, DesiredVersion, DiscVersion}) + end; +version_ok_or_exit({error, _}) -> + ok = rabbit_upgrade:write_version(). + +schema_ok_or_exit() -> + case check_schema_integrity() of + ok -> + ok; + {error, Reason} -> + exit({schema_invalid, Reason}) + end. + create_schema() -> mnesia:stop(), rabbit_misc:ensure_ok(mnesia:create_schema([node()]), @@ -426,7 +456,8 @@ create_schema() -> cannot_start_mnesia), ok = create_tables(), ok = ensure_schema_integrity(), - ok = wait_for_tables(). + ok = wait_for_tables(), + ok = rabbit_upgrade:write_version(). move_db() -> mnesia:stop(), diff --git a/src/rabbit_net.erl b/src/rabbit_net.erl index 53d0d5cbf3..0940dce2f4 100644 --- a/src/rabbit_net.erl +++ b/src/rabbit_net.erl @@ -34,7 +34,7 @@ -export([async_recv/3, close/1, controlling_process/2, getstat/2, peername/1, peercert/1, port_command/2, - send/2, sockname/1]). + send/2, sockname/1, is_ssl/1]). %%--------------------------------------------------------------------------- @@ -65,6 +65,7 @@ -spec(sockname/1 :: (socket()) -> ok_val_or_error({inet:ip_address(), rabbit_networking:ip_port()})). +-spec(is_ssl/1 :: (socket()) -> boolean()). -spec(getstat/2 :: (socket(), [stat_option()]) -> ok_val_or_error([{stat_option(), integer()}])). @@ -133,3 +134,6 @@ sockname(Sock) when ?IS_SSL(Sock) -> ssl:sockname(Sock#ssl_socket.ssl); sockname(Sock) when is_port(Sock) -> inet:sockname(Sock). + +is_ssl(Sock) -> + ?IS_SSL(Sock). diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 7f7bd9d847..12730ccfeb 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -65,7 +65,7 @@ -define(STATISTICS_KEYS, [pid, recv_oct, recv_cnt, send_oct, send_cnt, send_pend, state, channels]). --define(CREATION_EVENT_KEYS, [pid, address, port, peer_address, peer_port, +-define(CREATION_EVENT_KEYS, [pid, address, port, peer_address, peer_port, ssl, peer_cert_subject, peer_cert_issuer, peer_cert_validity, protocol, user, vhost, timeout, frame_max, @@ -162,11 +162,7 @@ -ifdef(use_specs). --type(start_heartbeat_fun() :: - fun ((rabbit_net:socket(), non_neg_integer()) -> - rabbit_heartbeat:heartbeaters())). - --spec(start_link/3 :: (pid(), pid(), start_heartbeat_fun()) -> +-spec(start_link/3 :: (pid(), pid(), rabbit_heartbeat:start_heartbeat_fun()) -> rabbit_types:ok(pid())). -spec(info_keys/0 :: () -> rabbit_types:info_keys()). -spec(info/1 :: (pid()) -> rabbit_types:infos()). @@ -177,9 +173,10 @@ -spec(server_properties/0 :: () -> rabbit_framing:amqp_table()). %% These specs only exists to add no_return() to keep dialyzer happy --spec(init/4 :: (pid(), pid(), pid(), start_heartbeat_fun()) -> no_return()). +-spec(init/4 :: (pid(), pid(), pid(), rabbit_heartbeat:start_heartbeat_fun()) + -> no_return()). -spec(start_connection/7 :: - (pid(), pid(), pid(), start_heartbeat_fun(), any(), + (pid(), pid(), pid(), rabbit_heartbeat:start_heartbeat_fun(), any(), rabbit_net:socket(), fun ((rabbit_net:socket()) -> rabbit_types:ok_or_error2( @@ -771,7 +768,19 @@ handle_method0(#'connection.tune_ok'{frame_max = FrameMax, not_allowed, "frame_max=~w > ~w max size", [FrameMax, ?FRAME_MAX]); true -> - Heartbeater = SHF(Sock, ClientHeartbeat), + SendFun = + fun() -> + Frame = rabbit_binary_generator:build_heartbeat_frame(), + catch rabbit_net:send(Sock, Frame) + end, + + Parent = self(), + ReceiveFun = + fun() -> + Parent ! timeout + end, + Heartbeater = SHF(Sock, ClientHeartbeat, SendFun, + ClientHeartbeat, ReceiveFun), State#v1{connection_state = opening, connection = Connection#connection{ timeout_sec = ClientHeartbeat, @@ -839,6 +848,8 @@ i(peer_address, #v1{sock = Sock}) -> socket_info(fun rabbit_net:peername/1, fun ({A, _}) -> A end, Sock); i(peer_port, #v1{sock = Sock}) -> socket_info(fun rabbit_net:peername/1, fun ({_, P}) -> P end, Sock); +i(ssl, #v1{sock = Sock}) -> + rabbit_net:is_ssl(Sock); i(peer_cert_issuer, #v1{sock = Sock}) -> cert_info(fun rabbit_ssl:peer_cert_issuer/1, Sock); i(peer_cert_subject, #v1{sock = Sock}) -> @@ -889,7 +900,7 @@ cert_info(F, Sock) -> case rabbit_net:peercert(Sock) of nossl -> ''; {error, no_peercert} -> ''; - {ok, Cert} -> F(Cert) + {ok, Cert} -> list_to_binary(F(Cert)) end. %%-------------------------------------------------------------------------- diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl new file mode 100644 index 0000000000..0071a08ad7 --- /dev/null +++ b/src/rabbit_upgrade.erl @@ -0,0 +1,156 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are Rabbit Technologies Ltd. +%% +%% Copyright (C) 2010 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_upgrade). + +-export([maybe_upgrade/0, read_version/0, write_version/0, desired_version/0]). + +-include("rabbit.hrl"). + +-define(VERSION_FILENAME, "schema_version"). +-define(LOCK_FILENAME, "schema_upgrade_lock"). + +%% ------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec(maybe_upgrade/0 :: () -> 'ok' | 'version_not_available'). +-spec(read_version/0 :: + () -> {'ok', [any()]} | rabbit_types:error(any())). +-spec(write_version/0 :: () -> 'ok'). +-spec(desired_version/0 :: () -> [atom()]). + +-endif. + +%% ------------------------------------------------------------------- + +%% Try to upgrade the schema. If no information on the existing schema +%% could be found, do nothing. rabbit_mnesia:check_schema_integrity() +%% will catch the problem. +maybe_upgrade() -> + case read_version() of + {ok, CurrentHeads} -> + G = load_graph(), + case unknown_heads(CurrentHeads, G) of + [] -> + case upgrades_to_apply(CurrentHeads, G) of + [] -> ok; + Upgrades -> apply_upgrades(Upgrades) + end; + Unknown -> + exit({future_upgrades_found, Unknown}) + end, + true = digraph:delete(G), + ok; + {error, enoent} -> + version_not_available + end. + +read_version() -> + case rabbit_misc:read_term_file(schema_filename()) of + {ok, [Heads]} -> {ok, Heads}; + {error, E} -> {error, E} + end. + +write_version() -> + ok = rabbit_misc:write_term_file(schema_filename(), [desired_version()]), + ok. + +desired_version() -> + G = load_graph(), + Version = heads(G), + true = digraph:delete(G), + Version. + +%% ------------------------------------------------------------------- + +load_graph() -> + Upgrades = rabbit_misc:all_module_attributes(rabbit_upgrade), + rabbit_misc:build_acyclic_graph( + fun vertices/2, fun edges/2, fun graph_build_error/1, Upgrades). + +vertices(Module, Steps) -> + [{StepName, {Module, StepName}} || {StepName, _Reqs} <- Steps]. + +edges(_Module, Steps) -> + [{Require, StepName} || {StepName, Requires} <- Steps, Require <- Requires]. + +graph_build_error({vertex, duplicate, StepName}) -> + exit({duplicate_upgrade, StepName}); +graph_build_error({edge, E, From, To}) -> + exit({E, From, To}). + +unknown_heads(Heads, G) -> + [H || H <- Heads, digraph:vertex(G, H) =:= false]. + +upgrades_to_apply(Heads, G) -> + %% Take all the vertices which can reach the known heads. That's + %% everything we've already applied. Subtract that from all + %% vertices: that's what we have to apply. + Unsorted = sets:to_list( + sets:subtract( + sets:from_list(digraph:vertices(G)), + sets:from_list(digraph_utils:reaching(Heads, G)))), + %% Form a subgraph from that list and find a topological ordering + %% so we can invoke them in order. + [element(2, digraph:vertex(G, StepName)) + || StepName <- digraph_utils:topsort(digraph_utils:subgraph(G, Unsorted))]. + +heads(G) -> + lists:sort([V || V <- digraph:vertices(G), digraph:out_degree(G, V) =:= 0]). + +%% ------------------------------------------------------------------- + +apply_upgrades(Upgrades) -> + LockFile = lock_filename(), + case file:open(LockFile, [write, exclusive]) of + {ok, Lock} -> + ok = file:close(Lock), + info("Upgrades: ~w to apply~n", [length(Upgrades)]), + [apply_upgrade(Upgrade) || Upgrade <- Upgrades], + info("Upgrades: All applied~n", []), + ok = write_version(), + ok = file:delete(LockFile); + {error, eexist} -> + exit(previous_upgrade_failed); + {error, _} = Error -> + exit(Error) + end. + +apply_upgrade({M, F}) -> + info("Upgrades: Applying ~w:~w~n", [M, F]), + ok = apply(M, F, []). + +%% ------------------------------------------------------------------- + +schema_filename() -> + filename:join(dir(), ?VERSION_FILENAME). + +lock_filename() -> + filename:join(dir(), ?LOCK_FILENAME). + +%% NB: we cannot use rabbit_log here since it may not have been +%% started yet +info(Msg, Args) -> + error_logger:info_msg(Msg, Args). + +dir() -> + rabbit_mnesia:dir(). diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl new file mode 100644 index 0000000000..59b8705df5 --- /dev/null +++ b/src/rabbit_upgrade_functions.erl @@ -0,0 +1,51 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are Rabbit Technologies Ltd. +%% +%% Copyright (C) 2010 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% +-module(rabbit_upgrade_functions). + +-include("rabbit.hrl"). + +-compile([export_all]). + +-rabbit_upgrade({remove_user_scope, []}). + +%% ------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec(remove_user_scope/0 :: () -> 'ok'). + +-endif. + +%%-------------------------------------------------------------------- + +remove_user_scope() -> + {atomic, ok} = mnesia:transform_table( + rabbit_user_permission, + fun (Perm = #user_permission{ + permission = {permission, + _Scope, Conf, Write, Read}}) -> + Perm#user_permission{ + permission = #permission{configure = Conf, + write = Write, + read = Read}} + end, + record_info(fields, user_permission)), + ok. |
