summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAlexandru Scvortov <alexandru@rabbitmq.com>2010-12-06 10:48:30 +0000
committerAlexandru Scvortov <alexandru@rabbitmq.com>2010-12-06 10:48:30 +0000
commit9001dcac8e101a6ff4b9560410bc9c0f420808db (patch)
treed4249d72e805f56282114c5f1d94170a759a3650 /src
parent510ccc636411681fcbfeae520f00f63363600a74 (diff)
downloadrabbitmq-server-git-9001dcac8e101a6ff4b9560410bc9c0f420808db.tar.gz
add stats for confirms
To recap, a published message is confirmed by the channel. A message is confirmed only after all the queues it was published to confirm it. With the current change, the emitted stats look like this: {channel_exchange_stats, [{{resource,<<"/">>,exchange,<<"direct">>}, [{confirm,545},{publish,545}]}]}, {channel_queue_exchange_stats, [{{<0.204.0>,{resource,<<"/">>,exchange,<<"direct">>}}, [{confirm,545},{publish,545}]}, {{<0.195.0>,{resource,<<"/">>,exchange,<<"direct">>}}, [{confirm,545},{publish,545}]}]}] The confirm field in channel_exchange_stats represents the number of messages sent to that exchange that have also been confirmed. If the exchanged routed the message to different queues, this number is only increased when all queues have confirmed the message. If the message was unroutable or was routed to 0 queues, this number is still increased. This is the number of basic.confirms sent back to publisher. The confirm field in channel_queue_exchange_stats represents the number of messages confirmed by that queue (but not necessarily confirmed by the channel). In channel_exchange_stats, if the number of confirms lags behind the number of publishes, one of the queues is not confirming messages in a timely fashion.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_channel.erl15
1 files changed, 7 insertions, 8 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index ef85c318e6..e0f6f0e2f6 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -467,29 +467,30 @@ send_or_enqueue_ack(_MsgSeqNo, _QPid, _EN, State = #ch{confirm_enabled = false})
State;
send_or_enqueue_ack(MsgSeqNo, QPid, ExchangeName,
State = #ch{confirm_multiple = false}) ->
- maybe_incr_confirm_stats(QPid, ExchangeName, State),
+ maybe_incr_confirm_queue_stats(QPid, ExchangeName, State),
do_if_unconfirmed(
MsgSeqNo, QPid,
fun(MSN, State1 = #ch{writer_pid = WriterPid}) ->
ok = rabbit_writer:send_command(
WriterPid, #'basic.ack'{delivery_tag = MSN}),
+ maybe_incr_stats([{ExchangeName, 1}], confirm, State1),
State1
end, State);
send_or_enqueue_ack(MsgSeqNo, QPid, ExchangeName,
State = #ch{confirm_multiple = true}) ->
- maybe_incr_confirm_stats(QPid, ExchangeName, State),
+ maybe_incr_confirm_queue_stats(QPid, ExchangeName, State),
do_if_unconfirmed(
MsgSeqNo, QPid,
fun(MSN, State1 = #ch{held_confirms = As}) ->
+ maybe_incr_stats([{ExchangeName, 1}], confirm, State1),
start_confirm_timer(
State1#ch{held_confirms = gb_sets:add(MSN, As)})
end, State).
-maybe_incr_confirm_stats(QPid, ExchangeName, State) ->
- maybe_incr_stats([{ExchangeName, 1}], confirm, State),
+maybe_incr_confirm_queue_stats(QPid, ExchangeName, State) ->
case QPid of
undefined -> ok;
- _ -> maybe_incr_stats({{QPid, ExchangeName}, 1}, confirm, State)
+ _ -> maybe_incr_stats([{{QPid, ExchangeName}, 1}], confirm, State)
end.
do_if_unconfirmed(MsgSeqNo, QPid, ConfirmFun,
@@ -568,7 +569,6 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
content = DecodedContent,
guid = rabbit_guid:guid(),
is_persistent = IsPersistent},
- io:format("publishing ~p to ~p (~p)~n", [MsgSeqNo, ExchangeName, IsPersistent]),
{RoutingRes, DeliveredQPids} =
rabbit_exchange:publish(
Exchange,
@@ -579,7 +579,6 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
maybe_incr_stats([{ExchangeName, 1} |
[{{QPid, ExchangeName}, 1} ||
QPid <- DeliveredQPids]], publish, State2),
- io:format("did~n"),
{noreply, case TxnKey of
none -> State2;
_ -> add_tx_participants(DeliveredQPids, State2)
@@ -1256,7 +1255,6 @@ process_routing_result(routed, QPids, XName, MsgSeqNo, _Msg,
State = #ch{queues_for_msg = QFM,
exchange_for_msg = EFM}) ->
EFM1 = dict:store(MsgSeqNo, XName, EFM),
- io:format("Msg -> X: ~p -> ~p~n", [MsgSeqNo, XName]),
QFM1 = dict:store(MsgSeqNo, sets:from_list(QPids), QFM),
[maybe_monitor(QPid) || QPid <- QPids],
State#ch{queues_for_msg = QFM1, exchange_for_msg = EFM1}.
@@ -1368,6 +1366,7 @@ internal_emit_stats(State = #ch{stats_timer = StatsTimer}, Extra) ->
{channel_queue_exchange_stats,
[{QX, Stats} ||
{{queue_exchange_stats, QX}, Stats} <- get()]}],
+ io:format("Stats: ~p~n", [Extra ++ CoarseStats ++ FineStats]),
rabbit_event:notify(channel_stats,
Extra ++ CoarseStats ++ FineStats)
end.