summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/client
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2007-09-10 08:41:05 +0000
committerGordon Sim <gsim@apache.org>2007-09-10 08:41:05 +0000
commita5c0fde5d0b96ae0b747f0cea21414753d6ee654 (patch)
tree4a809a880691db3e04fa3c7374db500b767ca85b /cpp/src/qpid/client
parent783b718d0b270121cd2e597424d0c81adea77a38 (diff)
downloadqpid-python-a5c0fde5d0b96ae0b747f0cea21414753d6ee654.tar.gz
Client side support for message and delivery properties in header segments.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@574176 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/client')
-rw-r--r--cpp/src/qpid/client/ClientMessage.h10
-rw-r--r--cpp/src/qpid/client/ConnectionImpl.cpp34
-rw-r--r--cpp/src/qpid/client/ConnectionImpl.h5
-rw-r--r--cpp/src/qpid/client/ExecutionHandler.cpp17
-rw-r--r--cpp/src/qpid/client/ExecutionHandler.h2
5 files changed, 44 insertions, 24 deletions
diff --git a/cpp/src/qpid/client/ClientMessage.h b/cpp/src/qpid/client/ClientMessage.h
index 19b0f867bc..1afe5585a9 100644
--- a/cpp/src/qpid/client/ClientMessage.h
+++ b/cpp/src/qpid/client/ClientMessage.h
@@ -58,8 +58,14 @@ class Message : public framing::BasicHeaderProperties, public framing::MethodCon
bool isRedelivered() const { return redelivered; }
void setRedelivered(bool _redelivered){ redelivered = _redelivered; }
- const HeaderProperties& getMethodHeaders() const { return *this; }
-
+ framing::AMQHeaderBody getHeader() const
+ {
+ framing::AMQHeaderBody header;
+ BasicHeaderProperties* properties = header.get<BasicHeaderProperties>(true);
+ BasicHeaderProperties::copy<BasicHeaderProperties, Message>(*properties, *this);
+ properties->setContentLength(data.size());
+ return header;
+ }
//TODO: move this elsewhere (GRS 24/08/2007)
void populate(framing::FrameSet& frameset)
diff --git a/cpp/src/qpid/client/ConnectionImpl.cpp b/cpp/src/qpid/client/ConnectionImpl.cpp
index 5ff34cde4e..d21d550ee2 100644
--- a/cpp/src/qpid/client/ConnectionImpl.cpp
+++ b/cpp/src/qpid/client/ConnectionImpl.cpp
@@ -24,6 +24,7 @@
using namespace qpid::client;
using namespace qpid::framing;
+using namespace qpid::sys;
ConnectionImpl::ConnectionImpl(boost::shared_ptr<Connector> c) : connector(c)
{
@@ -38,6 +39,7 @@ ConnectionImpl::ConnectionImpl(boost::shared_ptr<Connector> c) : connector(c)
void ConnectionImpl::allocated(SessionCore::shared_ptr session)
{
+ Mutex::ScopedLock l(lock);
if (sessions.find(session->getId()) != sessions.end()) {
throw Exception("Id already in use.");
}
@@ -46,6 +48,7 @@ void ConnectionImpl::allocated(SessionCore::shared_ptr session)
void ConnectionImpl::released(SessionCore::shared_ptr session)
{
+ Mutex::ScopedLock l(lock);
SessionMap::iterator i = sessions.find(session->getId());
if (i != sessions.end()) {
sessions.erase(i);
@@ -59,12 +62,7 @@ void ConnectionImpl::handle(framing::AMQFrame& frame)
void ConnectionImpl::incoming(framing::AMQFrame& frame)
{
- uint16_t id = frame.getChannel();
- SessionMap::iterator i = sessions.find(id);
- if (i == sessions.end()) {
- throw ConnectionException(504, (boost::format("Invalid channel number %g") % id).str());
- }
- i->second->handle(frame);
+ find(frame.getChannel())->handle(frame);
}
void ConnectionImpl::open(const std::string& host, int port,
@@ -93,10 +91,7 @@ void ConnectionImpl::closed()
void ConnectionImpl::closedByPeer(uint16_t code, const std::string& text)
{
- for (SessionMap::iterator i = sessions.begin(); i != sessions.end(); i++) {
- i->second->closed(code, text);
- }
- sessions.clear();
+ signalClose(code, text);
connector->close();
}
@@ -114,8 +109,25 @@ void ConnectionImpl::idleOut()
void ConnectionImpl::shutdown()
{
//this indicates that the socket to the server has closed
+ signalClose(0, "Unexpected socket closure.");
+}
+
+void ConnectionImpl::signalClose(uint16_t code, const std::string& text)
+{
+ Mutex::ScopedLock l(lock);
for (SessionMap::iterator i = sessions.begin(); i != sessions.end(); i++) {
- i->second->closed(0, "Unexpected socket closure.");
+ Mutex::ScopedUnlock u(lock);
+ i->second->closed(code, text);
}
sessions.clear();
}
+
+SessionCore::shared_ptr ConnectionImpl::find(uint16_t id)
+{
+ Mutex::ScopedLock l(lock);
+ SessionMap::iterator i = sessions.find(id);
+ if (i == sessions.end()) {
+ throw ConnectionException(504, (boost::format("Invalid channel number %g") % id).str());
+ }
+ return i->second;
+}
diff --git a/cpp/src/qpid/client/ConnectionImpl.h b/cpp/src/qpid/client/ConnectionImpl.h
index e37713e77d..a2ee14ea6e 100644
--- a/cpp/src/qpid/client/ConnectionImpl.h
+++ b/cpp/src/qpid/client/ConnectionImpl.h
@@ -25,6 +25,7 @@
#include <map>
#include <boost/shared_ptr.hpp>
#include "qpid/framing/FrameHandler.h"
+#include "qpid/sys/Mutex.h"
#include "qpid/sys/ShutdownHandler.h"
#include "qpid/sys/TimeoutHandler.h"
#include "ConnectionHandler.h"
@@ -44,6 +45,7 @@ class ConnectionImpl : public framing::FrameHandler,
ConnectionHandler handler;
boost::shared_ptr<Connector> connector;
framing::ProtocolVersion version;
+ sys::Mutex lock;
void incoming(framing::AMQFrame& frame);
void closed();
@@ -51,6 +53,9 @@ class ConnectionImpl : public framing::FrameHandler,
void idleOut();
void idleIn();
void shutdown();
+ void signalClose(uint16_t, const std::string&);
+ SessionCore::shared_ptr find(uint16_t);
+
public:
typedef boost::shared_ptr<ConnectionImpl> shared_ptr;
diff --git a/cpp/src/qpid/client/ExecutionHandler.cpp b/cpp/src/qpid/client/ExecutionHandler.cpp
index 1520ba2272..c2b5e45928 100644
--- a/cpp/src/qpid/client/ExecutionHandler.cpp
+++ b/cpp/src/qpid/client/ExecutionHandler.cpp
@@ -181,31 +181,28 @@ SequenceNumber ExecutionHandler::send(const AMQBody& command, const MethodConten
CompletionTracker::ResultListener l)
{
SequenceNumber id = send(command, l);
- sendContent(dynamic_cast<const BasicHeaderProperties&>(content.getMethodHeaders()), content.getData());
+ sendContent(content);
return id;
}
-void ExecutionHandler::sendContent(const BasicHeaderProperties& headers, const std::string& data)
+void ExecutionHandler::sendContent(const MethodContent& content)
{
- AMQHeaderBody header;
- BasicHeaderProperties::copy(*header.get<BasicHeaderProperties>(true), headers);
- header.get<BasicHeaderProperties>(true)->setContentLength(data.size());
- AMQFrame h(0, header);
- out(h);
+ AMQFrame header(0, content.getHeader());
+ out(header);
- u_int64_t data_length = data.length();
+ u_int64_t data_length = content.getData().length();
if(data_length > 0){
//frame itself uses 8 bytes
u_int32_t frag_size = maxFrameSize - 8;
if(data_length < frag_size){
- AMQFrame frame(0, AMQContentBody(data));
+ AMQFrame frame(0, AMQContentBody(content.getData()));
out(frame);
}else{
u_int32_t offset = 0;
u_int32_t remaining = data_length - offset;
while (remaining > 0) {
u_int32_t length = remaining > frag_size ? frag_size : remaining;
- string frag(data.substr(offset, length));
+ string frag(content.getData().substr(offset, length));
AMQFrame frame(0, AMQContentBody(frag));
out(frame);
offset += length;
diff --git a/cpp/src/qpid/client/ExecutionHandler.h b/cpp/src/qpid/client/ExecutionHandler.h
index a42697e26a..3078f6bc3a 100644
--- a/cpp/src/qpid/client/ExecutionHandler.h
+++ b/cpp/src/qpid/client/ExecutionHandler.h
@@ -59,7 +59,7 @@ class ExecutionHandler :
void sendCompletion();
- void sendContent(const framing::BasicHeaderProperties& headers, const std::string& data);
+ void sendContent(const framing::MethodContent&);
public:
typedef CompletionTracker::ResultListener ResultListener;