diff options
author | Gordon Sim <gsim@apache.org> | 2007-03-30 15:50:07 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2007-03-30 15:50:07 +0000 |
commit | 33d8343d134a391fa7d0a338fafad1a22ff58dc3 (patch) | |
tree | 7c8b9ef3ea62852eb38548be87f908b2892e12a2 /cpp | |
parent | 8fc571ee337add8f2c4ab1f1ebc0c4784c58e2bf (diff) | |
download | qpid-python-33d8343d134a391fa7d0a338fafad1a22ff58dc3.tar.gz |
Refactored the MessageStore interface to restrict visibility of broker core from store implementations.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@524139 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
41 files changed, 903 insertions, 408 deletions
diff --git a/cpp/lib/broker/Broker.cpp b/cpp/lib/broker/Broker.cpp index 335ce2b3a0..fa80867b69 100644 --- a/cpp/lib/broker/Broker.cpp +++ b/cpp/lib/broker/Broker.cpp @@ -29,6 +29,7 @@ #include "MessageStoreModule.h" #include "NullMessageStore.h" #include "ProtocolInitiation.h" +#include "RecoveryManagerImpl.h" #include "Connection.h" #include "sys/ConnectionInputHandler.h" #include "sys/ConnectionInputHandlerFactory.h" @@ -61,9 +62,8 @@ Broker::Broker(const Configuration& conf) : exchanges.declare(amq_match, HeadersExchange::typeName); if(store.get()) { - RecoveryManager recoverer(queues, exchanges); - MessageStoreSettings storeSettings = { getStagingThreshold() }; - store->recover(recoverer, &storeSettings); + RecoveryManagerImpl recoverer(queues, exchanges, conf.getStagingThreshold()); + store->recover(recoverer); } cleaner.start(); diff --git a/cpp/lib/broker/BrokerChannel.cpp b/cpp/lib/broker/BrokerChannel.cpp index 5673a2c42a..5897914f26 100644 --- a/cpp/lib/broker/BrokerChannel.cpp +++ b/cpp/lib/broker/BrokerChannel.cpp @@ -264,7 +264,7 @@ void Channel::ack(uint64_t firstTag, uint64_t lastTag){ throw ConnectionException(530, "Received ack for unrecognised delivery tag"); }else if(i!=j){ ack_iterator end = ++i; - for_each(j, end, mem_fun_ref(&DeliveryRecord::discard)); + for_each(j, end, bind2nd(mem_fun_ref(&DeliveryRecord::discard), 0)); unacked.erase(unacked.begin(), end); //recalculate the prefetch: diff --git a/cpp/lib/broker/BrokerMessage.cpp b/cpp/lib/broker/BrokerMessage.cpp index fc61cd2296..b14efb966e 100644 --- a/cpp/lib/broker/BrokerMessage.cpp +++ b/cpp/lib/broker/BrokerMessage.cpp @@ -33,6 +33,7 @@ #include "AMQMethodBody.h" #include "AMQFrame.h" #include "framing/ChannelAdapter.h" +#include "RecoveryManagerImpl.h" using namespace boost; using namespace qpid::broker; @@ -134,6 +135,9 @@ void BasicMessage::decode(Buffer& buffer, bool headersOnly, uint32_t contentChun void BasicMessage::decodeHeader(Buffer& buffer) { + //don't care about the type here, but want encode/decode to be symmetric + RecoveryManagerImpl::decodeMessageType(buffer); + string exchange; string routingKey; @@ -169,40 +173,42 @@ void BasicMessage::decodeContent(Buffer& buffer, uint32_t chunkSize) } } -void BasicMessage::encode(Buffer& buffer) +void BasicMessage::encode(Buffer& buffer) const { encodeHeader(buffer); encodeContent(buffer); } -void BasicMessage::encodeHeader(Buffer& buffer) +void BasicMessage::encodeHeader(Buffer& buffer) const { + RecoveryManagerImpl::encodeMessageType(*this, buffer); buffer.putShortString(getExchange()); buffer.putShortString(getRoutingKey()); buffer.putLong(header->size()); header->encode(buffer); } -void BasicMessage::encodeContent(Buffer& buffer) +void BasicMessage::encodeContent(Buffer& buffer) const { Mutex::ScopedLock locker(contentLock); if (content.get()) content->encode(buffer); } -uint32_t BasicMessage::encodedSize() +uint32_t BasicMessage::encodedSize() const { return encodedHeaderSize() + encodedContentSize(); } -uint32_t BasicMessage::encodedContentSize() +uint32_t BasicMessage::encodedContentSize() const { Mutex::ScopedLock locker(contentLock); return content.get() ? content->size() : 0; } -uint32_t BasicMessage::encodedHeaderSize() +uint32_t BasicMessage::encodedHeaderSize() const { - return getExchange().size() + 1 + return RecoveryManagerImpl::encodedMessageTypeSize() + +getExchange().size() + 1 + getRoutingKey().size() + 1 + header->size() + 4;//4 extra bytes for size } @@ -216,7 +222,7 @@ void BasicMessage::releaseContent(MessageStore* store) { Mutex::ScopedLock locker(contentLock); if (!isPersistent() && getPersistenceId() == 0) { - store->stage(this); + store->stage(*this); } if (!content.get() || content->size() > 0) { //set content to lazy loading mode (but only if there is diff --git a/cpp/lib/broker/BrokerMessage.h b/cpp/lib/broker/BrokerMessage.h index fcb104edbb..8b408ae669 100644 --- a/cpp/lib/broker/BrokerMessage.h +++ b/cpp/lib/broker/BrokerMessage.h @@ -53,7 +53,7 @@ using framing::string; class BasicMessage : public Message { boost::shared_ptr<framing::AMQHeaderBody> header; std::auto_ptr<Content> content; - sys::Mutex contentLock; + mutable sys::Mutex contentLock; uint64_t size; void sendContent(framing::ChannelAdapter&, uint32_t framesize); @@ -92,25 +92,25 @@ class BasicMessage : public Message { void decodeHeader(framing::Buffer& buffer); void decodeContent(framing::Buffer& buffer, uint32_t contentChunkSize = 0); - void encode(framing::Buffer& buffer); - void encodeHeader(framing::Buffer& buffer); - void encodeContent(framing::Buffer& buffer); + void encode(framing::Buffer& buffer) const; + void encodeHeader(framing::Buffer& buffer) const; + void encodeContent(framing::Buffer& buffer) const; /** * @returns the size of the buffer needed to encode this * message in its entirety */ - uint32_t encodedSize(); + uint32_t encodedSize() const; /** * @returns the size of the buffer needed to encode the * 'header' of this message (not just the header frame, * but other meta data e.g.routing key and exchange) */ - uint32_t encodedHeaderSize(); + uint32_t encodedHeaderSize() const; /** * @returns the size of the buffer needed to encode the * (possibly partial) content held by this message */ - uint32_t encodedContentSize(); + uint32_t encodedContentSize() const; /** * Releases the in-memory content data held by this * message. Must pass in a store from which the data can diff --git a/cpp/lib/broker/BrokerMessageBase.h b/cpp/lib/broker/BrokerMessageBase.h index 4989cccdd3..da0cc57756 100644 --- a/cpp/lib/broker/BrokerMessageBase.h +++ b/cpp/lib/broker/BrokerMessageBase.h @@ -25,6 +25,7 @@ #include <string> #include <boost/shared_ptr.hpp> #include "Content.h" +#include "PersistableMessage.h" #include "framing/amqp_types.h" namespace qpid { @@ -49,7 +50,7 @@ class MessageStore; * abstracting away the operations * TODO; AMS: for the moment this is mostly a placeholder */ -class Message { +class Message : public PersistableMessage{ public: typedef boost::shared_ptr<Message> shared_ptr; typedef boost::shared_ptr<framing::AMQMethodBody> AMQMethodBodyPtr; @@ -117,25 +118,25 @@ class Message { return publisher; } - virtual void encode(framing::Buffer& buffer) = 0; - virtual void encodeHeader(framing::Buffer& buffer) = 0; + virtual void encode(framing::Buffer& buffer) const = 0; + virtual void encodeHeader(framing::Buffer& buffer) const = 0; /** * @returns the size of the buffer needed to encode this * message in its entirety */ - virtual uint32_t encodedSize() = 0; + virtual uint32_t encodedSize() const = 0; /** * @returns the size of the buffer needed to encode the * 'header' of this message (not just the header frame, * but other meta data e.g.routing key and exchange) */ - virtual uint32_t encodedHeaderSize() = 0; + virtual uint32_t encodedHeaderSize() const = 0; /** * @returns the size of the buffer needed to encode the * (possibly partial) content held by this message */ - virtual uint32_t encodedContentSize() = 0; + virtual uint32_t encodedContentSize() const = 0; /** * If headers have been received, returns the expected * content size else returns 0. @@ -145,6 +146,7 @@ class Message { virtual void decodeHeader(framing::Buffer& buffer) = 0; virtual void decodeContent(framing::Buffer& buffer, uint32_t contentChunkSize = 0) = 0; + static shared_ptr decode(framing::Buffer& buffer); // TODO: AMS 29/1/2007 Don't think these are really part of base class diff --git a/cpp/lib/broker/BrokerMessageMessage.cpp b/cpp/lib/broker/BrokerMessageMessage.cpp index a50375cdd3..73ad961938 100644 --- a/cpp/lib/broker/BrokerMessageMessage.cpp +++ b/cpp/lib/broker/BrokerMessageMessage.cpp @@ -29,6 +29,7 @@ #include "framing/AMQFrame.h" #include "framing/FieldTable.h" #include "framing/BasicHeaderProperties.h" +#include "RecoveryManagerImpl.h" #include <algorithm> @@ -217,17 +218,17 @@ bool MessageMessage::isPersistent() return transfer->getDeliveryMode() == PERSISTENT; } -uint32_t MessageMessage::encodedSize() +uint32_t MessageMessage::encodedSize() const { return encodedHeaderSize() + encodedContentSize(); } -uint32_t MessageMessage::encodedHeaderSize() +uint32_t MessageMessage::encodedHeaderSize() const { - return transfer->size() - transfer->baseSize(); + return RecoveryManagerImpl::encodedMessageTypeSize() + transfer->size() - transfer->baseSize(); } -uint32_t MessageMessage::encodedContentSize() +uint32_t MessageMessage::encodedContentSize() const { return 0; } @@ -237,13 +238,14 @@ uint64_t MessageMessage::expectedContentSize() return 0; } -void MessageMessage::encode(Buffer& buffer) +void MessageMessage::encode(Buffer& buffer) const { encodeHeader(buffer); } -void MessageMessage::encodeHeader(Buffer& buffer) +void MessageMessage::encodeHeader(Buffer& buffer) const { + RecoveryManagerImpl::encodeMessageType(*this, buffer); if (transfer->getBody().isInline()) { transfer->encodeContent(buffer); } else { @@ -259,6 +261,9 @@ void MessageMessage::encodeHeader(Buffer& buffer) void MessageMessage::decodeHeader(Buffer& buffer) { + //don't care about the type here, but want encode/decode to be symmetric + RecoveryManagerImpl::decodeMessageType(buffer); + transfer->decodeContent(buffer); } @@ -269,7 +274,7 @@ void MessageMessage::decodeContent(Buffer& /*buffer*/, uint32_t /*chunkSize*/) MessageTransferBody* MessageMessage::copyTransfer(const ProtocolVersion& version, const string& destination, - const framing::Content& body) + const framing::Content& body) const { return new MessageTransferBody(version, transfer->getTicket(), diff --git a/cpp/lib/broker/BrokerMessageMessage.h b/cpp/lib/broker/BrokerMessageMessage.h index a13a63a416..1da171fba8 100644 --- a/cpp/lib/broker/BrokerMessageMessage.h +++ b/cpp/lib/broker/BrokerMessageMessage.h @@ -71,11 +71,11 @@ class MessageMessage: public Message{ const framing::FieldTable& getApplicationHeaders(); bool isPersistent(); - void encode(framing::Buffer& buffer); - void encodeHeader(framing::Buffer& buffer); - uint32_t encodedSize(); - uint32_t encodedHeaderSize(); - uint32_t encodedContentSize(); + void encode(framing::Buffer& buffer) const; + void encodeHeader(framing::Buffer& buffer) const; + uint32_t encodedSize() const; + uint32_t encodedHeaderSize() const; + uint32_t encodedContentSize() const; uint64_t expectedContentSize(); void decodeHeader(framing::Buffer& buffer); void decodeContent(framing::Buffer& buffer, uint32_t contentChunkSize = 0); @@ -86,7 +86,7 @@ class MessageMessage: public Message{ uint32_t framesize); framing::MessageTransferBody* copyTransfer(const framing::ProtocolVersion& version, const std::string& destination, - const framing::Content& body); + const framing::Content& body) const; framing::RequestId requestId; const TransferPtr transfer; diff --git a/cpp/lib/broker/BrokerQueue.cpp b/cpp/lib/broker/BrokerQueue.cpp index 4b0ed6111c..e2b59aa766 100644 --- a/cpp/lib/broker/BrokerQueue.cpp +++ b/cpp/lib/broker/BrokerQueue.cpp @@ -26,6 +26,7 @@ #include <sys/Monitor.h> #include <sys/Time.h> #include <iostream> +#include "QueueRegistry.h" using namespace qpid::broker; using namespace qpid::sys; @@ -53,7 +54,7 @@ Queue::Queue(const string& _name, uint32_t _autodelete, Queue::~Queue(){} void Queue::deliver(Message::shared_ptr& msg){ - enqueue(0, msg, 0); + enqueue(0, msg); process(msg); } @@ -195,17 +196,17 @@ bool Queue::canAutoDelete() const{ return lastUsed && (now()*TIME_MSEC - lastUsed > autodelete); } -void Queue::enqueue(TransactionContext* ctxt, Message::shared_ptr& msg, const string * const xid) +void Queue::enqueue(TransactionContext* ctxt, Message::shared_ptr& msg) { if (msg->isPersistent() && store) { - store->enqueue(ctxt, msg.get(), *this, xid); + store->enqueue(ctxt, *msg.get(), *this); } } -void Queue::dequeue(TransactionContext* ctxt, Message::shared_ptr& msg, const string * const xid) +void Queue::dequeue(TransactionContext* ctxt, Message::shared_ptr& msg) { if (msg->isPersistent() && store) { - store->dequeue(ctxt, msg.get(), *this, xid); + store->dequeue(ctxt, *msg.get(), *this); } } @@ -217,8 +218,10 @@ namespace void Queue::create(const FieldTable& settings) { + //TODO: hold onto settings and persist them as part of encode + // in fact settings should be passed in on construction if (store) { - store->create(*this, settings); + store->create(*this); } configure(settings); } @@ -246,3 +249,34 @@ const QueuePolicy* const Queue::getPolicy() { return policy.get(); } + +uint64_t Queue::getPersistenceId() const +{ + return persistenceId; +} + +void Queue::setPersistenceId(uint64_t _persistenceId) +{ + persistenceId = _persistenceId; +} + +void Queue::encode(framing::Buffer& buffer) const +{ + buffer.putShortString(name); + //TODO store all required properties +} + +uint32_t Queue::encodedSize() const +{ + //TODO, revise when storing full set of queue properties + return name.size() + 1/*short string size octet*/; +} + +Queue::shared_ptr Queue::decode(QueueRegistry& queues, framing::Buffer& buffer) +{ + string name; + buffer.getShortString(name); + std::pair<Queue::shared_ptr, bool> result = queues.declare(name, true); + return result.first; +} + diff --git a/cpp/lib/broker/BrokerQueue.h b/cpp/lib/broker/BrokerQueue.h index 45cf317037..20d81e4e87 100644 --- a/cpp/lib/broker/BrokerQueue.h +++ b/cpp/lib/broker/BrokerQueue.h @@ -31,6 +31,7 @@ #include <BrokerMessage.h> #include <FieldTable.h> #include <sys/Monitor.h> +#include "PersistableQueue.h" #include <QueuePolicy.h> // TODO aconway 2007-02-06: Use auto_ptr and boost::ptr_vector to @@ -39,6 +40,7 @@ namespace qpid { namespace broker { class MessageStore; + class QueueRegistry; /** * Thrown when exclusive access would be violated. @@ -51,7 +53,7 @@ namespace qpid { * registered consumers or be stored until dequeued or until one * or more consumers registers. */ - class Queue{ + class Queue : public PersistableQueue{ typedef std::vector<Consumer*> Consumers; typedef std::queue<Message::shared_ptr> Messages; @@ -119,22 +121,28 @@ namespace qpid { inline const string& getName() const { return name; } inline const bool isExclusiveOwner(const ConnectionToken* const o) const { return o == owner; } inline bool hasExclusiveConsumer() const { return exclusive; } - inline uint64_t getPersistenceId() const { return persistenceId; } - inline void setPersistenceId(uint64_t _persistenceId) const { persistenceId = _persistenceId; } bool canAutoDelete() const; - void enqueue(TransactionContext* ctxt, Message::shared_ptr& msg, const string * const xid); + void enqueue(TransactionContext* ctxt, Message::shared_ptr& msg); /** * dequeue from store (only done once messages is acknowledged) */ - void dequeue(TransactionContext* ctxt, Message::shared_ptr& msg, const string * const xid); + void dequeue(TransactionContext* ctxt, Message::shared_ptr& msg); /** * dequeues from memory only */ Message::shared_ptr dequeue(); const QueuePolicy* const getPolicy(); + + //PersistableQueue support: + uint64_t getPersistenceId() const; + void setPersistenceId(uint64_t persistenceId); + void encode(framing::Buffer& buffer) const; + uint32_t encodedSize() const; + + static Queue::shared_ptr decode(QueueRegistry& queues, framing::Buffer& buffer); }; } } diff --git a/cpp/lib/broker/DeliveryRecord.cpp b/cpp/lib/broker/DeliveryRecord.cpp index 0d2e5325c5..e0b5bcfeb1 100644 --- a/cpp/lib/broker/DeliveryRecord.cpp +++ b/cpp/lib/broker/DeliveryRecord.cpp @@ -42,12 +42,8 @@ DeliveryRecord::DeliveryRecord(Message::shared_ptr _msg, pull(true){} -void DeliveryRecord::discard(TransactionContext* ctxt, const std::string* const xid) const{ - queue->dequeue(ctxt, msg, xid); -} - -void DeliveryRecord::discard() const{ - discard(0, 0); +void DeliveryRecord::discard(TransactionContext* ctxt) const{ + queue->dequeue(ctxt, msg); } bool DeliveryRecord::matches(uint64_t tag) const{ diff --git a/cpp/lib/broker/DeliveryRecord.h b/cpp/lib/broker/DeliveryRecord.h index bda2c2ec90..9423dd2062 100644 --- a/cpp/lib/broker/DeliveryRecord.h +++ b/cpp/lib/broker/DeliveryRecord.h @@ -46,8 +46,7 @@ namespace qpid { DeliveryRecord(Message::shared_ptr msg, Queue::shared_ptr queue, const std::string consumerTag, const uint64_t deliveryTag); DeliveryRecord(Message::shared_ptr msg, Queue::shared_ptr queue, const uint64_t deliveryTag); - void discard() const; - void discard(TransactionContext* ctxt, const std::string* const xid) const; + void discard(TransactionContext* ctxt = 0) const; bool matches(uint64_t tag) const; bool coveredBy(const AccumulatedAck* const range) const; void requeue() const; diff --git a/cpp/lib/broker/LazyLoadedContent.cpp b/cpp/lib/broker/LazyLoadedContent.cpp index 131943b448..9810ee671c 100644 --- a/cpp/lib/broker/LazyLoadedContent.cpp +++ b/cpp/lib/broker/LazyLoadedContent.cpp @@ -27,7 +27,7 @@ using namespace qpid::framing; LazyLoadedContent::~LazyLoadedContent() { - store->destroy(msg); + store->destroy(*msg); } LazyLoadedContent::LazyLoadedContent(MessageStore* const _store, Message* const _msg, uint64_t _expectedSize) : @@ -35,7 +35,7 @@ LazyLoadedContent::LazyLoadedContent(MessageStore* const _store, Message* const void LazyLoadedContent::add(AMQContentBody::shared_ptr data) { - store->appendContent(msg, data->getData()); + store->appendContent(*msg, data->getData()); } uint32_t LazyLoadedContent::size() @@ -50,13 +50,13 @@ void LazyLoadedContent::send(ChannelAdapter& channel, uint32_t framesize) { uint64_t remaining = expectedSize - offset; string data; - store->loadContent(msg, data, offset, + store->loadContent(*msg, data, offset, remaining > framesize ? framesize : remaining); channel.send(new AMQContentBody(data)); } } else { string data; - store->loadContent(msg, data, 0, expectedSize); + store->loadContent(*msg, data, 0, expectedSize); channel.send(new AMQContentBody(data)); } } diff --git a/cpp/lib/broker/LazyLoadedContent.h b/cpp/lib/broker/LazyLoadedContent.h index e000a4ef69..3306f6e3ba 100644 --- a/cpp/lib/broker/LazyLoadedContent.h +++ b/cpp/lib/broker/LazyLoadedContent.h @@ -23,6 +23,7 @@ #include <Content.h> #include <MessageStore.h> +#include "BrokerMessageBase.h" namespace qpid { namespace broker { diff --git a/cpp/lib/broker/Makefile.am b/cpp/lib/broker/Makefile.am index bc3d2089c2..29cfb0f347 100644 --- a/cpp/lib/broker/Makefile.am +++ b/cpp/lib/broker/Makefile.am @@ -61,13 +61,20 @@ libqpidbroker_la_SOURCES = \ NameGenerator.h \ NullMessageStore.cpp \ NullMessageStore.h \ + Persistable.h \ + PersistableExchange.h \ + PersistableMessage.h \ + PersistableQueue.h \ Prefetch.h \ QueuePolicy.cpp \ QueuePolicy.h \ QueueRegistry.cpp \ QueueRegistry.h \ - RecoveryManager.cpp \ + RecoverableMessage.h \ + RecoverableQueue.h \ RecoveryManager.h \ + RecoveryManagerImpl.cpp \ + RecoveryManagerImpl.h \ Reference.cpp \ Reference.h \ ConnectionFactory.cpp \ diff --git a/cpp/lib/broker/MessageBuilder.cpp b/cpp/lib/broker/MessageBuilder.cpp index 8bffaef50f..e99dcad7d6 100644 --- a/cpp/lib/broker/MessageBuilder.cpp +++ b/cpp/lib/broker/MessageBuilder.cpp @@ -56,7 +56,7 @@ void MessageBuilder::setHeader(AMQHeaderBody::shared_ptr& header){ } message->setHeader(header); if (stagingThreshold && header->getContentSize() >= stagingThreshold) { - store->stage(message.get()); + store->stage(*message); message->releaseContent(store); } else { auto_ptr<Content> content(new InMemoryContent()); diff --git a/cpp/lib/broker/MessageStore.h b/cpp/lib/broker/MessageStore.h index 9e38408886..1d9ee86e48 100644 --- a/cpp/lib/broker/MessageStore.h +++ b/cpp/lib/broker/MessageStore.h @@ -21,119 +21,108 @@ #ifndef _MessageStore_ #define _MessageStore_ -#include <BrokerMessage.h> -#include <FieldTable.h> -#include <RecoveryManager.h> -#include <TransactionalStore.h> +#include "PersistableExchange.h" +#include "PersistableMessage.h" +#include "PersistableQueue.h" +#include "RecoveryManager.h" +#include "TransactionalStore.h" namespace qpid { - namespace broker { - struct MessageStoreSettings - { - /** - * Messages whose content length is larger than this value - * will be staged (i.e. will have thier data written to - * disk as it arrives) and will load their data lazily. On - * recovery therefore, only the headers should be loaded. - */ - uint64_t stagingThreshold; - }; - /** - * An abstraction of the persistent storage for messages. (In - * all methods, any pointers/references to queues or messages - * are valid only for the duration of the call). - */ - class MessageStore : public TransactionalStore{ - public: - /** - * Record the existance of a durable queue - */ - virtual void create(const Queue& queue, const qpid::framing::FieldTable& settings) = 0; - /** - * Destroy a durable queue - */ - virtual void destroy(const Queue& queue) = 0; +namespace broker { - /** - * Request recovery of queue and message state from store - */ - virtual void recover(RecoveryManager& queues, const MessageStoreSettings* const settings = 0) = 0; - - /** - * Stores a messages before it has been enqueued - * (enqueueing automatically stores the message so this is - * only required if storage is required prior to that - * point). If the message has not yet been stored it will - * store the headers as well as any content passed in. A - * persistence id will be set on the message which can be - * used to load the content or to append to it. - */ - virtual void stage(Message* const msg) = 0; +/** + * An abstraction of the persistent storage for messages. (In + * all methods, any pointers/references to queues or messages + * are valid only for the duration of the call). + */ +class MessageStore : public TransactionalStore{ +public: + /** + * Record the existence of a durable queue + */ + virtual void create(const PersistableQueue& queue) = 0; + /** + * Destroy a durable queue + */ + virtual void destroy(const PersistableQueue& queue) = 0; + + /** + * Record the existence of a durable exchange + */ + virtual void create(const PersistableExchange& exchange) = 0; + /** + * Destroy a durable exchange + */ + virtual void destroy(const PersistableExchange& exchange) = 0; + + /** + * Request recovery of queue and message state from store + */ + virtual void recover(RecoveryManager& queues) = 0; + + /** + * Stores a messages before it has been enqueued + * (enqueueing automatically stores the message so this is + * only required if storage is required prior to that + * point). If the message has not yet been stored it will + * store the headers as well as any content passed in. A + * persistence id will be set on the message which can be + * used to load the content or to append to it. + */ + virtual void stage(PersistableMessage& msg) = 0; - /** - * Destroys a previously staged message. This only needs - * to be called if the message is never enqueued. (Once - * enqueued, deletion will be automatic when the message - * is dequeued from all queues it was enqueued onto). - */ - virtual void destroy(Message* const msg) = 0; - - /** - * Appends content to a previously staged message - */ - virtual void appendContent(Message* const msg, const std::string& data) = 0; + /** + * Destroys a previously staged message. This only needs + * to be called if the message is never enqueued. (Once + * enqueued, deletion will be automatic when the message + * is dequeued from all queues it was enqueued onto). + */ + virtual void destroy(PersistableMessage& msg) = 0; - /** - * Loads (a section) of content data for the specified - * message (previously stored through a call to stage or - * enqueue) into data. The offset refers to the content - * only (i.e. an offset of 0 implies that the start of the - * content should be loaded, not the headers or related - * meta-data). - */ - virtual void loadContent(Message* const msg, std::string& data, uint64_t offset, uint32_t length) = 0; + /** + * Appends content to a previously staged message + */ + virtual void appendContent(PersistableMessage& msg, const std::string& data) = 0; + + /** + * Loads (a section) of content data for the specified + * message (previously stored through a call to stage or + * enqueue) into data. The offset refers to the content + * only (i.e. an offset of 0 implies that the start of the + * content should be loaded, not the headers or related + * meta-data). + */ + virtual void loadContent(PersistableMessage& msg, std::string& data, uint64_t offset, uint32_t length) = 0; + + /** + * Enqueues a message, storing the message if it has not + * been previously stored and recording that the given + * message is on the given queue. + * + * @param msg the message to enqueue + * @param queue the name of the queue onto which it is to be enqueued + * @param xid (a pointer to) an identifier of the + * distributed transaction in which the operation takes + * place or null for 'local' transactions + */ + virtual void enqueue(TransactionContext* ctxt, PersistableMessage& msg, const PersistableQueue& queue) = 0; + /** + * Dequeues a message, recording that the given message is + * no longer on the given queue and deleting the message + * if it is no longer on any other queue. + * + * @param msg the message to dequeue + * @param queue the name of th queue from which it is to be dequeued + * @param xid (a pointer to) an identifier of the + * distributed transaction in which the operation takes + * place or null for 'local' transactions + */ + virtual void dequeue(TransactionContext* ctxt, PersistableMessage& msg, const PersistableQueue& queue) = 0; + + virtual ~MessageStore(){} +}; - /** - * Enqueues a message, storing the message if it has not - * been previously stored and recording that the given - * message is on the given queue. - * - * @param msg the message to enqueue - * @param queue the name of the queue onto which it is to be enqueued - * @param xid (a pointer to) an identifier of the - * distributed transaction in which the operation takes - * place or null for 'local' transactions - */ - virtual void enqueue(TransactionContext* ctxt, Message* const msg, const Queue& queue, const std::string * const xid) = 0; - /** - * Dequeues a message, recording that the given message is - * no longer on the given queue and deleting the message - * if it is no longer on any other queue. - * - * @param msg the message to dequeue - * @param queue the name of th queue from which it is to be dequeued - * @param xid (a pointer to) an identifier of the - * distributed transaction in which the operation takes - * place or null for 'local' transactions - */ - virtual void dequeue(TransactionContext* ctxt, Message* const msg, const Queue& queue, const std::string * const xid) = 0; - - /** - * Treat all enqueue/dequeues where this xid was specified as being prepared. - */ - virtual void prepared(const std::string * const xid) = 0; - /** - * Treat all enqueue/dequeues where this xid was specified as being committed. - */ - virtual void committed(const std::string * const xid) = 0; - /** - * Treat all enqueue/dequeues where this xid was specified as being aborted. - */ - virtual void aborted(const std::string * const xid) = 0; - - virtual ~MessageStore(){} - }; - } +} } diff --git a/cpp/lib/broker/MessageStoreModule.cpp b/cpp/lib/broker/MessageStoreModule.cpp index 676e86f84a..9939440ecb 100644 --- a/cpp/lib/broker/MessageStoreModule.cpp +++ b/cpp/lib/broker/MessageStoreModule.cpp @@ -28,77 +28,82 @@ MessageStoreModule::MessageStoreModule(const std::string& name) : store(name) { } -void MessageStoreModule::create(const Queue& queue, const qpid::framing::FieldTable& settings) +void MessageStoreModule::create(const PersistableQueue& queue) { - store->create(queue, settings); + store->create(queue); } -void MessageStoreModule::destroy(const Queue& queue) +void MessageStoreModule::destroy(const PersistableQueue& queue) { store->destroy(queue); } -void MessageStoreModule::recover(RecoveryManager& registry, const MessageStoreSettings* const settings) +void MessageStoreModule::create(const PersistableExchange& exchange) { - store->recover(registry, settings); + store->create(exchange); } -void MessageStoreModule::stage(Message* const msg) +void MessageStoreModule::destroy(const PersistableExchange& exchange) +{ + store->destroy(exchange); +} + +void MessageStoreModule::recover(RecoveryManager& registry) +{ + store->recover(registry); +} + +void MessageStoreModule::stage(PersistableMessage& msg) { store->stage(msg); } -void MessageStoreModule::destroy(Message* const msg) +void MessageStoreModule::destroy(PersistableMessage& msg) { store->destroy(msg); } -void MessageStoreModule::appendContent(Message* const msg, const std::string& data) +void MessageStoreModule::appendContent(PersistableMessage& msg, const std::string& data) { store->appendContent(msg, data); } -void MessageStoreModule::loadContent(Message* const msg, string& data, uint64_t offset, uint32_t length) +void MessageStoreModule::loadContent(PersistableMessage& msg, string& data, uint64_t offset, uint32_t length) { store->loadContent(msg, data, offset, length); } -void MessageStoreModule::enqueue(TransactionContext* ctxt, Message* const msg, const Queue& queue, const string * const xid) +void MessageStoreModule::enqueue(TransactionContext* ctxt, PersistableMessage& msg, const PersistableQueue& queue) { - store->enqueue(ctxt, msg, queue, xid); + store->enqueue(ctxt, msg, queue); } -void MessageStoreModule::dequeue(TransactionContext* ctxt, Message* const msg, const Queue& queue, const string * const xid) +void MessageStoreModule::dequeue(TransactionContext* ctxt, PersistableMessage& msg, const PersistableQueue& queue) { - store->dequeue(ctxt, msg, queue, xid); + store->dequeue(ctxt, msg, queue); } -void MessageStoreModule::prepared(const string * const xid) -{ - store->prepared(xid); -} - -void MessageStoreModule::committed(const string * const xid) +std::auto_ptr<TransactionContext> MessageStoreModule::begin() { - store->committed(xid); + return store->begin(); } -void MessageStoreModule::aborted(const string * const xid) +std::auto_ptr<TPCTransactionContext> MessageStoreModule::begin(const std::string& xid) { - store->aborted(xid); + return store->begin(xid); } -std::auto_ptr<TransactionContext> MessageStoreModule::begin() +void MessageStoreModule::prepare(TPCTransactionContext& txn) { - return store->begin(); + store->prepare(txn); } -void MessageStoreModule::commit(TransactionContext* ctxt) +void MessageStoreModule::commit(TransactionContext& ctxt) { store->commit(ctxt); } -void MessageStoreModule::abort(TransactionContext* ctxt) +void MessageStoreModule::abort(TransactionContext& ctxt) { store->abort(ctxt); } diff --git a/cpp/lib/broker/MessageStoreModule.h b/cpp/lib/broker/MessageStoreModule.h index 27fedbf635..1787a4f361 100644 --- a/cpp/lib/broker/MessageStoreModule.h +++ b/cpp/lib/broker/MessageStoreModule.h @@ -28,32 +28,39 @@ #include <sys/Module.h> namespace qpid { - namespace broker { - /** - * A null implementation of the MessageStore interface - */ - class MessageStoreModule : public MessageStore{ - qpid::sys::Module<MessageStore> store; - public: - MessageStoreModule(const std::string& name); - void create(const Queue& queue, const qpid::framing::FieldTable& settings); - void destroy(const Queue& queue); - void recover(RecoveryManager& queues, const MessageStoreSettings* const settings = 0); - void stage(Message* const msg); - void destroy(Message* const msg); - void appendContent(Message* const msg, const std::string& data); - void loadContent(Message* const msg, std::string& data, uint64_t offset, uint32_t length); - void enqueue(TransactionContext* ctxt, Message* const msg, const Queue& queue, const string * const xid); - void dequeue(TransactionContext* ctxt, Message* const msg, const Queue& queue, const string * const xid); - void prepared(const std::string * const xid); - void committed(const std::string * const xid); - void aborted(const std::string * const xid); - std::auto_ptr<TransactionContext> begin(); - void commit(TransactionContext* ctxt); - void abort(TransactionContext* ctxt); - ~MessageStoreModule(){} - }; - } +namespace broker { + +/** + * A null implementation of the MessageStore interface + */ +class MessageStoreModule : public MessageStore +{ + qpid::sys::Module<MessageStore> store; +public: + MessageStoreModule(const std::string& name); + + std::auto_ptr<TransactionContext> begin(); + std::auto_ptr<TPCTransactionContext> begin(const std::string& xid); + void prepare(TPCTransactionContext& txn); + void commit(TransactionContext& txn); + void abort(TransactionContext& txn); + + void create(const PersistableQueue& queue); + void destroy(const PersistableQueue& queue); + void create(const PersistableExchange& exchange); + void destroy(const PersistableExchange& exchange); + void recover(RecoveryManager& queues); + void stage(PersistableMessage& msg); + void destroy(PersistableMessage& msg); + void appendContent(PersistableMessage& msg, const std::string& data); + void loadContent(PersistableMessage& msg, std::string& data, uint64_t offset, uint32_t length); + void enqueue(TransactionContext* ctxt, PersistableMessage& msg, const PersistableQueue& queue); + void dequeue(TransactionContext* ctxt, PersistableMessage& msg, const PersistableQueue& queue); + + ~MessageStoreModule(){} +}; + +} } diff --git a/cpp/lib/broker/NullMessageStore.cpp b/cpp/lib/broker/NullMessageStore.cpp index bcb15c2ae0..0d53a31069 100644 --- a/cpp/lib/broker/NullMessageStore.cpp +++ b/cpp/lib/broker/NullMessageStore.cpp @@ -21,7 +21,6 @@ #include <NullMessageStore.h> -#include <BrokerQueue.h> #include <RecoveryManager.h> #include <iostream> @@ -30,75 +29,77 @@ using namespace qpid::broker; NullMessageStore::NullMessageStore(bool _warn) : warn(_warn){} -void NullMessageStore::create(const Queue& queue, const qpid::framing::FieldTable&) +void NullMessageStore::create(const PersistableQueue& queue) { if (warn) std::cout << "WARNING: Can't create durable queue '" << queue.getName() << "'. Persistence not enabled." << std::endl; } -void NullMessageStore::destroy(const Queue& queue) +void NullMessageStore::destroy(const PersistableQueue& queue) { if (warn) std::cout << "WARNING: Can't destroy durable queue '" << queue.getName() << "'. Persistence not enabled." << std::endl; } -void NullMessageStore::recover(RecoveryManager&, const MessageStoreSettings* const) +void NullMessageStore::create(const PersistableExchange&) +{ +} + +void NullMessageStore::destroy(const PersistableExchange&) +{ +} + +void NullMessageStore::recover(RecoveryManager&) { if (warn) std::cout << "WARNING: Persistence not enabled, no recovery of queues or messages." << std::endl; } -void NullMessageStore::stage(Message* const) +void NullMessageStore::stage(PersistableMessage&) { if (warn) std::cout << "WARNING: Can't stage message. Persistence not enabled." << std::endl; } -void NullMessageStore::destroy(Message* const) +void NullMessageStore::destroy(PersistableMessage&) { if (warn) std::cout << "WARNING: No need to destroy staged message. Persistence not enabled." << std::endl; } -void NullMessageStore::appendContent(Message* const, const string&) +void NullMessageStore::appendContent(PersistableMessage&, const string&) { if (warn) std::cout << "WARNING: Can't append content. Persistence not enabled." << std::endl; } -void NullMessageStore::loadContent(Message* const, string&, uint64_t, uint32_t) +void NullMessageStore::loadContent(PersistableMessage&, string&, uint64_t, uint32_t) { if (warn) std::cout << "WARNING: Can't load content. Persistence not enabled." << std::endl; } -void NullMessageStore::enqueue(TransactionContext*, Message* const, const Queue& queue, const string * const) +void NullMessageStore::enqueue(TransactionContext*, PersistableMessage&, const PersistableQueue& queue) { if (warn) std::cout << "WARNING: Can't enqueue message onto '" << queue.getName() << "'. Persistence not enabled." << std::endl; } -void NullMessageStore::dequeue(TransactionContext*, Message* const, const Queue& queue, const string * const) +void NullMessageStore::dequeue(TransactionContext*, PersistableMessage&, const PersistableQueue& queue) { if (warn) std::cout << "WARNING: Can't dequeue message from '" << queue.getName() << "'. Persistence not enabled." << std::endl; } -void NullMessageStore::prepared(const string * const) -{ - if (warn) std::cout << "WARNING: Persistence not enabled." << std::endl; -} - -void NullMessageStore::committed(const string * const) +std::auto_ptr<TransactionContext> NullMessageStore::begin() { - if (warn) std::cout << "WARNING: Persistence not enabled." << std::endl; + return std::auto_ptr<TransactionContext>(); } -void NullMessageStore::aborted(const string * const) +std::auto_ptr<TPCTransactionContext> NullMessageStore::begin(const std::string&) { - if (warn) std::cout << "WARNING: Persistence not enabled." << std::endl; + return std::auto_ptr<TPCTransactionContext>(); } -std::auto_ptr<TransactionContext> NullMessageStore::begin() +void NullMessageStore::prepare(TPCTransactionContext&) { - return std::auto_ptr<TransactionContext>(); } -void NullMessageStore::commit(TransactionContext*) +void NullMessageStore::commit(TransactionContext&) { } -void NullMessageStore::abort(TransactionContext*) +void NullMessageStore::abort(TransactionContext&) { } diff --git a/cpp/lib/broker/NullMessageStore.h b/cpp/lib/broker/NullMessageStore.h index 705f18ab43..f1a321cff4 100644 --- a/cpp/lib/broker/NullMessageStore.h +++ b/cpp/lib/broker/NullMessageStore.h @@ -26,33 +26,38 @@ #include <BrokerQueue.h> namespace qpid { - namespace broker { - - /** - * A null implementation of the MessageStore interface - */ - class NullMessageStore : public MessageStore{ - const bool warn; - public: - NullMessageStore(bool warn = false); - virtual void create(const Queue& queue, const qpid::framing::FieldTable& settings); - virtual void destroy(const Queue& queue); - virtual void recover(RecoveryManager& queues, const MessageStoreSettings* const settings = 0); - virtual void stage(Message* const msg); - virtual void destroy(Message* const msg); - virtual void appendContent(Message* const msg, const std::string& data); - virtual void loadContent(Message* const msg, std::string& data, uint64_t offset, uint32_t length); - virtual void enqueue(TransactionContext* ctxt, Message* const msg, const Queue& queue, const string * const xid); - virtual void dequeue(TransactionContext* ctxt, Message* const msg, const Queue& queue, const string * const xid); - virtual void prepared(const std::string * const xid); - virtual void committed(const std::string * const xid); - virtual void aborted(const std::string * const xid); - virtual std::auto_ptr<TransactionContext> begin(); - virtual void commit(TransactionContext* ctxt); - virtual void abort(TransactionContext* ctxt); - ~NullMessageStore(){} - }; - } +namespace broker { + +/** + * A null implementation of the MessageStore interface + */ +class NullMessageStore : public MessageStore +{ + const bool warn; +public: + NullMessageStore(bool warn = false); + + virtual std::auto_ptr<TransactionContext> begin(); + virtual std::auto_ptr<TPCTransactionContext> begin(const std::string& xid); + virtual void prepare(TPCTransactionContext& txn); + virtual void commit(TransactionContext& txn); + virtual void abort(TransactionContext& txn); + + virtual void create(const PersistableQueue& queue); + virtual void destroy(const PersistableQueue& queue); + virtual void create(const PersistableExchange& exchange); + virtual void destroy(const PersistableExchange& exchange); + virtual void recover(RecoveryManager& queues); + virtual void stage(PersistableMessage& msg); + virtual void destroy(PersistableMessage& msg); + virtual void appendContent(PersistableMessage& msg, const std::string& data); + virtual void loadContent(PersistableMessage& msg, std::string& data, uint64_t offset, uint32_t length); + virtual void enqueue(TransactionContext* ctxt, PersistableMessage& msg, const PersistableQueue& queue); + virtual void dequeue(TransactionContext* ctxt, PersistableMessage& msg, const PersistableQueue& queue); + ~NullMessageStore(){} +}; + +} } diff --git a/cpp/lib/broker/Persistable.h b/cpp/lib/broker/Persistable.h new file mode 100644 index 0000000000..9f48643c9e --- /dev/null +++ b/cpp/lib/broker/Persistable.h @@ -0,0 +1,62 @@ +#ifndef _broker_Persistable_h +#define _broker_Persistable_h + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "framing/amqp_types.h" +#include "framing/Buffer.h" + +namespace qpid { +namespace broker { + +/** + * Base class for all persistable objects + */ +class Persistable +{ +public: + /** + * Allows the store to attach its own identifier to this object + */ + virtual void setPersistenceId(uint64_t id) = 0; + /** + * Returns any identifier the store may have attached to this + * object + */ + virtual uint64_t getPersistenceId() const = 0; + /** + * Encodes the persistable state of this object into the supplied + * buffer + */ + virtual void encode(framing::Buffer& buffer) const = 0; + /** + * @returns the size of the buffer needed to encode this object + */ + virtual uint32_t encodedSize() const = 0; + + virtual ~Persistable() {}; +}; + +}} + + +#endif diff --git a/cpp/lib/broker/RecoveryManager.cpp b/cpp/lib/broker/PersistableExchange.h index 6548e6a24f..9badf5f609 100644 --- a/cpp/lib/broker/RecoveryManager.cpp +++ b/cpp/lib/broker/PersistableExchange.h @@ -1,3 +1,6 @@ +#ifndef _broker_PersistableExchange_h +#define _broker_PersistableExchange_h + /* * * Licensed to the Apache Software Foundation (ASF) under one @@ -18,29 +21,24 @@ * under the License. * */ -#include <RecoveryManager.h> - -using namespace qpid::broker; -RecoveryManager::RecoveryManager(QueueRegistry& _queues, ExchangeRegistry& _exchanges) : queues(_queues), exchanges(_exchanges) {} +#include <string> +#include "Persistable.h" -RecoveryManager::~RecoveryManager() {} +namespace qpid { +namespace broker { -Queue::shared_ptr RecoveryManager::recoverQueue(const string& name) +/** + * The interface exchanges must expose to the MessageStore in order to be + * persistable. + */ +class PersistableExchange : public Persistable { - std::pair<Queue::shared_ptr, bool> result = queues.declare(name, true); - try { - Exchange::shared_ptr exchange = exchanges.getDefault(); - if (exchange) { - exchange->bind(result.first, result.first->getName(), 0); - } - } catch (ChannelException& e) { - //assume no default exchange has been declared - } - return result.first; -} +public: + virtual ~PersistableExchange() {}; +}; -Exchange::shared_ptr RecoveryManager::recoverExchange(const string& name, const string& type) -{ - return exchanges.declare(name, type).first; -} +}} + + +#endif diff --git a/cpp/lib/broker/PersistableMessage.h b/cpp/lib/broker/PersistableMessage.h new file mode 100644 index 0000000000..f598e48709 --- /dev/null +++ b/cpp/lib/broker/PersistableMessage.h @@ -0,0 +1,53 @@ +#ifndef _broker_PersistableMessage_h +#define _broker_PersistableMessage_h + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <string> +#include <boost/shared_ptr.hpp> +#include "Persistable.h" +#include "framing/amqp_types.h" + +namespace qpid { +namespace broker { + +/** + * The interface messages must expose to the MessageStore in order to + * be persistable. + */ + class PersistableMessage : public Persistable +{ +public: + typedef boost::shared_ptr<PersistableMessage> shared_ptr; + + /** + * @returns the size of the headers when encoded + */ + virtual uint32_t encodedHeaderSize() const = 0; + + virtual ~PersistableMessage() {}; +}; + +}} + + +#endif diff --git a/cpp/lib/broker/PersistableQueue.h b/cpp/lib/broker/PersistableQueue.h new file mode 100644 index 0000000000..5dd91dde9b --- /dev/null +++ b/cpp/lib/broker/PersistableQueue.h @@ -0,0 +1,45 @@ +#ifndef _broker_PersistableQueue_h +#define _broker_PersistableQueue_h + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <string> +#include "Persistable.h" + +namespace qpid { +namespace broker { + +/** + * The interface queues must expose to the MessageStore in order to be + * persistable. + */ +class PersistableQueue : public Persistable +{ +public: + virtual const std::string& getName() const = 0; + virtual ~PersistableQueue() {}; +}; + +}} + + +#endif diff --git a/cpp/lib/broker/RecoverableMessage.h b/cpp/lib/broker/RecoverableMessage.h new file mode 100644 index 0000000000..4bb0d2c4a1 --- /dev/null +++ b/cpp/lib/broker/RecoverableMessage.h @@ -0,0 +1,57 @@ +#ifndef _broker_RecoverableMessage_h +#define _broker_RecoverableMessage_h + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <boost/shared_ptr.hpp> +#include "framing/amqp_types.h" +#include "framing/Buffer.h" + +namespace qpid { +namespace broker { + +/** + * The interface through which messages are reloaded on recovery. + */ +class RecoverableMessage +{ +public: + typedef boost::shared_ptr<RecoverableMessage> shared_ptr; + /** + * Used by store to determine whether to load content on recovery + * or let message load its own content as and when it requires it. + * + * @returns true if the content of the message should be loaded + */ + virtual bool loadContent(uint64_t available) = 0; + /** + * Loads the content held in the supplied buffer (may do checking + * of length as necessary) + */ + virtual void decodeContent(framing::Buffer& buffer) = 0; + virtual ~RecoverableMessage() {}; +}; + +}} + + +#endif diff --git a/cpp/lib/broker/RecoverableQueue.h b/cpp/lib/broker/RecoverableQueue.h new file mode 100644 index 0000000000..a5c564b947 --- /dev/null +++ b/cpp/lib/broker/RecoverableQueue.h @@ -0,0 +1,49 @@ +#ifndef _broker_RecoverableQueue_h +#define _broker_RecoverableQueue_h + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "RecoverableMessage.h" +#include <boost/shared_ptr.hpp> + +namespace qpid { +namespace broker { + +/** + * The interface through which messages are added back to queues on + * recovery. + */ +class RecoverableQueue +{ +public: + typedef boost::shared_ptr<RecoverableQueue> shared_ptr; + /** + * Used during recovery to add stored messages back to the queue + */ + virtual void recover(RecoverableMessage::shared_ptr msg) = 0; + virtual ~RecoverableQueue() {}; +}; + +}} + + +#endif diff --git a/cpp/lib/broker/RecoveryManager.h b/cpp/lib/broker/RecoveryManager.h index d4e4cff3fd..700bbdcf80 100644 --- a/cpp/lib/broker/RecoveryManager.h +++ b/cpp/lib/broker/RecoveryManager.h @@ -21,20 +21,20 @@ #ifndef _RecoveryManager_ #define _RecoveryManager_ -#include <ExchangeRegistry.h> -#include <QueueRegistry.h> +#include "RecoverableQueue.h" +#include "RecoverableMessage.h" +#include "framing/Buffer.h" namespace qpid { namespace broker { class RecoveryManager{ - QueueRegistry& queues; - ExchangeRegistry& exchanges; public: - RecoveryManager(QueueRegistry& queues, ExchangeRegistry& exchanges); - ~RecoveryManager(); - Queue::shared_ptr recoverQueue(const std::string& name); - Exchange::shared_ptr recoverExchange(const std::string& name, const std::string& type); + virtual ~RecoveryManager(){} + virtual void recoverExchange(framing::Buffer& buffer) = 0; + virtual RecoverableQueue::shared_ptr recoverQueue(framing::Buffer& buffer) = 0; + virtual RecoverableMessage::shared_ptr recoverMessage(framing::Buffer& buffer) = 0; + virtual void recoveryComplete() = 0; }; diff --git a/cpp/lib/broker/RecoveryManagerImpl.cpp b/cpp/lib/broker/RecoveryManagerImpl.cpp new file mode 100644 index 0000000000..c14f9c52cc --- /dev/null +++ b/cpp/lib/broker/RecoveryManagerImpl.cpp @@ -0,0 +1,131 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <RecoveryManagerImpl.h> + +#include "BrokerMessage.h" +#include "BrokerMessageMessage.h" +#include "BrokerQueue.h" + +using namespace qpid; +using namespace qpid::broker; +using boost::dynamic_pointer_cast; + + +static const uint8_t BASIC = 1; +static const uint8_t MESSAGE = 2; + +RecoveryManagerImpl::RecoveryManagerImpl(QueueRegistry& _queues, ExchangeRegistry& _exchanges, uint64_t _stagingThreshold) + : queues(_queues), exchanges(_exchanges), stagingThreshold(_stagingThreshold) {} + +RecoveryManagerImpl::~RecoveryManagerImpl() {} + +class RecoverableMessageImpl : public RecoverableMessage +{ + Message::shared_ptr msg; + const uint64_t stagingThreshold; +public: + RecoverableMessageImpl(Message::shared_ptr& _msg, uint64_t _stagingThreshold) + : msg(_msg), stagingThreshold(_stagingThreshold) {} + ~RecoverableMessageImpl() {}; + bool loadContent(uint64_t available); + void decodeContent(framing::Buffer& buffer); + void recover(Queue::shared_ptr queue); +}; + +class RecoverableQueueImpl : public RecoverableQueue +{ + Queue::shared_ptr queue; +public: + RecoverableQueueImpl(Queue::shared_ptr& _queue) : queue(_queue) {} + ~RecoverableQueueImpl() {}; + void recover(RecoverableMessage::shared_ptr msg); +}; + +void RecoveryManagerImpl::recoverExchange(framing::Buffer&) +{ + //TODO +} + +RecoverableQueue::shared_ptr RecoveryManagerImpl::recoverQueue(framing::Buffer& buffer) +{ + Queue::shared_ptr queue = Queue::decode(queues, buffer); + try { + Exchange::shared_ptr exchange = exchanges.getDefault(); + if (exchange) { + exchange->bind(queue, queue->getName(), 0); + } + } catch (ChannelException& e) { + //assume no default exchange has been declared + } + return RecoverableQueue::shared_ptr(new RecoverableQueueImpl(queue)); +} + +RecoverableMessage::shared_ptr RecoveryManagerImpl::recoverMessage(framing::Buffer& buffer) +{ + buffer.record(); + //peek at type: + Message::shared_ptr message(decodeMessageType(buffer) == MESSAGE ? + ((Message*) new MessageMessage()) : + ((Message*) new BasicMessage())); + buffer.restore(); + message->decodeHeader(buffer); + return RecoverableMessage::shared_ptr(new RecoverableMessageImpl(message, stagingThreshold)); +} + +void RecoveryManagerImpl::recoveryComplete() +{ + //TODO (finalise binding setup etc) +} + +uint8_t RecoveryManagerImpl::decodeMessageType(framing::Buffer& buffer) +{ + return buffer.getOctet(); +} + +void RecoveryManagerImpl::encodeMessageType(const Message& msg, framing::Buffer& buffer) +{ + buffer.putOctet(dynamic_cast<const MessageMessage*>(&msg) ? MESSAGE : BASIC); +} + +uint32_t RecoveryManagerImpl::encodedMessageTypeSize() +{ + return 1; +} + +bool RecoverableMessageImpl::loadContent(uint64_t available) +{ + return !stagingThreshold || available < stagingThreshold; +} + +void RecoverableMessageImpl::decodeContent(framing::Buffer& buffer) +{ + msg->decodeContent(buffer); +} + +void RecoverableMessageImpl::recover(Queue::shared_ptr queue) +{ + queue->recover(msg); +} + +void RecoverableQueueImpl::recover(RecoverableMessage::shared_ptr msg) +{ + dynamic_pointer_cast<RecoverableMessageImpl>(msg)->recover(queue); +} diff --git a/cpp/lib/broker/RecoveryManagerImpl.h b/cpp/lib/broker/RecoveryManagerImpl.h new file mode 100644 index 0000000000..c40de7895f --- /dev/null +++ b/cpp/lib/broker/RecoveryManagerImpl.h @@ -0,0 +1,55 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#ifndef _RecoveryManagerImpl_ +#define _RecoveryManagerImpl_ + +#include <list> +#include "ExchangeRegistry.h" +#include "QueueRegistry.h" +#include "RecoveryManager.h" + +namespace qpid { +namespace broker { + + class RecoveryManagerImpl : public RecoveryManager{ + QueueRegistry& queues; + ExchangeRegistry& exchanges; + const uint64_t stagingThreshold; + public: + RecoveryManagerImpl(QueueRegistry& queues, ExchangeRegistry& exchanges, uint64_t stagingThreshold); + ~RecoveryManagerImpl(); + + void recoverExchange(framing::Buffer& buffer); + RecoverableQueue::shared_ptr recoverQueue(framing::Buffer& buffer); + RecoverableMessage::shared_ptr recoverMessage(framing::Buffer& buffer); + void recoveryComplete(); + + static uint8_t decodeMessageType(framing::Buffer& buffer); + static void encodeMessageType(const Message& msg, framing::Buffer& buffer); + static uint32_t encodedMessageTypeSize(); + }; + + +} +} + + +#endif diff --git a/cpp/lib/broker/TransactionalStore.h b/cpp/lib/broker/TransactionalStore.h index 17bca3878a..9347edf0ad 100644 --- a/cpp/lib/broker/TransactionalStore.h +++ b/cpp/lib/broker/TransactionalStore.h @@ -22,25 +22,35 @@ #define _TransactionalStore_ #include <memory> +#include <string> namespace qpid { - namespace broker { - struct InvalidTransactionContextException : public std::exception {}; - - class TransactionContext{ - public: - virtual ~TransactionContext(){} - }; - - class TransactionalStore{ - public: - virtual std::auto_ptr<TransactionContext> begin() = 0; - virtual void commit(TransactionContext*) = 0; - virtual void abort(TransactionContext*) = 0; - - virtual ~TransactionalStore(){} - }; - } +namespace broker { + +struct InvalidTransactionContextException : public std::exception {}; + +class TransactionContext { +public: + virtual ~TransactionContext(){} +}; + +class TPCTransactionContext : public TransactionContext { +public: + virtual ~TPCTransactionContext(){} +}; + +class TransactionalStore { +public: + virtual std::auto_ptr<TransactionContext> begin() = 0; + virtual std::auto_ptr<TPCTransactionContext> begin(const std::string& xid) = 0; + virtual void prepare(TPCTransactionContext& txn) = 0; + virtual void commit(TransactionContext& txn) = 0; + virtual void abort(TransactionContext& txn) = 0; + + virtual ~TransactionalStore(){} +}; + +} } diff --git a/cpp/lib/broker/TxAck.cpp b/cpp/lib/broker/TxAck.cpp index b5211158f3..a2f3283f91 100644 --- a/cpp/lib/broker/TxAck.cpp +++ b/cpp/lib/broker/TxAck.cpp @@ -25,8 +25,8 @@ using std::bind2nd; using std::mem_fun_ref; using namespace qpid::broker; -TxAck::TxAck(AccumulatedAck& _acked, std::list<DeliveryRecord>& _unacked, const std::string* const _xid) : - acked(_acked), unacked(_unacked), xid(_xid){ +TxAck::TxAck(AccumulatedAck& _acked, std::list<DeliveryRecord>& _unacked) : + acked(_acked), unacked(_unacked){ } @@ -35,7 +35,7 @@ bool TxAck::prepare(TransactionContext* ctxt) throw(){ //dequeue all acked messages from their queues for (ack_iterator i = unacked.begin(); i != unacked.end(); i++) { if (i->coveredBy(&acked)) { - i->discard(ctxt, xid); + i->discard(ctxt); } } return true; diff --git a/cpp/lib/broker/TxAck.h b/cpp/lib/broker/TxAck.h index 88c321c445..d023cfae0d 100644 --- a/cpp/lib/broker/TxAck.h +++ b/cpp/lib/broker/TxAck.h @@ -37,7 +37,6 @@ namespace qpid { class TxAck : public TxOp{ AccumulatedAck& acked; std::list<DeliveryRecord>& unacked; - const std::string* const xid; public: /** @@ -45,7 +44,7 @@ namespace qpid { * acks received * @param unacked the record of delivered messages */ - TxAck(AccumulatedAck& acked, std::list<DeliveryRecord>& unacked, const std::string* const xid = 0); + TxAck(AccumulatedAck& acked, std::list<DeliveryRecord>& unacked); virtual bool prepare(TransactionContext* ctxt) throw(); virtual void commit() throw(); virtual void rollback() throw(); diff --git a/cpp/lib/broker/TxBuffer.cpp b/cpp/lib/broker/TxBuffer.cpp index acd3283bb7..e5701c3d46 100644 --- a/cpp/lib/broker/TxBuffer.cpp +++ b/cpp/lib/broker/TxBuffer.cpp @@ -29,11 +29,11 @@ bool TxBuffer::prepare(TransactionalStore* const store) if(store) ctxt = store->begin(); for(op_iterator i = ops.begin(); i < ops.end(); i++){ if(!(*i)->prepare(ctxt.get())){ - if(store) store->abort(ctxt.get()); + if(store) store->abort(*ctxt); return false; } } - if(store) store->commit(ctxt.get()); + if(store) store->commit(*ctxt); return true; } diff --git a/cpp/lib/broker/TxPublish.cpp b/cpp/lib/broker/TxPublish.cpp index 49dd8abd89..57993782d0 100644 --- a/cpp/lib/broker/TxPublish.cpp +++ b/cpp/lib/broker/TxPublish.cpp @@ -22,11 +22,11 @@ using namespace qpid::broker; -TxPublish::TxPublish(Message::shared_ptr _msg, const std::string* const _xid) : msg(_msg), xid(_xid) {} +TxPublish::TxPublish(Message::shared_ptr _msg) : msg(_msg) {} bool TxPublish::prepare(TransactionContext* ctxt) throw(){ try{ - for_each(queues.begin(), queues.end(), Prepare(ctxt, msg, xid)); + for_each(queues.begin(), queues.end(), Prepare(ctxt, msg)); return true; }catch(...){ std::cout << "TxPublish::prepare() - Failed to prepare" << std::endl; @@ -45,11 +45,11 @@ void TxPublish::deliverTo(Queue::shared_ptr& queue){ queues.push_back(queue); } -TxPublish::Prepare::Prepare(TransactionContext* _ctxt, Message::shared_ptr& _msg, const string* const _xid) - : ctxt(_ctxt), msg(_msg), xid(_xid){} +TxPublish::Prepare::Prepare(TransactionContext* _ctxt, Message::shared_ptr& _msg) + : ctxt(_ctxt), msg(_msg){} void TxPublish::Prepare::operator()(Queue::shared_ptr& queue){ - queue->enqueue(ctxt, msg, xid); + queue->enqueue(ctxt, msg); } TxPublish::Commit::Commit(Message::shared_ptr& _msg) : msg(_msg){} diff --git a/cpp/lib/broker/TxPublish.h b/cpp/lib/broker/TxPublish.h index 75f201257e..0c7596086a 100644 --- a/cpp/lib/broker/TxPublish.h +++ b/cpp/lib/broker/TxPublish.h @@ -46,9 +46,8 @@ namespace qpid { class Prepare{ TransactionContext* ctxt; Message::shared_ptr& msg; - const std::string* const xid; public: - Prepare(TransactionContext* ctxt, Message::shared_ptr& msg, const std::string* const xid); + Prepare(TransactionContext* ctxt, Message::shared_ptr& msg); void operator()(Queue::shared_ptr& queue); }; @@ -60,11 +59,10 @@ namespace qpid { }; Message::shared_ptr msg; - const std::string* const xid; std::list<Queue::shared_ptr> queues; public: - TxPublish(Message::shared_ptr msg, const std::string* const xid = 0); + TxPublish(Message::shared_ptr msg); virtual bool prepare(TransactionContext* ctxt) throw(); virtual void commit() throw(); virtual void rollback() throw(); diff --git a/cpp/tests/BrokerChannelTest.cpp b/cpp/tests/BrokerChannelTest.cpp index 61135c898b..66a7138dab 100644 --- a/cpp/tests/BrokerChannelTest.cpp +++ b/cpp/tests/BrokerChannelTest.cpp @@ -68,7 +68,7 @@ class BrokerChannelTest : public CppUnit::TestCase struct MethodCall { const string name; - Message* const msg; + PersistableMessage* msg; const string data;//only needed for appendContent void check(const MethodCall& other) const @@ -92,7 +92,7 @@ class BrokerChannelTest : public CppUnit::TestCase } } - void handle(const string& name, Message* msg, const string& data) + void handle(const string& name, PersistableMessage* msg, const string& data) { MethodCall call = {name, msg, data}; handle(call); @@ -102,25 +102,25 @@ class BrokerChannelTest : public CppUnit::TestCase MockMessageStore() : expectMode(false) {} - void stage(Message* const msg) + void stage(PersistableMessage& msg) { - if(!expectMode) msg->setPersistenceId(1); - MethodCall call = {"stage", msg, ""}; + if(!expectMode) msg.setPersistenceId(1); + MethodCall call = {"stage", &msg, ""}; handle(call); } - void appendContent(Message* msg, const string& data) + void appendContent(PersistableMessage& msg, const string& data) { - MethodCall call = {"appendContent", msg, data}; + MethodCall call = {"appendContent", &msg, data}; handle(call); } // Don't hide overloads. using NullMessageStore::destroy; - void destroy(Message* msg) + void destroy(PersistableMessage& msg) { - MethodCall call = {"destroy", msg, ""}; + MethodCall call = {"destroy", &msg, ""}; handle(call); } @@ -249,11 +249,11 @@ class BrokerChannelTest : public CppUnit::TestCase MockChannel::basicGetBody()); store.expect(); - store.stage(msg); + store.stage(*msg); for (int i = 0; i < 3; i++) { - store.appendContent(msg, data[i]); + store.appendContent(*msg, data[i]); } - store.destroy(msg); + store.destroy(*msg); store.test(); Exchange::shared_ptr exchange = @@ -304,8 +304,8 @@ class BrokerChannelTest : public CppUnit::TestCase policy.update(settings); store.expect(); - store.stage(msg3.get()); - store.destroy(msg3.get()); + store.stage(*msg3); + store.destroy(*msg3); store.test(); Queue::shared_ptr queue(new Queue("my_queue", false, &store, 0)); diff --git a/cpp/tests/LazyLoadedContentTest.cpp b/cpp/tests/LazyLoadedContentTest.cpp index 5a4dd34b53..4cd8e9b307 100644 --- a/cpp/tests/LazyLoadedContentTest.cpp +++ b/cpp/tests/LazyLoadedContentTest.cpp @@ -50,7 +50,7 @@ class LazyLoadedContentTest : public CppUnit::TestCase public: TestMessageStore(const string& _content) : content(_content) {} - void loadContent(Message* const, string& data, uint64_t offset, uint32_t length) + void loadContent(PersistableMessage&, string& data, uint64_t offset, uint32_t length) { if (offset + length <= content.size()) { data = content.substr(offset, length); diff --git a/cpp/tests/MessageBuilderTest.cpp b/cpp/tests/MessageBuilderTest.cpp index d3b3902e97..68e5abf60e 100644 --- a/cpp/tests/MessageBuilderTest.cpp +++ b/cpp/tests/MessageBuilderTest.cpp @@ -51,33 +51,32 @@ class MessageBuilderTest : public CppUnit::TestCase public: - void stage(Message* const msg) + void stage(PersistableMessage& msg) { - if (msg->getPersistenceId() == 0) { - header = new Buffer(msg->encodedHeaderSize()); - msg->encodeHeader(*header); + if (msg.getPersistenceId() == 0) { + header = new Buffer(msg.encodedSize()); + msg.encode(*header); content = new Buffer(contentBufferSize); - msg->setPersistenceId(1); + msg.setPersistenceId(1); } else { throw qpid::Exception("Message already staged!"); } } - void appendContent(Message* msg, const string& data) + void appendContent(PersistableMessage& msg, const string& data) { - if (msg) { + if (msg.getPersistenceId() == 1) { content->putRawData(data); } else { throw qpid::Exception("Invalid message id!"); } } - // Don't hide overloads. using NullMessageStore::destroy; - void destroy(BasicMessage* msg) + void destroy(PersistableMessage& msg) { - CPPUNIT_ASSERT(msg->getPersistenceId()); + CPPUNIT_ASSERT(msg.getPersistenceId()); } BasicMessage::shared_ptr getRestoredMessage() diff --git a/cpp/tests/TxAckTest.cpp b/cpp/tests/TxAckTest.cpp index c189533ea9..eb4ada4ac8 100644 --- a/cpp/tests/TxAckTest.cpp +++ b/cpp/tests/TxAckTest.cpp @@ -38,11 +38,11 @@ class TxAckTest : public CppUnit::TestCase class TestMessageStore : public NullMessageStore { public: - vector< std::pair<Message*, const string*> > dequeued; + vector<PersistableMessage*> dequeued; - void dequeue(TransactionContext*, Message* const msg, const Queue& /*queue*/, const string * const xid) + void dequeue(TransactionContext*, PersistableMessage& msg, const PersistableQueue& /*queue*/) { - dequeued.push_back(std::pair<Message*, const string*>(msg, xid)); + dequeued.push_back(&msg); } TestMessageStore() : NullMessageStore() {} @@ -50,7 +50,6 @@ class TxAckTest : public CppUnit::TestCase }; CPPUNIT_TEST_SUITE(TxAckTest); - CPPUNIT_TEST(testPrepare2pc); CPPUNIT_TEST(testPrepare); CPPUNIT_TEST(testCommit); CPPUNIT_TEST_SUITE_END(); @@ -62,12 +61,11 @@ class TxAckTest : public CppUnit::TestCase vector<Message::shared_ptr> messages; list<DeliveryRecord> deliveries; TxAck op; - std::string xid; public: - TxAckTest() : acked(0), queue(new Queue("my_queue", false, &store, 0)), op(acked, deliveries, &xid) + TxAckTest() : acked(0), queue(new Queue("my_queue", false, &store, 0)), op(acked, deliveries) { for(int i = 0; i < 10; i++){ Message::shared_ptr msg( @@ -93,17 +91,7 @@ public: CPPUNIT_ASSERT_EQUAL((size_t) 10, deliveries.size()); int dequeued[] = {0, 1, 2, 3, 4, 6, 8}; for (int i = 0; i < 7; i++) { - CPPUNIT_ASSERT_EQUAL(messages[dequeued[i]].get(), store.dequeued[i].first); - } - } - - void testPrepare2pc() - { - xid = "abcdefg"; - testPrepare(); - const string expected(xid); - for (int i = 0; i < 7; i++) { - CPPUNIT_ASSERT_EQUAL(expected, *store.dequeued[i].second); + CPPUNIT_ASSERT_EQUAL((PersistableMessage*) messages[dequeued[i]].get(), store.dequeued[i]); } } diff --git a/cpp/tests/TxBufferTest.cpp b/cpp/tests/TxBufferTest.cpp index 8a9b233eb9..bd8ae3796b 100644 --- a/cpp/tests/TxBufferTest.cpp +++ b/cpp/tests/TxBufferTest.cpp @@ -102,22 +102,25 @@ class TxBufferTest : public CppUnit::TestCase public: MockTransactionalStore() : state(OPEN){} + std::auto_ptr<TPCTransactionContext> begin(const std::string&){ + throw "Operation not supported"; + } + void prepare(TPCTransactionContext&){ + throw "Operation not supported"; + } + std::auto_ptr<TransactionContext> begin(){ actual.push_back(BEGIN); std::auto_ptr<TransactionContext> txn(new TestTransactionContext(this)); return txn; } - void commit(TransactionContext* ctxt){ + void commit(TransactionContext& ctxt){ actual.push_back(COMMIT); - TestTransactionContext* txn(dynamic_cast<TestTransactionContext*>(ctxt)); - CPPUNIT_ASSERT(txn); - txn->commit(); + dynamic_cast<TestTransactionContext&>(ctxt).commit(); } - void abort(TransactionContext* ctxt){ + void abort(TransactionContext& ctxt){ actual.push_back(ABORT); - TestTransactionContext* txn(dynamic_cast<TestTransactionContext*>(ctxt)); - CPPUNIT_ASSERT(txn); - txn->abort(); + dynamic_cast<TestTransactionContext&>(ctxt).abort(); } MockTransactionalStore& expectBegin(){ expected.push_back(BEGIN); diff --git a/cpp/tests/TxPublishTest.cpp b/cpp/tests/TxPublishTest.cpp index d9d5607c06..8ce0da6508 100644 --- a/cpp/tests/TxPublishTest.cpp +++ b/cpp/tests/TxPublishTest.cpp @@ -35,22 +35,16 @@ using namespace qpid::framing; class TxPublishTest : public CppUnit::TestCase { - struct Triple - { - string first; - Message* second; - const string* third; - }; + typedef std::pair<string, PersistableMessage*> msg_queue_pair; class TestMessageStore : public NullMessageStore { public: - vector<Triple> enqueued; + vector<msg_queue_pair> enqueued; - void enqueue(TransactionContext*, Message* const msg, const Queue& queue, const string * const xid) + void enqueue(TransactionContext*, PersistableMessage& msg, const PersistableQueue& queue) { - Triple args = {queue.getName(), msg, xid}; - enqueued.push_back(args); + enqueued.push_back(msg_queue_pair(queue.getName(), &msg)); } //dont care about any of the other methods: @@ -60,7 +54,6 @@ class TxPublishTest : public CppUnit::TestCase CPPUNIT_TEST_SUITE(TxPublishTest); CPPUNIT_TEST(testPrepare); - CPPUNIT_TEST(testPrepare2pc); CPPUNIT_TEST(testCommit); CPPUNIT_TEST_SUITE_END(); @@ -70,7 +63,6 @@ class TxPublishTest : public CppUnit::TestCase Queue::shared_ptr queue2; Message::shared_ptr const msg; TxPublish op; - string xid; public: @@ -79,7 +71,7 @@ public: queue2(new Queue("queue2", false, &store, 0)), msg(new BasicMessage(0, "exchange", "routing_key", false, false, MockChannel::basicGetBody())), - op(msg, &xid) + op(msg) { msg->setHeader(AMQHeaderBody::shared_ptr(new AMQHeaderBody(BASIC))); msg->getHeaderProperties()->setDeliveryMode(PERSISTENT); @@ -93,18 +85,9 @@ public: op.prepare(0); CPPUNIT_ASSERT_EQUAL((size_t) 2, store.enqueued.size()); CPPUNIT_ASSERT_EQUAL(string("queue1"), store.enqueued[0].first); - CPPUNIT_ASSERT_EQUAL(msg.get(), store.enqueued[0].second); + CPPUNIT_ASSERT_EQUAL((PersistableMessage*) msg.get(), store.enqueued[0].second); CPPUNIT_ASSERT_EQUAL(string("queue2"), store.enqueued[1].first); - CPPUNIT_ASSERT_EQUAL(msg.get(), store.enqueued[1].second); - } - - void testPrepare2pc() - { - xid = "abcde"; - const string expected(xid); - testPrepare(); - CPPUNIT_ASSERT_EQUAL(expected, *store.enqueued[0].third); - CPPUNIT_ASSERT_EQUAL(expected, *store.enqueued[1].third); + CPPUNIT_ASSERT_EQUAL((PersistableMessage*) msg.get(), store.enqueued[1].second); } void testCommit() |