diff options
| author | Matthew Sackman <matthew@rabbitmq.com> | 2011-04-10 12:21:10 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@rabbitmq.com> | 2011-04-10 12:21:10 +0100 |
| commit | 3fe72bb890545f839265f3a2a00f889170ade6d1 (patch) | |
| tree | d801c166dca1ec7700f1f6046ead9f447963c682 | |
| parent | 043369e89c9ad3090fca99b9c9df97a1aff5ef4a (diff) | |
| parent | db207126bb61a4d251dc2755d15dd8aa5cb1a4e0 (diff) | |
| download | rabbitmq-server-git-3fe72bb890545f839265f3a2a00f889170ade6d1.tar.gz | |
Merging default into bug23554
| -rw-r--r-- | include/rabbit.hrl | 2 | ||||
| -rw-r--r-- | include/rabbit_backing_queue_spec.hrl | 31 | ||||
| -rw-r--r-- | src/gm_soak_test.erl | 8 | ||||
| -rw-r--r-- | src/rabbit.erl | 13 | ||||
| -rw-r--r-- | src/rabbit_amqqueue.erl | 45 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 262 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process_utils.erl | 99 | ||||
| -rw-r--r-- | src/rabbit_backing_queue.erl | 37 | ||||
| -rw-r--r-- | src/rabbit_control.erl | 12 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_coordinator.erl | 194 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_master.erl | 383 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_misc.erl | 101 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 737 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_slave_sup.erl | 60 | ||||
| -rw-r--r-- | src/rabbit_mnesia.erl | 3 | ||||
| -rw-r--r-- | src/rabbit_router.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 33 | ||||
| -rw-r--r-- | src/rabbit_types.erl | 3 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 133 |
19 files changed, 1932 insertions, 230 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl index 9f483c307d..f8c1a13d44 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -45,7 +45,7 @@ -record(exchange, {name, type, durable, auto_delete, internal, arguments}). -record(amqqueue, {name, durable, auto_delete, exclusive_owner = none, - arguments, pid}). + arguments, pid, mirror_pids}). %% mnesia doesn't like unary records, so we add a dummy 'value' field -record(route, {binding, value = const}). diff --git a/include/rabbit_backing_queue_spec.hrl b/include/rabbit_backing_queue_spec.hrl index b2bf6bbbce..d9296bf631 100644 --- a/include/rabbit_backing_queue_spec.hrl +++ b/include/rabbit_backing_queue_spec.hrl @@ -25,23 +25,24 @@ -type(message_properties_transformer() :: fun ((rabbit_types:message_properties()) -> rabbit_types:message_properties())). --type(async_callback() :: fun ((fun ((state()) -> state())) -> 'ok')). --type(sync_callback() :: fun ((fun ((state()) -> state())) -> 'ok' | 'error')). +-type(async_callback() :: fun ((atom(), fun ((atom(), state()) -> state())) -> 'ok')). +-type(sync_callback() :: fun ((atom(), fun ((atom(), state()) -> state())) -> 'ok' | 'error')). -spec(start/1 :: ([rabbit_amqqueue:name()]) -> 'ok'). -spec(stop/0 :: () -> 'ok'). --spec(init/5 :: (rabbit_amqqueue:name(), is_durable(), attempt_recovery(), +-spec(init/4 :: (rabbit_types:amqqueue(), attempt_recovery(), async_callback(), sync_callback()) -> state()). -spec(terminate/1 :: (state()) -> state()). -spec(delete_and_terminate/1 :: (state()) -> state()). -spec(purge/1 :: (state()) -> {purged_msg_count(), state()}). --spec(publish/3 :: (rabbit_types:basic_message(), - rabbit_types:message_properties(), state()) -> state()). --spec(publish_delivered/4 :: (true, rabbit_types:basic_message(), - rabbit_types:message_properties(), state()) +-spec(publish/4 :: (rabbit_types:basic_message(), + rabbit_types:message_properties(), pid(), state()) -> + state()). 
+-spec(publish_delivered/5 :: (true, rabbit_types:basic_message(), + rabbit_types:message_properties(), pid(), state()) -> {ack(), state()}; (false, rabbit_types:basic_message(), - rabbit_types:message_properties(), state()) + rabbit_types:message_properties(), pid(), state()) -> {undefined, state()}). -spec(drain_confirmed/1 :: (state()) -> {[rabbit_guid:guid()], state()}). -spec(dropwhile/2 :: @@ -49,16 +50,17 @@ -> state()). -spec(fetch/2 :: (true, state()) -> {fetch_result(ack()), state()}; (false, state()) -> {fetch_result(undefined), state()}). --spec(ack/2 :: ([ack()], state()) -> state()). --spec(tx_publish/4 :: (rabbit_types:txn(), rabbit_types:basic_message(), - rabbit_types:message_properties(), state()) -> state()). +-spec(ack/2 :: ([ack()], state()) -> {[rabbit_guid:guid()], state()}). +-spec(tx_publish/5 :: (rabbit_types:txn(), rabbit_types:basic_message(), + rabbit_types:message_properties(), pid(), state()) -> + state()). -spec(tx_ack/3 :: (rabbit_types:txn(), [ack()], state()) -> state()). -spec(tx_rollback/2 :: (rabbit_types:txn(), state()) -> {[ack()], state()}). -spec(tx_commit/4 :: (rabbit_types:txn(), fun (() -> any()), message_properties_transformer(), state()) -> {[ack()], state()}). -spec(requeue/3 :: ([ack()], message_properties_transformer(), state()) - -> state()). + -> {[rabbit_guid:guid()], state()}). -spec(len/1 :: (state()) -> non_neg_integer()). -spec(is_empty/1 :: (state()) -> boolean()). -spec(set_ram_duration_target/2 :: @@ -68,3 +70,8 @@ -spec(idle_timeout/1 :: (state()) -> state()). -spec(handle_pre_hibernate/1 :: (state()) -> state()). -spec(status/1 :: (state()) -> [{atom(), any()}]). +-spec(invoke/3 :: (atom(), fun ((atom(), A) -> A), state()) -> state()). +-spec(is_duplicate/3 :: + (rabbit_types:txn(), rabbit_types:basic_message(), state()) -> + {'false'|'published'|'discarded', state()}). +-spec(discard/3 :: (rabbit_types:basic_message(), pid(), state()) -> state()). 
diff --git a/src/gm_soak_test.erl b/src/gm_soak_test.erl index dae42ac7b8..5e5a3a5a6f 100644 --- a/src/gm_soak_test.erl +++ b/src/gm_soak_test.erl @@ -80,12 +80,12 @@ handle_msg([], From, {test_msg, Num}) -> {ok, Num} -> ok; {ok, Num1} when Num < Num1 -> exit({{from, From}, - {duplicate_delivery_of, Num1}, - {expecting, Num}}); + {duplicate_delivery_of, Num}, + {expecting, Num1}}); {ok, Num1} -> exit({{from, From}, - {missing_delivery_of, Num}, - {received_early, Num1}}); + {received_early, Num}, + {expecting, Num1}}); error -> exit({{from, From}, {received_premature_delivery, Num}}) diff --git a/src/rabbit.erl b/src/rabbit.erl index 0731613883..50fd9c4a65 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -36,6 +36,12 @@ []}}, {enables, external_infrastructure}]}). +-rabbit_boot_step({rabbit_registry, + [{description, "plugin registry"}, + {mfa, {rabbit_sup, start_child, + [rabbit_registry]}}, + {enables, external_infrastructure}]}). + -rabbit_boot_step({database, [{mfa, {rabbit_mnesia, init, []}}, {requires, file_handle_cache}, @@ -55,13 +61,6 @@ -rabbit_boot_step({external_infrastructure, [{description, "external infrastructure ready"}]}). --rabbit_boot_step({rabbit_registry, - [{description, "plugin registry"}, - {mfa, {rabbit_sup, start_child, - [rabbit_registry]}}, - {requires, external_infrastructure}, - {enables, kernel_ready}]}). - -rabbit_boot_step({rabbit_log, [{description, "logging server"}, {mfa, {rabbit_sup, start_restartable_child, diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 77d3841bc5..a681041d93 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -27,10 +27,12 @@ -export([notify_sent/2, unblock/2, flush_all/2]). -export([commit_all/3, rollback_all/3, notify_down_all/2, limit_all/3]). -export([on_node_down/1]). +-export([store_queue/1]). 
+ %% internal -export([internal_declare/2, internal_delete/1, - run_backing_queue/2, run_backing_queue_async/2, + run_backing_queue/3, run_backing_queue_async/3, sync_timeout/1, update_ram_duration/1, set_ram_duration_target/2, set_maximum_since_use/2, maybe_expire/1, drop_expired/1, emit_stats/1]). @@ -141,10 +143,12 @@ rabbit_types:connection_exit() | fun ((boolean()) -> rabbit_types:ok_or_error('not_found') | rabbit_types:connection_exit())). --spec(run_backing_queue/2 :: - (pid(), (fun ((A) -> {[rabbit_types:msg_id()], A}))) -> 'ok'). --spec(run_backing_queue_async/2 :: - (pid(), (fun ((A) -> {[rabbit_types:msg_id()], A}))) -> 'ok'). +-spec(run_backing_queue/3 :: + (pid(), atom(), + (fun ((atom(), A) -> {[rabbit_types:msg_id()], A}))) -> 'ok'). +-spec(run_backing_queue_async/3 :: + (pid(), atom(), + (fun ((atom(), A) -> {[rabbit_types:msg_id()], A}))) -> 'ok'). -spec(sync_timeout/1 :: (pid()) -> 'ok'). -spec(update_ram_duration/1 :: (pid()) -> 'ok'). -spec(set_ram_duration_target/2 :: (pid(), number() | 'infinity') -> 'ok'). @@ -191,12 +195,13 @@ recover_durable_queues(DurableQueues) -> declare(QueueName, Durable, AutoDelete, Args, Owner) -> ok = check_declare_arguments(QueueName, Args), - Q = start_queue_process(#amqqueue{name = QueueName, - durable = Durable, - auto_delete = AutoDelete, - arguments = Args, + Q = start_queue_process(#amqqueue{name = QueueName, + durable = Durable, + auto_delete = AutoDelete, + arguments = Args, exclusive_owner = Owner, - pid = none}), + pid = none, + mirror_pids = []}), case gen_server2:call(Q#amqqueue.pid, {init, false}, infinity) of not_found -> rabbit_misc:not_found(QueueName); Q1 -> Q1 @@ -438,11 +443,11 @@ internal_delete(QueueName) -> end end). -run_backing_queue(QPid, Fun) -> - gen_server2:call(QPid, {run_backing_queue, Fun}, infinity). +run_backing_queue(QPid, Mod, Fun) -> + gen_server2:call(QPid, {run_backing_queue, Mod, Fun}, infinity). 
-run_backing_queue_async(QPid, Fun) -> - gen_server2:cast(QPid, {run_backing_queue, Fun}). +run_backing_queue_async(QPid, Mod, Fun) -> + gen_server2:cast(QPid, {run_backing_queue, Mod, Fun}). sync_timeout(QPid) -> gen_server2:cast(QPid, sync_timeout). @@ -465,7 +470,8 @@ drop_expired(QPid) -> on_node_down(Node) -> rabbit_misc:execute_mnesia_transaction( fun () -> qlc:e(qlc:q([delete_queue(QueueName) || - #amqqueue{name = QueueName, pid = Pid} + #amqqueue{name = QueueName, pid = Pid, + mirror_pids = []} <- mnesia:table(rabbit_queue), node(Pid) == Node])) end, @@ -482,11 +488,12 @@ delete_queue(QueueName) -> rabbit_binding:remove_transient_for_destination(QueueName). pseudo_queue(QueueName, Pid) -> - #amqqueue{name = QueueName, - durable = false, + #amqqueue{name = QueueName, + durable = false, auto_delete = false, - arguments = [], - pid = Pid}. + arguments = [], + pid = Pid, + mirror_pids = []}. safe_delegate_call_ok(F, Pids) -> case delegate:invoke(Pids, fun (Pid) -> diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 2b0fe17e54..3bcdf70694 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -21,8 +21,6 @@ -behaviour(gen_server2). -define(UNSENT_MESSAGE_LIMIT, 100). --define(SYNC_INTERVAL, 25). %% milliseconds --define(RAM_DURATION_UPDATE_INTERVAL, 5000). -define(BASE_MESSAGE_PROPERTIES, #message_properties{expiry = undefined, needs_confirming = false}). @@ -33,7 +31,9 @@ handle_info/2, handle_pre_hibernate/1, prioritise_call/3, prioritise_cast/2, prioritise_info/2]). -%% Queue's state +-export([init_with_backing_queue_state/7]). + +% Queue's state -record(q, {q, exclusive_consumer, has_had_consumers, @@ -72,7 +72,8 @@ messages, consumers, memory, - backing_queue_status + backing_queue_status, + mirror_pids ]). -define(CREATION_EVENT_KEYS, @@ -97,12 +98,11 @@ info_keys() -> ?INFO_KEYS. 
init(Q) -> ?LOGDEBUG("Queue starting - ~p~n", [Q]), process_flag(trap_exit, true), - {ok, BQ} = application:get_env(backing_queue_module), {ok, #q{q = Q#amqqueue{pid = self()}, exclusive_consumer = none, has_had_consumers = false, - backing_queue = BQ, + backing_queue = backing_queue_module(Q), backing_queue_state = undefined, active_consumers = queue:new(), blocked_consumers = queue:new(), @@ -115,6 +115,34 @@ init(Q) -> msg_id_to_channel = dict:new()}, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. +init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS, + RateTRef, AckTags, Deliveries, MTC) -> + ?LOGDEBUG("Queue starting - ~p~n", [Q]), + case Owner of + none -> ok; + _ -> erlang:monitor(process, Owner) + end, + State = requeue_and_run( + AckTags, + process_args( + #q{q = Q#amqqueue{pid = self()}, + exclusive_consumer = none, + has_had_consumers = false, + backing_queue = BQ, + backing_queue_state = BQS, + active_consumers = queue:new(), + blocked_consumers = queue:new(), + expires = undefined, + sync_timer_ref = undefined, + rate_timer_ref = RateTRef, + expiry_timer_ref = undefined, + ttl = undefined, + stats_timer = rabbit_event:init_stats_timer(), + msg_id_to_channel = MTC})), + lists:foldl( + fun (Delivery, StateN) -> deliver_or_enqueue(Delivery, StateN) end, + State, Deliveries). 
+ terminate(shutdown, State = #q{backing_queue = BQ}) -> terminate_shutdown(fun (BQS) -> BQ:terminate(BQS) end, State); terminate({shutdown, _}, State = #q{backing_queue = BQ}) -> @@ -137,8 +165,7 @@ code_change(_OldVsn, State, _Extra) -> %%---------------------------------------------------------------------------- declare(Recover, From, - State = #q{q = Q = #amqqueue{name = QName, durable = IsDurable}, - backing_queue = BQ, backing_queue_state = undefined, + State = #q{q = Q, backing_queue = BQ, backing_queue_state = undefined, stats_timer = StatsTimer}) -> case rabbit_amqqueue:internal_declare(Q, Recover) of not_found -> {stop, normal, not_found, State}; @@ -149,7 +176,7 @@ declare(Recover, From, ok = rabbit_memory_monitor:register( self(), {rabbit_amqqueue, set_ram_duration_target, [self()]}), - BQS = bq_init(BQ, QName, IsDurable, Recover), + BQS = bq_init(BQ, Q, Recover), State1 = process_args(State#q{backing_queue_state = BQS}), rabbit_event:notify(queue_created, infos(?CREATION_EVENT_KEYS, State1)), @@ -159,17 +186,17 @@ declare(Recover, From, Q1 -> {stop, normal, {existing, Q1}, State} end. -bq_init(BQ, QName, IsDurable, Recover) -> +bq_init(BQ, Q, Recover) -> Self = self(), - BQ:init(QName, IsDurable, Recover, - fun (Fun) -> - rabbit_amqqueue:run_backing_queue_async(Self, Fun) + BQ:init(Q, Recover, + fun (Mod, Fun) -> + rabbit_amqqueue:run_backing_queue_async(Self, Mod, Fun) end, - fun (Fun) -> + fun (Mod, Fun) -> rabbit_misc:with_exit_handler( fun () -> error end, fun () -> - rabbit_amqqueue:run_backing_queue(Self, Fun) + rabbit_amqqueue:run_backing_queue(Self, Mod, Fun) end) end). @@ -226,37 +253,34 @@ next_state(State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> false -> {stop_sync_timer(State1), hibernate} end. 
-ensure_sync_timer(State = #q{sync_timer_ref = undefined}) -> - {ok, TRef} = timer:apply_after( - ?SYNC_INTERVAL, rabbit_amqqueue, sync_timeout, [self()]), - State#q{sync_timer_ref = TRef}; +backing_queue_module(#amqqueue{arguments = Args}) -> + case rabbit_misc:table_lookup(Args, <<"x-mirror">>) of + undefined -> {ok, BQM} = application:get_env(backing_queue_module), + BQM; + _Nodes -> rabbit_mirror_queue_master + end. + ensure_sync_timer(State) -> - State. + rabbit_amqqueue_process_utils:ensure_sync_timer( + fun sync_timer_getter/1, fun sync_timer_setter/2, State). + +stop_sync_timer(State) -> + rabbit_amqqueue_process_utils:stop_sync_timer( + fun sync_timer_getter/1, fun sync_timer_setter/2, State). + +sync_timer_getter(State) -> State#q.sync_timer_ref. +sync_timer_setter(Timer, State) -> State#q{sync_timer_ref = Timer}. -stop_sync_timer(State = #q{sync_timer_ref = undefined}) -> - State; -stop_sync_timer(State = #q{sync_timer_ref = TRef}) -> - {ok, cancel} = timer:cancel(TRef), - State#q{sync_timer_ref = undefined}. - -ensure_rate_timer(State = #q{rate_timer_ref = undefined}) -> - {ok, TRef} = timer:apply_after( - ?RAM_DURATION_UPDATE_INTERVAL, - rabbit_amqqueue, update_ram_duration, - [self()]), - State#q{rate_timer_ref = TRef}; -ensure_rate_timer(State = #q{rate_timer_ref = just_measured}) -> - State#q{rate_timer_ref = undefined}; ensure_rate_timer(State) -> - State. + rabbit_amqqueue_process_utils:ensure_rate_timer( + fun rate_timer_getter/1, fun rate_timer_setter/2, State). -stop_rate_timer(State = #q{rate_timer_ref = undefined}) -> - State; -stop_rate_timer(State = #q{rate_timer_ref = just_measured}) -> - State#q{rate_timer_ref = undefined}; -stop_rate_timer(State = #q{rate_timer_ref = TRef}) -> - {ok, cancel} = timer:cancel(TRef), - State#q{rate_timer_ref = undefined}. +stop_rate_timer(State) -> + rabbit_amqqueue_process_utils:stop_rate_timer( + fun rate_timer_getter/1, fun rate_timer_setter/2, State). 
+ +rate_timer_getter(State) -> State#q.rate_timer_ref. +rate_timer_setter(Timer, State) -> State#q{rate_timer_ref = Timer}. stop_expiry_timer(State = #q{expiry_timer_ref = undefined}) -> State; @@ -477,45 +501,70 @@ run_message_queue(State) -> {_IsEmpty1, State2} = deliver_msgs_to_consumers(Funs, IsEmpty, State1), State2. -attempt_delivery(#delivery{txn = none, - sender = ChPid, - message = Message, - msg_seq_no = MsgSeqNo} = Delivery, - State = #q{backing_queue = BQ}) -> +attempt_delivery(Delivery = #delivery{txn = none, + sender = ChPid, + message = Message, + msg_seq_no = MsgSeqNo}, + State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> Confirm = should_confirm_message(Delivery, State), case Confirm of immediately -> rabbit_channel:confirm(ChPid, [MsgSeqNo]); _ -> ok end, - PredFun = fun (IsEmpty, _State) -> not IsEmpty end, - DeliverFun = - fun (AckRequired, false, State1 = #q{backing_queue_state = BQS}) -> - %% we don't need an expiry here because messages are - %% not being enqueued, so we use an empty - %% message_properties. - {AckTag, BQS1} = - BQ:publish_delivered( - AckRequired, Message, - (?BASE_MESSAGE_PROPERTIES)#message_properties{ - needs_confirming = needs_confirming(Confirm)}, - BQS), - {{Message, false, AckTag}, true, - State1#q{backing_queue_state = BQS1}} - end, - {Delivered, State1} = - deliver_msgs_to_consumers({ PredFun, DeliverFun }, false, State), - {Delivered, Confirm, State1}; -attempt_delivery(#delivery{txn = Txn, - sender = ChPid, - message = Message} = Delivery, - State = #q{backing_queue = BQ, - backing_queue_state = BQS}) -> - store_ch_record((ch_record(ChPid))#cr{txn = Txn}), - BQS1 = BQ:tx_publish(Txn, Message, ?BASE_MESSAGE_PROPERTIES, BQS), - {true, should_confirm_message(Delivery, State), - State#q{backing_queue_state = BQS1}}. 
+ case BQ:is_duplicate(none, Message, BQS) of + {false, BQS1} -> + PredFun = fun (IsEmpty, _State) -> not IsEmpty end, + DeliverFun = + fun (AckRequired, false, + State1 = #q{backing_queue_state = BQS2}) -> + %% we don't need an expiry here because + %% messages are not being enqueued, so we use + %% an empty message_properties. + {AckTag, BQS3} = + BQ:publish_delivered( + AckRequired, Message, + (?BASE_MESSAGE_PROPERTIES)#message_properties{ + needs_confirming = needs_confirming(Confirm)}, + ChPid, BQS2), + {{Message, false, AckTag}, true, + State1#q{backing_queue_state = BQS3}} + end, + {Delivered, State2} = + deliver_msgs_to_consumers({ PredFun, DeliverFun }, false, + State#q{backing_queue_state = BQS1}), + {Delivered, Confirm, State2}; + {Duplicate, BQS1} -> + %% if the message has previously been seen by the BQ then + %% it must have been seen under the same circumstances as + %% now: i.e. if it is now a deliver_immediately then it + %% must have been before. + Delivered = case Duplicate of + published -> true; + discarded -> false + end, + {Delivered, Confirm, State#q{backing_queue_state = BQS1}} + end; +attempt_delivery(Delivery = #delivery{txn = Txn, + sender = ChPid, + message = Message}, + State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> + Confirm = should_confirm_message(Delivery, State), + case BQ:is_duplicate(Txn, Message, BQS) of + {false, BQS1} -> + store_ch_record((ch_record(ChPid))#cr{txn = Txn}), + BQS2 = BQ:tx_publish(Txn, Message, ?BASE_MESSAGE_PROPERTIES, ChPid, + BQS1), + {true, Confirm, State#q{backing_queue_state = BQS2}}; + {Duplicate, BQS1} -> + Delivered = case Duplicate of + published -> true; + discarded -> false + end, + {Delivered, Confirm, State#q{backing_queue_state = BQS1}} + end. 
-deliver_or_enqueue(Delivery = #delivery{message = Message}, State) -> +deliver_or_enqueue(Delivery = #delivery{message = Message, + sender = ChPid}, State) -> {Delivered, Confirm, State1} = attempt_delivery(Delivery, State), State2 = #q{backing_queue = BQ, backing_queue_state = BQS} = maybe_record_confirm_message(Confirm, State1), @@ -525,14 +574,17 @@ deliver_or_enqueue(Delivery = #delivery{message = Message}, State) -> BQ:publish(Message, (message_properties(State)) #message_properties{ needs_confirming = needs_confirming(Confirm)}, - BQS), + ChPid, BQS), ensure_ttl_timer(State2#q{backing_queue_state = BQS1}) end. requeue_and_run(AckTags, State = #q{backing_queue = BQ, ttl=TTL}) -> run_backing_queue( - fun (BQS) -> BQ:requeue(AckTags, reset_msg_expiry_fun(TTL), BQS) end, - State). + BQ, fun (M, BQS) -> + {_MsgIds, BQS1} = + M:requeue(AckTags, reset_msg_expiry_fun(TTL), BQS), + BQS1 + end, State). fetch(AckRequired, State = #q{backing_queue_state = BQS, backing_queue = BQ}) -> @@ -635,10 +687,11 @@ maybe_send_reply(ChPid, Msg) -> ok = rabbit_channel:send_command(ChPid, Msg). qname(#q{q = #amqqueue{name = QName}}) -> QName. backing_queue_idle_timeout(State = #q{backing_queue = BQ}) -> - run_backing_queue(fun (BQS) -> BQ:idle_timeout(BQS) end, State). + run_backing_queue(BQ, fun (M, BQS) -> M:idle_timeout(BQS) end, State). -run_backing_queue(Fun, State = #q{backing_queue_state = BQS}) -> - run_message_queue(State#q{backing_queue_state = Fun(BQS)}). +run_backing_queue(Mod, Fun, State = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> + run_message_queue(State#q{backing_queue_state = BQ:invoke(Mod, Fun, BQS)}). commit_transaction(Txn, From, C = #cr{acktags = ChAckTags}, State = #q{backing_queue = BQ, @@ -662,6 +715,12 @@ rollback_transaction(Txn, C, State = #q{backing_queue = BQ, subtract_acks(A, B) when is_list(B) -> lists:foldl(fun sets:del_element/2, A, B). 
+discard_delivery(#delivery{sender = ChPid, + message = Message}, + State = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> + State#q{backing_queue_state = BQ:discard(Message, ChPid, BQS)}. + reset_msg_expiry_fun(TTL) -> fun(MsgProps) -> MsgProps#message_properties{expiry = calculate_msg_expiry(TTL)} @@ -733,6 +792,9 @@ i(memory, _) -> M; i(backing_queue_status, #q{backing_queue_state = BQS, backing_queue = BQ}) -> BQ:status(BQS); +i(mirror_pids, #q{q = #amqqueue{name = Name}}) -> + {ok, #amqqueue{mirror_pids = MPids}} = rabbit_amqqueue:lookup(Name), + MPids; i(Item, _) -> throw({bad_argument, Item}). @@ -768,11 +830,11 @@ emit_consumer_deleted(ChPid, ConsumerTag) -> prioritise_call(Msg, _From, _State) -> case Msg of - info -> 9; - {info, _Items} -> 9; - consumers -> 9; - {run_backing_queue, _Fun} -> 6; - _ -> 0 + info -> 9; + {info, _Items} -> 9; + consumers -> 9; + {run_backing_queue, _Mod, _Fun} -> 6; + _ -> 0 end. prioritise_cast(Msg, _State) -> @@ -788,7 +850,7 @@ prioritise_cast(Msg, _State) -> {reject, _AckTags, _Requeue, _ChPid} -> 7; {notify_sent, _ChPid} -> 7; {unblock, _ChPid} -> 7; - {run_backing_queue, _Fun} -> 6; + {run_backing_queue, _Mod, _Fun} -> 6; sync_timeout -> 6; _ -> 0 end. @@ -807,14 +869,14 @@ handle_call({init, Recover}, From, true -> erlang:monitor(process, Owner), declare(Recover, From, State); false -> #q{backing_queue = BQ, backing_queue_state = undefined, - q = #amqqueue{name = QName, durable = IsDurable}} = State, + q = #amqqueue{name = QName} = Q} = State, gen_server2:reply(From, not_found), case Recover of true -> ok; _ -> rabbit_log:warning( "Queue ~p exclusive owner went away~n", [QName]) end, - BQS = bq_init(BQ, QName, IsDurable, Recover), + BQS = bq_init(BQ, Q, Recover), %% Rely on terminate to delete the queue. 
{stop, normal, State#q{backing_queue_state = BQS}} end; @@ -848,7 +910,7 @@ handle_call({deliver_immediately, Delivery}, _From, State) -> {Delivered, Confirm, State1} = attempt_delivery(Delivery, State), reply(Delivered, case Delivered of true -> maybe_record_confirm_message(Confirm, State1); - false -> State1 + false -> discard_delivery(Delivery, State1) end); handle_call({deliver, Delivery}, From, State) -> @@ -1004,12 +1066,12 @@ handle_call({requeue, AckTags, ChPid}, From, State) -> noreply(requeue_and_run(AckTags, State)) end; -handle_call({run_backing_queue, Fun}, _From, State) -> - reply(ok, run_backing_queue(Fun, State)). +handle_call({run_backing_queue, Mod, Fun}, _From, State) -> + reply(ok, run_backing_queue(Mod, Fun, State)). -handle_cast({run_backing_queue, Fun}, State) -> - noreply(run_backing_queue(Fun, State)); +handle_cast({run_backing_queue, Mod, Fun}, State) -> + noreply(run_backing_queue(Mod, Fun, State)); handle_cast(sync_timeout, State) -> noreply(backing_queue_idle_timeout(State#q{sync_timer_ref = undefined})); @@ -1028,7 +1090,7 @@ handle_cast({ack, Txn, AckTags, ChPid}, case Txn of none -> ChAckTags1 = subtract_acks(ChAckTags, AckTags), NewC = C#cr{acktags = ChAckTags1}, - BQS1 = BQ:ack(AckTags, BQS), + {_Guids, BQS1} = BQ:ack(AckTags, BQS), {NewC, State#q{backing_queue_state = BQS1}}; _ -> BQS1 = BQ:tx_ack(Txn, AckTags, BQS), {C#cr{txn = Txn}, @@ -1049,7 +1111,7 @@ handle_cast({reject, AckTags, Requeue, ChPid}, maybe_store_ch_record(C#cr{acktags = ChAckTags1}), noreply(case Requeue of true -> requeue_and_run(AckTags, State); - false -> BQS1 = BQ:ack(AckTags, BQS), + false -> {_Guids, BQS1} = BQ:ack(AckTags, BQS), State#q{backing_queue_state = BQS1} end) end; @@ -1160,15 +1222,11 @@ handle_pre_hibernate(State = #q{backing_queue_state = undefined}) -> handle_pre_hibernate(State = #q{backing_queue = BQ, backing_queue_state = BQS, stats_timer = StatsTimer}) -> - {RamDuration, BQS1} = BQ:ram_duration(BQS), - DesiredDuration = - 
rabbit_memory_monitor:report_ram_duration(self(), RamDuration), - BQS2 = BQ:set_ram_duration_target(DesiredDuration, BQS1), - BQS3 = BQ:handle_pre_hibernate(BQS2), + BQS1 = rabbit_amqqueue_process_utils:backing_queue_pre_hibernate(BQ, BQS), rabbit_event:if_enabled(StatsTimer, fun () -> emit_stats(State, [{idle_since, now()}]) end), State1 = State#q{stats_timer = rabbit_event:stop_stats_timer(StatsTimer), - backing_queue_state = BQS3, + backing_queue_state = BQS1}, {hibernate, stop_rate_timer(State1)}. diff --git a/src/rabbit_amqqueue_process_utils.erl b/src/rabbit_amqqueue_process_utils.erl new file mode 100644 index 0000000000..feb2a79ca2 --- /dev/null +++ b/src/rabbit_amqqueue_process_utils.erl @@ -0,0 +1,99 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is VMware, Inc. +%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% + +-module(rabbit_amqqueue_process_utils). + +-define(SYNC_INTERVAL, 25). %% milliseconds +-define(RAM_DURATION_UPDATE_INTERVAL, 5000). + +-export([backing_queue_pre_hibernate/2, + ensure_sync_timer/3, stop_sync_timer/3, + ensure_rate_timer/3, stop_rate_timer/3]). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-type(bq_mod() :: atom()). +-type(bq_state() :: any()). %% A good example of dialyzer's shortcomings + +-type(queue_state() :: any()). %% Another such example. +-type(getter(A) :: fun ((queue_state()) -> A)). 
+-type(setter(A) :: fun ((A, queue_state()) -> queue_state())). + +-type(tref() :: term()). %% Sigh. According to timer docs. + +-spec(backing_queue_pre_hibernate/2 :: (bq_mod(), bq_state()) -> bq_state()). + +-spec(ensure_sync_timer/3 :: (getter('undefined'|tref()), + setter('undefined'|tref()), + queue_state()) -> queue_state()). +-spec(stop_sync_timer/3 :: (getter('undefined'|tref()), + setter('undefined'|tref()), + queue_state()) -> queue_state()). + +-spec(ensure_rate_timer/3 :: (getter('undefined'|'just_measured'|tref()), + setter('undefined'|'just_measured'|tref()), + queue_state()) -> queue_state()). +-spec(stop_rate_timer/3 :: (getter('undefined'|'just_measured'|tref()), + setter('undefined'|'just_measured'|tref()), + queue_state()) -> queue_state()). + +-endif. + +%%---------------------------------------------------------------------------- + +backing_queue_pre_hibernate(BQ, BQS) -> + {RamDuration, BQS1} = BQ:ram_duration(BQS), + DesiredDuration = + rabbit_memory_monitor:report_ram_duration(self(), RamDuration), + BQS2 = BQ:set_ram_duration_target(DesiredDuration, BQS1), + BQ:handle_pre_hibernate(BQS2). + +ensure_sync_timer(Getter, Setter, State) -> + case Getter(State) of + undefined -> {ok, TRef} = timer:apply_after( + ?SYNC_INTERVAL, rabbit_amqqueue, + sync_timeout, [self()]), + Setter(TRef, State); + _TRef -> State + end. + +stop_sync_timer(Getter, Setter, State) -> + case Getter(State) of + undefined -> State; + TRef -> {ok, cancel} = timer:cancel(TRef), + Setter(undefined, State) + end. + +ensure_rate_timer(Getter, Setter, State) -> + case Getter(State) of + undefined -> {ok, TRef} = + timer:apply_after( + ?RAM_DURATION_UPDATE_INTERVAL, rabbit_amqqueue, + update_ram_duration, [self()]), + Setter(TRef, State); + just_measured -> Setter(undefined, State); + _TRef -> State + end. 
+ +stop_rate_timer(Getter, Setter, State) -> + case Getter(State) of + undefined -> State; + just_measured -> Setter(undefined, State); + TRef -> {ok, cancel} = timer:cancel(TRef), + Setter(undefined, State) + end. diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index 0ca8d260ef..0955a0804b 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -35,19 +35,18 @@ behaviour_info(callbacks) -> %% Initialise the backing queue and its state. %% %% Takes - %% 1. the queue name - %% 2. a boolean indicating whether the queue is durable - %% 3. a boolean indicating whether the queue is an existing queue + %% 1. the amqqueue record + %% 2. a boolean indicating whether the queue is an existing queue %% that should be recovered - %% 4. an asynchronous callback which accepts a function of type + %% 3. an asynchronous callback which accepts a function of type %% backing-queue-state to backing-queue-state. This callback %% function can be safely invoked from any process, which %% makes it useful for passing messages back into the backing %% queue, especially as the backing queue does not have %% control of its own mailbox. - %% 5. a synchronous callback. Same as the asynchronous callback + %% 4. a synchronous callback. Same as the asynchronous callback %% but waits for completion and returns 'error' on error. - {init, 5}, + {init, 4}, %% Called on queue shutdown when queue isn't being deleted. {terminate, 1}, @@ -61,12 +60,12 @@ behaviour_info(callbacks) -> {purge, 1}, %% Publish a message. - {publish, 3}, + {publish, 4}, %% Called for messages which have already been passed straight %% out to a client. The queue will be empty for these calls %% (i.e. saves the round trip through the backing queue). - {publish_delivered, 4}, + {publish_delivered, 5}, %% Return ids of messages which have been confirmed since %% the last invocation of this function (or initialisation). 
@@ -109,7 +108,7 @@ behaviour_info(callbacks) ->
      {ack, 2},
 
      %% A publish, but in the context of a transaction.
-     {tx_publish, 4},
+     {tx_publish, 5},
 
      %% Acks, but in the context of a transaction.
      {tx_ack, 3},
@@ -165,7 +164,25 @@ behaviour_info(callbacks) ->
      %% Exists for debugging purposes, to be able to expose state via
      %% rabbitmqctl list_queues backing_queue_status
-     {status, 1}
+     {status, 1},
+
+     %% Passed a function to be invoked with the relevant backing
+     %% queue's state. Useful for when the backing queue or other
+     %% components need to pass functions into the backing queue.
+     {invoke, 3},
+
+     %% Called prior to a publish or publish_delivered call. Allows
+     %% the BQ to signal that it's already seen this message (and in
+     %% what capacity - i.e. was it published previously or discarded
+     %% previously) and thus the message should be dropped.
+     {is_duplicate, 3},
+
+     %% Called to inform the BQ about messages which have reached the
+     %% queue, but are not going to be further passed to BQ for some
+     %% reason. Note that this may be invoked for messages for
+     %% which BQ:is_duplicate/3 has already returned {'published' |
+     %% 'discarded', BQS}.
+     {discard, 3}
     ];
 behaviour_info(_Other) ->
     undefined.
diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index 1af91f4c3a..f0eadea206 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -238,6 +238,12 @@ action(list_queues, Node, Args, Opts, Inform) -> [VHostArg, ArgAtoms]), ArgAtoms); +action(add_queue_mirror, Node, [Queue, MirrorNode], Opts, Inform) -> + Inform("Adding mirror of queue ~p on node ~p~n", [Queue, MirrorNode]), + VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), + rpc_call(Node, rabbit_mirror_queue_misc, add_slave, + [VHostArg, list_to_binary(Queue), list_to_atom(MirrorNode)]); + action(list_exchanges, Node, Args, Opts, Inform) -> Inform("Listing exchanges", []), VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)), @@ -361,6 +367,12 @@ format_info_item([{TableEntryKey, TableEntryType, _TableEntryValue} | _] = Value) when is_binary(TableEntryKey) andalso is_atom(TableEntryType) -> io_lib:format("~1000000000000p", [prettify_amqp_table(Value)]); +format_info_item([T | _] = Value) + when is_tuple(T) orelse is_pid(T) orelse is_binary(T) orelse is_atom(T) orelse + is_list(T) -> + "[" ++ + lists:nthtail(2, lists:append( + [", " ++ format_info_item(E) || E <- Value])) ++ "]"; format_info_item(Value) -> io_lib:format("~w", [Value]). diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl new file mode 100644 index 0000000000..84220a5b54 --- /dev/null +++ b/src/rabbit_mirror_queue_coordinator.erl @@ -0,0 +1,194 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. 
+%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is VMware, Inc. +%% Copyright (c) 2007-2010 VMware, Inc. All rights reserved. +%% + +-module(rabbit_mirror_queue_coordinator). + +-export([start_link/2, get_gm/1]). + +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, + code_change/3]). + +-export([joined/2, members_changed/3, handle_msg/3]). + +-behaviour(gen_server2). +-behaviour(gm). + +-include("rabbit.hrl"). +-include("gm_specs.hrl"). + +-record(state, { q, + gm + }). + +-define(ONE_SECOND, 1000). + +%%---------------------------------------------------------------------------- +%% +%% Mirror Queues +%% +%% A queue with mirrors consists of the following: +%% +%% #amqqueue{ pid, mirror_pids } +%% | | +%% +----------+ +-------+--------------+-----------...etc... +%% | | | +%% V V V +%% amqqueue_process---+ slave-----+ slave-----+ ...etc... +%% | BQ = master----+ | | BQ = vq | | BQ = vq | +%% | | BQ = vq | | +-+-------+ +-+-------+ +%% | +-+-------+ | | | +%% +-++-----|---------+ | | (some details elided) +%% || | | | +%% || coordinator-+ | | +%% || +-+---------+ | | +%% || | | | +%% || gm-+ -- -- -- -- gm-+- -- -- -- gm-+- -- --...etc... +%% || +--+ +--+ +--+ +%% || +%% consumers +%% +%% The master is merely an implementation of BQ, and thus is invoked +%% through the normal BQ interface by the amqqueue_process. The slaves +%% meanwhile are processes in their own right (as is the +%% coordinator). The coordinator and all slaves belong to the same gm +%% group. Every member of a gm group receives messages sent to the gm +%% group. Because the master is the BQ of amqqueue_process, it doesn't +%% have sole control over its mailbox, and as a result, the master +%% itself cannot be passed messages directly, yet it needs to react to +%% gm events, such as the death of slaves. 
Thus the master creates the
+%% coordinator, and it is the coordinator that is the gm callback
+%% module and event handler for the master.
+%%
+%% Consumers are only attached to the master. Thus the master is
+%% responsible for informing all slaves when messages are fetched from
+%% the BQ, when they're acked, and when they're requeued.
+%%
+%% The basic goal is to ensure that all slaves perform actions on
+%% their BQ in the same order as the master. Thus the master
+%% intercepts all events going to its BQ, and suitably broadcasts
+%% these events on the gm. The slaves thus receive two streams of
+%% events: one stream is via the gm, and one stream is from channels
+%% directly. Note that whilst the stream via gm is guaranteed to be
+%% consistently seen by all slaves, the same is not true of the stream
+%% via channels. For example, in the event of an unexpected death of a
+%% channel during a publish, only some of the mirrors may receive that
+%% publish. As a result of this problem, the messages broadcast over
+%% the gm contain published content, and thus slaves can operate
+%% successfully on messages that they only receive via the gm. The key
+%% purpose of also sending messages directly from the channels to the
+%% slaves is that without this, in the event of the death of the
+%% master, messages can be lost until a suitable slave is promoted.
+%%
+%% However, there are other reasons as well. For example, if confirms
+%% are in use, then there is no guarantee that every slave will see
+%% the delivery with the same msg_seq_no. As a result, the slaves have
+%% to wait until they've seen both the publish via gm, and the publish
+%% via the channel before they have enough information to be able to
+%% issue the confirm, if necessary. Either form of publish can arrive
+%% first, and a slave can be upgraded to the master at any point
+%% during this process. Confirms continue to be issued correctly,
+%% however.
+%% +%% Because the slave is a full process, it impersonates parts of the +%% amqqueue API. However, it does not need to implement all parts: for +%% example, no ack or consumer-related message can arrive directly at +%% a slave from a channel: it is only publishes that pass both +%% directly to the slaves and go via gm. +%% +%%---------------------------------------------------------------------------- + +start_link(Queue, GM) -> + gen_server2:start_link(?MODULE, [Queue, GM], []). + +get_gm(CPid) -> + gen_server2:call(CPid, get_gm, infinity). + +%% --------------------------------------------------------------------------- +%% gen_server +%% --------------------------------------------------------------------------- + +init([#amqqueue { name = QueueName } = Q, GM]) -> + GM1 = case GM of + undefined -> + ok = gm:create_tables(), + {ok, GM2} = gm:start_link(QueueName, ?MODULE, [self()]), + receive {joined, GM2, _Members} -> + ok + end, + GM2; + _ -> + true = link(GM), + GM + end, + {ok, _TRef} = + timer:apply_interval(?ONE_SECOND, gm, broadcast, [GM1, heartbeat]), + {ok, #state { q = Q, gm = GM1 }, hibernate, + {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. + +handle_call(get_gm, _From, State = #state { gm = GM }) -> + reply(GM, State). + +handle_cast({gm_deaths, Deaths}, + State = #state { q = #amqqueue { name = QueueName } }) -> + rabbit_log:info("Master ~p saw deaths ~p for ~s~n", + [self(), Deaths, rabbit_misc:rs(QueueName)]), + case rabbit_mirror_queue_misc:remove_from_queue(QueueName, Deaths) of + {ok, Pid} when node(Pid) =:= node() -> + noreply(State); + {error, not_found} -> + {stop, normal, State} + end. + +handle_info(Msg, State) -> + {stop, {unexpected_info, Msg}, State}. + +terminate(_Reason, #state{}) -> + %% gen_server case + ok; +terminate([_CPid], _Reason) -> + %% gm case + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. 
+ +%% --------------------------------------------------------------------------- +%% GM +%% --------------------------------------------------------------------------- + +joined([CPid], Members) -> + CPid ! {joined, self(), Members}, + ok. + +members_changed([_CPid], _Births, []) -> + ok; +members_changed([CPid], _Births, Deaths) -> + ok = gen_server2:cast(CPid, {gm_deaths, Deaths}). + +handle_msg([_CPid], _From, heartbeat) -> + ok; +handle_msg([_CPid], _From, _Msg) -> + ok. + +%% --------------------------------------------------------------------------- +%% Others +%% --------------------------------------------------------------------------- + +noreply(State) -> + {noreply, State, hibernate}. + +reply(Reply, State) -> + {reply, Reply, State, hibernate}. diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl new file mode 100644 index 0000000000..e6a71370ae --- /dev/null +++ b/src/rabbit_mirror_queue_master.erl @@ -0,0 +1,383 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is VMware, Inc. +%% Copyright (c) 2007-2010 VMware, Inc. All rights reserved. +%% + +-module(rabbit_mirror_queue_master). 
+ +-export([init/4, terminate/1, delete_and_terminate/1, + purge/1, publish/4, publish_delivered/5, fetch/2, ack/2, + tx_publish/5, tx_ack/3, tx_rollback/2, tx_commit/4, + requeue/3, len/1, is_empty/1, drain_confirmed/1, dropwhile/2, + set_ram_duration_target/2, ram_duration/1, + needs_idle_timeout/1, idle_timeout/1, handle_pre_hibernate/1, + status/1, invoke/3, is_duplicate/3, discard/3]). + +-export([start/1, stop/0]). + +-export([promote_backing_queue_state/5]). + +-behaviour(rabbit_backing_queue). + +-include("rabbit.hrl"). + +-record(state, { gm, + coordinator, + backing_queue, + backing_queue_state, + set_delivered, + seen_status, + confirmed, + ack_msg_id + }). + +%% For general documentation of HA design, see +%% rabbit_mirror_queue_coordinator +%% +%% Some notes on transactions +%% +%% We don't support transactions on mirror queues. To do so is +%% challenging. The underlying bq is free to add the contents of the +%% txn to the queue proper at any point after the tx.commit comes in +%% but before the tx.commit-ok goes out. This means that it is not +%% safe for all mirrors to simply issue the BQ:tx_commit at the same +%% time, as the addition of the txn's contents to the queue may +%% subsequently be inconsistently interwoven with other actions on the +%% BQ. The solution to this is, in the master, wrap the PostCommitFun +%% and do the gm:broadcast in there: at that point, you're in the BQ +%% (well, there's actually nothing to stop that function being invoked +%% by some other process, but let's pretend for now: you could always +%% use run_backing_queue_async to ensure you really are in the queue +%% process), the gm:broadcast is safe because you don't have to worry +%% about races with other gm:broadcast calls (same process). Thus this +%% signal would indicate sufficiently to all the slaves that they must +%% insert the complete contents of the txn at precisely this point in +%% the stream of events. 
+%% +%% However, it's quite difficult for the slaves to make that happen: +%% they would be forced to issue the tx_commit at that point, but then +%% stall processing any further instructions from gm until they +%% receive the notification from their bq that the tx_commit has fully +%% completed (i.e. they need to treat what is an async system as being +%% fully synchronous). This is not too bad (apart from the +%% vomit-inducing notion of it all): just need a queue of instructions +%% from the GM; but then it gets rather worse when you consider what +%% needs to happen if the master dies at this point and the slave in +%% the middle of this tx_commit needs to be promoted. +%% +%% Finally, we can't possibly hope to make transactions atomic across +%% mirror queues, and it's not even clear that that's desirable: if a +%% slave fails whilst there's an open transaction in progress then +%% when the channel comes to commit the txn, it will detect the +%% failure and destroy the channel. However, the txn will have +%% actually committed successfully in all the other mirrors (including +%% master). To do this bit properly would require 2PC and all the +%% baggage that goes with that. + +%% --------------------------------------------------------------------------- +%% Backing queue +%% --------------------------------------------------------------------------- + +start(_DurableQueues) -> + %% This will never get called as this module will never be + %% installed as the default BQ implementation. + exit({not_valid_for_generic_backing_queue, ?MODULE}). + +stop() -> + %% Same as start/1. + exit({not_valid_for_generic_backing_queue, ?MODULE}). 
+ +init(#amqqueue { arguments = Args, name = QName } = Q, Recover, + AsyncCallback, SyncCallback) -> + {ok, CPid} = rabbit_mirror_queue_coordinator:start_link(Q, undefined), + GM = rabbit_mirror_queue_coordinator:get_gm(CPid), + {_Type, Nodes} = rabbit_misc:table_lookup(Args, <<"x-mirror">>), + Nodes1 = case Nodes of + [] -> nodes(); + _ -> [list_to_atom(binary_to_list(Node)) || + {longstr, Node} <- Nodes] + end, + [rabbit_mirror_queue_misc:add_slave(QName, Node) || Node <- Nodes1], + {ok, BQ} = application:get_env(backing_queue_module), + BQS = BQ:init(Q, Recover, AsyncCallback, SyncCallback), + #state { gm = GM, + coordinator = CPid, + backing_queue = BQ, + backing_queue_state = BQS, + set_delivered = 0, + seen_status = dict:new(), + confirmed = [], + ack_msg_id = dict:new() }. + +promote_backing_queue_state(CPid, BQ, BQS, GM, SeenStatus) -> + #state { gm = GM, + coordinator = CPid, + backing_queue = BQ, + backing_queue_state = BQS, + set_delivered = BQ:len(BQS), + seen_status = SeenStatus, + confirmed = [], + ack_msg_id = dict:new() }. + +terminate(State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> + %% Backing queue termination. The queue is going down but + %% shouldn't be deleted. Most likely safe shutdown of this + %% node. Thus just let some other slave take over. + State #state { backing_queue_state = BQ:terminate(BQS) }. + +delete_and_terminate(State = #state { gm = GM, + backing_queue = BQ, + backing_queue_state = BQS }) -> + ok = gm:broadcast(GM, delete_and_terminate), + State #state { backing_queue_state = BQ:delete_and_terminate(BQS), + set_delivered = 0 }. + +purge(State = #state { gm = GM, + backing_queue = BQ, + backing_queue_state = BQS }) -> + ok = gm:broadcast(GM, {set_length, 0}), + {Count, BQS1} = BQ:purge(BQS), + {Count, State #state { backing_queue_state = BQS1, + set_delivered = 0 }}. 
+ +publish(Msg = #basic_message { id = MsgId }, MsgProps, ChPid, + State = #state { gm = GM, + seen_status = SS, + backing_queue = BQ, + backing_queue_state = BQS }) -> + false = dict:is_key(MsgId, SS), %% ASSERTION + ok = gm:broadcast(GM, {publish, false, ChPid, MsgProps, Msg}), + BQS1 = BQ:publish(Msg, MsgProps, ChPid, BQS), + State #state { backing_queue_state = BQS1 }. + +publish_delivered(AckRequired, Msg = #basic_message { id = MsgId }, MsgProps, + ChPid, State = #state { gm = GM, + seen_status = SS, + backing_queue = BQ, + backing_queue_state = BQS, + ack_msg_id = AM }) -> + false = dict:is_key(MsgId, SS), %% ASSERTION + %% Must use confirmed_broadcast here in order to guarantee that + %% all slaves are forced to interpret this publish_delivered at + %% the same point, especially if we die and a slave is promoted. + ok = gm:confirmed_broadcast( + GM, {publish, {true, AckRequired}, ChPid, MsgProps, Msg}), + {AckTag, BQS1} = + BQ:publish_delivered(AckRequired, Msg, MsgProps, ChPid, BQS), + AM1 = maybe_store_acktag(AckTag, MsgId, AM), + {AckTag, State #state { backing_queue_state = BQS1, + ack_msg_id = AM1 }}. + +dropwhile(Fun, State = #state { gm = GM, + backing_queue = BQ, + backing_queue_state = BQS, + set_delivered = SetDelivered }) -> + Len = BQ:len(BQS), + BQS1 = BQ:dropwhile(Fun, BQS), + Dropped = Len - BQ:len(BQS1), + SetDelivered1 = lists:max([0, SetDelivered - Dropped]), + ok = gm:broadcast(GM, {set_length, BQ:len(BQS1)}), + State #state { backing_queue_state = BQS1, + set_delivered = SetDelivered1 }. 
+ +drain_confirmed(State = #state { backing_queue = BQ, + backing_queue_state = BQS, + seen_status = SS, + confirmed = Confirmed }) -> + {MsgIds, BQS1} = BQ:drain_confirmed(BQS), + {MsgIds1, SS1} = + lists:foldl( + fun (MsgId, {MsgIdsN, SSN}) -> + %% We will never see 'discarded' here + case dict:find(MsgId, SSN) of + error -> + {[MsgId | MsgIdsN], SSN}; + {ok, published} -> + %% It was published when we were a slave, + %% and we were promoted before we saw the + %% publish from the channel. We still + %% haven't seen the channel publish, and + %% consequently we need to filter out the + %% confirm here. We will issue the confirm + %% when we see the publish from the channel. + {MsgIdsN, dict:store(MsgId, confirmed, SSN)}; + {ok, confirmed} -> + %% Well, confirms are racy by definition. + {[MsgId | MsgIdsN], SSN} + end + end, {[], SS}, MsgIds), + {Confirmed ++ MsgIds1, State #state { backing_queue_state = BQS1, + seen_status = SS1, + confirmed = [] }}. + +fetch(AckRequired, State = #state { gm = GM, + backing_queue = BQ, + backing_queue_state = BQS, + set_delivered = SetDelivered, + ack_msg_id = AM }) -> + {Result, BQS1} = BQ:fetch(AckRequired, BQS), + State1 = State #state { backing_queue_state = BQS1 }, + case Result of + empty -> + {Result, State1}; + {#basic_message { id = MsgId } = Message, IsDelivered, AckTag, + Remaining} -> + ok = gm:broadcast(GM, {fetch, AckRequired, MsgId, Remaining}), + IsDelivered1 = IsDelivered orelse SetDelivered > 0, + SetDelivered1 = lists:max([0, SetDelivered - 1]), + AM1 = maybe_store_acktag(AckTag, MsgId, AM), + {{Message, IsDelivered1, AckTag, Remaining}, + State1 #state { set_delivered = SetDelivered1, + ack_msg_id = AM1 }} + end. 
+ +ack(AckTags, State = #state { gm = GM, + backing_queue = BQ, + backing_queue_state = BQS, + ack_msg_id = AM }) -> + {MsgIds, BQS1} = BQ:ack(AckTags, BQS), + AM1 = lists:foldl(fun dict:erase/2, AM, AckTags), + case MsgIds of + [] -> ok; + _ -> ok = gm:broadcast(GM, {ack, MsgIds}) + end, + {MsgIds, State #state { backing_queue_state = BQS1, + ack_msg_id = AM1 }}. + +tx_publish(_Txn, _Msg, _MsgProps, _ChPid, State) -> + %% We don't support txns in mirror queues + State. + +tx_ack(_Txn, _AckTags, State) -> + %% We don't support txns in mirror queues + State. + +tx_rollback(_Txn, State) -> + {[], State}. + +tx_commit(_Txn, PostCommitFun, _MsgPropsFun, State) -> + PostCommitFun(), %% Probably must run it to avoid deadlocks + {[], State}. + +requeue(AckTags, MsgPropsFun, State = #state { gm = GM, + backing_queue = BQ, + backing_queue_state = BQS }) -> + {MsgIds, BQS1} = BQ:requeue(AckTags, MsgPropsFun, BQS), + ok = gm:broadcast(GM, {requeue, MsgPropsFun, MsgIds}), + {MsgIds, State #state { backing_queue_state = BQS1 }}. + +len(#state { backing_queue = BQ, backing_queue_state = BQS}) -> + BQ:len(BQS). + +is_empty(#state { backing_queue = BQ, backing_queue_state = BQS}) -> + BQ:is_empty(BQS). + +set_ram_duration_target(Target, State = #state { backing_queue = BQ, + backing_queue_state = BQS}) -> + State #state { backing_queue_state = + BQ:set_ram_duration_target(Target, BQS) }. + +ram_duration(State = #state { backing_queue = BQ, backing_queue_state = BQS}) -> + {Result, BQS1} = BQ:ram_duration(BQS), + {Result, State #state { backing_queue_state = BQS1 }}. + +needs_idle_timeout(#state { backing_queue = BQ, backing_queue_state = BQS}) -> + BQ:needs_idle_timeout(BQS). + +idle_timeout(State = #state { backing_queue = BQ, backing_queue_state = BQS}) -> + State #state { backing_queue_state = BQ:idle_timeout(BQS) }. 
+ +handle_pre_hibernate(State = #state { backing_queue = BQ, + backing_queue_state = BQS}) -> + State #state { backing_queue_state = BQ:handle_pre_hibernate(BQS) }. + +status(#state { backing_queue = BQ, backing_queue_state = BQS}) -> + BQ:status(BQS). + +invoke(?MODULE, Fun, State) -> + Fun(?MODULE, State); +invoke(Mod, Fun, State = #state { backing_queue = BQ, + backing_queue_state = BQS }) -> + State #state { backing_queue_state = BQ:invoke(Mod, Fun, BQS) }. + +is_duplicate(none, Message = #basic_message { id = MsgId }, + State = #state { seen_status = SS, + backing_queue = BQ, + backing_queue_state = BQS, + confirmed = Confirmed }) -> + %% Here, we need to deal with the possibility that we're about to + %% receive a message that we've already seen when we were a slave + %% (we received it via gm). Thus if we do receive such message now + %% via the channel, there may be a confirm waiting to issue for + %% it. + + %% We will never see {published, ChPid, MsgSeqNo} here. + case dict:find(MsgId, SS) of + error -> + %% We permit the underlying BQ to have a peek at it, but + %% only if we ourselves are not filtering out the msg. + {Result, BQS1} = BQ:validate_message(Message, BQS), + {Result, State #state { backing_queue_state = BQS1 }}; + {ok, published} -> + %% It already got published when we were a slave and no + %% confirmation is waiting. amqqueue_process will have, in + %% its msg_id_to_channel mapping, the entry for dealing + %% with the confirm when that comes back in (it's added + %% immediately after calling is_duplicate). The msg is + %% invalid. We will not see this again, nor will we be + %% further involved in confirming this message, so erase. 
+ {published, State #state { seen_status = dict:erase(MsgId, SS) }}; + {ok, confirmed} -> + %% It got published when we were a slave via gm, and + %% confirmed some time after that (maybe even after + %% promotion), but before we received the publish from the + %% channel, so couldn't previously know what the + %% msg_seq_no was (and thus confirm as a slave). So we + %% need to confirm now. As above, amqqueue_process will + %% have the entry for the msg_id_to_channel mapping added + %% immediately after calling is_duplicate/2. + {published, State #state { seen_status = dict:erase(MsgId, SS), + confirmed = [MsgId | Confirmed] }}; + {ok, discarded} -> + %% Don't erase from SS here because discard/2 is about to + %% be called and we need to be able to detect this case + {discarded, State} + end; +is_duplicate(_Txn, _Msg, State) -> + %% In a transaction. We don't support txns in mirror queues. But + %% it's probably not a duplicate... + {false, State}. + +discard(Msg = #basic_message { id = MsgId }, ChPid, + State = #state { gm = GM, + backing_queue = BQ, + backing_queue_state = BQS, + seen_status = SS }) -> + %% It's a massive error if we get told to discard something that's + %% already been published or published-and-confirmed. To do that + %% would require non FIFO access. Hence we should not find + %% 'published' or 'confirmed' in this dict:find. + case dict:find(MsgId, SS) of + error -> + ok = gm:broadcast(GM, {discard, ChPid, Msg}), + State #state { backing_queue_state = BQ:discard(Msg, ChPid, BQS), + seen_status = dict:erase(MsgId, SS) }; + {ok, discarded} -> + State + end. + +maybe_store_acktag(undefined, _MsgId, AM) -> + AM; +maybe_store_acktag(AckTag, MsgId, AM) -> + dict:store(AckTag, MsgId, AM). 
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl new file mode 100644 index 0000000000..bf341c74e8 --- /dev/null +++ b/src/rabbit_mirror_queue_misc.erl @@ -0,0 +1,101 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is VMware, Inc. +%% Copyright (c) 2007-2010 VMware, Inc. All rights reserved. +%% + +-module(rabbit_mirror_queue_misc). + +-export([remove_from_queue/2, add_slave/2, add_slave/3, on_node_up/0]). + +-include("rabbit.hrl"). + +remove_from_queue(QueueName, DeadPids) -> + DeadNodes = [node(DeadPid) || DeadPid <- DeadPids], + rabbit_misc:execute_mnesia_transaction( + fun () -> + %% Someone else could have deleted the queue before we + %% get here. + case mnesia:read({rabbit_queue, QueueName}) of + [] -> {error, not_found}; + [Q = #amqqueue { pid = QPid, + mirror_pids = MPids }] -> + [QPid1 | MPids1] = + [Pid || Pid <- [QPid | MPids], + not lists:member(node(Pid), DeadNodes)], + case {{QPid, MPids}, {QPid1, MPids1}} of + {Same, Same} -> + {ok, QPid}; + _ -> + Q1 = Q #amqqueue { pid = QPid1, + mirror_pids = MPids1 }, + ok = rabbit_amqqueue:store_queue(Q1), + {ok, QPid1} + end + end + end). + +add_slave(VHostPath, QueueName, MirrorNode) -> + add_slave(rabbit_misc:r(VHostPath, queue, QueueName), MirrorNode). 
+ +add_slave(Queue, MirrorNode) -> + rabbit_amqqueue:with( + Queue, + fun (#amqqueue { arguments = Args, name = Name, + pid = QPid, mirror_pids = MPids } = Q) -> + case rabbit_misc:table_lookup(Args, <<"x-mirror">>) of + undefined -> + ok; + _ -> + case [MirrorNode || Pid <- [QPid | MPids], + node(Pid) =:= MirrorNode] of + [] -> + Result = + rabbit_mirror_queue_slave_sup:start_child( + MirrorNode, [Q]), + rabbit_log:info("Adding slave node for ~s: ~p~n", + [rabbit_misc:rs(Name), Result]), + case Result of + {ok, _Pid} -> ok; + _ -> Result + end; + [_] -> + {error, queue_already_mirrored_on_node} + end + end + end). + +on_node_up() -> + Qs = + rabbit_misc:execute_mnesia_transaction( + fun () -> + mnesia:foldl( + fun (#amqqueue{ arguments = Args, name = QName }, QsN) -> + case rabbit_misc:table_lookup( + Args, <<"x-mirror">>) of + {_Type, []} -> + [QName | QsN]; + {_Type, Nodes} -> + Nodes1 = [list_to_atom(binary_to_list(Node)) + || {longstr, Node} <- Nodes], + case lists:member(node(), Nodes1) of + true -> [QName | QsN]; + false -> QsN + end; + _ -> + QsN + end + end, [], rabbit_queue) + end), + [add_slave(Q, node()) || Q <- Qs], + ok. diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl new file mode 100644 index 0000000000..89b8971cf1 --- /dev/null +++ b/src/rabbit_mirror_queue_slave.erl @@ -0,0 +1,737 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is VMware, Inc. +%% Copyright (c) 2007-2010 VMware, Inc. 
All rights reserved. +%% + +-module(rabbit_mirror_queue_slave). + +%% For general documentation of HA design, see +%% rabbit_mirror_queue_coordinator +%% +%% We join the GM group before we add ourselves to the amqqueue +%% record. As a result: +%% 1. We can receive msgs from GM that correspond to messages we will +%% never receive from publishers. +%% 2. When we receive a message from publishers, we must receive a +%% message from the GM group for it. +%% 3. However, that instruction from the GM group can arrive either +%% before or after the actual message. We need to be able to +%% distinguish between GM instructions arriving early, and case (1) +%% above. +%% +%% All instructions from the GM group must be processed in the order +%% in which they're received. + +-export([start_link/1, set_maximum_since_use/2]). + +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, + code_change/3, handle_pre_hibernate/1, prioritise_call/3, + prioritise_cast/2]). + +-export([joined/2, members_changed/3, handle_msg/3]). + +-behaviour(gen_server2). +-behaviour(gm). + +-include("rabbit.hrl"). +-include("gm_specs.hrl"). + +-record(state, { q, + gm, + master_node, + backing_queue, + backing_queue_state, + sync_timer_ref, + rate_timer_ref, + + sender_queues, %% :: Pid -> MsgQ + msg_id_ack, %% :: MsgId -> AckTag + + msg_id_status + }). + +start_link(Q) -> + gen_server2:start_link(?MODULE, [Q], []). + +set_maximum_since_use(QPid, Age) -> + gen_server2:cast(QPid, {set_maximum_since_use, Age}). + +init([#amqqueue { name = QueueName } = Q]) -> + process_flag(trap_exit, true), %% amqqueue_process traps exits too. 
+ ok = gm:create_tables(), + {ok, GM} = gm:start_link(QueueName, ?MODULE, [self()]), + receive {joined, GM} -> + ok + end, + Self = self(), + Node = node(), + {ok, MPid} = + rabbit_misc:execute_mnesia_transaction( + fun () -> + [Q1 = #amqqueue { pid = QPid, mirror_pids = MPids }] = + mnesia:read({rabbit_queue, QueueName}), + %% ASSERTION + [] = [Pid || Pid <- [QPid | MPids], node(Pid) =:= Node], + MPids1 = MPids ++ [Self], + mnesia:write(rabbit_queue, + Q1 #amqqueue { mirror_pids = MPids1 }, + write), + {ok, QPid} + end), + ok = file_handle_cache:register_callback( + rabbit_amqqueue, set_maximum_since_use, [self()]), + ok = rabbit_memory_monitor:register( + self(), {rabbit_amqqueue, set_ram_duration_target, [self()]}), + {ok, BQ} = application:get_env(backing_queue_module), + BQS = bq_init(BQ, Q, false), + {ok, #state { q = Q, + gm = GM, + master_node = node(MPid), + backing_queue = BQ, + backing_queue_state = BQS, + rate_timer_ref = undefined, + sync_timer_ref = undefined, + + sender_queues = dict:new(), + msg_id_ack = dict:new(), + msg_id_status = dict:new() + }, hibernate, + {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. + +handle_call({deliver_immediately, Delivery = #delivery {}}, From, State) -> + %% Synchronous, "immediate" delivery mode + + %% It is safe to reply 'false' here even if a) we've not seen the + %% msg via gm, or b) the master dies before we receive the msg via + %% gm. In the case of (a), we will eventually receive the msg via + %% gm, and it's only the master's result to the channel that is + %% important. In the case of (b), if the master does die and we do + %% get promoted then at that point we have no consumers, thus + %% 'false' is precisely the correct answer. However, we must be + %% careful to _not_ enqueue the message in this case. + + %% Note this is distinct from the case where we receive the msg + %% via gm first, then we're promoted to master, and only then do + %% we receive the msg from the channel. 
+ gen_server2:reply(From, false), %% master may deliver it, not us + noreply(maybe_enqueue_message(Delivery, false, State)); + +handle_call({deliver, Delivery = #delivery {}}, From, State) -> + %% Synchronous, "mandatory" delivery mode + gen_server2:reply(From, true), %% amqqueue throws away the result anyway + noreply(maybe_enqueue_message(Delivery, true, State)); + +handle_call({gm_deaths, Deaths}, From, + State = #state { q = #amqqueue { name = QueueName }, + gm = GM, + master_node = MNode }) -> + rabbit_log:info("Slave ~p saw deaths ~p for ~s~n", + [self(), Deaths, rabbit_misc:rs(QueueName)]), + %% The GM has told us about deaths, which means we're not going to + %% receive any more messages from GM + case rabbit_mirror_queue_misc:remove_from_queue(QueueName, Deaths) of + {ok, Pid} when node(Pid) =:= MNode -> + reply(ok, State); + {ok, Pid} when node(Pid) =:= node() -> + promote_me(From, State); + {ok, Pid} -> + gen_server2:reply(From, ok), + ok = gm:broadcast(GM, heartbeat), + noreply(State #state { master_node = node(Pid) }); + {error, not_found} -> + gen_server2:reply(From, ok), + {stop, normal, State} + end; + +handle_call({run_backing_queue, Mod, Fun}, _From, State) -> + reply(ok, run_backing_queue(Mod, Fun, State)); + +handle_call({commit, _Txn, _ChPid}, _From, State) -> + %% We don't support transactions in mirror queues + reply(ok, State). + +handle_cast({run_backing_queue, Mod, Fun}, State) -> + noreply(run_backing_queue(Mod, Fun, State)); + +handle_cast({gm, Instruction}, State) -> + handle_process_result(process_instruction(Instruction, State)); + +handle_cast({deliver, Delivery = #delivery {}}, State) -> + %% Asynchronous, non-"mandatory", non-"immediate" deliver mode. 
+ noreply(maybe_enqueue_message(Delivery, true, State)); + +handle_cast({set_maximum_since_use, Age}, State) -> + ok = file_handle_cache:set_maximum_since_use(Age), + noreply(State); + +handle_cast({set_ram_duration_target, Duration}, + State = #state { backing_queue = BQ, + backing_queue_state = BQS }) -> + BQS1 = BQ:set_ram_duration_target(Duration, BQS), + noreply(State #state { backing_queue_state = BQS1 }); + +handle_cast(update_ram_duration, + State = #state { backing_queue = BQ, + backing_queue_state = BQS }) -> + {RamDuration, BQS1} = BQ:ram_duration(BQS), + DesiredDuration = + rabbit_memory_monitor:report_ram_duration(self(), RamDuration), + BQS2 = BQ:set_ram_duration_target(DesiredDuration, BQS1), + noreply(State #state { rate_timer_ref = just_measured, + backing_queue_state = BQS2 }); + +handle_cast(sync_timeout, State) -> + noreply(backing_queue_idle_timeout( + State #state { sync_timer_ref = undefined })); + +handle_cast({rollback, _Txn, _ChPid}, State) -> + %% We don't support transactions in mirror queues + noreply(State). + +handle_info(timeout, State) -> + noreply(backing_queue_idle_timeout(State)); + +handle_info(Msg, State) -> + {stop, {unexpected_info, Msg}, State}. + +%% If the Reason is shutdown, or {shutdown, _}, it is not the queue +%% being deleted: it's just the node going down. Even though we're a +%% slave, we have no idea whether or not we'll be the only copy coming +%% back up. Thus we must assume we will be, and preserve anything we +%% have on disk. +terminate(_Reason, #state { backing_queue_state = undefined }) -> + %% We've received a delete_and_terminate from gm, thus nothing to + %% do here. 
+ ok; +terminate(Reason, #state { q = Q, + gm = GM, + backing_queue = BQ, + backing_queue_state = BQS, + rate_timer_ref = RateTRef }) -> + ok = gm:leave(GM), + QueueState = rabbit_amqqueue_process:init_with_backing_queue_state( + Q, BQ, BQS, RateTRef, [], [], dict:new()), + rabbit_amqqueue_process:terminate(Reason, QueueState); +terminate([_SPid], _Reason) -> + %% gm case + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +handle_pre_hibernate(State = #state { backing_queue = BQ, + backing_queue_state = BQS }) -> + BQS1 = rabbit_amqqueue_process_utils:backing_queue_pre_hibernate(BQ, BQS), + {hibernate, stop_rate_timer(State #state { backing_queue_state = BQS1 })}. + +prioritise_call(Msg, _From, _State) -> + case Msg of + {run_backing_queue, _Mod, _Fun} -> 6; + {gm_deaths, _Deaths} -> 5; + _ -> 0 + end. + +prioritise_cast(Msg, _State) -> + case Msg of + update_ram_duration -> 8; + {set_ram_duration_target, _Duration} -> 8; + {set_maximum_since_use, _Age} -> 8; + {run_backing_queue, _Mod, _Fun} -> 6; + sync_timeout -> 6; + {gm, _Msg} -> 5; + {post_commit, _Txn, _AckTags} -> 4; + _ -> 0 + end. + +%% --------------------------------------------------------------------------- +%% GM +%% --------------------------------------------------------------------------- + +joined([SPid], _Members) -> + SPid ! {joined, self()}, + ok. + +members_changed([_SPid], _Births, []) -> + ok; +members_changed([SPid], _Births, Deaths) -> + rabbit_misc:with_exit_handler( + fun () -> {stop, normal} end, + fun () -> + case gen_server2:call(SPid, {gm_deaths, Deaths}, infinity) of + ok -> + ok; + {promote, CPid} -> + {become, rabbit_mirror_queue_coordinator, [CPid]} + end + end). + +handle_msg([_SPid], _From, heartbeat) -> + ok; +handle_msg([SPid], _From, Msg) -> + ok = gen_server2:cast(SPid, {gm, Msg}). 
+
+%% ---------------------------------------------------------------------------
+%% Others
+%% ---------------------------------------------------------------------------
+
+bq_init(BQ, Q, Recover) ->
+ Self = self(),
+ BQ:init(Q, Recover,
+ fun (Mod, Fun) ->
+ rabbit_amqqueue:run_backing_queue_async(Self, Mod, Fun)
+ end,
+ fun (Mod, Fun) ->
+ rabbit_misc:with_exit_handler(
+ fun () -> error end,
+ fun () ->
+ rabbit_amqqueue:run_backing_queue(Self, Mod, Fun)
+ end)
+ end).
+
+run_backing_queue(rabbit_mirror_queue_master, Fun, State) ->
+ %% Yes, this might look a little crazy, but see comments around
+ %% process_instruction({tx_commit,...}, State).
+ Fun(rabbit_mirror_queue_master, State);
+run_backing_queue(Mod, Fun, State = #state { backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ State #state { backing_queue_state = BQ:invoke(Mod, Fun, BQS) }.
+
+needs_confirming(#delivery{ msg_seq_no = undefined }, _State) ->
+ never;
+needs_confirming(#delivery { message = #basic_message {
+ is_persistent = true } },
+ #state { q = #amqqueue { durable = true } }) ->
+ eventually;
+needs_confirming(_Delivery, _State) ->
+ immediately.
+
+confirm_messages(MsgIds, State = #state { msg_id_status = MS }) ->
+ {MS1, CMs} =
+ lists:foldl(
+ fun (MsgId, {MSN, CMsN} = Acc) ->
+ %% We will never see 'discarded' here
+ case dict:find(MsgId, MSN) of
+ error ->
+ %% If it needed confirming, it'll have
+ %% already been done.
+ Acc;
+ {ok, {published, ChPid}} ->
+ %% Still not seen it from the channel, just
+ %% record that it's been confirmed.
+ {dict:store(MsgId, {confirmed, ChPid}, MSN), CMsN};
+ {ok, {published, ChPid, MsgSeqNo}} ->
+ %% Seen from both GM and Channel. Can now
+ %% confirm.
+ {dict:erase(MsgId, MSN),
+ gb_trees_cons(ChPid, MsgSeqNo, CMsN)};
+ {ok, {confirmed, _ChPid}} ->
+ %% It's already been confirmed. This is
+ %% probably because it's been both sync'd to
+ %% disk and then delivered and ack'd before
+ %% we've seen the publish from the
+ %% channel. 
Nothing to do here. + Acc + end + end, {MS, gb_trees:empty()}, MsgIds), + gb_trees:map(fun (ChPid, MsgSeqNos) -> + ok = rabbit_channel:confirm(ChPid, MsgSeqNos) + end, CMs), + State #state { msg_id_status = MS1 }. + +gb_trees_cons(Key, Value, Tree) -> + case gb_trees:lookup(Key, Tree) of + {value, Values} -> gb_trees:update(Key, [Value | Values], Tree); + none -> gb_trees:insert(Key, [Value], Tree) + end. + +handle_process_result({ok, State}) -> noreply(State); +handle_process_result({stop, State}) -> {stop, normal, State}. + +promote_me(From, #state { q = Q, + gm = GM, + backing_queue = BQ, + backing_queue_state = BQS, + rate_timer_ref = RateTRef, + sender_queues = SQ, + msg_id_ack = MA, + msg_id_status = MS }) -> + rabbit_log:info("Promoting slave ~p for ~s~n", + [self(), rabbit_misc:rs(Q #amqqueue.name)]), + {ok, CPid} = rabbit_mirror_queue_coordinator:start_link(Q, GM), + true = unlink(GM), + gen_server2:reply(From, {promote, CPid}), + ok = gm:confirmed_broadcast(GM, heartbeat), + + %% We find all the messages that we've received from channels but + %% not from gm, and if they're due to be enqueued on promotion + %% then we pass them to the + %% queue_process:init_with_backing_queue_state to be enqueued. + %% + %% We also have to requeue messages which are pending acks: the + %% consumers from the master queue have been lost and so these + %% messages need requeuing. They might also be pending + %% confirmation, and indeed they might also be pending arrival of + %% the publication from the channel itself, if we received both + %% the publication and the fetch via gm first! Requeuing doesn't + %% affect confirmations: if the message was previously pending a + %% confirmation then it still will be, under the same msg_id. 
So
+ %% as a master, we need to be prepared to filter out the
+ %% publication of said messages from the channel (is_duplicate
+ %% (thus such requeued messages must remain in the msg_id_status
+ %% (MS) which becomes seen_status (SS) in the master)).
+ %%
+ %% Then there are messages we already have in the queue, which are
+ %% not currently pending acknowledgement:
+ %% 1. Messages we've only received via gm:
+ %% Filter out subsequent publication from channel through
+ %% validate_message. Might have to issue confirms then or
+ %% later, thus queue_process state will have to know that
+ %% there's a pending confirm.
+ %% 2. Messages received via both gm and channel:
+ %% Queue will have to deal with issuing confirms if necessary.
+ %%
+ %% MS contains the following four entry types:
+ %%
+ %% a) {published, ChPid}:
+ %% published via gm only; pending arrival of publication from
+ %% channel, maybe pending confirm.
+ %%
+ %% b) {published, ChPid, MsgSeqNo}:
+ %% published via gm and channel; pending confirm.
+ %%
+ %% c) {confirmed, ChPid}:
+ %% published via gm only, and confirmed; pending publication
+ %% from channel.
+ %%
+ %% d) discarded
+ %% seen via gm only as discarded. Pending publication from
+ %% channel
+ %%
+ %% The forms a, c and d only, need to go to the master state
+ %% seen_status (SS).
+ %%
+ %% The form b only, needs to go through to the queue_process
+ %% state to form the msg_id_to_channel mapping (MTC).
+ %%
+ %% No messages that are enqueued from SQ at this point will have
+ %% entries in MS.
+ %%
+ %% Messages that are extracted from MA may have entries in MS, and
+ %% those messages are then requeued. However, as discussed above,
+ %% this does not affect MS, nor which bits go through to SS in
+ %% Master, or MTC in queue_process.
+ %%
+ %% Everything that's in MA gets requeued. Consequently the new
+ %% master should start with a fresh MA as there are no messages
+ %% pending acks (txns will have been rolled back). 
+ + MSList = dict:to_list(MS), + SS = dict:from_list( + [E || E = {_MsgId, discarded} <- MSList] ++ + [{MsgId, Status} + || {MsgId, {Status, _ChPid}} <- MSList, + Status =:= published orelse Status =:= confirmed]), + + MasterState = rabbit_mirror_queue_master:promote_backing_queue_state( + CPid, BQ, BQS, GM, SS), + + MTC = dict:from_list( + [{MsgId, {ChPid, MsgSeqNo}} || + {MsgId, {published, ChPid, MsgSeqNo}} <- dict:to_list(MS)]), + AckTags = [AckTag || {_MsgId, AckTag} <- dict:to_list(MA)], + Deliveries = [Delivery || {_ChPid, PubQ} <- dict:to_list(SQ), + {Delivery, true} <- queue:to_list(PubQ)], + QueueState = rabbit_amqqueue_process:init_with_backing_queue_state( + Q, rabbit_mirror_queue_master, MasterState, RateTRef, + AckTags, Deliveries, MTC), + {become, rabbit_amqqueue_process, QueueState, hibernate}. + +noreply(State) -> + {NewState, Timeout} = next_state(State), + {noreply, NewState, Timeout}. + +reply(Reply, State) -> + {NewState, Timeout} = next_state(State), + {reply, Reply, NewState, Timeout}. + +next_state(State = #state{backing_queue = BQ, backing_queue_state = BQS}) -> + {MsgIds, BQS1} = BQ:drain_confirmed(BQS), + State1 = ensure_rate_timer( + confirm_messages(MsgIds, State #state { + backing_queue_state = BQS1 })), + case BQ:needs_idle_timeout(BQS1) of + true -> {ensure_sync_timer(State1), 0}; + false -> {stop_sync_timer(State1), hibernate} + end. + +backing_queue_idle_timeout(State = #state { backing_queue = BQ }) -> + run_backing_queue(BQ, fun (M, BQS) -> M:idle_timeout(BQS) end, State). + +ensure_sync_timer(State) -> + rabbit_amqqueue_process_utils:ensure_sync_timer( + fun sync_timer_getter/1, fun sync_timer_setter/2, State). + +stop_sync_timer(State) -> + rabbit_amqqueue_process_utils:stop_sync_timer( + fun sync_timer_getter/1, fun sync_timer_setter/2, State). + +sync_timer_getter(State) -> State#state.sync_timer_ref. +sync_timer_setter(Timer, State) -> State#state{sync_timer_ref = Timer}. 
+ +ensure_rate_timer(State) -> + rabbit_amqqueue_process_utils:ensure_rate_timer( + fun rate_timer_getter/1, fun rate_timer_setter/2, State). + +stop_rate_timer(State) -> + rabbit_amqqueue_process_utils:stop_rate_timer( + fun rate_timer_getter/1, fun rate_timer_setter/2, State). + +rate_timer_getter(State) -> State#state.rate_timer_ref. +rate_timer_setter(Timer, State) -> State#state{rate_timer_ref = Timer}. + +maybe_enqueue_message( + Delivery = #delivery { message = #basic_message { id = MsgId }, + msg_seq_no = MsgSeqNo, + sender = ChPid, + txn = none }, + EnqueueOnPromotion, + State = #state { sender_queues = SQ, + msg_id_status = MS }) -> + %% We will never see {published, ChPid, MsgSeqNo} here. + case dict:find(MsgId, MS) of + error -> + MQ = case dict:find(ChPid, SQ) of + {ok, MQ1} -> MQ1; + error -> queue:new() + end, + SQ1 = dict:store(ChPid, + queue:in({Delivery, EnqueueOnPromotion}, MQ), SQ), + State #state { sender_queues = SQ1 }; + {ok, {confirmed, ChPid}} -> + %% BQ has confirmed it but we didn't know what the + %% msg_seq_no was at the time. We do now! + ok = rabbit_channel:confirm(ChPid, [MsgSeqNo]), + State #state { msg_id_status = dict:erase(MsgId, MS) }; + {ok, {published, ChPid}} -> + %% It was published to the BQ and we didn't know the + %% msg_seq_no so couldn't confirm it at the time. + case needs_confirming(Delivery, State) of + never -> + State #state { msg_id_status = dict:erase(MsgId, MS) }; + eventually -> + State #state { + msg_id_status = + dict:store(MsgId, {published, ChPid, MsgSeqNo}, MS) }; + immediately -> + ok = rabbit_channel:confirm(ChPid, [MsgSeqNo]), + State #state { msg_id_status = dict:erase(MsgId, MS) } + end; + {ok, discarded} -> + %% We've already heard from GM that the msg is to be + %% discarded. We won't see this again. + State #state { msg_id_status = dict:erase(MsgId, MS) } + end; +maybe_enqueue_message(_Delivery, _EnqueueOnPromotion, State) -> + %% We don't support txns in mirror queues. + State. 
+ +process_instruction( + {publish, Deliver, ChPid, MsgProps, Msg = #basic_message { id = MsgId }}, + State = #state { sender_queues = SQ, + backing_queue = BQ, + backing_queue_state = BQS, + msg_id_ack = MA, + msg_id_status = MS }) -> + + %% We really are going to do the publish right now, even though we + %% may not have seen it directly from the channel. As a result, we + %% may know that it needs confirming without knowing its + %% msg_seq_no, which means that we can see the confirmation come + %% back from the backing queue without knowing the msg_seq_no, + %% which means that we're going to have to hang on to the fact + %% that we've seen the msg_id confirmed until we can associate it + %% with a msg_seq_no. + MS1 = dict:store(MsgId, {published, ChPid}, MS), + {SQ1, MS2} = + case dict:find(ChPid, SQ) of + error -> + {SQ, MS1}; + {ok, MQ} -> + case queue:out(MQ) of + {empty, _MQ} -> + {SQ, MS1}; + {{value, {Delivery = #delivery { + msg_seq_no = MsgSeqNo, + message = #basic_message { id = MsgId } }, + _EnqueueOnPromotion}}, MQ1} -> + %% We received the msg from the channel + %% first. Thus we need to deal with confirms + %% here. + {dict:store(ChPid, MQ1, SQ), + case needs_confirming(Delivery, State) of + never -> + MS; + eventually -> + dict:store( + MsgId, {published, ChPid, MsgSeqNo}, MS); + immediately -> + ok = rabbit_channel:confirm(ChPid, [MsgSeqNo]), + MS + end}; + {{value, {#delivery {}, _EnqueueOnPromotion}}, _MQ1} -> + %% The instruction was sent to us before we + %% were within the mirror_pids within the + %% #amqqueue{} record. We'll never receive the + %% message directly from the channel. And the + %% channel will not be expecting any confirms + %% from us. 
+ {SQ, MS} + end + end, + + State1 = State #state { sender_queues = SQ1, + msg_id_status = MS2 }, + + {ok, + case Deliver of + false -> + BQS1 = BQ:publish(Msg, MsgProps, ChPid, BQS), + State1 #state { backing_queue_state = BQS1 }; + {true, AckRequired} -> + {AckTag, BQS1} = BQ:publish_delivered(AckRequired, Msg, MsgProps, + ChPid, BQS), + MA1 = case AckRequired of + true -> dict:store(MsgId, AckTag, MA); + false -> MA + end, + State1 #state { backing_queue_state = BQS1, + msg_id_ack = MA1 } + end}; +process_instruction({discard, ChPid, Msg = #basic_message { id = MsgId }}, + State = #state { sender_queues = SQ, + backing_queue = BQ, + backing_queue_state = BQS, + msg_id_status = MS }) -> + %% Many of the comments around the publish head above apply here + %% too. + MS1 = dict:store(MsgId, discarded, MS), + {SQ1, MS2} = + case dict:find(ChPid, SQ) of + error -> + {SQ, MS1}; + {ok, MQ} -> + case queue:out(MQ) of + {empty, _MQ} -> + {SQ, MS1}; + {{value, {#delivery { + message = #basic_message { id = MsgId } }, + _EnqueueOnPromotion}}, MQ1} -> + %% We've already seen it from the channel, + %% we're not going to see this again, so don't + %% add it to MS + {dict:store(ChPid, MQ1, SQ), MS}; + {{value, {#delivery {}, _EnqueueOnPromotion}}, _MQ1} -> + %% The instruction was sent to us before we + %% were within the mirror_pids within the + %% #amqqueue{} record. We'll never receive the + %% message directly from the channel. 
+ {SQ, MS} + end + end, + BQS1 = BQ:discard(Msg, ChPid, BQS), + {ok, State #state { sender_queues = SQ1, + msg_id_status = MS2, + backing_queue_state = BQS1 }}; +process_instruction({set_length, Length}, + State = #state { backing_queue = BQ, + backing_queue_state = BQS }) -> + QLen = BQ:len(BQS), + ToDrop = QLen - Length, + {ok, case ToDrop > 0 of + true -> BQS1 = + lists:foldl( + fun (const, BQSN) -> + {{_Msg, _IsDelivered, _AckTag, _Remaining}, + BQSN1} = BQ:fetch(false, BQSN), + BQSN1 + end, BQS, lists:duplicate(ToDrop, const)), + State #state { backing_queue_state = BQS1 }; + false -> State + end}; +process_instruction({fetch, AckRequired, MsgId, Remaining}, + State = #state { backing_queue = BQ, + backing_queue_state = BQS, + msg_id_ack = MA }) -> + QLen = BQ:len(BQS), + {ok, case QLen - 1 of + Remaining -> + {{_Msg, _IsDelivered, AckTag, Remaining}, BQS1} = + BQ:fetch(AckRequired, BQS), + MA1 = case AckRequired of + true -> dict:store(MsgId, AckTag, MA); + false -> MA + end, + State #state { backing_queue_state = BQS1, + msg_id_ack = MA1 }; + Other when Other < Remaining -> + %% we must be shorter than the master + State + end}; +process_instruction({ack, MsgIds}, + State = #state { backing_queue = BQ, + backing_queue_state = BQS, + msg_id_ack = MA }) -> + {AckTags, MA1} = msg_ids_to_acktags(MsgIds, MA), + {MsgIds1, BQS1} = BQ:ack(AckTags, BQS), + [] = MsgIds1 -- MsgIds, %% ASSERTION + {ok, State #state { msg_id_ack = MA1, + backing_queue_state = BQS1 }}; +process_instruction({requeue, MsgPropsFun, MsgIds}, + State = #state { backing_queue = BQ, + backing_queue_state = BQS, + msg_id_ack = MA }) -> + {AckTags, MA1} = msg_ids_to_acktags(MsgIds, MA), + {ok, case length(AckTags) =:= length(MsgIds) of + true -> + {MsgIds, BQS1} = BQ:requeue(AckTags, MsgPropsFun, BQS), + State #state { msg_id_ack = MA1, + backing_queue_state = BQS1 }; + false -> + %% The only thing we can safely do is nuke out our BQ + %% and MA. 
The interaction between this and confirms + %% doesn't really bear thinking about... + {_Count, BQS1} = BQ:purge(BQS), + {_MsgIds, BQS2} = ack_all(BQ, MA, BQS1), + State #state { msg_id_ack = dict:new(), + backing_queue_state = BQS2 } + end}; +process_instruction(delete_and_terminate, + State = #state { backing_queue = BQ, + backing_queue_state = BQS }) -> + BQ:delete_and_terminate(BQS), + {stop, State #state { backing_queue_state = undefined }}. + +msg_ids_to_acktags(MsgIds, MA) -> + {AckTags, MA1} = + lists:foldl( + fun (MsgId, {Acc, MAN}) -> + case dict:find(MsgId, MA) of + error -> {Acc, MAN}; + {ok, AckTag} -> {[AckTag | Acc], dict:erase(MsgId, MAN)} + end + end, {[], MA}, MsgIds), + {lists:reverse(AckTags), MA1}. + +ack_all(BQ, MA, BQS) -> + BQ:ack([AckTag || {_MsgId, AckTag} <- dict:to_list(MA)], BQS). diff --git a/src/rabbit_mirror_queue_slave_sup.erl b/src/rabbit_mirror_queue_slave_sup.erl new file mode 100644 index 0000000000..2fb3be5196 --- /dev/null +++ b/src/rabbit_mirror_queue_slave_sup.erl @@ -0,0 +1,60 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is VMware, Inc. +%% Copyright (c) 2007-2010 VMware, Inc. All rights reserved. +%% + +-module(rabbit_mirror_queue_slave_sup). + +-rabbit_boot_step({mirror_queue_slave_sup, + [{description, "mirror queue slave sup"}, + {mfa, {rabbit_mirror_queue_slave_sup, start, []}}, + {requires, queue_sup_queue_recovery}, + {enables, routing_ready}]}). 
+ +-rabbit_boot_step({mirrored_queues, + [{description, "adding mirrors to queues"}, + {mfa, {rabbit_mirror_queue_misc, on_node_up, []}}, + {requires, mirror_queue_slave_sup}, + {enables, routing_ready}]}). + +-behaviour(supervisor2). + +-export([start/0, start_link/0, start_child/2]). + +-export([init/1]). + +-include_lib("rabbit.hrl"). + +-define(SERVER, ?MODULE). + +start() -> + {ok, _} = + supervisor:start_child( + rabbit_sup, + {rabbit_mirror_queue_slave_sup, + {rabbit_mirror_queue_slave_sup, start_link, []}, + transient, infinity, supervisor, [rabbit_mirror_queue_slave_sup]}), + ok. + +start_link() -> + supervisor2:start_link({local, ?SERVER}, ?MODULE, []). + +start_child(Node, Args) -> + supervisor2:start_child({?SERVER, Node}, Args). + +init([]) -> + {ok, {{simple_one_for_one_terminate, 10, 10}, + [{rabbit_mirror_queue_slave, + {rabbit_mirror_queue_slave, start_link, []}, + temporary, ?MAX_WAIT, worker, [rabbit_mirror_queue_slave]}]}}. diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index fbcf07ae77..f2f31ef374 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -228,7 +228,8 @@ table_definitions() -> {rabbit_queue, [{record_name, amqqueue}, {attributes, record_info(fields, amqqueue)}, - {match, #amqqueue{name = queue_name_match(), _='_'}}]}]. + {match, #amqqueue{name = queue_name_match(), _='_'}}]}] + ++ gm:table_definitions(). binding_match() -> #binding{source = exchange_name_match(), diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl index f6a1c92fcc..4f68356440 100644 --- a/src/rabbit_router.erl +++ b/src/rabbit_router.erl @@ -113,7 +113,9 @@ check_delivery(_ , _ , {_ , Qs}) -> {routed, Qs}. lookup_qpids(QNames) -> lists:foldl(fun (QName, QPids) -> case mnesia:dirty_read({rabbit_queue, QName}) of - [#amqqueue{pid = QPid}] -> [QPid | QPids]; - [] -> QPids + [#amqqueue{pid = QPid, mirror_pids = MPids}] -> + MPids ++ [QPid | QPids]; + [] -> + QPids end end, [], QNames). 
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 384929846d..524e8e6edd 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -2072,9 +2072,9 @@ test_queue_index() -> passed. -variable_queue_init(QName, IsDurable, Recover) -> - rabbit_variable_queue:init(QName, IsDurable, Recover, - fun nop/1, fun nop/1, fun nop/2, fun nop/1). +variable_queue_init(Q, Recover) -> + rabbit_variable_queue:init( + Q, Recover, fun nop/1, fun nop/1, fun nop/2, fun nop/1). variable_queue_publish(IsPersistent, Count, VQ) -> lists:foldl( @@ -2086,7 +2086,7 @@ variable_queue_publish(IsPersistent, Count, VQ) -> true -> 2; false -> 1 end}, <<>>), - #message_properties{}, VQN) + #message_properties{}, self(), VQN) end, VQ, lists:seq(1, Count)). variable_queue_fetch(Count, IsPersistent, IsDelivered, Len, VQ) -> @@ -2104,9 +2104,13 @@ assert_prop(List, Prop, Value) -> assert_props(List, PropVals) -> [assert_prop(List, Prop, Value) || {Prop, Value} <- PropVals]. +test_amqqueue(Durable) -> + (rabbit_amqqueue:pseudo_queue(test_queue(), self())) + #amqqueue { durable = Durable }. + with_fresh_variable_queue(Fun) -> ok = empty_test_queue(), - VQ = variable_queue_init(test_queue(), true, false), + VQ = variable_queue_init(test_amqqueue(true), false), S0 = rabbit_variable_queue:status(VQ), assert_props(S0, [{q1, 0}, {q2, 0}, {delta, {delta, undefined, 0, undefined}}, @@ -2164,7 +2168,7 @@ test_dropwhile(VQ0) -> rabbit_basic:message( rabbit_misc:r(<<>>, exchange, <<>>), <<>>, #'P_basic'{}, <<>>), - #message_properties{expiry = N}, VQN) + #message_properties{expiry = N}, self(), VQN) end, VQ0, lists:seq(1, Count)), %% drop the first 5 messages @@ -2208,7 +2212,7 @@ test_variable_queue_dynamic_duration_change(VQ0) -> %% drain {VQ8, AckTags} = variable_queue_fetch(Len, false, false, Len, VQ7), - VQ9 = rabbit_variable_queue:ack(AckTags, VQ8), + {_Guids, VQ9} = rabbit_variable_queue:ack(AckTags, VQ8), {empty, VQ10} = rabbit_variable_queue:fetch(true, VQ9), VQ10. 
@@ -2218,7 +2222,7 @@ publish_fetch_and_ack(0, _Len, VQ0) -> publish_fetch_and_ack(N, Len, VQ0) -> VQ1 = variable_queue_publish(false, 1, VQ0), {{_Msg, false, AckTag, Len}, VQ2} = rabbit_variable_queue:fetch(true, VQ1), - VQ3 = rabbit_variable_queue:ack([AckTag], VQ2), + {_Guids, VQ3} = rabbit_variable_queue:ack([AckTag], VQ2), publish_fetch_and_ack(N-1, Len, VQ3). test_variable_queue_partial_segments_delta_thing(VQ0) -> @@ -2252,7 +2256,7 @@ test_variable_queue_partial_segments_delta_thing(VQ0) -> {len, HalfSegment + 1}]), {VQ8, AckTags1} = variable_queue_fetch(HalfSegment + 1, true, false, HalfSegment + 1, VQ7), - VQ9 = rabbit_variable_queue:ack(AckTags ++ AckTags1, VQ8), + {_Guids, VQ9} = rabbit_variable_queue:ack(AckTags ++ AckTags1, VQ8), %% should be empty now {empty, VQ10} = rabbit_variable_queue:fetch(true, VQ9), VQ10. @@ -2281,7 +2285,7 @@ test_variable_queue_all_the_bits_not_covered_elsewhere1(VQ0) -> {VQ5, _AckTags1} = variable_queue_fetch(Count, false, false, Count, VQ4), _VQ6 = rabbit_variable_queue:terminate(VQ5), - VQ7 = variable_queue_init(test_queue(), true, true), + VQ7 = variable_queue_init(test_amqqueue(true), true), {{_Msg1, true, _AckTag1, Count1}, VQ8} = rabbit_variable_queue:fetch(true, VQ7), VQ9 = variable_queue_publish(false, 1, VQ8), @@ -2294,17 +2298,18 @@ test_variable_queue_all_the_bits_not_covered_elsewhere2(VQ0) -> VQ1 = rabbit_variable_queue:set_ram_duration_target(0, VQ0), VQ2 = variable_queue_publish(false, 4, VQ1), {VQ3, AckTags} = variable_queue_fetch(2, false, false, 4, VQ2), - VQ4 = rabbit_variable_queue:requeue(AckTags, fun(X) -> X end, VQ3), + {_Guids, VQ4} = + rabbit_variable_queue:requeue(AckTags, fun(X) -> X end, VQ3), VQ5 = rabbit_variable_queue:idle_timeout(VQ4), _VQ6 = rabbit_variable_queue:terminate(VQ5), - VQ7 = variable_queue_init(test_queue(), true, true), + VQ7 = variable_queue_init(test_amqqueue(true), true), {empty, VQ8} = rabbit_variable_queue:fetch(false, VQ7), VQ8. 
test_queue_recover() -> Count = 2 * rabbit_queue_index:next_segment_boundary(0), TxID = rabbit_guid:guid(), - {new, #amqqueue { pid = QPid, name = QName }} = + {new, #amqqueue { pid = QPid, name = QName } = Q} = rabbit_amqqueue:declare(test_queue(), true, false, [], none), [begin Msg = rabbit_basic:message(rabbit_misc:r(<<>>, exchange, <<>>), @@ -2328,7 +2333,7 @@ test_queue_recover() -> {ok, CountMinusOne, {QName, QPid1, _AckTag, true, _Msg}} = rabbit_amqqueue:basic_get(Q1, self(), false), exit(QPid1, shutdown), - VQ1 = variable_queue_init(QName, true, true), + VQ1 = variable_queue_init(Q, true), {{_Msg1, true, _AckTag1, CountMinusOne}, VQ2} = rabbit_variable_queue:fetch(true, VQ1), _VQ3 = rabbit_variable_queue:delete_and_terminate(VQ2), diff --git a/src/rabbit_types.erl b/src/rabbit_types.erl index 1f0f8bbeac..aa174e96ab 100644 --- a/src/rabbit_types.erl +++ b/src/rabbit_types.erl @@ -124,7 +124,8 @@ auto_delete :: boolean(), exclusive_owner :: rabbit_types:maybe(pid()), arguments :: rabbit_framing:amqp_table(), - pid :: rabbit_types:maybe(pid())}). + pid :: rabbit_types:maybe(pid()), + mirror_pids :: [pid()]}). -type(exchange() :: #exchange{name :: rabbit_exchange:name(), diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index ff7252fd68..7a3c17a29c 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -16,18 +16,19 @@ -module(rabbit_variable_queue). --export([init/5, terminate/1, delete_and_terminate/1, - purge/1, publish/3, publish_delivered/4, drain_confirmed/1, - fetch/2, ack/2, tx_publish/4, tx_ack/3, tx_rollback/2, tx_commit/4, +-export([init/4, terminate/1, delete_and_terminate/1, + purge/1, publish/4, publish_delivered/5, drain_confirmed/1, + fetch/2, ack/2, tx_publish/5, tx_ack/3, tx_rollback/2, tx_commit/4, requeue/3, len/1, is_empty/1, dropwhile/2, set_ram_duration_target/2, ram_duration/1, needs_idle_timeout/1, idle_timeout/1, handle_pre_hibernate/1, - status/1, multiple_routing_keys/0]). 
+ status/1, invoke/3, is_duplicate/3, discard/3, + multiple_routing_keys/0]). -export([start/1, stop/0]). %% exported for testing only --export([start_msg_store/2, stop_msg_store/0, init/7]). +-export([start_msg_store/2, stop_msg_store/0, init/6]). %%---------------------------------------------------------------------------- %% Definitions: @@ -408,15 +409,15 @@ stop_msg_store() -> ok = rabbit_sup:stop_child(?PERSISTENT_MSG_STORE), ok = rabbit_sup:stop_child(?TRANSIENT_MSG_STORE). -init(QueueName, IsDurable, Recover, AsyncCallback, SyncCallback) -> - init(QueueName, IsDurable, Recover, AsyncCallback, SyncCallback, +init(Queue, Recover, AsyncCallback, SyncCallback) -> + init(Queue, Recover, AsyncCallback, SyncCallback, fun (MsgIds, ActionTaken) -> msgs_written_to_disk(AsyncCallback, MsgIds, ActionTaken) end, fun (MsgIds) -> msg_indices_written_to_disk(AsyncCallback, MsgIds) end). -init(QueueName, IsDurable, false, AsyncCallback, SyncCallback, - MsgOnDiskFun, MsgIdxOnDiskFun) -> +init(#amqqueue { name = QueueName, durable = IsDurable }, false, + AsyncCallback, SyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun) -> IndexState = rabbit_queue_index:init(QueueName, MsgIdxOnDiskFun), init(IsDurable, IndexState, 0, [], AsyncCallback, SyncCallback, case IsDurable of @@ -426,8 +427,8 @@ init(QueueName, IsDurable, false, AsyncCallback, SyncCallback, end, msg_store_client_init(?TRANSIENT_MSG_STORE, undefined, AsyncCallback)); -init(QueueName, true, true, AsyncCallback, SyncCallback, - MsgOnDiskFun, MsgIdxOnDiskFun) -> +init(#amqqueue { name = QueueName, durable = true }, true, + AsyncCallback, SyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun) -> Terms = rabbit_queue_index:shutdown_terms(QueueName), {PRef, TRef, Terms1} = case [persistent_ref, transient_ref] -- proplists:get_keys(Terms) of @@ -517,13 +518,14 @@ purge(State = #vqstate { q4 = Q4, ram_index_count = 0, persistent_count = PCount1 })}. 
-publish(Msg, MsgProps, State) -> +publish(Msg, MsgProps, _ChPid, State) -> {_SeqId, State1} = publish(Msg, MsgProps, false, false, State), a(reduce_memory_use(State1)). publish_delivered(false, #basic_message { id = MsgId }, #message_properties { needs_confirming = NeedsConfirming }, - State = #vqstate { async_callback = Callback, len = 0 }) -> + _ChPid, State = #vqstate { async_callback = Callback, + len = 0 }) -> case NeedsConfirming of true -> blind_confirm(Callback, gb_sets:singleton(MsgId)); false -> ok @@ -533,13 +535,13 @@ publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent, id = MsgId }, MsgProps = #message_properties { needs_confirming = NeedsConfirming }, - State = #vqstate { len = 0, - next_seq_id = SeqId, - out_counter = OutCount, - in_counter = InCount, - persistent_count = PCount, - durable = IsDurable, - unconfirmed = UC }) -> + _ChPid, State = #vqstate { len = 0, + next_seq_id = SeqId, + out_counter = OutCount, + in_counter = InCount, + persistent_count = PCount, + durable = IsDurable, + unconfirmed = UC }) -> IsPersistent1 = IsDurable andalso IsPersistent, MsgStatus = (msg_status(IsPersistent1, SeqId, Msg, MsgProps)) #msg_status { is_delivered = true }, @@ -665,13 +667,14 @@ internal_fetch(AckRequired, MsgStatus = #msg_status { persistent_count = PCount1 })}. ack(AckTags, State) -> - a(ack(fun msg_store_remove/3, - fun (_, State0) -> State0 end, - AckTags, State)). + {MsgIds, State1} = ack(fun msg_store_remove/3, + fun (_, State0) -> State0 end, + AckTags, State), + {MsgIds, a(State1)}. 
tx_publish(Txn, Msg = #basic_message { is_persistent = IsPersistent }, MsgProps, - State = #vqstate { durable = IsDurable, - msg_store_clients = MSCState }) -> + _ChPid, State = #vqstate { durable = IsDurable, + msg_store_clients = MSCState }) -> Tx = #tx { pending_messages = Pubs } = lookup_tx(Txn), store_tx(Txn, Tx #tx { pending_messages = [{Msg, MsgProps} | Pubs] }), case IsPersistent andalso IsDurable of @@ -727,7 +730,7 @@ requeue(AckTags, MsgPropsFun, State) -> (MsgPropsFun(MsgProps)) #message_properties { needs_confirming = false } end, - a(reduce_memory_use( + {MsgIds, State1} = ack(fun (_, _, _) -> ok end, fun (#msg_status { msg = Msg, msg_props = MsgProps }, State1) -> {_SeqId, State2} = publish(Msg, MsgPropsFun1(MsgProps), @@ -742,7 +745,8 @@ requeue(AckTags, MsgPropsFun, State) -> true, true, State2), State3 end, - AckTags, State))). + AckTags, State), + {MsgIds, a(reduce_memory_use(State1))}. len(#vqstate { len = Len }) -> Len. @@ -880,6 +884,13 @@ status(#vqstate { {avg_ack_ingress_rate, AvgAckIngressRate}, {avg_ack_egress_rate , AvgAckEgressRate} ]. +invoke(?MODULE, Fun, State) -> + Fun(?MODULE, State). + +is_duplicate(_Txn, _Msg, State) -> {false, State}. + +discard(_Msg, _ChPid, State) -> State. + %%---------------------------------------------------------------------------- %% Minor helpers %%---------------------------------------------------------------------------- @@ -954,8 +965,8 @@ msg_store_client_init(MsgStore, MsgOnDiskFun, Callback) -> msg_store_client_init(MsgStore, Ref, MsgOnDiskFun, Callback) -> CloseFDsFun = msg_store_close_fds_fun(MsgStore =:= ?PERSISTENT_MSG_STORE), - rabbit_msg_store:client_init( - MsgStore, Ref, MsgOnDiskFun, fun () -> Callback(CloseFDsFun) end). + rabbit_msg_store:client_init(MsgStore, Ref, MsgOnDiskFun, + fun () -> Callback(?MODULE, CloseFDsFun) end). 
msg_store_write(MSCState, IsPersistent, MsgId, Msg) -> with_immutable_msg_store_state( @@ -983,7 +994,7 @@ msg_store_close_fds(MSCState, IsPersistent) -> fun (MSCState1) -> rabbit_msg_store:close_all_indicated(MSCState1) end). msg_store_close_fds_fun(IsPersistent) -> - fun (State = #vqstate { msg_store_clients = MSCState }) -> + fun (?MODULE, State = #vqstate { msg_store_clients = MSCState }) -> {ok, MSCState1} = msg_store_close_fds(MSCState, IsPersistent), State #vqstate { msg_store_clients = MSCState1 } end. @@ -1129,7 +1140,8 @@ blank_rate(Timestamp, IngressLength) -> msg_store_callback(PersistentMsgIds, Pubs, AckTags, Fun, MsgPropsFun, AsyncCallback, SyncCallback) -> - case SyncCallback(fun (StateN) -> + case SyncCallback(?MODULE, + fun (?MODULE, StateN) -> tx_commit_post_msg_store(true, Pubs, AckTags, Fun, MsgPropsFun, StateN) end) of @@ -1192,20 +1204,21 @@ tx_commit_index(State = #vqstate { on_sync = #sync { Acks = lists:append(SAcks), Pubs = [{Msg, Fun(MsgProps)} || {Fun, PubsN} <- lists:reverse(SPubs), {Msg, MsgProps} <- lists:reverse(PubsN)], - {SeqIds, State1 = #vqstate { index_state = IndexState }} = + {_MsgIds, State1} = ack(Acks, State), + {SeqIds, State2 = #vqstate { index_state = IndexState }} = lists:foldl( fun ({Msg = #basic_message { is_persistent = IsPersistent }, MsgProps}, - {SeqIdsAcc, State2}) -> + {SeqIdsAcc, State3}) -> IsPersistent1 = IsDurable andalso IsPersistent, - {SeqId, State3} = - publish(Msg, MsgProps, false, IsPersistent1, State2), - {cons_if(IsPersistent1, SeqId, SeqIdsAcc), State3} - end, {PAcks, ack(Acks, State)}, Pubs), + {SeqId, State4} = + publish(Msg, MsgProps, false, IsPersistent1, State3), + {cons_if(IsPersistent1, SeqId, SeqIdsAcc), State4} + end, {PAcks, State1}, Pubs), IndexState1 = rabbit_queue_index:sync(SeqIds, IndexState), [ Fun() || Fun <- lists:reverse(SFuns) ], reduce_memory_use( - State1 #vqstate { index_state = IndexState1, on_sync = ?BLANK_SYNC }). 
+ State2 #vqstate { index_state = IndexState1, on_sync = ?BLANK_SYNC }). purge_betas_and_deltas(LensByStore, State = #vqstate { q3 = Q3, @@ -1352,7 +1365,7 @@ remove_pending_ack(KeepPersistent, State = #vqstate { pending_ack = PA, index_state = IndexState, msg_store_clients = MSCState }) -> - {PersistentSeqIds, MsgIdsByStore} = + {PersistentSeqIds, MsgIdsByStore, _AllMsgIds} = dict:fold(fun accumulate_ack/3, accumulate_ack_init(), PA), State1 = State #vqstate { pending_ack = dict:new(), ram_ack_index = gb_trees:empty() }, @@ -1371,9 +1384,9 @@ remove_pending_ack(KeepPersistent, end. ack(_MsgStoreFun, _Fun, [], State) -> - State; + {[], State}; ack(MsgStoreFun, Fun, AckTags, State) -> - {{PersistentSeqIds, MsgIdsByStore}, + {{PersistentSeqIds, MsgIdsByStore, AllMsgIds}, State1 = #vqstate { index_state = IndexState, msg_store_clients = MSCState, persistent_count = PCount, @@ -1393,21 +1406,24 @@ ack(MsgStoreFun, Fun, AckTags, State) -> || {IsPersistent, MsgIds} <- orddict:to_list(MsgIdsByStore)], PCount1 = PCount - find_persistent_count(sum_msg_ids_by_store_to_len( orddict:new(), MsgIdsByStore)), - State1 #vqstate { index_state = IndexState1, - persistent_count = PCount1, - ack_out_counter = AckOutCount + length(AckTags) }. + {lists:reverse(AllMsgIds), + State1 #vqstate { index_state = IndexState1, + persistent_count = PCount1, + ack_out_counter = AckOutCount + length(AckTags) }}. -accumulate_ack_init() -> {[], orddict:new()}. +accumulate_ack_init() -> {[], orddict:new(), []}. 
accumulate_ack(_SeqId, #msg_status { is_persistent = false, %% ASSERTIONS msg_on_disk = false, - index_on_disk = false }, - {PersistentSeqIdsAcc, MsgIdsByStore}) -> - {PersistentSeqIdsAcc, MsgIdsByStore}; + index_on_disk = false, + msg_id = MsgId }, + {PersistentSeqIdsAcc, MsgIdsByStore, AllMsgIds}) -> + {PersistentSeqIdsAcc, MsgIdsByStore, [MsgId | AllMsgIds]}; accumulate_ack(SeqId, {IsPersistent, MsgId, _MsgProps}, - {PersistentSeqIdsAcc, MsgIdsByStore}) -> + {PersistentSeqIdsAcc, MsgIdsByStore, AllMsgIds}) -> {cons_if(IsPersistent, SeqId, PersistentSeqIdsAcc), - rabbit_misc:orddict_cons(IsPersistent, MsgId, MsgIdsByStore)}. + rabbit_misc:orddict_cons(IsPersistent, MsgId, MsgIdsByStore), + [MsgId | AllMsgIds]}. find_persistent_count(LensByStore) -> case orddict:find(true, LensByStore) of @@ -1451,14 +1467,16 @@ needs_index_sync(#vqstate { msg_indices_on_disk = MIOD, not (gb_sets:is_empty(UC) orelse gb_sets:is_subset(UC, MIOD)). blind_confirm(Callback, MsgIdSet) -> - Callback(fun (State) -> record_confirms(MsgIdSet, State) end). + Callback(?MODULE, + fun (?MODULE, State) -> record_confirms(MsgIdSet, State) end). msgs_written_to_disk(Callback, MsgIdSet, removed) -> blind_confirm(Callback, MsgIdSet); msgs_written_to_disk(Callback, MsgIdSet, written) -> - Callback(fun (State = #vqstate { msgs_on_disk = MOD, - msg_indices_on_disk = MIOD, - unconfirmed = UC }) -> + Callback(?MODULE, + fun (?MODULE, State = #vqstate { msgs_on_disk = MOD, + msg_indices_on_disk = MIOD, + unconfirmed = UC }) -> Confirmed = gb_sets:intersection(UC, MsgIdSet), record_confirms(gb_sets:intersection(MsgIdSet, MIOD), State #vqstate { @@ -1467,9 +1485,10 @@ msgs_written_to_disk(Callback, MsgIdSet, written) -> end). 
msg_indices_written_to_disk(Callback, MsgIdSet) -> - Callback(fun (State = #vqstate { msgs_on_disk = MOD, - msg_indices_on_disk = MIOD, - unconfirmed = UC }) -> + Callback(?MODULE, + fun (?MODULE, State = #vqstate { msgs_on_disk = MOD, + msg_indices_on_disk = MIOD, + unconfirmed = UC }) -> Confirmed = gb_sets:intersection(UC, MsgIdSet), record_confirms(gb_sets:intersection(MsgIdSet, MOD), State #vqstate { |
