summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_amqqueue_process.erl12
-rw-r--r--src/rabbit_channel.erl20
2 files changed, 17 insertions, 15 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index f6255d2ee9..e78eb06f06 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -426,6 +426,9 @@ confirm_message(#basic_message{guid = Guid}, State) ->
record_confirm_message(#delivery{msg_seq_no = undefined}, State) ->
State;
+record_confirm_message(#delivery{message = #basic_message{
+ is_persistent = false}}, State) ->
+ State;
record_confirm_message(#delivery{msg_seq_no = MsgSeqNo,
sender = ChPid,
message = #basic_message{guid = Guid}},
@@ -447,9 +450,15 @@ run_message_queue(State) ->
State2.
attempt_delivery(#delivery{txn = none,
+ sender = ChPid,
message = Message,
msg_seq_no = MsgSeqNo},
State = #q{backing_queue = BQ}) ->
+ IsPersistent = Message#basic_message.is_persistent,
+ case IsPersistent of
+ false -> rabbit_channel:confirm(ChPid, MsgSeqNo);
+ _ -> ok
+ end,
PredFun = fun (IsEmpty, _State) -> not IsEmpty end,
DeliverFun =
fun (AckRequired, false, State1 = #q{backing_queue_state = BQS}) ->
@@ -460,8 +469,7 @@ attempt_delivery(#delivery{txn = none,
BQ:publish_delivered(AckRequired, Message,
?BASE_MESSAGE_PROPERTIES
#message_properties {
- needs_confirming =
- (MsgSeqNo =/= undefined)},
+ needs_confirming = IsPersistent },
BQS),
{{Message, false, AckTag}, true,
State1#q{backing_queue_state = BQS1}}
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 7c45b52d9d..fdf4bdc218 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -556,12 +556,8 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
{RoutingRes, DeliveredQPids} =
rabbit_exchange:publish(
Exchange,
- rabbit_basic:delivery(Mandatory, Immediate, TxnKey, Message,
- case IsPersistent of
- true -> MsgSeqNo;
- false -> undefined
- end)),
- State2 = process_routing_result(RoutingRes, DeliveredQPids, IsPersistent,
+ rabbit_basic:delivery(Mandatory, Immediate, TxnKey, Message, MsgSeqNo)),
+ State2 = process_routing_result(RoutingRes, DeliveredQPids,
MsgSeqNo, Message, State1),
maybe_incr_stats([{ExchangeName, 1} |
[{{QPid, ExchangeName}, 1} ||
@@ -1245,19 +1241,17 @@ is_message_persistent(Content) ->
IsPersistent
end.
-process_routing_result(unroutable, _, _, MsgSeqNo, Message, State) ->
+process_routing_result(unroutable, _, MsgSeqNo, Message, State) ->
ok = basic_return(Message, State#ch.writer_pid, no_route),
send_or_enqueue_ack(MsgSeqNo, undefined, State);
-process_routing_result(not_delivered, _, _, MsgSeqNo, Message, State) ->
+process_routing_result(not_delivered, _, MsgSeqNo, Message, State) ->
ok = basic_return(Message, State#ch.writer_pid, no_consumers),
send_or_enqueue_ack(MsgSeqNo, undefined, State);
-process_routing_result(routed, [], _, MsgSeqNo, _, State) ->
+process_routing_result(routed, [], MsgSeqNo, _, State) ->
send_or_enqueue_ack(MsgSeqNo, undefined, State);
-process_routing_result(routed, _, _, undefined, _, State) ->
+process_routing_result(routed, _, undefined, _, State) ->
State;
-process_routing_result(routed, _, false, MsgSeqNo, _, State) ->
- send_or_enqueue_ack(MsgSeqNo, undefined, State);
-process_routing_result(routed, QPids, true, MsgSeqNo, _,
+process_routing_result(routed, QPids, MsgSeqNo, _,
State = #ch{queues_for_msg = QFM}) ->
QFM1 = dict:store(MsgSeqNo, sets:from_list(QPids), QFM),
[maybe_monitor(QPid) || QPid <- QPids],