summaryrefslogtreecommitdiff
path: root/cpp/src/qpid
diff options
context:
space:
mode:
authorTed Ross <tross@apache.org>2010-04-09 17:19:32 +0000
committerTed Ross <tross@apache.org>2010-04-09 17:19:32 +0000
commit3e19793c8f5be1676ea6f7577196f1b01cadf685 (patch)
tree5d36d56405507f9c68736a4299f21dea294b99f5 /cpp/src/qpid
parentfcfff56e615c4054d52dc510c9cd1d1103249dce (diff)
downloadqpid-python-3e19793c8f5be1676ea6f7577196f1b01cadf685.tar.gz
QPID-2489 - Added wrapped version of Mutex to isolate QMF-generated source from boost.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@932517 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid')
-rw-r--r--cpp/src/qpid/agent/ManagementAgentImpl.cpp54
-rw-r--r--cpp/src/qpid/management/ManagementAgent.cpp42
-rw-r--r--cpp/src/qpid/management/ManagementObject.cpp2
-rw-r--r--cpp/src/qpid/management/Mutex.cpp29
4 files changed, 78 insertions, 49 deletions
diff --git a/cpp/src/qpid/agent/ManagementAgentImpl.cpp b/cpp/src/qpid/agent/ManagementAgentImpl.cpp
index 42bc36c4b8..bade1d2826 100644
--- a/cpp/src/qpid/agent/ManagementAgentImpl.cpp
+++ b/cpp/src/qpid/agent/ManagementAgentImpl.cpp
@@ -46,7 +46,7 @@ using qpid::amqp_0_10::MapCodec;
using qpid::amqp_0_10::ListCodec;
namespace {
- Mutex lock;
+ qpid::sys::Mutex lock;
bool disabled = false;
ManagementAgent* agent = 0;
int refCount = 0;
@@ -54,7 +54,7 @@ namespace {
ManagementAgent::Singleton::Singleton(bool disableManagement)
{
- Mutex::ScopedLock _lock(lock);
+ sys::Mutex::ScopedLock _lock(lock);
if (disableManagement && !disabled) {
disabled = true;
assert(refCount == 0); // can't disable after agent has been allocated
@@ -66,7 +66,7 @@ ManagementAgent::Singleton::Singleton(bool disableManagement)
ManagementAgent::Singleton::~Singleton()
{
- Mutex::ScopedLock _lock(lock);
+ sys::Mutex::ScopedLock _lock(lock);
refCount--;
if (refCount == 0 && !disabled) {
delete agent;
@@ -103,7 +103,7 @@ ManagementAgentImpl::~ManagementAgentImpl()
// Release the memory associated with stored management objects.
{
- Mutex::ScopedLock lock(agentLock);
+ sys::Mutex::ScopedLock lock(agentLock);
moveNewObjectsLH();
for (ManagementObjectMap::iterator iter = managementObjects.begin ();
@@ -186,7 +186,7 @@ void ManagementAgentImpl::registerClass(const string& packageName,
uint8_t* md5Sum,
ManagementObject::writeSchemaCall_t schemaCall)
{
- Mutex::ScopedLock lock(agentLock);
+ sys::Mutex::ScopedLock lock(agentLock);
PackageMap::iterator pIter = findOrAddPackage(packageName);
addClassLocal(ManagementItem::CLASS_KIND_TABLE, pIter, className, md5Sum, schemaCall);
}
@@ -196,7 +196,7 @@ void ManagementAgentImpl::registerEvent(const string& packageName,
uint8_t* md5Sum,
ManagementObject::writeSchemaCall_t schemaCall)
{
- Mutex::ScopedLock lock(agentLock);
+ sys::Mutex::ScopedLock lock(agentLock);
PackageMap::iterator pIter = findOrAddPackage(packageName);
addClassLocal(ManagementItem::CLASS_KIND_EVENT, pIter, eventName, md5Sum, schemaCall);
}
@@ -218,7 +218,7 @@ ObjectId ManagementAgentImpl::addObject(ManagementObject* object,
const std::string& key,
bool persistent)
{
- Mutex::ScopedLock lock(addLock);
+ sys::Mutex::ScopedLock lock(addLock);
uint16_t sequence = persistent ? 0 : bootSequence;
@@ -236,7 +236,7 @@ ObjectId ManagementAgentImpl::addObject(ManagementObject* object,
void ManagementAgentImpl::raiseEvent(const ManagementEvent& event, severity_t severity)
{
- Mutex::ScopedLock lock(agentLock);
+ sys::Mutex::ScopedLock lock(agentLock);
Buffer outBuffer(eventBuffer, MA_BUFFER_SIZE);
uint8_t sev = (severity == SEV_DEFAULT) ? event.getSeverity() : (uint8_t) severity;
stringstream key;
@@ -269,7 +269,7 @@ void ManagementAgentImpl::raiseEvent(const ManagementEvent& event, severity_t se
uint32_t ManagementAgentImpl::pollCallbacks(uint32_t callLimit)
{
- Mutex::ScopedLock lock(agentLock);
+ sys::Mutex::ScopedLock lock(agentLock);
if (inCallback) {
QPID_LOG(critical, "pollCallbacks invoked from the agent's thread!");
@@ -283,7 +283,7 @@ uint32_t ManagementAgentImpl::pollCallbacks(uint32_t callLimit)
QueuedMethod* item = methodQueue.front();
methodQueue.pop_front();
{
- Mutex::ScopedUnlock unlock(agentLock);
+ sys::Mutex::ScopedUnlock unlock(agentLock);
invokeMethodRequest(item->body, item->cid, item->replyTo);
delete item;
}
@@ -309,14 +309,14 @@ int ManagementAgentImpl::getSignalFd()
void ManagementAgentImpl::setSignalCallback(cb_t callback, void* context)
{
- Mutex::ScopedLock lock(agentLock);
+ sys::Mutex::ScopedLock lock(agentLock);
notifyCallback = callback;
notifyContext = context;
}
void ManagementAgentImpl::setSignalCallback(Notifyable& _notifyable)
{
- Mutex::ScopedLock lock(agentLock);
+ sys::Mutex::ScopedLock lock(agentLock);
notifyable = &_notifyable;
}
@@ -407,7 +407,7 @@ void ManagementAgentImpl::sendException(const string& replyToKey, const string&
void ManagementAgentImpl::handleSchemaRequest(Buffer& inBuffer, uint32_t sequence, const string& replyTo)
{
- Mutex::ScopedLock lock(agentLock);
+ sys::Mutex::ScopedLock lock(agentLock);
string packageName;
SchemaClassKey key;
@@ -441,7 +441,7 @@ void ManagementAgentImpl::handleSchemaRequest(Buffer& inBuffer, uint32_t sequenc
void ManagementAgentImpl::handleConsoleAddedIndication()
{
- Mutex::ScopedLock lock(agentLock);
+ sys::Mutex::ScopedLock lock(agentLock);
clientWasAdded = true;
QPID_LOG(trace, "RCVD ConsoleAddedInd");
@@ -680,7 +680,7 @@ void ManagementAgentImpl::handleLocateRequest(const string&, const string& cid,
QPID_LOG(trace, "SENT AgentLocateResponse replyTo=" << replyTo);
{
- Mutex::ScopedLock lock(agentLock);
+ sys::Mutex::ScopedLock lock(agentLock);
clientWasAdded = true;
}
}
@@ -688,7 +688,7 @@ void ManagementAgentImpl::handleLocateRequest(const string&, const string& cid,
void ManagementAgentImpl::handleMethodRequest(const string& body, const string& cid, const string& replyTo)
{
if (extThread) {
- Mutex::ScopedLock lock(agentLock);
+ sys::Mutex::ScopedLock lock(agentLock);
methodQueue.push_back(new QueuedMethod(cid, replyTo, body));
if (pipeHandle != 0) {
@@ -696,14 +696,14 @@ void ManagementAgentImpl::handleMethodRequest(const string& body, const string&
} else if (notifyable != 0) {
inCallback = true;
{
- Mutex::ScopedUnlock unlock(agentLock);
+ sys::Mutex::ScopedUnlock unlock(agentLock);
notifyable->notify();
}
inCallback = false;
} else if (notifyCallback != 0) {
inCallback = true;
{
- Mutex::ScopedUnlock unlock(agentLock);
+ sys::Mutex::ScopedUnlock unlock(agentLock);
notifyCallback(notifyContext);
}
inCallback = false;
@@ -820,7 +820,7 @@ ManagementAgentImpl::PackageMap::iterator ManagementAgentImpl::findOrAddPackage(
void ManagementAgentImpl::moveNewObjectsLH()
{
- Mutex::ScopedLock lock(addLock);
+ sys::Mutex::ScopedLock lock(addLock);
for (ManagementObjectMap::iterator iter = newManagementObjects.begin();
iter != newManagementObjects.end();
iter++)
@@ -872,7 +872,7 @@ void ManagementAgentImpl::encodeClassIndication(Buffer& buf,
void ManagementAgentImpl::periodicProcessing()
{
- Mutex::ScopedLock lock(agentLock);
+ sys::Mutex::ScopedLock lock(agentLock);
list<pair<ObjectId, ManagementObject*> > deleteList;
if (!connected)
@@ -1007,14 +1007,14 @@ void ManagementAgentImpl::ConnectionThread::run()
subscriptions->subscribe(agent, queueName.str(), dest);
QPID_LOG(info, "Connection established with broker");
{
- Mutex::ScopedLock _lock(connLock);
+ sys::Mutex::ScopedLock _lock(connLock);
if (shutdown)
return;
operational = true;
agent.connected = true;
agent.startProtocol();
try {
- Mutex::ScopedUnlock _unlock(connLock);
+ sys::Mutex::ScopedUnlock _unlock(connLock);
subscriptions->run();
} catch (exception) {}
@@ -1039,13 +1039,13 @@ void ManagementAgentImpl::ConnectionThread::run()
// sleep for "delay" seconds, but peridically check if the
// agent is shutting down so we don't hang for up to delayMax
// seconds during agent shutdown
- Mutex::ScopedLock _lock(connLock);
+ sys::Mutex::ScopedLock _lock(connLock);
if (shutdown)
return;
sleeping = true;
int totalSleep = 0;
do {
- Mutex::ScopedUnlock _unlock(connLock);
+ sys::Mutex::ScopedUnlock _unlock(connLock);
::sleep(delayMin);
totalSleep += delayMin;
} while (totalSleep < delay && !shutdown);
@@ -1109,7 +1109,7 @@ void ManagementAgentImpl::ConnectionThread::sendMessage(Message msg,
{
ConnectionThread::shared_ptr s;
{
- Mutex::ScopedLock _lock(connLock);
+ sys::Mutex::ScopedLock _lock(connLock);
if (!operational)
return;
s = subscriptions;
@@ -1142,7 +1142,7 @@ void ManagementAgentImpl::ConnectionThread::close()
{
ConnectionThread::shared_ptr s;
{
- Mutex::ScopedLock _lock(connLock);
+ sys::Mutex::ScopedLock _lock(connLock);
shutdown = true;
s = subscriptions;
}
@@ -1152,7 +1152,7 @@ void ManagementAgentImpl::ConnectionThread::close()
bool ManagementAgentImpl::ConnectionThread::isSleeping() const
{
- Mutex::ScopedLock _lock(connLock);
+ sys::Mutex::ScopedLock _lock(connLock);
return sleeping;
}
diff --git a/cpp/src/qpid/management/ManagementAgent.cpp b/cpp/src/qpid/management/ManagementAgent.cpp
index 4158b685ce..18cd0cdfee 100644
--- a/cpp/src/qpid/management/ManagementAgent.cpp
+++ b/cpp/src/qpid/management/ManagementAgent.cpp
@@ -44,10 +44,10 @@ using qpid::framing::Uuid;
using qpid::types::Variant;
using qpid::amqp_0_10::MapCodec;
using qpid::amqp_0_10::ListCodec;
+using qpid::sys::Mutex;
using namespace qpid::framing;
using namespace qpid::management;
using namespace qpid::broker;
-using namespace qpid::sys;
using namespace qpid;
using namespace std;
namespace _qmf = qmf::org::apache::qpid::broker;
@@ -80,7 +80,7 @@ ManagementAgent::RemoteAgent::~RemoteAgent ()
ManagementAgent::ManagementAgent (const bool qmfV1, const bool qmfV2) :
threadPoolSize(1), interval(10), broker(0), timer(0),
- startTime(uint64_t(Duration(now()))),
+ startTime(uint64_t(sys::Duration(sys::now()))),
suppressed(false),
qmf1Support(qmfV1), qmf2Support(qmfV2)
{
@@ -95,7 +95,7 @@ ManagementAgent::ManagementAgent (const bool qmfV1, const bool qmfV2) :
ManagementAgent::~ManagementAgent ()
{
{
- Mutex::ScopedLock lock (userLock);
+ sys::Mutex::ScopedLock lock (userLock);
// Reset the shared pointers to exchanges. If this is not done now, the exchanges
// will stick around until dExchange and mExchange are implicitly destroyed (long
@@ -231,7 +231,7 @@ void ManagementAgent::registerClass (const string& packageName,
uint8_t* md5Sum,
ManagementObject::writeSchemaCall_t schemaCall)
{
- Mutex::ScopedLock lock(userLock);
+ sys::Mutex::ScopedLock lock(userLock);
PackageMap::iterator pIter = findOrAddPackageLH(packageName);
addClassLH(ManagementItem::CLASS_KIND_TABLE, pIter, className, md5Sum, schemaCall);
}
@@ -241,7 +241,7 @@ void ManagementAgent::registerEvent (const string& packageName,
uint8_t* md5Sum,
ManagementObject::writeSchemaCall_t schemaCall)
{
- Mutex::ScopedLock lock(userLock);
+ sys::Mutex::ScopedLock lock(userLock);
PackageMap::iterator pIter = findOrAddPackageLH(packageName);
addClassLH(ManagementItem::CLASS_KIND_EVENT, pIter, eventName, md5Sum, schemaCall);
}
@@ -266,7 +266,7 @@ ObjectId ManagementAgent::addObject(ManagementObject* object, uint64_t persistId
object->setObjectId(objId);
{
- Mutex::ScopedLock lock (addLock);
+ sys::Mutex::ScopedLock lock (addLock);
ManagementObjectMap::iterator destIter = newManagementObjects.find(objId);
if (destIter != newManagementObjects.end()) {
if (destIter->second->isDeleted()) {
@@ -304,7 +304,7 @@ ObjectId ManagementAgent::addObject(ManagementObject* object,
object->setObjectId(objId);
{
- Mutex::ScopedLock lock (addLock);
+ sys::Mutex::ScopedLock lock (addLock);
ManagementObjectMap::iterator destIter = newManagementObjects.find(objId);
if (destIter != newManagementObjects.end()) {
if (destIter->second->isDeleted()) {
@@ -324,7 +324,7 @@ ObjectId ManagementAgent::addObject(ManagementObject* object,
void ManagementAgent::raiseEvent(const ManagementEvent& event, severity_t severity)
{
- Mutex::ScopedLock lock (userLock);
+ sys::Mutex::ScopedLock lock (userLock);
uint8_t sev = (severity == SEV_DEFAULT) ? event.getSeverity() : (uint8_t) severity;
if (qmf1Support) {
@@ -335,7 +335,7 @@ void ManagementAgent::raiseEvent(const ManagementEvent& event, severity_t severi
outBuffer.putShortString(event.getPackageName());
outBuffer.putShortString(event.getEventName());
outBuffer.putBin128(event.getMd5Sum());
- outBuffer.putLongLong(uint64_t(Duration(now())));
+ outBuffer.putLongLong(uint64_t(sys::Duration(sys::now())));
outBuffer.putOctet(sev);
std::string sBuf;
event.encode(sBuf);
@@ -359,7 +359,7 @@ void ManagementAgent::raiseEvent(const ManagementEvent& event, severity_t severi
event.getMd5Sum());
event.mapEncode(values);
map_["_values"] = values;
- map_["_timestamp"] = uint64_t(Duration(now()));
+ map_["_timestamp"] = uint64_t(sys::Duration(sys::now()));
map_["_severity"] = sev;
headers["method"] = "indication";
@@ -379,7 +379,7 @@ void ManagementAgent::raiseEvent(const ManagementEvent& event, severity_t severi
}
ManagementAgent::Periodic::Periodic (ManagementAgent& _agent, uint32_t _seconds)
- : TimerTask (qpid::sys::Duration((_seconds ? _seconds : 1) * qpid::sys::TIME_SEC),
+ : TimerTask (sys::Duration((_seconds ? _seconds : 1) * sys::TIME_SEC),
"ManagementAgent::periodicProcessing"),
agent(_agent) {}
@@ -538,7 +538,7 @@ void ManagementAgent::sendBuffer(const std::string& data,
void ManagementAgent::moveNewObjectsLH()
{
- Mutex::ScopedLock lock (addLock);
+ sys::Mutex::ScopedLock lock (addLock);
for (ManagementObjectMap::iterator iter = newManagementObjects.begin ();
iter != newManagementObjects.end ();
iter++) {
@@ -573,14 +573,14 @@ void ManagementAgent::periodicProcessing (void)
#define BUFSIZE 65536
#define HEADROOM 4096
QPID_LOG(trace, "Management agent periodic processing");
- Mutex::ScopedLock lock (userLock);
+ sys::Mutex::ScopedLock lock (userLock);
char msgChars[BUFSIZE];
uint32_t contentSize;
string routingKey;
list<pair<ObjectId, ManagementObject*> > deleteList;
std::string sBuf;
- uint64_t uptime = uint64_t(Duration(now())) - startTime;
+ uint64_t uptime = uint64_t(sys::Duration(sys::now())) - startTime;
static_cast<_qmf::Broker*>(broker->GetManagementObject())->set_uptime(uptime);
moveNewObjectsLH();
@@ -779,7 +779,7 @@ void ManagementAgent::periodicProcessing (void)
char msgChars[BUFSIZE];
Buffer msgBuffer(msgChars, BUFSIZE);
encodeHeader(msgBuffer, 'h');
- msgBuffer.putLongLong(uint64_t(Duration(now())));
+ msgBuffer.putLongLong(uint64_t(sys::Duration(sys::now())));
contentSize = BUFSIZE - msgBuffer.available ();
msgBuffer.reset ();
@@ -799,7 +799,7 @@ void ManagementAgent::periodicProcessing (void)
headers["qmf.agent"] = name_address;
map["_values"] = attrMap;
- map["_values"].asMap()["timestamp"] = uint64_t(Duration(now()));
+ map["_values"].asMap()["timestamp"] = uint64_t(sys::Duration(sys::now()));
map["_values"].asMap()["heartbeat_interval"] = interval;
string content;
@@ -890,7 +890,7 @@ bool ManagementAgent::dispatchCommand (Deliverable& deliverable,
const FieldTable* /*args*/,
const bool topic)
{
- Mutex::ScopedLock lock (userLock);
+ sys::Mutex::ScopedLock lock (userLock);
Message& msg = ((DeliverableMessage&) deliverable).getMessage ();
if (qmf1Support && topic) {
@@ -1022,7 +1022,7 @@ void ManagementAgent::handleMethodRequestLH (Buffer& inBuffer, string replyToKey
else
try {
outBuffer.record();
- Mutex::ScopedUnlock u(userLock);
+ sys::Mutex::ScopedUnlock u(userLock);
std::string outBuf;
iter->second->doMethod(methodName, inArgs, outBuf);
outBuffer.putRawData(outBuf);
@@ -1687,7 +1687,7 @@ void ManagementAgent::handleLocateRequestLH(const string&, const string& replyTo
headers["qmf.agent"] = name_address;
map["_values"] = attrMap;
- map["_values"].asMap()["timestamp"] = uint64_t(Duration(now()));
+ map["_values"].asMap()["timestamp"] = uint64_t(sys::Duration(sys::now()));
map["_values"].asMap()["heartbeat_interval"] = interval;
string content;
@@ -2109,13 +2109,13 @@ ManagementObjectMap::iterator ManagementAgent::numericFind(const ObjectId& oid)
void ManagementAgent::setAllocator(std::auto_ptr<IdAllocator> a)
{
- Mutex::ScopedLock lock (userLock);
+ sys::Mutex::ScopedLock lock (userLock);
allocator = a;
}
uint64_t ManagementAgent::allocateId(Manageable* object)
{
- Mutex::ScopedLock lock (userLock);
+ sys::Mutex::ScopedLock lock (userLock);
if (allocator.get()) return allocator->getIdFor(object);
return 0;
}
diff --git a/cpp/src/qpid/management/ManagementObject.cpp b/cpp/src/qpid/management/ManagementObject.cpp
index 46fc67d07f..eb1b32bf7c 100644
--- a/cpp/src/qpid/management/ManagementObject.cpp
+++ b/cpp/src/qpid/management/ManagementObject.cpp
@@ -331,7 +331,7 @@ void ManagementObject::setReference(ObjectId) {}
int ManagementObject::getThreadIndex() {
static QPID_TSS int thisIndex = -1;
if (thisIndex == -1) {
- sys::Mutex::ScopedLock mutex(accessLock);
+ Mutex::ScopedLock mutex(accessLock);
thisIndex = nextThreadIndex;
if (nextThreadIndex < maxThreads - 1)
nextThreadIndex++;
diff --git a/cpp/src/qpid/management/Mutex.cpp b/cpp/src/qpid/management/Mutex.cpp
new file mode 100644
index 0000000000..f05abb01dc
--- /dev/null
+++ b/cpp/src/qpid/management/Mutex.cpp
@@ -0,0 +1,29 @@
+/*
+ *
+ * Copyright (c) 2008 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "qpid/management/Mutex.h"
+#include "qpid/sys/Mutex.h"
+
+using namespace std;
+using namespace qpid::management;
+
+Mutex::Mutex() : impl(new sys::Mutex()) {}
+Mutex::~Mutex() { delete impl; }
+void Mutex::lock() { impl->lock(); }
+void Mutex::unlock() { impl->unlock(); }
+