diff options
| author | Simon MacMullen <simon@rabbitmq.com> | 2012-01-10 17:28:21 +0000 |
|---|---|---|
| committer | Simon MacMullen <simon@rabbitmq.com> | 2012-01-10 17:28:21 +0000 |
| commit | 1375ebc533cd376397ff99a8c20cd42f6fa2d4ec (patch) | |
| tree | 574fb0b7f97a6f238b799788bbb037bd47141e9a | |
| parent | e099c912c60c7ed4c51f2754227809b490bb279b (diff) | |
| download | rabbitmq-server-git-1375ebc533cd376397ff99a8c20cd42f6fa2d4ec.tar.gz | |
Cosmetic / refactor.
| -rw-r--r-- | src/rabbit_channel.erl | 13 | ||||
| -rw-r--r-- | src/rabbit_reader.erl | 63 |
2 files changed, 34 insertions, 42 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 13625d6373..f914aaf654 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -331,13 +331,12 @@ handle_cast({confirm, MsgSeqNos, From}, State) -> handle_info({bump_credit, Msg}, State = #ch{block_reader = BlockReader, reader_pid = ReaderPid}) -> - State1 = - case {rabbit_flow:bump(Msg), BlockReader} of - {false, true} -> rabbit_reader:conserve_memory( - self(), ReaderPid, false), - State#ch{block_reader = false}; - _ -> State - end, + State1 = case {rabbit_flow:bump(Msg), BlockReader} of + {false, true} -> rabbit_reader:conserve_memory( + self(), ReaderPid, false), + State#ch{block_reader = false}; + _ -> State + end, noreply(State1); handle_info(timeout, State) -> diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 9e3b58aabe..87829b093f 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -278,7 +278,7 @@ mainloop(Deb, State = #v1{sock = Sock, buf = Buf, buf_len = BufLen}) -> end. handle_other({conserve_memory, Blocker, Conserve}, Deb, State) -> - recvloop(Deb, internal_conserve_memory(Conserve, Blocker, State)); + recvloop(Deb, update_blockers(Conserve, Blocker, State)); handle_other({channel_closing, ChPid}, Deb, State) -> ok = rabbit_channel:ready_for_close(ChPid), channel_cleanup(ChPid), @@ -343,7 +343,7 @@ handle_other(emit_stats, Deb, State) -> handle_other({system, From, Request}, Deb, State = #v1{parent = Parent}) -> sys:handle_system_msg(Request, From, Parent, ?MODULE, Deb, State); handle_other({bump_credit, Msg}, Deb, State) -> - recvloop(Deb, internal_bump_credit(Msg, State)); + recvloop(Deb, bump_credit(Msg, State)); handle_other(Other, _Deb, _State) -> %% internal error -> something worth dying for exit({unexpected_message, Other}). @@ -362,47 +362,41 @@ terminate(Explanation, State) when ?IS_RUNNING(State) -> terminate(_Explanation, State) -> {force, State}. -internal_conserve_memory(true, Blocker, - State = #v1{connection_state = running, - blockers = Blockers}) -> +update_blockers(true, Blocker, State = #v1{connection_state = running, + blockers = Blockers}) -> 0 = sets:size(Blockers), %% ASSERT State#v1{connection_state = blocking, blockers = sets:add_element(Blocker, Blockers)}; -internal_conserve_memory(true, Blocker, - State = #v1{blockers = Blockers}) -> +update_blockers(true, Blocker, State = #v1{blockers = Blockers}) -> State#v1{blockers = sets:add_element(Blocker, Blockers)}; -internal_conserve_memory(false, Blocker, - State = #v1{connection_state = blocking, - blockers = Blockers}) -> - NewBlockers = sets:del_element(Blocker, Blockers), - case sets:size(NewBlockers) of - 0 -> State#v1{connection_state = running, - blockers = NewBlockers}; - - _ -> State#v1{blockers = NewBlockers} +update_blockers(false, Blocker, State = #v1{connection_state = blocking}) -> + remove_blocker(Blocker, State); +update_blockers(false, Blocker, State = #v1{connection_state = blocked, + heartbeater = Heartbeater}) -> + State1 = remove_blocker(Blocker, State), + case State1#v1.connection_state of + running -> ok = rabbit_heartbeat:resume_monitor(Heartbeater), + State1; + _ -> State1 end; -internal_conserve_memory(false, Blocker, - State = #v1{connection_state = blocked, - blockers = Blockers, - heartbeater = Heartbeater}) -> +update_blockers(_Conserve, _Blocker, State) -> + State. + +remove_blocker(Blocker, State = #v1{blockers = Blockers}) -> NewBlockers = sets:del_element(Blocker, Blockers), case sets:size(NewBlockers) of - 0 -> ok = rabbit_heartbeat:resume_monitor(Heartbeater), - State#v1{connection_state = running, + 0 -> State#v1{connection_state = running, blockers = NewBlockers}; - _ -> State#v1{blockers = NewBlockers} - end; -internal_conserve_memory(_Conserve, _Blocker, State) -> - State. + end. -internal_bump_credit(Msg, State) -> +bump_credit(Msg, State) -> rabbit_flow:bump(Msg), - internal_conserve_memory(false, self(), State). + update_blockers(false, self(), State). -internal_check_credit(State) when ?IS_RUNNING(State) -> +check_credit(State) when ?IS_RUNNING(State) -> case rabbit_flow:blocked() of - true -> internal_conserve_memory(true, self(), State); + true -> update_blockers(true, self(), State); false -> State end. @@ -541,7 +535,7 @@ handle_frame(Type, Channel, Payload, Channel, ChPid, FramingState), put({channel, Channel}, {ChPid, NewAState}), post_process_frame(AnalyzedFrame, ChPid, - internal_check_credit(State)); + check_credit(State)); undefined -> case ?IS_RUNNING(State) of true -> send_to_new_channel( @@ -736,10 +730,9 @@ handle_method0(#'connection.open'{virtual_host = VHostPath}, ok = rabbit_access_control:check_vhost_access(User, VHostPath), NewConnection = Connection#connection{vhost = VHostPath}, ok = send_on_channel0(Sock, #'connection.open_ok'{}, Protocol), - State1 = internal_conserve_memory( - rabbit_alarm:register(self(), {?MODULE, conserve_memory, - [memory_alarm]}), - memory_alarm, + State1 = update_blockers( + rabbit_alarm:register(self(), {?MODULE, conserve_memory, [mem]}), + mem, State#v1{connection_state = running, connection = NewConnection}), rabbit_event:notify(connection_created, |
