diff options
| author | Gordon Sim <gsim@apache.org> | 2013-11-15 17:34:38 +0000 |
|---|---|---|
| committer | Gordon Sim <gsim@apache.org> | 2013-11-15 17:34:38 +0000 |
| commit | da0093716b0616eaace9102eff98bedfeb4716ee (patch) | |
| tree | d6a72ee5070b873ed29d06bce745430241e8c225 /qpid/cpp/src | |
| parent | acb2b605a39cb9cceabf7aaab646b073f28aefe4 (diff) | |
| download | qpid-python-da0093716b0616eaace9102eff98bedfeb4716ee.tar.gz | |
QPID-5348: add option to have to field populated automatically
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1542337 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
7 files changed, 24 insertions, 16 deletions
diff --git a/qpid/cpp/src/qpid/messaging/ConnectionOptions.cpp b/qpid/cpp/src/qpid/messaging/ConnectionOptions.cpp index e534920876..26bb699565 100644 --- a/qpid/cpp/src/qpid/messaging/ConnectionOptions.cpp +++ b/qpid/cpp/src/qpid/messaging/ConnectionOptions.cpp @@ -52,7 +52,7 @@ void merge(const qpid::types::Variant::List& from, std::vector<std::string>& to) ConnectionOptions::ConnectionOptions(const std::map<std::string, qpid::types::Variant>& options) : replaceUrls(false), reconnect(false), timeout(FOREVER), limit(-1), minReconnectInterval(0.001), maxReconnectInterval(2), - retries(0), reconnectOnLimitExceeded(true), nestAnnotations(false) + retries(0), reconnectOnLimitExceeded(true), nestAnnotations(false), setToOnSend(false) { for (qpid::types::Variant::Map::const_iterator i = options.begin(); i != options.end(); ++i) { set(i->first, i->second); @@ -117,6 +117,8 @@ void ConnectionOptions::set(const std::string& name, const qpid::types::Variant& identifier = value.asString(); } else if (name == "nest-annotations" || name == "nest_annotations") { nestAnnotations = value; + } else if (name == "set-to-on-send" || name == "set_to_on_send") { + setToOnSend = value; } else { throw qpid::messaging::MessagingException(QPID_MSG("Invalid option: " << name << " not recognised")); } diff --git a/qpid/cpp/src/qpid/messaging/ConnectionOptions.h b/qpid/cpp/src/qpid/messaging/ConnectionOptions.h index 5942592d78..877e541636 100644 --- a/qpid/cpp/src/qpid/messaging/ConnectionOptions.h +++ b/qpid/cpp/src/qpid/messaging/ConnectionOptions.h @@ -46,6 +46,7 @@ struct ConnectionOptions : qpid::client::ConnectionSettings bool reconnectOnLimitExceeded; std::string identifier; bool nestAnnotations; + bool setToOnSend; QPID_MESSAGING_EXTERN ConnectionOptions(const std::map<std::string, qpid::types::Variant>&); QPID_MESSAGING_EXTERN void set(const std::string& name, const qpid::types::Variant& value); diff --git a/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp index 1f9d1d5723..0bf8bef27f 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp +++ b/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp @@ -40,11 +40,12 @@ namespace qpid { namespace messaging { namespace amqp { //TODO: proper conversion to wide string for address -SenderContext::SenderContext(pn_session_t* session, const std::string& n, const qpid::messaging::Address& a) +SenderContext::SenderContext(pn_session_t* session, const std::string& n, const qpid::messaging::Address& a, bool setToOnSend_) : name(n), address(a), helper(address), - sender(pn_sender(session, n.c_str())), capacity(50), unreliable(helper.isUnreliable()) {} + sender(pn_sender(session, n.c_str())), capacity(50), unreliable(helper.isUnreliable()), + setToOnSend(setToOnSend_) {} SenderContext::~SenderContext() { @@ -88,14 +89,14 @@ bool SenderContext::send(const qpid::messaging::Message& message, SenderContext: if (processUnsettled(false) < capacity && pn_link_credit(sender)) { if (unreliable) { Delivery delivery(nextId++); - delivery.encode(MessageImplAccess::get(message), address); + delivery.encode(MessageImplAccess::get(message), address, setToOnSend); delivery.send(sender, unreliable); *out = 0; return true; } else { deliveries.push_back(Delivery(nextId++)); Delivery& delivery = deliveries.back(); - delivery.encode(MessageImplAccess::get(message), address); + delivery.encode(MessageImplAccess::get(message), address, setToOnSend); delivery.send(sender, unreliable); *out = &delivery; return true; @@ -195,7 +196,7 @@ const std::string X_AMQP_DELIVERY_ANNOTATIONS("x-amqp-delivery-annotations"); class PropertiesAdapter : public qpid::amqp::MessageEncoder::Properties { public: - PropertiesAdapter(const qpid::messaging::MessageImpl& impl, const std::string& s) : msg(impl), headers(msg.getHeaders()), subject(s) {} + PropertiesAdapter(const qpid::messaging::MessageImpl& impl, const std::string& s, const std::string& t) : msg(impl), headers(msg.getHeaders()), subject(s), to(t) {} bool hasMessageId() const { return getMessageId().size(); @@ -217,12 +218,14 @@ class PropertiesAdapter : public qpid::amqp::MessageEncoder::Properties bool hasTo() const { - return hasHeader(X_AMQP_TO); + return hasHeader(X_AMQP_TO) || !to.empty(); } std::string getTo() const { - return headers.find(X_AMQP_TO)->second; + qpid::types::Variant::Map::const_iterator i = headers.find(X_AMQP_TO); + if (i == headers.end()) return to; + else return i->second; } bool hasSubject() const @@ -333,6 +336,7 @@ class PropertiesAdapter : public qpid::amqp::MessageEncoder::Properties const qpid::messaging::MessageImpl& msg; const qpid::types::Variant::Map& headers; const std::string subject; + const std::string to; bool hasHeader(const std::string& key) const { @@ -435,7 +439,7 @@ void SenderContext::Delivery::reset() token = 0; } -void SenderContext::Delivery::encode(const qpid::messaging::MessageImpl& msg, const qpid::messaging::Address& address) +void SenderContext::Delivery::encode(const qpid::messaging::MessageImpl& msg, const qpid::messaging::Address& address, bool setToField) { try { boost::shared_ptr<const EncodedMessage> original = msg.getEncoded(); @@ -458,7 +462,7 @@ void SenderContext::Delivery::encode(const qpid::messaging::MessageImpl& msg, co } } else { HeaderAdapter header(msg); - PropertiesAdapter properties(msg, address.getSubject()); + PropertiesAdapter properties(msg, address.getSubject(), setToField ? address.getName() : EMPTY); ApplicationPropertiesAdapter applicationProperties(msg.getHeaders()); //compute size: size_t contentSize = qpid::amqp::MessageEncoder::getEncodedSize(header) diff --git a/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h b/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h index be050024f1..66e45a85a6 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h +++ b/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h @@ -51,7 +51,7 @@ class SenderContext { public: Delivery(int32_t id); - void encode(const qpid::messaging::MessageImpl& message, const qpid::messaging::Address&); + void encode(const qpid::messaging::MessageImpl& message, const qpid::messaging::Address&, bool setToField); void send(pn_link_t*, bool unreliable); bool delivered(); bool accepted(); @@ -66,7 +66,7 @@ class SenderContext bool presettled; }; - SenderContext(pn_session_t* session, const std::string& name, const qpid::messaging::Address& target); + SenderContext(pn_session_t* session, const std::string& name, const qpid::messaging::Address& target, bool setToOnSend); ~SenderContext(); void reset(pn_session_t* session); void close(); @@ -94,6 +94,7 @@ class SenderContext Deliveries deliveries; uint32_t capacity; bool unreliable; + bool setToOnSend; uint32_t processUnsettled(bool silent); void configure(pn_terminus_t*); diff --git a/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp index 3d24380801..7673e744c7 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp +++ b/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp @@ -41,11 +41,11 @@ SessionContext::~SessionContext() pn_session_free(session); } -boost::shared_ptr<SenderContext> SessionContext::createSender(const qpid::messaging::Address& address) +boost::shared_ptr<SenderContext> SessionContext::createSender(const qpid::messaging::Address& address, bool setToOnSend) { std::string name = AddressHelper::getLinkName(address); if (senders.find(name) != senders.end()) throw LinkError("Link name must be unique within the scope of the connection"); - boost::shared_ptr<SenderContext> s(new SenderContext(session, name, address)); + boost::shared_ptr<SenderContext> s(new SenderContext(session, name, address, setToOnSend)); senders[name] = s; return s; } diff --git a/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h b/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h index 75a67a7d15..df69e92ed3 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h +++ b/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h @@ -51,7 +51,7 @@ class SessionContext SessionContext(pn_connection_t*); ~SessionContext(); void reset(pn_connection_t*); - boost::shared_ptr<SenderContext> createSender(const qpid::messaging::Address& address); + boost::shared_ptr<SenderContext> createSender(const qpid::messaging::Address& address, bool setToOnSend); boost::shared_ptr<ReceiverContext> createReceiver(const qpid::messaging::Address& address); boost::shared_ptr<SenderContext> getSender(const std::string& name) const; boost::shared_ptr<ReceiverContext> getReceiver(const std::string& name) const; diff --git a/qpid/cpp/src/qpid/messaging/amqp/SessionHandle.cpp b/qpid/cpp/src/qpid/messaging/amqp/SessionHandle.cpp index 50fe4ef30e..044f208564 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/SessionHandle.cpp +++ b/qpid/cpp/src/qpid/messaging/amqp/SessionHandle.cpp @@ -82,7 +82,7 @@ void SessionHandle::sync(bool /*block*/) qpid::messaging::Sender SessionHandle::createSender(const qpid::messaging::Address& address) { - boost::shared_ptr<SenderContext> sender = session->createSender(address); + boost::shared_ptr<SenderContext> sender = session->createSender(address, connection->setToOnSend); try { connection->attach(session, sender); return qpid::messaging::Sender(new SenderHandle(connection, session, sender)); |
