summaryrefslogtreecommitdiff
path: root/python
diff options
context:
space:
mode:
authorKim van der Riet <kpvdr@apache.org>2012-05-04 15:39:19 +0000
committerKim van der Riet <kpvdr@apache.org>2012-05-04 15:39:19 +0000
commit633c33f224f3196f3f9bd80bd2e418d8143fea06 (patch)
tree1391da89470593209466df68c0b40b89c14963b1 /python
parentc73f9286ebff93a6c8dbc29cf05e258c4b55c976 (diff)
downloadqpid-python-633c33f224f3196f3f9bd80bd2e418d8143fea06.tar.gz
QPID-3858: Updated branch - merged from trunk r.1333987
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/asyncstore@1334037 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python')
-rw-r--r--python/qpid/connection.py13
-rw-r--r--python/qpid/messaging/driver.py6
-rw-r--r--python/qpid/testlib.py12
-rw-r--r--python/qpid/tests/messaging/endpoints.py10
-rwxr-xr-xpython/setup.py2
5 files changed, 33 insertions, 10 deletions
diff --git a/python/qpid/connection.py b/python/qpid/connection.py
index f2c83d113c..66e1cb49be 100644
--- a/python/qpid/connection.py
+++ b/python/qpid/connection.py
@@ -170,6 +170,10 @@ class Connection(Framer):
if not status:
self.detach_all()
break
+ # When we do not use SSL transport, we get periodic
+ # spurious timeout events on the socket. When using SSL,
+ # these events show up as timeout *errors*. Both should be
+ # ignored unless we have aborted.
except socket.timeout:
if self.aborted():
self.close_code = (None, "connection timed out")
@@ -178,9 +182,12 @@ class Connection(Framer):
else:
continue
except socket.error, e:
- self.close_code = (None, str(e))
- self.detach_all()
- break
+ if self.aborted() or str(e) != "The read operation timed out":
+ self.close_code = (None, str(e))
+ self.detach_all()
+ break
+ else:
+ continue
frame_dec.write(data)
seg_dec.write(*frame_dec.read())
op_dec.write(*seg_dec.read())
diff --git a/python/qpid/messaging/driver.py b/python/qpid/messaging/driver.py
index 0358659111..3cb62d75c9 100644
--- a/python/qpid/messaging/driver.py
+++ b/python/qpid/messaging/driver.py
@@ -226,7 +226,11 @@ class LinkIn:
def do_link(self, sst, rcv, _rcv, type, subtype, action):
link_opts = _rcv.options.get("link", {})
- reliability = link_opts.get("reliability", "at-least-once")
+ if type == "topic":
+ default_reliability = "unreliable"
+ else:
+ default_reliability = "at-least-once"
+ reliability = link_opts.get("reliability", default_reliability)
declare = link_opts.get("x-declare", {})
subscribe = link_opts.get("x-subscribe", {})
acq_mode = acquire_mode.pre_acquired
diff --git a/python/qpid/testlib.py b/python/qpid/testlib.py
index 1da53b3378..f9796982f5 100644
--- a/python/qpid/testlib.py
+++ b/python/qpid/testlib.py
@@ -29,6 +29,9 @@ from qpid.message import Message
from qpid.harness import Skipped
from qpid.exceptions import VersionError
+import qpid.messaging
+from qpidtoollibs import BrokerAgent
+
class TestBase(unittest.TestCase):
"""Base class for Qpid test cases.
@@ -193,6 +196,15 @@ class TestBase010(unittest.TestCase):
self.qmf = qmf.console.Session(handler)
self.qmf_broker = self.qmf.addBroker(str(self.broker))
+ def startBrokerAccess(self):
+ """
+ New-style management access to the broker. Can be used in lieu of startQmf.
+ """
+ if 'broker_conn' not in self.__dict__:
+ self.broker_conn = qpid.messaging.Connection(str(self.broker))
+ self.broker_conn.open()
+ self.broker_access = BrokerAgent(self.broker_conn)
+
def connect(self, host=None, port=None):
url = self.broker
if url.scheme == URL.AMQPS:
diff --git a/python/qpid/tests/messaging/endpoints.py b/python/qpid/tests/messaging/endpoints.py
index db5ec03df2..62deacd0bd 100644
--- a/python/qpid/tests/messaging/endpoints.py
+++ b/python/qpid/tests/messaging/endpoints.py
@@ -524,7 +524,7 @@ class SessionTests(Base):
self.ssn.acknowledge(echos[0])
self.ssn.acknowledge(echos[1], Disposition(REJECTED))
self.ssn.acknowledge(echos[2],
- Disposition(REJECTED, code=3, text="test-reject"))
+ Disposition(REJECTED, code=0, text="test-reject"))
self.drain(rej, expected=msgs[1:])
self.ssn.acknowledge()
@@ -632,9 +632,9 @@ class SessionTests(Base):
def testDoubleCommit(self):
ssn = self.conn.session(transactional=True)
- snd = ssn.sender("amq.direct")
- rcv = ssn.receiver("amq.direct")
- msgs = [self.message("testDoubleCommit", i) for i in range(3)]
+ snd = ssn.sender("amq.direct/doubleCommit")
+ rcv = ssn.receiver("amq.direct/doubleCommit")
+ msgs = [self.message("testDoubleCommit", i, subject="doubleCommit") for i in range(3)]
for m in msgs:
snd.send(m)
ssn.commit()
@@ -1038,7 +1038,7 @@ class AddressTests(Base):
snd.close()
assert False, "successfully deleted amq.topic"
except SessionError, e:
- assert "Cannot delete default exchange" in str(e)
+ assert e.code == 530
# XXX: need to figure out close after error
self.conn._remove_session(self.ssn)
diff --git a/python/setup.py b/python/setup.py
index a0717ece1b..225ee44b91 100755
--- a/python/setup.py
+++ b/python/setup.py
@@ -298,7 +298,7 @@ class install_lib(_install_lib):
return outfiles + extra
setup(name="qpid-python",
- version="0.15",
+ version="0.17",
author="Apache Qpid",
author_email="dev@qpid.apache.org",
packages=["mllib", "qpid", "qpid.messaging", "qpid.tests",