summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_channel.erl51
1 files changed, 34 insertions, 17 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index e42f49c590..38c4855020 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -49,6 +49,7 @@
username, virtual_host, most_recently_declared_queue,
consumer_mapping, blocking, queue_collector_pid, stats_timer,
pubAck_mode}).
+-record(pubAck, {enabled, count, many}).
-define(MAX_PERMISSION_CACHE_SIZE, 12).
@@ -172,7 +173,8 @@ init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid]) ->
blocking = dict:new(),
queue_collector_pid = CollectorPid,
stats_timer = rabbit_event:init_stats_timer(),
- pubAck_mode = none},
+ pubAck_mode = #pubAck{ enabled = false,
+ count = 0 }},
rabbit_event:notify(
channel_created,
[{Item, i(Item, State)} || Item <- ?CREATION_EVENT_KEYS]),
@@ -393,6 +395,16 @@ queue_blocked(QPid, State = #ch{blocking = Blocking}) ->
State#ch{blocking = Blocking1}
end.
+handle_pubAck(PA = #pubAck{ enabled = false }, _) ->
+ PA;
+handle_pubAck(PA = #pubAck{ count = Count, many = false }, WriterPid) ->
+ rabbit_log:info("handling pubAck in single mode (#~p)~n", [Count]),
+ ok = rabbit_writer:send_command(WriterPid, #'basic.ack'{ delivery_tag = Count }),
+ PA#pubAck{ count = Count+1 };
+handle_pubAck(PA = #pubAck{ count = Count }, _WriterPid) ->
+ rabbit_log:info("handling pubAck in many mode (#~p)~n", [Count]),
+ PA#pubAck{ count = Count+1 }.
+
handle_method(#'channel.open'{}, _, State = #ch{state = starting}) ->
{reply, #'channel.open_ok'{}, State#ch{state = running}};
@@ -417,10 +429,12 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
immediate = Immediate},
Content, State = #ch{virtual_host = VHostPath,
transaction_id = TxnKey,
- writer_pid = WriterPid}) ->
+ writer_pid = WriterPid,
+ pubAck_mode = PubAck}) ->
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
check_write_permitted(ExchangeName, State),
Exchange = rabbit_exchange:lookup_or_die(ExchangeName),
+ PubAck1 = handle_pubAck(PubAck, WriterPid),
%% We decode the content's properties here because we're almost
%% certain to want to look at delivery-mode and priority.
DecodedContent = rabbit_binary_parser:ensure_content_decoded(Content),
@@ -442,9 +456,10 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
maybe_incr_stats([{ExchangeName, 1} |
[{{QPid, ExchangeName}, 1} ||
QPid <- DeliveredQPids]], publish, State),
+ State1 = State#ch{ pubAck_mode = PubAck1 },
{noreply, case TxnKey of
- none -> State;
- _ -> add_tx_participants(DeliveredQPids, State)
+ none -> State1;
+ _ -> add_tx_participants(DeliveredQPids, State1)
end};
handle_method(#'basic.ack'{delivery_tag = DeliveryTag,
@@ -832,11 +847,13 @@ handle_method(#'queue.purge'{queue = QueueNameBin,
return_ok(State, NoWait,
#'queue.purge_ok'{message_count = PurgedMessageCount});
-handle_method(#'tx.select'{}, _, State = #ch{transaction_id = none,
- pubAck_mode = none}) ->
+handle_method(#'tx.select'{}, _,
+ State = #ch{transaction_id = none,
+ pubAck_mode = #pubAck{enabled = false}}) ->
{reply, #'tx.select_ok'{}, new_tx(State)};
-handle_method(#'tx.select'{}, _, State = #ch{pubAck_mode = none}) ->
+handle_method(#'tx.select'{}, _,
+ State = #ch{pubAck_mode = #pubAck{enabled = false}}) ->
{reply, #'tx.select_ok'{}, State};
handle_method(#'tx.select'{}, _, _State) ->
@@ -858,24 +875,24 @@ handle_method(#'tx.rollback'{}, _, State) ->
{reply, #'tx.rollback_ok'{}, internal_rollback(State)};
handle_method(#'pubAck.select'{many = Many}, _,
- State = #ch{transaction_id = none}) ->
+ State = #ch{ transaction_id = none,
+ pubAck_mode = PA}) ->
rabbit_log:info("got pubAck.select{many = ~p}~n", [Many]),
- {noreply, State#ch{ pubAck_mode = case Many of
- true -> 'pubAckMany';
- false -> 'pubAck'
- end
- }};
-handle_method(#'pubAck.select'{}, _, State) ->
+ {noreply, State#ch{ pubAck_mode = PA#'pubAck'{ enabled = true,
+ many = Many } }};
+handle_method(#'pubAck.select'{}, _, _State) ->
rabbit_misc:protocol_error(
precondition_failed, "transactional channel cannot be made pubAck", []);
-handle_method(#'pubAck.deselect'{}, _, State = #ch{ pubAck_mode = 'none' }) ->
+handle_method(#'pubAck.deselect'{}, _,
+ #ch{ pubAck_mode = #pubAck { enabled = false }}) ->
rabbit_misc:protocol_error(
precondition_failed, "channel must first be in pubAck, to deselect", []);
-handle_method(#'pubAck.deselect'{}, _, State) ->
+handle_method(#'pubAck.deselect'{}, _, State = #ch{ pubAck_mode = PA }) ->
rabbit_log:info("got pubAck.deselect{}~n"),
- {reply, #'pubAck.deselect_ok'{}, State#ch{ pubAck_mode = 'none'}};
+ {reply, #'pubAck.deselect_ok'{},
+ State#ch{ pubAck_mode = PA#pubAck{ enabled = false }}};
handle_method(#'channel.flow'{active = true}, _,
State = #ch{limiter_pid = LimiterPid}) ->