summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_channel.erl38
1 files changed, 19 insertions, 19 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index e32ed1c458..17d84533dd 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -489,28 +489,18 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
%% certain to want to look at delivery-mode and priority.
DecodedContent = rabbit_binary_parser:ensure_content_decoded(Content),
IsPersistent = is_message_persistent(DecodedContent),
- % PubAck transient messages immediately
+ %% PubAck transient messages immediately
{MsgSeqNo, State1}
= case State#ch.confirm_enabled of
false ->
{undefined, State};
true ->
Count = State#ch.published_count,
- % Add the current message to the need_acking list
- State01 = State #ch {
- need_confirming = gb_sets:add(Count,
- State#ch.need_confirming) },
- % Ack transient messages now
- {CountOrUndefined, State02} =
- case IsPersistent of
- true -> {Count, State01};
- false -> {undefined,
- send_or_enqueue_ack(
- Count, State01)}
- end,
- % Increase the PubAck counter
- {CountOrUndefined,
- State02 #ch { published_count = Count + 1 }}
+ {Count,
+ State #ch { published_count = Count + 1,
+ need_confirming =
+ gb_sets:add(Count,
+ State#ch.need_confirming) }}
end,
Message = #basic_message{exchange_name = ExchangeName,
routing_key = RoutingKey,
@@ -520,10 +510,20 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
{RoutingRes, DeliveredQPids} =
rabbit_exchange:publish(
Exchange,
- rabbit_basic:delivery(Mandatory, Immediate, TxnKey, Message, MsgSeqNo)),
- % PubAck after basic.returns
+ rabbit_basic:delivery(Mandatory, Immediate, TxnKey, Message,
+ case IsPersistent of
+ true -> MsgSeqNo;
+ false -> undefined
+ end)),
State2 = case RoutingRes of
- routed -> State1;
+ %% Confirm transient messages now
+ routed ->
+ case IsPersistent of
+ true -> State1;
+ false -> send_or_enqueue_ack(
+ MsgSeqNo, State1)
+ end;
+ %% Confirm after basic.returns
unroutable ->
ok = basic_return(Message, WriterPid, no_route),
send_or_enqueue_ack(MsgSeqNo, State1);