diff options
Diffstat (limited to 'cpp/src/qpid/framing')
| -rw-r--r-- | cpp/src/qpid/framing/AMQFrame.cpp | 4 | ||||
| -rw-r--r-- | cpp/src/qpid/framing/AMQFrame.h | 11 | ||||
| -rw-r--r-- | cpp/src/qpid/framing/FrameSet.cpp | 19 | ||||
| -rw-r--r-- | cpp/src/qpid/framing/FrameSet.h | 2 | ||||
| -rw-r--r-- | cpp/src/qpid/framing/SendContent.cpp | 26 | ||||
| -rw-r--r-- | cpp/src/qpid/framing/SendContent.h | 9 | ||||
| -rw-r--r-- | cpp/src/qpid/framing/TransferContent.cpp | 5 | ||||
| -rw-r--r-- | cpp/src/qpid/framing/frame_functors.h | 9 |
8 files changed, 62 insertions, 23 deletions
diff --git a/cpp/src/qpid/framing/AMQFrame.cpp b/cpp/src/qpid/framing/AMQFrame.cpp index 52425f28b7..736c3f08ef 100644 --- a/cpp/src/qpid/framing/AMQFrame.cpp +++ b/cpp/src/qpid/framing/AMQFrame.cpp @@ -156,7 +156,9 @@ void AMQFrame::decodeBody(Buffer& buffer, uint32_t size, uint8_t type) std::ostream& operator<<(std::ostream& out, const AMQFrame& f) { - return out << "Frame[channel=" << f.getChannel() << "; " << *f.getBody() + return out << "Frame[" + //<< "B=" << f.getBof() << "E=" << f.getEof() << "b=" << f.getBos() << "e=" << f.getEos() << "; " + << "channel=" << f.getChannel() << "; " << *f.getBody() << "]"; } diff --git a/cpp/src/qpid/framing/AMQFrame.h b/cpp/src/qpid/framing/AMQFrame.h index a96b0483b7..f4aec72e4c 100644 --- a/cpp/src/qpid/framing/AMQFrame.h +++ b/cpp/src/qpid/framing/AMQFrame.h @@ -74,6 +74,17 @@ class AMQFrame : public AMQDataBlock void encode(Buffer& buffer) const; bool decode(Buffer& buffer); uint32_t size() const; + + bool getBof() const { return bof; } + void setBof(bool isBof) { bof = isBof; } + bool getEof() const { return eof; } + void setEof(bool isEof) { eof = isEof; } + + bool getBos() const { return bos; } + void setBos(bool isBos) { bos = isBos; } + bool getEos() const { return eos; } + void setEos(bool isEos) { eos = isEos; } + static uint32_t frameOverhead(); private: diff --git a/cpp/src/qpid/framing/FrameSet.cpp b/cpp/src/qpid/framing/FrameSet.cpp index 12579f53cb..129219e0a1 100644 --- a/cpp/src/qpid/framing/FrameSet.cpp +++ b/cpp/src/qpid/framing/FrameSet.cpp @@ -38,20 +38,13 @@ void FrameSet::append(AMQFrame& part) bool FrameSet::isComplete() const { - //TODO: should eventually use the 0-10 frame header flags when available + return !parts.empty() && parts.back().getEof(); +} + +bool FrameSet::isContentBearing() const +{ const AMQMethodBody* method = getMethod(); - if (!method) { - return false; - } else if (method->isContentBearing()) { - const AMQHeaderBody* header = getHeaders(); - if (header) { - return header->getContentLength() == getContentSize(); - } else { - return false; - } - } else { - return true; - } + return method && method->isContentBearing(); } const AMQMethodBody* FrameSet::getMethod() const diff --git a/cpp/src/qpid/framing/FrameSet.h b/cpp/src/qpid/framing/FrameSet.h index 9a9512a6d4..8ba22f07cb 100644 --- a/cpp/src/qpid/framing/FrameSet.h +++ b/cpp/src/qpid/framing/FrameSet.h @@ -50,6 +50,8 @@ public: void getContent(std::string&) const; std::string getContent() const; + bool isContentBearing() const; + const AMQMethodBody* getMethod() const; const AMQHeaderBody* getHeaders() const; AMQHeaderBody* getHeaders(); diff --git a/cpp/src/qpid/framing/SendContent.cpp b/cpp/src/qpid/framing/SendContent.cpp index 568cc01665..573ebca9e2 100644 --- a/cpp/src/qpid/framing/SendContent.cpp +++ b/cpp/src/qpid/framing/SendContent.cpp @@ -21,31 +21,47 @@ #include "SendContent.h" -qpid::framing::SendContent::SendContent(FrameHandler& h, uint16_t c, uint16_t mfs) : handler(h), channel(c), maxFrameSize(mfs) {} +qpid::framing::SendContent::SendContent(FrameHandler& h, uint16_t c, uint16_t mfs, uint efc) : handler(h), channel(c), + maxFrameSize(mfs), + expectedFrameCount(efc), frameCount(0) {} -void qpid::framing::SendContent::operator()(AMQFrame& f) const +void qpid::framing::SendContent::operator()(const AMQFrame& f) { + bool first = frameCount == 0; + bool last = ++frameCount == expectedFrameCount; + uint16_t maxContentSize = maxFrameSize - AMQFrame::frameOverhead(); const AMQContentBody* body(f.castBody<AMQContentBody>()); if (body->size() > maxContentSize) { uint32_t offset = 0; for (int chunk = body->size() / maxContentSize; chunk > 0; chunk--) { - sendFragment(*body, offset, maxContentSize); + sendFragment(*body, offset, maxContentSize, first && offset == 0, last && offset + maxContentSize == body->size()); offset += maxContentSize; } uint32_t remainder = body->size() % maxContentSize; if (remainder) { - sendFragment(*body, offset, remainder); + sendFragment(*body, offset, remainder, first && offset == 0, last); } } else { AMQFrame copy(f); + setFlags(copy, first, last); copy.setChannel(channel); handler.handle(copy); } } -void qpid::framing::SendContent::sendFragment(const AMQContentBody& body, uint32_t offset, uint16_t size) const +void qpid::framing::SendContent::sendFragment(const AMQContentBody& body, uint32_t offset, uint16_t size, bool first, bool last) const { AMQFrame fragment(channel, AMQContentBody(body.getData().substr(offset, size))); + setFlags(fragment, first, last); handler.handle(fragment); } + +void qpid::framing::SendContent::setFlags(AMQFrame& f, bool first, bool last) const +{ + f.setBof(false); + f.setBos(first); + f.setEof(last); + f.setEos(last); +} + diff --git a/cpp/src/qpid/framing/SendContent.h b/cpp/src/qpid/framing/SendContent.h index a88319e2f9..05b5838c62 100644 --- a/cpp/src/qpid/framing/SendContent.h +++ b/cpp/src/qpid/framing/SendContent.h @@ -39,11 +39,14 @@ class SendContent mutable FrameHandler& handler; const uint16_t channel; const uint16_t maxFrameSize; + uint expectedFrameCount; + uint frameCount; - void sendFragment(const AMQContentBody& body, uint32_t offset, uint16_t size) const; + void sendFragment(const AMQContentBody& body, uint32_t offset, uint16_t size, bool first, bool last) const; + void setFlags(AMQFrame& f, bool first, bool last) const; public: - SendContent(FrameHandler& _handler, uint16_t channel, uint16_t _maxFrameSize); - void operator()(AMQFrame& f) const; + SendContent(FrameHandler& _handler, uint16_t channel, uint16_t _maxFrameSize, uint frameCount); + void operator()(const AMQFrame& f); }; } diff --git a/cpp/src/qpid/framing/TransferContent.cpp b/cpp/src/qpid/framing/TransferContent.cpp index 4c2d06ae42..e0372b2f68 100644 --- a/cpp/src/qpid/framing/TransferContent.cpp +++ b/cpp/src/qpid/framing/TransferContent.cpp @@ -63,7 +63,10 @@ DeliveryProperties& TransferContent::getDeliveryProperties() void TransferContent::populate(const FrameSet& frameset) { - header = *frameset.getHeaders(); + const AMQHeaderBody* h = frameset.getHeaders(); + if (h) { + header = *h; + } frameset.getContent(data); } diff --git a/cpp/src/qpid/framing/frame_functors.h b/cpp/src/qpid/framing/frame_functors.h index 3112da8e24..7b7e24b2b3 100644 --- a/cpp/src/qpid/framing/frame_functors.h +++ b/cpp/src/qpid/framing/frame_functors.h @@ -49,6 +49,15 @@ public: uint64_t getSize() { return size; } }; +class Count +{ + uint count; +public: + Count() : count(0) {} + void operator()(const AMQFrame&) { count++; } + uint getCount() { return count; } +}; + class EncodeFrame { Buffer& buffer; |
