diff options
Diffstat (limited to 'python/examples/reservations')
-rw-r--r-- | python/examples/reservations/common.py | 80 | ||||
-rwxr-xr-x | python/examples/reservations/inventory | 94 | ||||
-rwxr-xr-x | python/examples/reservations/machine-agent | 103 | ||||
-rwxr-xr-x | python/examples/reservations/reserve | 197 |
4 files changed, 0 insertions, 474 deletions
diff --git a/python/examples/reservations/common.py b/python/examples/reservations/common.py deleted file mode 100644 index 12f07e1c92..0000000000 --- a/python/examples/reservations/common.py +++ /dev/null @@ -1,80 +0,0 @@ -#!/usr/bin/env python -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -import traceback -from fnmatch import fnmatch -from qpid.messaging import * - -class Dispatcher: - - def unhandled(self, msg): - print "UNHANDLED MESSAGE: %s" % msg - - def ignored(self, msg): - return False - - def dispatch(self, msg): - try: - if self.ignored(msg): - return () - else: - type = msg.properties.get("type") - replies = getattr(self, "do_%s" % type, self.unhandled)(msg) - if replies is None: - return () - else: - return replies - except: - traceback.print_exc() - return () - - def run(self, session): - while self.running(): - msg = session.next_receiver().fetch() - replies = self.dispatch(msg) - - count = len(replies) - sequence = 1 - for to, r in replies: - r.correlation_id = msg.correlation_id - r.properties["count"] = count - r.properties["sequence"] = sequence - sequence += 1 - try: - snd = session.sender(to) - snd.send(r) - except SendError, e: - print e - finally: - snd.close() - - session.acknowledge(msg) - -def get_status(msg): - return msg.content["identity"], msg.content["status"], msg.content["owner"] - -FREE = "free" -BUSY = "busy" - -def match(value, patterns): - for p in patterns: - if fnmatch(value, p): - return True - return False diff --git a/python/examples/reservations/inventory b/python/examples/reservations/inventory deleted file mode 100755 index 0a49643e5f..0000000000 --- a/python/examples/reservations/inventory +++ /dev/null @@ -1,94 +0,0 @@ -#!/usr/bin/env python -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -import optparse, traceback -from qpid.messaging import * -from qpid.log import enable, DEBUG, WARN -from common import * - -parser = optparse.OptionParser(usage="usage: %prog [options]", - description="machine inventory agent") -parser.add_option("-b", "--broker", default="localhost", - help="connect to specified BROKER (default %default)") -parser.add_option("-d", "--database", - help="database file for persistent machine status") -parser.add_option("-a", "--address", default="reservations", - help="address for reservation requests") -parser.add_option("-v", dest="verbose", action="store_true", - help="enable verbose logging") - -opts, args = parser.parse_args() - -if opts.verbose: - enable("qpid", DEBUG) -else: - enable("qpid", WARN) - -conn = Connection.establish(opts.broker, reconnect=True, reconnect_interval=1) - -class Inventory(Dispatcher): - - def __init__(self): - self.agents = {} - - def running(self): - return True - - def do_status(self, msg): - id, status, owner = get_status(msg) - self.agents[id] = (status, owner) - - def do_query(self, msg): - patterns = msg.content["identity"] - result = [] - for id, (status, owner) in self.agents.items(): - if match(id, patterns): - r = Message(properties = { - "type": "status" - }, - content = { - "identity": id, - "status": status, - "owner": owner - }) - result.append((msg.reply_to, r)) - continue - if not result: - result.append((msg.reply_to, - Message(properties = {"type": "empty"}))) - return result - - def ignored(self, msg): - type = msg.properties.get("type") - return type not in ("status", "query") - -try: - ssn = conn.session() - rcv = ssn.receiver(opts.address, capacity = 10) - snd = ssn.sender(opts.address) - snd.send(Message(reply_to = opts.address, - properties = {"type": "discover", "identity": ["*"]})) - - inv = Inventory() - inv.run(ssn) -except KeyboardInterrupt: - pass -finally: - conn.close() diff --git a/python/examples/reservations/machine-agent b/python/examples/reservations/machine-agent deleted file mode 100755 index a221a8b6de..0000000000 --- a/python/examples/reservations/machine-agent +++ /dev/null @@ -1,103 +0,0 @@ -#!/usr/bin/env python -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -import optparse, socket -from qpid.messaging import * -from qpid.log import enable, DEBUG, WARN -from common import * - -host = socket.gethostname() - -parser = optparse.OptionParser(usage="usage: %prog [options]", - description="machine reservation agent") -parser.add_option("-b", "--broker", default="localhost", - help="connect to specified BROKER (default %default)") -parser.add_option("-d", "--database", - help="database file for persistent machine status") -parser.add_option("-a", "--address", default="reservations", - help="address for reservation requests") -parser.add_option("-i", "--identity", default=host, - help="resource id (default %default)") -parser.add_option("-v", dest="verbose", action="store_true", - help="enable verbose logging") - -opts, args = parser.parse_args() - -if opts.verbose: - enable("qpid", DEBUG) -else: - enable("qpid", WARN) - -conn = Connection.establish(opts.broker, reconnect=True, reconnect_interval=1) - - -class Agent(Dispatcher): - - def __init__(self, identity): - self.identity = identity - self.status = FREE - self.owner = None - - def running(self): - return True - - def get_status(self): - msg = Message(properties = {"type": "status"}, - content = {"identity": self.identity, - "status": self.status, - "owner": self.owner}) - return msg - - def do_discover(self, msg): - r = self.get_status() - return [(msg.reply_to, r)] - - def do_reserve(self, msg): - if self.status == FREE: - self.owner = msg.content["owner"] - self.status = BUSY - return self.do_discover(msg) - - def do_release(self, msg): - if self.owner == msg.content["owner"]: - self.status = FREE - self.owner = None - return self.do_discover(msg) - - def ignored(self, msg): - patterns = msg.properties.get("identity") - type = msg.properties.get("type") - if patterns and match(self.identity, patterns): - return type == "status" - else: - return True - -try: - ssn = conn.session() - rcv = ssn.receiver(opts.address) - rcv.capacity = 10 - snd = ssn.sender(opts.address) - agent = Agent(opts.identity) - snd.send(agent.get_status()) - agent.run(ssn) -except KeyboardInterrupt: - pass -finally: - conn.close() diff --git a/python/examples/reservations/reserve b/python/examples/reservations/reserve deleted file mode 100755 index 68e7fee912..0000000000 --- a/python/examples/reservations/reserve +++ /dev/null @@ -1,197 +0,0 @@ -#!/usr/bin/env python -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -import optparse, os, sys, time -from uuid import uuid4 -from qpid.messaging import * -from qpid.log import enable, DEBUG, WARN -from common import * - -parser = optparse.OptionParser(usage="usage: %prog [options] PATTERN ...", - description="reserve a machine") -parser.add_option("-b", "--broker", default="localhost", - help="connect to specified BROKER (default %default)") -parser.add_option("-a", "--address", default="reservations", - help="address for reservation requests") -parser.add_option("-r", "--release", action="store_true", - help="release any machines matching the pattern") -parser.add_option("-s", "--status", action="store_true", - help="list machine status") -parser.add_option("-d", "--discover", action="store_true", - help="use discovery instead of inventory") -parser.add_option("-o", "--owner", default=os.environ["USER"], - help="the holder of the reservation") -parser.add_option("-n", "--number", type=int, default=1, - help="the number of machines to reserve") -parser.add_option("-t", "--timeout", type=float, default=10, - help="timeout in seconds to wait for resources") -parser.add_option("-v", dest="verbose", action="store_true", - help="enable verbose logging") - -opts, args = parser.parse_args() - -if opts.verbose: - enable("qpid", DEBUG) -else: - enable("qpid", WARN) - -if args: - patterns = args -else: - patterns = ["*"] - -conn = Connection.establish(opts.broker) - -if opts.release: - request_type = "release" - candidate_status = BUSY - candidate_owner = opts.owner -else: - request_type = "reserve" - candidate_status = FREE - candidate_owner = None - -class Requester(Dispatcher): - - def __init__(self): - self.agents = {} - self.requests = set() - self.outstanding = set() - - def agent_status(self, id): - status, owner = self.agents[id] - if owner: - return "%s %s(%s)" % (id, status, owner) - else: - return "%s %s" % (id, status) - - def correlation(self, cid): - self.requests.add(cid) - self.outstanding.add(cid) - - def ignored(self, msg): - return msg.properties.get("type") not in ("status", "empty") or \ - msg.correlation_id not in self.requests - - def do_status(self, msg): - id, status, owner = get_status(msg) - self.agents[id] = (status, owner) - - if opts.status: - print self.agent_status(id) - - def do_empty(self, msg): - print "no matching resources" - - def candidates(self, candidate_status, candidate_owner): - for id, (status, owner) in self.agents.items(): - if status == candidate_status and owner == candidate_owner: - yield id - - def dispatch(self, msg): - result = Dispatcher.dispatch(self, msg) - count = msg.properties.get("count") - sequence = msg.properties.get("sequence") - if count and sequence == count: - self.outstanding.discard(msg.correlation_id) - return result - -try: - ssn = conn.session() - rcv = ssn.receiver(opts.address, capacity=10) - snd = ssn.sender(opts.address) - - correlation_id = str(uuid4()) - - if opts.discover: - properties = {"type": "discover", "identity": patterns} - content = None - else: - properties = {"type": "query"} - content = {"identity": patterns} - - snd.send(Message(reply_to = opts.address, - correlation_id = correlation_id, - properties = properties, - content = content)) - - req = Requester() - req.correlation(correlation_id) - - start = time.time() - ellapsed = 0 - requested = set() - discovering = opts.discover - - while ellapsed <= opts.timeout and (discovering or req.outstanding): - try: - msg = rcv.fetch(opts.timeout - ellapsed) - ssn.acknowledge(msg) - except Empty: - continue - finally: - ellapsed = time.time() - start - - req.dispatch(msg) - if not opts.status: - if len(requested) < opts.number: - for cid in req.candidates(candidate_status, candidate_owner): - if cid in requested: continue - req_msg = Message(reply_to = opts.address, - correlation_id = str(uuid4()), - properties = {"type": request_type, - "identity": [cid]}, - content = {"owner": opts.owner}) - if not requested: - print "requesting %s:" % request_type, - print cid, - sys.stdout.flush() - req.correlation(req_msg.correlation_id) - snd.send(req_msg) - requested.add(cid) - else: - discovering = False - - if requested: - print - owners = {} - for id in requested: - st, ow = req.agents[id] - if not owners.has_key(ow): - owners[ow] = [] - owners[ow].append(id) - keys = list(owners.keys()) - keys.sort() - for k in keys: - owners[k].sort() - v = ", ".join(owners[k]) - if k is None: - print "free: %s" % v - else: - print "owner %s: %s" % (k, v) - elif req.agents and not opts.status: - print "no available resources" - - if req.outstanding: - print "request timed out" -except KeyboardInterrupt: - pass -finally: - conn.close() |