summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/io
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2006-10-16 13:50:26 +0000
committerAlan Conway <aconway@apache.org>2006-10-16 13:50:26 +0000
commit8a6ab3aa61d441b9210c05c84dc9998acfc38737 (patch)
tree1eb9d7f39b5c2d04a85a1f66caef3d398567b740 /cpp/src/qpid/io
parent9a808fb13aba243d41bbdab75158dae5939a80a4 (diff)
downloadqpid-python-8a6ab3aa61d441b9210c05c84dc9998acfc38737.tar.gz
Build system reorg, see README and Makefile comments for details.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@464494 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/io')
-rw-r--r--cpp/src/qpid/io/APRConnector.cpp201
-rw-r--r--cpp/src/qpid/io/APRConnector.h95
-rw-r--r--cpp/src/qpid/io/APRSocket.cpp78
-rw-r--r--cpp/src/qpid/io/APRSocket.h45
-rw-r--r--cpp/src/qpid/io/Acceptor.cpp21
-rw-r--r--cpp/src/qpid/io/Acceptor.h53
-rw-r--r--cpp/src/qpid/io/BlockingAPRAcceptor.cpp101
-rw-r--r--cpp/src/qpid/io/BlockingAPRAcceptor.h65
-rw-r--r--cpp/src/qpid/io/BlockingAPRSessionContext.cpp178
-rw-r--r--cpp/src/qpid/io/BlockingAPRSessionContext.h94
-rw-r--r--cpp/src/qpid/io/Connector.h56
-rw-r--r--cpp/src/qpid/io/ConnectorImpl.h53
-rw-r--r--cpp/src/qpid/io/LConnector.h48
-rw-r--r--cpp/src/qpid/io/LFAcceptor.cpp94
-rw-r--r--cpp/src/qpid/io/LFAcceptor.h74
-rw-r--r--cpp/src/qpid/io/LFProcessor.cpp193
-rw-r--r--cpp/src/qpid/io/LFProcessor.h119
-rw-r--r--cpp/src/qpid/io/LFSessionContext.cpp189
-rw-r--r--cpp/src/qpid/io/LFSessionContext.h88
-rw-r--r--cpp/src/qpid/io/SessionContext.h37
-rw-r--r--cpp/src/qpid/io/SessionHandler.h42
-rw-r--r--cpp/src/qpid/io/SessionHandlerFactory.h38
-rw-r--r--cpp/src/qpid/io/SessionManager.h40
-rw-r--r--cpp/src/qpid/io/ShutdownHandler.h34
-rw-r--r--cpp/src/qpid/io/TimeoutHandler.h36
25 files changed, 2072 insertions, 0 deletions
diff --git a/cpp/src/qpid/io/APRConnector.cpp b/cpp/src/qpid/io/APRConnector.cpp
new file mode 100644
index 0000000000..91cf01c842
--- /dev/null
+++ b/cpp/src/qpid/io/APRConnector.cpp
@@ -0,0 +1,201 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include <iostream>
+#include "qpid/concurrent/APRBase.h"
+#include "qpid/io/APRConnector.h"
+#include "qpid/concurrent/APRThreadFactory.h"
+#include "qpid/QpidError.h"
+
+using namespace qpid::io;
+using namespace qpid::concurrent;
+using namespace qpid::framing;
+using qpid::QpidError;
+
+APRConnector::APRConnector(bool _debug, u_int32_t buffer_size) :
+ debug(_debug),
+ receive_buffer_size(buffer_size),
+ send_buffer_size(buffer_size),
+ closed(true),
+ lastIn(0), lastOut(0),
+ timeout(0),
+ idleIn(0), idleOut(0),
+ timeoutHandler(0),
+ shutdownHandler(0),
+ inbuf(receive_buffer_size),
+ outbuf(send_buffer_size){
+
+ APRBase::increment();
+
+ CHECK_APR_SUCCESS(apr_pool_create(&pool, NULL));
+ CHECK_APR_SUCCESS(apr_socket_create(&socket, APR_INET, SOCK_STREAM, APR_PROTO_TCP, pool));
+
+ threadFactory = new APRThreadFactory();
+ writeLock = new APRMonitor();
+}
+
+APRConnector::~APRConnector(){
+ delete receiver;
+ delete writeLock;
+ delete threadFactory;
+ apr_pool_destroy(pool);
+
+ APRBase::decrement();
+}
+
+void APRConnector::connect(const std::string& host, int port){
+ apr_sockaddr_t* address;
+ CHECK_APR_SUCCESS(apr_sockaddr_info_get(&address, host.c_str(), APR_UNSPEC, port, APR_IPV4_ADDR_OK, pool));
+ CHECK_APR_SUCCESS(apr_socket_connect(socket, address));
+ closed = false;
+
+ receiver = threadFactory->create(this);
+ receiver->start();
+}
+
+void APRConnector::init(ProtocolInitiation* header){
+ writeBlock(header);
+ delete header;
+}
+
+void APRConnector::close(){
+ closed = true;
+ CHECK_APR_SUCCESS(apr_socket_close(socket));
+ receiver->join();
+}
+
+void APRConnector::setInputHandler(InputHandler* handler){
+ input = handler;
+}
+
+void APRConnector::setShutdownHandler(ShutdownHandler* handler){
+ shutdownHandler = handler;
+}
+
+OutputHandler* APRConnector::getOutputHandler(){
+ return this;
+}
+
+void APRConnector::send(AMQFrame* frame){
+ writeBlock(frame);
+ if(debug) std::cout << "SENT: " << *frame << std::endl;
+ delete frame;
+}
+
+void APRConnector::writeBlock(AMQDataBlock* data){
+ writeLock->acquire();
+ data->encode(outbuf);
+
+ //transfer data to wire
+ outbuf.flip();
+ writeToSocket(outbuf.start(), outbuf.available());
+ outbuf.clear();
+ writeLock->release();
+}
+
+void APRConnector::writeToSocket(char* data, size_t available){
+ apr_size_t bytes(available);
+ apr_size_t written(0);
+ while(written < available && !closed){
+ apr_status_t status = apr_socket_send(socket, data + written, &bytes);
+ if(status == APR_TIMEUP){
+ std::cout << "Write request timed out." << std::endl;
+ }
+ if(bytes == 0){
+ std::cout << "Write request wrote 0 bytes." << std::endl;
+ }
+ lastOut = apr_time_as_msec(apr_time_now());
+ written += bytes;
+ bytes = available - written;
+ }
+}
+
+void APRConnector::checkIdle(apr_status_t status){
+ if(timeoutHandler){
+ apr_time_t now = apr_time_as_msec(apr_time_now());
+ if(APR_STATUS_IS_TIMEUP(status)){
+ if(idleIn && (now - lastIn > idleIn)){
+ timeoutHandler->idleIn();
+ }
+ }else if(APR_STATUS_IS_EOF(status)){
+ closed = true;
+ CHECK_APR_SUCCESS(apr_socket_close(socket));
+ if(shutdownHandler) shutdownHandler->shutdown();
+ }else{
+ lastIn = now;
+ }
+ if(idleOut && (now - lastOut > idleOut)){
+ timeoutHandler->idleOut();
+ }
+ }
+}
+
+void APRConnector::setReadTimeout(u_int16_t t){
+ idleIn = t * 1000;//t is in secs
+ if(idleIn && (!timeout || idleIn < timeout)){
+ timeout = idleIn;
+ setSocketTimeout();
+ }
+
+}
+
+void APRConnector::setWriteTimeout(u_int16_t t){
+ idleOut = t * 1000;//t is in secs
+ if(idleOut && (!timeout || idleOut < timeout)){
+ timeout = idleOut;
+ setSocketTimeout();
+ }
+}
+
+void APRConnector::setSocketTimeout(){
+ //interval is in microseconds, timeout in milliseconds
+ //want the interval to be a bit shorter than the timeout, hence multiply
+ //by 800 rather than 1000.
+ apr_interval_time_t interval(timeout * 800);
+ apr_socket_timeout_set(socket, interval);
+}
+
+void APRConnector::setTimeoutHandler(TimeoutHandler* handler){
+ timeoutHandler = handler;
+}
+
+void APRConnector::run(){
+ try{
+ while(!closed){
+ apr_size_t bytes(inbuf.available());
+ if(bytes < 1){
+ THROW_QPID_ERROR(INTERNAL_ERROR, "Frame exceeds buffer size.");
+ }
+ checkIdle(apr_socket_recv(socket, inbuf.start(), &bytes));
+
+ if(bytes > 0){
+ inbuf.move(bytes);
+ inbuf.flip();//position = 0, limit = total data read
+
+ AMQFrame frame;
+ while(frame.decode(inbuf)){
+ if(debug) std::cout << "RECV: " << frame << std::endl;
+ input->received(&frame);
+ }
+ //need to compact buffer to preserve any 'extra' data
+ inbuf.compact();
+ }
+ }
+ }catch(QpidError error){
+ std::cout << "Error [" << error.code << "] " << error.msg << " (" << error.file << ":" << error.line << ")" << std::endl;
+ }
+}
diff --git a/cpp/src/qpid/io/APRConnector.h b/cpp/src/qpid/io/APRConnector.h
new file mode 100644
index 0000000000..e097fc27eb
--- /dev/null
+++ b/cpp/src/qpid/io/APRConnector.h
@@ -0,0 +1,95 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _APRConnector_
+#define _APRConnector_
+
+#include "apr_network_io.h"
+#include "apr_time.h"
+
+#include "qpid/framing/InputHandler.h"
+#include "qpid/framing/OutputHandler.h"
+#include "qpid/framing/InitiationHandler.h"
+#include "qpid/framing/ProtocolInitiation.h"
+#include "qpid/io/ShutdownHandler.h"
+#include "qpid/concurrent/Thread.h"
+#include "qpid/concurrent/ThreadFactory.h"
+#include "qpid/io/Connector.h"
+#include "qpid/concurrent/APRMonitor.h"
+
+namespace qpid {
+namespace io {
+
+ class APRConnector : public virtual qpid::framing::OutputHandler,
+ public virtual Connector,
+ private virtual qpid::concurrent::Runnable
+ {
+ const bool debug;
+ const int receive_buffer_size;
+ const int send_buffer_size;
+
+ bool closed;
+
+ apr_time_t lastIn;
+ apr_time_t lastOut;
+ apr_interval_time_t timeout;
+ u_int32_t idleIn;
+ u_int32_t idleOut;
+
+ TimeoutHandler* timeoutHandler;
+ ShutdownHandler* shutdownHandler;
+ qpid::framing::InputHandler* input;
+ qpid::framing::InitiationHandler* initialiser;
+ qpid::framing::OutputHandler* output;
+
+ qpid::framing::Buffer inbuf;
+ qpid::framing::Buffer outbuf;
+
+ qpid::concurrent::APRMonitor* writeLock;
+ qpid::concurrent::ThreadFactory* threadFactory;
+ qpid::concurrent::Thread* receiver;
+
+ apr_pool_t* pool;
+ apr_socket_t* socket;
+
+ void checkIdle(apr_status_t status);
+ void writeBlock(qpid::framing::AMQDataBlock* data);
+ void writeToSocket(char* data, size_t available);
+ void setSocketTimeout();
+
+ void run();
+
+ public:
+ APRConnector(bool debug = false, u_int32_t buffer_size = 1024);
+ virtual ~APRConnector();
+ virtual void connect(const std::string& host, int port);
+ virtual void init(qpid::framing::ProtocolInitiation* header);
+ virtual void close();
+ virtual void setInputHandler(qpid::framing::InputHandler* handler);
+ virtual void setTimeoutHandler(TimeoutHandler* handler);
+ virtual void setShutdownHandler(ShutdownHandler* handler);
+ virtual qpid::framing::OutputHandler* getOutputHandler();
+ virtual void send(qpid::framing::AMQFrame* frame);
+ virtual void setReadTimeout(u_int16_t timeout);
+ virtual void setWriteTimeout(u_int16_t timeout);
+ };
+
+}
+}
+
+
+#endif
diff --git a/cpp/src/qpid/io/APRSocket.cpp b/cpp/src/qpid/io/APRSocket.cpp
new file mode 100644
index 0000000000..c142b04df3
--- /dev/null
+++ b/cpp/src/qpid/io/APRSocket.cpp
@@ -0,0 +1,78 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "qpid/concurrent/APRBase.h"
+#include "qpid/io/APRSocket.h"
+#include <assert.h>
+#include <iostream>
+
+using namespace qpid::io;
+using namespace qpid::framing;
+using namespace qpid::concurrent;
+
+APRSocket::APRSocket(apr_socket_t* _socket) : socket(_socket), closed(false){
+
+}
+
+void APRSocket::read(qpid::framing::Buffer& buffer){
+ apr_size_t bytes;
+ bytes = buffer.available();
+ apr_status_t s = apr_socket_recv(socket, buffer.start(), &bytes);
+ buffer.move(bytes);
+ if(APR_STATUS_IS_TIMEUP(s)){
+ //timed out
+ }else if(APR_STATUS_IS_EOF(s)){
+ close();
+ }
+}
+
+void APRSocket::write(qpid::framing::Buffer& buffer){
+ apr_size_t bytes;
+ do{
+ bytes = buffer.available();
+ apr_status_t s = apr_socket_send(socket, buffer.start(), &bytes);
+ // TODO aconway 2006-10-05: better error handling
+ assert(s == 0);
+ buffer.move(bytes);
+ }while(bytes > 0);
+}
+
+void APRSocket::close(){
+ if(!closed){
+ std::cout << "Closing socket " << socket << "@" << this << std::endl;
+ CHECK_APR_SUCCESS(apr_socket_close(socket));
+ closed = true;
+ }
+}
+
+bool APRSocket::isOpen(){
+ return !closed;
+}
+
+u_int8_t APRSocket::read(){
+ char data[1];
+ apr_size_t bytes = 1;
+ apr_status_t s = apr_socket_recv(socket, data, &bytes);
+ if(APR_STATUS_IS_EOF(s) || bytes == 0){
+ return 0;
+ }else{
+ return *data;
+ }
+}
+
+APRSocket::~APRSocket(){
+}
diff --git a/cpp/src/qpid/io/APRSocket.h b/cpp/src/qpid/io/APRSocket.h
new file mode 100644
index 0000000000..742f958b8a
--- /dev/null
+++ b/cpp/src/qpid/io/APRSocket.h
@@ -0,0 +1,45 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _APRSocket_
+#define _APRSocket_
+
+#include "apr_network_io.h"
+#include "qpid/framing/Buffer.h"
+
+namespace qpid {
+namespace io {
+
+ class APRSocket
+ {
+ apr_socket_t* const socket;
+ volatile bool closed;
+ public:
+ APRSocket(apr_socket_t* socket);
+ void read(qpid::framing::Buffer& b);
+ void write(qpid::framing::Buffer& b);
+ void close();
+ bool isOpen();
+ u_int8_t read();
+ ~APRSocket();
+ };
+
+}
+}
+
+
+#endif
diff --git a/cpp/src/qpid/io/Acceptor.cpp b/cpp/src/qpid/io/Acceptor.cpp
new file mode 100644
index 0000000000..6b76bd4da2
--- /dev/null
+++ b/cpp/src/qpid/io/Acceptor.cpp
@@ -0,0 +1,21 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "qpid/io/Acceptor.h"
+
+qpid::io::Acceptor::~Acceptor() {}
diff --git a/cpp/src/qpid/io/Acceptor.h b/cpp/src/qpid/io/Acceptor.h
new file mode 100644
index 0000000000..a7f7ad66f0
--- /dev/null
+++ b/cpp/src/qpid/io/Acceptor.h
@@ -0,0 +1,53 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _Acceptor_
+#define _Acceptor_
+
+#include "qpid/io/SessionHandlerFactory.h"
+
+namespace qpid {
+namespace io {
+
+ class Acceptor
+ {
+ public:
+ /**
+ * Bind to port.
+ * @param port Port to bind to, 0 to bind to dynamically chosen port.
+ * @return The local bound port.
+ */
+ virtual int16_t bind(int16_t port) = 0;
+
+ /**
+ * Run the acceptor.
+ */
+ virtual void run(SessionHandlerFactory* factory) = 0;
+
+ /**
+ * Shut down the acceptor.
+ */
+ virtual void shutdown() = 0;
+
+ virtual ~Acceptor();
+ };
+
+}
+}
+
+
+#endif
diff --git a/cpp/src/qpid/io/BlockingAPRAcceptor.cpp b/cpp/src/qpid/io/BlockingAPRAcceptor.cpp
new file mode 100644
index 0000000000..0e1fc535a2
--- /dev/null
+++ b/cpp/src/qpid/io/BlockingAPRAcceptor.cpp
@@ -0,0 +1,101 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include <iostream>
+#include "qpid/io/BlockingAPRAcceptor.h"
+#include "qpid/concurrent/APRBase.h"
+#include "qpid/concurrent/APRThreadFactory.h"
+
+using namespace qpid::concurrent;
+using namespace qpid::framing;
+using namespace qpid::io;
+
+BlockingAPRAcceptor::BlockingAPRAcceptor(bool _debug, int c) :
+ debug(_debug),
+ threadFactory(new APRThreadFactory()),
+ connectionBacklog(c)
+{
+ APRBase::increment();
+ CHECK_APR_SUCCESS(apr_pool_create(&apr_pool, NULL));
+}
+
+int16_t BlockingAPRAcceptor::bind(int16_t _port){
+ apr_sockaddr_t* address;
+ CHECK_APR_SUCCESS(apr_sockaddr_info_get(&address, APR_ANYADDR, APR_UNSPEC, _port, APR_IPV4_ADDR_OK, apr_pool));
+ CHECK_APR_SUCCESS(apr_socket_create(&socket, APR_INET, SOCK_STREAM, APR_PROTO_TCP, apr_pool));
+ CHECK_APR_SUCCESS(apr_socket_bind(socket, address));
+ CHECK_APR_SUCCESS(apr_socket_listen(socket, connectionBacklog));
+ return getPort();
+}
+
+int16_t BlockingAPRAcceptor::getPort() const {
+ apr_sockaddr_t* address;
+ CHECK_APR_SUCCESS(apr_socket_addr_get(&address, APR_LOCAL, socket));
+ return address->port;
+}
+
+void BlockingAPRAcceptor::run(SessionHandlerFactory* factory)
+{
+ running = true;
+ std::cout << "Listening on port " << getPort() << "..." << std::endl;
+ while(running){
+ apr_socket_t* client;
+ apr_status_t status = apr_socket_accept(&client, socket, apr_pool);
+ if(status == APR_SUCCESS){
+ //configure socket:
+ CHECK_APR_SUCCESS(apr_socket_timeout_set(client, 1000000/* i.e. 1 sec*/));
+ CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_TCP_NODELAY, 1));
+ CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_SNDBUF, 32768));
+ CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_RCVBUF, 32768));
+
+ BlockingAPRSessionContext* session = new BlockingAPRSessionContext(client, threadFactory, this, debug);
+ session->init(factory->create(session));
+ sessions.push_back(session);
+ }else{
+ running = false;
+ if(status != APR_EINTR){
+ std::cout << "ERROR: " << get_desc(status) << std::endl;
+ }
+ }
+ }
+ shutdown();
+}
+
+void BlockingAPRAcceptor::shutdown()
+{
+ // TODO aconway 2006-10-12: Not thread safe.
+ if (running)
+ {
+ running = false;
+ apr_socket_close(socket); // Don't check, exception safety.
+ for(iterator i = sessions.begin(); i < sessions.end(); i++){
+ (*i)->shutdown();
+ }
+ }
+}
+
+BlockingAPRAcceptor::~BlockingAPRAcceptor(){
+ delete threadFactory;
+ apr_pool_destroy(apr_pool);
+ APRBase::decrement();
+}
+
+
+void BlockingAPRAcceptor::closed(BlockingAPRSessionContext* session){
+ sessions.erase(find(sessions.begin(), sessions.end(), session));
+}
+
diff --git a/cpp/src/qpid/io/BlockingAPRAcceptor.h b/cpp/src/qpid/io/BlockingAPRAcceptor.h
new file mode 100644
index 0000000000..8c83c726c9
--- /dev/null
+++ b/cpp/src/qpid/io/BlockingAPRAcceptor.h
@@ -0,0 +1,65 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _BlockingAPRAcceptor_
+#define _BlockingAPRAcceptor_
+
+#include <vector>
+#include "apr_network_io.h"
+#include "apr_poll.h"
+#include "apr_time.h"
+
+#include "qpid/io/Acceptor.h"
+#include "qpid/concurrent/APRMonitor.h"
+#include "qpid/io/BlockingAPRSessionContext.h"
+#include "qpid/concurrent/Runnable.h"
+#include "qpid/io/SessionContext.h"
+#include "qpid/io/SessionHandlerFactory.h"
+#include "qpid/concurrent/Thread.h"
+#include "qpid/concurrent/ThreadFactory.h"
+#include "qpid/concurrent/ThreadPool.h"
+
+namespace qpid {
+namespace io {
+
+ class BlockingAPRAcceptor : public virtual Acceptor
+ {
+ typedef std::vector<BlockingAPRSessionContext*>::iterator iterator;
+
+ const bool debug;
+ apr_pool_t* apr_pool;
+ qpid::concurrent::ThreadFactory* threadFactory;
+ std::vector<BlockingAPRSessionContext*> sessions;
+ apr_socket_t* socket;
+ const int connectionBacklog;
+ volatile bool running;
+
+ public:
+ BlockingAPRAcceptor(bool debug = false, int connectionBacklog = 10);
+ virtual int16_t bind(int16_t port);
+ virtual int16_t getPort() const;
+ virtual void run(SessionHandlerFactory* factory);
+ virtual void shutdown();
+ virtual ~BlockingAPRAcceptor();
+ void closed(BlockingAPRSessionContext* session);
+ };
+
+}
+}
+
+
+#endif
diff --git a/cpp/src/qpid/io/BlockingAPRSessionContext.cpp b/cpp/src/qpid/io/BlockingAPRSessionContext.cpp
new file mode 100644
index 0000000000..aee223ca3b
--- /dev/null
+++ b/cpp/src/qpid/io/BlockingAPRSessionContext.cpp
@@ -0,0 +1,178 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include <assert.h>
+#include <iostream>
+#include "qpid/io/BlockingAPRSessionContext.h"
+#include "qpid/io/BlockingAPRAcceptor.h"
+#include "qpid/concurrent/APRBase.h"
+#include "qpid/QpidError.h"
+
+using namespace qpid::concurrent;
+using namespace qpid::framing;
+using namespace qpid::io;
+
+
+BlockingAPRSessionContext::BlockingAPRSessionContext(apr_socket_t* _socket,
+ ThreadFactory* factory,
+ BlockingAPRAcceptor* _acceptor,
+ bool _debug)
+ : socket(_socket),
+ debug(_debug),
+ handler(0),
+ acceptor(_acceptor),
+ inbuf(65536),
+ outbuf(65536),
+ closed(false){
+
+ reader = new Reader(this);
+ writer = new Writer(this);
+
+ rThread = factory->create(reader);
+ wThread = factory->create(writer);
+}
+
+BlockingAPRSessionContext::~BlockingAPRSessionContext(){
+ delete reader;
+ delete writer;
+
+ delete rThread;
+ delete wThread;
+
+ delete handler;
+}
+
+void BlockingAPRSessionContext::read(){
+ try{
+ bool initiated(false);
+ while(!closed){
+ apr_size_t bytes(inbuf.available());
+ if(bytes < 1){
+ THROW_QPID_ERROR(INTERNAL_ERROR, "Frame exceeds buffer size.");
+ }
+ apr_status_t s = apr_socket_recv(socket, inbuf.start(), &bytes);
+ if(APR_STATUS_IS_TIMEUP(s)){
+ //timed out, check closed on loop
+ }else if(APR_STATUS_IS_EOF(s) || bytes == 0){
+ closed = true;
+ }else{
+ inbuf.move(bytes);
+ inbuf.flip();
+
+ if(!initiated){
+ ProtocolInitiation* protocolInit = new ProtocolInitiation();
+ if(protocolInit->decode(inbuf)){
+ handler->initiated(protocolInit);
+ if(debug) std::cout << "RECV: [" << &socket << "]: Initialised " << std::endl;
+ initiated = true;
+ }
+ }else{
+ AMQFrame frame;
+ while(frame.decode(inbuf)){
+ if(debug) std::cout << "RECV: [" << &socket << "]:" << frame << std::endl;
+ handler->received(&frame);
+ }
+ }
+ //need to compact buffer to preserve any 'extra' data
+ inbuf.compact();
+ }
+ }
+
+ //close socket
+ }catch(qpid::QpidError error){
+ std::cout << "Error [" << error.code << "] " << error.msg << " (" << error.file << ":" << error.line << ")" << std::endl;
+ }
+}
+
+void BlockingAPRSessionContext::write(){
+ while(!closed){
+ //get next frame
+ outlock.acquire();
+ while(outframes.empty() && !closed){
+ outlock.wait();
+ }
+ if(!closed){
+ AMQFrame* frame = outframes.front();
+ outframes.pop();
+ outlock.release();
+
+ //encode
+ frame->encode(outbuf);
+ if(debug) std::cout << "SENT [" << &socket << "]:" << *frame << std::endl;
+ delete frame;
+ outbuf.flip();
+
+ //write from outbuf to socket
+ char* data = outbuf.start();
+ const int available = outbuf.available();
+ int written = 0;
+ apr_size_t bytes = available;
+ while(available > written){
+ apr_status_t s = apr_socket_send(socket, data + written, &bytes);
+ assert(s == 0); // TODO aconway 2006-10-05: Error Handling.
+ written += bytes;
+ bytes = available - written;
+ }
+ outbuf.clear();
+ }else{
+ outlock.release();
+ }
+ }
+}
+
+void BlockingAPRSessionContext::send(AMQFrame* frame){
+ if(!closed){
+ outlock.acquire();
+ bool was_empty(outframes.empty());
+ outframes.push(frame);
+ if(was_empty){
+ outlock.notify();
+ }
+ outlock.release();
+ }else{
+ std::cout << "WARNING: Session closed[" << &socket << "], dropping frame. " << &frame << std::endl;
+ }
+}
+
+void BlockingAPRSessionContext::init(SessionHandler* _handler){
+ handler = _handler;
+ rThread->start();
+ wThread->start();
+}
+
+void BlockingAPRSessionContext::close(){
+ closed = true;
+ wThread->join();
+ CHECK_APR_SUCCESS(apr_socket_close(socket));
+ if(debug) std::cout << "RECV: [" << &socket << "]: Closed " << std::endl;
+ handler->closed();
+ acceptor->closed(this);
+ delete this;
+}
+
+void BlockingAPRSessionContext::shutdown(){
+ closed = true;
+ outlock.acquire();
+ outlock.notify();
+ outlock.release();
+
+ wThread->join();
+ CHECK_APR_SUCCESS(apr_socket_close(socket));
+ rThread->join();
+ handler->closed();
+ delete this;
+}
diff --git a/cpp/src/qpid/io/BlockingAPRSessionContext.h b/cpp/src/qpid/io/BlockingAPRSessionContext.h
new file mode 100644
index 0000000000..006b73b55c
--- /dev/null
+++ b/cpp/src/qpid/io/BlockingAPRSessionContext.h
@@ -0,0 +1,94 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _BlockingAPRSessionContext_
+#define _BlockingAPRSessionContext_
+
+#include <queue>
+#include <vector>
+
+#include "apr_network_io.h"
+#include "apr_time.h"
+
+#include "qpid/framing/AMQFrame.h"
+#include "qpid/concurrent/APRMonitor.h"
+#include "qpid/framing/Buffer.h"
+#include "qpid/concurrent/Runnable.h"
+#include "qpid/io/SessionContext.h"
+#include "qpid/io/SessionHandler.h"
+#include "qpid/io/SessionHandlerFactory.h"
+#include "qpid/io/ShutdownHandler.h"
+#include "qpid/concurrent/Thread.h"
+#include "qpid/concurrent/ThreadFactory.h"
+
+namespace qpid {
+namespace io {
+
+ class BlockingAPRAcceptor;
+
+ class BlockingAPRSessionContext : public virtual SessionContext
+ {
+ class Reader : public virtual qpid::concurrent::Runnable{
+ BlockingAPRSessionContext* parent;
+ public:
+ inline Reader(BlockingAPRSessionContext* p) : parent(p){}
+ inline virtual void run(){ parent->read(); }
+ inline virtual ~Reader(){}
+ };
+
+ class Writer : public virtual qpid::concurrent::Runnable{
+ BlockingAPRSessionContext* parent;
+ public:
+ inline Writer(BlockingAPRSessionContext* p) : parent(p){}
+ inline virtual void run(){ parent->write(); }
+ inline virtual ~Writer(){}
+ };
+
+ apr_socket_t* socket;
+ const bool debug;
+ SessionHandler* handler;
+ BlockingAPRAcceptor* acceptor;
+ std::queue<qpid::framing::AMQFrame*> outframes;
+ qpid::framing::Buffer inbuf;
+ qpid::framing::Buffer outbuf;
+ qpid::concurrent::APRMonitor outlock;
+ Reader* reader;
+ Writer* writer;
+ qpid::concurrent::Thread* rThread;
+ qpid::concurrent::Thread* wThread;
+
+ volatile bool closed;
+
+ void read();
+ void write();
+ public:
+ BlockingAPRSessionContext(apr_socket_t* socket,
+ qpid::concurrent::ThreadFactory* factory,
+ BlockingAPRAcceptor* acceptor,
+ bool debug = false);
+ ~BlockingAPRSessionContext();
+ virtual void send(qpid::framing::AMQFrame* frame);
+ virtual void close();
+ void shutdown();
+ void init(SessionHandler* handler);
+ };
+
+}
+}
+
+
+#endif
diff --git a/cpp/src/qpid/io/Connector.h b/cpp/src/qpid/io/Connector.h
new file mode 100644
index 0000000000..d0a2f470a8
--- /dev/null
+++ b/cpp/src/qpid/io/Connector.h
@@ -0,0 +1,56 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _Connector_
+#define _Connector_
+
+#include "qpid/framing/InputHandler.h"
+#include "qpid/framing/OutputHandler.h"
+#include "qpid/framing/InitiationHandler.h"
+#include "qpid/framing/ProtocolInitiation.h"
+#include "qpid/io/ShutdownHandler.h"
+#include "qpid/io/TimeoutHandler.h"
+
+namespace qpid {
+namespace io {
+
+ class Connector
+ {
+ public:
+ virtual void connect(const std::string& host, int port) = 0;
+ virtual void init(qpid::framing::ProtocolInitiation* header) = 0;
+ virtual void close() = 0;
+ virtual void setInputHandler(qpid::framing::InputHandler* handler) = 0;
+ virtual void setTimeoutHandler(TimeoutHandler* handler) = 0;
+ virtual void setShutdownHandler(ShutdownHandler* handler) = 0;
+ virtual qpid::framing::OutputHandler* getOutputHandler() = 0;
+ /**
+ * Set the timeout for reads, in secs.
+ */
+ virtual void setReadTimeout(u_int16_t timeout) = 0;
+ /**
+ * Set the timeout for writes, in secs.
+ */
+ virtual void setWriteTimeout(u_int16_t timeout) = 0;
+ virtual ~Connector(){}
+ };
+
+}
+}
+
+
+#endif
diff --git a/cpp/src/qpid/io/ConnectorImpl.h b/cpp/src/qpid/io/ConnectorImpl.h
new file mode 100644
index 0000000000..55dcf7a2d4
--- /dev/null
+++ b/cpp/src/qpid/io/ConnectorImpl.h
@@ -0,0 +1,53 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _APRConnectorImpl_
+#define _APRConnectorImpl_
+
+#ifdef _USE_APR_IO_
+#include "qpid/io/APRConnector.h"
+#else
+#include "qpid/io/LConnector.h"
+#endif
+
+namespace qpid {
+namespace io {
+
+#ifdef _USE_APR_IO_
+ class ConnectorImpl : public virtual APRConnector
+ {
+
+ public:
+ ConnectorImpl(bool _debug = false, u_int32_t buffer_size = 1024):APRConnector(_debug,buffer_size){};
+ virtual ~ConnectorImpl(){};
+ };
+#else
+ class ConnectorImpl : public virtual LConnector
+ {
+
+ public:
+ ConnectorImpl(bool _debug = false, u_int32_t buffer_size = 1024):LConnector(_debug, buffer_size){};
+ virtual ~ConnectorImpl(){};
+ };
+
+#endif
+
+}
+}
+
+
+#endif
diff --git a/cpp/src/qpid/io/LConnector.h b/cpp/src/qpid/io/LConnector.h
new file mode 100644
index 0000000000..5fc86597bd
--- /dev/null
+++ b/cpp/src/qpid/io/LConnector.h
@@ -0,0 +1,48 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _LConnector_
+#define _LConnector_
+
+
+#include "qpid/framing/InputHandler.h"
+#include "qpid/framing/OutputHandler.h"
+#include "qpid/framing/InitiationHandler.h"
+#include "qpid/framing/ProtocolInitiation.h"
+#include "qpid/concurrent/Thread.h"
+#include "qpid/concurrent/ThreadFactory.h"
+#include "qpid/io/Connector.h"
+
+namespace qpid {
+namespace io {
+
+ class LConnector : public virtual qpid::framing::OutputHandler,
+ public virtual Connector,
+ private virtual qpid::concurrent::Runnable
+ {
+
+ public:
+ LConnector(bool debug = false, u_int32_t buffer_size = 1024){};
+ virtual ~LConnector(){};
+
+ };
+
+}
+}
+
+
+#endif
diff --git a/cpp/src/qpid/io/LFAcceptor.cpp b/cpp/src/qpid/io/LFAcceptor.cpp
new file mode 100644
index 0000000000..7e51a550af
--- /dev/null
+++ b/cpp/src/qpid/io/LFAcceptor.cpp
@@ -0,0 +1,94 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "qpid/io/LFAcceptor.h"
+#include "qpid/concurrent/APRBase.h"
+
+using namespace qpid::concurrent;
+using namespace qpid::io;
+
+LFAcceptor::LFAcceptor(bool _debug, int c, int worker_threads, int m) :
+ processor(aprPool.pool, worker_threads, 1000, 5000000),
+ max_connections_per_processor(m),
+ debug(_debug),
+ connectionBacklog(c)
+{ }
+
+
+int16_t LFAcceptor::bind(int16_t _port){
+ apr_sockaddr_t* address;
+ CHECK_APR_SUCCESS(apr_sockaddr_info_get(&address, APR_ANYADDR, APR_UNSPEC, _port, APR_IPV4_ADDR_OK, aprPool.pool));
+ CHECK_APR_SUCCESS(apr_socket_create(&socket, APR_INET, SOCK_STREAM, APR_PROTO_TCP, aprPool.pool));
+ CHECK_APR_SUCCESS(apr_socket_opt_set(socket, APR_SO_REUSEADDR, 1));
+ CHECK_APR_SUCCESS(apr_socket_bind(socket, address));
+ CHECK_APR_SUCCESS(apr_socket_listen(socket, connectionBacklog));
+ return getPort();
+}
+
+int16_t LFAcceptor::getPort() const {
+ apr_sockaddr_t* address;
+ CHECK_APR_SUCCESS(apr_socket_addr_get(&address, APR_LOCAL, socket));
+ return address->port;
+}
+
+void LFAcceptor::run(SessionHandlerFactory* factory) {
+ running = true;
+ processor.start();
+ std::cout << "Listening on port " << getPort() << "..." << std::endl;
+ while(running){
+ apr_socket_t* client;
+ apr_status_t status = apr_socket_accept(&client, socket, aprPool.pool);
+ if(status == APR_SUCCESS){
+ //make this socket non-blocking:
+ CHECK_APR_SUCCESS(apr_socket_timeout_set(client, 0));
+ CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_NONBLOCK, 1));
+ CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_TCP_NODELAY, 1));
+ CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_SNDBUF, 32768));
+ CHECK_APR_SUCCESS(apr_socket_opt_set(client, APR_SO_RCVBUF, 32768));
+ LFSessionContext* session = new LFSessionContext(aprPool.pool, client, &processor, debug);
+ session->init(factory->create(session));
+ }else{
+ running = false;
+ if(status != APR_EINTR){
+ std::cout << "ERROR: " << get_desc(status) << std::endl;
+ }
+ }
+ }
+ shutdown();
+}
+
+void LFAcceptor::shutdown() {
+ // TODO aconway 2006-10-12: Cleanup, this is not thread safe.
+ if (running) {
+ running = false;
+ processor.stop();
+ CHECK_APR_SUCCESS(apr_socket_close(socket));
+ }
+}
+
+
+LFAcceptor::~LFAcceptor(){}
+
+LFAcceptor::APRPool::APRPool(){
+ APRBase::increment();
+ CHECK_APR_SUCCESS(apr_pool_create(&pool, NULL));
+}
+
+LFAcceptor::APRPool::~APRPool(){
+ apr_pool_destroy(pool);
+ APRBase::decrement();
+}
diff --git a/cpp/src/qpid/io/LFAcceptor.h b/cpp/src/qpid/io/LFAcceptor.h
new file mode 100644
index 0000000000..f31b544143
--- /dev/null
+++ b/cpp/src/qpid/io/LFAcceptor.h
@@ -0,0 +1,74 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _LFAcceptor_
+#define _LFAcceptor_
+
+#include <vector>
+#include "apr_network_io.h"
+#include "apr_poll.h"
+#include "apr_time.h"
+
+#include "qpid/io/Acceptor.h"
+#include "qpid/concurrent/APRMonitor.h"
+#include "qpid/concurrent/APRThreadFactory.h"
+#include "qpid/concurrent/APRThreadPool.h"
+#include "qpid/io/LFProcessor.h"
+#include "qpid/io/LFSessionContext.h"
+#include "qpid/concurrent/Runnable.h"
+#include "qpid/io/SessionContext.h"
+#include "qpid/io/SessionHandlerFactory.h"
+#include "qpid/concurrent/Thread.h"
+
+namespace qpid {
+namespace io {
+
+ class LFAcceptor : public virtual Acceptor
+ {
+ class APRPool{
+ public:
+ apr_pool_t* pool;
+ APRPool();
+ ~APRPool();
+ };
+
+ APRPool aprPool;
+ LFProcessor processor;
+ apr_socket_t* socket;
+ const int max_connections_per_processor;
+ const bool debug;
+ const int connectionBacklog;
+
+ volatile bool running;
+
+ public:
+ LFAcceptor(bool debug = false,
+ int connectionBacklog = 10,
+ int worker_threads = 5,
+ int max_connections_per_processor = 500);
+ virtual int16_t bind(int16_t port);
+ virtual int16_t getPort() const;
+ virtual void run(SessionHandlerFactory* factory);
+ virtual void shutdown();
+ virtual ~LFAcceptor();
+ };
+
+}
+}
+
+
+#endif
diff --git a/cpp/src/qpid/io/LFProcessor.cpp b/cpp/src/qpid/io/LFProcessor.cpp
new file mode 100644
index 0000000000..dba5d6c962
--- /dev/null
+++ b/cpp/src/qpid/io/LFProcessor.cpp
@@ -0,0 +1,193 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "qpid/io/LFProcessor.h"
+#include "qpid/concurrent/APRBase.h"
+#include "qpid/io/LFSessionContext.h"
+#include "qpid/QpidError.h"
+#include <sstream>
+
+using namespace qpid::io;
+using namespace qpid::concurrent;
+using qpid::QpidError;
+
+// TODO aconway 2006-10-12: stopped is read outside locks.
+//
+
+LFProcessor::LFProcessor(apr_pool_t* pool, int _workers, int _size, int _timeout) :
+ size(_size),
+ timeout(_timeout),
+ signalledCount(0),
+ current(0),
+ count(0),
+ workerCount(_workers),
+ hasLeader(false),
+ workers(new Thread*[_workers]),
+ stopped(false)
+{
+
+ CHECK_APR_SUCCESS(apr_pollset_create(&pollset, size, pool, APR_POLLSET_THREADSAFE));
+ //create & start the required number of threads
+ for(int i = 0; i < workerCount; i++){
+ workers[i] = factory.create(this);
+ }
+}
+
+
+LFProcessor::~LFProcessor(){
+ if (!stopped) stop();
+ for(int i = 0; i < workerCount; i++){
+ delete workers[i];
+ }
+ delete[] workers;
+ CHECK_APR_SUCCESS(apr_pollset_destroy(pollset));
+}
+
+void LFProcessor::start(){
+ for(int i = 0; i < workerCount; i++){
+ workers[i]->start();
+ }
+}
+
+void LFProcessor::add(const apr_pollfd_t* const fd){
+ CHECK_APR_SUCCESS(apr_pollset_add(pollset, fd));
+ countLock.acquire();
+ sessions.push_back(reinterpret_cast<LFSessionContext*>(fd->client_data));
+ count++;
+ countLock.release();
+}
+
+void LFProcessor::remove(const apr_pollfd_t* const fd){
+ CHECK_APR_SUCCESS(apr_pollset_remove(pollset, fd));
+ countLock.acquire();
+ sessions.erase(find(sessions.begin(), sessions.end(), reinterpret_cast<LFSessionContext*>(fd->client_data)));
+ count--;
+ countLock.release();
+}
+
+void LFProcessor::reactivate(const apr_pollfd_t* const fd){
+ CHECK_APR_SUCCESS(apr_pollset_add(pollset, fd));
+}
+
+void LFProcessor::deactivate(const apr_pollfd_t* const fd){
+ CHECK_APR_SUCCESS(apr_pollset_remove(pollset, fd));
+}
+
+void LFProcessor::update(const apr_pollfd_t* const fd){
+ CHECK_APR_SUCCESS(apr_pollset_remove(pollset, fd));
+ CHECK_APR_SUCCESS(apr_pollset_add(pollset, fd));
+}
+
+bool LFProcessor::full(){
+ Locker locker(countLock);
+ return count == size;
+}
+
+bool LFProcessor::empty(){
+ Locker locker(countLock);
+ return count == 0;
+}
+
+void LFProcessor::poll(){
+ apr_status_t status;
+ do{
+ current = 0;
+ if(!stopped){
+ status = apr_pollset_poll(pollset, timeout, &signalledCount, &signalledFDs);
+ }
+ }while(status != APR_SUCCESS && !stopped);
+}
+
+void LFProcessor::run(){
+ try{
+ while(!stopped){
+ leadLock.acquire();
+ waitToLead();
+ if(!stopped){
+ const apr_pollfd_t* evt = getNextEvent();
+ if(evt){
+ LFSessionContext* session = reinterpret_cast<LFSessionContext*>(evt->client_data);
+ session->startProcessing();
+
+ relinquishLead();
+ leadLock.release();
+
+ //process event:
+ if(evt->rtnevents & APR_POLLIN) session->read();
+ if(evt->rtnevents & APR_POLLOUT) session->write();
+
+ if(session->isClosed()){
+ session->handleClose();
+ countLock.acquire();
+ sessions.erase(find(sessions.begin(), sessions.end(), session));
+ count--;
+ countLock.release();
+ }else{
+ session->stopProcessing();
+ }
+
+ }else{
+ leadLock.release();
+ }
+ }else{
+ leadLock.release();
+ }
+ }
+ }catch(QpidError error){
+ std::cout << "Error [" << error.code << "] " << error.msg << " (" << error.file << ":" << error.line << ")" << std::endl;
+ }
+}
+
+void LFProcessor::waitToLead(){
+ while(hasLeader && !stopped) leadLock.wait();
+ hasLeader = !stopped;
+}
+
+void LFProcessor::relinquishLead(){
+ hasLeader = false;
+ leadLock.notify();
+}
+
+const apr_pollfd_t* LFProcessor::getNextEvent(){
+ while(true){
+ if(stopped){
+ return 0;
+ }else if(current < signalledCount){
+ //use result of previous poll if one is available
+ return signalledFDs + (current++);
+ }else{
+ //else poll to get new events
+ poll();
+ }
+ }
+}
+
+void LFProcessor::stop(){
+ stopped = true;
+ leadLock.acquire();
+ leadLock.notifyAll();
+ leadLock.release();
+
+ for(int i = 0; i < workerCount; i++){
+ workers[i]->join();
+ }
+
+ for(iterator i = sessions.begin(); i < sessions.end(); i++){
+ (*i)->shutdown();
+ }
+}
+
diff --git a/cpp/src/qpid/io/LFProcessor.h b/cpp/src/qpid/io/LFProcessor.h
new file mode 100644
index 0000000000..e3f533d597
--- /dev/null
+++ b/cpp/src/qpid/io/LFProcessor.h
@@ -0,0 +1,119 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _LFProcessor_
+#define _LFProcessor_
+
+#include "apr_poll.h"
+#include <iostream>
+#include <vector>
+#include "qpid/concurrent/APRMonitor.h"
+#include "qpid/concurrent/APRThreadFactory.h"
+#include "qpid/concurrent/Runnable.h"
+
+namespace qpid {
+namespace io {
+
+ class LFSessionContext;
+
+ /**
+ * This class processes a poll set using the leaders-followers
+ * pattern for thread synchronization: the leader will poll and on
+ * the poll returning, it will remove a session, promote a
+ * follower to leadership, then process the session.
+ */
+ class LFProcessor : private virtual qpid::concurrent::Runnable
+ {
+ typedef std::vector<LFSessionContext*>::iterator iterator;
+
+ const int size;
+ const apr_interval_time_t timeout;
+ apr_pollset_t* pollset;
+ int signalledCount;
+ int current;
+ const apr_pollfd_t* signalledFDs;
+ int count;
+ const int workerCount;
+ bool hasLeader;
+ qpid::concurrent::Thread** const workers;
+ qpid::concurrent::APRMonitor leadLock;
+ qpid::concurrent::APRMonitor countLock;
+ qpid::concurrent::APRThreadFactory factory;
+ std::vector<LFSessionContext*> sessions;
+ volatile bool stopped;
+
+ const apr_pollfd_t* getNextEvent();
+ void waitToLead();
+ void relinquishLead();
+ void poll();
+ virtual void run();
+
+ public:
+ LFProcessor(apr_pool_t* pool, int workers, int size, int timeout);
+ /**
+ * Add the fd to the poll set. Relies on the client_data being
+ * an instance of LFSessionContext.
+ */
+ void add(const apr_pollfd_t* const fd);
+ /**
+ * Remove the fd from the poll set.
+ */
+ void remove(const apr_pollfd_t* const fd);
+ /**
+ * Signal that the fd passed in, already part of the pollset,
+ * has had its flags altered.
+ */
+ void update(const apr_pollfd_t* const fd);
+ /**
+ * Add an fd back to the poll set after deactivation.
+ */
+ void reactivate(const apr_pollfd_t* const fd);
+ /**
+ * Temporarily remove the fd from the poll set. Called when processing
+ * is about to begin.
+ */
+ void deactivate(const apr_pollfd_t* const fd);
+ /**
+ * Indicates whether the capacity of this processor has been
+ * reached (or whether it can still handle further fd's).
+ */
+ bool full();
+ /**
+ * Indicates whether there are any fd's registered.
+ */
+ bool empty();
+ /**
+ * Stop processing.
+ */
+ void stop();
+ /**
+ * Start processing.
+ */
+ void start();
+ /**
+ * Is processing stopped?
+ */
+ bool isStopped();
+
+ ~LFProcessor();
+ };
+
+}
+}
+
+
+#endif
diff --git a/cpp/src/qpid/io/LFSessionContext.cpp b/cpp/src/qpid/io/LFSessionContext.cpp
new file mode 100644
index 0000000000..6d6d786841
--- /dev/null
+++ b/cpp/src/qpid/io/LFSessionContext.cpp
@@ -0,0 +1,189 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "qpid/io/LFSessionContext.h"
+#include "qpid/concurrent/APRBase.h"
+#include "qpid/QpidError.h"
+#include <assert.h>
+
+using namespace qpid::concurrent;
+using namespace qpid::io;
+using namespace qpid::framing;
+
+LFSessionContext::LFSessionContext(apr_pool_t* _pool, apr_socket_t* _socket,
+ LFProcessor* const _processor,
+ bool _debug) :
+ debug(_debug),
+ socket(_socket),
+ initiated(false),
+ in(32768),
+ out(32768),
+ processor(_processor),
+ processing(false),
+ closing(false),
+ reading(0),
+ writing(0)
+{
+
+ fd.p = _pool;
+ fd.desc_type = APR_POLL_SOCKET;
+ fd.reqevents = APR_POLLIN;
+ fd.client_data = this;
+ fd.desc.s = _socket;
+
+ out.flip();
+}
+
+LFSessionContext::~LFSessionContext(){
+
+}
+
+void LFSessionContext::read(){
+ assert(!reading); // No concurrent read.
+ reading = APRThread::currentThread();
+
+ socket.read(in);
+ in.flip();
+ if(initiated){
+ AMQFrame frame;
+ while(frame.decode(in)){
+ if(debug) log("RECV", &frame);
+ handler->received(&frame);
+ }
+ }else{
+ ProtocolInitiation protocolInit;
+ if(protocolInit.decode(in)){
+ handler->initiated(&protocolInit);
+ initiated = true;
+ if(debug) std::cout << "INIT [" << &socket << "]" << std::endl;
+ }
+ }
+ in.compact();
+
+ reading = 0;
+}
+
+void LFSessionContext::write(){
+ assert(!writing); // No concurrent writes.
+ writing = APRThread::currentThread();
+
+ bool done = isClosed();
+ while(!done){
+ if(out.available() > 0){
+ socket.write(out);
+ if(out.available() > 0){
+ writing = 0;
+
+ //incomplete write, leave flags to receive notification of readiness to write
+ done = true;//finished processing for now, but write is still in progress
+ }
+ }else{
+ //do we have any frames to write?
+ writeLock.acquire();
+ if(!framesToWrite.empty()){
+ out.clear();
+ bool encoded(false);
+ AMQFrame* frame = framesToWrite.front();
+ while(frame && out.available() >= frame->size()){
+ encoded = true;
+ frame->encode(out);
+ if(debug) log("SENT", frame);
+ delete frame;
+ framesToWrite.pop();
+ frame = framesToWrite.empty() ? 0 : framesToWrite.front();
+ }
+ if(!encoded) THROW_QPID_ERROR(FRAMING_ERROR, "Could not write frame, too large for buffer.");
+ out.flip();
+ }else{
+ //reset flags, don't care about writability anymore
+ fd.reqevents = APR_POLLIN;
+ done = true;
+
+ writing = 0;
+
+ if(closing){
+ socket.close();
+ }
+ }
+ writeLock.release();
+ }
+ }
+}
+
+void LFSessionContext::send(AMQFrame* frame){
+ writeLock.acquire();
+ if(!closing){
+ framesToWrite.push(frame);
+ if(!(fd.reqevents & APR_POLLOUT)){
+ fd.reqevents |= APR_POLLOUT;
+ if(!processing){
+ processor->update(&fd);
+ }
+ }
+ }
+ writeLock.release();
+}
+
+void LFSessionContext::startProcessing(){
+ writeLock.acquire();
+ processing = true;
+ processor->deactivate(&fd);
+ writeLock.release();
+}
+
+void LFSessionContext::stopProcessing(){
+ writeLock.acquire();
+ processor->reactivate(&fd);
+ processing = false;
+ writeLock.release();
+}
+
+void LFSessionContext::close(){
+ closing = true;
+ writeLock.acquire();
+ if(!processing){
+ //allow pending frames to be written to socket
+ fd.reqevents = APR_POLLOUT;
+ processor->update(&fd);
+ }
+ writeLock.release();
+}
+
+void LFSessionContext::handleClose(){
+ handler->closed();
+ std::cout << "Session closed [" << &socket << "]" << std::endl;
+ delete handler;
+ delete this;
+}
+
+void LFSessionContext::shutdown(){
+ socket.close();
+ handleClose();
+}
+
+void LFSessionContext::init(SessionHandler* _handler){
+ handler = _handler;
+ processor->add(&fd);
+}
+
+void LFSessionContext::log(const std::string& desc, AMQFrame* const frame){
+ logLock.acquire();
+ std::cout << desc << " [" << &socket << "]: " << *frame << std::endl;
+ logLock.release();
+}
+
+APRMonitor LFSessionContext::logLock;
diff --git a/cpp/src/qpid/io/LFSessionContext.h b/cpp/src/qpid/io/LFSessionContext.h
new file mode 100644
index 0000000000..fad8736796
--- /dev/null
+++ b/cpp/src/qpid/io/LFSessionContext.h
@@ -0,0 +1,88 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _LFSessionContext_
+#define _LFSessionContext_
+
+#include <queue>
+
+#include "apr_network_io.h"
+#include "apr_poll.h"
+#include "apr_time.h"
+
+#include "qpid/framing/AMQFrame.h"
+#include "qpid/concurrent/APRMonitor.h"
+#include "qpid/io/APRSocket.h"
+#include "qpid/framing/Buffer.h"
+#include "qpid/io/LFProcessor.h"
+#include "qpid/io/SessionContext.h"
+#include "qpid/io/SessionHandler.h"
+
+namespace qpid {
+namespace io {
+
+
+ class LFSessionContext : public virtual SessionContext
+ {
+ const bool debug;
+ APRSocket socket;
+ bool initiated;
+
+ qpid::framing::Buffer in;
+ qpid::framing::Buffer out;
+
+ SessionHandler* handler;
+ LFProcessor* const processor;
+
+ apr_pollfd_t fd;
+
+ std::queue<qpid::framing::AMQFrame*> framesToWrite;
+ qpid::concurrent::APRMonitor writeLock;
+
+ bool processing;
+ bool closing;
+
+ //these are just for debug, as a crude way of detecting concurrent access
+ volatile unsigned int reading;
+ volatile unsigned int writing;
+
+ static qpid::concurrent::APRMonitor logLock;
+ void log(const std::string& desc, qpid::framing::AMQFrame* const frame);
+
+ public:
+ LFSessionContext(apr_pool_t* pool, apr_socket_t* socket,
+ LFProcessor* const processor,
+ bool debug = false);
+ ~LFSessionContext();
+ virtual void send(qpid::framing::AMQFrame* frame);
+ virtual void close();
+ void read();
+ void write();
+ void init(SessionHandler* handler);
+ void startProcessing();
+ void stopProcessing();
+ void handleClose();
+ void shutdown();
+ inline apr_pollfd_t* const getFd(){ return &fd; }
+ inline bool isClosed(){ return !socket.isOpen(); }
+ };
+
+}
+}
+
+
+#endif
diff --git a/cpp/src/qpid/io/SessionContext.h b/cpp/src/qpid/io/SessionContext.h
new file mode 100644
index 0000000000..b8fa8de62e
--- /dev/null
+++ b/cpp/src/qpid/io/SessionContext.h
@@ -0,0 +1,37 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _SessionContext_
+#define _SessionContext_
+
+#include "qpid/framing/OutputHandler.h"
+
+namespace qpid {
+namespace io {
+
+ class SessionContext : public virtual qpid::framing::OutputHandler
+ {
+ public:
+ virtual void close() = 0;
+ virtual ~SessionContext(){}
+ };
+
+}
+}
+
+
+#endif
diff --git a/cpp/src/qpid/io/SessionHandler.h b/cpp/src/qpid/io/SessionHandler.h
new file mode 100644
index 0000000000..5b4e60213b
--- /dev/null
+++ b/cpp/src/qpid/io/SessionHandler.h
@@ -0,0 +1,42 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _SessionHandler_
+#define _SessionHandler_
+
+#include "qpid/framing/InputHandler.h"
+#include "qpid/framing/InitiationHandler.h"
+#include "qpid/framing/ProtocolInitiation.h"
+#include "qpid/io/TimeoutHandler.h"
+
+namespace qpid {
+namespace io {
+
+ class SessionHandler : public virtual qpid::framing::InitiationHandler,
+ public virtual qpid::framing::InputHandler,
+ public virtual TimeoutHandler
+ {
+ public:
+ virtual void closed() = 0;
+ virtual ~SessionHandler(){}
+ };
+
+}
+}
+
+
+#endif
diff --git a/cpp/src/qpid/io/SessionHandlerFactory.h b/cpp/src/qpid/io/SessionHandlerFactory.h
new file mode 100644
index 0000000000..8ed2dffe57
--- /dev/null
+++ b/cpp/src/qpid/io/SessionHandlerFactory.h
@@ -0,0 +1,38 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _SessionHandlerFactory_
+#define _SessionHandlerFactory_
+
+#include "qpid/io/SessionContext.h"
+#include "qpid/io/SessionHandler.h"
+
+namespace qpid {
+namespace io {
+
+ class SessionHandlerFactory
+ {
+ public:
+ virtual SessionHandler* create(SessionContext* ctxt) = 0;
+ virtual ~SessionHandlerFactory(){}
+ };
+
+}
+}
+
+
+#endif
diff --git a/cpp/src/qpid/io/SessionManager.h b/cpp/src/qpid/io/SessionManager.h
new file mode 100644
index 0000000000..e6b17451e4
--- /dev/null
+++ b/cpp/src/qpid/io/SessionManager.h
@@ -0,0 +1,40 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _SessionManager_
+#define _SessionManager_
+
+#include "qpid/io/SessionContext.h"
+#include "qpid/io/SessionHandler.h"
+
+namespace qpid {
+namespace io {
+
+ class SessionManager
+ {
+ public:
+ virtual SessionHandler* init(SessionContext* ctxt) = 0;
+ virtual void close(SessionContext* ctxt) = 0;
+ virtual void updateInterest(SessionContext* ctxt, bool read, bool write) = 0;
+ virtual ~SessionManager(){}
+ };
+
+}
+}
+
+
+#endif
diff --git a/cpp/src/qpid/io/ShutdownHandler.h b/cpp/src/qpid/io/ShutdownHandler.h
new file mode 100644
index 0000000000..186d9eeca4
--- /dev/null
+++ b/cpp/src/qpid/io/ShutdownHandler.h
@@ -0,0 +1,34 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _ShutdownHandler_
+#define _ShutdownHandler_
+
+namespace qpid {
+namespace io {
+
+ class ShutdownHandler
+ {
+ public:
+ virtual void shutdown() = 0;
+ virtual ~ShutdownHandler(){}
+ };
+
+}
+}
+
+#endif
diff --git a/cpp/src/qpid/io/TimeoutHandler.h b/cpp/src/qpid/io/TimeoutHandler.h
new file mode 100644
index 0000000000..c92220fd6e
--- /dev/null
+++ b/cpp/src/qpid/io/TimeoutHandler.h
@@ -0,0 +1,36 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#ifndef _TimeoutHandler_
+#define _TimeoutHandler_
+
+namespace qpid {
+namespace io {
+
+ class TimeoutHandler
+ {
+ public:
+ virtual void idleOut() = 0;
+ virtual void idleIn() = 0;
+ virtual ~TimeoutHandler(){}
+ };
+
+}
+}
+
+
+#endif