summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qmf/Agent.cpp
diff options
context:
space:
mode:
authorTed Ross <tross@apache.org>2009-08-19 18:31:31 +0000
committerTed Ross <tross@apache.org>2009-08-19 18:31:31 +0000
commit7753a6839ac0abb282e2d4372f7a4199243cf4ff (patch)
tree4b8ff6cdf883dc24f84423dd42b5f25d169013c7 /qpid/cpp/src/qmf/Agent.cpp
parentaaae47e89aa754481027e0c7ed6030218f01ec51 (diff)
downloadqpid-python-7753a6839ac0abb282e2d4372f7a4199243cf4ff.tar.gz
Introduce the public includes for the QMF interfaces.
Rename Agent to AgentEngine to differentiate the API from the underlying engine. Note that some of these public headers will overlap with the emerging "messaging" API (notably Connection.h and ConnectionSettings.h). It is desirable that these components of the API become common between "messaging" and "qmf". As such, once the differences are reconciled, they will most likely be removed from the qmf space and placed in the messaging space. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@805916 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src/qmf/Agent.cpp')
-rw-r--r--qpid/cpp/src/qmf/Agent.cpp958
1 files changed, 0 insertions, 958 deletions
diff --git a/qpid/cpp/src/qmf/Agent.cpp b/qpid/cpp/src/qmf/Agent.cpp
deleted file mode 100644
index 6d59ae2750..0000000000
--- a/qpid/cpp/src/qmf/Agent.cpp
+++ /dev/null
@@ -1,958 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include "qmf/Agent.h"
-#include "qmf/MessageImpl.h"
-#include "qmf/SchemaImpl.h"
-#include "qmf/Typecode.h"
-#include "qmf/ObjectImpl.h"
-#include "qmf/ObjectIdImpl.h"
-#include "qmf/QueryImpl.h"
-#include "qmf/ValueImpl.h"
-#include <qpid/framing/Buffer.h>
-#include <qpid/framing/Uuid.h>
-#include <qpid/framing/FieldTable.h>
-#include <qpid/framing/FieldValue.h>
-#include <qpid/sys/Mutex.h>
-#include <qpid/log/Statement.h>
-#include <qpid/sys/Time.h>
-#include <string.h>
-#include <string>
-#include <deque>
-#include <map>
-#include <iostream>
-#include <fstream>
-#include <boost/shared_ptr.hpp>
-
-using namespace std;
-using namespace qmf;
-using namespace qpid::framing;
-using namespace qpid::sys;
-
-namespace qmf {
-
- struct AgentEventImpl {
- typedef boost::shared_ptr<AgentEventImpl> Ptr;
- AgentEvent::EventKind kind;
- uint32_t sequence;
- string authUserId;
- string authToken;
- string name;
- Object* object;
- boost::shared_ptr<ObjectId> objectId;
- Query query;
- boost::shared_ptr<Value> arguments;
- string exchange;
- string bindingKey;
- SchemaObjectClass* objectClass;
-
- AgentEventImpl(AgentEvent::EventKind k) :
- kind(k), sequence(0), object(0), objectClass(0) {}
- ~AgentEventImpl() {}
- AgentEvent copy();
- };
-
- struct AgentQueryContext {
- typedef boost::shared_ptr<AgentQueryContext> Ptr;
- uint32_t sequence;
- string exchange;
- string key;
- SchemaMethodImpl* schemaMethod;
- AgentQueryContext() : schemaMethod(0) {}
- };
-
- class AgentImpl {
- public:
- AgentImpl(char* label, bool internalStore);
- ~AgentImpl();
-
- void setStoreDir(char* path);
- void setTransferDir(char* path);
- void handleRcvMessage(Message& message);
- bool getXmtMessage(Message& item);
- void popXmt();
- bool getEvent(AgentEvent& event);
- void popEvent();
- void newSession();
- void startProtocol();
- void heartbeat();
- void methodResponse(uint32_t sequence, uint32_t status, char* text, const Value& arguments);
- void queryResponse(uint32_t sequence, Object& object, bool prop, bool stat);
- void queryComplete(uint32_t sequence);
- void registerClass(SchemaObjectClass* cls);
- void registerClass(SchemaEventClass* cls);
- const ObjectId* addObject(Object& obj, uint64_t persistId);
- const ObjectId* allocObjectId(uint64_t persistId);
- const ObjectId* allocObjectId(uint32_t persistIdLo, uint32_t persistIdHi);
- void raiseEvent(Event& event);
-
- private:
- Mutex lock;
- Mutex addLock;
- string label;
- string queueName;
- string storeDir;
- string transferDir;
- bool internalStore;
- uint64_t nextTransientId;
- Uuid systemId;
- uint32_t requestedBrokerBank;
- uint32_t requestedAgentBank;
- uint32_t assignedBrokerBank;
- uint32_t assignedAgentBank;
- AgentAttachment attachment;
- uint16_t bootSequence;
- uint64_t nextObjectId;
- uint32_t nextContextNum;
- deque<AgentEventImpl::Ptr> eventQueue;
- deque<MessageImpl::Ptr> xmtQueue;
- map<uint32_t, AgentQueryContext::Ptr> contextMap;
-
- static const char* QMF_EXCHANGE;
- static const char* DIR_EXCHANGE;
- static const char* BROKER_KEY;
- static const uint32_t MERR_UNKNOWN_METHOD = 2;
- static const uint32_t MERR_UNKNOWN_PACKAGE = 8;
- static const uint32_t MERR_UNKNOWN_CLASS = 9;
- static const uint32_t MERR_INTERNAL_ERROR = 10;
-# define MA_BUFFER_SIZE 65536
- char outputBuffer[MA_BUFFER_SIZE];
-
- struct SchemaClassKey {
- string name;
- uint8_t hash[16];
- SchemaClassKey(const string& n, const uint8_t* h) : name(n) {
- memcpy(hash, h, 16);
- }
- SchemaClassKey(Buffer& buffer) {
- buffer.getShortString(name);
- buffer.getBin128(hash);
- }
- string repr() {
- return name;
- }
- };
-
- struct SchemaClassKeyComp {
- bool operator() (const SchemaClassKey& lhs, const SchemaClassKey& rhs) const
- {
- if (lhs.name != rhs.name)
- return lhs.name < rhs.name;
- else
- for (int i = 0; i < 16; i++)
- if (lhs.hash[i] != rhs.hash[i])
- return lhs.hash[i] < rhs.hash[i];
- return false;
- }
- };
-
- typedef map<SchemaClassKey, SchemaObjectClassImpl*, SchemaClassKeyComp> ObjectClassMap;
- typedef map<SchemaClassKey, SchemaEventClassImpl*, SchemaClassKeyComp> EventClassMap;
-
- struct ClassMaps {
- ObjectClassMap objectClasses;
- EventClassMap eventClasses;
- };
-
- map<string, ClassMaps> packages;
-
- bool checkHeader(Buffer& buf, uint8_t *opcode, uint32_t *seq);
- void encodeHeader(Buffer& buf, uint8_t opcode, uint32_t seq = 0);
- AgentEventImpl::Ptr eventDeclareQueue(const string& queueName);
- AgentEventImpl::Ptr eventBind(const string& exchange, const string& queue, const string& key);
- AgentEventImpl::Ptr eventSetupComplete();
- AgentEventImpl::Ptr eventQuery(uint32_t num, const string& userId, const string& package, const string& cls,
- boost::shared_ptr<ObjectId> oid);
- AgentEventImpl::Ptr eventMethod(uint32_t num, const string& userId, const string& method,
- boost::shared_ptr<ObjectId> oid, boost::shared_ptr<Value> argMap,
- SchemaObjectClass* objectClass);
- void sendBufferLH(Buffer& buf, const string& destination, const string& routingKey);
-
- void sendPackageIndicationLH(const string& packageName);
- void sendClassIndicationLH(ClassKind kind, const string& packageName, const SchemaClassKey& key);
- void sendCommandCompleteLH(const string& exchange, const string& key, uint32_t seq,
- uint32_t code = 0, const string& text = "OK");
- void sendMethodErrorLH(uint32_t sequence, const string& key, uint32_t code, const string& text="");
- void handleAttachResponse(Buffer& inBuffer);
- void handlePackageRequest(Buffer& inBuffer);
- void handleClassQuery(Buffer& inBuffer);
- void handleSchemaRequest(Buffer& inBuffer, uint32_t sequence,
- const string& replyToExchange, const string& replyToKey);
- void handleGetQuery(Buffer& inBuffer, uint32_t sequence, const string& replyTo, const string& userId);
- void handleMethodRequest(Buffer& inBuffer, uint32_t sequence, const string& replyTo, const string& userId);
- void handleConsoleAddedIndication();
- };
-}
-
-const char* AgentImpl::QMF_EXCHANGE = "qpid.management";
-const char* AgentImpl::DIR_EXCHANGE = "amq.direct";
-const char* AgentImpl::BROKER_KEY = "broker";
-
-#define STRING_REF(s) {if (!s.empty()) item.s = const_cast<char*>(s.c_str());}
-
-AgentEvent AgentEventImpl::copy()
-{
- AgentEvent item;
-
- ::memset(&item, 0, sizeof(AgentEvent));
- item.kind = kind;
- item.sequence = sequence;
- item.object = object;
- item.objectId = objectId.get();
- item.query = &query;
- item.arguments = arguments.get();
- item.objectClass = objectClass;
-
- STRING_REF(authUserId);
- STRING_REF(authToken);
- STRING_REF(name);
- STRING_REF(exchange);
- STRING_REF(bindingKey);
-
- return item;
-}
-
-AgentImpl::AgentImpl(char* _label, bool i) :
- label(_label), queueName("qmfa-"), internalStore(i), nextTransientId(1),
- requestedBrokerBank(0), requestedAgentBank(0),
- assignedBrokerBank(0), assignedAgentBank(0),
- bootSequence(1), nextObjectId(1), nextContextNum(1)
-{
- queueName += label;
-}
-
-AgentImpl::~AgentImpl()
-{
-}
-
-void AgentImpl::setStoreDir(char* path)
-{
- Mutex::ScopedLock _lock(lock);
- if (path)
- storeDir = path;
- else
- storeDir.clear();
-}
-
-void AgentImpl::setTransferDir(char* path)
-{
- Mutex::ScopedLock _lock(lock);
- if (path)
- transferDir = path;
- else
- transferDir.clear();
-}
-
-void AgentImpl::handleRcvMessage(Message& message)
-{
- Buffer inBuffer(message.body, message.length);
- uint8_t opcode;
- uint32_t sequence;
- string replyToExchange(message.replyExchange ? message.replyExchange : "");
- string replyToKey(message.replyKey ? message.replyKey : "");
- string userId(message.userId ? message.userId : "");
-
- if (checkHeader(inBuffer, &opcode, &sequence)) {
- if (opcode == 'a') handleAttachResponse(inBuffer);
- else if (opcode == 'S') handleSchemaRequest(inBuffer, sequence, replyToExchange, replyToKey);
- else if (opcode == 'x') handleConsoleAddedIndication();
- else if (opcode == 'G') handleGetQuery(inBuffer, sequence, replyToKey, userId);
- else if (opcode == 'M') handleMethodRequest(inBuffer, sequence, replyToKey, userId);
- }
-}
-
-bool AgentImpl::getXmtMessage(Message& item)
-{
- Mutex::ScopedLock _lock(lock);
- if (xmtQueue.empty())
- return false;
- item = xmtQueue.front()->copy();
- return true;
-}
-
-void AgentImpl::popXmt()
-{
- Mutex::ScopedLock _lock(lock);
- if (!xmtQueue.empty())
- xmtQueue.pop_front();
-}
-
-bool AgentImpl::getEvent(AgentEvent& event)
-{
- Mutex::ScopedLock _lock(lock);
- if (eventQueue.empty())
- return false;
- event = eventQueue.front()->copy();
- return true;
-}
-
-void AgentImpl::popEvent()
-{
- Mutex::ScopedLock _lock(lock);
- if (!eventQueue.empty())
- eventQueue.pop_front();
-}
-
-void AgentImpl::newSession()
-{
- Mutex::ScopedLock _lock(lock);
- eventQueue.clear();
- xmtQueue.clear();
- eventQueue.push_back(eventDeclareQueue(queueName));
- eventQueue.push_back(eventBind("amq.direct", queueName, queueName));
- eventQueue.push_back(eventSetupComplete());
-}
-
-void AgentImpl::startProtocol()
-{
- Mutex::ScopedLock _lock(lock);
- char rawbuffer[512];
- Buffer buffer(rawbuffer, 512);
-
- encodeHeader(buffer, 'A');
- buffer.putShortString("qmfa");
- systemId.encode(buffer);
- buffer.putLong(requestedBrokerBank);
- buffer.putLong(requestedAgentBank);
- sendBufferLH(buffer, QMF_EXCHANGE, BROKER_KEY);
- QPID_LOG(trace, "SENT AttachRequest: reqBroker=" << requestedBrokerBank <<
- " reqAgent=" << requestedAgentBank);
-}
-
-void AgentImpl::heartbeat()
-{
- Mutex::ScopedLock _lock(lock);
- Buffer buffer(outputBuffer, MA_BUFFER_SIZE);
-
- encodeHeader(buffer, 'h');
- buffer.putLongLong(uint64_t(Duration(now())));
- stringstream key;
- key << "console.heartbeat." << assignedBrokerBank << "." << assignedAgentBank;
- sendBufferLH(buffer, QMF_EXCHANGE, key.str());
- QPID_LOG(trace, "SENT HeartbeatIndication");
-}
-
-void AgentImpl::methodResponse(uint32_t sequence, uint32_t status, char* text,
- const Value& argMap)
-{
- Mutex::ScopedLock _lock(lock);
- map<uint32_t, AgentQueryContext::Ptr>::iterator iter = contextMap.find(sequence);
- if (iter == contextMap.end())
- return;
- AgentQueryContext::Ptr context = iter->second;
- contextMap.erase(iter);
-
- Buffer buffer(outputBuffer, MA_BUFFER_SIZE);
- encodeHeader(buffer, 'm', context->sequence);
- buffer.putLong(status);
- buffer.putMediumString(text);
- if (status == 0) {
- for (vector<SchemaArgumentImpl*>::const_iterator aIter = context->schemaMethod->arguments.begin();
- aIter != context->schemaMethod->arguments.end(); aIter++) {
- const SchemaArgumentImpl* schemaArg = *aIter;
- if (schemaArg->dir == DIR_OUT || schemaArg->dir == DIR_IN_OUT) {
- if (argMap.keyInMap(schemaArg->name.c_str())) {
- const Value* val = argMap.byKey(schemaArg->name.c_str());
- val->impl->encode(buffer);
- } else {
- Value val(schemaArg->typecode);
- val.impl->encode(buffer);
- }
- }
- }
- }
- sendBufferLH(buffer, context->exchange, context->key);
- QPID_LOG(trace, "SENT MethodResponse");
-}
-
-void AgentImpl::queryResponse(uint32_t sequence, Object& object, bool prop, bool stat)
-{
- Mutex::ScopedLock _lock(lock);
- map<uint32_t, AgentQueryContext::Ptr>::iterator iter = contextMap.find(sequence);
- if (iter == contextMap.end())
- return;
- AgentQueryContext::Ptr context = iter->second;
-
- Buffer buffer(outputBuffer, MA_BUFFER_SIZE);
- encodeHeader(buffer, 'g', context->sequence);
-
- object.impl->encodeSchemaKey(buffer);
- object.impl->encodeManagedObjectData(buffer);
- if (prop)
- object.impl->encodeProperties(buffer);
- if (stat)
- object.impl->encodeStatistics(buffer);
-
- sendBufferLH(buffer, context->exchange, context->key);
- QPID_LOG(trace, "SENT ContentIndication");
-}
-
-void AgentImpl::queryComplete(uint32_t sequence)
-{
- Mutex::ScopedLock _lock(lock);
- map<uint32_t, AgentQueryContext::Ptr>::iterator iter = contextMap.find(sequence);
- if (iter == contextMap.end())
- return;
-
- AgentQueryContext::Ptr context = iter->second;
- contextMap.erase(iter);
- sendCommandCompleteLH(context->exchange, context->key, context->sequence, 0, "OK");
-}
-
-void AgentImpl::registerClass(SchemaObjectClass* cls)
-{
- Mutex::ScopedLock _lock(lock);
- SchemaObjectClassImpl* impl = cls->impl;
-
- map<string, ClassMaps>::iterator iter = packages.find(impl->package);
- if (iter == packages.end()) {
- packages[impl->package] = ClassMaps();
- iter = packages.find(impl->package);
- // TODO: Indicate this package if connected
- }
-
- SchemaClassKey key(impl->name, impl->getHash());
- iter->second.objectClasses[key] = impl;
-
- // TODO: Indicate this schema if connected.
-}
-
-void AgentImpl::registerClass(SchemaEventClass* cls)
-{
- Mutex::ScopedLock _lock(lock);
- SchemaEventClassImpl* impl = cls->impl;
-
- map<string, ClassMaps>::iterator iter = packages.find(impl->package);
- if (iter == packages.end()) {
- packages[impl->package] = ClassMaps();
- iter = packages.find(impl->package);
- // TODO: Indicate this package if connected
- }
-
- SchemaClassKey key(impl->name, impl->getHash());
- iter->second.eventClasses[key] = impl;
-
- // TODO: Indicate this schema if connected.
-}
-
-const ObjectId* AgentImpl::addObject(Object&, uint64_t)
-{
- Mutex::ScopedLock _lock(lock);
- return 0;
-}
-
-const ObjectId* AgentImpl::allocObjectId(uint64_t persistId)
-{
- Mutex::ScopedLock _lock(lock);
- uint16_t sequence = persistId ? 0 : bootSequence;
- uint64_t objectNum = persistId ? persistId : nextObjectId++;
-
- ObjectIdImpl* oid = new ObjectIdImpl(&attachment, 0, sequence, objectNum);
- return oid->envelope;
-}
-
-const ObjectId* AgentImpl::allocObjectId(uint32_t persistIdLo, uint32_t persistIdHi)
-{
- return allocObjectId(((uint64_t) persistIdHi) << 32 | (uint64_t) persistIdLo);
-}
-
-void AgentImpl::raiseEvent(Event&)
-{
- Mutex::ScopedLock _lock(lock);
-}
-
-void AgentImpl::encodeHeader(Buffer& buf, uint8_t opcode, uint32_t seq)
-{
- buf.putOctet('A');
- buf.putOctet('M');
- buf.putOctet('3');
- buf.putOctet(opcode);
- buf.putLong (seq);
-}
-
-bool AgentImpl::checkHeader(Buffer& buf, uint8_t *opcode, uint32_t *seq)
-{
- if (buf.getSize() < 8)
- return false;
-
- uint8_t h1 = buf.getOctet();
- uint8_t h2 = buf.getOctet();
- uint8_t h3 = buf.getOctet();
-
- *opcode = buf.getOctet();
- *seq = buf.getLong();
-
- return h1 == 'A' && h2 == 'M' && h3 == '3';
-}
-
-AgentEventImpl::Ptr AgentImpl::eventDeclareQueue(const string& name)
-{
- AgentEventImpl::Ptr event(new AgentEventImpl(AgentEvent::DECLARE_QUEUE));
- event->name = name;
-
- return event;
-}
-
-AgentEventImpl::Ptr AgentImpl::eventBind(const string& exchange, const string& queue,
- const string& key)
-{
- AgentEventImpl::Ptr event(new AgentEventImpl(AgentEvent::BIND));
- event->name = queue;
- event->exchange = exchange;
- event->bindingKey = key;
-
- return event;
-}
-
-AgentEventImpl::Ptr AgentImpl::eventSetupComplete()
-{
- AgentEventImpl::Ptr event(new AgentEventImpl(AgentEvent::SETUP_COMPLETE));
- return event;
-}
-
-AgentEventImpl::Ptr AgentImpl::eventQuery(uint32_t num, const string& userId, const string& package,
- const string& cls, boost::shared_ptr<ObjectId> oid)
-{
- AgentEventImpl::Ptr event(new AgentEventImpl(AgentEvent::GET_QUERY));
- event->sequence = num;
- event->authUserId = userId;
- event->query.impl->packageName = package;
- event->query.impl->className = cls;
- event->query.impl->oid = oid;
- return event;
-}
-
-AgentEventImpl::Ptr AgentImpl::eventMethod(uint32_t num, const string& userId, const string& method,
- boost::shared_ptr<ObjectId> oid, boost::shared_ptr<Value> argMap,
- SchemaObjectClass* objectClass)
-{
- AgentEventImpl::Ptr event(new AgentEventImpl(AgentEvent::METHOD_CALL));
- event->sequence = num;
- event->authUserId = userId;
- event->name = method;
- event->objectId = oid;
- event->arguments = argMap;
- event->objectClass = objectClass;
- return event;
-}
-
-void AgentImpl::sendBufferLH(Buffer& buf, const string& destination, const string& routingKey)
-{
- uint32_t length = buf.getPosition();
- MessageImpl::Ptr message(new MessageImpl);
-
- buf.reset();
- buf.getRawData(message->body, length);
- message->destination = destination;
- message->routingKey = routingKey;
- message->replyExchange = "amq.direct";
- message->replyKey = queueName;
-
- xmtQueue.push_back(message);
-}
-
-void AgentImpl::sendPackageIndicationLH(const string& packageName)
-{
- Buffer buffer(outputBuffer, MA_BUFFER_SIZE);
- encodeHeader(buffer, 'p');
- buffer.putShortString(packageName);
- sendBufferLH(buffer, QMF_EXCHANGE, BROKER_KEY);
- QPID_LOG(trace, "SENT PackageIndication: package_name=" << packageName);
-}
-
-void AgentImpl::sendClassIndicationLH(ClassKind kind, const string& packageName, const SchemaClassKey& key)
-{
- Buffer buffer(outputBuffer, MA_BUFFER_SIZE);
- encodeHeader(buffer, 'q');
- buffer.putOctet((int) kind);
- buffer.putShortString(packageName);
- buffer.putShortString(key.name);
- buffer.putBin128(const_cast<uint8_t*>(key.hash)); // const_cast needed for older Qpid libraries
- sendBufferLH(buffer, QMF_EXCHANGE, BROKER_KEY);
- QPID_LOG(trace, "SENT ClassIndication: package_name=" << packageName << " class_name=" << key.name);
-}
-
-void AgentImpl::sendCommandCompleteLH(const string& exchange, const string& replyToKey,
- uint32_t sequence, uint32_t code, const string& text)
-{
- Buffer buffer(outputBuffer, MA_BUFFER_SIZE);
- encodeHeader(buffer, 'z', sequence);
- buffer.putLong(code);
- buffer.putShortString(text);
- sendBufferLH(buffer, exchange, replyToKey);
- QPID_LOG(trace, "SENT CommandComplete: seq=" << sequence << " code=" << code << " text=" << text);
-}
-
-void AgentImpl::sendMethodErrorLH(uint32_t sequence, const string& key, uint32_t code, const string& text)
-{
- Buffer buffer(outputBuffer, MA_BUFFER_SIZE);
- encodeHeader(buffer, 'm', sequence);
- buffer.putLong(code);
-
- string fulltext;
- switch (code) {
- case MERR_UNKNOWN_PACKAGE: fulltext = "Unknown Package"; break;
- case MERR_UNKNOWN_CLASS: fulltext = "Unknown Class"; break;
- case MERR_UNKNOWN_METHOD: fulltext = "Unknown Method"; break;
- case MERR_INTERNAL_ERROR: fulltext = "Internal Error"; break;
- default: fulltext = "Unspecified Error"; break;
- }
-
- if (!text.empty()) {
- fulltext += " (";
- fulltext += text;
- fulltext += ")";
- }
-
- buffer.putMediumString(fulltext);
- sendBufferLH(buffer, DIR_EXCHANGE, key);
- QPID_LOG(trace, "SENT MethodResponse: errorCode=" << code << " text=" << fulltext);
-}
-
-void AgentImpl::handleAttachResponse(Buffer& inBuffer)
-{
- Mutex::ScopedLock _lock(lock);
-
- assignedBrokerBank = inBuffer.getLong();
- assignedAgentBank = inBuffer.getLong();
-
- QPID_LOG(trace, "RCVD AttachResponse: broker=" << assignedBrokerBank << " agent=" << assignedAgentBank);
-
- if ((assignedBrokerBank != requestedBrokerBank) ||
- (assignedAgentBank != requestedAgentBank)) {
- if (requestedAgentBank == 0) {
- QPID_LOG(notice, "Initial object-id bank assigned: " << assignedBrokerBank << "." <<
- assignedAgentBank);
- } else {
- QPID_LOG(warning, "Collision in object-id! New bank assigned: " << assignedBrokerBank <<
- "." << assignedAgentBank);
- }
- //storeData(); // TODO
- requestedBrokerBank = assignedBrokerBank;
- requestedAgentBank = assignedAgentBank;
- }
-
- attachment.setBanks(assignedBrokerBank, assignedAgentBank);
-
- // Bind to qpid.management to receive commands
- stringstream key;
- key << "agent." << assignedBrokerBank << "." << assignedAgentBank;
- eventQueue.push_back(eventBind(QMF_EXCHANGE, queueName, key.str()));
-
- // Send package indications for all local packages
- for (map<string, ClassMaps>::iterator pIter = packages.begin();
- pIter != packages.end();
- pIter++) {
- sendPackageIndicationLH(pIter->first);
-
- // Send class indications for all local classes
- ClassMaps cMap = pIter->second;
- for (ObjectClassMap::iterator cIter = cMap.objectClasses.begin();
- cIter != cMap.objectClasses.end(); cIter++)
- sendClassIndicationLH(CLASS_OBJECT, pIter->first, cIter->first);
- for (EventClassMap::iterator cIter = cMap.eventClasses.begin();
- cIter != cMap.eventClasses.end(); cIter++)
- sendClassIndicationLH(CLASS_EVENT, pIter->first, cIter->first);
- }
-}
-
-void AgentImpl::handlePackageRequest(Buffer&)
-{
- Mutex::ScopedLock _lock(lock);
-}
-
-void AgentImpl::handleClassQuery(Buffer&)
-{
- Mutex::ScopedLock _lock(lock);
-}
-
-void AgentImpl::handleSchemaRequest(Buffer& inBuffer, uint32_t sequence,
- const string& replyExchange, const string& replyKey)
-{
- Mutex::ScopedLock _lock(lock);
- string rExchange(replyExchange);
- string rKey(replyKey);
- string packageName;
- inBuffer.getShortString(packageName);
- SchemaClassKey key(inBuffer);
-
- if (rExchange.empty())
- rExchange = QMF_EXCHANGE;
- if (rKey.empty())
- rKey = BROKER_KEY;
-
- QPID_LOG(trace, "RCVD SchemaRequest: package=" << packageName << " class=" << key.name);
-
- map<string, ClassMaps>::iterator pIter = packages.find(packageName);
- if (pIter == packages.end()) {
- sendCommandCompleteLH(rExchange, rKey, sequence, 1, "package not found");
- return;
- }
-
- ClassMaps cMap = pIter->second;
- ObjectClassMap::iterator ocIter = cMap.objectClasses.find(key);
- if (ocIter != cMap.objectClasses.end()) {
- SchemaObjectClassImpl* oImpl = ocIter->second;
- Buffer buffer(outputBuffer, MA_BUFFER_SIZE);
- encodeHeader(buffer, 's', sequence);
- oImpl->encode(buffer);
- sendBufferLH(buffer, rExchange, rKey);
- QPID_LOG(trace, "SENT SchemaResponse: (object) package=" << packageName << " class=" << key.name);
- return;
- }
-
- EventClassMap::iterator ecIter = cMap.eventClasses.find(key);
- if (ecIter != cMap.eventClasses.end()) {
- SchemaEventClassImpl* eImpl = ecIter->second;
- Buffer buffer(outputBuffer, MA_BUFFER_SIZE);
- encodeHeader(buffer, 's', sequence);
- eImpl->encode(buffer);
- sendBufferLH(buffer, rExchange, rKey);
- QPID_LOG(trace, "SENT SchemaResponse: (event) package=" << packageName << " class=" << key.name);
- return;
- }
-
- sendCommandCompleteLH(rExchange, rKey, sequence, 1, "class not found");
-}
-
-void AgentImpl::handleGetQuery(Buffer& inBuffer, uint32_t sequence, const string& replyTo, const string& userId)
-{
- Mutex::ScopedLock _lock(lock);
- FieldTable ft;
- FieldTable::ValuePtr value;
- map<string, ClassMaps>::const_iterator pIter = packages.end();
- string pname;
- string cname;
- string oidRepr;
- boost::shared_ptr<ObjectId> oid;
-
- ft.decode(inBuffer);
-
- QPID_LOG(trace, "RCVD GetQuery: map=" << ft);
-
- value = ft.get("_package");
- if (value.get() && value->convertsTo<string>()) {
- pname = value->get<string>();
- pIter = packages.find(pname);
- if (pIter == packages.end()) {
- sendCommandCompleteLH(DIR_EXCHANGE, replyTo, sequence);
- return;
- }
- }
-
- value = ft.get("_class");
- if (value.get() && value->convertsTo<string>()) {
- cname = value->get<string>();
- // TODO - check for validity of class (in package or any package)
- if (pIter == packages.end()) {
- } else {
-
- }
- }
-
- value = ft.get("_objectid");
- if (value.get() && value->convertsTo<string>()) {
- oidRepr = value->get<string>();
- oid.reset(new ObjectId());
- oid->impl->fromString(oidRepr);
- }
-
- AgentQueryContext::Ptr context(new AgentQueryContext);
- uint32_t contextNum = nextContextNum++;
- context->sequence = sequence;
- context->exchange = DIR_EXCHANGE;
- context->key = replyTo;
- contextMap[contextNum] = context;
-
- eventQueue.push_back(eventQuery(contextNum, userId, pname, cname, oid));
-}
-
-void AgentImpl::handleMethodRequest(Buffer& buffer, uint32_t sequence, const string& replyTo, const string& userId)
-{
- Mutex::ScopedLock _lock(lock);
- string pname;
- string method;
- ObjectIdImpl* oidImpl = new ObjectIdImpl(buffer);
- boost::shared_ptr<ObjectId> oid(oidImpl->envelope);
- buffer.getShortString(pname);
- SchemaClassKey classKey(buffer);
- buffer.getShortString(method);
-
- map<string, ClassMaps>::const_iterator pIter = packages.find(pname);
- if (pIter == packages.end()) {
- sendMethodErrorLH(sequence, replyTo, MERR_UNKNOWN_PACKAGE, pname);
- return;
- }
-
- ObjectClassMap::const_iterator cIter = pIter->second.objectClasses.find(classKey);
- if (cIter == pIter->second.objectClasses.end()) {
- sendMethodErrorLH(sequence, replyTo, MERR_UNKNOWN_CLASS, classKey.repr());
- return;
- }
-
- const SchemaObjectClassImpl* schema = cIter->second;
- vector<SchemaMethodImpl*>::const_iterator mIter = schema->methods.begin();
- for (; mIter != schema->methods.end(); mIter++) {
- if ((*mIter)->name == method)
- break;
- }
-
- if (mIter == schema->methods.end()) {
- sendMethodErrorLH(sequence, replyTo, MERR_UNKNOWN_METHOD, method);
- return;
- }
-
- SchemaMethodImpl* schemaMethod = *mIter;
- boost::shared_ptr<Value> argMap(new Value(TYPE_MAP));
- ValueImpl* value;
- for (vector<SchemaArgumentImpl*>::const_iterator aIter = schemaMethod->arguments.begin();
- aIter != schemaMethod->arguments.end(); aIter++) {
- const SchemaArgumentImpl* schemaArg = *aIter;
- if (schemaArg->dir == DIR_IN || schemaArg->dir == DIR_IN_OUT)
- value = new ValueImpl(schemaArg->typecode, buffer);
- else
- value = new ValueImpl(schemaArg->typecode);
- argMap->insert(schemaArg->name.c_str(), value->envelope);
- }
-
- AgentQueryContext::Ptr context(new AgentQueryContext);
- uint32_t contextNum = nextContextNum++;
- context->sequence = sequence;
- context->exchange = DIR_EXCHANGE;
- context->key = replyTo;
- context->schemaMethod = schemaMethod;
- contextMap[contextNum] = context;
-
- eventQueue.push_back(eventMethod(contextNum, userId, method, oid, argMap, schema->envelope));
-}
-
-void AgentImpl::handleConsoleAddedIndication()
-{
- Mutex::ScopedLock _lock(lock);
-}
-
-//==================================================================
-// Wrappers
-//==================================================================
-
-Agent::Agent(char* label, bool internalStore)
-{
- impl = new AgentImpl(label, internalStore);
-}
-
-Agent::~Agent()
-{
- delete impl;
-}
-
-void Agent::setStoreDir(char* path)
-{
- impl->setStoreDir(path);
-}
-
-void Agent::setTransferDir(char* path)
-{
- impl->setTransferDir(path);
-}
-
-void Agent::handleRcvMessage(Message& message)
-{
- impl->handleRcvMessage(message);
-}
-
-bool Agent::getXmtMessage(Message& item)
-{
- return impl->getXmtMessage(item);
-}
-
-void Agent::popXmt()
-{
- impl->popXmt();
-}
-
-bool Agent::getEvent(AgentEvent& event)
-{
- return impl->getEvent(event);
-}
-
-void Agent::popEvent()
-{
- impl->popEvent();
-}
-
-void Agent::newSession()
-{
- impl->newSession();
-}
-
-void Agent::startProtocol()
-{
- impl->startProtocol();
-}
-
-void Agent::heartbeat()
-{
- impl->heartbeat();
-}
-
-void Agent::methodResponse(uint32_t sequence, uint32_t status, char* text, const Value& arguments)
-{
- impl->methodResponse(sequence, status, text, arguments);
-}
-
-void Agent::queryResponse(uint32_t sequence, Object& object, bool prop, bool stat)
-{
- impl->queryResponse(sequence, object, prop, stat);
-}
-
-void Agent::queryComplete(uint32_t sequence)
-{
- impl->queryComplete(sequence);
-}
-
-void Agent::registerClass(SchemaObjectClass* cls)
-{
- impl->registerClass(cls);
-}
-
-void Agent::registerClass(SchemaEventClass* cls)
-{
- impl->registerClass(cls);
-}
-
-const ObjectId* Agent::addObject(Object& obj, uint64_t persistId)
-{
- return impl->addObject(obj, persistId);
-}
-
-const ObjectId* Agent::allocObjectId(uint64_t persistId)
-{
- return impl->allocObjectId(persistId);
-}
-
-const ObjectId* Agent::allocObjectId(uint32_t persistIdLo, uint32_t persistIdHi)
-{
- return impl->allocObjectId(persistIdLo, persistIdHi);
-}
-
-void Agent::raiseEvent(Event& event)
-{
- impl->raiseEvent(event);
-}
-