summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMichael Bridgen <mikeb@rabbitmq.com>2011-08-24 15:59:49 +0100
committerMichael Bridgen <mikeb@rabbitmq.com>2011-08-24 15:59:49 +0100
commit2e206712d07a22471b5d60d75b8b482e0842cfb8 (patch)
treeabf4be784f9e0d0295f95aea450c8fb88aac68a8 /src
parentcedc0b54532a6ce3958a1e07d1e9c31af4e0eff3 (diff)
downloadrabbitmq-server-git-2e206712d07a22471b5d60d75b8b482e0842cfb8.tar.gz
Rectify some poor conflict resolution choices.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue_process.erl9
-rw-r--r--src/rabbit_limiter.erl13
2 files changed, 14 insertions, 8 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 2ca3c572c4..8333b753aa 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -369,7 +369,9 @@ ch_record_state_transition(OldCR, NewCR) ->
deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc,
State = #q{q = #amqqueue{name = QName},
active_consumers = ActiveConsumers,
- blocked_consumers = BlockedConsumers}) ->
+ blocked_consumers = BlockedConsumers,
+ backing_queue = BQ,
+ backing_queue_state = BQS}) ->
case queue:out(ActiveConsumers) of
{{value, QEntry = {ChPid, #consumer{tag = ConsumerTag,
ack_required = AckRequired}}},
@@ -379,7 +381,9 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc,
acktags = ChAckTags} = ch_record(ChPid),
IsMsgReady = PredFun(FunAcc, State),
case (IsMsgReady andalso
- rabbit_limiter:can_send(Limiter, self(), AckRequired)) of
+ rabbit_limiter:can_send(Limiter, self(),
+ AckRequired, ConsumerTag,
+ BQ:len(BQS))) of
true ->
{{Message, IsDelivered, AckTag}, FunAcc1, State1} =
DeliverFun(AckRequired, FunAcc, State),
@@ -1117,6 +1121,7 @@ handle_cast({limit, ChPid, Limiter}, State) ->
andalso rabbit_limiter:is_blocked(Limiter),
C#cr{limiter = Limiter, is_limit_active = Limited}
end));
+
handle_cast({flush, ChPid}, State) ->
ok = rabbit_channel:flushed(ChPid, self()),
noreply(State);
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index c219eaec95..5bc20636d0 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -24,7 +24,7 @@
-export([start_link/0, make_token/0, make_token/1, is_enabled/1, enable/2,
disable/1]).
--export([limit/2, can_send/3, ack/2, register/2, unregister/2]).
+-export([limit/2, can_send/5, ack/2, register/2, unregister/2]).
-export([get_limit/1, block/1, unblock/1, is_blocked/1]).
-export([set_credit/5]).
@@ -47,7 +47,7 @@
-spec(enable/2 :: (token(), non_neg_integer()) -> token()).
-spec(disable/1 :: (token()) -> token()).
-spec(limit/2 :: (token(), non_neg_integer()) -> 'ok' | {'disabled', token()}).
--spec(can_send/3 :: (token(), pid(), boolean()) -> boolean()).
+-spec(can_send/3 :: (token(), pid(), boolean(), ) -> boolean()).
-spec(ack/2 :: (token(), non_neg_integer()) -> 'ok').
-spec(register/2 :: (token(), pid()) -> 'ok').
-spec(unregister/2 :: (token(), pid()) -> 'ok').
@@ -94,18 +94,19 @@ limit(Limiter, PrefetchCount) ->
%% breaching a limit. Note that we don't use maybe_call here in order
%% to avoid always going through with_exit_handler/2, even when the
%% limiter is disabled.
-can_send(#token{pid = Pid, enabled = true}, QPid, AckRequired) ->
+can_send(#token{pid = Pid, enabled = true}, QPid, AckRequired, CTag, Len) ->
rabbit_misc:with_exit_handler(
fun () -> true end,
fun () ->
- gen_server2:call(Pid, {can_send, QPid, AckRequired}, infinity)
+ gen_server2:call(Pid, {can_send, QPid, AckRequired, CTag, Len},
+ infinity)
end);
-can_send(_, _, _) ->
+can_send(_, _, _, _, _) ->
true.
%% Let the limiter know that the channel has received some acks from a
%% consumer
-ack(Limiter, Count) -> maybe_cast(Limiter, {ack, Count}).
+ack(Limiter, CTag) -> maybe_cast(Limiter, {ack, CTag}).
register(Limiter, QPid) -> maybe_cast(Limiter, {register, QPid}).