summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--qpid/cpp/src/qpid/amqp/CharSequence.cpp18
-rw-r--r--qpid/cpp/src/qpid/amqp/CharSequence.h3
-rw-r--r--qpid/cpp/src/qpid/amqp/Decoder.cpp2
-rw-r--r--qpid/cpp/src/qpid/amqp/MessageId.cpp18
-rw-r--r--qpid/cpp/src/qpid/amqp/MessageId.h3
-rw-r--r--qpid/cpp/src/qpid/broker/Message.h18
-rw-r--r--qpid/cpp/src/qpid/broker/Selector.cpp82
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Message.h4
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Translation.cpp4
-rw-r--r--qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp23
-rw-r--r--qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h2
11 files changed, 136 insertions, 41 deletions
diff --git a/qpid/cpp/src/qpid/amqp/CharSequence.cpp b/qpid/cpp/src/qpid/amqp/CharSequence.cpp
index 857ec7e587..7e433bd26e 100644
--- a/qpid/cpp/src/qpid/amqp/CharSequence.cpp
+++ b/qpid/cpp/src/qpid/amqp/CharSequence.cpp
@@ -38,10 +38,28 @@ std::string CharSequence::str() const
return std::string(data, size);
}
+CharSequence CharSequence::create()
+{
+ CharSequence c = {0, 0};
+ return c;
+}
+
+CharSequence CharSequence::create(const std::string& str)
+{
+ CharSequence c = {str.data(), str.size()};
+ return c;
+}
+
CharSequence CharSequence::create(const char* data, size_t size)
{
CharSequence c = {data, size};
return c;
}
+CharSequence CharSequence::create(const unsigned char* data, size_t size)
+{
+ CharSequence c = {reinterpret_cast<const char*>(data), size};
+ return c;
+}
+
}} // namespace qpid::amqp
diff --git a/qpid/cpp/src/qpid/amqp/CharSequence.h b/qpid/cpp/src/qpid/amqp/CharSequence.h
index 307a4b1537..752097c913 100644
--- a/qpid/cpp/src/qpid/amqp/CharSequence.h
+++ b/qpid/cpp/src/qpid/amqp/CharSequence.h
@@ -42,7 +42,10 @@ struct CharSequence
QPID_COMMON_EXTERN std::string str() const;
QPID_COMMON_EXTERN void init();
+ QPID_COMMON_EXTERN static CharSequence create();
+ QPID_COMMON_EXTERN static CharSequence create(const std::string& str);
QPID_COMMON_EXTERN static CharSequence create(const char* data, size_t size);
+ QPID_COMMON_EXTERN static CharSequence create(const unsigned char* data, size_t size);
};
}} // namespace qpid::amqp
diff --git a/qpid/cpp/src/qpid/amqp/Decoder.cpp b/qpid/cpp/src/qpid/amqp/Decoder.cpp
index 9c577e6c92..1d5abc99c7 100644
--- a/qpid/cpp/src/qpid/amqp/Decoder.cpp
+++ b/qpid/cpp/src/qpid/amqp/Decoder.cpp
@@ -263,7 +263,7 @@ void Decoder::readValue(Reader& reader, uint8_t code, const Descriptor* descript
break;
case LIST0:
- reader.onStartList(0, CharSequence::create(0, 0), descriptor);
+ reader.onStartList(0, CharSequence::create(), descriptor);
reader.onEndList(0, descriptor);
break;
case LIST8:
diff --git a/qpid/cpp/src/qpid/amqp/MessageId.cpp b/qpid/cpp/src/qpid/amqp/MessageId.cpp
index e6f6f4a231..25e9c96ef6 100644
--- a/qpid/cpp/src/qpid/amqp/MessageId.cpp
+++ b/qpid/cpp/src/qpid/amqp/MessageId.cpp
@@ -25,14 +25,14 @@
namespace qpid {
namespace amqp {
-MessageId::MessageId() : type(BYTES)
+MessageId::MessageId() : type(NONE)
{
- value.bytes.data = 0;
- value.bytes.size = 0;
}
void MessageId::assign(std::string& s) const
{
switch (type) {
+ case NONE:
+ s = std::string();
case BYTES:
if (value.bytes) s.assign(value.bytes.data, value.bytes.size);
break;
@@ -45,6 +45,18 @@ void MessageId::assign(std::string& s) const
}
}
+MessageId::operator bool() const
+{
+ return type!=NONE;
+}
+
+std::string MessageId::str() const
+{
+ std::string s;
+ assign(s);
+ return s;
+}
+
void MessageId::set(qpid::amqp::CharSequence bytes, qpid::types::VariantType t)
{
switch (t) {
diff --git a/qpid/cpp/src/qpid/amqp/MessageId.h b/qpid/cpp/src/qpid/amqp/MessageId.h
index ee440f3011..4505469148 100644
--- a/qpid/cpp/src/qpid/amqp/MessageId.h
+++ b/qpid/cpp/src/qpid/amqp/MessageId.h
@@ -37,12 +37,15 @@ struct MessageId
} value;
enum
{
+ NONE,
BYTES,
UUID,
ULONG
} type;
QPID_COMMON_EXTERN MessageId();
+ QPID_COMMON_EXTERN operator bool() const;
+ QPID_COMMON_EXTERN std::string str() const;
QPID_COMMON_EXTERN void assign(std::string&) const;
QPID_COMMON_EXTERN void set(qpid::amqp::CharSequence bytes, qpid::types::VariantType t);
QPID_COMMON_EXTERN void set(uint64_t ulong);
diff --git a/qpid/cpp/src/qpid/broker/Message.h b/qpid/cpp/src/qpid/broker/Message.h
index 8f12b06a9d..03ef21e77a 100644
--- a/qpid/cpp/src/qpid/broker/Message.h
+++ b/qpid/cpp/src/qpid/broker/Message.h
@@ -22,22 +22,24 @@
*
*/
-#include "qpid/broker/BrokerImportExport.h"
-#include "qpid/sys/Time.h"
-#include "qpid/types/Variant.h"
+#include "qpid/RefCounted.h"
+#include "qpid/broker/ExpiryPolicy.h"
+#include "qpid/broker/PersistableMessage.h"
//TODO: move the following out of framing or replace it
#include "qpid/framing/SequenceNumber.h"
+#include "qpid/sys/Time.h"
+#include "qpid/types/Variant.h"
+
+#include "qpid/broker/BrokerImportExport.h"
+
#include <string>
#include <vector>
-
-#include "qpid/RefCounted.h"
#include <boost/intrusive_ptr.hpp>
-#include "qpid/broker/ExpiryPolicy.h"
-#include "qpid/broker/PersistableMessage.h"
namespace qpid {
namespace amqp {
class MapHandler;
+class MessageId;
}
namespace management {
@@ -67,6 +69,8 @@ public:
virtual bool isPersistent() const = 0;
virtual uint8_t getPriority() const = 0;
virtual uint64_t getContentSize() const = 0;
+ virtual qpid::amqp::MessageId getMessageId() const = 0;
+ virtual qpid::amqp::MessageId getCorrelationId() const = 0;
virtual std::string getPropertyAsString(const std::string& key) const = 0;
virtual std::string getAnnotationAsString(const std::string& key) const = 0;
virtual bool getTtl(uint64_t&) const = 0;
diff --git a/qpid/cpp/src/qpid/broker/Selector.cpp b/qpid/cpp/src/qpid/broker/Selector.cpp
index 129787171d..0c25e7338b 100644
--- a/qpid/cpp/src/qpid/broker/Selector.cpp
+++ b/qpid/cpp/src/qpid/broker/Selector.cpp
@@ -22,8 +22,9 @@
#include "qpid/broker/Selector.h"
#include "qpid/amqp/CharSequence.h"
-#include "qpid/broker/Message.h"
#include "qpid/amqp/MapHandler.h"
+#include "qpid/amqp/MessageId.h"
+#include "qpid/broker/Message.h"
#include "qpid/broker/SelectorExpression.h"
#include "qpid/broker/SelectorValue.h"
#include "qpid/log/Statement.h"
@@ -43,6 +44,7 @@ using std::string;
using qpid::sys::unordered_map;
using qpid::amqp::CharSequence;
using qpid::amqp::MapHandler;
+using qpid::amqp::MessageId;
/**
* Identifier (amqp.) | JMS... | amqp 1.0 equivalent
@@ -63,32 +65,6 @@ const string EMPTY;
const string PERSISTENT("PERSISTENT");
const string NON_PERSISTENT("NON_PERSISTENT");
-const Value specialValue(const Message& msg, const string& id)
-{
- // TODO: Just use a simple if chain for now - improve this later
- if ( id=="delivery_mode" ) {
- return msg.getEncoding().isPersistent() ? PERSISTENT : NON_PERSISTENT;
- } else if ( id=="redelivered" ) {
- return msg.getDeliveryCount()>0 ? true : false;
- } else if ( id=="priority" ) {
- return int64_t(msg.getPriority());
- } else if ( id=="correlation_id" ) {
- return EMPTY; // Needs an indirection in getEncoding().
- } else if ( id=="message_id" ) {
- return EMPTY; // Needs an indirection in getEncoding().
- } else if ( id=="to" ) {
- return EMPTY; // This is good for 0-10, not sure about 1.0
- } else if ( id=="reply_to" ) {
- return EMPTY; // Needs an indirection in getEncoding().
- } else if ( id=="absolute_expiry_time" ) {
- return EMPTY; // Needs an indirection in getEncoding().
- } else if ( id=="creation_time" ) {
- return EMPTY; // Needs an indirection in getEncoding().
- } else if ( id=="jms_type" ) {
- return EMPTY;
- } else return Value();
-}
-
class MessageSelectorEnv : public SelectorEnv {
const Message& msg;
mutable boost::ptr_vector<string> returnedStrings;
@@ -96,6 +72,7 @@ class MessageSelectorEnv : public SelectorEnv {
mutable bool valuesLookedup;
const Value& value(const string&) const;
+ const Value specialValue(const string&) const;
public:
MessageSelectorEnv(const Message&);
@@ -107,6 +84,55 @@ MessageSelectorEnv::MessageSelectorEnv(const Message& m) :
{
}
+const Value MessageSelectorEnv::specialValue(const string& id) const
+{
+ Value v;
+ // TODO: Just use a simple if chain for now - improve this later
+ if ( id=="delivery_mode" ) {
+ v = msg.getEncoding().isPersistent() ? PERSISTENT : NON_PERSISTENT;
+ } else if ( id=="redelivered" ) {
+ v = msg.getDeliveryCount()>0 ? true : false;
+ } else if ( id=="priority" ) {
+ v = int64_t(msg.getPriority());
+ } else if ( id=="correlation_id" ) {
+ MessageId cId = msg.getEncoding().getCorrelationId();
+ if (cId) {
+ returnedStrings.push_back(new string(cId.str()));
+ v = returnedStrings[returnedStrings.size()-1];
+ }
+ } else if ( id=="message_id" ) {
+ MessageId mId = msg.getEncoding().getMessageId();
+ if (mId) {
+ returnedStrings.push_back(new string(mId.str()));
+ v = returnedStrings[returnedStrings.size()-1];
+ }
+ } else if ( id=="to" ) {
+ v = EMPTY; // Hard to get this correct for both 1.0 and 0-10
+ } else if ( id=="reply_to" ) {
+ v = EMPTY; // Hard to get this correct for both 1.0 and 0-10
+ } else if ( id=="absolute_expiry_time" ) {
+ qpid::sys::AbsTime expiry = msg.getExpiration();
+ // Java property has value of 0 for no expiry
+ v = (expiry==qpid::sys::FAR_FUTURE) ? 0
+ : qpid::sys::Duration(qpid::sys::AbsTime::Epoch(), expiry) / qpid::sys::TIME_MSEC;
+ } else if ( id=="creation_time" ) {
+ // Use the time put on queue (if it is enabled) as 0-10 has no standard way to get message
+ // creation time and we're not paying attention to the 1.0 creation time yet.
+ v = int64_t(msg.getTimestamp() * 1000); // getTimestamp() returns time in seconds we need milliseconds
+ } else if ( id=="jms_type" ) {
+ // Currently we can't distinguish between an empty JMSType and no JMSType
+ // We'll assume for now that setting an empty JMSType doesn't make a lot of sense
+ const string jmsType = msg.getAnnotation("jms-type").asString();
+ if ( !jmsType.empty() ) {
+ returnedStrings.push_back(new string(jmsType));
+ v = returnedStrings[returnedStrings.size()-1];
+ }
+ } else {
+ v = Value();
+ }
+ return v;
+}
+
struct ValueHandler : public broker::MapHandler {
unordered_map<string, Value>& values;
boost::ptr_vector<string>& strings;
@@ -152,7 +178,7 @@ const Value& MessageSelectorEnv::value(const string& identifier) const
if ( identifier.substr(0, 5) == "amqp." ) {
if ( returnedValues.count(identifier)==0 ) {
QPID_LOG(debug, "Selector lookup special identifier: " << identifier);
- returnedValues[identifier] = specialValue(msg, identifier.substr(5));
+ returnedValues[identifier] = specialValue(identifier.substr(5));
}
} else if (!valuesLookedup) {
QPID_LOG(debug, "Selector lookup triggered by: " << identifier);
diff --git a/qpid/cpp/src/qpid/broker/amqp/Message.h b/qpid/cpp/src/qpid/broker/amqp/Message.h
index aaa6c1b9f4..cbf8669fc1 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Message.h
+++ b/qpid/cpp/src/qpid/broker/amqp/Message.h
@@ -51,10 +51,10 @@ class Message : public qpid::broker::Message::Encoding, private qpid::amqp::Mess
std::string getContent() const;
void processProperties(qpid::amqp::MapHandler&) const;
std::string getUserId() const;
-
qpid::amqp::MessageId getMessageId() const;
- qpid::amqp::CharSequence getReplyTo() const;
qpid::amqp::MessageId getCorrelationId() const;
+
+ qpid::amqp::CharSequence getReplyTo() const;
qpid::amqp::CharSequence getContentType() const;
qpid::amqp::CharSequence getContentEncoding() const;
diff --git a/qpid/cpp/src/qpid/broker/amqp/Translation.cpp b/qpid/cpp/src/qpid/broker/amqp/Translation.cpp
index 846deb92f5..e04d44d2c8 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Translation.cpp
+++ b/qpid/cpp/src/qpid/broker/amqp/Translation.cpp
@@ -172,6 +172,8 @@ boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer> Translation
qpid::amqp::MessageId mid = message->getMessageId();
qpid::framing::Uuid uuid;
switch (mid.type) {
+ case qpid::amqp::MessageId::NONE:
+ break;
case qpid::amqp::MessageId::UUID:
case qpid::amqp::MessageId::BYTES:
if (mid.value.bytes.size == 0) break;
@@ -183,6 +185,8 @@ boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer> Translation
qpid::amqp::MessageId cid = message->getCorrelationId();
switch (cid.type) {
+ case qpid::amqp::MessageId::NONE:
+ break;
case qpid::amqp::MessageId::UUID:
assert(cid.value.bytes.size = 16);
props->setCorrelationId(qpid::framing::Uuid(cid.value.bytes.data).str());
diff --git a/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp b/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp
index 9fb263e3d3..e3a967796b 100644
--- a/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp
+++ b/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.cpp
@@ -23,6 +23,7 @@
#include "qpid/amqp/CharSequence.h"
#include "qpid/amqp_0_10/Codecs.h"
#include "qpid/amqp/MapHandler.h"
+#include "qpid/amqp/MessageId.h"
#include "qpid/broker/Message.h"
#include "qpid/framing/MessageTransferBody.h"
#include "qpid/framing/MessageProperties.h"
@@ -67,6 +68,28 @@ std::string MessageTransfer::getAnnotationAsString(const std::string& key) const
}
std::string MessageTransfer::getPropertyAsString(const std::string& key) const { return getAnnotationAsString(key); }
+amqp::MessageId MessageTransfer::getMessageId() const
+{
+ const qpid::framing::MessageProperties* mp = getProperties<qpid::framing::MessageProperties>();
+
+ amqp::MessageId r;
+ if (mp->hasMessageId()) {
+ r.set(amqp::CharSequence::create(&mp->getMessageId()[0],16), types::VAR_UUID);
+ }
+ return r;
+}
+
+amqp::MessageId MessageTransfer::getCorrelationId() const
+{
+ const qpid::framing::MessageProperties* mp = getProperties<qpid::framing::MessageProperties>();
+
+ amqp::MessageId r;
+ if (mp->hasCorrelationId()) {
+ r.set(amqp::CharSequence::create(mp->getCorrelationId()), types::VAR_STRING);
+ }
+ return r;
+}
+
bool MessageTransfer::getTtl(uint64_t& result) const
{
const qpid::framing::DeliveryProperties* dp = getProperties<qpid::framing::DeliveryProperties>();
diff --git a/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h b/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h
index 3d6e3a2906..d32c0d07fd 100644
--- a/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h
+++ b/qpid/cpp/src/qpid/broker/amqp_0_10/MessageTransfer.h
@@ -45,6 +45,8 @@ class MessageTransfer : public qpid::broker::Message::Encoding, public qpid::bro
bool isPersistent() const;
uint8_t getPriority() const;
uint64_t getContentSize() const;
+ qpid::amqp::MessageId getMessageId() const;
+ qpid::amqp::MessageId getCorrelationId() const;
std::string getPropertyAsString(const std::string& key) const;
std::string getAnnotationAsString(const std::string& key) const;
bool getTtl(uint64_t&) const;