summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2012-01-10 17:28:21 +0000
committerSimon MacMullen <simon@rabbitmq.com>2012-01-10 17:28:21 +0000
commit1375ebc533cd376397ff99a8c20cd42f6fa2d4ec (patch)
tree574fb0b7f97a6f238b799788bbb037bd47141e9a
parente099c912c60c7ed4c51f2754227809b490bb279b (diff)
downloadrabbitmq-server-git-1375ebc533cd376397ff99a8c20cd42f6fa2d4ec.tar.gz
Cosmetic / refactor.
-rw-r--r--src/rabbit_channel.erl13
-rw-r--r--src/rabbit_reader.erl63
2 files changed, 34 insertions, 42 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 13625d6373..f914aaf654 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -331,13 +331,12 @@ handle_cast({confirm, MsgSeqNos, From}, State) ->
handle_info({bump_credit, Msg}, State = #ch{block_reader = BlockReader,
reader_pid = ReaderPid}) ->
- State1 =
- case {rabbit_flow:bump(Msg), BlockReader} of
- {false, true} -> rabbit_reader:conserve_memory(
- self(), ReaderPid, false),
- State#ch{block_reader = false};
- _ -> State
- end,
+ State1 = case {rabbit_flow:bump(Msg), BlockReader} of
+ {false, true} -> rabbit_reader:conserve_memory(
+ self(), ReaderPid, false),
+ State#ch{block_reader = false};
+ _ -> State
+ end,
noreply(State1);
handle_info(timeout, State) ->
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 9e3b58aabe..87829b093f 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -278,7 +278,7 @@ mainloop(Deb, State = #v1{sock = Sock, buf = Buf, buf_len = BufLen}) ->
end.
handle_other({conserve_memory, Blocker, Conserve}, Deb, State) ->
- recvloop(Deb, internal_conserve_memory(Conserve, Blocker, State));
+ recvloop(Deb, update_blockers(Conserve, Blocker, State));
handle_other({channel_closing, ChPid}, Deb, State) ->
ok = rabbit_channel:ready_for_close(ChPid),
channel_cleanup(ChPid),
@@ -343,7 +343,7 @@ handle_other(emit_stats, Deb, State) ->
handle_other({system, From, Request}, Deb, State = #v1{parent = Parent}) ->
sys:handle_system_msg(Request, From, Parent, ?MODULE, Deb, State);
handle_other({bump_credit, Msg}, Deb, State) ->
- recvloop(Deb, internal_bump_credit(Msg, State));
+ recvloop(Deb, bump_credit(Msg, State));
handle_other(Other, _Deb, _State) ->
%% internal error -> something worth dying for
exit({unexpected_message, Other}).
@@ -362,47 +362,41 @@ terminate(Explanation, State) when ?IS_RUNNING(State) ->
terminate(_Explanation, State) ->
{force, State}.
-internal_conserve_memory(true, Blocker,
- State = #v1{connection_state = running,
- blockers = Blockers}) ->
+update_blockers(true, Blocker, State = #v1{connection_state = running,
+ blockers = Blockers}) ->
0 = sets:size(Blockers), %% ASSERT
State#v1{connection_state = blocking,
blockers = sets:add_element(Blocker, Blockers)};
-internal_conserve_memory(true, Blocker,
- State = #v1{blockers = Blockers}) ->
+update_blockers(true, Blocker, State = #v1{blockers = Blockers}) ->
State#v1{blockers = sets:add_element(Blocker, Blockers)};
-internal_conserve_memory(false, Blocker,
- State = #v1{connection_state = blocking,
- blockers = Blockers}) ->
- NewBlockers = sets:del_element(Blocker, Blockers),
- case sets:size(NewBlockers) of
- 0 -> State#v1{connection_state = running,
- blockers = NewBlockers};
-
- _ -> State#v1{blockers = NewBlockers}
+update_blockers(false, Blocker, State = #v1{connection_state = blocking}) ->
+ remove_blocker(Blocker, State);
+update_blockers(false, Blocker, State = #v1{connection_state = blocked,
+ heartbeater = Heartbeater}) ->
+ State1 = remove_blocker(Blocker, State),
+ case State1#v1.connection_state of
+ running -> ok = rabbit_heartbeat:resume_monitor(Heartbeater),
+ State1;
+ _ -> State1
end;
-internal_conserve_memory(false, Blocker,
- State = #v1{connection_state = blocked,
- blockers = Blockers,
- heartbeater = Heartbeater}) ->
+update_blockers(_Conserve, _Blocker, State) ->
+ State.
+
+remove_blocker(Blocker, State = #v1{blockers = Blockers}) ->
NewBlockers = sets:del_element(Blocker, Blockers),
case sets:size(NewBlockers) of
- 0 -> ok = rabbit_heartbeat:resume_monitor(Heartbeater),
- State#v1{connection_state = running,
+ 0 -> State#v1{connection_state = running,
blockers = NewBlockers};
-
_ -> State#v1{blockers = NewBlockers}
- end;
-internal_conserve_memory(_Conserve, _Blocker, State) ->
- State.
+ end.
-internal_bump_credit(Msg, State) ->
+bump_credit(Msg, State) ->
rabbit_flow:bump(Msg),
- internal_conserve_memory(false, self(), State).
+ update_blockers(false, self(), State).
-internal_check_credit(State) when ?IS_RUNNING(State) ->
+check_credit(State) when ?IS_RUNNING(State) ->
case rabbit_flow:blocked() of
- true -> internal_conserve_memory(true, self(), State);
+ true -> update_blockers(true, self(), State);
false -> State
end.
@@ -541,7 +535,7 @@ handle_frame(Type, Channel, Payload,
Channel, ChPid, FramingState),
put({channel, Channel}, {ChPid, NewAState}),
post_process_frame(AnalyzedFrame, ChPid,
- internal_check_credit(State));
+ check_credit(State));
undefined ->
case ?IS_RUNNING(State) of
true -> send_to_new_channel(
@@ -736,10 +730,9 @@ handle_method0(#'connection.open'{virtual_host = VHostPath},
ok = rabbit_access_control:check_vhost_access(User, VHostPath),
NewConnection = Connection#connection{vhost = VHostPath},
ok = send_on_channel0(Sock, #'connection.open_ok'{}, Protocol),
- State1 = internal_conserve_memory(
- rabbit_alarm:register(self(), {?MODULE, conserve_memory,
- [memory_alarm]}),
- memory_alarm,
+ State1 = update_blockers(
+ rabbit_alarm:register(self(), {?MODULE, conserve_memory, [mem]}),
+ mem,
State#v1{connection_state = running,
connection = NewConnection}),
rabbit_event:notify(connection_created,