From 413f907caf0013942126d566113e322576759a6f Mon Sep 17 00:00:00 2001 From: Ted Ross Date: Wed, 19 Aug 2009 18:31:31 +0000 Subject: Introduce the public includes for the QMF interfaces. Rename Agent to AgentEngine to differentiate the API from the underlying engine. Note that some of these public headers will overlap with the emerging "messaging" API (notably Connection.h and ConnectionSettings.h). It is desirable that these components of the API become common between "messaging" and "qmf". As such, once the differences are reconciled, they will most likely be removed from the qmf space and placed in the messaging space. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@805916 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/qmf.mk | 22 +- cpp/src/qmf/Agent.cpp | 958 -------------------------------------------- cpp/src/qmf/Agent.h | 206 ---------- cpp/src/qmf/AgentEngine.cpp | 958 ++++++++++++++++++++++++++++++++++++++++++++ cpp/src/qmf/AgentEngine.h | 207 ++++++++++ cpp/src/qmf/Console.h | 82 ---- cpp/src/qmf/ConsoleEngine.h | 83 ++++ cpp/src/qmf/Object.h | 2 +- 8 files changed, 1261 insertions(+), 1257 deletions(-) delete mode 100644 cpp/src/qmf/Agent.cpp delete mode 100644 cpp/src/qmf/Agent.h create mode 100644 cpp/src/qmf/AgentEngine.cpp create mode 100644 cpp/src/qmf/AgentEngine.h delete mode 100644 cpp/src/qmf/Console.h create mode 100644 cpp/src/qmf/ConsoleEngine.h (limited to 'cpp/src') diff --git a/cpp/src/qmf.mk b/cpp/src/qmf.mk index 62393cdcfb..8ccc1f4a95 100644 --- a/cpp/src/qmf.mk +++ b/cpp/src/qmf.mk @@ -18,7 +18,7 @@ # # -# qmf agent library makefile fragment, to be included in Makefile.am +# qmf library makefile fragment, to be included in Makefile.am # lib_LTLIBRARIES += \ libqmfcommon.la \ @@ -27,13 +27,15 @@ lib_LTLIBRARIES += \ # Public header files nobase_include_HEADERS += \ ../include/qpid/agent/ManagementAgent.h \ - ../include/qpid/agent/QmfAgentImportExport.h - + ../include/qpid/agent/QmfAgentImportExport.h \ + ../include/qmf/Agent.h \ + ../include/qmf/Connection.h \ + ../include/qmf/QmfImportExport.h \ + ../include/qmf/ConnectionSettings.h \ + ../include/qmf/AgentObject.h libqmfcommon_la_SOURCES = \ - qmf/Agent.cpp \ - qmf/Agent.h \ - qmf/Console.h \ + qmf/ConsoleEngine.h \ qmf/Event.h \ qmf/Message.h \ qmf/MessageImpl.cpp \ @@ -58,9 +60,9 @@ libqmfcommon_la_SOURCES = \ qmf/ValueImpl.h libqmfagent_la_SOURCES = \ - ../include/qpid/agent/ManagementAgent.h \ + qmf/AgentEngine.cpp \ + qmf/AgentEngine.h \ qpid/agent/ManagementAgentImpl.cpp \ - qpid/agent/ManagementAgentImpl.h \ - qmf/Agent.cpp + qpid/agent/ManagementAgentImpl.h -libqmfagent_la_LIBADD = libqpidclient.la +libqmfagent_la_LIBADD = libqpidclient.la libqmfcommon.la diff --git a/cpp/src/qmf/Agent.cpp b/cpp/src/qmf/Agent.cpp deleted file mode 100644 index 6d59ae2750..0000000000 --- a/cpp/src/qmf/Agent.cpp +++ /dev/null @@ -1,958 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#include "qmf/Agent.h" -#include "qmf/MessageImpl.h" -#include "qmf/SchemaImpl.h" -#include "qmf/Typecode.h" -#include "qmf/ObjectImpl.h" -#include "qmf/ObjectIdImpl.h" -#include "qmf/QueryImpl.h" -#include "qmf/ValueImpl.h" -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -using namespace std; -using namespace qmf; -using namespace qpid::framing; -using namespace qpid::sys; - -namespace qmf { - - struct AgentEventImpl { - typedef boost::shared_ptr Ptr; - AgentEvent::EventKind kind; - uint32_t sequence; - string authUserId; - string authToken; - string name; - Object* object; - boost::shared_ptr objectId; - Query query; - boost::shared_ptr arguments; - string exchange; - string bindingKey; - SchemaObjectClass* objectClass; - - AgentEventImpl(AgentEvent::EventKind k) : - kind(k), sequence(0), object(0), objectClass(0) {} - ~AgentEventImpl() {} - AgentEvent copy(); - }; - - struct AgentQueryContext { - typedef boost::shared_ptr Ptr; - uint32_t sequence; - string exchange; - string key; - SchemaMethodImpl* schemaMethod; - AgentQueryContext() : schemaMethod(0) {} - }; - - class AgentImpl { - public: - AgentImpl(char* label, bool internalStore); - ~AgentImpl(); - - void setStoreDir(char* path); - void setTransferDir(char* path); - void handleRcvMessage(Message& message); - bool getXmtMessage(Message& item); - void popXmt(); - bool getEvent(AgentEvent& event); - void popEvent(); - void newSession(); - void startProtocol(); - void heartbeat(); - void methodResponse(uint32_t sequence, uint32_t status, char* text, const Value& arguments); - void queryResponse(uint32_t sequence, Object& object, bool prop, bool stat); - void queryComplete(uint32_t sequence); - void registerClass(SchemaObjectClass* cls); - void registerClass(SchemaEventClass* cls); - const ObjectId* addObject(Object& obj, uint64_t persistId); - const ObjectId* allocObjectId(uint64_t persistId); - const ObjectId* allocObjectId(uint32_t persistIdLo, uint32_t persistIdHi); - void raiseEvent(Event& event); - - private: - Mutex lock; - Mutex addLock; - string label; - string queueName; - string storeDir; - string transferDir; - bool internalStore; - uint64_t nextTransientId; - Uuid systemId; - uint32_t requestedBrokerBank; - uint32_t requestedAgentBank; - uint32_t assignedBrokerBank; - uint32_t assignedAgentBank; - AgentAttachment attachment; - uint16_t bootSequence; - uint64_t nextObjectId; - uint32_t nextContextNum; - deque eventQueue; - deque xmtQueue; - map contextMap; - - static const char* QMF_EXCHANGE; - static const char* DIR_EXCHANGE; - static const char* BROKER_KEY; - static const uint32_t MERR_UNKNOWN_METHOD = 2; - static const uint32_t MERR_UNKNOWN_PACKAGE = 8; - static const uint32_t MERR_UNKNOWN_CLASS = 9; - static const uint32_t MERR_INTERNAL_ERROR = 10; -# define MA_BUFFER_SIZE 65536 - char outputBuffer[MA_BUFFER_SIZE]; - - struct SchemaClassKey { - string name; - uint8_t hash[16]; - SchemaClassKey(const string& n, const uint8_t* h) : name(n) { - memcpy(hash, h, 16); - } - SchemaClassKey(Buffer& buffer) { - buffer.getShortString(name); - buffer.getBin128(hash); - } - string repr() { - return name; - } - }; - - struct SchemaClassKeyComp { - bool operator() (const SchemaClassKey& lhs, const SchemaClassKey& rhs) const - { - if (lhs.name != rhs.name) - return lhs.name < rhs.name; - else - for (int i = 0; i < 16; i++) - if (lhs.hash[i] != rhs.hash[i]) - return lhs.hash[i] < rhs.hash[i]; - return false; - } - }; - - typedef map ObjectClassMap; - typedef map EventClassMap; - - struct ClassMaps { - ObjectClassMap objectClasses; - EventClassMap eventClasses; - }; - - map packages; - - bool checkHeader(Buffer& buf, uint8_t *opcode, uint32_t *seq); - void encodeHeader(Buffer& buf, uint8_t opcode, uint32_t seq = 0); - AgentEventImpl::Ptr eventDeclareQueue(const string& queueName); - AgentEventImpl::Ptr eventBind(const string& exchange, const string& queue, const string& key); - AgentEventImpl::Ptr eventSetupComplete(); - AgentEventImpl::Ptr eventQuery(uint32_t num, const string& userId, const string& package, const string& cls, - boost::shared_ptr oid); - AgentEventImpl::Ptr eventMethod(uint32_t num, const string& userId, const string& method, - boost::shared_ptr oid, boost::shared_ptr argMap, - SchemaObjectClass* objectClass); - void sendBufferLH(Buffer& buf, const string& destination, const string& routingKey); - - void sendPackageIndicationLH(const string& packageName); - void sendClassIndicationLH(ClassKind kind, const string& packageName, const SchemaClassKey& key); - void sendCommandCompleteLH(const string& exchange, const string& key, uint32_t seq, - uint32_t code = 0, const string& text = "OK"); - void sendMethodErrorLH(uint32_t sequence, const string& key, uint32_t code, const string& text=""); - void handleAttachResponse(Buffer& inBuffer); - void handlePackageRequest(Buffer& inBuffer); - void handleClassQuery(Buffer& inBuffer); - void handleSchemaRequest(Buffer& inBuffer, uint32_t sequence, - const string& replyToExchange, const string& replyToKey); - void handleGetQuery(Buffer& inBuffer, uint32_t sequence, const string& replyTo, const string& userId); - void handleMethodRequest(Buffer& inBuffer, uint32_t sequence, const string& replyTo, const string& userId); - void handleConsoleAddedIndication(); - }; -} - -const char* AgentImpl::QMF_EXCHANGE = "qpid.management"; -const char* AgentImpl::DIR_EXCHANGE = "amq.direct"; -const char* AgentImpl::BROKER_KEY = "broker"; - -#define STRING_REF(s) {if (!s.empty()) item.s = const_cast(s.c_str());} - -AgentEvent AgentEventImpl::copy() -{ - AgentEvent item; - - ::memset(&item, 0, sizeof(AgentEvent)); - item.kind = kind; - item.sequence = sequence; - item.object = object; - item.objectId = objectId.get(); - item.query = &query; - item.arguments = arguments.get(); - item.objectClass = objectClass; - - STRING_REF(authUserId); - STRING_REF(authToken); - STRING_REF(name); - STRING_REF(exchange); - STRING_REF(bindingKey); - - return item; -} - -AgentImpl::AgentImpl(char* _label, bool i) : - label(_label), queueName("qmfa-"), internalStore(i), nextTransientId(1), - requestedBrokerBank(0), requestedAgentBank(0), - assignedBrokerBank(0), assignedAgentBank(0), - bootSequence(1), nextObjectId(1), nextContextNum(1) -{ - queueName += label; -} - -AgentImpl::~AgentImpl() -{ -} - -void AgentImpl::setStoreDir(char* path) -{ - Mutex::ScopedLock _lock(lock); - if (path) - storeDir = path; - else - storeDir.clear(); -} - -void AgentImpl::setTransferDir(char* path) -{ - Mutex::ScopedLock _lock(lock); - if (path) - transferDir = path; - else - transferDir.clear(); -} - -void AgentImpl::handleRcvMessage(Message& message) -{ - Buffer inBuffer(message.body, message.length); - uint8_t opcode; - uint32_t sequence; - string replyToExchange(message.replyExchange ? message.replyExchange : ""); - string replyToKey(message.replyKey ? message.replyKey : ""); - string userId(message.userId ? message.userId : ""); - - if (checkHeader(inBuffer, &opcode, &sequence)) { - if (opcode == 'a') handleAttachResponse(inBuffer); - else if (opcode == 'S') handleSchemaRequest(inBuffer, sequence, replyToExchange, replyToKey); - else if (opcode == 'x') handleConsoleAddedIndication(); - else if (opcode == 'G') handleGetQuery(inBuffer, sequence, replyToKey, userId); - else if (opcode == 'M') handleMethodRequest(inBuffer, sequence, replyToKey, userId); - } -} - -bool AgentImpl::getXmtMessage(Message& item) -{ - Mutex::ScopedLock _lock(lock); - if (xmtQueue.empty()) - return false; - item = xmtQueue.front()->copy(); - return true; -} - -void AgentImpl::popXmt() -{ - Mutex::ScopedLock _lock(lock); - if (!xmtQueue.empty()) - xmtQueue.pop_front(); -} - -bool AgentImpl::getEvent(AgentEvent& event) -{ - Mutex::ScopedLock _lock(lock); - if (eventQueue.empty()) - return false; - event = eventQueue.front()->copy(); - return true; -} - -void AgentImpl::popEvent() -{ - Mutex::ScopedLock _lock(lock); - if (!eventQueue.empty()) - eventQueue.pop_front(); -} - -void AgentImpl::newSession() -{ - Mutex::ScopedLock _lock(lock); - eventQueue.clear(); - xmtQueue.clear(); - eventQueue.push_back(eventDeclareQueue(queueName)); - eventQueue.push_back(eventBind("amq.direct", queueName, queueName)); - eventQueue.push_back(eventSetupComplete()); -} - -void AgentImpl::startProtocol() -{ - Mutex::ScopedLock _lock(lock); - char rawbuffer[512]; - Buffer buffer(rawbuffer, 512); - - encodeHeader(buffer, 'A'); - buffer.putShortString("qmfa"); - systemId.encode(buffer); - buffer.putLong(requestedBrokerBank); - buffer.putLong(requestedAgentBank); - sendBufferLH(buffer, QMF_EXCHANGE, BROKER_KEY); - QPID_LOG(trace, "SENT AttachRequest: reqBroker=" << requestedBrokerBank << - " reqAgent=" << requestedAgentBank); -} - -void AgentImpl::heartbeat() -{ - Mutex::ScopedLock _lock(lock); - Buffer buffer(outputBuffer, MA_BUFFER_SIZE); - - encodeHeader(buffer, 'h'); - buffer.putLongLong(uint64_t(Duration(now()))); - stringstream key; - key << "console.heartbeat." << assignedBrokerBank << "." << assignedAgentBank; - sendBufferLH(buffer, QMF_EXCHANGE, key.str()); - QPID_LOG(trace, "SENT HeartbeatIndication"); -} - -void AgentImpl::methodResponse(uint32_t sequence, uint32_t status, char* text, - const Value& argMap) -{ - Mutex::ScopedLock _lock(lock); - map::iterator iter = contextMap.find(sequence); - if (iter == contextMap.end()) - return; - AgentQueryContext::Ptr context = iter->second; - contextMap.erase(iter); - - Buffer buffer(outputBuffer, MA_BUFFER_SIZE); - encodeHeader(buffer, 'm', context->sequence); - buffer.putLong(status); - buffer.putMediumString(text); - if (status == 0) { - for (vector::const_iterator aIter = context->schemaMethod->arguments.begin(); - aIter != context->schemaMethod->arguments.end(); aIter++) { - const SchemaArgumentImpl* schemaArg = *aIter; - if (schemaArg->dir == DIR_OUT || schemaArg->dir == DIR_IN_OUT) { - if (argMap.keyInMap(schemaArg->name.c_str())) { - const Value* val = argMap.byKey(schemaArg->name.c_str()); - val->impl->encode(buffer); - } else { - Value val(schemaArg->typecode); - val.impl->encode(buffer); - } - } - } - } - sendBufferLH(buffer, context->exchange, context->key); - QPID_LOG(trace, "SENT MethodResponse"); -} - -void AgentImpl::queryResponse(uint32_t sequence, Object& object, bool prop, bool stat) -{ - Mutex::ScopedLock _lock(lock); - map::iterator iter = contextMap.find(sequence); - if (iter == contextMap.end()) - return; - AgentQueryContext::Ptr context = iter->second; - - Buffer buffer(outputBuffer, MA_BUFFER_SIZE); - encodeHeader(buffer, 'g', context->sequence); - - object.impl->encodeSchemaKey(buffer); - object.impl->encodeManagedObjectData(buffer); - if (prop) - object.impl->encodeProperties(buffer); - if (stat) - object.impl->encodeStatistics(buffer); - - sendBufferLH(buffer, context->exchange, context->key); - QPID_LOG(trace, "SENT ContentIndication"); -} - -void AgentImpl::queryComplete(uint32_t sequence) -{ - Mutex::ScopedLock _lock(lock); - map::iterator iter = contextMap.find(sequence); - if (iter == contextMap.end()) - return; - - AgentQueryContext::Ptr context = iter->second; - contextMap.erase(iter); - sendCommandCompleteLH(context->exchange, context->key, context->sequence, 0, "OK"); -} - -void AgentImpl::registerClass(SchemaObjectClass* cls) -{ - Mutex::ScopedLock _lock(lock); - SchemaObjectClassImpl* impl = cls->impl; - - map::iterator iter = packages.find(impl->package); - if (iter == packages.end()) { - packages[impl->package] = ClassMaps(); - iter = packages.find(impl->package); - // TODO: Indicate this package if connected - } - - SchemaClassKey key(impl->name, impl->getHash()); - iter->second.objectClasses[key] = impl; - - // TODO: Indicate this schema if connected. -} - -void AgentImpl::registerClass(SchemaEventClass* cls) -{ - Mutex::ScopedLock _lock(lock); - SchemaEventClassImpl* impl = cls->impl; - - map::iterator iter = packages.find(impl->package); - if (iter == packages.end()) { - packages[impl->package] = ClassMaps(); - iter = packages.find(impl->package); - // TODO: Indicate this package if connected - } - - SchemaClassKey key(impl->name, impl->getHash()); - iter->second.eventClasses[key] = impl; - - // TODO: Indicate this schema if connected. -} - -const ObjectId* AgentImpl::addObject(Object&, uint64_t) -{ - Mutex::ScopedLock _lock(lock); - return 0; -} - -const ObjectId* AgentImpl::allocObjectId(uint64_t persistId) -{ - Mutex::ScopedLock _lock(lock); - uint16_t sequence = persistId ? 0 : bootSequence; - uint64_t objectNum = persistId ? persistId : nextObjectId++; - - ObjectIdImpl* oid = new ObjectIdImpl(&attachment, 0, sequence, objectNum); - return oid->envelope; -} - -const ObjectId* AgentImpl::allocObjectId(uint32_t persistIdLo, uint32_t persistIdHi) -{ - return allocObjectId(((uint64_t) persistIdHi) << 32 | (uint64_t) persistIdLo); -} - -void AgentImpl::raiseEvent(Event&) -{ - Mutex::ScopedLock _lock(lock); -} - -void AgentImpl::encodeHeader(Buffer& buf, uint8_t opcode, uint32_t seq) -{ - buf.putOctet('A'); - buf.putOctet('M'); - buf.putOctet('3'); - buf.putOctet(opcode); - buf.putLong (seq); -} - -bool AgentImpl::checkHeader(Buffer& buf, uint8_t *opcode, uint32_t *seq) -{ - if (buf.getSize() < 8) - return false; - - uint8_t h1 = buf.getOctet(); - uint8_t h2 = buf.getOctet(); - uint8_t h3 = buf.getOctet(); - - *opcode = buf.getOctet(); - *seq = buf.getLong(); - - return h1 == 'A' && h2 == 'M' && h3 == '3'; -} - -AgentEventImpl::Ptr AgentImpl::eventDeclareQueue(const string& name) -{ - AgentEventImpl::Ptr event(new AgentEventImpl(AgentEvent::DECLARE_QUEUE)); - event->name = name; - - return event; -} - -AgentEventImpl::Ptr AgentImpl::eventBind(const string& exchange, const string& queue, - const string& key) -{ - AgentEventImpl::Ptr event(new AgentEventImpl(AgentEvent::BIND)); - event->name = queue; - event->exchange = exchange; - event->bindingKey = key; - - return event; -} - -AgentEventImpl::Ptr AgentImpl::eventSetupComplete() -{ - AgentEventImpl::Ptr event(new AgentEventImpl(AgentEvent::SETUP_COMPLETE)); - return event; -} - -AgentEventImpl::Ptr AgentImpl::eventQuery(uint32_t num, const string& userId, const string& package, - const string& cls, boost::shared_ptr oid) -{ - AgentEventImpl::Ptr event(new AgentEventImpl(AgentEvent::GET_QUERY)); - event->sequence = num; - event->authUserId = userId; - event->query.impl->packageName = package; - event->query.impl->className = cls; - event->query.impl->oid = oid; - return event; -} - -AgentEventImpl::Ptr AgentImpl::eventMethod(uint32_t num, const string& userId, const string& method, - boost::shared_ptr oid, boost::shared_ptr argMap, - SchemaObjectClass* objectClass) -{ - AgentEventImpl::Ptr event(new AgentEventImpl(AgentEvent::METHOD_CALL)); - event->sequence = num; - event->authUserId = userId; - event->name = method; - event->objectId = oid; - event->arguments = argMap; - event->objectClass = objectClass; - return event; -} - -void AgentImpl::sendBufferLH(Buffer& buf, const string& destination, const string& routingKey) -{ - uint32_t length = buf.getPosition(); - MessageImpl::Ptr message(new MessageImpl); - - buf.reset(); - buf.getRawData(message->body, length); - message->destination = destination; - message->routingKey = routingKey; - message->replyExchange = "amq.direct"; - message->replyKey = queueName; - - xmtQueue.push_back(message); -} - -void AgentImpl::sendPackageIndicationLH(const string& packageName) -{ - Buffer buffer(outputBuffer, MA_BUFFER_SIZE); - encodeHeader(buffer, 'p'); - buffer.putShortString(packageName); - sendBufferLH(buffer, QMF_EXCHANGE, BROKER_KEY); - QPID_LOG(trace, "SENT PackageIndication: package_name=" << packageName); -} - -void AgentImpl::sendClassIndicationLH(ClassKind kind, const string& packageName, const SchemaClassKey& key) -{ - Buffer buffer(outputBuffer, MA_BUFFER_SIZE); - encodeHeader(buffer, 'q'); - buffer.putOctet((int) kind); - buffer.putShortString(packageName); - buffer.putShortString(key.name); - buffer.putBin128(const_cast(key.hash)); // const_cast needed for older Qpid libraries - sendBufferLH(buffer, QMF_EXCHANGE, BROKER_KEY); - QPID_LOG(trace, "SENT ClassIndication: package_name=" << packageName << " class_name=" << key.name); -} - -void AgentImpl::sendCommandCompleteLH(const string& exchange, const string& replyToKey, - uint32_t sequence, uint32_t code, const string& text) -{ - Buffer buffer(outputBuffer, MA_BUFFER_SIZE); - encodeHeader(buffer, 'z', sequence); - buffer.putLong(code); - buffer.putShortString(text); - sendBufferLH(buffer, exchange, replyToKey); - QPID_LOG(trace, "SENT CommandComplete: seq=" << sequence << " code=" << code << " text=" << text); -} - -void AgentImpl::sendMethodErrorLH(uint32_t sequence, const string& key, uint32_t code, const string& text) -{ - Buffer buffer(outputBuffer, MA_BUFFER_SIZE); - encodeHeader(buffer, 'm', sequence); - buffer.putLong(code); - - string fulltext; - switch (code) { - case MERR_UNKNOWN_PACKAGE: fulltext = "Unknown Package"; break; - case MERR_UNKNOWN_CLASS: fulltext = "Unknown Class"; break; - case MERR_UNKNOWN_METHOD: fulltext = "Unknown Method"; break; - case MERR_INTERNAL_ERROR: fulltext = "Internal Error"; break; - default: fulltext = "Unspecified Error"; break; - } - - if (!text.empty()) { - fulltext += " ("; - fulltext += text; - fulltext += ")"; - } - - buffer.putMediumString(fulltext); - sendBufferLH(buffer, DIR_EXCHANGE, key); - QPID_LOG(trace, "SENT MethodResponse: errorCode=" << code << " text=" << fulltext); -} - -void AgentImpl::handleAttachResponse(Buffer& inBuffer) -{ - Mutex::ScopedLock _lock(lock); - - assignedBrokerBank = inBuffer.getLong(); - assignedAgentBank = inBuffer.getLong(); - - QPID_LOG(trace, "RCVD AttachResponse: broker=" << assignedBrokerBank << " agent=" << assignedAgentBank); - - if ((assignedBrokerBank != requestedBrokerBank) || - (assignedAgentBank != requestedAgentBank)) { - if (requestedAgentBank == 0) { - QPID_LOG(notice, "Initial object-id bank assigned: " << assignedBrokerBank << "." << - assignedAgentBank); - } else { - QPID_LOG(warning, "Collision in object-id! New bank assigned: " << assignedBrokerBank << - "." << assignedAgentBank); - } - //storeData(); // TODO - requestedBrokerBank = assignedBrokerBank; - requestedAgentBank = assignedAgentBank; - } - - attachment.setBanks(assignedBrokerBank, assignedAgentBank); - - // Bind to qpid.management to receive commands - stringstream key; - key << "agent." << assignedBrokerBank << "." << assignedAgentBank; - eventQueue.push_back(eventBind(QMF_EXCHANGE, queueName, key.str())); - - // Send package indications for all local packages - for (map::iterator pIter = packages.begin(); - pIter != packages.end(); - pIter++) { - sendPackageIndicationLH(pIter->first); - - // Send class indications for all local classes - ClassMaps cMap = pIter->second; - for (ObjectClassMap::iterator cIter = cMap.objectClasses.begin(); - cIter != cMap.objectClasses.end(); cIter++) - sendClassIndicationLH(CLASS_OBJECT, pIter->first, cIter->first); - for (EventClassMap::iterator cIter = cMap.eventClasses.begin(); - cIter != cMap.eventClasses.end(); cIter++) - sendClassIndicationLH(CLASS_EVENT, pIter->first, cIter->first); - } -} - -void AgentImpl::handlePackageRequest(Buffer&) -{ - Mutex::ScopedLock _lock(lock); -} - -void AgentImpl::handleClassQuery(Buffer&) -{ - Mutex::ScopedLock _lock(lock); -} - -void AgentImpl::handleSchemaRequest(Buffer& inBuffer, uint32_t sequence, - const string& replyExchange, const string& replyKey) -{ - Mutex::ScopedLock _lock(lock); - string rExchange(replyExchange); - string rKey(replyKey); - string packageName; - inBuffer.getShortString(packageName); - SchemaClassKey key(inBuffer); - - if (rExchange.empty()) - rExchange = QMF_EXCHANGE; - if (rKey.empty()) - rKey = BROKER_KEY; - - QPID_LOG(trace, "RCVD SchemaRequest: package=" << packageName << " class=" << key.name); - - map::iterator pIter = packages.find(packageName); - if (pIter == packages.end()) { - sendCommandCompleteLH(rExchange, rKey, sequence, 1, "package not found"); - return; - } - - ClassMaps cMap = pIter->second; - ObjectClassMap::iterator ocIter = cMap.objectClasses.find(key); - if (ocIter != cMap.objectClasses.end()) { - SchemaObjectClassImpl* oImpl = ocIter->second; - Buffer buffer(outputBuffer, MA_BUFFER_SIZE); - encodeHeader(buffer, 's', sequence); - oImpl->encode(buffer); - sendBufferLH(buffer, rExchange, rKey); - QPID_LOG(trace, "SENT SchemaResponse: (object) package=" << packageName << " class=" << key.name); - return; - } - - EventClassMap::iterator ecIter = cMap.eventClasses.find(key); - if (ecIter != cMap.eventClasses.end()) { - SchemaEventClassImpl* eImpl = ecIter->second; - Buffer buffer(outputBuffer, MA_BUFFER_SIZE); - encodeHeader(buffer, 's', sequence); - eImpl->encode(buffer); - sendBufferLH(buffer, rExchange, rKey); - QPID_LOG(trace, "SENT SchemaResponse: (event) package=" << packageName << " class=" << key.name); - return; - } - - sendCommandCompleteLH(rExchange, rKey, sequence, 1, "class not found"); -} - -void AgentImpl::handleGetQuery(Buffer& inBuffer, uint32_t sequence, const string& replyTo, const string& userId) -{ - Mutex::ScopedLock _lock(lock); - FieldTable ft; - FieldTable::ValuePtr value; - map::const_iterator pIter = packages.end(); - string pname; - string cname; - string oidRepr; - boost::shared_ptr oid; - - ft.decode(inBuffer); - - QPID_LOG(trace, "RCVD GetQuery: map=" << ft); - - value = ft.get("_package"); - if (value.get() && value->convertsTo()) { - pname = value->get(); - pIter = packages.find(pname); - if (pIter == packages.end()) { - sendCommandCompleteLH(DIR_EXCHANGE, replyTo, sequence); - return; - } - } - - value = ft.get("_class"); - if (value.get() && value->convertsTo()) { - cname = value->get(); - // TODO - check for validity of class (in package or any package) - if (pIter == packages.end()) { - } else { - - } - } - - value = ft.get("_objectid"); - if (value.get() && value->convertsTo()) { - oidRepr = value->get(); - oid.reset(new ObjectId()); - oid->impl->fromString(oidRepr); - } - - AgentQueryContext::Ptr context(new AgentQueryContext); - uint32_t contextNum = nextContextNum++; - context->sequence = sequence; - context->exchange = DIR_EXCHANGE; - context->key = replyTo; - contextMap[contextNum] = context; - - eventQueue.push_back(eventQuery(contextNum, userId, pname, cname, oid)); -} - -void AgentImpl::handleMethodRequest(Buffer& buffer, uint32_t sequence, const string& replyTo, const string& userId) -{ - Mutex::ScopedLock _lock(lock); - string pname; - string method; - ObjectIdImpl* oidImpl = new ObjectIdImpl(buffer); - boost::shared_ptr oid(oidImpl->envelope); - buffer.getShortString(pname); - SchemaClassKey classKey(buffer); - buffer.getShortString(method); - - map::const_iterator pIter = packages.find(pname); - if (pIter == packages.end()) { - sendMethodErrorLH(sequence, replyTo, MERR_UNKNOWN_PACKAGE, pname); - return; - } - - ObjectClassMap::const_iterator cIter = pIter->second.objectClasses.find(classKey); - if (cIter == pIter->second.objectClasses.end()) { - sendMethodErrorLH(sequence, replyTo, MERR_UNKNOWN_CLASS, classKey.repr()); - return; - } - - const SchemaObjectClassImpl* schema = cIter->second; - vector::const_iterator mIter = schema->methods.begin(); - for (; mIter != schema->methods.end(); mIter++) { - if ((*mIter)->name == method) - break; - } - - if (mIter == schema->methods.end()) { - sendMethodErrorLH(sequence, replyTo, MERR_UNKNOWN_METHOD, method); - return; - } - - SchemaMethodImpl* schemaMethod = *mIter; - boost::shared_ptr argMap(new Value(TYPE_MAP)); - ValueImpl* value; - for (vector::const_iterator aIter = schemaMethod->arguments.begin(); - aIter != schemaMethod->arguments.end(); aIter++) { - const SchemaArgumentImpl* schemaArg = *aIter; - if (schemaArg->dir == DIR_IN || schemaArg->dir == DIR_IN_OUT) - value = new ValueImpl(schemaArg->typecode, buffer); - else - value = new ValueImpl(schemaArg->typecode); - argMap->insert(schemaArg->name.c_str(), value->envelope); - } - - AgentQueryContext::Ptr context(new AgentQueryContext); - uint32_t contextNum = nextContextNum++; - context->sequence = sequence; - context->exchange = DIR_EXCHANGE; - context->key = replyTo; - context->schemaMethod = schemaMethod; - contextMap[contextNum] = context; - - eventQueue.push_back(eventMethod(contextNum, userId, method, oid, argMap, schema->envelope)); -} - -void AgentImpl::handleConsoleAddedIndication() -{ - Mutex::ScopedLock _lock(lock); -} - -//================================================================== -// Wrappers -//================================================================== - -Agent::Agent(char* label, bool internalStore) -{ - impl = new AgentImpl(label, internalStore); -} - -Agent::~Agent() -{ - delete impl; -} - -void Agent::setStoreDir(char* path) -{ - impl->setStoreDir(path); -} - -void Agent::setTransferDir(char* path) -{ - impl->setTransferDir(path); -} - -void Agent::handleRcvMessage(Message& message) -{ - impl->handleRcvMessage(message); -} - -bool Agent::getXmtMessage(Message& item) -{ - return impl->getXmtMessage(item); -} - -void Agent::popXmt() -{ - impl->popXmt(); -} - -bool Agent::getEvent(AgentEvent& event) -{ - return impl->getEvent(event); -} - -void Agent::popEvent() -{ - impl->popEvent(); -} - -void Agent::newSession() -{ - impl->newSession(); -} - -void Agent::startProtocol() -{ - impl->startProtocol(); -} - -void Agent::heartbeat() -{ - impl->heartbeat(); -} - -void Agent::methodResponse(uint32_t sequence, uint32_t status, char* text, const Value& arguments) -{ - impl->methodResponse(sequence, status, text, arguments); -} - -void Agent::queryResponse(uint32_t sequence, Object& object, bool prop, bool stat) -{ - impl->queryResponse(sequence, object, prop, stat); -} - -void Agent::queryComplete(uint32_t sequence) -{ - impl->queryComplete(sequence); -} - -void Agent::registerClass(SchemaObjectClass* cls) -{ - impl->registerClass(cls); -} - -void Agent::registerClass(SchemaEventClass* cls) -{ - impl->registerClass(cls); -} - -const ObjectId* Agent::addObject(Object& obj, uint64_t persistId) -{ - return impl->addObject(obj, persistId); -} - -const ObjectId* Agent::allocObjectId(uint64_t persistId) -{ - return impl->allocObjectId(persistId); -} - -const ObjectId* Agent::allocObjectId(uint32_t persistIdLo, uint32_t persistIdHi) -{ - return impl->allocObjectId(persistIdLo, persistIdHi); -} - -void Agent::raiseEvent(Event& event) -{ - impl->raiseEvent(event); -} - diff --git a/cpp/src/qmf/Agent.h b/cpp/src/qmf/Agent.h deleted file mode 100644 index d8f784e9d8..0000000000 --- a/cpp/src/qmf/Agent.h +++ /dev/null @@ -1,206 +0,0 @@ -#ifndef _QmfAgent_ -#define _QmfAgent_ - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#include -#include -#include -#include -#include -#include -#include - -namespace qmf { - - /** - * AgentEvent - * - * This structure represents a QMF event coming from the agent to - * the application. - */ - struct AgentEvent { - enum EventKind { - GET_QUERY = 1, - START_SYNC = 2, - END_SYNC = 3, - METHOD_CALL = 4, - DECLARE_QUEUE = 5, - DELETE_QUEUE = 6, - BIND = 7, - UNBIND = 8, - SETUP_COMPLETE = 9 - }; - - EventKind kind; - uint32_t sequence; // Protocol sequence (for all kinds) - char* authUserId; // Authenticated user ID (for all kinds) - char* authToken; // Authentication token if issued (for all kinds) - char* name; // Name of the method/sync query - // (METHOD_CALL, START_SYNC, END_SYNC, DECLARE_QUEUE, BIND, UNBIND) - Object* object; // Object involved in method call (METHOD_CALL) - ObjectId* objectId; // ObjectId for method call (METHOD_CALL) - Query* query; // Query parameters (GET_QUERY, START_SYNC) - Value* arguments; // Method parameters (METHOD_CALL) - char* exchange; // Exchange for bind (BIND, UNBIND) - char* bindingKey; // Key for bind (BIND, UNBIND) - SchemaObjectClass* objectClass; // (METHOD_CALL) - }; - - class AgentImpl; - - /** - * Agent - Protocol engine for the QMF agent - */ - class Agent { - public: - Agent(char* label, bool internalStore=true); - ~Agent(); - - /** - * Configure the directory path for storing persistent data. - *@param path Null-terminated string containing a directory path where files can be - * created, written, and read. If NULL, no persistent storage will be - * attempted. - */ - void setStoreDir(char* path); - - /** - * Configure the directory path for files transferred over QMF. - *@param path Null-terminated string containing a directory path where files can be - * created, deleted, written, and read. If NULL, file transfers shall not - * be permitted. - */ - void setTransferDir(char* path); - - /** - * Pass messages received from the AMQP session to the Agent engine. - *@param message AMQP messages received on the agent session. - */ - void handleRcvMessage(Message& message); - - /** - * Get the next message to be sent to the AMQP network. - *@param item The Message structure describing the message to be produced. - *@return true if the Message is valid, false if there are no messages to send. - */ - bool getXmtMessage(Message& item); - - /** - * Remove and discard one message from the head of the transmit queue. - */ - void popXmt(); - - /** - * Get the next application event from the agent engine. - *@param event The event iff the return value is true - *@return true if event is valid, false if there are no events to process - */ - bool getEvent(AgentEvent& event); - - /** - * Remove and discard one event from the head of the event queue. - */ - void popEvent(); - - /** - * A new AMQP session has been established for Agent communication. - */ - void newSession(); - - /** - * Start the QMF Agent protocol. This should be invoked after a SETUP_COMPLETE event - * is received from the Agent engine. - */ - void startProtocol(); - - /** - * This method is called periodically so the agent can supply a heartbeat. - */ - void heartbeat(); - - /** - * Respond to a method request. - *@param sequence The sequence number from the method request event. - *@param status The method's completion status. - *@param text Status text ("OK" or an error message) - *@param arguments The list of output arguments from the method call. - */ - void methodResponse(uint32_t sequence, uint32_t status, char* text, const Value& arguments); - - /** - * Send a content indication to the QMF bus. This is only needed for objects that are - * managed by the application. This is *NOT* needed for objects managed by the Agent - * (inserted using addObject). - *@param sequence The sequence number of the GET request or the SYNC_START request. - *@param object The object (annotated with "changed" flags) for publication. - *@param prop If true, changed object properties are transmitted. - *@param stat If true, changed object statistics are transmitted. - */ - void queryResponse(uint32_t sequence, Object& object, bool prop = true, bool stat = true); - - /** - * Indicate the completion of a query. This is not used for SYNC_START requests. - *@param sequence The sequence number of the GET request. - */ - void queryComplete(uint32_t sequence); - - /** - * Register a schema class with the Agent. - *@param cls A SchemaObejctClass object that defines data managed by the agent. - */ - void registerClass(SchemaObjectClass* cls); - - /** - * Register a schema class with the Agent. - *@param cls A SchemaEventClass object that defines events sent by the agent. - */ - void registerClass(SchemaEventClass* cls); - - /** - * Give an object to the Agent for storage and management. Once added, the agent takes - * responsibility for the life cycle of the object. - *@param obj The object to be managed by the Agent. - *@param persistId A unique non-zero value if the object-id is to be persistent. - *@return The objectId of the managed object. - */ - const ObjectId* addObject(Object& obj, uint64_t persistId); - - /** - * Allocate an objecc-id for an object that will be managed by the application. - *@param persistId A unique non-zero value if the object-id is to be persistent. - @return The objectId structure for the allocated ID. - */ - const ObjectId* allocObjectId(uint64_t persistId); - const ObjectId* allocObjectId(uint32_t persistIdLo, uint32_t persistIdHi); - - /** - * Raise an event into the QMF network.. - *@param event The event object for the event to be raised. - */ - void raiseEvent(Event& event); - - private: - AgentImpl* impl; - }; -} - -#endif - diff --git a/cpp/src/qmf/AgentEngine.cpp b/cpp/src/qmf/AgentEngine.cpp new file mode 100644 index 0000000000..bef8b3d102 --- /dev/null +++ b/cpp/src/qmf/AgentEngine.cpp @@ -0,0 +1,958 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "qmf/AgentEngine.h" +#include "qmf/MessageImpl.h" +#include "qmf/SchemaImpl.h" +#include "qmf/Typecode.h" +#include "qmf/ObjectImpl.h" +#include "qmf/ObjectIdImpl.h" +#include "qmf/QueryImpl.h" +#include "qmf/ValueImpl.h" +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +using namespace std; +using namespace qmf; +using namespace qpid::framing; +using namespace qpid::sys; + +namespace qmf { + + struct AgentEventImpl { + typedef boost::shared_ptr Ptr; + AgentEvent::EventKind kind; + uint32_t sequence; + string authUserId; + string authToken; + string name; + Object* object; + boost::shared_ptr objectId; + Query query; + boost::shared_ptr arguments; + string exchange; + string bindingKey; + SchemaObjectClass* objectClass; + + AgentEventImpl(AgentEvent::EventKind k) : + kind(k), sequence(0), object(0), objectClass(0) {} + ~AgentEventImpl() {} + AgentEvent copy(); + }; + + struct AgentQueryContext { + typedef boost::shared_ptr Ptr; + uint32_t sequence; + string exchange; + string key; + SchemaMethodImpl* schemaMethod; + AgentQueryContext() : schemaMethod(0) {} + }; + + class AgentEngineImpl { + public: + AgentEngineImpl(char* label, bool internalStore); + ~AgentEngineImpl(); + + void setStoreDir(const char* path); + void setTransferDir(const char* path); + void handleRcvMessage(Message& message); + bool getXmtMessage(Message& item); + void popXmt(); + bool getEvent(AgentEvent& event); + void popEvent(); + void newSession(); + void startProtocol(); + void heartbeat(); + void methodResponse(uint32_t sequence, uint32_t status, char* text, const Value& arguments); + void queryResponse(uint32_t sequence, Object& object, bool prop, bool stat); + void queryComplete(uint32_t sequence); + void registerClass(SchemaObjectClass* cls); + void registerClass(SchemaEventClass* cls); + const ObjectId* addObject(Object& obj, uint64_t persistId); + const ObjectId* allocObjectId(uint64_t persistId); + const ObjectId* allocObjectId(uint32_t persistIdLo, uint32_t persistIdHi); + void raiseEvent(Event& event); + + private: + Mutex lock; + Mutex addLock; + string label; + string queueName; + string storeDir; + string transferDir; + bool internalStore; + uint64_t nextTransientId; + Uuid systemId; + uint32_t requestedBrokerBank; + uint32_t requestedAgentBank; + uint32_t assignedBrokerBank; + uint32_t assignedAgentBank; + AgentAttachment attachment; + uint16_t bootSequence; + uint64_t nextObjectId; + uint32_t nextContextNum; + deque eventQueue; + deque xmtQueue; + map contextMap; + + static const char* QMF_EXCHANGE; + static const char* DIR_EXCHANGE; + static const char* BROKER_KEY; + static const uint32_t MERR_UNKNOWN_METHOD = 2; + static const uint32_t MERR_UNKNOWN_PACKAGE = 8; + static const uint32_t MERR_UNKNOWN_CLASS = 9; + static const uint32_t MERR_INTERNAL_ERROR = 10; +# define MA_BUFFER_SIZE 65536 + char outputBuffer[MA_BUFFER_SIZE]; + + struct SchemaClassKey { + string name; + uint8_t hash[16]; + SchemaClassKey(const string& n, const uint8_t* h) : name(n) { + memcpy(hash, h, 16); + } + SchemaClassKey(Buffer& buffer) { + buffer.getShortString(name); + buffer.getBin128(hash); + } + string repr() { + return name; + } + }; + + struct SchemaClassKeyComp { + bool operator() (const SchemaClassKey& lhs, const SchemaClassKey& rhs) const + { + if (lhs.name != rhs.name) + return lhs.name < rhs.name; + else + for (int i = 0; i < 16; i++) + if (lhs.hash[i] != rhs.hash[i]) + return lhs.hash[i] < rhs.hash[i]; + return false; + } + }; + + typedef map ObjectClassMap; + typedef map EventClassMap; + + struct ClassMaps { + ObjectClassMap objectClasses; + EventClassMap eventClasses; + }; + + map packages; + + bool checkHeader(Buffer& buf, uint8_t *opcode, uint32_t *seq); + void encodeHeader(Buffer& buf, uint8_t opcode, uint32_t seq = 0); + AgentEventImpl::Ptr eventDeclareQueue(const string& queueName); + AgentEventImpl::Ptr eventBind(const string& exchange, const string& queue, const string& key); + AgentEventImpl::Ptr eventSetupComplete(); + AgentEventImpl::Ptr eventQuery(uint32_t num, const string& userId, const string& package, const string& cls, + boost::shared_ptr oid); + AgentEventImpl::Ptr eventMethod(uint32_t num, const string& userId, const string& method, + boost::shared_ptr oid, boost::shared_ptr argMap, + SchemaObjectClass* objectClass); + void sendBufferLH(Buffer& buf, const string& destination, const string& routingKey); + + void sendPackageIndicationLH(const string& packageName); + void sendClassIndicationLH(ClassKind kind, const string& packageName, const SchemaClassKey& key); + void sendCommandCompleteLH(const string& exchange, const string& key, uint32_t seq, + uint32_t code = 0, const string& text = "OK"); + void sendMethodErrorLH(uint32_t sequence, const string& key, uint32_t code, const string& text=""); + void handleAttachResponse(Buffer& inBuffer); + void handlePackageRequest(Buffer& inBuffer); + void handleClassQuery(Buffer& inBuffer); + void handleSchemaRequest(Buffer& inBuffer, uint32_t sequence, + const string& replyToExchange, const string& replyToKey); + void handleGetQuery(Buffer& inBuffer, uint32_t sequence, const string& replyTo, const string& userId); + void handleMethodRequest(Buffer& inBuffer, uint32_t sequence, const string& replyTo, const string& userId); + void handleConsoleAddedIndication(); + }; +} + +const char* AgentEngineImpl::QMF_EXCHANGE = "qpid.management"; +const char* AgentEngineImpl::DIR_EXCHANGE = "amq.direct"; +const char* AgentEngineImpl::BROKER_KEY = "broker"; + +#define STRING_REF(s) {if (!s.empty()) item.s = const_cast(s.c_str());} + +AgentEvent AgentEventImpl::copy() +{ + AgentEvent item; + + ::memset(&item, 0, sizeof(AgentEvent)); + item.kind = kind; + item.sequence = sequence; + item.object = object; + item.objectId = objectId.get(); + item.query = &query; + item.arguments = arguments.get(); + item.objectClass = objectClass; + + STRING_REF(authUserId); + STRING_REF(authToken); + STRING_REF(name); + STRING_REF(exchange); + STRING_REF(bindingKey); + + return item; +} + +AgentEngineImpl::AgentEngineImpl(char* _label, bool i) : + label(_label), queueName("qmfa-"), internalStore(i), nextTransientId(1), + requestedBrokerBank(0), requestedAgentBank(0), + assignedBrokerBank(0), assignedAgentBank(0), + bootSequence(1), nextObjectId(1), nextContextNum(1) +{ + queueName += label; +} + +AgentEngineImpl::~AgentEngineImpl() +{ +} + +void AgentEngineImpl::setStoreDir(const char* path) +{ + Mutex::ScopedLock _lock(lock); + if (path) + storeDir = path; + else + storeDir.clear(); +} + +void AgentEngineImpl::setTransferDir(const char* path) +{ + Mutex::ScopedLock _lock(lock); + if (path) + transferDir = path; + else + transferDir.clear(); +} + +void AgentEngineImpl::handleRcvMessage(Message& message) +{ + Buffer inBuffer(message.body, message.length); + uint8_t opcode; + uint32_t sequence; + string replyToExchange(message.replyExchange ? message.replyExchange : ""); + string replyToKey(message.replyKey ? message.replyKey : ""); + string userId(message.userId ? message.userId : ""); + + if (checkHeader(inBuffer, &opcode, &sequence)) { + if (opcode == 'a') handleAttachResponse(inBuffer); + else if (opcode == 'S') handleSchemaRequest(inBuffer, sequence, replyToExchange, replyToKey); + else if (opcode == 'x') handleConsoleAddedIndication(); + else if (opcode == 'G') handleGetQuery(inBuffer, sequence, replyToKey, userId); + else if (opcode == 'M') handleMethodRequest(inBuffer, sequence, replyToKey, userId); + } +} + +bool AgentEngineImpl::getXmtMessage(Message& item) +{ + Mutex::ScopedLock _lock(lock); + if (xmtQueue.empty()) + return false; + item = xmtQueue.front()->copy(); + return true; +} + +void AgentEngineImpl::popXmt() +{ + Mutex::ScopedLock _lock(lock); + if (!xmtQueue.empty()) + xmtQueue.pop_front(); +} + +bool AgentEngineImpl::getEvent(AgentEvent& event) +{ + Mutex::ScopedLock _lock(lock); + if (eventQueue.empty()) + return false; + event = eventQueue.front()->copy(); + return true; +} + +void AgentEngineImpl::popEvent() +{ + Mutex::ScopedLock _lock(lock); + if (!eventQueue.empty()) + eventQueue.pop_front(); +} + +void AgentEngineImpl::newSession() +{ + Mutex::ScopedLock _lock(lock); + eventQueue.clear(); + xmtQueue.clear(); + eventQueue.push_back(eventDeclareQueue(queueName)); + eventQueue.push_back(eventBind("amq.direct", queueName, queueName)); + eventQueue.push_back(eventSetupComplete()); +} + +void AgentEngineImpl::startProtocol() +{ + Mutex::ScopedLock _lock(lock); + char rawbuffer[512]; + Buffer buffer(rawbuffer, 512); + + encodeHeader(buffer, 'A'); + buffer.putShortString("qmfa"); + systemId.encode(buffer); + buffer.putLong(requestedBrokerBank); + buffer.putLong(requestedAgentBank); + sendBufferLH(buffer, QMF_EXCHANGE, BROKER_KEY); + QPID_LOG(trace, "SENT AttachRequest: reqBroker=" << requestedBrokerBank << + " reqAgent=" << requestedAgentBank); +} + +void AgentEngineImpl::heartbeat() +{ + Mutex::ScopedLock _lock(lock); + Buffer buffer(outputBuffer, MA_BUFFER_SIZE); + + encodeHeader(buffer, 'h'); + buffer.putLongLong(uint64_t(Duration(now()))); + stringstream key; + key << "console.heartbeat." << assignedBrokerBank << "." << assignedAgentBank; + sendBufferLH(buffer, QMF_EXCHANGE, key.str()); + QPID_LOG(trace, "SENT HeartbeatIndication"); +} + +void AgentEngineImpl::methodResponse(uint32_t sequence, uint32_t status, char* text, + const Value& argMap) +{ + Mutex::ScopedLock _lock(lock); + map::iterator iter = contextMap.find(sequence); + if (iter == contextMap.end()) + return; + AgentQueryContext::Ptr context = iter->second; + contextMap.erase(iter); + + Buffer buffer(outputBuffer, MA_BUFFER_SIZE); + encodeHeader(buffer, 'm', context->sequence); + buffer.putLong(status); + buffer.putMediumString(text); + if (status == 0) { + for (vector::const_iterator aIter = context->schemaMethod->arguments.begin(); + aIter != context->schemaMethod->arguments.end(); aIter++) { + const SchemaArgumentImpl* schemaArg = *aIter; + if (schemaArg->dir == DIR_OUT || schemaArg->dir == DIR_IN_OUT) { + if (argMap.keyInMap(schemaArg->name.c_str())) { + const Value* val = argMap.byKey(schemaArg->name.c_str()); + val->impl->encode(buffer); + } else { + Value val(schemaArg->typecode); + val.impl->encode(buffer); + } + } + } + } + sendBufferLH(buffer, context->exchange, context->key); + QPID_LOG(trace, "SENT MethodResponse"); +} + +void AgentEngineImpl::queryResponse(uint32_t sequence, Object& object, bool prop, bool stat) +{ + Mutex::ScopedLock _lock(lock); + map::iterator iter = contextMap.find(sequence); + if (iter == contextMap.end()) + return; + AgentQueryContext::Ptr context = iter->second; + + Buffer buffer(outputBuffer, MA_BUFFER_SIZE); + encodeHeader(buffer, 'g', context->sequence); + + object.impl->encodeSchemaKey(buffer); + object.impl->encodeManagedObjectData(buffer); + if (prop) + object.impl->encodeProperties(buffer); + if (stat) + object.impl->encodeStatistics(buffer); + + sendBufferLH(buffer, context->exchange, context->key); + QPID_LOG(trace, "SENT ContentIndication"); +} + +void AgentEngineImpl::queryComplete(uint32_t sequence) +{ + Mutex::ScopedLock _lock(lock); + map::iterator iter = contextMap.find(sequence); + if (iter == contextMap.end()) + return; + + AgentQueryContext::Ptr context = iter->second; + contextMap.erase(iter); + sendCommandCompleteLH(context->exchange, context->key, context->sequence, 0, "OK"); +} + +void AgentEngineImpl::registerClass(SchemaObjectClass* cls) +{ + Mutex::ScopedLock _lock(lock); + SchemaObjectClassImpl* impl = cls->impl; + + map::iterator iter = packages.find(impl->package); + if (iter == packages.end()) { + packages[impl->package] = ClassMaps(); + iter = packages.find(impl->package); + // TODO: Indicate this package if connected + } + + SchemaClassKey key(impl->name, impl->getHash()); + iter->second.objectClasses[key] = impl; + + // TODO: Indicate this schema if connected. +} + +void AgentEngineImpl::registerClass(SchemaEventClass* cls) +{ + Mutex::ScopedLock _lock(lock); + SchemaEventClassImpl* impl = cls->impl; + + map::iterator iter = packages.find(impl->package); + if (iter == packages.end()) { + packages[impl->package] = ClassMaps(); + iter = packages.find(impl->package); + // TODO: Indicate this package if connected + } + + SchemaClassKey key(impl->name, impl->getHash()); + iter->second.eventClasses[key] = impl; + + // TODO: Indicate this schema if connected. +} + +const ObjectId* AgentEngineImpl::addObject(Object&, uint64_t) +{ + Mutex::ScopedLock _lock(lock); + return 0; +} + +const ObjectId* AgentEngineImpl::allocObjectId(uint64_t persistId) +{ + Mutex::ScopedLock _lock(lock); + uint16_t sequence = persistId ? 0 : bootSequence; + uint64_t objectNum = persistId ? persistId : nextObjectId++; + + ObjectIdImpl* oid = new ObjectIdImpl(&attachment, 0, sequence, objectNum); + return oid->envelope; +} + +const ObjectId* AgentEngineImpl::allocObjectId(uint32_t persistIdLo, uint32_t persistIdHi) +{ + return allocObjectId(((uint64_t) persistIdHi) << 32 | (uint64_t) persistIdLo); +} + +void AgentEngineImpl::raiseEvent(Event&) +{ + Mutex::ScopedLock _lock(lock); +} + +void AgentEngineImpl::encodeHeader(Buffer& buf, uint8_t opcode, uint32_t seq) +{ + buf.putOctet('A'); + buf.putOctet('M'); + buf.putOctet('3'); + buf.putOctet(opcode); + buf.putLong (seq); +} + +bool AgentEngineImpl::checkHeader(Buffer& buf, uint8_t *opcode, uint32_t *seq) +{ + if (buf.getSize() < 8) + return false; + + uint8_t h1 = buf.getOctet(); + uint8_t h2 = buf.getOctet(); + uint8_t h3 = buf.getOctet(); + + *opcode = buf.getOctet(); + *seq = buf.getLong(); + + return h1 == 'A' && h2 == 'M' && h3 == '3'; +} + +AgentEventImpl::Ptr AgentEngineImpl::eventDeclareQueue(const string& name) +{ + AgentEventImpl::Ptr event(new AgentEventImpl(AgentEvent::DECLARE_QUEUE)); + event->name = name; + + return event; +} + +AgentEventImpl::Ptr AgentEngineImpl::eventBind(const string& exchange, const string& queue, + const string& key) +{ + AgentEventImpl::Ptr event(new AgentEventImpl(AgentEvent::BIND)); + event->name = queue; + event->exchange = exchange; + event->bindingKey = key; + + return event; +} + +AgentEventImpl::Ptr AgentEngineImpl::eventSetupComplete() +{ + AgentEventImpl::Ptr event(new AgentEventImpl(AgentEvent::SETUP_COMPLETE)); + return event; +} + +AgentEventImpl::Ptr AgentEngineImpl::eventQuery(uint32_t num, const string& userId, const string& package, + const string& cls, boost::shared_ptr oid) +{ + AgentEventImpl::Ptr event(new AgentEventImpl(AgentEvent::GET_QUERY)); + event->sequence = num; + event->authUserId = userId; + event->query.impl->packageName = package; + event->query.impl->className = cls; + event->query.impl->oid = oid; + return event; +} + +AgentEventImpl::Ptr AgentEngineImpl::eventMethod(uint32_t num, const string& userId, const string& method, + boost::shared_ptr oid, boost::shared_ptr argMap, + SchemaObjectClass* objectClass) +{ + AgentEventImpl::Ptr event(new AgentEventImpl(AgentEvent::METHOD_CALL)); + event->sequence = num; + event->authUserId = userId; + event->name = method; + event->objectId = oid; + event->arguments = argMap; + event->objectClass = objectClass; + return event; +} + +void AgentEngineImpl::sendBufferLH(Buffer& buf, const string& destination, const string& routingKey) +{ + uint32_t length = buf.getPosition(); + MessageImpl::Ptr message(new MessageImpl); + + buf.reset(); + buf.getRawData(message->body, length); + message->destination = destination; + message->routingKey = routingKey; + message->replyExchange = "amq.direct"; + message->replyKey = queueName; + + xmtQueue.push_back(message); +} + +void AgentEngineImpl::sendPackageIndicationLH(const string& packageName) +{ + Buffer buffer(outputBuffer, MA_BUFFER_SIZE); + encodeHeader(buffer, 'p'); + buffer.putShortString(packageName); + sendBufferLH(buffer, QMF_EXCHANGE, BROKER_KEY); + QPID_LOG(trace, "SENT PackageIndication: package_name=" << packageName); +} + +void AgentEngineImpl::sendClassIndicationLH(ClassKind kind, const string& packageName, const SchemaClassKey& key) +{ + Buffer buffer(outputBuffer, MA_BUFFER_SIZE); + encodeHeader(buffer, 'q'); + buffer.putOctet((int) kind); + buffer.putShortString(packageName); + buffer.putShortString(key.name); + buffer.putBin128(const_cast(key.hash)); // const_cast needed for older Qpid libraries + sendBufferLH(buffer, QMF_EXCHANGE, BROKER_KEY); + QPID_LOG(trace, "SENT ClassIndication: package_name=" << packageName << " class_name=" << key.name); +} + +void AgentEngineImpl::sendCommandCompleteLH(const string& exchange, const string& replyToKey, + uint32_t sequence, uint32_t code, const string& text) +{ + Buffer buffer(outputBuffer, MA_BUFFER_SIZE); + encodeHeader(buffer, 'z', sequence); + buffer.putLong(code); + buffer.putShortString(text); + sendBufferLH(buffer, exchange, replyToKey); + QPID_LOG(trace, "SENT CommandComplete: seq=" << sequence << " code=" << code << " text=" << text); +} + +void AgentEngineImpl::sendMethodErrorLH(uint32_t sequence, const string& key, uint32_t code, const string& text) +{ + Buffer buffer(outputBuffer, MA_BUFFER_SIZE); + encodeHeader(buffer, 'm', sequence); + buffer.putLong(code); + + string fulltext; + switch (code) { + case MERR_UNKNOWN_PACKAGE: fulltext = "Unknown Package"; break; + case MERR_UNKNOWN_CLASS: fulltext = "Unknown Class"; break; + case MERR_UNKNOWN_METHOD: fulltext = "Unknown Method"; break; + case MERR_INTERNAL_ERROR: fulltext = "Internal Error"; break; + default: fulltext = "Unspecified Error"; break; + } + + if (!text.empty()) { + fulltext += " ("; + fulltext += text; + fulltext += ")"; + } + + buffer.putMediumString(fulltext); + sendBufferLH(buffer, DIR_EXCHANGE, key); + QPID_LOG(trace, "SENT MethodResponse: errorCode=" << code << " text=" << fulltext); +} + +void AgentEngineImpl::handleAttachResponse(Buffer& inBuffer) +{ + Mutex::ScopedLock _lock(lock); + + assignedBrokerBank = inBuffer.getLong(); + assignedAgentBank = inBuffer.getLong(); + + QPID_LOG(trace, "RCVD AttachResponse: broker=" << assignedBrokerBank << " agent=" << assignedAgentBank); + + if ((assignedBrokerBank != requestedBrokerBank) || + (assignedAgentBank != requestedAgentBank)) { + if (requestedAgentBank == 0) { + QPID_LOG(notice, "Initial object-id bank assigned: " << assignedBrokerBank << "." << + assignedAgentBank); + } else { + QPID_LOG(warning, "Collision in object-id! New bank assigned: " << assignedBrokerBank << + "." << assignedAgentBank); + } + //storeData(); // TODO + requestedBrokerBank = assignedBrokerBank; + requestedAgentBank = assignedAgentBank; + } + + attachment.setBanks(assignedBrokerBank, assignedAgentBank); + + // Bind to qpid.management to receive commands + stringstream key; + key << "agent." << assignedBrokerBank << "." << assignedAgentBank; + eventQueue.push_back(eventBind(QMF_EXCHANGE, queueName, key.str())); + + // Send package indications for all local packages + for (map::iterator pIter = packages.begin(); + pIter != packages.end(); + pIter++) { + sendPackageIndicationLH(pIter->first); + + // Send class indications for all local classes + ClassMaps cMap = pIter->second; + for (ObjectClassMap::iterator cIter = cMap.objectClasses.begin(); + cIter != cMap.objectClasses.end(); cIter++) + sendClassIndicationLH(CLASS_OBJECT, pIter->first, cIter->first); + for (EventClassMap::iterator cIter = cMap.eventClasses.begin(); + cIter != cMap.eventClasses.end(); cIter++) + sendClassIndicationLH(CLASS_EVENT, pIter->first, cIter->first); + } +} + +void AgentEngineImpl::handlePackageRequest(Buffer&) +{ + Mutex::ScopedLock _lock(lock); +} + +void AgentEngineImpl::handleClassQuery(Buffer&) +{ + Mutex::ScopedLock _lock(lock); +} + +void AgentEngineImpl::handleSchemaRequest(Buffer& inBuffer, uint32_t sequence, + const string& replyExchange, const string& replyKey) +{ + Mutex::ScopedLock _lock(lock); + string rExchange(replyExchange); + string rKey(replyKey); + string packageName; + inBuffer.getShortString(packageName); + SchemaClassKey key(inBuffer); + + if (rExchange.empty()) + rExchange = QMF_EXCHANGE; + if (rKey.empty()) + rKey = BROKER_KEY; + + QPID_LOG(trace, "RCVD SchemaRequest: package=" << packageName << " class=" << key.name); + + map::iterator pIter = packages.find(packageName); + if (pIter == packages.end()) { + sendCommandCompleteLH(rExchange, rKey, sequence, 1, "package not found"); + return; + } + + ClassMaps cMap = pIter->second; + ObjectClassMap::iterator ocIter = cMap.objectClasses.find(key); + if (ocIter != cMap.objectClasses.end()) { + SchemaObjectClassImpl* oImpl = ocIter->second; + Buffer buffer(outputBuffer, MA_BUFFER_SIZE); + encodeHeader(buffer, 's', sequence); + oImpl->encode(buffer); + sendBufferLH(buffer, rExchange, rKey); + QPID_LOG(trace, "SENT SchemaResponse: (object) package=" << packageName << " class=" << key.name); + return; + } + + EventClassMap::iterator ecIter = cMap.eventClasses.find(key); + if (ecIter != cMap.eventClasses.end()) { + SchemaEventClassImpl* eImpl = ecIter->second; + Buffer buffer(outputBuffer, MA_BUFFER_SIZE); + encodeHeader(buffer, 's', sequence); + eImpl->encode(buffer); + sendBufferLH(buffer, rExchange, rKey); + QPID_LOG(trace, "SENT SchemaResponse: (event) package=" << packageName << " class=" << key.name); + return; + } + + sendCommandCompleteLH(rExchange, rKey, sequence, 1, "class not found"); +} + +void AgentEngineImpl::handleGetQuery(Buffer& inBuffer, uint32_t sequence, const string& replyTo, const string& userId) +{ + Mutex::ScopedLock _lock(lock); + FieldTable ft; + FieldTable::ValuePtr value; + map::const_iterator pIter = packages.end(); + string pname; + string cname; + string oidRepr; + boost::shared_ptr oid; + + ft.decode(inBuffer); + + QPID_LOG(trace, "RCVD GetQuery: map=" << ft); + + value = ft.get("_package"); + if (value.get() && value->convertsTo()) { + pname = value->get(); + pIter = packages.find(pname); + if (pIter == packages.end()) { + sendCommandCompleteLH(DIR_EXCHANGE, replyTo, sequence); + return; + } + } + + value = ft.get("_class"); + if (value.get() && value->convertsTo()) { + cname = value->get(); + // TODO - check for validity of class (in package or any package) + if (pIter == packages.end()) { + } else { + + } + } + + value = ft.get("_objectid"); + if (value.get() && value->convertsTo()) { + oidRepr = value->get(); + oid.reset(new ObjectId()); + oid->impl->fromString(oidRepr); + } + + AgentQueryContext::Ptr context(new AgentQueryContext); + uint32_t contextNum = nextContextNum++; + context->sequence = sequence; + context->exchange = DIR_EXCHANGE; + context->key = replyTo; + contextMap[contextNum] = context; + + eventQueue.push_back(eventQuery(contextNum, userId, pname, cname, oid)); +} + +void AgentEngineImpl::handleMethodRequest(Buffer& buffer, uint32_t sequence, const string& replyTo, const string& userId) +{ + Mutex::ScopedLock _lock(lock); + string pname; + string method; + ObjectIdImpl* oidImpl = new ObjectIdImpl(buffer); + boost::shared_ptr oid(oidImpl->envelope); + buffer.getShortString(pname); + SchemaClassKey classKey(buffer); + buffer.getShortString(method); + + map::const_iterator pIter = packages.find(pname); + if (pIter == packages.end()) { + sendMethodErrorLH(sequence, replyTo, MERR_UNKNOWN_PACKAGE, pname); + return; + } + + ObjectClassMap::const_iterator cIter = pIter->second.objectClasses.find(classKey); + if (cIter == pIter->second.objectClasses.end()) { + sendMethodErrorLH(sequence, replyTo, MERR_UNKNOWN_CLASS, classKey.repr()); + return; + } + + const SchemaObjectClassImpl* schema = cIter->second; + vector::const_iterator mIter = schema->methods.begin(); + for (; mIter != schema->methods.end(); mIter++) { + if ((*mIter)->name == method) + break; + } + + if (mIter == schema->methods.end()) { + sendMethodErrorLH(sequence, replyTo, MERR_UNKNOWN_METHOD, method); + return; + } + + SchemaMethodImpl* schemaMethod = *mIter; + boost::shared_ptr argMap(new Value(TYPE_MAP)); + ValueImpl* value; + for (vector::const_iterator aIter = schemaMethod->arguments.begin(); + aIter != schemaMethod->arguments.end(); aIter++) { + const SchemaArgumentImpl* schemaArg = *aIter; + if (schemaArg->dir == DIR_IN || schemaArg->dir == DIR_IN_OUT) + value = new ValueImpl(schemaArg->typecode, buffer); + else + value = new ValueImpl(schemaArg->typecode); + argMap->insert(schemaArg->name.c_str(), value->envelope); + } + + AgentQueryContext::Ptr context(new AgentQueryContext); + uint32_t contextNum = nextContextNum++; + context->sequence = sequence; + context->exchange = DIR_EXCHANGE; + context->key = replyTo; + context->schemaMethod = schemaMethod; + contextMap[contextNum] = context; + + eventQueue.push_back(eventMethod(contextNum, userId, method, oid, argMap, schema->envelope)); +} + +void AgentEngineImpl::handleConsoleAddedIndication() +{ + Mutex::ScopedLock _lock(lock); +} + +//================================================================== +// Wrappers +//================================================================== + +AgentEngine::AgentEngine(char* label, bool internalStore) +{ + impl = new AgentEngineImpl(label, internalStore); +} + +AgentEngine::~AgentEngine() +{ + delete impl; +} + +void AgentEngine::setStoreDir(const char* path) +{ + impl->setStoreDir(path); +} + +void AgentEngine::setTransferDir(const char* path) +{ + impl->setTransferDir(path); +} + +void AgentEngine::handleRcvMessage(Message& message) +{ + impl->handleRcvMessage(message); +} + +bool AgentEngine::getXmtMessage(Message& item) +{ + return impl->getXmtMessage(item); +} + +void AgentEngine::popXmt() +{ + impl->popXmt(); +} + +bool AgentEngine::getEvent(AgentEvent& event) +{ + return impl->getEvent(event); +} + +void AgentEngine::popEvent() +{ + impl->popEvent(); +} + +void AgentEngine::newSession() +{ + impl->newSession(); +} + +void AgentEngine::startProtocol() +{ + impl->startProtocol(); +} + +void AgentEngine::heartbeat() +{ + impl->heartbeat(); +} + +void AgentEngine::methodResponse(uint32_t sequence, uint32_t status, char* text, const Value& arguments) +{ + impl->methodResponse(sequence, status, text, arguments); +} + +void AgentEngine::queryResponse(uint32_t sequence, Object& object, bool prop, bool stat) +{ + impl->queryResponse(sequence, object, prop, stat); +} + +void AgentEngine::queryComplete(uint32_t sequence) +{ + impl->queryComplete(sequence); +} + +void AgentEngine::registerClass(SchemaObjectClass* cls) +{ + impl->registerClass(cls); +} + +void AgentEngine::registerClass(SchemaEventClass* cls) +{ + impl->registerClass(cls); +} + +const ObjectId* AgentEngine::addObject(Object& obj, uint64_t persistId) +{ + return impl->addObject(obj, persistId); +} + +const ObjectId* AgentEngine::allocObjectId(uint64_t persistId) +{ + return impl->allocObjectId(persistId); +} + +const ObjectId* AgentEngine::allocObjectId(uint32_t persistIdLo, uint32_t persistIdHi) +{ + return impl->allocObjectId(persistIdLo, persistIdHi); +} + +void AgentEngine::raiseEvent(Event& event) +{ + impl->raiseEvent(event); +} + diff --git a/cpp/src/qmf/AgentEngine.h b/cpp/src/qmf/AgentEngine.h new file mode 100644 index 0000000000..d18a104e96 --- /dev/null +++ b/cpp/src/qmf/AgentEngine.h @@ -0,0 +1,207 @@ +#ifndef _QmfAgentEngine_ +#define _QmfAgentEngine_ + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include +#include +#include +#include +#include +#include +#include + +namespace qmf { + + /** + * AgentEvent + * + * This structure represents a QMF event coming from the agent to + * the application. + */ + struct AgentEvent { + enum EventKind { + GET_QUERY = 1, + START_SYNC = 2, + END_SYNC = 3, + METHOD_CALL = 4, + DECLARE_QUEUE = 5, + DELETE_QUEUE = 6, + BIND = 7, + UNBIND = 8, + SETUP_COMPLETE = 9 + }; + + EventKind kind; + uint32_t sequence; // Protocol sequence (for all kinds) + char* authUserId; // Authenticated user ID (for all kinds) + char* authToken; // Authentication token if issued (for all kinds) + char* name; // Name of the method/sync query + // (METHOD_CALL, START_SYNC, END_SYNC, DECLARE_QUEUE, BIND, UNBIND) + Object* object; // Object involved in method call (METHOD_CALL) + ObjectId* objectId; // ObjectId for method call (METHOD_CALL) + Query* query; // Query parameters (GET_QUERY, START_SYNC) + Value* arguments; // Method parameters (METHOD_CALL) + char* exchange; // Exchange for bind (BIND, UNBIND) + char* bindingKey; // Key for bind (BIND, UNBIND) + SchemaObjectClass* objectClass; // (METHOD_CALL) + }; + + class AgentEngineImpl; + + /** + * AgentEngine - Protocol engine for the QMF agent + */ + class AgentEngine { + public: + AgentEngine(char* label, bool internalStore=true); + ~AgentEngine(); + + /** + * Configure the directory path for storing persistent data. + *@param path Null-terminated string containing a directory path where files can be + * created, written, and read. If NULL, no persistent storage will be + * attempted. + */ + void setStoreDir(const char* path); + + /** + * Configure the directory path for files transferred over QMF. + *@param path Null-terminated string containing a directory path where files can be + * created, deleted, written, and read. If NULL, file transfers shall not + * be permitted. + */ + void setTransferDir(const char* path); + + /** + * Pass messages received from the AMQP session to the Agent engine. + *@param message AMQP messages received on the agent session. + */ + void handleRcvMessage(Message& message); + + /** + * Get the next message to be sent to the AMQP network. + *@param item The Message structure describing the message to be produced. + *@return true if the Message is valid, false if there are no messages to send. + */ + bool getXmtMessage(Message& item); + + /** + * Remove and discard one message from the head of the transmit queue. + */ + void popXmt(); + + /** + * Get the next application event from the agent engine. + *@param event The event iff the return value is true + *@return true if event is valid, false if there are no events to process + */ + bool getEvent(AgentEvent& event); + + /** + * Remove and discard one event from the head of the event queue. + */ + void popEvent(); + + /** + * A new AMQP session has been established for Agent communication. + */ + void newSession(); + + /** + * Start the QMF Agent protocol. This should be invoked after a SETUP_COMPLETE event + * is received from the Agent engine. + */ + void startProtocol(); + + /** + * This method is called periodically so the agent can supply a heartbeat. + */ + void heartbeat(); + + /** + * Respond to a method request. + *@param sequence The sequence number from the method request event. + *@param status The method's completion status. + *@param text Status text ("OK" or an error message) + *@param arguments The list of output arguments from the method call. + */ + void methodResponse(uint32_t sequence, uint32_t status, char* text, const Value& arguments); + + /** + * Send a content indication to the QMF bus. This is only needed for objects that are + * managed by the application. This is *NOT* needed for objects managed by the Agent + * (inserted using addObject). + *@param sequence The sequence number of the GET request or the SYNC_START request. + *@param object The object (annotated with "changed" flags) for publication. + *@param prop If true, changed object properties are transmitted. + *@param stat If true, changed object statistics are transmitted. + */ + void queryResponse(uint32_t sequence, Object& object, bool prop = true, bool stat = true); + + /** + * Indicate the completion of a query. This is not used for SYNC_START requests. + *@param sequence The sequence number of the GET request. + */ + void queryComplete(uint32_t sequence); + + /** + * Register a schema class with the Agent. + *@param cls A SchemaObejctClass object that defines data managed by the agent. + */ + void registerClass(SchemaObjectClass* cls); + + /** + * Register a schema class with the Agent. + *@param cls A SchemaEventClass object that defines events sent by the agent. + */ + void registerClass(SchemaEventClass* cls); + + /** + * Give an object to the Agent for storage and management. Once added, the agent takes + * responsibility for the life cycle of the object. + *@param obj The object to be managed by the Agent. + *@param persistId A unique non-zero value if the object-id is to be persistent. + *@return The objectId of the managed object. + */ + const ObjectId* addObject(Object& obj, uint64_t persistId); + const ObjectId* addObject(Object& obj, uint32_t persistIdLo, uint32_t persistIdHi); + + /** + * Allocate an object-id for an object that will be managed by the application. + *@param persistId A unique non-zero value if the object-id is to be persistent. + *@return The objectId structure for the allocated ID. + */ + const ObjectId* allocObjectId(uint64_t persistId); + const ObjectId* allocObjectId(uint32_t persistIdLo, uint32_t persistIdHi); + + /** + * Raise an event into the QMF network.. + *@param event The event object for the event to be raised. + */ + void raiseEvent(Event& event); + + private: + AgentEngineImpl* impl; + }; +} + +#endif + diff --git a/cpp/src/qmf/Console.h b/cpp/src/qmf/Console.h deleted file mode 100644 index de7949e1de..0000000000 --- a/cpp/src/qmf/Console.h +++ /dev/null @@ -1,82 +0,0 @@ -#ifndef _QmfConsole_ -#define _QmfConsole_ - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -namespace qmf { - - struct ConsoleSettings { - bool rcvObjects; - bool rcvEvents; - bool rcvHeartbeats; - bool userBindings; - uint32_t methodTimeout; - uint32_t getTimeout; - - ConsoleSettings() : - rcvObjects(true), - rcvEvents(true), - rcvHeartbeats(true), - userBindings(false), - methodTimeout(20), - getTimeout(20) {} - }; - - class Console { - public: - Console(ConsoleHandler* handler = 0, ConsoleSettings settings = ConsoleSettings()); - ~Console(); - - Broker* addConnection(ManagedConnection& connection); - void delConnection(Broker* broker); - void delConnection(ManagedConnection& connection); - - const PackageMap& getPackages() const; - - void bindPackage(const Package& package); - void bindPackage(const std::string& packageName); - void bindClass(const SchemaClass& otype); - void bindClass(const std::string& packageName, const std::string& className); - - void getAgents(std::set& agents, Broker* = 0); - void getObjects(std::vector& objects, const std::string& typeName, - const std::string& packageName = "", - Broker* broker = 0, - Agent* agent = 0); - void getObjects(std::vector& objects, - const std::map& query, - Broker* broker = 0, - Agent* agent = 0); - }; -} - -#endif - diff --git a/cpp/src/qmf/ConsoleEngine.h b/cpp/src/qmf/ConsoleEngine.h new file mode 100644 index 0000000000..823e281b14 --- /dev/null +++ b/cpp/src/qmf/ConsoleEngine.h @@ -0,0 +1,83 @@ +#ifndef _QmfConsoleEngine_ +#define _QmfConsoleEngine_ + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace qmf { + + struct ConsoleSettings { + bool rcvObjects; + bool rcvEvents; + bool rcvHeartbeats; + bool userBindings; + uint32_t methodTimeout; + uint32_t getTimeout; + + ConsoleSettings() : + rcvObjects(true), + rcvEvents(true), + rcvHeartbeats(true), + userBindings(false), + methodTimeout(20), + getTimeout(20) {} + }; + + class ConsoleEngine { + public: + ConsoleEngine(ConsoleHandler* handler = 0, ConsoleSettings settings = ConsoleSettings()); + ~ConsoleEngine(); + + Broker* addConnection(ManagedConnection& connection); + void delConnection(Broker* broker); + void delConnection(ManagedConnection& connection); + + const PackageMap& getPackages() const; + + void bindPackage(const Package& package); + void bindPackage(const std::string& packageName); + void bindClass(const SchemaClass& otype); + void bindClass(const std::string& packageName, const std::string& className); + + /* + void getAgents(std::set& agents, Broker* = 0); + void getObjects(std::vector& objects, const std::string& typeName, + const std::string& packageName = "", + Broker* broker = 0, + Agent* agent = 0); + void getObjects(std::vector& objects, + const std::map& query, + Broker* broker = 0, + Agent* agent = 0); + */ + }; +} + +#endif + diff --git a/cpp/src/qmf/Object.h b/cpp/src/qmf/Object.h index 8caab8d6dc..eb92cbbe45 100644 --- a/cpp/src/qmf/Object.h +++ b/cpp/src/qmf/Object.h @@ -31,7 +31,7 @@ namespace qmf { public: Object(const SchemaObjectClass* type); Object(ObjectImpl* impl); - ~Object(); + virtual ~Object(); void destroy(); const ObjectId* getObjectId() const; -- cgit v1.2.1