summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_channel.erl26
-rw-r--r--src/rabbit_trace.erl29
2 files changed, 34 insertions, 21 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index f078886224..2517528a8b 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -35,7 +35,7 @@
user, virtual_host, most_recently_declared_queue,
consumer_mapping, blocking, consumer_monitors, queue_collector_pid,
stats_timer, confirm_enabled, publish_seqno, unconfirmed_mq,
- unconfirmed_qm, confirmed, capabilities}).
+ unconfirmed_qm, confirmed, capabilities, trace_state}).
-define(MAX_PERMISSION_CACHE_SIZE, 12).
@@ -185,7 +185,8 @@ init([Channel, ReaderPid, WriterPid, ConnPid, Protocol, User, VHost,
unconfirmed_mq = gb_trees:empty(),
unconfirmed_qm = gb_trees:empty(),
confirmed = [],
- capabilities = Capabilities},
+ capabilities = Capabilities,
+ trace_state = rabbit_trace:init(VHost)},
rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State)),
rabbit_event:if_enabled(StatsTimer,
fun() -> internal_emit_stats(State) end),
@@ -263,8 +264,9 @@ handle_cast({deliver, ConsumerTag, AckRequired,
#basic_message{exchange_name = ExchangeName,
routing_keys = [RoutingKey | _CcRoutes],
content = Content}}},
- State = #ch{writer_pid = WriterPid,
- next_tag = DeliveryTag}) ->
+ State = #ch{writer_pid = WriterPid,
+ next_tag = DeliveryTag,
+ trace_state = TraceState}) ->
State1 = lock_message(AckRequired,
ack_record(DeliveryTag, ConsumerTag, Msg),
State),
@@ -281,7 +283,7 @@ handle_cast({deliver, ConsumerTag, AckRequired,
true -> deliver;
false -> deliver_no_ack
end, State),
- rabbit_trace:tap_trace_out(Msg),
+ rabbit_trace:tap_trace_out(Msg, TraceState),
noreply(State1#ch{next_tag = DeliveryTag + 1});
handle_cast(emit_stats, State = #ch{stats_timer = StatsTimer}) ->
@@ -591,7 +593,8 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
immediate = Immediate},
Content, State = #ch{virtual_host = VHostPath,
transaction_id = TxnKey,
- confirm_enabled = ConfirmEnabled}) ->
+ confirm_enabled = ConfirmEnabled,
+ trace_state = TraceState}) ->
ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin),
check_write_permitted(ExchangeName, State),
Exchange = rabbit_exchange:lookup_or_die(ExchangeName),
@@ -608,7 +611,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
end,
case rabbit_basic:message(ExchangeName, RoutingKey, DecodedContent) of
{ok, Message} ->
- rabbit_trace:tap_trace_in(Message),
+ rabbit_trace:tap_trace_in(Message, TraceState),
{RoutingRes, DeliveredQPids} =
rabbit_exchange:publish(
Exchange,
@@ -656,9 +659,10 @@ handle_method(#'basic.ack'{delivery_tag = DeliveryTag,
handle_method(#'basic.get'{queue = QueueNameBin,
no_ack = NoAck},
- _, State = #ch{writer_pid = WriterPid,
- conn_pid = ConnPid,
- next_tag = DeliveryTag}) ->
+ _, State = #ch{writer_pid = WriterPid,
+ conn_pid = ConnPid,
+ next_tag = DeliveryTag,
+ trace_state = TraceState}) ->
QueueName = expand_queue_name_shortcut(QueueNameBin, State),
check_read_permitted(QueueName, State),
case rabbit_amqqueue:with_exclusive_access_or_die(
@@ -677,7 +681,7 @@ handle_method(#'basic.get'{queue = QueueNameBin,
true -> get_no_ack;
false -> get
end, State),
- rabbit_trace:tap_trace_out(Msg),
+ rabbit_trace:tap_trace_out(Msg, TraceState),
ok = rabbit_writer:send_command(
WriterPid,
#'basic.get_ok'{delivery_tag = DeliveryTag,
diff --git a/src/rabbit_trace.erl b/src/rabbit_trace.erl
index 2d15e7fc42..e24d22ad98 100644
--- a/src/rabbit_trace.erl
+++ b/src/rabbit_trace.erl
@@ -16,39 +16,48 @@
-module(rabbit_trace).
--export([tap_trace_in/1, tap_trace_out/1]).
+-export([init/1, tap_trace_in/2, tap_trace_out/2]).
-include("rabbit.hrl").
-include("rabbit_framing.hrl").
+-record(trace_state, {trace_exchange}).
+
%%----------------------------------------------------------------------------
-ifdef(use_specs).
--spec(tap_trace_in/1 :: (rabbit_types:basic_message()) -> 'ok').
--spec(tap_trace_out/1 :: (rabbit_amqqueue:qmsg()) -> 'ok').
+-type(state() :: #trace_state{trace_exchange :: rabbit_exchange:name()}).
+
+-spec(init/1 :: (rabbit_types:vhost()) -> state()).
+-spec(tap_trace_in/2 :: (rabbit_types:basic_message(), state()) -> 'ok').
+-spec(tap_trace_out/2 :: (rabbit_amqqueue:qmsg(), state()) -> 'ok').
-endif.
%%----------------------------------------------------------------------------
-tap_trace_in(Msg) ->
- maybe_trace(Msg, <<"publish">>, xname(Msg), []).
+init(VHost) ->
+ #trace_state{trace_exchange = trace_exchange(VHost)}.
+
+tap_trace_in(Msg, #trace_state{trace_exchange = TraceX}) ->
+ maybe_trace(Msg, TraceX, <<"publish">>, xname(Msg), []).
-tap_trace_out({#resource{name = QName}, _QPid, _QMsgId, Redelivered, Msg}) ->
+tap_trace_out({#resource{name = QName}, _QPid, _QMsgId, Redelivered, Msg},
+ #trace_state{trace_exchange = TraceX}) ->
RedeliveredNum = case Redelivered of true -> 1; false -> 0 end,
- maybe_trace(Msg, <<"deliver">>, QName,
+ maybe_trace(Msg, TraceX, <<"deliver">>, QName,
[{<<"redelivered">>, signedint, RedeliveredNum}]).
xname(#basic_message{exchange_name = #resource{name = XName}}) -> XName.
vhost(#basic_message{exchange_name = #resource{virtual_host = VHost}}) -> VHost.
-maybe_trace(Msg, RKPrefix, RKSuffix, Extra) ->
+maybe_trace(Msg, TraceX, RKPrefix, RKSuffix, Extra) ->
XName = xname(Msg),
- case trace_exchange(vhost(Msg)) of
+ case TraceX of
none -> ok;
XName -> ok;
- TraceX -> case catch trace(TraceX, Msg, RKPrefix, RKSuffix, Extra) of
+ _ -> case catch trace(TraceX, Msg, RKPrefix, RKSuffix, Extra) of
{'EXIT', R} -> rabbit_log:info("Trace died: ~p~n", [R]);
ok -> ok
end