summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2011-05-24 18:03:46 +0100
committerMatthew Sackman <matthew@rabbitmq.com>2011-05-24 18:03:46 +0100
commit0c0276ea8642c868772a0a145599cb3ec51f6b42 (patch)
treea9964c713cfad48fcc70f4bd676aef82fd49f6f8
parented518e836df619635776c4d93696e487d4a7a198 (diff)
parentdfb7e59b0fdc4972bc2a65a7a0842a54f8f99645 (diff)
downloadrabbitmq-server-git-0c0276ea8642c868772a0a145599cb3ec51f6b42.tar.gz
Merge default into bug23554
-rw-r--r--docs/rabbitmqctl.1.xml66
-rw-r--r--ebin/rabbit_app.in1
-rw-r--r--src/rabbit_basic.erl61
-rw-r--r--src/rabbit_channel.erl37
-rw-r--r--src/rabbit_control.erl29
-rw-r--r--src/rabbit_prelaunch.erl9
-rw-r--r--src/rabbit_trace.erl119
-rw-r--r--src/rabbit_upgrade_functions.erl21
-rw-r--r--src/rabbit_vhost.erl36
9 files changed, 229 insertions, 150 deletions
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml
index 908ca97372..b825a1d0da 100644
--- a/docs/rabbitmqctl.1.xml
+++ b/docs/rabbitmqctl.1.xml
@@ -624,14 +624,31 @@
</listitem>
</varlistentry>
- <varlistentry>
- <term><cmdsynopsis><command>list_vhosts</command></cmdsynopsis></term>
+ <varlistentry role="usage-has-option-list">
+ <term><cmdsynopsis><command>list_vhosts</command> <arg choice="opt" role="usage-option-list"><replaceable>vhostinfoitem</replaceable> ...</arg></cmdsynopsis></term>
<listitem>
<para>
Lists virtual hosts.
</para>
+ <para>
+ The <command>vhostinfoitem</command> parameter is used to indicate which
+ virtual host information items to include in the results. The column order in the
+ results will match the order of the parameters.
+ <command>vhostinfoitem</command> can take any value from
+ the list that follows:
+ </para>
+ <variablelist>
+ <varlistentry>
+ <term>name</term>
+ <listitem><para>The name of the virtual host with non-ASCII characters escaped as in C.</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>tracing</term>
+ <listitem><para>Whether tracing is enabled for this virtual host.</para></listitem>
+ </varlistentry>
+ </variablelist>
<para role="example-prefix">For example:</para>
- <screen role="example">rabbitmqctl list_vhosts</screen>
+ <screen role="example">rabbitmqctl list_vhosts name tracing</screen>
<para role="example">
This command instructs the RabbitMQ broker to list all
virtual hosts.
@@ -1266,59 +1283,34 @@
</refsect2>
<refsect2>
- <title>Configuration variables</title>
- <para>
- Some configuration values can be changed at run time. Note
- that this does not apply to all variables; many are only read
- at startup - changing them will have no effect.
- </para>
+ <title>Message Tracing</title>
<variablelist>
<varlistentry>
- <term><cmdsynopsis><command>set_env</command> <arg choice="req"><replaceable>variable</replaceable></arg> <arg choice="req"><replaceable>value</replaceable></arg></cmdsynopsis></term>
+ <term><cmdsynopsis><command>trace_on</command> <arg choice="opt">-p <replaceable>vhost</replaceable></arg></cmdsynopsis></term>
<listitem>
<variablelist>
<varlistentry>
- <term>variable</term>
- <listitem><para>The name of the variable to set, as the string form of an Erlang term.</para></listitem>
- </varlistentry>
- <varlistentry>
- <term>value</term>
- <listitem><para>The value to set it to, as the string form of an Erlang term.</para></listitem>
- </varlistentry>
- </variablelist>
- <para>
- Set the value of a configuration variable.
- </para>
- </listitem>
- </varlistentry>
-
- <varlistentry>
- <term><cmdsynopsis><command>get_env</command> <arg choice="req"><replaceable>variable</replaceable></arg></cmdsynopsis></term>
- <listitem>
- <variablelist>
- <varlistentry>
- <term>variable</term>
- <listitem><para>The name of the variable to get, as the string form of an Erlang term.</para></listitem>
+ <term>vhost</term>
+ <listitem><para>The name of the virtual host for which to start tracing.</para></listitem>
</varlistentry>
</variablelist>
<para>
- Get the value of a configuration variable, printing either
- {ok,<command>Value</command>} or undefined.
+ Starts tracing.
</para>
</listitem>
</varlistentry>
<varlistentry>
- <term><cmdsynopsis><command>unset_env</command> <arg choice="req"><replaceable>variable</replaceable></arg></cmdsynopsis></term>
+ <term><cmdsynopsis><command>trace_off</command> <arg choice="opt">-p <replaceable>vhost</replaceable></arg></cmdsynopsis></term>
<listitem>
<variablelist>
<varlistentry>
- <term>variable</term>
- <listitem><para>The name of the variable to clear, as the string form of an Erlang term.</para></listitem>
+ <term>vhost</term>
+ <listitem><para>The name of the virtual host for which to stop tracing.</para></listitem>
</varlistentry>
</variablelist>
<para>
- Clear the value of a configuration variable.
+ Stops tracing.
</para>
</listitem>
</varlistentry>
diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in
index 014c18b0c6..7dabb8c346 100644
--- a/ebin/rabbit_app.in
+++ b/ebin/rabbit_app.in
@@ -36,6 +36,7 @@
{auth_mechanisms, ['PLAIN', 'AMQPLAIN']},
{auth_backends, [rabbit_auth_backend_internal]},
{delegate_count, 16},
+ {trace_vhosts, []},
{tcp_listen_options, [binary,
{packet, raw},
{reuseaddr, true},
diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl
index 3cf73e80d2..fa7e3a5ac0 100644
--- a/src/rabbit_basic.erl
+++ b/src/rabbit_basic.erl
@@ -32,6 +32,9 @@
({ok, rabbit_router:routing_result(), [pid()]}
| rabbit_types:error('not_found'))).
+-type(exchange_input() :: (rabbit_types:exchange() | rabbit_exchange:name())).
+-type(body_input() :: (binary() | [binary()])).
+
-spec(publish/1 ::
(rabbit_types:delivery()) -> publish_result()).
-spec(delivery/5 ::
@@ -48,14 +51,14 @@
-spec(properties/1 ::
(properties_input()) -> rabbit_framing:amqp_property_record()).
-spec(publish/4 ::
- (rabbit_exchange:name(), rabbit_router:routing_key(),
- properties_input(), binary()) -> publish_result()).
+ (exchange_input(), rabbit_router:routing_key(), properties_input(),
+ body_input()) -> publish_result()).
-spec(publish/7 ::
- (rabbit_exchange:name(), rabbit_router:routing_key(),
- boolean(), boolean(), rabbit_types:maybe(rabbit_types:txn()),
- properties_input(), binary()) -> publish_result()).
--spec(build_content/2 :: (rabbit_framing:amqp_property_record(), binary()) ->
- rabbit_types:content()).
+ (exchange_input(), rabbit_router:routing_key(), boolean(), boolean(),
+ rabbit_types:maybe(rabbit_types:txn()), properties_input(),
+ body_input()) -> publish_result()).
+-spec(build_content/2 :: (rabbit_framing:amqp_property_record(),
+ binary() | [binary()]) -> rabbit_types:content()).
-spec(from_content/1 :: (rabbit_types:content()) ->
{rabbit_framing:amqp_property_record(), binary()}).
@@ -66,18 +69,18 @@
publish(Delivery = #delivery{
message = #basic_message{exchange_name = ExchangeName}}) ->
case rabbit_exchange:lookup(ExchangeName) of
- {ok, X} ->
- {RoutingRes, DeliveredQPids} = rabbit_exchange:publish(X, Delivery),
- {ok, RoutingRes, DeliveredQPids};
- Other ->
- Other
+ {ok, X} -> publish(X, Delivery);
+ Other -> Other
end.
delivery(Mandatory, Immediate, Txn, Message, MsgSeqNo) ->
#delivery{mandatory = Mandatory, immediate = Immediate, txn = Txn,
sender = self(), message = Message, msg_seq_no = MsgSeqNo}.
-build_content(Properties, BodyBin) ->
+build_content(Properties, BodyBin) when is_binary(BodyBin) ->
+ build_content(Properties, [BodyBin]);
+
+build_content(Properties, PFR) ->
%% basic.publish hasn't changed so we can just hard-code amqp_0_9_1
{ClassId, _MethodId} =
rabbit_framing_amqp_0_9_1:method_id('basic.publish'),
@@ -85,7 +88,7 @@ build_content(Properties, BodyBin) ->
properties = Properties,
properties_bin = none,
protocol = none,
- payload_fragments_rev = [BodyBin]}.
+ payload_fragments_rev = PFR}.
from_content(Content) ->
#content{class_id = ClassId,
@@ -126,9 +129,9 @@ message(ExchangeName, RoutingKey,
{error, _Reason} = Error -> Error
end.
-message(ExchangeName, RoutingKey, RawProperties, BodyBin) ->
+message(ExchangeName, RoutingKey, RawProperties, Body) ->
Properties = properties(RawProperties),
- Content = build_content(Properties, BodyBin),
+ Content = build_content(Properties, Body),
{ok, Msg} = message(ExchangeName, RoutingKey, Content),
Msg.
@@ -153,18 +156,26 @@ indexof([_ | Rest], Element, N) -> indexof(Rest, Element, N + 1).
%% Convenience function, for avoiding round-trips in calls across the
%% erlang distributed network.
-publish(ExchangeName, RoutingKeyBin, Properties, BodyBin) ->
- publish(ExchangeName, RoutingKeyBin, false, false, none, Properties,
- BodyBin).
+publish(Exchange, RoutingKeyBin, Properties, Body) ->
+ publish(Exchange, RoutingKeyBin, false, false, none, Properties,
+ Body).
%% Convenience function, for avoiding round-trips in calls across the
%% erlang distributed network.
-publish(ExchangeName, RoutingKeyBin, Mandatory, Immediate, Txn, Properties,
- BodyBin) ->
- publish(delivery(Mandatory, Immediate, Txn,
- message(ExchangeName, RoutingKeyBin,
- properties(Properties), BodyBin),
- undefined)).
+publish(X = #exchange{name = XName}, RKey, Mandatory, Immediate, Txn,
+ Props, Body) ->
+ publish(X, delivery(Mandatory, Immediate, Txn,
+ message(XName, RKey, properties(Props), Body),
+ undefined));
+publish(XName, RKey, Mandatory, Immediate, Txn, Props, Body) ->
+ case rabbit_exchange:lookup(XName) of
+ {ok, X} -> publish(X, RKey, Mandatory, Immediate, Txn, Props, Body);
+ Err -> Err
+ end.
+
+publish(X, Delivery) ->
+ {RoutingRes, DeliveredQPids} = rabbit_exchange:publish(X, Delivery),
+ {ok, RoutingRes, DeliveredQPids}.
is_message_persistent(#content{properties = #'P_basic'{
delivery_mode = Mode}}) ->
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index f078886224..991b0b065c 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -23,7 +23,7 @@
-export([start_link/10, do/2, do/3, flush/1, shutdown/1]).
-export([send_command/2, deliver/4, flushed/2, confirm/2]).
-export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]).
--export([emit_stats/1, ready_for_close/1]).
+-export([refresh_config_all/0, emit_stats/1, ready_for_close/1]).
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
handle_info/2, handle_pre_hibernate/1, prioritise_call/3,
@@ -35,7 +35,7 @@
user, virtual_host, most_recently_declared_queue,
consumer_mapping, blocking, consumer_monitors, queue_collector_pid,
stats_timer, confirm_enabled, publish_seqno, unconfirmed_mq,
- unconfirmed_qm, confirmed, capabilities}).
+ unconfirmed_qm, confirmed, capabilities, trace_state}).
-define(MAX_PERMISSION_CACHE_SIZE, 12).
@@ -89,6 +89,7 @@
-spec(info/2 :: (pid(), rabbit_types:info_keys()) -> rabbit_types:infos()).
-spec(info_all/0 :: () -> [rabbit_types:infos()]).
-spec(info_all/1 :: (rabbit_types:info_keys()) -> [rabbit_types:infos()]).
+-spec(refresh_config_all/0 :: () -> 'ok').
-spec(emit_stats/1 :: (pid()) -> 'ok').
-spec(ready_for_close/1 :: (pid()) -> 'ok').
@@ -146,6 +147,11 @@ info_all() ->
info_all(Items) ->
rabbit_misc:filter_exit_map(fun (C) -> info(C, Items) end, list()).
+refresh_config_all() ->
+ rabbit_misc:upmap(
+ fun (C) -> gen_server2:call(C, refresh_config) end, list()),
+ ok.
+
emit_stats(Pid) ->
gen_server2:cast(Pid, emit_stats).
@@ -185,7 +191,8 @@ init([Channel, ReaderPid, WriterPid, ConnPid, Protocol, User, VHost,
unconfirmed_mq = gb_trees:empty(),
unconfirmed_qm = gb_trees:empty(),
confirmed = [],
- capabilities = Capabilities},
+ capabilities = Capabilities,
+ trace_state = rabbit_trace:init(VHost)},
rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State)),
rabbit_event:if_enabled(StatsTimer,
fun() -> internal_emit_stats(State) end),
@@ -218,6 +225,9 @@ handle_call({info, Items}, _From, State) ->
catch Error -> reply({error, Error}, State)
end;
+handle_call(refresh_config, _From, State = #ch{virtual_host = VHost}) ->
+ reply(ok, State#ch{trace_state = rabbit_trace:init(VHost)});
+
handle_call(_Request, _From, State) ->
noreply(State).
@@ -263,8 +273,9 @@ handle_cast({deliver, ConsumerTag, AckRequired,
#basic_message{exchange_name = ExchangeName,
routing_keys = [RoutingKey | _CcRoutes],
content = Content}}},
- State = #ch{writer_pid = WriterPid,
- next_tag = DeliveryTag}) ->
+ State = #ch{writer_pid = WriterPid,
+ next_tag = DeliveryTag,
+ trace_state = TraceState}) ->
State1 = lock_message(AckRequired,
ack_record(DeliveryTag, ConsumerTag, Msg),
State),
@@ -281,7 +292,7 @@ handle_cast({deliver, ConsumerTag, AckRequired,
true -> deliver;
false -> deliver_no_ack
end, State),
- rabbit_trace:tap_trace_out(Msg),
+ rabbit_trace:tap_trace_out(Msg, TraceState),
noreply(State1#ch{next_tag = DeliveryTag + 1});
handle_cast(emit_stats, State = #ch{stats_timer = StatsTimer}) ->
@@ -591,7 +602,8 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
immediate = Immediate},
Content, State = #ch{virtual_host = VHostPath,
transaction_id = TxnKey,
- confirm_enabled = ConfirmEnabled}) ->
+ confirm_enabled = ConfirmEnabled,
+ trace_state = TraceState}) ->
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
check_write_permitted(ExchangeName, State),
Exchange = rabbit_exchange:lookup_or_die(ExchangeName),
@@ -608,7 +620,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
end,
case rabbit_basic:message(ExchangeName, RoutingKey, DecodedContent) of
{ok, Message} ->
- rabbit_trace:tap_trace_in(Message),
+ rabbit_trace:tap_trace_in(Message, TraceState),
{RoutingRes, DeliveredQPids} =
rabbit_exchange:publish(
Exchange,
@@ -656,9 +668,10 @@ handle_method(#'basic.ack'{delivery_tag = DeliveryTag,
handle_method(#'basic.get'{queue = QueueNameBin,
no_ack = NoAck},
- _, State = #ch{writer_pid = WriterPid,
- conn_pid = ConnPid,
- next_tag = DeliveryTag}) ->
+ _, State = #ch{writer_pid = WriterPid,
+ conn_pid = ConnPid,
+ next_tag = DeliveryTag,
+ trace_state = TraceState}) ->
QueueName = expand_queue_name_shortcut(QueueNameBin, State),
check_read_permitted(QueueName, State),
case rabbit_amqqueue:with_exclusive_access_or_die(
@@ -677,7 +690,7 @@ handle_method(#'basic.get'{queue = QueueNameBin,
true -> get_no_ack;
false -> get
end, State),
- rabbit_trace:tap_trace_out(Msg),
+ rabbit_trace:tap_trace_out(Msg, TraceState),
ok = rabbit_writer:send_command(
WriterPid,
#'basic.get_ok'{delivery_tag = DeliveryTag,
diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl
index b4b6255ebd..117496f561 100644
--- a/src/rabbit_control.erl
+++ b/src/rabbit_control.erl
@@ -221,9 +221,10 @@ action(delete_vhost, Node, Args = [_VHostPath], _Opts, Inform) ->
Inform("Deleting vhost ~p", Args),
call(Node, {rabbit_vhost, delete, Args});
-action(list_vhosts, Node, [], _Opts, Inform) ->
+action(list_vhosts, Node, Args, _Opts, Inform) ->
Inform("Listing vhosts", []),
- display_list(call(Node, {rabbit_vhost, list, []}));
+ ArgAtoms = default_if_empty(Args, [name]),
+ display_info_list(call(Node, {rabbit_vhost, info_all, []}), ArgAtoms);
action(list_user_permissions, Node, Args = [_Username], _Opts, Inform) ->
Inform("Listing permissions for user ~p", Args),
@@ -294,18 +295,15 @@ action(list_consumers, Node, _Args, Opts, Inform) ->
Other -> Other
end;
-action(set_env, Node, [Var, Term], _Opts, Inform) ->
- Inform("Setting control variable ~s for node ~p to ~s", [Var, Node, Term]),
- rpc_call(Node, application, set_env, [rabbit, parse(Var), parse(Term)]);
-
-action(get_env, Node, [Var], _Opts, Inform) ->
- Inform("Getting control variable ~s for node ~p", [Var, Node]),
- Val = rpc_call(Node, application, get_env, [rabbit, parse(Var)]),
- io:format("~p~n", [Val]);
+action(trace_on, Node, [], Opts, Inform) ->
+ VHost = proplists:get_value(?VHOST_OPT, Opts),
+ Inform("Starting tracing for vhost ~p", [VHost]),
+ rpc_call(Node, rabbit_trace, start, [list_to_binary(VHost)]);
-action(unset_env, Node, [Var], _Opts, Inform) ->
- Inform("Clearing control variable ~s for node ~p", [Var, Node]),
- rpc_call(Node, application, unset_env, [rabbit, parse(Var)]);
+action(trace_off, Node, [], Opts, Inform) ->
+ VHost = proplists:get_value(?VHOST_OPT, Opts),
+ Inform("Stopping tracing for vhost ~p", [VHost]),
+ rpc_call(Node, rabbit_trace, stop, [list_to_binary(VHost)]);
action(set_permissions, Node, [Username, CPerm, WPerm, RPerm], Opts, Inform) ->
VHost = proplists:get_value(?VHOST_OPT, Opts),
@@ -350,11 +348,6 @@ default_if_empty(List, Default) when is_list(List) ->
true -> [list_to_atom(X) || X <- List]
end.
-parse(Str) ->
- {ok, Tokens, _} = erl_scan:string(Str ++ "."),
- {ok, Term} = erl_parse:parse_term(Tokens),
- Term.
-
display_info_list(Results, InfoItemKeys) when is_list(Results) ->
lists:foreach(
fun (Result) -> display_row(
diff --git a/src/rabbit_prelaunch.erl b/src/rabbit_prelaunch.erl
index 2512a60280..92829e4918 100644
--- a/src/rabbit_prelaunch.erl
+++ b/src/rabbit_prelaunch.erl
@@ -69,9 +69,13 @@ start() ->
%% Write it out to $RABBITMQ_PLUGINS_EXPAND_DIR/rabbit.rel
rabbit_misc:write_file(RootName ++ ".rel", io_lib:format("~p.~n", [RDesc])),
+ %% We exclude mochiweb due to its optional use of fdsrv.
+ XRefExclude = [mochiweb],
+
%% Compile the script
ScriptFile = RootName ++ ".script",
- case systools:make_script(RootName, [local, silent, exref]) of
+ case systools:make_script(RootName, [local, silent,
+ {exref, AllApps -- XRefExclude}]) of
{ok, Module, Warnings} ->
%% This gets lots of spurious no-source warnings when we
%% have .ez files, so we want to supress them to prevent
@@ -93,7 +97,8 @@ start() ->
end]),
case length(WarningStr) of
0 -> ok;
- _ -> io:format("~s", [WarningStr])
+ _ -> S = string:copies("*", 80),
+ io:format("~n~s~n~s~s~n~n", [S, WarningStr, S])
end,
ok;
{error, Module, Error} ->
diff --git a/src/rabbit_trace.erl b/src/rabbit_trace.erl
index 2d15e7fc42..7d36856a9d 100644
--- a/src/rabbit_trace.erl
+++ b/src/rabbit_trace.erl
@@ -16,85 +16,104 @@
-module(rabbit_trace).
--export([tap_trace_in/1, tap_trace_out/1]).
+-export([init/1, tracing/1, tap_trace_in/2, tap_trace_out/2, start/1, stop/1]).
-include("rabbit.hrl").
-include("rabbit_framing.hrl").
+-define(TRACE_VHOSTS, trace_vhosts).
+-define(XNAME, <<"amq.rabbitmq.trace">>).
+
%%----------------------------------------------------------------------------
-ifdef(use_specs).
--spec(tap_trace_in/1 :: (rabbit_types:basic_message()) -> 'ok').
--spec(tap_trace_out/1 :: (rabbit_amqqueue:qmsg()) -> 'ok').
+-type(state() :: rabbit_types:exchange() | 'none').
+
+-spec(init/1 :: (rabbit_types:vhost()) -> state()).
+-spec(tracing/1 :: (rabbit_types:vhost()) -> boolean()).
+-spec(tap_trace_in/2 :: (rabbit_types:basic_message(), state()) -> 'ok').
+-spec(tap_trace_out/2 :: (rabbit_amqqueue:qmsg(), state()) -> 'ok').
+
+-spec(start/1 :: (rabbit_types:vhost()) -> 'ok').
+-spec(stop/1 :: (rabbit_types:vhost()) -> 'ok').
-endif.
%%----------------------------------------------------------------------------
-tap_trace_in(Msg) ->
- maybe_trace(Msg, <<"publish">>, xname(Msg), []).
+init(VHost) ->
+ case tracing(VHost) of
+ false -> none;
+ true -> {ok, X} = rabbit_exchange:lookup(
+ rabbit_misc:r(VHost, exchange, ?XNAME)),
+ X
+ end.
+
+tracing(VHost) ->
+ {ok, VHosts} = application:get_env(rabbit, ?TRACE_VHOSTS),
+ lists:member(VHost, VHosts).
+
+tap_trace_in(Msg = #basic_message{exchange_name = #resource{name = XName}},
+ TraceX) ->
+ maybe_trace(TraceX, Msg, <<"publish">>, XName, []).
-tap_trace_out({#resource{name = QName}, _QPid, _QMsgId, Redelivered, Msg}) ->
+tap_trace_out({#resource{name = QName}, _QPid, _QMsgId, Redelivered, Msg},
+ TraceX) ->
RedeliveredNum = case Redelivered of true -> 1; false -> 0 end,
- maybe_trace(Msg, <<"deliver">>, QName,
+ maybe_trace(TraceX, Msg, <<"deliver">>, QName,
[{<<"redelivered">>, signedint, RedeliveredNum}]).
-xname(#basic_message{exchange_name = #resource{name = XName}}) -> XName.
-vhost(#basic_message{exchange_name = #resource{virtual_host = VHost}}) -> VHost.
-
-maybe_trace(Msg, RKPrefix, RKSuffix, Extra) ->
- XName = xname(Msg),
- case trace_exchange(vhost(Msg)) of
- none -> ok;
- XName -> ok;
- TraceX -> case catch trace(TraceX, Msg, RKPrefix, RKSuffix, Extra) of
- {'EXIT', R} -> rabbit_log:info("Trace died: ~p~n", [R]);
- ok -> ok
- end
- end.
+%%----------------------------------------------------------------------------
-trace_exchange(VHost) ->
- case application:get_env(rabbit, trace_exchanges) of
- undefined -> none;
- {ok, Xs} -> proplists:get_value(VHost, Xs, none)
- end.
+start(VHost) ->
+ update_config(fun (VHosts) -> [VHost | VHosts -- [VHost]] end).
+
+stop(VHost) ->
+ update_config(fun (VHosts) -> VHosts -- [VHost] end).
+
+update_config(Fun) ->
+ {ok, VHosts0} = application:get_env(rabbit, ?TRACE_VHOSTS),
+ VHosts = Fun(VHosts0),
+ application:set_env(rabbit, ?TRACE_VHOSTS, VHosts),
+ rabbit_channel:refresh_config_all(),
+ ok.
-trace(TraceX, Msg0, RKPrefix, RKSuffix, Extra) ->
- Msg = ensure_content_decoded(Msg0),
- rabbit_basic:publish(rabbit_misc:r(vhost(Msg), exchange, TraceX),
- <<RKPrefix/binary, ".", RKSuffix/binary>>,
- #'P_basic'{headers = msg_to_table(Msg) ++ Extra},
- payload(Msg)),
+%%----------------------------------------------------------------------------
+
+maybe_trace(none, _Msg, _RKPrefix, _RKSuffix, _Extra) ->
+ ok;
+maybe_trace(#exchange{name = Name}, #basic_message{exchange_name = Name},
+ _RKPrefix, _RKSuffix, _Extra) ->
+ ok;
+maybe_trace(X, Msg = #basic_message{content = #content{
+ payload_fragments_rev = PFR}},
+ RKPrefix, RKSuffix, Extra) ->
+ {ok, _, _} = rabbit_basic:publish(
+ X, <<RKPrefix/binary, ".", RKSuffix/binary>>,
+ #'P_basic'{headers = msg_to_table(Msg) ++ Extra}, PFR),
ok.
msg_to_table(#basic_message{exchange_name = #resource{name = XName},
routing_keys = RoutingKeys,
- content = #content{properties = Props}}) ->
+ content = Content}) ->
+ #content{properties = Props} =
+ rabbit_binary_parser:ensure_content_decoded(Content),
{PropsTable, _Ix} =
- lists:foldl(
- fun (K, {L, Ix}) ->
- V = element(Ix, Props),
- NewL = case V of
- undefined -> L;
- _ -> [{a2b(K), type(V), V} | L]
- end,
- {NewL, Ix + 1}
- end, {[], 2}, record_info(fields, 'P_basic')),
+ lists:foldl(fun (K, {L, Ix}) ->
+ V = element(Ix, Props),
+ NewL = case V of
+ undefined -> L;
+ _ -> [{a2b(K), type(V), V} | L]
+ end,
+ {NewL, Ix + 1}
+ end, {[], 2}, record_info(fields, 'P_basic')),
[{<<"exchange_name">>, longstr, XName},
{<<"routing_keys">>, array, [{longstr, K} || K <- RoutingKeys]},
{<<"properties">>, table, PropsTable},
{<<"node">>, longstr, a2b(node())}].
-payload(#basic_message{content = #content{payload_fragments_rev = PFR}}) ->
- list_to_binary(lists:reverse(PFR)).
-
-ensure_content_decoded(Msg = #basic_message{content = Content}) ->
- Msg#basic_message{content = rabbit_binary_parser:ensure_content_decoded(
- Content)}.
-
-a2b(A) ->
- list_to_binary(atom_to_list(A)).
+a2b(A) -> list_to_binary(atom_to_list(A)).
type(V) when is_list(V) -> table;
type(V) when is_integer(V) -> signedint;
diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl
index 325156b1f3..a6f02a0e0a 100644
--- a/src/rabbit_upgrade_functions.erl
+++ b/src/rabbit_upgrade_functions.erl
@@ -28,6 +28,7 @@
-rabbit_upgrade({topic_trie, mnesia, []}).
-rabbit_upgrade({semi_durable_route, mnesia, []}).
-rabbit_upgrade({exchange_event_serial, mnesia, []}).
+-rabbit_upgrade({trace_exchanges, mnesia, []}).
-rabbit_upgrade({mirror_pids, mnesia, []}).
%% -------------------------------------------------------------------
@@ -42,6 +43,7 @@
-spec(topic_trie/0 :: () -> 'ok').
-spec(exchange_event_serial/0 :: () -> 'ok').
-spec(semi_durable_route/0 :: () -> 'ok').
+-spec(trace_exchanges/0 :: () -> 'ok').
-spec(mirror_pids/0 :: () -> 'ok').
-endif.
@@ -115,6 +117,12 @@ exchange_event_serial() ->
create(rabbit_exchange_serial, [{record_name, exchange_serial},
{attributes, [name, next]}]).
+trace_exchanges() ->
+ [declare_exchange(
+ rabbit_misc:r(VHost, exchange, <<"amq.rabbitmq.trace">>), topic) ||
+ VHost <- rabbit_vhost:list()],
+ ok.
+
mirror_pids() ->
Tables = [rabbit_queue, rabbit_durable_queue],
AddMirrorPidsFun =
@@ -144,3 +152,16 @@ transform(TableName, Fun, FieldList, NewRecordName) ->
create(Tab, TabDef) ->
{atomic, ok} = mnesia:create_table(Tab, TabDef),
ok.
+
+%% Dumb replacement for rabbit_exchange:declare that does not require
+%% the exchange type registry or worker pool to be running by dint of
+%% not validating anything and assuming the exchange type does not
+%% require serialisation.
+declare_exchange(XName, Type) ->
+ X = #exchange{name = XName,
+ type = Type,
+ durable = true,
+ auto_delete = false,
+ internal = false,
+ arguments = []},
+ ok = mnesia:dirty_write(rabbit_durable_exchange, X).
diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl
index 24c130edd6..5270d80bd5 100644
--- a/src/rabbit_vhost.erl
+++ b/src/rabbit_vhost.erl
@@ -21,6 +21,7 @@
%%----------------------------------------------------------------------------
-export([add/1, delete/1, exists/1, list/0, with/2]).
+-export([info/1, info/2, info_all/0, info_all/1]).
-ifdef(use_specs).
@@ -30,10 +31,18 @@
-spec(list/0 :: () -> [rabbit_types:vhost()]).
-spec(with/2 :: (rabbit_types:vhost(), rabbit_misc:thunk(A)) -> A).
+-spec(info/1 :: (rabbit_types:vhost()) -> rabbit_types:infos()).
+-spec(info/2 :: (rabbit_types:vhost(), rabbit_types:info_keys())
+ -> rabbit_types:infos()).
+-spec(info_all/0 :: () -> [rabbit_types:infos()]).
+-spec(info_all/1 :: (rabbit_types:info_keys()) -> [rabbit_types:infos()]).
+
-endif.
%%----------------------------------------------------------------------------
+-define(INFO_KEYS, [name, tracing]).
+
add(VHostPath) ->
R = rabbit_misc:execute_mnesia_transaction(
fun () ->
@@ -51,12 +60,13 @@ add(VHostPath) ->
rabbit_misc:r(VHostPath, exchange, Name),
Type, true, false, false, []) ||
{Name,Type} <-
- [{<<"">>, direct},
- {<<"amq.direct">>, direct},
- {<<"amq.topic">>, topic},
- {<<"amq.match">>, headers}, %% per 0-9-1 pdf
- {<<"amq.headers">>, headers}, %% per 0-9-1 xml
- {<<"amq.fanout">>, fanout}]],
+ [{<<"">>, direct},
+ {<<"amq.direct">>, direct},
+ {<<"amq.topic">>, topic},
+ {<<"amq.match">>, headers}, %% per 0-9-1 pdf
+ {<<"amq.headers">>, headers}, %% per 0-9-1 xml
+ {<<"amq.fanout">>, fanout},
+ {<<"amq.rabbitmq.trace">>, topic}]],
ok
end),
rabbit_log:info("Added vhost ~p~n", [VHostPath]),
@@ -104,3 +114,17 @@ with(VHostPath, Thunk) ->
Thunk()
end
end.
+
+%%----------------------------------------------------------------------------
+
+infos(Items, X) -> [{Item, i(Item, X)} || Item <- Items].
+
+i(name, VHost) -> VHost;
+i(tracing, VHost) -> rabbit_trace:tracing(VHost);
+i(Item, _) -> throw({bad_argument, Item}).
+
+info(VHost) -> infos(?INFO_KEYS, VHost).
+info(VHost, Items) -> infos(Items, VHost).
+
+info_all() -> info_all(?INFO_KEYS).
+info_all(Items) -> [info(VHost, Items) || VHost <- list()].