summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/broker')
-rw-r--r--cpp/src/qpid/broker/Broker.cpp12
-rw-r--r--cpp/src/qpid/broker/Broker.h8
-rw-r--r--cpp/src/qpid/broker/Connection.cpp10
-rw-r--r--cpp/src/qpid/broker/Connection.h2
-rw-r--r--cpp/src/qpid/broker/ConnectionFactory.cpp29
-rw-r--r--cpp/src/qpid/broker/ConnectionFactory.h14
-rw-r--r--cpp/src/qpid/broker/ConnectionHandler.cpp21
-rw-r--r--cpp/src/qpid/broker/ConnectionHandler.h2
-rw-r--r--cpp/src/qpid/broker/MultiVersionConnectionInputHandler.cpp18
-rw-r--r--cpp/src/qpid/broker/MultiVersionConnectionInputHandler.h2
-rw-r--r--cpp/src/qpid/broker/PreviewConnection.cpp10
-rw-r--r--cpp/src/qpid/broker/PreviewConnection.h5
-rw-r--r--cpp/src/qpid/broker/PreviewConnectionCodec.cpp90
-rw-r--r--cpp/src/qpid/broker/PreviewConnectionCodec.h55
-rw-r--r--cpp/src/qpid/broker/PreviewConnectionHandler.cpp16
-rw-r--r--cpp/src/qpid/broker/PreviewConnectionHandler.h1
16 files changed, 202 insertions, 93 deletions
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp
index 8b70831cf7..ddd5959343 100644
--- a/cpp/src/qpid/broker/Broker.cpp
+++ b/cpp/src/qpid/broker/Broker.cpp
@@ -286,19 +286,19 @@ Manageable::status_t Broker::ManagementMethod (uint32_t methodId,
return status;
}
-sys::ConnectionInputHandler* Broker::connect(
+void Broker::connect(
const std::string& host, uint16_t port,
- sys::ConnectionInputHandlerFactory* f)
+ sys::ConnectionCodec::Factory* f)
{
- return getAcceptor().connect(host, port, f ? f : &factory);
+ getAcceptor().connect(host, port, f ? f : &factory);
}
-sys::ConnectionInputHandler* Broker::connect(
- const Url& url, sys::ConnectionInputHandlerFactory* f)
+void Broker::connect(
+ const Url& url, sys::ConnectionCodec::Factory* f)
{
url.throwIfEmpty();
TcpAddress addr=boost::get<TcpAddress>(url[0]);
- return connect(addr.host, addr.port, f);
+ connect(addr.host, addr.port, f);
}
}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h
index 9e5191825d..481191eb55 100644
--- a/cpp/src/qpid/broker/Broker.h
+++ b/cpp/src/qpid/broker/Broker.h
@@ -119,12 +119,10 @@ class Broker : public sys::Runnable, public Plugin::Target,
ManagementMethod (uint32_t methodId, management::Args& args);
/** Create a connection to another broker. */
- sys::ConnectionInputHandler*
- connect(const std::string& host, uint16_t port,
- sys::ConnectionInputHandlerFactory* =0);
+ void connect(const std::string& host, uint16_t port,
+ sys::ConnectionCodec::Factory* =0);
/** Create a connection to another broker. */
- sys::ConnectionInputHandler*
- connect(const Url& url, sys::ConnectionInputHandlerFactory* =0);
+ void connect(const Url& url, sys::ConnectionCodec::Factory* =0);
private:
sys::Acceptor& getAcceptor() const;
diff --git a/cpp/src/qpid/broker/Connection.cpp b/cpp/src/qpid/broker/Connection.cpp
index 8be4f7756e..1e55087390 100644
--- a/cpp/src/qpid/broker/Connection.cpp
+++ b/cpp/src/qpid/broker/Connection.cpp
@@ -90,7 +90,9 @@ Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std
adapter(*this),
mgmtClosing(0),
mgmtId(mgmtId_)
-{}
+{
+ initMgmt();
+}
void Connection::initMgmt(bool asLink)
{
@@ -134,12 +136,6 @@ void Connection::close(
getOutput().close();
}
-void Connection::initiated(const framing::ProtocolInitiation& header) {
- version = ProtocolVersion(header.getMajor(), header.getMinor());
- adapter.init(header);
- initMgmt();
-}
-
void Connection::idleOut(){}
void Connection::idleIn(){}
diff --git a/cpp/src/qpid/broker/Connection.h b/cpp/src/qpid/broker/Connection.h
index 8719a9dfcd..a59df26c84 100644
--- a/cpp/src/qpid/broker/Connection.h
+++ b/cpp/src/qpid/broker/Connection.h
@@ -65,12 +65,10 @@ class Connection : public sys::ConnectionInputHandler,
// ConnectionInputHandler methods
void received(framing::AMQFrame& frame);
- void initiated(const framing::ProtocolInitiation& header);
void idleOut();
void idleIn();
void closed();
bool doOutput();
- framing::ProtocolInitiation getInitiation() { return framing::ProtocolInitiation(version); }
void closeChannel(framing::ChannelId channel);
diff --git a/cpp/src/qpid/broker/ConnectionFactory.cpp b/cpp/src/qpid/broker/ConnectionFactory.cpp
index a0cd4e35d7..dfab998c78 100644
--- a/cpp/src/qpid/broker/ConnectionFactory.cpp
+++ b/cpp/src/qpid/broker/ConnectionFactory.cpp
@@ -19,27 +19,32 @@
*
*/
#include "ConnectionFactory.h"
-#include "Connection.h"
-#include "MultiVersionConnectionInputHandler.h"
+#include "qpid/framing/ProtocolVersion.h"
+#include "qpid/amqp_0_10/Connection.h"
+#include "PreviewConnectionCodec.h"
namespace qpid {
namespace broker {
+using framing::ProtocolVersion;
-ConnectionFactory::ConnectionFactory(Broker& b) : broker(b)
-{}
+ConnectionFactory::ConnectionFactory(Broker& b) : broker(b) {}
+ConnectionFactory::~ConnectionFactory() {}
-ConnectionFactory::~ConnectionFactory()
-{
-
+sys::ConnectionCodec*
+ConnectionFactory::create(ProtocolVersion v, sys::OutputControl& out, const std::string& id) {
+ if (v == ProtocolVersion(99, 0))
+ return new PreviewConnectionCodec(out, broker, id);
+ if (v == ProtocolVersion(0, 10))
+ return new amqp_0_10::Connection(out, broker, id);
+ return 0;
}
-qpid::sys::ConnectionInputHandler*
-ConnectionFactory::create(qpid::sys::ConnectionOutputHandler* out,
- const std::string& id)
-{
- return new MultiVersionConnectionInputHandler(out, broker, id);
+sys::ConnectionCodec*
+ConnectionFactory::create(sys::OutputControl& out, const std::string& id) {
+ // FIXME aconway 2008-03-18:
+ return new PreviewConnectionCodec(out, broker, id);
}
}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/ConnectionFactory.h b/cpp/src/qpid/broker/ConnectionFactory.h
index 53fb160279..5797495054 100644
--- a/cpp/src/qpid/broker/ConnectionFactory.h
+++ b/cpp/src/qpid/broker/ConnectionFactory.h
@@ -21,22 +21,24 @@
#ifndef _ConnectionFactory_
#define _ConnectionFactory_
-#include "qpid/sys/ConnectionInputHandlerFactory.h"
+#include "qpid/sys/ConnectionCodec.h"
namespace qpid {
namespace broker {
class Broker;
-class ConnectionFactory : public qpid::sys::ConnectionInputHandlerFactory
-{
+class ConnectionFactory : public sys::ConnectionCodec::Factory {
public:
ConnectionFactory(Broker& b);
- virtual qpid::sys::ConnectionInputHandler*
- create(qpid::sys::ConnectionOutputHandler* out, const std::string& id);
-
virtual ~ConnectionFactory();
+ sys::ConnectionCodec*
+ create(framing::ProtocolVersion, sys::OutputControl&, const std::string& id);
+
+ sys::ConnectionCodec*
+ create(sys::OutputControl&, const std::string& id);
+
private:
Broker& broker;
};
diff --git a/cpp/src/qpid/broker/ConnectionHandler.cpp b/cpp/src/qpid/broker/ConnectionHandler.cpp
index 0aee420022..53a403c955 100644
--- a/cpp/src/qpid/broker/ConnectionHandler.cpp
+++ b/cpp/src/qpid/broker/ConnectionHandler.cpp
@@ -38,17 +38,6 @@ const std::string PLAIN = "PLAIN";
const std::string en_US = "en_US";
}
-void ConnectionHandler::init(const framing::ProtocolInitiation& header) {
- //need to send out a protocol header back to the client
- handler->connection.getOutput().initiated(header);
-
- FieldTable properties;
- string mechanisms(PLAIN);
- string locales(en_US);
- handler->serverMode = true;
- handler->client.start(properties, mechanisms, locales);
-}
-
void ConnectionHandler::close(ReplyCode code, const string& text, ClassId classId, MethodId methodId)
{
handler->client.close(code, text, classId, methodId);
@@ -75,7 +64,15 @@ void ConnectionHandler::handle(framing::AMQFrame& frame)
}
}
-ConnectionHandler::ConnectionHandler(Connection& connection) : handler(new Handler(connection)) {}
+ConnectionHandler::ConnectionHandler(Connection& connection) : handler(new Handler(connection)) {
+ FieldTable properties;
+ string mechanisms(PLAIN);
+ string locales(en_US);
+ handler->serverMode = true;
+ handler->client.start(properties, mechanisms, locales);
+}
+
+
ConnectionHandler::Handler:: Handler(Connection& c) : client(c.getOutput()), server(c.getOutput()),
connection(c), serverMode(false) {}
diff --git a/cpp/src/qpid/broker/ConnectionHandler.h b/cpp/src/qpid/broker/ConnectionHandler.h
index 44e2ce05fa..8e659f0913 100644
--- a/cpp/src/qpid/broker/ConnectionHandler.h
+++ b/cpp/src/qpid/broker/ConnectionHandler.h
@@ -38,7 +38,6 @@ namespace broker {
class Connection;
-// TODO aconway 2007-09-18: Rename to ConnectionHandler
class ConnectionHandler : public framing::FrameHandler
{
struct Handler : public framing::AMQP_ServerOperations::Connection010Handler,
@@ -82,7 +81,6 @@ class ConnectionHandler : public framing::FrameHandler
std::auto_ptr<Handler> handler;
public:
ConnectionHandler(Connection& connection);
- void init(const framing::ProtocolInitiation& header);
void close(framing::ReplyCode code, const std::string& text, framing::ClassId classId, framing::MethodId methodId);
void handle(framing::AMQFrame& frame);
};
diff --git a/cpp/src/qpid/broker/MultiVersionConnectionInputHandler.cpp b/cpp/src/qpid/broker/MultiVersionConnectionInputHandler.cpp
index 6c3d960d1f..f1bbf7d10e 100644
--- a/cpp/src/qpid/broker/MultiVersionConnectionInputHandler.cpp
+++ b/cpp/src/qpid/broker/MultiVersionConnectionInputHandler.cpp
@@ -31,19 +31,6 @@ MultiVersionConnectionInputHandler::MultiVersionConnectionInputHandler(
Broker& _broker,
const std::string& _id) : linkVersion(99,0), out(_out), broker(_broker), id(_id) {}
-
-void MultiVersionConnectionInputHandler::initiated(const qpid::framing::ProtocolInitiation& i)
-{
- if (i.getMajor() == 99 && i.getMinor() == 0) {
- handler = std::auto_ptr<ConnectionInputHandler>(new PreviewConnection(out, broker, id));
- } else if (i.getMajor() == 0 && i.getMinor() == 10) {
- handler = std::auto_ptr<ConnectionInputHandler>(new Connection(out, broker, id));
- } else {
- throw qpid::framing::InternalErrorException("Unsupported version: " + i.getVersion().toString());
- }
- handler->initiated(i);
-}
-
void MultiVersionConnectionInputHandler::received(qpid::framing::AMQFrame& f)
{
check();
@@ -67,11 +54,6 @@ bool MultiVersionConnectionInputHandler::doOutput()
return handler.get() && handler->doOutput();
}
-qpid::framing::ProtocolInitiation MultiVersionConnectionInputHandler::getInitiation()
-{
- return qpid::framing::ProtocolInitiation(linkVersion);
-}
-
void MultiVersionConnectionInputHandler::closed()
{
if (handler.get()) handler->closed();
diff --git a/cpp/src/qpid/broker/MultiVersionConnectionInputHandler.h b/cpp/src/qpid/broker/MultiVersionConnectionInputHandler.h
index 440c00c09a..e6915a00bd 100644
--- a/cpp/src/qpid/broker/MultiVersionConnectionInputHandler.h
+++ b/cpp/src/qpid/broker/MultiVersionConnectionInputHandler.h
@@ -44,12 +44,10 @@ public:
MultiVersionConnectionInputHandler(qpid::sys::ConnectionOutputHandler* out, Broker& broker, const std::string& id);
virtual ~MultiVersionConnectionInputHandler() {}
- void initiated(const qpid::framing::ProtocolInitiation&);
void received(qpid::framing::AMQFrame&);
void idleOut();
void idleIn();
bool doOutput();
- qpid::framing::ProtocolInitiation getInitiation();
void closed();
};
diff --git a/cpp/src/qpid/broker/PreviewConnection.cpp b/cpp/src/qpid/broker/PreviewConnection.cpp
index 05879a0329..5a541b5624 100644
--- a/cpp/src/qpid/broker/PreviewConnection.cpp
+++ b/cpp/src/qpid/broker/PreviewConnection.cpp
@@ -90,7 +90,9 @@ PreviewConnection::PreviewConnection(ConnectionOutputHandler* out_, Broker& brok
adapter(*this),
mgmtClosing(0),
mgmtId(mgmtId_)
-{}
+{
+ initMgmt();
+}
void PreviewConnection::initMgmt(bool asLink)
{
@@ -134,12 +136,6 @@ void PreviewConnection::close(
getOutput().close();
}
-void PreviewConnection::initiated(const framing::ProtocolInitiation& header) {
- version = ProtocolVersion(header.getMajor(), header.getMinor());
- adapter.init(header);
- initMgmt();
-}
-
void PreviewConnection::idleOut(){}
void PreviewConnection::idleIn(){}
diff --git a/cpp/src/qpid/broker/PreviewConnection.h b/cpp/src/qpid/broker/PreviewConnection.h
index d6a945c26c..1cc9e7a3d4 100644
--- a/cpp/src/qpid/broker/PreviewConnection.h
+++ b/cpp/src/qpid/broker/PreviewConnection.h
@@ -50,8 +50,7 @@
namespace qpid {
namespace broker {
-class PreviewConnection : public sys::ConnectionInputHandler,
- public ConnectionState
+class PreviewConnection : public sys::ConnectionInputHandler, public ConnectionState
{
public:
PreviewConnection(sys::ConnectionOutputHandler* out, Broker& broker, const std::string& mgmtId);
@@ -65,12 +64,10 @@ class PreviewConnection : public sys::ConnectionInputHandler,
// ConnectionInputHandler methods
void received(framing::AMQFrame& frame);
- void initiated(const framing::ProtocolInitiation& header);
void idleOut();
void idleIn();
void closed();
bool doOutput();
- framing::ProtocolInitiation getInitiation() { return framing::ProtocolInitiation(version); }
void closeChannel(framing::ChannelId channel);
diff --git a/cpp/src/qpid/broker/PreviewConnectionCodec.cpp b/cpp/src/qpid/broker/PreviewConnectionCodec.cpp
new file mode 100644
index 0000000000..81ec7f7076
--- /dev/null
+++ b/cpp/src/qpid/broker/PreviewConnectionCodec.cpp
@@ -0,0 +1,90 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "PreviewConnectionCodec.h"
+#include "qpid/log/Statement.h"
+
+namespace qpid {
+namespace broker {
+
+using sys::Mutex;
+
+PreviewConnectionCodec::PreviewConnectionCodec(sys::OutputControl& o, Broker& broker, const std::string& id)
+ : frameQueueClosed(false), output(o), connection(this, broker, id), identifier(id) {}
+
+size_t PreviewConnectionCodec::decode(const char* buffer, size_t size) {
+ framing::Buffer in(const_cast<char*>(buffer), size);
+ framing::AMQFrame frame;
+ while(frame.decode(in)) {
+ QPID_LOG(trace, "RECV [" << identifier << "]: " << frame);
+ connection.received(frame);
+ }
+ return in.getPosition();
+}
+
+bool PreviewConnectionCodec::canEncode() {
+ if (!frameQueueClosed) connection.doOutput();
+ return !frameQueue.empty();
+}
+
+bool PreviewConnectionCodec::isClosed() const {
+ Mutex::ScopedLock l(frameQueueLock);
+ return frameQueueClosed;
+}
+
+size_t PreviewConnectionCodec::encode(const char* buffer, size_t size) {
+ Mutex::ScopedLock l(frameQueueLock);
+ framing::Buffer out(const_cast<char*>(buffer), size);
+ while (!frameQueue.empty() && (frameQueue.front().size() <= out.available())) {
+ frameQueue.front().encode(out);
+ QPID_LOG(trace, "SENT [" << identifier << "]: " << frameQueue.front());
+ frameQueue.pop();
+ }
+ if (!frameQueue.empty() && frameQueue.front().size() > size)
+ throw framing::ContentTooLargeException(QPID_MSG("Could not write frame, too large for buffer."));
+ return out.getPosition();
+}
+
+void PreviewConnectionCodec::activateOutput() { output.activateOutput(); }
+
+void PreviewConnectionCodec::close() {
+ // Close the output queue.
+ Mutex::ScopedLock l(frameQueueLock);
+ frameQueueClosed = true;
+}
+
+void PreviewConnectionCodec::closed() {
+ connection.closed();
+}
+
+void PreviewConnectionCodec::send(framing::AMQFrame& f) {
+ {
+ Mutex::ScopedLock l(frameQueueLock);
+ if (!frameQueueClosed)
+ frameQueue.push(f);
+ }
+ activateOutput();
+}
+
+framing::ProtocolVersion PreviewConnectionCodec::getVersion() const {
+ return framing::ProtocolVersion(99,0);
+}
+
+}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/PreviewConnectionCodec.h b/cpp/src/qpid/broker/PreviewConnectionCodec.h
new file mode 100644
index 0000000000..8c7074c1df
--- /dev/null
+++ b/cpp/src/qpid/broker/PreviewConnectionCodec.h
@@ -0,0 +1,55 @@
+#ifndef QPID_BROKER_PREVIEWCONNECTIONCODEC_H
+#define QPID_BROKER_PREVIEWCONNECTIONCODEC_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/sys/ConnectionCodec.h"
+#include "qpid/sys/ConnectionOutputHandler.h"
+#include "qpid/sys/Mutex.h"
+#include "PreviewConnection.h"
+
+namespace qpid {
+namespace broker {
+
+class PreviewConnectionCodec : public sys::ConnectionCodec, public sys::ConnectionOutputHandler {
+ std::queue<framing::AMQFrame> frameQueue;
+ bool frameQueueClosed;
+ mutable sys::Mutex frameQueueLock;
+ sys::OutputControl& output;
+ PreviewConnection connection;
+ std::string identifier;
+
+ public:
+ PreviewConnectionCodec(sys::OutputControl&, Broker&, const std::string& id);
+ size_t decode(const char* buffer, size_t size);
+ size_t encode(const char* buffer, size_t size);
+ bool isClosed() const;
+ bool canEncode();
+ void activateOutput();
+ void closed(); // connection closed by peer.
+ void close(); // closing from this end.
+ void send(framing::AMQFrame&);
+ framing::ProtocolVersion getVersion() const;
+};
+
+}} // namespace qpid::broker
+
+#endif /*!QPID_BROKER_PREVIEWCONNECTIONCODEC_H*/
diff --git a/cpp/src/qpid/broker/PreviewConnectionHandler.cpp b/cpp/src/qpid/broker/PreviewConnectionHandler.cpp
index c0f0d9f5e0..0052b0d588 100644
--- a/cpp/src/qpid/broker/PreviewConnectionHandler.cpp
+++ b/cpp/src/qpid/broker/PreviewConnectionHandler.cpp
@@ -37,14 +37,6 @@ const std::string PLAIN = "PLAIN";
const std::string en_US = "en_US";
}
-void PreviewConnectionHandler::init(const framing::ProtocolInitiation& header) {
- FieldTable properties;
- string mechanisms(PLAIN);
- string locales(en_US);
- handler->serverMode = true;
- handler->client.start(header.getMajor(), header.getMinor(), properties, mechanisms, locales);
-}
-
void PreviewConnectionHandler::close(ReplyCode code, const string& text, ClassId classId, MethodId methodId)
{
handler->client.close(code, text, classId, methodId);
@@ -68,7 +60,13 @@ void PreviewConnectionHandler::handle(framing::AMQFrame& frame)
}
}
-PreviewConnectionHandler::PreviewConnectionHandler(PreviewConnection& connection) : handler(new Handler(connection)) {}
+PreviewConnectionHandler::PreviewConnectionHandler(PreviewConnection& connection) : handler(new Handler(connection)) {
+ FieldTable properties;
+ string mechanisms(PLAIN);
+ string locales(en_US);
+ handler->serverMode = true;
+ handler->client.start(0, 10, properties, mechanisms, locales);
+}
PreviewConnectionHandler::Handler:: Handler(PreviewConnection& c) : client(c.getOutput()), server(c.getOutput()),
connection(c), serverMode(false) {}
diff --git a/cpp/src/qpid/broker/PreviewConnectionHandler.h b/cpp/src/qpid/broker/PreviewConnectionHandler.h
index 93901dd492..bd6b54e8f7 100644
--- a/cpp/src/qpid/broker/PreviewConnectionHandler.h
+++ b/cpp/src/qpid/broker/PreviewConnectionHandler.h
@@ -81,7 +81,6 @@ class PreviewConnectionHandler : public framing::FrameHandler
std::auto_ptr<Handler> handler;
public:
PreviewConnectionHandler(PreviewConnection& connection);
- void init(const framing::ProtocolInitiation& header);
void close(framing::ReplyCode code, const std::string& text, framing::ClassId classId, framing::MethodId methodId);
void handle(framing::AMQFrame& frame);
};