summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/credit_flow.erl21
-rw-r--r--src/rabbit_amqqueue_process.erl65
-rw-r--r--src/rabbit_binary_parser.erl18
-rw-r--r--src/rabbit_channel.erl20
-rw-r--r--src/rabbit_control_main.erl9
-rw-r--r--src/rabbit_error_logger.erl2
-rw-r--r--src/rabbit_reader.erl178
-rw-r--r--src/rabbit_upgrade_functions.erl15
-rw-r--r--src/rabbit_vhost.erl20
9 files changed, 246 insertions, 102 deletions
diff --git a/src/credit_flow.erl b/src/credit_flow.erl
index d48d649ef3..39a257aca4 100644
--- a/src/credit_flow.erl
+++ b/src/credit_flow.erl
@@ -30,7 +30,7 @@
-define(DEFAULT_CREDIT, {200, 50}).
--export([send/1, send/2, ack/1, ack/2, handle_bump_msg/1, blocked/0]).
+-export([send/1, send/2, ack/1, ack/2, handle_bump_msg/1, blocked/0, state/0]).
-export([peer_down/1]).
%%----------------------------------------------------------------------------
@@ -110,6 +110,18 @@ blocked() -> case get(credit_blocked) of
_ -> true
end.
+state() -> case blocked() of
+ true -> flow;
+ false -> case get(credit_blocked_at) of
+ undefined -> running;
+ B -> Diff = timer:now_diff(erlang:now(), B),
+ case Diff < 5000000 of
+ true -> flow;
+ false -> running
+ end
+ end
+ end.
+
peer_down(Peer) ->
%% In theory we could also remove it from credit_deferred here, but it
%% doesn't really matter; at some point later we will drain
@@ -128,7 +140,12 @@ grant(To, Quantity) ->
true -> ?UPDATE(credit_deferred, [], Deferred, [{To, Msg} | Deferred])
end.
-block(From) -> ?UPDATE(credit_blocked, [], Blocks, [From | Blocks]).
+block(From) ->
+ case blocked() of
+ false -> put(credit_blocked_at, erlang:now());
+ true -> ok
+ end,
+ ?UPDATE(credit_blocked, [], Blocks, [From | Blocks]).
unblock(From) ->
?UPDATE(credit_blocked, [], Blocks, Blocks -- [From]),
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 4ff30ce0b8..7002fd367c 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -39,6 +39,7 @@
backing_queue,
backing_queue_state,
active_consumers,
+ consumer_use,
expires,
sync_timer_ref,
rate_timer_ref,
@@ -95,11 +96,12 @@
messages_unacknowledged,
messages,
consumers,
+ consumer_utilisation,
memory,
slave_pids,
synchronised_slave_pids,
backing_queue_status,
- status
+ state
]).
-define(CREATION_EVENT_KEYS,
@@ -149,6 +151,7 @@ init_state(Q) ->
exclusive_consumer = none,
has_had_consumers = false,
active_consumers = priority_queue:new(),
+ consumer_use = {inactive, now_micros(), 0, 0.0},
senders = pmon:new(delegate),
msg_id_to_channel = gb_trees:empty(),
status = running,
@@ -482,10 +485,12 @@ send_drained(C = #cr{ch_pid = ChPid, limiter = Limiter}) ->
deliver_msgs_to_consumers(_DeliverFun, true, State) ->
{true, State};
deliver_msgs_to_consumers(DeliverFun, false,
- State = #q{active_consumers = ActiveConsumers}) ->
+ State = #q{active_consumers = ActiveConsumers,
+ consumer_use = CUInfo}) ->
case priority_queue:out_p(ActiveConsumers) of
{empty, _} ->
- {false, State};
+ {false,
+ State#q{consumer_use = update_consumer_use(CUInfo, inactive)}};
{{value, QEntry, Priority}, Tail} ->
{Stop, State1} = deliver_msg_to_consumer(
DeliverFun, QEntry, Priority,
@@ -536,6 +541,26 @@ deliver_from_queue_deliver(AckRequired, State) ->
{Result, State1} = fetch(AckRequired, State),
{Result, is_empty(State1), State1}.
+update_consumer_use({inactive, _, _, _} = CUInfo, inactive) ->
+ CUInfo;
+update_consumer_use({active, _, _} = CUInfo, active) ->
+ CUInfo;
+update_consumer_use({active, Since, Avg}, inactive) ->
+ Now = now_micros(),
+ {inactive, Now, Now - Since, Avg};
+update_consumer_use({inactive, Since, Active, Avg}, active) ->
+ Now = now_micros(),
+ {active, Now, consumer_use_avg(Active, Now - Since, Avg)}.
+
+consumer_use_avg(Active, Inactive, Avg) ->
+ Time = Inactive + Active,
+ Ratio = Active / Time,
+ Weight = erlang:min(1, Time / 1000000),
+ case Avg of
+ undefined -> Ratio;
+ _ -> Ratio * Weight + Avg * (1 - Weight)
+ end.
+
confirm_messages([], State) ->
State;
confirm_messages(MsgIds, State = #q{msg_id_to_channel = MTC}) ->
@@ -713,7 +738,7 @@ possibly_unblock(State, ChPid, Update) ->
end
end.
-unblock(State, C = #cr{limiter = Limiter}) ->
+unblock(State = #q{consumer_use = CUInfo}, C = #cr{limiter = Limiter}) ->
case lists:partition(
fun({_P, {_ChPid, #consumer{tag = CTag}}}) ->
rabbit_limiter:is_consumer_blocked(Limiter, CTag)
@@ -725,12 +750,14 @@ unblock(State, C = #cr{limiter = Limiter}) ->
BlockedQ = priority_queue:from_list(Blocked),
UnblockedQ = priority_queue:from_list(Unblocked),
update_ch_record(C#cr{blocked_consumers = BlockedQ}),
- AC1 = priority_queue:join(State#q.active_consumers, UnblockedQ),
- State1 = State#q{active_consumers = AC1},
+ State1 = State#q{consumer_use =
+ update_consumer_use(CUInfo, active)},
+ AC1 = priority_queue:join(State1#q.active_consumers, UnblockedQ),
+ State2 = State1#q{active_consumers = AC1},
[notify_decorators(
- consumer_unblocked, [{consumer_tag, CTag}], State1) ||
+ consumer_unblocked, [{consumer_tag, CTag}], State2) ||
{_P, {_ChPid, #consumer{tag = CTag}}} <- Unblocked],
- run_message_queue(State1)
+ run_message_queue(State2)
end.
should_auto_delete(#q{q = #amqqueue{auto_delete = false}}) -> false;
@@ -1037,6 +1064,16 @@ i(messages, State) ->
messages_unacknowledged]]);
i(consumers, _) ->
consumer_count();
+i(consumer_utilisation, #q{consumer_use = ConsumerUse}) ->
+ case consumer_count() of
+ 0 -> '';
+ _ -> case ConsumerUse of
+ {active, Since, Avg} ->
+ consumer_use_avg(now_micros() - Since, 0, Avg);
+ {inactive, Since, Active, Avg} ->
+ consumer_use_avg(Active, now_micros() - Since, Avg)
+ end
+ end;
i(memory, _) ->
{memory, M} = process_info(self(), memory),
M;
@@ -1054,8 +1091,8 @@ i(synchronised_slave_pids, #q{q = #amqqueue{name = Name}}) ->
false -> '';
true -> SSPids
end;
-i(status, #q{status = Status}) ->
- Status;
+i(state, #q{status = running}) -> credit_flow:state();
+i(state, #q{status = State}) -> State;
i(backing_queue_status, #q{backing_queue_state = BQS, backing_queue = BQ}) ->
BQ:status(BQS);
i(Item, _) ->
@@ -1076,7 +1113,10 @@ emit_stats(State) ->
emit_stats(State, []).
emit_stats(State, Extra) ->
- rabbit_event:notify(queue_stats, Extra ++ infos(?STATISTICS_KEYS, State)).
+ ExtraKs = [K || {K, _} <- Extra],
+ Infos = [{K, V} || {K, V} <- infos(?STATISTICS_KEYS, State),
+ not lists:member(K, ExtraKs)],
+ rabbit_event:notify(queue_stats, Extra ++ Infos).
emit_consumer_created(ChPid, CTag, Exclusive, AckRequired, QName, Args) ->
rabbit_event:notify(consumer_created,
@@ -1537,7 +1577,8 @@ handle_pre_hibernate(State = #q{backing_queue = BQ,
BQS3 = BQ:handle_pre_hibernate(BQS2),
rabbit_event:if_enabled(
State, #q.stats_timer,
- fun () -> emit_stats(State, [{idle_since, now()}]) end),
+ fun () -> emit_stats(State, [{idle_since, now()},
+ {consumer_utilisation, ''}]) end),
State1 = rabbit_event:stop_stats_timer(State#q{backing_queue_state = BQS3},
#q.stats_timer),
{hibernate, stop_rate_timer(State1)}.
diff --git a/src/rabbit_binary_parser.erl b/src/rabbit_binary_parser.erl
index dc6d090ff0..088ad0e52e 100644
--- a/src/rabbit_binary_parser.erl
+++ b/src/rabbit_binary_parser.erl
@@ -20,6 +20,7 @@
-export([parse_table/1]).
-export([ensure_content_decoded/1, clear_decoded_content/1]).
+-export([validate_utf8/1, assert_utf8/1]).
%%----------------------------------------------------------------------------
@@ -30,6 +31,8 @@
(rabbit_types:content()) -> rabbit_types:decoded_content()).
-spec(clear_decoded_content/1 ::
(rabbit_types:content()) -> rabbit_types:undecoded_content()).
+-spec(validate_utf8/1 :: (binary()) -> 'ok' | 'error').
+-spec(assert_utf8/1 :: (binary()) -> 'ok').
-endif.
@@ -99,3 +102,18 @@ clear_decoded_content(Content = #content{properties_bin = none}) ->
Content;
clear_decoded_content(Content = #content{}) ->
Content#content{properties = none}.
+
+assert_utf8(B) ->
+ case validate_utf8(B) of
+ ok -> ok;
+ error -> rabbit_misc:protocol_error(
+ frame_error, "Malformed UTF-8 in shortstr", [])
+ end.
+
+validate_utf8(Bin) ->
+ try
+ xmerl_ucs:from_utf8(Bin),
+ ok
+ catch exit:{ucs, _} ->
+ error
+ end.
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index a3a0c7543b..4d778f946f 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -53,7 +53,8 @@
messages_uncommitted,
acks_uncommitted,
prefetch_count,
- client_flow_blocked]).
+ client_flow_blocked,
+ state]).
-define(CREATION_EVENT_KEYS,
[pid,
@@ -550,6 +551,14 @@ check_not_default_exchange(#resource{kind = exchange, name = <<"">>}) ->
check_not_default_exchange(_) ->
ok.
+check_exchange_deletion(XName = #resource{name = <<"amq.rabbitmq.", _/binary>>,
+ kind = exchange}) ->
+ rabbit_misc:protocol_error(
+ access_refused, "deletion of system ~s not allowed",
+ [rabbit_misc:rs(XName)]);
+check_exchange_deletion(_) ->
+ ok.
+
%% check that an exchange/queue name does not contain the reserved
%% "amq." prefix.
%%
@@ -592,7 +601,11 @@ confirm(MsgSeqNos, QPid, State = #ch{unconfirmed = UC}) ->
record_confirms(MXs, State#ch{unconfirmed = UC1}).
handle_method(#'channel.open'{}, _, State = #ch{state = starting}) ->
- {reply, #'channel.open_ok'{}, State#ch{state = running}};
+ %% Don't leave "starting" as the state for 5s. TODO is this TRTTD?
+ State1 = State#ch{state = running},
+ rabbit_event:if_enabled(State1, #ch.stats_timer,
+ fun() -> emit_stats(State1) end),
+ {reply, #'channel.open_ok'{}, State1};
handle_method(#'channel.open'{}, _, _State) ->
rabbit_misc:protocol_error(
@@ -933,6 +946,7 @@ handle_method(#'exchange.delete'{exchange = ExchangeNameBin,
_, State = #ch{virtual_host = VHostPath}) ->
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
check_not_default_exchange(ExchangeName),
+ check_exchange_deletion(ExchangeName),
check_configure_permitted(ExchangeName, State),
case rabbit_exchange:delete(ExchangeName, IfUnused) of
{error, not_found} ->
@@ -1615,6 +1629,8 @@ i(messages_uncommitted, #ch{tx = {Msgs, _Acks}}) -> queue:len(Msgs);
i(messages_uncommitted, #ch{}) -> 0;
i(acks_uncommitted, #ch{tx = {_Msgs, Acks}}) -> ack_len(Acks);
i(acks_uncommitted, #ch{}) -> 0;
+i(state, #ch{state = running}) -> credit_flow:state();
+i(state, #ch{state = State}) -> State;
i(prefetch_count, #ch{limiter = Limiter}) ->
rabbit_limiter:get_prefetch_limit(Limiter);
i(client_flow_blocked, #ch{limiter = Limiter}) ->
diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl
index 6f36f99df5..f34632867a 100644
--- a/src/rabbit_control_main.erl
+++ b/src/rabbit_control_main.erl
@@ -706,7 +706,14 @@ unsafe_rpc(Node, Mod, Fun, Args) ->
end.
call(Node, {Mod, Fun, Args}) ->
- rpc_call(Node, Mod, Fun, lists:map(fun list_to_binary/1, Args)).
+ rpc_call(Node, Mod, Fun, lists:map(fun list_to_binary_utf8/1, Args)).
+
+list_to_binary_utf8(L) ->
+ B = list_to_binary(L),
+ case rabbit_binary_parser:validate_utf8(B) of
+ ok -> B;
+ error -> throw({error, {not_utf_8, L}})
+ end.
rpc_call(Node, Mod, Fun, Args) ->
rpc:call(Node, Mod, Fun, Args, ?RPC_TIMEOUT).
diff --git a/src/rabbit_error_logger.erl b/src/rabbit_error_logger.erl
index 17ed8563eb..ab8c62fe57 100644
--- a/src/rabbit_error_logger.erl
+++ b/src/rabbit_error_logger.erl
@@ -51,7 +51,7 @@ stop() ->
init([DefaultVHost]) ->
#exchange{} = rabbit_exchange:declare(
rabbit_misc:r(DefaultVHost, exchange, ?LOG_EXCH_NAME),
- topic, true, false, false, []),
+ topic, true, false, true, []),
{ok, #resource{virtual_host = DefaultVHost,
kind = exchange,
name = ?LOG_EXCH_NAME}}.
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 2c19b2bf0e..67effab0ea 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -32,6 +32,7 @@
-define(CLOSING_TIMEOUT, 30).
-define(CHANNEL_TERMINATION_TIMEOUT, 3).
-define(SILENT_CLOSE_DELAY, 3).
+-define(CHANNEL_MIN, 1).
%%--------------------------------------------------------------------------
@@ -40,7 +41,7 @@
stats_timer, channel_sup_sup_pid, buf, buf_len, throttle}).
-record(connection, {name, host, peer_host, port, peer_port,
- protocol, user, timeout_sec, frame_max, vhost,
+ protocol, user, timeout_sec, frame_max, channel_max, vhost,
client_properties, capabilities,
auth_mechanism, auth_state}).
@@ -48,15 +49,14 @@
blocked_sent}).
-define(STATISTICS_KEYS, [pid, recv_oct, recv_cnt, send_oct, send_cnt,
- send_pend, state, last_blocked_by, last_blocked_age,
- channels]).
+ send_pend, state, channels]).
-define(CREATION_EVENT_KEYS,
[pid, name, port, peer_port, host,
peer_host, ssl, peer_cert_subject, peer_cert_issuer,
peer_cert_validity, auth_mechanism, ssl_protocol,
ssl_key_exchange, ssl_cipher, ssl_hash, protocol, user, vhost,
- timeout, frame_max, client_properties]).
+ timeout, frame_max, channel_max, client_properties]).
-define(INFO_KEYS, ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid]).
@@ -606,17 +606,26 @@ create_channel(Channel, State) ->
connection = #connection{name = Name,
protocol = Protocol,
frame_max = FrameMax,
+ channel_max = ChannelMax,
user = User,
vhost = VHost,
capabilities = Capabilities}} = State,
- {ok, _ChSupPid, {ChPid, AState}} =
- rabbit_channel_sup_sup:start_channel(
- ChanSupSup, {tcp, Sock, Channel, FrameMax, self(), Name,
- Protocol, User, VHost, Capabilities, Collector}),
- MRef = erlang:monitor(process, ChPid),
- put({ch_pid, ChPid}, {Channel, MRef}),
- put({channel, Channel}, {ChPid, AState}),
- {ChPid, AState}.
+ N = length(all_channels()),
+ case ChannelMax == 0 orelse N < ChannelMax of
+ true -> {ok, _ChSupPid, {ChPid, AState}} =
+ rabbit_channel_sup_sup:start_channel(
+ ChanSupSup, {tcp, Sock, Channel, FrameMax, self(), Name,
+ Protocol, User, VHost, Capabilities,
+ Collector}),
+ MRef = erlang:monitor(process, ChPid),
+ put({ch_pid, ChPid}, {Channel, MRef}),
+ put({channel, Channel}, {ChPid, AState}),
+ {ok, {ChPid, AState}};
+ false -> {error, rabbit_misc:amqp_error(
+ not_allowed, "number of channels opened (~w) has "
+ "reached the negotiated channel_max (~w)",
+ [N, ChannelMax], 'none')}
+ end.
channel_cleanup(ChPid) ->
case get({ch_pid, ChPid}) of
@@ -664,24 +673,28 @@ handle_frame(Type, Channel, Payload, State) ->
process_frame(Frame, Channel, State) ->
ChKey = {channel, Channel},
- {ChPid, AState} = case get(ChKey) of
- undefined -> create_channel(Channel, State);
- Other -> Other
- end,
- case rabbit_command_assembler:process(Frame, AState) of
- {ok, NewAState} ->
- put(ChKey, {ChPid, NewAState}),
- post_process_frame(Frame, ChPid, State);
- {ok, Method, NewAState} ->
- rabbit_channel:do(ChPid, Method),
- put(ChKey, {ChPid, NewAState}),
- post_process_frame(Frame, ChPid, State);
- {ok, Method, Content, NewAState} ->
- rabbit_channel:do_flow(ChPid, Method, Content),
- put(ChKey, {ChPid, NewAState}),
- post_process_frame(Frame, ChPid, control_throttle(State));
- {error, Reason} ->
- handle_exception(State, Channel, Reason)
+ case (case get(ChKey) of
+ undefined -> create_channel(Channel, State);
+ Other -> {ok, Other}
+ end) of
+ {error, Error} ->
+ handle_exception(State, Channel, Error);
+ {ok, {ChPid, AState}} ->
+ case rabbit_command_assembler:process(Frame, AState) of
+ {ok, NewAState} ->
+ put(ChKey, {ChPid, NewAState}),
+ post_process_frame(Frame, ChPid, State);
+ {ok, Method, NewAState} ->
+ rabbit_channel:do(ChPid, Method),
+ put(ChKey, {ChPid, NewAState}),
+ post_process_frame(Frame, ChPid, State);
+ {ok, Method, Content, NewAState} ->
+ rabbit_channel:do_flow(ChPid, Method, Content),
+ put(ChKey, {ChPid, NewAState}),
+ post_process_frame(Frame, ChPid, control_throttle(State));
+ {error, Reason} ->
+ handle_exception(State, Channel, Reason)
+ end
end.
post_process_frame({method, 'channel.close_ok', _}, ChPid, State) ->
@@ -838,38 +851,33 @@ handle_method0(#'connection.secure_ok'{response = Response},
State = #v1{connection_state = securing}) ->
auth_phase(Response, State);
-handle_method0(#'connection.tune_ok'{frame_max = FrameMax,
- heartbeat = ClientHeartbeat},
+handle_method0(#'connection.tune_ok'{frame_max = FrameMax,
+ channel_max = ChannelMax,
+ heartbeat = ClientHeartbeat},
State = #v1{connection_state = tuning,
connection = Connection,
helper_sup = SupPid,
sock = Sock}) ->
- ServerFrameMax = server_frame_max(),
- if FrameMax /= 0 andalso FrameMax < ?FRAME_MIN_SIZE ->
- rabbit_misc:protocol_error(
- not_allowed, "frame_max=~w < ~w min size",
- [FrameMax, ?FRAME_MIN_SIZE]);
- ServerFrameMax /= 0 andalso FrameMax > ServerFrameMax ->
- rabbit_misc:protocol_error(
- not_allowed, "frame_max=~w > ~w max size",
- [FrameMax, ServerFrameMax]);
- true ->
- {ok, Collector} =
- rabbit_connection_helper_sup:start_queue_collector(SupPid),
- Frame = rabbit_binary_generator:build_heartbeat_frame(),
- SendFun = fun() -> catch rabbit_net:send(Sock, Frame) end,
- Parent = self(),
- ReceiveFun = fun() -> Parent ! heartbeat_timeout end,
- Heartbeater =
- rabbit_heartbeat:start(SupPid, Sock, ClientHeartbeat,
- SendFun, ClientHeartbeat, ReceiveFun),
- State#v1{connection_state = opening,
- connection = Connection#connection{
- timeout_sec = ClientHeartbeat,
- frame_max = FrameMax},
- queue_collector = Collector,
- heartbeater = Heartbeater}
- end;
+ ok = validate_negotiated_integer_value(
+ frame_max, ?FRAME_MIN_SIZE, FrameMax),
+ ok = validate_negotiated_integer_value(
+ channel_max, ?CHANNEL_MIN, ChannelMax),
+ {ok, Collector} =
+ rabbit_connection_helper_sup:start_queue_collector(SupPid),
+ Frame = rabbit_binary_generator:build_heartbeat_frame(),
+ SendFun = fun() -> catch rabbit_net:send(Sock, Frame) end,
+ Parent = self(),
+ ReceiveFun = fun() -> Parent ! heartbeat_timeout end,
+ Heartbeater =
+ rabbit_heartbeat:start(SupPid, Sock, ClientHeartbeat,
+ SendFun, ClientHeartbeat, ReceiveFun),
+ State#v1{connection_state = opening,
+ connection = Connection#connection{
+ frame_max = FrameMax,
+ channel_max = ChannelMax,
+ timeout_sec = ClientHeartbeat},
+ queue_collector = Collector,
+ heartbeater = Heartbeater};
handle_method0(#'connection.open'{virtual_host = VHostPath},
State = #v1{connection_state = opening,
@@ -917,13 +925,28 @@ handle_method0(_Method, #v1{connection_state = S}) ->
rabbit_misc:protocol_error(
channel_error, "unexpected method in connection state ~w", [S]).
-server_frame_max() ->
- {ok, FrameMax} = application:get_env(rabbit, frame_max),
- FrameMax.
+validate_negotiated_integer_value(Field, Min, ClientValue) ->
+ ServerValue = get_env(Field),
+ if ClientValue /= 0 andalso ClientValue < Min ->
+ fail_negotiation(Field, min, ServerValue, ClientValue);
+ ServerValue /= 0 andalso ClientValue > ServerValue ->
+ fail_negotiation(Field, max, ServerValue, ClientValue);
+ true ->
+ ok
+ end.
-server_heartbeat() ->
- {ok, Heartbeat} = application:get_env(rabbit, heartbeat),
- Heartbeat.
+fail_negotiation(Field, MinOrMax, ServerValue, ClientValue) ->
+ {S1, S2} = case MinOrMax of
+ min -> {lower, minimum};
+ max -> {higher, maximum}
+ end,
+ rabbit_misc:protocol_error(
+ not_allowed, "negotiated ~w = ~w is ~w than the ~w allowed value (~w)",
+ [Field, ClientValue, S1, S2, ServerValue], 'connection.tune').
+
+get_env(Key) ->
+ {ok, Value} = application:get_env(rabbit, Key),
+ Value.
send_on_channel0(Sock, Method, Protocol) ->
ok = rabbit_writer:internal_send_command(Sock, 0, Method, Protocol).
@@ -989,9 +1012,9 @@ auth_phase(Response,
State#v1{connection = Connection#connection{
auth_state = AuthState1}};
{ok, User} ->
- Tune = #'connection.tune'{channel_max = 0,
- frame_max = server_frame_max(),
- heartbeat = server_heartbeat()},
+ Tune = #'connection.tune'{frame_max = get_env(frame_max),
+ channel_max = get_env(channel_max),
+ heartbeat = get_env(heartbeat)},
ok = send_on_channel0(Sock, Tune, Protocol),
State#v1{connection_state = tuning,
connection = Connection#connection{user = User,
@@ -1018,13 +1041,17 @@ i(ssl_hash, S) -> ssl_info(fun ({_, {_, _, H}}) -> H end, S);
i(peer_cert_issuer, S) -> cert_info(fun rabbit_ssl:peer_cert_issuer/1, S);
i(peer_cert_subject, S) -> cert_info(fun rabbit_ssl:peer_cert_subject/1, S);
i(peer_cert_validity, S) -> cert_info(fun rabbit_ssl:peer_cert_validity/1, S);
-i(state, #v1{connection_state = CS}) -> CS;
-i(last_blocked_by, #v1{throttle = #throttle{last_blocked_by = By}}) -> By;
-i(last_blocked_age, #v1{throttle = #throttle{last_blocked_at = never}}) ->
- infinity;
-i(last_blocked_age, #v1{throttle = #throttle{last_blocked_at = T}}) ->
- timer:now_diff(erlang:now(), T) / 1000000;
i(channels, #v1{}) -> length(all_channels());
+i(state, #v1{connection_state = ConnectionState,
+ throttle = #throttle{last_blocked_by = BlockedBy,
+ last_blocked_at = T}}) ->
+ Recent = T =/= never andalso timer:now_diff(erlang:now(), T) < 5000000,
+ case {BlockedBy, ConnectionState, Recent} of
+ {resourse, blocked, _} -> blocked;
+ {_, blocking, _} -> blocking;
+ {flow, _, true} -> flow;
+ {_, _, _} -> ConnectionState
+ end;
i(Item, #v1{connection = Conn}) -> ic(Item, Conn).
ic(name, #connection{name = Name}) -> Name;
@@ -1039,6 +1066,7 @@ ic(user, #connection{user = U}) -> U#user.username;
ic(vhost, #connection{vhost = VHost}) -> VHost;
ic(timeout, #connection{timeout_sec = Timeout}) -> Timeout;
ic(frame_max, #connection{frame_max = FrameMax}) -> FrameMax;
+ic(channel_max, #connection{channel_max = ChMax}) -> ChMax;
ic(client_properties, #connection{client_properties = CP}) -> CP;
ic(auth_mechanism, #connection{auth_mechanism = none}) -> none;
ic(auth_mechanism, #connection{auth_mechanism = {Name, _Mod}}) -> Name;
@@ -1079,8 +1107,8 @@ emit_stats(State) ->
%% If we emit an event which looks like we are in flow control, it's not a
%% good idea for it to be our last even if we go idle. Keep emitting
%% events, either we stay busy or we drop out of flow control.
- case proplists:get_value(last_blocked_age, Infos) < 5 of
- true -> ensure_stats_timer(State1);
+ case proplists:get_value(state, Infos) of
+ flow -> ensure_stats_timer(State1);
_ -> State1
end.
diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl
index 6f95ef60ad..9037246172 100644
--- a/src/rabbit_upgrade_functions.erl
+++ b/src/rabbit_upgrade_functions.erl
@@ -46,6 +46,7 @@
-rabbit_upgrade({exchange_decorators, mnesia, [policy]}).
-rabbit_upgrade({policy_apply_to, mnesia, [runtime_parameters]}).
-rabbit_upgrade({queue_decorators, mnesia, [gm_pids]}).
+-rabbit_upgrade({internal_system_x, mnesia, [exchange_decorators]}).
%% -------------------------------------------------------------------
@@ -74,6 +75,7 @@
-spec(exchange_decorators/0 :: () -> 'ok').
-spec(policy_apply_to/0 :: () -> 'ok').
-spec(queue_decorators/0 :: () -> 'ok').
+-spec(internal_system_x/0 :: () -> 'ok').
-endif.
@@ -340,6 +342,19 @@ queue_decorators(Table) ->
[name, durable, auto_delete, exclusive_owner, arguments, pid, slave_pids,
sync_slave_pids, policy, gm_pids, decorators]).
+internal_system_x() ->
+ transform(
+ rabbit_durable_exchange,
+ fun ({exchange, Name = {resource, _, _, <<"amq.rabbitmq.", _/binary>>},
+ Type, Dur, AutoDel, _Int, Args, Scratches, Policy, Decorators}) ->
+ {exchange, Name, Type, Dur, AutoDel, true, Args, Scratches,
+ Policy, Decorators};
+ (X) ->
+ X
+ end,
+ [name, type, durable, auto_delete, internal, arguments, scratches, policy,
+ decorators]).
+
%%--------------------------------------------------------------------
transform(TableName, Fun, FieldList) ->
diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl
index 8d013d4337..047bce7780 100644
--- a/src/rabbit_vhost.erl
+++ b/src/rabbit_vhost.erl
@@ -60,15 +60,17 @@ add(VHostPath) ->
(ok, false) ->
[rabbit_exchange:declare(
rabbit_misc:r(VHostPath, exchange, Name),
- Type, true, false, false, []) ||
- {Name,Type} <-
- [{<<"">>, direct},
- {<<"amq.direct">>, direct},
- {<<"amq.topic">>, topic},
- {<<"amq.match">>, headers}, %% per 0-9-1 pdf
- {<<"amq.headers">>, headers}, %% per 0-9-1 xml
- {<<"amq.fanout">>, fanout},
- {<<"amq.rabbitmq.trace">>, topic}]],
+ Type, true, false, Internal, []) ||
+ {Name, Type, Internal} <-
+ [{<<"">>, direct, false},
+ {<<"amq.direct">>, direct, false},
+ {<<"amq.topic">>, topic, false},
+ %% per 0-9-1 pdf
+ {<<"amq.match">>, headers, false},
+ %% per 0-9-1 xml
+ {<<"amq.headers">>, headers, false},
+ {<<"amq.fanout">>, fanout, false},
+ {<<"amq.rabbitmq.trace">>, topic, true}]],
ok
end),
rabbit_event:notify(vhost_created, info(VHostPath)),