diff options
| author | Simon MacMullen <simon@rabbitmq.com> | 2011-04-07 17:11:07 +0100 |
|---|---|---|
| committer | Simon MacMullen <simon@rabbitmq.com> | 2011-04-07 17:11:07 +0100 |
| commit | 16c2d80ff006839075cf4ebe64b4d4677255a191 (patch) | |
| tree | 5da1c7154810d9e567f65edc7cbb4e6d319a0d8d | |
| parent | 17da180f15f55c5ef2f35ad117cfc584bc7188da (diff) | |
| download | rabbitmq-server-git-16c2d80ff006839075cf4ebe64b4d4677255a191.tar.gz | |
Tracing has very little to do with rabbit_log, let's make it its own module.
| -rw-r--r-- | src/rabbit_channel.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_log.erl | 99 | ||||
| -rw-r--r-- | src/rabbit_trace.erl | 126 |
3 files changed, 129 insertions, 102 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index e3dc47dc94..6ec2a09f87 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -280,7 +280,7 @@ handle_cast({deliver, ConsumerTag, AckRequired, true -> deliver; false -> deliver_no_ack end, State), - rabbit_log:tap_trace_out(Msg, DeliveryTag, ConsumerTag), + rabbit_trace:tap_trace_out(Msg, DeliveryTag, ConsumerTag), noreply(State1#ch{next_tag = DeliveryTag + 1}); handle_cast(emit_stats, State = #ch{stats_timer = StatsTimer}) -> @@ -604,7 +604,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, end, case rabbit_basic:message(ExchangeName, RoutingKey, DecodedContent) of {ok, Message} -> - rabbit_log:tap_trace_in(Message), + rabbit_trace:tap_trace_in(Message), {RoutingRes, DeliveredQPids} = rabbit_exchange:publish( Exchange, @@ -673,7 +673,7 @@ handle_method(#'basic.get'{queue = QueueNameBin, true -> get_no_ack; false -> get end, State), - rabbit_log:tap_trace_out(Msg, DeliveryTag, none), + rabbit_trace:tap_trace_out(Msg, DeliveryTag, none), ok = rabbit_writer:send_command( WriterPid, #'basic.get_ok'{delivery_tag = DeliveryTag, diff --git a/src/rabbit_log.erl b/src/rabbit_log.erl index 8cd980ea53..8207d6bc65 100644 --- a/src/rabbit_log.erl +++ b/src/rabbit_log.erl @@ -26,11 +26,6 @@ -export([debug/1, debug/2, message/4, info/1, info/2, warning/1, warning/2, error/1, error/2]). --export([tap_trace_in/1, tap_trace_out/3]). - --include("rabbit.hrl"). --include("rabbit_framing.hrl"). - -define(SERVER, ?MODULE). %%---------------------------------------------------------------------------- @@ -82,100 +77,6 @@ error(Fmt) -> error(Fmt, Args) when is_list(Args) -> gen_server:cast(?SERVER, {error, Fmt, Args}). -tap_trace_in(Message = #basic_message{ - exchange_name = #resource{virtual_host = VHostBin, - name = XNameBin}}) -> - check_trace( - XNameBin, - VHostBin, - fun (TraceExchangeBin) -> - {EncodedMetadata, Payload} = message_to_table(Message), - publish(TraceExchangeBin, VHostBin, <<"publish">>, XNameBin, - EncodedMetadata, Payload) - end). - -tap_trace_out({#resource{name = QNameBin}, _QPid, _QMsgId, Redelivered, - Message = #basic_message{ - exchange_name = #resource{virtual_host = VHostBin, - name = XNameBin}}}, - DeliveryTag, - ConsumerTagOrNone) -> - check_trace( - XNameBin, - VHostBin, - fun (TraceExchangeBin) -> - RedeliveredNum = case Redelivered of true -> 1; false -> 0 end, - {EncodedMetadata, Payload} = message_to_table(Message), - Fields0 = [{<<"delivery_tag">>, signedint, DeliveryTag}, - {<<"redelivered">>, signedint, RedeliveredNum}] - ++ EncodedMetadata, - Fields = case ConsumerTagOrNone of - none -> Fields0; - CTag -> [{<<"consumer_tag">>, longstr, CTag} | - Fields0] - end, - publish(TraceExchangeBin, VHostBin, <<"deliver">>, QNameBin, - Fields, Payload) - end). - -check_trace(XNameBin, VHostBin, F) -> - case catch case application:get_env(rabbit, {trace_exchange, VHostBin}) of - undefined -> ok; - {ok, XNameBin} -> ok; - {ok, TraceExchangeBin} -> F(TraceExchangeBin) - end of - {'EXIT', Reason} -> info("Trace tap died with reason ~p~n", [Reason]); - ok -> ok - end. - -publish(TraceExchangeBin, VHostBin, RKPrefix, RKSuffix, Table, Payload) -> - rabbit_basic:publish(rabbit_misc:r(VHostBin, exchange, TraceExchangeBin), - <<RKPrefix/binary, ".", RKSuffix/binary>>, - #'P_basic'{headers = Table}, Payload), - ok. - -message_to_table(#basic_message{exchange_name = #resource{name = XName}, - routing_keys = RoutingKeys, - content = Content}) -> - #content{properties = #'P_basic'{content_type = ContentType, - content_encoding = ContentEncoding, - headers = Headers, - delivery_mode = DeliveryMode, - priority = Priority, - correlation_id = CorrelationId, - reply_to = ReplyTo, - expiration = Expiration, - message_id = MessageId, - timestamp = Timestamp, - type = Type, - user_id = UserId, - app_id = AppId}, - payload_fragments_rev = PFR} = - rabbit_binary_parser:ensure_content_decoded(Content), - Headers1 = prune_undefined( - [{<<"content_type">>, longstr, ContentType}, - {<<"content_encoding">>, longstr, ContentEncoding}, - {<<"headers">>, table, Headers}, - {<<"delivery_mode">>, signedint, DeliveryMode}, - {<<"priority">>, signedint, Priority}, - {<<"correlation_id">>, longstr, CorrelationId}, - {<<"reply_to">>, longstr, ReplyTo}, - {<<"expiration">>, longstr, Expiration}, - {<<"message_id">>, longstr, MessageId}, - {<<"timestamp">>, longstr, Timestamp}, - {<<"type">>, longstr, Type}, - {<<"user_id">>, longstr, UserId}, - {<<"app_id">>, longstr, AppId}]), - {[{<<"exchange_name">>, longstr, XName}, - {<<"routing_key">>, array, [{longstr, K} || K <- RoutingKeys]}, - {<<"headers">>, table, Headers1}, - {<<"node">>, longstr, list_to_binary(atom_to_list(node()))}], - list_to_binary(lists:reverse(PFR))}. - -prune_undefined(Fields) -> - [F || F = {_, _, Value} <- Fields, - Value =/= undefined]. - %%-------------------------------------------------------------------- init([]) -> {ok, none}. diff --git a/src/rabbit_trace.erl b/src/rabbit_trace.erl new file mode 100644 index 0000000000..8f531808d1 --- /dev/null +++ b/src/rabbit_trace.erl @@ -0,0 +1,126 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is VMware, Inc. +%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% + +-module(rabbit_trace). + +-export([tap_trace_in/1, tap_trace_out/3]). + +-include("rabbit.hrl"). +-include("rabbit_framing.hrl"). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +%% TODO + +-endif. + +%%---------------------------------------------------------------------------- + +tap_trace_in(Message = #basic_message{ + exchange_name = #resource{virtual_host = VHostBin, + name = XNameBin}}) -> + check_trace( + XNameBin, + VHostBin, + fun (TraceExchangeBin) -> + {EncodedMetadata, Payload} = message_to_table(Message), + publish(TraceExchangeBin, VHostBin, <<"publish">>, XNameBin, + EncodedMetadata, Payload) + end). + +tap_trace_out({#resource{name = QNameBin}, _QPid, _QMsgId, Redelivered, + Message = #basic_message{ + exchange_name = #resource{virtual_host = VHostBin, + name = XNameBin}}}, + DeliveryTag, + ConsumerTagOrNone) -> + check_trace( + XNameBin, + VHostBin, + fun (TraceExchangeBin) -> + RedeliveredNum = case Redelivered of true -> 1; false -> 0 end, + {EncodedMetadata, Payload} = message_to_table(Message), + Fields0 = [{<<"delivery_tag">>, signedint, DeliveryTag}, + {<<"redelivered">>, signedint, RedeliveredNum}] + ++ EncodedMetadata, + Fields = case ConsumerTagOrNone of + none -> Fields0; + CTag -> [{<<"consumer_tag">>, longstr, CTag} | + Fields0] + end, + publish(TraceExchangeBin, VHostBin, <<"deliver">>, QNameBin, + Fields, Payload) + end). + +check_trace(XNameBin, VHostBin, F) -> + case catch case application:get_env(rabbit, {trace_exchange, VHostBin}) of + undefined -> ok; + {ok, XNameBin} -> ok; + {ok, TraceExchangeBin} -> F(TraceExchangeBin) + end of + {'EXIT', Reason} -> rabbit_log:info("Trace tap died: ~p~n", [Reason]); + ok -> ok + end. + +publish(TraceExchangeBin, VHostBin, RKPrefix, RKSuffix, Table, Payload) -> + rabbit_basic:publish(rabbit_misc:r(VHostBin, exchange, TraceExchangeBin), + <<RKPrefix/binary, ".", RKSuffix/binary>>, + #'P_basic'{headers = Table}, Payload), + ok. + +message_to_table(#basic_message{exchange_name = #resource{name = XName}, + routing_keys = RoutingKeys, + content = Content}) -> + #content{properties = #'P_basic'{content_type = ContentType, + content_encoding = ContentEncoding, + headers = Headers, + delivery_mode = DeliveryMode, + priority = Priority, + correlation_id = CorrelationId, + reply_to = ReplyTo, + expiration = Expiration, + message_id = MessageId, + timestamp = Timestamp, + type = Type, + user_id = UserId, + app_id = AppId}, + payload_fragments_rev = PFR} = + rabbit_binary_parser:ensure_content_decoded(Content), + Headers1 = prune_undefined( + [{<<"content_type">>, longstr, ContentType}, + {<<"content_encoding">>, longstr, ContentEncoding}, + {<<"headers">>, table, Headers}, + {<<"delivery_mode">>, signedint, DeliveryMode}, + {<<"priority">>, signedint, Priority}, + {<<"correlation_id">>, longstr, CorrelationId}, + {<<"reply_to">>, longstr, ReplyTo}, + {<<"expiration">>, longstr, Expiration}, + {<<"message_id">>, longstr, MessageId}, + {<<"timestamp">>, longstr, Timestamp}, + {<<"type">>, longstr, Type}, + {<<"user_id">>, longstr, UserId}, + {<<"app_id">>, longstr, AppId}]), + {[{<<"exchange_name">>, longstr, XName}, + {<<"routing_key">>, array, [{longstr, K} || K <- RoutingKeys]}, + {<<"headers">>, table, Headers1}, + {<<"node">>, longstr, list_to_binary(atom_to_list(node()))}], + list_to_binary(lists:reverse(PFR))}. + +prune_undefined(Fields) -> + [F || F = {_, _, Value} <- Fields, + Value =/= undefined]. |
