summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2011-04-11 12:41:32 +0100
committerSimon MacMullen <simon@rabbitmq.com>2011-04-11 12:41:32 +0100
commitaf4d7d10a6fdf82c7948dfe79d70434c76abeaa7 (patch)
treecc74d0b3c8e4049dea4f064e7870ce5b41d8af56 /src
parentd156a1183511156148facc36862c113ebeff014f (diff)
parentb7c41bc6f16def1015998bd2475336adda0b77c2 (diff)
downloadrabbitmq-server-git-af4d7d10a6fdf82c7948dfe79d70434c76abeaa7.tar.gz
Merge in default
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_channel.erl3
-rw-r--r--src/rabbit_control.erl18
-rw-r--r--src/rabbit_trace.erl126
3 files changed, 147 insertions, 0 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 0c12614cc6..bfd779eef7 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -281,6 +281,7 @@ handle_cast({deliver, ConsumerTag, AckRequired,
true -> deliver;
false -> deliver_no_ack
end, State),
+ rabbit_trace:tap_trace_out(Msg, ConsumerTag),
noreply(State1#ch{next_tag = DeliveryTag + 1});
handle_cast(emit_stats, State = #ch{stats_timer = StatsTimer}) ->
@@ -604,6 +605,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
end,
case rabbit_basic:message(ExchangeName, RoutingKey, DecodedContent) of
{ok, Message} ->
+ rabbit_trace:tap_trace_in(Message),
{RoutingRes, DeliveredQPids} =
rabbit_exchange:publish(
Exchange,
@@ -672,6 +674,7 @@ handle_method(#'basic.get'{queue = QueueNameBin,
true -> get_no_ack;
false -> get
end, State),
+ rabbit_trace:tap_trace_out(Msg, none),
ok = rabbit_writer:send_command(
WriterPid,
#'basic.get_ok'{delivery_tag = DeliveryTag,
diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl
index 1af91f4c3a..6ab0711117 100644
--- a/src/rabbit_control.erl
+++ b/src/rabbit_control.erl
@@ -282,6 +282,19 @@ action(list_consumers, Node, _Args, Opts, Inform) ->
Other -> Other
end;
+action(set_env, Node, [Var, Term], _Opts, Inform) ->
+ Inform("Setting control variable ~s for node ~p to ~s", [Var, Node, Term]),
+ rpc_call(Node, application, set_env, [rabbit, parse(Var), parse(Term)]);
+
+action(get_env, Node, [Var], _Opts, Inform) ->
+ Inform("Getting control variable ~s for node ~p", [Var, Node]),
+ Val = rpc_call(Node, application, get_env, [rabbit, parse(Var)]),
+ io:format("~p~n", [Val]);
+
+action(unset_env, Node, [Var], _Opts, Inform) ->
+ Inform("Clearing control variable ~s for node ~p", [Var, Node]),
+ rpc_call(Node, application, unset_env, [rabbit, parse(Var)]);
+
action(set_permissions, Node, [Username, CPerm, WPerm, RPerm], Opts, Inform) ->
VHost = proplists:get_value(?VHOST_OPT, Opts),
Inform("Setting permissions for user ~p in vhost ~p", [Username, VHost]),
@@ -325,6 +338,11 @@ default_if_empty(List, Default) when is_list(List) ->
true -> [list_to_atom(X) || X <- List]
end.
+parse(Str) ->
+ {ok, Tokens, _} = erl_scan:string(Str ++ "."),
+ {ok, Term} = erl_parse:parse_term(Tokens),
+ Term.
+
display_info_list(Results, InfoItemKeys) when is_list(Results) ->
lists:foreach(
fun (Result) -> display_row(
diff --git a/src/rabbit_trace.erl b/src/rabbit_trace.erl
new file mode 100644
index 0000000000..6163d14a44
--- /dev/null
+++ b/src/rabbit_trace.erl
@@ -0,0 +1,126 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%%
+
+-module(rabbit_trace).
+
+-export([tap_trace_in/1, tap_trace_out/2]).
+
+-include("rabbit.hrl").
+-include("rabbit_framing.hrl").
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-spec(tap_trace_in/1 :: (rabbit_types:basic_message()) -> 'ok').
+-spec(tap_trace_out/2 :: (rabbit_amqqueue:qmsg(),
+ rabbit_types:maybe(rabbit_types:ctag())) -> 'ok').
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
+tap_trace_in(Message = #basic_message{
+ exchange_name = #resource{virtual_host = VHostBin,
+ name = XNameBin}}) ->
+ check_trace(
+ XNameBin,
+ VHostBin,
+ fun (TraceExchangeBin) ->
+ {EncodedMetadata, Payload} = message_to_table(Message),
+ publish(TraceExchangeBin, VHostBin, <<"publish">>, XNameBin,
+ EncodedMetadata, Payload)
+ end).
+
+tap_trace_out({#resource{name = QNameBin}, _QPid, _QMsgId, Redelivered,
+ Message = #basic_message{
+ exchange_name = #resource{virtual_host = VHostBin,
+ name = XNameBin}}},
+ ConsumerTagOrNone) ->
+ check_trace(
+ XNameBin,
+ VHostBin,
+ fun (TraceExchangeBin) ->
+ RedeliveredNum = case Redelivered of true -> 1; false -> 0 end,
+ {EncodedMetadata, Payload} = message_to_table(Message),
+ Fields0 = [{<<"redelivered">>, signedint, RedeliveredNum}]
+ ++ EncodedMetadata,
+ Fields = case ConsumerTagOrNone of
+ none -> Fields0;
+ CTag -> [{<<"consumer_tag">>, longstr, CTag} |
+ Fields0]
+ end,
+ publish(TraceExchangeBin, VHostBin, <<"deliver">>, QNameBin,
+ Fields, Payload)
+ end).
+
+check_trace(XNameBin, VHostBin, F) ->
+ case catch case application:get_env(rabbit, {trace_exchange, VHostBin}) of
+ undefined -> ok;
+ {ok, XNameBin} -> ok;
+ {ok, TraceExchangeBin} -> F(TraceExchangeBin)
+ end of
+ {'EXIT', Reason} -> rabbit_log:info("Trace tap died: ~p~n", [Reason]);
+ ok -> ok
+ end.
+
+publish(TraceExchangeBin, VHostBin, RKPrefix, RKSuffix, Table, Payload) ->
+ rabbit_basic:publish(rabbit_misc:r(VHostBin, exchange, TraceExchangeBin),
+ <<RKPrefix/binary, ".", RKSuffix/binary>>,
+ #'P_basic'{headers = Table}, Payload),
+ ok.
+
+message_to_table(#basic_message{exchange_name = #resource{name = XName},
+ routing_keys = RoutingKeys,
+ content = Content}) ->
+ #content{properties = #'P_basic'{content_type = ContentType,
+ content_encoding = ContentEncoding,
+ headers = Headers,
+ delivery_mode = DeliveryMode,
+ priority = Priority,
+ correlation_id = CorrelationId,
+ reply_to = ReplyTo,
+ expiration = Expiration,
+ message_id = MessageId,
+ timestamp = Timestamp,
+ type = Type,
+ user_id = UserId,
+ app_id = AppId},
+ payload_fragments_rev = PFR} =
+ rabbit_binary_parser:ensure_content_decoded(Content),
+ Headers1 = prune_undefined(
+ [{<<"content_type">>, longstr, ContentType},
+ {<<"content_encoding">>, longstr, ContentEncoding},
+ {<<"headers">>, table, Headers},
+ {<<"delivery_mode">>, signedint, DeliveryMode},
+ {<<"priority">>, signedint, Priority},
+ {<<"correlation_id">>, longstr, CorrelationId},
+ {<<"reply_to">>, longstr, ReplyTo},
+ {<<"expiration">>, longstr, Expiration},
+ {<<"message_id">>, longstr, MessageId},
+ {<<"timestamp">>, longstr, Timestamp},
+ {<<"type">>, longstr, Type},
+ {<<"user_id">>, longstr, UserId},
+ {<<"app_id">>, longstr, AppId}]),
+ {[{<<"exchange_name">>, longstr, XName},
+ {<<"routing_keys">>, array, [{longstr, K} || K <- RoutingKeys]},
+ {<<"headers">>, table, Headers1},
+ {<<"node">>, longstr, list_to_binary(atom_to_list(node()))}],
+ list_to_binary(lists:reverse(PFR))}.
+
+prune_undefined(Fields) ->
+ [F || F = {_, _, Value} <- Fields,
+ Value =/= undefined].