summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJerry Kuch <jerryk@vmware.com>2010-11-10 15:48:46 +0000
committerJerry Kuch <jerryk@vmware.com>2010-11-10 15:48:46 +0000
commit477bcb7e42b7cac12ca0379e8b8cf580c0088e34 (patch)
treea7d9b72a23475c8447197f207c0e3e5b5378a29c
parent4ff2f25b20ed4c461ec3bb41b2e99bc9c0a8222e (diff)
downloadrabbitmq-server-git-477bcb7e42b7cac12ca0379e8b8cf580c0088e34.tar.gz
Refactor update_ch_record into maybe_store_record and make its use universal.
-rw-r--r--src/rabbit_amqqueue_process.erl45
1 files changed, 24 insertions, 21 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 94cbef8648..48a2287f60 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -315,20 +315,23 @@ ch_record(ChPid) ->
store_ch_record(C = #cr{ch_pid = ChPid}) ->
put({ch, ChPid}, C).
-%% If the channel record we're considering submitting to the process dictionary
-%% has no consumers, has no pending acks, and doesn't have a transaction, we
-%% should delete its record rather than storing it.
-update_ch_record(C = #cr{consumer_count = ConsumerCount,
- limiter_pid = LimiterPid,
- acktags = ChAckTags,
- txn = Txn}) ->
+%% If the channel record we're considering submitting to the process
+%% dictionary has no consumers, has no pending acks, and doesn't have
+%% a transaction, we should delete its record rather than storing it.
+%% If the ch record was stored, this function returns true; otherwise,
+%% the ch record is erased and this function returns false.
+maybe_store_ch_record(C = #cr{consumer_count = ConsumerCount,
+ limiter_pid = LimiterPid,
+ acktags = ChAckTags,
+ txn = Txn}) ->
case {sets:size(ChAckTags), ConsumerCount, Txn} of
{0, 0, none} -> demonitor_and_erase_ch_record(C),
- ok = rabbit_limiter:unregister(LimiterPid, self());
- _ -> store_ch_record(C)
+ ok = rabbit_limiter:unregister(LimiterPid, self()),
+ false;
+ _ -> store_ch_record(C),
+ true
end.
-
all_ch_record() ->
[C || {{ch, _}, C} <- get()].
@@ -375,7 +378,7 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc,
end,
NewC = C#cr{unsent_message_count = Count + 1,
acktags = ChAckTags1},
- store_ch_record(NewC),
+ maybe_store_ch_record(NewC),
{NewActiveConsumers, NewBlockedConsumers} =
case ch_record_state_transition(C, NewC) of
ok -> {queue:in(QEntry, ActiveConsumersTail),
@@ -394,7 +397,7 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc,
deliver_msgs_to_consumers(Funs, FunAcc1, State2);
%% if IsMsgReady then we've hit the limiter
false when IsMsgReady ->
- store_ch_record(C#cr{is_limit_active = true}),
+ maybe_store_ch_record(C#cr{is_limit_active = true}),
{NewActiveConsumers, NewBlockedConsumers} =
move_consumers(ChPid,
ActiveConsumers,
@@ -493,7 +496,7 @@ possibly_unblock(State, ChPid, Update) ->
State;
C ->
NewC = Update(C),
- store_ch_record(NewC),
+ maybe_store_ch_record(NewC),
case ch_record_state_transition(C, NewC) of
ok -> State;
unblock -> {NewBlockedConsumers, NewActiveConsumers} =
@@ -574,7 +577,7 @@ commit_transaction(Txn, From, ChPid, State = #q{backing_queue = BQ,
%% by the channel.
C = #cr{acktags = ChAckTags} = lookup_ch(ChPid),
ChAckTags1 = subtract_acks(ChAckTags, AckTags),
- update_ch_record(C#cr{acktags = ChAckTags1, txn = none}),
+ maybe_store_ch_record(C#cr{acktags = ChAckTags1, txn = none}),
State#q{backing_queue_state = BQS1}.
rollback_transaction(Txn, ChPid, State = #q{backing_queue = BQ,
@@ -784,7 +787,7 @@ handle_call({basic_get, ChPid, NoAck}, _From,
{{Message, IsDelivered, AckTag, Remaining}, State2} ->
case AckRequired of
true -> C = #cr{acktags = ChAckTags} = ch_record(ChPid),
- store_ch_record(
+ maybe_store_ch_record(
C#cr{acktags = sets:add_element(AckTag, ChAckTags)});
false -> ok
end,
@@ -803,8 +806,8 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid,
C = #cr{consumer_count = ConsumerCount} = ch_record(ChPid),
Consumer = #consumer{tag = ConsumerTag,
ack_required = not NoAck},
- store_ch_record(C#cr{consumer_count = ConsumerCount +1,
- limiter_pid = LimiterPid}),
+ maybe_store_ch_record(C#cr{consumer_count = ConsumerCount +1,
+ limiter_pid = LimiterPid}),
ok = case ConsumerCount of
0 -> rabbit_limiter:register(LimiterPid, self());
_ -> ok
@@ -840,7 +843,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
reply(ok, State);
C = #cr{consumer_count = ConsumerCount} ->
C1 = C#cr{consumer_count = ConsumerCount-1},
- update_ch_record(C1),
+ maybe_store_ch_record(C1),
ok = maybe_send_reply(ChPid, OkMsg),
NewState =
State#q{exclusive_consumer = cancel_holder(ChPid,
@@ -889,7 +892,7 @@ handle_call({requeue, AckTags, ChPid}, From, State) ->
noreply(State);
C = #cr{acktags = ChAckTags} ->
ChAckTags1 = subtract_acks(ChAckTags, AckTags),
- update_ch_record(C#cr{acktags = ChAckTags1}),
+ maybe_store_ch_record(C#cr{acktags = ChAckTags1}),
noreply(requeue_and_run(AckTags, State))
end;
@@ -913,7 +916,7 @@ handle_cast({ack, Txn, AckTags, ChPid},
{C#cr{acktags = ChAckTags1}, BQ:ack(AckTags, BQS)};
_ -> {C#cr{txn = Txn}, BQ:tx_ack(Txn, AckTags, BQS)}
end,
- update_ch_record(C1),
+ maybe_store_ch_record(C1),
noreply(State#q{backing_queue_state = BQS1})
end;
@@ -924,7 +927,7 @@ handle_cast({reject, AckTags, Requeue, ChPid},
noreply(State);
C = #cr{acktags = ChAckTags} ->
ChAckTags1 = subtract_acks(ChAckTags, AckTags),
- update_ch_record(C#cr{acktags = ChAckTags1}),
+ maybe_store_ch_record(C#cr{acktags = ChAckTags1}),
noreply(case Requeue of
true -> requeue_and_run(AckTags, State);
false -> BQS1 = BQ:ack(AckTags, BQS),