# # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. # import atexit, time, errno from compat import select, set, selectable_waiter from threading import Thread, Lock class Acceptor: def __init__(self, sock, handler): self.sock = sock self.handler = handler def fileno(self): return self.sock.fileno() def reading(self): return True def writing(self): return False def readable(self): sock, addr = self.sock.accept() self.handler(sock) class Selector: lock = Lock() DEFAULT = None @staticmethod def default(): Selector.lock.acquire() try: if Selector.DEFAULT is None: sel = Selector() atexit.register(sel.stop) sel.start() Selector.DEFAULT = sel return Selector.DEFAULT finally: Selector.lock.release() def __init__(self): self.selectables = set() self.reading = set() self.writing = set() self.waiter = selectable_waiter() self.reading.add(self.waiter) self.stopped = False self.thread = None def wakeup(self): self.waiter.wakeup() def register(self, selectable): self.selectables.add(selectable) self.modify(selectable) def _update(self, selectable): if selectable.reading(): self.reading.add(selectable) else: self.reading.discard(selectable) if selectable.writing(): self.writing.add(selectable) else: self.writing.discard(selectable) return selectable.timing() def modify(self, selectable): self._update(selectable) self.wakeup() def unregister(self, selectable): self.reading.discard(selectable) self.writing.discard(selectable) self.selectables.discard(selectable) self.wakeup() def start(self): self.stopped = False self.thread = Thread(target=self.run) self.thread.setDaemon(True) self.thread.start(); def run(self): while not self.stopped: wakeup = None for sel in self.selectables.copy(): t = self._update(sel) if t is not None: if wakeup is None: wakeup = t else: wakeup = min(wakeup, t) rd = [] wr = [] ex = [] while True: try: if wakeup is None: timeout = None else: timeout = max(0, wakeup - time.time()) rd, wr, ex = select(self.reading, self.writing, (), timeout) break except Exception, (err, strerror): # Repeat the select call if we were interrupted. if err == errno.EINTR: continue else: raise for sel in wr: if sel.writing(): sel.writeable() for sel in rd: if sel.reading(): sel.readable() now = time.time() for sel in self.selectables.copy(): w = sel.timing() if w is not None and now > w: sel.timeout() def stop(self, timeout=None): self.stopped = True self.wakeup() self.thread.join(timeout) self.thread = None