summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlexandru Scvortov <alexandru@rabbitmq.com>2012-02-03 13:01:07 +0000
committerAlexandru Scvortov <alexandru@rabbitmq.com>2012-02-03 13:01:07 +0000
commit0437e0c87f9f1a2dcb95d880c9380523f8ba3203 (patch)
treeee5b2a42e634dd14b4f6e0935fb86cda0559c2a5
parent568f4601c90c8c55dcc65126d94ad284275a60ab (diff)
downloadrabbitmq-server-git-0437e0c87f9f1a2dcb95d880c9380523f8ba3203.tar.gz
drop messages in dl cycles
This is the fate of messages in cycles: =WARNING REPORT==== 3-Feb-2012::12:54:34 === Message dropped. Dead-letter queues cycle detected: [<<"foo">>,<<"foo">>]
-rw-r--r--src/rabbit_amqqueue_process.erl47
1 files changed, 45 insertions, 2 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 60772e25f3..04728a030b 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -872,6 +872,33 @@ cleanup_after_confirm(State = #q{blocked_op = Op,
noreply(State1#q{blocked_op = Op})
end.
+already_been_here(Delivery = #delivery{message = #basic_message{content = Content}},
+ State) ->
+ #content{properties = #'P_basic'{headers = Headers}} =
+ rabbit_binary_parser:ensure_content_decoded(Content),
+ #resource{name = QueueName} = qname(State),
+ case Headers of
+ undefined ->
+ false;
+ _ ->
+ case rabbit_misc:table_lookup(Headers, <<"x-death">>) of
+ {array, DeathTables} ->
+ OldQueues = [rabbit_misc:table_lookup(D, <<"queue">>) ||
+ {table, D} <- DeathTables],
+ OldQueues1 = lists:append(
+ lists:map(fun (undefined) -> [];
+ ({longstr, QName}) -> [QName]
+ end, OldQueues)),
+ case lists:any(fun(QName) -> QName == QueueName end,
+ OldQueues1) of
+ true -> [QueueName | OldQueues1];
+ _ -> false
+ end;
+ _ ->
+ false
+ end
+ end.
+
make_dead_letter_msg(DLX, Reason,
Msg = #basic_message{content = Content,
exchange_name = Exchange,
@@ -1252,9 +1279,25 @@ handle_call({requeue, AckTags, ChPid}, From, State) ->
handle_cast({run_backing_queue, Mod, Fun}, State) ->
noreply(run_backing_queue(Mod, Fun, State));
-handle_cast({deliver, Delivery}, State) ->
+handle_cast({deliver, Delivery = #delivery{sender = Sender,
+ msg_seq_no = MsgSeqNo}},
+ State = #q{dlx = DLX}) ->
%% Asynchronous, non-"mandatory", non-"immediate" deliver mode.
- noreply(deliver_or_enqueue(Delivery, State));
+ case DLX of
+ undefined ->
+ noreply(deliver_or_enqueue(Delivery, State));
+ _ ->
+ case already_been_here(Delivery, State) of
+ false ->
+ noreply(deliver_or_enqueue(Delivery, State));
+ Qs ->
+ rabbit_log:warning(
+ "Message dropped. Dead-letter queues " ++
+ "cycle detected: ~p~n", [Qs]),
+ rabbit_misc:confirm_to_sender(Sender, [MsgSeqNo]),
+ noreply(State)
+ end
+ end;
handle_cast({ack, AckTags, ChPid}, State) ->
noreply(subtract_acks(