summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Klishin <mklishin@pivotal.io>2015-04-29 17:52:37 +0300
committerMichael Klishin <mklishin@pivotal.io>2015-04-29 17:52:37 +0300
commit0750291e797336d50d4fd12fa5489a5a51a0cb30 (patch)
tree44eb29b91693018592c2a31b79b0667e1eb87ffe
parent0b91553bafde0e063714b9557a1a76dc5213163b (diff)
downloadrabbitmq-server-git-0750291e797336d50d4fd12fa5489a5a51a0cb30.tar.gz
Move mirroring flow control setting into rabbit_channel:init
Rename the key to make more sense, support true/false values.
-rw-r--r--ebin/rabbit_app.in2
-rw-r--r--src/rabbit_channel.erl23
2 files changed, 14 insertions, 11 deletions
diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in
index 4422969bc6..224829c3bf 100644
--- a/ebin/rabbit_app.in
+++ b/ebin/rabbit_app.in
@@ -81,5 +81,5 @@
gen_fsm, ssl]},
{ssl_apps, [asn1, crypto, public_key, ssl]},
%% see rabbitmq-server#114
- {gm_flow_control, flow}
+ {mirroring_flow_control, flow}
]}]}.
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index ba98175299..886c890c88 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -40,7 +40,9 @@
queue_consumers, delivering_queues,
queue_collector_pid, stats_timer, confirm_enabled, publish_seqno,
unconfirmed, confirmed, mandatory, capabilities, trace_state,
- consumer_prefetch, reply_consumer}).
+ consumer_prefetch, reply_consumer,
+ %% flow | noflow, see rabbitmq-server#114
+ delivery_flow}).
-define(MAX_PERMISSION_CACHE_SIZE, 12).
@@ -235,6 +237,12 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
process_flag(trap_exit, true),
?store_proc_name({ConnName, Channel}),
ok = pg_local:join(rabbit_channels, self()),
+ Flow = case rabbit_misc:get_env(rabbit, mirroring_control, flow) of
+ flow -> flow;
+ true -> flow;
+ noflow -> noflow;
+ false -> noflow
+ end,
State = #ch{state = starting,
protocol = Protocol,
channel = Channel,
@@ -263,7 +271,8 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
capabilities = Capabilities,
trace_state = rabbit_trace:init(VHost),
consumer_prefetch = 0,
- reply_consumer = none},
+ reply_consumer = none,
+ delivery_flow = Flow},
State1 = rabbit_event:init_stats_timer(State, #ch.stats_timer),
rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State1)),
rabbit_event:if_enabled(State1, #ch.stats_timer,
@@ -770,7 +779,8 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
confirm_enabled = ConfirmEnabled,
trace_state = TraceState,
user = #user{username = Username},
- conn_name = ConnName}) ->
+ conn_name = ConnName,
+ delivery_flow = Flow}) ->
check_msg_size(Content),
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
check_write_permitted(ExchangeName, State),
@@ -797,13 +807,6 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
QNames = rabbit_exchange:route(Exchange, Delivery),
rabbit_trace:tap_in(Message, QNames, ConnName, ChannelNum,
Username, TraceState),
- %% flow | noflow, see rabbitmq-server#114
- Flow = case get(gm_flow_control) of
- undefined -> V = rabbit_misc:get_env(rabbit, gm_flow_control, flow),
- put(gm_flow_control, V),
- V;
- V -> V
- end,
DQ = {Delivery#delivery{flow = Flow}, QNames},
{noreply, case Tx of
none -> deliver_to_queues(DQ, State1);