diff options
| author | Gordon Sim <gsim@apache.org> | 2013-09-20 15:44:05 +0000 |
|---|---|---|
| committer | Gordon Sim <gsim@apache.org> | 2013-09-20 15:44:05 +0000 |
| commit | e384bf948ed10dcaa1193c5e873053cbd249accc (patch) | |
| tree | b11fd5952149a50757f6a94c9ec0956155468c8b /qpid/cpp | |
| parent | ebbfc33fdf739e8f6f7a70cec6e3e34c9c5b2fce (diff) | |
| download | qpid-python-e384bf948ed10dcaa1193c5e873053cbd249accc.tar.gz | |
QPID-5150: fixes for QMFv2 over AMQP 1.0
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1525044 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp')
| -rw-r--r-- | qpid/cpp/src/qpid/broker/amqp/Translation.cpp | 5 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/management/ManagementAgent.cpp | 19 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/management/ManagementAgent.h | 2 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp | 3 |
4 files changed, 20 insertions, 9 deletions
diff --git a/qpid/cpp/src/qpid/broker/amqp/Translation.cpp b/qpid/cpp/src/qpid/broker/amqp/Translation.cpp index 2f15ff6658..2196c8ff3d 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Translation.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/Translation.cpp @@ -41,6 +41,7 @@ const std::string EMPTY; const std::string FORWARD_SLASH("/"); const std::string TEXT_PLAIN("text/plain"); const std::string SUBJECT_KEY("qpid.subject"); +const std::string APP_ID("x-amqp-0-10.app-id"); qpid::framing::ReplyTo translate(const std::string address, Broker* broker) { @@ -253,6 +254,10 @@ boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer> Translation if (ap) { qpid::amqp::Decoder d(ap.data, ap.size); qpid::amqp_0_10::translate(d.readMap(), props->getApplicationHeaders()); + std::string appid = props->getApplicationHeaders().getAsString(APP_ID); + if (!appid.empty()) { + props->setAppId(appid); + } } qpid::framing::DeliveryProperties* dp = diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.cpp b/qpid/cpp/src/qpid/management/ManagementAgent.cpp index 5f653939ec..d80dd6e6a3 100644 --- a/qpid/cpp/src/qpid/management/ManagementAgent.cpp +++ b/qpid/cpp/src/qpid/management/ManagementAgent.cpp @@ -39,6 +39,7 @@ #include "qpid/sys/PollableQueue.h" #include "qpid/broker/Connection.h" #include "qpid/broker/AclModule.h" +#include "qpid/broker/Protocol.h" #include "qpid/types/Variant.h" #include "qpid/types/Uuid.h" #include "qpid/framing/List.h" @@ -169,7 +170,7 @@ ManagementAgent::RemoteAgent::~RemoteAgent () } ManagementAgent::ManagementAgent (const bool qmfV1, const bool qmfV2) : - threadPoolSize(1), publish(true), interval(10), broker(0), timer(0), + threadPoolSize(1), publish(true), interval(10), broker(0), timer(0), protocols(0), startTime(sys::now()), suppressed(false), disallowAllV1Methods(false), vendorNameKey(defaultVendorName), productNameKey(defaultProductName), @@ -221,6 +222,7 @@ void ManagementAgent::configure(const string& _dataDir, bool _publish, uint16_t timer = &broker->getTimer(); timer->add(new Periodic(boost::bind(&ManagementAgent::periodicProcessing, this), timer, interval)); + protocols = &broker->getProtocolRegistry(); // Get from file or generate and save to file. if (dataDir.empty()) { @@ -2132,9 +2134,9 @@ bool ManagementAgent::authorizeAgentMessage(Message& msg) uint32_t bufferLen = inBuffer.getPosition(); inBuffer.reset(); - qpid::broker::amqp_0_10::MessageTransfer& transfer(qpid::broker::amqp_0_10::MessageTransfer::get(msg)); + boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer> transfer = protocols->translate(msg); const framing::MessageProperties* p = - transfer.getFrames().getHeaders()->get<framing::MessageProperties>(); + transfer ? transfer->getFrames().getHeaders()->get<framing::MessageProperties>() : 0; const framing::FieldTable *headers = p ? &p->getApplicationHeaders() : 0; @@ -2229,9 +2231,9 @@ bool ManagementAgent::authorizeAgentMessage(Message& msg) // authorization failed, send reply if replyTo present - qpid::broker::amqp_0_10::MessageTransfer& transfer(qpid::broker::amqp_0_10::MessageTransfer::get(msg)); + boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer> transfer = protocols->translate(msg); const framing::MessageProperties* p = - transfer.getFrames().getHeaders()->get<framing::MessageProperties>(); + transfer ? transfer->getFrames().getHeaders()->get<framing::MessageProperties>() : 0; if (p && p->hasReplyTo()) { const framing::ReplyTo& rt = p->getReplyTo(); string rte = rt.getExchange(); @@ -2266,9 +2268,10 @@ void ManagementAgent::dispatchAgentCommand(Message& msg, bool viaLocal) { string rte; string rtk; - qpid::broker::amqp_0_10::MessageTransfer& transfer(qpid::broker::amqp_0_10::MessageTransfer::get(msg)); - const framing::MessageProperties* p = - transfer.getFrames().getHeaders()->get<framing::MessageProperties>(); + + boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer> transfer = protocols->translate(msg); + const framing::MessageProperties* p = transfer ? + transfer->getFrames().getHeaders()->get<framing::MessageProperties>() : 0; if (p && p->hasReplyTo()) { const framing::ReplyTo& rt = p->getReplyTo(); rte = rt.getExchange(); diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.h b/qpid/cpp/src/qpid/management/ManagementAgent.h index d2869a705f..c2eb5d4a31 100644 --- a/qpid/cpp/src/qpid/management/ManagementAgent.h +++ b/qpid/cpp/src/qpid/management/ManagementAgent.h @@ -44,6 +44,7 @@ namespace qpid { namespace broker { class Connection; +class ProtocolRegistry; } namespace sys { class Timer; @@ -256,6 +257,7 @@ private: uint16_t interval; qpid::broker::Broker* broker; qpid::sys::Timer* timer; + qpid::broker::ProtocolRegistry* protocols; uint16_t bootSequence; uint32_t nextObjectId; uint32_t brokerBank; diff --git a/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp index c3a9107333..94851da273 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp +++ b/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp @@ -134,6 +134,7 @@ namespace { const std::string X_AMQP("x-amqp-"); const std::string X_AMQP_FIRST_ACQUIRER("x-amqp-first-acquirer"); const std::string X_AMQP_DELIVERY_COUNT("x-amqp-delivery-count"); +const std::string X_AMQP_0_10_APP_ID("x-amqp-0-10.app-id"); class HeaderAdapter : public qpid::amqp::MessageEncoder::Header { @@ -353,7 +354,7 @@ class ApplicationPropertiesAdapter : public qpid::amqp::MessageEncoder::Applicat { for (qpid::types::Variant::Map::const_iterator i = headers.begin(); i != headers.end(); ++i) { //strip out values with special keys as they are sent in standard fields - if (!startsWith(i->first, X_AMQP)) { + if (!startsWith(i->first, X_AMQP) || i->first == X_AMQP_0_10_APP_ID) { qpid::amqp::CharSequence key(convert(i->first)); switch (i->second.getType()) { case qpid::types::VAR_VOID: |
