diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/gm_speed_test.erl | 82 | ||||
| -rw-r--r-- | src/rabbit.erl | 5 | ||||
| -rw-r--r-- | src/rabbit_backing_queue.erl | 38 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 1 | ||||
| -rw-r--r-- | src/rabbit_error_logger.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_exchange.erl | 30 | ||||
| -rw-r--r-- | src/rabbit_misc.erl | 1 | ||||
| -rw-r--r-- | src/rabbit_mnesia.erl | 127 | ||||
| -rw-r--r-- | src/rabbit_prelaunch.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_queue_index.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_upgrade.erl | 331 | ||||
| -rw-r--r-- | src/rabbit_upgrade_functions.erl | 12 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_version.erl | 172 |
14 files changed, 628 insertions, 183 deletions
diff --git a/src/gm_speed_test.erl b/src/gm_speed_test.erl new file mode 100644 index 0000000000..defb0f29b8 --- /dev/null +++ b/src/gm_speed_test.erl @@ -0,0 +1,82 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is VMware, Inc. +%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% + +-module(gm_speed_test). + +-export([test/3]). +-export([joined/2, members_changed/3, handle_msg/3, terminate/2]). +-export([wile_e_coyote/2]). + +-behaviour(gm). + +-include("gm_specs.hrl"). + +%% callbacks + +joined(Owner, _Members) -> + Owner ! joined, + ok. + +members_changed(_Owner, _Births, _Deaths) -> + ok. + +handle_msg(Owner, _From, ping) -> + Owner ! ping, + ok. + +terminate(Owner, _Reason) -> + Owner ! terminated, + ok. + +%% other + +wile_e_coyote(Time, WriteUnit) -> + {ok, Pid} = gm:start_link(?MODULE, ?MODULE, self()), + receive joined -> ok end, + timer:sleep(1000), %% wait for all to join + timer:send_after(Time, stop), + Start = now(), + {Sent, Received} = loop(Pid, WriteUnit, 0, 0), + End = now(), + ok = gm:leave(Pid), + receive terminated -> ok end, + Elapsed = timer:now_diff(End, Start) / 1000000, + io:format("Sending rate: ~p msgs/sec~nReceiving rate: ~p msgs/sec~n~n", + [Sent/Elapsed, Received/Elapsed]), + ok. + +loop(Pid, WriteUnit, Sent, Received) -> + case read(Received) of + {stop, Received1} -> {Sent, Received1}; + {ok, Received1} -> ok = write(Pid, WriteUnit), + loop(Pid, WriteUnit, Sent + WriteUnit, Received1) + end. + +read(Count) -> + receive + ping -> read(Count + 1); + stop -> {stop, Count} + after 5 -> + {ok, Count} + end. + +write(_Pid, 0) -> ok; +write(Pid, N) -> ok = gm:broadcast(Pid, ping), + write(Pid, N - 1). + +test(Time, WriteUnit, Nodes) -> + ok = gm:create_tables(), + [spawn(Node, ?MODULE, wile_e_coyote, [Time, WriteUnit]) || Node <- Nodes]. diff --git a/src/rabbit.erl b/src/rabbit.erl index 0548d6bf09..82202cbd41 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -191,7 +191,8 @@ %%---------------------------------------------------------------------------- prepare() -> - ok = ensure_working_log_handlers(). + ok = ensure_working_log_handlers(), + ok = rabbit_upgrade:maybe_upgrade_mnesia(). start() -> try @@ -232,6 +233,7 @@ rotate_logs(BinarySuffix) -> start(normal, []) -> case erts_version_check() of ok -> + ok = rabbit_mnesia:delete_previously_running_nodes(), {ok, SupPid} = rabbit_sup:start_link(), true = register(rabbit, self()), @@ -244,6 +246,7 @@ start(normal, []) -> end. stop(_State) -> + ok = rabbit_mnesia:record_running_nodes(), terminated_ok = error_logger:delete_report_handler(rabbit_error_logger), ok = rabbit_alarm:stop(), ok = case rabbit_mnesia:is_clustered() of diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index ce6143dd3d..7087be9137 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -38,13 +38,12 @@ behaviour_info(callbacks) -> %% 1. the amqqueue record %% 2. a boolean indicating whether the queue is an existing queue %% that should be recovered - %% 3. an asynchronous callback which accepts a function from - %% state to state and invokes it with the current backing - %% queue state. This is useful for handling events, e.g. when - %% the backing queue does not have its own process to receive - %% such events, or when the processing of an event results in - %% a state transition the queue logic needs to know about - %% (such as messages getting confirmed). + %% 3. an asynchronous callback which accepts a function of type + %% backing-queue-state to backing-queue-state. This callback + %% function can be safely invoked from any process, which + %% makes it useful for passing messages back into the backing + %% queue, especially as the backing queue does not have + %% control of its own mailbox. %% 4. a synchronous callback. Same as the asynchronous callback %% but waits for completion and returns 'error' on error. {init, 4}, @@ -70,6 +69,31 @@ behaviour_info(callbacks) -> %% Return ids of messages which have been confirmed since %% the last invocation of this function (or initialisation). + %% + %% Message ids should only appear in the result of + %% drain_confirmed under the following circumstances: + %% + %% 1. The message appears in a call to publish_delivered/4 and + %% the first argument (ack_required) is false; or + %% 2. The message is fetched from the queue with fetch/2 and the + %% first argument (ack_required) is false; or + %% 3. The message is acked (ack/2 is called for the message); or + %% 4. The message is fully fsync'd to disk in such a way that the + %% recovery of the message is guaranteed in the event of a + %% crash of this rabbit node (excluding hardware failure). + %% + %% In addition to the above conditions, a message id may only + %% appear in the result of drain_confirmed if + %% #message_properties.needs_confirming = true when the msg was + %% published (through whichever means) to the backing queue. + %% + %% It is legal for the same message id to appear in the results + %% of multiple calls to drain_confirmed, which means that the + %% backing queue is not required to keep track of which messages + %% it has already confirmed. The confirm will be issued to the + %% publisher the first time the message id appears in the result + %% of drain_confirmed. All subsequent appearances of that message + %% id will be ignored. {drain_confirmed, 1}, %% Drop messages from the head of the queue while the supplied diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 0c12614cc6..5099bf3fbe 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -156,7 +156,6 @@ ready_for_close(Pid) -> init([Channel, ReaderPid, WriterPid, ConnPid, Protocol, User, VHost, Capabilities, CollectorPid, StartLimiterFun]) -> - process_flag(trap_exit, true), ok = pg_local:join(rabbit_channels, self()), StatsTimer = rabbit_event:init_stats_timer(), State = #ch{state = starting, diff --git a/src/rabbit_error_logger.erl b/src/rabbit_error_logger.erl index 0120f0d6d4..3fb0817a0c 100644 --- a/src/rabbit_error_logger.erl +++ b/src/rabbit_error_logger.erl @@ -67,8 +67,12 @@ publish(_Other, _Format, _Data, _State) -> ok. publish1(RoutingKey, Format, Data, LogExch) -> + %% 0-9-1 says the timestamp is a "64 bit POSIX timestamp". That's + %% second resolution, not millisecond. + Timestamp = rabbit_misc:now_ms() div 1000, {ok, _RoutingRes, _DeliveredQPids} = rabbit_basic:publish(LogExch, RoutingKey, false, false, none, - #'P_basic'{content_type = <<"text/plain">>}, + #'P_basic'{content_type = <<"text/plain">>, + timestamp = Timestamp}, list_to_binary(io_lib:format(Format, Data))), ok. diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index a463e57067..9d9b07aff4 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -18,12 +18,13 @@ -include("rabbit.hrl"). -include("rabbit_framing.hrl"). --export([recover/0, declare/6, lookup/1, lookup_or_die/1, list/1, info_keys/0, - info/1, info/2, info_all/1, info_all/2, publish/2, delete/2]). --export([callback/3]). +-export([recover/0, callback/3, declare/6, + assert_equivalence/6, assert_args_equivalence/2, check_type/1, + lookup/1, lookup_or_die/1, list/1, + info_keys/0, info/1, info/2, info_all/1, info_all/2, + publish/2, delete/2]). %% this must be run inside a mnesia tx -export([maybe_auto_delete/1]). --export([assert_equivalence/6, assert_args_equivalence/2, check_type/1]). %%---------------------------------------------------------------------------- @@ -33,8 +34,10 @@ -type(name() :: rabbit_types:r('exchange')). -type(type() :: atom()). +-type(fun_name() :: atom()). -spec(recover/0 :: () -> 'ok'). +-spec(callback/3:: (rabbit_types:exchange(), fun_name(), [any()]) -> 'ok'). -spec(declare/6 :: (name(), type(), boolean(), boolean(), boolean(), rabbit_framing:amqp_table()) @@ -72,7 +75,6 @@ -spec(maybe_auto_delete/1:: (rabbit_types:exchange()) -> 'not_deleted' | {'deleted', rabbit_binding:deletions()}). --spec(callback/3:: (rabbit_types:exchange(), atom(), [any()]) -> 'ok'). -endif. @@ -101,6 +103,9 @@ recover_with_bindings(Bs, [X = #exchange{type = Type} | Xs], Bindings) -> recover_with_bindings([], [], []) -> ok. +callback(#exchange{type = XType}, Fun, Args) -> + apply(type_to_module(XType), Fun, Args). + declare(XName, Type, Durable, AutoDelete, Internal, Args) -> X = #exchange{name = XName, type = Type, @@ -126,7 +131,7 @@ declare(XName, Type, Durable, AutoDelete, Internal, Args) -> end end, fun ({new, Exchange}, Tx) -> - callback(Exchange, create, [Tx, Exchange]), + ok = (type_to_module(Type)):create(Tx, Exchange), rabbit_event:notify_if(not Tx, exchange_created, info(Exchange)), Exchange; ({existing, Exchange}, _Tx) -> @@ -135,11 +140,6 @@ declare(XName, Type, Durable, AutoDelete, Internal, Args) -> Err end). -%% Used with atoms from records; e.g., the type is expected to exist. -type_to_module(T) -> - {ok, Module} = rabbit_registry:lookup_module(exchange, T), - Module. - %% Used with binaries sent over the wire; the type may not exist. check_type(TypeBin) -> case rabbit_registry:binary_to_type(TypeBin) of @@ -294,9 +294,6 @@ maybe_auto_delete(#exchange{auto_delete = true} = X) -> {deleted, X, [], Deletions} -> {deleted, Deletions} end. -callback(#exchange{type = XType}, Fun, Args) -> - apply(type_to_module(XType), Fun, Args). - conditional_delete(X = #exchange{name = XName}) -> case rabbit_binding:has_for_source(XName) of false -> unconditional_delete(X); @@ -308,3 +305,8 @@ unconditional_delete(X = #exchange{name = XName}) -> ok = mnesia:delete({rabbit_exchange, XName}), Bindings = rabbit_binding:remove_for_source(XName), {deleted, X, Bindings, rabbit_binding:remove_for_destination(XName)}. + +%% Used with atoms from records; e.g., the type is expected to exist. +type_to_module(T) -> + {ok, Module} = rabbit_registry:lookup_module(exchange, T), + Module. diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index e79a58a1b6..2e9563cf3c 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -871,4 +871,3 @@ is_process_alive(Pid) -> true -> true; _ -> false end. - diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index 884db799f7..f2f31ef374 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -18,9 +18,12 @@ -module(rabbit_mnesia). -export([ensure_mnesia_dir/0, dir/0, status/0, init/0, is_db_empty/0, - cluster/1, force_cluster/1, reset/0, force_reset/0, + cluster/1, force_cluster/1, reset/0, force_reset/0, init_db/3, is_clustered/0, running_clustered_nodes/0, all_clustered_nodes/0, - empty_ram_only_tables/0, copy_db/1, wait_for_tables/1]). + empty_ram_only_tables/0, copy_db/1, wait_for_tables/1, + create_cluster_nodes_config/1, read_cluster_nodes_config/0, + record_running_nodes/0, read_previously_running_nodes/0, + delete_previously_running_nodes/0, running_nodes_filename/0]). -export([table_names/0]). @@ -42,6 +45,7 @@ -spec(dir/0 :: () -> file:filename()). -spec(ensure_mnesia_dir/0 :: () -> 'ok'). -spec(init/0 :: () -> 'ok'). +-spec(init_db/3 :: ([node()], boolean(), rabbit_misc:thunk('ok')) -> 'ok'). -spec(is_db_empty/0 :: () -> boolean()). -spec(cluster/1 :: ([node()]) -> 'ok'). -spec(force_cluster/1 :: ([node()]) -> 'ok'). @@ -55,6 +59,12 @@ -spec(create_tables/0 :: () -> 'ok'). -spec(copy_db/1 :: (file:filename()) -> rabbit_types:ok_or_error(any())). -spec(wait_for_tables/1 :: ([atom()]) -> 'ok'). +-spec(create_cluster_nodes_config/1 :: ([node()]) -> 'ok'). +-spec(read_cluster_nodes_config/0 :: () -> [node()]). +-spec(record_running_nodes/0 :: () -> 'ok'). +-spec(read_previously_running_nodes/0 :: () -> [node()]). +-spec(delete_previously_running_nodes/0 :: () -> 'ok'). +-spec(running_nodes_filename/0 :: () -> file:filename()). -endif. @@ -78,9 +88,10 @@ status() -> {running_nodes, running_clustered_nodes()}]. init() -> - ok = ensure_mnesia_running(), - ok = ensure_mnesia_dir(), - ok = init_db(read_cluster_nodes_config(), true), + ensure_mnesia_running(), + ensure_mnesia_dir(), + ok = init_db(read_cluster_nodes_config(), true, + fun maybe_upgrade_local_or_record_desired/0), ok. is_db_empty() -> @@ -98,11 +109,12 @@ force_cluster(ClusterNodes) -> %% node. If Force is false, only connections to online nodes are %% allowed. cluster(ClusterNodes, Force) -> - ok = ensure_mnesia_not_running(), - ok = ensure_mnesia_dir(), + ensure_mnesia_not_running(), + ensure_mnesia_dir(), rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia), try - ok = init_db(ClusterNodes, Force), + ok = init_db(ClusterNodes, Force, + fun maybe_upgrade_local_or_record_desired/0), ok = create_cluster_nodes_config(ClusterNodes) after mnesia:stop() @@ -368,11 +380,40 @@ delete_cluster_nodes_config() -> FileName, Reason}}) end. +running_nodes_filename() -> + filename:join(dir(), "nodes_running_at_shutdown"). + +record_running_nodes() -> + FileName = running_nodes_filename(), + Nodes = running_clustered_nodes() -- [node()], + %% Don't check the result: we're shutting down anyway and this is + %% a best-effort-basis. + rabbit_misc:write_term_file(FileName, [Nodes]), + ok. + +read_previously_running_nodes() -> + FileName = running_nodes_filename(), + case rabbit_misc:read_term_file(FileName) of + {ok, [Nodes]} -> Nodes; + {error, enoent} -> []; + {error, Reason} -> throw({error, {cannot_read_previous_nodes_file, + FileName, Reason}}) + end. + +delete_previously_running_nodes() -> + FileName = running_nodes_filename(), + case file:delete(FileName) of + ok -> ok; + {error, enoent} -> ok; + {error, Reason} -> throw({error, {cannot_delete_previous_nodes_file, + FileName, Reason}}) + end. + %% Take a cluster node config and create the right kind of node - a %% standalone disk node, or disk or ram node connected to the %% specified cluster nodes. If Force is false, don't allow %% connections to offline nodes. -init_db(ClusterNodes, Force) -> +init_db(ClusterNodes, Force, SecondaryPostMnesiaFun) -> UClusterNodes = lists:usort(ClusterNodes), ProperClusterNodes = UClusterNodes -- [node()], case mnesia:change_config(extra_db_nodes, ProperClusterNodes) of @@ -388,26 +429,21 @@ init_db(ClusterNodes, Force) -> end; true -> ok end, - case {Nodes, mnesia:system_info(use_dir), all_clustered_nodes()} of - {[], true, [_]} -> - %% True single disc node, attempt upgrade - case rabbit_upgrade:maybe_upgrade() of - ok -> ensure_schema_integrity(); - version_not_available -> schema_ok_or_move() - end; - {[], true, _} -> - %% "Master" (i.e. without config) disc node in cluster, - %% verify schema - ensure_version_ok(rabbit_upgrade:read_version()), - ensure_schema_integrity(); - {[], false, _} -> + case {Nodes, mnesia:system_info(use_dir)} of + {[], false} -> %% Nothing there at all, start from scratch ok = create_schema(); - {[AnotherNode|_], _, _} -> + {[], true} -> + %% We're the first node up + case rabbit_upgrade:maybe_upgrade_local() of + ok -> ensure_schema_integrity(); + version_not_available -> ok = schema_ok_or_move() + end, + ok; + {[AnotherNode|_], _} -> %% Subsequent node in cluster, catch up - ensure_version_ok(rabbit_upgrade:read_version()), ensure_version_ok( - rpc:call(AnotherNode, rabbit_upgrade, read_version, [])), + rpc:call(AnotherNode, rabbit_version, recorded, [])), IsDiskNode = ClusterNodes == [] orelse lists:member(node(), ClusterNodes), ok = wait_for_replicated_tables(), @@ -416,7 +452,9 @@ init_db(ClusterNodes, Force) -> true -> disc; false -> ram end), - ensure_schema_integrity() + ok = SecondaryPostMnesiaFun(), + ensure_schema_integrity(), + ok end; {error, Reason} -> %% one reason we may end up here is if we try to join @@ -425,6 +463,14 @@ init_db(ClusterNodes, Force) -> throw({error, {unable_to_join_cluster, ClusterNodes, Reason}}) end. +maybe_upgrade_local_or_record_desired() -> + case rabbit_upgrade:maybe_upgrade_local() of + ok -> ok; + %% If we're just starting up a new node we won't have a + %% version + version_not_available -> ok = rabbit_version:record_desired() + end. + schema_ok_or_move() -> case check_schema_integrity() of ok -> @@ -441,13 +487,13 @@ schema_ok_or_move() -> end. ensure_version_ok({ok, DiscVersion}) -> - case rabbit_upgrade:desired_version() of - DiscVersion -> ok; - DesiredVersion -> throw({error, {schema_mismatch, - DesiredVersion, DiscVersion}}) + DesiredVersion = rabbit_version:desired(), + case rabbit_version:matches(DesiredVersion, DiscVersion) of + true -> ok; + false -> throw({error, {version_mismatch, DesiredVersion, DiscVersion}}) end; ensure_version_ok({error, _}) -> - ok = rabbit_upgrade:write_version(). + ok = rabbit_version:record_desired(). create_schema() -> mnesia:stop(), @@ -456,8 +502,8 @@ create_schema() -> rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia), ok = create_tables(), - ok = ensure_schema_integrity(), - ok = rabbit_upgrade:write_version(). + ensure_schema_integrity(), + ok = rabbit_version:record_desired(). move_db() -> mnesia:stop(), @@ -477,18 +523,13 @@ move_db() -> {error, Reason} -> throw({error, {cannot_backup_mnesia, MnesiaDir, BackupDir, Reason}}) end, - ok = ensure_mnesia_dir(), + ensure_mnesia_dir(), rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia), ok. copy_db(Destination) -> - mnesia:stop(), - case rabbit_misc:recursive_copy(dir(), Destination) of - ok -> - rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia); - {error, E} -> - {error, E} - end. + ok = ensure_mnesia_not_running(), + rabbit_misc:recursive_copy(dir(), Destination). create_tables() -> lists:foreach(fun ({Tab, TabDef}) -> @@ -562,12 +603,12 @@ wait_for_tables(TableNames) -> end. reset(Force) -> - ok = ensure_mnesia_not_running(), + ensure_mnesia_not_running(), Node = node(), case Force of true -> ok; false -> - ok = ensure_mnesia_dir(), + ensure_mnesia_dir(), rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia), {Nodes, RunningNodes} = try diff --git a/src/rabbit_prelaunch.erl b/src/rabbit_prelaunch.erl index 7bb8c0ea3b..8800e8d685 100644 --- a/src/rabbit_prelaunch.erl +++ b/src/rabbit_prelaunch.erl @@ -235,7 +235,7 @@ post_process_script(ScriptFile) -> {error, {failed_to_load_script, Reason}} end. -process_entry(Entry = {apply,{application,start_boot,[rabbit,permanent]}}) -> +process_entry(Entry = {apply,{application,start_boot,[mnesia,permanent]}}) -> [{apply,{rabbit,prepare,[]}}, Entry]; process_entry(Entry) -> [Entry]. diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 33c5391b20..367953b897 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -169,7 +169,7 @@ %%---------------------------------------------------------------------------- --rabbit_upgrade({add_queue_ttl, []}). +-rabbit_upgrade({add_queue_ttl, local, []}). -ifdef(use_specs). diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl index ebda5d03a7..a2abb1e58b 100644 --- a/src/rabbit_upgrade.erl +++ b/src/rabbit_upgrade.erl @@ -16,7 +16,7 @@ -module(rabbit_upgrade). --export([maybe_upgrade/0, read_version/0, write_version/0, desired_version/0]). +-export([maybe_upgrade_mnesia/0, maybe_upgrade_local/0]). -include("rabbit.hrl"). @@ -27,141 +27,260 @@ -ifdef(use_specs). --type(step() :: atom()). --type(version() :: [step()]). - --spec(maybe_upgrade/0 :: () -> 'ok' | 'version_not_available'). --spec(read_version/0 :: () -> rabbit_types:ok_or_error2(version(), any())). --spec(write_version/0 :: () -> 'ok'). --spec(desired_version/0 :: () -> version()). +-spec(maybe_upgrade_mnesia/0 :: () -> 'ok'). +-spec(maybe_upgrade_local/0 :: () -> 'ok' | 'version_not_available'). -endif. %% ------------------------------------------------------------------- -%% Try to upgrade the schema. If no information on the existing schema -%% could be found, do nothing. rabbit_mnesia:check_schema_integrity() -%% will catch the problem. -maybe_upgrade() -> - case read_version() of - {ok, CurrentHeads} -> - with_upgrade_graph( - fun (G) -> - case unknown_heads(CurrentHeads, G) of - [] -> case upgrades_to_apply(CurrentHeads, G) of - [] -> ok; - Upgrades -> apply_upgrades(Upgrades) - end; - Unknown -> throw({error, - {future_upgrades_found, Unknown}}) - end - end); - {error, enoent} -> - version_not_available +%% The upgrade logic is quite involved, due to the existence of +%% clusters. +%% +%% Firstly, we have two different types of upgrades to do: Mnesia and +%% everythinq else. Mnesia upgrades must only be done by one node in +%% the cluster (we treat a non-clustered node as a single-node +%% cluster). This is the primary upgrader. The other upgrades need to +%% be done by all nodes. +%% +%% The primary upgrader has to start first (and do its Mnesia +%% upgrades). Secondary upgraders need to reset their Mnesia database +%% and then rejoin the cluster. They can't do the Mnesia upgrades as +%% well and then merge databases since the cookie for each table will +%% end up different and the merge will fail. +%% +%% This in turn means that we need to determine whether we are the +%% primary or secondary upgrader *before* Mnesia comes up. If we +%% didn't then the secondary upgrader would try to start Mnesia, and +%% either hang waiting for a node which is not yet up, or fail since +%% its schema differs from the other nodes in the cluster. +%% +%% Also, the primary upgrader needs to start Mnesia to do its +%% upgrades, but needs to forcibly load tables rather than wait for +%% them (in case it was not the last node to shut down, in which case +%% it would wait forever). +%% +%% This in turn means that maybe_upgrade_mnesia/0 has to be patched +%% into the boot process by prelaunch before the mnesia application is +%% started. By the time Mnesia is started the upgrades have happened +%% (on the primary), or Mnesia has been reset (on the secondary) and +%% rabbit_mnesia:init_db/3 can then make the node rejoin the cluster +%% in the normal way. +%% +%% The non-mnesia upgrades are then triggered by +%% rabbit_mnesia:init_db/3. Of course, it's possible for a given +%% upgrade process to only require Mnesia upgrades, or only require +%% non-Mnesia upgrades. In the latter case no Mnesia resets and +%% reclusterings occur. +%% +%% The primary upgrader needs to be a disc node. Ideally we would like +%% it to be the last disc node to shut down (since otherwise there's a +%% risk of data loss). On each node we therefore record the disc nodes +%% that were still running when we shut down. A disc node that knows +%% other nodes were up when it shut down, or a ram node, will refuse +%% to be the primary upgrader, and will thus not start when upgrades +%% are needed. +%% +%% However, this is racy if several nodes are shut down at once. Since +%% rabbit records the running nodes, and shuts down before mnesia, the +%% race manifests as all disc nodes thinking they are not the primary +%% upgrader. Therefore the user can remove the record of the last disc +%% node to shut down to get things going again. This may lose any +%% mnesia changes that happened after the node chosen as the primary +%% upgrader was shut down. + +%% ------------------------------------------------------------------- + +ensure_backup_taken() -> + case filelib:is_file(lock_filename()) of + false -> case filelib:is_dir(backup_dir()) of + false -> ok = take_backup(); + _ -> ok + end; + true -> throw({error, previous_upgrade_failed}) end. -read_version() -> - case rabbit_misc:read_term_file(schema_filename()) of - {ok, [Heads]} -> {ok, Heads}; - {error, _} = Err -> Err +take_backup() -> + BackupDir = backup_dir(), + case rabbit_mnesia:copy_db(BackupDir) of + ok -> info("upgrades: Mnesia dir backed up to ~p~n", + [BackupDir]); + {error, E} -> throw({could_not_back_up_mnesia_dir, E}) end. -write_version() -> - ok = rabbit_misc:write_term_file(schema_filename(), [desired_version()]), - ok. +ensure_backup_removed() -> + case filelib:is_dir(backup_dir()) of + true -> ok = remove_backup(); + _ -> ok + end. -desired_version() -> - with_upgrade_graph(fun (G) -> heads(G) end). +remove_backup() -> + ok = rabbit_misc:recursive_delete([backup_dir()]), + info("upgrades: Mnesia backup removed~n", []). -%% ------------------------------------------------------------------- +maybe_upgrade_mnesia() -> + AllNodes = rabbit_mnesia:all_clustered_nodes(), + case rabbit_version:upgrades_required(mnesia) of + {error, version_not_available} -> + case AllNodes of + [_] -> ok; + _ -> die("Cluster upgrade needed but upgrading from " + "< 2.1.1.~nUnfortunately you will need to " + "rebuild the cluster.", []) + end; + {error, _} = Err -> + throw(Err); + {ok, []} -> + ok; + {ok, Upgrades} -> + ensure_backup_taken(), + ok = case upgrade_mode(AllNodes) of + primary -> primary_upgrade(Upgrades, AllNodes); + secondary -> secondary_upgrade(AllNodes) + end + end. -with_upgrade_graph(Fun) -> - case rabbit_misc:build_acyclic_graph( - fun vertices/2, fun edges/2, - rabbit_misc:all_module_attributes(rabbit_upgrade)) of - {ok, G} -> try - Fun(G) - after - true = digraph:delete(G) - end; - {error, {vertex, duplicate, StepName}} -> - throw({error, {duplicate_upgrade_step, StepName}}); - {error, {edge, {bad_vertex, StepName}, _From, _To}} -> - throw({error, {dependency_on_unknown_upgrade_step, StepName}}); - {error, {edge, {bad_edge, StepNames}, _From, _To}} -> - throw({error, {cycle_in_upgrade_steps, StepNames}}) +upgrade_mode(AllNodes) -> + case nodes_running(AllNodes) of + [] -> + AfterUs = rabbit_mnesia:read_previously_running_nodes(), + case {is_disc_node(), AfterUs} of + {true, []} -> + primary; + {true, _} -> + Filename = rabbit_mnesia:running_nodes_filename(), + die("Cluster upgrade needed but other disc nodes shut " + "down after this one.~nPlease first start the last " + "disc node to shut down.~n~nNote: if several disc " + "nodes were shut down simultaneously they may " + "all~nshow this message. In which case, remove " + "the lock file on one of them and~nstart that node. " + "The lock file on this node is:~n~n ~s ", [Filename]); + {false, _} -> + die("Cluster upgrade needed but this is a ram node.~n" + "Please first start the last disc node to shut down.", + []) + end; + [Another|_] -> + MyVersion = rabbit_version:desired_for_scope(mnesia), + ErrFun = fun (ClusterVersion) -> + %% The other node(s) are running an + %% unexpected version. + die("Cluster upgrade needed but other nodes are " + "running ~p~nand I want ~p", + [ClusterVersion, MyVersion]) + end, + case rpc:call(Another, rabbit_version, desired_for_scope, + [mnesia]) of + {badrpc, {'EXIT', {undef, _}}} -> ErrFun(unknown_old_version); + {badrpc, Reason} -> ErrFun({unknown, Reason}); + CV -> case rabbit_version:matches( + MyVersion, CV) of + true -> secondary; + false -> ErrFun(CV) + end + end end. -vertices(Module, Steps) -> - [{StepName, {Module, StepName}} || {StepName, _Reqs} <- Steps]. +is_disc_node() -> + %% This is pretty ugly but we can't start Mnesia and ask it (will hang), + %% we can't look at the config file (may not include us even if we're a + %% disc node). + filelib:is_regular(filename:join(dir(), "rabbit_durable_exchange.DCD")). + +die(Msg, Args) -> + %% We don't throw or exit here since that gets thrown + %% straight out into do_boot, generating an erl_crash.dump + %% and displaying any error message in a confusing way. + error_logger:error_msg(Msg, Args), + io:format("~n~n****~n~n" ++ Msg ++ "~n~n****~n~n~n", Args), + error_logger:logfile(close), + halt(1). + +primary_upgrade(Upgrades, Nodes) -> + Others = Nodes -- [node()], + ok = apply_upgrades( + mnesia, + Upgrades, + fun () -> + force_tables(), + case Others of + [] -> ok; + _ -> info("mnesia upgrades: Breaking cluster~n", []), + [{atomic, ok} = mnesia:del_table_copy(schema, Node) + || Node <- Others] + end + end), + ok. -edges(_Module, Steps) -> - [{Require, StepName} || {StepName, Requires} <- Steps, Require <- Requires]. +force_tables() -> + [mnesia:force_load_table(T) || T <- rabbit_mnesia:table_names()]. -unknown_heads(Heads, G) -> - [H || H <- Heads, digraph:vertex(G, H) =:= false]. +secondary_upgrade(AllNodes) -> + %% must do this before we wipe out schema + IsDiscNode = is_disc_node(), + rabbit_misc:ensure_ok(mnesia:delete_schema([node()]), + cannot_delete_schema), + %% Note that we cluster with all nodes, rather than all disc nodes + %% (as we can't know all disc nodes at this point). This is safe as + %% we're not writing the cluster config, just setting up Mnesia. + ClusterNodes = case IsDiscNode of + true -> AllNodes; + false -> AllNodes -- [node()] + end, + rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia), + ok = rabbit_mnesia:init_db(ClusterNodes, true, fun () -> ok end), + ok = rabbit_version:record_desired_for_scope(mnesia), + ok. -upgrades_to_apply(Heads, G) -> - %% Take all the vertices which can reach the known heads. That's - %% everything we've already applied. Subtract that from all - %% vertices: that's what we have to apply. - Unsorted = sets:to_list( - sets:subtract( - sets:from_list(digraph:vertices(G)), - sets:from_list(digraph_utils:reaching(Heads, G)))), - %% Form a subgraph from that list and find a topological ordering - %% so we can invoke them in order. - [element(2, digraph:vertex(G, StepName)) || - StepName <- digraph_utils:topsort(digraph_utils:subgraph(G, Unsorted))]. +nodes_running(Nodes) -> + [N || N <- Nodes, node_running(N)]. -heads(G) -> - lists:sort([V || V <- digraph:vertices(G), digraph:out_degree(G, V) =:= 0]). +node_running(Node) -> + case rpc:call(Node, application, which_applications, []) of + {badrpc, _} -> false; + Apps -> lists:keysearch(rabbit, 1, Apps) =/= false + end. %% ------------------------------------------------------------------- -apply_upgrades(Upgrades) -> - LockFile = lock_filename(dir()), - case rabbit_misc:lock_file(LockFile) of - ok -> - BackupDir = dir() ++ "-upgrade-backup", - info("Upgrades: ~w to apply~n", [length(Upgrades)]), - case rabbit_mnesia:copy_db(BackupDir) of - ok -> - %% We need to make the backup after creating the - %% lock file so that it protects us from trying to - %% overwrite the backup. Unfortunately this means - %% the lock file exists in the backup too, which - %% is not intuitive. Remove it. - ok = file:delete(lock_filename(BackupDir)), - info("Upgrades: Mnesia dir backed up to ~p~n", [BackupDir]), - [apply_upgrade(Upgrade) || Upgrade <- Upgrades], - info("Upgrades: All upgrades applied successfully~n", []), - ok = write_version(), - ok = rabbit_misc:recursive_delete([BackupDir]), - info("Upgrades: Mnesia backup removed~n", []), - ok = file:delete(LockFile); - {error, E} -> - %% If we can't backup, the upgrade hasn't started - %% hence we don't need the lockfile since the real - %% mnesia dir is the good one. - ok = file:delete(LockFile), - throw({could_not_back_up_mnesia_dir, E}) - end; - {error, eexist} -> - throw({error, previous_upgrade_failed}) +maybe_upgrade_local() -> + case rabbit_version:upgrades_required(local) of + {error, version_not_available} -> version_not_available; + {error, _} = Err -> throw(Err); + {ok, []} -> ensure_backup_removed(), + ok; + {ok, Upgrades} -> mnesia:stop(), + ensure_backup_taken(), + ok = apply_upgrades(local, Upgrades, + fun () -> ok end), + ensure_backup_removed(), + ok end. -apply_upgrade({M, F}) -> - info("Upgrades: Applying ~w:~w~n", [M, F]), +%% ------------------------------------------------------------------- + +apply_upgrades(Scope, Upgrades, Fun) -> + ok = rabbit_misc:lock_file(lock_filename()), + info("~s upgrades: ~w to apply~n", [Scope, length(Upgrades)]), + rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia), + Fun(), + [apply_upgrade(Scope, Upgrade) || Upgrade <- Upgrades], + info("~s upgrades: All upgrades applied successfully~n", [Scope]), + ok = rabbit_version:record_desired_for_scope(Scope), + ok = file:delete(lock_filename()). + +apply_upgrade(Scope, {M, F}) -> + info("~s upgrades: Applying ~w:~w~n", [Scope, M, F]), ok = apply(M, F, []). %% ------------------------------------------------------------------- dir() -> rabbit_mnesia:dir(). -schema_filename() -> filename:join(dir(), ?VERSION_FILENAME). - +lock_filename() -> lock_filename(dir()). lock_filename(Dir) -> filename:join(Dir, ?LOCK_FILENAME). +backup_dir() -> dir() ++ "-upgrade-backup". %% NB: we cannot use rabbit_log here since it may not have been %% started yet diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl index b9dbe418d9..7567c29ef3 100644 --- a/src/rabbit_upgrade_functions.erl +++ b/src/rabbit_upgrade_functions.erl @@ -20,12 +20,12 @@ -compile([export_all]). --rabbit_upgrade({remove_user_scope, []}). --rabbit_upgrade({hash_passwords, []}). --rabbit_upgrade({add_ip_to_listener, []}). --rabbit_upgrade({internal_exchanges, []}). --rabbit_upgrade({user_to_internal_user, [hash_passwords]}). --rabbit_upgrade({topic_trie, []}). +-rabbit_upgrade({remove_user_scope, mnesia, []}). +-rabbit_upgrade({hash_passwords, mnesia, []}). +-rabbit_upgrade({add_ip_to_listener, mnesia, []}). +-rabbit_upgrade({internal_exchanges, mnesia, []}). +-rabbit_upgrade({user_to_internal_user, mnesia, [hash_passwords]}). +-rabbit_upgrade({topic_trie, mnesia, []}). %% ------------------------------------------------------------------- diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index c29a4b045f..bba54d1682 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -301,7 +301,7 @@ %%---------------------------------------------------------------------------- --rabbit_upgrade({multiple_routing_keys, []}). +-rabbit_upgrade({multiple_routing_keys, local, []}). -ifdef(use_specs). diff --git a/src/rabbit_version.erl b/src/rabbit_version.erl new file mode 100644 index 0000000000..400abc1083 --- /dev/null +++ b/src/rabbit_version.erl @@ -0,0 +1,172 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is VMware, Inc. +%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% + +-module(rabbit_version). + +-export([recorded/0, matches/2, desired/0, desired_for_scope/1, + record_desired/0, record_desired_for_scope/1, + upgrades_required/1]). + +%% ------------------------------------------------------------------- +-ifdef(use_specs). + +-export_type([scope/0, step/0]). + +-type(scope() :: atom()). +-type(scope_version() :: [atom()]). +-type(step() :: {atom(), atom()}). + +-type(version() :: [atom()]). + +-spec(recorded/0 :: () -> rabbit_types:ok_or_error2(version(), any())). +-spec(matches/2 :: ([A], [A]) -> boolean()). +-spec(desired/0 :: () -> version()). +-spec(desired_for_scope/1 :: (scope()) -> scope_version()). +-spec(record_desired/0 :: () -> 'ok'). +-spec(record_desired_for_scope/1 :: + (scope()) -> rabbit_types:ok_or_error(any())). +-spec(upgrades_required/1 :: + (scope()) -> rabbit_types:ok_or_error2([step()], any())). + +-endif. +%% ------------------------------------------------------------------- + +-define(VERSION_FILENAME, "schema_version"). +-define(SCOPES, [mnesia, local]). + +%% ------------------------------------------------------------------- + +recorded() -> case rabbit_misc:read_term_file(schema_filename()) of + {ok, [V]} -> {ok, V}; + {error, _} = Err -> Err + end. + +record(V) -> ok = rabbit_misc:write_term_file(schema_filename(), [V]). + +recorded_for_scope(Scope) -> + case recorded() of + {error, _} = Err -> + Err; + {ok, Version} -> + {ok, case lists:keysearch(Scope, 1, categorise_by_scope(Version)) of + false -> []; + {value, {Scope, SV1}} -> SV1 + end} + end. + +record_for_scope(Scope, ScopeVersion) -> + case recorded() of + {error, _} = Err -> + Err; + {ok, Version} -> + Version1 = lists:keystore(Scope, 1, categorise_by_scope(Version), + {Scope, ScopeVersion}), + ok = record([Name || {_Scope, Names} <- Version1, Name <- Names]) + end. + +%% ------------------------------------------------------------------- + +matches(VerA, VerB) -> + lists:usort(VerA) =:= lists:usort(VerB). + +%% ------------------------------------------------------------------- + +desired() -> [Name || Scope <- ?SCOPES, Name <- desired_for_scope(Scope)]. + +desired_for_scope(Scope) -> with_upgrade_graph(fun heads/1, Scope). + +record_desired() -> record(desired()). + +record_desired_for_scope(Scope) -> + record_for_scope(Scope, desired_for_scope(Scope)). + +upgrades_required(Scope) -> + case recorded_for_scope(Scope) of + {error, enoent} -> + {error, version_not_available}; + {ok, CurrentHeads} -> + with_upgrade_graph( + fun (G) -> + case unknown_heads(CurrentHeads, G) of + [] -> {ok, upgrades_to_apply(CurrentHeads, G)}; + Unknown -> {error, {future_upgrades_found, Unknown}} + end + end, Scope) + end. + +%% ------------------------------------------------------------------- + +with_upgrade_graph(Fun, Scope) -> + case rabbit_misc:build_acyclic_graph( + fun (Module, Steps) -> vertices(Module, Steps, Scope) end, + fun (Module, Steps) -> edges(Module, Steps, Scope) end, + rabbit_misc:all_module_attributes(rabbit_upgrade)) of + {ok, G} -> try + Fun(G) + after + true = digraph:delete(G) + end; + {error, {vertex, duplicate, StepName}} -> + throw({error, {duplicate_upgrade_step, StepName}}); + {error, {edge, {bad_vertex, StepName}, _From, _To}} -> + throw({error, {dependency_on_unknown_upgrade_step, StepName}}); + {error, {edge, {bad_edge, StepNames}, _From, _To}} -> + throw({error, {cycle_in_upgrade_steps, StepNames}}) + end. + +vertices(Module, Steps, Scope0) -> + [{StepName, {Module, StepName}} || {StepName, Scope1, _Reqs} <- Steps, + Scope0 == Scope1]. + +edges(_Module, Steps, Scope0) -> + [{Require, StepName} || {StepName, Scope1, Requires} <- Steps, + Require <- Requires, + Scope0 == Scope1]. +unknown_heads(Heads, G) -> + [H || H <- Heads, digraph:vertex(G, H) =:= false]. + +upgrades_to_apply(Heads, G) -> + %% Take all the vertices which can reach the known heads. That's + %% everything we've already applied. Subtract that from all + %% vertices: that's what we have to apply. + Unsorted = sets:to_list( + sets:subtract( + sets:from_list(digraph:vertices(G)), + sets:from_list(digraph_utils:reaching(Heads, G)))), + %% Form a subgraph from that list and find a topological ordering + %% so we can invoke them in order. + [element(2, digraph:vertex(G, StepName)) || + StepName <- digraph_utils:topsort(digraph_utils:subgraph(G, Unsorted))]. + +heads(G) -> + lists:sort([V || V <- digraph:vertices(G), digraph:out_degree(G, V) =:= 0]). + +%% ------------------------------------------------------------------- + +categorise_by_scope(Version) when is_list(Version) -> + Categorised = + [{Scope, Name} || {_Module, Attributes} <- + rabbit_misc:all_module_attributes(rabbit_upgrade), + {Name, Scope, _Requires} <- Attributes, + lists:member(Name, Version)], + orddict:to_list( + lists:foldl(fun ({Scope, Name}, CatVersion) -> + rabbit_misc:orddict_cons(Scope, Name, CatVersion) + end, orddict:new(), Categorised)). + +dir() -> rabbit_mnesia:dir(). + +schema_filename() -> filename:join(dir(), ?VERSION_FILENAME). |
