diff options
| author | Simon MacMullen <simon@rabbitmq.com> | 2011-10-20 17:01:50 +0100 |
|---|---|---|
| committer | Simon MacMullen <simon@rabbitmq.com> | 2011-10-20 17:01:50 +0100 |
| commit | 61f48f0867ca77de86a224db3ba1855497dc722b (patch) | |
| tree | b3ca7bb160b878065df7ae8b5a6c81698dfc7d23 /src | |
| parent | e8dfee98ae3033453a4bb83baa464f1e822fdded (diff) | |
| parent | 0128bf7cce3568e00753f42a516646eb04399c2d (diff) | |
| download | rabbitmq-server-git-61f48f0867ca77de86a224db3ba1855497dc722b.tar.gz | |
Crude merge-from-default to help examine this thing. Currently some tests fail due to q_p blowing up with failed assertions, I don't know if it was like that before.
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue.erl | 43 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 89 | ||||
| -rw-r--r-- | src/rabbit_backing_queue.erl | 12 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_master.erl | 26 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 22 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 41 |
7 files changed, 181 insertions, 56 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index b3e92b6918..108e57082b 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -316,34 +316,49 @@ assert_args_equivalence(#amqqueue{name = QueueName, arguments = Args}, Args, RequiredArgs, QueueName, [<<"x-expires">>, <<"x-message-ttl">>, <<"x-ha-policy">>]). -check_declare_arguments(QueueName, Args) -> - [case Fun(rabbit_misc:table_lookup(Args, Key), Args) of +check_declare_arguments(QueueName = #resource{virtual_host = VHostPath}, + Args) -> + [case Fun(rabbit_misc:table_lookup(Args, Key), Args, VHostPath) of ok -> ok; {error, Error} -> rabbit_misc:protocol_error( precondition_failed, "invalid arg '~s' for ~s: ~255p", [Key, rabbit_misc:rs(QueueName), Error]) - end || {Key, Fun} <- - [{<<"x-expires">>, fun check_integer_argument/2}, - {<<"x-message-ttl">>, fun check_integer_argument/2}, - {<<"x-ha-policy">>, fun check_ha_policy_argument/2}]], + end || + {Key, Fun} <- + [{<<"x-expires">>, fun check_integer_argument/3}, + {<<"x-message-ttl">>, fun check_integer_argument/3}, + {<<"x-ha-policy">>, fun check_ha_policy_argument/3}, + {<<"x-dead-letter-exchange">>, fun check_exchange_argument/3}]], ok. -check_integer_argument(undefined, _Args) -> +check_integer_argument(undefined, _Args, _VHostPath) -> ok; -check_integer_argument({Type, Val}, _Args) when Val > 0 -> +check_integer_argument({Type, Val}, _Args, _VHostPath) when Val > 0 -> case lists:member(Type, ?INTEGER_ARG_TYPES) of true -> ok; false -> {error, {unacceptable_type, Type}} end; -check_integer_argument({_Type, Val}, _Args) -> +check_integer_argument({_Type, Val}, _Args, _VHostPath) -> {error, {value_zero_or_less, Val}}. -check_ha_policy_argument(undefined, _Args) -> +check_exchange_argument(undefined, _Args, _VHostPath) -> ok; -check_ha_policy_argument({longstr, <<"all">>}, _Args) -> +check_exchange_argument({longstr, Val}, _Args, VHostPath) -> + case rabbit_exchange:lookup(rabbit_misc:r(VHostPath, exchange, Val)) of + {ok, _Exchange} -> ok; + {error, not_found} -> {error, {non_existent_exchange, Val}} + end; +check_exchange_argument({Type, _Val}, _Args, _VHostPath) -> + {error, {unacceptable_type, Type}}. + + + +check_ha_policy_argument(undefined, _Args, _VHostPath) -> + ok; +check_ha_policy_argument({longstr, <<"all">>}, _Args, _VHostPath) -> ok; -check_ha_policy_argument({longstr, <<"nodes">>}, Args) -> +check_ha_policy_argument({longstr, <<"nodes">>}, Args, _VHostPath) -> case rabbit_misc:table_lookup(Args, <<"x-ha-policy-params">>) of undefined -> {error, {require, 'x-ha-policy-params'}}; @@ -359,9 +374,9 @@ check_ha_policy_argument({longstr, <<"nodes">>}, Args) -> {Type, _} -> {error, {ha_nodes_policy_params_not_array_of_longstr, Type}} end; -check_ha_policy_argument({longstr, Policy}, _Args) -> +check_ha_policy_argument({longstr, Policy}, _Args, _VHostPath) -> {error, {invalid_ha_policy, Policy}}; -check_ha_policy_argument({Type, _}, _Args) -> +check_ha_policy_argument({Type, _}, _Args, _VHostPath) -> {error, {unacceptable_type, Type}}. list() -> diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 46f6674b04..8b5e984a12 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -49,7 +49,8 @@ stats_timer, msg_id_to_channel, ttl, - ttl_timer_ref + ttl_timer_ref, + dlx }). -record(consumer, {tag, ack_required}). @@ -129,6 +130,7 @@ init(Q) -> rate_timer_ref = undefined, expiry_timer_ref = undefined, ttl = undefined, + dlx = undefined, msg_id_to_channel = dict:new()}, {ok, rabbit_event:init_stats_timer(State, #q.stats_timer), hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. @@ -165,17 +167,19 @@ terminate({shutdown, _} = R, State = #q{backing_queue = BQ}) -> terminate_shutdown(fun (BQS) -> BQ:terminate(R, BQS) end, State); terminate(Reason, State = #q{q = #amqqueue{name = QName}, backing_queue = BQ}) -> + State1 = maybe_dead_letter_queue(queue_deleted, State), %% FIXME: How do we cancel active subscriptions? terminate_shutdown(fun (BQS) -> + rabbit_event:notify( queue_deleted, [{pid, self()}, {name, QName}]), BQS1 = BQ:delete_and_terminate(Reason, BQS), %% don't care if the internal delete %% doesn't return 'ok'. - rabbit_amqqueue:internal_delete(qname(State)), + rabbit_amqqueue:internal_delete(qname(State1)), BQS1 - end, State). + end, State1). code_change(_OldVsn, State, _Extra) -> {ok, State}. @@ -218,12 +222,18 @@ process_args(State = #q{q = #amqqueue{arguments = Arguments}}) -> undefined -> State1 end end, State, [{<<"x-expires">>, fun init_expires/2}, - {<<"x-message-ttl">>, fun init_ttl/2}]). + {<<"x-message-ttl">>, fun init_ttl/2}, + {<<"x-dead-letter-exchange">>, + fun init_dlx/2}]). init_expires(Expires, State) -> ensure_expiry_timer(State#q{expires = Expires}). init_ttl(TTL, State) -> drop_expired_messages(State#q{ttl = TTL}). +init_dlx(DLX, State = #q{q = #amqqueue{name = #resource{ + virtual_host = VHostPath}}}) -> + State#q{dlx = rabbit_misc:r(VHostPath, exchange, DLX)}. + terminate_shutdown(Fun, State) -> State1 = #q{backing_queue_state = BQS} = stop_sync_timer(stop_rate_timer(State)), @@ -688,6 +698,7 @@ drop_expired_messages(State = #q{backing_queue_state = BQS, Now = now_micros(), BQS1 = BQ:dropwhile( fun (#message_properties{expiry = Expiry}) -> Now > Expiry end, + dead_letter_callback_fun(expired, State), BQS), ensure_ttl_timer(State#q{backing_queue_state = BQS1}). @@ -704,6 +715,62 @@ ensure_ttl_timer(State = #q{backing_queue = BQ, ensure_ttl_timer(State) -> State. +dead_letter_callback_fun(_Reason, #q{dlx = undefined}) -> + fun(_MsgFun, BQS) -> BQS end; +dead_letter_callback_fun(Reason, State) -> + fun(MsgFun, BQS) -> + {Msg, BQS1} = MsgFun(BQS), + dead_letter_msg(Msg, Reason, State), + BQS1 + end. + +maybe_dead_letter_queue(_Reason, State = #q{dlx = undefined}) -> + State; +maybe_dead_letter_queue(Reason, State = #q{ + backing_queue_state = BQS, + backing_queue = BQ}) -> + case BQ:fetch(false, BQS) of + {empty, BQS1} -> + State#q{backing_queue_state = BQS1}; + {{Msg, _IsDelivered, _AckTag, _Remaining}, BQS1} -> + dead_letter_msg(Msg, Reason, State), + maybe_dead_letter_queue(Reason, State#q{backing_queue_state = BQS1}) + end. + +dead_letter_msg(Msg, Reason, State = #q{dlx = DLX}) -> + Exchange = rabbit_exchange:lookup_or_die(DLX), + + rabbit_basic:publish( + rabbit_basic:delivery( + false, false, make_dead_letter_msg(DLX, Reason, Msg, State), + undefined)), + ok. + +make_dead_letter_msg(DLX, Reason, Msg = #basic_message{content = Content}, + State) -> + + Content1 = #content{ + properties = Props = #'P_basic'{headers = Headers}} = + rabbit_binary_parser:ensure_content_decoded(Content), + + #resource{name = QName} = qname(State), + + DeathHeaders = [{<<"x-death-reason">>, longstr, + list_to_binary(atom_to_list(Reason))}, + {<<"x-death-queue">>, longstr, QName}], + + Headers1 = case Headers of + undefined -> DeathHeaders; + _ -> Headers ++ DeathHeaders + end, + Content2 = + rabbit_binary_generator:clear_encoded_content( + Content1#content{properties = Props#'P_basic'{headers = Headers1}}), + + Msg#basic_message{exchange_name = DLX, id = rabbit_guid:guid(), + content = Content2}. + + now_micros() -> timer:now_diff(now(), {0,0,0}). infos(Items, State) -> @@ -1014,10 +1081,11 @@ handle_call({delete, IfUnused, IfEmpty}, _From, {stop, normal, {ok, BQ:len(BQS)}, State} end; -handle_call(purge, _From, State = #q{backing_queue = BQ, - backing_queue_state = BQS}) -> +handle_call(purge, _From, State = #q{backing_queue = BQ}) -> + State1 = #q{backing_queue_state = BQS} = + maybe_dead_letter_queue(queue_purged, State), {Count, BQS1} = BQ:purge(BQS), - reply({ok, Count}, State#q{backing_queue_state = BQS1}); + reply({ok, Count}, State1#q{backing_queue_state = BQS1}); handle_call({requeue, AckTags, ChPid}, From, State) -> gen_server2:reply(From, ok), @@ -1037,7 +1105,8 @@ handle_cast({ack, AckTags, ChPid}, State) -> ChPid, AckTags, State, fun (State1 = #q{backing_queue = BQ, backing_queue_state = BQS}) -> - {_Guids, BQS1} = BQ:ack(AckTags, BQS), + Fun = fun(_, BQS0) -> BQS0 end, + {_Guids, BQS1} = BQ:ack(AckTags, Fun, BQS), State1#q{backing_queue_state = BQS1} end)); @@ -1048,7 +1117,9 @@ handle_cast({reject, AckTags, Requeue, ChPid}, State) -> backing_queue_state = BQS}) -> case Requeue of true -> requeue_and_run(AckTags, State1); - false -> {_Guids, BQS1} = BQ:ack(AckTags, BQS), + false -> Fun = dead_letter_callback_fun(rejected, + State), + {_Guids, BQS1} = BQ:ack(AckTags, Fun, BQS), State1#q{backing_queue_state = BQS1} end end)); diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index 77278416e7..0952e73424 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -95,15 +95,19 @@ behaviour_info(callbacks) -> {drain_confirmed, 1}, %% Drop messages from the head of the queue while the supplied - %% predicate returns true. - {dropwhile, 2}, + %% predicate returns true. A callback function is supplied + %% allowing callers access to messages that are about to be + %% dropped. + {dropwhile, 3}, %% Produce the next message. {fetch, 2}, %% Acktags supplied are for messages which can now be forgotten - %% about. Must return 1 msg_id per Ack, in the same order as Acks. - {ack, 2}, + %% about. Must return 1 msg_id per Ack, in the same order as + %% Acks. A callback function is supplied allowing callers to + %% access messages that are being acked. + {ack, 3}, %% Reinsert messages into the queue which have already been %% delivered and were pending acknowledgement. diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index 328fe639f7..c4173ec600 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -17,8 +17,8 @@ -module(rabbit_mirror_queue_master). -export([init/3, terminate/2, delete_and_terminate/2, - purge/1, publish/4, publish_delivered/5, fetch/2, ack/2, - requeue/3, len/1, is_empty/1, drain_confirmed/1, dropwhile/2, + purge/1, publish/4, publish_delivered/5, fetch/2, ack/3, + requeue/3, len/1, is_empty/1, drain_confirmed/1, dropwhile/3, set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1, handle_pre_hibernate/1, status/1, invoke/3, is_duplicate/2, discard/3]). @@ -172,12 +172,12 @@ publish_delivered(AckRequired, Msg = #basic_message { id = MsgId }, MsgProps, ensure_monitoring(ChPid, State #state { backing_queue_state = BQS1, ack_msg_id = AM1 })}. -dropwhile(Fun, State = #state { gm = GM, - backing_queue = BQ, - backing_queue_state = BQS, - set_delivered = SetDelivered }) -> +dropwhile(Pred, DropFun, State = #state{gm = GM, + backing_queue = BQ, + backing_queue_state = BQS, + set_delivered = SetDelivered }) -> Len = BQ:len(BQS), - BQS1 = BQ:dropwhile(Fun, BQS), + BQS1 = BQ:dropwhile(Pred, DropFun, BQS), Dropped = Len - BQ:len(BQS1), SetDelivered1 = lists:max([0, SetDelivered - Dropped]), ok = gm:broadcast(GM, {set_length, BQ:len(BQS1)}), @@ -235,15 +235,15 @@ fetch(AckRequired, State = #state { gm = GM, ack_msg_id = AM1 }} end. -ack(AckTags, State = #state { gm = GM, - backing_queue = BQ, - backing_queue_state = BQS, - ack_msg_id = AM }) -> - {MsgIds, BQS1} = BQ:ack(AckTags, BQS), +ack(AckTags, Fun, State = #state { gm = GM, + backing_queue = BQ, + backing_queue_state = BQS, + ack_msg_id = AM }) -> + {MsgIds, BQS1} = BQ:ack(AckTags, Fun, BQS), AM1 = lists:foldl(fun dict:erase/2, AM, AckTags), case MsgIds of [] -> ok; - _ -> ok = gm:broadcast(GM, {ack, MsgIds}) + _ -> ok = gm:broadcast(GM, {ack, Fun, MsgIds}) end, {MsgIds, State #state { backing_queue_state = BQS1, ack_msg_id = AM1 }}. diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index f423760a3d..52511c9637 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -818,12 +818,12 @@ process_instruction({fetch, AckRequired, MsgId, Remaining}, %% we must be shorter than the master State end}; -process_instruction({ack, MsgIds}, +process_instruction({ack, Fun, MsgIds}, State = #state { backing_queue = BQ, backing_queue_state = BQS, msg_id_ack = MA }) -> {AckTags, MA1} = msg_ids_to_acktags(MsgIds, MA), - {MsgIds1, BQS1} = BQ:ack(AckTags, BQS), + {MsgIds1, BQS1} = BQ:ack(AckTags, Fun, BQS), [] = MsgIds1 -- MsgIds, %% ASSERTION {ok, State #state { msg_id_ack = MA1, backing_queue_state = BQS1 }}; diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 3a4f6f84e2..b96934c6c0 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -2294,7 +2294,9 @@ test_dropwhile(VQ0) -> VQ2 = rabbit_variable_queue:dropwhile( fun(#message_properties { expiry = Expiry }) -> Expiry =< 5 - end, VQ1), + end, + dummy_msg_lookup_fun(), + VQ1), %% fetch five now VQ3 = lists:foldl(fun (_N, VQN) -> @@ -2308,13 +2310,18 @@ test_dropwhile(VQ0) -> VQ4. +dummy_msg_lookup_fun() -> + fun(_Fun, State) -> State end. + test_dropwhile_varying_ram_duration(VQ0) -> VQ1 = variable_queue_publish(false, 1, VQ0), VQ2 = rabbit_variable_queue:set_ram_duration_target(0, VQ1), - VQ3 = rabbit_variable_queue:dropwhile(fun(_) -> false end, VQ2), + VQ3 = rabbit_variable_queue:dropwhile( + fun(_) -> false end, dummy_msg_lookup_fun(), VQ2), VQ4 = rabbit_variable_queue:set_ram_duration_target(infinity, VQ3), VQ5 = variable_queue_publish(false, 1, VQ4), - rabbit_variable_queue:dropwhile(fun(_) -> false end, VQ5). + rabbit_variable_queue:dropwhile( + fun(_) -> false end, dummy_msg_lookup_fun(), VQ5). test_variable_queue_dynamic_duration_change(VQ0) -> SegmentSize = rabbit_queue_index:next_segment_boundary(0), @@ -2339,7 +2346,8 @@ test_variable_queue_dynamic_duration_change(VQ0) -> %% drain {VQ8, AckTags} = variable_queue_fetch(Len, false, false, Len, VQ7), - {_Guids, VQ9} = rabbit_variable_queue:ack(AckTags, VQ8), + {_Guids, VQ9} = rabbit_variable_queue:ack(AckTags, + dummy_msg_lookup_fun(), VQ8), {empty, VQ10} = rabbit_variable_queue:fetch(true, VQ9), VQ10. @@ -2349,7 +2357,8 @@ publish_fetch_and_ack(0, _Len, VQ0) -> publish_fetch_and_ack(N, Len, VQ0) -> VQ1 = variable_queue_publish(false, 1, VQ0), {{_Msg, false, AckTag, Len}, VQ2} = rabbit_variable_queue:fetch(true, VQ1), - {_Guids, VQ3} = rabbit_variable_queue:ack([AckTag], VQ2), + {_Guids, VQ3} = rabbit_variable_queue:ack([AckTag], + dummy_msg_lookup_fun(), VQ2), publish_fetch_and_ack(N-1, Len, VQ3). test_variable_queue_partial_segments_delta_thing(VQ0) -> @@ -2383,7 +2392,8 @@ test_variable_queue_partial_segments_delta_thing(VQ0) -> {len, HalfSegment + 1}]), {VQ8, AckTags1} = variable_queue_fetch(HalfSegment + 1, true, false, HalfSegment + 1, VQ7), - {_Guids, VQ9} = rabbit_variable_queue:ack(AckTags ++ AckTags1, VQ8), + {_Guids, VQ9} = rabbit_variable_queue:ack(AckTags ++ AckTags1, + dummy_msg_lookup_fun(), VQ8), %% should be empty now {empty, VQ10} = rabbit_variable_queue:fetch(true, VQ9), VQ10. diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 895fc388a7..c51640bab1 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -18,7 +18,7 @@ -export([init/3, terminate/2, delete_and_terminate/2, purge/1, publish/4, publish_delivered/5, drain_confirmed/1, - dropwhile/2, fetch/2, ack/2, requeue/3, len/1, is_empty/1, + dropwhile/3, fetch/2, ack/3, requeue/3, len/1, is_empty/1, set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1, handle_pre_hibernate/1, status/1, invoke/3, is_duplicate/2, discard/3, @@ -581,14 +581,14 @@ drain_confirmed(State = #vqstate { confirmed = C }) -> confirmed = gb_sets:new() }} end. -dropwhile(Pred, State) -> +dropwhile(Pred, DropFun, State) -> case queue_out(State) of {empty, State1} -> a(State1); {{value, MsgStatus = #msg_status { msg_props = MsgProps }}, State1} -> case Pred(MsgProps) of - true -> {_, State2} = internal_fetch(false, MsgStatus, State1), - dropwhile(Pred, State2); + true -> State2 = DropFun(read_msg_callback(MsgStatus), State1), + dropwhile(Pred, DropFun, State2); false -> a(in_r(MsgStatus, State1)) end end. @@ -603,20 +603,45 @@ fetch(AckRequired, State) -> {MsgStatus1, State2} = read_msg(MsgStatus, State1), {Res, State3} = internal_fetch(AckRequired, MsgStatus1, State2), {Res, a(State3)} + + end. + +read_msg_callback(#msg_status { msg = undefined, + msg_id = MsgId, + is_persistent = IsPersistent }) -> + fun(State) -> + read_msg_callback1(MsgId, IsPersistent, State) + end; +read_msg_callback(#msg_status{ msg = Msg}) -> + fun(State) -> + {Msg, State} + end; +read_msg_callback({IsPersistent, MsgId, _MsgProps}) -> + fun(State) -> + read_msg_callback1(MsgId, IsPersistent, State) end. -ack([], State) -> +read_msg_callback1(MsgId, IsPersistent, + State = #vqstate{ msg_store_clients = MSCState }) -> + {{ok, Msg = #basic_message{}}, MSCState1} = + msg_store_read(MSCState, IsPersistent, MsgId), + {Msg, State #vqstate { msg_store_clients = MSCState1 }}. + +ack([], _Fun, State) -> {[], State}; -ack(AckTags, State) -> + +ack(AckTags, Fun, State) -> {{IndexOnDiskSeqIds, MsgIdsByStore, AllMsgIds}, State1 = #vqstate { index_state = IndexState, msg_store_clients = MSCState, persistent_count = PCount, ack_out_counter = AckOutCount }} = lists:foldl( - fun (SeqId, {Acc, State2}) -> + fun (SeqId, {Acc, State2 = #vqstate{pending_ack = PA}}) -> + AckEntry = gb_trees:get(SeqId, PA), {MsgStatus, State3} = remove_pending_ack(SeqId, State2), - {accumulate_ack(MsgStatus, Acc), State3} + {accumulate_ack(MsgStatus, Acc), + Fun(read_msg_callback(AckEntry), State3)} end, {accumulate_ack_init(), State}, AckTags), IndexState1 = rabbit_queue_index:ack(IndexOnDiskSeqIds, IndexState), [ok = msg_store_remove(MSCState, IsPersistent, MsgIds) |
