diff options
| author | Alexandru Scvortov <alexandru@rabbitmq.com> | 2012-02-03 13:01:07 +0000 |
|---|---|---|
| committer | Alexandru Scvortov <alexandru@rabbitmq.com> | 2012-02-03 13:01:07 +0000 |
| commit | 0437e0c87f9f1a2dcb95d880c9380523f8ba3203 (patch) | |
| tree | ee5b2a42e634dd14b4f6e0935fb86cda0559c2a5 | |
| parent | 568f4601c90c8c55dcc65126d94ad284275a60ab (diff) | |
| download | rabbitmq-server-git-0437e0c87f9f1a2dcb95d880c9380523f8ba3203.tar.gz | |
drop messages in dl cycles
This is the fate of messages in cycles:
=WARNING REPORT==== 3-Feb-2012::12:54:34 ===
Message dropped. Dead-letter queues cycle detected: [<<"foo">>,<<"foo">>]
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 47 |
1 files changed, 45 insertions, 2 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 60772e25f3..04728a030b 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -872,6 +872,33 @@ cleanup_after_confirm(State = #q{blocked_op = Op, noreply(State1#q{blocked_op = Op}) end. +already_been_here(Delivery = #delivery{message = #basic_message{content = Content}}, + State) -> + #content{properties = #'P_basic'{headers = Headers}} = + rabbit_binary_parser:ensure_content_decoded(Content), + #resource{name = QueueName} = qname(State), + case Headers of + undefined -> + false; + _ -> + case rabbit_misc:table_lookup(Headers, <<"x-death">>) of + {array, DeathTables} -> + OldQueues = [rabbit_misc:table_lookup(D, <<"queue">>) || + {table, D} <- DeathTables], + OldQueues1 = lists:append( + lists:map(fun (undefined) -> []; + ({longstr, QName}) -> [QName] + end, OldQueues)), + case lists:any(fun(QName) -> QName == QueueName end, + OldQueues1) of + true -> [QueueName | OldQueues1]; + _ -> false + end; + _ -> + false + end + end. + make_dead_letter_msg(DLX, Reason, Msg = #basic_message{content = Content, exchange_name = Exchange, @@ -1252,9 +1279,25 @@ handle_call({requeue, AckTags, ChPid}, From, State) -> handle_cast({run_backing_queue, Mod, Fun}, State) -> noreply(run_backing_queue(Mod, Fun, State)); -handle_cast({deliver, Delivery}, State) -> +handle_cast({deliver, Delivery = #delivery{sender = Sender, + msg_seq_no = MsgSeqNo}}, + State = #q{dlx = DLX}) -> %% Asynchronous, non-"mandatory", non-"immediate" deliver mode. - noreply(deliver_or_enqueue(Delivery, State)); + case DLX of + undefined -> + noreply(deliver_or_enqueue(Delivery, State)); + _ -> + case already_been_here(Delivery, State) of + false -> + noreply(deliver_or_enqueue(Delivery, State)); + Qs -> + rabbit_log:warning( + "Message dropped. Dead-letter queues " ++ + "cycle detected: ~p~n", [Qs]), + rabbit_misc:confirm_to_sender(Sender, [MsgSeqNo]), + noreply(State) + end + end; handle_cast({ack, AckTags, ChPid}, State) -> noreply(subtract_acks( |
