summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAndrew Stitcher <astitcher@apache.org>2013-07-05 16:06:14 +0000
committerAndrew Stitcher <astitcher@apache.org>2013-07-05 16:06:14 +0000
commit0792650df72a44327dca0f247b2dd7ab367ca38f (patch)
tree5176e54ba2b4f4dcd8dfbfd5b0b8aa07049ace47
parent77171f498c5c7dca09448ce8168c3bd30bfe3825 (diff)
downloadqpid-python-0792650df72a44327dca0f247b2dd7ab367ca38f.tar.gz
QPID-4627: Implement most of the remaining selector special identifiers
Implemented: message_id, correlation_id, jms_type, creation_time, absolute_expiry_time There are a couple of caveats: The easily available way to get jms_type doesn't distinguish between an empty string and the property not being sent at all. So we treat this case as property not set as that seems like it will get most cases correct (why bother to send an empty jms_type?). The creation_time property is currently implemented as the time the message was put on the queue (if enabled in the broker) as amqp 0_10 has no standard way to indicate the creation time and we're not currently holding the creation time for amqp 1.0 messages. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1500052 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--qpid/cpp/src/qpid/amqp/CharSequence.cpp18
-rw-r--r--qpid/cpp/src/qpid/amqp/CharSequence.h3
-rw-r--r--qpid/cpp/src/qpid/amqp/Decoder.cpp2
-rw-r--r--qpid/cpp/src/qpid/amqp/MessageId.cpp18
-rw-r--r--qpid/cpp/src/qpid/amqp/MessageId.h3
-rw-r--r--qpid/cpp/src/qpid/broker/Message.h18
-rw-r--r--qpid/cpp/src/qpid/broker/Selector.cpp82
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Message.h4
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Translation.cpp4
-rw-r--r--qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp23
-rw-r--r--qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h2
11 files changed, 136 insertions, 41 deletions
diff --git a/qpid/cpp/src/qpid/amqp/CharSequence.cpp b/qpid/cpp/src/qpid/amqp/CharSequence.cpp
index 857ec7e587..7e433bd26e 100644
--- a/qpid/cpp/src/qpid/amqp/CharSequence.cpp
+++ b/qpid/cpp/src/qpid/amqp/CharSequence.cpp
@@ -38,10 +38,28 @@ std::string CharSequence::str() const
return std::string(data, size);
}
+CharSequence CharSequence::create()
+{
+ CharSequence c = {0, 0};
+ return c;
+}
+
+CharSequence CharSequence::create(const std::string& str)
+{
+ CharSequence c = {str.data(), str.size()};
+ return c;
+}
+
CharSequence CharSequence::create(const char* data, size_t size)
{
CharSequence c = {data, size};
return c;
}
+CharSequence CharSequence::create(const unsigned char* data, size_t size)
+{
+ CharSequence c = {reinterpret_cast<const char*>(data), size};
+ return c;
+}
+
}} // namespace qpid::amqp
diff --git a/qpid/cpp/src/qpid/amqp/CharSequence.h b/qpid/cpp/src/qpid/amqp/CharSequence.h
index 307a4b1537..752097c913 100644
--- a/qpid/cpp/src/qpid/amqp/CharSequence.h
+++ b/qpid/cpp/src/qpid/amqp/CharSequence.h
@@ -42,7 +42,10 @@ struct CharSequence
QPID_COMMON_EXTERN std::string str() const;
QPID_COMMON_EXTERN void init();
+ QPID_COMMON_EXTERN static CharSequence create();
+ QPID_COMMON_EXTERN static CharSequence create(const std::string& str);
QPID_COMMON_EXTERN static CharSequence create(const char* data, size_t size);
+ QPID_COMMON_EXTERN static CharSequence create(const unsigned char* data, size_t size);
};
}} // namespace qpid::amqp
diff --git a/qpid/cpp/src/qpid/amqp/Decoder.cpp b/qpid/cpp/src/qpid/amqp/Decoder.cpp
index 9c577e6c92..1d5abc99c7 100644
--- a/qpid/cpp/src/qpid/amqp/Decoder.cpp
+++ b/qpid/cpp/src/qpid/amqp/Decoder.cpp
@@ -263,7 +263,7 @@ void Decoder::readValue(Reader& reader, uint8_t code, const Descriptor* descript
break;
case LIST0:
- reader.onStartList(0, CharSequence::create(0, 0), descriptor);
+ reader.onStartList(0, CharSequence::create(), descriptor);
reader.onEndList(0, descriptor);
break;
case LIST8:
diff --git a/qpid/cpp/src/qpid/amqp/MessageId.cpp b/qpid/cpp/src/qpid/amqp/MessageId.cpp
index e6f6f4a231..25e9c96ef6 100644
--- a/qpid/cpp/src/qpid/amqp/MessageId.cpp
+++ b/qpid/cpp/src/qpid/amqp/MessageId.cpp
@@ -25,14 +25,14 @@
namespace qpid {
namespace amqp {
-MessageId::MessageId() : type(BYTES)
+MessageId::MessageId() : type(NONE)
{
- value.bytes.data = 0;
- value.bytes.size = 0;
}
void MessageId::assign(std::string& s) const
{
switch (type) {
+ case NONE:
+ s = std::string();
case BYTES:
if (value.bytes) s.assign(value.bytes.data, value.bytes.size);
break;
@@ -45,6 +45,18 @@ void MessageId::assign(std::string& s) const
}
}
+MessageId::operator bool() const
+{
+ return type!=NONE;
+}
+
+std::string MessageId::str() const
+{
+ std::string s;
+ assign(s);
+ return s;
+}
+
void MessageId::set(qpid::amqp::CharSequence bytes, qpid::types::VariantType t)
{
switch (t) {
diff --git a/qpid/cpp/src/qpid/amqp/MessageId.h b/qpid/cpp/src/qpid/amqp/MessageId.h
index ee440f3011..4505469148 100644
--- a/qpid/cpp/src/qpid/amqp/MessageId.h
+++ b/qpid/cpp/src/qpid/amqp/MessageId.h
@@ -37,12 +37,15 @@ struct MessageId
} value;
enum
{
+ NONE,
BYTES,
UUID,
ULONG
} type;
QPID_COMMON_EXTERN MessageId();
+ QPID_COMMON_EXTERN operator bool() const;
+ QPID_COMMON_EXTERN std::string str() const;
QPID_COMMON_EXTERN void assign(std::string&) const;
QPID_COMMON_EXTERN void set(qpid::amqp::CharSequence bytes, qpid::types::VariantType t);
QPID_COMMON_EXTERN void set(uint64_t ulong);
diff --git a/qpid/cpp/src/qpid/broker/Message.h b/qpid/cpp/src/qpid/broker/Message.h
index 8f12b06a9d..03ef21e77a 100644
--- a/qpid/cpp/src/qpid/broker/Message.h
+++ b/qpid/cpp/src/qpid/broker/Message.h
@@ -22,22 +22,24 @@
*
*/
-#include "qpid/broker/BrokerImportExport.h"
-#include "qpid/sys/Time.h"
-#include "qpid/types/Variant.h"
+#include "qpid/RefCounted.h"
+#include "qpid/broker/ExpiryPolicy.h"
+#include "qpid/broker/PersistableMessage.h"
//TODO: move the following out of framing or replace it
#include "qpid/framing/SequenceNumber.h"
+#include "qpid/sys/Time.h"
+#include "qpid/types/Variant.h"
+
+#include "qpid/broker/BrokerImportExport.h"
+
#include <string>
#include <vector>
-
-#include "qpid/RefCounted.h"
#include <boost/intrusive_ptr.hpp>
-#include "qpid/broker/ExpiryPolicy.h"
-#include "qpid/broker/PersistableMessage.h"
namespace qpid {
namespace amqp {
class MapHandler;
+class MessageId;
}
namespace management {
@@ -67,6 +69,8 @@ public:
virtual bool isPersistent() const = 0;
virtual uint8_t getPriority() const = 0;
virtual uint64_t getContentSize() const = 0;
+ virtual qpid::amqp::MessageId getMessageId() const = 0;
+ virtual qpid::amqp::MessageId getCorrelationId() const = 0;
virtual std::string getPropertyAsString(const std::string& key) const = 0;
virtual std::string getAnnotationAsString(const std::string& key) const = 0;
virtual bool getTtl(uint64_t&) const = 0;
diff --git a/qpid/cpp/src/qpid/broker/Selector.cpp b/qpid/cpp/src/qpid/broker/Selector.cpp
index 129787171d..0c25e7338b 100644
--- a/qpid/cpp/src/qpid/broker/Selector.cpp
+++ b/qpid/cpp/src/qpid/broker/Selector.cpp
@@ -22,8 +22,9 @@
#include "qpid/broker/Selector.h"
#include "qpid/amqp/CharSequence.h"
-#include "qpid/broker/Message.h"
#include "qpid/amqp/MapHandler.h"
+#include "qpid/amqp/MessageId.h"
+#include "qpid/broker/Message.h"
#include "qpid/broker/SelectorExpression.h"
#include "qpid/broker/SelectorValue.h"
#include "qpid/log/Statement.h"
@@ -43,6 +44,7 @@ using std::string;
using qpid::sys::unordered_map;
using qpid::amqp::CharSequence;
using qpid::amqp::MapHandler;
+using qpid::amqp::MessageId;
/**
* Identifier (amqp.) | JMS... | amqp 1.0 equivalent
@@ -63,32 +65,6 @@ const string EMPTY;
const string PERSISTENT("PERSISTENT");
const string NON_PERSISTENT("NON_PERSISTENT");
-const Value specialValue(const Message& msg, const string& id)
-{
- // TODO: Just use a simple if chain for now - improve this later
- if ( id=="delivery_mode" ) {
- return msg.getEncoding().isPersistent() ? PERSISTENT : NON_PERSISTENT;
- } else if ( id=="redelivered" ) {
- return msg.getDeliveryCount()>0 ? true : false;
- } else if ( id=="priority" ) {
- return int64_t(msg.getPriority());
- } else if ( id=="correlation_id" ) {
- return EMPTY; // Needs an indirection in getEncoding().
- } else if ( id=="message_id" ) {
- return EMPTY; // Needs an indirection in getEncoding().
- } else if ( id=="to" ) {
- return EMPTY; // This is good for 0-10, not sure about 1.0
- } else if ( id=="reply_to" ) {
- return EMPTY; // Needs an indirection in getEncoding().
- } else if ( id=="absolute_expiry_time" ) {
- return EMPTY; // Needs an indirection in getEncoding().
- } else if ( id=="creation_time" ) {
- return EMPTY; // Needs an indirection in getEncoding().
- } else if ( id=="jms_type" ) {
- return EMPTY;
- } else return Value();
-}
-
class MessageSelectorEnv : public SelectorEnv {
const Message& msg;
mutable boost::ptr_vector<string> returnedStrings;
@@ -96,6 +72,7 @@ class MessageSelectorEnv : public SelectorEnv {
mutable bool valuesLookedup;
const Value& value(const string&) const;
+ const Value specialValue(const string&) const;
public:
MessageSelectorEnv(const Message&);
@@ -107,6 +84,55 @@ MessageSelectorEnv::MessageSelectorEnv(const Message& m) :
{
}
+const Value MessageSelectorEnv::specialValue(const string& id) const
+{
+ Value v;
+ // TODO: Just use a simple if chain for now - improve this later
+ if ( id=="delivery_mode" ) {
+ v = msg.getEncoding().isPersistent() ? PERSISTENT : NON_PERSISTENT;
+ } else if ( id=="redelivered" ) {
+ v = msg.getDeliveryCount()>0 ? true : false;
+ } else if ( id=="priority" ) {
+ v = int64_t(msg.getPriority());
+ } else if ( id=="correlation_id" ) {
+ MessageId cId = msg.getEncoding().getCorrelationId();
+ if (cId) {
+ returnedStrings.push_back(new string(cId.str()));
+ v = returnedStrings[returnedStrings.size()-1];
+ }
+ } else if ( id=="message_id" ) {
+ MessageId mId = msg.getEncoding().getMessageId();
+ if (mId) {
+ returnedStrings.push_back(new string(mId.str()));
+ v = returnedStrings[returnedStrings.size()-1];
+ }
+ } else if ( id=="to" ) {
+ v = EMPTY; // Hard to get this correct for both 1.0 and 0-10
+ } else if ( id=="reply_to" ) {
+ v = EMPTY; // Hard to get this correct for both 1.0 and 0-10
+ } else if ( id=="absolute_expiry_time" ) {
+ qpid::sys::AbsTime expiry = msg.getExpiration();
+ // Java property has value of 0 for no expiry
+ v = (expiry==qpid::sys::FAR_FUTURE) ? 0
+ : qpid::sys::Duration(qpid::sys::AbsTime::Epoch(), expiry) / qpid::sys::TIME_MSEC;
+ } else if ( id=="creation_time" ) {
+ // Use the time put on queue (if it is enabled) as 0-10 has no standard way to get message
+ // creation time and we're not paying attention to the 1.0 creation time yet.
+ v = int64_t(msg.getTimestamp() * 1000); // getTimestamp() returns time in seconds we need milliseconds
+ } else if ( id=="jms_type" ) {
+ // Currently we can't distinguish between an empty JMSType and no JMSType
+ // We'll assume for now that setting an empty JMSType doesn't make a lot of sense
+ const string jmsType = msg.getAnnotation("jms-type").asString();
+ if ( !jmsType.empty() ) {
+ returnedStrings.push_back(new string(jmsType));
+ v = returnedStrings[returnedStrings.size()-1];
+ }
+ } else {
+ v = Value();
+ }
+ return v;
+}
+
struct ValueHandler : public broker::MapHandler {
unordered_map<string, Value>& values;
boost::ptr_vector<string>& strings;
@@ -152,7 +178,7 @@ const Value& MessageSelectorEnv::value(const string& identifier) const
if ( identifier.substr(0, 5) == "amqp." ) {
if ( returnedValues.count(identifier)==0 ) {
QPID_LOG(debug, "Selector lookup special identifier: " << identifier);
- returnedValues[identifier] = specialValue(msg, identifier.substr(5));
+ returnedValues[identifier] = specialValue(identifier.substr(5));
}
} else if (!valuesLookedup) {
QPID_LOG(debug, "Selector lookup triggered by: " << identifier);
diff --git a/qpid/cpp/src/qpid/broker/amqp/Message.h b/qpid/cpp/src/qpid/broker/amqp/Message.h
index aaa6c1b9f4..cbf8669fc1 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Message.h
+++ b/qpid/cpp/src/qpid/broker/amqp/Message.h
@@ -51,10 +51,10 @@ class Message : public qpid::broker::Message::Encoding, private qpid::amqp::Mess
std::string getContent() const;
void processProperties(qpid::amqp::MapHandler&) const;
std::string getUserId() const;
-
qpid::amqp::MessageId getMessageId() const;
- qpid::amqp::CharSequence getReplyTo() const;
qpid::amqp::MessageId getCorrelationId() const;
+
+ qpid::amqp::CharSequence getReplyTo() const;
qpid::amqp::CharSequence getContentType() const;
qpid::amqp::CharSequence getContentEncoding() const;
diff --git a/qpid/cpp/src/qpid/broker/amqp/Translation.cpp b/qpid/cpp/src/qpid/broker/amqp/Translation.cpp
index 846deb92f5..e04d44d2c8 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Translation.cpp
+++ b/qpid/cpp/src/qpid/broker/amqp/Translation.cpp
@@ -172,6 +172,8 @@ boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer> Translation
qpid::amqp::MessageId mid = message->getMessageId();
qpid::framing::Uuid uuid;
switch (mid.type) {
+ case qpid::amqp::MessageId::NONE:
+ break;
case qpid::amqp::MessageId::UUID:
case qpid::amqp::MessageId::BYTES:
if (mid.value.bytes.size == 0) break;
@@ -183,6 +185,8 @@ boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer> Translation
qpid::amqp::MessageId cid = message->getCorrelationId();
switch (cid.type) {
+ case qpid::amqp::MessageId::NONE:
+ break;
case qpid::amqp::MessageId::UUID:
assert(cid.value.bytes.size = 16);
props->setCorrelationId(qpid::framing::Uuid(cid.value.bytes.data).str());
diff --git a/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp b/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp
index 9fb263e3d3..e3a967796b 100644
--- a/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp
+++ b/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp
@@ -23,6 +23,7 @@
#include "qpid/amqp/CharSequence.h"
#include "qpid/amqp_0_10/Codecs.h"
#include "qpid/amqp/MapHandler.h"
+#include "qpid/amqp/MessageId.h"
#include "qpid/broker/Message.h"
#include "qpid/framing/MessageTransferBody.h"
#include "qpid/framing/MessageProperties.h"
@@ -67,6 +68,28 @@ std::string MessageTransfer::getAnnotationAsString(const std::string& key) const
}
std::string MessageTransfer::getPropertyAsString(const std::string& key) const { return getAnnotationAsString(key); }
+amqp::MessageId MessageTransfer::getMessageId() const
+{
+ const qpid::framing::MessageProperties* mp = getProperties<qpid::framing::MessageProperties>();
+
+ amqp::MessageId r;
+ if (mp->hasMessageId()) {
+ r.set(amqp::CharSequence::create(&mp->getMessageId()[0],16), types::VAR_UUID);
+ }
+ return r;
+}
+
+amqp::MessageId MessageTransfer::getCorrelationId() const
+{
+ const qpid::framing::MessageProperties* mp = getProperties<qpid::framing::MessageProperties>();
+
+ amqp::MessageId r;
+ if (mp->hasCorrelationId()) {
+ r.set(amqp::CharSequence::create(mp->getCorrelationId()), types::VAR_STRING);
+ }
+ return r;
+}
+
bool MessageTransfer::getTtl(uint64_t& result) const
{
const qpid::framing::DeliveryProperties* dp = getProperties<qpid::framing::DeliveryProperties>();
diff --git a/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h b/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h
index 3d6e3a2906..d32c0d07fd 100644
--- a/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h
+++ b/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h
@@ -45,6 +45,8 @@ class MessageTransfer : public qpid::broker::Message::Encoding, public qpid::bro
bool isPersistent() const;
uint8_t getPriority() const;
uint64_t getContentSize() const;
+ qpid::amqp::MessageId getMessageId() const;
+ qpid::amqp::MessageId getCorrelationId() const;
std::string getPropertyAsString(const std::string& key) const;
std::string getAnnotationAsString(const std::string& key) const;
bool getTtl(uint64_t&) const;