diff options
Diffstat (limited to 'python/qpid/driver.py')
| -rw-r--r-- | python/qpid/driver.py | 72 |
1 files changed, 44 insertions, 28 deletions
diff --git a/python/qpid/driver.py b/python/qpid/driver.py index 9978a27f5c..aa2ca3ccc5 100644 --- a/python/qpid/driver.py +++ b/python/qpid/driver.py @@ -132,10 +132,44 @@ class SessionState: op.channel = self.channel self.driver.write_op(op) +POLICIES = Values("always", "sender", "receiver", "never") + +class Bindings: + + def validate(self, o, ctx): + t = ctx.containers[1].get("type", "queue") + if t != "queue": + return "bindings are only permitted on nodes of type queue" + +COMMON_OPTS = { + "create": POLICIES, + "delete": POLICIES, + "assert": POLICIES, + "node-properties": Map({ + "type": Values("queue", "topic"), + "durable": Types(bool), + "x-properties": Map({ + "type": Types(basestring), + "bindings": And(Types(list), Bindings()) + }, + restricted=False) + }) + } + +RECEIVE_MODES = Values("browse", "consume") + +SOURCE_OPTS = COMMON_OPTS.copy() +SOURCE_OPTS.update({ + "mode": RECEIVE_MODES + }) + +TARGET_OPTS = COMMON_OPTS.copy() + class LinkIn: ADDR_NAME = "source" DIR_NAME = "receiver" + VALIDATOR = Map(SOURCE_OPTS) def init_link(self, sst, rcv, _rcv): _rcv.destination = str(rcv.id) @@ -143,6 +177,8 @@ class LinkIn: _rcv.draining = False def do_link(self, sst, rcv, _rcv, type, subtype, action): + acq_mode = acquire_mode.pre_acquired + if type == "topic": _rcv._queue = "%s.%s" % (rcv.session.name, _rcv.destination) sst.write_cmd(QueueDeclare(queue=_rcv._queue, durable=DURABLE_DEFAULT, exclusive=True, auto_delete=True)) @@ -161,8 +197,11 @@ class LinkIn: f._bind(sst, _rcv.name, _rcv._queue) elif type == "queue": _rcv._queue = _rcv.name + if _rcv.options.get("mode", "consume") == "browse": + acq_mode = acquire_mode.not_acquired - sst.write_cmd(MessageSubscribe(queue=_rcv._queue, destination=_rcv.destination)) + sst.write_cmd(MessageSubscribe(queue=_rcv._queue, destination=_rcv.destination, + acquire_mode = acq_mode)) sst.write_cmd(MessageSetFlowMode(_rcv.destination, flow_mode.credit), action) def do_unlink(self, sst, rcv, _rcv, action=noop): @@ -175,6 +214,7 @@ class LinkOut: ADDR_NAME = "target" DIR_NAME = "sender" + VALIDATOR = Map(TARGET_OPTS) def init_link(self, sst, snd, _snd): _snd.closing = False @@ -582,7 +622,7 @@ class Driver: _lnk.closing = False dir.init_link(sst, lnk, _lnk) - err = self.parse_address(_lnk, dir, addr) or self.validate_options(_lnk) + err = self.parse_address(_lnk, dir, addr) or self.validate_options(_lnk, dir) if err: lnk.error = (err,) lnk.closed = True @@ -626,33 +666,9 @@ class Driver: except address.ParseError, e: return e - POLICIES = Values("always", "sender", "receiver", "never") - - class Bindings: - - def validate(self, o, ctx): - t = ctx.containers[1].get("type", "queue") - if t != "queue": - return "bindings are only permitted on nodes of type queue" - - OPTS = Map({ - "create": POLICIES, - "delete": POLICIES, - "assert": POLICIES, - "node-properties": Map({ - "type": Values("queue", "topic"), - "durable": Types(bool), - "x-properties": Map({ - "type": Types(basestring), - "bindings": And(Types(list), Bindings()) - }, - restricted=False) - }) - }) - - def validate_options(self, lnk): + def validate_options(self, lnk, dir): ctx = Context() - err = Driver.OPTS.validate(lnk.options, ctx) + err = dir.VALIDATOR.validate(lnk.options, ctx) if err: return "error in options: %s" % err def resolve_declare(self, sst, lnk, dir, action): |
