summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAlexandru Scvortov <alexandru@rabbitmq.com>2010-08-10 17:56:33 +0100
committerAlexandru Scvortov <alexandru@rabbitmq.com>2010-08-10 17:56:33 +0100
commit912416cd714c6cbfabe95d0c6a9d473b34ecf9c1 (patch)
tree9f29468fa4d5eabc73d4b876b53ebea10b0470b9 /src
parentb340fd6b58f207aa5d4a1f6d8d204eb802b4f563 (diff)
downloadrabbitmq-server-git-912416cd714c6cbfabe95d0c6a9d473b34ecf9c1.tar.gz
added pubAck handlers in rabbit_channel
A transactional channel cannot be made pubAck. A pubAck channel cannot be made transactional.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_channel.erl36
1 files changed, 32 insertions, 4 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 582960e79d..e42f49c590 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -47,7 +47,8 @@
transaction_id, tx_participants, next_tag,
uncommitted_ack_q, unacked_message_q,
username, virtual_host, most_recently_declared_queue,
- consumer_mapping, blocking, queue_collector_pid, stats_timer}).
+ consumer_mapping, blocking, queue_collector_pid, stats_timer,
+ pubAck_mode}).
-define(MAX_PERMISSION_CACHE_SIZE, 12).
@@ -170,7 +171,8 @@ init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid]) ->
consumer_mapping = dict:new(),
blocking = dict:new(),
queue_collector_pid = CollectorPid,
- stats_timer = rabbit_event:init_stats_timer()},
+ stats_timer = rabbit_event:init_stats_timer(),
+ pubAck_mode = none},
rabbit_event:notify(
channel_created,
[{Item, i(Item, State)} || Item <- ?CREATION_EVENT_KEYS]),
@@ -830,12 +832,17 @@ handle_method(#'queue.purge'{queue = QueueNameBin,
return_ok(State, NoWait,
#'queue.purge_ok'{message_count = PurgedMessageCount});
-handle_method(#'tx.select'{}, _, State = #ch{transaction_id = none}) ->
+handle_method(#'tx.select'{}, _, State = #ch{transaction_id = none,
+ pubAck_mode = none}) ->
{reply, #'tx.select_ok'{}, new_tx(State)};
-handle_method(#'tx.select'{}, _, State) ->
+handle_method(#'tx.select'{}, _, State = #ch{pubAck_mode = none}) ->
{reply, #'tx.select_ok'{}, State};
+handle_method(#'tx.select'{}, _, _State) ->
+ rabbit_misc:protocol_error(
+ precondition_failed, "a pubAck channel cannot be made transactional", []);
+
handle_method(#'tx.commit'{}, _, #ch{transaction_id = none}) ->
rabbit_misc:protocol_error(
precondition_failed, "channel is not transactional", []);
@@ -850,6 +857,26 @@ handle_method(#'tx.rollback'{}, _, #ch{transaction_id = none}) ->
handle_method(#'tx.rollback'{}, _, State) ->
{reply, #'tx.rollback_ok'{}, internal_rollback(State)};
+handle_method(#'pubAck.select'{many = Many}, _,
+ State = #ch{transaction_id = none}) ->
+ rabbit_log:info("got pubAck.select{many = ~p}~n", [Many]),
+ {noreply, State#ch{ pubAck_mode = case Many of
+ true -> 'pubAckMany';
+ false -> 'pubAck'
+ end
+ }};
+handle_method(#'pubAck.select'{}, _, State) ->
+ rabbit_misc:protocol_error(
+ precondition_failed, "transactional channel cannot be made pubAck", []);
+
+handle_method(#'pubAck.deselect'{}, _, State = #ch{ pubAck_mode = 'none' }) ->
+ rabbit_misc:protocol_error(
+ precondition_failed, "channel must first be in pubAck, to deselect", []);
+
+handle_method(#'pubAck.deselect'{}, _, State) ->
+ rabbit_log:info("got pubAck.deselect{}~n"),
+ {reply, #'pubAck.deselect_ok'{}, State#ch{ pubAck_mode = 'none'}};
+
handle_method(#'channel.flow'{active = true}, _,
State = #ch{limiter_pid = LimiterPid}) ->
LimiterPid1 = case rabbit_limiter:unblock(LimiterPid) of
@@ -858,6 +885,7 @@ handle_method(#'channel.flow'{active = true}, _,
end,
{reply, #'channel.flow_ok'{active = true},
State#ch{limiter_pid = LimiterPid1}};
+
handle_method(#'channel.flow'{active = false}, _,
State = #ch{limiter_pid = LimiterPid,
consumer_mapping = Consumers}) ->