summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/framing
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/framing')
-rw-r--r--cpp/src/qpid/framing/AMQFrame.cpp4
-rw-r--r--cpp/src/qpid/framing/AMQFrame.h11
-rw-r--r--cpp/src/qpid/framing/FrameSet.cpp19
-rw-r--r--cpp/src/qpid/framing/FrameSet.h2
-rw-r--r--cpp/src/qpid/framing/SendContent.cpp26
-rw-r--r--cpp/src/qpid/framing/SendContent.h9
-rw-r--r--cpp/src/qpid/framing/TransferContent.cpp5
-rw-r--r--cpp/src/qpid/framing/frame_functors.h9
8 files changed, 62 insertions, 23 deletions
diff --git a/cpp/src/qpid/framing/AMQFrame.cpp b/cpp/src/qpid/framing/AMQFrame.cpp
index 52425f28b7..736c3f08ef 100644
--- a/cpp/src/qpid/framing/AMQFrame.cpp
+++ b/cpp/src/qpid/framing/AMQFrame.cpp
@@ -156,7 +156,9 @@ void AMQFrame::decodeBody(Buffer& buffer, uint32_t size, uint8_t type)
std::ostream& operator<<(std::ostream& out, const AMQFrame& f)
{
- return out << "Frame[channel=" << f.getChannel() << "; " << *f.getBody()
+ return out << "Frame["
+ //<< "B=" << f.getBof() << "E=" << f.getEof() << "b=" << f.getBos() << "e=" << f.getEos() << "; "
+ << "channel=" << f.getChannel() << "; " << *f.getBody()
<< "]";
}
diff --git a/cpp/src/qpid/framing/AMQFrame.h b/cpp/src/qpid/framing/AMQFrame.h
index a96b0483b7..f4aec72e4c 100644
--- a/cpp/src/qpid/framing/AMQFrame.h
+++ b/cpp/src/qpid/framing/AMQFrame.h
@@ -74,6 +74,17 @@ class AMQFrame : public AMQDataBlock
void encode(Buffer& buffer) const;
bool decode(Buffer& buffer);
uint32_t size() const;
+
+ bool getBof() const { return bof; }
+ void setBof(bool isBof) { bof = isBof; }
+ bool getEof() const { return eof; }
+ void setEof(bool isEof) { eof = isEof; }
+
+ bool getBos() const { return bos; }
+ void setBos(bool isBos) { bos = isBos; }
+ bool getEos() const { return eos; }
+ void setEos(bool isEos) { eos = isEos; }
+
static uint32_t frameOverhead();
private:
diff --git a/cpp/src/qpid/framing/FrameSet.cpp b/cpp/src/qpid/framing/FrameSet.cpp
index 12579f53cb..129219e0a1 100644
--- a/cpp/src/qpid/framing/FrameSet.cpp
+++ b/cpp/src/qpid/framing/FrameSet.cpp
@@ -38,20 +38,13 @@ void FrameSet::append(AMQFrame& part)
bool FrameSet::isComplete() const
{
- //TODO: should eventually use the 0-10 frame header flags when available
+ return !parts.empty() && parts.back().getEof();
+}
+
+bool FrameSet::isContentBearing() const
+{
const AMQMethodBody* method = getMethod();
- if (!method) {
- return false;
- } else if (method->isContentBearing()) {
- const AMQHeaderBody* header = getHeaders();
- if (header) {
- return header->getContentLength() == getContentSize();
- } else {
- return false;
- }
- } else {
- return true;
- }
+ return method && method->isContentBearing();
}
const AMQMethodBody* FrameSet::getMethod() const
diff --git a/cpp/src/qpid/framing/FrameSet.h b/cpp/src/qpid/framing/FrameSet.h
index 9a9512a6d4..8ba22f07cb 100644
--- a/cpp/src/qpid/framing/FrameSet.h
+++ b/cpp/src/qpid/framing/FrameSet.h
@@ -50,6 +50,8 @@ public:
void getContent(std::string&) const;
std::string getContent() const;
+ bool isContentBearing() const;
+
const AMQMethodBody* getMethod() const;
const AMQHeaderBody* getHeaders() const;
AMQHeaderBody* getHeaders();
diff --git a/cpp/src/qpid/framing/SendContent.cpp b/cpp/src/qpid/framing/SendContent.cpp
index 568cc01665..573ebca9e2 100644
--- a/cpp/src/qpid/framing/SendContent.cpp
+++ b/cpp/src/qpid/framing/SendContent.cpp
@@ -21,31 +21,47 @@
#include "SendContent.h"
-qpid::framing::SendContent::SendContent(FrameHandler& h, uint16_t c, uint16_t mfs) : handler(h), channel(c), maxFrameSize(mfs) {}
+qpid::framing::SendContent::SendContent(FrameHandler& h, uint16_t c, uint16_t mfs, uint efc) : handler(h), channel(c),
+ maxFrameSize(mfs),
+ expectedFrameCount(efc), frameCount(0) {}
-void qpid::framing::SendContent::operator()(AMQFrame& f) const
+void qpid::framing::SendContent::operator()(const AMQFrame& f)
{
+ bool first = frameCount == 0;
+ bool last = ++frameCount == expectedFrameCount;
+
uint16_t maxContentSize = maxFrameSize - AMQFrame::frameOverhead();
const AMQContentBody* body(f.castBody<AMQContentBody>());
if (body->size() > maxContentSize) {
uint32_t offset = 0;
for (int chunk = body->size() / maxContentSize; chunk > 0; chunk--) {
- sendFragment(*body, offset, maxContentSize);
+ sendFragment(*body, offset, maxContentSize, first && offset == 0, last && offset + maxContentSize == body->size());
offset += maxContentSize;
}
uint32_t remainder = body->size() % maxContentSize;
if (remainder) {
- sendFragment(*body, offset, remainder);
+ sendFragment(*body, offset, remainder, first && offset == 0, last);
}
} else {
AMQFrame copy(f);
+ setFlags(copy, first, last);
copy.setChannel(channel);
handler.handle(copy);
}
}
-void qpid::framing::SendContent::sendFragment(const AMQContentBody& body, uint32_t offset, uint16_t size) const
+void qpid::framing::SendContent::sendFragment(const AMQContentBody& body, uint32_t offset, uint16_t size, bool first, bool last) const
{
AMQFrame fragment(channel, AMQContentBody(body.getData().substr(offset, size)));
+ setFlags(fragment, first, last);
handler.handle(fragment);
}
+
+void qpid::framing::SendContent::setFlags(AMQFrame& f, bool first, bool last) const
+{
+ f.setBof(false);
+ f.setBos(first);
+ f.setEof(last);
+ f.setEos(last);
+}
+
diff --git a/cpp/src/qpid/framing/SendContent.h b/cpp/src/qpid/framing/SendContent.h
index a88319e2f9..05b5838c62 100644
--- a/cpp/src/qpid/framing/SendContent.h
+++ b/cpp/src/qpid/framing/SendContent.h
@@ -39,11 +39,14 @@ class SendContent
mutable FrameHandler& handler;
const uint16_t channel;
const uint16_t maxFrameSize;
+ uint expectedFrameCount;
+ uint frameCount;
- void sendFragment(const AMQContentBody& body, uint32_t offset, uint16_t size) const;
+ void sendFragment(const AMQContentBody& body, uint32_t offset, uint16_t size, bool first, bool last) const;
+ void setFlags(AMQFrame& f, bool first, bool last) const;
public:
- SendContent(FrameHandler& _handler, uint16_t channel, uint16_t _maxFrameSize);
- void operator()(AMQFrame& f) const;
+ SendContent(FrameHandler& _handler, uint16_t channel, uint16_t _maxFrameSize, uint frameCount);
+ void operator()(const AMQFrame& f);
};
}
diff --git a/cpp/src/qpid/framing/TransferContent.cpp b/cpp/src/qpid/framing/TransferContent.cpp
index 4c2d06ae42..e0372b2f68 100644
--- a/cpp/src/qpid/framing/TransferContent.cpp
+++ b/cpp/src/qpid/framing/TransferContent.cpp
@@ -63,7 +63,10 @@ DeliveryProperties& TransferContent::getDeliveryProperties()
void TransferContent::populate(const FrameSet& frameset)
{
- header = *frameset.getHeaders();
+ const AMQHeaderBody* h = frameset.getHeaders();
+ if (h) {
+ header = *h;
+ }
frameset.getContent(data);
}
diff --git a/cpp/src/qpid/framing/frame_functors.h b/cpp/src/qpid/framing/frame_functors.h
index 3112da8e24..7b7e24b2b3 100644
--- a/cpp/src/qpid/framing/frame_functors.h
+++ b/cpp/src/qpid/framing/frame_functors.h
@@ -49,6 +49,15 @@ public:
uint64_t getSize() { return size; }
};
+class Count
+{
+ uint count;
+public:
+ Count() : count(0) {}
+ void operator()(const AMQFrame&) { count++; }
+ uint getCount() { return count; }
+};
+
class EncodeFrame
{
Buffer& buffer;