summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
authorAndrew Stitcher <astitcher@apache.org>2013-03-04 21:08:36 +0000
committerAndrew Stitcher <astitcher@apache.org>2013-03-04 21:08:36 +0000
commit9db3ea865f15fd03b5f9a42e26d18385bed6bcdd (patch)
treef1a059412666e796659625d17472b88d5fb0bced /cpp
parenteb491c5c7b45989ac1daf7f29adcbae8209233e9 (diff)
downloadqpid-python-9db3ea865f15fd03b5f9a42e26d18385bed6bcdd.tar.gz
QPID-4558: Selectors for C++ broker
- Added in amqp 1.0 support that uses a filter for the selector - This change requires at least qpid-proton 0.4 (or a lot of warning messages are produced by the broker) git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1452524 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r--cpp/src/qpid/amqp/descriptors.h3
-rw-r--r--cpp/src/qpid/broker/amqp/Filter.cpp22
-rw-r--r--cpp/src/qpid/broker/amqp/Filter.h4
-rw-r--r--cpp/src/qpid/broker/amqp/Outgoing.cpp9
-rw-r--r--cpp/src/qpid/broker/amqp/Outgoing.h5
-rw-r--r--cpp/src/qpid/broker/amqp/Session.cpp4
-rw-r--r--cpp/src/qpid/messaging/amqp/ReceiverContext.cpp14
7 files changed, 59 insertions, 2 deletions
diff --git a/cpp/src/qpid/amqp/descriptors.h b/cpp/src/qpid/amqp/descriptors.h
index 19a8985433..0a3e71fe6d 100644
--- a/cpp/src/qpid/amqp/descriptors.h
+++ b/cpp/src/qpid/amqp/descriptors.h
@@ -77,7 +77,8 @@ const Descriptor SASL_OUTCOME(SASL_OUTCOME_CODE);
namespace filters {
const std::string LEGACY_DIRECT_FILTER_SYMBOL("apache.org:legacy-amqp-direct-binding:string");
-const std::string LEGACY_TOPIC_FILTER_SYMBOL("apache.org:legacy-amqp-direct-binding:string");
+const std::string LEGACY_TOPIC_FILTER_SYMBOL("apache.org:legacy-amqp-topic-binding:string");
+const std::string QPID_SELECTOR_FILTER_SYMBOL("qpid.apache.org:selector-filter:string");
const uint64_t LEGACY_DIRECT_FILTER_CODE(0x0000468C00000000ULL);
const uint64_t LEGACY_TOPIC_FILTER_CODE(0x0000468C00000001ULL);
diff --git a/cpp/src/qpid/broker/amqp/Filter.cpp b/cpp/src/qpid/broker/amqp/Filter.cpp
index 38baba0df1..2984c2923a 100644
--- a/cpp/src/qpid/broker/amqp/Filter.cpp
+++ b/cpp/src/qpid/broker/amqp/Filter.cpp
@@ -61,6 +61,8 @@ void Filter::onStringValue(const qpid::amqp::CharSequence& key, const qpid::amqp
if (descriptor->match(qpid::amqp::filters::LEGACY_TOPIC_FILTER_SYMBOL, qpid::amqp::filters::LEGACY_TOPIC_FILTER_CODE)
|| descriptor->match(qpid::amqp::filters::LEGACY_DIRECT_FILTER_SYMBOL, qpid::amqp::filters::LEGACY_DIRECT_FILTER_CODE)) {
setSubjectFilter(filter);
+ } else if (descriptor->match(qpid::amqp::filters::QPID_SELECTOR_FILTER_SYMBOL, 0)) {
+ setSelectorFilter(filter);
} else {
QPID_LOG(notice, "Skipping unrecognised string filter with key " << filter.key << " and descriptor " << filter.descriptor);
}
@@ -89,6 +91,26 @@ void Filter::setSubjectFilter(const StringFilter& filter)
}
}
+bool Filter::hasSelectorFilter() const
+{
+ return !selectorFilter.value.empty();
+}
+
+std::string Filter::getSelectorFilter() const
+{
+ return selectorFilter.value;
+}
+
+
+void Filter::setSelectorFilter(const StringFilter& filter)
+{
+ if (hasSelectorFilter()) {
+ QPID_LOG(notice, "Skipping filter with key " << filter.key << "; selector filter already set");
+ } else {
+ selectorFilter = filter;
+ }
+}
+
void Filter::bind(boost::shared_ptr<Exchange> exchange, boost::shared_ptr<Queue> queue)
{
subjectFilter.bind(exchange, queue);
diff --git a/cpp/src/qpid/broker/amqp/Filter.h b/cpp/src/qpid/broker/amqp/Filter.h
index 20cceb372a..72f92b57a5 100644
--- a/cpp/src/qpid/broker/amqp/Filter.h
+++ b/cpp/src/qpid/broker/amqp/Filter.h
@@ -40,6 +40,8 @@ class Filter : qpid::amqp::MapReader
void write(pn_data_t*);
bool hasSubjectFilter() const;
std::string getSubjectFilter() const;
+ bool hasSelectorFilter() const;
+ std::string getSelectorFilter() const;
void bind(boost::shared_ptr<Exchange> exchange, boost::shared_ptr<Queue> queue);
private:
struct StringFilter
@@ -55,8 +57,10 @@ class Filter : qpid::amqp::MapReader
void onStringValue(const qpid::amqp::CharSequence& key, const qpid::amqp::CharSequence& value, const qpid::amqp::Descriptor* descriptor);
void setSubjectFilter(const StringFilter&);
+ void setSelectorFilter(const StringFilter&);
StringFilter subjectFilter;
+ StringFilter selectorFilter;
};
}}} // namespace qpid::broker::amqp
diff --git a/cpp/src/qpid/broker/amqp/Outgoing.cpp b/cpp/src/qpid/broker/amqp/Outgoing.cpp
index 9605cacac1..eb0a6e20aa 100644
--- a/cpp/src/qpid/broker/amqp/Outgoing.cpp
+++ b/cpp/src/qpid/broker/amqp/Outgoing.cpp
@@ -22,6 +22,7 @@
#include "qpid/broker/amqp/Header.h"
#include "qpid/broker/amqp/Translation.h"
#include "qpid/broker/Queue.h"
+#include "qpid/broker/Selector.h"
#include "qpid/broker/TopicKeyNode.h"
#include "qpid/sys/OutputControl.h"
#include "qpid/amqp/MessageEncoder.h"
@@ -170,6 +171,11 @@ void Outgoing::setSubjectFilter(const std::string& f)
subjectFilter = f;
}
+void Outgoing::setSelectorFilter(const std::string& f)
+{
+ selector.reset(new Selector(f));
+}
+
namespace {
bool match(TokenIterator& filter, TokenIterator& target)
@@ -213,7 +219,8 @@ bool match(const std::string& filter, const std::string& target)
bool Outgoing::filter(const qpid::broker::Message& m)
{
- return subjectFilter.empty() || subjectFilter == m.getRoutingKey() || match(subjectFilter, m.getRoutingKey());
+ return (subjectFilter.empty() || subjectFilter == m.getRoutingKey() || match(subjectFilter, m.getRoutingKey()))
+ && (!selector || selector->filter(m));
}
void Outgoing::cancel() {}
diff --git a/cpp/src/qpid/broker/amqp/Outgoing.h b/cpp/src/qpid/broker/amqp/Outgoing.h
index a8450a48cf..7d845a1427 100644
--- a/cpp/src/qpid/broker/amqp/Outgoing.h
+++ b/cpp/src/qpid/broker/amqp/Outgoing.h
@@ -24,7 +24,9 @@
#include "qpid/broker/amqp/Message.h"
#include "qpid/broker/amqp/ManagedOutgoingLink.h"
#include "qpid/broker/Consumer.h"
+
#include <boost/shared_ptr.hpp>
+#include <boost/scoped_ptr.hpp>
#include <boost/enable_shared_from_this.hpp>
extern "C" {
#include <proton/engine.h>
@@ -37,6 +39,7 @@ class OutputControl;
namespace broker {
class Broker;
class Queue;
+class Selector;
namespace amqp {
class ManagedSession;
template <class T>
@@ -61,6 +64,7 @@ class Outgoing : public qpid::broker::Consumer, public boost::enable_shared_from
public:
Outgoing(Broker&,boost::shared_ptr<Queue> q, pn_link_t* l, ManagedSession&, qpid::sys::OutputControl& o, bool topic);
void setSubjectFilter(const std::string&);
+ void setSelectorFilter(const std::string&);
void init();
bool dispatch();
void write(const char* data, size_t size);
@@ -102,6 +106,7 @@ class Outgoing : public qpid::broker::Consumer, public boost::enable_shared_from
int outstanding;
std::vector<char> buffer;
std::string subjectFilter;
+ boost::scoped_ptr<Selector> selector;
};
}}} // namespace qpid::broker::amqp
diff --git a/cpp/src/qpid/broker/amqp/Session.cpp b/cpp/src/qpid/broker/amqp/Session.cpp
index fabe609473..3ec5eb15dd 100644
--- a/cpp/src/qpid/broker/amqp/Session.cpp
+++ b/cpp/src/qpid/broker/amqp/Session.cpp
@@ -31,6 +31,7 @@
#include "qpid/broker/FanOutExchange.h"
#include "qpid/broker/Message.h"
#include "qpid/broker/Queue.h"
+#include "qpid/broker/Selector.h"
#include "qpid/broker/TopicExchange.h"
#include "qpid/broker/amqp/Filter.h"
#include "qpid/broker/amqp/NodeProperties.h"
@@ -132,6 +133,9 @@ void Session::attach(pn_link_t* link)
if (filter.hasSubjectFilter()) {
q->setSubjectFilter(filter.getSubjectFilter());
}
+ if (filter.hasSelectorFilter()) {
+ q->setSelectorFilter(filter.getSelectorFilter());
+ }
senders[link] = q;
} else if (node.exchange) {
QueueSettings settings(false, true);
diff --git a/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp b/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp
index acf301d28c..638b1fc674 100644
--- a/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp
+++ b/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp
@@ -120,6 +120,20 @@ void ReceiverContext::configure(pn_terminus_t* source) const
helper.setNodeProperties(source);
}
+ // Look specifically for qpid.selector link property and add a filter for it
+ qpid::types::Variant::Map::const_iterator i = helper.getLinkProperties().find("qpid.selector");
+ if (i!=helper.getLinkProperties().end()) {
+ pn_data_t* filter = pn_terminus_filter(source);
+ pn_data_put_map(filter);
+ pn_data_enter(filter);
+ pn_data_put_symbol(filter, convert("qpid.selector"));
+ pn_data_put_described(filter);
+ pn_data_enter(filter);
+ pn_data_put_symbol(filter, convert(qpid::amqp::filters::QPID_SELECTOR_FILTER_SYMBOL));
+ pn_data_put_string(filter, convert(i->second));
+ pn_data_exit(filter);
+ pn_data_exit(filter);
+ }
if (!address.getSubject().empty()) {
//filter:
pn_data_t* filter = pn_terminus_filter(source);