diff options
Diffstat (limited to 'qpid/cpp/src/qmf/engine/Agent.cpp')
-rw-r--r-- | qpid/cpp/src/qmf/engine/Agent.cpp | 145 |
1 files changed, 115 insertions, 30 deletions
diff --git a/qpid/cpp/src/qmf/engine/Agent.cpp b/qpid/cpp/src/qmf/engine/Agent.cpp index 04308b954a..453c6bd27b 100644 --- a/qpid/cpp/src/qmf/engine/Agent.cpp +++ b/qpid/cpp/src/qmf/engine/Agent.cpp @@ -19,7 +19,7 @@ #include "qmf/engine/Agent.h" #include "qmf/engine/SchemaImpl.h" -#include "qmf/engine/Data.h" +#include "qmf/engine/DataImpl.h" #include "qmf/engine/QueryImpl.h" #include "qmf/Protocol.h" #include <qpid/sys/Mutex.h> @@ -34,6 +34,7 @@ #include <qpid/messaging/Address.h> #include <qpid/messaging/Message.h> #include <qpid/messaging/MapContent.h> +#include <qpid/messaging/ListContent.h> #include <qpid/messaging/MapView.h> #include <qpid/messaging/ListView.h> #include <string> @@ -86,15 +87,27 @@ namespace engine { AsyncContext(const string& cid, const Address& rt) : correlationId(cid), replyTo(rt), schemaMethod(0) {} }; - class StoreThread : public boost::noncopyable, public qpid::sys::Runnable { + /** + * StoreThread is used only when the Agent runs in internal-store mode. + * This class keeps track of stored objects and can perform queries and + * subscription queries on the data. + */ + class StoreThread : public boost::noncopyable, public qpid::sys::Runnable, public DataManager { public: StoreThread(AgentImpl& a) : agent(a), running(true), thread(*this) {} - ~StoreThread() { - stop(); - } + ~StoreThread() { stop(); } + + void addObject(const Data& data); + + // Methods from Runnable void run(); void stop(); + // Methods from DataManager + void modifyStart(DataPtr data); + void modifyDone(DataPtr data); + void destroy(DataPtr data); + private: AgentImpl& agent; bool running; @@ -142,6 +155,7 @@ namespace engine { string directAddrParams; string topicAddr; string topicAddrParams; + string eventSendAddr; Variant::Map attrMap; string storeDir; string transferDir; @@ -204,7 +218,10 @@ namespace engine { void handleSubscribeRefresh(const Message& message); void handleMethodRequest(const Message& message); void sendResponse(const Message& request, const string& opcode, const Data& data); - void sendResponse(const Address& address, const string& correlationId, const string& opcode, const Data& data); + void send(const Address& address, const string& correlationId, const string& opcode, + const string& cType, const Data& data); + void send(const Address& address, const string& correlationId, const string& opcode, + const string& cType, const Variant::List& list, bool partial=false); void sendPackageIndicationLH(const string& packageName); void sendClassIndicationLH(ClassKind kind, const string& packageName, const AgentClassKey& key); @@ -243,6 +260,11 @@ AgentEvent AgentEventImpl::copy() return item; } +void StoreThread::addObject(const Data& data) +{ + DataPtr stored(new Data(data)); +} + void StoreThread::run() { while (running) { @@ -256,6 +278,30 @@ void StoreThread::stop() agent.signalInternal(); } +void StoreThread::modifyStart(DataPtr) +{ + // Algorithm: + // Make a copy of the indicated object as a delta base if there + // isn't already one in place. If there is, do nothing. +} + +void StoreThread::modifyDone(DataPtr) +{ + // Algorithm: + // If any deltas between the current and the stored base are discrete, + // send an immediate update. Otherwise, mark the object as modified. + // + // If an update is sent, delete the base copy. If not, leave the base copy + // in place for the later periodic update. +} + +void StoreThread::destroy(DataPtr) +{ + // Algorithm: + // Send an immediate full-update for this object with the delete time set. + // Remove the object and any copies from the data store. +} + AgentImpl::AgentImpl(const char* _v, const char* _p, const char* _n, const char* _d, bool _i) : vendor(_v), product(_p), name(_n), domain(_d ? _d : "default"), internalStore(_i), notifyHandler(0), notifiable(0), @@ -263,6 +309,7 @@ AgentImpl::AgentImpl(const char* _v, const char* _p, const char* _n, const char* { directAddr = "qmf." + domain + ".direct/" + vendor + ":" + product + ":" + name; topicAddr = "qmf." + domain + ".topic/console.ind.#"; + eventSendAddr = "qmf." + domain + ".topic/agent.event"; if (_d != 0) { directAddrParams = " {create: always, type: topic, x-properties: {type: direct}}"; topicAddrParams = " {create: always, type: topic, x-properties: {type: topic}}"; @@ -376,7 +423,7 @@ void AgentImpl::authAllow(uint32_t sequence) // Re-issue the now-authorized action. If this is a data query (get or subscribe), // and the agent is handling storage internally, redirect to the internal event // queue for processing by the internal-storage thread. - if (internalStore) { + if (internalStore && context->authorizedEvent->kind != AgentEvent::METHOD_CALL) { internalEventQueue.push_back(context->authorizedEvent); cond.notify(); } else { @@ -395,7 +442,7 @@ void AgentImpl::authDeny(uint32_t sequence, const Data& exception) contextMap.erase(iter); // Return an exception message to the requestor - sendResponse(context->replyTo, context->correlationId, Protocol::OP_EXCEPTION, exception); + send(context->replyTo, context->correlationId, Protocol::OP_EXCEPTION, Protocol::CONTENT_NONE, exception); } void AgentImpl::authDeny(uint32_t sequence, const string& error) @@ -419,29 +466,37 @@ void AgentImpl::methodResponse(uint32_t sequence, uint32_t status, char* text, c QPID_LOG(trace, "SENT MethodResponse corr=" << context->correlationId << " status=" << status << " text=" << text); } -void AgentImpl::queryResponse(uint32_t sequence, Data&) +void AgentImpl::queryResponse(uint32_t sequence, Data& data) { - Mutex::ScopedLock _lock(lock); - map<uint32_t, AsyncContext::Ptr>::iterator iter = contextMap.find(sequence); - if (iter == contextMap.end()) - return; - AsyncContext::Ptr context = iter->second; + AsyncContext::Ptr context; + { + Mutex::ScopedLock _lock(lock); + map<uint32_t, AsyncContext::Ptr>::iterator iter = contextMap.find(sequence); + if (iter == contextMap.end()) + return; + context = iter->second; + } - // TODO: accumulate data records and send response messages when we have "enough" + Variant::List list; + list.push_back(data.impl->asMap()); + send(context->replyTo, context->correlationId, Protocol::OP_QUERY_RESPONSE, Protocol::CONTENT_NONE, list, true); + QPID_LOG(trace, "SENT QueryResponse to=" << context->replyTo); } void AgentImpl::queryComplete(uint32_t sequence) { - Mutex::ScopedLock _lock(lock); - map<uint32_t, AsyncContext::Ptr>::iterator iter = contextMap.find(sequence); - if (iter == contextMap.end()) - return; - - // TODO: send a response message if there are any unsent data records + AsyncContext::Ptr context; + { + Mutex::ScopedLock _lock(lock); + map<uint32_t, AsyncContext::Ptr>::iterator iter = contextMap.find(sequence); + if (iter == contextMap.end()) + return; + context = iter->second; + contextMap.erase(iter); + } - AsyncContext::Ptr context = iter->second; - contextMap.erase(iter); - //sendCommandCompleteLH(context->exchange, context->key, context->sequence, 0, "OK"); + send(context->replyTo, context->correlationId, Protocol::OP_QUERY_RESPONSE, Protocol::CONTENT_NONE, Variant::List()); + QPID_LOG(trace, "SENT QueryResponse to=" << context->replyTo << " final response message"); } void AgentImpl::registerClass(SchemaClass* cls) @@ -463,13 +518,24 @@ void AgentImpl::registerClass(SchemaClass* cls) const char* AgentImpl::addObject(Data&, const char*) { + // TODO: Implement + // + // Determine a key for this object: + // if supplied, use the supplied key + // else: + // if the data is described (has a schema), use the schema primary-key to generate a key + // else make something up (a guid) + // + Mutex::ScopedLock _lock(lock); return 0; } -void AgentImpl::raiseEvent(Data&) +void AgentImpl::raiseEvent(Data& data) { - Mutex::ScopedLock _lock(lock); + Variant::List list; + list.push_back(data.impl->asMap()); + send(eventSendAddr, "", Protocol::OP_DATA_INDICATION, Protocol::CONTENT_EVENT, list); } void AgentImpl::run() @@ -601,16 +667,35 @@ void AgentImpl::handleMethodRequest(const Message& message) void AgentImpl::sendResponse(const Message& request, const string& opcode, const Data& data) { - sendResponse(request.getReplyTo(), request.getCorrelationId(), opcode, data); + send(request.getReplyTo(), request.getCorrelationId(), opcode, Protocol::CONTENT_NONE, data); +} + +void AgentImpl::send(const Address& address, const string& correlationId, const string& opcode, const string& cType, const Data& data) +{ + Message message; + MapContent content(message, data.impl->asMap()); + + if (!correlationId.empty()) + message.setCorrelationId(correlationId); + if (!cType.empty()) + message.getHeaders()[Protocol::APP_CONTENT] = cType; + message.getHeaders()[Protocol::APP_OPCODE] = opcode; + content.encode(); + session.createSender(address).send(message); } -void AgentImpl::sendResponse(const Address& address, const string& correlationId, const string& opcode, const Data& data) +void AgentImpl::send(const Address& address, const string& correlationId, const string& opcode, const string& cType, const Variant::List& list, bool partial) { Message message; - MapContent content(message, data.asMap()); + ListContent content(message, list); - message.setCorrelationId(correlationId); + if (!correlationId.empty()) + message.setCorrelationId(correlationId); + if (!cType.empty()) + message.getHeaders()[Protocol::APP_CONTENT] = cType; message.getHeaders()[Protocol::APP_OPCODE] = opcode; + if (partial) + message.getHeaders()[Protocol::APP_PARTIAL] = Variant(); content.encode(); session.createSender(address).send(message); } |