summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue_process.erl47
1 files changed, 45 insertions, 2 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 60772e25f3..04728a030b 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -872,6 +872,33 @@ cleanup_after_confirm(State = #q{blocked_op = Op,
noreply(State1#q{blocked_op = Op})
end.
+already_been_here(Delivery = #delivery{message = #basic_message{content = Content}},
+ State) ->
+ #content{properties = #'P_basic'{headers = Headers}} =
+ rabbit_binary_parser:ensure_content_decoded(Content),
+ #resource{name = QueueName} = qname(State),
+ case Headers of
+ undefined ->
+ false;
+ _ ->
+ case rabbit_misc:table_lookup(Headers, <<"x-death">>) of
+ {array, DeathTables} ->
+ OldQueues = [rabbit_misc:table_lookup(D, <<"queue">>) ||
+ {table, D} <- DeathTables],
+ OldQueues1 = lists:append(
+ lists:map(fun (undefined) -> [];
+ ({longstr, QName}) -> [QName]
+ end, OldQueues)),
+ case lists:any(fun(QName) -> QName == QueueName end,
+ OldQueues1) of
+ true -> [QueueName | OldQueues1];
+ _ -> false
+ end;
+ _ ->
+ false
+ end
+ end.
+
make_dead_letter_msg(DLX, Reason,
Msg = #basic_message{content = Content,
exchange_name = Exchange,
@@ -1252,9 +1279,25 @@ handle_call({requeue, AckTags, ChPid}, From, State) ->
handle_cast({run_backing_queue, Mod, Fun}, State) ->
noreply(run_backing_queue(Mod, Fun, State));
-handle_cast({deliver, Delivery}, State) ->
+handle_cast({deliver, Delivery = #delivery{sender = Sender,
+ msg_seq_no = MsgSeqNo}},
+ State = #q{dlx = DLX}) ->
%% Asynchronous, non-"mandatory", non-"immediate" deliver mode.
- noreply(deliver_or_enqueue(Delivery, State));
+ case DLX of
+ undefined ->
+ noreply(deliver_or_enqueue(Delivery, State));
+ _ ->
+ case already_been_here(Delivery, State) of
+ false ->
+ noreply(deliver_or_enqueue(Delivery, State));
+ Qs ->
+ rabbit_log:warning(
+ "Message dropped. Dead-letter queues " ++
+ "cycle detected: ~p~n", [Qs]),
+ rabbit_misc:confirm_to_sender(Sender, [MsgSeqNo]),
+ noreply(State)
+ end
+ end;
handle_cast({ack, AckTags, ChPid}, State) ->
noreply(subtract_acks(