diff options
| author | Rob Harrop <rob@rabbitmq.com> | 2011-06-07 13:55:05 +0100 |
|---|---|---|
| committer | Rob Harrop <rob@rabbitmq.com> | 2011-06-07 13:55:05 +0100 |
| commit | 9d36f516d5eeaa303619b45159473fabb3015e2e (patch) | |
| tree | 757cb911bc6fcfa5780552d7dce5573f2667fd95 | |
| parent | febeb8df17ac5a0b024bce2c4f73c1a5b269d0da (diff) | |
| parent | 39a0d03500351179d771ad80eec8a697ba3ae54a (diff) | |
| download | rabbitmq-server-git-9d36f516d5eeaa303619b45159473fabb3015e2e.tar.gz | |
Merge with default and rough outline of DL for ttl expiry
| -rw-r--r-- | src/rabbit_amqqueue.erl | 31 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 77 | ||||
| -rw-r--r-- | src/rabbit_backing_queue.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 17 |
5 files changed, 98 insertions, 33 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index c870374084..619ee64170 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -298,28 +298,43 @@ assert_args_equivalence(#amqqueue{name = QueueName, arguments = Args}, rabbit_misc:assert_args_equivalence(Args, RequiredArgs, QueueName, [<<"x-expires">>, <<"x-message-ttl">>]). -check_declare_arguments(QueueName, Args) -> - [case Fun(rabbit_misc:table_lookup(Args, Key)) of +check_declare_arguments(QueueName = #resource{virtual_host = VHostPath}, + Args) -> + [case Fun(rabbit_misc:table_lookup(Args, Key), VHostPath) of ok -> ok; {error, Error} -> rabbit_misc:protocol_error( precondition_failed, "invalid arg '~s' for ~s: ~w", [Key, rabbit_misc:rs(QueueName), Error]) - end || {Key, Fun} <- - [{<<"x-expires">>, fun check_integer_argument/1}, - {<<"x-message-ttl">>, fun check_integer_argument/1}]], + end || + {Key, Fun} <- + [{<<"x-expires">>, fun check_integer_argument/2}, + {<<"x-message-ttl">>, fun check_integer_argument/2}, + {<<"x-dead-letter-exchange">>, fun check_exchange_argument/2}]], ok. -check_integer_argument(undefined) -> +check_integer_argument(undefined, _VHostPath) -> ok; -check_integer_argument({Type, Val}) when Val > 0 -> +check_integer_argument({Type, Val}, _VHostPath) when Val > 0 -> case lists:member(Type, ?INTEGER_ARG_TYPES) of true -> ok; false -> {error, {unacceptable_type, Type}} end; -check_integer_argument({_Type, Val}) -> +check_integer_argument({_Type, Val}, _VHostPath) -> {error, {value_zero_or_less, Val}}. +check_exchange_argument(undefined, _VHostPath) -> + ok; +check_exchange_argument({longstr, Val}, VHostPath) -> + case rabbit_exchange:lookup(rabbit_misc:r(VHostPath, exchange, Val)) of + {ok, _Exchange} -> ok; + {error, not_found} -> {error, {non_existent_exchange, Val}} + end; +check_exchange_argument({Type, _Val}, _VHostPath) -> + {error, {unacceptable_type, Type}}. + + + list(VHostPath) -> mnesia:dirty_match_object( rabbit_queue, diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 07a24af828..074768f409 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -48,7 +48,8 @@ stats_timer, msg_id_to_channel, ttl, - ttl_timer_ref + ttl_timer_ref, + dead_letter_exchange }). -record(consumer, {tag, ack_required}). @@ -98,20 +99,21 @@ init(Q) -> ?LOGDEBUG("Queue starting - ~p~n", [Q]), process_flag(trap_exit, true), - {ok, #q{q = Q#amqqueue{pid = self()}, - exclusive_consumer = none, - has_had_consumers = false, - backing_queue = backing_queue_module(Q), - backing_queue_state = undefined, - active_consumers = queue:new(), - blocked_consumers = queue:new(), - expires = undefined, - sync_timer_ref = undefined, - rate_timer_ref = undefined, - expiry_timer_ref = undefined, - ttl = undefined, - stats_timer = rabbit_event:init_stats_timer(), - msg_id_to_channel = dict:new()}, hibernate, + {ok, #q{q = Q#amqqueue{pid = self()}, + exclusive_consumer = none, + has_had_consumers = false, + backing_queue = backing_queue_module(Q), + backing_queue_state = undefined, + active_consumers = queue:new(), + blocked_consumers = queue:new(), + expires = undefined, + sync_timer_ref = undefined, + rate_timer_ref = undefined, + expiry_timer_ref = undefined, + ttl = undefined, + dead_letter_exchange = undefined, + stats_timer = rabbit_event:init_stats_timer(), + msg_id_to_channel = dict:new()}, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. terminate(shutdown = R, State = #q{backing_queue = BQ}) -> @@ -178,12 +180,19 @@ process_args(State = #q{q = #amqqueue{arguments = Arguments}}) -> undefined -> State1 end end, State, [{<<"x-expires">>, fun init_expires/2}, - {<<"x-message-ttl">>, fun init_ttl/2}]). + {<<"x-message-ttl">>, fun init_ttl/2}, + {<<"x-dead-letter-exchange">>, + fun init_dead_letter_exchange/2}]). init_expires(Expires, State) -> ensure_expiry_timer(State#q{expires = Expires}). init_ttl(TTL, State) -> drop_expired_messages(State#q{ttl = TTL}). +init_dead_letter_exchange(DLE, State = #q{q = #amqqueue{ + name = #resource{ + virtual_host = VHostPath}}}) -> + State#q{dead_letter_exchange = rabbit_misc:r(VHostPath, exchange, DLE)}. + terminate_shutdown(Fun, State) -> State1 = #q{backing_queue = BQ, backing_queue_state = BQS} = stop_sync_timer(stop_rate_timer(State)), @@ -718,6 +727,7 @@ drop_expired_messages(State = #q{backing_queue_state = BQS, Now = now_micros(), BQS1 = BQ:dropwhile( fun (#message_properties{expiry = Expiry}) -> Now > Expiry end, + fun (Msg) -> maybe_dead_letter(Msg, expired_queue_ttl, State) end, BQS), ensure_ttl_timer(State#q{backing_queue_state = BQS1}). @@ -735,6 +745,41 @@ ensure_ttl_timer(State = #q{backing_queue = BQ, ensure_ttl_timer(State) -> State. +maybe_dead_letter(Msg, _Reason, #q{dead_letter_exchange = undefined}) -> + ok; +maybe_dead_letter(Msg = #basic_message{content = Content}, + Reason, #q{dead_letter_exchange = DLE}) -> + %% Should this be lookup_or_die? Do we really want to stop the + %% message from being discarded if the exchange is not there? + Exchange = rabbit_exchange:lookup_or_die(DLE), + + %% Should do something with the routing result here, but what? + %% Are we going to stop the message from being discarded if + %% unroutable? At the least we should write to the error log if + %% the routing fails. + rabbit_exchange:publish( + Exchange, + rabbit_basic:delivery(false, false, none, + record_death_reason(Reason, Msg), undefined)), + ok. + +record_death_reason(Reason, + Msg = #basic_message{ + content = Content = #content{ + properties = Props = #'P_basic'{ + headers = Headers}}}) -> + ReasonTuple = {<<"x-death-reason">>, longstr, + list_to_binary(atom_to_list(Reason))}, + Headers1 = case Headers of + undefined -> [ReasonTuple]; + _ -> [ReasonTuple | Headers] + end, + Msg#basic_message{ + content = Content#content{ + properties = Props#'P_basic'{ + headers = Headers1}}}. + + now_micros() -> timer:now_diff(now(), {0,0,0}). infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items]. diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index 217ad3eb5b..96aeb4cad8 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -98,7 +98,7 @@ behaviour_info(callbacks) -> %% Drop messages from the head of the queue while the supplied %% predicate returns true. - {dropwhile, 2}, + {dropwhile, 3}, %% Produce the next message. {fetch, 2}, diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 3f4aa54e7f..41053aeee9 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -2175,7 +2175,9 @@ test_dropwhile(VQ0) -> VQ2 = rabbit_variable_queue:dropwhile( fun(#message_properties { expiry = Expiry }) -> Expiry =< 5 - end, VQ1), + end, + fun(_Msg) -> ok end, + VQ1), %% fetch five now VQ3 = lists:foldl(fun (_N, VQN) -> diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index a167cca0c5..c777ad4d02 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -19,7 +19,7 @@ -export([init/4, terminate/2, delete_and_terminate/2, purge/1, publish/4, publish_delivered/5, drain_confirmed/1, fetch/2, ack/2, tx_publish/5, tx_ack/3, tx_rollback/2, tx_commit/4, - requeue/3, len/1, is_empty/1, dropwhile/2, + requeue/3, len/1, is_empty/1, dropwhile/3, set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1, handle_pre_hibernate/1, status/1, invoke/3, is_duplicate/3, discard/3, @@ -559,17 +559,19 @@ publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent, drain_confirmed(State = #vqstate { confirmed = C }) -> {gb_sets:to_list(C), State #vqstate { confirmed = gb_sets:new() }}. -dropwhile(Pred, State) -> - {_OkOrEmpty, State1} = dropwhile1(Pred, State), +dropwhile(Pred, DropFun, State) -> + {_OkOrEmpty, State1} = dropwhile1(Pred, DropFun, State), a(State1). -dropwhile1(Pred, State) -> +dropwhile1(Pred, DropFun, State) -> internal_queue_out( - fun(MsgStatus = #msg_status { msg_props = MsgProps }, State1) -> + fun(MsgStatus = #msg_status { msg_props = MsgProps, + msg = Msg }, State1) -> case Pred(MsgProps) of - true -> {_, State2} = internal_fetch(false, MsgStatus, + true -> DropFun(Msg), + {_, State2} = internal_fetch(false, MsgStatus, State1), - dropwhile1(Pred, State2); + dropwhile1(Pred, DropFun, State2); false -> {ok, in_r(MsgStatus, State1)} end end, State). @@ -592,6 +594,7 @@ fetch(AckRequired, State) -> internal_fetch(AckRequired, MsgStatus1, State2) end, State). + internal_queue_out(Fun, State = #vqstate { q4 = Q4 }) -> case queue:out(Q4) of {empty, _Q4} -> |
