diff options
Diffstat (limited to 'cpp/src/qpid/client')
| -rw-r--r-- | cpp/src/qpid/client/ClientMessage.h | 10 | ||||
| -rw-r--r-- | cpp/src/qpid/client/ConnectionImpl.cpp | 34 | ||||
| -rw-r--r-- | cpp/src/qpid/client/ConnectionImpl.h | 5 | ||||
| -rw-r--r-- | cpp/src/qpid/client/ExecutionHandler.cpp | 17 | ||||
| -rw-r--r-- | cpp/src/qpid/client/ExecutionHandler.h | 2 |
5 files changed, 44 insertions, 24 deletions
diff --git a/cpp/src/qpid/client/ClientMessage.h b/cpp/src/qpid/client/ClientMessage.h index 19b0f867bc..1afe5585a9 100644 --- a/cpp/src/qpid/client/ClientMessage.h +++ b/cpp/src/qpid/client/ClientMessage.h @@ -58,8 +58,14 @@ class Message : public framing::BasicHeaderProperties, public framing::MethodCon bool isRedelivered() const { return redelivered; } void setRedelivered(bool _redelivered){ redelivered = _redelivered; } - const HeaderProperties& getMethodHeaders() const { return *this; } - + framing::AMQHeaderBody getHeader() const + { + framing::AMQHeaderBody header; + BasicHeaderProperties* properties = header.get<BasicHeaderProperties>(true); + BasicHeaderProperties::copy<BasicHeaderProperties, Message>(*properties, *this); + properties->setContentLength(data.size()); + return header; + } //TODO: move this elsewhere (GRS 24/08/2007) void populate(framing::FrameSet& frameset) diff --git a/cpp/src/qpid/client/ConnectionImpl.cpp b/cpp/src/qpid/client/ConnectionImpl.cpp index 5ff34cde4e..d21d550ee2 100644 --- a/cpp/src/qpid/client/ConnectionImpl.cpp +++ b/cpp/src/qpid/client/ConnectionImpl.cpp @@ -24,6 +24,7 @@ using namespace qpid::client; using namespace qpid::framing; +using namespace qpid::sys; ConnectionImpl::ConnectionImpl(boost::shared_ptr<Connector> c) : connector(c) { @@ -38,6 +39,7 @@ ConnectionImpl::ConnectionImpl(boost::shared_ptr<Connector> c) : connector(c) void ConnectionImpl::allocated(SessionCore::shared_ptr session) { + Mutex::ScopedLock l(lock); if (sessions.find(session->getId()) != sessions.end()) { throw Exception("Id already in use."); } @@ -46,6 +48,7 @@ void ConnectionImpl::allocated(SessionCore::shared_ptr session) void ConnectionImpl::released(SessionCore::shared_ptr session) { + Mutex::ScopedLock l(lock); SessionMap::iterator i = sessions.find(session->getId()); if (i != sessions.end()) { sessions.erase(i); @@ -59,12 +62,7 @@ void ConnectionImpl::handle(framing::AMQFrame& frame) void ConnectionImpl::incoming(framing::AMQFrame& frame) { - uint16_t id = frame.getChannel(); - SessionMap::iterator i = sessions.find(id); - if (i == sessions.end()) { - throw ConnectionException(504, (boost::format("Invalid channel number %g") % id).str()); - } - i->second->handle(frame); + find(frame.getChannel())->handle(frame); } void ConnectionImpl::open(const std::string& host, int port, @@ -93,10 +91,7 @@ void ConnectionImpl::closed() void ConnectionImpl::closedByPeer(uint16_t code, const std::string& text) { - for (SessionMap::iterator i = sessions.begin(); i != sessions.end(); i++) { - i->second->closed(code, text); - } - sessions.clear(); + signalClose(code, text); connector->close(); } @@ -114,8 +109,25 @@ void ConnectionImpl::idleOut() void ConnectionImpl::shutdown() { //this indicates that the socket to the server has closed + signalClose(0, "Unexpected socket closure."); +} + +void ConnectionImpl::signalClose(uint16_t code, const std::string& text) +{ + Mutex::ScopedLock l(lock); for (SessionMap::iterator i = sessions.begin(); i != sessions.end(); i++) { - i->second->closed(0, "Unexpected socket closure."); + Mutex::ScopedUnlock u(lock); + i->second->closed(code, text); } sessions.clear(); } + +SessionCore::shared_ptr ConnectionImpl::find(uint16_t id) +{ + Mutex::ScopedLock l(lock); + SessionMap::iterator i = sessions.find(id); + if (i == sessions.end()) { + throw ConnectionException(504, (boost::format("Invalid channel number %g") % id).str()); + } + return i->second; +} diff --git a/cpp/src/qpid/client/ConnectionImpl.h b/cpp/src/qpid/client/ConnectionImpl.h index e37713e77d..a2ee14ea6e 100644 --- a/cpp/src/qpid/client/ConnectionImpl.h +++ b/cpp/src/qpid/client/ConnectionImpl.h @@ -25,6 +25,7 @@ #include <map> #include <boost/shared_ptr.hpp> #include "qpid/framing/FrameHandler.h" +#include "qpid/sys/Mutex.h" #include "qpid/sys/ShutdownHandler.h" #include "qpid/sys/TimeoutHandler.h" #include "ConnectionHandler.h" @@ -44,6 +45,7 @@ class ConnectionImpl : public framing::FrameHandler, ConnectionHandler handler; boost::shared_ptr<Connector> connector; framing::ProtocolVersion version; + sys::Mutex lock; void incoming(framing::AMQFrame& frame); void closed(); @@ -51,6 +53,9 @@ class ConnectionImpl : public framing::FrameHandler, void idleOut(); void idleIn(); void shutdown(); + void signalClose(uint16_t, const std::string&); + SessionCore::shared_ptr find(uint16_t); + public: typedef boost::shared_ptr<ConnectionImpl> shared_ptr; diff --git a/cpp/src/qpid/client/ExecutionHandler.cpp b/cpp/src/qpid/client/ExecutionHandler.cpp index 1520ba2272..c2b5e45928 100644 --- a/cpp/src/qpid/client/ExecutionHandler.cpp +++ b/cpp/src/qpid/client/ExecutionHandler.cpp @@ -181,31 +181,28 @@ SequenceNumber ExecutionHandler::send(const AMQBody& command, const MethodConten CompletionTracker::ResultListener l) { SequenceNumber id = send(command, l); - sendContent(dynamic_cast<const BasicHeaderProperties&>(content.getMethodHeaders()), content.getData()); + sendContent(content); return id; } -void ExecutionHandler::sendContent(const BasicHeaderProperties& headers, const std::string& data) +void ExecutionHandler::sendContent(const MethodContent& content) { - AMQHeaderBody header; - BasicHeaderProperties::copy(*header.get<BasicHeaderProperties>(true), headers); - header.get<BasicHeaderProperties>(true)->setContentLength(data.size()); - AMQFrame h(0, header); - out(h); + AMQFrame header(0, content.getHeader()); + out(header); - u_int64_t data_length = data.length(); + u_int64_t data_length = content.getData().length(); if(data_length > 0){ //frame itself uses 8 bytes u_int32_t frag_size = maxFrameSize - 8; if(data_length < frag_size){ - AMQFrame frame(0, AMQContentBody(data)); + AMQFrame frame(0, AMQContentBody(content.getData())); out(frame); }else{ u_int32_t offset = 0; u_int32_t remaining = data_length - offset; while (remaining > 0) { u_int32_t length = remaining > frag_size ? frag_size : remaining; - string frag(data.substr(offset, length)); + string frag(content.getData().substr(offset, length)); AMQFrame frame(0, AMQContentBody(frag)); out(frame); offset += length; diff --git a/cpp/src/qpid/client/ExecutionHandler.h b/cpp/src/qpid/client/ExecutionHandler.h index a42697e26a..3078f6bc3a 100644 --- a/cpp/src/qpid/client/ExecutionHandler.h +++ b/cpp/src/qpid/client/ExecutionHandler.h @@ -59,7 +59,7 @@ class ExecutionHandler : void sendCompletion(); - void sendContent(const framing::BasicHeaderProperties& headers, const std::string& data); + void sendContent(const framing::MethodContent&); public: typedef CompletionTracker::ResultListener ResultListener; |
