diff options
Diffstat (limited to 'cpp/src/qmf/engine/Agent.cpp')
-rw-r--r-- | cpp/src/qmf/engine/Agent.cpp | 915 |
1 files changed, 0 insertions, 915 deletions
diff --git a/cpp/src/qmf/engine/Agent.cpp b/cpp/src/qmf/engine/Agent.cpp deleted file mode 100644 index 1f08dded94..0000000000 --- a/cpp/src/qmf/engine/Agent.cpp +++ /dev/null @@ -1,915 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#include "qmf/engine/Agent.h" -#include "qmf/engine/MessageImpl.h" -#include "qmf/engine/SchemaImpl.h" -#include "qmf/engine/Typecode.h" -#include "qmf/engine/EventImpl.h" -#include "qmf/engine/ObjectImpl.h" -#include "qmf/engine/ObjectIdImpl.h" -#include "qmf/engine/QueryImpl.h" -#include "qmf/engine/ValueImpl.h" -#include "qmf/engine/Protocol.h" -#include <qpid/framing/Buffer.h> -#include <qpid/framing/Uuid.h> -#include <qpid/framing/FieldTable.h> -#include <qpid/framing/FieldValue.h> -#include <qpid/sys/Mutex.h> -#include <qpid/log/Statement.h> -#include <qpid/sys/Time.h> -#include <string.h> -#include <string> -#include <deque> -#include <map> -#include <iostream> -#include <fstream> -#include <boost/shared_ptr.hpp> -#include <boost/noncopyable.hpp> - -using namespace std; -using namespace qmf::engine; -using namespace qpid::framing; -using namespace qpid::sys; - -namespace qmf { -namespace engine { - - struct AgentEventImpl { - typedef boost::shared_ptr<AgentEventImpl> Ptr; - AgentEvent::EventKind kind; - uint32_t sequence; - string authUserId; - string authToken; - string name; - Object* object; - boost::shared_ptr<ObjectId> objectId; - boost::shared_ptr<Query> query; - boost::shared_ptr<Value> arguments; - string exchange; - string bindingKey; - const SchemaObjectClass* objectClass; - - AgentEventImpl(AgentEvent::EventKind k) : - kind(k), sequence(0), object(0), objectClass(0) {} - ~AgentEventImpl() {} - AgentEvent copy(); - }; - - struct AgentQueryContext { - typedef boost::shared_ptr<AgentQueryContext> Ptr; - uint32_t sequence; - string exchange; - string key; - const SchemaMethod* schemaMethod; - AgentQueryContext() : schemaMethod(0) {} - }; - - class AgentImpl : public boost::noncopyable { - public: - AgentImpl(char* label, bool internalStore); - ~AgentImpl(); - - void setStoreDir(const char* path); - void setTransferDir(const char* path); - void handleRcvMessage(Message& message); - bool getXmtMessage(Message& item) const; - void popXmt(); - bool getEvent(AgentEvent& event) const; - void popEvent(); - void newSession(); - void startProtocol(); - void heartbeat(); - void methodResponse(uint32_t sequence, uint32_t status, char* text, const Value& arguments); - void queryResponse(uint32_t sequence, Object& object, bool prop, bool stat); - void queryComplete(uint32_t sequence); - void registerClass(SchemaObjectClass* cls); - void registerClass(SchemaEventClass* cls); - const ObjectId* addObject(Object& obj, uint64_t persistId); - const ObjectId* allocObjectId(uint64_t persistId); - const ObjectId* allocObjectId(uint32_t persistIdLo, uint32_t persistIdHi); - void raiseEvent(Event& event); - - private: - mutable Mutex lock; - Mutex addLock; - string label; - string queueName; - string storeDir; - string transferDir; - bool internalStore; - uint64_t nextTransientId; - Uuid systemId; - uint32_t requestedBrokerBank; - uint32_t requestedAgentBank; - uint32_t assignedBrokerBank; - uint32_t assignedAgentBank; - AgentAttachment attachment; - uint16_t bootSequence; - uint64_t nextObjectId; - uint32_t nextContextNum; - deque<AgentEventImpl::Ptr> eventQueue; - deque<MessageImpl::Ptr> xmtQueue; - map<uint32_t, AgentQueryContext::Ptr> contextMap; - bool attachComplete; - - static const char* QMF_EXCHANGE; - static const char* DIR_EXCHANGE; - static const char* BROKER_KEY; - static const uint32_t MERR_UNKNOWN_METHOD = 2; - static const uint32_t MERR_UNKNOWN_PACKAGE = 8; - static const uint32_t MERR_UNKNOWN_CLASS = 9; - static const uint32_t MERR_INTERNAL_ERROR = 10; -# define MA_BUFFER_SIZE 65536 - char outputBuffer[MA_BUFFER_SIZE]; - - struct AgentClassKey { - string name; - uint8_t hash[16]; - AgentClassKey(const string& n, const uint8_t* h) : name(n) { - memcpy(hash, h, 16); - } - AgentClassKey(Buffer& buffer) { - buffer.getShortString(name); - buffer.getBin128(hash); - } - string repr() { - return name; - } - }; - - struct AgentClassKeyComp { - bool operator() (const AgentClassKey& lhs, const AgentClassKey& rhs) const - { - if (lhs.name != rhs.name) - return lhs.name < rhs.name; - else - for (int i = 0; i < 16; i++) - if (lhs.hash[i] != rhs.hash[i]) - return lhs.hash[i] < rhs.hash[i]; - return false; - } - }; - - typedef map<AgentClassKey, SchemaObjectClass*, AgentClassKeyComp> ObjectClassMap; - typedef map<AgentClassKey, SchemaEventClass*, AgentClassKeyComp> EventClassMap; - - struct ClassMaps { - ObjectClassMap objectClasses; - EventClassMap eventClasses; - }; - - map<string, ClassMaps> packages; - - AgentEventImpl::Ptr eventDeclareQueue(const string& queueName); - AgentEventImpl::Ptr eventBind(const string& exchange, const string& queue, const string& key); - AgentEventImpl::Ptr eventSetupComplete(); - AgentEventImpl::Ptr eventQuery(uint32_t num, const string& userId, const string& package, const string& cls, - boost::shared_ptr<ObjectId> oid); - AgentEventImpl::Ptr eventMethod(uint32_t num, const string& userId, const string& method, - boost::shared_ptr<ObjectId> oid, boost::shared_ptr<Value> argMap, - const SchemaObjectClass* objectClass); - void sendBufferLH(Buffer& buf, const string& destination, const string& routingKey); - - void sendPackageIndicationLH(const string& packageName); - void sendClassIndicationLH(ClassKind kind, const string& packageName, const AgentClassKey& key); - void sendCommandCompleteLH(const string& exchange, const string& key, uint32_t seq, - uint32_t code = 0, const string& text = "OK"); - void sendMethodErrorLH(uint32_t sequence, const string& key, uint32_t code, const string& text=""); - void handleAttachResponse(Buffer& inBuffer); - void handlePackageRequest(Buffer& inBuffer); - void handleClassQuery(Buffer& inBuffer); - void handleSchemaRequest(Buffer& inBuffer, uint32_t sequence, - const string& replyToExchange, const string& replyToKey); - void handleGetQuery(Buffer& inBuffer, uint32_t sequence, const string& replyTo, const string& userId); - void handleMethodRequest(Buffer& inBuffer, uint32_t sequence, const string& replyTo, const string& userId); - void handleConsoleAddedIndication(); - }; -} -} - -const char* AgentImpl::QMF_EXCHANGE = "qpid.management"; -const char* AgentImpl::DIR_EXCHANGE = "amq.direct"; -const char* AgentImpl::BROKER_KEY = "broker"; - -#define STRING_REF(s) {if (!s.empty()) item.s = const_cast<char*>(s.c_str());} - -AgentEvent AgentEventImpl::copy() -{ - AgentEvent item; - - ::memset(&item, 0, sizeof(AgentEvent)); - item.kind = kind; - item.sequence = sequence; - item.object = object; - item.objectId = objectId.get(); - item.query = query.get(); - item.arguments = arguments.get(); - item.objectClass = objectClass; - - STRING_REF(authUserId); - STRING_REF(authToken); - STRING_REF(name); - STRING_REF(exchange); - STRING_REF(bindingKey); - - return item; -} - -AgentImpl::AgentImpl(char* _label, bool i) : - label(_label), queueName("qmfa-"), internalStore(i), nextTransientId(1), - requestedBrokerBank(0), requestedAgentBank(0), - assignedBrokerBank(0), assignedAgentBank(0), - bootSequence(1), nextObjectId(1), nextContextNum(1), attachComplete(false) -{ - queueName += Uuid(true).str(); -} - -AgentImpl::~AgentImpl() -{ -} - -void AgentImpl::setStoreDir(const char* path) -{ - Mutex::ScopedLock _lock(lock); - if (path) - storeDir = path; - else - storeDir.clear(); -} - -void AgentImpl::setTransferDir(const char* path) -{ - Mutex::ScopedLock _lock(lock); - if (path) - transferDir = path; - else - transferDir.clear(); -} - -void AgentImpl::handleRcvMessage(Message& message) -{ - Buffer inBuffer(message.body, message.length); - uint8_t opcode; - uint32_t sequence; - string replyToExchange(message.replyExchange ? message.replyExchange : ""); - string replyToKey(message.replyKey ? message.replyKey : ""); - string userId(message.userId ? message.userId : ""); - - while (Protocol::checkHeader(inBuffer, &opcode, &sequence)) { - if (opcode == Protocol::OP_ATTACH_RESPONSE) handleAttachResponse(inBuffer); - else if (opcode == Protocol::OP_SCHEMA_REQUEST) handleSchemaRequest(inBuffer, sequence, replyToExchange, replyToKey); - else if (opcode == Protocol::OP_CONSOLE_ADDED_INDICATION) handleConsoleAddedIndication(); - else if (opcode == Protocol::OP_GET_QUERY) handleGetQuery(inBuffer, sequence, replyToKey, userId); - else if (opcode == Protocol::OP_METHOD_REQUEST) handleMethodRequest(inBuffer, sequence, replyToKey, userId); - else { - QPID_LOG(error, "AgentImpl::handleRcvMessage invalid opcode=" << opcode); - break; - } - } -} - -bool AgentImpl::getXmtMessage(Message& item) const -{ - Mutex::ScopedLock _lock(lock); - if (xmtQueue.empty()) - return false; - item = xmtQueue.front()->copy(); - return true; -} - -void AgentImpl::popXmt() -{ - Mutex::ScopedLock _lock(lock); - if (!xmtQueue.empty()) - xmtQueue.pop_front(); -} - -bool AgentImpl::getEvent(AgentEvent& event) const -{ - Mutex::ScopedLock _lock(lock); - if (eventQueue.empty()) - return false; - event = eventQueue.front()->copy(); - return true; -} - -void AgentImpl::popEvent() -{ - Mutex::ScopedLock _lock(lock); - if (!eventQueue.empty()) - eventQueue.pop_front(); -} - -void AgentImpl::newSession() -{ - Mutex::ScopedLock _lock(lock); - eventQueue.clear(); - xmtQueue.clear(); - eventQueue.push_back(eventDeclareQueue(queueName)); - eventQueue.push_back(eventBind("amq.direct", queueName, queueName)); - eventQueue.push_back(eventSetupComplete()); -} - -void AgentImpl::startProtocol() -{ - Mutex::ScopedLock _lock(lock); - char rawbuffer[512]; - Buffer buffer(rawbuffer, 512); - - Protocol::encodeHeader(buffer, Protocol::OP_ATTACH_REQUEST); - buffer.putShortString(label); - systemId.encode(buffer); - buffer.putLong(requestedBrokerBank); - buffer.putLong(requestedAgentBank); - sendBufferLH(buffer, QMF_EXCHANGE, BROKER_KEY); - QPID_LOG(trace, "SENT AttachRequest: reqBroker=" << requestedBrokerBank << - " reqAgent=" << requestedAgentBank); -} - -void AgentImpl::heartbeat() -{ - Mutex::ScopedLock _lock(lock); - Buffer buffer(outputBuffer, MA_BUFFER_SIZE); - - Protocol::encodeHeader(buffer, Protocol::OP_HEARTBEAT_INDICATION); - buffer.putLongLong(uint64_t(Duration(EPOCH, now()))); - stringstream key; - key << "console.heartbeat." << assignedBrokerBank << "." << assignedAgentBank; - sendBufferLH(buffer, QMF_EXCHANGE, key.str()); - QPID_LOG(trace, "SENT HeartbeatIndication"); -} - -void AgentImpl::methodResponse(uint32_t sequence, uint32_t status, char* text, const Value& argMap) -{ - Mutex::ScopedLock _lock(lock); - map<uint32_t, AgentQueryContext::Ptr>::iterator iter = contextMap.find(sequence); - if (iter == contextMap.end()) - return; - AgentQueryContext::Ptr context = iter->second; - contextMap.erase(iter); - - char* buf(outputBuffer); - uint32_t bufLen(114 + strlen(text)); // header(8) + status(4) + mstring(2 + size) + margin(100) - bool allocated(false); - - if (status == 0) { - for (vector<const SchemaArgument*>::const_iterator aIter = context->schemaMethod->impl->arguments.begin(); - aIter != context->schemaMethod->impl->arguments.end(); aIter++) { - const SchemaArgument* schemaArg = *aIter; - if (schemaArg->getDirection() == DIR_OUT || schemaArg->getDirection() == DIR_IN_OUT) { - if (argMap.keyInMap(schemaArg->getName())) { - const Value* val = argMap.byKey(schemaArg->getName()); - bufLen += val->impl->encodedSize(); - } else { - Value val(schemaArg->getType()); - bufLen += val.impl->encodedSize(); - } - } - } - } - - if (bufLen > MA_BUFFER_SIZE) { - buf = (char*) malloc(bufLen); - allocated = true; - } - - Buffer buffer(buf, bufLen); - Protocol::encodeHeader(buffer, Protocol::OP_METHOD_RESPONSE, context->sequence); - buffer.putLong(status); - buffer.putMediumString(text); - if (status == 0) { - for (vector<const SchemaArgument*>::const_iterator aIter = context->schemaMethod->impl->arguments.begin(); - aIter != context->schemaMethod->impl->arguments.end(); aIter++) { - const SchemaArgument* schemaArg = *aIter; - if (schemaArg->getDirection() == DIR_OUT || schemaArg->getDirection() == DIR_IN_OUT) { - if (argMap.keyInMap(schemaArg->getName())) { - const Value* val = argMap.byKey(schemaArg->getName()); - val->impl->encode(buffer); - } else { - Value val(schemaArg->getType()); - val.impl->encode(buffer); - } - } - } - } - sendBufferLH(buffer, context->exchange, context->key); - if (allocated) - free(buf); - QPID_LOG(trace, "SENT MethodResponse seq=" << context->sequence << " status=" << status << " text=" << text); -} - -void AgentImpl::queryResponse(uint32_t sequence, Object& object, bool prop, bool stat) -{ - Mutex::ScopedLock _lock(lock); - map<uint32_t, AgentQueryContext::Ptr>::iterator iter = contextMap.find(sequence); - if (iter == contextMap.end()) - return; - AgentQueryContext::Ptr context = iter->second; - - Buffer buffer(outputBuffer, MA_BUFFER_SIZE); - Protocol::encodeHeader(buffer, Protocol::OP_OBJECT_INDICATION, context->sequence); - - object.impl->encodeSchemaKey(buffer); - object.impl->encodeManagedObjectData(buffer); - if (prop) - object.impl->encodeProperties(buffer); - if (stat) - object.impl->encodeStatistics(buffer); - - sendBufferLH(buffer, context->exchange, context->key); - QPID_LOG(trace, "SENT ContentIndication seq=" << context->sequence); -} - -void AgentImpl::queryComplete(uint32_t sequence) -{ - Mutex::ScopedLock _lock(lock); - map<uint32_t, AgentQueryContext::Ptr>::iterator iter = contextMap.find(sequence); - if (iter == contextMap.end()) - return; - - AgentQueryContext::Ptr context = iter->second; - contextMap.erase(iter); - sendCommandCompleteLH(context->exchange, context->key, context->sequence, 0, "OK"); -} - -void AgentImpl::registerClass(SchemaObjectClass* cls) -{ - Mutex::ScopedLock _lock(lock); - bool newPackage = false; - - map<string, ClassMaps>::iterator iter = packages.find(cls->getClassKey()->getPackageName()); - if (iter == packages.end()) { - packages[cls->getClassKey()->getPackageName()] = ClassMaps(); - iter = packages.find(cls->getClassKey()->getPackageName()); - newPackage = true; - } - - AgentClassKey key(cls->getClassKey()->getClassName(), cls->getClassKey()->getHash()); - iter->second.objectClasses[key] = cls; - - // Indicate this new schema if connected. - - if (attachComplete) { - - if (newPackage) { - sendPackageIndicationLH(iter->first); - } - sendClassIndicationLH(CLASS_OBJECT, iter->first, key); - } -} - -void AgentImpl::registerClass(SchemaEventClass* cls) -{ - Mutex::ScopedLock _lock(lock); - bool newPackage = false; - - map<string, ClassMaps>::iterator iter = packages.find(cls->getClassKey()->getPackageName()); - if (iter == packages.end()) { - packages[cls->getClassKey()->getPackageName()] = ClassMaps(); - iter = packages.find(cls->getClassKey()->getPackageName()); - newPackage = true; - } - - AgentClassKey key(cls->getClassKey()->getClassName(), cls->getClassKey()->getHash()); - iter->second.eventClasses[key] = cls; - - // Indicate this new schema if connected. - - if (attachComplete) { - - if (newPackage) { - sendPackageIndicationLH(iter->first); - } - sendClassIndicationLH(CLASS_EVENT, iter->first, key); - } -} - -const ObjectId* AgentImpl::addObject(Object&, uint64_t) -{ - Mutex::ScopedLock _lock(lock); - return 0; -} - -const ObjectId* AgentImpl::allocObjectId(uint64_t persistId) -{ - Mutex::ScopedLock _lock(lock); - uint16_t sequence = persistId ? 0 : bootSequence; - uint64_t objectNum = persistId ? persistId : nextObjectId++; - - ObjectId* oid = ObjectIdImpl::factory(&attachment, 0, sequence, objectNum); - return oid; -} - -const ObjectId* AgentImpl::allocObjectId(uint32_t persistIdLo, uint32_t persistIdHi) -{ - return allocObjectId(((uint64_t) persistIdHi) << 32 | (uint64_t) persistIdLo); -} - -void AgentImpl::raiseEvent(Event& event) -{ - Mutex::ScopedLock _lock(lock); - Buffer buffer(outputBuffer, MA_BUFFER_SIZE); - Protocol::encodeHeader(buffer, Protocol::OP_EVENT_INDICATION); - - event.impl->encodeSchemaKey(buffer); - buffer.putLongLong(uint64_t(Duration(EPOCH, now()))); - event.impl->encode(buffer); - string key(event.impl->getRoutingKey(assignedBrokerBank, assignedAgentBank)); - - sendBufferLH(buffer, QMF_EXCHANGE, key); - QPID_LOG(trace, "SENT EventIndication"); -} - -AgentEventImpl::Ptr AgentImpl::eventDeclareQueue(const string& name) -{ - AgentEventImpl::Ptr event(new AgentEventImpl(AgentEvent::DECLARE_QUEUE)); - event->name = name; - - return event; -} - -AgentEventImpl::Ptr AgentImpl::eventBind(const string& exchange, const string& queue, - const string& key) -{ - AgentEventImpl::Ptr event(new AgentEventImpl(AgentEvent::BIND)); - event->name = queue; - event->exchange = exchange; - event->bindingKey = key; - - return event; -} - -AgentEventImpl::Ptr AgentImpl::eventSetupComplete() -{ - AgentEventImpl::Ptr event(new AgentEventImpl(AgentEvent::SETUP_COMPLETE)); - return event; -} - -AgentEventImpl::Ptr AgentImpl::eventQuery(uint32_t num, const string& userId, const string& package, - const string& cls, boost::shared_ptr<ObjectId> oid) -{ - AgentEventImpl::Ptr event(new AgentEventImpl(AgentEvent::GET_QUERY)); - event->sequence = num; - event->authUserId = userId; - if (oid.get()) - event->query.reset(new Query(oid.get())); - else - event->query.reset(new Query(cls.c_str(), package.c_str())); - return event; -} - -AgentEventImpl::Ptr AgentImpl::eventMethod(uint32_t num, const string& userId, const string& method, - boost::shared_ptr<ObjectId> oid, boost::shared_ptr<Value> argMap, - const SchemaObjectClass* objectClass) -{ - AgentEventImpl::Ptr event(new AgentEventImpl(AgentEvent::METHOD_CALL)); - event->sequence = num; - event->authUserId = userId; - event->name = method; - event->objectId = oid; - event->arguments = argMap; - event->objectClass = objectClass; - return event; -} - -void AgentImpl::sendBufferLH(Buffer& buf, const string& destination, const string& routingKey) -{ - uint32_t length = buf.getPosition(); - MessageImpl::Ptr message(new MessageImpl); - - buf.reset(); - buf.getRawData(message->body, length); - message->destination = destination; - message->routingKey = routingKey; - message->replyExchange = "amq.direct"; - message->replyKey = queueName; - - xmtQueue.push_back(message); -} - -void AgentImpl::sendPackageIndicationLH(const string& packageName) -{ - Buffer buffer(outputBuffer, MA_BUFFER_SIZE); - Protocol::encodeHeader(buffer, Protocol::OP_PACKAGE_INDICATION); - buffer.putShortString(packageName); - sendBufferLH(buffer, QMF_EXCHANGE, BROKER_KEY); - QPID_LOG(trace, "SENT PackageIndication: package_name=" << packageName); -} - -void AgentImpl::sendClassIndicationLH(ClassKind kind, const string& packageName, const AgentClassKey& key) -{ - Buffer buffer(outputBuffer, MA_BUFFER_SIZE); - Protocol::encodeHeader(buffer, Protocol::OP_CLASS_INDICATION); - buffer.putOctet((int) kind); - buffer.putShortString(packageName); - buffer.putShortString(key.name); - buffer.putBin128(const_cast<uint8_t*>(key.hash)); // const_cast needed for older Qpid libraries - sendBufferLH(buffer, QMF_EXCHANGE, BROKER_KEY); - QPID_LOG(trace, "SENT ClassIndication: package_name=" << packageName << " class_name=" << key.name); -} - -void AgentImpl::sendCommandCompleteLH(const string& exchange, const string& replyToKey, - uint32_t sequence, uint32_t code, const string& text) -{ - Buffer buffer(outputBuffer, MA_BUFFER_SIZE); - Protocol::encodeHeader(buffer, Protocol::OP_COMMAND_COMPLETE, sequence); - buffer.putLong(code); - buffer.putShortString(text); - sendBufferLH(buffer, exchange, replyToKey); - QPID_LOG(trace, "SENT CommandComplete: seq=" << sequence << " code=" << code << " text=" << text); -} - -void AgentImpl::sendMethodErrorLH(uint32_t sequence, const string& key, uint32_t code, const string& text) -{ - Buffer buffer(outputBuffer, MA_BUFFER_SIZE); - Protocol::encodeHeader(buffer, Protocol::OP_METHOD_RESPONSE, sequence); - buffer.putLong(code); - - string fulltext; - switch (code) { - case MERR_UNKNOWN_PACKAGE: fulltext = "Unknown Package"; break; - case MERR_UNKNOWN_CLASS: fulltext = "Unknown Class"; break; - case MERR_UNKNOWN_METHOD: fulltext = "Unknown Method"; break; - case MERR_INTERNAL_ERROR: fulltext = "Internal Error"; break; - default: fulltext = "Unspecified Error"; break; - } - - if (!text.empty()) { - fulltext += " ("; - fulltext += text; - fulltext += ")"; - } - - buffer.putMediumString(fulltext); - sendBufferLH(buffer, DIR_EXCHANGE, key); - QPID_LOG(trace, "SENT MethodResponse: errorCode=" << code << " text=" << fulltext); -} - -void AgentImpl::handleAttachResponse(Buffer& inBuffer) -{ - Mutex::ScopedLock _lock(lock); - - assignedBrokerBank = inBuffer.getLong(); - assignedAgentBank = inBuffer.getLong(); - - QPID_LOG(trace, "RCVD AttachResponse: broker=" << assignedBrokerBank << " agent=" << assignedAgentBank); - - if ((assignedBrokerBank != requestedBrokerBank) || - (assignedAgentBank != requestedAgentBank)) { - if (requestedAgentBank == 0) { - QPID_LOG(notice, "Initial object-id bank assigned: " << assignedBrokerBank << "." << - assignedAgentBank); - } else { - QPID_LOG(warning, "Collision in object-id! New bank assigned: " << assignedBrokerBank << - "." << assignedAgentBank); - } - //storeData(); // TODO - requestedBrokerBank = assignedBrokerBank; - requestedAgentBank = assignedAgentBank; - } - - attachment.setBanks(assignedBrokerBank, assignedAgentBank); - - // Bind to qpid.management to receive commands - stringstream key; - key << "agent." << assignedBrokerBank << "." << assignedAgentBank; - eventQueue.push_back(eventBind(QMF_EXCHANGE, queueName, key.str())); - - // Send package indications for all local packages - for (map<string, ClassMaps>::iterator pIter = packages.begin(); - pIter != packages.end(); - pIter++) { - sendPackageIndicationLH(pIter->first); - - // Send class indications for all local classes - ClassMaps cMap = pIter->second; - for (ObjectClassMap::iterator cIter = cMap.objectClasses.begin(); - cIter != cMap.objectClasses.end(); cIter++) - sendClassIndicationLH(CLASS_OBJECT, pIter->first, cIter->first); - for (EventClassMap::iterator cIter = cMap.eventClasses.begin(); - cIter != cMap.eventClasses.end(); cIter++) - sendClassIndicationLH(CLASS_EVENT, pIter->first, cIter->first); - } - - attachComplete = true; -} - -void AgentImpl::handlePackageRequest(Buffer&) -{ - Mutex::ScopedLock _lock(lock); -} - -void AgentImpl::handleClassQuery(Buffer&) -{ - Mutex::ScopedLock _lock(lock); -} - -void AgentImpl::handleSchemaRequest(Buffer& inBuffer, uint32_t sequence, - const string& replyExchange, const string& replyKey) -{ - Mutex::ScopedLock _lock(lock); - string rExchange(replyExchange); - string rKey(replyKey); - string packageName; - inBuffer.getShortString(packageName); - AgentClassKey key(inBuffer); - - if (rExchange.empty()) - rExchange = QMF_EXCHANGE; - if (rKey.empty()) - rKey = BROKER_KEY; - - QPID_LOG(trace, "RCVD SchemaRequest: package=" << packageName << " class=" << key.name); - - map<string, ClassMaps>::iterator pIter = packages.find(packageName); - if (pIter == packages.end()) { - sendCommandCompleteLH(rExchange, rKey, sequence, 1, "package not found"); - return; - } - - ClassMaps cMap = pIter->second; - ObjectClassMap::iterator ocIter = cMap.objectClasses.find(key); - if (ocIter != cMap.objectClasses.end()) { - SchemaObjectClass* oImpl = ocIter->second; - Buffer buffer(outputBuffer, MA_BUFFER_SIZE); - Protocol::encodeHeader(buffer, Protocol::OP_SCHEMA_RESPONSE, sequence); - oImpl->impl->encode(buffer); - sendBufferLH(buffer, rExchange, rKey); - QPID_LOG(trace, "SENT SchemaResponse: (object) package=" << packageName << " class=" << key.name); - return; - } - - EventClassMap::iterator ecIter = cMap.eventClasses.find(key); - if (ecIter != cMap.eventClasses.end()) { - SchemaEventClass* eImpl = ecIter->second; - Buffer buffer(outputBuffer, MA_BUFFER_SIZE); - Protocol::encodeHeader(buffer, Protocol::OP_SCHEMA_RESPONSE, sequence); - eImpl->impl->encode(buffer); - sendBufferLH(buffer, rExchange, rKey); - QPID_LOG(trace, "SENT SchemaResponse: (event) package=" << packageName << " class=" << key.name); - return; - } - - sendCommandCompleteLH(rExchange, rKey, sequence, 1, "class not found"); -} - -void AgentImpl::handleGetQuery(Buffer& inBuffer, uint32_t sequence, const string& replyTo, const string& userId) -{ - Mutex::ScopedLock _lock(lock); - FieldTable ft; - FieldTable::ValuePtr value; - map<string, ClassMaps>::const_iterator pIter = packages.end(); - string pname; - string cname; - string oidRepr; - boost::shared_ptr<ObjectId> oid; - - ft.decode(inBuffer); - - QPID_LOG(trace, "RCVD GetQuery: seq=" << sequence << " map=" << ft); - - value = ft.get("_package"); - if (value.get() && value->convertsTo<string>()) { - pname = value->get<string>(); - pIter = packages.find(pname); - if (pIter == packages.end()) { - sendCommandCompleteLH(DIR_EXCHANGE, replyTo, sequence); - return; - } - } - - value = ft.get("_class"); - if (value.get() && value->convertsTo<string>()) { - cname = value->get<string>(); - // TODO - check for validity of class (in package or any package) - if (pIter == packages.end()) { - } else { - - } - } - - value = ft.get("_objectid"); - if (value.get() && value->convertsTo<string>()) { - oidRepr = value->get<string>(); - oid.reset(new ObjectId()); - oid->impl->fromString(oidRepr); - } - - AgentQueryContext::Ptr context(new AgentQueryContext); - uint32_t contextNum = nextContextNum++; - context->sequence = sequence; - context->exchange = DIR_EXCHANGE; - context->key = replyTo; - contextMap[contextNum] = context; - - eventQueue.push_back(eventQuery(contextNum, userId, pname, cname, oid)); -} - -void AgentImpl::handleMethodRequest(Buffer& buffer, uint32_t sequence, const string& replyTo, const string& userId) -{ - Mutex::ScopedLock _lock(lock); - string pname; - string method; - boost::shared_ptr<ObjectId> oid(ObjectIdImpl::factory(buffer)); - buffer.getShortString(pname); - AgentClassKey classKey(buffer); - buffer.getShortString(method); - - QPID_LOG(trace, "RCVD MethodRequest seq=" << sequence << " method=" << method); - - map<string, ClassMaps>::const_iterator pIter = packages.find(pname); - if (pIter == packages.end()) { - sendMethodErrorLH(sequence, replyTo, MERR_UNKNOWN_PACKAGE, pname); - return; - } - - ObjectClassMap::const_iterator cIter = pIter->second.objectClasses.find(classKey); - if (cIter == pIter->second.objectClasses.end()) { - sendMethodErrorLH(sequence, replyTo, MERR_UNKNOWN_CLASS, classKey.repr()); - return; - } - - const SchemaObjectClass* schema = cIter->second; - vector<const SchemaMethod*>::const_iterator mIter = schema->impl->methods.begin(); - for (; mIter != schema->impl->methods.end(); mIter++) { - if ((*mIter)->getName() == method) - break; - } - - if (mIter == schema->impl->methods.end()) { - sendMethodErrorLH(sequence, replyTo, MERR_UNKNOWN_METHOD, method); - return; - } - - const SchemaMethod* schemaMethod = *mIter; - boost::shared_ptr<Value> argMap(new Value(TYPE_MAP)); - Value* value; - for (vector<const SchemaArgument*>::const_iterator aIter = schemaMethod->impl->arguments.begin(); - aIter != schemaMethod->impl->arguments.end(); aIter++) { - const SchemaArgument* schemaArg = *aIter; - if (schemaArg->getDirection() == DIR_IN || schemaArg->getDirection() == DIR_IN_OUT) - value = ValueImpl::factory(schemaArg->getType(), buffer); - else - value = ValueImpl::factory(schemaArg->getType()); - argMap->insert(schemaArg->getName(), value); - } - - AgentQueryContext::Ptr context(new AgentQueryContext); - uint32_t contextNum = nextContextNum++; - context->sequence = sequence; - context->exchange = DIR_EXCHANGE; - context->key = replyTo; - context->schemaMethod = schemaMethod; - contextMap[contextNum] = context; - - eventQueue.push_back(eventMethod(contextNum, userId, method, oid, argMap, schema)); -} - -void AgentImpl::handleConsoleAddedIndication() -{ - Mutex::ScopedLock _lock(lock); -} - -//================================================================== -// Wrappers -//================================================================== - -Agent::Agent(char* label, bool internalStore) { impl = new AgentImpl(label, internalStore); } -Agent::~Agent() { delete impl; } -void Agent::setStoreDir(const char* path) { impl->setStoreDir(path); } -void Agent::setTransferDir(const char* path) { impl->setTransferDir(path); } -void Agent::handleRcvMessage(Message& message) { impl->handleRcvMessage(message); } -bool Agent::getXmtMessage(Message& item) const { return impl->getXmtMessage(item); } -void Agent::popXmt() { impl->popXmt(); } -bool Agent::getEvent(AgentEvent& event) const { return impl->getEvent(event); } -void Agent::popEvent() { impl->popEvent(); } -void Agent::newSession() { impl->newSession(); } -void Agent::startProtocol() { impl->startProtocol(); } -void Agent::heartbeat() { impl->heartbeat(); } -void Agent::methodResponse(uint32_t sequence, uint32_t status, char* text, const Value& arguments) { impl->methodResponse(sequence, status, text, arguments); } -void Agent::queryResponse(uint32_t sequence, Object& object, bool prop, bool stat) { impl->queryResponse(sequence, object, prop, stat); } -void Agent::queryComplete(uint32_t sequence) { impl->queryComplete(sequence); } -void Agent::registerClass(SchemaObjectClass* cls) { impl->registerClass(cls); } -void Agent::registerClass(SchemaEventClass* cls) { impl->registerClass(cls); } -const ObjectId* Agent::addObject(Object& obj, uint64_t persistId) { return impl->addObject(obj, persistId); } -const ObjectId* Agent::allocObjectId(uint64_t persistId) { return impl->allocObjectId(persistId); } -const ObjectId* Agent::allocObjectId(uint32_t persistIdLo, uint32_t persistIdHi) { return impl->allocObjectId(persistIdLo, persistIdHi); } -void Agent::raiseEvent(Event& event) { impl->raiseEvent(event); } - |