diff options
author | Stephen D. Huston <shuston@apache.org> | 2009-05-04 21:44:59 +0000 |
---|---|---|
committer | Stephen D. Huston <shuston@apache.org> | 2009-05-04 21:44:59 +0000 |
commit | abdf1ec27a07d5067e185e970c8229e06df28823 (patch) | |
tree | 50886fef8a3cc5e64c5299d7a09707c2d7ad227c /cpp | |
parent | 3c20c531712c067b040320d2fc48af915c8de112 (diff) | |
download | qpid-python-abdf1ec27a07d5067e185e970c8229e06df28823.tar.gz |
Add portability support for QMF agent, thanks to Pete McKinnon - partially fixes QPID-1731
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@771457 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r-- | cpp/examples/README | 14 | ||||
-rw-r--r-- | cpp/examples/qmf-agent/example.cpp | 6 | ||||
-rw-r--r-- | cpp/src/CMakeLists.txt | 2 | ||||
-rw-r--r-- | cpp/src/Makefile.am | 5 | ||||
-rw-r--r-- | cpp/src/qpid/agent/ManagementAgent.h | 7 | ||||
-rw-r--r-- | cpp/src/qpid/agent/ManagementAgentImpl.cpp | 43 | ||||
-rw-r--r-- | cpp/src/qpid/agent/ManagementAgentImpl.h | 4 | ||||
-rw-r--r-- | cpp/src/qpid/agent/QmfAgentImportExport.h | 33 | ||||
-rwxr-xr-x | cpp/src/qpid/sys/PipeHandle.h | 51 | ||||
-rwxr-xr-x | cpp/src/qpid/sys/posix/PipeHandle.cpp | 64 | ||||
-rwxr-xr-x | cpp/src/qpid/sys/windows/PipeHandle.cpp | 65 |
11 files changed, 260 insertions, 34 deletions
diff --git a/cpp/examples/README b/cpp/examples/README index 50f0c07089..7bf35b53de 100644 --- a/cpp/examples/README +++ b/cpp/examples/README @@ -111,4 +111,16 @@ On Linux: # ./server # ./client - +== QMF Agent == + +This example demonstrates integration with the Qpid Management Framework (QMF). +After launching a Qpid broker, the sample program will connect to it and +advertise a managed object (org.apache.qpid.agent.example:parent). Using +qpid-tool, you can monitor the object and also call a method (create_child) to +spawn managed child objects. + +To build this example, simply invoke make on Unix or Linux. On Windows, you +must invoke + nmake /f example_gen.mak +before building the sample to generate the supporting model classes +(e.g., Parent,Child,etc.). diff --git a/cpp/examples/qmf-agent/example.cpp b/cpp/examples/qmf-agent/example.cpp index 4dec014370..a611c25ba3 100644 --- a/cpp/examples/qmf-agent/example.cpp +++ b/cpp/examples/qmf-agent/example.cpp @@ -23,6 +23,7 @@ #include <qpid/management/ManagementObject.h> #include <qpid/agent/ManagementAgent.h> #include <qpid/sys/Mutex.h> +#include <qpid/sys/Time.h> #include "qmf/org/apache/qpid/agent/example/Parent.h" #include "qmf/org/apache/qpid/agent/example/Child.h" #include "qmf/org/apache/qpid/agent/example/ArgsParentCreate_child.h" @@ -30,7 +31,6 @@ #include "qmf/org/apache/qpid/agent/example/Package.h" #include <signal.h> -#include <unistd.h> #include <cstdlib> #include <iostream> @@ -102,7 +102,7 @@ void CoreClass::doLoop() { // Periodically bump a counter to provide a changing statistical value while (1) { - sleep(1); + qpid::sys::sleep(1); mgmtObject->inc_count(); mgmtObject->set_state("IN_LOOP"); @@ -187,6 +187,8 @@ int main_int(int argc, char** argv) CoreClass core3(agent, "Example Core Object #3"); core1.doLoop(); + + return 0; } int main(int argc, char** argv) diff --git a/cpp/src/CMakeLists.txt b/cpp/src/CMakeLists.txt index d4fde9dcb9..695163d722 100644 --- a/cpp/src/CMakeLists.txt +++ b/cpp/src/CMakeLists.txt @@ -276,6 +276,7 @@ if (CMAKE_SYSTEM_NAME STREQUAL Windows) qpid/sys/windows/IocpPoller.cpp qpid/sys/windows/IOHandle.cpp qpid/sys/windows/LockFile.cpp + qpid/sys/windows/PipeHandle.cpp qpid/sys/windows/PollableCondition.cpp qpid/sys/windows/Shlib.cpp qpid/sys/windows/Socket.cpp @@ -321,6 +322,7 @@ else (CMAKE_SYSTEM_NAME STREQUAL Windows) qpid/sys/posix/IOHandle.cpp qpid/sys/posix/LockFile.cpp qpid/sys/posix/Mutex.cpp + qpid/sys/posix/PipeHandle.cpp qpid/sys/posix/PollableCondition.cpp qpid/sys/posix/Shlib.cpp qpid/log/posix/SinkOptions.cpp diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am index 77cc302c37..11ef204ddb 100644 --- a/cpp/src/Makefile.am +++ b/cpp/src/Makefile.am @@ -44,6 +44,7 @@ windows_dist = \ qpid/sys/windows/IoHandlePrivate.h \ qpid/sys/windows/LockFile.cpp \ qpid/sys/windows/PollableCondition.cpp \ + qpid/sys/windows/PipeHandle.cpp \ qpid/sys/windows/Mutex.h \ qpid/sys/windows/Shlib.cpp \ qpid/sys/windows/Socket.cpp \ @@ -132,7 +133,8 @@ posix_plat_src = \ qpid/sys/posix/Mutex.cpp \ qpid/sys/posix/Fork.cpp \ qpid/sys/posix/StrError.cpp \ - qpid/sys/posix/PollableCondition.cpp + qpid/sys/posix/PollableCondition.cpp \ + qpid/sys/posix/PipeHandle.cpp posix_plat_hdr = \ qpid/log/posix/SinkOptions.h \ @@ -711,6 +713,7 @@ nobase_include_HEADERS = \ qpid/sys/Mutex.h \ qpid/sys/OutputControl.h \ qpid/sys/OutputTask.h \ + qpid/sys/PipeHandle.h \ qpid/sys/PollableCondition.h \ qpid/sys/PollableQueue.h \ qpid/sys/Poller.h \ diff --git a/cpp/src/qpid/agent/ManagementAgent.h b/cpp/src/qpid/agent/ManagementAgent.h index c94291c9e7..1ab888c4ef 100644 --- a/cpp/src/qpid/agent/ManagementAgent.h +++ b/cpp/src/qpid/agent/ManagementAgent.h @@ -20,6 +20,7 @@ // under the License. // +#include "QmfAgentImportExport.h" #include "qpid/management/ManagementObject.h" #include "qpid/management/ManagementEvent.h" #include "qpid/management/Manageable.h" @@ -35,9 +36,9 @@ class ManagementAgent class Singleton { public: - Singleton(bool disableManagement = false); - ~Singleton(); - static ManagementAgent* getInstance(); + QMF_AGENT_EXTERN Singleton(bool disableManagement = false); + QMF_AGENT_EXTERN ~Singleton(); + QMF_AGENT_EXTERN static ManagementAgent* getInstance(); private: static sys::Mutex lock; static bool disabled; diff --git a/cpp/src/qpid/agent/ManagementAgentImpl.cpp b/cpp/src/qpid/agent/ManagementAgentImpl.cpp index 7c70c12213..6c6fbdfe3c 100644 --- a/cpp/src/qpid/agent/ManagementAgentImpl.cpp +++ b/cpp/src/qpid/agent/ManagementAgentImpl.cpp @@ -21,14 +21,12 @@ #include "qpid/management/Manageable.h" #include "qpid/management/ManagementObject.h" #include "qpid/log/Statement.h" +#include "qpid/sys/PipeHandle.h" #include "ManagementAgentImpl.h" #include <list> #include <string.h> #include <stdlib.h> #include <sys/types.h> -#include <sys/socket.h> -#include <unistd.h> -#include <fcntl.h> #include <iostream> #include <fstream> @@ -80,7 +78,7 @@ ManagementAgent* ManagementAgent::Singleton::getInstance() const string ManagementAgentImpl::storeMagicNumber("MA02"); ManagementAgentImpl::ManagementAgentImpl() : - extThread(false), writeFd(-1), readFd(-1), + extThread(false), initialized(false), connected(false), lastFailure("never connected"), clientWasAdded(true), requestedBrokerBank(0), requestedAgentBank(0), assignedBrokerBank(0), assignedAgentBank(0), bootSequence(0), @@ -112,6 +110,10 @@ ManagementAgentImpl::~ManagementAgentImpl() } managementObjects.clear(); } + if (pipeHandle) { + delete pipeHandle; + pipeHandle = 0; + } } void ManagementAgentImpl::init(const string& brokerHost, @@ -134,7 +136,7 @@ void ManagementAgentImpl::init(const string& brokerHost, init(settings, intervalSeconds, useExternalThread, _storeFile); } -void ManagementAgentImpl::init(const client::ConnectionSettings& settings, +void ManagementAgentImpl::init(const qpid::client::ConnectionSettings& settings, uint16_t intervalSeconds, bool useExternalThread, const std::string& _storeFile) @@ -149,18 +151,9 @@ void ManagementAgentImpl::init(const client::ConnectionSettings& settings, connectionSettings = settings; // TODO: Abstract the socket calls for portability + // qpid::sys::PipeHandle to create a pipe if (extThread) { - int pair[2]; - int result = socketpair(PF_UNIX, SOCK_STREAM, 0, pair); - if (result == -1) { - return; - } - writeFd = pair[0]; - readFd = pair[1]; - - // Set the readFd to non-blocking - int flags = fcntl(readFd, F_GETFL); - fcntl(readFd, F_SETFL, flags | O_NONBLOCK); + pipeHandle = new PipeHandle(true); } retrieveData(); @@ -175,7 +168,7 @@ void ManagementAgentImpl::init(const client::ConnectionSettings& settings, void ManagementAgentImpl::registerClass(const string& packageName, const string& className, uint8_t* md5Sum, - management::ManagementObject::writeSchemaCall_t schemaCall) + qpid::management::ManagementObject::writeSchemaCall_t schemaCall) { Mutex::ScopedLock lock(agentLock); PackageMap::iterator pIter = findOrAddPackage(packageName); @@ -185,7 +178,7 @@ void ManagementAgentImpl::registerClass(const string& packageName, void ManagementAgentImpl::registerEvent(const string& packageName, const string& eventName, uint8_t* md5Sum, - management::ManagementObject::writeSchemaCall_t schemaCall) + qpid::management::ManagementObject::writeSchemaCall_t schemaCall) { Mutex::ScopedLock lock(agentLock); PackageMap::iterator pIter = findOrAddPackage(packageName); @@ -247,15 +240,15 @@ uint32_t ManagementAgentImpl::pollCallbacks(uint32_t callLimit) delete item; } } - - uint8_t rbuf[100]; - while (read(readFd, rbuf, 100) > 0) ; // Consume all signaling bytes + + char rbuf[100]; + while (pipeHandle->read(rbuf, 100) > 0) ; // Consume all signaling bytes return methodQueue.size(); } int ManagementAgentImpl::getSignalFd(void) { - return readFd; + return pipeHandle->getReadHandle(); } void ManagementAgentImpl::startProtocol() @@ -536,7 +529,7 @@ void ManagementAgentImpl::handleMethodRequest(Buffer& inBuffer, uint32_t sequenc inBuffer.getRawData(body, inBuffer.available()); methodQueue.push_back(new QueuedMethod(sequence, replyTo, body)); - write(writeFd, "X", 1); + pipeHandle->write("X", 1); } else { invokeMethodRequest(inBuffer, sequence, replyTo); } @@ -631,7 +624,7 @@ void ManagementAgentImpl::addClassLocal(uint8_t classKind, PackageMap::iterator pIter, const string& className, uint8_t* md5Sum, - management::ManagementObject::writeSchemaCall_t schemaCall) + qpid::management::ManagementObject::writeSchemaCall_t schemaCall) { SchemaClassKey key; ClassMap& cMap = pIter->second; @@ -906,7 +899,7 @@ bool ManagementAgentImpl::ConnectionThread::isSleeping() const void ManagementAgentImpl::PublishThread::run() { while (true) { - ::sleep(agent.getInterval()); agent.periodicProcessing(); + ::sleep(agent.getInterval()); } } diff --git a/cpp/src/qpid/agent/ManagementAgentImpl.h b/cpp/src/qpid/agent/ManagementAgentImpl.h index 53eb690ba8..cc668b4995 100644 --- a/cpp/src/qpid/agent/ManagementAgentImpl.h +++ b/cpp/src/qpid/agent/ManagementAgentImpl.h @@ -31,6 +31,7 @@ #include "qpid/sys/Thread.h" #include "qpid/sys/Runnable.h" #include "qpid/sys/Mutex.h" +#include "qpid/sys/PipeHandle.h" #include "qpid/framing/Uuid.h" #include <iostream> #include <sstream> @@ -132,8 +133,7 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen uint16_t interval; bool extThread; - int writeFd; - int readFd; + sys::PipeHandle* pipeHandle; uint64_t nextObjectId; std::string storeFile; sys::Mutex agentLock; diff --git a/cpp/src/qpid/agent/QmfAgentImportExport.h b/cpp/src/qpid/agent/QmfAgentImportExport.h new file mode 100644 index 0000000000..9eee4a18fd --- /dev/null +++ b/cpp/src/qpid/agent/QmfAgentImportExport.h @@ -0,0 +1,33 @@ +#ifndef QMF_AGENT_IMPORT_EXPORT_H +#define QMF_AGENT_IMPORT_EXPORT_H + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#if defined(WIN32) && !defined(QPID_DECLARE_STATIC) +#if defined(QMF_AGENT_EXPORT) || defined (qmfagent_EXPORTS) +#define QMF_AGENT_EXTERN __declspec(dllexport) +#else +#define QMF_AGENT_EXTERN __declspec(dllimport) +#endif +#else +#define QMF_AGENT_EXTERN +#endif + +#endif diff --git a/cpp/src/qpid/sys/PipeHandle.h b/cpp/src/qpid/sys/PipeHandle.h new file mode 100755 index 0000000000..8aac76996b --- /dev/null +++ b/cpp/src/qpid/sys/PipeHandle.h @@ -0,0 +1,51 @@ +#ifndef _sys_PipeHandle_h +#define _sys_PipeHandle_h + +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ + +#include "qpid/sys/IntegerTypes.h" +#include "qpid/CommonImportExport.h" +#include <string> + +// This class is a portability wrapper around pipe fds. +// It currently exists primarily and solely for the purpose of +// integration with single-threaded components that require QMF +// integration through a signalling fd. + +namespace qpid { +namespace sys { + + class PipeHandle { + private: + int writeFd; + int readFd; + public: + QPID_COMMON_EXTERN PipeHandle(bool nonBlocking=true); + QPID_COMMON_EXTERN ~PipeHandle(); + QPID_COMMON_EXTERN int read(void* buf, size_t bufSize); + QPID_COMMON_EXTERN int write(const void* buf, size_t bufSize); + QPID_COMMON_EXTERN int getReadHandle(); + }; + +}} + +#endif /*!_sys_PipeHandle_h*/ diff --git a/cpp/src/qpid/sys/posix/PipeHandle.cpp b/cpp/src/qpid/sys/posix/PipeHandle.cpp new file mode 100755 index 0000000000..10e2cdc755 --- /dev/null +++ b/cpp/src/qpid/sys/posix/PipeHandle.cpp @@ -0,0 +1,64 @@ +// +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// + +#include "qpid/sys/PipeHandle.h" +#include "check.h" +#include <unistd.h> +#include <fcntl.h> +#include <sys/socket.h> + +namespace qpid { +namespace sys { + +PipeHandle::PipeHandle(bool nonBlocking) { + + int pair[2]; + pair[0] = pair[1] = -1; + + if (socketpair(PF_UNIX, SOCK_STREAM, 0, pair) == -1) + throw qpid::Exception(QPID_MSG("Creation of pipe failed")); + + writeFd = pair[0]; + readFd = pair[1]; + + // Set the socket to non-blocking + if (nonBlocking) { + int flags = fcntl(readFd, F_GETFL); + fcntl(readFd, F_SETFL, flags | O_NONBLOCK); + } +} + +PipeHandle::~PipeHandle() { + close(readFd); + close(writeFd); +} + +int PipeHandle::read(void* buf, size_t bufSize) { + return ::read(readFd,buf,bufSize); +} + +int PipeHandle::write(const void* buf, size_t bufSize) { + return ::write(writeFd,buf,bufSize); +} + +int PipeHandle::getReadHandle() { + return readFd; +} + +}} // namespace qpid::sys diff --git a/cpp/src/qpid/sys/windows/PipeHandle.cpp b/cpp/src/qpid/sys/windows/PipeHandle.cpp new file mode 100755 index 0000000000..b023d77cc0 --- /dev/null +++ b/cpp/src/qpid/sys/windows/PipeHandle.cpp @@ -0,0 +1,65 @@ +// +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// + +#include "qpid/sys/PipeHandle.h" +#include "check.h" +#include <io.h> +#include <fcntl.h> +#include <errno.h> +#include <winsock2.h> + +namespace qpid { +namespace sys { + +PipeHandle::PipeHandle(bool nonBlocking) { + + int pair[2]; + pair[0] = pair[1] = -1; + + if (_pipe(pair, 128, O_BINARY) == -1) + throw qpid::Exception(QPID_MSG("Creation of pipe failed")); + + writeFd = pair[0]; + readFd = pair[1]; + + // Set the socket to non-blocking + if (nonBlocking) { + unsigned long nonblock = 1; + ioctlsocket(readFd, FIONBIO, &nonblock); + } +} + +PipeHandle::~PipeHandle() { + close(readFd); + close(writeFd); +} + +int PipeHandle::read(void* buf, size_t bufSize) { + return ::read(readFd, buf, bufSize); +} + +int PipeHandle::write(const void* buf, size_t bufSize) { + return ::write(writeFd, buf, bufSize); +} + +int PipeHandle::getReadHandle() { + return readFd; +} + +}} // namespace qpid::sys |