summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDiana Corbacho <diana@rabbitmq.com>2018-11-08 15:01:47 +0000
committerDiana Corbacho <diana@rabbitmq.com>2018-11-08 15:01:47 +0000
commit0dee589d53619a64e1f8aab77e490d6fc978a674 (patch)
tree506169ea21903f8e1bc28278d6ee3fdc2d5b5a15 /src
parenta91a4fa9024be5cd015455417feefc622a04ace5 (diff)
downloadrabbitmq-server-git-0dee589d53619a64e1f8aab77e490d6fc978a674.tar.gz
Remove dead letter process
No longer needed, rabbit_quorum_queue:untracked_enqueue is as least as reliable as the current implementation [#161343708]
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue_process.erl6
-rw-r--r--src/rabbit_dead_letter.erl11
-rw-r--r--src/rabbit_quorum_queue.erl15
-rw-r--r--src/rabbit_variable_queue.erl10
-rw-r--r--src/rabbit_vhost_dead_letter.erl132
5 files changed, 14 insertions, 160 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index d9f1711401..c2dab3da6f 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -954,13 +954,11 @@ dead_letter_maxlen_msg(X, State = #q{backing_queue = BQ}) ->
dead_letter_msgs(Fun, Reason, X, State = #q{dlx_routing_key = RK,
backing_queue_state = BQS,
- backing_queue = BQ,
- q = #amqqueue{ name = Resource } }) ->
- #resource{virtual_host = VHost} = Resource,
+ backing_queue = BQ}) ->
QName = qname(State),
{Res, Acks1, BQS1} =
Fun(fun (Msg, AckTag, Acks) ->
- rabbit_vhost_dead_letter:publish(VHost, X, RK, QName, [{Reason, Msg}]),
+ rabbit_dead_letter:publish(Msg, Reason, X, RK, QName),
[AckTag | Acks]
end, [], BQS),
{_Guids, BQS2} = BQ:ack(Acks1, BQS1),
diff --git a/src/rabbit_dead_letter.erl b/src/rabbit_dead_letter.erl
index 06691a29ad..bf77f2d832 100644
--- a/src/rabbit_dead_letter.erl
+++ b/src/rabbit_dead_letter.erl
@@ -16,7 +16,7 @@
-module(rabbit_dead_letter).
--export([publish/6]).
+-export([publish/5]).
-include("rabbit.hrl").
-include("rabbit_framing.hrl").
@@ -26,20 +26,17 @@
-type reason() :: 'expired' | 'rejected' | 'maxlen'.
-spec publish(rabbit_types:message(), reason(), rabbit_types:exchange(),
- 'undefined' | binary(), rabbit_amqqueue:name(),
- #{Name :: atom() => rabbit_fifo_client:state()}) -> 'ok'.
+ 'undefined' | binary(), rabbit_amqqueue:name()) -> 'ok'.
%%----------------------------------------------------------------------------
-publish(Msg, Reason, X, RK, QName, QueueStates0) ->
+publish(Msg, Reason, X, RK, QName) ->
DLMsg = make_msg(Msg, Reason, X#exchange.name, RK, QName),
Delivery = rabbit_basic:delivery(false, false, DLMsg, undefined),
{Queues, Cycles} = detect_cycles(Reason, DLMsg,
rabbit_exchange:route(X, Delivery)),
lists:foreach(fun log_cycle_once/1, Cycles),
- {_, _, QueueStates} = rabbit_amqqueue:deliver(rabbit_amqqueue:lookup(Queues),
- Delivery, QueueStates0),
- QueueStates.
+ rabbit_amqqueue:deliver(rabbit_amqqueue:lookup(Queues), Delivery).
make_msg(Msg = #basic_message{content = Content,
exchange_name = Exchange,
diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl
index 795465855b..4c7cb607ee 100644
--- a/src/rabbit_quorum_queue.erl
+++ b/src/rabbit_quorum_queue.erl
@@ -23,7 +23,7 @@
-export([credit/4]).
-export([purge/1]).
-export([stateless_deliver/2, deliver/3]).
--export([dead_letter_publish/5]).
+-export([dead_letter_publish/4]).
-export([queue_name/1]).
-export([cluster_state/1, status/2]).
-export([cancel_consumer_handler/3, cancel_consumer/3]).
@@ -498,11 +498,10 @@ delete_member(#amqqueue{pid = {RaName, _}, name = QName}, Node) ->
end.
%%----------------------------------------------------------------------------
-dlx_mfa(#amqqueue{name = Resource} = Q) ->
- #resource{virtual_host = VHost} = Resource,
+dlx_mfa(Q) ->
DLX = init_dlx(args_policy_lookup(<<"dead-letter-exchange">>, fun res_arg/2, Q), Q),
DLXRKey = args_policy_lookup(<<"dead-letter-routing-key">>, fun res_arg/2, Q),
- {?MODULE, dead_letter_publish, [VHost, DLX, DLXRKey, Q#amqqueue.name]}.
+ {?MODULE, dead_letter_publish, [DLX, DLXRKey, Q#amqqueue.name]}.
init_dlx(undefined, _Q) ->
undefined;
@@ -520,10 +519,12 @@ args_policy_lookup(Name, Resolve, Q = #amqqueue{arguments = Args}) ->
{PolVal, {_Type, ArgVal}} -> Resolve(PolVal, ArgVal)
end.
-dead_letter_publish(_, undefined, _, _, _) ->
+dead_letter_publish(undefined, _, _, _) ->
ok;
-dead_letter_publish(VHost, X, RK, QName, ReasonMsgs) ->
- rabbit_vhost_dead_letter:publish(VHost, X, RK, QName, ReasonMsgs).
+dead_letter_publish(X, RK, QName, ReasonMsgs) ->
+ {ok, Exchange} = rabbit_exchange:lookup(X),
+ [rabbit_dead_letter:publish(Msg, Reason, Exchange, RK, QName)
+ || {Reason, Msg} <- ReasonMsgs].
%% TODO escape hack
qname_to_rname(#resource{virtual_host = <<"/">>, name = Name}) ->
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index f91f8a648e..83738b7e52 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -488,12 +488,10 @@ start(VHost, DurableQueues) ->
Ref = proplists:get_value(persistent_ref, Terms),
Ref =/= undefined
end],
- start_dead_letter_process(VHost),
start_msg_store(VHost, ClientRefs, StartFunState),
{ok, AllTerms}.
stop(VHost) ->
- ok = rabbit_vhost_dead_letter:stop(VHost),
ok = stop_msg_store(VHost),
ok = rabbit_queue_index:stop(VHost).
@@ -517,14 +515,6 @@ do_start_msg_store(VHost, Type, Refs, StartFunState) ->
exit({error, Error})
end.
-start_dead_letter_process(VHost) ->
- case rabbit_vhost_dead_letter:start(VHost) of
- {ok, _} ->
- rabbit_log:info("Started dead letter process for vhost '~s'~n", [VHost]);
- Err ->
- exit(Err)
- end.
-
abbreviated_type(?TRANSIENT_MSG_STORE) -> transient;
abbreviated_type(?PERSISTENT_MSG_STORE) -> persistent.
diff --git a/src/rabbit_vhost_dead_letter.erl b/src/rabbit_vhost_dead_letter.erl
deleted file mode 100644
index b4dede5e19..0000000000
--- a/src/rabbit_vhost_dead_letter.erl
+++ /dev/null
@@ -1,132 +0,0 @@
-%% The contents of this file are subject to the Mozilla Public License
-%% Version 1.1 (the "License"); you may not use this file except in
-%% compliance with the License. You may obtain a copy of the License
-%% at http://www.mozilla.org/MPL/
-%%
-%% Software distributed under the License is distributed on an "AS IS"
-%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
-%% the License for the specific language governing rights and
-%% limitations under the License.
-%%
-%% The Original Code is RabbitMQ.
-%%
-%% The Initial Developer of the Original Code is GoPivotal, Inc.
-%% Copyright (c) 2018 Pivotal Software, Inc. All rights reserved.
-%%
-
--module(rabbit_vhost_dead_letter).
-
--include("rabbit.hrl").
-
--behaviour(gen_server).
-
--export([start/1, start_link/0]).
--export([stop/1]).
--export([publish/5]).
--export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
- code_change/3]).
-
--record(state, {queue_states,
- queue_cleanup_timer}).
-
-start(VHost) ->
- case rabbit_vhost_sup_sup:get_vhost_sup(VHost) of
- {ok, VHostSup} ->
- supervisor2:start_child(VHostSup,
- {rabbit_vhost_dead_letter,
- {rabbit_vhost_dead_letter, start_link, []},
- transient, ?WORKER_WAIT, worker, [rabbit_vhost_dead_letter]});
- {error, {no_such_vhost, VHost}} = E ->
- rabbit_log:error("Failed to start a dead letter process for vhost ~s: vhost no"
- " longer exists!", [VHost]),
- E
- end.
-
-stop(VHost) ->
- case rabbit_vhost_sup_sup:get_vhost_sup(VHost) of
- {ok, VHostSup} ->
- ok = supervisor2:terminate_child(VHostSup, rabbit_vhost_dead_letter),
- ok = supervisor2:delete_child(VHostSup, rabbit_vhost_dead_letter);
- {error, {no_such_vhost, VHost}} ->
- rabbit_log:error("Failed to stop a dead letter process for vhost ~s: "
- "vhost no longer exists!", [VHost]),
-
- ok
- end.
-
-publish(VHost, X, RK, QName, ReasonMsgs) ->
- case vhost_dead_letter_pid(VHost) of
- no_pid ->
- %% TODO what to do???
- ok;
- Pid ->
- gen_server:cast(Pid, {publish, X, RK, QName, ReasonMsgs})
- end.
-
-vhost_dead_letter_pid(VHost) ->
- {ok, VHostSup} = rabbit_vhost_sup_sup:get_vhost_sup(VHost),
- case supervisor2:find_child(VHostSup, rabbit_vhost_dead_letter) of
- [Pid] -> Pid;
- [] -> no_pid
- end.
-
-start_link() ->
- gen_server:start_link(?MODULE, [], []).
-
-init([]) ->
- {ok, init_queue_cleanup_timer(#state{queue_states = #{}})}.
-
-handle_call(_Req, _From, State) ->
- {reply, ok, State}.
-
-handle_cast({publish, X, RK, QName, ReasonMsgs}, #state{queue_states = QueueStates0} = State)
- when is_record(X, exchange) ->
- QueueStates = batch_publish(X, RK, QName, ReasonMsgs, QueueStates0),
- {noreply, State#state{queue_states = QueueStates}};
-handle_cast({publish, DLX, RK, QName, ReasonMsgs}, #state{queue_states = QueueStates0} = State) ->
- QueueStates =
- case rabbit_exchange:lookup(DLX) of
- {ok, X} ->
- batch_publish(X, RK, QName, ReasonMsgs, QueueStates0);
- {error, not_found} ->
- QueueStates0
- end,
- {noreply, State#state{queue_states = QueueStates}}.
-
-handle_info({ra_event, {Name, _}, _} = Evt,
- #state{queue_states = QueueStates} = State0) ->
- FState0 = maps:get(Name, QueueStates),
- case rabbit_quorum_queue:handle_event(Evt, FState0) of
- {_, _, _, FState1} ->
- {noreply,
- State0#state{queue_states = maps:put(Name, FState1, QueueStates)}};
- eol ->
- {noreply,
- State0#state{queue_states = maps:remove(Name, QueueStates)}}
- end;
-handle_info(queue_cleanup, State = #state{queue_states = QueueStates0}) ->
- QueueStates = maps:filter(fun(Name, _) ->
- QName = rabbit_quorum_queue:queue_name(Name),
- case rabbit_amqqueue:lookup(QName) of
- [] ->
- false;
- _ ->
- true
- end
- end, QueueStates0),
- {noreply, init_queue_cleanup_timer(State#state{queue_states = QueueStates})};
-handle_info(_I, State) ->
- {noreply, State}.
-
-terminate(_, _) -> ok.
-
-code_change(_, State, _) -> {ok, State}.
-
-batch_publish(X, RK, QName, ReasonMsgs, QueueStates) ->
- lists:foldl(fun({Reason, Msg}, Acc) ->
- rabbit_dead_letter:publish(Msg, Reason, X, RK, QName, Acc)
- end, QueueStates, ReasonMsgs).
-
-init_queue_cleanup_timer(State) ->
- {ok, Interval} = application:get_env(rabbit, channel_queue_cleanup_interval),
- State#state{queue_cleanup_timer = erlang:send_after(Interval, self(), queue_cleanup)}.