diff options
| author | Alexandru Scvortov <alexandru@rabbitmq.com> | 2010-12-06 10:48:30 +0000 |
|---|---|---|
| committer | Alexandru Scvortov <alexandru@rabbitmq.com> | 2010-12-06 10:48:30 +0000 |
| commit | 9001dcac8e101a6ff4b9560410bc9c0f420808db (patch) | |
| tree | d4249d72e805f56282114c5f1d94170a759a3650 /src | |
| parent | 510ccc636411681fcbfeae520f00f63363600a74 (diff) | |
| download | rabbitmq-server-git-9001dcac8e101a6ff4b9560410bc9c0f420808db.tar.gz | |
add stats for confirms
To recap, a published message is confirmed by the channel. A message
is confirmed only after all the queues it was published to confirm it.
With the current change, the emitted stats look like this:
{channel_exchange_stats,
[{{resource,<<"/">>,exchange,<<"direct">>},
[{confirm,545},{publish,545}]}]},
{channel_queue_exchange_stats,
[{{<0.204.0>,{resource,<<"/">>,exchange,<<"direct">>}},
[{confirm,545},{publish,545}]},
{{<0.195.0>,{resource,<<"/">>,exchange,<<"direct">>}},
[{confirm,545},{publish,545}]}]}]
The confirm field in channel_exchange_stats represents the number of
messages sent to that exchange that have also been confirmed. If the
exchanged routed the message to different queues, this number is only
increased when all queues have confirmed the message. If the message
was unroutable or was routed to 0 queues, this number is still
increased. This is the number of basic.confirms sent back to
publisher.
The confirm field in channel_queue_exchange_stats represents the
number of messages confirmed by that queue (but not necessarily
confirmed by the channel).
In channel_exchange_stats, if the number of confirms lags behind the
number of publishes, one of the queues is not confirming messages in a
timely fashion.
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_channel.erl | 15 |
1 files changed, 7 insertions, 8 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index ef85c318e6..e0f6f0e2f6 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -467,29 +467,30 @@ send_or_enqueue_ack(_MsgSeqNo, _QPid, _EN, State = #ch{confirm_enabled = false}) State; send_or_enqueue_ack(MsgSeqNo, QPid, ExchangeName, State = #ch{confirm_multiple = false}) -> - maybe_incr_confirm_stats(QPid, ExchangeName, State), + maybe_incr_confirm_queue_stats(QPid, ExchangeName, State), do_if_unconfirmed( MsgSeqNo, QPid, fun(MSN, State1 = #ch{writer_pid = WriterPid}) -> ok = rabbit_writer:send_command( WriterPid, #'basic.ack'{delivery_tag = MSN}), + maybe_incr_stats([{ExchangeName, 1}], confirm, State1), State1 end, State); send_or_enqueue_ack(MsgSeqNo, QPid, ExchangeName, State = #ch{confirm_multiple = true}) -> - maybe_incr_confirm_stats(QPid, ExchangeName, State), + maybe_incr_confirm_queue_stats(QPid, ExchangeName, State), do_if_unconfirmed( MsgSeqNo, QPid, fun(MSN, State1 = #ch{held_confirms = As}) -> + maybe_incr_stats([{ExchangeName, 1}], confirm, State1), start_confirm_timer( State1#ch{held_confirms = gb_sets:add(MSN, As)}) end, State). -maybe_incr_confirm_stats(QPid, ExchangeName, State) -> - maybe_incr_stats([{ExchangeName, 1}], confirm, State), +maybe_incr_confirm_queue_stats(QPid, ExchangeName, State) -> case QPid of undefined -> ok; - _ -> maybe_incr_stats({{QPid, ExchangeName}, 1}, confirm, State) + _ -> maybe_incr_stats([{{QPid, ExchangeName}, 1}], confirm, State) end. do_if_unconfirmed(MsgSeqNo, QPid, ConfirmFun, @@ -568,7 +569,6 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, content = DecodedContent, guid = rabbit_guid:guid(), is_persistent = IsPersistent}, - io:format("publishing ~p to ~p (~p)~n", [MsgSeqNo, ExchangeName, IsPersistent]), {RoutingRes, DeliveredQPids} = rabbit_exchange:publish( Exchange, @@ -579,7 +579,6 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, maybe_incr_stats([{ExchangeName, 1} | [{{QPid, ExchangeName}, 1} || QPid <- DeliveredQPids]], publish, State2), - io:format("did~n"), {noreply, case TxnKey of none -> State2; _ -> add_tx_participants(DeliveredQPids, State2) @@ -1256,7 +1255,6 @@ process_routing_result(routed, QPids, XName, MsgSeqNo, _Msg, State = #ch{queues_for_msg = QFM, exchange_for_msg = EFM}) -> EFM1 = dict:store(MsgSeqNo, XName, EFM), - io:format("Msg -> X: ~p -> ~p~n", [MsgSeqNo, XName]), QFM1 = dict:store(MsgSeqNo, sets:from_list(QPids), QFM), [maybe_monitor(QPid) || QPid <- QPids], State#ch{queues_for_msg = QFM1, exchange_for_msg = EFM1}. @@ -1368,6 +1366,7 @@ internal_emit_stats(State = #ch{stats_timer = StatsTimer}, Extra) -> {channel_queue_exchange_stats, [{QX, Stats} || {{queue_exchange_stats, QX}, Stats} <- get()]}], + io:format("Stats: ~p~n", [Extra ++ CoarseStats ++ FineStats]), rabbit_event:notify(channel_stats, Extra ++ CoarseStats ++ FineStats) end. |
