diff options
| -rw-r--r-- | ebin/rabbit_app.in | 4 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 17 |
2 files changed, 16 insertions, 5 deletions
diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in index 918741438c..654c52bdd3 100644 --- a/ebin/rabbit_app.in +++ b/ebin/rabbit_app.in @@ -79,5 +79,7 @@ mnesia_lib, rpc, mnesia_tm, qlc, sofs, proplists, credit_flow, pmon, ssl_connection, tls_connection, ssl_record, tls_record, gen_fsm, ssl]}, - {ssl_apps, [asn1, crypto, public_key, ssl]} + {ssl_apps, [asn1, crypto, public_key, ssl]}, + %% see rabbitmq-server#114 + {mirroring_flow_control, true} ]}]}. diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 3c473cf249..8fd7b195ae 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -138,9 +138,12 @@ trace_state, consumer_prefetch, %% used by "one shot RPC" (amq. - reply_consumer + reply_consumer, + %% flow | noflow, see rabbitmq-server#114 + delivery_flow }). + -define(MAX_PERMISSION_CACHE_SIZE, 12). -define(STATISTICS_KEYS, @@ -334,6 +337,10 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, process_flag(trap_exit, true), ?store_proc_name({ConnName, Channel}), ok = pg_local:join(rabbit_channels, self()), + Flow = case rabbit_misc:get_env(rabbit, mirroring_flow_control, true) of + true -> flow; + false -> noflow + end, State = #ch{state = starting, protocol = Protocol, channel = Channel, @@ -362,7 +369,8 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, capabilities = Capabilities, trace_state = rabbit_trace:init(VHost), consumer_prefetch = 0, - reply_consumer = none}, + reply_consumer = none, + delivery_flow = Flow}, State1 = rabbit_event:init_stats_timer(State, #ch.stats_timer), rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State1)), rabbit_event:if_enabled(State1, #ch.stats_timer, @@ -869,7 +877,8 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, confirm_enabled = ConfirmEnabled, trace_state = TraceState, user = #user{username = Username}, - conn_name = ConnName}) -> + conn_name = ConnName, + delivery_flow = Flow}) -> check_msg_size(Content), ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), check_write_permitted(ExchangeName, State), @@ -896,7 +905,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, QNames = rabbit_exchange:route(Exchange, Delivery), rabbit_trace:tap_in(Message, QNames, ConnName, ChannelNum, Username, TraceState), - DQ = {Delivery#delivery{flow = flow}, QNames}, + DQ = {Delivery#delivery{flow = Flow}, QNames}, {noreply, case Tx of none -> deliver_to_queues(DQ, State1); {Msgs, Acks} -> Msgs1 = queue:in(DQ, Msgs), |
