summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2010-07-29 22:41:26 +0100
committerMatthias Radestock <matthias@rabbitmq.com>2010-07-29 22:41:26 +0100
commitc27fac485669a82f6ecd603d0edc811dd560e965 (patch)
tree231010c8f55cf7258d9186a0898d736b87850b6e /src
parent4f49ec5f4f210359e4397bad0d7aef6fbe1749dc (diff)
parent4be6a49a3c7504681d49722af2b82e9d79e27475 (diff)
downloadrabbitmq-server-git-c27fac485669a82f6ecd603d0edc811dd560e965.tar.gz
merge default into bug15930
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue.erl47
-rw-r--r--src/rabbit_amqqueue_process.erl72
-rw-r--r--src/rabbit_channel.erl2
-rw-r--r--src/rabbit_exchange.erl23
-rw-r--r--src/rabbit_misc.erl36
-rw-r--r--src/rabbit_mnesia.erl15
-rw-r--r--src/rabbit_queue_index.erl21
-rw-r--r--src/rabbit_tests.erl6
-rw-r--r--src/rabbit_variable_queue.erl4
-rw-r--r--src/supervisor2.erl115
-rw-r--r--src/test_sup.erl94
11 files changed, 346 insertions, 89 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index df9474439f..6bf2f6db28 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -35,7 +35,7 @@
-export([internal_declare/2, internal_delete/1,
maybe_run_queue_via_backing_queue/2,
update_ram_duration/1, set_ram_duration_target/2,
- set_maximum_since_use/2]).
+ set_maximum_since_use/2, maybe_expire/1]).
-export([pseudo_queue/2]).
-export([lookup/1, with/2, with_or_die/2, assert_equivalence/5,
check_exclusive_access/2, with_exclusive_access_or_die/3,
@@ -55,6 +55,8 @@
-include("rabbit.hrl").
-include_lib("stdlib/include/qlc.hrl").
+-define(EXPIRES_TYPE, long).
+
%%----------------------------------------------------------------------------
-ifdef(use_specs).
@@ -83,8 +85,8 @@
-spec(with_or_die/2 :: (name(), qfun(A)) -> A).
-spec(assert_equivalence/5 ::
(rabbit_types:amqqueue(), boolean(), boolean(),
- rabbit_framing:amqp_table(), rabbit_types:maybe(pid))
- -> ok).
+ rabbit_framing:amqp_table(), rabbit_types:maybe(pid()))
+ -> 'ok' | no_return()).
-spec(check_exclusive_access/2 :: (rabbit_types:amqqueue(), pid()) -> 'ok').
-spec(with_exclusive_access_or_die/3 :: (name(), pid(), qfun(A)) -> A).
-spec(list/1 :: (rabbit_types:vhost()) -> [rabbit_types:amqqueue()]).
@@ -146,6 +148,7 @@
-spec(update_ram_duration/1 :: (pid()) -> 'ok').
-spec(set_ram_duration_target/2 :: (pid(), number() | 'infinity') -> 'ok').
-spec(set_maximum_since_use/2 :: (pid(), non_neg_integer()) -> 'ok').
+-spec(maybe_expire/1 :: (pid()) -> 'ok').
-spec(on_node_down/1 :: (node()) -> 'ok').
-spec(pseudo_queue/2 :: (binary(), pid()) -> rabbit_types:amqqueue()).
@@ -186,6 +189,7 @@ recover_durable_queues(DurableQueues) ->
[Q || Q <- Qs, gen_server2:call(Q#amqqueue.pid, {init, true}) == Q].
declare(QueueName, Durable, AutoDelete, Args, Owner) ->
+ ok = check_declare_arguments(QueueName, Args),
Q = start_queue_process(#amqqueue{name = QueueName,
durable = Durable,
auto_delete = AutoDelete,
@@ -253,11 +257,13 @@ with(Name, F) ->
with_or_die(Name, F) ->
with(Name, F, fun () -> rabbit_misc:not_found(Name) end).
-assert_equivalence(#amqqueue{durable = Durable, auto_delete = AutoDelete} = Q,
- Durable, AutoDelete, _Args, Owner) ->
+assert_equivalence(#amqqueue{durable = Durable,
+ auto_delete = AutoDelete} = Q,
+ Durable, AutoDelete, RequiredArgs, Owner) ->
+ assert_args_equivalence(Q, RequiredArgs),
check_exclusive_access(Q, Owner, strict);
assert_equivalence(#amqqueue{name = QueueName},
- _Durable, _AutoDelete, _Args, _Owner) ->
+ _Durable, _AutoDelete, _RequiredArgs, _Owner) ->
rabbit_misc:protocol_error(
not_allowed, "parameters for ~s not equivalent",
[rabbit_misc:rs(QueueName)]).
@@ -278,6 +284,32 @@ with_exclusive_access_or_die(Name, ReaderPid, F) ->
with_or_die(Name,
fun (Q) -> check_exclusive_access(Q, ReaderPid), F(Q) end).
+assert_args_equivalence(#amqqueue{name = QueueName, arguments = Args},
+ RequiredArgs) ->
+ rabbit_misc:assert_args_equivalence(Args, RequiredArgs, QueueName,
+ [<<"x-expires">>]).
+
+check_declare_arguments(QueueName, Args) ->
+ [case Fun(rabbit_misc:table_lookup(Args, Key)) of
+ ok -> ok;
+ {error, Error} -> rabbit_misc:protocol_error(
+ precondition_failed,
+ "Invalid arguments in declaration of queue ~s: "
+ "~w (on argument: ~w)",
+ [rabbit_misc:rs(QueueName), Error, Key])
+ end || {Key, Fun} <- [{<<"x-expires">>, fun check_expires_argument/1}]],
+ ok.
+
+check_expires_argument(undefined) ->
+ ok;
+check_expires_argument({?EXPIRES_TYPE, Expires})
+ when is_integer(Expires) andalso Expires > 0 ->
+ ok;
+check_expires_argument({?EXPIRES_TYPE, _Expires}) ->
+ {error, expires_zero_or_less};
+check_expires_argument(_) ->
+ {error, expires_not_of_type_long}.
+
list(VHostPath) ->
mnesia:dirty_match_object(
rabbit_queue,
@@ -418,6 +450,9 @@ set_ram_duration_target(QPid, Duration) ->
set_maximum_since_use(QPid, Age) ->
gen_server2:pcast(QPid, 8, {set_maximum_since_use, Age}).
+maybe_expire(QPid) ->
+ gen_server2:pcast(QPid, 8, maybe_expire).
+
on_node_down(Node) ->
[Hook() ||
Hook <- rabbit_misc:execute_mnesia_transaction(
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 468a41b206..67f0fcf5ae 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -56,8 +56,10 @@
backing_queue_state,
active_consumers,
blocked_consumers,
+ expires,
sync_timer_ref,
- rate_timer_ref
+ rate_timer_ref,
+ expiry_timer_ref
}).
-record(consumer, {tag, ack_required}).
@@ -102,15 +104,17 @@ init(Q) ->
process_flag(trap_exit, true),
{ok, BQ} = application:get_env(backing_queue_module),
- {ok, #q{q = Q#amqqueue{pid = self()},
- exclusive_consumer = none,
- has_had_consumers = false,
- backing_queue = BQ,
+ {ok, #q{q = Q#amqqueue{pid = self()},
+ exclusive_consumer = none,
+ has_had_consumers = false,
+ backing_queue = BQ,
backing_queue_state = undefined,
- active_consumers = queue:new(),
- blocked_consumers = queue:new(),
- sync_timer_ref = undefined,
- rate_timer_ref = undefined}, hibernate,
+ active_consumers = queue:new(),
+ blocked_consumers = queue:new(),
+ expires = undefined,
+ sync_timer_ref = undefined,
+ rate_timer_ref = undefined,
+ expiry_timer_ref = undefined}, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
terminate(shutdown, State = #q{backing_queue = BQ}) ->
@@ -132,6 +136,12 @@ code_change(_OldVsn, State, _Extra) ->
%%----------------------------------------------------------------------------
+init_expires(State = #q{q = #amqqueue{arguments = Arguments}}) ->
+ case rabbit_misc:table_lookup(Arguments, <<"x-expires">>) of
+ {long, Expires} -> ensure_expiry_timer(State#q{expires = Expires});
+ undefined -> State
+ end.
+
declare(Recover, From,
State = #q{q = Q = #amqqueue{name = QName, durable = IsDurable},
backing_queue = BQ, backing_queue_state = undefined}) ->
@@ -145,7 +155,7 @@ declare(Recover, From,
self(), {rabbit_amqqueue,
set_ram_duration_target, [self()]}),
BQS = BQ:init(QName, IsDurable, Recover),
- noreply(State#q{backing_queue_state = BQS});
+ noreply(init_expires(State#q{backing_queue_state = BQS}));
Q1 -> {stop, normal, {existing, Q1}, State}
end.
@@ -218,6 +228,27 @@ stop_rate_timer(State = #q{rate_timer_ref = TRef}) ->
{ok, cancel} = timer:cancel(TRef),
State#q{rate_timer_ref = undefined}.
+stop_expiry_timer(State = #q{expiry_timer_ref = undefined}) ->
+ State;
+stop_expiry_timer(State = #q{expiry_timer_ref = TRef}) ->
+ {ok, cancel} = timer:cancel(TRef),
+ State#q{expiry_timer_ref = undefined}.
+
+%% We only wish to expire where there are no consumers *and* when
+%% basic.get hasn't been called for the configured period.
+ensure_expiry_timer(State = #q{expires = undefined}) ->
+ State;
+ensure_expiry_timer(State = #q{expires = Expires}) ->
+ case is_unused(State) of
+ true ->
+ NewState = stop_expiry_timer(State),
+ {ok, TRef} = timer:apply_after(
+ Expires, rabbit_amqqueue, maybe_expire, [self()]),
+ NewState#q{expiry_timer_ref = TRef};
+ false ->
+ State
+ end.
+
assert_invariant(#q{active_consumers = AC,
backing_queue = BQ, backing_queue_state = BQS}) ->
true = (queue:is_empty(AC) orelse BQ:is_empty(BQS)).
@@ -439,7 +470,8 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder}) ->
_ -> rollback_transaction(Txn, ChPid,
State1)
end,
- {ok, requeue_and_run(sets:to_list(ChAckTags), State2)}
+ {ok, requeue_and_run(sets:to_list(ChAckTags),
+ ensure_expiry_timer(State2))}
end
end.
@@ -610,8 +642,9 @@ handle_call({basic_get, ChPid, NoAck}, _From,
State = #q{q = #amqqueue{name = QName},
backing_queue_state = BQS, backing_queue = BQ}) ->
AckRequired = not NoAck,
+ State1 = ensure_expiry_timer(State),
case BQ:fetch(AckRequired, BQS) of
- {empty, BQS1} -> reply(empty, State#q{backing_queue_state = BQS1});
+ {empty, BQS1} -> reply(empty, State1#q{backing_queue_state = BQS1});
{{Message, IsDelivered, AckTag, Remaining}, BQS1} ->
case AckRequired of
true -> C = #cr{acktags = ChAckTags} = ch_record(ChPid),
@@ -620,7 +653,7 @@ handle_call({basic_get, ChPid, NoAck}, _From,
false -> ok
end,
Msg = {QName, self(), AckTag, IsDelivered, Message},
- reply({ok, Remaining, Msg}, State#q{backing_queue_state = BQS1})
+ reply({ok, Remaining, Msg}, State1#q{backing_queue_state = BQS1})
end;
handle_call({basic_consume, NoAck, ChPid, LimiterPid,
@@ -687,7 +720,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
ChPid, ConsumerTag,
State#q.blocked_consumers)},
case should_auto_delete(NewState) of
- false -> reply(ok, NewState);
+ false -> reply(ok, ensure_expiry_timer(NewState));
true -> {stop, normal, ok, NewState}
end
end;
@@ -747,7 +780,7 @@ handle_cast({ack, Txn, AckTags, ChPid},
_ -> {C#cr{txn = Txn}, BQ:tx_ack(Txn, AckTags, BQS)}
end,
store_ch_record(C1),
- noreply(State #q { backing_queue_state = BQS1 })
+ noreply(State#q{backing_queue_state = BQS1})
end;
handle_cast({rollback, Txn, ChPid}, State) ->
@@ -801,7 +834,14 @@ handle_cast({set_ram_duration_target, Duration},
handle_cast({set_maximum_since_use, Age}, State) ->
ok = file_handle_cache:set_maximum_since_use(Age),
- noreply(State).
+ noreply(State);
+
+handle_cast(maybe_expire, State) ->
+ case is_unused(State) of
+ true -> ?LOGDEBUG("Queue lease expired for ~p~n", [State#q.q]),
+ {stop, normal, State};
+ false -> noreply(ensure_expiry_timer(State))
+ end.
handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason},
State = #q{q = #amqqueue{exclusive_owner = DownPid}}) ->
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 5757d9f369..373cb6907d 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -986,7 +986,7 @@ collect_acks(ToAcc, PrefixAcc, Q, DeliveryTag, Multiple) ->
end;
{empty, _} ->
rabbit_misc:protocol_error(
- not_found, "unknown delivery tag ~w", [DeliveryTag])
+ precondition_failed, "unknown delivery tag ~w", [DeliveryTag])
end.
add_tx_participants(MoreP, State = #ch{tx_participants = Participants}) ->
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index f04341628c..7f7622b255 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -75,9 +75,10 @@
-spec(assert_equivalence/5 ::
(rabbit_types:exchange(), atom(), boolean(), boolean(),
rabbit_framing:amqp_table())
- -> 'ok').
+ -> 'ok' | no_return()).
-spec(assert_args_equivalence/2 ::
- (rabbit_types:exchange(), rabbit_framing:amqp_table()) -> 'ok').
+ (rabbit_types:exchange(), rabbit_framing:amqp_table()) ->
+ 'ok' | no_return()).
-spec(lookup/1 ::
(name()) -> rabbit_types:ok(rabbit_types:exchange()) |
rabbit_types:error('not_found')).
@@ -217,9 +218,8 @@ check_type(TypeBin) ->
assert_equivalence(X = #exchange{ durable = Durable,
auto_delete = AutoDelete,
type = Type},
- Type, Durable, AutoDelete,
- RequiredArgs) ->
- ok = (type_to_module(Type)):assert_args_equivalence(X, RequiredArgs);
+ Type, Durable, AutoDelete, RequiredArgs) ->
+ (type_to_module(Type)):assert_args_equivalence(X, RequiredArgs);
assert_equivalence(#exchange{ name = Name }, _Type, _Durable, _AutoDelete,
_Args) ->
rabbit_misc:protocol_error(
@@ -227,23 +227,14 @@ assert_equivalence(#exchange{ name = Name }, _Type, _Durable, _AutoDelete,
"cannot redeclare ~s with different type, durable or autodelete value",
[rabbit_misc:rs(Name)]).
-alternate_exchange_value(Args) ->
- lists:keysearch(<<"alternate-exchange">>, 1, Args).
-
assert_args_equivalence(#exchange{ name = Name,
arguments = Args },
RequiredArgs) ->
%% The spec says "Arguments are compared for semantic
%% equivalence". The only arg we care about is
%% "alternate-exchange".
- Ae1 = alternate_exchange_value(RequiredArgs),
- Ae2 = alternate_exchange_value(Args),
- if Ae1==Ae2 -> ok;
- true -> rabbit_misc:protocol_error(
- not_allowed,
- "cannot redeclare ~s with inequivalent args",
- [rabbit_misc:rs(Name)])
- end.
+ rabbit_misc:assert_args_equivalence(Args, RequiredArgs, Name,
+ [<<"alternate-exchange">>]).
lookup(Name) ->
rabbit_misc:dirty_read({rabbit_exchange, Name}).
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index a0a5ba589d..050b499f0f 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -38,9 +38,10 @@
-export([method_record_type/1, polite_pause/0, polite_pause/1]).
-export([die/1, frame_error/2, amqp_error/4,
protocol_error/3, protocol_error/4, protocol_error/1]).
--export([not_found/1]).
+-export([not_found/1, assert_args_equivalence/4]).
-export([get_config/1, get_config/2, set_config/2]).
-export([dirty_read/1]).
+-export([table_lookup/2]).
-export([r/3, r/2, r_arg/4, rs/1]).
-export([enable_cover/0, report_cover/0]).
-export([enable_cover/1, report_cover/1]).
@@ -98,12 +99,19 @@
-> no_return()).
-spec(protocol_error/1 :: (rabbit_types:amqp_error()) -> no_return()).
-spec(not_found/1 :: (rabbit_types:r(atom())) -> no_return()).
+-spec(assert_args_equivalence/4 :: (rabbit_framing:amqp_table(),
+ rabbit_framing:amqp_table(),
+ rabbit_types:r(any()), [binary()]) ->
+ 'ok' | no_return()).
-spec(get_config/1 ::
(atom()) -> rabbit_types:ok_or_error2(any(), 'not_found')).
-spec(get_config/2 :: (atom(), A) -> A).
-spec(set_config/2 :: (atom(), any()) -> 'ok').
-spec(dirty_read/1 ::
({atom(), any()}) -> rabbit_types:ok_or_error2(any(), 'not_found')).
+-spec(table_lookup/2 ::
+ (rabbit_framing:amqp_table(), binary())
+ -> 'undefined' | {rabbit_framing:amqp_field_type(), any()}).
-spec(r/2 :: (rabbit_types:vhost(), K)
-> rabbit_types:r3(rabbit_types:vhost(), K, '_')
when is_subtype(K, atom())).
@@ -211,6 +219,20 @@ protocol_error(#amqp_error{} = Error) ->
not_found(R) -> protocol_error(not_found, "no ~s", [rs(R)]).
+assert_args_equivalence(Orig, New, Name, Keys) ->
+ [assert_args_equivalence1(Orig, New, Name, Key) || Key <- Keys],
+ ok.
+
+assert_args_equivalence1(Orig, New, Name, Key) ->
+ case {table_lookup(Orig, Key), table_lookup(New, Key)} of
+ {Same, Same} -> ok;
+ {Orig1, New1} -> protocol_error(
+ not_allowed,
+ "cannot redeclare ~s with inequivalent args for ~s: "
+ "required ~w, received ~w",
+ [rabbit_misc:rs(Name), Key, New1, Orig1])
+ end.
+
get_config(Key) ->
case dirty_read({rabbit_config, Key}) of
{ok, {rabbit_config, Key, V}} -> {ok, V};
@@ -232,6 +254,12 @@ dirty_read(ReadSpec) ->
[] -> {error, not_found}
end.
+table_lookup(Table, Key) ->
+ case lists:keysearch(Key, 1, Table) of
+ {value, {_, TypeBin, ValueBin}} -> {TypeBin, ValueBin};
+ false -> undefined
+ end.
+
r(#resource{virtual_host = VHostPath}, Kind, Name)
when is_binary(Name) ->
#resource{virtual_host = VHostPath, kind = Kind, name = Name};
@@ -244,9 +272,9 @@ r(VHostPath, Kind) when is_binary(VHostPath) ->
r_arg(#resource{virtual_host = VHostPath}, Kind, Table, Key) ->
r_arg(VHostPath, Kind, Table, Key);
r_arg(VHostPath, Kind, Table, Key) ->
- case lists:keysearch(Key, 1, Table) of
- {value, {_, longstr, NameBin}} -> r(VHostPath, Kind, NameBin);
- false -> undefined
+ case table_lookup(Table, Key) of
+ {longstr, NameBin} -> r(VHostPath, Kind, NameBin);
+ undefined -> undefined
end.
rs(#resource{virtual_host = VHostPath, kind = Kind, name = Name}) ->
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index 689f799de9..c808499b0d 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -264,20 +264,9 @@ read_cluster_nodes_config() ->
case rabbit_misc:read_term_file(FileName) of
{ok, [ClusterNodes]} -> ClusterNodes;
{error, enoent} ->
- case application:get_env(cluster_config) of
+ case application:get_env(cluster_nodes) of
undefined -> [];
- {ok, DefaultFileName} ->
- case file:consult(DefaultFileName) of
- {ok, [ClusterNodes]} -> ClusterNodes;
- {error, enoent} ->
- error_logger:warning_msg(
- "default cluster config file ~p does not exist~n",
- [DefaultFileName]),
- [];
- {error, Reason} ->
- throw({error, {cannot_read_cluster_nodes_config,
- DefaultFileName, Reason}})
- end
+ {ok, ClusterNodes} -> ClusterNodes
end;
{error, Reason} ->
throw({error, {cannot_read_cluster_nodes_config,
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 91b19976c6..d6b8bb2889 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -31,7 +31,7 @@
-module(rabbit_queue_index).
--export([init/3, terminate/2, delete_and_terminate/1, publish/4,
+-export([init/4, terminate/2, delete_and_terminate/1, publish/4,
deliver/2, ack/2, sync/2, flush/1, read/3,
next_segment_boundary/1, bounds/1, recover/1]).
@@ -193,7 +193,7 @@
{(fun ((A) -> 'finished' | {rabbit_guid:guid(), non_neg_integer(), A})),
A}).
--spec(init/3 :: (rabbit_amqqueue:name(), boolean(),
+-spec(init/4 :: (rabbit_amqqueue:name(), boolean(), boolean(),
fun ((rabbit_guid:guid()) -> boolean())) ->
{'undefined' | non_neg_integer(), [any()], qistate()}).
-spec(terminate/2 :: ([any()], qistate()) -> qistate()).
@@ -220,8 +220,8 @@
%% public API
%%----------------------------------------------------------------------------
-init(Name, MsgStoreRecovered, ContainsCheckFun) ->
- State = #qistate { dir = Dir } = blank_state(Name),
+init(Name, Recover, MsgStoreRecovered, ContainsCheckFun) ->
+ State = #qistate { dir = Dir } = blank_state(Name, not Recover),
Terms = case read_shutdown_terms(Dir) of
{error, _} -> [];
{ok, Terms1} -> Terms1
@@ -356,9 +356,14 @@ recover(DurableQueues) ->
%% startup and shutdown
%%----------------------------------------------------------------------------
-blank_state(QueueName) ->
+blank_state(QueueName, EnsureFresh) ->
StrName = queue_name_to_dir_name(QueueName),
Dir = filename:join(queues_dir(), StrName),
+ ok = case EnsureFresh of
+ true -> false = filelib:is_file(Dir), %% is_file == is file or dir
+ ok;
+ false -> ok
+ end,
ok = filelib:ensure_dir(filename:join(Dir, "nothing")),
{ok, MaxJournal} =
application:get_env(rabbit, queue_index_max_journal_entries),
@@ -463,9 +468,7 @@ recover_message(false, _, no_del, RelSeq, Segment) ->
add_to_journal(RelSeq, ack, add_to_journal(RelSeq, del, Segment)).
queue_name_to_dir_name(Name = #resource { kind = queue }) ->
- Bin = term_to_binary(Name),
- Size = 8*size(Bin),
- <<Num:Size>> = Bin,
+ <<Num:128>> = erlang:md5(term_to_binary(Name)),
lists:flatten(io_lib:format("~.36B", [Num])).
queues_dir() ->
@@ -497,7 +500,7 @@ queue_index_walker({next, Gatherer}) when is_pid(Gatherer) ->
queue_index_walker_reader(QueueName, Gatherer) ->
State = #qistate { segments = Segments, dir = Dir } =
- recover_journal(blank_state(QueueName)),
+ recover_journal(blank_state(QueueName, false)),
[ok = segment_entries_foldr(
fun (_RelSeq, {{Guid, true}, _IsDelivered, no_ack}, ok) ->
gatherer:in(Gatherer, {Guid, 1});
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 9acc58c946..b46df02de7 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -60,6 +60,7 @@ all_tests() ->
passed = test_bpqueue(),
passed = test_pg_local(),
passed = test_unfold(),
+ passed = test_supervisor_delayed_restart(),
passed = test_parsing(),
passed = test_content_framing(),
passed = test_topic_matching(),
@@ -1344,6 +1345,9 @@ bad_handle_hook(_, _, _) ->
extra_arg_hook(Hookname, Handler, Args, Extra1, Extra2) ->
handle_hook(Hookname, Handler, {Args, Extra1, Extra2}).
+test_supervisor_delayed_restart() ->
+ test_sup:test_supervisor_delayed_restart().
+
test_backing_queue() ->
case application:get_env(rabbit, backing_queue_module) of
{ok, rabbit_variable_queue} ->
@@ -1550,7 +1554,7 @@ test_queue() ->
init_test_queue() ->
rabbit_queue_index:init(
- test_queue(), false,
+ test_queue(), true, false,
fun (Guid) ->
rabbit_msg_store:contains(?PERSISTENT_MSG_STORE, Guid)
end).
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 92ffc511e0..0f52eee84f 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -368,10 +368,10 @@ stop_msg_store() ->
ok = rabbit_sup:stop_child(?PERSISTENT_MSG_STORE),
ok = rabbit_sup:stop_child(?TRANSIENT_MSG_STORE).
-init(QueueName, IsDurable, _Recover) ->
+init(QueueName, IsDurable, Recover) ->
{DeltaCount, Terms, IndexState} =
rabbit_queue_index:init(
- QueueName,
+ QueueName, Recover,
rabbit_msg_store:successfully_recovered_state(?PERSISTENT_MSG_STORE),
fun (Guid) ->
rabbit_msg_store:contains(?PERSISTENT_MSG_STORE, Guid)
diff --git a/src/supervisor2.erl b/src/supervisor2.erl
index 55f16feeb1..d716108cd0 100644
--- a/src/supervisor2.erl
+++ b/src/supervisor2.erl
@@ -4,15 +4,38 @@
%% 1) the module name is supervisor2
%%
%% 2) there is a new strategy called
-%% simple_one_for_one_terminate. This is exactly the same as for
-%% simple_one_for_one, except that children *are* explicitly
-%% terminated as per the shutdown component of the child_spec.
+%% simple_one_for_one_terminate. This is exactly the same as for
+%% simple_one_for_one, except that children *are* explicitly
+%% terminated as per the shutdown component of the child_spec.
%%
-%% 3) When the MaxR (intensity) == 0, errors that would otherwise be
-%% reported concerning child death, or the reaching of max restart
-%% intensity are elided.
+%% 3) child specifications can contain, as the restart type, a tuple
+%% {permanent, Delay} | {transient, Delay} where Delay >= 0. The
+%% delay, in seconds, indicates what should happen if a child, upon
+%% being restarted, exceeds the MaxT and MaxR parameters. Thus, if
+%% a child exits, it is restarted as normal. If it exits
+%% sufficiently quickly and often to exceed the boundaries set by
+%% the MaxT and MaxR parameters, and a Delay is specified, then
+%% rather than stopping the supervisor, the supervisor instead
+%% continues and tries to start up the child again, Delay seconds
+%% later.
%%
-%% All modifications are (C) 2010 LShift Ltd.
+%% Note that you can never restart more frequently than the MaxT
+%% and MaxR parameters allow: i.e. you must wait until *both* the
+%% Delay has passed *and* the MaxT and MaxR parameters allow the
+%% child to be restarted.
+%%
+%% Also note that the Delay is a *minimum*. There is no guarantee
+%% that the child will be restarted within that time, especially if
+%% other processes are dying and being restarted at the same time -
+%% essentially we have to wait for the delay to have passed and for
+%% the MaxT and MaxR parameters to permit the child to be
+%% restarted. This may require waiting for longer than Delay.
+%%
+%% 4) When the MaxR (intensity) == 0, errors that would otherwise be
+%% reported concerning child death, or the reaching of max restart
+%% intensity are elided.
+%%
+%% All modifications are (C) 2010 Rabbit Technologies Ltd.
%%
%% %CopyrightBegin%
%%
@@ -47,6 +70,7 @@
%% Internal exports
-export([init/1, handle_call/3, handle_info/2, terminate/2, code_change/3]).
-export([handle_cast/2]).
+-export([delayed_restart/2]).
-define(DICT, dict).
@@ -127,6 +151,9 @@ check_childspecs(ChildSpecs) when is_list(ChildSpecs) ->
end;
check_childspecs(X) -> {error, {badarg, X}}.
+delayed_restart(Supervisor, RestartDetails) ->
+ gen_server:cast(Supervisor, {delayed_restart, RestartDetails}).
+
%%% ---------------------------------------------------
%%%
%%% Initialize the supervisor.
@@ -322,6 +349,20 @@ handle_call(which_children, _From, State) ->
State#state.children),
{reply, Resp, State}.
+handle_cast({delayed_restart, {RestartType, Reason, Child}}, State)
+ when ?is_simple(State) ->
+ {ok, NState} = do_restart(RestartType, Reason, Child, State),
+ {noreply, NState};
+handle_cast({delayed_restart, {RestartType, Reason, Child}}, State)
+ when not (?is_simple(State)) ->
+ case get_child(Child#child.name, State) of
+ {value, Child} ->
+ {ok, NState} = do_restart(RestartType, Reason, Child, State),
+ {noreply, NState};
+ _ ->
+ {noreply, State}
+ end;
+
%%% Hopefully cause a function-clause as there is no API function
%%% that utilizes cast.
handle_cast(null, State) ->
@@ -487,6 +528,16 @@ restart_child(Pid, Reason, State) ->
{ok, State}
end.
+do_restart({RestartType, Delay}, Reason, Child, State) ->
+ case restart1(Child, State) of
+ {ok, NState} ->
+ {ok, NState};
+ {terminate, NState} ->
+ {ok, _TRef} = timer:apply_after(
+ trunc(Delay*1000), ?MODULE, delayed_restart,
+ [self(), {{RestartType, Delay}, Reason, Child}]),
+ {ok, NState}
+ end;
do_restart(permanent, Reason, Child, State) ->
report_error(child_terminated, Reason, Child, State),
restart(Child, State);
@@ -507,14 +558,27 @@ do_restart(temporary, Reason, Child, State) ->
restart(Child, State) ->
case add_restart(State) of
{ok, NState} ->
- restart(NState#state.strategy, Child, NState);
+ restart(NState#state.strategy, Child, NState, fun restart/2);
{terminate, NState} ->
report_error(shutdown, reached_max_restart_intensity,
Child, State),
{shutdown, remove_child(Child, NState)}
end.
-restart(Strategy, Child, State)
+restart1(Child, State) ->
+ case add_restart(State) of
+ {ok, NState} ->
+ restart(NState#state.strategy, Child, NState, fun restart1/2);
+ {terminate, _NState} ->
+ %% we've reached the max restart intensity, but the
+ %% add_restart will have added to the restarts
+ %% field. Given we don't want to die here, we need to go
+ %% back to the old restarts field otherwise we'll never
+ %% attempt to restart later.
+ {terminate, State}
+ end.
+
+restart(Strategy, Child, State, Restart)
when Strategy =:= simple_one_for_one orelse
Strategy =:= simple_one_for_one_terminate ->
#child{mfa = {M, F, A}} = Child,
@@ -528,9 +592,9 @@ restart(Strategy, Child, State)
{ok, NState};
{error, Error} ->
report_error(start_error, Error, Child, State#state.name),
- restart(Child, State)
+ Restart(Child, State)
end;
-restart(one_for_one, Child, State) ->
+restart(one_for_one, Child, State, Restart) ->
case do_start_child(State#state.name, Child) of
{ok, Pid} ->
NState = replace_child(Child#child{pid = Pid}, State),
@@ -540,25 +604,25 @@ restart(one_for_one, Child, State) ->
{ok, NState};
{error, Reason} ->
report_error(start_error, Reason, Child, State#state.name),
- restart(Child, State)
+ Restart(Child, State)
end;
-restart(rest_for_one, Child, State) ->
+restart(rest_for_one, Child, State, Restart) ->
{ChAfter, ChBefore} = split_child(Child#child.pid, State#state.children),
ChAfter2 = terminate_children(ChAfter, State#state.name),
case start_children(ChAfter2, State#state.name) of
{ok, ChAfter3} ->
{ok, State#state{children = ChAfter3 ++ ChBefore}};
{error, ChAfter3} ->
- restart(Child, State#state{children = ChAfter3 ++ ChBefore})
+ Restart(Child, State#state{children = ChAfter3 ++ ChBefore})
end;
-restart(one_for_all, Child, State) ->
+restart(one_for_all, Child, State, Restart) ->
Children1 = del_child(Child#child.pid, State#state.children),
Children2 = terminate_children(Children1, State#state.name),
case start_children(Children2, State#state.name) of
{ok, NChs} ->
{ok, State#state{children = NChs}};
{error, NChs} ->
- restart(Child, State#state{children = NChs})
+ Restart(Child, State#state{children = NChs})
end.
%%-----------------------------------------------------------------
@@ -785,7 +849,9 @@ supname(N,_) -> N.
%%% {Name, Func, RestartType, Shutdown, ChildType, Modules}
%%% where Name is an atom
%%% Func is {Mod, Fun, Args} == {atom, atom, list}
-%%% RestartType is permanent | temporary | transient
+%%% RestartType is permanent | temporary | transient |
+%%% {permanent, Delay} |
+%%% {transient, Delay} where Delay >= 0
%%% Shutdown = integer() | infinity | brutal_kill
%%% ChildType = supervisor | worker
%%% Modules = [atom()] | dynamic
@@ -831,10 +897,17 @@ validFunc({M, F, A}) when is_atom(M),
is_list(A) -> true;
validFunc(Func) -> throw({invalid_mfa, Func}).
-validRestartType(permanent) -> true;
-validRestartType(temporary) -> true;
-validRestartType(transient) -> true;
-validRestartType(RestartType) -> throw({invalid_restart_type, RestartType}).
+validRestartType(permanent) -> true;
+validRestartType(temporary) -> true;
+validRestartType(transient) -> true;
+validRestartType({permanent, Delay}) -> validDelay(Delay);
+validRestartType({transient, Delay}) -> validDelay(Delay);
+validRestartType(RestartType) -> throw({invalid_restart_type,
+ RestartType}).
+
+validDelay(Delay) when is_number(Delay),
+ Delay >= 0 -> true;
+validDelay(What) -> throw({invalid_delay, What}).
validShutdown(Shutdown, _)
when is_integer(Shutdown), Shutdown > 0 -> true;
diff --git a/src/test_sup.erl b/src/test_sup.erl
new file mode 100644
index 0000000000..f41793bc89
--- /dev/null
+++ b/src/test_sup.erl
@@ -0,0 +1,94 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2010 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2010 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(test_sup).
+
+-behaviour(supervisor2).
+
+-export([test_supervisor_delayed_restart/0,
+ init/1, start_child/0]).
+
+test_supervisor_delayed_restart() ->
+ passed = with_sup(simple_one_for_one_terminate,
+ fun (SupPid) ->
+ {ok, _ChildPid} =
+ supervisor2:start_child(SupPid, []),
+ test_supervisor_delayed_restart(SupPid)
+ end),
+ passed = with_sup(one_for_one, fun test_supervisor_delayed_restart/1).
+
+test_supervisor_delayed_restart(SupPid) ->
+ ok = ping_child(SupPid),
+ ok = exit_child(SupPid),
+ timer:sleep(10),
+ ok = ping_child(SupPid),
+ ok = exit_child(SupPid),
+ timer:sleep(10),
+ timeout = ping_child(SupPid),
+ timer:sleep(1010),
+ ok = ping_child(SupPid),
+ passed.
+
+with_sup(RestartStrategy, Fun) ->
+ {ok, SupPid} = supervisor2:start_link(?MODULE, [RestartStrategy]),
+ Res = Fun(SupPid),
+ exit(SupPid, shutdown),
+ rabbit_misc:unlink_and_capture_exit(SupPid),
+ Res.
+
+init([RestartStrategy]) ->
+ {ok, {{RestartStrategy, 1, 1},
+ [{test, {test_sup, start_child, []}, {permanent, 1},
+ 16#ffffffff, worker, [test_sup]}]}}.
+
+start_child() ->
+ {ok, proc_lib:spawn_link(fun run_child/0)}.
+
+ping_child(SupPid) ->
+ Ref = make_ref(),
+ get_child_pid(SupPid) ! {ping, Ref, self()},
+ receive {pong, Ref} -> ok
+ after 1000 -> timeout
+ end.
+
+exit_child(SupPid) ->
+ true = exit(get_child_pid(SupPid), abnormal),
+ ok.
+
+get_child_pid(SupPid) ->
+ [{_Id, ChildPid, worker, [test_sup]}] =
+ supervisor2:which_children(SupPid),
+ ChildPid.
+
+run_child() ->
+ receive {ping, Ref, Pid} -> Pid ! {pong, Ref},
+ run_child()
+ end.