summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cpp/src/CMakeLists.txt3
-rw-r--r--cpp/src/Makefile.am5
-rw-r--r--cpp/src/qpid/broker/Message.h3
-rw-r--r--cpp/src/qpid/broker/PagedQueue.cpp425
-rw-r--r--cpp/src/qpid/broker/PagedQueue.h98
-rw-r--r--cpp/src/qpid/broker/Protocol.cpp19
-rw-r--r--cpp/src/qpid/broker/Protocol.h1
-rw-r--r--cpp/src/qpid/broker/QueueCursor.h1
-rw-r--r--cpp/src/qpid/broker/QueueFactory.cpp14
-rw-r--r--cpp/src/qpid/broker/QueueSettings.cpp34
-rw-r--r--cpp/src/qpid/broker/QueueSettings.h5
-rw-r--r--cpp/src/qpid/broker/RecoverableMessage.h3
-rw-r--r--cpp/src/qpid/broker/RecoverableMessageImpl.h3
-rw-r--r--cpp/src/qpid/broker/RecoveryManagerImpl.cpp16
-rw-r--r--cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp32
-rw-r--r--cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h1
-rw-r--r--cpp/src/qpid/sys/MemoryMappedFile.h74
-rw-r--r--cpp/src/qpid/sys/posix/MemoryMappedFile.cpp105
-rw-r--r--cpp/src/qpid/sys/windows/MemoryMappedFile.cpp55
-rw-r--r--cpp/src/tests/CMakeLists.txt4
-rw-r--r--cpp/src/tests/Makefile.am3
-rw-r--r--cpp/src/tests/MessageTest.cpp11
-rwxr-xr-xcpp/src/tests/run_paged_queue_tests32
23 files changed, 900 insertions, 47 deletions
diff --git a/cpp/src/CMakeLists.txt b/cpp/src/CMakeLists.txt
index f3d7f5eed0..1fe2198ea6 100644
--- a/cpp/src/CMakeLists.txt
+++ b/cpp/src/CMakeLists.txt
@@ -748,6 +748,7 @@ if (CMAKE_SYSTEM_NAME STREQUAL Windows)
qpid/sys/windows/IocpPoller.cpp
qpid/sys/windows/IOHandle.cpp
qpid/sys/windows/LockFile.cpp
+ qpid/sys/windows/MemoryMappedFile.cpp
qpid/sys/windows/PipeHandle.cpp
qpid/sys/windows/PollableCondition.cpp
qpid/sys/windows/Shlib.cpp
@@ -849,6 +850,7 @@ else (CMAKE_SYSTEM_NAME STREQUAL Windows)
qpid/sys/posix/FileSysDir.cpp
qpid/sys/posix/IOHandle.cpp
qpid/sys/posix/LockFile.cpp
+ qpid/sys/posix/MemoryMappedFile.cpp
qpid/sys/posix/Mutex.cpp
qpid/sys/posix/PipeHandle.cpp
qpid/sys/posix/PollableCondition.cpp
@@ -1196,6 +1198,7 @@ set (qpidbroker_SOURCES
qpid/broker/MessageStoreModule.cpp
qpid/broker/NameGenerator.cpp
qpid/broker/NullMessageStore.cpp
+ qpid/broker/PagedQueue.cpp
qpid/broker/QueueBindings.cpp
qpid/broker/QueuedMessage.cpp
qpid/broker/QueueCursor.cpp
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am
index cad41bca86..a99a57d32d 100644
--- a/cpp/src/Makefile.am
+++ b/cpp/src/Makefile.am
@@ -39,6 +39,7 @@ windows_dist = \
qpid/sys/windows/IOHandle.cpp \
qpid/sys/windows/IoHandlePrivate.h \
qpid/sys/windows/LockFile.cpp \
+ qpid/sys/windows/MemoryMappedFile.cpp \
qpid/sys/windows/mingw32_compat.h \
qpid/sys/windows/PollableCondition.cpp \
qpid/sys/windows/PipeHandle.cpp \
@@ -167,6 +168,7 @@ libqpidcommon_la_SOURCES += \
qpid/sys/posix/AsynchIO.cpp \
qpid/sys/posix/FileSysDir.cpp \
qpid/sys/posix/LockFile.cpp \
+ qpid/sys/posix/MemoryMappedFile.cpp \
qpid/sys/posix/Time.cpp \
qpid/sys/posix/Thread.cpp \
qpid/sys/posix/Shlib.cpp \
@@ -492,6 +494,7 @@ libqpidcommon_la_SOURCES += \
qpid/sys/Fork.h \
qpid/sys/LockFile.h \
qpid/sys/LockPtr.h \
+ qpid/sys/MemoryMappedFile.h \
qpid/sys/MemStat.h \
qpid/sys/OutputControl.h \
qpid/sys/OutputTask.h \
@@ -654,6 +657,8 @@ libqpidbroker_la_SOURCES = \
qpid/broker/MessageStore.h \
qpid/broker/MessageStoreModule.cpp \
qpid/broker/MessageStoreModule.h \
+ qpid/broker/PagedQueue.h \
+ qpid/broker/PagedQueue.cpp \
qpid/broker/PriorityQueue.h \
qpid/broker/PriorityQueue.cpp \
qpid/broker/Protocol.h \
diff --git a/cpp/src/qpid/broker/Message.h b/cpp/src/qpid/broker/Message.h
index b999a7d5a3..64ef5bd771 100644
--- a/cpp/src/qpid/broker/Message.h
+++ b/cpp/src/qpid/broker/Message.h
@@ -145,9 +145,6 @@ public:
void annotationsChanged();
};
-QPID_BROKER_EXTERN void encode(const Message&, std::string&);
-QPID_BROKER_EXTERN void decode(const std::string&, Message&);
-
}}
diff --git a/cpp/src/qpid/broker/PagedQueue.cpp b/cpp/src/qpid/broker/PagedQueue.cpp
new file mode 100644
index 0000000000..9bce28061a
--- /dev/null
+++ b/cpp/src/qpid/broker/PagedQueue.cpp
@@ -0,0 +1,425 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/broker/PagedQueue.h"
+#include "qpid/broker/Protocol.h"
+#include "qpid/broker/QueueCursor.h"
+#include "qpid/broker/Message.h"
+#include "qpid/log/Statement.h"
+#include <string.h>
+
+namespace qpid {
+namespace broker {
+namespace {
+using qpid::sys::MemoryMappedFile;
+
+size_t encodedSize(const Message& msg)
+{
+ return msg.getPersistentContext()->encodedSize() + 4/*content-size*/ + 4/*sequence-number*/;
+}
+
+size_t encode(const Message& msg, char* data, size_t size)
+{
+ uint32_t encoded = msg.getPersistentContext()->encodedSize();
+ uint32_t required = encoded + 4/*content-size*/ + 4/*sequence-number*/;
+ if (required > size) return 0;
+ qpid::framing::Buffer buffer(data, required);
+ buffer.putLong(encoded);
+ buffer.putLong(msg.getSequence());
+ msg.getPersistentContext()->encode(buffer);
+ assert(buffer.getPosition() == required);
+ return required;
+}
+
+size_t decode(ProtocolRegistry& protocols, Message& msg, const char* data, size_t size)
+{
+ qpid::framing::Buffer metadata(const_cast<char*>(data), size);
+ uint32_t encoded = metadata.getLong();
+ uint32_t sequence = metadata.getLong();
+ assert(metadata.available() >= encoded);
+ qpid::framing::Buffer buffer(const_cast<char*>(data) + metadata.getPosition(), encoded);
+ msg = protocols.decode(buffer);
+ assert(buffer.getPosition() == encoded);
+ msg.setSequence(qpid::framing::SequenceNumber(sequence));
+ return encoded + metadata.getPosition();
+}
+
+}
+
+PagedQueue::PagedQueue(const std::string& name, const std::string& directory, uint m, uint factor, ProtocolRegistry& p)
+ : pageSize(file.getPageSize()*factor), maxLoaded(m), protocols(p), offset(0)
+{
+ path = file.open(name, directory);
+ QPID_LOG(debug, "PagedQueue[" << path << "]");
+}
+
+size_t PagedQueue::size()
+{
+ size_t total(0);
+ for (Used::const_iterator i = used.begin(); i != used.end(); ++i) {
+ total += i->second.available();
+ }
+ return total;
+}
+
+bool PagedQueue::deleted(const QueueCursor& cursor)
+{
+ if (cursor.valid) {
+ Used::iterator page = findPage(cursor.position, false);
+ if (page == used.end()) {
+ return false;
+ }
+ page->second.deleted(cursor.position);
+ if (page->second.empty()) {
+ //move page to free list
+ --loaded;
+ page->second.clear(file);
+ free.push_back(page->second);
+ used.erase(page);
+ }
+ return true;
+ } else {
+ return false;
+ }
+}
+
+void PagedQueue::publish(const Message& added)
+{
+ if (encodedSize(added) > pageSize) {
+ throw qpid::Exception(QPID_MSG("Message is larger than page size for queue backed by " << path));
+ }
+ Used::reverse_iterator i = used.rbegin();
+ if (i != used.rend()) {
+ if (!i->second.isLoaded()) load(i->second);
+ if (i->second.add(added)) return;
+ }
+ //used is empty or last page is full, need to add a new page
+ if (!newPage(added.getSequence()).add(added)) {
+ throw qpid::Exception(QPID_MSG("Could not add message to paged queue backed by " << path));
+ }
+}
+
+Message* PagedQueue::next(QueueCursor& cursor)
+{
+ Used::iterator i = used.begin();
+ if (cursor.valid) {
+ qpid::framing::SequenceNumber position(cursor.position);
+ ++position;
+ i = findPage(position, true);
+ if (i == used.end() && !used.empty() && used.begin()->first > position) i = used.begin();
+ }
+ while (i != used.end()) {
+ Message* m = i->second.next(version, cursor);
+ QPID_LOG(debug, "PagedQueue::next(" << cursor.valid << ":" << cursor.position << "): " << m);
+ if (m) return m;
+ ++i;
+ }
+ QPID_LOG(debug, "PagedQueue::next(" << cursor.valid << ":" << cursor.position << ") returning 0 ");
+ return 0;
+}
+
+Message* PagedQueue::release(const QueueCursor& cursor)
+{
+ if (cursor.valid) {
+ Used::iterator i = findPage(cursor.position, true);
+ if (i == used.end()) return 0;
+ return i->second.release(cursor.position);
+ } else {
+ return 0;
+ }
+}
+
+Message* PagedQueue::find(const framing::SequenceNumber& position, QueueCursor* cursor)
+{
+ Used::iterator i = findPage(position, true);
+ if (i != used.end()) {
+ Message* m = i->second.find(position);
+ if (cursor) {
+ cursor->setPosition(version, m ? m->getSequence() : position);
+ }
+ return m;
+ } else {
+ return 0;
+ }
+}
+
+void PagedQueue::foreach(Functor)
+{
+ //TODO:
+}
+
+Message* PagedQueue::find(const QueueCursor& cursor)
+{
+ if (cursor.valid) return find(cursor.position, 0);
+ else return 0;
+}
+
+PagedQueue::Page::Page(size_t s, size_t o) : size(s), offset(o), region(0), used(0)
+{
+ QPID_LOG(debug, "Created Page[" << offset << "], size=" << size);
+}
+
+void PagedQueue::Page::deleted(qpid::framing::SequenceNumber s)
+{
+ if (isLoaded()) {
+ Message* message = find(s);
+ assert(message);//could this ever legitimately be 0?
+ message->setState(DELETED);
+ }
+ contents.remove(s);
+ acquired.remove(s);
+}
+
+Message* PagedQueue::Page::release(qpid::framing::SequenceNumber s)
+{
+ Message* m = find(s);
+ if (m) {
+ m->setState(AVAILABLE);
+ }
+ acquired.remove(s);
+ return m;
+}
+
+bool PagedQueue::Page::add(const Message& message)
+{
+ assert(region);
+ assert (size >= used);
+ size_t encoded = encode(message, region + used, size - used);
+ QPID_LOG(debug, "Calling Page[" << offset << "]::add() used=" << used << ", size=" << size << ", encoded=" << encoded << ")");
+ if (encoded) {
+ used += encoded;
+ messages.push_back(message);
+ messages.back().setState(AVAILABLE);
+ contents.add(message.getSequence());
+ return true;
+ } else {
+ return false;
+ }
+}
+
+bool PagedQueue::Page::empty() const
+{
+ return contents.empty();
+}
+
+bool PagedQueue::Page::isLoaded() const
+{
+ return region;
+}
+
+Message* PagedQueue::Page::next(uint32_t version, QueueCursor& cursor)
+{
+ if (messages.empty()) return 0;
+
+ qpid::framing::SequenceNumber position;
+ if (cursor.valid) {
+ position = cursor.position + 1;
+ if (position < messages.front().getSequence()) {
+ position = messages.front().getSequence();
+ cursor.setPosition(position, version);
+ }
+ } else {
+ position = messages.front().getSequence();
+ cursor.setPosition(position, version);
+ }
+
+ Message* m;
+ do {
+ m = find(position);
+ if (m) cursor.setPosition(position, version);
+ ++position;
+ } while (m != 0 && !cursor.check(*m));
+ return m;
+ //if it is the first in the page, increment the hint count of the page
+ //if it is the last in the page, decrement the hint count of the page
+}
+
+/**
+ * Called before adding to the free list
+ */
+void PagedQueue::Page::clear(MemoryMappedFile& file)
+{
+ if (region) file.unmap(region, size);
+ region = 0;
+ used = 0;
+ contents.clear();
+ messages.clear();
+}
+
+size_t PagedQueue::Page::available() const
+{
+ return contents.size() - acquired.size();
+}
+
+Message* PagedQueue::Page::find(qpid::framing::SequenceNumber position)
+{
+ if (messages.size()) {
+ assert(position >= messages.front().getSequence());
+
+ size_t index = position - messages.front().getSequence();
+ if (index < messages.size()) return &(messages[index]);
+ else return 0;
+ } else {
+ //page is empty, is this an error?
+ QPID_LOG(warning, "Could not find message at " << position << "; empty page.");
+ return 0;
+ }
+
+ //if it is the first in the page, increment the hint count of the page
+ //if it is the last in the page, decrement the hint count of the page
+}
+
+void PagedQueue::Page::load(MemoryMappedFile& file, ProtocolRegistry& protocols)
+{
+ QPID_LOG(debug, "Page[" << offset << "]::load" << " used=" << used << ", size=" << size);
+ assert(region == 0);
+ region = file.map(offset, size);
+ assert(region != 0);
+ bool haveData = used > 0;
+ used = 4;//first 4 bytes are the count
+ if (haveData) {
+ uint32_t count = *(reinterpret_cast<uint32_t*>(region));
+ //decode messages into Page::messages
+ for (size_t i = 0; i < count; ++i) {
+ Message message;
+ used += decode(protocols, message, region + used, size - used);
+ if (!contents.contains(message.getSequence())) {
+ message.setState(DELETED);
+ QPID_LOG(debug, "Setting state to deleted for message loaded at " << message.getSequence());
+ } else if (acquired.contains(message.getSequence())) {
+ message.setState(ACQUIRED);
+ } else {
+ message.setState(AVAILABLE);
+ }
+ messages.push_back(message);
+ }
+ if (messages.size()) {
+ QPID_LOG(debug, "Page[" << offset << "]::load " << messages.size() << " messages loaded from "
+ << messages.front().getSequence() << " to " << messages.back().getSequence());
+ } else {
+ QPID_LOG(debug, "Page[" << offset << "]::load no messages loaded");
+ }
+ }//else there is nothing we need to explicitly load, just needed to map region
+}
+
+void PagedQueue::Page::unload(MemoryMappedFile& file)
+{
+ if (messages.size()) {
+ QPID_LOG(debug, "Page[" << offset << "]::unload " << messages.size() << " messages to unload from "
+ << messages.front().getSequence() << " to " << messages.back().getSequence());
+ } else {
+ QPID_LOG(debug, "Page[" << offset << "]::unload no messages to unload");
+ }
+ for (std::deque<Message>::iterator i = messages.begin(); i != messages.end(); ++i) {
+ if (i->getState() == ACQUIRED) acquired.add(i->getSequence());
+ }
+ uint32_t count = messages.size();
+ ::memcpy(region, &count, sizeof(count));
+ file.flush(region, size);
+ file.unmap(region, size);
+ //remove messages from memory
+ messages.clear();
+ region = 0;
+}
+
+void PagedQueue::load(Page& page)
+{
+ //if needed, release another page
+ if (loaded == maxLoaded) {
+ //which page to select?
+ Used::reverse_iterator i = used.rbegin();
+ while (i != used.rend() && !i->second.isLoaded()) {
+ ++i;
+ }
+ assert(i != used.rend());
+ unload(i->second);
+ }
+ page.load(file, protocols);
+ ++loaded;
+ QPID_LOG(debug, "PagedQueue[" << path << "] loaded page, " << loaded << " pages now loaded");
+}
+
+void PagedQueue::unload(Page& page)
+{
+ page.unload(file);
+ --loaded;
+ QPID_LOG(debug, "PagedQueue[" << path << "] unloaded page, " << loaded << " pages now loaded");
+}
+
+
+PagedQueue::Page& PagedQueue::newPage(qpid::framing::SequenceNumber id)
+{
+ if (loaded == maxLoaded) {
+ //need to release a page from memory to make way for a new one
+
+ //choose last one?
+ Used::reverse_iterator i = used.rbegin();
+ while (!i->second.isLoaded() && i != used.rend()) {
+ ++i;
+ }
+ assert(i != used.rend());
+ unload(i->second);
+ }
+ if (free.empty()) {
+ //need to extend file and add some pages to the free list
+ addPages(4/*arbitrary number, should this be config item?*/);
+ }
+ std::pair<Used::iterator, bool> result = used.insert(Used::value_type(id, free.front()));
+ QPID_LOG(debug, "Added page for sequence starting from " << id);
+ assert(result.second);
+ free.pop_front();
+ load(result.first->second);
+ return result.first->second;
+}
+
+void PagedQueue::addPages(size_t count)
+{
+ for (size_t i = 0; i < count; ++i) {
+ free.push_back(Page(pageSize, offset));
+ offset += pageSize;
+ file.expand(offset);
+ }
+ QPID_LOG(debug, "Added " << count << " pages to free list; now have " << used.size() << " used, and " << free.size() << " free");
+}
+
+PagedQueue::Used::iterator PagedQueue::findPage(const QueueCursor& cursor)
+{
+ Used::iterator i = used.begin();
+ if (cursor.valid) {
+ i = findPage(cursor.position, true);
+ } else if (i != used.end() && !i->second.isLoaded()) {
+ load(i->second);
+ }
+ return i;
+}
+
+PagedQueue::Used::iterator PagedQueue::findPage(qpid::framing::SequenceNumber n, bool loadIfRequired)
+{
+ Used::iterator i = used.end();
+ for (Used::iterator j = used.begin(); j != used.end() && j->first <= n; ++j) {
+ i = j;
+ }
+ if (loadIfRequired && i != used.end() && !i->second.isLoaded()) {
+ load(i->second);
+ }
+ return i;
+}
+
+}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/PagedQueue.h b/cpp/src/qpid/broker/PagedQueue.h
new file mode 100644
index 0000000000..7b96a37141
--- /dev/null
+++ b/cpp/src/qpid/broker/PagedQueue.h
@@ -0,0 +1,98 @@
+#ifndef QPID_BROKER_PAGEDQUEUE_H
+#define QPID_BROKER_PAGEDQUEUE_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/broker/Messages.h"
+#include "qpid/framing/SequenceSet.h"
+#include "qpid/sys/MemoryMappedFile.h"
+#include <deque>
+#include <list>
+#include <map>
+
+namespace qpid {
+namespace broker {
+class ProtocolRegistry;
+/**
+ *
+ */
+class PagedQueue : public Messages {
+ public:
+ PagedQueue(const std::string& name, const std::string& directory, uint maxLoaded, uint pageFactor, ProtocolRegistry& protocols);
+ size_t size();
+ bool deleted(const QueueCursor&);
+ void publish(const Message& added);
+ Message* next(QueueCursor& cursor);
+ Message* release(const QueueCursor& cursor);
+ Message* find(const framing::SequenceNumber&, QueueCursor*);
+ Message* find(const QueueCursor&);
+ void foreach(Functor);
+ private:
+ class Page {
+ public:
+ Page(size_t size, size_t offset);
+ void read();//decode messages from memory mapped file
+ void write();//encode messages into memory mapped file
+ bool isLoaded() const;
+ bool empty() const;
+ void deleted(qpid::framing::SequenceNumber);
+ Message* release(qpid::framing::SequenceNumber);
+ bool add(const Message&);
+ Message* next(uint32_t version, QueueCursor&);
+ Message* find(qpid::framing::SequenceNumber);
+ void load(qpid::sys::MemoryMappedFile&,ProtocolRegistry&);
+ void unload(qpid::sys::MemoryMappedFile&);
+ void clear(qpid::sys::MemoryMappedFile&);
+ size_t available() const;
+ private:
+ size_t size;
+ size_t offset;
+
+ char* region;//0 implies not mapped
+ qpid::framing::SequenceSet contents;
+ qpid::framing::SequenceSet acquired;
+ std::deque<Message> messages;//decoded representation
+ size_t used;//amount of data used to encode current set of messages held
+ };
+
+ qpid::sys::MemoryMappedFile file;
+ std::string path;
+ const size_t pageSize;
+ const uint maxLoaded;
+ ProtocolRegistry& protocols;
+ size_t offset;
+ typedef std::map<qpid::framing::SequenceNumber, Page> Used;
+ Used used;
+ std::list<Page> free;
+ uint loaded;
+ uint32_t version;
+
+ void addPages(size_t count);
+ Page& newPage(qpid::framing::SequenceNumber);
+ Used::iterator findPage(const QueueCursor& cursor);
+ Used::iterator findPage(qpid::framing::SequenceNumber n, bool loadIfRequired);
+ void load(Page&);
+ void unload(Page&);
+ bool deleted(qpid::framing::SequenceNumber);
+};
+}} // namespace qpid::broker
+
+#endif /*!QPID_BROKER_PAGEDQUEUE_H*/
diff --git a/cpp/src/qpid/broker/Protocol.cpp b/cpp/src/qpid/broker/Protocol.cpp
index e236698142..a98160e502 100644
--- a/cpp/src/qpid/broker/Protocol.cpp
+++ b/cpp/src/qpid/broker/Protocol.cpp
@@ -20,6 +20,7 @@
*/
#include "Protocol.h"
#include "qpid/sys/ConnectionCodec.h"
+#include "qpid/broker/RecoverableMessageImpl.h"
#include "qpid/broker/amqp_0_10/MessageTransfer.h"
#include "qpid/log/Statement.h"
@@ -47,13 +48,25 @@ boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer> ProtocolReg
}
boost::shared_ptr<RecoverableMessage> ProtocolRegistry::recover(qpid::framing::Buffer& b)
{
- boost::shared_ptr<RecoverableMessage> msg;
- for (Protocols::const_iterator i = protocols.begin(); !msg && i != protocols.end(); ++i) {
- msg = i->second->recover(b);
+ uint32_t position = b.getPosition();
+ for (Protocols::const_iterator i = protocols.begin(); i != protocols.end(); ++i) {
+ boost::shared_ptr<RecoverableMessage> msg = i->second->recover(b);
+ if (msg) return msg;
+ else b.setPosition(position);
}
+ boost::intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> transfer(new qpid::broker::amqp_0_10::MessageTransfer());
+ transfer->decodeHeader(b);
+ boost::shared_ptr<RecoverableMessage> msg(new RecoverableMessageImpl(Message(transfer, transfer)));
return msg;
}
+Message ProtocolRegistry::decode(qpid::framing::Buffer& buffer)
+{
+ boost::shared_ptr<RecoverableMessage> r = recover(buffer);
+ r->decodeContent(buffer);
+ return r->getMessage();
+}
+
ProtocolRegistry::~ProtocolRegistry()
{
for (Protocols::const_iterator i = protocols.begin(); i != protocols.end(); ++i) {
diff --git a/cpp/src/qpid/broker/Protocol.h b/cpp/src/qpid/broker/Protocol.h
index 2f268748fb..1d253e6d0f 100644
--- a/cpp/src/qpid/broker/Protocol.h
+++ b/cpp/src/qpid/broker/Protocol.h
@@ -68,6 +68,7 @@ class ProtocolRegistry : public Protocol
qpid::sys::ConnectionCodec* create(const qpid::framing::ProtocolVersion&, qpid::sys::OutputControl&, const std::string&, const qpid::sys::SecuritySettings&);
boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer> translate(const Message&);
boost::shared_ptr<RecoverableMessage> recover(qpid::framing::Buffer&);
+ Message decode(qpid::framing::Buffer&);
~ProtocolRegistry();
void add(const std::string&, Protocol*);
diff --git a/cpp/src/qpid/broker/QueueCursor.h b/cpp/src/qpid/broker/QueueCursor.h
index 2551b64a48..c7368177e9 100644
--- a/cpp/src/qpid/broker/QueueCursor.h
+++ b/cpp/src/qpid/broker/QueueCursor.h
@@ -64,6 +64,7 @@ class QueueCursor
friend class MessageDeque;
friend class MessageMap;
friend class PriorityQueue;
+ friend class PagedQueue;
template <typename T> friend class IndexedDeque;
};
}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/QueueFactory.cpp b/cpp/src/qpid/broker/QueueFactory.cpp
index efeb9ae53b..67499c9985 100644
--- a/cpp/src/qpid/broker/QueueFactory.cpp
+++ b/cpp/src/qpid/broker/QueueFactory.cpp
@@ -30,17 +30,18 @@
#include "qpid/broker/Fairshare.h"
#include "qpid/broker/MessageDeque.h"
#include "qpid/broker/MessageMap.h"
+#include "qpid/broker/PagedQueue.h"
#include "qpid/broker/PriorityQueue.h"
#include "qpid/broker/QueueFlowLimit.h"
#include "qpid/broker/ThresholdAlerts.h"
#include "qpid/broker/FifoDistributor.h"
+#include "qpid/log/Statement.h"
#include <map>
#include <memory>
namespace qpid {
namespace broker {
-
QueueFactory::QueueFactory() : broker(0), store(0), parent(0) {}
boost::shared_ptr<Queue> QueueFactory::create(const std::string& name, const QueueSettings& settings)
@@ -66,6 +67,17 @@ boost::shared_ptr<Queue> QueueFactory::create(const std::string& name, const Que
} else {
queue->messages = std::auto_ptr<Messages>(new PriorityQueue(settings.priorities));
}
+ } else if (settings.paging) {
+ if (!broker) {
+ QPID_LOG(warning, "Cannot create paged queue without broker context");
+ } else if (!qpid::sys::MemoryMappedFile::isSupported()) {
+ QPID_LOG(warning, "Cannot create paged queue; memory mapped file support not available on this platform");
+ } else {
+ queue->messages = std::auto_ptr<Messages>(new PagedQueue(name, broker->getOptions().dataDir,
+ settings.maxPages ? settings.maxPages : 4,
+ settings.pageFactor ? settings.pageFactor : 1,
+ broker->getProtocolRegistry()));
+ }
} else if (settings.lvqKey.empty()) {//LVQ already handled above
queue->messages = std::auto_ptr<Messages>(new MessageDeque());
}
diff --git a/cpp/src/qpid/broker/QueueSettings.cpp b/cpp/src/qpid/broker/QueueSettings.cpp
index cb2b11621a..d28fa38fde 100644
--- a/cpp/src/qpid/broker/QueueSettings.cpp
+++ b/cpp/src/qpid/broker/QueueSettings.cpp
@@ -54,6 +54,9 @@ const std::string ALERT_SIZE_DOWN("qpid.alert_size_down");
const std::string PRIORITIES("qpid.priorities");
const std::string FAIRSHARE("qpid.fairshare");
const std::string FAIRSHARE_ALIAS("x-qpid-fairshare");
+const std::string PAGING("qpid.paging");
+const std::string MAX_PAGES("qpid.max_pages_loaded");
+const std::string PAGE_FACTOR("qpid.page_factor");
const std::string LVQ_LEGACY("qpid.last_value_queue");
const std::string LVQ_LEGACY_KEY("qpid.LVQ_key");
@@ -87,6 +90,9 @@ QueueSettings::QueueSettings(bool d, bool a) :
shareGroups(false),
addTimestamp(false),
dropMessagesAtLimit(false),
+ paging(false),
+ maxPages(0),
+ pageFactor(0),
noLocal(false),
isBrowseOnly(false),
autoDeleteDelay(0),
@@ -187,6 +193,15 @@ bool QueueSettings::handle(const std::string& key, const qpid::types::Variant& v
} else if (key == MAX_FILE_SIZE && value.asUint64() > 0) {
maxFileSize = value.asUint64();
return false; // 'handle' here and also pass to store
+ } else if (key == PAGING) {
+ paging = value;
+ return true;
+ } else if (key == MAX_PAGES) {
+ maxPages = value;
+ return true;
+ } else if (key == PAGE_FACTOR) {
+ pageFactor = value;
+ return true;
} else {
return false;
}
@@ -218,6 +233,25 @@ void QueueSettings::validate() const
throw qpid::framing::InvalidArgumentException(QPID_MSG("Only shared groups are supported at present; " << MessageGroupManager::qpidSharedGroup
<< " is required if " << MessageGroupManager::qpidMessageGroupKey << " is set"));
}
+
+ if (paging) {
+ if(lvqKey.size()) {
+ throw qpid::framing::InvalidArgumentException(QPID_MSG("Cannot specify " << LVQ_KEY << " and " << PAGING << " for the same queue"));
+ }
+ if(priorities) {
+ throw qpid::framing::InvalidArgumentException(QPID_MSG("Cannot specify " << PRIORITIES << " and " << PAGING << " for the same queue"));
+ }
+ if(groupKey.size()) {
+ throw qpid::framing::InvalidArgumentException(QPID_MSG("Cannot specify " << MessageGroupManager::qpidMessageGroupKey << " and " << PAGING << " for the same queue"));
+ }
+ } else {
+ if (maxPages) {
+ throw qpid::framing::InvalidArgumentException(QPID_MSG("Can only specify " << MAX_PAGES << " if " << PAGING << " is set"));
+ }
+ if (pageFactor) {
+ throw qpid::framing::InvalidArgumentException(QPID_MSG("Can only specify " << PAGE_FACTOR << " if " << PAGING << " is set"));
+ }
+ }
}
void QueueSettings::populate(const std::map<std::string, qpid::types::Variant>& inputs, std::map<std::string, qpid::types::Variant>& unused)
diff --git a/cpp/src/qpid/broker/QueueSettings.h b/cpp/src/qpid/broker/QueueSettings.h
index ec012c578f..3f42b53f23 100644
--- a/cpp/src/qpid/broker/QueueSettings.h
+++ b/cpp/src/qpid/broker/QueueSettings.h
@@ -59,6 +59,11 @@ struct QueueSettings
QueueDepth maxDepth;
bool dropMessagesAtLimit;//aka ring queue policy
+ //PagedQueue:
+ bool paging;
+ uint maxPages;
+ uint pageFactor;
+
bool noLocal;
bool isBrowseOnly;
std::string traceId;
diff --git a/cpp/src/qpid/broker/RecoverableMessage.h b/cpp/src/qpid/broker/RecoverableMessage.h
index aafcd756d5..888dc364e9 100644
--- a/cpp/src/qpid/broker/RecoverableMessage.h
+++ b/cpp/src/qpid/broker/RecoverableMessage.h
@@ -30,7 +30,7 @@
namespace qpid {
namespace broker {
class ExpiryPolicy;
-
+class Message;
/**
* The interface through which messages are reloaded on recovery.
*/
@@ -53,6 +53,7 @@ public:
* of length as necessary)
*/
virtual void decodeContent(framing::Buffer& buffer) = 0;
+ virtual Message getMessage() = 0;
virtual ~RecoverableMessage() {};
};
diff --git a/cpp/src/qpid/broker/RecoverableMessageImpl.h b/cpp/src/qpid/broker/RecoverableMessageImpl.h
index a46f5a3676..7cef5fd265 100644
--- a/cpp/src/qpid/broker/RecoverableMessageImpl.h
+++ b/cpp/src/qpid/broker/RecoverableMessageImpl.h
@@ -22,11 +22,11 @@
*
*/
#include "RecoverableMessage.h"
+#include "Message.h"
namespace qpid {
namespace broker {
class DtxBuffer;
-class Message;
class Queue;
class RecoverableMessageImpl : public RecoverableMessage
@@ -43,6 +43,7 @@ public:
void recover(boost::shared_ptr<Queue> queue);
void enqueue(boost::shared_ptr<DtxBuffer> buffer, boost::shared_ptr<Queue> queue);
void dequeue(boost::shared_ptr<DtxBuffer> buffer, boost::shared_ptr<Queue> queue);
+ Message getMessage();
};
}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/RecoveryManagerImpl.cpp b/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
index ab89a46a46..df1c88d183 100644
--- a/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
+++ b/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
@@ -118,15 +118,8 @@ RecoverableQueue::shared_ptr RecoveryManagerImpl::recoverQueue(framing::Buffer&
RecoverableMessage::shared_ptr RecoveryManagerImpl::recoverMessage(framing::Buffer& buffer)
{
- framing::Buffer sniffer(buffer.getPointer(), buffer.available());
- RecoverableMessage::shared_ptr m = protocols.recover(sniffer);
- if (m) {
- return m;
- } else {
- boost::intrusive_ptr<qpid::broker::amqp_0_10::MessageTransfer> transfer(new qpid::broker::amqp_0_10::MessageTransfer());
- transfer->decodeHeader(buffer);
- return RecoverableMessage::shared_ptr(new RecoverableMessageImpl(Message(transfer, transfer)));
- }
+ RecoverableMessage::shared_ptr m = protocols.recover(buffer);
+ return m;
}
RecoverableTransaction::shared_ptr RecoveryManagerImpl::recoverTransaction(const std::string& xid,
@@ -191,6 +184,11 @@ void RecoverableMessageImpl::computeExpiration(const boost::intrusive_ptr<Expiry
msg.computeExpiration(ep);
}
+Message RecoverableMessageImpl::getMessage()
+{
+ return msg;
+}
+
void RecoverableQueueImpl::recover(RecoverableMessage::shared_ptr msg)
{
dynamic_pointer_cast<RecoverableMessageImpl>(msg)->recover(queue);
diff --git a/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp b/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp
index b9c509c015..a389127b9e 100644
--- a/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp
+++ b/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp
@@ -228,11 +228,16 @@ void MessageTransfer::decodeHeader(framing::Buffer& buffer)
}
void MessageTransfer::decodeContent(framing::Buffer& buffer)
{
- if (buffer.available()) {
+ decodeContent(buffer, buffer.available());
+}
+
+void MessageTransfer::decodeContent(framing::Buffer& buffer, size_t size)
+{
+ if (size) {
//get the data as a string and set that as the content
//body on a frame then add that frame to the frameset
AMQFrame frame((AMQContentBody()));
- frame.castBody<AMQContentBody>()->decode(buffer, buffer.available());
+ frame.castBody<AMQContentBody>()->decode(buffer, size);
frame.setFirstSegment(false);
frames.append(frame);
} else {
@@ -373,25 +378,4 @@ boost::intrusive_ptr<PersistableMessage> MessageTransfer::merge(const std::map<s
}
return clone;
}
-}
-// qpid::broker namespace, TODO: move these elsewhere!
-void encode(const Message& in, std::string& out)
-{
- const amqp_0_10::MessageTransfer& transfer = amqp_0_10::MessageTransfer::get(in);
- uint32_t size = transfer.encodedSize();
- std::vector<char> data(size);
- qpid::framing::Buffer buffer(&(data[0]), size);
- transfer.encode(buffer);
- buffer.reset();
- buffer.getRawData(out, size);
-}
-void decode(const std::string& in, Message& out)
-{
- boost::intrusive_ptr<amqp_0_10::MessageTransfer> transfer(new amqp_0_10::MessageTransfer);
- qpid::framing::Buffer buffer(const_cast<char*>(in.data()), in.size());
- transfer->decodeHeader(buffer);
- transfer->decodeContent(buffer);
- out = Message(transfer, transfer);
-}
-
-}} // namespace qpid::broker::amqp_0_10
+}}} // namespace qpid::broker::amqp_0_10
diff --git a/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h b/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h
index 9e432235e6..8ac9862445 100644
--- a/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h
+++ b/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h
@@ -93,6 +93,7 @@ class MessageTransfer : public qpid::broker::Message::Encoding, public qpid::bro
void decodeHeader(framing::Buffer& buffer);
void decodeContent(framing::Buffer& buffer);
+ void decodeContent(framing::Buffer& buffer, size_t size);
void encode(framing::Buffer& buffer) const;
uint32_t encodedSize() const;
diff --git a/cpp/src/qpid/sys/MemoryMappedFile.h b/cpp/src/qpid/sys/MemoryMappedFile.h
new file mode 100644
index 0000000000..3dabada06b
--- /dev/null
+++ b/cpp/src/qpid/sys/MemoryMappedFile.h
@@ -0,0 +1,74 @@
+#ifndef QPID_SYS_MEMORYMAPPEDFILE_H
+#define QPID_SYS_MEMORYMAPPEDFILE_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include <memory>
+#include <string>
+
+namespace qpid {
+namespace sys {
+
+class MemoryMappedFilePrivate;
+/**
+ * Abstraction of memory mapping functionality
+ */
+class MemoryMappedFile {
+ public:
+ MemoryMappedFile();
+ /**
+ * Opens a file that can be mapped by region into memory
+ */
+ std::string open(const std::string& name, const std::string& directory);
+ /**
+ * Returns the page size
+ */
+ size_t getPageSize();
+ /**
+ * Load a portion of the file into memory
+ */
+ char* map(size_t offset, size_t size);
+ /**
+ * Evict a portion of the file from memory
+ */
+ void unmap(char* region, size_t size);
+ /**
+ * Flush any changes to a previously mapped region of the file
+ * back to disk
+ */
+ void flush(char* region, size_t size);
+ /**
+ * Expand the capacity of the file
+ */
+ void expand(size_t offset);
+ /**
+ * Returns true if memory mapping is supported, false otherwise
+ */
+ static bool isSupported();
+ private:
+ std::auto_ptr<MemoryMappedFilePrivate> state;
+
+ MemoryMappedFile(const MemoryMappedFile&);
+ MemoryMappedFile& operator=(const MemoryMappedFile&);
+};
+}} // namespace qpid::sys
+
+#endif /*!QPID_SYS_MEMORYMAPPEDFILE_H*/
diff --git a/cpp/src/qpid/sys/posix/MemoryMappedFile.cpp b/cpp/src/qpid/sys/posix/MemoryMappedFile.cpp
new file mode 100644
index 0000000000..eae23cc6a9
--- /dev/null
+++ b/cpp/src/qpid/sys/posix/MemoryMappedFile.cpp
@@ -0,0 +1,105 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/sys/MemoryMappedFile.h"
+#include "qpid/Exception.h"
+#include "qpid/Msg.h"
+#include <sys/mman.h>
+#include <unistd.h>
+#include <fcntl.h>
+
+namespace qpid {
+namespace sys {
+namespace {
+const std::string PATH_SEPARATOR("/");
+const std::string ESCAPE("%");
+const std::string VALID("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789_-.");
+std::string getFileName(const std::string& name, const std::string& dir)
+{
+ std::stringstream filename;
+ if (dir.size()) filename << dir << PATH_SEPARATOR;
+ size_t start = 0;
+ while (true) {
+ size_t i = name.find_first_not_of(VALID, start);
+ if (i == std::string::npos) {
+ filename << name.substr(start);
+ return filename.str();
+ } else {
+ if (i > start) filename << name.substr(start, i-start);
+ filename << ESCAPE << (int) name.at(i);
+ start = i+1;
+ }
+ }
+
+}
+}
+
+class MemoryMappedFilePrivate
+{
+ friend class MemoryMappedFile;
+ const int fd;
+ MemoryMappedFilePrivate(int fd_) : fd(fd_) {}
+};
+MemoryMappedFile::MemoryMappedFile() {}
+
+std::string MemoryMappedFile::open(const std::string& name, const std::string& directory)
+{
+ std::string path = getFileName(name, directory);
+
+ int flags = O_CREAT | O_TRUNC | O_RDWR;
+ int fd = ::open(path.c_str(), flags, S_IRWXU);
+ if (fd == -1) throw qpid::Exception(QPID_MSG("Failed to open memory mapped file " << path << ": " << qpid::sys::strError(errno) << " [flags=" << flags << "]"));
+ state = std::auto_ptr<MemoryMappedFilePrivate>(new MemoryMappedFilePrivate(fd));
+ return path;
+}
+size_t MemoryMappedFile::getPageSize()
+{
+ return ::sysconf(_SC_PAGE_SIZE);
+}
+char* MemoryMappedFile::map(size_t offset, size_t size)
+{
+ int protection = PROT_READ | PROT_WRITE;
+ char* region = (char*) ::mmap(0, size, protection, MAP_SHARED, state->fd, offset);
+ if (region == MAP_FAILED) {
+ throw qpid::Exception(QPID_MSG("Failed to map page into memory: " << qpid::sys::strError(errno)));
+ }
+ return region;
+
+}
+void MemoryMappedFile::unmap(char* region, size_t size)
+{
+ ::munmap(region, size);
+}
+void MemoryMappedFile::flush(char* region, size_t size)
+{
+ ::msync(region, size, MS_ASYNC);
+}
+void MemoryMappedFile::expand(size_t offset)
+{
+ if ((::lseek(state->fd, offset - 1, SEEK_SET) == -1) || (::write(state->fd, "", 1) == -1)) {
+ throw qpid::Exception(QPID_MSG("Failed to expand paged queue file: " << qpid::sys::strError(errno)));
+ }
+}
+bool MemoryMappedFile::isSupported()
+{
+ return true;
+}
+
+}} // namespace qpid::sys
diff --git a/cpp/src/qpid/sys/windows/MemoryMappedFile.cpp b/cpp/src/qpid/sys/windows/MemoryMappedFile.cpp
new file mode 100644
index 0000000000..c43a50db74
--- /dev/null
+++ b/cpp/src/qpid/sys/windows/MemoryMappedFile.cpp
@@ -0,0 +1,55 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/sys/MemoryMappedFile.h"
+
+namespace qpid {
+namespace sys {
+class MemoryMappedFilePrivate {};
+
+MemoryMappedFile::MemoryMappedFile() {}
+
+std::string MemoryMappedFile::open(const std::string& /*name*/, const std::string& /*directory*/)
+{
+ return std::string();
+}
+size_t MemoryMappedFile::getPageSize()
+{
+ return 0;
+}
+char* MemoryMappedFile::map(size_t /*offset*/, size_t /*size*/)
+{
+ return 0;
+}
+void MemoryMappedFile::unmap(char* /*region*/, size_t /*size*/)
+{
+}
+void MemoryMappedFile::flush(char* /*region*/, size_t /*size*/)
+{
+}
+void MemoryMappedFile::expand(size_t /*offset*/)
+{
+}
+bool MemoryMappedFile::isSupported()
+{
+ return false;
+}
+
+}} // namespace qpid::sys
diff --git a/cpp/src/tests/CMakeLists.txt b/cpp/src/tests/CMakeLists.txt
index cb10414970..dd29743126 100644
--- a/cpp/src/tests/CMakeLists.txt
+++ b/cpp/src/tests/CMakeLists.txt
@@ -319,6 +319,10 @@ add_test (qpid-client-test ${test_wrap} ${qpid-client-test_LOCATION})
add_test (quick_perftest ${test_wrap} ${qpid-perftest_LOCATION} --summary --count 100)
add_test (quick_topictest ${test_wrap} ${CMAKE_CURRENT_SOURCE_DIR}/quick_topictest${test_script_suffix})
add_test (quick_txtest ${test_wrap} ${qpid-txtest_LOCATION} --queues 4 --tx-count 10 --quiet)
+if (NOT CMAKE_SYSTEM_NAME STREQUAL Windows)
+ # paged queue not yet implemented for windows
+ add_test (paged_queue_tests ${shell} ${CMAKE_CURRENT_SOURCE_DIR}/run_paged_queue_tests${test_script_suffix})
+endif (NOT CMAKE_SYSTEM_NAME STREQUAL Windows)
if (PYTHON_EXECUTABLE)
add_test (run_header_test ${shell} ${CMAKE_CURRENT_SOURCE_DIR}/run_header_test${test_script_suffix})
add_test (python_tests ${test_wrap} ${CMAKE_CURRENT_SOURCE_DIR}/python_tests${test_script_suffix})
diff --git a/cpp/src/tests/Makefile.am b/cpp/src/tests/Makefile.am
index 3943e21b7f..43a538d6d0 100644
--- a/cpp/src/tests/Makefile.am
+++ b/cpp/src/tests/Makefile.am
@@ -325,6 +325,7 @@ EXTRA_DIST += \
run_perftest \
ring_queue_test \
run_ring_queue_test \
+ run_paged_queue_tests \
dynamic_log_level_test \
dynamic_log_hires_timestamp \
qpid-ctrl \
@@ -362,7 +363,7 @@ CLEANFILES+=valgrind.out *.log *.vglog* dummy_test qpidd.port $(unit_wrappers)
# Not run under valgrind, too slow
LONG_TESTS+=start_broker \
- fanout_perftest shared_perftest multiq_perftest topic_perftest run_ring_queue_test \
+ fanout_perftest shared_perftest multiq_perftest topic_perftest run_ring_queue_test run_paged_queue_tests\
run_msg_group_tests_soak \
stop_broker \
run_long_federation_sys_tests
diff --git a/cpp/src/tests/MessageTest.cpp b/cpp/src/tests/MessageTest.cpp
index fe670a274e..666a2c6297 100644
--- a/cpp/src/tests/MessageTest.cpp
+++ b/cpp/src/tests/MessageTest.cpp
@@ -19,6 +19,7 @@
*
*/
#include "qpid/broker/Message.h"
+#include "qpid/broker/Protocol.h"
#include "qpid/framing/AMQP_HighestVersion.h"
#include "qpid/framing/AMQFrame.h"
#include "qpid/framing/MessageTransferBody.h"
@@ -56,10 +57,12 @@ QPID_AUTO_TEST_CASE(testEncodeDecode)
properties["abc"] = "xyz";
Message msg = MessageUtils::createMessage(properties, data);
- std::string buffer;
- encode(msg, buffer);
- msg = Message();
- decode(buffer, msg);
+ std::vector<char> bytes(msg.getPersistentContext()->encodedSize());
+ qpid::framing::Buffer buffer(&bytes[0], bytes.size());
+ msg.getPersistentContext()->encode(buffer);
+ buffer.reset();
+ ProtocolRegistry registry;
+ msg = registry.decode(buffer);
BOOST_CHECK_EQUAL(routingKey, msg.getRoutingKey());
BOOST_CHECK_EQUAL((uint64_t) data.size(), msg.getContentSize());
diff --git a/cpp/src/tests/run_paged_queue_tests b/cpp/src/tests/run_paged_queue_tests
new file mode 100755
index 0000000000..8a72c23d86
--- /dev/null
+++ b/cpp/src/tests/run_paged_queue_tests
@@ -0,0 +1,32 @@
+#!/bin/bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+#setup path to find qpid-config and sender/receiver test progs
+source ./test_env.sh
+
+export PATH=$PWD:$srcdir:$PYTHON_COMMANDS:$PATH
+
+#set port to connect to via env var
+test -s qpidd.port && QPID_PORT=`cat qpidd.port`
+export QPID_PORT
+
+qpid-cpp-benchmark --broker "localhost:$QPID_PORT" --create-option "node:{x-declare:{arguments:{'qpid.paging':True,'qpid.max_size':0,'qpid.max_count':0,'qpid.flow_stop_size':0,'qpid.flow_resume_size':0,'qpid.flow_stop_count':0,'qpid.flow_resume_count':0}}}"
+qpid-cpp-benchmark --broker "localhost:$QPID_PORT" --create-option "node:{x-declare:{arguments:{'qpid.paging':True,'qpid.max_size':0,'qpid.max_count':0,'qpid.flow_stop_size':0,'qpid.flow_resume_size':0,'qpid.flow_stop_count':0,'qpid.flow_resume_count':0}}}" --fill-drain \ No newline at end of file