summaryrefslogtreecommitdiff
path: root/python
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2008-03-07 16:57:43 +0000
committerRafael H. Schloming <rhs@apache.org>2008-03-07 16:57:43 +0000
commit205ae0e54ba0c5fdeb5d2e884997c80cb52f1799 (patch)
treecc85d43a8e09cb15111290f9302c4c6188c0784c /python
parente385ba8c6612ac396a4d9ecbd5c8ffa18977e25a (diff)
downloadqpid-python-205ae0e54ba0c5fdeb5d2e884997c80cb52f1799.tar.gz
added session.sync(); session.auto_sync; made transfers not auto-complete; fixed bug in RangedSet
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@634744 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python')
-rw-r--r--python/cpp_failing_0-10.txt2
-rwxr-xr-xpython/hello-010-world6
-rw-r--r--python/qpid/connection010.py3
-rw-r--r--python/qpid/datatypes.py7
-rw-r--r--python/qpid/delegates.py18
-rw-r--r--python/qpid/exceptions.py20
-rw-r--r--python/qpid/session.py54
-rwxr-xr-xpython/run-tests10
-rwxr-xr-xpython/server01011
-rw-r--r--python/tests/connection010.py1
-rw-r--r--python/tests/datatypes.py11
11 files changed, 126 insertions, 17 deletions
diff --git a/python/cpp_failing_0-10.txt b/python/cpp_failing_0-10.txt
index 3d00313d2d..824967762b 100644
--- a/python/cpp_failing_0-10.txt
+++ b/python/cpp_failing_0-10.txt
@@ -63,3 +63,5 @@ tests_0-10.queue.QueueTests.test_delete_simple
tests_0-10.queue.QueueTests.test_purge
tests_0-10.queue.QueueTests.test_bind
tests_0-10.queue.QueueTests.test_unbind_headers
+tests_0-10.exchange.RecommendedTypesRuleTests.testTopic
+tests_0-10.exchange.RequiredInstancesRuleTests.testAmqTopic
diff --git a/python/hello-010-world b/python/hello-010-world
index 8c98170873..83e7c87675 100755
--- a/python/hello-010-world
+++ b/python/hello-010-world
@@ -38,6 +38,12 @@ print m3
m4 = ssn.incoming("test").get(timeout=10)
print m4
+print ssn.sender._completed, ssn.sender.next_id
+ssn.sync(10)
+print ssn.sender.segments
+
+ssn.channel.session_flush(completed=True)
+
ssn.message_accept(RangedSet(m1.id, m2.id, m3.id, m4.id))
print ssn.queue_query("testing")
diff --git a/python/qpid/connection010.py b/python/qpid/connection010.py
index b17ceea534..022ef8e411 100644
--- a/python/qpid/connection010.py
+++ b/python/qpid/connection010.py
@@ -26,10 +26,9 @@ from codec010 import StringCodec
from session import Session
from invoker import Invoker
from spec010 import Control, Command
+from exceptions import *
import delegates
-class Timeout(Exception): pass
-
class ChannelBusy(Exception): pass
class ChannelsBusy(Exception): pass
diff --git a/python/qpid/datatypes.py b/python/qpid/datatypes.py
index bce68aebcd..e9973b4cc8 100644
--- a/python/qpid/datatypes.py
+++ b/python/qpid/datatypes.py
@@ -67,10 +67,15 @@ class Range:
return self.lower <= n and n <= self.upper
def touches(self, r):
+ # XXX
return (self.lower - 1 in r or
self.upper + 1 in r or
r.lower - 1 in self or
- r.upper + 1 in self)
+ r.upper + 1 in self or
+ self.lower in r or
+ self.upper in r or
+ r.lower in self or
+ r.upper in self)
def span(self, r):
return Range(min(self.lower, r.lower), max(self.upper, r.upper))
diff --git a/python/qpid/delegates.py b/python/qpid/delegates.py
index 83413b91ea..a29d5c5265 100644
--- a/python/qpid/delegates.py
+++ b/python/qpid/delegates.py
@@ -20,6 +20,7 @@
import connection010
import session
from util import notify
+from datatypes import RangedSet
class Delegate:
@@ -79,6 +80,23 @@ class Delegate:
ssn.receiver.next_id = cp.command_id
ssn.receiver.next_offset = cp.command_offset
+ def session_completed(self, ch, cmp):
+ ch.session.sender.completed(cmp.commands)
+ notify(ch.session.condition)
+
+ def session_flush(self, ch, f):
+ rcv = ch.session.receiver
+ if f.expected:
+ if rcv.next_id == None:
+ exp = None
+ else:
+ exp = RangedSet(rcv.next_id)
+ ch.session_expected(exp)
+ if f.confirmed:
+ ch.session_confirmed(rcv._completed)
+ if f.completed:
+ ch.session_completed(rcv._completed)
+
class Server(Delegate):
def start(self):
diff --git a/python/qpid/exceptions.py b/python/qpid/exceptions.py
new file mode 100644
index 0000000000..2136793d3b
--- /dev/null
+++ b/python/qpid/exceptions.py
@@ -0,0 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+class Timeout(Exception): pass
diff --git a/python/qpid/session.py b/python/qpid/session.py
index b83bd1637f..7a84fa601d 100644
--- a/python/qpid/session.py
+++ b/python/qpid/session.py
@@ -17,14 +17,15 @@
# under the License.
#
-from threading import Condition, RLock
+from threading import Condition, RLock, currentThread
from invoker import Invoker
from datatypes import RangedSet, Struct, Future
from codec010 import StringCodec
from assembler import Segment
from queue import Queue
from datatypes import Message
-from util import wait
+from util import wait, notify
+from exceptions import *
from logging import getLogger
class SessionDetached(Exception): pass
@@ -37,12 +38,14 @@ def server(*args):
class SessionException(Exception): pass
+INCOMPLETE = object()
+
class Session(Invoker):
- def __init__(self, name, spec, sync=True, timeout=10, delegate=client):
+ def __init__(self, name, spec, auto_sync=True, timeout=10, delegate=client):
self.name = name
self.spec = spec
- self.sync = sync
+ self.auto_sync = auto_sync
self.timeout = timeout
self.channel = None
@@ -72,9 +75,29 @@ class Session(Invoker):
finally:
self.lock.release()
+ def error(self):
+ exc = self.exceptions[:]
+ if len(exc) == 1:
+ return exc[0]
+ else:
+ return tuple(exc)
+
+ def sync(self, timeout=None):
+ if currentThread() == self.channel.connection.thread:
+ raise SessionException("deadlock detected")
+ self.channel.session_flush(completed=True)
+ last = self.sender.next_id - 1
+ if not wait(self.condition, lambda:
+ last in self.sender._completed or self.exceptions,
+ timeout):
+ raise Timeout()
+ if self.exceptions:
+ raise SessionException(self.error())
+
def close(self, timeout=None):
self.channel.session_detach(self.name)
- wait(self.condition, lambda: self.channel is None, timeout)
+ if not wait(self.condition, lambda: self.channel is None, timeout):
+ raise Timeout()
def resolve_method(self, name):
cmd = self.spec.instructions.get(name)
@@ -132,10 +155,12 @@ class Session(Invoker):
self.send(seg)
if type.result:
- if self.sync:
+ if self.auto_sync:
return result.get(self.timeout)
else:
return result
+ elif self.auto_sync:
+ self.sync(self.timeout)
def received(self, seg):
self.receiver.received(seg)
@@ -148,6 +173,7 @@ class Session(Invoker):
self.assembly = None
def dispatch(self, assembly):
+ segments = assembly[:]
cmd = assembly.pop(0).decode(self.spec)
args = []
@@ -168,8 +194,9 @@ class Session(Invoker):
if cmd.type.result:
self.execution_result(cmd.id, result)
- for seg in assembly:
- self.receiver.completed(seg)
+ if result is not INCOMPLETE:
+ for seg in segments:
+ self.receiver.completed(seg)
def send(self, seg):
self.sender.send(seg)
@@ -212,6 +239,7 @@ class Sender:
self.next_id = 0
self.next_offset = 0
self.segments = []
+ self._completed = RangedSet()
def send(self, seg):
seg.id = self.next_id
@@ -235,6 +263,8 @@ class Sender:
del self.segments[idx]
else:
idx += 1
+ for range in commands.ranges:
+ self._completed.add(range.lower, range.upper)
class Delegate:
@@ -249,17 +279,14 @@ class Delegate:
self.session.lock.acquire()
try:
self.session.exceptions.append(ex)
- excs = self.session.exceptions[:]
- if len(excs) == 1:
- error = excs[0]
- else:
- error = tuple(excs)
+ error = self.session.error()
for id in self.session.results:
f = self.session.results.pop(id)
f.error(error)
for q in self.session._incoming.values():
q.close(error)
+ notify(self.session.condition)
finally:
self.session.lock.release()
@@ -274,3 +301,4 @@ class Client(Delegate):
messages = self.session.incoming(cmd.destination)
messages.put(m)
msg.debug("RECV: %s", m)
+ return INCOMPLETE
diff --git a/python/run-tests b/python/run-tests
index 90c0200d01..7efe5523df 100755
--- a/python/run-tests
+++ b/python/run-tests
@@ -18,9 +18,17 @@
# under the License.
#
-import sys
+import sys, logging
from qpid.testlib import testrunner
+if "-v" in sys.argv:
+ level = logging.DEBUG
+else:
+ level = logging.WARN
+
+format = "%(asctime)s %(name)-12s %(levelname)-8s %(message)s"
+logging.basicConfig(level=level, format=format, datefmt='%H:%M:%S')
+
if not testrunner.run(): sys.exit(1)
diff --git a/python/server010 b/python/server010
index ecf76577ec..2b2aa7f8d3 100755
--- a/python/server010
+++ b/python/server010
@@ -7,6 +7,16 @@ from qpid.spec010 import load
from qpid.session import Client
from qpid.datatypes import Message
+import logging, sys
+
+if "-v" in sys.argv:
+ level = logging.DEBUG
+else:
+ level = logging.WARN
+
+format = "%(asctime)s %(name)-12s %(levelname)-8s %(message)s"
+logging.basicConfig(level=level, format=format, datefmt='%H:%M:%S')
+
spec = load("../specs/amqp.0-10.xml")
class Server:
@@ -15,6 +25,7 @@ class Server:
return delegates.Server(connection, self.session)
def session(self, session):
+ session.auto_sync = False
return SessionDelegate(session)
class SessionDelegate(Client):
diff --git a/python/tests/connection010.py b/python/tests/connection010.py
index 8adf20fd78..e966ede377 100644
--- a/python/tests/connection010.py
+++ b/python/tests/connection010.py
@@ -39,6 +39,7 @@ class TestServer:
return Server(connection, delegate=self.session)
def session(self, session):
+ session.auto_sync = False
return TestSession(session, self.queue)
class TestSession(Delegate):
diff --git a/python/tests/datatypes.py b/python/tests/datatypes.py
index e22e250f61..7844cf4d10 100644
--- a/python/tests/datatypes.py
+++ b/python/tests/datatypes.py
@@ -89,3 +89,14 @@ class RangedSetTest(TestCase):
assert 21 not in rs
assert 20 in rs
self.check(rs.ranges)
+
+ def testAddSelf(self):
+ a = RangedSet()
+ a.add(0, 8)
+ self.check(a.ranges)
+ a.add(0, 8)
+ self.check(a.ranges)
+ assert len(a.ranges) == 1
+ range = a.ranges[0]
+ assert range.lower == 0
+ assert range.upper == 8