diff options
author | Gordon Sim <gsim@apache.org> | 2013-04-29 18:04:17 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2013-04-29 18:04:17 +0000 |
commit | a29b83f08ed76102923a9304dc8c89c68faca97c (patch) | |
tree | 05cadd4783165344d510c0c6d6f69f933b36455f | |
parent | fe17c41b0b6b3e31ad575dfd2ff9a4b007650dbe (diff) | |
download | qpid-python-a29b83f08ed76102923a9304dc8c89c68faca97c.tar.gz |
QPID-4339: simple paged queue implementation
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1477236 13f79535-47bb-0310-9956-ffa450edef68
23 files changed, 900 insertions, 47 deletions
diff --git a/cpp/src/CMakeLists.txt b/cpp/src/CMakeLists.txt index f3d7f5eed0..1fe2198ea6 100644 --- a/cpp/src/CMakeLists.txt +++ b/cpp/src/CMakeLists.txt @@ -748,6 +748,7 @@ if (CMAKE_SYSTEM_NAME STREQUAL Windows) qpid/sys/windows/IocpPoller.cpp qpid/sys/windows/IOHandle.cpp qpid/sys/windows/LockFile.cpp + qpid/sys/windows/MemoryMappedFile.cpp qpid/sys/windows/PipeHandle.cpp qpid/sys/windows/PollableCondition.cpp qpid/sys/windows/Shlib.cpp @@ -849,6 +850,7 @@ else (CMAKE_SYSTEM_NAME STREQUAL Windows) qpid/sys/posix/FileSysDir.cpp qpid/sys/posix/IOHandle.cpp qpid/sys/posix/LockFile.cpp + qpid/sys/posix/MemoryMappedFile.cpp qpid/sys/posix/Mutex.cpp qpid/sys/posix/PipeHandle.cpp qpid/sys/posix/PollableCondition.cpp @@ -1196,6 +1198,7 @@ set (qpidbroker_SOURCES qpid/broker/MessageStoreModule.cpp qpid/broker/NameGenerator.cpp qpid/broker/NullMessageStore.cpp + qpid/broker/PagedQueue.cpp qpid/broker/QueueBindings.cpp qpid/broker/QueuedMessage.cpp qpid/broker/QueueCursor.cpp diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am index cad41bca86..a99a57d32d 100644 --- a/cpp/src/Makefile.am +++ b/cpp/src/Makefile.am @@ -39,6 +39,7 @@ windows_dist = \ qpid/sys/windows/IOHandle.cpp \ qpid/sys/windows/IoHandlePrivate.h \ qpid/sys/windows/LockFile.cpp \ + qpid/sys/windows/MemoryMappedFile.cpp \ qpid/sys/windows/mingw32_compat.h \ qpid/sys/windows/PollableCondition.cpp \ qpid/sys/windows/PipeHandle.cpp \ @@ -167,6 +168,7 @@ libqpidcommon_la_SOURCES += \ qpid/sys/posix/AsynchIO.cpp \ qpid/sys/posix/FileSysDir.cpp \ qpid/sys/posix/LockFile.cpp \ + qpid/sys/posix/MemoryMappedFile.cpp \ qpid/sys/posix/Time.cpp \ qpid/sys/posix/Thread.cpp \ qpid/sys/posix/Shlib.cpp \ @@ -492,6 +494,7 @@ libqpidcommon_la_SOURCES += \ qpid/sys/Fork.h \ qpid/sys/LockFile.h \ qpid/sys/LockPtr.h \ + qpid/sys/MemoryMappedFile.h \ qpid/sys/MemStat.h \ qpid/sys/OutputControl.h \ qpid/sys/OutputTask.h \ @@ -654,6 +657,8 @@ libqpidbroker_la_SOURCES = \ qpid/broker/MessageStore.h \ qpid/broker/MessageStoreModule.cpp \ qpid/broker/MessageStoreModule.h \ + qpid/broker/PagedQueue.h \ + qpid/broker/PagedQueue.cpp \ qpid/broker/PriorityQueue.h \ qpid/broker/PriorityQueue.cpp \ qpid/broker/Protocol.h \ diff --git a/cpp/src/qpid/broker/Message.h b/cpp/src/qpid/broker/Message.h index b999a7d5a3..64ef5bd771 100644 --- a/cpp/src/qpid/broker/Message.h +++ b/cpp/src/qpid/broker/Message.h @@ -145,9 +145,6 @@ public: void annotationsChanged(); }; -QPID_BROKER_EXTERN void encode(const Message&, std::string&); -QPID_BROKER_EXTERN void decode(const std::string&, Message&); - }} diff --git a/cpp/src/qpid/broker/PagedQueue.cpp b/cpp/src/qpid/broker/PagedQueue.cpp new file mode 100644 index 0000000000..9bce28061a --- /dev/null +++ b/cpp/src/qpid/broker/PagedQueue.cpp @@ -0,0 +1,425 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/broker/PagedQueue.h" +#include "qpid/broker/Protocol.h" +#include "qpid/broker/QueueCursor.h" +#include "qpid/broker/Message.h" +#include "qpid/log/Statement.h" +#include <string.h> + +namespace qpid { +namespace broker { +namespace { +using qpid::sys::MemoryMappedFile; + +size_t encodedSize(const Message& msg) +{ + return msg.getPersistentContext()->encodedSize() + 4/*content-size*/ + 4/*sequence-number*/; +} + +size_t encode(const Message& msg, char* data, size_t size) +{ + uint32_t encoded = msg.getPersistentContext()->encodedSize(); + uint32_t required = encoded + 4/*content-size*/ + 4/*sequence-number*/; + if (required > size) return 0; + qpid::framing::Buffer buffer(data, required); + buffer.putLong(encoded); + buffer.putLong(msg.getSequence()); + msg.getPersistentContext()->encode(buffer); + assert(buffer.getPosition() == required); + return required; +} + +size_t decode(ProtocolRegistry& protocols, Message& msg, const char* data, size_t size) +{ + qpid::framing::Buffer metadata(const_cast<char*>(data), size); + uint32_t encoded = metadata.getLong(); + uint32_t sequence = metadata.getLong(); + assert(metadata.available() >= encoded); + qpid::framing::Buffer buffer(const_cast<char*>(data) + metadata.getPosition(), encoded); + msg = protocols.decode(buffer); + assert(buffer.getPosition() == encoded); + msg.setSequence(qpid::framing::SequenceNumber(sequence)); + return encoded + metadata.getPosition(); +} + +} + +PagedQueue::PagedQueue(const std::string& name, const std::string& directory, uint m, uint factor, ProtocolRegistry& p) + : pageSize(file.getPageSize()*factor), maxLoaded(m), protocols(p), offset(0) +{ + path = file.open(name, directory); + QPID_LOG(debug, "PagedQueue[" << path << "]"); +} + +size_t PagedQueue::size() +{ + size_t total(0); + for (Used::const_iterator i = used.begin(); i != used.end(); ++i) { + total += i->second.available(); + } + return total; +} + +bool PagedQueue::deleted(const QueueCursor& cursor) +{ + if (cursor.valid) { + Used::iterator page = findPage(cursor.position, false); + if (page == used.end()) { + return false; + } + page->second.deleted(cursor.position); + if (page->second.empty()) { + //move page to free list + --loaded; + page->second.clear(file); + free.push_back(page->second); + used.erase(page); + } + return true; + } else { + return false; + } +} + +void PagedQueue::publish(const Message& added) +{ + if (encodedSize(added) > pageSize) { + throw qpid::Exception(QPID_MSG("Message is larger than page size for queue backed by " << path)); + } + Used::reverse_iterator i = used.rbegin(); + if (i != used.rend()) { + if (!i->second.isLoaded()) load(i->second); + if (i->second.add(added)) return; + } + //used is empty or last page is full, need to add a new page + if (!newPage(added.getSequence()).add(added)) { + throw qpid::Exception(QPID_MSG("Could not add message to paged queue backed by " << path)); + } +} + +Message* PagedQueue::next(QueueCursor& cursor) +{ + Used::iterator i = used.begin(); + if (cursor.valid) { + qpid::framing::SequenceNumber position(cursor.position); + ++position; + i = findPage(position, true); + if (i == used.end() && !used.empty() && used.begin()->first > position) i = used.begin(); + } + while (i != used.end()) { + Message* m = i->second.next(version, cursor); + QPID_LOG(debug, "PagedQueue::next(" << cursor.valid << ":" << cursor.position << "): " << m); + if (m) return m; + ++i; + } + QPID_LOG(debug, "PagedQueue::next(" << cursor.valid << ":" << cursor.position << ") returning 0 "); + return 0; +} + +Message* PagedQueue::release(const QueueCursor& cursor) +{ + if (cursor.valid) { + Used::iterator i = findPage(cursor.position, true); + if (i == used.end()) return 0; + return i->second.release(cursor.position); + } else { + return 0; + } +} + +Message* PagedQueue::find(const framing::SequenceNumber& position, QueueCursor* cursor) +{ + Used::iterator i = findPage(position, true); + if (i != used.end()) { + Message* m = i->second.find(position); + if (cursor) { + cursor->setPosition(version, m ? m->getSequence() : position); + } + return m; + } else { + return 0; + } +} + +void PagedQueue::foreach(Functor) +{ + //TODO: +} + +Message* PagedQueue::find(const QueueCursor& cursor) +{ + if (cursor.valid) return find(cursor.position, 0); + else return 0; +} + +PagedQueue::Page::Page(size_t s, size_t o) : size(s), offset(o), region(0), used(0) +{ + QPID_LOG(debug, "Created Page[" << offset << "], size=" << size); +} + +void PagedQueue::Page::deleted(qpid::framing::SequenceNumber s) +{ + if (isLoaded()) { + Message* message = find(s); + assert(message);//could this ever legitimately be 0? + message->setState(DELETED); + } + contents.remove(s); + acquired.remove(s); +} + +Message* PagedQueue::Page::release(qpid::framing::SequenceNumber s) +{ + Message* m = find(s); + if (m) { + m->setState(AVAILABLE); + } + acquired.remove(s); + return m; +} + +bool PagedQueue::Page::add(const Message& message) +{ + assert(region); + assert (size >= used); + size_t encoded = encode(message, region + used, size - used); + QPID_LOG(debug, "Calling Page[" << offset << "]::add() used=" << used << ", size=" << size << ", encoded=" << encoded << ")"); + if (encoded) { + used += encoded; + messages.push_back(message); + messages.back().setState(AVAILABLE); + contents.add(message.getSequence()); + return true; + } else { + return false; + } +} + +bool PagedQueue::Page::empty() const +{ + return contents.empty(); +} + +bool PagedQueue::Page::isLoaded() const +{ + return region; +} + +Message* PagedQueue::Page::next(uint32_t version, QueueCursor& cursor) +{ + if (messages.empty()) return 0; + + qpid::framing::SequenceNumber position; + if (cursor.valid) { + position = cursor.position + 1; + if (position < messages.front().getSequence()) { + position = messages.front().getSequence(); + cursor.setPosition(position, version); + } + } else { + position = messages.front().getSequence(); + cursor.setPosition(position, version); + } + + Message* m; + do { + m = find(position); + if (m) cursor.setPosition(position, version); + ++position; + } while (m != 0 && !cursor.check(*m)); + return m; + //if it is the first in the page, increment the hint count of the page + //if it is the last in the page, decrement the hint count of the page +} + +/** + * Called before adding to the free list + */ +void PagedQueue::Page::clear(MemoryMappedFile& file) +{ + if (region) file.unmap(region, size); + region = 0; + used = 0; + contents.clear(); + messages.clear(); +} + +size_t PagedQueue::Page::available() const +{ + return contents.size() - acquired.size(); +} + +Message* PagedQueue::Page::find(qpid::framing::SequenceNumber position) +{ + if (messages.size()) { + assert(position >= messages.front().getSequence()); + + size_t index = position - messages.front().getSequence(); + if (index < messages.size()) return &(messages[index]); + else return 0; + } else { + //page is empty, is this an error? + QPID_LOG(warning, "Could not find message at " << position << "; empty page."); + return 0; + } + + //if it is the first in the page, increment the hint count of the page + //if it is the last in the page, decrement the hint count of the page +} + +void PagedQueue::Page::load(MemoryMappedFile& file, ProtocolRegistry& protocols) +{ + QPID_LOG(debug, "Page[" << offset << "]::load" << " used=" << used << ", size=" << size); + assert(region == 0); + region = file.map(offset, size); + assert(region != 0); + bool haveData = used > 0; + used = 4;//first 4 bytes are the count + if (haveData) { + uint32_t count = *(reinterpret_cast<uint32_t*>(region)); + //decode messages into Page::messages + for (size_t i = 0; i < count; ++i) { + Message message; + used += decode(protocols, message, region + used, size - used); + if (!contents.contains(message.getSequence())) { + message.setState(DELETED); + QPID_LOG(debug, "Setting state to deleted for message loaded at " << message.getSequence()); + } else if (acquired.contains(message.getSequence())) { + message.setState(ACQUIRED); + } else { + message.setState(AVAILABLE); + } + messages.push_back(message); + } + if (messages.size()) { + QPID_LOG(debug, "Page[" << offset << "]::load " << messages.size() << " messages loaded from " + << messages.front().getSequence() << " to " << messages.back().getSequence()); + } else { + QPID_LOG(debug, "Page[" << offset << "]::load no messages loaded"); + } + }//else there is nothing we need to explicitly load, just needed to map region +} + +void PagedQueue::Page::unload(MemoryMappedFile& file) +{ + if (messages.size()) { + QPID_LOG(debug, "Page[" << offset << "]::unload " << messages.size() << " messages to unload from " + << messages.front().getSequence() << " to " << messages.back().getSequence()); + } else { + QPID_LOG(debug, "Page[" << offset << "]::unload no messages to unload"); + } + for (std::deque<Message>::iterator i = messages.begin(); i != messages.end(); ++i) { + if (i->getState() == ACQUIRED) acquired.add(i->getSequence()); + } + uint32_t count = messages.size(); + ::memcpy(region, &count, sizeof(count)); + file.flush(region, size); + file.unmap(region, size); + //remove messages from memory + messages.clear(); + region = 0; +} + +void PagedQueue::load(Page& page) +{ + //if needed, release another page + if (loaded == maxLoaded) { + //which page to select? + Used::reverse_iterator i = used.rbegin(); + while (i != used.rend() && !i->second.isLoaded()) { + ++i; + } + assert(i != used.rend()); + unload(i->second); + } + page.load(file, protocols); + ++loaded; + QPID_LOG(debug, "PagedQueue[" << path << "] loaded page, " << loaded << " pages now loaded"); +} + +void PagedQueue::unload(Page& page) +{ + page.unload(file); + --loaded; + QPID_LOG(debug, "PagedQueue[" << path << "] unloaded page, " << loaded << " pages now loaded"); +} + + +PagedQueue::Page& PagedQueue::newPage(qpid::framing::SequenceNumber id) +{ + if (loaded == maxLoaded) { + //need to release a page from memory to make way for a new one + + //choose last one? + Used::reverse_iterator i = used.rbegin(); + while (!i->second.isLoaded() && i != used.rend()) { + ++i; + } + assert(i != used.rend()); + unload(i->second); + } + if (free.empty()) { + //need to extend file and add some pages to the free list + addPages(4/*arbitrary number, should this be config item?*/); + } + std::pair<Used::iterator, bool> result = used.insert(Used::value_type(id, free.front())); + QPID_LOG(debug, "Added page for sequence starting from " << id); + assert(result.second); + free.pop_front(); + load(result.first->second); + return result.first->second; +} + +void PagedQueue::addPages(size_t count) +{ + for (size_t i = 0; i < count; ++i) { + free.push_back(Page(pageSize, offset)); + offset += pageSize; + file.expand(offset); + } + QPID_LOG(debug, "Added " << count << " pages to free list; now have " << used.size() << " used, and " << free.size() << " free"); +} + +PagedQueue::Used::iterator PagedQueue::findPage(const QueueCursor& cursor) +{ + Used::iterator i = used.begin(); + if (cursor.valid) { + i = findPage(cursor.position, true); + } else if (i != used.end() && !i->second.isLoaded()) { + load(i->second); + } + return i; +} + +PagedQueue::Used::iterator PagedQueue::findPage(qpid::framing::SequenceNumber n, bool loadIfRequired) +{ + Used::iterator i = used.end(); + for (Used::iterator j = used.begin(); j != used.end() && j->first <= n; ++j) { + i = j; + } + if (loadIfRequired && i != used.end() && !i->second.isLoaded()) { + load(i->second); + } + return i; +} + +}} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/PagedQueue.h b/cpp/src/qpid/broker/PagedQueue.h new file mode 100644 index 0000000000..7b96a37141 --- /dev/null +++ b/cpp/src/qpid/broker/PagedQueue.h @@ -0,0 +1,98 @@ +#ifndef QPID_BROKER_PAGEDQUEUE_H +#define QPID_BROKER_PAGEDQUEUE_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/broker/Messages.h" +#include "qpid/framing/SequenceSet.h" +#include "qpid/sys/MemoryMappedFile.h" +#include <deque> +#include <list> +#include <map> + +namespace qpid { +namespace broker { +class ProtocolRegistry; +/** + * + */ +class PagedQueue : public Messages { + public: + PagedQueue(const std::string& name, const std::string& directory, uint maxLoaded, uint pageFactor, ProtocolRegistry& protocols); + size_t size(); + bool deleted(const QueueCursor&); + void publish(const Message& added); + Message* next(QueueCursor& cursor); + Message* release(const QueueCursor& cursor); + Message* find(const framing::SequenceNumber&, QueueCursor*); + Message* find(const QueueCursor&); + void foreach(Functor); + private: + class Page { + public: + Page(size_t size, size_t offset); + void read();//decode messages from memory mapped file + void write();//encode messages into memory mapped file + bool isLoaded() const; + bool empty() const; + void deleted(qpid::framing::SequenceNumber); + Message* release(qpid::framing::SequenceNumber); + bool add(const Message&); + Message* next(uint32_t version, QueueCursor&); + Message* find(qpid::framing::SequenceNumber); + void load(qpid::sys::MemoryMappedFile&,ProtocolRegistry&); + void unload(qpid::sys::MemoryMappedFile&); + void clear(qpid::sys::MemoryMappedFile&); + size_t available() const; + private: + size_t size; + size_t offset; + + char* region;//0 implies not mapped + qpid::framing::SequenceSet contents; + qpid::framing::SequenceSet acquired; + std::deque<Message> messages;//decoded representation + size_t used;//amount of data used to encode current set of messages held + }; + + qpid::sys::MemoryMappedFile file; + std::string path; + const size_t pageSize; + const uint maxLoaded; + ProtocolRegistry& protocols; + size_t offset; + typedef std::map<qpid::framing::SequenceNumber, Page> Used; + Used used; + std::list<Page> free; + uint loaded; + uint32_t version; + + void addPages(size_t count); + Page& newPage(qpid::framing::SequenceNumber); + Used::iterator findPage(const QueueCursor& cursor); + Used::iterator findPage(qpid::framing::SequenceNumber n, bool loadIfRequired); + void load(Page&); + void unload(Page&); + bool deleted(qpid::framing::SequenceNumber); +}; +}} // namespace qpid::broker + +#endif /*!QPID_BROKER_PAGEDQUEUE_H*/ diff --git a/cpp/src/qpid/broker/Protocol.cpp b/cpp/src/qpid/broker/Protocol.cpp index e236698142..a98160e502 100644 --- a/cpp/src/qpid/broker/Protocol.cpp +++ b/cpp/src/qpid/broker/Protocol.cpp @@ -20,6 +20,7 @@ */ #include "Protocol.h" #include "qpid/sys/ConnectionCodec.h" +#include "qpid/broker/RecoverableMessageImpl.h" #include "qpid/broker/amqp_0_10/MessageTransfer.h" #include "qpid/log/Statement.h" @@ -47,13 +48,25 @@ boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer> ProtocolReg } boost::shared_ptr<RecoverableMessage> ProtocolRegistry::recover(qpid::framing::Buffer& b) { - boost::shared_ptr<RecoverableMessage> msg; - for (Protocols::const_iterator i = protocols.begin(); !msg && i != protocols.end(); ++i) { - msg = i->second->recover(b); + uint32_t position = b.getPosition(); + for (Protocols::const_iterator i = protocols.begin(); i != protocols.end(); ++i) { + boost::shared_ptr<RecoverableMessage> msg = i->second->recover(b); + if (msg) return msg; + else b.setPosition(position); } + boost::intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> transfer(new qpid::broker::amqp_0_10::MessageTransfer()); + transfer->decodeHeader(b); + boost::shared_ptr<RecoverableMessage> msg(new RecoverableMessageImpl(Message(transfer, transfer))); return msg; } +Message ProtocolRegistry::decode(qpid::framing::Buffer& buffer) +{ + boost::shared_ptr<RecoverableMessage> r = recover(buffer); + r->decodeContent(buffer); + return r->getMessage(); +} + ProtocolRegistry::~ProtocolRegistry() { for (Protocols::const_iterator i = protocols.begin(); i != protocols.end(); ++i) { diff --git a/cpp/src/qpid/broker/Protocol.h b/cpp/src/qpid/broker/Protocol.h index 2f268748fb..1d253e6d0f 100644 --- a/cpp/src/qpid/broker/Protocol.h +++ b/cpp/src/qpid/broker/Protocol.h @@ -68,6 +68,7 @@ class ProtocolRegistry : public Protocol qpid::sys::ConnectionCodec* create(const qpid::framing::ProtocolVersion&, qpid::sys::OutputControl&, const std::string&, const qpid::sys::SecuritySettings&); boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer> translate(const Message&); boost::shared_ptr<RecoverableMessage> recover(qpid::framing::Buffer&); + Message decode(qpid::framing::Buffer&); ~ProtocolRegistry(); void add(const std::string&, Protocol*); diff --git a/cpp/src/qpid/broker/QueueCursor.h b/cpp/src/qpid/broker/QueueCursor.h index 2551b64a48..c7368177e9 100644 --- a/cpp/src/qpid/broker/QueueCursor.h +++ b/cpp/src/qpid/broker/QueueCursor.h @@ -64,6 +64,7 @@ class QueueCursor friend class MessageDeque; friend class MessageMap; friend class PriorityQueue; + friend class PagedQueue; template <typename T> friend class IndexedDeque; }; }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/QueueFactory.cpp b/cpp/src/qpid/broker/QueueFactory.cpp index efeb9ae53b..67499c9985 100644 --- a/cpp/src/qpid/broker/QueueFactory.cpp +++ b/cpp/src/qpid/broker/QueueFactory.cpp @@ -30,17 +30,18 @@ #include "qpid/broker/Fairshare.h" #include "qpid/broker/MessageDeque.h" #include "qpid/broker/MessageMap.h" +#include "qpid/broker/PagedQueue.h" #include "qpid/broker/PriorityQueue.h" #include "qpid/broker/QueueFlowLimit.h" #include "qpid/broker/ThresholdAlerts.h" #include "qpid/broker/FifoDistributor.h" +#include "qpid/log/Statement.h" #include <map> #include <memory> namespace qpid { namespace broker { - QueueFactory::QueueFactory() : broker(0), store(0), parent(0) {} boost::shared_ptr<Queue> QueueFactory::create(const std::string& name, const QueueSettings& settings) @@ -66,6 +67,17 @@ boost::shared_ptr<Queue> QueueFactory::create(const std::string& name, const Que } else { queue->messages = std::auto_ptr<Messages>(new PriorityQueue(settings.priorities)); } + } else if (settings.paging) { + if (!broker) { + QPID_LOG(warning, "Cannot create paged queue without broker context"); + } else if (!qpid::sys::MemoryMappedFile::isSupported()) { + QPID_LOG(warning, "Cannot create paged queue; memory mapped file support not available on this platform"); + } else { + queue->messages = std::auto_ptr<Messages>(new PagedQueue(name, broker->getOptions().dataDir, + settings.maxPages ? settings.maxPages : 4, + settings.pageFactor ? settings.pageFactor : 1, + broker->getProtocolRegistry())); + } } else if (settings.lvqKey.empty()) {//LVQ already handled above queue->messages = std::auto_ptr<Messages>(new MessageDeque()); } diff --git a/cpp/src/qpid/broker/QueueSettings.cpp b/cpp/src/qpid/broker/QueueSettings.cpp index cb2b11621a..d28fa38fde 100644 --- a/cpp/src/qpid/broker/QueueSettings.cpp +++ b/cpp/src/qpid/broker/QueueSettings.cpp @@ -54,6 +54,9 @@ const std::string ALERT_SIZE_DOWN("qpid.alert_size_down"); const std::string PRIORITIES("qpid.priorities"); const std::string FAIRSHARE("qpid.fairshare"); const std::string FAIRSHARE_ALIAS("x-qpid-fairshare"); +const std::string PAGING("qpid.paging"); +const std::string MAX_PAGES("qpid.max_pages_loaded"); +const std::string PAGE_FACTOR("qpid.page_factor"); const std::string LVQ_LEGACY("qpid.last_value_queue"); const std::string LVQ_LEGACY_KEY("qpid.LVQ_key"); @@ -87,6 +90,9 @@ QueueSettings::QueueSettings(bool d, bool a) : shareGroups(false), addTimestamp(false), dropMessagesAtLimit(false), + paging(false), + maxPages(0), + pageFactor(0), noLocal(false), isBrowseOnly(false), autoDeleteDelay(0), @@ -187,6 +193,15 @@ bool QueueSettings::handle(const std::string& key, const qpid::types::Variant& v } else if (key == MAX_FILE_SIZE && value.asUint64() > 0) { maxFileSize = value.asUint64(); return false; // 'handle' here and also pass to store + } else if (key == PAGING) { + paging = value; + return true; + } else if (key == MAX_PAGES) { + maxPages = value; + return true; + } else if (key == PAGE_FACTOR) { + pageFactor = value; + return true; } else { return false; } @@ -218,6 +233,25 @@ void QueueSettings::validate() const throw qpid::framing::InvalidArgumentException(QPID_MSG("Only shared groups are supported at present; " << MessageGroupManager::qpidSharedGroup << " is required if " << MessageGroupManager::qpidMessageGroupKey << " is set")); } + + if (paging) { + if(lvqKey.size()) { + throw qpid::framing::InvalidArgumentException(QPID_MSG("Cannot specify " << LVQ_KEY << " and " << PAGING << " for the same queue")); + } + if(priorities) { + throw qpid::framing::InvalidArgumentException(QPID_MSG("Cannot specify " << PRIORITIES << " and " << PAGING << " for the same queue")); + } + if(groupKey.size()) { + throw qpid::framing::InvalidArgumentException(QPID_MSG("Cannot specify " << MessageGroupManager::qpidMessageGroupKey << " and " << PAGING << " for the same queue")); + } + } else { + if (maxPages) { + throw qpid::framing::InvalidArgumentException(QPID_MSG("Can only specify " << MAX_PAGES << " if " << PAGING << " is set")); + } + if (pageFactor) { + throw qpid::framing::InvalidArgumentException(QPID_MSG("Can only specify " << PAGE_FACTOR << " if " << PAGING << " is set")); + } + } } void QueueSettings::populate(const std::map<std::string, qpid::types::Variant>& inputs, std::map<std::string, qpid::types::Variant>& unused) diff --git a/cpp/src/qpid/broker/QueueSettings.h b/cpp/src/qpid/broker/QueueSettings.h index ec012c578f..3f42b53f23 100644 --- a/cpp/src/qpid/broker/QueueSettings.h +++ b/cpp/src/qpid/broker/QueueSettings.h @@ -59,6 +59,11 @@ struct QueueSettings QueueDepth maxDepth; bool dropMessagesAtLimit;//aka ring queue policy + //PagedQueue: + bool paging; + uint maxPages; + uint pageFactor; + bool noLocal; bool isBrowseOnly; std::string traceId; diff --git a/cpp/src/qpid/broker/RecoverableMessage.h b/cpp/src/qpid/broker/RecoverableMessage.h index aafcd756d5..888dc364e9 100644 --- a/cpp/src/qpid/broker/RecoverableMessage.h +++ b/cpp/src/qpid/broker/RecoverableMessage.h @@ -30,7 +30,7 @@ namespace qpid { namespace broker { class ExpiryPolicy; - +class Message; /** * The interface through which messages are reloaded on recovery. */ @@ -53,6 +53,7 @@ public: * of length as necessary) */ virtual void decodeContent(framing::Buffer& buffer) = 0; + virtual Message getMessage() = 0; virtual ~RecoverableMessage() {}; }; diff --git a/cpp/src/qpid/broker/RecoverableMessageImpl.h b/cpp/src/qpid/broker/RecoverableMessageImpl.h index a46f5a3676..7cef5fd265 100644 --- a/cpp/src/qpid/broker/RecoverableMessageImpl.h +++ b/cpp/src/qpid/broker/RecoverableMessageImpl.h @@ -22,11 +22,11 @@ * */ #include "RecoverableMessage.h" +#include "Message.h" namespace qpid { namespace broker { class DtxBuffer; -class Message; class Queue; class RecoverableMessageImpl : public RecoverableMessage @@ -43,6 +43,7 @@ public: void recover(boost::shared_ptr<Queue> queue); void enqueue(boost::shared_ptr<DtxBuffer> buffer, boost::shared_ptr<Queue> queue); void dequeue(boost::shared_ptr<DtxBuffer> buffer, boost::shared_ptr<Queue> queue); + Message getMessage(); }; }} // namespace qpid::broker diff --git a/cpp/src/qpid/broker/RecoveryManagerImpl.cpp b/cpp/src/qpid/broker/RecoveryManagerImpl.cpp index ab89a46a46..df1c88d183 100644 --- a/cpp/src/qpid/broker/RecoveryManagerImpl.cpp +++ b/cpp/src/qpid/broker/RecoveryManagerImpl.cpp @@ -118,15 +118,8 @@ RecoverableQueue::shared_ptr RecoveryManagerImpl::recoverQueue(framing::Buffer& RecoverableMessage::shared_ptr RecoveryManagerImpl::recoverMessage(framing::Buffer& buffer) { - framing::Buffer sniffer(buffer.getPointer(), buffer.available()); - RecoverableMessage::shared_ptr m = protocols.recover(sniffer); - if (m) { - return m; - } else { - boost::intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> transfer(new qpid::broker::amqp_0_10::MessageTransfer()); - transfer->decodeHeader(buffer); - return RecoverableMessage::shared_ptr(new RecoverableMessageImpl(Message(transfer, transfer))); - } + RecoverableMessage::shared_ptr m = protocols.recover(buffer); + return m; } RecoverableTransaction::shared_ptr RecoveryManagerImpl::recoverTransaction(const std::string& xid, @@ -191,6 +184,11 @@ void RecoverableMessageImpl::computeExpiration(const boost::intrusive_ptr<Expiry msg.computeExpiration(ep); } +Message RecoverableMessageImpl::getMessage() +{ + return msg; +} + void RecoverableQueueImpl::recover(RecoverableMessage::shared_ptr msg) { dynamic_pointer_cast<RecoverableMessageImpl>(msg)->recover(queue); diff --git a/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp b/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp index b9c509c015..a389127b9e 100644 --- a/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp +++ b/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp @@ -228,11 +228,16 @@ void MessageTransfer::decodeHeader(framing::Buffer& buffer) } void MessageTransfer::decodeContent(framing::Buffer& buffer) { - if (buffer.available()) { + decodeContent(buffer, buffer.available()); +} + +void MessageTransfer::decodeContent(framing::Buffer& buffer, size_t size) +{ + if (size) { //get the data as a string and set that as the content //body on a frame then add that frame to the frameset AMQFrame frame((AMQContentBody())); - frame.castBody<AMQContentBody>()->decode(buffer, buffer.available()); + frame.castBody<AMQContentBody>()->decode(buffer, size); frame.setFirstSegment(false); frames.append(frame); } else { @@ -373,25 +378,4 @@ boost::intrusive_ptr<PersistableMessage> MessageTransfer::merge(const std::map<s } return clone; } -} -// qpid::broker namespace, TODO: move these elsewhere! -void encode(const Message& in, std::string& out) -{ - const amqp_0_10::MessageTransfer& transfer = amqp_0_10::MessageTransfer::get(in); - uint32_t size = transfer.encodedSize(); - std::vector<char> data(size); - qpid::framing::Buffer buffer(&(data[0]), size); - transfer.encode(buffer); - buffer.reset(); - buffer.getRawData(out, size); -} -void decode(const std::string& in, Message& out) -{ - boost::intrusive_ptr<amqp_0_10::MessageTransfer> transfer(new amqp_0_10::MessageTransfer); - qpid::framing::Buffer buffer(const_cast<char*>(in.data()), in.size()); - transfer->decodeHeader(buffer); - transfer->decodeContent(buffer); - out = Message(transfer, transfer); -} - -}} // namespace qpid::broker::amqp_0_10 +}}} // namespace qpid::broker::amqp_0_10 diff --git a/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h b/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h index 9e432235e6..8ac9862445 100644 --- a/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h +++ b/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h @@ -93,6 +93,7 @@ class MessageTransfer : public qpid::broker::Message::Encoding, public qpid::bro void decodeHeader(framing::Buffer& buffer); void decodeContent(framing::Buffer& buffer); + void decodeContent(framing::Buffer& buffer, size_t size); void encode(framing::Buffer& buffer) const; uint32_t encodedSize() const; diff --git a/cpp/src/qpid/sys/MemoryMappedFile.h b/cpp/src/qpid/sys/MemoryMappedFile.h new file mode 100644 index 0000000000..3dabada06b --- /dev/null +++ b/cpp/src/qpid/sys/MemoryMappedFile.h @@ -0,0 +1,74 @@ +#ifndef QPID_SYS_MEMORYMAPPEDFILE_H +#define QPID_SYS_MEMORYMAPPEDFILE_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <memory> +#include <string> + +namespace qpid { +namespace sys { + +class MemoryMappedFilePrivate; +/** + * Abstraction of memory mapping functionality + */ +class MemoryMappedFile { + public: + MemoryMappedFile(); + /** + * Opens a file that can be mapped by region into memory + */ + std::string open(const std::string& name, const std::string& directory); + /** + * Returns the page size + */ + size_t getPageSize(); + /** + * Load a portion of the file into memory + */ + char* map(size_t offset, size_t size); + /** + * Evict a portion of the file from memory + */ + void unmap(char* region, size_t size); + /** + * Flush any changes to a previously mapped region of the file + * back to disk + */ + void flush(char* region, size_t size); + /** + * Expand the capacity of the file + */ + void expand(size_t offset); + /** + * Returns true if memory mapping is supported, false otherwise + */ + static bool isSupported(); + private: + std::auto_ptr<MemoryMappedFilePrivate> state; + + MemoryMappedFile(const MemoryMappedFile&); + MemoryMappedFile& operator=(const MemoryMappedFile&); +}; +}} // namespace qpid::sys + +#endif /*!QPID_SYS_MEMORYMAPPEDFILE_H*/ diff --git a/cpp/src/qpid/sys/posix/MemoryMappedFile.cpp b/cpp/src/qpid/sys/posix/MemoryMappedFile.cpp new file mode 100644 index 0000000000..eae23cc6a9 --- /dev/null +++ b/cpp/src/qpid/sys/posix/MemoryMappedFile.cpp @@ -0,0 +1,105 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/sys/MemoryMappedFile.h" +#include "qpid/Exception.h" +#include "qpid/Msg.h" +#include <sys/mman.h> +#include <unistd.h> +#include <fcntl.h> + +namespace qpid { +namespace sys { +namespace { +const std::string PATH_SEPARATOR("/"); +const std::string ESCAPE("%"); +const std::string VALID("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789_-."); +std::string getFileName(const std::string& name, const std::string& dir) +{ + std::stringstream filename; + if (dir.size()) filename << dir << PATH_SEPARATOR; + size_t start = 0; + while (true) { + size_t i = name.find_first_not_of(VALID, start); + if (i == std::string::npos) { + filename << name.substr(start); + return filename.str(); + } else { + if (i > start) filename << name.substr(start, i-start); + filename << ESCAPE << (int) name.at(i); + start = i+1; + } + } + +} +} + +class MemoryMappedFilePrivate +{ + friend class MemoryMappedFile; + const int fd; + MemoryMappedFilePrivate(int fd_) : fd(fd_) {} +}; +MemoryMappedFile::MemoryMappedFile() {} + +std::string MemoryMappedFile::open(const std::string& name, const std::string& directory) +{ + std::string path = getFileName(name, directory); + + int flags = O_CREAT | O_TRUNC | O_RDWR; + int fd = ::open(path.c_str(), flags, S_IRWXU); + if (fd == -1) throw qpid::Exception(QPID_MSG("Failed to open memory mapped file " << path << ": " << qpid::sys::strError(errno) << " [flags=" << flags << "]")); + state = std::auto_ptr<MemoryMappedFilePrivate>(new MemoryMappedFilePrivate(fd)); + return path; +} +size_t MemoryMappedFile::getPageSize() +{ + return ::sysconf(_SC_PAGE_SIZE); +} +char* MemoryMappedFile::map(size_t offset, size_t size) +{ + int protection = PROT_READ | PROT_WRITE; + char* region = (char*) ::mmap(0, size, protection, MAP_SHARED, state->fd, offset); + if (region == MAP_FAILED) { + throw qpid::Exception(QPID_MSG("Failed to map page into memory: " << qpid::sys::strError(errno))); + } + return region; + +} +void MemoryMappedFile::unmap(char* region, size_t size) +{ + ::munmap(region, size); +} +void MemoryMappedFile::flush(char* region, size_t size) +{ + ::msync(region, size, MS_ASYNC); +} +void MemoryMappedFile::expand(size_t offset) +{ + if ((::lseek(state->fd, offset - 1, SEEK_SET) == -1) || (::write(state->fd, "", 1) == -1)) { + throw qpid::Exception(QPID_MSG("Failed to expand paged queue file: " << qpid::sys::strError(errno))); + } +} +bool MemoryMappedFile::isSupported() +{ + return true; +} + +}} // namespace qpid::sys diff --git a/cpp/src/qpid/sys/windows/MemoryMappedFile.cpp b/cpp/src/qpid/sys/windows/MemoryMappedFile.cpp new file mode 100644 index 0000000000..c43a50db74 --- /dev/null +++ b/cpp/src/qpid/sys/windows/MemoryMappedFile.cpp @@ -0,0 +1,55 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/sys/MemoryMappedFile.h" + +namespace qpid { +namespace sys { +class MemoryMappedFilePrivate {}; + +MemoryMappedFile::MemoryMappedFile() {} + +std::string MemoryMappedFile::open(const std::string& /*name*/, const std::string& /*directory*/) +{ + return std::string(); +} +size_t MemoryMappedFile::getPageSize() +{ + return 0; +} +char* MemoryMappedFile::map(size_t /*offset*/, size_t /*size*/) +{ + return 0; +} +void MemoryMappedFile::unmap(char* /*region*/, size_t /*size*/) +{ +} +void MemoryMappedFile::flush(char* /*region*/, size_t /*size*/) +{ +} +void MemoryMappedFile::expand(size_t /*offset*/) +{ +} +bool MemoryMappedFile::isSupported() +{ + return false; +} + +}} // namespace qpid::sys diff --git a/cpp/src/tests/CMakeLists.txt b/cpp/src/tests/CMakeLists.txt index cb10414970..dd29743126 100644 --- a/cpp/src/tests/CMakeLists.txt +++ b/cpp/src/tests/CMakeLists.txt @@ -319,6 +319,10 @@ add_test (qpid-client-test ${test_wrap} ${qpid-client-test_LOCATION}) add_test (quick_perftest ${test_wrap} ${qpid-perftest_LOCATION} --summary --count 100) add_test (quick_topictest ${test_wrap} ${CMAKE_CURRENT_SOURCE_DIR}/quick_topictest${test_script_suffix}) add_test (quick_txtest ${test_wrap} ${qpid-txtest_LOCATION} --queues 4 --tx-count 10 --quiet) +if (NOT CMAKE_SYSTEM_NAME STREQUAL Windows) + # paged queue not yet implemented for windows + add_test (paged_queue_tests ${shell} ${CMAKE_CURRENT_SOURCE_DIR}/run_paged_queue_tests${test_script_suffix}) +endif (NOT CMAKE_SYSTEM_NAME STREQUAL Windows) if (PYTHON_EXECUTABLE) add_test (run_header_test ${shell} ${CMAKE_CURRENT_SOURCE_DIR}/run_header_test${test_script_suffix}) add_test (python_tests ${test_wrap} ${CMAKE_CURRENT_SOURCE_DIR}/python_tests${test_script_suffix}) diff --git a/cpp/src/tests/Makefile.am b/cpp/src/tests/Makefile.am index 3943e21b7f..43a538d6d0 100644 --- a/cpp/src/tests/Makefile.am +++ b/cpp/src/tests/Makefile.am @@ -325,6 +325,7 @@ EXTRA_DIST += \ run_perftest \ ring_queue_test \ run_ring_queue_test \ + run_paged_queue_tests \ dynamic_log_level_test \ dynamic_log_hires_timestamp \ qpid-ctrl \ @@ -362,7 +363,7 @@ CLEANFILES+=valgrind.out *.log *.vglog* dummy_test qpidd.port $(unit_wrappers) # Not run under valgrind, too slow LONG_TESTS+=start_broker \ - fanout_perftest shared_perftest multiq_perftest topic_perftest run_ring_queue_test \ + fanout_perftest shared_perftest multiq_perftest topic_perftest run_ring_queue_test run_paged_queue_tests\ run_msg_group_tests_soak \ stop_broker \ run_long_federation_sys_tests diff --git a/cpp/src/tests/MessageTest.cpp b/cpp/src/tests/MessageTest.cpp index fe670a274e..666a2c6297 100644 --- a/cpp/src/tests/MessageTest.cpp +++ b/cpp/src/tests/MessageTest.cpp @@ -19,6 +19,7 @@ * */ #include "qpid/broker/Message.h" +#include "qpid/broker/Protocol.h" #include "qpid/framing/AMQP_HighestVersion.h" #include "qpid/framing/AMQFrame.h" #include "qpid/framing/MessageTransferBody.h" @@ -56,10 +57,12 @@ QPID_AUTO_TEST_CASE(testEncodeDecode) properties["abc"] = "xyz"; Message msg = MessageUtils::createMessage(properties, data); - std::string buffer; - encode(msg, buffer); - msg = Message(); - decode(buffer, msg); + std::vector<char> bytes(msg.getPersistentContext()->encodedSize()); + qpid::framing::Buffer buffer(&bytes[0], bytes.size()); + msg.getPersistentContext()->encode(buffer); + buffer.reset(); + ProtocolRegistry registry; + msg = registry.decode(buffer); BOOST_CHECK_EQUAL(routingKey, msg.getRoutingKey()); BOOST_CHECK_EQUAL((uint64_t) data.size(), msg.getContentSize()); diff --git a/cpp/src/tests/run_paged_queue_tests b/cpp/src/tests/run_paged_queue_tests new file mode 100755 index 0000000000..8a72c23d86 --- /dev/null +++ b/cpp/src/tests/run_paged_queue_tests @@ -0,0 +1,32 @@ +#!/bin/bash + +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +#setup path to find qpid-config and sender/receiver test progs +source ./test_env.sh + +export PATH=$PWD:$srcdir:$PYTHON_COMMANDS:$PATH + +#set port to connect to via env var +test -s qpidd.port && QPID_PORT=`cat qpidd.port` +export QPID_PORT + +qpid-cpp-benchmark --broker "localhost:$QPID_PORT" --create-option "node:{x-declare:{arguments:{'qpid.paging':True,'qpid.max_size':0,'qpid.max_count':0,'qpid.flow_stop_size':0,'qpid.flow_resume_size':0,'qpid.flow_stop_count':0,'qpid.flow_resume_count':0}}}" +qpid-cpp-benchmark --broker "localhost:$QPID_PORT" --create-option "node:{x-declare:{arguments:{'qpid.paging':True,'qpid.max_size':0,'qpid.max_count':0,'qpid.flow_stop_size':0,'qpid.flow_resume_size':0,'qpid.flow_stop_count':0,'qpid.flow_resume_count':0}}}" --fill-drain
\ No newline at end of file |