diff options
| -rw-r--r-- | ebin/rabbit_app.in | 4 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 16 |
2 files changed, 15 insertions, 5 deletions
diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in index 918741438c..654c52bdd3 100644 --- a/ebin/rabbit_app.in +++ b/ebin/rabbit_app.in @@ -79,5 +79,7 @@ mnesia_lib, rpc, mnesia_tm, qlc, sofs, proplists, credit_flow, pmon, ssl_connection, tls_connection, ssl_record, tls_record, gen_fsm, ssl]}, - {ssl_apps, [asn1, crypto, public_key, ssl]} + {ssl_apps, [asn1, crypto, public_key, ssl]}, + %% see rabbitmq-server#114 + {mirroring_flow_control, true} ]}]}. diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 4e2a9e11f7..d0088a5ed8 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -40,7 +40,9 @@ queue_consumers, delivering_queues, queue_collector_pid, stats_timer, confirm_enabled, publish_seqno, unconfirmed, confirmed, mandatory, capabilities, trace_state, - consumer_prefetch, reply_consumer}). + consumer_prefetch, reply_consumer, + %% flow | noflow, see rabbitmq-server#114 + delivery_flow}). -define(MAX_PERMISSION_CACHE_SIZE, 12). @@ -235,6 +237,10 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, process_flag(trap_exit, true), ?store_proc_name({ConnName, Channel}), ok = pg_local:join(rabbit_channels, self()), + Flow = case rabbit_misc:get_env(rabbit, mirroring_flow_control, true) of + true -> flow; + false -> noflow + end, State = #ch{state = starting, protocol = Protocol, channel = Channel, @@ -263,7 +269,8 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, capabilities = Capabilities, trace_state = rabbit_trace:init(VHost), consumer_prefetch = 0, - reply_consumer = none}, + reply_consumer = none, + delivery_flow = Flow}, State1 = rabbit_event:init_stats_timer(State, #ch.stats_timer), rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State1)), rabbit_event:if_enabled(State1, #ch.stats_timer, @@ -770,7 +777,8 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, confirm_enabled = ConfirmEnabled, trace_state = TraceState, user = #user{username = Username}, - conn_name = ConnName}) -> + conn_name = ConnName, + delivery_flow = Flow}) -> check_msg_size(Content), ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), check_write_permitted(ExchangeName, State), @@ -797,7 +805,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, QNames = rabbit_exchange:route(Exchange, Delivery), rabbit_trace:tap_in(Message, QNames, ConnName, ChannelNum, Username, TraceState), - DQ = {Delivery#delivery{flow = flow}, QNames}, + DQ = {Delivery#delivery{flow = Flow}, QNames}, {noreply, case Tx of none -> deliver_to_queues(DQ, State1); {Msgs, Acks} -> Msgs1 = queue:in(DQ, Msgs), |
