From 49c7a491c98c26fe7d4f017a7ba655dfc029278c Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Thu, 16 Aug 2007 20:12:33 +0000 Subject: AMQBodies are no longer allocated on the heap and passed with shared_ptr. AMQFrame contains a boost::variant of AMQHeaderBody,AMQContentBody, AMQHeatbeatBody, and MethodHolder. A variant is basically a type-safe union, it can allocate any of the types in-place. MethodHolder contains a Blob, a less sophisticated kind of variant, which can contain any of the concrete method body types. Using variants for all the method types causes outrageous compile times and bloated library symbol names. Blob lacks some of the finer features of variant and needs help from generated code. For now both are hidden to the rest of the code base behind AMQFrame and MethodBody classes so if/when we decide to settle on just one "variant" type solution we can do so. This commit touches nearly 100 files, mostly converting method signatures with shared_ptr to FooBody* or FooBody&, and converting stored shared_ptr to AMQFrame and share_ptr to MethodHolder. There is one outstanding client memory leak, which I will fix in my next commit. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@566822 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/qpid/client/ChannelHandler.cpp | 44 ++++++++++----------- cpp/src/qpid/client/ChannelHandler.h | 8 +--- cpp/src/qpid/client/ClientChannel.cpp | 10 ++--- cpp/src/qpid/client/Connection.h | 1 + cpp/src/qpid/client/ConnectionHandler.cpp | 65 +++++++++++++++---------------- cpp/src/qpid/client/ConnectionHandler.h | 8 ++-- cpp/src/qpid/client/Correlator.cpp | 2 +- cpp/src/qpid/client/Correlator.h | 4 +- cpp/src/qpid/client/ExecutionHandler.cpp | 40 ++++++++++--------- cpp/src/qpid/client/ExecutionHandler.h | 4 +- cpp/src/qpid/client/FutureResponse.cpp | 8 ++-- cpp/src/qpid/client/FutureResponse.h | 8 ++-- cpp/src/qpid/client/ReceivedContent.cpp | 21 +++++----- cpp/src/qpid/client/ReceivedContent.h | 28 +++++-------- cpp/src/qpid/client/Response.h | 4 +- cpp/src/qpid/client/SessionCore.cpp | 4 +- cpp/src/qpid/client/SessionCore.h | 4 +- 17 files changed, 128 insertions(+), 135 deletions(-) (limited to 'cpp/src/qpid/client') diff --git a/cpp/src/qpid/client/ChannelHandler.cpp b/cpp/src/qpid/client/ChannelHandler.cpp index a6aea438f0..b3d720baf0 100644 --- a/cpp/src/qpid/client/ChannelHandler.cpp +++ b/cpp/src/qpid/client/ChannelHandler.cpp @@ -21,6 +21,7 @@ #include "ChannelHandler.h" #include "qpid/framing/amqp_framing.h" +#include "qpid/framing/all_method_bodies.h" using namespace qpid::client; using namespace qpid::framing; @@ -30,40 +31,39 @@ ChannelHandler::ChannelHandler() : StateManager(CLOSED), id(0) {} void ChannelHandler::incoming(AMQFrame& frame) { - AMQBody::shared_ptr body = frame.getBody(); + AMQBody* body = frame.getBody(); if (getState() == OPEN) { - if (isA(body)) { - ChannelCloseBody::shared_ptr method(shared_polymorphic_cast(body)); + ChannelCloseBody* closeBody= + dynamic_cast(body->getMethod()); + if (closeBody) { setState(CLOSED); if (onClose) { - onClose(method->getReplyCode(), method->getReplyText()); + onClose(closeBody->getReplyCode(), closeBody->getReplyText()); } } else { try { in(frame); }catch(ChannelException& e){ - if (body->type() == METHOD_BODY) { - AMQMethodBody::shared_ptr method(shared_polymorphic_cast(body)); - close(e.code, e.toString(), method->amqpClassId(), method->amqpMethodId()); - } else { + AMQMethodBody* method=body->getMethod(); + if (method) + close(e.code, e.toString(), + method->amqpClassId(), method->amqpMethodId()); + else close(e.code, e.toString(), 0, 0); - } } } } else { - if (body->type() == METHOD_BODY) { - handleMethod(shared_polymorphic_cast(body)); - } else { + if (body->getMethod()) + handleMethod(body->getMethod()); + else throw new ConnectionException(504, "Channel not open."); - } - } } void ChannelHandler::outgoing(AMQFrame& frame) { if (getState() == OPEN) { - frame.channel = id; + frame.setChannel(id); out(frame); } else { throw Exception("Channel not open"); @@ -75,7 +75,7 @@ void ChannelHandler::open(uint16_t _id) id = _id; setState(OPENING); - AMQFrame f(version, id, make_shared_ptr(new ChannelOpenBody(version))); + AMQFrame f(version, id, ChannelOpenBody(version)); out(f); std::set states; @@ -90,7 +90,7 @@ void ChannelHandler::open(uint16_t _id) void ChannelHandler::close(uint16_t code, const std::string& message, uint16_t classId, uint16_t methodId) { setState(CLOSING); - AMQFrame f(version, id, make_shared_ptr(new ChannelCloseBody(version, code, message, classId, methodId))); + AMQFrame f(version, id, ChannelCloseBody(version, code, message, classId, methodId)); out(f); } @@ -100,24 +100,24 @@ void ChannelHandler::close() waitFor(CLOSED); } -void ChannelHandler::handleMethod(AMQMethodBody::shared_ptr method) +void ChannelHandler::handleMethod(AMQMethodBody* method) { switch (getState()) { - case OPENING: + case OPENING: if (method->isA()) { setState(OPEN); } else { throw ConnectionException(504, "Channel not opened."); } break; - case CLOSING: + case CLOSING: if (method->isA()) { setState(CLOSED); } //else just ignore it break; - case CLOSED: + case CLOSED: throw ConnectionException(504, "Channel not opened."); - default: + default: throw Exception("Unexpected state encountered in ChannelHandler!"); } } diff --git a/cpp/src/qpid/client/ChannelHandler.h b/cpp/src/qpid/client/ChannelHandler.h index eaa7e7cc72..556e13a4f8 100644 --- a/cpp/src/qpid/client/ChannelHandler.h +++ b/cpp/src/qpid/client/ChannelHandler.h @@ -34,13 +34,7 @@ class ChannelHandler : private StateManager, public ChainableFrameHandler framing::ProtocolVersion version; uint16_t id; - void handleMethod(framing::AMQMethodBody::shared_ptr method); - - template bool isA(framing::AMQBody::shared_ptr body) { - return body->type() == framing::METHOD_BODY && - boost::shared_polymorphic_cast(body)->isA(); - } - + void handleMethod(framing::AMQMethodBody* method); void close(uint16_t code, const std::string& message, uint16_t classId, uint16_t methodId); diff --git a/cpp/src/qpid/client/ClientChannel.cpp b/cpp/src/qpid/client/ClientChannel.cpp index 8ffddd0dbf..aa73e83328 100644 --- a/cpp/src/qpid/client/ClientChannel.cpp +++ b/cpp/src/qpid/client/ClientChannel.cpp @@ -25,11 +25,11 @@ #include "ClientMessage.h" #include "qpid/QpidError.h" #include "Connection.h" -#include "ConnectionHandler.h" #include "FutureResponse.h" #include "MessageListener.h" #include #include +#include "qpid/framing/all_method_bodies.h" // FIXME aconway 2007-01-26: Evaluate all throws, ensure consistent // handling of errors that should close the connection or the channel. @@ -37,7 +37,6 @@ // using namespace std; using namespace boost; -using namespace qpid::client; using namespace qpid::framing; using namespace qpid::sys; @@ -49,13 +48,11 @@ const std::string empty; class ScopedSync { Session& session; -public: + public: ScopedSync(Session& s, bool enabled = true) : session(s) { session.setSynchronous(enabled); } ~ScopedSync() { session.setSynchronous(false); } }; -}} - Channel::Channel(bool _transactional, u_int16_t _prefetch) : prefetch(_prefetch), transactional(_transactional), running(false) { @@ -250,3 +247,6 @@ void Channel::run() { } } catch (const QueueClosed&) {} } + +}} + diff --git a/cpp/src/qpid/client/Connection.h b/cpp/src/qpid/client/Connection.h index e2e83c8caf..e41ab363b5 100644 --- a/cpp/src/qpid/client/Connection.h +++ b/cpp/src/qpid/client/Connection.h @@ -27,6 +27,7 @@ #include "ClientChannel.h" #include "ConnectionImpl.h" #include "Session.h" +#include "qpid/framing/AMQP_HighestVersion.h" namespace qpid { diff --git a/cpp/src/qpid/client/ConnectionHandler.cpp b/cpp/src/qpid/client/ConnectionHandler.cpp index f47506d977..66db9384e2 100644 --- a/cpp/src/qpid/client/ConnectionHandler.cpp +++ b/cpp/src/qpid/client/ConnectionHandler.cpp @@ -22,6 +22,8 @@ #include "ConnectionHandler.h" #include "qpid/log/Statement.h" #include "qpid/framing/amqp_framing.h" +#include "qpid/framing/AMQP_HighestVersion.h" +#include "qpid/framing/all_method_bodies.h" using namespace qpid::client; using namespace qpid::framing; @@ -53,16 +55,16 @@ void ConnectionHandler::incoming(AMQFrame& frame) throw Exception("Connection is closed."); } - AMQBody::shared_ptr body = frame.getBody(); + AMQBody* body = frame.getBody(); if (frame.getChannel() == 0) { - if (body->type() == METHOD_BODY) { - handle(shared_polymorphic_cast(body)); + if (body->getMethod()) { + handle(body->getMethod()); } else { error(503, "Cannot send content on channel zero."); } } else { switch(getState()) { - case OPEN: + case OPEN: try { in(frame); }catch(ConnectionException& e){ @@ -71,10 +73,10 @@ void ConnectionHandler::incoming(AMQFrame& frame) error(541/*internal error*/, e.what(), body); } break; - case CLOSING: + case CLOSING: QPID_LOG(warning, "Received frame on non-zero channel while closing connection; frame ignored."); break; - default: + default: //must be in connection initialisation: fail("Cannot receive frames on non-zero channel until connection is established."); } @@ -101,32 +103,29 @@ void ConnectionHandler::waitForOpen() void ConnectionHandler::close() { setState(CLOSING); - send(make_shared_ptr(new ConnectionCloseBody(version, 200, OK, 0, 0))); - + send(ConnectionCloseBody(version, 200, OK, 0, 0)); waitFor(CLOSED); } -void ConnectionHandler::send(framing::AMQBody::shared_ptr body) +void ConnectionHandler::send(const framing::AMQBody& body) { - AMQFrame f; - f.setBody(body); + AMQFrame f(ProtocolVersion(), 0, body); out(f); } void ConnectionHandler::error(uint16_t code, const std::string& message, uint16_t classId, uint16_t methodId) { setState(CLOSING); - send(make_shared_ptr(new ConnectionCloseBody(version, code, message, classId, methodId))); + send(ConnectionCloseBody(version, code, message, classId, methodId)); } -void ConnectionHandler::error(uint16_t code, const std::string& message, AMQBody::shared_ptr body) +void ConnectionHandler::error(uint16_t code, const std::string& message, AMQBody* body) { - if (body->type() == METHOD_BODY) { - AMQMethodBody::shared_ptr method(shared_polymorphic_cast(body)); + AMQMethodBody* method = body->getMethod(); + if (method) error(code, message, method->amqpClassId(), method->amqpMethodId()); - } else { + else error(code, message); - } } @@ -136,54 +135,54 @@ void ConnectionHandler::fail(const std::string& message) setState(FAILED); } -void ConnectionHandler::handle(AMQMethodBody::shared_ptr method) +void ConnectionHandler::handle(AMQMethodBody* method) { switch (getState()) { - case NOT_STARTED: + case NOT_STARTED: if (method->isA()) { setState(NEGOTIATING); string response = ((char)0) + uid + ((char)0) + pwd; - send(make_shared_ptr(new ConnectionStartOkBody(version, properties, mechanism, response, locale))); + send(ConnectionStartOkBody(version, properties, mechanism, response, locale)); } else { fail("Bad method sequence, expected connection-start."); } break; - case NEGOTIATING: + case NEGOTIATING: if (method->isA()) { - ConnectionTuneBody::shared_ptr proposal(shared_polymorphic_cast(method)); + ConnectionTuneBody* proposal=polymorphic_downcast(method); heartbeat = proposal->getHeartbeat(); maxChannels = proposal->getChannelMax(); - send(make_shared_ptr(new ConnectionTuneOkBody(version, maxChannels, maxFrameSize, heartbeat))); + send(ConnectionTuneOkBody(version, maxChannels, maxFrameSize, heartbeat)); setState(OPENING); - send(make_shared_ptr(new ConnectionOpenBody(version, vhost, capabilities, insist))); - //TODO: support for further security challenges - //} else if (method->isA()) { + send(ConnectionOpenBody(version, vhost, capabilities, insist)); + //TODO: support for further security challenges + //} else if (method->isA()) { } else { fail("Unexpected method sequence, expected connection-tune."); } break; - case OPENING: + case OPENING: if (method->isA()) { setState(OPEN); - //TODO: support for redirection - //} else if (method->isA()) { + //TODO: support for redirection + //} else if (method->isA()) { } else { fail("Unexpected method sequence, expected connection-open-ok."); } break; - case OPEN: + case OPEN: if (method->isA()) { - send(make_shared_ptr(new ConnectionCloseOkBody(version))); + send(ConnectionCloseOkBody(version)); setState(CLOSED); if (onError) { - ConnectionCloseBody::shared_ptr c(shared_polymorphic_cast(method)); + ConnectionCloseBody* c=polymorphic_downcast(method); onError(c->getReplyCode(), c->getReplyText()); } } else { error(503, "Unexpected method on channel zero.", method->amqpClassId(), method->amqpMethodId()); } break; - case CLOSING: + case CLOSING: if (method->isA()) { if (onClose) { onClose(); diff --git a/cpp/src/qpid/client/ConnectionHandler.h b/cpp/src/qpid/client/ConnectionHandler.h index 464d0ca26d..d05ae1428b 100644 --- a/cpp/src/qpid/client/ConnectionHandler.h +++ b/cpp/src/qpid/client/ConnectionHandler.h @@ -25,6 +25,8 @@ #include "StateManager.h" #include "ChainableFrameHandler.h" #include "qpid/framing/InputHandler.h" +#include "qpid/framing/FieldTable.h" +#include "qpid/framing/AMQMethodBody.h" namespace qpid { namespace client { @@ -53,10 +55,10 @@ class ConnectionHandler : private StateManager, enum STATES {NOT_STARTED, NEGOTIATING, OPENING, OPEN, CLOSING, CLOSED, FAILED}; std::set ESTABLISHED; - void handle(framing::AMQMethodBody::shared_ptr method); - void send(framing::AMQBody::shared_ptr body); + void handle(framing::AMQMethodBody* method); + void send(const framing::AMQBody& body); void error(uint16_t code, const std::string& message, uint16_t classId = 0, uint16_t methodId = 0); - void error(uint16_t code, const std::string& message, framing::AMQBody::shared_ptr body); + void error(uint16_t code, const std::string& message, framing::AMQBody* body); void fail(const std::string& message); public: diff --git a/cpp/src/qpid/client/Correlator.cpp b/cpp/src/qpid/client/Correlator.cpp index edb16bbcee..9ef6857957 100644 --- a/cpp/src/qpid/client/Correlator.cpp +++ b/cpp/src/qpid/client/Correlator.cpp @@ -25,7 +25,7 @@ using qpid::client::Correlator; using namespace qpid::framing; using namespace boost; -void Correlator::receive(AMQMethodBody::shared_ptr response) +void Correlator::receive(AMQMethodBody* response) { if (listeners.empty()) { throw ConnectionException(503, "Unexpected method!");//TODO: include the method & class name diff --git a/cpp/src/qpid/client/Correlator.h b/cpp/src/qpid/client/Correlator.h index 339c9bd0c4..d93e7b66cd 100644 --- a/cpp/src/qpid/client/Correlator.h +++ b/cpp/src/qpid/client/Correlator.h @@ -36,9 +36,9 @@ namespace client { class Correlator { public: - typedef boost::function Listener; + typedef boost::function Listener; - void receive(framing::AMQMethodBody::shared_ptr); + void receive(framing::AMQMethodBody*); void listen(Listener l); private: diff --git a/cpp/src/qpid/client/ExecutionHandler.cpp b/cpp/src/qpid/client/ExecutionHandler.cpp index abfce4f9d1..6ee6429b6b 100644 --- a/cpp/src/qpid/client/ExecutionHandler.cpp +++ b/cpp/src/qpid/client/ExecutionHandler.cpp @@ -23,31 +23,35 @@ #include "qpid/Exception.h" #include "qpid/framing/BasicDeliverBody.h" #include "qpid/framing/MessageTransferBody.h" +#include "qpid/framing/AMQP_HighestVersion.h" +#include "qpid/framing/all_method_bodies.h" using namespace qpid::client; using namespace qpid::framing; using namespace boost; -bool isMessageMethod(AMQMethodBody::shared_ptr method) +bool isMessageMethod(AMQMethodBody* method) { return method->isA() || method->isA() || method->isA(); } -bool isMessageMethod(AMQBody::shared_ptr body) +bool isMessageMethod(AMQBody* body) { - return body->type() == METHOD_BODY && isMessageMethod(shared_polymorphic_cast(body)); + AMQMethodBody* method=body->getMethod(); + return method && isMessageMethod(method); } bool isContentFrame(AMQFrame& frame) { - AMQBody::shared_ptr body = frame.getBody(); + AMQBody* body = frame.getBody(); uint8_t type = body->type(); return type == HEADER_BODY || type == CONTENT_BODY || isMessageMethod(body); } -bool invoke(AMQBody::shared_ptr body, Invocable* target) +bool invoke(AMQBody* body, Invocable* target) { - return body->type() == METHOD_BODY && shared_polymorphic_cast(body)->invoke(target); + AMQMethodBody* method=body->getMethod(); + return method && method->invoke(target); } ExecutionHandler::ExecutionHandler(uint64_t _maxFrameSize) : @@ -56,7 +60,7 @@ ExecutionHandler::ExecutionHandler(uint64_t _maxFrameSize) : //incoming: void ExecutionHandler::handle(AMQFrame& frame) { - AMQBody::shared_ptr body = frame.getBody(); + AMQBody* body = frame.getBody(); if (!invoke(body, this)) { if (isContentFrame(frame)) { if (!arriving) { @@ -69,7 +73,7 @@ void ExecutionHandler::handle(AMQFrame& frame) } } else { ++incoming.hwm; - correlation.receive(shared_polymorphic_cast(body)); + correlation.receive(body->getMethod()); } } } @@ -95,16 +99,15 @@ void ExecutionHandler::flush() { //send completion incoming.lwm = incoming.hwm; - //make_shared_ptr(new ExecutionCompleteBody(getVersion(), incoming.hwm.getValue(), SequenceNumberSet()))); } void ExecutionHandler::sendFlush() { - AMQFrame frame(version, 0, make_shared_ptr(new ExecutionFlushBody(version))); + AMQFrame frame(version, 0, ExecutionFlushBody()); out(frame); } -void ExecutionHandler::send(AMQBody::shared_ptr command, CompletionTracker::Listener f, Correlator::Listener g) +void ExecutionHandler::send(const AMQBody& command, CompletionTracker::Listener f, Correlator::Listener g) { //allocate id: ++outgoing.hwm; @@ -116,18 +119,19 @@ void ExecutionHandler::send(AMQBody::shared_ptr command, CompletionTracker::List correlation.listen(g); } - AMQFrame frame(version, 0/*id will be filled in be channel handler*/, command); + AMQFrame frame(version, 0/*id will be filled in be channel handler*/, + command); out(frame); } -void ExecutionHandler::sendContent(AMQBody::shared_ptr command, const BasicHeaderProperties& headers, const std::string& data, +void ExecutionHandler::sendContent(const AMQBody& command, const BasicHeaderProperties& headers, const std::string& data, CompletionTracker::Listener f, Correlator::Listener g) { send(command, f, g); - AMQHeaderBody::shared_ptr header(new AMQHeaderBody(BASIC)); - BasicHeaderProperties::copy(*static_cast(header->getProperties()), headers); - header->setContentSize(data.size()); + AMQHeaderBody header(BASIC); + BasicHeaderProperties::copy(*static_cast(header.getProperties()), headers); + header.setContentSize(data.size()); AMQFrame h(version, 0, header); out(h); @@ -136,7 +140,7 @@ void ExecutionHandler::sendContent(AMQBody::shared_ptr command, const BasicHeade //frame itself uses 8 bytes u_int32_t frag_size = maxFrameSize - 8; if(data_length < frag_size){ - AMQFrame frame(version, 0, make_shared_ptr(new AMQContentBody(data))); + AMQFrame frame(version, 0, AMQContentBody(data)); out(frame); }else{ u_int32_t offset = 0; @@ -144,7 +148,7 @@ void ExecutionHandler::sendContent(AMQBody::shared_ptr command, const BasicHeade while (remaining > 0) { u_int32_t length = remaining > frag_size ? frag_size : remaining; string frag(data.substr(offset, length)); - AMQFrame frame(version, 0, make_shared_ptr(new AMQContentBody(frag))); + AMQFrame frame(version, 0, AMQContentBody(frag)); out(frame); offset += length; remaining = data_length - offset; diff --git a/cpp/src/qpid/client/ExecutionHandler.h b/cpp/src/qpid/client/ExecutionHandler.h index f62598ef95..21613df779 100644 --- a/cpp/src/qpid/client/ExecutionHandler.h +++ b/cpp/src/qpid/client/ExecutionHandler.h @@ -56,10 +56,10 @@ public: void setMaxFrameSize(uint64_t size) { maxFrameSize = size; } void handle(framing::AMQFrame& frame); - void send(framing::AMQBody::shared_ptr command, + void send(const framing::AMQBody& command, CompletionTracker::Listener f = CompletionTracker::Listener(), Correlator::Listener g = Correlator::Listener()); - void sendContent(framing::AMQBody::shared_ptr command, + void sendContent(const framing::AMQBody& command, const framing::BasicHeaderProperties& headers, const std::string& data, CompletionTracker::Listener f = CompletionTracker::Listener(), Correlator::Listener g = Correlator::Listener()); diff --git a/cpp/src/qpid/client/FutureResponse.cpp b/cpp/src/qpid/client/FutureResponse.cpp index 6b1246a449..e63dc9c192 100644 --- a/cpp/src/qpid/client/FutureResponse.cpp +++ b/cpp/src/qpid/client/FutureResponse.cpp @@ -26,16 +26,16 @@ using namespace qpid::framing; using namespace qpid::sys; -AMQMethodBody::shared_ptr FutureResponse::getResponse() +AMQMethodBody* FutureResponse::getResponse() { waitForCompletion(); - return response; + return response.get(); } -void FutureResponse::received(AMQMethodBody::shared_ptr r) +void FutureResponse::received(AMQMethodBody* r) { Monitor::ScopedLock l(lock); - response = r; + response = *r; complete = true; lock.notifyAll(); } diff --git a/cpp/src/qpid/client/FutureResponse.h b/cpp/src/qpid/client/FutureResponse.h index ccc6fb5894..75b1f72c04 100644 --- a/cpp/src/qpid/client/FutureResponse.h +++ b/cpp/src/qpid/client/FutureResponse.h @@ -23,6 +23,7 @@ #define _FutureResponse_ #include "qpid/framing/amqp_framing.h" +#include "qpid/framing/MethodHolder.h" #include "FutureCompletion.h" namespace qpid { @@ -30,11 +31,10 @@ namespace client { class FutureResponse : public FutureCompletion { - framing::AMQMethodBody::shared_ptr response; - + framing::MethodHolder response; public: - framing::AMQMethodBody::shared_ptr getResponse(); - void received(framing::AMQMethodBody::shared_ptr response); + framing::AMQMethodBody* getResponse(); + void received(framing::AMQMethodBody* response); }; }} diff --git a/cpp/src/qpid/client/ReceivedContent.cpp b/cpp/src/qpid/client/ReceivedContent.cpp index 9cfee21c3c..5a1f48901a 100644 --- a/cpp/src/qpid/client/ReceivedContent.cpp +++ b/cpp/src/qpid/client/ReceivedContent.cpp @@ -20,6 +20,7 @@ */ #include "ReceivedContent.h" +#include "qpid/framing/all_method_bodies.h" using qpid::client::ReceivedContent; using namespace qpid::framing; @@ -27,9 +28,9 @@ using namespace boost; ReceivedContent::ReceivedContent(const SequenceNumber& _id) : id(_id) {} -void ReceivedContent::append(AMQBody::shared_ptr part) +void ReceivedContent::append(AMQBody* part) { - parts.push_back(part); + parts.push_back(AMQFrame(ProtocolVersion(), 0, part)); } bool ReceivedContent::isComplete() const @@ -37,7 +38,7 @@ bool ReceivedContent::isComplete() const if (parts.empty()) { return false; } else if (isA() || isA()) { - AMQHeaderBody::shared_ptr headers(getHeaders()); + const AMQHeaderBody* headers(getHeaders()); return headers && headers->getContentSize() == getContentSize(); } else if (isA()) { //no longer support references, headers and data are still method fields @@ -48,14 +49,14 @@ bool ReceivedContent::isComplete() const } -AMQMethodBody::shared_ptr ReceivedContent::getMethod() const +const AMQMethodBody* ReceivedContent::getMethod() const { - return parts.empty() ? AMQMethodBody::shared_ptr() : dynamic_pointer_cast(parts[0]); + return parts.empty() ? 0 : dynamic_cast(parts[0].getBody()); } -AMQHeaderBody::shared_ptr ReceivedContent::getHeaders() const +const AMQHeaderBody* ReceivedContent::getHeaders() const { - return parts.size() < 2 ? AMQHeaderBody::shared_ptr() : dynamic_pointer_cast(parts[1]); + return parts.size() < 2 ? 0 : dynamic_cast(parts[1].getBody()); } uint64_t ReceivedContent::getContentSize() const @@ -63,7 +64,7 @@ uint64_t ReceivedContent::getContentSize() const if (isA() || isA()) { uint64_t size(0); for (uint i = 2; i < parts.size(); i++) { - size += parts[i]->size(); + size += parts[i].getBody()->size(); } return size; } else if (isA()) { @@ -78,7 +79,7 @@ std::string ReceivedContent::getContent() const if (isA() || isA()) { string data; for (uint i = 2; i < parts.size(); i++) { - data += dynamic_pointer_cast(parts[i])->getData(); + data += static_cast(parts[i].getBody())->getData(); } return data; } else if (isA()) { @@ -93,7 +94,7 @@ void ReceivedContent::populate(Message& msg) if (!isComplete()) throw Exception("Incomplete message"); if (isA() || isA()) { - BasicHeaderProperties* properties = dynamic_cast(getHeaders()->getProperties()); + const BasicHeaderProperties* properties = dynamic_cast(getHeaders()->getProperties()); BasicHeaderProperties::copy(msg, *properties); msg.setData(getContent()); } else if (isA()) { diff --git a/cpp/src/qpid/client/ReceivedContent.h b/cpp/src/qpid/client/ReceivedContent.h index 1886034f9b..4f84039c10 100644 --- a/cpp/src/qpid/client/ReceivedContent.h +++ b/cpp/src/qpid/client/ReceivedContent.h @@ -20,8 +20,8 @@ */ #include #include -#include #include "qpid/framing/amqp_framing.h" +#include "qpid/framing/AMQFrame.h" #include "qpid/framing/SequenceNumber.h" #include "ClientMessage.h" @@ -38,37 +38,29 @@ namespace client { class ReceivedContent { const framing::SequenceNumber id; - std::vector parts; + std::vector parts; public: typedef boost::shared_ptr shared_ptr; ReceivedContent(const framing::SequenceNumber& id); - void append(framing::AMQBody::shared_ptr part); + void append(framing::AMQBody* part); bool isComplete() const; uint64_t getContentSize() const; std::string getContent() const; - framing::AMQMethodBody::shared_ptr getMethod() const; - framing::AMQHeaderBody::shared_ptr getHeaders() const; + const framing::AMQMethodBody* getMethod() const; + const framing::AMQHeaderBody* getHeaders() const; template bool isA() const { - framing::AMQMethodBody::shared_ptr method = getMethod(); - if (!method) { - return false; - } else { - return method->isA(); - } + const framing::AMQMethodBody* method=getMethod(); + return method && method->isA(); } - template boost::shared_ptr as() const { - framing::AMQMethodBody::shared_ptr method = getMethod(); - if (method && method->isA()) { - return boost::dynamic_pointer_cast(method); - } else { - return boost::shared_ptr(); - } + template const T* as() const { + const framing::AMQMethodBody* method=getMethod(); + return (method && method->isA()) ? dynamic_cast(method) : 0; } const framing::SequenceNumber& getId() const { return id; } diff --git a/cpp/src/qpid/client/Response.h b/cpp/src/qpid/client/Response.h index 745d4648ad..ad37d7c0f4 100644 --- a/cpp/src/qpid/client/Response.h +++ b/cpp/src/qpid/client/Response.h @@ -38,12 +38,12 @@ public: template T& as() { - framing::AMQMethodBody::shared_ptr response(future->getResponse()); + framing::AMQMethodBody* response(future->getResponse()); return dynamic_cast(*response); } template bool isA() { - framing::AMQMethodBody::shared_ptr response(future->getResponse()); + framing::AMQMethodBody* response(future->getResponse()); return response && response->isA(); } diff --git a/cpp/src/qpid/client/SessionCore.cpp b/cpp/src/qpid/client/SessionCore.cpp index 391dcd909d..f7ed7416cd 100644 --- a/cpp/src/qpid/client/SessionCore.cpp +++ b/cpp/src/qpid/client/SessionCore.cpp @@ -44,7 +44,7 @@ void SessionCore::flush() l3.sendFlush(); } -Response SessionCore::send(AMQMethodBody::shared_ptr method, bool expectResponse) +Response SessionCore::send(const AMQMethodBody& method, bool expectResponse) { boost::shared_ptr f(futures.createResponse()); if (expectResponse) { @@ -59,7 +59,7 @@ Response SessionCore::send(AMQMethodBody::shared_ptr method, bool expectResponse return Response(f); } -Response SessionCore::send(AMQMethodBody::shared_ptr method, const MethodContent& content, bool expectResponse) +Response SessionCore::send(const AMQMethodBody& method, const MethodContent& content, bool expectResponse) { //TODO: lots of duplication between these two send methods; refactor boost::shared_ptr f(futures.createResponse()); diff --git a/cpp/src/qpid/client/SessionCore.h b/cpp/src/qpid/client/SessionCore.h index 15cd36114f..bcbaf0028d 100644 --- a/cpp/src/qpid/client/SessionCore.h +++ b/cpp/src/qpid/client/SessionCore.h @@ -47,8 +47,8 @@ public: typedef boost::shared_ptr shared_ptr; SessionCore(uint16_t id, boost::shared_ptr out, uint64_t maxFrameSize); - Response send(framing::AMQMethodBody::shared_ptr method, bool expectResponse = false); - Response send(framing::AMQMethodBody::shared_ptr method, const framing::MethodContent& content, bool expectResponse = false); + Response send(const framing::AMQMethodBody& method, bool expectResponse = false); + Response send(const framing::AMQMethodBody& method, const framing::MethodContent& content, bool expectResponse = false); ReceivedContent::shared_ptr get(); uint16_t getId() const { return id; } void setSync(bool); -- cgit v1.2.1