summaryrefslogtreecommitdiff
path: root/qpid/python/examples
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/python/examples')
-rw-r--r--qpid/python/examples/README.txt42
-rwxr-xr-xqpid/python/examples/api/drain97
-rwxr-xr-xqpid/python/examples/api/hello52
-rwxr-xr-xqpid/python/examples/api/hello_xml77
-rwxr-xr-xqpid/python/examples/api/server95
-rwxr-xr-xqpid/python/examples/api/spout123
-rw-r--r--qpid/python/examples/reservations/common.py80
-rwxr-xr-xqpid/python/examples/reservations/inventory94
-rwxr-xr-xqpid/python/examples/reservations/machine-agent103
-rwxr-xr-xqpid/python/examples/reservations/reserve197
10 files changed, 960 insertions, 0 deletions
diff --git a/qpid/python/examples/README.txt b/qpid/python/examples/README.txt
new file mode 100644
index 0000000000..4395160fec
--- /dev/null
+++ b/qpid/python/examples/README.txt
@@ -0,0 +1,42 @@
+The Python Examples
+===================
+
+README.txt -- This file.
+
+api -- Directory containing drain, spout,
+ sever, hello, and hello_xml examples.
+
+api/drain -- A simple messaging client that prints
+ messages from the source specified on
+ the command line.
+
+api/spout -- A simple messaging client that sends
+ messages to the target specified on the
+ command line.
+
+api/server -- An example server that process incoming
+ messages and sends replies.
+
+api/hello -- An example client that sends a message
+ and then receives it.
+
+api/hello_xml -- An example client that sends a message
+ to the xml exchange and then receives
+ it.
+
+
+reservations -- Directory containing an example machine
+ reservation system.
+
+reservations/common.py -- Utility code used by reserve,
+ machine-agent, and inventory scripts.
+
+reservations/reserve -- Messaging client for listing, reserving,
+ and releasing machines.
+
+reservations/machine-agent -- Messaging server that tracks and reports
+ on the status of its host machine and
+ listens for reservation requests.
+
+reservations/inventory -- Messaging server that tracks the last
+ known status of machines.
diff --git a/qpid/python/examples/api/drain b/qpid/python/examples/api/drain
new file mode 100755
index 0000000000..5e30153bc2
--- /dev/null
+++ b/qpid/python/examples/api/drain
@@ -0,0 +1,97 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import optparse
+from qpid.messaging import *
+from qpid.util import URL
+from qpid.log import enable, DEBUG, WARN
+
+parser = optparse.OptionParser(usage="usage: %prog [options] ADDRESS ...",
+ description="Drain messages from the supplied address.")
+parser.add_option("-b", "--broker", default="localhost",
+ help="connect to specified BROKER (default %default)")
+parser.add_option("-c", "--count", type="int",
+ help="number of messages to drain")
+parser.add_option("-f", "--forever", action="store_true",
+ help="ignore timeout and wait forever")
+parser.add_option("-r", "--reconnect", action="store_true",
+ help="enable auto reconnect")
+parser.add_option("-i", "--reconnect-interval", type="float", default=3,
+ help="interval between reconnect attempts")
+parser.add_option("-l", "--reconnect-limit", type="int",
+ help="maximum number of reconnect attempts")
+parser.add_option("-t", "--timeout", type="float", default=0,
+ help="timeout in seconds to wait before exiting (default %default)")
+parser.add_option("-p", "--print", dest="format", default="%(M)s",
+ help="format string for printing messages (default %default)")
+parser.add_option("-v", dest="verbose", action="store_true",
+ help="enable logging")
+
+opts, args = parser.parse_args()
+
+if opts.verbose:
+ enable("qpid", DEBUG)
+else:
+ enable("qpid", WARN)
+
+if args:
+ addr = args.pop(0)
+else:
+ parser.error("address is required")
+if opts.forever:
+ timeout = None
+else:
+ timeout = opts.timeout
+
+class Formatter:
+
+ def __init__(self, message):
+ self.message = message
+ self.environ = {"M": self.message,
+ "P": self.message.properties,
+ "C": self.message.content}
+
+ def __getitem__(self, st):
+ return eval(st, self.environ)
+
+conn = Connection(opts.broker,
+ reconnect=opts.reconnect,
+ reconnect_interval=opts.reconnect_interval,
+ reconnect_limit=opts.reconnect_limit)
+try:
+ conn.open()
+ ssn = conn.session()
+ rcv = ssn.receiver(addr)
+
+ count = 0
+ while not opts.count or count < opts.count:
+ try:
+ msg = rcv.fetch(timeout=timeout)
+ print opts.format % Formatter(msg)
+ count += 1
+ ssn.acknowledge()
+ except Empty:
+ break
+except ReceiverError, e:
+ print e
+except KeyboardInterrupt:
+ pass
+
+conn.close()
diff --git a/qpid/python/examples/api/hello b/qpid/python/examples/api/hello
new file mode 100755
index 0000000000..ad314da19e
--- /dev/null
+++ b/qpid/python/examples/api/hello
@@ -0,0 +1,52 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import sys
+from qpid.messaging import *
+
+if len(sys.argv)<2:
+ broker = "localhost:5672"
+else:
+ broker = sys.argv[1]
+
+if len(sys.argv)<3:
+ address = "amq.topic"
+else:
+ address = sys.argv[2]
+
+connection = Connection(broker)
+
+try:
+ connection.open()
+ session = connection.session()
+
+ sender = session.sender(address)
+ receiver = session.receiver(address)
+
+ sender.send(Message("Hello world!"));
+
+ message = receiver.fetch()
+ print message.content
+ session.acknowledge()
+
+except MessagingError,m:
+ print m
+
+connection.close()
diff --git a/qpid/python/examples/api/hello_xml b/qpid/python/examples/api/hello_xml
new file mode 100755
index 0000000000..ab567ec5dd
--- /dev/null
+++ b/qpid/python/examples/api/hello_xml
@@ -0,0 +1,77 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import sys
+from qpid.messaging import *
+
+broker = "localhost:5672"
+connection = Connection(broker)
+
+try:
+ connection.open()
+ session = connection.session()
+
+# Set up the receiver
+ query = """
+ let $w := ./weather
+ return $w/station = 'Raleigh-Durham International Airport (KRDU)'
+ and $w/temperature_f > 50
+ and $w/temperature_f - $w/dewpoint > 5
+ and $w/wind_speed_mph > 7
+ and $w/wind_speed_mph < 20 """
+
+# query="./weather"
+
+ address = """
+ xml; {
+ create: always,
+ node:{ type: queue },
+ link: {
+ x-bindings: [{ exchange: xml, key: weather, arguments: { xquery: %r} }]
+ }
+ }
+ """ % query
+
+ receiver = session.receiver(address)
+
+# Send an observation
+
+ observations = """
+ <weather>
+ <station>Raleigh-Durham International Airport (KRDU)</station>
+ <wind_speed_mph>16</wind_speed_mph>
+ <temperature_f>70</temperature_f>
+ <dewpoint>35</dewpoint>
+ </weather> """
+
+ message = Message(subject="weather", content=observations)
+ sender = session.sender("xml")
+ sender.send(message)
+
+# Retrieve matching message from the receiver and print it
+
+ message = receiver.fetch(timeout=1)
+ print message.content
+ session.acknowledge()
+
+except MessagingError,m:
+ print m
+
+connection.close()
diff --git a/qpid/python/examples/api/server b/qpid/python/examples/api/server
new file mode 100755
index 0000000000..3b9a3560da
--- /dev/null
+++ b/qpid/python/examples/api/server
@@ -0,0 +1,95 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import optparse, sys, traceback
+from qpid.messaging import *
+from qpid.util import URL
+from subprocess import Popen, STDOUT, PIPE
+from qpid.log import enable, DEBUG, WARN
+
+parser = optparse.OptionParser(usage="usage: %prog [options] ADDRESS ...",
+ description="handle requests from the supplied address.")
+parser.add_option("-b", "--broker", default="localhost",
+ help="connect to specified BROKER (default %default)")
+parser.add_option("-r", "--reconnect", action="store_true",
+ help="enable auto reconnect")
+parser.add_option("-i", "--reconnect-interval", type="float", default=3,
+ help="interval between reconnect attempts")
+parser.add_option("-l", "--reconnect-limit", type="int",
+ help="maximum number of reconnect attempts")
+parser.add_option("-v", dest="verbose", action="store_true",
+ help="enable logging")
+
+opts, args = parser.parse_args()
+
+if opts.verbose:
+ enable("qpid", DEBUG)
+else:
+ enable("qpid", WARN)
+
+if args:
+ addr = args.pop(0)
+else:
+ parser.error("address is required")
+
+conn = Connection(opts.broker,
+ reconnect=opts.reconnect,
+ reconnect_interval=opts.reconnect_interval,
+ reconnect_limit=opts.reconnect_limit)
+def dispatch(msg):
+ msg_type = msg.properties.get("type")
+ if msg_type == "shell":
+ proc = Popen(msg.content, shell=True, stderr=STDOUT, stdin=PIPE, stdout=PIPE)
+ output, _ = proc.communicate()
+ result = Message(output)
+ result.properties["exit"] = proc.returncode
+ elif msg_type == "eval":
+ try:
+ content = eval(msg.content)
+ except:
+ content = traceback.format_exc()
+ result = Message(content)
+ else:
+ result = Message("unrecognized message type: %s" % msg_type)
+ return result
+
+try:
+ conn.open()
+ ssn = conn.session()
+ rcv = ssn.receiver(addr)
+
+ while True:
+ msg = rcv.fetch()
+ response = dispatch(msg)
+ snd = None
+ try:
+ snd = ssn.sender(msg.reply_to)
+ snd.send(response)
+ except SendError, e:
+ print e
+ if snd is not None:
+ snd.close()
+ ssn.acknowledge()
+except ReceiveError, e:
+ print e
+except KeyboardInterrupt:
+ pass
+
+conn.close()
diff --git a/qpid/python/examples/api/spout b/qpid/python/examples/api/spout
new file mode 100755
index 0000000000..c2dc4db380
--- /dev/null
+++ b/qpid/python/examples/api/spout
@@ -0,0 +1,123 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import optparse, time
+from qpid.messaging import *
+from qpid.util import URL
+from qpid.log import enable, DEBUG, WARN
+
+def nameval(st):
+ idx = st.find("=")
+ if idx >= 0:
+ name = st[0:idx]
+ value = st[idx+1:]
+ else:
+ name = st
+ value = None
+ return name, value
+
+parser = optparse.OptionParser(usage="usage: %prog [options] ADDRESS [ CONTENT ... ]",
+ description="Send messages to the supplied address.")
+parser.add_option("-b", "--broker", default="localhost",
+ help="connect to specified BROKER (default %default)")
+parser.add_option("-r", "--reconnect", action="store_true",
+ help="enable auto reconnect")
+parser.add_option("-i", "--reconnect-interval", type="float", default=3,
+ help="interval between reconnect attempts")
+parser.add_option("-l", "--reconnect-limit", type="int",
+ help="maximum number of reconnect attempts")
+parser.add_option("-c", "--count", type="int", default=1,
+ help="stop after count messages have been sent, zero disables (default %default)")
+parser.add_option("-t", "--timeout", type="float", default=None,
+ help="exit after the specified time")
+parser.add_option("-I", "--id", help="use the supplied id instead of generating one")
+parser.add_option("-S", "--subject", help="specify a subject")
+parser.add_option("-R", "--reply-to", help="specify reply-to address")
+parser.add_option("-P", "--property", dest="properties", action="append", default=[],
+ metavar="NAME=VALUE", help="specify message property")
+parser.add_option("-M", "--map", dest="entries", action="append", default=[],
+ metavar="KEY=VALUE",
+ help="specify map entry for message body")
+parser.add_option("-v", dest="verbose", action="store_true",
+ help="enable logging")
+
+opts, args = parser.parse_args()
+
+if opts.verbose:
+ enable("qpid", DEBUG)
+else:
+ enable("qpid", WARN)
+
+if opts.id is None:
+ spout_id = str(uuid4())
+else:
+ spout_id = opts.id
+if args:
+ addr = args.pop(0)
+else:
+ parser.error("address is required")
+
+content = None
+
+if args:
+ text = " ".join(args)
+else:
+ text = None
+
+if opts.entries:
+ content = {}
+ if text:
+ content["text"] = text
+ for e in opts.entries:
+ name, val = nameval(e)
+ content[name] = val
+else:
+ content = text
+
+conn = Connection(opts.broker,
+ reconnect=opts.reconnect,
+ reconnect_interval=opts.reconnect_interval,
+ reconnect_limit=opts.reconnect_limit)
+try:
+ conn.open()
+ ssn = conn.session()
+ snd = ssn.sender(addr)
+
+ count = 0
+ start = time.time()
+ while (opts.count == 0 or count < opts.count) and \
+ (opts.timeout is None or time.time() - start < opts.timeout):
+ msg = Message(subject=opts.subject,
+ reply_to=opts.reply_to,
+ content=content)
+ msg.properties["spout-id"] = "%s:%s" % (spout_id, count)
+ for p in opts.properties:
+ name, val = nameval(p)
+ msg.properties[name] = val
+
+ snd.send(msg)
+ count += 1
+ print msg
+except SendError, e:
+ print e
+except KeyboardInterrupt:
+ pass
+
+conn.close()
diff --git a/qpid/python/examples/reservations/common.py b/qpid/python/examples/reservations/common.py
new file mode 100644
index 0000000000..12f07e1c92
--- /dev/null
+++ b/qpid/python/examples/reservations/common.py
@@ -0,0 +1,80 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import traceback
+from fnmatch import fnmatch
+from qpid.messaging import *
+
+class Dispatcher:
+
+ def unhandled(self, msg):
+ print "UNHANDLED MESSAGE: %s" % msg
+
+ def ignored(self, msg):
+ return False
+
+ def dispatch(self, msg):
+ try:
+ if self.ignored(msg):
+ return ()
+ else:
+ type = msg.properties.get("type")
+ replies = getattr(self, "do_%s" % type, self.unhandled)(msg)
+ if replies is None:
+ return ()
+ else:
+ return replies
+ except:
+ traceback.print_exc()
+ return ()
+
+ def run(self, session):
+ while self.running():
+ msg = session.next_receiver().fetch()
+ replies = self.dispatch(msg)
+
+ count = len(replies)
+ sequence = 1
+ for to, r in replies:
+ r.correlation_id = msg.correlation_id
+ r.properties["count"] = count
+ r.properties["sequence"] = sequence
+ sequence += 1
+ try:
+ snd = session.sender(to)
+ snd.send(r)
+ except SendError, e:
+ print e
+ finally:
+ snd.close()
+
+ session.acknowledge(msg)
+
+def get_status(msg):
+ return msg.content["identity"], msg.content["status"], msg.content["owner"]
+
+FREE = "free"
+BUSY = "busy"
+
+def match(value, patterns):
+ for p in patterns:
+ if fnmatch(value, p):
+ return True
+ return False
diff --git a/qpid/python/examples/reservations/inventory b/qpid/python/examples/reservations/inventory
new file mode 100755
index 0000000000..0a49643e5f
--- /dev/null
+++ b/qpid/python/examples/reservations/inventory
@@ -0,0 +1,94 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import optparse, traceback
+from qpid.messaging import *
+from qpid.log import enable, DEBUG, WARN
+from common import *
+
+parser = optparse.OptionParser(usage="usage: %prog [options]",
+ description="machine inventory agent")
+parser.add_option("-b", "--broker", default="localhost",
+ help="connect to specified BROKER (default %default)")
+parser.add_option("-d", "--database",
+ help="database file for persistent machine status")
+parser.add_option("-a", "--address", default="reservations",
+ help="address for reservation requests")
+parser.add_option("-v", dest="verbose", action="store_true",
+ help="enable verbose logging")
+
+opts, args = parser.parse_args()
+
+if opts.verbose:
+ enable("qpid", DEBUG)
+else:
+ enable("qpid", WARN)
+
+conn = Connection.establish(opts.broker, reconnect=True, reconnect_interval=1)
+
+class Inventory(Dispatcher):
+
+ def __init__(self):
+ self.agents = {}
+
+ def running(self):
+ return True
+
+ def do_status(self, msg):
+ id, status, owner = get_status(msg)
+ self.agents[id] = (status, owner)
+
+ def do_query(self, msg):
+ patterns = msg.content["identity"]
+ result = []
+ for id, (status, owner) in self.agents.items():
+ if match(id, patterns):
+ r = Message(properties = {
+ "type": "status"
+ },
+ content = {
+ "identity": id,
+ "status": status,
+ "owner": owner
+ })
+ result.append((msg.reply_to, r))
+ continue
+ if not result:
+ result.append((msg.reply_to,
+ Message(properties = {"type": "empty"})))
+ return result
+
+ def ignored(self, msg):
+ type = msg.properties.get("type")
+ return type not in ("status", "query")
+
+try:
+ ssn = conn.session()
+ rcv = ssn.receiver(opts.address, capacity = 10)
+ snd = ssn.sender(opts.address)
+ snd.send(Message(reply_to = opts.address,
+ properties = {"type": "discover", "identity": ["*"]}))
+
+ inv = Inventory()
+ inv.run(ssn)
+except KeyboardInterrupt:
+ pass
+finally:
+ conn.close()
diff --git a/qpid/python/examples/reservations/machine-agent b/qpid/python/examples/reservations/machine-agent
new file mode 100755
index 0000000000..a221a8b6de
--- /dev/null
+++ b/qpid/python/examples/reservations/machine-agent
@@ -0,0 +1,103 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import optparse, socket
+from qpid.messaging import *
+from qpid.log import enable, DEBUG, WARN
+from common import *
+
+host = socket.gethostname()
+
+parser = optparse.OptionParser(usage="usage: %prog [options]",
+ description="machine reservation agent")
+parser.add_option("-b", "--broker", default="localhost",
+ help="connect to specified BROKER (default %default)")
+parser.add_option("-d", "--database",
+ help="database file for persistent machine status")
+parser.add_option("-a", "--address", default="reservations",
+ help="address for reservation requests")
+parser.add_option("-i", "--identity", default=host,
+ help="resource id (default %default)")
+parser.add_option("-v", dest="verbose", action="store_true",
+ help="enable verbose logging")
+
+opts, args = parser.parse_args()
+
+if opts.verbose:
+ enable("qpid", DEBUG)
+else:
+ enable("qpid", WARN)
+
+conn = Connection.establish(opts.broker, reconnect=True, reconnect_interval=1)
+
+
+class Agent(Dispatcher):
+
+ def __init__(self, identity):
+ self.identity = identity
+ self.status = FREE
+ self.owner = None
+
+ def running(self):
+ return True
+
+ def get_status(self):
+ msg = Message(properties = {"type": "status"},
+ content = {"identity": self.identity,
+ "status": self.status,
+ "owner": self.owner})
+ return msg
+
+ def do_discover(self, msg):
+ r = self.get_status()
+ return [(msg.reply_to, r)]
+
+ def do_reserve(self, msg):
+ if self.status == FREE:
+ self.owner = msg.content["owner"]
+ self.status = BUSY
+ return self.do_discover(msg)
+
+ def do_release(self, msg):
+ if self.owner == msg.content["owner"]:
+ self.status = FREE
+ self.owner = None
+ return self.do_discover(msg)
+
+ def ignored(self, msg):
+ patterns = msg.properties.get("identity")
+ type = msg.properties.get("type")
+ if patterns and match(self.identity, patterns):
+ return type == "status"
+ else:
+ return True
+
+try:
+ ssn = conn.session()
+ rcv = ssn.receiver(opts.address)
+ rcv.capacity = 10
+ snd = ssn.sender(opts.address)
+ agent = Agent(opts.identity)
+ snd.send(agent.get_status())
+ agent.run(ssn)
+except KeyboardInterrupt:
+ pass
+finally:
+ conn.close()
diff --git a/qpid/python/examples/reservations/reserve b/qpid/python/examples/reservations/reserve
new file mode 100755
index 0000000000..68e7fee912
--- /dev/null
+++ b/qpid/python/examples/reservations/reserve
@@ -0,0 +1,197 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import optparse, os, sys, time
+from uuid import uuid4
+from qpid.messaging import *
+from qpid.log import enable, DEBUG, WARN
+from common import *
+
+parser = optparse.OptionParser(usage="usage: %prog [options] PATTERN ...",
+ description="reserve a machine")
+parser.add_option("-b", "--broker", default="localhost",
+ help="connect to specified BROKER (default %default)")
+parser.add_option("-a", "--address", default="reservations",
+ help="address for reservation requests")
+parser.add_option("-r", "--release", action="store_true",
+ help="release any machines matching the pattern")
+parser.add_option("-s", "--status", action="store_true",
+ help="list machine status")
+parser.add_option("-d", "--discover", action="store_true",
+ help="use discovery instead of inventory")
+parser.add_option("-o", "--owner", default=os.environ["USER"],
+ help="the holder of the reservation")
+parser.add_option("-n", "--number", type=int, default=1,
+ help="the number of machines to reserve")
+parser.add_option("-t", "--timeout", type=float, default=10,
+ help="timeout in seconds to wait for resources")
+parser.add_option("-v", dest="verbose", action="store_true",
+ help="enable verbose logging")
+
+opts, args = parser.parse_args()
+
+if opts.verbose:
+ enable("qpid", DEBUG)
+else:
+ enable("qpid", WARN)
+
+if args:
+ patterns = args
+else:
+ patterns = ["*"]
+
+conn = Connection.establish(opts.broker)
+
+if opts.release:
+ request_type = "release"
+ candidate_status = BUSY
+ candidate_owner = opts.owner
+else:
+ request_type = "reserve"
+ candidate_status = FREE
+ candidate_owner = None
+
+class Requester(Dispatcher):
+
+ def __init__(self):
+ self.agents = {}
+ self.requests = set()
+ self.outstanding = set()
+
+ def agent_status(self, id):
+ status, owner = self.agents[id]
+ if owner:
+ return "%s %s(%s)" % (id, status, owner)
+ else:
+ return "%s %s" % (id, status)
+
+ def correlation(self, cid):
+ self.requests.add(cid)
+ self.outstanding.add(cid)
+
+ def ignored(self, msg):
+ return msg.properties.get("type") not in ("status", "empty") or \
+ msg.correlation_id not in self.requests
+
+ def do_status(self, msg):
+ id, status, owner = get_status(msg)
+ self.agents[id] = (status, owner)
+
+ if opts.status:
+ print self.agent_status(id)
+
+ def do_empty(self, msg):
+ print "no matching resources"
+
+ def candidates(self, candidate_status, candidate_owner):
+ for id, (status, owner) in self.agents.items():
+ if status == candidate_status and owner == candidate_owner:
+ yield id
+
+ def dispatch(self, msg):
+ result = Dispatcher.dispatch(self, msg)
+ count = msg.properties.get("count")
+ sequence = msg.properties.get("sequence")
+ if count and sequence == count:
+ self.outstanding.discard(msg.correlation_id)
+ return result
+
+try:
+ ssn = conn.session()
+ rcv = ssn.receiver(opts.address, capacity=10)
+ snd = ssn.sender(opts.address)
+
+ correlation_id = str(uuid4())
+
+ if opts.discover:
+ properties = {"type": "discover", "identity": patterns}
+ content = None
+ else:
+ properties = {"type": "query"}
+ content = {"identity": patterns}
+
+ snd.send(Message(reply_to = opts.address,
+ correlation_id = correlation_id,
+ properties = properties,
+ content = content))
+
+ req = Requester()
+ req.correlation(correlation_id)
+
+ start = time.time()
+ ellapsed = 0
+ requested = set()
+ discovering = opts.discover
+
+ while ellapsed <= opts.timeout and (discovering or req.outstanding):
+ try:
+ msg = rcv.fetch(opts.timeout - ellapsed)
+ ssn.acknowledge(msg)
+ except Empty:
+ continue
+ finally:
+ ellapsed = time.time() - start
+
+ req.dispatch(msg)
+ if not opts.status:
+ if len(requested) < opts.number:
+ for cid in req.candidates(candidate_status, candidate_owner):
+ if cid in requested: continue
+ req_msg = Message(reply_to = opts.address,
+ correlation_id = str(uuid4()),
+ properties = {"type": request_type,
+ "identity": [cid]},
+ content = {"owner": opts.owner})
+ if not requested:
+ print "requesting %s:" % request_type,
+ print cid,
+ sys.stdout.flush()
+ req.correlation(req_msg.correlation_id)
+ snd.send(req_msg)
+ requested.add(cid)
+ else:
+ discovering = False
+
+ if requested:
+ print
+ owners = {}
+ for id in requested:
+ st, ow = req.agents[id]
+ if not owners.has_key(ow):
+ owners[ow] = []
+ owners[ow].append(id)
+ keys = list(owners.keys())
+ keys.sort()
+ for k in keys:
+ owners[k].sort()
+ v = ", ".join(owners[k])
+ if k is None:
+ print "free: %s" % v
+ else:
+ print "owner %s: %s" % (k, v)
+ elif req.agents and not opts.status:
+ print "no available resources"
+
+ if req.outstanding:
+ print "request timed out"
+except KeyboardInterrupt:
+ pass
+finally:
+ conn.close()