diff options
| author | Ted Ross <tross@apache.org> | 2010-04-09 17:19:32 +0000 |
|---|---|---|
| committer | Ted Ross <tross@apache.org> | 2010-04-09 17:19:32 +0000 |
| commit | 3e19793c8f5be1676ea6f7577196f1b01cadf685 (patch) | |
| tree | 5d36d56405507f9c68736a4299f21dea294b99f5 /cpp/src/qpid | |
| parent | fcfff56e615c4054d52dc510c9cd1d1103249dce (diff) | |
| download | qpid-python-3e19793c8f5be1676ea6f7577196f1b01cadf685.tar.gz | |
QPID-2489 - Added wrapped version of Mutex to isolate QMF-generated source from boost.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@932517 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid')
| -rw-r--r-- | cpp/src/qpid/agent/ManagementAgentImpl.cpp | 54 | ||||
| -rw-r--r-- | cpp/src/qpid/management/ManagementAgent.cpp | 42 | ||||
| -rw-r--r-- | cpp/src/qpid/management/ManagementObject.cpp | 2 | ||||
| -rw-r--r-- | cpp/src/qpid/management/Mutex.cpp | 29 |
4 files changed, 78 insertions, 49 deletions
diff --git a/cpp/src/qpid/agent/ManagementAgentImpl.cpp b/cpp/src/qpid/agent/ManagementAgentImpl.cpp index 42bc36c4b8..bade1d2826 100644 --- a/cpp/src/qpid/agent/ManagementAgentImpl.cpp +++ b/cpp/src/qpid/agent/ManagementAgentImpl.cpp @@ -46,7 +46,7 @@ using qpid::amqp_0_10::MapCodec; using qpid::amqp_0_10::ListCodec; namespace { - Mutex lock; + qpid::sys::Mutex lock; bool disabled = false; ManagementAgent* agent = 0; int refCount = 0; @@ -54,7 +54,7 @@ namespace { ManagementAgent::Singleton::Singleton(bool disableManagement) { - Mutex::ScopedLock _lock(lock); + sys::Mutex::ScopedLock _lock(lock); if (disableManagement && !disabled) { disabled = true; assert(refCount == 0); // can't disable after agent has been allocated @@ -66,7 +66,7 @@ ManagementAgent::Singleton::Singleton(bool disableManagement) ManagementAgent::Singleton::~Singleton() { - Mutex::ScopedLock _lock(lock); + sys::Mutex::ScopedLock _lock(lock); refCount--; if (refCount == 0 && !disabled) { delete agent; @@ -103,7 +103,7 @@ ManagementAgentImpl::~ManagementAgentImpl() // Release the memory associated with stored management objects. { - Mutex::ScopedLock lock(agentLock); + sys::Mutex::ScopedLock lock(agentLock); moveNewObjectsLH(); for (ManagementObjectMap::iterator iter = managementObjects.begin (); @@ -186,7 +186,7 @@ void ManagementAgentImpl::registerClass(const string& packageName, uint8_t* md5Sum, ManagementObject::writeSchemaCall_t schemaCall) { - Mutex::ScopedLock lock(agentLock); + sys::Mutex::ScopedLock lock(agentLock); PackageMap::iterator pIter = findOrAddPackage(packageName); addClassLocal(ManagementItem::CLASS_KIND_TABLE, pIter, className, md5Sum, schemaCall); } @@ -196,7 +196,7 @@ void ManagementAgentImpl::registerEvent(const string& packageName, uint8_t* md5Sum, ManagementObject::writeSchemaCall_t schemaCall) { - Mutex::ScopedLock lock(agentLock); + sys::Mutex::ScopedLock lock(agentLock); PackageMap::iterator pIter = findOrAddPackage(packageName); addClassLocal(ManagementItem::CLASS_KIND_EVENT, pIter, eventName, md5Sum, schemaCall); } @@ -218,7 +218,7 @@ ObjectId ManagementAgentImpl::addObject(ManagementObject* object, const std::string& key, bool persistent) { - Mutex::ScopedLock lock(addLock); + sys::Mutex::ScopedLock lock(addLock); uint16_t sequence = persistent ? 0 : bootSequence; @@ -236,7 +236,7 @@ ObjectId ManagementAgentImpl::addObject(ManagementObject* object, void ManagementAgentImpl::raiseEvent(const ManagementEvent& event, severity_t severity) { - Mutex::ScopedLock lock(agentLock); + sys::Mutex::ScopedLock lock(agentLock); Buffer outBuffer(eventBuffer, MA_BUFFER_SIZE); uint8_t sev = (severity == SEV_DEFAULT) ? event.getSeverity() : (uint8_t) severity; stringstream key; @@ -269,7 +269,7 @@ void ManagementAgentImpl::raiseEvent(const ManagementEvent& event, severity_t se uint32_t ManagementAgentImpl::pollCallbacks(uint32_t callLimit) { - Mutex::ScopedLock lock(agentLock); + sys::Mutex::ScopedLock lock(agentLock); if (inCallback) { QPID_LOG(critical, "pollCallbacks invoked from the agent's thread!"); @@ -283,7 +283,7 @@ uint32_t ManagementAgentImpl::pollCallbacks(uint32_t callLimit) QueuedMethod* item = methodQueue.front(); methodQueue.pop_front(); { - Mutex::ScopedUnlock unlock(agentLock); + sys::Mutex::ScopedUnlock unlock(agentLock); invokeMethodRequest(item->body, item->cid, item->replyTo); delete item; } @@ -309,14 +309,14 @@ int ManagementAgentImpl::getSignalFd() void ManagementAgentImpl::setSignalCallback(cb_t callback, void* context) { - Mutex::ScopedLock lock(agentLock); + sys::Mutex::ScopedLock lock(agentLock); notifyCallback = callback; notifyContext = context; } void ManagementAgentImpl::setSignalCallback(Notifyable& _notifyable) { - Mutex::ScopedLock lock(agentLock); + sys::Mutex::ScopedLock lock(agentLock); notifyable = &_notifyable; } @@ -407,7 +407,7 @@ void ManagementAgentImpl::sendException(const string& replyToKey, const string& void ManagementAgentImpl::handleSchemaRequest(Buffer& inBuffer, uint32_t sequence, const string& replyTo) { - Mutex::ScopedLock lock(agentLock); + sys::Mutex::ScopedLock lock(agentLock); string packageName; SchemaClassKey key; @@ -441,7 +441,7 @@ void ManagementAgentImpl::handleSchemaRequest(Buffer& inBuffer, uint32_t sequenc void ManagementAgentImpl::handleConsoleAddedIndication() { - Mutex::ScopedLock lock(agentLock); + sys::Mutex::ScopedLock lock(agentLock); clientWasAdded = true; QPID_LOG(trace, "RCVD ConsoleAddedInd"); @@ -680,7 +680,7 @@ void ManagementAgentImpl::handleLocateRequest(const string&, const string& cid, QPID_LOG(trace, "SENT AgentLocateResponse replyTo=" << replyTo); { - Mutex::ScopedLock lock(agentLock); + sys::Mutex::ScopedLock lock(agentLock); clientWasAdded = true; } } @@ -688,7 +688,7 @@ void ManagementAgentImpl::handleLocateRequest(const string&, const string& cid, void ManagementAgentImpl::handleMethodRequest(const string& body, const string& cid, const string& replyTo) { if (extThread) { - Mutex::ScopedLock lock(agentLock); + sys::Mutex::ScopedLock lock(agentLock); methodQueue.push_back(new QueuedMethod(cid, replyTo, body)); if (pipeHandle != 0) { @@ -696,14 +696,14 @@ void ManagementAgentImpl::handleMethodRequest(const string& body, const string& } else if (notifyable != 0) { inCallback = true; { - Mutex::ScopedUnlock unlock(agentLock); + sys::Mutex::ScopedUnlock unlock(agentLock); notifyable->notify(); } inCallback = false; } else if (notifyCallback != 0) { inCallback = true; { - Mutex::ScopedUnlock unlock(agentLock); + sys::Mutex::ScopedUnlock unlock(agentLock); notifyCallback(notifyContext); } inCallback = false; @@ -820,7 +820,7 @@ ManagementAgentImpl::PackageMap::iterator ManagementAgentImpl::findOrAddPackage( void ManagementAgentImpl::moveNewObjectsLH() { - Mutex::ScopedLock lock(addLock); + sys::Mutex::ScopedLock lock(addLock); for (ManagementObjectMap::iterator iter = newManagementObjects.begin(); iter != newManagementObjects.end(); iter++) @@ -872,7 +872,7 @@ void ManagementAgentImpl::encodeClassIndication(Buffer& buf, void ManagementAgentImpl::periodicProcessing() { - Mutex::ScopedLock lock(agentLock); + sys::Mutex::ScopedLock lock(agentLock); list<pair<ObjectId, ManagementObject*> > deleteList; if (!connected) @@ -1007,14 +1007,14 @@ void ManagementAgentImpl::ConnectionThread::run() subscriptions->subscribe(agent, queueName.str(), dest); QPID_LOG(info, "Connection established with broker"); { - Mutex::ScopedLock _lock(connLock); + sys::Mutex::ScopedLock _lock(connLock); if (shutdown) return; operational = true; agent.connected = true; agent.startProtocol(); try { - Mutex::ScopedUnlock _unlock(connLock); + sys::Mutex::ScopedUnlock _unlock(connLock); subscriptions->run(); } catch (exception) {} @@ -1039,13 +1039,13 @@ void ManagementAgentImpl::ConnectionThread::run() // sleep for "delay" seconds, but peridically check if the // agent is shutting down so we don't hang for up to delayMax // seconds during agent shutdown - Mutex::ScopedLock _lock(connLock); + sys::Mutex::ScopedLock _lock(connLock); if (shutdown) return; sleeping = true; int totalSleep = 0; do { - Mutex::ScopedUnlock _unlock(connLock); + sys::Mutex::ScopedUnlock _unlock(connLock); ::sleep(delayMin); totalSleep += delayMin; } while (totalSleep < delay && !shutdown); @@ -1109,7 +1109,7 @@ void ManagementAgentImpl::ConnectionThread::sendMessage(Message msg, { ConnectionThread::shared_ptr s; { - Mutex::ScopedLock _lock(connLock); + sys::Mutex::ScopedLock _lock(connLock); if (!operational) return; s = subscriptions; @@ -1142,7 +1142,7 @@ void ManagementAgentImpl::ConnectionThread::close() { ConnectionThread::shared_ptr s; { - Mutex::ScopedLock _lock(connLock); + sys::Mutex::ScopedLock _lock(connLock); shutdown = true; s = subscriptions; } @@ -1152,7 +1152,7 @@ void ManagementAgentImpl::ConnectionThread::close() bool ManagementAgentImpl::ConnectionThread::isSleeping() const { - Mutex::ScopedLock _lock(connLock); + sys::Mutex::ScopedLock _lock(connLock); return sleeping; } diff --git a/cpp/src/qpid/management/ManagementAgent.cpp b/cpp/src/qpid/management/ManagementAgent.cpp index 4158b685ce..18cd0cdfee 100644 --- a/cpp/src/qpid/management/ManagementAgent.cpp +++ b/cpp/src/qpid/management/ManagementAgent.cpp @@ -44,10 +44,10 @@ using qpid::framing::Uuid; using qpid::types::Variant; using qpid::amqp_0_10::MapCodec; using qpid::amqp_0_10::ListCodec; +using qpid::sys::Mutex; using namespace qpid::framing; using namespace qpid::management; using namespace qpid::broker; -using namespace qpid::sys; using namespace qpid; using namespace std; namespace _qmf = qmf::org::apache::qpid::broker; @@ -80,7 +80,7 @@ ManagementAgent::RemoteAgent::~RemoteAgent () ManagementAgent::ManagementAgent (const bool qmfV1, const bool qmfV2) : threadPoolSize(1), interval(10), broker(0), timer(0), - startTime(uint64_t(Duration(now()))), + startTime(uint64_t(sys::Duration(sys::now()))), suppressed(false), qmf1Support(qmfV1), qmf2Support(qmfV2) { @@ -95,7 +95,7 @@ ManagementAgent::ManagementAgent (const bool qmfV1, const bool qmfV2) : ManagementAgent::~ManagementAgent () { { - Mutex::ScopedLock lock (userLock); + sys::Mutex::ScopedLock lock (userLock); // Reset the shared pointers to exchanges. If this is not done now, the exchanges // will stick around until dExchange and mExchange are implicitly destroyed (long @@ -231,7 +231,7 @@ void ManagementAgent::registerClass (const string& packageName, uint8_t* md5Sum, ManagementObject::writeSchemaCall_t schemaCall) { - Mutex::ScopedLock lock(userLock); + sys::Mutex::ScopedLock lock(userLock); PackageMap::iterator pIter = findOrAddPackageLH(packageName); addClassLH(ManagementItem::CLASS_KIND_TABLE, pIter, className, md5Sum, schemaCall); } @@ -241,7 +241,7 @@ void ManagementAgent::registerEvent (const string& packageName, uint8_t* md5Sum, ManagementObject::writeSchemaCall_t schemaCall) { - Mutex::ScopedLock lock(userLock); + sys::Mutex::ScopedLock lock(userLock); PackageMap::iterator pIter = findOrAddPackageLH(packageName); addClassLH(ManagementItem::CLASS_KIND_EVENT, pIter, eventName, md5Sum, schemaCall); } @@ -266,7 +266,7 @@ ObjectId ManagementAgent::addObject(ManagementObject* object, uint64_t persistId object->setObjectId(objId); { - Mutex::ScopedLock lock (addLock); + sys::Mutex::ScopedLock lock (addLock); ManagementObjectMap::iterator destIter = newManagementObjects.find(objId); if (destIter != newManagementObjects.end()) { if (destIter->second->isDeleted()) { @@ -304,7 +304,7 @@ ObjectId ManagementAgent::addObject(ManagementObject* object, object->setObjectId(objId); { - Mutex::ScopedLock lock (addLock); + sys::Mutex::ScopedLock lock (addLock); ManagementObjectMap::iterator destIter = newManagementObjects.find(objId); if (destIter != newManagementObjects.end()) { if (destIter->second->isDeleted()) { @@ -324,7 +324,7 @@ ObjectId ManagementAgent::addObject(ManagementObject* object, void ManagementAgent::raiseEvent(const ManagementEvent& event, severity_t severity) { - Mutex::ScopedLock lock (userLock); + sys::Mutex::ScopedLock lock (userLock); uint8_t sev = (severity == SEV_DEFAULT) ? event.getSeverity() : (uint8_t) severity; if (qmf1Support) { @@ -335,7 +335,7 @@ void ManagementAgent::raiseEvent(const ManagementEvent& event, severity_t severi outBuffer.putShortString(event.getPackageName()); outBuffer.putShortString(event.getEventName()); outBuffer.putBin128(event.getMd5Sum()); - outBuffer.putLongLong(uint64_t(Duration(now()))); + outBuffer.putLongLong(uint64_t(sys::Duration(sys::now()))); outBuffer.putOctet(sev); std::string sBuf; event.encode(sBuf); @@ -359,7 +359,7 @@ void ManagementAgent::raiseEvent(const ManagementEvent& event, severity_t severi event.getMd5Sum()); event.mapEncode(values); map_["_values"] = values; - map_["_timestamp"] = uint64_t(Duration(now())); + map_["_timestamp"] = uint64_t(sys::Duration(sys::now())); map_["_severity"] = sev; headers["method"] = "indication"; @@ -379,7 +379,7 @@ void ManagementAgent::raiseEvent(const ManagementEvent& event, severity_t severi } ManagementAgent::Periodic::Periodic (ManagementAgent& _agent, uint32_t _seconds) - : TimerTask (qpid::sys::Duration((_seconds ? _seconds : 1) * qpid::sys::TIME_SEC), + : TimerTask (sys::Duration((_seconds ? _seconds : 1) * sys::TIME_SEC), "ManagementAgent::periodicProcessing"), agent(_agent) {} @@ -538,7 +538,7 @@ void ManagementAgent::sendBuffer(const std::string& data, void ManagementAgent::moveNewObjectsLH() { - Mutex::ScopedLock lock (addLock); + sys::Mutex::ScopedLock lock (addLock); for (ManagementObjectMap::iterator iter = newManagementObjects.begin (); iter != newManagementObjects.end (); iter++) { @@ -573,14 +573,14 @@ void ManagementAgent::periodicProcessing (void) #define BUFSIZE 65536 #define HEADROOM 4096 QPID_LOG(trace, "Management agent periodic processing"); - Mutex::ScopedLock lock (userLock); + sys::Mutex::ScopedLock lock (userLock); char msgChars[BUFSIZE]; uint32_t contentSize; string routingKey; list<pair<ObjectId, ManagementObject*> > deleteList; std::string sBuf; - uint64_t uptime = uint64_t(Duration(now())) - startTime; + uint64_t uptime = uint64_t(sys::Duration(sys::now())) - startTime; static_cast<_qmf::Broker*>(broker->GetManagementObject())->set_uptime(uptime); moveNewObjectsLH(); @@ -779,7 +779,7 @@ void ManagementAgent::periodicProcessing (void) char msgChars[BUFSIZE]; Buffer msgBuffer(msgChars, BUFSIZE); encodeHeader(msgBuffer, 'h'); - msgBuffer.putLongLong(uint64_t(Duration(now()))); + msgBuffer.putLongLong(uint64_t(sys::Duration(sys::now()))); contentSize = BUFSIZE - msgBuffer.available (); msgBuffer.reset (); @@ -799,7 +799,7 @@ void ManagementAgent::periodicProcessing (void) headers["qmf.agent"] = name_address; map["_values"] = attrMap; - map["_values"].asMap()["timestamp"] = uint64_t(Duration(now())); + map["_values"].asMap()["timestamp"] = uint64_t(sys::Duration(sys::now())); map["_values"].asMap()["heartbeat_interval"] = interval; string content; @@ -890,7 +890,7 @@ bool ManagementAgent::dispatchCommand (Deliverable& deliverable, const FieldTable* /*args*/, const bool topic) { - Mutex::ScopedLock lock (userLock); + sys::Mutex::ScopedLock lock (userLock); Message& msg = ((DeliverableMessage&) deliverable).getMessage (); if (qmf1Support && topic) { @@ -1022,7 +1022,7 @@ void ManagementAgent::handleMethodRequestLH (Buffer& inBuffer, string replyToKey else try { outBuffer.record(); - Mutex::ScopedUnlock u(userLock); + sys::Mutex::ScopedUnlock u(userLock); std::string outBuf; iter->second->doMethod(methodName, inArgs, outBuf); outBuffer.putRawData(outBuf); @@ -1687,7 +1687,7 @@ void ManagementAgent::handleLocateRequestLH(const string&, const string& replyTo headers["qmf.agent"] = name_address; map["_values"] = attrMap; - map["_values"].asMap()["timestamp"] = uint64_t(Duration(now())); + map["_values"].asMap()["timestamp"] = uint64_t(sys::Duration(sys::now())); map["_values"].asMap()["heartbeat_interval"] = interval; string content; @@ -2109,13 +2109,13 @@ ManagementObjectMap::iterator ManagementAgent::numericFind(const ObjectId& oid) void ManagementAgent::setAllocator(std::auto_ptr<IdAllocator> a) { - Mutex::ScopedLock lock (userLock); + sys::Mutex::ScopedLock lock (userLock); allocator = a; } uint64_t ManagementAgent::allocateId(Manageable* object) { - Mutex::ScopedLock lock (userLock); + sys::Mutex::ScopedLock lock (userLock); if (allocator.get()) return allocator->getIdFor(object); return 0; } diff --git a/cpp/src/qpid/management/ManagementObject.cpp b/cpp/src/qpid/management/ManagementObject.cpp index 46fc67d07f..eb1b32bf7c 100644 --- a/cpp/src/qpid/management/ManagementObject.cpp +++ b/cpp/src/qpid/management/ManagementObject.cpp @@ -331,7 +331,7 @@ void ManagementObject::setReference(ObjectId) {} int ManagementObject::getThreadIndex() { static QPID_TSS int thisIndex = -1; if (thisIndex == -1) { - sys::Mutex::ScopedLock mutex(accessLock); + Mutex::ScopedLock mutex(accessLock); thisIndex = nextThreadIndex; if (nextThreadIndex < maxThreads - 1) nextThreadIndex++; diff --git a/cpp/src/qpid/management/Mutex.cpp b/cpp/src/qpid/management/Mutex.cpp new file mode 100644 index 0000000000..f05abb01dc --- /dev/null +++ b/cpp/src/qpid/management/Mutex.cpp @@ -0,0 +1,29 @@ +/* + * + * Copyright (c) 2008 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include "qpid/management/Mutex.h" +#include "qpid/sys/Mutex.h" + +using namespace std; +using namespace qpid::management; + +Mutex::Mutex() : impl(new sys::Mutex()) {} +Mutex::~Mutex() { delete impl; } +void Mutex::lock() { impl->lock(); } +void Mutex::unlock() { impl->unlock(); } + |
