diff options
| author | Simon MacMullen <simon@rabbitmq.com> | 2012-10-17 16:54:23 +0100 |
|---|---|---|
| committer | Simon MacMullen <simon@rabbitmq.com> | 2012-10-17 16:54:23 +0100 |
| commit | 66217363cd194531b3fa345fcc037c62978e95ac (patch) | |
| tree | f9c7fff94e8a8570fc006c876624c99f1c8d376c /src | |
| parent | 7e0f44f2a8d4c602ad25d574ebb6fac2e9e3b1b5 (diff) | |
| parent | bbd45f3d83621c78df930ab03864de04f65daacf (diff) | |
| download | rabbitmq-server-git-66217363cd194531b3fa345fcc037c62978e95ac.tar.gz | |
Merge bug21413
Diffstat (limited to 'src')
| -rw-r--r-- | src/gatherer.erl | 51 | ||||
| -rw-r--r-- | src/rabbit.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_amqqueue.erl | 133 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 186 | ||||
| -rw-r--r-- | src/rabbit_backing_queue.erl | 7 | ||||
| -rw-r--r-- | src/rabbit_control_main.erl | 10 | ||||
| -rw-r--r-- | src/rabbit_exchange.erl | 5 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_coordinator.erl | 58 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_master.erl | 114 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_misc.erl | 241 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 330 | ||||
| -rw-r--r-- | src/rabbit_mnesia.erl | 316 | ||||
| -rw-r--r-- | src/rabbit_policy.erl | 17 | ||||
| -rw-r--r-- | src/rabbit_queue_index.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_table.erl | 311 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 51 | ||||
| -rw-r--r-- | src/rabbit_types.erl | 3 | ||||
| -rw-r--r-- | src/rabbit_upgrade.erl | 5 | ||||
| -rw-r--r-- | src/rabbit_upgrade_functions.erl | 33 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_vm.erl | 129 |
21 files changed, 1173 insertions, 837 deletions
diff --git a/src/gatherer.erl b/src/gatherer.erl index 98b360389a..29d2d71366 100644 --- a/src/gatherer.erl +++ b/src/gatherer.erl @@ -18,7 +18,7 @@ -behaviour(gen_server2). --export([start_link/0, stop/1, fork/1, finish/1, in/2, out/1]). +-export([start_link/0, stop/1, fork/1, finish/1, in/2, sync_in/2, out/1]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -32,6 +32,7 @@ -spec(fork/1 :: (pid()) -> 'ok'). -spec(finish/1 :: (pid()) -> 'ok'). -spec(in/2 :: (pid(), any()) -> 'ok'). +-spec(sync_in/2 :: (pid(), any()) -> 'ok'). -spec(out/1 :: (pid()) -> {'value', any()} | 'empty'). -endif. @@ -62,6 +63,9 @@ finish(Pid) -> in(Pid, Value) -> gen_server2:cast(Pid, {in, Value}). +sync_in(Pid, Value) -> + gen_server2:call(Pid, {in, Value}, infinity). + out(Pid) -> gen_server2:call(Pid, out, infinity). @@ -78,19 +82,22 @@ handle_call(stop, _From, State) -> handle_call(fork, _From, State = #gstate { forks = Forks }) -> {reply, ok, State #gstate { forks = Forks + 1 }, hibernate}; +handle_call({in, Value}, From, State) -> + {noreply, in(Value, From, State), hibernate}; + handle_call(out, From, State = #gstate { forks = Forks, values = Values, blocked = Blocked }) -> case queue:out(Values) of + {empty, _} when Forks == 0 -> + {reply, empty, State, hibernate}; {empty, _} -> - case Forks of - 0 -> {reply, empty, State, hibernate}; - _ -> {noreply, - State #gstate { blocked = queue:in(From, Blocked) }, - hibernate} - end; - {{value, _Value} = V, NewValues} -> - {reply, V, State #gstate { values = NewValues }, hibernate} + {noreply, State #gstate { blocked = queue:in(From, Blocked) }, + hibernate}; + {{value, {PendingIn, Value}}, NewValues} -> + reply(PendingIn, ok), + {reply, {value, Value}, State #gstate { values = NewValues }, + hibernate} end; handle_call(Msg, _From, State) -> @@ -107,15 +114,8 @@ handle_cast(finish, State = #gstate { forks = Forks, blocked = Blocked }) -> {noreply, State #gstate { forks = NewForks, blocked = NewBlocked }, hibernate}; -handle_cast({in, Value}, State = #gstate { values = Values, - blocked = Blocked }) -> - {noreply, case queue:out(Blocked) of - {empty, _} -> - State #gstate { values = queue:in(Value, Values) }; - {{value, From}, NewBlocked} -> - gen_server2:reply(From, {value, Value}), - State #gstate { blocked = NewBlocked } - end, hibernate}; +handle_cast({in, Value}, State) -> + {noreply, in(Value, undefined, State), hibernate}; handle_cast(Msg, State) -> {stop, {unexpected_cast, Msg}, State}. @@ -128,3 +128,18 @@ code_change(_OldVsn, State, _Extra) -> terminate(_Reason, State) -> State. + +%%---------------------------------------------------------------------------- + +in(Value, From, State = #gstate { values = Values, blocked = Blocked }) -> + case queue:out(Blocked) of + {empty, _} -> + State #gstate { values = queue:in({From, Value}, Values) }; + {{value, PendingOut}, NewBlocked} -> + reply(From, ok), + gen_server2:reply(PendingOut, {value, Value}), + State #gstate { blocked = NewBlocked } + end. + +reply(undefined, _Reply) -> ok; +reply(From, Reply) -> gen_server2:reply(From, Reply). diff --git a/src/rabbit.erl b/src/rabbit.erl index 3fe27cd901..93808f8413 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -364,7 +364,7 @@ status() -> {running_applications, application:which_applications(infinity)}, {os, os:type()}, {erlang_version, erlang:system_info(system_version)}, - {memory, erlang:memory()}], + {memory, rabbit_vm:memory()}], S2 = rabbit_misc:filter_exit_map( fun ({Key, {M, F, A}}) -> {Key, erlang:apply(M, F, A)} end, [{vm_memory_high_watermark, {vm_memory_monitor, @@ -427,7 +427,7 @@ stop(_State) -> ok = rabbit_alarm:stop(), ok = case rabbit_mnesia:is_clustered() of true -> rabbit_amqqueue:on_node_down(node()); - false -> rabbit_mnesia:empty_ram_only_tables() + false -> rabbit_table:clear_ram_only_tables() end, ok. @@ -558,7 +558,7 @@ recover() -> rabbit_binding:recover(rabbit_exchange:recover(), rabbit_amqqueue:start()). maybe_insert_default_data() -> - case rabbit_mnesia:is_db_empty() of + case rabbit_table:is_empty() of true -> insert_default_data(); false -> ok end. diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 4a20a1bcaf..6ad85b24f5 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -29,7 +29,7 @@ -export([notify_down_all/2, limit_all/3]). -export([on_node_down/1]). -export([update/2, store_queue/1, policy_changed/2]). - +-export([start_mirroring/1, stop_mirroring/1]). %% internal -export([internal_declare/2, internal_delete/2, run_backing_queue/3, @@ -165,6 +165,8 @@ -spec(store_queue/1 :: (rabbit_types:amqqueue()) -> 'ok'). -spec(policy_changed/2 :: (rabbit_types:amqqueue(), rabbit_types:amqqueue()) -> 'ok'). +-spec(start_mirroring/1 :: (pid()) -> 'ok'). +-spec(stop_mirroring/1 :: (pid()) -> 'ok'). -endif. @@ -210,19 +212,20 @@ recover_durable_queues(DurableQueues) -> declare(QueueName, Durable, AutoDelete, Args, Owner) -> ok = check_declare_arguments(QueueName, Args), - {Node, MNodes} = determine_queue_nodes(Args), - Q = start_queue_process(Node, #amqqueue{name = QueueName, - durable = Durable, - auto_delete = AutoDelete, - arguments = Args, - exclusive_owner = Owner, - pid = none, - slave_pids = [], - sync_slave_pids = [], - mirror_nodes = MNodes}), - case gen_server2:call(Q#amqqueue.pid, {init, false}, infinity) of + Q0 = rabbit_policy:set(#amqqueue{name = QueueName, + durable = Durable, + auto_delete = AutoDelete, + arguments = Args, + exclusive_owner = Owner, + pid = none, + slave_pids = [], + sync_slave_pids = [], + gm_pids = []}), + {Node, _MNodes} = rabbit_mirror_queue_misc:suggested_queue_nodes(Q0), + Q1 = start_queue_process(Node, Q0), + case gen_server2:call(Q1#amqqueue.pid, {init, false}, infinity) of not_found -> rabbit_misc:not_found(QueueName); - Q1 -> Q1 + Q2 -> Q2 end. internal_declare(Q, true) -> @@ -271,24 +274,8 @@ store_queue(Q = #amqqueue{durable = false}) -> ok = mnesia:write(rabbit_queue, Q, write), ok. -policy_changed(_Q1, _Q2) -> - ok. - -determine_queue_nodes(Args) -> - Policy = rabbit_misc:table_lookup(Args, <<"x-ha-policy">>), - PolicyParams = rabbit_misc:table_lookup(Args, <<"x-ha-policy-params">>), - case {Policy, PolicyParams} of - {{_Type, <<"nodes">>}, {array, Nodes}} -> - case [list_to_atom(binary_to_list(Node)) || - {longstr, Node} <- Nodes] of - [Node] -> {Node, undefined}; - [First | Rest] -> {First, [First | Rest]} - end; - {{_Type, <<"all">>}, _} -> - {node(), all}; - _ -> - {node(), undefined} - end. +policy_changed(Q1, Q2) -> + rabbit_mirror_queue_misc:update_mirrors(Q1, Q2). start_queue_process(Node, Q) -> {ok, Pid} = rabbit_amqqueue_sup:start_child(Node, [Q]), @@ -311,8 +298,6 @@ lookup(Name) -> with(Name, F, E) -> case lookup(Name) of - {ok, Q = #amqqueue{slave_pids = []}} -> - rabbit_misc:with_exit_handler(E, fun () -> F(Q) end); {ok, Q = #amqqueue{pid = QPid}} -> %% We check is_process_alive(QPid) in case we receive a %% nodedown (for example) in F() that has nothing to do @@ -364,13 +349,11 @@ with_exclusive_access_or_die(Name, ReaderPid, F) -> assert_args_equivalence(#amqqueue{name = QueueName, arguments = Args}, RequiredArgs) -> rabbit_misc:assert_args_equivalence( - Args, RequiredArgs, QueueName, - [<<"x-expires">>, <<"x-message-ttl">>, <<"x-ha-policy">>]). + Args, RequiredArgs, QueueName, [<<"x-expires">>, <<"x-message-ttl">>]). check_declare_arguments(QueueName, Args) -> Checks = [{<<"x-expires">>, fun check_positive_int_arg/2}, {<<"x-message-ttl">>, fun check_non_neg_int_arg/2}, - {<<"x-ha-policy">>, fun check_ha_policy_arg/2}, {<<"x-dead-letter-exchange">>, fun check_string_arg/2}, {<<"x-dead-letter-routing-key">>, fun check_dlxrk_arg/2}], [case rabbit_misc:table_lookup(Args, Key) of @@ -421,29 +404,6 @@ check_dlxrk_arg({longstr, _}, Args) -> check_dlxrk_arg({Type, _}, _Args) -> {error, {unacceptable_type, Type}}. -check_ha_policy_arg({longstr, <<"all">>}, _Args) -> - ok; -check_ha_policy_arg({longstr, <<"nodes">>}, Args) -> - case rabbit_misc:table_lookup(Args, <<"x-ha-policy-params">>) of - undefined -> - {error, {require, 'x-ha-policy-params'}}; - {array, []} -> - {error, {require_non_empty_list_of_nodes_for_ha}}; - {array, Ary} -> - case lists:all(fun ({longstr, _Node}) -> true; - (_ ) -> false - end, Ary) of - true -> ok; - false -> {error, {require_node_list_as_longstrs_for_ha, Ary}} - end; - {Type, _} -> - {error, {ha_nodes_policy_params_not_array_of_longstr, Type}} - end; -check_ha_policy_arg({longstr, Policy}, _Args) -> - {error, {invalid_ha_policy, Policy}}; -check_ha_policy_arg({Type, _}, _Args) -> - {error, {unacceptable_type, Type}}. - list() -> mnesia:dirty_match_object(rabbit_queue, #amqqueue{_ = '_'}). @@ -613,6 +573,9 @@ set_ram_duration_target(QPid, Duration) -> set_maximum_since_use(QPid, Age) -> gen_server2:cast(QPid, {set_maximum_since_use, Age}). +start_mirroring(QPid) -> ok = delegate_call(QPid, start_mirroring). +stop_mirroring(QPid) -> ok = delegate_call(QPid, stop_mirroring). + on_node_down(Node) -> rabbit_misc:execute_mnesia_tx_with_tail( fun () -> QsDels = @@ -647,8 +610,7 @@ pseudo_queue(QueueName, Pid) -> auto_delete = false, arguments = [], pid = Pid, - slave_pids = [], - mirror_nodes = undefined}. + slave_pids = []}. deliver([], #delivery{mandatory = false}, _Flow) -> %% /dev/null optimisation @@ -661,29 +623,50 @@ deliver(Qs, Delivery = #delivery{mandatory = false}, Flow) -> %% returned. It is therefore safe to use a fire-and-forget cast %% here and return the QPids - the semantics is preserved. This %% scales much better than the case below. - QPids = qpids(Qs), + {MPids, SPids} = qpids(Qs), + QPids = MPids ++ SPids, case Flow of flow -> [credit_flow:send(QPid) || QPid <- QPids]; noflow -> ok end, - delegate:invoke_no_result( - QPids, fun (QPid) -> - gen_server2:cast(QPid, {deliver, Delivery, Flow}) - end), + + %% We let slaves know that they were being addressed as slaves at + %% the time - if they receive such a message from the channel + %% after they have become master they should mark the message as + %% 'delivered' since they do not know what the master may have + %% done with it. + MMsg = {deliver, Delivery, false, Flow}, + SMsg = {deliver, Delivery, true, Flow}, + delegate:invoke_no_result(MPids, + fun (QPid) -> gen_server2:cast(QPid, MMsg) end), + delegate:invoke_no_result(SPids, + fun (QPid) -> gen_server2:cast(QPid, SMsg) end), {routed, QPids}; deliver(Qs, Delivery, _Flow) -> - case delegate:invoke( - qpids(Qs), fun (QPid) -> - ok = gen_server2:call(QPid, {deliver, Delivery}, - infinity) - end) of - {[], _} -> {unroutable, []}; - {R , _} -> {routed, [QPid || {QPid, ok} <- R]} + {MPids, SPids} = qpids(Qs), + %% see comment above + MMsg = {deliver, Delivery, false}, + SMsg = {deliver, Delivery, true}, + {MRouted, _} = delegate:invoke( + MPids, fun (QPid) -> + ok = gen_server2:call(QPid, MMsg, infinity) + end), + {SRouted, _} = delegate:invoke( + SPids, fun (QPid) -> + ok = gen_server2:call(QPid, SMsg, infinity) + end), + case MRouted ++ SRouted of + [] -> {unroutable, []}; + R -> {routed, [QPid || {QPid, ok} <- R]} end. -qpids(Qs) -> lists:append([[QPid | SPids] || - #amqqueue{pid = QPid, slave_pids = SPids} <- Qs]). +qpids(Qs) -> + {MPids, SPids} = lists:foldl(fun (#amqqueue{pid = QPid, slave_pids = SPids}, + {MPidAcc, SPidAcc}) -> + {[QPid | MPidAcc], [SPids | SPidAcc]} + end, {[], []}, Qs), + {MPids, lists:append(SPids)}. safe_delegate_call_ok(F, Pids) -> {_, Bads} = delegate:invoke(Pids, fun (Pid) -> diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 10ac5bea60..7ce6958223 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -26,7 +26,7 @@ -export([start_link/1, info_keys/0]). --export([init_with_backing_queue_state/8]). +-export([init_with_backing_queue_state/7]). -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2, handle_pre_hibernate/1, prioritise_call/3, @@ -76,8 +76,8 @@ -spec(start_link/1 :: (rabbit_types:amqqueue()) -> rabbit_types:ok_pid_or_error()). -spec(info_keys/0 :: () -> rabbit_types:info_keys()). --spec(init_with_backing_queue_state/8 :: - (rabbit_types:amqqueue(), atom(), tuple(), any(), [any()], +-spec(init_with_backing_queue_state/7 :: + (rabbit_types:amqqueue(), atom(), tuple(), any(), [rabbit_types:delivery()], pmon:pmon(), dict()) -> #q{}). -endif. @@ -86,12 +86,14 @@ -define(STATISTICS_KEYS, [pid, + policy, exclusive_consumer_pid, exclusive_consumer_tag, messages_ready, messages_unacknowledged, messages, consumers, + active_consumers, memory, slave_pids, synchronised_slave_pids, @@ -144,7 +146,7 @@ init(Q) -> {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS, - RateTRef, AckTags, Deliveries, Senders, MTC) -> + RateTRef, Deliveries, Senders, MTC) -> case Owner of none -> ok; _ -> erlang:monitor(process, Owner) @@ -166,9 +168,7 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS, delayed_stop = undefined, queue_monitors = pmon:new(), msg_id_to_channel = MTC}, - State1 = requeue_and_run(AckTags, process_args( - rabbit_event:init_stats_timer( - State, #q.stats_timer))), + State1 = process_args(rabbit_event:init_stats_timer(State, #q.stats_timer)), lists:foldl(fun (Delivery, StateN) -> deliver_or_enqueue(Delivery, true, StateN) end, State1, Deliveries). @@ -179,7 +179,6 @@ terminate({shutdown, _} = R, State = #q{backing_queue = BQ}) -> terminate_shutdown(fun (BQS) -> BQ:terminate(R, BQS) end, State); terminate(Reason, State = #q{q = #amqqueue{name = QName}, backing_queue = BQ}) -> - %% FIXME: How do we cancel active subscriptions? terminate_shutdown( fun (BQS) -> BQS1 = BQ:delete_and_terminate(Reason, BQS), @@ -230,8 +229,7 @@ matches(false, Q1, Q2) -> Q1#amqqueue.exclusive_owner =:= Q2#amqqueue.exclusive_owner andalso Q1#amqqueue.arguments =:= Q2#amqqueue.arguments andalso Q1#amqqueue.pid =:= Q2#amqqueue.pid andalso - Q1#amqqueue.slave_pids =:= Q2#amqqueue.slave_pids andalso - Q1#amqqueue.mirror_nodes =:= Q2#amqqueue.mirror_nodes. + Q1#amqqueue.slave_pids =:= Q2#amqqueue.slave_pids. bq_init(BQ, Q, Recover) -> Self = self(), @@ -296,11 +294,11 @@ next_state(State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> timed -> {ensure_sync_timer(State1), 0 } end. -backing_queue_module(#amqqueue{arguments = Args}) -> - case rabbit_misc:table_lookup(Args, <<"x-ha-policy">>) of - undefined -> {ok, BQM} = application:get_env(backing_queue_module), - BQM; - _Policy -> rabbit_mirror_queue_master +backing_queue_module(Q) -> + case rabbit_mirror_queue_misc:is_mirrored(Q) of + false -> {ok, BQM} = application:get_env(backing_queue_module), + BQM; + true -> rabbit_mirror_queue_master end. ensure_sync_timer(State = #q{sync_timer_ref = undefined}) -> @@ -499,32 +497,21 @@ confirm_messages(MsgIds, State = #q{msg_id_to_channel = MTC}) -> rabbit_misc:gb_trees_foreach(fun rabbit_misc:confirm_to_sender/2, CMs), State#q{msg_id_to_channel = MTC1}. -should_confirm_message(#delivery{msg_seq_no = undefined}, _State) -> - never; -should_confirm_message(#delivery{sender = SenderPid, +send_or_record_confirm(#delivery{msg_seq_no = undefined}, State) -> + {never, State}; +send_or_record_confirm(#delivery{sender = SenderPid, msg_seq_no = MsgSeqNo, message = #basic_message { is_persistent = true, id = MsgId}}, - #q{q = #amqqueue{durable = true}}) -> - {eventually, SenderPid, MsgSeqNo, MsgId}; -should_confirm_message(#delivery{sender = SenderPid, - msg_seq_no = MsgSeqNo}, - _State) -> - {immediately, SenderPid, MsgSeqNo}. - -needs_confirming({eventually, _, _, _}) -> true; -needs_confirming(_) -> false. - -maybe_record_confirm_message({eventually, SenderPid, MsgSeqNo, MsgId}, - State = #q{msg_id_to_channel = MTC}) -> - State#q{msg_id_to_channel = - gb_trees:insert(MsgId, {SenderPid, MsgSeqNo}, MTC)}; -maybe_record_confirm_message({immediately, SenderPid, MsgSeqNo}, State) -> + State = #q{q = #amqqueue{durable = true}, + msg_id_to_channel = MTC}) -> + MTC1 = gb_trees:insert(MsgId, {SenderPid, MsgSeqNo}, MTC), + {eventually, State#q{msg_id_to_channel = MTC1}}; +send_or_record_confirm(#delivery{sender = SenderPid, + msg_seq_no = MsgSeqNo}, State) -> rabbit_misc:confirm_to_sender(SenderPid, [MsgSeqNo]), - State; -maybe_record_confirm_message(_Confirm, State) -> - State. + {immediately, State}. run_message_queue(State) -> State1 = #q{backing_queue = BQ, backing_queue_state = BQS} = @@ -546,46 +533,39 @@ attempt_delivery(#delivery{sender = SenderPid, message = Message}, Props, {{Message, Props#message_properties.delivered, AckTag}, true, State1#q{backing_queue_state = BQS3}} end, false, State#q{backing_queue_state = BQS1}); - {Duplicate, BQS1} -> - %% if the message has previously been seen by the BQ then - %% it must have been seen under the same circumstances as - %% now: i.e. if it is now a deliver_immediately then it - %% must have been before. - {case Duplicate of - published -> true; - discarded -> false - end, - State#q{backing_queue_state = BQS1}} + {published, BQS1} -> + {true, State#q{backing_queue_state = BQS1}}; + {discarded, BQS1} -> + {false, State#q{backing_queue_state = BQS1}} end. -deliver_or_enqueue(Delivery = #delivery{message = Message, - sender = SenderPid}, Delivered, - State) -> - Confirm = should_confirm_message(Delivery, State), +deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid}, + Delivered, State) -> + {Confirm, State1} = send_or_record_confirm(Delivery, State), Props = message_properties(Confirm, Delivered, State), - case attempt_delivery(Delivery, Props, State) of - {true, State1} -> - maybe_record_confirm_message(Confirm, State1); - %% the next one is an optimisations - %% TODO: optimise the Confirm =/= never case too - {false, State1 = #q{ttl = 0, dlx = undefined}} when Confirm == never -> - discard_delivery(Delivery, State1); - {false, State1} -> - State2 = #q{backing_queue = BQ, backing_queue_state = BQS} = - maybe_record_confirm_message(Confirm, State1), + case attempt_delivery(Delivery, Props, State1) of + {true, State2} -> + State2; + %% The next one is an optimisation + {false, State2 = #q{ttl = 0, dlx = undefined}} -> + %% fake an 'eventual' confirm from BQ; noop if not needed + State3 = #q{backing_queue = BQ, backing_queue_state = BQS} = + confirm_messages([Message#basic_message.id], State2), + BQS1 = BQ:discard(Message, SenderPid, BQS), + State3#q{backing_queue_state = BQS1}; + {false, State2 = #q{backing_queue = BQ, backing_queue_state = BQS}} -> BQS1 = BQ:publish(Message, Props, SenderPid, BQS), ensure_ttl_timer(Props#message_properties.expiry, State2#q{backing_queue_state = BQS1}) end. -requeue_and_run(AckTags, State = #q{backing_queue = BQ}) -> - run_backing_queue(BQ, fun (M, BQS) -> - {_MsgIds, BQS1} = M:requeue(AckTags, BQS), - BQS1 - end, State). +requeue_and_run(AckTags, State = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> + {_MsgIds, BQS1} = BQ:requeue(AckTags, BQS), + run_message_queue(State#q{backing_queue_state = BQS1}). -fetch(AckRequired, State = #q{backing_queue_state = BQS, - backing_queue = BQ}) -> +fetch(AckRequired, State = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> {Result, BQS1} = BQ:fetch(AckRequired, BQS), {Result, State#q{backing_queue_state = BQS1}}. @@ -679,12 +659,9 @@ maybe_send_reply(ChPid, Msg) -> ok = rabbit_channel:send_command(ChPid, Msg). qname(#q{q = #amqqueue{name = QName}}) -> QName. -backing_queue_timeout(State = #q{backing_queue = BQ}) -> - run_backing_queue(BQ, fun (M, BQS) -> M:timeout(BQS) end, State). - -run_backing_queue(Mod, Fun, State = #q{backing_queue = BQ, - backing_queue_state = BQS}) -> - run_message_queue(State#q{backing_queue_state = BQ:invoke(Mod, Fun, BQS)}). +backing_queue_timeout(State = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> + State#q{backing_queue_state = BQ:timeout(BQS)}. subtract_acks(ChPid, AckTags, State, Fun) -> case lookup_ch(ChPid) of @@ -696,15 +673,9 @@ subtract_acks(ChPid, AckTags, State, Fun) -> Fun(State) end. -discard_delivery(#delivery{sender = SenderPid, - message = Message}, - State = #q{backing_queue = BQ, - backing_queue_state = BQS}) -> - State#q{backing_queue_state = BQ:discard(Message, SenderPid, BQS)}. - message_properties(Confirm, Delivered, #q{ttl = TTL}) -> #message_properties{expiry = calculate_msg_expiry(TTL), - needs_confirming = needs_confirming(Confirm), + needs_confirming = Confirm == eventually, delivered = Delivered}. calculate_msg_expiry(undefined) -> undefined; @@ -897,6 +868,12 @@ i(owner_pid, #q{q = #amqqueue{exclusive_owner = none}}) -> ''; i(owner_pid, #q{q = #amqqueue{exclusive_owner = ExclusiveOwner}}) -> ExclusiveOwner; +i(policy, #q{q = #amqqueue{name = Name}}) -> + {ok, Q} = rabbit_amqqueue:lookup(Name), + case rabbit_policy:name(Q) of + none -> ''; + Policy -> Policy + end; i(exclusive_consumer_pid, #q{exclusive_consumer = none}) -> ''; i(exclusive_consumer_pid, #q{exclusive_consumer = {ChPid, _ConsumerTag}}) -> @@ -914,18 +891,24 @@ i(messages, State) -> messages_unacknowledged]]); i(consumers, _) -> consumer_count(); +i(active_consumers, _) -> + active_consumer_count(); i(memory, _) -> {memory, M} = process_info(self(), memory), M; i(slave_pids, #q{q = #amqqueue{name = Name}}) -> - case rabbit_amqqueue:lookup(Name) of - {ok, #amqqueue{mirror_nodes = undefined}} -> ''; - {ok, #amqqueue{slave_pids = SPids}} -> SPids + {ok, Q = #amqqueue{slave_pids = SPids}} = + rabbit_amqqueue:lookup(Name), + case rabbit_mirror_queue_misc:is_mirrored(Q) of + false -> ''; + true -> SPids end; i(synchronised_slave_pids, #q{q = #amqqueue{name = Name}}) -> - case rabbit_amqqueue:lookup(Name) of - {ok, #amqqueue{mirror_nodes = undefined}} -> ''; - {ok, #amqqueue{sync_slave_pids = SSPids}} -> SSPids + {ok, Q = #amqqueue{sync_slave_pids = SSPids}} = + rabbit_amqqueue:lookup(Name), + case rabbit_mirror_queue_misc:is_mirrored(Q) of + false -> ''; + true -> SSPids end; i(backing_queue_status, #q{backing_queue_state = BQS, backing_queue = BQ}) -> BQ:status(BQS); @@ -1030,10 +1013,10 @@ handle_call({info, Items}, _From, State) -> handle_call(consumers, _From, State) -> reply(consumers(State), State); -handle_call({deliver, Delivery}, From, State) -> +handle_call({deliver, Delivery, Delivered}, From, State) -> %% Synchronous, "mandatory" deliver mode. gen_server2:reply(From, ok), - noreply(deliver_or_enqueue(Delivery, false, State)); + noreply(deliver_or_enqueue(Delivery, Delivered, State)); handle_call({notify_down, ChPid}, From, State) -> %% we want to do this synchronously, so that auto_deleted queues @@ -1149,6 +1132,23 @@ handle_call({requeue, AckTags, ChPid}, From, State) -> ChPid, AckTags, State, fun (State1) -> requeue_and_run(AckTags, State1) end)); +handle_call(start_mirroring, _From, State = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> + %% lookup again to get policy for init_with_existing_bq + {ok, Q} = rabbit_amqqueue:lookup(qname(State)), + true = BQ =/= rabbit_mirror_queue_master, %% assertion + BQ1 = rabbit_mirror_queue_master, + BQS1 = BQ1:init_with_existing_bq(Q, BQ, BQS), + reply(ok, State#q{backing_queue = BQ1, + backing_queue_state = BQS1}); + +handle_call(stop_mirroring, _From, State = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> + BQ = rabbit_mirror_queue_master, %% assertion + {BQ1, BQS1} = BQ:stop_mirroring(BQS), + reply(ok, State#q{backing_queue = BQ1, + backing_queue_state = BQS1}); + handle_call(force_event_refresh, _From, State = #q{exclusive_consumer = Exclusive}) -> rabbit_event:notify(queue_created, infos(?CREATION_EVENT_KEYS, State)), @@ -1173,10 +1173,12 @@ handle_cast({confirm, MsgSeqNos, QPid}, State = #q{unconfirmed = UC}) -> handle_cast(_, State = #q{delayed_stop = DS}) when DS =/= undefined -> noreply(State); -handle_cast({run_backing_queue, Mod, Fun}, State) -> - noreply(run_backing_queue(Mod, Fun, State)); +handle_cast({run_backing_queue, Mod, Fun}, + State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> + noreply(run_message_queue( + State#q{backing_queue_state = BQ:invoke(Mod, Fun, BQS)})); -handle_cast({deliver, Delivery = #delivery{sender = Sender}, Flow}, +handle_cast({deliver, Delivery = #delivery{sender = Sender}, Delivered, Flow}, State = #q{senders = Senders}) -> %% Asynchronous, non-"mandatory" deliver mode. Senders1 = case Flow of @@ -1185,7 +1187,7 @@ handle_cast({deliver, Delivery = #delivery{sender = Sender}, Flow}, noflow -> Senders end, State1 = State#q{senders = Senders1}, - noreply(deliver_or_enqueue(Delivery, false, State1)); + noreply(deliver_or_enqueue(Delivery, Delivered, State1)); handle_cast({ack, AckTags, ChPid}, State) -> noreply(subtract_acks( diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index d69a6c3b98..c6d1778532 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -24,6 +24,7 @@ -type(ack() :: any()). -type(state() :: any()). +-type(msg_ids() :: [rabbit_types:msg_id()]). -type(fetch_result(Ack) :: ('empty' | %% Message, IsDelivered, AckTag, Remaining_Len @@ -117,7 +118,7 @@ %% first time the message id appears in the result of %% drain_confirmed. All subsequent appearances of that message id will %% be ignored. --callback drain_confirmed(state()) -> {[rabbit_guid:guid()], state()}. +-callback drain_confirmed(state()) -> {msg_ids(), state()}. %% Drop messages from the head of the queue while the supplied predicate returns %% true. Also accepts a boolean parameter that determines whether the messages @@ -136,7 +137,7 @@ %% Acktags supplied are for messages which can now be forgotten %% about. Must return 1 msg_id per Ack, in the same order as Acks. --callback ack([ack()], state()) -> {[rabbit_guid:guid()], state()}. +-callback ack([ack()], state()) -> {msg_ids(), state()}. %% Acktags supplied are for messages which should be processed. The %% provided callback function is called with each message. @@ -144,7 +145,7 @@ %% Reinsert messages into the queue which have already been delivered %% and were pending acknowledgement. --callback requeue([ack()], state()) -> {[rabbit_guid:guid()], state()}. +-callback requeue([ack()], state()) -> {msg_ids(), state()}. %% How long is my queue? -callback len(state()) -> non_neg_integer(). diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl index e75e1f6f7c..a3cbf6e53c 100644 --- a/src/rabbit_control_main.erl +++ b/src/rabbit_control_main.erl @@ -170,8 +170,8 @@ start() -> {error, Reason} -> print_error("~p", [Reason]), rabbit_misc:quit(2); - {parse_error, {_Line, Mod, Err}} -> - print_error("~s", [lists:flatten(Mod:format_error(Err))]), + {error_string, Reason} -> + print_error("~s", [Reason]), rabbit_misc:quit(2); {badrpc, {'EXIT', Reason}} -> print_error("~p", [Reason]), @@ -477,12 +477,14 @@ action(eval, Node, [Expr], _Opts, _Inform) -> Node, erl_eval, exprs, [Parsed, []]), io:format("~p~n", [Value]), ok; - {error, E} -> {parse_error, E} + {error, E} -> {error_string, format_parse_error(E)} end; {error, E, _} -> - {parse_error, E} + {error_string, format_parse_error(E)} end. +format_parse_error({_Line, Mod, Err}) -> lists:flatten(Mod:format_error(Err)). + %%---------------------------------------------------------------------------- wait_for_application(Node, PidFile, Application, Inform) -> diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index 4cc96ef552..a205b23d0b 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -298,7 +298,10 @@ i(durable, #exchange{durable = Durable}) -> Durable; i(auto_delete, #exchange{auto_delete = AutoDelete}) -> AutoDelete; i(internal, #exchange{internal = Internal}) -> Internal; i(arguments, #exchange{arguments = Arguments}) -> Arguments; -i(policy, X) -> rabbit_policy:name(X); +i(policy, X) -> case rabbit_policy:name(X) of + none -> ''; + Policy -> Policy + end; i(Item, _) -> throw({bad_argument, Item}). info(X = #exchange{}) -> infos(?INFO_KEYS, X). diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl index 4455b4419f..16690693f1 100644 --- a/src/rabbit_mirror_queue_coordinator.erl +++ b/src/rabbit_mirror_queue_coordinator.erl @@ -33,14 +33,14 @@ gm, monitors, death_fun, - length_fun + depth_fun }). -ifdef(use_specs). -spec(start_link/4 :: (rabbit_types:amqqueue(), pid() | 'undefined', rabbit_mirror_queue_master:death_fun(), - rabbit_mirror_queue_master:length_fun()) -> + rabbit_mirror_queue_master:depth_fun()) -> rabbit_types:ok_pid_or_error()). -spec(get_gm/1 :: (pid()) -> pid()). -spec(ensure_monitoring/2 :: (pid(), [pid()]) -> 'ok'). @@ -101,19 +101,25 @@ %% channel during a publish, only some of the mirrors may receive that %% publish. As a result of this problem, the messages broadcast over %% the gm contain published content, and thus slaves can operate -%% successfully on messages that they only receive via the gm. The key -%% purpose of also sending messages directly from the channels to the -%% slaves is that without this, in the event of the death of the -%% master, messages could be lost until a suitable slave is promoted. +%% successfully on messages that they only receive via the gm. %% -%% However, that is not the only reason. For example, if confirms are -%% in use, then there is no guarantee that every slave will see the -%% delivery with the same msg_seq_no. As a result, the slaves have to -%% wait until they've seen both the publish via gm, and the publish -%% via the channel before they have enough information to be able to -%% perform the publish to their own bq, and subsequently issue the -%% confirm, if necessary. Either form of publish can arrive first, and -%% a slave can be upgraded to the master at any point during this +%% The key purpose of also sending messages directly from the channels +%% to the slaves is that without this, in the event of the death of +%% the master, messages could be lost until a suitable slave is +%% promoted. However, that is not the only reason. A slave cannot send +%% confirms for a message until it has seen it from the +%% channel. Otherwise, it might send a confirm to a channel for a +%% message that it might *never* receive from that channel. This can +%% happen because new slaves join the gm ring (and thus receive +%% messages from the master) before inserting themselves in the +%% queue's mnesia record (which is what channels look at for routing). +%% As it turns out, channels will simply ignore such bogus confirms, +%% but relying on that would introduce a dangerously tight coupling. +%% +%% Hence the slaves have to wait until they've seen both the publish +%% via gm, and the publish via the channel before they issue the +%% confirm. Either form of publish can arrive first, and a slave can +%% be upgraded to the master at any point during this %% process. Confirms continue to be issued correctly, however. %% %% Because the slave is a full process, it impersonates parts of the @@ -154,8 +160,8 @@ %% be able to work out when their head does not differ from the master %% (and is much simpler and cheaper than getting the master to hang on %% to the guid of the msg at the head of its queue). When a slave is -%% promoted to a master, it unilaterally broadcasts its length, in -%% order to solve the problem of length requests from new slaves being +%% promoted to a master, it unilaterally broadcasts its depth, in +%% order to solve the problem of depth requests from new slaves being %% unanswered by a dead master. %% %% Obviously, due to the async nature of communication across gm, the @@ -297,15 +303,15 @@ %% if they have no mirrored content at all. This is not surprising: to %% achieve anything more sophisticated would require the master and %% recovering slave to be able to check to see whether they agree on -%% the last seen state of the queue: checking length alone is not +%% the last seen state of the queue: checking depth alone is not %% sufficient in this case. %% %% For more documentation see the comments in bug 23554. %% %%---------------------------------------------------------------------------- -start_link(Queue, GM, DeathFun, LengthFun) -> - gen_server2:start_link(?MODULE, [Queue, GM, DeathFun, LengthFun], []). +start_link(Queue, GM, DeathFun, DepthFun) -> + gen_server2:start_link(?MODULE, [Queue, GM, DeathFun, DepthFun], []). get_gm(CPid) -> gen_server2:call(CPid, get_gm, infinity). @@ -317,7 +323,7 @@ ensure_monitoring(CPid, Pids) -> %% gen_server %% --------------------------------------------------------------------------- -init([#amqqueue { name = QueueName } = Q, GM, DeathFun, LengthFun]) -> +init([#amqqueue { name = QueueName } = Q, GM, DeathFun, DepthFun]) -> GM1 = case GM of undefined -> {ok, GM2} = gm:start_link(QueueName, ?MODULE, [self()]), @@ -333,7 +339,7 @@ init([#amqqueue { name = QueueName } = Q, GM, DeathFun, LengthFun]) -> gm = GM1, monitors = pmon:new(), death_fun = DeathFun, - length_fun = LengthFun }, + depth_fun = DepthFun }, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. @@ -343,7 +349,7 @@ handle_call(get_gm, _From, State = #state { gm = GM }) -> handle_cast({gm_deaths, Deaths}, State = #state { q = #amqqueue { name = QueueName, pid = MPid } }) when node(MPid) =:= node() -> - case rabbit_mirror_queue_misc:remove_from_queue(QueueName, Deaths) of + case rabbit_mirror_queue_misc:remove_from_queue(QueueName, MPid, Deaths) of {ok, MPid, DeadPids} -> rabbit_mirror_queue_misc:report_deaths(MPid, true, QueueName, DeadPids), @@ -352,8 +358,8 @@ handle_cast({gm_deaths, Deaths}, {stop, normal, State} end; -handle_cast(request_length, State = #state { length_fun = LengthFun }) -> - ok = LengthFun(), +handle_cast(request_depth, State = #state { depth_fun = DepthFun }) -> + ok = DepthFun(), noreply(State); handle_cast({ensure_monitoring, Pids}, State = #state { monitors = Mons }) -> @@ -397,9 +403,7 @@ members_changed([_CPid], _Births, []) -> members_changed([CPid], _Births, Deaths) -> ok = gen_server2:cast(CPid, {gm_deaths, Deaths}). -handle_msg([_CPid], _From, master_changed) -> - ok; -handle_msg([CPid], _From, request_length = Msg) -> +handle_msg([CPid], _From, request_depth = Msg) -> ok = gen_server2:cast(CPid, Msg); handle_msg([CPid], _From, {ensure_monitoring, _Pids} = Msg) -> ok = gen_server2:cast(CPid, Msg); diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index 4cfb3dcbfa..377d51868d 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -25,7 +25,9 @@ -export([start/1, stop/0]). --export([promote_backing_queue_state/6, sender_death_fun/0, length_fun/0]). +-export([promote_backing_queue_state/7, sender_death_fun/0, depth_fun/0]). + +-export([init_with_existing_bq/3, stop_mirroring/1]). -behaviour(rabbit_backing_queue). @@ -44,10 +46,10 @@ -ifdef(use_specs). --export_type([death_fun/0, length_fun/0]). +-export_type([death_fun/0, depth_fun/0]). -type(death_fun() :: fun ((pid()) -> 'ok')). --type(length_fun() :: fun (() -> 'ok')). +-type(depth_fun() :: fun (() -> 'ok')). -type(master_state() :: #state { gm :: pid(), coordinator :: pid(), backing_queue :: atom(), @@ -59,10 +61,14 @@ known_senders :: set() }). --spec(promote_backing_queue_state/6 :: - (pid(), atom(), any(), pid(), dict(), [pid()]) -> master_state()). +-spec(promote_backing_queue_state/7 :: + (pid(), atom(), any(), pid(), [any()], dict(), [pid()]) -> + master_state()). -spec(sender_death_fun/0 :: () -> death_fun()). --spec(length_fun/0 :: () -> length_fun()). +-spec(depth_fun/0 :: () -> depth_fun()). +-spec(init_with_existing_bq/3 :: (rabbit_types:amqqueue(), atom(), any()) -> + master_state()). +-spec(stop_mirroring/1 :: (master_state()) -> {atom(), any()}). -endif. @@ -82,20 +88,27 @@ stop() -> %% Same as start/1. exit({not_valid_for_generic_backing_queue, ?MODULE}). -init(#amqqueue { name = QName, mirror_nodes = MNodes } = Q, Recover, - AsyncCallback) -> - {ok, CPid} = rabbit_mirror_queue_coordinator:start_link( - Q, undefined, sender_death_fun(), length_fun()), - GM = rabbit_mirror_queue_coordinator:get_gm(CPid), - MNodes1 = (case MNodes of - all -> rabbit_mnesia:cluster_nodes(all); - undefined -> []; - _ -> MNodes - end) -- [node()], - [rabbit_mirror_queue_misc:add_mirror(QName, Node) || Node <- MNodes1], +init(Q = #amqqueue{name = QName}, Recover, AsyncCallback) -> {ok, BQ} = application:get_env(backing_queue_module), BQS = BQ:init(Q, Recover, AsyncCallback), + State = #state{gm = GM} = init_with_existing_bq(Q, BQ, BQS), + {_MNode, SNodes} = rabbit_mirror_queue_misc:suggested_queue_nodes(Q), + rabbit_mirror_queue_misc:add_mirrors(QName, SNodes), ok = gm:broadcast(GM, {depth, BQ:depth(BQS)}), + State. + +init_with_existing_bq(Q = #amqqueue{name = QName}, BQ, BQS) -> + {ok, CPid} = rabbit_mirror_queue_coordinator:start_link( + Q, undefined, sender_death_fun(), depth_fun()), + GM = rabbit_mirror_queue_coordinator:get_gm(CPid), + Self = self(), + ok = rabbit_misc:execute_mnesia_transaction( + fun () -> + [Q1 = #amqqueue{gm_pids = GMPids}] + = mnesia:read({rabbit_queue, QName}), + ok = rabbit_amqqueue:store_queue( + Q1#amqqueue{gm_pids = [{GM, Self} | GMPids]}) + end), #state { gm = GM, coordinator = CPid, backing_queue = BQ, @@ -106,8 +119,16 @@ init(#amqqueue { name = QName, mirror_nodes = MNodes } = Q, Recover, ack_msg_id = dict:new(), known_senders = sets:new() }. +stop_mirroring(State = #state { coordinator = CPid, + backing_queue = BQ, + backing_queue_state = BQS }) -> + unlink(CPid), + stop_all_slaves(shutdown, State), + {BQ, BQS}. + terminate({shutdown, dropped} = Reason, - State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> + State = #state { backing_queue = BQ, + backing_queue_state = BQS }) -> %% Backing queue termination - this node has been explicitly %% dropped. Normally, non-durable queues would be tidied up on %% startup, but there's a possibility that we will be added back @@ -123,26 +144,30 @@ terminate(Reason, %% node. Thus just let some other slave take over. State #state { backing_queue_state = BQ:terminate(Reason, BQS) }. -delete_and_terminate(Reason, State = #state { gm = GM, - backing_queue = BQ, +delete_and_terminate(Reason, State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> + stop_all_slaves(Reason, State), + State #state { backing_queue_state = BQ:delete_and_terminate(Reason, BQS), + set_delivered = 0 }. + +stop_all_slaves(Reason, #state{gm = GM}) -> Info = gm:info(GM), Slaves = [Pid || Pid <- proplists:get_value(group_members, Info), node(Pid) =/= node()], MRefs = [erlang:monitor(process, S) || S <- Slaves], ok = gm:broadcast(GM, {delete_and_terminate, Reason}), - monitor_wait(MRefs), - ok = gm:forget_group(proplists:get_value(group_name, Info)), - State #state { backing_queue_state = BQ:delete_and_terminate(Reason, BQS), - set_delivered = 0 }. - -monitor_wait([]) -> - ok; -monitor_wait([MRef | MRefs]) -> - receive({'DOWN', MRef, process, _Pid, _Info}) -> - ok - end, - monitor_wait(MRefs). + [receive {'DOWN', MRef, process, _Pid, _Info} -> ok end || MRef <- MRefs], + %% Normally when we remove a slave another slave or master will + %% notice and update Mnesia. But we just removed them all, and + %% have stopped listening ourselves. So manually clean up. + QName = proplists:get_value(group_name, Info), + rabbit_misc:execute_mnesia_transaction( + fun () -> + [Q] = mnesia:read({rabbit_queue, QName}), + rabbit_mirror_queue_misc:store_updated_slaves( + Q #amqqueue { gm_pids = [], slave_pids = [] }) + end), + ok = gm:forget_group(QName). purge(State = #state { gm = GM, backing_queue = BQ, @@ -362,8 +387,10 @@ discard(Msg = #basic_message { id = MsgId }, ChPid, case dict:find(MsgId, SS) of error -> ok = gm:broadcast(GM, {discard, ChPid, Msg}), - State #state { backing_queue_state = BQ:discard(Msg, ChPid, BQS), - seen_status = dict:erase(MsgId, SS) }; + ensure_monitoring( + ChPid, State #state { + backing_queue_state = BQ:discard(Msg, ChPid, BQS), + seen_status = dict:erase(MsgId, SS) }); {ok, discarded} -> State end. @@ -372,13 +399,16 @@ discard(Msg = #basic_message { id = MsgId }, ChPid, %% Other exported functions %% --------------------------------------------------------------------------- -promote_backing_queue_state(CPid, BQ, BQS, GM, SeenStatus, KS) -> - Len = BQ:len(BQS), - ok = gm:broadcast(GM, {depth, BQ:depth(BQS)}), +promote_backing_queue_state(CPid, BQ, BQS, GM, AckTags, SeenStatus, KS) -> + {_MsgIds, BQS1} = BQ:requeue(AckTags, BQS), + Len = BQ:len(BQS1), + Depth = BQ:depth(BQS1), + true = Len == Depth, %% ASSERTION: everything must have been requeued + ok = gm:broadcast(GM, {depth, Depth}), #state { gm = GM, coordinator = CPid, backing_queue = BQ, - backing_queue_state = BQS, + backing_queue_state = BQS1, set_delivered = Len, seen_status = SeenStatus, confirmed = [], @@ -397,7 +427,7 @@ sender_death_fun() -> end) end. -length_fun() -> +depth_fun() -> Self = self(), fun () -> rabbit_amqqueue:run_backing_queue( @@ -410,10 +440,8 @@ length_fun() -> end) end. -maybe_store_acktag(undefined, _MsgId, AM) -> - AM; -maybe_store_acktag(AckTag, MsgId, AM) -> - dict:store(AckTag, MsgId, AM). +maybe_store_acktag(undefined, _MsgId, AM) -> AM; +maybe_store_acktag(AckTag, MsgId, AM) -> dict:store(AckTag, MsgId, AM). ensure_monitoring(ChPid, State = #state { coordinator = CPid, known_senders = KS }) -> diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index 89e334ddd2..4c8406d9cf 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -16,9 +16,12 @@ -module(rabbit_mirror_queue_misc). --export([remove_from_queue/2, on_node_up/0, - drop_mirror/2, drop_mirror/3, add_mirror/2, add_mirror/3, - report_deaths/4, store_updated_slaves/1]). +-export([remove_from_queue/3, on_node_up/0, add_mirrors/2, add_mirror/2, + report_deaths/4, store_updated_slaves/1, suggested_queue_nodes/1, + is_mirrored/1, update_mirrors/2]). + +%% for testing only +-export([suggested_queue_nodes/4]). -include("rabbit.hrl"). @@ -26,19 +29,21 @@ -ifdef(use_specs). --spec(remove_from_queue/2 :: - (rabbit_amqqueue:name(), [pid()]) +-spec(remove_from_queue/3 :: + (rabbit_amqqueue:name(), pid(), [pid()]) -> {'ok', pid(), [pid()]} | {'error', 'not_found'}). -spec(on_node_up/0 :: () -> 'ok'). --spec(drop_mirror/2 :: - (rabbit_amqqueue:name(), node()) -> rabbit_types:ok_or_error(any())). +-spec(add_mirrors/2 :: (rabbit_amqqueue:name(), [node()]) -> 'ok'). -spec(add_mirror/2 :: - (rabbit_amqqueue:name(), node()) -> rabbit_types:ok_or_error(any())). --spec(add_mirror/3 :: - (rabbit_types:vhost(), binary(), atom()) - -> rabbit_types:ok_or_error(any())). + (rabbit_amqqueue:name(), node()) -> + {'ok', atom()} | rabbit_types:error(any())). -spec(store_updated_slaves/1 :: (rabbit_types:amqqueue()) -> rabbit_types:amqqueue()). +-spec(suggested_queue_nodes/1 :: (rabbit_types:amqqueue()) -> + {node(), [node()]}). +-spec(is_mirrored/1 :: (rabbit_types:amqqueue()) -> boolean()). +-spec(update_mirrors/2 :: + (rabbit_types:amqqueue(), rabbit_types:amqqueue()) -> 'ok'). -endif. @@ -52,8 +57,7 @@ %% slave (now master) receives messages it's not ready for (for %% example, new consumers). %% Returns {ok, NewMPid, DeadPids} -remove_from_queue(QueueName, DeadPids) -> - DeadNodes = [node(DeadPid) || DeadPid <- DeadPids], +remove_from_queue(QueueName, Self, DeadGMPids) -> rabbit_misc:execute_mnesia_transaction( fun () -> %% Someone else could have deleted the queue before we @@ -61,20 +65,27 @@ remove_from_queue(QueueName, DeadPids) -> case mnesia:read({rabbit_queue, QueueName}) of [] -> {error, not_found}; [Q = #amqqueue { pid = QPid, - slave_pids = SPids }] -> - [QPid1 | SPids1] = Alive = - [Pid || Pid <- [QPid | SPids], - not lists:member(node(Pid), DeadNodes)], + slave_pids = SPids, + gm_pids = GMPids }] -> + {Dead, GMPids1} = lists:partition( + fun ({GM, _}) -> + lists:member(GM, DeadGMPids) + end, GMPids), + DeadPids = [Pid || {_GM, Pid} <- Dead], + Alive = [QPid | SPids] -- DeadPids, + {QPid1, SPids1} = promote_slave(Alive), case {{QPid, SPids}, {QPid1, SPids1}} of {Same, Same} -> + GMPids = GMPids1, %% ASSERTION {ok, QPid1, []}; - _ when QPid =:= QPid1 orelse node(QPid1) =:= node() -> + _ when QPid =:= QPid1 orelse QPid1 =:= Self -> %% Either master hasn't changed, so %% we're ok to update mnesia; or we have %% become the master. store_updated_slaves( Q #amqqueue { pid = QPid1, - slave_pids = SPids1 }), + slave_pids = SPids1, + gm_pids = GMPids1 }), {ok, QPid1, [QPid | SPids] -- Alive}; _ -> %% Master has changed, and we're not it, @@ -87,32 +98,41 @@ remove_from_queue(QueueName, DeadPids) -> end). on_node_up() -> - Qs = + QNames = rabbit_misc:execute_mnesia_transaction( fun () -> mnesia:foldl( - fun (#amqqueue { mirror_nodes = undefined }, QsN) -> - QsN; - (#amqqueue { name = QName, - mirror_nodes = all }, QsN) -> - [QName | QsN]; - (#amqqueue { name = QName, - mirror_nodes = MNodes }, QsN) -> - case lists:member(node(), MNodes) of - true -> [QName | QsN]; - false -> QsN + fun (Q = #amqqueue{name = QName, + pid = Pid, + slave_pids = SPids}, QNames0) -> + %% We don't want to pass in the whole + %% cluster - we don't want a situation + %% where starting one node causes us to + %% decide to start a mirror on another + PossibleNodes0 = [node(P) || P <- [Pid | SPids]], + PossibleNodes = + case lists:member(node(), PossibleNodes0) of + true -> PossibleNodes0; + false -> [node() | PossibleNodes0] + end, + {_MNode, SNodes} = suggested_queue_nodes( + Q, PossibleNodes), + case lists:member(node(), SNodes) of + true -> [QName | QNames0]; + false -> QNames0 end end, [], rabbit_queue) end), - [add_mirror(Q, node()) || Q <- Qs], + [{ok, _} = add_mirror(QName, node()) || QName <- QNames], ok. -drop_mirror(VHostPath, QueueName, MirrorNode) -> - drop_mirror(rabbit_misc:r(VHostPath, queue, QueueName), MirrorNode). +drop_mirrors(QName, Nodes) -> + [ok = drop_mirror(QName, Node) || Node <- Nodes], + ok. -drop_mirror(Queue, MirrorNode) -> +drop_mirror(QName, MirrorNode) -> if_mirrored_queue( - Queue, + QName, fun (#amqqueue { name = Name, pid = QPid, slave_pids = SPids }) -> case [Pid || Pid <- [QPid | SPids], node(Pid) =:= MirrorNode] of [] -> @@ -128,56 +148,61 @@ drop_mirror(Queue, MirrorNode) -> end end). -add_mirror(VHostPath, QueueName, MirrorNode) -> - add_mirror(rabbit_misc:r(VHostPath, queue, QueueName), MirrorNode). +add_mirrors(QName, Nodes) -> + [{ok, _} = add_mirror(QName, Node) || Node <- Nodes], + ok. -add_mirror(Queue, MirrorNode) -> +add_mirror(QName, MirrorNode) -> if_mirrored_queue( - Queue, + QName, fun (#amqqueue { name = Name, pid = QPid, slave_pids = SPids } = Q) -> case [Pid || Pid <- [QPid | SPids], node(Pid) =:= MirrorNode] of [] -> start_child(Name, MirrorNode, Q); [SPid] -> case rabbit_misc:is_process_alive(SPid) of - true -> - {error,{queue_already_mirrored_on_node, - MirrorNode}}; - false -> - start_child(Name, MirrorNode, Q) + true -> {ok, already_mirrored}; + false -> start_child(Name, MirrorNode, Q) end end end). start_child(Name, MirrorNode, Q) -> - case rabbit_mirror_queue_slave_sup:start_child(MirrorNode, [Q]) of + case rabbit_misc:with_exit_handler( + rabbit_misc:const({ok, down}), + fun () -> + rabbit_mirror_queue_slave_sup:start_child(MirrorNode, [Q]) + end) of {ok, undefined} -> %% this means the mirror process was %% already running on the given node. - ok; + {ok, already_mirrored}; + {ok, down} -> + %% Node went down between us deciding to start a mirror + %% and actually starting it. Which is fine. + {ok, node_down}; {ok, SPid} -> rabbit_log:info("Adding mirror of ~s on node ~p: ~p~n", [rabbit_misc:rs(Name), MirrorNode, SPid]), - ok; + {ok, started}; {error, {{stale_master_pid, StalePid}, _}} -> rabbit_log:warning("Detected stale HA master while adding " "mirror of ~s on node ~p: ~p~n", [rabbit_misc:rs(Name), MirrorNode, StalePid]), - ok; + {ok, stale_master}; {error, {{duplicate_live_master, _}=Err, _}} -> - throw(Err); + Err; Other -> Other end. -if_mirrored_queue(Queue, Fun) -> - rabbit_amqqueue:with( - Queue, fun (#amqqueue { arguments = Args } = Q) -> - case rabbit_misc:table_lookup(Args, <<"x-ha-policy">>) of - undefined -> ok; - _ -> Fun(Q) - end - end). +if_mirrored_queue(QName, Fun) -> + rabbit_amqqueue:with(QName, fun (Q) -> + case is_mirrored(Q) of + false -> ok; + true -> Fun(Q) + end + end). report_deaths(_MirrorPid, _IsMaster, _QueueName, []) -> ok; @@ -201,3 +226,105 @@ store_updated_slaves(Q = #amqqueue{slave_pids = SPids, %% Wake it up so that we emit a stats event rabbit_amqqueue:wake_up(Q1), Q1. + +%%---------------------------------------------------------------------------- + +promote_slave([SPid | SPids]) -> + %% The slave pids are maintained in descending order of age, so + %% the one to promote is the oldest. + {SPid, SPids}. + +suggested_queue_nodes(Q) -> + suggested_queue_nodes(Q, rabbit_mnesia:cluster_nodes(running)). + +%% This variant exists so we can pull a call to +%% rabbit_mnesia:cluster_nodes(running) out of a loop or +%% transaction or both. +suggested_queue_nodes(Q, PossibleNodes) -> + {MNode0, SNodes} = actual_queue_nodes(Q), + MNode = case MNode0 of + none -> node(); + _ -> MNode0 + end, + suggested_queue_nodes(policy(<<"ha-mode">>, Q), policy(<<"ha-params">>, Q), + {MNode, SNodes}, PossibleNodes). + +policy(Policy, Q) -> + case rabbit_policy:get(Policy, Q) of + {ok, P} -> P; + _ -> none + end. + +suggested_queue_nodes(<<"all">>, _Params, {MNode, _SNodes}, Possible) -> + {MNode, Possible -- [MNode]}; +suggested_queue_nodes(<<"nodes">>, Nodes0, {MNode, _SNodes}, Possible) -> + Nodes = [list_to_atom(binary_to_list(Node)) || Node <- Nodes0], + Unavailable = Nodes -- Possible, + Available = Nodes -- Unavailable, + case Available of + [] -> %% We have never heard of anything? Not much we can do but + %% keep the master alive. + {MNode, []}; + _ -> case lists:member(MNode, Available) of + true -> {MNode, Available -- [MNode]}; + false -> promote_slave(Available) + end + end; +%% When we need to add nodes, we randomise our candidate list as a +%% crude form of load-balancing. TODO it would also be nice to +%% randomise the list of ones to remove when we have too many - but +%% that would fail to take account of synchronisation... +suggested_queue_nodes(<<"exactly">>, Count, {MNode, SNodes}, Possible) -> + SCount = Count - 1, + {MNode, case SCount > length(SNodes) of + true -> Cand = shuffle((Possible -- [MNode]) -- SNodes), + SNodes ++ lists:sublist(Cand, SCount - length(SNodes)); + false -> lists:sublist(SNodes, SCount) + end}; +suggested_queue_nodes(_, _, {MNode, _}, _) -> + {MNode, []}. + +shuffle(L) -> + {A1,A2,A3} = now(), + random:seed(A1, A2, A3), + {_, L1} = lists:unzip(lists:keysort(1, [{random:uniform(), N} || N <- L])), + L1. + +actual_queue_nodes(#amqqueue{pid = MPid, slave_pids = SPids}) -> + {case MPid of + none -> none; + _ -> node(MPid) + end, [node(Pid) || Pid <- SPids]}. + +is_mirrored(Q) -> + case policy(<<"ha-mode">>, Q) of + <<"all">> -> true; + <<"nodes">> -> true; + <<"exactly">> -> true; + _ -> false + end. + + +%% [1] - rabbit_amqqueue:start_mirroring/1 will turn unmirrored to +%% master and start any needed slaves. However, if node(QPid) is not +%% in the nodes for the policy, it won't switch it. So this is for the +%% case where we kill the existing queue and restart elsewhere. TODO: +%% is this TRTTD? All alternatives seem ugly. +update_mirrors(OldQ = #amqqueue{pid = QPid}, + NewQ = #amqqueue{pid = QPid}) -> + case {is_mirrored(OldQ), is_mirrored(NewQ)} of + {false, false} -> ok; + {true, false} -> rabbit_amqqueue:stop_mirroring(QPid); + {false, true} -> rabbit_amqqueue:start_mirroring(QPid), + update_mirrors0(OldQ, NewQ); %% [1] + {true, true} -> update_mirrors0(OldQ, NewQ) + end. + +update_mirrors0(OldQ = #amqqueue{name = QName}, + NewQ = #amqqueue{name = QName}) -> + All = fun ({A,B}) -> [A|B] end, + OldNodes = All(actual_queue_nodes(OldQ)), + NewNodes = All(suggested_queue_nodes(NewQ)), + add_mirrors(QName, NewNodes -- OldNodes), + drop_mirrors(QName, OldNodes -- NewNodes), + ok. diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 625bcdffba..3d8bd8b408 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -64,7 +64,6 @@ -record(state, { q, gm, - master_pid, backing_queue, backing_queue_state, sync_timer_ref, @@ -72,7 +71,6 @@ sender_queues, %% :: Pid -> {Q Msg, Set MsgId} msg_id_ack, %% :: MsgId -> AckTag - ack_num, msg_id_status, known_senders, @@ -88,7 +86,7 @@ set_maximum_since_use(QPid, Age) -> info(QPid) -> gen_server2:call(QPid, info, infinity). -init(#amqqueue { name = QueueName } = Q) -> +init(Q = #amqqueue { name = QName }) -> %% We join the GM group before we add ourselves to the amqqueue %% record. As a result: %% 1. We can receive msgs from GM that correspond to messages we will @@ -101,23 +99,23 @@ init(#amqqueue { name = QueueName } = Q) -> %% above. %% process_flag(trap_exit, true), %% amqqueue_process traps exits too. - {ok, GM} = gm:start_link(QueueName, ?MODULE, [self()]), + {ok, GM} = gm:start_link(QName, ?MODULE, [self()]), receive {joined, GM} -> ok end, Self = self(), Node = node(), case rabbit_misc:execute_mnesia_transaction( - fun() -> init_it(Self, Node, QueueName) end) of - {new, MPid} -> - erlang:monitor(process, MPid), + fun() -> init_it(Self, GM, Node, QName) end) of + {new, QPid} -> + erlang:monitor(process, QPid), ok = file_handle_cache:register_callback( rabbit_amqqueue, set_maximum_since_use, [Self]), ok = rabbit_memory_monitor:register( Self, {rabbit_amqqueue, set_ram_duration_target, [Self]}), {ok, BQ} = application:get_env(backing_queue_module), - BQS = bq_init(BQ, Q, false), - State = #state { q = Q, + Q1 = Q #amqqueue { pid = QPid }, + BQS = bq_init(BQ, Q1, false), + State = #state { q = Q1, gm = GM, - master_pid = MPid, backing_queue = BQ, backing_queue_state = BQS, rate_timer_ref = undefined, @@ -125,7 +123,6 @@ init(#amqqueue { name = QueueName } = Q) -> sender_queues = dict:new(), msg_id_ack = dict:new(), - ack_num = 0, msg_id_status = dict:new(), known_senders = pmon:new(), @@ -134,7 +131,7 @@ init(#amqqueue { name = QueueName } = Q) -> }, rabbit_event:notify(queue_slave_created, infos(?CREATION_EVENT_KEYS, State)), - ok = gm:broadcast(GM, request_length), + ok = gm:broadcast(GM, request_depth), {ok, State, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}; @@ -143,16 +140,15 @@ init(#amqqueue { name = QueueName } = Q) -> duplicate_live_master -> {stop, {duplicate_live_master, Node}}; existing -> + gm:leave(GM), ignore end. -init_it(Self, Node, QueueName) -> - [Q1 = #amqqueue { pid = QPid, slave_pids = MPids }] = - mnesia:read({rabbit_queue, QueueName}), - case [Pid || Pid <- [QPid | MPids], node(Pid) =:= Node] of - [] -> MPids1 = MPids ++ [Self], - rabbit_mirror_queue_misc:store_updated_slaves( - Q1#amqqueue{slave_pids = MPids1}), +init_it(Self, GM, Node, QName) -> + [Q = #amqqueue { pid = QPid, slave_pids = SPids, gm_pids = GMPids }] = + mnesia:read({rabbit_queue, QName}), + case [Pid || Pid <- [QPid | SPids], node(Pid) =:= Node] of + [] -> add_slave(Q, Self, GM), {new, QPid}; [QPid] -> case rabbit_misc:is_process_alive(QPid) of true -> duplicate_live_master; @@ -160,48 +156,50 @@ init_it(Self, Node, QueueName) -> end; [SPid] -> case rabbit_misc:is_process_alive(SPid) of true -> existing; - false -> MPids1 = (MPids -- [SPid]) ++ [Self], - rabbit_mirror_queue_misc:store_updated_slaves( - Q1#amqqueue{slave_pids = MPids1}), + false -> Q1 = Q#amqqueue { + slave_pids = SPids -- [SPid], + gm_pids = [T || T = {_, S} <- GMPids, + S =/= SPid] }, + add_slave(Q1, Self, GM), {new, QPid} end end. -handle_call({deliver, Delivery}, From, State) -> +%% Add to the end, so they are in descending order of age, see +%% rabbit_mirror_queue_misc:promote_slave/1 +add_slave(Q = #amqqueue { slave_pids = SPids, gm_pids = GMPids }, New, GM) -> + rabbit_mirror_queue_misc:store_updated_slaves( + Q#amqqueue{slave_pids = SPids ++ [New], gm_pids = [{GM, New} | GMPids]}). + +handle_call({deliver, Delivery, true}, From, State) -> %% Synchronous, "mandatory" deliver mode. gen_server2:reply(From, ok), noreply(maybe_enqueue_message(Delivery, State)); handle_call({gm_deaths, Deaths}, From, - State = #state { q = #amqqueue { name = QueueName }, - gm = GM, - master_pid = MPid }) -> - %% The GM has told us about deaths, which means we're not going to - %% receive any more messages from GM - case rabbit_mirror_queue_misc:remove_from_queue(QueueName, Deaths) of + State = #state { q = Q = #amqqueue { name = QName, pid = MPid }}) -> + Self = self(), + case rabbit_mirror_queue_misc:remove_from_queue(QName, Self, Deaths) of {error, not_found} -> gen_server2:reply(From, ok), {stop, normal, State}; {ok, Pid, DeadPids} -> - rabbit_mirror_queue_misc:report_deaths(self(), false, QueueName, + rabbit_mirror_queue_misc:report_deaths(Self, false, QName, DeadPids), - if node(Pid) =:= node(MPid) -> + case Pid of + MPid -> %% master hasn't changed - reply(ok, State); - node(Pid) =:= node() -> + gen_server2:reply(From, ok), + noreply(State); + Self -> %% we've become master - promote_me(From, State); - true -> - %% master has changed to not us. + QueueState = promote_me(From, State), + {become, rabbit_amqqueue_process, QueueState, hibernate}; + _ -> + %% master has changed to not us gen_server2:reply(From, ok), erlang:monitor(process, Pid), - %% GM is lazy. So we know of the death of the - %% slave since it is a neighbour of ours, but - %% until a message is sent, not all members will - %% know. That might include the new master. So - %% broadcast a no-op message to wake everyone up. - ok = gm:broadcast(GM, master_changed), - noreply(State #state { master_pid = Pid }) + noreply(State #state { q = Q #amqqueue { pid = Pid } }) end end; @@ -214,7 +212,8 @@ handle_cast({run_backing_queue, Mod, Fun}, State) -> handle_cast({gm, Instruction}, State) -> handle_process_result(process_instruction(Instruction, State)); -handle_cast({deliver, Delivery = #delivery{sender = Sender}, Flow}, State) -> +handle_cast({deliver, Delivery = #delivery{sender = Sender}, true, Flow}, + State) -> %% Asynchronous, non-"mandatory", deliver mode. case Flow of flow -> credit_flow:ack(Sender); @@ -250,8 +249,8 @@ handle_info(timeout, State) -> noreply(backing_queue_timeout(State)); handle_info({'DOWN', _MonitorRef, process, MPid, _Reason}, - State = #state { gm = GM, master_pid = MPid }) -> - ok = gm:broadcast(GM, {process_death, MPid}), + State = #state { gm = GM, q = #amqqueue { pid = MPid } }) -> + ok = gm:broadcast(GM, process_death), noreply(State); handle_info({'DOWN', _MonitorRef, process, ChPid, _Reason}, State) -> @@ -287,7 +286,7 @@ terminate(Reason, #state { q = Q, rate_timer_ref = RateTRef }) -> ok = gm:leave(GM), QueueState = rabbit_amqqueue_process:init_with_backing_queue_state( - Q, BQ, BQS, RateTRef, [], [], pmon:new(), dict:new()), + Q, BQ, BQS, RateTRef, [], pmon:new(), dict:new()), rabbit_amqqueue_process:terminate(Reason, QueueState); terminate([_SPid], _Reason) -> %% gm case @@ -338,16 +337,21 @@ joined([SPid], _Members) -> SPid ! {joined, self()}, ok. members_changed([_SPid], _Births, []) -> ok; members_changed([ SPid], _Births, Deaths) -> inform_deaths(SPid, Deaths). -handle_msg([_SPid], _From, master_changed) -> - ok; -handle_msg([_SPid], _From, request_length) -> +handle_msg([_SPid], _From, request_depth) -> %% This is only of value to the master ok; handle_msg([_SPid], _From, {ensure_monitoring, _Pid}) -> %% This is only of value to the master ok; -handle_msg([SPid], _From, {process_death, Pid}) -> - inform_deaths(SPid, [Pid]); +handle_msg([_SPid], _From, process_death) -> + %% Since GM is by nature lazy we need to make sure there is some + %% traffic when a master dies, to make sure we get informed of the + %% death. That's all process_death does, create some traffic. We + %% must not take any notice of the master death here since it + %% comes without ordering guarantees - there could still be + %% messages from the master we have yet to receive. When we get + %% members_changed, then there will be no more messages. + ok; handle_msg([CPid], _From, {delete_and_terminate, _Reason} = Msg) -> ok = gen_server2:cast(CPid, {gm, Msg}), {stop, {shutdown, ring_shutdown}}; @@ -368,7 +372,7 @@ infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items]. i(pid, _State) -> self(); i(name, #state { q = #amqqueue { name = Name } }) -> Name; -i(master_pid, #state { master_pid = MPid }) -> MPid; +i(master_pid, #state { q = #amqqueue { pid = MPid } }) -> MPid; i(is_synchronised, #state { depth_delta = DD }) -> DD =:= 0; i(Item, _State) -> throw({bad_argument, Item}). @@ -387,14 +391,20 @@ run_backing_queue(Mod, Fun, State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> State #state { backing_queue_state = BQ:invoke(Mod, Fun, BQS) }. -needs_confirming(#delivery{ msg_seq_no = undefined }, _State) -> - never; -needs_confirming(#delivery { message = #basic_message { - is_persistent = true } }, - #state { q = #amqqueue { durable = true } }) -> - eventually; -needs_confirming(_Delivery, _State) -> - immediately. +send_or_record_confirm(_, #delivery{ msg_seq_no = undefined }, MS, _State) -> + MS; +send_or_record_confirm(published, #delivery { sender = ChPid, + msg_seq_no = MsgSeqNo, + message = #basic_message { + id = MsgId, + is_persistent = true } }, + MS, #state { q = #amqqueue { durable = true } }) -> + dict:store(MsgId, {published, ChPid, MsgSeqNo} , MS); +send_or_record_confirm(_Status, #delivery { sender = ChPid, + msg_seq_no = MsgSeqNo }, + MS, _State) -> + ok = rabbit_misc:confirm_to_sender(ChPid, [MsgSeqNo]), + MS. confirm_messages(MsgIds, State = #state { msg_id_status = MS }) -> {CMs, MS1} = @@ -406,16 +416,16 @@ confirm_messages(MsgIds, State = #state { msg_id_status = MS }) -> %% If it needed confirming, it'll have %% already been done. Acc; - {ok, {published, ChPid}} -> + {ok, published} -> %% Still not seen it from the channel, just %% record that it's been confirmed. - {CMsN, dict:store(MsgId, {confirmed, ChPid}, MSN)}; + {CMsN, dict:store(MsgId, confirmed, MSN)}; {ok, {published, ChPid, MsgSeqNo}} -> %% Seen from both GM and Channel. Can now %% confirm. {rabbit_misc:gb_trees_cons(ChPid, MsgSeqNo, CMsN), dict:erase(MsgId, MSN)}; - {ok, {confirmed, _ChPid}} -> + {ok, confirmed} -> %% It's already been confirmed. This is %% probably it's been both sync'd to disk %% and then delivered and ack'd before we've @@ -444,12 +454,9 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName }, Q1 = Q #amqqueue { pid = self() }, {ok, CPid} = rabbit_mirror_queue_coordinator:start_link( Q1, GM, rabbit_mirror_queue_master:sender_death_fun(), - rabbit_mirror_queue_master:length_fun()), + rabbit_mirror_queue_master:depth_fun()), true = unlink(GM), gen_server2:reply(From, {promote, CPid}), - %% TODO this has been in here since the beginning, but it's not - %% obvious if it is needed. Investigate... - ok = gm:confirmed_broadcast(GM, master_changed), %% Everything that we're monitoring, we need to ensure our new %% coordinator is monitoring. @@ -485,18 +492,18 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName }, %% %% MS contains the following three entry types: %% - %% a) {published, ChPid}: + %% a) published: %% published via gm only; pending arrival of publication from %% channel, maybe pending confirm. %% %% b) {published, ChPid, MsgSeqNo}: %% published via gm and channel; pending confirm. %% - %% c) {confirmed, ChPid}: + %% c) confirmed: %% published via gm only, and confirmed; pending publication %% from channel. %% - %% d) discarded + %% d) discarded: %% seen via gm only as discarded. Pending publication from %% channel %% @@ -513,34 +520,24 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName }, %% those messages are then requeued. However, as discussed above, %% this does not affect MS, nor which bits go through to SS in %% Master, or MTC in queue_process. - %% - %% Everything that's in MA gets requeued. Consequently the new - %% master should start with a fresh AM as there are no messages - %% pending acks. - MSList = dict:to_list(MS), - SS = dict:from_list( - [E || E = {_MsgId, discarded} <- MSList] ++ - [{MsgId, Status} - || {MsgId, {Status, _ChPid}} <- MSList, - Status =:= published orelse Status =:= confirmed]), + St = [published, confirmed, discarded], + SS = dict:filter(fun (_MsgId, Status) -> lists:member(Status, St) end, MS), + AckTags = [AckTag || {_MsgId, AckTag} <- dict:to_list(MA)], MasterState = rabbit_mirror_queue_master:promote_backing_queue_state( - CPid, BQ, BQS, GM, SS, MPids), - - MTC = lists:foldl(fun ({MsgId, {published, ChPid, MsgSeqNo}}, MTC0) -> - gb_trees:insert(MsgId, {ChPid, MsgSeqNo}, MTC0); - (_, MTC0) -> - MTC0 - end, gb_trees:empty(), MSList), - NumAckTags = [NumAckTag || {_MsgId, NumAckTag} <- dict:to_list(MA)], - AckTags = [AckTag || {_Num, AckTag} <- lists:sort(NumAckTags)], + CPid, BQ, BQS, GM, AckTags, SS, MPids), + + MTC = dict:fold(fun (MsgId, {published, ChPid, MsgSeqNo}, MTC0) -> + gb_trees:insert(MsgId, {ChPid, MsgSeqNo}, MTC0); + (_Msgid, _Status, MTC0) -> + MTC0 + end, gb_trees:empty(), MS), Deliveries = [Delivery || {_ChPid, {PubQ, _PendCh}} <- dict:to_list(SQ), Delivery <- queue:to_list(PubQ)], - QueueState = rabbit_amqqueue_process:init_with_backing_queue_state( - Q1, rabbit_mirror_queue_master, MasterState, RateTRef, - AckTags, Deliveries, KS, MTC), - {become, rabbit_amqqueue_process, QueueState, hibernate}. + rabbit_amqqueue_process:init_with_backing_queue_state( + Q1, rabbit_mirror_queue_master, MasterState, RateTRef, Deliveries, KS, + MTC). noreply(State) -> {NewState, Timeout} = next_state(State), @@ -634,9 +631,8 @@ confirm_sender_death(Pid) -> ok. maybe_enqueue_message( - Delivery = #delivery { message = #basic_message { id = MsgId }, - msg_seq_no = MsgSeqNo, - sender = ChPid }, + Delivery = #delivery { message = #basic_message { id = MsgId }, + sender = ChPid }, State = #state { sender_queues = SQ, msg_id_status = MS }) -> State1 = ensure_monitoring(ChPid, State), %% We will never see {published, ChPid, MsgSeqNo} here. @@ -646,34 +642,11 @@ maybe_enqueue_message( MQ1 = queue:in(Delivery, MQ), SQ1 = dict:store(ChPid, {MQ1, PendingCh}, SQ), State1 #state { sender_queues = SQ1 }; - {ok, {confirmed, ChPid}} -> - %% BQ has confirmed it but we didn't know what the - %% msg_seq_no was at the time. We do now! - ok = rabbit_misc:confirm_to_sender(ChPid, [MsgSeqNo]), + {ok, Status} -> + MS1 = send_or_record_confirm( + Status, Delivery, dict:erase(MsgId, MS), State1), SQ1 = remove_from_pending_ch(MsgId, ChPid, SQ), - State1 #state { msg_id_status = dict:erase(MsgId, MS), - sender_queues = SQ1 }; - {ok, {published, ChPid}} -> - %% It was published to the BQ and we didn't know the - %% msg_seq_no so couldn't confirm it at the time. - {MS1, SQ1} = - case needs_confirming(Delivery, State1) of - never -> {dict:erase(MsgId, MS), - remove_from_pending_ch(MsgId, ChPid, SQ)}; - eventually -> MMS = {published, ChPid, MsgSeqNo}, - {dict:store(MsgId, MMS, MS), SQ}; - immediately -> ok = rabbit_misc:confirm_to_sender( - ChPid, [MsgSeqNo]), - {dict:erase(MsgId, MS), - remove_from_pending_ch(MsgId, ChPid, SQ)} - end, State1 #state { msg_id_status = MS1, - sender_queues = SQ1 }; - {ok, discarded} -> - %% We've already heard from GM that the msg is to be - %% discarded. We won't see this again. - SQ1 = remove_from_pending_ch(MsgId, ChPid, SQ), - State1 #state { msg_id_status = dict:erase(MsgId, MS), sender_queues = SQ1 } end. @@ -691,42 +664,26 @@ remove_from_pending_ch(MsgId, ChPid, SQ) -> dict:store(ChPid, {MQ, sets:del_element(MsgId, PendingCh)}, SQ) end. -process_instruction( - {publish, Deliver, ChPid, MsgProps, Msg = #basic_message { id = MsgId }}, - State = #state { sender_queues = SQ, - backing_queue = BQ, - backing_queue_state = BQS, - msg_id_status = MS }) -> - - %% We really are going to do the publish right now, even though we - %% may not have seen it directly from the channel. As a result, we - %% may know that it needs confirming without knowing its - %% msg_seq_no, which means that we can see the confirmation come - %% back from the backing queue without knowing the msg_seq_no, - %% which means that we're going to have to hang on to the fact - %% that we've seen the msg_id confirmed until we can associate it - %% with a msg_seq_no. +publish_or_discard(Status, ChPid, MsgId, + State = #state { sender_queues = SQ, msg_id_status = MS }) -> + %% We really are going to do the publish/discard right now, even + %% though we may not have seen it directly from the channel. But + %% we cannot issues confirms until the latter has happened. So we + %% need to keep track of the MsgId and its confirmation status in + %% the meantime. State1 = ensure_monitoring(ChPid, State), {MQ, PendingCh} = get_sender_queue(ChPid, SQ), {MQ1, PendingCh1, MS1} = case queue:out(MQ) of {empty, _MQ2} -> {MQ, sets:add_element(MsgId, PendingCh), - dict:store(MsgId, {published, ChPid}, MS)}; + dict:store(MsgId, Status, MS)}; {{value, Delivery = #delivery { - msg_seq_no = MsgSeqNo, - message = #basic_message { id = MsgId } }}, MQ2} -> + message = #basic_message { id = MsgId } }}, MQ2} -> {MQ2, PendingCh, %% We received the msg from the channel first. Thus %% we need to deal with confirms here. - case needs_confirming(Delivery, State1) of - never -> MS; - eventually -> MMS = {published, ChPid, MsgSeqNo}, - dict:store(MsgId, MMS , MS); - immediately -> ok = rabbit_misc:confirm_to_sender( - ChPid, [MsgSeqNo]), - MS - end}; + send_or_record_confirm(Status, Delivery, MS, State1)}; {{value, #delivery {}}, _MQ2} -> %% The instruction was sent to us before we were %% within the slave_pids within the #amqqueue{} @@ -735,52 +692,30 @@ process_instruction( %% expecting any confirms from us. {MQ, PendingCh, MS} end, - SQ1 = dict:store(ChPid, {MQ1, PendingCh1}, SQ), - State2 = State1 #state { sender_queues = SQ1, msg_id_status = MS1 }, - - {ok, - case Deliver of - false -> - BQS1 = BQ:publish(Msg, MsgProps, ChPid, BQS), - State2 #state { backing_queue_state = BQS1 }; - {true, AckRequired} -> - {AckTag, BQS1} = BQ:publish_delivered(AckRequired, Msg, MsgProps, - ChPid, BQS), - maybe_store_ack(AckRequired, MsgId, AckTag, - State2 #state { backing_queue_state = BQS1 }) - end}; + State1 #state { sender_queues = SQ1, msg_id_status = MS1 }. + + +process_instruction({publish, false, ChPid, MsgProps, + Msg = #basic_message { id = MsgId }}, State) -> + State1 = #state { backing_queue = BQ, backing_queue_state = BQS } = + publish_or_discard(published, ChPid, MsgId, State), + BQS1 = BQ:publish(Msg, MsgProps, ChPid, BQS), + {ok, State1 #state { backing_queue_state = BQS1 }}; +process_instruction({publish, {true, AckRequired}, ChPid, MsgProps, + Msg = #basic_message { id = MsgId }}, State) -> + State1 = #state { backing_queue = BQ, backing_queue_state = BQS } = + publish_or_discard(published, ChPid, MsgId, State), + {AckTag, BQS1} = BQ:publish_delivered(AckRequired, Msg, MsgProps, + ChPid, BQS), + {ok, maybe_store_ack(AckRequired, MsgId, AckTag, + State1 #state { backing_queue_state = BQS1 })}; process_instruction({discard, ChPid, Msg = #basic_message { id = MsgId }}, - State = #state { sender_queues = SQ, - backing_queue = BQ, - backing_queue_state = BQS, - msg_id_status = MS }) -> - %% Many of the comments around the publish head above apply here - %% too. - State1 = ensure_monitoring(ChPid, State), - {MQ, PendingCh} = get_sender_queue(ChPid, SQ), - {MQ1, PendingCh1, MS1} = - case queue:out(MQ) of - {empty, _MQ} -> - {MQ, sets:add_element(MsgId, PendingCh), - dict:store(MsgId, discarded, MS)}; - {{value, #delivery { message = #basic_message { id = MsgId } }}, - MQ2} -> - %% We've already seen it from the channel, we're not - %% going to see this again, so don't add it to MS - {MQ2, PendingCh, MS}; - {{value, #delivery {}}, _MQ2} -> - %% The instruction was sent to us before we were - %% within the slave_pids within the #amqqueue{} - %% record. We'll never receive the message directly - %% from the channel. - {MQ, PendingCh, MS} - end, - SQ1 = dict:store(ChPid, {MQ1, PendingCh1}, SQ), + State) -> + State1 = #state { backing_queue = BQ, backing_queue_state = BQS } = + publish_or_discard(discarded, ChPid, MsgId, State), BQS1 = BQ:discard(Msg, ChPid, BQS), - {ok, State1 #state { sender_queues = SQ1, - msg_id_status = MS1, - backing_queue_state = BQS1 }}; + {ok, State1 #state { backing_queue_state = BQS1 }}; process_instruction({drop, Length, Dropped, AckRequired}, State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> @@ -867,19 +802,16 @@ msg_ids_to_acktags(MsgIds, MA) -> lists:foldl( fun (MsgId, {Acc, MAN}) -> case dict:find(MsgId, MA) of - error -> {Acc, MAN}; - {ok, {_Num, AckTag}} -> {[AckTag | Acc], - dict:erase(MsgId, MAN)} + error -> {Acc, MAN}; + {ok, AckTag} -> {[AckTag | Acc], dict:erase(MsgId, MAN)} end end, {[], MA}, MsgIds), {lists:reverse(AckTags), MA1}. maybe_store_ack(false, _MsgId, _AckTag, State) -> State; -maybe_store_ack(true, MsgId, AckTag, State = #state { msg_id_ack = MA, - ack_num = Num }) -> - State #state { msg_id_ack = dict:store(MsgId, {Num, AckTag}, MA), - ack_num = Num + 1 }. +maybe_store_ack(true, MsgId, AckTag, State = #state { msg_id_ack = MA }) -> + State #state { msg_id_ack = dict:store(MsgId, AckTag, MA) }. set_delta(0, State = #state { depth_delta = undefined }) -> ok = record_synchronised(State#state.q), diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index ae36febb33..870692282f 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -14,7 +14,6 @@ %% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. %% - -module(rabbit_mnesia). -export([init/0, @@ -26,18 +25,14 @@ forget_cluster_node/2, status/0, - is_db_empty/0, is_clustered/0, cluster_nodes/1, node_type/0, dir/0, - table_names/0, cluster_status_from_mnesia/0, init_db_unchecked/2, - empty_ram_only_tables/0, copy_db/1, - wait_for_tables/1, check_cluster_consistency/0, ensure_mnesia_dir/0, @@ -51,10 +46,6 @@ is_running_remote/0 ]). -%% create_tables/0 exported for helping embed RabbitMQ in or alongside -%% other mnesia-using Erlang applications, such as ejabberd --export([create_tables/0]). - -include("rabbit.hrl"). %%---------------------------------------------------------------------------- @@ -78,21 +69,16 @@ %% Various queries to get the status of the db -spec(status/0 :: () -> [{'nodes', [{node_type(), [node()]}]} | {'running_nodes', [node()]}]). --spec(is_db_empty/0 :: () -> boolean()). -spec(is_clustered/0 :: () -> boolean()). -spec(cluster_nodes/1 :: ('all' | 'disc' | 'ram' | 'running') -> [node()]). -spec(node_type/0 :: () -> node_type()). -spec(dir/0 :: () -> file:filename()). --spec(table_names/0 :: () -> [atom()]). -spec(cluster_status_from_mnesia/0 :: () -> rabbit_types:ok_or_error2( cluster_status(), any())). %% Operations on the db and utils, mainly used in `rabbit_upgrade' and `rabbit' -spec(init_db_unchecked/2 :: ([node()], node_type()) -> 'ok'). --spec(empty_ram_only_tables/0 :: () -> 'ok'). --spec(create_tables/0 :: () -> 'ok'). -spec(copy_db/1 :: (file:filename()) -> rabbit_types:ok_or_error(any())). --spec(wait_for_tables/1 :: ([atom()]) -> 'ok'). -spec(check_cluster_consistency/0 :: () -> 'ok'). -spec(ensure_mnesia_dir/0 :: () -> 'ok'). @@ -290,6 +276,9 @@ forget_cluster_node(Node, RemoveWhenOffline) -> end. remove_node_offline_node(Node) -> + %% We want the running nodes *now*, so we don't call + %% `cluster_nodes(running)' which will just get what's in the cluster status + %% file. case {running_nodes(cluster_nodes(all)) -- [Node], node_type()} of {[], disc} -> %% Note that while we check if the nodes was the last to @@ -303,9 +292,13 @@ remove_node_offline_node(Node) -> case cluster_nodes(running) -- [node(), Node] of [] -> start_mnesia(), try - [mnesia:force_load_table(T) || T <- table_names()], - forget_cluster_node(Node, false), - ensure_mnesia_running() + %% What we want to do here is replace the last node to + %% go down with the current node. The way we do this + %% is by force loading the table, and making sure that + %% they are loaded. + rabbit_table:force_load(), + rabbit_table:wait_for_replicated(), + forget_cluster_node(Node, false) after stop_mnesia() end; @@ -331,10 +324,6 @@ status() -> no -> [] end. -is_db_empty() -> - lists:all(fun (Tab) -> mnesia:dirty_first(Tab) == '$end_of_table' end, - table_names()). - is_clustered() -> AllNodes = cluster_nodes(all), AllNodes =/= [] andalso AllNodes =/= [node()]. @@ -356,9 +345,7 @@ mnesia_nodes() -> true -> disc; false -> ram end, - Tables = mnesia:system_info(tables), - [{Table, _} | _] = table_definitions(NodeType), - case lists:member(Table, Tables) of + case rabbit_table:is_present() of true -> AllNodes = mnesia:system_info(db_nodes), DiscCopies = mnesia:table_info(schema, disc_copies), DiscNodes = case NodeType of @@ -413,8 +400,6 @@ node_type() -> dir() -> mnesia:system_info(directory). -table_names() -> [Tab || {Tab, _} <- table_definitions()]. - %%---------------------------------------------------------------------------- %% Operations on the db %%---------------------------------------------------------------------------- @@ -444,18 +429,8 @@ init_db(ClusterNodes, NodeType, CheckOtherNodes) -> %% Subsequent node in cluster, catch up ensure_version_ok( rpc:call(AnotherNode, rabbit_version, recorded, [])), - ok = wait_for_replicated_tables(), - %% The sequence in which we delete the schema and then the - %% other tables is important: if we delete the schema - %% first when moving to RAM mnesia will loudly complain - %% since it doesn't make much sense to do that. But when - %% moving to disc, we need to move the schema first. - case NodeType of - disc -> create_local_table_copy(schema, disc_copies), - create_local_table_copies(disc); - ram -> create_local_table_copies(ram), - create_local_table_copy(schema, ram_copies) - end + ok = rabbit_table:wait_for_replicated(), + ok = rabbit_table:create_local_copy(NodeType) end, ensure_schema_integrity(), rabbit_node_monitor:update_cluster_status(), @@ -476,7 +451,7 @@ init_db_and_upgrade(ClusterNodes, NodeType, CheckOtherNodes) -> case NodeType of ram -> start_mnesia(), change_extra_db_nodes(ClusterNodes, false), - wait_for_replicated_tables(); + rabbit_table:wait_for_replicated(); disc -> ok end, ok. @@ -522,70 +497,17 @@ ensure_mnesia_not_running() -> end. ensure_schema_integrity() -> - case check_schema_integrity() of + case rabbit_table:check_schema_integrity() of ok -> ok; {error, Reason} -> throw({error, {schema_integrity_check_failed, Reason}}) end. -check_schema_integrity() -> - Tables = mnesia:system_info(tables), - case check_tables(fun (Tab, TabDef) -> - case lists:member(Tab, Tables) of - false -> {error, {table_missing, Tab}}; - true -> check_table_attributes(Tab, TabDef) - end - end) of - ok -> ok = wait_for_tables(table_names()), - check_tables(fun check_table_content/2); - Other -> Other - end. - -empty_ram_only_tables() -> - Node = node(), - lists:foreach( - fun (TabName) -> - case lists:member(Node, mnesia:table_info(TabName, ram_copies)) of - true -> {atomic, ok} = mnesia:clear_table(TabName); - false -> ok - end - end, table_names()), - ok. - -create_tables() -> create_tables(disc). - -create_tables(Type) -> - lists:foreach(fun ({Tab, TabDef}) -> - TabDef1 = proplists:delete(match, TabDef), - case mnesia:create_table(Tab, TabDef1) of - {atomic, ok} -> ok; - {aborted, Reason} -> - throw({error, {table_creation_failed, - Tab, TabDef1, Reason}}) - end - end, - table_definitions(Type)), - ok. - copy_db(Destination) -> ok = ensure_mnesia_not_running(), rabbit_file:recursive_copy(dir(), Destination). -wait_for_replicated_tables() -> - wait_for_tables([Tab || {Tab, TabDef} <- table_definitions(), - not lists:member({local_content, true}, TabDef)]). - -wait_for_tables(TableNames) -> - case mnesia:wait_for_tables(TableNames, 30000) of - ok -> - ok; - {timeout, BadTabs} -> - throw({error, {timeout_waiting_for_tables, BadTabs}}); - {error, Reason} -> - throw({error, {failed_waiting_for_tables, Reason}}) - end. - %% This does not guarantee us much, but it avoids some situations that %% will definitely end up badly check_cluster_consistency() -> @@ -678,158 +600,8 @@ discover_cluster(Node) -> end end. -%% The tables aren't supposed to be on disk on a ram node -table_definitions(disc) -> - table_definitions(); -table_definitions(ram) -> - [{Tab, copy_type_to_ram(TabDef)} || {Tab, TabDef} <- table_definitions()]. - -table_definitions() -> - [{rabbit_user, - [{record_name, internal_user}, - {attributes, record_info(fields, internal_user)}, - {disc_copies, [node()]}, - {match, #internal_user{_='_'}}]}, - {rabbit_user_permission, - [{record_name, user_permission}, - {attributes, record_info(fields, user_permission)}, - {disc_copies, [node()]}, - {match, #user_permission{user_vhost = #user_vhost{_='_'}, - permission = #permission{_='_'}, - _='_'}}]}, - {rabbit_vhost, - [{record_name, vhost}, - {attributes, record_info(fields, vhost)}, - {disc_copies, [node()]}, - {match, #vhost{_='_'}}]}, - {rabbit_listener, - [{record_name, listener}, - {attributes, record_info(fields, listener)}, - {type, bag}, - {match, #listener{_='_'}}]}, - {rabbit_durable_route, - [{record_name, route}, - {attributes, record_info(fields, route)}, - {disc_copies, [node()]}, - {match, #route{binding = binding_match(), _='_'}}]}, - {rabbit_semi_durable_route, - [{record_name, route}, - {attributes, record_info(fields, route)}, - {type, ordered_set}, - {match, #route{binding = binding_match(), _='_'}}]}, - {rabbit_route, - [{record_name, route}, - {attributes, record_info(fields, route)}, - {type, ordered_set}, - {match, #route{binding = binding_match(), _='_'}}]}, - {rabbit_reverse_route, - [{record_name, reverse_route}, - {attributes, record_info(fields, reverse_route)}, - {type, ordered_set}, - {match, #reverse_route{reverse_binding = reverse_binding_match(), - _='_'}}]}, - {rabbit_topic_trie_node, - [{record_name, topic_trie_node}, - {attributes, record_info(fields, topic_trie_node)}, - {type, ordered_set}, - {match, #topic_trie_node{trie_node = trie_node_match(), _='_'}}]}, - {rabbit_topic_trie_edge, - [{record_name, topic_trie_edge}, - {attributes, record_info(fields, topic_trie_edge)}, - {type, ordered_set}, - {match, #topic_trie_edge{trie_edge = trie_edge_match(), _='_'}}]}, - {rabbit_topic_trie_binding, - [{record_name, topic_trie_binding}, - {attributes, record_info(fields, topic_trie_binding)}, - {type, ordered_set}, - {match, #topic_trie_binding{trie_binding = trie_binding_match(), - _='_'}}]}, - {rabbit_durable_exchange, - [{record_name, exchange}, - {attributes, record_info(fields, exchange)}, - {disc_copies, [node()]}, - {match, #exchange{name = exchange_name_match(), _='_'}}]}, - {rabbit_exchange, - [{record_name, exchange}, - {attributes, record_info(fields, exchange)}, - {match, #exchange{name = exchange_name_match(), _='_'}}]}, - {rabbit_exchange_serial, - [{record_name, exchange_serial}, - {attributes, record_info(fields, exchange_serial)}, - {match, #exchange_serial{name = exchange_name_match(), _='_'}}]}, - {rabbit_runtime_parameters, - [{record_name, runtime_parameters}, - {attributes, record_info(fields, runtime_parameters)}, - {disc_copies, [node()]}, - {match, #runtime_parameters{_='_'}}]}, - {rabbit_durable_queue, - [{record_name, amqqueue}, - {attributes, record_info(fields, amqqueue)}, - {disc_copies, [node()]}, - {match, #amqqueue{name = queue_name_match(), _='_'}}]}, - {rabbit_queue, - [{record_name, amqqueue}, - {attributes, record_info(fields, amqqueue)}, - {match, #amqqueue{name = queue_name_match(), _='_'}}]}] - ++ gm:table_definitions() - ++ mirrored_supervisor:table_definitions(). - -binding_match() -> - #binding{source = exchange_name_match(), - destination = binding_destination_match(), - _='_'}. -reverse_binding_match() -> - #reverse_binding{destination = binding_destination_match(), - source = exchange_name_match(), - _='_'}. -binding_destination_match() -> - resource_match('_'). -trie_node_match() -> - #trie_node{ exchange_name = exchange_name_match(), _='_'}. -trie_edge_match() -> - #trie_edge{ exchange_name = exchange_name_match(), _='_'}. -trie_binding_match() -> - #trie_binding{exchange_name = exchange_name_match(), _='_'}. -exchange_name_match() -> - resource_match(exchange). -queue_name_match() -> - resource_match(queue). -resource_match(Kind) -> - #resource{kind = Kind, _='_'}. - -check_table_attributes(Tab, TabDef) -> - {_, ExpAttrs} = proplists:lookup(attributes, TabDef), - case mnesia:table_info(Tab, attributes) of - ExpAttrs -> ok; - Attrs -> {error, {table_attributes_mismatch, Tab, ExpAttrs, Attrs}} - end. - -check_table_content(Tab, TabDef) -> - {_, Match} = proplists:lookup(match, TabDef), - case mnesia:dirty_first(Tab) of - '$end_of_table' -> - ok; - Key -> - ObjList = mnesia:dirty_read(Tab, Key), - MatchComp = ets:match_spec_compile([{Match, [], ['$_']}]), - case ets:match_spec_run(ObjList, MatchComp) of - ObjList -> ok; - _ -> {error, {table_content_invalid, Tab, Match, ObjList}} - end - end. - -check_tables(Fun) -> - case [Error || {Tab, TabDef} <- table_definitions(node_type()), - case Fun(Tab, TabDef) of - ok -> Error = none, false; - {error, Error} -> true - end] of - [] -> ok; - Errors -> {error, Errors} - end. - schema_ok_or_move() -> - case check_schema_integrity() of + case rabbit_table:check_schema_integrity() of ok -> ok; {error, Reason} -> @@ -858,7 +630,7 @@ create_schema() -> stop_mnesia(), rabbit_misc:ensure_ok(mnesia:create_schema([node()]), cannot_create_schema), start_mnesia(), - ok = create_tables(disc), + ok = rabbit_table:create(), ensure_schema_integrity(), ok = rabbit_version:record_desired(). @@ -883,47 +655,6 @@ move_db() -> start_mnesia(), ok. -copy_type_to_ram(TabDef) -> - [{disc_copies, []}, {ram_copies, [node()]} - | proplists:delete(ram_copies, proplists:delete(disc_copies, TabDef))]. - -table_has_copy_type(TabDef, DiscType) -> - lists:member(node(), proplists:get_value(DiscType, TabDef, [])). - -create_local_table_copies(Type) -> - lists:foreach( - fun ({Tab, TabDef}) -> - HasDiscCopies = table_has_copy_type(TabDef, disc_copies), - HasDiscOnlyCopies = table_has_copy_type(TabDef, disc_only_copies), - LocalTab = proplists:get_bool(local_content, TabDef), - StorageType = - if - Type =:= disc orelse LocalTab -> - if - HasDiscCopies -> disc_copies; - HasDiscOnlyCopies -> disc_only_copies; - true -> ram_copies - end; - Type =:= ram -> - ram_copies - end, - ok = create_local_table_copy(Tab, StorageType) - end, - table_definitions(Type)), - ok. - -create_local_table_copy(Tab, Type) -> - StorageType = mnesia:table_info(Tab, storage_type), - {atomic, ok} = - if - StorageType == unknown -> - mnesia:add_table_copy(Tab, node(), Type); - StorageType /= Type -> - mnesia:change_table_copy_type(Tab, node(), Type); - true -> {atomic, ok} - end, - ok. - remove_node_if_mnesia_running(Node) -> case mnesia:system_info(is_running) of yes -> @@ -942,13 +673,12 @@ remove_node_if_mnesia_running(Node) -> end. leave_cluster() -> - RunningNodes = running_nodes(nodes_excl_me(cluster_nodes(all))), - case not is_clustered() andalso RunningNodes =:= [] of - true -> ok; - false -> case lists:any(fun leave_cluster/1, RunningNodes) of - true -> ok; - false -> e(no_running_cluster_nodes) - end + case nodes_excl_me(cluster_nodes(all)) of + [] -> ok; + AllNodes -> case lists:any(fun leave_cluster/1, AllNodes) of + true -> ok; + false -> e(no_running_cluster_nodes) + end end. leave_cluster(Node) -> diff --git a/src/rabbit_policy.erl b/src/rabbit_policy.erl index 69480c9c11..f4c1f42b21 100644 --- a/src/rabbit_policy.erl +++ b/src/rabbit_policy.erl @@ -98,19 +98,17 @@ update_policies(VHost) -> ok. update_exchange(X = #exchange{name = XName, policy = OldPolicy}, Policies) -> - NewPolicy = match(XName, Policies), - case NewPolicy of + case match(XName, Policies) of OldPolicy -> no_change; - _ -> rabbit_exchange:update( + NewPolicy -> rabbit_exchange:update( XName, fun(X1) -> X1#exchange{policy = NewPolicy} end), {X, X#exchange{policy = NewPolicy}} end. update_queue(Q = #amqqueue{name = QName, policy = OldPolicy}, Policies) -> - NewPolicy = match(QName, Policies), - case NewPolicy of + case match(QName, Policies) of OldPolicy -> no_change; - _ -> rabbit_amqqueue:update( + NewPolicy -> rabbit_amqqueue:update( QName, fun(Q1) -> Q1#amqqueue{policy = NewPolicy} end), {Q, Q#amqqueue{policy = NewPolicy}} end. @@ -131,12 +129,11 @@ match(Name, Policies) -> matches(#resource{name = Name}, Policy) -> match =:= re:run(Name, pget(<<"pattern">>, Policy), [{capture, none}]). -sort_pred(A, B) -> - pget(<<"priority">>, A, 0) >= pget(<<"priority">>, B, 0). +sort_pred(A, B) -> pget(<<"priority">>, A, 0) >= pget(<<"priority">>, B, 0). %%---------------------------------------------------------------------------- policy_validation() -> [{<<"priority">>, fun rabbit_parameter_validation:number/2, optional}, - {<<"pattern">>, fun rabbit_parameter_validation:regex/2, mandatory}, - {<<"policy">>, fun rabbit_parameter_validation:list/2, mandatory}]. + {<<"pattern">>, fun rabbit_parameter_validation:regex/2, mandatory}, + {<<"policy">>, fun rabbit_parameter_validation:list/2, mandatory}]. diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 6d6c648acb..21f581548d 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -537,7 +537,7 @@ queue_index_walker_reader(QueueName, Gatherer) -> State = blank_state(QueueName), ok = scan_segments( fun (_SeqId, MsgId, _MsgProps, true, _IsDelivered, no_ack, ok) -> - gatherer:in(Gatherer, {MsgId, 1}); + gatherer:sync_in(Gatherer, {MsgId, 1}); (_SeqId, _MsgId, _MsgProps, _IsPersistent, _IsDelivered, _IsAcked, Acc) -> Acc diff --git a/src/rabbit_table.erl b/src/rabbit_table.erl new file mode 100644 index 0000000000..fa1c5bbd01 --- /dev/null +++ b/src/rabbit_table.erl @@ -0,0 +1,311 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is VMware, Inc. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% + +-module(rabbit_table). + +-export([create/0, create_local_copy/1, wait_for_replicated/0, wait/1, + force_load/0, is_present/0, is_empty/0, + check_schema_integrity/0, clear_ram_only_tables/0]). + +-include("rabbit.hrl"). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec(create/0 :: () -> 'ok'). +-spec(create_local_copy/1 :: ('disc' | 'ram') -> 'ok'). +-spec(wait_for_replicated/0 :: () -> 'ok'). +-spec(wait/1 :: ([atom()]) -> 'ok'). +-spec(force_load/0 :: () -> 'ok'). +-spec(is_present/0 :: () -> boolean()). +-spec(is_empty/0 :: () -> boolean()). +-spec(check_schema_integrity/0 :: () -> rabbit_types:ok_or_error(any())). +-spec(clear_ram_only_tables/0 :: () -> 'ok'). + +-endif. + +%%---------------------------------------------------------------------------- +%% Main interface +%%---------------------------------------------------------------------------- + +create() -> + lists:foreach(fun ({Tab, TabDef}) -> + TabDef1 = proplists:delete(match, TabDef), + case mnesia:create_table(Tab, TabDef1) of + {atomic, ok} -> ok; + {aborted, Reason} -> + throw({error, {table_creation_failed, + Tab, TabDef1, Reason}}) + end + end, definitions()), + ok. + +%% The sequence in which we delete the schema and then the other +%% tables is important: if we delete the schema first when moving to +%% RAM mnesia will loudly complain since it doesn't make much sense to +%% do that. But when moving to disc, we need to move the schema first. +create_local_copy(disc) -> + create_local_copy(schema, disc_copies), + create_local_copies(disc); +create_local_copy(ram) -> + create_local_copies(ram), + create_local_copy(schema, ram_copies). + +wait_for_replicated() -> + wait([Tab || {Tab, TabDef} <- definitions(), + not lists:member({local_content, true}, TabDef)]). + +wait(TableNames) -> + case mnesia:wait_for_tables(TableNames, 30000) of + ok -> + ok; + {timeout, BadTabs} -> + throw({error, {timeout_waiting_for_tables, BadTabs}}); + {error, Reason} -> + throw({error, {failed_waiting_for_tables, Reason}}) + end. + +force_load() -> [mnesia:force_load_table(T) || T <- names()], ok. + +is_present() -> names() -- mnesia:system_info(tables) =:= []. + +is_empty() -> + lists:all(fun (Tab) -> mnesia:dirty_first(Tab) == '$end_of_table' end, + names()). + +check_schema_integrity() -> + Tables = mnesia:system_info(tables), + case check(fun (Tab, TabDef) -> + case lists:member(Tab, Tables) of + false -> {error, {table_missing, Tab}}; + true -> check_attributes(Tab, TabDef) + end + end) of + ok -> ok = wait(names()), + check(fun check_content/2); + Other -> Other + end. + +clear_ram_only_tables() -> + Node = node(), + lists:foreach( + fun (TabName) -> + case lists:member(Node, mnesia:table_info(TabName, ram_copies)) of + true -> {atomic, ok} = mnesia:clear_table(TabName); + false -> ok + end + end, names()), + ok. + +%%-------------------------------------------------------------------- +%% Internal helpers +%%-------------------------------------------------------------------- + +create_local_copies(Type) -> + lists:foreach( + fun ({Tab, TabDef}) -> + HasDiscCopies = has_copy_type(TabDef, disc_copies), + HasDiscOnlyCopies = has_copy_type(TabDef, disc_only_copies), + LocalTab = proplists:get_bool(local_content, TabDef), + StorageType = + if + Type =:= disc orelse LocalTab -> + if + HasDiscCopies -> disc_copies; + HasDiscOnlyCopies -> disc_only_copies; + true -> ram_copies + end; + Type =:= ram -> + ram_copies + end, + ok = create_local_copy(Tab, StorageType) + end, definitions(Type)), + ok. + +create_local_copy(Tab, Type) -> + StorageType = mnesia:table_info(Tab, storage_type), + {atomic, ok} = + if + StorageType == unknown -> + mnesia:add_table_copy(Tab, node(), Type); + StorageType /= Type -> + mnesia:change_table_copy_type(Tab, node(), Type); + true -> {atomic, ok} + end, + ok. + +has_copy_type(TabDef, DiscType) -> + lists:member(node(), proplists:get_value(DiscType, TabDef, [])). + +check_attributes(Tab, TabDef) -> + {_, ExpAttrs} = proplists:lookup(attributes, TabDef), + case mnesia:table_info(Tab, attributes) of + ExpAttrs -> ok; + Attrs -> {error, {table_attributes_mismatch, Tab, ExpAttrs, Attrs}} + end. + +check_content(Tab, TabDef) -> + {_, Match} = proplists:lookup(match, TabDef), + case mnesia:dirty_first(Tab) of + '$end_of_table' -> + ok; + Key -> + ObjList = mnesia:dirty_read(Tab, Key), + MatchComp = ets:match_spec_compile([{Match, [], ['$_']}]), + case ets:match_spec_run(ObjList, MatchComp) of + ObjList -> ok; + _ -> {error, {table_content_invalid, Tab, Match, ObjList}} + end + end. + +check(Fun) -> + case [Error || {Tab, TabDef} <- definitions(), + case Fun(Tab, TabDef) of + ok -> Error = none, false; + {error, Error} -> true + end] of + [] -> ok; + Errors -> {error, Errors} + end. + +%%-------------------------------------------------------------------- +%% Table definitions +%%-------------------------------------------------------------------- + +names() -> [Tab || {Tab, _} <- definitions()]. + +%% The tables aren't supposed to be on disk on a ram node +definitions(disc) -> + definitions(); +definitions(ram) -> + [{Tab, [{disc_copies, []}, {ram_copies, [node()]} | + proplists:delete( + ram_copies, proplists:delete(disc_copies, TabDef))]} || + {Tab, TabDef} <- definitions()]. + +definitions() -> + [{rabbit_user, + [{record_name, internal_user}, + {attributes, record_info(fields, internal_user)}, + {disc_copies, [node()]}, + {match, #internal_user{_='_'}}]}, + {rabbit_user_permission, + [{record_name, user_permission}, + {attributes, record_info(fields, user_permission)}, + {disc_copies, [node()]}, + {match, #user_permission{user_vhost = #user_vhost{_='_'}, + permission = #permission{_='_'}, + _='_'}}]}, + {rabbit_vhost, + [{record_name, vhost}, + {attributes, record_info(fields, vhost)}, + {disc_copies, [node()]}, + {match, #vhost{_='_'}}]}, + {rabbit_listener, + [{record_name, listener}, + {attributes, record_info(fields, listener)}, + {type, bag}, + {match, #listener{_='_'}}]}, + {rabbit_durable_route, + [{record_name, route}, + {attributes, record_info(fields, route)}, + {disc_copies, [node()]}, + {match, #route{binding = binding_match(), _='_'}}]}, + {rabbit_semi_durable_route, + [{record_name, route}, + {attributes, record_info(fields, route)}, + {type, ordered_set}, + {match, #route{binding = binding_match(), _='_'}}]}, + {rabbit_route, + [{record_name, route}, + {attributes, record_info(fields, route)}, + {type, ordered_set}, + {match, #route{binding = binding_match(), _='_'}}]}, + {rabbit_reverse_route, + [{record_name, reverse_route}, + {attributes, record_info(fields, reverse_route)}, + {type, ordered_set}, + {match, #reverse_route{reverse_binding = reverse_binding_match(), + _='_'}}]}, + {rabbit_topic_trie_node, + [{record_name, topic_trie_node}, + {attributes, record_info(fields, topic_trie_node)}, + {type, ordered_set}, + {match, #topic_trie_node{trie_node = trie_node_match(), _='_'}}]}, + {rabbit_topic_trie_edge, + [{record_name, topic_trie_edge}, + {attributes, record_info(fields, topic_trie_edge)}, + {type, ordered_set}, + {match, #topic_trie_edge{trie_edge = trie_edge_match(), _='_'}}]}, + {rabbit_topic_trie_binding, + [{record_name, topic_trie_binding}, + {attributes, record_info(fields, topic_trie_binding)}, + {type, ordered_set}, + {match, #topic_trie_binding{trie_binding = trie_binding_match(), + _='_'}}]}, + {rabbit_durable_exchange, + [{record_name, exchange}, + {attributes, record_info(fields, exchange)}, + {disc_copies, [node()]}, + {match, #exchange{name = exchange_name_match(), _='_'}}]}, + {rabbit_exchange, + [{record_name, exchange}, + {attributes, record_info(fields, exchange)}, + {match, #exchange{name = exchange_name_match(), _='_'}}]}, + {rabbit_exchange_serial, + [{record_name, exchange_serial}, + {attributes, record_info(fields, exchange_serial)}, + {match, #exchange_serial{name = exchange_name_match(), _='_'}}]}, + {rabbit_runtime_parameters, + [{record_name, runtime_parameters}, + {attributes, record_info(fields, runtime_parameters)}, + {disc_copies, [node()]}, + {match, #runtime_parameters{_='_'}}]}, + {rabbit_durable_queue, + [{record_name, amqqueue}, + {attributes, record_info(fields, amqqueue)}, + {disc_copies, [node()]}, + {match, #amqqueue{name = queue_name_match(), _='_'}}]}, + {rabbit_queue, + [{record_name, amqqueue}, + {attributes, record_info(fields, amqqueue)}, + {match, #amqqueue{name = queue_name_match(), _='_'}}]}] + ++ gm:table_definitions() + ++ mirrored_supervisor:table_definitions(). + +binding_match() -> + #binding{source = exchange_name_match(), + destination = binding_destination_match(), + _='_'}. +reverse_binding_match() -> + #reverse_binding{destination = binding_destination_match(), + source = exchange_name_match(), + _='_'}. +binding_destination_match() -> + resource_match('_'). +trie_node_match() -> + #trie_node{ exchange_name = exchange_name_match(), _='_'}. +trie_edge_match() -> + #trie_edge{ exchange_name = exchange_name_match(), _='_'}. +trie_binding_match() -> + #trie_binding{exchange_name = exchange_name_match(), _='_'}. +exchange_name_match() -> + resource_match(exchange). +queue_name_match() -> + resource_match(queue). +resource_match(Kind) -> + #resource{kind = Kind, _='_'}. diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index df0ee72147..aa48f22833 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -54,6 +54,7 @@ all_tests() -> passed = test_log_management_during_startup(), passed = test_statistics(), passed = test_arguments_parser(), + passed = test_dynamic_mirroring(), passed = test_user_management(), passed = test_runtime_parameters(), passed = test_server_status(), @@ -882,6 +883,52 @@ test_arguments_parser() -> passed. +test_dynamic_mirroring() -> + %% Just unit tests of the node selection logic, see multi node + %% tests for the rest... + Test = fun ({NewM, NewSs, ExtraSs}, Policy, Params, {OldM, OldSs}, All) -> + {NewM, NewSs0} = + rabbit_mirror_queue_misc:suggested_queue_nodes( + Policy, Params, {OldM, OldSs}, All), + NewSs1 = lists:sort(NewSs0), + case dm_list_match(NewSs, NewSs1, ExtraSs) of + ok -> ok; + error -> exit({no_match, NewSs, NewSs1, ExtraSs}) + end + end, + + Test({a,[b,c],0},<<"all">>,'_',{a,[]}, [a,b,c]), + Test({a,[b,c],0},<<"all">>,'_',{a,[b,c]},[a,b,c]), + Test({a,[b,c],0},<<"all">>,'_',{a,[d]}, [a,b,c]), + + %% Add a node + Test({a,[b,c],0},<<"nodes">>,[<<"a">>,<<"b">>,<<"c">>],{a,[b]},[a,b,c,d]), + Test({b,[a,c],0},<<"nodes">>,[<<"a">>,<<"b">>,<<"c">>],{b,[a]},[a,b,c,d]), + %% Add two nodes and drop one + Test({a,[b,c],0},<<"nodes">>,[<<"a">>,<<"b">>,<<"c">>],{a,[d]},[a,b,c,d]), + %% Promote slave to master by policy + Test({a,[b,c],0},<<"nodes">>,[<<"a">>,<<"b">>,<<"c">>],{d,[a]},[a,b,c,d]), + %% Don't try to include nodes that are not running + Test({a,[b], 0},<<"nodes">>,[<<"a">>,<<"b">>,<<"f">>],{a,[b]},[a,b,c,d]), + %% If we can't find any of the nodes listed then just keep the master + Test({a,[], 0},<<"nodes">>,[<<"f">>,<<"g">>,<<"h">>],{a,[b]},[a,b,c,d]), + + Test({a,[], 1},<<"exactly">>,2,{a,[]}, [a,b,c,d]), + Test({a,[], 2},<<"exactly">>,3,{a,[]}, [a,b,c,d]), + Test({a,[c], 0},<<"exactly">>,2,{a,[c]}, [a,b,c,d]), + Test({a,[c], 1},<<"exactly">>,3,{a,[c]}, [a,b,c,d]), + Test({a,[c], 0},<<"exactly">>,2,{a,[c,d]},[a,b,c,d]), + Test({a,[c,d],0},<<"exactly">>,3,{a,[c,d]},[a,b,c,d]), + + passed. + +%% Does the first list match the second where the second is required +%% to have exactly Extra superfluous items? +dm_list_match([], [], 0) -> ok; +dm_list_match(_, [], _Extra) -> error; +dm_list_match([H|T1], [H |T2], Extra) -> dm_list_match(T1, T2, Extra); +dm_list_match(L1, [_H|T2], Extra) -> dm_list_match(L1, T2, Extra - 1). + test_user_management() -> %% lots if stuff that should fail @@ -1053,8 +1100,8 @@ test_server_status() -> ok = control_action(set_vm_memory_high_watermark, [float_to_list(HWM)]), %% eval - {parse_error, _} = control_action(eval, ["\""]), - {parse_error, _} = control_action(eval, ["a("]), + {error_string, _} = control_action(eval, ["\""]), + {error_string, _} = control_action(eval, ["a("]), ok = control_action(eval, ["a."]), %% cleanup diff --git a/src/rabbit_types.erl b/src/rabbit_types.erl index f488afb48a..5bc3d9f530 100644 --- a/src/rabbit_types.erl +++ b/src/rabbit_types.erl @@ -117,8 +117,7 @@ exclusive_owner :: rabbit_types:maybe(pid()), arguments :: rabbit_framing:amqp_table(), pid :: rabbit_types:maybe(pid()), - slave_pids :: [pid()], - mirror_nodes :: [node()] | 'undefined' | 'all'}). + slave_pids :: [pid()]}). -type(exchange() :: #exchange{name :: rabbit_exchange:name(), diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl index d037f954b4..455134da3f 100644 --- a/src/rabbit_upgrade.erl +++ b/src/rabbit_upgrade.erl @@ -201,7 +201,7 @@ primary_upgrade(Upgrades, Nodes) -> mnesia, Upgrades, fun () -> - force_tables(), + rabbit_table:force_load(), case Others of [] -> ok; _ -> info("mnesia upgrades: Breaking cluster~n", []), @@ -211,9 +211,6 @@ primary_upgrade(Upgrades, Nodes) -> end), ok. -force_tables() -> - [mnesia:force_load_table(T) || T <- rabbit_mnesia:table_names()]. - secondary_upgrade(AllNodes) -> %% must do this before we wipe out schema NodeType = node_type_legacy(), diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl index 47b22b98f6..21fdcd667b 100644 --- a/src/rabbit_upgrade_functions.erl +++ b/src/rabbit_upgrade_functions.erl @@ -41,6 +41,8 @@ -rabbit_upgrade({policy, mnesia, [exchange_scratches, ha_mirrors]}). -rabbit_upgrade({sync_slave_pids, mnesia, [policy]}). +-rabbit_upgrade({no_mirror_nodes, mnesia, [sync_slave_pids]}). +-rabbit_upgrade({gm_pids, mnesia, [no_mirror_nodes]}). %% ------------------------------------------------------------------- @@ -64,6 +66,8 @@ -spec(runtime_parameters/0 :: () -> 'ok'). -spec(policy/0 :: () -> 'ok'). -spec(sync_slave_pids/0 :: () -> 'ok'). +-spec(no_mirror_nodes/0 :: () -> 'ok'). +-spec(gm_pids/0 :: () -> 'ok'). -endif. @@ -254,16 +258,41 @@ sync_slave_pids() -> || T <- Tables], ok. +no_mirror_nodes() -> + Tables = [rabbit_queue, rabbit_durable_queue], + RemoveMirrorNodesFun = + fun ({amqqueue, N, D, AD, O, A, Pid, SPids, SSPids, _MNodes, Pol}) -> + {amqqueue, N, D, AD, O, A, Pid, SPids, SSPids, Pol} + end, + [ok = transform(T, RemoveMirrorNodesFun, + [name, durable, auto_delete, exclusive_owner, arguments, + pid, slave_pids, sync_slave_pids, policy]) + || T <- Tables], + ok. + +gm_pids() -> + Tables = [rabbit_queue, rabbit_durable_queue], + AddGMPidsFun = + fun ({amqqueue, N, D, AD, O, A, Pid, SPids, SSPids, Pol}) -> + {amqqueue, N, D, AD, O, A, Pid, SPids, SSPids, Pol, []} + end, + [ok = transform(T, AddGMPidsFun, + [name, durable, auto_delete, exclusive_owner, arguments, + pid, slave_pids, sync_slave_pids, policy, gm_pids]) + || T <- Tables], + ok. + + %%-------------------------------------------------------------------- transform(TableName, Fun, FieldList) -> - rabbit_mnesia:wait_for_tables([TableName]), + rabbit_table:wait([TableName]), {atomic, ok} = mnesia:transform_table(TableName, Fun, FieldList), ok. transform(TableName, Fun, FieldList, NewRecordName) -> - rabbit_mnesia:wait_for_tables([TableName]), + rabbit_table:wait([TableName]), {atomic, ok} = mnesia:transform_table(TableName, Fun, FieldList, NewRecordName), ok. diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 68c659dfbe..ddb136a73d 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -874,7 +874,7 @@ gb_sets_maybe_insert(false, _Val, Set) -> Set; gb_sets_maybe_insert(true, Val, Set) -> gb_sets:add(Val, Set). msg_status(IsPersistent, SeqId, Msg = #basic_message { id = MsgId }, - MsgProps = #message_properties{delivered = Delivered}) -> + MsgProps = #message_properties { delivered = Delivered }) -> %% TODO would it make sense to remove #msg_status.is_delivered? #msg_status { seq_id = SeqId, msg_id = MsgId, msg = Msg, is_persistent = IsPersistent, is_delivered = Delivered, diff --git a/src/rabbit_vm.erl b/src/rabbit_vm.erl new file mode 100644 index 0000000000..53f3df18b3 --- /dev/null +++ b/src/rabbit_vm.erl @@ -0,0 +1,129 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is VMware, Inc. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% + +-module(rabbit_vm). + +-export([memory/0]). + +-define(MAGIC_PLUGINS, ["mochiweb", "webmachine", "cowboy", "sockjs", + "rfc4627_jsonrpc"]). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec(memory/0 :: () -> rabbit_types:infos()). + +-endif. + +%%---------------------------------------------------------------------------- + +%% Like erlang:memory(), but with awareness of rabbit-y things +memory() -> + Conns = (sup_memory(rabbit_tcp_client_sup) + + sup_memory(ssl_connection_sup) + + sup_memory(amqp_sup)), + Qs = (sup_memory(rabbit_amqqueue_sup) + + sup_memory(rabbit_mirror_queue_slave_sup)), + Mnesia = mnesia_memory(), + MsgIndexETS = ets_memory(rabbit_msg_store_ets_index), + MsgIndexProc = (pid_memory(msg_store_transient) + + pid_memory(msg_store_persistent)), + MgmtDbETS = ets_memory(rabbit_mgmt_db), + MgmtDbProc = sup_memory(rabbit_mgmt_sup), + Plugins = plugin_memory() - MgmtDbProc, + + [{total, Total}, + {processes, Processes}, + {ets, ETS}, + {atom, Atom}, + {binary, Bin}, + {code, Code}, + {system, System}] = + erlang:memory([total, processes, ets, atom, binary, code, system]), + + OtherProc = Processes - Conns - Qs - MsgIndexProc - MgmtDbProc - Plugins, + + [{total, Total}, + {connection_procs, Conns}, + {queue_procs, Qs}, + {plugins, Plugins}, + {other_proc, lists:max([0, OtherProc])}, %% [1] + {mnesia, Mnesia}, + {mgmt_db, MgmtDbETS + MgmtDbProc}, + {msg_index, MsgIndexETS + MsgIndexProc}, + {other_ets, ETS - Mnesia - MsgIndexETS - MgmtDbETS}, + {binary, Bin}, + {code, Code}, + {atom, Atom}, + {other_system, System - ETS - Atom - Bin - Code}]. + +%% [1] - erlang:memory(processes) can be less than the sum of its +%% parts. Rather than display something nonsensical, just silence any +%% claims about negative memory. See +%% http://erlang.org/pipermail/erlang-questions/2012-September/069320.html + +%%---------------------------------------------------------------------------- + +sup_memory(Sup) -> + lists:sum([child_memory(P, T) || {_, P, T, _} <- sup_children(Sup)]) + + pid_memory(Sup). + +sup_children(Sup) -> + rabbit_misc:with_exit_handler( + rabbit_misc:const([]), fun () -> supervisor:which_children(Sup) end). + +pid_memory(Pid) when is_pid(Pid) -> case process_info(Pid, memory) of + {memory, M} -> M; + _ -> 0 + end; +pid_memory(Name) when is_atom(Name) -> case whereis(Name) of + P when is_pid(P) -> pid_memory(P); + _ -> 0 + end. + +child_memory(Pid, worker) when is_pid (Pid) -> pid_memory(Pid); +child_memory(Pid, supervisor) when is_pid (Pid) -> sup_memory(Pid); +child_memory(_, _) -> 0. + +mnesia_memory() -> + case mnesia:system_info(is_running) of + yes -> lists:sum([bytes(mnesia:table_info(Tab, memory)) || + Tab <- mnesia:system_info(tables)]); + no -> 0 + end. + +ets_memory(Name) -> + lists:sum([bytes(ets:info(T, memory)) || T <- ets:all(), + N <- [ets:info(T, name)], + N =:= Name]). + +bytes(Words) -> Words * erlang:system_info(wordsize). + +plugin_memory() -> + lists:sum([plugin_memory(App) || + {App, _, _} <- application:which_applications(), + is_plugin(atom_to_list(App))]). + +plugin_memory(App) -> + case catch application_master:get_child( + application_controller:get_master(App)) of + {Pid, _} -> sup_memory(Pid); + _ -> 0 + end. + +is_plugin("rabbitmq_" ++ _) -> true; +is_plugin(App) -> lists:member(App, ?MAGIC_PLUGINS). |
