diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_channel.erl | 3 | ||||
| -rw-r--r-- | src/rabbit_control.erl | 18 | ||||
| -rw-r--r-- | src/rabbit_trace.erl | 126 |
3 files changed, 147 insertions, 0 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 0c12614cc6..bfd779eef7 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -281,6 +281,7 @@ handle_cast({deliver, ConsumerTag, AckRequired, true -> deliver; false -> deliver_no_ack end, State), + rabbit_trace:tap_trace_out(Msg, ConsumerTag), noreply(State1#ch{next_tag = DeliveryTag + 1}); handle_cast(emit_stats, State = #ch{stats_timer = StatsTimer}) -> @@ -604,6 +605,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, end, case rabbit_basic:message(ExchangeName, RoutingKey, DecodedContent) of {ok, Message} -> + rabbit_trace:tap_trace_in(Message), {RoutingRes, DeliveredQPids} = rabbit_exchange:publish( Exchange, @@ -672,6 +674,7 @@ handle_method(#'basic.get'{queue = QueueNameBin, true -> get_no_ack; false -> get end, State), + rabbit_trace:tap_trace_out(Msg, none), ok = rabbit_writer:send_command( WriterPid, #'basic.get_ok'{delivery_tag = DeliveryTag, diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index 1af91f4c3a..6ab0711117 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -282,6 +282,19 @@ action(list_consumers, Node, _Args, Opts, Inform) -> Other -> Other end; +action(set_env, Node, [Var, Term], _Opts, Inform) -> + Inform("Setting control variable ~s for node ~p to ~s", [Var, Node, Term]), + rpc_call(Node, application, set_env, [rabbit, parse(Var), parse(Term)]); + +action(get_env, Node, [Var], _Opts, Inform) -> + Inform("Getting control variable ~s for node ~p", [Var, Node]), + Val = rpc_call(Node, application, get_env, [rabbit, parse(Var)]), + io:format("~p~n", [Val]); + +action(unset_env, Node, [Var], _Opts, Inform) -> + Inform("Clearing control variable ~s for node ~p", [Var, Node]), + rpc_call(Node, application, unset_env, [rabbit, parse(Var)]); + action(set_permissions, Node, [Username, CPerm, WPerm, RPerm], Opts, Inform) -> VHost = proplists:get_value(?VHOST_OPT, Opts), Inform("Setting permissions for user ~p in vhost ~p", [Username, VHost]), @@ -325,6 +338,11 @@ default_if_empty(List, Default) when is_list(List) -> true -> [list_to_atom(X) || X <- List] end. +parse(Str) -> + {ok, Tokens, _} = erl_scan:string(Str ++ "."), + {ok, Term} = erl_parse:parse_term(Tokens), + Term. + display_info_list(Results, InfoItemKeys) when is_list(Results) -> lists:foreach( fun (Result) -> display_row( diff --git a/src/rabbit_trace.erl b/src/rabbit_trace.erl new file mode 100644 index 0000000000..6163d14a44 --- /dev/null +++ b/src/rabbit_trace.erl @@ -0,0 +1,126 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is VMware, Inc. +%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% + +-module(rabbit_trace). + +-export([tap_trace_in/1, tap_trace_out/2]). + +-include("rabbit.hrl"). +-include("rabbit_framing.hrl"). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec(tap_trace_in/1 :: (rabbit_types:basic_message()) -> 'ok'). +-spec(tap_trace_out/2 :: (rabbit_amqqueue:qmsg(), + rabbit_types:maybe(rabbit_types:ctag())) -> 'ok'). + +-endif. + +%%---------------------------------------------------------------------------- + +tap_trace_in(Message = #basic_message{ + exchange_name = #resource{virtual_host = VHostBin, + name = XNameBin}}) -> + check_trace( + XNameBin, + VHostBin, + fun (TraceExchangeBin) -> + {EncodedMetadata, Payload} = message_to_table(Message), + publish(TraceExchangeBin, VHostBin, <<"publish">>, XNameBin, + EncodedMetadata, Payload) + end). + +tap_trace_out({#resource{name = QNameBin}, _QPid, _QMsgId, Redelivered, + Message = #basic_message{ + exchange_name = #resource{virtual_host = VHostBin, + name = XNameBin}}}, + ConsumerTagOrNone) -> + check_trace( + XNameBin, + VHostBin, + fun (TraceExchangeBin) -> + RedeliveredNum = case Redelivered of true -> 1; false -> 0 end, + {EncodedMetadata, Payload} = message_to_table(Message), + Fields0 = [{<<"redelivered">>, signedint, RedeliveredNum}] + ++ EncodedMetadata, + Fields = case ConsumerTagOrNone of + none -> Fields0; + CTag -> [{<<"consumer_tag">>, longstr, CTag} | + Fields0] + end, + publish(TraceExchangeBin, VHostBin, <<"deliver">>, QNameBin, + Fields, Payload) + end). + +check_trace(XNameBin, VHostBin, F) -> + case catch case application:get_env(rabbit, {trace_exchange, VHostBin}) of + undefined -> ok; + {ok, XNameBin} -> ok; + {ok, TraceExchangeBin} -> F(TraceExchangeBin) + end of + {'EXIT', Reason} -> rabbit_log:info("Trace tap died: ~p~n", [Reason]); + ok -> ok + end. + +publish(TraceExchangeBin, VHostBin, RKPrefix, RKSuffix, Table, Payload) -> + rabbit_basic:publish(rabbit_misc:r(VHostBin, exchange, TraceExchangeBin), + <<RKPrefix/binary, ".", RKSuffix/binary>>, + #'P_basic'{headers = Table}, Payload), + ok. + +message_to_table(#basic_message{exchange_name = #resource{name = XName}, + routing_keys = RoutingKeys, + content = Content}) -> + #content{properties = #'P_basic'{content_type = ContentType, + content_encoding = ContentEncoding, + headers = Headers, + delivery_mode = DeliveryMode, + priority = Priority, + correlation_id = CorrelationId, + reply_to = ReplyTo, + expiration = Expiration, + message_id = MessageId, + timestamp = Timestamp, + type = Type, + user_id = UserId, + app_id = AppId}, + payload_fragments_rev = PFR} = + rabbit_binary_parser:ensure_content_decoded(Content), + Headers1 = prune_undefined( + [{<<"content_type">>, longstr, ContentType}, + {<<"content_encoding">>, longstr, ContentEncoding}, + {<<"headers">>, table, Headers}, + {<<"delivery_mode">>, signedint, DeliveryMode}, + {<<"priority">>, signedint, Priority}, + {<<"correlation_id">>, longstr, CorrelationId}, + {<<"reply_to">>, longstr, ReplyTo}, + {<<"expiration">>, longstr, Expiration}, + {<<"message_id">>, longstr, MessageId}, + {<<"timestamp">>, longstr, Timestamp}, + {<<"type">>, longstr, Type}, + {<<"user_id">>, longstr, UserId}, + {<<"app_id">>, longstr, AppId}]), + {[{<<"exchange_name">>, longstr, XName}, + {<<"routing_keys">>, array, [{longstr, K} || K <- RoutingKeys]}, + {<<"headers">>, table, Headers1}, + {<<"node">>, longstr, list_to_binary(atom_to_list(node()))}], + list_to_binary(lists:reverse(PFR))}. + +prune_undefined(Fields) -> + [F || F = {_, _, Value} <- Fields, + Value =/= undefined]. |
