diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-05-21 18:13:16 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-05-21 18:13:16 +0100 |
| commit | 809d0768693f6a8b5218874848ef7c3e439a9840 (patch) | |
| tree | 3499f65967133d4a681b2ddc65ac18c962e02776 | |
| parent | 3bbe9499b556f2871e41d9ac76646ce635925a7d (diff) | |
| download | rabbitmq-server-git-809d0768693f6a8b5218874848ef7c3e439a9840.tar.gz | |
Still WIP + DNC. However, deliver_queue is now rather funky, pretty much incorporating a HO fold. This does rather simplify matters as it means we get told inadvance that we need to produce a message only when we have a consumer for it, and at the same time we get told about whether this message is going to get an explicit ack, and we get an accumulator to play with as well. Pretty nifty - has simplified code elsewhere.
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 87 |
1 files changed, 47 insertions, 40 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 417c3f02b0..5e94134623 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -158,7 +158,7 @@ ch_record_state_transition(OldCR, NewCR) -> true -> ok end. -deliver_queue(Fun, +deliver_queue(Fun, FunAcc0, State = #q{q = #amqqueue{name = QName}, round_robin = RoundRobin, next_msg_id = NextId}) -> @@ -172,10 +172,10 @@ deliver_queue(Fun, case not(AckRequired) orelse rabbit_limiter:can_send( LimiterPid, self()) of true -> - case Fun(State) of - {empty, State2} -> - {empty, State2}; - {{MsgId, Msg, MsgSize, IsDelivered, AckTag, Remaining}, State2} -> + case Fun(AckRequired, FunAcc0, State) of + {empty, FunAcc1, State2} -> + {FunAcc1, State2}; + {{MsgId, Msg, MsgSize, IsDelivered, AckTag, Remaining}, FunAcc1, State2} -> rabbit_channel:deliver( ChPid, ConsumerTag, AckRequired, {QName, self(), NextId, Delivered, Message}), %% TODO FIXME @@ -194,46 +194,51 @@ deliver_queue(Fun, State3 = State2 #q { round_robin = NewConsumers, next_msg_id = NextId + 1 }, - if Remaining == 0 -> {offered, AckRequired, State3}; - true -> deliver_queue(Fun, State3) + if Remaining == 0 -> {FunAcc1, State3}; + true -> deliver_queue(Fun, FunAcc1, State3) end end; false -> store_ch_record(C#cr{is_limit_active = true}), NewConsumers = block_consumers(ChPid, RoundRobinTail), - deliver_queue(Fun, State#q{round_robin = NewConsumers}) + deliver_queue(Fun, FunAcc0, State#q{round_robin = NewConsumers}) end; {empty, _} -> - {not_offered, State} + {FunAcc0, State} end. -deliver_from_queue(State = #q { mixed_state = MS }) -> +deliver_from_queue(AckRequired, Acc, State = #q { mixed_state = MS }) -> {Res, MS2} = rabbit_mixed_queue:deliver(MS), - {Res, State #q { mixed_state = MS2 }}. + MS3 = case {Res, AckRequired} of + {empty, _} -> MS2; + {_, true} -> MS2; + {{_MsgId, _Msg, _MsgSize, _IsDelivered, AckTag, _Remaining}, false} -> + {ok, MS4} = rabbit_mixed_queue:ack([AckTag], MS2), + MS3 + end, + {Res, Acc, State #q { mixed_state = MS3 }}. run_message_queue(State) -> - case deliver_queue(deliver_from_queue/1, State) of - {not_offered, State2} -> - State2; - {empty, State2} -> - State2; - {offered, _AckRequired, State2} -> - State2 - end. + {undefined, State2} = deliver_queue(deliver_from_queue/1, undefined, State), + State2. attempt_delivery(none, Message, State) -> - Fun = fun (State2) -> {{MsgId, Message, MsgSize, false, AckTag, 0}, State2} end, %% TODO FIX ME - case deliver_queue(Fun, State) of - {offered, false, State1} -> - {true, State1}; - {offered, true, State1} -> - MS = rabbit_mixed_queue:publish_delivered(Message, State1 #q.mixed_state), %% TODO API CHANGE - {true, State1 #q { mixed_state = MS }}; - {not_offered, State1} -> - {false, State1} - end; + Fun = + fun (AckRequired, false, State2) -> + {AckTag, State3} = + if AckRequired -> + %% TODO API CHANGE + {ok, MS, AckTag2} = rabbit_mixed_queue:publish_delivered(Message, + State2 #q.mixed_state), + {AckTag2, State2 #q { mixed_state = MS }}; + true -> + {noack, State2} + end, + {{MsgId, Message, MsgSize, false, AckTag, 0}, true, State3} %% TODO FIX ME + end, + deliver_queue(Fun, false, State); attempt_delivery(Txn, Message, State) -> - MS = rabbit_mixed_queue:tx_publish(Message, State #q.mixed_state), %% TODO API CHANGE + {ok, MS} = rabbit_mixed_queue:tx_publish(Message, State #q.mixed_state), %% TODO API CHANGE record_pending_message(Txn, Message), {true, State #q { mixed_state = MS }}. @@ -242,8 +247,8 @@ deliver_or_enqueue(Txn, Message, State) -> {true, NewState} -> {true, NewState}; {false, NewState} -> - %% Txn is none - MS = rabbit_mixed_queue:publish(Message, State #q.mixed_state), %% TODO API CHANGE + %% Txn is none and no unblocked channels with consumers + {ok, MS} = rabbit_mixed_queue:publish(Message, State #q.mixed_state), %% TODO API CHANGE {false, NewState #q { mixed_state = MS }} end. @@ -259,14 +264,16 @@ deliver_or_requeue_n(Messages, State) -> State #q { mixed_state = MS2 } end. -deliver_or_requeue_msg(Message, {AcksAcc, State}) -> - Fun = fun (State2) -> {{MsgId, Message, MsgSize, true, AckTag, 0}, State2} end, %% TODO FIX ME - case deliver_queue(Fun, State) of - {offered, true, State1} -> - {true, {AcksAcc, State1}}; - {offered, false, State1} -> - {true, {[AckTag|AcksAcc], State1}}; %% TODO FIXME where does AckTag come from?! - {not_offered, State1} -> +deliver_or_requeue_msg(Message, {AcksAcc, State}) -> %% TODO the acktag really should be within the msg here + Fun = fun (AckRequired, {false, AcksAcc}, State2) -> + AcksAcc2 = if AckRequired -> AcksAcc; + true -> [AckTag|AcksAcc] + end, + {{MsgId, Message, MsgSize, true, AckTag, 0}, {true, AcksAcc2}, State2} end, %% TODO FIX ME + case deliver_queue(Fun, {false, AcksAcc}, State) of + {{true, AcksAcc3}, State1} -> + {true, {AcksAcc3, State1}}; + {{false, AcksAcc}, State1} -> {false, {AcksAcc, State1}} end. |
