diff options
Diffstat (limited to 'cpp/src/qmf')
| -rw-r--r-- | cpp/src/qmf/Agent.cpp | 29 | ||||
| -rw-r--r-- | cpp/src/qmf/AgentImpl.h | 1 | ||||
| -rw-r--r-- | cpp/src/qmf/AgentSession.cpp | 229 | ||||
| -rw-r--r-- | cpp/src/qmf/AgentSessionImpl.h | 175 | ||||
| -rw-r--r-- | cpp/src/qmf/ConsoleSession.cpp | 125 | ||||
| -rw-r--r-- | cpp/src/qmf/ConsoleSessionImpl.h | 22 | ||||
| -rw-r--r-- | cpp/src/qmf/DataAddr.cpp | 6 | ||||
| -rw-r--r-- | cpp/src/qmf/DataAddrImpl.h | 4 | ||||
| -rw-r--r-- | cpp/src/qmf/EventNotifierImpl.cpp | 56 | ||||
| -rw-r--r-- | cpp/src/qmf/EventNotifierImpl.h | 48 | ||||
| -rw-r--r-- | cpp/src/qmf/PosixEventNotifier.cpp | 65 | ||||
| -rw-r--r-- | cpp/src/qmf/PosixEventNotifierImpl.cpp | 112 | ||||
| -rw-r--r-- | cpp/src/qmf/PosixEventNotifierImpl.h | 61 | ||||
| -rw-r--r-- | cpp/src/qmf/PrivateImplRef.h | 2 | ||||
| -rw-r--r-- | cpp/src/qmf/engine/ResilientConnection.cpp | 6 | ||||
| -rw-r--r-- | cpp/src/qmf/engine/SchemaImpl.cpp | 11 | ||||
| -rw-r--r-- | cpp/src/qmf/engine/SchemaImpl.h | 7 |
17 files changed, 199 insertions, 760 deletions
diff --git a/cpp/src/qmf/Agent.cpp b/cpp/src/qmf/Agent.cpp index 684f8e4fba..915f2a1c88 100644 --- a/cpp/src/qmf/Agent.cpp +++ b/cpp/src/qmf/Agent.cpp @@ -72,7 +72,7 @@ Schema Agent::getSchema(const SchemaId& s, Duration t) { return impl->getSchema( AgentImpl::AgentImpl(const std::string& n, uint32_t e, ConsoleSessionImpl& s) : name(n), directSubject(n), epoch(e), session(s), touched(true), untouchedCount(0), capability(0), - sender(session.directSender), schemaCache(s.schemaCache) + sender(session.directSender), nextCorrelator(1), schemaCache(s.schemaCache) { } @@ -102,11 +102,12 @@ const Variant& AgentImpl::getAttribute(const string& k) const ConsoleEvent AgentImpl::query(const Query& query, Duration timeout) { boost::shared_ptr<SyncContext> context(new SyncContext()); - uint32_t correlator(session.correlator()); + uint32_t correlator; ConsoleEvent result; { qpid::sys::Mutex::ScopedLock l(lock); + correlator = nextCorrelator++; contextMap[correlator] = context; } try { @@ -150,7 +151,12 @@ ConsoleEvent AgentImpl::query(const string& text, Duration timeout) uint32_t AgentImpl::queryAsync(const Query& query) { - uint32_t correlator(session.correlator()); + uint32_t correlator; + + { + qpid::sys::Mutex::ScopedLock l(lock); + correlator = nextCorrelator++; + } sendQuery(query, correlator); return correlator; @@ -166,11 +172,12 @@ uint32_t AgentImpl::queryAsync(const string& text) ConsoleEvent AgentImpl::callMethod(const string& method, const Variant::Map& args, const DataAddr& addr, Duration timeout) { boost::shared_ptr<SyncContext> context(new SyncContext()); - uint32_t correlator(session.correlator()); + uint32_t correlator; ConsoleEvent result; { qpid::sys::Mutex::ScopedLock l(lock); + correlator = nextCorrelator++; contextMap[correlator] = context; } try { @@ -206,7 +213,12 @@ ConsoleEvent AgentImpl::callMethod(const string& method, const Variant::Map& arg uint32_t AgentImpl::callMethodAsync(const string& method, const Variant::Map& args, const DataAddr& addr) { - uint32_t correlator(session.correlator()); + uint32_t correlator; + + { + qpid::sys::Mutex::ScopedLock l(lock); + correlator = nextCorrelator++; + } sendMethod(method, args, addr, correlator); return correlator; @@ -584,7 +596,12 @@ void AgentImpl::sendMethod(const string& method, const Variant::Map& args, const void AgentImpl::sendSchemaRequest(const SchemaId& id) { - uint32_t correlator(session.correlator()); + uint32_t correlator; + + { + qpid::sys::Mutex::ScopedLock l(lock); + correlator = nextCorrelator++; + } if (capability >= AGENT_CAPABILITY_V2_SCHEMA) { Query query(QUERY_SCHEMA, id); diff --git a/cpp/src/qmf/AgentImpl.h b/cpp/src/qmf/AgentImpl.h index 09754a3a7e..7fa4f4373a 100644 --- a/cpp/src/qmf/AgentImpl.h +++ b/cpp/src/qmf/AgentImpl.h @@ -99,6 +99,7 @@ namespace qmf { uint32_t capability; qpid::messaging::Sender sender; qpid::types::Variant::Map attributes; + uint32_t nextCorrelator; std::map<uint32_t, boost::shared_ptr<SyncContext> > contextMap; boost::shared_ptr<SchemaCache> schemaCache; mutable std::set<std::string> packageSet; diff --git a/cpp/src/qmf/AgentSession.cpp b/cpp/src/qmf/AgentSession.cpp index 251c25fd44..4c5a72a467 100644 --- a/cpp/src/qmf/AgentSession.cpp +++ b/cpp/src/qmf/AgentSession.cpp @@ -19,7 +19,132 @@ * */ -#include "qmf/AgentSessionImpl.h" +#include "qpid/RefCounted.h" +#include "qmf/PrivateImplRef.h" +#include "qmf/exceptions.h" +#include "qmf/AgentSession.h" +#include "qmf/AgentEventImpl.h" +#include "qmf/SchemaIdImpl.h" +#include "qmf/SchemaImpl.h" +#include "qmf/DataAddrImpl.h" +#include "qmf/DataImpl.h" +#include "qmf/QueryImpl.h" +#include "qmf/agentCapability.h" +#include "qmf/constants.h" +#include "qpid/sys/Mutex.h" +#include "qpid/sys/Condition.h" +#include "qpid/sys/Thread.h" +#include "qpid/sys/Runnable.h" +#include "qpid/log/Statement.h" +#include "qpid/messaging/Connection.h" +#include "qpid/messaging/Session.h" +#include "qpid/messaging/Receiver.h" +#include "qpid/messaging/Sender.h" +#include "qpid/messaging/Message.h" +#include "qpid/messaging/AddressParser.h" +#include "qpid/management/Buffer.h" +#include <queue> +#include <map> +#include <set> +#include <iostream> +#include <memory> + +using namespace std; +using namespace qpid::messaging; +using namespace qmf; +using qpid::types::Variant; + +namespace qmf { + class AgentSessionImpl : public virtual qpid::RefCounted, public qpid::sys::Runnable { + public: + ~AgentSessionImpl(); + + // + // Methods from API handle + // + AgentSessionImpl(Connection& c, const string& o); + void setDomain(const string& d) { checkOpen(); domain = d; } + void setVendor(const string& v) { checkOpen(); attributes["_vendor"] = v; } + void setProduct(const string& p) { checkOpen(); attributes["_product"] = p; } + void setInstance(const string& i) { checkOpen(); attributes["_instance"] = i; } + void setAttribute(const string& k, const qpid::types::Variant& v) { checkOpen(); attributes[k] = v; } + const string& getName() const { return agentName; } + void open(); + void close(); + bool nextEvent(AgentEvent& e, Duration t); + + void registerSchema(Schema& s); + DataAddr addData(Data& d, const string& n, bool persist); + void delData(const DataAddr&); + + void authAccept(AgentEvent& e); + void authReject(AgentEvent& e, const string& m); + void raiseException(AgentEvent& e, const string& s); + void raiseException(AgentEvent& e, const Data& d); + void response(AgentEvent& e, const Data& d); + void complete(AgentEvent& e); + void methodSuccess(AgentEvent& e); + void raiseEvent(const Data& d); + void raiseEvent(const Data& d, int s); + + private: + typedef map<DataAddr, Data, DataAddrCompare> DataIndex; + typedef map<SchemaId, Schema, SchemaIdCompare> SchemaMap; + + mutable qpid::sys::Mutex lock; + qpid::sys::Condition cond; + Connection connection; + Session session; + Sender directSender; + Sender topicSender; + string domain; + Variant::Map attributes; + Variant::Map options; + string agentName; + bool opened; + queue<AgentEvent> eventQueue; + qpid::sys::Thread* thread; + bool threadCanceled; + uint32_t bootSequence; + uint32_t interval; + uint64_t lastHeartbeat; + uint64_t lastVisit; + bool forceHeartbeat; + bool externalStorage; + bool autoAllowQueries; + bool autoAllowMethods; + uint32_t maxSubscriptions; + uint32_t minSubInterval; + uint32_t subLifetime; + bool publicEvents; + bool listenOnDirect; + bool strictSecurity; + uint64_t schemaUpdateTime; + string directBase; + string topicBase; + + SchemaMap schemata; + DataIndex globalIndex; + map<SchemaId, DataIndex, SchemaIdCompareNoHash> schemaIndex; + + void checkOpen(); + void setAgentName(); + void enqueueEvent(const AgentEvent&); + void handleLocateRequest(const Variant::List& content, const Message& msg); + void handleMethodRequest(const Variant::Map& content, const Message& msg); + void handleQueryRequest(const Variant::Map& content, const Message& msg); + void handleSchemaRequest(AgentEvent&); + void handleV1SchemaRequest(qpid::management::Buffer&, uint32_t, const Message&); + void dispatch(Message); + void sendHeartbeat(); + void send(Message, const Address&); + void flushResponses(AgentEvent&, bool); + void periodicProcessing(uint64_t); + void run(); + }; +} + +typedef qmf::PrivateImplRef<AgentSession> PI; AgentSession::AgentSession(AgentSessionImpl* impl) { PI::ctor(*this, impl); } AgentSession::AgentSession(const AgentSession& s) : qmf::Handle<AgentSessionImpl>() { PI::copy(*this, s); } @@ -36,7 +161,6 @@ const string& AgentSession::getName() const { return impl->getName(); } void AgentSession::open() { impl->open(); } void AgentSession::close() { impl->close(); } bool AgentSession::nextEvent(AgentEvent& e, Duration t) { return impl->nextEvent(e, t); } -int AgentSession::pendingEvents() const { return impl->pendingEvents(); } void AgentSession::registerSchema(Schema& s) { impl->registerSchema(s); } DataAddr AgentSession::addData(Data& d, const string& n, bool p) { return impl->addData(d, n, p); } void AgentSession::delData(const DataAddr& a) { impl->delData(a); } @@ -55,11 +179,11 @@ void AgentSession::raiseEvent(const Data& d, int s) { impl->raiseEvent(d, s); } //======================================================================================== AgentSessionImpl::AgentSessionImpl(Connection& c, const string& options) : - connection(c), domain("default"), opened(false), eventNotifier(0), thread(0), threadCanceled(false), + connection(c), domain("default"), opened(false), thread(0), threadCanceled(false), bootSequence(1), interval(60), lastHeartbeat(0), lastVisit(0), forceHeartbeat(false), externalStorage(false), autoAllowQueries(true), autoAllowMethods(true), maxSubscriptions(64), minSubInterval(3000), subLifetime(300), publicEvents(true), - listenOnDirect(true), strictSecurity(false), maxThreadWaitTime(5), + listenOnDirect(true), strictSecurity(false), schemaUpdateTime(uint64_t(qpid::sys::Duration(qpid::sys::EPOCH, qpid::sys::now()))) { // @@ -120,14 +244,7 @@ AgentSessionImpl::AgentSessionImpl(Connection& c, const string& options) : iter = optMap.find("strict-security"); if (iter != optMap.end()) strictSecurity = iter->second.asBool(); - - iter = optMap.find("max-thread-wait-time"); - if (iter != optMap.end()) - maxThreadWaitTime = iter->second.asUint32(); } - - if (maxThreadWaitTime > interval) - maxThreadWaitTime = interval; } @@ -135,11 +252,6 @@ AgentSessionImpl::~AgentSessionImpl() { if (opened) close(); - - if (thread) { - thread->join(); - delete thread; - } } @@ -148,12 +260,6 @@ void AgentSessionImpl::open() if (opened) throw QmfException("The session is already open"); - // If the thread exists, join and delete it before creating a new one. - if (thread) { - thread->join(); - delete thread; - } - const string addrArgs(";{create:never,node:{type:topic}}"); const string routableAddr("direct-agent.route." + qpid::types::Uuid(true).str()); attributes["_direct_subject"] = routableAddr; @@ -191,26 +297,19 @@ void AgentSessionImpl::open() } -void AgentSessionImpl::closeAsync() +void AgentSessionImpl::close() { if (!opened) return; - // Stop the receiver thread. Don't join it until the destructor is called or open() is called. + // Stop and join the receiver thread threadCanceled = true; - opened = false; -} + thread->join(); + delete thread; - -void AgentSessionImpl::close() -{ - closeAsync(); - - if (thread) { - thread->join(); - delete thread; - thread = 0; - } + // Close the AMQP session + session.close(); + opened = false; } @@ -219,19 +318,13 @@ bool AgentSessionImpl::nextEvent(AgentEvent& event, Duration timeout) uint64_t milliseconds = timeout.getMilliseconds(); qpid::sys::Mutex::ScopedLock l(lock); - if (eventQueue.empty() && milliseconds > 0) { - int64_t nsecs(qpid::sys::TIME_INFINITE); - if ((uint64_t)(nsecs / 1000000) > milliseconds) - nsecs = (int64_t) milliseconds * 1000000; - qpid::sys::Duration then(nsecs); - cond.wait(lock, qpid::sys::AbsTime(qpid::sys::now(), then)); - } + if (eventQueue.empty()) + cond.wait(lock, qpid::sys::AbsTime(qpid::sys::now(), + qpid::sys::Duration(milliseconds * qpid::sys::TIME_MSEC))); if (!eventQueue.empty()) { event = eventQueue.front(); eventQueue.pop(); - if (eventQueue.empty()) - alertEventNotifierLH(false); return true; } @@ -239,26 +332,6 @@ bool AgentSessionImpl::nextEvent(AgentEvent& event, Duration timeout) } -int AgentSessionImpl::pendingEvents() const -{ - qpid::sys::Mutex::ScopedLock l(lock); - return eventQueue.size(); -} - - -void AgentSessionImpl::setEventNotifier(EventNotifierImpl* notifier) -{ - qpid::sys::Mutex::ScopedLock l(lock); - eventNotifier = notifier; -} - -EventNotifierImpl* AgentSessionImpl::getEventNotifier() const -{ - qpid::sys::Mutex::ScopedLock l(lock); - return eventNotifier; -} - - void AgentSessionImpl::registerSchema(Schema& schema) { if (!schema.isFinalized()) @@ -514,10 +587,8 @@ void AgentSessionImpl::enqueueEvent(const AgentEvent& event) qpid::sys::Mutex::ScopedLock l(lock); bool notify = eventQueue.empty(); eventQueue.push(event); - if (notify) { + if (notify) cond.notify(); - alertEventNotifierLH(true); - } } @@ -961,13 +1032,6 @@ void AgentSessionImpl::periodicProcessing(uint64_t seconds) } -void AgentSessionImpl::alertEventNotifierLH(bool readable) -{ - if (eventNotifier) - eventNotifier->setReadable(readable); -} - - void AgentSessionImpl::run() { QPID_LOG(debug, "AgentSession thread started for agent " << agentName); @@ -977,7 +1041,7 @@ void AgentSessionImpl::run() periodicProcessing((uint64_t) qpid::sys::Duration(qpid::sys::EPOCH, qpid::sys::now()) / qpid::sys::TIME_SEC); Receiver rx; - bool valid = session.nextReceiver(rx, Duration::SECOND * maxThreadWaitTime); + bool valid = session.nextReceiver(rx, Duration::SECOND); if (threadCanceled) break; if (valid) { @@ -994,19 +1058,6 @@ void AgentSessionImpl::run() enqueueEvent(AgentEvent(new AgentEventImpl(AGENT_THREAD_FAILED))); } - session.close(); QPID_LOG(debug, "AgentSession thread exiting for agent " << agentName); } - -AgentSessionImpl& AgentSessionImplAccess::get(AgentSession& session) -{ - return *session.impl; -} - - -const AgentSessionImpl& AgentSessionImplAccess::get(const AgentSession& session) -{ - return *session.impl; -} - diff --git a/cpp/src/qmf/AgentSessionImpl.h b/cpp/src/qmf/AgentSessionImpl.h deleted file mode 100644 index ae512a4054..0000000000 --- a/cpp/src/qmf/AgentSessionImpl.h +++ /dev/null @@ -1,175 +0,0 @@ -#ifndef __QMF_AGENT_SESSION_IMPL_H -#define __QMF_AGENT_SESSION_IMPL_H - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "qpid/RefCounted.h" -#include "qmf/PrivateImplRef.h" -#include "qmf/exceptions.h" -#include "qmf/AgentSession.h" -#include "qmf/AgentEventImpl.h" -#include "qmf/EventNotifierImpl.h" -#include "qpid/messaging/Connection.h" -#include "qpid/sys/Runnable.h" -#include "qpid/sys/Mutex.h" -#include "qpid/sys/Condition.h" -#include "qpid/sys/Thread.h" -#include "qpid/sys/Runnable.h" -#include "qpid/log/Statement.h" -#include "qpid/messaging/Connection.h" -#include "qpid/messaging/Session.h" -#include "qpid/messaging/Receiver.h" -#include "qpid/messaging/Sender.h" -#include "qpid/messaging/Message.h" -#include "qpid/messaging/AddressParser.h" -#include "qpid/management/Buffer.h" -#include "qpid/RefCounted.h" -#include "qmf/PrivateImplRef.h" -#include "qmf/AgentSession.h" -#include "qmf/exceptions.h" -#include "qmf/AgentSession.h" -#include "qmf/SchemaIdImpl.h" -#include "qmf/SchemaImpl.h" -#include "qmf/DataAddrImpl.h" -#include "qmf/DataImpl.h" -#include "qmf/QueryImpl.h" -#include "qmf/agentCapability.h" -#include "qmf/constants.h" - -#include <queue> -#include <map> -#include <iostream> -#include <memory> - -using namespace std; -using namespace qpid::messaging; -using namespace qmf; -using qpid::types::Variant; - -typedef qmf::PrivateImplRef<AgentSession> PI; - -namespace qmf { - class AgentSessionImpl : public virtual qpid::RefCounted, public qpid::sys::Runnable { - public: - ~AgentSessionImpl(); - - // - // Methods from API handle - // - AgentSessionImpl(Connection& c, const string& o); - void setDomain(const string& d) { checkOpen(); domain = d; } - void setVendor(const string& v) { checkOpen(); attributes["_vendor"] = v; } - void setProduct(const string& p) { checkOpen(); attributes["_product"] = p; } - void setInstance(const string& i) { checkOpen(); attributes["_instance"] = i; } - void setAttribute(const string& k, const qpid::types::Variant& v) { checkOpen(); attributes[k] = v; } - const string& getName() const { return agentName; } - void open(); - void closeAsync(); - void close(); - bool nextEvent(AgentEvent& e, Duration t); - int pendingEvents() const; - - void setEventNotifier(EventNotifierImpl* eventNotifier); - EventNotifierImpl* getEventNotifier() const; - - void registerSchema(Schema& s); - DataAddr addData(Data& d, const string& n, bool persist); - void delData(const DataAddr&); - - void authAccept(AgentEvent& e); - void authReject(AgentEvent& e, const string& m); - void raiseException(AgentEvent& e, const string& s); - void raiseException(AgentEvent& e, const Data& d); - void response(AgentEvent& e, const Data& d); - void complete(AgentEvent& e); - void methodSuccess(AgentEvent& e); - void raiseEvent(const Data& d); - void raiseEvent(const Data& d, int s); - - private: - typedef map<DataAddr, Data, DataAddrCompare> DataIndex; - typedef map<SchemaId, Schema, SchemaIdCompare> SchemaMap; - - mutable qpid::sys::Mutex lock; - qpid::sys::Condition cond; - Connection connection; - Session session; - Sender directSender; - Sender topicSender; - string domain; - Variant::Map attributes; - Variant::Map options; - string agentName; - bool opened; - queue<AgentEvent> eventQueue; - EventNotifierImpl* eventNotifier; - qpid::sys::Thread* thread; - bool threadCanceled; - uint32_t bootSequence; - uint32_t interval; - uint64_t lastHeartbeat; - uint64_t lastVisit; - bool forceHeartbeat; - bool externalStorage; - bool autoAllowQueries; - bool autoAllowMethods; - uint32_t maxSubscriptions; - uint32_t minSubInterval; - uint32_t subLifetime; - bool publicEvents; - bool listenOnDirect; - bool strictSecurity; - uint32_t maxThreadWaitTime; - uint64_t schemaUpdateTime; - string directBase; - string topicBase; - - SchemaMap schemata; - DataIndex globalIndex; - map<SchemaId, DataIndex, SchemaIdCompareNoHash> schemaIndex; - - void checkOpen(); - void setAgentName(); - void enqueueEvent(const AgentEvent&); - void alertEventNotifierLH(bool readable); - void handleLocateRequest(const Variant::List& content, const Message& msg); - void handleMethodRequest(const Variant::Map& content, const Message& msg); - void handleQueryRequest(const Variant::Map& content, const Message& msg); - void handleSchemaRequest(AgentEvent&); - void handleV1SchemaRequest(qpid::management::Buffer&, uint32_t, const Message&); - void dispatch(Message); - void sendHeartbeat(); - void send(Message, const Address&); - void flushResponses(AgentEvent&, bool); - void periodicProcessing(uint64_t); - void run(); - }; - - struct AgentSessionImplAccess { - static AgentSessionImpl& get(AgentSession& session); - static const AgentSessionImpl& get(const AgentSession& session); - }; -} - - -#endif - diff --git a/cpp/src/qmf/ConsoleSession.cpp b/cpp/src/qmf/ConsoleSession.cpp index 2dfc894c58..e12c1152f6 100644 --- a/cpp/src/qmf/ConsoleSession.cpp +++ b/cpp/src/qmf/ConsoleSession.cpp @@ -54,7 +54,6 @@ void ConsoleSession::setAgentFilter(const string& f) { impl->setAgentFilter(f); void ConsoleSession::open() { impl->open(); } void ConsoleSession::close() { impl->close(); } bool ConsoleSession::nextEvent(ConsoleEvent& e, Duration t) { return impl->nextEvent(e, t); } -int ConsoleSession::pendingEvents() const { return impl->pendingEvents(); } uint32_t ConsoleSession::getAgentCount() const { return impl->getAgentCount(); } Agent ConsoleSession::getAgent(uint32_t i) const { return impl->getAgent(i); } Agent ConsoleSession::getConnectedBrokerAgent() const { return impl->getConnectedBrokerAgent(); } @@ -66,9 +65,9 @@ Subscription ConsoleSession::subscribe(const string& q, const string& f, const s //======================================================================================== ConsoleSessionImpl::ConsoleSessionImpl(Connection& c, const string& options) : - connection(c), domain("default"), maxAgentAgeMinutes(5), listenOnDirect(true), strictSecurity(false), maxThreadWaitTime(5), - opened(false), eventNotifier(0), thread(0), threadCanceled(false), lastVisit(0), lastAgePass(0), - connectedBrokerInAgentList(false), schemaCache(new SchemaCache()), nextCorrelator(1) + connection(c), domain("default"), maxAgentAgeMinutes(5), + opened(false), thread(0), threadCanceled(false), lastVisit(0), lastAgePass(0), + connectedBrokerInAgentList(false), schemaCache(new SchemaCache()) { if (!options.empty()) { qpid::messaging::AddressParser parser(options); @@ -92,14 +91,7 @@ ConsoleSessionImpl::ConsoleSessionImpl(Connection& c, const string& options) : iter = optMap.find("strict-security"); if (iter != optMap.end()) strictSecurity = iter->second.asBool(); - - iter = optMap.find("max-thread-wait-time"); - if (iter != optMap.end()) - maxThreadWaitTime = iter->second.asUint32(); } - - if (maxThreadWaitTime > 60) - maxThreadWaitTime = 60; } @@ -107,11 +99,6 @@ ConsoleSessionImpl::~ConsoleSessionImpl() { if (opened) close(); - - if (thread) { - thread->join(); - delete thread; - } } @@ -166,12 +153,6 @@ void ConsoleSessionImpl::open() if (opened) throw QmfException("The session is already open"); - // If the thread exists, join and delete it before creating a new one. - if (thread) { - thread->join(); - delete thread; - } - // Establish messaging addresses directBase = "qmf." + domain + ".direct"; topicBase = "qmf." + domain + ".topic"; @@ -200,36 +181,30 @@ void ConsoleSessionImpl::open() // Start the receiver thread threadCanceled = false; - opened = true; thread = new qpid::sys::Thread(*this); // Send an agent_locate to direct address 'broker' to identify the connected-broker-agent. sendBrokerLocate(); if (agentQuery) sendAgentLocate(); + + opened = true; } -void ConsoleSessionImpl::closeAsync() +void ConsoleSessionImpl::close() { if (!opened) throw QmfException("The session is already closed"); - // Stop the receiver thread. Don't join it until the destructor is called or open() is called. + // Stop and join the receiver thread threadCanceled = true; - opened = false; -} - + thread->join(); + delete thread; -void ConsoleSessionImpl::close() -{ - closeAsync(); - - if (thread) { - thread->join(); - delete thread; - thread = 0; - } + // Close the AMQP session + session.close(); + opened = false; } @@ -238,19 +213,13 @@ bool ConsoleSessionImpl::nextEvent(ConsoleEvent& event, Duration timeout) uint64_t milliseconds = timeout.getMilliseconds(); qpid::sys::Mutex::ScopedLock l(lock); - if (eventQueue.empty() && milliseconds > 0) { - int64_t nsecs(qpid::sys::TIME_INFINITE); - if ((uint64_t)(nsecs / 1000000) > milliseconds) - nsecs = (int64_t) milliseconds * 1000000; - qpid::sys::Duration then(nsecs); - cond.wait(lock, qpid::sys::AbsTime(qpid::sys::now(), then)); - } + if (eventQueue.empty()) + cond.wait(lock, qpid::sys::AbsTime(qpid::sys::now(), + qpid::sys::Duration(milliseconds * qpid::sys::TIME_MSEC))); if (!eventQueue.empty()) { event = eventQueue.front(); eventQueue.pop(); - if (eventQueue.empty()) - alertEventNotifierLH(false); return true; } @@ -258,27 +227,6 @@ bool ConsoleSessionImpl::nextEvent(ConsoleEvent& event, Duration timeout) } -int ConsoleSessionImpl::pendingEvents() const -{ - qpid::sys::Mutex::ScopedLock l(lock); - return eventQueue.size(); -} - - -void ConsoleSessionImpl::setEventNotifier(EventNotifierImpl* notifier) -{ - qpid::sys::Mutex::ScopedLock l(lock); - eventNotifier = notifier; -} - - -EventNotifierImpl* ConsoleSessionImpl::getEventNotifier() const -{ - qpid::sys::Mutex::ScopedLock l(lock); - return eventNotifier; -} - - uint32_t ConsoleSessionImpl::getAgentCount() const { qpid::sys::Mutex::ScopedLock l(lock); @@ -320,10 +268,8 @@ void ConsoleSessionImpl::enqueueEventLH(const ConsoleEvent& event) { bool notify = eventQueue.empty(); eventQueue.push(event); - if (notify) { + if (notify) cond.notify(); - alertEventNotifierLH(true); - } } @@ -475,23 +421,7 @@ void ConsoleSessionImpl::handleAgentUpdate(const string& agentName, const Varian iter = content.find("_values"); if (iter == content.end()) return; - const Variant::Map& in_attrs(iter->second.asMap()); - Variant::Map attrs; - - // - // Copy the map from the message to "attrs". Translate any old-style - // keys to their new key values in the process. - // - for (iter = in_attrs.begin(); iter != in_attrs.end(); iter++) { - if (iter->first == "epoch") - attrs[protocol::AGENT_ATTR_EPOCH] = iter->second; - else if (iter->first == "timestamp") - attrs[protocol::AGENT_ATTR_TIMESTAMP] = iter->second; - else if (iter->first == "heartbeat_interval") - attrs[protocol::AGENT_ATTR_HEARTBEAT_INTERVAL] = iter->second; - else - attrs[iter->first] = iter->second; - } + Variant::Map attrs(iter->second.asMap()); iter = attrs.find(protocol::AGENT_ATTR_EPOCH); if (iter != attrs.end()) @@ -632,13 +562,6 @@ void ConsoleSessionImpl::periodicProcessing(uint64_t seconds) } -void ConsoleSessionImpl::alertEventNotifierLH(bool readable) -{ - if (eventNotifier) - eventNotifier->setReadable(readable); -} - - void ConsoleSessionImpl::run() { QPID_LOG(debug, "ConsoleSession thread started"); @@ -649,7 +572,7 @@ void ConsoleSessionImpl::run() qpid::sys::TIME_SEC); Receiver rx; - bool valid = session.nextReceiver(rx, Duration::SECOND * maxThreadWaitTime); + bool valid = session.nextReceiver(rx, Duration::SECOND); if (threadCanceled) break; if (valid) { @@ -666,18 +589,6 @@ void ConsoleSessionImpl::run() enqueueEvent(ConsoleEvent(new ConsoleEventImpl(CONSOLE_THREAD_FAILED))); } - session.close(); QPID_LOG(debug, "ConsoleSession thread exiting"); } - -ConsoleSessionImpl& ConsoleSessionImplAccess::get(ConsoleSession& session) -{ - return *session.impl; -} - - -const ConsoleSessionImpl& ConsoleSessionImplAccess::get(const ConsoleSession& session) -{ - return *session.impl; -} diff --git a/cpp/src/qmf/ConsoleSessionImpl.h b/cpp/src/qmf/ConsoleSessionImpl.h index e2b30602fa..675c8bcfb5 100644 --- a/cpp/src/qmf/ConsoleSessionImpl.h +++ b/cpp/src/qmf/ConsoleSessionImpl.h @@ -27,7 +27,6 @@ #include "qmf/SchemaId.h" #include "qmf/Schema.h" #include "qmf/ConsoleEventImpl.h" -#include "qmf/EventNotifierImpl.h" #include "qmf/SchemaCache.h" #include "qmf/Query.h" #include "qpid/sys/Mutex.h" @@ -42,13 +41,9 @@ #include "qpid/messaging/Address.h" #include "qpid/management/Buffer.h" #include "qpid/types/Variant.h" - -#include <boost/shared_ptr.hpp> #include <map> #include <queue> -using namespace std; - namespace qmf { class ConsoleSessionImpl : public virtual qpid::RefCounted, public qpid::sys::Runnable { public: @@ -61,14 +56,8 @@ namespace qmf { void setDomain(const std::string& d) { domain = d; } void setAgentFilter(const std::string& f); void open(); - void closeAsync(); void close(); bool nextEvent(ConsoleEvent& e, qpid::messaging::Duration t); - int pendingEvents() const; - - void setEventNotifier(EventNotifierImpl* notifier); - EventNotifierImpl* getEventNotifier() const; - uint32_t getAgentCount() const; Agent getAgent(uint32_t i) const; Agent getConnectedBrokerAgent() const { return connectedBrokerAgent; } @@ -86,11 +75,9 @@ namespace qmf { uint32_t maxAgentAgeMinutes; bool listenOnDirect; bool strictSecurity; - uint32_t maxThreadWaitTime; Query agentQuery; bool opened; std::queue<ConsoleEvent> eventQueue; - EventNotifierImpl* eventNotifier; qpid::sys::Thread* thread; bool threadCanceled; uint64_t lastVisit; @@ -102,8 +89,6 @@ namespace qmf { std::string directBase; std::string topicBase; boost::shared_ptr<SchemaCache> schemaCache; - qpid::sys::Mutex corrlock; - uint32_t nextCorrelator; void enqueueEvent(const ConsoleEvent&); void enqueueEventLH(const ConsoleEvent&); @@ -113,17 +98,10 @@ namespace qmf { void handleAgentUpdate(const std::string&, const qpid::types::Variant::Map&, const qpid::messaging::Message&); void handleV1SchemaResponse(qpid::management::Buffer&, uint32_t, const qpid::messaging::Message&); void periodicProcessing(uint64_t); - void alertEventNotifierLH(bool readable); void run(); - uint32_t correlator() { qpid::sys::Mutex::ScopedLock l(corrlock); return nextCorrelator++; } friend class AgentImpl; }; - - struct ConsoleSessionImplAccess { - static ConsoleSessionImpl& get(ConsoleSession& session); - static const ConsoleSessionImpl& get(const ConsoleSession& session); - }; } #endif diff --git a/cpp/src/qmf/DataAddr.cpp b/cpp/src/qmf/DataAddr.cpp index d16e12062e..fb51d5787f 100644 --- a/cpp/src/qmf/DataAddr.cpp +++ b/cpp/src/qmf/DataAddr.cpp @@ -36,9 +36,7 @@ DataAddr::~DataAddr() { PI::dtor(*this); } DataAddr& DataAddr::operator=(const DataAddr& s) { return PI::assign(*this, s); } bool DataAddr::operator==(const DataAddr& o) { return *impl == *o.impl; } -bool DataAddr::operator==(const DataAddr& o) const { return *impl == *o.impl; } bool DataAddr::operator<(const DataAddr& o) { return *impl < *o.impl; } -bool DataAddr::operator<(const DataAddr& o) const { return *impl < *o.impl; } DataAddr::DataAddr(const qpid::types::Variant::Map& m) { PI::ctor(*this, new DataAddrImpl(m)); } DataAddr::DataAddr(const string& n, const string& a, uint32_t e) { PI::ctor(*this, new DataAddrImpl(n, a, e)); } @@ -47,7 +45,7 @@ const string& DataAddr::getAgentName() const { return impl->getAgentName(); } uint32_t DataAddr::getAgentEpoch() const { return impl->getAgentEpoch(); } Variant::Map DataAddr::asMap() const { return impl->asMap(); } -bool DataAddrImpl::operator==(const DataAddrImpl& other) const +bool DataAddrImpl::operator==(const DataAddrImpl& other) { return agentName == other.agentName && @@ -56,7 +54,7 @@ bool DataAddrImpl::operator==(const DataAddrImpl& other) const } -bool DataAddrImpl::operator<(const DataAddrImpl& other) const +bool DataAddrImpl::operator<(const DataAddrImpl& other) { if (agentName < other.agentName) return true; if (agentName > other.agentName) return false; diff --git a/cpp/src/qmf/DataAddrImpl.h b/cpp/src/qmf/DataAddrImpl.h index 11d512f0c4..3f9cae9453 100644 --- a/cpp/src/qmf/DataAddrImpl.h +++ b/cpp/src/qmf/DataAddrImpl.h @@ -38,8 +38,8 @@ namespace qmf { // // Methods from API handle // - bool operator==(const DataAddrImpl&) const; - bool operator<(const DataAddrImpl&) const; + bool operator==(const DataAddrImpl&); + bool operator<(const DataAddrImpl&); DataAddrImpl(const qpid::types::Variant::Map&); DataAddrImpl(const std::string& _name, const std::string& _agentName, uint32_t _agentEpoch=0) : agentName(_agentName), name(_name), agentEpoch(_agentEpoch) {} diff --git a/cpp/src/qmf/EventNotifierImpl.cpp b/cpp/src/qmf/EventNotifierImpl.cpp deleted file mode 100644 index 20114aaa5e..0000000000 --- a/cpp/src/qmf/EventNotifierImpl.cpp +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#include "qmf/EventNotifierImpl.h" -#include "qmf/AgentSessionImpl.h" -#include "qmf/ConsoleSessionImpl.h" - -EventNotifierImpl::EventNotifierImpl(AgentSession& agentSession) - : readable(false), agent(agentSession) -{ - AgentSessionImplAccess::get(agent).setEventNotifier(this); -} - - -EventNotifierImpl::EventNotifierImpl(ConsoleSession& consoleSession) - : readable(false), console(consoleSession) -{ - ConsoleSessionImplAccess::get(console).setEventNotifier(this); -} - - -EventNotifierImpl::~EventNotifierImpl() -{ - if (agent.isValid()) - AgentSessionImplAccess::get(agent).setEventNotifier(NULL); - if (console.isValid()) - ConsoleSessionImplAccess::get(console).setEventNotifier(NULL); -} - -void EventNotifierImpl::setReadable(bool readable) -{ - update(readable); - this->readable = readable; -} - - -bool EventNotifierImpl::isReadable() const -{ - return this->readable; -} diff --git a/cpp/src/qmf/EventNotifierImpl.h b/cpp/src/qmf/EventNotifierImpl.h deleted file mode 100644 index d85f9979d2..0000000000 --- a/cpp/src/qmf/EventNotifierImpl.h +++ /dev/null @@ -1,48 +0,0 @@ -#ifndef __QMF_EVENT_NOTIFIER_IMPL_H -#define __QMF_EVENT_NOTIFIER_IMPL_H - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#include "qmf/AgentSession.h" -#include "qmf/ConsoleSession.h" - -namespace qmf -{ - class EventNotifierImpl { - private: - bool readable; - AgentSession agent; - ConsoleSession console; - - public: - EventNotifierImpl(AgentSession& agentSession); - EventNotifierImpl(ConsoleSession& consoleSession); - virtual ~EventNotifierImpl(); - - void setReadable(bool readable); - bool isReadable() const; - - protected: - virtual void update(bool readable) = 0; - }; -} - -#endif - diff --git a/cpp/src/qmf/PosixEventNotifier.cpp b/cpp/src/qmf/PosixEventNotifier.cpp deleted file mode 100644 index a364cc155d..0000000000 --- a/cpp/src/qmf/PosixEventNotifier.cpp +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#include "qmf/posix/EventNotifier.h" -#include "qmf/PosixEventNotifierImpl.h" -#include "qmf/PrivateImplRef.h" - -using namespace qmf; -using namespace std; - -typedef qmf::PrivateImplRef<posix::EventNotifier> PI; - -posix::EventNotifier::EventNotifier(PosixEventNotifierImpl* impl) { PI::ctor(*this, impl); } - -posix::EventNotifier::EventNotifier(AgentSession& agentSession) -{ - PI::ctor(*this, new PosixEventNotifierImpl(agentSession)); -} - - -posix::EventNotifier::EventNotifier(ConsoleSession& consoleSession) -{ - PI::ctor(*this, new PosixEventNotifierImpl(consoleSession)); -} - - -posix::EventNotifier::EventNotifier(const posix::EventNotifier& that) - : Handle<PosixEventNotifierImpl>() -{ - PI::copy(*this, that); -} - - -posix::EventNotifier::~EventNotifier() -{ - PI::dtor(*this); -} - -posix::EventNotifier& posix::EventNotifier::operator=(const posix::EventNotifier& that) -{ - return PI::assign(*this, that); -} - - -int posix::EventNotifier::getHandle() const -{ - return impl->getHandle(); -} - diff --git a/cpp/src/qmf/PosixEventNotifierImpl.cpp b/cpp/src/qmf/PosixEventNotifierImpl.cpp deleted file mode 100644 index 011dbcc214..0000000000 --- a/cpp/src/qmf/PosixEventNotifierImpl.cpp +++ /dev/null @@ -1,112 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#include "PosixEventNotifierImpl.h" -#include "qpid/log/Statement.h" - -#include <fcntl.h> -#include <unistd.h> -#include <errno.h> - -#define BUFFER_SIZE 10 - -using namespace qmf; - -PosixEventNotifierImpl::PosixEventNotifierImpl(AgentSession& agentSession) - : EventNotifierImpl(agentSession) -{ - openHandle(); -} - - -PosixEventNotifierImpl::PosixEventNotifierImpl(ConsoleSession& consoleSession) - : EventNotifierImpl(consoleSession) -{ - openHandle(); -} - - -PosixEventNotifierImpl::~PosixEventNotifierImpl() -{ - closeHandle(); -} - - -void PosixEventNotifierImpl::update(bool readable) -{ - char buffer[BUFFER_SIZE]; - - if(readable && !this->isReadable()) { - if (::write(myHandle, "1", 1) == -1) - QPID_LOG(error, "PosixEventNotifierImpl::update write failed: " << errno); - } - else if(!readable && this->isReadable()) { - if (::read(yourHandle, buffer, BUFFER_SIZE) == -1) - QPID_LOG(error, "PosixEventNotifierImpl::update read failed: " << errno); - } -} - - -void PosixEventNotifierImpl::openHandle() -{ - int pair[2]; - - if(::pipe(pair) == -1) - throw QmfException("Unable to open event notifier handle."); - - yourHandle = pair[0]; - myHandle = pair[1]; - - int flags; - - flags = ::fcntl(yourHandle, F_GETFL); - if((::fcntl(yourHandle, F_SETFL, flags | O_NONBLOCK)) == -1) - throw QmfException("Unable to make remote handle non-blocking."); - - flags = ::fcntl(myHandle, F_GETFL); - if((::fcntl(myHandle, F_SETFL, flags | O_NONBLOCK)) == -1) - throw QmfException("Unable to make local handle non-blocking."); -} - - -void PosixEventNotifierImpl::closeHandle() -{ - if(myHandle > 0) { - ::close(myHandle); - myHandle = -1; - } - - if(yourHandle > 0) { - ::close(yourHandle); - yourHandle = -1; - } -} - - -PosixEventNotifierImpl& PosixEventNotifierImplAccess::get(posix::EventNotifier& notifier) -{ - return *notifier.impl; -} - - -const PosixEventNotifierImpl& PosixEventNotifierImplAccess::get(const posix::EventNotifier& notifier) -{ - return *notifier.impl; -} - diff --git a/cpp/src/qmf/PosixEventNotifierImpl.h b/cpp/src/qmf/PosixEventNotifierImpl.h deleted file mode 100644 index c8a7446bd5..0000000000 --- a/cpp/src/qmf/PosixEventNotifierImpl.h +++ /dev/null @@ -1,61 +0,0 @@ -#ifndef __QMF_POSIX_EVENT_NOTIFIER_IMPL_H -#define __QMF_POSIX_EVENT_NOTIFIER_IMPL_H - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#include "qmf/posix/EventNotifier.h" -#include "qmf/EventNotifierImpl.h" -#include "qpid/RefCounted.h" - -namespace qmf -{ - class AgentSession; - class ConsoleSession; - - class PosixEventNotifierImpl : public EventNotifierImpl, public virtual qpid::RefCounted - { - public: - PosixEventNotifierImpl(AgentSession& agentSession); - PosixEventNotifierImpl(ConsoleSession& consoleSession); - virtual ~PosixEventNotifierImpl(); - - int getHandle() const { return yourHandle; } - - private: - int myHandle; - int yourHandle; - - void openHandle(); - void closeHandle(); - - protected: - void update(bool readable); - }; - - struct PosixEventNotifierImplAccess - { - static PosixEventNotifierImpl& get(posix::EventNotifier& notifier); - static const PosixEventNotifierImpl& get(const posix::EventNotifier& notifier); - }; - -} - -#endif - diff --git a/cpp/src/qmf/PrivateImplRef.h b/cpp/src/qmf/PrivateImplRef.h index 960cbb2e09..8b698c4199 100644 --- a/cpp/src/qmf/PrivateImplRef.h +++ b/cpp/src/qmf/PrivateImplRef.h @@ -23,8 +23,8 @@ */ #include "qmf/ImportExport.h" -#include "qpid/RefCounted.h" #include <boost/intrusive_ptr.hpp> +#include "qpid/RefCounted.h" namespace qmf { diff --git a/cpp/src/qmf/engine/ResilientConnection.cpp b/cpp/src/qmf/engine/ResilientConnection.cpp index 41dd9ff00c..ab65b8d768 100644 --- a/cpp/src/qmf/engine/ResilientConnection.cpp +++ b/cpp/src/qmf/engine/ResilientConnection.cpp @@ -334,7 +334,8 @@ void ResilientConnectionImpl::notify() { if (notifyFd != -1) { - (void) ::write(notifyFd, ".", 1); + int unused_ret; //Suppress warnings about ignoring return value. + unused_ret = ::write(notifyFd, ".", 1); } } @@ -431,7 +432,8 @@ void ResilientConnectionImpl::EnqueueEvent(ResilientConnectionEvent::EventKind k if (notifyFd != -1) { - (void) ::write(notifyFd, ".", 1); + int unused_ret; //Suppress warnings about ignoring return value. + unused_ret = ::write(notifyFd, ".", 1); } } diff --git a/cpp/src/qmf/engine/SchemaImpl.cpp b/cpp/src/qmf/engine/SchemaImpl.cpp index 9d363d3012..e0948a9911 100644 --- a/cpp/src/qmf/engine/SchemaImpl.cpp +++ b/cpp/src/qmf/engine/SchemaImpl.cpp @@ -35,17 +35,17 @@ using qpid::framing::Uuid; SchemaHash::SchemaHash() { for (int idx = 0; idx < 16; idx++) - hash.b[idx] = 0x5A; + hash[idx] = 0x5A; } void SchemaHash::encode(Buffer& buffer) const { - buffer.putBin128(hash.b); + buffer.putBin128(hash); } void SchemaHash::decode(Buffer& buffer) { - buffer.getBin128(hash.b); + buffer.getBin128(hash); } void SchemaHash::update(uint8_t data) @@ -55,8 +55,9 @@ void SchemaHash::update(uint8_t data) void SchemaHash::update(const char* data, uint32_t len) { - uint64_t* first = &hash.q[0]; - uint64_t* second = &hash.q[1]; + uint64_t* first = (uint64_t*) hash; + uint64_t* second = (uint64_t*) hash + 1; + for (uint32_t idx = 0; idx < len; idx++) { *first = *first ^ (uint64_t) data[idx]; *second = *second << 1; diff --git a/cpp/src/qmf/engine/SchemaImpl.h b/cpp/src/qmf/engine/SchemaImpl.h index 683fb6f8f0..8b079a5ec6 100644 --- a/cpp/src/qmf/engine/SchemaImpl.h +++ b/cpp/src/qmf/engine/SchemaImpl.h @@ -35,10 +35,7 @@ namespace engine { // they've been registered. class SchemaHash { - union h { - uint8_t b[16]; - uint64_t q[2]; - } hash; + uint8_t hash[16]; public: SchemaHash(); void encode(qpid::framing::Buffer& buffer) const; @@ -50,7 +47,7 @@ namespace engine { void update(Direction d) { update((uint8_t) d); } void update(Access a) { update((uint8_t) a); } void update(bool b) { update((uint8_t) (b ? 1 : 0)); } - const uint8_t* get() const { return hash.b; } + const uint8_t* get() const { return hash; } bool operator==(const SchemaHash& other) const; bool operator<(const SchemaHash& other) const; bool operator>(const SchemaHash& other) const; |
