summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--raven/transport/threaded.py69
-rw-r--r--raven/utils/__init__.py26
-rw-r--r--raven/utils/compat.py4
3 files changed, 68 insertions, 31 deletions
diff --git a/raven/transport/threaded.py b/raven/transport/threaded.py
index 11197aa..c1d48c4 100644
--- a/raven/transport/threaded.py
+++ b/raven/transport/threaded.py
@@ -8,14 +8,16 @@ raven.transport.threaded
import atexit
import logging
+import os
import time
import threading
-import os
-from raven.utils.compat import Queue
+from raven.utils import memoize
+from raven.utils.compat import Queue, Full
from raven.transport.base import HTTPTransport, AsyncTransport
DEFAULT_TIMEOUT = 10
+QUEUE_SIZE = 100
logger = logging.getLogger('sentry.errors')
@@ -23,8 +25,8 @@ logger = logging.getLogger('sentry.errors')
class AsyncWorker(object):
_terminator = object()
- def __init__(self, shutdown_timeout=DEFAULT_TIMEOUT):
- self._queue = Queue(-1)
+ def __init__(self, shutdown_timeout=DEFAULT_TIMEOUT, queue_size=QUEUE_SIZE):
+ self._queue = Queue(queue_size)
self._lock = threading.Lock()
self._thread = None
self.options = {
@@ -34,26 +36,30 @@ class AsyncWorker(object):
def main_thread_terminated(self):
size = self._queue.qsize()
- if size:
- timeout = self.options['shutdown_timeout']
- print("Sentry is attempting to send %s pending error messages" % size)
- print("Waiting up to %s seconds" % timeout)
- if os.name == 'nt':
- print("Press Ctrl-Break to quit")
- else:
- print("Press Ctrl-C to quit")
- self.stop(timeout=timeout)
+ if not size:
+ return
+
+ timeout = self.options['shutdown_timeout']
+ print("Sentry is attempting to send %s pending error messages" % size)
+ print("Waiting up to %s seconds" % timeout)
+ if os.name == 'nt':
+ print("Press Ctrl-Break to quit")
+ else:
+ print("Press Ctrl-C to quit")
+ self.stop(timeout=timeout)
def start(self):
"""
Starts the task thread.
"""
+ if not self._thread:
+ return
+
self._lock.acquire()
try:
- if not self._thread:
- self._thread = threading.Thread(target=self._target)
- self._thread.setDaemon(True)
- self._thread.start()
+ self._thread = threading.Thread(target=self._target)
+ self._thread.setDaemon(True)
+ self._thread.start()
finally:
self._lock.release()
atexit.register(self.main_thread_terminated)
@@ -62,17 +68,22 @@ class AsyncWorker(object):
"""
Stops the task thread. Synchronous!
"""
+ if not self._thread:
+ return
+
self._lock.acquire()
try:
- if self._thread:
- self._queue.put_nowait(self._terminator)
- self._thread.join(timeout=timeout)
- self._thread = None
+ self._queue.put_nowait(self._terminator)
+ self._thread.join(timeout=timeout)
+ self._thread = None
finally:
self._lock.release()
def queue(self, callback, *args, **kwargs):
- self._queue.put_nowait((callback, args, kwargs))
+ try:
+ self._queue.put_nowait((callback, args, kwargs))
+ except Full:
+ logger.error('Unable to queue job (full)', exc_info=True)
def _target(self):
while 1:
@@ -92,16 +103,16 @@ class ThreadedHTTPTransport(AsyncTransport, HTTPTransport):
scheme = ['threaded+http', 'threaded+https']
- def __init__(self, parsed_url):
+ def __init__(self, parsed_url, queue_size=QUEUE_SIZE):
super(ThreadedHTTPTransport, self).__init__(parsed_url)
# remove the threaded+ from the protocol, as it is not a real protocol
self._url = self._url.split('+', 1)[-1]
+ self._queue_size = queue_size
- def get_worker(self):
- if not hasattr(self, '_worker'):
- self._worker = AsyncWorker()
- return self._worker
+ @memoize
+ def worker(self):
+ return AsyncWorker(queue_size=self._queue_size)
def send_sync(self, data, headers, success_cb, failure_cb):
try:
@@ -112,5 +123,5 @@ class ThreadedHTTPTransport(AsyncTransport, HTTPTransport):
success_cb()
def async_send(self, data, headers, success_cb, failure_cb):
- self.get_worker().queue(self.send_sync, data, headers, success_cb,
- failure_cb)
+ self.worker.queue(
+ self.send_sync, data, headers, success_cb, failure_cb)
diff --git a/raven/utils/__init__.py b/raven/utils/__init__.py
index 30736b8..2ed213d 100644
--- a/raven/utils/__init__.py
+++ b/raven/utils/__init__.py
@@ -117,3 +117,29 @@ def get_auth_header(protocol, timestamp, client, api_key, api_secret=None, **kwa
header.append(('sentry_secret', api_secret))
return 'Sentry %s' % ', '.join('%s=%s' % (k, v) for k, v in header)
+
+
+class memoize(object):
+ """
+ Memoize the result of a property call.
+
+ >>> class A(object):
+ >>> @memoize
+ >>> def func(self):
+ >>> return 'foo'
+ """
+
+ def __init__(self, func):
+ self.__name__ = func.__name__
+ self.__module__ = func.__module__
+ self.__doc__ = func.__doc__
+ self.func = func
+
+ def __get__(self, obj, type=None):
+ if obj is None:
+ return self
+ d, n = vars(obj), self.__name__
+ if n not in d:
+ value = self.func(obj)
+ d[n] = value
+ return value
diff --git a/raven/utils/compat.py b/raven/utils/compat.py
index 03e0dc6..8e21fca 100644
--- a/raven/utils/compat.py
+++ b/raven/utils/compat.py
@@ -25,9 +25,9 @@ except ImportError:
try:
- from queue import Queue
+ from queue import Queue, Full
except ImportError:
- from Queue import Queue # NOQA
+ from Queue import Queue, Full # NOQA
try: