summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/client
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2008-01-24 22:26:12 +0000
committerAlan Conway <aconway@apache.org>2008-01-24 22:26:12 +0000
commitf0a31beb7a609591e7b34e60ddfd85e9e183fbc0 (patch)
tree5582c3f04ee1b417d11050b0c994da657db09b39 /cpp/src/qpid/client
parentf2ab2fa9fcb713eedf21e98a2a3f9fab8e76dead (diff)
downloadqpid-python-f0a31beb7a609591e7b34e60ddfd85e9e183fbc0.tar.gz
Improved/additional client API tests.
- Replaced InProcessBroker with a more accurate loopback BrokerFixture. - Added asserts for mutex/condition/thread errors in debug build. - Added client tests for several exception conditions. - Added peer address to log ouput, client/server distinguished by (addr) or [addr] - Fixed various deadlocks & races exposed by the new asserts & tests. File-by-file: New BrokerFixture replaces InProcessBroker D src/tests/InProcessBroker.h M src/tests/BrokerFixture.h M src/tests/SocketProxy.h M src/tests/Makefile.am Made it run a bit faster. M src/tests/quick_perftest Redundant D src/tests/APRBaseTest.cpp Updated tests to use BrokerFixture M src/tests/ClientChannelTest.cpp M src/tests/exception_test.cpp M src/tests/ClientSessionTest.cpp Print thread IDs in decimal, same as GDB. M src/qpid/log/Logger.cpp Assert mutex/condition ops in debug build. M src/qpid/sys/posix/check.h M src/qpid/sys/posix/Mutex.h M src/qpid/sys/posix/Condition.h M src/qpid/sys/posix/Thread.h Added toFd() so SocketProxy can use ::select() M src/qpid/sys/Socket.h M src/qpid/sys/posix/Socket.cpp Fixes for races & deadlocks shown up by new tests & asserts. Mostly shutdown/close issues. M src/qpid/client/ConnectionHandler.h M src/qpid/client/ConnectionImpl.cpp M src/qpid/client/Demux.h M src/qpid/client/SessionCore.cpp M src/qpid/client/ConnectionHandler.cpp M src/qpid/client/Connector.h M src/qpid/client/Demux.cpp M src/qpid/client/Dispatcher.cpp M src/qpid/client/ConnectionImpl.h Logging peer address. M src/qpid/sys/AsynchIOAcceptor.cpp git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@615063 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/client')
-rw-r--r--cpp/src/qpid/client/ConnectionHandler.cpp2
-rw-r--r--cpp/src/qpid/client/ConnectionHandler.h2
-rw-r--r--cpp/src/qpid/client/ConnectionImpl.cpp84
-rw-r--r--cpp/src/qpid/client/ConnectionImpl.h7
-rw-r--r--cpp/src/qpid/client/Connector.cpp22
-rw-r--r--cpp/src/qpid/client/Connector.h6
-rw-r--r--cpp/src/qpid/client/Demux.cpp4
-rw-r--r--cpp/src/qpid/client/Demux.h3
-rw-r--r--cpp/src/qpid/client/Dispatcher.cpp3
-rw-r--r--cpp/src/qpid/client/SessionCore.cpp13
10 files changed, 88 insertions, 58 deletions
diff --git a/cpp/src/qpid/client/ConnectionHandler.cpp b/cpp/src/qpid/client/ConnectionHandler.cpp
index 84b0768c27..e1c50c14fc 100644
--- a/cpp/src/qpid/client/ConnectionHandler.cpp
+++ b/cpp/src/qpid/client/ConnectionHandler.cpp
@@ -142,7 +142,7 @@ void ConnectionHandler::error(uint16_t code, const std::string& message, AMQBody
void ConnectionHandler::fail(const std::string& message)
{
- QPID_LOG(error, message);
+ QPID_LOG(warning, message);
setState(FAILED);
}
diff --git a/cpp/src/qpid/client/ConnectionHandler.h b/cpp/src/qpid/client/ConnectionHandler.h
index e409f0f2a9..bb50495c06 100644
--- a/cpp/src/qpid/client/ConnectionHandler.h
+++ b/cpp/src/qpid/client/ConnectionHandler.h
@@ -59,7 +59,6 @@ class ConnectionHandler : private StateManager,
void send(const framing::AMQBody& body);
void error(uint16_t code, const std::string& message, uint16_t classId = 0, uint16_t methodId = 0);
void error(uint16_t code, const std::string& message, framing::AMQBody* body);
- void fail(const std::string& message);
public:
using InputHandler::handle;
@@ -75,6 +74,7 @@ public:
void waitForOpen();
void close();
+ void fail(const std::string& message);
CloseListener onClose;
ErrorListener onError;
diff --git a/cpp/src/qpid/client/ConnectionImpl.cpp b/cpp/src/qpid/client/ConnectionImpl.cpp
index dd986deec4..b248de8744 100644
--- a/cpp/src/qpid/client/ConnectionImpl.cpp
+++ b/cpp/src/qpid/client/ConnectionImpl.cpp
@@ -18,6 +18,7 @@
* under the License.
*
*/
+#include "qpid/log/Statement.h"
#include "qpid/framing/constants.h"
#include "qpid/framing/reply_exceptions.h"
@@ -44,14 +45,18 @@ ConnectionImpl::ConnectionImpl(boost::shared_ptr<Connector> c)
connector->setShutdownHandler(this);
}
-ConnectionImpl::~ConnectionImpl() { close(); }
+ConnectionImpl::~ConnectionImpl() {
+ // Important to close the connector first, to ensure the
+ // connector thread does not call on us while the destructor
+ // is running.
+ connector->close();
+}
void ConnectionImpl::addSession(const boost::shared_ptr<SessionCore>& session)
{
Mutex::ScopedLock l(lock);
boost::weak_ptr<SessionCore>& s = sessions[session->getChannel()];
- if (s.lock())
- throw ChannelBusyException();
+ if (s.lock()) throw ChannelBusyException();
s = session;
}
@@ -81,31 +86,15 @@ void ConnectionImpl::open(const std::string& host, int port,
handler.pwd = pwd;
handler.vhost = vhost;
+ QPID_LOG(info, "Connecting to " << host << ":" << port);
connector->connect(host, port);
connector->init();
handler.waitForOpen();
}
-bool ConnectionImpl::setClosing()
-{
- Mutex::ScopedLock l(lock);
- if (isClosing || isClosed) {
- return false;
- }
- isClosing = true;
- return true;
-}
-
-void ConnectionImpl::close()
-{
- if (setClosing()) {
- handler.close();
- }
-}
-
void ConnectionImpl::idleIn()
{
- connector->close();
+ close();
}
void ConnectionImpl::idleOut()
@@ -114,35 +103,52 @@ void ConnectionImpl::idleOut()
connector->send(frame);
}
-template <class F>
-void ConnectionImpl::forChannels(F functor) {
- for (SessionMap::iterator i = sessions.begin();
- i != sessions.end(); ++i) {
- try {
- boost::shared_ptr<SessionCore> s = i->second.lock();
- if (s) functor(*s);
- } catch (...) { assert(0); }
+void ConnectionImpl::close()
+{
+ Mutex::ScopedLock l(lock);
+ if (isClosing || isClosed) return;
+ isClosing = true;
+ {
+ Mutex::ScopedUnlock u(lock);
+ handler.close();
+ }
+ closed(REPLY_SUCCESS, "Closed by client");
+}
+
+// Set closed flags and erase the sessions map, but keep the contents
+// so sessions can be updated outside the lock.
+ConnectionImpl::SessionVector ConnectionImpl::closeInternal(const Mutex::ScopedLock&) {
+ isClosed = true;
+ connector->close();
+ SessionVector save;
+ for (SessionMap::iterator i = sessions.begin(); i != sessions.end(); ++i) {
+ boost::shared_ptr<SessionCore> s = i->second.lock();
+ if (s) save.push_back(s);
}
+ sessions.clear();
+ return save;
}
-void ConnectionImpl::shutdown()
+void ConnectionImpl::closed(uint16_t code, const std::string& text)
{
Mutex::ScopedLock l(lock);
if (isClosed) return;
- forChannels(boost::bind(&SessionCore::connectionBroke, _1,
- INTERNAL_ERROR, "Unexpected socket closure."));
- sessions.clear();
- isClosed = true;
+ SessionVector save(closeInternal(l));
+ Mutex::ScopedUnlock u(lock);
+ std::for_each(save.begin(), save.end(), boost::bind(&SessionCore::connectionClosed, _1, code, text));
}
-void ConnectionImpl::closed(uint16_t code, const std::string& text)
+static const std::string CONN_CLOSED("Connection closed by broker");
+
+void ConnectionImpl::shutdown()
{
Mutex::ScopedLock l(lock);
if (isClosed) return;
- forChannels(boost::bind(&SessionCore::connectionClosed, _1, code, text));
- sessions.clear();
- isClosed = true;
- connector->close();
+ SessionVector save(closeInternal(l));
+ handler.fail(CONN_CLOSED);
+ Mutex::ScopedUnlock u(lock);
+ std::for_each(save.begin(), save.end(),
+ boost::bind(&SessionCore::connectionBroke, _1, INTERNAL_ERROR, CONN_CLOSED));
}
void ConnectionImpl::erase(uint16_t ch) {
diff --git a/cpp/src/qpid/client/ConnectionImpl.h b/cpp/src/qpid/client/ConnectionImpl.h
index 1fe8ac4653..bf8226a776 100644
--- a/cpp/src/qpid/client/ConnectionImpl.h
+++ b/cpp/src/qpid/client/ConnectionImpl.h
@@ -43,6 +43,8 @@ class ConnectionImpl : public framing::FrameHandler,
{
typedef std::map<uint16_t, boost::weak_ptr<SessionCore> > SessionMap;
+ typedef std::vector<boost::shared_ptr<SessionCore> > SessionVector;
+
SessionMap sessions;
ConnectionHandler handler;
boost::shared_ptr<Connector> connector;
@@ -51,6 +53,9 @@ class ConnectionImpl : public framing::FrameHandler,
bool isClosed;
bool isClosing;
+ template <class F> void detachAll(const F&);
+
+ SessionVector closeInternal(const sys::Mutex::ScopedLock&);
void incoming(framing::AMQFrame& frame);
void closed(uint16_t, const std::string&);
void idleOut();
@@ -58,8 +63,6 @@ class ConnectionImpl : public framing::FrameHandler,
void shutdown();
bool setClosing();
- template <class F> void forChannels(F functor);
-
public:
typedef boost::shared_ptr<ConnectionImpl> shared_ptr;
diff --git a/cpp/src/qpid/client/Connector.cpp b/cpp/src/qpid/client/Connector.cpp
index 95314dcb40..4fb5aa6b4d 100644
--- a/cpp/src/qpid/client/Connector.cpp
+++ b/cpp/src/qpid/client/Connector.cpp
@@ -27,7 +27,7 @@
#include "qpid/sys/AsynchIO.h"
#include "qpid/sys/Dispatcher.h"
#include "qpid/sys/Poller.h"
-
+#include "qpid/Msg.h"
#include <boost/bind.hpp>
namespace qpid {
@@ -43,6 +43,7 @@ Connector::Connector(
send_buffer_size(buffer_size),
version(ver),
closed(true),
+ joined(true),
timeout(0),
idleIn(0), idleOut(0),
timeoutHandler(0),
@@ -52,11 +53,11 @@ Connector::Connector(
Connector::~Connector() {
close();
- if (receiver.id() && receiver.id() != Thread::current().id())
- receiver.join();
}
void Connector::connect(const std::string& host, int port){
+ Mutex::ScopedLock l(closedLock);
+ assert(closed);
socket.connect(host, port);
closed = false;
poller = Poller::shared_ptr(new Poller);
@@ -71,20 +72,27 @@ void Connector::connect(const std::string& host, int port){
}
void Connector::init(){
+ Mutex::ScopedLock l(closedLock);
+ assert(joined);
ProtocolInitiation init(version);
-
writeDataBlock(init);
+ joined = false;
receiver = Thread(this);
}
bool Connector::closeInternal() {
Mutex::ScopedLock l(closedLock);
+ bool ret = !closed;
if (!closed) {
- poller->shutdown();
closed = true;
- return true;
+ poller->shutdown();
+ }
+ if (!joined && receiver.id() != Thread::current().id()) {
+ joined = true;
+ Mutex::ScopedUnlock u(closedLock);
+ receiver.join();
}
- return false;
+ return ret;
}
void Connector::close() {
diff --git a/cpp/src/qpid/client/Connector.h b/cpp/src/qpid/client/Connector.h
index aefd91f6f4..121a1c33aa 100644
--- a/cpp/src/qpid/client/Connector.h
+++ b/cpp/src/qpid/client/Connector.h
@@ -77,8 +77,9 @@ class Connector : public framing::OutputHandler,
const int send_buffer_size;
framing::ProtocolVersion version;
- bool closed;
sys::Mutex closedLock;
+ bool closed;
+ bool joined;
sys::AbsTime lastIn;
sys::AbsTime lastOut;
@@ -112,6 +113,8 @@ class Connector : public framing::OutputHandler,
void writebuff(qpid::sys::AsynchIO&);
void writeDataBlock(const framing::AMQDataBlock& data);
void eof(qpid::sys::AsynchIO&);
+
+ std::string identifier;
friend class Channel;
@@ -130,6 +133,7 @@ class Connector : public framing::OutputHandler,
virtual void send(framing::AMQFrame& frame);
virtual void setReadTimeout(uint16_t timeout);
virtual void setWriteTimeout(uint16_t timeout);
+ const std::string& getIdentifier() const { return identifier; }
};
}}
diff --git a/cpp/src/qpid/client/Demux.cpp b/cpp/src/qpid/client/Demux.cpp
index e61103981b..cb9372cee7 100644
--- a/cpp/src/qpid/client/Demux.cpp
+++ b/cpp/src/qpid/client/Demux.cpp
@@ -45,6 +45,10 @@ ScopedDivert::~ScopedDivert()
demuxer.remove(dest);
}
+Demux::Demux() : defaultQueue(new Queue()) {}
+
+Demux::~Demux() { close(); }
+
Demux::QueuePtr ScopedDivert::getQueue()
{
return queue;
diff --git a/cpp/src/qpid/client/Demux.h b/cpp/src/qpid/client/Demux.h
index 234282a8d2..dce24223f2 100644
--- a/cpp/src/qpid/client/Demux.h
+++ b/cpp/src/qpid/client/Demux.h
@@ -47,7 +47,8 @@ public:
typedef sys::BlockingQueue<framing::FrameSet::shared_ptr> Queue;
typedef boost::shared_ptr<Queue> QueuePtr;
- Demux() : defaultQueue(new Queue()) {}
+ Demux();
+ ~Demux();
void handle(framing::FrameSet::shared_ptr);
void close();
diff --git a/cpp/src/qpid/client/Dispatcher.cpp b/cpp/src/qpid/client/Dispatcher.cpp
index f4a7ff54d8..0783d5bc55 100644
--- a/cpp/src/qpid/client/Dispatcher.cpp
+++ b/cpp/src/qpid/client/Dispatcher.cpp
@@ -62,13 +62,12 @@ void Dispatcher::start()
}
void Dispatcher::run()
-{
+{
Mutex::ScopedLock l(lock);
if (running)
throw Exception("Dispatcher is already running.");
boost::state_saver<bool> reset(running); // Reset to false on exit.
running = true;
- queue->open();
try {
while (!queue->isClosed()) {
Mutex::ScopedUnlock u(lock);
diff --git a/cpp/src/qpid/client/SessionCore.cpp b/cpp/src/qpid/client/SessionCore.cpp
index 07a791bef3..5079c47b5e 100644
--- a/cpp/src/qpid/client/SessionCore.cpp
+++ b/cpp/src/qpid/client/SessionCore.cpp
@@ -125,7 +125,7 @@ void SessionCore::detach(int c, const std::string& t) {
channel.next = 0;
code=c;
text=t;
- l3.getDemux().close();
+ l3.getDemux().close();
}
void SessionCore::doClose(int code, const std::string& text) {
@@ -270,7 +270,6 @@ void SessionCore::detached() { // network thread
Lock l(state);
check(state == SUSPENDING,
COMMAND_INVALID, UNEXPECTED_SESSION_DETACHED);
- connection->erase(channel);
doSuspend(REPLY_SUCCESS, OK);
}
@@ -379,22 +378,28 @@ bool isCloseResponse(const AMQFrame& frame) {
// Network thread.
void SessionCore::handleIn(AMQFrame& frame) {
+ ConnectionImpl::shared_ptr save;
{
Lock l(state);
+ save=connection;
// Ignore frames received while closing other than closed response.
if (state==CLOSING && !isCloseResponse(frame))
return;
}
try {
// Cast to expose private SessionHandler functions.
- if (!invoke(static_cast<SessionHandler&>(*this), *frame.getBody())) {
+ if (invoke(static_cast<SessionHandler&>(*this), *frame.getBody())) {
+ // If we were detached by a session command, tell the connection.
+ if (!connection) save->erase(channel);
+ }
+ else {
session->received(frame);
l3.handle(frame);
}
} catch (const ChannelException& e) {
QPID_LOG(error, "Channel exception:" << e.what());
doClose(e.code, e.what());
- }
+ }
}
void SessionCore::handleOut(AMQFrame& frame)