diff options
| author | Gordon Sim <gsim@apache.org> | 2007-11-08 14:05:38 +0000 |
|---|---|---|
| committer | Gordon Sim <gsim@apache.org> | 2007-11-08 14:05:38 +0000 |
| commit | 604a216da725da2770978756de763caa5c66e5c7 (patch) | |
| tree | 194340aa45f23efb6fc037d11f61ff63d47ffc7f /cpp/src | |
| parent | cb5d95fba16cbe2a0e03eae6ba6c9ce776b88327 (diff) | |
| download | qpid-python-604a216da725da2770978756de763caa5c66e5c7.tar.gz | |
Make standard exchanges durable
Ensure flags are set correctly for recovered messages with no content
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@593159 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
| -rw-r--r-- | cpp/src/qpid/broker/Broker.cpp | 18 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/Broker.h | 1 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/ExchangeRegistry.h | 2 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/Message.cpp | 24 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/Message.h | 1 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/NullMessageStore.cpp | 21 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/NullMessageStore.h | 4 | ||||
| -rw-r--r-- | cpp/src/qpid/framing/frame_functors.h | 6 |
8 files changed, 50 insertions, 27 deletions
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp index 5896ce7947..a02ec432f6 100644 --- a/cpp/src/qpid/broker/Broker.cpp +++ b/cpp/src/qpid/broker/Broker.cpp @@ -128,10 +128,6 @@ Broker::Broker(const Broker::Options& conf) : } exchanges.declare(empty, DirectExchange::typeName); // Default exchange. - exchanges.declare(amq_direct, DirectExchange::typeName); - exchanges.declare(amq_topic, TopicExchange::typeName); - exchanges.declare(amq_fanout, FanOutExchange::typeName); - exchanges.declare(amq_match, HeadersExchange::typeName); if(conf.enableMgmt) { QPID_LOG(info, "Management enabled"); @@ -153,6 +149,11 @@ Broker::Broker(const Broker::Options& conf) : store->recover(recoverer); } } + //ensure standard exchanges exist (done after recovery from store) + declareStandardExchange(amq_direct, DirectExchange::typeName); + declareStandardExchange(amq_topic, TopicExchange::typeName); + declareStandardExchange(amq_fanout, FanOutExchange::typeName); + declareStandardExchange(amq_match, HeadersExchange::typeName); // Initialize plugins const Plugin::Plugins& plugins=Plugin::getPlugins(); @@ -162,6 +163,15 @@ Broker::Broker(const Broker::Options& conf) : (*i)->initialize(*this); } +void Broker::declareStandardExchange(const std::string& name, const std::string& type) +{ + bool storeEnabled = store.get(); + std::pair<Exchange::shared_ptr, bool> status = exchanges.declare(name, type, storeEnabled); + if (status.second && storeEnabled) { + store->create(*status.first); + } +} + shared_ptr<Broker> Broker::create(int16_t port) { diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h index 9ae6a5c8af..6f4cc97936 100644 --- a/cpp/src/qpid/broker/Broker.h +++ b/cpp/src/qpid/broker/Broker.h @@ -136,6 +136,7 @@ class Broker : public sys::Runnable, public Plugin::Target ManagementObjectVhost::shared_ptr mgmtVhostObject; static MessageStore* createStore(const Options& config); + void declareStandardExchange(const std::string& name, const std::string& type); }; }} diff --git a/cpp/src/qpid/broker/ExchangeRegistry.h b/cpp/src/qpid/broker/ExchangeRegistry.h index f7f29ff0ea..dd75940c4c 100644 --- a/cpp/src/qpid/broker/ExchangeRegistry.h +++ b/cpp/src/qpid/broker/ExchangeRegistry.h @@ -40,7 +40,7 @@ namespace broker { std::pair<Exchange::shared_ptr, bool> declare(const std::string& name, const std::string& type) throw(UnknownExchangeTypeException); std::pair<Exchange::shared_ptr, bool> declare(const std::string& name, const std::string& type, - bool durable, const qpid::framing::FieldTable& args) + bool durable, const qpid::framing::FieldTable& args = framing::FieldTable()) throw(UnknownExchangeTypeException); void destroy(const std::string& name); Exchange::shared_ptr get(const std::string& name); diff --git a/cpp/src/qpid/broker/Message.cpp b/cpp/src/qpid/broker/Message.cpp index 78d6cd3891..6e3e6a55f7 100644 --- a/cpp/src/qpid/broker/Message.cpp +++ b/cpp/src/qpid/broker/Message.cpp @@ -36,7 +36,7 @@ using std::string; TransferAdapter Message::TRANSFER; PublishAdapter Message::PUBLISH; -Message::Message(const SequenceNumber& id) : frames(id), persistenceId(0), redelivered(false), publisher(0), adapter(0) {} +Message::Message(const SequenceNumber& id) : frames(id), persistenceId(0), redelivered(false), loaded(false), publisher(0), adapter(0) {} std::string Message::getRoutingKey() const { @@ -121,12 +121,20 @@ void Message::decodeHeader(framing::Buffer& buffer) void Message::decodeContent(framing::Buffer& buffer) { - //get the data as a string and set that as the content - //body on a frame then add that frame to the frameset - AMQFrame frame; - frame.setBody(AMQContentBody()); - frame.castBody<AMQContentBody>()->decode(buffer, buffer.available()); - frames.append(frame); + if (buffer.available()) { + //get the data as a string and set that as the content + //body on a frame then add that frame to the frameset + AMQFrame frame; + frame.setBody(AMQContentBody()); + frame.castBody<AMQContentBody>()->decode(buffer, buffer.available()); + frames.append(frame); + } else { + //adjust header flags + MarkLastSegment f; + frames.map_if(f, TypeFilter(HEADER_BODY)); + } + //mark content loaded + loaded = true; } void Message::releaseContent(MessageStore* _store) @@ -205,5 +213,5 @@ uint64_t Message::contentSize() const bool Message::isContentLoaded() const { - return contentSize() > 0; + return loaded; } diff --git a/cpp/src/qpid/broker/Message.h b/cpp/src/qpid/broker/Message.h index f9a3596a98..d043d50ad0 100644 --- a/cpp/src/qpid/broker/Message.h +++ b/cpp/src/qpid/broker/Message.h @@ -124,6 +124,7 @@ public: mutable boost::shared_ptr<Exchange> exchange; mutable uint64_t persistenceId; bool redelivered; + bool loaded; ConnectionToken* publisher; mutable MessageAdapter* adapter; diff --git a/cpp/src/qpid/broker/NullMessageStore.cpp b/cpp/src/qpid/broker/NullMessageStore.cpp index 2190ff4086..2b7e8ff32d 100644 --- a/cpp/src/qpid/broker/NullMessageStore.cpp +++ b/cpp/src/qpid/broker/NullMessageStore.cpp @@ -53,18 +53,18 @@ bool NullMessageStore::init(const std::string& /*dir*/, const bool /*async*/, co void NullMessageStore::create(PersistableQueue& queue) { - QPID_LOG(info, "Can't create durable queue '" << queue.getName() << "'. Persistence not enabled."); + QPID_LOG(info, "Queue '" << queue.getName() + << "' will not be durable. Persistence not enabled."); } -void NullMessageStore::destroy(PersistableQueue& queue) +void NullMessageStore::destroy(PersistableQueue&) { - QPID_LOG(info, "Can't destroy durable queue '" << queue.getName() << "'. Persistence not enabled."); } void NullMessageStore::create(const PersistableExchange& exchange) { - QPID_LOG(info, "Can't create durable exchange '" - << exchange.getName() << "'. Persistence not enabled."); + QPID_LOG(info, "Exchange'" << exchange.getName() + << "' will not be durable. Persistence not enabled."); } void NullMessageStore::destroy(const PersistableExchange& ) @@ -86,12 +86,11 @@ void NullMessageStore::stage(PersistableMessage&) void NullMessageStore::destroy(PersistableMessage&) { - QPID_LOG(info, "No need to destroy staged message. Persistence not enabled."); } void NullMessageStore::appendContent(const PersistableMessage&, const string&) { - QPID_LOG(info, "Can't load content. Persistence not enabled."); + QPID_LOG(info, "Can't append content. Persistence not enabled."); } void NullMessageStore::loadContent(const PersistableMessage&, string&, uint64_t, uint32_t) @@ -102,18 +101,16 @@ void NullMessageStore::loadContent(const PersistableMessage&, string&, uint64_t, void NullMessageStore::enqueue(TransactionContext*, PersistableMessage& msg, const PersistableQueue& queue) { msg.enqueueComplete(); - QPID_LOG(info, "Can't enqueue message onto '" << queue.getName() << "'. Persistence not enabled."); + QPID_LOG(info, "Message is not durably recorded on '" << queue.getName() << "'. Persistence not enabled."); } -void NullMessageStore::dequeue(TransactionContext*, PersistableMessage& msg, const PersistableQueue& queue) +void NullMessageStore::dequeue(TransactionContext*, PersistableMessage& msg, const PersistableQueue&) { msg.dequeueComplete(); - QPID_LOG(info, "Can't dequeue message from '" << queue.getName() << "'. Persistence not enabled."); } -void NullMessageStore::flush(const qpid::broker::PersistableQueue& queue) +void NullMessageStore::flush(const qpid::broker::PersistableQueue&) { - QPID_LOG(info, "Can't flush. Persistence not enabled queue-" << queue.getName()); } u_int32_t NullMessageStore::outstandingQueueAIO(const PersistableQueue& ) diff --git a/cpp/src/qpid/broker/NullMessageStore.h b/cpp/src/qpid/broker/NullMessageStore.h index 96aa286b82..b58bd20ab3 100644 --- a/cpp/src/qpid/broker/NullMessageStore.h +++ b/cpp/src/qpid/broker/NullMessageStore.h @@ -38,7 +38,7 @@ class NullMessageStore : public MessageStore public: NullMessageStore(bool warn = false); - virtual bool init(const std::string& dir, const bool async, const bool force); + virtual bool init(const std::string& dir, const bool async, const bool force); virtual std::auto_ptr<TransactionContext> begin(); virtual std::auto_ptr<TPCTransactionContext> begin(const std::string& xid); virtual void prepare(TPCTransactionContext& txn); @@ -63,7 +63,7 @@ public: virtual void enqueue(TransactionContext* ctxt, PersistableMessage& msg, const PersistableQueue& queue); virtual void dequeue(TransactionContext* ctxt, PersistableMessage& msg, const PersistableQueue& queue); virtual u_int32_t outstandingQueueAIO(const PersistableQueue& queue); - virtual void flush(const qpid::broker::PersistableQueue& queue); + virtual void flush(const qpid::broker::PersistableQueue& queue); ~NullMessageStore(){} }; diff --git a/cpp/src/qpid/framing/frame_functors.h b/cpp/src/qpid/framing/frame_functors.h index d5b1321d72..992d389faf 100644 --- a/cpp/src/qpid/framing/frame_functors.h +++ b/cpp/src/qpid/framing/frame_functors.h @@ -111,6 +111,12 @@ public: } }; +class MarkLastSegment +{ +public: + void operator()(AMQFrame& f) const { f.setEof(true); } +}; + } } |
