diff options
| author | Ask Solem <ask@celeryproject.org> | 2014-01-16 15:01:19 +0000 |
|---|---|---|
| committer | Ask Solem <ask@celeryproject.org> | 2014-01-16 15:01:19 +0000 |
| commit | 97abf06e054398f22b7bf77aa6349d8a2ccd860b (patch) | |
| tree | d4cabb26972ef44dbd836d3f7d43470654a50bd2 /amqp | |
| parent | bff6ddb0ea116e7c3b06f6cf2509c6411b3e4803 (diff) | |
| parent | 07a312582d90b1303ff5d675613935392f6a6283 (diff) | |
| download | py-amqp-readwrite.tar.gz | |
Merge branch 'master' into readwritereadwrite
Conflicts:
amqp/transport.py
Diffstat (limited to 'amqp')
| -rw-r--r-- | amqp/__init__.py | 2 | ||||
| -rw-r--r-- | amqp/abstract_channel.py | 6 | ||||
| -rw-r--r-- | amqp/channel.py | 12 | ||||
| -rw-r--r-- | amqp/connection.py | 69 | ||||
| -rw-r--r-- | amqp/five.py | 55 | ||||
| -rw-r--r-- | amqp/method_framing.py | 6 | ||||
| -rw-r--r-- | amqp/serialization.py | 18 | ||||
| -rw-r--r-- | amqp/transport.py | 100 | ||||
| -rw-r--r-- | amqp/utils.py | 5 |
9 files changed, 165 insertions, 108 deletions
diff --git a/amqp/__init__.py b/amqp/__init__.py index de7259b..3a5ba82 100644 --- a/amqp/__init__.py +++ b/amqp/__init__.py @@ -16,7 +16,7 @@ # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 from __future__ import absolute_import -VERSION = (1, 3, 2) +VERSION = (1, 4, 1) __version__ = '.'.join(map(str, VERSION[0:3])) + ''.join(VERSION[3:]) __author__ = 'Barry Pederson' __maintainer__ = 'Ask Solem' diff --git a/amqp/abstract_channel.py b/amqp/abstract_channel.py index 5e37bf9..28cfe13 100644 --- a/amqp/abstract_channel.py +++ b/amqp/abstract_channel.py @@ -19,12 +19,6 @@ from __future__ import absolute_import from .exceptions import AMQPNotImplementedError, RecoverableConnectionError from .serialization import AMQPWriter -try: - bytes -except NameError: - # Python 2.5 and lower - bytes = str - __all__ = ['AbstractChannel'] diff --git a/amqp/channel.py b/amqp/channel.py index ea59f0c..05eb09a 100644 --- a/amqp/channel.py +++ b/amqp/channel.py @@ -31,6 +31,15 @@ __all__ = ['Channel'] AMQP_LOGGER = logging.getLogger('amqp') +EXCHANGE_AUTODELETE_DEPRECATED = """\ +The auto_delete flag for exchanges has been deprecated and will be removed +from py-amqp v1.5.0.\ +""" + + +class VDeprecationWarning(DeprecationWarning): + pass + class Channel(AbstractChannel): """Work with channels @@ -604,8 +613,7 @@ class Channel(AbstractChannel): self._send_method((40, 10), args) if auto_delete: - warn(DeprecationWarning( - 'auto_delete exchanges has been deprecated')) + warn(VDeprecationWarning(EXCHANGE_AUTODELETE_DEPRECATED)) if not nowait: return self.wait(allowed_methods=[ diff --git a/amqp/connection.py b/amqp/connection.py index 8808a58..1d4980f 100644 --- a/amqp/connection.py +++ b/amqp/connection.py @@ -34,7 +34,7 @@ from .exceptions import ( ConnectionForced, ConnectionError, error_for_code, RecoverableConnectionError, RecoverableChannelError, ) -from .five import items, range, values +from .five import items, range, values, monotonic from .method_framing import MethodReader, MethodWriter from .serialization import AMQPWriter from .transport import create_transport @@ -80,9 +80,26 @@ class Connection(AbstractChannel): """ Channel = Channel + #: Final heartbeat interval value (in float seconds) after negotiation + heartbeat = None + + #: Original heartbeat interval value proposed by client. + client_heartbeat = None + + #: Original heartbeat interval proposed by server. + server_heartbeat = None + + #: Time of last heartbeat sent (in monotonic time, if available). + last_heartbeat_sent = 0 + + #: Time of last heartbeat received (in monotonic time, if available). + last_heartbeat_received = 0 + + #: Number of bytes sent to socket at the last heartbeat check. prev_sent = None + + #: Number of bytes received from socket at the last heartbeat check. prev_recv = None - missed_heartbeats = 0 def __init__(self, host='localhost', userid='guest', password='guest', login_method='AMQPLAIN', login_response=None, @@ -125,7 +142,7 @@ class Connection(AbstractChannel): # Properties set in the Tune method self.channel_max = channel_max self.frame_max = frame_max - self.heartbeat = heartbeat + self.client_heartbeat = heartbeat self.confirm_publish = confirm_publish @@ -841,10 +858,18 @@ class Connection(AbstractChannel): want a heartbeat. """ + client_heartbeat = self.client_heartbeat or 0 self.channel_max = args.read_short() or self.channel_max self.frame_max = args.read_long() or self.frame_max self.method_writer.frame_max = self.frame_max - heartbeat = args.read_short() # noqa + self.server_heartbeat = args.read_short() or 0 + + # negotiate the heartbeat interval to the smaller of the + # specified values + if self.server_heartbeat == 0 or client_heartbeat == 0: + self.heartbeat = max(self.server_heartbeat, client_heartbeat) + else: + self.heartbeat = min(self.server_heartbeat, client_heartbeat) self._x_tune_ok(self.channel_max, self.frame_max, self.heartbeat) @@ -852,28 +877,34 @@ class Connection(AbstractChannel): self.transport.write_frame(8, 0, bytes()) def heartbeat_tick(self, rate=2): - """Verify that hartbeats are sent and received. - - :keyword rate: Rate is how often the tick is called - compared to the actual heartbeat value. E.g. if - the heartbeat is set to 3 seconds, and the tick - is called every 3 / 2 seconds, then the rate is 2. + """Send heartbeat packets, if necessary, and fail if none have been + received recently. This should be called frequently, on the order of + once per second. + :keyword rate: Ignored """ + if not self.heartbeat: + return + + # treat actual data exchange in either direction as a heartbeat sent_now = self.method_writer.bytes_sent recv_now = self.method_reader.bytes_recv + if self.prev_sent is None or self.prev_sent != sent_now: + self.last_heartbeat_sent = monotonic() + if self.prev_recv is None or self.prev_recv != recv_now: + self.last_heartbeat_received = monotonic() + self.prev_sent, self.prev_recv = sent_now, recv_now - if self.prev_sent is not None and self.prev_sent == sent_now: + # send a heartbeat if it's time to do so + if monotonic() > self.last_heartbeat_sent + self.heartbeat: self.send_heartbeat() + self.last_heartbeat_sent = monotonic() - if self.prev_recv is not None and self.prev_recv == recv_now: - self.missed_heartbeats += 1 - else: - self.missed_heartbeats = 0 - - self.prev_sent, self.prev_recv = sent_now, recv_now - - if self.missed_heartbeats >= rate: + # if we've missed two intervals' heartbeats, fail; this gives the + # server enough time to send heartbeats a little late + if (self.last_heartbeat_received and + self.last_heartbeat_received + 2 * + self.heartbeat < monotonic()): raise ConnectionForced('Too many heartbeats missed') def _x_tune_ok(self, channel_max, frame_max, heartbeat): diff --git a/amqp/five.py b/amqp/five.py index 25b83fc..5157df5 100644 --- a/amqp/five.py +++ b/amqp/five.py @@ -131,3 +131,58 @@ def with_metaclass(Type, skip_attrs=set(['__dict__', '__weakref__'])): return Type(Class.__name__, Class.__bases__, attrs) return _clone_with_metaclass + +############## time.monotonic ################################################ + +if sys.version_info < (3, 3): + + import platform + SYSTEM = platform.system() + + if SYSTEM == 'Darwin': + import ctypes + from ctypes.util import find_library + libSystem = ctypes.CDLL('libSystem.dylib') + CoreServices = ctypes.CDLL(find_library('CoreServices'), + use_errno=True) + mach_absolute_time = libSystem.mach_absolute_time + mach_absolute_time.restype = ctypes.c_uint64 + absolute_to_nanoseconds = CoreServices.AbsoluteToNanoseconds + absolute_to_nanoseconds.restype = ctypes.c_uint64 + absolute_to_nanoseconds.argtypes = [ctypes.c_uint64] + + def _monotonic(): + return absolute_to_nanoseconds(mach_absolute_time()) * 1e-9 + + elif SYSTEM == 'Linux': + # from stackoverflow: + # questions/1205722/how-do-i-get-monotonic-time-durations-in-python + import ctypes + import os + + CLOCK_MONOTONIC = 1 # see <linux/time.h> + + class timespec(ctypes.Structure): + _fields_ = [ + ('tv_sec', ctypes.c_long), + ('tv_nsec', ctypes.c_long), + ] + + librt = ctypes.CDLL('librt.so.1', use_errno=True) + clock_gettime = librt.clock_gettime + clock_gettime.argtypes = [ + ctypes.c_int, ctypes.POINTER(timespec), + ] + + def _monotonic(): # noqa + t = timespec() + if clock_gettime(CLOCK_MONOTONIC, ctypes.pointer(t)) != 0: + errno_ = ctypes.get_errno() + raise OSError(errno_, os.strerror(errno_)) + return t.tv_sec + t.tv_nsec * 1e-9 + else: + from time import time as _monotonic +try: + from time import monotonic +except ImportError: + monotonic = _monotonic # noqa diff --git a/amqp/method_framing.py b/amqp/method_framing.py index 85fbfba..b454524 100644 --- a/amqp/method_framing.py +++ b/amqp/method_framing.py @@ -19,12 +19,6 @@ from __future__ import absolute_import from collections import defaultdict, deque from struct import pack, unpack -try: - bytes -except NameError: - # Python 2.5 and lower - bytes = str - from .basic_message import Message from .exceptions import AMQPError, UnexpectedFrame from .five import range, string diff --git a/amqp/serialization.py b/amqp/serialization.py index 6a74702..528d0b7 100644 --- a/amqp/serialization.py +++ b/amqp/serialization.py @@ -25,6 +25,7 @@ import sys from datetime import datetime from decimal import Decimal +from io import BytesIO from struct import pack, unpack from time import mktime @@ -39,19 +40,6 @@ if IS_PY3K: else: byte = chr -try: - from io import BytesIO -except ImportError: # Py2.5 - try: - from cStringIO import StringIO as BytesIO # noqa - except ImportError: - from StringIO import StringIO as BytesIO # noqa - -try: - bytes -except NameError: - # Python 2.5 and lower - bytes = str ILLEGAL_TABLE_TYPE_WITH_KEY = """\ Table type {0!r} for key {1!r} not handled by amqp. [value: {2!r}] @@ -174,6 +162,8 @@ class AMQPReader(object): val = self.read_bit() elif ftype == 100: val = self.read_float() + elif ftype == 86: # 'V' + val = None else: raise FrameSyntaxError( 'Unknown value in table: {0!r} ({1!r})'.format( @@ -357,6 +347,8 @@ class AMQPWriter(object): elif isinstance(v, (list, tuple)): self.write(b'A') self.write_array(v) + elif v is None: + self.write(b'V') else: err = (ILLEGAL_TABLE_TYPE_WITH_KEY.format(type(v), k, v) if k else ILLEGAL_TABLE_TYPE.format(type(v), v)) diff --git a/amqp/transport.py b/amqp/transport.py index 3123072..b567d79 100644 --- a/amqp/transport.py +++ b/amqp/transport.py @@ -1,9 +1,3 @@ -""" -Read/Write AMQP frames over network transports. - -2009-01-14 Barry Pederson <bp@barryp.org> - -""" # Copyright (C) 2009 Barry Pederson <bp@barryp.org> # # This library is free software; you can redistribute it and/or @@ -25,6 +19,7 @@ import errno import re import select import socket +import ssl # Jython does not have this attribute try: @@ -32,29 +27,12 @@ try: except ImportError: # pragma: no cover from socket import IPPROTO_TCP as SOL_TCP # noqa -# -# See if Python 2.6+ SSL support is available -# -try: - import ssl - HAVE_PY26_SSL = True -except: - HAVE_PY26_SSL = False - -try: - bytes -except: - # Python 2.5 and lower - bytes = str - -UNAVAIL = frozenset([errno.EAGAIN, errno.EINTR]) - from struct import pack, unpack from .exceptions import UnexpectedFrame from .utils import get_errno, set_cloexec -_UNAVAIL = errno.EAGAIN, errno.EINTR +_UNAVAIL = frozenset([errno.EAGAIN, errno.EINTR, errno.ENOENT]) AMQP_PORT = 5672 @@ -206,22 +184,17 @@ class SSLTransport(_AbstractTransport): super(SSLTransport, self).__init__(host, connect_timeout) def _setup_transport(self): - """Wrap the socket in an SSL object, either the - new Python 2.6 version, or the older Python 2.5 and - lower version.""" - if HAVE_PY26_SSL: - if hasattr(self, 'sslopts'): - self.sock = ssl.wrap_socket(self.sock, **self.sslopts) - else: - self.sock = ssl.wrap_socket(self.sock) - self.sock.do_handshake() + """Wrap the socket in an SSL object.""" + if hasattr(self, 'sslopts'): + self.sock = ssl.wrap_socket(self.sock, **self.sslopts) else: - self.sock = socket.ssl(self.sock) + self.sock = ssl.wrap_socket(self.sock) + self.sock.do_handshake() self._quick_recv = self.sock.read def _shutdown_transport(self): """Unwrap a Python 2.6 SSL socket, so we can call shutdown()""" - if HAVE_PY26_SSL and self.sock is not None: + if self.sock is not None: try: unwrap = self.sock.unwrap except AttributeError: @@ -235,19 +208,22 @@ class SSLTransport(_AbstractTransport): # to get the exact number of bytes wanted. recv = self._quick_recv rbuf = self._read_buffer - while len(rbuf) < n: - try: - s = recv(131072) # see note above - except socket.error as exc: - # ssl.sock.read may cause ENOENT if the - # operation couldn't be performed (Issue celery#1414). - if not initial and exc.errno in _errnos: - continue - raise exc - if not s: - raise IOError('Socket closed') - rbuf += s - + try: + while len(rbuf) < n: + try: + s = recv(131072) # see note above + except socket.error as exc: + # ssl.sock.read may cause ENOENT if the + # operation couldn't be performed (Issue celery#1414). + if not initial and exc.errno in _errnos: + continue + raise + if not s: + raise IOError('Socket closed') + rbuf += s + except: + self._read_buffer = rbuf + raise result, self._read_buffer = rbuf[:n], rbuf[n:] return result @@ -270,7 +246,7 @@ class TCPTransport(_AbstractTransport): self._read_buffer = EMPTY_BUFFER self._quick_recv = self.sock.recv - def _write(self, s, select=select.select, unavail=UNAVAIL): + def _write(self, s, select=select.select, unavail=_UNAVAIL): write = self.sock.send while s: r, w, e = select([self.sock], [self.sock], [self.sock], 1.0) @@ -291,20 +267,24 @@ class TCPTransport(_AbstractTransport): raise IOError('Socket closed') s = s[n:] - def _read(self, n, initial=False, _errnos=UNAVAIL): + def _read(self, n, initial=False, _errnos=_UNAVAIL): """Read exactly n bytes from the socket""" recv = self._quick_recv rbuf = self._read_buffer - while len(rbuf) < n: - try: - s = recv(131072) - except socket.error as exc: - if not initial and exc.errno in _errnos: - continue - raise - if not s: - raise IOError('Socket closed') - rbuf += s + try: + while len(rbuf) < n: + try: + s = recv(131072) + except socket.error as exc: + if not initial and exc.errno in _errnos: + continue + raise + if not s: + raise IOError('Socket closed') + rbuf += s + except: + self._read_buffer = rbuf + raise result, self._read_buffer = rbuf[:n], rbuf[n:] return result diff --git a/amqp/utils.py b/amqp/utils.py index 994030b..900d2aa 100644 --- a/amqp/utils.py +++ b/amqp/utils.py @@ -11,7 +11,8 @@ except ImportError: class promise(object): if not hasattr(sys, 'pypy_version_info'): __slots__ = tuple( - 'fun args kwargs value ready failed on_success on_error'.split() + 'fun args kwargs value ready failed ' + ' on_success on_error calls'.split() ) def __init__(self, fun, args=(), kwargs=(), @@ -24,6 +25,7 @@ class promise(object): self.on_success = on_success self.on_error = on_error self.value = None + self.calls = 0 def __repr__(self): return '<$: {0.fun.__name__}(*{0.args!r}, **{0.kwargs!r})'.format( @@ -43,6 +45,7 @@ class promise(object): self.on_success(self.value) finally: self.ready = True + self.calls += 1 def then(self, callback=None, on_error=None): self.on_success = callback |
