diff options
| author | Matthew Sackman <matthew@rabbitmq.com> | 2011-05-24 18:03:46 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@rabbitmq.com> | 2011-05-24 18:03:46 +0100 |
| commit | 0c0276ea8642c868772a0a145599cb3ec51f6b42 (patch) | |
| tree | a9964c713cfad48fcc70f4bd676aef82fd49f6f8 | |
| parent | ed518e836df619635776c4d93696e487d4a7a198 (diff) | |
| parent | dfb7e59b0fdc4972bc2a65a7a0842a54f8f99645 (diff) | |
| download | rabbitmq-server-git-0c0276ea8642c868772a0a145599cb3ec51f6b42.tar.gz | |
Merge default into bug23554
| -rw-r--r-- | docs/rabbitmqctl.1.xml | 66 | ||||
| -rw-r--r-- | ebin/rabbit_app.in | 1 | ||||
| -rw-r--r-- | src/rabbit_basic.erl | 61 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 37 | ||||
| -rw-r--r-- | src/rabbit_control.erl | 29 | ||||
| -rw-r--r-- | src/rabbit_prelaunch.erl | 9 | ||||
| -rw-r--r-- | src/rabbit_trace.erl | 119 | ||||
| -rw-r--r-- | src/rabbit_upgrade_functions.erl | 21 | ||||
| -rw-r--r-- | src/rabbit_vhost.erl | 36 |
9 files changed, 229 insertions, 150 deletions
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml index 908ca97372..b825a1d0da 100644 --- a/docs/rabbitmqctl.1.xml +++ b/docs/rabbitmqctl.1.xml @@ -624,14 +624,31 @@ </listitem> </varlistentry> - <varlistentry> - <term><cmdsynopsis><command>list_vhosts</command></cmdsynopsis></term> + <varlistentry role="usage-has-option-list"> + <term><cmdsynopsis><command>list_vhosts</command> <arg choice="opt" role="usage-option-list"><replaceable>vhostinfoitem</replaceable> ...</arg></cmdsynopsis></term> <listitem> <para> Lists virtual hosts. </para> + <para> + The <command>vhostinfoitem</command> parameter is used to indicate which + virtual host information items to include in the results. The column order in the + results will match the order of the parameters. + <command>vhostinfoitem</command> can take any value from + the list that follows: + </para> + <variablelist> + <varlistentry> + <term>name</term> + <listitem><para>The name of the virtual host with non-ASCII characters escaped as in C.</para></listitem> + </varlistentry> + <varlistentry> + <term>tracing</term> + <listitem><para>Whether tracing is enabled for this virtual host.</para></listitem> + </varlistentry> + </variablelist> <para role="example-prefix">For example:</para> - <screen role="example">rabbitmqctl list_vhosts</screen> + <screen role="example">rabbitmqctl list_vhosts name tracing</screen> <para role="example"> This command instructs the RabbitMQ broker to list all virtual hosts. @@ -1266,59 +1283,34 @@ </refsect2> <refsect2> - <title>Configuration variables</title> - <para> - Some configuration values can be changed at run time. Note - that this does not apply to all variables; many are only read - at startup - changing them will have no effect. - </para> + <title>Message Tracing</title> <variablelist> <varlistentry> - <term><cmdsynopsis><command>set_env</command> <arg choice="req"><replaceable>variable</replaceable></arg> <arg choice="req"><replaceable>value</replaceable></arg></cmdsynopsis></term> + <term><cmdsynopsis><command>trace_on</command> <arg choice="opt">-p <replaceable>vhost</replaceable></arg></cmdsynopsis></term> <listitem> <variablelist> <varlistentry> - <term>variable</term> - <listitem><para>The name of the variable to set, as the string form of an Erlang term.</para></listitem> - </varlistentry> - <varlistentry> - <term>value</term> - <listitem><para>The value to set it to, as the string form of an Erlang term.</para></listitem> - </varlistentry> - </variablelist> - <para> - Set the value of a configuration variable. - </para> - </listitem> - </varlistentry> - - <varlistentry> - <term><cmdsynopsis><command>get_env</command> <arg choice="req"><replaceable>variable</replaceable></arg></cmdsynopsis></term> - <listitem> - <variablelist> - <varlistentry> - <term>variable</term> - <listitem><para>The name of the variable to get, as the string form of an Erlang term.</para></listitem> + <term>vhost</term> + <listitem><para>The name of the virtual host for which to start tracing.</para></listitem> </varlistentry> </variablelist> <para> - Get the value of a configuration variable, printing either - {ok,<command>Value</command>} or undefined. + Starts tracing. </para> </listitem> </varlistentry> <varlistentry> - <term><cmdsynopsis><command>unset_env</command> <arg choice="req"><replaceable>variable</replaceable></arg></cmdsynopsis></term> + <term><cmdsynopsis><command>trace_off</command> <arg choice="opt">-p <replaceable>vhost</replaceable></arg></cmdsynopsis></term> <listitem> <variablelist> <varlistentry> - <term>variable</term> - <listitem><para>The name of the variable to clear, as the string form of an Erlang term.</para></listitem> + <term>vhost</term> + <listitem><para>The name of the virtual host for which to stop tracing.</para></listitem> </varlistentry> </variablelist> <para> - Clear the value of a configuration variable. + Stops tracing. </para> </listitem> </varlistentry> diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in index 014c18b0c6..7dabb8c346 100644 --- a/ebin/rabbit_app.in +++ b/ebin/rabbit_app.in @@ -36,6 +36,7 @@ {auth_mechanisms, ['PLAIN', 'AMQPLAIN']}, {auth_backends, [rabbit_auth_backend_internal]}, {delegate_count, 16}, + {trace_vhosts, []}, {tcp_listen_options, [binary, {packet, raw}, {reuseaddr, true}, diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl index 3cf73e80d2..fa7e3a5ac0 100644 --- a/src/rabbit_basic.erl +++ b/src/rabbit_basic.erl @@ -32,6 +32,9 @@ ({ok, rabbit_router:routing_result(), [pid()]} | rabbit_types:error('not_found'))). +-type(exchange_input() :: (rabbit_types:exchange() | rabbit_exchange:name())). +-type(body_input() :: (binary() | [binary()])). + -spec(publish/1 :: (rabbit_types:delivery()) -> publish_result()). -spec(delivery/5 :: @@ -48,14 +51,14 @@ -spec(properties/1 :: (properties_input()) -> rabbit_framing:amqp_property_record()). -spec(publish/4 :: - (rabbit_exchange:name(), rabbit_router:routing_key(), - properties_input(), binary()) -> publish_result()). + (exchange_input(), rabbit_router:routing_key(), properties_input(), + body_input()) -> publish_result()). -spec(publish/7 :: - (rabbit_exchange:name(), rabbit_router:routing_key(), - boolean(), boolean(), rabbit_types:maybe(rabbit_types:txn()), - properties_input(), binary()) -> publish_result()). --spec(build_content/2 :: (rabbit_framing:amqp_property_record(), binary()) -> - rabbit_types:content()). + (exchange_input(), rabbit_router:routing_key(), boolean(), boolean(), + rabbit_types:maybe(rabbit_types:txn()), properties_input(), + body_input()) -> publish_result()). +-spec(build_content/2 :: (rabbit_framing:amqp_property_record(), + binary() | [binary()]) -> rabbit_types:content()). -spec(from_content/1 :: (rabbit_types:content()) -> {rabbit_framing:amqp_property_record(), binary()}). @@ -66,18 +69,18 @@ publish(Delivery = #delivery{ message = #basic_message{exchange_name = ExchangeName}}) -> case rabbit_exchange:lookup(ExchangeName) of - {ok, X} -> - {RoutingRes, DeliveredQPids} = rabbit_exchange:publish(X, Delivery), - {ok, RoutingRes, DeliveredQPids}; - Other -> - Other + {ok, X} -> publish(X, Delivery); + Other -> Other end. delivery(Mandatory, Immediate, Txn, Message, MsgSeqNo) -> #delivery{mandatory = Mandatory, immediate = Immediate, txn = Txn, sender = self(), message = Message, msg_seq_no = MsgSeqNo}. -build_content(Properties, BodyBin) -> +build_content(Properties, BodyBin) when is_binary(BodyBin) -> + build_content(Properties, [BodyBin]); + +build_content(Properties, PFR) -> %% basic.publish hasn't changed so we can just hard-code amqp_0_9_1 {ClassId, _MethodId} = rabbit_framing_amqp_0_9_1:method_id('basic.publish'), @@ -85,7 +88,7 @@ build_content(Properties, BodyBin) -> properties = Properties, properties_bin = none, protocol = none, - payload_fragments_rev = [BodyBin]}. + payload_fragments_rev = PFR}. from_content(Content) -> #content{class_id = ClassId, @@ -126,9 +129,9 @@ message(ExchangeName, RoutingKey, {error, _Reason} = Error -> Error end. -message(ExchangeName, RoutingKey, RawProperties, BodyBin) -> +message(ExchangeName, RoutingKey, RawProperties, Body) -> Properties = properties(RawProperties), - Content = build_content(Properties, BodyBin), + Content = build_content(Properties, Body), {ok, Msg} = message(ExchangeName, RoutingKey, Content), Msg. @@ -153,18 +156,26 @@ indexof([_ | Rest], Element, N) -> indexof(Rest, Element, N + 1). %% Convenience function, for avoiding round-trips in calls across the %% erlang distributed network. -publish(ExchangeName, RoutingKeyBin, Properties, BodyBin) -> - publish(ExchangeName, RoutingKeyBin, false, false, none, Properties, - BodyBin). +publish(Exchange, RoutingKeyBin, Properties, Body) -> + publish(Exchange, RoutingKeyBin, false, false, none, Properties, + Body). %% Convenience function, for avoiding round-trips in calls across the %% erlang distributed network. -publish(ExchangeName, RoutingKeyBin, Mandatory, Immediate, Txn, Properties, - BodyBin) -> - publish(delivery(Mandatory, Immediate, Txn, - message(ExchangeName, RoutingKeyBin, - properties(Properties), BodyBin), - undefined)). +publish(X = #exchange{name = XName}, RKey, Mandatory, Immediate, Txn, + Props, Body) -> + publish(X, delivery(Mandatory, Immediate, Txn, + message(XName, RKey, properties(Props), Body), + undefined)); +publish(XName, RKey, Mandatory, Immediate, Txn, Props, Body) -> + case rabbit_exchange:lookup(XName) of + {ok, X} -> publish(X, RKey, Mandatory, Immediate, Txn, Props, Body); + Err -> Err + end. + +publish(X, Delivery) -> + {RoutingRes, DeliveredQPids} = rabbit_exchange:publish(X, Delivery), + {ok, RoutingRes, DeliveredQPids}. is_message_persistent(#content{properties = #'P_basic'{ delivery_mode = Mode}}) -> diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index f078886224..991b0b065c 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -23,7 +23,7 @@ -export([start_link/10, do/2, do/3, flush/1, shutdown/1]). -export([send_command/2, deliver/4, flushed/2, confirm/2]). -export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]). --export([emit_stats/1, ready_for_close/1]). +-export([refresh_config_all/0, emit_stats/1, ready_for_close/1]). -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2, handle_pre_hibernate/1, prioritise_call/3, @@ -35,7 +35,7 @@ user, virtual_host, most_recently_declared_queue, consumer_mapping, blocking, consumer_monitors, queue_collector_pid, stats_timer, confirm_enabled, publish_seqno, unconfirmed_mq, - unconfirmed_qm, confirmed, capabilities}). + unconfirmed_qm, confirmed, capabilities, trace_state}). -define(MAX_PERMISSION_CACHE_SIZE, 12). @@ -89,6 +89,7 @@ -spec(info/2 :: (pid(), rabbit_types:info_keys()) -> rabbit_types:infos()). -spec(info_all/0 :: () -> [rabbit_types:infos()]). -spec(info_all/1 :: (rabbit_types:info_keys()) -> [rabbit_types:infos()]). +-spec(refresh_config_all/0 :: () -> 'ok'). -spec(emit_stats/1 :: (pid()) -> 'ok'). -spec(ready_for_close/1 :: (pid()) -> 'ok'). @@ -146,6 +147,11 @@ info_all() -> info_all(Items) -> rabbit_misc:filter_exit_map(fun (C) -> info(C, Items) end, list()). +refresh_config_all() -> + rabbit_misc:upmap( + fun (C) -> gen_server2:call(C, refresh_config) end, list()), + ok. + emit_stats(Pid) -> gen_server2:cast(Pid, emit_stats). @@ -185,7 +191,8 @@ init([Channel, ReaderPid, WriterPid, ConnPid, Protocol, User, VHost, unconfirmed_mq = gb_trees:empty(), unconfirmed_qm = gb_trees:empty(), confirmed = [], - capabilities = Capabilities}, + capabilities = Capabilities, + trace_state = rabbit_trace:init(VHost)}, rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State)), rabbit_event:if_enabled(StatsTimer, fun() -> internal_emit_stats(State) end), @@ -218,6 +225,9 @@ handle_call({info, Items}, _From, State) -> catch Error -> reply({error, Error}, State) end; +handle_call(refresh_config, _From, State = #ch{virtual_host = VHost}) -> + reply(ok, State#ch{trace_state = rabbit_trace:init(VHost)}); + handle_call(_Request, _From, State) -> noreply(State). @@ -263,8 +273,9 @@ handle_cast({deliver, ConsumerTag, AckRequired, #basic_message{exchange_name = ExchangeName, routing_keys = [RoutingKey | _CcRoutes], content = Content}}}, - State = #ch{writer_pid = WriterPid, - next_tag = DeliveryTag}) -> + State = #ch{writer_pid = WriterPid, + next_tag = DeliveryTag, + trace_state = TraceState}) -> State1 = lock_message(AckRequired, ack_record(DeliveryTag, ConsumerTag, Msg), State), @@ -281,7 +292,7 @@ handle_cast({deliver, ConsumerTag, AckRequired, true -> deliver; false -> deliver_no_ack end, State), - rabbit_trace:tap_trace_out(Msg), + rabbit_trace:tap_trace_out(Msg, TraceState), noreply(State1#ch{next_tag = DeliveryTag + 1}); handle_cast(emit_stats, State = #ch{stats_timer = StatsTimer}) -> @@ -591,7 +602,8 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, immediate = Immediate}, Content, State = #ch{virtual_host = VHostPath, transaction_id = TxnKey, - confirm_enabled = ConfirmEnabled}) -> + confirm_enabled = ConfirmEnabled, + trace_state = TraceState}) -> ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), check_write_permitted(ExchangeName, State), Exchange = rabbit_exchange:lookup_or_die(ExchangeName), @@ -608,7 +620,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, end, case rabbit_basic:message(ExchangeName, RoutingKey, DecodedContent) of {ok, Message} -> - rabbit_trace:tap_trace_in(Message), + rabbit_trace:tap_trace_in(Message, TraceState), {RoutingRes, DeliveredQPids} = rabbit_exchange:publish( Exchange, @@ -656,9 +668,10 @@ handle_method(#'basic.ack'{delivery_tag = DeliveryTag, handle_method(#'basic.get'{queue = QueueNameBin, no_ack = NoAck}, - _, State = #ch{writer_pid = WriterPid, - conn_pid = ConnPid, - next_tag = DeliveryTag}) -> + _, State = #ch{writer_pid = WriterPid, + conn_pid = ConnPid, + next_tag = DeliveryTag, + trace_state = TraceState}) -> QueueName = expand_queue_name_shortcut(QueueNameBin, State), check_read_permitted(QueueName, State), case rabbit_amqqueue:with_exclusive_access_or_die( @@ -677,7 +690,7 @@ handle_method(#'basic.get'{queue = QueueNameBin, true -> get_no_ack; false -> get end, State), - rabbit_trace:tap_trace_out(Msg), + rabbit_trace:tap_trace_out(Msg, TraceState), ok = rabbit_writer:send_command( WriterPid, #'basic.get_ok'{delivery_tag = DeliveryTag, diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index b4b6255ebd..117496f561 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -221,9 +221,10 @@ action(delete_vhost, Node, Args = [_VHostPath], _Opts, Inform) -> Inform("Deleting vhost ~p", Args), call(Node, {rabbit_vhost, delete, Args}); -action(list_vhosts, Node, [], _Opts, Inform) -> +action(list_vhosts, Node, Args, _Opts, Inform) -> Inform("Listing vhosts", []), - display_list(call(Node, {rabbit_vhost, list, []})); + ArgAtoms = default_if_empty(Args, [name]), + display_info_list(call(Node, {rabbit_vhost, info_all, []}), ArgAtoms); action(list_user_permissions, Node, Args = [_Username], _Opts, Inform) -> Inform("Listing permissions for user ~p", Args), @@ -294,18 +295,15 @@ action(list_consumers, Node, _Args, Opts, Inform) -> Other -> Other end; -action(set_env, Node, [Var, Term], _Opts, Inform) -> - Inform("Setting control variable ~s for node ~p to ~s", [Var, Node, Term]), - rpc_call(Node, application, set_env, [rabbit, parse(Var), parse(Term)]); - -action(get_env, Node, [Var], _Opts, Inform) -> - Inform("Getting control variable ~s for node ~p", [Var, Node]), - Val = rpc_call(Node, application, get_env, [rabbit, parse(Var)]), - io:format("~p~n", [Val]); +action(trace_on, Node, [], Opts, Inform) -> + VHost = proplists:get_value(?VHOST_OPT, Opts), + Inform("Starting tracing for vhost ~p", [VHost]), + rpc_call(Node, rabbit_trace, start, [list_to_binary(VHost)]); -action(unset_env, Node, [Var], _Opts, Inform) -> - Inform("Clearing control variable ~s for node ~p", [Var, Node]), - rpc_call(Node, application, unset_env, [rabbit, parse(Var)]); +action(trace_off, Node, [], Opts, Inform) -> + VHost = proplists:get_value(?VHOST_OPT, Opts), + Inform("Stopping tracing for vhost ~p", [VHost]), + rpc_call(Node, rabbit_trace, stop, [list_to_binary(VHost)]); action(set_permissions, Node, [Username, CPerm, WPerm, RPerm], Opts, Inform) -> VHost = proplists:get_value(?VHOST_OPT, Opts), @@ -350,11 +348,6 @@ default_if_empty(List, Default) when is_list(List) -> true -> [list_to_atom(X) || X <- List] end. -parse(Str) -> - {ok, Tokens, _} = erl_scan:string(Str ++ "."), - {ok, Term} = erl_parse:parse_term(Tokens), - Term. - display_info_list(Results, InfoItemKeys) when is_list(Results) -> lists:foreach( fun (Result) -> display_row( diff --git a/src/rabbit_prelaunch.erl b/src/rabbit_prelaunch.erl index 2512a60280..92829e4918 100644 --- a/src/rabbit_prelaunch.erl +++ b/src/rabbit_prelaunch.erl @@ -69,9 +69,13 @@ start() -> %% Write it out to $RABBITMQ_PLUGINS_EXPAND_DIR/rabbit.rel rabbit_misc:write_file(RootName ++ ".rel", io_lib:format("~p.~n", [RDesc])), + %% We exclude mochiweb due to its optional use of fdsrv. + XRefExclude = [mochiweb], + %% Compile the script ScriptFile = RootName ++ ".script", - case systools:make_script(RootName, [local, silent, exref]) of + case systools:make_script(RootName, [local, silent, + {exref, AllApps -- XRefExclude}]) of {ok, Module, Warnings} -> %% This gets lots of spurious no-source warnings when we %% have .ez files, so we want to supress them to prevent @@ -93,7 +97,8 @@ start() -> end]), case length(WarningStr) of 0 -> ok; - _ -> io:format("~s", [WarningStr]) + _ -> S = string:copies("*", 80), + io:format("~n~s~n~s~s~n~n", [S, WarningStr, S]) end, ok; {error, Module, Error} -> diff --git a/src/rabbit_trace.erl b/src/rabbit_trace.erl index 2d15e7fc42..7d36856a9d 100644 --- a/src/rabbit_trace.erl +++ b/src/rabbit_trace.erl @@ -16,85 +16,104 @@ -module(rabbit_trace). --export([tap_trace_in/1, tap_trace_out/1]). +-export([init/1, tracing/1, tap_trace_in/2, tap_trace_out/2, start/1, stop/1]). -include("rabbit.hrl"). -include("rabbit_framing.hrl"). +-define(TRACE_VHOSTS, trace_vhosts). +-define(XNAME, <<"amq.rabbitmq.trace">>). + %%---------------------------------------------------------------------------- -ifdef(use_specs). --spec(tap_trace_in/1 :: (rabbit_types:basic_message()) -> 'ok'). --spec(tap_trace_out/1 :: (rabbit_amqqueue:qmsg()) -> 'ok'). +-type(state() :: rabbit_types:exchange() | 'none'). + +-spec(init/1 :: (rabbit_types:vhost()) -> state()). +-spec(tracing/1 :: (rabbit_types:vhost()) -> boolean()). +-spec(tap_trace_in/2 :: (rabbit_types:basic_message(), state()) -> 'ok'). +-spec(tap_trace_out/2 :: (rabbit_amqqueue:qmsg(), state()) -> 'ok'). + +-spec(start/1 :: (rabbit_types:vhost()) -> 'ok'). +-spec(stop/1 :: (rabbit_types:vhost()) -> 'ok'). -endif. %%---------------------------------------------------------------------------- -tap_trace_in(Msg) -> - maybe_trace(Msg, <<"publish">>, xname(Msg), []). +init(VHost) -> + case tracing(VHost) of + false -> none; + true -> {ok, X} = rabbit_exchange:lookup( + rabbit_misc:r(VHost, exchange, ?XNAME)), + X + end. + +tracing(VHost) -> + {ok, VHosts} = application:get_env(rabbit, ?TRACE_VHOSTS), + lists:member(VHost, VHosts). + +tap_trace_in(Msg = #basic_message{exchange_name = #resource{name = XName}}, + TraceX) -> + maybe_trace(TraceX, Msg, <<"publish">>, XName, []). -tap_trace_out({#resource{name = QName}, _QPid, _QMsgId, Redelivered, Msg}) -> +tap_trace_out({#resource{name = QName}, _QPid, _QMsgId, Redelivered, Msg}, + TraceX) -> RedeliveredNum = case Redelivered of true -> 1; false -> 0 end, - maybe_trace(Msg, <<"deliver">>, QName, + maybe_trace(TraceX, Msg, <<"deliver">>, QName, [{<<"redelivered">>, signedint, RedeliveredNum}]). -xname(#basic_message{exchange_name = #resource{name = XName}}) -> XName. -vhost(#basic_message{exchange_name = #resource{virtual_host = VHost}}) -> VHost. - -maybe_trace(Msg, RKPrefix, RKSuffix, Extra) -> - XName = xname(Msg), - case trace_exchange(vhost(Msg)) of - none -> ok; - XName -> ok; - TraceX -> case catch trace(TraceX, Msg, RKPrefix, RKSuffix, Extra) of - {'EXIT', R} -> rabbit_log:info("Trace died: ~p~n", [R]); - ok -> ok - end - end. +%%---------------------------------------------------------------------------- -trace_exchange(VHost) -> - case application:get_env(rabbit, trace_exchanges) of - undefined -> none; - {ok, Xs} -> proplists:get_value(VHost, Xs, none) - end. +start(VHost) -> + update_config(fun (VHosts) -> [VHost | VHosts -- [VHost]] end). + +stop(VHost) -> + update_config(fun (VHosts) -> VHosts -- [VHost] end). + +update_config(Fun) -> + {ok, VHosts0} = application:get_env(rabbit, ?TRACE_VHOSTS), + VHosts = Fun(VHosts0), + application:set_env(rabbit, ?TRACE_VHOSTS, VHosts), + rabbit_channel:refresh_config_all(), + ok. -trace(TraceX, Msg0, RKPrefix, RKSuffix, Extra) -> - Msg = ensure_content_decoded(Msg0), - rabbit_basic:publish(rabbit_misc:r(vhost(Msg), exchange, TraceX), - <<RKPrefix/binary, ".", RKSuffix/binary>>, - #'P_basic'{headers = msg_to_table(Msg) ++ Extra}, - payload(Msg)), +%%---------------------------------------------------------------------------- + +maybe_trace(none, _Msg, _RKPrefix, _RKSuffix, _Extra) -> + ok; +maybe_trace(#exchange{name = Name}, #basic_message{exchange_name = Name}, + _RKPrefix, _RKSuffix, _Extra) -> + ok; +maybe_trace(X, Msg = #basic_message{content = #content{ + payload_fragments_rev = PFR}}, + RKPrefix, RKSuffix, Extra) -> + {ok, _, _} = rabbit_basic:publish( + X, <<RKPrefix/binary, ".", RKSuffix/binary>>, + #'P_basic'{headers = msg_to_table(Msg) ++ Extra}, PFR), ok. msg_to_table(#basic_message{exchange_name = #resource{name = XName}, routing_keys = RoutingKeys, - content = #content{properties = Props}}) -> + content = Content}) -> + #content{properties = Props} = + rabbit_binary_parser:ensure_content_decoded(Content), {PropsTable, _Ix} = - lists:foldl( - fun (K, {L, Ix}) -> - V = element(Ix, Props), - NewL = case V of - undefined -> L; - _ -> [{a2b(K), type(V), V} | L] - end, - {NewL, Ix + 1} - end, {[], 2}, record_info(fields, 'P_basic')), + lists:foldl(fun (K, {L, Ix}) -> + V = element(Ix, Props), + NewL = case V of + undefined -> L; + _ -> [{a2b(K), type(V), V} | L] + end, + {NewL, Ix + 1} + end, {[], 2}, record_info(fields, 'P_basic')), [{<<"exchange_name">>, longstr, XName}, {<<"routing_keys">>, array, [{longstr, K} || K <- RoutingKeys]}, {<<"properties">>, table, PropsTable}, {<<"node">>, longstr, a2b(node())}]. -payload(#basic_message{content = #content{payload_fragments_rev = PFR}}) -> - list_to_binary(lists:reverse(PFR)). - -ensure_content_decoded(Msg = #basic_message{content = Content}) -> - Msg#basic_message{content = rabbit_binary_parser:ensure_content_decoded( - Content)}. - -a2b(A) -> - list_to_binary(atom_to_list(A)). +a2b(A) -> list_to_binary(atom_to_list(A)). type(V) when is_list(V) -> table; type(V) when is_integer(V) -> signedint; diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl index 325156b1f3..a6f02a0e0a 100644 --- a/src/rabbit_upgrade_functions.erl +++ b/src/rabbit_upgrade_functions.erl @@ -28,6 +28,7 @@ -rabbit_upgrade({topic_trie, mnesia, []}). -rabbit_upgrade({semi_durable_route, mnesia, []}). -rabbit_upgrade({exchange_event_serial, mnesia, []}). +-rabbit_upgrade({trace_exchanges, mnesia, []}). -rabbit_upgrade({mirror_pids, mnesia, []}). %% ------------------------------------------------------------------- @@ -42,6 +43,7 @@ -spec(topic_trie/0 :: () -> 'ok'). -spec(exchange_event_serial/0 :: () -> 'ok'). -spec(semi_durable_route/0 :: () -> 'ok'). +-spec(trace_exchanges/0 :: () -> 'ok'). -spec(mirror_pids/0 :: () -> 'ok'). -endif. @@ -115,6 +117,12 @@ exchange_event_serial() -> create(rabbit_exchange_serial, [{record_name, exchange_serial}, {attributes, [name, next]}]). +trace_exchanges() -> + [declare_exchange( + rabbit_misc:r(VHost, exchange, <<"amq.rabbitmq.trace">>), topic) || + VHost <- rabbit_vhost:list()], + ok. + mirror_pids() -> Tables = [rabbit_queue, rabbit_durable_queue], AddMirrorPidsFun = @@ -144,3 +152,16 @@ transform(TableName, Fun, FieldList, NewRecordName) -> create(Tab, TabDef) -> {atomic, ok} = mnesia:create_table(Tab, TabDef), ok. + +%% Dumb replacement for rabbit_exchange:declare that does not require +%% the exchange type registry or worker pool to be running by dint of +%% not validating anything and assuming the exchange type does not +%% require serialisation. +declare_exchange(XName, Type) -> + X = #exchange{name = XName, + type = Type, + durable = true, + auto_delete = false, + internal = false, + arguments = []}, + ok = mnesia:dirty_write(rabbit_durable_exchange, X). diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl index 24c130edd6..5270d80bd5 100644 --- a/src/rabbit_vhost.erl +++ b/src/rabbit_vhost.erl @@ -21,6 +21,7 @@ %%---------------------------------------------------------------------------- -export([add/1, delete/1, exists/1, list/0, with/2]). +-export([info/1, info/2, info_all/0, info_all/1]). -ifdef(use_specs). @@ -30,10 +31,18 @@ -spec(list/0 :: () -> [rabbit_types:vhost()]). -spec(with/2 :: (rabbit_types:vhost(), rabbit_misc:thunk(A)) -> A). +-spec(info/1 :: (rabbit_types:vhost()) -> rabbit_types:infos()). +-spec(info/2 :: (rabbit_types:vhost(), rabbit_types:info_keys()) + -> rabbit_types:infos()). +-spec(info_all/0 :: () -> [rabbit_types:infos()]). +-spec(info_all/1 :: (rabbit_types:info_keys()) -> [rabbit_types:infos()]). + -endif. %%---------------------------------------------------------------------------- +-define(INFO_KEYS, [name, tracing]). + add(VHostPath) -> R = rabbit_misc:execute_mnesia_transaction( fun () -> @@ -51,12 +60,13 @@ add(VHostPath) -> rabbit_misc:r(VHostPath, exchange, Name), Type, true, false, false, []) || {Name,Type} <- - [{<<"">>, direct}, - {<<"amq.direct">>, direct}, - {<<"amq.topic">>, topic}, - {<<"amq.match">>, headers}, %% per 0-9-1 pdf - {<<"amq.headers">>, headers}, %% per 0-9-1 xml - {<<"amq.fanout">>, fanout}]], + [{<<"">>, direct}, + {<<"amq.direct">>, direct}, + {<<"amq.topic">>, topic}, + {<<"amq.match">>, headers}, %% per 0-9-1 pdf + {<<"amq.headers">>, headers}, %% per 0-9-1 xml + {<<"amq.fanout">>, fanout}, + {<<"amq.rabbitmq.trace">>, topic}]], ok end), rabbit_log:info("Added vhost ~p~n", [VHostPath]), @@ -104,3 +114,17 @@ with(VHostPath, Thunk) -> Thunk() end end. + +%%---------------------------------------------------------------------------- + +infos(Items, X) -> [{Item, i(Item, X)} || Item <- Items]. + +i(name, VHost) -> VHost; +i(tracing, VHost) -> rabbit_trace:tracing(VHost); +i(Item, _) -> throw({bad_argument, Item}). + +info(VHost) -> infos(?INFO_KEYS, VHost). +info(VHost, Items) -> infos(Items, VHost). + +info_all() -> info_all(?INFO_KEYS). +info_all(Items) -> [info(VHost, Items) || VHost <- list()]. |
