summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/client
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2007-08-16 20:12:33 +0000
committerAlan Conway <aconway@apache.org>2007-08-16 20:12:33 +0000
commit49c7a491c98c26fe7d4f017a7ba655dfc029278c (patch)
tree304d51ba039a5391b4ebde08caab3da978b465fb /cpp/src/qpid/client
parentdc13ca80ff893f74ab57fee6543de6543aa366bc (diff)
downloadqpid-python-49c7a491c98c26fe7d4f017a7ba655dfc029278c.tar.gz
AMQBodies are no longer allocated on the heap and passed with shared_ptr.
AMQFrame contains a boost::variant of AMQHeaderBody,AMQContentBody, AMQHeatbeatBody, and MethodHolder. A variant is basically a type-safe union, it can allocate any of the types in-place. MethodHolder contains a Blob, a less sophisticated kind of variant, which can contain any of the concrete method body types. Using variants for all the method types causes outrageous compile times and bloated library symbol names. Blob lacks some of the finer features of variant and needs help from generated code. For now both are hidden to the rest of the code base behind AMQFrame and MethodBody classes so if/when we decide to settle on just one "variant" type solution we can do so. This commit touches nearly 100 files, mostly converting method signatures with shared_ptr<FooBody> to FooBody* or FooBody&, and converting stored shared_ptr<AMQBody> to AMQFrame and share_ptr<AMQMethodBody> to MethodHolder. There is one outstanding client memory leak, which I will fix in my next commit. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@566822 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/client')
-rw-r--r--cpp/src/qpid/client/ChannelHandler.cpp44
-rw-r--r--cpp/src/qpid/client/ChannelHandler.h8
-rw-r--r--cpp/src/qpid/client/ClientChannel.cpp10
-rw-r--r--cpp/src/qpid/client/Connection.h1
-rw-r--r--cpp/src/qpid/client/ConnectionHandler.cpp65
-rw-r--r--cpp/src/qpid/client/ConnectionHandler.h8
-rw-r--r--cpp/src/qpid/client/Correlator.cpp2
-rw-r--r--cpp/src/qpid/client/Correlator.h4
-rw-r--r--cpp/src/qpid/client/ExecutionHandler.cpp40
-rw-r--r--cpp/src/qpid/client/ExecutionHandler.h4
-rw-r--r--cpp/src/qpid/client/FutureResponse.cpp8
-rw-r--r--cpp/src/qpid/client/FutureResponse.h8
-rw-r--r--cpp/src/qpid/client/ReceivedContent.cpp21
-rw-r--r--cpp/src/qpid/client/ReceivedContent.h28
-rw-r--r--cpp/src/qpid/client/Response.h4
-rw-r--r--cpp/src/qpid/client/SessionCore.cpp4
-rw-r--r--cpp/src/qpid/client/SessionCore.h4
17 files changed, 128 insertions, 135 deletions
diff --git a/cpp/src/qpid/client/ChannelHandler.cpp b/cpp/src/qpid/client/ChannelHandler.cpp
index a6aea438f0..b3d720baf0 100644
--- a/cpp/src/qpid/client/ChannelHandler.cpp
+++ b/cpp/src/qpid/client/ChannelHandler.cpp
@@ -21,6 +21,7 @@
#include "ChannelHandler.h"
#include "qpid/framing/amqp_framing.h"
+#include "qpid/framing/all_method_bodies.h"
using namespace qpid::client;
using namespace qpid::framing;
@@ -30,40 +31,39 @@ ChannelHandler::ChannelHandler() : StateManager(CLOSED), id(0) {}
void ChannelHandler::incoming(AMQFrame& frame)
{
- AMQBody::shared_ptr body = frame.getBody();
+ AMQBody* body = frame.getBody();
if (getState() == OPEN) {
- if (isA<ChannelCloseBody>(body)) {
- ChannelCloseBody::shared_ptr method(shared_polymorphic_cast<ChannelCloseBody>(body));
+ ChannelCloseBody* closeBody=
+ dynamic_cast<ChannelCloseBody*>(body->getMethod());
+ if (closeBody) {
setState(CLOSED);
if (onClose) {
- onClose(method->getReplyCode(), method->getReplyText());
+ onClose(closeBody->getReplyCode(), closeBody->getReplyText());
}
} else {
try {
in(frame);
}catch(ChannelException& e){
- if (body->type() == METHOD_BODY) {
- AMQMethodBody::shared_ptr method(shared_polymorphic_cast<AMQMethodBody>(body));
- close(e.code, e.toString(), method->amqpClassId(), method->amqpMethodId());
- } else {
+ AMQMethodBody* method=body->getMethod();
+ if (method)
+ close(e.code, e.toString(),
+ method->amqpClassId(), method->amqpMethodId());
+ else
close(e.code, e.toString(), 0, 0);
- }
}
}
} else {
- if (body->type() == METHOD_BODY) {
- handleMethod(shared_polymorphic_cast<AMQMethodBody>(body));
- } else {
+ if (body->getMethod())
+ handleMethod(body->getMethod());
+ else
throw new ConnectionException(504, "Channel not open.");
- }
-
}
}
void ChannelHandler::outgoing(AMQFrame& frame)
{
if (getState() == OPEN) {
- frame.channel = id;
+ frame.setChannel(id);
out(frame);
} else {
throw Exception("Channel not open");
@@ -75,7 +75,7 @@ void ChannelHandler::open(uint16_t _id)
id = _id;
setState(OPENING);
- AMQFrame f(version, id, make_shared_ptr(new ChannelOpenBody(version)));
+ AMQFrame f(version, id, ChannelOpenBody(version));
out(f);
std::set<int> states;
@@ -90,7 +90,7 @@ void ChannelHandler::open(uint16_t _id)
void ChannelHandler::close(uint16_t code, const std::string& message, uint16_t classId, uint16_t methodId)
{
setState(CLOSING);
- AMQFrame f(version, id, make_shared_ptr(new ChannelCloseBody(version, code, message, classId, methodId)));
+ AMQFrame f(version, id, ChannelCloseBody(version, code, message, classId, methodId));
out(f);
}
@@ -100,24 +100,24 @@ void ChannelHandler::close()
waitFor(CLOSED);
}
-void ChannelHandler::handleMethod(AMQMethodBody::shared_ptr method)
+void ChannelHandler::handleMethod(AMQMethodBody* method)
{
switch (getState()) {
- case OPENING:
+ case OPENING:
if (method->isA<ChannelOpenOkBody>()) {
setState(OPEN);
} else {
throw ConnectionException(504, "Channel not opened.");
}
break;
- case CLOSING:
+ case CLOSING:
if (method->isA<ChannelCloseOkBody>()) {
setState(CLOSED);
} //else just ignore it
break;
- case CLOSED:
+ case CLOSED:
throw ConnectionException(504, "Channel not opened.");
- default:
+ default:
throw Exception("Unexpected state encountered in ChannelHandler!");
}
}
diff --git a/cpp/src/qpid/client/ChannelHandler.h b/cpp/src/qpid/client/ChannelHandler.h
index eaa7e7cc72..556e13a4f8 100644
--- a/cpp/src/qpid/client/ChannelHandler.h
+++ b/cpp/src/qpid/client/ChannelHandler.h
@@ -34,13 +34,7 @@ class ChannelHandler : private StateManager, public ChainableFrameHandler
framing::ProtocolVersion version;
uint16_t id;
- void handleMethod(framing::AMQMethodBody::shared_ptr method);
-
- template <class T> bool isA(framing::AMQBody::shared_ptr body) {
- return body->type() == framing::METHOD_BODY &&
- boost::shared_polymorphic_cast<framing::AMQMethodBody>(body)->isA<T>();
- }
-
+ void handleMethod(framing::AMQMethodBody* method);
void close(uint16_t code, const std::string& message, uint16_t classId, uint16_t methodId);
diff --git a/cpp/src/qpid/client/ClientChannel.cpp b/cpp/src/qpid/client/ClientChannel.cpp
index 8ffddd0dbf..aa73e83328 100644
--- a/cpp/src/qpid/client/ClientChannel.cpp
+++ b/cpp/src/qpid/client/ClientChannel.cpp
@@ -25,11 +25,11 @@
#include "ClientMessage.h"
#include "qpid/QpidError.h"
#include "Connection.h"
-#include "ConnectionHandler.h"
#include "FutureResponse.h"
#include "MessageListener.h"
#include <boost/format.hpp>
#include <boost/bind.hpp>
+#include "qpid/framing/all_method_bodies.h"
// FIXME aconway 2007-01-26: Evaluate all throws, ensure consistent
// handling of errors that should close the connection or the channel.
@@ -37,7 +37,6 @@
//
using namespace std;
using namespace boost;
-using namespace qpid::client;
using namespace qpid::framing;
using namespace qpid::sys;
@@ -49,13 +48,11 @@ const std::string empty;
class ScopedSync
{
Session& session;
-public:
+ public:
ScopedSync(Session& s, bool enabled = true) : session(s) { session.setSynchronous(enabled); }
~ScopedSync() { session.setSynchronous(false); }
};
-}}
-
Channel::Channel(bool _transactional, u_int16_t _prefetch) :
prefetch(_prefetch), transactional(_transactional), running(false)
{
@@ -250,3 +247,6 @@ void Channel::run() {
}
} catch (const QueueClosed&) {}
}
+
+}}
+
diff --git a/cpp/src/qpid/client/Connection.h b/cpp/src/qpid/client/Connection.h
index e2e83c8caf..e41ab363b5 100644
--- a/cpp/src/qpid/client/Connection.h
+++ b/cpp/src/qpid/client/Connection.h
@@ -27,6 +27,7 @@
#include "ClientChannel.h"
#include "ConnectionImpl.h"
#include "Session.h"
+#include "qpid/framing/AMQP_HighestVersion.h"
namespace qpid {
diff --git a/cpp/src/qpid/client/ConnectionHandler.cpp b/cpp/src/qpid/client/ConnectionHandler.cpp
index f47506d977..66db9384e2 100644
--- a/cpp/src/qpid/client/ConnectionHandler.cpp
+++ b/cpp/src/qpid/client/ConnectionHandler.cpp
@@ -22,6 +22,8 @@
#include "ConnectionHandler.h"
#include "qpid/log/Statement.h"
#include "qpid/framing/amqp_framing.h"
+#include "qpid/framing/AMQP_HighestVersion.h"
+#include "qpid/framing/all_method_bodies.h"
using namespace qpid::client;
using namespace qpid::framing;
@@ -53,16 +55,16 @@ void ConnectionHandler::incoming(AMQFrame& frame)
throw Exception("Connection is closed.");
}
- AMQBody::shared_ptr body = frame.getBody();
+ AMQBody* body = frame.getBody();
if (frame.getChannel() == 0) {
- if (body->type() == METHOD_BODY) {
- handle(shared_polymorphic_cast<AMQMethodBody>(body));
+ if (body->getMethod()) {
+ handle(body->getMethod());
} else {
error(503, "Cannot send content on channel zero.");
}
} else {
switch(getState()) {
- case OPEN:
+ case OPEN:
try {
in(frame);
}catch(ConnectionException& e){
@@ -71,10 +73,10 @@ void ConnectionHandler::incoming(AMQFrame& frame)
error(541/*internal error*/, e.what(), body);
}
break;
- case CLOSING:
+ case CLOSING:
QPID_LOG(warning, "Received frame on non-zero channel while closing connection; frame ignored.");
break;
- default:
+ default:
//must be in connection initialisation:
fail("Cannot receive frames on non-zero channel until connection is established.");
}
@@ -101,32 +103,29 @@ void ConnectionHandler::waitForOpen()
void ConnectionHandler::close()
{
setState(CLOSING);
- send(make_shared_ptr(new ConnectionCloseBody(version, 200, OK, 0, 0)));
-
+ send(ConnectionCloseBody(version, 200, OK, 0, 0));
waitFor(CLOSED);
}
-void ConnectionHandler::send(framing::AMQBody::shared_ptr body)
+void ConnectionHandler::send(const framing::AMQBody& body)
{
- AMQFrame f;
- f.setBody(body);
+ AMQFrame f(ProtocolVersion(), 0, body);
out(f);
}
void ConnectionHandler::error(uint16_t code, const std::string& message, uint16_t classId, uint16_t methodId)
{
setState(CLOSING);
- send(make_shared_ptr(new ConnectionCloseBody(version, code, message, classId, methodId)));
+ send(ConnectionCloseBody(version, code, message, classId, methodId));
}
-void ConnectionHandler::error(uint16_t code, const std::string& message, AMQBody::shared_ptr body)
+void ConnectionHandler::error(uint16_t code, const std::string& message, AMQBody* body)
{
- if (body->type() == METHOD_BODY) {
- AMQMethodBody::shared_ptr method(shared_polymorphic_cast<AMQMethodBody>(body));
+ AMQMethodBody* method = body->getMethod();
+ if (method)
error(code, message, method->amqpClassId(), method->amqpMethodId());
- } else {
+ else
error(code, message);
- }
}
@@ -136,54 +135,54 @@ void ConnectionHandler::fail(const std::string& message)
setState(FAILED);
}
-void ConnectionHandler::handle(AMQMethodBody::shared_ptr method)
+void ConnectionHandler::handle(AMQMethodBody* method)
{
switch (getState()) {
- case NOT_STARTED:
+ case NOT_STARTED:
if (method->isA<ConnectionStartBody>()) {
setState(NEGOTIATING);
string response = ((char)0) + uid + ((char)0) + pwd;
- send(make_shared_ptr(new ConnectionStartOkBody(version, properties, mechanism, response, locale)));
+ send(ConnectionStartOkBody(version, properties, mechanism, response, locale));
} else {
fail("Bad method sequence, expected connection-start.");
}
break;
- case NEGOTIATING:
+ case NEGOTIATING:
if (method->isA<ConnectionTuneBody>()) {
- ConnectionTuneBody::shared_ptr proposal(shared_polymorphic_cast<ConnectionTuneBody>(method));
+ ConnectionTuneBody* proposal=polymorphic_downcast<ConnectionTuneBody*>(method);
heartbeat = proposal->getHeartbeat();
maxChannels = proposal->getChannelMax();
- send(make_shared_ptr(new ConnectionTuneOkBody(version, maxChannels, maxFrameSize, heartbeat)));
+ send(ConnectionTuneOkBody(version, maxChannels, maxFrameSize, heartbeat));
setState(OPENING);
- send(make_shared_ptr(new ConnectionOpenBody(version, vhost, capabilities, insist)));
- //TODO: support for further security challenges
- //} else if (method->isA<ConnectionSecureBody>()) {
+ send(ConnectionOpenBody(version, vhost, capabilities, insist));
+ //TODO: support for further security challenges
+ //} else if (method->isA<ConnectionSecureBody>()) {
} else {
fail("Unexpected method sequence, expected connection-tune.");
}
break;
- case OPENING:
+ case OPENING:
if (method->isA<ConnectionOpenOkBody>()) {
setState(OPEN);
- //TODO: support for redirection
- //} else if (method->isA<ConnectionRedirectBody>()) {
+ //TODO: support for redirection
+ //} else if (method->isA<ConnectionRedirectBody>()) {
} else {
fail("Unexpected method sequence, expected connection-open-ok.");
}
break;
- case OPEN:
+ case OPEN:
if (method->isA<ConnectionCloseBody>()) {
- send(make_shared_ptr(new ConnectionCloseOkBody(version)));
+ send(ConnectionCloseOkBody(version));
setState(CLOSED);
if (onError) {
- ConnectionCloseBody::shared_ptr c(shared_polymorphic_cast<ConnectionCloseBody>(method));
+ ConnectionCloseBody* c=polymorphic_downcast<ConnectionCloseBody*>(method);
onError(c->getReplyCode(), c->getReplyText());
}
} else {
error(503, "Unexpected method on channel zero.", method->amqpClassId(), method->amqpMethodId());
}
break;
- case CLOSING:
+ case CLOSING:
if (method->isA<ConnectionCloseOkBody>()) {
if (onClose) {
onClose();
diff --git a/cpp/src/qpid/client/ConnectionHandler.h b/cpp/src/qpid/client/ConnectionHandler.h
index 464d0ca26d..d05ae1428b 100644
--- a/cpp/src/qpid/client/ConnectionHandler.h
+++ b/cpp/src/qpid/client/ConnectionHandler.h
@@ -25,6 +25,8 @@
#include "StateManager.h"
#include "ChainableFrameHandler.h"
#include "qpid/framing/InputHandler.h"
+#include "qpid/framing/FieldTable.h"
+#include "qpid/framing/AMQMethodBody.h"
namespace qpid {
namespace client {
@@ -53,10 +55,10 @@ class ConnectionHandler : private StateManager,
enum STATES {NOT_STARTED, NEGOTIATING, OPENING, OPEN, CLOSING, CLOSED, FAILED};
std::set<int> ESTABLISHED;
- void handle(framing::AMQMethodBody::shared_ptr method);
- void send(framing::AMQBody::shared_ptr body);
+ void handle(framing::AMQMethodBody* method);
+ void send(const framing::AMQBody& body);
void error(uint16_t code, const std::string& message, uint16_t classId = 0, uint16_t methodId = 0);
- void error(uint16_t code, const std::string& message, framing::AMQBody::shared_ptr body);
+ void error(uint16_t code, const std::string& message, framing::AMQBody* body);
void fail(const std::string& message);
public:
diff --git a/cpp/src/qpid/client/Correlator.cpp b/cpp/src/qpid/client/Correlator.cpp
index edb16bbcee..9ef6857957 100644
--- a/cpp/src/qpid/client/Correlator.cpp
+++ b/cpp/src/qpid/client/Correlator.cpp
@@ -25,7 +25,7 @@ using qpid::client::Correlator;
using namespace qpid::framing;
using namespace boost;
-void Correlator::receive(AMQMethodBody::shared_ptr response)
+void Correlator::receive(AMQMethodBody* response)
{
if (listeners.empty()) {
throw ConnectionException(503, "Unexpected method!");//TODO: include the method & class name
diff --git a/cpp/src/qpid/client/Correlator.h b/cpp/src/qpid/client/Correlator.h
index 339c9bd0c4..d93e7b66cd 100644
--- a/cpp/src/qpid/client/Correlator.h
+++ b/cpp/src/qpid/client/Correlator.h
@@ -36,9 +36,9 @@ namespace client {
class Correlator
{
public:
- typedef boost::function<void(framing::AMQMethodBody::shared_ptr)> Listener;
+ typedef boost::function<void(framing::AMQMethodBody*)> Listener;
- void receive(framing::AMQMethodBody::shared_ptr);
+ void receive(framing::AMQMethodBody*);
void listen(Listener l);
private:
diff --git a/cpp/src/qpid/client/ExecutionHandler.cpp b/cpp/src/qpid/client/ExecutionHandler.cpp
index abfce4f9d1..6ee6429b6b 100644
--- a/cpp/src/qpid/client/ExecutionHandler.cpp
+++ b/cpp/src/qpid/client/ExecutionHandler.cpp
@@ -23,31 +23,35 @@
#include "qpid/Exception.h"
#include "qpid/framing/BasicDeliverBody.h"
#include "qpid/framing/MessageTransferBody.h"
+#include "qpid/framing/AMQP_HighestVersion.h"
+#include "qpid/framing/all_method_bodies.h"
using namespace qpid::client;
using namespace qpid::framing;
using namespace boost;
-bool isMessageMethod(AMQMethodBody::shared_ptr method)
+bool isMessageMethod(AMQMethodBody* method)
{
return method->isA<BasicDeliverBody>() || method->isA<MessageTransferBody>() || method->isA<BasicGetOkBody>();
}
-bool isMessageMethod(AMQBody::shared_ptr body)
+bool isMessageMethod(AMQBody* body)
{
- return body->type() == METHOD_BODY && isMessageMethod(shared_polymorphic_cast<AMQMethodBody>(body));
+ AMQMethodBody* method=body->getMethod();
+ return method && isMessageMethod(method);
}
bool isContentFrame(AMQFrame& frame)
{
- AMQBody::shared_ptr body = frame.getBody();
+ AMQBody* body = frame.getBody();
uint8_t type = body->type();
return type == HEADER_BODY || type == CONTENT_BODY || isMessageMethod(body);
}
-bool invoke(AMQBody::shared_ptr body, Invocable* target)
+bool invoke(AMQBody* body, Invocable* target)
{
- return body->type() == METHOD_BODY && shared_polymorphic_cast<AMQMethodBody>(body)->invoke(target);
+ AMQMethodBody* method=body->getMethod();
+ return method && method->invoke(target);
}
ExecutionHandler::ExecutionHandler(uint64_t _maxFrameSize) :
@@ -56,7 +60,7 @@ ExecutionHandler::ExecutionHandler(uint64_t _maxFrameSize) :
//incoming:
void ExecutionHandler::handle(AMQFrame& frame)
{
- AMQBody::shared_ptr body = frame.getBody();
+ AMQBody* body = frame.getBody();
if (!invoke(body, this)) {
if (isContentFrame(frame)) {
if (!arriving) {
@@ -69,7 +73,7 @@ void ExecutionHandler::handle(AMQFrame& frame)
}
} else {
++incoming.hwm;
- correlation.receive(shared_polymorphic_cast<AMQMethodBody>(body));
+ correlation.receive(body->getMethod());
}
}
}
@@ -95,16 +99,15 @@ void ExecutionHandler::flush()
{
//send completion
incoming.lwm = incoming.hwm;
- //make_shared_ptr(new ExecutionCompleteBody(getVersion(), incoming.hwm.getValue(), SequenceNumberSet())));
}
void ExecutionHandler::sendFlush()
{
- AMQFrame frame(version, 0, make_shared_ptr(new ExecutionFlushBody(version)));
+ AMQFrame frame(version, 0, ExecutionFlushBody());
out(frame);
}
-void ExecutionHandler::send(AMQBody::shared_ptr command, CompletionTracker::Listener f, Correlator::Listener g)
+void ExecutionHandler::send(const AMQBody& command, CompletionTracker::Listener f, Correlator::Listener g)
{
//allocate id:
++outgoing.hwm;
@@ -116,18 +119,19 @@ void ExecutionHandler::send(AMQBody::shared_ptr command, CompletionTracker::List
correlation.listen(g);
}
- AMQFrame frame(version, 0/*id will be filled in be channel handler*/, command);
+ AMQFrame frame(version, 0/*id will be filled in be channel handler*/,
+ command);
out(frame);
}
-void ExecutionHandler::sendContent(AMQBody::shared_ptr command, const BasicHeaderProperties& headers, const std::string& data,
+void ExecutionHandler::sendContent(const AMQBody& command, const BasicHeaderProperties& headers, const std::string& data,
CompletionTracker::Listener f, Correlator::Listener g)
{
send(command, f, g);
- AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC));
- BasicHeaderProperties::copy(*static_cast<BasicHeaderProperties*>(header->getProperties()), headers);
- header->setContentSize(data.size());
+ AMQHeaderBody header(BASIC);
+ BasicHeaderProperties::copy(*static_cast<BasicHeaderProperties*>(header.getProperties()), headers);
+ header.setContentSize(data.size());
AMQFrame h(version, 0, header);
out(h);
@@ -136,7 +140,7 @@ void ExecutionHandler::sendContent(AMQBody::shared_ptr command, const BasicHeade
//frame itself uses 8 bytes
u_int32_t frag_size = maxFrameSize - 8;
if(data_length < frag_size){
- AMQFrame frame(version, 0, make_shared_ptr(new AMQContentBody(data)));
+ AMQFrame frame(version, 0, AMQContentBody(data));
out(frame);
}else{
u_int32_t offset = 0;
@@ -144,7 +148,7 @@ void ExecutionHandler::sendContent(AMQBody::shared_ptr command, const BasicHeade
while (remaining > 0) {
u_int32_t length = remaining > frag_size ? frag_size : remaining;
string frag(data.substr(offset, length));
- AMQFrame frame(version, 0, make_shared_ptr(new AMQContentBody(frag)));
+ AMQFrame frame(version, 0, AMQContentBody(frag));
out(frame);
offset += length;
remaining = data_length - offset;
diff --git a/cpp/src/qpid/client/ExecutionHandler.h b/cpp/src/qpid/client/ExecutionHandler.h
index f62598ef95..21613df779 100644
--- a/cpp/src/qpid/client/ExecutionHandler.h
+++ b/cpp/src/qpid/client/ExecutionHandler.h
@@ -56,10 +56,10 @@ public:
void setMaxFrameSize(uint64_t size) { maxFrameSize = size; }
void handle(framing::AMQFrame& frame);
- void send(framing::AMQBody::shared_ptr command,
+ void send(const framing::AMQBody& command,
CompletionTracker::Listener f = CompletionTracker::Listener(),
Correlator::Listener g = Correlator::Listener());
- void sendContent(framing::AMQBody::shared_ptr command,
+ void sendContent(const framing::AMQBody& command,
const framing::BasicHeaderProperties& headers, const std::string& data,
CompletionTracker::Listener f = CompletionTracker::Listener(),
Correlator::Listener g = Correlator::Listener());
diff --git a/cpp/src/qpid/client/FutureResponse.cpp b/cpp/src/qpid/client/FutureResponse.cpp
index 6b1246a449..e63dc9c192 100644
--- a/cpp/src/qpid/client/FutureResponse.cpp
+++ b/cpp/src/qpid/client/FutureResponse.cpp
@@ -26,16 +26,16 @@ using namespace qpid::framing;
using namespace qpid::sys;
-AMQMethodBody::shared_ptr FutureResponse::getResponse()
+AMQMethodBody* FutureResponse::getResponse()
{
waitForCompletion();
- return response;
+ return response.get();
}
-void FutureResponse::received(AMQMethodBody::shared_ptr r)
+void FutureResponse::received(AMQMethodBody* r)
{
Monitor::ScopedLock l(lock);
- response = r;
+ response = *r;
complete = true;
lock.notifyAll();
}
diff --git a/cpp/src/qpid/client/FutureResponse.h b/cpp/src/qpid/client/FutureResponse.h
index ccc6fb5894..75b1f72c04 100644
--- a/cpp/src/qpid/client/FutureResponse.h
+++ b/cpp/src/qpid/client/FutureResponse.h
@@ -23,6 +23,7 @@
#define _FutureResponse_
#include "qpid/framing/amqp_framing.h"
+#include "qpid/framing/MethodHolder.h"
#include "FutureCompletion.h"
namespace qpid {
@@ -30,11 +31,10 @@ namespace client {
class FutureResponse : public FutureCompletion
{
- framing::AMQMethodBody::shared_ptr response;
-
+ framing::MethodHolder response;
public:
- framing::AMQMethodBody::shared_ptr getResponse();
- void received(framing::AMQMethodBody::shared_ptr response);
+ framing::AMQMethodBody* getResponse();
+ void received(framing::AMQMethodBody* response);
};
}}
diff --git a/cpp/src/qpid/client/ReceivedContent.cpp b/cpp/src/qpid/client/ReceivedContent.cpp
index 9cfee21c3c..5a1f48901a 100644
--- a/cpp/src/qpid/client/ReceivedContent.cpp
+++ b/cpp/src/qpid/client/ReceivedContent.cpp
@@ -20,6 +20,7 @@
*/
#include "ReceivedContent.h"
+#include "qpid/framing/all_method_bodies.h"
using qpid::client::ReceivedContent;
using namespace qpid::framing;
@@ -27,9 +28,9 @@ using namespace boost;
ReceivedContent::ReceivedContent(const SequenceNumber& _id) : id(_id) {}
-void ReceivedContent::append(AMQBody::shared_ptr part)
+void ReceivedContent::append(AMQBody* part)
{
- parts.push_back(part);
+ parts.push_back(AMQFrame(ProtocolVersion(), 0, part));
}
bool ReceivedContent::isComplete() const
@@ -37,7 +38,7 @@ bool ReceivedContent::isComplete() const
if (parts.empty()) {
return false;
} else if (isA<BasicDeliverBody>() || isA<BasicGetOkBody>()) {
- AMQHeaderBody::shared_ptr headers(getHeaders());
+ const AMQHeaderBody* headers(getHeaders());
return headers && headers->getContentSize() == getContentSize();
} else if (isA<MessageTransferBody>()) {
//no longer support references, headers and data are still method fields
@@ -48,14 +49,14 @@ bool ReceivedContent::isComplete() const
}
-AMQMethodBody::shared_ptr ReceivedContent::getMethod() const
+const AMQMethodBody* ReceivedContent::getMethod() const
{
- return parts.empty() ? AMQMethodBody::shared_ptr() : dynamic_pointer_cast<AMQMethodBody>(parts[0]);
+ return parts.empty() ? 0 : dynamic_cast<const AMQMethodBody*>(parts[0].getBody());
}
-AMQHeaderBody::shared_ptr ReceivedContent::getHeaders() const
+const AMQHeaderBody* ReceivedContent::getHeaders() const
{
- return parts.size() < 2 ? AMQHeaderBody::shared_ptr() : dynamic_pointer_cast<AMQHeaderBody>(parts[1]);
+ return parts.size() < 2 ? 0 : dynamic_cast<const AMQHeaderBody*>(parts[1].getBody());
}
uint64_t ReceivedContent::getContentSize() const
@@ -63,7 +64,7 @@ uint64_t ReceivedContent::getContentSize() const
if (isA<BasicDeliverBody>() || isA<BasicGetOkBody>()) {
uint64_t size(0);
for (uint i = 2; i < parts.size(); i++) {
- size += parts[i]->size();
+ size += parts[i].getBody()->size();
}
return size;
} else if (isA<MessageTransferBody>()) {
@@ -78,7 +79,7 @@ std::string ReceivedContent::getContent() const
if (isA<BasicDeliverBody>() || isA<BasicGetOkBody>()) {
string data;
for (uint i = 2; i < parts.size(); i++) {
- data += dynamic_pointer_cast<AMQContentBody>(parts[i])->getData();
+ data += static_cast<const AMQContentBody*>(parts[i].getBody())->getData();
}
return data;
} else if (isA<MessageTransferBody>()) {
@@ -93,7 +94,7 @@ void ReceivedContent::populate(Message& msg)
if (!isComplete()) throw Exception("Incomplete message");
if (isA<BasicDeliverBody>() || isA<BasicGetOkBody>()) {
- BasicHeaderProperties* properties = dynamic_cast<BasicHeaderProperties*>(getHeaders()->getProperties());
+ const BasicHeaderProperties* properties = dynamic_cast<const BasicHeaderProperties*>(getHeaders()->getProperties());
BasicHeaderProperties::copy<Message, BasicHeaderProperties>(msg, *properties);
msg.setData(getContent());
} else if (isA<MessageTransferBody>()) {
diff --git a/cpp/src/qpid/client/ReceivedContent.h b/cpp/src/qpid/client/ReceivedContent.h
index 1886034f9b..4f84039c10 100644
--- a/cpp/src/qpid/client/ReceivedContent.h
+++ b/cpp/src/qpid/client/ReceivedContent.h
@@ -20,8 +20,8 @@
*/
#include <string>
#include <vector>
-#include <boost/shared_ptr.hpp>
#include "qpid/framing/amqp_framing.h"
+#include "qpid/framing/AMQFrame.h"
#include "qpid/framing/SequenceNumber.h"
#include "ClientMessage.h"
@@ -38,37 +38,29 @@ namespace client {
class ReceivedContent
{
const framing::SequenceNumber id;
- std::vector<framing::AMQBody::shared_ptr> parts;
+ std::vector<framing::AMQFrame> parts;
public:
typedef boost::shared_ptr<ReceivedContent> shared_ptr;
ReceivedContent(const framing::SequenceNumber& id);
- void append(framing::AMQBody::shared_ptr part);
+ void append(framing::AMQBody* part);
bool isComplete() const;
uint64_t getContentSize() const;
std::string getContent() const;
- framing::AMQMethodBody::shared_ptr getMethod() const;
- framing::AMQHeaderBody::shared_ptr getHeaders() const;
+ const framing::AMQMethodBody* getMethod() const;
+ const framing::AMQHeaderBody* getHeaders() const;
template <class T> bool isA() const {
- framing::AMQMethodBody::shared_ptr method = getMethod();
- if (!method) {
- return false;
- } else {
- return method->isA<T>();
- }
+ const framing::AMQMethodBody* method=getMethod();
+ return method && method->isA<T>();
}
- template <class T> boost::shared_ptr<T> as() const {
- framing::AMQMethodBody::shared_ptr method = getMethod();
- if (method && method->isA<T>()) {
- return boost::dynamic_pointer_cast<T>(method);
- } else {
- return boost::shared_ptr<T>();
- }
+ template <class T> const T* as() const {
+ const framing::AMQMethodBody* method=getMethod();
+ return (method && method->isA<T>()) ? dynamic_cast<const T*>(method) : 0;
}
const framing::SequenceNumber& getId() const { return id; }
diff --git a/cpp/src/qpid/client/Response.h b/cpp/src/qpid/client/Response.h
index 745d4648ad..ad37d7c0f4 100644
--- a/cpp/src/qpid/client/Response.h
+++ b/cpp/src/qpid/client/Response.h
@@ -38,12 +38,12 @@ public:
template <class T> T& as()
{
- framing::AMQMethodBody::shared_ptr response(future->getResponse());
+ framing::AMQMethodBody* response(future->getResponse());
return dynamic_cast<T&>(*response);
}
template <class T> bool isA()
{
- framing::AMQMethodBody::shared_ptr response(future->getResponse());
+ framing::AMQMethodBody* response(future->getResponse());
return response && response->isA<T>();
}
diff --git a/cpp/src/qpid/client/SessionCore.cpp b/cpp/src/qpid/client/SessionCore.cpp
index 391dcd909d..f7ed7416cd 100644
--- a/cpp/src/qpid/client/SessionCore.cpp
+++ b/cpp/src/qpid/client/SessionCore.cpp
@@ -44,7 +44,7 @@ void SessionCore::flush()
l3.sendFlush();
}
-Response SessionCore::send(AMQMethodBody::shared_ptr method, bool expectResponse)
+Response SessionCore::send(const AMQMethodBody& method, bool expectResponse)
{
boost::shared_ptr<FutureResponse> f(futures.createResponse());
if (expectResponse) {
@@ -59,7 +59,7 @@ Response SessionCore::send(AMQMethodBody::shared_ptr method, bool expectResponse
return Response(f);
}
-Response SessionCore::send(AMQMethodBody::shared_ptr method, const MethodContent& content, bool expectResponse)
+Response SessionCore::send(const AMQMethodBody& method, const MethodContent& content, bool expectResponse)
{
//TODO: lots of duplication between these two send methods; refactor
boost::shared_ptr<FutureResponse> f(futures.createResponse());
diff --git a/cpp/src/qpid/client/SessionCore.h b/cpp/src/qpid/client/SessionCore.h
index 15cd36114f..bcbaf0028d 100644
--- a/cpp/src/qpid/client/SessionCore.h
+++ b/cpp/src/qpid/client/SessionCore.h
@@ -47,8 +47,8 @@ public:
typedef boost::shared_ptr<SessionCore> shared_ptr;
SessionCore(uint16_t id, boost::shared_ptr<framing::FrameHandler> out, uint64_t maxFrameSize);
- Response send(framing::AMQMethodBody::shared_ptr method, bool expectResponse = false);
- Response send(framing::AMQMethodBody::shared_ptr method, const framing::MethodContent& content, bool expectResponse = false);
+ Response send(const framing::AMQMethodBody& method, bool expectResponse = false);
+ Response send(const framing::AMQMethodBody& method, const framing::MethodContent& content, bool expectResponse = false);
ReceivedContent::shared_ptr get();
uint16_t getId() const { return id; }
void setSync(bool);