diff options
author | Gordon Sim <gsim@apache.org> | 2010-06-25 17:25:46 +0000 |
---|---|---|
committer | Gordon Sim <gsim@apache.org> | 2010-06-25 17:25:46 +0000 |
commit | c3a6e87c3f21009135b5826de6917586477e0589 (patch) | |
tree | b351c0d8a301b6eb7a90d1a586479f2d428ce9f5 /cpp/src | |
parent | e9920e89d298dbcc5cd01d0c79616353eb750c43 (diff) | |
download | qpid-python-c3a6e87c3f21009135b5826de6917586477e0589.tar.gz |
QPID-2698: recognise special property names for amqp 0-10 specific message- and delivery- properties
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@958044 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp | 19 | ||||
-rw-r--r-- | cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp | 32 | ||||
-rw-r--r-- | cpp/src/tests/MessagingFixture.h | 5 | ||||
-rw-r--r-- | cpp/src/tests/MessagingSessionTests.cpp | 45 |
4 files changed, 98 insertions, 3 deletions
diff --git a/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp b/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp index 30cb6340a0..2c00e6fae8 100644 --- a/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp +++ b/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp @@ -290,6 +290,10 @@ namespace { //TODO: unify conversion to and from 0-10 message that is currently //split between IncomingMessages and OutgoingMessage const std::string SUBJECT("qpid.subject"); + +const std::string X_APP_ID("x-amqp-0-10.app-id"); +const std::string X_ROUTING_KEY("x-amqp-0-10.routing-key"); +const std::string X_CONTENT_ENCODING("x-amqp-0-10.content-encoding"); } void populateHeaders(qpid::messaging::Message& message, @@ -312,6 +316,21 @@ void populateHeaders(qpid::messaging::Message& message, translate(messageProperties->getApplicationHeaders(), message.getProperties()); message.setCorrelationId(messageProperties->getCorrelationId()); message.setUserId(messageProperties->getUserId()); + if (messageProperties->hasMessageId()) { + message.setMessageId(messageProperties->getMessageId().str()); + } + //expose 0-10 specific items through special properties: + // app-id, content-encoding + if (messageProperties->hasAppId()) { + message.getProperties()[X_APP_ID] = messageProperties->getAppId(); + } + if (messageProperties->hasContentEncoding()) { + message.getProperties()[X_CONTENT_ENCODING] = messageProperties->getContentEncoding(); + } + // routing-key, others? + if (deliveryProperties && deliveryProperties->hasRoutingKey()) { + message.getProperties()[X_ROUTING_KEY] = deliveryProperties->getRoutingKey(); + } } } diff --git a/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp b/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp index c22eb5403f..82358961c8 100644 --- a/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp +++ b/cpp/src/qpid/client/amqp0_10/OutgoingMessage.cpp @@ -21,10 +21,12 @@ #include "qpid/client/amqp0_10/OutgoingMessage.h" #include "qpid/client/amqp0_10/AddressResolution.h" #include "qpid/amqp_0_10/Codecs.h" +#include "qpid/types/Variant.h" #include "qpid/messaging/Address.h" #include "qpid/messaging/Message.h" #include "qpid/messaging/MessageImpl.h" #include "qpid/framing/enum.h" +#include <sstream> namespace qpid { namespace client { @@ -32,9 +34,19 @@ namespace amqp0_10 { using qpid::messaging::Address; using qpid::messaging::MessageImplAccess; +using qpid::types::Variant; using namespace qpid::framing::message; using namespace qpid::amqp_0_10; +namespace { +//TODO: unify conversion to and from 0-10 message that is currently +//split between IncomingMessages and OutgoingMessage +const std::string SUBJECT("qpid.subject"); +const std::string X_APP_ID("x-amqp-0-10.app-id"); +const std::string X_ROUTING_KEY("x-amqp-0-10.routing-key"); +const std::string X_CONTENT_ENCODING("x-amqp-0-10.content-encoding"); +} + void OutgoingMessage::convert(const qpid::messaging::Message& from) { //TODO: need to avoid copying as much as possible @@ -55,10 +67,24 @@ void OutgoingMessage::convert(const qpid::messaging::Message& from) message.getDeliveryProperties().setRedelivered(true); } if (from.getPriority()) message.getDeliveryProperties().setPriority(from.getPriority()); -} -namespace { -const std::string SUBJECT("qpid.subject"); + //allow certain 0-10 specific items to be set through special properties: + // message-id, app-id, content-encoding + if (from.getMessageId().size()) { + qpid::framing::Uuid uuid; + std::istringstream data(from.getMessageId()); + data >> uuid; + message.getMessageProperties().setMessageId(uuid); + } + Variant::Map::const_iterator i; + i = from.getProperties().find(X_APP_ID); + if (i != from.getProperties().end()) { + message.getMessageProperties().setAppId(i->second.asString()); + } + i = from.getProperties().find(X_CONTENT_ENCODING); + if (i != from.getProperties().end()) { + message.getMessageProperties().setContentEncoding(i->second.asString()); + } } void OutgoingMessage::setSubject(const std::string& subject) diff --git a/cpp/src/tests/MessagingFixture.h b/cpp/src/tests/MessagingFixture.h index 5546b4ef49..c8ae86dbc3 100644 --- a/cpp/src/tests/MessagingFixture.h +++ b/cpp/src/tests/MessagingFixture.h @@ -79,6 +79,11 @@ struct BrokerAdmin return !result.getNotFound(); } + void send(qpid::client::Message& message, const std::string& exchange=std::string()) + { + session.messageTransfer(qpid::client::arg::destination=exchange, qpid::client::arg::content=message); + } + ~BrokerAdmin() { session.close(); diff --git a/cpp/src/tests/MessagingSessionTests.cpp b/cpp/src/tests/MessagingSessionTests.cpp index 375af73799..6fee1233d6 100644 --- a/cpp/src/tests/MessagingSessionTests.cpp +++ b/cpp/src/tests/MessagingSessionTests.cpp @@ -712,6 +712,51 @@ QPID_AUTO_TEST_CASE(testOptionVerification) BOOST_CHECK_THROW(fix.session.createReceiver("my-queue; {invalid-option:blah}"), qpid::messaging::AddressError); } +QPID_AUTO_TEST_CASE(testReceiveSpecialProperties) +{ + QueueFixture fix; + + qpid::client::Message out; + out.getDeliveryProperties().setRoutingKey(fix.queue); + out.getMessageProperties().setAppId("my-app-id"); + out.getMessageProperties().setMessageId(qpid::framing::Uuid(true)); + out.getMessageProperties().setContentEncoding("my-content-encoding"); + fix.admin.send(out); + + Receiver receiver = fix.session.createReceiver(fix.queue); + Message in = receiver.fetch(Duration::SECOND * 5); + BOOST_CHECK_EQUAL(in.getProperties()["x-amqp-0-10.routing-key"].asString(), out.getDeliveryProperties().getRoutingKey()); + BOOST_CHECK_EQUAL(in.getProperties()["x-amqp-0-10.app-id"].asString(), out.getMessageProperties().getAppId()); + BOOST_CHECK_EQUAL(in.getProperties()["x-amqp-0-10.content-encoding"].asString(), out.getMessageProperties().getContentEncoding()); + BOOST_CHECK_EQUAL(in.getMessageId(), out.getMessageProperties().getMessageId().str()); + fix.session.acknowledge(true); +} + +QPID_AUTO_TEST_CASE(testSendSpecialProperties) +{ + QueueFixture fix; + Sender sender = fix.session.createSender(fix.queue); + Message out("test-message"); + std::string appId = "my-app-id"; + std::string contentEncoding = "my-content-encoding"; + out.getProperties()["x-amqp-0-10.app-id"] = appId; + out.getProperties()["x-amqp-0-10.content-encoding"] = contentEncoding; + out.setMessageId(qpid::framing::Uuid(true).str()); + sender.send(out, true); + + qpid::client::LocalQueue q; + qpid::client::SubscriptionManager subs(fix.admin.session); + qpid::client::Subscription s = subs.subscribe(q, fix.queue); + qpid::client::Message in = q.get(); + s.cancel(); + fix.admin.session.sync(); + + BOOST_CHECK_EQUAL(in.getMessageProperties().getAppId(), appId); + BOOST_CHECK_EQUAL(in.getMessageProperties().getContentEncoding(), contentEncoding); + BOOST_CHECK_EQUAL(in.getMessageProperties().getMessageId().str(), out.getMessageId()); +} + + QPID_AUTO_TEST_SUITE_END() }} // namespace qpid::tests |