diff options
| author | Diana Corbacho <diana@rabbitmq.com> | 2018-11-08 15:01:47 +0000 |
|---|---|---|
| committer | Diana Corbacho <diana@rabbitmq.com> | 2018-11-08 15:01:47 +0000 |
| commit | 0dee589d53619a64e1f8aab77e490d6fc978a674 (patch) | |
| tree | 506169ea21903f8e1bc28278d6ee3fdc2d5b5a15 | |
| parent | a91a4fa9024be5cd015455417feefc622a04ace5 (diff) | |
| download | rabbitmq-server-git-0dee589d53619a64e1f8aab77e490d6fc978a674.tar.gz | |
Remove dead letter process
No longer needed, rabbit_quorum_queue:untracked_enqueue is as least
as reliable as the current implementation
[#161343708]
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_dead_letter.erl | 11 | ||||
| -rw-r--r-- | src/rabbit_quorum_queue.erl | 15 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 10 | ||||
| -rw-r--r-- | src/rabbit_vhost_dead_letter.erl | 132 |
5 files changed, 14 insertions, 160 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index d9f1711401..c2dab3da6f 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -954,13 +954,11 @@ dead_letter_maxlen_msg(X, State = #q{backing_queue = BQ}) -> dead_letter_msgs(Fun, Reason, X, State = #q{dlx_routing_key = RK, backing_queue_state = BQS, - backing_queue = BQ, - q = #amqqueue{ name = Resource } }) -> - #resource{virtual_host = VHost} = Resource, + backing_queue = BQ}) -> QName = qname(State), {Res, Acks1, BQS1} = Fun(fun (Msg, AckTag, Acks) -> - rabbit_vhost_dead_letter:publish(VHost, X, RK, QName, [{Reason, Msg}]), + rabbit_dead_letter:publish(Msg, Reason, X, RK, QName), [AckTag | Acks] end, [], BQS), {_Guids, BQS2} = BQ:ack(Acks1, BQS1), diff --git a/src/rabbit_dead_letter.erl b/src/rabbit_dead_letter.erl index 06691a29ad..bf77f2d832 100644 --- a/src/rabbit_dead_letter.erl +++ b/src/rabbit_dead_letter.erl @@ -16,7 +16,7 @@ -module(rabbit_dead_letter). --export([publish/6]). +-export([publish/5]). -include("rabbit.hrl"). -include("rabbit_framing.hrl"). @@ -26,20 +26,17 @@ -type reason() :: 'expired' | 'rejected' | 'maxlen'. -spec publish(rabbit_types:message(), reason(), rabbit_types:exchange(), - 'undefined' | binary(), rabbit_amqqueue:name(), - #{Name :: atom() => rabbit_fifo_client:state()}) -> 'ok'. + 'undefined' | binary(), rabbit_amqqueue:name()) -> 'ok'. %%---------------------------------------------------------------------------- -publish(Msg, Reason, X, RK, QName, QueueStates0) -> +publish(Msg, Reason, X, RK, QName) -> DLMsg = make_msg(Msg, Reason, X#exchange.name, RK, QName), Delivery = rabbit_basic:delivery(false, false, DLMsg, undefined), {Queues, Cycles} = detect_cycles(Reason, DLMsg, rabbit_exchange:route(X, Delivery)), lists:foreach(fun log_cycle_once/1, Cycles), - {_, _, QueueStates} = rabbit_amqqueue:deliver(rabbit_amqqueue:lookup(Queues), - Delivery, QueueStates0), - QueueStates. + rabbit_amqqueue:deliver(rabbit_amqqueue:lookup(Queues), Delivery). make_msg(Msg = #basic_message{content = Content, exchange_name = Exchange, diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl index 795465855b..4c7cb607ee 100644 --- a/src/rabbit_quorum_queue.erl +++ b/src/rabbit_quorum_queue.erl @@ -23,7 +23,7 @@ -export([credit/4]). -export([purge/1]). -export([stateless_deliver/2, deliver/3]). --export([dead_letter_publish/5]). +-export([dead_letter_publish/4]). -export([queue_name/1]). -export([cluster_state/1, status/2]). -export([cancel_consumer_handler/3, cancel_consumer/3]). @@ -498,11 +498,10 @@ delete_member(#amqqueue{pid = {RaName, _}, name = QName}, Node) -> end. %%---------------------------------------------------------------------------- -dlx_mfa(#amqqueue{name = Resource} = Q) -> - #resource{virtual_host = VHost} = Resource, +dlx_mfa(Q) -> DLX = init_dlx(args_policy_lookup(<<"dead-letter-exchange">>, fun res_arg/2, Q), Q), DLXRKey = args_policy_lookup(<<"dead-letter-routing-key">>, fun res_arg/2, Q), - {?MODULE, dead_letter_publish, [VHost, DLX, DLXRKey, Q#amqqueue.name]}. + {?MODULE, dead_letter_publish, [DLX, DLXRKey, Q#amqqueue.name]}. init_dlx(undefined, _Q) -> undefined; @@ -520,10 +519,12 @@ args_policy_lookup(Name, Resolve, Q = #amqqueue{arguments = Args}) -> {PolVal, {_Type, ArgVal}} -> Resolve(PolVal, ArgVal) end. -dead_letter_publish(_, undefined, _, _, _) -> +dead_letter_publish(undefined, _, _, _) -> ok; -dead_letter_publish(VHost, X, RK, QName, ReasonMsgs) -> - rabbit_vhost_dead_letter:publish(VHost, X, RK, QName, ReasonMsgs). +dead_letter_publish(X, RK, QName, ReasonMsgs) -> + {ok, Exchange} = rabbit_exchange:lookup(X), + [rabbit_dead_letter:publish(Msg, Reason, Exchange, RK, QName) + || {Reason, Msg} <- ReasonMsgs]. %% TODO escape hack qname_to_rname(#resource{virtual_host = <<"/">>, name = Name}) -> diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index f91f8a648e..83738b7e52 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -488,12 +488,10 @@ start(VHost, DurableQueues) -> Ref = proplists:get_value(persistent_ref, Terms), Ref =/= undefined end], - start_dead_letter_process(VHost), start_msg_store(VHost, ClientRefs, StartFunState), {ok, AllTerms}. stop(VHost) -> - ok = rabbit_vhost_dead_letter:stop(VHost), ok = stop_msg_store(VHost), ok = rabbit_queue_index:stop(VHost). @@ -517,14 +515,6 @@ do_start_msg_store(VHost, Type, Refs, StartFunState) -> exit({error, Error}) end. -start_dead_letter_process(VHost) -> - case rabbit_vhost_dead_letter:start(VHost) of - {ok, _} -> - rabbit_log:info("Started dead letter process for vhost '~s'~n", [VHost]); - Err -> - exit(Err) - end. - abbreviated_type(?TRANSIENT_MSG_STORE) -> transient; abbreviated_type(?PERSISTENT_MSG_STORE) -> persistent. diff --git a/src/rabbit_vhost_dead_letter.erl b/src/rabbit_vhost_dead_letter.erl deleted file mode 100644 index b4dede5e19..0000000000 --- a/src/rabbit_vhost_dead_letter.erl +++ /dev/null @@ -1,132 +0,0 @@ -%% The contents of this file are subject to the Mozilla Public License -%% Version 1.1 (the "License"); you may not use this file except in -%% compliance with the License. You may obtain a copy of the License -%% at http://www.mozilla.org/MPL/ -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See -%% the License for the specific language governing rights and -%% limitations under the License. -%% -%% The Original Code is RabbitMQ. -%% -%% The Initial Developer of the Original Code is GoPivotal, Inc. -%% Copyright (c) 2018 Pivotal Software, Inc. All rights reserved. -%% - --module(rabbit_vhost_dead_letter). - --include("rabbit.hrl"). - --behaviour(gen_server). - --export([start/1, start_link/0]). --export([stop/1]). --export([publish/5]). --export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, - code_change/3]). - --record(state, {queue_states, - queue_cleanup_timer}). - -start(VHost) -> - case rabbit_vhost_sup_sup:get_vhost_sup(VHost) of - {ok, VHostSup} -> - supervisor2:start_child(VHostSup, - {rabbit_vhost_dead_letter, - {rabbit_vhost_dead_letter, start_link, []}, - transient, ?WORKER_WAIT, worker, [rabbit_vhost_dead_letter]}); - {error, {no_such_vhost, VHost}} = E -> - rabbit_log:error("Failed to start a dead letter process for vhost ~s: vhost no" - " longer exists!", [VHost]), - E - end. - -stop(VHost) -> - case rabbit_vhost_sup_sup:get_vhost_sup(VHost) of - {ok, VHostSup} -> - ok = supervisor2:terminate_child(VHostSup, rabbit_vhost_dead_letter), - ok = supervisor2:delete_child(VHostSup, rabbit_vhost_dead_letter); - {error, {no_such_vhost, VHost}} -> - rabbit_log:error("Failed to stop a dead letter process for vhost ~s: " - "vhost no longer exists!", [VHost]), - - ok - end. - -publish(VHost, X, RK, QName, ReasonMsgs) -> - case vhost_dead_letter_pid(VHost) of - no_pid -> - %% TODO what to do??? - ok; - Pid -> - gen_server:cast(Pid, {publish, X, RK, QName, ReasonMsgs}) - end. - -vhost_dead_letter_pid(VHost) -> - {ok, VHostSup} = rabbit_vhost_sup_sup:get_vhost_sup(VHost), - case supervisor2:find_child(VHostSup, rabbit_vhost_dead_letter) of - [Pid] -> Pid; - [] -> no_pid - end. - -start_link() -> - gen_server:start_link(?MODULE, [], []). - -init([]) -> - {ok, init_queue_cleanup_timer(#state{queue_states = #{}})}. - -handle_call(_Req, _From, State) -> - {reply, ok, State}. - -handle_cast({publish, X, RK, QName, ReasonMsgs}, #state{queue_states = QueueStates0} = State) - when is_record(X, exchange) -> - QueueStates = batch_publish(X, RK, QName, ReasonMsgs, QueueStates0), - {noreply, State#state{queue_states = QueueStates}}; -handle_cast({publish, DLX, RK, QName, ReasonMsgs}, #state{queue_states = QueueStates0} = State) -> - QueueStates = - case rabbit_exchange:lookup(DLX) of - {ok, X} -> - batch_publish(X, RK, QName, ReasonMsgs, QueueStates0); - {error, not_found} -> - QueueStates0 - end, - {noreply, State#state{queue_states = QueueStates}}. - -handle_info({ra_event, {Name, _}, _} = Evt, - #state{queue_states = QueueStates} = State0) -> - FState0 = maps:get(Name, QueueStates), - case rabbit_quorum_queue:handle_event(Evt, FState0) of - {_, _, _, FState1} -> - {noreply, - State0#state{queue_states = maps:put(Name, FState1, QueueStates)}}; - eol -> - {noreply, - State0#state{queue_states = maps:remove(Name, QueueStates)}} - end; -handle_info(queue_cleanup, State = #state{queue_states = QueueStates0}) -> - QueueStates = maps:filter(fun(Name, _) -> - QName = rabbit_quorum_queue:queue_name(Name), - case rabbit_amqqueue:lookup(QName) of - [] -> - false; - _ -> - true - end - end, QueueStates0), - {noreply, init_queue_cleanup_timer(State#state{queue_states = QueueStates})}; -handle_info(_I, State) -> - {noreply, State}. - -terminate(_, _) -> ok. - -code_change(_, State, _) -> {ok, State}. - -batch_publish(X, RK, QName, ReasonMsgs, QueueStates) -> - lists:foldl(fun({Reason, Msg}, Acc) -> - rabbit_dead_letter:publish(Msg, Reason, X, RK, QName, Acc) - end, QueueStates, ReasonMsgs). - -init_queue_cleanup_timer(State) -> - {ok, Interval} = application:get_env(rabbit, channel_queue_cleanup_interval), - State#state{queue_cleanup_timer = erlang:send_after(Interval, self(), queue_cleanup)}. |
