diff options
| author | Matthias Radestock <matthias@lshift.net> | 2008-12-19 16:53:51 +0000 |
|---|---|---|
| committer | Matthias Radestock <matthias@lshift.net> | 2008-12-19 16:53:51 +0000 |
| commit | 7782a2074de06a6024373f86c505e61aebc4d08e (patch) | |
| tree | 7e0078a9e03655246c4119828556e7a92522efc6 | |
| parent | 5cd49e65fd1ab3a67810364d08bf734a9748f445 (diff) | |
| download | rabbitmq-server-git-7782a2074de06a6024373f86c505e61aebc4d08e.tar.gz | |
saner state transition handling
and assorted bug fixes
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 91 |
1 files changed, 39 insertions, 52 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index b03887b84a..53b569b46d 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -66,7 +66,6 @@ monitor_ref, unacked_messages, is_limit_active, - is_overload_protection_active, unsent_message_count}). -define(INFO_KEYS, @@ -133,7 +132,7 @@ ch_record(ChPid) -> ch_pid = ChPid, monitor_ref = MonitorRef, unacked_messages = dict:new(), - is_overload_protection_active = false, + is_limit_active = false, unsent_message_count = 0}, put(Key, C), C; @@ -146,24 +145,16 @@ store_ch_record(C = #cr{ch_pid = ChPid}) -> all_ch_record() -> [C || {{ch, _}, C} <- get()]. -update_store_and_maybe_block_ch( - C = #cr{is_overload_protection_active = Overloaded, - is_limit_active = Limited, - unsent_message_count = Count}) -> - {Result, NewOverloaded, NewLimited} = - if - not(Overloaded) and (Count > ?UNSENT_MESSAGE_LIMIT) -> - {block_ch, true, Limited}; - Overloaded and (Count == 0) -> - {unblock_ch, false, Limited}; - Limited and (Count < ?UNSENT_MESSAGE_LIMIT) -> - {unblock_ch, Overloaded, false}; - true -> - {ok, Overloaded, Limited} - end, - store_ch_record(C#cr{is_overload_protection_active = NewOverloaded, - is_limit_active = NewLimited}), - Result. +is_ch_blocked(#cr{unsent_message_count = Count, is_limit_active = Limited}) -> + Limited orelse Count > ?UNSENT_MESSAGE_LIMIT. + +ch_record_state_transition(OldCR, NewCR) -> + BlockedOld = is_ch_blocked(OldCR), + BlockedNew = is_ch_blocked(NewCR), + if BlockedOld andalso not(BlockedNew) -> unblock; + BlockedNew andalso not(BlockedOld) -> block; + true -> ok + end. deliver_immediately(Message, Delivered, State = #q{q = #amqqueue{name = QName}, @@ -213,12 +204,13 @@ really_deliver(AckRequired, ChPid, ConsumerTag, Delivered, Message, NextId, true -> dict:store(NextId, Message, UAM); false -> UAM end, + NewC = C#cr{unsent_message_count = Count + 1, + unacked_messages = NewUAM}, + store_ch_record(NewC), NewConsumers = - case update_store_and_maybe_block_ch( - C#cr{unsent_message_count = Count + 1, - unacked_messages = NewUAM}) of - ok -> queue:in(QEntry, RoundRobinTail); - block_ch -> block_consumers(ChPid, RoundRobinTail) + case ch_record_state_transition(C, NewC) of + ok -> queue:in(QEntry, RoundRobinTail); + block -> block_consumers(ChPid, RoundRobinTail) end, {offered, AckRequired, State#q{round_robin = NewConsumers, next_msg_id = NextId +1}}. @@ -270,16 +262,22 @@ block_consumer(ChPid, ConsumerTag, RoundRobin) -> (CP /= ChPid) or (CT /= ConsumerTag) end, queue:to_list(RoundRobin))). -possibly_unblock(C = #cr{consumers = Consumers, ch_pid = ChPid}, - State = #q{round_robin = RoundRobin}) -> - case update_store_and_maybe_block_ch(C) of - ok -> +possibly_unblock(State, ChPid, Update) -> + case lookup_ch(ChPid) of + not_found -> State; - unblock_ch -> - run_poke_burst(State#q{round_robin = - unblock_consumers(ChPid, Consumers, RoundRobin)}) + C -> + NewC = Update(C), + store_ch_record(NewC), + case ch_record_state_transition(C, NewC) of + ok -> State; + unblock -> NewRR = unblock_consumers(ChPid, + NewC#cr.consumers, + State#q.round_robin), + run_poke_burst(State#q{round_robin = NewRR}) + end end. - + check_auto_delete(State = #q{q = #amqqueue{auto_delete = false}}) -> {continue, State}; check_auto_delete(State = #q{has_had_consumers = false}) -> @@ -764,27 +762,16 @@ handle_cast({requeue, MsgIds, ChPid}, State) -> end; handle_cast({unblock, ChPid}, State) -> - % TODO Refactor the code duplication - % between this an the notify_sent cast handler - case lookup_ch(ChPid) of - not_found -> - noreply(State); - C = #cr{is_limit_active = true} -> - noreply(possibly_unblock(C, State)); - C -> - rabbit_log:warning("Ignoring unblock for an active ch: ~p~n", - [C]), - noreply(State) - end; + noreply( + possibly_unblock(State, ChPid, + fun (C) -> C#cr{is_limit_active = false} end)); handle_cast({notify_sent, ChPid}, State) -> - case lookup_ch(ChPid) of - not_found -> noreply(State); - T = #cr{unsent_message_count =Count} -> - noreply(possibly_unblock( - T#cr{unsent_message_count = Count - 1}, - State)) - end. + noreply( + possibly_unblock(State, ChPid, + fun (C = #cr{unsent_message_count = Count}) -> + C#cr{unsent_message_count = Count - 1} + end)). handle_info({'DOWN', MonitorRef, process, DownPid, _Reason}, State = #q{owner = {DownPid, MonitorRef}}) -> |
