summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Makefile2
-rw-r--r--src/rabbit_channel.erl65
2 files changed, 31 insertions, 36 deletions
diff --git a/Makefile b/Makefile
index 195c4ddf61..b39c09f63c 100644
--- a/Makefile
+++ b/Makefile
@@ -58,7 +58,7 @@ SIBLING_CODEGEN_DIR=../rabbitmq-codegen/
AMQP_CODEGEN_DIR=$(shell [ -d $(SIBLING_CODEGEN_DIR) ] && echo $(SIBLING_CODEGEN_DIR) || echo codegen)
AMQP_SPEC_JSON_FILES_0_9_1=$(AMQP_CODEGEN_DIR)/amqp-rabbitmq-0.9.1.json
AMQP_SPEC_JSON_FILES_0_8=$(AMQP_CODEGEN_DIR)/amqp-rabbitmq-0.8.json
-RABBIT_SPEC_EXTENSIONS=$(AMQP_CODEGEN_DIR)/puback-extension.json
+RABBIT_SPEC_EXTENSIONS=$(AMQP_CODEGEN_DIR)/confirm-extension.json
ERL_CALL=erl_call -sname $(RABBITMQ_NODENAME) -e
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index d06b7a3040..35e95a188d 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -48,8 +48,8 @@
uncommitted_ack_q, unacked_message_q,
username, virtual_host, most_recently_declared_queue,
consumer_mapping, blocking, queue_collector_pid, stats_timer,
- pubAck_mode}).
--record(pubAck, {enabled, count, many}).
+ confirm}).
+-record(confirm, {enabled, count, multiple}).
-define(MAX_PERMISSION_CACHE_SIZE, 12).
@@ -173,8 +173,8 @@ init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid]) ->
blocking = dict:new(),
queue_collector_pid = CollectorPid,
stats_timer = rabbit_event:init_stats_timer(),
- pubAck_mode = #pubAck{ enabled = false,
- count = 0 }},
+ confirm = #confirm{ enabled = false,
+ count = 0 }},
rabbit_event:notify(
channel_created,
[{Item, i(Item, State)} || Item <- ?CREATION_EVENT_KEYS]),
@@ -395,15 +395,15 @@ queue_blocked(QPid, State = #ch{blocking = Blocking}) ->
State#ch{blocking = Blocking1}
end.
-handle_pubAck(PA = #pubAck{ enabled = false }, _, _) ->
- PA;
-handle_pubAck(PA = #pubAck{ count = Count, many = false }, false, WriterPid) ->
- rabbit_log:info("handling pubAck in single mode (#~p)~n", [Count]),
+handle_confirm(C = #confirm{ enabled = false }, _, _) ->
+ C;
+handle_confirm(C = #confirm{ count = Count, multiple = false }, false, WriterPid) ->
+ rabbit_log:info("handling confirm in single mode (#~p)~n", [Count]),
ok = rabbit_writer:send_command(WriterPid, #'basic.ack'{ delivery_tag = Count }),
- PA#pubAck{ count = Count+1 };
-handle_pubAck(PA = #pubAck{ count = Count }, IsPersistent, _WriterPid) ->
- rabbit_log:info("handling pubAck (#~p, persistent = ~p)~n", [Count, IsPersistent]),
- PA#pubAck{ count = Count+1 }.
+ C#confirm{ count = Count+1 };
+handle_confirm(C = #confirm{ count = Count }, IsPersistent, _WriterPid) ->
+ rabbit_log:info("handling confirm (#~p, persistent = ~p)~n", [Count, IsPersistent]),
+ C#confirm{ count = Count+1 }.
handle_method(#'channel.open'{}, _, State = #ch{state = starting}) ->
{reply, #'channel.open_ok'{}, State#ch{state = running}};
@@ -430,7 +430,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
Content, State = #ch{virtual_host = VHostPath,
transaction_id = TxnKey,
writer_pid = WriterPid,
- pubAck_mode = PubAck}) ->
+ confirm = Confirm}) ->
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
check_write_permitted(ExchangeName, State),
Exchange = rabbit_exchange:lookup_or_die(ExchangeName),
@@ -438,7 +438,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
%% certain to want to look at delivery-mode and priority.
DecodedContent = rabbit_binary_parser:ensure_content_decoded(Content),
IsPersistent = is_message_persistent(DecodedContent),
- PubAck1 = handle_pubAck(PubAck, IsPersistent, WriterPid),
+ Confirm1 = handle_confirm(Confirm, IsPersistent, WriterPid),
Message = #basic_message{exchange_name = ExchangeName,
routing_key = RoutingKey,
content = DecodedContent,
@@ -456,7 +456,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
maybe_incr_stats([{ExchangeName, 1} |
[{{QPid, ExchangeName}, 1} ||
QPid <- DeliveredQPids]], publish, State),
- State1 = State#ch{ pubAck_mode = PubAck1 },
+ State1 = State#ch{ confirm = Confirm1 },
{noreply, case TxnKey of
none -> State1;
_ -> add_tx_participants(DeliveredQPids, State1)
@@ -849,16 +849,16 @@ handle_method(#'queue.purge'{queue = QueueNameBin,
handle_method(#'tx.select'{}, _,
State = #ch{transaction_id = none,
- pubAck_mode = #pubAck{enabled = false}}) ->
+ confirm = #confirm{enabled = false}}) ->
{reply, #'tx.select_ok'{}, new_tx(State)};
handle_method(#'tx.select'{}, _,
- State = #ch{pubAck_mode = #pubAck{enabled = false}}) ->
+ State = #ch{confirm = #confirm{enabled = false}}) ->
{reply, #'tx.select_ok'{}, State};
handle_method(#'tx.select'{}, _, _State) ->
rabbit_misc:protocol_error(
- precondition_failed, "a pubAck channel cannot be made transactional", []);
+ precondition_failed, "a confirm channel cannot be made transactional", []);
handle_method(#'tx.commit'{}, _, #ch{transaction_id = none}) ->
rabbit_misc:protocol_error(
@@ -874,25 +874,20 @@ handle_method(#'tx.rollback'{}, _, #ch{transaction_id = none}) ->
handle_method(#'tx.rollback'{}, _, State) ->
{reply, #'tx.rollback_ok'{}, internal_rollback(State)};
-handle_method(#'pubAck.select'{many = Many}, _,
+handle_method(#'confirm.select'{multiple = Multiple, nowait = NoWait}, _,
State = #ch{ transaction_id = none,
- pubAck_mode = PA}) ->
- rabbit_log:info("got pubAck.select{many = ~p}~n", [Many]),
- {noreply, State#ch{ pubAck_mode = PA#'pubAck'{ enabled = true,
- many = Many } }};
-handle_method(#'pubAck.select'{}, _, _State) ->
- rabbit_misc:protocol_error(
- precondition_failed, "transactional channel cannot be made pubAck", []);
-
-handle_method(#'pubAck.deselect'{}, _,
- #ch{ pubAck_mode = #pubAck { enabled = false }}) ->
+ confirm = C}) ->
+ rabbit_log:info("got confirm.select{multiple = ~p, nowait = ~p}~n",
+ [Multiple, NoWait]),
+ State1 = State#ch{confirm = C#confirm{ enabled = true,
+ multiple = Multiple }},
+ case NoWait of
+ true -> {noreply, State1};
+ false -> {reply, #'confirm.select_ok'{}, State1}
+ end;
+handle_method(#'confirm.select'{}, _, _State) ->
rabbit_misc:protocol_error(
- precondition_failed, "channel must first be in pubAck, to deselect", []);
-
-handle_method(#'pubAck.deselect'{}, _, State = #ch{ pubAck_mode = PA }) ->
- rabbit_log:info("got pubAck.deselect{}~n"),
- {reply, #'pubAck.deselect_ok'{},
- State#ch{ pubAck_mode = PA#pubAck{ enabled = false }}};
+ precondition_failed, "transactional channel cannot be made confirm", []);
handle_method(#'channel.flow'{active = true}, _,
State = #ch{limiter_pid = LimiterPid}) ->