diff options
author | Andrew Stitcher <astitcher@apache.org> | 2013-07-05 16:06:14 +0000 |
---|---|---|
committer | Andrew Stitcher <astitcher@apache.org> | 2013-07-05 16:06:14 +0000 |
commit | 0792650df72a44327dca0f247b2dd7ab367ca38f (patch) | |
tree | 5176e54ba2b4f4dcd8dfbfd5b0b8aa07049ace47 | |
parent | 77171f498c5c7dca09448ce8168c3bd30bfe3825 (diff) | |
download | qpid-python-0792650df72a44327dca0f247b2dd7ab367ca38f.tar.gz |
QPID-4627: Implement most of the remaining selector special identifiers
Implemented:
message_id, correlation_id,
jms_type, creation_time, absolute_expiry_time
There are a couple of caveats: The easily available way to get
jms_type doesn't distinguish between an empty string and the
property not being sent at all. So we treat this case as property
not set as that seems like it will get most cases correct (why bother
to send an empty jms_type?). The creation_time property is currently
implemented as the time the message was put on the queue (if enabled
in the broker) as amqp 0_10 has no standard way to indicate the
creation time and we're not currently holding the creation time for amqp 1.0
messages.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1500052 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/qpid/amqp/CharSequence.cpp | 18 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/amqp/CharSequence.h | 3 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/amqp/Decoder.cpp | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/amqp/MessageId.cpp | 18 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/amqp/MessageId.h | 3 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Message.h | 18 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Selector.cpp | 82 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/amqp/Message.h | 4 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/amqp/Translation.cpp | 4 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp | 23 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h | 2 |
11 files changed, 136 insertions, 41 deletions
diff --git a/qpid/cpp/src/qpid/amqp/CharSequence.cpp b/qpid/cpp/src/qpid/amqp/CharSequence.cpp index 857ec7e587..7e433bd26e 100644 --- a/qpid/cpp/src/qpid/amqp/CharSequence.cpp +++ b/qpid/cpp/src/qpid/amqp/CharSequence.cpp @@ -38,10 +38,28 @@ std::string CharSequence::str() const return std::string(data, size); } +CharSequence CharSequence::create() +{ + CharSequence c = {0, 0}; + return c; +} + +CharSequence CharSequence::create(const std::string& str) +{ + CharSequence c = {str.data(), str.size()}; + return c; +} + CharSequence CharSequence::create(const char* data, size_t size) { CharSequence c = {data, size}; return c; } +CharSequence CharSequence::create(const unsigned char* data, size_t size) +{ + CharSequence c = {reinterpret_cast<const char*>(data), size}; + return c; +} + }} // namespace qpid::amqp diff --git a/qpid/cpp/src/qpid/amqp/CharSequence.h b/qpid/cpp/src/qpid/amqp/CharSequence.h index 307a4b1537..752097c913 100644 --- a/qpid/cpp/src/qpid/amqp/CharSequence.h +++ b/qpid/cpp/src/qpid/amqp/CharSequence.h @@ -42,7 +42,10 @@ struct CharSequence QPID_COMMON_EXTERN std::string str() const; QPID_COMMON_EXTERN void init(); + QPID_COMMON_EXTERN static CharSequence create(); + QPID_COMMON_EXTERN static CharSequence create(const std::string& str); QPID_COMMON_EXTERN static CharSequence create(const char* data, size_t size); + QPID_COMMON_EXTERN static CharSequence create(const unsigned char* data, size_t size); }; }} // namespace qpid::amqp diff --git a/qpid/cpp/src/qpid/amqp/Decoder.cpp b/qpid/cpp/src/qpid/amqp/Decoder.cpp index 9c577e6c92..1d5abc99c7 100644 --- a/qpid/cpp/src/qpid/amqp/Decoder.cpp +++ b/qpid/cpp/src/qpid/amqp/Decoder.cpp @@ -263,7 +263,7 @@ void Decoder::readValue(Reader& reader, uint8_t code, const Descriptor* descript break; case LIST0: - reader.onStartList(0, CharSequence::create(0, 0), descriptor); + reader.onStartList(0, CharSequence::create(), descriptor); reader.onEndList(0, descriptor); break; case LIST8: diff --git a/qpid/cpp/src/qpid/amqp/MessageId.cpp b/qpid/cpp/src/qpid/amqp/MessageId.cpp index e6f6f4a231..25e9c96ef6 100644 --- a/qpid/cpp/src/qpid/amqp/MessageId.cpp +++ b/qpid/cpp/src/qpid/amqp/MessageId.cpp @@ -25,14 +25,14 @@ namespace qpid { namespace amqp { -MessageId::MessageId() : type(BYTES) +MessageId::MessageId() : type(NONE) { - value.bytes.data = 0; - value.bytes.size = 0; } void MessageId::assign(std::string& s) const { switch (type) { + case NONE: + s = std::string(); case BYTES: if (value.bytes) s.assign(value.bytes.data, value.bytes.size); break; @@ -45,6 +45,18 @@ void MessageId::assign(std::string& s) const } } +MessageId::operator bool() const +{ + return type!=NONE; +} + +std::string MessageId::str() const +{ + std::string s; + assign(s); + return s; +} + void MessageId::set(qpid::amqp::CharSequence bytes, qpid::types::VariantType t) { switch (t) { diff --git a/qpid/cpp/src/qpid/amqp/MessageId.h b/qpid/cpp/src/qpid/amqp/MessageId.h index ee440f3011..4505469148 100644 --- a/qpid/cpp/src/qpid/amqp/MessageId.h +++ b/qpid/cpp/src/qpid/amqp/MessageId.h @@ -37,12 +37,15 @@ struct MessageId } value; enum { + NONE, BYTES, UUID, ULONG } type; QPID_COMMON_EXTERN MessageId(); + QPID_COMMON_EXTERN operator bool() const; + QPID_COMMON_EXTERN std::string str() const; QPID_COMMON_EXTERN void assign(std::string&) const; QPID_COMMON_EXTERN void set(qpid::amqp::CharSequence bytes, qpid::types::VariantType t); QPID_COMMON_EXTERN void set(uint64_t ulong); diff --git a/qpid/cpp/src/qpid/broker/Message.h b/qpid/cpp/src/qpid/broker/Message.h index 8f12b06a9d..03ef21e77a 100644 --- a/qpid/cpp/src/qpid/broker/Message.h +++ b/qpid/cpp/src/qpid/broker/Message.h @@ -22,22 +22,24 @@ * */ -#include "qpid/broker/BrokerImportExport.h" -#include "qpid/sys/Time.h" -#include "qpid/types/Variant.h" +#include "qpid/RefCounted.h" +#include "qpid/broker/ExpiryPolicy.h" +#include "qpid/broker/PersistableMessage.h" //TODO: move the following out of framing or replace it #include "qpid/framing/SequenceNumber.h" +#include "qpid/sys/Time.h" +#include "qpid/types/Variant.h" + +#include "qpid/broker/BrokerImportExport.h" + #include <string> #include <vector> - -#include "qpid/RefCounted.h" #include <boost/intrusive_ptr.hpp> -#include "qpid/broker/ExpiryPolicy.h" -#include "qpid/broker/PersistableMessage.h" namespace qpid { namespace amqp { class MapHandler; +class MessageId; } namespace management { @@ -67,6 +69,8 @@ public: virtual bool isPersistent() const = 0; virtual uint8_t getPriority() const = 0; virtual uint64_t getContentSize() const = 0; + virtual qpid::amqp::MessageId getMessageId() const = 0; + virtual qpid::amqp::MessageId getCorrelationId() const = 0; virtual std::string getPropertyAsString(const std::string& key) const = 0; virtual std::string getAnnotationAsString(const std::string& key) const = 0; virtual bool getTtl(uint64_t&) const = 0; diff --git a/qpid/cpp/src/qpid/broker/Selector.cpp b/qpid/cpp/src/qpid/broker/Selector.cpp index 129787171d..0c25e7338b 100644 --- a/qpid/cpp/src/qpid/broker/Selector.cpp +++ b/qpid/cpp/src/qpid/broker/Selector.cpp @@ -22,8 +22,9 @@ #include "qpid/broker/Selector.h" #include "qpid/amqp/CharSequence.h" -#include "qpid/broker/Message.h" #include "qpid/amqp/MapHandler.h" +#include "qpid/amqp/MessageId.h" +#include "qpid/broker/Message.h" #include "qpid/broker/SelectorExpression.h" #include "qpid/broker/SelectorValue.h" #include "qpid/log/Statement.h" @@ -43,6 +44,7 @@ using std::string; using qpid::sys::unordered_map; using qpid::amqp::CharSequence; using qpid::amqp::MapHandler; +using qpid::amqp::MessageId; /** * Identifier (amqp.) | JMS... | amqp 1.0 equivalent @@ -63,32 +65,6 @@ const string EMPTY; const string PERSISTENT("PERSISTENT"); const string NON_PERSISTENT("NON_PERSISTENT"); -const Value specialValue(const Message& msg, const string& id) -{ - // TODO: Just use a simple if chain for now - improve this later - if ( id=="delivery_mode" ) { - return msg.getEncoding().isPersistent() ? PERSISTENT : NON_PERSISTENT; - } else if ( id=="redelivered" ) { - return msg.getDeliveryCount()>0 ? true : false; - } else if ( id=="priority" ) { - return int64_t(msg.getPriority()); - } else if ( id=="correlation_id" ) { - return EMPTY; // Needs an indirection in getEncoding(). - } else if ( id=="message_id" ) { - return EMPTY; // Needs an indirection in getEncoding(). - } else if ( id=="to" ) { - return EMPTY; // This is good for 0-10, not sure about 1.0 - } else if ( id=="reply_to" ) { - return EMPTY; // Needs an indirection in getEncoding(). - } else if ( id=="absolute_expiry_time" ) { - return EMPTY; // Needs an indirection in getEncoding(). - } else if ( id=="creation_time" ) { - return EMPTY; // Needs an indirection in getEncoding(). - } else if ( id=="jms_type" ) { - return EMPTY; - } else return Value(); -} - class MessageSelectorEnv : public SelectorEnv { const Message& msg; mutable boost::ptr_vector<string> returnedStrings; @@ -96,6 +72,7 @@ class MessageSelectorEnv : public SelectorEnv { mutable bool valuesLookedup; const Value& value(const string&) const; + const Value specialValue(const string&) const; public: MessageSelectorEnv(const Message&); @@ -107,6 +84,55 @@ MessageSelectorEnv::MessageSelectorEnv(const Message& m) : { } +const Value MessageSelectorEnv::specialValue(const string& id) const +{ + Value v; + // TODO: Just use a simple if chain for now - improve this later + if ( id=="delivery_mode" ) { + v = msg.getEncoding().isPersistent() ? PERSISTENT : NON_PERSISTENT; + } else if ( id=="redelivered" ) { + v = msg.getDeliveryCount()>0 ? true : false; + } else if ( id=="priority" ) { + v = int64_t(msg.getPriority()); + } else if ( id=="correlation_id" ) { + MessageId cId = msg.getEncoding().getCorrelationId(); + if (cId) { + returnedStrings.push_back(new string(cId.str())); + v = returnedStrings[returnedStrings.size()-1]; + } + } else if ( id=="message_id" ) { + MessageId mId = msg.getEncoding().getMessageId(); + if (mId) { + returnedStrings.push_back(new string(mId.str())); + v = returnedStrings[returnedStrings.size()-1]; + } + } else if ( id=="to" ) { + v = EMPTY; // Hard to get this correct for both 1.0 and 0-10 + } else if ( id=="reply_to" ) { + v = EMPTY; // Hard to get this correct for both 1.0 and 0-10 + } else if ( id=="absolute_expiry_time" ) { + qpid::sys::AbsTime expiry = msg.getExpiration(); + // Java property has value of 0 for no expiry + v = (expiry==qpid::sys::FAR_FUTURE) ? 0 + : qpid::sys::Duration(qpid::sys::AbsTime::Epoch(), expiry) / qpid::sys::TIME_MSEC; + } else if ( id=="creation_time" ) { + // Use the time put on queue (if it is enabled) as 0-10 has no standard way to get message + // creation time and we're not paying attention to the 1.0 creation time yet. + v = int64_t(msg.getTimestamp() * 1000); // getTimestamp() returns time in seconds we need milliseconds + } else if ( id=="jms_type" ) { + // Currently we can't distinguish between an empty JMSType and no JMSType + // We'll assume for now that setting an empty JMSType doesn't make a lot of sense + const string jmsType = msg.getAnnotation("jms-type").asString(); + if ( !jmsType.empty() ) { + returnedStrings.push_back(new string(jmsType)); + v = returnedStrings[returnedStrings.size()-1]; + } + } else { + v = Value(); + } + return v; +} + struct ValueHandler : public broker::MapHandler { unordered_map<string, Value>& values; boost::ptr_vector<string>& strings; @@ -152,7 +178,7 @@ const Value& MessageSelectorEnv::value(const string& identifier) const if ( identifier.substr(0, 5) == "amqp." ) { if ( returnedValues.count(identifier)==0 ) { QPID_LOG(debug, "Selector lookup special identifier: " << identifier); - returnedValues[identifier] = specialValue(msg, identifier.substr(5)); + returnedValues[identifier] = specialValue(identifier.substr(5)); } } else if (!valuesLookedup) { QPID_LOG(debug, "Selector lookup triggered by: " << identifier); diff --git a/qpid/cpp/src/qpid/broker/amqp/Message.h b/qpid/cpp/src/qpid/broker/amqp/Message.h index aaa6c1b9f4..cbf8669fc1 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Message.h +++ b/qpid/cpp/src/qpid/broker/amqp/Message.h @@ -51,10 +51,10 @@ class Message : public qpid::broker::Message::Encoding, private qpid::amqp::Mess std::string getContent() const; void processProperties(qpid::amqp::MapHandler&) const; std::string getUserId() const; - qpid::amqp::MessageId getMessageId() const; - qpid::amqp::CharSequence getReplyTo() const; qpid::amqp::MessageId getCorrelationId() const; + + qpid::amqp::CharSequence getReplyTo() const; qpid::amqp::CharSequence getContentType() const; qpid::amqp::CharSequence getContentEncoding() const; diff --git a/qpid/cpp/src/qpid/broker/amqp/Translation.cpp b/qpid/cpp/src/qpid/broker/amqp/Translation.cpp index 846deb92f5..e04d44d2c8 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Translation.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/Translation.cpp @@ -172,6 +172,8 @@ boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer> Translation qpid::amqp::MessageId mid = message->getMessageId(); qpid::framing::Uuid uuid; switch (mid.type) { + case qpid::amqp::MessageId::NONE: + break; case qpid::amqp::MessageId::UUID: case qpid::amqp::MessageId::BYTES: if (mid.value.bytes.size == 0) break; @@ -183,6 +185,8 @@ boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer> Translation qpid::amqp::MessageId cid = message->getCorrelationId(); switch (cid.type) { + case qpid::amqp::MessageId::NONE: + break; case qpid::amqp::MessageId::UUID: assert(cid.value.bytes.size = 16); props->setCorrelationId(qpid::framing::Uuid(cid.value.bytes.data).str()); diff --git a/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp b/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp index 9fb263e3d3..e3a967796b 100644 --- a/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp +++ b/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp @@ -23,6 +23,7 @@ #include "qpid/amqp/CharSequence.h" #include "qpid/amqp_0_10/Codecs.h" #include "qpid/amqp/MapHandler.h" +#include "qpid/amqp/MessageId.h" #include "qpid/broker/Message.h" #include "qpid/framing/MessageTransferBody.h" #include "qpid/framing/MessageProperties.h" @@ -67,6 +68,28 @@ std::string MessageTransfer::getAnnotationAsString(const std::string& key) const } std::string MessageTransfer::getPropertyAsString(const std::string& key) const { return getAnnotationAsString(key); } +amqp::MessageId MessageTransfer::getMessageId() const +{ + const qpid::framing::MessageProperties* mp = getProperties<qpid::framing::MessageProperties>(); + + amqp::MessageId r; + if (mp->hasMessageId()) { + r.set(amqp::CharSequence::create(&mp->getMessageId()[0],16), types::VAR_UUID); + } + return r; +} + +amqp::MessageId MessageTransfer::getCorrelationId() const +{ + const qpid::framing::MessageProperties* mp = getProperties<qpid::framing::MessageProperties>(); + + amqp::MessageId r; + if (mp->hasCorrelationId()) { + r.set(amqp::CharSequence::create(mp->getCorrelationId()), types::VAR_STRING); + } + return r; +} + bool MessageTransfer::getTtl(uint64_t& result) const { const qpid::framing::DeliveryProperties* dp = getProperties<qpid::framing::DeliveryProperties>(); diff --git a/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h b/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h index 3d6e3a2906..d32c0d07fd 100644 --- a/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h +++ b/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h @@ -45,6 +45,8 @@ class MessageTransfer : public qpid::broker::Message::Encoding, public qpid::bro bool isPersistent() const; uint8_t getPriority() const; uint64_t getContentSize() const; + qpid::amqp::MessageId getMessageId() const; + qpid::amqp::MessageId getCorrelationId() const; std::string getPropertyAsString(const std::string& key) const; std::string getAnnotationAsString(const std::string& key) const; bool getTtl(uint64_t&) const; |