diff options
| author | Michael Klishin <mklishin@pivotal.io> | 2015-04-29 18:46:18 +0300 |
|---|---|---|
| committer | Michael Klishin <mklishin@pivotal.io> | 2015-04-29 18:46:18 +0300 |
| commit | 8a30408a93e27004c76cbb643841bd84c6af32dd (patch) | |
| tree | 1fc7a305e0b398fe18a696f513678a816cd656b4 /src | |
| parent | a5f3380b38be2dbcae0a8844c8086bd5d37a56e6 (diff) | |
| parent | 65011cce7c0f29d99beabfb3d24e40e47226bb2b (diff) | |
| download | rabbitmq-server-git-8a30408a93e27004c76cbb643841bd84c6af32dd.tar.gz | |
Merge branch 'stable'
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_channel.erl | 17 |
1 files changed, 13 insertions, 4 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 3c473cf249..8fd7b195ae 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -138,9 +138,12 @@ trace_state, consumer_prefetch, %% used by "one shot RPC" (amq. - reply_consumer + reply_consumer, + %% flow | noflow, see rabbitmq-server#114 + delivery_flow }). + -define(MAX_PERMISSION_CACHE_SIZE, 12). -define(STATISTICS_KEYS, @@ -334,6 +337,10 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, process_flag(trap_exit, true), ?store_proc_name({ConnName, Channel}), ok = pg_local:join(rabbit_channels, self()), + Flow = case rabbit_misc:get_env(rabbit, mirroring_flow_control, true) of + true -> flow; + false -> noflow + end, State = #ch{state = starting, protocol = Protocol, channel = Channel, @@ -362,7 +369,8 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, capabilities = Capabilities, trace_state = rabbit_trace:init(VHost), consumer_prefetch = 0, - reply_consumer = none}, + reply_consumer = none, + delivery_flow = Flow}, State1 = rabbit_event:init_stats_timer(State, #ch.stats_timer), rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State1)), rabbit_event:if_enabled(State1, #ch.stats_timer, @@ -869,7 +877,8 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, confirm_enabled = ConfirmEnabled, trace_state = TraceState, user = #user{username = Username}, - conn_name = ConnName}) -> + conn_name = ConnName, + delivery_flow = Flow}) -> check_msg_size(Content), ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), check_write_permitted(ExchangeName, State), @@ -896,7 +905,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, QNames = rabbit_exchange:route(Exchange, Delivery), rabbit_trace:tap_in(Message, QNames, ConnName, ChannelNum, Username, TraceState), - DQ = {Delivery#delivery{flow = flow}, QNames}, + DQ = {Delivery#delivery{flow = Flow}, QNames}, {noreply, case Tx of none -> deliver_to_queues(DQ, State1); {Msgs, Acks} -> Msgs1 = queue:in(DQ, Msgs), |
