summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
authorStephen D. Huston <shuston@apache.org>2009-05-04 21:44:59 +0000
committerStephen D. Huston <shuston@apache.org>2009-05-04 21:44:59 +0000
commitabdf1ec27a07d5067e185e970c8229e06df28823 (patch)
tree50886fef8a3cc5e64c5299d7a09707c2d7ad227c /cpp
parent3c20c531712c067b040320d2fc48af915c8de112 (diff)
downloadqpid-python-abdf1ec27a07d5067e185e970c8229e06df28823.tar.gz
Add portability support for QMF agent, thanks to Pete McKinnon - partially fixes QPID-1731
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@771457 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r--cpp/examples/README14
-rw-r--r--cpp/examples/qmf-agent/example.cpp6
-rw-r--r--cpp/src/CMakeLists.txt2
-rw-r--r--cpp/src/Makefile.am5
-rw-r--r--cpp/src/qpid/agent/ManagementAgent.h7
-rw-r--r--cpp/src/qpid/agent/ManagementAgentImpl.cpp43
-rw-r--r--cpp/src/qpid/agent/ManagementAgentImpl.h4
-rw-r--r--cpp/src/qpid/agent/QmfAgentImportExport.h33
-rwxr-xr-xcpp/src/qpid/sys/PipeHandle.h51
-rwxr-xr-xcpp/src/qpid/sys/posix/PipeHandle.cpp64
-rwxr-xr-xcpp/src/qpid/sys/windows/PipeHandle.cpp65
11 files changed, 260 insertions, 34 deletions
diff --git a/cpp/examples/README b/cpp/examples/README
index 50f0c07089..7bf35b53de 100644
--- a/cpp/examples/README
+++ b/cpp/examples/README
@@ -111,4 +111,16 @@ On Linux:
# ./server
# ./client
-
+== QMF Agent ==
+
+This example demonstrates integration with the Qpid Management Framework (QMF).
+After launching a Qpid broker, the sample program will connect to it and
+advertise a managed object (org.apache.qpid.agent.example:parent). Using
+qpid-tool, you can monitor the object and also call a method (create_child) to
+spawn managed child objects.
+
+To build this example, simply invoke make on Unix or Linux. On Windows, you
+must invoke
+ nmake /f example_gen.mak
+before building the sample to generate the supporting model classes
+(e.g., Parent,Child,etc.).
diff --git a/cpp/examples/qmf-agent/example.cpp b/cpp/examples/qmf-agent/example.cpp
index 4dec014370..a611c25ba3 100644
--- a/cpp/examples/qmf-agent/example.cpp
+++ b/cpp/examples/qmf-agent/example.cpp
@@ -23,6 +23,7 @@
#include <qpid/management/ManagementObject.h>
#include <qpid/agent/ManagementAgent.h>
#include <qpid/sys/Mutex.h>
+#include <qpid/sys/Time.h>
#include "qmf/org/apache/qpid/agent/example/Parent.h"
#include "qmf/org/apache/qpid/agent/example/Child.h"
#include "qmf/org/apache/qpid/agent/example/ArgsParentCreate_child.h"
@@ -30,7 +31,6 @@
#include "qmf/org/apache/qpid/agent/example/Package.h"
#include <signal.h>
-#include <unistd.h>
#include <cstdlib>
#include <iostream>
@@ -102,7 +102,7 @@ void CoreClass::doLoop()
{
// Periodically bump a counter to provide a changing statistical value
while (1) {
- sleep(1);
+ qpid::sys::sleep(1);
mgmtObject->inc_count();
mgmtObject->set_state("IN_LOOP");
@@ -187,6 +187,8 @@ int main_int(int argc, char** argv)
CoreClass core3(agent, "Example Core Object #3");
core1.doLoop();
+
+ return 0;
}
int main(int argc, char** argv)
diff --git a/cpp/src/CMakeLists.txt b/cpp/src/CMakeLists.txt
index d4fde9dcb9..695163d722 100644
--- a/cpp/src/CMakeLists.txt
+++ b/cpp/src/CMakeLists.txt
@@ -276,6 +276,7 @@ if (CMAKE_SYSTEM_NAME STREQUAL Windows)
qpid/sys/windows/IocpPoller.cpp
qpid/sys/windows/IOHandle.cpp
qpid/sys/windows/LockFile.cpp
+ qpid/sys/windows/PipeHandle.cpp
qpid/sys/windows/PollableCondition.cpp
qpid/sys/windows/Shlib.cpp
qpid/sys/windows/Socket.cpp
@@ -321,6 +322,7 @@ else (CMAKE_SYSTEM_NAME STREQUAL Windows)
qpid/sys/posix/IOHandle.cpp
qpid/sys/posix/LockFile.cpp
qpid/sys/posix/Mutex.cpp
+ qpid/sys/posix/PipeHandle.cpp
qpid/sys/posix/PollableCondition.cpp
qpid/sys/posix/Shlib.cpp
qpid/log/posix/SinkOptions.cpp
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am
index 77cc302c37..11ef204ddb 100644
--- a/cpp/src/Makefile.am
+++ b/cpp/src/Makefile.am
@@ -44,6 +44,7 @@ windows_dist = \
qpid/sys/windows/IoHandlePrivate.h \
qpid/sys/windows/LockFile.cpp \
qpid/sys/windows/PollableCondition.cpp \
+ qpid/sys/windows/PipeHandle.cpp \
qpid/sys/windows/Mutex.h \
qpid/sys/windows/Shlib.cpp \
qpid/sys/windows/Socket.cpp \
@@ -132,7 +133,8 @@ posix_plat_src = \
qpid/sys/posix/Mutex.cpp \
qpid/sys/posix/Fork.cpp \
qpid/sys/posix/StrError.cpp \
- qpid/sys/posix/PollableCondition.cpp
+ qpid/sys/posix/PollableCondition.cpp \
+ qpid/sys/posix/PipeHandle.cpp
posix_plat_hdr = \
qpid/log/posix/SinkOptions.h \
@@ -711,6 +713,7 @@ nobase_include_HEADERS = \
qpid/sys/Mutex.h \
qpid/sys/OutputControl.h \
qpid/sys/OutputTask.h \
+ qpid/sys/PipeHandle.h \
qpid/sys/PollableCondition.h \
qpid/sys/PollableQueue.h \
qpid/sys/Poller.h \
diff --git a/cpp/src/qpid/agent/ManagementAgent.h b/cpp/src/qpid/agent/ManagementAgent.h
index c94291c9e7..1ab888c4ef 100644
--- a/cpp/src/qpid/agent/ManagementAgent.h
+++ b/cpp/src/qpid/agent/ManagementAgent.h
@@ -20,6 +20,7 @@
// under the License.
//
+#include "QmfAgentImportExport.h"
#include "qpid/management/ManagementObject.h"
#include "qpid/management/ManagementEvent.h"
#include "qpid/management/Manageable.h"
@@ -35,9 +36,9 @@ class ManagementAgent
class Singleton {
public:
- Singleton(bool disableManagement = false);
- ~Singleton();
- static ManagementAgent* getInstance();
+ QMF_AGENT_EXTERN Singleton(bool disableManagement = false);
+ QMF_AGENT_EXTERN ~Singleton();
+ QMF_AGENT_EXTERN static ManagementAgent* getInstance();
private:
static sys::Mutex lock;
static bool disabled;
diff --git a/cpp/src/qpid/agent/ManagementAgentImpl.cpp b/cpp/src/qpid/agent/ManagementAgentImpl.cpp
index 7c70c12213..6c6fbdfe3c 100644
--- a/cpp/src/qpid/agent/ManagementAgentImpl.cpp
+++ b/cpp/src/qpid/agent/ManagementAgentImpl.cpp
@@ -21,14 +21,12 @@
#include "qpid/management/Manageable.h"
#include "qpid/management/ManagementObject.h"
#include "qpid/log/Statement.h"
+#include "qpid/sys/PipeHandle.h"
#include "ManagementAgentImpl.h"
#include <list>
#include <string.h>
#include <stdlib.h>
#include <sys/types.h>
-#include <sys/socket.h>
-#include <unistd.h>
-#include <fcntl.h>
#include <iostream>
#include <fstream>
@@ -80,7 +78,7 @@ ManagementAgent* ManagementAgent::Singleton::getInstance()
const string ManagementAgentImpl::storeMagicNumber("MA02");
ManagementAgentImpl::ManagementAgentImpl() :
- extThread(false), writeFd(-1), readFd(-1),
+ extThread(false),
initialized(false), connected(false), lastFailure("never connected"),
clientWasAdded(true), requestedBrokerBank(0), requestedAgentBank(0),
assignedBrokerBank(0), assignedAgentBank(0), bootSequence(0),
@@ -112,6 +110,10 @@ ManagementAgentImpl::~ManagementAgentImpl()
}
managementObjects.clear();
}
+ if (pipeHandle) {
+ delete pipeHandle;
+ pipeHandle = 0;
+ }
}
void ManagementAgentImpl::init(const string& brokerHost,
@@ -134,7 +136,7 @@ void ManagementAgentImpl::init(const string& brokerHost,
init(settings, intervalSeconds, useExternalThread, _storeFile);
}
-void ManagementAgentImpl::init(const client::ConnectionSettings& settings,
+void ManagementAgentImpl::init(const qpid::client::ConnectionSettings& settings,
uint16_t intervalSeconds,
bool useExternalThread,
const std::string& _storeFile)
@@ -149,18 +151,9 @@ void ManagementAgentImpl::init(const client::ConnectionSettings& settings,
connectionSettings = settings;
// TODO: Abstract the socket calls for portability
+ // qpid::sys::PipeHandle to create a pipe
if (extThread) {
- int pair[2];
- int result = socketpair(PF_UNIX, SOCK_STREAM, 0, pair);
- if (result == -1) {
- return;
- }
- writeFd = pair[0];
- readFd = pair[1];
-
- // Set the readFd to non-blocking
- int flags = fcntl(readFd, F_GETFL);
- fcntl(readFd, F_SETFL, flags | O_NONBLOCK);
+ pipeHandle = new PipeHandle(true);
}
retrieveData();
@@ -175,7 +168,7 @@ void ManagementAgentImpl::init(const client::ConnectionSettings& settings,
void ManagementAgentImpl::registerClass(const string& packageName,
const string& className,
uint8_t* md5Sum,
- management::ManagementObject::writeSchemaCall_t schemaCall)
+ qpid::management::ManagementObject::writeSchemaCall_t schemaCall)
{
Mutex::ScopedLock lock(agentLock);
PackageMap::iterator pIter = findOrAddPackage(packageName);
@@ -185,7 +178,7 @@ void ManagementAgentImpl::registerClass(const string& packageName,
void ManagementAgentImpl::registerEvent(const string& packageName,
const string& eventName,
uint8_t* md5Sum,
- management::ManagementObject::writeSchemaCall_t schemaCall)
+ qpid::management::ManagementObject::writeSchemaCall_t schemaCall)
{
Mutex::ScopedLock lock(agentLock);
PackageMap::iterator pIter = findOrAddPackage(packageName);
@@ -247,15 +240,15 @@ uint32_t ManagementAgentImpl::pollCallbacks(uint32_t callLimit)
delete item;
}
}
-
- uint8_t rbuf[100];
- while (read(readFd, rbuf, 100) > 0) ; // Consume all signaling bytes
+
+ char rbuf[100];
+ while (pipeHandle->read(rbuf, 100) > 0) ; // Consume all signaling bytes
return methodQueue.size();
}
int ManagementAgentImpl::getSignalFd(void)
{
- return readFd;
+ return pipeHandle->getReadHandle();
}
void ManagementAgentImpl::startProtocol()
@@ -536,7 +529,7 @@ void ManagementAgentImpl::handleMethodRequest(Buffer& inBuffer, uint32_t sequenc
inBuffer.getRawData(body, inBuffer.available());
methodQueue.push_back(new QueuedMethod(sequence, replyTo, body));
- write(writeFd, "X", 1);
+ pipeHandle->write("X", 1);
} else {
invokeMethodRequest(inBuffer, sequence, replyTo);
}
@@ -631,7 +624,7 @@ void ManagementAgentImpl::addClassLocal(uint8_t classKind,
PackageMap::iterator pIter,
const string& className,
uint8_t* md5Sum,
- management::ManagementObject::writeSchemaCall_t schemaCall)
+ qpid::management::ManagementObject::writeSchemaCall_t schemaCall)
{
SchemaClassKey key;
ClassMap& cMap = pIter->second;
@@ -906,7 +899,7 @@ bool ManagementAgentImpl::ConnectionThread::isSleeping() const
void ManagementAgentImpl::PublishThread::run()
{
while (true) {
- ::sleep(agent.getInterval());
agent.periodicProcessing();
+ ::sleep(agent.getInterval());
}
}
diff --git a/cpp/src/qpid/agent/ManagementAgentImpl.h b/cpp/src/qpid/agent/ManagementAgentImpl.h
index 53eb690ba8..cc668b4995 100644
--- a/cpp/src/qpid/agent/ManagementAgentImpl.h
+++ b/cpp/src/qpid/agent/ManagementAgentImpl.h
@@ -31,6 +31,7 @@
#include "qpid/sys/Thread.h"
#include "qpid/sys/Runnable.h"
#include "qpid/sys/Mutex.h"
+#include "qpid/sys/PipeHandle.h"
#include "qpid/framing/Uuid.h"
#include <iostream>
#include <sstream>
@@ -132,8 +133,7 @@ class ManagementAgentImpl : public ManagementAgent, public client::MessageListen
uint16_t interval;
bool extThread;
- int writeFd;
- int readFd;
+ sys::PipeHandle* pipeHandle;
uint64_t nextObjectId;
std::string storeFile;
sys::Mutex agentLock;
diff --git a/cpp/src/qpid/agent/QmfAgentImportExport.h b/cpp/src/qpid/agent/QmfAgentImportExport.h
new file mode 100644
index 0000000000..9eee4a18fd
--- /dev/null
+++ b/cpp/src/qpid/agent/QmfAgentImportExport.h
@@ -0,0 +1,33 @@
+#ifndef QMF_AGENT_IMPORT_EXPORT_H
+#define QMF_AGENT_IMPORT_EXPORT_H
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#if defined(WIN32) && !defined(QPID_DECLARE_STATIC)
+#if defined(QMF_AGENT_EXPORT) || defined (qmfagent_EXPORTS)
+#define QMF_AGENT_EXTERN __declspec(dllexport)
+#else
+#define QMF_AGENT_EXTERN __declspec(dllimport)
+#endif
+#else
+#define QMF_AGENT_EXTERN
+#endif
+
+#endif
diff --git a/cpp/src/qpid/sys/PipeHandle.h b/cpp/src/qpid/sys/PipeHandle.h
new file mode 100755
index 0000000000..8aac76996b
--- /dev/null
+++ b/cpp/src/qpid/sys/PipeHandle.h
@@ -0,0 +1,51 @@
+#ifndef _sys_PipeHandle_h
+#define _sys_PipeHandle_h
+
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+
+#include "qpid/sys/IntegerTypes.h"
+#include "qpid/CommonImportExport.h"
+#include <string>
+
+// This class is a portability wrapper around pipe fds.
+// It currently exists primarily and solely for the purpose of
+// integration with single-threaded components that require QMF
+// integration through a signalling fd.
+
+namespace qpid {
+namespace sys {
+
+ class PipeHandle {
+ private:
+ int writeFd;
+ int readFd;
+ public:
+ QPID_COMMON_EXTERN PipeHandle(bool nonBlocking=true);
+ QPID_COMMON_EXTERN ~PipeHandle();
+ QPID_COMMON_EXTERN int read(void* buf, size_t bufSize);
+ QPID_COMMON_EXTERN int write(const void* buf, size_t bufSize);
+ QPID_COMMON_EXTERN int getReadHandle();
+ };
+
+}}
+
+#endif /*!_sys_PipeHandle_h*/
diff --git a/cpp/src/qpid/sys/posix/PipeHandle.cpp b/cpp/src/qpid/sys/posix/PipeHandle.cpp
new file mode 100755
index 0000000000..10e2cdc755
--- /dev/null
+++ b/cpp/src/qpid/sys/posix/PipeHandle.cpp
@@ -0,0 +1,64 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+#include "qpid/sys/PipeHandle.h"
+#include "check.h"
+#include <unistd.h>
+#include <fcntl.h>
+#include <sys/socket.h>
+
+namespace qpid {
+namespace sys {
+
+PipeHandle::PipeHandle(bool nonBlocking) {
+
+ int pair[2];
+ pair[0] = pair[1] = -1;
+
+ if (socketpair(PF_UNIX, SOCK_STREAM, 0, pair) == -1)
+ throw qpid::Exception(QPID_MSG("Creation of pipe failed"));
+
+ writeFd = pair[0];
+ readFd = pair[1];
+
+ // Set the socket to non-blocking
+ if (nonBlocking) {
+ int flags = fcntl(readFd, F_GETFL);
+ fcntl(readFd, F_SETFL, flags | O_NONBLOCK);
+ }
+}
+
+PipeHandle::~PipeHandle() {
+ close(readFd);
+ close(writeFd);
+}
+
+int PipeHandle::read(void* buf, size_t bufSize) {
+ return ::read(readFd,buf,bufSize);
+}
+
+int PipeHandle::write(const void* buf, size_t bufSize) {
+ return ::write(writeFd,buf,bufSize);
+}
+
+int PipeHandle::getReadHandle() {
+ return readFd;
+}
+
+}} // namespace qpid::sys
diff --git a/cpp/src/qpid/sys/windows/PipeHandle.cpp b/cpp/src/qpid/sys/windows/PipeHandle.cpp
new file mode 100755
index 0000000000..b023d77cc0
--- /dev/null
+++ b/cpp/src/qpid/sys/windows/PipeHandle.cpp
@@ -0,0 +1,65 @@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+
+#include "qpid/sys/PipeHandle.h"
+#include "check.h"
+#include <io.h>
+#include <fcntl.h>
+#include <errno.h>
+#include <winsock2.h>
+
+namespace qpid {
+namespace sys {
+
+PipeHandle::PipeHandle(bool nonBlocking) {
+
+ int pair[2];
+ pair[0] = pair[1] = -1;
+
+ if (_pipe(pair, 128, O_BINARY) == -1)
+ throw qpid::Exception(QPID_MSG("Creation of pipe failed"));
+
+ writeFd = pair[0];
+ readFd = pair[1];
+
+ // Set the socket to non-blocking
+ if (nonBlocking) {
+ unsigned long nonblock = 1;
+ ioctlsocket(readFd, FIONBIO, &nonblock);
+ }
+}
+
+PipeHandle::~PipeHandle() {
+ close(readFd);
+ close(writeFd);
+}
+
+int PipeHandle::read(void* buf, size_t bufSize) {
+ return ::read(readFd, buf, bufSize);
+}
+
+int PipeHandle::write(const void* buf, size_t bufSize) {
+ return ::write(writeFd, buf, bufSize);
+}
+
+int PipeHandle::getReadHandle() {
+ return readFd;
+}
+
+}} // namespace qpid::sys