diff options
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 87 |
1 files changed, 47 insertions, 40 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 417c3f02b0..5e94134623 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -158,7 +158,7 @@ ch_record_state_transition(OldCR, NewCR) -> true -> ok end. -deliver_queue(Fun, +deliver_queue(Fun, FunAcc0, State = #q{q = #amqqueue{name = QName}, round_robin = RoundRobin, next_msg_id = NextId}) -> @@ -172,10 +172,10 @@ deliver_queue(Fun, case not(AckRequired) orelse rabbit_limiter:can_send( LimiterPid, self()) of true -> - case Fun(State) of - {empty, State2} -> - {empty, State2}; - {{MsgId, Msg, MsgSize, IsDelivered, AckTag, Remaining}, State2} -> + case Fun(AckRequired, FunAcc0, State) of + {empty, FunAcc1, State2} -> + {FunAcc1, State2}; + {{MsgId, Msg, MsgSize, IsDelivered, AckTag, Remaining}, FunAcc1, State2} -> rabbit_channel:deliver( ChPid, ConsumerTag, AckRequired, {QName, self(), NextId, Delivered, Message}), %% TODO FIXME @@ -194,46 +194,51 @@ deliver_queue(Fun, State3 = State2 #q { round_robin = NewConsumers, next_msg_id = NextId + 1 }, - if Remaining == 0 -> {offered, AckRequired, State3}; - true -> deliver_queue(Fun, State3) + if Remaining == 0 -> {FunAcc1, State3}; + true -> deliver_queue(Fun, FunAcc1, State3) end end; false -> store_ch_record(C#cr{is_limit_active = true}), NewConsumers = block_consumers(ChPid, RoundRobinTail), - deliver_queue(Fun, State#q{round_robin = NewConsumers}) + deliver_queue(Fun, FunAcc0, State#q{round_robin = NewConsumers}) end; {empty, _} -> - {not_offered, State} + {FunAcc0, State} end. -deliver_from_queue(State = #q { mixed_state = MS }) -> +deliver_from_queue(AckRequired, Acc, State = #q { mixed_state = MS }) -> {Res, MS2} = rabbit_mixed_queue:deliver(MS), - {Res, State #q { mixed_state = MS2 }}. + MS3 = case {Res, AckRequired} of + {empty, _} -> MS2; + {_, true} -> MS2; + {{_MsgId, _Msg, _MsgSize, _IsDelivered, AckTag, _Remaining}, false} -> + {ok, MS4} = rabbit_mixed_queue:ack([AckTag], MS2), + MS3 + end, + {Res, Acc, State #q { mixed_state = MS3 }}. run_message_queue(State) -> - case deliver_queue(deliver_from_queue/1, State) of - {not_offered, State2} -> - State2; - {empty, State2} -> - State2; - {offered, _AckRequired, State2} -> - State2 - end. + {undefined, State2} = deliver_queue(deliver_from_queue/1, undefined, State), + State2. attempt_delivery(none, Message, State) -> - Fun = fun (State2) -> {{MsgId, Message, MsgSize, false, AckTag, 0}, State2} end, %% TODO FIX ME - case deliver_queue(Fun, State) of - {offered, false, State1} -> - {true, State1}; - {offered, true, State1} -> - MS = rabbit_mixed_queue:publish_delivered(Message, State1 #q.mixed_state), %% TODO API CHANGE - {true, State1 #q { mixed_state = MS }}; - {not_offered, State1} -> - {false, State1} - end; + Fun = + fun (AckRequired, false, State2) -> + {AckTag, State3} = + if AckRequired -> + %% TODO API CHANGE + {ok, MS, AckTag2} = rabbit_mixed_queue:publish_delivered(Message, + State2 #q.mixed_state), + {AckTag2, State2 #q { mixed_state = MS }}; + true -> + {noack, State2} + end, + {{MsgId, Message, MsgSize, false, AckTag, 0}, true, State3} %% TODO FIX ME + end, + deliver_queue(Fun, false, State); attempt_delivery(Txn, Message, State) -> - MS = rabbit_mixed_queue:tx_publish(Message, State #q.mixed_state), %% TODO API CHANGE + {ok, MS} = rabbit_mixed_queue:tx_publish(Message, State #q.mixed_state), %% TODO API CHANGE record_pending_message(Txn, Message), {true, State #q { mixed_state = MS }}. @@ -242,8 +247,8 @@ deliver_or_enqueue(Txn, Message, State) -> {true, NewState} -> {true, NewState}; {false, NewState} -> - %% Txn is none - MS = rabbit_mixed_queue:publish(Message, State #q.mixed_state), %% TODO API CHANGE + %% Txn is none and no unblocked channels with consumers + {ok, MS} = rabbit_mixed_queue:publish(Message, State #q.mixed_state), %% TODO API CHANGE {false, NewState #q { mixed_state = MS }} end. @@ -259,14 +264,16 @@ deliver_or_requeue_n(Messages, State) -> State #q { mixed_state = MS2 } end. -deliver_or_requeue_msg(Message, {AcksAcc, State}) -> - Fun = fun (State2) -> {{MsgId, Message, MsgSize, true, AckTag, 0}, State2} end, %% TODO FIX ME - case deliver_queue(Fun, State) of - {offered, true, State1} -> - {true, {AcksAcc, State1}}; - {offered, false, State1} -> - {true, {[AckTag|AcksAcc], State1}}; %% TODO FIXME where does AckTag come from?! - {not_offered, State1} -> +deliver_or_requeue_msg(Message, {AcksAcc, State}) -> %% TODO the acktag really should be within the msg here + Fun = fun (AckRequired, {false, AcksAcc}, State2) -> + AcksAcc2 = if AckRequired -> AcksAcc; + true -> [AckTag|AcksAcc] + end, + {{MsgId, Message, MsgSize, true, AckTag, 0}, {true, AcksAcc2}, State2} end, %% TODO FIX ME + case deliver_queue(Fun, {false, AcksAcc}, State) of + {{true, AcksAcc3}, State1} -> + {true, {AcksAcc3, State1}}; + {{false, AcksAcc}, State1} -> {false, {AcksAcc, State1}} end. |
