diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit.erl | 13 | ||||
| -rw-r--r-- | src/rabbit_amqqueue.erl | 46 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 200 | ||||
| -rw-r--r-- | src/rabbit_backing_queue.erl | 18 | ||||
| -rw-r--r-- | src/rabbit_control.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_coordinator.erl | 136 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_master.erl | 285 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_misc.erl | 46 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 625 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_slave_sup.erl | 54 | ||||
| -rw-r--r-- | src/rabbit_mnesia.erl | 3 | ||||
| -rw-r--r-- | src/rabbit_router.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 27 | ||||
| -rw-r--r-- | src/rabbit_types.erl | 3 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 139 |
15 files changed, 1432 insertions, 175 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl index c9a929ae00..0548d6bf09 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -36,6 +36,12 @@ []}}, {enables, external_infrastructure}]}). +-rabbit_boot_step({rabbit_registry, + [{description, "plugin registry"}, + {mfa, {rabbit_sup, start_child, + [rabbit_registry]}}, + {enables, external_infrastructure}]}). + -rabbit_boot_step({database, [{mfa, {rabbit_mnesia, init, []}}, {requires, file_handle_cache}, @@ -55,13 +61,6 @@ -rabbit_boot_step({external_infrastructure, [{description, "external infrastructure ready"}]}). --rabbit_boot_step({rabbit_registry, - [{description, "plugin registry"}, - {mfa, {rabbit_sup, start_child, - [rabbit_registry]}}, - {requires, external_infrastructure}, - {enables, kernel_ready}]}). - -rabbit_boot_step({rabbit_log, [{description, "logging server"}, {mfa, {rabbit_sup, start_restartable_child, diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 3aa20821b9..36b1662e9a 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -18,8 +18,8 @@ -export([start/0, stop/0, declare/5, delete_immediately/1, delete/3, purge/1]). -export([internal_declare/2, internal_delete/1, - maybe_run_queue_via_backing_queue/2, - maybe_run_queue_via_backing_queue_async/2, + maybe_run_queue_via_backing_queue/3, + maybe_run_queue_via_backing_queue_async/3, sync_timeout/1, update_ram_duration/1, set_ram_duration_target/2, set_maximum_since_use/2, maybe_expire/1, drop_expired/1]). -export([pseudo_queue/2]). @@ -33,6 +33,7 @@ -export([notify_sent/2, unblock/2, flush_all/2]). -export([commit_all/3, rollback_all/3, notify_down_all/2, limit_all/3]). -export([on_node_down/1]). +-export([store_queue/1]). -include("rabbit.hrl"). -include_lib("stdlib/include/qlc.hrl"). @@ -140,10 +141,10 @@ rabbit_types:connection_exit() | fun ((boolean()) -> rabbit_types:ok_or_error('not_found') | rabbit_types:connection_exit())). --spec(maybe_run_queue_via_backing_queue/2 :: - (pid(), (fun ((A) -> {[rabbit_types:msg_id()], A}))) -> 'ok'). --spec(maybe_run_queue_via_backing_queue_async/2 :: - (pid(), (fun ((A) -> {[rabbit_types:msg_id()], A}))) -> 'ok'). +-spec(maybe_run_queue_via_backing_queue/3 :: + (pid(), atom(), (fun ((A) -> {[rabbit_guid:msg_id()], A}))) -> 'ok'). +-spec(maybe_run_queue_via_backing_queue_async/3 :: + (pid(), atom(), (fun ((A) -> {[rabbit_guid:msg_id()], A}))) -> 'ok'). -spec(sync_timeout/1 :: (pid()) -> 'ok'). -spec(update_ram_duration/1 :: (pid()) -> 'ok'). -spec(set_ram_duration_target/2 :: (pid(), number() | 'infinity') -> 'ok'). @@ -191,12 +192,13 @@ recover_durable_queues(DurableQueues) -> declare(QueueName, Durable, AutoDelete, Args, Owner) -> ok = check_declare_arguments(QueueName, Args), - Q = start_queue_process(#amqqueue{name = QueueName, - durable = Durable, - auto_delete = AutoDelete, - arguments = Args, + Q = start_queue_process(#amqqueue{name = QueueName, + durable = Durable, + auto_delete = AutoDelete, + arguments = Args, exclusive_owner = Owner, - pid = none}), + pid = none, + mirror_pids = []}), case gen_server2:call(Q#amqqueue.pid, {init, false}, infinity) of not_found -> rabbit_misc:not_found(QueueName); Q1 -> Q1 @@ -438,11 +440,13 @@ internal_delete(QueueName) -> end end). -maybe_run_queue_via_backing_queue(QPid, Fun) -> - gen_server2:call(QPid, {maybe_run_queue_via_backing_queue, Fun}, infinity). -maybe_run_queue_via_backing_queue_async(QPid, Fun) -> - gen_server2:cast(QPid, {maybe_run_queue_via_backing_queue, Fun}). +maybe_run_queue_via_backing_queue(QPid, Mod, Fun) -> + gen_server2:call(QPid, {maybe_run_queue_via_backing_queue, Mod, Fun}, + infinity). + +maybe_run_queue_via_backing_queue_async(QPid, Mod, Fun) -> + gen_server2:cast(QPid, {maybe_run_queue_via_backing_queue, Mod, Fun}). sync_timeout(QPid) -> gen_server2:cast(QPid, sync_timeout). @@ -465,7 +469,8 @@ drop_expired(QPid) -> on_node_down(Node) -> rabbit_misc:execute_mnesia_transaction( fun () -> qlc:e(qlc:q([delete_queue(QueueName) || - #amqqueue{name = QueueName, pid = Pid} + #amqqueue{name = QueueName, pid = Pid, + mirror_pids = []} <- mnesia:table(rabbit_queue), node(Pid) == Node])) end, @@ -482,11 +487,12 @@ delete_queue(QueueName) -> rabbit_binding:remove_transient_for_destination(QueueName). pseudo_queue(QueueName, Pid) -> - #amqqueue{name = QueueName, - durable = false, + #amqqueue{name = QueueName, + durable = false, auto_delete = false, - arguments = [], - pid = Pid}. + arguments = [], + pid = Pid, + mirror_pids = []}. safe_delegate_call_ok(F, Pids) -> case delegate:invoke(Pids, fun (Pid) -> diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 54c92dc70d..eb3b13cc36 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -33,7 +33,9 @@ handle_info/2, handle_pre_hibernate/1, prioritise_call/3, prioritise_cast/2, prioritise_info/2]). -%% Queue's state +-export([init_with_backing_queue_state/7]). + +% Queue's state -record(q, {q, exclusive_consumer, has_had_consumers, @@ -72,7 +74,8 @@ messages, consumers, memory, - backing_queue_status + backing_queue_status, + mirror_pids ]). -define(CREATION_EVENT_KEYS, @@ -97,12 +100,11 @@ info_keys() -> ?INFO_KEYS. init(Q) -> ?LOGDEBUG("Queue starting - ~p~n", [Q]), process_flag(trap_exit, true), - {ok, BQ} = application:get_env(backing_queue_module), {ok, #q{q = Q#amqqueue{pid = self()}, exclusive_consumer = none, has_had_consumers = false, - backing_queue = BQ, + backing_queue = backing_queue_module(Q), backing_queue_state = undefined, active_consumers = queue:new(), blocked_consumers = queue:new(), @@ -115,6 +117,36 @@ init(Q) -> msg_id_to_channel = dict:new()}, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. +init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS, + RateTRef, AckTags, Deliveries, MTC) -> + ?LOGDEBUG("Queue starting - ~p~n", [Q]), + case Owner of + none -> ok; + _ -> erlang:monitor(process, Owner) + end, + State = requeue_and_run( + AckTags, + process_args( + #q{q = Q#amqqueue{pid = self()}, + exclusive_consumer = none, + has_had_consumers = false, + backing_queue = BQ, + backing_queue_state = BQS, + active_consumers = queue:new(), + blocked_consumers = queue:new(), + expires = undefined, + sync_timer_ref = undefined, + rate_timer_ref = RateTRef, + expiry_timer_ref = undefined, + ttl = undefined, + stats_timer = rabbit_event:init_stats_timer(), + msg_id_to_channel = MTC})), + lists:foldl( + fun (Delivery, StateN) -> + {_Delivered, StateN1} = deliver_or_enqueue(Delivery, StateN), + StateN1 + end, State, Deliveries). + terminate(shutdown, State = #q{backing_queue = BQ}) -> terminate_shutdown(fun (BQS) -> BQ:terminate(BQS) end, State); terminate({shutdown, _}, State = #q{backing_queue = BQ}) -> @@ -137,8 +169,7 @@ code_change(_OldVsn, State, _Extra) -> %%---------------------------------------------------------------------------- declare(Recover, From, - State = #q{q = Q = #amqqueue{name = QName, durable = IsDurable}, - backing_queue = BQ, backing_queue_state = undefined, + State = #q{q = Q, backing_queue = BQ, backing_queue_state = undefined, stats_timer = StatsTimer}) -> case rabbit_amqqueue:internal_declare(Q, Recover) of not_found -> {stop, normal, not_found, State}; @@ -149,7 +180,7 @@ declare(Recover, From, ok = rabbit_memory_monitor:register( self(), {rabbit_amqqueue, set_ram_duration_target, [self()]}), - BQS = BQ:init(QName, IsDurable, Recover), + BQS = BQ:init(Q, Recover), State1 = process_args(State#q{backing_queue_state = BQS}), rabbit_event:notify(queue_created, infos(?CREATION_EVENT_KEYS, State1)), @@ -210,6 +241,13 @@ next_state(State) -> false -> {stop_sync_timer(State2), hibernate} end. +backing_queue_module(#amqqueue{arguments = Args}) -> + case rabbit_misc:table_lookup(Args, <<"x-mirror">>) of + undefined -> {ok, BQM} = application:get_env(backing_queue_module), + BQM; + _Nodes -> rabbit_mirror_queue_master + end. + ensure_sync_timer(State = #q{sync_timer_ref = undefined}) -> {ok, TRef} = timer:apply_after( ?SYNC_INTERVAL, rabbit_amqqueue, sync_timeout, [self()]), @@ -448,58 +486,78 @@ attempt_delivery(#delivery{txn = none, sender = ChPid, message = Message, msg_seq_no = MsgSeqNo}, - {NeedsConfirming, State = #q{backing_queue = BQ}}) -> + {NeedsConfirming, State = #q{backing_queue = BQ, + backing_queue_state = BQS}}) -> case NeedsConfirming of immediately -> rabbit_channel:confirm(ChPid, [MsgSeqNo]); _ -> ok end, - PredFun = fun (IsEmpty, _State) -> not IsEmpty end, - DeliverFun = - fun (AckRequired, false, State1 = #q{backing_queue_state = BQS}) -> - %% we don't need an expiry here because messages are - %% not being enqueued, so we use an empty - %% message_properties. - {AckTag, BQS1} = - BQ:publish_delivered( - AckRequired, Message, - (?BASE_MESSAGE_PROPERTIES)#message_properties{ - needs_confirming = (NeedsConfirming =:= eventually)}, - BQS), - {{Message, false, AckTag}, true, - State1#q{backing_queue_state = BQS1}} - end, - {Delivered, State1} = - deliver_msgs_to_consumers({ PredFun, DeliverFun }, false, State), - {Delivered, NeedsConfirming, State1}; + case BQ:validate_message(Message, BQS) of + {invalid, BQS1} -> + {invalid, NeedsConfirming, State#q{backing_queue_state = BQS1}}; + {valid, BQS1} -> + PredFun = fun (IsEmpty, _State) -> not IsEmpty end, + DeliverFun = + fun (AckRequired, false, + State1 = #q{backing_queue_state = BQS2}) -> + %% we don't need an expiry here because + %% messages are not being enqueued, so we use + %% an empty message_properties. + {AckTag, BQS3} = + BQ:publish_delivered( + AckRequired, Message, + (?BASE_MESSAGE_PROPERTIES)#message_properties{ + needs_confirming = + (NeedsConfirming =:= eventually)}, + ChPid, BQS2), + {{Message, false, AckTag}, true, + State1#q{backing_queue_state = BQS3}} + end, + {Delivered, State2} = + deliver_msgs_to_consumers({ PredFun, DeliverFun }, false, + State#q{backing_queue_state = BQS1}), + {{valid, Delivered}, NeedsConfirming, State2} + end; attempt_delivery(#delivery{txn = Txn, sender = ChPid, message = Message}, {NeedsConfirming, State = #q{backing_queue = BQ, backing_queue_state = BQS}}) -> - store_ch_record((ch_record(ChPid))#cr{txn = Txn}), - BQS1 = BQ:tx_publish(Txn, Message, ?BASE_MESSAGE_PROPERTIES, BQS), - {true, NeedsConfirming, State#q{backing_queue_state = BQS1}}. + case BQ:validate_message(Message, BQS) of + {invalid, BQS1} -> + {invalid, NeedsConfirming, State#q{backing_queue_state = BQS1}}; + {valid, BQS1} -> + store_ch_record((ch_record(ChPid))#cr{txn = Txn}), + BQS2 = BQ:tx_publish(Txn, Message, ?BASE_MESSAGE_PROPERTIES, ChPid, + BQS1), + {{valid, true}, NeedsConfirming, + State#q{backing_queue_state = BQS2}} + end. deliver_or_enqueue(Delivery, State) -> case attempt_delivery(Delivery, record_confirm_message(Delivery, State)) of - {true, _, State1} -> + {invalid, _, State1} -> + State1; + {{valid, true}, _, State1} -> State1; - {false, NeedsConfirming, State1 = #q{backing_queue = BQ, - backing_queue_state = BQS}} -> + {{valid, false}, NeedsConfirming, + State1 = #q{backing_queue = BQ, backing_queue_state = BQS}} -> #delivery{message = Message} = Delivery, BQS1 = BQ:publish(Message, (message_properties(State)) #message_properties{ needs_confirming = (NeedsConfirming =:= eventually)}, - BQS), + Delivery #delivery.sender, BQS), ensure_ttl_timer(State1#q{backing_queue_state = BQS1}) end. -requeue_and_run(AckTags, State = #q{backing_queue = BQ, ttl=TTL}) -> +requeue_and_run(AckTags, State = #q{backing_queue = BQ, ttl = TTL}) -> maybe_run_queue_via_backing_queue( - fun (BQS) -> - {[], BQ:requeue(AckTags, reset_msg_expiry_fun(TTL), BQS)} - end, State). + BQ, fun (BQS) -> + {_Guids, BQS1} = + BQ:requeue(AckTags, reset_msg_expiry_fun(TTL), BQS), + {[], BQS1} + end, State). fetch(AckRequired, State = #q{backing_queue_state = BQS, backing_queue = BQ}) -> @@ -603,10 +661,12 @@ qname(#q{q = #amqqueue{name = QName}}) -> QName. backing_queue_idle_timeout(State = #q{backing_queue = BQ}) -> maybe_run_queue_via_backing_queue( - fun (BQS) -> {[], BQ:idle_timeout(BQS)} end, State). + BQ, fun (BQS) -> {[], BQ:idle_timeout(BQS)} end, State). -maybe_run_queue_via_backing_queue(Fun, State = #q{backing_queue_state = BQS}) -> - {MsgIds, BQS1} = Fun(BQS), +maybe_run_queue_via_backing_queue(Mod, Fun, + State = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> + {MsgIds, BQS1} = BQ:invoke(Mod, Fun, BQS), run_message_queue( confirm_messages(MsgIds, State#q{backing_queue_state = BQS1})). @@ -703,6 +763,9 @@ i(memory, _) -> M; i(backing_queue_status, #q{backing_queue_state = BQS, backing_queue = BQ}) -> BQ:status(BQS); +i(mirror_pids, #q{q = #amqqueue{name = Name}}) -> + {ok, #amqqueue{mirror_pids = MPids}} = rabbit_amqqueue:lookup(Name), + MPids; i(Item, _) -> throw({bad_argument, Item}). @@ -738,29 +801,29 @@ emit_consumer_deleted(ChPid, ConsumerTag) -> prioritise_call(Msg, _From, _State) -> case Msg of - info -> 9; - {info, _Items} -> 9; - consumers -> 9; - {maybe_run_queue_via_backing_queue, _Fun} -> 6; - _ -> 0 + info -> 9; + {info, _Items} -> 9; + consumers -> 9; + {maybe_run_queue_via_backing_queue, _Mod, _Fun} -> 6; + _ -> 0 end. prioritise_cast(Msg, _State) -> case Msg of - update_ram_duration -> 8; - delete_immediately -> 8; - {set_ram_duration_target, _Duration} -> 8; - {set_maximum_since_use, _Age} -> 8; - maybe_expire -> 8; - drop_expired -> 8; - emit_stats -> 7; - {ack, _Txn, _AckTags, _ChPid} -> 7; - {reject, _AckTags, _Requeue, _ChPid} -> 7; - {notify_sent, _ChPid} -> 7; - {unblock, _ChPid} -> 7; - {maybe_run_queue_via_backing_queue, _Fun} -> 6; - sync_timeout -> 6; - _ -> 0 + update_ram_duration -> 8; + delete_immediately -> 8; + {set_ram_duration_target, _Duration} -> 8; + {set_maximum_since_use, _Age} -> 8; + maybe_expire -> 8; + drop_expired -> 8; + emit_stats -> 7; + {ack, _Txn, _AckTags, _ChPid} -> 7; + {reject, _AckTags, _Requeue, _ChPid} -> 7; + {notify_sent, _ChPid} -> 7; + {unblock, _ChPid} -> 7; + {maybe_run_queue_via_backing_queue, _Mod, _Fun} -> 6; + sync_timeout -> 6; + _ -> 0 end. prioritise_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, @@ -815,9 +878,12 @@ handle_call({deliver_immediately, Delivery}, _From, State) -> %% just all ready-to-consume queues get the message, with unready %% queues discarding the message? %% - {Delivered, _NeedsConfirming, State1} = + {Valid, _NeedsConfirming, State1} = attempt_delivery(Delivery, record_confirm_message(Delivery, State)), - reply(Delivered, State1); + reply(case Valid of + valid -> true; + invalid -> false + end, State1); handle_call({deliver, Delivery}, From, State) -> %% Synchronous, "mandatory" delivery mode. Reply asap. @@ -972,12 +1038,12 @@ handle_call({requeue, AckTags, ChPid}, From, State) -> noreply(requeue_and_run(AckTags, State)) end; -handle_call({maybe_run_queue_via_backing_queue, Fun}, _From, State) -> - reply(ok, maybe_run_queue_via_backing_queue(Fun, State)). +handle_call({maybe_run_queue_via_backing_queue, Mod, Fun}, _From, State) -> + reply(ok, maybe_run_queue_via_backing_queue(Mod, Fun, State)). -handle_cast({maybe_run_queue_via_backing_queue, Fun}, State) -> - noreply(maybe_run_queue_via_backing_queue(Fun, State)); +handle_cast({maybe_run_queue_via_backing_queue, Mod, Fun}, State) -> + noreply(maybe_run_queue_via_backing_queue(Mod, Fun, State)); handle_cast(sync_timeout, State) -> noreply(backing_queue_idle_timeout(State#q{sync_timer_ref = undefined})); @@ -996,7 +1062,7 @@ handle_cast({ack, Txn, AckTags, ChPid}, case Txn of none -> ChAckTags1 = subtract_acks(ChAckTags, AckTags), NewC = C#cr{acktags = ChAckTags1}, - BQS1 = BQ:ack(AckTags, BQS), + {_Guids, BQS1} = BQ:ack(AckTags, BQS), {NewC, State#q{backing_queue_state = BQS1}}; _ -> BQS1 = BQ:tx_ack(Txn, AckTags, BQS), {C#cr{txn = Txn}, @@ -1017,7 +1083,7 @@ handle_cast({reject, AckTags, Requeue, ChPid}, maybe_store_ch_record(C#cr{acktags = ChAckTags1}), noreply(case Requeue of true -> requeue_and_run(AckTags, State); - false -> BQS1 = BQ:ack(AckTags, BQS), + false -> {_Guids, BQS1} = BQ:ack(AckTags, BQS), State#q{backing_queue_state = BQS1} end) end; diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index 03c1fdd1b9..726b9befb8 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -33,7 +33,7 @@ behaviour_info(callbacks) -> {stop, 0}, %% Initialise the backing queue and its state. - {init, 3}, + {init, 2}, %% Called on queue shutdown when queue isn't being deleted. {terminate, 1}, @@ -47,12 +47,12 @@ behaviour_info(callbacks) -> {purge, 1}, %% Publish a message. - {publish, 3}, + {publish, 4}, %% Called for messages which have already been passed straight %% out to a client. The queue will be empty for these calls %% (i.e. saves the round trip through the backing queue). - {publish_delivered, 4}, + {publish_delivered, 5}, %% Drop messages from the head of the queue while the supplied %% predicate returns true. @@ -66,7 +66,7 @@ behaviour_info(callbacks) -> {ack, 2}, %% A publish, but in the context of a transaction. - {tx_publish, 4}, + {tx_publish, 5}, %% Acks, but in the context of a transaction. {tx_ack, 3}, @@ -122,7 +122,15 @@ behaviour_info(callbacks) -> %% Exists for debugging purposes, to be able to expose state via %% rabbitmqctl list_queues backing_queue_status - {status, 1} + {status, 1}, + + %% Passed a function to be invoked with the relevant backing + %% queue's state. Useful for when the backing queue or other + %% components need to pass functions into the backing queue. + {invoke, 3}, + + %% TODO: document me + {validate_message, 2} ]; behaviour_info(_Other) -> undefined. diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index 8364ecd8d7..e2c050f5d8 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -358,6 +358,12 @@ format_info_item([{TableEntryKey, TableEntryType, _TableEntryValue} | _] = Value) when is_binary(TableEntryKey) andalso is_atom(TableEntryType) -> io_lib:format("~1000000000000p", [prettify_amqp_table(Value)]); +format_info_item([T | _] = Value) + when is_tuple(T) orelse is_pid(T) orelse is_binary(T) orelse is_atom(T) orelse + is_list(T) -> + "[" ++ + lists:nthtail(2, lists:append( + [", " ++ format_info_item(E) || E <- Value])) ++ "]"; format_info_item(Value) -> io_lib:format("~w", [Value]). diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl new file mode 100644 index 0000000000..30fd6ed34d --- /dev/null +++ b/src/rabbit_mirror_queue_coordinator.erl @@ -0,0 +1,136 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is VMware, Inc. +%% Copyright (c) 2007-2010 VMware, Inc. All rights reserved. +%% + +-module(rabbit_mirror_queue_coordinator). + +-export([start_link/2, add_slave/2, get_gm/1]). + +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, + code_change/3]). + +-export([joined/2, members_changed/3, handle_msg/3]). + +-behaviour(gen_server2). +-behaviour(gm). + +-include("rabbit.hrl"). +-include("gm_specs.hrl"). + +-record(state, { q, + gm + }). + +-define(ONE_SECOND, 1000). + +start_link(Queue, GM) -> + gen_server2:start_link(?MODULE, [Queue, GM], []). + +add_slave(CPid, SlaveNode) -> + gen_server2:cast(CPid, {add_slave, SlaveNode}). + +get_gm(CPid) -> + gen_server2:call(CPid, get_gm, infinity). + +%% --------------------------------------------------------------------------- +%% gen_server +%% --------------------------------------------------------------------------- + +init([#amqqueue { name = QueueName } = Q, GM]) -> + GM1 = case GM of + undefined -> + ok = gm:create_tables(), + {ok, GM2} = gm:start_link(QueueName, ?MODULE, [self()]), + receive {joined, GM2, _Members} -> + ok + end, + GM2; + _ -> + true = link(GM), + GM + end, + {ok, _TRef} = + timer:apply_interval(?ONE_SECOND, gm, broadcast, [GM1, heartbeat]), + {ok, #state { q = Q, gm = GM1 }, hibernate, + {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. + +handle_call(get_gm, _From, State = #state { gm = GM }) -> + reply(GM, State). + +handle_cast({add_slave, Node}, State = #state { q = Q }) -> + Nodes = nodes(), + case lists:member(Node, Nodes) of + true -> + Result = rabbit_mirror_queue_slave_sup:start_child(Node, [Q]), + rabbit_log:info("Adding slave node for ~s: ~p~n", + [rabbit_misc:rs(Q #amqqueue.name), Result]); + false -> + rabbit_log:info( + "Ignoring request to add slave on node ~p for ~s~n", + [Node, rabbit_misc:rs(Q #amqqueue.name)]) + end, + noreply(State); + +handle_cast({gm_deaths, Deaths}, + State = #state { q = #amqqueue { name = QueueName } }) -> + rabbit_log:info("Master ~p saw deaths ~p for ~s~n", + [self(), Deaths, rabbit_misc:rs(QueueName)]), + case rabbit_mirror_queue_misc:remove_from_queue(QueueName, Deaths) of + {ok, Pid} when node(Pid) =:= node() -> + noreply(State); + {error, not_found} -> + {stop, normal, State} + end. + +handle_info(Msg, State) -> + {stop, {unexpected_info, Msg}, State}. + +terminate(_Reason, #state{}) -> + %% gen_server case + ok; +terminate([_CPid], _Reason) -> + %% gm case + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%% --------------------------------------------------------------------------- +%% GM +%% --------------------------------------------------------------------------- + +joined([CPid], Members) -> + CPid ! {joined, self(), Members}, + ok. + +members_changed([_CPid], _Births, []) -> + ok; +members_changed([CPid], _Births, Deaths) -> + ok = gen_server2:cast(CPid, {gm_deaths, Deaths}). + +handle_msg([_CPid], _From, heartbeat) -> + ok; +handle_msg([_CPid], _From, _Msg) -> + ok. + +%% --------------------------------------------------------------------------- +%% Others +%% --------------------------------------------------------------------------- + +noreply(State) -> + {noreply, State, hibernate}. + +reply(Reply, State) -> + {reply, Reply, State, hibernate}. diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl new file mode 100644 index 0000000000..dd2357bb48 --- /dev/null +++ b/src/rabbit_mirror_queue_master.erl @@ -0,0 +1,285 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is VMware, Inc. +%% Copyright (c) 2007-2010 VMware, Inc. All rights reserved. +%% + +-module(rabbit_mirror_queue_master). + +-export([init/2, terminate/1, delete_and_terminate/1, + purge/1, publish/4, publish_delivered/5, fetch/2, ack/2, + tx_publish/5, tx_ack/3, tx_rollback/2, tx_commit/4, + requeue/3, len/1, is_empty/1, dropwhile/2, + set_ram_duration_target/2, ram_duration/1, + needs_idle_timeout/1, idle_timeout/1, handle_pre_hibernate/1, + status/1, invoke/3]). + +-export([start/1, stop/0]). + +-export([promote_backing_queue_state/5]). + +-behaviour(rabbit_backing_queue). + +-include("rabbit.hrl"). + +-record(state, { gm, + coordinator, + backing_queue, + backing_queue_state, + set_delivered, + seen_status + }). + +%% --------------------------------------------------------------------------- +%% Backing queue +%% --------------------------------------------------------------------------- + +start(_DurableQueues) -> + %% This will never get called as this module will never be + %% installed as the default BQ implementation. + exit({not_valid_for_generic_backing_queue, ?MODULE}). + +stop() -> + %% Same as start/1. + exit({not_valid_for_generic_backing_queue, ?MODULE}). + +init(#amqqueue { arguments = Args } = Q, Recover) -> + {ok, CPid} = rabbit_mirror_queue_coordinator:start_link(Q, undefined), + GM = rabbit_mirror_queue_coordinator:get_gm(CPid), + {_Type, Nodes} = rabbit_misc:table_lookup(Args, <<"x-mirror">>), + Nodes1 = case Nodes of + [] -> nodes(); + _ -> [list_to_atom(binary_to_list(Node)) || + {longstr, Node} <- Nodes] + end, + [rabbit_mirror_queue_coordinator:add_slave(CPid, Node) || Node <- Nodes1], + {ok, BQ} = application:get_env(backing_queue_module), + BQS = BQ:init(Q, Recover), + #state { gm = GM, + coordinator = CPid, + backing_queue = BQ, + backing_queue_state = BQS, + set_delivered = 0, + seen_status = dict:new() }. + +promote_backing_queue_state(CPid, BQ, BQS, GM, SeenStatus) -> + #state { gm = GM, + coordinator = CPid, + backing_queue = BQ, + backing_queue_state = BQS, + set_delivered = BQ:len(BQS), + seen_status = SeenStatus }. + +terminate(State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> + %% Backing queue termination. The queue is going down but + %% shouldn't be deleted. Most likely safe shutdown of this + %% node. Thus just let some other slave take over. + State #state { backing_queue_state = BQ:terminate(BQS) }. + +delete_and_terminate(State = #state { gm = GM, + backing_queue = BQ, + backing_queue_state = BQS }) -> + ok = gm:broadcast(GM, delete_and_terminate), + State #state { backing_queue_state = BQ:delete_and_terminate(BQS), + set_delivered = 0 }. + +purge(State = #state { gm = GM, + backing_queue = BQ, + backing_queue_state = BQS }) -> + ok = gm:broadcast(GM, {set_length, 0}), + {Count, BQS1} = BQ:purge(BQS), + {Count, State #state { backing_queue_state = BQS1, + set_delivered = 0 }}. + +publish(Msg = #basic_message { id = MsgId }, MsgProps, ChPid, + State = #state { gm = GM, + seen_status = SS, + backing_queue = BQ, + backing_queue_state = BQS }) -> + false = dict:is_key(MsgId, SS), %% ASSERTION + ok = gm:broadcast(GM, {publish, false, ChPid, MsgProps, Msg}), + BQS1 = BQ:publish(Msg, MsgProps, ChPid, BQS), + State #state { backing_queue_state = BQS1 }. + +publish_delivered(AckRequired, Msg = #basic_message { id = MsgId }, MsgProps, + ChPid, State = #state { gm = GM, + backing_queue = BQ, + seen_status = SS, + backing_queue = BQ, + backing_queue_state = BQS }) -> + false = dict:is_key(MsgId, SS), %% ASSERTION + %% Must use confirmed_broadcast here in order to guarantee that + %% all slaves are forced to interpret this publish_delivered at + %% the same point, especially if we die and a slave is promoted. + BQS1 = BQ:publish(Msg, MsgProps, ChPid, BQS), + ok = gm:confirmed_broadcast( + GM, {publish, {true, AckRequired}, ChPid, MsgProps, Msg}), + BQS1 = BQ:publish_delivered(AckRequired, Msg, MsgProps, ChPid, BQS), + State #state { backing_queue_state = BQS1 }. + +dropwhile(Fun, State = #state { gm = GM, + backing_queue = BQ, + backing_queue_state = BQS, + set_delivered = SetDelivered }) -> + Len = BQ:len(BQS), + BQS1 = BQ:dropwhile(Fun, BQS), + Dropped = Len - BQ:len(BQS1), + SetDelivered1 = lists:max([0, SetDelivered - Dropped]), + ok = gm:broadcast(GM, {set_length, BQ:len(BQS1)}), + State #state { backing_queue_state = BQS1, + set_delivered = SetDelivered1 }. + +fetch(AckRequired, State = #state { gm = GM, + backing_queue = BQ, + backing_queue_state = BQS, + set_delivered = SetDelivered, + seen_status = SS }) -> + {Result, BQS1} = BQ:fetch(AckRequired, BQS), + State1 = State #state { backing_queue_state = BQS1 }, + case Result of + empty -> + {Result, State1}; + {#basic_message { id = MsgId } = Message, IsDelivered, AckTag, + Remaining} -> + ok = gm:broadcast(GM, {fetch, AckRequired, MsgId, Remaining}), + IsDelivered1 = IsDelivered orelse SetDelivered > 0, + SetDelivered1 = lists:max([0, SetDelivered - 1]), + SS1 = case SetDelivered + SetDelivered1 of + 1 -> dict:new(); %% transition to empty + _ -> SS + end, + {{Message, IsDelivered1, AckTag, Remaining}, + State1 #state { set_delivered = SetDelivered1, + seen_status = SS1 }} + end. + +ack(AckTags, State = #state { gm = GM, + backing_queue = BQ, + backing_queue_state = BQS }) -> + {MsgIds, BQS1} = BQ:ack(AckTags, BQS), + case MsgIds of + [] -> ok; + _ -> ok = gm:broadcast(GM, {ack, MsgIds}) + end, + {MsgIds, State #state { backing_queue_state = BQS1 }}. + +tx_publish(Txn, Msg, MsgProps, ChPid, #state {} = State) -> + %% gm:broadcast(GM, {tx_publish, Txn, MsgId, MsgProps, ChPid}) + State. + +tx_ack(Txn, AckTags, #state {} = State) -> + %% gm:broadcast(GM, {tx_ack, Txn, MsgIds}) + State. + +tx_rollback(Txn, #state {} = State) -> + %% gm:broadcast(GM, {tx_rollback, Txn}) + {[], State}. + +tx_commit(Txn, PostCommitFun, MsgPropsFun, #state {} = State) -> + %% Maybe don't want to transmit the MsgPropsFun but what choice do + %% we have? OTOH, on the slaves, things won't be expiring on their + %% own (props are interpreted by amqqueue, not vq), so if the msg + %% props aren't quite the same, that doesn't matter. + %% + %% The PostCommitFun is actually worse - we need to prevent that + %% from being invoked until we have confirmation from all the + %% slaves that they've done everything up to there. + %% + %% In fact, transactions are going to need work seeing as it's at + %% this point that VQ mentions amqqueue, which will thus not work + %% on the slaves - we need to make sure that all the slaves do the + %% tx_commit_post_msg_store at the same point, and then when they + %% all confirm that (scatter/gather), we can finally invoke the + %% PostCommitFun. + %% + %% Another idea is that the slaves are actually driven with + %% pubacks and thus only the master needs to support txns + %% directly. + {[], State}. + +requeue(AckTags, MsgPropsFun, State = #state { gm = GM, + backing_queue = BQ, + backing_queue_state = BQS }) -> + {MsgIds, BQS1} = BQ:requeue(AckTags, MsgPropsFun, BQS), + ok = gm:broadcast(GM, {requeue, MsgPropsFun, MsgIds}), + {MsgIds, State #state { backing_queue_state = BQS1 }}. + +len(#state { backing_queue = BQ, backing_queue_state = BQS}) -> + BQ:len(BQS). + +is_empty(#state { backing_queue = BQ, backing_queue_state = BQS}) -> + BQ:is_empty(BQS). + +set_ram_duration_target(Target, State = #state { backing_queue = BQ, + backing_queue_state = BQS}) -> + State #state { backing_queue_state = + BQ:set_ram_duration_target(Target, BQS) }. + +ram_duration(State = #state { backing_queue = BQ, backing_queue_state = BQS}) -> + {Result, BQS1} = BQ:ram_duration(BQS), + {Result, State #state { backing_queue_state = BQS1 }}. + +needs_idle_timeout(#state { backing_queue = BQ, backing_queue_state = BQS}) -> + BQ:needs_idle_timeout(BQS). + +idle_timeout(State = #state { backing_queue = BQ, backing_queue_state = BQS}) -> + State #state { backing_queue_state = BQ:idle_timeout(BQS) }. + +handle_pre_hibernate(State = #state { backing_queue = BQ, + backing_queue_state = BQS}) -> + State #state { backing_queue_state = BQ:handle_pre_hibernate(BQS) }. + +status(#state { backing_queue = BQ, backing_queue_state = BQS}) -> + BQ:status(BQS). + +invoke(?MODULE, Fun, State) -> + Fun(State); +invoke(Mod, Fun, State = #state { backing_queue = BQ, + backing_queue_state = BQS }) -> + {MsgIds, BQS1} = BQ:invoke(Mod, Fun, BQS), + {MsgIds, State #state { backing_queue_state = BQS1 }}. + +validate_message(Message = #basic_message { id = MsgId }, + State = #state { seen_status = SS, + backing_queue = BQ, + backing_queue_state = BSQ }) -> + %% Here, we need to deal with the possibility that we're about to + %% receive a message that we've already seen when we were a slave + %% (we received it via gm). Thus if we do receive such message now + %% via the channel, there may be a confirm waiting to issue for + %% it. + + %% We will never see {published, ChPid, MsgSeqNo} here. + case dict:find(MsgId, SS) of + error -> + %% We permit the underlying BQ to have a peek at it, but + %% only if we ourselves are not filtering out the msg. + {Result, BQS1} = BQ:validate_message(Message, BQS), + {Result, State #state { backing_queue_state = BQS1 }}; + {ok, {published, ChPid}} -> + %% It already got published when we were a slave and no + %% confirmation is waiting. amqqueue_process will have, in + %% its msg_id_to_channel mapping, the entry for dealing + %% with the confirm when that comes back in. The msg is + %% invalid. We will not see this again, so erase. + {invalid, State #state { seen_status = dict:erase(MsgId, SS) }}; + {ok, {confirmed, ChPid}} -> + %% It got confirmed before we became master, but we've + %% only just received the publish from the channel, so + %% couldn't previously know what the msg_seq_no was. Thus + %% confirm now. As above, amqqueue_process will have the + %% entry for the msg_id_to_channel mapping. + ok = rabbit_amqqueue:maybe_run_queue_via_backing_queue_async( + self(), ?MODULE, fun (State1) -> {[MsgId], State1} end), + {invalid, State #state { seen_status = dict:erase(MsgId, SS) }} + end. diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl new file mode 100644 index 0000000000..090cb81203 --- /dev/null +++ b/src/rabbit_mirror_queue_misc.erl @@ -0,0 +1,46 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is VMware, Inc. +%% Copyright (c) 2007-2010 VMware, Inc. All rights reserved. +%% + +-module(rabbit_mirror_queue_misc). + +-export([remove_from_queue/2]). + +-include("rabbit.hrl"). + +remove_from_queue(QueueName, DeadPids) -> + DeadNodes = [node(DeadPid) || DeadPid <- DeadPids], + rabbit_misc:execute_mnesia_transaction( + fun () -> + %% Someone else could have deleted the queue before we + %% get here. + case mnesia:read({rabbit_queue, QueueName}) of + [] -> {error, not_found}; + [Q = #amqqueue { pid = QPid, + mirror_pids = MPids }] -> + [QPid1 | MPids1] = + [Pid || Pid <- [QPid | MPids], + not lists:member(node(Pid), DeadNodes)], + case {{QPid, MPids}, {QPid1, MPids1}} of + {Same, Same} -> + {ok, QPid}; + _ -> + Q1 = Q #amqqueue { pid = QPid1, + mirror_pids = MPids1 }, + ok = rabbit_amqqueue:store_queue(Q1), + {ok, QPid1} + end + end + end). diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl new file mode 100644 index 0000000000..87ce31d8df --- /dev/null +++ b/src/rabbit_mirror_queue_slave.erl @@ -0,0 +1,625 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is VMware, Inc. +%% Copyright (c) 2007-2010 VMware, Inc. All rights reserved. +%% + +-module(rabbit_mirror_queue_slave). + +%% We join the GM group before we add ourselves to the amqqueue +%% record. As a result: +%% 1. We can receive msgs from GM that correspond to messages we will +%% never receive from publishers. +%% 2. When we receive a message from publishers, we must receive a +%% message from the GM group for it. +%% 3. However, that instruction from the GM group can arrive either +%% before or after the actual message. We need to be able to +%% distinguish between GM instructions arriving early, and case (1) +%% above. +%% +%% All instructions from the GM group must be processed in the order +%% in which they're received. + +-export([start_link/1, set_maximum_since_use/2]). + +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, + code_change/3, handle_pre_hibernate/1, prioritise_call/3, + prioritise_cast/2]). + +-export([joined/2, members_changed/3, handle_msg/3]). + +-behaviour(gen_server2). +-behaviour(gm). + +-include("rabbit.hrl"). +-include("gm_specs.hrl"). + +-record(state, { q, + gm, + master_node, + backing_queue, + backing_queue_state, + sync_timer_ref, + rate_timer_ref, + + sender_queues, %% :: Pid -> MsgQ + msg_id_ack, %% :: MsgId -> AckTag + + msg_id_status + }). + +-define(SYNC_INTERVAL, 25). %% milliseconds +-define(RAM_DURATION_UPDATE_INTERVAL, 5000). + +start_link(Q) -> + gen_server2:start_link(?MODULE, [Q], []). + +set_maximum_since_use(QPid, Age) -> + gen_server2:cast(QPid, {set_maximum_since_use, Age}). + +init([#amqqueue { name = QueueName } = Q]) -> + process_flag(trap_exit, true), %% amqqueue_process traps exits too. + ok = gm:create_tables(), + {ok, GM} = gm:start_link(QueueName, ?MODULE, [self()]), + receive {joined, GM} -> + ok + end, + Self = self(), + Node = node(), + case rabbit_misc:execute_mnesia_transaction( + fun () -> + [Q1 = #amqqueue { pid = QPid, mirror_pids = MPids }] = + mnesia:read({rabbit_queue, QueueName}), + case [Pid || Pid <- [QPid | MPids], node(Pid) =:= Node] of + [] -> + MPids1 = MPids ++ [Self], + mnesia:write(rabbit_queue, + Q1 #amqqueue { mirror_pids = MPids1 }, + write), + {ok, QPid}; + _ -> + {error, node_already_present} + end + end) of + {ok, MPid} -> + ok = file_handle_cache:register_callback( + rabbit_amqqueue, set_maximum_since_use, [self()]), + ok = rabbit_memory_monitor:register( + self(), {rabbit_amqqueue, set_ram_duration_target, + [self()]}), + {ok, BQ} = application:get_env(backing_queue_module), + BQS = BQ:init(Q, false), + {ok, #state { q = Q, + gm = GM, + master_node = node(MPid), + backing_queue = BQ, + backing_queue_state = BQS, + rate_timer_ref = undefined, + sync_timer_ref = undefined, + + sender_queues = dict:new(), + msg_id_ack = dict:new(), + msg_id_status = dict:new() + }, hibernate, + {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, + ?DESIRED_HIBERNATE}}; + {error, Error} -> + {stop, Error} + end. + +handle_call({deliver_immediately, Delivery = #delivery {}}, From, State) -> + %% Synchronous, "immediate" delivery mode + + %% It is safe to reply 'false' here even if a) we've not seen the + %% msg via gm, or b) the master dies before we receive the msg via + %% gm. In the case of (a), we will eventually receive the msg via + %% gm, and it's only the master's result to the channel that is + %% important. In the case of (b), if the master does die and we do + %% get promoted then at that point we have no consumers, thus + %% 'false' is precisely the correct answer. However, we must be + %% careful to _not_ enqueue the message in this case. + gen_server2:reply(From, false), %% master may deliver it, not us + noreply(maybe_enqueue_message(Delivery, State)); + +handle_call({deliver, Delivery = #delivery {}}, From, State) -> + %% Synchronous, "mandatory" delivery mode + gen_server2:reply(From, true), %% amqqueue throws away the result anyway + noreply(maybe_enqueue_message(Delivery, State)); + +handle_call({gm_deaths, Deaths}, From, + State = #state { q = #amqqueue { name = QueueName }, + gm = GM, + master_node = MNode }) -> + rabbit_log:info("Slave ~p saw deaths ~p for ~s~n", + [self(), Deaths, rabbit_misc:rs(QueueName)]), + %% The GM has told us about deaths, which means we're not going to + %% receive any more messages from GM + case rabbit_mirror_queue_misc:remove_from_queue(QueueName, Deaths) of + {ok, Pid} when node(Pid) =:= MNode -> + reply(ok, State); + {ok, Pid} when node(Pid) =:= node() -> + promote_me(From, State); + {ok, Pid} -> + gen_server2:reply(From, ok), + ok = gm:broadcast(GM, heartbeat), + noreply(State #state { master_node = node(Pid) }); + {error, not_found} -> + gen_server2:reply(From, ok), + {stop, normal, State} + end; + +handle_call({maybe_run_queue_via_backing_queue, Mod, Fun}, _From, State) -> + reply(ok, maybe_run_queue_via_backing_queue(Mod, Fun, State)). + + +handle_cast({maybe_run_queue_via_backing_queue, Mod, Fun}, State) -> + noreply(maybe_run_queue_via_backing_queue(Mod, Fun, State)); + +handle_cast({gm, Instruction}, State) -> + handle_process_result(process_instruction(Instruction, State)); + +handle_cast({deliver, Delivery = #delivery {}}, State) -> + %% Asynchronous, non-"mandatory", non-"immediate" deliver mode. + noreply(maybe_enqueue_message(Delivery, State)); + +handle_cast({set_maximum_since_use, Age}, State) -> + ok = file_handle_cache:set_maximum_since_use(Age), + noreply(State); + +handle_cast({set_ram_duration_target, Duration}, + State = #state { backing_queue = BQ, + backing_queue_state = BQS }) -> + BQS1 = BQ:set_ram_duration_target(Duration, BQS), + noreply(State #state { backing_queue_state = BQS1 }); + +handle_cast(update_ram_duration, + State = #state { backing_queue = BQ, + backing_queue_state = BQS }) -> + {RamDuration, BQS1} = BQ:ram_duration(BQS), + DesiredDuration = + rabbit_memory_monitor:report_ram_duration(self(), RamDuration), + BQS2 = BQ:set_ram_duration_target(DesiredDuration, BQS1), + noreply(State #state { rate_timer_ref = just_measured, + backing_queue_state = BQS2 }); + +handle_cast(sync_timeout, State) -> + noreply(backing_queue_idle_timeout( + State #state { sync_timer_ref = undefined })). + +handle_info(timeout, State) -> + noreply(backing_queue_idle_timeout(State)); + +handle_info(Msg, State) -> + {stop, {unexpected_info, Msg}, State}. + +%% If the Reason is shutdown, or {shutdown, _}, it is not the queue +%% being deleted: it's just the node going down. Even though we're a +%% slave, we have no idea whether or not we'll be the only copy coming +%% back up. Thus we must assume we will be, and preserve anything we +%% have on disk. +terminate(_Reason, #state { backing_queue_state = undefined }) -> + %% We've received a delete_and_terminate from gm, thus nothing to + %% do here. + ok; +terminate(Reason, #state { q = Q, + gm = GM, + backing_queue = BQ, + backing_queue_state = BQS, + rate_timer_ref = RateTRef }) -> + ok = gm:leave(GM), + QueueState = rabbit_amqqueue_process:init_with_backing_queue_state( + Q, BQ, BQS, RateTRef, [], []), + rabbit_amqqueue_process:terminate(Reason, QueueState); +terminate([_SPid], _Reason) -> + %% gm case + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +handle_pre_hibernate(State = #state { backing_queue = BQ, + backing_queue_state = BQS }) -> + %% mainly copied from amqqueue_process + {RamDuration, BQS1} = BQ:ram_duration(BQS), + DesiredDuration = + rabbit_memory_monitor:report_ram_duration(self(), RamDuration), + BQS2 = BQ:set_ram_duration_target(DesiredDuration, BQS1), + BQS3 = BQ:handle_pre_hibernate(BQS2), + {hibernate, stop_rate_timer(State #state { backing_queue_state = BQS3 })}. + +prioritise_call(Msg, _From, _State) -> + case Msg of + {maybe_run_queue_via_backing_queue, _Mod, _Fun} -> 6; + {gm_deaths, _Deaths} -> 5; + _ -> 0 + end. + +prioritise_cast(Msg, _State) -> + case Msg of + update_ram_duration -> 8; + {set_ram_duration_target, _Duration} -> 8; + {set_maximum_since_use, _Age} -> 8; + {maybe_run_queue_via_backing_queue, _Mod, _Fun} -> 6; + sync_timeout -> 6; + {gm, _Msg} -> 5; + _ -> 0 + end. + +%% --------------------------------------------------------------------------- +%% GM +%% --------------------------------------------------------------------------- + +joined([SPid], _Members) -> + SPid ! {joined, self()}, + ok. + +members_changed([_SPid], _Births, []) -> + ok; +members_changed([SPid], _Births, Deaths) -> + rabbit_misc:with_exit_handler( + fun () -> {stop, normal} end, + fun () -> + case gen_server2:call(SPid, {gm_deaths, Deaths}, infinity) of + ok -> + ok; + {promote, CPid} -> + {become, rabbit_mirror_queue_coordinator, [CPid]} + end + end). + +handle_msg([_SPid], _From, heartbeat) -> + ok; +handle_msg([SPid], _From, Msg) -> + ok = gen_server2:cast(SPid, {gm, Msg}). + +%% --------------------------------------------------------------------------- +%% Others +%% --------------------------------------------------------------------------- + +maybe_run_queue_via_backing_queue( + Mod, Fun, State = #state { backing_queue = BQ, + backing_queue_state = BQS }) -> + {MsgIds, BQS1} = BQ:invoke(Mod, Fun, BQS), + confirm_messages(MsgIds, State #state { backing_queue_state = BQS1 }). + + +needs_confirming(#delivery{ msg_seq_no = undefined }, _State) -> + never; +needs_confirming(#delivery { message = #basic_message { + is_persistent = true } }, + #state { q = #amqqueue { durable = true } }) -> + eventually; +needs_confirming(_Delivery, _State) -> + immediately. + +confirm_messages(MsgIds, State = #state { msg_id_status = MS }) -> + {MS1, CMs} = + lists:foldl( + fun (MsgId, {MSN, CMsN} = Acc) -> + %% We will never see {confirmed, ChPid} here. + case dict:find(MsgId, MSN) of + error -> + %% If it needed confirming, it'll have + %% already been done. + Acc; + {ok, {published, ChPid}} -> + %% Still not seen it from the channel, just + %% record that it's been confirmed. + {dict:store(MsgId, {confirmed, ChPid}, MSN), CMsN}; + {ok, {published, ChPid, MsgSeqNo}} -> + %% Seen from both GM and Channel. Can now + %% confirm. + {dict:erase(MsgId, MSN), + gb_trees_cons(ChPid, MsgSeqNo, CMsN)} + end + end, {MS, gb_trees:empty()}, MsgIds), + gb_trees:map(fun (ChPid, MsgSeqNos) -> + ok = rabbit_channel:confirm(ChPid, MsgSeqNos) + end, CMs), + State #state { msg_id_status = MS1 }. + +gb_trees_cons(Key, Value, Tree) -> + case gb_trees:lookup(Key, Tree) of + {value, Values} -> gb_trees:update(Key, [Value | Values], Tree); + none -> gb_trees:insert(Key, [Value], Tree) + end. + +handle_process_result({ok, State}) -> noreply(State); +handle_process_result({stop, State}) -> {stop, normal, State}. + +promote_me(From, #state { q = Q, + gm = GM, + backing_queue = BQ, + backing_queue_state = BQS, + rate_timer_ref = RateTRef, + sender_queues = SQ, + msg_id_ack = MA, + msg_id_status = MS }) -> + rabbit_log:info("Promoting slave ~p for ~s~n", + [self(), rabbit_misc:rs(Q #amqqueue.name)]), + {ok, CPid} = rabbit_mirror_queue_coordinator:start_link(Q, GM), + true = unlink(GM), + gen_server2:reply(From, {promote, CPid}), + ok = gm:confirmed_broadcast(GM, heartbeat), + MasterState = rabbit_mirror_queue_master:promote_backing_queue_state( + CPid, BQ, BQS, GM, MS), + %% We have to do the requeue via this init because otherwise we + %% don't have access to the relevent MsgPropsFun. Also, we are + %% already in mnesia as the master queue pid. Thus we cannot just + %% publish stuff by sending it to ourself - we must pass it + %% through to this init, otherwise we can violate ordering + %% constraints. + + %% MTC should contain only entries for which we are still + %% expecting confirms to come back to use from the underlying BQ. + + %% TODO: what do we do with entries in MS that are 'confirmed' + %% already? Well they should end up in the master queue's state, + %% and the confirms should be issued either by the + %% amqqueue_process if 'immediately', or otherwise by the master + %% queue on validate_message?! That's disgusting. There's no way + %% validate_message should be side-effecting... though we could at + %% least ensure it's idempotent. Hmm. + MTC = dict:from_list( + [{MsgId, {ChPid, MsgSeqNo}} || + {MsgId, {published, ChPid, MsgSeqNo}} <- dict:to_list(MS)]), + AckTags = [AckTag || {_MsgId, AckTag} <- dict:to_list(MA)], + Deliveries = lists:append([queue:to_list(PubQ) + || {_ChPid, PubQ} <- dict:to_list(SQ)]), + QueueState = rabbit_amqqueue_process:init_with_backing_queue_state( + Q, rabbit_mirror_queue_master, MasterState, RateTRef, + AckTags, Deliveries, MTC), + {become, rabbit_amqqueue_process, QueueState, hibernate}. + +noreply(State) -> + {NewState, Timeout} = next_state(State), + {noreply, NewState, Timeout}. + +reply(Reply, State) -> + {NewState, Timeout} = next_state(State), + {reply, Reply, NewState, Timeout}. + +next_state(State) -> + State1 = #state { backing_queue = BQ, backing_queue_state = BQS } = + ensure_rate_timer(State), + case BQ:needs_idle_timeout(BQS) of + true -> {ensure_sync_timer(State1), 0}; + false -> {stop_sync_timer(State1), hibernate} + end. + +%% copied+pasted from amqqueue_process +backing_queue_idle_timeout(State = #state { backing_queue = BQ }) -> + maybe_run_queue_via_backing_queue( + BQ, fun (BQS) -> {[], BQ:idle_timeout(BQS)} end, State). + +ensure_sync_timer(State = #state { sync_timer_ref = undefined }) -> + {ok, TRef} = timer:apply_after( + ?SYNC_INTERVAL, rabbit_amqqueue, sync_timeout, [self()]), + State #state { sync_timer_ref = TRef }; +ensure_sync_timer(State) -> + State. + +stop_sync_timer(State = #state { sync_timer_ref = undefined }) -> + State; +stop_sync_timer(State = #state { sync_timer_ref = TRef }) -> + {ok, cancel} = timer:cancel(TRef), + State #state { sync_timer_ref = undefined }. + +ensure_rate_timer(State = #state { rate_timer_ref = undefined }) -> + {ok, TRef} = timer:apply_after( + ?RAM_DURATION_UPDATE_INTERVAL, + rabbit_amqqueue, update_ram_duration, + [self()]), + State #state { rate_timer_ref = TRef }; +ensure_rate_timer(State = #state { rate_timer_ref = just_measured }) -> + State #state { rate_timer_ref = undefined }; +ensure_rate_timer(State) -> + State. + +stop_rate_timer(State = #state { rate_timer_ref = undefined }) -> + State; +stop_rate_timer(State = #state { rate_timer_ref = just_measured }) -> + State #state { rate_timer_ref = undefined }; +stop_rate_timer(State = #state { rate_timer_ref = TRef }) -> + {ok, cancel} = timer:cancel(TRef), + State #state { rate_timer_ref = undefined }. + +maybe_enqueue_message( + Delivery = #delivery { message = #basic_message { id = MsgId }, + msg_seq_no = MsgSeqNo, + sender = ChPid }, + State = #state { sender_queues = SQ, + msg_id_status = MS }) -> + %% We will never see {published, ChPid, MsgSeqNo} here. + case dict:find(MsgId, MS) of + error -> + MQ = case dict:find(ChPid, SQ) of + {ok, MQ1} -> MQ1; + error -> queue:new() + end, + SQ1 = dict:store(ChPid, queue:in(Delivery, MQ), SQ), + State #state { sender_queues = SQ1 }; + {ok, {confirmed, ChPid}} -> + %% BQ has confirmed it but we didn't know what the + %% msg_seq_no was at the time. We do now! + ok = rabbit_channel:confirm(ChPid, [MsgSeqNo]), + State #state { msg_id_status = dict:erase(MsgId, MS) }; + {ok, {published, ChPid}} -> + %% It was published to the BQ and we didn't know the + %% msg_seq_no so couldn't confirm it at the time. + case needs_confirming(Delivery, State) of + never -> + State #state { msg_id_status = dict:erase(MsgId, MS) }; + eventually -> + State #state { + msg_id_status = + dict:store(MsgId, {published, ChPid, MsgSeqNo}, MS) }; + immediately -> + ok = rabbit_channel:confirm(ChPid, [MsgSeqNo]), + State #state { msg_id_status = dict:erase(MsgId, MS) } + end + end. + +process_instruction( + {publish, Deliver, ChPid, MsgProps, Msg = #basic_message { id = MsgId }}, + State = #state { sender_queues = SQ, + backing_queue = BQ, + backing_queue_state = BQS, + msg_id_ack = MA, + msg_id_status = MS }) -> + + %% We really are going to do the publish right now, even though we + %% may not have seen it directly from the channel. As a result, we + %% may know that it needs confirming without knowing its + %% msg_seq_no, which means that we can see the confirmation come + %% back from the backing queue without knowing the msg_seq_no, + %% which means that we're going to have to hang on to the fact + %% that we've seen the msg_id confirmed until we can associate it + %% with a msg_seq_no. + MS1 = dict:store(MsgId, {published, ChPid}, MS), + {SQ1, MS2} = + case dict:find(ChPid, SQ) of + error -> + {SQ, MS1}; + {ok, MQ} -> + case queue:out(MQ) of + {empty, _MQ} -> + {SQ, MS1}; + {{value, Delivery = #delivery { + msg_seq_no = MsgSeqNo, + message = #basic_message { id = MsgId } }}, + MQ1} -> + %% We received the msg from the channel + %% first. Thus we need to deal with confirms + %% here. + {dict:store(ChPid, MQ1, SQ), + case needs_confirming(Delivery, State) of + never -> + MS; + eventually -> + dict:store( + MsgId, {published, ChPid, MsgSeqNo}, MS); + immediately -> + ok = rabbit_channel:confirm(ChPid, [MsgSeqNo]), + MS + end}; + {{value, #delivery {}}, _MQ1} -> + %% The instruction was sent to us before we + %% were within the mirror_pids within the + %% #amqqueue{} record. We'll never receive the + %% message directly from the channel. And the + %% channel will not be expecting any confirms + %% from us. + {SQ, MS} + end + end, + + State1 = State #state { sender_queues = SQ1, + msg_id_status = MS2 }, + %% we probably want to work in BQ:validate_message here + {ok, + case Deliver of + false -> + BQS1 = BQ:publish(Msg, MsgProps, ChPid, BQS), + State1 #state { backing_queue_state = BQS1 }; + {true, AckRequired} -> + {AckTag, BQS1} = BQ:publish_delivered(AckRequired, Msg, MsgProps, + ChPid, BQS), + MA1 = case AckRequired of + true -> dict:store(MsgId, AckTag, MA); + false -> MA + end, + State1 #state { backing_queue_state = BQS1, + msg_id_ack = MA1 } + end}; +process_instruction({set_length, Length}, + State = #state { backing_queue = BQ, + backing_queue_state = BQS }) -> + QLen = BQ:len(BQS), + ToDrop = QLen - Length, + {ok, case ToDrop > 0 of + true -> BQS1 = + lists:foldl( + fun (const, BQSN) -> + {{_Msg, _IsDelivered, _AckTag, _Remaining}, + BQSN1} = BQ:fetch(false, BQSN), + BQSN1 + end, BQS, lists:duplicate(ToDrop, const)), + State #state { backing_queue_state = BQS1 }; + false -> State + end}; +process_instruction({fetch, AckRequired, MsgId, Remaining}, + State = #state { backing_queue = BQ, + backing_queue_state = BQS, + msg_id_ack = MA }) -> + QLen = BQ:len(BQS), + {ok, case QLen - 1 of + Remaining -> + {{_Msg, _IsDelivered, AckTag, Remaining}, BQS1} = + BQ:fetch(AckRequired, BQS), + MA1 = case AckRequired of + true -> dict:store(MsgId, AckTag, MA); + false -> MA + end, + State #state { backing_queue_state = BQS1, + msg_id_ack = MA1 }; + Other when Other < Remaining -> + %% we must be shorter than the master + State + end}; +process_instruction({ack, MsgIds}, + State = #state { backing_queue = BQ, + backing_queue_state = BQS, + msg_id_ack = MA }) -> + {AckTags, MA1} = msg_ids_to_acktags(MsgIds, MA), + {MsgIds1, BQS1} = BQ:ack(AckTags, BQS), + [] = MsgIds1 -- MsgIds, %% ASSERTION + {ok, State #state { msg_id_ack = MA1, + backing_queue_state = BQS1 }}; +process_instruction({requeue, MsgPropsFun, MsgIds}, + State = #state { backing_queue = BQ, + backing_queue_state = BQS, + msg_id_ack = MA }) -> + {AckTags, MA1} = msg_ids_to_acktags(MsgIds, MA), + {ok, case length(AckTags) =:= length(MsgIds) of + true -> + {MsgIds, BQS1} = BQ:requeue(AckTags, MsgPropsFun, BQS), + State #state { msg_id_ack = MA1, + backing_queue_state = BQS1 }; + false -> + %% the only thing we can safely do is nuke out our BQ + %% and MA + {_Count, BQS1} = BQ:purge(BQS), + {MsgIds, BQS2} = ack_all(BQ, MA, BQS1), + State #state { msg_id_ack = dict:new(), + backing_queue_state = BQS2 } + end}; +process_instruction(delete_and_terminate, + State = #state { backing_queue = BQ, + backing_queue_state = BQS }) -> + BQ:delete_and_terminate(BQS), + {stop, State #state { backing_queue_state = undefined }}. + +msg_ids_to_acktags(MsgIds, MA) -> + {AckTags, MA1} = + lists:foldl(fun (MsgId, {AckTagsN, MAN}) -> + case dict:find(MsgId, MA) of + error -> {AckTagsN, MAN}; + {ok, AckTag} -> {[AckTag | AckTagsN], + dict:erase(MsgId, MAN)} + end + end, {[], MA}, MsgIds), + {lists:reverse(AckTags), MA1}. + +ack_all(BQ, MA, BQS) -> + BQ:ack([AckTag || {_MsgId, AckTag} <- dict:to_list(MA)], BQS). diff --git a/src/rabbit_mirror_queue_slave_sup.erl b/src/rabbit_mirror_queue_slave_sup.erl new file mode 100644 index 0000000000..80c0520c08 --- /dev/null +++ b/src/rabbit_mirror_queue_slave_sup.erl @@ -0,0 +1,54 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is VMware, Inc. +%% Copyright (c) 2007-2010 VMware, Inc. All rights reserved. +%% + +-module(rabbit_mirror_queue_slave_sup). + +-rabbit_boot_step({mirror_queue_slave_sup, + [{description, "mirror queue slave sup"}, + {mfa, {rabbit_mirror_queue_slave_sup, start, []}}, + {requires, queue_sup_queue_recovery}, + {enables, routing_ready}]}). + +-behaviour(supervisor2). + +-export([start/0, start_link/0, start_child/2]). + +-export([init/1]). + +-include_lib("rabbit.hrl"). + +-define(SERVER, ?MODULE). + +start() -> + {ok, _} = + supervisor:start_child( + rabbit_sup, + {rabbit_mirror_queue_slave_sup, + {rabbit_mirror_queue_slave_sup, start_link, []}, + transient, infinity, supervisor, [rabbit_mirror_queue_slave_sup]}), + ok. + +start_link() -> + supervisor2:start_link({local, ?SERVER}, ?MODULE, []). + +start_child(Node, Args) -> + supervisor2:start_child({?SERVER, Node}, Args). + +init([]) -> + {ok, {{simple_one_for_one_terminate, 10, 10}, + [{rabbit_mirror_queue_slave, + {rabbit_mirror_queue_slave, start_link, []}, + temporary, ?MAX_WAIT, worker, [rabbit_mirror_queue_slave]}]}}. diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index 66436920d4..884db799f7 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -216,7 +216,8 @@ table_definitions() -> {rabbit_queue, [{record_name, amqqueue}, {attributes, record_info(fields, amqqueue)}, - {match, #amqqueue{name = queue_name_match(), _='_'}}]}]. + {match, #amqqueue{name = queue_name_match(), _='_'}}]}] + ++ gm:table_definitions(). binding_match() -> #binding{source = exchange_name_match(), diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl index f6a1c92fcc..4f68356440 100644 --- a/src/rabbit_router.erl +++ b/src/rabbit_router.erl @@ -113,7 +113,9 @@ check_delivery(_ , _ , {_ , Qs}) -> {routed, Qs}. lookup_qpids(QNames) -> lists:foldl(fun (QName, QPids) -> case mnesia:dirty_read({rabbit_queue, QName}) of - [#amqqueue{pid = QPid}] -> [QPid | QPids]; - [] -> QPids + [#amqqueue{pid = QPid, mirror_pids = MPids}] -> + MPids ++ [QPid | QPids]; + [] -> + QPids end end, [], QNames). diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 9547cae5f6..e9b8a02063 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -2090,7 +2090,7 @@ variable_queue_publish(IsPersistent, Count, VQ) -> true -> 2; false -> 1 end}, <<>>), - #message_properties{}, VQN) + #message_properties{}, self(), VQN) end, VQ, lists:seq(1, Count)). variable_queue_fetch(Count, IsPersistent, IsDelivered, Len, VQ) -> @@ -2108,9 +2108,13 @@ assert_prop(List, Prop, Value) -> assert_props(List, PropVals) -> [assert_prop(List, Prop, Value) || {Prop, Value} <- PropVals]. +test_amqqueue(Durable) -> + (rabbit_amqqueue:pseudo_queue(test_queue(), self())) + #amqqueue { durable = Durable }. + with_fresh_variable_queue(Fun) -> ok = empty_test_queue(), - VQ = rabbit_variable_queue:init(test_queue(), true, false, + VQ = rabbit_variable_queue:init(test_amqqueue(true), false, fun nop/2, fun nop/1), S0 = rabbit_variable_queue:status(VQ), assert_props(S0, [{q1, 0}, {q2, 0}, @@ -2169,7 +2173,7 @@ test_dropwhile(VQ0) -> rabbit_basic:message( rabbit_misc:r(<<>>, exchange, <<>>), <<>>, #'P_basic'{}, <<>>), - #message_properties{expiry = N}, VQN) + #message_properties{expiry = N}, self(), VQN) end, VQ0, lists:seq(1, Count)), %% drop the first 5 messages @@ -2213,7 +2217,7 @@ test_variable_queue_dynamic_duration_change(VQ0) -> %% drain {VQ8, AckTags} = variable_queue_fetch(Len, false, false, Len, VQ7), - VQ9 = rabbit_variable_queue:ack(AckTags, VQ8), + {_Guids, VQ9} = rabbit_variable_queue:ack(AckTags, VQ8), {empty, VQ10} = rabbit_variable_queue:fetch(true, VQ9), VQ10. @@ -2223,7 +2227,7 @@ publish_fetch_and_ack(0, _Len, VQ0) -> publish_fetch_and_ack(N, Len, VQ0) -> VQ1 = variable_queue_publish(false, 1, VQ0), {{_Msg, false, AckTag, Len}, VQ2} = rabbit_variable_queue:fetch(true, VQ1), - VQ3 = rabbit_variable_queue:ack([AckTag], VQ2), + {_Guids, VQ3} = rabbit_variable_queue:ack([AckTag], VQ2), publish_fetch_and_ack(N-1, Len, VQ3). test_variable_queue_partial_segments_delta_thing(VQ0) -> @@ -2257,7 +2261,7 @@ test_variable_queue_partial_segments_delta_thing(VQ0) -> {len, HalfSegment + 1}]), {VQ8, AckTags1} = variable_queue_fetch(HalfSegment + 1, true, false, HalfSegment + 1, VQ7), - VQ9 = rabbit_variable_queue:ack(AckTags ++ AckTags1, VQ8), + {_Guids, VQ9} = rabbit_variable_queue:ack(AckTags ++ AckTags1, VQ8), %% should be empty now {empty, VQ10} = rabbit_variable_queue:fetch(true, VQ9), VQ10. @@ -2286,7 +2290,7 @@ test_variable_queue_all_the_bits_not_covered_elsewhere1(VQ0) -> {VQ5, _AckTags1} = variable_queue_fetch(Count, false, false, Count, VQ4), _VQ6 = rabbit_variable_queue:terminate(VQ5), - VQ7 = rabbit_variable_queue:init(test_queue(), true, true, + VQ7 = rabbit_variable_queue:init(test_amqqueue(true), true, fun nop/2, fun nop/1), {{_Msg1, true, _AckTag1, Count1}, VQ8} = rabbit_variable_queue:fetch(true, VQ7), @@ -2300,10 +2304,11 @@ test_variable_queue_all_the_bits_not_covered_elsewhere2(VQ0) -> VQ1 = rabbit_variable_queue:set_ram_duration_target(0, VQ0), VQ2 = variable_queue_publish(false, 4, VQ1), {VQ3, AckTags} = variable_queue_fetch(2, false, false, 4, VQ2), - VQ4 = rabbit_variable_queue:requeue(AckTags, fun(X) -> X end, VQ3), + {_Guids, VQ4} = + rabbit_variable_queue:requeue(AckTags, fun(X) -> X end, VQ3), VQ5 = rabbit_variable_queue:idle_timeout(VQ4), _VQ6 = rabbit_variable_queue:terminate(VQ5), - VQ7 = rabbit_variable_queue:init(test_queue(), true, true, + VQ7 = rabbit_variable_queue:init(test_amqqueue(true), true, fun nop/2, fun nop/1), {empty, VQ8} = rabbit_variable_queue:fetch(false, VQ7), VQ8. @@ -2311,7 +2316,7 @@ test_variable_queue_all_the_bits_not_covered_elsewhere2(VQ0) -> test_queue_recover() -> Count = 2 * rabbit_queue_index:next_segment_boundary(0), TxID = rabbit_guid:guid(), - {new, #amqqueue { pid = QPid, name = QName }} = + {new, #amqqueue { pid = QPid, name = QName } = Q} = rabbit_amqqueue:declare(test_queue(), true, false, [], none), [begin Msg = rabbit_basic:message(rabbit_misc:r(<<>>, exchange, <<>>), @@ -2335,7 +2340,7 @@ test_queue_recover() -> {ok, CountMinusOne, {QName, QPid1, _AckTag, true, _Msg}} = rabbit_amqqueue:basic_get(Q1, self(), false), exit(QPid1, shutdown), - VQ1 = rabbit_variable_queue:init(QName, true, true, + VQ1 = rabbit_variable_queue:init(Q, true, fun nop/2, fun nop/1), {{_Msg1, true, _AckTag1, CountMinusOne}, VQ2} = rabbit_variable_queue:fetch(true, VQ1), diff --git a/src/rabbit_types.erl b/src/rabbit_types.erl index 1f0f8bbeac..aa174e96ab 100644 --- a/src/rabbit_types.erl +++ b/src/rabbit_types.erl @@ -124,7 +124,8 @@ auto_delete :: boolean(), exclusive_owner :: rabbit_types:maybe(pid()), arguments :: rabbit_framing:amqp_table(), - pid :: rabbit_types:maybe(pid())}). + pid :: rabbit_types:maybe(pid()), + mirror_pids :: [pid()]}). -type(exchange() :: #exchange{name :: rabbit_exchange:name(), diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index be6691e9af..c9d96db753 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -16,18 +16,18 @@ -module(rabbit_variable_queue). --export([init/3, terminate/1, delete_and_terminate/1, - purge/1, publish/3, publish_delivered/4, fetch/2, ack/2, - tx_publish/4, tx_ack/3, tx_rollback/2, tx_commit/4, +-export([init/2, terminate/1, delete_and_terminate/1, + purge/1, publish/4, publish_delivered/5, fetch/2, ack/2, + tx_publish/5, tx_ack/3, tx_rollback/2, tx_commit/4, requeue/3, len/1, is_empty/1, dropwhile/2, set_ram_duration_target/2, ram_duration/1, needs_idle_timeout/1, idle_timeout/1, handle_pre_hibernate/1, - status/1, multiple_routing_keys/0]). + status/1, invoke/3, validate_message/2, multiple_routing_keys/0]). -export([start/1, stop/0]). %% exported for testing only --export([start_msg_store/2, stop_msg_store/0, init/5]). +-export([start_msg_store/2, stop_msg_store/0, init/4]). %%---------------------------------------------------------------------------- %% Definitions: @@ -397,15 +397,16 @@ stop_msg_store() -> ok = rabbit_sup:stop_child(?PERSISTENT_MSG_STORE), ok = rabbit_sup:stop_child(?TRANSIENT_MSG_STORE). -init(QueueName, IsDurable, Recover) -> +init(Queue, Recover) -> Self = self(), - init(QueueName, IsDurable, Recover, + init(Queue, Recover, fun (MsgIds, ActionTaken) -> msgs_written_to_disk(Self, MsgIds, ActionTaken) end, fun (MsgIds) -> msg_indices_written_to_disk(Self, MsgIds) end). -init(QueueName, IsDurable, false, MsgOnDiskFun, MsgIdxOnDiskFun) -> +init(#amqqueue { name = QueueName, durable = IsDurable }, false, + MsgOnDiskFun, MsgIdxOnDiskFun) -> IndexState = rabbit_queue_index:init(QueueName, MsgIdxOnDiskFun), init(IsDurable, IndexState, 0, [], case IsDurable of @@ -415,7 +416,8 @@ init(QueueName, IsDurable, false, MsgOnDiskFun, MsgIdxOnDiskFun) -> end, msg_store_client_init(?TRANSIENT_MSG_STORE, undefined)); -init(QueueName, true, true, MsgOnDiskFun, MsgIdxOnDiskFun) -> +init(#amqqueue { name = QueueName }, true, + MsgOnDiskFun, MsgIdxOnDiskFun) -> Terms = rabbit_queue_index:shutdown_terms(QueueName), {PRef, TRef, Terms1} = case [persistent_ref, transient_ref] -- proplists:get_keys(Terms) of @@ -505,13 +507,13 @@ purge(State = #vqstate { q4 = Q4, ram_index_count = 0, persistent_count = PCount1 })}. -publish(Msg, MsgProps, State) -> +publish(Msg, MsgProps, _ChPid, State) -> {_SeqId, State1} = publish(Msg, MsgProps, false, false, State), a(reduce_memory_use(State1)). publish_delivered(false, #basic_message { id = MsgId }, #message_properties { needs_confirming = NeedsConfirming }, - State = #vqstate { len = 0 }) -> + _ChPid, State = #vqstate { len = 0 }) -> case NeedsConfirming of true -> blind_confirm(self(), gb_sets:singleton(MsgId)); false -> ok @@ -521,6 +523,7 @@ publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent, id = MsgId }, MsgProps = #message_properties { needs_confirming = NeedsConfirming }, + _ChPid, State = #vqstate { len = 0, next_seq_id = SeqId, out_counter = OutCount, @@ -650,13 +653,14 @@ internal_fetch(AckRequired, MsgStatus = #msg_status { persistent_count = PCount1 })}. ack(AckTags, State) -> - a(ack(fun msg_store_remove/3, - fun (_, State0) -> State0 end, - AckTags, State)). + {MsgIds, State1} = ack(fun msg_store_remove/3, + fun (_, State0) -> State0 end, + AckTags, State), + {MsgIds, a(State1)}. tx_publish(Txn, Msg = #basic_message { is_persistent = IsPersistent }, MsgProps, - State = #vqstate { durable = IsDurable, - msg_store_clients = MSCState }) -> + _ChPid, State = #vqstate { durable = IsDurable, + msg_store_clients = MSCState }) -> Tx = #tx { pending_messages = Pubs } = lookup_tx(Txn), store_tx(Txn, Tx #tx { pending_messages = [{Msg, MsgProps} | Pubs] }), case IsPersistent andalso IsDurable of @@ -707,7 +711,7 @@ requeue(AckTags, MsgPropsFun, State) -> (MsgPropsFun(MsgProps)) #message_properties { needs_confirming = false } end, - a(reduce_memory_use( + {MsgIds, State1} = ack(fun msg_store_release/3, fun (#msg_status { msg = Msg, msg_props = MsgProps }, State1) -> {_SeqId, State2} = publish(Msg, MsgPropsFun1(MsgProps), @@ -722,7 +726,8 @@ requeue(AckTags, MsgPropsFun, State) -> true, true, State2), State3 end, - AckTags, State))). + AckTags, State), + {MsgIds, a(reduce_memory_use(State1))}. len(#vqstate { len = Len }) -> Len. @@ -860,6 +865,11 @@ status(#vqstate { {avg_ack_ingress_rate, AvgAckIngressRate}, {avg_ack_egress_rate , AvgAckEgressRate} ]. +invoke(?MODULE, Fun, State) -> + Fun(State). + +validate_message(_Msg, State) -> {valid, State}. + %%---------------------------------------------------------------------------- %% Minor helpers %%---------------------------------------------------------------------------- @@ -971,7 +981,7 @@ msg_store_close_fds_fun(IsPersistent) -> Self = self(), fun () -> rabbit_amqqueue:maybe_run_queue_via_backing_queue_async( - Self, + Self, ?MODULE, fun (State = #vqstate { msg_store_clients = MSCState }) -> {ok, MSCState1} = msg_store_close_fds(MSCState, IsPersistent), @@ -1117,10 +1127,11 @@ blank_rate(Timestamp, IngressLength) -> msg_store_callback(PersistentMsgIds, Pubs, AckTags, Fun, MsgPropsFun) -> Self = self(), F = fun () -> rabbit_amqqueue:maybe_run_queue_via_backing_queue( - Self, fun (StateN) -> {[], tx_commit_post_msg_store( - true, Pubs, AckTags, - Fun, MsgPropsFun, StateN)} - end) + Self, ?MODULE, + fun (StateN) -> {[], tx_commit_post_msg_store( + true, Pubs, AckTags, + Fun, MsgPropsFun, StateN)} + end) end, fun () -> spawn(fun () -> ok = rabbit_misc:with_exit_handler( fun () -> remove_persistent_messages( @@ -1183,20 +1194,21 @@ tx_commit_index(State = #vqstate { on_sync = #sync { Acks = lists:append(SAcks), Pubs = [{Msg, Fun(MsgProps)} || {Fun, PubsN} <- lists:reverse(SPubs), {Msg, MsgProps} <- lists:reverse(PubsN)], - {SeqIds, State1 = #vqstate { index_state = IndexState }} = + {_MsgIds, State1} = ack(Acks, State), + {SeqIds, State2 = #vqstate { index_state = IndexState }} = lists:foldl( fun ({Msg = #basic_message { is_persistent = IsPersistent }, MsgProps}, - {SeqIdsAcc, State2}) -> + {SeqIdsAcc, State3}) -> IsPersistent1 = IsDurable andalso IsPersistent, - {SeqId, State3} = - publish(Msg, MsgProps, false, IsPersistent1, State2), - {cons_if(IsPersistent1, SeqId, SeqIdsAcc), State3} - end, {PAcks, ack(Acks, State)}, Pubs), + {SeqId, State4} = + publish(Msg, MsgProps, false, IsPersistent1, State3), + {cons_if(IsPersistent1, SeqId, SeqIdsAcc), State4} + end, {PAcks, State1}, Pubs), IndexState1 = rabbit_queue_index:sync(SeqIds, IndexState), [ Fun() || Fun <- lists:reverse(SFuns) ], reduce_memory_use( - State1 #vqstate { index_state = IndexState1, on_sync = ?BLANK_SYNC }). + State2 #vqstate { index_state = IndexState1, on_sync = ?BLANK_SYNC }). purge_betas_and_deltas(LensByStore, State = #vqstate { q3 = Q3, @@ -1343,7 +1355,7 @@ remove_pending_ack(KeepPersistent, State = #vqstate { pending_ack = PA, index_state = IndexState, msg_store_clients = MSCState }) -> - {PersistentSeqIds, MsgIdsByStore} = + {PersistentSeqIds, MsgIdsByStore, _AllMsgIds} = dict:fold(fun accumulate_ack/3, accumulate_ack_init(), PA), State1 = State #vqstate { pending_ack = dict:new(), ram_ack_index = gb_trees:empty() }, @@ -1362,9 +1374,9 @@ remove_pending_ack(KeepPersistent, end. ack(_MsgStoreFun, _Fun, [], State) -> - State; + {[], State}; ack(MsgStoreFun, Fun, AckTags, State) -> - {{PersistentSeqIds, MsgIdsByStore}, + {{PersistentSeqIds, MsgIdsByStore, AllMsgIds}, State1 = #vqstate { index_state = IndexState, msg_store_clients = MSCState, persistent_count = PCount, @@ -1384,21 +1396,24 @@ ack(MsgStoreFun, Fun, AckTags, State) -> || {IsPersistent, MsgIds} <- orddict:to_list(MsgIdsByStore)], PCount1 = PCount - find_persistent_count(sum_msg_ids_by_store_to_len( orddict:new(), MsgIdsByStore)), - State1 #vqstate { index_state = IndexState1, - persistent_count = PCount1, - ack_out_counter = AckOutCount + length(AckTags) }. + {lists:reverse(AllMsgIds), + State1 #vqstate { index_state = IndexState1, + persistent_count = PCount1, + ack_out_counter = AckOutCount + length(AckTags) }}. -accumulate_ack_init() -> {[], orddict:new()}. +accumulate_ack_init() -> {[], orddict:new(), []}. accumulate_ack(_SeqId, #msg_status { is_persistent = false, %% ASSERTIONS msg_on_disk = false, - index_on_disk = false }, - {PersistentSeqIdsAcc, MsgIdsByStore}) -> - {PersistentSeqIdsAcc, MsgIdsByStore}; + index_on_disk = false, + msg_id = MsgId }, + {PersistentSeqIdsAcc, MsgIdsByStore, AllMsgIds}) -> + {PersistentSeqIdsAcc, MsgIdsByStore, [MsgId | AllMsgIds]}; accumulate_ack(SeqId, {IsPersistent, MsgId, _MsgProps}, - {PersistentSeqIdsAcc, MsgIdsByStore}) -> + {PersistentSeqIdsAcc, MsgIdsByStore, AllMsgIds}) -> {cons_if(IsPersistent, SeqId, PersistentSeqIdsAcc), - rabbit_misc:orddict_cons(IsPersistent, MsgId, MsgIdsByStore)}. + rabbit_misc:orddict_cons(IsPersistent, MsgId, MsgIdsByStore), + [MsgId | AllMsgIds]}. find_persistent_count(LensByStore) -> case orddict:find(true, LensByStore) of @@ -1444,33 +1459,35 @@ msgs_confirmed(MsgIdSet, State) -> blind_confirm(QPid, MsgIdSet) -> rabbit_amqqueue:maybe_run_queue_via_backing_queue_async( - QPid, fun (State) -> msgs_confirmed(MsgIdSet, State) end). + QPid, ?MODULE, fun (State) -> msgs_confirmed(MsgIdSet, State) end). msgs_written_to_disk(QPid, MsgIdSet, removed) -> blind_confirm(QPid, MsgIdSet); msgs_written_to_disk(QPid, MsgIdSet, written) -> rabbit_amqqueue:maybe_run_queue_via_backing_queue_async( - QPid, fun (State = #vqstate { msgs_on_disk = MOD, - msg_indices_on_disk = MIOD, - unconfirmed = UC }) -> - Written = gb_sets:intersection(UC, MsgIdSet), - msgs_confirmed(gb_sets:intersection(MsgIdSet, MIOD), - State #vqstate { - msgs_on_disk = - gb_sets:union(MOD, Written) }) - end). + QPid, ?MODULE, + fun (State = #vqstate { msgs_on_disk = MOD, + msg_indices_on_disk = MIOD, + unconfirmed = UC }) -> + msgs_confirmed(gb_sets:intersection(MsgIdSet, MIOD), + State #vqstate { + msgs_on_disk = + gb_sets:union( + MOD, gb_sets:intersection(UC, MsgIdSet)) }) + end). msg_indices_written_to_disk(QPid, MsgIdSet) -> rabbit_amqqueue:maybe_run_queue_via_backing_queue_async( - QPid, fun (State = #vqstate { msgs_on_disk = MOD, - msg_indices_on_disk = MIOD, - unconfirmed = UC }) -> - Written = gb_sets:intersection(UC, MsgIdSet), - msgs_confirmed(gb_sets:intersection(MsgIdSet, MOD), - State #vqstate { - msg_indices_on_disk = - gb_sets:union(MIOD, Written) }) - end). + QPid, ?MODULE, + fun (State = #vqstate { msgs_on_disk = MOD, + msg_indices_on_disk = MIOD, + unconfirmed = UC }) -> + msgs_confirmed(gb_sets:intersection(MsgIdSet, MOD), + State #vqstate { + msg_indices_on_disk = + gb_sets:union( + MIOD, gb_sets:intersection(UC, MsgIdSet)) }) + end). %%---------------------------------------------------------------------------- %% Phase changes |
