summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2011-09-18 00:46:16 +0100
committerMatthias Radestock <matthias@rabbitmq.com>2011-09-18 00:46:16 +0100
commit7678f8aeb43c9a3d08229efaa217027548296a13 (patch)
tree7f5b4694482643eb622271af66dec7a85a07fcf8 /src
parent739ee88e529aa0ffcd5e37d24a2d58b92f3d177d (diff)
parentff8bf657ab87743b19d872b8766b42c9844dc1dd (diff)
downloadrabbitmq-server-git-7678f8aeb43c9a3d08229efaa217027548296a13.tar.gz
merge from default
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue_process.erl95
1 files changed, 47 insertions, 48 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 0cd03daf48..e4b48c4706 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -353,7 +353,8 @@ update_ch_record(C = #cr{consumer_count = ConsumerCount,
case {sets:size(ChAckTags), ConsumerCount, UnsentMessageCount} of
{0, 0, 0} -> ok = erase_ch_record(C);
_ -> ok = store_ch_record(C)
- end.
+ end,
+ C.
store_ch_record(C = #cr{ch_pid = ChPid}) ->
put({ch, ChPid}, C),
@@ -367,6 +368,16 @@ erase_ch_record(#cr{ch_pid = ChPid,
erase({ch, ChPid}),
ok.
+update_consumer_count(C = #cr{consumer_count = 0, limiter = Limiter}, +1) ->
+ ok = rabbit_limiter:register(Limiter, self()),
+ update_ch_record(C#cr{consumer_count = 1});
+update_consumer_count(C = #cr{consumer_count = 1, limiter = Limiter}, -1) ->
+ ok = rabbit_limiter:unregister(Limiter, self()),
+ update_ch_record(C#cr{consumer_count = 0,
+ limiter = rabbit_limiter:make_token()});
+update_consumer_count(C = #cr{consumer_count = Count}, Delta) ->
+ update_ch_record(C#cr{consumer_count = Count + Delta}).
+
all_ch_record() -> [C || {{ch, _}, C} <- get()].
block_consumer(C = #cr{blocked_consumers = Blocked}, QEntry) ->
@@ -564,15 +575,6 @@ requeue_and_run(AckTags, State = #q{backing_queue = BQ, ttl=TTL}) ->
BQS1
end, State).
-process_acks(ChPid, AckTags, State, Fun) ->
- case lookup_ch(ChPid) of
- not_found ->
- noreply(State);
- C = #cr{acktags = ChAckTags} ->
- update_ch_record(C#cr{acktags = subtract_acks(ChAckTags, AckTags)}),
- noreply(Fun(State))
- end.
-
fetch(AckRequired, State = #q{backing_queue_state = BQS,
backing_queue = BQ}) ->
{Result, BQS1} = BQ:fetch(AckRequired, BQS),
@@ -674,8 +676,16 @@ run_backing_queue(Mod, Fun, State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
run_message_queue(State#q{backing_queue_state = BQ:invoke(Mod, Fun, BQS)}).
-subtract_acks(A, B) when is_list(B) ->
- lists:foldl(fun sets:del_element/2, A, B).
+subtract_acks(ChPid, AckTags, State, Fun) ->
+ case lookup_ch(ChPid) of
+ not_found ->
+ State;
+ C = #cr{acktags = ChAckTags} ->
+ update_ch_record(
+ C#cr{acktags = lists:foldl(fun sets:del_element/2,
+ ChAckTags, AckTags)}),
+ Fun(State)
+ end.
discard_delivery(#delivery{sender = ChPid,
message = Message},
@@ -959,15 +969,10 @@ handle_call({basic_consume, NoAck, ChPid, Limiter,
in_use ->
reply({error, exclusive_consume_unavailable}, State);
ok ->
- C = #cr{consumer_count = ConsumerCount} = ch_record(ChPid),
+ C = ch_record(ChPid),
+ C1 = update_consumer_count(C#cr{limiter = Limiter}, +1),
Consumer = #consumer{tag = ConsumerTag,
ack_required = not NoAck},
- C1 = C#cr{consumer_count = ConsumerCount +1,
- limiter = Limiter},
- ok = case ConsumerCount of
- 0 -> rabbit_limiter:register(Limiter, self());
- _ -> ok
- end,
ExclusiveConsumer = if ExclusiveConsume -> {ChPid, ConsumerTag};
true -> ExistingHolder
end,
@@ -976,7 +981,7 @@ handle_call({basic_consume, NoAck, ChPid, Limiter,
ok = maybe_send_reply(ChPid, OkMsg),
E = {ChPid, Consumer},
State2 =
- case is_ch_blocked(C) of
+ case is_ch_blocked(C1) of
true -> block_consumer(C1, E),
State1;
false -> update_ch_record(C1),
@@ -994,15 +999,8 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
not_found ->
ok = maybe_send_reply(ChPid, OkMsg),
reply(ok, State);
- C = #cr{consumer_count = ConsumerCount,
- limiter = Limiter} ->
- C1 = C#cr{consumer_count = ConsumerCount -1},
- update_ch_record(
- case ConsumerCount of
- 1 -> ok = rabbit_limiter:unregister(Limiter, self()),
- C1#cr{limiter = rabbit_limiter:make_token()};
- _ -> C1
- end),
+ C ->
+ update_consumer_count(C, -1),
emit_consumer_deleted(ChPid, ConsumerTag),
ok = maybe_send_reply(ChPid, OkMsg),
NewState =
@@ -1047,8 +1045,9 @@ handle_call(purge, _From, State = #q{backing_queue = BQ,
handle_call({requeue, AckTags, ChPid}, From, State) ->
gen_server2:reply(From, ok),
- noreply(process_acks(ChPid, AckTags, State,
- fun (State1) -> requeue_and_run(AckTags, State1) end)).
+ noreply(subtract_acks(
+ ChPid, AckTags, State,
+ fun (State1) -> requeue_and_run(AckTags, State1) end)).
handle_cast({run_backing_queue, Mod, Fun}, State) ->
noreply(run_backing_queue(Mod, Fun, State));
@@ -1058,25 +1057,25 @@ handle_cast({deliver, Delivery}, State) ->
noreply(deliver_or_enqueue(Delivery, State));
handle_cast({ack, AckTags, ChPid}, State) ->
- noreply(process_acks(ChPid, AckTags, State,
- fun (State1 = #q{backing_queue = BQ,
- backing_queue_state = BQS}) ->
- {_Guids, BQS1} = BQ:ack(AckTags, BQS),
- State1#q{backing_queue_state = BQS1}
- end));
+ noreply(subtract_acks(
+ ChPid, AckTags, State,
+ fun (State1 = #q{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ {_Guids, BQS1} = BQ:ack(AckTags, BQS),
+ State1#q{backing_queue_state = BQS1}
+ end));
handle_cast({reject, AckTags, Requeue, ChPid}, State) ->
- noreply(process_acks(ChPid, AckTags, State,
- fun (State1 = #q{backing_queue = BQ,
- backing_queue_state = BQS}) ->
- case Requeue of
- true -> requeue_and_run(AckTags, State1);
- false -> {_Guids, BQS1} =
- BQ:ack(AckTags, BQS),
- State1#q{
- backing_queue_state = BQS1}
- end
- end));
+ noreply(subtract_acks(
+ ChPid, AckTags, State,
+ fun (State1 = #q{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ case Requeue of
+ true -> requeue_and_run(AckTags, State1);
+ false -> {_Guids, BQS1} = BQ:ack(AckTags, BQS),
+ State1#q{backing_queue_state = BQS1}
+ end
+ end));
handle_cast(delete_immediately, State) ->
{stop, normal, State};