summaryrefslogtreecommitdiff
path: root/qpid
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2015-08-28 22:16:27 +0000
committerGordon Sim <gsim@apache.org>2015-08-28 22:16:27 +0000
commit3d0db5c7cb450ef49ff6bcca072b92e869d3a0d1 (patch)
tree1a76403c073a1bd9965d8542c62df03e9f572423 /qpid
parent507553d663cce9764387b7135f99e2c47ebfcbee (diff)
downloadqpid-python-3d0db5c7cb450ef49ff6bcca072b92e869d3a0d1.tar.gz
QPID-6714: support for JMS header names in selectors, plus support for to, replyto and subject
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1698426 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid')
-rw-r--r--qpid/cpp/src/qpid/broker/Message.cpp13
-rw-r--r--qpid/cpp/src/qpid/broker/Message.h8
-rw-r--r--qpid/cpp/src/qpid/broker/Selector.cpp51
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Message.cpp28
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Message.h6
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Translation.cpp2
-rw-r--r--qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp38
-rw-r--r--qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h4
-rw-r--r--qpid/tests/src/py/qpid_tests/broker_1_0/selector.py22
9 files changed, 161 insertions, 11 deletions
diff --git a/qpid/cpp/src/qpid/broker/Message.cpp b/qpid/cpp/src/qpid/broker/Message.cpp
index 250acf6b4e..f41cf767c7 100644
--- a/qpid/cpp/src/qpid/broker/Message.cpp
+++ b/qpid/cpp/src/qpid/broker/Message.cpp
@@ -64,6 +64,19 @@ std::string Message::getRoutingKey() const
return getEncoding().getRoutingKey();
}
+std::string Message::getTo() const
+{
+ return getEncoding().getTo();
+}
+std::string Message::getSubject() const
+{
+ return getEncoding().getSubject();
+}
+std::string Message::getReplyTo() const
+{
+ return getEncoding().getReplyTo();
+}
+
bool Message::isPersistent() const
{
return getEncoding().isPersistent();
diff --git a/qpid/cpp/src/qpid/broker/Message.h b/qpid/cpp/src/qpid/broker/Message.h
index 9843bc6220..1865a4fa4e 100644
--- a/qpid/cpp/src/qpid/broker/Message.h
+++ b/qpid/cpp/src/qpid/broker/Message.h
@@ -78,6 +78,9 @@ public:
virtual void processProperties(qpid::amqp::MapHandler&) const = 0;
virtual std::string getUserId() const = 0;
virtual uint64_t getTimestamp() const = 0;
+ virtual std::string getTo() const = 0;
+ virtual std::string getSubject() const = 0;
+ virtual std::string getReplyTo() const = 0;
};
class SharedState : public Encoding
@@ -137,6 +140,11 @@ public:
QPID_BROKER_EXTERN uint64_t getTimestamp() const;
+ //required for selectors:
+ QPID_BROKER_EXTERN std::string getTo() const;
+ QPID_BROKER_EXTERN std::string getSubject() const;
+ QPID_BROKER_EXTERN std::string getReplyTo() const;
+
QPID_BROKER_EXTERN void addAnnotation(const std::string& key, const qpid::types::Variant& value);
QPID_BROKER_EXTERN bool isExcluded(const std::vector<std::string>& excludes) const;
QPID_BROKER_EXTERN void addTraceId(const std::string& id);
diff --git a/qpid/cpp/src/qpid/broker/Selector.cpp b/qpid/cpp/src/qpid/broker/Selector.cpp
index 21247e125c..bf30df59e4 100644
--- a/qpid/cpp/src/qpid/broker/Selector.cpp
+++ b/qpid/cpp/src/qpid/broker/Selector.cpp
@@ -30,6 +30,7 @@
#include "qpid/log/Statement.h"
#include "qpid/types/Variant.h"
+#include <map>
#include <stdexcept>
#include <string>
#include <sstream>
@@ -54,6 +55,7 @@ using qpid::amqp::MessageId;
* priority | Priority | priority header section
* delivery_count | | delivery-count header section
* redelivered |[Redelivered] | (delivery_count>0) (computed value)
+ * subject | Type | subject properties section
* correlation_id | CorrelationID| correlation-id properties section
* to |[Destination] | to properties section
* absolute_expiry_time |[Expiration] | absolute-expiry-time properties section
@@ -66,6 +68,26 @@ const string EMPTY;
const string PERSISTENT("PERSISTENT");
const string NON_PERSISTENT("NON_PERSISTENT");
+namespace {
+ typedef std::map<std::string, std::string> Aliases;
+ Aliases define_aliases()
+ {
+ Aliases aliases;
+ aliases["JMSType"] = "subject";
+ aliases["JMSCorrelationID"] = "correlation_id";
+ aliases["JMSMessageID"] = "message_id";
+ aliases["JMSDeliveryMode"] = "delivery_mode";
+ aliases["JMSRedelivered"] = "redelivered";
+ aliases["JMSPriority"] = "priority";
+ aliases["JMSDestination"] = "to";
+ aliases["JMSReplyTo"] = "reply_to";
+ aliases["JMSTimestamp"] = "creation_time";
+ aliases["JMSExpiration"] = "absolute_expiry_time";
+ return aliases;
+ }
+ const Aliases aliases = define_aliases();
+}
+
class MessageSelectorEnv : public SelectorEnv {
const Message& msg;
mutable boost::ptr_vector<string> returnedStrings;
@@ -82,8 +104,7 @@ public:
MessageSelectorEnv::MessageSelectorEnv(const Message& m) :
msg(m),
valuesLookedup(false)
-{
-}
+{}
const Value MessageSelectorEnv::specialValue(const string& id) const
{
@@ -91,6 +112,12 @@ const Value MessageSelectorEnv::specialValue(const string& id) const
// TODO: Just use a simple if chain for now - improve this later
if ( id=="delivery_mode" ) {
v = msg.getEncoding().isPersistent() ? PERSISTENT : NON_PERSISTENT;
+ } else if ( id=="subject" ) {
+ std::string s = msg.getSubject();
+ if (!s.empty()) {
+ returnedStrings.push_back(new string(s));
+ v = returnedStrings[returnedStrings.size()-1];
+ }
} else if ( id=="redelivered" ) {
// Although redelivered is defined to be true delivery-count>0 if it is 0 now
// it will be 1 by the time the message is delivered
@@ -110,9 +137,17 @@ const Value MessageSelectorEnv::specialValue(const string& id) const
v = returnedStrings[returnedStrings.size()-1];
}
} else if ( id=="to" ) {
- v = EMPTY; // Hard to get this correct for both 1.0 and 0-10
+ std::string s = msg.getTo();
+ if (!s.empty()) {
+ returnedStrings.push_back(new string(s));
+ v = returnedStrings[returnedStrings.size()-1];
+ }
} else if ( id=="reply_to" ) {
- v = EMPTY; // Hard to get this correct for both 1.0 and 0-10
+ std::string s = msg.getReplyTo();
+ if (!s.empty()) {
+ returnedStrings.push_back(new string(s));
+ v = returnedStrings[returnedStrings.size()-1];
+ }
} else if ( id=="absolute_expiry_time" ) {
qpid::sys::AbsTime expiry = msg.getExpiration();
// Java property has value of 0 for no expiry
@@ -183,6 +218,14 @@ const Value& MessageSelectorEnv::value(const string& identifier) const
QPID_LOG(debug, "Selector lookup special identifier: " << identifier);
returnedValues[identifier] = specialValue(identifier.substr(5));
}
+ } else if (identifier.substr(0, 3) == "JMS") {
+ Aliases::const_iterator equivalent = aliases.find(identifier);
+ if (equivalent != aliases.end()) {
+ QPID_LOG(debug, "Selector lookup JMS identifier: " << identifier << " treated as alias for " << equivalent->second);
+ returnedValues[identifier] = specialValue(equivalent->second);
+ } else {
+ QPID_LOG(info, "Unrecognised JMS identifier in selector: " << identifier);
+ }
} else if (!valuesLookedup) {
QPID_LOG(debug, "Selector lookup triggered by: " << identifier);
// Iterate over all the message properties
diff --git a/qpid/cpp/src/qpid/broker/amqp/Message.cpp b/qpid/cpp/src/qpid/broker/amqp/Message.cpp
index 54741e6436..857ca2c313 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Message.cpp
+++ b/qpid/cpp/src/qpid/broker/amqp/Message.cpp
@@ -62,9 +62,8 @@ std::string Message::getUserId() const
uint64_t Message::getTimestamp() const
{
- //AMQP 1.0 message doesn't have the equivalent of the 0-10 timestamp field
- //TODO: define an annotation for that
- return 0;
+ //creation time is in milliseconds, timestamp (from the 0-10 spec) is in seconds
+ return !creationTime ? 0 : creationTime.get()/1000;
}
bool Message::isPersistent() const
@@ -87,6 +86,25 @@ uint8_t Message::getPriority() const
else return priority.get();
}
+std::string Message::getTo() const
+{
+ std::string v;
+ if (to.data) v.assign(to.data, to.size);
+ return v;
+}
+std::string Message::getSubject() const
+{
+ std::string v;
+ if (subject.data) v.assign(subject.data, subject.size);
+ return v;
+}
+std::string Message::getReplyTo() const
+{
+ std::string v;
+ if (replyTo.data) v.assign(replyTo.data, replyTo.size);
+ return v;
+}
+
namespace {
class StringRetriever : public MapHandler
{
@@ -242,7 +260,7 @@ qpid::amqp::MessageId Message::getMessageId() const
{
return messageId;
}
-qpid::amqp::CharSequence Message::getReplyTo() const
+qpid::amqp::CharSequence Message::getReplyToAsCharSequence() const
{
return replyTo;
}
@@ -318,7 +336,7 @@ void Message::onCorrelationId(const qpid::amqp::CharSequence& v, qpid::types::Va
void Message::onContentType(const qpid::amqp::CharSequence& v) { contentType = v; }
void Message::onContentEncoding(const qpid::amqp::CharSequence& v) { contentEncoding = v; }
void Message::onAbsoluteExpiryTime(int64_t) {}
-void Message::onCreationTime(int64_t) {}
+void Message::onCreationTime(int64_t v) { creationTime = v; }
void Message::onGroupId(const qpid::amqp::CharSequence&) {}
void Message::onGroupSequence(uint32_t) {}
void Message::onReplyToGroupId(const qpid::amqp::CharSequence&) {}
diff --git a/qpid/cpp/src/qpid/broker/amqp/Message.h b/qpid/cpp/src/qpid/broker/amqp/Message.h
index 20310aa977..39ed1f60b6 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Message.h
+++ b/qpid/cpp/src/qpid/broker/amqp/Message.h
@@ -53,10 +53,13 @@ class Message : public qpid::broker::Message::SharedStateImpl, private qpid::amq
void processProperties(qpid::amqp::MapHandler&) const;
std::string getUserId() const;
uint64_t getTimestamp() const;
+ std::string getTo() const;
+ std::string getSubject() const;
+ std::string getReplyTo() const;
qpid::amqp::MessageId getMessageId() const;
qpid::amqp::MessageId getCorrelationId() const;
- qpid::amqp::CharSequence getReplyTo() const;
+ qpid::amqp::CharSequence getReplyToAsCharSequence() const;
qpid::amqp::CharSequence getContentType() const;
qpid::amqp::CharSequence getContentEncoding() const;
@@ -108,6 +111,7 @@ class Message : public qpid::broker::Message::SharedStateImpl, private qpid::amq
qpid::amqp::MessageId correlationId;
qpid::amqp::CharSequence contentType;
qpid::amqp::CharSequence contentEncoding;
+ boost::optional<int64_t> creationTime;
//application-properties:
qpid::amqp::CharSequence applicationProperties;
diff --git a/qpid/cpp/src/qpid/broker/amqp/Translation.cpp b/qpid/cpp/src/qpid/broker/amqp/Translation.cpp
index 2196c8ff3d..23375f0e99 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Translation.cpp
+++ b/qpid/cpp/src/qpid/broker/amqp/Translation.cpp
@@ -245,7 +245,7 @@ boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer> Translation
props->setCorrelationId(boost::lexical_cast<std::string>(cid.value.ulong));
break;
}
- if (message->getReplyTo()) props->setReplyTo(translate(message->getReplyTo(), broker));
+ if (message->getReplyToAsCharSequence()) props->setReplyTo(translate(message->getReplyTo(), broker));
if (message->getContentType()) props->setContentType(translate(message->getContentType()));
if (message->getContentEncoding()) props->setContentEncoding(translate(message->getContentEncoding()));
props->setUserId(message->getUserId());
diff --git a/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp b/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp
index a43bf8efa6..8c65c9c55e 100644
--- a/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp
+++ b/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp
@@ -41,8 +41,11 @@ namespace qpid {
namespace broker {
namespace amqp_0_10 {
namespace {
+const std::string DELIMITER("/");
+const std::string EMPTY;
const std::string QMF2("qmf2");
const std::string PARTIAL("partial");
+const std::string SUBJECT_KEY("qpid.subject");
}
MessageTransfer::MessageTransfer() : frames(framing::SequenceNumber()), requiredCredit(0), cachedRequiredCredit(false) {}
MessageTransfer::MessageTransfer(const framing::SequenceNumber& id) : frames(id), requiredCredit(0), cachedRequiredCredit(false) {}
@@ -143,6 +146,41 @@ uint64_t MessageTransfer::getTimestamp() const
return props ? props->getTimestamp() : 0;
}
+std::string MessageTransfer::getTo() const
+{
+ const DeliveryProperties* props = getProperties<DeliveryProperties>();
+ if (props) {
+ //if message was sent to 'nameless exchange' then the routing key is the queue
+ return props->getExchange().empty() ? props->getRoutingKey() : props->getExchange();
+ } else {
+ return EMPTY;
+ }
+}
+std::string MessageTransfer::getSubject() const
+{
+ const DeliveryProperties* props = getProperties<DeliveryProperties>();
+ if (props) {
+ //if message was sent to 'nameless exchange' then the routing key is the queue name, not the subject
+ return props->getExchange().empty() ? getPropertyAsString(SUBJECT_KEY) : props->getRoutingKey();
+ } else {
+ return EMPTY;
+ }
+}
+std::string MessageTransfer::getReplyTo() const
+{
+ const MessageProperties* props = getProperties<MessageProperties>();
+ if (props && props->hasReplyTo()) {
+ const qpid::framing::ReplyTo& replyto = props->getReplyTo();
+ if (replyto.hasExchange() && replyto.hasRoutingKey())
+ return replyto.getExchange() + DELIMITER + replyto.getRoutingKey();
+ else if (replyto.hasExchange()) return replyto.getExchange();
+ else if (replyto.hasRoutingKey()) return replyto.getRoutingKey();
+ else return EMPTY;
+ } else {
+ return EMPTY;
+ }
+}
+
bool MessageTransfer::requiresAccept() const
{
const framing::MessageTransferBody* b = getFrames().as<framing::MessageTransferBody>();
diff --git a/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h b/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h
index 513bbe1bfb..fdf4fd0f95 100644
--- a/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h
+++ b/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h
@@ -57,6 +57,10 @@ class MessageTransfer : public qpid::broker::Message::SharedStateImpl, public qp
std::string getUserId() const;
void setTimestamp();
uint64_t getTimestamp() const;
+ std::string getTo() const;
+ std::string getSubject() const;
+ std::string getReplyTo() const;
+
bool requiresAccept() const;
const qpid::framing::SequenceNumber& getCommandId() const;
diff --git a/qpid/tests/src/py/qpid_tests/broker_1_0/selector.py b/qpid/tests/src/py/qpid_tests/broker_1_0/selector.py
index ac2bbd8db3..323baaab07 100644
--- a/qpid/tests/src/py/qpid_tests/broker_1_0/selector.py
+++ b/qpid/tests/src/py/qpid_tests/broker_1_0/selector.py
@@ -71,3 +71,25 @@ class SelectorTests (VersionTest):
msg = rcv_4.fetch(0)
assert msg.content == 'd'
self.ssn.acknowledge(msg)
+
+ def check_selected(self,node, selector, expected_content):
+ rcv = self.ssn.receiver("%s; {mode:browse, link:{selector:\"%s\"}}" % (node, selector))
+ msg = rcv.fetch(0)
+ assert msg.content == expected_content, msg
+ rcv.close()
+
+ def test_jms_header_names(self):
+ """
+ The new AMQP 1.0 based JMS client uses these rather than the special names above
+ """
+ msgs = [Message(content=i, id=i, correlation_id=i, subject=i, priority=p+1, reply_to=i, properties={'x-amqp-to':i}) for p, i in enumerate(['a', 'b', 'c', 'd'])]
+
+ snd = self.ssn.sender("#")
+ for m in msgs: snd.send(m)
+
+ self.check_selected(snd.target, "JMSMessageID = 'a'", 'a')
+ self.check_selected(snd.target, "JMSCorrelationID = 'b'", 'b')
+ self.check_selected(snd.target, "JMSPriority = 3", 'c')
+ self.check_selected(snd.target, "JMSDestination = 'a'", 'a')
+ self.check_selected(snd.target, "JMSReplyTo = 'b'", 'b')
+ self.check_selected(snd.target, "JMSType = 'c'", 'c')