diff options
| -rw-r--r-- | src/rabbit_channel.erl | 26 | ||||
| -rw-r--r-- | src/rabbit_trace.erl | 29 |
2 files changed, 34 insertions, 21 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index f078886224..2517528a8b 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -35,7 +35,7 @@ user, virtual_host, most_recently_declared_queue, consumer_mapping, blocking, consumer_monitors, queue_collector_pid, stats_timer, confirm_enabled, publish_seqno, unconfirmed_mq, - unconfirmed_qm, confirmed, capabilities}). + unconfirmed_qm, confirmed, capabilities, trace_state}). -define(MAX_PERMISSION_CACHE_SIZE, 12). @@ -185,7 +185,8 @@ init([Channel, ReaderPid, WriterPid, ConnPid, Protocol, User, VHost, unconfirmed_mq = gb_trees:empty(), unconfirmed_qm = gb_trees:empty(), confirmed = [], - capabilities = Capabilities}, + capabilities = Capabilities, + trace_state = rabbit_trace:init(VHost)}, rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State)), rabbit_event:if_enabled(StatsTimer, fun() -> internal_emit_stats(State) end), @@ -263,8 +264,9 @@ handle_cast({deliver, ConsumerTag, AckRequired, #basic_message{exchange_name = ExchangeName, routing_keys = [RoutingKey | _CcRoutes], content = Content}}}, - State = #ch{writer_pid = WriterPid, - next_tag = DeliveryTag}) -> + State = #ch{writer_pid = WriterPid, + next_tag = DeliveryTag, + trace_state = TraceState}) -> State1 = lock_message(AckRequired, ack_record(DeliveryTag, ConsumerTag, Msg), State), @@ -281,7 +283,7 @@ handle_cast({deliver, ConsumerTag, AckRequired, true -> deliver; false -> deliver_no_ack end, State), - rabbit_trace:tap_trace_out(Msg), + rabbit_trace:tap_trace_out(Msg, TraceState), noreply(State1#ch{next_tag = DeliveryTag + 1}); handle_cast(emit_stats, State = #ch{stats_timer = StatsTimer}) -> @@ -591,7 +593,8 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, immediate = Immediate}, Content, State = #ch{virtual_host = VHostPath, transaction_id = TxnKey, - confirm_enabled = ConfirmEnabled}) -> + confirm_enabled = ConfirmEnabled, + trace_state = TraceState}) -> ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), check_write_permitted(ExchangeName, State), Exchange = rabbit_exchange:lookup_or_die(ExchangeName), @@ -608,7 +611,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, end, case rabbit_basic:message(ExchangeName, RoutingKey, DecodedContent) of {ok, Message} -> - rabbit_trace:tap_trace_in(Message), + rabbit_trace:tap_trace_in(Message, TraceState), {RoutingRes, DeliveredQPids} = rabbit_exchange:publish( Exchange, @@ -656,9 +659,10 @@ handle_method(#'basic.ack'{delivery_tag = DeliveryTag, handle_method(#'basic.get'{queue = QueueNameBin, no_ack = NoAck}, - _, State = #ch{writer_pid = WriterPid, - conn_pid = ConnPid, - next_tag = DeliveryTag}) -> + _, State = #ch{writer_pid = WriterPid, + conn_pid = ConnPid, + next_tag = DeliveryTag, + trace_state = TraceState}) -> QueueName = expand_queue_name_shortcut(QueueNameBin, State), check_read_permitted(QueueName, State), case rabbit_amqqueue:with_exclusive_access_or_die( @@ -677,7 +681,7 @@ handle_method(#'basic.get'{queue = QueueNameBin, true -> get_no_ack; false -> get end, State), - rabbit_trace:tap_trace_out(Msg), + rabbit_trace:tap_trace_out(Msg, TraceState), ok = rabbit_writer:send_command( WriterPid, #'basic.get_ok'{delivery_tag = DeliveryTag, diff --git a/src/rabbit_trace.erl b/src/rabbit_trace.erl index 2d15e7fc42..e24d22ad98 100644 --- a/src/rabbit_trace.erl +++ b/src/rabbit_trace.erl @@ -16,39 +16,48 @@ -module(rabbit_trace). --export([tap_trace_in/1, tap_trace_out/1]). +-export([init/1, tap_trace_in/2, tap_trace_out/2]). -include("rabbit.hrl"). -include("rabbit_framing.hrl"). +-record(trace_state, {trace_exchange}). + %%---------------------------------------------------------------------------- -ifdef(use_specs). --spec(tap_trace_in/1 :: (rabbit_types:basic_message()) -> 'ok'). --spec(tap_trace_out/1 :: (rabbit_amqqueue:qmsg()) -> 'ok'). +-type(state() :: #trace_state{trace_exchange :: rabbit_exchange:name()}). + +-spec(init/1 :: (rabbit_types:vhost()) -> state()). +-spec(tap_trace_in/2 :: (rabbit_types:basic_message(), state()) -> 'ok'). +-spec(tap_trace_out/2 :: (rabbit_amqqueue:qmsg(), state()) -> 'ok'). -endif. %%---------------------------------------------------------------------------- -tap_trace_in(Msg) -> - maybe_trace(Msg, <<"publish">>, xname(Msg), []). +init(VHost) -> + #trace_state{trace_exchange = trace_exchange(VHost)}. + +tap_trace_in(Msg, #trace_state{trace_exchange = TraceX}) -> + maybe_trace(Msg, TraceX, <<"publish">>, xname(Msg), []). -tap_trace_out({#resource{name = QName}, _QPid, _QMsgId, Redelivered, Msg}) -> +tap_trace_out({#resource{name = QName}, _QPid, _QMsgId, Redelivered, Msg}, + #trace_state{trace_exchange = TraceX}) -> RedeliveredNum = case Redelivered of true -> 1; false -> 0 end, - maybe_trace(Msg, <<"deliver">>, QName, + maybe_trace(Msg, TraceX, <<"deliver">>, QName, [{<<"redelivered">>, signedint, RedeliveredNum}]). xname(#basic_message{exchange_name = #resource{name = XName}}) -> XName. vhost(#basic_message{exchange_name = #resource{virtual_host = VHost}}) -> VHost. -maybe_trace(Msg, RKPrefix, RKSuffix, Extra) -> +maybe_trace(Msg, TraceX, RKPrefix, RKSuffix, Extra) -> XName = xname(Msg), - case trace_exchange(vhost(Msg)) of + case TraceX of none -> ok; XName -> ok; - TraceX -> case catch trace(TraceX, Msg, RKPrefix, RKSuffix, Extra) of + _ -> case catch trace(TraceX, Msg, RKPrefix, RKSuffix, Extra) of {'EXIT', R} -> rabbit_log:info("Trace died: ~p~n", [R]); ok -> ok end |
