diff options
| author | Alexandru Scvortov <alexandru@rabbitmq.com> | 2010-08-10 17:56:33 +0100 |
|---|---|---|
| committer | Alexandru Scvortov <alexandru@rabbitmq.com> | 2010-08-10 17:56:33 +0100 |
| commit | 912416cd714c6cbfabe95d0c6a9d473b34ecf9c1 (patch) | |
| tree | 9f29468fa4d5eabc73d4b876b53ebea10b0470b9 /src | |
| parent | b340fd6b58f207aa5d4a1f6d8d204eb802b4f563 (diff) | |
| download | rabbitmq-server-git-912416cd714c6cbfabe95d0c6a9d473b34ecf9c1.tar.gz | |
added pubAck handlers in rabbit_channel
A transactional channel cannot be made pubAck.
A pubAck channel cannot be made transactional.
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_channel.erl | 36 |
1 files changed, 32 insertions, 4 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 582960e79d..e42f49c590 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -47,7 +47,8 @@ transaction_id, tx_participants, next_tag, uncommitted_ack_q, unacked_message_q, username, virtual_host, most_recently_declared_queue, - consumer_mapping, blocking, queue_collector_pid, stats_timer}). + consumer_mapping, blocking, queue_collector_pid, stats_timer, + pubAck_mode}). -define(MAX_PERMISSION_CACHE_SIZE, 12). @@ -170,7 +171,8 @@ init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid]) -> consumer_mapping = dict:new(), blocking = dict:new(), queue_collector_pid = CollectorPid, - stats_timer = rabbit_event:init_stats_timer()}, + stats_timer = rabbit_event:init_stats_timer(), + pubAck_mode = none}, rabbit_event:notify( channel_created, [{Item, i(Item, State)} || Item <- ?CREATION_EVENT_KEYS]), @@ -830,12 +832,17 @@ handle_method(#'queue.purge'{queue = QueueNameBin, return_ok(State, NoWait, #'queue.purge_ok'{message_count = PurgedMessageCount}); -handle_method(#'tx.select'{}, _, State = #ch{transaction_id = none}) -> +handle_method(#'tx.select'{}, _, State = #ch{transaction_id = none, + pubAck_mode = none}) -> {reply, #'tx.select_ok'{}, new_tx(State)}; -handle_method(#'tx.select'{}, _, State) -> +handle_method(#'tx.select'{}, _, State = #ch{pubAck_mode = none}) -> {reply, #'tx.select_ok'{}, State}; +handle_method(#'tx.select'{}, _, _State) -> + rabbit_misc:protocol_error( + precondition_failed, "a pubAck channel cannot be made transactional", []); + handle_method(#'tx.commit'{}, _, #ch{transaction_id = none}) -> rabbit_misc:protocol_error( precondition_failed, "channel is not transactional", []); @@ -850,6 +857,26 @@ handle_method(#'tx.rollback'{}, _, #ch{transaction_id = none}) -> handle_method(#'tx.rollback'{}, _, State) -> {reply, #'tx.rollback_ok'{}, internal_rollback(State)}; +handle_method(#'pubAck.select'{many = Many}, _, + State = #ch{transaction_id = none}) -> + rabbit_log:info("got pubAck.select{many = ~p}~n", [Many]), + {noreply, State#ch{ pubAck_mode = case Many of + true -> 'pubAckMany'; + false -> 'pubAck' + end + }}; +handle_method(#'pubAck.select'{}, _, State) -> + rabbit_misc:protocol_error( + precondition_failed, "transactional channel cannot be made pubAck", []); + +handle_method(#'pubAck.deselect'{}, _, State = #ch{ pubAck_mode = 'none' }) -> + rabbit_misc:protocol_error( + precondition_failed, "channel must first be in pubAck, to deselect", []); + +handle_method(#'pubAck.deselect'{}, _, State) -> + rabbit_log:info("got pubAck.deselect{}~n"), + {reply, #'pubAck.deselect_ok'{}, State#ch{ pubAck_mode = 'none'}}; + handle_method(#'channel.flow'{active = true}, _, State = #ch{limiter_pid = LimiterPid}) -> LimiterPid1 = case rabbit_limiter:unblock(LimiterPid) of @@ -858,6 +885,7 @@ handle_method(#'channel.flow'{active = true}, _, end, {reply, #'channel.flow_ok'{active = true}, State#ch{limiter_pid = LimiterPid1}}; + handle_method(#'channel.flow'{active = false}, _, State = #ch{limiter_pid = LimiterPid, consumer_mapping = Consumers}) -> |
