diff options
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/broker/DirectExchange.cpp | 1 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Exchange.cpp | 26 | ||||
-rw-r--r-- | cpp/src/qpid/broker/Exchange.h | 7 | ||||
-rw-r--r-- | cpp/src/qpid/broker/FanOutExchange.cpp | 13 | ||||
-rw-r--r-- | cpp/src/qpid/broker/HeadersExchange.cpp | 19 | ||||
-rw-r--r-- | cpp/src/qpid/broker/TopicExchange.cpp | 1 | ||||
-rw-r--r-- | cpp/src/qpid/framing/FieldTable.cpp | 16 | ||||
-rw-r--r-- | cpp/src/qpid/framing/FieldTable.h | 6 | ||||
-rw-r--r-- | cpp/src/qpid/framing/FieldValue.cpp | 14 | ||||
-rw-r--r-- | cpp/src/qpid/framing/FieldValue.h | 16 | ||||
-rw-r--r-- | cpp/src/tests/ExchangeTest.cpp | 58 | ||||
-rw-r--r-- | cpp/src/tests/FieldTable.cpp | 13 |
12 files changed, 172 insertions, 18 deletions
diff --git a/cpp/src/qpid/broker/DirectExchange.cpp b/cpp/src/qpid/broker/DirectExchange.cpp index b4b892feeb..fe2e9db3d5 100644 --- a/cpp/src/qpid/broker/DirectExchange.cpp +++ b/cpp/src/qpid/broker/DirectExchange.cpp @@ -70,6 +70,7 @@ bool DirectExchange::unbind(Queue::shared_ptr queue, const string& routingKey, c } void DirectExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* /*args*/){ + preRoute(msg); Queues::ConstPtr p; { Mutex::ScopedLock l(lock); diff --git a/cpp/src/qpid/broker/Exchange.cpp b/cpp/src/qpid/broker/Exchange.cpp index 4824fa5742..deb8df3fc4 100644 --- a/cpp/src/qpid/broker/Exchange.cpp +++ b/cpp/src/qpid/broker/Exchange.cpp @@ -22,8 +22,11 @@ #include "Exchange.h" #include "ExchangeRegistry.h" #include "qpid/agent/ManagementAgent.h" +#include "qpid/log/Statement.h" +#include "qpid/framing/MessageProperties.h" using namespace qpid::broker; +using namespace qpid::framing; using qpid::framing::Buffer; using qpid::framing::FieldTable; using qpid::management::ManagementAgent; @@ -32,8 +35,15 @@ using qpid::management::Manageable; using qpid::management::Args; namespace _qmf = qmf::org::apache::qpid::broker; +namespace +{ +const std::string qpidMsgSequence("qpid.msg_sequence"); +} + + Exchange::Exchange (const string& _name, Manageable* parent) : - name(_name), durable(false), persistenceId(0), mgmtExchange(0) + name(_name), durable(false), persistenceId(0), sequence(false), + sequenceNo(0), mgmtExchange(0) { if (parent != 0) { @@ -48,7 +58,8 @@ Exchange::Exchange (const string& _name, Manageable* parent) : Exchange::Exchange(const string& _name, bool _durable, const qpid::framing::FieldTable& _args, Manageable* parent) - : name(_name), durable(_durable), args(_args), alternateUsers(0), persistenceId(0), mgmtExchange(0) + : name(_name), durable(_durable), args(_args), alternateUsers(0), persistenceId(0), + sequence(false), sequenceNo(0), mgmtExchange(0) { if (parent != 0) { @@ -66,6 +77,10 @@ Exchange::Exchange(const string& _name, bool _durable, const qpid::framing::Fiel } } } + + sequence = _args.get(qpidMsgSequence); + if (sequence) QPID_LOG(debug, "Configured exchange "+ _name +" with Msg sequencing"); + } Exchange::~Exchange () @@ -74,6 +89,13 @@ Exchange::~Exchange () mgmtExchange->resourceDestroy (); } +void Exchange::preRoute(Deliverable& msg){ + if (sequence){ + sys::Mutex::ScopedLock lock(sequenceLock); + msg.getMessage().getProperties<MessageProperties>()->getApplicationHeaders().setInt64(qpidMsgSequence,++sequenceNo); + } +} + void Exchange::setPersistenceId(uint64_t id) const { if (mgmtExchange != 0 && persistenceId == 0) diff --git a/cpp/src/qpid/broker/Exchange.h b/cpp/src/qpid/broker/Exchange.h index 9901fbb18b..54d16a47bd 100644 --- a/cpp/src/qpid/broker/Exchange.h +++ b/cpp/src/qpid/broker/Exchange.h @@ -28,6 +28,7 @@ #include "MessageStore.h" #include "PersistableExchange.h" #include "qpid/framing/FieldTable.h" +#include "qpid/sys/Mutex.h" #include "qpid/management/Manageable.h" #include "qmf/org/apache/qpid/broker/Exchange.h" #include "qmf/org/apache/qpid/broker/Binding.h" @@ -45,8 +46,14 @@ namespace qpid { boost::shared_ptr<Exchange> alternate; uint32_t alternateUsers; mutable uint64_t persistenceId; + bool sequence; + mutable qpid::sys::Mutex sequenceLock; + uint64_t sequenceNo; protected: + + void preRoute(Deliverable& msg); + struct Binding : public management::Manageable { typedef boost::shared_ptr<Binding> shared_ptr; typedef std::vector<Binding::shared_ptr> vector; diff --git a/cpp/src/qpid/broker/FanOutExchange.cpp b/cpp/src/qpid/broker/FanOutExchange.cpp index 1e03d34b25..42fe537c6a 100644 --- a/cpp/src/qpid/broker/FanOutExchange.cpp +++ b/cpp/src/qpid/broker/FanOutExchange.cpp @@ -69,15 +69,18 @@ bool FanOutExchange::unbind(Queue::shared_ptr queue, const string& /*key*/, cons } void FanOutExchange::route(Deliverable& msg, const string& /*routingKey*/, const FieldTable* /*args*/){ + preRoute(msg); uint32_t count(0); BindingsArray::ConstPtr p = bindings.snapshot(); - for(std::vector<Binding::shared_ptr>::const_iterator i = p->begin(); i != p->end(); ++i, count++){ - msg.deliverTo((*i)->queue); - if ((*i)->mgmtBinding != 0) - (*i)->mgmtBinding->inc_msgMatched (); + if (p.get()){ + for(std::vector<Binding::shared_ptr>::const_iterator i = p->begin(); i != p->end(); ++i, count++){ + msg.deliverTo((*i)->queue); + if ((*i)->mgmtBinding != 0) + (*i)->mgmtBinding->inc_msgMatched (); + } } - + if (mgmtExchange != 0) { mgmtExchange->inc_msgReceives (); diff --git a/cpp/src/qpid/broker/HeadersExchange.cpp b/cpp/src/qpid/broker/HeadersExchange.cpp index 5f89202c75..8e62803cc6 100644 --- a/cpp/src/qpid/broker/HeadersExchange.cpp +++ b/cpp/src/qpid/broker/HeadersExchange.cpp @@ -105,14 +105,17 @@ bool HeadersExchange::unbind(Queue::shared_ptr queue, const string& bindingKey, void HeadersExchange::route(Deliverable& msg, const string& /*routingKey*/, const FieldTable* args){ if (!args) return;//can't match if there were no headers passed in + preRoute(msg); uint32_t count(0); Bindings::ConstPtr p = bindings.snapshot(); - for (std::vector<Binding::shared_ptr>::const_iterator i = p->begin(); i != p->end(); ++i, count++) { - if (match((*i)->args, *args)) msg.deliverTo((*i)->queue); - if ((*i)->mgmtBinding != 0) - (*i)->mgmtBinding->inc_msgMatched (); + if (p.get()){ + for (std::vector<Binding::shared_ptr>::const_iterator i = p->begin(); i != p->end(); ++i, count++) { + if (match((*i)->args, *args)) msg.deliverTo((*i)->queue); + if ((*i)->mgmtBinding != 0) + (*i)->mgmtBinding->inc_msgMatched (); + } } if (mgmtExchange != 0) @@ -136,9 +139,11 @@ void HeadersExchange::route(Deliverable& msg, const string& /*routingKey*/, cons bool HeadersExchange::isBound(Queue::shared_ptr queue, const string* const, const FieldTable* const args) { Bindings::ConstPtr p = bindings.snapshot(); - for (std::vector<Binding::shared_ptr>::const_iterator i = p->begin(); i != p->end(); ++i) { - if ( (!args || equal((*i)->args, *args)) && (!queue || (*i)->queue == queue)) { - return true; + if (p.get()){ + for (std::vector<Binding::shared_ptr>::const_iterator i = p->begin(); i != p->end(); ++i) { + if ( (!args || equal((*i)->args, *args)) && (!queue || (*i)->queue == queue)) { + return true; + } } } return false; diff --git a/cpp/src/qpid/broker/TopicExchange.cpp b/cpp/src/qpid/broker/TopicExchange.cpp index 787cf9637c..cf4a765266 100644 --- a/cpp/src/qpid/broker/TopicExchange.cpp +++ b/cpp/src/qpid/broker/TopicExchange.cpp @@ -180,6 +180,7 @@ bool TopicExchange::isBound(Queue::shared_ptr queue, TopicPattern& pattern) void TopicExchange::route(Deliverable& msg, const string& routingKey, const FieldTable* /*args*/){ RWlock::ScopedRlock l(lock); + preRoute(msg); uint32_t count(0); Tokens tokens(routingKey); diff --git a/cpp/src/qpid/framing/FieldTable.cpp b/cpp/src/qpid/framing/FieldTable.cpp index 290983e304..013bcd1797 100644 --- a/cpp/src/qpid/framing/FieldTable.cpp +++ b/cpp/src/qpid/framing/FieldTable.cpp @@ -74,10 +74,18 @@ void FieldTable::setInt(const std::string& name, int value){ values[name] = ValuePtr(new IntegerValue(value)); } +void FieldTable::setInt64(const std::string& name, int64_t value){ + values[name] = ValuePtr(new Integer64Value(value)); +} + void FieldTable::setTimestamp(const std::string& name, uint64_t value){ values[name] = ValuePtr(new TimeValue(value)); } +void FieldTable::setUInt64(const std::string& name, uint64_t value){ + values[name] = ValuePtr(new Unsigned64Value(value)); +} + void FieldTable::setTable(const std::string& name, const FieldTable& value) { values[name] = ValuePtr(new FieldTableValue(value)); @@ -131,6 +139,14 @@ int FieldTable::getInt(const std::string& name) const { // return getValue<uint64_t>(name); //} +uint64_t FieldTable::getAsUInt64(const std::string& name) const { + return static_cast<uint64_t>( getValue<int64_t>(get(name))); +} + +int64_t FieldTable::getAsInt64(const std::string& name) const { + return getValue<int64_t>(get(name)); +} + bool FieldTable::getTable(const std::string& name, FieldTable& value) const { return getEncodedValue<FieldTable>(get(name), value); } diff --git a/cpp/src/qpid/framing/FieldTable.h b/cpp/src/qpid/framing/FieldTable.h index 6dcc2ea7b4..f4f130743b 100644 --- a/cpp/src/qpid/framing/FieldTable.h +++ b/cpp/src/qpid/framing/FieldTable.h @@ -63,7 +63,9 @@ class FieldTable void setString(const std::string& name, const std::string& value); void setInt(const std::string& name, int value); + void setInt64(const std::string& name, int64_t value); void setTimestamp(const std::string& name, uint64_t value); + void setUInt64(const std::string& name, uint64_t value); void setTable(const std::string& name, const FieldTable& value); void setArray(const std::string& name, const Array& value); void setFloat(const std::string& name, float value); @@ -73,11 +75,13 @@ class FieldTable std::string getString(const std::string& name) const; int getInt(const std::string& name) const; // uint64_t getTimestamp(const std::string& name) const; + uint64_t getAsUInt64(const std::string& name) const; + int64_t getAsInt64(const std::string& name) const; bool getTable(const std::string& name, FieldTable& value) const; bool getArray(const std::string& name, Array& value) const; bool getFloat(const std::string& name, float& value) const; bool getDouble(const std::string& name, double& value) const; -// //void getDecimal(string& name, xxx& value); +// void getDecimal(string& name, xxx& value); void erase(const std::string& name); diff --git a/cpp/src/qpid/framing/FieldValue.cpp b/cpp/src/qpid/framing/FieldValue.cpp index bbef9ebceb..5fbfe7d0c1 100644 --- a/cpp/src/qpid/framing/FieldValue.cpp +++ b/cpp/src/qpid/framing/FieldValue.cpp @@ -135,8 +135,7 @@ Struct32Value::Struct32Value(const std::string& v) : IntegerValue::IntegerValue(int v) : FieldValue(0x21, new FixedWidthValue<4>(v)) -{ -} +{} FloatValue::FloatValue(float v) : FieldValue(0x23, new FixedWidthValue<4>(reinterpret_cast<uint8_t*>(&v))) @@ -146,8 +145,17 @@ DoubleValue::DoubleValue(double v) : FieldValue(0x33, new FixedWidthValue<8>(reinterpret_cast<uint8_t*>(&v))) {} -TimeValue::TimeValue(uint64_t v) : +Integer64Value::Integer64Value(int64_t v) : + FieldValue(0x31, new FixedWidthValue<8>(v)) +{} + +Unsigned64Value::Unsigned64Value(uint64_t v) : FieldValue(0x32, new FixedWidthValue<8>(v)) +{} + + +TimeValue::TimeValue(uint64_t v) : + FieldValue(0x38, new FixedWidthValue<8>(v)) { } diff --git a/cpp/src/qpid/framing/FieldValue.h b/cpp/src/qpid/framing/FieldValue.h index 17f858989e..0a70360cbd 100644 --- a/cpp/src/qpid/framing/FieldValue.h +++ b/cpp/src/qpid/framing/FieldValue.h @@ -106,12 +106,18 @@ template <> inline bool FieldValue::convertsTo<int>() const { return data->convertsToInt(); } template <> +inline bool FieldValue::convertsTo<int64_t>() const { return data->convertsToInt(); } + +template <> inline bool FieldValue::convertsTo<std::string>() const { return data->convertsToString(); } template <> inline int FieldValue::get<int>() const { return data->getInt(); } template <> +inline int64_t FieldValue::get<int64_t>() const { return data->getInt(); } + +template <> inline std::string FieldValue::get<std::string>() const { return data->getString(); } inline std::ostream& operator<<(std::ostream& out, const FieldValue& v) { @@ -278,6 +284,16 @@ class TimeValue : public FieldValue { TimeValue(uint64_t v); }; +class Integer64Value : public FieldValue { + public: + Integer64Value(int64_t v); +}; + +class Unsigned64Value : public FieldValue { + public: + Unsigned64Value(uint64_t v); +}; + class FieldTableValue : public FieldValue { public: FieldTableValue(const FieldTable&); diff --git a/cpp/src/tests/ExchangeTest.cpp b/cpp/src/tests/ExchangeTest.cpp index 94ee36065d..711fede950 100644 --- a/cpp/src/tests/ExchangeTest.cpp +++ b/cpp/src/tests/ExchangeTest.cpp @@ -165,4 +165,62 @@ QPID_AUTO_TEST_CASE(testDeleteGetAndRedeclare) BOOST_CHECK_EQUAL(string("direct"), response.first->getType()); } +intrusive_ptr<Message> cmessage(std::string exchange, std::string routingKey) { + intrusive_ptr<Message> msg(new Message()); + AMQFrame method(in_place<MessageTransferBody>(ProtocolVersion(), exchange, 0, 0)); + AMQFrame header(in_place<AMQHeaderBody>()); + msg->getFrames().append(method); + msg->getFrames().append(header); + msg->getFrames().getHeaders()->get<DeliveryProperties>(true)->setRoutingKey(routingKey); + return msg; +} + +QPID_AUTO_TEST_CASE(testSequenceOptions) +{ + FieldTable args; + args.setInt("qpid.msg_sequence",1); + + DirectExchange direct("direct1", false, args); + + intrusive_ptr<Message> msg1 = cmessage("e", "A"); + intrusive_ptr<Message> msg2 = cmessage("e", "B"); + intrusive_ptr<Message> msg3 = cmessage("e", "C"); + + DeliverableMessage dmsg1(msg1); + DeliverableMessage dmsg2(msg2); + DeliverableMessage dmsg3(msg3); + + direct.route(dmsg1, "abc", 0); + direct.route(dmsg2, "abc", 0); + direct.route(dmsg3, "abc", 0); + + BOOST_CHECK_EQUAL(1, msg1->getApplicationHeaders()->getAsInt64("qpid.msg_sequence")); + BOOST_CHECK_EQUAL(2, msg2->getApplicationHeaders()->getAsInt64("qpid.msg_sequence")); + BOOST_CHECK_EQUAL(3, msg3->getApplicationHeaders()->getAsInt64("qpid.msg_sequence")); + + FanOutExchange fanout("fanout1", false, args); + HeadersExchange header("headers1", false, args); + TopicExchange topic ("topic1", false, args); + + // check other exchanges, that they preroute + intrusive_ptr<Message> msg4 = cmessage("e", "A"); + intrusive_ptr<Message> msg5 = cmessage("e", "B"); + intrusive_ptr<Message> msg6 = cmessage("e", "C"); + + DeliverableMessage dmsg4(msg4); + DeliverableMessage dmsg5(msg5); + DeliverableMessage dmsg6(msg6); + + fanout.route(dmsg4, "abc", 0); + BOOST_CHECK_EQUAL(1, msg4->getApplicationHeaders()->getAsInt64("qpid.msg_sequence")); + + FieldTable headers; + header.route(dmsg5, "abc", &headers); + BOOST_CHECK_EQUAL(1, msg5->getApplicationHeaders()->getAsInt64("qpid.msg_sequence")); + + topic.route(dmsg6, "abc", 0); + BOOST_CHECK_EQUAL(1, msg6->getApplicationHeaders()->getAsInt64("qpid.msg_sequence")); + +} + QPID_AUTO_TEST_SUITE_END() diff --git a/cpp/src/tests/FieldTable.cpp b/cpp/src/tests/FieldTable.cpp index 9c75970278..415461676c 100644 --- a/cpp/src/tests/FieldTable.cpp +++ b/cpp/src/tests/FieldTable.cpp @@ -157,4 +157,17 @@ QPID_AUTO_TEST_CASE(testFloatAndDouble) } } +QPID_AUTO_TEST_CASE(test64GetAndSetConverts) +{ + FieldTable args; + args.setInt64("a",100); + + args.setUInt64("b",1u); + BOOST_CHECK_EQUAL(1u, args.getAsUInt64("b")); + BOOST_CHECK_EQUAL(100u, args.getAsUInt64("a")); + BOOST_CHECK_EQUAL(1, args.getAsInt64("b")); + BOOST_CHECK_EQUAL(100, args.getAsInt64("a")); + +} + QPID_AUTO_TEST_SUITE_END() |