summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTony Garnock-Jones <tonyg@kcbbs.gen.nz>2009-05-09 12:03:22 +0100
committerTony Garnock-Jones <tonyg@kcbbs.gen.nz>2009-05-09 12:03:22 +0100
commit81122a17bedb82927be206e446654c4438415370 (patch)
tree64dc2447b2b02899472fea9ae59846bcd6a183a2
parent22eb3828ed314a8cf8c03a35258bd075bfea077c (diff)
downloadrabbitmq-server-git-81122a17bedb82927be206e446654c4438415370.tar.gz
Move outbound tap to saner location; take advantage of extra info at that point
-rw-r--r--src/rabbit_amqqueue_process.erl1
-rw-r--r--src/rabbit_channel.erl10
-rw-r--r--src/rabbit_log.erl24
3 files changed, 23 insertions, 12 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 990a254531..c390b2b7e4 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -161,7 +161,6 @@ deliver_immediately(Message, Delivered,
round_robin = RoundRobin,
next_msg_id = NextId}) ->
?LOGDEBUG("AMQQUEUE ~p DELIVERY:~n~p~n", [QName, Message]),
- rabbit_log:tap_trace_out(Message, QName),
case queue:out(RoundRobin) of
{{value, QEntry = {ChPid, #consumer{tag = ConsumerTag,
ack_required = AckRequired}}},
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index bcbf281037..7c1dcf029e 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -362,6 +362,7 @@ handle_method(#'basic.get'{queue = QueueNameBin,
routing_key = RoutingKey,
content = Content}}} ->
State1 = lock_message(not(NoAck), {DeliveryTag, none, Msg}, State),
+ rabbit_log:tap_trace_out(Msg, DeliveryTag, none),
ok = rabbit_writer:send_command(
WriterPid,
#'basic.get_ok'{delivery_tag = DeliveryTag,
@@ -937,15 +938,16 @@ lock_message(false, _MsgStruct, State) ->
State.
internal_deliver(WriterPid, Notify, ConsumerTag, DeliveryTag,
- {_QName, QPid, _MsgId, Redelivered,
- #basic_message{exchange_name = ExchangeName,
- routing_key = RoutingKey,
- content = Content}}) ->
+ Msg = {_QName, QPid, _MsgId, Redelivered,
+ #basic_message{exchange_name = ExchangeName,
+ routing_key = RoutingKey,
+ content = Content}}) ->
M = #'basic.deliver'{consumer_tag = ConsumerTag,
delivery_tag = DeliveryTag,
redelivered = Redelivered,
exchange = ExchangeName#resource.name,
routing_key = RoutingKey},
+ rabbit_log:tap_trace_out(Msg, DeliveryTag, ConsumerTag),
ok = case Notify of
true -> rabbit_writer:send_command_and_notify(
WriterPid, QPid, self(), M, Content);
diff --git a/src/rabbit_log.erl b/src/rabbit_log.erl
index b581d1ee10..93c01400aa 100644
--- a/src/rabbit_log.erl
+++ b/src/rabbit_log.erl
@@ -41,7 +41,7 @@
-export([debug/1, debug/2, message/4, info/1, info/2,
warning/1, warning/2, error/1, error/2]).
--export([tap_trace_in/2, tap_trace_out/2]).
+-export([tap_trace_in/2, tap_trace_out/3]).
-import(io).
-import(error_logger).
@@ -120,19 +120,29 @@ tap_trace_in(Message = #basic_message{exchange_name = #resource{virtual_host = V
message_to_table(Message)}])
end).
-tap_trace_out(Message = #basic_message{exchange_name = #resource{virtual_host = VHostBin,
- name = XNameBin}},
- #resource{name = QNameBin}) ->
+tap_trace_out({#resource{name = QNameBin}, _QPid, QMsgId, Redelivered,
+ Message = #basic_message{exchange_name = #resource{virtual_host = VHostBin,
+ name = XNameBin}}},
+ DeliveryTag,
+ ConsumerTagOrNone) ->
check_trace(VHostBin,
fun (TraceExchangeBin) ->
+ RedeliveredNum = case Redelivered of true -> 1; false -> 0 end,
+ Fields0 = [{<<"delivery_tag">>, signedint, DeliveryTag}, %% FIXME later
+ {<<"queue_msg_number">>, signedint, QMsgId},
+ {<<"redelivered">>, signedint, RedeliveredNum},
+ {<<"message">>, table, message_to_table(Message)}],
+ Fields = case ConsumerTagOrNone of
+ none -> Fields0;
+ ConsumerTag -> [{<<"consumer_tag">>, longstr, ConsumerTag}
+ | Fields0]
+ end,
maybe_inject(TraceExchangeBin,
VHostBin,
XNameBin,
<<"deliver">>,
QNameBin,
- [{<<"message">>,
- table,
- message_to_table(Message)}])
+ Fields)
end).
check_trace(VHostBin, F) ->