summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJerry Kuch <jerryk@vmware.com>2010-11-11 10:47:39 +0000
committerJerry Kuch <jerryk@vmware.com>2010-11-11 10:47:39 +0000
commit75981e271febab439d0923e90fa800c451d08824 (patch)
treef283ccc2268dab7b335e1108388388f012eba3f6
parent60b28b903d118ca5302b948f25f88f5b5b775c3e (diff)
parent6e7d448b39a9caef8659909c063ff6082b7e513e (diff)
downloadrabbitmq-server-git-75981e271febab439d0923e90fa800c451d08824.tar.gz
Merge with Matthew's cleanups.
-rw-r--r--src/rabbit_amqqueue_process.erl32
1 files changed, 17 insertions, 15 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 6d212157a6..9d4bff41d1 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -333,6 +333,11 @@ maybe_store_ch_record(C = #cr{consumer_count = ConsumerCount,
true
end.
+demonitor_and_erase_ch_record(#cr{ch_pid = ChPid,
+ monitor_ref = MonitorRef}) ->
+ erlang:demonitor(MonitorRef),
+ erase({ch, ChPid}).
+
all_ch_record() ->
[C || {{ch, _}, C} <- get()].
@@ -379,7 +384,7 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc,
end,
NewC = C#cr{unsent_message_count = Count + 1,
acktags = ChAckTags1},
- maybe_store_ch_record(NewC),
+ true = maybe_store_ch_record(NewC),
{NewActiveConsumers, NewBlockedConsumers} =
case ch_record_state_transition(C, NewC) of
ok -> {queue:in(QEntry, ActiveConsumersTail),
@@ -398,7 +403,7 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc,
deliver_msgs_to_consumers(Funs, FunAcc1, State2);
%% if IsMsgReady then we've hit the limiter
false when IsMsgReady ->
- maybe_store_ch_record(C#cr{is_limit_active = true}),
+ true = maybe_store_ch_record(C#cr{is_limit_active = true}),
{NewActiveConsumers, NewBlockedConsumers} =
move_consumers(ChPid,
ActiveConsumers,
@@ -788,8 +793,9 @@ handle_call({basic_get, ChPid, NoAck}, _From,
{{Message, IsDelivered, AckTag, Remaining}, State2} ->
case AckRequired of
true -> C = #cr{acktags = ChAckTags} = ch_record(ChPid),
- maybe_store_ch_record(
- C#cr{acktags = sets:add_element(AckTag, ChAckTags)});
+ true = maybe_store_ch_record(
+ C#cr{acktags = sets:add_element(AckTag,
+ ChAckTags)});
false -> ok
end,
Msg = {QName, self(), AckTag, IsDelivered, Message},
@@ -807,8 +813,8 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid,
C = #cr{consumer_count = ConsumerCount} = ch_record(ChPid),
Consumer = #consumer{tag = ConsumerTag,
ack_required = not NoAck},
- maybe_store_ch_record(C#cr{consumer_count = ConsumerCount +1,
- limiter_pid = LimiterPid}),
+ true = maybe_store_ch_record(C#cr{consumer_count = ConsumerCount +1,
+ limiter_pid = LimiterPid}),
ok = case ConsumerCount of
0 -> rabbit_limiter:register(LimiterPid, self());
_ -> ok
@@ -849,11 +855,12 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
%% condition for unregistering the limiter regardless of
%% whether or not we can dispose of the ch record
%% entirely.
- case ConsumerCount-1 of
- 0 -> ok = rabbit_limiter:unregister(LimiterPid, self());
- _ -> ok
+ C2 = case ConsumerCount-1 of
+ 0 -> ok = rabbit_limiter:unregister(LimiterPid, self()),
+ C1#cr{limiter_pid = undefined};
+ _ -> C1
end,
- maybe_store_ch_record(C1),
+ maybe_store_ch_record(C2),
ok = maybe_send_reply(ChPid, OkMsg),
NewState =
State#q{exclusive_consumer = cancel_holder(ChPid,
@@ -1058,8 +1065,3 @@ handle_pre_hibernate(State = #q{backing_queue = BQ,
State1 = State#q{stats_timer = rabbit_event:stop_stats_timer(StatsTimer),
backing_queue_state = BQS2},
{hibernate, stop_rate_timer(State1)}.
-
-demonitor_and_erase_ch_record(#cr{ch_pid = ChPid,
- monitor_ref = MonitorRef}) ->
- erlang:demonitor(MonitorRef),
- erase({ch, ChPid}).