summaryrefslogtreecommitdiff
path: root/qpid/cpp/management/python/lib/qpidtoollibs/broker.py
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/management/python/lib/qpidtoollibs/broker.py')
-rw-r--r--qpid/cpp/management/python/lib/qpidtoollibs/broker.py486
1 files changed, 0 insertions, 486 deletions
diff --git a/qpid/cpp/management/python/lib/qpidtoollibs/broker.py b/qpid/cpp/management/python/lib/qpidtoollibs/broker.py
deleted file mode 100644
index fca6680067..0000000000
--- a/qpid/cpp/management/python/lib/qpidtoollibs/broker.py
+++ /dev/null
@@ -1,486 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-import sys
-from qpidtoollibs.disp import TimeLong
-try:
- from uuid import uuid4
-except ImportError:
- from qpid.datatypes import uuid4
-
-class BrokerAgent(object):
- """
- Proxy for a manageable Qpid Broker - Invoke with an opened qpid.messaging.Connection
- or qpid_messaging.Connection
- """
- def __init__(self, conn):
- # Use the Message class from the same module as conn which could be qpid.messaging
- # or qpid_messaging
- self.message_class = sys.modules[conn.__class__.__module__].Message
- self.conn = conn
- self.sess = self.conn.session()
- self.reply_to = "qmf.default.topic/direct.%s;{node:{type:topic}}" % str(uuid4())
- self.reply_rx = self.sess.receiver(self.reply_to)
- self.reply_rx.capacity = 10
- self.tx = self.sess.sender("qmf.default.direct/broker")
- self.next_correlator = 1
-
- def close(self):
- """
- Close the proxy session. This will not affect the connection used in creating the object.
- """
- self.sess.close()
-
- def _method(self, method, arguments=None, addr="org.apache.qpid.broker:broker:amqp-broker", timeout=10):
- props = {'method' : 'request',
- 'qmf.opcode' : '_method_request',
- 'x-amqp-0-10.app-id' : 'qmf2'}
- correlator = str(self.next_correlator)
- self.next_correlator += 1
-
- content = {'_object_id' : {'_object_name' : addr},
- '_method_name' : method,
- '_arguments' : arguments or {}}
-
- message = self.message_class(
- content, reply_to=self.reply_to, correlation_id=correlator,
- properties=props, subject="broker")
- self.tx.send(message)
- response = self.reply_rx.fetch(timeout)
- self.sess.acknowledge()
- if response.properties['qmf.opcode'] == '_exception':
- raise Exception("Exception from Agent: %r" % response.content['_values'])
- if response.properties['qmf.opcode'] != '_method_response':
- raise Exception("bad response: %r" % response.properties)
- return response.content['_arguments']
-
- def _sendRequest(self, opcode, content):
- props = {'method' : 'request',
- 'qmf.opcode' : opcode,
- 'x-amqp-0-10.app-id' : 'qmf2'}
- correlator = str(self.next_correlator)
- self.next_correlator += 1
- message = self.message_class(
- content, reply_to=self.reply_to, correlation_id=correlator,
- properties=props, subject="broker")
- self.tx.send(message)
- return correlator
-
- def _doClassQuery(self, class_name):
- query = {'_what' : 'OBJECT',
- '_schema_id' : {'_class_name' : class_name}}
- correlator = self._sendRequest('_query_request', query)
- response = self.reply_rx.fetch(10)
- if response.properties['qmf.opcode'] != '_query_response':
- raise Exception("bad response")
- items = []
- done = False
- while not done:
- for item in response.content:
- items.append(item)
- if 'partial' in response.properties:
- response = self.reply_rx.fetch(10)
- else:
- done = True
- self.sess.acknowledge()
- return items
-
- def _doNameQuery(self, object_id):
- query = {'_what' : 'OBJECT', '_object_id' : {'_object_name' : object_id}}
- correlator = self._sendRequest('_query_request', query)
- response = self.reply_rx.fetch(10)
- if response.properties['qmf.opcode'] != '_query_response':
- raise Exception("bad response")
- items = []
- done = False
- while not done:
- for item in response.content:
- items.append(item)
- if 'partial' in response.properties:
- response = self.reply_rx.fetch(10)
- else:
- done = True
- self.sess.acknowledge()
- if len(items) == 1:
- return items[0]
- return None
-
- def _getAllBrokerObjects(self, cls):
- items = self._doClassQuery(cls.__name__.lower())
- objs = []
- for item in items:
- objs.append(cls(self, item))
- return objs
-
- def _getBrokerObject(self, cls, oid):
- obj = self._doNameQuery(oid)
- if obj:
- return cls(self, obj)
- return None
-
- def _getSingleObject(self, cls):
- #
- # getAllBrokerObjects is used instead of getBrokerObject(Broker, 'amqp-broker') because
- # of a bug that used to be in the broker whereby by-name queries did not return the
- # object timestamps.
- #
- objects = self._getAllBrokerObjects(cls)
- if objects: return objects[0]
- return None
-
- def getBroker(self):
- """
- Get the Broker object that contains broker-scope statistics and operations.
- """
- return self._getSingleObject(Broker)
-
-
- def getCluster(self):
- return self._getSingleObject(Cluster)
-
- def getHaBroker(self):
- return self._getSingleObject(HaBroker)
-
- def getAllConnections(self):
- return self._getAllBrokerObjects(Connection)
-
- def getConnection(self, oid):
- return self._getBrokerObject(Connection, "org.apache.qpid.broker:connection:%s" % oid)
-
- def getAllSessions(self):
- return self._getAllBrokerObjects(Session)
-
- def getSession(self, oid):
- return self._getBrokerObject(Session, "org.apache.qpid.broker:session:%s" % oid)
-
- def getAllSubscriptions(self):
- return self._getAllBrokerObjects(Subscription)
-
- def getSubscription(self, oid):
- return self._getBrokerObject(Subscription, "org.apache.qpid.broker:subscription:%s" % oid)
-
- def getAllExchanges(self):
- return self._getAllBrokerObjects(Exchange)
-
- def getExchange(self, name):
- return self._getBrokerObject(Exchange, "org.apache.qpid.broker:exchange:%s" % name)
-
- def getAllQueues(self):
- return self._getAllBrokerObjects(Queue)
-
- def getQueue(self, name):
- return self._getBrokerObject(Queue, "org.apache.qpid.broker:queue:%s" % name)
-
- def getAllBindings(self):
- return self._getAllBrokerObjects(Binding)
-
- def getAllLinks(self):
- return self._getAllBrokerObjects(Link)
-
- def getAcl(self):
- return self._getSingleObject(Acl)
-
- def getMemory(self):
- return self._getSingleObject(Memory)
-
- def echo(self, sequence = 1, body = "Body"):
- """Request a response to test the path to the management broker"""
- args = {'sequence' : sequence, 'body' : body}
- return self._method('echo', args)
-
- def connect(self, host, port, durable, authMechanism, username, password, transport):
- """Establish a connection to another broker"""
- pass
-
- def queueMoveMessages(self, srcQueue, destQueue, qty):
- """Move messages from one queue to another"""
- self._method("queueMoveMessages", {'srcQueue':srcQueue,'destQueue':destQueue,'qty':qty})
-
- def queueRedirect(self, sourceQueue, targetQueue):
- """Enable/disable delivery redirect for indicated queues"""
- self._method("queueRedirect", {'sourceQueue':sourceQueue,'targetQueue':targetQueue})
-
- def setLogLevel(self, level):
- """Set the log level"""
- self._method("setLogLevel", {'level':level})
-
- def getLogLevel(self):
- """Get the log level"""
- return self._method('getLogLevel')
-
- def setTimestampConfig(self, receive):
- """Set the message timestamping configuration"""
- self._method("setTimestampConfig", {'receive':receive})
-
- def getTimestampConfig(self):
- """Get the message timestamping configuration"""
- return self._method('getTimestampConfig')
-
- def setLogHiresTimestamp(self, logHires):
- """Set the high resolution timestamp in logs"""
- self._method("setLogHiresTimestamp", {'logHires':logHires})
-
- def getLogHiresTimestamp(self):
- """Get the high resolution timestamp in logs"""
- return self._method('getLogHiresTimestamp')
-
- def addExchange(self, exchange_type, name, options={}, **kwargs):
- properties = {}
- properties['exchange-type'] = exchange_type
- for k,v in options.items():
- properties[k] = v
- for k,v in kwargs.items():
- properties[k] = v
- args = {'type': 'exchange',
- 'name': name,
- 'properties': properties,
- 'strict': True}
- self._method('create', args)
-
- def delExchange(self, name):
- args = {'type': 'exchange', 'name': name}
- self._method('delete', args)
-
- def addQueue(self, name, options={}, **kwargs):
- properties = options
- for k,v in kwargs.items():
- properties[k] = v
- args = {'type': 'queue',
- 'name': name,
- 'properties': properties,
- 'strict': True}
- self._method('create', args)
-
- def delQueue(self, name, if_empty=True, if_unused=True):
- options = {'if_empty': if_empty,
- 'if_unused': if_unused}
-
- args = {'type': 'queue',
- 'name': name,
- 'options': options}
- self._method('delete', args)
-
- def bind(self, exchange, queue, key="", options={}, **kwargs):
- properties = options
- for k,v in kwargs.items():
- properties[k] = v
- args = {'type': 'binding',
- 'name': "%s/%s/%s" % (exchange, queue, key),
- 'properties': properties,
- 'strict': True}
- self._method('create', args)
-
- def unbind(self, exchange, queue, key, **kwargs):
- args = {'type': 'binding',
- 'name': "%s/%s/%s" % (exchange, queue, key),
- 'strict': True}
- self._method('delete', args)
-
- def reloadAclFile(self):
- self._method('reloadACLFile', {}, "org.apache.qpid.acl:acl:org.apache.qpid.broker:broker:amqp-broker")
-
- def acl_lookup(self, userName, action, aclObj, aclObjName, propMap):
- args = {'userId': userName,
- 'action': action,
- 'object': aclObj,
- 'objectName': aclObjName,
- 'propertyMap': propMap}
- return self._method('Lookup', args, "org.apache.qpid.acl:acl:org.apache.qpid.broker:broker:amqp-broker")
-
- def acl_lookupPublish(self, userName, exchange, key):
- args = {'userId': userName,
- 'exchangeName': exchange,
- 'routingKey': key}
- return self._method('LookupPublish', args, "org.apache.qpid.acl:acl:org.apache.qpid.broker:broker:amqp-broker")
-
- def Redirect(self, sourceQueue, targetQueue):
- args = {'sourceQueue': sourceQueue,
- 'targetQueue': targetQueue}
- return self._method('queueRedirect', args, "org.apache.qpid.broker:broker:amqp-broker")
-
- def create(self, _type, name, properties={}, strict=False):
- """Create an object of the specified type"""
- args = {'type': _type,
- 'name': name,
- 'properties': properties,
- 'strict': strict}
- return self._method('create', args)
-
- def delete(self, _type, name, options):
- """Delete an object of the specified type"""
- args = {'type': _type,
- 'name': name,
- 'options': options}
- return self._method('delete', args)
-
- def list(self, _type):
- """List objects of the specified type"""
- return [i["_values"] for i in self._doClassQuery(_type.lower())]
-
- def query(self, _type, oid):
- """Query the current state of an object"""
- return self._getBrokerObject(self, _type, oid)
-
-
-class EventHelper(object):
- def eventAddress(self, pkg='*', cls='*', sev='*'):
- return "qmf.default.topic/agent.ind.event.%s.%s.%s.#" % (pkg.replace('.', '_'), cls, sev)
-
- def event(self, msg):
- return BrokerEvent(msg)
-
-
-class BrokerEvent(object):
- def __init__(self, msg):
- self.msg = msg
- self.content = msg.content[0]
- self.values = self.content['_values']
- self.schema_id = self.content['_schema_id']
- self.name = "%s:%s" % (self.schema_id['_package_name'], self.schema_id['_class_name'])
-
- def __repr__(self):
- rep = "%s %s" % (TimeLong(self.getTimestamp()), self.name)
- for k,v in self.values.items():
- rep = rep + " %s=%s" % (k, v)
- return rep
-
- def __getattr__(self, key):
- if key not in self.values:
- return None
- value = self.values[key]
- return value
-
- def getAttributes(self):
- return self.values
-
- def getTimestamp(self):
- return self.content['_timestamp']
-
-
-class BrokerObject(object):
- def __init__(self, broker, content):
- self.broker = broker
- self.content = content
- self.values = content['_values']
-
- def __getattr__(self, key):
- if key not in self.values:
- return None
- value = self.values[key]
- if value.__class__ == dict and '_object_name' in value:
- full_name = value['_object_name']
- colon = full_name.find(':')
- if colon > 0:
- full_name = full_name[colon+1:]
- colon = full_name.find(':')
- if colon > 0:
- return full_name[colon+1:]
- return value
-
- def getObjectId(self):
- return self.content['_object_id']['_object_name']
-
- def getAttributes(self):
- return self.values
-
- def getCreateTime(self):
- return self.content['_create_ts']
-
- def getDeleteTime(self):
- return self.content['_delete_ts']
-
- def getUpdateTime(self):
- return self.content['_update_ts']
-
- def update(self):
- """
- Reload the property values from the agent.
- """
- refreshed = self.broker._getBrokerObject(self.__class__, self.getObjectId())
- if refreshed:
- self.content = refreshed.content
- self.values = self.content['_values']
- else:
- raise Exception("No longer exists on the broker")
-
-class Broker(BrokerObject):
- def __init__(self, broker, values):
- BrokerObject.__init__(self, broker, values)
-
-class Cluster(BrokerObject):
- def __init__(self, broker, values):
- BrokerObject.__init__(self, broker, values)
-
-class HaBroker(BrokerObject):
- def __init__(self, broker, values):
- BrokerObject.__init__(self, broker, values)
-
-class Memory(BrokerObject):
- def __init__(self, broker, values):
- BrokerObject.__init__(self, broker, values)
-
-class Connection(BrokerObject):
- def __init__(self, broker, values):
- BrokerObject.__init__(self, broker, values)
-
- def close(self):
- self.broker._method("close", {}, "org.apache.qpid.broker:connection:%s" % self.address)
-
-class Session(BrokerObject):
- def __init__(self, broker, values):
- BrokerObject.__init__(self, broker, values)
-
-class Subscription(BrokerObject):
- def __init__(self, broker, values):
- BrokerObject.__init__(self, broker, values)
-
- def __repr__(self):
- return "subscription name undefined"
-
-class Exchange(BrokerObject):
- def __init__(self, broker, values):
- BrokerObject.__init__(self, broker, values)
-
-class Binding(BrokerObject):
- def __init__(self, broker, values):
- BrokerObject.__init__(self, broker, values)
-
- def __repr__(self):
- return "Binding key: %s" % self.values['bindingKey']
-
-class Queue(BrokerObject):
- def __init__(self, broker, values):
- BrokerObject.__init__(self, broker, values)
-
- def purge(self, request):
- """Discard all or some messages on a queue"""
- self.broker._method("purge", {'request':request}, "org.apache.qpid.broker:queue:%s" % self.name)
-
- def reroute(self, request, useAltExchange, exchange, filter={}):
- """Remove all or some messages on this queue and route them to an exchange"""
- self.broker._method("reroute", {'request':request,'useAltExchange':useAltExchange,'exchange':exchange,'filter':filter},
- "org.apache.qpid.broker:queue:%s" % self.name)
-
-class Link(BrokerObject):
- def __init__(self, broker, values):
- BrokerObject.__init__(self, broker, values)
-
-class Acl(BrokerObject):
- def __init__(self, broker, values):
- BrokerObject.__init__(self, broker, values)