diff options
| author | Jerry Kuch <jerryk@vmware.com> | 2010-11-10 15:48:46 +0000 |
|---|---|---|
| committer | Jerry Kuch <jerryk@vmware.com> | 2010-11-10 15:48:46 +0000 |
| commit | 477bcb7e42b7cac12ca0379e8b8cf580c0088e34 (patch) | |
| tree | a7d9b72a23475c8447197f207c0e3e5b5378a29c /src | |
| parent | 4ff2f25b20ed4c461ec3bb41b2e99bc9c0a8222e (diff) | |
| download | rabbitmq-server-git-477bcb7e42b7cac12ca0379e8b8cf580c0088e34.tar.gz | |
Refactor update_ch_record into maybe_store_record and make its use universal.
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 45 |
1 files changed, 24 insertions, 21 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 94cbef8648..48a2287f60 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -315,20 +315,23 @@ ch_record(ChPid) -> store_ch_record(C = #cr{ch_pid = ChPid}) -> put({ch, ChPid}, C). -%% If the channel record we're considering submitting to the process dictionary -%% has no consumers, has no pending acks, and doesn't have a transaction, we -%% should delete its record rather than storing it. -update_ch_record(C = #cr{consumer_count = ConsumerCount, - limiter_pid = LimiterPid, - acktags = ChAckTags, - txn = Txn}) -> +%% If the channel record we're considering submitting to the process +%% dictionary has no consumers, has no pending acks, and doesn't have +%% a transaction, we should delete its record rather than storing it. +%% If the ch record was stored, this function returns true; otherwise, +%% the ch record is erased and this function returns false. +maybe_store_ch_record(C = #cr{consumer_count = ConsumerCount, + limiter_pid = LimiterPid, + acktags = ChAckTags, + txn = Txn}) -> case {sets:size(ChAckTags), ConsumerCount, Txn} of {0, 0, none} -> demonitor_and_erase_ch_record(C), - ok = rabbit_limiter:unregister(LimiterPid, self()); - _ -> store_ch_record(C) + ok = rabbit_limiter:unregister(LimiterPid, self()), + false; + _ -> store_ch_record(C), + true end. - all_ch_record() -> [C || {{ch, _}, C} <- get()]. @@ -375,7 +378,7 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc, end, NewC = C#cr{unsent_message_count = Count + 1, acktags = ChAckTags1}, - store_ch_record(NewC), + maybe_store_ch_record(NewC), {NewActiveConsumers, NewBlockedConsumers} = case ch_record_state_transition(C, NewC) of ok -> {queue:in(QEntry, ActiveConsumersTail), @@ -394,7 +397,7 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc, deliver_msgs_to_consumers(Funs, FunAcc1, State2); %% if IsMsgReady then we've hit the limiter false when IsMsgReady -> - store_ch_record(C#cr{is_limit_active = true}), + maybe_store_ch_record(C#cr{is_limit_active = true}), {NewActiveConsumers, NewBlockedConsumers} = move_consumers(ChPid, ActiveConsumers, @@ -493,7 +496,7 @@ possibly_unblock(State, ChPid, Update) -> State; C -> NewC = Update(C), - store_ch_record(NewC), + maybe_store_ch_record(NewC), case ch_record_state_transition(C, NewC) of ok -> State; unblock -> {NewBlockedConsumers, NewActiveConsumers} = @@ -574,7 +577,7 @@ commit_transaction(Txn, From, ChPid, State = #q{backing_queue = BQ, %% by the channel. C = #cr{acktags = ChAckTags} = lookup_ch(ChPid), ChAckTags1 = subtract_acks(ChAckTags, AckTags), - update_ch_record(C#cr{acktags = ChAckTags1, txn = none}), + maybe_store_ch_record(C#cr{acktags = ChAckTags1, txn = none}), State#q{backing_queue_state = BQS1}. rollback_transaction(Txn, ChPid, State = #q{backing_queue = BQ, @@ -784,7 +787,7 @@ handle_call({basic_get, ChPid, NoAck}, _From, {{Message, IsDelivered, AckTag, Remaining}, State2} -> case AckRequired of true -> C = #cr{acktags = ChAckTags} = ch_record(ChPid), - store_ch_record( + maybe_store_ch_record( C#cr{acktags = sets:add_element(AckTag, ChAckTags)}); false -> ok end, @@ -803,8 +806,8 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, C = #cr{consumer_count = ConsumerCount} = ch_record(ChPid), Consumer = #consumer{tag = ConsumerTag, ack_required = not NoAck}, - store_ch_record(C#cr{consumer_count = ConsumerCount +1, - limiter_pid = LimiterPid}), + maybe_store_ch_record(C#cr{consumer_count = ConsumerCount +1, + limiter_pid = LimiterPid}), ok = case ConsumerCount of 0 -> rabbit_limiter:register(LimiterPid, self()); _ -> ok @@ -840,7 +843,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From, reply(ok, State); C = #cr{consumer_count = ConsumerCount} -> C1 = C#cr{consumer_count = ConsumerCount-1}, - update_ch_record(C1), + maybe_store_ch_record(C1), ok = maybe_send_reply(ChPid, OkMsg), NewState = State#q{exclusive_consumer = cancel_holder(ChPid, @@ -889,7 +892,7 @@ handle_call({requeue, AckTags, ChPid}, From, State) -> noreply(State); C = #cr{acktags = ChAckTags} -> ChAckTags1 = subtract_acks(ChAckTags, AckTags), - update_ch_record(C#cr{acktags = ChAckTags1}), + maybe_store_ch_record(C#cr{acktags = ChAckTags1}), noreply(requeue_and_run(AckTags, State)) end; @@ -913,7 +916,7 @@ handle_cast({ack, Txn, AckTags, ChPid}, {C#cr{acktags = ChAckTags1}, BQ:ack(AckTags, BQS)}; _ -> {C#cr{txn = Txn}, BQ:tx_ack(Txn, AckTags, BQS)} end, - update_ch_record(C1), + maybe_store_ch_record(C1), noreply(State#q{backing_queue_state = BQS1}) end; @@ -924,7 +927,7 @@ handle_cast({reject, AckTags, Requeue, ChPid}, noreply(State); C = #cr{acktags = ChAckTags} -> ChAckTags1 = subtract_acks(ChAckTags, AckTags), - update_ch_record(C#cr{acktags = ChAckTags1}), + maybe_store_ch_record(C#cr{acktags = ChAckTags1}), noreply(case Requeue of true -> requeue_and_run(AckTags, State); false -> BQS1 = BQ:ack(AckTags, BQS), |
