diff options
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 12 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 20 |
2 files changed, 17 insertions, 15 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index f6255d2ee9..e78eb06f06 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -426,6 +426,9 @@ confirm_message(#basic_message{guid = Guid}, State) -> record_confirm_message(#delivery{msg_seq_no = undefined}, State) -> State; +record_confirm_message(#delivery{message = #basic_message{ + is_persistent = false}}, State) -> + State; record_confirm_message(#delivery{msg_seq_no = MsgSeqNo, sender = ChPid, message = #basic_message{guid = Guid}}, @@ -447,9 +450,15 @@ run_message_queue(State) -> State2. attempt_delivery(#delivery{txn = none, + sender = ChPid, message = Message, msg_seq_no = MsgSeqNo}, State = #q{backing_queue = BQ}) -> + IsPersistent = Message#basic_message.is_persistent, + case IsPersistent of + false -> rabbit_channel:confirm(ChPid, MsgSeqNo); + _ -> ok + end, PredFun = fun (IsEmpty, _State) -> not IsEmpty end, DeliverFun = fun (AckRequired, false, State1 = #q{backing_queue_state = BQS}) -> @@ -460,8 +469,7 @@ attempt_delivery(#delivery{txn = none, BQ:publish_delivered(AckRequired, Message, ?BASE_MESSAGE_PROPERTIES #message_properties { - needs_confirming = - (MsgSeqNo =/= undefined)}, + needs_confirming = IsPersistent }, BQS), {{Message, false, AckTag}, true, State1#q{backing_queue_state = BQS1}} diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 7c45b52d9d..fdf4bdc218 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -556,12 +556,8 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, {RoutingRes, DeliveredQPids} = rabbit_exchange:publish( Exchange, - rabbit_basic:delivery(Mandatory, Immediate, TxnKey, Message, - case IsPersistent of - true -> MsgSeqNo; - false -> undefined - end)), - State2 = process_routing_result(RoutingRes, DeliveredQPids, IsPersistent, + rabbit_basic:delivery(Mandatory, Immediate, TxnKey, Message, MsgSeqNo)), + State2 = process_routing_result(RoutingRes, DeliveredQPids, MsgSeqNo, Message, State1), maybe_incr_stats([{ExchangeName, 1} | [{{QPid, ExchangeName}, 1} || @@ -1245,19 +1241,17 @@ is_message_persistent(Content) -> IsPersistent end. -process_routing_result(unroutable, _, _, MsgSeqNo, Message, State) -> +process_routing_result(unroutable, _, MsgSeqNo, Message, State) -> ok = basic_return(Message, State#ch.writer_pid, no_route), send_or_enqueue_ack(MsgSeqNo, undefined, State); -process_routing_result(not_delivered, _, _, MsgSeqNo, Message, State) -> +process_routing_result(not_delivered, _, MsgSeqNo, Message, State) -> ok = basic_return(Message, State#ch.writer_pid, no_consumers), send_or_enqueue_ack(MsgSeqNo, undefined, State); -process_routing_result(routed, [], _, MsgSeqNo, _, State) -> +process_routing_result(routed, [], MsgSeqNo, _, State) -> send_or_enqueue_ack(MsgSeqNo, undefined, State); -process_routing_result(routed, _, _, undefined, _, State) -> +process_routing_result(routed, _, undefined, _, State) -> State; -process_routing_result(routed, _, false, MsgSeqNo, _, State) -> - send_or_enqueue_ack(MsgSeqNo, undefined, State); -process_routing_result(routed, QPids, true, MsgSeqNo, _, +process_routing_result(routed, QPids, MsgSeqNo, _, State = #ch{queues_for_msg = QFM}) -> QFM1 = dict:store(MsgSeqNo, sets:from_list(QPids), QFM), [maybe_monitor(QPid) || QPid <- QPids], |
