summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_amqqueue_process.erl87
1 files changed, 47 insertions, 40 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 417c3f02b0..5e94134623 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -158,7 +158,7 @@ ch_record_state_transition(OldCR, NewCR) ->
true -> ok
end.
-deliver_queue(Fun,
+deliver_queue(Fun, FunAcc0,
State = #q{q = #amqqueue{name = QName},
round_robin = RoundRobin,
next_msg_id = NextId}) ->
@@ -172,10 +172,10 @@ deliver_queue(Fun,
case not(AckRequired) orelse rabbit_limiter:can_send(
LimiterPid, self()) of
true ->
- case Fun(State) of
- {empty, State2} ->
- {empty, State2};
- {{MsgId, Msg, MsgSize, IsDelivered, AckTag, Remaining}, State2} ->
+ case Fun(AckRequired, FunAcc0, State) of
+ {empty, FunAcc1, State2} ->
+ {FunAcc1, State2};
+ {{MsgId, Msg, MsgSize, IsDelivered, AckTag, Remaining}, FunAcc1, State2} ->
rabbit_channel:deliver(
ChPid, ConsumerTag, AckRequired,
{QName, self(), NextId, Delivered, Message}), %% TODO FIXME
@@ -194,46 +194,51 @@ deliver_queue(Fun,
State3 = State2 #q { round_robin = NewConsumers,
next_msg_id = NextId + 1
},
- if Remaining == 0 -> {offered, AckRequired, State3};
- true -> deliver_queue(Fun, State3)
+ if Remaining == 0 -> {FunAcc1, State3};
+ true -> deliver_queue(Fun, FunAcc1, State3)
end
end;
false ->
store_ch_record(C#cr{is_limit_active = true}),
NewConsumers = block_consumers(ChPid, RoundRobinTail),
- deliver_queue(Fun, State#q{round_robin = NewConsumers})
+ deliver_queue(Fun, FunAcc0, State#q{round_robin = NewConsumers})
end;
{empty, _} ->
- {not_offered, State}
+ {FunAcc0, State}
end.
-deliver_from_queue(State = #q { mixed_state = MS }) ->
+deliver_from_queue(AckRequired, Acc, State = #q { mixed_state = MS }) ->
{Res, MS2} = rabbit_mixed_queue:deliver(MS),
- {Res, State #q { mixed_state = MS2 }}.
+ MS3 = case {Res, AckRequired} of
+ {empty, _} -> MS2;
+ {_, true} -> MS2;
+ {{_MsgId, _Msg, _MsgSize, _IsDelivered, AckTag, _Remaining}, false} ->
+ {ok, MS4} = rabbit_mixed_queue:ack([AckTag], MS2),
+ MS3
+ end,
+ {Res, Acc, State #q { mixed_state = MS3 }}.
run_message_queue(State) ->
- case deliver_queue(deliver_from_queue/1, State) of
- {not_offered, State2} ->
- State2;
- {empty, State2} ->
- State2;
- {offered, _AckRequired, State2} ->
- State2
- end.
+ {undefined, State2} = deliver_queue(deliver_from_queue/1, undefined, State),
+ State2.
attempt_delivery(none, Message, State) ->
- Fun = fun (State2) -> {{MsgId, Message, MsgSize, false, AckTag, 0}, State2} end, %% TODO FIX ME
- case deliver_queue(Fun, State) of
- {offered, false, State1} ->
- {true, State1};
- {offered, true, State1} ->
- MS = rabbit_mixed_queue:publish_delivered(Message, State1 #q.mixed_state), %% TODO API CHANGE
- {true, State1 #q { mixed_state = MS }};
- {not_offered, State1} ->
- {false, State1}
- end;
+ Fun =
+ fun (AckRequired, false, State2) ->
+ {AckTag, State3} =
+ if AckRequired ->
+ %% TODO API CHANGE
+ {ok, MS, AckTag2} = rabbit_mixed_queue:publish_delivered(Message,
+ State2 #q.mixed_state),
+ {AckTag2, State2 #q { mixed_state = MS }};
+ true ->
+ {noack, State2}
+ end,
+ {{MsgId, Message, MsgSize, false, AckTag, 0}, true, State3} %% TODO FIX ME
+ end,
+ deliver_queue(Fun, false, State);
attempt_delivery(Txn, Message, State) ->
- MS = rabbit_mixed_queue:tx_publish(Message, State #q.mixed_state), %% TODO API CHANGE
+ {ok, MS} = rabbit_mixed_queue:tx_publish(Message, State #q.mixed_state), %% TODO API CHANGE
record_pending_message(Txn, Message),
{true, State #q { mixed_state = MS }}.
@@ -242,8 +247,8 @@ deliver_or_enqueue(Txn, Message, State) ->
{true, NewState} ->
{true, NewState};
{false, NewState} ->
- %% Txn is none
- MS = rabbit_mixed_queue:publish(Message, State #q.mixed_state), %% TODO API CHANGE
+ %% Txn is none and no unblocked channels with consumers
+ {ok, MS} = rabbit_mixed_queue:publish(Message, State #q.mixed_state), %% TODO API CHANGE
{false, NewState #q { mixed_state = MS }}
end.
@@ -259,14 +264,16 @@ deliver_or_requeue_n(Messages, State) ->
State #q { mixed_state = MS2 }
end.
-deliver_or_requeue_msg(Message, {AcksAcc, State}) ->
- Fun = fun (State2) -> {{MsgId, Message, MsgSize, true, AckTag, 0}, State2} end, %% TODO FIX ME
- case deliver_queue(Fun, State) of
- {offered, true, State1} ->
- {true, {AcksAcc, State1}};
- {offered, false, State1} ->
- {true, {[AckTag|AcksAcc], State1}}; %% TODO FIXME where does AckTag come from?!
- {not_offered, State1} ->
+deliver_or_requeue_msg(Message, {AcksAcc, State}) -> %% TODO the acktag really should be within the msg here
+ Fun = fun (AckRequired, {false, AcksAcc}, State2) ->
+ AcksAcc2 = if AckRequired -> AcksAcc;
+ true -> [AckTag|AcksAcc]
+ end,
+ {{MsgId, Message, MsgSize, true, AckTag, 0}, {true, AcksAcc2}, State2} end, %% TODO FIX ME
+ case deliver_queue(Fun, {false, AcksAcc}, State) of
+ {{true, AcksAcc3}, State1} ->
+ {true, {AcksAcc3, State1}};
+ {{false, AcksAcc}, State1} ->
{false, {AcksAcc, State1}}
end.