diff options
| author | Ask Solem <askh@opera.com> | 2010-07-19 17:09:13 +0100 |
|---|---|---|
| committer | Ask Solem <askh@opera.com> | 2010-07-19 17:09:13 +0100 |
| commit | 771d0b6d33f5adb6ab91ae4a21121e8e992dfc2e (patch) | |
| tree | 318bdbd0a46b183b3d641f25f98755cb4722c89c /src | |
| parent | 89d63aa6763a95e85c5afb8b587c693871fe3afb (diff) | |
| download | rabbitmq-server-git-771d0b6d33f5adb6ab91ae4a21121e8e992dfc2e.tar.gz | |
Initial commit of Ask's work
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue.erl | 58 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 81 | ||||
| -rw-r--r-- | src/rabbit_misc.erl | 26 |
3 files changed, 143 insertions, 22 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index e5faef5416..c12c066b8e 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -35,7 +35,7 @@ -export([internal_declare/2, internal_delete/1, maybe_run_queue_via_backing_queue/2, update_ram_duration/1, set_ram_duration_target/2, - set_maximum_since_use/2]). + set_maximum_since_use/2, maybe_expire/1]). -export([pseudo_queue/2]). -export([lookup/1, with/2, with_or_die/2, assert_equivalence/5, check_exclusive_access/2, with_exclusive_access_or_die/3, @@ -146,6 +146,7 @@ -spec(update_ram_duration/1 :: (pid()) -> 'ok'). -spec(set_ram_duration_target/2 :: (pid(), number() | 'infinity') -> 'ok'). -spec(set_maximum_since_use/2 :: (pid(), non_neg_integer()) -> 'ok'). +-spec(maybe_expire/1 :: (pid()) -> 'ok'). -spec(on_node_down/1 :: (node()) -> 'ok'). -spec(pseudo_queue/2 :: (binary(), pid()) -> rabbit_types:amqqueue()). @@ -183,6 +184,20 @@ recover_durable_queues(DurableQueues) -> Qs = [start_queue_process(Q) || Q <- DurableQueues], [Q || Q <- Qs, gen_server2:call(Q#amqqueue.pid, {init, true}) == Q]. +handle_error(QueueName, expires_not_of_type_long) -> + rabbit_misc:protocol_error( + precondition_failed, + "~s: Argument x-expires must be of type long.", + [rabbit_misc:rs(QueueName)]); +handle_error(QueueName, expires_zero_or_less) -> + rabbit_misc:protocol_error( + precondition_failed, + "~s: Argument x-expires must be more than zero.", + [rabbit_misc:rs(QueueName)]); +handle_error(QueueName, Error) -> + rabbit_misc:protocol_error( + internal_error, "Queue ~s: ~w", [rabbit_misc:rs(QueueName), Error]). + declare(QueueName, Durable, AutoDelete, Args, Owner) -> Q = start_queue_process(#amqqueue{name = QueueName, durable = Durable, @@ -191,8 +206,9 @@ declare(QueueName, Durable, AutoDelete, Args, Owner) -> exclusive_owner = Owner, pid = none}), case gen_server2:call(Q#amqqueue.pid, {init, false}) of - not_found -> rabbit_misc:not_found(QueueName); - Q1 -> Q1 + {error, Error} -> handle_error(QueueName, Error); + not_found -> rabbit_misc:not_found(QueueName); + Q1 -> Q1 end. internal_declare(Q = #amqqueue{name = QueueName}, Recover) -> @@ -227,8 +243,10 @@ store_queue(Q = #amqqueue{durable = false}) -> ok. start_queue_process(Q) -> - {ok, Pid} = rabbit_amqqueue_sup:start_child([Q]), - Q#amqqueue{pid = Pid}. + case rabbit_amqqueue_sup:start_child([Q]) of + {ok, Pid} -> Q#amqqueue{pid = Pid}; + {error, Error} -> handle_error(Q#amqqueue.name, Error) + end. add_default_binding(#amqqueue{name = QueueName}) -> Exchange = rabbit_misc:r(QueueName, exchange, <<>>), @@ -251,8 +269,12 @@ with(Name, F) -> with_or_die(Name, F) -> with(Name, F, fun () -> rabbit_misc:not_found(Name) end). -assert_equivalence(#amqqueue{durable = Durable, auto_delete = AutoDelete} = Q, - Durable, AutoDelete, _Args, Owner) -> +assert_equivalence(#amqqueue{name = Name, + durable = Durable, + auto_delete = AutoDelete, + arguments = Args1} = Q, + Durable, AutoDelete, Args, Owner) -> + check_argument_equivalent(Args1, Args, <<"x-expires">>, Name), check_exclusive_access(Q, Owner, strict); assert_equivalence(#amqqueue{name = QueueName}, _Durable, _AutoDelete, _Args, _Owner) -> @@ -276,6 +298,25 @@ with_exclusive_access_or_die(Name, ReaderPid, F) -> with_or_die(Name, fun (Q) -> check_exclusive_access(Q, ReaderPid), F(Q) end). +check_argument_equivalent(Prev, Now, Key, QueueName) -> + {AType, AVal} = rabbit_misc:table_lookup(Now, Key), + {BType, BVal} = rabbit_misc:table_lookup(Prev, Key), + if AType =:= BType + -> ok; + true -> rabbit_misc:protocol_error( + precondition_failed, + "argument types for ~s not equivalent: ~s=~w (was ~w)", + [rabbit_misc:rs(QueueName), Key, AType, BType]) + end, + + if AVal == BVal + -> ok; + true -> rabbit_misc:protocol_error( + precondition_failed, + "arguments for ~s not equivalent: ~s=~w (was ~w)", + [rabbit_misc:rs(QueueName), Key, AVal, BVal]) + end. + list(VHostPath) -> mnesia:dirty_match_object( rabbit_queue, @@ -416,6 +457,9 @@ set_ram_duration_target(QPid, Duration) -> set_maximum_since_use(QPid, Age) -> gen_server2:pcast(QPid, 8, {set_maximum_since_use, Age}). +maybe_expire(QPid) -> + gen_server2:pcast(QPid, 8, maybe_expire). + on_node_down(Node) -> [Hook() || Hook <- rabbit_misc:execute_mnesia_transaction( diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index a2cbcf5517..834176e1fb 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -56,9 +56,10 @@ backing_queue_state, active_consumers, blocked_consumers, + expires, sync_timer_ref, - rate_timer_ref - }). + rate_timer_ref, + expiry_timer_ref}). -record(consumer, {tag, ack_required}). @@ -97,12 +98,32 @@ info_keys() -> ?INFO_KEYS. %%---------------------------------------------------------------------------- +-define(EXPIRES_TYPE, long). + +check_argument_expires(?EXPIRES_TYPE, Expires) when not is_integer(Expires) -> + {error, expires_not_of_type_long}; +check_argument_expires(?EXPIRES_TYPE, Expires) when Expires =< 0 -> + {error, expires_zero_or_less}; +check_argument_expires(undefined, undefined) -> + {ok, undefined}; +check_argument_expires(?EXPIRES_TYPE, Expires) -> + {ok, Expires}; +check_argument_expires(_, _) -> + {error, expires_not_of_type_long}. + +init_expires(State = #q{q = #amqqueue{arguments = Arguments}}) -> + {Type, Expires} = rabbit_misc:table_lookup(Arguments, <<"x-expires">>), + case check_argument_expires(Type, Expires) of + {error, Error} -> {error, Error}; + {ok, Expires} -> start_expiry_timer(State, Expires) + end. + init(Q) -> ?LOGDEBUG("Queue starting - ~p~n", [Q]), process_flag(trap_exit, true), {ok, BQ} = application:get_env(backing_queue_module), - {ok, #q{q = Q#amqqueue{pid = self()}, + State = #q{q = Q#amqqueue{pid = self()}, exclusive_consumer = none, has_had_consumers = false, backing_queue = BQ, @@ -110,8 +131,15 @@ init(Q) -> active_consumers = queue:new(), blocked_consumers = queue:new(), sync_timer_ref = undefined, - rate_timer_ref = undefined}, hibernate, - {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. + rate_timer_ref = undefined, + expiry_timer_ref = undefined}, + + case init_expires(State) of + {error, Error} -> {stop, Error}; + NewState -> {ok, NewState, hibernate, + {backoff, ?HIBERNATE_AFTER_MIN, + ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}} + end. terminate(shutdown, State = #q{backing_queue = BQ}) -> terminate_shutdown(fun (BQS) -> BQ:terminate(BQS) end, State); @@ -218,6 +246,26 @@ stop_rate_timer(State = #q{rate_timer_ref = TRef}) -> {ok, cancel} = timer:cancel(TRef), State#q{rate_timer_ref = undefined}. +stop_expiry_timer(State = #q{expiry_timer_ref = undefined}) -> + State; +stop_expiry_timer(State = #q{expiry_timer_ref = TRef}) -> + {ok, cancel} = timer:cancel(TRef), + State#q{expiry_timer_ref = undefined}. + +start_expiry_timer(State = #q{expires = undefined}) -> + State; +start_expiry_timer(State = #q{expires = Expires}) -> + ?LOGDEBUG("~p: Starting expire timer: ~p~n", [State#q.q, Expires]), + NewState = stop_expiry_timer(State), + {ok, TRef} = timer:apply_after( + Expires, + rabbit_amqqueue, maybe_expire, + [self()]), + NewState#q{expiry_timer_ref = TRef}. + +start_expiry_timer(State, Expires) -> + start_expiry_timer(State#q{expires = Expires}). + assert_invariant(#q{active_consumers = AC, backing_queue = BQ, backing_queue_state = BQS}) -> true = (queue:is_empty(AC) orelse BQ:is_empty(BQS)). @@ -611,7 +659,8 @@ handle_call({basic_get, ChPid, NoAck}, _From, backing_queue_state = BQS, backing_queue = BQ}) -> AckRequired = not NoAck, case BQ:fetch(AckRequired, BQS) of - {empty, BQS1} -> reply(empty, State#q{backing_queue_state = BQS1}); + {empty, BQS1} -> reply(empty, start_expiry_timer( + State#q{backing_queue_state = BQS1})); {{Message, IsDelivered, AckTag, Remaining}, BQS1} -> case AckRequired of true -> C = #cr{acktags = ChAckTags} = ch_record(ChPid), @@ -620,7 +669,8 @@ handle_call({basic_get, ChPid, NoAck}, _From, false -> ok end, Msg = {QName, self(), AckTag, IsDelivered, Message}, - reply({ok, Remaining, Msg}, State#q{backing_queue_state = BQS1}) + reply({ok, Remaining, Msg}, + start_expiry_timer(State#q{backing_queue_state = BQS1})) end; handle_call({basic_consume, NoAck, ChPid, LimiterPid, @@ -660,7 +710,7 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, ChPid, Consumer, State1#q.active_consumers)}) end, - reply(ok, State2) + reply(ok, start_expiry_timer(State2)) end; handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From, @@ -687,7 +737,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From, ChPid, ConsumerTag, State#q.blocked_consumers)}, case should_auto_delete(NewState) of - false -> reply(ok, NewState); + false -> reply(ok, start_expiry_timer(NewState)); true -> {stop, normal, ok, NewState} end end; @@ -725,7 +775,7 @@ handle_call({requeue, AckTags, ChPid}, From, State) -> C = #cr{acktags = ChAckTags} -> ChAckTags1 = subtract_acks(ChAckTags, AckTags), store_ch_record(C#cr{acktags = ChAckTags1}), - noreply(requeue_and_run(AckTags, State)) + noreply(start_expiry_timer(requeue_and_run(AckTags, State))) end; handle_call({maybe_run_queue_via_backing_queue, Fun}, _From, State) -> @@ -749,7 +799,7 @@ handle_cast({ack, Txn, AckTags, ChPid}, _ -> {C#cr{txn = Txn}, BQ:tx_ack(Txn, AckTags, BQS)} end, store_ch_record(C1), - noreply(State #q { backing_queue_state = BQS1 }) + noreply(start_expiry_timer(State#q{backing_queue_state = BQS1})) end; handle_cast({rollback, Txn, ChPid}, State) -> @@ -803,7 +853,14 @@ handle_cast({set_ram_duration_target, Duration}, handle_cast({set_maximum_since_use, Age}, State) -> ok = file_handle_cache:set_maximum_since_use(Age), - noreply(State). + noreply(State); + +handle_cast(maybe_expire, State) -> + case is_unused(State) of + true -> ?LOGDEBUG("Queue lease expired for ~p~n", [State#q.q]), + {stop, normal, State}; + false -> noreply(start_expiry_timer(State)) + end. handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State = #q{q = #amqqueue{exclusive_owner = DownPid}}) -> diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index fcc9fc7e54..a0061b1201 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -41,6 +41,7 @@ -export([not_found/1]). -export([get_config/1, get_config/2, set_config/2]). -export([dirty_read/1]). +-export([table_lookup/2, table_lookup/3]). -export([r/3, r/2, r_arg/4, rs/1]). -export([enable_cover/0, report_cover/0]). -export([enable_cover/1, report_cover/1]). @@ -103,6 +104,13 @@ -spec(set_config/2 :: (atom(), any()) -> 'ok'). -spec(dirty_read/1 :: ({atom(), any()}) -> rabbit_types:ok_or_error2(any(), 'not_found')). +-spec(table_lookup/2 :: + (rabbit_framing:amqp_table(), binary()) + -> {undefined, undefined} | {rabbit_framing:amqp_field_type(), any()}). +-spec(table_lookup/3 :: + (rabbit_framing:amqp_table(), binary(), + rabbit_framing:amqp_field_type()) + -> undefined | any()). -spec(r/2 :: (rabbit_types:vhost(), K) -> rabbit_types:r3(rabbit_types:vhost(), K, '_') when is_subtype(K, atom())). @@ -228,6 +236,18 @@ dirty_read(ReadSpec) -> [] -> {error, not_found} end. +table_lookup(Table, Key, Type) -> + case lists:keysearch(Key, 1, Table) of + {value, {_, Type, ValueBin}} -> ValueBin; + _ -> undefined + end. + +table_lookup(Table, Key) -> + case lists:keysearch(Key, 1, Table) of + {value, {_, TypeBin, ValueBin}} -> {TypeBin, ValueBin}; + false -> {undefined, undefined} + end. + r(#resource{virtual_host = VHostPath}, Kind, Name) when is_binary(Name) -> #resource{virtual_host = VHostPath, kind = Kind, name = Name}; @@ -240,9 +260,9 @@ r(VHostPath, Kind) when is_binary(VHostPath) -> r_arg(#resource{virtual_host = VHostPath}, Kind, Table, Key) -> r_arg(VHostPath, Kind, Table, Key); r_arg(VHostPath, Kind, Table, Key) -> - case lists:keysearch(Key, 1, Table) of - {value, {_, longstr, NameBin}} -> r(VHostPath, Kind, NameBin); - false -> undefined + case table_lookup(Table, Key, longstr) of + undefined -> undefined; + NameBin -> r(VHostPath, Kind, NameBin) end. rs(#resource{virtual_host = VHostPath, kind = Kind, name = Name}) -> |
