diff options
| -rw-r--r-- | src/rabbit_channel.erl | 38 |
1 files changed, 19 insertions, 19 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index e32ed1c458..17d84533dd 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -489,28 +489,18 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, %% certain to want to look at delivery-mode and priority. DecodedContent = rabbit_binary_parser:ensure_content_decoded(Content), IsPersistent = is_message_persistent(DecodedContent), - % PubAck transient messages immediately + %% PubAck transient messages immediately {MsgSeqNo, State1} = case State#ch.confirm_enabled of false -> {undefined, State}; true -> Count = State#ch.published_count, - % Add the current message to the need_acking list - State01 = State #ch { - need_confirming = gb_sets:add(Count, - State#ch.need_confirming) }, - % Ack transient messages now - {CountOrUndefined, State02} = - case IsPersistent of - true -> {Count, State01}; - false -> {undefined, - send_or_enqueue_ack( - Count, State01)} - end, - % Increase the PubAck counter - {CountOrUndefined, - State02 #ch { published_count = Count + 1 }} + {Count, + State #ch { published_count = Count + 1, + need_confirming = + gb_sets:add(Count, + State#ch.need_confirming) }} end, Message = #basic_message{exchange_name = ExchangeName, routing_key = RoutingKey, @@ -520,10 +510,20 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, {RoutingRes, DeliveredQPids} = rabbit_exchange:publish( Exchange, - rabbit_basic:delivery(Mandatory, Immediate, TxnKey, Message, MsgSeqNo)), - % PubAck after basic.returns + rabbit_basic:delivery(Mandatory, Immediate, TxnKey, Message, + case IsPersistent of + true -> MsgSeqNo; + false -> undefined + end)), State2 = case RoutingRes of - routed -> State1; + %% Confirm transient messages now + routed -> + case IsPersistent of + true -> State1; + false -> send_or_enqueue_ack( + MsgSeqNo, State1) + end; + %% Confirm after basic.returns unroutable -> ok = basic_return(Message, WriterPid, no_route), send_or_enqueue_ack(MsgSeqNo, State1); |
