diff options
| author | Alexandru Scvortov <alexandru@rabbitmq.com> | 2010-08-11 13:54:36 +0100 |
|---|---|---|
| committer | Alexandru Scvortov <alexandru@rabbitmq.com> | 2010-08-11 13:54:36 +0100 |
| commit | 5cc92b64e688ba403233eff17fcfbdf3d1460431 (patch) | |
| tree | 3a1b51949540d7a6004363ef6ac68d0e070a5ab9 | |
| parent | 912416cd714c6cbfabe95d0c6a9d473b34ecf9c1 (diff) | |
| download | rabbitmq-server-git-5cc92b64e688ba403233eff17fcfbdf3d1460431.tar.gz | |
added counter and single pubacks for transient messages
Publishes are counter starting with the first one after
pubAck.select().
If many is set to false, transient messages are ack'd as soon as the
are received, but after the exchange name is resolved.
| -rw-r--r-- | src/rabbit_channel.erl | 51 |
1 files changed, 34 insertions, 17 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index e42f49c590..38c4855020 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -49,6 +49,7 @@ username, virtual_host, most_recently_declared_queue, consumer_mapping, blocking, queue_collector_pid, stats_timer, pubAck_mode}). +-record(pubAck, {enabled, count, many}). -define(MAX_PERMISSION_CACHE_SIZE, 12). @@ -172,7 +173,8 @@ init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid]) -> blocking = dict:new(), queue_collector_pid = CollectorPid, stats_timer = rabbit_event:init_stats_timer(), - pubAck_mode = none}, + pubAck_mode = #pubAck{ enabled = false, + count = 0 }}, rabbit_event:notify( channel_created, [{Item, i(Item, State)} || Item <- ?CREATION_EVENT_KEYS]), @@ -393,6 +395,16 @@ queue_blocked(QPid, State = #ch{blocking = Blocking}) -> State#ch{blocking = Blocking1} end. +handle_pubAck(PA = #pubAck{ enabled = false }, _) -> + PA; +handle_pubAck(PA = #pubAck{ count = Count, many = false }, WriterPid) -> + rabbit_log:info("handling pubAck in single mode (#~p)~n", [Count]), + ok = rabbit_writer:send_command(WriterPid, #'basic.ack'{ delivery_tag = Count }), + PA#pubAck{ count = Count+1 }; +handle_pubAck(PA = #pubAck{ count = Count }, _WriterPid) -> + rabbit_log:info("handling pubAck in many mode (#~p)~n", [Count]), + PA#pubAck{ count = Count+1 }. + handle_method(#'channel.open'{}, _, State = #ch{state = starting}) -> {reply, #'channel.open_ok'{}, State#ch{state = running}}; @@ -417,10 +429,12 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, immediate = Immediate}, Content, State = #ch{virtual_host = VHostPath, transaction_id = TxnKey, - writer_pid = WriterPid}) -> + writer_pid = WriterPid, + pubAck_mode = PubAck}) -> ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), check_write_permitted(ExchangeName, State), Exchange = rabbit_exchange:lookup_or_die(ExchangeName), + PubAck1 = handle_pubAck(PubAck, WriterPid), %% We decode the content's properties here because we're almost %% certain to want to look at delivery-mode and priority. DecodedContent = rabbit_binary_parser:ensure_content_decoded(Content), @@ -442,9 +456,10 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, maybe_incr_stats([{ExchangeName, 1} | [{{QPid, ExchangeName}, 1} || QPid <- DeliveredQPids]], publish, State), + State1 = State#ch{ pubAck_mode = PubAck1 }, {noreply, case TxnKey of - none -> State; - _ -> add_tx_participants(DeliveredQPids, State) + none -> State1; + _ -> add_tx_participants(DeliveredQPids, State1) end}; handle_method(#'basic.ack'{delivery_tag = DeliveryTag, @@ -832,11 +847,13 @@ handle_method(#'queue.purge'{queue = QueueNameBin, return_ok(State, NoWait, #'queue.purge_ok'{message_count = PurgedMessageCount}); -handle_method(#'tx.select'{}, _, State = #ch{transaction_id = none, - pubAck_mode = none}) -> +handle_method(#'tx.select'{}, _, + State = #ch{transaction_id = none, + pubAck_mode = #pubAck{enabled = false}}) -> {reply, #'tx.select_ok'{}, new_tx(State)}; -handle_method(#'tx.select'{}, _, State = #ch{pubAck_mode = none}) -> +handle_method(#'tx.select'{}, _, + State = #ch{pubAck_mode = #pubAck{enabled = false}}) -> {reply, #'tx.select_ok'{}, State}; handle_method(#'tx.select'{}, _, _State) -> @@ -858,24 +875,24 @@ handle_method(#'tx.rollback'{}, _, State) -> {reply, #'tx.rollback_ok'{}, internal_rollback(State)}; handle_method(#'pubAck.select'{many = Many}, _, - State = #ch{transaction_id = none}) -> + State = #ch{ transaction_id = none, + pubAck_mode = PA}) -> rabbit_log:info("got pubAck.select{many = ~p}~n", [Many]), - {noreply, State#ch{ pubAck_mode = case Many of - true -> 'pubAckMany'; - false -> 'pubAck' - end - }}; -handle_method(#'pubAck.select'{}, _, State) -> + {noreply, State#ch{ pubAck_mode = PA#'pubAck'{ enabled = true, + many = Many } }}; +handle_method(#'pubAck.select'{}, _, _State) -> rabbit_misc:protocol_error( precondition_failed, "transactional channel cannot be made pubAck", []); -handle_method(#'pubAck.deselect'{}, _, State = #ch{ pubAck_mode = 'none' }) -> +handle_method(#'pubAck.deselect'{}, _, + #ch{ pubAck_mode = #pubAck { enabled = false }}) -> rabbit_misc:protocol_error( precondition_failed, "channel must first be in pubAck, to deselect", []); -handle_method(#'pubAck.deselect'{}, _, State) -> +handle_method(#'pubAck.deselect'{}, _, State = #ch{ pubAck_mode = PA }) -> rabbit_log:info("got pubAck.deselect{}~n"), - {reply, #'pubAck.deselect_ok'{}, State#ch{ pubAck_mode = 'none'}}; + {reply, #'pubAck.deselect_ok'{}, + State#ch{ pubAck_mode = PA#pubAck{ enabled = false }}}; handle_method(#'channel.flow'{active = true}, _, State = #ch{limiter_pid = LimiterPid}) -> |
