summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/sys
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/sys')
-rw-r--r--cpp/src/qpid/sys/AsynchIOHandler.cpp7
-rw-r--r--cpp/src/qpid/sys/AsynchIOHandler.h4
-rw-r--r--cpp/src/qpid/sys/ConnectionCodec.h3
-rw-r--r--cpp/src/qpid/sys/ProtocolAccess.h65
-rw-r--r--cpp/src/qpid/sys/ProtocolFactory.h4
-rw-r--r--cpp/src/qpid/sys/Socket.h1
-rw-r--r--cpp/src/qpid/sys/TCPIOPlugin.cpp45
-rw-r--r--cpp/src/qpid/sys/posix/Socket.cpp7
8 files changed, 31 insertions, 105 deletions
diff --git a/cpp/src/qpid/sys/AsynchIOHandler.cpp b/cpp/src/qpid/sys/AsynchIOHandler.cpp
index 31974993bb..ca2bd7c93c 100644
--- a/cpp/src/qpid/sys/AsynchIOHandler.cpp
+++ b/cpp/src/qpid/sys/AsynchIOHandler.cpp
@@ -36,14 +36,13 @@ struct Buff : public AsynchIO::BufferBase {
{ delete [] bytes;}
};
-AsynchIOHandler::AsynchIOHandler(std::string id, ConnectionCodec::Factory* f, ProtocolAccess* a) :
+AsynchIOHandler::AsynchIOHandler(std::string id, ConnectionCodec::Factory* f) :
identifier(id),
aio(0),
factory(f),
codec(0),
readError(false),
- isClient(false),
- access(a)
+ isClient(false)
{}
AsynchIOHandler::~AsynchIOHandler() {
@@ -153,7 +152,7 @@ void AsynchIOHandler::nobuffs(AsynchIO&) {
void AsynchIOHandler::idle(AsynchIO&){
if (isClient && codec == 0) {
- codec = factory->create(*this, identifier, access);
+ codec = factory->create(*this, identifier);
write(framing::ProtocolInitiation(codec->getVersion()));
return;
}
diff --git a/cpp/src/qpid/sys/AsynchIOHandler.h b/cpp/src/qpid/sys/AsynchIOHandler.h
index ece52f57c4..7448094a94 100644
--- a/cpp/src/qpid/sys/AsynchIOHandler.h
+++ b/cpp/src/qpid/sys/AsynchIOHandler.h
@@ -32,7 +32,6 @@ namespace framing {
}
namespace sys {
-class ProtocolAccess;
class AsynchIOHandler : public OutputControl {
std::string identifier;
AsynchIO* aio;
@@ -40,12 +39,11 @@ class AsynchIOHandler : public OutputControl {
ConnectionCodec* codec;
bool readError;
bool isClient;
- ProtocolAccess* access;
void write(const framing::ProtocolInitiation&);
public:
- AsynchIOHandler(std::string id, ConnectionCodec::Factory* f, ProtocolAccess* a =0);
+ AsynchIOHandler(std::string id, ConnectionCodec::Factory* f);
~AsynchIOHandler();
void init(AsynchIO* a, int numBuffs);
diff --git a/cpp/src/qpid/sys/ConnectionCodec.h b/cpp/src/qpid/sys/ConnectionCodec.h
index 4c5a68e576..efc6839b60 100644
--- a/cpp/src/qpid/sys/ConnectionCodec.h
+++ b/cpp/src/qpid/sys/ConnectionCodec.h
@@ -29,7 +29,6 @@
namespace qpid {
namespace sys {
-class ProtocolAccess;
/**
* Interface of coder/decoder for a connection of a specific protocol
@@ -69,7 +68,7 @@ class ConnectionCodec {
/** Return "preferred" codec for outbound connections. */
virtual ConnectionCodec* create(
- OutputControl&, const std::string& id, ProtocolAccess* a = 0
+ OutputControl&, const std::string& id
) = 0;
};
};
diff --git a/cpp/src/qpid/sys/ProtocolAccess.h b/cpp/src/qpid/sys/ProtocolAccess.h
deleted file mode 100644
index 433bf0ef97..0000000000
--- a/cpp/src/qpid/sys/ProtocolAccess.h
+++ /dev/null
@@ -1,65 +0,0 @@
-#ifndef _sys_ProtocolAccess_h
-#define _sys_ProtocolAccess_h
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "AsynchIO.h"
-#include "AsynchIOHandler.h"
-#include <boost/function.hpp>
-#include <boost/shared_ptr.hpp>
-
-namespace qpid {
-
-namespace broker
-{
-class Connection;
-}
-
-namespace sys {
-
-class ProtocolAccess
-{
-public:
- typedef boost::function0<void> Callback;
- typedef boost::function2<void, int, std::string> ClosedCallback;
- typedef boost::function1<void, boost::shared_ptr<broker::Connection> > SetConnCallback;
-
- ProtocolAccess (Callback ecb, ClosedCallback ccb, SetConnCallback sccb)
- : aio(0), establishedCb(ecb), closedCb(ccb), setConnCb(sccb) {}
- ~ProtocolAccess() {}
- inline void close() { if (aio) aio->queueWriteClose(); }
-
- inline void setAio(AsynchIO *_aio) { aio = _aio; establishedCb(); }
- inline void closedEof(AsynchIOHandler* async) { async->eof(*aio); closedCb(-1, "Closed by Peer"); }
- inline void closed(int err, std::string str) { closedCb(err, str); }
- inline void callConnCb(boost::shared_ptr<broker::Connection> c) { setConnCb(c); }
-
-private:
- AsynchIO* aio;
- Callback establishedCb;
- ClosedCallback closedCb;
- SetConnCallback setConnCb;
-};
-
-}}
-
-#endif //!_sys_ProtocolAccess_h
diff --git a/cpp/src/qpid/sys/ProtocolFactory.h b/cpp/src/qpid/sys/ProtocolFactory.h
index e61a94b205..e8eaefe1f6 100644
--- a/cpp/src/qpid/sys/ProtocolFactory.h
+++ b/cpp/src/qpid/sys/ProtocolFactory.h
@@ -25,7 +25,7 @@
#include <stdint.h>
#include "qpid/SharedObject.h"
#include "ConnectionCodec.h"
-#include "ProtocolAccess.h"
+#include <boost/function.hpp>
namespace qpid {
namespace sys {
@@ -43,7 +43,7 @@ class ProtocolFactory : public qpid::SharedObject<ProtocolFactory>
boost::shared_ptr<Poller>,
const std::string& host, int16_t port,
ConnectionCodec::Factory* codec,
- ProtocolAccess* access = 0) = 0;
+ boost::function2<void, int, std::string> failed) = 0;
};
inline ProtocolFactory::~ProtocolFactory() {}
diff --git a/cpp/src/qpid/sys/Socket.h b/cpp/src/qpid/sys/Socket.h
index 806d6b5164..f95d841b39 100644
--- a/cpp/src/qpid/sys/Socket.h
+++ b/cpp/src/qpid/sys/Socket.h
@@ -118,6 +118,7 @@ public:
private:
Socket(IOHandlePrivate*);
+ mutable std::string connectname;
};
}}
diff --git a/cpp/src/qpid/sys/TCPIOPlugin.cpp b/cpp/src/qpid/sys/TCPIOPlugin.cpp
index 5d2cadbe03..e82a6a9102 100644
--- a/cpp/src/qpid/sys/TCPIOPlugin.cpp
+++ b/cpp/src/qpid/sys/TCPIOPlugin.cpp
@@ -42,14 +42,15 @@ class AsynchIOProtocolFactory : public ProtocolFactory {
AsynchIOProtocolFactory(int16_t port, int backlog);
void accept(Poller::shared_ptr, ConnectionCodec::Factory*);
void connect(Poller::shared_ptr, const std::string& host, int16_t port,
- ConnectionCodec::Factory*, ProtocolAccess*);
+ ConnectionCodec::Factory*,
+ boost::function2<void, int, std::string> failed);
uint16_t getPort() const;
std::string getHost() const;
private:
void established(Poller::shared_ptr, const Socket&, ConnectionCodec::Factory*,
- bool isClient, ProtocolAccess*);
+ bool isClient);
};
// Static instance to initialise plugin
@@ -74,31 +75,18 @@ AsynchIOProtocolFactory::AsynchIOProtocolFactory(int16_t port, int backlog) :
{}
void AsynchIOProtocolFactory::established(Poller::shared_ptr poller, const Socket& s,
- ConnectionCodec::Factory* f, bool isClient,
- ProtocolAccess* a) {
- AsynchIOHandler* async = new AsynchIOHandler(s.getPeerAddress(), f, a);
- AsynchIO* aio;
+ ConnectionCodec::Factory* f, bool isClient) {
+ AsynchIOHandler* async = new AsynchIOHandler(s.getPeerAddress(), f);
if (isClient)
async->setClient();
- if (a == 0)
- aio = new AsynchIO(s,
- boost::bind(&AsynchIOHandler::readbuff, async, _1, _2),
- boost::bind(&AsynchIOHandler::eof, async, _1),
- boost::bind(&AsynchIOHandler::disconnect, async, _1),
- boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2),
- boost::bind(&AsynchIOHandler::nobuffs, async, _1),
- boost::bind(&AsynchIOHandler::idle, async, _1));
- else {
- aio = new AsynchIO(s,
- boost::bind(&AsynchIOHandler::readbuff, async, _1, _2),
- boost::bind(&ProtocolAccess::closedEof, a, async),
- boost::bind(&AsynchIOHandler::disconnect, async, _1),
- boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2),
- boost::bind(&AsynchIOHandler::nobuffs, async, _1),
- boost::bind(&AsynchIOHandler::idle, async, _1));
- a->setAio(aio);
- }
+ AsynchIO* aio = new AsynchIO(s,
+ boost::bind(&AsynchIOHandler::readbuff, async, _1, _2),
+ boost::bind(&AsynchIOHandler::eof, async, _1),
+ boost::bind(&AsynchIOHandler::disconnect, async, _1),
+ boost::bind(&AsynchIOHandler::closedSocket, async, _1, _2),
+ boost::bind(&AsynchIOHandler::nobuffs, async, _1),
+ boost::bind(&AsynchIOHandler::idle, async, _1));
async->init(aio, 4);
aio->start(poller);
@@ -116,8 +104,7 @@ void AsynchIOProtocolFactory::accept(Poller::shared_ptr poller,
ConnectionCodec::Factory* fact) {
acceptor.reset(
new AsynchAcceptor(listener,
- boost::bind(&AsynchIOProtocolFactory::established, this, poller, _1, fact, false,
- (ProtocolAccess*) 0)));
+ boost::bind(&AsynchIOProtocolFactory::established, this, poller, _1, fact, false)));
acceptor->start(poller);
}
@@ -125,7 +112,7 @@ void AsynchIOProtocolFactory::connect(
Poller::shared_ptr poller,
const std::string& host, int16_t port,
ConnectionCodec::Factory* fact,
- ProtocolAccess* access)
+ boost::function2<void, int, std::string> failed)
{
// Note that the following logic does not cause a memory leak.
// The allocated Socket is freed either by the AsynchConnector
@@ -135,8 +122,8 @@ void AsynchIOProtocolFactory::connect(
Socket* socket = new Socket();
new AsynchConnector (*socket, poller, host, port,
- boost::bind(&AsynchIOProtocolFactory::established, this, poller, _1, fact, true, access),
- boost::bind(&ProtocolAccess::closed, access, _1, _2));
+ boost::bind(&AsynchIOProtocolFactory::established, this, poller, _1, fact, true),
+ failed);
}
}} // namespace qpid::sys
diff --git a/cpp/src/qpid/sys/posix/Socket.cpp b/cpp/src/qpid/sys/posix/Socket.cpp
index 67f6b6db4c..f4320531a9 100644
--- a/cpp/src/qpid/sys/posix/Socket.cpp
+++ b/cpp/src/qpid/sys/posix/Socket.cpp
@@ -32,6 +32,7 @@
#include <netdb.h>
#include <cstdlib>
#include <string.h>
+#include <iostream>
#include <boost/format.hpp>
@@ -138,6 +139,10 @@ const char* h_errstr(int e) {
void Socket::connect(const std::string& host, int port) const
{
+ std::stringstream namestream;
+ namestream << host << ":" << port;
+ connectname = namestream.str();
+
const int& socket = impl->fd;
struct sockaddr_in name;
name.sin_family = AF_INET;
@@ -240,6 +245,8 @@ std::string Socket::getPeername() const
std::string Socket::getPeerAddress() const
{
+ if (!connectname.empty())
+ return std::string (connectname);
return getName(impl->fd, false, true);
}