summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2007-11-08 14:05:38 +0000
committerGordon Sim <gsim@apache.org>2007-11-08 14:05:38 +0000
commit604a216da725da2770978756de763caa5c66e5c7 (patch)
tree194340aa45f23efb6fc037d11f61ff63d47ffc7f /cpp/src
parentcb5d95fba16cbe2a0e03eae6ba6c9ce776b88327 (diff)
downloadqpid-python-604a216da725da2770978756de763caa5c66e5c7.tar.gz
Make standard exchanges durable
Ensure flags are set correctly for recovered messages with no content git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@593159 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/qpid/broker/Broker.cpp18
-rw-r--r--cpp/src/qpid/broker/Broker.h1
-rw-r--r--cpp/src/qpid/broker/ExchangeRegistry.h2
-rw-r--r--cpp/src/qpid/broker/Message.cpp24
-rw-r--r--cpp/src/qpid/broker/Message.h1
-rw-r--r--cpp/src/qpid/broker/NullMessageStore.cpp21
-rw-r--r--cpp/src/qpid/broker/NullMessageStore.h4
-rw-r--r--cpp/src/qpid/framing/frame_functors.h6
8 files changed, 50 insertions, 27 deletions
diff --git a/cpp/src/qpid/broker/Broker.cpp b/cpp/src/qpid/broker/Broker.cpp
index 5896ce7947..a02ec432f6 100644
--- a/cpp/src/qpid/broker/Broker.cpp
+++ b/cpp/src/qpid/broker/Broker.cpp
@@ -128,10 +128,6 @@ Broker::Broker(const Broker::Options& conf) :
}
exchanges.declare(empty, DirectExchange::typeName); // Default exchange.
- exchanges.declare(amq_direct, DirectExchange::typeName);
- exchanges.declare(amq_topic, TopicExchange::typeName);
- exchanges.declare(amq_fanout, FanOutExchange::typeName);
- exchanges.declare(amq_match, HeadersExchange::typeName);
if(conf.enableMgmt) {
QPID_LOG(info, "Management enabled");
@@ -153,6 +149,11 @@ Broker::Broker(const Broker::Options& conf) :
store->recover(recoverer);
}
}
+ //ensure standard exchanges exist (done after recovery from store)
+ declareStandardExchange(amq_direct, DirectExchange::typeName);
+ declareStandardExchange(amq_topic, TopicExchange::typeName);
+ declareStandardExchange(amq_fanout, FanOutExchange::typeName);
+ declareStandardExchange(amq_match, HeadersExchange::typeName);
// Initialize plugins
const Plugin::Plugins& plugins=Plugin::getPlugins();
@@ -162,6 +163,15 @@ Broker::Broker(const Broker::Options& conf) :
(*i)->initialize(*this);
}
+void Broker::declareStandardExchange(const std::string& name, const std::string& type)
+{
+ bool storeEnabled = store.get();
+ std::pair<Exchange::shared_ptr, bool> status = exchanges.declare(name, type, storeEnabled);
+ if (status.second && storeEnabled) {
+ store->create(*status.first);
+ }
+}
+
shared_ptr<Broker> Broker::create(int16_t port)
{
diff --git a/cpp/src/qpid/broker/Broker.h b/cpp/src/qpid/broker/Broker.h
index 9ae6a5c8af..6f4cc97936 100644
--- a/cpp/src/qpid/broker/Broker.h
+++ b/cpp/src/qpid/broker/Broker.h
@@ -136,6 +136,7 @@ class Broker : public sys::Runnable, public Plugin::Target
ManagementObjectVhost::shared_ptr mgmtVhostObject;
static MessageStore* createStore(const Options& config);
+ void declareStandardExchange(const std::string& name, const std::string& type);
};
}}
diff --git a/cpp/src/qpid/broker/ExchangeRegistry.h b/cpp/src/qpid/broker/ExchangeRegistry.h
index f7f29ff0ea..dd75940c4c 100644
--- a/cpp/src/qpid/broker/ExchangeRegistry.h
+++ b/cpp/src/qpid/broker/ExchangeRegistry.h
@@ -40,7 +40,7 @@ namespace broker {
std::pair<Exchange::shared_ptr, bool> declare(const std::string& name, const std::string& type)
throw(UnknownExchangeTypeException);
std::pair<Exchange::shared_ptr, bool> declare(const std::string& name, const std::string& type,
- bool durable, const qpid::framing::FieldTable& args)
+ bool durable, const qpid::framing::FieldTable& args = framing::FieldTable())
throw(UnknownExchangeTypeException);
void destroy(const std::string& name);
Exchange::shared_ptr get(const std::string& name);
diff --git a/cpp/src/qpid/broker/Message.cpp b/cpp/src/qpid/broker/Message.cpp
index 78d6cd3891..6e3e6a55f7 100644
--- a/cpp/src/qpid/broker/Message.cpp
+++ b/cpp/src/qpid/broker/Message.cpp
@@ -36,7 +36,7 @@ using std::string;
TransferAdapter Message::TRANSFER;
PublishAdapter Message::PUBLISH;
-Message::Message(const SequenceNumber& id) : frames(id), persistenceId(0), redelivered(false), publisher(0), adapter(0) {}
+Message::Message(const SequenceNumber& id) : frames(id), persistenceId(0), redelivered(false), loaded(false), publisher(0), adapter(0) {}
std::string Message::getRoutingKey() const
{
@@ -121,12 +121,20 @@ void Message::decodeHeader(framing::Buffer& buffer)
void Message::decodeContent(framing::Buffer& buffer)
{
- //get the data as a string and set that as the content
- //body on a frame then add that frame to the frameset
- AMQFrame frame;
- frame.setBody(AMQContentBody());
- frame.castBody<AMQContentBody>()->decode(buffer, buffer.available());
- frames.append(frame);
+ if (buffer.available()) {
+ //get the data as a string and set that as the content
+ //body on a frame then add that frame to the frameset
+ AMQFrame frame;
+ frame.setBody(AMQContentBody());
+ frame.castBody<AMQContentBody>()->decode(buffer, buffer.available());
+ frames.append(frame);
+ } else {
+ //adjust header flags
+ MarkLastSegment f;
+ frames.map_if(f, TypeFilter(HEADER_BODY));
+ }
+ //mark content loaded
+ loaded = true;
}
void Message::releaseContent(MessageStore* _store)
@@ -205,5 +213,5 @@ uint64_t Message::contentSize() const
bool Message::isContentLoaded() const
{
- return contentSize() > 0;
+ return loaded;
}
diff --git a/cpp/src/qpid/broker/Message.h b/cpp/src/qpid/broker/Message.h
index f9a3596a98..d043d50ad0 100644
--- a/cpp/src/qpid/broker/Message.h
+++ b/cpp/src/qpid/broker/Message.h
@@ -124,6 +124,7 @@ public:
mutable boost::shared_ptr<Exchange> exchange;
mutable uint64_t persistenceId;
bool redelivered;
+ bool loaded;
ConnectionToken* publisher;
mutable MessageAdapter* adapter;
diff --git a/cpp/src/qpid/broker/NullMessageStore.cpp b/cpp/src/qpid/broker/NullMessageStore.cpp
index 2190ff4086..2b7e8ff32d 100644
--- a/cpp/src/qpid/broker/NullMessageStore.cpp
+++ b/cpp/src/qpid/broker/NullMessageStore.cpp
@@ -53,18 +53,18 @@ bool NullMessageStore::init(const std::string& /*dir*/, const bool /*async*/, co
void NullMessageStore::create(PersistableQueue& queue)
{
- QPID_LOG(info, "Can't create durable queue '" << queue.getName() << "'. Persistence not enabled.");
+ QPID_LOG(info, "Queue '" << queue.getName()
+ << "' will not be durable. Persistence not enabled.");
}
-void NullMessageStore::destroy(PersistableQueue& queue)
+void NullMessageStore::destroy(PersistableQueue&)
{
- QPID_LOG(info, "Can't destroy durable queue '" << queue.getName() << "'. Persistence not enabled.");
}
void NullMessageStore::create(const PersistableExchange& exchange)
{
- QPID_LOG(info, "Can't create durable exchange '"
- << exchange.getName() << "'. Persistence not enabled.");
+ QPID_LOG(info, "Exchange'" << exchange.getName()
+ << "' will not be durable. Persistence not enabled.");
}
void NullMessageStore::destroy(const PersistableExchange& )
@@ -86,12 +86,11 @@ void NullMessageStore::stage(PersistableMessage&)
void NullMessageStore::destroy(PersistableMessage&)
{
- QPID_LOG(info, "No need to destroy staged message. Persistence not enabled.");
}
void NullMessageStore::appendContent(const PersistableMessage&, const string&)
{
- QPID_LOG(info, "Can't load content. Persistence not enabled.");
+ QPID_LOG(info, "Can't append content. Persistence not enabled.");
}
void NullMessageStore::loadContent(const PersistableMessage&, string&, uint64_t, uint32_t)
@@ -102,18 +101,16 @@ void NullMessageStore::loadContent(const PersistableMessage&, string&, uint64_t,
void NullMessageStore::enqueue(TransactionContext*, PersistableMessage& msg, const PersistableQueue& queue)
{
msg.enqueueComplete();
- QPID_LOG(info, "Can't enqueue message onto '" << queue.getName() << "'. Persistence not enabled.");
+ QPID_LOG(info, "Message is not durably recorded on '" << queue.getName() << "'. Persistence not enabled.");
}
-void NullMessageStore::dequeue(TransactionContext*, PersistableMessage& msg, const PersistableQueue& queue)
+void NullMessageStore::dequeue(TransactionContext*, PersistableMessage& msg, const PersistableQueue&)
{
msg.dequeueComplete();
- QPID_LOG(info, "Can't dequeue message from '" << queue.getName() << "'. Persistence not enabled.");
}
-void NullMessageStore::flush(const qpid::broker::PersistableQueue& queue)
+void NullMessageStore::flush(const qpid::broker::PersistableQueue&)
{
- QPID_LOG(info, "Can't flush. Persistence not enabled queue-" << queue.getName());
}
u_int32_t NullMessageStore::outstandingQueueAIO(const PersistableQueue& )
diff --git a/cpp/src/qpid/broker/NullMessageStore.h b/cpp/src/qpid/broker/NullMessageStore.h
index 96aa286b82..b58bd20ab3 100644
--- a/cpp/src/qpid/broker/NullMessageStore.h
+++ b/cpp/src/qpid/broker/NullMessageStore.h
@@ -38,7 +38,7 @@ class NullMessageStore : public MessageStore
public:
NullMessageStore(bool warn = false);
- virtual bool init(const std::string& dir, const bool async, const bool force);
+ virtual bool init(const std::string& dir, const bool async, const bool force);
virtual std::auto_ptr<TransactionContext> begin();
virtual std::auto_ptr<TPCTransactionContext> begin(const std::string& xid);
virtual void prepare(TPCTransactionContext& txn);
@@ -63,7 +63,7 @@ public:
virtual void enqueue(TransactionContext* ctxt, PersistableMessage& msg, const PersistableQueue& queue);
virtual void dequeue(TransactionContext* ctxt, PersistableMessage& msg, const PersistableQueue& queue);
virtual u_int32_t outstandingQueueAIO(const PersistableQueue& queue);
- virtual void flush(const qpid::broker::PersistableQueue& queue);
+ virtual void flush(const qpid::broker::PersistableQueue& queue);
~NullMessageStore(){}
};
diff --git a/cpp/src/qpid/framing/frame_functors.h b/cpp/src/qpid/framing/frame_functors.h
index d5b1321d72..992d389faf 100644
--- a/cpp/src/qpid/framing/frame_functors.h
+++ b/cpp/src/qpid/framing/frame_functors.h
@@ -111,6 +111,12 @@ public:
}
};
+class MarkLastSegment
+{
+public:
+ void operator()(AMQFrame& f) const { f.setEof(true); }
+};
+
}
}