summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorCarl C. Trieloff <cctrieloff@apache.org>2008-10-13 18:07:07 +0000
committerCarl C. Trieloff <cctrieloff@apache.org>2008-10-13 18:07:07 +0000
commit604b624676c20770d6b6b30be3b2fb357892982e (patch)
treed9cf003df5760f7815f94e9181df5429b1bbeaf4 /cpp/src
parent02b136ad60558724e77cb1d72d9ca06679c7df87 (diff)
downloadqpid-python-604b624676c20770d6b6b30be3b2fb357892982e.tar.gz
QPID-1351
-Support for sequencing messages through an exchange -Related changes - Bug fix for ptr saftey in Headers & FanOut exchange - Added support for int64 and uint64 in fieldvalue / fieldtable - Added tests for fieldtable - Added tests for sequencing message feature. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@704192 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/qpid/broker/DirectExchange.cpp1
-rw-r--r--cpp/src/qpid/broker/Exchange.cpp26
-rw-r--r--cpp/src/qpid/broker/Exchange.h7
-rw-r--r--cpp/src/qpid/broker/FanOutExchange.cpp13
-rw-r--r--cpp/src/qpid/broker/HeadersExchange.cpp19
-rw-r--r--cpp/src/qpid/broker/TopicExchange.cpp1
-rw-r--r--cpp/src/qpid/framing/FieldTable.cpp16
-rw-r--r--cpp/src/qpid/framing/FieldTable.h6
-rw-r--r--cpp/src/qpid/framing/FieldValue.cpp14
-rw-r--r--cpp/src/qpid/framing/FieldValue.h16
-rw-r--r--cpp/src/tests/ExchangeTest.cpp58
-rw-r--r--cpp/src/tests/FieldTable.cpp13
12 files changed, 172 insertions, 18 deletions
diff --git a/cpp/src/qpid/broker/DirectExchange.cpp b/cpp/src/qpid/broker/DirectExchange.cpp
index b4b892feeb..fe2e9db3d5 100644
--- a/cpp/src/qpid/broker/DirectExchange.cpp
+++ b/cpp/src/qpid/broker/DirectExchange.cpp
@@ -70,6 +70,7 @@ bool DirectExchange::unbind(Queue::shared_ptr queue, const string& routingKey, c
}
void DirectExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* /*args*/){
+ preRoute(msg);
Queues::ConstPtr p;
{
Mutex::ScopedLock l(lock);
diff --git a/cpp/src/qpid/broker/Exchange.cpp b/cpp/src/qpid/broker/Exchange.cpp
index 4824fa5742..deb8df3fc4 100644
--- a/cpp/src/qpid/broker/Exchange.cpp
+++ b/cpp/src/qpid/broker/Exchange.cpp
@@ -22,8 +22,11 @@
#include "Exchange.h"
#include "ExchangeRegistry.h"
#include "qpid/agent/ManagementAgent.h"
+#include "qpid/log/Statement.h"
+#include "qpid/framing/MessageProperties.h"
using namespace qpid::broker;
+using namespace qpid::framing;
using qpid::framing::Buffer;
using qpid::framing::FieldTable;
using qpid::management::ManagementAgent;
@@ -32,8 +35,15 @@ using qpid::management::Manageable;
using qpid::management::Args;
namespace _qmf = qmf::org::apache::qpid::broker;
+namespace
+{
+const std::string qpidMsgSequence("qpid.msg_sequence");
+}
+
+
Exchange::Exchange (const string& _name, Manageable* parent) :
- name(_name), durable(false), persistenceId(0), mgmtExchange(0)
+ name(_name), durable(false), persistenceId(0), sequence(false),
+ sequenceNo(0), mgmtExchange(0)
{
if (parent != 0)
{
@@ -48,7 +58,8 @@ Exchange::Exchange (const string& _name, Manageable* parent) :
Exchange::Exchange(const string& _name, bool _durable, const qpid::framing::FieldTable& _args,
Manageable* parent)
- : name(_name), durable(_durable), args(_args), alternateUsers(0), persistenceId(0), mgmtExchange(0)
+ : name(_name), durable(_durable), args(_args), alternateUsers(0), persistenceId(0),
+ sequence(false), sequenceNo(0), mgmtExchange(0)
{
if (parent != 0)
{
@@ -66,6 +77,10 @@ Exchange::Exchange(const string& _name, bool _durable, const qpid::framing::Fiel
}
}
}
+
+ sequence = _args.get(qpidMsgSequence);
+ if (sequence) QPID_LOG(debug, "Configured exchange "+ _name +" with Msg sequencing");
+
}
Exchange::~Exchange ()
@@ -74,6 +89,13 @@ Exchange::~Exchange ()
mgmtExchange->resourceDestroy ();
}
+void Exchange::preRoute(Deliverable& msg){
+ if (sequence){
+ sys::Mutex::ScopedLock lock(sequenceLock);
+ msg.getMessage().getProperties<MessageProperties>()->getApplicationHeaders().setInt64(qpidMsgSequence,++sequenceNo);
+ }
+}
+
void Exchange::setPersistenceId(uint64_t id) const
{
if (mgmtExchange != 0 && persistenceId == 0)
diff --git a/cpp/src/qpid/broker/Exchange.h b/cpp/src/qpid/broker/Exchange.h
index 9901fbb18b..54d16a47bd 100644
--- a/cpp/src/qpid/broker/Exchange.h
+++ b/cpp/src/qpid/broker/Exchange.h
@@ -28,6 +28,7 @@
#include "MessageStore.h"
#include "PersistableExchange.h"
#include "qpid/framing/FieldTable.h"
+#include "qpid/sys/Mutex.h"
#include "qpid/management/Manageable.h"
#include "qmf/org/apache/qpid/broker/Exchange.h"
#include "qmf/org/apache/qpid/broker/Binding.h"
@@ -45,8 +46,14 @@ namespace qpid {
boost::shared_ptr<Exchange> alternate;
uint32_t alternateUsers;
mutable uint64_t persistenceId;
+ bool sequence;
+ mutable qpid::sys::Mutex sequenceLock;
+ uint64_t sequenceNo;
protected:
+
+ void preRoute(Deliverable& msg);
+
struct Binding : public management::Manageable {
typedef boost::shared_ptr<Binding> shared_ptr;
typedef std::vector<Binding::shared_ptr> vector;
diff --git a/cpp/src/qpid/broker/FanOutExchange.cpp b/cpp/src/qpid/broker/FanOutExchange.cpp
index 1e03d34b25..42fe537c6a 100644
--- a/cpp/src/qpid/broker/FanOutExchange.cpp
+++ b/cpp/src/qpid/broker/FanOutExchange.cpp
@@ -69,15 +69,18 @@ bool FanOutExchange::unbind(Queue::shared_ptr queue, const string& /*key*/, cons
}
void FanOutExchange::route(Deliverable& msg, const string& /*routingKey*/, const FieldTable* /*args*/){
+ preRoute(msg);
uint32_t count(0);
BindingsArray::ConstPtr p = bindings.snapshot();
- for(std::vector<Binding::shared_ptr>::const_iterator i = p->begin(); i != p->end(); ++i, count++){
- msg.deliverTo((*i)->queue);
- if ((*i)->mgmtBinding != 0)
- (*i)->mgmtBinding->inc_msgMatched ();
+ if (p.get()){
+ for(std::vector<Binding::shared_ptr>::const_iterator i = p->begin(); i != p->end(); ++i, count++){
+ msg.deliverTo((*i)->queue);
+ if ((*i)->mgmtBinding != 0)
+ (*i)->mgmtBinding->inc_msgMatched ();
+ }
}
-
+
if (mgmtExchange != 0)
{
mgmtExchange->inc_msgReceives ();
diff --git a/cpp/src/qpid/broker/HeadersExchange.cpp b/cpp/src/qpid/broker/HeadersExchange.cpp
index 5f89202c75..8e62803cc6 100644
--- a/cpp/src/qpid/broker/HeadersExchange.cpp
+++ b/cpp/src/qpid/broker/HeadersExchange.cpp
@@ -105,14 +105,17 @@ bool HeadersExchange::unbind(Queue::shared_ptr queue, const string& bindingKey,
void HeadersExchange::route(Deliverable& msg, const string& /*routingKey*/, const FieldTable* args){
if (!args) return;//can't match if there were no headers passed in
+ preRoute(msg);
uint32_t count(0);
Bindings::ConstPtr p = bindings.snapshot();
- for (std::vector<Binding::shared_ptr>::const_iterator i = p->begin(); i != p->end(); ++i, count++) {
- if (match((*i)->args, *args)) msg.deliverTo((*i)->queue);
- if ((*i)->mgmtBinding != 0)
- (*i)->mgmtBinding->inc_msgMatched ();
+ if (p.get()){
+ for (std::vector<Binding::shared_ptr>::const_iterator i = p->begin(); i != p->end(); ++i, count++) {
+ if (match((*i)->args, *args)) msg.deliverTo((*i)->queue);
+ if ((*i)->mgmtBinding != 0)
+ (*i)->mgmtBinding->inc_msgMatched ();
+ }
}
if (mgmtExchange != 0)
@@ -136,9 +139,11 @@ void HeadersExchange::route(Deliverable& msg, const string& /*routingKey*/, cons
bool HeadersExchange::isBound(Queue::shared_ptr queue, const string* const, const FieldTable* const args)
{
Bindings::ConstPtr p = bindings.snapshot();
- for (std::vector<Binding::shared_ptr>::const_iterator i = p->begin(); i != p->end(); ++i) {
- if ( (!args || equal((*i)->args, *args)) && (!queue || (*i)->queue == queue)) {
- return true;
+ if (p.get()){
+ for (std::vector<Binding::shared_ptr>::const_iterator i = p->begin(); i != p->end(); ++i) {
+ if ( (!args || equal((*i)->args, *args)) && (!queue || (*i)->queue == queue)) {
+ return true;
+ }
}
}
return false;
diff --git a/cpp/src/qpid/broker/TopicExchange.cpp b/cpp/src/qpid/broker/TopicExchange.cpp
index 787cf9637c..cf4a765266 100644
--- a/cpp/src/qpid/broker/TopicExchange.cpp
+++ b/cpp/src/qpid/broker/TopicExchange.cpp
@@ -180,6 +180,7 @@ bool TopicExchange::isBound(Queue::shared_ptr queue, TopicPattern& pattern)
void TopicExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* /*args*/){
RWlock::ScopedRlock l(lock);
+ preRoute(msg);
uint32_t count(0);
Tokens tokens(routingKey);
diff --git a/cpp/src/qpid/framing/FieldTable.cpp b/cpp/src/qpid/framing/FieldTable.cpp
index 290983e304..013bcd1797 100644
--- a/cpp/src/qpid/framing/FieldTable.cpp
+++ b/cpp/src/qpid/framing/FieldTable.cpp
@@ -74,10 +74,18 @@ void FieldTable::setInt(const std::string& name, int value){
values[name] = ValuePtr(new IntegerValue(value));
}
+void FieldTable::setInt64(const std::string& name, int64_t value){
+ values[name] = ValuePtr(new Integer64Value(value));
+}
+
void FieldTable::setTimestamp(const std::string& name, uint64_t value){
values[name] = ValuePtr(new TimeValue(value));
}
+void FieldTable::setUInt64(const std::string& name, uint64_t value){
+ values[name] = ValuePtr(new Unsigned64Value(value));
+}
+
void FieldTable::setTable(const std::string& name, const FieldTable& value)
{
values[name] = ValuePtr(new FieldTableValue(value));
@@ -131,6 +139,14 @@ int FieldTable::getInt(const std::string& name) const {
// return getValue<uint64_t>(name);
//}
+uint64_t FieldTable::getAsUInt64(const std::string& name) const {
+ return static_cast<uint64_t>( getValue<int64_t>(get(name)));
+}
+
+int64_t FieldTable::getAsInt64(const std::string& name) const {
+ return getValue<int64_t>(get(name));
+}
+
bool FieldTable::getTable(const std::string& name, FieldTable& value) const {
return getEncodedValue<FieldTable>(get(name), value);
}
diff --git a/cpp/src/qpid/framing/FieldTable.h b/cpp/src/qpid/framing/FieldTable.h
index 6dcc2ea7b4..f4f130743b 100644
--- a/cpp/src/qpid/framing/FieldTable.h
+++ b/cpp/src/qpid/framing/FieldTable.h
@@ -63,7 +63,9 @@ class FieldTable
void setString(const std::string& name, const std::string& value);
void setInt(const std::string& name, int value);
+ void setInt64(const std::string& name, int64_t value);
void setTimestamp(const std::string& name, uint64_t value);
+ void setUInt64(const std::string& name, uint64_t value);
void setTable(const std::string& name, const FieldTable& value);
void setArray(const std::string& name, const Array& value);
void setFloat(const std::string& name, float value);
@@ -73,11 +75,13 @@ class FieldTable
std::string getString(const std::string& name) const;
int getInt(const std::string& name) const;
// uint64_t getTimestamp(const std::string& name) const;
+ uint64_t getAsUInt64(const std::string& name) const;
+ int64_t getAsInt64(const std::string& name) const;
bool getTable(const std::string& name, FieldTable& value) const;
bool getArray(const std::string& name, Array& value) const;
bool getFloat(const std::string& name, float& value) const;
bool getDouble(const std::string& name, double& value) const;
-// //void getDecimal(string& name, xxx& value);
+// void getDecimal(string& name, xxx& value);
void erase(const std::string& name);
diff --git a/cpp/src/qpid/framing/FieldValue.cpp b/cpp/src/qpid/framing/FieldValue.cpp
index bbef9ebceb..5fbfe7d0c1 100644
--- a/cpp/src/qpid/framing/FieldValue.cpp
+++ b/cpp/src/qpid/framing/FieldValue.cpp
@@ -135,8 +135,7 @@ Struct32Value::Struct32Value(const std::string& v) :
IntegerValue::IntegerValue(int v) :
FieldValue(0x21, new FixedWidthValue<4>(v))
-{
-}
+{}
FloatValue::FloatValue(float v) :
FieldValue(0x23, new FixedWidthValue<4>(reinterpret_cast<uint8_t*>(&v)))
@@ -146,8 +145,17 @@ DoubleValue::DoubleValue(double v) :
FieldValue(0x33, new FixedWidthValue<8>(reinterpret_cast<uint8_t*>(&v)))
{}
-TimeValue::TimeValue(uint64_t v) :
+Integer64Value::Integer64Value(int64_t v) :
+ FieldValue(0x31, new FixedWidthValue<8>(v))
+{}
+
+Unsigned64Value::Unsigned64Value(uint64_t v) :
FieldValue(0x32, new FixedWidthValue<8>(v))
+{}
+
+
+TimeValue::TimeValue(uint64_t v) :
+ FieldValue(0x38, new FixedWidthValue<8>(v))
{
}
diff --git a/cpp/src/qpid/framing/FieldValue.h b/cpp/src/qpid/framing/FieldValue.h
index 17f858989e..0a70360cbd 100644
--- a/cpp/src/qpid/framing/FieldValue.h
+++ b/cpp/src/qpid/framing/FieldValue.h
@@ -106,12 +106,18 @@ template <>
inline bool FieldValue::convertsTo<int>() const { return data->convertsToInt(); }
template <>
+inline bool FieldValue::convertsTo<int64_t>() const { return data->convertsToInt(); }
+
+template <>
inline bool FieldValue::convertsTo<std::string>() const { return data->convertsToString(); }
template <>
inline int FieldValue::get<int>() const { return data->getInt(); }
template <>
+inline int64_t FieldValue::get<int64_t>() const { return data->getInt(); }
+
+template <>
inline std::string FieldValue::get<std::string>() const { return data->getString(); }
inline std::ostream& operator<<(std::ostream& out, const FieldValue& v) {
@@ -278,6 +284,16 @@ class TimeValue : public FieldValue {
TimeValue(uint64_t v);
};
+class Integer64Value : public FieldValue {
+ public:
+ Integer64Value(int64_t v);
+};
+
+class Unsigned64Value : public FieldValue {
+ public:
+ Unsigned64Value(uint64_t v);
+};
+
class FieldTableValue : public FieldValue {
public:
FieldTableValue(const FieldTable&);
diff --git a/cpp/src/tests/ExchangeTest.cpp b/cpp/src/tests/ExchangeTest.cpp
index 94ee36065d..711fede950 100644
--- a/cpp/src/tests/ExchangeTest.cpp
+++ b/cpp/src/tests/ExchangeTest.cpp
@@ -165,4 +165,62 @@ QPID_AUTO_TEST_CASE(testDeleteGetAndRedeclare)
BOOST_CHECK_EQUAL(string("direct"), response.first->getType());
}
+intrusive_ptr<Message> cmessage(std::string exchange, std::string routingKey) {
+ intrusive_ptr<Message> msg(new Message());
+ AMQFrame method(in_place<MessageTransferBody>(ProtocolVersion(), exchange, 0, 0));
+ AMQFrame header(in_place<AMQHeaderBody>());
+ msg->getFrames().append(method);
+ msg->getFrames().append(header);
+ msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setRoutingKey(routingKey);
+ return msg;
+}
+
+QPID_AUTO_TEST_CASE(testSequenceOptions)
+{
+ FieldTable args;
+ args.setInt("qpid.msg_sequence",1);
+
+ DirectExchange direct("direct1", false, args);
+
+ intrusive_ptr<Message> msg1 = cmessage("e", "A");
+ intrusive_ptr<Message> msg2 = cmessage("e", "B");
+ intrusive_ptr<Message> msg3 = cmessage("e", "C");
+
+ DeliverableMessage dmsg1(msg1);
+ DeliverableMessage dmsg2(msg2);
+ DeliverableMessage dmsg3(msg3);
+
+ direct.route(dmsg1, "abc", 0);
+ direct.route(dmsg2, "abc", 0);
+ direct.route(dmsg3, "abc", 0);
+
+ BOOST_CHECK_EQUAL(1, msg1->getApplicationHeaders()->getAsInt64("qpid.msg_sequence"));
+ BOOST_CHECK_EQUAL(2, msg2->getApplicationHeaders()->getAsInt64("qpid.msg_sequence"));
+ BOOST_CHECK_EQUAL(3, msg3->getApplicationHeaders()->getAsInt64("qpid.msg_sequence"));
+
+ FanOutExchange fanout("fanout1", false, args);
+ HeadersExchange header("headers1", false, args);
+ TopicExchange topic ("topic1", false, args);
+
+ // check other exchanges, that they preroute
+ intrusive_ptr<Message> msg4 = cmessage("e", "A");
+ intrusive_ptr<Message> msg5 = cmessage("e", "B");
+ intrusive_ptr<Message> msg6 = cmessage("e", "C");
+
+ DeliverableMessage dmsg4(msg4);
+ DeliverableMessage dmsg5(msg5);
+ DeliverableMessage dmsg6(msg6);
+
+ fanout.route(dmsg4, "abc", 0);
+ BOOST_CHECK_EQUAL(1, msg4->getApplicationHeaders()->getAsInt64("qpid.msg_sequence"));
+
+ FieldTable headers;
+ header.route(dmsg5, "abc", &headers);
+ BOOST_CHECK_EQUAL(1, msg5->getApplicationHeaders()->getAsInt64("qpid.msg_sequence"));
+
+ topic.route(dmsg6, "abc", 0);
+ BOOST_CHECK_EQUAL(1, msg6->getApplicationHeaders()->getAsInt64("qpid.msg_sequence"));
+
+}
+
QPID_AUTO_TEST_SUITE_END()
diff --git a/cpp/src/tests/FieldTable.cpp b/cpp/src/tests/FieldTable.cpp
index 9c75970278..415461676c 100644
--- a/cpp/src/tests/FieldTable.cpp
+++ b/cpp/src/tests/FieldTable.cpp
@@ -157,4 +157,17 @@ QPID_AUTO_TEST_CASE(testFloatAndDouble)
}
}
+QPID_AUTO_TEST_CASE(test64GetAndSetConverts)
+{
+ FieldTable args;
+ args.setInt64("a",100);
+
+ args.setUInt64("b",1u);
+ BOOST_CHECK_EQUAL(1u, args.getAsUInt64("b"));
+ BOOST_CHECK_EQUAL(100u, args.getAsUInt64("a"));
+ BOOST_CHECK_EQUAL(1, args.getAsInt64("b"));
+ BOOST_CHECK_EQUAL(100, args.getAsInt64("a"));
+
+}
+
QPID_AUTO_TEST_SUITE_END()