summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-06-05 13:32:02 +0100
committerMatthew Sackman <matthew@lshift.net>2009-06-05 13:32:02 +0100
commit0386c4a11a76a8d9a887c7c562b6b97dbba30f10 (patch)
tree8948539894fb113e20a935ec9f2d1b7e882c4527
parent57d7c85c2ff057617815e0cfca7bc5072f2413f6 (diff)
downloadrabbitmq-server-git-0386c4a11a76a8d9a887c7c562b6b97dbba30f10.tar.gz
Java functional tests now pass.
There was a bad interaction between delivery and the limiter. Previously, the limiter was only queried when we know we have a message to deliver. However, because of the fact that we now really store messages on disk, we don't want to read in a message only to then be told later than we have no consumer to send it to. Thus the message is not required until we know we can deliver it. The problem is then that there's a possibility that there is no message to deliver, in the mean time we've gone and asked the limiter to let us send. So the generator we pass into deliver_queue now must respond to the atom is_message_ready and tell us, without side effecting (gimme a type system, ffs), whether we have a message available, without required the message itself be presented. If so, we then check with the limiter, and proceed to deliver.
-rw-r--r--src/rabbit_amqqueue_process.erl25
1 files changed, 19 insertions, 6 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 5f96b84be1..7230e09cd2 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -170,8 +170,11 @@ deliver_queue(Fun, FunAcc0,
C = #cr{limiter_pid = LimiterPid,
unsent_message_count = Count,
unacked_messages = UAM} = ch_record(ChPid),
- case not(AckRequired) orelse rabbit_limiter:can_send(
- LimiterPid, self()) of
+ IsMsgReady = Fun(is_message_ready, FunAcc0, State),
+ case not(AckRequired) orelse
+ ( IsMsgReady andalso
+ rabbit_limiter:can_send( LimiterPid, self())
+ ) of
true ->
case Fun(AckRequired, FunAcc0, State) of
{empty, FunAcc1, State2} ->
@@ -199,15 +202,21 @@ deliver_queue(Fun, FunAcc0,
true -> deliver_queue(Fun, FunAcc1, State3)
end
end;
- false ->
+ %% if IsMsgReady then (AckRequired and we've hit the limiter)
+ false when IsMsgReady ->
store_ch_record(C#cr{is_limit_active = true}),
NewConsumers = block_consumers(ChPid, RoundRobinTail),
- deliver_queue(Fun, FunAcc0, State #q { round_robin = NewConsumers })
+ deliver_queue(Fun, FunAcc0, State #q { round_robin = NewConsumers });
+ false ->
+ %% no message was ready, so we don't need to block anyone
+ {FunAcc0, State}
end;
{empty, _} ->
{FunAcc0, State}
end.
+deliver_from_queue(is_message_ready, undefined, #q { mixed_state = MS }) ->
+ not(rabbit_mixed_queue:is_empty(MS));
deliver_from_queue(AckRequired, Acc = undefined, State = #q { mixed_state = MS }) ->
{Res, MS2} = rabbit_mixed_queue:deliver(MS),
MS3 = case {Res, AckRequired} of
@@ -225,7 +234,9 @@ run_message_queue(State) ->
attempt_immediate_delivery(none, Msg, State) ->
Fun =
- fun (AckRequired, false, State2) ->
+ fun (is_message_ready, false, _State) ->
+ true;
+ (AckRequired, false, State2) ->
{AckTag, State3} =
if AckRequired ->
{ok, AckTag2, MS} = rabbit_mixed_queue:publish_delivered(Msg,
@@ -255,7 +266,7 @@ deliver_or_enqueue(Txn, Msg, State) ->
%% all these messages have already been delivered at least once and
%% not ack'd, but need to be either redelivered or requeued
deliver_or_requeue_n([], State) ->
- State;
+ run_message_queue(State);
deliver_or_requeue_n(MsgsWithAcks, State) ->
{{_RemainingLengthMinusOne, AutoAcks, OutstandingMsgs}, NewState} =
deliver_queue(fun deliver_or_requeue_msgs/3, {length(MsgsWithAcks) - 1, [], MsgsWithAcks}, State),
@@ -266,6 +277,8 @@ deliver_or_requeue_n(MsgsWithAcks, State) ->
NewState #q { mixed_state = MS2 }
end.
+deliver_or_requeue_msgs(is_message_ready, {Len, _AcksAcc, _MsgsWithAcks}, _State) ->
+ -1 < Len;
deliver_or_requeue_msgs(false, {Len, AcksAcc, [{Msg, AckTag} | MsgsWithAcks]}, State) ->
{{Msg, true, noack, Len}, {Len - 1, [AckTag|AcksAcc], MsgsWithAcks}, State};
deliver_or_requeue_msgs(true, {Len, AcksAcc, [{Msg, AckTag} | MsgsWithAcks]}, State) ->