summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlexandru Scvortov <alexandru@rabbitmq.com>2010-08-13 10:25:17 +0100
committerAlexandru Scvortov <alexandru@rabbitmq.com>2010-08-13 10:25:17 +0100
commit6ff8e253b1b0d71d96958d38561ddc86bb42e6e8 (patch)
tree26aa758d3528ec373e44fe2c341e7b20828c8acc
parent173816cad7677a27f2b62f14a26b0a9aa764f1a1 (diff)
downloadrabbitmq-server-git-6ff8e253b1b0d71d96958d38561ddc86bb42e6e8.tar.gz
some instrumentation
-rw-r--r--src/rabbit_amqqueue_process.erl16
-rw-r--r--src/rabbit_channel.erl12
-rw-r--r--src/rabbit_variable_queue.erl1
3 files changed, 24 insertions, 5 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index d52660c5ac..4bb2109e56 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -345,9 +345,12 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc,
ChPid, ConsumerTag, AckRequired,
{QName, self(), AckTag, IsDelivered, Message}),
ChAckTags1 = case AckRequired of
- true -> sets:add_element(
- AckTag, ChAckTags);
- false -> ChAckTags
+ true ->
+ rabbit_log:info("Delivered message but waiting for ack~n"),
+ sets:add_element(AckTag, ChAckTags);
+ false ->
+ rabbit_log:info("Delivered message and not waiting for ack~n"),
+ ChAckTags
end,
NewC = C#cr{unsent_message_count = Count + 1,
acktags = ChAckTags1},
@@ -384,6 +387,7 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc,
{FunAcc, State}
end;
{empty, _} ->
+ rabbit_log:info("Message on a queue without consumers~n"),
{FunAcc, State}
end.
@@ -405,7 +409,8 @@ run_message_queue(State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
{_IsEmpty1, State1} = deliver_msgs_to_consumers(Funs, IsEmpty, State),
State1.
-attempt_delivery(none, _ChPid, Message, State = #q{backing_queue = BQ}) ->
+attempt_delivery(none, ChPid, Message, State = #q{backing_queue = BQ}) ->
+ rabbit_log:info("Attempting delivery of message~n"),
PredFun = fun (IsEmpty, _State) -> not IsEmpty end,
DeliverFun =
fun (AckRequired, false, State1 = #q{backing_queue_state = BQS}) ->
@@ -414,6 +419,7 @@ attempt_delivery(none, _ChPid, Message, State = #q{backing_queue = BQ}) ->
{{Message, false, AckTag}, true,
State1#q{backing_queue_state = BQS1}}
end,
+ ConfirmFun = fun() -> rabbit_channel:confirm(ChPid, -1) end,
deliver_msgs_to_consumers({ PredFun, DeliverFun }, false, State);
attempt_delivery(Txn, ChPid, Message, State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
@@ -421,6 +427,7 @@ attempt_delivery(Txn, ChPid, Message, State = #q{backing_queue = BQ,
{true, State#q{backing_queue_state = BQ:tx_publish(Txn, Message, BQS)}}.
deliver_or_enqueue(Txn, ChPid, Message, State = #q{backing_queue = BQ}) ->
+ rabbit_log:info("deliver_or_enqueue called for message~n"),
case attempt_delivery(Txn, ChPid, Message, State) of
{true, NewState} ->
{true, NewState};
@@ -799,6 +806,7 @@ handle_cast({deliver, Txn, Message, ChPid}, State) ->
handle_cast({ack, Txn, AckTags, ChPid},
State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
+ rabbit_log:info("Queue process got an ack~n"),
case lookup_ch(ChPid) of
not_found ->
noreply(State);
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index ae97066bf9..07d35ce0f3 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -38,7 +38,7 @@
-export([start_link/6, do/2, do/3, shutdown/1]).
-export([send_command/2, deliver/4, flushed/2]).
-export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]).
--export([emit_stats/1, flush/1, flush_multiple_acks/1]).
+-export([emit_stats/1, flush/1, flush_multiple_acks/1, confirm/2]).
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
handle_info/2, handle_pre_hibernate/1]).
@@ -99,6 +99,8 @@
-spec(info_all/0 :: () -> [[rabbit_types:info()]]).
-spec(info_all/1 :: ([rabbit_types:info_key()]) -> [[rabbit_types:info()]]).
-spec(emit_stats/1 :: (pid()) -> 'ok').
+-spec(flush_multiple_acks/1 :: (pid()) -> 'ok').
+-spec(confirm/2 ::(pid(), integer()) -> 'ok').
-endif.
@@ -155,6 +157,9 @@ flush(Pid) ->
flush_multiple_acks(Pid) ->
gen_server2:cast(Pid, multiple_ack_flush).
+confirm(Pid, MessageSequenceNumber) ->
+ gen_server2:cast(Pid, {confirm, MessageSequenceNumber}).
+
%%---------------------------------------------------------------------------
@@ -262,6 +267,10 @@ handle_cast(multiple_ack_flush,
end,
{noreply, State#ch{confirm = C#confirm{held_acks = gb_sets:new(),
tref = undefined}}}.
+%handle_cast({confirm, MsgSeqNo}, State) ->
+% rabbit_log:info("got confirm for #~p~n", [MsgSeqNo]),
+% {noreply, SOMETHING MAGIC
+
handle_info({'EXIT', WriterPid, Reason = {writer, send_failed, _Error}},
State = #ch{writer_pid = WriterPid}) ->
@@ -492,6 +501,7 @@ handle_method(#'basic.ack'{delivery_tag = DeliveryTag,
multiple = Multiple},
_, State = #ch{transaction_id = TxnKey,
unacked_message_q = UAMQ}) ->
+ rabbit_log:info("channel received a basic.ack~n"),
{Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, Multiple),
QIncs = ack(TxnKey, Acked),
Participants = [QPid || {QPid, _} <- QIncs],
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 0f52eee84f..714eb206aa 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -1004,6 +1004,7 @@ publish(Msg = #basic_message { is_persistent = IsPersistent },
persistent_count = PCount,
durable = IsDurable,
ram_msg_count = RamMsgCount }) ->
+ rabbit_log:info("message ~p got to variable_queue:publish~n", [Msg#basic_message.guid]),
IsPersistent1 = IsDurable andalso IsPersistent,
MsgStatus = (msg_status(IsPersistent1, SeqId, Msg))
#msg_status { is_delivered = IsDelivered, msg_on_disk = MsgOnDisk },