summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAlexandru Scvortov <alexandru@rabbitmq.com>2010-08-13 17:42:03 +0100
committerAlexandru Scvortov <alexandru@rabbitmq.com>2010-08-13 17:42:03 +0100
commitfc62811384f293e4e82e583b18991f9daaac0cf6 (patch)
tree9308b72ed4efe3dd95a5cd9cc4f813a811f22467 /src
parent1786a75a72991b99788bdb515f92573b6f32e501 (diff)
downloadrabbitmq-server-git-fc62811384f293e4e82e583b18991f9daaac0cf6.tar.gz
refactoring
What's done: - PubAck after transient messages - PubAck after basic.returns - PubAck after message delivered to a consumer (disregarding consumer acks) - PubAck after message got - out of order ack'ing - multiple ack'ing Whant's not done: - PubAck de-duplication - PubAck after message hits disk
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue_process.erl9
-rw-r--r--src/rabbit_channel.erl9
-rw-r--r--src/rabbit_router.erl1
3 files changed, 7 insertions, 12 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 4fd503803f..082de83a19 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -344,6 +344,7 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun }, FunAcc,
rabbit_channel:deliver(
ChPid, ConsumerTag, AckRequired,
{QName, self(), AckTag, IsDelivered, Message}),
+ % PubAck after message delivered to consumer (disregard consumer acks)
confirm_message(Message),
ChAckTags1 = case AckRequired of
true -> sets:add_element(AckTag, ChAckTags);
@@ -399,10 +400,7 @@ deliver_from_queue_deliver(AckRequired, false,
State #q { backing_queue_state = BQS1 }}.
confirm_message(#basic_message{msg_seq_no = MsgSeqNo, origin = ChPid}) ->
- case MsgSeqNo of
- undefined -> ok;
- _ -> rabbit_channel:confirm(ChPid, MsgSeqNo)
- end.
+ rabbit_channel:confirm(ChPid, MsgSeqNo).
run_message_queue(State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
Funs = {fun deliver_from_queue_pred/2,
@@ -413,7 +411,6 @@ run_message_queue(State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
attempt_delivery(none, _ChPid, Message = #basic_message{msg_seq_no = MsgSeqNo},
State = #q{backing_queue = BQ}) ->
- rabbit_log:info("Attempting delivery of message #~p~n", [MsgSeqNo]),
PredFun = fun (IsEmpty, _State) -> not IsEmpty end,
DeliverFun =
fun (AckRequired, false, State1 = #q{backing_queue_state = BQS}) ->
@@ -429,7 +426,6 @@ attempt_delivery(Txn, ChPid, Message,
{true, State#q{backing_queue_state = BQ:tx_publish(Txn, Message, BQS)}}.
deliver_or_enqueue(Txn, ChPid, Message, State = #q{backing_queue = BQ}) ->
- rabbit_log:info("deliver_or_enqueue called for message~n"),
case attempt_delivery(Txn, ChPid, Message, State) of
{true, NewState} ->
{true, NewState};
@@ -686,6 +682,7 @@ handle_call({basic_get, ChPid, NoAck}, _From,
case BQ:fetch(AckRequired, BQS) of
{empty, BQS1} -> reply(empty, State1#q{backing_queue_state = BQS1});
{{Message, IsDelivered, AckTag, Remaining}, BQS1} ->
+ % PubAck after message got
confirm_message(Message),
case AckRequired of
true -> C = #cr{acktags = ChAckTags} = ch_record(ChPid),
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 85bae8e45b..21d4ff2a4c 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -268,7 +268,6 @@ handle_cast(multiple_ack_flush,
{noreply, State#ch{confirm = C#confirm{held_acks = gb_sets:new(),
tref = undefined}}};
handle_cast({confirm, MsgSeqNo}, State) ->
- rabbit_log:info("got confirm for #~p~n", [MsgSeqNo]),
{noreply, send_or_enqueue_ack(MsgSeqNo, State)}.
@@ -424,6 +423,8 @@ queue_blocked(QPid, State = #ch{blocking = Blocking}) ->
State#ch{blocking = Blocking1}
end.
+send_or_enqueue_ack(undefined, State) ->
+ State;
send_or_enqueue_ack(_, State = #ch{confirm = #confirm{enabled = false}}) ->
State;
send_or_enqueue_ack(MsgSeqNo,
@@ -469,6 +470,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
%% certain to want to look at delivery-mode and priority.
DecodedContent = rabbit_binary_parser:ensure_content_decoded(Content),
IsPersistent = is_message_persistent(DecodedContent),
+ % PubAck transient messages immediately
{MsgSeqNo, State1}
= case State#ch.confirm#confirm.enabled of
false ->
@@ -497,6 +499,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
rabbit_exchange:publish(
Exchange,
rabbit_basic:delivery(Mandatory, Immediate, TxnKey, Message)),
+ % PubAck after basic.returns
State2 = case RoutingRes of
routed -> State1;
unroutable ->
@@ -1282,19 +1285,15 @@ erase_queue_stats(QPid) ->
{{queue_exchange_stats, QX = {QPid0, _}}, _} <- get(), QPid =:= QPid0].
start_ack_timer(State = #ch{confirm = C = #confirm{tref = undefined}}) ->
- rabbit_log:info("starting ack timer...~n"),
{ok, TRef} = timer:apply_after(?MULTIPLE_ACK_FLUSH_INTERVAL,
?MODULE, flush_multiple_acks, [self()]),
State#ch{confirm = C#confirm{tref = TRef}};
start_ack_timer(State) ->
- rabbit_log:info("timer already started.. nop~n"),
State.
stop_ack_timer(State = #ch{confirm = #confirm{tref = undefined}}) ->
- rabbit_log:info("stopping a stopped ack timer.. nop~n"),
State;
stop_ack_timer(State = #ch{confirm = C = #confirm{tref = TRef}}) ->
- rabbit_log:info("canceling ack timer: ~p~n", [TRef]),
{ok, cancel} = timer:cancel(TRef),
State#ch{confirm = C#confirm{tref = undefined}}.
diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl
index 14d93497c6..0f8611d00f 100644
--- a/src/rabbit_router.erl
+++ b/src/rabbit_router.erl
@@ -66,7 +66,6 @@ deliver(QPids, Delivery = #delivery{mandatory = false,
delegate:invoke_no_result(
QPids, fun (Pid) -> rabbit_amqqueue:deliver(Pid, Delivery) end),
case {QPids, Msg#basic_message.msg_seq_no} of
- {[], undefined} -> ok;
{[], MsgSeqNo} -> rabbit_channel:confirm(Msg#basic_message.origin, MsgSeqNo);
_ -> ok
end,