summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2011-04-07 17:11:07 +0100
committerSimon MacMullen <simon@rabbitmq.com>2011-04-07 17:11:07 +0100
commit16c2d80ff006839075cf4ebe64b4d4677255a191 (patch)
tree5da1c7154810d9e567f65edc7cbb4e6d319a0d8d
parent17da180f15f55c5ef2f35ad117cfc584bc7188da (diff)
downloadrabbitmq-server-git-16c2d80ff006839075cf4ebe64b4d4677255a191.tar.gz
Tracing has very little to do with rabbit_log, let's make it its own module.
-rw-r--r--src/rabbit_channel.erl6
-rw-r--r--src/rabbit_log.erl99
-rw-r--r--src/rabbit_trace.erl126
3 files changed, 129 insertions, 102 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index e3dc47dc94..6ec2a09f87 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -280,7 +280,7 @@ handle_cast({deliver, ConsumerTag, AckRequired,
true -> deliver;
false -> deliver_no_ack
end, State),
- rabbit_log:tap_trace_out(Msg, DeliveryTag, ConsumerTag),
+ rabbit_trace:tap_trace_out(Msg, DeliveryTag, ConsumerTag),
noreply(State1#ch{next_tag = DeliveryTag + 1});
handle_cast(emit_stats, State = #ch{stats_timer = StatsTimer}) ->
@@ -604,7 +604,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
end,
case rabbit_basic:message(ExchangeName, RoutingKey, DecodedContent) of
{ok, Message} ->
- rabbit_log:tap_trace_in(Message),
+ rabbit_trace:tap_trace_in(Message),
{RoutingRes, DeliveredQPids} =
rabbit_exchange:publish(
Exchange,
@@ -673,7 +673,7 @@ handle_method(#'basic.get'{queue = QueueNameBin,
true -> get_no_ack;
false -> get
end, State),
- rabbit_log:tap_trace_out(Msg, DeliveryTag, none),
+ rabbit_trace:tap_trace_out(Msg, DeliveryTag, none),
ok = rabbit_writer:send_command(
WriterPid,
#'basic.get_ok'{delivery_tag = DeliveryTag,
diff --git a/src/rabbit_log.erl b/src/rabbit_log.erl
index 8cd980ea53..8207d6bc65 100644
--- a/src/rabbit_log.erl
+++ b/src/rabbit_log.erl
@@ -26,11 +26,6 @@
-export([debug/1, debug/2, message/4, info/1, info/2,
warning/1, warning/2, error/1, error/2]).
--export([tap_trace_in/1, tap_trace_out/3]).
-
--include("rabbit.hrl").
--include("rabbit_framing.hrl").
-
-define(SERVER, ?MODULE).
%%----------------------------------------------------------------------------
@@ -82,100 +77,6 @@ error(Fmt) ->
error(Fmt, Args) when is_list(Args) ->
gen_server:cast(?SERVER, {error, Fmt, Args}).
-tap_trace_in(Message = #basic_message{
- exchange_name = #resource{virtual_host = VHostBin,
- name = XNameBin}}) ->
- check_trace(
- XNameBin,
- VHostBin,
- fun (TraceExchangeBin) ->
- {EncodedMetadata, Payload} = message_to_table(Message),
- publish(TraceExchangeBin, VHostBin, <<"publish">>, XNameBin,
- EncodedMetadata, Payload)
- end).
-
-tap_trace_out({#resource{name = QNameBin}, _QPid, _QMsgId, Redelivered,
- Message = #basic_message{
- exchange_name = #resource{virtual_host = VHostBin,
- name = XNameBin}}},
- DeliveryTag,
- ConsumerTagOrNone) ->
- check_trace(
- XNameBin,
- VHostBin,
- fun (TraceExchangeBin) ->
- RedeliveredNum = case Redelivered of true -> 1; false -> 0 end,
- {EncodedMetadata, Payload} = message_to_table(Message),
- Fields0 = [{<<"delivery_tag">>, signedint, DeliveryTag},
- {<<"redelivered">>, signedint, RedeliveredNum}]
- ++ EncodedMetadata,
- Fields = case ConsumerTagOrNone of
- none -> Fields0;
- CTag -> [{<<"consumer_tag">>, longstr, CTag} |
- Fields0]
- end,
- publish(TraceExchangeBin, VHostBin, <<"deliver">>, QNameBin,
- Fields, Payload)
- end).
-
-check_trace(XNameBin, VHostBin, F) ->
- case catch case application:get_env(rabbit, {trace_exchange, VHostBin}) of
- undefined -> ok;
- {ok, XNameBin} -> ok;
- {ok, TraceExchangeBin} -> F(TraceExchangeBin)
- end of
- {'EXIT', Reason} -> info("Trace tap died with reason ~p~n", [Reason]);
- ok -> ok
- end.
-
-publish(TraceExchangeBin, VHostBin, RKPrefix, RKSuffix, Table, Payload) ->
- rabbit_basic:publish(rabbit_misc:r(VHostBin, exchange, TraceExchangeBin),
- <<RKPrefix/binary, ".", RKSuffix/binary>>,
- #'P_basic'{headers = Table}, Payload),
- ok.
-
-message_to_table(#basic_message{exchange_name = #resource{name = XName},
- routing_keys = RoutingKeys,
- content = Content}) ->
- #content{properties = #'P_basic'{content_type = ContentType,
- content_encoding = ContentEncoding,
- headers = Headers,
- delivery_mode = DeliveryMode,
- priority = Priority,
- correlation_id = CorrelationId,
- reply_to = ReplyTo,
- expiration = Expiration,
- message_id = MessageId,
- timestamp = Timestamp,
- type = Type,
- user_id = UserId,
- app_id = AppId},
- payload_fragments_rev = PFR} =
- rabbit_binary_parser:ensure_content_decoded(Content),
- Headers1 = prune_undefined(
- [{<<"content_type">>, longstr, ContentType},
- {<<"content_encoding">>, longstr, ContentEncoding},
- {<<"headers">>, table, Headers},
- {<<"delivery_mode">>, signedint, DeliveryMode},
- {<<"priority">>, signedint, Priority},
- {<<"correlation_id">>, longstr, CorrelationId},
- {<<"reply_to">>, longstr, ReplyTo},
- {<<"expiration">>, longstr, Expiration},
- {<<"message_id">>, longstr, MessageId},
- {<<"timestamp">>, longstr, Timestamp},
- {<<"type">>, longstr, Type},
- {<<"user_id">>, longstr, UserId},
- {<<"app_id">>, longstr, AppId}]),
- {[{<<"exchange_name">>, longstr, XName},
- {<<"routing_key">>, array, [{longstr, K} || K <- RoutingKeys]},
- {<<"headers">>, table, Headers1},
- {<<"node">>, longstr, list_to_binary(atom_to_list(node()))}],
- list_to_binary(lists:reverse(PFR))}.
-
-prune_undefined(Fields) ->
- [F || F = {_, _, Value} <- Fields,
- Value =/= undefined].
-
%%--------------------------------------------------------------------
init([]) -> {ok, none}.
diff --git a/src/rabbit_trace.erl b/src/rabbit_trace.erl
new file mode 100644
index 0000000000..8f531808d1
--- /dev/null
+++ b/src/rabbit_trace.erl
@@ -0,0 +1,126 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%%
+
+-module(rabbit_trace).
+
+-export([tap_trace_in/1, tap_trace_out/3]).
+
+-include("rabbit.hrl").
+-include("rabbit_framing.hrl").
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+%% TODO
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
+tap_trace_in(Message = #basic_message{
+ exchange_name = #resource{virtual_host = VHostBin,
+ name = XNameBin}}) ->
+ check_trace(
+ XNameBin,
+ VHostBin,
+ fun (TraceExchangeBin) ->
+ {EncodedMetadata, Payload} = message_to_table(Message),
+ publish(TraceExchangeBin, VHostBin, <<"publish">>, XNameBin,
+ EncodedMetadata, Payload)
+ end).
+
+tap_trace_out({#resource{name = QNameBin}, _QPid, _QMsgId, Redelivered,
+ Message = #basic_message{
+ exchange_name = #resource{virtual_host = VHostBin,
+ name = XNameBin}}},
+ DeliveryTag,
+ ConsumerTagOrNone) ->
+ check_trace(
+ XNameBin,
+ VHostBin,
+ fun (TraceExchangeBin) ->
+ RedeliveredNum = case Redelivered of true -> 1; false -> 0 end,
+ {EncodedMetadata, Payload} = message_to_table(Message),
+ Fields0 = [{<<"delivery_tag">>, signedint, DeliveryTag},
+ {<<"redelivered">>, signedint, RedeliveredNum}]
+ ++ EncodedMetadata,
+ Fields = case ConsumerTagOrNone of
+ none -> Fields0;
+ CTag -> [{<<"consumer_tag">>, longstr, CTag} |
+ Fields0]
+ end,
+ publish(TraceExchangeBin, VHostBin, <<"deliver">>, QNameBin,
+ Fields, Payload)
+ end).
+
+check_trace(XNameBin, VHostBin, F) ->
+ case catch case application:get_env(rabbit, {trace_exchange, VHostBin}) of
+ undefined -> ok;
+ {ok, XNameBin} -> ok;
+ {ok, TraceExchangeBin} -> F(TraceExchangeBin)
+ end of
+ {'EXIT', Reason} -> rabbit_log:info("Trace tap died: ~p~n", [Reason]);
+ ok -> ok
+ end.
+
+publish(TraceExchangeBin, VHostBin, RKPrefix, RKSuffix, Table, Payload) ->
+ rabbit_basic:publish(rabbit_misc:r(VHostBin, exchange, TraceExchangeBin),
+ <<RKPrefix/binary, ".", RKSuffix/binary>>,
+ #'P_basic'{headers = Table}, Payload),
+ ok.
+
+message_to_table(#basic_message{exchange_name = #resource{name = XName},
+ routing_keys = RoutingKeys,
+ content = Content}) ->
+ #content{properties = #'P_basic'{content_type = ContentType,
+ content_encoding = ContentEncoding,
+ headers = Headers,
+ delivery_mode = DeliveryMode,
+ priority = Priority,
+ correlation_id = CorrelationId,
+ reply_to = ReplyTo,
+ expiration = Expiration,
+ message_id = MessageId,
+ timestamp = Timestamp,
+ type = Type,
+ user_id = UserId,
+ app_id = AppId},
+ payload_fragments_rev = PFR} =
+ rabbit_binary_parser:ensure_content_decoded(Content),
+ Headers1 = prune_undefined(
+ [{<<"content_type">>, longstr, ContentType},
+ {<<"content_encoding">>, longstr, ContentEncoding},
+ {<<"headers">>, table, Headers},
+ {<<"delivery_mode">>, signedint, DeliveryMode},
+ {<<"priority">>, signedint, Priority},
+ {<<"correlation_id">>, longstr, CorrelationId},
+ {<<"reply_to">>, longstr, ReplyTo},
+ {<<"expiration">>, longstr, Expiration},
+ {<<"message_id">>, longstr, MessageId},
+ {<<"timestamp">>, longstr, Timestamp},
+ {<<"type">>, longstr, Type},
+ {<<"user_id">>, longstr, UserId},
+ {<<"app_id">>, longstr, AppId}]),
+ {[{<<"exchange_name">>, longstr, XName},
+ {<<"routing_key">>, array, [{longstr, K} || K <- RoutingKeys]},
+ {<<"headers">>, table, Headers1},
+ {<<"node">>, longstr, list_to_binary(atom_to_list(node()))}],
+ list_to_binary(lists:reverse(PFR))}.
+
+prune_undefined(Fields) ->
+ [F || F = {_, _, Value} <- Fields,
+ Value =/= undefined].