diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue.erl | 47 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 72 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_exchange.erl | 23 | ||||
| -rw-r--r-- | src/rabbit_misc.erl | 36 | ||||
| -rw-r--r-- | src/rabbit_mnesia.erl | 15 | ||||
| -rw-r--r-- | src/rabbit_queue_index.erl | 21 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 4 | ||||
| -rw-r--r-- | src/supervisor2.erl | 115 | ||||
| -rw-r--r-- | src/test_sup.erl | 94 |
11 files changed, 346 insertions, 89 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index df9474439f..6bf2f6db28 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -35,7 +35,7 @@ -export([internal_declare/2, internal_delete/1, maybe_run_queue_via_backing_queue/2, update_ram_duration/1, set_ram_duration_target/2, - set_maximum_since_use/2]). + set_maximum_since_use/2, maybe_expire/1]). -export([pseudo_queue/2]). -export([lookup/1, with/2, with_or_die/2, assert_equivalence/5, check_exclusive_access/2, with_exclusive_access_or_die/3, @@ -55,6 +55,8 @@ -include("rabbit.hrl"). -include_lib("stdlib/include/qlc.hrl"). +-define(EXPIRES_TYPE, long). + %%---------------------------------------------------------------------------- -ifdef(use_specs). @@ -83,8 +85,8 @@ -spec(with_or_die/2 :: (name(), qfun(A)) -> A). -spec(assert_equivalence/5 :: (rabbit_types:amqqueue(), boolean(), boolean(), - rabbit_framing:amqp_table(), rabbit_types:maybe(pid)) - -> ok). + rabbit_framing:amqp_table(), rabbit_types:maybe(pid())) + -> 'ok' | no_return()). -spec(check_exclusive_access/2 :: (rabbit_types:amqqueue(), pid()) -> 'ok'). -spec(with_exclusive_access_or_die/3 :: (name(), pid(), qfun(A)) -> A). -spec(list/1 :: (rabbit_types:vhost()) -> [rabbit_types:amqqueue()]). @@ -146,6 +148,7 @@ -spec(update_ram_duration/1 :: (pid()) -> 'ok'). -spec(set_ram_duration_target/2 :: (pid(), number() | 'infinity') -> 'ok'). -spec(set_maximum_since_use/2 :: (pid(), non_neg_integer()) -> 'ok'). +-spec(maybe_expire/1 :: (pid()) -> 'ok'). -spec(on_node_down/1 :: (node()) -> 'ok'). -spec(pseudo_queue/2 :: (binary(), pid()) -> rabbit_types:amqqueue()). @@ -186,6 +189,7 @@ recover_durable_queues(DurableQueues) -> [Q || Q <- Qs, gen_server2:call(Q#amqqueue.pid, {init, true}) == Q]. declare(QueueName, Durable, AutoDelete, Args, Owner) -> + ok = check_declare_arguments(QueueName, Args), Q = start_queue_process(#amqqueue{name = QueueName, durable = Durable, auto_delete = AutoDelete, @@ -253,11 +257,13 @@ with(Name, F) -> with_or_die(Name, F) -> with(Name, F, fun () -> rabbit_misc:not_found(Name) end). -assert_equivalence(#amqqueue{durable = Durable, auto_delete = AutoDelete} = Q, - Durable, AutoDelete, _Args, Owner) -> +assert_equivalence(#amqqueue{durable = Durable, + auto_delete = AutoDelete} = Q, + Durable, AutoDelete, RequiredArgs, Owner) -> + assert_args_equivalence(Q, RequiredArgs), check_exclusive_access(Q, Owner, strict); assert_equivalence(#amqqueue{name = QueueName}, - _Durable, _AutoDelete, _Args, _Owner) -> + _Durable, _AutoDelete, _RequiredArgs, _Owner) -> rabbit_misc:protocol_error( not_allowed, "parameters for ~s not equivalent", [rabbit_misc:rs(QueueName)]). @@ -278,6 +284,32 @@ with_exclusive_access_or_die(Name, ReaderPid, F) -> with_or_die(Name, fun (Q) -> check_exclusive_access(Q, ReaderPid), F(Q) end). +assert_args_equivalence(#amqqueue{name = QueueName, arguments = Args}, + RequiredArgs) -> + rabbit_misc:assert_args_equivalence(Args, RequiredArgs, QueueName, + [<<"x-expires">>]). + +check_declare_arguments(QueueName, Args) -> + [case Fun(rabbit_misc:table_lookup(Args, Key)) of + ok -> ok; + {error, Error} -> rabbit_misc:protocol_error( + precondition_failed, + "Invalid arguments in declaration of queue ~s: " + "~w (on argument: ~w)", + [rabbit_misc:rs(QueueName), Error, Key]) + end || {Key, Fun} <- [{<<"x-expires">>, fun check_expires_argument/1}]], + ok. + +check_expires_argument(undefined) -> + ok; +check_expires_argument({?EXPIRES_TYPE, Expires}) + when is_integer(Expires) andalso Expires > 0 -> + ok; +check_expires_argument({?EXPIRES_TYPE, _Expires}) -> + {error, expires_zero_or_less}; +check_expires_argument(_) -> + {error, expires_not_of_type_long}. + list(VHostPath) -> mnesia:dirty_match_object( rabbit_queue, @@ -418,6 +450,9 @@ set_ram_duration_target(QPid, Duration) -> set_maximum_since_use(QPid, Age) -> gen_server2:pcast(QPid, 8, {set_maximum_since_use, Age}). +maybe_expire(QPid) -> + gen_server2:pcast(QPid, 8, maybe_expire). + on_node_down(Node) -> [Hook() || Hook <- rabbit_misc:execute_mnesia_transaction( diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 468a41b206..67f0fcf5ae 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -56,8 +56,10 @@ backing_queue_state, active_consumers, blocked_consumers, + expires, sync_timer_ref, - rate_timer_ref + rate_timer_ref, + expiry_timer_ref }). -record(consumer, {tag, ack_required}). @@ -102,15 +104,17 @@ init(Q) -> process_flag(trap_exit, true), {ok, BQ} = application:get_env(backing_queue_module), - {ok, #q{q = Q#amqqueue{pid = self()}, - exclusive_consumer = none, - has_had_consumers = false, - backing_queue = BQ, + {ok, #q{q = Q#amqqueue{pid = self()}, + exclusive_consumer = none, + has_had_consumers = false, + backing_queue = BQ, backing_queue_state = undefined, - active_consumers = queue:new(), - blocked_consumers = queue:new(), - sync_timer_ref = undefined, - rate_timer_ref = undefined}, hibernate, + active_consumers = queue:new(), + blocked_consumers = queue:new(), + expires = undefined, + sync_timer_ref = undefined, + rate_timer_ref = undefined, + expiry_timer_ref = undefined}, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. terminate(shutdown, State = #q{backing_queue = BQ}) -> @@ -132,6 +136,12 @@ code_change(_OldVsn, State, _Extra) -> %%---------------------------------------------------------------------------- +init_expires(State = #q{q = #amqqueue{arguments = Arguments}}) -> + case rabbit_misc:table_lookup(Arguments, <<"x-expires">>) of + {long, Expires} -> ensure_expiry_timer(State#q{expires = Expires}); + undefined -> State + end. + declare(Recover, From, State = #q{q = Q = #amqqueue{name = QName, durable = IsDurable}, backing_queue = BQ, backing_queue_state = undefined}) -> @@ -145,7 +155,7 @@ declare(Recover, From, self(), {rabbit_amqqueue, set_ram_duration_target, [self()]}), BQS = BQ:init(QName, IsDurable, Recover), - noreply(State#q{backing_queue_state = BQS}); + noreply(init_expires(State#q{backing_queue_state = BQS})); Q1 -> {stop, normal, {existing, Q1}, State} end. @@ -218,6 +228,27 @@ stop_rate_timer(State = #q{rate_timer_ref = TRef}) -> {ok, cancel} = timer:cancel(TRef), State#q{rate_timer_ref = undefined}. +stop_expiry_timer(State = #q{expiry_timer_ref = undefined}) -> + State; +stop_expiry_timer(State = #q{expiry_timer_ref = TRef}) -> + {ok, cancel} = timer:cancel(TRef), + State#q{expiry_timer_ref = undefined}. + +%% We only wish to expire where there are no consumers *and* when +%% basic.get hasn't been called for the configured period. +ensure_expiry_timer(State = #q{expires = undefined}) -> + State; +ensure_expiry_timer(State = #q{expires = Expires}) -> + case is_unused(State) of + true -> + NewState = stop_expiry_timer(State), + {ok, TRef} = timer:apply_after( + Expires, rabbit_amqqueue, maybe_expire, [self()]), + NewState#q{expiry_timer_ref = TRef}; + false -> + State + end. + assert_invariant(#q{active_consumers = AC, backing_queue = BQ, backing_queue_state = BQS}) -> true = (queue:is_empty(AC) orelse BQ:is_empty(BQS)). @@ -439,7 +470,8 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder}) -> _ -> rollback_transaction(Txn, ChPid, State1) end, - {ok, requeue_and_run(sets:to_list(ChAckTags), State2)} + {ok, requeue_and_run(sets:to_list(ChAckTags), + ensure_expiry_timer(State2))} end end. @@ -610,8 +642,9 @@ handle_call({basic_get, ChPid, NoAck}, _From, State = #q{q = #amqqueue{name = QName}, backing_queue_state = BQS, backing_queue = BQ}) -> AckRequired = not NoAck, + State1 = ensure_expiry_timer(State), case BQ:fetch(AckRequired, BQS) of - {empty, BQS1} -> reply(empty, State#q{backing_queue_state = BQS1}); + {empty, BQS1} -> reply(empty, State1#q{backing_queue_state = BQS1}); {{Message, IsDelivered, AckTag, Remaining}, BQS1} -> case AckRequired of true -> C = #cr{acktags = ChAckTags} = ch_record(ChPid), @@ -620,7 +653,7 @@ handle_call({basic_get, ChPid, NoAck}, _From, false -> ok end, Msg = {QName, self(), AckTag, IsDelivered, Message}, - reply({ok, Remaining, Msg}, State#q{backing_queue_state = BQS1}) + reply({ok, Remaining, Msg}, State1#q{backing_queue_state = BQS1}) end; handle_call({basic_consume, NoAck, ChPid, LimiterPid, @@ -687,7 +720,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From, ChPid, ConsumerTag, State#q.blocked_consumers)}, case should_auto_delete(NewState) of - false -> reply(ok, NewState); + false -> reply(ok, ensure_expiry_timer(NewState)); true -> {stop, normal, ok, NewState} end end; @@ -747,7 +780,7 @@ handle_cast({ack, Txn, AckTags, ChPid}, _ -> {C#cr{txn = Txn}, BQ:tx_ack(Txn, AckTags, BQS)} end, store_ch_record(C1), - noreply(State #q { backing_queue_state = BQS1 }) + noreply(State#q{backing_queue_state = BQS1}) end; handle_cast({rollback, Txn, ChPid}, State) -> @@ -801,7 +834,14 @@ handle_cast({set_ram_duration_target, Duration}, handle_cast({set_maximum_since_use, Age}, State) -> ok = file_handle_cache:set_maximum_since_use(Age), - noreply(State). + noreply(State); + +handle_cast(maybe_expire, State) -> + case is_unused(State) of + true -> ?LOGDEBUG("Queue lease expired for ~p~n", [State#q.q]), + {stop, normal, State}; + false -> noreply(ensure_expiry_timer(State)) + end. handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State = #q{q = #amqqueue{exclusive_owner = DownPid}}) -> diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 5757d9f369..373cb6907d 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -986,7 +986,7 @@ collect_acks(ToAcc, PrefixAcc, Q, DeliveryTag, Multiple) -> end; {empty, _} -> rabbit_misc:protocol_error( - not_found, "unknown delivery tag ~w", [DeliveryTag]) + precondition_failed, "unknown delivery tag ~w", [DeliveryTag]) end. add_tx_participants(MoreP, State = #ch{tx_participants = Participants}) -> diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index f04341628c..7f7622b255 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -75,9 +75,10 @@ -spec(assert_equivalence/5 :: (rabbit_types:exchange(), atom(), boolean(), boolean(), rabbit_framing:amqp_table()) - -> 'ok'). + -> 'ok' | no_return()). -spec(assert_args_equivalence/2 :: - (rabbit_types:exchange(), rabbit_framing:amqp_table()) -> 'ok'). + (rabbit_types:exchange(), rabbit_framing:amqp_table()) -> + 'ok' | no_return()). -spec(lookup/1 :: (name()) -> rabbit_types:ok(rabbit_types:exchange()) | rabbit_types:error('not_found')). @@ -217,9 +218,8 @@ check_type(TypeBin) -> assert_equivalence(X = #exchange{ durable = Durable, auto_delete = AutoDelete, type = Type}, - Type, Durable, AutoDelete, - RequiredArgs) -> - ok = (type_to_module(Type)):assert_args_equivalence(X, RequiredArgs); + Type, Durable, AutoDelete, RequiredArgs) -> + (type_to_module(Type)):assert_args_equivalence(X, RequiredArgs); assert_equivalence(#exchange{ name = Name }, _Type, _Durable, _AutoDelete, _Args) -> rabbit_misc:protocol_error( @@ -227,23 +227,14 @@ assert_equivalence(#exchange{ name = Name }, _Type, _Durable, _AutoDelete, "cannot redeclare ~s with different type, durable or autodelete value", [rabbit_misc:rs(Name)]). -alternate_exchange_value(Args) -> - lists:keysearch(<<"alternate-exchange">>, 1, Args). - assert_args_equivalence(#exchange{ name = Name, arguments = Args }, RequiredArgs) -> %% The spec says "Arguments are compared for semantic %% equivalence". The only arg we care about is %% "alternate-exchange". - Ae1 = alternate_exchange_value(RequiredArgs), - Ae2 = alternate_exchange_value(Args), - if Ae1==Ae2 -> ok; - true -> rabbit_misc:protocol_error( - not_allowed, - "cannot redeclare ~s with inequivalent args", - [rabbit_misc:rs(Name)]) - end. + rabbit_misc:assert_args_equivalence(Args, RequiredArgs, Name, + [<<"alternate-exchange">>]). lookup(Name) -> rabbit_misc:dirty_read({rabbit_exchange, Name}). diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index a0a5ba589d..050b499f0f 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -38,9 +38,10 @@ -export([method_record_type/1, polite_pause/0, polite_pause/1]). -export([die/1, frame_error/2, amqp_error/4, protocol_error/3, protocol_error/4, protocol_error/1]). --export([not_found/1]). +-export([not_found/1, assert_args_equivalence/4]). -export([get_config/1, get_config/2, set_config/2]). -export([dirty_read/1]). +-export([table_lookup/2]). -export([r/3, r/2, r_arg/4, rs/1]). -export([enable_cover/0, report_cover/0]). -export([enable_cover/1, report_cover/1]). @@ -98,12 +99,19 @@ -> no_return()). -spec(protocol_error/1 :: (rabbit_types:amqp_error()) -> no_return()). -spec(not_found/1 :: (rabbit_types:r(atom())) -> no_return()). +-spec(assert_args_equivalence/4 :: (rabbit_framing:amqp_table(), + rabbit_framing:amqp_table(), + rabbit_types:r(any()), [binary()]) -> + 'ok' | no_return()). -spec(get_config/1 :: (atom()) -> rabbit_types:ok_or_error2(any(), 'not_found')). -spec(get_config/2 :: (atom(), A) -> A). -spec(set_config/2 :: (atom(), any()) -> 'ok'). -spec(dirty_read/1 :: ({atom(), any()}) -> rabbit_types:ok_or_error2(any(), 'not_found')). +-spec(table_lookup/2 :: + (rabbit_framing:amqp_table(), binary()) + -> 'undefined' | {rabbit_framing:amqp_field_type(), any()}). -spec(r/2 :: (rabbit_types:vhost(), K) -> rabbit_types:r3(rabbit_types:vhost(), K, '_') when is_subtype(K, atom())). @@ -211,6 +219,20 @@ protocol_error(#amqp_error{} = Error) -> not_found(R) -> protocol_error(not_found, "no ~s", [rs(R)]). +assert_args_equivalence(Orig, New, Name, Keys) -> + [assert_args_equivalence1(Orig, New, Name, Key) || Key <- Keys], + ok. + +assert_args_equivalence1(Orig, New, Name, Key) -> + case {table_lookup(Orig, Key), table_lookup(New, Key)} of + {Same, Same} -> ok; + {Orig1, New1} -> protocol_error( + not_allowed, + "cannot redeclare ~s with inequivalent args for ~s: " + "required ~w, received ~w", + [rabbit_misc:rs(Name), Key, New1, Orig1]) + end. + get_config(Key) -> case dirty_read({rabbit_config, Key}) of {ok, {rabbit_config, Key, V}} -> {ok, V}; @@ -232,6 +254,12 @@ dirty_read(ReadSpec) -> [] -> {error, not_found} end. +table_lookup(Table, Key) -> + case lists:keysearch(Key, 1, Table) of + {value, {_, TypeBin, ValueBin}} -> {TypeBin, ValueBin}; + false -> undefined + end. + r(#resource{virtual_host = VHostPath}, Kind, Name) when is_binary(Name) -> #resource{virtual_host = VHostPath, kind = Kind, name = Name}; @@ -244,9 +272,9 @@ r(VHostPath, Kind) when is_binary(VHostPath) -> r_arg(#resource{virtual_host = VHostPath}, Kind, Table, Key) -> r_arg(VHostPath, Kind, Table, Key); r_arg(VHostPath, Kind, Table, Key) -> - case lists:keysearch(Key, 1, Table) of - {value, {_, longstr, NameBin}} -> r(VHostPath, Kind, NameBin); - false -> undefined + case table_lookup(Table, Key) of + {longstr, NameBin} -> r(VHostPath, Kind, NameBin); + undefined -> undefined end. rs(#resource{virtual_host = VHostPath, kind = Kind, name = Name}) -> diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index 689f799de9..c808499b0d 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -264,20 +264,9 @@ read_cluster_nodes_config() -> case rabbit_misc:read_term_file(FileName) of {ok, [ClusterNodes]} -> ClusterNodes; {error, enoent} -> - case application:get_env(cluster_config) of + case application:get_env(cluster_nodes) of undefined -> []; - {ok, DefaultFileName} -> - case file:consult(DefaultFileName) of - {ok, [ClusterNodes]} -> ClusterNodes; - {error, enoent} -> - error_logger:warning_msg( - "default cluster config file ~p does not exist~n", - [DefaultFileName]), - []; - {error, Reason} -> - throw({error, {cannot_read_cluster_nodes_config, - DefaultFileName, Reason}}) - end + {ok, ClusterNodes} -> ClusterNodes end; {error, Reason} -> throw({error, {cannot_read_cluster_nodes_config, diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 91b19976c6..d6b8bb2889 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -31,7 +31,7 @@ -module(rabbit_queue_index). --export([init/3, terminate/2, delete_and_terminate/1, publish/4, +-export([init/4, terminate/2, delete_and_terminate/1, publish/4, deliver/2, ack/2, sync/2, flush/1, read/3, next_segment_boundary/1, bounds/1, recover/1]). @@ -193,7 +193,7 @@ {(fun ((A) -> 'finished' | {rabbit_guid:guid(), non_neg_integer(), A})), A}). --spec(init/3 :: (rabbit_amqqueue:name(), boolean(), +-spec(init/4 :: (rabbit_amqqueue:name(), boolean(), boolean(), fun ((rabbit_guid:guid()) -> boolean())) -> {'undefined' | non_neg_integer(), [any()], qistate()}). -spec(terminate/2 :: ([any()], qistate()) -> qistate()). @@ -220,8 +220,8 @@ %% public API %%---------------------------------------------------------------------------- -init(Name, MsgStoreRecovered, ContainsCheckFun) -> - State = #qistate { dir = Dir } = blank_state(Name), +init(Name, Recover, MsgStoreRecovered, ContainsCheckFun) -> + State = #qistate { dir = Dir } = blank_state(Name, not Recover), Terms = case read_shutdown_terms(Dir) of {error, _} -> []; {ok, Terms1} -> Terms1 @@ -356,9 +356,14 @@ recover(DurableQueues) -> %% startup and shutdown %%---------------------------------------------------------------------------- -blank_state(QueueName) -> +blank_state(QueueName, EnsureFresh) -> StrName = queue_name_to_dir_name(QueueName), Dir = filename:join(queues_dir(), StrName), + ok = case EnsureFresh of + true -> false = filelib:is_file(Dir), %% is_file == is file or dir + ok; + false -> ok + end, ok = filelib:ensure_dir(filename:join(Dir, "nothing")), {ok, MaxJournal} = application:get_env(rabbit, queue_index_max_journal_entries), @@ -463,9 +468,7 @@ recover_message(false, _, no_del, RelSeq, Segment) -> add_to_journal(RelSeq, ack, add_to_journal(RelSeq, del, Segment)). queue_name_to_dir_name(Name = #resource { kind = queue }) -> - Bin = term_to_binary(Name), - Size = 8*size(Bin), - <<Num:Size>> = Bin, + <<Num:128>> = erlang:md5(term_to_binary(Name)), lists:flatten(io_lib:format("~.36B", [Num])). queues_dir() -> @@ -497,7 +500,7 @@ queue_index_walker({next, Gatherer}) when is_pid(Gatherer) -> queue_index_walker_reader(QueueName, Gatherer) -> State = #qistate { segments = Segments, dir = Dir } = - recover_journal(blank_state(QueueName)), + recover_journal(blank_state(QueueName, false)), [ok = segment_entries_foldr( fun (_RelSeq, {{Guid, true}, _IsDelivered, no_ack}, ok) -> gatherer:in(Gatherer, {Guid, 1}); diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 9acc58c946..b46df02de7 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -60,6 +60,7 @@ all_tests() -> passed = test_bpqueue(), passed = test_pg_local(), passed = test_unfold(), + passed = test_supervisor_delayed_restart(), passed = test_parsing(), passed = test_content_framing(), passed = test_topic_matching(), @@ -1344,6 +1345,9 @@ bad_handle_hook(_, _, _) -> extra_arg_hook(Hookname, Handler, Args, Extra1, Extra2) -> handle_hook(Hookname, Handler, {Args, Extra1, Extra2}). +test_supervisor_delayed_restart() -> + test_sup:test_supervisor_delayed_restart(). + test_backing_queue() -> case application:get_env(rabbit, backing_queue_module) of {ok, rabbit_variable_queue} -> @@ -1550,7 +1554,7 @@ test_queue() -> init_test_queue() -> rabbit_queue_index:init( - test_queue(), false, + test_queue(), true, false, fun (Guid) -> rabbit_msg_store:contains(?PERSISTENT_MSG_STORE, Guid) end). diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 92ffc511e0..0f52eee84f 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -368,10 +368,10 @@ stop_msg_store() -> ok = rabbit_sup:stop_child(?PERSISTENT_MSG_STORE), ok = rabbit_sup:stop_child(?TRANSIENT_MSG_STORE). -init(QueueName, IsDurable, _Recover) -> +init(QueueName, IsDurable, Recover) -> {DeltaCount, Terms, IndexState} = rabbit_queue_index:init( - QueueName, + QueueName, Recover, rabbit_msg_store:successfully_recovered_state(?PERSISTENT_MSG_STORE), fun (Guid) -> rabbit_msg_store:contains(?PERSISTENT_MSG_STORE, Guid) diff --git a/src/supervisor2.erl b/src/supervisor2.erl index 55f16feeb1..d716108cd0 100644 --- a/src/supervisor2.erl +++ b/src/supervisor2.erl @@ -4,15 +4,38 @@ %% 1) the module name is supervisor2 %% %% 2) there is a new strategy called -%% simple_one_for_one_terminate. This is exactly the same as for -%% simple_one_for_one, except that children *are* explicitly -%% terminated as per the shutdown component of the child_spec. +%% simple_one_for_one_terminate. This is exactly the same as for +%% simple_one_for_one, except that children *are* explicitly +%% terminated as per the shutdown component of the child_spec. %% -%% 3) When the MaxR (intensity) == 0, errors that would otherwise be -%% reported concerning child death, or the reaching of max restart -%% intensity are elided. +%% 3) child specifications can contain, as the restart type, a tuple +%% {permanent, Delay} | {transient, Delay} where Delay >= 0. The +%% delay, in seconds, indicates what should happen if a child, upon +%% being restarted, exceeds the MaxT and MaxR parameters. Thus, if +%% a child exits, it is restarted as normal. If it exits +%% sufficiently quickly and often to exceed the boundaries set by +%% the MaxT and MaxR parameters, and a Delay is specified, then +%% rather than stopping the supervisor, the supervisor instead +%% continues and tries to start up the child again, Delay seconds +%% later. %% -%% All modifications are (C) 2010 LShift Ltd. +%% Note that you can never restart more frequently than the MaxT +%% and MaxR parameters allow: i.e. you must wait until *both* the +%% Delay has passed *and* the MaxT and MaxR parameters allow the +%% child to be restarted. +%% +%% Also note that the Delay is a *minimum*. There is no guarantee +%% that the child will be restarted within that time, especially if +%% other processes are dying and being restarted at the same time - +%% essentially we have to wait for the delay to have passed and for +%% the MaxT and MaxR parameters to permit the child to be +%% restarted. This may require waiting for longer than Delay. +%% +%% 4) When the MaxR (intensity) == 0, errors that would otherwise be +%% reported concerning child death, or the reaching of max restart +%% intensity are elided. +%% +%% All modifications are (C) 2010 Rabbit Technologies Ltd. %% %% %CopyrightBegin% %% @@ -47,6 +70,7 @@ %% Internal exports -export([init/1, handle_call/3, handle_info/2, terminate/2, code_change/3]). -export([handle_cast/2]). +-export([delayed_restart/2]). -define(DICT, dict). @@ -127,6 +151,9 @@ check_childspecs(ChildSpecs) when is_list(ChildSpecs) -> end; check_childspecs(X) -> {error, {badarg, X}}. +delayed_restart(Supervisor, RestartDetails) -> + gen_server:cast(Supervisor, {delayed_restart, RestartDetails}). + %%% --------------------------------------------------- %%% %%% Initialize the supervisor. @@ -322,6 +349,20 @@ handle_call(which_children, _From, State) -> State#state.children), {reply, Resp, State}. +handle_cast({delayed_restart, {RestartType, Reason, Child}}, State) + when ?is_simple(State) -> + {ok, NState} = do_restart(RestartType, Reason, Child, State), + {noreply, NState}; +handle_cast({delayed_restart, {RestartType, Reason, Child}}, State) + when not (?is_simple(State)) -> + case get_child(Child#child.name, State) of + {value, Child} -> + {ok, NState} = do_restart(RestartType, Reason, Child, State), + {noreply, NState}; + _ -> + {noreply, State} + end; + %%% Hopefully cause a function-clause as there is no API function %%% that utilizes cast. handle_cast(null, State) -> @@ -487,6 +528,16 @@ restart_child(Pid, Reason, State) -> {ok, State} end. +do_restart({RestartType, Delay}, Reason, Child, State) -> + case restart1(Child, State) of + {ok, NState} -> + {ok, NState}; + {terminate, NState} -> + {ok, _TRef} = timer:apply_after( + trunc(Delay*1000), ?MODULE, delayed_restart, + [self(), {{RestartType, Delay}, Reason, Child}]), + {ok, NState} + end; do_restart(permanent, Reason, Child, State) -> report_error(child_terminated, Reason, Child, State), restart(Child, State); @@ -507,14 +558,27 @@ do_restart(temporary, Reason, Child, State) -> restart(Child, State) -> case add_restart(State) of {ok, NState} -> - restart(NState#state.strategy, Child, NState); + restart(NState#state.strategy, Child, NState, fun restart/2); {terminate, NState} -> report_error(shutdown, reached_max_restart_intensity, Child, State), {shutdown, remove_child(Child, NState)} end. -restart(Strategy, Child, State) +restart1(Child, State) -> + case add_restart(State) of + {ok, NState} -> + restart(NState#state.strategy, Child, NState, fun restart1/2); + {terminate, _NState} -> + %% we've reached the max restart intensity, but the + %% add_restart will have added to the restarts + %% field. Given we don't want to die here, we need to go + %% back to the old restarts field otherwise we'll never + %% attempt to restart later. + {terminate, State} + end. + +restart(Strategy, Child, State, Restart) when Strategy =:= simple_one_for_one orelse Strategy =:= simple_one_for_one_terminate -> #child{mfa = {M, F, A}} = Child, @@ -528,9 +592,9 @@ restart(Strategy, Child, State) {ok, NState}; {error, Error} -> report_error(start_error, Error, Child, State#state.name), - restart(Child, State) + Restart(Child, State) end; -restart(one_for_one, Child, State) -> +restart(one_for_one, Child, State, Restart) -> case do_start_child(State#state.name, Child) of {ok, Pid} -> NState = replace_child(Child#child{pid = Pid}, State), @@ -540,25 +604,25 @@ restart(one_for_one, Child, State) -> {ok, NState}; {error, Reason} -> report_error(start_error, Reason, Child, State#state.name), - restart(Child, State) + Restart(Child, State) end; -restart(rest_for_one, Child, State) -> +restart(rest_for_one, Child, State, Restart) -> {ChAfter, ChBefore} = split_child(Child#child.pid, State#state.children), ChAfter2 = terminate_children(ChAfter, State#state.name), case start_children(ChAfter2, State#state.name) of {ok, ChAfter3} -> {ok, State#state{children = ChAfter3 ++ ChBefore}}; {error, ChAfter3} -> - restart(Child, State#state{children = ChAfter3 ++ ChBefore}) + Restart(Child, State#state{children = ChAfter3 ++ ChBefore}) end; -restart(one_for_all, Child, State) -> +restart(one_for_all, Child, State, Restart) -> Children1 = del_child(Child#child.pid, State#state.children), Children2 = terminate_children(Children1, State#state.name), case start_children(Children2, State#state.name) of {ok, NChs} -> {ok, State#state{children = NChs}}; {error, NChs} -> - restart(Child, State#state{children = NChs}) + Restart(Child, State#state{children = NChs}) end. %%----------------------------------------------------------------- @@ -785,7 +849,9 @@ supname(N,_) -> N. %%% {Name, Func, RestartType, Shutdown, ChildType, Modules} %%% where Name is an atom %%% Func is {Mod, Fun, Args} == {atom, atom, list} -%%% RestartType is permanent | temporary | transient +%%% RestartType is permanent | temporary | transient | +%%% {permanent, Delay} | +%%% {transient, Delay} where Delay >= 0 %%% Shutdown = integer() | infinity | brutal_kill %%% ChildType = supervisor | worker %%% Modules = [atom()] | dynamic @@ -831,10 +897,17 @@ validFunc({M, F, A}) when is_atom(M), is_list(A) -> true; validFunc(Func) -> throw({invalid_mfa, Func}). -validRestartType(permanent) -> true; -validRestartType(temporary) -> true; -validRestartType(transient) -> true; -validRestartType(RestartType) -> throw({invalid_restart_type, RestartType}). +validRestartType(permanent) -> true; +validRestartType(temporary) -> true; +validRestartType(transient) -> true; +validRestartType({permanent, Delay}) -> validDelay(Delay); +validRestartType({transient, Delay}) -> validDelay(Delay); +validRestartType(RestartType) -> throw({invalid_restart_type, + RestartType}). + +validDelay(Delay) when is_number(Delay), + Delay >= 0 -> true; +validDelay(What) -> throw({invalid_delay, What}). validShutdown(Shutdown, _) when is_integer(Shutdown), Shutdown > 0 -> true; diff --git a/src/test_sup.erl b/src/test_sup.erl new file mode 100644 index 0000000000..f41793bc89 --- /dev/null +++ b/src/test_sup.erl @@ -0,0 +1,94 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2010 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2010 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(test_sup). + +-behaviour(supervisor2). + +-export([test_supervisor_delayed_restart/0, + init/1, start_child/0]). + +test_supervisor_delayed_restart() -> + passed = with_sup(simple_one_for_one_terminate, + fun (SupPid) -> + {ok, _ChildPid} = + supervisor2:start_child(SupPid, []), + test_supervisor_delayed_restart(SupPid) + end), + passed = with_sup(one_for_one, fun test_supervisor_delayed_restart/1). + +test_supervisor_delayed_restart(SupPid) -> + ok = ping_child(SupPid), + ok = exit_child(SupPid), + timer:sleep(10), + ok = ping_child(SupPid), + ok = exit_child(SupPid), + timer:sleep(10), + timeout = ping_child(SupPid), + timer:sleep(1010), + ok = ping_child(SupPid), + passed. + +with_sup(RestartStrategy, Fun) -> + {ok, SupPid} = supervisor2:start_link(?MODULE, [RestartStrategy]), + Res = Fun(SupPid), + exit(SupPid, shutdown), + rabbit_misc:unlink_and_capture_exit(SupPid), + Res. + +init([RestartStrategy]) -> + {ok, {{RestartStrategy, 1, 1}, + [{test, {test_sup, start_child, []}, {permanent, 1}, + 16#ffffffff, worker, [test_sup]}]}}. + +start_child() -> + {ok, proc_lib:spawn_link(fun run_child/0)}. + +ping_child(SupPid) -> + Ref = make_ref(), + get_child_pid(SupPid) ! {ping, Ref, self()}, + receive {pong, Ref} -> ok + after 1000 -> timeout + end. + +exit_child(SupPid) -> + true = exit(get_child_pid(SupPid), abnormal), + ok. + +get_child_pid(SupPid) -> + [{_Id, ChildPid, worker, [test_sup]}] = + supervisor2:which_children(SupPid), + ChildPid. + +run_child() -> + receive {ping, Ref, Pid} -> Pid ! {pong, Ref}, + run_child() + end. |
