summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_channel.erl17
1 files changed, 13 insertions, 4 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 3c473cf249..8fd7b195ae 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -138,9 +138,12 @@
trace_state,
consumer_prefetch,
%% used by "one shot RPC" (amq.
- reply_consumer
+ reply_consumer,
+ %% flow | noflow, see rabbitmq-server#114
+ delivery_flow
}).
+
-define(MAX_PERMISSION_CACHE_SIZE, 12).
-define(STATISTICS_KEYS,
@@ -334,6 +337,10 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
process_flag(trap_exit, true),
?store_proc_name({ConnName, Channel}),
ok = pg_local:join(rabbit_channels, self()),
+ Flow = case rabbit_misc:get_env(rabbit, mirroring_flow_control, true) of
+ true -> flow;
+ false -> noflow
+ end,
State = #ch{state = starting,
protocol = Protocol,
channel = Channel,
@@ -362,7 +369,8 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
capabilities = Capabilities,
trace_state = rabbit_trace:init(VHost),
consumer_prefetch = 0,
- reply_consumer = none},
+ reply_consumer = none,
+ delivery_flow = Flow},
State1 = rabbit_event:init_stats_timer(State, #ch.stats_timer),
rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State1)),
rabbit_event:if_enabled(State1, #ch.stats_timer,
@@ -869,7 +877,8 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
confirm_enabled = ConfirmEnabled,
trace_state = TraceState,
user = #user{username = Username},
- conn_name = ConnName}) ->
+ conn_name = ConnName,
+ delivery_flow = Flow}) ->
check_msg_size(Content),
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
check_write_permitted(ExchangeName, State),
@@ -896,7 +905,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
QNames = rabbit_exchange:route(Exchange, Delivery),
rabbit_trace:tap_in(Message, QNames, ConnName, ChannelNum,
Username, TraceState),
- DQ = {Delivery#delivery{flow = flow}, QNames},
+ DQ = {Delivery#delivery{flow = Flow}, QNames},
{noreply, case Tx of
none -> deliver_to_queues(DQ, State1);
{Msgs, Acks} -> Msgs1 = queue:in(DQ, Msgs),