summaryrefslogtreecommitdiff
path: root/qpid/cpp/src
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2013-11-15 17:34:38 +0000
committerGordon Sim <gsim@apache.org>2013-11-15 17:34:38 +0000
commitda0093716b0616eaace9102eff98bedfeb4716ee (patch)
treed6a72ee5070b873ed29d06bce745430241e8c225 /qpid/cpp/src
parentacb2b605a39cb9cceabf7aaab646b073f28aefe4 (diff)
downloadqpid-python-da0093716b0616eaace9102eff98bedfeb4716ee.tar.gz
QPID-5348: add option to have to field populated automatically
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1542337 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r--qpid/cpp/src/qpid/messaging/ConnectionOptions.cpp4
-rw-r--r--qpid/cpp/src/qpid/messaging/ConnectionOptions.h1
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp22
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/SenderContext.h5
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp4
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/SessionContext.h2
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/SessionHandle.cpp2
7 files changed, 24 insertions, 16 deletions
diff --git a/qpid/cpp/src/qpid/messaging/ConnectionOptions.cpp b/qpid/cpp/src/qpid/messaging/ConnectionOptions.cpp
index e534920876..26bb699565 100644
--- a/qpid/cpp/src/qpid/messaging/ConnectionOptions.cpp
+++ b/qpid/cpp/src/qpid/messaging/ConnectionOptions.cpp
@@ -52,7 +52,7 @@ void merge(const qpid::types::Variant::List& from, std::vector<std::string>& to)
ConnectionOptions::ConnectionOptions(const std::map<std::string, qpid::types::Variant>& options)
: replaceUrls(false), reconnect(false), timeout(FOREVER), limit(-1), minReconnectInterval(0.001), maxReconnectInterval(2),
- retries(0), reconnectOnLimitExceeded(true), nestAnnotations(false)
+ retries(0), reconnectOnLimitExceeded(true), nestAnnotations(false), setToOnSend(false)
{
for (qpid::types::Variant::Map::const_iterator i = options.begin(); i != options.end(); ++i) {
set(i->first, i->second);
@@ -117,6 +117,8 @@ void ConnectionOptions::set(const std::string& name, const qpid::types::Variant&
identifier = value.asString();
} else if (name == "nest-annotations" || name == "nest_annotations") {
nestAnnotations = value;
+ } else if (name == "set-to-on-send" || name == "set_to_on_send") {
+ setToOnSend = value;
} else {
throw qpid::messaging::MessagingException(QPID_MSG("Invalid option: " << name << " not recognised"));
}
diff --git a/qpid/cpp/src/qpid/messaging/ConnectionOptions.h b/qpid/cpp/src/qpid/messaging/ConnectionOptions.h
index 5942592d78..877e541636 100644
--- a/qpid/cpp/src/qpid/messaging/ConnectionOptions.h
+++ b/qpid/cpp/src/qpid/messaging/ConnectionOptions.h
@@ -46,6 +46,7 @@ struct ConnectionOptions : qpid::client::ConnectionSettings
bool reconnectOnLimitExceeded;
std::string identifier;
bool nestAnnotations;
+ bool setToOnSend;
QPID_MESSAGING_EXTERN ConnectionOptions(const std::map<std::string, qpid::types::Variant>&);
QPID_MESSAGING_EXTERN void set(const std::string& name, const qpid::types::Variant& value);
diff --git a/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp
index 1f9d1d5723..0bf8bef27f 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp
+++ b/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp
@@ -40,11 +40,12 @@ namespace qpid {
namespace messaging {
namespace amqp {
//TODO: proper conversion to wide string for address
-SenderContext::SenderContext(pn_session_t* session, const std::string& n, const qpid::messaging::Address& a)
+SenderContext::SenderContext(pn_session_t* session, const std::string& n, const qpid::messaging::Address& a, bool setToOnSend_)
: name(n),
address(a),
helper(address),
- sender(pn_sender(session, n.c_str())), capacity(50), unreliable(helper.isUnreliable()) {}
+ sender(pn_sender(session, n.c_str())), capacity(50), unreliable(helper.isUnreliable()),
+ setToOnSend(setToOnSend_) {}
SenderContext::~SenderContext()
{
@@ -88,14 +89,14 @@ bool SenderContext::send(const qpid::messaging::Message& message, SenderContext:
if (processUnsettled(false) < capacity && pn_link_credit(sender)) {
if (unreliable) {
Delivery delivery(nextId++);
- delivery.encode(MessageImplAccess::get(message), address);
+ delivery.encode(MessageImplAccess::get(message), address, setToOnSend);
delivery.send(sender, unreliable);
*out = 0;
return true;
} else {
deliveries.push_back(Delivery(nextId++));
Delivery& delivery = deliveries.back();
- delivery.encode(MessageImplAccess::get(message), address);
+ delivery.encode(MessageImplAccess::get(message), address, setToOnSend);
delivery.send(sender, unreliable);
*out = &delivery;
return true;
@@ -195,7 +196,7 @@ const std::string X_AMQP_DELIVERY_ANNOTATIONS("x-amqp-delivery-annotations");
class PropertiesAdapter : public qpid::amqp::MessageEncoder::Properties
{
public:
- PropertiesAdapter(const qpid::messaging::MessageImpl& impl, const std::string& s) : msg(impl), headers(msg.getHeaders()), subject(s) {}
+ PropertiesAdapter(const qpid::messaging::MessageImpl& impl, const std::string& s, const std::string& t) : msg(impl), headers(msg.getHeaders()), subject(s), to(t) {}
bool hasMessageId() const
{
return getMessageId().size();
@@ -217,12 +218,14 @@ class PropertiesAdapter : public qpid::amqp::MessageEncoder::Properties
bool hasTo() const
{
- return hasHeader(X_AMQP_TO);
+ return hasHeader(X_AMQP_TO) || !to.empty();
}
std::string getTo() const
{
- return headers.find(X_AMQP_TO)->second;
+ qpid::types::Variant::Map::const_iterator i = headers.find(X_AMQP_TO);
+ if (i == headers.end()) return to;
+ else return i->second;
}
bool hasSubject() const
@@ -333,6 +336,7 @@ class PropertiesAdapter : public qpid::amqp::MessageEncoder::Properties
const qpid::messaging::MessageImpl& msg;
const qpid::types::Variant::Map& headers;
const std::string subject;
+ const std::string to;
bool hasHeader(const std::string& key) const
{
@@ -435,7 +439,7 @@ void SenderContext::Delivery::reset()
token = 0;
}
-void SenderContext::Delivery::encode(const qpid::messaging::MessageImpl& msg, const qpid::messaging::Address& address)
+void SenderContext::Delivery::encode(const qpid::messaging::MessageImpl& msg, const qpid::messaging::Address& address, bool setToField)
{
try {
boost::shared_ptr<const EncodedMessage> original = msg.getEncoded();
@@ -458,7 +462,7 @@ void SenderContext::Delivery::encode(const qpid::messaging::MessageImpl& msg, co
}
} else {
HeaderAdapter header(msg);
- PropertiesAdapter properties(msg, address.getSubject());
+ PropertiesAdapter properties(msg, address.getSubject(), setToField ? address.getName() : EMPTY);
ApplicationPropertiesAdapter applicationProperties(msg.getHeaders());
//compute size:
size_t contentSize = qpid::amqp::MessageEncoder::getEncodedSize(header)
diff --git a/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h b/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h
index be050024f1..66e45a85a6 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h
+++ b/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h
@@ -51,7 +51,7 @@ class SenderContext
{
public:
Delivery(int32_t id);
- void encode(const qpid::messaging::MessageImpl& message, const qpid::messaging::Address&);
+ void encode(const qpid::messaging::MessageImpl& message, const qpid::messaging::Address&, bool setToField);
void send(pn_link_t*, bool unreliable);
bool delivered();
bool accepted();
@@ -66,7 +66,7 @@ class SenderContext
bool presettled;
};
- SenderContext(pn_session_t* session, const std::string& name, const qpid::messaging::Address& target);
+ SenderContext(pn_session_t* session, const std::string& name, const qpid::messaging::Address& target, bool setToOnSend);
~SenderContext();
void reset(pn_session_t* session);
void close();
@@ -94,6 +94,7 @@ class SenderContext
Deliveries deliveries;
uint32_t capacity;
bool unreliable;
+ bool setToOnSend;
uint32_t processUnsettled(bool silent);
void configure(pn_terminus_t*);
diff --git a/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp
index 3d24380801..7673e744c7 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp
+++ b/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp
@@ -41,11 +41,11 @@ SessionContext::~SessionContext()
pn_session_free(session);
}
-boost::shared_ptr<SenderContext> SessionContext::createSender(const qpid::messaging::Address& address)
+boost::shared_ptr<SenderContext> SessionContext::createSender(const qpid::messaging::Address& address, bool setToOnSend)
{
std::string name = AddressHelper::getLinkName(address);
if (senders.find(name) != senders.end()) throw LinkError("Link name must be unique within the scope of the connection");
- boost::shared_ptr<SenderContext> s(new SenderContext(session, name, address));
+ boost::shared_ptr<SenderContext> s(new SenderContext(session, name, address, setToOnSend));
senders[name] = s;
return s;
}
diff --git a/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h b/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h
index 75a67a7d15..df69e92ed3 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h
+++ b/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h
@@ -51,7 +51,7 @@ class SessionContext
SessionContext(pn_connection_t*);
~SessionContext();
void reset(pn_connection_t*);
- boost::shared_ptr<SenderContext> createSender(const qpid::messaging::Address& address);
+ boost::shared_ptr<SenderContext> createSender(const qpid::messaging::Address& address, bool setToOnSend);
boost::shared_ptr<ReceiverContext> createReceiver(const qpid::messaging::Address& address);
boost::shared_ptr<SenderContext> getSender(const std::string& name) const;
boost::shared_ptr<ReceiverContext> getReceiver(const std::string& name) const;
diff --git a/qpid/cpp/src/qpid/messaging/amqp/SessionHandle.cpp b/qpid/cpp/src/qpid/messaging/amqp/SessionHandle.cpp
index 50fe4ef30e..044f208564 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/SessionHandle.cpp
+++ b/qpid/cpp/src/qpid/messaging/amqp/SessionHandle.cpp
@@ -82,7 +82,7 @@ void SessionHandle::sync(bool /*block*/)
qpid::messaging::Sender SessionHandle::createSender(const qpid::messaging::Address& address)
{
- boost::shared_ptr<SenderContext> sender = session->createSender(address);
+ boost::shared_ptr<SenderContext> sender = session->createSender(address, connection->setToOnSend);
try {
connection->attach(session, sender);
return qpid::messaging::Sender(new SenderHandle(connection, session, sender));