summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTony Garnock-Jones <tonyg@kcbbs.gen.nz>2009-04-09 16:49:42 -0700
committerTony Garnock-Jones <tonyg@kcbbs.gen.nz>2009-04-09 16:49:42 -0700
commitf7c3527a5fc88646787d111663aa19a51018ed9c (patch)
tree50231201aec5b76785cb8fcbfc4ee3169e902b9e
parent40cc23ce4796a38bfddaac1d11e568ca1fc282c0 (diff)
downloadrabbitmq-server-git-f7c3527a5fc88646787d111663aa19a51018ed9c.tar.gz
Initial solution. Remaining: documentation; rabbitmqctl interface.
-rw-r--r--src/rabbit_amqqueue_process.erl1
-rw-r--r--src/rabbit_channel.erl1
-rw-r--r--src/rabbit_log.erl52
3 files changed, 54 insertions, 0 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index c390b2b7e4..990a254531 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -161,6 +161,7 @@ deliver_immediately(Message, Delivered,
round_robin = RoundRobin,
next_msg_id = NextId}) ->
?LOGDEBUG("AMQQUEUE ~p DELIVERY:~n~p~n", [QName, Message]),
+ rabbit_log:tap_trace_out(Message, QName),
case queue:out(RoundRobin) of
{{value, QEntry = {ChPid, #consumer{tag = ConsumerTag,
ack_required = AckRequired}}},
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 7574cd673a..c074fe091d 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -765,6 +765,7 @@ binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments,
publish(Mandatory, Immediate, Message, QPids,
State = #ch{transaction_id = TxnKey, writer_pid = WriterPid}) ->
+ rabbit_log:tap_trace_in(Message, QPids),
Handled = deliver(QPids, Mandatory, Immediate, TxnKey,
Message, WriterPid),
case TxnKey of
diff --git a/src/rabbit_log.erl b/src/rabbit_log.erl
index f408336e94..5d1a8f60ab 100644
--- a/src/rabbit_log.erl
+++ b/src/rabbit_log.erl
@@ -41,9 +41,13 @@
-export([debug/1, debug/2, message/4, info/1, info/2,
warning/1, warning/2, error/1, error/2]).
+-export([tap_trace_in/2, tap_trace_out/2]).
+
-import(io).
-import(error_logger).
+-include("rabbit.hrl").
+
-define(SERVER, ?MODULE).
%%----------------------------------------------------------------------------
@@ -95,6 +99,54 @@ error(Fmt) ->
error(Fmt, Args) when is_list(Args) ->
gen_server:cast(?SERVER, {error, Fmt, Args}).
+tap_trace_in(Message = #basic_message{exchange_name = XName},
+ QPids) ->
+ case application:get_env(trace_exchange) of
+ undefined ->
+ ok;
+ {ok, TraceExchangeBin} ->
+ QInfos = [rabbit_amqqueue:info(#amqqueue{pid = P}, [name]) || P <- QPids],
+ QNames = [N || [{name, #resource{name = N}}] <- QInfos],
+ maybe_inject(TraceExchangeBin,
+ XName,
+ <<"publish">>,
+ XName,
+ [{queue_names, QNames},
+ {message, Message}])
+ end.
+
+tap_trace_out(Message = #basic_message{exchange_name = XName},
+ QName) ->
+ case application:get_env(trace_exchange) of
+ undefined ->
+ ok;
+ {ok, TraceExchangeBin} ->
+ maybe_inject(TraceExchangeBin,
+ XName,
+ <<"deliver">>,
+ QName,
+ [{message, Message}])
+ end.
+
+maybe_inject(TraceExchangeBin,
+ #resource{virtual_host = VHostBin, name = OriginalExchangeBin},
+ RKPrefix,
+ #resource{name = RKSuffix},
+ Term) ->
+ if
+ TraceExchangeBin =:= OriginalExchangeBin ->
+ ok;
+ true ->
+ rabbit_exchange:simple_publish(
+ false,
+ false,
+ rabbit_misc:r(VHostBin, exchange, TraceExchangeBin),
+ <<RKPrefix/binary, ".", RKSuffix/binary>>,
+ <<"text/plain">>,
+ list_to_binary(io_lib:format("~p", [Term]))),
+ ok
+ end.
+
%%--------------------------------------------------------------------
init([]) -> {ok, none}.