diff options
| author | Rafael H. Schloming <rhs@apache.org> | 2009-11-14 01:12:54 +0000 |
|---|---|---|
| committer | Rafael H. Schloming <rhs@apache.org> | 2009-11-14 01:12:54 +0000 |
| commit | caac82682127f212d2d714154d09fc51244cd4ae (patch) | |
| tree | 6412fc891a94ce7554c9e8fc836983a1ef39b24f /python | |
| parent | 208fb8cefbfdb4a193a319e6d9fc3ba7c81bcd65 (diff) | |
| download | qpid-python-caac82682127f212d2d714154d09fc51244cd4ae.tar.gz | |
removed start/stop
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@836085 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python')
| -rw-r--r-- | python/qpid/messaging.py | 114 | ||||
| -rw-r--r-- | python/qpid/tests/messaging.py | 59 |
2 files changed, 40 insertions, 133 deletions
diff --git a/python/qpid/messaging.py b/python/qpid/messaging.py index ed0bd14f9c..ec1c054e14 100644 --- a/python/qpid/messaging.py +++ b/python/qpid/messaging.py @@ -97,7 +97,7 @@ class Connection: mechanism="PLAIN", heartbeat=None, **options): """ Creates a connection. A newly created connection must be connected - with the Connection.connect() method before it can be started. + with the Connection.connect() method before it can be used. @type host: str @param host: the name or ip address of the remote host @@ -113,7 +113,6 @@ class Connection: self.mechanism = mechanism self.heartbeat = heartbeat - self.started = False self.id = str(uuid4()) self.session_counter = 0 self.sessions = {} @@ -166,7 +165,7 @@ class Connection: if self.sessions.has_key(name): return self.sessions[name] else: - ssn = Session(self, name, self.started, transactional=transactional) + ssn = Session(self, name, transactional) self.sessions[name] = ssn self._wakeup() return ssn @@ -201,24 +200,6 @@ class Connection: return self._connected @synchronized - def start(self): - """ - Start incoming message delivery for all sessions. - """ - self.started = True - for ssn in self.sessions.values(): - ssn.start() - - @synchronized - def stop(self): - """ - Stop incoming message deliveries for all sessions. - """ - for ssn in self.sessions.values(): - ssn.stop() - self.started = False - - @synchronized def close(self): """ Close the connection and all sessions. @@ -269,10 +250,9 @@ class Session: messages, and manage various Senders and Receivers. """ - def __init__(self, connection, name, started, transactional): + def __init__(self, connection, name, transactional): self.connection = connection self.name = name - self.started = started self.transactional = transactional @@ -346,8 +326,7 @@ class Session: @rtype: Receiver @return: a new Receiver for the specified source """ - receiver = Receiver(self, len(self.receivers), source, options, - self.started) + receiver = Receiver(self, len(self.receivers), source, options) self.receivers.append(receiver) self._wakeup() return receiver @@ -453,24 +432,6 @@ class Session: assert self.aborted @synchronized - def start(self): - """ - Start incoming message delivery for the session. - """ - self.started = True - for rcv in self.receivers: - rcv.start() - - @synchronized - def stop(self): - """ - Stop incoming message delivery for the session. - """ - for rcv in self.receivers: - rcv.stop() - self.started = False - - @synchronized def close(self): """ Close the session. @@ -611,22 +572,20 @@ class Empty(ReceiveError): """ pass -class Receiver: +class Receiver(object): """ Receives incoming messages from a remote source. Messages may be fetched with L{fetch}. """ - def __init__(self, session, index, source, options, started): + def __init__(self, session, index, source, options): self.session = session self.index = index self.destination = str(self.index) self.source = source self.options = options - self.started = started - self.capacity = options.get("capacity", UNLIMITED) self.granted = Serial(0) self.draining = False self.impending = Serial(0) @@ -638,6 +597,26 @@ class Receiver: self.closing = False self.closed = False self._lock = self.session._lock + self._capacity = 0 + self._set_capacity(options.get("capacity", 0), False) + + @synchronized + def _set_capacity(self, c, wakeup=True): + if c is UNLIMITED: + self._capacity = c.value + else: + self._capacity = c + self._grant() + if wakeup: + self._wakeup() + + def _get_capacity(self): + if self._capacity == UNLIMITED.value: + return UNLIMITED + else: + return self._capacity + + capacity = property(_get_capacity, _set_capacity) def _wakeup(self): self.session._wakeup() @@ -663,14 +642,6 @@ class Receiver: """ return self.received - self.returned - def _capacity(self): - if not self.started: - return 0 - elif self.capacity is UNLIMITED: - return self.capacity.value - else: - return self.capacity - def _pred(self, msg): return msg._receiver == self @@ -687,7 +658,7 @@ class Receiver: self._ewait(lambda: self.linked) - if self._capacity() == 0: + if self._capacity == 0: self.granted = self.returned + 1 self._wakeup() self._ewait(lambda: self.impending >= self.granted) @@ -701,39 +672,16 @@ class Receiver: msg = self.session._get(self._pred, timeout=0) if msg is None: raise Empty() - elif self._capacity() not in (0, UNLIMITED.value): + elif self._capacity not in (0, UNLIMITED.value): self.granted += 1 self._wakeup() return msg def _grant(self): - if self.started: - if self.capacity is UNLIMITED: - self.granted = UNLIMITED - else: - self.granted = self.received + self._capacity() + if self._capacity == UNLIMITED.value: + self.granted = UNLIMITED else: - self.granted = self.received - - - @synchronized - def start(self): - """ - Start incoming message delivery for this receiver. - """ - self.started = True - self._grant() - self._wakeup() - - @synchronized - def stop(self): - """ - Stop incoming message delivery for this receiver. - """ - self.started = False - self._grant() - self._wakeup() - self._ewait(lambda: self.impending == self.received) + self.granted = self.received + self._capacity @synchronized def close(self): diff --git a/python/qpid/tests/messaging.py b/python/qpid/tests/messaging.py index 830391bf31..8cadb2a8fe 100644 --- a/python/qpid/tests/messaging.py +++ b/python/qpid/tests/messaging.py @@ -169,23 +169,6 @@ class ConnectionTests(Base): self.conn.connect() self.ping(ssn) - def testStart(self): - ssn = self.conn.session() - assert not ssn.started - self.conn.start() - assert ssn.started - ssn2 = self.conn.session() - assert ssn2.started - - def testStop(self): - self.conn.start() - ssn = self.conn.session() - assert ssn.started - self.conn.stop() - assert not ssn.started - ssn2 = self.conn.session() - assert not ssn2.started - def testClose(self): self.conn.close() assert not self.conn.connected() @@ -234,9 +217,6 @@ class SessionTests(Base): rcv2 = self.ssn.receiver(ADDR, capacity=UNLIMITED) rcv3 = self.ssn.receiver(ADDR, capacity=UNLIMITED) - # XXX: this won't work if it is before the receiver creation - self.ssn.start() - snd = self.ssn.sender(ADDR) msgs = [] @@ -257,25 +237,6 @@ class SessionTests(Base): assert msgs == fetched, "expecting %s, got %s" % (msgs, fetched) self.ssn.acknowledge() - def testStart(self): - START_Q = 'test-start-queue; {create: always}' - rcv = self.ssn.receiver(START_Q) - assert not rcv.started - self.ssn.start() - assert rcv.started - rcv = self.ssn.receiver(START_Q) - assert rcv.started - - def testStop(self): - STOP_Q = 'test-stop-queue; {create: always}' - self.ssn.start() - rcv = self.ssn.receiver(STOP_Q) - assert rcv.started - self.ssn.stop() - assert not rcv.started - rcv = self.ssn.receiver(STOP_Q) - assert not rcv.started - # XXX, we need a convenient way to assert that required queues are # empty on setup, and possibly also to drain queues on teardown def ackTest(self, acker, ack_capacity=None): @@ -491,11 +452,11 @@ class ReceiverTests(Base): assert msg.content == three self.ssn.acknowledge() - def testStart(self): - content = self.send("testStart") + def testCapacityIncrease(self): + content = self.send("testCapacityIncrease") self.sleep() assert self.rcv.pending() == 0 - self.rcv.start() + self.rcv.capacity = UNLIMITED self.sleep() assert self.rcv.pending() == 1 msg = self.rcv.fetch(0) @@ -503,17 +464,17 @@ class ReceiverTests(Base): assert self.rcv.pending() == 0 self.ssn.acknowledge() - def testStop(self): - self.rcv.start() - one = self.send("testStop", 1) + def testCapacityDecrease(self): + self.rcv.capacity = UNLIMITED + one = self.send("testCapacityDecrease", 1) self.sleep() assert self.rcv.pending() == 1 msg = self.rcv.fetch(0) assert msg.content == one - self.rcv.stop() + self.rcv.capacity = 0 - two = self.send("testStop", 2) + two = self.send("testCapacityDecrease", 2) self.sleep() assert self.rcv.pending() == 0 msg = self.rcv.fetch(0) @@ -522,7 +483,7 @@ class ReceiverTests(Base): self.ssn.acknowledge() def testPending(self): - self.rcv.start() + self.rcv.capacity = UNLIMITED assert self.rcv.pending() == 0 for i in range(3): @@ -545,7 +506,6 @@ class ReceiverTests(Base): def testCapacity(self): self.rcv.capacity = 5 - self.rcv.start() self.assertPending(self.rcv, 0) for i in range(15): @@ -565,7 +525,6 @@ class ReceiverTests(Base): def testCapacityUNLIMITED(self): self.rcv.capacity = UNLIMITED - self.rcv.start() self.assertPending(self.rcv, 0) for i in range(10): |
