summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/sys
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/sys')
-rw-r--r--cpp/src/qpid/sys/Acceptor.h11
-rw-r--r--cpp/src/qpid/sys/AsynchIOAcceptor.cpp200
-rw-r--r--cpp/src/qpid/sys/ConnectionCodec.h80
-rw-r--r--cpp/src/qpid/sys/ConnectionInputHandler.h4
-rw-r--r--cpp/src/qpid/sys/ConnectionOutputHandler.h3
5 files changed, 142 insertions, 156 deletions
diff --git a/cpp/src/qpid/sys/Acceptor.h b/cpp/src/qpid/sys/Acceptor.h
index 5eb1f1a500..1e7827e60c 100644
--- a/cpp/src/qpid/sys/Acceptor.h
+++ b/cpp/src/qpid/sys/Acceptor.h
@@ -24,13 +24,11 @@
#include <stdint.h>
#include "qpid/SharedObject.h"
+#include "ConnectionCodec.h"
namespace qpid {
namespace sys {
-class ConnectionInputHandlerFactory;
-class ConnectionInputHandler;
-
class Acceptor : public qpid::SharedObject<Acceptor>
{
public:
@@ -38,10 +36,9 @@ class Acceptor : public qpid::SharedObject<Acceptor>
virtual ~Acceptor() = 0;
virtual uint16_t getPort() const = 0;
virtual std::string getHost() const = 0;
- virtual void run(ConnectionInputHandlerFactory* factory) = 0;
- virtual ConnectionInputHandler* connect(
- const std::string& host, int16_t port,
- ConnectionInputHandlerFactory* factory) = 0;
+ virtual void run(ConnectionCodec::Factory*) = 0;
+ virtual void connect(
+ const std::string& host, int16_t port, ConnectionCodec::Factory* codec) = 0;
/** Note: this function is async-signal safe */
virtual void shutdown() = 0;
diff --git a/cpp/src/qpid/sys/AsynchIOAcceptor.cpp b/cpp/src/qpid/sys/AsynchIOAcceptor.cpp
index c24205f53e..56d7c6e1f3 100644
--- a/cpp/src/qpid/sys/AsynchIOAcceptor.cpp
+++ b/cpp/src/qpid/sys/AsynchIOAcceptor.cpp
@@ -27,12 +27,8 @@
#include "Thread.h"
#include "qpid/sys/ConnectionOutputHandler.h"
-#include "qpid/sys/ConnectionInputHandler.h"
-#include "qpid/sys/ConnectionInputHandlerFactory.h"
#include "qpid/framing/reply_exceptions.h"
-#include "qpid/framing/AMQDataBlock.h"
-#include "qpid/framing/Buffer.h"
-#include "qpid/framing/AMQFrame.h"
+#include "qpid/framing/ProtocolInitiation.h"
#include "qpid/log/Statement.h"
#include <boost/bind.hpp>
@@ -40,6 +36,7 @@
#include <queue>
#include <vector>
#include <memory>
+#include <ostream>
namespace qpid {
namespace sys {
@@ -53,10 +50,8 @@ class AsynchIOAcceptor : public Acceptor {
public:
AsynchIOAcceptor(int16_t port, int backlog, int threads);
~AsynchIOAcceptor() {}
- void run(ConnectionInputHandlerFactory* factory);
- ConnectionInputHandler* connect(
- const std::string& host, int16_t port,
- ConnectionInputHandlerFactory* factory);
+ void run(ConnectionCodec::Factory*);
+ void connect(const std::string& host, int16_t port, ConnectionCodec::Factory*);
void shutdown();
@@ -64,13 +59,12 @@ class AsynchIOAcceptor : public Acceptor {
std::string getHost() const;
private:
- void accepted(Poller::shared_ptr, const Socket&, ConnectionInputHandlerFactory*);
+ void accepted(Poller::shared_ptr, const Socket&, ConnectionCodec::Factory*);
};
Acceptor::shared_ptr Acceptor::create(int16_t port, int backlog, int threads)
{
- return
- Acceptor::shared_ptr(new AsynchIOAcceptor(port, backlog, threads));
+ return Acceptor::shared_ptr(new AsynchIOAcceptor(port, backlog, threads));
}
AsynchIOAcceptor::AsynchIOAcceptor(int16_t port, int backlog, int threads) :
@@ -88,48 +82,43 @@ struct Buff : public AsynchIO::BufferBase {
{ delete [] bytes;}
};
-class AsynchIOHandler : public qpid::sys::ConnectionOutputHandler {
+class AsynchIOHandler : public OutputControl {
AsynchIO* aio;
- ConnectionInputHandler* inputHandler;
- std::queue<framing::AMQFrame> frameQueue;
- Mutex frameQueueLock;
- bool frameQueueClosed;
- bool isInitiated;
+ ConnectionCodec::Factory* factory;
+ ConnectionCodec* codec;
bool readError;
std::string identifier;
bool isClient;
- void write(const framing::AMQDataBlock&);
+ void write(const framing::ProtocolInitiation&);
public:
AsynchIOHandler() :
- inputHandler(0),
- frameQueueClosed(false),
- isInitiated(false),
+ aio(0),
+ factory(0),
+ codec(0),
readError(false),
isClient(false)
{}
~AsynchIOHandler() {
- if (inputHandler)
- inputHandler->closed();
- delete inputHandler;
+ if (codec)
+ codec->closed();
+ delete codec;
}
void setClient() { isClient = true; }
-
- void init(AsynchIO* a, ConnectionInputHandler* h) {
+
+ void init(AsynchIO* a, ConnectionCodec::Factory* f) {
aio = a;
- inputHandler = h;
+ factory = f;
identifier = aio->getSocket().getPeerAddress();
+
}
// Output side
- void send(framing::AMQFrame&);
void close();
void activateOutput();
- void initiated(const framing::ProtocolInitiation&);
-
// Input side
void readbuff(AsynchIO& aio, AsynchIO::BufferBase* buff);
@@ -142,10 +131,8 @@ class AsynchIOHandler : public qpid::sys::ConnectionOutputHandler {
void closedSocket(AsynchIO& aio, const Socket& s);
};
-void AsynchIOAcceptor::accepted(Poller::shared_ptr poller, const Socket& s, ConnectionInputHandlerFactory* f) {
-
+void AsynchIOAcceptor::accepted(Poller::shared_ptr poller, const Socket& s, ConnectionCodec::Factory* f) {
AsynchIOHandler* async = new AsynchIOHandler;
- ConnectionInputHandler* handler = f->create(async, s.getPeerAddress());
AsynchIO* aio = new AsynchIO(s,
boost::bind(&AsynchIOHandler::readbuff, async, _1, _2),
boost::bind(&AsynchIOHandler::eof, async, _1),
@@ -153,8 +140,7 @@ void AsynchIOAcceptor::accepted(Poller::shared_ptr poller, const Socket& s, Conn
boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2),
boost::bind(&AsynchIOHandler::nobuffs, async, _1),
boost::bind(&AsynchIOHandler::idle, async, _1));
- async->init(aio, handler);
-
+ async->init(aio, f);
// Give connection some buffers to use
for (int i = 0; i < 4; i++) {
aio->queueReadBuffer(new Buff);
@@ -171,7 +157,7 @@ std::string AsynchIOAcceptor::getHost() const {
return listener.getSockname();
}
-void AsynchIOAcceptor::run(ConnectionInputHandlerFactory* fact) {
+void AsynchIOAcceptor::run(ConnectionCodec::Factory* fact) {
Dispatcher d(poller);
AsynchAcceptor
acceptor(listener,
@@ -193,13 +179,13 @@ void AsynchIOAcceptor::run(ConnectionInputHandlerFactory* fact) {
}
}
-ConnectionInputHandler* AsynchIOAcceptor::connect(const std::string& host, int16_t port, ConnectionInputHandlerFactory* f)
+void AsynchIOAcceptor::connect(
+ const std::string& host, int16_t port, ConnectionCodec::Factory* f)
{
Socket* socket = new Socket();//Should be deleted by handle when socket closes
socket->connect(host, port);
AsynchIOHandler* async = new AsynchIOHandler;
async->setClient();
- ConnectionInputHandler* handler = f->create(async, socket->getPeerAddress());
AsynchIO* aio = new AsynchIO(*socket,
boost::bind(&AsynchIOHandler::readbuff, async, _1, _2),
boost::bind(&AsynchIOHandler::eof, async, _1),
@@ -207,14 +193,12 @@ ConnectionInputHandler* AsynchIOAcceptor::connect(const std::string& host, int16
boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2),
boost::bind(&AsynchIOHandler::nobuffs, async, _1),
boost::bind(&AsynchIOHandler::idle, async, _1));
- async->init(aio, handler);
-
+ async->init(aio, f);
// Give connection some buffers to use
for (int i = 0; i < 4; i++) {
aio->queueReadBuffer(new Buff);
}
aio->start(poller);
- return handler;
}
@@ -225,8 +209,9 @@ void AsynchIOAcceptor::shutdown() {
}
-void AsynchIOHandler::write(const framing::AMQDataBlock& data)
+void AsynchIOHandler::write(const framing::ProtocolInitiation& data)
{
+ QPID_LOG(debug, "SENT [" << identifier << "] INIT( " << data << ")");
AsynchIO::BufferBase* buff = aio->getQueuedBuffer();
if (!buff)
buff = new Buff;
@@ -236,68 +221,45 @@ void AsynchIOHandler::write(const framing::AMQDataBlock& data)
aio->queueWrite(buff);
}
-// Output side
-void AsynchIOHandler::send(framing::AMQFrame& frame) {
- // TODO: Need to find out if we are in the callback context,
- // in the callback thread if so we can go further than just queuing the frame
- // to be handled later
- {
- ScopedLock<Mutex> l(frameQueueLock);
- // Ignore anything seen after closing
- if (!frameQueueClosed)
- frameQueue.push(frame);
- }
-
- // Activate aio for writing here
- aio->notifyPendingWrite();
-}
-
-void AsynchIOHandler::close() {
- ScopedLock<Mutex> l(frameQueueLock);
- frameQueueClosed = true;
-}
-
void AsynchIOHandler::activateOutput() {
aio->notifyPendingWrite();
}
-void AsynchIOHandler::initiated(const framing::ProtocolInitiation& pi)
-{
- write(pi);
-}
-
// Input side
void AsynchIOHandler::readbuff(AsynchIO& , AsynchIO::BufferBase* buff) {
if (readError) {
return;
}
- framing::Buffer in(buff->bytes+buff->dataStart, buff->dataCount);
- if(isInitiated){
- framing::AMQFrame frame;
- try{
- while(frame.decode(in)) {
- QPID_LOG(trace, "RECV [" << identifier << "]: " << frame);
- inputHandler->received(frame);
- }
+ size_t decoded = 0;
+ if (codec) { // Already initiated
+ try {
+ decoded = codec->decode(buff->bytes+buff->dataStart, buff->dataCount);
}catch(const std::exception& e){
QPID_LOG(error, e.what());
readError = true;
aio->queueWriteClose();
}
}else{
+ framing::Buffer in(buff->bytes+buff->dataStart, buff->dataCount);
framing::ProtocolInitiation protocolInit;
- if(protocolInit.decode(in)){
- QPID_LOG(debug, "INIT [" << identifier << "]");
- inputHandler->initiated(protocolInit);
- isInitiated = true;
+ if (protocolInit.decode(in)) {
+ decoded = in.getPosition();
+ QPID_LOG(debug, "RECV [" << identifier << "] INIT( " << protocolInit << ")");
+ codec = factory->create(protocolInit.getVersion(), *this, identifier);
+ if (!codec) {
+ // FIXME aconway 2008-03-18: send valid version header & close connection.
+ // FIXME aconway 2008-03-18: exception type
+ throw Exception(
+ QPID_MSG("Protocol version not supported: " << protocolInit));
+ }
}
}
// TODO: unreading needs to go away, and when we can cope
// with multiple sub-buffers in the general buffer scheme, it will
- if (in.available() != 0) {
+ if (decoded != size_t(buff->dataCount)) {
// Adjust buffer for used bytes and then "unread them"
- buff->dataStart += buff->dataCount-in.available();
- buff->dataCount = in.available();
+ buff->dataStart += decoded;
+ buff->dataCount -= decoded;
aio->unread(buff);
} else {
// Give whole buffer back to aio subsystem
@@ -307,7 +269,7 @@ void AsynchIOHandler::readbuff(AsynchIO& , AsynchIO::BufferBase* buff) {
void AsynchIOHandler::eof(AsynchIO&) {
QPID_LOG(debug, "DISCONNECTED [" << identifier << "]");
- inputHandler->closed();
+ if (codec) codec->closed();
aio->queueWriteClose();
}
@@ -331,70 +293,22 @@ void AsynchIOHandler::nobuffs(AsynchIO&) {
}
void AsynchIOHandler::idle(AsynchIO&){
- if (isClient && !isInitiated) {
- //get & write protocol header from upper layers
- write(inputHandler->getInitiation());
- isInitiated = true;
+ if (isClient && codec == 0) {
+ codec = factory->create(*this, identifier);
+ write(framing::ProtocolInitiation(codec->getVersion()));
return;
}
- ScopedLock<Mutex> l(frameQueueLock);
-
- if (frameQueue.empty()) {
- // At this point we know that we're write idling the connection
- // so tell the input handler to queue any available output:
- inputHandler->doOutput();
- //if still no frames, theres nothing to do:
- if (frameQueue.empty()) return;
- }
-
- do {
+ if (codec == 0) return;
+ while (codec->canEncode()) {
// Try and get a queued buffer if not then construct new one
AsynchIO::BufferBase* buff = aio->getQueuedBuffer();
- if (!buff)
- buff = new Buff;
- framing::Buffer out(buff->bytes, buff->byteCount);
- int buffUsed = 0;
-
- framing::AMQFrame frame = frameQueue.front();
- int frameSize = frame.size();
- int framesEncoded=0;
- while (frameSize <= int(out.available())) {
- frameQueue.pop();
-
- // Encode output frame
- frame.encode(out);
- ++framesEncoded;
- buffUsed += frameSize;
- QPID_LOG(trace, "SENT [" << identifier << "]: " << frame);
-
- if (frameQueue.empty()) {
- //if we have run out of frames, allow upper layers to
- //generate more
- if (!frameQueueClosed) {
- inputHandler->doOutput();
- }
- if (frameQueue.empty()) {
- //if there are still no frames, we have no more to
- //do
- break;
- }
- }
- frame = frameQueue.front();
- frameSize = frame.size();
- }
- QPID_LOG(trace, "Writing buffer: " << buffUsed << " bytes " << framesEncoded << " frames ");
-
- // If frame was egregiously large complain
- if (frameSize > buff->byteCount)
- throw framing::ContentTooLargeException(QPID_MSG("Could not write frame, too large for buffer."));
-
- buff->dataCount = buffUsed;
+ if (!buff) buff = new Buff;
+ size_t encoded=codec->encode(buff->bytes, buff->byteCount);
+ buff->dataCount = encoded;
aio->queueWrite(buff);
- } while (!frameQueue.empty());
-
- if (frameQueueClosed) {
- aio->queueWriteClose();
}
+ if (codec->isClosed())
+ aio->queueWriteClose();
}
}} // namespace qpid::sys
diff --git a/cpp/src/qpid/sys/ConnectionCodec.h b/cpp/src/qpid/sys/ConnectionCodec.h
new file mode 100644
index 0000000000..205596c709
--- /dev/null
+++ b/cpp/src/qpid/sys/ConnectionCodec.h
@@ -0,0 +1,80 @@
+#ifndef QPID_SYS_CONNECTION_CODEC_H
+#define QPID_SYS_CONNECTION_CODEC_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/framing/ProtocolVersion.h"
+#include "OutputControl.h"
+#include <memory>
+#include <map>
+
+namespace qpid {
+
+namespace broker { class Broker; }
+
+namespace sys {
+
+/**
+ * Interface of coder/decoder for a connection of a specific protocol
+ * version.
+ */
+class ConnectionCodec {
+ public:
+ virtual ~ConnectionCodec() {}
+
+ /** Decode from buffer, return number of bytes decoded.
+ * @return may be less than size if there was incomplete
+ * data at the end of the buffer.
+ */
+ virtual size_t decode(const char* buffer, size_t size) = 0;
+
+
+ /** Encode into buffer, return number of bytes encoded */
+ virtual size_t encode(const char* buffer, size_t size) = 0;
+
+ /** Return true if we have data to encode */
+ virtual bool canEncode() = 0;
+
+ /** Network connection was closed from other end. */
+ virtual void closed() = 0;
+
+ virtual bool isClosed() const = 0;
+
+ virtual framing::ProtocolVersion getVersion() const = 0;
+
+ struct Factory {
+ virtual ~Factory() {}
+
+ /** Return 0 if version unknown */
+ virtual ConnectionCodec* create(
+ framing::ProtocolVersion, OutputControl&, const std::string& id
+ ) = 0;
+
+ /** Return "preferred" codec for outbound connections. */
+ virtual ConnectionCodec* create(
+ OutputControl&, const std::string& id
+ ) = 0;
+ };
+};
+
+}} // namespace qpid::sys
+
+#endif /*!QPID_SYS_CONNECTION_CODEC_H*/
diff --git a/cpp/src/qpid/sys/ConnectionInputHandler.h b/cpp/src/qpid/sys/ConnectionInputHandler.h
index 1936b5ec50..a2c18d6d9a 100644
--- a/cpp/src/qpid/sys/ConnectionInputHandler.h
+++ b/cpp/src/qpid/sys/ConnectionInputHandler.h
@@ -22,8 +22,6 @@
#define _ConnectionInputHandler_
#include "qpid/framing/InputHandler.h"
-#include "qpid/framing/InitiationHandler.h"
-#include "qpid/framing/ProtocolInitiation.h"
#include "OutputTask.h"
#include "TimeoutHandler.h"
@@ -31,12 +29,10 @@ namespace qpid {
namespace sys {
class ConnectionInputHandler :
- public qpid::framing::InitiationHandler,
public qpid::framing::InputHandler,
public TimeoutHandler, public OutputTask
{
public:
- virtual qpid::framing::ProtocolInitiation getInitiation() = 0;
virtual void closed() = 0;
};
diff --git a/cpp/src/qpid/sys/ConnectionOutputHandler.h b/cpp/src/qpid/sys/ConnectionOutputHandler.h
index 13407d9b9d..5a60ae4998 100644
--- a/cpp/src/qpid/sys/ConnectionOutputHandler.h
+++ b/cpp/src/qpid/sys/ConnectionOutputHandler.h
@@ -22,7 +22,6 @@
#define _ConnectionOutputHandler_
#include "qpid/framing/OutputHandler.h"
-#include "qpid/framing/InitiationHandler.h"
#include "OutputControl.h"
namespace qpid {
@@ -31,7 +30,7 @@ namespace sys {
/**
* Provides the output handler associated with a connection.
*/
-class ConnectionOutputHandler : public virtual qpid::framing::OutputHandler, public OutputControl, public framing::InitiationHandler
+class ConnectionOutputHandler : public virtual qpid::framing::OutputHandler, public OutputControl
{
public:
virtual void close() = 0;