diff options
| author | Rafael H. Schloming <rhs@apache.org> | 2010-02-17 05:11:33 +0000 |
|---|---|---|
| committer | Rafael H. Schloming <rhs@apache.org> | 2010-02-17 05:11:33 +0000 |
| commit | 5d0e5d3e0cf554b0f50841e954800e5153d9a916 (patch) | |
| tree | dc8c568f6ab6e0bef29680caf990da68c5581dc9 /python | |
| parent | e54f5047dc6588ba952f394b251b9f454a32da1b (diff) | |
| download | qpid-python-5d0e5d3e0cf554b0f50841e954800e5153d9a916.tar.gz | |
tweaks to link
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@910826 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python')
| -rw-r--r-- | python/qpid/driver.py | 25 |
1 files changed, 12 insertions, 13 deletions
diff --git a/python/qpid/driver.py b/python/qpid/driver.py index d4f5d3bb93..9978a27f5c 100644 --- a/python/qpid/driver.py +++ b/python/qpid/driver.py @@ -140,10 +140,9 @@ class LinkIn: def init_link(self, sst, rcv, _rcv): _rcv.destination = str(rcv.id) sst.destinations[_rcv.destination] = _rcv - _rcv.closing = False _rcv.draining = False - def do_link(self, sst, rcv, _rcv, type, subtype): + def do_link(self, sst, rcv, _rcv, type, subtype, action): if type == "topic": _rcv._queue = "%s.%s" % (rcv.session.name, _rcv.destination) sst.write_cmd(QueueDeclare(queue=_rcv._queue, durable=DURABLE_DEFAULT, exclusive=True, auto_delete=True)) @@ -163,11 +162,8 @@ class LinkIn: elif type == "queue": _rcv._queue = _rcv.name - def done(): - rcv.linked = True - sst.write_cmd(MessageSubscribe(queue=_rcv._queue, destination=_rcv.destination)) - sst.write_cmd(MessageSetFlowMode(_rcv.destination, flow_mode.credit), done) + sst.write_cmd(MessageSetFlowMode(_rcv.destination, flow_mode.credit), action) def do_unlink(self, sst, rcv, _rcv, action=noop): sst.write_cmd(MessageCancel(_rcv.destination), action) @@ -183,15 +179,14 @@ class LinkOut: def init_link(self, sst, snd, _snd): _snd.closing = False - def do_link(self, sst, snd, _snd, type, subtype): + def do_link(self, sst, snd, _snd, type, subtype, action): if type == "topic": _snd._exchange = _snd.name _snd._routing_key = _snd.subject elif type == "queue": _snd._exchange = "" _snd._routing_key = _snd.name - - snd.linked = True + action() def do_unlink(self, sst, snd, _snd, action=noop): action() @@ -584,6 +579,7 @@ class Driver: if _lnk is None and not lnk.closing and not lnk.closed: _lnk = Attachment(lnk) + _lnk.closing = False dir.init_link(sst, lnk, _lnk) err = self.parse_address(_lnk, dir, addr) or self.validate_options(_lnk) @@ -592,23 +588,26 @@ class Driver: lnk.closed = True return + def linked(): + lnk.linked = True + def resolved(type, subtype): - dir.do_link(sst, lnk, _lnk, type, subtype) + dir.do_link(sst, lnk, _lnk, type, subtype, linked) self.resolve_declare(sst, _lnk, dir.DIR_NAME, resolved) self._attachments[lnk] = _lnk if lnk.linked and lnk.closing and not lnk.closed: if not _lnk.closing: - def done(): + def unlinked(): dir.del_link(sst, lnk, _lnk) del self._attachments[lnk] lnk.closed = True if _lnk.options.get("delete") in ("always", dir.DIR_NAME): dir.do_unlink(sst, lnk, _lnk) - self.delete(sst, _lnk.name, done) + self.delete(sst, _lnk.name, unlinked) else: - dir.do_unlink(sst, lnk, _lnk, done) + dir.do_unlink(sst, lnk, _lnk, unlinked) _lnk.closing = True elif not lnk.linked and lnk.closing and not lnk.closed: lnk.closed = True |
