diff options
Diffstat (limited to 'cpp/src/qpid/broker')
| -rw-r--r-- | cpp/src/qpid/broker/Broker.cpp | 12 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/Broker.h | 8 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/Connection.cpp | 10 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/Connection.h | 2 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/ConnectionFactory.cpp | 29 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/ConnectionFactory.h | 14 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/ConnectionHandler.cpp | 21 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/ConnectionHandler.h | 2 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/MultiVersionConnectionInputHandler.cpp | 18 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/MultiVersionConnectionInputHandler.h | 2 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/PreviewConnection.cpp | 10 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/PreviewConnection.h | 5 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/PreviewConnectionCodec.cpp | 90 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/PreviewConnectionCodec.h | 55 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/PreviewConnectionHandler.cpp | 16 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/PreviewConnectionHandler.h | 1 |
16 files changed, 202 insertions, 93 deletions
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp index 8b70831cf7..ddd5959343 100644 --- a/cpp/src/qpid/broker/Broker.cpp +++ b/cpp/src/qpid/broker/Broker.cpp @@ -286,19 +286,19 @@ Manageable::status_t Broker::ManagementMethod (uint32_t methodId, return status; } -sys::ConnectionInputHandler* Broker::connect( +void Broker::connect( const std::string& host, uint16_t port, - sys::ConnectionInputHandlerFactory* f) + sys::ConnectionCodec::Factory* f) { - return getAcceptor().connect(host, port, f ? f : &factory); + getAcceptor().connect(host, port, f ? f : &factory); } -sys::ConnectionInputHandler* Broker::connect( - const Url& url, sys::ConnectionInputHandlerFactory* f) +void Broker::connect( + const Url& url, sys::ConnectionCodec::Factory* f) { url.throwIfEmpty(); TcpAddress addr=boost::get<TcpAddress>(url[0]); - return connect(addr.host, addr.port, f); + connect(addr.host, addr.port, f); } }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h index 9e5191825d..481191eb55 100644 --- a/cpp/src/qpid/broker/Broker.h +++ b/cpp/src/qpid/broker/Broker.h @@ -119,12 +119,10 @@ class Broker : public sys::Runnable, public Plugin::Target, ManagementMethod (uint32_t methodId, management::Args& args); /** Create a connection to another broker. */ - sys::ConnectionInputHandler* - connect(const std::string& host, uint16_t port, - sys::ConnectionInputHandlerFactory* =0); + void connect(const std::string& host, uint16_t port, + sys::ConnectionCodec::Factory* =0); /** Create a connection to another broker. */ - sys::ConnectionInputHandler* - connect(const Url& url, sys::ConnectionInputHandlerFactory* =0); + void connect(const Url& url, sys::ConnectionCodec::Factory* =0); private: sys::Acceptor& getAcceptor() const; diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp index 8be4f7756e..1e55087390 100644 --- a/cpp/src/qpid/broker/Connection.cpp +++ b/cpp/src/qpid/broker/Connection.cpp @@ -90,7 +90,9 @@ Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std adapter(*this), mgmtClosing(0), mgmtId(mgmtId_) -{} +{ + initMgmt(); +} void Connection::initMgmt(bool asLink) { @@ -134,12 +136,6 @@ void Connection::close( getOutput().close(); } -void Connection::initiated(const framing::ProtocolInitiation& header) { - version = ProtocolVersion(header.getMajor(), header.getMinor()); - adapter.init(header); - initMgmt(); -} - void Connection::idleOut(){} void Connection::idleIn(){} diff --git a/cpp/src/qpid/broker/Connection.h b/cpp/src/qpid/broker/Connection.h index 8719a9dfcd..a59df26c84 100644 --- a/cpp/src/qpid/broker/Connection.h +++ b/cpp/src/qpid/broker/Connection.h @@ -65,12 +65,10 @@ class Connection : public sys::ConnectionInputHandler, // ConnectionInputHandler methods void received(framing::AMQFrame& frame); - void initiated(const framing::ProtocolInitiation& header); void idleOut(); void idleIn(); void closed(); bool doOutput(); - framing::ProtocolInitiation getInitiation() { return framing::ProtocolInitiation(version); } void closeChannel(framing::ChannelId channel); diff --git a/cpp/src/qpid/broker/ConnectionFactory.cpp b/cpp/src/qpid/broker/ConnectionFactory.cpp index a0cd4e35d7..dfab998c78 100644 --- a/cpp/src/qpid/broker/ConnectionFactory.cpp +++ b/cpp/src/qpid/broker/ConnectionFactory.cpp @@ -19,27 +19,32 @@ * */ #include "ConnectionFactory.h" -#include "Connection.h" -#include "MultiVersionConnectionInputHandler.h" +#include "qpid/framing/ProtocolVersion.h" +#include "qpid/amqp_0_10/Connection.h" +#include "PreviewConnectionCodec.h" namespace qpid { namespace broker { +using framing::ProtocolVersion; -ConnectionFactory::ConnectionFactory(Broker& b) : broker(b) -{} +ConnectionFactory::ConnectionFactory(Broker& b) : broker(b) {} +ConnectionFactory::~ConnectionFactory() {} -ConnectionFactory::~ConnectionFactory() -{ - +sys::ConnectionCodec* +ConnectionFactory::create(ProtocolVersion v, sys::OutputControl& out, const std::string& id) { + if (v == ProtocolVersion(99, 0)) + return new PreviewConnectionCodec(out, broker, id); + if (v == ProtocolVersion(0, 10)) + return new amqp_0_10::Connection(out, broker, id); + return 0; } -qpid::sys::ConnectionInputHandler* -ConnectionFactory::create(qpid::sys::ConnectionOutputHandler* out, - const std::string& id) -{ - return new MultiVersionConnectionInputHandler(out, broker, id); +sys::ConnectionCodec* +ConnectionFactory::create(sys::OutputControl& out, const std::string& id) { + // FIXME aconway 2008-03-18: + return new PreviewConnectionCodec(out, broker, id); } }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/ConnectionFactory.h b/cpp/src/qpid/broker/ConnectionFactory.h index 53fb160279..5797495054 100644 --- a/cpp/src/qpid/broker/ConnectionFactory.h +++ b/cpp/src/qpid/broker/ConnectionFactory.h @@ -21,22 +21,24 @@ #ifndef _ConnectionFactory_ #define _ConnectionFactory_ -#include "qpid/sys/ConnectionInputHandlerFactory.h" +#include "qpid/sys/ConnectionCodec.h" namespace qpid { namespace broker { class Broker; -class ConnectionFactory : public qpid::sys::ConnectionInputHandlerFactory -{ +class ConnectionFactory : public sys::ConnectionCodec::Factory { public: ConnectionFactory(Broker& b); - virtual qpid::sys::ConnectionInputHandler* - create(qpid::sys::ConnectionOutputHandler* out, const std::string& id); - virtual ~ConnectionFactory(); + sys::ConnectionCodec* + create(framing::ProtocolVersion, sys::OutputControl&, const std::string& id); + + sys::ConnectionCodec* + create(sys::OutputControl&, const std::string& id); + private: Broker& broker; }; diff --git a/cpp/src/qpid/broker/ConnectionHandler.cpp b/cpp/src/qpid/broker/ConnectionHandler.cpp index 0aee420022..53a403c955 100644 --- a/cpp/src/qpid/broker/ConnectionHandler.cpp +++ b/cpp/src/qpid/broker/ConnectionHandler.cpp @@ -38,17 +38,6 @@ const std::string PLAIN = "PLAIN"; const std::string en_US = "en_US"; } -void ConnectionHandler::init(const framing::ProtocolInitiation& header) { - //need to send out a protocol header back to the client - handler->connection.getOutput().initiated(header); - - FieldTable properties; - string mechanisms(PLAIN); - string locales(en_US); - handler->serverMode = true; - handler->client.start(properties, mechanisms, locales); -} - void ConnectionHandler::close(ReplyCode code, const string& text, ClassId classId, MethodId methodId) { handler->client.close(code, text, classId, methodId); @@ -75,7 +64,15 @@ void ConnectionHandler::handle(framing::AMQFrame& frame) } } -ConnectionHandler::ConnectionHandler(Connection& connection) : handler(new Handler(connection)) {} +ConnectionHandler::ConnectionHandler(Connection& connection) : handler(new Handler(connection)) { + FieldTable properties; + string mechanisms(PLAIN); + string locales(en_US); + handler->serverMode = true; + handler->client.start(properties, mechanisms, locales); +} + + ConnectionHandler::Handler:: Handler(Connection& c) : client(c.getOutput()), server(c.getOutput()), connection(c), serverMode(false) {} diff --git a/cpp/src/qpid/broker/ConnectionHandler.h b/cpp/src/qpid/broker/ConnectionHandler.h index 44e2ce05fa..8e659f0913 100644 --- a/cpp/src/qpid/broker/ConnectionHandler.h +++ b/cpp/src/qpid/broker/ConnectionHandler.h @@ -38,7 +38,6 @@ namespace broker { class Connection; -// TODO aconway 2007-09-18: Rename to ConnectionHandler class ConnectionHandler : public framing::FrameHandler { struct Handler : public framing::AMQP_ServerOperations::Connection010Handler, @@ -82,7 +81,6 @@ class ConnectionHandler : public framing::FrameHandler std::auto_ptr<Handler> handler; public: ConnectionHandler(Connection& connection); - void init(const framing::ProtocolInitiation& header); void close(framing::ReplyCode code, const std::string& text, framing::ClassId classId, framing::MethodId methodId); void handle(framing::AMQFrame& frame); }; diff --git a/cpp/src/qpid/broker/MultiVersionConnectionInputHandler.cpp b/cpp/src/qpid/broker/MultiVersionConnectionInputHandler.cpp index 6c3d960d1f..f1bbf7d10e 100644 --- a/cpp/src/qpid/broker/MultiVersionConnectionInputHandler.cpp +++ b/cpp/src/qpid/broker/MultiVersionConnectionInputHandler.cpp @@ -31,19 +31,6 @@ MultiVersionConnectionInputHandler::MultiVersionConnectionInputHandler( Broker& _broker, const std::string& _id) : linkVersion(99,0), out(_out), broker(_broker), id(_id) {} - -void MultiVersionConnectionInputHandler::initiated(const qpid::framing::ProtocolInitiation& i) -{ - if (i.getMajor() == 99 && i.getMinor() == 0) { - handler = std::auto_ptr<ConnectionInputHandler>(new PreviewConnection(out, broker, id)); - } else if (i.getMajor() == 0 && i.getMinor() == 10) { - handler = std::auto_ptr<ConnectionInputHandler>(new Connection(out, broker, id)); - } else { - throw qpid::framing::InternalErrorException("Unsupported version: " + i.getVersion().toString()); - } - handler->initiated(i); -} - void MultiVersionConnectionInputHandler::received(qpid::framing::AMQFrame& f) { check(); @@ -67,11 +54,6 @@ bool MultiVersionConnectionInputHandler::doOutput() return handler.get() && handler->doOutput(); } -qpid::framing::ProtocolInitiation MultiVersionConnectionInputHandler::getInitiation() -{ - return qpid::framing::ProtocolInitiation(linkVersion); -} - void MultiVersionConnectionInputHandler::closed() { if (handler.get()) handler->closed(); diff --git a/cpp/src/qpid/broker/MultiVersionConnectionInputHandler.h b/cpp/src/qpid/broker/MultiVersionConnectionInputHandler.h index 440c00c09a..e6915a00bd 100644 --- a/cpp/src/qpid/broker/MultiVersionConnectionInputHandler.h +++ b/cpp/src/qpid/broker/MultiVersionConnectionInputHandler.h @@ -44,12 +44,10 @@ public: MultiVersionConnectionInputHandler(qpid::sys::ConnectionOutputHandler* out, Broker& broker, const std::string& id); virtual ~MultiVersionConnectionInputHandler() {} - void initiated(const qpid::framing::ProtocolInitiation&); void received(qpid::framing::AMQFrame&); void idleOut(); void idleIn(); bool doOutput(); - qpid::framing::ProtocolInitiation getInitiation(); void closed(); }; diff --git a/cpp/src/qpid/broker/PreviewConnection.cpp b/cpp/src/qpid/broker/PreviewConnection.cpp index 05879a0329..5a541b5624 100644 --- a/cpp/src/qpid/broker/PreviewConnection.cpp +++ b/cpp/src/qpid/broker/PreviewConnection.cpp @@ -90,7 +90,9 @@ PreviewConnection::PreviewConnection(ConnectionOutputHandler* out_, Broker& brok adapter(*this), mgmtClosing(0), mgmtId(mgmtId_) -{} +{ + initMgmt(); +} void PreviewConnection::initMgmt(bool asLink) { @@ -134,12 +136,6 @@ void PreviewConnection::close( getOutput().close(); } -void PreviewConnection::initiated(const framing::ProtocolInitiation& header) { - version = ProtocolVersion(header.getMajor(), header.getMinor()); - adapter.init(header); - initMgmt(); -} - void PreviewConnection::idleOut(){} void PreviewConnection::idleIn(){} diff --git a/cpp/src/qpid/broker/PreviewConnection.h b/cpp/src/qpid/broker/PreviewConnection.h index d6a945c26c..1cc9e7a3d4 100644 --- a/cpp/src/qpid/broker/PreviewConnection.h +++ b/cpp/src/qpid/broker/PreviewConnection.h @@ -50,8 +50,7 @@ namespace qpid { namespace broker { -class PreviewConnection : public sys::ConnectionInputHandler, - public ConnectionState +class PreviewConnection : public sys::ConnectionInputHandler, public ConnectionState { public: PreviewConnection(sys::ConnectionOutputHandler* out, Broker& broker, const std::string& mgmtId); @@ -65,12 +64,10 @@ class PreviewConnection : public sys::ConnectionInputHandler, // ConnectionInputHandler methods void received(framing::AMQFrame& frame); - void initiated(const framing::ProtocolInitiation& header); void idleOut(); void idleIn(); void closed(); bool doOutput(); - framing::ProtocolInitiation getInitiation() { return framing::ProtocolInitiation(version); } void closeChannel(framing::ChannelId channel); diff --git a/cpp/src/qpid/broker/PreviewConnectionCodec.cpp b/cpp/src/qpid/broker/PreviewConnectionCodec.cpp new file mode 100644 index 0000000000..81ec7f7076 --- /dev/null +++ b/cpp/src/qpid/broker/PreviewConnectionCodec.cpp @@ -0,0 +1,90 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "PreviewConnectionCodec.h" +#include "qpid/log/Statement.h" + +namespace qpid { +namespace broker { + +using sys::Mutex; + +PreviewConnectionCodec::PreviewConnectionCodec(sys::OutputControl& o, Broker& broker, const std::string& id) + : frameQueueClosed(false), output(o), connection(this, broker, id), identifier(id) {} + +size_t PreviewConnectionCodec::decode(const char* buffer, size_t size) { + framing::Buffer in(const_cast<char*>(buffer), size); + framing::AMQFrame frame; + while(frame.decode(in)) { + QPID_LOG(trace, "RECV [" << identifier << "]: " << frame); + connection.received(frame); + } + return in.getPosition(); +} + +bool PreviewConnectionCodec::canEncode() { + if (!frameQueueClosed) connection.doOutput(); + return !frameQueue.empty(); +} + +bool PreviewConnectionCodec::isClosed() const { + Mutex::ScopedLock l(frameQueueLock); + return frameQueueClosed; +} + +size_t PreviewConnectionCodec::encode(const char* buffer, size_t size) { + Mutex::ScopedLock l(frameQueueLock); + framing::Buffer out(const_cast<char*>(buffer), size); + while (!frameQueue.empty() && (frameQueue.front().size() <= out.available())) { + frameQueue.front().encode(out); + QPID_LOG(trace, "SENT [" << identifier << "]: " << frameQueue.front()); + frameQueue.pop(); + } + if (!frameQueue.empty() && frameQueue.front().size() > size) + throw framing::ContentTooLargeException(QPID_MSG("Could not write frame, too large for buffer.")); + return out.getPosition(); +} + +void PreviewConnectionCodec::activateOutput() { output.activateOutput(); } + +void PreviewConnectionCodec::close() { + // Close the output queue. + Mutex::ScopedLock l(frameQueueLock); + frameQueueClosed = true; +} + +void PreviewConnectionCodec::closed() { + connection.closed(); +} + +void PreviewConnectionCodec::send(framing::AMQFrame& f) { + { + Mutex::ScopedLock l(frameQueueLock); + if (!frameQueueClosed) + frameQueue.push(f); + } + activateOutput(); +} + +framing::ProtocolVersion PreviewConnectionCodec::getVersion() const { + return framing::ProtocolVersion(99,0); +} + +}} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/PreviewConnectionCodec.h b/cpp/src/qpid/broker/PreviewConnectionCodec.h new file mode 100644 index 0000000000..8c7074c1df --- /dev/null +++ b/cpp/src/qpid/broker/PreviewConnectionCodec.h @@ -0,0 +1,55 @@ +#ifndef QPID_BROKER_PREVIEWCONNECTIONCODEC_H +#define QPID_BROKER_PREVIEWCONNECTIONCODEC_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/sys/ConnectionCodec.h" +#include "qpid/sys/ConnectionOutputHandler.h" +#include "qpid/sys/Mutex.h" +#include "PreviewConnection.h" + +namespace qpid { +namespace broker { + +class PreviewConnectionCodec : public sys::ConnectionCodec, public sys::ConnectionOutputHandler { + std::queue<framing::AMQFrame> frameQueue; + bool frameQueueClosed; + mutable sys::Mutex frameQueueLock; + sys::OutputControl& output; + PreviewConnection connection; + std::string identifier; + + public: + PreviewConnectionCodec(sys::OutputControl&, Broker&, const std::string& id); + size_t decode(const char* buffer, size_t size); + size_t encode(const char* buffer, size_t size); + bool isClosed() const; + bool canEncode(); + void activateOutput(); + void closed(); // connection closed by peer. + void close(); // closing from this end. + void send(framing::AMQFrame&); + framing::ProtocolVersion getVersion() const; +}; + +}} // namespace qpid::broker + +#endif /*!QPID_BROKER_PREVIEWCONNECTIONCODEC_H*/ diff --git a/cpp/src/qpid/broker/PreviewConnectionHandler.cpp b/cpp/src/qpid/broker/PreviewConnectionHandler.cpp index c0f0d9f5e0..0052b0d588 100644 --- a/cpp/src/qpid/broker/PreviewConnectionHandler.cpp +++ b/cpp/src/qpid/broker/PreviewConnectionHandler.cpp @@ -37,14 +37,6 @@ const std::string PLAIN = "PLAIN"; const std::string en_US = "en_US"; } -void PreviewConnectionHandler::init(const framing::ProtocolInitiation& header) { - FieldTable properties; - string mechanisms(PLAIN); - string locales(en_US); - handler->serverMode = true; - handler->client.start(header.getMajor(), header.getMinor(), properties, mechanisms, locales); -} - void PreviewConnectionHandler::close(ReplyCode code, const string& text, ClassId classId, MethodId methodId) { handler->client.close(code, text, classId, methodId); @@ -68,7 +60,13 @@ void PreviewConnectionHandler::handle(framing::AMQFrame& frame) } } -PreviewConnectionHandler::PreviewConnectionHandler(PreviewConnection& connection) : handler(new Handler(connection)) {} +PreviewConnectionHandler::PreviewConnectionHandler(PreviewConnection& connection) : handler(new Handler(connection)) { + FieldTable properties; + string mechanisms(PLAIN); + string locales(en_US); + handler->serverMode = true; + handler->client.start(0, 10, properties, mechanisms, locales); +} PreviewConnectionHandler::Handler:: Handler(PreviewConnection& c) : client(c.getOutput()), server(c.getOutput()), connection(c), serverMode(false) {} diff --git a/cpp/src/qpid/broker/PreviewConnectionHandler.h b/cpp/src/qpid/broker/PreviewConnectionHandler.h index 93901dd492..bd6b54e8f7 100644 --- a/cpp/src/qpid/broker/PreviewConnectionHandler.h +++ b/cpp/src/qpid/broker/PreviewConnectionHandler.h @@ -81,7 +81,6 @@ class PreviewConnectionHandler : public framing::FrameHandler std::auto_ptr<Handler> handler; public: PreviewConnectionHandler(PreviewConnection& connection); - void init(const framing::ProtocolInitiation& header); void close(framing::ReplyCode code, const std::string& text, framing::ClassId classId, framing::MethodId methodId); void handle(framing::AMQFrame& frame); }; |
