diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_log.erl | 103 |
1 files changed, 78 insertions, 25 deletions
diff --git a/src/rabbit_log.erl b/src/rabbit_log.erl index 5d1a8f60ab..aa22ffe3b1 100644 --- a/src/rabbit_log.erl +++ b/src/rabbit_log.erl @@ -47,6 +47,7 @@ -import(error_logger). -include("rabbit.hrl"). +-include("rabbit_framing.hrl"). -define(SERVER, ?MODULE). @@ -101,38 +102,51 @@ error(Fmt, Args) when is_list(Args) -> tap_trace_in(Message = #basic_message{exchange_name = XName}, QPids) -> - case application:get_env(trace_exchange) of - undefined -> - ok; - {ok, TraceExchangeBin} -> - QInfos = [rabbit_amqqueue:info(#amqqueue{pid = P}, [name]) || P <- QPids], - QNames = [N || [{name, #resource{name = N}}] <- QInfos], - maybe_inject(TraceExchangeBin, - XName, - <<"publish">>, - XName, - [{queue_names, QNames}, - {message, Message}]) - end. + check_trace(fun (TraceExchangeBin) -> + QInfos = [rabbit_amqqueue:info(#amqqueue{pid = P}, [name]) || P <- QPids], + QNames = [N || [{name, #resource{name = N}}] <- QInfos], + maybe_inject(TraceExchangeBin, + XName, + <<"publish">>, + XName, + [{<<"queue_names">>, + longstr, + list_to_binary(rabbit_misc:intersperse(",", QNames))}, + {<<"message">>, + table, + message_to_table(Message)}]) + end). tap_trace_out(Message = #basic_message{exchange_name = XName}, QName) -> - case application:get_env(trace_exchange) of - undefined -> - ok; - {ok, TraceExchangeBin} -> - maybe_inject(TraceExchangeBin, - XName, - <<"deliver">>, - QName, - [{message, Message}]) + check_trace(fun (TraceExchangeBin) -> + maybe_inject(TraceExchangeBin, + XName, + <<"deliver">>, + QName, + [{<<"message">>, + table, + message_to_table(Message)}]) + end). + +check_trace(F) -> + case catch case application:get_env(trace_exchange) of + undefined -> + ok; + {ok, TraceExchangeBin} -> + F(TraceExchangeBin) + end of + {'EXIT', Reason} -> + info("Trace tap died with reason ~p~n", [Reason]); + ok -> + ok end. maybe_inject(TraceExchangeBin, #resource{virtual_host = VHostBin, name = OriginalExchangeBin}, RKPrefix, #resource{name = RKSuffix}, - Term) -> + Table) -> if TraceExchangeBin =:= OriginalExchangeBin -> ok; @@ -142,11 +156,50 @@ maybe_inject(TraceExchangeBin, false, rabbit_misc:r(VHostBin, exchange, TraceExchangeBin), <<RKPrefix/binary, ".", RKSuffix/binary>>, - <<"text/plain">>, - list_to_binary(io_lib:format("~p", [Term]))), + <<"application/x-amqp-table; version=0-8">>, + rabbit_binary_generator:generate_table(Table)), ok end. +message_to_table(#basic_message{exchange_name = #resource{name = XName}, + routing_key = RoutingKey, + content = Content}) -> + #content{properties = Props, + payload_fragments_rev = PFR} = rabbit_binary_parser:ensure_content_decoded(Content), + #'P_basic'{content_type = ContentType, + content_encoding = ContentEncoding, + headers = Headers, + delivery_mode = DeliveryMode, + priority = Priority, + correlation_id = CorrelationId, + reply_to = ReplyTo, + expiration = Expiration, + message_id = MessageId, + timestamp = Timestamp, + type = Type, + user_id = UserId, + app_id = AppId} = Props, + [{<<"exchange_name">>, longstr, XName}, + {<<"routing_key">>, longstr, RoutingKey}, + {<<"headers">>, table, prune_undefined([{<<"content_type">>, longstr, ContentType}, + {<<"content_encoding">>, longstr, ContentEncoding}, + {<<"headers">>, table, Headers}, + {<<"delivery_mode">>, signedint, DeliveryMode}, + {<<"priority">>, signedint, Priority}, + {<<"correlation_id">>, longstr, CorrelationId}, + {<<"reply_to">>, longstr, ReplyTo}, + {<<"expiration">>, longstr, Expiration}, + {<<"message_id">>, longstr, MessageId}, + {<<"timestamp">>, longstr, Timestamp}, + {<<"type">>, longstr, Type}, + {<<"user_id">>, longstr, UserId}, + {<<"app_id">>, longstr, AppId}])}, + {<<"body">>, binary, list_to_binary(lists:reverse(PFR))}]. + +prune_undefined(Fields) -> + [F || F = {_, _, Value} <- Fields, + Value =/= undefined]. + %%-------------------------------------------------------------------- init([]) -> {ok, none}. |
