summaryrefslogtreecommitdiff
path: root/python/qpid/driver.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/qpid/driver.py')
-rw-r--r--python/qpid/driver.py72
1 files changed, 44 insertions, 28 deletions
diff --git a/python/qpid/driver.py b/python/qpid/driver.py
index 9978a27f5c..aa2ca3ccc5 100644
--- a/python/qpid/driver.py
+++ b/python/qpid/driver.py
@@ -132,10 +132,44 @@ class SessionState:
op.channel = self.channel
self.driver.write_op(op)
+POLICIES = Values("always", "sender", "receiver", "never")
+
+class Bindings:
+
+ def validate(self, o, ctx):
+ t = ctx.containers[1].get("type", "queue")
+ if t != "queue":
+ return "bindings are only permitted on nodes of type queue"
+
+COMMON_OPTS = {
+ "create": POLICIES,
+ "delete": POLICIES,
+ "assert": POLICIES,
+ "node-properties": Map({
+ "type": Values("queue", "topic"),
+ "durable": Types(bool),
+ "x-properties": Map({
+ "type": Types(basestring),
+ "bindings": And(Types(list), Bindings())
+ },
+ restricted=False)
+ })
+ }
+
+RECEIVE_MODES = Values("browse", "consume")
+
+SOURCE_OPTS = COMMON_OPTS.copy()
+SOURCE_OPTS.update({
+ "mode": RECEIVE_MODES
+ })
+
+TARGET_OPTS = COMMON_OPTS.copy()
+
class LinkIn:
ADDR_NAME = "source"
DIR_NAME = "receiver"
+ VALIDATOR = Map(SOURCE_OPTS)
def init_link(self, sst, rcv, _rcv):
_rcv.destination = str(rcv.id)
@@ -143,6 +177,8 @@ class LinkIn:
_rcv.draining = False
def do_link(self, sst, rcv, _rcv, type, subtype, action):
+ acq_mode = acquire_mode.pre_acquired
+
if type == "topic":
_rcv._queue = "%s.%s" % (rcv.session.name, _rcv.destination)
sst.write_cmd(QueueDeclare(queue=_rcv._queue, durable=DURABLE_DEFAULT, exclusive=True, auto_delete=True))
@@ -161,8 +197,11 @@ class LinkIn:
f._bind(sst, _rcv.name, _rcv._queue)
elif type == "queue":
_rcv._queue = _rcv.name
+ if _rcv.options.get("mode", "consume") == "browse":
+ acq_mode = acquire_mode.not_acquired
- sst.write_cmd(MessageSubscribe(queue=_rcv._queue, destination=_rcv.destination))
+ sst.write_cmd(MessageSubscribe(queue=_rcv._queue, destination=_rcv.destination,
+ acquire_mode = acq_mode))
sst.write_cmd(MessageSetFlowMode(_rcv.destination, flow_mode.credit), action)
def do_unlink(self, sst, rcv, _rcv, action=noop):
@@ -175,6 +214,7 @@ class LinkOut:
ADDR_NAME = "target"
DIR_NAME = "sender"
+ VALIDATOR = Map(TARGET_OPTS)
def init_link(self, sst, snd, _snd):
_snd.closing = False
@@ -582,7 +622,7 @@ class Driver:
_lnk.closing = False
dir.init_link(sst, lnk, _lnk)
- err = self.parse_address(_lnk, dir, addr) or self.validate_options(_lnk)
+ err = self.parse_address(_lnk, dir, addr) or self.validate_options(_lnk, dir)
if err:
lnk.error = (err,)
lnk.closed = True
@@ -626,33 +666,9 @@ class Driver:
except address.ParseError, e:
return e
- POLICIES = Values("always", "sender", "receiver", "never")
-
- class Bindings:
-
- def validate(self, o, ctx):
- t = ctx.containers[1].get("type", "queue")
- if t != "queue":
- return "bindings are only permitted on nodes of type queue"
-
- OPTS = Map({
- "create": POLICIES,
- "delete": POLICIES,
- "assert": POLICIES,
- "node-properties": Map({
- "type": Values("queue", "topic"),
- "durable": Types(bool),
- "x-properties": Map({
- "type": Types(basestring),
- "bindings": And(Types(list), Bindings())
- },
- restricted=False)
- })
- })
-
- def validate_options(self, lnk):
+ def validate_options(self, lnk, dir):
ctx = Context()
- err = Driver.OPTS.validate(lnk.options, ctx)
+ err = dir.VALIDATOR.validate(lnk.options, ctx)
if err: return "error in options: %s" % err
def resolve_declare(self, sst, lnk, dir, action):