diff options
| author | Tim Fox <tim@rabbitmq.com> | 2011-03-04 11:53:55 +0000 |
|---|---|---|
| committer | Tim Fox <tim@rabbitmq.com> | 2011-03-04 11:53:55 +0000 |
| commit | 5b014bff96f8edbf70fcc3a39c7d26bcd525dcd8 (patch) | |
| tree | 5c0fa43af519f37a43aaf189ad0b3dee7f8fe00b | |
| parent | e0fb813eb903254369d616f67861de420aef4b4c (diff) | |
| parent | ba3a1565304a4d3d1239a528b71c79eea1bd72f3 (diff) | |
| download | rabbitmq-server-git-5b014bff96f8edbf70fcc3a39c7d26bcd525dcd8.tar.gz | |
merged default into branch, again
| -rw-r--r-- | src/file_handle_cache.erl | 77 |
1 files changed, 46 insertions, 31 deletions
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl index f41815d06b..ad80b86c5a 100644 --- a/src/file_handle_cache.erl +++ b/src/file_handle_cache.erl @@ -867,34 +867,35 @@ handle_call({open, Pid, Requested, EldestUnusedSince}, From, false -> {noreply, run_pending_item(Item, State1)} end; -handle_call({obtain, Pid}, From, State = #fhc_state { obtain_limit = Limit, - obtain_count = Count, - obtain_pending = Pending, - clients = Clients }) - when Limit =/= infinity andalso Count >= Limit -> - ok = track_client(Pid, Clients), - true = ets:update_element(Clients, Pid, {#cstate.blocked, true}), - Item = #pending { kind = obtain, pid = Pid, requested = 1, from = From }, - {noreply, State #fhc_state { obtain_pending = pending_in(Item, Pending) }}; handle_call({obtain, Pid}, From, State = #fhc_state { obtain_count = Count, obtain_pending = Pending, clients = Clients }) -> - Item = #pending { kind = obtain, pid = Pid, requested = 1, from = From }, ok = track_client(Pid, Clients), - case needs_reduce(State #fhc_state { obtain_count = Count + 1 }) of - true -> - true = ets:update_element(Clients, Pid, {#cstate.blocked, true}), - {noreply, reduce(State #fhc_state { - obtain_pending = pending_in(Item, Pending) })}; - false -> - {noreply, run_pending_item(Item, State)} - end; + Item = #pending { kind = obtain, pid = Pid, requested = 1, from = From }, + Enqueue = fun () -> + true = ets:update_element(Clients, Pid, + {#cstate.blocked, true}), + State #fhc_state { + obtain_pending = pending_in(Item, Pending) } + end, + {noreply, + case obtain_limit_reached(State) of + true -> Enqueue(); + false -> case needs_reduce(State #fhc_state { + obtain_count = Count + 1 }) of + true -> reduce(Enqueue()); + false -> adjust_alarm( + State, run_pending_item(Item, State)) + end + end}; handle_call({set_limit, Limit}, _From, State) -> - {reply, ok, maybe_reduce( - process_pending(State #fhc_state { - limit = Limit, - obtain_limit = obtain_limit(Limit) }))}; + {reply, ok, adjust_alarm( + State, maybe_reduce( + process_pending( + State #fhc_state { + limit = Limit, + obtain_limit = obtain_limit(Limit) })))}; handle_call(get_limit, _From, State = #fhc_state { limit = Limit }) -> {reply, Limit, State}; @@ -923,9 +924,9 @@ handle_cast({close, Pid, EldestUnusedSince}, _ -> dict:store(Pid, EldestUnusedSince, Elders) end, ets:update_counter(Clients, Pid, {#cstate.pending_closes, -1, 0, 0}), - {noreply, process_pending( + {noreply, adjust_alarm(State, process_pending( update_counts(open, Pid, -1, - State #fhc_state { elders = Elders1 }))}; + State #fhc_state { elders = Elders1 })))}; handle_cast({transfer, FromPid, ToPid}, State) -> ok = track_client(ToPid, State#fhc_state.clients), @@ -947,13 +948,15 @@ handle_info({'DOWN', _MRef, process, Pid, _Reason}, ets:lookup(Clients, Pid), true = ets:delete(Clients, Pid), FilterFun = fun (#pending { pid = Pid1 }) -> Pid1 =/= Pid end, - {noreply, process_pending( - State #fhc_state { - open_count = OpenCount - Opened, - open_pending = filter_pending(FilterFun, OpenPending), - obtain_count = ObtainCount - Obtained, - obtain_pending = filter_pending(FilterFun, ObtainPending), - elders = dict:erase(Pid, Elders) })}. + {noreply, adjust_alarm( + State, + process_pending( + State #fhc_state { + open_count = OpenCount - Opened, + open_pending = filter_pending(FilterFun, OpenPending), + obtain_count = ObtainCount - Obtained, + obtain_pending = filter_pending(FilterFun, ObtainPending), + elders = dict:erase(Pid, Elders) }))}. terminate(_Reason, State = #fhc_state { clients = Clients }) -> ets:delete(Clients), @@ -1013,6 +1016,18 @@ obtain_limit(Limit) -> case ?OBTAIN_LIMIT(Limit) of OLimit -> OLimit end. +obtain_limit_reached(#fhc_state { obtain_limit = Limit, + obtain_count = Count}) -> + Limit =/= infinity andalso Count >= Limit. + +adjust_alarm(OldState, NewState) -> + case {obtain_limit_reached(OldState), obtain_limit_reached(NewState)} of + {false, true} -> alarm_handler:set_alarm({file_descriptor_limit, []}); + {true, false} -> alarm_handler:clear_alarm(file_descriptor_limit); + _ -> ok + end, + NewState. + requested({_Kind, _Pid, Requested, _From}) -> Requested. |
