diff options
| author | Andrew Stitcher <astitcher@apache.org> | 2013-03-04 21:08:36 +0000 |
|---|---|---|
| committer | Andrew Stitcher <astitcher@apache.org> | 2013-03-04 21:08:36 +0000 |
| commit | 9db3ea865f15fd03b5f9a42e26d18385bed6bcdd (patch) | |
| tree | f1a059412666e796659625d17472b88d5fb0bced /cpp | |
| parent | eb491c5c7b45989ac1daf7f29adcbae8209233e9 (diff) | |
| download | qpid-python-9db3ea865f15fd03b5f9a42e26d18385bed6bcdd.tar.gz | |
QPID-4558: Selectors for C++ broker
- Added in amqp 1.0 support that uses a filter for the selector
- This change requires at least qpid-proton 0.4 (or a lot of warning
messages are produced by the broker)
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1452524 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
| -rw-r--r-- | cpp/src/qpid/amqp/descriptors.h | 3 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/amqp/Filter.cpp | 22 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/amqp/Filter.h | 4 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/amqp/Outgoing.cpp | 9 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/amqp/Outgoing.h | 5 | ||||
| -rw-r--r-- | cpp/src/qpid/broker/amqp/Session.cpp | 4 | ||||
| -rw-r--r-- | cpp/src/qpid/messaging/amqp/ReceiverContext.cpp | 14 |
7 files changed, 59 insertions, 2 deletions
diff --git a/cpp/src/qpid/amqp/descriptors.h b/cpp/src/qpid/amqp/descriptors.h index 19a8985433..0a3e71fe6d 100644 --- a/cpp/src/qpid/amqp/descriptors.h +++ b/cpp/src/qpid/amqp/descriptors.h @@ -77,7 +77,8 @@ const Descriptor SASL_OUTCOME(SASL_OUTCOME_CODE); namespace filters { const std::string LEGACY_DIRECT_FILTER_SYMBOL("apache.org:legacy-amqp-direct-binding:string"); -const std::string LEGACY_TOPIC_FILTER_SYMBOL("apache.org:legacy-amqp-direct-binding:string"); +const std::string LEGACY_TOPIC_FILTER_SYMBOL("apache.org:legacy-amqp-topic-binding:string"); +const std::string QPID_SELECTOR_FILTER_SYMBOL("qpid.apache.org:selector-filter:string"); const uint64_t LEGACY_DIRECT_FILTER_CODE(0x0000468C00000000ULL); const uint64_t LEGACY_TOPIC_FILTER_CODE(0x0000468C00000001ULL); diff --git a/cpp/src/qpid/broker/amqp/Filter.cpp b/cpp/src/qpid/broker/amqp/Filter.cpp index 38baba0df1..2984c2923a 100644 --- a/cpp/src/qpid/broker/amqp/Filter.cpp +++ b/cpp/src/qpid/broker/amqp/Filter.cpp @@ -61,6 +61,8 @@ void Filter::onStringValue(const qpid::amqp::CharSequence& key, const qpid::amqp if (descriptor->match(qpid::amqp::filters::LEGACY_TOPIC_FILTER_SYMBOL, qpid::amqp::filters::LEGACY_TOPIC_FILTER_CODE) || descriptor->match(qpid::amqp::filters::LEGACY_DIRECT_FILTER_SYMBOL, qpid::amqp::filters::LEGACY_DIRECT_FILTER_CODE)) { setSubjectFilter(filter); + } else if (descriptor->match(qpid::amqp::filters::QPID_SELECTOR_FILTER_SYMBOL, 0)) { + setSelectorFilter(filter); } else { QPID_LOG(notice, "Skipping unrecognised string filter with key " << filter.key << " and descriptor " << filter.descriptor); } @@ -89,6 +91,26 @@ void Filter::setSubjectFilter(const StringFilter& filter) } } +bool Filter::hasSelectorFilter() const +{ + return !selectorFilter.value.empty(); +} + +std::string Filter::getSelectorFilter() const +{ + return selectorFilter.value; +} + + +void Filter::setSelectorFilter(const StringFilter& filter) +{ + if (hasSelectorFilter()) { + QPID_LOG(notice, "Skipping filter with key " << filter.key << "; selector filter already set"); + } else { + selectorFilter = filter; + } +} + void Filter::bind(boost::shared_ptr<Exchange> exchange, boost::shared_ptr<Queue> queue) { subjectFilter.bind(exchange, queue); diff --git a/cpp/src/qpid/broker/amqp/Filter.h b/cpp/src/qpid/broker/amqp/Filter.h index 20cceb372a..72f92b57a5 100644 --- a/cpp/src/qpid/broker/amqp/Filter.h +++ b/cpp/src/qpid/broker/amqp/Filter.h @@ -40,6 +40,8 @@ class Filter : qpid::amqp::MapReader void write(pn_data_t*); bool hasSubjectFilter() const; std::string getSubjectFilter() const; + bool hasSelectorFilter() const; + std::string getSelectorFilter() const; void bind(boost::shared_ptr<Exchange> exchange, boost::shared_ptr<Queue> queue); private: struct StringFilter @@ -55,8 +57,10 @@ class Filter : qpid::amqp::MapReader void onStringValue(const qpid::amqp::CharSequence& key, const qpid::amqp::CharSequence& value, const qpid::amqp::Descriptor* descriptor); void setSubjectFilter(const StringFilter&); + void setSelectorFilter(const StringFilter&); StringFilter subjectFilter; + StringFilter selectorFilter; }; }}} // namespace qpid::broker::amqp diff --git a/cpp/src/qpid/broker/amqp/Outgoing.cpp b/cpp/src/qpid/broker/amqp/Outgoing.cpp index 9605cacac1..eb0a6e20aa 100644 --- a/cpp/src/qpid/broker/amqp/Outgoing.cpp +++ b/cpp/src/qpid/broker/amqp/Outgoing.cpp @@ -22,6 +22,7 @@ #include "qpid/broker/amqp/Header.h" #include "qpid/broker/amqp/Translation.h" #include "qpid/broker/Queue.h" +#include "qpid/broker/Selector.h" #include "qpid/broker/TopicKeyNode.h" #include "qpid/sys/OutputControl.h" #include "qpid/amqp/MessageEncoder.h" @@ -170,6 +171,11 @@ void Outgoing::setSubjectFilter(const std::string& f) subjectFilter = f; } +void Outgoing::setSelectorFilter(const std::string& f) +{ + selector.reset(new Selector(f)); +} + namespace { bool match(TokenIterator& filter, TokenIterator& target) @@ -213,7 +219,8 @@ bool match(const std::string& filter, const std::string& target) bool Outgoing::filter(const qpid::broker::Message& m) { - return subjectFilter.empty() || subjectFilter == m.getRoutingKey() || match(subjectFilter, m.getRoutingKey()); + return (subjectFilter.empty() || subjectFilter == m.getRoutingKey() || match(subjectFilter, m.getRoutingKey())) + && (!selector || selector->filter(m)); } void Outgoing::cancel() {} diff --git a/cpp/src/qpid/broker/amqp/Outgoing.h b/cpp/src/qpid/broker/amqp/Outgoing.h index a8450a48cf..7d845a1427 100644 --- a/cpp/src/qpid/broker/amqp/Outgoing.h +++ b/cpp/src/qpid/broker/amqp/Outgoing.h @@ -24,7 +24,9 @@ #include "qpid/broker/amqp/Message.h" #include "qpid/broker/amqp/ManagedOutgoingLink.h" #include "qpid/broker/Consumer.h" + #include <boost/shared_ptr.hpp> +#include <boost/scoped_ptr.hpp> #include <boost/enable_shared_from_this.hpp> extern "C" { #include <proton/engine.h> @@ -37,6 +39,7 @@ class OutputControl; namespace broker { class Broker; class Queue; +class Selector; namespace amqp { class ManagedSession; template <class T> @@ -61,6 +64,7 @@ class Outgoing : public qpid::broker::Consumer, public boost::enable_shared_from public: Outgoing(Broker&,boost::shared_ptr<Queue> q, pn_link_t* l, ManagedSession&, qpid::sys::OutputControl& o, bool topic); void setSubjectFilter(const std::string&); + void setSelectorFilter(const std::string&); void init(); bool dispatch(); void write(const char* data, size_t size); @@ -102,6 +106,7 @@ class Outgoing : public qpid::broker::Consumer, public boost::enable_shared_from int outstanding; std::vector<char> buffer; std::string subjectFilter; + boost::scoped_ptr<Selector> selector; }; }}} // namespace qpid::broker::amqp diff --git a/cpp/src/qpid/broker/amqp/Session.cpp b/cpp/src/qpid/broker/amqp/Session.cpp index fabe609473..3ec5eb15dd 100644 --- a/cpp/src/qpid/broker/amqp/Session.cpp +++ b/cpp/src/qpid/broker/amqp/Session.cpp @@ -31,6 +31,7 @@ #include "qpid/broker/FanOutExchange.h" #include "qpid/broker/Message.h" #include "qpid/broker/Queue.h" +#include "qpid/broker/Selector.h" #include "qpid/broker/TopicExchange.h" #include "qpid/broker/amqp/Filter.h" #include "qpid/broker/amqp/NodeProperties.h" @@ -132,6 +133,9 @@ void Session::attach(pn_link_t* link) if (filter.hasSubjectFilter()) { q->setSubjectFilter(filter.getSubjectFilter()); } + if (filter.hasSelectorFilter()) { + q->setSelectorFilter(filter.getSelectorFilter()); + } senders[link] = q; } else if (node.exchange) { QueueSettings settings(false, true); diff --git a/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp b/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp index acf301d28c..638b1fc674 100644 --- a/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp +++ b/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp @@ -120,6 +120,20 @@ void ReceiverContext::configure(pn_terminus_t* source) const helper.setNodeProperties(source); } + // Look specifically for qpid.selector link property and add a filter for it + qpid::types::Variant::Map::const_iterator i = helper.getLinkProperties().find("qpid.selector"); + if (i!=helper.getLinkProperties().end()) { + pn_data_t* filter = pn_terminus_filter(source); + pn_data_put_map(filter); + pn_data_enter(filter); + pn_data_put_symbol(filter, convert("qpid.selector")); + pn_data_put_described(filter); + pn_data_enter(filter); + pn_data_put_symbol(filter, convert(qpid::amqp::filters::QPID_SELECTOR_FILTER_SYMBOL)); + pn_data_put_string(filter, convert(i->second)); + pn_data_exit(filter); + pn_data_exit(filter); + } if (!address.getSubject().empty()) { //filter: pn_data_t* filter = pn_terminus_filter(source); |
