diff options
author | Rajith Muditha Attapattu <rajith@apache.org> | 2011-05-27 15:44:23 +0000 |
---|---|---|
committer | Rajith Muditha Attapattu <rajith@apache.org> | 2011-05-27 15:44:23 +0000 |
commit | 66765100f4257159622cefe57bed50125a5ad017 (patch) | |
tree | a88ee23bb194eb91f0ebb2d9b23ff423e3ea8e37 /qpid/python/examples | |
parent | 1aeaa7b16e5ce54f10c901d75c4d40f9f88b9db6 (diff) | |
parent | 88b98b2f4152ef59a671fad55a0d08338b6b78ca (diff) | |
download | qpid-python-rajith_jms_client.tar.gz |
Creating a branch for experimenting with some ideas for JMS client.rajith_jms_client
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/rajith_jms_client@1128369 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/python/examples')
-rw-r--r-- | qpid/python/examples/README.txt | 42 | ||||
-rwxr-xr-x | qpid/python/examples/api/drain | 97 | ||||
-rwxr-xr-x | qpid/python/examples/api/hello | 52 | ||||
-rwxr-xr-x | qpid/python/examples/api/hello_xml | 77 | ||||
-rwxr-xr-x | qpid/python/examples/api/server | 95 | ||||
-rwxr-xr-x | qpid/python/examples/api/spout | 123 | ||||
-rw-r--r-- | qpid/python/examples/reservations/common.py | 80 | ||||
-rwxr-xr-x | qpid/python/examples/reservations/inventory | 94 | ||||
-rwxr-xr-x | qpid/python/examples/reservations/machine-agent | 103 | ||||
-rwxr-xr-x | qpid/python/examples/reservations/reserve | 197 |
10 files changed, 960 insertions, 0 deletions
diff --git a/qpid/python/examples/README.txt b/qpid/python/examples/README.txt new file mode 100644 index 0000000000..4395160fec --- /dev/null +++ b/qpid/python/examples/README.txt @@ -0,0 +1,42 @@ +The Python Examples +=================== + +README.txt -- This file. + +api -- Directory containing drain, spout, + sever, hello, and hello_xml examples. + +api/drain -- A simple messaging client that prints + messages from the source specified on + the command line. + +api/spout -- A simple messaging client that sends + messages to the target specified on the + command line. + +api/server -- An example server that process incoming + messages and sends replies. + +api/hello -- An example client that sends a message + and then receives it. + +api/hello_xml -- An example client that sends a message + to the xml exchange and then receives + it. + + +reservations -- Directory containing an example machine + reservation system. + +reservations/common.py -- Utility code used by reserve, + machine-agent, and inventory scripts. + +reservations/reserve -- Messaging client for listing, reserving, + and releasing machines. + +reservations/machine-agent -- Messaging server that tracks and reports + on the status of its host machine and + listens for reservation requests. + +reservations/inventory -- Messaging server that tracks the last + known status of machines. diff --git a/qpid/python/examples/api/drain b/qpid/python/examples/api/drain new file mode 100755 index 0000000000..5e30153bc2 --- /dev/null +++ b/qpid/python/examples/api/drain @@ -0,0 +1,97 @@ +#!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import optparse +from qpid.messaging import * +from qpid.util import URL +from qpid.log import enable, DEBUG, WARN + +parser = optparse.OptionParser(usage="usage: %prog [options] ADDRESS ...", + description="Drain messages from the supplied address.") +parser.add_option("-b", "--broker", default="localhost", + help="connect to specified BROKER (default %default)") +parser.add_option("-c", "--count", type="int", + help="number of messages to drain") +parser.add_option("-f", "--forever", action="store_true", + help="ignore timeout and wait forever") +parser.add_option("-r", "--reconnect", action="store_true", + help="enable auto reconnect") +parser.add_option("-i", "--reconnect-interval", type="float", default=3, + help="interval between reconnect attempts") +parser.add_option("-l", "--reconnect-limit", type="int", + help="maximum number of reconnect attempts") +parser.add_option("-t", "--timeout", type="float", default=0, + help="timeout in seconds to wait before exiting (default %default)") +parser.add_option("-p", "--print", dest="format", default="%(M)s", + help="format string for printing messages (default %default)") +parser.add_option("-v", dest="verbose", action="store_true", + help="enable logging") + +opts, args = parser.parse_args() + +if opts.verbose: + enable("qpid", DEBUG) +else: + enable("qpid", WARN) + +if args: + addr = args.pop(0) +else: + parser.error("address is required") +if opts.forever: + timeout = None +else: + timeout = opts.timeout + +class Formatter: + + def __init__(self, message): + self.message = message + self.environ = {"M": self.message, + "P": self.message.properties, + "C": self.message.content} + + def __getitem__(self, st): + return eval(st, self.environ) + +conn = Connection(opts.broker, + reconnect=opts.reconnect, + reconnect_interval=opts.reconnect_interval, + reconnect_limit=opts.reconnect_limit) +try: + conn.open() + ssn = conn.session() + rcv = ssn.receiver(addr) + + count = 0 + while not opts.count or count < opts.count: + try: + msg = rcv.fetch(timeout=timeout) + print opts.format % Formatter(msg) + count += 1 + ssn.acknowledge() + except Empty: + break +except ReceiverError, e: + print e +except KeyboardInterrupt: + pass + +conn.close() diff --git a/qpid/python/examples/api/hello b/qpid/python/examples/api/hello new file mode 100755 index 0000000000..ad314da19e --- /dev/null +++ b/qpid/python/examples/api/hello @@ -0,0 +1,52 @@ +#!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import sys +from qpid.messaging import * + +if len(sys.argv)<2: + broker = "localhost:5672" +else: + broker = sys.argv[1] + +if len(sys.argv)<3: + address = "amq.topic" +else: + address = sys.argv[2] + +connection = Connection(broker) + +try: + connection.open() + session = connection.session() + + sender = session.sender(address) + receiver = session.receiver(address) + + sender.send(Message("Hello world!")); + + message = receiver.fetch() + print message.content + session.acknowledge() + +except MessagingError,m: + print m + +connection.close() diff --git a/qpid/python/examples/api/hello_xml b/qpid/python/examples/api/hello_xml new file mode 100755 index 0000000000..ab567ec5dd --- /dev/null +++ b/qpid/python/examples/api/hello_xml @@ -0,0 +1,77 @@ +#!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import sys +from qpid.messaging import * + +broker = "localhost:5672" +connection = Connection(broker) + +try: + connection.open() + session = connection.session() + +# Set up the receiver + query = """ + let $w := ./weather + return $w/station = 'Raleigh-Durham International Airport (KRDU)' + and $w/temperature_f > 50 + and $w/temperature_f - $w/dewpoint > 5 + and $w/wind_speed_mph > 7 + and $w/wind_speed_mph < 20 """ + +# query="./weather" + + address = """ + xml; { + create: always, + node:{ type: queue }, + link: { + x-bindings: [{ exchange: xml, key: weather, arguments: { xquery: %r} }] + } + } + """ % query + + receiver = session.receiver(address) + +# Send an observation + + observations = """ + <weather> + <station>Raleigh-Durham International Airport (KRDU)</station> + <wind_speed_mph>16</wind_speed_mph> + <temperature_f>70</temperature_f> + <dewpoint>35</dewpoint> + </weather> """ + + message = Message(subject="weather", content=observations) + sender = session.sender("xml") + sender.send(message) + +# Retrieve matching message from the receiver and print it + + message = receiver.fetch(timeout=1) + print message.content + session.acknowledge() + +except MessagingError,m: + print m + +connection.close() diff --git a/qpid/python/examples/api/server b/qpid/python/examples/api/server new file mode 100755 index 0000000000..3b9a3560da --- /dev/null +++ b/qpid/python/examples/api/server @@ -0,0 +1,95 @@ +#!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import optparse, sys, traceback +from qpid.messaging import * +from qpid.util import URL +from subprocess import Popen, STDOUT, PIPE +from qpid.log import enable, DEBUG, WARN + +parser = optparse.OptionParser(usage="usage: %prog [options] ADDRESS ...", + description="handle requests from the supplied address.") +parser.add_option("-b", "--broker", default="localhost", + help="connect to specified BROKER (default %default)") +parser.add_option("-r", "--reconnect", action="store_true", + help="enable auto reconnect") +parser.add_option("-i", "--reconnect-interval", type="float", default=3, + help="interval between reconnect attempts") +parser.add_option("-l", "--reconnect-limit", type="int", + help="maximum number of reconnect attempts") +parser.add_option("-v", dest="verbose", action="store_true", + help="enable logging") + +opts, args = parser.parse_args() + +if opts.verbose: + enable("qpid", DEBUG) +else: + enable("qpid", WARN) + +if args: + addr = args.pop(0) +else: + parser.error("address is required") + +conn = Connection(opts.broker, + reconnect=opts.reconnect, + reconnect_interval=opts.reconnect_interval, + reconnect_limit=opts.reconnect_limit) +def dispatch(msg): + msg_type = msg.properties.get("type") + if msg_type == "shell": + proc = Popen(msg.content, shell=True, stderr=STDOUT, stdin=PIPE, stdout=PIPE) + output, _ = proc.communicate() + result = Message(output) + result.properties["exit"] = proc.returncode + elif msg_type == "eval": + try: + content = eval(msg.content) + except: + content = traceback.format_exc() + result = Message(content) + else: + result = Message("unrecognized message type: %s" % msg_type) + return result + +try: + conn.open() + ssn = conn.session() + rcv = ssn.receiver(addr) + + while True: + msg = rcv.fetch() + response = dispatch(msg) + snd = None + try: + snd = ssn.sender(msg.reply_to) + snd.send(response) + except SendError, e: + print e + if snd is not None: + snd.close() + ssn.acknowledge() +except ReceiveError, e: + print e +except KeyboardInterrupt: + pass + +conn.close() diff --git a/qpid/python/examples/api/spout b/qpid/python/examples/api/spout new file mode 100755 index 0000000000..c2dc4db380 --- /dev/null +++ b/qpid/python/examples/api/spout @@ -0,0 +1,123 @@ +#!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import optparse, time +from qpid.messaging import * +from qpid.util import URL +from qpid.log import enable, DEBUG, WARN + +def nameval(st): + idx = st.find("=") + if idx >= 0: + name = st[0:idx] + value = st[idx+1:] + else: + name = st + value = None + return name, value + +parser = optparse.OptionParser(usage="usage: %prog [options] ADDRESS [ CONTENT ... ]", + description="Send messages to the supplied address.") +parser.add_option("-b", "--broker", default="localhost", + help="connect to specified BROKER (default %default)") +parser.add_option("-r", "--reconnect", action="store_true", + help="enable auto reconnect") +parser.add_option("-i", "--reconnect-interval", type="float", default=3, + help="interval between reconnect attempts") +parser.add_option("-l", "--reconnect-limit", type="int", + help="maximum number of reconnect attempts") +parser.add_option("-c", "--count", type="int", default=1, + help="stop after count messages have been sent, zero disables (default %default)") +parser.add_option("-t", "--timeout", type="float", default=None, + help="exit after the specified time") +parser.add_option("-I", "--id", help="use the supplied id instead of generating one") +parser.add_option("-S", "--subject", help="specify a subject") +parser.add_option("-R", "--reply-to", help="specify reply-to address") +parser.add_option("-P", "--property", dest="properties", action="append", default=[], + metavar="NAME=VALUE", help="specify message property") +parser.add_option("-M", "--map", dest="entries", action="append", default=[], + metavar="KEY=VALUE", + help="specify map entry for message body") +parser.add_option("-v", dest="verbose", action="store_true", + help="enable logging") + +opts, args = parser.parse_args() + +if opts.verbose: + enable("qpid", DEBUG) +else: + enable("qpid", WARN) + +if opts.id is None: + spout_id = str(uuid4()) +else: + spout_id = opts.id +if args: + addr = args.pop(0) +else: + parser.error("address is required") + +content = None + +if args: + text = " ".join(args) +else: + text = None + +if opts.entries: + content = {} + if text: + content["text"] = text + for e in opts.entries: + name, val = nameval(e) + content[name] = val +else: + content = text + +conn = Connection(opts.broker, + reconnect=opts.reconnect, + reconnect_interval=opts.reconnect_interval, + reconnect_limit=opts.reconnect_limit) +try: + conn.open() + ssn = conn.session() + snd = ssn.sender(addr) + + count = 0 + start = time.time() + while (opts.count == 0 or count < opts.count) and \ + (opts.timeout is None or time.time() - start < opts.timeout): + msg = Message(subject=opts.subject, + reply_to=opts.reply_to, + content=content) + msg.properties["spout-id"] = "%s:%s" % (spout_id, count) + for p in opts.properties: + name, val = nameval(p) + msg.properties[name] = val + + snd.send(msg) + count += 1 + print msg +except SendError, e: + print e +except KeyboardInterrupt: + pass + +conn.close() diff --git a/qpid/python/examples/reservations/common.py b/qpid/python/examples/reservations/common.py new file mode 100644 index 0000000000..12f07e1c92 --- /dev/null +++ b/qpid/python/examples/reservations/common.py @@ -0,0 +1,80 @@ +#!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import traceback +from fnmatch import fnmatch +from qpid.messaging import * + +class Dispatcher: + + def unhandled(self, msg): + print "UNHANDLED MESSAGE: %s" % msg + + def ignored(self, msg): + return False + + def dispatch(self, msg): + try: + if self.ignored(msg): + return () + else: + type = msg.properties.get("type") + replies = getattr(self, "do_%s" % type, self.unhandled)(msg) + if replies is None: + return () + else: + return replies + except: + traceback.print_exc() + return () + + def run(self, session): + while self.running(): + msg = session.next_receiver().fetch() + replies = self.dispatch(msg) + + count = len(replies) + sequence = 1 + for to, r in replies: + r.correlation_id = msg.correlation_id + r.properties["count"] = count + r.properties["sequence"] = sequence + sequence += 1 + try: + snd = session.sender(to) + snd.send(r) + except SendError, e: + print e + finally: + snd.close() + + session.acknowledge(msg) + +def get_status(msg): + return msg.content["identity"], msg.content["status"], msg.content["owner"] + +FREE = "free" +BUSY = "busy" + +def match(value, patterns): + for p in patterns: + if fnmatch(value, p): + return True + return False diff --git a/qpid/python/examples/reservations/inventory b/qpid/python/examples/reservations/inventory new file mode 100755 index 0000000000..0a49643e5f --- /dev/null +++ b/qpid/python/examples/reservations/inventory @@ -0,0 +1,94 @@ +#!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import optparse, traceback +from qpid.messaging import * +from qpid.log import enable, DEBUG, WARN +from common import * + +parser = optparse.OptionParser(usage="usage: %prog [options]", + description="machine inventory agent") +parser.add_option("-b", "--broker", default="localhost", + help="connect to specified BROKER (default %default)") +parser.add_option("-d", "--database", + help="database file for persistent machine status") +parser.add_option("-a", "--address", default="reservations", + help="address for reservation requests") +parser.add_option("-v", dest="verbose", action="store_true", + help="enable verbose logging") + +opts, args = parser.parse_args() + +if opts.verbose: + enable("qpid", DEBUG) +else: + enable("qpid", WARN) + +conn = Connection.establish(opts.broker, reconnect=True, reconnect_interval=1) + +class Inventory(Dispatcher): + + def __init__(self): + self.agents = {} + + def running(self): + return True + + def do_status(self, msg): + id, status, owner = get_status(msg) + self.agents[id] = (status, owner) + + def do_query(self, msg): + patterns = msg.content["identity"] + result = [] + for id, (status, owner) in self.agents.items(): + if match(id, patterns): + r = Message(properties = { + "type": "status" + }, + content = { + "identity": id, + "status": status, + "owner": owner + }) + result.append((msg.reply_to, r)) + continue + if not result: + result.append((msg.reply_to, + Message(properties = {"type": "empty"}))) + return result + + def ignored(self, msg): + type = msg.properties.get("type") + return type not in ("status", "query") + +try: + ssn = conn.session() + rcv = ssn.receiver(opts.address, capacity = 10) + snd = ssn.sender(opts.address) + snd.send(Message(reply_to = opts.address, + properties = {"type": "discover", "identity": ["*"]})) + + inv = Inventory() + inv.run(ssn) +except KeyboardInterrupt: + pass +finally: + conn.close() diff --git a/qpid/python/examples/reservations/machine-agent b/qpid/python/examples/reservations/machine-agent new file mode 100755 index 0000000000..a221a8b6de --- /dev/null +++ b/qpid/python/examples/reservations/machine-agent @@ -0,0 +1,103 @@ +#!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import optparse, socket +from qpid.messaging import * +from qpid.log import enable, DEBUG, WARN +from common import * + +host = socket.gethostname() + +parser = optparse.OptionParser(usage="usage: %prog [options]", + description="machine reservation agent") +parser.add_option("-b", "--broker", default="localhost", + help="connect to specified BROKER (default %default)") +parser.add_option("-d", "--database", + help="database file for persistent machine status") +parser.add_option("-a", "--address", default="reservations", + help="address for reservation requests") +parser.add_option("-i", "--identity", default=host, + help="resource id (default %default)") +parser.add_option("-v", dest="verbose", action="store_true", + help="enable verbose logging") + +opts, args = parser.parse_args() + +if opts.verbose: + enable("qpid", DEBUG) +else: + enable("qpid", WARN) + +conn = Connection.establish(opts.broker, reconnect=True, reconnect_interval=1) + + +class Agent(Dispatcher): + + def __init__(self, identity): + self.identity = identity + self.status = FREE + self.owner = None + + def running(self): + return True + + def get_status(self): + msg = Message(properties = {"type": "status"}, + content = {"identity": self.identity, + "status": self.status, + "owner": self.owner}) + return msg + + def do_discover(self, msg): + r = self.get_status() + return [(msg.reply_to, r)] + + def do_reserve(self, msg): + if self.status == FREE: + self.owner = msg.content["owner"] + self.status = BUSY + return self.do_discover(msg) + + def do_release(self, msg): + if self.owner == msg.content["owner"]: + self.status = FREE + self.owner = None + return self.do_discover(msg) + + def ignored(self, msg): + patterns = msg.properties.get("identity") + type = msg.properties.get("type") + if patterns and match(self.identity, patterns): + return type == "status" + else: + return True + +try: + ssn = conn.session() + rcv = ssn.receiver(opts.address) + rcv.capacity = 10 + snd = ssn.sender(opts.address) + agent = Agent(opts.identity) + snd.send(agent.get_status()) + agent.run(ssn) +except KeyboardInterrupt: + pass +finally: + conn.close() diff --git a/qpid/python/examples/reservations/reserve b/qpid/python/examples/reservations/reserve new file mode 100755 index 0000000000..68e7fee912 --- /dev/null +++ b/qpid/python/examples/reservations/reserve @@ -0,0 +1,197 @@ +#!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import optparse, os, sys, time +from uuid import uuid4 +from qpid.messaging import * +from qpid.log import enable, DEBUG, WARN +from common import * + +parser = optparse.OptionParser(usage="usage: %prog [options] PATTERN ...", + description="reserve a machine") +parser.add_option("-b", "--broker", default="localhost", + help="connect to specified BROKER (default %default)") +parser.add_option("-a", "--address", default="reservations", + help="address for reservation requests") +parser.add_option("-r", "--release", action="store_true", + help="release any machines matching the pattern") +parser.add_option("-s", "--status", action="store_true", + help="list machine status") +parser.add_option("-d", "--discover", action="store_true", + help="use discovery instead of inventory") +parser.add_option("-o", "--owner", default=os.environ["USER"], + help="the holder of the reservation") +parser.add_option("-n", "--number", type=int, default=1, + help="the number of machines to reserve") +parser.add_option("-t", "--timeout", type=float, default=10, + help="timeout in seconds to wait for resources") +parser.add_option("-v", dest="verbose", action="store_true", + help="enable verbose logging") + +opts, args = parser.parse_args() + +if opts.verbose: + enable("qpid", DEBUG) +else: + enable("qpid", WARN) + +if args: + patterns = args +else: + patterns = ["*"] + +conn = Connection.establish(opts.broker) + +if opts.release: + request_type = "release" + candidate_status = BUSY + candidate_owner = opts.owner +else: + request_type = "reserve" + candidate_status = FREE + candidate_owner = None + +class Requester(Dispatcher): + + def __init__(self): + self.agents = {} + self.requests = set() + self.outstanding = set() + + def agent_status(self, id): + status, owner = self.agents[id] + if owner: + return "%s %s(%s)" % (id, status, owner) + else: + return "%s %s" % (id, status) + + def correlation(self, cid): + self.requests.add(cid) + self.outstanding.add(cid) + + def ignored(self, msg): + return msg.properties.get("type") not in ("status", "empty") or \ + msg.correlation_id not in self.requests + + def do_status(self, msg): + id, status, owner = get_status(msg) + self.agents[id] = (status, owner) + + if opts.status: + print self.agent_status(id) + + def do_empty(self, msg): + print "no matching resources" + + def candidates(self, candidate_status, candidate_owner): + for id, (status, owner) in self.agents.items(): + if status == candidate_status and owner == candidate_owner: + yield id + + def dispatch(self, msg): + result = Dispatcher.dispatch(self, msg) + count = msg.properties.get("count") + sequence = msg.properties.get("sequence") + if count and sequence == count: + self.outstanding.discard(msg.correlation_id) + return result + +try: + ssn = conn.session() + rcv = ssn.receiver(opts.address, capacity=10) + snd = ssn.sender(opts.address) + + correlation_id = str(uuid4()) + + if opts.discover: + properties = {"type": "discover", "identity": patterns} + content = None + else: + properties = {"type": "query"} + content = {"identity": patterns} + + snd.send(Message(reply_to = opts.address, + correlation_id = correlation_id, + properties = properties, + content = content)) + + req = Requester() + req.correlation(correlation_id) + + start = time.time() + ellapsed = 0 + requested = set() + discovering = opts.discover + + while ellapsed <= opts.timeout and (discovering or req.outstanding): + try: + msg = rcv.fetch(opts.timeout - ellapsed) + ssn.acknowledge(msg) + except Empty: + continue + finally: + ellapsed = time.time() - start + + req.dispatch(msg) + if not opts.status: + if len(requested) < opts.number: + for cid in req.candidates(candidate_status, candidate_owner): + if cid in requested: continue + req_msg = Message(reply_to = opts.address, + correlation_id = str(uuid4()), + properties = {"type": request_type, + "identity": [cid]}, + content = {"owner": opts.owner}) + if not requested: + print "requesting %s:" % request_type, + print cid, + sys.stdout.flush() + req.correlation(req_msg.correlation_id) + snd.send(req_msg) + requested.add(cid) + else: + discovering = False + + if requested: + print + owners = {} + for id in requested: + st, ow = req.agents[id] + if not owners.has_key(ow): + owners[ow] = [] + owners[ow].append(id) + keys = list(owners.keys()) + keys.sort() + for k in keys: + owners[k].sort() + v = ", ".join(owners[k]) + if k is None: + print "free: %s" % v + else: + print "owner %s: %s" % (k, v) + elif req.agents and not opts.status: + print "no available resources" + + if req.outstanding: + print "request timed out" +except KeyboardInterrupt: + pass +finally: + conn.close() |