diff options
| author | David Cramer <dcramer@gmail.com> | 2013-05-28 04:29:51 -0700 |
|---|---|---|
| committer | David Cramer <dcramer@gmail.com> | 2013-05-28 04:32:26 -0700 |
| commit | ba7631d6c2a9b23a5fd64387901ab995a339f519 (patch) | |
| tree | 78c622611d8e7106f63dd4ecaf8053d22b896d97 | |
| parent | f4c2969e926aedf0a5760a204b1b1d89534d5b7a (diff) | |
| download | raven-cleanup-threaded.tar.gz | |
Set a cap on threaded queue and minor cleanupcleanup-threaded
| -rw-r--r-- | raven/transport/threaded.py | 69 | ||||
| -rw-r--r-- | raven/utils/__init__.py | 26 | ||||
| -rw-r--r-- | raven/utils/compat.py | 4 |
3 files changed, 68 insertions, 31 deletions
diff --git a/raven/transport/threaded.py b/raven/transport/threaded.py index 11197aa..c1d48c4 100644 --- a/raven/transport/threaded.py +++ b/raven/transport/threaded.py @@ -8,14 +8,16 @@ raven.transport.threaded import atexit import logging +import os import time import threading -import os -from raven.utils.compat import Queue +from raven.utils import memoize +from raven.utils.compat import Queue, Full from raven.transport.base import HTTPTransport, AsyncTransport DEFAULT_TIMEOUT = 10 +QUEUE_SIZE = 100 logger = logging.getLogger('sentry.errors') @@ -23,8 +25,8 @@ logger = logging.getLogger('sentry.errors') class AsyncWorker(object): _terminator = object() - def __init__(self, shutdown_timeout=DEFAULT_TIMEOUT): - self._queue = Queue(-1) + def __init__(self, shutdown_timeout=DEFAULT_TIMEOUT, queue_size=QUEUE_SIZE): + self._queue = Queue(queue_size) self._lock = threading.Lock() self._thread = None self.options = { @@ -34,26 +36,30 @@ class AsyncWorker(object): def main_thread_terminated(self): size = self._queue.qsize() - if size: - timeout = self.options['shutdown_timeout'] - print("Sentry is attempting to send %s pending error messages" % size) - print("Waiting up to %s seconds" % timeout) - if os.name == 'nt': - print("Press Ctrl-Break to quit") - else: - print("Press Ctrl-C to quit") - self.stop(timeout=timeout) + if not size: + return + + timeout = self.options['shutdown_timeout'] + print("Sentry is attempting to send %s pending error messages" % size) + print("Waiting up to %s seconds" % timeout) + if os.name == 'nt': + print("Press Ctrl-Break to quit") + else: + print("Press Ctrl-C to quit") + self.stop(timeout=timeout) def start(self): """ Starts the task thread. """ + if not self._thread: + return + self._lock.acquire() try: - if not self._thread: - self._thread = threading.Thread(target=self._target) - self._thread.setDaemon(True) - self._thread.start() + self._thread = threading.Thread(target=self._target) + self._thread.setDaemon(True) + self._thread.start() finally: self._lock.release() atexit.register(self.main_thread_terminated) @@ -62,17 +68,22 @@ class AsyncWorker(object): """ Stops the task thread. Synchronous! """ + if not self._thread: + return + self._lock.acquire() try: - if self._thread: - self._queue.put_nowait(self._terminator) - self._thread.join(timeout=timeout) - self._thread = None + self._queue.put_nowait(self._terminator) + self._thread.join(timeout=timeout) + self._thread = None finally: self._lock.release() def queue(self, callback, *args, **kwargs): - self._queue.put_nowait((callback, args, kwargs)) + try: + self._queue.put_nowait((callback, args, kwargs)) + except Full: + logger.error('Unable to queue job (full)', exc_info=True) def _target(self): while 1: @@ -92,16 +103,16 @@ class ThreadedHTTPTransport(AsyncTransport, HTTPTransport): scheme = ['threaded+http', 'threaded+https'] - def __init__(self, parsed_url): + def __init__(self, parsed_url, queue_size=QUEUE_SIZE): super(ThreadedHTTPTransport, self).__init__(parsed_url) # remove the threaded+ from the protocol, as it is not a real protocol self._url = self._url.split('+', 1)[-1] + self._queue_size = queue_size - def get_worker(self): - if not hasattr(self, '_worker'): - self._worker = AsyncWorker() - return self._worker + @memoize + def worker(self): + return AsyncWorker(queue_size=self._queue_size) def send_sync(self, data, headers, success_cb, failure_cb): try: @@ -112,5 +123,5 @@ class ThreadedHTTPTransport(AsyncTransport, HTTPTransport): success_cb() def async_send(self, data, headers, success_cb, failure_cb): - self.get_worker().queue(self.send_sync, data, headers, success_cb, - failure_cb) + self.worker.queue( + self.send_sync, data, headers, success_cb, failure_cb) diff --git a/raven/utils/__init__.py b/raven/utils/__init__.py index 30736b8..2ed213d 100644 --- a/raven/utils/__init__.py +++ b/raven/utils/__init__.py @@ -117,3 +117,29 @@ def get_auth_header(protocol, timestamp, client, api_key, api_secret=None, **kwa header.append(('sentry_secret', api_secret)) return 'Sentry %s' % ', '.join('%s=%s' % (k, v) for k, v in header) + + +class memoize(object): + """ + Memoize the result of a property call. + + >>> class A(object): + >>> @memoize + >>> def func(self): + >>> return 'foo' + """ + + def __init__(self, func): + self.__name__ = func.__name__ + self.__module__ = func.__module__ + self.__doc__ = func.__doc__ + self.func = func + + def __get__(self, obj, type=None): + if obj is None: + return self + d, n = vars(obj), self.__name__ + if n not in d: + value = self.func(obj) + d[n] = value + return value diff --git a/raven/utils/compat.py b/raven/utils/compat.py index 03e0dc6..8e21fca 100644 --- a/raven/utils/compat.py +++ b/raven/utils/compat.py @@ -25,9 +25,9 @@ except ImportError: try: - from queue import Queue + from queue import Queue, Full except ImportError: - from Queue import Queue # NOQA + from Queue import Queue, Full # NOQA try: |
