summaryrefslogtreecommitdiff
path: root/python
diff options
context:
space:
mode:
authorRafael H. Schloming <rhs@apache.org>2010-02-17 05:11:33 +0000
committerRafael H. Schloming <rhs@apache.org>2010-02-17 05:11:33 +0000
commit5d0e5d3e0cf554b0f50841e954800e5153d9a916 (patch)
treedc8c568f6ab6e0bef29680caf990da68c5581dc9 /python
parente54f5047dc6588ba952f394b251b9f454a32da1b (diff)
downloadqpid-python-5d0e5d3e0cf554b0f50841e954800e5153d9a916.tar.gz
tweaks to link
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@910826 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python')
-rw-r--r--python/qpid/driver.py25
1 files changed, 12 insertions, 13 deletions
diff --git a/python/qpid/driver.py b/python/qpid/driver.py
index d4f5d3bb93..9978a27f5c 100644
--- a/python/qpid/driver.py
+++ b/python/qpid/driver.py
@@ -140,10 +140,9 @@ class LinkIn:
def init_link(self, sst, rcv, _rcv):
_rcv.destination = str(rcv.id)
sst.destinations[_rcv.destination] = _rcv
- _rcv.closing = False
_rcv.draining = False
- def do_link(self, sst, rcv, _rcv, type, subtype):
+ def do_link(self, sst, rcv, _rcv, type, subtype, action):
if type == "topic":
_rcv._queue = "%s.%s" % (rcv.session.name, _rcv.destination)
sst.write_cmd(QueueDeclare(queue=_rcv._queue, durable=DURABLE_DEFAULT, exclusive=True, auto_delete=True))
@@ -163,11 +162,8 @@ class LinkIn:
elif type == "queue":
_rcv._queue = _rcv.name
- def done():
- rcv.linked = True
-
sst.write_cmd(MessageSubscribe(queue=_rcv._queue, destination=_rcv.destination))
- sst.write_cmd(MessageSetFlowMode(_rcv.destination, flow_mode.credit), done)
+ sst.write_cmd(MessageSetFlowMode(_rcv.destination, flow_mode.credit), action)
def do_unlink(self, sst, rcv, _rcv, action=noop):
sst.write_cmd(MessageCancel(_rcv.destination), action)
@@ -183,15 +179,14 @@ class LinkOut:
def init_link(self, sst, snd, _snd):
_snd.closing = False
- def do_link(self, sst, snd, _snd, type, subtype):
+ def do_link(self, sst, snd, _snd, type, subtype, action):
if type == "topic":
_snd._exchange = _snd.name
_snd._routing_key = _snd.subject
elif type == "queue":
_snd._exchange = ""
_snd._routing_key = _snd.name
-
- snd.linked = True
+ action()
def do_unlink(self, sst, snd, _snd, action=noop):
action()
@@ -584,6 +579,7 @@ class Driver:
if _lnk is None and not lnk.closing and not lnk.closed:
_lnk = Attachment(lnk)
+ _lnk.closing = False
dir.init_link(sst, lnk, _lnk)
err = self.parse_address(_lnk, dir, addr) or self.validate_options(_lnk)
@@ -592,23 +588,26 @@ class Driver:
lnk.closed = True
return
+ def linked():
+ lnk.linked = True
+
def resolved(type, subtype):
- dir.do_link(sst, lnk, _lnk, type, subtype)
+ dir.do_link(sst, lnk, _lnk, type, subtype, linked)
self.resolve_declare(sst, _lnk, dir.DIR_NAME, resolved)
self._attachments[lnk] = _lnk
if lnk.linked and lnk.closing and not lnk.closed:
if not _lnk.closing:
- def done():
+ def unlinked():
dir.del_link(sst, lnk, _lnk)
del self._attachments[lnk]
lnk.closed = True
if _lnk.options.get("delete") in ("always", dir.DIR_NAME):
dir.do_unlink(sst, lnk, _lnk)
- self.delete(sst, _lnk.name, done)
+ self.delete(sst, _lnk.name, unlinked)
else:
- dir.do_unlink(sst, lnk, _lnk, done)
+ dir.do_unlink(sst, lnk, _lnk, unlinked)
_lnk.closing = True
elif not lnk.linked and lnk.closing and not lnk.closed:
lnk.closed = True