diff options
| author | Alexandru Scvortov <alexandru@rabbitmq.com> | 2012-02-28 12:29:37 +0000 |
|---|---|---|
| committer | Alexandru Scvortov <alexandru@rabbitmq.com> | 2012-02-28 12:29:37 +0000 |
| commit | 4435c748294379add22aca9e3ee7273113daee18 (patch) | |
| tree | 260b85749a269e21313ff119a07ed3510a333f68 /src | |
| parent | 168c060308e2ccdba8ef6f8e06fe2e87400bc3b4 (diff) | |
| parent | 5e78aacf560b504834559e7e2b5a1fc9118983bd (diff) | |
| download | rabbitmq-server-git-4435c748294379add22aca9e3ee7273113daee18.tar.gz | |
merge bug24650 into default (Move names for connections and channels from mgmt to broker)
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit.erl | 18 | ||||
| -rw-r--r-- | src/rabbit_amqqueue.erl | 25 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 349 | ||||
| -rw-r--r-- | src/rabbit_backing_queue.erl | 14 | ||||
| -rw-r--r-- | src/rabbit_basic.erl | 33 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 17 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_master.erl | 22 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 13 | ||||
| -rw-r--r-- | src/rabbit_misc.erl | 26 | ||||
| -rw-r--r-- | src/rabbit_ssl.erl | 41 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 7 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 43 |
12 files changed, 506 insertions, 102 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl index 0a0ca90a63..dd5fb89ce4 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -212,14 +212,13 @@ -type(file_suffix() :: binary()). %% this really should be an abstract type -type(log_location() :: 'tty' | 'undefined' | file:filename()). +-type(param() :: atom()). -spec(maybe_hipe_compile/0 :: () -> 'ok'). -spec(prepare/0 :: () -> 'ok'). -spec(start/0 :: () -> 'ok'). -spec(stop/0 :: () -> 'ok'). -spec(stop_and_halt/0 :: () -> no_return()). --spec(rotate_logs/1 :: (file_suffix()) -> rabbit_types:ok_or_error(any())). --spec(force_event_refresh/0 :: () -> 'ok'). -spec(status/0 :: () -> [{pid, integer()} | {running_applications, [{atom(), string(), string()}]} | @@ -228,12 +227,11 @@ {memory, any()}]). -spec(is_running/0 :: () -> boolean()). -spec(is_running/1 :: (node()) -> boolean()). --spec(environment/0 :: () -> [{atom() | term()}]). --spec(log_location/1 :: ('sasl' | 'kernel') -> log_location()). +-spec(environment/0 :: () -> [{param() | term()}]). +-spec(rotate_logs/1 :: (file_suffix()) -> rabbit_types:ok_or_error(any())). +-spec(force_event_refresh/0 :: () -> 'ok'). --spec(maybe_insert_default_data/0 :: () -> 'ok'). --spec(boot_delegate/0 :: () -> 'ok'). --spec(recover/0 :: () -> 'ok'). +-spec(log_location/1 :: ('sasl' | 'kernel') -> log_location()). -spec(start/2 :: ('normal',[]) -> {'error', @@ -243,6 +241,10 @@ {'ok',pid()}). -spec(stop/1 :: (_) -> 'ok'). +-spec(maybe_insert_default_data/0 :: () -> 'ok'). +-spec(boot_delegate/0 :: () -> 'ok'). +-spec(recover/0 :: () -> 'ok'). + -endif. %%---------------------------------------------------------------------------- @@ -712,6 +714,6 @@ config_files() -> case init:get_argument(config) of {ok, Files} -> [filename:absname( filename:rootname(File, ".config") ++ ".config") || - File <- Files]; + [File] <- Files]; error -> [] end. diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index a7dfd535c8..c95efa1447 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -334,12 +334,23 @@ check_declare_arguments(QueueName, Args) -> precondition_failed, "invalid arg '~s' for ~s: ~255p", [Key, rabbit_misc:rs(QueueName), Error]) - end || {Key, Fun} <- + end || + {Key, Fun} <- [{<<"x-expires">>, fun check_integer_argument/2}, {<<"x-message-ttl">>, fun check_integer_argument/2}, - {<<"x-ha-policy">>, fun check_ha_policy_argument/2}]], + {<<"x-ha-policy">>, fun check_ha_policy_argument/2}, + {<<"x-dead-letter-exchange">>, fun check_string_argument/2}, + {<<"x-dead-letter-routing-key">>, + fun check_dlxrk_argument/2}]], ok. +check_string_argument(undefined, _Args) -> + ok; +check_string_argument({longstr, _}, _Args) -> + ok; +check_string_argument({Type, _}, _) -> + {error, {unacceptable_type, Type}}. + check_integer_argument(undefined, _Args) -> ok; check_integer_argument({Type, Val}, _Args) when Val > 0 -> @@ -350,6 +361,16 @@ check_integer_argument({Type, Val}, _Args) when Val > 0 -> check_integer_argument({_Type, Val}, _Args) -> {error, {value_zero_or_less, Val}}. +check_dlxrk_argument(undefined, _Args) -> + ok; +check_dlxrk_argument({longstr, _}, Args) -> + case rabbit_misc:table_lookup(Args, <<"x-dead-letter-exchange">>) of + undefined -> {error, routing_key_but_no_dlx_defined}; + _ -> ok + end; +check_dlxrk_argument({Type, _}, _Args) -> + {error, {unacceptable_type, Type}}. + check_ha_policy_argument(undefined, _Args) -> ok; check_ha_policy_argument({longstr, <<"all">>}, _Args) -> diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 12cd0c93ff..fd2d7214d2 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -49,7 +49,14 @@ stats_timer, msg_id_to_channel, ttl, - ttl_timer_ref + ttl_timer_ref, + publish_seqno, + unconfirmed_mq, + unconfirmed_qm, + delayed_stop, + queue_monitors, + dlx, + dlx_routing_key }). -record(consumer, {tag, ack_required}). @@ -128,6 +135,13 @@ init(Q) -> rate_timer_ref = undefined, expiry_timer_ref = undefined, ttl = undefined, + dlx = undefined, + dlx_routing_key = undefined, + publish_seqno = 1, + unconfirmed_mq = gb_trees:empty(), + unconfirmed_qm = gb_trees:empty(), + delayed_stop = undefined, + queue_monitors = dict:new(), msg_id_to_channel = gb_trees:empty()}, {ok, rabbit_event:init_stats_timer(State, #q.stats_timer), hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. @@ -149,6 +163,11 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS, rate_timer_ref = RateTRef, expiry_timer_ref = undefined, ttl = undefined, + publish_seqno = 1, + unconfirmed_mq = gb_trees:empty(), + unconfirmed_qm = gb_trees:empty(), + delayed_stop = undefined, + queue_monitors = dict:new(), msg_id_to_channel = MTC}, State1 = requeue_and_run(AckTags, process_args( rabbit_event:init_stats_timer( @@ -210,18 +229,28 @@ bq_init(BQ, Q, Recover) -> end). process_args(State = #q{q = #amqqueue{arguments = Arguments}}) -> - lists:foldl(fun({Arg, Fun}, State1) -> - case rabbit_misc:table_lookup(Arguments, Arg) of - {_Type, Val} -> Fun(Val, State1); - undefined -> State1 - end - end, State, [{<<"x-expires">>, fun init_expires/2}, - {<<"x-message-ttl">>, fun init_ttl/2}]). + lists:foldl( + fun({Arg, Fun}, State1) -> + case rabbit_misc:table_lookup(Arguments, Arg) of + {_Type, Val} -> Fun(Val, State1); + undefined -> State1 + end + end, State, + [{<<"x-expires">>, fun init_expires/2}, + {<<"x-message-ttl">>, fun init_ttl/2}, + {<<"x-dead-letter-exchange">>, fun init_dlx/2}, + {<<"x-dead-letter-routing-key">>, fun init_dlx_routing_key/2}]). init_expires(Expires, State) -> ensure_expiry_timer(State#q{expires = Expires}). init_ttl(TTL, State) -> drop_expired_messages(State#q{ttl = TTL}). +init_dlx(DLX, State = #q{q = #amqqueue{name = QName}}) -> + State#q{dlx = rabbit_misc:r(QName, exchange, DLX)}. + +init_dlx_routing_key(RoutingKey, State) -> + State#q{dlx_routing_key = RoutingKey}. + terminate_shutdown(Fun, State) -> State1 = #q{backing_queue_state = BQS} = stop_sync_timer(stop_rate_timer(State)), @@ -449,34 +478,36 @@ confirm_messages(MsgIds, State = #q{msg_id_to_channel = MTC}) -> lists:foldl( fun(MsgId, {CMs, MTC0}) -> case gb_trees:lookup(MsgId, MTC0) of - {value, {ChPid, MsgSeqNo}} -> - {rabbit_misc:gb_trees_cons(ChPid, MsgSeqNo, CMs), + {value, {SenderPid, MsgSeqNo}} -> + {rabbit_misc:gb_trees_cons(SenderPid, + MsgSeqNo, CMs), gb_trees:delete(MsgId, MTC0)}; none -> {CMs, MTC0} end end, {gb_trees:empty(), MTC}, MsgIds), - rabbit_misc:gb_trees_foreach(fun rabbit_channel:confirm/2, CMs), + rabbit_misc:gb_trees_foreach(fun rabbit_misc:confirm_to_sender/2, CMs), State#q{msg_id_to_channel = MTC1}. should_confirm_message(#delivery{msg_seq_no = undefined}, _State) -> never; -should_confirm_message(#delivery{sender = ChPid, +should_confirm_message(#delivery{sender = SenderPid, msg_seq_no = MsgSeqNo, message = #basic_message { is_persistent = true, id = MsgId}}, #q{q = #amqqueue{durable = true}}) -> - {eventually, ChPid, MsgSeqNo, MsgId}; + {eventually, SenderPid, MsgSeqNo, MsgId}; should_confirm_message(_Delivery, _State) -> immediately. needs_confirming({eventually, _, _, _}) -> true; needs_confirming(_) -> false. -maybe_record_confirm_message({eventually, ChPid, MsgSeqNo, MsgId}, +maybe_record_confirm_message({eventually, SenderPid, MsgSeqNo, MsgId}, State = #q{msg_id_to_channel = MTC}) -> - State#q{msg_id_to_channel = gb_trees:insert(MsgId, {ChPid, MsgSeqNo}, MTC)}; + State#q{msg_id_to_channel = + gb_trees:insert(MsgId, {SenderPid, MsgSeqNo}, MTC)}; maybe_record_confirm_message(_Confirm, State) -> State. @@ -488,13 +519,13 @@ run_message_queue(State) -> BQ:is_empty(BQS), State1), State2. -attempt_delivery(Delivery = #delivery{sender = ChPid, +attempt_delivery(Delivery = #delivery{sender = SenderPid, message = Message, msg_seq_no = MsgSeqNo}, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> Confirm = should_confirm_message(Delivery, State), case Confirm of - immediately -> rabbit_channel:confirm(ChPid, [MsgSeqNo]); + immediately -> rabbit_misc:confirm_to_sender(SenderPid, [MsgSeqNo]); _ -> ok end, case BQ:is_duplicate(Message, BQS) of @@ -509,7 +540,7 @@ attempt_delivery(Delivery = #delivery{sender = ChPid, AckRequired, Message, (?BASE_MESSAGE_PROPERTIES)#message_properties{ needs_confirming = needs_confirming(Confirm)}, - ChPid, BQS2), + SenderPid, BQS2), {{Message, false, AckTag}, true, State1#q{backing_queue_state = BQS3}} end, @@ -530,7 +561,7 @@ attempt_delivery(Delivery = #delivery{sender = ChPid, end. deliver_or_enqueue(Delivery = #delivery{message = Message, - sender = ChPid}, State) -> + sender = SenderPid}, State) -> {Delivered, Confirm, State1} = attempt_delivery(Delivery, State), State2 = #q{backing_queue = BQ, backing_queue_state = BQS} = maybe_record_confirm_message(Confirm, State1), @@ -538,7 +569,7 @@ deliver_or_enqueue(Delivery = #delivery{message = Message, true -> State2; false -> Props = (message_properties(State)) #message_properties{ needs_confirming = needs_confirming(Confirm)}, - BQS1 = BQ:publish(Message, Props, ChPid, BQS), + BQS1 = BQ:publish(Message, Props, SenderPid, BQS), ensure_ttl_timer(State2#q{backing_queue_state = BQS1}) end. @@ -659,11 +690,11 @@ subtract_acks(ChPid, AckTags, State, Fun) -> Fun(State) end. -discard_delivery(#delivery{sender = ChPid, +discard_delivery(#delivery{sender = SenderPid, message = Message}, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> - State#q{backing_queue_state = BQ:discard(Message, ChPid, BQS)}. + State#q{backing_queue_state = BQ:discard(Message, SenderPid, BQS)}. message_properties(#q{ttl=TTL}) -> #message_properties{expiry = calculate_msg_expiry(TTL)}. @@ -674,10 +705,11 @@ calculate_msg_expiry(TTL) -> now_micros() + (TTL * 1000). drop_expired_messages(State = #q{ttl = undefined}) -> State; drop_expired_messages(State = #q{backing_queue_state = BQS, - backing_queue = BQ}) -> + backing_queue = BQ}) -> Now = now_micros(), BQS1 = BQ:dropwhile( fun (#message_properties{expiry = Expiry}) -> Now > Expiry end, + dead_letter_fun(expired, State), BQS), ensure_ttl_timer(State#q{backing_queue_state = BQS1}). @@ -694,6 +726,199 @@ ensure_ttl_timer(State = #q{backing_queue = BQ, ensure_ttl_timer(State) -> State. +dead_letter_fun(_Reason, #q{dlx = undefined}) -> + undefined; +dead_letter_fun(Reason, _State) -> + fun(Msg, AckTag) -> + gen_server2:cast(self(), {dead_letter, {Msg, AckTag}, Reason}) + end. + +dead_letter_msg(Msg, AckTag, Reason, State = #q{dlx = DLX}) -> + case rabbit_exchange:lookup(DLX) of + {error, not_found} -> + noreply(State); + _ -> + dead_letter_msg_existing_dlx(Msg, AckTag, Reason, State) + end. + +dead_letter_msg_existing_dlx(Msg, AckTag, Reason, + State = #q{publish_seqno = MsgSeqNo, + unconfirmed_mq = UMQ, + dlx = DLX, + backing_queue = BQ, + backing_queue_state = BQS}) -> + {ok, _, QPids} = + rabbit_basic:publish( + rabbit_basic:delivery( + false, false, make_dead_letter_msg(DLX, Reason, Msg, State), + MsgSeqNo)), + State1 = lists:foldl(fun monitor_queue/2, State, QPids), + State2 = State1#q{publish_seqno = MsgSeqNo + 1}, + case QPids of + [] -> {_, BQS1} = BQ:ack([AckTag], BQS), + cleanup_after_confirm(State2#q{backing_queue_state = BQS1}); + _ -> State3 = + lists:foldl( + fun(QPid, State0 = #q{unconfirmed_qm = UQM}) -> + UQM1 = rabbit_misc:gb_trees_set_insert( + QPid, MsgSeqNo, UQM), + State0#q{unconfirmed_qm = UQM1} + end, State2, QPids), + noreply(State3#q{ + unconfirmed_mq = + gb_trees:insert( + MsgSeqNo, {gb_sets:from_list(QPids), + AckTag}, UMQ)}) + end. + +monitor_queue(QPid, State = #q{queue_monitors = QMons}) -> + case dict:is_key(QPid, QMons) of + true -> State; + false -> State#q{queue_monitors = + dict:store(QPid, erlang:monitor(process, QPid), + QMons)} + end. + +demonitor_queue(QPid, State = #q{queue_monitors = QMons}) -> + case dict:find(QPid, QMons) of + {ok, MRef} -> erlang:demonitor(MRef), + State#q{queue_monitors = dict:erase(QPid, QMons)}; + error -> State + end. + +handle_queue_down(QPid, Reason, State = #q{queue_monitors = QMons, + unconfirmed_qm = UQM}) -> + case dict:find(QPid, QMons) of + error -> + noreply(State); + {ok, _} -> + #resource{name = QName} = qname(State), + rabbit_log:info("DLQ ~p (for ~p) died~n", [QPid, QName]), + case gb_trees:lookup(QPid, UQM) of + none -> + noreply(State); + {value, MsgSeqNosSet} -> + case rabbit_misc:is_abnormal_termination(Reason) of + true -> rabbit_log:warning( + "Dead queue lost ~p messages~n", + [gb_sets:size(MsgSeqNosSet)]); + false -> ok + end, + handle_confirm(gb_sets:to_list(MsgSeqNosSet), QPid, + State#q{queue_monitors = + dict:erase(QPid, QMons)}) + end + end. + +handle_confirm(MsgSeqNos, QPid, State = #q{unconfirmed_mq = UMQ, + unconfirmed_qm = UQM, + backing_queue = BQ, + backing_queue_state = BQS}) -> + {AckTags1, UMQ3} = + lists:foldl( + fun (MsgSeqNo, {AckTags, UMQ1}) -> + {QPids, AckTag} = gb_trees:get(MsgSeqNo, UMQ1), + QPids1 = gb_sets:delete(QPid, QPids), + case gb_sets:is_empty(QPids1) of + true -> {[AckTag | AckTags], + gb_trees:delete(MsgSeqNo, UMQ1)}; + false -> {AckTags, gb_trees:update( + MsgSeqNo, {QPids1, AckTag}, UMQ1)} + end + end, {[], UMQ}, MsgSeqNos), + {_Guids, BQS1} = BQ:ack(AckTags1, BQS), + MsgSeqNos1 = gb_sets:difference(gb_trees:get(QPid, UQM), + gb_sets:from_list(MsgSeqNos)), + State1 = case gb_sets:is_empty(MsgSeqNos1) of + false -> State#q{ + unconfirmed_qm = + gb_trees:update(QPid, MsgSeqNos1, UQM)}; + true -> demonitor_queue( + QPid, State#q{ + unconfirmed_qm = + gb_trees:delete(QPid, UQM)}) + end, + cleanup_after_confirm(State1#q{unconfirmed_mq = UMQ3, + backing_queue_state = BQS1}). + +stop_later(Reason, State) -> + stop_later(Reason, undefined, noreply, State). + +stop_later(Reason, From, Reply, State = #q{unconfirmed_mq = UMQ}) -> + case {gb_trees:is_empty(UMQ), Reply} of + {true, noreply} -> + {stop, Reason, State}; + {true, _} -> + {stop, Reason, Reply, State}; + {false, _} -> + noreply(State#q{delayed_stop = {Reason, {From, Reply}}}) + end. + +cleanup_after_confirm(State = #q{delayed_stop = DS, + unconfirmed_mq = UMQ}) -> + case gb_trees:is_empty(UMQ) andalso DS =/= undefined of + true -> case DS of + {_, {_, noreply}} -> ok; + {_, {From, Reply}} -> gen_server2:reply(From, Reply) + end, + {Reason, _} = DS, + {stop, Reason, State}; + false -> noreply(State) + end. + +already_been_here(_Delivery, #q{dlx = undefined}) -> + false; +already_been_here(#delivery{message = #basic_message{content = Content}}, + State) -> + #content{properties = #'P_basic'{headers = Headers}} = + rabbit_binary_parser:ensure_content_decoded(Content), + #resource{name = QueueName} = qname(State), + case Headers of + undefined -> + false; + _ -> + case rabbit_misc:table_lookup(Headers, <<"x-death">>) of + {array, DeathTables} -> + OldQueues = [rabbit_misc:table_lookup(D, <<"queue">>) || + {table, D} <- DeathTables], + OldQueues1 = [QName || {longstr, QName} <- OldQueues], + case lists:member(QueueName, OldQueues1) of + true -> [QueueName | OldQueues1]; + _ -> false + end; + _ -> + false + end + end. + +make_dead_letter_msg(DLX, Reason, + Msg = #basic_message{content = Content, + exchange_name = Exchange, + routing_keys = RoutingKeys}, + State = #q{dlx_routing_key = DlxRoutingKey}) -> + Headers = rabbit_basic:extract_headers(Content), + #resource{name = QName} = qname(State), + %% The first routing key is the one specified in the + %% basic.publish; all others are CC or BCC keys. + RoutingKeys1 = [hd(RoutingKeys) | rabbit_basic:header_routes(Headers)], + Info = [{<<"reason">>, longstr, list_to_binary(atom_to_list(Reason))}, + {<<"queue">>, longstr, QName}, + {<<"time">>, timestamp, rabbit_misc:now_ms() div 1000}, + {<<"exchange">>, longstr, Exchange#resource.name}, + {<<"routing-keys">>, array, + [{longstr, Key} || Key <- RoutingKeys1]}], + Headers1 = rabbit_basic:append_table_header(<<"x-death">>, Info, Headers), + {DeathRoutingKeys, Headers2} = + case DlxRoutingKey of + undefined -> {RoutingKeys, Headers1}; + _ -> {[DlxRoutingKey], + lists:keydelete(<<"CC">>, 1, Headers1)} + end, + Content1 = rabbit_basic:replace_headers(Headers2, Content), + Msg#basic_message{exchange_name = DLX, id = rabbit_guid:gen(), + routing_keys = DeathRoutingKeys, content = Content1}. + + now_micros() -> timer:now_diff(now(), {0,0,0}). infos(Items, State) -> @@ -835,6 +1060,9 @@ prioritise_info(Msg, #q{q = #amqqueue{exclusive_owner = DownPid}}) -> _ -> 0 end. +handle_call(_, _, State = #q{delayed_stop = DS}) when DS =/= undefined -> + noreply(State); + handle_call({init, Recover}, From, State = #q{q = #amqqueue{exclusive_owner = none}}) -> declare(Recover, From, State); @@ -891,15 +1119,15 @@ handle_call({deliver, Delivery = #delivery{mandatory = true}}, From, State) -> gen_server2:reply(From, true), noreply(deliver_or_enqueue(Delivery, State)); -handle_call({notify_down, ChPid}, _From, State) -> +handle_call({notify_down, ChPid}, From, State) -> %% we want to do this synchronously, so that auto_deleted queues %% are no longer visible by the time we send a response to the %% client. The queue is ultimately deleted in terminate/2; if we %% return stop with a reply, terminate/2 will be called by %% gen_server2 *before* the reply is sent. case handle_ch_down(ChPid, State) of - {ok, NewState} -> reply(ok, NewState); - {stop, NewState} -> {stop, normal, ok, NewState} + {ok, State1} -> reply(ok, State1); + {stop, State1} -> stop_later(normal, From, ok, State1) end; handle_call({basic_get, ChPid, NoAck}, _From, @@ -954,7 +1182,7 @@ handle_call({basic_consume, NoAck, ChPid, Limiter, reply(ok, State2) end; -handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From, +handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, From, State = #q{exclusive_consumer = Holder}) -> ok = maybe_send_reply(ChPid, OkMsg), case lookup_ch(ChPid) of @@ -974,7 +1202,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From, State#q.active_consumers)}, case should_auto_delete(State1) of false -> reply(ok, ensure_expiry_timer(State1)); - true -> {stop, normal, ok, State1} + true -> stop_later(normal, From, ok, State1) end end; @@ -983,20 +1211,18 @@ handle_call(stat, _From, State) -> drop_expired_messages(ensure_expiry_timer(State)), reply({ok, BQ:len(BQS), active_consumer_count()}, State1); -handle_call({delete, IfUnused, IfEmpty}, _From, +handle_call({delete, IfUnused, IfEmpty}, From, State = #q{backing_queue_state = BQS, backing_queue = BQ}) -> IsEmpty = BQ:is_empty(BQS), IsUnused = is_unused(State), if - IfEmpty and not(IsEmpty) -> - reply({error, not_empty}, State); - IfUnused and not(IsUnused) -> - reply({error, in_use}, State); - true -> - {stop, normal, {ok, BQ:len(BQS)}, State} + IfEmpty and not(IsEmpty) -> reply({error, not_empty}, State); + IfUnused and not(IsUnused) -> reply({error, in_use}, State); + true -> stop_later(normal, From, + {ok, BQ:len(BQS)}, State) end; -handle_call(purge, _From, State = #q{backing_queue = BQ, +handle_call(purge, _From, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> {Count, BQS1} = BQ:purge(BQS), reply({ok, Count}, State#q{backing_queue_state = BQS1}); @@ -1007,10 +1233,18 @@ handle_call({requeue, AckTags, ChPid}, From, State) -> ChPid, AckTags, State, fun (State1) -> requeue_and_run(AckTags, State1) end)). +handle_cast({confirm, MsgSeqNos, QPid}, State) -> + handle_confirm(MsgSeqNos, QPid, State); + +handle_cast(_, State = #q{delayed_stop = DS}) when DS =/= undefined -> + noreply(State); + handle_cast({run_backing_queue, Mod, Fun}, State) -> noreply(run_backing_queue(Mod, Fun, State)); -handle_cast({deliver, Delivery = #delivery{sender = Sender}, Flow}, State) -> +handle_cast({deliver, Delivery = #delivery{sender = Sender, + msg_seq_no = MsgSeqNo}, Flow}, + State) -> %% Asynchronous, non-"mandatory", non-"immediate" deliver mode. case Flow of flow -> Key = {ch_publisher, Sender}, @@ -1021,7 +1255,12 @@ handle_cast({deliver, Delivery = #delivery{sender = Sender}, Flow}, State) -> credit_flow:ack(Sender); noflow -> ok end, - noreply(deliver_or_enqueue(Delivery, State)); + case already_been_here(Delivery, State) of + false -> noreply(deliver_or_enqueue(Delivery, State)); + Qs -> log_cycle_once(Qs), + rabbit_misc:confirm_to_sender(Sender, [MsgSeqNo]), + noreply(State) + end; handle_cast({ack, AckTags, ChPid}, State) -> noreply(subtract_acks( @@ -1039,13 +1278,14 @@ handle_cast({reject, AckTags, Requeue, ChPid}, State) -> backing_queue_state = BQS}) -> case Requeue of true -> requeue_and_run(AckTags, State1); - false -> {_Guids, BQS1} = BQ:ack(AckTags, BQS), + false -> Fun = dead_letter_fun(rejected, State), + BQS1 = BQ:fold(Fun, BQS, AckTags), State1#q{backing_queue_state = BQS1} end end)); handle_cast(delete_immediately, State) -> - {stop, normal, State}; + stop_later(normal, State); handle_cast({unblock, ChPid}, State) -> noreply( @@ -1096,11 +1336,17 @@ handle_cast(force_event_refresh, State = #q{exclusive_consumer = Exclusive}) -> {Ch, CTag} -> [{Ch, CTag, AckRequired}] = consumers(State), emit_consumer_created(Ch, CTag, true, AckRequired) end, - noreply(State). + noreply(State); + +handle_cast({dead_letter, {Msg, AckTag}, Reason}, State) -> + dead_letter_msg(Msg, AckTag, Reason, State). + +handle_info(_, State = #q{delayed_stop = DS}) when DS =/= undefined -> + noreply(State); handle_info(maybe_expire, State) -> case is_unused(State) of - true -> {stop, normal, State}; + true -> stop_later(normal, State); false -> noreply(ensure_expiry_timer(State)) end; @@ -1122,11 +1368,11 @@ handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, %% match what people expect (see bug 21824). However we need this %% monitor-and-async- delete in case the connection goes away %% unexpectedly. - {stop, normal, State}; -handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) -> + stop_later(normal, State); +handle_info({'DOWN', _MonitorRef, process, DownPid, Reason}, State) -> case handle_ch_down(DownPid, State) of - {ok, NewState} -> noreply(NewState); - {stop, NewState} -> {stop, normal, NewState} + {ok, State1} -> handle_queue_down(DownPid, Reason, State1); + {stop, State1} -> stop_later(normal, State1) end; handle_info(update_ram_duration, State = #q{backing_queue = BQ, @@ -1171,3 +1417,14 @@ handle_pre_hibernate(State = #q{backing_queue = BQ, {hibernate, stop_rate_timer(State1)}. format_message_queue(Opt, MQ) -> rabbit_misc:format_message_queue(Opt, MQ). + +log_cycle_once(Queues) -> + Key = {queue_cycle, Queues}, + case get(Key) of + true -> ok; + undefined -> rabbit_log:warning( + "Message dropped. Dead-letter queues cycle detected" ++ + ": ~p~nThis cycle will NOT be reported again.~n", + [Queues]), + put(Key, true) + end. diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index 364eb8f646..42627aae7e 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -95,16 +95,24 @@ behaviour_info(callbacks) -> {drain_confirmed, 1}, %% Drop messages from the head of the queue while the supplied - %% predicate returns true. - {dropwhile, 2}, + %% predicate returns true. A callback function is supplied + %% allowing callers access to messages that are about to be + %% dropped. + {dropwhile, 3}, %% Produce the next message. {fetch, 2}, %% Acktags supplied are for messages which can now be forgotten - %% about. Must return 1 msg_id per Ack, in the same order as Acks. + %% about. Must return 1 msg_id per Ack, in the same order as + %% Acks. {ack, 2}, + %% Acktags supplied are for messages which should be + %% processed. The provided callback function is called with each + %% message. + {fold, 3}, + %% Reinsert messages into the queue which have already been %% delivered and were pending acknowledgement. {requeue, 2}, diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl index b8211d4332..25485ca0b0 100644 --- a/src/rabbit_basic.erl +++ b/src/rabbit_basic.erl @@ -19,7 +19,8 @@ -include("rabbit_framing.hrl"). -export([publish/4, publish/6, publish/1, - message/3, message/4, properties/1, delivery/4]). + message/3, message/4, properties/1, append_table_header/3, + extract_headers/1, replace_headers/2, delivery/4, header_routes/1]). -export([build_content/2, from_content/1]). %%---------------------------------------------------------------------------- @@ -31,6 +32,7 @@ -type(publish_result() :: ({ok, rabbit_amqqueue:routing_result(), [pid()]} | rabbit_types:error('not_found'))). +-type(headers() :: rabbit_framing:amqp_table() | 'undefined'). -type(exchange_input() :: (rabbit_types:exchange() | rabbit_exchange:name())). -type(body_input() :: (binary() | [binary()])). @@ -55,6 +57,17 @@ rabbit_types:ok_or_error2(rabbit_types:message(), any())). -spec(properties/1 :: (properties_input()) -> rabbit_framing:amqp_property_record()). + +-spec(append_table_header/3 :: + (binary(), rabbit_framing:amqp_table(), headers()) -> headers()). + +-spec(extract_headers/1 :: (rabbit_types:content()) -> headers()). + +-spec(replace_headers/2 :: (headers(), rabbit_types:content()) + -> rabbit_types:content()). + +-spec(header_routes/1 :: + (undefined | rabbit_framing:amqp_table()) -> [string()]). -spec(build_content/2 :: (rabbit_framing:amqp_property_record(), binary() | [binary()]) -> rabbit_types:content()). -spec(from_content/1 :: (rabbit_types:content()) -> @@ -166,6 +179,24 @@ properties(P) when is_list(P) -> end end, #'P_basic'{}, P). +append_table_header(Name, Info, undefined) -> + append_table_header(Name, Info, []); +append_table_header(Name, Info, Headers) -> + Prior = case rabbit_misc:table_lookup(Headers, Name) of + undefined -> []; + {array, Existing} -> Existing + end, + rabbit_misc:set_table_value(Headers, Name, array, [{table, Info} | Prior]). + +extract_headers(Content) -> + #content{properties = #'P_basic'{headers = Headers}} = + rabbit_binary_parser:ensure_content_decoded(Content), + Headers. + +replace_headers(Headers, Content = #content{properties = Props}) -> + rabbit_binary_generator:clear_encoded_content( + Content#content{properties = Props#'P_basic'{headers = Headers}}). + indexof(L, Element) -> indexof(L, Element, 1). indexof([], _Element, _N) -> 0; diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 24cfbcf890..d5cba91b2e 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -21,7 +21,7 @@ -behaviour(gen_server2). -export([start_link/11, do/2, do/3, do_flow/3, flush/1, shutdown/1]). --export([send_command/2, deliver/4, flushed/2, confirm/2]). +-export([send_command/2, deliver/4, flushed/2]). -export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]). -export([refresh_config_local/0, ready_for_close/1]). -export([force_event_refresh/0]). @@ -89,7 +89,6 @@ (pid(), rabbit_types:ctag(), boolean(), rabbit_amqqueue:qmsg()) -> 'ok'). -spec(flushed/2 :: (pid(), pid()) -> 'ok'). --spec(confirm/2 ::(pid(), [non_neg_integer()]) -> 'ok'). -spec(list/0 :: () -> [pid()]). -spec(list_local/0 :: () -> [pid()]). -spec(info_keys/0 :: () -> rabbit_types:info_keys()). @@ -136,9 +135,6 @@ deliver(Pid, ConsumerTag, AckRequired, Msg) -> flushed(Pid, QPid) -> gen_server2:cast(Pid, {flushed, QPid}). -confirm(Pid, MsgSeqNos) -> - gen_server2:cast(Pid, {confirm, MsgSeqNos, self()}). - list() -> rabbit_misc:append_rpc_all_nodes(rabbit_mnesia:running_clustered_nodes(), rabbit_channel, list_local, []). @@ -1166,14 +1162,9 @@ handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed_qm = UQM}) -> %% the set one by one which which would be inefficient State1 = State#ch{unconfirmed_qm = gb_trees:delete_any(QPid, UQM)}, {Nack, SendFun} = - case Reason of - Reason when Reason =:= noproc; Reason =:= noconnection; - Reason =:= normal; Reason =:= shutdown -> - {false, fun record_confirms/2}; - {shutdown, _} -> - {false, fun record_confirms/2}; - _ -> - {true, fun send_nacks/2} + case rabbit_misc:is_abnormal_termination(Reason) of + true -> {true, fun send_nacks/2}; + false -> {false, fun record_confirms/2} end, {MXs, State2} = process_confirms(MsgSeqNos, QPid, Nack, State1), SendFun(MXs, State2). diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index 64a4a7371e..bfdab487f3 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -18,10 +18,10 @@ -export([init/3, terminate/2, delete_and_terminate/2, purge/1, publish/4, publish_delivered/5, fetch/2, ack/2, - requeue/2, len/1, is_empty/1, drain_confirmed/1, dropwhile/2, + requeue/2, len/1, is_empty/1, drain_confirmed/1, dropwhile/3, set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1, handle_pre_hibernate/1, - status/1, invoke/3, is_duplicate/2, discard/3]). + status/1, invoke/3, is_duplicate/2, discard/3, fold/3]). -export([start/1, stop/0]). @@ -172,12 +172,13 @@ publish_delivered(AckRequired, Msg = #basic_message { id = MsgId }, MsgProps, ensure_monitoring(ChPid, State #state { backing_queue_state = BQS1, ack_msg_id = AM1 })}. -dropwhile(Fun, State = #state { gm = GM, - backing_queue = BQ, - backing_queue_state = BQS, - set_delivered = SetDelivered }) -> +dropwhile(Pred, MsgFun, + State = #state{gm = GM, + backing_queue = BQ, + set_delivered = SetDelivered, + backing_queue_state = BQS }) -> Len = BQ:len(BQS), - BQS1 = BQ:dropwhile(Fun, BQS), + BQS1 = BQ:dropwhile(Pred, MsgFun, BQS), Dropped = Len - BQ:len(BQS1), SetDelivered1 = lists:max([0, SetDelivered - Dropped]), ok = gm:broadcast(GM, {set_length, BQ:len(BQS1)}), @@ -248,6 +249,13 @@ ack(AckTags, State = #state { gm = GM, {MsgIds, State #state { backing_queue_state = BQS1, ack_msg_id = AM1 }}. +fold(MsgFun, State = #state { gm = GM, + backing_queue = BQ, + backing_queue_state = BQS}, AckTags) -> + BQS1 = BQ:fold(MsgFun, BQS, AckTags), + ok = gm:broadcast(GM, {fold, MsgFun, AckTags}), + State #state { backing_queue_state = BQS1 }. + requeue(AckTags, State = #state { gm = GM, backing_queue = BQ, backing_queue_state = BQS }) -> diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 9bf89bce3f..98a80a2619 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -430,7 +430,7 @@ confirm_messages(MsgIds, State = #state { msg_id_status = MS }) -> Acc end end, {gb_trees:empty(), MS}, MsgIds), - rabbit_misc:gb_trees_foreach(fun rabbit_channel:confirm/2, CMs), + rabbit_misc:gb_trees_foreach(fun rabbit_misc:confirm_to_sender/2, CMs), State #state { msg_id_status = MS1 }. handle_process_result({ok, State}) -> noreply(State); @@ -665,7 +665,7 @@ maybe_enqueue_message( {ok, {confirmed, ChPid}} -> %% BQ has confirmed it but we didn't know what the %% msg_seq_no was at the time. We do now! - ok = rabbit_channel:confirm(ChPid, [MsgSeqNo]), + ok = rabbit_misc:confirm_to_sender(ChPid, [MsgSeqNo]), SQ1 = remove_from_pending_ch(MsgId, ChPid, SQ), State1 #state { sender_queues = SQ1, msg_id_status = dict:erase(MsgId, MS) }; @@ -682,7 +682,7 @@ maybe_enqueue_message( msg_id_status = dict:store(MsgId, {published, ChPid, MsgSeqNo}, MS) }; immediately -> - ok = rabbit_channel:confirm(ChPid, [MsgSeqNo]), + ok = rabbit_misc:confirm_to_sender(ChPid, [MsgSeqNo]), SQ1 = remove_from_pending_ch(MsgId, ChPid, SQ), State1 #state { msg_id_status = dict:erase(MsgId, MS), sender_queues = SQ1 } @@ -744,7 +744,7 @@ process_instruction( {MQ2, PendingCh, dict:store(MsgId, {published, ChPid, MsgSeqNo}, MS)}; immediately -> - ok = rabbit_channel:confirm(ChPid, [MsgSeqNo]), + ok = rabbit_misc:confirm_to_sender(ChPid, [MsgSeqNo]), {MQ2, PendingCh, MS} end; {{value, {#delivery {}, _EnqueueOnPromotion}}, _MQ2} -> @@ -843,6 +843,11 @@ process_instruction({ack, MsgIds}, [] = MsgIds1 -- MsgIds, %% ASSERTION {ok, State #state { msg_id_ack = MA1, backing_queue_state = BQS1 }}; +process_instruction({fold, MsgFun, AckTags}, + State = #state { backing_queue = BQ, + backing_queue_state = BQS }) -> + BQS1 = BQ:fold(AckTags, MsgFun, BQS), + {ok, State #state { backing_queue_state = BQS1 }}; process_instruction({requeue, MsgIds}, State = #state { backing_queue = BQ, backing_queue_state = BQS, diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index b6d38172b5..dca3bead75 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -28,7 +28,9 @@ -export([enable_cover/0, report_cover/0]). -export([enable_cover/1, report_cover/1]). -export([start_cover/1]). +-export([confirm_to_sender/2]). -export([throw_on_error/2, with_exit_handler/2, filter_exit_map/2]). +-export([is_abnormal_termination/1]). -export([with_user/2, with_user_and_vhost/3]). -export([execute_mnesia_transaction/1]). -export([execute_mnesia_transaction/2]). @@ -44,7 +46,8 @@ -export([sort_field_table/1]). -export([pid_to_string/1, string_to_pid/1]). -export([version_compare/2, version_compare/3]). --export([dict_cons/3, orddict_cons/3, gb_trees_cons/3]). +-export([dict_cons/3, orddict_cons/3, gb_trees_cons/3, + gb_trees_set_insert/3]). -export([gb_trees_fold/3, gb_trees_foreach/2]). -export([get_options/2]). -export([all_module_attributes/1, build_acyclic_graph/3]). @@ -108,7 +111,6 @@ (rabbit_framing:amqp_table(), binary(), rabbit_framing:amqp_field_type(), rabbit_framing:amqp_value()) -> rabbit_framing:amqp_table()). - -spec(r/2 :: (rabbit_types:vhost(), K) -> rabbit_types:r3(rabbit_types:vhost(), K, '_') when is_subtype(K, atom())). @@ -131,6 +133,7 @@ (atom(), thunk(rabbit_types:error(any()) | {ok, A} | A)) -> A). -spec(with_exit_handler/2 :: (thunk(A), thunk(A)) -> A). -spec(filter_exit_map/2 :: (fun ((A) -> B), [A]) -> [B]). +-spec(is_abnormal_termination/1 :: (any()) -> boolean()). -spec(with_user/2 :: (rabbit_types:username(), thunk(A)) -> A). -spec(with_user_and_vhost/3 :: (rabbit_types:username(), rabbit_types:vhost(), thunk(A)) @@ -172,6 +175,7 @@ -spec(dict_cons/3 :: (any(), any(), dict()) -> dict()). -spec(orddict_cons/3 :: (any(), any(), orddict:orddict()) -> orddict:orddict()). -spec(gb_trees_cons/3 :: (any(), any(), gb_tree()) -> gb_tree()). +-spec(gb_trees_set_insert/3 :: (any(), any(), gb_tree()) -> gb_tree()). -spec(gb_trees_fold/3 :: (fun ((any(), any(), A) -> A), A, gb_tree()) -> A). -spec(gb_trees_foreach/2 :: (fun ((any(), any()) -> any()), gb_tree()) -> 'ok'). @@ -372,6 +376,9 @@ report_coverage_percentage(File, Cov, NotCov, Mod) -> end, Mod]). +confirm_to_sender(Pid, MsgSeqNos) -> + gen_server2:cast(Pid, {confirm, MsgSeqNos, self()}). + throw_on_error(E, Thunk) -> case Thunk() of {error, Reason} -> throw({E, Reason}); @@ -397,6 +404,12 @@ filter_exit_map(F, L) -> fun () -> Ref end, fun () -> F(I) end) || I <- L]). +is_abnormal_termination(Reason) + when Reason =:= noproc; Reason =:= noconnection; + Reason =:= normal; Reason =:= shutdown -> false; +is_abnormal_termination({shutdown, _}) -> false; +is_abnormal_termination(_) -> true. + with_user(Username, Thunk) -> fun () -> case mnesia:read({rabbit_user, Username}) of @@ -701,6 +714,15 @@ gb_trees_cons(Key, Value, Tree) -> none -> gb_trees:insert(Key, [Value], Tree) end. +gb_trees_set_insert(Key, Value, Tree) -> + case gb_trees:lookup(Key, Tree) of + {value, Values} -> + Values1 = gb_sets:insert(Value, Values), + gb_trees:update(Key, Values1, Tree); + none -> + gb_trees:insert(Key, gb_sets:singleton(Value), Tree) + end. + gb_trees_fold(Fun, Acc, Tree) -> gb_trees_fold1(Fun, Acc, gb_trees:next(gb_trees:iterator(Tree))). diff --git a/src/rabbit_ssl.erl b/src/rabbit_ssl.erl index 3025d981d4..22ff555ff0 100644 --- a/src/rabbit_ssl.erl +++ b/src/rabbit_ssl.erl @@ -21,7 +21,7 @@ -include_lib("public_key/include/public_key.hrl"). -export([peer_cert_issuer/1, peer_cert_subject/1, peer_cert_validity/1]). --export([peer_cert_subject_items/2]). +-export([peer_cert_subject_items/2, peer_cert_auth_name/1]). %%-------------------------------------------------------------------------- @@ -36,6 +36,8 @@ -spec(peer_cert_validity/1 :: (certificate()) -> string()). -spec(peer_cert_subject_items/2 :: (certificate(), tuple()) -> [string()] | 'not_found'). +-spec(peer_cert_auth_name/1 :: + (certificate()) -> binary() | 'not_found' | 'unsafe'). -endif. @@ -76,6 +78,43 @@ peer_cert_validity(Cert) -> format_asn1_value(End)]) end, Cert). +%% Extract a username from the certificate +peer_cert_auth_name(Cert) -> + {ok, Mode} = application:get_env(rabbit, ssl_cert_login_from), + peer_cert_auth_name(Mode, Cert). + +peer_cert_auth_name(distinguished_name, Cert) -> + case auth_config_sane() of + true -> iolist_to_binary(peer_cert_subject(Cert)); + false -> unsafe + end; + +peer_cert_auth_name(common_name, Cert) -> + %% If there is more than one CN then we join them with "," in a + %% vaguely DN-like way. But this is more just so we do something + %% more intelligent than crashing, if you actually want to escape + %% things properly etc, use DN mode. + case auth_config_sane() of + true -> case peer_cert_subject_items(Cert, ?'id-at-commonName') of + not_found -> not_found; + CNs -> list_to_binary(string:join(CNs, ",")) + end; + false -> unsafe + end. + +auth_config_sane() -> + {ok, Opts} = application:get_env(rabbit, ssl_options), + case {proplists:get_value(fail_if_no_peer_cert, Opts), + proplists:get_value(verify, Opts)} of + {true, verify_peer} -> + true; + {F, V} -> + rabbit_log:warning("SSL certificate authentication disabled, " + "fail_if_no_peer_cert=~p; " + "verify=~p~n", [F, V]), + false + end. + %%-------------------------------------------------------------------------- cert_info(F, Cert) -> diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 29e0428de2..f7e3baa706 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -2366,7 +2366,7 @@ test_dropwhile(VQ0) -> VQ2 = rabbit_variable_queue:dropwhile( fun(#message_properties { expiry = Expiry }) -> Expiry =< 5 - end, VQ1), + end, undefined, VQ1), %% fetch five now VQ3 = lists:foldl(fun (_N, VQN) -> @@ -2383,10 +2383,11 @@ test_dropwhile(VQ0) -> test_dropwhile_varying_ram_duration(VQ0) -> VQ1 = variable_queue_publish(false, 1, VQ0), VQ2 = rabbit_variable_queue:set_ram_duration_target(0, VQ1), - VQ3 = rabbit_variable_queue:dropwhile(fun(_) -> false end, VQ2), + VQ3 = rabbit_variable_queue:dropwhile( + fun(_) -> false end, undefined, VQ2), VQ4 = rabbit_variable_queue:set_ram_duration_target(infinity, VQ3), VQ5 = variable_queue_publish(false, 1, VQ4), - rabbit_variable_queue:dropwhile(fun(_) -> false end, VQ5). + rabbit_variable_queue:dropwhile(fun(_) -> false end, undefined, VQ5). test_variable_queue_dynamic_duration_change(VQ0) -> SegmentSize = rabbit_queue_index:next_segment_boundary(0), diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 52eb168a42..1b32d21197 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -18,11 +18,11 @@ -export([init/3, terminate/2, delete_and_terminate/2, purge/1, publish/4, publish_delivered/5, drain_confirmed/1, - dropwhile/2, fetch/2, ack/2, requeue/2, len/1, is_empty/1, + dropwhile/3, fetch/2, ack/2, requeue/2, len/1, is_empty/1, set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1, handle_pre_hibernate/1, status/1, invoke/3, is_duplicate/2, discard/3, - multiple_routing_keys/0]). + multiple_routing_keys/0, fold/3]). -export([start/1, stop/0]). @@ -581,15 +581,23 @@ drain_confirmed(State = #vqstate { confirmed = C }) -> confirmed = gb_sets:new() }} end. -dropwhile(Pred, State) -> +dropwhile(Pred, MsgFun, State) -> case queue_out(State) of {empty, State1} -> a(State1); {{value, MsgStatus = #msg_status { msg_props = MsgProps }}, State1} -> - case Pred(MsgProps) of - true -> {_, State2} = internal_fetch(false, MsgStatus, State1), - dropwhile(Pred, State2); - false -> a(in_r(MsgStatus, State1)) + case {Pred(MsgProps), MsgFun} of + {true, undefined} -> + {_, State2} = internal_fetch(false, MsgStatus, State1), + dropwhile(Pred, MsgFun, State2); + {true, _} -> + {{_, _, AckTag, _}, State2} = + internal_fetch(true, MsgStatus, State1), + {MsgStatus, State3} = read_msg(MsgStatus, State2), + MsgFun(MsgStatus#msg_status.msg, AckTag), + dropwhile(Pred, MsgFun, State3); + {false, _} -> + a(in_r(MsgStatus, State1)) end end. @@ -628,11 +636,22 @@ ack(AckTags, State) -> persistent_count = PCount1, ack_out_counter = AckOutCount + length(AckTags) })}. -requeue(AckTags, #vqstate { delta = Delta, - q3 = Q3, - q4 = Q4, - in_counter = InCounter, - len = Len } = State) -> +fold(undefined, State, _AckTags) -> + State; +fold(MsgFun, State = #vqstate{pending_ack = PA}, AckTags) -> + lists:foldl( + fun(SeqId, State1) -> + {MsgStatus, State2} = + read_msg(gb_trees:get(SeqId, PA), State1), + MsgFun(MsgStatus#msg_status.msg, SeqId), + State2 + end, State, AckTags). + +requeue(AckTags, #vqstate { delta = Delta, + q3 = Q3, + q4 = Q4, + in_counter = InCounter, + len = Len } = State) -> {SeqIds, Q4a, MsgIds, State1} = queue_merge(lists:sort(AckTags), Q4, [], beta_limit(Q3), fun publish_alpha/2, State), |
