summaryrefslogtreecommitdiff
path: root/amqp
diff options
context:
space:
mode:
authorAsk Solem <ask@celeryproject.org>2014-01-16 15:01:19 +0000
committerAsk Solem <ask@celeryproject.org>2014-01-16 15:01:19 +0000
commit97abf06e054398f22b7bf77aa6349d8a2ccd860b (patch)
treed4cabb26972ef44dbd836d3f7d43470654a50bd2 /amqp
parentbff6ddb0ea116e7c3b06f6cf2509c6411b3e4803 (diff)
parent07a312582d90b1303ff5d675613935392f6a6283 (diff)
downloadpy-amqp-readwrite.tar.gz
Merge branch 'master' into readwritereadwrite
Conflicts: amqp/transport.py
Diffstat (limited to 'amqp')
-rw-r--r--amqp/__init__.py2
-rw-r--r--amqp/abstract_channel.py6
-rw-r--r--amqp/channel.py12
-rw-r--r--amqp/connection.py69
-rw-r--r--amqp/five.py55
-rw-r--r--amqp/method_framing.py6
-rw-r--r--amqp/serialization.py18
-rw-r--r--amqp/transport.py100
-rw-r--r--amqp/utils.py5
9 files changed, 165 insertions, 108 deletions
diff --git a/amqp/__init__.py b/amqp/__init__.py
index de7259b..3a5ba82 100644
--- a/amqp/__init__.py
+++ b/amqp/__init__.py
@@ -16,7 +16,7 @@
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301
from __future__ import absolute_import
-VERSION = (1, 3, 2)
+VERSION = (1, 4, 1)
__version__ = '.'.join(map(str, VERSION[0:3])) + ''.join(VERSION[3:])
__author__ = 'Barry Pederson'
__maintainer__ = 'Ask Solem'
diff --git a/amqp/abstract_channel.py b/amqp/abstract_channel.py
index 5e37bf9..28cfe13 100644
--- a/amqp/abstract_channel.py
+++ b/amqp/abstract_channel.py
@@ -19,12 +19,6 @@ from __future__ import absolute_import
from .exceptions import AMQPNotImplementedError, RecoverableConnectionError
from .serialization import AMQPWriter
-try:
- bytes
-except NameError:
- # Python 2.5 and lower
- bytes = str
-
__all__ = ['AbstractChannel']
diff --git a/amqp/channel.py b/amqp/channel.py
index ea59f0c..05eb09a 100644
--- a/amqp/channel.py
+++ b/amqp/channel.py
@@ -31,6 +31,15 @@ __all__ = ['Channel']
AMQP_LOGGER = logging.getLogger('amqp')
+EXCHANGE_AUTODELETE_DEPRECATED = """\
+The auto_delete flag for exchanges has been deprecated and will be removed
+from py-amqp v1.5.0.\
+"""
+
+
+class VDeprecationWarning(DeprecationWarning):
+ pass
+
class Channel(AbstractChannel):
"""Work with channels
@@ -604,8 +613,7 @@ class Channel(AbstractChannel):
self._send_method((40, 10), args)
if auto_delete:
- warn(DeprecationWarning(
- 'auto_delete exchanges has been deprecated'))
+ warn(VDeprecationWarning(EXCHANGE_AUTODELETE_DEPRECATED))
if not nowait:
return self.wait(allowed_methods=[
diff --git a/amqp/connection.py b/amqp/connection.py
index 8808a58..1d4980f 100644
--- a/amqp/connection.py
+++ b/amqp/connection.py
@@ -34,7 +34,7 @@ from .exceptions import (
ConnectionForced, ConnectionError, error_for_code,
RecoverableConnectionError, RecoverableChannelError,
)
-from .five import items, range, values
+from .five import items, range, values, monotonic
from .method_framing import MethodReader, MethodWriter
from .serialization import AMQPWriter
from .transport import create_transport
@@ -80,9 +80,26 @@ class Connection(AbstractChannel):
"""
Channel = Channel
+ #: Final heartbeat interval value (in float seconds) after negotiation
+ heartbeat = None
+
+ #: Original heartbeat interval value proposed by client.
+ client_heartbeat = None
+
+ #: Original heartbeat interval proposed by server.
+ server_heartbeat = None
+
+ #: Time of last heartbeat sent (in monotonic time, if available).
+ last_heartbeat_sent = 0
+
+ #: Time of last heartbeat received (in monotonic time, if available).
+ last_heartbeat_received = 0
+
+ #: Number of bytes sent to socket at the last heartbeat check.
prev_sent = None
+
+ #: Number of bytes received from socket at the last heartbeat check.
prev_recv = None
- missed_heartbeats = 0
def __init__(self, host='localhost', userid='guest', password='guest',
login_method='AMQPLAIN', login_response=None,
@@ -125,7 +142,7 @@ class Connection(AbstractChannel):
# Properties set in the Tune method
self.channel_max = channel_max
self.frame_max = frame_max
- self.heartbeat = heartbeat
+ self.client_heartbeat = heartbeat
self.confirm_publish = confirm_publish
@@ -841,10 +858,18 @@ class Connection(AbstractChannel):
want a heartbeat.
"""
+ client_heartbeat = self.client_heartbeat or 0
self.channel_max = args.read_short() or self.channel_max
self.frame_max = args.read_long() or self.frame_max
self.method_writer.frame_max = self.frame_max
- heartbeat = args.read_short() # noqa
+ self.server_heartbeat = args.read_short() or 0
+
+ # negotiate the heartbeat interval to the smaller of the
+ # specified values
+ if self.server_heartbeat == 0 or client_heartbeat == 0:
+ self.heartbeat = max(self.server_heartbeat, client_heartbeat)
+ else:
+ self.heartbeat = min(self.server_heartbeat, client_heartbeat)
self._x_tune_ok(self.channel_max, self.frame_max, self.heartbeat)
@@ -852,28 +877,34 @@ class Connection(AbstractChannel):
self.transport.write_frame(8, 0, bytes())
def heartbeat_tick(self, rate=2):
- """Verify that hartbeats are sent and received.
-
- :keyword rate: Rate is how often the tick is called
- compared to the actual heartbeat value. E.g. if
- the heartbeat is set to 3 seconds, and the tick
- is called every 3 / 2 seconds, then the rate is 2.
+ """Send heartbeat packets, if necessary, and fail if none have been
+ received recently. This should be called frequently, on the order of
+ once per second.
+ :keyword rate: Ignored
"""
+ if not self.heartbeat:
+ return
+
+ # treat actual data exchange in either direction as a heartbeat
sent_now = self.method_writer.bytes_sent
recv_now = self.method_reader.bytes_recv
+ if self.prev_sent is None or self.prev_sent != sent_now:
+ self.last_heartbeat_sent = monotonic()
+ if self.prev_recv is None or self.prev_recv != recv_now:
+ self.last_heartbeat_received = monotonic()
+ self.prev_sent, self.prev_recv = sent_now, recv_now
- if self.prev_sent is not None and self.prev_sent == sent_now:
+ # send a heartbeat if it's time to do so
+ if monotonic() > self.last_heartbeat_sent + self.heartbeat:
self.send_heartbeat()
+ self.last_heartbeat_sent = monotonic()
- if self.prev_recv is not None and self.prev_recv == recv_now:
- self.missed_heartbeats += 1
- else:
- self.missed_heartbeats = 0
-
- self.prev_sent, self.prev_recv = sent_now, recv_now
-
- if self.missed_heartbeats >= rate:
+ # if we've missed two intervals' heartbeats, fail; this gives the
+ # server enough time to send heartbeats a little late
+ if (self.last_heartbeat_received and
+ self.last_heartbeat_received + 2 *
+ self.heartbeat < monotonic()):
raise ConnectionForced('Too many heartbeats missed')
def _x_tune_ok(self, channel_max, frame_max, heartbeat):
diff --git a/amqp/five.py b/amqp/five.py
index 25b83fc..5157df5 100644
--- a/amqp/five.py
+++ b/amqp/five.py
@@ -131,3 +131,58 @@ def with_metaclass(Type, skip_attrs=set(['__dict__', '__weakref__'])):
return Type(Class.__name__, Class.__bases__, attrs)
return _clone_with_metaclass
+
+############## time.monotonic ################################################
+
+if sys.version_info < (3, 3):
+
+ import platform
+ SYSTEM = platform.system()
+
+ if SYSTEM == 'Darwin':
+ import ctypes
+ from ctypes.util import find_library
+ libSystem = ctypes.CDLL('libSystem.dylib')
+ CoreServices = ctypes.CDLL(find_library('CoreServices'),
+ use_errno=True)
+ mach_absolute_time = libSystem.mach_absolute_time
+ mach_absolute_time.restype = ctypes.c_uint64
+ absolute_to_nanoseconds = CoreServices.AbsoluteToNanoseconds
+ absolute_to_nanoseconds.restype = ctypes.c_uint64
+ absolute_to_nanoseconds.argtypes = [ctypes.c_uint64]
+
+ def _monotonic():
+ return absolute_to_nanoseconds(mach_absolute_time()) * 1e-9
+
+ elif SYSTEM == 'Linux':
+ # from stackoverflow:
+ # questions/1205722/how-do-i-get-monotonic-time-durations-in-python
+ import ctypes
+ import os
+
+ CLOCK_MONOTONIC = 1 # see <linux/time.h>
+
+ class timespec(ctypes.Structure):
+ _fields_ = [
+ ('tv_sec', ctypes.c_long),
+ ('tv_nsec', ctypes.c_long),
+ ]
+
+ librt = ctypes.CDLL('librt.so.1', use_errno=True)
+ clock_gettime = librt.clock_gettime
+ clock_gettime.argtypes = [
+ ctypes.c_int, ctypes.POINTER(timespec),
+ ]
+
+ def _monotonic(): # noqa
+ t = timespec()
+ if clock_gettime(CLOCK_MONOTONIC, ctypes.pointer(t)) != 0:
+ errno_ = ctypes.get_errno()
+ raise OSError(errno_, os.strerror(errno_))
+ return t.tv_sec + t.tv_nsec * 1e-9
+ else:
+ from time import time as _monotonic
+try:
+ from time import monotonic
+except ImportError:
+ monotonic = _monotonic # noqa
diff --git a/amqp/method_framing.py b/amqp/method_framing.py
index 85fbfba..b454524 100644
--- a/amqp/method_framing.py
+++ b/amqp/method_framing.py
@@ -19,12 +19,6 @@ from __future__ import absolute_import
from collections import defaultdict, deque
from struct import pack, unpack
-try:
- bytes
-except NameError:
- # Python 2.5 and lower
- bytes = str
-
from .basic_message import Message
from .exceptions import AMQPError, UnexpectedFrame
from .five import range, string
diff --git a/amqp/serialization.py b/amqp/serialization.py
index 6a74702..528d0b7 100644
--- a/amqp/serialization.py
+++ b/amqp/serialization.py
@@ -25,6 +25,7 @@ import sys
from datetime import datetime
from decimal import Decimal
+from io import BytesIO
from struct import pack, unpack
from time import mktime
@@ -39,19 +40,6 @@ if IS_PY3K:
else:
byte = chr
-try:
- from io import BytesIO
-except ImportError: # Py2.5
- try:
- from cStringIO import StringIO as BytesIO # noqa
- except ImportError:
- from StringIO import StringIO as BytesIO # noqa
-
-try:
- bytes
-except NameError:
- # Python 2.5 and lower
- bytes = str
ILLEGAL_TABLE_TYPE_WITH_KEY = """\
Table type {0!r} for key {1!r} not handled by amqp. [value: {2!r}]
@@ -174,6 +162,8 @@ class AMQPReader(object):
val = self.read_bit()
elif ftype == 100:
val = self.read_float()
+ elif ftype == 86: # 'V'
+ val = None
else:
raise FrameSyntaxError(
'Unknown value in table: {0!r} ({1!r})'.format(
@@ -357,6 +347,8 @@ class AMQPWriter(object):
elif isinstance(v, (list, tuple)):
self.write(b'A')
self.write_array(v)
+ elif v is None:
+ self.write(b'V')
else:
err = (ILLEGAL_TABLE_TYPE_WITH_KEY.format(type(v), k, v) if k
else ILLEGAL_TABLE_TYPE.format(type(v), v))
diff --git a/amqp/transport.py b/amqp/transport.py
index 3123072..b567d79 100644
--- a/amqp/transport.py
+++ b/amqp/transport.py
@@ -1,9 +1,3 @@
-"""
-Read/Write AMQP frames over network transports.
-
-2009-01-14 Barry Pederson <bp@barryp.org>
-
-"""
# Copyright (C) 2009 Barry Pederson <bp@barryp.org>
#
# This library is free software; you can redistribute it and/or
@@ -25,6 +19,7 @@ import errno
import re
import select
import socket
+import ssl
# Jython does not have this attribute
try:
@@ -32,29 +27,12 @@ try:
except ImportError: # pragma: no cover
from socket import IPPROTO_TCP as SOL_TCP # noqa
-#
-# See if Python 2.6+ SSL support is available
-#
-try:
- import ssl
- HAVE_PY26_SSL = True
-except:
- HAVE_PY26_SSL = False
-
-try:
- bytes
-except:
- # Python 2.5 and lower
- bytes = str
-
-UNAVAIL = frozenset([errno.EAGAIN, errno.EINTR])
-
from struct import pack, unpack
from .exceptions import UnexpectedFrame
from .utils import get_errno, set_cloexec
-_UNAVAIL = errno.EAGAIN, errno.EINTR
+_UNAVAIL = frozenset([errno.EAGAIN, errno.EINTR, errno.ENOENT])
AMQP_PORT = 5672
@@ -206,22 +184,17 @@ class SSLTransport(_AbstractTransport):
super(SSLTransport, self).__init__(host, connect_timeout)
def _setup_transport(self):
- """Wrap the socket in an SSL object, either the
- new Python 2.6 version, or the older Python 2.5 and
- lower version."""
- if HAVE_PY26_SSL:
- if hasattr(self, 'sslopts'):
- self.sock = ssl.wrap_socket(self.sock, **self.sslopts)
- else:
- self.sock = ssl.wrap_socket(self.sock)
- self.sock.do_handshake()
+ """Wrap the socket in an SSL object."""
+ if hasattr(self, 'sslopts'):
+ self.sock = ssl.wrap_socket(self.sock, **self.sslopts)
else:
- self.sock = socket.ssl(self.sock)
+ self.sock = ssl.wrap_socket(self.sock)
+ self.sock.do_handshake()
self._quick_recv = self.sock.read
def _shutdown_transport(self):
"""Unwrap a Python 2.6 SSL socket, so we can call shutdown()"""
- if HAVE_PY26_SSL and self.sock is not None:
+ if self.sock is not None:
try:
unwrap = self.sock.unwrap
except AttributeError:
@@ -235,19 +208,22 @@ class SSLTransport(_AbstractTransport):
# to get the exact number of bytes wanted.
recv = self._quick_recv
rbuf = self._read_buffer
- while len(rbuf) < n:
- try:
- s = recv(131072) # see note above
- except socket.error as exc:
- # ssl.sock.read may cause ENOENT if the
- # operation couldn't be performed (Issue celery#1414).
- if not initial and exc.errno in _errnos:
- continue
- raise exc
- if not s:
- raise IOError('Socket closed')
- rbuf += s
-
+ try:
+ while len(rbuf) < n:
+ try:
+ s = recv(131072) # see note above
+ except socket.error as exc:
+ # ssl.sock.read may cause ENOENT if the
+ # operation couldn't be performed (Issue celery#1414).
+ if not initial and exc.errno in _errnos:
+ continue
+ raise
+ if not s:
+ raise IOError('Socket closed')
+ rbuf += s
+ except:
+ self._read_buffer = rbuf
+ raise
result, self._read_buffer = rbuf[:n], rbuf[n:]
return result
@@ -270,7 +246,7 @@ class TCPTransport(_AbstractTransport):
self._read_buffer = EMPTY_BUFFER
self._quick_recv = self.sock.recv
- def _write(self, s, select=select.select, unavail=UNAVAIL):
+ def _write(self, s, select=select.select, unavail=_UNAVAIL):
write = self.sock.send
while s:
r, w, e = select([self.sock], [self.sock], [self.sock], 1.0)
@@ -291,20 +267,24 @@ class TCPTransport(_AbstractTransport):
raise IOError('Socket closed')
s = s[n:]
- def _read(self, n, initial=False, _errnos=UNAVAIL):
+ def _read(self, n, initial=False, _errnos=_UNAVAIL):
"""Read exactly n bytes from the socket"""
recv = self._quick_recv
rbuf = self._read_buffer
- while len(rbuf) < n:
- try:
- s = recv(131072)
- except socket.error as exc:
- if not initial and exc.errno in _errnos:
- continue
- raise
- if not s:
- raise IOError('Socket closed')
- rbuf += s
+ try:
+ while len(rbuf) < n:
+ try:
+ s = recv(131072)
+ except socket.error as exc:
+ if not initial and exc.errno in _errnos:
+ continue
+ raise
+ if not s:
+ raise IOError('Socket closed')
+ rbuf += s
+ except:
+ self._read_buffer = rbuf
+ raise
result, self._read_buffer = rbuf[:n], rbuf[n:]
return result
diff --git a/amqp/utils.py b/amqp/utils.py
index 994030b..900d2aa 100644
--- a/amqp/utils.py
+++ b/amqp/utils.py
@@ -11,7 +11,8 @@ except ImportError:
class promise(object):
if not hasattr(sys, 'pypy_version_info'):
__slots__ = tuple(
- 'fun args kwargs value ready failed on_success on_error'.split()
+ 'fun args kwargs value ready failed '
+ ' on_success on_error calls'.split()
)
def __init__(self, fun, args=(), kwargs=(),
@@ -24,6 +25,7 @@ class promise(object):
self.on_success = on_success
self.on_error = on_error
self.value = None
+ self.calls = 0
def __repr__(self):
return '<$: {0.fun.__name__}(*{0.args!r}, **{0.kwargs!r})'.format(
@@ -43,6 +45,7 @@ class promise(object):
self.on_success(self.value)
finally:
self.ready = True
+ self.calls += 1
def then(self, callback=None, on_error=None):
self.on_success = callback