summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAsk Solem <askh@opera.com>2010-07-19 17:09:13 +0100
committerAsk Solem <askh@opera.com>2010-07-19 17:09:13 +0100
commit771d0b6d33f5adb6ab91ae4a21121e8e992dfc2e (patch)
tree318bdbd0a46b183b3d641f25f98755cb4722c89c /src
parent89d63aa6763a95e85c5afb8b587c693871fe3afb (diff)
downloadrabbitmq-server-git-771d0b6d33f5adb6ab91ae4a21121e8e992dfc2e.tar.gz
Initial commit of Ask's work
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue.erl58
-rw-r--r--src/rabbit_amqqueue_process.erl81
-rw-r--r--src/rabbit_misc.erl26
3 files changed, 143 insertions, 22 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index e5faef5416..c12c066b8e 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -35,7 +35,7 @@
-export([internal_declare/2, internal_delete/1,
maybe_run_queue_via_backing_queue/2,
update_ram_duration/1, set_ram_duration_target/2,
- set_maximum_since_use/2]).
+ set_maximum_since_use/2, maybe_expire/1]).
-export([pseudo_queue/2]).
-export([lookup/1, with/2, with_or_die/2, assert_equivalence/5,
check_exclusive_access/2, with_exclusive_access_or_die/3,
@@ -146,6 +146,7 @@
-spec(update_ram_duration/1 :: (pid()) -> 'ok').
-spec(set_ram_duration_target/2 :: (pid(), number() | 'infinity') -> 'ok').
-spec(set_maximum_since_use/2 :: (pid(), non_neg_integer()) -> 'ok').
+-spec(maybe_expire/1 :: (pid()) -> 'ok').
-spec(on_node_down/1 :: (node()) -> 'ok').
-spec(pseudo_queue/2 :: (binary(), pid()) -> rabbit_types:amqqueue()).
@@ -183,6 +184,20 @@ recover_durable_queues(DurableQueues) ->
Qs = [start_queue_process(Q) || Q <- DurableQueues],
[Q || Q <- Qs, gen_server2:call(Q#amqqueue.pid, {init, true}) == Q].
+handle_error(QueueName, expires_not_of_type_long) ->
+ rabbit_misc:protocol_error(
+ precondition_failed,
+ "~s: Argument x-expires must be of type long.",
+ [rabbit_misc:rs(QueueName)]);
+handle_error(QueueName, expires_zero_or_less) ->
+ rabbit_misc:protocol_error(
+ precondition_failed,
+ "~s: Argument x-expires must be more than zero.",
+ [rabbit_misc:rs(QueueName)]);
+handle_error(QueueName, Error) ->
+ rabbit_misc:protocol_error(
+ internal_error, "Queue ~s: ~w", [rabbit_misc:rs(QueueName), Error]).
+
declare(QueueName, Durable, AutoDelete, Args, Owner) ->
Q = start_queue_process(#amqqueue{name = QueueName,
durable = Durable,
@@ -191,8 +206,9 @@ declare(QueueName, Durable, AutoDelete, Args, Owner) ->
exclusive_owner = Owner,
pid = none}),
case gen_server2:call(Q#amqqueue.pid, {init, false}) of
- not_found -> rabbit_misc:not_found(QueueName);
- Q1 -> Q1
+ {error, Error} -> handle_error(QueueName, Error);
+ not_found -> rabbit_misc:not_found(QueueName);
+ Q1 -> Q1
end.
internal_declare(Q = #amqqueue{name = QueueName}, Recover) ->
@@ -227,8 +243,10 @@ store_queue(Q = #amqqueue{durable = false}) ->
ok.
start_queue_process(Q) ->
- {ok, Pid} = rabbit_amqqueue_sup:start_child([Q]),
- Q#amqqueue{pid = Pid}.
+ case rabbit_amqqueue_sup:start_child([Q]) of
+ {ok, Pid} -> Q#amqqueue{pid = Pid};
+ {error, Error} -> handle_error(Q#amqqueue.name, Error)
+ end.
add_default_binding(#amqqueue{name = QueueName}) ->
Exchange = rabbit_misc:r(QueueName, exchange, <<>>),
@@ -251,8 +269,12 @@ with(Name, F) ->
with_or_die(Name, F) ->
with(Name, F, fun () -> rabbit_misc:not_found(Name) end).
-assert_equivalence(#amqqueue{durable = Durable, auto_delete = AutoDelete} = Q,
- Durable, AutoDelete, _Args, Owner) ->
+assert_equivalence(#amqqueue{name = Name,
+ durable = Durable,
+ auto_delete = AutoDelete,
+ arguments = Args1} = Q,
+ Durable, AutoDelete, Args, Owner) ->
+ check_argument_equivalent(Args1, Args, <<"x-expires">>, Name),
check_exclusive_access(Q, Owner, strict);
assert_equivalence(#amqqueue{name = QueueName},
_Durable, _AutoDelete, _Args, _Owner) ->
@@ -276,6 +298,25 @@ with_exclusive_access_or_die(Name, ReaderPid, F) ->
with_or_die(Name,
fun (Q) -> check_exclusive_access(Q, ReaderPid), F(Q) end).
+check_argument_equivalent(Prev, Now, Key, QueueName) ->
+ {AType, AVal} = rabbit_misc:table_lookup(Now, Key),
+ {BType, BVal} = rabbit_misc:table_lookup(Prev, Key),
+ if AType =:= BType
+ -> ok;
+ true -> rabbit_misc:protocol_error(
+ precondition_failed,
+ "argument types for ~s not equivalent: ~s=~w (was ~w)",
+ [rabbit_misc:rs(QueueName), Key, AType, BType])
+ end,
+
+ if AVal == BVal
+ -> ok;
+ true -> rabbit_misc:protocol_error(
+ precondition_failed,
+ "arguments for ~s not equivalent: ~s=~w (was ~w)",
+ [rabbit_misc:rs(QueueName), Key, AVal, BVal])
+ end.
+
list(VHostPath) ->
mnesia:dirty_match_object(
rabbit_queue,
@@ -416,6 +457,9 @@ set_ram_duration_target(QPid, Duration) ->
set_maximum_since_use(QPid, Age) ->
gen_server2:pcast(QPid, 8, {set_maximum_since_use, Age}).
+maybe_expire(QPid) ->
+ gen_server2:pcast(QPid, 8, maybe_expire).
+
on_node_down(Node) ->
[Hook() ||
Hook <- rabbit_misc:execute_mnesia_transaction(
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index a2cbcf5517..834176e1fb 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -56,9 +56,10 @@
backing_queue_state,
active_consumers,
blocked_consumers,
+ expires,
sync_timer_ref,
- rate_timer_ref
- }).
+ rate_timer_ref,
+ expiry_timer_ref}).
-record(consumer, {tag, ack_required}).
@@ -97,12 +98,32 @@ info_keys() -> ?INFO_KEYS.
%%----------------------------------------------------------------------------
+-define(EXPIRES_TYPE, long).
+
+check_argument_expires(?EXPIRES_TYPE, Expires) when not is_integer(Expires) ->
+ {error, expires_not_of_type_long};
+check_argument_expires(?EXPIRES_TYPE, Expires) when Expires =< 0 ->
+ {error, expires_zero_or_less};
+check_argument_expires(undefined, undefined) ->
+ {ok, undefined};
+check_argument_expires(?EXPIRES_TYPE, Expires) ->
+ {ok, Expires};
+check_argument_expires(_, _) ->
+ {error, expires_not_of_type_long}.
+
+init_expires(State = #q{q = #amqqueue{arguments = Arguments}}) ->
+ {Type, Expires} = rabbit_misc:table_lookup(Arguments, <<"x-expires">>),
+ case check_argument_expires(Type, Expires) of
+ {error, Error} -> {error, Error};
+ {ok, Expires} -> start_expiry_timer(State, Expires)
+ end.
+
init(Q) ->
?LOGDEBUG("Queue starting - ~p~n", [Q]),
process_flag(trap_exit, true),
{ok, BQ} = application:get_env(backing_queue_module),
- {ok, #q{q = Q#amqqueue{pid = self()},
+ State = #q{q = Q#amqqueue{pid = self()},
exclusive_consumer = none,
has_had_consumers = false,
backing_queue = BQ,
@@ -110,8 +131,15 @@ init(Q) ->
active_consumers = queue:new(),
blocked_consumers = queue:new(),
sync_timer_ref = undefined,
- rate_timer_ref = undefined}, hibernate,
- {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
+ rate_timer_ref = undefined,
+ expiry_timer_ref = undefined},
+
+ case init_expires(State) of
+ {error, Error} -> {stop, Error};
+ NewState -> {ok, NewState, hibernate,
+ {backoff, ?HIBERNATE_AFTER_MIN,
+ ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}
+ end.
terminate(shutdown, State = #q{backing_queue = BQ}) ->
terminate_shutdown(fun (BQS) -> BQ:terminate(BQS) end, State);
@@ -218,6 +246,26 @@ stop_rate_timer(State = #q{rate_timer_ref = TRef}) ->
{ok, cancel} = timer:cancel(TRef),
State#q{rate_timer_ref = undefined}.
+stop_expiry_timer(State = #q{expiry_timer_ref = undefined}) ->
+ State;
+stop_expiry_timer(State = #q{expiry_timer_ref = TRef}) ->
+ {ok, cancel} = timer:cancel(TRef),
+ State#q{expiry_timer_ref = undefined}.
+
+start_expiry_timer(State = #q{expires = undefined}) ->
+ State;
+start_expiry_timer(State = #q{expires = Expires}) ->
+ ?LOGDEBUG("~p: Starting expire timer: ~p~n", [State#q.q, Expires]),
+ NewState = stop_expiry_timer(State),
+ {ok, TRef} = timer:apply_after(
+ Expires,
+ rabbit_amqqueue, maybe_expire,
+ [self()]),
+ NewState#q{expiry_timer_ref = TRef}.
+
+start_expiry_timer(State, Expires) ->
+ start_expiry_timer(State#q{expires = Expires}).
+
assert_invariant(#q{active_consumers = AC,
backing_queue = BQ, backing_queue_state = BQS}) ->
true = (queue:is_empty(AC) orelse BQ:is_empty(BQS)).
@@ -611,7 +659,8 @@ handle_call({basic_get, ChPid, NoAck}, _From,
backing_queue_state = BQS, backing_queue = BQ}) ->
AckRequired = not NoAck,
case BQ:fetch(AckRequired, BQS) of
- {empty, BQS1} -> reply(empty, State#q{backing_queue_state = BQS1});
+ {empty, BQS1} -> reply(empty, start_expiry_timer(
+ State#q{backing_queue_state = BQS1}));
{{Message, IsDelivered, AckTag, Remaining}, BQS1} ->
case AckRequired of
true -> C = #cr{acktags = ChAckTags} = ch_record(ChPid),
@@ -620,7 +669,8 @@ handle_call({basic_get, ChPid, NoAck}, _From,
false -> ok
end,
Msg = {QName, self(), AckTag, IsDelivered, Message},
- reply({ok, Remaining, Msg}, State#q{backing_queue_state = BQS1})
+ reply({ok, Remaining, Msg},
+ start_expiry_timer(State#q{backing_queue_state = BQS1}))
end;
handle_call({basic_consume, NoAck, ChPid, LimiterPid,
@@ -660,7 +710,7 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid,
ChPid, Consumer,
State1#q.active_consumers)})
end,
- reply(ok, State2)
+ reply(ok, start_expiry_timer(State2))
end;
handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
@@ -687,7 +737,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
ChPid, ConsumerTag,
State#q.blocked_consumers)},
case should_auto_delete(NewState) of
- false -> reply(ok, NewState);
+ false -> reply(ok, start_expiry_timer(NewState));
true -> {stop, normal, ok, NewState}
end
end;
@@ -725,7 +775,7 @@ handle_call({requeue, AckTags, ChPid}, From, State) ->
C = #cr{acktags = ChAckTags} ->
ChAckTags1 = subtract_acks(ChAckTags, AckTags),
store_ch_record(C#cr{acktags = ChAckTags1}),
- noreply(requeue_and_run(AckTags, State))
+ noreply(start_expiry_timer(requeue_and_run(AckTags, State)))
end;
handle_call({maybe_run_queue_via_backing_queue, Fun}, _From, State) ->
@@ -749,7 +799,7 @@ handle_cast({ack, Txn, AckTags, ChPid},
_ -> {C#cr{txn = Txn}, BQ:tx_ack(Txn, AckTags, BQS)}
end,
store_ch_record(C1),
- noreply(State #q { backing_queue_state = BQS1 })
+ noreply(start_expiry_timer(State#q{backing_queue_state = BQS1}))
end;
handle_cast({rollback, Txn, ChPid}, State) ->
@@ -803,7 +853,14 @@ handle_cast({set_ram_duration_target, Duration},
handle_cast({set_maximum_since_use, Age}, State) ->
ok = file_handle_cache:set_maximum_since_use(Age),
- noreply(State).
+ noreply(State);
+
+handle_cast(maybe_expire, State) ->
+ case is_unused(State) of
+ true -> ?LOGDEBUG("Queue lease expired for ~p~n", [State#q.q]),
+ {stop, normal, State};
+ false -> noreply(start_expiry_timer(State))
+ end.
handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason},
State = #q{q = #amqqueue{exclusive_owner = DownPid}}) ->
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index fcc9fc7e54..a0061b1201 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -41,6 +41,7 @@
-export([not_found/1]).
-export([get_config/1, get_config/2, set_config/2]).
-export([dirty_read/1]).
+-export([table_lookup/2, table_lookup/3]).
-export([r/3, r/2, r_arg/4, rs/1]).
-export([enable_cover/0, report_cover/0]).
-export([enable_cover/1, report_cover/1]).
@@ -103,6 +104,13 @@
-spec(set_config/2 :: (atom(), any()) -> 'ok').
-spec(dirty_read/1 ::
({atom(), any()}) -> rabbit_types:ok_or_error2(any(), 'not_found')).
+-spec(table_lookup/2 ::
+ (rabbit_framing:amqp_table(), binary())
+ -> {undefined, undefined} | {rabbit_framing:amqp_field_type(), any()}).
+-spec(table_lookup/3 ::
+ (rabbit_framing:amqp_table(), binary(),
+ rabbit_framing:amqp_field_type())
+ -> undefined | any()).
-spec(r/2 :: (rabbit_types:vhost(), K)
-> rabbit_types:r3(rabbit_types:vhost(), K, '_')
when is_subtype(K, atom())).
@@ -228,6 +236,18 @@ dirty_read(ReadSpec) ->
[] -> {error, not_found}
end.
+table_lookup(Table, Key, Type) ->
+ case lists:keysearch(Key, 1, Table) of
+ {value, {_, Type, ValueBin}} -> ValueBin;
+ _ -> undefined
+ end.
+
+table_lookup(Table, Key) ->
+ case lists:keysearch(Key, 1, Table) of
+ {value, {_, TypeBin, ValueBin}} -> {TypeBin, ValueBin};
+ false -> {undefined, undefined}
+ end.
+
r(#resource{virtual_host = VHostPath}, Kind, Name)
when is_binary(Name) ->
#resource{virtual_host = VHostPath, kind = Kind, name = Name};
@@ -240,9 +260,9 @@ r(VHostPath, Kind) when is_binary(VHostPath) ->
r_arg(#resource{virtual_host = VHostPath}, Kind, Table, Key) ->
r_arg(VHostPath, Kind, Table, Key);
r_arg(VHostPath, Kind, Table, Key) ->
- case lists:keysearch(Key, 1, Table) of
- {value, {_, longstr, NameBin}} -> r(VHostPath, Kind, NameBin);
- false -> undefined
+ case table_lookup(Table, Key, longstr) of
+ undefined -> undefined;
+ NameBin -> r(VHostPath, Kind, NameBin)
end.
rs(#resource{virtual_host = VHostPath, kind = Kind, name = Name}) ->