summaryrefslogtreecommitdiff
path: root/qpid/cpp
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2013-03-29 21:13:33 +0000
committerGordon Sim <gsim@apache.org>2013-03-29 21:13:33 +0000
commit443e02c688ef78b0cb7ae9c761fd2c42a383fcac (patch)
treee69bb73af65e834715a80fdcffaefc393aedede0 /qpid/cpp
parent53da3907d44fdb9a599af3eea62514044347121f (diff)
downloadqpid-python-443e02c688ef78b0cb7ae9c761fd2c42a383fcac.tar.gz
QPID-4679: Cleaned up processing of addresses, including errors and warnings where elements can not be supported
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1462646 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp')
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Session.cpp45
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp179
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/AddressHelper.h15
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp2
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp27
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h7
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp22
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/SenderContext.h7
8 files changed, 241 insertions, 63 deletions
diff --git a/qpid/cpp/src/qpid/broker/amqp/Session.cpp b/qpid/cpp/src/qpid/broker/amqp/Session.cpp
index c779f47135..ee55a3e08b 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Session.cpp
+++ b/qpid/cpp/src/qpid/broker/amqp/Session.cpp
@@ -63,8 +63,44 @@ bool is_capability_requested(const std::string& name, pn_data_t* capabilities)
}
return false;
}
-
+//capabilities
const std::string CREATE_ON_DEMAND("create-on-demand");
+const std::string DURABLE("durable");
+const std::string QUEUE("queue");
+const std::string TOPIC("topic");
+const std::string DIRECT_FILTER("legacy-amqp-direct-binding");
+const std::string TOPIC_FILTER("legacy-amqp-topic-binding");
+
+void setCapabilities(pn_data_t* in, pn_data_t* out, boost::shared_ptr<Queue> node)
+{
+ while (pn_data_next(in)) {
+ pn_bytes_t c = pn_data_get_symbol(in);
+ std::string s(c.start, c.size);
+ if (s == DURABLE) {
+ if (node->isDurable()) pn_data_put_symbol(out, c);
+ } else if (s == CREATE_ON_DEMAND || s == QUEUE || s == DIRECT_FILTER || s == TOPIC_FILTER) {
+ pn_data_put_symbol(out, c);
+ }
+ }
+}
+
+void setCapabilities(pn_data_t* in, pn_data_t* out, boost::shared_ptr<Exchange> node)
+{
+ while (pn_data_next(in)) {
+ pn_bytes_t c = pn_data_get_symbol(in);
+ std::string s(c.start, c.size);
+ if (s == DURABLE) {
+ if (node->isDurable()) pn_data_put_symbol(out, c);
+ } else if (s == CREATE_ON_DEMAND || s == TOPIC) {
+ pn_data_put_symbol(out, c);
+ } else if (s == DIRECT_FILTER) {
+ if (node->getType() == DirectExchange::typeName) pn_data_put_symbol(out, c);
+ } else if (s == TOPIC_FILTER) {
+ if (node->getType() == TopicExchange::typeName) pn_data_put_symbol(out, c);
+ }
+ }
+}
+
}
class IncomingToQueue : public DecodingIncoming
@@ -178,6 +214,10 @@ void Session::attach(pn_link_t* link)
void Session::setupIncoming(pn_link_t* link, pn_terminus_t* target, const std::string& name)
{
ResolvedNode node = resolve(name, target, true);
+ //set capabilities
+ if (node.queue) setCapabilities(pn_terminus_capabilities(target), pn_terminus_capabilities(pn_link_target(link)), node.queue);
+ else if (node.exchange) setCapabilities(pn_terminus_capabilities(target), pn_terminus_capabilities(pn_link_target(link)), node.exchange);
+
const char* sourceAddress = pn_terminus_get_address(pn_link_remote_source(link));
if (!sourceAddress) {
sourceAddress = pn_terminus_get_address(pn_link_source(link));
@@ -205,6 +245,9 @@ void Session::setupIncoming(pn_link_t* link, pn_terminus_t* target, const std::s
void Session::setupOutgoing(pn_link_t* link, pn_terminus_t* source, const std::string& name)
{
ResolvedNode node = resolve(name, source, false);
+ if (node.queue) setCapabilities(pn_terminus_capabilities(source), pn_terminus_capabilities(pn_link_source(link)), node.queue);
+ else if (node.exchange) setCapabilities(pn_terminus_capabilities(source), pn_terminus_capabilities(pn_link_source(link)), node.exchange);
+
Filter filter;
filter.read(pn_terminus_filter(source));
const char* targetAddress = pn_terminus_get_address(pn_link_remote_target(link));
diff --git a/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp b/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp
index a46606a526..7b9934fb26 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp
+++ b/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.cpp
@@ -20,8 +20,10 @@
*/
#include "qpid/messaging/amqp/AddressHelper.h"
#include "qpid/messaging/Address.h"
+#include "qpid/messaging/AddressImpl.h"
#include "qpid/log/Statement.h"
#include <vector>
+#include <set>
#include <boost/assign.hpp>
extern "C" {
#include <proton/engine.h>
@@ -47,10 +49,13 @@ const std::string SENDER("sender");
const std::string NODE("node");
const std::string LINK("link");
+const std::string CAPABILITIES("capabilities");
+const std::string PROPERTIES("properties");
const std::string TYPE("type");
const std::string TOPIC("topic");
const std::string QUEUE("queue");
+const std::string DURABLE("durable");
//distribution modes:
const std::string MOVE("move");
@@ -61,6 +66,11 @@ const std::string CREATE_ON_DEMAND("create-on-demand");
const std::string DUMMY(".");
+const std::string X_DECLARE("x-declare");
+const std::string X_BINDINGS("x-bindings");
+const std::string X_SUBSCRIBE("x-subscribe");
+const std::string ARGUMENTS("arguments");
+
const std::vector<std::string> RECEIVER_MODES = boost::assign::list_of<std::string>(ALWAYS) (RECEIVER);
const std::vector<std::string> SENDER_MODES = boost::assign::list_of<std::string>(ALWAYS) (SENDER);
@@ -72,6 +82,24 @@ pn_bytes_t convert(const std::string& s)
return result;
}
+bool contains(const Variant::List& list, const std::string& item)
+{
+ for (Variant::List::const_iterator i = list.begin(); i != list.end(); ++i) {
+ if (*i == item) return true;
+ }
+ return false;
+}
+
+bool test(const Variant::Map& options, const std::string& name)
+{
+ Variant::Map::const_iterator j = options.find(name);
+ if (j == options.end()) {
+ return false;
+ } else {
+ return j->second;
+ }
+}
+
bool bind(const Variant::Map& options, const std::string& name, std::string& variable)
{
Variant::Map::const_iterator j = options.find(name);
@@ -94,6 +122,17 @@ bool bind(const Variant::Map& options, const std::string& name, Variant::Map& va
}
}
+bool bind(const Variant::Map& options, const std::string& name, Variant::List& variable)
+{
+ Variant::Map::const_iterator j = options.find(name);
+ if (j == options.end()) {
+ return false;
+ } else {
+ variable = j->second.asList();
+ return true;
+ }
+}
+
bool bind(const Address& address, const std::string& name, std::string& variable)
{
return bind(address.getOptions(), name, variable);
@@ -111,9 +150,23 @@ bool in(const std::string& value, const std::vector<std::string>& choices)
}
return false;
}
+void add(Variant::Map& target, const Variant::Map& source)
+{
+ for (Variant::Map::const_iterator i = source.begin(); i != source.end(); ++i) {
+ target[i->first] = i->second;
+ }
+}
+void flatten(Variant::Map& base, const std::string& nested)
+{
+ Variant::Map::iterator i = base.find(nested);
+ if (i != base.end()) {
+ add(base, i->second.asMap());
+ }
+ base.erase(i);
+}
}
-AddressHelper::AddressHelper(const Address& address)
+AddressHelper::AddressHelper(const Address& address) : isTemporary(AddressImpl::isTemporary(address)), name(address.getName()), type(address.getType())
{
bind(address, CREATE, createPolicy);
bind(address, DELETE, deletePolicy);
@@ -121,20 +174,81 @@ AddressHelper::AddressHelper(const Address& address)
bind(address, NODE, node);
bind(address, LINK, link);
+ bind(node, PROPERTIES, properties);
+ bind(node, CAPABILITIES, capabilities);
+ durableNode = test(node, DURABLE);
+
+ if (!deletePolicy.empty()) {
+ throw qpid::messaging::AddressError("Delete policies not supported over AMQP 1.0.");
+ }
+ if (node.find(X_BINDINGS) != node.end()) {
+ throw qpid::messaging::AddressError("Node scoped x-bindings element not supported over AMQP 1.0.");
+ }
+ if (link.find(X_BINDINGS) != link.end()) {
+ throw qpid::messaging::AddressError("Link scoped x-bindings element not supported over AMQP 1.0.");
+ }
+ if (link.find(X_SUBSCRIBE) != link.end()) {
+ throw qpid::messaging::AddressError("Link scoped x-subscribe element not supported over AMQP 1.0.");
+ }
+ if (link.find(X_DECLARE) != link.end()) {
+ throw qpid::messaging::AddressError("Link scoped x-declare element not supported over AMQP 1.0.");
+ }
+ //massage x-declare into properties
+ Variant::Map::iterator i = node.find(X_DECLARE);
+ if (i != node.end()) {
+ Variant::Map x_declare = i->second.asMap();
+ flatten(x_declare, ARGUMENTS);
+ add(properties, x_declare);
+ node.erase(i);
+ }
+
+ if (properties.size() && !(isTemporary || createPolicy.size())) {
+ QPID_LOG(warning, "Properties will be ignored! " << address);
+ }
}
-bool AddressHelper::createEnabled(CheckMode mode) const
+void AddressHelper::checkAssertion(pn_terminus_t* terminus, CheckMode mode)
{
- return enabled(createPolicy, mode);
+ if (assertEnabled(mode)) {
+ QPID_LOG(debug, "checking assertions: " << capabilities);
+ //ensure all desired capabilities have been offerred
+ std::set<std::string> desired;
+ if (type.size()) desired.insert(type);
+ if (durableNode) desired.insert(DURABLE);
+ for (Variant::List::const_iterator i = capabilities.begin(); i != capabilities.end(); ++i) {
+ desired.insert(i->asString());
+ }
+ pn_data_t* data = pn_terminus_capabilities(terminus);
+ while (pn_data_next(data)) {
+ pn_bytes_t c = pn_data_get_symbol(data);
+ std::string s(c.start, c.size);
+ desired.erase(s);
+ }
+
+ if (desired.size()) {
+ std::stringstream missing;
+ missing << "Desired capabilities not met: ";
+ bool first(true);
+ for (std::set<std::string>::const_iterator i = desired.begin(); i != desired.end(); ++i) {
+ if (first) first = false;
+ else missing << ", ";
+ missing << *i;
+ }
+ throw qpid::messaging::AssertionFailed(missing.str());
+ }
+ }
}
-bool AddressHelper::deleteEnabled(CheckMode mode) const
+
+bool AddressHelper::createEnabled(CheckMode mode) const
{
- return enabled(deletePolicy, mode);
+ return enabled(createPolicy, mode);
}
+
bool AddressHelper::assertEnabled(CheckMode mode) const
{
return enabled(assertPolicy, mode);
}
+
bool AddressHelper::enabled(const std::string& policy, CheckMode mode) const
{
bool result = false;
@@ -158,32 +272,53 @@ const qpid::types::Variant::Map& AddressHelper::getLinkProperties() const
return link;
}
-void AddressHelper::setNodeProperties(pn_terminus_t* terminus, bool dynamic)
+void AddressHelper::configure(pn_terminus_t* terminus, CheckMode mode)
{
- if (dynamic) {
- pn_terminus_set_address(terminus, DUMMY.c_str());//Workaround for proton bug
+ bool createOnDemand(false);
+ if (isTemporary) {
+ //application expects a name to be generated
+ pn_terminus_set_address(terminus, DUMMY.c_str());//workaround for PROTON-277
pn_terminus_set_dynamic(terminus, true);
+ setNodeProperties(terminus);
} else {
- pn_data_t* capabilities = pn_terminus_capabilities(terminus);
- if (!capabilities) {
- QPID_LOG(error, "!!!No capabilities!!!");
+ pn_terminus_set_address(terminus, name.c_str());
+ if (createEnabled(mode)) {
+ //application expects name of node to be as specified
+ setNodeProperties(terminus);
+ createOnDemand = true;
}
- pn_data_put_symbol(capabilities, convert(CREATE_ON_DEMAND));
}
+ setCapabilities(terminus, createOnDemand);
+}
- //properties for dynamically created node:
- if (node.size()) {
+void AddressHelper::setCapabilities(pn_terminus_t* terminus, bool create)
+{
+ pn_data_t* data = pn_terminus_capabilities(terminus);
+ if (create) pn_data_put_symbol(data, convert(CREATE_ON_DEMAND));
+ if (type.size()) pn_data_put_symbol(data, convert(type));
+ if (durableNode) pn_data_put_symbol(data, convert(DURABLE));
+ for (qpid::types::Variant::List::const_iterator i = capabilities.begin(); i != capabilities.end(); ++i) {
+ pn_data_put_symbol(data, convert(i->asString()));
+ }
+}
+
+void AddressHelper::setNodeProperties(pn_terminus_t* terminus)
+{
+ if (properties.size() || type.size()) {
pn_data_t* data = pn_terminus_properties(terminus);
pn_data_put_map(data);
pn_data_enter(data);
- for (qpid::types::Variant::Map::const_iterator i = node.begin(); i != node.end(); ++i) {
- if (i->first == TYPE) {
- pn_data_put_symbol(data, convert(SUPPORTED_DIST_MODES));
- pn_data_put_string(data, convert(i->second == TOPIC ? COPY : MOVE));
- } else {
- pn_data_put_symbol(data, convert(i->first));
- pn_data_put_string(data, convert(i->second.asString()));
- }
+ if (type.size()) {
+ pn_data_put_symbol(data, convert(SUPPORTED_DIST_MODES));
+ pn_data_put_string(data, convert(type == TOPIC ? COPY : MOVE));
+ }
+ if (durableNode) {
+ pn_data_put_symbol(data, convert(DURABLE));
+ pn_data_put_bool(data, true);
+ }
+ for (qpid::types::Variant::Map::const_iterator i = properties.begin(); i != properties.end(); ++i) {
+ pn_data_put_symbol(data, convert(i->first));
+ pn_data_put_string(data, convert(i->second.asString()));
}
pn_data_exit(data);
}
diff --git a/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.h b/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.h
index 2442619ed3..4dd441d461 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.h
+++ b/qpid/cpp/src/qpid/messaging/amqp/AddressHelper.h
@@ -36,22 +36,29 @@ class AddressHelper
enum CheckMode {FOR_RECEIVER, FOR_SENDER};
AddressHelper(const Address& address);
- bool createEnabled(CheckMode mode) const;
- bool deleteEnabled(CheckMode mode) const;
- bool assertEnabled(CheckMode mode) const;
+ void configure(pn_terminus_t* terminus, CheckMode mode);
+ void checkAssertion(pn_terminus_t* terminus, CheckMode mode);
- void setNodeProperties(pn_terminus_t*, bool dynamic);
const qpid::types::Variant::Map& getNodeProperties() const;
const qpid::types::Variant::Map& getLinkProperties() const;
private:
+ bool isTemporary;
std::string createPolicy;
std::string assertPolicy;
std::string deletePolicy;
qpid::types::Variant::Map node;
qpid::types::Variant::Map link;
+ qpid::types::Variant::Map properties;
+ qpid::types::Variant::List capabilities;
std::string name;
+ std::string type;
+ bool durableNode;
bool enabled(const std::string& policy, CheckMode mode) const;
+ bool createEnabled(CheckMode mode) const;
+ bool assertEnabled(CheckMode mode) const;
+ void setCapabilities(pn_terminus_t* terminus, bool create);
+ void setNodeProperties(pn_terminus_t* terminus);
};
}}} // namespace qpid::messaging::amqp
diff --git a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
index 9036031931..5f6fda81c5 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
+++ b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
@@ -299,6 +299,7 @@ void ConnectionContext::attach(boost::shared_ptr<SessionContext> ssn, boost::sha
lnk->address.setName(pn_terminus_get_address(t));
QPID_LOG(debug, "Dynamic target name set to " << lnk->address.getName());
}
+ lnk->verify(t);
QPID_LOG(debug, "Attach succeeded to " << lnk->getTarget());
}
@@ -316,6 +317,7 @@ void ConnectionContext::attach(boost::shared_ptr<SessionContext> ssn, boost::sha
lnk->address.setName(pn_terminus_get_address(s));
QPID_LOG(debug, "Dynamic source name set to " << lnk->address.getName());
}
+ lnk->verify(s);
QPID_LOG(debug, "Attach succeeded from " << lnk->getSource());
}
diff --git a/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp
index f7b06ddc05..a495bc25c6 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp
+++ b/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp
@@ -19,7 +19,6 @@
*
*/
#include "qpid/messaging/amqp/ReceiverContext.h"
-#include "qpid/messaging/amqp/AddressHelper.h"
#include "qpid/messaging/AddressImpl.h"
#include "qpid/messaging/Duration.h"
#include "qpid/messaging/Message.h"
@@ -36,6 +35,7 @@ namespace amqp {
ReceiverContext::ReceiverContext(pn_session_t* session, const std::string& n, const qpid::messaging::Address& a)
: name(n),
address(a),
+ helper(address),
receiver(pn_receiver(session, name.c_str())),
capacity(0) {}
ReceiverContext::~ReceiverContext()
@@ -108,26 +108,17 @@ uint64_t getFilterDescriptor(const std::string& key)
return hasWildcards(key) ? qpid::amqp::filters::LEGACY_TOPIC_FILTER_CODE : qpid::amqp::filters::LEGACY_DIRECT_FILTER_CODE;
}
}
-
-void ReceiverContext::configure() const
+void ReceiverContext::verify(pn_terminus_t* source)
+{
+ helper.checkAssertion(source, AddressHelper::FOR_RECEIVER);
+}
+void ReceiverContext::configure()
{
configure(pn_link_source(receiver));
}
-void ReceiverContext::configure(pn_terminus_t* source) const
-{
- //dynamic create:
- AddressHelper helper(address);
- if (AddressImpl::isTemporary(address)) {
- //application expects a name to be generated
- QPID_LOG(debug, "source is dynamic");
- helper.setNodeProperties(source, true);
- } else {
- pn_terminus_set_address(source, address.getName().c_str());
- if (helper.createEnabled(AddressHelper::FOR_RECEIVER)) {
- //application expects name of node to be as specified
- helper.setNodeProperties(source, false);
- }
- }
+void ReceiverContext::configure(pn_terminus_t* source)
+{
+ helper.configure(source, AddressHelper::FOR_RECEIVER);
// Look specifically for qpid.selector link property and add a filter for it
qpid::types::Variant::Map::const_iterator i = helper.getLinkProperties().find("selector");
diff --git a/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h b/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h
index 9c5386157b..79049d9263 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h
+++ b/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h
@@ -22,6 +22,7 @@
*
*/
#include "qpid/messaging/Address.h"
+#include "qpid/messaging/amqp/AddressHelper.h"
#include <string>
#include "qpid/sys/IntegerTypes.h"
@@ -54,15 +55,17 @@ class ReceiverContext
const std::string& getName() const;
const std::string& getSource() const;
bool isClosed() const;
- void configure() const;
+ void configure();
+ void verify(pn_terminus_t*);
Address getAddress() const;
private:
friend class ConnectionContext;
const std::string name;
Address address;
+ AddressHelper helper;
pn_link_t* receiver;
uint32_t capacity;
- void configure(pn_terminus_t*) const;
+ void configure(pn_terminus_t*);
};
}}} // namespace qpid::messaging::amqp
diff --git a/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp
index fe74a4bca8..0b3de7e680 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp
+++ b/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp
@@ -20,7 +20,6 @@
*/
#include "qpid/messaging/amqp/SenderContext.h"
#include "qpid/messaging/amqp/EncodedMessage.h"
-#include "qpid/messaging/amqp/AddressHelper.h"
#include "qpid/messaging/AddressImpl.h"
#include "qpid/amqp/descriptors.h"
#include "qpid/amqp/MessageEncoder.h"
@@ -41,6 +40,7 @@ namespace amqp {
SenderContext::SenderContext(pn_session_t* session, const std::string& n, const qpid::messaging::Address& a)
: name(n),
address(a),
+ helper(address),
sender(pn_sender(session, n.c_str())), capacity(1000) {}
SenderContext::~SenderContext()
@@ -342,23 +342,17 @@ void SenderContext::Delivery::settle()
{
pn_delivery_settle(token);
}
-void SenderContext::configure() const
+void SenderContext::verify(pn_terminus_t* target)
+{
+ helper.checkAssertion(target, AddressHelper::FOR_SENDER);
+}
+void SenderContext::configure()
{
configure(pn_link_target(sender));
}
-void SenderContext::configure(pn_terminus_t* target) const
+void SenderContext::configure(pn_terminus_t* target)
{
- AddressHelper helper(address);
- if (AddressImpl::isTemporary(address)) {
- //application expects a name to be generated
- helper.setNodeProperties(target, true);
- } else {
- pn_terminus_set_address(target, address.getName().c_str());
- if (helper.createEnabled(AddressHelper::FOR_SENDER)) {
- //application expects name of node to be as specified
- helper.setNodeProperties(target, false);
- }
- }
+ helper.configure(target, AddressHelper::FOR_SENDER);
}
bool SenderContext::settled()
diff --git a/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h b/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h
index 2969e75a16..ba563af2dc 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h
+++ b/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h
@@ -26,6 +26,7 @@
#include <vector>
#include "qpid/sys/IntegerTypes.h"
#include "qpid/messaging/Address.h"
+#include "qpid/messaging/amqp/AddressHelper.h"
#include "qpid/messaging/amqp/EncodedMessage.h"
struct pn_delivery_t;
@@ -69,7 +70,8 @@ class SenderContext
const std::string& getName() const;
const std::string& getTarget() const;
Delivery* send(const qpid::messaging::Message& message);
- void configure() const;
+ void configure();
+ void verify(pn_terminus_t*);
bool settled();
Address getAddress() const;
private:
@@ -78,13 +80,14 @@ class SenderContext
const std::string name;
qpid::messaging::Address address;
+ AddressHelper helper;
pn_link_t* sender;
int32_t nextId;
Deliveries deliveries;
uint32_t capacity;
uint32_t processUnsettled();
- void configure(pn_terminus_t*) const;
+ void configure(pn_terminus_t*);
};
}}} // namespace qpid::messaging::amqp