summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2010-06-25 17:25:46 +0000
committerGordon Sim <gsim@apache.org>2010-06-25 17:25:46 +0000
commitc3a6e87c3f21009135b5826de6917586477e0589 (patch)
treeb351c0d8a301b6eb7a90d1a586479f2d428ce9f5 /cpp/src
parente9920e89d298dbcc5cd01d0c79616353eb750c43 (diff)
downloadqpid-python-c3a6e87c3f21009135b5826de6917586477e0589.tar.gz
QPID-2698: recognise special property names for amqp 0-10 specific message- and delivery- properties
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@958044 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp19
-rw-r--r--cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp32
-rw-r--r--cpp/src/tests/MessagingFixture.h5
-rw-r--r--cpp/src/tests/MessagingSessionTests.cpp45
4 files changed, 98 insertions, 3 deletions
diff --git a/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp b/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
index 30cb6340a0..2c00e6fae8 100644
--- a/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
+++ b/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
@@ -290,6 +290,10 @@ namespace {
//TODO: unify conversion to and from 0-10 message that is currently
//split between IncomingMessages and OutgoingMessage
const std::string SUBJECT("qpid.subject");
+
+const std::string X_APP_ID("x-amqp-0-10.app-id");
+const std::string X_ROUTING_KEY("x-amqp-0-10.routing-key");
+const std::string X_CONTENT_ENCODING("x-amqp-0-10.content-encoding");
}
void populateHeaders(qpid::messaging::Message& message,
@@ -312,6 +316,21 @@ void populateHeaders(qpid::messaging::Message& message,
translate(messageProperties->getApplicationHeaders(), message.getProperties());
message.setCorrelationId(messageProperties->getCorrelationId());
message.setUserId(messageProperties->getUserId());
+ if (messageProperties->hasMessageId()) {
+ message.setMessageId(messageProperties->getMessageId().str());
+ }
+ //expose 0-10 specific items through special properties:
+ // app-id, content-encoding
+ if (messageProperties->hasAppId()) {
+ message.getProperties()[X_APP_ID] = messageProperties->getAppId();
+ }
+ if (messageProperties->hasContentEncoding()) {
+ message.getProperties()[X_CONTENT_ENCODING] = messageProperties->getContentEncoding();
+ }
+ // routing-key, others?
+ if (deliveryProperties && deliveryProperties->hasRoutingKey()) {
+ message.getProperties()[X_ROUTING_KEY] = deliveryProperties->getRoutingKey();
+ }
}
}
diff --git a/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp b/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp
index c22eb5403f..82358961c8 100644
--- a/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp
+++ b/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp
@@ -21,10 +21,12 @@
#include "qpid/client/amqp0_10/OutgoingMessage.h"
#include "qpid/client/amqp0_10/AddressResolution.h"
#include "qpid/amqp_0_10/Codecs.h"
+#include "qpid/types/Variant.h"
#include "qpid/messaging/Address.h"
#include "qpid/messaging/Message.h"
#include "qpid/messaging/MessageImpl.h"
#include "qpid/framing/enum.h"
+#include <sstream>
namespace qpid {
namespace client {
@@ -32,9 +34,19 @@ namespace amqp0_10 {
using qpid::messaging::Address;
using qpid::messaging::MessageImplAccess;
+using qpid::types::Variant;
using namespace qpid::framing::message;
using namespace qpid::amqp_0_10;
+namespace {
+//TODO: unify conversion to and from 0-10 message that is currently
+//split between IncomingMessages and OutgoingMessage
+const std::string SUBJECT("qpid.subject");
+const std::string X_APP_ID("x-amqp-0-10.app-id");
+const std::string X_ROUTING_KEY("x-amqp-0-10.routing-key");
+const std::string X_CONTENT_ENCODING("x-amqp-0-10.content-encoding");
+}
+
void OutgoingMessage::convert(const qpid::messaging::Message& from)
{
//TODO: need to avoid copying as much as possible
@@ -55,10 +67,24 @@ void OutgoingMessage::convert(const qpid::messaging::Message& from)
message.getDeliveryProperties().setRedelivered(true);
}
if (from.getPriority()) message.getDeliveryProperties().setPriority(from.getPriority());
-}
-namespace {
-const std::string SUBJECT("qpid.subject");
+ //allow certain 0-10 specific items to be set through special properties:
+ // message-id, app-id, content-encoding
+ if (from.getMessageId().size()) {
+ qpid::framing::Uuid uuid;
+ std::istringstream data(from.getMessageId());
+ data >> uuid;
+ message.getMessageProperties().setMessageId(uuid);
+ }
+ Variant::Map::const_iterator i;
+ i = from.getProperties().find(X_APP_ID);
+ if (i != from.getProperties().end()) {
+ message.getMessageProperties().setAppId(i->second.asString());
+ }
+ i = from.getProperties().find(X_CONTENT_ENCODING);
+ if (i != from.getProperties().end()) {
+ message.getMessageProperties().setContentEncoding(i->second.asString());
+ }
}
void OutgoingMessage::setSubject(const std::string& subject)
diff --git a/cpp/src/tests/MessagingFixture.h b/cpp/src/tests/MessagingFixture.h
index 5546b4ef49..c8ae86dbc3 100644
--- a/cpp/src/tests/MessagingFixture.h
+++ b/cpp/src/tests/MessagingFixture.h
@@ -79,6 +79,11 @@ struct BrokerAdmin
return !result.getNotFound();
}
+ void send(qpid::client::Message& message, const std::string& exchange=std::string())
+ {
+ session.messageTransfer(qpid::client::arg::destination=exchange, qpid::client::arg::content=message);
+ }
+
~BrokerAdmin()
{
session.close();
diff --git a/cpp/src/tests/MessagingSessionTests.cpp b/cpp/src/tests/MessagingSessionTests.cpp
index 375af73799..6fee1233d6 100644
--- a/cpp/src/tests/MessagingSessionTests.cpp
+++ b/cpp/src/tests/MessagingSessionTests.cpp
@@ -712,6 +712,51 @@ QPID_AUTO_TEST_CASE(testOptionVerification)
BOOST_CHECK_THROW(fix.session.createReceiver("my-queue; {invalid-option:blah}"), qpid::messaging::AddressError);
}
+QPID_AUTO_TEST_CASE(testReceiveSpecialProperties)
+{
+ QueueFixture fix;
+
+ qpid::client::Message out;
+ out.getDeliveryProperties().setRoutingKey(fix.queue);
+ out.getMessageProperties().setAppId("my-app-id");
+ out.getMessageProperties().setMessageId(qpid::framing::Uuid(true));
+ out.getMessageProperties().setContentEncoding("my-content-encoding");
+ fix.admin.send(out);
+
+ Receiver receiver = fix.session.createReceiver(fix.queue);
+ Message in = receiver.fetch(Duration::SECOND * 5);
+ BOOST_CHECK_EQUAL(in.getProperties()["x-amqp-0-10.routing-key"].asString(), out.getDeliveryProperties().getRoutingKey());
+ BOOST_CHECK_EQUAL(in.getProperties()["x-amqp-0-10.app-id"].asString(), out.getMessageProperties().getAppId());
+ BOOST_CHECK_EQUAL(in.getProperties()["x-amqp-0-10.content-encoding"].asString(), out.getMessageProperties().getContentEncoding());
+ BOOST_CHECK_EQUAL(in.getMessageId(), out.getMessageProperties().getMessageId().str());
+ fix.session.acknowledge(true);
+}
+
+QPID_AUTO_TEST_CASE(testSendSpecialProperties)
+{
+ QueueFixture fix;
+ Sender sender = fix.session.createSender(fix.queue);
+ Message out("test-message");
+ std::string appId = "my-app-id";
+ std::string contentEncoding = "my-content-encoding";
+ out.getProperties()["x-amqp-0-10.app-id"] = appId;
+ out.getProperties()["x-amqp-0-10.content-encoding"] = contentEncoding;
+ out.setMessageId(qpid::framing::Uuid(true).str());
+ sender.send(out, true);
+
+ qpid::client::LocalQueue q;
+ qpid::client::SubscriptionManager subs(fix.admin.session);
+ qpid::client::Subscription s = subs.subscribe(q, fix.queue);
+ qpid::client::Message in = q.get();
+ s.cancel();
+ fix.admin.session.sync();
+
+ BOOST_CHECK_EQUAL(in.getMessageProperties().getAppId(), appId);
+ BOOST_CHECK_EQUAL(in.getMessageProperties().getContentEncoding(), contentEncoding);
+ BOOST_CHECK_EQUAL(in.getMessageProperties().getMessageId().str(), out.getMessageId());
+}
+
+
QPID_AUTO_TEST_SUITE_END()
}} // namespace qpid::tests