diff options
| -rw-r--r-- | src/rabbit_channel.erl | 47 |
1 files changed, 26 insertions, 21 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 07d35ce0f3..04875b5e6c 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -266,10 +266,10 @@ handle_cast(multiple_ack_flush, false -> flush_multiple(As, WriterPid) end, {noreply, State#ch{confirm = C#confirm{held_acks = gb_sets:new(), - tref = undefined}}}. -%handle_cast({confirm, MsgSeqNo}, State) -> -% rabbit_log:info("got confirm for #~p~n", [MsgSeqNo]), -% {noreply, SOMETHING MAGIC + tref = undefined}}}; +handle_cast({confirm, MsgSeqNo}, State) -> + rabbit_log:info("got confirm for #~p~n", [MsgSeqNo]), + {noreply, send_or_enqueue_ack(MsgSeqNo, State)}. handle_info({'EXIT', WriterPid, Reason = {writer, send_failed, _Error}}, @@ -424,23 +424,18 @@ queue_blocked(QPid, State = #ch{blocking = Blocking}) -> State#ch{blocking = Blocking1} end. -handle_confirm(State = #ch{confirm = #confirm{enabled = false}}, _) -> +send_or_enqueue_ack(_, State = #ch{confirm = #confirm{enabled = false}}) -> State; -handle_confirm(State = #ch{writer_pid = WriterPid, - confirm = C = #confirm{ count = Count, multiple = false}}, - false) -> - rabbit_log:info("handling confirm in single transient mode (#~p)~n", [Count]), - ok = rabbit_writer:send_command(WriterPid, #'basic.ack'{ delivery_tag = Count }), - State#ch{confirm = C#confirm{ count = Count+1 }}; -handle_confirm(State = #ch{confirm = #confirm{multiple = true}}, false) -> - State1 = #ch{confirm = C = #confirm{count = Count, - held_acks = As}} = start_ack_timer(State), - rabbit_log:info("handling confirm in multiple transient mode (#~p)~n", [Count]), - State1#ch{confirm = C#confirm{count = Count+1, - held_acks = gb_sets:add(Count, As)}}; -handle_confirm(State = #ch{confirm = C = #confirm{count = Count}}, IsPersistent) -> - rabbit_log:info("handling confirm (#~p, persistent = ~p)~n", [Count, IsPersistent]), - State#ch{confirm = C#confirm{count = Count+1}}. +send_or_enqueue_ack(MsgSeqNo, + State = #ch{writer_pid = WriterPid, + confirm = #confirm{multiple = false}}) -> + rabbit_log:info("handling confirm in single mode (#~p)~n", [MsgSeqNo]), + ok = rabbit_writer:send_command(WriterPid, #'basic.ack'{delivery_tag = MsgSeqNo}), + State; +send_or_enqueue_ack(MsgSeqNo, State = #ch{confirm = #confirm{multiple = true}}) -> + rabbit_log:info("handling confirm in multiple mode (#~p)~n", [MsgSeqNo]), + State1 = #ch{confirm = C = #confirm{held_acks = As}} = start_ack_timer(State), + State1#ch{confirm = C#confirm{held_acks = gb_sets:add(MsgSeqNo, As)}}. handle_method(#'channel.open'{}, _, State = #ch{state = starting}) -> {reply, #'channel.open_ok'{}, State#ch{state = running}}; @@ -474,7 +469,17 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, %% certain to want to look at delivery-mode and priority. DecodedContent = rabbit_binary_parser:ensure_content_decoded(Content), IsPersistent = is_message_persistent(DecodedContent), - State1 = handle_confirm(State, IsPersistent), + {_MsgSeqNo, State1} + = case State#ch.confirm#confirm.enabled of + false -> + {undefined, State}; + true -> + Count = State#ch.confirm#confirm.count, + NewState = send_or_enqueue_ack(Count, State), + Confirm = NewState#ch.confirm, + {Count, + NewState#ch{confirm = Confirm#confirm{count = Count+1}}} + end, Message = #basic_message{exchange_name = ExchangeName, routing_key = RoutingKey, content = DecodedContent, |
