diff options
| author | Andrew Stitcher <astitcher@apache.org> | 2007-07-30 13:25:31 +0000 |
|---|---|---|
| committer | Andrew Stitcher <astitcher@apache.org> | 2007-07-30 13:25:31 +0000 |
| commit | f5bcdf79d18710205afb5ce95ab20686e9acea63 (patch) | |
| tree | fc143ce68ddd8509da07f7a9192cecb6f6da18c1 /cpp | |
| parent | 04a5eca2d8ff39952f8b76a647572756f277b17c (diff) | |
| download | qpid-python-f5bcdf79d18710205afb5ce95ab20686e9acea63.tar.gz | |
r797@fuschia: andrew | 2007-07-30 14:25:02 +0100
* Removed all the leader-follower acceptor code (APR based acceptor)
* Removed the --enable/disable-apr-netio option to configure
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@560973 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
| -rw-r--r-- | cpp/README | 4 | ||||
| -rw-r--r-- | cpp/configure.ac | 23 | ||||
| -rw-r--r-- | cpp/src/Makefile.am | 50 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/apr/APRAcceptor.cpp | 127 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/apr/APRSocket.cpp | 79 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/apr/APRSocket.h | 48 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/apr/LFProcessor.cpp | 181 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/apr/LFProcessor.h | 121 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/apr/LFSessionContext.cpp | 170 | ||||
| -rw-r--r-- | cpp/src/qpid/sys/apr/LFSessionContext.h | 87 |
10 files changed, 18 insertions, 872 deletions
diff --git a/cpp/README b/cpp/README index a6c25dcd0f..b50d00ed7c 100644 --- a/cpp/README +++ b/cpp/README @@ -33,9 +33,11 @@ The following libraries and header files must be installed to build a source distribution: * boost <http://www.boost.org> (1.33.1) * uuid <http://e2fsprogs.sourceforge.net/> (1.39) - * apr <http://apr.apache.org> (1.2.7) * pkgconfig <http://pkgconfig.freedesktop.org/wiki/> (0.21) +Building on a platform other than Linux currently requires: + * apr <http://apr.apache.org> (1.2.7) + Optional cluster functionality requires: * openais <http://openais.org/> (0.80.3) diff --git a/cpp/configure.ac b/cpp/configure.ac index d9ec8cce21..20c6426188 100644 --- a/cpp/configure.ac +++ b/cpp/configure.ac @@ -90,17 +90,6 @@ CPPUNIT_CXXFLAGS=$CPPUNIT_CFLAGS AC_SUBST(CPPUNIT_LIBS) AC_SUBST(CPPUNIT_CXXFLAGS) -AC_ARG_ENABLE([apr-netio], - [AS_HELP_STRING([--enable-apr-netio], - [use the Apache Portable Runtime library for network IO (default yes)])], - [case $enableval in - yes|no) enable_APR_NETIO=$enableval;; - *) AC_MSG_ERROR([Invalid value for --enable-apr-netio: $enableval]);; - esac], - [enable_APR_NETIO=yes] -) -AM_CONDITIONAL([USE_APR_NETIO], [test x$enable_APR_NETIO = xyes]) - AC_ARG_ENABLE([apr-platform], [AS_HELP_STRING([--enable-apr-platform], [use the Apache Portable Runtime library for platform (default no)])], @@ -117,16 +106,10 @@ AC_SUBST(APR_MINIMUM_VERSION) AC_SUBST(APR_CXXFLAGS) AC_SUBST(USE_APR_PLATFORM) -if test "$enable_APR_NETIO" = yes -o "$enable_APR_PLATFORM" = yes; then +if test "$enable_APR_PLATFORM" = yes; then PKG_CHECK_MODULES([APR], [apr-1 >= $APR_MINIMUM_VERSION]) - APR_CXXFLAGS="$APR_CFLAGS" - if test "$enable_APR_NETIO" = yes; then - USE_APR_NETIO=1 - fi - if test "$enable_APR_PLATFORM" = yes; then - APR_CXXFLAGS+=" -DUSE_APR_PLATFORM=1" - USE_APR_PLATFORM=1 - fi + APR_CXXFLAGS="$APR_CFLAGS -DUSE_APR_PLATFORM=1" + USE_APR_PLATFORM=1 fi AC_ARG_ENABLE([valgrind], diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am index 7950299f64..b455a4558b 100644 --- a/cpp/src/Makefile.am +++ b/cpp/src/Makefile.am @@ -33,40 +33,22 @@ qpidd_LDADD = \ sbin_PROGRAMS = qpidd qpidd_SOURCES = qpidd.cpp -apr_netio_src = \ - qpid/sys/apr/APRAcceptor.cpp \ +apr_plat_src = \ qpid/sys/apr/APRBase.cpp \ qpid/sys/apr/APRPool.cpp \ - qpid/sys/apr/APRSocket.cpp \ - qpid/sys/apr/LFProcessor.cpp \ - qpid/sys/apr/LFSessionContext.cpp - -apr_netio_hdr = \ - qpid/sys/apr/APRBase.h \ - qpid/sys/apr/APRPool.h \ - qpid/sys/apr/APRSocket.h \ - qpid/sys/apr/LFProcessor.h \ - qpid/sys/apr/LFSessionContext.h - -apr_plat_src = \ qpid/sys/apr/Socket.cpp \ qpid/sys/apr/Time.cpp \ qpid/sys/apr/Thread.cpp \ qpid/sys/apr/Shlib.cpp - apr_plat_hdr = \ + qpid/sys/apr/APRBase.h \ + qpid/sys/apr/APRPool.h \ qpid/sys/apr/Condition.h \ qpid/sys/apr/Mutex.h \ qpid/sys/apr/Thread.h -posix_netio_src = \ - qpid/sys/AsynchIOAcceptor.cpp - -posix_netio_hdr = - posix_plat_src = \ - qpid/sys/Dispatcher.cpp \ qpid/sys/epoll/EpollPoller.cpp \ qpid/sys/posix/check.cpp \ qpid/sys/posix/Socket.cpp \ @@ -82,24 +64,14 @@ posix_plat_hdr = \ qpid/sys/posix/Mutex.h \ qpid/sys/posix/Thread.h -if USE_APR_NETIO - platform_dist=$(posix_netio_src) $(posix_netio_hdr) - platform_src = $(apr_netio_src) - platform_hdr = $(apr_netio_hdr) -else - platform_dist=$(apr_netio_src) $(apr_netio_hdr) - platform_src = $(posix_netio_src) - platform_hdr = $(posix_netio_hdr) -endif - if USE_APR_PLATFORM - platform_dist+=$(posix_plat_src) $(posix_plat_hdr) - platform_src += $(apr_plat_src) - platform_hdr += $(apr_plat_hdr) + platform_dist=$(posix_plat_src) $(posix_plat_hdr) + platform_src = $(apr_plat_src) + platform_hdr = $(apr_plat_hdr) else - platform_dist+=$(apr_plat_src) $(apr_plat_hdr) - platform_src += $(posix_plat_src) - platform_hdr += $(posix_plat_hdr) + platform_dist=$(apr_plat_src) $(apr_plat_hdr) + platform_src = $(posix_plat_src) + platform_hdr = $(posix_plat_hdr) endif lib_LTLIBRARIES = libqpidcommon.la libqpidbroker.la libqpidclient.la @@ -166,8 +138,10 @@ libqpidcommon_la_SOURCES = \ qpid/Url.h \ qpid/Url.cpp \ qpid/QpidError.cpp \ - qpid/sys/Serializer.cpp \ + qpid/sys/AsynchIOAcceptor.cpp \ + qpid/sys/Dispatcher.cpp \ qpid/sys/Runnable.cpp \ + qpid/sys/Serializer.cpp \ qpid/sys/Shlib.h \ qpid/sys/Shlib.cpp \ qpid/Options.cpp \ diff --git a/cpp/src/qpid/sys/apr/APRAcceptor.cpp b/cpp/src/qpid/sys/apr/APRAcceptor.cpp deleted file mode 100644 index b353b698ef..0000000000 --- a/cpp/src/qpid/sys/apr/APRAcceptor.cpp +++ /dev/null @@ -1,127 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "qpid/log/Statement.h" -#include "qpid/sys/Acceptor.h" -#include "qpid/sys/ConnectionInputHandlerFactory.h" -#include "qpid/sys/Mutex.h" -#include "LFProcessor.h" -#include "LFSessionContext.h" -#include "APRBase.h" -#include "APRPool.h" - -namespace qpid { -namespace sys { - -class APRAcceptor : public Acceptor -{ - public: - APRAcceptor(int16_t port, int backlog, int threads, bool trace); - virtual uint16_t getPort() const; - virtual std::string getHost() const; - virtual void run(qpid::sys::ConnectionInputHandlerFactory* factory); - virtual void shutdown(); - - private: - void shutdownImpl(); - - private: - int16_t port; - bool trace; - LFProcessor processor; - apr_socket_t* socket; - volatile bool running; - Mutex shutdownLock; -}; - -// Define generic Acceptor::create() to return APRAcceptor. -Acceptor::shared_ptr Acceptor::create(int16_t port, int backlog, int threads, bool trace) -{ - return Acceptor::shared_ptr(new APRAcceptor(port, backlog, threads, trace)); -} - -APRAcceptor::APRAcceptor(int16_t port_, int backlog, int threads, bool trace_) : - port(port_), - trace(trace_), - processor(APRPool::get(), threads, 1000, 5000000), - running(false) -{ - apr_sockaddr_t* address; - CHECK_APR_SUCCESS(apr_sockaddr_info_get(&address, APR_ANYADDR, APR_UNSPEC, port, APR_IPV4_ADDR_OK, APRPool::get())); - CHECK_APR_SUCCESS(apr_socket_create(&socket, APR_INET, SOCK_STREAM, APR_PROTO_TCP, APRPool::get())); - CHECK_APR_SUCCESS(apr_socket_opt_set(socket, APR_SO_REUSEADDR, 1)); - CHECK_APR_SUCCESS(apr_socket_bind(socket, address)); - CHECK_APR_SUCCESS(apr_socket_listen(socket, backlog)); -} - -std::string APRAcceptor::getHost() const { - apr_sockaddr_t* address; - CHECK_APR_SUCCESS(apr_socket_addr_get(&address, APR_LOCAL, socket)); - return address->hostname; -} - -uint16_t APRAcceptor::getPort() const { - apr_sockaddr_t* address; - CHECK_APR_SUCCESS(apr_socket_addr_get(&address, APR_LOCAL, socket)); - return address->port; -} - -void APRAcceptor::run(ConnectionInputHandlerFactory* factory) { - running = true; - processor.start(); - QPID_LOG(info, "Listening on port " << getPort()); - while(running) { - apr_socket_t* client; - apr_status_t status = apr_socket_accept(&client, socket, APRPool::get()); - if(status == APR_SUCCESS){ - //make this socket non-blocking: - CHECK_APR_SUCCESS(apr_socket_timeout_set(client, 0)); - CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_NONBLOCK, 1)); - CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_TCP_NODELAY, 1)); - CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_SNDBUF, 32768)); - CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_RCVBUF, 32768)); - LFSessionContext* session = new LFSessionContext(APRPool::get(), client, &processor, trace); - session->init(factory->create(session)); - }else{ - Mutex::ScopedLock locker(shutdownLock); - if(running) { - if(status != APR_EINTR){ - QPID_LOG(error, get_desc(status)); - } - shutdownImpl(); - } - } - } -} - -void APRAcceptor::shutdown() { - Mutex::ScopedLock locker(shutdownLock); - if (running) - shutdownImpl(); -} - -void APRAcceptor::shutdownImpl() { - running = false; - processor.stop(); - CHECK_APR_SUCCESS(apr_socket_close(socket)); -} - - -}} diff --git a/cpp/src/qpid/sys/apr/APRSocket.cpp b/cpp/src/qpid/sys/apr/APRSocket.cpp deleted file mode 100644 index 3086bafc7c..0000000000 --- a/cpp/src/qpid/sys/apr/APRSocket.cpp +++ /dev/null @@ -1,79 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "APRBase.h" -#include "APRSocket.h" -#include "qpid/log/Statement.h" -#include <assert.h> -#include <iostream> - -using namespace qpid::sys; -using namespace qpid::framing; - -APRSocket::APRSocket(apr_socket_t* _socket) : socket(_socket), closed(false){ - -} - -void APRSocket::read(qpid::framing::Buffer& buffer){ - apr_size_t bytes; - bytes = buffer.available(); - apr_status_t s = apr_socket_recv(socket, buffer.start(), &bytes); - buffer.move(bytes); - if(APR_STATUS_IS_TIMEUP(s)){ - //timed out - }else if(APR_STATUS_IS_EOF(s)){ - close(); - } -} - -void APRSocket::write(qpid::framing::Buffer& buffer){ - apr_size_t bytes; - do{ - bytes = buffer.available(); - apr_socket_send(socket, buffer.start(), &bytes); - buffer.move(bytes); - }while(bytes > 0); -} - -void APRSocket::close(){ - if(!closed){ - QPID_LOG(warning, "Closing socket " << socket << "@" << this); - CHECK_APR_SUCCESS(apr_socket_close(socket)); - closed = true; - } -} - -bool APRSocket::isOpen() const { - return !closed; -} - -uint8_t APRSocket::read(){ - char data[1]; - apr_size_t bytes = 1; - apr_status_t s = apr_socket_recv(socket, data, &bytes); - if(APR_STATUS_IS_EOF(s) || bytes == 0){ - return 0; - }else{ - return *data; - } -} - -APRSocket::~APRSocket(){ -} diff --git a/cpp/src/qpid/sys/apr/APRSocket.h b/cpp/src/qpid/sys/apr/APRSocket.h deleted file mode 100644 index 30a3be2746..0000000000 --- a/cpp/src/qpid/sys/apr/APRSocket.h +++ /dev/null @@ -1,48 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#ifndef _APRSocket_ -#define _APRSocket_ - -#include <apr_network_io.h> -#include "qpid/framing/Buffer.h" - -namespace qpid { -namespace sys { - - class APRSocket - { - apr_socket_t* const socket; - volatile bool closed; - public: - APRSocket(apr_socket_t* socket); - void read(qpid::framing::Buffer& b); - void write(qpid::framing::Buffer& b); - void close(); - bool isOpen() const; - uint8_t read(); - ~APRSocket(); - }; - -} -} - - -#endif diff --git a/cpp/src/qpid/sys/apr/LFProcessor.cpp b/cpp/src/qpid/sys/apr/LFProcessor.cpp deleted file mode 100644 index 69c92cc2c4..0000000000 --- a/cpp/src/qpid/sys/apr/LFProcessor.cpp +++ /dev/null @@ -1,181 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <sstream> -#include "qpid/log/Statement.h" -#include "qpid/QpidError.h" -#include "qpid/sys/Mutex.h" -#include "LFProcessor.h" -#include "APRBase.h" -#include "LFSessionContext.h" - -using namespace qpid::sys; -using qpid::QpidError; - -// TODO aconway 2006-10-12: stopped is read outside locks. -// - -LFProcessor::LFProcessor(apr_pool_t* pool, int _workers, int _size, int _timeout) : - size(_size), - timeout(_timeout), - signalledCount(0), - current(0), - count(0), - workerCount(_workers), - hasLeader(false), - workers(new Thread[_workers]), - stopped(false) -{ - - CHECK_APR_SUCCESS(apr_pollset_create(&pollset, size, pool, APR_POLLSET_THREADSAFE)); -} - - -LFProcessor::~LFProcessor(){ - if (!stopped) stop(); - delete[] workers; - CHECK_APR_SUCCESS(apr_pollset_destroy(pollset)); -} - -void LFProcessor::start(){ - for(int i = 0; i < workerCount; i++){ - workers[i] = Thread(this); - } -} - -void LFProcessor::add(const apr_pollfd_t* const fd){ - CHECK_APR_SUCCESS(apr_pollset_add(pollset, fd)); - Monitor::ScopedLock l(countLock); - sessions.push_back(reinterpret_cast<LFSessionContext*>(fd->client_data)); - count++; -} - -void LFProcessor::remove(const apr_pollfd_t* const fd){ - CHECK_APR_SUCCESS(apr_pollset_remove(pollset, fd)); - Monitor::ScopedLock l(countLock); - sessions.erase(find(sessions.begin(), sessions.end(), reinterpret_cast<LFSessionContext*>(fd->client_data))); - count--; -} - -void LFProcessor::reactivate(const apr_pollfd_t* const fd){ - CHECK_APR_SUCCESS(apr_pollset_add(pollset, fd)); -} - -void LFProcessor::deactivate(const apr_pollfd_t* const fd){ - CHECK_APR_SUCCESS(apr_pollset_remove(pollset, fd)); -} - -void LFProcessor::update(const apr_pollfd_t* const fd){ - CHECK_APR_SUCCESS(apr_pollset_remove(pollset, fd)); - CHECK_APR_SUCCESS(apr_pollset_add(pollset, fd)); -} - -bool LFProcessor::full(){ - Mutex::ScopedLock locker(countLock); - return count == size; -} - -bool LFProcessor::empty(){ - Mutex::ScopedLock locker(countLock); - return count == 0; -} - -void LFProcessor::poll() { - apr_status_t status = APR_EGENERAL; - do{ - current = 0; - if(!stopped){ - status = apr_pollset_poll(pollset, timeout, &signalledCount, &signalledFDs); - } - }while(status != APR_SUCCESS && !stopped); -} - -void LFProcessor::run(){ - try{ - while(!stopped){ - const apr_pollfd_t* event = 0; - LFSessionContext* session = 0; - { - Monitor::ScopedLock l(leadLock); - waitToLead(); - event = getNextEvent(); - if(!event) return; - session = reinterpret_cast<LFSessionContext*>( - event->client_data); - session->startProcessing(); - relinquishLead(); - } - - //process event: - if(event->rtnevents & APR_POLLIN) session->read(); - if(event->rtnevents & APR_POLLOUT) session->write(); - - if(session->isClosed()){ - session->handleClose(); - Monitor::ScopedLock l(countLock); - sessions.erase(find(sessions.begin(),sessions.end(), session)); - count--; - }else{ - session->stopProcessing(); - } - } - }catch(const std::exception& e){ - QPID_LOG(error, e.what()); - } -} - -void LFProcessor::waitToLead(){ - while(hasLeader && !stopped) leadLock.wait(); - hasLeader = !stopped; -} - -void LFProcessor::relinquishLead(){ - hasLeader = false; - leadLock.notify(); -} - -const apr_pollfd_t* LFProcessor::getNextEvent(){ - while(true){ - if(stopped){ - return 0; - }else if(current < signalledCount){ - //use result of previous poll if one is available - return signalledFDs + (current++); - }else{ - //else poll to get new events - poll(); - } - } -} - -void LFProcessor::stop(){ - stopped = true; - { - Monitor::ScopedLock l(leadLock); - leadLock.notifyAll(); - } - for(int i = 0; i < workerCount; i++){ - workers[i].join(); - } - for(iterator i = sessions.begin(); i < sessions.end(); i++){ - (*i)->shutdown(); - } -} - diff --git a/cpp/src/qpid/sys/apr/LFProcessor.h b/cpp/src/qpid/sys/apr/LFProcessor.h deleted file mode 100644 index 38d0b2787f..0000000000 --- a/cpp/src/qpid/sys/apr/LFProcessor.h +++ /dev/null @@ -1,121 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#ifndef _LFProcessor_ -#define _LFProcessor_ - -#include <apr_poll.h> -#include <iostream> -#include <vector> -#include "qpid/sys/Monitor.h" -#include "qpid/sys/Runnable.h" -#include "qpid/sys/Thread.h" - -namespace qpid { -namespace sys { - - class LFSessionContext; - - /** - * This class processes a poll set using the leaders-followers - * pattern for thread synchronization: the leader will poll and on - * the poll returning, it will remove a session, promote a - * follower to leadership, then process the session. - */ - class LFProcessor : private virtual qpid::sys::Runnable - { - typedef std::vector<LFSessionContext*>::iterator iterator; - - const int size; - const apr_interval_time_t timeout; - apr_pollset_t* pollset; - int signalledCount; - int current; - const apr_pollfd_t* signalledFDs; - int count; - const int workerCount; - bool hasLeader; - qpid::sys::Thread* workers; - qpid::sys::Monitor leadLock; - qpid::sys::Mutex countLock; - std::vector<LFSessionContext*> sessions; - volatile bool stopped; - - const apr_pollfd_t* getNextEvent(); - void waitToLead(); - void relinquishLead(); - void poll(); - virtual void run(); - - public: - LFProcessor(apr_pool_t* pool, int workers, int size, int timeout); - /** - * Add the fd to the poll set. Relies on the client_data being - * an instance of LFSessionContext. - */ - void add(const apr_pollfd_t* const fd); - /** - * Remove the fd from the poll set. - */ - void remove(const apr_pollfd_t* const fd); - /** - * Signal that the fd passed in, already part of the pollset, - * has had its flags altered. - */ - void update(const apr_pollfd_t* const fd); - /** - * Add an fd back to the poll set after deactivation. - */ - void reactivate(const apr_pollfd_t* const fd); - /** - * Temporarily remove the fd from the poll set. Called when processing - * is about to begin. - */ - void deactivate(const apr_pollfd_t* const fd); - /** - * Indicates whether the capacity of this processor has been - * reached (or whether it can still handle further fd's). - */ - bool full(); - /** - * Indicates whether there are any fd's registered. - */ - bool empty(); - /** - * Stop processing. - */ - void stop(); - /** - * Start processing. - */ - void start(); - /** - * Is processing stopped? - */ - bool isStopped(); - - ~LFProcessor(); - }; - -} -} - - -#endif diff --git a/cpp/src/qpid/sys/apr/LFSessionContext.cpp b/cpp/src/qpid/sys/apr/LFSessionContext.cpp deleted file mode 100644 index 4e708fd747..0000000000 --- a/cpp/src/qpid/sys/apr/LFSessionContext.cpp +++ /dev/null @@ -1,170 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "LFSessionContext.h" -#include "APRBase.h" -#include "qpid/QpidError.h" -#include "qpid/log/Statement.h" -#include <assert.h> - -using namespace qpid::sys; -using namespace qpid::sys; -using namespace qpid::framing; - -LFSessionContext::LFSessionContext(apr_pool_t* _pool, apr_socket_t* _socket, - LFProcessor* const _processor, - bool _debug) : - debug(_debug), - socket(_socket), - initiated(false), - in(65536), - out(65536), - processor(_processor), - processing(false), - closing(false) -{ - - fd.p = _pool; - fd.desc_type = APR_POLL_SOCKET; - fd.reqevents = APR_POLLIN; - fd.client_data = this; - fd.desc.s = _socket; - - out.flip(); -} - -LFSessionContext::~LFSessionContext(){ - -} - -void LFSessionContext::read(){ - socket.read(in); - in.flip(); - if(initiated){ - AMQFrame frame; - try{ - while(frame.decode(in)){ - QPID_LOG(debug, "RECV: " << frame); - handler->received(frame); - } - }catch(const std::exception& e){ - QPID_LOG(error, e.what()); - } - }else{ - ProtocolInitiation protocolInit; - if(protocolInit.decode(in)){ - handler->initiated(protocolInit); - initiated = true; - QPID_LOG(debug, "INIT [" << &socket << "]"); - } - } - in.compact(); -} - -void LFSessionContext::write(){ - bool done = isClosed(); - while(!done){ - if(out.available() > 0){ - socket.write(out); - if(out.available() > 0){ - - //incomplete write, leave flags to receive notification of readiness to write - done = true;//finished processing for now, but write is still in progress - } - }else{ - //do we have any frames to write? - Mutex::ScopedLock l(writeLock); - if(!framesToWrite.empty()){ - out.clear(); - bool encoded(false); - while(!framesToWrite.empty() && out.available() >= framesToWrite.front().size()){ - AMQFrame& frame = framesToWrite.front(); - encoded = true; - frame.encode(out); - QPID_LOG(debug, "SENT: " << frame); - framesToWrite.pop(); - } - if(!encoded) THROW_QPID_ERROR(FRAMING_ERROR, "Could not write frame, too large for buffer."); - out.flip(); - }else{ - //reset flags, don't care about writability anymore - fd.reqevents = APR_POLLIN; - done = true; - - if(closing){ - socket.close(); - } - } - } - } -} - -void LFSessionContext::send(AMQFrame& frame){ - Mutex::ScopedLock l(writeLock); - if(!closing){ - framesToWrite.push(frame); - if(!(fd.reqevents & APR_POLLOUT)){ - fd.reqevents |= APR_POLLOUT; - if(!processing){ - processor->update(&fd); - } - } - } -} - -void LFSessionContext::startProcessing(){ - Mutex::ScopedLock l(writeLock); - processing = true; - processor->deactivate(&fd); -} - -void LFSessionContext::stopProcessing(){ - Mutex::ScopedLock l(writeLock); - processor->reactivate(&fd); - processing = false; -} - -void LFSessionContext::close(){ - Mutex::ScopedLock l(writeLock); - closing = true; - if(!processing){ - //allow pending frames to be written to socket - fd.reqevents = APR_POLLOUT; - processor->update(&fd); - } -} - -void LFSessionContext::handleClose(){ - handler->closed(); - QPID_LOG(info, "Session closed [" << &socket << "]"); - delete handler; - delete this; -} - -void LFSessionContext::shutdown(){ - socket.close(); - handleClose(); -} - -void LFSessionContext::init(ConnectionInputHandler* _handler){ - handler = _handler; - processor->add(&fd); -} - diff --git a/cpp/src/qpid/sys/apr/LFSessionContext.h b/cpp/src/qpid/sys/apr/LFSessionContext.h deleted file mode 100644 index 0ff80eccec..0000000000 --- a/cpp/src/qpid/sys/apr/LFSessionContext.h +++ /dev/null @@ -1,87 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#ifndef _LFSessionContext_ -#define _LFSessionContext_ - -#include <queue> - -#include <apr_network_io.h> -#include <apr_poll.h> -#include <apr_time.h> - -#include "qpid/framing/AMQFrame.h" -#include "qpid/framing/FrameHandler.h" -#include "qpid/framing/Buffer.h" -#include "qpid/sys/Monitor.h" -#include "qpid/sys/Mutex.h" -#include "qpid/sys/ConnectionOutputHandler.h" -#include "qpid/sys/ConnectionInputHandler.h" - -#include "APRSocket.h" -#include "LFProcessor.h" - -namespace qpid { -namespace sys { - - -class LFSessionContext : public virtual qpid::sys::ConnectionOutputHandler -{ - const bool debug; - APRSocket socket; - bool initiated; - - qpid::framing::Buffer in; - qpid::framing::Buffer out; - - qpid::sys::ConnectionInputHandler* handler; - LFProcessor* const processor; - - apr_pollfd_t fd; - - std::queue<qpid::framing::AMQFrame> framesToWrite; - qpid::sys::Mutex writeLock; - - bool processing; - bool closing; - - public: - LFSessionContext(apr_pool_t* pool, apr_socket_t* socket, - LFProcessor* const processor, - bool debug = false); - virtual ~LFSessionContext(); - virtual void send(framing::AMQFrame& frame); - virtual void close(); - void read(); - void write(); - void init(qpid::sys::ConnectionInputHandler* handler); - void startProcessing(); - void stopProcessing(); - void handleClose(); - void shutdown(); - inline apr_pollfd_t* const getFd(){ return &fd; } - inline bool isClosed(){ return !socket.isOpen(); } -}; - -} -} - - -#endif |
