diff options
| -rw-r--r-- | src/rabbit_channel.erl | 51 |
1 files changed, 34 insertions, 17 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index e42f49c590..38c4855020 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -49,6 +49,7 @@ username, virtual_host, most_recently_declared_queue, consumer_mapping, blocking, queue_collector_pid, stats_timer, pubAck_mode}). +-record(pubAck, {enabled, count, many}). -define(MAX_PERMISSION_CACHE_SIZE, 12). @@ -172,7 +173,8 @@ init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid]) -> blocking = dict:new(), queue_collector_pid = CollectorPid, stats_timer = rabbit_event:init_stats_timer(), - pubAck_mode = none}, + pubAck_mode = #pubAck{ enabled = false, + count = 0 }}, rabbit_event:notify( channel_created, [{Item, i(Item, State)} || Item <- ?CREATION_EVENT_KEYS]), @@ -393,6 +395,16 @@ queue_blocked(QPid, State = #ch{blocking = Blocking}) -> State#ch{blocking = Blocking1} end. +handle_pubAck(PA = #pubAck{ enabled = false }, _) -> + PA; +handle_pubAck(PA = #pubAck{ count = Count, many = false }, WriterPid) -> + rabbit_log:info("handling pubAck in single mode (#~p)~n", [Count]), + ok = rabbit_writer:send_command(WriterPid, #'basic.ack'{ delivery_tag = Count }), + PA#pubAck{ count = Count+1 }; +handle_pubAck(PA = #pubAck{ count = Count }, _WriterPid) -> + rabbit_log:info("handling pubAck in many mode (#~p)~n", [Count]), + PA#pubAck{ count = Count+1 }. + handle_method(#'channel.open'{}, _, State = #ch{state = starting}) -> {reply, #'channel.open_ok'{}, State#ch{state = running}}; @@ -417,10 +429,12 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, immediate = Immediate}, Content, State = #ch{virtual_host = VHostPath, transaction_id = TxnKey, - writer_pid = WriterPid}) -> + writer_pid = WriterPid, + pubAck_mode = PubAck}) -> ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), check_write_permitted(ExchangeName, State), Exchange = rabbit_exchange:lookup_or_die(ExchangeName), + PubAck1 = handle_pubAck(PubAck, WriterPid), %% We decode the content's properties here because we're almost %% certain to want to look at delivery-mode and priority. DecodedContent = rabbit_binary_parser:ensure_content_decoded(Content), @@ -442,9 +456,10 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, maybe_incr_stats([{ExchangeName, 1} | [{{QPid, ExchangeName}, 1} || QPid <- DeliveredQPids]], publish, State), + State1 = State#ch{ pubAck_mode = PubAck1 }, {noreply, case TxnKey of - none -> State; - _ -> add_tx_participants(DeliveredQPids, State) + none -> State1; + _ -> add_tx_participants(DeliveredQPids, State1) end}; handle_method(#'basic.ack'{delivery_tag = DeliveryTag, @@ -832,11 +847,13 @@ handle_method(#'queue.purge'{queue = QueueNameBin, return_ok(State, NoWait, #'queue.purge_ok'{message_count = PurgedMessageCount}); -handle_method(#'tx.select'{}, _, State = #ch{transaction_id = none, - pubAck_mode = none}) -> +handle_method(#'tx.select'{}, _, + State = #ch{transaction_id = none, + pubAck_mode = #pubAck{enabled = false}}) -> {reply, #'tx.select_ok'{}, new_tx(State)}; -handle_method(#'tx.select'{}, _, State = #ch{pubAck_mode = none}) -> +handle_method(#'tx.select'{}, _, + State = #ch{pubAck_mode = #pubAck{enabled = false}}) -> {reply, #'tx.select_ok'{}, State}; handle_method(#'tx.select'{}, _, _State) -> @@ -858,24 +875,24 @@ handle_method(#'tx.rollback'{}, _, State) -> {reply, #'tx.rollback_ok'{}, internal_rollback(State)}; handle_method(#'pubAck.select'{many = Many}, _, - State = #ch{transaction_id = none}) -> + State = #ch{ transaction_id = none, + pubAck_mode = PA}) -> rabbit_log:info("got pubAck.select{many = ~p}~n", [Many]), - {noreply, State#ch{ pubAck_mode = case Many of - true -> 'pubAckMany'; - false -> 'pubAck' - end - }}; -handle_method(#'pubAck.select'{}, _, State) -> + {noreply, State#ch{ pubAck_mode = PA#'pubAck'{ enabled = true, + many = Many } }}; +handle_method(#'pubAck.select'{}, _, _State) -> rabbit_misc:protocol_error( precondition_failed, "transactional channel cannot be made pubAck", []); -handle_method(#'pubAck.deselect'{}, _, State = #ch{ pubAck_mode = 'none' }) -> +handle_method(#'pubAck.deselect'{}, _, + #ch{ pubAck_mode = #pubAck { enabled = false }}) -> rabbit_misc:protocol_error( precondition_failed, "channel must first be in pubAck, to deselect", []); -handle_method(#'pubAck.deselect'{}, _, State) -> +handle_method(#'pubAck.deselect'{}, _, State = #ch{ pubAck_mode = PA }) -> rabbit_log:info("got pubAck.deselect{}~n"), - {reply, #'pubAck.deselect_ok'{}, State#ch{ pubAck_mode = 'none'}}; + {reply, #'pubAck.deselect_ok'{}, + State#ch{ pubAck_mode = PA#pubAck{ enabled = false }}}; handle_method(#'channel.flow'{active = true}, _, State = #ch{limiter_pid = LimiterPid}) -> |
