diff options
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 37 |
1 files changed, 27 insertions, 10 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index b016c4d20a..b0b37bb2b9 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -846,7 +846,7 @@ dead_letter_publish(Msg, Reason, X, RK, MsgSeqNo, QName) -> DLMsg = make_dead_letter_msg(Msg, Reason, X#exchange.name, RK, QName), Delivery = rabbit_basic:delivery(false, DLMsg, MsgSeqNo), {Queues, Cycles} = detect_dead_letter_cycles( - DLMsg, rabbit_exchange:route(X, Delivery)), + Reason, DLMsg, rabbit_exchange:route(X, Delivery)), lists:foreach(fun log_cycle_once/1, Cycles), {_, DeliveredQPids} = rabbit_amqqueue:deliver( rabbit_amqqueue:lookup(Queues), Delivery), @@ -895,7 +895,8 @@ cleanup_after_confirm(AckTags, State = #q{delayed_stop = DS, false -> noreply(State1) end. -detect_dead_letter_cycles(#basic_message{content = Content}, Queues) -> +detect_dead_letter_cycles(expired, + #basic_message{content = Content}, Queues) -> #content{properties = #'P_basic'{headers = Headers}} = rabbit_binary_parser:ensure_content_decoded(Content), NoCycles = {Queues, []}, @@ -904,22 +905,38 @@ detect_dead_letter_cycles(#basic_message{content = Content}, Queues) -> NoCycles; _ -> case rabbit_misc:table_lookup(Headers, <<"x-death">>) of - {array, DeathTables} -> - OldQueues = [rabbit_misc:table_lookup(D, <<"queue">>) || - {table, D} <- DeathTables], - OldQueues1 = [QName || {longstr, QName} <- OldQueues], - OldQueuesSet = ordsets:from_list(OldQueues1), + {array, Deaths} -> {Cycling, NotCycling} = lists:partition( - fun(Queue) -> - ordsets:is_element(Queue#resource.name, - OldQueuesSet) + fun (#resource{name = Queue}) -> + is_dead_letter_cycle(Queue, Deaths) end, Queues), + OldQueues = [rabbit_misc:table_lookup(D, <<"queue">>) || + {table, D} <- Deaths], + OldQueues1 = [QName || {longstr, QName} <- OldQueues], {NotCycling, [[QName | OldQueues1] || #resource{name = QName} <- Cycling]}; _ -> NoCycles end + end; +detect_dead_letter_cycles(_Reason, _Msg, Queues) -> + {Queues, []}. + +is_dead_letter_cycle(Queue, Deaths) -> + {Cycle, Rest} = + lists:splitwith( + fun ({table, D}) -> + {longstr, Queue} =/= rabbit_misc:table_lookup(D, <<"queue">>); + (_) -> + true + end, lists:reverse(Deaths)), + %% Is there a cycle, and if so, is it entirely due to expiry? + case Rest of + [] -> false; + [H|_] -> [] =:= [D || {table, D} <- Cycle ++ [H], + {longstr, <<"expired">>} =/= + rabbit_misc:table_lookup(D, <<"reason">>)] end. make_dead_letter_msg(Msg = #basic_message{content = Content, |
