summaryrefslogtreecommitdiff
path: root/qpid/cpp
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2013-09-20 15:44:05 +0000
committerGordon Sim <gsim@apache.org>2013-09-20 15:44:05 +0000
commite384bf948ed10dcaa1193c5e873053cbd249accc (patch)
treeb11fd5952149a50757f6a94c9ec0956155468c8b /qpid/cpp
parentebbfc33fdf739e8f6f7a70cec6e3e34c9c5b2fce (diff)
downloadqpid-python-e384bf948ed10dcaa1193c5e873053cbd249accc.tar.gz
QPID-5150: fixes for QMFv2 over AMQP 1.0
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1525044 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp')
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Translation.cpp5
-rw-r--r--qpid/cpp/src/qpid/management/ManagementAgent.cpp19
-rw-r--r--qpid/cpp/src/qpid/management/ManagementAgent.h2
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp3
4 files changed, 20 insertions, 9 deletions
diff --git a/qpid/cpp/src/qpid/broker/amqp/Translation.cpp b/qpid/cpp/src/qpid/broker/amqp/Translation.cpp
index 2f15ff6658..2196c8ff3d 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Translation.cpp
+++ b/qpid/cpp/src/qpid/broker/amqp/Translation.cpp
@@ -41,6 +41,7 @@ const std::string EMPTY;
const std::string FORWARD_SLASH("/");
const std::string TEXT_PLAIN("text/plain");
const std::string SUBJECT_KEY("qpid.subject");
+const std::string APP_ID("x-amqp-0-10.app-id");
qpid::framing::ReplyTo translate(const std::string address, Broker* broker)
{
@@ -253,6 +254,10 @@ boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer> Translation
if (ap) {
qpid::amqp::Decoder d(ap.data, ap.size);
qpid::amqp_0_10::translate(d.readMap(), props->getApplicationHeaders());
+ std::string appid = props->getApplicationHeaders().getAsString(APP_ID);
+ if (!appid.empty()) {
+ props->setAppId(appid);
+ }
}
qpid::framing::DeliveryProperties* dp =
diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.cpp b/qpid/cpp/src/qpid/management/ManagementAgent.cpp
index 5f653939ec..d80dd6e6a3 100644
--- a/qpid/cpp/src/qpid/management/ManagementAgent.cpp
+++ b/qpid/cpp/src/qpid/management/ManagementAgent.cpp
@@ -39,6 +39,7 @@
#include "qpid/sys/PollableQueue.h"
#include "qpid/broker/Connection.h"
#include "qpid/broker/AclModule.h"
+#include "qpid/broker/Protocol.h"
#include "qpid/types/Variant.h"
#include "qpid/types/Uuid.h"
#include "qpid/framing/List.h"
@@ -169,7 +170,7 @@ ManagementAgent::RemoteAgent::~RemoteAgent ()
}
ManagementAgent::ManagementAgent (const bool qmfV1, const bool qmfV2) :
- threadPoolSize(1), publish(true), interval(10), broker(0), timer(0),
+ threadPoolSize(1), publish(true), interval(10), broker(0), timer(0), protocols(0),
startTime(sys::now()),
suppressed(false), disallowAllV1Methods(false),
vendorNameKey(defaultVendorName), productNameKey(defaultProductName),
@@ -221,6 +222,7 @@ void ManagementAgent::configure(const string& _dataDir, bool _publish, uint16_t
timer = &broker->getTimer();
timer->add(new Periodic(boost::bind(&ManagementAgent::periodicProcessing, this), timer, interval));
+ protocols = &broker->getProtocolRegistry();
// Get from file or generate and save to file.
if (dataDir.empty())
{
@@ -2132,9 +2134,9 @@ bool ManagementAgent::authorizeAgentMessage(Message& msg)
uint32_t bufferLen = inBuffer.getPosition();
inBuffer.reset();
- qpid::broker::amqp_0_10::MessageTransfer& transfer(qpid::broker::amqp_0_10::MessageTransfer::get(msg));
+ boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer> transfer = protocols->translate(msg);
const framing::MessageProperties* p =
- transfer.getFrames().getHeaders()->get<framing::MessageProperties>();
+ transfer ? transfer->getFrames().getHeaders()->get<framing::MessageProperties>() : 0;
const framing::FieldTable *headers = p ? &p->getApplicationHeaders() : 0;
@@ -2229,9 +2231,9 @@ bool ManagementAgent::authorizeAgentMessage(Message& msg)
// authorization failed, send reply if replyTo present
- qpid::broker::amqp_0_10::MessageTransfer& transfer(qpid::broker::amqp_0_10::MessageTransfer::get(msg));
+ boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer> transfer = protocols->translate(msg);
const framing::MessageProperties* p =
- transfer.getFrames().getHeaders()->get<framing::MessageProperties>();
+ transfer ? transfer->getFrames().getHeaders()->get<framing::MessageProperties>() : 0;
if (p && p->hasReplyTo()) {
const framing::ReplyTo& rt = p->getReplyTo();
string rte = rt.getExchange();
@@ -2266,9 +2268,10 @@ void ManagementAgent::dispatchAgentCommand(Message& msg, bool viaLocal)
{
string rte;
string rtk;
- qpid::broker::amqp_0_10::MessageTransfer& transfer(qpid::broker::amqp_0_10::MessageTransfer::get(msg));
- const framing::MessageProperties* p =
- transfer.getFrames().getHeaders()->get<framing::MessageProperties>();
+
+ boost::intrusive_ptr<const qpid::broker::amqp_0_10::MessageTransfer> transfer = protocols->translate(msg);
+ const framing::MessageProperties* p = transfer ?
+ transfer->getFrames().getHeaders()->get<framing::MessageProperties>() : 0;
if (p && p->hasReplyTo()) {
const framing::ReplyTo& rt = p->getReplyTo();
rte = rt.getExchange();
diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.h b/qpid/cpp/src/qpid/management/ManagementAgent.h
index d2869a705f..c2eb5d4a31 100644
--- a/qpid/cpp/src/qpid/management/ManagementAgent.h
+++ b/qpid/cpp/src/qpid/management/ManagementAgent.h
@@ -44,6 +44,7 @@
namespace qpid {
namespace broker {
class Connection;
+class ProtocolRegistry;
}
namespace sys {
class Timer;
@@ -256,6 +257,7 @@ private:
uint16_t interval;
qpid::broker::Broker* broker;
qpid::sys::Timer* timer;
+ qpid::broker::ProtocolRegistry* protocols;
uint16_t bootSequence;
uint32_t nextObjectId;
uint32_t brokerBank;
diff --git a/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp
index c3a9107333..94851da273 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp
+++ b/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp
@@ -134,6 +134,7 @@ namespace {
const std::string X_AMQP("x-amqp-");
const std::string X_AMQP_FIRST_ACQUIRER("x-amqp-first-acquirer");
const std::string X_AMQP_DELIVERY_COUNT("x-amqp-delivery-count");
+const std::string X_AMQP_0_10_APP_ID("x-amqp-0-10.app-id");
class HeaderAdapter : public qpid::amqp::MessageEncoder::Header
{
@@ -353,7 +354,7 @@ class ApplicationPropertiesAdapter : public qpid::amqp::MessageEncoder::Applicat
{
for (qpid::types::Variant::Map::const_iterator i = headers.begin(); i != headers.end(); ++i) {
//strip out values with special keys as they are sent in standard fields
- if (!startsWith(i->first, X_AMQP)) {
+ if (!startsWith(i->first, X_AMQP) || i->first == X_AMQP_0_10_APP_ID) {
qpid::amqp::CharSequence key(convert(i->first));
switch (i->second.getType()) {
case qpid::types::VAR_VOID: