summaryrefslogtreecommitdiff
path: root/cpp/src/qmf
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qmf')
-rw-r--r--cpp/src/qmf/Agent.cpp29
-rw-r--r--cpp/src/qmf/AgentImpl.h1
-rw-r--r--cpp/src/qmf/AgentSession.cpp229
-rw-r--r--cpp/src/qmf/AgentSessionImpl.h175
-rw-r--r--cpp/src/qmf/ConsoleSession.cpp125
-rw-r--r--cpp/src/qmf/ConsoleSessionImpl.h22
-rw-r--r--cpp/src/qmf/DataAddr.cpp6
-rw-r--r--cpp/src/qmf/DataAddrImpl.h4
-rw-r--r--cpp/src/qmf/EventNotifierImpl.cpp56
-rw-r--r--cpp/src/qmf/EventNotifierImpl.h48
-rw-r--r--cpp/src/qmf/PosixEventNotifier.cpp65
-rw-r--r--cpp/src/qmf/PosixEventNotifierImpl.cpp112
-rw-r--r--cpp/src/qmf/PosixEventNotifierImpl.h61
-rw-r--r--cpp/src/qmf/PrivateImplRef.h2
-rw-r--r--cpp/src/qmf/engine/ResilientConnection.cpp6
-rw-r--r--cpp/src/qmf/engine/SchemaImpl.cpp11
-rw-r--r--cpp/src/qmf/engine/SchemaImpl.h7
17 files changed, 199 insertions, 760 deletions
diff --git a/cpp/src/qmf/Agent.cpp b/cpp/src/qmf/Agent.cpp
index 684f8e4fba..915f2a1c88 100644
--- a/cpp/src/qmf/Agent.cpp
+++ b/cpp/src/qmf/Agent.cpp
@@ -72,7 +72,7 @@ Schema Agent::getSchema(const SchemaId& s, Duration t) { return impl->getSchema(
AgentImpl::AgentImpl(const std::string& n, uint32_t e, ConsoleSessionImpl& s) :
name(n), directSubject(n), epoch(e), session(s), touched(true), untouchedCount(0), capability(0),
- sender(session.directSender), schemaCache(s.schemaCache)
+ sender(session.directSender), nextCorrelator(1), schemaCache(s.schemaCache)
{
}
@@ -102,11 +102,12 @@ const Variant& AgentImpl::getAttribute(const string& k) const
ConsoleEvent AgentImpl::query(const Query& query, Duration timeout)
{
boost::shared_ptr<SyncContext> context(new SyncContext());
- uint32_t correlator(session.correlator());
+ uint32_t correlator;
ConsoleEvent result;
{
qpid::sys::Mutex::ScopedLock l(lock);
+ correlator = nextCorrelator++;
contextMap[correlator] = context;
}
try {
@@ -150,7 +151,12 @@ ConsoleEvent AgentImpl::query(const string& text, Duration timeout)
uint32_t AgentImpl::queryAsync(const Query& query)
{
- uint32_t correlator(session.correlator());
+ uint32_t correlator;
+
+ {
+ qpid::sys::Mutex::ScopedLock l(lock);
+ correlator = nextCorrelator++;
+ }
sendQuery(query, correlator);
return correlator;
@@ -166,11 +172,12 @@ uint32_t AgentImpl::queryAsync(const string& text)
ConsoleEvent AgentImpl::callMethod(const string& method, const Variant::Map& args, const DataAddr& addr, Duration timeout)
{
boost::shared_ptr<SyncContext> context(new SyncContext());
- uint32_t correlator(session.correlator());
+ uint32_t correlator;
ConsoleEvent result;
{
qpid::sys::Mutex::ScopedLock l(lock);
+ correlator = nextCorrelator++;
contextMap[correlator] = context;
}
try {
@@ -206,7 +213,12 @@ ConsoleEvent AgentImpl::callMethod(const string& method, const Variant::Map& arg
uint32_t AgentImpl::callMethodAsync(const string& method, const Variant::Map& args, const DataAddr& addr)
{
- uint32_t correlator(session.correlator());
+ uint32_t correlator;
+
+ {
+ qpid::sys::Mutex::ScopedLock l(lock);
+ correlator = nextCorrelator++;
+ }
sendMethod(method, args, addr, correlator);
return correlator;
@@ -584,7 +596,12 @@ void AgentImpl::sendMethod(const string& method, const Variant::Map& args, const
void AgentImpl::sendSchemaRequest(const SchemaId& id)
{
- uint32_t correlator(session.correlator());
+ uint32_t correlator;
+
+ {
+ qpid::sys::Mutex::ScopedLock l(lock);
+ correlator = nextCorrelator++;
+ }
if (capability >= AGENT_CAPABILITY_V2_SCHEMA) {
Query query(QUERY_SCHEMA, id);
diff --git a/cpp/src/qmf/AgentImpl.h b/cpp/src/qmf/AgentImpl.h
index 09754a3a7e..7fa4f4373a 100644
--- a/cpp/src/qmf/AgentImpl.h
+++ b/cpp/src/qmf/AgentImpl.h
@@ -99,6 +99,7 @@ namespace qmf {
uint32_t capability;
qpid::messaging::Sender sender;
qpid::types::Variant::Map attributes;
+ uint32_t nextCorrelator;
std::map<uint32_t, boost::shared_ptr<SyncContext> > contextMap;
boost::shared_ptr<SchemaCache> schemaCache;
mutable std::set<std::string> packageSet;
diff --git a/cpp/src/qmf/AgentSession.cpp b/cpp/src/qmf/AgentSession.cpp
index 251c25fd44..4c5a72a467 100644
--- a/cpp/src/qmf/AgentSession.cpp
+++ b/cpp/src/qmf/AgentSession.cpp
@@ -19,7 +19,132 @@
*
*/
-#include "qmf/AgentSessionImpl.h"
+#include "qpid/RefCounted.h"
+#include "qmf/PrivateImplRef.h"
+#include "qmf/exceptions.h"
+#include "qmf/AgentSession.h"
+#include "qmf/AgentEventImpl.h"
+#include "qmf/SchemaIdImpl.h"
+#include "qmf/SchemaImpl.h"
+#include "qmf/DataAddrImpl.h"
+#include "qmf/DataImpl.h"
+#include "qmf/QueryImpl.h"
+#include "qmf/agentCapability.h"
+#include "qmf/constants.h"
+#include "qpid/sys/Mutex.h"
+#include "qpid/sys/Condition.h"
+#include "qpid/sys/Thread.h"
+#include "qpid/sys/Runnable.h"
+#include "qpid/log/Statement.h"
+#include "qpid/messaging/Connection.h"
+#include "qpid/messaging/Session.h"
+#include "qpid/messaging/Receiver.h"
+#include "qpid/messaging/Sender.h"
+#include "qpid/messaging/Message.h"
+#include "qpid/messaging/AddressParser.h"
+#include "qpid/management/Buffer.h"
+#include <queue>
+#include <map>
+#include <set>
+#include <iostream>
+#include <memory>
+
+using namespace std;
+using namespace qpid::messaging;
+using namespace qmf;
+using qpid::types::Variant;
+
+namespace qmf {
+ class AgentSessionImpl : public virtual qpid::RefCounted, public qpid::sys::Runnable {
+ public:
+ ~AgentSessionImpl();
+
+ //
+ // Methods from API handle
+ //
+ AgentSessionImpl(Connection& c, const string& o);
+ void setDomain(const string& d) { checkOpen(); domain = d; }
+ void setVendor(const string& v) { checkOpen(); attributes["_vendor"] = v; }
+ void setProduct(const string& p) { checkOpen(); attributes["_product"] = p; }
+ void setInstance(const string& i) { checkOpen(); attributes["_instance"] = i; }
+ void setAttribute(const string& k, const qpid::types::Variant& v) { checkOpen(); attributes[k] = v; }
+ const string& getName() const { return agentName; }
+ void open();
+ void close();
+ bool nextEvent(AgentEvent& e, Duration t);
+
+ void registerSchema(Schema& s);
+ DataAddr addData(Data& d, const string& n, bool persist);
+ void delData(const DataAddr&);
+
+ void authAccept(AgentEvent& e);
+ void authReject(AgentEvent& e, const string& m);
+ void raiseException(AgentEvent& e, const string& s);
+ void raiseException(AgentEvent& e, const Data& d);
+ void response(AgentEvent& e, const Data& d);
+ void complete(AgentEvent& e);
+ void methodSuccess(AgentEvent& e);
+ void raiseEvent(const Data& d);
+ void raiseEvent(const Data& d, int s);
+
+ private:
+ typedef map<DataAddr, Data, DataAddrCompare> DataIndex;
+ typedef map<SchemaId, Schema, SchemaIdCompare> SchemaMap;
+
+ mutable qpid::sys::Mutex lock;
+ qpid::sys::Condition cond;
+ Connection connection;
+ Session session;
+ Sender directSender;
+ Sender topicSender;
+ string domain;
+ Variant::Map attributes;
+ Variant::Map options;
+ string agentName;
+ bool opened;
+ queue<AgentEvent> eventQueue;
+ qpid::sys::Thread* thread;
+ bool threadCanceled;
+ uint32_t bootSequence;
+ uint32_t interval;
+ uint64_t lastHeartbeat;
+ uint64_t lastVisit;
+ bool forceHeartbeat;
+ bool externalStorage;
+ bool autoAllowQueries;
+ bool autoAllowMethods;
+ uint32_t maxSubscriptions;
+ uint32_t minSubInterval;
+ uint32_t subLifetime;
+ bool publicEvents;
+ bool listenOnDirect;
+ bool strictSecurity;
+ uint64_t schemaUpdateTime;
+ string directBase;
+ string topicBase;
+
+ SchemaMap schemata;
+ DataIndex globalIndex;
+ map<SchemaId, DataIndex, SchemaIdCompareNoHash> schemaIndex;
+
+ void checkOpen();
+ void setAgentName();
+ void enqueueEvent(const AgentEvent&);
+ void handleLocateRequest(const Variant::List& content, const Message& msg);
+ void handleMethodRequest(const Variant::Map& content, const Message& msg);
+ void handleQueryRequest(const Variant::Map& content, const Message& msg);
+ void handleSchemaRequest(AgentEvent&);
+ void handleV1SchemaRequest(qpid::management::Buffer&, uint32_t, const Message&);
+ void dispatch(Message);
+ void sendHeartbeat();
+ void send(Message, const Address&);
+ void flushResponses(AgentEvent&, bool);
+ void periodicProcessing(uint64_t);
+ void run();
+ };
+}
+
+typedef qmf::PrivateImplRef<AgentSession> PI;
AgentSession::AgentSession(AgentSessionImpl* impl) { PI::ctor(*this, impl); }
AgentSession::AgentSession(const AgentSession& s) : qmf::Handle<AgentSessionImpl>() { PI::copy(*this, s); }
@@ -36,7 +161,6 @@ const string& AgentSession::getName() const { return impl->getName(); }
void AgentSession::open() { impl->open(); }
void AgentSession::close() { impl->close(); }
bool AgentSession::nextEvent(AgentEvent& e, Duration t) { return impl->nextEvent(e, t); }
-int AgentSession::pendingEvents() const { return impl->pendingEvents(); }
void AgentSession::registerSchema(Schema& s) { impl->registerSchema(s); }
DataAddr AgentSession::addData(Data& d, const string& n, bool p) { return impl->addData(d, n, p); }
void AgentSession::delData(const DataAddr& a) { impl->delData(a); }
@@ -55,11 +179,11 @@ void AgentSession::raiseEvent(const Data& d, int s) { impl->raiseEvent(d, s); }
//========================================================================================
AgentSessionImpl::AgentSessionImpl(Connection& c, const string& options) :
- connection(c), domain("default"), opened(false), eventNotifier(0), thread(0), threadCanceled(false),
+ connection(c), domain("default"), opened(false), thread(0), threadCanceled(false),
bootSequence(1), interval(60), lastHeartbeat(0), lastVisit(0), forceHeartbeat(false),
externalStorage(false), autoAllowQueries(true), autoAllowMethods(true),
maxSubscriptions(64), minSubInterval(3000), subLifetime(300), publicEvents(true),
- listenOnDirect(true), strictSecurity(false), maxThreadWaitTime(5),
+ listenOnDirect(true), strictSecurity(false),
schemaUpdateTime(uint64_t(qpid::sys::Duration(qpid::sys::EPOCH, qpid::sys::now())))
{
//
@@ -120,14 +244,7 @@ AgentSessionImpl::AgentSessionImpl(Connection& c, const string& options) :
iter = optMap.find("strict-security");
if (iter != optMap.end())
strictSecurity = iter->second.asBool();
-
- iter = optMap.find("max-thread-wait-time");
- if (iter != optMap.end())
- maxThreadWaitTime = iter->second.asUint32();
}
-
- if (maxThreadWaitTime > interval)
- maxThreadWaitTime = interval;
}
@@ -135,11 +252,6 @@ AgentSessionImpl::~AgentSessionImpl()
{
if (opened)
close();
-
- if (thread) {
- thread->join();
- delete thread;
- }
}
@@ -148,12 +260,6 @@ void AgentSessionImpl::open()
if (opened)
throw QmfException("The session is already open");
- // If the thread exists, join and delete it before creating a new one.
- if (thread) {
- thread->join();
- delete thread;
- }
-
const string addrArgs(";{create:never,node:{type:topic}}");
const string routableAddr("direct-agent.route." + qpid::types::Uuid(true).str());
attributes["_direct_subject"] = routableAddr;
@@ -191,26 +297,19 @@ void AgentSessionImpl::open()
}
-void AgentSessionImpl::closeAsync()
+void AgentSessionImpl::close()
{
if (!opened)
return;
- // Stop the receiver thread. Don't join it until the destructor is called or open() is called.
+ // Stop and join the receiver thread
threadCanceled = true;
- opened = false;
-}
+ thread->join();
+ delete thread;
-
-void AgentSessionImpl::close()
-{
- closeAsync();
-
- if (thread) {
- thread->join();
- delete thread;
- thread = 0;
- }
+ // Close the AMQP session
+ session.close();
+ opened = false;
}
@@ -219,19 +318,13 @@ bool AgentSessionImpl::nextEvent(AgentEvent& event, Duration timeout)
uint64_t milliseconds = timeout.getMilliseconds();
qpid::sys::Mutex::ScopedLock l(lock);
- if (eventQueue.empty() && milliseconds > 0) {
- int64_t nsecs(qpid::sys::TIME_INFINITE);
- if ((uint64_t)(nsecs / 1000000) > milliseconds)
- nsecs = (int64_t) milliseconds * 1000000;
- qpid::sys::Duration then(nsecs);
- cond.wait(lock, qpid::sys::AbsTime(qpid::sys::now(), then));
- }
+ if (eventQueue.empty())
+ cond.wait(lock, qpid::sys::AbsTime(qpid::sys::now(),
+ qpid::sys::Duration(milliseconds * qpid::sys::TIME_MSEC)));
if (!eventQueue.empty()) {
event = eventQueue.front();
eventQueue.pop();
- if (eventQueue.empty())
- alertEventNotifierLH(false);
return true;
}
@@ -239,26 +332,6 @@ bool AgentSessionImpl::nextEvent(AgentEvent& event, Duration timeout)
}
-int AgentSessionImpl::pendingEvents() const
-{
- qpid::sys::Mutex::ScopedLock l(lock);
- return eventQueue.size();
-}
-
-
-void AgentSessionImpl::setEventNotifier(EventNotifierImpl* notifier)
-{
- qpid::sys::Mutex::ScopedLock l(lock);
- eventNotifier = notifier;
-}
-
-EventNotifierImpl* AgentSessionImpl::getEventNotifier() const
-{
- qpid::sys::Mutex::ScopedLock l(lock);
- return eventNotifier;
-}
-
-
void AgentSessionImpl::registerSchema(Schema& schema)
{
if (!schema.isFinalized())
@@ -514,10 +587,8 @@ void AgentSessionImpl::enqueueEvent(const AgentEvent& event)
qpid::sys::Mutex::ScopedLock l(lock);
bool notify = eventQueue.empty();
eventQueue.push(event);
- if (notify) {
+ if (notify)
cond.notify();
- alertEventNotifierLH(true);
- }
}
@@ -961,13 +1032,6 @@ void AgentSessionImpl::periodicProcessing(uint64_t seconds)
}
-void AgentSessionImpl::alertEventNotifierLH(bool readable)
-{
- if (eventNotifier)
- eventNotifier->setReadable(readable);
-}
-
-
void AgentSessionImpl::run()
{
QPID_LOG(debug, "AgentSession thread started for agent " << agentName);
@@ -977,7 +1041,7 @@ void AgentSessionImpl::run()
periodicProcessing((uint64_t) qpid::sys::Duration(qpid::sys::EPOCH, qpid::sys::now()) / qpid::sys::TIME_SEC);
Receiver rx;
- bool valid = session.nextReceiver(rx, Duration::SECOND * maxThreadWaitTime);
+ bool valid = session.nextReceiver(rx, Duration::SECOND);
if (threadCanceled)
break;
if (valid) {
@@ -994,19 +1058,6 @@ void AgentSessionImpl::run()
enqueueEvent(AgentEvent(new AgentEventImpl(AGENT_THREAD_FAILED)));
}
- session.close();
QPID_LOG(debug, "AgentSession thread exiting for agent " << agentName);
}
-
-AgentSessionImpl& AgentSessionImplAccess::get(AgentSession& session)
-{
- return *session.impl;
-}
-
-
-const AgentSessionImpl& AgentSessionImplAccess::get(const AgentSession& session)
-{
- return *session.impl;
-}
-
diff --git a/cpp/src/qmf/AgentSessionImpl.h b/cpp/src/qmf/AgentSessionImpl.h
deleted file mode 100644
index ae512a4054..0000000000
--- a/cpp/src/qmf/AgentSessionImpl.h
+++ /dev/null
@@ -1,175 +0,0 @@
-#ifndef __QMF_AGENT_SESSION_IMPL_H
-#define __QMF_AGENT_SESSION_IMPL_H
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "qpid/RefCounted.h"
-#include "qmf/PrivateImplRef.h"
-#include "qmf/exceptions.h"
-#include "qmf/AgentSession.h"
-#include "qmf/AgentEventImpl.h"
-#include "qmf/EventNotifierImpl.h"
-#include "qpid/messaging/Connection.h"
-#include "qpid/sys/Runnable.h"
-#include "qpid/sys/Mutex.h"
-#include "qpid/sys/Condition.h"
-#include "qpid/sys/Thread.h"
-#include "qpid/sys/Runnable.h"
-#include "qpid/log/Statement.h"
-#include "qpid/messaging/Connection.h"
-#include "qpid/messaging/Session.h"
-#include "qpid/messaging/Receiver.h"
-#include "qpid/messaging/Sender.h"
-#include "qpid/messaging/Message.h"
-#include "qpid/messaging/AddressParser.h"
-#include "qpid/management/Buffer.h"
-#include "qpid/RefCounted.h"
-#include "qmf/PrivateImplRef.h"
-#include "qmf/AgentSession.h"
-#include "qmf/exceptions.h"
-#include "qmf/AgentSession.h"
-#include "qmf/SchemaIdImpl.h"
-#include "qmf/SchemaImpl.h"
-#include "qmf/DataAddrImpl.h"
-#include "qmf/DataImpl.h"
-#include "qmf/QueryImpl.h"
-#include "qmf/agentCapability.h"
-#include "qmf/constants.h"
-
-#include <queue>
-#include <map>
-#include <iostream>
-#include <memory>
-
-using namespace std;
-using namespace qpid::messaging;
-using namespace qmf;
-using qpid::types::Variant;
-
-typedef qmf::PrivateImplRef<AgentSession> PI;
-
-namespace qmf {
- class AgentSessionImpl : public virtual qpid::RefCounted, public qpid::sys::Runnable {
- public:
- ~AgentSessionImpl();
-
- //
- // Methods from API handle
- //
- AgentSessionImpl(Connection& c, const string& o);
- void setDomain(const string& d) { checkOpen(); domain = d; }
- void setVendor(const string& v) { checkOpen(); attributes["_vendor"] = v; }
- void setProduct(const string& p) { checkOpen(); attributes["_product"] = p; }
- void setInstance(const string& i) { checkOpen(); attributes["_instance"] = i; }
- void setAttribute(const string& k, const qpid::types::Variant& v) { checkOpen(); attributes[k] = v; }
- const string& getName() const { return agentName; }
- void open();
- void closeAsync();
- void close();
- bool nextEvent(AgentEvent& e, Duration t);
- int pendingEvents() const;
-
- void setEventNotifier(EventNotifierImpl* eventNotifier);
- EventNotifierImpl* getEventNotifier() const;
-
- void registerSchema(Schema& s);
- DataAddr addData(Data& d, const string& n, bool persist);
- void delData(const DataAddr&);
-
- void authAccept(AgentEvent& e);
- void authReject(AgentEvent& e, const string& m);
- void raiseException(AgentEvent& e, const string& s);
- void raiseException(AgentEvent& e, const Data& d);
- void response(AgentEvent& e, const Data& d);
- void complete(AgentEvent& e);
- void methodSuccess(AgentEvent& e);
- void raiseEvent(const Data& d);
- void raiseEvent(const Data& d, int s);
-
- private:
- typedef map<DataAddr, Data, DataAddrCompare> DataIndex;
- typedef map<SchemaId, Schema, SchemaIdCompare> SchemaMap;
-
- mutable qpid::sys::Mutex lock;
- qpid::sys::Condition cond;
- Connection connection;
- Session session;
- Sender directSender;
- Sender topicSender;
- string domain;
- Variant::Map attributes;
- Variant::Map options;
- string agentName;
- bool opened;
- queue<AgentEvent> eventQueue;
- EventNotifierImpl* eventNotifier;
- qpid::sys::Thread* thread;
- bool threadCanceled;
- uint32_t bootSequence;
- uint32_t interval;
- uint64_t lastHeartbeat;
- uint64_t lastVisit;
- bool forceHeartbeat;
- bool externalStorage;
- bool autoAllowQueries;
- bool autoAllowMethods;
- uint32_t maxSubscriptions;
- uint32_t minSubInterval;
- uint32_t subLifetime;
- bool publicEvents;
- bool listenOnDirect;
- bool strictSecurity;
- uint32_t maxThreadWaitTime;
- uint64_t schemaUpdateTime;
- string directBase;
- string topicBase;
-
- SchemaMap schemata;
- DataIndex globalIndex;
- map<SchemaId, DataIndex, SchemaIdCompareNoHash> schemaIndex;
-
- void checkOpen();
- void setAgentName();
- void enqueueEvent(const AgentEvent&);
- void alertEventNotifierLH(bool readable);
- void handleLocateRequest(const Variant::List& content, const Message& msg);
- void handleMethodRequest(const Variant::Map& content, const Message& msg);
- void handleQueryRequest(const Variant::Map& content, const Message& msg);
- void handleSchemaRequest(AgentEvent&);
- void handleV1SchemaRequest(qpid::management::Buffer&, uint32_t, const Message&);
- void dispatch(Message);
- void sendHeartbeat();
- void send(Message, const Address&);
- void flushResponses(AgentEvent&, bool);
- void periodicProcessing(uint64_t);
- void run();
- };
-
- struct AgentSessionImplAccess {
- static AgentSessionImpl& get(AgentSession& session);
- static const AgentSessionImpl& get(const AgentSession& session);
- };
-}
-
-
-#endif
-
diff --git a/cpp/src/qmf/ConsoleSession.cpp b/cpp/src/qmf/ConsoleSession.cpp
index 2dfc894c58..e12c1152f6 100644
--- a/cpp/src/qmf/ConsoleSession.cpp
+++ b/cpp/src/qmf/ConsoleSession.cpp
@@ -54,7 +54,6 @@ void ConsoleSession::setAgentFilter(const string& f) { impl->setAgentFilter(f);
void ConsoleSession::open() { impl->open(); }
void ConsoleSession::close() { impl->close(); }
bool ConsoleSession::nextEvent(ConsoleEvent& e, Duration t) { return impl->nextEvent(e, t); }
-int ConsoleSession::pendingEvents() const { return impl->pendingEvents(); }
uint32_t ConsoleSession::getAgentCount() const { return impl->getAgentCount(); }
Agent ConsoleSession::getAgent(uint32_t i) const { return impl->getAgent(i); }
Agent ConsoleSession::getConnectedBrokerAgent() const { return impl->getConnectedBrokerAgent(); }
@@ -66,9 +65,9 @@ Subscription ConsoleSession::subscribe(const string& q, const string& f, const s
//========================================================================================
ConsoleSessionImpl::ConsoleSessionImpl(Connection& c, const string& options) :
- connection(c), domain("default"), maxAgentAgeMinutes(5), listenOnDirect(true), strictSecurity(false), maxThreadWaitTime(5),
- opened(false), eventNotifier(0), thread(0), threadCanceled(false), lastVisit(0), lastAgePass(0),
- connectedBrokerInAgentList(false), schemaCache(new SchemaCache()), nextCorrelator(1)
+ connection(c), domain("default"), maxAgentAgeMinutes(5),
+ opened(false), thread(0), threadCanceled(false), lastVisit(0), lastAgePass(0),
+ connectedBrokerInAgentList(false), schemaCache(new SchemaCache())
{
if (!options.empty()) {
qpid::messaging::AddressParser parser(options);
@@ -92,14 +91,7 @@ ConsoleSessionImpl::ConsoleSessionImpl(Connection& c, const string& options) :
iter = optMap.find("strict-security");
if (iter != optMap.end())
strictSecurity = iter->second.asBool();
-
- iter = optMap.find("max-thread-wait-time");
- if (iter != optMap.end())
- maxThreadWaitTime = iter->second.asUint32();
}
-
- if (maxThreadWaitTime > 60)
- maxThreadWaitTime = 60;
}
@@ -107,11 +99,6 @@ ConsoleSessionImpl::~ConsoleSessionImpl()
{
if (opened)
close();
-
- if (thread) {
- thread->join();
- delete thread;
- }
}
@@ -166,12 +153,6 @@ void ConsoleSessionImpl::open()
if (opened)
throw QmfException("The session is already open");
- // If the thread exists, join and delete it before creating a new one.
- if (thread) {
- thread->join();
- delete thread;
- }
-
// Establish messaging addresses
directBase = "qmf." + domain + ".direct";
topicBase = "qmf." + domain + ".topic";
@@ -200,36 +181,30 @@ void ConsoleSessionImpl::open()
// Start the receiver thread
threadCanceled = false;
- opened = true;
thread = new qpid::sys::Thread(*this);
// Send an agent_locate to direct address 'broker' to identify the connected-broker-agent.
sendBrokerLocate();
if (agentQuery)
sendAgentLocate();
+
+ opened = true;
}
-void ConsoleSessionImpl::closeAsync()
+void ConsoleSessionImpl::close()
{
if (!opened)
throw QmfException("The session is already closed");
- // Stop the receiver thread. Don't join it until the destructor is called or open() is called.
+ // Stop and join the receiver thread
threadCanceled = true;
- opened = false;
-}
-
+ thread->join();
+ delete thread;
-void ConsoleSessionImpl::close()
-{
- closeAsync();
-
- if (thread) {
- thread->join();
- delete thread;
- thread = 0;
- }
+ // Close the AMQP session
+ session.close();
+ opened = false;
}
@@ -238,19 +213,13 @@ bool ConsoleSessionImpl::nextEvent(ConsoleEvent& event, Duration timeout)
uint64_t milliseconds = timeout.getMilliseconds();
qpid::sys::Mutex::ScopedLock l(lock);
- if (eventQueue.empty() && milliseconds > 0) {
- int64_t nsecs(qpid::sys::TIME_INFINITE);
- if ((uint64_t)(nsecs / 1000000) > milliseconds)
- nsecs = (int64_t) milliseconds * 1000000;
- qpid::sys::Duration then(nsecs);
- cond.wait(lock, qpid::sys::AbsTime(qpid::sys::now(), then));
- }
+ if (eventQueue.empty())
+ cond.wait(lock, qpid::sys::AbsTime(qpid::sys::now(),
+ qpid::sys::Duration(milliseconds * qpid::sys::TIME_MSEC)));
if (!eventQueue.empty()) {
event = eventQueue.front();
eventQueue.pop();
- if (eventQueue.empty())
- alertEventNotifierLH(false);
return true;
}
@@ -258,27 +227,6 @@ bool ConsoleSessionImpl::nextEvent(ConsoleEvent& event, Duration timeout)
}
-int ConsoleSessionImpl::pendingEvents() const
-{
- qpid::sys::Mutex::ScopedLock l(lock);
- return eventQueue.size();
-}
-
-
-void ConsoleSessionImpl::setEventNotifier(EventNotifierImpl* notifier)
-{
- qpid::sys::Mutex::ScopedLock l(lock);
- eventNotifier = notifier;
-}
-
-
-EventNotifierImpl* ConsoleSessionImpl::getEventNotifier() const
-{
- qpid::sys::Mutex::ScopedLock l(lock);
- return eventNotifier;
-}
-
-
uint32_t ConsoleSessionImpl::getAgentCount() const
{
qpid::sys::Mutex::ScopedLock l(lock);
@@ -320,10 +268,8 @@ void ConsoleSessionImpl::enqueueEventLH(const ConsoleEvent& event)
{
bool notify = eventQueue.empty();
eventQueue.push(event);
- if (notify) {
+ if (notify)
cond.notify();
- alertEventNotifierLH(true);
- }
}
@@ -475,23 +421,7 @@ void ConsoleSessionImpl::handleAgentUpdate(const string& agentName, const Varian
iter = content.find("_values");
if (iter == content.end())
return;
- const Variant::Map& in_attrs(iter->second.asMap());
- Variant::Map attrs;
-
- //
- // Copy the map from the message to "attrs". Translate any old-style
- // keys to their new key values in the process.
- //
- for (iter = in_attrs.begin(); iter != in_attrs.end(); iter++) {
- if (iter->first == "epoch")
- attrs[protocol::AGENT_ATTR_EPOCH] = iter->second;
- else if (iter->first == "timestamp")
- attrs[protocol::AGENT_ATTR_TIMESTAMP] = iter->second;
- else if (iter->first == "heartbeat_interval")
- attrs[protocol::AGENT_ATTR_HEARTBEAT_INTERVAL] = iter->second;
- else
- attrs[iter->first] = iter->second;
- }
+ Variant::Map attrs(iter->second.asMap());
iter = attrs.find(protocol::AGENT_ATTR_EPOCH);
if (iter != attrs.end())
@@ -632,13 +562,6 @@ void ConsoleSessionImpl::periodicProcessing(uint64_t seconds)
}
-void ConsoleSessionImpl::alertEventNotifierLH(bool readable)
-{
- if (eventNotifier)
- eventNotifier->setReadable(readable);
-}
-
-
void ConsoleSessionImpl::run()
{
QPID_LOG(debug, "ConsoleSession thread started");
@@ -649,7 +572,7 @@ void ConsoleSessionImpl::run()
qpid::sys::TIME_SEC);
Receiver rx;
- bool valid = session.nextReceiver(rx, Duration::SECOND * maxThreadWaitTime);
+ bool valid = session.nextReceiver(rx, Duration::SECOND);
if (threadCanceled)
break;
if (valid) {
@@ -666,18 +589,6 @@ void ConsoleSessionImpl::run()
enqueueEvent(ConsoleEvent(new ConsoleEventImpl(CONSOLE_THREAD_FAILED)));
}
- session.close();
QPID_LOG(debug, "ConsoleSession thread exiting");
}
-
-ConsoleSessionImpl& ConsoleSessionImplAccess::get(ConsoleSession& session)
-{
- return *session.impl;
-}
-
-
-const ConsoleSessionImpl& ConsoleSessionImplAccess::get(const ConsoleSession& session)
-{
- return *session.impl;
-}
diff --git a/cpp/src/qmf/ConsoleSessionImpl.h b/cpp/src/qmf/ConsoleSessionImpl.h
index e2b30602fa..675c8bcfb5 100644
--- a/cpp/src/qmf/ConsoleSessionImpl.h
+++ b/cpp/src/qmf/ConsoleSessionImpl.h
@@ -27,7 +27,6 @@
#include "qmf/SchemaId.h"
#include "qmf/Schema.h"
#include "qmf/ConsoleEventImpl.h"
-#include "qmf/EventNotifierImpl.h"
#include "qmf/SchemaCache.h"
#include "qmf/Query.h"
#include "qpid/sys/Mutex.h"
@@ -42,13 +41,9 @@
#include "qpid/messaging/Address.h"
#include "qpid/management/Buffer.h"
#include "qpid/types/Variant.h"
-
-#include <boost/shared_ptr.hpp>
#include <map>
#include <queue>
-using namespace std;
-
namespace qmf {
class ConsoleSessionImpl : public virtual qpid::RefCounted, public qpid::sys::Runnable {
public:
@@ -61,14 +56,8 @@ namespace qmf {
void setDomain(const std::string& d) { domain = d; }
void setAgentFilter(const std::string& f);
void open();
- void closeAsync();
void close();
bool nextEvent(ConsoleEvent& e, qpid::messaging::Duration t);
- int pendingEvents() const;
-
- void setEventNotifier(EventNotifierImpl* notifier);
- EventNotifierImpl* getEventNotifier() const;
-
uint32_t getAgentCount() const;
Agent getAgent(uint32_t i) const;
Agent getConnectedBrokerAgent() const { return connectedBrokerAgent; }
@@ -86,11 +75,9 @@ namespace qmf {
uint32_t maxAgentAgeMinutes;
bool listenOnDirect;
bool strictSecurity;
- uint32_t maxThreadWaitTime;
Query agentQuery;
bool opened;
std::queue<ConsoleEvent> eventQueue;
- EventNotifierImpl* eventNotifier;
qpid::sys::Thread* thread;
bool threadCanceled;
uint64_t lastVisit;
@@ -102,8 +89,6 @@ namespace qmf {
std::string directBase;
std::string topicBase;
boost::shared_ptr<SchemaCache> schemaCache;
- qpid::sys::Mutex corrlock;
- uint32_t nextCorrelator;
void enqueueEvent(const ConsoleEvent&);
void enqueueEventLH(const ConsoleEvent&);
@@ -113,17 +98,10 @@ namespace qmf {
void handleAgentUpdate(const std::string&, const qpid::types::Variant::Map&, const qpid::messaging::Message&);
void handleV1SchemaResponse(qpid::management::Buffer&, uint32_t, const qpid::messaging::Message&);
void periodicProcessing(uint64_t);
- void alertEventNotifierLH(bool readable);
void run();
- uint32_t correlator() { qpid::sys::Mutex::ScopedLock l(corrlock); return nextCorrelator++; }
friend class AgentImpl;
};
-
- struct ConsoleSessionImplAccess {
- static ConsoleSessionImpl& get(ConsoleSession& session);
- static const ConsoleSessionImpl& get(const ConsoleSession& session);
- };
}
#endif
diff --git a/cpp/src/qmf/DataAddr.cpp b/cpp/src/qmf/DataAddr.cpp
index d16e12062e..fb51d5787f 100644
--- a/cpp/src/qmf/DataAddr.cpp
+++ b/cpp/src/qmf/DataAddr.cpp
@@ -36,9 +36,7 @@ DataAddr::~DataAddr() { PI::dtor(*this); }
DataAddr& DataAddr::operator=(const DataAddr& s) { return PI::assign(*this, s); }
bool DataAddr::operator==(const DataAddr& o) { return *impl == *o.impl; }
-bool DataAddr::operator==(const DataAddr& o) const { return *impl == *o.impl; }
bool DataAddr::operator<(const DataAddr& o) { return *impl < *o.impl; }
-bool DataAddr::operator<(const DataAddr& o) const { return *impl < *o.impl; }
DataAddr::DataAddr(const qpid::types::Variant::Map& m) { PI::ctor(*this, new DataAddrImpl(m)); }
DataAddr::DataAddr(const string& n, const string& a, uint32_t e) { PI::ctor(*this, new DataAddrImpl(n, a, e)); }
@@ -47,7 +45,7 @@ const string& DataAddr::getAgentName() const { return impl->getAgentName(); }
uint32_t DataAddr::getAgentEpoch() const { return impl->getAgentEpoch(); }
Variant::Map DataAddr::asMap() const { return impl->asMap(); }
-bool DataAddrImpl::operator==(const DataAddrImpl& other) const
+bool DataAddrImpl::operator==(const DataAddrImpl& other)
{
return
agentName == other.agentName &&
@@ -56,7 +54,7 @@ bool DataAddrImpl::operator==(const DataAddrImpl& other) const
}
-bool DataAddrImpl::operator<(const DataAddrImpl& other) const
+bool DataAddrImpl::operator<(const DataAddrImpl& other)
{
if (agentName < other.agentName) return true;
if (agentName > other.agentName) return false;
diff --git a/cpp/src/qmf/DataAddrImpl.h b/cpp/src/qmf/DataAddrImpl.h
index 11d512f0c4..3f9cae9453 100644
--- a/cpp/src/qmf/DataAddrImpl.h
+++ b/cpp/src/qmf/DataAddrImpl.h
@@ -38,8 +38,8 @@ namespace qmf {
//
// Methods from API handle
//
- bool operator==(const DataAddrImpl&) const;
- bool operator<(const DataAddrImpl&) const;
+ bool operator==(const DataAddrImpl&);
+ bool operator<(const DataAddrImpl&);
DataAddrImpl(const qpid::types::Variant::Map&);
DataAddrImpl(const std::string& _name, const std::string& _agentName, uint32_t _agentEpoch=0) :
agentName(_agentName), name(_name), agentEpoch(_agentEpoch) {}
diff --git a/cpp/src/qmf/EventNotifierImpl.cpp b/cpp/src/qmf/EventNotifierImpl.cpp
deleted file mode 100644
index 20114aaa5e..0000000000
--- a/cpp/src/qmf/EventNotifierImpl.cpp
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include "qmf/EventNotifierImpl.h"
-#include "qmf/AgentSessionImpl.h"
-#include "qmf/ConsoleSessionImpl.h"
-
-EventNotifierImpl::EventNotifierImpl(AgentSession& agentSession)
- : readable(false), agent(agentSession)
-{
- AgentSessionImplAccess::get(agent).setEventNotifier(this);
-}
-
-
-EventNotifierImpl::EventNotifierImpl(ConsoleSession& consoleSession)
- : readable(false), console(consoleSession)
-{
- ConsoleSessionImplAccess::get(console).setEventNotifier(this);
-}
-
-
-EventNotifierImpl::~EventNotifierImpl()
-{
- if (agent.isValid())
- AgentSessionImplAccess::get(agent).setEventNotifier(NULL);
- if (console.isValid())
- ConsoleSessionImplAccess::get(console).setEventNotifier(NULL);
-}
-
-void EventNotifierImpl::setReadable(bool readable)
-{
- update(readable);
- this->readable = readable;
-}
-
-
-bool EventNotifierImpl::isReadable() const
-{
- return this->readable;
-}
diff --git a/cpp/src/qmf/EventNotifierImpl.h b/cpp/src/qmf/EventNotifierImpl.h
deleted file mode 100644
index d85f9979d2..0000000000
--- a/cpp/src/qmf/EventNotifierImpl.h
+++ /dev/null
@@ -1,48 +0,0 @@
-#ifndef __QMF_EVENT_NOTIFIER_IMPL_H
-#define __QMF_EVENT_NOTIFIER_IMPL_H
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include "qmf/AgentSession.h"
-#include "qmf/ConsoleSession.h"
-
-namespace qmf
-{
- class EventNotifierImpl {
- private:
- bool readable;
- AgentSession agent;
- ConsoleSession console;
-
- public:
- EventNotifierImpl(AgentSession& agentSession);
- EventNotifierImpl(ConsoleSession& consoleSession);
- virtual ~EventNotifierImpl();
-
- void setReadable(bool readable);
- bool isReadable() const;
-
- protected:
- virtual void update(bool readable) = 0;
- };
-}
-
-#endif
-
diff --git a/cpp/src/qmf/PosixEventNotifier.cpp b/cpp/src/qmf/PosixEventNotifier.cpp
deleted file mode 100644
index a364cc155d..0000000000
--- a/cpp/src/qmf/PosixEventNotifier.cpp
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include "qmf/posix/EventNotifier.h"
-#include "qmf/PosixEventNotifierImpl.h"
-#include "qmf/PrivateImplRef.h"
-
-using namespace qmf;
-using namespace std;
-
-typedef qmf::PrivateImplRef<posix::EventNotifier> PI;
-
-posix::EventNotifier::EventNotifier(PosixEventNotifierImpl* impl) { PI::ctor(*this, impl); }
-
-posix::EventNotifier::EventNotifier(AgentSession& agentSession)
-{
- PI::ctor(*this, new PosixEventNotifierImpl(agentSession));
-}
-
-
-posix::EventNotifier::EventNotifier(ConsoleSession& consoleSession)
-{
- PI::ctor(*this, new PosixEventNotifierImpl(consoleSession));
-}
-
-
-posix::EventNotifier::EventNotifier(const posix::EventNotifier& that)
- : Handle<PosixEventNotifierImpl>()
-{
- PI::copy(*this, that);
-}
-
-
-posix::EventNotifier::~EventNotifier()
-{
- PI::dtor(*this);
-}
-
-posix::EventNotifier& posix::EventNotifier::operator=(const posix::EventNotifier& that)
-{
- return PI::assign(*this, that);
-}
-
-
-int posix::EventNotifier::getHandle() const
-{
- return impl->getHandle();
-}
-
diff --git a/cpp/src/qmf/PosixEventNotifierImpl.cpp b/cpp/src/qmf/PosixEventNotifierImpl.cpp
deleted file mode 100644
index 011dbcc214..0000000000
--- a/cpp/src/qmf/PosixEventNotifierImpl.cpp
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include "PosixEventNotifierImpl.h"
-#include "qpid/log/Statement.h"
-
-#include <fcntl.h>
-#include <unistd.h>
-#include <errno.h>
-
-#define BUFFER_SIZE 10
-
-using namespace qmf;
-
-PosixEventNotifierImpl::PosixEventNotifierImpl(AgentSession& agentSession)
- : EventNotifierImpl(agentSession)
-{
- openHandle();
-}
-
-
-PosixEventNotifierImpl::PosixEventNotifierImpl(ConsoleSession& consoleSession)
- : EventNotifierImpl(consoleSession)
-{
- openHandle();
-}
-
-
-PosixEventNotifierImpl::~PosixEventNotifierImpl()
-{
- closeHandle();
-}
-
-
-void PosixEventNotifierImpl::update(bool readable)
-{
- char buffer[BUFFER_SIZE];
-
- if(readable && !this->isReadable()) {
- if (::write(myHandle, "1", 1) == -1)
- QPID_LOG(error, "PosixEventNotifierImpl::update write failed: " << errno);
- }
- else if(!readable && this->isReadable()) {
- if (::read(yourHandle, buffer, BUFFER_SIZE) == -1)
- QPID_LOG(error, "PosixEventNotifierImpl::update read failed: " << errno);
- }
-}
-
-
-void PosixEventNotifierImpl::openHandle()
-{
- int pair[2];
-
- if(::pipe(pair) == -1)
- throw QmfException("Unable to open event notifier handle.");
-
- yourHandle = pair[0];
- myHandle = pair[1];
-
- int flags;
-
- flags = ::fcntl(yourHandle, F_GETFL);
- if((::fcntl(yourHandle, F_SETFL, flags | O_NONBLOCK)) == -1)
- throw QmfException("Unable to make remote handle non-blocking.");
-
- flags = ::fcntl(myHandle, F_GETFL);
- if((::fcntl(myHandle, F_SETFL, flags | O_NONBLOCK)) == -1)
- throw QmfException("Unable to make local handle non-blocking.");
-}
-
-
-void PosixEventNotifierImpl::closeHandle()
-{
- if(myHandle > 0) {
- ::close(myHandle);
- myHandle = -1;
- }
-
- if(yourHandle > 0) {
- ::close(yourHandle);
- yourHandle = -1;
- }
-}
-
-
-PosixEventNotifierImpl& PosixEventNotifierImplAccess::get(posix::EventNotifier& notifier)
-{
- return *notifier.impl;
-}
-
-
-const PosixEventNotifierImpl& PosixEventNotifierImplAccess::get(const posix::EventNotifier& notifier)
-{
- return *notifier.impl;
-}
-
diff --git a/cpp/src/qmf/PosixEventNotifierImpl.h b/cpp/src/qmf/PosixEventNotifierImpl.h
deleted file mode 100644
index c8a7446bd5..0000000000
--- a/cpp/src/qmf/PosixEventNotifierImpl.h
+++ /dev/null
@@ -1,61 +0,0 @@
-#ifndef __QMF_POSIX_EVENT_NOTIFIER_IMPL_H
-#define __QMF_POSIX_EVENT_NOTIFIER_IMPL_H
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include "qmf/posix/EventNotifier.h"
-#include "qmf/EventNotifierImpl.h"
-#include "qpid/RefCounted.h"
-
-namespace qmf
-{
- class AgentSession;
- class ConsoleSession;
-
- class PosixEventNotifierImpl : public EventNotifierImpl, public virtual qpid::RefCounted
- {
- public:
- PosixEventNotifierImpl(AgentSession& agentSession);
- PosixEventNotifierImpl(ConsoleSession& consoleSession);
- virtual ~PosixEventNotifierImpl();
-
- int getHandle() const { return yourHandle; }
-
- private:
- int myHandle;
- int yourHandle;
-
- void openHandle();
- void closeHandle();
-
- protected:
- void update(bool readable);
- };
-
- struct PosixEventNotifierImplAccess
- {
- static PosixEventNotifierImpl& get(posix::EventNotifier& notifier);
- static const PosixEventNotifierImpl& get(const posix::EventNotifier& notifier);
- };
-
-}
-
-#endif
-
diff --git a/cpp/src/qmf/PrivateImplRef.h b/cpp/src/qmf/PrivateImplRef.h
index 960cbb2e09..8b698c4199 100644
--- a/cpp/src/qmf/PrivateImplRef.h
+++ b/cpp/src/qmf/PrivateImplRef.h
@@ -23,8 +23,8 @@
*/
#include "qmf/ImportExport.h"
-#include "qpid/RefCounted.h"
#include <boost/intrusive_ptr.hpp>
+#include "qpid/RefCounted.h"
namespace qmf {
diff --git a/cpp/src/qmf/engine/ResilientConnection.cpp b/cpp/src/qmf/engine/ResilientConnection.cpp
index 41dd9ff00c..ab65b8d768 100644
--- a/cpp/src/qmf/engine/ResilientConnection.cpp
+++ b/cpp/src/qmf/engine/ResilientConnection.cpp
@@ -334,7 +334,8 @@ void ResilientConnectionImpl::notify()
{
if (notifyFd != -1)
{
- (void) ::write(notifyFd, ".", 1);
+ int unused_ret; //Suppress warnings about ignoring return value.
+ unused_ret = ::write(notifyFd, ".", 1);
}
}
@@ -431,7 +432,8 @@ void ResilientConnectionImpl::EnqueueEvent(ResilientConnectionEvent::EventKind k
if (notifyFd != -1)
{
- (void) ::write(notifyFd, ".", 1);
+ int unused_ret; //Suppress warnings about ignoring return value.
+ unused_ret = ::write(notifyFd, ".", 1);
}
}
diff --git a/cpp/src/qmf/engine/SchemaImpl.cpp b/cpp/src/qmf/engine/SchemaImpl.cpp
index 9d363d3012..e0948a9911 100644
--- a/cpp/src/qmf/engine/SchemaImpl.cpp
+++ b/cpp/src/qmf/engine/SchemaImpl.cpp
@@ -35,17 +35,17 @@ using qpid::framing::Uuid;
SchemaHash::SchemaHash()
{
for (int idx = 0; idx < 16; idx++)
- hash.b[idx] = 0x5A;
+ hash[idx] = 0x5A;
}
void SchemaHash::encode(Buffer& buffer) const
{
- buffer.putBin128(hash.b);
+ buffer.putBin128(hash);
}
void SchemaHash::decode(Buffer& buffer)
{
- buffer.getBin128(hash.b);
+ buffer.getBin128(hash);
}
void SchemaHash::update(uint8_t data)
@@ -55,8 +55,9 @@ void SchemaHash::update(uint8_t data)
void SchemaHash::update(const char* data, uint32_t len)
{
- uint64_t* first = &hash.q[0];
- uint64_t* second = &hash.q[1];
+ uint64_t* first = (uint64_t*) hash;
+ uint64_t* second = (uint64_t*) hash + 1;
+
for (uint32_t idx = 0; idx < len; idx++) {
*first = *first ^ (uint64_t) data[idx];
*second = *second << 1;
diff --git a/cpp/src/qmf/engine/SchemaImpl.h b/cpp/src/qmf/engine/SchemaImpl.h
index 683fb6f8f0..8b079a5ec6 100644
--- a/cpp/src/qmf/engine/SchemaImpl.h
+++ b/cpp/src/qmf/engine/SchemaImpl.h
@@ -35,10 +35,7 @@ namespace engine {
// they've been registered.
class SchemaHash {
- union h {
- uint8_t b[16];
- uint64_t q[2];
- } hash;
+ uint8_t hash[16];
public:
SchemaHash();
void encode(qpid::framing::Buffer& buffer) const;
@@ -50,7 +47,7 @@ namespace engine {
void update(Direction d) { update((uint8_t) d); }
void update(Access a) { update((uint8_t) a); }
void update(bool b) { update((uint8_t) (b ? 1 : 0)); }
- const uint8_t* get() const { return hash.b; }
+ const uint8_t* get() const { return hash; }
bool operator==(const SchemaHash& other) const;
bool operator<(const SchemaHash& other) const;
bool operator>(const SchemaHash& other) const;