summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker
diff options
context:
space:
mode:
authorCarl C. Trieloff <cctrieloff@apache.org>2008-10-13 18:07:07 +0000
committerCarl C. Trieloff <cctrieloff@apache.org>2008-10-13 18:07:07 +0000
commit604b624676c20770d6b6b30be3b2fb357892982e (patch)
treed9cf003df5760f7815f94e9181df5429b1bbeaf4 /cpp/src/qpid/broker
parent02b136ad60558724e77cb1d72d9ca06679c7df87 (diff)
downloadqpid-python-604b624676c20770d6b6b30be3b2fb357892982e.tar.gz
QPID-1351
-Support for sequencing messages through an exchange -Related changes - Bug fix for ptr saftey in Headers & FanOut exchange - Added support for int64 and uint64 in fieldvalue / fieldtable - Added tests for fieldtable - Added tests for sequencing message feature. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@704192 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker')
-rw-r--r--cpp/src/qpid/broker/DirectExchange.cpp1
-rw-r--r--cpp/src/qpid/broker/Exchange.cpp26
-rw-r--r--cpp/src/qpid/broker/Exchange.h7
-rw-r--r--cpp/src/qpid/broker/FanOutExchange.cpp13
-rw-r--r--cpp/src/qpid/broker/HeadersExchange.cpp19
-rw-r--r--cpp/src/qpid/broker/TopicExchange.cpp1
6 files changed, 53 insertions, 14 deletions
diff --git a/cpp/src/qpid/broker/DirectExchange.cpp b/cpp/src/qpid/broker/DirectExchange.cpp
index b4b892feeb..fe2e9db3d5 100644
--- a/cpp/src/qpid/broker/DirectExchange.cpp
+++ b/cpp/src/qpid/broker/DirectExchange.cpp
@@ -70,6 +70,7 @@ bool DirectExchange::unbind(Queue::shared_ptr queue, const string& routingKey, c
}
void DirectExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* /*args*/){
+ preRoute(msg);
Queues::ConstPtr p;
{
Mutex::ScopedLock l(lock);
diff --git a/cpp/src/qpid/broker/Exchange.cpp b/cpp/src/qpid/broker/Exchange.cpp
index 4824fa5742..deb8df3fc4 100644
--- a/cpp/src/qpid/broker/Exchange.cpp
+++ b/cpp/src/qpid/broker/Exchange.cpp
@@ -22,8 +22,11 @@
#include "Exchange.h"
#include "ExchangeRegistry.h"
#include "qpid/agent/ManagementAgent.h"
+#include "qpid/log/Statement.h"
+#include "qpid/framing/MessageProperties.h"
using namespace qpid::broker;
+using namespace qpid::framing;
using qpid::framing::Buffer;
using qpid::framing::FieldTable;
using qpid::management::ManagementAgent;
@@ -32,8 +35,15 @@ using qpid::management::Manageable;
using qpid::management::Args;
namespace _qmf = qmf::org::apache::qpid::broker;
+namespace
+{
+const std::string qpidMsgSequence("qpid.msg_sequence");
+}
+
+
Exchange::Exchange (const string& _name, Manageable* parent) :
- name(_name), durable(false), persistenceId(0), mgmtExchange(0)
+ name(_name), durable(false), persistenceId(0), sequence(false),
+ sequenceNo(0), mgmtExchange(0)
{
if (parent != 0)
{
@@ -48,7 +58,8 @@ Exchange::Exchange (const string& _name, Manageable* parent) :
Exchange::Exchange(const string& _name, bool _durable, const qpid::framing::FieldTable& _args,
Manageable* parent)
- : name(_name), durable(_durable), args(_args), alternateUsers(0), persistenceId(0), mgmtExchange(0)
+ : name(_name), durable(_durable), args(_args), alternateUsers(0), persistenceId(0),
+ sequence(false), sequenceNo(0), mgmtExchange(0)
{
if (parent != 0)
{
@@ -66,6 +77,10 @@ Exchange::Exchange(const string& _name, bool _durable, const qpid::framing::Fiel
}
}
}
+
+ sequence = _args.get(qpidMsgSequence);
+ if (sequence) QPID_LOG(debug, "Configured exchange "+ _name +" with Msg sequencing");
+
}
Exchange::~Exchange ()
@@ -74,6 +89,13 @@ Exchange::~Exchange ()
mgmtExchange->resourceDestroy ();
}
+void Exchange::preRoute(Deliverable& msg){
+ if (sequence){
+ sys::Mutex::ScopedLock lock(sequenceLock);
+ msg.getMessage().getProperties<MessageProperties>()->getApplicationHeaders().setInt64(qpidMsgSequence,++sequenceNo);
+ }
+}
+
void Exchange::setPersistenceId(uint64_t id) const
{
if (mgmtExchange != 0 && persistenceId == 0)
diff --git a/cpp/src/qpid/broker/Exchange.h b/cpp/src/qpid/broker/Exchange.h
index 9901fbb18b..54d16a47bd 100644
--- a/cpp/src/qpid/broker/Exchange.h
+++ b/cpp/src/qpid/broker/Exchange.h
@@ -28,6 +28,7 @@
#include "MessageStore.h"
#include "PersistableExchange.h"
#include "qpid/framing/FieldTable.h"
+#include "qpid/sys/Mutex.h"
#include "qpid/management/Manageable.h"
#include "qmf/org/apache/qpid/broker/Exchange.h"
#include "qmf/org/apache/qpid/broker/Binding.h"
@@ -45,8 +46,14 @@ namespace qpid {
boost::shared_ptr<Exchange> alternate;
uint32_t alternateUsers;
mutable uint64_t persistenceId;
+ bool sequence;
+ mutable qpid::sys::Mutex sequenceLock;
+ uint64_t sequenceNo;
protected:
+
+ void preRoute(Deliverable& msg);
+
struct Binding : public management::Manageable {
typedef boost::shared_ptr<Binding> shared_ptr;
typedef std::vector<Binding::shared_ptr> vector;
diff --git a/cpp/src/qpid/broker/FanOutExchange.cpp b/cpp/src/qpid/broker/FanOutExchange.cpp
index 1e03d34b25..42fe537c6a 100644
--- a/cpp/src/qpid/broker/FanOutExchange.cpp
+++ b/cpp/src/qpid/broker/FanOutExchange.cpp
@@ -69,15 +69,18 @@ bool FanOutExchange::unbind(Queue::shared_ptr queue, const string& /*key*/, cons
}
void FanOutExchange::route(Deliverable& msg, const string& /*routingKey*/, const FieldTable* /*args*/){
+ preRoute(msg);
uint32_t count(0);
BindingsArray::ConstPtr p = bindings.snapshot();
- for(std::vector<Binding::shared_ptr>::const_iterator i = p->begin(); i != p->end(); ++i, count++){
- msg.deliverTo((*i)->queue);
- if ((*i)->mgmtBinding != 0)
- (*i)->mgmtBinding->inc_msgMatched ();
+ if (p.get()){
+ for(std::vector<Binding::shared_ptr>::const_iterator i = p->begin(); i != p->end(); ++i, count++){
+ msg.deliverTo((*i)->queue);
+ if ((*i)->mgmtBinding != 0)
+ (*i)->mgmtBinding->inc_msgMatched ();
+ }
}
-
+
if (mgmtExchange != 0)
{
mgmtExchange->inc_msgReceives ();
diff --git a/cpp/src/qpid/broker/HeadersExchange.cpp b/cpp/src/qpid/broker/HeadersExchange.cpp
index 5f89202c75..8e62803cc6 100644
--- a/cpp/src/qpid/broker/HeadersExchange.cpp
+++ b/cpp/src/qpid/broker/HeadersExchange.cpp
@@ -105,14 +105,17 @@ bool HeadersExchange::unbind(Queue::shared_ptr queue, const string& bindingKey,
void HeadersExchange::route(Deliverable& msg, const string& /*routingKey*/, const FieldTable* args){
if (!args) return;//can't match if there were no headers passed in
+ preRoute(msg);
uint32_t count(0);
Bindings::ConstPtr p = bindings.snapshot();
- for (std::vector<Binding::shared_ptr>::const_iterator i = p->begin(); i != p->end(); ++i, count++) {
- if (match((*i)->args, *args)) msg.deliverTo((*i)->queue);
- if ((*i)->mgmtBinding != 0)
- (*i)->mgmtBinding->inc_msgMatched ();
+ if (p.get()){
+ for (std::vector<Binding::shared_ptr>::const_iterator i = p->begin(); i != p->end(); ++i, count++) {
+ if (match((*i)->args, *args)) msg.deliverTo((*i)->queue);
+ if ((*i)->mgmtBinding != 0)
+ (*i)->mgmtBinding->inc_msgMatched ();
+ }
}
if (mgmtExchange != 0)
@@ -136,9 +139,11 @@ void HeadersExchange::route(Deliverable& msg, const string& /*routingKey*/, cons
bool HeadersExchange::isBound(Queue::shared_ptr queue, const string* const, const FieldTable* const args)
{
Bindings::ConstPtr p = bindings.snapshot();
- for (std::vector<Binding::shared_ptr>::const_iterator i = p->begin(); i != p->end(); ++i) {
- if ( (!args || equal((*i)->args, *args)) && (!queue || (*i)->queue == queue)) {
- return true;
+ if (p.get()){
+ for (std::vector<Binding::shared_ptr>::const_iterator i = p->begin(); i != p->end(); ++i) {
+ if ( (!args || equal((*i)->args, *args)) && (!queue || (*i)->queue == queue)) {
+ return true;
+ }
}
}
return false;
diff --git a/cpp/src/qpid/broker/TopicExchange.cpp b/cpp/src/qpid/broker/TopicExchange.cpp
index 787cf9637c..cf4a765266 100644
--- a/cpp/src/qpid/broker/TopicExchange.cpp
+++ b/cpp/src/qpid/broker/TopicExchange.cpp
@@ -180,6 +180,7 @@ bool TopicExchange::isBound(Queue::shared_ptr queue, TopicPattern& pattern)
void TopicExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* /*args*/){
RWlock::ScopedRlock l(lock);
+ preRoute(msg);
uint32_t count(0);
Tokens tokens(routingKey);