diff options
| -rw-r--r-- | qpid/cpp/src/qpid/amqp/CharSequence.cpp | 18 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/amqp/CharSequence.h | 3 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/amqp/Decoder.cpp | 2 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/amqp/MessageId.cpp | 18 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/amqp/MessageId.h | 3 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/Message.h | 18 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/Selector.cpp | 82 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/amqp/Message.h | 4 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/amqp/Translation.cpp | 4 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp | 23 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h | 2 |
11 files changed, 136 insertions, 41 deletions
diff --git a/qpid/cpp/src/qpid/amqp/CharSequence.cpp b/qpid/cpp/src/qpid/amqp/CharSequence.cpp index 857ec7e587..7e433bd26e 100644 --- a/qpid/cpp/src/qpid/amqp/CharSequence.cpp +++ b/qpid/cpp/src/qpid/amqp/CharSequence.cpp @@ -38,10 +38,28 @@ std::string CharSequence::str() const return std::string(data, size); } +CharSequence CharSequence::create() +{ + CharSequence c = {0, 0}; + return c; +} + +CharSequence CharSequence::create(const std::string& str) +{ + CharSequence c = {str.data(), str.size()}; + return c; +} + CharSequence CharSequence::create(const char* data, size_t size) { CharSequence c = {data, size}; return c; } +CharSequence CharSequence::create(const unsigned char* data, size_t size) +{ + CharSequence c = {reinterpret_cast<const char*>(data), size}; + return c; +} + }} // namespace qpid::amqp diff --git a/qpid/cpp/src/qpid/amqp/CharSequence.h b/qpid/cpp/src/qpid/amqp/CharSequence.h index 307a4b1537..752097c913 100644 --- a/qpid/cpp/src/qpid/amqp/CharSequence.h +++ b/qpid/cpp/src/qpid/amqp/CharSequence.h @@ -42,7 +42,10 @@ struct CharSequence QPID_COMMON_EXTERN std::string str() const; QPID_COMMON_EXTERN void init(); + QPID_COMMON_EXTERN static CharSequence create(); + QPID_COMMON_EXTERN static CharSequence create(const std::string& str); QPID_COMMON_EXTERN static CharSequence create(const char* data, size_t size); + QPID_COMMON_EXTERN static CharSequence create(const unsigned char* data, size_t size); }; }} // namespace qpid::amqp diff --git a/qpid/cpp/src/qpid/amqp/Decoder.cpp b/qpid/cpp/src/qpid/amqp/Decoder.cpp index 9c577e6c92..1d5abc99c7 100644 --- a/qpid/cpp/src/qpid/amqp/Decoder.cpp +++ b/qpid/cpp/src/qpid/amqp/Decoder.cpp @@ -263,7 +263,7 @@ void Decoder::readValue(Reader& reader, uint8_t code, const Descriptor* descript break; case LIST0: - reader.onStartList(0, CharSequence::create(0, 0), descriptor); + reader.onStartList(0, CharSequence::create(), descriptor); reader.onEndList(0, descriptor); break; case LIST8: diff --git a/qpid/cpp/src/qpid/amqp/MessageId.cpp b/qpid/cpp/src/qpid/amqp/MessageId.cpp index e6f6f4a231..25e9c96ef6 100644 --- a/qpid/cpp/src/qpid/amqp/MessageId.cpp +++ b/qpid/cpp/src/qpid/amqp/MessageId.cpp @@ -25,14 +25,14 @@ namespace qpid { namespace amqp { -MessageId::MessageId() : type(BYTES) +MessageId::MessageId() : type(NONE) { - value.bytes.data = 0; - value.bytes.size = 0; } void MessageId::assign(std::string& s) const { switch (type) { + case NONE: + s = std::string(); case BYTES: if (value.bytes) s.assign(value.bytes.data, value.bytes.size); break; @@ -45,6 +45,18 @@ void MessageId::assign(std::string& s) const } } +MessageId::operator bool() const +{ + return type!=NONE; +} + +std::string MessageId::str() const +{ + std::string s; + assign(s); + return s; +} + void MessageId::set(qpid::amqp::CharSequence bytes, qpid::types::VariantType t) { switch (t) { diff --git a/qpid/cpp/src/qpid/amqp/MessageId.h b/qpid/cpp/src/qpid/amqp/MessageId.h index ee440f3011..4505469148 100644 --- a/qpid/cpp/src/qpid/amqp/MessageId.h +++ b/qpid/cpp/src/qpid/amqp/MessageId.h @@ -37,12 +37,15 @@ struct MessageId } value; enum { + NONE, BYTES, UUID, ULONG } type; QPID_COMMON_EXTERN MessageId(); + QPID_COMMON_EXTERN operator bool() const; + QPID_COMMON_EXTERN std::string str() const; QPID_COMMON_EXTERN void assign(std::string&) const; QPID_COMMON_EXTERN void set(qpid::amqp::CharSequence bytes, qpid::types::VariantType t); QPID_COMMON_EXTERN void set(uint64_t ulong); diff --git a/qpid/cpp/src/qpid/broker/Message.h b/qpid/cpp/src/qpid/broker/Message.h index 8f12b06a9d..03ef21e77a 100644 --- a/qpid/cpp/src/qpid/broker/Message.h +++ b/qpid/cpp/src/qpid/broker/Message.h @@ -22,22 +22,24 @@ * */ -#include "qpid/broker/BrokerImportExport.h" -#include "qpid/sys/Time.h" -#include "qpid/types/Variant.h" +#include "qpid/RefCounted.h" +#include "qpid/broker/ExpiryPolicy.h" +#include "qpid/broker/PersistableMessage.h" //TODO: move the following out of framing or replace it #include "qpid/framing/SequenceNumber.h" +#include "qpid/sys/Time.h" +#include "qpid/types/Variant.h" + +#include "qpid/broker/BrokerImportExport.h" + #include <string> #include <vector> - -#include "qpid/RefCounted.h" #include <boost/intrusive_ptr.hpp> -#include "qpid/broker/ExpiryPolicy.h" -#include "qpid/broker/PersistableMessage.h" namespace qpid { namespace amqp { class MapHandler; +class MessageId; } namespace management { @@ -67,6 +69,8 @@ public: virtual bool isPersistent() const = 0; virtual uint8_t getPriority() const = 0; virtual uint64_t getContentSize() const = 0; + virtual qpid::amqp::MessageId getMessageId() const = 0; + virtual qpid::amqp::MessageId getCorrelationId() const = 0; virtual std::string getPropertyAsString(const std::string& key) const = 0; virtual std::string getAnnotationAsString(const std::string& key) const = 0; virtual bool getTtl(uint64_t&) const = 0; diff --git a/qpid/cpp/src/qpid/broker/Selector.cpp b/qpid/cpp/src/qpid/broker/Selector.cpp index 129787171d..0c25e7338b 100644 --- a/qpid/cpp/src/qpid/broker/Selector.cpp +++ b/qpid/cpp/src/qpid/broker/Selector.cpp @@ -22,8 +22,9 @@ #include "qpid/broker/Selector.h" #include "qpid/amqp/CharSequence.h" -#include "qpid/broker/Message.h" #include "qpid/amqp/MapHandler.h" +#include "qpid/amqp/MessageId.h" +#include "qpid/broker/Message.h" #include "qpid/broker/SelectorExpression.h" #include "qpid/broker/SelectorValue.h" #include "qpid/log/Statement.h" @@ -43,6 +44,7 @@ using std::string; using qpid::sys::unordered_map; using qpid::amqp::CharSequence; using qpid::amqp::MapHandler; +using qpid::amqp::MessageId; /** * Identifier (amqp.) | JMS... | amqp 1.0 equivalent @@ -63,32 +65,6 @@ const string EMPTY; const string PERSISTENT("PERSISTENT"); const string NON_PERSISTENT("NON_PERSISTENT"); -const Value specialValue(const Message& msg, const string& id) -{ - // TODO: Just use a simple if chain for now - improve this later - if ( id=="delivery_mode" ) { - return msg.getEncoding().isPersistent() ? PERSISTENT : NON_PERSISTENT; - } else if ( id=="redelivered" ) { - return msg.getDeliveryCount()>0 ? true : false; - } else if ( id=="priority" ) { - return int64_t(msg.getPriority()); - } else if ( id=="correlation_id" ) { - return EMPTY; // Needs an indirection in getEncoding(). - } else if ( id=="message_id" ) { - return EMPTY; // Needs an indirection in getEncoding(). - } else if ( id=="to" ) { - return EMPTY; // This is good for 0-10, not sure about 1.0 - } else if ( id=="reply_to" ) { - return EMPTY; // Needs an indirection in getEncoding(). - } else if ( id=="absolute_expiry_time" ) { - return EMPTY; // Needs an indirection in getEncoding(). - } else if ( id=="creation_time" ) { - return EMPTY; // Needs an indirection in getEncoding(). - } else if ( id=="jms_type" ) { - return EMPTY; - } else return Value(); -} - class MessageSelectorEnv : public SelectorEnv { const Message& msg; mutable boost::ptr_vector<string> returnedStrings; @@ -96,6 +72,7 @@ class MessageSelectorEnv : public SelectorEnv { mutable bool valuesLookedup; const Value& value(const string&) const; + const Value specialValue(const string&) const; public: MessageSelectorEnv(const Message&); @@ -107,6 +84,55 @@ MessageSelectorEnv::MessageSelectorEnv(const Message& m) : { } +const Value MessageSelectorEnv::specialValue(const string& id) const +{ + Value v; + // TODO: Just use a simple if chain for now - improve this later + if ( id=="delivery_mode" ) { + v = msg.getEncoding().isPersistent() ? PERSISTENT : NON_PERSISTENT; + } else if ( id=="redelivered" ) { + v = msg.getDeliveryCount()>0 ? true : false; + } else if ( id=="priority" ) { + v = int64_t(msg.getPriority()); + } else if ( id=="correlation_id" ) { + MessageId cId = msg.getEncoding().getCorrelationId(); + if (cId) { + returnedStrings.push_back(new string(cId.str())); + v = returnedStrings[returnedStrings.size()-1]; + } + } else if ( id=="message_id" ) { + MessageId mId = msg.getEncoding().getMessageId(); + if (mId) { + returnedStrings.push_back(new string(mId.str())); + v = returnedStrings[returnedStrings.size()-1]; + } + } else if ( id=="to" ) { + v = EMPTY; // Hard to get this correct for both 1.0 and 0-10 + } else if ( id=="reply_to" ) { + v = EMPTY; // Hard to get this correct for both 1.0 and 0-10 + } else if ( id=="absolute_expiry_time" ) { + qpid::sys::AbsTime expiry = msg.getExpiration(); + // Java property has value of 0 for no expiry + v = (expiry==qpid::sys::FAR_FUTURE) ? 0 + : qpid::sys::Duration(qpid::sys::AbsTime::Epoch(), expiry) / qpid::sys::TIME_MSEC; + } else if ( id=="creation_time" ) { + // Use the time put on queue (if it is enabled) as 0-10 has no standard way to get message + // creation time and we're not paying attention to the 1.0 creation time yet. + v = int64_t(msg.getTimestamp() * 1000); // getTimestamp() returns time in seconds we need milliseconds + } else if ( id=="jms_type" ) { + // Currently we can't distinguish between an empty JMSType and no JMSType + // We'll assume for now that setting an empty JMSType doesn't make a lot of sense + const string jmsType = msg.getAnnotation("jms-type").asString(); + if ( !jmsType.empty() ) { + returnedStrings.push_back(new string(jmsType)); + v = returnedStrings[returnedStrings.size()-1]; + } + } else { + v = Value(); + } + return v; +} + struct ValueHandler : public broker::MapHandler { unordered_map<string, Value>& values; boost::ptr_vector<string>& strings; @@ -152,7 +178,7 @@ const Value& MessageSelectorEnv::value(const string& identifier) const if ( identifier.substr(0, 5) == "amqp." ) { if ( returnedValues.count(identifier)==0 ) { QPID_LOG(debug, "Selector lookup special identifier: " << identifier); - returnedValues[identifier] = specialValue(msg, identifier.substr(5)); + returnedValues[identifier] = specialValue(identifier.substr(5)); } } else if (!valuesLookedup) { QPID_LOG(debug, "Selector lookup triggered by: " << identifier); diff --git a/qpid/cpp/src/qpid/broker/amqp/Message.h b/qpid/cpp/src/qpid/broker/amqp/Message.h index aaa6c1b9f4..cbf8669fc1 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Message.h +++ b/qpid/cpp/src/qpid/broker/amqp/Message.h @@ -51,10 +51,10 @@ class Message : public qpid::broker::Message::Encoding, private qpid::amqp::Mess std::string getContent() const; void processProperties(qpid::amqp::MapHandler&) const; std::string getUserId() const; - qpid::amqp::MessageId getMessageId() const; - qpid::amqp::CharSequence getReplyTo() const; qpid::amqp::MessageId getCorrelationId() const; + + qpid::amqp::CharSequence getReplyTo() const; qpid::amqp::CharSequence getContentType() const; qpid::amqp::CharSequence getContentEncoding() const; diff --git a/qpid/cpp/src/qpid/broker/amqp/Translation.cpp b/qpid/cpp/src/qpid/broker/amqp/Translation.cpp index 846deb92f5..e04d44d2c8 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Translation.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/Translation.cpp @@ -172,6 +172,8 @@ boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer> Translation qpid::amqp::MessageId mid = message->getMessageId(); qpid::framing::Uuid uuid; switch (mid.type) { + case qpid::amqp::MessageId::NONE: + break; case qpid::amqp::MessageId::UUID: case qpid::amqp::MessageId::BYTES: if (mid.value.bytes.size == 0) break; @@ -183,6 +185,8 @@ boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer> Translation qpid::amqp::MessageId cid = message->getCorrelationId(); switch (cid.type) { + case qpid::amqp::MessageId::NONE: + break; case qpid::amqp::MessageId::UUID: assert(cid.value.bytes.size = 16); props->setCorrelationId(qpid::framing::Uuid(cid.value.bytes.data).str()); diff --git a/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp b/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp index 9fb263e3d3..e3a967796b 100644 --- a/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp +++ b/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp @@ -23,6 +23,7 @@ #include "qpid/amqp/CharSequence.h" #include "qpid/amqp_0_10/Codecs.h" #include "qpid/amqp/MapHandler.h" +#include "qpid/amqp/MessageId.h" #include "qpid/broker/Message.h" #include "qpid/framing/MessageTransferBody.h" #include "qpid/framing/MessageProperties.h" @@ -67,6 +68,28 @@ std::string MessageTransfer::getAnnotationAsString(const std::string& key) const } std::string MessageTransfer::getPropertyAsString(const std::string& key) const { return getAnnotationAsString(key); } +amqp::MessageId MessageTransfer::getMessageId() const +{ + const qpid::framing::MessageProperties* mp = getProperties<qpid::framing::MessageProperties>(); + + amqp::MessageId r; + if (mp->hasMessageId()) { + r.set(amqp::CharSequence::create(&mp->getMessageId()[0],16), types::VAR_UUID); + } + return r; +} + +amqp::MessageId MessageTransfer::getCorrelationId() const +{ + const qpid::framing::MessageProperties* mp = getProperties<qpid::framing::MessageProperties>(); + + amqp::MessageId r; + if (mp->hasCorrelationId()) { + r.set(amqp::CharSequence::create(mp->getCorrelationId()), types::VAR_STRING); + } + return r; +} + bool MessageTransfer::getTtl(uint64_t& result) const { const qpid::framing::DeliveryProperties* dp = getProperties<qpid::framing::DeliveryProperties>(); diff --git a/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h b/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h index 3d6e3a2906..d32c0d07fd 100644 --- a/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h +++ b/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h @@ -45,6 +45,8 @@ class MessageTransfer : public qpid::broker::Message::Encoding, public qpid::bro bool isPersistent() const; uint8_t getPriority() const; uint64_t getContentSize() const; + qpid::amqp::MessageId getMessageId() const; + qpid::amqp::MessageId getCorrelationId() const; std::string getPropertyAsString(const std::string& key) const; std::string getAnnotationAsString(const std::string& key) const; bool getTtl(uint64_t&) const; |
