diff options
| author | Andrew Stitcher <astitcher@apache.org> | 2013-03-04 21:08:17 +0000 |
|---|---|---|
| committer | Andrew Stitcher <astitcher@apache.org> | 2013-03-04 21:08:17 +0000 |
| commit | 5aa4b2dc7e531c12597323c2ca73ade8fa72793a (patch) | |
| tree | 1cdf6d586c8af4441f3cd9ace56bf83a1a64339c /qpid/cpp/src | |
| parent | 0337e64c4b49ca3aa83571e483dfee536d0ccb96 (diff) | |
| download | qpid-python-5aa4b2dc7e531c12597323c2ca73ade8fa72793a.tar.gz | |
QPID-4558: Selectors for C++ broker
- Add support to 0-10 protocol codepaths in client messaging library
and the broker to transmit a selector when subscribing to a queue
- This is specified by using a link property qpid.selector in the queue
address.
- The selector is actually transmitted under 0-10 as an user vlaue
named qpid.selector in the arguments table of the subscription.
- Added simple selector framework to broker.
- Added in infrastructure for selector evaluation
-- Put in place a trivial (but real) selector:
The expression specifies a message property
and returns true if it is present.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1452522 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
| -rw-r--r-- | qpid/cpp/src/CMakeLists.txt | 2 | ||||
| -rw-r--r-- | qpid/cpp/src/Makefile.am | 2 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/Selector.cpp | 82 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/Selector.h | 88 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/SemanticState.cpp | 7 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/SemanticState.h | 3 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp | 10 |
7 files changed, 192 insertions, 2 deletions
diff --git a/qpid/cpp/src/CMakeLists.txt b/qpid/cpp/src/CMakeLists.txt index 9e9662ee38..c44bb8d999 100644 --- a/qpid/cpp/src/CMakeLists.txt +++ b/qpid/cpp/src/CMakeLists.txt @@ -1205,6 +1205,8 @@ set (qpidbroker_SOURCES qpid/broker/RetryList.cpp qpid/broker/SecureConnection.cpp qpid/broker/SecureConnectionFactory.cpp + qpid/broker/Selector.h + qpid/broker/Selector.cpp qpid/broker/SemanticState.h qpid/broker/SemanticState.cpp qpid/broker/SessionAdapter.cpp diff --git a/qpid/cpp/src/Makefile.am b/qpid/cpp/src/Makefile.am index 0b5de50283..7d03cd1d26 100644 --- a/qpid/cpp/src/Makefile.am +++ b/qpid/cpp/src/Makefile.am @@ -713,6 +713,8 @@ libqpidbroker_la_SOURCES = \ qpid/broker/SecureConnection.h \ qpid/broker/SecureConnectionFactory.cpp \ qpid/broker/SecureConnectionFactory.h \ + qpid/broker/Selector.cpp \ + qpid/broker/Selector.h \ qpid/broker/SemanticState.cpp \ qpid/broker/SemanticState.h \ qpid/broker/SessionAdapter.cpp \ diff --git a/qpid/cpp/src/qpid/broker/Selector.cpp b/qpid/cpp/src/qpid/broker/Selector.cpp new file mode 100644 index 0000000000..805bcdbba1 --- /dev/null +++ b/qpid/cpp/src/qpid/broker/Selector.cpp @@ -0,0 +1,82 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/broker/Selector.h" + +#include "qpid/broker/Message.h" + +#include <boost/make_shared.hpp> + +namespace qpid { +namespace broker { + +using std::string; + +Selector::Selector(const string& e) : + expression(e) +{ +} + +Selector::~Selector() +{ +} + +bool Selector::eval(const SelectorEnv& env) +{ + // Simple test - return true if expression is a non-empty property + return env.present(expression); +} + +bool Selector::filter(const Message& msg) +{ + return eval(MessageSelectorEnv(msg)); +} + +MessageSelectorEnv::MessageSelectorEnv(const Message& m) : + msg(m) +{ +} + +bool MessageSelectorEnv::present(const string& identifier) const +{ + // If the value we get is void then most likely the property wasn't present + return !msg.getProperty(identifier).isVoid(); +} + +string MessageSelectorEnv::value(const string& identifier) const +{ + // Just return property as string + return msg.getPropertyAsString(identifier); +} + + +namespace { +const boost::shared_ptr<Selector> NULL_SELECTOR = boost::shared_ptr<Selector>(); +} + +boost::shared_ptr<Selector> returnSelector(const string& e) +{ + if (e.empty()) return NULL_SELECTOR; + return boost::make_shared<Selector>(e); +} + + +}} diff --git a/qpid/cpp/src/qpid/broker/Selector.h b/qpid/cpp/src/qpid/broker/Selector.h new file mode 100644 index 0000000000..dc0a5719b7 --- /dev/null +++ b/qpid/cpp/src/qpid/broker/Selector.h @@ -0,0 +1,88 @@ +#ifndef QPID_BROKER_SELECTOR_H +#define QPID_BROKER_SELECTOR_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <string> + +#include <boost/scoped_ptr.hpp> +#include <boost/shared_ptr.hpp> + +namespace qpid { +namespace broker { + +class Message; + +/** + * Interface to provide values to a Selector evaluation + * (For the moment just use string values) + * TODO: allow more complex values. + */ +class SelectorEnv { +public: + virtual ~SelectorEnv() {}; + + virtual bool present(const std::string&) const = 0; + virtual std::string value(const std::string&) const = 0; +}; + +class MessageSelectorEnv : public SelectorEnv { + const Message& msg; + + bool present(const std::string&) const; + std::string value(const std::string&) const; + +public: + MessageSelectorEnv(const Message&); +}; + +class Selector { + const std::string expression; + +public: + Selector(const std::string&); + ~Selector(); + + /** + * Evaluate parsed expression with a given environment + */ + bool eval(const SelectorEnv& env); + + /** + * Apply selector to message + * @param msg message to filter against selector + * @return true if msg meets the selector specification + */ + bool filter(const Message& msg); +}; + +/** + * Return a Selector as specified by the string: + * - Structured like this so that we can move to caching Selectors with the same + * specifications and just returning an existing one + */ +boost::shared_ptr<Selector> returnSelector(const std::string&); + +}} + +#endif + diff --git a/qpid/cpp/src/qpid/broker/SemanticState.cpp b/qpid/cpp/src/qpid/broker/SemanticState.cpp index 66d9a95cfc..60c2d6e555 100644 --- a/qpid/cpp/src/qpid/broker/SemanticState.cpp +++ b/qpid/cpp/src/qpid/broker/SemanticState.cpp @@ -28,6 +28,7 @@ #include "qpid/broker/DtxTimeout.h" #include "qpid/broker/Message.h" #include "qpid/broker/Queue.h" +#include "qpid/broker/Selector.h" #include "qpid/broker/SessionContext.h" #include "qpid/broker/SessionOutputException.h" #include "qpid/broker/TxAccept.h" @@ -285,6 +286,7 @@ void SemanticState::record(const DeliveryRecord& delivery) } const std::string QPID_SYNC_FREQUENCY("qpid.sync_frequency"); +const std::string QPID_SELECTOR("qpid.selector"); SemanticStateConsumerImpl::SemanticStateConsumerImpl(SemanticState* _parent, const string& _name, @@ -307,6 +309,7 @@ Consumer(_name, type), exclusive(_exclusive), resumeId(_resumeId), tag(_tag), + selector(returnSelector(_arguments.getAsString(QPID_SELECTOR))), resumeTtl(_resumeTtl), arguments(_arguments), notifyEnabled(true), @@ -378,9 +381,9 @@ bool SemanticStateConsumerImpl::deliver(const QueueCursor& cursor, const Message return true; } -bool SemanticStateConsumerImpl::filter(const Message&) +bool SemanticStateConsumerImpl::filter(const Message& msg) { - return true; + return !selector || selector->filter(msg); } bool SemanticStateConsumerImpl::accept(const Message& msg) diff --git a/qpid/cpp/src/qpid/broker/SemanticState.h b/qpid/cpp/src/qpid/broker/SemanticState.h index 24ab30bf00..3105398426 100644 --- a/qpid/cpp/src/qpid/broker/SemanticState.h +++ b/qpid/cpp/src/qpid/broker/SemanticState.h @@ -52,6 +52,7 @@ #include <boost/enable_shared_from_this.hpp> #include <boost/cast.hpp> #include <boost/tuple/tuple.hpp> +#include <boost/shared_ptr.hpp> namespace qpid { namespace broker { @@ -59,6 +60,7 @@ namespace broker { class Exchange; class MessageStore; class ProtocolRegistry; +class Selector; class SessionContext; class SessionState; @@ -206,6 +208,7 @@ class SemanticStateConsumerImpl : public Consumer, public sys::OutputTask, bool exclusive; std::string resumeId; const std::string tag; // <destination> from AMQP 0-10 Message.subscribe command + boost::shared_ptr<Selector> selector; uint64_t resumeTtl; framing::FieldTable arguments; Credit credit; diff --git a/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp b/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp index 2627c178f9..de6ee0cd2d 100644 --- a/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp +++ b/qpid/cpp/src/qpid/client/amqp0_10/AddressResolution.cpp @@ -93,6 +93,7 @@ const std::string DURABLE("durable"); const std::string X_DECLARE("x-declare"); const std::string X_SUBSCRIBE("x-subscribe"); const std::string X_BINDINGS("x-bindings"); +const std::string QPID_SELECTOR("qpid.selector"); const std::string EXCHANGE("exchange"); const std::string QUEUE("queue"); const std::string KEY("key"); @@ -315,6 +316,7 @@ struct Opt Opt(const Variant::Map& base); Opt& operator/(const std::string& name); operator bool() const; + operator std::string() const; std::string str() const; bool asBool(bool defaultValue) const; const Variant::List& asList() const; @@ -348,6 +350,11 @@ Opt::operator bool() const return value && !value->isVoid() && value->asBool(); } +Opt::operator std::string() const +{ + return str(); +} + bool Opt::asBool(bool defaultValue) const { if (value) return value->asBool(); @@ -470,6 +477,8 @@ QueueSource::QueueSource(const Address& address) : //options) exclusive = Opt(address)/LINK/X_SUBSCRIBE/EXCLUSIVE; (Opt(address)/LINK/X_SUBSCRIBE/ARGUMENTS).collect(options); + std::string selector = Opt(address)/LINK/QPID_SELECTOR; + if (!selector.empty()) options.setString(QPID_SELECTOR, selector); } void QueueSource::subscribe(qpid::client::AsyncSession& session, const std::string& destination) @@ -981,6 +990,7 @@ Verifier::Verifier() link[X_SUBSCRIBE] = true; link[X_DECLARE] = true; link[X_BINDINGS] = true; + link[QPID_SELECTOR] = true; defined[LINK] = link; } void Verifier::verify(const Address& address) const |
