summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/sys
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/sys')
-rw-r--r--cpp/src/qpid/sys/AsynchIOHandler.cpp7
-rw-r--r--cpp/src/qpid/sys/AsynchIOHandler.h5
-rw-r--r--cpp/src/qpid/sys/ConnectionCodec.h5
-rw-r--r--cpp/src/qpid/sys/ProtocolAccess.h65
-rw-r--r--cpp/src/qpid/sys/ProtocolFactory.h5
-rw-r--r--cpp/src/qpid/sys/TCPIOPlugin.cpp54
-rw-r--r--cpp/src/qpid/sys/posix/AsynchIO.cpp3
7 files changed, 117 insertions, 27 deletions
diff --git a/cpp/src/qpid/sys/AsynchIOHandler.cpp b/cpp/src/qpid/sys/AsynchIOHandler.cpp
index ca2bd7c93c..31974993bb 100644
--- a/cpp/src/qpid/sys/AsynchIOHandler.cpp
+++ b/cpp/src/qpid/sys/AsynchIOHandler.cpp
@@ -36,13 +36,14 @@ struct Buff : public AsynchIO::BufferBase {
{ delete [] bytes;}
};
-AsynchIOHandler::AsynchIOHandler(std::string id, ConnectionCodec::Factory* f) :
+AsynchIOHandler::AsynchIOHandler(std::string id, ConnectionCodec::Factory* f, ProtocolAccess* a) :
identifier(id),
aio(0),
factory(f),
codec(0),
readError(false),
- isClient(false)
+ isClient(false),
+ access(a)
{}
AsynchIOHandler::~AsynchIOHandler() {
@@ -152,7 +153,7 @@ void AsynchIOHandler::nobuffs(AsynchIO&) {
void AsynchIOHandler::idle(AsynchIO&){
if (isClient && codec == 0) {
- codec = factory->create(*this, identifier);
+ codec = factory->create(*this, identifier, access);
write(framing::ProtocolInitiation(codec->getVersion()));
return;
}
diff --git a/cpp/src/qpid/sys/AsynchIOHandler.h b/cpp/src/qpid/sys/AsynchIOHandler.h
index 530613367a..ece52f57c4 100644
--- a/cpp/src/qpid/sys/AsynchIOHandler.h
+++ b/cpp/src/qpid/sys/AsynchIOHandler.h
@@ -32,7 +32,7 @@ namespace framing {
}
namespace sys {
-
+class ProtocolAccess;
class AsynchIOHandler : public OutputControl {
std::string identifier;
AsynchIO* aio;
@@ -40,11 +40,12 @@ class AsynchIOHandler : public OutputControl {
ConnectionCodec* codec;
bool readError;
bool isClient;
+ ProtocolAccess* access;
void write(const framing::ProtocolInitiation&);
public:
- AsynchIOHandler(std::string id, ConnectionCodec::Factory* f);
+ AsynchIOHandler(std::string id, ConnectionCodec::Factory* f, ProtocolAccess* a =0);
~AsynchIOHandler();
void init(AsynchIO* a, int numBuffs);
diff --git a/cpp/src/qpid/sys/ConnectionCodec.h b/cpp/src/qpid/sys/ConnectionCodec.h
index 205596c709..4c5a68e576 100644
--- a/cpp/src/qpid/sys/ConnectionCodec.h
+++ b/cpp/src/qpid/sys/ConnectionCodec.h
@@ -28,9 +28,8 @@
namespace qpid {
-namespace broker { class Broker; }
-
namespace sys {
+class ProtocolAccess;
/**
* Interface of coder/decoder for a connection of a specific protocol
@@ -70,7 +69,7 @@ class ConnectionCodec {
/** Return "preferred" codec for outbound connections. */
virtual ConnectionCodec* create(
- OutputControl&, const std::string& id
+ OutputControl&, const std::string& id, ProtocolAccess* a = 0
) = 0;
};
};
diff --git a/cpp/src/qpid/sys/ProtocolAccess.h b/cpp/src/qpid/sys/ProtocolAccess.h
new file mode 100644
index 0000000000..433bf0ef97
--- /dev/null
+++ b/cpp/src/qpid/sys/ProtocolAccess.h
@@ -0,0 +1,65 @@
+#ifndef _sys_ProtocolAccess_h
+#define _sys_ProtocolAccess_h
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "AsynchIO.h"
+#include "AsynchIOHandler.h"
+#include <boost/function.hpp>
+#include <boost/shared_ptr.hpp>
+
+namespace qpid {
+
+namespace broker
+{
+class Connection;
+}
+
+namespace sys {
+
+class ProtocolAccess
+{
+public:
+ typedef boost::function0<void> Callback;
+ typedef boost::function2<void, int, std::string> ClosedCallback;
+ typedef boost::function1<void, boost::shared_ptr<broker::Connection> > SetConnCallback;
+
+ ProtocolAccess (Callback ecb, ClosedCallback ccb, SetConnCallback sccb)
+ : aio(0), establishedCb(ecb), closedCb(ccb), setConnCb(sccb) {}
+ ~ProtocolAccess() {}
+ inline void close() { if (aio) aio->queueWriteClose(); }
+
+ inline void setAio(AsynchIO *_aio) { aio = _aio; establishedCb(); }
+ inline void closedEof(AsynchIOHandler* async) { async->eof(*aio); closedCb(-1, "Closed by Peer"); }
+ inline void closed(int err, std::string str) { closedCb(err, str); }
+ inline void callConnCb(boost::shared_ptr<broker::Connection> c) { setConnCb(c); }
+
+private:
+ AsynchIO* aio;
+ Callback establishedCb;
+ ClosedCallback closedCb;
+ SetConnCallback setConnCb;
+};
+
+}}
+
+#endif //!_sys_ProtocolAccess_h
diff --git a/cpp/src/qpid/sys/ProtocolFactory.h b/cpp/src/qpid/sys/ProtocolFactory.h
index 5f80771e49..e61a94b205 100644
--- a/cpp/src/qpid/sys/ProtocolFactory.h
+++ b/cpp/src/qpid/sys/ProtocolFactory.h
@@ -25,7 +25,7 @@
#include <stdint.h>
#include "qpid/SharedObject.h"
#include "ConnectionCodec.h"
-
+#include "ProtocolAccess.h"
namespace qpid {
namespace sys {
@@ -42,7 +42,8 @@ class ProtocolFactory : public qpid::SharedObject<ProtocolFactory>
virtual void connect(
boost::shared_ptr<Poller>,
const std::string& host, int16_t port,
- ConnectionCodec::Factory* codec) = 0;
+ ConnectionCodec::Factory* codec,
+ ProtocolAccess* access = 0) = 0;
};
inline ProtocolFactory::~ProtocolFactory() {}
diff --git a/cpp/src/qpid/sys/TCPIOPlugin.cpp b/cpp/src/qpid/sys/TCPIOPlugin.cpp
index 045bc56e90..5d2cadbe03 100644
--- a/cpp/src/qpid/sys/TCPIOPlugin.cpp
+++ b/cpp/src/qpid/sys/TCPIOPlugin.cpp
@@ -41,13 +41,15 @@ class AsynchIOProtocolFactory : public ProtocolFactory {
public:
AsynchIOProtocolFactory(int16_t port, int backlog);
void accept(Poller::shared_ptr, ConnectionCodec::Factory*);
- void connect(Poller::shared_ptr, const std::string& host, int16_t port, ConnectionCodec::Factory*);
+ void connect(Poller::shared_ptr, const std::string& host, int16_t port,
+ ConnectionCodec::Factory*, ProtocolAccess*);
uint16_t getPort() const;
std::string getHost() const;
private:
- void established(Poller::shared_ptr, const Socket&, ConnectionCodec::Factory*, bool isClient);
+ void established(Poller::shared_ptr, const Socket&, ConnectionCodec::Factory*,
+ bool isClient, ProtocolAccess*);
};
// Static instance to initialise plugin
@@ -72,17 +74,32 @@ AsynchIOProtocolFactory::AsynchIOProtocolFactory(int16_t port, int backlog) :
{}
void AsynchIOProtocolFactory::established(Poller::shared_ptr poller, const Socket& s,
- ConnectionCodec::Factory* f, bool isClient) {
- AsynchIOHandler* async = new AsynchIOHandler(s.getPeerAddress(), f);
+ ConnectionCodec::Factory* f, bool isClient,
+ ProtocolAccess* a) {
+ AsynchIOHandler* async = new AsynchIOHandler(s.getPeerAddress(), f, a);
+ AsynchIO* aio;
+
if (isClient)
async->setClient();
- AsynchIO* aio = new AsynchIO(s,
- boost::bind(&AsynchIOHandler::readbuff, async, _1, _2),
- boost::bind(&AsynchIOHandler::eof, async, _1),
- boost::bind(&AsynchIOHandler::disconnect, async, _1),
- boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2),
- boost::bind(&AsynchIOHandler::nobuffs, async, _1),
- boost::bind(&AsynchIOHandler::idle, async, _1));
+ if (a == 0)
+ aio = new AsynchIO(s,
+ boost::bind(&AsynchIOHandler::readbuff, async, _1, _2),
+ boost::bind(&AsynchIOHandler::eof, async, _1),
+ boost::bind(&AsynchIOHandler::disconnect, async, _1),
+ boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2),
+ boost::bind(&AsynchIOHandler::nobuffs, async, _1),
+ boost::bind(&AsynchIOHandler::idle, async, _1));
+ else {
+ aio = new AsynchIO(s,
+ boost::bind(&AsynchIOHandler::readbuff, async, _1, _2),
+ boost::bind(&ProtocolAccess::closedEof, a, async),
+ boost::bind(&AsynchIOHandler::disconnect, async, _1),
+ boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2),
+ boost::bind(&AsynchIOHandler::nobuffs, async, _1),
+ boost::bind(&AsynchIOHandler::idle, async, _1));
+ a->setAio(aio);
+ }
+
async->init(aio, 4);
aio->start(poller);
}
@@ -95,26 +112,31 @@ std::string AsynchIOProtocolFactory::getHost() const {
return listener.getSockname();
}
-void AsynchIOProtocolFactory::accept(Poller::shared_ptr poller, ConnectionCodec::Factory* fact) {
+void AsynchIOProtocolFactory::accept(Poller::shared_ptr poller,
+ ConnectionCodec::Factory* fact) {
acceptor.reset(
new AsynchAcceptor(listener,
- boost::bind(&AsynchIOProtocolFactory::established, this, poller, _1, fact, false)));
+ boost::bind(&AsynchIOProtocolFactory::established, this, poller, _1, fact, false,
+ (ProtocolAccess*) 0)));
acceptor->start(poller);
}
void AsynchIOProtocolFactory::connect(
Poller::shared_ptr poller,
const std::string& host, int16_t port,
- ConnectionCodec::Factory* f)
+ ConnectionCodec::Factory* fact,
+ ProtocolAccess* access)
{
// Note that the following logic does not cause a memory leak.
// The allocated Socket is freed either by the AsynchConnector
// upon connection failure or by the AsynchIO upon connection
// shutdown. The allocated AsynchConnector frees itself when it
// is no longer needed.
+
Socket* socket = new Socket();
- new AsynchConnector(*socket, poller, host, port,
- boost::bind(&AsynchIOProtocolFactory::established, this, poller, _1, f, true));
+ new AsynchConnector (*socket, poller, host, port,
+ boost::bind(&AsynchIOProtocolFactory::established, this, poller, _1, fact, true, access),
+ boost::bind(&ProtocolAccess::closed, access, _1, _2));
}
}} // namespace qpid::sys
diff --git a/cpp/src/qpid/sys/posix/AsynchIO.cpp b/cpp/src/qpid/sys/posix/AsynchIO.cpp
index 9dcb841992..470db4c614 100644
--- a/cpp/src/qpid/sys/posix/AsynchIO.cpp
+++ b/cpp/src/qpid/sys/posix/AsynchIO.cpp
@@ -130,7 +130,7 @@ void AsynchConnector::connComplete(DispatchHandle& h)
h.stopWatch();
if (errCode == 0) {
connCallback(socket);
- DispatchHandle::doDelete();
+ DispatchHandle::doDelete();
} else {
failure(errCode, std::string(strerror(errCode)));
}
@@ -148,6 +148,7 @@ void AsynchConnector::failure(int errCode, std::string message)
}
/*
+>>>>>>> .r654667
* Asynch reader/writer
*/
AsynchIO::AsynchIO(const Socket& s,