summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-05-21 18:13:16 +0100
committerMatthew Sackman <matthew@lshift.net>2009-05-21 18:13:16 +0100
commit809d0768693f6a8b5218874848ef7c3e439a9840 (patch)
tree3499f65967133d4a681b2ddc65ac18c962e02776 /src
parent3bbe9499b556f2871e41d9ac76646ce635925a7d (diff)
downloadrabbitmq-server-git-809d0768693f6a8b5218874848ef7c3e439a9840.tar.gz
Still WIP + DNC. However, deliver_queue is now rather funky, pretty much incorporating a HO fold. This does rather simplify matters as it means we get told inadvance that we need to produce a message only when we have a consumer for it, and at the same time we get told about whether this message is going to get an explicit ack, and we get an accumulator to play with as well. Pretty nifty - has simplified code elsewhere.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue_process.erl87
1 files changed, 47 insertions, 40 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 417c3f02b0..5e94134623 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -158,7 +158,7 @@ ch_record_state_transition(OldCR, NewCR) ->
true -> ok
end.
-deliver_queue(Fun,
+deliver_queue(Fun, FunAcc0,
State = #q{q = #amqqueue{name = QName},
round_robin = RoundRobin,
next_msg_id = NextId}) ->
@@ -172,10 +172,10 @@ deliver_queue(Fun,
case not(AckRequired) orelse rabbit_limiter:can_send(
LimiterPid, self()) of
true ->
- case Fun(State) of
- {empty, State2} ->
- {empty, State2};
- {{MsgId, Msg, MsgSize, IsDelivered, AckTag, Remaining}, State2} ->
+ case Fun(AckRequired, FunAcc0, State) of
+ {empty, FunAcc1, State2} ->
+ {FunAcc1, State2};
+ {{MsgId, Msg, MsgSize, IsDelivered, AckTag, Remaining}, FunAcc1, State2} ->
rabbit_channel:deliver(
ChPid, ConsumerTag, AckRequired,
{QName, self(), NextId, Delivered, Message}), %% TODO FIXME
@@ -194,46 +194,51 @@ deliver_queue(Fun,
State3 = State2 #q { round_robin = NewConsumers,
next_msg_id = NextId + 1
},
- if Remaining == 0 -> {offered, AckRequired, State3};
- true -> deliver_queue(Fun, State3)
+ if Remaining == 0 -> {FunAcc1, State3};
+ true -> deliver_queue(Fun, FunAcc1, State3)
end
end;
false ->
store_ch_record(C#cr{is_limit_active = true}),
NewConsumers = block_consumers(ChPid, RoundRobinTail),
- deliver_queue(Fun, State#q{round_robin = NewConsumers})
+ deliver_queue(Fun, FunAcc0, State#q{round_robin = NewConsumers})
end;
{empty, _} ->
- {not_offered, State}
+ {FunAcc0, State}
end.
-deliver_from_queue(State = #q { mixed_state = MS }) ->
+deliver_from_queue(AckRequired, Acc, State = #q { mixed_state = MS }) ->
{Res, MS2} = rabbit_mixed_queue:deliver(MS),
- {Res, State #q { mixed_state = MS2 }}.
+ MS3 = case {Res, AckRequired} of
+ {empty, _} -> MS2;
+ {_, true} -> MS2;
+ {{_MsgId, _Msg, _MsgSize, _IsDelivered, AckTag, _Remaining}, false} ->
+ {ok, MS4} = rabbit_mixed_queue:ack([AckTag], MS2),
+ MS3
+ end,
+ {Res, Acc, State #q { mixed_state = MS3 }}.
run_message_queue(State) ->
- case deliver_queue(deliver_from_queue/1, State) of
- {not_offered, State2} ->
- State2;
- {empty, State2} ->
- State2;
- {offered, _AckRequired, State2} ->
- State2
- end.
+ {undefined, State2} = deliver_queue(deliver_from_queue/1, undefined, State),
+ State2.
attempt_delivery(none, Message, State) ->
- Fun = fun (State2) -> {{MsgId, Message, MsgSize, false, AckTag, 0}, State2} end, %% TODO FIX ME
- case deliver_queue(Fun, State) of
- {offered, false, State1} ->
- {true, State1};
- {offered, true, State1} ->
- MS = rabbit_mixed_queue:publish_delivered(Message, State1 #q.mixed_state), %% TODO API CHANGE
- {true, State1 #q { mixed_state = MS }};
- {not_offered, State1} ->
- {false, State1}
- end;
+ Fun =
+ fun (AckRequired, false, State2) ->
+ {AckTag, State3} =
+ if AckRequired ->
+ %% TODO API CHANGE
+ {ok, MS, AckTag2} = rabbit_mixed_queue:publish_delivered(Message,
+ State2 #q.mixed_state),
+ {AckTag2, State2 #q { mixed_state = MS }};
+ true ->
+ {noack, State2}
+ end,
+ {{MsgId, Message, MsgSize, false, AckTag, 0}, true, State3} %% TODO FIX ME
+ end,
+ deliver_queue(Fun, false, State);
attempt_delivery(Txn, Message, State) ->
- MS = rabbit_mixed_queue:tx_publish(Message, State #q.mixed_state), %% TODO API CHANGE
+ {ok, MS} = rabbit_mixed_queue:tx_publish(Message, State #q.mixed_state), %% TODO API CHANGE
record_pending_message(Txn, Message),
{true, State #q { mixed_state = MS }}.
@@ -242,8 +247,8 @@ deliver_or_enqueue(Txn, Message, State) ->
{true, NewState} ->
{true, NewState};
{false, NewState} ->
- %% Txn is none
- MS = rabbit_mixed_queue:publish(Message, State #q.mixed_state), %% TODO API CHANGE
+ %% Txn is none and no unblocked channels with consumers
+ {ok, MS} = rabbit_mixed_queue:publish(Message, State #q.mixed_state), %% TODO API CHANGE
{false, NewState #q { mixed_state = MS }}
end.
@@ -259,14 +264,16 @@ deliver_or_requeue_n(Messages, State) ->
State #q { mixed_state = MS2 }
end.
-deliver_or_requeue_msg(Message, {AcksAcc, State}) ->
- Fun = fun (State2) -> {{MsgId, Message, MsgSize, true, AckTag, 0}, State2} end, %% TODO FIX ME
- case deliver_queue(Fun, State) of
- {offered, true, State1} ->
- {true, {AcksAcc, State1}};
- {offered, false, State1} ->
- {true, {[AckTag|AcksAcc], State1}}; %% TODO FIXME where does AckTag come from?!
- {not_offered, State1} ->
+deliver_or_requeue_msg(Message, {AcksAcc, State}) -> %% TODO the acktag really should be within the msg here
+ Fun = fun (AckRequired, {false, AcksAcc}, State2) ->
+ AcksAcc2 = if AckRequired -> AcksAcc;
+ true -> [AckTag|AcksAcc]
+ end,
+ {{MsgId, Message, MsgSize, true, AckTag, 0}, {true, AcksAcc2}, State2} end, %% TODO FIX ME
+ case deliver_queue(Fun, {false, AcksAcc}, State) of
+ {{true, AcksAcc3}, State1} ->
+ {true, {AcksAcc3, State1}};
+ {{false, AcksAcc}, State1} ->
{false, {AcksAcc, State1}}
end.