summaryrefslogtreecommitdiff
path: root/qpid/cpp/src
diff options
context:
space:
mode:
authorAndrew Stitcher <astitcher@apache.org>2013-07-26 21:31:09 +0000
committerAndrew Stitcher <astitcher@apache.org>2013-07-26 21:31:09 +0000
commit71798082f1d34e3d1a21cdcaa9128e06906d8e4e (patch)
treecb0f1800101e374995a9ed0f2566789cdf868a73 /qpid/cpp/src
parent7371045bd79b57356f163a2b6e55605c7718704a (diff)
downloadqpid-python-71798082f1d34e3d1a21cdcaa9128e06906d8e4e.tar.gz
QPID-4940: Remove obsolete qmf code
- Remove qmf1 engine code - Remove qmf1 language bindings - Remove qmf1 agent code (it depends on engine) - Fix up cmake to build git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1507464 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r--qpid/cpp/src/CMakeLists.txt86
-rw-r--r--qpid/cpp/src/qmf/engine/Agent.cpp915
-rw-r--r--qpid/cpp/src/qmf/engine/BrokerProxyImpl.cpp827
-rw-r--r--qpid/cpp/src/qmf/engine/BrokerProxyImpl.h241
-rw-r--r--qpid/cpp/src/qmf/engine/ConnectionSettingsImpl.cpp278
-rw-r--r--qpid/cpp/src/qmf/engine/ConnectionSettingsImpl.h63
-rw-r--r--qpid/cpp/src/qmf/engine/ConsoleImpl.cpp458
-rw-r--r--qpid/cpp/src/qmf/engine/ConsoleImpl.h148
-rw-r--r--qpid/cpp/src/qmf/engine/EventImpl.cpp120
-rw-r--r--qpid/cpp/src/qmf/engine/EventImpl.h57
-rw-r--r--qpid/cpp/src/qmf/engine/MessageImpl.cpp43
-rw-r--r--qpid/cpp/src/qmf/engine/MessageImpl.h44
-rw-r--r--qpid/cpp/src/qmf/engine/ObjectIdImpl.cpp210
-rw-r--r--qpid/cpp/src/qmf/engine/ObjectIdImpl.h72
-rw-r--r--qpid/cpp/src/qmf/engine/ObjectImpl.cpp232
-rw-r--r--qpid/cpp/src/qmf/engine/ObjectImpl.h76
-rw-r--r--qpid/cpp/src/qmf/engine/Protocol.cpp52
-rw-r--r--qpid/cpp/src/qmf/engine/Protocol.h69
-rw-r--r--qpid/cpp/src/qmf/engine/QueryImpl.cpp103
-rw-r--r--qpid/cpp/src/qmf/engine/QueryImpl.h102
-rw-r--r--qpid/cpp/src/qmf/engine/ResilientConnection.cpp512
-rw-r--r--qpid/cpp/src/qmf/engine/SchemaImpl.cpp610
-rw-r--r--qpid/cpp/src/qmf/engine/SchemaImpl.h230
-rw-r--r--qpid/cpp/src/qmf/engine/SequenceManager.cpp96
-rw-r--r--qpid/cpp/src/qmf/engine/SequenceManager.h68
-rw-r--r--qpid/cpp/src/qmf/engine/ValueImpl.cpp571
-rw-r--r--qpid/cpp/src/qmf/engine/ValueImpl.h166
-rw-r--r--qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp1434
-rw-r--r--qpid/cpp/src/qpid/agent/ManagementAgentImpl.h294
-rw-r--r--qpid/cpp/src/tests/testagent.cpp208
-rw-r--r--qpid/cpp/src/tests/testagent.xml64
31 files changed, 0 insertions, 8449 deletions
diff --git a/qpid/cpp/src/CMakeLists.txt b/qpid/cpp/src/CMakeLists.txt
index c1d16404ef..5d5dd034b1 100644
--- a/qpid/cpp/src/CMakeLists.txt
+++ b/qpid/cpp/src/CMakeLists.txt
@@ -1373,30 +1373,6 @@ if (UNIX)
install (CODE "FILE(MAKE_DIRECTORY \$ENV{DESTDIR}/${QPID_LOCALSTATE_DIR}/spool/qpidd)")
endif (UNIX)
-set (qmf_SOURCES
- qpid/agent/ManagementAgentImpl.cpp
- qpid/agent/ManagementAgentImpl.h
- )
-set (qmf_HEADERS
- ../include/qpid/agent/ManagementAgent.h
- ../include/qpid/agent/QmfAgentImportExport.h
- ../include/qmf/BrokerImportExport.h
- )
-
-add_msvc_version (qmf library dll)
-add_library (qmf SHARED ${qmf_SOURCES})
-target_link_libraries (qmf qmfengine)
-set_target_properties (qmf PROPERTIES
- VERSION ${qmf_version}
- SOVERSION ${qmf_version_major})
-install (TARGETS qmf OPTIONAL
- DESTINATION ${QPID_INSTALL_LIBDIR}
- COMPONENT ${QPID_COMPONENT_QMF})
-install (FILES ${qmf_HEADERS}
- DESTINATION ${QPID_INSTALL_INCLUDEDIR}/qpid/agent
- COMPONENT ${QPID_COMPONENT_QMF})
-install_pdb (qmf ${QPID_COMPONENT_QMF})
-
if (NOT WIN32)
set (qmf2_platform_headers
../include/qmf/posix/EventNotifier.h
@@ -1487,68 +1463,6 @@ endif (NOT WIN32)
COMPONENT ${QPID_COMPONENT_QMF})
install_pdb (qmf2 ${QPID_COMPONENT_QMF})
-set (qmfengine_SOURCES
- qmf/engine/Agent.cpp
- qmf/engine/BrokerProxyImpl.cpp
- qmf/engine/BrokerProxyImpl.h
- qmf/engine/ConnectionSettingsImpl.cpp
- qmf/engine/ConnectionSettingsImpl.h
- qmf/engine/ConsoleImpl.cpp
- qmf/engine/ConsoleImpl.h
- qmf/engine/EventImpl.cpp
- qmf/engine/EventImpl.h
- qmf/engine/MessageImpl.cpp
- qmf/engine/MessageImpl.h
- qmf/engine/ObjectIdImpl.cpp
- qmf/engine/ObjectIdImpl.h
- qmf/engine/ObjectImpl.cpp
- qmf/engine/ObjectImpl.h
- qmf/engine/Protocol.cpp
- qmf/engine/Protocol.h
- qmf/engine/QueryImpl.cpp
- qmf/engine/QueryImpl.h
- qmf/engine/SequenceManager.cpp
- qmf/engine/SequenceManager.h
- qmf/engine/SchemaImpl.cpp
- qmf/engine/SchemaImpl.h
- qmf/engine/ValueImpl.cpp
- qmf/engine/ValueImpl.h
- )
-
-set (qmfengine_HEADERS
- ../include/qmf/engine/Agent.h
- ../include/qmf/engine/ConnectionSettings.h
- ../include/qmf/engine/Console.h
- ../include/qmf/engine/Event.h
- ../include/qmf/engine/Message.h
- ../include/qmf/engine/Object.h
- ../include/qmf/engine/ObjectId.h
- ../include/qmf/engine/QmfEngineImportExport.h
- ../include/qmf/engine/Query.h
- ../include/qmf/engine/ResilientConnection.h
- ../include/qmf/engine/Schema.h
- ../include/qmf/engine/Typecode.h
- ../include/qmf/engine/Value.h
- )
-install (FILES ${qmfengine_HEADERS}
- DESTINATION ${QPID_INSTALL_INCLUDEDIR}/qmf/engine
- COMPONENT ${QPID_COMPONENT_QMF})
-
-if (NOT WIN32)
- list(APPEND qmfengine_SOURCES qmf/engine/ResilientConnection.cpp)
-endif (NOT WIN32)
-add_msvc_version (qmfengine library dll)
-
-add_library (qmfengine SHARED ${qmfengine_SOURCES})
-target_link_libraries (qmfengine qpidclient)
-set_target_properties (qmfengine PROPERTIES
- VERSION ${qmfengine_version}
- SOVERSION ${qmfengine_version_major})
-install (TARGETS qmfengine OPTIONAL
- DESTINATION ${QPID_INSTALL_LIBDIR}
- COMPONENT ${QPID_COMPONENT_QMF})
-install_pdb (qmfengine ${QPID_COMPONENT_QMF})
-
set (qmfconsole_SOURCES
qpid/console/Agent.cpp
qpid/console/Broker.cpp
diff --git a/qpid/cpp/src/qmf/engine/Agent.cpp b/qpid/cpp/src/qmf/engine/Agent.cpp
deleted file mode 100644
index 1f08dded94..0000000000
--- a/qpid/cpp/src/qmf/engine/Agent.cpp
+++ /dev/null
@@ -1,915 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include "qmf/engine/Agent.h"
-#include "qmf/engine/MessageImpl.h"
-#include "qmf/engine/SchemaImpl.h"
-#include "qmf/engine/Typecode.h"
-#include "qmf/engine/EventImpl.h"
-#include "qmf/engine/ObjectImpl.h"
-#include "qmf/engine/ObjectIdImpl.h"
-#include "qmf/engine/QueryImpl.h"
-#include "qmf/engine/ValueImpl.h"
-#include "qmf/engine/Protocol.h"
-#include <qpid/framing/Buffer.h>
-#include <qpid/framing/Uuid.h>
-#include <qpid/framing/FieldTable.h>
-#include <qpid/framing/FieldValue.h>
-#include <qpid/sys/Mutex.h>
-#include <qpid/log/Statement.h>
-#include <qpid/sys/Time.h>
-#include <string.h>
-#include <string>
-#include <deque>
-#include <map>
-#include <iostream>
-#include <fstream>
-#include <boost/shared_ptr.hpp>
-#include <boost/noncopyable.hpp>
-
-using namespace std;
-using namespace qmf::engine;
-using namespace qpid::framing;
-using namespace qpid::sys;
-
-namespace qmf {
-namespace engine {
-
- struct AgentEventImpl {
- typedef boost::shared_ptr<AgentEventImpl> Ptr;
- AgentEvent::EventKind kind;
- uint32_t sequence;
- string authUserId;
- string authToken;
- string name;
- Object* object;
- boost::shared_ptr<ObjectId> objectId;
- boost::shared_ptr<Query> query;
- boost::shared_ptr<Value> arguments;
- string exchange;
- string bindingKey;
- const SchemaObjectClass* objectClass;
-
- AgentEventImpl(AgentEvent::EventKind k) :
- kind(k), sequence(0), object(0), objectClass(0) {}
- ~AgentEventImpl() {}
- AgentEvent copy();
- };
-
- struct AgentQueryContext {
- typedef boost::shared_ptr<AgentQueryContext> Ptr;
- uint32_t sequence;
- string exchange;
- string key;
- const SchemaMethod* schemaMethod;
- AgentQueryContext() : schemaMethod(0) {}
- };
-
- class AgentImpl : public boost::noncopyable {
- public:
- AgentImpl(char* label, bool internalStore);
- ~AgentImpl();
-
- void setStoreDir(const char* path);
- void setTransferDir(const char* path);
- void handleRcvMessage(Message& message);
- bool getXmtMessage(Message& item) const;
- void popXmt();
- bool getEvent(AgentEvent& event) const;
- void popEvent();
- void newSession();
- void startProtocol();
- void heartbeat();
- void methodResponse(uint32_t sequence, uint32_t status, char* text, const Value& arguments);
- void queryResponse(uint32_t sequence, Object& object, bool prop, bool stat);
- void queryComplete(uint32_t sequence);
- void registerClass(SchemaObjectClass* cls);
- void registerClass(SchemaEventClass* cls);
- const ObjectId* addObject(Object& obj, uint64_t persistId);
- const ObjectId* allocObjectId(uint64_t persistId);
- const ObjectId* allocObjectId(uint32_t persistIdLo, uint32_t persistIdHi);
- void raiseEvent(Event& event);
-
- private:
- mutable Mutex lock;
- Mutex addLock;
- string label;
- string queueName;
- string storeDir;
- string transferDir;
- bool internalStore;
- uint64_t nextTransientId;
- Uuid systemId;
- uint32_t requestedBrokerBank;
- uint32_t requestedAgentBank;
- uint32_t assignedBrokerBank;
- uint32_t assignedAgentBank;
- AgentAttachment attachment;
- uint16_t bootSequence;
- uint64_t nextObjectId;
- uint32_t nextContextNum;
- deque<AgentEventImpl::Ptr> eventQueue;
- deque<MessageImpl::Ptr> xmtQueue;
- map<uint32_t, AgentQueryContext::Ptr> contextMap;
- bool attachComplete;
-
- static const char* QMF_EXCHANGE;
- static const char* DIR_EXCHANGE;
- static const char* BROKER_KEY;
- static const uint32_t MERR_UNKNOWN_METHOD = 2;
- static const uint32_t MERR_UNKNOWN_PACKAGE = 8;
- static const uint32_t MERR_UNKNOWN_CLASS = 9;
- static const uint32_t MERR_INTERNAL_ERROR = 10;
-# define MA_BUFFER_SIZE 65536
- char outputBuffer[MA_BUFFER_SIZE];
-
- struct AgentClassKey {
- string name;
- uint8_t hash[16];
- AgentClassKey(const string& n, const uint8_t* h) : name(n) {
- memcpy(hash, h, 16);
- }
- AgentClassKey(Buffer& buffer) {
- buffer.getShortString(name);
- buffer.getBin128(hash);
- }
- string repr() {
- return name;
- }
- };
-
- struct AgentClassKeyComp {
- bool operator() (const AgentClassKey& lhs, const AgentClassKey& rhs) const
- {
- if (lhs.name != rhs.name)
- return lhs.name < rhs.name;
- else
- for (int i = 0; i < 16; i++)
- if (lhs.hash[i] != rhs.hash[i])
- return lhs.hash[i] < rhs.hash[i];
- return false;
- }
- };
-
- typedef map<AgentClassKey, SchemaObjectClass*, AgentClassKeyComp> ObjectClassMap;
- typedef map<AgentClassKey, SchemaEventClass*, AgentClassKeyComp> EventClassMap;
-
- struct ClassMaps {
- ObjectClassMap objectClasses;
- EventClassMap eventClasses;
- };
-
- map<string, ClassMaps> packages;
-
- AgentEventImpl::Ptr eventDeclareQueue(const string& queueName);
- AgentEventImpl::Ptr eventBind(const string& exchange, const string& queue, const string& key);
- AgentEventImpl::Ptr eventSetupComplete();
- AgentEventImpl::Ptr eventQuery(uint32_t num, const string& userId, const string& package, const string& cls,
- boost::shared_ptr<ObjectId> oid);
- AgentEventImpl::Ptr eventMethod(uint32_t num, const string& userId, const string& method,
- boost::shared_ptr<ObjectId> oid, boost::shared_ptr<Value> argMap,
- const SchemaObjectClass* objectClass);
- void sendBufferLH(Buffer& buf, const string& destination, const string& routingKey);
-
- void sendPackageIndicationLH(const string& packageName);
- void sendClassIndicationLH(ClassKind kind, const string& packageName, const AgentClassKey& key);
- void sendCommandCompleteLH(const string& exchange, const string& key, uint32_t seq,
- uint32_t code = 0, const string& text = "OK");
- void sendMethodErrorLH(uint32_t sequence, const string& key, uint32_t code, const string& text="");
- void handleAttachResponse(Buffer& inBuffer);
- void handlePackageRequest(Buffer& inBuffer);
- void handleClassQuery(Buffer& inBuffer);
- void handleSchemaRequest(Buffer& inBuffer, uint32_t sequence,
- const string& replyToExchange, const string& replyToKey);
- void handleGetQuery(Buffer& inBuffer, uint32_t sequence, const string& replyTo, const string& userId);
- void handleMethodRequest(Buffer& inBuffer, uint32_t sequence, const string& replyTo, const string& userId);
- void handleConsoleAddedIndication();
- };
-}
-}
-
-const char* AgentImpl::QMF_EXCHANGE = "qpid.management";
-const char* AgentImpl::DIR_EXCHANGE = "amq.direct";
-const char* AgentImpl::BROKER_KEY = "broker";
-
-#define STRING_REF(s) {if (!s.empty()) item.s = const_cast<char*>(s.c_str());}
-
-AgentEvent AgentEventImpl::copy()
-{
- AgentEvent item;
-
- ::memset(&item, 0, sizeof(AgentEvent));
- item.kind = kind;
- item.sequence = sequence;
- item.object = object;
- item.objectId = objectId.get();
- item.query = query.get();
- item.arguments = arguments.get();
- item.objectClass = objectClass;
-
- STRING_REF(authUserId);
- STRING_REF(authToken);
- STRING_REF(name);
- STRING_REF(exchange);
- STRING_REF(bindingKey);
-
- return item;
-}
-
-AgentImpl::AgentImpl(char* _label, bool i) :
- label(_label), queueName("qmfa-"), internalStore(i), nextTransientId(1),
- requestedBrokerBank(0), requestedAgentBank(0),
- assignedBrokerBank(0), assignedAgentBank(0),
- bootSequence(1), nextObjectId(1), nextContextNum(1), attachComplete(false)
-{
- queueName += Uuid(true).str();
-}
-
-AgentImpl::~AgentImpl()
-{
-}
-
-void AgentImpl::setStoreDir(const char* path)
-{
- Mutex::ScopedLock _lock(lock);
- if (path)
- storeDir = path;
- else
- storeDir.clear();
-}
-
-void AgentImpl::setTransferDir(const char* path)
-{
- Mutex::ScopedLock _lock(lock);
- if (path)
- transferDir = path;
- else
- transferDir.clear();
-}
-
-void AgentImpl::handleRcvMessage(Message& message)
-{
- Buffer inBuffer(message.body, message.length);
- uint8_t opcode;
- uint32_t sequence;
- string replyToExchange(message.replyExchange ? message.replyExchange : "");
- string replyToKey(message.replyKey ? message.replyKey : "");
- string userId(message.userId ? message.userId : "");
-
- while (Protocol::checkHeader(inBuffer, &opcode, &sequence)) {
- if (opcode == Protocol::OP_ATTACH_RESPONSE) handleAttachResponse(inBuffer);
- else if (opcode == Protocol::OP_SCHEMA_REQUEST) handleSchemaRequest(inBuffer, sequence, replyToExchange, replyToKey);
- else if (opcode == Protocol::OP_CONSOLE_ADDED_INDICATION) handleConsoleAddedIndication();
- else if (opcode == Protocol::OP_GET_QUERY) handleGetQuery(inBuffer, sequence, replyToKey, userId);
- else if (opcode == Protocol::OP_METHOD_REQUEST) handleMethodRequest(inBuffer, sequence, replyToKey, userId);
- else {
- QPID_LOG(error, "AgentImpl::handleRcvMessage invalid opcode=" << opcode);
- break;
- }
- }
-}
-
-bool AgentImpl::getXmtMessage(Message& item) const
-{
- Mutex::ScopedLock _lock(lock);
- if (xmtQueue.empty())
- return false;
- item = xmtQueue.front()->copy();
- return true;
-}
-
-void AgentImpl::popXmt()
-{
- Mutex::ScopedLock _lock(lock);
- if (!xmtQueue.empty())
- xmtQueue.pop_front();
-}
-
-bool AgentImpl::getEvent(AgentEvent& event) const
-{
- Mutex::ScopedLock _lock(lock);
- if (eventQueue.empty())
- return false;
- event = eventQueue.front()->copy();
- return true;
-}
-
-void AgentImpl::popEvent()
-{
- Mutex::ScopedLock _lock(lock);
- if (!eventQueue.empty())
- eventQueue.pop_front();
-}
-
-void AgentImpl::newSession()
-{
- Mutex::ScopedLock _lock(lock);
- eventQueue.clear();
- xmtQueue.clear();
- eventQueue.push_back(eventDeclareQueue(queueName));
- eventQueue.push_back(eventBind("amq.direct", queueName, queueName));
- eventQueue.push_back(eventSetupComplete());
-}
-
-void AgentImpl::startProtocol()
-{
- Mutex::ScopedLock _lock(lock);
- char rawbuffer[512];
- Buffer buffer(rawbuffer, 512);
-
- Protocol::encodeHeader(buffer, Protocol::OP_ATTACH_REQUEST);
- buffer.putShortString(label);
- systemId.encode(buffer);
- buffer.putLong(requestedBrokerBank);
- buffer.putLong(requestedAgentBank);
- sendBufferLH(buffer, QMF_EXCHANGE, BROKER_KEY);
- QPID_LOG(trace, "SENT AttachRequest: reqBroker=" << requestedBrokerBank <<
- " reqAgent=" << requestedAgentBank);
-}
-
-void AgentImpl::heartbeat()
-{
- Mutex::ScopedLock _lock(lock);
- Buffer buffer(outputBuffer, MA_BUFFER_SIZE);
-
- Protocol::encodeHeader(buffer, Protocol::OP_HEARTBEAT_INDICATION);
- buffer.putLongLong(uint64_t(Duration(EPOCH, now())));
- stringstream key;
- key << "console.heartbeat." << assignedBrokerBank << "." << assignedAgentBank;
- sendBufferLH(buffer, QMF_EXCHANGE, key.str());
- QPID_LOG(trace, "SENT HeartbeatIndication");
-}
-
-void AgentImpl::methodResponse(uint32_t sequence, uint32_t status, char* text, const Value& argMap)
-{
- Mutex::ScopedLock _lock(lock);
- map<uint32_t, AgentQueryContext::Ptr>::iterator iter = contextMap.find(sequence);
- if (iter == contextMap.end())
- return;
- AgentQueryContext::Ptr context = iter->second;
- contextMap.erase(iter);
-
- char* buf(outputBuffer);
- uint32_t bufLen(114 + strlen(text)); // header(8) + status(4) + mstring(2 + size) + margin(100)
- bool allocated(false);
-
- if (status == 0) {
- for (vector<const SchemaArgument*>::const_iterator aIter = context->schemaMethod->impl->arguments.begin();
- aIter != context->schemaMethod->impl->arguments.end(); aIter++) {
- const SchemaArgument* schemaArg = *aIter;
- if (schemaArg->getDirection() == DIR_OUT || schemaArg->getDirection() == DIR_IN_OUT) {
- if (argMap.keyInMap(schemaArg->getName())) {
- const Value* val = argMap.byKey(schemaArg->getName());
- bufLen += val->impl->encodedSize();
- } else {
- Value val(schemaArg->getType());
- bufLen += val.impl->encodedSize();
- }
- }
- }
- }
-
- if (bufLen > MA_BUFFER_SIZE) {
- buf = (char*) malloc(bufLen);
- allocated = true;
- }
-
- Buffer buffer(buf, bufLen);
- Protocol::encodeHeader(buffer, Protocol::OP_METHOD_RESPONSE, context->sequence);
- buffer.putLong(status);
- buffer.putMediumString(text);
- if (status == 0) {
- for (vector<const SchemaArgument*>::const_iterator aIter = context->schemaMethod->impl->arguments.begin();
- aIter != context->schemaMethod->impl->arguments.end(); aIter++) {
- const SchemaArgument* schemaArg = *aIter;
- if (schemaArg->getDirection() == DIR_OUT || schemaArg->getDirection() == DIR_IN_OUT) {
- if (argMap.keyInMap(schemaArg->getName())) {
- const Value* val = argMap.byKey(schemaArg->getName());
- val->impl->encode(buffer);
- } else {
- Value val(schemaArg->getType());
- val.impl->encode(buffer);
- }
- }
- }
- }
- sendBufferLH(buffer, context->exchange, context->key);
- if (allocated)
- free(buf);
- QPID_LOG(trace, "SENT MethodResponse seq=" << context->sequence << " status=" << status << " text=" << text);
-}
-
-void AgentImpl::queryResponse(uint32_t sequence, Object& object, bool prop, bool stat)
-{
- Mutex::ScopedLock _lock(lock);
- map<uint32_t, AgentQueryContext::Ptr>::iterator iter = contextMap.find(sequence);
- if (iter == contextMap.end())
- return;
- AgentQueryContext::Ptr context = iter->second;
-
- Buffer buffer(outputBuffer, MA_BUFFER_SIZE);
- Protocol::encodeHeader(buffer, Protocol::OP_OBJECT_INDICATION, context->sequence);
-
- object.impl->encodeSchemaKey(buffer);
- object.impl->encodeManagedObjectData(buffer);
- if (prop)
- object.impl->encodeProperties(buffer);
- if (stat)
- object.impl->encodeStatistics(buffer);
-
- sendBufferLH(buffer, context->exchange, context->key);
- QPID_LOG(trace, "SENT ContentIndication seq=" << context->sequence);
-}
-
-void AgentImpl::queryComplete(uint32_t sequence)
-{
- Mutex::ScopedLock _lock(lock);
- map<uint32_t, AgentQueryContext::Ptr>::iterator iter = contextMap.find(sequence);
- if (iter == contextMap.end())
- return;
-
- AgentQueryContext::Ptr context = iter->second;
- contextMap.erase(iter);
- sendCommandCompleteLH(context->exchange, context->key, context->sequence, 0, "OK");
-}
-
-void AgentImpl::registerClass(SchemaObjectClass* cls)
-{
- Mutex::ScopedLock _lock(lock);
- bool newPackage = false;
-
- map<string, ClassMaps>::iterator iter = packages.find(cls->getClassKey()->getPackageName());
- if (iter == packages.end()) {
- packages[cls->getClassKey()->getPackageName()] = ClassMaps();
- iter = packages.find(cls->getClassKey()->getPackageName());
- newPackage = true;
- }
-
- AgentClassKey key(cls->getClassKey()->getClassName(), cls->getClassKey()->getHash());
- iter->second.objectClasses[key] = cls;
-
- // Indicate this new schema if connected.
-
- if (attachComplete) {
-
- if (newPackage) {
- sendPackageIndicationLH(iter->first);
- }
- sendClassIndicationLH(CLASS_OBJECT, iter->first, key);
- }
-}
-
-void AgentImpl::registerClass(SchemaEventClass* cls)
-{
- Mutex::ScopedLock _lock(lock);
- bool newPackage = false;
-
- map<string, ClassMaps>::iterator iter = packages.find(cls->getClassKey()->getPackageName());
- if (iter == packages.end()) {
- packages[cls->getClassKey()->getPackageName()] = ClassMaps();
- iter = packages.find(cls->getClassKey()->getPackageName());
- newPackage = true;
- }
-
- AgentClassKey key(cls->getClassKey()->getClassName(), cls->getClassKey()->getHash());
- iter->second.eventClasses[key] = cls;
-
- // Indicate this new schema if connected.
-
- if (attachComplete) {
-
- if (newPackage) {
- sendPackageIndicationLH(iter->first);
- }
- sendClassIndicationLH(CLASS_EVENT, iter->first, key);
- }
-}
-
-const ObjectId* AgentImpl::addObject(Object&, uint64_t)
-{
- Mutex::ScopedLock _lock(lock);
- return 0;
-}
-
-const ObjectId* AgentImpl::allocObjectId(uint64_t persistId)
-{
- Mutex::ScopedLock _lock(lock);
- uint16_t sequence = persistId ? 0 : bootSequence;
- uint64_t objectNum = persistId ? persistId : nextObjectId++;
-
- ObjectId* oid = ObjectIdImpl::factory(&attachment, 0, sequence, objectNum);
- return oid;
-}
-
-const ObjectId* AgentImpl::allocObjectId(uint32_t persistIdLo, uint32_t persistIdHi)
-{
- return allocObjectId(((uint64_t) persistIdHi) << 32 | (uint64_t) persistIdLo);
-}
-
-void AgentImpl::raiseEvent(Event& event)
-{
- Mutex::ScopedLock _lock(lock);
- Buffer buffer(outputBuffer, MA_BUFFER_SIZE);
- Protocol::encodeHeader(buffer, Protocol::OP_EVENT_INDICATION);
-
- event.impl->encodeSchemaKey(buffer);
- buffer.putLongLong(uint64_t(Duration(EPOCH, now())));
- event.impl->encode(buffer);
- string key(event.impl->getRoutingKey(assignedBrokerBank, assignedAgentBank));
-
- sendBufferLH(buffer, QMF_EXCHANGE, key);
- QPID_LOG(trace, "SENT EventIndication");
-}
-
-AgentEventImpl::Ptr AgentImpl::eventDeclareQueue(const string& name)
-{
- AgentEventImpl::Ptr event(new AgentEventImpl(AgentEvent::DECLARE_QUEUE));
- event->name = name;
-
- return event;
-}
-
-AgentEventImpl::Ptr AgentImpl::eventBind(const string& exchange, const string& queue,
- const string& key)
-{
- AgentEventImpl::Ptr event(new AgentEventImpl(AgentEvent::BIND));
- event->name = queue;
- event->exchange = exchange;
- event->bindingKey = key;
-
- return event;
-}
-
-AgentEventImpl::Ptr AgentImpl::eventSetupComplete()
-{
- AgentEventImpl::Ptr event(new AgentEventImpl(AgentEvent::SETUP_COMPLETE));
- return event;
-}
-
-AgentEventImpl::Ptr AgentImpl::eventQuery(uint32_t num, const string& userId, const string& package,
- const string& cls, boost::shared_ptr<ObjectId> oid)
-{
- AgentEventImpl::Ptr event(new AgentEventImpl(AgentEvent::GET_QUERY));
- event->sequence = num;
- event->authUserId = userId;
- if (oid.get())
- event->query.reset(new Query(oid.get()));
- else
- event->query.reset(new Query(cls.c_str(), package.c_str()));
- return event;
-}
-
-AgentEventImpl::Ptr AgentImpl::eventMethod(uint32_t num, const string& userId, const string& method,
- boost::shared_ptr<ObjectId> oid, boost::shared_ptr<Value> argMap,
- const SchemaObjectClass* objectClass)
-{
- AgentEventImpl::Ptr event(new AgentEventImpl(AgentEvent::METHOD_CALL));
- event->sequence = num;
- event->authUserId = userId;
- event->name = method;
- event->objectId = oid;
- event->arguments = argMap;
- event->objectClass = objectClass;
- return event;
-}
-
-void AgentImpl::sendBufferLH(Buffer& buf, const string& destination, const string& routingKey)
-{
- uint32_t length = buf.getPosition();
- MessageImpl::Ptr message(new MessageImpl);
-
- buf.reset();
- buf.getRawData(message->body, length);
- message->destination = destination;
- message->routingKey = routingKey;
- message->replyExchange = "amq.direct";
- message->replyKey = queueName;
-
- xmtQueue.push_back(message);
-}
-
-void AgentImpl::sendPackageIndicationLH(const string& packageName)
-{
- Buffer buffer(outputBuffer, MA_BUFFER_SIZE);
- Protocol::encodeHeader(buffer, Protocol::OP_PACKAGE_INDICATION);
- buffer.putShortString(packageName);
- sendBufferLH(buffer, QMF_EXCHANGE, BROKER_KEY);
- QPID_LOG(trace, "SENT PackageIndication: package_name=" << packageName);
-}
-
-void AgentImpl::sendClassIndicationLH(ClassKind kind, const string& packageName, const AgentClassKey& key)
-{
- Buffer buffer(outputBuffer, MA_BUFFER_SIZE);
- Protocol::encodeHeader(buffer, Protocol::OP_CLASS_INDICATION);
- buffer.putOctet((int) kind);
- buffer.putShortString(packageName);
- buffer.putShortString(key.name);
- buffer.putBin128(const_cast<uint8_t*>(key.hash)); // const_cast needed for older Qpid libraries
- sendBufferLH(buffer, QMF_EXCHANGE, BROKER_KEY);
- QPID_LOG(trace, "SENT ClassIndication: package_name=" << packageName << " class_name=" << key.name);
-}
-
-void AgentImpl::sendCommandCompleteLH(const string& exchange, const string& replyToKey,
- uint32_t sequence, uint32_t code, const string& text)
-{
- Buffer buffer(outputBuffer, MA_BUFFER_SIZE);
- Protocol::encodeHeader(buffer, Protocol::OP_COMMAND_COMPLETE, sequence);
- buffer.putLong(code);
- buffer.putShortString(text);
- sendBufferLH(buffer, exchange, replyToKey);
- QPID_LOG(trace, "SENT CommandComplete: seq=" << sequence << " code=" << code << " text=" << text);
-}
-
-void AgentImpl::sendMethodErrorLH(uint32_t sequence, const string& key, uint32_t code, const string& text)
-{
- Buffer buffer(outputBuffer, MA_BUFFER_SIZE);
- Protocol::encodeHeader(buffer, Protocol::OP_METHOD_RESPONSE, sequence);
- buffer.putLong(code);
-
- string fulltext;
- switch (code) {
- case MERR_UNKNOWN_PACKAGE: fulltext = "Unknown Package"; break;
- case MERR_UNKNOWN_CLASS: fulltext = "Unknown Class"; break;
- case MERR_UNKNOWN_METHOD: fulltext = "Unknown Method"; break;
- case MERR_INTERNAL_ERROR: fulltext = "Internal Error"; break;
- default: fulltext = "Unspecified Error"; break;
- }
-
- if (!text.empty()) {
- fulltext += " (";
- fulltext += text;
- fulltext += ")";
- }
-
- buffer.putMediumString(fulltext);
- sendBufferLH(buffer, DIR_EXCHANGE, key);
- QPID_LOG(trace, "SENT MethodResponse: errorCode=" << code << " text=" << fulltext);
-}
-
-void AgentImpl::handleAttachResponse(Buffer& inBuffer)
-{
- Mutex::ScopedLock _lock(lock);
-
- assignedBrokerBank = inBuffer.getLong();
- assignedAgentBank = inBuffer.getLong();
-
- QPID_LOG(trace, "RCVD AttachResponse: broker=" << assignedBrokerBank << " agent=" << assignedAgentBank);
-
- if ((assignedBrokerBank != requestedBrokerBank) ||
- (assignedAgentBank != requestedAgentBank)) {
- if (requestedAgentBank == 0) {
- QPID_LOG(notice, "Initial object-id bank assigned: " << assignedBrokerBank << "." <<
- assignedAgentBank);
- } else {
- QPID_LOG(warning, "Collision in object-id! New bank assigned: " << assignedBrokerBank <<
- "." << assignedAgentBank);
- }
- //storeData(); // TODO
- requestedBrokerBank = assignedBrokerBank;
- requestedAgentBank = assignedAgentBank;
- }
-
- attachment.setBanks(assignedBrokerBank, assignedAgentBank);
-
- // Bind to qpid.management to receive commands
- stringstream key;
- key << "agent." << assignedBrokerBank << "." << assignedAgentBank;
- eventQueue.push_back(eventBind(QMF_EXCHANGE, queueName, key.str()));
-
- // Send package indications for all local packages
- for (map<string, ClassMaps>::iterator pIter = packages.begin();
- pIter != packages.end();
- pIter++) {
- sendPackageIndicationLH(pIter->first);
-
- // Send class indications for all local classes
- ClassMaps cMap = pIter->second;
- for (ObjectClassMap::iterator cIter = cMap.objectClasses.begin();
- cIter != cMap.objectClasses.end(); cIter++)
- sendClassIndicationLH(CLASS_OBJECT, pIter->first, cIter->first);
- for (EventClassMap::iterator cIter = cMap.eventClasses.begin();
- cIter != cMap.eventClasses.end(); cIter++)
- sendClassIndicationLH(CLASS_EVENT, pIter->first, cIter->first);
- }
-
- attachComplete = true;
-}
-
-void AgentImpl::handlePackageRequest(Buffer&)
-{
- Mutex::ScopedLock _lock(lock);
-}
-
-void AgentImpl::handleClassQuery(Buffer&)
-{
- Mutex::ScopedLock _lock(lock);
-}
-
-void AgentImpl::handleSchemaRequest(Buffer& inBuffer, uint32_t sequence,
- const string& replyExchange, const string& replyKey)
-{
- Mutex::ScopedLock _lock(lock);
- string rExchange(replyExchange);
- string rKey(replyKey);
- string packageName;
- inBuffer.getShortString(packageName);
- AgentClassKey key(inBuffer);
-
- if (rExchange.empty())
- rExchange = QMF_EXCHANGE;
- if (rKey.empty())
- rKey = BROKER_KEY;
-
- QPID_LOG(trace, "RCVD SchemaRequest: package=" << packageName << " class=" << key.name);
-
- map<string, ClassMaps>::iterator pIter = packages.find(packageName);
- if (pIter == packages.end()) {
- sendCommandCompleteLH(rExchange, rKey, sequence, 1, "package not found");
- return;
- }
-
- ClassMaps cMap = pIter->second;
- ObjectClassMap::iterator ocIter = cMap.objectClasses.find(key);
- if (ocIter != cMap.objectClasses.end()) {
- SchemaObjectClass* oImpl = ocIter->second;
- Buffer buffer(outputBuffer, MA_BUFFER_SIZE);
- Protocol::encodeHeader(buffer, Protocol::OP_SCHEMA_RESPONSE, sequence);
- oImpl->impl->encode(buffer);
- sendBufferLH(buffer, rExchange, rKey);
- QPID_LOG(trace, "SENT SchemaResponse: (object) package=" << packageName << " class=" << key.name);
- return;
- }
-
- EventClassMap::iterator ecIter = cMap.eventClasses.find(key);
- if (ecIter != cMap.eventClasses.end()) {
- SchemaEventClass* eImpl = ecIter->second;
- Buffer buffer(outputBuffer, MA_BUFFER_SIZE);
- Protocol::encodeHeader(buffer, Protocol::OP_SCHEMA_RESPONSE, sequence);
- eImpl->impl->encode(buffer);
- sendBufferLH(buffer, rExchange, rKey);
- QPID_LOG(trace, "SENT SchemaResponse: (event) package=" << packageName << " class=" << key.name);
- return;
- }
-
- sendCommandCompleteLH(rExchange, rKey, sequence, 1, "class not found");
-}
-
-void AgentImpl::handleGetQuery(Buffer& inBuffer, uint32_t sequence, const string& replyTo, const string& userId)
-{
- Mutex::ScopedLock _lock(lock);
- FieldTable ft;
- FieldTable::ValuePtr value;
- map<string, ClassMaps>::const_iterator pIter = packages.end();
- string pname;
- string cname;
- string oidRepr;
- boost::shared_ptr<ObjectId> oid;
-
- ft.decode(inBuffer);
-
- QPID_LOG(trace, "RCVD GetQuery: seq=" << sequence << " map=" << ft);
-
- value = ft.get("_package");
- if (value.get() && value->convertsTo<string>()) {
- pname = value->get<string>();
- pIter = packages.find(pname);
- if (pIter == packages.end()) {
- sendCommandCompleteLH(DIR_EXCHANGE, replyTo, sequence);
- return;
- }
- }
-
- value = ft.get("_class");
- if (value.get() && value->convertsTo<string>()) {
- cname = value->get<string>();
- // TODO - check for validity of class (in package or any package)
- if (pIter == packages.end()) {
- } else {
-
- }
- }
-
- value = ft.get("_objectid");
- if (value.get() && value->convertsTo<string>()) {
- oidRepr = value->get<string>();
- oid.reset(new ObjectId());
- oid->impl->fromString(oidRepr);
- }
-
- AgentQueryContext::Ptr context(new AgentQueryContext);
- uint32_t contextNum = nextContextNum++;
- context->sequence = sequence;
- context->exchange = DIR_EXCHANGE;
- context->key = replyTo;
- contextMap[contextNum] = context;
-
- eventQueue.push_back(eventQuery(contextNum, userId, pname, cname, oid));
-}
-
-void AgentImpl::handleMethodRequest(Buffer& buffer, uint32_t sequence, const string& replyTo, const string& userId)
-{
- Mutex::ScopedLock _lock(lock);
- string pname;
- string method;
- boost::shared_ptr<ObjectId> oid(ObjectIdImpl::factory(buffer));
- buffer.getShortString(pname);
- AgentClassKey classKey(buffer);
- buffer.getShortString(method);
-
- QPID_LOG(trace, "RCVD MethodRequest seq=" << sequence << " method=" << method);
-
- map<string, ClassMaps>::const_iterator pIter = packages.find(pname);
- if (pIter == packages.end()) {
- sendMethodErrorLH(sequence, replyTo, MERR_UNKNOWN_PACKAGE, pname);
- return;
- }
-
- ObjectClassMap::const_iterator cIter = pIter->second.objectClasses.find(classKey);
- if (cIter == pIter->second.objectClasses.end()) {
- sendMethodErrorLH(sequence, replyTo, MERR_UNKNOWN_CLASS, classKey.repr());
- return;
- }
-
- const SchemaObjectClass* schema = cIter->second;
- vector<const SchemaMethod*>::const_iterator mIter = schema->impl->methods.begin();
- for (; mIter != schema->impl->methods.end(); mIter++) {
- if ((*mIter)->getName() == method)
- break;
- }
-
- if (mIter == schema->impl->methods.end()) {
- sendMethodErrorLH(sequence, replyTo, MERR_UNKNOWN_METHOD, method);
- return;
- }
-
- const SchemaMethod* schemaMethod = *mIter;
- boost::shared_ptr<Value> argMap(new Value(TYPE_MAP));
- Value* value;
- for (vector<const SchemaArgument*>::const_iterator aIter = schemaMethod->impl->arguments.begin();
- aIter != schemaMethod->impl->arguments.end(); aIter++) {
- const SchemaArgument* schemaArg = *aIter;
- if (schemaArg->getDirection() == DIR_IN || schemaArg->getDirection() == DIR_IN_OUT)
- value = ValueImpl::factory(schemaArg->getType(), buffer);
- else
- value = ValueImpl::factory(schemaArg->getType());
- argMap->insert(schemaArg->getName(), value);
- }
-
- AgentQueryContext::Ptr context(new AgentQueryContext);
- uint32_t contextNum = nextContextNum++;
- context->sequence = sequence;
- context->exchange = DIR_EXCHANGE;
- context->key = replyTo;
- context->schemaMethod = schemaMethod;
- contextMap[contextNum] = context;
-
- eventQueue.push_back(eventMethod(contextNum, userId, method, oid, argMap, schema));
-}
-
-void AgentImpl::handleConsoleAddedIndication()
-{
- Mutex::ScopedLock _lock(lock);
-}
-
-//==================================================================
-// Wrappers
-//==================================================================
-
-Agent::Agent(char* label, bool internalStore) { impl = new AgentImpl(label, internalStore); }
-Agent::~Agent() { delete impl; }
-void Agent::setStoreDir(const char* path) { impl->setStoreDir(path); }
-void Agent::setTransferDir(const char* path) { impl->setTransferDir(path); }
-void Agent::handleRcvMessage(Message& message) { impl->handleRcvMessage(message); }
-bool Agent::getXmtMessage(Message& item) const { return impl->getXmtMessage(item); }
-void Agent::popXmt() { impl->popXmt(); }
-bool Agent::getEvent(AgentEvent& event) const { return impl->getEvent(event); }
-void Agent::popEvent() { impl->popEvent(); }
-void Agent::newSession() { impl->newSession(); }
-void Agent::startProtocol() { impl->startProtocol(); }
-void Agent::heartbeat() { impl->heartbeat(); }
-void Agent::methodResponse(uint32_t sequence, uint32_t status, char* text, const Value& arguments) { impl->methodResponse(sequence, status, text, arguments); }
-void Agent::queryResponse(uint32_t sequence, Object& object, bool prop, bool stat) { impl->queryResponse(sequence, object, prop, stat); }
-void Agent::queryComplete(uint32_t sequence) { impl->queryComplete(sequence); }
-void Agent::registerClass(SchemaObjectClass* cls) { impl->registerClass(cls); }
-void Agent::registerClass(SchemaEventClass* cls) { impl->registerClass(cls); }
-const ObjectId* Agent::addObject(Object& obj, uint64_t persistId) { return impl->addObject(obj, persistId); }
-const ObjectId* Agent::allocObjectId(uint64_t persistId) { return impl->allocObjectId(persistId); }
-const ObjectId* Agent::allocObjectId(uint32_t persistIdLo, uint32_t persistIdHi) { return impl->allocObjectId(persistIdLo, persistIdHi); }
-void Agent::raiseEvent(Event& event) { impl->raiseEvent(event); }
-
diff --git a/qpid/cpp/src/qmf/engine/BrokerProxyImpl.cpp b/qpid/cpp/src/qmf/engine/BrokerProxyImpl.cpp
deleted file mode 100644
index 5fc71979fd..0000000000
--- a/qpid/cpp/src/qmf/engine/BrokerProxyImpl.cpp
+++ /dev/null
@@ -1,827 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include "qmf/engine/BrokerProxyImpl.h"
-#include "qmf/engine/ConsoleImpl.h"
-#include "qmf/engine/Protocol.h"
-#include "qpid/Address.h"
-#include "qpid/sys/SystemInfo.h"
-#include <qpid/log/Statement.h>
-#include <qpid/StringUtils.h>
-#include <string.h>
-#include <iostream>
-#include <fstream>
-
-using namespace std;
-using namespace qmf::engine;
-using namespace qpid::framing;
-using namespace qpid::sys;
-
-namespace {
- const char* QMF_EXCHANGE = "qpid.management";
- const char* DIR_EXCHANGE = "amq.direct";
- const char* BROKER_KEY = "broker";
- const char* BROKER_PACKAGE = "org.apache.qpid.broker";
- const char* AGENT_CLASS = "agent";
- const char* BROKER_AGENT_KEY = "agent.1.0";
-}
-
-const Object* QueryResponseImpl::getObject(uint32_t idx) const
-{
- vector<ObjectPtr>::const_iterator iter = results.begin();
-
- while (idx > 0) {
- if (iter == results.end())
- return 0;
- iter++;
- idx--;
- }
-
- return iter->get();
-}
-
-#define STRING_REF(s) {if (!s.empty()) item.s = const_cast<char*>(s.c_str());}
-
-BrokerEvent BrokerEventImpl::copy()
-{
- BrokerEvent item;
-
- ::memset(&item, 0, sizeof(BrokerEvent));
- item.kind = kind;
-
- STRING_REF(name);
- STRING_REF(exchange);
- STRING_REF(bindingKey);
- item.context = context;
- item.queryResponse = queryResponse.get();
- item.methodResponse = methodResponse.get();
-
- return item;
-}
-
-BrokerProxyImpl::BrokerProxyImpl(BrokerProxy& pub, Console& _console) : publicObject(pub), console(_console)
-{
- stringstream qn;
- qpid::Address addr;
-
- SystemInfo::getLocalHostname(addr);
- qn << "qmfc-" << SystemInfo::getProcessName() << "-" << addr << "-" << SystemInfo::getProcessId();
- queueName = qn.str();
-
- seqMgr.setUnsolicitedContext(SequenceContext::Ptr(new StaticContext(*this)));
-}
-
-void BrokerProxyImpl::sessionOpened(SessionHandle& /*sh*/)
-{
- Mutex::ScopedLock _lock(lock);
- agentList.clear();
- eventQueue.clear();
- xmtQueue.clear();
- eventQueue.push_back(eventDeclareQueue(queueName));
- eventQueue.push_back(eventBind(DIR_EXCHANGE, queueName, queueName));
- eventQueue.push_back(eventSetupComplete());
-
- // TODO: Store session handle
-}
-
-void BrokerProxyImpl::sessionClosed()
-{
- Mutex::ScopedLock _lock(lock);
- agentList.clear();
- eventQueue.clear();
- xmtQueue.clear();
-}
-
-void BrokerProxyImpl::startProtocol()
-{
- AgentProxyPtr agent(AgentProxyImpl::factory(console, publicObject, 0, "Agent embedded in broker"));
- {
- Mutex::ScopedLock _lock(lock);
- char rawbuffer[512];
- Buffer buffer(rawbuffer, 512);
-
- agentList[0] = agent;
-
- requestsOutstanding = 1;
- topicBound = false;
- uint32_t sequence(seqMgr.reserve());
- Protocol::encodeHeader(buffer, Protocol::OP_BROKER_REQUEST, sequence);
- sendBufferLH(buffer, QMF_EXCHANGE, BROKER_KEY);
- QPID_LOG(trace, "SENT BrokerRequest seq=" << sequence);
- }
-
- console.impl->eventAgentAdded(agent);
-}
-
-void BrokerProxyImpl::sendBufferLH(Buffer& buf, const string& destination, const string& routingKey)
-{
- uint32_t length = buf.getPosition();
- MessageImpl::Ptr message(new MessageImpl);
-
- buf.reset();
- buf.getRawData(message->body, length);
- message->destination = destination;
- message->routingKey = routingKey;
- message->replyExchange = DIR_EXCHANGE;
- message->replyKey = queueName;
-
- xmtQueue.push_back(message);
-}
-
-void BrokerProxyImpl::handleRcvMessage(Message& message)
-{
- Buffer inBuffer(message.body, message.length);
- uint8_t opcode;
- uint32_t sequence;
-
- while (Protocol::checkHeader(inBuffer, &opcode, &sequence))
- seqMgr.dispatch(opcode, sequence, message.routingKey ? string(message.routingKey) : string(), inBuffer);
-}
-
-bool BrokerProxyImpl::getXmtMessage(Message& item) const
-{
- Mutex::ScopedLock _lock(lock);
- if (xmtQueue.empty())
- return false;
- item = xmtQueue.front()->copy();
- return true;
-}
-
-void BrokerProxyImpl::popXmt()
-{
- Mutex::ScopedLock _lock(lock);
- if (!xmtQueue.empty())
- xmtQueue.pop_front();
-}
-
-bool BrokerProxyImpl::getEvent(BrokerEvent& event) const
-{
- Mutex::ScopedLock _lock(lock);
- if (eventQueue.empty())
- return false;
- event = eventQueue.front()->copy();
- return true;
-}
-
-void BrokerProxyImpl::popEvent()
-{
- Mutex::ScopedLock _lock(lock);
- if (!eventQueue.empty())
- eventQueue.pop_front();
-}
-
-uint32_t BrokerProxyImpl::agentCount() const
-{
- Mutex::ScopedLock _lock(lock);
- return agentList.size();
-}
-
-const AgentProxy* BrokerProxyImpl::getAgent(uint32_t idx) const
-{
- Mutex::ScopedLock _lock(lock);
- for (map<uint32_t, AgentProxyPtr>::const_iterator iter = agentList.begin();
- iter != agentList.end(); iter++)
- if (idx-- == 0)
- return iter->second.get();
- return 0;
-}
-
-void BrokerProxyImpl::sendQuery(const Query& query, void* context, const AgentProxy* agent)
-{
- SequenceContext::Ptr queryContext(new QueryContext(*this, context));
- Mutex::ScopedLock _lock(lock);
- bool sent = false;
- if (agent != 0) {
- if (sendGetRequestLH(queryContext, query, agent))
- sent = true;
- } else {
- // TODO (optimization) only send queries to agents that have the requested class+package
- for (map<uint32_t, AgentProxyPtr>::const_iterator iter = agentList.begin();
- iter != agentList.end(); iter++) {
- if (sendGetRequestLH(queryContext, query, iter->second.get()))
- sent = true;
- }
- }
-
- if (!sent) {
- queryContext->reserve();
- queryContext->release();
- }
-}
-
-bool BrokerProxyImpl::sendGetRequestLH(SequenceContext::Ptr queryContext, const Query& query, const AgentProxy* agent)
-{
- if (query.impl->singleAgent()) {
- if (query.impl->agentBank() != agent->getAgentBank())
- return false;
- }
- stringstream key;
- Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
- uint32_t sequence(seqMgr.reserve(queryContext));
- agent->impl->addSequence(sequence);
-
- Protocol::encodeHeader(outBuffer, Protocol::OP_GET_QUERY, sequence);
- query.impl->encode(outBuffer);
- key << "agent.1." << agent->impl->agentBank;
- sendBufferLH(outBuffer, QMF_EXCHANGE, key.str());
- QPID_LOG(trace, "SENT GetQuery seq=" << sequence << " key=" << key.str());
- return true;
-}
-
-string BrokerProxyImpl::encodeMethodArguments(const SchemaMethod* schema, const Value* argmap, Buffer& buffer)
-{
- int argCount = schema->getArgumentCount();
-
- if (argmap == 0 || !argmap->isMap())
- return string("Arguments must be in a map value");
-
- for (int aIdx = 0; aIdx < argCount; aIdx++) {
- const SchemaArgument* arg(schema->getArgument(aIdx));
- if (arg->getDirection() == DIR_IN || arg->getDirection() == DIR_IN_OUT) {
- if (argmap->keyInMap(arg->getName())) {
- const Value* argVal(argmap->byKey(arg->getName()));
- if (argVal->getType() != arg->getType())
- return string("Argument is the wrong type: ") + arg->getName();
- argVal->impl->encode(buffer);
- } else {
- Value defaultValue(arg->getType());
- defaultValue.impl->encode(buffer);
- }
- }
- }
-
- return string();
-}
-
-string BrokerProxyImpl::encodedSizeMethodArguments(const SchemaMethod* schema, const Value* argmap, uint32_t& size)
-{
- int argCount = schema->getArgumentCount();
-
- if (argmap == 0 || !argmap->isMap())
- return string("Arguments must be in a map value");
-
- for (int aIdx = 0; aIdx < argCount; aIdx++) {
- const SchemaArgument* arg(schema->getArgument(aIdx));
- if (arg->getDirection() == DIR_IN || arg->getDirection() == DIR_IN_OUT) {
- if (argmap->keyInMap(arg->getName())) {
- const Value* argVal(argmap->byKey(arg->getName()));
- if (argVal->getType() != arg->getType())
- return string("Argument is the wrong type: ") + arg->getName();
- size += argVal->impl->encodedSize();
- } else {
- Value defaultValue(arg->getType());
- size += defaultValue.impl->encodedSize();
- }
- }
- }
-
- return string();
-}
-
-void BrokerProxyImpl::sendMethodRequest(ObjectId* oid, const SchemaObjectClass* cls,
- const string& methodName, const Value* args, void* userContext)
-{
- int methodCount = cls->getMethodCount();
- int idx;
- for (idx = 0; idx < methodCount; idx++) {
- const SchemaMethod* method = cls->getMethod(idx);
- if (string(method->getName()) == methodName) {
- Mutex::ScopedLock _lock(lock);
- SequenceContext::Ptr methodContext(new MethodContext(*this, userContext, method));
- stringstream key;
- char* buf(outputBuffer);
- uint32_t bufLen(1024);
- bool allocated(false);
-
- string argErrorString = encodedSizeMethodArguments(method, args, bufLen);
- if (!argErrorString.empty()) {
- MethodResponsePtr argError(MethodResponseImpl::factory(1, argErrorString));
- eventQueue.push_back(eventMethodResponse(userContext, argError));
- return;
- }
-
- if (bufLen > MA_BUFFER_SIZE) {
- buf = (char*) malloc(bufLen);
- allocated = true;
- }
-
- Buffer outBuffer(buf, bufLen);
- uint32_t sequence(seqMgr.reserve(methodContext));
-
- Protocol::encodeHeader(outBuffer, Protocol::OP_METHOD_REQUEST, sequence);
- oid->impl->encode(outBuffer);
- cls->getClassKey()->impl->encode(outBuffer);
- outBuffer.putShortString(methodName);
-
- encodeMethodArguments(method, args, outBuffer);
- key << "agent.1." << oid->impl->getAgentBank();
- sendBufferLH(outBuffer, QMF_EXCHANGE, key.str());
- QPID_LOG(trace, "SENT MethodRequest seq=" << sequence << " method=" << methodName << " key=" << key.str());
-
- if (allocated)
- free(buf);
-
- return;
- }
- }
-
- MethodResponsePtr error(MethodResponseImpl::factory(1, string("Unknown method: ") + methodName));
- Mutex::ScopedLock _lock(lock);
- eventQueue.push_back(eventMethodResponse(userContext, error));
-}
-
-void BrokerProxyImpl::addBinding(const string& exchange, const string& key)
-{
- Mutex::ScopedLock _lock(lock);
- eventQueue.push_back(eventBind(exchange, queueName, key));
-}
-
-BrokerEventImpl::Ptr BrokerProxyImpl::eventDeclareQueue(const string& queueName)
-{
- BrokerEventImpl::Ptr event(new BrokerEventImpl(BrokerEvent::DECLARE_QUEUE));
- event->name = queueName;
- return event;
-}
-
-BrokerEventImpl::Ptr BrokerProxyImpl::eventBind(const string& exchange, const string& queue, const string& key)
-{
- BrokerEventImpl::Ptr event(new BrokerEventImpl(BrokerEvent::BIND));
- event->name = queue;
- event->exchange = exchange;
- event->bindingKey = key;
-
- return event;
-}
-
-BrokerEventImpl::Ptr BrokerProxyImpl::eventSetupComplete()
-{
- BrokerEventImpl::Ptr event(new BrokerEventImpl(BrokerEvent::SETUP_COMPLETE));
- return event;
-}
-
-BrokerEventImpl::Ptr BrokerProxyImpl::eventStable()
-{
- QPID_LOG(trace, "Console Link to Broker Stable");
- BrokerEventImpl::Ptr event(new BrokerEventImpl(BrokerEvent::STABLE));
- return event;
-}
-
-BrokerEventImpl::Ptr BrokerProxyImpl::eventQueryComplete(void* context, QueryResponsePtr response)
-{
- BrokerEventImpl::Ptr event(new BrokerEventImpl(BrokerEvent::QUERY_COMPLETE));
- event->context = context;
- event->queryResponse = response;
- return event;
-}
-
-BrokerEventImpl::Ptr BrokerProxyImpl::eventMethodResponse(void* context, MethodResponsePtr response)
-{
- BrokerEventImpl::Ptr event(new BrokerEventImpl(BrokerEvent::METHOD_RESPONSE));
- event->context = context;
- event->methodResponse = response;
- return event;
-}
-
-void BrokerProxyImpl::handleBrokerResponse(Buffer& inBuffer, uint32_t seq)
-{
- brokerId.decode(inBuffer);
- QPID_LOG(trace, "RCVD BrokerResponse seq=" << seq << " brokerId=" << brokerId);
- Mutex::ScopedLock _lock(lock);
- Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
- uint32_t sequence(seqMgr.reserve());
- incOutstandingLH();
- Protocol::encodeHeader(outBuffer, Protocol::OP_PACKAGE_REQUEST, sequence);
- sendBufferLH(outBuffer, QMF_EXCHANGE, BROKER_KEY);
- QPID_LOG(trace, "SENT PackageRequest seq=" << sequence);
-}
-
-void BrokerProxyImpl::handlePackageIndication(Buffer& inBuffer, uint32_t seq)
-{
- string package;
-
- inBuffer.getShortString(package);
- QPID_LOG(trace, "RCVD PackageIndication seq=" << seq << " package=" << package);
- console.impl->learnPackage(package);
-
- Mutex::ScopedLock _lock(lock);
- Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
- uint32_t sequence(seqMgr.reserve());
- incOutstandingLH();
- Protocol::encodeHeader(outBuffer, Protocol::OP_CLASS_QUERY, sequence);
- outBuffer.putShortString(package);
- sendBufferLH(outBuffer, QMF_EXCHANGE, BROKER_KEY);
- QPID_LOG(trace, "SENT ClassQuery seq=" << sequence << " package=" << package);
-}
-
-void BrokerProxyImpl::handleCommandComplete(Buffer& inBuffer, uint32_t seq)
-{
- string text;
- uint32_t code = inBuffer.getLong();
- inBuffer.getShortString(text);
- QPID_LOG(trace, "RCVD CommandComplete seq=" << seq << " code=" << code << " text=" << text);
-}
-
-void BrokerProxyImpl::handleClassIndication(Buffer& inBuffer, uint32_t seq)
-{
- uint8_t kind = inBuffer.getOctet();
- auto_ptr<SchemaClassKey> classKey(SchemaClassKeyImpl::factory(inBuffer));
-
- QPID_LOG(trace, "RCVD ClassIndication seq=" << seq << " kind=" << (int) kind << " key=" << classKey->impl->str());
-
- if (!console.impl->haveClass(classKey.get())) {
- Mutex::ScopedLock _lock(lock);
- incOutstandingLH();
- Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
- uint32_t sequence(seqMgr.reserve());
- Protocol::encodeHeader(outBuffer, Protocol::OP_SCHEMA_REQUEST, sequence);
- classKey->impl->encode(outBuffer);
- sendBufferLH(outBuffer, QMF_EXCHANGE, BROKER_KEY);
- QPID_LOG(trace, "SENT SchemaRequest seq=" << sequence <<" key=" << classKey->impl->str());
- }
-}
-
-MethodResponsePtr BrokerProxyImpl::handleMethodResponse(Buffer& inBuffer, uint32_t seq, const SchemaMethod* schema)
-{
- MethodResponsePtr response(MethodResponseImpl::factory(inBuffer, schema));
-
- QPID_LOG(trace, "RCVD MethodResponse seq=" << seq << " status=" << response->getStatus() << " text=" <<
- response->getException()->asString());
-
- return response;
-}
-
-void BrokerProxyImpl::handleHeartbeatIndication(Buffer& inBuffer, uint32_t seq, const string& routingKey)
-{
- vector<string> tokens = qpid::split(routingKey, ".");
- uint32_t agentBank;
- uint64_t timestamp;
-
- if (routingKey.empty() || tokens.size() != 4)
- agentBank = 0;
- else
- agentBank = ::atoi(tokens[3].c_str());
-
- timestamp = inBuffer.getLongLong();
- map<uint32_t, AgentProxyPtr>::const_iterator iter = agentList.find(agentBank);
- if (iter != agentList.end()) {
- console.impl->eventAgentHeartbeat(iter->second, timestamp);
- }
- QPID_LOG(trace, "RCVD HeartbeatIndication seq=" << seq << " agentBank=" << agentBank);
-}
-
-void BrokerProxyImpl::handleEventIndication(Buffer& inBuffer, uint32_t seq)
-{
- auto_ptr<SchemaClassKey> classKey(SchemaClassKeyImpl::factory(inBuffer));
- const SchemaEventClass *schema = console.impl->getEventClass(classKey.get());
- if (schema == 0) {
- QPID_LOG(trace, "No Schema Found for EventIndication. seq=" << seq << " key=" << classKey->impl->str());
- return;
- }
-
- EventPtr eptr(EventImpl::factory(schema, inBuffer));
-
- console.impl->eventEventReceived(eptr);
- QPID_LOG(trace, "RCVD EventIndication seq=" << seq << " key=" << classKey->impl->str());
-}
-
-void BrokerProxyImpl::handleSchemaResponse(Buffer& inBuffer, uint32_t seq)
-{
- SchemaObjectClass* oClassPtr;
- SchemaEventClass* eClassPtr;
- uint8_t kind = inBuffer.getOctet();
- const SchemaClassKey* key;
- if (kind == CLASS_OBJECT) {
- oClassPtr = SchemaObjectClassImpl::factory(inBuffer);
- console.impl->learnClass(oClassPtr);
- key = oClassPtr->getClassKey();
- QPID_LOG(trace, "RCVD SchemaResponse seq=" << seq << " kind=object key=" << key->impl->str());
-
- //
- // If we have just learned about the org.apache.qpid.broker:agent class, send a get
- // request for the current list of agents so we can have it on-hand before we declare
- // this session "stable".
- //
- if (key->impl->getClassName() == AGENT_CLASS && key->impl->getPackageName() == BROKER_PACKAGE) {
- Mutex::ScopedLock _lock(lock);
- incOutstandingLH();
- Buffer outBuffer(outputBuffer, MA_BUFFER_SIZE);
- uint32_t sequence(seqMgr.reserve());
- Protocol::encodeHeader(outBuffer, Protocol::OP_GET_QUERY, sequence);
- FieldTable ft;
- ft.setString("_class", AGENT_CLASS);
- ft.setString("_package", BROKER_PACKAGE);
- ft.encode(outBuffer);
- sendBufferLH(outBuffer, QMF_EXCHANGE, BROKER_AGENT_KEY);
- QPID_LOG(trace, "SENT GetQuery seq=" << sequence << " key=" << BROKER_AGENT_KEY);
- }
- } else if (kind == CLASS_EVENT) {
- eClassPtr = SchemaEventClassImpl::factory(inBuffer);
- console.impl->learnClass(eClassPtr);
- key = eClassPtr->getClassKey();
- QPID_LOG(trace, "RCVD SchemaResponse seq=" << seq << " kind=event key=" << key->impl->str());
- }
- else {
- QPID_LOG(error, "BrokerProxyImpl::handleSchemaResponse received unknown class kind: " << (int) kind);
- }
-}
-
-ObjectPtr BrokerProxyImpl::handleObjectIndication(Buffer& inBuffer, uint32_t seq, bool prop, bool stat)
-{
- auto_ptr<SchemaClassKey> classKey(SchemaClassKeyImpl::factory(inBuffer));
- QPID_LOG(trace, "RCVD ObjectIndication seq=" << seq << " key=" << classKey->impl->str());
-
- SchemaObjectClass* schema = console.impl->getSchema(classKey.get());
- if (schema == 0) {
- QPID_LOG(trace, "No Schema Found for ObjectIndication. seq=" << seq << " key=" << classKey->impl->str());
- return ObjectPtr();
- }
-
- ObjectPtr optr(ObjectImpl::factory(schema, this, inBuffer, prop, stat, true));
- if (prop && classKey->impl->getPackageName() == BROKER_PACKAGE && classKey->impl->getClassName() == AGENT_CLASS) {
- //
- // We've intercepted information about a remote agent... update the agent list accordingly
- //
- updateAgentList(optr);
- }
- return optr;
-}
-
-void BrokerProxyImpl::updateAgentList(ObjectPtr obj)
-{
- Value* value = obj->getValue("agentBank");
- Mutex::ScopedLock _lock(lock);
- if (value != 0 && value->isUint()) {
- uint32_t agentBank = value->asUint();
- if (obj->isDeleted()) {
- map<uint32_t, AgentProxyPtr>::iterator iter = agentList.find(agentBank);
- if (iter != agentList.end()) {
- AgentProxyPtr agent(iter->second);
- console.impl->eventAgentDeleted(agent);
- agentList.erase(agentBank);
- QPID_LOG(trace, "Agent at bank " << agentBank << " removed from agent list");
-
- //
- // Release all sequence numbers for requests in-flight to this agent.
- // Since the agent is no longer connected, these requests would not
- // otherwise complete.
- //
- agent->impl->releaseInFlight(seqMgr);
- }
- } else {
- Value* str = obj->getValue("label");
- string label;
- if (str != 0 && str->isString())
- label = str->asString();
- map<uint32_t, AgentProxyPtr>::const_iterator iter = agentList.find(agentBank);
- if (iter == agentList.end()) {
- AgentProxyPtr agent(AgentProxyImpl::factory(console, publicObject, agentBank, label));
- agentList[agentBank] = agent;
- console.impl->eventAgentAdded(agent);
- QPID_LOG(trace, "Agent '" << label << "' found at bank " << agentBank);
- }
- }
- }
-}
-
-void BrokerProxyImpl::incOutstandingLH()
-{
- requestsOutstanding++;
-}
-
-void BrokerProxyImpl::decOutstanding()
-{
- Mutex::ScopedLock _lock(lock);
- requestsOutstanding--;
- if (requestsOutstanding == 0 && !topicBound) {
- topicBound = true;
- for (vector<pair<string, string> >::const_iterator iter = console.impl->bindingList.begin();
- iter != console.impl->bindingList.end(); iter++) {
- string exchange(iter->first.empty() ? QMF_EXCHANGE : iter->first);
- string key(iter->second);
- eventQueue.push_back(eventBind(exchange, queueName, key));
- }
- eventQueue.push_back(eventStable());
- }
-}
-
-MethodResponseImpl::MethodResponseImpl(const MethodResponseImpl& from) :
- status(from.status), schema(from.schema)
-{
- if (from.exception.get())
- exception.reset(new Value(*(from.exception)));
- if (from.arguments.get())
- arguments.reset(new Value(*(from.arguments)));
-}
-
-MethodResponseImpl::MethodResponseImpl(Buffer& buf, const SchemaMethod* s) : schema(s)
-{
- string text;
-
- status = buf.getLong();
- buf.getMediumString(text);
- exception.reset(new Value(TYPE_LSTR));
- exception->setString(text.c_str());
-
- if (status != 0)
- return;
-
- arguments.reset(new Value(TYPE_MAP));
- int argCount(schema->getArgumentCount());
- for (int idx = 0; idx < argCount; idx++) {
- const SchemaArgument* arg = schema->getArgument(idx);
- if (arg->getDirection() == DIR_OUT || arg->getDirection() == DIR_IN_OUT) {
- Value* value(ValueImpl::factory(arg->getType(), buf));
- arguments->insert(arg->getName(), value);
- }
- }
-}
-
-MethodResponseImpl::MethodResponseImpl(uint32_t s, const string& text) : schema(0)
-{
- status = s;
- exception.reset(new Value(TYPE_LSTR));
- exception->setString(text.c_str());
-}
-
-MethodResponse* MethodResponseImpl::factory(Buffer& buf, const SchemaMethod* schema)
-{
- MethodResponseImpl* impl(new MethodResponseImpl(buf, schema));
- return new MethodResponse(impl);
-}
-
-MethodResponse* MethodResponseImpl::factory(uint32_t status, const std::string& text)
-{
- MethodResponseImpl* impl(new MethodResponseImpl(status, text));
- return new MethodResponse(impl);
-}
-
-bool StaticContext::handleMessage(uint8_t opcode, uint32_t sequence, const string& routingKey, Buffer& buffer)
-{
- ObjectPtr object;
- bool completeContext = false;
-
- if (opcode == Protocol::OP_BROKER_RESPONSE) {
- broker.handleBrokerResponse(buffer, sequence);
- completeContext = true;
- }
- else if (opcode == Protocol::OP_COMMAND_COMPLETE) {
- broker.handleCommandComplete(buffer, sequence);
- completeContext = true;
- }
- else if (opcode == Protocol::OP_SCHEMA_RESPONSE) {
- broker.handleSchemaResponse(buffer, sequence);
- completeContext = true;
- }
- else if (opcode == Protocol::OP_PACKAGE_INDICATION)
- broker.handlePackageIndication(buffer, sequence);
- else if (opcode == Protocol::OP_CLASS_INDICATION)
- broker.handleClassIndication(buffer, sequence);
- else if (opcode == Protocol::OP_HEARTBEAT_INDICATION)
- broker.handleHeartbeatIndication(buffer, sequence, routingKey);
- else if (opcode == Protocol::OP_EVENT_INDICATION)
- broker.handleEventIndication(buffer, sequence);
- else if (opcode == Protocol::OP_PROPERTY_INDICATION) {
- object = broker.handleObjectIndication(buffer, sequence, true, false);
- broker.console.impl->eventObjectUpdate(object, true, false);
- }
- else if (opcode == Protocol::OP_STATISTIC_INDICATION) {
- object = broker.handleObjectIndication(buffer, sequence, false, true);
- broker.console.impl->eventObjectUpdate(object, false, true);
- }
- else if (opcode == Protocol::OP_OBJECT_INDICATION) {
- object = broker.handleObjectIndication(buffer, sequence, true, true);
- broker.console.impl->eventObjectUpdate(object, true, true);
- }
- else {
- QPID_LOG(trace, "StaticContext::handleMessage invalid opcode: " << opcode);
- completeContext = true;
- }
-
- return completeContext;
-}
-
-void QueryContext::reserve()
-{
- Mutex::ScopedLock _lock(lock);
- requestsOutstanding++;
-}
-
-void QueryContext::release()
-{
- {
- Mutex::ScopedLock _lock(lock);
- if (--requestsOutstanding > 0)
- return;
- }
-
- Mutex::ScopedLock _block(broker.lock);
- broker.eventQueue.push_back(broker.eventQueryComplete(userContext, queryResponse));
-}
-
-bool QueryContext::handleMessage(uint8_t opcode, uint32_t sequence, const string& /*routingKey*/, Buffer& buffer)
-{
- bool completeContext = false;
- ObjectPtr object;
-
- if (opcode == Protocol::OP_COMMAND_COMPLETE) {
- broker.handleCommandComplete(buffer, sequence);
- completeContext = true;
-
- //
- // Visit each agent and remove the sequence from that agent's in-flight list.
- // This could be made more efficient because only one agent will have this sequence
- // in its list.
- //
- map<uint32_t, AgentProxyPtr> copy;
- {
- Mutex::ScopedLock _block(broker.lock);
- copy = broker.agentList;
- }
- for (map<uint32_t, AgentProxyPtr>::iterator iter = copy.begin(); iter != copy.end(); iter++)
- iter->second->impl->delSequence(sequence);
- }
- else if (opcode == Protocol::OP_OBJECT_INDICATION) {
- object = broker.handleObjectIndication(buffer, sequence, true, true);
- if (object.get() != 0)
- queryResponse->impl->results.push_back(object);
- }
- else {
- QPID_LOG(trace, "QueryContext::handleMessage invalid opcode: " << opcode);
- completeContext = true;
- }
-
- return completeContext;
-}
-
-void MethodContext::release()
-{
- Mutex::ScopedLock _block(broker.lock);
- broker.eventQueue.push_back(broker.eventMethodResponse(userContext, methodResponse));
-}
-
-bool MethodContext::handleMessage(uint8_t opcode, uint32_t sequence, const string& /*routingKey*/, Buffer& buffer)
-{
- if (opcode == Protocol::OP_METHOD_RESPONSE)
- methodResponse = broker.handleMethodResponse(buffer, sequence, schema);
- else
- QPID_LOG(trace, "QueryContext::handleMessage invalid opcode: " << opcode);
-
- return true;
-}
-
-
-//==================================================================
-// Wrappers
-//==================================================================
-
-AgentProxy::AgentProxy(AgentProxyImpl* i) : impl(i) {}
-AgentProxy::AgentProxy(const AgentProxy& from) : impl(new AgentProxyImpl(*(from.impl))) {}
-AgentProxy::~AgentProxy() { delete impl; }
-const char* AgentProxy::getLabel() const { return impl->getLabel().c_str(); }
-uint32_t AgentProxy::getBrokerBank() const { return impl->getBrokerBank(); }
-uint32_t AgentProxy::getAgentBank() const { return impl->getAgentBank(); }
-
-BrokerProxy::BrokerProxy(Console& console) : impl(new BrokerProxyImpl(*this, console)) {}
-BrokerProxy::~BrokerProxy() { delete impl; }
-void BrokerProxy::sessionOpened(SessionHandle& sh) { impl->sessionOpened(sh); }
-void BrokerProxy::sessionClosed() { impl->sessionClosed(); }
-void BrokerProxy::startProtocol() { impl->startProtocol(); }
-void BrokerProxy::handleRcvMessage(Message& message) { impl->handleRcvMessage(message); }
-bool BrokerProxy::getXmtMessage(Message& item) const { return impl->getXmtMessage(item); }
-void BrokerProxy::popXmt() { impl->popXmt(); }
-bool BrokerProxy::getEvent(BrokerEvent& event) const { return impl->getEvent(event); }
-void BrokerProxy::popEvent() { impl->popEvent(); }
-uint32_t BrokerProxy::agentCount() const { return impl->agentCount(); }
-const AgentProxy* BrokerProxy::getAgent(uint32_t idx) const { return impl->getAgent(idx); }
-void BrokerProxy::sendQuery(const Query& query, void* context, const AgentProxy* agent) { impl->sendQuery(query, context, agent); }
-
-MethodResponse::MethodResponse(const MethodResponse& from) : impl(new MethodResponseImpl(*(from.impl))) {}
-MethodResponse::MethodResponse(MethodResponseImpl* i) : impl(i) {}
-MethodResponse::~MethodResponse() {}
-uint32_t MethodResponse::getStatus() const { return impl->getStatus(); }
-const Value* MethodResponse::getException() const { return impl->getException(); }
-const Value* MethodResponse::getArgs() const { return impl->getArgs(); }
-
-QueryResponse::QueryResponse(QueryResponseImpl* i) : impl(i) {}
-QueryResponse::~QueryResponse() {}
-uint32_t QueryResponse::getStatus() const { return impl->getStatus(); }
-const Value* QueryResponse::getException() const { return impl->getException(); }
-uint32_t QueryResponse::getObjectCount() const { return impl->getObjectCount(); }
-const Object* QueryResponse::getObject(uint32_t idx) const { return impl->getObject(idx); }
-
diff --git a/qpid/cpp/src/qmf/engine/BrokerProxyImpl.h b/qpid/cpp/src/qmf/engine/BrokerProxyImpl.h
deleted file mode 100644
index 0542b67dbb..0000000000
--- a/qpid/cpp/src/qmf/engine/BrokerProxyImpl.h
+++ /dev/null
@@ -1,241 +0,0 @@
-#ifndef _QmfEngineBrokerProxyImpl_
-#define _QmfEngineBrokerProxyImpl_
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include "qmf/engine/Console.h"
-#include "qmf/engine/ObjectImpl.h"
-#include "qmf/engine/EventImpl.h"
-#include "qmf/engine/SchemaImpl.h"
-#include "qmf/engine/ValueImpl.h"
-#include "qmf/engine/QueryImpl.h"
-#include "qmf/engine/SequenceManager.h"
-#include "qmf/engine/MessageImpl.h"
-#include "qpid/framing/Buffer.h"
-#include "qpid/framing/Uuid.h"
-#include "qpid/sys/Mutex.h"
-#include "boost/shared_ptr.hpp"
-#include "boost/noncopyable.hpp"
-#include <memory>
-#include <string>
-#include <deque>
-#include <map>
-#include <set>
-#include <vector>
-
-namespace qmf {
-namespace engine {
-
- typedef boost::shared_ptr<MethodResponse> MethodResponsePtr;
- struct MethodResponseImpl {
- uint32_t status;
- const SchemaMethod* schema;
- std::auto_ptr<Value> exception;
- std::auto_ptr<Value> arguments;
-
- MethodResponseImpl(const MethodResponseImpl& from);
- MethodResponseImpl(qpid::framing::Buffer& buf, const SchemaMethod* schema);
- MethodResponseImpl(uint32_t status, const std::string& text);
- static MethodResponse* factory(qpid::framing::Buffer& buf, const SchemaMethod* schema);
- static MethodResponse* factory(uint32_t status, const std::string& text);
- ~MethodResponseImpl() {}
- uint32_t getStatus() const { return status; }
- const Value* getException() const { return exception.get(); }
- const Value* getArgs() const { return arguments.get(); }
- };
-
- typedef boost::shared_ptr<QueryResponse> QueryResponsePtr;
- struct QueryResponseImpl {
- uint32_t status;
- std::auto_ptr<Value> exception;
- std::vector<ObjectPtr> results;
-
- QueryResponseImpl() : status(0) {}
- static QueryResponse* factory() {
- QueryResponseImpl* impl(new QueryResponseImpl());
- return new QueryResponse(impl);
- }
- ~QueryResponseImpl() {}
- uint32_t getStatus() const { return status; }
- const Value* getException() const { return exception.get(); }
- uint32_t getObjectCount() const { return results.size(); }
- const Object* getObject(uint32_t idx) const;
- };
-
- struct BrokerEventImpl {
- typedef boost::shared_ptr<BrokerEventImpl> Ptr;
- BrokerEvent::EventKind kind;
- std::string name;
- std::string exchange;
- std::string bindingKey;
- void* context;
- QueryResponsePtr queryResponse;
- MethodResponsePtr methodResponse;
-
- BrokerEventImpl(BrokerEvent::EventKind k) : kind(k), context(0) {}
- ~BrokerEventImpl() {}
- BrokerEvent copy();
- };
-
- typedef boost::shared_ptr<AgentProxy> AgentProxyPtr;
- struct AgentProxyImpl {
- Console& console;
- BrokerProxy& broker;
- uint32_t agentBank;
- std::string label;
- std::set<uint32_t> inFlightSequences;
-
- AgentProxyImpl(Console& c, BrokerProxy& b, uint32_t ab, const std::string& l) : console(c), broker(b), agentBank(ab), label(l) {}
- static AgentProxy* factory(Console& c, BrokerProxy& b, uint32_t ab, const std::string& l) {
- AgentProxyImpl* impl(new AgentProxyImpl(c, b, ab, l));
- return new AgentProxy(impl);
- }
- ~AgentProxyImpl() {}
- const std::string& getLabel() const { return label; }
- uint32_t getBrokerBank() const { return 1; }
- uint32_t getAgentBank() const { return agentBank; }
- void addSequence(uint32_t seq) { inFlightSequences.insert(seq); }
- void delSequence(uint32_t seq) { inFlightSequences.erase(seq); }
- void releaseInFlight(SequenceManager& seqMgr) {
- for (std::set<uint32_t>::iterator iter = inFlightSequences.begin(); iter != inFlightSequences.end(); iter++)
- seqMgr.release(*iter);
- inFlightSequences.clear();
- }
- };
-
- class BrokerProxyImpl : public boost::noncopyable {
- public:
- BrokerProxyImpl(BrokerProxy& pub, Console& _console);
- ~BrokerProxyImpl() {}
-
- void sessionOpened(SessionHandle& sh);
- void sessionClosed();
- void startProtocol();
-
- void sendBufferLH(qpid::framing::Buffer& buf, const std::string& destination, const std::string& routingKey);
- void handleRcvMessage(Message& message);
- bool getXmtMessage(Message& item) const;
- void popXmt();
-
- bool getEvent(BrokerEvent& event) const;
- void popEvent();
-
- uint32_t agentCount() const;
- const AgentProxy* getAgent(uint32_t idx) const;
- void sendQuery(const Query& query, void* context, const AgentProxy* agent);
- bool sendGetRequestLH(SequenceContext::Ptr queryContext, const Query& query, const AgentProxy* agent);
- std::string encodeMethodArguments(const SchemaMethod* schema, const Value* args, qpid::framing::Buffer& buffer);
- std::string encodedSizeMethodArguments(const SchemaMethod* schema, const Value* args, uint32_t& size);
- void sendMethodRequest(ObjectId* oid, const SchemaObjectClass* cls, const std::string& method, const Value* args, void* context);
-
- void addBinding(const std::string& exchange, const std::string& key);
- void staticRelease() { decOutstanding(); }
-
- private:
- friend struct StaticContext;
- friend struct QueryContext;
- friend struct MethodContext;
- BrokerProxy& publicObject;
- mutable qpid::sys::Mutex lock;
- Console& console;
- std::string queueName;
- qpid::framing::Uuid brokerId;
- SequenceManager seqMgr;
- uint32_t requestsOutstanding;
- bool topicBound;
- std::map<uint32_t, AgentProxyPtr> agentList;
- std::deque<MessageImpl::Ptr> xmtQueue;
- std::deque<BrokerEventImpl::Ptr> eventQueue;
-
-# define MA_BUFFER_SIZE 65536
- char outputBuffer[MA_BUFFER_SIZE];
-
- BrokerEventImpl::Ptr eventDeclareQueue(const std::string& queueName);
- BrokerEventImpl::Ptr eventBind(const std::string& exchange, const std::string& queue, const std::string& key);
- BrokerEventImpl::Ptr eventSetupComplete();
- BrokerEventImpl::Ptr eventStable();
- BrokerEventImpl::Ptr eventQueryComplete(void* context, QueryResponsePtr response);
- BrokerEventImpl::Ptr eventMethodResponse(void* context, MethodResponsePtr response);
-
- void handleBrokerResponse(qpid::framing::Buffer& inBuffer, uint32_t seq);
- void handlePackageIndication(qpid::framing::Buffer& inBuffer, uint32_t seq);
- void handleCommandComplete(qpid::framing::Buffer& inBuffer, uint32_t seq);
- void handleClassIndication(qpid::framing::Buffer& inBuffer, uint32_t seq);
- MethodResponsePtr handleMethodResponse(qpid::framing::Buffer& inBuffer, uint32_t seq, const SchemaMethod* schema);
- void handleHeartbeatIndication(qpid::framing::Buffer& inBuffer, uint32_t seq, const std::string& routingKey);
- void handleEventIndication(qpid::framing::Buffer& inBuffer, uint32_t seq);
- void handleSchemaResponse(qpid::framing::Buffer& inBuffer, uint32_t seq);
- ObjectPtr handleObjectIndication(qpid::framing::Buffer& inBuffer, uint32_t seq, bool prop, bool stat);
- void updateAgentList(ObjectPtr obj);
- void incOutstandingLH();
- void decOutstanding();
- };
-
- //
- // StaticContext is used to handle:
- //
- // 1) Responses to console-level requests (for schema info, etc.)
- // 2) Unsolicited messages from agents (events, published updates, etc.)
- //
- struct StaticContext : public SequenceContext {
- StaticContext(BrokerProxyImpl& b) : broker(b) {}
- virtual ~StaticContext() {}
- void reserve() {}
- void release() { broker.staticRelease(); }
- bool handleMessage(uint8_t opcode, uint32_t sequence, const std::string& routingKey, qpid::framing::Buffer& buffer);
- BrokerProxyImpl& broker;
- };
-
- //
- // QueryContext is used to track and handle responses associated with a single Get Query
- //
- struct QueryContext : public SequenceContext {
- QueryContext(BrokerProxyImpl& b, void* u) :
- broker(b), userContext(u), requestsOutstanding(0), queryResponse(QueryResponseImpl::factory()) {}
- virtual ~QueryContext() {}
- void reserve();
- void release();
- bool handleMessage(uint8_t opcode, uint32_t sequence, const std::string& routingKey, qpid::framing::Buffer& buffer);
-
- mutable qpid::sys::Mutex lock;
- BrokerProxyImpl& broker;
- void* userContext;
- uint32_t requestsOutstanding;
- QueryResponsePtr queryResponse;
- };
-
- struct MethodContext : public SequenceContext {
- MethodContext(BrokerProxyImpl& b, void* u, const SchemaMethod* s) : broker(b), userContext(u), schema(s) {}
- virtual ~MethodContext() {}
- void reserve() {}
- void release();
- bool handleMessage(uint8_t opcode, uint32_t sequence, const std::string& routingKey, qpid::framing::Buffer& buffer);
-
- BrokerProxyImpl& broker;
- void* userContext;
- const SchemaMethod* schema;
- MethodResponsePtr methodResponse;
- };
-
-}
-}
-
-#endif
-
diff --git a/qpid/cpp/src/qmf/engine/ConnectionSettingsImpl.cpp b/qpid/cpp/src/qmf/engine/ConnectionSettingsImpl.cpp
deleted file mode 100644
index 22a65f28ca..0000000000
--- a/qpid/cpp/src/qmf/engine/ConnectionSettingsImpl.cpp
+++ /dev/null
@@ -1,278 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include "qmf/engine/ConnectionSettingsImpl.h"
-#include "qmf/engine/Typecode.h"
-
-using namespace std;
-using namespace qmf::engine;
-using namespace qpid;
-
-const string attrProtocol("protocol");
-const string attrHost("host");
-const string attrPort("port");
-const string attrVirtualhost("virtualhost");
-const string attrUsername("username");
-const string attrPassword("password");
-const string attrMechanism("mechanism");
-const string attrLocale("locale");
-const string attrHeartbeat("heartbeat");
-const string attrMaxChannels("maxChannels");
-const string attrMaxFrameSize("maxFrameSize");
-const string attrBounds("bounds");
-const string attrTcpNoDelay("tcpNoDelay");
-const string attrService("service");
-const string attrMinSsf("minSsf");
-const string attrMaxSsf("maxSsf");
-const string attrRetryDelayMin("retryDelayMin");
-const string attrRetryDelayMax("retryDelayMax");
-const string attrRetryDelayFactor("retryDelayFactor");
-const string attrSendUserId("sendUserId");
-
-ConnectionSettingsImpl::ConnectionSettingsImpl() :
- retryDelayMin(1), retryDelayMax(64), retryDelayFactor(2), sendUserId(true)
-{
-}
-
-ConnectionSettingsImpl::ConnectionSettingsImpl(const string& /*url*/) :
- retryDelayMin(1), retryDelayMax(64), retryDelayFactor(2), sendUserId(true)
-{
- // TODO: Parse the URL
-}
-
-bool ConnectionSettingsImpl::setAttr(const string& key, const Value& value)
-{
- if (key == attrProtocol) clientSettings.protocol = value.asString();
- else if (key == attrHost) clientSettings.host = value.asString();
- else if (key == attrPort) clientSettings.port = value.asUint();
- else if (key == attrVirtualhost) clientSettings.virtualhost = value.asString();
- else if (key == attrUsername) clientSettings.username = value.asString();
- else if (key == attrPassword) clientSettings.password = value.asString();
- else if (key == attrMechanism) clientSettings.mechanism = value.asString();
- else if (key == attrLocale) clientSettings.locale = value.asString();
- else if (key == attrHeartbeat) clientSettings.heartbeat = value.asUint();
- else if (key == attrMaxChannels) clientSettings.maxChannels = value.asUint();
- else if (key == attrMaxFrameSize) clientSettings.maxFrameSize = value.asUint();
- else if (key == attrBounds) clientSettings.bounds = value.asUint();
- else if (key == attrTcpNoDelay) clientSettings.tcpNoDelay = value.asBool();
- else if (key == attrService) clientSettings.service = value.asString();
- else if (key == attrMinSsf) clientSettings.minSsf = value.asUint();
- else if (key == attrMaxSsf) clientSettings.maxSsf = value.asUint();
-
- else if (key == attrRetryDelayMin) retryDelayMin = value.asUint();
- else if (key == attrRetryDelayMax) retryDelayMax = value.asUint();
- else if (key == attrRetryDelayFactor) retryDelayFactor = value.asUint();
- else if (key == attrSendUserId) sendUserId = value.asBool();
- else
- return false;
- return true;
-}
-
-Value ConnectionSettingsImpl::getAttr(const string& key) const
-{
- Value strval(TYPE_LSTR);
- Value intval(TYPE_UINT32);
- Value boolval(TYPE_BOOL);
-
- if (key == attrProtocol) {
- strval.setString(clientSettings.protocol.c_str());
- return strval;
- }
-
- if (key == attrHost) {
- strval.setString(clientSettings.host.c_str());
- return strval;
- }
-
- if (key == attrPort) {
- intval.setUint(clientSettings.port);
- return intval;
- }
-
- if (key == attrVirtualhost) {
- strval.setString(clientSettings.virtualhost.c_str());
- return strval;
- }
-
- if (key == attrUsername) {
- strval.setString(clientSettings.username.c_str());
- return strval;
- }
-
- if (key == attrPassword) {
- strval.setString(clientSettings.password.c_str());
- return strval;
- }
-
- if (key == attrMechanism) {
- strval.setString(clientSettings.mechanism.c_str());
- return strval;
- }
-
- if (key == attrLocale) {
- strval.setString(clientSettings.locale.c_str());
- return strval;
- }
-
- if (key == attrHeartbeat) {
- intval.setUint(clientSettings.heartbeat);
- return intval;
- }
-
- if (key == attrMaxChannels) {
- intval.setUint(clientSettings.maxChannels);
- return intval;
- }
-
- if (key == attrMaxFrameSize) {
- intval.setUint(clientSettings.maxFrameSize);
- return intval;
- }
-
- if (key == attrBounds) {
- intval.setUint(clientSettings.bounds);
- return intval;
- }
-
- if (key == attrTcpNoDelay) {
- boolval.setBool(clientSettings.tcpNoDelay);
- return boolval;
- }
-
- if (key == attrService) {
- strval.setString(clientSettings.service.c_str());
- return strval;
- }
-
- if (key == attrMinSsf) {
- intval.setUint(clientSettings.minSsf);
- return intval;
- }
-
- if (key == attrMaxSsf) {
- intval.setUint(clientSettings.maxSsf);
- return intval;
- }
-
- if (key == attrRetryDelayMin) {
- intval.setUint(retryDelayMin);
- return intval;
- }
-
- if (key == attrRetryDelayMax) {
- intval.setUint(retryDelayMax);
- return intval;
- }
-
- if (key == attrRetryDelayFactor) {
- intval.setUint(retryDelayFactor);
- return intval;
- }
-
- if (key == attrSendUserId) {
- boolval.setBool(sendUserId);
- return boolval;
- }
-
- return strval;
-}
-
-const string& ConnectionSettingsImpl::getAttrString() const
-{
- // TODO: build and return attribute string
- return attrString;
-}
-
-void ConnectionSettingsImpl::transportTcp(uint16_t port)
-{
- clientSettings.protocol = "tcp";
- clientSettings.port = port;
-}
-
-void ConnectionSettingsImpl::transportSsl(uint16_t port)
-{
- clientSettings.protocol = "ssl";
- clientSettings.port = port;
-}
-
-void ConnectionSettingsImpl::transportRdma(uint16_t port)
-{
- clientSettings.protocol = "rdma";
- clientSettings.port = port;
-}
-
-void ConnectionSettingsImpl::authAnonymous(const string& username)
-{
- clientSettings.mechanism = "ANONYMOUS";
- clientSettings.username = username;
-}
-
-void ConnectionSettingsImpl::authPlain(const string& username, const string& password)
-{
- clientSettings.mechanism = "PLAIN";
- clientSettings.username = username;
- clientSettings.password = password;
-}
-
-void ConnectionSettingsImpl::authGssapi(const string& serviceName, uint32_t minSsf, uint32_t maxSsf)
-{
- clientSettings.mechanism = "GSSAPI";
- clientSettings.service = serviceName;
- clientSettings.minSsf = minSsf;
- clientSettings.maxSsf = maxSsf;
-}
-
-void ConnectionSettingsImpl::setRetry(int delayMin, int delayMax, int delayFactor)
-{
- retryDelayMin = delayMin;
- retryDelayMax = delayMax;
- retryDelayFactor = delayFactor;
-}
-
-const client::ConnectionSettings& ConnectionSettingsImpl::getClientSettings() const
-{
- return clientSettings;
-}
-
-void ConnectionSettingsImpl::getRetrySettings(int* min, int* max, int* factor) const
-{
- *min = retryDelayMin;
- *max = retryDelayMax;
- *factor = retryDelayFactor;
-}
-
-//==================================================================
-// Wrappers
-//==================================================================
-
-ConnectionSettings::ConnectionSettings(const ConnectionSettings& from) { impl = new ConnectionSettingsImpl(*from.impl); }
-ConnectionSettings::ConnectionSettings() { impl = new ConnectionSettingsImpl(); }
-ConnectionSettings::ConnectionSettings(const char* url) { impl = new ConnectionSettingsImpl(url); }
-ConnectionSettings::~ConnectionSettings() { delete impl; }
-bool ConnectionSettings::setAttr(const char* key, const Value& value) { return impl->setAttr(key, value); }
-Value ConnectionSettings::getAttr(const char* key) const { return impl->getAttr(key); }
-const char* ConnectionSettings::getAttrString() const { return impl->getAttrString().c_str(); }
-void ConnectionSettings::transportTcp(uint16_t port) { impl->transportTcp(port); }
-void ConnectionSettings::transportSsl(uint16_t port) { impl->transportSsl(port); }
-void ConnectionSettings::transportRdma(uint16_t port) { impl->transportRdma(port); }
-void ConnectionSettings::authAnonymous(const char* username) { impl->authAnonymous(username); }
-void ConnectionSettings::authPlain(const char* username, const char* password) { impl->authPlain(username, password); }
-void ConnectionSettings::authGssapi(const char* serviceName, uint32_t minSsf, uint32_t maxSsf) { impl->authGssapi(serviceName, minSsf, maxSsf); }
-void ConnectionSettings::setRetry(int delayMin, int delayMax, int delayFactor) { impl->setRetry(delayMin, delayMax, delayFactor); }
-
diff --git a/qpid/cpp/src/qmf/engine/ConnectionSettingsImpl.h b/qpid/cpp/src/qmf/engine/ConnectionSettingsImpl.h
deleted file mode 100644
index 98bf87868b..0000000000
--- a/qpid/cpp/src/qmf/engine/ConnectionSettingsImpl.h
+++ /dev/null
@@ -1,63 +0,0 @@
-#ifndef _QmfEngineConnectionSettingsImpl_
-#define _QmfEngineConnectionSettingsImpl_
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include "qmf/engine/ConnectionSettings.h"
-#include "qmf/engine/Value.h"
-#include "qpid/client/ConnectionSettings.h"
-#include <string>
-#include <map>
-
-namespace qmf {
-namespace engine {
-
- class ConnectionSettingsImpl {
- qpid::client::ConnectionSettings clientSettings;
- mutable std::string attrString;
- int retryDelayMin;
- int retryDelayMax;
- int retryDelayFactor;
- bool sendUserId;
-
- public:
- ConnectionSettingsImpl();
- ConnectionSettingsImpl(const std::string& url);
- ~ConnectionSettingsImpl() {}
- bool setAttr(const std::string& key, const Value& value);
- Value getAttr(const std::string& key) const;
- const std::string& getAttrString() const;
- void transportTcp(uint16_t port);
- void transportSsl(uint16_t port);
- void transportRdma(uint16_t port);
- void authAnonymous(const std::string& username);
- void authPlain(const std::string& username, const std::string& password);
- void authGssapi(const std::string& serviceName, uint32_t minSsf, uint32_t maxSsf);
- void setRetry(int delayMin, int delayMax, int delayFactor);
-
- const qpid::client::ConnectionSettings& getClientSettings() const;
- void getRetrySettings(int* delayMin, int* delayMax, int* delayFactor) const;
- bool getSendUserId() const { return sendUserId; }
- };
-
-}
-}
-
-#endif
diff --git a/qpid/cpp/src/qmf/engine/ConsoleImpl.cpp b/qpid/cpp/src/qmf/engine/ConsoleImpl.cpp
deleted file mode 100644
index 4a5da31bdc..0000000000
--- a/qpid/cpp/src/qmf/engine/ConsoleImpl.cpp
+++ /dev/null
@@ -1,458 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include "qmf/engine/ConsoleImpl.h"
-#include "qmf/engine/MessageImpl.h"
-#include "qmf/engine/SchemaImpl.h"
-#include "qmf/engine/Typecode.h"
-#include "qmf/engine/ObjectImpl.h"
-#include "qmf/engine/ObjectIdImpl.h"
-#include "qmf/engine/QueryImpl.h"
-#include "qmf/engine/ValueImpl.h"
-#include "qmf/engine/Protocol.h"
-#include "qmf/engine/SequenceManager.h"
-#include "qmf/engine/BrokerProxyImpl.h"
-#include <qpid/framing/Buffer.h>
-#include <qpid/framing/Uuid.h>
-#include <qpid/framing/FieldTable.h>
-#include <qpid/framing/FieldValue.h>
-#include <qpid/log/Statement.h>
-#include <qpid/sys/Time.h>
-#include <qpid/sys/SystemInfo.h>
-#include <string.h>
-#include <iostream>
-#include <fstream>
-
-using namespace std;
-using namespace qmf::engine;
-using namespace qpid::framing;
-using namespace qpid::sys;
-
-namespace {
- const char* QMF_EXCHANGE = "qpid.management";
-}
-
-#define STRING_REF(s) {if (!s.empty()) item.s = const_cast<char*>(s.c_str());}
-
-ConsoleEvent ConsoleEventImpl::copy()
-{
- ConsoleEvent item;
-
- ::memset(&item, 0, sizeof(ConsoleEvent));
- item.kind = kind;
- item.agent = agent.get();
- item.classKey = classKey;
- item.object = object.get();
- item.context = context;
- item.event = event.get();
- item.timestamp = timestamp;
- item.hasProps = hasProps;
- item.hasStats = hasStats;
-
- STRING_REF(name);
-
- return item;
-}
-
-ConsoleImpl::ConsoleImpl(const ConsoleSettings& s) : settings(s)
-{
- bindingList.push_back(pair<string, string>(string(), "schema.#"));
- if (settings.rcvObjects && settings.rcvEvents && settings.rcvHeartbeats && !settings.userBindings) {
- bindingList.push_back(pair<string, string>(string(), "console.#"));
- } else {
- if (settings.rcvObjects && !settings.userBindings)
- bindingList.push_back(pair<string, string>(string(), "console.obj.#"));
- else
- bindingList.push_back(pair<string, string>(string(), "console.obj.*.*.org.apache.qpid.broker.agent"));
- if (settings.rcvEvents)
- bindingList.push_back(pair<string, string>(string(), "console.event.#"));
- if (settings.rcvHeartbeats)
- bindingList.push_back(pair<string, string>(string(), "console.heartbeat.#"));
- }
-}
-
-ConsoleImpl::~ConsoleImpl()
-{
- // This function intentionally left blank.
-}
-
-bool ConsoleImpl::getEvent(ConsoleEvent& event) const
-{
- Mutex::ScopedLock _lock(lock);
- if (eventQueue.empty())
- return false;
- event = eventQueue.front()->copy();
- return true;
-}
-
-void ConsoleImpl::popEvent()
-{
- Mutex::ScopedLock _lock(lock);
- if (!eventQueue.empty())
- eventQueue.pop_front();
-}
-
-void ConsoleImpl::addConnection(BrokerProxy& broker, void* /*context*/)
-{
- Mutex::ScopedLock _lock(lock);
- brokerList.push_back(broker.impl);
-}
-
-void ConsoleImpl::delConnection(BrokerProxy& broker)
-{
- Mutex::ScopedLock _lock(lock);
- for (vector<BrokerProxyImpl*>::iterator iter = brokerList.begin();
- iter != brokerList.end(); iter++)
- if (*iter == broker.impl) {
- brokerList.erase(iter);
- break;
- }
-}
-
-uint32_t ConsoleImpl::packageCount() const
-{
- Mutex::ScopedLock _lock(lock);
- return packages.size();
-}
-
-const string& ConsoleImpl::getPackageName(uint32_t idx) const
-{
- const static string empty;
-
- Mutex::ScopedLock _lock(lock);
- if (idx >= packages.size())
- return empty;
-
- PackageList::const_iterator iter = packages.begin();
- for (uint32_t i = 0; i < idx; i++) iter++;
- return iter->first;
-}
-
-uint32_t ConsoleImpl::classCount(const char* packageName) const
-{
- Mutex::ScopedLock _lock(lock);
- PackageList::const_iterator pIter = packages.find(packageName);
- if (pIter == packages.end())
- return 0;
-
- const ObjectClassList& oList = pIter->second.first;
- const EventClassList& eList = pIter->second.second;
-
- return oList.size() + eList.size();
-}
-
-const SchemaClassKey* ConsoleImpl::getClass(const char* packageName, uint32_t idx) const
-{
- Mutex::ScopedLock _lock(lock);
- PackageList::const_iterator pIter = packages.find(packageName);
- if (pIter == packages.end())
- return 0;
-
- const ObjectClassList& oList = pIter->second.first;
- const EventClassList& eList = pIter->second.second;
- uint32_t count = 0;
-
- for (ObjectClassList::const_iterator oIter = oList.begin();
- oIter != oList.end(); oIter++) {
- if (count == idx)
- return oIter->second->getClassKey();
- count++;
- }
-
- for (EventClassList::const_iterator eIter = eList.begin();
- eIter != eList.end(); eIter++) {
- if (count == idx)
- return eIter->second->getClassKey();
- count++;
- }
-
- return 0;
-}
-
-ClassKind ConsoleImpl::getClassKind(const SchemaClassKey* key) const
-{
- Mutex::ScopedLock _lock(lock);
- PackageList::const_iterator pIter = packages.find(key->getPackageName());
- if (pIter == packages.end())
- return CLASS_OBJECT;
-
- const EventClassList& eList = pIter->second.second;
- if (eList.find(key) != eList.end())
- return CLASS_EVENT;
- return CLASS_OBJECT;
-}
-
-const SchemaObjectClass* ConsoleImpl::getObjectClass(const SchemaClassKey* key) const
-{
- Mutex::ScopedLock _lock(lock);
- PackageList::const_iterator pIter = packages.find(key->getPackageName());
- if (pIter == packages.end())
- return 0;
-
- const ObjectClassList& oList = pIter->second.first;
- ObjectClassList::const_iterator iter = oList.find(key);
- if (iter == oList.end())
- return 0;
- return iter->second;
-}
-
-const SchemaEventClass* ConsoleImpl::getEventClass(const SchemaClassKey* key) const
-{
- Mutex::ScopedLock _lock(lock);
- PackageList::const_iterator pIter = packages.find(key->getPackageName());
- if (pIter == packages.end())
- return 0;
-
- const EventClassList& eList = pIter->second.second;
- EventClassList::const_iterator iter = eList.find(key);
- if (iter == eList.end())
- return 0;
- return iter->second;
-}
-
-void ConsoleImpl::bindPackage(const char* packageName)
-{
- stringstream key;
- key << "console.obj.*.*." << packageName << ".#";
- Mutex::ScopedLock _lock(lock);
- bindingList.push_back(pair<string, string>(string(), key.str()));
- for (vector<BrokerProxyImpl*>::iterator iter = brokerList.begin();
- iter != brokerList.end(); iter++)
- (*iter)->addBinding(QMF_EXCHANGE, key.str());
-}
-
-void ConsoleImpl::bindClass(const SchemaClassKey* classKey)
-{
- stringstream key;
- key << "console.obj.*.*." << classKey->getPackageName() << "." << classKey->getClassName() << ".#";
- Mutex::ScopedLock _lock(lock);
- bindingList.push_back(pair<string, string>(string(), key.str()));
- for (vector<BrokerProxyImpl*>::iterator iter = brokerList.begin();
- iter != brokerList.end(); iter++)
- (*iter)->addBinding(QMF_EXCHANGE, key.str());
-}
-
-void ConsoleImpl::bindClass(const char* packageName, const char* className)
-{
- stringstream key;
- key << "console.obj.*.*." << packageName << "." << className << ".#";
- Mutex::ScopedLock _lock(lock);
- bindingList.push_back(pair<string, string>(string(), key.str()));
- for (vector<BrokerProxyImpl*>::iterator iter = brokerList.begin();
- iter != brokerList.end(); iter++)
- (*iter)->addBinding(QMF_EXCHANGE, key.str());
-}
-
-
-void ConsoleImpl::bindEvent(const SchemaClassKey* classKey)
-{
- bindEvent(classKey->getPackageName(), classKey->getClassName());
-}
-
-void ConsoleImpl::bindEvent(const char* packageName, const char* eventName)
-{
- if (!settings.userBindings) throw qpid::Exception("Console not configured for userBindings.");
- if (settings.rcvEvents) throw qpid::Exception("Console already configured to receive all events.");
-
- stringstream key;
- key << "console.event.*.*." << packageName;
- if (eventName && *eventName) {
- key << "." << eventName << ".#";
- } else {
- key << ".#";
- }
-
- Mutex::ScopedLock _lock(lock);
- bindingList.push_back(pair<string, string>(string(), key.str()));
- for (vector<BrokerProxyImpl*>::iterator iter = brokerList.begin();
- iter != brokerList.end(); iter++)
- (*iter)->addBinding(QMF_EXCHANGE, key.str());
-}
-
-/*
-void ConsoleImpl::startSync(const Query& query, void* context, SyncQuery& sync)
-{
-}
-
-void ConsoleImpl::touchSync(SyncQuery& sync)
-{
-}
-
-void ConsoleImpl::endSync(SyncQuery& sync)
-{
-}
-*/
-
-void ConsoleImpl::learnPackage(const string& packageName)
-{
- Mutex::ScopedLock _lock(lock);
- if (packages.find(packageName) == packages.end()) {
- packages.insert(pair<string, pair<ObjectClassList, EventClassList> >
- (packageName, pair<ObjectClassList, EventClassList>(ObjectClassList(), EventClassList())));
- eventNewPackage(packageName);
- }
-}
-
-void ConsoleImpl::learnClass(SchemaObjectClass* cls)
-{
- Mutex::ScopedLock _lock(lock);
- const SchemaClassKey* key = cls->getClassKey();
- PackageList::iterator pIter = packages.find(key->getPackageName());
- if (pIter == packages.end())
- return;
-
- ObjectClassList& list = pIter->second.first;
- if (list.find(key) == list.end()) {
- list[key] = cls;
- eventNewClass(key);
- }
-}
-
-void ConsoleImpl::learnClass(SchemaEventClass* cls)
-{
- Mutex::ScopedLock _lock(lock);
- const SchemaClassKey* key = cls->getClassKey();
- PackageList::iterator pIter = packages.find(key->getPackageName());
- if (pIter == packages.end())
- return;
-
- EventClassList& list = pIter->second.second;
- if (list.find(key) == list.end()) {
- list[key] = cls;
- eventNewClass(key);
- }
-}
-
-bool ConsoleImpl::haveClass(const SchemaClassKey* key) const
-{
- Mutex::ScopedLock _lock(lock);
- PackageList::const_iterator pIter = packages.find(key->getPackageName());
- if (pIter == packages.end())
- return false;
-
- const ObjectClassList& oList = pIter->second.first;
- const EventClassList& eList = pIter->second.second;
-
- return oList.find(key) != oList.end() || eList.find(key) != eList.end();
-}
-
-SchemaObjectClass* ConsoleImpl::getSchema(const SchemaClassKey* key) const
-{
- Mutex::ScopedLock _lock(lock);
- PackageList::const_iterator pIter = packages.find(key->getPackageName());
- if (pIter == packages.end())
- return 0;
-
- const ObjectClassList& oList = pIter->second.first;
- ObjectClassList::const_iterator iter = oList.find(key);
- if (iter == oList.end())
- return 0;
-
- return iter->second;
-}
-
-void ConsoleImpl::eventAgentAdded(boost::shared_ptr<AgentProxy> agent)
-{
- ConsoleEventImpl::Ptr event(new ConsoleEventImpl(ConsoleEvent::AGENT_ADDED));
- event->agent = agent;
- Mutex::ScopedLock _lock(lock);
- eventQueue.push_back(event);
-}
-
-void ConsoleImpl::eventAgentDeleted(boost::shared_ptr<AgentProxy> agent)
-{
- ConsoleEventImpl::Ptr event(new ConsoleEventImpl(ConsoleEvent::AGENT_DELETED));
- event->agent = agent;
- Mutex::ScopedLock _lock(lock);
- eventQueue.push_back(event);
-}
-
-void ConsoleImpl::eventNewPackage(const string& packageName)
-{
- ConsoleEventImpl::Ptr event(new ConsoleEventImpl(ConsoleEvent::NEW_PACKAGE));
- event->name = packageName;
- Mutex::ScopedLock _lock(lock);
- eventQueue.push_back(event);
-}
-
-void ConsoleImpl::eventNewClass(const SchemaClassKey* key)
-{
- ConsoleEventImpl::Ptr event(new ConsoleEventImpl(ConsoleEvent::NEW_CLASS));
- event->classKey = key;
- Mutex::ScopedLock _lock(lock);
- eventQueue.push_back(event);
-}
-
-void ConsoleImpl::eventObjectUpdate(ObjectPtr object, bool prop, bool stat)
-{
- ConsoleEventImpl::Ptr event(new ConsoleEventImpl(ConsoleEvent::OBJECT_UPDATE));
- event->object = object;
- event->hasProps = prop;
- event->hasStats = stat;
- Mutex::ScopedLock _lock(lock);
- eventQueue.push_back(event);
-}
-
-void ConsoleImpl::eventAgentHeartbeat(boost::shared_ptr<AgentProxy> agent, uint64_t timestamp)
-{
- ConsoleEventImpl::Ptr event(new ConsoleEventImpl(ConsoleEvent::AGENT_HEARTBEAT));
- event->agent = agent;
- event->timestamp = timestamp;
- Mutex::ScopedLock _lock(lock);
- eventQueue.push_back(event);
-}
-
-
-void ConsoleImpl::eventEventReceived(EventPtr event)
-{
- ConsoleEventImpl::Ptr console_event(new ConsoleEventImpl(ConsoleEvent::EVENT_RECEIVED));
- console_event->event = event;
- Mutex::ScopedLock _lock(lock);
- eventQueue.push_back(console_event);
-}
-
-//==================================================================
-// Wrappers
-//==================================================================
-
-Console::Console(const ConsoleSettings& settings) : impl(new ConsoleImpl(settings)) {}
-Console::~Console() { delete impl; }
-bool Console::getEvent(ConsoleEvent& event) const { return impl->getEvent(event); }
-void Console::popEvent() { impl->popEvent(); }
-void Console::addConnection(BrokerProxy& broker, void* context) { impl->addConnection(broker, context); }
-void Console::delConnection(BrokerProxy& broker) { impl->delConnection(broker); }
-uint32_t Console::packageCount() const { return impl->packageCount(); }
-const char* Console::getPackageName(uint32_t idx) const { return impl->getPackageName(idx).c_str(); }
-uint32_t Console::classCount(const char* packageName) const { return impl->classCount(packageName); }
-const SchemaClassKey* Console::getClass(const char* packageName, uint32_t idx) const { return impl->getClass(packageName, idx); }
-ClassKind Console::getClassKind(const SchemaClassKey* key) const { return impl->getClassKind(key); }
-const SchemaObjectClass* Console::getObjectClass(const SchemaClassKey* key) const { return impl->getObjectClass(key); }
-const SchemaEventClass* Console::getEventClass(const SchemaClassKey* key) const { return impl->getEventClass(key); }
-void Console::bindPackage(const char* packageName) { impl->bindPackage(packageName); }
-void Console::bindClass(const SchemaClassKey* key) { impl->bindClass(key); }
-void Console::bindClass(const char* packageName, const char* className) { impl->bindClass(packageName, className); }
-
-void Console::bindEvent(const SchemaClassKey *key) { impl->bindEvent(key); }
-void Console::bindEvent(const char* packageName, const char* eventName) { impl->bindEvent(packageName, eventName); }
-
-//void Console::startSync(const Query& query, void* context, SyncQuery& sync) { impl->startSync(query, context, sync); }
-//void Console::touchSync(SyncQuery& sync) { impl->touchSync(sync); }
-//void Console::endSync(SyncQuery& sync) { impl->endSync(sync); }
-
-
diff --git a/qpid/cpp/src/qmf/engine/ConsoleImpl.h b/qpid/cpp/src/qmf/engine/ConsoleImpl.h
deleted file mode 100644
index 0c27fdabcd..0000000000
--- a/qpid/cpp/src/qmf/engine/ConsoleImpl.h
+++ /dev/null
@@ -1,148 +0,0 @@
-#ifndef _QmfEngineConsoleEngineImpl_
-#define _QmfEngineConsoleEngineImpl_
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include "qmf/engine/Console.h"
-#include "qmf/engine/MessageImpl.h"
-#include "qmf/engine/SchemaImpl.h"
-#include "qmf/engine/Typecode.h"
-#include "qmf/engine/ObjectImpl.h"
-#include "qmf/engine/ObjectIdImpl.h"
-#include "qmf/engine/QueryImpl.h"
-#include "qmf/engine/ValueImpl.h"
-#include "qmf/engine/Protocol.h"
-#include "qmf/engine/SequenceManager.h"
-#include "qmf/engine/BrokerProxyImpl.h"
-#include <qpid/framing/Buffer.h>
-#include <qpid/framing/Uuid.h>
-#include <qpid/framing/FieldTable.h>
-#include <qpid/framing/FieldValue.h>
-#include <qpid/sys/Mutex.h>
-#include <qpid/sys/Time.h>
-#include <qpid/sys/SystemInfo.h>
-#include <string.h>
-#include <string>
-#include <deque>
-#include <map>
-#include <vector>
-#include <iostream>
-#include <fstream>
-#include <boost/shared_ptr.hpp>
-#include <boost/noncopyable.hpp>
-
-namespace qmf {
-namespace engine {
-
- struct ConsoleEventImpl {
- typedef boost::shared_ptr<ConsoleEventImpl> Ptr;
- ConsoleEvent::EventKind kind;
- boost::shared_ptr<AgentProxy> agent;
- std::string name;
- const SchemaClassKey* classKey;
- boost::shared_ptr<Object> object;
- void* context;
- boost::shared_ptr<Event> event;
- uint64_t timestamp;
- bool hasProps;
- bool hasStats;
-
- ConsoleEventImpl(ConsoleEvent::EventKind k) :
- kind(k), classKey(0), context(0), timestamp(0) {}
- ~ConsoleEventImpl() {}
- ConsoleEvent copy();
- };
-
- class ConsoleImpl : public boost::noncopyable {
- public:
- ConsoleImpl(const ConsoleSettings& settings = ConsoleSettings());
- ~ConsoleImpl();
-
- bool getEvent(ConsoleEvent& event) const;
- void popEvent();
-
- void addConnection(BrokerProxy& broker, void* context);
- void delConnection(BrokerProxy& broker);
-
- uint32_t packageCount() const;
- const std::string& getPackageName(uint32_t idx) const;
-
- uint32_t classCount(const char* packageName) const;
- const SchemaClassKey* getClass(const char* packageName, uint32_t idx) const;
-
- ClassKind getClassKind(const SchemaClassKey* key) const;
- const SchemaObjectClass* getObjectClass(const SchemaClassKey* key) const;
- const SchemaEventClass* getEventClass(const SchemaClassKey* key) const;
-
- void bindPackage(const char* packageName);
- void bindClass(const SchemaClassKey* key);
- void bindClass(const char* packageName, const char* className);
- void bindEvent(const SchemaClassKey* key);
- void bindEvent(const char* packageName, const char* eventName);
-
- /*
- void startSync(const Query& query, void* context, SyncQuery& sync);
- void touchSync(SyncQuery& sync);
- void endSync(SyncQuery& sync);
- */
-
- private:
- friend class BrokerProxyImpl;
- friend struct StaticContext;
- const ConsoleSettings& settings;
- mutable qpid::sys::Mutex lock;
- std::deque<ConsoleEventImpl::Ptr> eventQueue;
- std::vector<BrokerProxyImpl*> brokerList;
- std::vector<std::pair<std::string, std::string> > bindingList; // exchange/key (empty exchange => QMF_EXCHANGE)
-
- // Declare a compare class for the class maps that compares the dereferenced
- // class key pointers. The default behavior would be to compare the pointer
- // addresses themselves.
- struct KeyCompare {
- bool operator()(const SchemaClassKey* left, const SchemaClassKey* right) const {
- return *left < *right;
- }
- };
-
- typedef std::map<const SchemaClassKey*, SchemaObjectClass*, KeyCompare> ObjectClassList;
- typedef std::map<const SchemaClassKey*, SchemaEventClass*, KeyCompare> EventClassList;
- typedef std::map<std::string, std::pair<ObjectClassList, EventClassList> > PackageList;
-
- PackageList packages;
-
- void learnPackage(const std::string& packageName);
- void learnClass(SchemaObjectClass* cls);
- void learnClass(SchemaEventClass* cls);
- bool haveClass(const SchemaClassKey* key) const;
- SchemaObjectClass* getSchema(const SchemaClassKey* key) const;
-
- void eventAgentAdded(boost::shared_ptr<AgentProxy> agent);
- void eventAgentDeleted(boost::shared_ptr<AgentProxy> agent);
- void eventNewPackage(const std::string& packageName);
- void eventNewClass(const SchemaClassKey* key);
- void eventObjectUpdate(ObjectPtr object, bool prop, bool stat);
- void eventAgentHeartbeat(boost::shared_ptr<AgentProxy> agent, uint64_t timestamp);
- void eventEventReceived(boost::shared_ptr<Event> event);
- };
-}
-}
-
-#endif
-
diff --git a/qpid/cpp/src/qmf/engine/EventImpl.cpp b/qpid/cpp/src/qmf/engine/EventImpl.cpp
deleted file mode 100644
index 4b034e8e83..0000000000
--- a/qpid/cpp/src/qmf/engine/EventImpl.cpp
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include <qmf/engine/EventImpl.h>
-#include <qmf/engine/ValueImpl.h>
-
-#include <sstream>
-
-using namespace std;
-using namespace qmf::engine;
-using qpid::framing::Buffer;
-
-EventImpl::EventImpl(const SchemaEventClass* type) : eventClass(type), timestamp(0), severity(0)
-{
- int argCount = eventClass->getArgumentCount();
- int idx;
-
- for (idx = 0; idx < argCount; idx++) {
- const SchemaArgument* arg = eventClass->getArgument(idx);
- arguments[arg->getName()] = ValuePtr(new Value(arg->getType()));
- }
-}
-
-
-EventImpl::EventImpl(const SchemaEventClass* type, Buffer& buffer) :
- eventClass(type), timestamp(0), severity(0)
-{
- int argCount = eventClass->getArgumentCount();
- int idx;
-
- timestamp = buffer.getLongLong();
- severity = buffer.getOctet();
-
- for (idx = 0; idx < argCount; idx++)
- {
- const SchemaArgument *arg = eventClass->getArgument(idx);
- Value* pval = ValueImpl::factory(arg->getType(), buffer);
- arguments[arg->getName()] = ValuePtr(pval);
- }
-}
-
-
-Event* EventImpl::factory(const SchemaEventClass* type, Buffer& buffer)
-{
- EventImpl* impl(new EventImpl(type, buffer));
- return new Event(impl);
-}
-
-
-Value* EventImpl::getValue(const char* key) const
-{
- map<string, ValuePtr>::const_iterator iter;
-
- iter = arguments.find(key);
- if (iter != arguments.end())
- return iter->second.get();
-
- return 0;
-}
-
-
-void EventImpl::encodeSchemaKey(Buffer& buffer) const
-{
- buffer.putShortString(eventClass->getClassKey()->getPackageName());
- buffer.putShortString(eventClass->getClassKey()->getClassName());
- buffer.putBin128(const_cast<uint8_t*>(eventClass->getClassKey()->getHash()));
-}
-
-
-void EventImpl::encode(Buffer& buffer) const
-{
- buffer.putOctet((uint8_t) eventClass->getSeverity());
-
- int argCount = eventClass->getArgumentCount();
- for (int idx = 0; idx < argCount; idx++) {
- const SchemaArgument* arg = eventClass->getArgument(idx);
- ValuePtr value = arguments[arg->getName()];
- value->impl->encode(buffer);
- }
-}
-
-
-string EventImpl::getRoutingKey(uint32_t brokerBank, uint32_t agentBank) const
-{
- stringstream key;
-
- key << "console.event." << brokerBank << "." << agentBank << "." <<
- eventClass->getClassKey()->getPackageName() << "." <<
- eventClass->getClassKey()->getClassName();
- return key.str();
-}
-
-
-//==================================================================
-// Wrappers
-//==================================================================
-
-Event::Event(const SchemaEventClass* type) : impl(new EventImpl(type)) {}
-Event::Event(EventImpl* i) : impl(i) {}
-Event::Event(const Event& from) : impl(new EventImpl(*(from.impl))) {}
-Event::~Event() { delete impl; }
-const SchemaEventClass* Event::getClass() const { return impl->getClass(); }
-Value* Event::getValue(const char* key) const { return impl->getValue(key); }
-
diff --git a/qpid/cpp/src/qmf/engine/EventImpl.h b/qpid/cpp/src/qmf/engine/EventImpl.h
deleted file mode 100644
index 4046e71ef9..0000000000
--- a/qpid/cpp/src/qmf/engine/EventImpl.h
+++ /dev/null
@@ -1,57 +0,0 @@
-#ifndef _QmfEngineEventImpl_
-#define _QmfEngineEventImpl_
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include <qmf/engine/Event.h>
-#include <qmf/engine/Schema.h>
-#include <qpid/framing/Buffer.h>
-#include <boost/shared_ptr.hpp>
-#include <map>
-
-namespace qmf {
-namespace engine {
-
- typedef boost::shared_ptr<Event> EventPtr;
-
- struct EventImpl {
- typedef boost::shared_ptr<Value> ValuePtr;
- const SchemaEventClass* eventClass;
- uint64_t timestamp;
- uint8_t severity;
- mutable std::map<std::string, ValuePtr> arguments;
-
- EventImpl(const SchemaEventClass* type);
- EventImpl(const SchemaEventClass* type, qpid::framing::Buffer& buffer);
- static Event* factory(const SchemaEventClass* type, qpid::framing::Buffer& buffer);
-
- const SchemaEventClass* getClass() const { return eventClass; }
- Value* getValue(const char* key) const;
-
- void encodeSchemaKey(qpid::framing::Buffer& buffer) const;
- void encode(qpid::framing::Buffer& buffer) const;
- std::string getRoutingKey(uint32_t brokerBank, uint32_t agentBank) const;
- };
-
-}
-}
-
-#endif
-
diff --git a/qpid/cpp/src/qmf/engine/MessageImpl.cpp b/qpid/cpp/src/qmf/engine/MessageImpl.cpp
deleted file mode 100644
index 0047d3eb9d..0000000000
--- a/qpid/cpp/src/qmf/engine/MessageImpl.cpp
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include "qmf/engine/MessageImpl.h"
-#include <string.h>
-
-using namespace std;
-using namespace qmf::engine;
-
-#define STRING_REF(s) {if (!s.empty()) item.s = const_cast<char*>(s.c_str());}
-
-Message MessageImpl::copy()
-{
- Message item;
-
- ::memset(&item, 0, sizeof(Message));
- item.body = const_cast<char*>(body.c_str());
- item.length = body.length();
- STRING_REF(destination);
- STRING_REF(routingKey);
- STRING_REF(replyExchange);
- STRING_REF(replyKey);
- STRING_REF(userId);
-
- return item;
-}
-
diff --git a/qpid/cpp/src/qmf/engine/MessageImpl.h b/qpid/cpp/src/qmf/engine/MessageImpl.h
deleted file mode 100644
index b91291d2e4..0000000000
--- a/qpid/cpp/src/qmf/engine/MessageImpl.h
+++ /dev/null
@@ -1,44 +0,0 @@
-#ifndef _QmfEngineMessageImpl_
-#define _QmfEngineMessageImpl_
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include "qmf/engine/Message.h"
-#include <string>
-#include <boost/shared_ptr.hpp>
-
-namespace qmf {
-namespace engine {
-
- struct MessageImpl {
- typedef boost::shared_ptr<MessageImpl> Ptr;
- std::string body;
- std::string destination;
- std::string routingKey;
- std::string replyExchange;
- std::string replyKey;
- std::string userId;
-
- Message copy();
- };
-}
-}
-
-#endif
diff --git a/qpid/cpp/src/qmf/engine/ObjectIdImpl.cpp b/qpid/cpp/src/qmf/engine/ObjectIdImpl.cpp
deleted file mode 100644
index 9216f7bac0..0000000000
--- a/qpid/cpp/src/qmf/engine/ObjectIdImpl.cpp
+++ /dev/null
@@ -1,210 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include "qmf/engine/ObjectIdImpl.h"
-#include <stdlib.h>
-#include <sstream>
-
-using namespace std;
-using namespace qmf::engine;
-using qpid::framing::Buffer;
-
-void AgentAttachment::setBanks(uint32_t broker, uint32_t agent)
-{
- first =
- ((uint64_t) (broker & 0x000fffff)) << 28 |
- ((uint64_t) (agent & 0x0fffffff));
-}
-
-ObjectIdImpl::ObjectIdImpl(Buffer& buffer) : agent(0)
-{
- decode(buffer);
-}
-
-ObjectIdImpl::ObjectIdImpl(AgentAttachment* a, uint8_t flags, uint16_t seq, uint64_t object) : agent(a)
-{
- first =
- ((uint64_t) (flags & 0x0f)) << 60 |
- ((uint64_t) (seq & 0x0fff)) << 48;
- second = object;
-}
-
-ObjectId* ObjectIdImpl::factory(Buffer& buffer)
-{
- ObjectIdImpl* impl(new ObjectIdImpl(buffer));
- return new ObjectId(impl);
-}
-
-ObjectId* ObjectIdImpl::factory(AgentAttachment* agent, uint8_t flags, uint16_t seq, uint64_t object)
-{
- ObjectIdImpl* impl(new ObjectIdImpl(agent, flags, seq, object));
- return new ObjectId(impl);
-}
-
-void ObjectIdImpl::decode(Buffer& buffer)
-{
- first = buffer.getLongLong();
- second = buffer.getLongLong();
-}
-
-void ObjectIdImpl::encode(Buffer& buffer) const
-{
- if (agent == 0)
- buffer.putLongLong(first);
- else
- buffer.putLongLong(first | agent->first);
- buffer.putLongLong(second);
-}
-
-void ObjectIdImpl::fromString(const std::string& repr)
-{
-#define FIELDS 5
-#if defined (_WIN32) && !defined (atoll)
-# define atoll(X) _atoi64(X)
-#endif
-
- std::string copy(repr.c_str());
- char* cText;
- char* field[FIELDS];
- bool atFieldStart = true;
- int idx = 0;
-
- cText = const_cast<char*>(copy.c_str());
- for (char* cursor = cText; *cursor; cursor++) {
- if (atFieldStart) {
- if (idx >= FIELDS)
- return; // TODO error
- field[idx++] = cursor;
- atFieldStart = false;
- } else {
- if (*cursor == '-') {
- *cursor = '\0';
- atFieldStart = true;
- }
- }
- }
-
- if (idx != FIELDS)
- return; // TODO error
-
- first = (atoll(field[0]) << 60) +
- (atoll(field[1]) << 48) +
- (atoll(field[2]) << 28) +
- atoll(field[3]);
- second = atoll(field[4]);
- agent = 0;
-}
-
-const string& ObjectIdImpl::asString() const
-{
- stringstream val;
-
- val << (int) getFlags() << "-" << getSequence() << "-" << getBrokerBank() << "-" <<
- getAgentBank() << "-" << getObjectNum();
- repr = val.str();
- return repr;
-}
-
-#define ACTUAL_FIRST (agent == 0 ? first : first | agent->first)
-#define ACTUAL_OTHER (other.agent == 0 ? other.first : other.first | other.agent->first)
-
-uint8_t ObjectIdImpl::getFlags() const
-{
- return (ACTUAL_FIRST & 0xF000000000000000LL) >> 60;
-}
-
-uint16_t ObjectIdImpl::getSequence() const
-{
- return (ACTUAL_FIRST & 0x0FFF000000000000LL) >> 48;
-}
-
-uint32_t ObjectIdImpl::getBrokerBank() const
-{
- return (ACTUAL_FIRST & 0x0000FFFFF0000000LL) >> 28;
-}
-
-uint32_t ObjectIdImpl::getAgentBank() const
-{
- return ACTUAL_FIRST & 0x000000000FFFFFFFLL;
-}
-
-uint64_t ObjectIdImpl::getObjectNum() const
-{
- return second;
-}
-
-uint32_t ObjectIdImpl::getObjectNumHi() const
-{
- return (uint32_t) (second >> 32);
-}
-
-uint32_t ObjectIdImpl::getObjectNumLo() const
-{
- return (uint32_t) (second & 0x00000000FFFFFFFFLL);
-}
-
-bool ObjectIdImpl::operator==(const ObjectIdImpl& other) const
-{
- return ACTUAL_FIRST == ACTUAL_OTHER && second == other.second;
-}
-
-bool ObjectIdImpl::operator<(const ObjectIdImpl& other) const
-{
- return (ACTUAL_FIRST < ACTUAL_OTHER) || ((ACTUAL_FIRST == ACTUAL_OTHER) && (second < other.second));
-}
-
-bool ObjectIdImpl::operator>(const ObjectIdImpl& other) const
-{
- return (ACTUAL_FIRST > ACTUAL_OTHER) || ((ACTUAL_FIRST == ACTUAL_OTHER) && (second > other.second));
-}
-
-
-//==================================================================
-// Wrappers
-//==================================================================
-
-ObjectId::ObjectId() : impl(new ObjectIdImpl()) {}
-ObjectId::ObjectId(const ObjectId& from) : impl(new ObjectIdImpl(*(from.impl))) {}
-ObjectId::ObjectId(ObjectIdImpl* i) : impl(i) {}
-ObjectId::~ObjectId() { delete impl; }
-uint64_t ObjectId::getObjectNum() const { return impl->getObjectNum(); }
-uint32_t ObjectId::getObjectNumHi() const { return impl->getObjectNumHi(); }
-uint32_t ObjectId::getObjectNumLo() const { return impl->getObjectNumLo(); }
-bool ObjectId::isDurable() const { return impl->isDurable(); }
-const char* ObjectId::str() const { return impl->asString().c_str(); }
-uint8_t ObjectId::getFlags() const { return impl->getFlags(); }
-uint16_t ObjectId::getSequence() const { return impl->getSequence(); }
-uint32_t ObjectId::getBrokerBank() const { return impl->getBrokerBank(); }
-uint32_t ObjectId::getAgentBank() const { return impl->getAgentBank(); }
-bool ObjectId::operator==(const ObjectId& other) const { return *impl == *other.impl; }
-bool ObjectId::operator<(const ObjectId& other) const { return *impl < *other.impl; }
-bool ObjectId::operator>(const ObjectId& other) const { return *impl > *other.impl; }
-bool ObjectId::operator<=(const ObjectId& other) const { return !(*impl > *other.impl); }
-bool ObjectId::operator>=(const ObjectId& other) const { return !(*impl < *other.impl); }
-ObjectId& ObjectId::operator=(const ObjectId& other) {
- ObjectIdImpl *old;
- if (this != &other) {
- old = impl;
- impl = new ObjectIdImpl(*(other.impl));
- if (old)
- delete old;
- }
- return *this;
-}
-
diff --git a/qpid/cpp/src/qmf/engine/ObjectIdImpl.h b/qpid/cpp/src/qmf/engine/ObjectIdImpl.h
deleted file mode 100644
index d70c8efff4..0000000000
--- a/qpid/cpp/src/qmf/engine/ObjectIdImpl.h
+++ /dev/null
@@ -1,72 +0,0 @@
-#ifndef _QmfEngineObjectIdImpl_
-#define _QmfEngineObjectIdImpl_
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include <qmf/engine/ObjectId.h>
-#include <qpid/framing/Buffer.h>
-
-namespace qmf {
-namespace engine {
-
- struct AgentAttachment {
- uint64_t first;
-
- AgentAttachment() : first(0) {}
- void setBanks(uint32_t broker, uint32_t bank);
- uint64_t getFirst() const { return first; }
- };
-
- struct ObjectIdImpl {
- AgentAttachment* agent;
- uint64_t first;
- uint64_t second;
- mutable std::string repr;
-
- ObjectIdImpl() : agent(0), first(0), second(0) {}
- ObjectIdImpl(qpid::framing::Buffer& buffer);
- ObjectIdImpl(AgentAttachment* agent, uint8_t flags, uint16_t seq, uint64_t object);
-
- static ObjectId* factory(qpid::framing::Buffer& buffer);
- static ObjectId* factory(AgentAttachment* agent, uint8_t flags, uint16_t seq, uint64_t object);
-
- void decode(qpid::framing::Buffer& buffer);
- void encode(qpid::framing::Buffer& buffer) const;
- void fromString(const std::string& repr);
- const std::string& asString() const;
- uint8_t getFlags() const;
- uint16_t getSequence() const;
- uint32_t getBrokerBank() const;
- uint32_t getAgentBank() const;
- uint64_t getObjectNum() const;
- uint32_t getObjectNumHi() const;
- uint32_t getObjectNumLo() const;
- bool isDurable() const { return getSequence() == 0; }
- void setValue(uint64_t f, uint64_t s) { first = f; second = s; agent = 0; }
-
- bool operator==(const ObjectIdImpl& other) const;
- bool operator<(const ObjectIdImpl& other) const;
- bool operator>(const ObjectIdImpl& other) const;
- };
-}
-}
-
-#endif
-
diff --git a/qpid/cpp/src/qmf/engine/ObjectImpl.cpp b/qpid/cpp/src/qmf/engine/ObjectImpl.cpp
deleted file mode 100644
index 45925cb804..0000000000
--- a/qpid/cpp/src/qmf/engine/ObjectImpl.cpp
+++ /dev/null
@@ -1,232 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include "qmf/engine/ObjectImpl.h"
-#include "qmf/engine/ValueImpl.h"
-#include "qmf/engine/BrokerProxyImpl.h"
-#include <qpid/sys/Time.h>
-
-using namespace std;
-using namespace qmf::engine;
-using namespace qpid::sys;
-using qpid::framing::Buffer;
-
-ObjectImpl::ObjectImpl(const SchemaObjectClass* type) : objectClass(type), broker(0), createTime(uint64_t(Duration(EPOCH, now()))), destroyTime(0), lastUpdatedTime(createTime)
-{
- int propCount = objectClass->getPropertyCount();
- int statCount = objectClass->getStatisticCount();
- int idx;
-
- for (idx = 0; idx < propCount; idx++) {
- const SchemaProperty* prop = objectClass->getProperty(idx);
- properties[prop->getName()] = ValuePtr(new Value(prop->getType()));
- }
-
- for (idx = 0; idx < statCount; idx++) {
- const SchemaStatistic* stat = objectClass->getStatistic(idx);
- statistics[stat->getName()] = ValuePtr(new Value(stat->getType()));
- }
-}
-
-ObjectImpl::ObjectImpl(const SchemaObjectClass* type, BrokerProxyImpl* b, Buffer& buffer, bool prop, bool stat, bool managed) :
- objectClass(type), broker(b), createTime(0), destroyTime(0), lastUpdatedTime(0)
-{
- int idx;
-
- if (managed) {
- lastUpdatedTime = buffer.getLongLong();
- createTime = buffer.getLongLong();
- destroyTime = buffer.getLongLong();
- objectId.reset(ObjectIdImpl::factory(buffer));
- }
-
- if (prop) {
- int propCount = objectClass->getPropertyCount();
- set<string> excludes;
- parsePresenceMasks(buffer, excludes);
- for (idx = 0; idx < propCount; idx++) {
- const SchemaProperty* prop = objectClass->getProperty(idx);
- if (excludes.count(prop->getName()) != 0) {
- properties[prop->getName()] = ValuePtr(new Value(prop->getType()));
- } else {
- Value* pval = ValueImpl::factory(prop->getType(), buffer);
- properties[prop->getName()] = ValuePtr(pval);
- }
- }
- }
-
- if (stat) {
- int statCount = objectClass->getStatisticCount();
- for (idx = 0; idx < statCount; idx++) {
- const SchemaStatistic* stat = objectClass->getStatistic(idx);
- Value* sval = ValueImpl::factory(stat->getType(), buffer);
- statistics[stat->getName()] = ValuePtr(sval);
- }
- }
-}
-
-Object* ObjectImpl::factory(const SchemaObjectClass* type, BrokerProxyImpl* b, Buffer& buffer, bool prop, bool stat, bool managed)
-{
- ObjectImpl* impl(new ObjectImpl(type, b, buffer, prop, stat, managed));
- return new Object(impl);
-}
-
-ObjectImpl::~ObjectImpl()
-{
-}
-
-void ObjectImpl::destroy()
-{
- destroyTime = uint64_t(Duration(EPOCH, now()));
- // TODO - flag deletion
-}
-
-Value* ObjectImpl::getValue(const string& key) const
-{
- map<string, ValuePtr>::const_iterator iter;
-
- iter = properties.find(key);
- if (iter != properties.end())
- return iter->second.get();
-
- iter = statistics.find(key);
- if (iter != statistics.end())
- return iter->second.get();
-
- return 0;
-}
-
-void ObjectImpl::invokeMethod(const string& methodName, const Value* inArgs, void* context) const
-{
- if (broker != 0 && objectId.get() != 0)
- broker->sendMethodRequest(objectId.get(), objectClass, methodName, inArgs, context);
-}
-
-void ObjectImpl::merge(const Object& from)
-{
- for (map<string, ValuePtr>::const_iterator piter = from.impl->properties.begin();
- piter != from.impl->properties.end(); piter++)
- properties[piter->first] = piter->second;
- for (map<string, ValuePtr>::const_iterator siter = from.impl->statistics.begin();
- siter != from.impl->statistics.end(); siter++)
- statistics[siter->first] = siter->second;
-}
-
-void ObjectImpl::parsePresenceMasks(Buffer& buffer, set<string>& excludeList)
-{
- int propCount = objectClass->getPropertyCount();
- excludeList.clear();
- uint8_t bit = 0;
- uint8_t mask = 0;
-
- for (int idx = 0; idx < propCount; idx++) {
- const SchemaProperty* prop = objectClass->getProperty(idx);
- if (prop->isOptional()) {
- if (bit == 0) {
- mask = buffer.getOctet();
- bit = 1;
- }
- if ((mask & bit) == 0)
- excludeList.insert(string(prop->getName()));
- if (bit == 0x80)
- bit = 0;
- else
- bit = bit << 1;
- }
- }
-}
-
-void ObjectImpl::encodeSchemaKey(qpid::framing::Buffer& buffer) const
-{
- buffer.putShortString(objectClass->getClassKey()->getPackageName());
- buffer.putShortString(objectClass->getClassKey()->getClassName());
- buffer.putBin128(const_cast<uint8_t*>(objectClass->getClassKey()->getHash()));
-}
-
-void ObjectImpl::encodeManagedObjectData(qpid::framing::Buffer& buffer) const
-{
- buffer.putLongLong(lastUpdatedTime);
- buffer.putLongLong(createTime);
- buffer.putLongLong(destroyTime);
- objectId->impl->encode(buffer);
-}
-
-void ObjectImpl::encodeProperties(qpid::framing::Buffer& buffer) const
-{
- int propCount = objectClass->getPropertyCount();
- uint8_t bit = 0;
- uint8_t mask = 0;
- ValuePtr value;
-
- for (int idx = 0; idx < propCount; idx++) {
- const SchemaProperty* prop = objectClass->getProperty(idx);
- if (prop->isOptional()) {
- value = properties[prop->getName()];
- if (bit == 0)
- bit = 1;
- if (!value->isNull())
- mask |= bit;
- if (bit == 0x80) {
- buffer.putOctet(mask);
- bit = 0;
- mask = 0;
- } else
- bit = bit << 1;
- }
- }
- if (bit != 0) {
- buffer.putOctet(mask);
- }
-
- for (int idx = 0; idx < propCount; idx++) {
- const SchemaProperty* prop = objectClass->getProperty(idx);
- value = properties[prop->getName()];
- if (!prop->isOptional() || !value->isNull()) {
- value->impl->encode(buffer);
- }
- }
-}
-
-void ObjectImpl::encodeStatistics(qpid::framing::Buffer& buffer) const
-{
- int statCount = objectClass->getStatisticCount();
- for (int idx = 0; idx < statCount; idx++) {
- const SchemaStatistic* stat = objectClass->getStatistic(idx);
- ValuePtr value = statistics[stat->getName()];
- value->impl->encode(buffer);
- }
-}
-
-//==================================================================
-// Wrappers
-//==================================================================
-
-Object::Object(const SchemaObjectClass* type) : impl(new ObjectImpl(type)) {}
-Object::Object(ObjectImpl* i) : impl(i) {}
-Object::Object(const Object& from) : impl(new ObjectImpl(*(from.impl))) {}
-Object::~Object() { delete impl; }
-void Object::destroy() { impl->destroy(); }
-const ObjectId* Object::getObjectId() const { return impl->getObjectId(); }
-void Object::setObjectId(ObjectId* oid) { impl->setObjectId(oid); }
-const SchemaObjectClass* Object::getClass() const { return impl->getClass(); }
-Value* Object::getValue(const char* key) const { return impl->getValue(key); }
-void Object::invokeMethod(const char* m, const Value* a, void* c) const { impl->invokeMethod(m, a, c); }
-bool Object::isDeleted() const { return impl->isDeleted(); }
-void Object::merge(const Object& from) { impl->merge(from); }
-
diff --git a/qpid/cpp/src/qmf/engine/ObjectImpl.h b/qpid/cpp/src/qmf/engine/ObjectImpl.h
deleted file mode 100644
index 6f25867004..0000000000
--- a/qpid/cpp/src/qmf/engine/ObjectImpl.h
+++ /dev/null
@@ -1,76 +0,0 @@
-#ifndef _QmfEngineObjectImpl_
-#define _QmfEngineObjectImpl_
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include <qmf/engine/Object.h>
-#include <qmf/engine/ObjectIdImpl.h>
-#include <map>
-#include <set>
-#include <string>
-#include <qpid/framing/Buffer.h>
-#include <boost/shared_ptr.hpp>
-#include <qpid/sys/Mutex.h>
-
-namespace qmf {
-namespace engine {
-
- class BrokerProxyImpl;
-
- typedef boost::shared_ptr<Object> ObjectPtr;
-
- struct ObjectImpl {
- typedef boost::shared_ptr<Value> ValuePtr;
- const SchemaObjectClass* objectClass;
- BrokerProxyImpl* broker;
- boost::shared_ptr<ObjectId> objectId;
- uint64_t createTime;
- uint64_t destroyTime;
- uint64_t lastUpdatedTime;
- mutable std::map<std::string, ValuePtr> properties;
- mutable std::map<std::string, ValuePtr> statistics;
-
- ObjectImpl(const SchemaObjectClass* type);
- ObjectImpl(const SchemaObjectClass* type, BrokerProxyImpl* b, qpid::framing::Buffer& buffer,
- bool prop, bool stat, bool managed);
- static Object* factory(const SchemaObjectClass* type, BrokerProxyImpl* b, qpid::framing::Buffer& buffer,
- bool prop, bool stat, bool managed);
- ~ObjectImpl();
-
- void destroy();
- const ObjectId* getObjectId() const { return objectId.get(); }
- void setObjectId(ObjectId* oid) { objectId.reset(new ObjectId(*oid)); }
- const SchemaObjectClass* getClass() const { return objectClass; }
- Value* getValue(const std::string& key) const;
- void invokeMethod(const std::string& methodName, const Value* inArgs, void* context) const;
- bool isDeleted() const { return destroyTime != 0; }
- void merge(const Object& from);
-
- void parsePresenceMasks(qpid::framing::Buffer& buffer, std::set<std::string>& excludeList);
- void encodeSchemaKey(qpid::framing::Buffer& buffer) const;
- void encodeManagedObjectData(qpid::framing::Buffer& buffer) const;
- void encodeProperties(qpid::framing::Buffer& buffer) const;
- void encodeStatistics(qpid::framing::Buffer& buffer) const;
- };
-}
-}
-
-#endif
-
diff --git a/qpid/cpp/src/qmf/engine/Protocol.cpp b/qpid/cpp/src/qmf/engine/Protocol.cpp
deleted file mode 100644
index 9e5f490604..0000000000
--- a/qpid/cpp/src/qmf/engine/Protocol.cpp
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include "qmf/engine/Protocol.h"
-#include "qpid/framing/Buffer.h"
-
-using namespace std;
-using namespace qmf::engine;
-using namespace qpid::framing;
-
-
-bool Protocol::checkHeader(Buffer& buf, uint8_t *opcode, uint32_t *seq)
-{
- if (buf.available() < 8)
- return false;
-
- uint8_t h1 = buf.getOctet();
- uint8_t h2 = buf.getOctet();
- uint8_t h3 = buf.getOctet();
-
- *opcode = buf.getOctet();
- *seq = buf.getLong();
-
- return h1 == 'A' && h2 == 'M' && h3 == '2';
-}
-
-void Protocol::encodeHeader(qpid::framing::Buffer& buf, uint8_t opcode, uint32_t seq)
-{
- buf.putOctet('A');
- buf.putOctet('M');
- buf.putOctet('2');
- buf.putOctet(opcode);
- buf.putLong (seq);
-}
-
-
diff --git a/qpid/cpp/src/qmf/engine/Protocol.h b/qpid/cpp/src/qmf/engine/Protocol.h
deleted file mode 100644
index 1cdfa60c84..0000000000
--- a/qpid/cpp/src/qmf/engine/Protocol.h
+++ /dev/null
@@ -1,69 +0,0 @@
-#ifndef _QmfEngineProtocol_
-#define _QmfEngineProtocol_
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include <qpid/sys/IntegerTypes.h>
-
-namespace qpid {
- namespace framing {
- class Buffer;
- }
-}
-
-namespace qmf {
-namespace engine {
-
- class Protocol {
- public:
- static bool checkHeader(qpid::framing::Buffer& buf, uint8_t *opcode, uint32_t *seq);
- static void encodeHeader(qpid::framing::Buffer& buf, uint8_t opcode, uint32_t seq = 0);
-
- const static uint8_t OP_ATTACH_REQUEST = 'A';
- const static uint8_t OP_ATTACH_RESPONSE = 'a';
-
- const static uint8_t OP_BROKER_REQUEST = 'B';
- const static uint8_t OP_BROKER_RESPONSE = 'b';
-
- const static uint8_t OP_CONSOLE_ADDED_INDICATION = 'x';
- const static uint8_t OP_COMMAND_COMPLETE = 'z';
- const static uint8_t OP_HEARTBEAT_INDICATION = 'h';
-
- const static uint8_t OP_PACKAGE_REQUEST = 'P';
- const static uint8_t OP_PACKAGE_INDICATION = 'p';
- const static uint8_t OP_CLASS_QUERY = 'Q';
- const static uint8_t OP_CLASS_INDICATION = 'q';
- const static uint8_t OP_SCHEMA_REQUEST = 'S';
- const static uint8_t OP_SCHEMA_RESPONSE = 's';
-
- const static uint8_t OP_METHOD_REQUEST = 'M';
- const static uint8_t OP_METHOD_RESPONSE = 'm';
- const static uint8_t OP_GET_QUERY = 'G';
- const static uint8_t OP_OBJECT_INDICATION = 'g';
- const static uint8_t OP_PROPERTY_INDICATION = 'c';
- const static uint8_t OP_STATISTIC_INDICATION = 'i';
- const static uint8_t OP_EVENT_INDICATION = 'e';
- };
-
-}
-}
-
-#endif
-
diff --git a/qpid/cpp/src/qmf/engine/QueryImpl.cpp b/qpid/cpp/src/qmf/engine/QueryImpl.cpp
deleted file mode 100644
index 6f2beeee87..0000000000
--- a/qpid/cpp/src/qmf/engine/QueryImpl.cpp
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include "qmf/engine/QueryImpl.h"
-#include "qmf/engine/ObjectIdImpl.h"
-#include "qpid/framing/Buffer.h"
-#include "qpid/framing/FieldTable.h"
-
-using namespace std;
-using namespace qmf::engine;
-using namespace qpid::framing;
-
-bool QueryElementImpl::evaluate(const Object* /*object*/) const
-{
- // TODO: Implement this
- return false;
-}
-
-bool QueryExpressionImpl::evaluate(const Object* /*object*/) const
-{
- // TODO: Implement this
- return false;
-}
-
-QueryImpl::QueryImpl(Buffer& buffer)
-{
- FieldTable ft;
- ft.decode(buffer);
- // TODO
-}
-
-Query* QueryImpl::factory(Buffer& buffer)
-{
- QueryImpl* impl(new QueryImpl(buffer));
- return new Query(impl);
-}
-
-void QueryImpl::encode(Buffer& buffer) const
-{
- FieldTable ft;
-
- if (oid.get() != 0) {
- ft.setString("_objectid", oid->impl->asString());
- } else {
- if (!packageName.empty())
- ft.setString("_package", packageName);
- ft.setString("_class", className);
- }
-
- ft.encode(buffer);
-}
-
-
-//==================================================================
-// Wrappers
-//==================================================================
-
-QueryElement::QueryElement(const char* attrName, const Value* value, ValueOper oper) : impl(new QueryElementImpl(attrName, value, oper)) {}
-QueryElement::QueryElement(QueryElementImpl* i) : impl(i) {}
-QueryElement::~QueryElement() { delete impl; }
-bool QueryElement::evaluate(const Object* object) const { return impl->evaluate(object); }
-
-QueryExpression::QueryExpression(ExprOper oper, const QueryOperand* operand1, const QueryOperand* operand2) : impl(new QueryExpressionImpl(oper, operand1, operand2)) {}
-QueryExpression::QueryExpression(QueryExpressionImpl* i) : impl(i) {}
-QueryExpression::~QueryExpression() { delete impl; }
-bool QueryExpression::evaluate(const Object* object) const { return impl->evaluate(object); }
-
-Query::Query(const char* className, const char* packageName) : impl(new QueryImpl(className, packageName)) {}
-Query::Query(const SchemaClassKey* key) : impl(new QueryImpl(key)) {}
-Query::Query(const ObjectId* oid) : impl(new QueryImpl(oid)) {}
-Query::Query(QueryImpl* i) : impl(i) {}
-Query::Query(const Query& from) : impl(new QueryImpl(*(from.impl))) {}
-Query::~Query() { delete impl; }
-void Query::setSelect(const QueryOperand* criterion) { impl->setSelect(criterion); }
-void Query::setLimit(uint32_t maxResults) { impl->setLimit(maxResults); }
-void Query::setOrderBy(const char* attrName, bool decreasing) { impl->setOrderBy(attrName, decreasing); }
-const char* Query::getPackage() const { return impl->getPackage().c_str(); }
-const char* Query::getClass() const { return impl->getClass().c_str(); }
-const ObjectId* Query::getObjectId() const { return impl->getObjectId(); }
-bool Query::haveSelect() const { return impl->haveSelect(); }
-bool Query::haveLimit() const { return impl->haveLimit(); }
-bool Query::haveOrderBy() const { return impl->haveOrderBy(); }
-const QueryOperand* Query::getSelect() const { return impl->getSelect(); }
-uint32_t Query::getLimit() const { return impl->getLimit(); }
-const char* Query::getOrderBy() const { return impl->getOrderBy().c_str(); }
-bool Query::getDecreasing() const { return impl->getDecreasing(); }
-
diff --git a/qpid/cpp/src/qmf/engine/QueryImpl.h b/qpid/cpp/src/qmf/engine/QueryImpl.h
deleted file mode 100644
index 8ebe0d932f..0000000000
--- a/qpid/cpp/src/qmf/engine/QueryImpl.h
+++ /dev/null
@@ -1,102 +0,0 @@
-#ifndef _QmfEngineQueryImpl_
-#define _QmfEngineQueryImpl_
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include "qmf/engine/Query.h"
-#include "qmf/engine/Schema.h"
-#include <string>
-#include <boost/shared_ptr.hpp>
-
-namespace qpid {
- namespace framing {
- class Buffer;
- }
-}
-
-namespace qmf {
-namespace engine {
-
- struct QueryElementImpl {
- QueryElementImpl(const std::string& a, const Value* v, ValueOper o) : attrName(a), value(v), oper(o) {}
- ~QueryElementImpl() {}
- bool evaluate(const Object* object) const;
-
- std::string attrName;
- const Value* value;
- ValueOper oper;
- };
-
- struct QueryExpressionImpl {
- QueryExpressionImpl(ExprOper o, const QueryOperand* operand1, const QueryOperand* operand2) : oper(o), left(operand1), right(operand2) {}
- ~QueryExpressionImpl() {}
- bool evaluate(const Object* object) const;
-
- ExprOper oper;
- const QueryOperand* left;
- const QueryOperand* right;
- };
-
- struct QueryImpl {
- // Constructors mapped to public
- QueryImpl(const std::string& c, const std::string& p) : packageName(p), className(c), select(0), resultLimit(0) {}
- QueryImpl(const SchemaClassKey* key) : packageName(key->getPackageName()), className(key->getClassName()), select(0), resultLimit(0) {}
- QueryImpl(const ObjectId* oid) : oid(new ObjectId(*oid)), select(0), resultLimit(0) {}
-
- // Factory constructors
- QueryImpl(qpid::framing::Buffer& buffer);
-
- ~QueryImpl() {};
- static Query* factory(qpid::framing::Buffer& buffer);
-
- void setSelect(const QueryOperand* criterion) { select = criterion; }
- void setLimit(uint32_t maxResults) { resultLimit = maxResults; }
- void setOrderBy(const std::string& attrName, bool decreasing) {
- orderBy = attrName; orderDecreasing = decreasing;
- }
-
- const std::string& getPackage() const { return packageName; }
- const std::string& getClass() const { return className; }
- const ObjectId* getObjectId() const { return oid.get(); }
-
- bool haveSelect() const { return select != 0; }
- bool haveLimit() const { return resultLimit > 0; }
- bool haveOrderBy() const { return !orderBy.empty(); }
- const QueryOperand* getSelect() const { return select; }
- uint32_t getLimit() const { return resultLimit; }
- const std::string& getOrderBy() const { return orderBy; }
- bool getDecreasing() const { return orderDecreasing; }
-
- void encode(qpid::framing::Buffer& buffer) const;
- bool singleAgent() const { return oid.get() != 0; }
- uint32_t agentBank() const { return singleAgent() ? oid->getAgentBank() : 0; }
-
- std::string packageName;
- std::string className;
- boost::shared_ptr<ObjectId> oid;
- const QueryOperand* select;
- uint32_t resultLimit;
- std::string orderBy;
- bool orderDecreasing;
- };
-}
-}
-
-#endif
diff --git a/qpid/cpp/src/qmf/engine/ResilientConnection.cpp b/qpid/cpp/src/qmf/engine/ResilientConnection.cpp
deleted file mode 100644
index 851193ccc1..0000000000
--- a/qpid/cpp/src/qmf/engine/ResilientConnection.cpp
+++ /dev/null
@@ -1,512 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include "qmf/engine/ResilientConnection.h"
-#include "qmf/engine/MessageImpl.h"
-#include "qmf/engine/ConnectionSettingsImpl.h"
-#include <qpid/client/Connection.h>
-#include <qpid/client/Session.h>
-#include <qpid/client/MessageListener.h>
-#include <qpid/client/SubscriptionManager.h>
-#include <qpid/client/Message.h>
-#include <qpid/sys/Thread.h>
-#include <qpid/sys/Runnable.h>
-#include <qpid/sys/Mutex.h>
-#include <qpid/sys/Condition.h>
-#include <qpid/sys/Time.h>
-#include <qpid/log/Statement.h>
-#include <qpid/RefCounted.h>
-#include <boost/bind.hpp>
-#include <string>
-#include <deque>
-#include <vector>
-#include <set>
-#include <boost/intrusive_ptr.hpp>
-#include <boost/noncopyable.hpp>
-#include <unistd.h>
-#include <fcntl.h>
-
-using namespace std;
-using namespace qmf::engine;
-using namespace qpid;
-using qpid::sys::Mutex;
-
-namespace qmf {
-namespace engine {
- struct ResilientConnectionEventImpl {
- ResilientConnectionEvent::EventKind kind;
- void* sessionContext;
- string errorText;
- MessageImpl message;
-
- ResilientConnectionEventImpl(ResilientConnectionEvent::EventKind k,
- const MessageImpl& m = MessageImpl()) :
- kind(k), sessionContext(0), message(m) {}
- ResilientConnectionEvent copy();
- };
-
- struct RCSession : public client::MessageListener, public qpid::sys::Runnable, public qpid::RefCounted {
- typedef boost::intrusive_ptr<RCSession> Ptr;
- ResilientConnectionImpl& connImpl;
- string name;
- client::Connection& connection;
- client::Session session;
- client::SubscriptionManager* subscriptions;
- string userId;
- void* userContext;
- vector<string> dests;
- qpid::sys::Thread thread;
-
- RCSession(ResilientConnectionImpl& ci, const string& n, client::Connection& c, void* uc);
- ~RCSession();
- void received(client::Message& msg);
- void run();
- void stop();
- };
-
- class ResilientConnectionImpl : public qpid::sys::Runnable, public boost::noncopyable {
- public:
- ResilientConnectionImpl(const ConnectionSettings& settings);
- ~ResilientConnectionImpl();
-
- bool isConnected() const;
- bool getEvent(ResilientConnectionEvent& event);
- void popEvent();
- bool createSession(const char* name, void* sessionContext, SessionHandle& handle);
- void destroySession(SessionHandle handle);
- void sendMessage(SessionHandle handle, qmf::engine::Message& message);
- void declareQueue(SessionHandle handle, char* queue);
- void deleteQueue(SessionHandle handle, char* queue);
- void bind(SessionHandle handle, char* exchange, char* queue, char* key);
- void unbind(SessionHandle handle, char* exchange, char* queue, char* key);
- void setNotifyFd(int fd);
- void notify();
-
- void run();
- void failure();
- void sessionClosed(RCSession* sess);
-
- void EnqueueEvent(ResilientConnectionEvent::EventKind kind,
- void* sessionContext = 0,
- const MessageImpl& message = MessageImpl(),
- const string& errorText = "");
-
- private:
- int notifyFd;
- bool connected;
- bool shutdown;
- string lastError;
- const ConnectionSettings settings;
- client::Connection connection;
- mutable qpid::sys::Mutex lock;
- int delayMin;
- int delayMax;
- int delayFactor;
- qpid::sys::Condition cond;
- deque<ResilientConnectionEventImpl> eventQueue;
- set<RCSession::Ptr> sessions;
- qpid::sys::Thread connThread;
- };
-}
-}
-
-ResilientConnectionEvent ResilientConnectionEventImpl::copy()
-{
- ResilientConnectionEvent item;
-
- ::memset(&item, 0, sizeof(ResilientConnectionEvent));
- item.kind = kind;
- item.sessionContext = sessionContext;
- item.message = message.copy();
- item.errorText = const_cast<char*>(errorText.c_str());
-
- return item;
-}
-
-RCSession::RCSession(ResilientConnectionImpl& ci, const string& n, client::Connection& c, void* uc) :
- connImpl(ci), name(n), connection(c), session(connection.newSession(name)),
- subscriptions(new client::SubscriptionManager(session)), userContext(uc), thread(*this)
-{
- const qpid::client::ConnectionSettings& operSettings = connection.getNegotiatedSettings();
- userId = operSettings.username;
-}
-
-RCSession::~RCSession()
-{
- subscriptions->stop();
- thread.join();
- session.close();
- delete subscriptions;
-}
-
-void RCSession::run()
-{
- try {
- subscriptions->run();
- } catch (exception& /*e*/) {
- connImpl.sessionClosed(this);
- }
-}
-
-void RCSession::stop()
-{
- subscriptions->stop();
-}
-
-void RCSession::received(client::Message& msg)
-{
- MessageImpl qmsg;
- qmsg.body = msg.getData();
-
- qpid::framing::DeliveryProperties dp = msg.getDeliveryProperties();
- if (dp.hasRoutingKey()) {
- qmsg.routingKey = dp.getRoutingKey();
- }
-
- qpid::framing::MessageProperties mp = msg.getMessageProperties();
- if (mp.hasReplyTo()) {
- const qpid::framing::ReplyTo& rt = mp.getReplyTo();
- qmsg.replyExchange = rt.getExchange();
- qmsg.replyKey = rt.getRoutingKey();
- }
-
- if (mp.hasUserId()) {
- qmsg.userId = mp.getUserId();
- }
-
- connImpl.EnqueueEvent(ResilientConnectionEvent::RECV, userContext, qmsg);
-}
-
-ResilientConnectionImpl::ResilientConnectionImpl(const ConnectionSettings& _settings) :
- notifyFd(-1), connected(false), shutdown(false), settings(_settings), delayMin(1), connThread(*this)
-{
- connection.registerFailureCallback(boost::bind(&ResilientConnectionImpl::failure, this));
- settings.impl->getRetrySettings(&delayMin, &delayMax, &delayFactor);
-}
-
-ResilientConnectionImpl::~ResilientConnectionImpl()
-{
- shutdown = true;
- connected = false;
- cond.notify();
- connThread.join();
- connection.close();
-}
-
-bool ResilientConnectionImpl::isConnected() const
-{
- Mutex::ScopedLock _lock(lock);
- return connected;
-}
-
-bool ResilientConnectionImpl::getEvent(ResilientConnectionEvent& event)
-{
- Mutex::ScopedLock _lock(lock);
- if (eventQueue.empty())
- return false;
- event = eventQueue.front().copy();
- return true;
-}
-
-void ResilientConnectionImpl::popEvent()
-{
- Mutex::ScopedLock _lock(lock);
- if (!eventQueue.empty())
- eventQueue.pop_front();
-}
-
-bool ResilientConnectionImpl::createSession(const char* name, void* sessionContext,
- SessionHandle& handle)
-{
- Mutex::ScopedLock _lock(lock);
- if (!connected)
- return false;
-
- RCSession::Ptr sess = RCSession::Ptr(new RCSession(*this, name, connection, sessionContext));
-
- handle.impl = (void*) sess.get();
- sessions.insert(sess);
-
- return true;
-}
-
-void ResilientConnectionImpl::destroySession(SessionHandle handle)
-{
- Mutex::ScopedLock _lock(lock);
- RCSession::Ptr sess = RCSession::Ptr((RCSession*) handle.impl);
- set<RCSession::Ptr>::iterator iter = sessions.find(sess);
- if (iter != sessions.end()) {
- for (vector<string>::iterator dIter = sess->dests.begin(); dIter != sess->dests.end(); dIter++)
- sess->subscriptions->cancel(dIter->c_str());
- sess->subscriptions->stop();
- sess->subscriptions->wait();
-
- sessions.erase(iter);
- return;
- }
-}
-
-void ResilientConnectionImpl::sendMessage(SessionHandle handle, qmf::engine::Message& message)
-{
- Mutex::ScopedLock _lock(lock);
- RCSession::Ptr sess = RCSession::Ptr((RCSession*) handle.impl);
- set<RCSession::Ptr>::iterator iter = sessions.find(sess);
- qpid::client::Message msg;
- string data(message.body, message.length);
- msg.getDeliveryProperties().setRoutingKey(message.routingKey);
- msg.getMessageProperties().setReplyTo(qpid::framing::ReplyTo(message.replyExchange, message.replyKey));
- if (settings.impl->getSendUserId())
- msg.getMessageProperties().setUserId(sess->userId);
- msg.setData(data);
-
- try {
- sess->session.messageTransfer(client::arg::content=msg, client::arg::destination=message.destination);
- } catch(exception& e) {
- QPID_LOG(error, "Session Exception during message-transfer: " << e.what());
- sessions.erase(iter);
- EnqueueEvent(ResilientConnectionEvent::SESSION_CLOSED, (*iter)->userContext);
- }
-}
-
-void ResilientConnectionImpl::declareQueue(SessionHandle handle, char* queue)
-{
- Mutex::ScopedLock _lock(lock);
- RCSession* sess = (RCSession*) handle.impl;
-
- sess->session.queueDeclare(client::arg::queue=queue, client::arg::autoDelete=true, client::arg::exclusive=true);
- sess->subscriptions->setAcceptMode(client::ACCEPT_MODE_NONE);
- sess->subscriptions->setAcquireMode(client::ACQUIRE_MODE_PRE_ACQUIRED);
- sess->subscriptions->subscribe(*sess, queue, queue);
- sess->subscriptions->setFlowControl(queue, client::FlowControl::unlimited());
- sess->dests.push_back(string(queue));
-}
-
-void ResilientConnectionImpl::deleteQueue(SessionHandle handle, char* queue)
-{
- Mutex::ScopedLock _lock(lock);
- RCSession* sess = (RCSession*) handle.impl;
-
- sess->session.queueDelete(client::arg::queue=queue);
- for (vector<string>::iterator iter = sess->dests.begin();
- iter != sess->dests.end(); iter++)
- if (*iter == queue) {
- sess->subscriptions->cancel(queue);
- sess->dests.erase(iter);
- break;
- }
-}
-
-void ResilientConnectionImpl::bind(SessionHandle handle,
- char* exchange, char* queue, char* key)
-{
- Mutex::ScopedLock _lock(lock);
- RCSession* sess = (RCSession*) handle.impl;
-
- sess->session.exchangeBind(client::arg::exchange=exchange, client::arg::queue=queue, client::arg::bindingKey=key);
-}
-
-void ResilientConnectionImpl::unbind(SessionHandle handle,
- char* exchange, char* queue, char* key)
-{
- Mutex::ScopedLock _lock(lock);
- RCSession* sess = (RCSession*) handle.impl;
-
- sess->session.exchangeUnbind(client::arg::exchange=exchange, client::arg::queue=queue, client::arg::bindingKey=key);
-}
-
-void ResilientConnectionImpl::notify()
-{
- if (notifyFd != -1)
- {
- if (::write(notifyFd, ".", 1)) {}
- }
-}
-
-
-void ResilientConnectionImpl::setNotifyFd(int fd)
-{
- notifyFd = fd;
- if (notifyFd > 0) {
- int original = fcntl(notifyFd, F_GETFL);
- fcntl(notifyFd, F_SETFL, O_NONBLOCK | original);
- }
-}
-
-void ResilientConnectionImpl::run()
-{
- int delay(delayMin);
-
- while (true) {
- try {
- QPID_LOG(trace, "Trying to open connection...");
- connection.open(settings.impl->getClientSettings());
- {
- Mutex::ScopedLock _lock(lock);
- connected = true;
- EnqueueEvent(ResilientConnectionEvent::CONNECTED);
-
- while (connected)
- cond.wait(lock);
- delay = delayMin;
-
- while (!sessions.empty()) {
- set<RCSession::Ptr>::iterator iter = sessions.begin();
- RCSession::Ptr sess = *iter;
- sessions.erase(iter);
- EnqueueEvent(ResilientConnectionEvent::SESSION_CLOSED, sess->userContext);
- Mutex::ScopedUnlock _u(lock);
- sess->stop();
-
- // Nullify the intrusive pointer within the scoped unlock, otherwise,
- // the reference is held until overwritted above (under lock) which causes
- // the session destructor to be called with the lock held.
- sess = 0;
- }
-
- EnqueueEvent(ResilientConnectionEvent::DISCONNECTED);
-
- if (shutdown)
- return;
- }
- connection.close();
- } catch (exception &e) {
- QPID_LOG(debug, "connection.open exception: " << e.what());
- Mutex::ScopedLock _lock(lock);
- lastError = e.what();
- if (delay < delayMax)
- delay *= delayFactor;
- }
-
- ::qpid::sys::sleep(delay);
- }
-}
-
-void ResilientConnectionImpl::failure()
-{
- Mutex::ScopedLock _lock(lock);
-
- connected = false;
- lastError = "Closed by Peer";
- cond.notify();
-}
-
-void ResilientConnectionImpl::sessionClosed(RCSession*)
-{
- Mutex::ScopedLock _lock(lock);
- connected = false;
- lastError = "Closed due to Session failure";
- cond.notify();
-}
-
-void ResilientConnectionImpl::EnqueueEvent(ResilientConnectionEvent::EventKind kind,
- void* sessionContext,
- const MessageImpl& message,
- const string& errorText)
-{
- {
- Mutex::ScopedLock _lock(lock);
- ResilientConnectionEventImpl event(kind, message);
-
- event.sessionContext = sessionContext;
- event.errorText = errorText;
-
- eventQueue.push_back(event);
- }
-
- if (notifyFd != -1)
- {
- if (::write(notifyFd, ".", 1)) {}
- }
-}
-
-
-//==================================================================
-// Wrappers
-//==================================================================
-
-ResilientConnection::ResilientConnection(const ConnectionSettings& settings)
-{
- impl = new ResilientConnectionImpl(settings);
-}
-
-ResilientConnection::~ResilientConnection()
-{
- delete impl;
-}
-
-bool ResilientConnection::isConnected() const
-{
- return impl->isConnected();
-}
-
-bool ResilientConnection::getEvent(ResilientConnectionEvent& event)
-{
- return impl->getEvent(event);
-}
-
-void ResilientConnection::popEvent()
-{
- impl->popEvent();
-}
-
-bool ResilientConnection::createSession(const char* name, void* sessionContext, SessionHandle& handle)
-{
- return impl->createSession(name, sessionContext, handle);
-}
-
-void ResilientConnection::destroySession(SessionHandle handle)
-{
- impl->destroySession(handle);
-}
-
-void ResilientConnection::sendMessage(SessionHandle handle, qmf::engine::Message& message)
-{
- impl->sendMessage(handle, message);
-}
-
-void ResilientConnection::declareQueue(SessionHandle handle, char* queue)
-{
- impl->declareQueue(handle, queue);
-}
-
-void ResilientConnection::deleteQueue(SessionHandle handle, char* queue)
-{
- impl->deleteQueue(handle, queue);
-}
-
-void ResilientConnection::bind(SessionHandle handle, char* exchange, char* queue, char* key)
-{
- impl->bind(handle, exchange, queue, key);
-}
-
-void ResilientConnection::unbind(SessionHandle handle, char* exchange, char* queue, char* key)
-{
- impl->unbind(handle, exchange, queue, key);
-}
-
-void ResilientConnection::setNotifyFd(int fd)
-{
- impl->setNotifyFd(fd);
-}
-
-void ResilientConnection::notify()
-{
- impl->notify();
-}
-
diff --git a/qpid/cpp/src/qmf/engine/SchemaImpl.cpp b/qpid/cpp/src/qmf/engine/SchemaImpl.cpp
deleted file mode 100644
index 9d363d3012..0000000000
--- a/qpid/cpp/src/qmf/engine/SchemaImpl.cpp
+++ /dev/null
@@ -1,610 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include "qmf/engine/SchemaImpl.h"
-#include <qpid/framing/Buffer.h>
-#include <qpid/framing/FieldTable.h>
-#include <qpid/framing/Uuid.h>
-#include <string.h>
-#include <string>
-#include <vector>
-#include <sstream>
-
-using namespace std;
-using namespace qmf::engine;
-using qpid::framing::Buffer;
-using qpid::framing::FieldTable;
-using qpid::framing::Uuid;
-
-SchemaHash::SchemaHash()
-{
- for (int idx = 0; idx < 16; idx++)
- hash.b[idx] = 0x5A;
-}
-
-void SchemaHash::encode(Buffer& buffer) const
-{
- buffer.putBin128(hash.b);
-}
-
-void SchemaHash::decode(Buffer& buffer)
-{
- buffer.getBin128(hash.b);
-}
-
-void SchemaHash::update(uint8_t data)
-{
- update((char*) &data, 1);
-}
-
-void SchemaHash::update(const char* data, uint32_t len)
-{
- uint64_t* first = &hash.q[0];
- uint64_t* second = &hash.q[1];
- for (uint32_t idx = 0; idx < len; idx++) {
- *first = *first ^ (uint64_t) data[idx];
- *second = *second << 1;
- *second |= ((*first & 0x8000000000000000LL) >> 63);
- *first = *first << 1;
- *first = *first ^ *second;
- }
-}
-
-bool SchemaHash::operator==(const SchemaHash& other) const
-{
- return ::memcmp(&hash, &other.hash, 16) == 0;
-}
-
-bool SchemaHash::operator<(const SchemaHash& other) const
-{
- return ::memcmp(&hash, &other.hash, 16) < 0;
-}
-
-bool SchemaHash::operator>(const SchemaHash& other) const
-{
- return ::memcmp(&hash, &other.hash, 16) > 0;
-}
-
-SchemaArgumentImpl::SchemaArgumentImpl(Buffer& buffer)
-{
- FieldTable map;
- map.decode(buffer);
-
- name = map.getAsString("name");
- typecode = (Typecode) map.getAsInt("type");
- unit = map.getAsString("unit");
- description = map.getAsString("desc");
-
- dir = DIR_IN;
- string dstr(map.getAsString("dir"));
- if (dstr == "O")
- dir = DIR_OUT;
- else if (dstr == "IO")
- dir = DIR_IN_OUT;
-}
-
-SchemaArgument* SchemaArgumentImpl::factory(Buffer& buffer)
-{
- SchemaArgumentImpl* impl(new SchemaArgumentImpl(buffer));
- return new SchemaArgument(impl);
-}
-
-void SchemaArgumentImpl::encode(Buffer& buffer) const
-{
- FieldTable map;
-
- map.setString("name", name);
- map.setInt("type", (int) typecode);
- if (dir == DIR_IN)
- map.setString("dir", "I");
- else if (dir == DIR_OUT)
- map.setString("dir", "O");
- else
- map.setString("dir", "IO");
- if (!unit.empty())
- map.setString("unit", unit);
- if (!description.empty())
- map.setString("desc", description);
-
- map.encode(buffer);
-}
-
-void SchemaArgumentImpl::updateHash(SchemaHash& hash) const
-{
- hash.update(name);
- hash.update(typecode);
- hash.update(dir);
- hash.update(unit);
- hash.update(description);
-}
-
-SchemaMethodImpl::SchemaMethodImpl(Buffer& buffer)
-{
- FieldTable map;
- int argCount;
-
- map.decode(buffer);
- name = map.getAsString("name");
- argCount = map.getAsInt("argCount");
- description = map.getAsString("desc");
-
- for (int idx = 0; idx < argCount; idx++) {
- SchemaArgument* arg = SchemaArgumentImpl::factory(buffer);
- addArgument(arg);
- }
-}
-
-SchemaMethod* SchemaMethodImpl::factory(Buffer& buffer)
-{
- SchemaMethodImpl* impl(new SchemaMethodImpl(buffer));
- return new SchemaMethod(impl);
-}
-
-void SchemaMethodImpl::encode(Buffer& buffer) const
-{
- FieldTable map;
-
- map.setString("name", name);
- map.setInt("argCount", arguments.size());
- if (!description.empty())
- map.setString("desc", description);
- map.encode(buffer);
-
- for (vector<const SchemaArgument*>::const_iterator iter = arguments.begin();
- iter != arguments.end(); iter++)
- (*iter)->impl->encode(buffer);
-}
-
-void SchemaMethodImpl::addArgument(const SchemaArgument* argument)
-{
- arguments.push_back(argument);
-}
-
-const SchemaArgument* SchemaMethodImpl::getArgument(int idx) const
-{
- int count = 0;
- for (vector<const SchemaArgument*>::const_iterator iter = arguments.begin();
- iter != arguments.end(); iter++, count++)
- if (idx == count)
- return (*iter);
- return 0;
-}
-
-void SchemaMethodImpl::updateHash(SchemaHash& hash) const
-{
- hash.update(name);
- hash.update(description);
- for (vector<const SchemaArgument*>::const_iterator iter = arguments.begin();
- iter != arguments.end(); iter++)
- (*iter)->impl->updateHash(hash);
-}
-
-SchemaPropertyImpl::SchemaPropertyImpl(Buffer& buffer)
-{
- FieldTable map;
- map.decode(buffer);
-
- name = map.getAsString("name");
- typecode = (Typecode) map.getAsInt("type");
- access = (Access) map.getAsInt("access");
- index = map.getAsInt("index") != 0;
- optional = map.getAsInt("optional") != 0;
- unit = map.getAsString("unit");
- description = map.getAsString("desc");
-}
-
-SchemaProperty* SchemaPropertyImpl::factory(Buffer& buffer)
-{
- SchemaPropertyImpl* impl(new SchemaPropertyImpl(buffer));
- return new SchemaProperty(impl);
-}
-
-void SchemaPropertyImpl::encode(Buffer& buffer) const
-{
- FieldTable map;
-
- map.setString("name", name);
- map.setInt("type", (int) typecode);
- map.setInt("access", (int) access);
- map.setInt("index", index ? 1 : 0);
- map.setInt("optional", optional ? 1 : 0);
- if (!unit.empty())
- map.setString("unit", unit);
- if (!description.empty())
- map.setString("desc", description);
-
- map.encode(buffer);
-}
-
-void SchemaPropertyImpl::updateHash(SchemaHash& hash) const
-{
- hash.update(name);
- hash.update(typecode);
- hash.update(access);
- hash.update(index);
- hash.update(optional);
- hash.update(unit);
- hash.update(description);
-}
-
-SchemaStatisticImpl::SchemaStatisticImpl(Buffer& buffer)
-{
- FieldTable map;
- map.decode(buffer);
-
- name = map.getAsString("name");
- typecode = (Typecode) map.getAsInt("type");
- unit = map.getAsString("unit");
- description = map.getAsString("desc");
-}
-
-SchemaStatistic* SchemaStatisticImpl::factory(Buffer& buffer)
-{
- SchemaStatisticImpl* impl(new SchemaStatisticImpl(buffer));
- return new SchemaStatistic(impl);
-}
-
-void SchemaStatisticImpl::encode(Buffer& buffer) const
-{
- FieldTable map;
-
- map.setString("name", name);
- map.setInt("type", (int) typecode);
- if (!unit.empty())
- map.setString("unit", unit);
- if (!description.empty())
- map.setString("desc", description);
-
- map.encode(buffer);
-}
-
-void SchemaStatisticImpl::updateHash(SchemaHash& hash) const
-{
- hash.update(name);
- hash.update(typecode);
- hash.update(unit);
- hash.update(description);
-}
-
-SchemaClassKeyImpl::SchemaClassKeyImpl(const string& p, const string& n, const SchemaHash& h) : package(p), name(n), hash(h) {}
-
-SchemaClassKeyImpl::SchemaClassKeyImpl(Buffer& buffer) : package(packageContainer), name(nameContainer), hash(hashContainer)
-{
- buffer.getShortString(packageContainer);
- buffer.getShortString(nameContainer);
- hashContainer.decode(buffer);
-}
-
-SchemaClassKey* SchemaClassKeyImpl::factory(const string& package, const string& name, const SchemaHash& hash)
-{
- SchemaClassKeyImpl* impl(new SchemaClassKeyImpl(package, name, hash));
- return new SchemaClassKey(impl);
-}
-
-SchemaClassKey* SchemaClassKeyImpl::factory(Buffer& buffer)
-{
- SchemaClassKeyImpl* impl(new SchemaClassKeyImpl(buffer));
- return new SchemaClassKey(impl);
-}
-
-void SchemaClassKeyImpl::encode(Buffer& buffer) const
-{
- buffer.putShortString(package);
- buffer.putShortString(name);
- hash.encode(buffer);
-}
-
-bool SchemaClassKeyImpl::operator==(const SchemaClassKeyImpl& other) const
-{
- return package == other.package &&
- name == other.name &&
- hash == other.hash;
-}
-
-bool SchemaClassKeyImpl::operator<(const SchemaClassKeyImpl& other) const
-{
- if (package < other.package) return true;
- if (package > other.package) return false;
- if (name < other.name) return true;
- if (name > other.name) return false;
- return hash < other.hash;
-}
-
-const string& SchemaClassKeyImpl::str() const
-{
- Uuid printableHash(hash.get());
- stringstream str;
- str << package << ":" << name << "(" << printableHash << ")";
- repr = str.str();
- return repr;
-}
-
-SchemaObjectClassImpl::SchemaObjectClassImpl(Buffer& buffer) : hasHash(true), classKey(SchemaClassKeyImpl::factory(package, name, hash))
-{
- buffer.getShortString(package);
- buffer.getShortString(name);
- hash.decode(buffer);
-
- uint16_t propCount = buffer.getShort();
- uint16_t statCount = buffer.getShort();
- uint16_t methodCount = buffer.getShort();
-
- for (uint16_t idx = 0; idx < propCount; idx++) {
- const SchemaProperty* property = SchemaPropertyImpl::factory(buffer);
- addProperty(property);
- }
-
- for (uint16_t idx = 0; idx < statCount; idx++) {
- const SchemaStatistic* statistic = SchemaStatisticImpl::factory(buffer);
- addStatistic(statistic);
- }
-
- for (uint16_t idx = 0; idx < methodCount; idx++) {
- SchemaMethod* method = SchemaMethodImpl::factory(buffer);
- addMethod(method);
- }
-}
-
-SchemaObjectClass* SchemaObjectClassImpl::factory(Buffer& buffer)
-{
- SchemaObjectClassImpl* impl(new SchemaObjectClassImpl(buffer));
- return new SchemaObjectClass(impl);
-}
-
-void SchemaObjectClassImpl::encode(Buffer& buffer) const
-{
- buffer.putOctet((uint8_t) CLASS_OBJECT);
- buffer.putShortString(package);
- buffer.putShortString(name);
- hash.encode(buffer);
- //buffer.putOctet(0); // No parent class
- buffer.putShort((uint16_t) properties.size());
- buffer.putShort((uint16_t) statistics.size());
- buffer.putShort((uint16_t) methods.size());
-
- for (vector<const SchemaProperty*>::const_iterator iter = properties.begin();
- iter != properties.end(); iter++)
- (*iter)->impl->encode(buffer);
- for (vector<const SchemaStatistic*>::const_iterator iter = statistics.begin();
- iter != statistics.end(); iter++)
- (*iter)->impl->encode(buffer);
- for (vector<const SchemaMethod*>::const_iterator iter = methods.begin();
- iter != methods.end(); iter++)
- (*iter)->impl->encode(buffer);
-}
-
-const SchemaClassKey* SchemaObjectClassImpl::getClassKey() const
-{
- if (!hasHash) {
- hasHash = true;
- hash.update(package);
- hash.update(name);
- for (vector<const SchemaProperty*>::const_iterator iter = properties.begin();
- iter != properties.end(); iter++)
- (*iter)->impl->updateHash(hash);
- for (vector<const SchemaStatistic*>::const_iterator iter = statistics.begin();
- iter != statistics.end(); iter++)
- (*iter)->impl->updateHash(hash);
- for (vector<const SchemaMethod*>::const_iterator iter = methods.begin();
- iter != methods.end(); iter++)
- (*iter)->impl->updateHash(hash);
- }
-
- return classKey.get();
-}
-
-void SchemaObjectClassImpl::addProperty(const SchemaProperty* property)
-{
- properties.push_back(property);
-}
-
-void SchemaObjectClassImpl::addStatistic(const SchemaStatistic* statistic)
-{
- statistics.push_back(statistic);
-}
-
-void SchemaObjectClassImpl::addMethod(const SchemaMethod* method)
-{
- methods.push_back(method);
-}
-
-const SchemaProperty* SchemaObjectClassImpl::getProperty(int idx) const
-{
- int count = 0;
- for (vector<const SchemaProperty*>::const_iterator iter = properties.begin();
- iter != properties.end(); iter++, count++)
- if (idx == count)
- return *iter;
- return 0;
-}
-
-const SchemaStatistic* SchemaObjectClassImpl::getStatistic(int idx) const
-{
- int count = 0;
- for (vector<const SchemaStatistic*>::const_iterator iter = statistics.begin();
- iter != statistics.end(); iter++, count++)
- if (idx == count)
- return *iter;
- return 0;
-}
-
-const SchemaMethod* SchemaObjectClassImpl::getMethod(int idx) const
-{
- int count = 0;
- for (vector<const SchemaMethod*>::const_iterator iter = methods.begin();
- iter != methods.end(); iter++, count++)
- if (idx == count)
- return *iter;
- return 0;
-}
-
-SchemaEventClassImpl::SchemaEventClassImpl(Buffer& buffer) : hasHash(true), classKey(SchemaClassKeyImpl::factory(package, name, hash))
-{
- buffer.getShortString(package);
- buffer.getShortString(name);
- hash.decode(buffer);
-
- uint16_t argCount = buffer.getShort();
-
- for (uint16_t idx = 0; idx < argCount; idx++) {
- SchemaArgument* argument = SchemaArgumentImpl::factory(buffer);
- addArgument(argument);
- }
-}
-
-SchemaEventClass* SchemaEventClassImpl::factory(Buffer& buffer)
-{
- SchemaEventClassImpl* impl(new SchemaEventClassImpl(buffer));
- return new SchemaEventClass(impl);
-}
-
-void SchemaEventClassImpl::encode(Buffer& buffer) const
-{
- buffer.putOctet((uint8_t) CLASS_EVENT);
- buffer.putShortString(package);
- buffer.putShortString(name);
- hash.encode(buffer);
- buffer.putShort((uint16_t) arguments.size());
-
- for (vector<const SchemaArgument*>::const_iterator iter = arguments.begin();
- iter != arguments.end(); iter++)
- (*iter)->impl->encode(buffer);
-}
-
-const SchemaClassKey* SchemaEventClassImpl::getClassKey() const
-{
- if (!hasHash) {
- hasHash = true;
- hash.update(package);
- hash.update(name);
- for (vector<const SchemaArgument*>::const_iterator iter = arguments.begin();
- iter != arguments.end(); iter++)
- (*iter)->impl->updateHash(hash);
- }
- return classKey.get();
-}
-
-void SchemaEventClassImpl::addArgument(const SchemaArgument* argument)
-{
- arguments.push_back(argument);
-}
-
-const SchemaArgument* SchemaEventClassImpl::getArgument(int idx) const
-{
- int count = 0;
- for (vector<const SchemaArgument*>::const_iterator iter = arguments.begin();
- iter != arguments.end(); iter++, count++)
- if (idx == count)
- return (*iter);
- return 0;
-}
-
-
-//==================================================================
-// Wrappers
-//==================================================================
-
-SchemaArgument::SchemaArgument(const char* name, Typecode typecode) { impl = new SchemaArgumentImpl(name, typecode); }
-SchemaArgument::SchemaArgument(SchemaArgumentImpl* i) : impl(i) {}
-SchemaArgument::SchemaArgument(const SchemaArgument& from) : impl(new SchemaArgumentImpl(*(from.impl))) {}
-SchemaArgument::~SchemaArgument() { delete impl; }
-void SchemaArgument::setDirection(Direction dir) { impl->setDirection(dir); }
-void SchemaArgument::setUnit(const char* val) { impl->setUnit(val); }
-void SchemaArgument::setDesc(const char* desc) { impl->setDesc(desc); }
-const char* SchemaArgument::getName() const { return impl->getName().c_str(); }
-Typecode SchemaArgument::getType() const { return impl->getType(); }
-Direction SchemaArgument::getDirection() const { return impl->getDirection(); }
-const char* SchemaArgument::getUnit() const { return impl->getUnit().c_str(); }
-const char* SchemaArgument::getDesc() const { return impl->getDesc().c_str(); }
-
-SchemaMethod::SchemaMethod(const char* name) : impl(new SchemaMethodImpl(name)) {}
-SchemaMethod::SchemaMethod(SchemaMethodImpl* i) : impl(i) {}
-SchemaMethod::SchemaMethod(const SchemaMethod& from) : impl(new SchemaMethodImpl(*(from.impl))) {}
-SchemaMethod::~SchemaMethod() { delete impl; }
-void SchemaMethod::addArgument(const SchemaArgument* argument) { impl->addArgument(argument); }
-void SchemaMethod::setDesc(const char* desc) { impl->setDesc(desc); }
-const char* SchemaMethod::getName() const { return impl->getName().c_str(); }
-const char* SchemaMethod::getDesc() const { return impl->getDesc().c_str(); }
-int SchemaMethod::getArgumentCount() const { return impl->getArgumentCount(); }
-const SchemaArgument* SchemaMethod::getArgument(int idx) const { return impl->getArgument(idx); }
-
-SchemaProperty::SchemaProperty(const char* name, Typecode typecode) : impl(new SchemaPropertyImpl(name, typecode)) {}
-SchemaProperty::SchemaProperty(SchemaPropertyImpl* i) : impl(i) {}
-SchemaProperty::SchemaProperty(const SchemaProperty& from) : impl(new SchemaPropertyImpl(*(from.impl))) {}
-SchemaProperty::~SchemaProperty() { delete impl; }
-void SchemaProperty::setAccess(Access access) { impl->setAccess(access); }
-void SchemaProperty::setIndex(bool val) { impl->setIndex(val); }
-void SchemaProperty::setOptional(bool val) { impl->setOptional(val); }
-void SchemaProperty::setUnit(const char* val) { impl->setUnit(val); }
-void SchemaProperty::setDesc(const char* desc) { impl->setDesc(desc); }
-const char* SchemaProperty::getName() const { return impl->getName().c_str(); }
-Typecode SchemaProperty::getType() const { return impl->getType(); }
-Access SchemaProperty::getAccess() const { return impl->getAccess(); }
-bool SchemaProperty::isIndex() const { return impl->isIndex(); }
-bool SchemaProperty::isOptional() const { return impl->isOptional(); }
-const char* SchemaProperty::getUnit() const { return impl->getUnit().c_str(); }
-const char* SchemaProperty::getDesc() const { return impl->getDesc().c_str(); }
-
-SchemaStatistic::SchemaStatistic(const char* name, Typecode typecode) : impl(new SchemaStatisticImpl(name, typecode)) {}
-SchemaStatistic::SchemaStatistic(SchemaStatisticImpl* i) : impl(i) {}
-SchemaStatistic::SchemaStatistic(const SchemaStatistic& from) : impl(new SchemaStatisticImpl(*(from.impl))) {}
-SchemaStatistic::~SchemaStatistic() { delete impl; }
-void SchemaStatistic::setUnit(const char* val) { impl->setUnit(val); }
-void SchemaStatistic::setDesc(const char* desc) { impl->setDesc(desc); }
-const char* SchemaStatistic::getName() const { return impl->getName().c_str(); }
-Typecode SchemaStatistic::getType() const { return impl->getType(); }
-const char* SchemaStatistic::getUnit() const { return impl->getUnit().c_str(); }
-const char* SchemaStatistic::getDesc() const { return impl->getDesc().c_str(); }
-
-SchemaClassKey::SchemaClassKey(SchemaClassKeyImpl* i) : impl(i) {}
-SchemaClassKey::SchemaClassKey(const SchemaClassKey& from) : impl(new SchemaClassKeyImpl(*(from.impl))) {}
-SchemaClassKey::~SchemaClassKey() { delete impl; }
-const char* SchemaClassKey::getPackageName() const { return impl->getPackageName().c_str(); }
-const char* SchemaClassKey::getClassName() const { return impl->getClassName().c_str(); }
-const uint8_t* SchemaClassKey::getHash() const { return impl->getHash(); }
-const char* SchemaClassKey::asString() const { return impl->str().c_str(); }
-bool SchemaClassKey::operator==(const SchemaClassKey& other) const { return *impl == *(other.impl); }
-bool SchemaClassKey::operator<(const SchemaClassKey& other) const { return *impl < *(other.impl); }
-
-SchemaObjectClass::SchemaObjectClass(const char* package, const char* name) : impl(new SchemaObjectClassImpl(package, name)) {}
-SchemaObjectClass::SchemaObjectClass(SchemaObjectClassImpl* i) : impl(i) {}
-SchemaObjectClass::SchemaObjectClass(const SchemaObjectClass& from) : impl(new SchemaObjectClassImpl(*(from.impl))) {}
-SchemaObjectClass::~SchemaObjectClass() { delete impl; }
-void SchemaObjectClass::addProperty(const SchemaProperty* property) { impl->addProperty(property); }
-void SchemaObjectClass::addStatistic(const SchemaStatistic* statistic) { impl->addStatistic(statistic); }
-void SchemaObjectClass::addMethod(const SchemaMethod* method) { impl->addMethod(method); }
-const SchemaClassKey* SchemaObjectClass::getClassKey() const { return impl->getClassKey(); }
-int SchemaObjectClass::getPropertyCount() const { return impl->getPropertyCount(); }
-int SchemaObjectClass::getStatisticCount() const { return impl->getStatisticCount(); }
-int SchemaObjectClass::getMethodCount() const { return impl->getMethodCount(); }
-const SchemaProperty* SchemaObjectClass::getProperty(int idx) const { return impl->getProperty(idx); }
-const SchemaStatistic* SchemaObjectClass::getStatistic(int idx) const { return impl->getStatistic(idx); }
-const SchemaMethod* SchemaObjectClass::getMethod(int idx) const { return impl->getMethod(idx); }
-
-SchemaEventClass::SchemaEventClass(const char* package, const char* name, Severity s) : impl(new SchemaEventClassImpl(package, name, s)) {}
-SchemaEventClass::SchemaEventClass(SchemaEventClassImpl* i) : impl(i) {}
-SchemaEventClass::SchemaEventClass(const SchemaEventClass& from) : impl(new SchemaEventClassImpl(*(from.impl))) {}
-SchemaEventClass::~SchemaEventClass() { delete impl; }
-void SchemaEventClass::addArgument(const SchemaArgument* argument) { impl->addArgument(argument); }
-void SchemaEventClass::setDesc(const char* desc) { impl->setDesc(desc); }
-const SchemaClassKey* SchemaEventClass::getClassKey() const { return impl->getClassKey(); }
-Severity SchemaEventClass::getSeverity() const { return impl->getSeverity(); }
-int SchemaEventClass::getArgumentCount() const { return impl->getArgumentCount(); }
-const SchemaArgument* SchemaEventClass::getArgument(int idx) const { return impl->getArgument(idx); }
-
diff --git a/qpid/cpp/src/qmf/engine/SchemaImpl.h b/qpid/cpp/src/qmf/engine/SchemaImpl.h
deleted file mode 100644
index 683fb6f8f0..0000000000
--- a/qpid/cpp/src/qmf/engine/SchemaImpl.h
+++ /dev/null
@@ -1,230 +0,0 @@
-#ifndef _QmfEngineSchemaImpl_
-#define _QmfEngineSchemaImpl_
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include "qmf/engine/Schema.h"
-#include "qpid/framing/Buffer.h"
-
-#include <string>
-#include <vector>
-#include <memory>
-
-namespace qmf {
-namespace engine {
-
- // TODO: Destructors for schema classes
- // TODO: Add "frozen" attribute for schema classes so they can't be modified after
- // they've been registered.
-
- class SchemaHash {
- union h {
- uint8_t b[16];
- uint64_t q[2];
- } hash;
- public:
- SchemaHash();
- void encode(qpid::framing::Buffer& buffer) const;
- void decode(qpid::framing::Buffer& buffer);
- void update(const char* data, uint32_t len);
- void update(uint8_t data);
- void update(const std::string& data) { update(data.c_str(), data.size()); }
- void update(Typecode t) { update((uint8_t) t); }
- void update(Direction d) { update((uint8_t) d); }
- void update(Access a) { update((uint8_t) a); }
- void update(bool b) { update((uint8_t) (b ? 1 : 0)); }
- const uint8_t* get() const { return hash.b; }
- bool operator==(const SchemaHash& other) const;
- bool operator<(const SchemaHash& other) const;
- bool operator>(const SchemaHash& other) const;
- };
-
- struct SchemaArgumentImpl {
- std::string name;
- Typecode typecode;
- Direction dir;
- std::string unit;
- std::string description;
-
- SchemaArgumentImpl(const char* n, Typecode t) : name(n), typecode(t), dir(DIR_IN) {}
- SchemaArgumentImpl(qpid::framing::Buffer& buffer);
- static SchemaArgument* factory(qpid::framing::Buffer& buffer);
- void encode(qpid::framing::Buffer& buffer) const;
- void setDirection(Direction d) { dir = d; }
- void setUnit(const char* val) { unit = val; }
- void setDesc(const char* desc) { description = desc; }
- const std::string& getName() const { return name; }
- Typecode getType() const { return typecode; }
- Direction getDirection() const { return dir; }
- const std::string& getUnit() const { return unit; }
- const std::string& getDesc() const { return description; }
- void updateHash(SchemaHash& hash) const;
- };
-
- struct SchemaMethodImpl {
- std::string name;
- std::string description;
- std::vector<const SchemaArgument*> arguments;
-
- SchemaMethodImpl(const char* n) : name(n) {}
- SchemaMethodImpl(qpid::framing::Buffer& buffer);
- static SchemaMethod* factory(qpid::framing::Buffer& buffer);
- void encode(qpid::framing::Buffer& buffer) const;
- void addArgument(const SchemaArgument* argument);
- void setDesc(const char* desc) { description = desc; }
- const std::string& getName() const { return name; }
- const std::string& getDesc() const { return description; }
- int getArgumentCount() const { return arguments.size(); }
- const SchemaArgument* getArgument(int idx) const;
- void updateHash(SchemaHash& hash) const;
- };
-
- struct SchemaPropertyImpl {
- std::string name;
- Typecode typecode;
- Access access;
- bool index;
- bool optional;
- std::string unit;
- std::string description;
-
- SchemaPropertyImpl(const char* n, Typecode t) : name(n), typecode(t), access(ACCESS_READ_ONLY), index(false), optional(false) {}
- SchemaPropertyImpl(qpid::framing::Buffer& buffer);
- static SchemaProperty* factory(qpid::framing::Buffer& buffer);
- void encode(qpid::framing::Buffer& buffer) const;
- void setAccess(Access a) { access = a; }
- void setIndex(bool val) { index = val; }
- void setOptional(bool val) { optional = val; }
- void setUnit(const char* val) { unit = val; }
- void setDesc(const char* desc) { description = desc; }
- const std::string& getName() const { return name; }
- Typecode getType() const { return typecode; }
- Access getAccess() const { return access; }
- bool isIndex() const { return index; }
- bool isOptional() const { return optional; }
- const std::string& getUnit() const { return unit; }
- const std::string& getDesc() const { return description; }
- void updateHash(SchemaHash& hash) const;
- };
-
- struct SchemaStatisticImpl {
- std::string name;
- Typecode typecode;
- std::string unit;
- std::string description;
-
- SchemaStatisticImpl(const char* n, Typecode t) : name(n), typecode(t) {}
- SchemaStatisticImpl(qpid::framing::Buffer& buffer);
- static SchemaStatistic* factory(qpid::framing::Buffer& buffer);
- void encode(qpid::framing::Buffer& buffer) const;
- void setUnit(const char* val) { unit = val; }
- void setDesc(const char* desc) { description = desc; }
- const std::string& getName() const { return name; }
- Typecode getType() const { return typecode; }
- const std::string& getUnit() const { return unit; }
- const std::string& getDesc() const { return description; }
- void updateHash(SchemaHash& hash) const;
- };
-
- struct SchemaClassKeyImpl {
- const std::string& package;
- const std::string& name;
- const SchemaHash& hash;
- mutable std::string repr;
-
- // The *Container elements are only used if there isn't an external place to
- // store these values.
- std::string packageContainer;
- std::string nameContainer;
- SchemaHash hashContainer;
-
- SchemaClassKeyImpl(const std::string& package, const std::string& name, const SchemaHash& hash);
- SchemaClassKeyImpl(qpid::framing::Buffer& buffer);
- static SchemaClassKey* factory(const std::string& package, const std::string& name, const SchemaHash& hash);
- static SchemaClassKey* factory(qpid::framing::Buffer& buffer);
-
- const std::string& getPackageName() const { return package; }
- const std::string& getClassName() const { return name; }
- const uint8_t* getHash() const { return hash.get(); }
-
- void encode(qpid::framing::Buffer& buffer) const;
- bool operator==(const SchemaClassKeyImpl& other) const;
- bool operator<(const SchemaClassKeyImpl& other) const;
- const std::string& str() const;
- };
-
- struct SchemaObjectClassImpl {
- std::string package;
- std::string name;
- mutable SchemaHash hash;
- mutable bool hasHash;
- std::auto_ptr<SchemaClassKey> classKey;
- std::vector<const SchemaProperty*> properties;
- std::vector<const SchemaStatistic*> statistics;
- std::vector<const SchemaMethod*> methods;
-
- SchemaObjectClassImpl(const char* p, const char* n) :
- package(p), name(n), hasHash(false), classKey(SchemaClassKeyImpl::factory(package, name, hash)) {}
- SchemaObjectClassImpl(qpid::framing::Buffer& buffer);
- static SchemaObjectClass* factory(qpid::framing::Buffer& buffer);
-
- void encode(qpid::framing::Buffer& buffer) const;
- void addProperty(const SchemaProperty* property);
- void addStatistic(const SchemaStatistic* statistic);
- void addMethod(const SchemaMethod* method);
-
- const SchemaClassKey* getClassKey() const;
- int getPropertyCount() const { return properties.size(); }
- int getStatisticCount() const { return statistics.size(); }
- int getMethodCount() const { return methods.size(); }
- const SchemaProperty* getProperty(int idx) const;
- const SchemaStatistic* getStatistic(int idx) const;
- const SchemaMethod* getMethod(int idx) const;
- };
-
- struct SchemaEventClassImpl {
- std::string package;
- std::string name;
- mutable SchemaHash hash;
- mutable bool hasHash;
- std::auto_ptr<SchemaClassKey> classKey;
- std::string description;
- Severity severity;
- std::vector<const SchemaArgument*> arguments;
-
- SchemaEventClassImpl(const char* p, const char* n, Severity sev) :
- package(p), name(n), hasHash(false), classKey(SchemaClassKeyImpl::factory(package, name, hash)), severity(sev) {}
- SchemaEventClassImpl(qpid::framing::Buffer& buffer);
- static SchemaEventClass* factory(qpid::framing::Buffer& buffer);
-
- void encode(qpid::framing::Buffer& buffer) const;
- void addArgument(const SchemaArgument* argument);
- void setDesc(const char* desc) { description = desc; }
-
- const SchemaClassKey* getClassKey() const;
- Severity getSeverity() const { return severity; }
- int getArgumentCount() const { return arguments.size(); }
- const SchemaArgument* getArgument(int idx) const;
- };
-}
-}
-
-#endif
-
diff --git a/qpid/cpp/src/qmf/engine/SequenceManager.cpp b/qpid/cpp/src/qmf/engine/SequenceManager.cpp
deleted file mode 100644
index 4a4644a8b9..0000000000
--- a/qpid/cpp/src/qmf/engine/SequenceManager.cpp
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include "qmf/engine/SequenceManager.h"
-
-using namespace std;
-using namespace qmf::engine;
-using namespace qpid::sys;
-
-SequenceManager::SequenceManager() : nextSequence(1) {}
-
-void SequenceManager::setUnsolicitedContext(SequenceContext::Ptr ctx)
-{
- unsolicitedContext = ctx;
-}
-
-uint32_t SequenceManager::reserve(SequenceContext::Ptr ctx)
-{
- Mutex::ScopedLock _lock(lock);
- if (ctx.get() == 0)
- ctx = unsolicitedContext;
- uint32_t seq = nextSequence;
- while (contextMap.find(seq) != contextMap.end())
- seq = seq < 0xFFFFFFFF ? seq + 1 : 1;
- nextSequence = seq < 0xFFFFFFFF ? seq + 1 : 1;
- contextMap[seq] = ctx;
- ctx->reserve();
- return seq;
-}
-
-void SequenceManager::release(uint32_t sequence)
-{
- Mutex::ScopedLock _lock(lock);
-
- if (sequence == 0) {
- if (unsolicitedContext.get() != 0)
- unsolicitedContext->release();
- return;
- }
-
- map<uint32_t, SequenceContext::Ptr>::iterator iter = contextMap.find(sequence);
- if (iter != contextMap.end()) {
- if (iter->second != 0)
- iter->second->release();
- contextMap.erase(iter);
- }
-}
-
-void SequenceManager::releaseAll()
-{
- Mutex::ScopedLock _lock(lock);
- contextMap.clear();
-}
-
-void SequenceManager::dispatch(uint8_t opcode, uint32_t sequence, const string& routingKey, qpid::framing::Buffer& buffer)
-{
- Mutex::ScopedLock _lock(lock);
- bool done;
-
- if (sequence == 0) {
- if (unsolicitedContext.get() != 0) {
- done = unsolicitedContext->handleMessage(opcode, sequence, routingKey, buffer);
- if (done)
- unsolicitedContext->release();
- }
- return;
- }
-
- map<uint32_t, SequenceContext::Ptr>::iterator iter = contextMap.find(sequence);
- if (iter != contextMap.end()) {
- if (iter->second != 0) {
- done = iter->second->handleMessage(opcode, sequence, routingKey, buffer);
- if (done) {
- iter->second->release();
- contextMap.erase(iter);
- }
- }
- }
-}
-
diff --git a/qpid/cpp/src/qmf/engine/SequenceManager.h b/qpid/cpp/src/qmf/engine/SequenceManager.h
deleted file mode 100644
index 9e47e38610..0000000000
--- a/qpid/cpp/src/qmf/engine/SequenceManager.h
+++ /dev/null
@@ -1,68 +0,0 @@
-#ifndef _QmfEngineSequenceManager_
-#define _QmfEngineSequenceManager_
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include "qpid/sys/Mutex.h"
-#include <boost/shared_ptr.hpp>
-#include <map>
-
-namespace qpid {
- namespace framing {
- class Buffer;
- }
-}
-
-namespace qmf {
-namespace engine {
-
- class SequenceContext {
- public:
- typedef boost::shared_ptr<SequenceContext> Ptr;
- SequenceContext() {}
- virtual ~SequenceContext() {}
-
- virtual void reserve() = 0;
- virtual bool handleMessage(uint8_t opcode, uint32_t sequence, const std::string& routingKey, qpid::framing::Buffer& buffer) = 0;
- virtual void release() = 0;
- };
-
- class SequenceManager {
- public:
- SequenceManager();
-
- void setUnsolicitedContext(SequenceContext::Ptr ctx);
- uint32_t reserve(SequenceContext::Ptr ctx = SequenceContext::Ptr());
- void release(uint32_t sequence);
- void releaseAll();
- void dispatch(uint8_t opcode, uint32_t sequence, const std::string& routingKey, qpid::framing::Buffer& buffer);
-
- private:
- mutable qpid::sys::Mutex lock;
- uint32_t nextSequence;
- SequenceContext::Ptr unsolicitedContext;
- std::map<uint32_t, SequenceContext::Ptr> contextMap;
- };
-
-}
-}
-
-#endif
-
diff --git a/qpid/cpp/src/qmf/engine/ValueImpl.cpp b/qpid/cpp/src/qmf/engine/ValueImpl.cpp
deleted file mode 100644
index f9ebbf5028..0000000000
--- a/qpid/cpp/src/qmf/engine/ValueImpl.cpp
+++ /dev/null
@@ -1,571 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include "qmf/engine/ValueImpl.h"
-#include <qpid/framing/FieldValue.h>
-#include <qpid/framing/FieldTable.h>
-#include <qpid/framing/List.h>
-#include <qpid/log/Statement.h>
-
-using namespace std;
-using namespace qmf::engine;
-//using qpid::framing::Buffer;
-//using qpid::framing::FieldTable;
-//using qpid::framing::FieldValue;
-using namespace qpid::framing;
-
-ValueImpl::ValueImpl(Typecode t, Buffer& buf) : typecode(t)
-{
- uint64_t first;
- uint64_t second;
- FieldTable ft;
- List fl;
-
- switch (typecode) {
- case TYPE_UINT8 : value.u32 = (uint32_t) buf.getOctet(); break;
- case TYPE_UINT16 : value.u32 = (uint32_t) buf.getShort(); break;
- case TYPE_UINT32 : value.u32 = (uint32_t) buf.getLong(); break;
- case TYPE_UINT64 : value.u64 = buf.getLongLong(); break;
- case TYPE_SSTR : buf.getShortString(stringVal); break;
- case TYPE_LSTR : buf.getMediumString(stringVal); break;
- case TYPE_ABSTIME : value.s64 = buf.getLongLong(); break;
- case TYPE_DELTATIME : value.u64 = buf.getLongLong(); break;
- case TYPE_BOOL : value.boolVal = (buf.getOctet() != 0); break;
- case TYPE_FLOAT : value.floatVal = buf.getFloat(); break;
- case TYPE_DOUBLE : value.doubleVal = buf.getDouble(); break;
- case TYPE_INT8 : value.s32 = (int32_t) ((int8_t) buf.getOctet()); break;
- case TYPE_INT16 : value.s32 = (int32_t) ((int16_t) buf.getShort()); break;
- case TYPE_INT32 : value.s32 = (int32_t) buf.getLong(); break;
- case TYPE_INT64 : value.s64 = buf.getLongLong(); break;
- case TYPE_UUID : buf.getBin128(value.uuidVal); break;
- case TYPE_REF:
- first = buf.getLongLong();
- second = buf.getLongLong();
- refVal.impl->setValue(first, second);
- break;
-
- case TYPE_MAP:
- ft.decode(buf);
- initMap(ft);
- break;
-
- case TYPE_LIST:
- fl.decode(buf);
- initList(fl);
- break;
-
- case TYPE_ARRAY:
- case TYPE_OBJECT:
- default:
- break;
- }
-}
-
-ValueImpl::ValueImpl(Typecode t, Typecode at) : typecode(t), valid(false), arrayTypecode(at)
-{
-}
-
-ValueImpl::ValueImpl(Typecode t) : typecode(t)
-{
- ::memset(&value, 0, sizeof(value));
-}
-
-Value* ValueImpl::factory(Typecode t, Buffer& b)
-{
- ValueImpl* impl(new ValueImpl(t, b));
- return new Value(impl);
-}
-
-Value* ValueImpl::factory(Typecode t)
-{
- ValueImpl* impl(new ValueImpl(t));
- return new Value(impl);
-}
-
-ValueImpl::~ValueImpl()
-{
-}
-
-void ValueImpl::initMap(const FieldTable& ft)
-{
- for (FieldTable::ValueMap::const_iterator iter = ft.begin();
- iter != ft.end(); iter++) {
- const string& name(iter->first);
- const FieldValue& fvalue(*iter->second);
- uint8_t amqType = fvalue.getType();
-
- if (amqType == 0x32) {
- Value* subval(new Value(TYPE_UINT64));
- subval->setUint64(fvalue.get<int64_t>());
- insert(name.c_str(), subval);
- } else if ((amqType & 0xCF) == 0x02) {
- Value* subval(new Value(TYPE_UINT32));
- switch (amqType) {
- case 0x02 : subval->setUint(fvalue.get<int>()); break;
- case 0x12 : subval->setUint(fvalue.get<int>()); break;
- case 0x22 : subval->setUint(fvalue.get<int>()); break;
- }
- insert(name.c_str(), subval);
- } else if (amqType == 0x31) { // int64
- Value* subval(new Value(TYPE_INT64));
- subval->setInt64(fvalue.get<int64_t>());
- insert(name.c_str(), subval);
- } else if ((amqType & 0xCF) == 0x01) { // 0x01:int8, 0x11:int16, 0x21:int21
- Value* subval(new Value(TYPE_INT32));
- subval->setInt((int32_t)fvalue.get<int>());
- insert(name.c_str(), subval);
- } else if (amqType == 0x85 || amqType == 0x95) {
- Value* subval(new Value(TYPE_LSTR));
- subval->setString(fvalue.get<string>().c_str());
- insert(name.c_str(), subval);
- } else if (amqType == 0x23 || amqType == 0x33) {
- Value* subval(new Value(TYPE_DOUBLE));
- subval->setDouble(fvalue.get<double>());
- insert(name.c_str(), subval);
- } else if (amqType == 0xa8) {
- FieldTable subFt;
- bool valid = qpid::framing::getEncodedValue<FieldTable>(iter->second, subFt);
- if (valid) {
- Value* subval(new Value(TYPE_MAP));
- subval->impl->initMap(subFt);
- insert(name.c_str(), subval);
- }
- } else if (amqType == 0xa9) {
- List subList;
- bool valid = qpid::framing::getEncodedValue<List>(iter->second, subList);
- if (valid) {
- Value* subval(new Value(TYPE_LIST));
- subval->impl->initList(subList);
- insert(name.c_str(), subval);
- }
- } else if (amqType == 0x08) {
- Value* subval(new Value(TYPE_BOOL));
- subval->setBool(fvalue.get<int>() ? true : false);
- insert(name.c_str(), subval);
- } else {
- QPID_LOG(error, "Unable to decode unsupported AMQP typecode=" << amqType << " map index=" << name);
- }
- }
-}
-
-void ValueImpl::mapToFieldTable(FieldTable& ft) const
-{
- FieldTable subFt;
-
- for (map<string, Value>::const_iterator iter = mapVal.begin();
- iter != mapVal.end(); iter++) {
- const string& name(iter->first);
- const Value& subval(iter->second);
-
- switch (subval.getType()) {
- case TYPE_UINT8:
- case TYPE_UINT16:
- case TYPE_UINT32:
- ft.setUInt64(name, (uint64_t) subval.asUint());
- break;
- case TYPE_UINT64:
- case TYPE_DELTATIME:
- ft.setUInt64(name, subval.asUint64());
- break;
- case TYPE_SSTR:
- case TYPE_LSTR:
- ft.setString(name, subval.asString());
- break;
- case TYPE_INT64:
- case TYPE_ABSTIME:
- ft.setInt64(name, subval.asInt64());
- break;
- case TYPE_BOOL:
- ft.set(name, FieldTable::ValuePtr(new BoolValue(subval.asBool())));
- break;
- case TYPE_FLOAT:
- ft.setFloat(name, subval.asFloat());
- break;
- case TYPE_DOUBLE:
- ft.setDouble(name, subval.asDouble());
- break;
- case TYPE_INT8:
- case TYPE_INT16:
- case TYPE_INT32:
- ft.setInt(name, subval.asInt());
- break;
- case TYPE_MAP:
- subFt.clear();
- subval.impl->mapToFieldTable(subFt);
- ft.setTable(name, subFt);
- break;
- case TYPE_LIST:
- {
- List subList;
- subval.impl->listToFramingList(subList);
- ft.set(name,
- ::qpid::framing::FieldTable::ValuePtr(
- new ListValue(
- subList)));
- } break;
- case TYPE_ARRAY:
- case TYPE_OBJECT:
- case TYPE_UUID:
- case TYPE_REF:
- default:
- break;
- }
- }
- }
-
-
-void ValueImpl::initList(const List& fl)
-{
- for (List::const_iterator iter = fl.begin();
- iter != fl.end(); iter++) {
- const FieldValue& fvalue(*iter->get());
- uint8_t amqType = fvalue.getType();
-
- if (amqType == 0x32) {
- Value* subval(new Value(TYPE_UINT64));
- subval->setUint64(fvalue.get<int64_t>());
- appendToList(subval);
- } else if ((amqType & 0xCF) == 0x02) {
- Value* subval(new Value(TYPE_UINT32));
- switch (amqType) {
- case 0x02 : subval->setUint(fvalue.get<int>()); break; // uint8
- case 0x12 : subval->setUint(fvalue.get<int>()); break; // uint16
- case 0x22 : subval->setUint(fvalue.get<int>()); break; // uint32
- }
- appendToList(subval);
- } else if (amqType == 0x31) { // int64
- Value* subval(new Value(TYPE_INT64));
- subval->setInt64(fvalue.get<int64_t>());
- appendToList(subval);
- } else if ((amqType & 0xCF) == 0x01) { // 0x01:int8, 0x11:int16, 0x21:int32
- Value* subval(new Value(TYPE_INT32));
- subval->setInt((int32_t)fvalue.get<int>());
- appendToList(subval);
- } else if (amqType == 0x85 || amqType == 0x95) {
- Value* subval(new Value(TYPE_LSTR));
- subval->setString(fvalue.get<string>().c_str());
- appendToList(subval);
- } else if (amqType == 0x23 || amqType == 0x33) {
- Value* subval(new Value(TYPE_DOUBLE));
- subval->setDouble(fvalue.get<double>());
- appendToList(subval);
- } else if (amqType == 0xa8) {
- FieldTable subFt;
- bool valid = qpid::framing::getEncodedValue<FieldTable>(*iter, subFt);
- if (valid) {
- Value* subval(new Value(TYPE_MAP));
- subval->impl->initMap(subFt);
- appendToList(subval);
- }
- } else if (amqType == 0xa9) {
- List subList;
- bool valid = qpid::framing::getEncodedValue<List>(*iter, subList);
- if (valid) {
- Value *subVal(new Value(TYPE_LIST));
- subVal->impl->initList(subList);
- appendToList(subVal);
- }
- } else if (amqType == 0x08) {
- Value* subval(new Value(TYPE_BOOL));
- subval->setBool(fvalue.get<int>() ? true : false);
- appendToList(subval);
- } else {
- QPID_LOG(error, "Unable to decode unsupported AMQP typecode =" << amqType);
- }
- }
-}
-
-void ValueImpl::listToFramingList(List& fl) const
-{
- for (vector<Value>::const_iterator iter = vectorVal.begin();
- iter != vectorVal.end(); iter++) {
- const Value& subval(*iter);
-
- switch (subval.getType()) {
- case TYPE_UINT8:
- case TYPE_UINT16:
- case TYPE_UINT32:
- fl.push_back(List::ValuePtr(new Unsigned64Value((uint64_t) subval.asUint())));
- break;
- case TYPE_UINT64:
- case TYPE_DELTATIME:
- fl.push_back(List::ValuePtr(new Unsigned64Value(subval.asUint64())));
- break;
- case TYPE_SSTR:
- case TYPE_LSTR:
- fl.push_back(List::ValuePtr(new Str16Value(subval.asString())));
- break;
- case TYPE_INT64:
- case TYPE_ABSTIME:
- fl.push_back(List::ValuePtr(new Integer64Value(subval.asInt64())));
- break;
- case TYPE_BOOL:
- fl.push_back(List::ValuePtr(new BoolValue(subval.asBool() ? 1 : 0)));
- break;
- case TYPE_FLOAT:
- fl.push_back(List::ValuePtr(new FloatValue(subval.asFloat())));
- break;
- case TYPE_DOUBLE:
- fl.push_back(List::ValuePtr(new DoubleValue(subval.asDouble())));
- break;
- case TYPE_INT8:
- case TYPE_INT16:
- case TYPE_INT32:
- fl.push_back(List::ValuePtr(new IntegerValue(subval.asInt())));
- break;
- case TYPE_MAP:
- {
- FieldTable subFt;
- subval.impl->mapToFieldTable(subFt);
- fl.push_back(List::ValuePtr(new FieldTableValue(subFt)));
-
- } break;
- case TYPE_LIST:
- {
- List subList;
- subval.impl->listToFramingList(subList);
- fl.push_back(List::ValuePtr(new ListValue(subList)));
- } break;
-
- case TYPE_ARRAY:
- case TYPE_OBJECT:
- case TYPE_UUID:
- case TYPE_REF:
- default:
- break;
- }
- }
- }
-
-
-
-void ValueImpl::encode(Buffer& buf) const
-{
- FieldTable ft;
- List fl;
-
- switch (typecode) {
- case TYPE_UINT8 : buf.putOctet((uint8_t) value.u32); break;
- case TYPE_UINT16 : buf.putShort((uint16_t) value.u32); break;
- case TYPE_UINT32 : buf.putLong(value.u32); break;
- case TYPE_UINT64 : buf.putLongLong(value.u64); break;
- case TYPE_SSTR : buf.putShortString(stringVal); break;
- case TYPE_LSTR : buf.putMediumString(stringVal); break;
- case TYPE_ABSTIME : buf.putLongLong(value.s64); break;
- case TYPE_DELTATIME : buf.putLongLong(value.u64); break;
- case TYPE_BOOL : buf.putOctet(value.boolVal ? 1 : 0); break;
- case TYPE_FLOAT : buf.putFloat(value.floatVal); break;
- case TYPE_DOUBLE : buf.putDouble(value.doubleVal); break;
- case TYPE_INT8 : buf.putOctet((uint8_t) value.s32); break;
- case TYPE_INT16 : buf.putShort((uint16_t) value.s32); break;
- case TYPE_INT32 : buf.putLong(value.s32); break;
- case TYPE_INT64 : buf.putLongLong(value.s64); break;
- case TYPE_UUID : buf.putBin128(value.uuidVal); break;
- case TYPE_REF : refVal.impl->encode(buf); break;
- case TYPE_MAP:
- mapToFieldTable(ft);
- ft.encode(buf);
- break;
- case TYPE_LIST:
- listToFramingList(fl);
- fl.encode(buf);
- break;
-
- case TYPE_ARRAY:
- case TYPE_OBJECT:
- default:
- break;
- }
-}
-
-uint32_t ValueImpl::encodedSize() const
-{
- FieldTable ft;
- List fl;
-
- switch (typecode) {
- case TYPE_UINT8 :
- case TYPE_BOOL :
- case TYPE_INT8 : return 1;
-
- case TYPE_UINT16 :
- case TYPE_INT16 : return 2;
-
- case TYPE_UINT32 :
- case TYPE_INT32 :
- case TYPE_FLOAT : return 4;
-
- case TYPE_UINT64 :
- case TYPE_INT64 :
- case TYPE_DOUBLE :
- case TYPE_ABSTIME :
- case TYPE_DELTATIME : return 8;
-
- case TYPE_UUID :
- case TYPE_REF : return 16;
-
- case TYPE_SSTR : return 1 + stringVal.size();
- case TYPE_LSTR : return 2 + stringVal.size();
- case TYPE_MAP:
- mapToFieldTable(ft);
- return ft.encodedSize();
-
- case TYPE_LIST:
- listToFramingList(fl);
- return fl.encodedSize();
-
- case TYPE_ARRAY:
- case TYPE_OBJECT:
- default:
- break;
- }
-
- return 0;
-}
-
-bool ValueImpl::keyInMap(const char* key) const
-{
- return typecode == TYPE_MAP && mapVal.count(key) > 0;
-}
-
-Value* ValueImpl::byKey(const char* key)
-{
- if (keyInMap(key)) {
- map<string, Value>::iterator iter = mapVal.find(key);
- if (iter != mapVal.end())
- return &iter->second;
- }
- return 0;
-}
-
-const Value* ValueImpl::byKey(const char* key) const
-{
- if (keyInMap(key)) {
- map<string, Value>::const_iterator iter = mapVal.find(key);
- if (iter != mapVal.end())
- return &iter->second;
- }
- return 0;
-}
-
-void ValueImpl::deleteKey(const char* key)
-{
- mapVal.erase(key);
-}
-
-void ValueImpl::insert(const char* key, Value* val)
-{
- pair<string, Value> entry(key, *val);
- mapVal.insert(entry);
-}
-
-const char* ValueImpl::key(uint32_t idx) const
-{
- map<string, Value>::const_iterator iter = mapVal.begin();
- for (uint32_t i = 0; i < idx; i++) {
- if (iter == mapVal.end())
- break;
- iter++;
- }
-
- if (iter == mapVal.end())
- return 0;
- else
- return iter->first.c_str();
-}
-
-Value* ValueImpl::arrayItem(uint32_t)
-{
- return 0;
-}
-
-void ValueImpl::appendToArray(Value*)
-{
-}
-
-void ValueImpl::deleteArrayItem(uint32_t)
-{
-}
-
-
-//==================================================================
-// Wrappers
-//==================================================================
-
-Value::Value(const Value& from) : impl(new ValueImpl(*(from.impl))) {}
-Value::Value(Typecode t, Typecode at) : impl(new ValueImpl(t, at)) {}
-Value::Value(ValueImpl* i) : impl(i) {}
-Value::~Value() { delete impl;}
-
-Typecode Value::getType() const { return impl->getType(); }
-bool Value::isNull() const { return impl->isNull(); }
-void Value::setNull() { impl->setNull(); }
-bool Value::isObjectId() const { return impl->isObjectId(); }
-const ObjectId& Value::asObjectId() const { return impl->asObjectId(); }
-void Value::setObjectId(const ObjectId& oid) { impl->setObjectId(oid); }
-bool Value::isUint() const { return impl->isUint(); }
-uint32_t Value::asUint() const { return impl->asUint(); }
-void Value::setUint(uint32_t val) { impl->setUint(val); }
-bool Value::isInt() const { return impl->isInt(); }
-int32_t Value::asInt() const { return impl->asInt(); }
-void Value::setInt(int32_t val) { impl->setInt(val); }
-bool Value::isUint64() const { return impl->isUint64(); }
-uint64_t Value::asUint64() const { return impl->asUint64(); }
-void Value::setUint64(uint64_t val) { impl->setUint64(val); }
-bool Value::isInt64() const { return impl->isInt64(); }
-int64_t Value::asInt64() const { return impl->asInt64(); }
-void Value::setInt64(int64_t val) { impl->setInt64(val); }
-bool Value::isString() const { return impl->isString(); }
-const char* Value::asString() const { return impl->asString(); }
-void Value::setString(const char* val) { impl->setString(val); }
-bool Value::isBool() const { return impl->isBool(); }
-bool Value::asBool() const { return impl->asBool(); }
-void Value::setBool(bool val) { impl->setBool(val); }
-bool Value::isFloat() const { return impl->isFloat(); }
-float Value::asFloat() const { return impl->asFloat(); }
-void Value::setFloat(float val) { impl->setFloat(val); }
-bool Value::isDouble() const { return impl->isDouble(); }
-double Value::asDouble() const { return impl->asDouble(); }
-void Value::setDouble(double val) { impl->setDouble(val); }
-bool Value::isUuid() const { return impl->isUuid(); }
-const uint8_t* Value::asUuid() const { return impl->asUuid(); }
-void Value::setUuid(const uint8_t* val) { impl->setUuid(val); }
-bool Value::isObject() const { return impl->isObject(); }
-const Object* Value::asObject() const { return impl->asObject(); }
-void Value::setObject(Object* val) { impl->setObject(val); }
-bool Value::isMap() const { return impl->isMap(); }
-bool Value::keyInMap(const char* key) const { return impl->keyInMap(key); }
-Value* Value::byKey(const char* key) { return impl->byKey(key); }
-const Value* Value::byKey(const char* key) const { return impl->byKey(key); }
-void Value::deleteKey(const char* key) { impl->deleteKey(key); }
-void Value::insert(const char* key, Value* val) { impl->insert(key, val); }
-uint32_t Value::keyCount() const { return impl->keyCount(); }
-const char* Value::key(uint32_t idx) const { return impl->key(idx); }
-bool Value::isList() const { return impl->isList(); }
-uint32_t Value::listItemCount() const { return impl->listItemCount(); }
-Value* Value::listItem(uint32_t idx) { return impl->listItem(idx); }
-void Value::appendToList(Value* val) { impl->appendToList(val); }
-void Value::deleteListItem(uint32_t idx) { impl->deleteListItem(idx); }
-bool Value::isArray() const { return impl->isArray(); }
-Typecode Value::arrayType() const { return impl->arrayType(); }
-uint32_t Value::arrayItemCount() const { return impl->arrayItemCount(); }
-Value* Value::arrayItem(uint32_t idx) { return impl->arrayItem(idx); }
-void Value::appendToArray(Value* val) { impl->appendToArray(val); }
-void Value::deleteArrayItem(uint32_t idx) { impl->deleteArrayItem(idx); }
-
diff --git a/qpid/cpp/src/qmf/engine/ValueImpl.h b/qpid/cpp/src/qmf/engine/ValueImpl.h
deleted file mode 100644
index 8de8c5329f..0000000000
--- a/qpid/cpp/src/qmf/engine/ValueImpl.h
+++ /dev/null
@@ -1,166 +0,0 @@
-#ifndef _QmfEngineValueImpl_
-#define _QmfEngineValueImpl_
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include <qmf/engine/Value.h>
-#include <qmf/engine/ObjectIdImpl.h>
-#include <qmf/engine/Object.h>
-#include <qpid/framing/Buffer.h>
-#include <string>
-#include <string.h>
-#include <map>
-#include <vector>
-#include <boost/shared_ptr.hpp>
-
-namespace qpid {
-namespace framing {
- class FieldTable;
- class List;
-}
-}
-
-namespace qmf {
-namespace engine {
-
- // TODO: set valid flag on all value settors
- // TODO: add a modified flag and accessors
-
- struct ValueImpl {
- const Typecode typecode;
- bool valid;
-
- ObjectId refVal;
- std::string stringVal;
- std::auto_ptr<Object> objectVal;
- std::map<std::string, Value> mapVal;
- std::vector<Value> vectorVal;
- Typecode arrayTypecode;
-
- union {
- uint32_t u32;
- uint64_t u64;
- int32_t s32;
- int64_t s64;
- bool boolVal;
- float floatVal;
- double doubleVal;
- uint8_t uuidVal[16];
- } value;
-
- ValueImpl(const ValueImpl& from) :
- typecode(from.typecode), valid(from.valid), refVal(from.refVal), stringVal(from.stringVal),
- objectVal(from.objectVal.get() ? new Object(*(from.objectVal)) : 0),
- mapVal(from.mapVal), vectorVal(from.vectorVal), arrayTypecode(from.arrayTypecode),
- value(from.value) {}
-
- ValueImpl(Typecode t, Typecode at);
- ValueImpl(Typecode t, qpid::framing::Buffer& b);
- ValueImpl(Typecode t);
- static Value* factory(Typecode t, qpid::framing::Buffer& b);
- static Value* factory(Typecode t);
- ~ValueImpl();
-
- void encode(qpid::framing::Buffer& b) const;
- uint32_t encodedSize() const;
-
- Typecode getType() const { return typecode; }
- bool isNull() const { return !valid; }
- void setNull() { valid = false; }
-
- bool isObjectId() const { return typecode == TYPE_REF; }
- const ObjectId& asObjectId() const { return refVal; }
- void setObjectId(const ObjectId& o) { refVal = o; } // TODO
-
- bool isUint() const { return typecode >= TYPE_UINT8 && typecode <= TYPE_UINT32; }
- uint32_t asUint() const { return value.u32; }
- void setUint(uint32_t val) { value.u32 = val; }
-
- bool isInt() const { return typecode >= TYPE_INT8 && typecode <= TYPE_INT32; }
- int32_t asInt() const { return value.s32; }
- void setInt(int32_t val) { value.s32 = val; }
-
- bool isUint64() const { return typecode == TYPE_UINT64 || typecode == TYPE_DELTATIME; }
- uint64_t asUint64() const { return value.u64; }
- void setUint64(uint64_t val) { value.u64 = val; }
-
- bool isInt64() const { return typecode == TYPE_INT64 || typecode == TYPE_ABSTIME; }
- int64_t asInt64() const { return value.s64; }
- void setInt64(int64_t val) { value.s64 = val; }
-
- bool isString() const { return typecode == TYPE_SSTR || typecode == TYPE_LSTR; }
- const char* asString() const { return stringVal.c_str(); }
- void setString(const char* val) { stringVal = val; }
-
- bool isBool() const { return typecode == TYPE_BOOL; }
- bool asBool() const { return value.boolVal; }
- void setBool(bool val) { value.boolVal = val; }
-
- bool isFloat() const { return typecode == TYPE_FLOAT; }
- float asFloat() const { return value.floatVal; }
- void setFloat(float val) { value.floatVal = val; }
-
- bool isDouble() const { return typecode == TYPE_DOUBLE; }
- double asDouble() const { return value.doubleVal; }
- void setDouble(double val) { value.doubleVal = val; }
-
- bool isUuid() const { return typecode == TYPE_UUID; }
- const uint8_t* asUuid() const { return value.uuidVal; }
- void setUuid(const uint8_t* val) { ::memcpy(value.uuidVal, val, 16); }
-
- bool isObject() const { return typecode == TYPE_OBJECT; }
- Object* asObject() const { return objectVal.get(); }
- void setObject(Object* val) { objectVal.reset(val); }
-
- bool isMap() const { return typecode == TYPE_MAP; }
- bool keyInMap(const char* key) const;
- Value* byKey(const char* key);
- const Value* byKey(const char* key) const;
- void deleteKey(const char* key);
- void insert(const char* key, Value* val);
- uint32_t keyCount() const { return mapVal.size(); }
- const char* key(uint32_t idx) const;
-
- bool isList() const { return typecode == TYPE_LIST; }
- uint32_t listItemCount() const { return vectorVal.size(); }
- Value* listItem(uint32_t idx) { return idx < listItemCount() ? &vectorVal[idx] : 0; }
- const Value* listItem(uint32_t idx) const { return idx < listItemCount() ? &vectorVal[idx] : 0; }
- void appendToList(Value* val) { vectorVal.push_back(*val); }
- void deleteListItem(uint32_t idx) { if (idx < listItemCount()) vectorVal.erase(vectorVal.begin()+idx); }
-
- bool isArray() const { return typecode == TYPE_ARRAY; }
- Typecode arrayType() const { return arrayTypecode; }
- uint32_t arrayItemCount() const { return 0; }
- Value* arrayItem(uint32_t idx);
- void appendToArray(Value* val);
- void deleteArrayItem(uint32_t idx);
-
- private:
- void mapToFieldTable(qpid::framing::FieldTable& ft) const;
- void initMap(const qpid::framing::FieldTable& ft);
-
- void listToFramingList(qpid::framing::List& fl) const;
- void initList(const qpid::framing::List& fl);
- };
-}
-}
-
-#endif
-
diff --git a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
deleted file mode 100644
index a48789973a..0000000000
--- a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp
+++ /dev/null
@@ -1,1434 +0,0 @@
-
-//
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-//
-
-#include "qpid/management/Manageable.h"
-#include "qpid/management/ManagementObject.h"
-#include "qpid/log/Statement.h"
-#include "qpid/agent/ManagementAgentImpl.h"
-#include "qpid/amqp_0_10/Codecs.h"
-#include <list>
-#include <string.h>
-#include <stdlib.h>
-#include <sys/types.h>
-#include <iostream>
-#include <fstream>
-#include <boost/lexical_cast.hpp>
-
-namespace qpid {
-namespace management {
-
-using namespace qpid::client;
-using namespace qpid::framing;
-using namespace qpid::sys;
-using namespace std;
-using std::stringstream;
-using std::ofstream;
-using std::ifstream;
-using std::string;
-using std::endl;
-using qpid::types::Variant;
-using qpid::amqp_0_10::MapCodec;
-using qpid::amqp_0_10::ListCodec;
-
-namespace {
- qpid::sys::Mutex lock;
- bool disabled = false;
- ManagementAgent* agent = 0;
- int refCount = 0;
-
- const string defaultVendorName("vendor");
- const string defaultProductName("product");
-
- // Create a valid binding key substring by
- // replacing all '.' chars with '_'
- const string keyifyNameStr(const string& name)
- {
- string n2 = name;
-
- size_t pos = n2.find('.');
- while (pos != n2.npos) {
- n2.replace(pos, 1, "_");
- pos = n2.find('.', pos);
- }
- return n2;
- }
-}
-
-ManagementAgent::Singleton::Singleton(bool disableManagement)
-{
- sys::Mutex::ScopedLock _lock(lock);
- if (disableManagement && !disabled) {
- disabled = true;
- assert(refCount == 0); // can't disable after agent has been allocated
- }
- if (refCount == 0 && !disabled)
- agent = new ManagementAgentImpl();
- refCount++;
-}
-
-ManagementAgent::Singleton::~Singleton()
-{
- sys::Mutex::ScopedLock _lock(lock);
- refCount--;
- if (refCount == 0 && !disabled) {
- delete agent;
- agent = 0;
- }
-}
-
-ManagementAgent* ManagementAgent::Singleton::getInstance()
-{
- return agent;
-}
-
-const string ManagementAgentImpl::storeMagicNumber("MA02");
-
-ManagementAgentImpl::ManagementAgentImpl() :
- interval(10), extThread(false), pipeHandle(0), notifyCallback(0), notifyContext(0),
- notifyable(0), inCallback(false),
- initialized(false), connected(false), useMapMsg(false), lastFailure("never connected"),
- topicExchange("qmf.default.topic"), directExchange("qmf.default.direct"),
- schemaTimestamp(Duration(EPOCH, now())),
- publishAllData(true), requestedBrokerBank(0), requestedAgentBank(0),
- assignedBrokerBank(0), assignedAgentBank(0), bootSequence(0),
- maxV2ReplyObjs(10), // KAG todo: make this a tuneable parameter
- connThreadBody(*this), connThread(connThreadBody),
- pubThreadBody(*this), pubThread(pubThreadBody)
-{
-}
-
-ManagementAgentImpl::~ManagementAgentImpl()
-{
- // shutdown & cleanup all threads
- connThreadBody.close();
- pubThreadBody.close();
-
- connThread.join();
- pubThread.join();
-
- if (pipeHandle) {
- delete pipeHandle;
- pipeHandle = 0;
- }
-}
-
-void ManagementAgentImpl::setName(const string& vendor, const string& product, const string& instance)
-{
- if (vendor.find(':') != vendor.npos) {
- throw Exception("vendor string cannot contain a ':' character.");
- }
- if (product.find(':') != product.npos) {
- throw Exception("product string cannot contain a ':' character.");
- }
-
- attrMap["_vendor"] = vendor;
- attrMap["_product"] = product;
- if (!instance.empty()) {
- attrMap["_instance"] = instance;
- }
-}
-
-
-void ManagementAgentImpl::getName(string& vendor, string& product, string& instance)
-{
- vendor = std::string(attrMap["_vendor"]);
- product = std::string(attrMap["_product"]);
- instance = std::string(attrMap["_instance"]);
-}
-
-
-const std::string& ManagementAgentImpl::getAddress()
-{
- return name_address;
-}
-
-
-void ManagementAgentImpl::init(const string& brokerHost,
- uint16_t brokerPort,
- uint16_t intervalSeconds,
- bool useExternalThread,
- const string& _storeFile,
- const string& uid,
- const string& pwd,
- const string& mech,
- const string& proto)
-{
- management::ConnectionSettings settings;
- settings.protocol = proto;
- settings.host = brokerHost;
- settings.port = brokerPort;
- settings.username = uid;
- settings.password = pwd;
- settings.mechanism = mech;
- settings.heartbeat = 10;
- init(settings, intervalSeconds, useExternalThread, _storeFile);
-}
-
-void ManagementAgentImpl::init(const qpid::management::ConnectionSettings& settings,
- uint16_t intervalSeconds,
- bool useExternalThread,
- const string& _storeFile)
-{
- std::string cfgVendor, cfgProduct, cfgInstance;
-
- interval = intervalSeconds;
- extThread = useExternalThread;
- storeFile = _storeFile;
- nextObjectId = 1;
-
- //
- // Convert from management::ConnectionSettings to client::ConnectionSettings
- //
- connectionSettings.protocol = settings.protocol;
- connectionSettings.host = settings.host;
- connectionSettings.port = settings.port;
- connectionSettings.virtualhost = settings.virtualhost;
- connectionSettings.username = settings.username;
- connectionSettings.password = settings.password;
- connectionSettings.mechanism = settings.mechanism;
- connectionSettings.locale = settings.locale;
- connectionSettings.heartbeat = settings.heartbeat;
- connectionSettings.maxChannels = settings.maxChannels;
- connectionSettings.maxFrameSize = settings.maxFrameSize;
- connectionSettings.bounds = settings.bounds;
- connectionSettings.tcpNoDelay = settings.tcpNoDelay;
- connectionSettings.service = settings.service;
- connectionSettings.minSsf = settings.minSsf;
- connectionSettings.maxSsf = settings.maxSsf;
-
- retrieveData(cfgVendor, cfgProduct, cfgInstance);
-
- bootSequence++;
- if ((bootSequence & 0xF000) != 0)
- bootSequence = 1;
-
- // setup the agent's name. The name may be set via a call to setName(). If setName()
- // has not been called, the name can be read from the configuration file. If there is
- // no name in the configuration file, a unique default name is provided.
- if (attrMap.empty()) {
- // setName() never called by application, so use names retrieved from config, otherwise defaults.
- setName(cfgVendor.empty() ? defaultVendorName : cfgVendor,
- cfgProduct.empty() ? defaultProductName : cfgProduct,
- cfgInstance.empty() ? qpid::types::Uuid(true).str() : cfgInstance);
- } else if (attrMap.find("_instance") == attrMap.end()) {
- // setName() called, but instance was not specified, use config or generate a uuid
- setName(attrMap["_vendor"].asString(), attrMap["_product"].asString(),
- cfgInstance.empty() ? qpid::types::Uuid(true).str() : cfgInstance);
- }
-
- name_address = attrMap["_vendor"].asString() + ":" + attrMap["_product"].asString() + ":" + attrMap["_instance"].asString();
- vendorNameKey = keyifyNameStr(attrMap["_vendor"].asString());
- productNameKey = keyifyNameStr(attrMap["_product"].asString());
- instanceNameKey = keyifyNameStr(attrMap["_instance"].asString());
- attrMap["_name"] = name_address;
-
- storeData(true);
-
- QPID_LOG(info, "QMF Agent Initialized: broker=" << settings.host << ":" << settings.port <<
- " interval=" << intervalSeconds << " storeFile=" << _storeFile << " name=" << name_address);
-
- initialized = true;
-}
-
-void ManagementAgentImpl::registerClass(const string& packageName,
- const string& className,
- uint8_t* md5Sum,
- ManagementObject::writeSchemaCall_t schemaCall)
-{
- sys::Mutex::ScopedLock lock(agentLock);
- PackageMap::iterator pIter = findOrAddPackage(packageName);
- addClassLocal(ManagementItem::CLASS_KIND_TABLE, pIter, className, md5Sum, schemaCall);
-}
-
-void ManagementAgentImpl::registerEvent(const string& packageName,
- const string& eventName,
- uint8_t* md5Sum,
- ManagementObject::writeSchemaCall_t schemaCall)
-{
- sys::Mutex::ScopedLock lock(agentLock);
- PackageMap::iterator pIter = findOrAddPackage(packageName);
- addClassLocal(ManagementItem::CLASS_KIND_EVENT, pIter, eventName, md5Sum, schemaCall);
-}
-
-// old-style add object: 64bit id - deprecated
-ObjectId ManagementAgentImpl::addObject(ManagementObject* object,
- uint64_t persistId)
-{
- std::string key;
- if (persistId) {
- key = boost::lexical_cast<std::string>(persistId);
- }
- return addObject(object, key, persistId != 0);
-}
-
-
-// new style add object - use this approach!
-ObjectId ManagementAgentImpl::addObject(ManagementObject* object,
- const std::string& key,
- bool persistent)
-{
- sys::Mutex::ScopedLock lock(addLock);
-
- uint16_t sequence = persistent ? 0 : bootSequence;
-
- ObjectId objectId(&attachment, 0, sequence);
- if (key.empty())
- objectId.setV2Key(*object); // let object generate the key
- else
- objectId.setV2Key(key);
- objectId.setAgentName(name_address);
-
- object->setObjectId(objectId);
- newManagementObjects[objectId] = boost::shared_ptr<ManagementObject>(object);
- return objectId;
-}
-
-
-void ManagementAgentImpl::raiseEvent(const ManagementEvent& event, severity_t severity)
-{
- static const std::string severityStr[] = {
- "emerg", "alert", "crit", "error", "warn",
- "note", "info", "debug"
- };
- string content;
- stringstream key;
- Variant::Map headers;
-
- {
- sys::Mutex::ScopedLock lock(agentLock);
- Buffer outBuffer(eventBuffer, MA_BUFFER_SIZE);
- uint8_t sev = (severity == SEV_DEFAULT) ? event.getSeverity() : (uint8_t) severity;
-
- // key << "console.event." << assignedBrokerBank << "." << assignedAgentBank << "." <<
- // event.getPackageName() << "." << event.getEventName();
- key << "agent.ind.event." << keyifyNameStr(event.getPackageName())
- << "." << keyifyNameStr(event.getEventName())
- << "." << severityStr[sev]
- << "." << vendorNameKey
- << "." << productNameKey
- << "." << instanceNameKey;
-
- Variant::Map map_;
- Variant::Map schemaId;
- Variant::Map values;
-
- map_["_schema_id"] = mapEncodeSchemaId(event.getPackageName(),
- event.getEventName(),
- event.getMd5Sum(),
- ManagementItem::CLASS_KIND_EVENT);
- event.mapEncode(values);
- map_["_values"] = values;
- map_["_timestamp"] = uint64_t(Duration(EPOCH, now()));
- map_["_severity"] = sev;
-
- headers["method"] = "indication";
- headers["qmf.opcode"] = "_data_indication";
- headers["qmf.content"] = "_event";
- headers["qmf.agent"] = name_address;
-
- Variant::List list;
- list.push_back(map_);
- ListCodec::encode(list, content);
- }
-
- connThreadBody.sendBuffer(content, "", headers, topicExchange, key.str(), "amqp/list");
-}
-
-uint32_t ManagementAgentImpl::pollCallbacks(uint32_t callLimit)
-{
- sys::Mutex::ScopedLock lock(agentLock);
-
- if (inCallback) {
- QPID_LOG(critical, "pollCallbacks invoked from the agent's thread!");
- return 0;
- }
-
- for (uint32_t idx = 0; callLimit == 0 || idx < callLimit; idx++) {
- if (methodQueue.empty())
- break;
-
- QueuedMethod* item = methodQueue.front();
- methodQueue.pop_front();
- {
- sys::Mutex::ScopedUnlock unlock(agentLock);
- invokeMethodRequest(item->body, item->cid, item->replyToExchange, item->replyToKey, item->userId);
- delete item;
- }
- }
-
- if (pipeHandle != 0) {
- char rbuf[100];
- while (pipeHandle->read(rbuf, 100) > 0) ; // Consume all signaling bytes
- }
- return methodQueue.size();
-}
-
-int ManagementAgentImpl::getSignalFd()
-{
- if (extThread) {
- if (pipeHandle == 0)
- pipeHandle = new PipeHandle(true);
- return pipeHandle->getReadHandle();
- }
-
- return -1;
-}
-
-void ManagementAgentImpl::setSignalCallback(cb_t callback, void* context)
-{
- sys::Mutex::ScopedLock lock(agentLock);
- notifyCallback = callback;
- notifyContext = context;
-}
-
-void ManagementAgentImpl::setSignalCallback(Notifyable& _notifyable)
-{
- sys::Mutex::ScopedLock lock(agentLock);
- notifyable = &_notifyable;
-}
-
-void ManagementAgentImpl::startProtocol()
-{
- sendHeartbeat();
- {
- sys::Mutex::ScopedLock lock(agentLock);
- publishAllData = true;
- }
-}
-
-void ManagementAgentImpl::storeData(bool requested)
-{
- if (!storeFile.empty()) {
- ofstream outFile(storeFile.c_str());
- uint32_t brokerBankToWrite = requested ? requestedBrokerBank : assignedBrokerBank;
- uint32_t agentBankToWrite = requested ? requestedAgentBank : assignedAgentBank;
-
- if (outFile.good()) {
- outFile << storeMagicNumber << " " << brokerBankToWrite << " " <<
- agentBankToWrite << " " << bootSequence << endl;
-
- if (attrMap.find("_vendor") != attrMap.end())
- outFile << "vendor=" << attrMap["_vendor"] << endl;
- if (attrMap.find("_product") != attrMap.end())
- outFile << "product=" << attrMap["_product"] << endl;
- if (attrMap.find("_instance") != attrMap.end())
- outFile << "instance=" << attrMap["_instance"] << endl;
-
- outFile.close();
- }
- }
-}
-
-void ManagementAgentImpl::retrieveData(std::string& vendor, std::string& product, std::string& inst)
-{
- vendor.clear();
- product.clear();
- inst.clear();
-
- if (!storeFile.empty()) {
- ifstream inFile(storeFile.c_str());
- string mn;
-
- if (inFile.good()) {
- inFile >> mn;
- if (mn == storeMagicNumber) {
- std::string inText;
-
- inFile >> requestedBrokerBank;
- inFile >> requestedAgentBank;
- inFile >> bootSequence;
-
- while (inFile.good()) {
- std::getline(inFile, inText);
- if (!inText.compare(0, 7, "vendor=")) {
- vendor = inText.substr(7);
- QPID_LOG(debug, "read vendor name [" << vendor << "] from configuration file.");
- } else if (!inText.compare(0, 8, "product=")) {
- product = inText.substr(8);
- QPID_LOG(debug, "read product name [" << product << "] from configuration file.");
- } else if (!inText.compare(0, 9, "instance=")) {
- inst = inText.substr(9);
- QPID_LOG(debug, "read instance name [" << inst << "] from configuration file.");
- }
- }
- }
- inFile.close();
- }
- }
-}
-
-void ManagementAgentImpl::sendHeartbeat()
-{
- static const string addr_key_base("agent.ind.heartbeat.");
-
- Variant::Map map;
- Variant::Map headers;
- string content;
- std::stringstream addr_key;
-
- addr_key << addr_key_base << vendorNameKey
- << "." << productNameKey
- << "." << instanceNameKey;
-
- headers["method"] = "indication";
- headers["qmf.opcode"] = "_agent_heartbeat_indication";
- headers["qmf.agent"] = name_address;
-
- getHeartbeatContent(map);
- MapCodec::encode(map, content);
-
- // Set TTL (in msecs) on outgoing heartbeat indications based on the interval
- // time to prevent stale heartbeats from getting to the consoles.
-
- connThreadBody.sendBuffer(content, "", headers, topicExchange, addr_key.str(),
- "amqp/map", interval * 2 * 1000);
-
- QPID_LOG(trace, "SENT AgentHeartbeat name=" << name_address);
-}
-
-void ManagementAgentImpl::sendException(const string& rte, const string& rtk, const string& cid,
- const string& text, uint32_t code)
-{
- Variant::Map map;
- Variant::Map headers;
- Variant::Map values;
- string content;
-
- headers["method"] = "indication";
- headers["qmf.opcode"] = "_exception";
- headers["qmf.agent"] = name_address;
-
- values["error_code"] = code;
- values["error_text"] = text;
- map["_values"] = values;
-
- MapCodec::encode(map, content);
- connThreadBody.sendBuffer(content, cid, headers, rte, rtk);
-
- QPID_LOG(trace, "SENT Exception code=" << code <<" text=" << text);
-}
-
-void ManagementAgentImpl::handleSchemaRequest(Buffer& inBuffer, uint32_t sequence, const string& rte, const string& rtk)
-{
- string packageName;
- SchemaClassKey key;
- uint32_t outLen(0);
- char localBuffer[MA_BUFFER_SIZE];
- Buffer outBuffer(localBuffer, MA_BUFFER_SIZE);
- bool found(false);
-
- inBuffer.getShortString(packageName);
- inBuffer.getShortString(key.name);
- inBuffer.getBin128(key.hash);
-
- QPID_LOG(trace, "RCVD SchemaRequest: package=" << packageName << " class=" << key.name);
-
- {
- sys::Mutex::ScopedLock lock(agentLock);
- PackageMap::iterator pIter = packages.find(packageName);
- if (pIter != packages.end()) {
- ClassMap& cMap = pIter->second;
- ClassMap::iterator cIter = cMap.find(key);
- if (cIter != cMap.end()) {
- SchemaClass& schema = cIter->second;
- string body;
-
- encodeHeader(outBuffer, 's', sequence);
- schema.writeSchemaCall(body);
- outBuffer.putRawData(body);
- outLen = MA_BUFFER_SIZE - outBuffer.available();
- outBuffer.reset();
- found = true;
- }
- }
- }
-
- if (found) {
- connThreadBody.sendBuffer(outBuffer, outLen, rte, rtk);
- QPID_LOG(trace, "SENT SchemaInd: package=" << packageName << " class=" << key.name);
- }
-}
-
-void ManagementAgentImpl::handleConsoleAddedIndication()
-{
- sys::Mutex::ScopedLock lock(agentLock);
- publishAllData = true;
-
- QPID_LOG(trace, "RCVD ConsoleAddedInd");
-}
-
-void ManagementAgentImpl::invokeMethodRequest(const string& body, const string& cid, const string& rte, const string& rtk, const string& userId)
-{
- string methodName;
- bool failed = false;
- Variant::Map inMap;
- Variant::Map outMap;
- Variant::Map::const_iterator oid, mid;
- string content;
-
- MapCodec::decode(body, inMap);
-
- if ((oid = inMap.find("_object_id")) == inMap.end() ||
- (mid = inMap.find("_method_name")) == inMap.end()) {
- sendException(rte, rtk, cid, Manageable::StatusText(Manageable::STATUS_PARAMETER_INVALID),
- Manageable::STATUS_PARAMETER_INVALID);
- failed = true;
- } else {
- string methodName;
- ObjectId objId;
- Variant::Map inArgs;
- Variant::Map callMap;
-
- try {
- // conversions will throw if input is invalid.
- objId = ObjectId(oid->second.asMap());
- methodName = mid->second.getString();
-
- mid = inMap.find("_arguments");
- if (mid != inMap.end()) {
- inArgs = (mid->second).asMap();
- }
-
- QPID_LOG(trace, "Invoking Method: name=" << methodName << " args=" << inArgs);
-
- boost::shared_ptr<ManagementObject> oPtr;
- {
- sys::Mutex::ScopedLock lock(agentLock);
- ObjectMap::iterator iter = managementObjects.find(objId);
- if (iter != managementObjects.end() && !iter->second->isDeleted())
- oPtr = iter->second;
- }
-
- if (oPtr.get() == 0) {
- sendException(rte, rtk, cid, Manageable::StatusText(Manageable::STATUS_UNKNOWN_OBJECT),
- Manageable::STATUS_UNKNOWN_OBJECT);
- failed = true;
- } else {
- oPtr->doMethod(methodName, inArgs, callMap, userId);
-
- if (callMap["_status_code"].asUint32() == 0) {
- outMap["_arguments"] = Variant::Map();
- for (Variant::Map::const_iterator iter = callMap.begin();
- iter != callMap.end(); iter++)
- if (iter->first != "_status_code" && iter->first != "_status_text")
- outMap["_arguments"].asMap()[iter->first] = iter->second;
- } else {
- sendException(rte, rtk, cid, callMap["_status_text"], callMap["_status_code"]);
- failed = true;
- }
- }
-
- } catch(types::InvalidConversion& e) {
- sendException(rte, rtk, cid, e.what(), Manageable::STATUS_EXCEPTION);
- failed = true;
- }
- }
-
- if (!failed) {
- Variant::Map headers;
- headers["method"] = "response";
- headers["qmf.agent"] = name_address;
- headers["qmf.opcode"] = "_method_response";
- QPID_LOG(trace, "SENT MethodResponse map=" << outMap);
- MapCodec::encode(outMap, content);
- connThreadBody.sendBuffer(content, cid, headers, rte, rtk);
- }
-}
-
-void ManagementAgentImpl::handleGetQuery(const string& body, const string& cid, const string& rte, const string& rtk)
-{
- {
- sys::Mutex::ScopedLock lock(agentLock);
- moveNewObjectsLH(lock);
- }
-
- Variant::Map inMap;
- Variant::Map::const_iterator i;
- Variant::Map headers;
-
- MapCodec::decode(body, inMap);
- QPID_LOG(trace, "RCVD GetQuery: map=" << inMap << " cid=" << cid);
-
- headers["method"] = "response";
- headers["qmf.opcode"] = "_query_response";
- headers["qmf.agent"] = name_address;
- headers["partial"] = Variant();
-
- Variant::List list_;
- Variant::Map map_;
- Variant::Map values;
- Variant::Map oidMap;
- string content;
-
- /*
- * Unpack the _what element of the query. Currently we only support OBJECT queries.
- */
- i = inMap.find("_what");
- if (i == inMap.end()) {
- sendException(rte, rtk, cid, "_what element missing in Query");
- return;
- }
-
- if (i->second.getType() != qpid::types::VAR_STRING) {
- sendException(rte, rtk, cid, "_what element is not a string");
- return;
- }
-
- if (i->second.asString() == "OBJECT") {
- headers["qmf.content"] = "_data";
- /*
- * Unpack the _object_id element of the query if it is present. If it is present, find that one
- * object and return it. If it is not present, send a class-based result.
- */
- i = inMap.find("_object_id");
- if (i != inMap.end() && i->second.getType() == qpid::types::VAR_MAP) {
- ObjectId objId(i->second.asMap());
- boost::shared_ptr<ManagementObject> object;
-
- {
- sys::Mutex::ScopedLock lock(agentLock);
- ObjectMap::iterator iter = managementObjects.find(objId);
- if (iter != managementObjects.end())
- object = iter->second;
- }
-
- if (object.get() != 0) {
- if (object->getConfigChanged() || object->getInstChanged())
- object->setUpdateTime();
-
- object->mapEncodeValues(values, true, true); // write both stats and properties
- objId.mapEncode(oidMap);
- map_["_values"] = values;
- map_["_object_id"] = oidMap;
- object->writeTimestamps(map_);
- map_["_schema_id"] = mapEncodeSchemaId(object->getPackageName(),
- object->getClassName(),
- object->getMd5Sum());
- list_.push_back(map_);
- headers.erase("partial");
-
- ListCodec::encode(list_, content);
- connThreadBody.sendBuffer(content, cid, headers, rte, rtk, "amqp/list");
- QPID_LOG(trace, "SENT QueryResponse (query by object_id) to=" << rte << "/" << rtk);
- return;
- }
- } else { // match using schema_id, if supplied
-
- string className;
- string packageName;
-
- i = inMap.find("_schema_id");
- if (i != inMap.end() && i->second.getType() == qpid::types::VAR_MAP) {
- const Variant::Map& schemaIdMap(i->second.asMap());
-
- Variant::Map::const_iterator s_iter = schemaIdMap.find("_class_name");
- if (s_iter != schemaIdMap.end() && s_iter->second.getType() == qpid::types::VAR_STRING)
- className = s_iter->second.asString();
-
- s_iter = schemaIdMap.find("_package_name");
- if (s_iter != schemaIdMap.end() && s_iter->second.getType() == qpid::types::VAR_STRING)
- packageName = s_iter->second.asString();
-
- typedef list<boost::shared_ptr<ManagementObject> > StageList;
- StageList staging;
-
- {
- sys::Mutex::ScopedLock lock(agentLock);
- for (ObjectMap::iterator iter = managementObjects.begin();
- iter != managementObjects.end();
- iter++) {
- ManagementObject* object = iter->second.get();
- if (object->getClassName() == className &&
- (packageName.empty() || object->getPackageName() == packageName))
- staging.push_back(iter->second);
- }
- }
-
- unsigned int objCount = 0;
- for (StageList::iterator iter = staging.begin(); iter != staging.end(); iter++) {
- ManagementObject* object = iter->get();
- if (object->getClassName() == className &&
- (packageName.empty() || object->getPackageName() == packageName)) {
-
- values.clear();
- oidMap.clear();
- map_.clear();
-
- if (object->getConfigChanged() || object->getInstChanged())
- object->setUpdateTime();
-
- object->mapEncodeValues(values, true, true); // write both stats and properties
- object->getObjectId().mapEncode(oidMap);
- map_["_values"] = values;
- map_["_object_id"] = oidMap;
- object->writeTimestamps(map_);
- map_["_schema_id"] = mapEncodeSchemaId(object->getPackageName(),
- object->getClassName(),
- object->getMd5Sum());
- list_.push_back(map_);
-
- if (++objCount >= maxV2ReplyObjs) {
- objCount = 0;
- ListCodec::encode(list_, content);
- connThreadBody.sendBuffer(content, cid, headers, rte, rtk, "amqp/list");
- QPID_LOG(trace, "SENT QueryResponse (query by schema_id) to=" << rte << "/" << rtk);
- content.clear();
- list_.clear();
- }
- }
- }
- }
- }
-
- // Send last "non-partial" message to indicate CommandComplete
- headers.erase("partial");
- ListCodec::encode(list_, content);
- connThreadBody.sendBuffer(content, cid, headers, rte, rtk, "amqp/list");
- QPID_LOG(trace, "SENT QueryResponse (last message, no 'partial' indicator) to=" << rte << "/" << rtk);
-
- } else if (i->second.asString() == "SCHEMA_ID") {
- headers["qmf.content"] = "_schema_id";
- /**
- * @todo - support for a predicate. For now, send a list of all known schema class keys.
- */
- for (PackageMap::iterator pIter = packages.begin();
- pIter != packages.end(); pIter++) {
- for (ClassMap::iterator cIter = pIter->second.begin();
- cIter != pIter->second.end(); cIter++) {
-
- list_.push_back(mapEncodeSchemaId( pIter->first,
- cIter->first.name,
- cIter->first.hash,
- cIter->second.kind ));
- }
- }
-
- headers.erase("partial");
- ListCodec::encode(list_, content);
- connThreadBody.sendBuffer(content, cid, headers, rte, rtk, "amqp/list");
- QPID_LOG(trace, "SENT QueryResponse (SchemaId) to=" << rte << "/" << rtk);
-
- } else {
- // Unknown query target
- sendException(rte, rtk, cid, "Query for _what => '" + i->second.asString() + "' not supported");
- }
-}
-
-void ManagementAgentImpl::handleLocateRequest(const string&, const string& cid, const string& rte, const string& rtk)
-{
- QPID_LOG(trace, "RCVD AgentLocateRequest");
-
- Variant::Map map;
- Variant::Map headers;
- string content;
-
- headers["method"] = "indication";
- headers["qmf.opcode"] = "_agent_locate_response";
- headers["qmf.agent"] = name_address;
-
- getHeartbeatContent(map);
- MapCodec::encode(map, content);
- connThreadBody.sendBuffer(content, cid, headers, rte, rtk);
-
- QPID_LOG(trace, "SENT AgentLocateResponse replyTo=" << rte << "/" << rtk);
-
- {
- sys::Mutex::ScopedLock lock(agentLock);
- publishAllData = true;
- }
-}
-
-void ManagementAgentImpl::handleMethodRequest(const string& body, const string& cid, const string& rte, const string& rtk, const string& userId)
-{
- if (extThread) {
- sys::Mutex::ScopedLock lock(agentLock);
-
- methodQueue.push_back(new QueuedMethod(cid, rte, rtk, body, userId));
- if (pipeHandle != 0) {
- pipeHandle->write("X", 1);
- } else if (notifyable != 0) {
- inCallback = true;
- {
- sys::Mutex::ScopedUnlock unlock(agentLock);
- notifyable->notify();
- }
- inCallback = false;
- } else if (notifyCallback != 0) {
- inCallback = true;
- {
- sys::Mutex::ScopedUnlock unlock(agentLock);
- notifyCallback(notifyContext);
- }
- inCallback = false;
- }
- } else {
- invokeMethodRequest(body, cid, rte, rtk, userId);
- }
-
- QPID_LOG(trace, "RCVD MethodRequest");
-}
-
-void ManagementAgentImpl::received(Message& msg)
-{
- string replyToExchange;
- string replyToKey;
- framing::MessageProperties mp = msg.getMessageProperties();
- if (mp.hasReplyTo()) {
- const framing::ReplyTo& rt = mp.getReplyTo();
- replyToExchange = rt.getExchange();
- replyToKey = rt.getRoutingKey();
- }
-
- string userId;
- if (mp.hasUserId())
- userId = mp.getUserId();
-
- if (mp.hasAppId() && mp.getAppId() == "qmf2")
- {
- string opcode = mp.getApplicationHeaders().getAsString("qmf.opcode");
- string cid = msg.getMessageProperties().getCorrelationId();
-
- if (opcode == "_agent_locate_request") handleLocateRequest(msg.getData(), cid, replyToExchange, replyToKey);
- else if (opcode == "_method_request") handleMethodRequest(msg.getData(), cid, replyToExchange, replyToKey, userId);
- else if (opcode == "_query_request") handleGetQuery(msg.getData(), cid, replyToExchange, replyToKey);
- else {
- QPID_LOG(warning, "Support for QMF V2 Opcode [" << opcode << "] TBD!!!");
- }
- return;
- }
-
- // old preV2 binary messages
-
- uint32_t sequence;
- string data = msg.getData();
- Buffer inBuffer(const_cast<char*>(data.c_str()), data.size());
- uint8_t opcode;
-
-
- if (checkHeader(inBuffer, &opcode, &sequence))
- {
- if (opcode == 'S') handleSchemaRequest(inBuffer, sequence, replyToExchange, replyToKey);
- else if (opcode == 'x') handleConsoleAddedIndication();
- else
- QPID_LOG(warning, "Ignoring old-format QMF Request! opcode=" << char(opcode));
- }
-}
-
-
-void ManagementAgentImpl::encodeHeader(Buffer& buf, uint8_t opcode, uint32_t seq)
-{
- buf.putOctet('A');
- buf.putOctet('M');
- buf.putOctet('2');
- buf.putOctet(opcode);
- buf.putLong (seq);
-}
-
-Variant::Map ManagementAgentImpl::mapEncodeSchemaId(const string& pname,
- const string& cname,
- const uint8_t *md5Sum,
- uint8_t type)
-{
- Variant::Map map_;
-
- map_["_package_name"] = pname;
- map_["_class_name"] = cname;
- map_["_hash"] = types::Uuid(md5Sum);
- if (type == ManagementItem::CLASS_KIND_EVENT)
- map_["_type"] = "_event";
- else
- map_["_type"] = "_data";
-
- return map_;
-}
-
-
-bool ManagementAgentImpl::checkHeader(Buffer& buf, uint8_t *opcode, uint32_t *seq)
-{
- if (buf.getSize() < 8)
- return false;
-
- uint8_t h1 = buf.getOctet();
- uint8_t h2 = buf.getOctet();
- uint8_t h3 = buf.getOctet();
-
- *opcode = buf.getOctet();
- *seq = buf.getLong();
-
- return h1 == 'A' && h2 == 'M' && h3 == '2';
-}
-
-ManagementAgentImpl::PackageMap::iterator ManagementAgentImpl::findOrAddPackage(const string& name)
-{
- PackageMap::iterator pIter = packages.find(name);
- if (pIter != packages.end())
- return pIter;
-
- // No such package found, create a new map entry.
- pair<PackageMap::iterator, bool> result =
- packages.insert(pair<string, ClassMap>(name, ClassMap()));
-
- return result.first;
-}
-
-// note well: caller must hold agentLock when calling this!
-void ManagementAgentImpl::moveNewObjectsLH(const sys::Mutex::ScopedLock& /*agentLock*/)
-{
- sys::Mutex::ScopedLock lock(addLock);
- ObjectMap::iterator newObj = newManagementObjects.begin();
- while (newObj != newManagementObjects.end()) {
- // before adding a new mgmt object, check for duplicates:
- ObjectMap::iterator oldObj = managementObjects.find(newObj->first);
- if (oldObj == managementObjects.end()) {
- managementObjects[newObj->first] = newObj->second;
- newManagementObjects.erase(newObj++); // post inc iterator safe!
- } else {
- // object exists with same object id. This may be legit, for example, when a
- // recently deleted object is re-added before the mgmt poll runs.
- if (newObj->second->isDeleted()) {
- // @TODO fixme: we missed an add-delete for the new object
- QPID_LOG(warning, "Mgmt Object deleted before update sent, oid=" << newObj->first);
- newManagementObjects.erase(newObj++); // post inc iterator safe!
- } else if (oldObj->second->isDeleted()) {
- // skip adding newObj, try again later once oldObj has been cleaned up by poll
- ++newObj;
- } else {
- // real bad - two objects exist with same OID. This is a bug in the application
- QPID_LOG(error, "Detected two Mgmt Objects using the same object id! oid=" << newObj->first
- << ", this is bad!");
- // what to do here? Can't erase an active obj - owner has a pointer to it.
- // for now I punt. Maybe the flood of log messages will get someone's attention :P
- ++newObj;
- }
- }
- }
-}
-
-void ManagementAgentImpl::addClassLocal(uint8_t classKind,
- PackageMap::iterator pIter,
- const string& className,
- uint8_t* md5Sum,
- ManagementObject::writeSchemaCall_t schemaCall)
-{
- SchemaClassKey key;
- ClassMap& cMap = pIter->second;
-
- key.name = className;
- memcpy(&key.hash, md5Sum, 16);
-
- ClassMap::iterator cIter = cMap.find(key);
- if (cIter != cMap.end())
- return;
-
- // No such class found, create a new class with local information.
- cMap.insert(pair<SchemaClassKey, SchemaClass>(key, SchemaClass(schemaCall, classKind)));
- schemaTimestamp = Duration(EPOCH, now());
- QPID_LOG(trace, "Updated schema timestamp, now=" << uint64_t(schemaTimestamp));
-}
-
-void ManagementAgentImpl::encodePackageIndication(Buffer& buf,
- PackageMap::iterator pIter)
-{
- buf.putShortString((*pIter).first);
-
- QPID_LOG(trace, "SENT PackageInd: package=" << (*pIter).first);
-}
-
-void ManagementAgentImpl::encodeClassIndication(Buffer& buf,
- PackageMap::iterator pIter,
- ClassMap::iterator cIter)
-{
- SchemaClassKey key = (*cIter).first;
-
- buf.putOctet((*cIter).second.kind);
- buf.putShortString((*pIter).first);
- buf.putShortString(key.name);
- buf.putBin128(key.hash);
-
- QPID_LOG(trace, "SENT ClassInd: package=" << (*pIter).first << " class=" << key.name);
-}
-
-struct MessageItem {
- string content;
- Variant::Map headers;
- string key;
- MessageItem(const Variant::Map& h, const string& k) : headers(h), key(k) {}
-};
-
-void ManagementAgentImpl::periodicProcessing()
-{
- string addr_key_base = "agent.ind.data.";
- list<ObjectId> deleteList;
- list<boost::shared_ptr<MessageItem> > message_list;
-
- sendHeartbeat();
-
- {
- sys::Mutex::ScopedLock lock(agentLock);
-
- if (!connected)
- return;
-
- moveNewObjectsLH(lock);
-
- //
- // Clear the been-here flag on all objects in the map.
- //
- for (ObjectMap::iterator iter = managementObjects.begin();
- iter != managementObjects.end();
- iter++) {
- ManagementObject* object = iter->second.get();
- object->setFlags(0);
- if (publishAllData) {
- object->setForcePublish(true);
- }
- }
-
- publishAllData = false;
-
- //
- // Process the entire object map.
- //
- uint32_t v2Objs = 0;
-
- for (ObjectMap::iterator baseIter = managementObjects.begin();
- baseIter != managementObjects.end();
- baseIter++) {
- ManagementObject* baseObject = baseIter->second.get();
-
- //
- // Skip until we find a base object requiring a sent message.
- //
- if (baseObject->getFlags() == 1 ||
- (!baseObject->getConfigChanged() &&
- !baseObject->getInstChanged() &&
- !baseObject->getForcePublish() &&
- !baseObject->isDeleted()))
- continue;
-
- std::string packageName = baseObject->getPackageName();
- std::string className = baseObject->getClassName();
-
- Variant::List list_;
- std::stringstream addr_key;
- Variant::Map headers;
-
- addr_key << addr_key_base;
- addr_key << keyifyNameStr(packageName)
- << "." << keyifyNameStr(className)
- << "." << vendorNameKey
- << "." << productNameKey
- << "." << instanceNameKey;
-
- headers["method"] = "indication";
- headers["qmf.opcode"] = "_data_indication";
- headers["qmf.content"] = "_data";
- headers["qmf.agent"] = name_address;
-
- for (ObjectMap::iterator iter = baseIter;
- iter != managementObjects.end();
- iter++) {
- ManagementObject* object = iter->second.get();
- bool send_stats, send_props;
- if (baseObject->isSameClass(*object) && object->getFlags() == 0) {
- object->setFlags(1);
- if (object->getConfigChanged() || object->getInstChanged())
- object->setUpdateTime();
-
- send_props = (object->getConfigChanged() || object->getForcePublish() || object->isDeleted());
- send_stats = (object->hasInst() && (object->getInstChanged() || object->getForcePublish()));
-
- if (send_stats || send_props) {
- Variant::Map map_;
- Variant::Map values;
- Variant::Map oid;
-
- object->getObjectId().mapEncode(oid);
- map_["_object_id"] = oid;
- map_["_schema_id"] = mapEncodeSchemaId(object->getPackageName(),
- object->getClassName(),
- object->getMd5Sum());
- object->writeTimestamps(map_);
- object->mapEncodeValues(values, send_props, send_stats);
- map_["_values"] = values;
- list_.push_back(map_);
-
- if (++v2Objs >= maxV2ReplyObjs) {
- v2Objs = 0;
- boost::shared_ptr<MessageItem> item(new MessageItem(headers, addr_key.str()));
- ListCodec::encode(list_, item->content);
- message_list.push_back(item);
- list_.clear();
- }
- }
-
- if (object->isDeleted())
- deleteList.push_back(iter->first);
- object->setForcePublish(false);
- }
- }
-
- if (!list_.empty()) {
- boost::shared_ptr<MessageItem> item(new MessageItem(headers, addr_key.str()));
- ListCodec::encode(list_, item->content);
- message_list.push_back(item);
- }
- }
-
- // Delete flagged objects
- for (list<ObjectId>::reverse_iterator iter = deleteList.rbegin();
- iter != deleteList.rend();
- iter++)
- managementObjects.erase(*iter);
- }
-
- while (!message_list.empty()) {
- boost::shared_ptr<MessageItem> item(message_list.front());
- message_list.pop_front();
- connThreadBody.sendBuffer(item->content, "", item->headers, topicExchange, item->key, "amqp/list");
- QPID_LOG(trace, "SENT DataIndication");
- }
-}
-
-
-void ManagementAgentImpl::getHeartbeatContent(qpid::types::Variant::Map& map)
-{
- map["_values"] = attrMap;
- map["_values"].asMap()["_timestamp"] = uint64_t(Duration(EPOCH, now()));
- map["_values"].asMap()["_heartbeat_interval"] = interval;
- map["_values"].asMap()["_epoch"] = bootSequence;
- map["_values"].asMap()["_schema_updated"] = uint64_t(schemaTimestamp);
-}
-
-void ManagementAgentImpl::ConnectionThread::run()
-{
- static const int delayMin(1);
- static const int delayMax(128);
- static const int delayFactor(2);
- int delay(delayMin);
- string dest("qmfagent");
- ConnectionThread::shared_ptr tmp;
-
- sessionId.generate();
- queueName << "qmfagent-" << sessionId;
-
- while (true) {
- try {
- if (agent.initialized) {
- QPID_LOG(debug, "QMF Agent attempting to connect to the broker...");
- connection.open(agent.connectionSettings);
- session = connection.newSession(queueName.str());
- subscriptions.reset(new client::SubscriptionManager(session));
-
- session.queueDeclare(arg::queue=queueName.str(), arg::autoDelete=true,
- arg::exclusive=true);
- session.exchangeBind(arg::exchange="amq.direct", arg::queue=queueName.str(),
- arg::bindingKey=queueName.str());
- session.exchangeBind(arg::exchange=agent.directExchange, arg::queue=queueName.str(),
- arg::bindingKey=agent.name_address);
- session.exchangeBind(arg::exchange=agent.topicExchange, arg::queue=queueName.str(),
- arg::bindingKey="console.#");
-
- subscriptions->subscribe(agent, queueName.str(), dest);
- QPID_LOG(info, "Connection established with broker");
- {
- sys::Mutex::ScopedLock _lock(connLock);
- if (shutdown)
- return;
- operational = true;
- agent.connected = true;
- agent.startProtocol();
- try {
- sys::Mutex::ScopedUnlock _unlock(connLock);
- subscriptions->run();
- } catch (exception) {}
-
- QPID_LOG(warning, "Connection to the broker has been lost");
-
- operational = false;
- agent.connected = false;
- tmp = subscriptions;
- subscriptions.reset();
- }
- tmp.reset(); // frees the subscription outside the lock
- delay = delayMin;
- connection.close();
- }
- } catch (exception &e) {
- if (delay < delayMax)
- delay *= delayFactor;
- QPID_LOG(debug, "Connection failed: exception=" << e.what());
- }
-
- {
- // sleep for "delay" seconds, but peridically check if the
- // agent is shutting down so we don't hang for up to delayMax
- // seconds during agent shutdown
- sys::Mutex::ScopedLock _lock(connLock);
- if (shutdown)
- return;
- sleeping = true;
- int totalSleep = 0;
- do {
- sys::Mutex::ScopedUnlock _unlock(connLock);
- qpid::sys::sleep(delayMin);
- totalSleep += delayMin;
- } while (totalSleep < delay && !shutdown);
- sleeping = false;
- if (shutdown)
- return;
- }
- }
-}
-
-ManagementAgentImpl::ConnectionThread::~ConnectionThread()
-{
-}
-
-void ManagementAgentImpl::ConnectionThread::sendBuffer(Buffer& buf,
- uint32_t length,
- const string& exchange,
- const string& routingKey)
-{
- Message msg;
- string data;
-
- buf.getRawData(data, length);
- msg.setData(data);
- sendMessage(msg, exchange, routingKey);
-}
-
-
-
-void ManagementAgentImpl::ConnectionThread::sendBuffer(const string& data,
- const string& cid,
- const Variant::Map headers,
- const string& exchange,
- const string& routingKey,
- const string& contentType,
- uint64_t ttl_msec)
-{
- Message msg;
- Variant::Map::const_iterator i;
-
- if (!cid.empty())
- msg.getMessageProperties().setCorrelationId(cid);
-
- if (!contentType.empty())
- msg.getMessageProperties().setContentType(contentType);
-
- if (ttl_msec)
- msg.getDeliveryProperties().setTtl(ttl_msec);
-
- for (i = headers.begin(); i != headers.end(); ++i) {
- msg.getHeaders().setString(i->first, i->second.asString());
- }
-
- msg.setData(data);
- sendMessage(msg, exchange, routingKey);
-}
-
-
-
-
-
-void ManagementAgentImpl::ConnectionThread::sendMessage(Message msg,
- const string& exchange,
- const string& routingKey)
-{
- ConnectionThread::shared_ptr s;
- {
- sys::Mutex::ScopedLock _lock(connLock);
- if (!operational)
- return;
- s = subscriptions;
- }
-
- msg.getDeliveryProperties().setRoutingKey(routingKey);
- msg.getMessageProperties().setReplyTo(ReplyTo("amq.direct", queueName.str()));
- msg.getMessageProperties().getApplicationHeaders().setString("qmf.agent", agent.name_address);
- msg.getMessageProperties().setAppId("qmf2");
- try {
- session.messageTransfer(arg::content=msg, arg::destination=exchange);
- } catch(exception& e) {
- QPID_LOG(error, "Exception caught in sendMessage: " << e.what());
- // Bounce the connection
- if (s)
- s->stop();
- }
-}
-
-
-
-void ManagementAgentImpl::ConnectionThread::bindToBank(uint32_t brokerBank, uint32_t agentBank)
-{
- stringstream key;
- key << "agent." << brokerBank << "." << agentBank;
- session.exchangeBind(arg::exchange="qpid.management", arg::queue=queueName.str(),
- arg::bindingKey=key.str());
-}
-
-void ManagementAgentImpl::ConnectionThread::close()
-{
- ConnectionThread::shared_ptr s;
- {
- sys::Mutex::ScopedLock _lock(connLock);
- shutdown = true;
- s = subscriptions;
- }
- if (s)
- s->stop();
-}
-
-bool ManagementAgentImpl::ConnectionThread::isSleeping() const
-{
- sys::Mutex::ScopedLock _lock(connLock);
- return sleeping;
-}
-
-
-void ManagementAgentImpl::PublishThread::run()
-{
- uint16_t totalSleep;
- uint16_t sleepTime;
-
- while (!shutdown) {
- agent.periodicProcessing();
- totalSleep = 0;
-
- //
- // Calculate a sleep time that is no greater than 5 seconds and
- // no less than 1 second.
- //
- sleepTime = agent.getInterval();
- if (sleepTime > 5)
- sleepTime = 5;
- else if (sleepTime == 0)
- sleepTime = 1;
-
- while (totalSleep < agent.getInterval() && !shutdown) {
- qpid::sys::sleep(sleepTime);
- totalSleep += sleepTime;
- }
- }
-}
-
-}}
-
diff --git a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h b/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h
deleted file mode 100644
index 4c97bc89da..0000000000
--- a/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h
+++ /dev/null
@@ -1,294 +0,0 @@
-#ifndef _qpid_agent_ManagementAgentImpl_
-#define _qpid_agent_ManagementAgentImpl_
-
-//
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-//
-
-#include "qpid/agent/ManagementAgent.h"
-#include "qpid/client/Connection.h"
-#include "qpid/client/ConnectionSettings.h"
-#include "qpid/client/SubscriptionManager.h"
-#include "qpid/client/Session.h"
-#include "qpid/client/AsyncSession.h"
-#include "qpid/client/Message.h"
-#include "qpid/client/MessageListener.h"
-#include "qpid/sys/Thread.h"
-#include "qpid/sys/Runnable.h"
-#include "qpid/sys/Mutex.h"
-#include "qpid/sys/PipeHandle.h"
-#include "qpid/sys/Time.h"
-#include "qpid/framing/Uuid.h"
-#include <iostream>
-#include <sstream>
-#include <deque>
-
-namespace qpid {
-namespace management {
-
-class ManagementAgentImpl : public ManagementAgent, public client::MessageListener
-{
- public:
-
- ManagementAgentImpl();
- virtual ~ManagementAgentImpl();
-
- //
- // Methods from ManagementAgent
- //
- int getMaxThreads() { return 1; }
- void setName(const std::string& vendor,
- const std::string& product,
- const std::string& instance="");
- void getName(std::string& vendor, std::string& product, std::string& instance);
- const std::string& getAddress();
- void init(const std::string& brokerHost = "localhost",
- uint16_t brokerPort = 5672,
- uint16_t intervalSeconds = 10,
- bool useExternalThread = false,
- const std::string& storeFile = "",
- const std::string& uid = "",
- const std::string& pwd = "",
- const std::string& mech = "PLAIN",
- const std::string& proto = "tcp");
- void init(const management::ConnectionSettings& settings,
- uint16_t intervalSeconds = 10,
- bool useExternalThread = false,
- const std::string& storeFile = "");
- bool isConnected() { return connected; }
- std::string& getLastFailure() { return lastFailure; }
- void registerClass(const std::string& packageName,
- const std::string& className,
- uint8_t* md5Sum,
- management::ManagementObject::writeSchemaCall_t schemaCall);
- void registerEvent(const std::string& packageName,
- const std::string& eventName,
- uint8_t* md5Sum,
- management::ManagementObject::writeSchemaCall_t schemaCall);
- ObjectId addObject(management::ManagementObject* objectPtr, uint64_t persistId = 0);
- ObjectId addObject(management::ManagementObject* objectPtr, const std::string& key,
- bool persistent);
- void raiseEvent(const management::ManagementEvent& event, severity_t severity = SEV_DEFAULT);
- uint32_t pollCallbacks(uint32_t callLimit = 0);
- int getSignalFd();
- void setSignalCallback(cb_t callback, void* context);
- void setSignalCallback(Notifyable& n);
-
- uint16_t getInterval() { return interval; }
- void periodicProcessing();
-
- uint16_t getBootSequence(void) { return bootSequence; }
- void setBootSequence(uint16_t b) { bootSequence = b; }
-
- private:
-
- struct SchemaClassKey {
- std::string name;
- uint8_t hash[16];
- };
-
- struct SchemaClassKeyComp {
- bool operator() (const SchemaClassKey& lhs, const SchemaClassKey& rhs) const
- {
- if (lhs.name != rhs.name)
- return lhs.name < rhs.name;
- else
- for (int i = 0; i < 16; i++)
- if (lhs.hash[i] != rhs.hash[i])
- return lhs.hash[i] < rhs.hash[i];
- return false;
- }
- };
-
- struct SchemaClass {
- management::ManagementObject::writeSchemaCall_t writeSchemaCall;
- uint8_t kind;
-
- SchemaClass(const management::ManagementObject::writeSchemaCall_t call,
- const uint8_t _kind) : writeSchemaCall(call), kind(_kind) {}
- };
-
- struct QueuedMethod {
- QueuedMethod(const std::string& _cid, const std::string& _rte, const std::string& _rtk, const std::string& _body, const std::string& _uid) :
- cid(_cid), replyToExchange(_rte), replyToKey(_rtk), body(_body), userId(_uid) {}
-
- std::string cid;
- std::string replyToExchange;
- std::string replyToKey;
- std::string body;
- std::string userId;
- };
-
- typedef std::deque<QueuedMethod*> MethodQueue;
- typedef std::map<SchemaClassKey, SchemaClass, SchemaClassKeyComp> ClassMap;
- typedef std::map<std::string, ClassMap> PackageMap;
-
- PackageMap packages;
- AgentAttachment attachment;
-
- typedef std::map<ObjectId, boost::shared_ptr<ManagementObject> > ObjectMap;
-
- ObjectMap managementObjects;
- ObjectMap newManagementObjects;
- MethodQueue methodQueue;
-
- void received (client::Message& msg);
-
- qpid::types::Variant::Map attrMap;
- std::string name_address;
- std::string vendorNameKey; // vendor name with "." --> "_"
- std::string productNameKey; // product name with "." --> "_"
- std::string instanceNameKey; // agent instance with "." --> "_"
- uint16_t interval;
- bool extThread;
- sys::PipeHandle* pipeHandle;
- uint64_t nextObjectId;
- cb_t notifyCallback;
- void* notifyContext;
- Notifyable* notifyable;
- bool inCallback;
- std::string storeFile;
- sys::Mutex agentLock;
- sys::Mutex addLock;
- framing::Uuid systemId;
- client::ConnectionSettings connectionSettings;
- bool initialized;
- bool connected;
- bool useMapMsg;
- std::string lastFailure;
- std::string topicExchange;
- std::string directExchange;
- qpid::sys::Duration schemaTimestamp;
-
- bool publishAllData;
- uint32_t requestedBrokerBank;
- uint32_t requestedAgentBank;
- uint32_t assignedBrokerBank;
- uint32_t assignedAgentBank;
- uint16_t bootSequence;
-
- // Maximum # of objects allowed in a single V2 response
- // message.
- uint32_t maxV2ReplyObjs;
-
- static const uint8_t DEBUG_OFF = 0;
- static const uint8_t DEBUG_CONN = 1;
- static const uint8_t DEBUG_PROTO = 2;
- static const uint8_t DEBUG_PUBLISH = 3;
-
-# define MA_BUFFER_SIZE 65536
- char outputBuffer[MA_BUFFER_SIZE];
- char eventBuffer[MA_BUFFER_SIZE];
-
- friend class ConnectionThread;
- class ConnectionThread : public sys::Runnable
- {
- typedef boost::shared_ptr<client::SubscriptionManager> shared_ptr;
-
- bool operational;
- ManagementAgentImpl& agent;
- framing::Uuid sessionId;
- client::Connection connection;
- client::Session session;
- ConnectionThread::shared_ptr subscriptions;
- std::stringstream queueName;
- mutable sys::Mutex connLock;
- bool shutdown;
- bool sleeping;
- void run();
- public:
- ConnectionThread(ManagementAgentImpl& _agent) :
- operational(false), agent(_agent),
- shutdown(false), sleeping(false) {}
- ~ConnectionThread();
- void sendBuffer(qpid::framing::Buffer& buf,
- uint32_t length,
- const std::string& exchange,
- const std::string& routingKey);
- void sendBuffer(const std::string& data,
- const std::string& cid,
- const qpid::types::Variant::Map headers,
- const std::string& exchange,
- const std::string& routingKey,
- const std::string& contentType="amqp/map",
- uint64_t ttl_msec=0);
- void sendMessage(qpid::client::Message msg,
- const std::string& exchange,
- const std::string& routingKey);
- void bindToBank(uint32_t brokerBank, uint32_t agentBank);
- void close();
- bool isSleeping() const;
- };
-
- class PublishThread : public sys::Runnable
- {
- ManagementAgentImpl& agent;
- void run();
- bool shutdown;
- public:
- PublishThread(ManagementAgentImpl& _agent) :
- agent(_agent), shutdown(false) {}
- void close() { shutdown = true; }
- };
-
- ConnectionThread connThreadBody;
- sys::Thread connThread;
- PublishThread pubThreadBody;
- sys::Thread pubThread;
-
- static const std::string storeMagicNumber;
-
- void startProtocol();
- void storeData(bool requested=false);
- void retrieveData(std::string& vendor, std::string& product, std::string& inst);
- PackageMap::iterator findOrAddPackage(const std::string& name);
- void moveNewObjectsLH(const sys::Mutex::ScopedLock& agentLock);
- void addClassLocal (uint8_t classKind,
- PackageMap::iterator pIter,
- const std::string& className,
- uint8_t* md5Sum,
- management::ManagementObject::writeSchemaCall_t schemaCall);
- void encodePackageIndication (framing::Buffer& buf,
- PackageMap::iterator pIter);
- void encodeClassIndication (framing::Buffer& buf,
- PackageMap::iterator pIter,
- ClassMap::iterator cIter);
- void encodeHeader (framing::Buffer& buf, uint8_t opcode, uint32_t seq = 0);
- qpid::types::Variant::Map mapEncodeSchemaId(const std::string& pname,
- const std::string& cname,
- const uint8_t *md5Sum,
- uint8_t type=ManagementItem::CLASS_KIND_TABLE);
- bool checkHeader (framing::Buffer& buf, uint8_t *opcode, uint32_t *seq);
- void sendHeartbeat();
- void sendException(const std::string& replyToExchange, const std::string& replyToKey, const std::string& cid,
- const std::string& text, uint32_t code=1);
- void handlePackageRequest (qpid::framing::Buffer& inBuffer);
- void handleClassQuery (qpid::framing::Buffer& inBuffer);
- void handleSchemaRequest (qpid::framing::Buffer& inBuffer, uint32_t sequence, const std::string& rte, const std::string& rtk);
- void invokeMethodRequest (const std::string& body, const std::string& cid, const std::string& rte, const std::string& rtk, const std::string& userId);
-
- void handleGetQuery (const std::string& body, const std::string& cid, const std::string& rte, const std::string& rtk);
- void handleLocateRequest (const std::string& body, const std::string& sequence, const std::string& rte, const std::string& rtk);
- void handleMethodRequest (const std::string& body, const std::string& sequence, const std::string& rte, const std::string& rtk, const std::string& userId);
- void handleConsoleAddedIndication();
- void getHeartbeatContent (qpid::types::Variant::Map& map);
-};
-
-}}
-
-#endif /*!_qpid_agent_ManagementAgentImpl_*/
diff --git a/qpid/cpp/src/tests/testagent.cpp b/qpid/cpp/src/tests/testagent.cpp
deleted file mode 100644
index e6010a8e00..0000000000
--- a/qpid/cpp/src/tests/testagent.cpp
+++ /dev/null
@@ -1,208 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include <qpid/management/Manageable.h>
-#include <qpid/management/ManagementObject.h>
-#include <qpid/agent/ManagementAgent.h>
-#include <qpid/sys/Mutex.h>
-#include <qpid/sys/Time.h>
-#include "qmf/org/apache/qpid/agent/example/Parent.h"
-#include "qmf/org/apache/qpid/agent/example/Child.h"
-#include "qmf/org/apache/qpid/agent/example/ArgsParentCreate_child.h"
-#include "qmf/org/apache/qpid/agent/example/EventChildCreated.h"
-#include "qmf/org/apache/qpid/agent/example/Package.h"
-
-#include <signal.h>
-#include <cstdlib>
-#include <iostream>
-
-#include <sstream>
-
-namespace qpid {
-namespace tests {
-
-static bool running = true;
-
-using std::string;
-using qpid::management::ManagementAgent;
-using qpid::management::ManagementObject;
-using qpid::management::Manageable;
-using qpid::management::Args;
-using qpid::sys::Mutex;
-namespace _qmf = qmf::org::apache::qpid::agent::example;
-
-class ChildClass;
-
-//==============================================================
-// CoreClass is the operational class that corresponds to the
-// "Parent" class in the management schema.
-//==============================================================
-class CoreClass : public Manageable
-{
- string name;
- ManagementAgent* agent;
- _qmf::Parent* mgmtObject;
- std::vector<ChildClass*> children;
- Mutex vectorLock;
-
-public:
-
- CoreClass(ManagementAgent* agent, string _name);
- ~CoreClass() { mgmtObject->resourceDestroy(); }
-
- ManagementObject* GetManagementObject(void) const
- { return mgmtObject; }
-
- void doLoop();
- status_t ManagementMethod (uint32_t methodId, Args& args, string& text);
-};
-
-class ChildClass : public Manageable
-{
- string name;
- _qmf::Child* mgmtObject;
-
-public:
-
- ChildClass(ManagementAgent* agent, CoreClass* parent, string name);
- ~ChildClass() { mgmtObject->resourceDestroy(); }
-
- ManagementObject* GetManagementObject(void) const
- { return mgmtObject; }
-
- void doWork()
- {
- mgmtObject->inc_count(2);
- }
-};
-
-CoreClass::CoreClass(ManagementAgent* _agent, string _name) : name(_name), agent(_agent)
-{
- static uint64_t persistId = 0x111222333444555LL;
- mgmtObject = new _qmf::Parent(agent, this, name);
-
- agent->addObject(mgmtObject, persistId++);
- mgmtObject->set_state("IDLE");
-}
-
-void CoreClass::doLoop()
-{
- // Periodically bump a counter to provide a changing statistical value
- while (running) {
- qpid::sys::sleep(1);
- mgmtObject->inc_count();
- mgmtObject->set_state("IN_LOOP");
-
- {
- Mutex::ScopedLock _lock(vectorLock);
-
- for (std::vector<ChildClass*>::iterator iter = children.begin();
- iter != children.end();
- iter++) {
- (*iter)->doWork();
- }
- }
- }
-}
-
-Manageable::status_t CoreClass::ManagementMethod(uint32_t methodId, Args& args, string& /*text*/)
-{
- Mutex::ScopedLock _lock(vectorLock);
-
- switch (methodId) {
- case _qmf::Parent::METHOD_CREATE_CHILD:
- _qmf::ArgsParentCreate_child& ioArgs = (_qmf::ArgsParentCreate_child&) args;
-
- ChildClass *child = new ChildClass(agent, this, ioArgs.i_name);
- ioArgs.o_childRef = child->GetManagementObject()->getObjectId();
-
- children.push_back(child);
-
- agent->raiseEvent(_qmf::EventChildCreated(ioArgs.i_name));
-
- return STATUS_OK;
- }
-
- return STATUS_NOT_IMPLEMENTED;
-}
-
-ChildClass::ChildClass(ManagementAgent* agent, CoreClass* parent, string name)
-{
- mgmtObject = new _qmf::Child(agent, this, parent, name);
-
- agent->addObject(mgmtObject);
-}
-
-
-//==============================================================
-// Main program
-//==============================================================
-
-ManagementAgent::Singleton* singleton;
-
-void shutdown(int)
-{
- running = false;
-}
-
-int main_int(int argc, char** argv)
-{
- singleton = new ManagementAgent::Singleton();
- const char* host = argc>1 ? argv[1] : "127.0.0.1";
- int port = argc>2 ? atoi(argv[2]) : 5672;
-
- signal(SIGINT, shutdown);
-
- // Create the qmf management agent
- ManagementAgent* agent = singleton->getInstance();
-
- // Register the Qmf_example schema with the agent
- _qmf::Package packageInit(agent);
-
- // Start the agent. It will attempt to make a connection to the
- // management broker
- agent->init(host, port, 5, false, ".magentdata");
-
- // Allocate some core objects
- CoreClass core1(agent, "Example Core Object #1");
- CoreClass core2(agent, "Example Core Object #2");
- CoreClass core3(agent, "Example Core Object #3");
-
- core1.doLoop();
-
- // done, cleanup and exit
- delete singleton;
-
- return 0;
-}
-
-}} // namespace qpid::tests
-
-int main(int argc, char** argv)
-{
- try {
- return qpid::tests::main_int(argc, argv);
- } catch(std::exception& e) {
- std::cerr << "Top Level Exception: " << e.what() << std::endl;
- return 1;
- }
-}
-
diff --git a/qpid/cpp/src/tests/testagent.xml b/qpid/cpp/src/tests/testagent.xml
deleted file mode 100644
index 0b1436f999..0000000000
--- a/qpid/cpp/src/tests/testagent.xml
+++ /dev/null
@@ -1,64 +0,0 @@
-<schema package="org.apache.qpid.agent.example">
-
-<!--
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing,
- software distributed under the License is distributed on an
- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- KIND, either express or implied. See the License for the
- specific language governing permissions and limitations
- under the License.
--->
-
- <!--
- ===============================================================
- Parent
- ===============================================================
- -->
- <class name="Parent">
-
- This class represents a parent object
-
- <property name="name" type="lstr" access="RC" index="y"/>
-
- <statistic name="state" type="sstr" desc="Operational state of the link"/>
- <statistic name="count" type="count64" unit="tick" desc="Counter that increases monotonically"/>
-
- <method name="create_child" desc="Create child object">
- <arg name="name" dir="I" type="lstr"/>
- <arg name="childRef" dir="O" type="objId"/>
- </method>
- </class>
-
-
- <!--
- ===============================================================
- Child
- ===============================================================
- -->
- <class name="Child">
- <property name="ParentRef" type="objId" references="Parent" access="RC" index="y" parentRef="y"/>
- <property name="name" type="lstr" access="RC" index="y"/>
-
- <statistic name="count" type="count64" unit="tick" desc="Counter that increases monotonically"/>
-
- <method name="delete"/>
- </class>
-
- <eventArguments>
- <arg name="childName" type="lstr"/>
- </eventArguments>
-
- <event name="ChildCreated" args="childName"/>
- <event name="ChildDestroyed" args="childName"/>
-</schema>
-