diff options
| author | Rob Harrop <rob@rabbitmq.com> | 2011-06-13 10:32:45 +0100 |
|---|---|---|
| committer | Rob Harrop <rob@rabbitmq.com> | 2011-06-13 10:32:45 +0100 |
| commit | 44d75673aa6c6c8c429072124bf4e391b9c93bd7 (patch) | |
| tree | 0e111b2ac8960710a320a84ab9b66abb9f5d5ef0 | |
| parent | 6e508e646db617d2a98fa650cb0fd1c4877f4f68 (diff) | |
| parent | cc4bf8586cfa9d0771ad5c51556f8de972d76782 (diff) | |
| download | rabbitmq-server-git-44d75673aa6c6c8c429072124bf4e391b9c93bd7.tar.gz | |
Merge with default
| -rw-r--r-- | src/rabbit_amqqueue.erl | 31 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 126 | ||||
| -rw-r--r-- | src/rabbit_backing_queue.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 46 |
5 files changed, 165 insertions, 46 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index c870374084..619ee64170 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -298,28 +298,43 @@ assert_args_equivalence(#amqqueue{name = QueueName, arguments = Args}, rabbit_misc:assert_args_equivalence(Args, RequiredArgs, QueueName, [<<"x-expires">>, <<"x-message-ttl">>]). -check_declare_arguments(QueueName, Args) -> - [case Fun(rabbit_misc:table_lookup(Args, Key)) of +check_declare_arguments(QueueName = #resource{virtual_host = VHostPath}, + Args) -> + [case Fun(rabbit_misc:table_lookup(Args, Key), VHostPath) of ok -> ok; {error, Error} -> rabbit_misc:protocol_error( precondition_failed, "invalid arg '~s' for ~s: ~w", [Key, rabbit_misc:rs(QueueName), Error]) - end || {Key, Fun} <- - [{<<"x-expires">>, fun check_integer_argument/1}, - {<<"x-message-ttl">>, fun check_integer_argument/1}]], + end || + {Key, Fun} <- + [{<<"x-expires">>, fun check_integer_argument/2}, + {<<"x-message-ttl">>, fun check_integer_argument/2}, + {<<"x-dead-letter-exchange">>, fun check_exchange_argument/2}]], ok. -check_integer_argument(undefined) -> +check_integer_argument(undefined, _VHostPath) -> ok; -check_integer_argument({Type, Val}) when Val > 0 -> +check_integer_argument({Type, Val}, _VHostPath) when Val > 0 -> case lists:member(Type, ?INTEGER_ARG_TYPES) of true -> ok; false -> {error, {unacceptable_type, Type}} end; -check_integer_argument({_Type, Val}) -> +check_integer_argument({_Type, Val}, _VHostPath) -> {error, {value_zero_or_less, Val}}. +check_exchange_argument(undefined, _VHostPath) -> + ok; +check_exchange_argument({longstr, Val}, VHostPath) -> + case rabbit_exchange:lookup(rabbit_misc:r(VHostPath, exchange, Val)) of + {ok, _Exchange} -> ok; + {error, not_found} -> {error, {non_existent_exchange, Val}} + end; +check_exchange_argument({Type, _Val}, _VHostPath) -> + {error, {unacceptable_type, Type}}. + + + list(VHostPath) -> mnesia:dirty_match_object( rabbit_queue, diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 1e5ad3490c..1c3277d6e4 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -48,7 +48,8 @@ stats_timer, msg_id_to_channel, ttl, - ttl_timer_ref + ttl_timer_ref, + dead_letter_exchange }). -record(consumer, {tag, ack_required}). @@ -98,20 +99,21 @@ init(Q) -> ?LOGDEBUG("Queue starting - ~p~n", [Q]), process_flag(trap_exit, true), - {ok, #q{q = Q#amqqueue{pid = self()}, - exclusive_consumer = none, - has_had_consumers = false, - backing_queue = backing_queue_module(Q), - backing_queue_state = undefined, - active_consumers = queue:new(), - blocked_consumers = queue:new(), - expires = undefined, - sync_timer_ref = undefined, - rate_timer_ref = undefined, - expiry_timer_ref = undefined, - ttl = undefined, - stats_timer = rabbit_event:init_stats_timer(), - msg_id_to_channel = dict:new()}, hibernate, + {ok, #q{q = Q#amqqueue{pid = self()}, + exclusive_consumer = none, + has_had_consumers = false, + backing_queue = backing_queue_module(Q), + backing_queue_state = undefined, + active_consumers = queue:new(), + blocked_consumers = queue:new(), + expires = undefined, + sync_timer_ref = undefined, + rate_timer_ref = undefined, + expiry_timer_ref = undefined, + ttl = undefined, + dead_letter_exchange = undefined, + stats_timer = rabbit_event:init_stats_timer(), + msg_id_to_channel = dict:new()}, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. terminate(shutdown = R, State = #q{backing_queue = BQ}) -> @@ -119,16 +121,18 @@ terminate(shutdown = R, State = #q{backing_queue = BQ}) -> terminate({shutdown, _} = R, State = #q{backing_queue = BQ}) -> terminate_shutdown(fun (BQS) -> BQ:terminate(R, BQS) end, State); terminate(Reason, State = #q{backing_queue = BQ}) -> + State1 = maybe_dead_letter_queue(queue_deleted, State), %% FIXME: How do we cancel active subscriptions? terminate_shutdown(fun (BQS) -> + rabbit_event:notify( queue_deleted, [{pid, self()}]), BQS1 = BQ:delete_and_terminate(Reason, BQS), %% don't care if the internal delete %% doesn't return 'ok'. - rabbit_amqqueue:internal_delete(qname(State)), + rabbit_amqqueue:internal_delete(qname(State1)), BQS1 - end, State). + end, State1). code_change(_OldVsn, State, _Extra) -> {ok, State}. @@ -178,12 +182,19 @@ process_args(State = #q{q = #amqqueue{arguments = Arguments}}) -> undefined -> State1 end end, State, [{<<"x-expires">>, fun init_expires/2}, - {<<"x-message-ttl">>, fun init_ttl/2}]). + {<<"x-message-ttl">>, fun init_ttl/2}, + {<<"x-dead-letter-exchange">>, + fun init_dead_letter_exchange/2}]). init_expires(Expires, State) -> ensure_expiry_timer(State#q{expires = Expires}). init_ttl(TTL, State) -> drop_expired_messages(State#q{ttl = TTL}). +init_dead_letter_exchange(DLE, State = #q{q = #amqqueue{ + name = #resource{ + virtual_host = VHostPath}}}) -> + State#q{dead_letter_exchange = rabbit_misc:r(VHostPath, exchange, DLE)}. + terminate_shutdown(Fun, State) -> State1 = #q{backing_queue = BQ, backing_queue_state = BQS} = stop_sync_timer(stop_rate_timer(State)), @@ -716,6 +727,7 @@ drop_expired_messages(State = #q{backing_queue_state = BQS, Now = now_micros(), BQS1 = BQ:dropwhile( fun (#message_properties{expiry = Expiry}) -> Now > Expiry end, + dead_letter_callback_fun(expired, State), BQS), ensure_ttl_timer(State#q{backing_queue_state = BQS1}). @@ -733,6 +745,69 @@ ensure_ttl_timer(State = #q{backing_queue = BQ, ensure_ttl_timer(State) -> State. +dead_letter_callback_fun(_Reason, #q{dead_letter_exchange = undefined}) -> + fun(_MsgFun, LookupState) -> LookupState end; +dead_letter_callback_fun(Reason, State) -> + fun(MsgFun, LookupState) -> + {Msg, LookupState1} = MsgFun(LookupState), + dead_letter_msg(Msg, Reason, State), + LookupState1 + end. + +maybe_dead_letter_queue(_Reason, State = #q{ + dead_letter_exchange = undefined}) -> + State; +maybe_dead_letter_queue(Reason, State = #q{ + backing_queue_state = BQS, + backing_queue = BQ}) -> + case BQ:fetch(false, BQS) of + {empty, BQS1} -> + State#q{backing_queue_state = BQS1}; + {{Msg, _IsDelivered, _AckTag, _Remaining}, BQS1} -> + dead_letter_msg(Msg, Reason, State), + maybe_dead_letter_queue(Reason, State#q{backing_queue_state = BQS1}) + end. + +dead_letter_msg(Msg, Reason, State = #q{dead_letter_exchange = DLE}) -> + %% Should this be lookup_or_die? Do we really want to stop the + %% message from being discarded if the exchange is not there? + Exchange = rabbit_exchange:lookup_or_die(DLE), + + %% Should do something with the routing result here, but what? + %% Are we going to stop the message from being discarded if + %% unroutable? At the least we should write to the error log if + %% the routing fails. + rabbit_exchange:publish( + Exchange, + rabbit_basic:delivery(false, false, none, + make_dead_letter_msg(Reason, Msg, State), + undefined)), + ok. + +make_dead_letter_msg(Reason, + Msg = #basic_message{ + content = Content = #content{ + properties = Props = #'P_basic'{ + headers = Headers}}}, + State) -> + + #resource{name = QName} = qname(State), + + DeathHeaders = [{<<"x-death-reason">>, longstr, + list_to_binary(atom_to_list(Reason))}, + {<<"x-death-queue">>, longstr, QName}], + + Headers1 = case Headers of + undefined -> DeathHeaders; + _ -> Headers ++ DeathHeaders + end, + Content1 = + rabbit_binary_generator:clear_encoded_content( + Content#content{properties = Props#'P_basic'{headers = Headers1}}), + + Msg#basic_message{id = rabbit_guid:guid(), content = Content1}. + + now_micros() -> timer:now_diff(now(), {0,0,0}). infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items]. @@ -1024,10 +1099,11 @@ handle_call({delete, IfUnused, IfEmpty}, _From, {stop, normal, {ok, BQ:len(BQS)}, State} end; -handle_call(purge, _From, State = #q{backing_queue = BQ, - backing_queue_state = BQS}) -> +handle_call(purge, _From, State = #q{backing_queue = BQ}) -> + State1 = #q{backing_queue_state = BQS} = + maybe_dead_letter_queue(queue_purged, State), {Count, BQS1} = BQ:purge(BQS), - reply({ok, Count}, State#q{backing_queue_state = BQS1}); + reply({ok, Count}, State1#q{backing_queue_state = BQS1}); handle_call({requeue, AckTags, ChPid}, From, State) -> gen_server2:reply(From, ok), @@ -1064,7 +1140,9 @@ handle_cast({ack, Txn, AckTags, ChPid}, case Txn of none -> ChAckTags1 = subtract_acks(ChAckTags, AckTags), NewC = C#cr{acktags = ChAckTags1}, - {_Guids, BQS1} = BQ:ack(AckTags, BQS), + {_Guids, BQS1} = BQ:ack(AckTags, + fun(_, BQS0) -> BQS0 end, + BQS), {NewC, State#q{backing_queue_state = BQS1}}; _ -> BQS1 = BQ:tx_ack(Txn, AckTags, BQS), {C#cr{txn = Txn}, @@ -1085,7 +1163,9 @@ handle_cast({reject, AckTags, Requeue, ChPid}, maybe_store_ch_record(C#cr{acktags = ChAckTags1}), noreply(case Requeue of true -> requeue_and_run(AckTags, State); - false -> {_Guids, BQS1} = BQ:ack(AckTags, BQS), + false -> Fun = dead_letter_callback_fun( + rejected, State), + {_Guids, BQS1} = BQ:ack(AckTags, Fun, BQS), State#q{backing_queue_state = BQS1} end) end; diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index 217ad3eb5b..3d7fb4895d 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -98,14 +98,14 @@ behaviour_info(callbacks) -> %% Drop messages from the head of the queue while the supplied %% predicate returns true. - {dropwhile, 2}, + {dropwhile, 3}, %% Produce the next message. {fetch, 2}, %% Acktags supplied are for messages which can now be forgotten %% about. Must return 1 msg_id per Ack, in the same order as Acks. - {ack, 2}, + {ack, 3}, %% A publish, but in the context of a transaction. {tx_publish, 5}, diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 3f4aa54e7f..41053aeee9 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -2175,7 +2175,9 @@ test_dropwhile(VQ0) -> VQ2 = rabbit_variable_queue:dropwhile( fun(#message_properties { expiry = Expiry }) -> Expiry =< 5 - end, VQ1), + end, + fun(_Msg) -> ok end, + VQ1), %% fetch five now VQ3 = lists:foldl(fun (_N, VQN) -> diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index a167cca0c5..f75095346f 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -18,8 +18,8 @@ -export([init/4, terminate/2, delete_and_terminate/2, purge/1, publish/4, publish_delivered/5, drain_confirmed/1, - fetch/2, ack/2, tx_publish/5, tx_ack/3, tx_rollback/2, tx_commit/4, - requeue/3, len/1, is_empty/1, dropwhile/2, + fetch/2, ack/3, tx_publish/5, tx_ack/3, tx_rollback/2, tx_commit/4, + requeue/3, len/1, is_empty/1, dropwhile/3, set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1, handle_pre_hibernate/1, status/1, invoke/3, is_duplicate/3, discard/3, @@ -559,18 +559,22 @@ publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent, drain_confirmed(State = #vqstate { confirmed = C }) -> {gb_sets:to_list(C), State #vqstate { confirmed = gb_sets:new() }}. -dropwhile(Pred, State) -> - {_OkOrEmpty, State1} = dropwhile1(Pred, State), +dropwhile(Pred, DropFun, State) -> + {_OkOrEmpty, State1} = dropwhile1(Pred, DropFun, State), a(State1). -dropwhile1(Pred, State) -> +dropwhile1(Pred, DropFun, State) -> internal_queue_out( fun(MsgStatus = #msg_status { msg_props = MsgProps }, State1) -> case Pred(MsgProps) of - true -> {_, State2} = internal_fetch(false, MsgStatus, - State1), - dropwhile1(Pred, State2); - false -> {ok, in_r(MsgStatus, State1)} + true -> + {MsgStatus1, State2} = + DropFun(read_msg_callback(), {MsgStatus, State1}), + + {_, State3} = internal_fetch(false, MsgStatus1, State2), + dropwhile1(Pred, DropFun, State3); + false -> + {ok, in_r(MsgStatus, State1)} end end, State). @@ -592,6 +596,7 @@ fetch(AckRequired, State) -> internal_fetch(AckRequired, MsgStatus1, State2) end, State). + internal_queue_out(Fun, State = #vqstate { q4 = Q4 }) -> case queue:out(Q4) of {empty, _Q4} -> @@ -603,6 +608,19 @@ internal_queue_out(Fun, State = #vqstate { q4 = Q4 }) -> Fun(MsgStatus, State #vqstate { q4 = Q4a }) end. +read_msg_callback() -> + fun({MsgStatus = #msg_status {}, State}) -> + {MsgStatus1 = #msg_status { msg = Msg }, State1} = + read_msg(MsgStatus, State), + {Msg, {MsgStatus1, State1}}; + ({{IsPersistent, MsgId, _MsgProps}, State}) -> + #vqstate { msg_store_clients = MSCState } = State, + {{ok, Msg = #basic_message{}}, MSCState1} = + msg_store_read(MSCState, IsPersistent, MsgId), + {Msg, {undefined, State #vqstate { + msg_store_clients = MSCState1 }}} + end. + read_msg(MsgStatus = #msg_status { msg = undefined, msg_id = MsgId, is_persistent = IsPersistent }, @@ -668,9 +686,13 @@ internal_fetch(AckRequired, MsgStatus = #msg_status { len = Len1, persistent_count = PCount1 })}. -ack(AckTags, State) -> +ack(AckTags, Fun, State) -> {MsgIds, State1} = ack(fun msg_store_remove/3, - fun (_, State0) -> State0 end, + fun (MsgStatus = #msg_status {}, State0) -> + {_, State2} = Fun(read_msg_callback(), + {MsgStatus, State0}), + State2 + end, AckTags, State), {MsgIds, a(State1)}. @@ -1207,7 +1229,7 @@ tx_commit_index(State = #vqstate { on_sync = #sync { Acks = lists:append(SAcks), Pubs = [{Msg, Fun(MsgProps)} || {Fun, PubsN} <- lists:reverse(SPubs), {Msg, MsgProps} <- lists:reverse(PubsN)], - {_MsgIds, State1} = ack(Acks, State), + {_MsgIds, State1} = ack(Acks, fun(_, State0) -> State0 end, State), {SeqIds, State2 = #vqstate { index_state = IndexState }} = lists:foldl( fun ({Msg = #basic_message { is_persistent = IsPersistent }, |
