diff options
| author | Gordon Sim <gsim@apache.org> | 2007-08-28 19:38:17 +0000 |
|---|---|---|
| committer | Gordon Sim <gsim@apache.org> | 2007-08-28 19:38:17 +0000 |
| commit | 9e10f4ea3b2f8ab6650f635cada48e4735ca20d7 (patch) | |
| tree | 26ad3b8dffa17fa665fe7a033a7c8092839df011 /cpp/src/qpid/framing | |
| parent | 6b09696b216c090b512c6af92bf7976ae3407add (diff) | |
| download | qpid-python-9e10f4ea3b2f8ab6650f635cada48e4735ca20d7.tar.gz | |
Updated message.transfer encoding to use header and content segments (including new structs).
Unified more between the basic and message classes messages.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@570538 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/framing')
| -rw-r--r-- | cpp/src/qpid/framing/AMQContentBody.h | 1 | ||||
| -rw-r--r-- | cpp/src/qpid/framing/AMQDataBlock.h | 2 | ||||
| -rw-r--r-- | cpp/src/qpid/framing/AMQFrame.cpp | 9 | ||||
| -rw-r--r-- | cpp/src/qpid/framing/AMQFrame.h | 19 | ||||
| -rw-r--r-- | cpp/src/qpid/framing/AMQHeaderBody.cpp | 68 | ||||
| -rw-r--r-- | cpp/src/qpid/framing/AMQHeaderBody.h | 95 | ||||
| -rw-r--r-- | cpp/src/qpid/framing/AMQMethodBody.h | 1 | ||||
| -rw-r--r-- | cpp/src/qpid/framing/BasicHeaderProperties.cpp | 35 | ||||
| -rw-r--r-- | cpp/src/qpid/framing/BasicHeaderProperties.h | 10 | ||||
| -rw-r--r-- | cpp/src/qpid/framing/ChannelAdapter.cpp | 2 | ||||
| -rw-r--r-- | cpp/src/qpid/framing/FrameSet.cpp | 83 | ||||
| -rw-r--r-- | cpp/src/qpid/framing/FrameSet.h | 102 | ||||
| -rw-r--r-- | cpp/src/qpid/framing/ProtocolInitiation.cpp | 2 | ||||
| -rw-r--r-- | cpp/src/qpid/framing/ProtocolInitiation.h | 2 | ||||
| -rw-r--r-- | cpp/src/qpid/framing/SendContent.cpp | 51 | ||||
| -rw-r--r-- | cpp/src/qpid/framing/SendContent.h | 53 | ||||
| -rw-r--r-- | cpp/src/qpid/framing/TypeFilter.h | 52 | ||||
| -rw-r--r-- | cpp/src/qpid/framing/frame_functors.h | 108 |
18 files changed, 645 insertions, 50 deletions
diff --git a/cpp/src/qpid/framing/AMQContentBody.h b/cpp/src/qpid/framing/AMQContentBody.h index dd4ab10d7d..5d530a1b9a 100644 --- a/cpp/src/qpid/framing/AMQContentBody.h +++ b/cpp/src/qpid/framing/AMQContentBody.h @@ -38,6 +38,7 @@ public: inline virtual ~AMQContentBody(){} inline uint8_t type() const { return CONTENT_BODY; }; inline const string& getData() const { return data; } + inline string& getData() { return data; } uint32_t size() const; void encode(Buffer& buffer) const; void decode(Buffer& buffer, uint32_t size); diff --git a/cpp/src/qpid/framing/AMQDataBlock.h b/cpp/src/qpid/framing/AMQDataBlock.h index 6ff61b185e..9b6fdfd966 100644 --- a/cpp/src/qpid/framing/AMQDataBlock.h +++ b/cpp/src/qpid/framing/AMQDataBlock.h @@ -30,7 +30,7 @@ class AMQDataBlock { public: virtual ~AMQDataBlock() {} - virtual void encode(Buffer& buffer) = 0; + virtual void encode(Buffer& buffer) const = 0; virtual bool decode(Buffer& buffer) = 0; virtual uint32_t size() const = 0; }; diff --git a/cpp/src/qpid/framing/AMQFrame.cpp b/cpp/src/qpid/framing/AMQFrame.cpp index 780af71be4..a7fd068ee4 100644 --- a/cpp/src/qpid/framing/AMQFrame.cpp +++ b/cpp/src/qpid/framing/AMQFrame.cpp @@ -70,7 +70,7 @@ const AMQBody* AMQFrame::getBody() const { return boost::apply_visitor(GetBodyVisitor(), const_cast<Variant&>(body)); } -void AMQFrame::encode(Buffer& buffer) +void AMQFrame::encode(Buffer& buffer) const { buffer.putOctet(getBody()->type()); buffer.putShort(channel); @@ -80,8 +80,11 @@ void AMQFrame::encode(Buffer& buffer) } uint32_t AMQFrame::size() const{ - return 1/*type*/ + 2/*channel*/ + 4/*body size*/ + - boost::apply_visitor(SizeVisitor(), body) + 1/*0xCE*/; + return frameOverhead() + boost::apply_visitor(SizeVisitor(), body); +} + +uint32_t AMQFrame::frameOverhead() { + return 1/*type*/ + 2/*channel*/ + 4/*body size*/ + 1/*0xCE*/; } bool AMQFrame::decode(Buffer& buffer) diff --git a/cpp/src/qpid/framing/AMQFrame.h b/cpp/src/qpid/framing/AMQFrame.h index 9e825a9936..84e7660218 100644 --- a/cpp/src/qpid/framing/AMQFrame.h +++ b/cpp/src/qpid/framing/AMQFrame.h @@ -37,14 +37,14 @@ namespace framing { class AMQFrame : public AMQDataBlock { public: - AMQFrame(ProtocolVersion=ProtocolVersion()) {} + AMQFrame() : channel(0) {} /** Construct a frame with a copy of b */ - AMQFrame(ProtocolVersion, ChannelId c, const AMQBody* b) : channel(c) { + AMQFrame(ChannelId c, const AMQBody* b) : channel(c) { setBody(*b); } - AMQFrame(ProtocolVersion, ChannelId c, const AMQBody& b) : channel(c) { + AMQFrame(ChannelId c, const AMQBody& b) : channel(c) { setBody(b); } @@ -52,21 +52,26 @@ class AMQFrame : public AMQDataBlock void setChannel(ChannelId c) { channel = c; } AMQBody* getBody(); - const AMQBody* getBody() const; + const AMQBody* getBody() const; /** Copy a body instance to the frame */ void setBody(const AMQBody& b) { CopyVisitor cv(*this); b.accept(cv); } /** Convenience template to cast the body to an expected type. */ template <class T> T* castBody() { - boost::polymorphic_downcast<T*>(getBody()); + return boost::polymorphic_downcast<T*>(getBody()); + } + + template <class T> const T* castBody() const { + return boost::polymorphic_downcast<const T*>(getBody()); } bool empty() { return boost::get<boost::blank>(&body); } - void encode(Buffer& buffer); + void encode(Buffer& buffer) const; bool decode(Buffer& buffer); uint32_t size() const; + static uint32_t frameOverhead(); private: struct CopyVisitor : public AMQBodyConstVisitor { @@ -77,7 +82,7 @@ class AMQFrame : public AMQDataBlock void visit(const AMQHeartbeatBody& x) { frame.body=x; } void visit(const AMQMethodBody& x) { frame.body=MethodHolder(x); } }; - friend struct CopyVisitor; + friend struct CopyVisitor; typedef boost::variant<boost::blank, AMQHeaderBody, diff --git a/cpp/src/qpid/framing/AMQHeaderBody.cpp b/cpp/src/qpid/framing/AMQHeaderBody.cpp index 6a3c8f27d1..7083709fde 100644 --- a/cpp/src/qpid/framing/AMQHeaderBody.cpp +++ b/cpp/src/qpid/framing/AMQHeaderBody.cpp @@ -19,37 +19,65 @@ * */ #include "AMQHeaderBody.h" -#include "qpid/QpidError.h" -#include "BasicHeaderProperties.h" +#include "qpid/Exception.h" +#include "qpid/log/Statement.h" -qpid::framing::AMQHeaderBody::AMQHeaderBody(int) : weight(0), contentSize(0) {} +qpid::framing::AMQHeaderBody::AMQHeaderBody() {} -qpid::framing::AMQHeaderBody::AMQHeaderBody() : weight(0), contentSize(0){} - -qpid::framing::AMQHeaderBody::~AMQHeaderBody(){} +qpid::framing::AMQHeaderBody::~AMQHeaderBody() {} uint32_t qpid::framing::AMQHeaderBody::size() const{ - return 12 + properties.size(); + CalculateSize visitor; + for_each(properties.begin(), properties.end(), boost::apply_visitor(visitor)); + return visitor.totalSize() + (properties.size() * (2/*type codes*/ + 4/*size*/)); } void qpid::framing::AMQHeaderBody::encode(Buffer& buffer) const { - buffer.putShort(properties.classId()); - buffer.putShort(weight); - buffer.putLongLong(contentSize); - properties.encode(buffer); + Encode visitor(buffer); + for_each(properties.begin(), properties.end(), boost::apply_visitor(visitor)); +} + +void qpid::framing::AMQHeaderBody::decode(Buffer& buffer, uint32_t size){ + uint32_t limit = buffer.available() - size; + while (buffer.available() > limit + 2) { + uint32_t len = buffer.getLong(); + uint16_t type = buffer.getShort(); + //The following switch could be generated as the number of options increases: + switch(type) { + case BasicHeaderProperties::TYPE: + decode(BasicHeaderProperties(), buffer, len - 2); + break; + case MessageProperties::TYPE: + decode(MessageProperties(), buffer, len - 2); + break; + case DeliveryProperties::TYPE: + decode(DeliveryProperties(), buffer, len - 2); + break; + default: + //TODO: should just skip over them keeping them for later dispatch as is + throw Exception(QPID_MSG("Unexpected property type: " << type)); + } + } } -void qpid::framing::AMQHeaderBody::decode(Buffer& buffer, uint32_t bufSize){ - buffer.getShort(); // Ignore classId - weight = buffer.getShort(); - contentSize = buffer.getLongLong(); - properties.decode(buffer, bufSize - 12); +uint64_t qpid::framing::AMQHeaderBody::getContentLength() const +{ + const MessageProperties* mProps = get<MessageProperties>(); + if (mProps) { + return mProps->getContentLength(); + } + const BasicHeaderProperties* bProps = get<BasicHeaderProperties>(); + if (bProps) { + return bProps->getContentLength(); + } + return 0; } void qpid::framing::AMQHeaderBody::print(std::ostream& out) const { - out << "header (" << size() << " bytes)" << " content_size=" << getContentSize(); - out << ", message_id=" << properties.getMessageId(); - out << ", delivery_mode=" << (int) properties.getDeliveryMode(); - out << ", headers=" << properties.getHeaders(); + out << "header (" << size() << " bytes)"; + out << "; properties={"; + Print visitor(out); + for_each(properties.begin(), properties.end(), boost::apply_visitor(visitor)); + out << "}"; } diff --git a/cpp/src/qpid/framing/AMQHeaderBody.h b/cpp/src/qpid/framing/AMQHeaderBody.h index 894936060c..76bd60559e 100644 --- a/cpp/src/qpid/framing/AMQHeaderBody.h +++ b/cpp/src/qpid/framing/AMQHeaderBody.h @@ -22,6 +22,12 @@ #include "AMQBody.h" #include "Buffer.h" #include "BasicHeaderProperties.h" +#include "qpid/framing/DeliveryProperties.h" +#include "qpid/framing/MessageProperties.h" +#include <iostream> +#include <vector> +#include <boost/variant.hpp> +#include <boost/variant/get.hpp> #ifndef _AMQHeaderBody_ #define _AMQHeaderBody_ @@ -31,24 +37,85 @@ namespace framing { class AMQHeaderBody : public AMQBody { - BasicHeaderProperties properties; - uint16_t weight; - uint64_t contentSize; - public: - AMQHeaderBody(int classId); + typedef std::vector< boost::variant<BasicHeaderProperties, DeliveryProperties, MessageProperties> > PropertyList; + + PropertyList properties; + + template <class T> void decode(T t, Buffer& b, uint32_t size) { + t.decode(b, size); + properties.push_back(t); + } + + class Encode : public boost::static_visitor<> { + Buffer& buffer; + public: + Encode(Buffer& b) : buffer(b) {} + + template <class T> void operator()(T& t) const { + buffer.putLong(t.size() + 2/*typecode*/); + buffer.putShort(T::TYPE); + t.encode(buffer); + } + }; + + class CalculateSize : public boost::static_visitor<> { + uint32_t size; + public: + CalculateSize() : size(0) {} + + template <class T> void operator()(T& t) { + size += t.size(); + } + + uint32_t totalSize() { + return size; + } + }; + + class Print : public boost::static_visitor<> { + std::ostream& out; + public: + Print(std::ostream& o) : out(o) {} + + template <class T> void operator()(T& t) { + out << t; + } + }; + +public: + AMQHeaderBody(); + ~AMQHeaderBody(); inline uint8_t type() const { return HEADER_BODY; } - BasicHeaderProperties* getProperties(){ return &properties; } - const BasicHeaderProperties* getProperties() const { return &properties; } - inline uint64_t getContentSize() const { return contentSize; } - inline void setContentSize(uint64_t _size) { contentSize = _size; } - virtual ~AMQHeaderBody(); - virtual uint32_t size() const; - virtual void encode(Buffer& buffer) const; - virtual void decode(Buffer& buffer, uint32_t size); - virtual void print(std::ostream& out) const; + + uint32_t size() const; + void encode(Buffer& buffer) const; + void decode(Buffer& buffer, uint32_t size); + uint64_t getContentLength() const; + void print(std::ostream& out) const; void accept(AMQBodyConstVisitor& v) const { v.visit(*this); } + + template <class T> T* get(bool create) { + for (PropertyList::iterator i = properties.begin(); i != properties.end(); i++) { + T* p = boost::get<T>(&(*i)); + if (p) return p; + } + if (create) { + properties.push_back(T()); + return boost::get<T>(&(properties.back())); + } else { + return 0; + } + } + + template <class T> const T* get() const { + for (PropertyList::const_iterator i = properties.begin(); i != properties.end(); i++) { + const T* p = boost::get<T>(&(*i)); + if (p) return p; + } + return 0; + } }; } diff --git a/cpp/src/qpid/framing/AMQMethodBody.h b/cpp/src/qpid/framing/AMQMethodBody.h index 5acb3a7b66..a5c14a37e9 100644 --- a/cpp/src/qpid/framing/AMQMethodBody.h +++ b/cpp/src/qpid/framing/AMQMethodBody.h @@ -49,6 +49,7 @@ class AMQMethodBody : public AMQBody { virtual MethodId amqpMethodId() const = 0; virtual ClassId amqpClassId() const = 0; + virtual bool isContentBearing() const = 0; void invoke(AMQP_ServerOperations&); bool invoke(Invocable*); diff --git a/cpp/src/qpid/framing/BasicHeaderProperties.cpp b/cpp/src/qpid/framing/BasicHeaderProperties.cpp index dfa5e1bc3f..7d933d0db8 100644 --- a/cpp/src/qpid/framing/BasicHeaderProperties.cpp +++ b/cpp/src/qpid/framing/BasicHeaderProperties.cpp @@ -22,7 +22,10 @@ //TODO: This could be easily generated from the spec -qpid::framing::BasicHeaderProperties::BasicHeaderProperties() : deliveryMode(DeliveryMode(0)), priority(0), timestamp(0){} +qpid::framing::BasicHeaderProperties::BasicHeaderProperties() : deliveryMode(DeliveryMode(0)), + priority(0), + timestamp(0), + contentLength(0){} qpid::framing::BasicHeaderProperties::~BasicHeaderProperties(){} uint32_t qpid::framing::BasicHeaderProperties::size() const{ @@ -41,6 +44,7 @@ uint32_t qpid::framing::BasicHeaderProperties::size() const{ if(userId.length() > 0) bytes += userId.length() + 1; if(appId.length() > 0) bytes += appId.length() + 1; if(clusterId.length() > 0) bytes += clusterId.length() + 1; + if(contentLength != 0) bytes += 8; return bytes; } @@ -63,6 +67,7 @@ void qpid::framing::BasicHeaderProperties::encode(qpid::framing::Buffer& buffer) if(userId.length() > 0) buffer.putShortString(userId); if(appId.length() > 0) buffer.putShortString(appId); if(clusterId.length() > 0) buffer.putShortString(clusterId); + if(contentLength != 0) buffer.putLongLong(contentLength); } void qpid::framing::BasicHeaderProperties::decode(qpid::framing::Buffer& buffer, uint32_t /*size*/){ @@ -81,6 +86,7 @@ void qpid::framing::BasicHeaderProperties::decode(qpid::framing::Buffer& buffer, if(flags & (1 << 4)) buffer.getShortString(userId); if(flags & (1 << 3)) buffer.getShortString(appId); if(flags & (1 << 2)) buffer.getShortString(clusterId); + if(flags & (1 << 1)) contentLength = buffer.getLongLong(); } uint16_t qpid::framing::BasicHeaderProperties::getFlags() const{ @@ -99,5 +105,32 @@ uint16_t qpid::framing::BasicHeaderProperties::getFlags() const{ if(userId.length() > 0) flags |= (1 << 4); if(appId.length() > 0) flags |= (1 << 3); if(clusterId.length() > 0) flags |= (1 << 2); + if(contentLength != 0) flags |= (1 << 1); return flags; } + +namespace qpid{ +namespace framing{ + + std::ostream& operator<<(std::ostream& out, const BasicHeaderProperties& props) + { + if(props.contentType.length() > 0) out << "contentType=" << props.contentType << ";"; + if(props.contentEncoding.length() > 0) out << "contentEncoding=" << props.contentEncoding << ";"; + if(props.headers.count() > 0) out << "headers=" << props.headers << ";"; + if(props.deliveryMode != 0) out << "deliveryMode=" << props.deliveryMode << ";"; + if(props.priority != 0) out << "priority=" << props.priority << ";"; + if(props.correlationId.length() > 0) out << "correlationId=" << props.correlationId << ";"; + if(props.replyTo.length() > 0) out << "replyTo=" << props.replyTo << ";"; + if(props.expiration.length() > 0) out << "expiration=" << props.expiration << ";"; + if(props.messageId.length() > 0) out << "messageId=" << props.messageId << ";"; + if(props.timestamp != 0) out << "timestamp=" << props.timestamp << ";"; + if(props.type.length() > 0) out << "type=" << props.type << ";"; + if(props.userId.length() > 0) out << "userId=" << props.userId << ";"; + if(props.appId.length() > 0) out << "appId=" << props.appId << ";"; + if(props.clusterId.length() > 0) out << "clusterId=" << props.clusterId << ";"; + if(props.contentLength != 0) out << "contentLength=" << props.contentLength << ";"; + + return out; + } + +}} diff --git a/cpp/src/qpid/framing/BasicHeaderProperties.h b/cpp/src/qpid/framing/BasicHeaderProperties.h index a8ef401b50..d6c71437fb 100644 --- a/cpp/src/qpid/framing/BasicHeaderProperties.h +++ b/cpp/src/qpid/framing/BasicHeaderProperties.h @@ -47,15 +47,18 @@ class BasicHeaderProperties : public HeaderProperties string userId; string appId; string clusterId; + uint64_t contentLength; uint16_t getFlags() const; public: + static const uint16_t TYPE = BASIC; + BasicHeaderProperties(); virtual ~BasicHeaderProperties(); virtual uint32_t size() const; virtual void encode(Buffer& buffer) const; - virtual void decode(Buffer& buffer, uint32_t size); + virtual void decode(Buffer& buffer, uint32_t size = 0); virtual uint8_t classId() const { return BASIC; } @@ -74,6 +77,7 @@ class BasicHeaderProperties : public HeaderProperties string getUserId() const { return userId; } string getAppId() const { return appId; } string getClusterId() const { return clusterId; } + uint64_t getContentLength() const { return contentLength; } void setContentType(const string& _type){ contentType = _type; } void setContentEncoding(const string& encoding){ contentEncoding = encoding; } @@ -89,6 +93,9 @@ class BasicHeaderProperties : public HeaderProperties void setUserId(const string& _userId){ userId = _userId; } void setAppId(const string& _appId){appId = _appId; } void setClusterId(const string& _clusterId){ clusterId = _clusterId; } + void setContentLength(uint64_t _contentLength){ contentLength = _contentLength; } + + friend std::ostream& operator<<(std::ostream&, const BasicHeaderProperties&); /** \internal * Template to copy between types like BasicHeaderProperties. @@ -109,6 +116,7 @@ class BasicHeaderProperties : public HeaderProperties to.setUserId(from.getUserId()); to.setAppId(from.getAppId()); to.setClusterId(from.getClusterId()); + to.setContentLength(from.getContentLength()); } }; }} diff --git a/cpp/src/qpid/framing/ChannelAdapter.cpp b/cpp/src/qpid/framing/ChannelAdapter.cpp index 25ff46acdd..86b60d896b 100644 --- a/cpp/src/qpid/framing/ChannelAdapter.cpp +++ b/cpp/src/qpid/framing/ChannelAdapter.cpp @@ -51,7 +51,7 @@ void ChannelAdapter::init(ChannelId i, OutputHandler& out, ProtocolVersion v) void ChannelAdapter::send(const AMQBody& body) { assertChannelOpen(); - AMQFrame frame(getVersion(), getId(), body); + AMQFrame frame(getId(), body); handlers.out->handle(frame); } diff --git a/cpp/src/qpid/framing/FrameSet.cpp b/cpp/src/qpid/framing/FrameSet.cpp new file mode 100644 index 0000000000..434f1b3aad --- /dev/null +++ b/cpp/src/qpid/framing/FrameSet.cpp @@ -0,0 +1,83 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "FrameSet.h" +#include "qpid/framing/all_method_bodies.h" +#include "qpid/framing/frame_functors.h" +#include "qpid/framing/BasicHeaderProperties.h" +#include "qpid/framing/MessageProperties.h" +#include "qpid/framing/TypeFilter.h" + +using namespace qpid::framing; +using namespace boost; + +FrameSet::FrameSet(const SequenceNumber& _id) : id(_id) {} + +void FrameSet::append(AMQFrame& part) +{ + parts.push_back(part); +} + +bool FrameSet::isComplete() const +{ + //TODO: should eventually use the 0-10 frame header flags when available + const AMQMethodBody* method = getMethod(); + if (!method) { + return false; + } else if (method->isContentBearing()) { + const AMQHeaderBody* header = getHeaders(); + if (header) { + return header->getContentLength() == getContentSize(); + } else { + return false; + } + } else { + return true; + } +} + +const AMQMethodBody* FrameSet::getMethod() const +{ + return parts.empty() ? 0 : dynamic_cast<const AMQMethodBody*>(parts[0].getBody()); +} + +const AMQHeaderBody* FrameSet::getHeaders() const +{ + return parts.size() < 2 ? 0 : dynamic_cast<const AMQHeaderBody*>(parts[1].getBody()); +} + +AMQHeaderBody* FrameSet::getHeaders() +{ + return parts.size() < 2 ? 0 : dynamic_cast<AMQHeaderBody*>(parts[1].getBody()); +} + +uint64_t FrameSet::getContentSize() const +{ + SumBodySize sum; + map_if(sum, TypeFilter(CONTENT_BODY)); + return sum.getSize(); +} + +void FrameSet::getContent(std::string& out) const +{ + AccumulateContent accumulator(out); + map_if(accumulator, TypeFilter(CONTENT_BODY)); +} diff --git a/cpp/src/qpid/framing/FrameSet.h b/cpp/src/qpid/framing/FrameSet.h new file mode 100644 index 0000000000..d6d5cd7a13 --- /dev/null +++ b/cpp/src/qpid/framing/FrameSet.h @@ -0,0 +1,102 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <string> +#include <vector> +#include "qpid/framing/amqp_framing.h" +#include "qpid/framing/AMQFrame.h" +#include "qpid/framing/SequenceNumber.h" + +#ifndef _FrameSet_ +#define _FrameSet_ + +namespace qpid { +namespace framing { + +/** + * Collects the frames representing a message. + */ +class FrameSet +{ + typedef std::vector<AMQFrame> Frames; + const SequenceNumber id; + Frames parts; + +public: + typedef boost::shared_ptr<FrameSet> shared_ptr; + + FrameSet(const SequenceNumber& id); + void append(AMQFrame& part); + bool isComplete() const; + + uint64_t getContentSize() const; + void getContent(std::string&) const; + + const AMQMethodBody* getMethod() const; + const AMQHeaderBody* getHeaders() const; + AMQHeaderBody* getHeaders(); + + template <class T> bool isA() const { + const AMQMethodBody* method = getMethod(); + return method && method->isA<T>(); + } + + template <class T> const T* as() const { + const AMQMethodBody* method = getMethod(); + return (method && method->isA<T>()) ? dynamic_cast<const T*>(method) : 0; + } + + template <class T> const T* getHeaderProperties() const { + const AMQHeaderBody* header = getHeaders(); + return header ? header->get<T>() : 0; + } + + const SequenceNumber& getId() const { return id; } + + template <class P> void remove(P predicate) { + parts.erase(remove_if(parts.begin(), parts.end(), predicate), parts.end()); + } + + template <class F> void map(F& functor) { + for_each(parts.begin(), parts.end(), functor); + } + + template <class F> void map(F& functor) const { + for_each(parts.begin(), parts.end(), functor); + } + + template <class F, class P> void map_if(F& functor, P predicate) { + for(Frames::iterator i = parts.begin(); i != parts.end(); i++) { + if (predicate(*i)) functor(*i); + } + } + + template <class F, class P> void map_if(F& functor, P predicate) const { + for(Frames::const_iterator i = parts.begin(); i != parts.end(); i++) { + if (predicate(*i)) functor(*i); + } + } +}; + +} +} + + +#endif diff --git a/cpp/src/qpid/framing/ProtocolInitiation.cpp b/cpp/src/qpid/framing/ProtocolInitiation.cpp index a6d1b17f6e..7164bceb12 100644 --- a/cpp/src/qpid/framing/ProtocolInitiation.cpp +++ b/cpp/src/qpid/framing/ProtocolInitiation.cpp @@ -31,7 +31,7 @@ ProtocolInitiation::ProtocolInitiation(ProtocolVersion p) : version(p) {} ProtocolInitiation::~ProtocolInitiation(){} -void ProtocolInitiation::encode(Buffer& buffer){ +void ProtocolInitiation::encode(Buffer& buffer) const { buffer.putOctet('A'); buffer.putOctet('M'); buffer.putOctet('Q'); diff --git a/cpp/src/qpid/framing/ProtocolInitiation.h b/cpp/src/qpid/framing/ProtocolInitiation.h index adfdc8215d..31c73eb124 100644 --- a/cpp/src/qpid/framing/ProtocolInitiation.h +++ b/cpp/src/qpid/framing/ProtocolInitiation.h @@ -39,7 +39,7 @@ public: ProtocolInitiation(uint8_t major, uint8_t minor); ProtocolInitiation(ProtocolVersion p); virtual ~ProtocolInitiation(); - virtual void encode(Buffer& buffer); + virtual void encode(Buffer& buffer) const; virtual bool decode(Buffer& buffer); inline virtual uint32_t size() const { return 8; } inline uint8_t getMajor() const { return version.getMajor(); } diff --git a/cpp/src/qpid/framing/SendContent.cpp b/cpp/src/qpid/framing/SendContent.cpp new file mode 100644 index 0000000000..568cc01665 --- /dev/null +++ b/cpp/src/qpid/framing/SendContent.cpp @@ -0,0 +1,51 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "SendContent.h" + +qpid::framing::SendContent::SendContent(FrameHandler& h, uint16_t c, uint16_t mfs) : handler(h), channel(c), maxFrameSize(mfs) {} + +void qpid::framing::SendContent::operator()(AMQFrame& f) const +{ + uint16_t maxContentSize = maxFrameSize - AMQFrame::frameOverhead(); + const AMQContentBody* body(f.castBody<AMQContentBody>()); + if (body->size() > maxContentSize) { + uint32_t offset = 0; + for (int chunk = body->size() / maxContentSize; chunk > 0; chunk--) { + sendFragment(*body, offset, maxContentSize); + offset += maxContentSize; + } + uint32_t remainder = body->size() % maxContentSize; + if (remainder) { + sendFragment(*body, offset, remainder); + } + } else { + AMQFrame copy(f); + copy.setChannel(channel); + handler.handle(copy); + } +} + +void qpid::framing::SendContent::sendFragment(const AMQContentBody& body, uint32_t offset, uint16_t size) const +{ + AMQFrame fragment(channel, AMQContentBody(body.getData().substr(offset, size))); + handler.handle(fragment); +} diff --git a/cpp/src/qpid/framing/SendContent.h b/cpp/src/qpid/framing/SendContent.h new file mode 100644 index 0000000000..a88319e2f9 --- /dev/null +++ b/cpp/src/qpid/framing/SendContent.h @@ -0,0 +1,53 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <string> +#include "qpid/framing/amqp_framing.h" +#include "qpid/framing/AMQFrame.h" +#include "qpid/framing/FrameHandler.h" + +#ifndef _SendContent_ +#define _SendContent_ + +namespace qpid { +namespace framing { + +/** + * Functor that sends frame to handler, refragmenting if + * necessary. Currently only works on content frames but this could be + * changed once we support multi-frame segments in general. + */ +class SendContent +{ + mutable FrameHandler& handler; + const uint16_t channel; + const uint16_t maxFrameSize; + + void sendFragment(const AMQContentBody& body, uint32_t offset, uint16_t size) const; +public: + SendContent(FrameHandler& _handler, uint16_t channel, uint16_t _maxFrameSize); + void operator()(AMQFrame& f) const; +}; + +} +} + + +#endif diff --git a/cpp/src/qpid/framing/TypeFilter.h b/cpp/src/qpid/framing/TypeFilter.h new file mode 100644 index 0000000000..3a607190fd --- /dev/null +++ b/cpp/src/qpid/framing/TypeFilter.h @@ -0,0 +1,52 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <string> +#include "qpid/framing/amqp_framing.h" +#include "qpid/framing/AMQFrame.h" +#include "qpid/framing/FrameHandler.h" + +#ifndef _TypeFilter_ +#define _TypeFilter_ + +namespace qpid { +namespace framing { + +/** + * Predicate that selects frames by type + */ +class TypeFilter +{ + std::vector<uint8_t> types; +public: + TypeFilter(uint8_t type) { add(type); } + TypeFilter(uint8_t type1, uint8_t type2) { add(type1); add(type2); } + void add(uint8_t type) { types.push_back(type); } + bool operator()(const AMQFrame& f) const + { + return find(types.begin(), types.end(), f.getBody()->type()) != types.end(); + } +}; + +} +} + + +#endif diff --git a/cpp/src/qpid/framing/frame_functors.h b/cpp/src/qpid/framing/frame_functors.h new file mode 100644 index 0000000000..3112da8e24 --- /dev/null +++ b/cpp/src/qpid/framing/frame_functors.h @@ -0,0 +1,108 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <string> +#include <ostream> +#include <iostream> +#include "qpid/framing/amqp_framing.h" +#include "qpid/framing/AMQFrame.h" +#include "qpid/framing/Buffer.h" + +#ifndef _frame_functors_ +#define _frame_functors_ + +namespace qpid { +namespace framing { + +class SumFrameSize +{ + uint64_t size; +public: + SumFrameSize() : size(0) {} + void operator()(const AMQFrame& f) { size += f.size(); } + uint64_t getSize() { return size; } +}; + +class SumBodySize +{ + uint64_t size; +public: + SumBodySize() : size(0) {} + void operator()(const AMQFrame& f) { size += f.getBody()->size(); } + uint64_t getSize() { return size; } +}; + +class EncodeFrame +{ + Buffer& buffer; +public: + EncodeFrame(Buffer& b) : buffer(b) {} + void operator()(const AMQFrame& f) { f.encode(buffer); } +}; + +class EncodeBody +{ + Buffer& buffer; +public: + EncodeBody(Buffer& b) : buffer(b) {} + void operator()(const AMQFrame& f) { f.getBody()->encode(buffer); } +}; + +class AccumulateContent +{ + std::string& content; +public: + AccumulateContent(std::string& c) : content(c) {} + void operator()(const AMQFrame& f) { content += f.castBody<AMQContentBody>()->getData(); } +}; + +class Relay +{ + FrameHandler& handler; + const uint16_t channel; + +public: + Relay(FrameHandler& h, uint16_t c) : handler(h), channel(c) {} + + void operator()(AMQFrame& f) + { + AMQFrame copy(f); + copy.setChannel(channel); + handler.handle(copy); + } +}; + +class Print +{ + std::ostream& out; +public: + Print(std::ostream& o) : out(o) {} + + void operator()(const AMQFrame& f) + { + out << f << std::endl; + } +}; + +} +} + + +#endif |
