diff options
author | Rajith Muditha Attapattu <rajith@apache.org> | 2011-05-27 15:44:23 +0000 |
---|---|---|
committer | Rajith Muditha Attapattu <rajith@apache.org> | 2011-05-27 15:44:23 +0000 |
commit | 66765100f4257159622cefe57bed50125a5ad017 (patch) | |
tree | a88ee23bb194eb91f0ebb2d9b23ff423e3ea8e37 /python/examples | |
parent | 1aeaa7b16e5ce54f10c901d75c4d40f9f88b9db6 (diff) | |
parent | 88b98b2f4152ef59a671fad55a0d08338b6b78ca (diff) | |
download | qpid-python-rajith_jms_client.tar.gz |
Creating a branch for experimenting with some ideas for JMS client.rajith_jms_client
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/rajith_jms_client@1128369 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/examples')
-rw-r--r-- | python/examples/README.txt | 42 | ||||
-rwxr-xr-x | python/examples/api/drain | 97 | ||||
-rwxr-xr-x | python/examples/api/hello | 52 | ||||
-rwxr-xr-x | python/examples/api/hello_xml | 77 | ||||
-rwxr-xr-x | python/examples/api/server | 95 | ||||
-rwxr-xr-x | python/examples/api/spout | 123 | ||||
-rw-r--r-- | python/examples/reservations/common.py | 80 | ||||
-rwxr-xr-x | python/examples/reservations/inventory | 94 | ||||
-rwxr-xr-x | python/examples/reservations/machine-agent | 103 | ||||
-rwxr-xr-x | python/examples/reservations/reserve | 197 |
10 files changed, 0 insertions, 960 deletions
diff --git a/python/examples/README.txt b/python/examples/README.txt deleted file mode 100644 index 4395160fec..0000000000 --- a/python/examples/README.txt +++ /dev/null @@ -1,42 +0,0 @@ -The Python Examples -=================== - -README.txt -- This file. - -api -- Directory containing drain, spout, - sever, hello, and hello_xml examples. - -api/drain -- A simple messaging client that prints - messages from the source specified on - the command line. - -api/spout -- A simple messaging client that sends - messages to the target specified on the - command line. - -api/server -- An example server that process incoming - messages and sends replies. - -api/hello -- An example client that sends a message - and then receives it. - -api/hello_xml -- An example client that sends a message - to the xml exchange and then receives - it. - - -reservations -- Directory containing an example machine - reservation system. - -reservations/common.py -- Utility code used by reserve, - machine-agent, and inventory scripts. - -reservations/reserve -- Messaging client for listing, reserving, - and releasing machines. - -reservations/machine-agent -- Messaging server that tracks and reports - on the status of its host machine and - listens for reservation requests. - -reservations/inventory -- Messaging server that tracks the last - known status of machines. diff --git a/python/examples/api/drain b/python/examples/api/drain deleted file mode 100755 index 5e30153bc2..0000000000 --- a/python/examples/api/drain +++ /dev/null @@ -1,97 +0,0 @@ -#!/usr/bin/env python -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -import optparse -from qpid.messaging import * -from qpid.util import URL -from qpid.log import enable, DEBUG, WARN - -parser = optparse.OptionParser(usage="usage: %prog [options] ADDRESS ...", - description="Drain messages from the supplied address.") -parser.add_option("-b", "--broker", default="localhost", - help="connect to specified BROKER (default %default)") -parser.add_option("-c", "--count", type="int", - help="number of messages to drain") -parser.add_option("-f", "--forever", action="store_true", - help="ignore timeout and wait forever") -parser.add_option("-r", "--reconnect", action="store_true", - help="enable auto reconnect") -parser.add_option("-i", "--reconnect-interval", type="float", default=3, - help="interval between reconnect attempts") -parser.add_option("-l", "--reconnect-limit", type="int", - help="maximum number of reconnect attempts") -parser.add_option("-t", "--timeout", type="float", default=0, - help="timeout in seconds to wait before exiting (default %default)") -parser.add_option("-p", "--print", dest="format", default="%(M)s", - help="format string for printing messages (default %default)") -parser.add_option("-v", dest="verbose", action="store_true", - help="enable logging") - -opts, args = parser.parse_args() - -if opts.verbose: - enable("qpid", DEBUG) -else: - enable("qpid", WARN) - -if args: - addr = args.pop(0) -else: - parser.error("address is required") -if opts.forever: - timeout = None -else: - timeout = opts.timeout - -class Formatter: - - def __init__(self, message): - self.message = message - self.environ = {"M": self.message, - "P": self.message.properties, - "C": self.message.content} - - def __getitem__(self, st): - return eval(st, self.environ) - -conn = Connection(opts.broker, - reconnect=opts.reconnect, - reconnect_interval=opts.reconnect_interval, - reconnect_limit=opts.reconnect_limit) -try: - conn.open() - ssn = conn.session() - rcv = ssn.receiver(addr) - - count = 0 - while not opts.count or count < opts.count: - try: - msg = rcv.fetch(timeout=timeout) - print opts.format % Formatter(msg) - count += 1 - ssn.acknowledge() - except Empty: - break -except ReceiverError, e: - print e -except KeyboardInterrupt: - pass - -conn.close() diff --git a/python/examples/api/hello b/python/examples/api/hello deleted file mode 100755 index ad314da19e..0000000000 --- a/python/examples/api/hello +++ /dev/null @@ -1,52 +0,0 @@ -#!/usr/bin/env python -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -import sys -from qpid.messaging import * - -if len(sys.argv)<2: - broker = "localhost:5672" -else: - broker = sys.argv[1] - -if len(sys.argv)<3: - address = "amq.topic" -else: - address = sys.argv[2] - -connection = Connection(broker) - -try: - connection.open() - session = connection.session() - - sender = session.sender(address) - receiver = session.receiver(address) - - sender.send(Message("Hello world!")); - - message = receiver.fetch() - print message.content - session.acknowledge() - -except MessagingError,m: - print m - -connection.close() diff --git a/python/examples/api/hello_xml b/python/examples/api/hello_xml deleted file mode 100755 index ab567ec5dd..0000000000 --- a/python/examples/api/hello_xml +++ /dev/null @@ -1,77 +0,0 @@ -#!/usr/bin/env python -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -import sys -from qpid.messaging import * - -broker = "localhost:5672" -connection = Connection(broker) - -try: - connection.open() - session = connection.session() - -# Set up the receiver - query = """ - let $w := ./weather - return $w/station = 'Raleigh-Durham International Airport (KRDU)' - and $w/temperature_f > 50 - and $w/temperature_f - $w/dewpoint > 5 - and $w/wind_speed_mph > 7 - and $w/wind_speed_mph < 20 """ - -# query="./weather" - - address = """ - xml; { - create: always, - node:{ type: queue }, - link: { - x-bindings: [{ exchange: xml, key: weather, arguments: { xquery: %r} }] - } - } - """ % query - - receiver = session.receiver(address) - -# Send an observation - - observations = """ - <weather> - <station>Raleigh-Durham International Airport (KRDU)</station> - <wind_speed_mph>16</wind_speed_mph> - <temperature_f>70</temperature_f> - <dewpoint>35</dewpoint> - </weather> """ - - message = Message(subject="weather", content=observations) - sender = session.sender("xml") - sender.send(message) - -# Retrieve matching message from the receiver and print it - - message = receiver.fetch(timeout=1) - print message.content - session.acknowledge() - -except MessagingError,m: - print m - -connection.close() diff --git a/python/examples/api/server b/python/examples/api/server deleted file mode 100755 index 3b9a3560da..0000000000 --- a/python/examples/api/server +++ /dev/null @@ -1,95 +0,0 @@ -#!/usr/bin/env python -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -import optparse, sys, traceback -from qpid.messaging import * -from qpid.util import URL -from subprocess import Popen, STDOUT, PIPE -from qpid.log import enable, DEBUG, WARN - -parser = optparse.OptionParser(usage="usage: %prog [options] ADDRESS ...", - description="handle requests from the supplied address.") -parser.add_option("-b", "--broker", default="localhost", - help="connect to specified BROKER (default %default)") -parser.add_option("-r", "--reconnect", action="store_true", - help="enable auto reconnect") -parser.add_option("-i", "--reconnect-interval", type="float", default=3, - help="interval between reconnect attempts") -parser.add_option("-l", "--reconnect-limit", type="int", - help="maximum number of reconnect attempts") -parser.add_option("-v", dest="verbose", action="store_true", - help="enable logging") - -opts, args = parser.parse_args() - -if opts.verbose: - enable("qpid", DEBUG) -else: - enable("qpid", WARN) - -if args: - addr = args.pop(0) -else: - parser.error("address is required") - -conn = Connection(opts.broker, - reconnect=opts.reconnect, - reconnect_interval=opts.reconnect_interval, - reconnect_limit=opts.reconnect_limit) -def dispatch(msg): - msg_type = msg.properties.get("type") - if msg_type == "shell": - proc = Popen(msg.content, shell=True, stderr=STDOUT, stdin=PIPE, stdout=PIPE) - output, _ = proc.communicate() - result = Message(output) - result.properties["exit"] = proc.returncode - elif msg_type == "eval": - try: - content = eval(msg.content) - except: - content = traceback.format_exc() - result = Message(content) - else: - result = Message("unrecognized message type: %s" % msg_type) - return result - -try: - conn.open() - ssn = conn.session() - rcv = ssn.receiver(addr) - - while True: - msg = rcv.fetch() - response = dispatch(msg) - snd = None - try: - snd = ssn.sender(msg.reply_to) - snd.send(response) - except SendError, e: - print e - if snd is not None: - snd.close() - ssn.acknowledge() -except ReceiveError, e: - print e -except KeyboardInterrupt: - pass - -conn.close() diff --git a/python/examples/api/spout b/python/examples/api/spout deleted file mode 100755 index c2dc4db380..0000000000 --- a/python/examples/api/spout +++ /dev/null @@ -1,123 +0,0 @@ -#!/usr/bin/env python -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -import optparse, time -from qpid.messaging import * -from qpid.util import URL -from qpid.log import enable, DEBUG, WARN - -def nameval(st): - idx = st.find("=") - if idx >= 0: - name = st[0:idx] - value = st[idx+1:] - else: - name = st - value = None - return name, value - -parser = optparse.OptionParser(usage="usage: %prog [options] ADDRESS [ CONTENT ... ]", - description="Send messages to the supplied address.") -parser.add_option("-b", "--broker", default="localhost", - help="connect to specified BROKER (default %default)") -parser.add_option("-r", "--reconnect", action="store_true", - help="enable auto reconnect") -parser.add_option("-i", "--reconnect-interval", type="float", default=3, - help="interval between reconnect attempts") -parser.add_option("-l", "--reconnect-limit", type="int", - help="maximum number of reconnect attempts") -parser.add_option("-c", "--count", type="int", default=1, - help="stop after count messages have been sent, zero disables (default %default)") -parser.add_option("-t", "--timeout", type="float", default=None, - help="exit after the specified time") -parser.add_option("-I", "--id", help="use the supplied id instead of generating one") -parser.add_option("-S", "--subject", help="specify a subject") -parser.add_option("-R", "--reply-to", help="specify reply-to address") -parser.add_option("-P", "--property", dest="properties", action="append", default=[], - metavar="NAME=VALUE", help="specify message property") -parser.add_option("-M", "--map", dest="entries", action="append", default=[], - metavar="KEY=VALUE", - help="specify map entry for message body") -parser.add_option("-v", dest="verbose", action="store_true", - help="enable logging") - -opts, args = parser.parse_args() - -if opts.verbose: - enable("qpid", DEBUG) -else: - enable("qpid", WARN) - -if opts.id is None: - spout_id = str(uuid4()) -else: - spout_id = opts.id -if args: - addr = args.pop(0) -else: - parser.error("address is required") - -content = None - -if args: - text = " ".join(args) -else: - text = None - -if opts.entries: - content = {} - if text: - content["text"] = text - for e in opts.entries: - name, val = nameval(e) - content[name] = val -else: - content = text - -conn = Connection(opts.broker, - reconnect=opts.reconnect, - reconnect_interval=opts.reconnect_interval, - reconnect_limit=opts.reconnect_limit) -try: - conn.open() - ssn = conn.session() - snd = ssn.sender(addr) - - count = 0 - start = time.time() - while (opts.count == 0 or count < opts.count) and \ - (opts.timeout is None or time.time() - start < opts.timeout): - msg = Message(subject=opts.subject, - reply_to=opts.reply_to, - content=content) - msg.properties["spout-id"] = "%s:%s" % (spout_id, count) - for p in opts.properties: - name, val = nameval(p) - msg.properties[name] = val - - snd.send(msg) - count += 1 - print msg -except SendError, e: - print e -except KeyboardInterrupt: - pass - -conn.close() diff --git a/python/examples/reservations/common.py b/python/examples/reservations/common.py deleted file mode 100644 index 12f07e1c92..0000000000 --- a/python/examples/reservations/common.py +++ /dev/null @@ -1,80 +0,0 @@ -#!/usr/bin/env python -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -import traceback -from fnmatch import fnmatch -from qpid.messaging import * - -class Dispatcher: - - def unhandled(self, msg): - print "UNHANDLED MESSAGE: %s" % msg - - def ignored(self, msg): - return False - - def dispatch(self, msg): - try: - if self.ignored(msg): - return () - else: - type = msg.properties.get("type") - replies = getattr(self, "do_%s" % type, self.unhandled)(msg) - if replies is None: - return () - else: - return replies - except: - traceback.print_exc() - return () - - def run(self, session): - while self.running(): - msg = session.next_receiver().fetch() - replies = self.dispatch(msg) - - count = len(replies) - sequence = 1 - for to, r in replies: - r.correlation_id = msg.correlation_id - r.properties["count"] = count - r.properties["sequence"] = sequence - sequence += 1 - try: - snd = session.sender(to) - snd.send(r) - except SendError, e: - print e - finally: - snd.close() - - session.acknowledge(msg) - -def get_status(msg): - return msg.content["identity"], msg.content["status"], msg.content["owner"] - -FREE = "free" -BUSY = "busy" - -def match(value, patterns): - for p in patterns: - if fnmatch(value, p): - return True - return False diff --git a/python/examples/reservations/inventory b/python/examples/reservations/inventory deleted file mode 100755 index 0a49643e5f..0000000000 --- a/python/examples/reservations/inventory +++ /dev/null @@ -1,94 +0,0 @@ -#!/usr/bin/env python -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -import optparse, traceback -from qpid.messaging import * -from qpid.log import enable, DEBUG, WARN -from common import * - -parser = optparse.OptionParser(usage="usage: %prog [options]", - description="machine inventory agent") -parser.add_option("-b", "--broker", default="localhost", - help="connect to specified BROKER (default %default)") -parser.add_option("-d", "--database", - help="database file for persistent machine status") -parser.add_option("-a", "--address", default="reservations", - help="address for reservation requests") -parser.add_option("-v", dest="verbose", action="store_true", - help="enable verbose logging") - -opts, args = parser.parse_args() - -if opts.verbose: - enable("qpid", DEBUG) -else: - enable("qpid", WARN) - -conn = Connection.establish(opts.broker, reconnect=True, reconnect_interval=1) - -class Inventory(Dispatcher): - - def __init__(self): - self.agents = {} - - def running(self): - return True - - def do_status(self, msg): - id, status, owner = get_status(msg) - self.agents[id] = (status, owner) - - def do_query(self, msg): - patterns = msg.content["identity"] - result = [] - for id, (status, owner) in self.agents.items(): - if match(id, patterns): - r = Message(properties = { - "type": "status" - }, - content = { - "identity": id, - "status": status, - "owner": owner - }) - result.append((msg.reply_to, r)) - continue - if not result: - result.append((msg.reply_to, - Message(properties = {"type": "empty"}))) - return result - - def ignored(self, msg): - type = msg.properties.get("type") - return type not in ("status", "query") - -try: - ssn = conn.session() - rcv = ssn.receiver(opts.address, capacity = 10) - snd = ssn.sender(opts.address) - snd.send(Message(reply_to = opts.address, - properties = {"type": "discover", "identity": ["*"]})) - - inv = Inventory() - inv.run(ssn) -except KeyboardInterrupt: - pass -finally: - conn.close() diff --git a/python/examples/reservations/machine-agent b/python/examples/reservations/machine-agent deleted file mode 100755 index a221a8b6de..0000000000 --- a/python/examples/reservations/machine-agent +++ /dev/null @@ -1,103 +0,0 @@ -#!/usr/bin/env python -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -import optparse, socket -from qpid.messaging import * -from qpid.log import enable, DEBUG, WARN -from common import * - -host = socket.gethostname() - -parser = optparse.OptionParser(usage="usage: %prog [options]", - description="machine reservation agent") -parser.add_option("-b", "--broker", default="localhost", - help="connect to specified BROKER (default %default)") -parser.add_option("-d", "--database", - help="database file for persistent machine status") -parser.add_option("-a", "--address", default="reservations", - help="address for reservation requests") -parser.add_option("-i", "--identity", default=host, - help="resource id (default %default)") -parser.add_option("-v", dest="verbose", action="store_true", - help="enable verbose logging") - -opts, args = parser.parse_args() - -if opts.verbose: - enable("qpid", DEBUG) -else: - enable("qpid", WARN) - -conn = Connection.establish(opts.broker, reconnect=True, reconnect_interval=1) - - -class Agent(Dispatcher): - - def __init__(self, identity): - self.identity = identity - self.status = FREE - self.owner = None - - def running(self): - return True - - def get_status(self): - msg = Message(properties = {"type": "status"}, - content = {"identity": self.identity, - "status": self.status, - "owner": self.owner}) - return msg - - def do_discover(self, msg): - r = self.get_status() - return [(msg.reply_to, r)] - - def do_reserve(self, msg): - if self.status == FREE: - self.owner = msg.content["owner"] - self.status = BUSY - return self.do_discover(msg) - - def do_release(self, msg): - if self.owner == msg.content["owner"]: - self.status = FREE - self.owner = None - return self.do_discover(msg) - - def ignored(self, msg): - patterns = msg.properties.get("identity") - type = msg.properties.get("type") - if patterns and match(self.identity, patterns): - return type == "status" - else: - return True - -try: - ssn = conn.session() - rcv = ssn.receiver(opts.address) - rcv.capacity = 10 - snd = ssn.sender(opts.address) - agent = Agent(opts.identity) - snd.send(agent.get_status()) - agent.run(ssn) -except KeyboardInterrupt: - pass -finally: - conn.close() diff --git a/python/examples/reservations/reserve b/python/examples/reservations/reserve deleted file mode 100755 index 68e7fee912..0000000000 --- a/python/examples/reservations/reserve +++ /dev/null @@ -1,197 +0,0 @@ -#!/usr/bin/env python -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -import optparse, os, sys, time -from uuid import uuid4 -from qpid.messaging import * -from qpid.log import enable, DEBUG, WARN -from common import * - -parser = optparse.OptionParser(usage="usage: %prog [options] PATTERN ...", - description="reserve a machine") -parser.add_option("-b", "--broker", default="localhost", - help="connect to specified BROKER (default %default)") -parser.add_option("-a", "--address", default="reservations", - help="address for reservation requests") -parser.add_option("-r", "--release", action="store_true", - help="release any machines matching the pattern") -parser.add_option("-s", "--status", action="store_true", - help="list machine status") -parser.add_option("-d", "--discover", action="store_true", - help="use discovery instead of inventory") -parser.add_option("-o", "--owner", default=os.environ["USER"], - help="the holder of the reservation") -parser.add_option("-n", "--number", type=int, default=1, - help="the number of machines to reserve") -parser.add_option("-t", "--timeout", type=float, default=10, - help="timeout in seconds to wait for resources") -parser.add_option("-v", dest="verbose", action="store_true", - help="enable verbose logging") - -opts, args = parser.parse_args() - -if opts.verbose: - enable("qpid", DEBUG) -else: - enable("qpid", WARN) - -if args: - patterns = args -else: - patterns = ["*"] - -conn = Connection.establish(opts.broker) - -if opts.release: - request_type = "release" - candidate_status = BUSY - candidate_owner = opts.owner -else: - request_type = "reserve" - candidate_status = FREE - candidate_owner = None - -class Requester(Dispatcher): - - def __init__(self): - self.agents = {} - self.requests = set() - self.outstanding = set() - - def agent_status(self, id): - status, owner = self.agents[id] - if owner: - return "%s %s(%s)" % (id, status, owner) - else: - return "%s %s" % (id, status) - - def correlation(self, cid): - self.requests.add(cid) - self.outstanding.add(cid) - - def ignored(self, msg): - return msg.properties.get("type") not in ("status", "empty") or \ - msg.correlation_id not in self.requests - - def do_status(self, msg): - id, status, owner = get_status(msg) - self.agents[id] = (status, owner) - - if opts.status: - print self.agent_status(id) - - def do_empty(self, msg): - print "no matching resources" - - def candidates(self, candidate_status, candidate_owner): - for id, (status, owner) in self.agents.items(): - if status == candidate_status and owner == candidate_owner: - yield id - - def dispatch(self, msg): - result = Dispatcher.dispatch(self, msg) - count = msg.properties.get("count") - sequence = msg.properties.get("sequence") - if count and sequence == count: - self.outstanding.discard(msg.correlation_id) - return result - -try: - ssn = conn.session() - rcv = ssn.receiver(opts.address, capacity=10) - snd = ssn.sender(opts.address) - - correlation_id = str(uuid4()) - - if opts.discover: - properties = {"type": "discover", "identity": patterns} - content = None - else: - properties = {"type": "query"} - content = {"identity": patterns} - - snd.send(Message(reply_to = opts.address, - correlation_id = correlation_id, - properties = properties, - content = content)) - - req = Requester() - req.correlation(correlation_id) - - start = time.time() - ellapsed = 0 - requested = set() - discovering = opts.discover - - while ellapsed <= opts.timeout and (discovering or req.outstanding): - try: - msg = rcv.fetch(opts.timeout - ellapsed) - ssn.acknowledge(msg) - except Empty: - continue - finally: - ellapsed = time.time() - start - - req.dispatch(msg) - if not opts.status: - if len(requested) < opts.number: - for cid in req.candidates(candidate_status, candidate_owner): - if cid in requested: continue - req_msg = Message(reply_to = opts.address, - correlation_id = str(uuid4()), - properties = {"type": request_type, - "identity": [cid]}, - content = {"owner": opts.owner}) - if not requested: - print "requesting %s:" % request_type, - print cid, - sys.stdout.flush() - req.correlation(req_msg.correlation_id) - snd.send(req_msg) - requested.add(cid) - else: - discovering = False - - if requested: - print - owners = {} - for id in requested: - st, ow = req.agents[id] - if not owners.has_key(ow): - owners[ow] = [] - owners[ow].append(id) - keys = list(owners.keys()) - keys.sort() - for k in keys: - owners[k].sort() - v = ", ".join(owners[k]) - if k is None: - print "free: %s" % v - else: - print "owner %s: %s" % (k, v) - elif req.agents and not opts.status: - print "no available resources" - - if req.outstanding: - print "request timed out" -except KeyboardInterrupt: - pass -finally: - conn.close() |