summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/framing
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2007-08-28 19:38:17 +0000
committerGordon Sim <gsim@apache.org>2007-08-28 19:38:17 +0000
commit9e10f4ea3b2f8ab6650f635cada48e4735ca20d7 (patch)
tree26ad3b8dffa17fa665fe7a033a7c8092839df011 /cpp/src/qpid/framing
parent6b09696b216c090b512c6af92bf7976ae3407add (diff)
downloadqpid-python-9e10f4ea3b2f8ab6650f635cada48e4735ca20d7.tar.gz
Updated message.transfer encoding to use header and content segments (including new structs).
Unified more between the basic and message classes messages. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@570538 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/framing')
-rw-r--r--cpp/src/qpid/framing/AMQContentBody.h1
-rw-r--r--cpp/src/qpid/framing/AMQDataBlock.h2
-rw-r--r--cpp/src/qpid/framing/AMQFrame.cpp9
-rw-r--r--cpp/src/qpid/framing/AMQFrame.h19
-rw-r--r--cpp/src/qpid/framing/AMQHeaderBody.cpp68
-rw-r--r--cpp/src/qpid/framing/AMQHeaderBody.h95
-rw-r--r--cpp/src/qpid/framing/AMQMethodBody.h1
-rw-r--r--cpp/src/qpid/framing/BasicHeaderProperties.cpp35
-rw-r--r--cpp/src/qpid/framing/BasicHeaderProperties.h10
-rw-r--r--cpp/src/qpid/framing/ChannelAdapter.cpp2
-rw-r--r--cpp/src/qpid/framing/FrameSet.cpp83
-rw-r--r--cpp/src/qpid/framing/FrameSet.h102
-rw-r--r--cpp/src/qpid/framing/ProtocolInitiation.cpp2
-rw-r--r--cpp/src/qpid/framing/ProtocolInitiation.h2
-rw-r--r--cpp/src/qpid/framing/SendContent.cpp51
-rw-r--r--cpp/src/qpid/framing/SendContent.h53
-rw-r--r--cpp/src/qpid/framing/TypeFilter.h52
-rw-r--r--cpp/src/qpid/framing/frame_functors.h108
18 files changed, 645 insertions, 50 deletions
diff --git a/cpp/src/qpid/framing/AMQContentBody.h b/cpp/src/qpid/framing/AMQContentBody.h
index dd4ab10d7d..5d530a1b9a 100644
--- a/cpp/src/qpid/framing/AMQContentBody.h
+++ b/cpp/src/qpid/framing/AMQContentBody.h
@@ -38,6 +38,7 @@ public:
inline virtual ~AMQContentBody(){}
inline uint8_t type() const { return CONTENT_BODY; };
inline const string& getData() const { return data; }
+ inline string& getData() { return data; }
uint32_t size() const;
void encode(Buffer& buffer) const;
void decode(Buffer& buffer, uint32_t size);
diff --git a/cpp/src/qpid/framing/AMQDataBlock.h b/cpp/src/qpid/framing/AMQDataBlock.h
index 6ff61b185e..9b6fdfd966 100644
--- a/cpp/src/qpid/framing/AMQDataBlock.h
+++ b/cpp/src/qpid/framing/AMQDataBlock.h
@@ -30,7 +30,7 @@ class AMQDataBlock
{
public:
virtual ~AMQDataBlock() {}
- virtual void encode(Buffer& buffer) = 0;
+ virtual void encode(Buffer& buffer) const = 0;
virtual bool decode(Buffer& buffer) = 0;
virtual uint32_t size() const = 0;
};
diff --git a/cpp/src/qpid/framing/AMQFrame.cpp b/cpp/src/qpid/framing/AMQFrame.cpp
index 780af71be4..a7fd068ee4 100644
--- a/cpp/src/qpid/framing/AMQFrame.cpp
+++ b/cpp/src/qpid/framing/AMQFrame.cpp
@@ -70,7 +70,7 @@ const AMQBody* AMQFrame::getBody() const {
return boost::apply_visitor(GetBodyVisitor(), const_cast<Variant&>(body));
}
-void AMQFrame::encode(Buffer& buffer)
+void AMQFrame::encode(Buffer& buffer) const
{
buffer.putOctet(getBody()->type());
buffer.putShort(channel);
@@ -80,8 +80,11 @@ void AMQFrame::encode(Buffer& buffer)
}
uint32_t AMQFrame::size() const{
- return 1/*type*/ + 2/*channel*/ + 4/*body size*/ +
- boost::apply_visitor(SizeVisitor(), body) + 1/*0xCE*/;
+ return frameOverhead() + boost::apply_visitor(SizeVisitor(), body);
+}
+
+uint32_t AMQFrame::frameOverhead() {
+ return 1/*type*/ + 2/*channel*/ + 4/*body size*/ + 1/*0xCE*/;
}
bool AMQFrame::decode(Buffer& buffer)
diff --git a/cpp/src/qpid/framing/AMQFrame.h b/cpp/src/qpid/framing/AMQFrame.h
index 9e825a9936..84e7660218 100644
--- a/cpp/src/qpid/framing/AMQFrame.h
+++ b/cpp/src/qpid/framing/AMQFrame.h
@@ -37,14 +37,14 @@ namespace framing {
class AMQFrame : public AMQDataBlock
{
public:
- AMQFrame(ProtocolVersion=ProtocolVersion()) {}
+ AMQFrame() : channel(0) {}
/** Construct a frame with a copy of b */
- AMQFrame(ProtocolVersion, ChannelId c, const AMQBody* b) : channel(c) {
+ AMQFrame(ChannelId c, const AMQBody* b) : channel(c) {
setBody(*b);
}
- AMQFrame(ProtocolVersion, ChannelId c, const AMQBody& b) : channel(c) {
+ AMQFrame(ChannelId c, const AMQBody& b) : channel(c) {
setBody(b);
}
@@ -52,21 +52,26 @@ class AMQFrame : public AMQDataBlock
void setChannel(ChannelId c) { channel = c; }
AMQBody* getBody();
- const AMQBody* getBody() const;
+ const AMQBody* getBody() const;
/** Copy a body instance to the frame */
void setBody(const AMQBody& b) { CopyVisitor cv(*this); b.accept(cv); }
/** Convenience template to cast the body to an expected type. */
template <class T> T* castBody() {
- boost::polymorphic_downcast<T*>(getBody());
+ return boost::polymorphic_downcast<T*>(getBody());
+ }
+
+ template <class T> const T* castBody() const {
+ return boost::polymorphic_downcast<const T*>(getBody());
}
bool empty() { return boost::get<boost::blank>(&body); }
- void encode(Buffer& buffer);
+ void encode(Buffer& buffer) const;
bool decode(Buffer& buffer);
uint32_t size() const;
+ static uint32_t frameOverhead();
private:
struct CopyVisitor : public AMQBodyConstVisitor {
@@ -77,7 +82,7 @@ class AMQFrame : public AMQDataBlock
void visit(const AMQHeartbeatBody& x) { frame.body=x; }
void visit(const AMQMethodBody& x) { frame.body=MethodHolder(x); }
};
- friend struct CopyVisitor;
+ friend struct CopyVisitor;
typedef boost::variant<boost::blank,
AMQHeaderBody,
diff --git a/cpp/src/qpid/framing/AMQHeaderBody.cpp b/cpp/src/qpid/framing/AMQHeaderBody.cpp
index 6a3c8f27d1..7083709fde 100644
--- a/cpp/src/qpid/framing/AMQHeaderBody.cpp
+++ b/cpp/src/qpid/framing/AMQHeaderBody.cpp
@@ -19,37 +19,65 @@
*
*/
#include "AMQHeaderBody.h"
-#include "qpid/QpidError.h"
-#include "BasicHeaderProperties.h"
+#include "qpid/Exception.h"
+#include "qpid/log/Statement.h"
-qpid::framing::AMQHeaderBody::AMQHeaderBody(int) : weight(0), contentSize(0) {}
+qpid::framing::AMQHeaderBody::AMQHeaderBody() {}
-qpid::framing::AMQHeaderBody::AMQHeaderBody() : weight(0), contentSize(0){}
-
-qpid::framing::AMQHeaderBody::~AMQHeaderBody(){}
+qpid::framing::AMQHeaderBody::~AMQHeaderBody() {}
uint32_t qpid::framing::AMQHeaderBody::size() const{
- return 12 + properties.size();
+ CalculateSize visitor;
+ for_each(properties.begin(), properties.end(), boost::apply_visitor(visitor));
+ return visitor.totalSize() + (properties.size() * (2/*type codes*/ + 4/*size*/));
}
void qpid::framing::AMQHeaderBody::encode(Buffer& buffer) const {
- buffer.putShort(properties.classId());
- buffer.putShort(weight);
- buffer.putLongLong(contentSize);
- properties.encode(buffer);
+ Encode visitor(buffer);
+ for_each(properties.begin(), properties.end(), boost::apply_visitor(visitor));
+}
+
+void qpid::framing::AMQHeaderBody::decode(Buffer& buffer, uint32_t size){
+ uint32_t limit = buffer.available() - size;
+ while (buffer.available() > limit + 2) {
+ uint32_t len = buffer.getLong();
+ uint16_t type = buffer.getShort();
+ //The following switch could be generated as the number of options increases:
+ switch(type) {
+ case BasicHeaderProperties::TYPE:
+ decode(BasicHeaderProperties(), buffer, len - 2);
+ break;
+ case MessageProperties::TYPE:
+ decode(MessageProperties(), buffer, len - 2);
+ break;
+ case DeliveryProperties::TYPE:
+ decode(DeliveryProperties(), buffer, len - 2);
+ break;
+ default:
+ //TODO: should just skip over them keeping them for later dispatch as is
+ throw Exception(QPID_MSG("Unexpected property type: " << type));
+ }
+ }
}
-void qpid::framing::AMQHeaderBody::decode(Buffer& buffer, uint32_t bufSize){
- buffer.getShort(); // Ignore classId
- weight = buffer.getShort();
- contentSize = buffer.getLongLong();
- properties.decode(buffer, bufSize - 12);
+uint64_t qpid::framing::AMQHeaderBody::getContentLength() const
+{
+ const MessageProperties* mProps = get<MessageProperties>();
+ if (mProps) {
+ return mProps->getContentLength();
+ }
+ const BasicHeaderProperties* bProps = get<BasicHeaderProperties>();
+ if (bProps) {
+ return bProps->getContentLength();
+ }
+ return 0;
}
void qpid::framing::AMQHeaderBody::print(std::ostream& out) const
{
- out << "header (" << size() << " bytes)" << " content_size=" << getContentSize();
- out << ", message_id=" << properties.getMessageId();
- out << ", delivery_mode=" << (int) properties.getDeliveryMode();
- out << ", headers=" << properties.getHeaders();
+ out << "header (" << size() << " bytes)";
+ out << "; properties={";
+ Print visitor(out);
+ for_each(properties.begin(), properties.end(), boost::apply_visitor(visitor));
+ out << "}";
}
diff --git a/cpp/src/qpid/framing/AMQHeaderBody.h b/cpp/src/qpid/framing/AMQHeaderBody.h
index 894936060c..76bd60559e 100644
--- a/cpp/src/qpid/framing/AMQHeaderBody.h
+++ b/cpp/src/qpid/framing/AMQHeaderBody.h
@@ -22,6 +22,12 @@
#include "AMQBody.h"
#include "Buffer.h"
#include "BasicHeaderProperties.h"
+#include "qpid/framing/DeliveryProperties.h"
+#include "qpid/framing/MessageProperties.h"
+#include <iostream>
+#include <vector>
+#include <boost/variant.hpp>
+#include <boost/variant/get.hpp>
#ifndef _AMQHeaderBody_
#define _AMQHeaderBody_
@@ -31,24 +37,85 @@ namespace framing {
class AMQHeaderBody : public AMQBody
{
- BasicHeaderProperties properties;
- uint16_t weight;
- uint64_t contentSize;
- public:
- AMQHeaderBody(int classId);
+ typedef std::vector< boost::variant<BasicHeaderProperties, DeliveryProperties, MessageProperties> > PropertyList;
+
+ PropertyList properties;
+
+ template <class T> void decode(T t, Buffer& b, uint32_t size) {
+ t.decode(b, size);
+ properties.push_back(t);
+ }
+
+ class Encode : public boost::static_visitor<> {
+ Buffer& buffer;
+ public:
+ Encode(Buffer& b) : buffer(b) {}
+
+ template <class T> void operator()(T& t) const {
+ buffer.putLong(t.size() + 2/*typecode*/);
+ buffer.putShort(T::TYPE);
+ t.encode(buffer);
+ }
+ };
+
+ class CalculateSize : public boost::static_visitor<> {
+ uint32_t size;
+ public:
+ CalculateSize() : size(0) {}
+
+ template <class T> void operator()(T& t) {
+ size += t.size();
+ }
+
+ uint32_t totalSize() {
+ return size;
+ }
+ };
+
+ class Print : public boost::static_visitor<> {
+ std::ostream& out;
+ public:
+ Print(std::ostream& o) : out(o) {}
+
+ template <class T> void operator()(T& t) {
+ out << t;
+ }
+ };
+
+public:
+
AMQHeaderBody();
+ ~AMQHeaderBody();
inline uint8_t type() const { return HEADER_BODY; }
- BasicHeaderProperties* getProperties(){ return &properties; }
- const BasicHeaderProperties* getProperties() const { return &properties; }
- inline uint64_t getContentSize() const { return contentSize; }
- inline void setContentSize(uint64_t _size) { contentSize = _size; }
- virtual ~AMQHeaderBody();
- virtual uint32_t size() const;
- virtual void encode(Buffer& buffer) const;
- virtual void decode(Buffer& buffer, uint32_t size);
- virtual void print(std::ostream& out) const;
+
+ uint32_t size() const;
+ void encode(Buffer& buffer) const;
+ void decode(Buffer& buffer, uint32_t size);
+ uint64_t getContentLength() const;
+ void print(std::ostream& out) const;
void accept(AMQBodyConstVisitor& v) const { v.visit(*this); }
+
+ template <class T> T* get(bool create) {
+ for (PropertyList::iterator i = properties.begin(); i != properties.end(); i++) {
+ T* p = boost::get<T>(&(*i));
+ if (p) return p;
+ }
+ if (create) {
+ properties.push_back(T());
+ return boost::get<T>(&(properties.back()));
+ } else {
+ return 0;
+ }
+ }
+
+ template <class T> const T* get() const {
+ for (PropertyList::const_iterator i = properties.begin(); i != properties.end(); i++) {
+ const T* p = boost::get<T>(&(*i));
+ if (p) return p;
+ }
+ return 0;
+ }
};
}
diff --git a/cpp/src/qpid/framing/AMQMethodBody.h b/cpp/src/qpid/framing/AMQMethodBody.h
index 5acb3a7b66..a5c14a37e9 100644
--- a/cpp/src/qpid/framing/AMQMethodBody.h
+++ b/cpp/src/qpid/framing/AMQMethodBody.h
@@ -49,6 +49,7 @@ class AMQMethodBody : public AMQBody {
virtual MethodId amqpMethodId() const = 0;
virtual ClassId amqpClassId() const = 0;
+ virtual bool isContentBearing() const = 0;
void invoke(AMQP_ServerOperations&);
bool invoke(Invocable*);
diff --git a/cpp/src/qpid/framing/BasicHeaderProperties.cpp b/cpp/src/qpid/framing/BasicHeaderProperties.cpp
index dfa5e1bc3f..7d933d0db8 100644
--- a/cpp/src/qpid/framing/BasicHeaderProperties.cpp
+++ b/cpp/src/qpid/framing/BasicHeaderProperties.cpp
@@ -22,7 +22,10 @@
//TODO: This could be easily generated from the spec
-qpid::framing::BasicHeaderProperties::BasicHeaderProperties() : deliveryMode(DeliveryMode(0)), priority(0), timestamp(0){}
+qpid::framing::BasicHeaderProperties::BasicHeaderProperties() : deliveryMode(DeliveryMode(0)),
+ priority(0),
+ timestamp(0),
+ contentLength(0){}
qpid::framing::BasicHeaderProperties::~BasicHeaderProperties(){}
uint32_t qpid::framing::BasicHeaderProperties::size() const{
@@ -41,6 +44,7 @@ uint32_t qpid::framing::BasicHeaderProperties::size() const{
if(userId.length() > 0) bytes += userId.length() + 1;
if(appId.length() > 0) bytes += appId.length() + 1;
if(clusterId.length() > 0) bytes += clusterId.length() + 1;
+ if(contentLength != 0) bytes += 8;
return bytes;
}
@@ -63,6 +67,7 @@ void qpid::framing::BasicHeaderProperties::encode(qpid::framing::Buffer& buffer)
if(userId.length() > 0) buffer.putShortString(userId);
if(appId.length() > 0) buffer.putShortString(appId);
if(clusterId.length() > 0) buffer.putShortString(clusterId);
+ if(contentLength != 0) buffer.putLongLong(contentLength);
}
void qpid::framing::BasicHeaderProperties::decode(qpid::framing::Buffer& buffer, uint32_t /*size*/){
@@ -81,6 +86,7 @@ void qpid::framing::BasicHeaderProperties::decode(qpid::framing::Buffer& buffer,
if(flags & (1 << 4)) buffer.getShortString(userId);
if(flags & (1 << 3)) buffer.getShortString(appId);
if(flags & (1 << 2)) buffer.getShortString(clusterId);
+ if(flags & (1 << 1)) contentLength = buffer.getLongLong();
}
uint16_t qpid::framing::BasicHeaderProperties::getFlags() const{
@@ -99,5 +105,32 @@ uint16_t qpid::framing::BasicHeaderProperties::getFlags() const{
if(userId.length() > 0) flags |= (1 << 4);
if(appId.length() > 0) flags |= (1 << 3);
if(clusterId.length() > 0) flags |= (1 << 2);
+ if(contentLength != 0) flags |= (1 << 1);
return flags;
}
+
+namespace qpid{
+namespace framing{
+
+ std::ostream& operator<<(std::ostream& out, const BasicHeaderProperties& props)
+ {
+ if(props.contentType.length() > 0) out << "contentType=" << props.contentType << ";";
+ if(props.contentEncoding.length() > 0) out << "contentEncoding=" << props.contentEncoding << ";";
+ if(props.headers.count() > 0) out << "headers=" << props.headers << ";";
+ if(props.deliveryMode != 0) out << "deliveryMode=" << props.deliveryMode << ";";
+ if(props.priority != 0) out << "priority=" << props.priority << ";";
+ if(props.correlationId.length() > 0) out << "correlationId=" << props.correlationId << ";";
+ if(props.replyTo.length() > 0) out << "replyTo=" << props.replyTo << ";";
+ if(props.expiration.length() > 0) out << "expiration=" << props.expiration << ";";
+ if(props.messageId.length() > 0) out << "messageId=" << props.messageId << ";";
+ if(props.timestamp != 0) out << "timestamp=" << props.timestamp << ";";
+ if(props.type.length() > 0) out << "type=" << props.type << ";";
+ if(props.userId.length() > 0) out << "userId=" << props.userId << ";";
+ if(props.appId.length() > 0) out << "appId=" << props.appId << ";";
+ if(props.clusterId.length() > 0) out << "clusterId=" << props.clusterId << ";";
+ if(props.contentLength != 0) out << "contentLength=" << props.contentLength << ";";
+
+ return out;
+ }
+
+}}
diff --git a/cpp/src/qpid/framing/BasicHeaderProperties.h b/cpp/src/qpid/framing/BasicHeaderProperties.h
index a8ef401b50..d6c71437fb 100644
--- a/cpp/src/qpid/framing/BasicHeaderProperties.h
+++ b/cpp/src/qpid/framing/BasicHeaderProperties.h
@@ -47,15 +47,18 @@ class BasicHeaderProperties : public HeaderProperties
string userId;
string appId;
string clusterId;
+ uint64_t contentLength;
uint16_t getFlags() const;
public:
+ static const uint16_t TYPE = BASIC;
+
BasicHeaderProperties();
virtual ~BasicHeaderProperties();
virtual uint32_t size() const;
virtual void encode(Buffer& buffer) const;
- virtual void decode(Buffer& buffer, uint32_t size);
+ virtual void decode(Buffer& buffer, uint32_t size = 0);
virtual uint8_t classId() const { return BASIC; }
@@ -74,6 +77,7 @@ class BasicHeaderProperties : public HeaderProperties
string getUserId() const { return userId; }
string getAppId() const { return appId; }
string getClusterId() const { return clusterId; }
+ uint64_t getContentLength() const { return contentLength; }
void setContentType(const string& _type){ contentType = _type; }
void setContentEncoding(const string& encoding){ contentEncoding = encoding; }
@@ -89,6 +93,9 @@ class BasicHeaderProperties : public HeaderProperties
void setUserId(const string& _userId){ userId = _userId; }
void setAppId(const string& _appId){appId = _appId; }
void setClusterId(const string& _clusterId){ clusterId = _clusterId; }
+ void setContentLength(uint64_t _contentLength){ contentLength = _contentLength; }
+
+ friend std::ostream& operator<<(std::ostream&, const BasicHeaderProperties&);
/** \internal
* Template to copy between types like BasicHeaderProperties.
@@ -109,6 +116,7 @@ class BasicHeaderProperties : public HeaderProperties
to.setUserId(from.getUserId());
to.setAppId(from.getAppId());
to.setClusterId(from.getClusterId());
+ to.setContentLength(from.getContentLength());
}
};
}}
diff --git a/cpp/src/qpid/framing/ChannelAdapter.cpp b/cpp/src/qpid/framing/ChannelAdapter.cpp
index 25ff46acdd..86b60d896b 100644
--- a/cpp/src/qpid/framing/ChannelAdapter.cpp
+++ b/cpp/src/qpid/framing/ChannelAdapter.cpp
@@ -51,7 +51,7 @@ void ChannelAdapter::init(ChannelId i, OutputHandler& out, ProtocolVersion v)
void ChannelAdapter::send(const AMQBody& body)
{
assertChannelOpen();
- AMQFrame frame(getVersion(), getId(), body);
+ AMQFrame frame(getId(), body);
handlers.out->handle(frame);
}
diff --git a/cpp/src/qpid/framing/FrameSet.cpp b/cpp/src/qpid/framing/FrameSet.cpp
new file mode 100644
index 0000000000..434f1b3aad
--- /dev/null
+++ b/cpp/src/qpid/framing/FrameSet.cpp
@@ -0,0 +1,83 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "FrameSet.h"
+#include "qpid/framing/all_method_bodies.h"
+#include "qpid/framing/frame_functors.h"
+#include "qpid/framing/BasicHeaderProperties.h"
+#include "qpid/framing/MessageProperties.h"
+#include "qpid/framing/TypeFilter.h"
+
+using namespace qpid::framing;
+using namespace boost;
+
+FrameSet::FrameSet(const SequenceNumber& _id) : id(_id) {}
+
+void FrameSet::append(AMQFrame& part)
+{
+ parts.push_back(part);
+}
+
+bool FrameSet::isComplete() const
+{
+ //TODO: should eventually use the 0-10 frame header flags when available
+ const AMQMethodBody* method = getMethod();
+ if (!method) {
+ return false;
+ } else if (method->isContentBearing()) {
+ const AMQHeaderBody* header = getHeaders();
+ if (header) {
+ return header->getContentLength() == getContentSize();
+ } else {
+ return false;
+ }
+ } else {
+ return true;
+ }
+}
+
+const AMQMethodBody* FrameSet::getMethod() const
+{
+ return parts.empty() ? 0 : dynamic_cast<const AMQMethodBody*>(parts[0].getBody());
+}
+
+const AMQHeaderBody* FrameSet::getHeaders() const
+{
+ return parts.size() < 2 ? 0 : dynamic_cast<const AMQHeaderBody*>(parts[1].getBody());
+}
+
+AMQHeaderBody* FrameSet::getHeaders()
+{
+ return parts.size() < 2 ? 0 : dynamic_cast<AMQHeaderBody*>(parts[1].getBody());
+}
+
+uint64_t FrameSet::getContentSize() const
+{
+ SumBodySize sum;
+ map_if(sum, TypeFilter(CONTENT_BODY));
+ return sum.getSize();
+}
+
+void FrameSet::getContent(std::string& out) const
+{
+ AccumulateContent accumulator(out);
+ map_if(accumulator, TypeFilter(CONTENT_BODY));
+}
diff --git a/cpp/src/qpid/framing/FrameSet.h b/cpp/src/qpid/framing/FrameSet.h
new file mode 100644
index 0000000000..d6d5cd7a13
--- /dev/null
+++ b/cpp/src/qpid/framing/FrameSet.h
@@ -0,0 +1,102 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <string>
+#include <vector>
+#include "qpid/framing/amqp_framing.h"
+#include "qpid/framing/AMQFrame.h"
+#include "qpid/framing/SequenceNumber.h"
+
+#ifndef _FrameSet_
+#define _FrameSet_
+
+namespace qpid {
+namespace framing {
+
+/**
+ * Collects the frames representing a message.
+ */
+class FrameSet
+{
+ typedef std::vector<AMQFrame> Frames;
+ const SequenceNumber id;
+ Frames parts;
+
+public:
+ typedef boost::shared_ptr<FrameSet> shared_ptr;
+
+ FrameSet(const SequenceNumber& id);
+ void append(AMQFrame& part);
+ bool isComplete() const;
+
+ uint64_t getContentSize() const;
+ void getContent(std::string&) const;
+
+ const AMQMethodBody* getMethod() const;
+ const AMQHeaderBody* getHeaders() const;
+ AMQHeaderBody* getHeaders();
+
+ template <class T> bool isA() const {
+ const AMQMethodBody* method = getMethod();
+ return method && method->isA<T>();
+ }
+
+ template <class T> const T* as() const {
+ const AMQMethodBody* method = getMethod();
+ return (method && method->isA<T>()) ? dynamic_cast<const T*>(method) : 0;
+ }
+
+ template <class T> const T* getHeaderProperties() const {
+ const AMQHeaderBody* header = getHeaders();
+ return header ? header->get<T>() : 0;
+ }
+
+ const SequenceNumber& getId() const { return id; }
+
+ template <class P> void remove(P predicate) {
+ parts.erase(remove_if(parts.begin(), parts.end(), predicate), parts.end());
+ }
+
+ template <class F> void map(F& functor) {
+ for_each(parts.begin(), parts.end(), functor);
+ }
+
+ template <class F> void map(F& functor) const {
+ for_each(parts.begin(), parts.end(), functor);
+ }
+
+ template <class F, class P> void map_if(F& functor, P predicate) {
+ for(Frames::iterator i = parts.begin(); i != parts.end(); i++) {
+ if (predicate(*i)) functor(*i);
+ }
+ }
+
+ template <class F, class P> void map_if(F& functor, P predicate) const {
+ for(Frames::const_iterator i = parts.begin(); i != parts.end(); i++) {
+ if (predicate(*i)) functor(*i);
+ }
+ }
+};
+
+}
+}
+
+
+#endif
diff --git a/cpp/src/qpid/framing/ProtocolInitiation.cpp b/cpp/src/qpid/framing/ProtocolInitiation.cpp
index a6d1b17f6e..7164bceb12 100644
--- a/cpp/src/qpid/framing/ProtocolInitiation.cpp
+++ b/cpp/src/qpid/framing/ProtocolInitiation.cpp
@@ -31,7 +31,7 @@ ProtocolInitiation::ProtocolInitiation(ProtocolVersion p) : version(p) {}
ProtocolInitiation::~ProtocolInitiation(){}
-void ProtocolInitiation::encode(Buffer& buffer){
+void ProtocolInitiation::encode(Buffer& buffer) const {
buffer.putOctet('A');
buffer.putOctet('M');
buffer.putOctet('Q');
diff --git a/cpp/src/qpid/framing/ProtocolInitiation.h b/cpp/src/qpid/framing/ProtocolInitiation.h
index adfdc8215d..31c73eb124 100644
--- a/cpp/src/qpid/framing/ProtocolInitiation.h
+++ b/cpp/src/qpid/framing/ProtocolInitiation.h
@@ -39,7 +39,7 @@ public:
ProtocolInitiation(uint8_t major, uint8_t minor);
ProtocolInitiation(ProtocolVersion p);
virtual ~ProtocolInitiation();
- virtual void encode(Buffer& buffer);
+ virtual void encode(Buffer& buffer) const;
virtual bool decode(Buffer& buffer);
inline virtual uint32_t size() const { return 8; }
inline uint8_t getMajor() const { return version.getMajor(); }
diff --git a/cpp/src/qpid/framing/SendContent.cpp b/cpp/src/qpid/framing/SendContent.cpp
new file mode 100644
index 0000000000..568cc01665
--- /dev/null
+++ b/cpp/src/qpid/framing/SendContent.cpp
@@ -0,0 +1,51 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "SendContent.h"
+
+qpid::framing::SendContent::SendContent(FrameHandler& h, uint16_t c, uint16_t mfs) : handler(h), channel(c), maxFrameSize(mfs) {}
+
+void qpid::framing::SendContent::operator()(AMQFrame& f) const
+{
+ uint16_t maxContentSize = maxFrameSize - AMQFrame::frameOverhead();
+ const AMQContentBody* body(f.castBody<AMQContentBody>());
+ if (body->size() > maxContentSize) {
+ uint32_t offset = 0;
+ for (int chunk = body->size() / maxContentSize; chunk > 0; chunk--) {
+ sendFragment(*body, offset, maxContentSize);
+ offset += maxContentSize;
+ }
+ uint32_t remainder = body->size() % maxContentSize;
+ if (remainder) {
+ sendFragment(*body, offset, remainder);
+ }
+ } else {
+ AMQFrame copy(f);
+ copy.setChannel(channel);
+ handler.handle(copy);
+ }
+}
+
+void qpid::framing::SendContent::sendFragment(const AMQContentBody& body, uint32_t offset, uint16_t size) const
+{
+ AMQFrame fragment(channel, AMQContentBody(body.getData().substr(offset, size)));
+ handler.handle(fragment);
+}
diff --git a/cpp/src/qpid/framing/SendContent.h b/cpp/src/qpid/framing/SendContent.h
new file mode 100644
index 0000000000..a88319e2f9
--- /dev/null
+++ b/cpp/src/qpid/framing/SendContent.h
@@ -0,0 +1,53 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <string>
+#include "qpid/framing/amqp_framing.h"
+#include "qpid/framing/AMQFrame.h"
+#include "qpid/framing/FrameHandler.h"
+
+#ifndef _SendContent_
+#define _SendContent_
+
+namespace qpid {
+namespace framing {
+
+/**
+ * Functor that sends frame to handler, refragmenting if
+ * necessary. Currently only works on content frames but this could be
+ * changed once we support multi-frame segments in general.
+ */
+class SendContent
+{
+ mutable FrameHandler& handler;
+ const uint16_t channel;
+ const uint16_t maxFrameSize;
+
+ void sendFragment(const AMQContentBody& body, uint32_t offset, uint16_t size) const;
+public:
+ SendContent(FrameHandler& _handler, uint16_t channel, uint16_t _maxFrameSize);
+ void operator()(AMQFrame& f) const;
+};
+
+}
+}
+
+
+#endif
diff --git a/cpp/src/qpid/framing/TypeFilter.h b/cpp/src/qpid/framing/TypeFilter.h
new file mode 100644
index 0000000000..3a607190fd
--- /dev/null
+++ b/cpp/src/qpid/framing/TypeFilter.h
@@ -0,0 +1,52 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <string>
+#include "qpid/framing/amqp_framing.h"
+#include "qpid/framing/AMQFrame.h"
+#include "qpid/framing/FrameHandler.h"
+
+#ifndef _TypeFilter_
+#define _TypeFilter_
+
+namespace qpid {
+namespace framing {
+
+/**
+ * Predicate that selects frames by type
+ */
+class TypeFilter
+{
+ std::vector<uint8_t> types;
+public:
+ TypeFilter(uint8_t type) { add(type); }
+ TypeFilter(uint8_t type1, uint8_t type2) { add(type1); add(type2); }
+ void add(uint8_t type) { types.push_back(type); }
+ bool operator()(const AMQFrame& f) const
+ {
+ return find(types.begin(), types.end(), f.getBody()->type()) != types.end();
+ }
+};
+
+}
+}
+
+
+#endif
diff --git a/cpp/src/qpid/framing/frame_functors.h b/cpp/src/qpid/framing/frame_functors.h
new file mode 100644
index 0000000000..3112da8e24
--- /dev/null
+++ b/cpp/src/qpid/framing/frame_functors.h
@@ -0,0 +1,108 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <string>
+#include <ostream>
+#include <iostream>
+#include "qpid/framing/amqp_framing.h"
+#include "qpid/framing/AMQFrame.h"
+#include "qpid/framing/Buffer.h"
+
+#ifndef _frame_functors_
+#define _frame_functors_
+
+namespace qpid {
+namespace framing {
+
+class SumFrameSize
+{
+ uint64_t size;
+public:
+ SumFrameSize() : size(0) {}
+ void operator()(const AMQFrame& f) { size += f.size(); }
+ uint64_t getSize() { return size; }
+};
+
+class SumBodySize
+{
+ uint64_t size;
+public:
+ SumBodySize() : size(0) {}
+ void operator()(const AMQFrame& f) { size += f.getBody()->size(); }
+ uint64_t getSize() { return size; }
+};
+
+class EncodeFrame
+{
+ Buffer& buffer;
+public:
+ EncodeFrame(Buffer& b) : buffer(b) {}
+ void operator()(const AMQFrame& f) { f.encode(buffer); }
+};
+
+class EncodeBody
+{
+ Buffer& buffer;
+public:
+ EncodeBody(Buffer& b) : buffer(b) {}
+ void operator()(const AMQFrame& f) { f.getBody()->encode(buffer); }
+};
+
+class AccumulateContent
+{
+ std::string& content;
+public:
+ AccumulateContent(std::string& c) : content(c) {}
+ void operator()(const AMQFrame& f) { content += f.castBody<AMQContentBody>()->getData(); }
+};
+
+class Relay
+{
+ FrameHandler& handler;
+ const uint16_t channel;
+
+public:
+ Relay(FrameHandler& h, uint16_t c) : handler(h), channel(c) {}
+
+ void operator()(AMQFrame& f)
+ {
+ AMQFrame copy(f);
+ copy.setChannel(channel);
+ handler.handle(copy);
+ }
+};
+
+class Print
+{
+ std::ostream& out;
+public:
+ Print(std::ostream& o) : out(o) {}
+
+ void operator()(const AMQFrame& f)
+ {
+ out << f << std::endl;
+ }
+};
+
+}
+}
+
+
+#endif