summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qmf/engine/Agent.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qmf/engine/Agent.cpp')
-rw-r--r--qpid/cpp/src/qmf/engine/Agent.cpp145
1 files changed, 115 insertions, 30 deletions
diff --git a/qpid/cpp/src/qmf/engine/Agent.cpp b/qpid/cpp/src/qmf/engine/Agent.cpp
index 04308b954a..453c6bd27b 100644
--- a/qpid/cpp/src/qmf/engine/Agent.cpp
+++ b/qpid/cpp/src/qmf/engine/Agent.cpp
@@ -19,7 +19,7 @@
#include "qmf/engine/Agent.h"
#include "qmf/engine/SchemaImpl.h"
-#include "qmf/engine/Data.h"
+#include "qmf/engine/DataImpl.h"
#include "qmf/engine/QueryImpl.h"
#include "qmf/Protocol.h"
#include <qpid/sys/Mutex.h>
@@ -34,6 +34,7 @@
#include <qpid/messaging/Address.h>
#include <qpid/messaging/Message.h>
#include <qpid/messaging/MapContent.h>
+#include <qpid/messaging/ListContent.h>
#include <qpid/messaging/MapView.h>
#include <qpid/messaging/ListView.h>
#include <string>
@@ -86,15 +87,27 @@ namespace engine {
AsyncContext(const string& cid, const Address& rt) : correlationId(cid), replyTo(rt), schemaMethod(0) {}
};
- class StoreThread : public boost::noncopyable, public qpid::sys::Runnable {
+ /**
+ * StoreThread is used only when the Agent runs in internal-store mode.
+ * This class keeps track of stored objects and can perform queries and
+ * subscription queries on the data.
+ */
+ class StoreThread : public boost::noncopyable, public qpid::sys::Runnable, public DataManager {
public:
StoreThread(AgentImpl& a) : agent(a), running(true), thread(*this) {}
- ~StoreThread() {
- stop();
- }
+ ~StoreThread() { stop(); }
+
+ void addObject(const Data& data);
+
+ // Methods from Runnable
void run();
void stop();
+ // Methods from DataManager
+ void modifyStart(DataPtr data);
+ void modifyDone(DataPtr data);
+ void destroy(DataPtr data);
+
private:
AgentImpl& agent;
bool running;
@@ -142,6 +155,7 @@ namespace engine {
string directAddrParams;
string topicAddr;
string topicAddrParams;
+ string eventSendAddr;
Variant::Map attrMap;
string storeDir;
string transferDir;
@@ -204,7 +218,10 @@ namespace engine {
void handleSubscribeRefresh(const Message& message);
void handleMethodRequest(const Message& message);
void sendResponse(const Message& request, const string& opcode, const Data& data);
- void sendResponse(const Address& address, const string& correlationId, const string& opcode, const Data& data);
+ void send(const Address& address, const string& correlationId, const string& opcode,
+ const string& cType, const Data& data);
+ void send(const Address& address, const string& correlationId, const string& opcode,
+ const string& cType, const Variant::List& list, bool partial=false);
void sendPackageIndicationLH(const string& packageName);
void sendClassIndicationLH(ClassKind kind, const string& packageName, const AgentClassKey& key);
@@ -243,6 +260,11 @@ AgentEvent AgentEventImpl::copy()
return item;
}
+void StoreThread::addObject(const Data& data)
+{
+ DataPtr stored(new Data(data));
+}
+
void StoreThread::run()
{
while (running) {
@@ -256,6 +278,30 @@ void StoreThread::stop()
agent.signalInternal();
}
+void StoreThread::modifyStart(DataPtr)
+{
+ // Algorithm:
+ // Make a copy of the indicated object as a delta base if there
+ // isn't already one in place. If there is, do nothing.
+}
+
+void StoreThread::modifyDone(DataPtr)
+{
+ // Algorithm:
+ // If any deltas between the current and the stored base are discrete,
+ // send an immediate update. Otherwise, mark the object as modified.
+ //
+ // If an update is sent, delete the base copy. If not, leave the base copy
+ // in place for the later periodic update.
+}
+
+void StoreThread::destroy(DataPtr)
+{
+ // Algorithm:
+ // Send an immediate full-update for this object with the delete time set.
+ // Remove the object and any copies from the data store.
+}
+
AgentImpl::AgentImpl(const char* _v, const char* _p, const char* _n, const char* _d, bool _i) :
vendor(_v), product(_p), name(_n), domain(_d ? _d : "default"), internalStore(_i),
notifyHandler(0), notifiable(0),
@@ -263,6 +309,7 @@ AgentImpl::AgentImpl(const char* _v, const char* _p, const char* _n, const char*
{
directAddr = "qmf." + domain + ".direct/" + vendor + ":" + product + ":" + name;
topicAddr = "qmf." + domain + ".topic/console.ind.#";
+ eventSendAddr = "qmf." + domain + ".topic/agent.event";
if (_d != 0) {
directAddrParams = " {create: always, type: topic, x-properties: {type: direct}}";
topicAddrParams = " {create: always, type: topic, x-properties: {type: topic}}";
@@ -376,7 +423,7 @@ void AgentImpl::authAllow(uint32_t sequence)
// Re-issue the now-authorized action. If this is a data query (get or subscribe),
// and the agent is handling storage internally, redirect to the internal event
// queue for processing by the internal-storage thread.
- if (internalStore) {
+ if (internalStore && context->authorizedEvent->kind != AgentEvent::METHOD_CALL) {
internalEventQueue.push_back(context->authorizedEvent);
cond.notify();
} else {
@@ -395,7 +442,7 @@ void AgentImpl::authDeny(uint32_t sequence, const Data& exception)
contextMap.erase(iter);
// Return an exception message to the requestor
- sendResponse(context->replyTo, context->correlationId, Protocol::OP_EXCEPTION, exception);
+ send(context->replyTo, context->correlationId, Protocol::OP_EXCEPTION, Protocol::CONTENT_NONE, exception);
}
void AgentImpl::authDeny(uint32_t sequence, const string& error)
@@ -419,29 +466,37 @@ void AgentImpl::methodResponse(uint32_t sequence, uint32_t status, char* text, c
QPID_LOG(trace, "SENT MethodResponse corr=" << context->correlationId << " status=" << status << " text=" << text);
}
-void AgentImpl::queryResponse(uint32_t sequence, Data&)
+void AgentImpl::queryResponse(uint32_t sequence, Data& data)
{
- Mutex::ScopedLock _lock(lock);
- map<uint32_t, AsyncContext::Ptr>::iterator iter = contextMap.find(sequence);
- if (iter == contextMap.end())
- return;
- AsyncContext::Ptr context = iter->second;
+ AsyncContext::Ptr context;
+ {
+ Mutex::ScopedLock _lock(lock);
+ map<uint32_t, AsyncContext::Ptr>::iterator iter = contextMap.find(sequence);
+ if (iter == contextMap.end())
+ return;
+ context = iter->second;
+ }
- // TODO: accumulate data records and send response messages when we have "enough"
+ Variant::List list;
+ list.push_back(data.impl->asMap());
+ send(context->replyTo, context->correlationId, Protocol::OP_QUERY_RESPONSE, Protocol::CONTENT_NONE, list, true);
+ QPID_LOG(trace, "SENT QueryResponse to=" << context->replyTo);
}
void AgentImpl::queryComplete(uint32_t sequence)
{
- Mutex::ScopedLock _lock(lock);
- map<uint32_t, AsyncContext::Ptr>::iterator iter = contextMap.find(sequence);
- if (iter == contextMap.end())
- return;
-
- // TODO: send a response message if there are any unsent data records
+ AsyncContext::Ptr context;
+ {
+ Mutex::ScopedLock _lock(lock);
+ map<uint32_t, AsyncContext::Ptr>::iterator iter = contextMap.find(sequence);
+ if (iter == contextMap.end())
+ return;
+ context = iter->second;
+ contextMap.erase(iter);
+ }
- AsyncContext::Ptr context = iter->second;
- contextMap.erase(iter);
- //sendCommandCompleteLH(context->exchange, context->key, context->sequence, 0, "OK");
+ send(context->replyTo, context->correlationId, Protocol::OP_QUERY_RESPONSE, Protocol::CONTENT_NONE, Variant::List());
+ QPID_LOG(trace, "SENT QueryResponse to=" << context->replyTo << " final response message");
}
void AgentImpl::registerClass(SchemaClass* cls)
@@ -463,13 +518,24 @@ void AgentImpl::registerClass(SchemaClass* cls)
const char* AgentImpl::addObject(Data&, const char*)
{
+ // TODO: Implement
+ //
+ // Determine a key for this object:
+ // if supplied, use the supplied key
+ // else:
+ // if the data is described (has a schema), use the schema primary-key to generate a key
+ // else make something up (a guid)
+ //
+
Mutex::ScopedLock _lock(lock);
return 0;
}
-void AgentImpl::raiseEvent(Data&)
+void AgentImpl::raiseEvent(Data& data)
{
- Mutex::ScopedLock _lock(lock);
+ Variant::List list;
+ list.push_back(data.impl->asMap());
+ send(eventSendAddr, "", Protocol::OP_DATA_INDICATION, Protocol::CONTENT_EVENT, list);
}
void AgentImpl::run()
@@ -601,16 +667,35 @@ void AgentImpl::handleMethodRequest(const Message& message)
void AgentImpl::sendResponse(const Message& request, const string& opcode, const Data& data)
{
- sendResponse(request.getReplyTo(), request.getCorrelationId(), opcode, data);
+ send(request.getReplyTo(), request.getCorrelationId(), opcode, Protocol::CONTENT_NONE, data);
+}
+
+void AgentImpl::send(const Address& address, const string& correlationId, const string& opcode, const string& cType, const Data& data)
+{
+ Message message;
+ MapContent content(message, data.impl->asMap());
+
+ if (!correlationId.empty())
+ message.setCorrelationId(correlationId);
+ if (!cType.empty())
+ message.getHeaders()[Protocol::APP_CONTENT] = cType;
+ message.getHeaders()[Protocol::APP_OPCODE] = opcode;
+ content.encode();
+ session.createSender(address).send(message);
}
-void AgentImpl::sendResponse(const Address& address, const string& correlationId, const string& opcode, const Data& data)
+void AgentImpl::send(const Address& address, const string& correlationId, const string& opcode, const string& cType, const Variant::List& list, bool partial)
{
Message message;
- MapContent content(message, data.asMap());
+ ListContent content(message, list);
- message.setCorrelationId(correlationId);
+ if (!correlationId.empty())
+ message.setCorrelationId(correlationId);
+ if (!cType.empty())
+ message.getHeaders()[Protocol::APP_CONTENT] = cType;
message.getHeaders()[Protocol::APP_OPCODE] = opcode;
+ if (partial)
+ message.getHeaders()[Protocol::APP_PARTIAL] = Variant();
content.encode();
session.createSender(address).send(message);
}