From 9c0a76ef0159743bb58d288c89a7e2faf5be11be Mon Sep 17 00:00:00 2001 From: "Rafael H. Schloming" Date: Wed, 2 Sep 2009 15:35:42 +0000 Subject: changed Lockable -> Waiter and switched its usage from has-a to is-a; also fixed some more imports git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@810573 13f79535-47bb-0310-9956-ffa450edef68 --- python/qpid/concurrency.py | 65 ++++++++++++++++++++++++++++++++++++++++++++ python/qpid/driver.py | 13 ++++----- python/qpid/lockable.py | 68 ---------------------------------------------- python/qpid/messaging.py | 36 +++++++++++++----------- 4 files changed, 91 insertions(+), 91 deletions(-) create mode 100644 python/qpid/concurrency.py delete mode 100644 python/qpid/lockable.py (limited to 'python') diff --git a/python/qpid/concurrency.py b/python/qpid/concurrency.py new file mode 100644 index 0000000000..00cdb6b953 --- /dev/null +++ b/python/qpid/concurrency.py @@ -0,0 +1,65 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import inspect, time + +def synchronized(meth): + args, vargs, kwargs, defs = inspect.getargspec(meth) + scope = {} + scope["meth"] = meth + exec """ +def %s%s: + %s + %s._lock.acquire() + try: + return meth%s + finally: + %s._lock.release() +""" % (meth.__name__, inspect.formatargspec(args, vargs, kwargs, defs), + repr(inspect.getdoc(meth)), args[0], + inspect.formatargspec(args, vargs, kwargs, defs, + formatvalue=lambda x: ""), + args[0]) in scope + return scope[meth.__name__] + +class Waiter(object): + + def __init__(self, condition): + self.condition = condition + + def wait(self, predicate, timeout=None): + passed = 0 + start = time.time() + while not predicate(): + if timeout is None: + # using the timed wait prevents keyboard interrupts from being + # blocked while waiting + self.condition.wait(3) + elif passed < timeout: + self.condition.wait(timeout - passed) + else: + return False + passed = time.time() - start + return True + + def notify(self): + self.condition.notify() + + def notifyAll(self): + self.condition.notifyAll() diff --git a/python/qpid/driver.py b/python/qpid/driver.py index 2f56a6008e..a759588572 100644 --- a/python/qpid/driver.py +++ b/python/qpid/driver.py @@ -18,11 +18,11 @@ # import compat, connection, socket, sys, time +from concurrency import synchronized from datatypes import RangedSet, Message as Message010 from exceptions import Timeout -from lockable import synchronized, Lockable from logging import getLogger -from messaging import get_codec, Message, Pattern, UNLIMITED +from messaging import get_codec, ConnectError, Message, Pattern, UNLIMITED from ops import delivery_mode from session import Client, INCOMPLETE, SessionDetached from threading import Condition, Thread @@ -63,12 +63,11 @@ def delegate(handler, session): return handler._message_transfer(session, cmd) return Delegate -class Driver(Lockable): +class Driver: def __init__(self, connection): self.connection = connection self._lock = self.connection._lock - self._condition = self.connection._condition self._wakeup_cond = Condition() self._socket = None self._conn = None @@ -134,7 +133,7 @@ class Driver(Lockable): self.connection.error = (msg,) self._modcount = modcount - self.notifyAll() + self.connection._waiter.notifyAll() def connect(self): if self._conn is not None: @@ -177,7 +176,7 @@ class Driver(Lockable): _ssn.auto_sync = False _ssn.invoke_lock = self._lock _ssn.lock = self._lock - _ssn.condition = self._condition + _ssn.condition = self.connection._condition if ssn.transactional: # XXX: adding an attribute to qpid.session.Session _ssn.acked = [] @@ -422,7 +421,7 @@ class Driver(Lockable): rcv.received += 1 log.debug("RECV [%s] %s", ssn, msg) ssn.incoming.append(msg) - self.notifyAll() + self.connection._waiter.notifyAll() return INCOMPLETE def _decode(self, message): diff --git a/python/qpid/lockable.py b/python/qpid/lockable.py deleted file mode 100644 index 0415d53e27..0000000000 --- a/python/qpid/lockable.py +++ /dev/null @@ -1,68 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -import inspect, time - -def synchronized(meth): - args, vargs, kwargs, defs = inspect.getargspec(meth) - scope = {} - scope["meth"] = meth - exec """ -def %s%s: - %s - %s.lock() - try: - return meth%s - finally: - %s.unlock() -""" % (meth.__name__, inspect.formatargspec(args, vargs, kwargs, defs), - repr(inspect.getdoc(meth)), args[0], - inspect.formatargspec(args, vargs, kwargs, defs, - formatvalue=lambda x: ""), - args[0]) in scope - return scope[meth.__name__] - -class Lockable(object): - - def lock(self): - self._lock.acquire() - - def unlock(self): - self._lock.release() - - def wait(self, predicate, timeout=None): - passed = 0 - start = time.time() - while not predicate(): - if timeout is None: - # using the timed wait prevents keyboard interrupts from being - # blocked while waiting - self._condition.wait(3) - elif passed < timeout: - self._condition.wait(timeout - passed) - else: - return False - passed = time.time() - start - return True - - def notify(self): - self._condition.notify() - - def notifyAll(self): - self._condition.notifyAll() diff --git a/python/qpid/messaging.py b/python/qpid/messaging.py index 3c12641bca..a3f2d43ed2 100644 --- a/python/qpid/messaging.py +++ b/python/qpid/messaging.py @@ -31,8 +31,8 @@ Areas that still need work: """ from codec010 import StringCodec +from concurrency import synchronized, Waiter from datatypes import timestamp, uuid4, Serial -from lockable import synchronized, Lockable from logging import getLogger from ops import PRIMITIVE from threading import Thread, RLock, Condition @@ -69,7 +69,7 @@ class ConnectError(ConnectionError): """ pass -class Connection(Lockable): +class Connection: """ A Connection manages a group of L{Sessions} and connects @@ -114,19 +114,23 @@ class Connection(Lockable): self._connected = False self._lock = RLock() self._condition = Condition(self._lock) + self._waiter = Waiter(self._condition) self._modcount = Serial(0) self.error = None from driver import Driver self._driver = Driver(self) self._driver.start() + def _wait(self, predicate, timeout=None): + return self._waiter.wait(predicate, timeout=timeout) + def _wakeup(self): self._modcount += 1 self._driver.wakeup() def _catchup(self, exc=ConnectionError): mc = self._modcount - self.wait(lambda: not self._driver._modcount < mc) + self._wait(lambda: not self._driver._modcount < mc) self._check_error(exc) def _check_error(self, exc=ConnectionError): @@ -134,7 +138,7 @@ class Connection(Lockable): raise exc(*self.error) def _ewait(self, predicate, timeout=None, exc=ConnectionError): - result = self.wait(lambda: self.error or predicate(), timeout) + result = self._wait(lambda: self.error or predicate(), timeout) self._check_error(exc) return result @@ -255,7 +259,7 @@ class NontransactionalSession(SessionError): class TransactionAborted(SessionError): pass -class Session(Lockable): +class Session: """ Sessions provide a linear context for sending and receiving @@ -287,7 +291,6 @@ class Session(Lockable): self.closed = False self._lock = connection._lock - self._condition = connection._condition self.thread = Thread(target = self.run) self.thread.setDaemon(True) self.thread.start() @@ -295,6 +298,9 @@ class Session(Lockable): def __repr__(self): return "" % self.name + def _wait(self, predicate, timeout=None): + return self.connection._wait(predicate, timeout=timeout) + def _wakeup(self): self.connection._wakeup() @@ -369,8 +375,8 @@ class Session(Lockable): @synchronized def _get(self, predicate, timeout=None): - if self.wait(lambda: ((self._peek(predicate) is not None) or self.closing), - timeout): + if self._wait(lambda: ((self._peek(predicate) is not None) or self.closing), + timeout): msg = self._pop(predicate) if msg is not None: msg._receiver.returned += 1 @@ -454,7 +460,7 @@ class Session(Lockable): for rcv in self.receivers: rcv.stop() # TODO: think about stopping individual receivers in listen mode - self.wait(lambda: self._peek(self._pred) is None) + self._wait(lambda: self._peek(self._pred) is None) self.started = False def _pred(self, m): @@ -470,10 +476,10 @@ class Session(Lockable): else: msg._receiver.listener(msg) if self._peek(self._pred) is None: - self.notifyAll() + self.connection._waiter.notifyAll() finally: self.closed = True - self.notifyAll() + self.connection._waiter.notifyAll() @synchronized def close(self): @@ -486,7 +492,7 @@ class Session(Lockable): self.closing = True self._wakeup() self._catchup() - self.wait(lambda: self.closed) + self._wait(lambda: self.closed) while self.thread.isAlive(): self.thread.join(3) self.thread = None @@ -500,7 +506,7 @@ class SendError(SessionError): class InsufficientCapacity(SendError): pass -class Sender(Lockable): +class Sender: """ Sends outgoing messages. @@ -515,7 +521,6 @@ class Sender(Lockable): self.acked = Serial(0) self.closed = False self._lock = self.session._lock - self._condition = self.session._condition def _wakeup(self): self.session._wakeup() @@ -598,7 +603,7 @@ class Empty(ReceiveError): """ pass -class Receiver(Lockable): +class Receiver: """ Receives incoming messages from a remote source. Messages may be @@ -625,7 +630,6 @@ class Receiver(Lockable): self.closed = False self.listener = None self._lock = self.session._lock - self._condition = self.session._condition def _wakeup(self): self.session._wakeup() -- cgit v1.2.1