diff options
| author | Michael Bridgen <mikeb@rabbitmq.com> | 2011-08-24 15:59:49 +0100 |
|---|---|---|
| committer | Michael Bridgen <mikeb@rabbitmq.com> | 2011-08-24 15:59:49 +0100 |
| commit | 2e206712d07a22471b5d60d75b8b482e0842cfb8 (patch) | |
| tree | abf4be784f9e0d0295f95aea450c8fb88aac68a8 | |
| parent | cedc0b54532a6ce3958a1e07d1e9c31af4e0eff3 (diff) | |
| download | rabbitmq-server-git-2e206712d07a22471b5d60d75b8b482e0842cfb8.tar.gz | |
Rectify some poor conflict resolution choices.
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 9 | ||||
| -rw-r--r-- | src/rabbit_limiter.erl | 13 |
2 files changed, 14 insertions, 8 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 2ca3c572c4..8333b753aa 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -369,7 +369,9 @@ ch_record_state_transition(OldCR, NewCR) -> deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc, State = #q{q = #amqqueue{name = QName}, active_consumers = ActiveConsumers, - blocked_consumers = BlockedConsumers}) -> + blocked_consumers = BlockedConsumers, + backing_queue = BQ, + backing_queue_state = BQS}) -> case queue:out(ActiveConsumers) of {{value, QEntry = {ChPid, #consumer{tag = ConsumerTag, ack_required = AckRequired}}}, @@ -379,7 +381,9 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc, acktags = ChAckTags} = ch_record(ChPid), IsMsgReady = PredFun(FunAcc, State), case (IsMsgReady andalso - rabbit_limiter:can_send(Limiter, self(), AckRequired)) of + rabbit_limiter:can_send(Limiter, self(), + AckRequired, ConsumerTag, + BQ:len(BQS))) of true -> {{Message, IsDelivered, AckTag}, FunAcc1, State1} = DeliverFun(AckRequired, FunAcc, State), @@ -1117,6 +1121,7 @@ handle_cast({limit, ChPid, Limiter}, State) -> andalso rabbit_limiter:is_blocked(Limiter), C#cr{limiter = Limiter, is_limit_active = Limited} end)); + handle_cast({flush, ChPid}, State) -> ok = rabbit_channel:flushed(ChPid, self()), noreply(State); diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index c219eaec95..5bc20636d0 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -24,7 +24,7 @@ -export([start_link/0, make_token/0, make_token/1, is_enabled/1, enable/2, disable/1]). --export([limit/2, can_send/3, ack/2, register/2, unregister/2]). +-export([limit/2, can_send/5, ack/2, register/2, unregister/2]). -export([get_limit/1, block/1, unblock/1, is_blocked/1]). -export([set_credit/5]). @@ -47,7 +47,7 @@ -spec(enable/2 :: (token(), non_neg_integer()) -> token()). -spec(disable/1 :: (token()) -> token()). -spec(limit/2 :: (token(), non_neg_integer()) -> 'ok' | {'disabled', token()}). --spec(can_send/3 :: (token(), pid(), boolean()) -> boolean()). +-spec(can_send/3 :: (token(), pid(), boolean(), ) -> boolean()). -spec(ack/2 :: (token(), non_neg_integer()) -> 'ok'). -spec(register/2 :: (token(), pid()) -> 'ok'). -spec(unregister/2 :: (token(), pid()) -> 'ok'). @@ -94,18 +94,19 @@ limit(Limiter, PrefetchCount) -> %% breaching a limit. Note that we don't use maybe_call here in order %% to avoid always going through with_exit_handler/2, even when the %% limiter is disabled. -can_send(#token{pid = Pid, enabled = true}, QPid, AckRequired) -> +can_send(#token{pid = Pid, enabled = true}, QPid, AckRequired, CTag, Len) -> rabbit_misc:with_exit_handler( fun () -> true end, fun () -> - gen_server2:call(Pid, {can_send, QPid, AckRequired}, infinity) + gen_server2:call(Pid, {can_send, QPid, AckRequired, CTag, Len}, + infinity) end); -can_send(_, _, _) -> +can_send(_, _, _, _, _) -> true. %% Let the limiter know that the channel has received some acks from a %% consumer -ack(Limiter, Count) -> maybe_cast(Limiter, {ack, Count}). +ack(Limiter, CTag) -> maybe_cast(Limiter, {ack, CTag}). register(Limiter, QPid) -> maybe_cast(Limiter, {register, QPid}). |
