diff options
| author | Gordon Sim <gsim@apache.org> | 2015-08-28 22:16:27 +0000 |
|---|---|---|
| committer | Gordon Sim <gsim@apache.org> | 2015-08-28 22:16:27 +0000 |
| commit | 3d0db5c7cb450ef49ff6bcca072b92e869d3a0d1 (patch) | |
| tree | 1a76403c073a1bd9965d8542c62df03e9f572423 /qpid | |
| parent | 507553d663cce9764387b7135f99e2c47ebfcbee (diff) | |
| download | qpid-python-3d0db5c7cb450ef49ff6bcca072b92e869d3a0d1.tar.gz | |
QPID-6714: support for JMS header names in selectors, plus support for to, replyto and subject
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1698426 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid')
| -rw-r--r-- | qpid/cpp/src/qpid/broker/Message.cpp | 13 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/Message.h | 8 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/Selector.cpp | 51 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/amqp/Message.cpp | 28 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/amqp/Message.h | 6 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/amqp/Translation.cpp | 2 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp | 38 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h | 4 | ||||
| -rw-r--r-- | qpid/tests/src/py/qpid_tests/broker_1_0/selector.py | 22 |
9 files changed, 161 insertions, 11 deletions
diff --git a/qpid/cpp/src/qpid/broker/Message.cpp b/qpid/cpp/src/qpid/broker/Message.cpp index 250acf6b4e..f41cf767c7 100644 --- a/qpid/cpp/src/qpid/broker/Message.cpp +++ b/qpid/cpp/src/qpid/broker/Message.cpp @@ -64,6 +64,19 @@ std::string Message::getRoutingKey() const return getEncoding().getRoutingKey(); } +std::string Message::getTo() const +{ + return getEncoding().getTo(); +} +std::string Message::getSubject() const +{ + return getEncoding().getSubject(); +} +std::string Message::getReplyTo() const +{ + return getEncoding().getReplyTo(); +} + bool Message::isPersistent() const { return getEncoding().isPersistent(); diff --git a/qpid/cpp/src/qpid/broker/Message.h b/qpid/cpp/src/qpid/broker/Message.h index 9843bc6220..1865a4fa4e 100644 --- a/qpid/cpp/src/qpid/broker/Message.h +++ b/qpid/cpp/src/qpid/broker/Message.h @@ -78,6 +78,9 @@ public: virtual void processProperties(qpid::amqp::MapHandler&) const = 0; virtual std::string getUserId() const = 0; virtual uint64_t getTimestamp() const = 0; + virtual std::string getTo() const = 0; + virtual std::string getSubject() const = 0; + virtual std::string getReplyTo() const = 0; }; class SharedState : public Encoding @@ -137,6 +140,11 @@ public: QPID_BROKER_EXTERN uint64_t getTimestamp() const; + //required for selectors: + QPID_BROKER_EXTERN std::string getTo() const; + QPID_BROKER_EXTERN std::string getSubject() const; + QPID_BROKER_EXTERN std::string getReplyTo() const; + QPID_BROKER_EXTERN void addAnnotation(const std::string& key, const qpid::types::Variant& value); QPID_BROKER_EXTERN bool isExcluded(const std::vector<std::string>& excludes) const; QPID_BROKER_EXTERN void addTraceId(const std::string& id); diff --git a/qpid/cpp/src/qpid/broker/Selector.cpp b/qpid/cpp/src/qpid/broker/Selector.cpp index 21247e125c..bf30df59e4 100644 --- a/qpid/cpp/src/qpid/broker/Selector.cpp +++ b/qpid/cpp/src/qpid/broker/Selector.cpp @@ -30,6 +30,7 @@ #include "qpid/log/Statement.h" #include "qpid/types/Variant.h" +#include <map> #include <stdexcept> #include <string> #include <sstream> @@ -54,6 +55,7 @@ using qpid::amqp::MessageId; * priority | Priority | priority header section * delivery_count | | delivery-count header section * redelivered |[Redelivered] | (delivery_count>0) (computed value) + * subject | Type | subject properties section * correlation_id | CorrelationID| correlation-id properties section * to |[Destination] | to properties section * absolute_expiry_time |[Expiration] | absolute-expiry-time properties section @@ -66,6 +68,26 @@ const string EMPTY; const string PERSISTENT("PERSISTENT"); const string NON_PERSISTENT("NON_PERSISTENT"); +namespace { + typedef std::map<std::string, std::string> Aliases; + Aliases define_aliases() + { + Aliases aliases; + aliases["JMSType"] = "subject"; + aliases["JMSCorrelationID"] = "correlation_id"; + aliases["JMSMessageID"] = "message_id"; + aliases["JMSDeliveryMode"] = "delivery_mode"; + aliases["JMSRedelivered"] = "redelivered"; + aliases["JMSPriority"] = "priority"; + aliases["JMSDestination"] = "to"; + aliases["JMSReplyTo"] = "reply_to"; + aliases["JMSTimestamp"] = "creation_time"; + aliases["JMSExpiration"] = "absolute_expiry_time"; + return aliases; + } + const Aliases aliases = define_aliases(); +} + class MessageSelectorEnv : public SelectorEnv { const Message& msg; mutable boost::ptr_vector<string> returnedStrings; @@ -82,8 +104,7 @@ public: MessageSelectorEnv::MessageSelectorEnv(const Message& m) : msg(m), valuesLookedup(false) -{ -} +{} const Value MessageSelectorEnv::specialValue(const string& id) const { @@ -91,6 +112,12 @@ const Value MessageSelectorEnv::specialValue(const string& id) const // TODO: Just use a simple if chain for now - improve this later if ( id=="delivery_mode" ) { v = msg.getEncoding().isPersistent() ? PERSISTENT : NON_PERSISTENT; + } else if ( id=="subject" ) { + std::string s = msg.getSubject(); + if (!s.empty()) { + returnedStrings.push_back(new string(s)); + v = returnedStrings[returnedStrings.size()-1]; + } } else if ( id=="redelivered" ) { // Although redelivered is defined to be true delivery-count>0 if it is 0 now // it will be 1 by the time the message is delivered @@ -110,9 +137,17 @@ const Value MessageSelectorEnv::specialValue(const string& id) const v = returnedStrings[returnedStrings.size()-1]; } } else if ( id=="to" ) { - v = EMPTY; // Hard to get this correct for both 1.0 and 0-10 + std::string s = msg.getTo(); + if (!s.empty()) { + returnedStrings.push_back(new string(s)); + v = returnedStrings[returnedStrings.size()-1]; + } } else if ( id=="reply_to" ) { - v = EMPTY; // Hard to get this correct for both 1.0 and 0-10 + std::string s = msg.getReplyTo(); + if (!s.empty()) { + returnedStrings.push_back(new string(s)); + v = returnedStrings[returnedStrings.size()-1]; + } } else if ( id=="absolute_expiry_time" ) { qpid::sys::AbsTime expiry = msg.getExpiration(); // Java property has value of 0 for no expiry @@ -183,6 +218,14 @@ const Value& MessageSelectorEnv::value(const string& identifier) const QPID_LOG(debug, "Selector lookup special identifier: " << identifier); returnedValues[identifier] = specialValue(identifier.substr(5)); } + } else if (identifier.substr(0, 3) == "JMS") { + Aliases::const_iterator equivalent = aliases.find(identifier); + if (equivalent != aliases.end()) { + QPID_LOG(debug, "Selector lookup JMS identifier: " << identifier << " treated as alias for " << equivalent->second); + returnedValues[identifier] = specialValue(equivalent->second); + } else { + QPID_LOG(info, "Unrecognised JMS identifier in selector: " << identifier); + } } else if (!valuesLookedup) { QPID_LOG(debug, "Selector lookup triggered by: " << identifier); // Iterate over all the message properties diff --git a/qpid/cpp/src/qpid/broker/amqp/Message.cpp b/qpid/cpp/src/qpid/broker/amqp/Message.cpp index 54741e6436..857ca2c313 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Message.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/Message.cpp @@ -62,9 +62,8 @@ std::string Message::getUserId() const uint64_t Message::getTimestamp() const { - //AMQP 1.0 message doesn't have the equivalent of the 0-10 timestamp field - //TODO: define an annotation for that - return 0; + //creation time is in milliseconds, timestamp (from the 0-10 spec) is in seconds + return !creationTime ? 0 : creationTime.get()/1000; } bool Message::isPersistent() const @@ -87,6 +86,25 @@ uint8_t Message::getPriority() const else return priority.get(); } +std::string Message::getTo() const +{ + std::string v; + if (to.data) v.assign(to.data, to.size); + return v; +} +std::string Message::getSubject() const +{ + std::string v; + if (subject.data) v.assign(subject.data, subject.size); + return v; +} +std::string Message::getReplyTo() const +{ + std::string v; + if (replyTo.data) v.assign(replyTo.data, replyTo.size); + return v; +} + namespace { class StringRetriever : public MapHandler { @@ -242,7 +260,7 @@ qpid::amqp::MessageId Message::getMessageId() const { return messageId; } -qpid::amqp::CharSequence Message::getReplyTo() const +qpid::amqp::CharSequence Message::getReplyToAsCharSequence() const { return replyTo; } @@ -318,7 +336,7 @@ void Message::onCorrelationId(const qpid::amqp::CharSequence& v, qpid::types::Va void Message::onContentType(const qpid::amqp::CharSequence& v) { contentType = v; } void Message::onContentEncoding(const qpid::amqp::CharSequence& v) { contentEncoding = v; } void Message::onAbsoluteExpiryTime(int64_t) {} -void Message::onCreationTime(int64_t) {} +void Message::onCreationTime(int64_t v) { creationTime = v; } void Message::onGroupId(const qpid::amqp::CharSequence&) {} void Message::onGroupSequence(uint32_t) {} void Message::onReplyToGroupId(const qpid::amqp::CharSequence&) {} diff --git a/qpid/cpp/src/qpid/broker/amqp/Message.h b/qpid/cpp/src/qpid/broker/amqp/Message.h index 20310aa977..39ed1f60b6 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Message.h +++ b/qpid/cpp/src/qpid/broker/amqp/Message.h @@ -53,10 +53,13 @@ class Message : public qpid::broker::Message::SharedStateImpl, private qpid::amq void processProperties(qpid::amqp::MapHandler&) const; std::string getUserId() const; uint64_t getTimestamp() const; + std::string getTo() const; + std::string getSubject() const; + std::string getReplyTo() const; qpid::amqp::MessageId getMessageId() const; qpid::amqp::MessageId getCorrelationId() const; - qpid::amqp::CharSequence getReplyTo() const; + qpid::amqp::CharSequence getReplyToAsCharSequence() const; qpid::amqp::CharSequence getContentType() const; qpid::amqp::CharSequence getContentEncoding() const; @@ -108,6 +111,7 @@ class Message : public qpid::broker::Message::SharedStateImpl, private qpid::amq qpid::amqp::MessageId correlationId; qpid::amqp::CharSequence contentType; qpid::amqp::CharSequence contentEncoding; + boost::optional<int64_t> creationTime; //application-properties: qpid::amqp::CharSequence applicationProperties; diff --git a/qpid/cpp/src/qpid/broker/amqp/Translation.cpp b/qpid/cpp/src/qpid/broker/amqp/Translation.cpp index 2196c8ff3d..23375f0e99 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Translation.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/Translation.cpp @@ -245,7 +245,7 @@ boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer> Translation props->setCorrelationId(boost::lexical_cast<std::string>(cid.value.ulong)); break; } - if (message->getReplyTo()) props->setReplyTo(translate(message->getReplyTo(), broker)); + if (message->getReplyToAsCharSequence()) props->setReplyTo(translate(message->getReplyTo(), broker)); if (message->getContentType()) props->setContentType(translate(message->getContentType())); if (message->getContentEncoding()) props->setContentEncoding(translate(message->getContentEncoding())); props->setUserId(message->getUserId()); diff --git a/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp b/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp index a43bf8efa6..8c65c9c55e 100644 --- a/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp +++ b/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp @@ -41,8 +41,11 @@ namespace qpid { namespace broker { namespace amqp_0_10 { namespace { +const std::string DELIMITER("/"); +const std::string EMPTY; const std::string QMF2("qmf2"); const std::string PARTIAL("partial"); +const std::string SUBJECT_KEY("qpid.subject"); } MessageTransfer::MessageTransfer() : frames(framing::SequenceNumber()), requiredCredit(0), cachedRequiredCredit(false) {} MessageTransfer::MessageTransfer(const framing::SequenceNumber& id) : frames(id), requiredCredit(0), cachedRequiredCredit(false) {} @@ -143,6 +146,41 @@ uint64_t MessageTransfer::getTimestamp() const return props ? props->getTimestamp() : 0; } +std::string MessageTransfer::getTo() const +{ + const DeliveryProperties* props = getProperties<DeliveryProperties>(); + if (props) { + //if message was sent to 'nameless exchange' then the routing key is the queue + return props->getExchange().empty() ? props->getRoutingKey() : props->getExchange(); + } else { + return EMPTY; + } +} +std::string MessageTransfer::getSubject() const +{ + const DeliveryProperties* props = getProperties<DeliveryProperties>(); + if (props) { + //if message was sent to 'nameless exchange' then the routing key is the queue name, not the subject + return props->getExchange().empty() ? getPropertyAsString(SUBJECT_KEY) : props->getRoutingKey(); + } else { + return EMPTY; + } +} +std::string MessageTransfer::getReplyTo() const +{ + const MessageProperties* props = getProperties<MessageProperties>(); + if (props && props->hasReplyTo()) { + const qpid::framing::ReplyTo& replyto = props->getReplyTo(); + if (replyto.hasExchange() && replyto.hasRoutingKey()) + return replyto.getExchange() + DELIMITER + replyto.getRoutingKey(); + else if (replyto.hasExchange()) return replyto.getExchange(); + else if (replyto.hasRoutingKey()) return replyto.getRoutingKey(); + else return EMPTY; + } else { + return EMPTY; + } +} + bool MessageTransfer::requiresAccept() const { const framing::MessageTransferBody* b = getFrames().as<framing::MessageTransferBody>(); diff --git a/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h b/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h index 513bbe1bfb..fdf4fd0f95 100644 --- a/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h +++ b/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h @@ -57,6 +57,10 @@ class MessageTransfer : public qpid::broker::Message::SharedStateImpl, public qp std::string getUserId() const; void setTimestamp(); uint64_t getTimestamp() const; + std::string getTo() const; + std::string getSubject() const; + std::string getReplyTo() const; + bool requiresAccept() const; const qpid::framing::SequenceNumber& getCommandId() const; diff --git a/qpid/tests/src/py/qpid_tests/broker_1_0/selector.py b/qpid/tests/src/py/qpid_tests/broker_1_0/selector.py index ac2bbd8db3..323baaab07 100644 --- a/qpid/tests/src/py/qpid_tests/broker_1_0/selector.py +++ b/qpid/tests/src/py/qpid_tests/broker_1_0/selector.py @@ -71,3 +71,25 @@ class SelectorTests (VersionTest): msg = rcv_4.fetch(0) assert msg.content == 'd' self.ssn.acknowledge(msg) + + def check_selected(self,node, selector, expected_content): + rcv = self.ssn.receiver("%s; {mode:browse, link:{selector:\"%s\"}}" % (node, selector)) + msg = rcv.fetch(0) + assert msg.content == expected_content, msg + rcv.close() + + def test_jms_header_names(self): + """ + The new AMQP 1.0 based JMS client uses these rather than the special names above + """ + msgs = [Message(content=i, id=i, correlation_id=i, subject=i, priority=p+1, reply_to=i, properties={'x-amqp-to':i}) for p, i in enumerate(['a', 'b', 'c', 'd'])] + + snd = self.ssn.sender("#") + for m in msgs: snd.send(m) + + self.check_selected(snd.target, "JMSMessageID = 'a'", 'a') + self.check_selected(snd.target, "JMSCorrelationID = 'b'", 'b') + self.check_selected(snd.target, "JMSPriority = 3", 'c') + self.check_selected(snd.target, "JMSDestination = 'a'", 'a') + self.check_selected(snd.target, "JMSReplyTo = 'b'", 'b') + self.check_selected(snd.target, "JMSType = 'c'", 'c') |
