diff options
author | Rajith Muditha Attapattu <rajith@apache.org> | 2011-05-27 15:44:23 +0000 |
---|---|---|
committer | Rajith Muditha Attapattu <rajith@apache.org> | 2011-05-27 15:44:23 +0000 |
commit | 66765100f4257159622cefe57bed50125a5ad017 (patch) | |
tree | a88ee23bb194eb91f0ebb2d9b23ff423e3ea8e37 /python/qpid/management.py | |
parent | 1aeaa7b16e5ce54f10c901d75c4d40f9f88b9db6 (diff) | |
parent | 88b98b2f4152ef59a671fad55a0d08338b6b78ca (diff) | |
download | qpid-python-rajith_jms_client.tar.gz |
Creating a branch for experimenting with some ideas for JMS client.rajith_jms_client
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/rajith_jms_client@1128369 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'python/qpid/management.py')
-rw-r--r-- | python/qpid/management.py | 922 |
1 files changed, 0 insertions, 922 deletions
diff --git a/python/qpid/management.py b/python/qpid/management.py deleted file mode 100644 index 3de8da9d49..0000000000 --- a/python/qpid/management.py +++ /dev/null @@ -1,922 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -############################################################################### -## This file is being obsoleted by qmf/console.py -############################################################################### - -""" -Management API for Qpid -""" - -import qpid -import struct -import socket -from threading import Thread -from datatypes import Message, RangedSet -from time import time -from cStringIO import StringIO -from codec010 import StringCodec as Codec -from threading import Lock, Condition - - -class SequenceManager: - """ Manage sequence numbers for asynchronous method calls """ - def __init__ (self): - self.lock = Lock () - self.sequence = 0 - self.pending = {} - - def reserve (self, data): - """ Reserve a unique sequence number """ - self.lock.acquire () - result = self.sequence - self.sequence = self.sequence + 1 - self.pending[result] = data - self.lock.release () - return result - - def release (self, seq): - """ Release a reserved sequence number """ - data = None - self.lock.acquire () - if seq in self.pending: - data = self.pending[seq] - del self.pending[seq] - self.lock.release () - return data - - -class mgmtObject (object): - """ Generic object that holds the contents of a management object with its - attributes set as object attributes. """ - - def __init__ (self, classKey, timestamps, row): - self.classKey = classKey - self.timestamps = timestamps - for cell in row: - setattr (self, cell[0], cell[1]) - -class objectId(object): - """ Object that represents QMF object identifiers """ - - def __init__(self, codec, first=0, second=0): - if codec: - self.first = codec.read_uint64() - self.second = codec.read_uint64() - else: - self.first = first - self.second = second - - def __cmp__(self, other): - if other == None: - return 1 - if self.first < other.first: - return -1 - if self.first > other.first: - return 1 - if self.second < other.second: - return -1 - if self.second > other.second: - return 1 - return 0 - - - def index(self): - return (self.first, self.second) - - def getFlags(self): - return (self.first & 0xF000000000000000) >> 60 - - def getSequence(self): - return (self.first & 0x0FFF000000000000) >> 48 - - def getBroker(self): - return (self.first & 0x0000FFFFF0000000) >> 28 - - def getBank(self): - return self.first & 0x000000000FFFFFFF - - def getObject(self): - return self.second - - def isDurable(self): - return self.getSequence() == 0 - - def encode(self, codec): - codec.write_uint64(self.first) - codec.write_uint64(self.second) - - -class methodResult: - """ Object that contains the result of a method call """ - - def __init__ (self, status, sText, args): - self.status = status - self.statusText = sText - for arg in args: - setattr (self, arg, args[arg]) - -class brokerInfo: - """ Object that contains information about a broker and the session to it """ - - def __init__ (self, brokerId, sessionId): - self.brokerId = brokerId - self.sessionId = sessionId - -class managementChannel: - """ This class represents a connection to an AMQP broker. """ - - def __init__ (self, ssn, topicCb, replyCb, exceptionCb, cbContext, _detlife=0): - """ Given a channel on an established AMQP broker connection, this method - opens a session and performs all of the declarations and bindings needed - to participate in the management protocol. """ - self.enabled = True - self.ssn = ssn - self.sessionId = ssn.name - self.topicName = "mgmt-%s" % self.sessionId - self.replyName = "repl-%s" % self.sessionId - self.qpidChannel = ssn - self.tcb = topicCb - self.rcb = replyCb - self.ecb = exceptionCb - self.context = cbContext - self.reqsOutstanding = 0 - self.brokerInfo = None - - ssn.auto_sync = False - ssn.queue_declare (queue=self.topicName, exclusive=True, auto_delete=True) - ssn.queue_declare (queue=self.replyName, exclusive=True, auto_delete=True) - - ssn.exchange_bind (exchange="amq.direct", - queue=self.replyName, binding_key=self.replyName) - ssn.message_subscribe (queue=self.topicName, destination="tdest", - accept_mode=ssn.accept_mode.none, - acquire_mode=ssn.acquire_mode.pre_acquired) - ssn.message_subscribe (queue=self.replyName, destination="rdest", - accept_mode=ssn.accept_mode.none, - acquire_mode=ssn.acquire_mode.pre_acquired) - - ssn.incoming ("tdest").listen (self.topicCb, self.exceptionCb) - ssn.incoming ("rdest").listen (self.replyCb) - - ssn.message_set_flow_mode (destination="tdest", flow_mode=1) - ssn.message_flow (destination="tdest", unit=0, value=0xFFFFFFFFL) - ssn.message_flow (destination="tdest", unit=1, value=0xFFFFFFFFL) - - ssn.message_set_flow_mode (destination="rdest", flow_mode=1) - ssn.message_flow (destination="rdest", unit=0, value=0xFFFFFFFFL) - ssn.message_flow (destination="rdest", unit=1, value=0xFFFFFFFFL) - - def setBrokerInfo (self, data): - self.brokerInfo = data - - def shutdown (self): - self.enabled = False - self.ssn.incoming("tdest").stop() - self.ssn.incoming("rdest").stop() - - def topicCb (self, msg): - """ Receive messages via the topic queue on this channel. """ - if self.enabled: - self.tcb (self, msg) - self.ssn.receiver._completed.add(msg.id) - self.ssn.channel.session_completed(self.ssn.receiver._completed) - - def replyCb (self, msg): - """ Receive messages via the reply queue on this channel. """ - if self.enabled: - self.rcb (self, msg) - self.ssn.receiver._completed.add(msg.id) - self.ssn.channel.session_completed(self.ssn.receiver._completed) - - def exceptionCb (self, data): - if self.ecb != None: - self.ecb (self, data) - - def send (self, exchange, msg): - if self.enabled: - self.qpidChannel.message_transfer (destination=exchange, message=msg) - - def message (self, body, routing_key="broker"): - dp = self.qpidChannel.delivery_properties() - dp.routing_key = routing_key - mp = self.qpidChannel.message_properties() - mp.content_type = "application/octet-stream" - mp.reply_to = self.qpidChannel.reply_to("amq.direct", self.replyName) - return Message(dp, mp, body) - - -class managementClient: - """ This class provides an API for access to management data on the AMQP - network. It implements the management protocol and manages the management - schemas as advertised by the various management agents in the network. """ - - CTRL_BROKER_INFO = 1 - CTRL_SCHEMA_LOADED = 2 - CTRL_USER = 3 - CTRL_HEARTBEAT = 4 - - SYNC_TIME = 10.0 - - #======================================================== - # User API - interacts with the class's user - #======================================================== - def __init__ (self, unused=None, ctrlCb=None, configCb=None, instCb=None, methodCb=None, closeCb=None): - self.ctrlCb = ctrlCb - self.configCb = configCb - self.instCb = instCb - self.methodCb = methodCb - self.closeCb = closeCb - self.schemaCb = None - self.eventCb = None - self.channels = [] - self.seqMgr = SequenceManager () - self.schema = {} - self.packages = {} - self.cv = Condition () - self.syncInFlight = False - self.syncSequence = 0 - self.syncResult = None - - def schemaListener (self, schemaCb): - """ Optionally register a callback to receive details of the schema of - managed objects in the network. """ - self.schemaCb = schemaCb - - def eventListener (self, eventCb): - """ Optionally register a callback to receive events from managed objects - in the network. """ - self.eventCb = eventCb - - def addChannel (self, channel, cbContext=None): - """ Register a new channel. """ - mch = managementChannel (channel, self.topicCb, self.replyCb, self.exceptCb, cbContext) - - self.channels.append (mch) - self.incOutstanding (mch) - codec = Codec () - self.setHeader (codec, ord ('B')) - msg = mch.message(codec.encoded) - mch.send ("qpid.management", msg) - return mch - - def removeChannel (self, mch): - """ Remove a previously added channel from management. """ - mch.shutdown () - self.channels.remove (mch) - - def callMethod (self, channel, userSequence, objId, className, methodName, args=None): - """ Invoke a method on a managed object. """ - self.method (channel, userSequence, objId, className, methodName, args) - - def getObjects (self, channel, userSequence, className, bank=0): - """ Request immediate content from broker """ - codec = Codec () - self.setHeader (codec, ord ('G'), userSequence) - ft = {} - ft["_class"] = className - codec.write_map (ft) - msg = channel.message(codec.encoded, routing_key="agent.1.%d" % bank) - channel.send ("qpid.management", msg) - - def syncWaitForStable (self, channel): - """ Synchronous (blocking) call to wait for schema stability on a channel """ - self.cv.acquire () - if channel.reqsOutstanding == 0: - self.cv.release () - return channel.brokerInfo - - self.syncInFlight = True - starttime = time () - while channel.reqsOutstanding != 0: - self.cv.wait (self.SYNC_TIME) - if time () - starttime > self.SYNC_TIME: - self.cv.release () - raise RuntimeError ("Timed out waiting for response on channel") - self.cv.release () - return channel.brokerInfo - - def syncCallMethod (self, channel, objId, className, methodName, args=None): - """ Synchronous (blocking) method call """ - self.cv.acquire () - self.syncInFlight = True - self.syncResult = None - self.syncSequence = self.seqMgr.reserve ("sync") - self.cv.release () - self.callMethod (channel, self.syncSequence, objId, className, methodName, args) - self.cv.acquire () - starttime = time () - while self.syncInFlight: - self.cv.wait (self.SYNC_TIME) - if time () - starttime > self.SYNC_TIME: - self.cv.release () - raise RuntimeError ("Timed out waiting for response on channel") - result = self.syncResult - self.cv.release () - return result - - def syncGetObjects (self, channel, className, bank=0): - """ Synchronous (blocking) get call """ - self.cv.acquire () - self.syncInFlight = True - self.syncResult = [] - self.syncSequence = self.seqMgr.reserve ("sync") - self.cv.release () - self.getObjects (channel, self.syncSequence, className, bank) - self.cv.acquire () - starttime = time () - while self.syncInFlight: - self.cv.wait (self.SYNC_TIME) - if time () - starttime > self.SYNC_TIME: - self.cv.release () - raise RuntimeError ("Timed out waiting for response on channel") - result = self.syncResult - self.cv.release () - return result - - #======================================================== - # Channel API - interacts with registered channel objects - #======================================================== - def topicCb (self, ch, msg): - """ Receive messages via the topic queue of a particular channel. """ - codec = Codec (msg.body) - while True: - hdr = self.checkHeader (codec) - if hdr == None: - return - - if hdr[0] == 'p': - self.handlePackageInd (ch, codec) - elif hdr[0] == 'q': - self.handleClassInd (ch, codec) - elif hdr[0] == 'h': - self.handleHeartbeat (ch, codec) - elif hdr[0] == 'e': - self.handleEvent (ch, codec) - else: - self.parse (ch, codec, hdr[0], hdr[1]) - - def replyCb (self, ch, msg): - """ Receive messages via the reply queue of a particular channel. """ - codec = Codec (msg.body) - while True: - hdr = self.checkHeader (codec) - if hdr == None: - return - - if hdr[0] == 'm': - self.handleMethodReply (ch, codec, hdr[1]) - elif hdr[0] == 'z': - self.handleCommandComplete (ch, codec, hdr[1]) - elif hdr[0] == 'b': - self.handleBrokerResponse (ch, codec) - elif hdr[0] == 'p': - self.handlePackageInd (ch, codec) - elif hdr[0] == 'q': - self.handleClassInd (ch, codec) - else: - self.parse (ch, codec, hdr[0], hdr[1]) - - def exceptCb (self, ch, data): - if self.closeCb != None: - self.closeCb (ch.context, data) - - #======================================================== - # Internal Functions - #======================================================== - def setHeader (self, codec, opcode, seq = 0): - """ Compose the header of a management message. """ - codec.write_uint8 (ord ('A')) - codec.write_uint8 (ord ('M')) - codec.write_uint8 (ord ('2')) - codec.write_uint8 (opcode) - codec.write_uint32 (seq) - - def checkHeader (self, codec): - """ Check the header of a management message and extract the opcode and class. """ - try: - octet = chr (codec.read_uint8 ()) - if octet != 'A': - return None - octet = chr (codec.read_uint8 ()) - if octet != 'M': - return None - octet = chr (codec.read_uint8 ()) - if octet != '2': - return None - opcode = chr (codec.read_uint8 ()) - seq = codec.read_uint32 () - return (opcode, seq) - except: - return None - - def encodeValue (self, codec, value, typecode): - """ Encode, into the codec, a value based on its typecode. """ - if typecode == 1: - codec.write_uint8 (int (value)) - elif typecode == 2: - codec.write_uint16 (int (value)) - elif typecode == 3: - codec.write_uint32 (long (value)) - elif typecode == 4: - codec.write_uint64 (long (value)) - elif typecode == 5: - codec.write_uint8 (int (value)) - elif typecode == 6: - codec.write_str8 (value) - elif typecode == 7: - codec.write_str16 (value) - elif typecode == 8: # ABSTIME - codec.write_uint64 (long (value)) - elif typecode == 9: # DELTATIME - codec.write_uint64 (long (value)) - elif typecode == 10: # REF - value.encode(codec) - elif typecode == 11: # BOOL - codec.write_uint8 (int (value)) - elif typecode == 12: # FLOAT - codec.write_float (float (value)) - elif typecode == 13: # DOUBLE - codec.write_double (float (value)) - elif typecode == 14: # UUID - codec.write_uuid (value) - elif typecode == 15: # FTABLE - codec.write_map (value) - elif typecode == 16: - codec.write_int8 (int(value)) - elif typecode == 17: - codec.write_int16 (int(value)) - elif typecode == 18: - codec.write_int32 (int(value)) - elif typecode == 19: - codec.write_int64 (int(value)) - else: - raise ValueError ("Invalid type code: %d" % typecode) - - def decodeValue (self, codec, typecode): - """ Decode, from the codec, a value based on its typecode. """ - if typecode == 1: - data = codec.read_uint8 () - elif typecode == 2: - data = codec.read_uint16 () - elif typecode == 3: - data = codec.read_uint32 () - elif typecode == 4: - data = codec.read_uint64 () - elif typecode == 5: - data = codec.read_uint8 () - elif typecode == 6: - data = codec.read_str8 () - elif typecode == 7: - data = codec.read_str16 () - elif typecode == 8: # ABSTIME - data = codec.read_uint64 () - elif typecode == 9: # DELTATIME - data = codec.read_uint64 () - elif typecode == 10: # REF - data = objectId(codec) - elif typecode == 11: # BOOL - data = codec.read_uint8 () - elif typecode == 12: # FLOAT - data = codec.read_float () - elif typecode == 13: # DOUBLE - data = codec.read_double () - elif typecode == 14: # UUID - data = codec.read_uuid () - elif typecode == 15: # FTABLE - data = codec.read_map () - elif typecode == 16: - data = codec.read_int8 () - elif typecode == 17: - data = codec.read_int16 () - elif typecode == 18: - data = codec.read_int32 () - elif typecode == 19: - data = codec.read_int64 () - else: - raise ValueError ("Invalid type code: %d" % typecode) - return data - - def incOutstanding (self, ch): - self.cv.acquire () - ch.reqsOutstanding = ch.reqsOutstanding + 1 - self.cv.release () - - def decOutstanding (self, ch): - self.cv.acquire () - ch.reqsOutstanding = ch.reqsOutstanding - 1 - if ch.reqsOutstanding == 0 and self.syncInFlight: - self.syncInFlight = False - self.cv.notify () - self.cv.release () - - if ch.reqsOutstanding == 0: - if self.ctrlCb != None: - self.ctrlCb (ch.context, self.CTRL_SCHEMA_LOADED, None) - ch.ssn.exchange_bind (exchange="qpid.management", - queue=ch.topicName, binding_key="console.#") - ch.ssn.exchange_bind (exchange="qpid.management", - queue=ch.topicName, binding_key="schema.#") - - - def handleMethodReply (self, ch, codec, sequence): - status = codec.read_uint32 () - sText = codec.read_str16 () - - data = self.seqMgr.release (sequence) - if data == None: - return - - (userSequence, classId, methodName) = data - args = {} - context = self.seqMgr.release (userSequence) - - if status == 0: - schemaClass = self.schema[classId] - ms = schemaClass['M'] - arglist = None - for mname in ms: - (mdesc, margs) = ms[mname] - if mname == methodName: - arglist = margs - if arglist == None: - return - - for arg in arglist: - if arg[2].find("O") != -1: - args[arg[0]] = self.decodeValue (codec, arg[1]) - - if context == "sync" and userSequence == self.syncSequence: - self.cv.acquire () - self.syncInFlight = False - self.syncResult = methodResult (status, sText, args) - self.cv.notify () - self.cv.release () - elif self.methodCb != None: - self.methodCb (ch.context, userSequence, status, sText, args) - - def handleCommandComplete (self, ch, codec, seq): - code = codec.read_uint32 () - text = codec.read_str8 () - data = (seq, code, text) - context = self.seqMgr.release (seq) - if context == "outstanding": - self.decOutstanding (ch) - elif context == "sync" and seq == self.syncSequence: - self.cv.acquire () - self.syncInFlight = False - self.cv.notify () - self.cv.release () - elif self.ctrlCb != None: - self.ctrlCb (ch.context, self.CTRL_USER, data) - - def handleBrokerResponse (self, ch, codec): - uuid = codec.read_uuid () - ch.brokerInfo = brokerInfo (uuid, ch.sessionId) - if self.ctrlCb != None: - self.ctrlCb (ch.context, self.CTRL_BROKER_INFO, ch.brokerInfo) - - # Send a package request - sendCodec = Codec () - seq = self.seqMgr.reserve ("outstanding") - self.setHeader (sendCodec, ord ('P'), seq) - smsg = ch.message(sendCodec.encoded) - ch.send ("qpid.management", smsg) - - def handlePackageInd (self, ch, codec): - pname = codec.read_str8 () - if pname not in self.packages: - self.packages[pname] = {} - - # Send a class request - sendCodec = Codec () - seq = self.seqMgr.reserve ("outstanding") - self.setHeader (sendCodec, ord ('Q'), seq) - self.incOutstanding (ch) - sendCodec.write_str8 (pname) - smsg = ch.message(sendCodec.encoded) - ch.send ("qpid.management", smsg) - - def handleClassInd (self, ch, codec): - kind = codec.read_uint8() - if kind != 1: # This API doesn't handle new-style events - return - pname = codec.read_str8() - cname = codec.read_str8() - hash = codec.read_bin128() - if pname not in self.packages: - return - - if (cname, hash) not in self.packages[pname]: - # Send a schema request - sendCodec = Codec () - seq = self.seqMgr.reserve ("outstanding") - self.setHeader (sendCodec, ord ('S'), seq) - self.incOutstanding (ch) - sendCodec.write_str8 (pname) - sendCodec.write_str8 (cname) - sendCodec.write_bin128 (hash) - smsg = ch.message(sendCodec.encoded) - ch.send ("qpid.management", smsg) - - def handleHeartbeat (self, ch, codec): - timestamp = codec.read_uint64() - if self.ctrlCb != None: - self.ctrlCb (ch.context, self.CTRL_HEARTBEAT, timestamp) - - def handleEvent (self, ch, codec): - if self.eventCb == None: - return - timestamp = codec.read_uint64() - objId = objectId(codec) - packageName = codec.read_str8() - className = codec.read_str8() - hash = codec.read_bin128() - name = codec.read_str8() - classKey = (packageName, className, hash) - if classKey not in self.schema: - return; - schemaClass = self.schema[classKey] - row = [] - es = schemaClass['E'] - arglist = None - for ename in es: - (edesc, eargs) = es[ename] - if ename == name: - arglist = eargs - if arglist == None: - return - for arg in arglist: - row.append((arg[0], self.decodeValue(codec, arg[1]))) - self.eventCb(ch.context, classKey, objId, name, row) - - def parseSchema (self, ch, codec): - """ Parse a received schema-description message. """ - self.decOutstanding (ch) - kind = codec.read_uint8() - if kind != 1: # This API doesn't handle new-style events - return - packageName = codec.read_str8 () - className = codec.read_str8 () - hash = codec.read_bin128 () - hasSupertype = 0 #codec.read_uint8() - configCount = codec.read_uint16 () - instCount = codec.read_uint16 () - methodCount = codec.read_uint16 () - if hasSupertype != 0: - supertypePackage = codec.read_str8() - supertypeClass = codec.read_str8() - supertypeHash = codec.read_bin128() - - if packageName not in self.packages: - return - if (className, hash) in self.packages[packageName]: - return - - classKey = (packageName, className, hash) - if classKey in self.schema: - return - - configs = [] - insts = [] - methods = {} - - configs.append (("id", 4, "", "", 1, 1, None, None, None, None, None)) - insts.append (("id", 4, None, None)) - - for idx in range (configCount): - ft = codec.read_map () - name = str (ft["name"]) - type = ft["type"] - access = ft["access"] - index = ft["index"] - optional = ft["optional"] - unit = None - min = None - max = None - maxlen = None - desc = None - - for key, value in ft.items (): - if key == "unit": - unit = str (value) - elif key == "min": - min = value - elif key == "max": - max = value - elif key == "maxlen": - maxlen = value - elif key == "desc": - desc = str (value) - - config = (name, type, unit, desc, access, index, min, max, maxlen, optional) - configs.append (config) - - for idx in range (instCount): - ft = codec.read_map () - name = str (ft["name"]) - type = ft["type"] - unit = None - desc = None - - for key, value in ft.items (): - if key == "unit": - unit = str (value) - elif key == "desc": - desc = str (value) - - inst = (name, type, unit, desc) - insts.append (inst) - - for idx in range (methodCount): - ft = codec.read_map () - mname = str (ft["name"]) - argCount = ft["argCount"] - if "desc" in ft: - mdesc = str (ft["desc"]) - else: - mdesc = None - - args = [] - for aidx in range (argCount): - ft = codec.read_map () - name = str (ft["name"]) - type = ft["type"] - dir = str (ft["dir"].upper ()) - unit = None - min = None - max = None - maxlen = None - desc = None - default = None - - for key, value in ft.items (): - if key == "unit": - unit = str (value) - elif key == "min": - min = value - elif key == "max": - max = value - elif key == "maxlen": - maxlen = value - elif key == "desc": - desc = str (value) - elif key == "default": - default = str (value) - - arg = (name, type, dir, unit, desc, min, max, maxlen, default) - args.append (arg) - methods[mname] = (mdesc, args) - - schemaClass = {} - schemaClass['C'] = configs - schemaClass['I'] = insts - schemaClass['M'] = methods - self.schema[classKey] = schemaClass - - if self.schemaCb != None: - self.schemaCb (ch.context, classKey, configs, insts, methods, {}) - - def parsePresenceMasks(self, codec, schemaClass): - """ Generate a list of not-present properties """ - excludeList = [] - bit = 0 - for element in schemaClass['C'][1:]: - if element[9] == 1: - if bit == 0: - mask = codec.read_uint8() - bit = 1 - if (mask & bit) == 0: - excludeList.append(element[0]) - bit = bit * 2 - if bit == 256: - bit = 0 - return excludeList - - def parseContent (self, ch, cls, codec, seq=0): - """ Parse a received content message. """ - if (cls == 'C' or (cls == 'B' and seq == 0)) and self.configCb == None: - return - if cls == 'I' and self.instCb == None: - return - - packageName = codec.read_str8 () - className = codec.read_str8 () - hash = codec.read_bin128 () - classKey = (packageName, className, hash) - - if classKey not in self.schema: - return - - row = [] - timestamps = [] - - timestamps.append (codec.read_uint64 ()) # Current Time - timestamps.append (codec.read_uint64 ()) # Create Time - timestamps.append (codec.read_uint64 ()) # Delete Time - objId = objectId(codec) - schemaClass = self.schema[classKey] - if cls == 'C' or cls == 'B': - notPresent = self.parsePresenceMasks(codec, schemaClass) - - if cls == 'C' or cls == 'B': - row.append(("id", objId)) - for element in schemaClass['C'][1:]: - tc = element[1] - name = element[0] - if name in notPresent: - row.append((name, None)) - else: - data = self.decodeValue(codec, tc) - row.append((name, data)) - - if cls == 'I' or cls == 'B': - if cls == 'I': - row.append(("id", objId)) - for element in schemaClass['I'][1:]: - tc = element[1] - name = element[0] - data = self.decodeValue (codec, tc) - row.append ((name, data)) - - if cls == 'C' or (cls == 'B' and seq != self.syncSequence): - self.configCb (ch.context, classKey, row, timestamps) - elif cls == 'B' and seq == self.syncSequence: - if timestamps[2] == 0: - obj = mgmtObject (classKey, timestamps, row) - self.syncResult.append (obj) - elif cls == 'I': - self.instCb (ch.context, classKey, row, timestamps) - - def parse (self, ch, codec, opcode, seq): - """ Parse a message received from the topic queue. """ - if opcode == 's': - self.parseSchema (ch, codec) - elif opcode == 'c': - self.parseContent (ch, 'C', codec) - elif opcode == 'i': - self.parseContent (ch, 'I', codec) - elif opcode == 'g': - self.parseContent (ch, 'B', codec, seq) - else: - raise ValueError ("Unknown opcode: %c" % opcode); - - def method (self, channel, userSequence, objId, classId, methodName, args): - """ Invoke a method on an object """ - codec = Codec () - sequence = self.seqMgr.reserve ((userSequence, classId, methodName)) - self.setHeader (codec, ord ('M'), sequence) - objId.encode(codec) - codec.write_str8 (classId[0]) - codec.write_str8 (classId[1]) - codec.write_bin128 (classId[2]) - codec.write_str8 (methodName) - bank = "%d.%d" % (objId.getBroker(), objId.getBank()) - - # Encode args according to schema - if classId not in self.schema: - self.seqMgr.release (sequence) - raise ValueError ("Unknown class name: %s" % classId) - - schemaClass = self.schema[classId] - ms = schemaClass['M'] - arglist = None - for mname in ms: - (mdesc, margs) = ms[mname] - if mname == methodName: - arglist = margs - if arglist == None: - self.seqMgr.release (sequence) - raise ValueError ("Unknown method name: %s" % methodName) - - for arg in arglist: - if arg[2].find("I") != -1: - value = arg[8] # default - if arg[0] in args: - value = args[arg[0]] - if value == None: - self.seqMgr.release (sequence) - raise ValueError ("Missing non-defaulted argument: %s" % arg[0]) - self.encodeValue (codec, value, arg[1]) - - packageName = classId[0] - className = classId[1] - msg = channel.message(codec.encoded, "agent." + bank) - channel.send ("qpid.management", msg) |