summaryrefslogtreecommitdiff
path: root/qpid/python
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2009-10-05 12:26:55 +0000
committerRobert Godfrey <rgodfrey@apache.org>2009-10-05 12:26:55 +0000
commit69e05a6b0979d2e12588635a4ed46a09a87cdec0 (patch)
tree883b432624cb9508d3d275c85aa0eb00388b166a /qpid/python
parent2e20e00f3816aa63f3633a5a16f1891261750c74 (diff)
downloadqpid-python-69e05a6b0979d2e12588635a4ed46a09a87cdec0.tar.gz
Merged from trunk up to r807984
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/java-broker-0-10@821770 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/python')
-rw-r--r--qpid/python/Makefile92
-rwxr-xr-xqpid/python/commands/qpid-config8
-rwxr-xr-xqpid/python/preppy67
-rw-r--r--qpid/python/qpid/compat.py7
-rw-r--r--qpid/python/qpid/datatypes.py11
-rw-r--r--qpid/python/qpid/management.py21
-rw-r--r--qpid/python/qpid/managementdata.py3
-rw-r--r--qpid/python/qpid/messaging.py817
-rw-r--r--qpid/python/qpid/session.py14
-rw-r--r--qpid/python/qpid/tests/messaging.py36
-rw-r--r--qpid/python/qpid_config.py8
-rw-r--r--qpid/python/tests/datatypes.py13
-rw-r--r--qpid/python/tests_0-10/management.py4
13 files changed, 816 insertions, 285 deletions
diff --git a/qpid/python/Makefile b/qpid/python/Makefile
new file mode 100644
index 0000000000..380115db41
--- /dev/null
+++ b/qpid/python/Makefile
@@ -0,0 +1,92 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+PREFIX=/usr/local
+EXEC_PREFIX=$(PREFIX)/bin
+DATA_DIR=$(PREFIX)/share
+
+PYTHON_LIB=$(shell python -c "from distutils.sysconfig import get_python_lib; print get_python_lib(prefix='$(PREFIX)')")
+PYTHON_VERSION=$(shell python -c "from distutils.sysconfig import get_python_version; print get_python_version()")
+
+ddfirst=$(shell ddir=$(DATA_DIR) && echo $${ddir:0:1})
+ifeq ($(ddfirst),/)
+AMQP_SPEC_DIR=$(DATA_DIR)/amqp
+else
+AMQP_SPEC_DIR=$(PWD)/$(DATA_DIR)/amqp
+endif
+
+DIRS=qmf qpid mllib models examples tests tests_0-8 tests_0-9 tests_0-10
+SRCS=$(shell find $(DIRS) -name "*.py") qpid_config.py
+BUILD=build
+TARGETS=$(SRCS:%.py=$(BUILD)/%.py)
+
+PYCC=python -c "import compileall, sys; compileall.compile_dir(sys.argv[1])"
+
+all: build
+
+$(BUILD)/%.py: %.py
+ @mkdir -p $(shell dirname $@)
+ ./preppy $(PYTHON_VERSION) < $< > $@
+
+build: $(TARGETS)
+
+install: build
+ install -d $(PYTHON_LIB)
+
+ install -d $(PYTHON_LIB)/mllib
+ install -pm 0644 LICENSE.txt NOTICE.txt $(BUILD)/mllib/*.* $(PYTHON_LIB)/mllib
+ $(PYCC) $(PYTHON_LIB)/mllib
+
+ install -d $(PYTHON_LIB)/qpid
+ install -pm 0644 LICENSE.txt NOTICE.txt README.txt $(BUILD)/qpid/*.* $(PYTHON_LIB)/qpid
+ TDIR=$(shell mktemp -d) && \
+ sed s@AMQP_SPEC_DIR=.*@AMQP_SPEC_DIR='"$(AMQP_SPEC_DIR)"'@ \
+ $(BUILD)/qpid_config.py > $${TDIR}/qpid_config.py && \
+ install -pm 0644 $${TDIR}/qpid_config.py $(PYTHON_LIB) && \
+ rm -rf $${TDIR}
+
+ install -d $(PYTHON_LIB)/qpid/tests
+ install -pm 0644 $(BUILD)/qpid/tests/*.* $(PYTHON_LIB)/qpid/tests
+ $(PYCC) $(PYTHON_LIB)/qpid
+
+ install -d $(PYTHON_LIB)/qmf
+ install -pm 0644 LICENSE.txt NOTICE.txt qmf/*.* $(PYTHON_LIB)/qmf
+ $(PYCC) $(PYTHON_LIB)/qmf
+
+ install -d $(PYTHON_LIB)/tests
+ install -pm 0644 $(BUILD)/tests/*.* $(PYTHON_LIB)/tests
+ $(PYCC) $(PYTHON_LIB)/tests
+
+ install -d $(PYTHON_LIB)/tests_0-8
+ install -pm 0644 $(BUILD)/tests_0-8/*.* $(PYTHON_LIB)/tests_0-8
+ $(PYCC) $(PYTHON_LIB)/tests_0-8
+
+ install -d $(PYTHON_LIB)/tests_0-9
+ install -pm 0644 $(BUILD)/tests_0-9/*.* $(PYTHON_LIB)/tests_0-9
+ $(PYCC) $(PYTHON_LIB)/tests_0-9
+
+ install -d $(PYTHON_LIB)/tests_0-10
+ install -pm 0644 $(BUILD)/tests_0-10/*.* $(PYTHON_LIB)/tests_0-10
+ $(PYCC) $(PYTHON_LIB)/tests_0-10
+
+ install -d $(EXEC_PREFIX)
+ install -pm 0755 qpid-python-test commands/* $(EXEC_PREFIX)
+
+clean:
+ rm -rf $(BUILD)
diff --git a/qpid/python/commands/qpid-config b/qpid/python/commands/qpid-config
index c4ea5c5f2d..4cf9505c58 100755
--- a/qpid/python/commands/qpid-config
+++ b/qpid/python/commands/qpid-config
@@ -110,6 +110,12 @@ def Usage ():
print " --force-if-not-empty Force delete of queue even if it's not empty"
print " --force-if-used Force delete of queue even if it's currently used"
print
+ print "Add Exchange <type> values:"
+ print " direct Direct exchange for point-to-point communication"
+ print " fanout Fanout exchange for broadcast communication"
+ print " topic Topic exchange that routes messages using binding keys with wildcards"
+ print " headers Headers exchange that matches header fields against the binding keys"
+ print
print "Add Exchange Options:"
print " --alternate-exchange [name of the alternate exchange]"
print " In the event that a message cannot be routed, this is the name of the exchange to"
@@ -190,6 +196,8 @@ class BrokerManager:
if ex.durable: print "--durable",
if MSG_SEQUENCE in args and args[MSG_SEQUENCE] == 1: print "--sequence",
if IVE in args and args[IVE] == 1: print "--ive",
+ if ex.altExchange:
+ print "--alternate-exchange=%s" % ex._altExchange_.name,
print
def ExchangeListRecurse (self, filter):
diff --git a/qpid/python/preppy b/qpid/python/preppy
new file mode 100755
index 0000000000..22893dad03
--- /dev/null
+++ b/qpid/python/preppy
@@ -0,0 +1,67 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import os, re, sys
+
+ann = re.compile(r"([ \t]*)@([_a-zA-Z][_a-zA-Z0-9]*)([ \t\n\r]+def[ \t]+)([_a-zA-Z][_a-zA-Z0-9]*)")
+line = re.compile(r"\n([ \t]*)[^ \t\n#]+")
+
+if len(sys.argv) == 2:
+ major, minor = [int(p) for p in sys.argv[1].split(".")]
+elif len(sys.argv) == 1:
+ major, minor = sys.version_info[0:2]
+else:
+ print "usage: %s [ version ] < input.py > output.py" % sys.argv[0]
+ sys.exit(-1)
+
+if major <= 2 and minor <= 3:
+ def process(input):
+ output = ""
+ pos = 0
+ while True:
+ m = ann.search(input, pos)
+ if m:
+ indent, decorator, idef, function = m.groups()
+ output += input[pos:m.start()]
+ output += "%s#@%s%s%s" % (indent, decorator, idef, function)
+ pos = m.end()
+
+ subst = "\n%s%s = %s(%s)\n" % (indent, function, decorator, function)
+ npos = pos
+ while True:
+ n = line.search(input, npos)
+ if not n:
+ input += subst
+ break
+ if len(n.group(1)) <= len(indent):
+ idx = n.start()
+ input = input[:idx] + subst + input[idx:]
+ break
+ npos = n.end()
+ else:
+ break
+
+ output += input[pos:]
+ return output
+else:
+ def process(input):
+ return input
+
+sys.stdout.write(process(sys.stdin.read()))
diff --git a/qpid/python/qpid/compat.py b/qpid/python/qpid/compat.py
index 26f60fb8aa..49273193df 100644
--- a/qpid/python/qpid/compat.py
+++ b/qpid/python/qpid/compat.py
@@ -26,3 +26,10 @@ try:
from socket import SHUT_RDWR
except ImportError:
SHUT_RDWR = 2
+
+try:
+ from traceback import format_exc
+except ImportError:
+ import sys, traceback
+ def format_exc():
+ return "".join(traceback.format_exception(*sys.exc_info()))
diff --git a/qpid/python/qpid/datatypes.py b/qpid/python/qpid/datatypes.py
index bba3f5b9ab..f832ddae34 100644
--- a/qpid/python/qpid/datatypes.py
+++ b/qpid/python/qpid/datatypes.py
@@ -132,7 +132,7 @@ class Serial:
return hash(self.value)
def __cmp__(self, other):
- if other is None:
+ if other.__class__ not in (int, long, Serial):
return 1
other = serial(other)
@@ -150,7 +150,10 @@ class Serial:
return Serial(self.value + other)
def __sub__(self, other):
- return Serial(self.value - other)
+ if isinstance(other, Serial):
+ return self.value - other.value
+ else:
+ return Serial(self.value - other)
def __repr__(self):
return "serial(%s)" % self.value
@@ -169,7 +172,7 @@ class Range:
def __contains__(self, n):
return self.lower <= n and n <= self.upper
-
+
def __iter__(self):
i = self.lower
while i <= self.upper:
@@ -230,7 +233,7 @@ class RangedSet:
def add(self, lower, upper = None):
self.add_range(Range(lower, upper))
-
+
def __iter__(self):
return iter(self.ranges)
diff --git a/qpid/python/qpid/management.py b/qpid/python/qpid/management.py
index c006da76f5..65807548e6 100644
--- a/qpid/python/qpid/management.py
+++ b/qpid/python/qpid/management.py
@@ -234,8 +234,7 @@ class managementClient:
#========================================================
# User API - interacts with the class's user
#========================================================
- def __init__ (self, amqpSpec, ctrlCb=None, configCb=None, instCb=None, methodCb=None, closeCb=None):
- self.spec = amqpSpec
+ def __init__ (self, unused=None, ctrlCb=None, configCb=None, instCb=None, methodCb=None, closeCb=None):
self.ctrlCb = ctrlCb
self.configCb = configCb
self.instCb = instCb
@@ -268,7 +267,7 @@ class managementClient:
self.channels.append (mch)
self.incOutstanding (mch)
- codec = Codec (self.spec)
+ codec = Codec ()
self.setHeader (codec, ord ('B'))
msg = mch.message(codec.encoded)
mch.send ("qpid.management", msg)
@@ -285,7 +284,7 @@ class managementClient:
def getObjects (self, channel, userSequence, className, bank=0):
""" Request immediate content from broker """
- codec = Codec (self.spec)
+ codec = Codec ()
self.setHeader (codec, ord ('G'), userSequence)
ft = {}
ft["_class"] = className
@@ -353,7 +352,7 @@ class managementClient:
#========================================================
def topicCb (self, ch, msg):
""" Receive messages via the topic queue of a particular channel. """
- codec = Codec (self.spec, msg.body)
+ codec = Codec (msg.body)
while True:
hdr = self.checkHeader (codec)
if hdr == None:
@@ -372,7 +371,7 @@ class managementClient:
def replyCb (self, ch, msg):
""" Receive messages via the reply queue of a particular channel. """
- codec = Codec (self.spec, msg.body)
+ codec = Codec (msg.body)
hdr = self.checkHeader (codec)
if hdr == None:
return
@@ -498,7 +497,7 @@ class managementClient:
data = codec.read_uuid ()
elif typecode == 15: # FTABLE
data = {}
- sc = Codec(codec.spec, codec.read_vbin32())
+ sc = Codec(codec.read_vbin32())
if sc.encoded:
count = sc.read_uint32()
while count > 0:
@@ -599,7 +598,7 @@ class managementClient:
self.ctrlCb (ch.context, self.CTRL_BROKER_INFO, ch.brokerInfo)
# Send a package request
- sendCodec = Codec (self.spec)
+ sendCodec = Codec ()
seq = self.seqMgr.reserve ("outstanding")
self.setHeader (sendCodec, ord ('P'), seq)
smsg = ch.message(sendCodec.encoded)
@@ -611,7 +610,7 @@ class managementClient:
self.packages[pname] = {}
# Send a class request
- sendCodec = Codec (self.spec)
+ sendCodec = Codec ()
seq = self.seqMgr.reserve ("outstanding")
self.setHeader (sendCodec, ord ('Q'), seq)
self.incOutstanding (ch)
@@ -631,7 +630,7 @@ class managementClient:
if (cname, hash) not in self.packages[pname]:
# Send a schema request
- sendCodec = Codec (self.spec)
+ sendCodec = Codec ()
seq = self.seqMgr.reserve ("outstanding")
self.setHeader (sendCodec, ord ('S'), seq)
self.incOutstanding (ch)
@@ -885,7 +884,7 @@ class managementClient:
def method (self, channel, userSequence, objId, classId, methodName, args):
""" Invoke a method on an object """
- codec = Codec (self.spec)
+ codec = Codec ()
sequence = self.seqMgr.reserve ((userSequence, classId, methodName))
self.setHeader (codec, ord ('M'), sequence)
objId.encode(codec)
diff --git a/qpid/python/qpid/managementdata.py b/qpid/python/qpid/managementdata.py
index c0d32d46cf..655ee13464 100644
--- a/qpid/python/qpid/managementdata.py
+++ b/qpid/python/qpid/managementdata.py
@@ -212,7 +212,6 @@ class ManagementData:
sock.settimeout(10)
self.conn = Connection (sock,
username=self.broker.username, password=self.broker.password)
- self.spec = self.conn.spec
def aborted():
raise Timeout("Waiting for connection to be established with broker")
oldAborted = self.conn.aborted
@@ -223,7 +222,7 @@ class ManagementData:
sock.settimeout(oldTimeout)
self.conn.aborted = oldAborted
- self.mclient = managementClient (self.spec, self.ctrlHandler, self.configHandler,
+ self.mclient = managementClient ("unused", self.ctrlHandler, self.configHandler,
self.instHandler, self.methodReply, self.closeHandler)
self.mclient.schemaListener (self.schemaHandler)
self.mch = self.mclient.addChannel (self.conn.session(self.sessionId))
diff --git a/qpid/python/qpid/messaging.py b/qpid/python/qpid/messaging.py
index 9b3fecbf9b..3c41a2c417 100644
--- a/qpid/python/qpid/messaging.py
+++ b/qpid/python/qpid/messaging.py
@@ -30,12 +30,13 @@ Areas that still need work:
- protocol negotiation/multiprotocol impl
"""
-import connection, time, socket, sys, traceback
+import connection, time, socket, sys, compat
from codec010 import StringCodec
-from datatypes import timestamp, uuid4, RangedSet, Message as Message010
+from datatypes import timestamp, uuid4, RangedSet, Message as Message010, Serial
+from exceptions import Timeout
from logging import getLogger
-from ops import PRIMITIVE
-from session import Client, INCOMPLETE
+from ops import PRIMITIVE, delivery_mode
+from session import Client, INCOMPLETE, SessionDetached
from threading import Thread, RLock, Condition
from util import connect
@@ -101,7 +102,10 @@ class Constant:
UNLIMITED = Constant("UNLIMITED", 0xFFFFFFFFL)
-class ConnectError(Exception):
+class ConnectionError(Exception):
+ pass
+
+class ConnectError(ConnectionError):
pass
class Connection(Lockable):
@@ -142,12 +146,35 @@ class Connection(Lockable):
self.host = host
self.port = default(port, AMQP_PORT)
self.started = False
- self._conn = None
self.id = str(uuid4())
self.session_counter = 0
self.sessions = {}
+ self.reconnect = False
+ self._connected = False
self._lock = RLock()
self._condition = Condition(self._lock)
+ self._modcount = Serial(0)
+ self.error = None
+ self._driver = Driver(self)
+ self._driver.start()
+
+ def wakeup(self):
+ self._modcount += 1
+ self._driver.wakeup()
+
+ def catchup(self, exc=ConnectionError):
+ mc = self._modcount
+ self.wait(lambda: not self._driver._modcount < mc)
+ self.check_error(exc)
+
+ def check_error(self, exc=ConnectionError):
+ if self.error:
+ raise exc(*self.error)
+
+ def ewait(self, predicate, timeout=None, exc=ConnectionError):
+ result = self.wait(lambda: self.error or predicate(), timeout)
+ self.check_error(exc)
+ return result
@synchronized
def session(self, name=None, transactional=False):
@@ -173,8 +200,7 @@ class Connection(Lockable):
else:
ssn = Session(self, name, self.started, transactional=transactional)
self.sessions[name] = ssn
- if self._conn is not None:
- ssn._attach()
+ self.wakeup()
return ssn
@synchronized
@@ -186,38 +212,25 @@ class Connection(Lockable):
"""
Connect to the remote endpoint.
"""
- if self._conn is not None:
- return
- try:
- self._socket = connect(self.host, self.port)
- except socket.error, e:
- raise ConnectError(e)
- self._conn = connection.Connection(self._socket)
- try:
- self._conn.start()
- except connection.VersionError, e:
- raise ConnectError(e)
-
- for ssn in self.sessions.values():
- ssn._attach()
+ self._connected = True
+ self.wakeup()
+ self.ewait(lambda: self._driver._connected, exc=ConnectError)
@synchronized
def disconnect(self):
"""
Disconnect from the remote endpoint.
"""
- if self._conn is not None:
- self._conn.close()
- self._conn = None
- for ssn in self.sessions.values():
- ssn._disconnected()
+ self._connected = False
+ self.wakeup()
+ self.ewait(lambda: not self._driver._connected)
@synchronized
def connected(self):
"""
Return true if the connection is connected, false otherwise.
"""
- return self._conn is not None
+ return self._connected
@synchronized
def start(self):
@@ -255,6 +268,7 @@ class Pattern:
def __init__(self, value):
self.value = value
+ # XXX: this should become part of the driver
def _bind(self, ssn, exchange, queue):
ssn.exchange_bind(exchange=exchange, queue=queue,
binding_key=self.value.replace("*", "#"))
@@ -263,13 +277,33 @@ FILTER_DEFAULTS = {
"topic": Pattern("*")
}
-def delegate(session):
+def delegate(handler, session):
class Delegate(Client):
def message_transfer(self, cmd):
- session._message_transfer(cmd)
+ handler._message_transfer(session, cmd)
return Delegate
+class SessionError(Exception):
+ pass
+
+class Disconnected(SessionError):
+ """
+ Exception raised when an operation is attempted that is illegal when
+ disconnected.
+ """
+ pass
+
+class NontransactionalSession(SessionError):
+ """
+ Exception raised when commit or rollback is attempted on a non
+ transactional session.
+ """
+ pass
+
+class TransactionAborted(SessionError):
+ pass
+
class Session(Lockable):
"""
@@ -281,18 +315,26 @@ class Session(Lockable):
self.connection = connection
self.name = name
self.started = started
+
self.transactional = transactional
- self._ssn = None
+
+ self.committing = False
+ self.committed = True
+ self.aborting = False
+ self.aborted = False
+
self.senders = []
self.receivers = []
- self.closing = False
+ self.outgoing = []
self.incoming = []
- self.closed = False
self.unacked = []
- if self.transactional:
- self.acked = []
- self._lock = RLock()
- self._condition = Condition(self._lock)
+ self.acked = []
+
+ self.closing = False
+ self.closed = False
+
+ self._lock = connection._lock
+ self._condition = connection._condition
self.thread = Thread(target = self.run)
self.thread.setDaemon(True)
self.thread.start()
@@ -300,60 +342,17 @@ class Session(Lockable):
def __repr__(self):
return "<Session %s>" % self.name
- def _attach(self):
- self._ssn = self.connection._conn.session(self.name, delegate=delegate(self))
- self._ssn.auto_sync = False
- self._ssn.invoke_lock = self._lock
- self._ssn.lock = self._lock
- self._ssn.condition = self._condition
- if self.transactional:
- self._ssn.tx_select()
- for link in self.senders + self.receivers:
- link._link()
-
- def _disconnected(self):
- self._ssn = None
- for link in self.senders + self.receivers:
- link._disconnected()
+ def wakeup(self):
+ self.connection.wakeup()
- @synchronized
- def _message_transfer(self, cmd):
- m = Message010(cmd.payload)
- m.headers = cmd.headers
- m.id = cmd.id
- msg = self._decode(m)
- rcv = self.receivers[int(cmd.destination)]
- msg._receiver = rcv
- log.debug("RECV [%s] %s", self, msg)
- self.incoming.append(msg)
- self.notifyAll()
- return INCOMPLETE
+ def catchup(self, exc=SessionError):
+ self.connection.catchup(exc)
- def _decode(self, message):
- dp = message.get("delivery_properties")
- mp = message.get("message_properties")
- ap = mp.application_headers
- enc, dec = get_codec(mp.content_type)
- content = dec(message.body)
- msg = Message(content)
- msg.id = mp.message_id
- if ap is not None:
- msg.to = ap.get("to")
- msg.subject = ap.get("subject")
- msg.user_id = mp.user_id
- if mp.reply_to is not None:
- msg.reply_to = reply_to2addr(mp.reply_to)
- msg.correlation_id = mp.correlation_id
- msg.properties = mp.application_headers
- msg.content_type = mp.content_type
- msg._transfer_id = message.id
- return msg
+ def check_error(self, exc=SessionError):
+ self.connection.check_error(exc)
- def _exchange_query(self, address):
- # XXX: auto sync hack is to avoid deadlock on future
- result = self._ssn.exchange_query(name=address, sync=True)
- self._ssn.sync()
- return result.get()
+ def ewait(self, predicate, timeout=None, exc=SessionError):
+ return self.connection.ewait(predicate, timeout, exc)
@synchronized
def sender(self, target):
@@ -368,8 +367,11 @@ class Session(Lockable):
"""
sender = Sender(self, len(self.senders), target)
self.senders.append(sender)
- if self._ssn is not None:
- sender._link()
+ self.wakeup()
+ # XXX: because of the lack of waiting here we can end up getting
+ # into the driver loop with messages sent for senders that haven't
+ # been linked yet, something similar can probably happen for
+ # receivers
return sender
@synchronized
@@ -386,8 +388,7 @@ class Session(Lockable):
receiver = Receiver(self, len(self.receivers), source, filter,
self.started)
self.receivers.append(receiver)
- if self._ssn is not None:
- receiver._link()
+ self.wakeup()
return receiver
@synchronized
@@ -419,6 +420,7 @@ class Session(Lockable):
timeout):
msg = self._pop(predicate)
if msg is not None:
+ msg._receiver.returned += 1
self.unacked.append(msg)
log.debug("RETR [%s] %s", self, msg)
return msg
@@ -438,20 +440,13 @@ class Session(Lockable):
else:
messages = [message]
- ids = RangedSet(*[m._transfer_id for m in messages])
- for range in ids:
- self._ssn.receiver._completed.add_range(range)
- self._ssn.channel.session_completed(self._ssn.receiver._completed)
- self._ssn.message_accept(ids, sync=True)
- self._ssn.sync()
-
for m in messages:
- try:
- self.unacked.remove(m)
- except ValueError:
- pass
- if self.transactional:
- self.acked.append(m)
+ self.unacked.remove(m)
+ self.acked.append(m)
+
+ self.wakeup()
+ self.wait(lambda: self.connection.error or not [m for m in messages if m in self.acked])
+ self.check_error()
@synchronized
def commit(self):
@@ -461,11 +456,12 @@ class Session(Lockable):
"""
if not self.transactional:
raise NontransactionalSession()
- if self._ssn is None:
- raise Disconnected()
- self._ssn.tx_commit(sync=True)
- del self.acked[:]
- self._ssn.sync()
+ self.committing = True
+ self.wakeup()
+ self.ewait(lambda: not self.committing)
+ if self.aborted:
+ raise TransactionAborted()
+ assert self.committed
@synchronized
def rollback(self):
@@ -475,21 +471,10 @@ class Session(Lockable):
"""
if not self.transactional:
raise NontransactionalSession()
- if self._ssn is None:
- raise Disconnected()
-
- ids = RangedSet(*[m._transfer_id for m in self.acked + self.unacked + self.incoming])
- for range in ids:
- self._ssn.receiver._completed.add_range(range)
- self._ssn.channel.session_completed(self._ssn.receiver._completed)
- self._ssn.message_release(ids)
- self._ssn.tx_rollback(sync=True)
-
- del self.incoming[:]
- del self.unacked[:]
- del self.acked[:]
-
- self._ssn.sync()
+ self.aborting = True
+ self.wakeup()
+ self.ewait(lambda: not self.aborting)
+ assert self.aborted
@synchronized
def start(self):
@@ -538,13 +523,12 @@ class Session(Lockable):
link.close()
self.closing = True
- self.notifyAll()
+ self.wakeup()
+ self.catchup()
self.wait(lambda: self.closed)
while self.thread.isAlive():
self.thread.join(3)
self.thread = None
- self._ssn.close()
- self._ssn = None
self.connection._remove_session(self)
def parse_addr(address):
@@ -562,18 +546,7 @@ def reply_to2addr(reply_to):
else:
return "%s/%s" % (reply_to.exchange, reply_to.routing_key)
-class Disconnected(Exception):
- """
- Exception raised when an operation is attempted that is illegal when
- disconnected.
- """
- pass
-
-class NontransactionalSession(Exception):
- """
- Exception raised when commit or rollback is attempted on a non
- transactional session.
- """
+class SendError(SessionError):
pass
class Sender(Lockable):
@@ -587,29 +560,20 @@ class Sender(Lockable):
self.index = index
self.target = target
self.closed = False
- self._ssn = None
- self._exchange = None
- self._routing_key = None
- self._subject = None
self._lock = self.session._lock
self._condition = self.session._condition
- def _link(self):
- self._ssn = self.session._ssn
- node, self._subject = parse_addr(self.target)
- result = self.session._exchange_query(node)
- if result.not_found:
- # XXX: should check 'create' option
- self._ssn.queue_declare(queue=node, durable=False, sync=True)
- self._ssn.sync()
- self._exchange = ""
- self._routing_key = node
- else:
- self._exchange = node
- self._routing_key = self._subject
+ def wakeup(self):
+ self.session.wakeup()
- def _disconnected(self):
- self._ssn = None
+ def catchup(self, exc=SendError):
+ self.session.catchup(exc)
+
+ def check_error(self, exc=SendError):
+ self.session.check_error(exc)
+
+ def ewait(self, predicate, timeout=None, exc=SendError):
+ return self.session.ewait(predicate, timeout, exc)
@synchronized
def send(self, object):
@@ -623,56 +587,35 @@ class Sender(Lockable):
@param object: the message or content to send
"""
- if self._ssn is None:
+ if not self.session.connection._connected or self.session.closing:
raise Disconnected()
if isinstance(object, Message):
message = object
else:
message = Message(object)
- # XXX: what if subject is specified for a normal queue?
- if self._routing_key is None:
- rk = message.subject
- else:
- rk = self._routing_key
- # XXX: do we need to query to figure out how to create the reply-to interoperably?
- if message.reply_to:
- rt = self._ssn.reply_to(*parse_addr(message.reply_to))
- else:
- rt = None
- dp = self._ssn.delivery_properties(routing_key=rk)
- mp = self._ssn.message_properties(message_id=message.id,
- user_id=message.user_id,
- reply_to=rt,
- correlation_id=message.correlation_id,
- content_type=message.content_type,
- application_headers=message.properties)
- if message.subject is not None:
- if mp.application_headers is None:
- mp.application_headers = {}
- mp.application_headers["subject"] = message.subject
- if message.to is not None:
- if mp.application_headers is None:
- mp.application_headers = {}
- mp.application_headers["to"] = message.to
- enc, dec = get_codec(message.content_type)
- body = enc(message.content)
- self._ssn.message_transfer(destination=self._exchange,
- message=Message010(dp, mp, body),
- sync=True)
- log.debug("SENT [%s] %s", self.session, message)
- self._ssn.sync()
+
+ # XXX: what if we send the same message to multiple senders?
+ message._sender = self
+ self.session.outgoing.append(message)
+
+ self.wakeup()
+ self.ewait(lambda: message not in self.session.outgoing)
@synchronized
def close(self):
"""
Close the Sender.
"""
+ # XXX: should make driver do something here
if not self.closed:
self.session.senders.remove(self)
self.closed = True
-class Empty(Exception):
+class ReceiveError(SessionError):
+ pass
+
+class Empty(ReceiveError):
"""
Exception raised by L{Receiver.fetch} when there is no message
available within the alloted time.
@@ -693,43 +636,36 @@ class Receiver(Lockable):
self.destination = str(self.index)
self.source = source
self.filter = filter
+
self.started = started
self.capacity = UNLIMITED
+ self.granted = Serial(0)
+ self.drain = False
+ self.impending = Serial(0)
+ self.received = Serial(0)
+ self.returned = Serial(0)
+
+ self.closing = False
self.closed = False
self.listener = None
- self._ssn = None
- self._queue = None
self._lock = self.session._lock
self._condition = self.session._condition
- def _link(self):
- self._ssn = self.session._ssn
- result = self.session._exchange_query(self.source)
- if result.not_found:
- self._queue = self.source
- # XXX: should check 'create' option
- self._ssn.queue_declare(queue=self._queue, durable=False)
- else:
- self._queue = "%s.%s" % (self.session.name, self.destination)
- self._ssn.queue_declare(queue=self._queue, durable=False, exclusive=True, auto_delete=True)
- if self.filter is None:
- f = FILTER_DEFAULTS[result.type]
- else:
- f = self.filter
- f._bind(self._ssn, self.source, self._queue)
- self._ssn.message_subscribe(queue=self._queue, destination=self.destination,
- sync=True)
- self._ssn.message_set_flow_mode(self.destination, self._ssn.flow_mode.credit)
- self._ssn.sync()
- if self.started:
- self._start()
+ def wakeup(self):
+ self.session.wakeup()
- def _disconnected(self):
- self._ssn = None
+ def catchup(self, exc=ReceiveError):
+ self.session.catchup()
+
+ def check_error(self, exc=ReceiveError):
+ self.session.check_error(exc)
+
+ def ewait(self, predicate, timeout=None, exc=ReceiveError):
+ return self.session.ewait(predicate, timeout, exc)
@synchronized
def pending(self):
- return self.session._count(self._pred)
+ return self.received - self.returned
def _capacity(self):
if not self.started:
@@ -762,23 +698,36 @@ class Receiver(Lockable):
@type timeout: float
@param timeout: the time to wait for a message to be available
"""
- if self.capacity is not UNLIMITED or not self.started:
- self._ssn.message_flow(self.destination, self._ssn.credit_unit.byte,
- UNLIMITED.value)
- self._ssn.message_flow(self.destination, self._ssn.credit_unit.message, 1)
+ if self._capacity() == 0:
+ self.granted = self.returned + 1
+ self.wakeup()
+ self.ewait(lambda: self.impending == self.granted)
msg = self.session._get(self._pred, timeout=timeout)
if msg is None:
- self._ssn.message_flush(self.destination)
- self._start()
- self._ssn.sync()
+ self.drain = True
+ self.granted = self.received
+ self.wakeup()
+ self.ewait(lambda: self.impending == self.received)
+ self.drain = False
+ self._grant()
+ self.wakeup()
msg = self.session._get(self._pred, timeout=0)
if msg is None:
raise Empty()
+ elif self._capacity() not in (0, UNLIMITED.value):
+ self.granted += 1
+ self.wakeup()
return msg
- def _start(self):
- self._ssn.message_flow(self.destination, self._ssn.credit_unit.byte, UNLIMITED.value)
- self._ssn.message_flow(self.destination, self._ssn.credit_unit.message, self._capacity())
+ def _grant(self):
+ if self.started:
+ if self.capacity is UNLIMITED:
+ self.granted = UNLIMITED
+ else:
+ self.granted = self.received + self._capacity()
+ else:
+ self.granted = self.received
+
@synchronized
def start(self):
@@ -786,34 +735,31 @@ class Receiver(Lockable):
Start incoming message delivery for this receiver.
"""
self.started = True
- if self._ssn is not None:
- self._start()
-
- def _stop(self):
- self._ssn.message_stop(self.destination)
+ self._grant()
+ self.wakeup()
@synchronized
def stop(self):
"""
Stop incoming message delivery for this receiver.
"""
- if self._ssn is not None:
- self._stop()
self.started = False
+ self._grant()
+ self.wakeup()
+ self.ewait(lambda: self.impending == self.received)
@synchronized
def close(self):
"""
Close the receiver.
"""
- if not self.closed:
- self.closed = True
- self._ssn.message_cancel(self.destination, sync=True)
- self._ssn.sync()
+ self.closing = True
+ self.wakeup()
+ try:
+ self.ewait(lambda: self.closed)
+ finally:
self.session.receivers.remove(self)
-
-
def codec(name):
type = PRIMITIVE[name]
@@ -889,6 +835,7 @@ class Message:
self.to = None
self.reply_to = None
self.correlation_id = None
+ self.durable = False
self.properties = {}
self.content_type = get_type(content)
self.content = content
@@ -896,5 +843,391 @@ class Message:
def __repr__(self):
return "Message(%r)" % self.content
+class Attachment:
+
+ def __init__(self, target):
+ self.target = target
+
+DURABLE_DEFAULT=True
+
+class Driver(Lockable):
+
+ def __init__(self, connection):
+ self.connection = connection
+ self._lock = self.connection._lock
+ self._condition = self.connection._condition
+ self._wakeup_cond = Condition()
+ self._socket = None
+ self._conn = None
+ self._connected = False
+ self._attachments = {}
+ self._modcount = self.connection._modcount
+ self.thread = Thread(target=self.run)
+ self.thread.setDaemon(True)
+ # XXX: need to figure out how to join on this thread
+
+ def start(self):
+ self.thread.start()
+
+ def wakeup(self):
+ self._wakeup_cond.acquire()
+ try:
+ self._wakeup_cond.notifyAll()
+ finally:
+ self._wakeup_cond.release()
+
+ def start(self):
+ self.thread.start()
+
+ def run(self):
+ while True:
+ self._wakeup_cond.acquire()
+ try:
+ if self.connection._modcount <= self._modcount:
+ self._wakeup_cond.wait(10)
+ finally:
+ self._wakeup_cond.release()
+ self.dispatch(self.connection._modcount)
+
+ @synchronized
+ def dispatch(self, modcount):
+ try:
+ if self._conn is None and self.connection._connected:
+ self.connect()
+ elif self._conn is not None and not self.connection._connected:
+ self.disconnect()
+
+ if self._conn is not None:
+ for ssn in self.connection.sessions.values():
+ self.attach(ssn)
+ self.process(ssn)
+
+ exi = None
+ except:
+ exi = sys.exc_info()
+
+ if exi:
+ msg = compat.format_exc()
+ recoverable = ["aborted", "Connection refused", "SessionDetached", "Connection reset by peer",
+ "Bad file descriptor", "start timed out", "Broken pipe"]
+ for r in recoverable:
+ if self.connection.reconnect and r in msg:
+ print "waiting to retry"
+ self.reset()
+ time.sleep(3)
+ print "retrying..."
+ return
+ else:
+ self.connection.error = (msg,)
+
+ self._modcount = modcount
+ self.notifyAll()
+
+ def connect(self):
+ if self._conn is not None:
+ return
+ try:
+ self._socket = connect(self.connection.host, self.connection.port)
+ except socket.error, e:
+ raise ConnectError(e)
+ self._conn = connection.Connection(self._socket)
+ try:
+ self._conn.start(timeout=10)
+ self._connected = True
+ except connection.VersionError, e:
+ raise ConnectError(e)
+ except Timeout:
+ print "start timed out"
+ raise ConnectError("start timed out")
+
+ def disconnect(self):
+ self._conn.close()
+ self.reset()
+
+ def reset(self):
+ self._conn = None
+ self._connected = False
+ self._attachments.clear()
+ for ssn in self.connection.sessions.values():
+ for m in ssn.acked + ssn.unacked + ssn.incoming:
+ m._transfer_id = None
+ for rcv in ssn.receivers:
+ rcv.impending = rcv.received
+
+ def connected(self):
+ return self._conn is not None
+
+ def attach(self, ssn):
+ _ssn = self._attachments.get(ssn)
+ if _ssn is None:
+ _ssn = self._conn.session(ssn.name, delegate=delegate(self, ssn))
+ _ssn.auto_sync = False
+ _ssn.invoke_lock = self._lock
+ _ssn.lock = self._lock
+ _ssn.condition = self._condition
+ if ssn.transactional:
+ # XXX: adding an attribute to qpid.session.Session
+ _ssn.acked = []
+ _ssn.tx_select()
+ self._attachments[ssn] = _ssn
+
+ for snd in ssn.senders:
+ self.link_out(snd)
+ for rcv in ssn.receivers:
+ self.link_in(rcv)
+
+ if ssn.closing:
+ _ssn.close()
+ del self._attachments[ssn]
+
+ def _exchange_query(self, ssn, address):
+ # XXX: auto sync hack is to avoid deadlock on future
+ result = ssn.exchange_query(name=address, sync=True)
+ ssn.sync()
+ return result.get()
+
+ def link_out(self, snd):
+ _ssn = self._attachments[snd.session]
+ _snd = self._attachments.get(snd)
+ if _snd is None:
+ _snd = Attachment(snd)
+ node, _snd._subject = parse_addr(snd.target)
+ result = self._exchange_query(_ssn, node)
+ if result.not_found:
+ # XXX: should check 'create' option
+ _ssn.queue_declare(queue=node, durable=DURABLE_DEFAULT, sync=True)
+ _ssn.sync()
+ _snd._exchange = ""
+ _snd._routing_key = node
+ else:
+ _snd._exchange = node
+ _snd._routing_key = _snd._subject
+ self._attachments[snd] = _snd
+
+ if snd.closed:
+ del self._attachments[snd]
+ return None
+ else:
+ return _snd
+
+ def link_in(self, rcv):
+ _ssn = self._attachments[rcv.session]
+ _rcv = self._attachments.get(rcv)
+ if _rcv is None:
+ _rcv = Attachment(rcv)
+ result = self._exchange_query(_ssn, rcv.source)
+ if result.not_found:
+ _rcv._queue = rcv.source
+ # XXX: should check 'create' option
+ _ssn.queue_declare(queue=_rcv._queue, durable=DURABLE_DEFAULT)
+ else:
+ _rcv._queue = "%s.%s" % (rcv.session.name, rcv.destination)
+ _ssn.queue_declare(queue=_rcv._queue, durable=DURABLE_DEFAULT, exclusive=True, auto_delete=True)
+ if rcv.filter is None:
+ f = FILTER_DEFAULTS[result.type]
+ else:
+ f = rcv.filter
+ f._bind(_ssn, rcv.source, _rcv._queue)
+ _ssn.message_subscribe(queue=_rcv._queue, destination=rcv.destination)
+ _ssn.message_set_flow_mode(rcv.destination, _ssn.flow_mode.credit, sync=True)
+ self._attachments[rcv] = _rcv
+ # XXX: need to kill syncs
+ _ssn.sync()
+
+ if rcv.closing:
+ _ssn.message_cancel(rcv.destination, sync=True)
+ # XXX: need to kill syncs
+ _ssn.sync()
+ del self._attachments[rcv]
+ rcv.closed = True
+ return None
+ else:
+ return _rcv
+
+ def process(self, ssn):
+ if ssn.closing: return
+
+ _ssn = self._attachments[ssn]
+
+ while ssn.outgoing:
+ msg = ssn.outgoing[0]
+ snd = msg._sender
+ self.send(snd, msg)
+ ssn.outgoing.pop(0)
+
+ for rcv in ssn.receivers:
+ self.process_receiver(rcv)
+
+ if ssn.acked:
+ messages = ssn.acked[:]
+ ids = RangedSet(*[m._transfer_id for m in messages if m._transfer_id is not None])
+ for range in ids:
+ _ssn.receiver._completed.add_range(range)
+ ch = _ssn.channel
+ if ch is None:
+ raise SessionDetached()
+ ch.session_completed(_ssn.receiver._completed)
+ _ssn.message_accept(ids, sync=True)
+ # XXX: really need to make this async so that we don't give up the lock
+ _ssn.sync()
+
+ for m in messages:
+ ssn.acked.remove(m)
+ if ssn.transactional:
+ _ssn.acked.append(m)
+
+ if ssn.committing:
+ _ssn.tx_commit(sync=True)
+ # XXX: need to kill syncs
+ _ssn.sync()
+ del _ssn.acked[:]
+ ssn.committing = False
+ ssn.committed = True
+ ssn.aborting = False
+ ssn.aborted = False
+
+ if ssn.aborting:
+ for rcv in ssn.receivers:
+ _ssn.message_stop(rcv.destination)
+ _ssn.sync()
+
+ messages = _ssn.acked + ssn.unacked + ssn.incoming
+ ids = RangedSet(*[m._transfer_id for m in messages])
+ for range in ids:
+ _ssn.receiver._completed.add_range(range)
+ _ssn.channel.session_completed(_ssn.receiver._completed)
+ _ssn.message_release(ids)
+ _ssn.tx_rollback(sync=True)
+ _ssn.sync()
+
+ del ssn.incoming[:]
+ del ssn.unacked[:]
+ del _ssn.acked[:]
+
+ for rcv in ssn.receivers:
+ rcv.impending = rcv.received
+ rcv.returned = rcv.received
+ # XXX: do we need to update granted here as well?
+
+ for rcv in ssn.receivers:
+ self.process_receiver(rcv)
+
+ ssn.aborting = False
+ ssn.aborted = True
+ ssn.committing = False
+ ssn.committed = False
+
+ def grant(self, rcv):
+ _ssn = self._attachments[rcv.session]
+ _rcv = self.link_in(rcv)
+
+ if rcv.granted is UNLIMITED:
+ if rcv.impending is UNLIMITED:
+ delta = 0
+ else:
+ delta = UNLIMITED
+ elif rcv.impending is UNLIMITED:
+ delta = -1
+ else:
+ delta = max(rcv.granted, rcv.received) - rcv.impending
+
+ if delta is UNLIMITED:
+ _ssn.message_flow(rcv.destination, _ssn.credit_unit.byte, UNLIMITED.value)
+ _ssn.message_flow(rcv.destination, _ssn.credit_unit.message, UNLIMITED.value)
+ rcv.impending = UNLIMITED
+ elif delta > 0:
+ _ssn.message_flow(rcv.destination, _ssn.credit_unit.byte, UNLIMITED.value)
+ _ssn.message_flow(rcv.destination, _ssn.credit_unit.message, delta)
+ rcv.impending += delta
+ elif delta < 0:
+ if rcv.drain:
+ _ssn.message_flush(rcv.destination, sync=True)
+ else:
+ _ssn.message_stop(rcv.destination, sync=True)
+ # XXX: need to kill syncs
+ _ssn.sync()
+ rcv.impending = rcv.received
+ self.grant(rcv)
+
+ def process_receiver(self, rcv):
+ if rcv.closed: return
+ self.grant(rcv)
+
+ def send(self, snd, msg):
+ _ssn = self._attachments[snd.session]
+ _snd = self.link_out(snd)
+
+ # XXX: what if subject is specified for a normal queue?
+ if _snd._routing_key is None:
+ rk = msg.subject
+ else:
+ rk = _snd._routing_key
+ # XXX: do we need to query to figure out how to create the reply-to interoperably?
+ if msg.reply_to:
+ rt = _ssn.reply_to(*parse_addr(msg.reply_to))
+ else:
+ rt = None
+ dp = _ssn.delivery_properties(routing_key=rk)
+ mp = _ssn.message_properties(message_id=msg.id,
+ user_id=msg.user_id,
+ reply_to=rt,
+ correlation_id=msg.correlation_id,
+ content_type=msg.content_type,
+ application_headers=msg.properties)
+ if msg.subject is not None:
+ if mp.application_headers is None:
+ mp.application_headers = {}
+ mp.application_headers["subject"] = msg.subject
+ if msg.to is not None:
+ if mp.application_headers is None:
+ mp.application_headers = {}
+ mp.application_headers["to"] = msg.to
+ if msg.durable:
+ dp.delivery_mode = delivery_mode.persistent
+ enc, dec = get_codec(msg.content_type)
+ body = enc(msg.content)
+ _ssn.message_transfer(destination=_snd._exchange,
+ message=Message010(dp, mp, body),
+ sync=True)
+ log.debug("SENT [%s] %s", snd.session, msg)
+ # XXX: really need to make this async so that we don't give up the lock
+ _ssn.sync()
+ # XXX: should we log the ack somehow too?
+
+ @synchronized
+ def _message_transfer(self, ssn, cmd):
+ m = Message010(cmd.payload)
+ m.headers = cmd.headers
+ m.id = cmd.id
+ msg = self._decode(m)
+ rcv = ssn.receivers[int(cmd.destination)]
+ msg._receiver = rcv
+ rcv.received += 1
+ log.debug("RECV [%s] %s", ssn, msg)
+ ssn.incoming.append(msg)
+ self.notifyAll()
+ return INCOMPLETE
+
+ def _decode(self, message):
+ dp = message.get("delivery_properties")
+ mp = message.get("message_properties")
+ ap = mp.application_headers
+ enc, dec = get_codec(mp.content_type)
+ content = dec(message.body)
+ msg = Message(content)
+ msg.id = mp.message_id
+ if ap is not None:
+ msg.to = ap.get("to")
+ msg.subject = ap.get("subject")
+ msg.user_id = mp.user_id
+ if mp.reply_to is not None:
+ msg.reply_to = reply_to2addr(mp.reply_to)
+ msg.correlation_id = mp.correlation_id
+ msg.durable = dp.delivery_mode == delivery_mode.persistent
+ msg.properties = mp.application_headers
+ msg.content_type = mp.content_type
+ msg._transfer_id = message.id
+ return msg
+
__all__ = ["Connection", "Pattern", "Session", "Sender", "Receiver", "Message",
"Empty", "timestamp", "uuid4"]
diff --git a/qpid/python/qpid/session.py b/qpid/python/qpid/session.py
index 4413a22899..2f1bd81bd4 100644
--- a/qpid/python/qpid/session.py
+++ b/qpid/python/qpid/session.py
@@ -146,7 +146,8 @@ class Session(command_invoker()):
if self._closing:
raise SessionClosed()
- if self.channel == None:
+ ch = self.channel
+ if ch == None:
raise SessionDetached()
if op == MessageTransfer:
@@ -162,14 +163,12 @@ class Session(command_invoker()):
cmd = op(*args, **kwargs)
cmd.sync = self.auto_sync or cmd.sync
self.need_sync = not cmd.sync
- cmd.channel = self.channel.id
+ cmd.channel = ch.id
if op.RESULT:
result = Future(exception=SessionException)
self.results[self.sender.next_id] = result
- log.debug("SENDING %s", cmd)
-
self.send(cmd)
log.debug("SENT %s", cmd)
@@ -245,13 +244,16 @@ class Sender:
self._completed = RangedSet()
def send(self, cmd):
+ ch = self.session.channel
+ if ch is None:
+ raise SessionDetached()
cmd.id = self.next_id
self.next_id += 1
if self.session.send_id:
self.session.send_id = False
- self.session.channel.session_command_point(cmd.id, 0)
+ ch.session_command_point(cmd.id, 0)
self.commands.append(cmd)
- self.session.channel.connection.write_op(cmd)
+ ch.connection.write_op(cmd)
def completed(self, commands):
idx = 0
diff --git a/qpid/python/qpid/tests/messaging.py b/qpid/python/qpid/tests/messaging.py
index 7706ebbabe..6062895519 100644
--- a/qpid/python/qpid/tests/messaging.py
+++ b/qpid/python/qpid/tests/messaging.py
@@ -84,6 +84,10 @@ class Base(Test):
contents = self.drain(rcv)
assert len(contents) == 0, "%s is supposed to be empty: %s" % (rcv, contents)
+ def assertPending(self, rcv, expected):
+ p = rcv.pending()
+ assert p == expected, "expected %s, got %s" % (expected, p)
+
def sleep(self):
time.sleep(self.delay())
@@ -107,7 +111,8 @@ class SetupTests(Base):
try:
self.conn = Connection.open("localhost", 0)
assert False, "connect succeeded"
- except ConnectError:
+ except ConnectError, e:
+ # XXX: should verify that e includes appropriate diagnostic info
pass
class ConnectionTests(Base):
@@ -237,7 +242,8 @@ class SessionTests(Base):
# were requeued, and ack this time before closing
self.ssn = self.conn.session()
rcv = self.ssn.receiver("test-ack-queue")
- assert contents == self.drain(rcv)
+ drained = self.drain(rcv)
+ assert contents == drained, "expected %s, got %s" % (contents, drained)
self.ssn.acknowledge()
self.ssn.close()
@@ -319,7 +325,8 @@ class SessionTests(Base):
txssn.acknowledge()
else:
txssn.rollback()
- assert contents == self.drain(txrcv)
+ drained = self.drain(txrcv)
+ assert contents == drained, "expected %s, got %s" % (contents, drained)
txssn.acknowledge()
txssn.rollback()
assert contents == self.drain(txrcv)
@@ -401,9 +408,9 @@ class ReceiverTests(Base):
elapsed = time.time() - start
assert elapsed >= self.delay()
- one = self.send("testListen", 1)
- two = self.send("testListen", 2)
- three = self.send("testListen", 3)
+ one = self.send("testFetch", 1)
+ two = self.send("testFetch", 2)
+ three = self.send("testFetch", 3)
msg = self.rcv.fetch(0)
assert msg.content == one
msg = self.rcv.fetch(self.delay())
@@ -467,34 +474,35 @@ class ReceiverTests(Base):
def testCapacity(self):
self.rcv.capacity = 5
self.rcv.start()
- assert self.rcv.pending() == 0
+ self.assertPending(self.rcv, 0)
for i in range(15):
self.send("testCapacity", i)
self.sleep()
- assert self.rcv.pending() == 5
+ self.assertPending(self.rcv, 5)
self.drain(self.rcv, limit = 5)
self.sleep()
- assert self.rcv.pending() == 5
+ self.assertPending(self.rcv, 5)
- self.drain(self.rcv)
- assert self.rcv.pending() == 0
+ drained = self.drain(self.rcv)
+ assert len(drained) == 10
+ self.assertPending(self.rcv, 0)
self.ssn.acknowledge()
def testCapacityUNLIMITED(self):
self.rcv.capacity = UNLIMITED
self.rcv.start()
- assert self.rcv.pending() == 0
+ self.assertPending(self.rcv, 0)
for i in range(10):
self.send("testCapacityUNLIMITED", i)
self.sleep()
- assert self.rcv.pending() == 10
+ self.assertPending(self.rcv, 10)
self.drain(self.rcv)
- assert self.rcv.pending() == 0
+ self.assertPending(self.rcv, 0)
self.ssn.acknowledge()
diff --git a/qpid/python/qpid_config.py b/qpid/python/qpid_config.py
index 3cf6b69b7e..d740a53dfe 100644
--- a/qpid/python/qpid_config.py
+++ b/qpid/python/qpid_config.py
@@ -19,7 +19,7 @@
import os
-qpid_home = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
-amqp_spec = os.path.join(qpid_home, "specs", "amqp.0-10-qpid-errata.xml")
-amqp_spec_0_8 = os.path.join(qpid_home, "specs", "amqp.0-8.xml")
-amqp_spec_0_9 = os.path.join(qpid_home, "specs", "amqp.0-9.xml")
+AMQP_SPEC_DIR=os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "specs")
+amqp_spec = os.path.join(AMQP_SPEC_DIR, "amqp.0-10-qpid-errata.xml")
+amqp_spec_0_8 = os.path.join(AMQP_SPEC_DIR, "amqp.0-8.xml")
+amqp_spec_0_9 = os.path.join(AMQP_SPEC_DIR, "amqp.0-9.xml")
diff --git a/qpid/python/tests/datatypes.py b/qpid/python/tests/datatypes.py
index 1a60bb4107..b00e5e78f8 100644
--- a/qpid/python/tests/datatypes.py
+++ b/qpid/python/tests/datatypes.py
@@ -54,6 +54,19 @@ class SerialTest(TestCase):
d[serial(0)] = "zero"
assert d[0] == "zero"
+ def testAdd(self):
+ assert serial(2) + 2 == serial(4)
+ assert serial(2) + 2 == 4
+
+ def testSub(self):
+ delta = serial(4) - serial(2)
+ assert isinstance(delta, int) or isinstance(delta, long)
+ assert delta == 2
+
+ delta = serial(4) - 2
+ assert isinstance(delta, Serial)
+ assert delta == serial(2)
+
class RangedSetTest(TestCase):
def check(self, ranges):
diff --git a/qpid/python/tests_0-10/management.py b/qpid/python/tests_0-10/management.py
index 5cd0caba40..51c2a687cb 100644
--- a/qpid/python/tests_0-10/management.py
+++ b/qpid/python/tests_0-10/management.py
@@ -29,13 +29,13 @@ class ManagementTest (TestBase010):
Tests for the management hooks
"""
- def disabled_test_broker_connectivity_oldAPI (self):
+ def test_broker_connectivity_oldAPI (self):
"""
Call the "echo" method on the broker to verify it is alive and talking.
"""
session = self.session
- mc = managementClient (session.spec)
+ mc = managementClient ()
mch = mc.addChannel (session)
mc.syncWaitForStable (mch)