diff options
| author | Alexandru Scvortov <alexandru@rabbitmq.com> | 2010-08-13 10:25:17 +0100 |
|---|---|---|
| committer | Alexandru Scvortov <alexandru@rabbitmq.com> | 2010-08-13 10:25:17 +0100 |
| commit | 6ff8e253b1b0d71d96958d38561ddc86bb42e6e8 (patch) | |
| tree | 26aa758d3528ec373e44fe2c341e7b20828c8acc | |
| parent | 173816cad7677a27f2b62f14a26b0a9aa764f1a1 (diff) | |
| download | rabbitmq-server-git-6ff8e253b1b0d71d96958d38561ddc86bb42e6e8.tar.gz | |
some instrumentation
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 16 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 12 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 1 |
3 files changed, 24 insertions, 5 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index d52660c5ac..4bb2109e56 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -345,9 +345,12 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc, ChPid, ConsumerTag, AckRequired, {QName, self(), AckTag, IsDelivered, Message}), ChAckTags1 = case AckRequired of - true -> sets:add_element( - AckTag, ChAckTags); - false -> ChAckTags + true -> + rabbit_log:info("Delivered message but waiting for ack~n"), + sets:add_element(AckTag, ChAckTags); + false -> + rabbit_log:info("Delivered message and not waiting for ack~n"), + ChAckTags end, NewC = C#cr{unsent_message_count = Count + 1, acktags = ChAckTags1}, @@ -384,6 +387,7 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc, {FunAcc, State} end; {empty, _} -> + rabbit_log:info("Message on a queue without consumers~n"), {FunAcc, State} end. @@ -405,7 +409,8 @@ run_message_queue(State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> {_IsEmpty1, State1} = deliver_msgs_to_consumers(Funs, IsEmpty, State), State1. -attempt_delivery(none, _ChPid, Message, State = #q{backing_queue = BQ}) -> +attempt_delivery(none, ChPid, Message, State = #q{backing_queue = BQ}) -> + rabbit_log:info("Attempting delivery of message~n"), PredFun = fun (IsEmpty, _State) -> not IsEmpty end, DeliverFun = fun (AckRequired, false, State1 = #q{backing_queue_state = BQS}) -> @@ -414,6 +419,7 @@ attempt_delivery(none, _ChPid, Message, State = #q{backing_queue = BQ}) -> {{Message, false, AckTag}, true, State1#q{backing_queue_state = BQS1}} end, + ConfirmFun = fun() -> rabbit_channel:confirm(ChPid, -1) end, deliver_msgs_to_consumers({ PredFun, DeliverFun }, false, State); attempt_delivery(Txn, ChPid, Message, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> @@ -421,6 +427,7 @@ attempt_delivery(Txn, ChPid, Message, State = #q{backing_queue = BQ, {true, State#q{backing_queue_state = BQ:tx_publish(Txn, Message, BQS)}}. deliver_or_enqueue(Txn, ChPid, Message, State = #q{backing_queue = BQ}) -> + rabbit_log:info("deliver_or_enqueue called for message~n"), case attempt_delivery(Txn, ChPid, Message, State) of {true, NewState} -> {true, NewState}; @@ -799,6 +806,7 @@ handle_cast({deliver, Txn, Message, ChPid}, State) -> handle_cast({ack, Txn, AckTags, ChPid}, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> + rabbit_log:info("Queue process got an ack~n"), case lookup_ch(ChPid) of not_found -> noreply(State); diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index ae97066bf9..07d35ce0f3 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -38,7 +38,7 @@ -export([start_link/6, do/2, do/3, shutdown/1]). -export([send_command/2, deliver/4, flushed/2]). -export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]). --export([emit_stats/1, flush/1, flush_multiple_acks/1]). +-export([emit_stats/1, flush/1, flush_multiple_acks/1, confirm/2]). -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2, handle_pre_hibernate/1]). @@ -99,6 +99,8 @@ -spec(info_all/0 :: () -> [[rabbit_types:info()]]). -spec(info_all/1 :: ([rabbit_types:info_key()]) -> [[rabbit_types:info()]]). -spec(emit_stats/1 :: (pid()) -> 'ok'). +-spec(flush_multiple_acks/1 :: (pid()) -> 'ok'). +-spec(confirm/2 ::(pid(), integer()) -> 'ok'). -endif. @@ -155,6 +157,9 @@ flush(Pid) -> flush_multiple_acks(Pid) -> gen_server2:cast(Pid, multiple_ack_flush). +confirm(Pid, MessageSequenceNumber) -> + gen_server2:cast(Pid, {confirm, MessageSequenceNumber}). + %%--------------------------------------------------------------------------- @@ -262,6 +267,10 @@ handle_cast(multiple_ack_flush, end, {noreply, State#ch{confirm = C#confirm{held_acks = gb_sets:new(), tref = undefined}}}. +%handle_cast({confirm, MsgSeqNo}, State) -> +% rabbit_log:info("got confirm for #~p~n", [MsgSeqNo]), +% {noreply, SOMETHING MAGIC + handle_info({'EXIT', WriterPid, Reason = {writer, send_failed, _Error}}, State = #ch{writer_pid = WriterPid}) -> @@ -492,6 +501,7 @@ handle_method(#'basic.ack'{delivery_tag = DeliveryTag, multiple = Multiple}, _, State = #ch{transaction_id = TxnKey, unacked_message_q = UAMQ}) -> + rabbit_log:info("channel received a basic.ack~n"), {Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, Multiple), QIncs = ack(TxnKey, Acked), Participants = [QPid || {QPid, _} <- QIncs], diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 0f52eee84f..714eb206aa 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -1004,6 +1004,7 @@ publish(Msg = #basic_message { is_persistent = IsPersistent }, persistent_count = PCount, durable = IsDurable, ram_msg_count = RamMsgCount }) -> + rabbit_log:info("message ~p got to variable_queue:publish~n", [Msg#basic_message.guid]), IsPersistent1 = IsDurable andalso IsPersistent, MsgStatus = (msg_status(IsPersistent1, SeqId, Msg)) #msg_status { is_delivered = IsDelivered, msg_on_disk = MsgOnDisk }, |
