diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-06-05 13:32:02 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-06-05 13:32:02 +0100 |
| commit | 0386c4a11a76a8d9a887c7c562b6b97dbba30f10 (patch) | |
| tree | 8948539894fb113e20a935ec9f2d1b7e882c4527 | |
| parent | 57d7c85c2ff057617815e0cfca7bc5072f2413f6 (diff) | |
| download | rabbitmq-server-git-0386c4a11a76a8d9a887c7c562b6b97dbba30f10.tar.gz | |
Java functional tests now pass.
There was a bad interaction between delivery and the limiter.
Previously, the limiter was only queried when we know we have a message to deliver. However, because of the fact that we now really store messages on disk, we don't want to read in a message only to then be told later than we have no consumer to send it to. Thus the message is not required until we know we can deliver it. The problem is then that there's a possibility that there is no message to deliver, in the mean time we've gone and asked the limiter to let us send.
So the generator we pass into deliver_queue now must respond to the atom is_message_ready and tell us, without side effecting (gimme a type system, ffs), whether we have a message available, without required the message itself be presented. If so, we then check with the limiter, and proceed to deliver.
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 25 |
1 files changed, 19 insertions, 6 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 5f96b84be1..7230e09cd2 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -170,8 +170,11 @@ deliver_queue(Fun, FunAcc0, C = #cr{limiter_pid = LimiterPid, unsent_message_count = Count, unacked_messages = UAM} = ch_record(ChPid), - case not(AckRequired) orelse rabbit_limiter:can_send( - LimiterPid, self()) of + IsMsgReady = Fun(is_message_ready, FunAcc0, State), + case not(AckRequired) orelse + ( IsMsgReady andalso + rabbit_limiter:can_send( LimiterPid, self()) + ) of true -> case Fun(AckRequired, FunAcc0, State) of {empty, FunAcc1, State2} -> @@ -199,15 +202,21 @@ deliver_queue(Fun, FunAcc0, true -> deliver_queue(Fun, FunAcc1, State3) end end; - false -> + %% if IsMsgReady then (AckRequired and we've hit the limiter) + false when IsMsgReady -> store_ch_record(C#cr{is_limit_active = true}), NewConsumers = block_consumers(ChPid, RoundRobinTail), - deliver_queue(Fun, FunAcc0, State #q { round_robin = NewConsumers }) + deliver_queue(Fun, FunAcc0, State #q { round_robin = NewConsumers }); + false -> + %% no message was ready, so we don't need to block anyone + {FunAcc0, State} end; {empty, _} -> {FunAcc0, State} end. +deliver_from_queue(is_message_ready, undefined, #q { mixed_state = MS }) -> + not(rabbit_mixed_queue:is_empty(MS)); deliver_from_queue(AckRequired, Acc = undefined, State = #q { mixed_state = MS }) -> {Res, MS2} = rabbit_mixed_queue:deliver(MS), MS3 = case {Res, AckRequired} of @@ -225,7 +234,9 @@ run_message_queue(State) -> attempt_immediate_delivery(none, Msg, State) -> Fun = - fun (AckRequired, false, State2) -> + fun (is_message_ready, false, _State) -> + true; + (AckRequired, false, State2) -> {AckTag, State3} = if AckRequired -> {ok, AckTag2, MS} = rabbit_mixed_queue:publish_delivered(Msg, @@ -255,7 +266,7 @@ deliver_or_enqueue(Txn, Msg, State) -> %% all these messages have already been delivered at least once and %% not ack'd, but need to be either redelivered or requeued deliver_or_requeue_n([], State) -> - State; + run_message_queue(State); deliver_or_requeue_n(MsgsWithAcks, State) -> {{_RemainingLengthMinusOne, AutoAcks, OutstandingMsgs}, NewState} = deliver_queue(fun deliver_or_requeue_msgs/3, {length(MsgsWithAcks) - 1, [], MsgsWithAcks}, State), @@ -266,6 +277,8 @@ deliver_or_requeue_n(MsgsWithAcks, State) -> NewState #q { mixed_state = MS2 } end. +deliver_or_requeue_msgs(is_message_ready, {Len, _AcksAcc, _MsgsWithAcks}, _State) -> + -1 < Len; deliver_or_requeue_msgs(false, {Len, AcksAcc, [{Msg, AckTag} | MsgsWithAcks]}, State) -> {{Msg, true, noack, Len}, {Len - 1, [AckTag|AcksAcc], MsgsWithAcks}, State}; deliver_or_requeue_msgs(true, {Len, AcksAcc, [{Msg, AckTag} | MsgsWithAcks]}, State) -> |
