diff options
| author | Alan Conway <aconway@apache.org> | 2007-08-16 20:12:33 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2007-08-16 20:12:33 +0000 |
| commit | 49c7a491c98c26fe7d4f017a7ba655dfc029278c (patch) | |
| tree | 304d51ba039a5391b4ebde08caab3da978b465fb /cpp/src/qpid/framing | |
| parent | dc13ca80ff893f74ab57fee6543de6543aa366bc (diff) | |
| download | qpid-python-49c7a491c98c26fe7d4f017a7ba655dfc029278c.tar.gz | |
AMQBodies are no longer allocated on the heap and passed with shared_ptr.
AMQFrame contains a boost::variant of AMQHeaderBody,AMQContentBody,
AMQHeatbeatBody, and MethodHolder. A variant is basically a type-safe
union, it can allocate any of the types in-place.
MethodHolder contains a Blob, a less sophisticated kind of variant,
which can contain any of the concrete method body types.
Using variants for all the method types causes outrageous compile
times and bloated library symbol names. Blob lacks some of the finer
features of variant and needs help from generated code. For now both
are hidden to the rest of the code base behind AMQFrame and MethodBody
classes so if/when we decide to settle on just one "variant" type
solution we can do so.
This commit touches nearly 100 files, mostly converting method
signatures with shared_ptr<FooBody> to FooBody* or FooBody&, and
converting stored shared_ptr<AMQBody> to AMQFrame and
share_ptr<AMQMethodBody> to MethodHolder.
There is one outstanding client memory leak, which I will fix in my next commit.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@566822 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/framing')
24 files changed, 348 insertions, 495 deletions
diff --git a/cpp/src/qpid/framing/AMQBody.h b/cpp/src/qpid/framing/AMQBody.h index eaa568c06a..0b2718bef3 100644 --- a/cpp/src/qpid/framing/AMQBody.h +++ b/cpp/src/qpid/framing/AMQBody.h @@ -1,3 +1,6 @@ +#ifndef QPID_FRAMING_AMQBODY_H +#define QPID_FRAMING_AMQBODY_H + /* * * Licensed to the Apache Software Foundation (ASF) under one @@ -18,42 +21,58 @@ * under the License. * */ -#include <boost/shared_ptr.hpp> -#include "amqp_types.h" -#include "Buffer.h" +#include "qpid/framing/amqp_types.h" +#include "qpid/shared_ptr.h" -#ifndef _AMQBody_ -#define _AMQBody_ +#include <ostream> namespace qpid { - namespace framing { +namespace framing { + +class Buffer; + +class AMQMethodBody; +class AMQHeaderBody; +class AMQContentBody; +class AMQHeartbeatBody; + +struct AMQBodyConstVisitor { + virtual ~AMQBodyConstVisitor() {} + virtual void visit(const AMQHeaderBody&) = 0; + virtual void visit(const AMQContentBody&) = 0; + virtual void visit(const AMQHeartbeatBody&) = 0; + virtual void visit(const AMQMethodBody&) = 0; +}; + +class AMQBody +{ + public: + typedef shared_ptr<AMQBody> shared_ptr; + + virtual ~AMQBody(); + + virtual uint8_t type() const = 0; - class AMQBody - { - public: - typedef boost::shared_ptr<AMQBody> shared_ptr; + virtual void encode(Buffer& buffer) const = 0; + virtual void decode(Buffer& buffer, uint32_t=0) = 0; + virtual uint32_t size() const = 0; - virtual ~AMQBody(); - virtual uint32_t size() const = 0; - virtual uint8_t type() const = 0; - virtual void encode(Buffer& buffer) const = 0; - virtual void decode(Buffer& buffer, uint32_t size) = 0; + virtual void print(std::ostream& out) const = 0; + virtual void accept(AMQBodyConstVisitor&) const = 0; - virtual void print(std::ostream& out) const = 0; - }; + virtual AMQMethodBody* getMethod() { return 0; } + virtual const AMQMethodBody* getMethod() const { return 0; } +}; - std::ostream& operator<<(std::ostream& out, const AMQBody& body) ; +std::ostream& operator<<(std::ostream& out, const AMQBody& body) ; - enum BodyTypes { - METHOD_BODY = 1, - HEADER_BODY = 2, - CONTENT_BODY = 3, - HEARTBEAT_BODY = 8, - REQUEST_BODY = 9, - RESPONSE_BODY = 10 - }; - } -} +enum BodyTypes { + METHOD_BODY = 1, + HEADER_BODY = 2, + CONTENT_BODY = 3, + HEARTBEAT_BODY = 8 +}; +}} // namespace qpid::framing -#endif +#endif /*!QPID_FRAMING_AMQBODY_H*/ diff --git a/cpp/src/qpid/framing/AMQContentBody.cpp b/cpp/src/qpid/framing/AMQContentBody.cpp index 19d2e5714a..c472af555d 100644 --- a/cpp/src/qpid/framing/AMQContentBody.cpp +++ b/cpp/src/qpid/framing/AMQContentBody.cpp @@ -12,7 +12,7 @@ * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +n * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. diff --git a/cpp/src/qpid/framing/AMQContentBody.h b/cpp/src/qpid/framing/AMQContentBody.h index 6d4c1ef4b6..a476796015 100644 --- a/cpp/src/qpid/framing/AMQContentBody.h +++ b/cpp/src/qpid/framing/AMQContentBody.h @@ -28,13 +28,11 @@ namespace qpid { namespace framing { -class AMQContentBody : public AMQBody +class AMQContentBody : public AMQBody { string data; public: - typedef boost::shared_ptr<AMQContentBody> shared_ptr; - AMQContentBody(); AMQContentBody(const string& data); inline virtual ~AMQContentBody(){} @@ -44,6 +42,7 @@ public: void encode(Buffer& buffer) const; void decode(Buffer& buffer, uint32_t size); void print(std::ostream& out) const; + void accept(AMQBodyConstVisitor& v) const { v.visit(*this); } }; } diff --git a/cpp/src/qpid/framing/AMQFrame.cpp b/cpp/src/qpid/framing/AMQFrame.cpp index f79eae3524..780af71be4 100644 --- a/cpp/src/qpid/framing/AMQFrame.cpp +++ b/cpp/src/qpid/framing/AMQFrame.cpp @@ -1,4 +1,3 @@ - /* * * Licensed to the Apache Software Foundation (ASF) under one @@ -19,46 +18,70 @@ * under the License. * */ -#include <boost/format.hpp> - #include "AMQFrame.h" + #include "qpid/QpidError.h" -#include "AMQMethodBody.h" +#include "qpid/framing/variant.h" +#include "qpid/framing/AMQMethodBody.h" +#include <boost/format.hpp> + +#include <iostream> namespace qpid { namespace framing { +namespace { +struct GetBodyVisitor : public NoBlankVisitor<AMQBody*> { + QPID_USING_NOBLANK(AMQBody*); + AMQBody* operator()(MethodHolder& t) const { return t.get(); } + template <class T> AMQBody* operator()(T& t) const { return &t; } +}; + +struct EncodeVisitor : public NoBlankVisitor<void> { + Buffer& buffer; + EncodeVisitor(Buffer& b) : buffer(b) {} + + QPID_USING_NOBLANK(void); + template <class T> void operator()(const T& t) const { return t.encode(buffer); } +}; + +struct SizeVisitor : public NoBlankVisitor<uint32_t> { + QPID_USING_NOBLANK(uint32_t); + template <class T> uint32_t operator()(const T& t) const { return t.size(); } +}; + +struct DecodeVisitor : public NoBlankVisitor<void> { + Buffer& buffer; + uint32_t size; + DecodeVisitor(Buffer& b, uint32_t s) : buffer(b), size(s) {} + QPID_USING_NOBLANK(void); + void operator()(MethodHolder& t) const { return t.decode(buffer); } + template <class T> void operator()(T& t) const { return t.decode(buffer, size); } +}; -AMQP_MethodVersionMap AMQFrame::versionMap; - -AMQFrame::AMQFrame(ProtocolVersion _version) - : channel(0), type(0), version(_version) - { - assert(version != ProtocolVersion(0,0)); - } - -AMQFrame::AMQFrame(ProtocolVersion _version, uint16_t _channel, AMQBody* _body) : channel(_channel), body(_body),version(_version) {} +} -AMQFrame::AMQFrame(ProtocolVersion _version, uint16_t _channel, const AMQBody::shared_ptr& _body) : - channel(_channel), body(_body), version(_version) -{} +AMQBody* AMQFrame::getBody() { + return boost::apply_visitor(GetBodyVisitor(), body); +} -AMQFrame::~AMQFrame() {} +const AMQBody* AMQFrame::getBody() const { + return boost::apply_visitor(GetBodyVisitor(), const_cast<Variant&>(body)); +} void AMQFrame::encode(Buffer& buffer) { - buffer.putOctet(body->type()); + buffer.putOctet(getBody()->type()); buffer.putShort(channel); - buffer.putLong(body->size()); - body->encode(buffer); + buffer.putLong(boost::apply_visitor(SizeVisitor(), body)); + boost::apply_visitor(EncodeVisitor(buffer), body); buffer.putOctet(0xCE); } uint32_t AMQFrame::size() const{ - assert(body.get()); - return 1/*type*/ + 2/*channel*/ + 4/*body size*/ + body->size() - + 1/*0xCE*/; + return 1/*type*/ + 2/*channel*/ + 4/*body size*/ + + boost::apply_visitor(SizeVisitor(), body) + 1/*0xCE*/; } bool AMQFrame::decode(Buffer& buffer) @@ -66,56 +89,42 @@ bool AMQFrame::decode(Buffer& buffer) if(buffer.available() < 7) return false; buffer.record(); - uint32_t frameSize = decodeHead(buffer); - if(buffer.available() < frameSize + 1){ + + uint8_t type = buffer.getOctet(); + channel = buffer.getShort(); + uint32_t size = buffer.getLong(); + + if(buffer.available() < size+1){ buffer.restore(); return false; } - decodeBody(buffer, frameSize); + decodeBody(buffer, size, type); uint8_t end = buffer.getOctet(); if(end != 0xCE) THROW_QPID_ERROR(FRAMING_ERROR, "Frame end not found"); return true; } -uint32_t AMQFrame::decodeHead(Buffer& buffer){ - type = buffer.getOctet(); - channel = buffer.getShort(); - return buffer.getLong(); -} - -void AMQFrame::decodeBody(Buffer& buffer, uint32_t size) +void AMQFrame::decodeBody(Buffer& buffer, uint32_t size, uint8_t type) { switch(type) { - case METHOD_BODY: - body = AMQMethodBody::create(versionMap, version, buffer); - break; - case HEADER_BODY: - body = AMQBody::shared_ptr(new AMQHeaderBody()); - break; - case CONTENT_BODY: - body = AMQBody::shared_ptr(new AMQContentBody()); - break; - case HEARTBEAT_BODY: - body = AMQBody::shared_ptr(new AMQHeartbeatBody()); - break; + case METHOD_BODY: body = MethodHolder(); break; + case HEADER_BODY: body = AMQHeaderBody(); break; + case CONTENT_BODY: body = AMQContentBody(); break; + case HEARTBEAT_BODY: body = AMQHeartbeatBody(); break; + default: THROW_QPID_ERROR( FRAMING_ERROR, boost::format("Unknown frame type %d") % type); } - body->decode(buffer, size); + boost::apply_visitor(DecodeVisitor(buffer,size), body); } -std::ostream& operator<<(std::ostream& out, const AMQFrame& t) +std::ostream& operator<<(std::ostream& out, const AMQFrame& f) { - out << "Frame[channel=" << t.channel << "; "; - if (t.body.get() == 0) - out << "empty"; - else - out << *t.body; - out << "]"; - return out; + return out << "Frame[channel=" << f.getChannel() << "; " << *f.getBody() + << "]"; } diff --git a/cpp/src/qpid/framing/AMQFrame.h b/cpp/src/qpid/framing/AMQFrame.h index 16c1427802..9e825a9936 100644 --- a/cpp/src/qpid/framing/AMQFrame.h +++ b/cpp/src/qpid/framing/AMQFrame.h @@ -21,55 +21,76 @@ * under the License. * */ -#include <boost/cast.hpp> - -#include "amqp_types.h" -#include "AMQBody.h" #include "AMQDataBlock.h" -#include "AMQMethodBody.h" #include "AMQHeaderBody.h" #include "AMQContentBody.h" #include "AMQHeartbeatBody.h" -#include "qpid/framing/AMQP_MethodVersionMap.h" -#include "qpid/framing/AMQP_HighestVersion.h" -#include "qpid/framing/Buffer.h" -#include "qpid/shared_ptr.h" +#include "MethodHolder.h" +#include "ProtocolVersion.h" + +#include <boost/cast.hpp> +#include <boost/variant.hpp> namespace qpid { namespace framing { - class AMQFrame : public AMQDataBlock { public: - AMQFrame(ProtocolVersion _version = highestProtocolVersion); - AMQFrame(ProtocolVersion _version, uint16_t channel, AMQBody* body); - AMQFrame(ProtocolVersion _version, uint16_t channel, const AMQBody::shared_ptr& body); - virtual ~AMQFrame(); - virtual void encode(Buffer& buffer); - virtual bool decode(Buffer& buffer); - virtual uint32_t size() const; - uint16_t getChannel() const { return channel; } - - shared_ptr<AMQBody> getBody() { return body; } - void setBody(const shared_ptr<AMQBody>& b) { body = b; } - - /** Convenience template to cast the body to an expected type */ - template <class T> boost::shared_ptr<T> castBody() { - assert(dynamic_cast<T*>(getBody().get())); - boost::static_pointer_cast<T>(getBody()); + AMQFrame(ProtocolVersion=ProtocolVersion()) {} + + /** Construct a frame with a copy of b */ + AMQFrame(ProtocolVersion, ChannelId c, const AMQBody* b) : channel(c) { + setBody(*b); + } + + AMQFrame(ProtocolVersion, ChannelId c, const AMQBody& b) : channel(c) { + setBody(b); } + + ChannelId getChannel() const { return channel; } + void setChannel(ChannelId c) { channel = c; } - uint32_t decodeHead(Buffer& buffer); - void decodeBody(Buffer& buffer, uint32_t size); + AMQBody* getBody(); + const AMQBody* getBody() const; - uint16_t channel; - uint8_t type; - AMQBody::shared_ptr body; - ProtocolVersion version; + /** Copy a body instance to the frame */ + void setBody(const AMQBody& b) { CopyVisitor cv(*this); b.accept(cv); } + + /** Convenience template to cast the body to an expected type. */ + template <class T> T* castBody() { + boost::polymorphic_downcast<T*>(getBody()); + } + + bool empty() { return boost::get<boost::blank>(&body); } + + void encode(Buffer& buffer); + bool decode(Buffer& buffer); + uint32_t size() const; private: - static AMQP_MethodVersionMap versionMap; + struct CopyVisitor : public AMQBodyConstVisitor { + AMQFrame& frame; + CopyVisitor(AMQFrame& f) : frame(f) {} + void visit(const AMQHeaderBody& x) { frame.body=x; } + void visit(const AMQContentBody& x) { frame.body=x; } + void visit(const AMQHeartbeatBody& x) { frame.body=x; } + void visit(const AMQMethodBody& x) { frame.body=MethodHolder(x); } + }; + friend struct CopyVisitor; + + typedef boost::variant<boost::blank, + AMQHeaderBody, + AMQContentBody, + AMQHeartbeatBody, + MethodHolder> Variant; + + void visit(AMQHeaderBody& x) { body=x; } + + void decodeBody(Buffer& buffer, uint32_t size, uint8_t type); + + uint16_t channel; + Variant body; }; std::ostream& operator<<(std::ostream&, const AMQFrame&); diff --git a/cpp/src/qpid/framing/AMQHeaderBody.cpp b/cpp/src/qpid/framing/AMQHeaderBody.cpp index 4d3611eb40..6a3c8f27d1 100644 --- a/cpp/src/qpid/framing/AMQHeaderBody.cpp +++ b/cpp/src/qpid/framing/AMQHeaderBody.cpp @@ -22,54 +22,34 @@ #include "qpid/QpidError.h" #include "BasicHeaderProperties.h" -qpid::framing::AMQHeaderBody::AMQHeaderBody(int classId) : weight(0), contentSize(0){ - createProperties(classId); -} +qpid::framing::AMQHeaderBody::AMQHeaderBody(int) : weight(0), contentSize(0) {} -qpid::framing::AMQHeaderBody::AMQHeaderBody() : properties(0), weight(0), contentSize(0){ -} +qpid::framing::AMQHeaderBody::AMQHeaderBody() : weight(0), contentSize(0){} -qpid::framing::AMQHeaderBody::~AMQHeaderBody(){ - delete properties; -} +qpid::framing::AMQHeaderBody::~AMQHeaderBody(){} uint32_t qpid::framing::AMQHeaderBody::size() const{ - return 12 + properties->size(); + return 12 + properties.size(); } void qpid::framing::AMQHeaderBody::encode(Buffer& buffer) const { - buffer.putShort(properties->classId()); + buffer.putShort(properties.classId()); buffer.putShort(weight); buffer.putLongLong(contentSize); - properties->encode(buffer); + properties.encode(buffer); } void qpid::framing::AMQHeaderBody::decode(Buffer& buffer, uint32_t bufSize){ - uint16_t classId = buffer.getShort(); + buffer.getShort(); // Ignore classId weight = buffer.getShort(); contentSize = buffer.getLongLong(); - createProperties(classId); - properties->decode(buffer, bufSize - 12); -} - -void qpid::framing::AMQHeaderBody::createProperties(int classId){ - switch(classId){ - case BASIC: - properties = new qpid::framing::BasicHeaderProperties(); - break; - default: - THROW_QPID_ERROR(FRAMING_ERROR, "Unknown header class"); - } + properties.decode(buffer, bufSize - 12); } void qpid::framing::AMQHeaderBody::print(std::ostream& out) const { out << "header (" << size() << " bytes)" << " content_size=" << getContentSize(); - const BasicHeaderProperties* props = - dynamic_cast<const BasicHeaderProperties*>(getProperties()); - if (props) { - out << ", message_id=" << props->getMessageId(); - out << ", delivery_mode=" << (int) props->getDeliveryMode(); - out << ", headers=" << const_cast<BasicHeaderProperties*>(props)->getHeaders(); - } + out << ", message_id=" << properties.getMessageId(); + out << ", delivery_mode=" << (int) properties.getDeliveryMode(); + out << ", headers=" << properties.getHeaders(); } diff --git a/cpp/src/qpid/framing/AMQHeaderBody.h b/cpp/src/qpid/framing/AMQHeaderBody.h index 691ceeff73..894936060c 100644 --- a/cpp/src/qpid/framing/AMQHeaderBody.h +++ b/cpp/src/qpid/framing/AMQHeaderBody.h @@ -21,7 +21,7 @@ #include "amqp_types.h" #include "AMQBody.h" #include "Buffer.h" -#include "HeaderProperties.h" +#include "BasicHeaderProperties.h" #ifndef _AMQHeaderBody_ #define _AMQHeaderBody_ @@ -31,19 +31,15 @@ namespace framing { class AMQHeaderBody : public AMQBody { - HeaderProperties* properties; + BasicHeaderProperties properties; uint16_t weight; uint64_t contentSize; - - void createProperties(int classId); -public: - typedef boost::shared_ptr<AMQHeaderBody> shared_ptr; - + public: AMQHeaderBody(int classId); AMQHeaderBody(); inline uint8_t type() const { return HEADER_BODY; } - HeaderProperties* getProperties(){ return properties; } - const HeaderProperties* getProperties() const { return properties; } + BasicHeaderProperties* getProperties(){ return &properties; } + const BasicHeaderProperties* getProperties() const { return &properties; } inline uint64_t getContentSize() const { return contentSize; } inline void setContentSize(uint64_t _size) { contentSize = _size; } virtual ~AMQHeaderBody(); @@ -51,6 +47,8 @@ public: virtual void encode(Buffer& buffer) const; virtual void decode(Buffer& buffer, uint32_t size); virtual void print(std::ostream& out) const; + + void accept(AMQBodyConstVisitor& v) const { v.visit(*this); } }; } diff --git a/cpp/src/qpid/framing/AMQHeartbeatBody.h b/cpp/src/qpid/framing/AMQHeartbeatBody.h index 4c046b81a7..a2701c3398 100644 --- a/cpp/src/qpid/framing/AMQHeartbeatBody.h +++ b/cpp/src/qpid/framing/AMQHeartbeatBody.h @@ -31,14 +31,13 @@ namespace framing { class AMQHeartbeatBody : public AMQBody { public: - typedef boost::shared_ptr<AMQHeartbeatBody> shared_ptr; - virtual ~AMQHeartbeatBody(); inline uint32_t size() const { return 0; } inline uint8_t type() const { return HEARTBEAT_BODY; } inline void encode(Buffer& ) const {} inline void decode(Buffer& , uint32_t /*size*/) {} virtual void print(std::ostream& out) const; + void accept(AMQBodyConstVisitor& v) const { v.visit(*this); } }; } diff --git a/cpp/src/qpid/framing/AMQMethodBody.cpp b/cpp/src/qpid/framing/AMQMethodBody.cpp index 0a2720e69a..924d906d43 100644 --- a/cpp/src/qpid/framing/AMQMethodBody.cpp +++ b/cpp/src/qpid/framing/AMQMethodBody.cpp @@ -18,51 +18,11 @@ * under the License. * */ -#include "AMQFrame.h" #include "AMQMethodBody.h" -#include "qpid/QpidError.h" -#include "qpid/framing/AMQP_MethodVersionMap.h" namespace qpid { namespace framing { -void AMQMethodBody::encodeId(Buffer& buffer) const{ - buffer.putShort(amqpClassId()); - buffer.putShort(amqpMethodId()); -} - -void AMQMethodBody::invoke(AMQP_ServerOperations&){ - assert(0); - THROW_QPID_ERROR(PROTOCOL_ERROR, "Method not supported by AMQP Server."); -} - -bool AMQMethodBody::invoke(Invocable*) { - return false; -} - -AMQMethodBody::shared_ptr AMQMethodBody::create( - AMQP_MethodVersionMap& versionMap, ProtocolVersion version, - Buffer& buffer) -{ - ClassMethodId id; - id.decode(buffer); - return AMQMethodBody::shared_ptr( - versionMap.createMethodBody( - id.classId, id.methodId, version.getMajor(), version.getMinor())); -} - -void AMQMethodBody::ClassMethodId::decode(Buffer& buffer) { - classId = buffer.getShort(); - methodId = buffer.getShort(); -} - -void AMQMethodBody::decode(Buffer& buffer, uint32_t /*size*/) { - decodeContent(buffer); -} - -void AMQMethodBody::encode(Buffer& buffer) const { - encodeId(buffer); - encodeContent(buffer); -} +AMQMethodBody::~AMQMethodBody() {} }} // namespace qpid::framing diff --git a/cpp/src/qpid/framing/AMQMethodBody.h b/cpp/src/qpid/framing/AMQMethodBody.h index 73c5eb78a6..9c776e143b 100644 --- a/cpp/src/qpid/framing/AMQMethodBody.h +++ b/cpp/src/qpid/framing/AMQMethodBody.h @@ -21,65 +21,48 @@ * under the License. * */ -#include <iostream> #include "amqp_types.h" #include "AMQBody.h" -#include "Buffer.h" -#include "qpid/framing/AMQP_ServerOperations.h" +#include "qpid/framing/ProtocolVersion.h" +#include "qpid/shared_ptr.h" + +#include <ostream> + +#include <assert.h> namespace qpid { namespace framing { -class AMQP_MethodVersionMap; +class Buffer; +class AMQP_ServerOperations; +class Invocable; +class MethodBodyConstVisitor; -class AMQMethodBody : public AMQBody -{ +class AMQMethodBody : public AMQBody { public: - typedef boost::shared_ptr<AMQMethodBody> shared_ptr; - - static shared_ptr create( - AMQP_MethodVersionMap& map, ProtocolVersion version, Buffer& buf); - - ProtocolVersion version; - uint8_t type() const { return METHOD_BODY; } - AMQMethodBody(uint8_t major, uint8_t minor) : version(major, minor) {} - AMQMethodBody(ProtocolVersion ver) : version(ver) {} - virtual ~AMQMethodBody() {} - void decode(Buffer&, uint32_t); - virtual void encode(Buffer& buffer) const; + AMQMethodBody() {} + AMQMethodBody(uint8_t, uint8_t) {} + + virtual ~AMQMethodBody(); + virtual void accept(MethodBodyConstVisitor&) const = 0; + virtual MethodId amqpMethodId() const = 0; virtual ClassId amqpClassId() const = 0; - virtual void invoke(AMQP_ServerOperations&); - virtual bool invoke(Invocable* target); + virtual void invoke(AMQP_ServerOperations&) { assert(0); } + virtual bool invoke(Invocable*) { return false; } - template <class T> bool isA() { + template <class T> bool isA() const { return amqpClassId()==T::CLASS_ID && amqpMethodId()==T::METHOD_ID; } - /** Return request ID or response correlationID */ - virtual RequestId getRequestId() const { return 0; } - - virtual bool isRequest() const { return false; } - virtual bool isResponse() const { return false; } - - static uint32_t baseSize() { return 4; } - protected: - - struct ClassMethodId { - uint16_t classId; - uint16_t methodId; - void decode(Buffer& b); - }; - - void encodeId(Buffer& buffer) const; - virtual void encodeContent(Buffer& buffer) const = 0; - virtual void decodeContent(Buffer& buffer) = 0; - - virtual void printPrefix(std::ostream&) const {} + virtual uint32_t size() const = 0; + virtual uint8_t type() const { return METHOD_BODY; } - friend class MethodHolder; + AMQMethodBody* getMethod() { return this; } + const AMQMethodBody* getMethod() const { return this; } + void accept(AMQBodyConstVisitor& v) const { v.visit(*this); } }; diff --git a/cpp/src/qpid/framing/BasicHeaderProperties.h b/cpp/src/qpid/framing/BasicHeaderProperties.h index a6347b37fd..a8ef401b50 100644 --- a/cpp/src/qpid/framing/BasicHeaderProperties.h +++ b/cpp/src/qpid/framing/BasicHeaderProperties.h @@ -57,7 +57,7 @@ class BasicHeaderProperties : public HeaderProperties virtual void encode(Buffer& buffer) const; virtual void decode(Buffer& buffer, uint32_t size); - virtual uint8_t classId() { return BASIC; } + virtual uint8_t classId() const { return BASIC; } string getContentType() const { return contentType; } string getContentEncoding() const { return contentEncoding; } diff --git a/cpp/src/qpid/framing/Blob.cpp b/cpp/src/qpid/framing/Blob.cpp new file mode 100644 index 0000000000..0aaeb4138e --- /dev/null +++ b/cpp/src/qpid/framing/Blob.cpp @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "Blob.h" + + +namespace qpid { +namespace framing { + +void BlobHelper<void>::destroy(void*) {} +void BlobHelper<void>::copy(void*, const void*) {} + +}} // namespace qpid::framing diff --git a/cpp/src/qpid/framing/Blob.h b/cpp/src/qpid/framing/Blob.h index 1754e851e5..f89bad55ea 100644 --- a/cpp/src/qpid/framing/Blob.h +++ b/cpp/src/qpid/framing/Blob.h @@ -33,6 +33,8 @@ namespace boost { /** + * 0-arg typed_in_place_factory constructor and in_place() override. + * * Boost doesn't provide the 0 arg version since it assumes * in_place_factory will be used when there is no default ctor. */ @@ -48,20 +50,29 @@ typed_in_place_factory0<T> in_place() { return typed_in_place_factory0<T>(); } } // namespace boost + namespace qpid { namespace framing { using boost::in_place; -template <class T> -void destroyT(void* ptr) { static_cast<T*>(ptr)->~T(); } -inline void nullDestroy(void*) {} +template <class T> struct BlobHelper { + static void destroy(void* ptr) { static_cast<T*>(ptr)->~T(); } + static void copy(void* dest, const void* src) { + new (dest) T(*static_cast<const T*>(src)); + } +}; + +template <> struct BlobHelper<void> { + static void destroy(void*); + static void copy(void* to, const void* from); +}; /** * A "blob" is a chunk of memory which can contain a single object at * a time arbitrary type, provided sizeof(T)<=blob.size(). Blob * ensures proper construction and destruction of its contents, - * nothing else. + * and proper copying between Blobs, but nothing else. * * In particular the user must ensure the blob is big enough for its * contents and must know the type of object in the blob to cast get(). @@ -75,7 +86,13 @@ class Blob { boost::aligned_storage<Size> store; void (*destroy)(void*); + void (*copy)(void*, const void*); + template <class T> void setType() { + destroy=&BlobHelper<T>::destroy; + copy=&BlobHelper<T>::copy; + } + template<class TypedInPlaceFactory> void construct (const TypedInPlaceFactory& factory, const boost::typed_in_place_factory_base* ) @@ -84,16 +101,28 @@ class Blob assert(sizeof(T) <= Size); clear(); // Destroy old object. factory.apply(store.address()); - destroy=&destroyT<T>; + setType<T>(); } public: /** Construct an empty blob. */ - Blob() : destroy(&nullDestroy) {} + Blob() { setType<void>(); } + + /** Copy a blob. */ + Blob(const Blob& b) { *this = b; } + + /** Assign a blob */ + Blob& operator=(const Blob& b) { + setType<void>(); // Exception safety. + b.copy(this->get(), b.get()); + copy = b.copy; + destroy = b.destroy; + return *this; + } /** @see construct() */ template<class Expr> - Blob( const Expr & expr ) : destroy(nullDestroy) { construct(expr,&expr); } + Blob( const Expr & expr ) { setType<void>(); construct(expr,&expr); } ~Blob() { clear(); } @@ -106,7 +135,7 @@ class Blob /** Copy construct an instance of T into the Blob. */ template<class T> - Blob& operator=(const T& x) { construct(in_place<T>(x)); } + Blob& operator=(const T& x) { construct(in_place<T>(x)); return *this; } /** Get pointer to blob contents. Caller must know how to cast it. */ void* get() { return store.address(); } @@ -116,17 +145,15 @@ class Blob /** Destroy the object in the blob making it empty. */ void clear() { - void (*saveDestroy)(void*) = destroy; // Exception safety - destroy = &nullDestroy; - saveDestroy(store.address()); + void (*oldDestroy)(void*) = destroy; + setType<void>(); + oldDestroy(store.address()); } - /** True if there is no object allocated in the blob */ - bool empty() { return destroy==nullDestroy; } - static size_t size() { return Size; } }; + }} // namespace qpid::framing diff --git a/cpp/src/qpid/framing/BodyHandler.cpp b/cpp/src/qpid/framing/BodyHandler.cpp index 7a7bf22f15..53a01141c1 100644 --- a/cpp/src/qpid/framing/BodyHandler.cpp +++ b/cpp/src/qpid/framing/BodyHandler.cpp @@ -25,25 +25,28 @@ #include "AMQContentBody.h" #include "AMQHeartbeatBody.h" +#include <boost/cast.hpp> + using namespace qpid::framing; using namespace boost; BodyHandler::~BodyHandler() {} -void BodyHandler::handleBody(shared_ptr<AMQBody> body) { +// TODO aconway 2007-08-13: Replace with visitor. +void BodyHandler::handleBody(AMQBody* body) { switch(body->type()) { case METHOD_BODY: - handleMethod(shared_polymorphic_cast<AMQMethodBody>(body)); + handleMethod(polymorphic_downcast<AMQMethodBody*>(body)); break; case HEADER_BODY: - handleHeader(shared_polymorphic_cast<AMQHeaderBody>(body)); + handleHeader(polymorphic_downcast<AMQHeaderBody*>(body)); break; case CONTENT_BODY: - handleContent(shared_polymorphic_cast<AMQContentBody>(body)); + handleContent(polymorphic_downcast<AMQContentBody*>(body)); break; case HEARTBEAT_BODY: - handleHeartbeat(shared_polymorphic_cast<AMQHeartbeatBody>(body)); + handleHeartbeat(polymorphic_downcast<AMQHeartbeatBody*>(body)); break; default: QPID_ERROR(PROTOCOL_ERROR, "Unknown frame type "+body->type()); diff --git a/cpp/src/qpid/framing/BodyHandler.h b/cpp/src/qpid/framing/BodyHandler.h index 07d1658afa..9ded737195 100644 --- a/cpp/src/qpid/framing/BodyHandler.h +++ b/cpp/src/qpid/framing/BodyHandler.h @@ -32,6 +32,8 @@ class AMQHeaderBody; class AMQContentBody; class AMQHeartbeatBody; +// TODO aconway 2007-08-10: rework using Visitor pattern? + /** * Interface to handle incoming frame bodies. * Derived classes provide logic for each frame type. @@ -39,13 +41,13 @@ class AMQHeartbeatBody; class BodyHandler { public: virtual ~BodyHandler(); - virtual void handleBody(boost::shared_ptr<AMQBody> body); + virtual void handleBody(AMQBody* body); protected: - virtual void handleMethod(boost::shared_ptr<AMQMethodBody>) = 0; - virtual void handleHeader(boost::shared_ptr<AMQHeaderBody>) = 0; - virtual void handleContent(boost::shared_ptr<AMQContentBody>) = 0; - virtual void handleHeartbeat(boost::shared_ptr<AMQHeartbeatBody>) = 0; + virtual void handleMethod(AMQMethodBody*) = 0; + virtual void handleHeader(AMQHeaderBody*) = 0; + virtual void handleContent(AMQContentBody*) = 0; + virtual void handleHeartbeat(AMQHeartbeatBody*) = 0; }; }} diff --git a/cpp/src/qpid/framing/ChannelAdapter.cpp b/cpp/src/qpid/framing/ChannelAdapter.cpp index d61126bc7f..25ff46acdd 100644 --- a/cpp/src/qpid/framing/ChannelAdapter.cpp +++ b/cpp/src/qpid/framing/ChannelAdapter.cpp @@ -23,6 +23,9 @@ #include "FrameHandler.h" #include "qpid/Exception.h" +#include "AMQMethodBody.h" +#include "qpid/framing/ConnectionOpenBody.h" + using boost::format; namespace qpid { @@ -45,7 +48,7 @@ void ChannelAdapter::init(ChannelId i, OutputHandler& out, ProtocolVersion v) handlers.out= make_shared_ptr(new OutputHandlerFrameHandler(out)); } -void ChannelAdapter::send(shared_ptr<AMQBody> body) +void ChannelAdapter::send(const AMQBody& body) { assertChannelOpen(); AMQFrame frame(getVersion(), getId(), body); diff --git a/cpp/src/qpid/framing/ChannelAdapter.h b/cpp/src/qpid/framing/ChannelAdapter.h index 9e418013eb..729f5e7b47 100644 --- a/cpp/src/qpid/framing/ChannelAdapter.h +++ b/cpp/src/qpid/framing/ChannelAdapter.h @@ -25,11 +25,11 @@ * */ -#include "qpid/shared_ptr.h" #include "BodyHandler.h" #include "ProtocolVersion.h" #include "amqp_types.h" #include "FrameHandler.h" +#include "OutputHandler.h" namespace qpid { namespace framing { @@ -66,7 +66,7 @@ class ChannelAdapter : protected BodyHandler { ChannelId getId() const { return id; } ProtocolVersion getVersion() const { return version; } - virtual void send(shared_ptr<AMQBody> body); + virtual void send(const AMQBody& body); virtual bool isOpen() const = 0; @@ -75,7 +75,7 @@ class ChannelAdapter : protected BodyHandler { void assertChannelOpen() const; void assertChannelNotOpen() const; - virtual void handleMethod(shared_ptr<AMQMethodBody>) = 0; + virtual void handleMethod(AMQMethodBody*) = 0; private: class ChannelAdapterHandler; diff --git a/cpp/src/qpid/framing/Frame.cpp b/cpp/src/qpid/framing/Frame.cpp deleted file mode 100644 index 1ba8112faa..0000000000 --- a/cpp/src/qpid/framing/Frame.cpp +++ /dev/null @@ -1,117 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <boost/format.hpp> - -#include "Frame.h" -#include "qpid/QpidError.h" - - -namespace qpid { -namespace framing { - -namespace { -struct GetBodyVisitor : public NoBlankVisitor<AMQBody*> { - QPID_USING_NOBLANK(AMQBody*); - AMQBody* operator()(MethodHolder& h) const { return h.getMethod(); } - template <class T> AMQBody* operator()(T& t) const { return &t; } -}; -} - -AMQBody* Frame::getBody() { - return boost::apply_visitor(GetBodyVisitor(), body); -} - -const AMQBody* Frame::getBody() const { - return boost::apply_visitor(GetBodyVisitor(), const_cast<Variant&>(body)); -} - -void Frame::encode(Buffer& buffer) -{ - buffer.putOctet(getBody()->type()); - buffer.putShort(channel); - buffer.putLong(getBody()->size()); - getBody()->encode(buffer); - buffer.putOctet(0xCE); -} - -uint32_t Frame::size() const{ - return 1/*type*/ + 2/*channel*/ + 4/*body size*/ + getBody()->size() - + 1/*0xCE*/; -} - -bool Frame::decode(Buffer& buffer) -{ - if(buffer.available() < 7) - return false; - buffer.record(); - uint32_t frameSize = decodeHead(buffer); - if(buffer.available() < frameSize + 1){ - buffer.restore(); - return false; - } - decodeBody(buffer, frameSize); - uint8_t end = buffer.getOctet(); - if(end != 0xCE) THROW_QPID_ERROR(FRAMING_ERROR, "Frame end not found"); - return true; -} - -uint32_t Frame::decodeHead(Buffer& buffer){ - type = buffer.getOctet(); - channel = buffer.getShort(); - return buffer.getLong(); -} - -void Frame::decodeBody(Buffer& buffer, uint32_t size) -{ - switch(type) - { - case METHOD_BODY: - case REQUEST_BODY: - case RESPONSE_BODY: { - ClassId c=buffer.getShort(); - MethodId m=buffer.getShort(); - body = MethodHolder(c,m); - break; - } - case HEADER_BODY: - body = AMQHeaderBody(); - break; - case CONTENT_BODY: - body = AMQContentBody(); - break; - case HEARTBEAT_BODY: - body = AMQHeartbeatBody(); - break; - default: - THROW_QPID_ERROR( - FRAMING_ERROR, - boost::format("Unknown frame type %d") % type); - } - getBody()->decode(buffer, size); -} - -std::ostream& operator<<(std::ostream& out, const Frame& f) -{ - return out << "Frame[channel=" << f.getChannel() << "; " << f.body << "]"; -} - - -}} // namespace qpid::framing diff --git a/cpp/src/qpid/framing/Frame.h b/cpp/src/qpid/framing/Frame.h deleted file mode 100644 index 02ab15a828..0000000000 --- a/cpp/src/qpid/framing/Frame.h +++ /dev/null @@ -1,72 +0,0 @@ -#ifndef QPID_FRAMING_FRAME_H -#define QPID_FRAMING_FRAME_H - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "AMQDataBlock.h" -#include "AMQHeaderBody.h" -#include "AMQContentBody.h" -#include "AMQHeartbeatBody.h" -#include "MethodHolder.h" - -namespace qpid { -namespace framing { - -class Frame : public AMQDataBlock { - public: - typedef boost::variant<boost::blank, - AMQHeaderBody, - AMQContentBody, - AMQHeartbeatBody, - MethodHolder> Variant; - - Frame(ChannelId channel_=0, const Variant& body_=Variant()) - : body(body_), channel(channel_) {} - - void encode(Buffer& buffer); - bool decode(Buffer& buffer); - uint32_t size() const; - - uint16_t getChannel() const { return channel; } - - AMQBody* getBody(); - const AMQBody* getBody() const; - - template <class T> T* castBody() { - return boost::polymorphic_downcast<T*>(getBody()); - } - - Variant body; - - private: - uint32_t decodeHead(Buffer& buffer); - void decodeBody(Buffer& buffer, uint32_t size); - - uint8_t type; - uint16_t channel; -}; - -std::ostream& operator<<(std::ostream&, const Frame&); - -}} // namespace qpid::framing - - -#endif /*!QPID_FRAMING_FRAME_H*/ diff --git a/cpp/src/qpid/framing/HeaderProperties.h b/cpp/src/qpid/framing/HeaderProperties.h index ae8b796aa9..0c805922e8 100644 --- a/cpp/src/qpid/framing/HeaderProperties.h +++ b/cpp/src/qpid/framing/HeaderProperties.h @@ -34,7 +34,7 @@ namespace framing { public: inline virtual ~HeaderProperties(){} - virtual uint8_t classId() = 0; + virtual uint8_t classId() const = 0; virtual uint32_t size() const = 0; virtual void encode(Buffer& buffer) const = 0; virtual void decode(Buffer& buffer, uint32_t size) = 0; diff --git a/cpp/src/qpid/framing/MethodHolder.cpp b/cpp/src/qpid/framing/MethodHolder.cpp index 43997e6d55..de8f0da6d4 100644 --- a/cpp/src/qpid/framing/MethodHolder.cpp +++ b/cpp/src/qpid/framing/MethodHolder.cpp @@ -23,8 +23,8 @@ #include "qpid/framing/AMQMethodBody.h" #include "qpid/framing/Buffer.h" -// Note: MethodHolder::construct is in a separate generated file -// MethodHolder_construct.cpp +// Note: MethodHolder::construct is and operator= are code-generated +// in file MethodHolder_construct.cpp. using namespace boost; @@ -35,16 +35,18 @@ void MethodHolder::encode(Buffer& b) const { const AMQMethodBody* body = get(); b.putShort(body->amqpClassId()); b.putShort(body->amqpMethodId()); - body->encodeContent(b); + body->encode(b); } void MethodHolder::decode(Buffer& b) { - construct(std::make_pair(b.getShort(), b.getShort())); - get()->decodeContent(b); + ClassId c=b.getShort(); + MethodId m=b.getShort(); + construct(c,m); + get()->decode(b); } uint32_t MethodHolder::size() const { - return sizeof(Id)+get()->size(); + return sizeof(ClassId)+sizeof(MethodId)+get()->size(); } std::ostream& operator<<(std::ostream& out, const MethodHolder& h) { diff --git a/cpp/src/qpid/framing/MethodHolder.h b/cpp/src/qpid/framing/MethodHolder.h index fa14db2f79..59dbb1a708 100644 --- a/cpp/src/qpid/framing/MethodHolder.h +++ b/cpp/src/qpid/framing/MethodHolder.h @@ -22,64 +22,74 @@ * */ +#include "qpid/framing/AMQBody.h" +#include "qpid/framing/amqp_types.h" #include "qpid/framing/amqp_types.h" #include "qpid/framing/Blob.h" #include "qpid/framing/MethodHolderMaxSize.h" // Generated file. +#include <boost/type_traits/is_base_and_derived.hpp> +#include <boost/utility/enable_if.hpp> + #include <utility> namespace qpid { namespace framing { class AMQMethodBody; +class AMQBody; class Buffer; +class MethodHolder; +std::ostream& operator<<(std::ostream& out, const MethodHolder& h); + /** * Holder for arbitrary method body. */ +// TODO aconway 2007-08-14: Fix up naming, this class should really be +// called AMQMethodBody and use a different name for the root of +// the concrete method body tree, which should not inherit AMQBody. +// class MethodHolder { - public: - typedef std::pair<ClassId, MethodId> Id; - - template <class T>static Id idOf() { - return std::make_pair(T::CLASS_ID, T::METHOD_ID); } + template <class T> struct EnableIfMethod: + public boost::enable_if<boost::is_base_and_derived<AMQMethodBody,T>,T> + {}; + template <class T> EnableIfMethod<T>& assertMethod(T& t) { return t; } + + public: MethodHolder() {} - MethodHolder(const Id& id) { construct(id); } - MethodHolder(ClassId& c, MethodId& m) { construct(std::make_pair(c,m)); } + MethodHolder(ClassId& c, MethodId& m) { construct(c,m); } - template <class M> - MethodHolder(const M& m) : blob(m), id(idOf<M>()) {} + /** Construct with a copy of a method body. */ + MethodHolder(const AMQMethodBody& m) { *this = m; } - template <class M> - MethodHolder& operator=(const M& m) { blob=m; id=idOf<M>(); return *this; } + /** Copy method body into holder. */ + MethodHolder& operator=(const AMQMethodBody&); - /** Construct the method body corresponding to Id */ - void construct(const Id&); + /** Construct the method body corresponding to class/method id */ + void construct(ClassId c, MethodId m); + uint8_t type() const { return 1; } void encode(Buffer&) const; void decode(Buffer&); uint32_t size() const; AMQMethodBody* get() { - return static_cast<AMQMethodBody*>(blob.get()); + return reinterpret_cast<AMQMethodBody*>(blob.get()); } const AMQMethodBody* get() const { - return static_cast<const AMQMethodBody*>(blob.get()); + return reinterpret_cast<const AMQMethodBody*>(blob.get()); } - AMQMethodBody* operator* () { return get(); } - const AMQMethodBody* operator*() const { return get(); } - AMQMethodBody* operator-> () { return get(); } - const AMQMethodBody* operator->() const { return get(); } - private: Blob<MAX_METHODBODY_SIZE> blob; - Id id; + class CopyVisitor; + friend struct CopyVisitor; }; -std::ostream& operator<<(std::ostream& out, const MethodHolder& h); + }} // namespace qpid::framing diff --git a/cpp/src/qpid/framing/amqp_framing.h b/cpp/src/qpid/framing/amqp_framing.h index 888c6a7382..eec28333bc 100644 --- a/cpp/src/qpid/framing/amqp_framing.h +++ b/cpp/src/qpid/framing/amqp_framing.h @@ -26,7 +26,6 @@ #include "AMQHeaderBody.h" #include "AMQContentBody.h" #include "AMQHeartbeatBody.h" -#include "qpid/framing/AMQP_MethodVersionMap.h" #include "InputHandler.h" #include "OutputHandler.h" #include "InitiationHandler.h" diff --git a/cpp/src/qpid/framing/amqp_types.h b/cpp/src/qpid/framing/amqp_types.h index 5ea08a69af..bfd5b2206f 100644 --- a/cpp/src/qpid/framing/amqp_types.h +++ b/cpp/src/qpid/framing/amqp_types.h @@ -44,8 +44,6 @@ namespace framing { using std::string; typedef uint8_t FrameType; typedef uint16_t ChannelId; -typedef uint64_t RequestId; -typedef uint64_t ResponseId; typedef uint32_t BatchOffset; typedef uint16_t ClassId; typedef uint16_t MethodId; |
