diff options
| author | Jerry Kuch <jerryk@vmware.com> | 2010-11-11 10:47:39 +0000 |
|---|---|---|
| committer | Jerry Kuch <jerryk@vmware.com> | 2010-11-11 10:47:39 +0000 |
| commit | 75981e271febab439d0923e90fa800c451d08824 (patch) | |
| tree | f283ccc2268dab7b335e1108388388f012eba3f6 | |
| parent | 60b28b903d118ca5302b948f25f88f5b5b775c3e (diff) | |
| parent | 6e7d448b39a9caef8659909c063ff6082b7e513e (diff) | |
| download | rabbitmq-server-git-75981e271febab439d0923e90fa800c451d08824.tar.gz | |
Merge with Matthew's cleanups.
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 32 |
1 files changed, 17 insertions, 15 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 6d212157a6..9d4bff41d1 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -333,6 +333,11 @@ maybe_store_ch_record(C = #cr{consumer_count = ConsumerCount, true end. +demonitor_and_erase_ch_record(#cr{ch_pid = ChPid, + monitor_ref = MonitorRef}) -> + erlang:demonitor(MonitorRef), + erase({ch, ChPid}). + all_ch_record() -> [C || {{ch, _}, C} <- get()]. @@ -379,7 +384,7 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc, end, NewC = C#cr{unsent_message_count = Count + 1, acktags = ChAckTags1}, - maybe_store_ch_record(NewC), + true = maybe_store_ch_record(NewC), {NewActiveConsumers, NewBlockedConsumers} = case ch_record_state_transition(C, NewC) of ok -> {queue:in(QEntry, ActiveConsumersTail), @@ -398,7 +403,7 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc, deliver_msgs_to_consumers(Funs, FunAcc1, State2); %% if IsMsgReady then we've hit the limiter false when IsMsgReady -> - maybe_store_ch_record(C#cr{is_limit_active = true}), + true = maybe_store_ch_record(C#cr{is_limit_active = true}), {NewActiveConsumers, NewBlockedConsumers} = move_consumers(ChPid, ActiveConsumers, @@ -788,8 +793,9 @@ handle_call({basic_get, ChPid, NoAck}, _From, {{Message, IsDelivered, AckTag, Remaining}, State2} -> case AckRequired of true -> C = #cr{acktags = ChAckTags} = ch_record(ChPid), - maybe_store_ch_record( - C#cr{acktags = sets:add_element(AckTag, ChAckTags)}); + true = maybe_store_ch_record( + C#cr{acktags = sets:add_element(AckTag, + ChAckTags)}); false -> ok end, Msg = {QName, self(), AckTag, IsDelivered, Message}, @@ -807,8 +813,8 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, C = #cr{consumer_count = ConsumerCount} = ch_record(ChPid), Consumer = #consumer{tag = ConsumerTag, ack_required = not NoAck}, - maybe_store_ch_record(C#cr{consumer_count = ConsumerCount +1, - limiter_pid = LimiterPid}), + true = maybe_store_ch_record(C#cr{consumer_count = ConsumerCount +1, + limiter_pid = LimiterPid}), ok = case ConsumerCount of 0 -> rabbit_limiter:register(LimiterPid, self()); _ -> ok @@ -849,11 +855,12 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From, %% condition for unregistering the limiter regardless of %% whether or not we can dispose of the ch record %% entirely. - case ConsumerCount-1 of - 0 -> ok = rabbit_limiter:unregister(LimiterPid, self()); - _ -> ok + C2 = case ConsumerCount-1 of + 0 -> ok = rabbit_limiter:unregister(LimiterPid, self()), + C1#cr{limiter_pid = undefined}; + _ -> C1 end, - maybe_store_ch_record(C1), + maybe_store_ch_record(C2), ok = maybe_send_reply(ChPid, OkMsg), NewState = State#q{exclusive_consumer = cancel_holder(ChPid, @@ -1058,8 +1065,3 @@ handle_pre_hibernate(State = #q{backing_queue = BQ, State1 = State#q{stats_timer = rabbit_event:stop_stats_timer(StatsTimer), backing_queue_state = BQS2}, {hibernate, stop_rate_timer(State1)}. - -demonitor_and_erase_ch_record(#cr{ch_pid = ChPid, - monitor_ref = MonitorRef}) -> - erlang:demonitor(MonitorRef), - erase({ch, ChPid}). |
