diff options
Diffstat (limited to 'cpp/src/qpid/broker')
| -rw-r--r-- | cpp/src/qpid/broker/Message.cpp | 6 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/MessageBuilder.cpp | 3 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/MessageStore.h | 17 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/MessageStoreModule.cpp | 12 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/MessageStoreModule.h | 15 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/NullMessageStore.cpp | 16 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/NullMessageStore.h | 18 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/Queue.cpp | 8 |
8 files changed, 57 insertions, 38 deletions
diff --git a/cpp/src/qpid/broker/Message.cpp b/cpp/src/qpid/broker/Message.cpp index 6aa0f4c30b..0fa8380a32 100644 --- a/cpp/src/qpid/broker/Message.cpp +++ b/cpp/src/qpid/broker/Message.cpp @@ -149,7 +149,8 @@ void Message::releaseContent(MessageStore* _store) store = _store; } if (!getPersistenceId()) { - store->stage(*this); + intrusive_ptr<PersistableMessage> pmsg(this); + store->stage(pmsg); } //remove any content frames from the frameset frames.remove(TypeFilter<CONTENT_BODY>()); @@ -162,13 +163,14 @@ void Message::sendContent(Queue& queue, framing::FrameHandler& out, uint16_t max //load content from store in chunks of maxContentSize uint16_t maxContentSize = maxFrameSize - AMQFrame::frameOverhead(); uint64_t expectedSize(frames.getHeaders()->getContentLength()); + intrusive_ptr<const PersistableMessage> pmsg(this); for (uint64_t offset = 0; offset < expectedSize; offset += maxContentSize) { uint64_t remaining = expectedSize - offset; AMQFrame frame(in_place<AMQContentBody>()); string& data = frame.castBody<AMQContentBody>()->getData(); - store->loadContent(queue, *this, data, offset, + store->loadContent(queue, pmsg, data, offset, remaining > maxContentSize ? maxContentSize : remaining); frame.setBof(false); frame.setEof(true); diff --git a/cpp/src/qpid/broker/MessageBuilder.cpp b/cpp/src/qpid/broker/MessageBuilder.cpp index a56c65333c..376a321d2d 100644 --- a/cpp/src/qpid/broker/MessageBuilder.cpp +++ b/cpp/src/qpid/broker/MessageBuilder.cpp @@ -49,7 +49,8 @@ void MessageBuilder::handle(AMQFrame& frame) throw CommandInvalidException(QPID_MSG("Invalid frame sequence for message (state=" << state << ")")); } if (staging) { - store->appendContent(*message, frame.castBody<AMQContentBody>()->getData()); + intrusive_ptr<const PersistableMessage> cpmsg = boost::static_pointer_cast<const PersistableMessage>(message); + store->appendContent(cpmsg, frame.castBody<AMQContentBody>()->getData()); } else { message->getFrames().append(frame); //have we reached the staging limit? if so stage message and release content diff --git a/cpp/src/qpid/broker/MessageStore.h b/cpp/src/qpid/broker/MessageStore.h index 04dbb22376..432fe30bb3 100644 --- a/cpp/src/qpid/broker/MessageStore.h +++ b/cpp/src/qpid/broker/MessageStore.h @@ -21,6 +21,7 @@ #ifndef _MessageStore_ #define _MessageStore_ +#include <boost/shared_ptr.hpp> #include "PersistableExchange.h" #include "PersistableMessage.h" #include "PersistableQueue.h" @@ -94,7 +95,7 @@ public: * for that queue and avoid searching based on id. Set queue = 0 for * large message staging when the queue is not known. */ - virtual void stage( PersistableMessage& msg) = 0; + virtual void stage(intrusive_ptr<PersistableMessage>& msg) = 0; /** * Destroys a previously staged message. This only needs @@ -102,12 +103,13 @@ public: * enqueued, deletion will be automatic when the message * is dequeued from all queues it was enqueued onto). */ - virtual void destroy(PersistableMessage& msg) = 0; + virtual void destroy(intrusive_ptr<PersistableMessage>& msg) = 0; /** * Appends content to a previously staged message */ - virtual void appendContent(const PersistableMessage& msg, const std::string& data) = 0; + virtual void appendContent(intrusive_ptr<const PersistableMessage>& msg, + const std::string& data) = 0; /** * Loads (a section) of content data for the specified @@ -118,7 +120,8 @@ public: * meta-data). */ virtual void loadContent(const qpid::broker::PersistableQueue& queue, - const PersistableMessage& msg, std::string& data, uint64_t offset, uint32_t length) = 0; + intrusive_ptr<const PersistableMessage>& msg, + std::string& data, uint64_t offset, uint32_t length) = 0; /** * Enqueues a message, storing the message if it has not @@ -134,7 +137,8 @@ public: * distributed transaction in which the operation takes * place or null for 'local' transactions */ - virtual void enqueue(TransactionContext* ctxt, PersistableMessage& msg, const PersistableQueue& queue) = 0; + virtual void enqueue(TransactionContext* ctxt, intrusive_ptr<PersistableMessage>& msg, + const PersistableQueue& queue) = 0; /** * Dequeues a message, recording that the given message is @@ -150,7 +154,8 @@ public: * distributed transaction in which the operation takes * place or null for 'local' transactions */ - virtual void dequeue(TransactionContext* ctxt, PersistableMessage& msg, const PersistableQueue& queue) = 0; + virtual void dequeue(TransactionContext* ctxt, intrusive_ptr<PersistableMessage>& msg, + const PersistableQueue& queue) = 0; /** * Flushes all async messages to disk for the specified queue diff --git a/cpp/src/qpid/broker/MessageStoreModule.cpp b/cpp/src/qpid/broker/MessageStoreModule.cpp index 797ac1f617..a1979e2f43 100644 --- a/cpp/src/qpid/broker/MessageStoreModule.cpp +++ b/cpp/src/qpid/broker/MessageStoreModule.cpp @@ -73,33 +73,33 @@ void MessageStoreModule::recover(RecoveryManager& registry) TRANSFER_EXCEPTION(store->recover(registry)); } -void MessageStoreModule::stage( PersistableMessage& msg) +void MessageStoreModule::stage( intrusive_ptr<PersistableMessage>& msg) { TRANSFER_EXCEPTION(store->stage(msg)); } -void MessageStoreModule::destroy(PersistableMessage& msg) +void MessageStoreModule::destroy(intrusive_ptr<PersistableMessage>& msg) { TRANSFER_EXCEPTION(store->destroy(msg)); } -void MessageStoreModule::appendContent(const PersistableMessage& msg, const std::string& data) +void MessageStoreModule::appendContent(intrusive_ptr<const PersistableMessage>& msg, const std::string& data) { TRANSFER_EXCEPTION(store->appendContent(msg, data)); } void MessageStoreModule::loadContent(const qpid::broker::PersistableQueue& queue, - const PersistableMessage& msg, string& data, uint64_t offset, uint32_t length) + intrusive_ptr<const PersistableMessage>& msg, string& data, uint64_t offset, uint32_t length) { TRANSFER_EXCEPTION(store->loadContent(queue, msg, data, offset, length)); } -void MessageStoreModule::enqueue(TransactionContext* ctxt, PersistableMessage& msg, const PersistableQueue& queue) +void MessageStoreModule::enqueue(TransactionContext* ctxt, intrusive_ptr<PersistableMessage>& msg, const PersistableQueue& queue) { TRANSFER_EXCEPTION(store->enqueue(ctxt, msg, queue)); } -void MessageStoreModule::dequeue(TransactionContext* ctxt, PersistableMessage& msg, const PersistableQueue& queue) +void MessageStoreModule::dequeue(TransactionContext* ctxt, intrusive_ptr<PersistableMessage>& msg, const PersistableQueue& queue) { TRANSFER_EXCEPTION(store->dequeue(ctxt, msg, queue)); } diff --git a/cpp/src/qpid/broker/MessageStoreModule.h b/cpp/src/qpid/broker/MessageStoreModule.h index 6738f0e539..e7404487b0 100644 --- a/cpp/src/qpid/broker/MessageStoreModule.h +++ b/cpp/src/qpid/broker/MessageStoreModule.h @@ -55,14 +55,17 @@ public: void unbind(const PersistableExchange& exchange, const PersistableQueue& queue, const std::string& key, const framing::FieldTable& args); void recover(RecoveryManager& queues); - void stage(PersistableMessage& msg); - void destroy(PersistableMessage& msg); - void appendContent(const PersistableMessage& msg, const std::string& data); + void stage(intrusive_ptr<PersistableMessage>& msg); + void destroy(intrusive_ptr<PersistableMessage>& msg); + void appendContent(intrusive_ptr<const PersistableMessage>& msg, const std::string& data); void loadContent(const qpid::broker::PersistableQueue& queue, - const PersistableMessage& msg, std::string& data, uint64_t offset, uint32_t length); + intrusive_ptr<const PersistableMessage>& msg, std::string& data, + uint64_t offset, uint32_t length); - void enqueue(TransactionContext* ctxt, PersistableMessage& msg, const PersistableQueue& queue); - void dequeue(TransactionContext* ctxt, PersistableMessage& msg, const PersistableQueue& queue); + void enqueue(TransactionContext* ctxt, intrusive_ptr<PersistableMessage>& msg, + const PersistableQueue& queue); + void dequeue(TransactionContext* ctxt, intrusive_ptr<PersistableMessage>& msg, + const PersistableQueue& queue); u_int32_t outstandingQueueAIO(const PersistableQueue& queue); void flush(const qpid::broker::PersistableQueue& queue); diff --git a/cpp/src/qpid/broker/NullMessageStore.cpp b/cpp/src/qpid/broker/NullMessageStore.cpp index eb20ab6936..c0dbd9a315 100644 --- a/cpp/src/qpid/broker/NullMessageStore.cpp +++ b/cpp/src/qpid/broker/NullMessageStore.cpp @@ -79,34 +79,34 @@ void NullMessageStore::recover(RecoveryManager&) QPID_LOG(info, "Persistence not enabled, no recovery attempted."); } -void NullMessageStore::stage(PersistableMessage&) +void NullMessageStore::stage(intrusive_ptr<PersistableMessage>&) { QPID_LOG(info, "Can't stage message. Persistence not enabled."); } -void NullMessageStore::destroy(PersistableMessage&) +void NullMessageStore::destroy(intrusive_ptr<PersistableMessage>&) { } -void NullMessageStore::appendContent(const PersistableMessage&, const string&) +void NullMessageStore::appendContent(intrusive_ptr<const PersistableMessage>&, const string&) { QPID_LOG(info, "Can't append content. Persistence not enabled."); } -void NullMessageStore::loadContent(const qpid::broker::PersistableQueue&, const PersistableMessage&, string&, uint64_t, uint32_t) +void NullMessageStore::loadContent(const qpid::broker::PersistableQueue&, intrusive_ptr<const PersistableMessage>&, string&, uint64_t, uint32_t) { QPID_LOG(info, "Can't load content. Persistence not enabled."); } -void NullMessageStore::enqueue(TransactionContext*, PersistableMessage& msg, const PersistableQueue& queue) +void NullMessageStore::enqueue(TransactionContext*, intrusive_ptr<PersistableMessage>& msg, const PersistableQueue& queue) { - msg.enqueueComplete(); + msg->enqueueComplete(); QPID_LOG(info, "Message is not durably recorded on '" << queue.getName() << "'. Persistence not enabled."); } -void NullMessageStore::dequeue(TransactionContext*, PersistableMessage& msg, const PersistableQueue&) +void NullMessageStore::dequeue(TransactionContext*, intrusive_ptr<PersistableMessage>& msg, const PersistableQueue&) { - msg.dequeueComplete(); + msg->dequeueComplete(); } void NullMessageStore::flush(const qpid::broker::PersistableQueue&) diff --git a/cpp/src/qpid/broker/NullMessageStore.h b/cpp/src/qpid/broker/NullMessageStore.h index caf018655c..6a2e960b0f 100644 --- a/cpp/src/qpid/broker/NullMessageStore.h +++ b/cpp/src/qpid/broker/NullMessageStore.h @@ -56,13 +56,17 @@ public: virtual void unbind(const PersistableExchange& exchange, const PersistableQueue& queue, const std::string& key, const framing::FieldTable& args); virtual void recover(RecoveryManager& queues); - virtual void stage(PersistableMessage& msg); - virtual void destroy(PersistableMessage& msg); - virtual void appendContent(const PersistableMessage& msg, const std::string& data); - virtual void loadContent(const qpid::broker::PersistableQueue& queue, - const PersistableMessage& msg, std::string& data, uint64_t offset, uint32_t length); - virtual void enqueue(TransactionContext* ctxt, PersistableMessage& msg, const PersistableQueue& queue); - virtual void dequeue(TransactionContext* ctxt, PersistableMessage& msg, const PersistableQueue& queue); + virtual void stage(intrusive_ptr<PersistableMessage>& msg); + virtual void destroy(intrusive_ptr<PersistableMessage>& msg); + virtual void appendContent(intrusive_ptr<const PersistableMessage>& msg, + const std::string& data); + virtual void loadContent(const qpid::broker::PersistableQueue& queue, + intrusive_ptr<const PersistableMessage>& msg, std::string& data, + uint64_t offset, uint32_t length); + virtual void enqueue(TransactionContext* ctxt, intrusive_ptr<PersistableMessage>& msg, + const PersistableQueue& queue); + virtual void dequeue(TransactionContext* ctxt, intrusive_ptr<PersistableMessage>& msg, + const PersistableQueue& queue); virtual u_int32_t outstandingQueueAIO(const PersistableQueue& queue); virtual void flush(const qpid::broker::PersistableQueue& queue); ~NullMessageStore(){} diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp index 376b9367d0..c43ab8c231 100644 --- a/cpp/src/qpid/broker/Queue.cpp +++ b/cpp/src/qpid/broker/Queue.cpp @@ -431,9 +431,11 @@ bool Queue::enqueue(TransactionContext* ctxt, intrusive_ptr<Message> msg) { if (msg->isPersistent() && store) { msg->enqueueAsync(this, store); //increment to async counter -- for message sent to more than one queue - store->enqueue(ctxt, *msg.get(), *this); + intrusive_ptr<PersistableMessage> pmsg = static_pointer_cast<PersistableMessage>(msg); + store->enqueue(ctxt, pmsg, *this); return true; } + //msg->enqueueAsync(); // increments intrusive ptr cnt return false; } @@ -442,9 +444,11 @@ bool Queue::dequeue(TransactionContext* ctxt, intrusive_ptr<Message> msg) { if (msg->isPersistent() && store) { msg->dequeueAsync(this, store); //increment to async counter -- for message sent to more than one queue - store->dequeue(ctxt, *msg.get(), *this); + intrusive_ptr<PersistableMessage> pmsg = static_pointer_cast<PersistableMessage>(msg); + store->dequeue(ctxt, pmsg, *this); return true; } + //msg->dequeueAsync(); // decrements intrusive ptr cnt return false; } |
