summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_log.erl103
1 files changed, 78 insertions, 25 deletions
diff --git a/src/rabbit_log.erl b/src/rabbit_log.erl
index 5d1a8f60ab..aa22ffe3b1 100644
--- a/src/rabbit_log.erl
+++ b/src/rabbit_log.erl
@@ -47,6 +47,7 @@
-import(error_logger).
-include("rabbit.hrl").
+-include("rabbit_framing.hrl").
-define(SERVER, ?MODULE).
@@ -101,38 +102,51 @@ error(Fmt, Args) when is_list(Args) ->
tap_trace_in(Message = #basic_message{exchange_name = XName},
QPids) ->
- case application:get_env(trace_exchange) of
- undefined ->
- ok;
- {ok, TraceExchangeBin} ->
- QInfos = [rabbit_amqqueue:info(#amqqueue{pid = P}, [name]) || P <- QPids],
- QNames = [N || [{name, #resource{name = N}}] <- QInfos],
- maybe_inject(TraceExchangeBin,
- XName,
- <<"publish">>,
- XName,
- [{queue_names, QNames},
- {message, Message}])
- end.
+ check_trace(fun (TraceExchangeBin) ->
+ QInfos = [rabbit_amqqueue:info(#amqqueue{pid = P}, [name]) || P <- QPids],
+ QNames = [N || [{name, #resource{name = N}}] <- QInfos],
+ maybe_inject(TraceExchangeBin,
+ XName,
+ <<"publish">>,
+ XName,
+ [{<<"queue_names">>,
+ longstr,
+ list_to_binary(rabbit_misc:intersperse(",", QNames))},
+ {<<"message">>,
+ table,
+ message_to_table(Message)}])
+ end).
tap_trace_out(Message = #basic_message{exchange_name = XName},
QName) ->
- case application:get_env(trace_exchange) of
- undefined ->
- ok;
- {ok, TraceExchangeBin} ->
- maybe_inject(TraceExchangeBin,
- XName,
- <<"deliver">>,
- QName,
- [{message, Message}])
+ check_trace(fun (TraceExchangeBin) ->
+ maybe_inject(TraceExchangeBin,
+ XName,
+ <<"deliver">>,
+ QName,
+ [{<<"message">>,
+ table,
+ message_to_table(Message)}])
+ end).
+
+check_trace(F) ->
+ case catch case application:get_env(trace_exchange) of
+ undefined ->
+ ok;
+ {ok, TraceExchangeBin} ->
+ F(TraceExchangeBin)
+ end of
+ {'EXIT', Reason} ->
+ info("Trace tap died with reason ~p~n", [Reason]);
+ ok ->
+ ok
end.
maybe_inject(TraceExchangeBin,
#resource{virtual_host = VHostBin, name = OriginalExchangeBin},
RKPrefix,
#resource{name = RKSuffix},
- Term) ->
+ Table) ->
if
TraceExchangeBin =:= OriginalExchangeBin ->
ok;
@@ -142,11 +156,50 @@ maybe_inject(TraceExchangeBin,
false,
rabbit_misc:r(VHostBin, exchange, TraceExchangeBin),
<<RKPrefix/binary, ".", RKSuffix/binary>>,
- <<"text/plain">>,
- list_to_binary(io_lib:format("~p", [Term]))),
+ <<"application/x-amqp-table; version=0-8">>,
+ rabbit_binary_generator:generate_table(Table)),
ok
end.
+message_to_table(#basic_message{exchange_name = #resource{name = XName},
+ routing_key = RoutingKey,
+ content = Content}) ->
+ #content{properties = Props,
+ payload_fragments_rev = PFR} = rabbit_binary_parser:ensure_content_decoded(Content),
+ #'P_basic'{content_type = ContentType,
+ content_encoding = ContentEncoding,
+ headers = Headers,
+ delivery_mode = DeliveryMode,
+ priority = Priority,
+ correlation_id = CorrelationId,
+ reply_to = ReplyTo,
+ expiration = Expiration,
+ message_id = MessageId,
+ timestamp = Timestamp,
+ type = Type,
+ user_id = UserId,
+ app_id = AppId} = Props,
+ [{<<"exchange_name">>, longstr, XName},
+ {<<"routing_key">>, longstr, RoutingKey},
+ {<<"headers">>, table, prune_undefined([{<<"content_type">>, longstr, ContentType},
+ {<<"content_encoding">>, longstr, ContentEncoding},
+ {<<"headers">>, table, Headers},
+ {<<"delivery_mode">>, signedint, DeliveryMode},
+ {<<"priority">>, signedint, Priority},
+ {<<"correlation_id">>, longstr, CorrelationId},
+ {<<"reply_to">>, longstr, ReplyTo},
+ {<<"expiration">>, longstr, Expiration},
+ {<<"message_id">>, longstr, MessageId},
+ {<<"timestamp">>, longstr, Timestamp},
+ {<<"type">>, longstr, Type},
+ {<<"user_id">>, longstr, UserId},
+ {<<"app_id">>, longstr, AppId}])},
+ {<<"body">>, binary, list_to_binary(lists:reverse(PFR))}].
+
+prune_undefined(Fields) ->
+ [F || F = {_, _, Value} <- Fields,
+ Value =/= undefined].
+
%%--------------------------------------------------------------------
init([]) -> {ok, none}.