diff options
| author | Matthias Radestock <matthias@rabbitmq.com> | 2013-01-02 13:41:28 +0000 |
|---|---|---|
| committer | Matthias Radestock <matthias@rabbitmq.com> | 2013-01-02 13:41:28 +0000 |
| commit | 02fcc0b8c7475708c818f28a8a8c9022b6f012fb (patch) | |
| tree | b1023cd2f33bff6d8fcef720471ca111bb55d468 | |
| parent | de298c26de4b68707da041e6e4726356569f0915 (diff) | |
| parent | ada42944f5931ca278ff48009181baf6c1c68398 (diff) | |
| download | rabbitmq-server-git-02fcc0b8c7475708c818f28a8a8c9022b6f012fb.tar.gz | |
merge bug25372 into bug25327
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 130 |
1 files changed, 66 insertions, 64 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index f9614517fe..66e48024fd 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -716,25 +716,59 @@ drop_expired_msgs(State = #q{dlx = DLX, backing_queue = BQ }) -> Now = now_micros(), ExpirePred = fun (#message_properties{expiry = Exp}) -> Now >= Exp end, - {Props, BQS1} = case DLX of - undefined -> BQ:dropwhile(ExpirePred, BQS); - _ -> {Next, Msgs, BQS2} = - BQ:fetchwhile(ExpirePred, - fun accumulate_msgs/3, - [], BQS), - case Msgs of - [] -> ok; - _ -> (dead_letter_fun(expired))( - lists:reverse(Msgs)) - end, - {Next, BQS2} - end, + {Props, State1} = + case DLX of + undefined -> {Next, BQS1} = BQ:dropwhile(ExpirePred, BQS), + {Next, State#q{backing_queue_state = BQS1}}; + _ -> case rabbit_exchange:lookup(DLX) of + {ok, X} -> + dead_letter_expired_msgs(ExpirePred, X, State); + {error, not_found} -> + {Next, BQS1} = BQ:dropwhile(ExpirePred, BQS), + {Next, State#q{backing_queue_state = BQS1}} + end + end, ensure_ttl_timer(case Props of undefined -> undefined; #message_properties{expiry = Exp} -> Exp - end, State#q{backing_queue_state = BQS1}). - -accumulate_msgs(Msg, AckTag, Acc) -> [{Msg, AckTag} | Acc]. + end, State1). + +dead_letter_expired_msgs(ExpirePred, X, State = #q{backing_queue = BQ}) -> + dead_letter_msgs(fun (DLFun, Acc, BQS1) -> + BQ:fetchwhile(ExpirePred, DLFun, Acc, BQS1) + end, expired, X, State). + +dead_letter_rejected_msgs(AckTags, X, State = #q{backing_queue = BQ}) -> + {ok, State1} = + dead_letter_msgs( + fun (DLFun, Acc, BQS) -> + {Acc1, BQS1} = BQ:ackfold(DLFun, Acc, BQS, AckTags), + {ok, Acc1, BQS1} + end, rejected, X, State), + State1. + +dead_letter_msgs(Fun, Reason, X, State = #q{dlx_routing_key = RK, + publish_seqno = SeqNo0, + unconfirmed = UC0, + queue_monitors = QMons0, + backing_queue_state = BQS, + backing_queue = BQ}) -> + QName = qname(State), + {Res, {AckImm1, SeqNo1, UC1, QMons1}, BQS1} = + Fun(fun (Msg, AckTag, {AckImm, SeqNo, UC, QMons}) -> + case dead_letter_publish(Msg, Reason, + X, RK, SeqNo, QName) of + [] -> {[AckTag | AckImm], SeqNo, UC, QMons}; + QPids -> {AckImm, SeqNo + 1, + dtree:insert(SeqNo, QPids, AckTag, UC), + pmon:monitor_all(QPids, QMons)} + end + end, {[], SeqNo0, UC0, QMons0}, BQS), + {_Guids, BQS2} = BQ:ack(AckImm1, BQS1), + {Res, State#q{publish_seqno = SeqNo1, + unconfirmed = UC1, + queue_monitors = QMons1, + backing_queue_state = BQS2}}. ensure_ttl_timer(undefined, State) -> State; @@ -755,11 +789,8 @@ ensure_ttl_timer(Expiry, State = #q{ttl_timer_ref = TRef, ensure_ttl_timer(_Expiry, State) -> State. -dead_letter_fun(Reason) -> - fun(Msgs) -> gen_server2:cast(self(), {dead_letter, Msgs, Reason}) end. - -dead_letter_publish(Msg, Reason, X, State = #q{publish_seqno = MsgSeqNo}) -> - DLMsg = make_dead_letter_msg(Reason, Msg, State), +dead_letter_publish(Msg, Reason, X, RK, MsgSeqNo, QName) -> + DLMsg = make_dead_letter_msg(Msg, Reason, X#exchange.name, RK, QName), Delivery = rabbit_basic:delivery(false, DLMsg, MsgSeqNo), {Queues, Cycles} = detect_dead_letter_cycles( DLMsg, rabbit_exchange:route(X, Delivery)), @@ -838,19 +869,16 @@ detect_dead_letter_cycles(#basic_message{content = Content}, Queues) -> end end. -make_dead_letter_msg(Reason, - Msg = #basic_message{content = Content, +make_dead_letter_msg(Msg = #basic_message{content = Content, exchange_name = Exchange, routing_keys = RoutingKeys}, - State = #q{dlx = DLX, dlx_routing_key = DlxRoutingKey}) -> + Reason, DLX, RK, #resource{name = QName}) -> {DeathRoutingKeys, HeadersFun1} = - case DlxRoutingKey of + case RK of undefined -> {RoutingKeys, fun (H) -> H end}; - _ -> {[DlxRoutingKey], - fun (H) -> lists:keydelete(<<"CC">>, 1, H) end} + _ -> {[RK], fun (H) -> lists:keydelete(<<"CC">>, 1, H) end} end, ReasonBin = list_to_binary(atom_to_list(Reason)), - #resource{name = QName} = qname(State), TimeSec = rabbit_misc:now_ms() div 1000, HeadersFun2 = fun (Headers) -> @@ -1196,17 +1224,16 @@ handle_cast({reject, AckTags, true, ChPid}, State) -> handle_cast({reject, AckTags, false, ChPid}, State = #q{dlx = undefined}) -> noreply(ack(AckTags, ChPid, State)); -handle_cast({reject, AckTags, false, ChPid}, State) -> - DLXFun = dead_letter_fun(rejected), - noreply(subtract_acks( - ChPid, AckTags, State, - fun (State1 = #q{backing_queue = BQ, - backing_queue_state = BQS}) -> - {ok, BQS1} = BQ:ackfold( - fun (M, A, ok) -> DLXFun([{M, A}]) end, - ok, BQS, AckTags), - State1#q{backing_queue_state = BQS1} - end)); +handle_cast({reject, AckTags, false, ChPid}, State = #q{dlx = DLX}) -> + noreply(case rabbit_exchange:lookup(DLX) of + {ok, X} -> subtract_acks( + ChPid, AckTags, State, + fun (State1) -> + dead_letter_rejected_msgs( + AckTags, X, State1) + end); + {error, not_found} -> ack(AckTags, ChPid, State) + end); handle_cast(delete_immediately, State) -> stop(State); @@ -1252,31 +1279,6 @@ handle_cast({set_maximum_since_use, Age}, State) -> ok = file_handle_cache:set_maximum_since_use(Age), noreply(State); -handle_cast({dead_letter, Msgs, Reason}, State = #q{dlx = XName}) -> - case rabbit_exchange:lookup(XName) of - {ok, X} -> - {AckImmediately, State2} = - lists:foldl( - fun({Msg, AckTag}, - {Acks, State1 = #q{publish_seqno = SeqNo, - unconfirmed = UC, - queue_monitors = QMons}}) -> - case dead_letter_publish(Msg, Reason, X, State1) of - [] -> {[AckTag | Acks], State1}; - QPids -> UC1 = dtree:insert( - SeqNo, QPids, AckTag, UC), - QMons1 = pmon:monitor_all(QPids, QMons), - {Acks, - State1#q{publish_seqno = SeqNo + 1, - unconfirmed = UC1, - queue_monitors = QMons1}} - end - end, {[], State}, Msgs), - cleanup_after_confirm(AckImmediately, State2); - {error, not_found} -> - cleanup_after_confirm([AckTag || {_, AckTag} <- Msgs], State) - end; - handle_cast(start_mirroring, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> %% lookup again to get policy for init_with_existing_bq |
