summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
authorAndrew Stitcher <astitcher@apache.org>2007-07-30 13:25:31 +0000
committerAndrew Stitcher <astitcher@apache.org>2007-07-30 13:25:31 +0000
commitf5bcdf79d18710205afb5ce95ab20686e9acea63 (patch)
treefc143ce68ddd8509da07f7a9192cecb6f6da18c1 /cpp
parent04a5eca2d8ff39952f8b76a647572756f277b17c (diff)
downloadqpid-python-f5bcdf79d18710205afb5ce95ab20686e9acea63.tar.gz
r797@fuschia: andrew | 2007-07-30 14:25:02 +0100
* Removed all the leader-follower acceptor code (APR based acceptor) * Removed the --enable/disable-apr-netio option to configure git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@560973 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r--cpp/README4
-rw-r--r--cpp/configure.ac23
-rw-r--r--cpp/src/Makefile.am50
-rw-r--r--cpp/src/qpid/sys/apr/APRAcceptor.cpp127
-rw-r--r--cpp/src/qpid/sys/apr/APRSocket.cpp79
-rw-r--r--cpp/src/qpid/sys/apr/APRSocket.h48
-rw-r--r--cpp/src/qpid/sys/apr/LFProcessor.cpp181
-rw-r--r--cpp/src/qpid/sys/apr/LFProcessor.h121
-rw-r--r--cpp/src/qpid/sys/apr/LFSessionContext.cpp170
-rw-r--r--cpp/src/qpid/sys/apr/LFSessionContext.h87
10 files changed, 18 insertions, 872 deletions
diff --git a/cpp/README b/cpp/README
index a6c25dcd0f..b50d00ed7c 100644
--- a/cpp/README
+++ b/cpp/README
@@ -33,9 +33,11 @@ The following libraries and header files must be installed to build
a source distribution:
* boost <http://www.boost.org> (1.33.1)
* uuid <http://e2fsprogs.sourceforge.net/> (1.39)
- * apr <http://apr.apache.org> (1.2.7)
* pkgconfig <http://pkgconfig.freedesktop.org/wiki/> (0.21)
+Building on a platform other than Linux currently requires:
+ * apr <http://apr.apache.org> (1.2.7)
+
Optional cluster functionality requires:
* openais <http://openais.org/> (0.80.3)
diff --git a/cpp/configure.ac b/cpp/configure.ac
index d9ec8cce21..20c6426188 100644
--- a/cpp/configure.ac
+++ b/cpp/configure.ac
@@ -90,17 +90,6 @@ CPPUNIT_CXXFLAGS=$CPPUNIT_CFLAGS
AC_SUBST(CPPUNIT_LIBS)
AC_SUBST(CPPUNIT_CXXFLAGS)
-AC_ARG_ENABLE([apr-netio],
- [AS_HELP_STRING([--enable-apr-netio],
- [use the Apache Portable Runtime library for network IO (default yes)])],
- [case $enableval in
- yes|no) enable_APR_NETIO=$enableval;;
- *) AC_MSG_ERROR([Invalid value for --enable-apr-netio: $enableval]);;
- esac],
- [enable_APR_NETIO=yes]
-)
-AM_CONDITIONAL([USE_APR_NETIO], [test x$enable_APR_NETIO = xyes])
-
AC_ARG_ENABLE([apr-platform],
[AS_HELP_STRING([--enable-apr-platform],
[use the Apache Portable Runtime library for platform (default no)])],
@@ -117,16 +106,10 @@ AC_SUBST(APR_MINIMUM_VERSION)
AC_SUBST(APR_CXXFLAGS)
AC_SUBST(USE_APR_PLATFORM)
-if test "$enable_APR_NETIO" = yes -o "$enable_APR_PLATFORM" = yes; then
+if test "$enable_APR_PLATFORM" = yes; then
PKG_CHECK_MODULES([APR], [apr-1 >= $APR_MINIMUM_VERSION])
- APR_CXXFLAGS="$APR_CFLAGS"
- if test "$enable_APR_NETIO" = yes; then
- USE_APR_NETIO=1
- fi
- if test "$enable_APR_PLATFORM" = yes; then
- APR_CXXFLAGS+=" -DUSE_APR_PLATFORM=1"
- USE_APR_PLATFORM=1
- fi
+ APR_CXXFLAGS="$APR_CFLAGS -DUSE_APR_PLATFORM=1"
+ USE_APR_PLATFORM=1
fi
AC_ARG_ENABLE([valgrind],
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am
index 7950299f64..b455a4558b 100644
--- a/cpp/src/Makefile.am
+++ b/cpp/src/Makefile.am
@@ -33,40 +33,22 @@ qpidd_LDADD = \
sbin_PROGRAMS = qpidd
qpidd_SOURCES = qpidd.cpp
-apr_netio_src = \
- qpid/sys/apr/APRAcceptor.cpp \
+apr_plat_src = \
qpid/sys/apr/APRBase.cpp \
qpid/sys/apr/APRPool.cpp \
- qpid/sys/apr/APRSocket.cpp \
- qpid/sys/apr/LFProcessor.cpp \
- qpid/sys/apr/LFSessionContext.cpp
-
-apr_netio_hdr = \
- qpid/sys/apr/APRBase.h \
- qpid/sys/apr/APRPool.h \
- qpid/sys/apr/APRSocket.h \
- qpid/sys/apr/LFProcessor.h \
- qpid/sys/apr/LFSessionContext.h
-
-apr_plat_src = \
qpid/sys/apr/Socket.cpp \
qpid/sys/apr/Time.cpp \
qpid/sys/apr/Thread.cpp \
qpid/sys/apr/Shlib.cpp
-
apr_plat_hdr = \
+ qpid/sys/apr/APRBase.h \
+ qpid/sys/apr/APRPool.h \
qpid/sys/apr/Condition.h \
qpid/sys/apr/Mutex.h \
qpid/sys/apr/Thread.h
-posix_netio_src = \
- qpid/sys/AsynchIOAcceptor.cpp
-
-posix_netio_hdr =
-
posix_plat_src = \
- qpid/sys/Dispatcher.cpp \
qpid/sys/epoll/EpollPoller.cpp \
qpid/sys/posix/check.cpp \
qpid/sys/posix/Socket.cpp \
@@ -82,24 +64,14 @@ posix_plat_hdr = \
qpid/sys/posix/Mutex.h \
qpid/sys/posix/Thread.h
-if USE_APR_NETIO
- platform_dist=$(posix_netio_src) $(posix_netio_hdr)
- platform_src = $(apr_netio_src)
- platform_hdr = $(apr_netio_hdr)
-else
- platform_dist=$(apr_netio_src) $(apr_netio_hdr)
- platform_src = $(posix_netio_src)
- platform_hdr = $(posix_netio_hdr)
-endif
-
if USE_APR_PLATFORM
- platform_dist+=$(posix_plat_src) $(posix_plat_hdr)
- platform_src += $(apr_plat_src)
- platform_hdr += $(apr_plat_hdr)
+ platform_dist=$(posix_plat_src) $(posix_plat_hdr)
+ platform_src = $(apr_plat_src)
+ platform_hdr = $(apr_plat_hdr)
else
- platform_dist+=$(apr_plat_src) $(apr_plat_hdr)
- platform_src += $(posix_plat_src)
- platform_hdr += $(posix_plat_hdr)
+ platform_dist=$(apr_plat_src) $(apr_plat_hdr)
+ platform_src = $(posix_plat_src)
+ platform_hdr = $(posix_plat_hdr)
endif
lib_LTLIBRARIES = libqpidcommon.la libqpidbroker.la libqpidclient.la
@@ -166,8 +138,10 @@ libqpidcommon_la_SOURCES = \
qpid/Url.h \
qpid/Url.cpp \
qpid/QpidError.cpp \
- qpid/sys/Serializer.cpp \
+ qpid/sys/AsynchIOAcceptor.cpp \
+ qpid/sys/Dispatcher.cpp \
qpid/sys/Runnable.cpp \
+ qpid/sys/Serializer.cpp \
qpid/sys/Shlib.h \
qpid/sys/Shlib.cpp \
qpid/Options.cpp \
diff --git a/cpp/src/qpid/sys/apr/APRAcceptor.cpp b/cpp/src/qpid/sys/apr/APRAcceptor.cpp
deleted file mode 100644
index b353b698ef..0000000000
--- a/cpp/src/qpid/sys/apr/APRAcceptor.cpp
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include "qpid/log/Statement.h"
-#include "qpid/sys/Acceptor.h"
-#include "qpid/sys/ConnectionInputHandlerFactory.h"
-#include "qpid/sys/Mutex.h"
-#include "LFProcessor.h"
-#include "LFSessionContext.h"
-#include "APRBase.h"
-#include "APRPool.h"
-
-namespace qpid {
-namespace sys {
-
-class APRAcceptor : public Acceptor
-{
- public:
- APRAcceptor(int16_t port, int backlog, int threads, bool trace);
- virtual uint16_t getPort() const;
- virtual std::string getHost() const;
- virtual void run(qpid::sys::ConnectionInputHandlerFactory* factory);
- virtual void shutdown();
-
- private:
- void shutdownImpl();
-
- private:
- int16_t port;
- bool trace;
- LFProcessor processor;
- apr_socket_t* socket;
- volatile bool running;
- Mutex shutdownLock;
-};
-
-// Define generic Acceptor::create() to return APRAcceptor.
-Acceptor::shared_ptr Acceptor::create(int16_t port, int backlog, int threads, bool trace)
-{
- return Acceptor::shared_ptr(new APRAcceptor(port, backlog, threads, trace));
-}
-
-APRAcceptor::APRAcceptor(int16_t port_, int backlog, int threads, bool trace_) :
- port(port_),
- trace(trace_),
- processor(APRPool::get(), threads, 1000, 5000000),
- running(false)
-{
- apr_sockaddr_t* address;
- CHECK_APR_SUCCESS(apr_sockaddr_info_get(&address, APR_ANYADDR, APR_UNSPEC, port, APR_IPV4_ADDR_OK, APRPool::get()));
- CHECK_APR_SUCCESS(apr_socket_create(&socket, APR_INET, SOCK_STREAM, APR_PROTO_TCP, APRPool::get()));
- CHECK_APR_SUCCESS(apr_socket_opt_set(socket, APR_SO_REUSEADDR, 1));
- CHECK_APR_SUCCESS(apr_socket_bind(socket, address));
- CHECK_APR_SUCCESS(apr_socket_listen(socket, backlog));
-}
-
-std::string APRAcceptor::getHost() const {
- apr_sockaddr_t* address;
- CHECK_APR_SUCCESS(apr_socket_addr_get(&address, APR_LOCAL, socket));
- return address->hostname;
-}
-
-uint16_t APRAcceptor::getPort() const {
- apr_sockaddr_t* address;
- CHECK_APR_SUCCESS(apr_socket_addr_get(&address, APR_LOCAL, socket));
- return address->port;
-}
-
-void APRAcceptor::run(ConnectionInputHandlerFactory* factory) {
- running = true;
- processor.start();
- QPID_LOG(info, "Listening on port " << getPort());
- while(running) {
- apr_socket_t* client;
- apr_status_t status = apr_socket_accept(&client, socket, APRPool::get());
- if(status == APR_SUCCESS){
- //make this socket non-blocking:
- CHECK_APR_SUCCESS(apr_socket_timeout_set(client, 0));
- CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_NONBLOCK, 1));
- CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_TCP_NODELAY, 1));
- CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_SNDBUF, 32768));
- CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_RCVBUF, 32768));
- LFSessionContext* session = new LFSessionContext(APRPool::get(), client, &processor, trace);
- session->init(factory->create(session));
- }else{
- Mutex::ScopedLock locker(shutdownLock);
- if(running) {
- if(status != APR_EINTR){
- QPID_LOG(error, get_desc(status));
- }
- shutdownImpl();
- }
- }
- }
-}
-
-void APRAcceptor::shutdown() {
- Mutex::ScopedLock locker(shutdownLock);
- if (running)
- shutdownImpl();
-}
-
-void APRAcceptor::shutdownImpl() {
- running = false;
- processor.stop();
- CHECK_APR_SUCCESS(apr_socket_close(socket));
-}
-
-
-}}
diff --git a/cpp/src/qpid/sys/apr/APRSocket.cpp b/cpp/src/qpid/sys/apr/APRSocket.cpp
deleted file mode 100644
index 3086bafc7c..0000000000
--- a/cpp/src/qpid/sys/apr/APRSocket.cpp
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include "APRBase.h"
-#include "APRSocket.h"
-#include "qpid/log/Statement.h"
-#include <assert.h>
-#include <iostream>
-
-using namespace qpid::sys;
-using namespace qpid::framing;
-
-APRSocket::APRSocket(apr_socket_t* _socket) : socket(_socket), closed(false){
-
-}
-
-void APRSocket::read(qpid::framing::Buffer& buffer){
- apr_size_t bytes;
- bytes = buffer.available();
- apr_status_t s = apr_socket_recv(socket, buffer.start(), &bytes);
- buffer.move(bytes);
- if(APR_STATUS_IS_TIMEUP(s)){
- //timed out
- }else if(APR_STATUS_IS_EOF(s)){
- close();
- }
-}
-
-void APRSocket::write(qpid::framing::Buffer& buffer){
- apr_size_t bytes;
- do{
- bytes = buffer.available();
- apr_socket_send(socket, buffer.start(), &bytes);
- buffer.move(bytes);
- }while(bytes > 0);
-}
-
-void APRSocket::close(){
- if(!closed){
- QPID_LOG(warning, "Closing socket " << socket << "@" << this);
- CHECK_APR_SUCCESS(apr_socket_close(socket));
- closed = true;
- }
-}
-
-bool APRSocket::isOpen() const {
- return !closed;
-}
-
-uint8_t APRSocket::read(){
- char data[1];
- apr_size_t bytes = 1;
- apr_status_t s = apr_socket_recv(socket, data, &bytes);
- if(APR_STATUS_IS_EOF(s) || bytes == 0){
- return 0;
- }else{
- return *data;
- }
-}
-
-APRSocket::~APRSocket(){
-}
diff --git a/cpp/src/qpid/sys/apr/APRSocket.h b/cpp/src/qpid/sys/apr/APRSocket.h
deleted file mode 100644
index 30a3be2746..0000000000
--- a/cpp/src/qpid/sys/apr/APRSocket.h
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#ifndef _APRSocket_
-#define _APRSocket_
-
-#include <apr_network_io.h>
-#include "qpid/framing/Buffer.h"
-
-namespace qpid {
-namespace sys {
-
- class APRSocket
- {
- apr_socket_t* const socket;
- volatile bool closed;
- public:
- APRSocket(apr_socket_t* socket);
- void read(qpid::framing::Buffer& b);
- void write(qpid::framing::Buffer& b);
- void close();
- bool isOpen() const;
- uint8_t read();
- ~APRSocket();
- };
-
-}
-}
-
-
-#endif
diff --git a/cpp/src/qpid/sys/apr/LFProcessor.cpp b/cpp/src/qpid/sys/apr/LFProcessor.cpp
deleted file mode 100644
index 69c92cc2c4..0000000000
--- a/cpp/src/qpid/sys/apr/LFProcessor.cpp
+++ /dev/null
@@ -1,181 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include <sstream>
-#include "qpid/log/Statement.h"
-#include "qpid/QpidError.h"
-#include "qpid/sys/Mutex.h"
-#include "LFProcessor.h"
-#include "APRBase.h"
-#include "LFSessionContext.h"
-
-using namespace qpid::sys;
-using qpid::QpidError;
-
-// TODO aconway 2006-10-12: stopped is read outside locks.
-//
-
-LFProcessor::LFProcessor(apr_pool_t* pool, int _workers, int _size, int _timeout) :
- size(_size),
- timeout(_timeout),
- signalledCount(0),
- current(0),
- count(0),
- workerCount(_workers),
- hasLeader(false),
- workers(new Thread[_workers]),
- stopped(false)
-{
-
- CHECK_APR_SUCCESS(apr_pollset_create(&pollset, size, pool, APR_POLLSET_THREADSAFE));
-}
-
-
-LFProcessor::~LFProcessor(){
- if (!stopped) stop();
- delete[] workers;
- CHECK_APR_SUCCESS(apr_pollset_destroy(pollset));
-}
-
-void LFProcessor::start(){
- for(int i = 0; i < workerCount; i++){
- workers[i] = Thread(this);
- }
-}
-
-void LFProcessor::add(const apr_pollfd_t* const fd){
- CHECK_APR_SUCCESS(apr_pollset_add(pollset, fd));
- Monitor::ScopedLock l(countLock);
- sessions.push_back(reinterpret_cast<LFSessionContext*>(fd->client_data));
- count++;
-}
-
-void LFProcessor::remove(const apr_pollfd_t* const fd){
- CHECK_APR_SUCCESS(apr_pollset_remove(pollset, fd));
- Monitor::ScopedLock l(countLock);
- sessions.erase(find(sessions.begin(), sessions.end(), reinterpret_cast<LFSessionContext*>(fd->client_data)));
- count--;
-}
-
-void LFProcessor::reactivate(const apr_pollfd_t* const fd){
- CHECK_APR_SUCCESS(apr_pollset_add(pollset, fd));
-}
-
-void LFProcessor::deactivate(const apr_pollfd_t* const fd){
- CHECK_APR_SUCCESS(apr_pollset_remove(pollset, fd));
-}
-
-void LFProcessor::update(const apr_pollfd_t* const fd){
- CHECK_APR_SUCCESS(apr_pollset_remove(pollset, fd));
- CHECK_APR_SUCCESS(apr_pollset_add(pollset, fd));
-}
-
-bool LFProcessor::full(){
- Mutex::ScopedLock locker(countLock);
- return count == size;
-}
-
-bool LFProcessor::empty(){
- Mutex::ScopedLock locker(countLock);
- return count == 0;
-}
-
-void LFProcessor::poll() {
- apr_status_t status = APR_EGENERAL;
- do{
- current = 0;
- if(!stopped){
- status = apr_pollset_poll(pollset, timeout, &signalledCount, &signalledFDs);
- }
- }while(status != APR_SUCCESS && !stopped);
-}
-
-void LFProcessor::run(){
- try{
- while(!stopped){
- const apr_pollfd_t* event = 0;
- LFSessionContext* session = 0;
- {
- Monitor::ScopedLock l(leadLock);
- waitToLead();
- event = getNextEvent();
- if(!event) return;
- session = reinterpret_cast<LFSessionContext*>(
- event->client_data);
- session->startProcessing();
- relinquishLead();
- }
-
- //process event:
- if(event->rtnevents & APR_POLLIN) session->read();
- if(event->rtnevents & APR_POLLOUT) session->write();
-
- if(session->isClosed()){
- session->handleClose();
- Monitor::ScopedLock l(countLock);
- sessions.erase(find(sessions.begin(),sessions.end(), session));
- count--;
- }else{
- session->stopProcessing();
- }
- }
- }catch(const std::exception& e){
- QPID_LOG(error, e.what());
- }
-}
-
-void LFProcessor::waitToLead(){
- while(hasLeader && !stopped) leadLock.wait();
- hasLeader = !stopped;
-}
-
-void LFProcessor::relinquishLead(){
- hasLeader = false;
- leadLock.notify();
-}
-
-const apr_pollfd_t* LFProcessor::getNextEvent(){
- while(true){
- if(stopped){
- return 0;
- }else if(current < signalledCount){
- //use result of previous poll if one is available
- return signalledFDs + (current++);
- }else{
- //else poll to get new events
- poll();
- }
- }
-}
-
-void LFProcessor::stop(){
- stopped = true;
- {
- Monitor::ScopedLock l(leadLock);
- leadLock.notifyAll();
- }
- for(int i = 0; i < workerCount; i++){
- workers[i].join();
- }
- for(iterator i = sessions.begin(); i < sessions.end(); i++){
- (*i)->shutdown();
- }
-}
-
diff --git a/cpp/src/qpid/sys/apr/LFProcessor.h b/cpp/src/qpid/sys/apr/LFProcessor.h
deleted file mode 100644
index 38d0b2787f..0000000000
--- a/cpp/src/qpid/sys/apr/LFProcessor.h
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#ifndef _LFProcessor_
-#define _LFProcessor_
-
-#include <apr_poll.h>
-#include <iostream>
-#include <vector>
-#include "qpid/sys/Monitor.h"
-#include "qpid/sys/Runnable.h"
-#include "qpid/sys/Thread.h"
-
-namespace qpid {
-namespace sys {
-
- class LFSessionContext;
-
- /**
- * This class processes a poll set using the leaders-followers
- * pattern for thread synchronization: the leader will poll and on
- * the poll returning, it will remove a session, promote a
- * follower to leadership, then process the session.
- */
- class LFProcessor : private virtual qpid::sys::Runnable
- {
- typedef std::vector<LFSessionContext*>::iterator iterator;
-
- const int size;
- const apr_interval_time_t timeout;
- apr_pollset_t* pollset;
- int signalledCount;
- int current;
- const apr_pollfd_t* signalledFDs;
- int count;
- const int workerCount;
- bool hasLeader;
- qpid::sys::Thread* workers;
- qpid::sys::Monitor leadLock;
- qpid::sys::Mutex countLock;
- std::vector<LFSessionContext*> sessions;
- volatile bool stopped;
-
- const apr_pollfd_t* getNextEvent();
- void waitToLead();
- void relinquishLead();
- void poll();
- virtual void run();
-
- public:
- LFProcessor(apr_pool_t* pool, int workers, int size, int timeout);
- /**
- * Add the fd to the poll set. Relies on the client_data being
- * an instance of LFSessionContext.
- */
- void add(const apr_pollfd_t* const fd);
- /**
- * Remove the fd from the poll set.
- */
- void remove(const apr_pollfd_t* const fd);
- /**
- * Signal that the fd passed in, already part of the pollset,
- * has had its flags altered.
- */
- void update(const apr_pollfd_t* const fd);
- /**
- * Add an fd back to the poll set after deactivation.
- */
- void reactivate(const apr_pollfd_t* const fd);
- /**
- * Temporarily remove the fd from the poll set. Called when processing
- * is about to begin.
- */
- void deactivate(const apr_pollfd_t* const fd);
- /**
- * Indicates whether the capacity of this processor has been
- * reached (or whether it can still handle further fd's).
- */
- bool full();
- /**
- * Indicates whether there are any fd's registered.
- */
- bool empty();
- /**
- * Stop processing.
- */
- void stop();
- /**
- * Start processing.
- */
- void start();
- /**
- * Is processing stopped?
- */
- bool isStopped();
-
- ~LFProcessor();
- };
-
-}
-}
-
-
-#endif
diff --git a/cpp/src/qpid/sys/apr/LFSessionContext.cpp b/cpp/src/qpid/sys/apr/LFSessionContext.cpp
deleted file mode 100644
index 4e708fd747..0000000000
--- a/cpp/src/qpid/sys/apr/LFSessionContext.cpp
+++ /dev/null
@@ -1,170 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include "LFSessionContext.h"
-#include "APRBase.h"
-#include "qpid/QpidError.h"
-#include "qpid/log/Statement.h"
-#include <assert.h>
-
-using namespace qpid::sys;
-using namespace qpid::sys;
-using namespace qpid::framing;
-
-LFSessionContext::LFSessionContext(apr_pool_t* _pool, apr_socket_t* _socket,
- LFProcessor* const _processor,
- bool _debug) :
- debug(_debug),
- socket(_socket),
- initiated(false),
- in(65536),
- out(65536),
- processor(_processor),
- processing(false),
- closing(false)
-{
-
- fd.p = _pool;
- fd.desc_type = APR_POLL_SOCKET;
- fd.reqevents = APR_POLLIN;
- fd.client_data = this;
- fd.desc.s = _socket;
-
- out.flip();
-}
-
-LFSessionContext::~LFSessionContext(){
-
-}
-
-void LFSessionContext::read(){
- socket.read(in);
- in.flip();
- if(initiated){
- AMQFrame frame;
- try{
- while(frame.decode(in)){
- QPID_LOG(debug, "RECV: " << frame);
- handler->received(frame);
- }
- }catch(const std::exception& e){
- QPID_LOG(error, e.what());
- }
- }else{
- ProtocolInitiation protocolInit;
- if(protocolInit.decode(in)){
- handler->initiated(protocolInit);
- initiated = true;
- QPID_LOG(debug, "INIT [" << &socket << "]");
- }
- }
- in.compact();
-}
-
-void LFSessionContext::write(){
- bool done = isClosed();
- while(!done){
- if(out.available() > 0){
- socket.write(out);
- if(out.available() > 0){
-
- //incomplete write, leave flags to receive notification of readiness to write
- done = true;//finished processing for now, but write is still in progress
- }
- }else{
- //do we have any frames to write?
- Mutex::ScopedLock l(writeLock);
- if(!framesToWrite.empty()){
- out.clear();
- bool encoded(false);
- while(!framesToWrite.empty() && out.available() >= framesToWrite.front().size()){
- AMQFrame& frame = framesToWrite.front();
- encoded = true;
- frame.encode(out);
- QPID_LOG(debug, "SENT: " << frame);
- framesToWrite.pop();
- }
- if(!encoded) THROW_QPID_ERROR(FRAMING_ERROR, "Could not write frame, too large for buffer.");
- out.flip();
- }else{
- //reset flags, don't care about writability anymore
- fd.reqevents = APR_POLLIN;
- done = true;
-
- if(closing){
- socket.close();
- }
- }
- }
- }
-}
-
-void LFSessionContext::send(AMQFrame& frame){
- Mutex::ScopedLock l(writeLock);
- if(!closing){
- framesToWrite.push(frame);
- if(!(fd.reqevents & APR_POLLOUT)){
- fd.reqevents |= APR_POLLOUT;
- if(!processing){
- processor->update(&fd);
- }
- }
- }
-}
-
-void LFSessionContext::startProcessing(){
- Mutex::ScopedLock l(writeLock);
- processing = true;
- processor->deactivate(&fd);
-}
-
-void LFSessionContext::stopProcessing(){
- Mutex::ScopedLock l(writeLock);
- processor->reactivate(&fd);
- processing = false;
-}
-
-void LFSessionContext::close(){
- Mutex::ScopedLock l(writeLock);
- closing = true;
- if(!processing){
- //allow pending frames to be written to socket
- fd.reqevents = APR_POLLOUT;
- processor->update(&fd);
- }
-}
-
-void LFSessionContext::handleClose(){
- handler->closed();
- QPID_LOG(info, "Session closed [" << &socket << "]");
- delete handler;
- delete this;
-}
-
-void LFSessionContext::shutdown(){
- socket.close();
- handleClose();
-}
-
-void LFSessionContext::init(ConnectionInputHandler* _handler){
- handler = _handler;
- processor->add(&fd);
-}
-
diff --git a/cpp/src/qpid/sys/apr/LFSessionContext.h b/cpp/src/qpid/sys/apr/LFSessionContext.h
deleted file mode 100644
index 0ff80eccec..0000000000
--- a/cpp/src/qpid/sys/apr/LFSessionContext.h
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#ifndef _LFSessionContext_
-#define _LFSessionContext_
-
-#include <queue>
-
-#include <apr_network_io.h>
-#include <apr_poll.h>
-#include <apr_time.h>
-
-#include "qpid/framing/AMQFrame.h"
-#include "qpid/framing/FrameHandler.h"
-#include "qpid/framing/Buffer.h"
-#include "qpid/sys/Monitor.h"
-#include "qpid/sys/Mutex.h"
-#include "qpid/sys/ConnectionOutputHandler.h"
-#include "qpid/sys/ConnectionInputHandler.h"
-
-#include "APRSocket.h"
-#include "LFProcessor.h"
-
-namespace qpid {
-namespace sys {
-
-
-class LFSessionContext : public virtual qpid::sys::ConnectionOutputHandler
-{
- const bool debug;
- APRSocket socket;
- bool initiated;
-
- qpid::framing::Buffer in;
- qpid::framing::Buffer out;
-
- qpid::sys::ConnectionInputHandler* handler;
- LFProcessor* const processor;
-
- apr_pollfd_t fd;
-
- std::queue<qpid::framing::AMQFrame> framesToWrite;
- qpid::sys::Mutex writeLock;
-
- bool processing;
- bool closing;
-
- public:
- LFSessionContext(apr_pool_t* pool, apr_socket_t* socket,
- LFProcessor* const processor,
- bool debug = false);
- virtual ~LFSessionContext();
- virtual void send(framing::AMQFrame& frame);
- virtual void close();
- void read();
- void write();
- void init(qpid::sys::ConnectionInputHandler* handler);
- void startProcessing();
- void stopProcessing();
- void handleClose();
- void shutdown();
- inline apr_pollfd_t* const getFd(){ return &fd; }
- inline bool isClosed(){ return !socket.isOpen(); }
-};
-
-}
-}
-
-
-#endif