diff options
| author | Alexandru Scvortov <alexandru@rabbitmq.com> | 2010-08-11 16:06:00 +0100 |
|---|---|---|
| committer | Alexandru Scvortov <alexandru@rabbitmq.com> | 2010-08-11 16:06:00 +0100 |
| commit | ce7d349cbfe1ea1f0f53c5fa1ad1164a0974b99a (patch) | |
| tree | d0ad12d9c904806c94ea8a50ea13ab9cf641dd71 | |
| parent | 91d12163db8232a8c1e2b16c81e21bdab0e04e27 (diff) | |
| download | rabbitmq-server-git-ce7d349cbfe1ea1f0f53c5fa1ad1164a0974b99a.tar.gz | |
rename pubAck -> confirm
| -rw-r--r-- | Makefile | 2 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 65 |
2 files changed, 31 insertions, 36 deletions
@@ -58,7 +58,7 @@ SIBLING_CODEGEN_DIR=../rabbitmq-codegen/ AMQP_CODEGEN_DIR=$(shell [ -d $(SIBLING_CODEGEN_DIR) ] && echo $(SIBLING_CODEGEN_DIR) || echo codegen) AMQP_SPEC_JSON_FILES_0_9_1=$(AMQP_CODEGEN_DIR)/amqp-rabbitmq-0.9.1.json AMQP_SPEC_JSON_FILES_0_8=$(AMQP_CODEGEN_DIR)/amqp-rabbitmq-0.8.json -RABBIT_SPEC_EXTENSIONS=$(AMQP_CODEGEN_DIR)/puback-extension.json +RABBIT_SPEC_EXTENSIONS=$(AMQP_CODEGEN_DIR)/confirm-extension.json ERL_CALL=erl_call -sname $(RABBITMQ_NODENAME) -e diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index d06b7a3040..35e95a188d 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -48,8 +48,8 @@ uncommitted_ack_q, unacked_message_q, username, virtual_host, most_recently_declared_queue, consumer_mapping, blocking, queue_collector_pid, stats_timer, - pubAck_mode}). --record(pubAck, {enabled, count, many}). + confirm}). +-record(confirm, {enabled, count, multiple}). -define(MAX_PERMISSION_CACHE_SIZE, 12). @@ -173,8 +173,8 @@ init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid]) -> blocking = dict:new(), queue_collector_pid = CollectorPid, stats_timer = rabbit_event:init_stats_timer(), - pubAck_mode = #pubAck{ enabled = false, - count = 0 }}, + confirm = #confirm{ enabled = false, + count = 0 }}, rabbit_event:notify( channel_created, [{Item, i(Item, State)} || Item <- ?CREATION_EVENT_KEYS]), @@ -395,15 +395,15 @@ queue_blocked(QPid, State = #ch{blocking = Blocking}) -> State#ch{blocking = Blocking1} end. -handle_pubAck(PA = #pubAck{ enabled = false }, _, _) -> - PA; -handle_pubAck(PA = #pubAck{ count = Count, many = false }, false, WriterPid) -> - rabbit_log:info("handling pubAck in single mode (#~p)~n", [Count]), +handle_confirm(C = #confirm{ enabled = false }, _, _) -> + C; +handle_confirm(C = #confirm{ count = Count, multiple = false }, false, WriterPid) -> + rabbit_log:info("handling confirm in single mode (#~p)~n", [Count]), ok = rabbit_writer:send_command(WriterPid, #'basic.ack'{ delivery_tag = Count }), - PA#pubAck{ count = Count+1 }; -handle_pubAck(PA = #pubAck{ count = Count }, IsPersistent, _WriterPid) -> - rabbit_log:info("handling pubAck (#~p, persistent = ~p)~n", [Count, IsPersistent]), - PA#pubAck{ count = Count+1 }. + C#confirm{ count = Count+1 }; +handle_confirm(C = #confirm{ count = Count }, IsPersistent, _WriterPid) -> + rabbit_log:info("handling confirm (#~p, persistent = ~p)~n", [Count, IsPersistent]), + C#confirm{ count = Count+1 }. handle_method(#'channel.open'{}, _, State = #ch{state = starting}) -> {reply, #'channel.open_ok'{}, State#ch{state = running}}; @@ -430,7 +430,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, Content, State = #ch{virtual_host = VHostPath, transaction_id = TxnKey, writer_pid = WriterPid, - pubAck_mode = PubAck}) -> + confirm = Confirm}) -> ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), check_write_permitted(ExchangeName, State), Exchange = rabbit_exchange:lookup_or_die(ExchangeName), @@ -438,7 +438,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, %% certain to want to look at delivery-mode and priority. DecodedContent = rabbit_binary_parser:ensure_content_decoded(Content), IsPersistent = is_message_persistent(DecodedContent), - PubAck1 = handle_pubAck(PubAck, IsPersistent, WriterPid), + Confirm1 = handle_confirm(Confirm, IsPersistent, WriterPid), Message = #basic_message{exchange_name = ExchangeName, routing_key = RoutingKey, content = DecodedContent, @@ -456,7 +456,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, maybe_incr_stats([{ExchangeName, 1} | [{{QPid, ExchangeName}, 1} || QPid <- DeliveredQPids]], publish, State), - State1 = State#ch{ pubAck_mode = PubAck1 }, + State1 = State#ch{ confirm = Confirm1 }, {noreply, case TxnKey of none -> State1; _ -> add_tx_participants(DeliveredQPids, State1) @@ -849,16 +849,16 @@ handle_method(#'queue.purge'{queue = QueueNameBin, handle_method(#'tx.select'{}, _, State = #ch{transaction_id = none, - pubAck_mode = #pubAck{enabled = false}}) -> + confirm = #confirm{enabled = false}}) -> {reply, #'tx.select_ok'{}, new_tx(State)}; handle_method(#'tx.select'{}, _, - State = #ch{pubAck_mode = #pubAck{enabled = false}}) -> + State = #ch{confirm = #confirm{enabled = false}}) -> {reply, #'tx.select_ok'{}, State}; handle_method(#'tx.select'{}, _, _State) -> rabbit_misc:protocol_error( - precondition_failed, "a pubAck channel cannot be made transactional", []); + precondition_failed, "a confirm channel cannot be made transactional", []); handle_method(#'tx.commit'{}, _, #ch{transaction_id = none}) -> rabbit_misc:protocol_error( @@ -874,25 +874,20 @@ handle_method(#'tx.rollback'{}, _, #ch{transaction_id = none}) -> handle_method(#'tx.rollback'{}, _, State) -> {reply, #'tx.rollback_ok'{}, internal_rollback(State)}; -handle_method(#'pubAck.select'{many = Many}, _, +handle_method(#'confirm.select'{multiple = Multiple, nowait = NoWait}, _, State = #ch{ transaction_id = none, - pubAck_mode = PA}) -> - rabbit_log:info("got pubAck.select{many = ~p}~n", [Many]), - {noreply, State#ch{ pubAck_mode = PA#'pubAck'{ enabled = true, - many = Many } }}; -handle_method(#'pubAck.select'{}, _, _State) -> - rabbit_misc:protocol_error( - precondition_failed, "transactional channel cannot be made pubAck", []); - -handle_method(#'pubAck.deselect'{}, _, - #ch{ pubAck_mode = #pubAck { enabled = false }}) -> + confirm = C}) -> + rabbit_log:info("got confirm.select{multiple = ~p, nowait = ~p}~n", + [Multiple, NoWait]), + State1 = State#ch{confirm = C#confirm{ enabled = true, + multiple = Multiple }}, + case NoWait of + true -> {noreply, State1}; + false -> {reply, #'confirm.select_ok'{}, State1} + end; +handle_method(#'confirm.select'{}, _, _State) -> rabbit_misc:protocol_error( - precondition_failed, "channel must first be in pubAck, to deselect", []); - -handle_method(#'pubAck.deselect'{}, _, State = #ch{ pubAck_mode = PA }) -> - rabbit_log:info("got pubAck.deselect{}~n"), - {reply, #'pubAck.deselect_ok'{}, - State#ch{ pubAck_mode = PA#pubAck{ enabled = false }}}; + precondition_failed, "transactional channel cannot be made confirm", []); handle_method(#'channel.flow'{active = true}, _, State = #ch{limiter_pid = LimiterPid}) -> |
