summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@lshift.net>2008-12-19 16:53:51 +0000
committerMatthias Radestock <matthias@lshift.net>2008-12-19 16:53:51 +0000
commit7782a2074de06a6024373f86c505e61aebc4d08e (patch)
tree7e0078a9e03655246c4119828556e7a92522efc6 /src
parent5cd49e65fd1ab3a67810364d08bf734a9748f445 (diff)
downloadrabbitmq-server-git-7782a2074de06a6024373f86c505e61aebc4d08e.tar.gz
saner state transition handling
and assorted bug fixes
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue_process.erl91
1 files changed, 39 insertions, 52 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index b03887b84a..53b569b46d 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -66,7 +66,6 @@
monitor_ref,
unacked_messages,
is_limit_active,
- is_overload_protection_active,
unsent_message_count}).
-define(INFO_KEYS,
@@ -133,7 +132,7 @@ ch_record(ChPid) ->
ch_pid = ChPid,
monitor_ref = MonitorRef,
unacked_messages = dict:new(),
- is_overload_protection_active = false,
+ is_limit_active = false,
unsent_message_count = 0},
put(Key, C),
C;
@@ -146,24 +145,16 @@ store_ch_record(C = #cr{ch_pid = ChPid}) ->
all_ch_record() ->
[C || {{ch, _}, C} <- get()].
-update_store_and_maybe_block_ch(
- C = #cr{is_overload_protection_active = Overloaded,
- is_limit_active = Limited,
- unsent_message_count = Count}) ->
- {Result, NewOverloaded, NewLimited} =
- if
- not(Overloaded) and (Count > ?UNSENT_MESSAGE_LIMIT) ->
- {block_ch, true, Limited};
- Overloaded and (Count == 0) ->
- {unblock_ch, false, Limited};
- Limited and (Count < ?UNSENT_MESSAGE_LIMIT) ->
- {unblock_ch, Overloaded, false};
- true ->
- {ok, Overloaded, Limited}
- end,
- store_ch_record(C#cr{is_overload_protection_active = NewOverloaded,
- is_limit_active = NewLimited}),
- Result.
+is_ch_blocked(#cr{unsent_message_count = Count, is_limit_active = Limited}) ->
+ Limited orelse Count > ?UNSENT_MESSAGE_LIMIT.
+
+ch_record_state_transition(OldCR, NewCR) ->
+ BlockedOld = is_ch_blocked(OldCR),
+ BlockedNew = is_ch_blocked(NewCR),
+ if BlockedOld andalso not(BlockedNew) -> unblock;
+ BlockedNew andalso not(BlockedOld) -> block;
+ true -> ok
+ end.
deliver_immediately(Message, Delivered,
State = #q{q = #amqqueue{name = QName},
@@ -213,12 +204,13 @@ really_deliver(AckRequired, ChPid, ConsumerTag, Delivered, Message, NextId,
true -> dict:store(NextId, Message, UAM);
false -> UAM
end,
+ NewC = C#cr{unsent_message_count = Count + 1,
+ unacked_messages = NewUAM},
+ store_ch_record(NewC),
NewConsumers =
- case update_store_and_maybe_block_ch(
- C#cr{unsent_message_count = Count + 1,
- unacked_messages = NewUAM}) of
- ok -> queue:in(QEntry, RoundRobinTail);
- block_ch -> block_consumers(ChPid, RoundRobinTail)
+ case ch_record_state_transition(C, NewC) of
+ ok -> queue:in(QEntry, RoundRobinTail);
+ block -> block_consumers(ChPid, RoundRobinTail)
end,
{offered, AckRequired, State#q{round_robin = NewConsumers,
next_msg_id = NextId +1}}.
@@ -270,16 +262,22 @@ block_consumer(ChPid, ConsumerTag, RoundRobin) ->
(CP /= ChPid) or (CT /= ConsumerTag)
end, queue:to_list(RoundRobin))).
-possibly_unblock(C = #cr{consumers = Consumers, ch_pid = ChPid},
- State = #q{round_robin = RoundRobin}) ->
- case update_store_and_maybe_block_ch(C) of
- ok ->
+possibly_unblock(State, ChPid, Update) ->
+ case lookup_ch(ChPid) of
+ not_found ->
State;
- unblock_ch ->
- run_poke_burst(State#q{round_robin =
- unblock_consumers(ChPid, Consumers, RoundRobin)})
+ C ->
+ NewC = Update(C),
+ store_ch_record(NewC),
+ case ch_record_state_transition(C, NewC) of
+ ok -> State;
+ unblock -> NewRR = unblock_consumers(ChPid,
+ NewC#cr.consumers,
+ State#q.round_robin),
+ run_poke_burst(State#q{round_robin = NewRR})
+ end
end.
-
+
check_auto_delete(State = #q{q = #amqqueue{auto_delete = false}}) ->
{continue, State};
check_auto_delete(State = #q{has_had_consumers = false}) ->
@@ -764,27 +762,16 @@ handle_cast({requeue, MsgIds, ChPid}, State) ->
end;
handle_cast({unblock, ChPid}, State) ->
- % TODO Refactor the code duplication
- % between this an the notify_sent cast handler
- case lookup_ch(ChPid) of
- not_found ->
- noreply(State);
- C = #cr{is_limit_active = true} ->
- noreply(possibly_unblock(C, State));
- C ->
- rabbit_log:warning("Ignoring unblock for an active ch: ~p~n",
- [C]),
- noreply(State)
- end;
+ noreply(
+ possibly_unblock(State, ChPid,
+ fun (C) -> C#cr{is_limit_active = false} end));
handle_cast({notify_sent, ChPid}, State) ->
- case lookup_ch(ChPid) of
- not_found -> noreply(State);
- T = #cr{unsent_message_count =Count} ->
- noreply(possibly_unblock(
- T#cr{unsent_message_count = Count - 1},
- State))
- end.
+ noreply(
+ possibly_unblock(State, ChPid,
+ fun (C = #cr{unsent_message_count = Count}) ->
+ C#cr{unsent_message_count = Count - 1}
+ end)).
handle_info({'DOWN', MonitorRef, process, DownPid, _Reason},
State = #q{owner = {DownPid, MonitorRef}}) ->