diff options
| author | Tony Garnock-Jones <tonyg@kcbbs.gen.nz> | 2009-04-09 16:49:42 -0700 |
|---|---|---|
| committer | Tony Garnock-Jones <tonyg@kcbbs.gen.nz> | 2009-04-09 16:49:42 -0700 |
| commit | f7c3527a5fc88646787d111663aa19a51018ed9c (patch) | |
| tree | 50231201aec5b76785cb8fcbfc4ee3169e902b9e | |
| parent | 40cc23ce4796a38bfddaac1d11e568ca1fc282c0 (diff) | |
| download | rabbitmq-server-git-f7c3527a5fc88646787d111663aa19a51018ed9c.tar.gz | |
Initial solution. Remaining: documentation; rabbitmqctl interface.
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 1 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 1 | ||||
| -rw-r--r-- | src/rabbit_log.erl | 52 |
3 files changed, 54 insertions, 0 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index c390b2b7e4..990a254531 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -161,6 +161,7 @@ deliver_immediately(Message, Delivered, round_robin = RoundRobin, next_msg_id = NextId}) -> ?LOGDEBUG("AMQQUEUE ~p DELIVERY:~n~p~n", [QName, Message]), + rabbit_log:tap_trace_out(Message, QName), case queue:out(RoundRobin) of {{value, QEntry = {ChPid, #consumer{tag = ConsumerTag, ack_required = AckRequired}}}, diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 7574cd673a..c074fe091d 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -765,6 +765,7 @@ binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments, publish(Mandatory, Immediate, Message, QPids, State = #ch{transaction_id = TxnKey, writer_pid = WriterPid}) -> + rabbit_log:tap_trace_in(Message, QPids), Handled = deliver(QPids, Mandatory, Immediate, TxnKey, Message, WriterPid), case TxnKey of diff --git a/src/rabbit_log.erl b/src/rabbit_log.erl index f408336e94..5d1a8f60ab 100644 --- a/src/rabbit_log.erl +++ b/src/rabbit_log.erl @@ -41,9 +41,13 @@ -export([debug/1, debug/2, message/4, info/1, info/2, warning/1, warning/2, error/1, error/2]). +-export([tap_trace_in/2, tap_trace_out/2]). + -import(io). -import(error_logger). +-include("rabbit.hrl"). + -define(SERVER, ?MODULE). %%---------------------------------------------------------------------------- @@ -95,6 +99,54 @@ error(Fmt) -> error(Fmt, Args) when is_list(Args) -> gen_server:cast(?SERVER, {error, Fmt, Args}). +tap_trace_in(Message = #basic_message{exchange_name = XName}, + QPids) -> + case application:get_env(trace_exchange) of + undefined -> + ok; + {ok, TraceExchangeBin} -> + QInfos = [rabbit_amqqueue:info(#amqqueue{pid = P}, [name]) || P <- QPids], + QNames = [N || [{name, #resource{name = N}}] <- QInfos], + maybe_inject(TraceExchangeBin, + XName, + <<"publish">>, + XName, + [{queue_names, QNames}, + {message, Message}]) + end. + +tap_trace_out(Message = #basic_message{exchange_name = XName}, + QName) -> + case application:get_env(trace_exchange) of + undefined -> + ok; + {ok, TraceExchangeBin} -> + maybe_inject(TraceExchangeBin, + XName, + <<"deliver">>, + QName, + [{message, Message}]) + end. + +maybe_inject(TraceExchangeBin, + #resource{virtual_host = VHostBin, name = OriginalExchangeBin}, + RKPrefix, + #resource{name = RKSuffix}, + Term) -> + if + TraceExchangeBin =:= OriginalExchangeBin -> + ok; + true -> + rabbit_exchange:simple_publish( + false, + false, + rabbit_misc:r(VHostBin, exchange, TraceExchangeBin), + <<RKPrefix/binary, ".", RKSuffix/binary>>, + <<"text/plain">>, + list_to_binary(io_lib:format("~p", [Term]))), + ok + end. + %%-------------------------------------------------------------------- init([]) -> {ok, none}. |
