summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/io/Acceptor.cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2006-10-31 19:53:55 +0000
committerAlan Conway <aconway@apache.org>2006-10-31 19:53:55 +0000
commit9094d2b10ecadd66fa3b22169183e7573cc79629 (patch)
treebf3915f72be2a5f09932b800d2fa4309fb3ad64e /cpp/src/qpid/io/Acceptor.cpp
parent0487ea40bc6568765cdec75a36273eeb26fae854 (diff)
downloadqpid-python-9094d2b10ecadd66fa3b22169183e7573cc79629.tar.gz
IO refactor phase 1. Reduced dependencies, removed redundant classes.
Renamed pricipal APR classes in preparation for move to apr namespace. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@469625 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/io/Acceptor.cpp')
-rw-r--r--cpp/src/qpid/io/Acceptor.cpp61
1 files changed, 59 insertions, 2 deletions
diff --git a/cpp/src/qpid/io/Acceptor.cpp b/cpp/src/qpid/io/Acceptor.cpp
index 6b76bd4da2..f95d9448cf 100644
--- a/cpp/src/qpid/io/Acceptor.cpp
+++ b/cpp/src/qpid/io/Acceptor.cpp
@@ -15,7 +15,64 @@
* limitations under the License.
*
*/
-
#include "qpid/io/Acceptor.h"
+#include "qpid/concurrent/APRBase.h"
+#include "APRPool.h"
+
+using namespace qpid::concurrent;
+using namespace qpid::io;
+
+Acceptor::Acceptor(int16_t port_, int backlog, int threads) :
+ port(port_),
+ processor(APRPool::get(), threads, 1000, 5000000)
+{
+ apr_sockaddr_t* address;
+ CHECK_APR_SUCCESS(apr_sockaddr_info_get(&address, APR_ANYADDR, APR_UNSPEC, port, APR_IPV4_ADDR_OK, APRPool::get()));
+ CHECK_APR_SUCCESS(apr_socket_create(&socket, APR_INET, SOCK_STREAM, APR_PROTO_TCP, APRPool::get()));
+ CHECK_APR_SUCCESS(apr_socket_opt_set(socket, APR_SO_REUSEADDR, 1));
+ CHECK_APR_SUCCESS(apr_socket_bind(socket, address));
+ CHECK_APR_SUCCESS(apr_socket_listen(socket, backlog));
+}
+
+int16_t Acceptor::getPort() const {
+ apr_sockaddr_t* address;
+ CHECK_APR_SUCCESS(apr_socket_addr_get(&address, APR_LOCAL, socket));
+ return address->port;
+}
+
+void Acceptor::run(SessionHandlerFactory* factory) {
+ running = true;
+ processor.start();
+ std::cout << "Listening on port " << getPort() << "..." << std::endl;
+ while(running){
+ apr_socket_t* client;
+ apr_status_t status = apr_socket_accept(&client, socket, APRPool::get());
+ if(status == APR_SUCCESS){
+ //make this socket non-blocking:
+ CHECK_APR_SUCCESS(apr_socket_timeout_set(client, 0));
+ CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_NONBLOCK, 1));
+ CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_TCP_NODELAY, 1));
+ CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_SNDBUF, 32768));
+ CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_RCVBUF, 32768));
+ LFSessionContext* session = new LFSessionContext(APRPool::get(), client, &processor, false);
+ session->init(factory->create(session));
+ }else{
+ running = false;
+ if(status != APR_EINTR){
+ std::cout << "ERROR: " << get_desc(status) << std::endl;
+ }
+ }
+ }
+ shutdown();
+}
+
+void Acceptor::shutdown() {
+ // TODO aconway 2006-10-12: Cleanup, this is not thread safe.
+ if (running) {
+ running = false;
+ processor.stop();
+ CHECK_APR_SUCCESS(apr_socket_close(socket));
+ }
+}
+
-qpid::io::Acceptor::~Acceptor() {}