diff options
| author | Alexandru Scvortov <alexandru@rabbitmq.com> | 2010-08-13 17:42:03 +0100 |
|---|---|---|
| committer | Alexandru Scvortov <alexandru@rabbitmq.com> | 2010-08-13 17:42:03 +0100 |
| commit | fc62811384f293e4e82e583b18991f9daaac0cf6 (patch) | |
| tree | 9308b72ed4efe3dd95a5cd9cc4f813a811f22467 /src | |
| parent | 1786a75a72991b99788bdb515f92573b6f32e501 (diff) | |
| download | rabbitmq-server-git-fc62811384f293e4e82e583b18991f9daaac0cf6.tar.gz | |
refactoring
What's done:
- PubAck after transient messages
- PubAck after basic.returns
- PubAck after message delivered to a consumer (disregarding
consumer acks)
- PubAck after message got
- out of order ack'ing
- multiple ack'ing
Whant's not done:
- PubAck de-duplication
- PubAck after message hits disk
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 9 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 9 | ||||
| -rw-r--r-- | src/rabbit_router.erl | 1 |
3 files changed, 7 insertions, 12 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 4fd503803f..082de83a19 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -344,6 +344,7 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun }, FunAcc, rabbit_channel:deliver( ChPid, ConsumerTag, AckRequired, {QName, self(), AckTag, IsDelivered, Message}), + % PubAck after message delivered to consumer (disregard consumer acks) confirm_message(Message), ChAckTags1 = case AckRequired of true -> sets:add_element(AckTag, ChAckTags); @@ -399,10 +400,7 @@ deliver_from_queue_deliver(AckRequired, false, State #q { backing_queue_state = BQS1 }}. confirm_message(#basic_message{msg_seq_no = MsgSeqNo, origin = ChPid}) -> - case MsgSeqNo of - undefined -> ok; - _ -> rabbit_channel:confirm(ChPid, MsgSeqNo) - end. + rabbit_channel:confirm(ChPid, MsgSeqNo). run_message_queue(State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> Funs = {fun deliver_from_queue_pred/2, @@ -413,7 +411,6 @@ run_message_queue(State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> attempt_delivery(none, _ChPid, Message = #basic_message{msg_seq_no = MsgSeqNo}, State = #q{backing_queue = BQ}) -> - rabbit_log:info("Attempting delivery of message #~p~n", [MsgSeqNo]), PredFun = fun (IsEmpty, _State) -> not IsEmpty end, DeliverFun = fun (AckRequired, false, State1 = #q{backing_queue_state = BQS}) -> @@ -429,7 +426,6 @@ attempt_delivery(Txn, ChPid, Message, {true, State#q{backing_queue_state = BQ:tx_publish(Txn, Message, BQS)}}. deliver_or_enqueue(Txn, ChPid, Message, State = #q{backing_queue = BQ}) -> - rabbit_log:info("deliver_or_enqueue called for message~n"), case attempt_delivery(Txn, ChPid, Message, State) of {true, NewState} -> {true, NewState}; @@ -686,6 +682,7 @@ handle_call({basic_get, ChPid, NoAck}, _From, case BQ:fetch(AckRequired, BQS) of {empty, BQS1} -> reply(empty, State1#q{backing_queue_state = BQS1}); {{Message, IsDelivered, AckTag, Remaining}, BQS1} -> + % PubAck after message got confirm_message(Message), case AckRequired of true -> C = #cr{acktags = ChAckTags} = ch_record(ChPid), diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 85bae8e45b..21d4ff2a4c 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -268,7 +268,6 @@ handle_cast(multiple_ack_flush, {noreply, State#ch{confirm = C#confirm{held_acks = gb_sets:new(), tref = undefined}}}; handle_cast({confirm, MsgSeqNo}, State) -> - rabbit_log:info("got confirm for #~p~n", [MsgSeqNo]), {noreply, send_or_enqueue_ack(MsgSeqNo, State)}. @@ -424,6 +423,8 @@ queue_blocked(QPid, State = #ch{blocking = Blocking}) -> State#ch{blocking = Blocking1} end. +send_or_enqueue_ack(undefined, State) -> + State; send_or_enqueue_ack(_, State = #ch{confirm = #confirm{enabled = false}}) -> State; send_or_enqueue_ack(MsgSeqNo, @@ -469,6 +470,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, %% certain to want to look at delivery-mode and priority. DecodedContent = rabbit_binary_parser:ensure_content_decoded(Content), IsPersistent = is_message_persistent(DecodedContent), + % PubAck transient messages immediately {MsgSeqNo, State1} = case State#ch.confirm#confirm.enabled of false -> @@ -497,6 +499,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, rabbit_exchange:publish( Exchange, rabbit_basic:delivery(Mandatory, Immediate, TxnKey, Message)), + % PubAck after basic.returns State2 = case RoutingRes of routed -> State1; unroutable -> @@ -1282,19 +1285,15 @@ erase_queue_stats(QPid) -> {{queue_exchange_stats, QX = {QPid0, _}}, _} <- get(), QPid =:= QPid0]. start_ack_timer(State = #ch{confirm = C = #confirm{tref = undefined}}) -> - rabbit_log:info("starting ack timer...~n"), {ok, TRef} = timer:apply_after(?MULTIPLE_ACK_FLUSH_INTERVAL, ?MODULE, flush_multiple_acks, [self()]), State#ch{confirm = C#confirm{tref = TRef}}; start_ack_timer(State) -> - rabbit_log:info("timer already started.. nop~n"), State. stop_ack_timer(State = #ch{confirm = #confirm{tref = undefined}}) -> - rabbit_log:info("stopping a stopped ack timer.. nop~n"), State; stop_ack_timer(State = #ch{confirm = C = #confirm{tref = TRef}}) -> - rabbit_log:info("canceling ack timer: ~p~n", [TRef]), {ok, cancel} = timer:cancel(TRef), State#ch{confirm = C#confirm{tref = undefined}}. diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl index 14d93497c6..0f8611d00f 100644 --- a/src/rabbit_router.erl +++ b/src/rabbit_router.erl @@ -66,7 +66,6 @@ deliver(QPids, Delivery = #delivery{mandatory = false, delegate:invoke_no_result( QPids, fun (Pid) -> rabbit_amqqueue:deliver(Pid, Delivery) end), case {QPids, Msg#basic_message.msg_seq_no} of - {[], undefined} -> ok; {[], MsgSeqNo} -> rabbit_channel:confirm(Msg#basic_message.origin, MsgSeqNo); _ -> ok end, |
