summaryrefslogtreecommitdiff
path: root/qpid/cpp/src
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2013-08-28 14:56:46 +0000
committerGordon Sim <gsim@apache.org>2013-08-28 14:56:46 +0000
commit0b1ee9f4672f8c21d3622cd971b671bae3b10401 (patch)
tree4e3baec48a73e1f241dda1e81a70006ea714ef17 /qpid/cpp/src
parent01cb164d09b628206335c138eba796b3487c5ea0 (diff)
downloadqpid-python-0b1ee9f4672f8c21d3622cd971b671bae3b10401.tar.gz
QPID-4708: support for reconnect over AMQP 1.0
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1518233 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp358
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h26
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp18
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h3
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp50
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/SenderContext.h7
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp11
-rw-r--r--qpid/cpp/src/qpid/messaging/amqp/SessionContext.h1
-rwxr-xr-xqpid/cpp/src/tests/interlink_tests.py67
9 files changed, 399 insertions, 142 deletions
diff --git a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
index e42002aa0d..0d2640eb26 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
+++ b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
@@ -45,18 +45,17 @@ namespace qpid {
namespace messaging {
namespace amqp {
-ConnectionContext::ConnectionContext(const std::string& u, const qpid::types::Variant::Map& o)
+ConnectionContext::ConnectionContext(const std::string& url, const qpid::types::Variant::Map& o)
: qpid::messaging::ConnectionOptions(o),
- url(u, protocol.empty() ? qpid::Address::TCP : protocol),
engine(pn_transport()),
connection(pn_connection()),
//note: disabled read/write of header as now handled by engine
writeHeader(false),
readHeader(false),
haveOutput(false),
- state(DISCONNECTED),
- codecSwitch(*this)
+ state(DISCONNECTED)
{
+ urls.insert(urls.begin(), url);
if (pn_transport_bind(engine, connection)) {
//error
}
@@ -77,67 +76,6 @@ ConnectionContext::~ConnectionContext()
pn_connection_free(connection);
}
-namespace {
-const std::string COLON(":");
-}
-void ConnectionContext::open()
-{
- qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
- if (state != DISCONNECTED) throw qpid::messaging::ConnectionError("Connection was already opened!");
- if (!driver) driver = DriverImpl::getDefault();
- if (url.getUser().size()) username = url.getUser();
- if (url.getPass().size()) password = url.getPass();
-
- for (Url::const_iterator i = url.begin(); state != CONNECTED && i != url.end(); ++i) {
- transport = driver->getTransport(i->protocol, *this);
- std::stringstream port;
- port << i->port;
- id = i->host + COLON + port.str();
- if (useSasl()) {
- sasl = std::auto_ptr<Sasl>(new Sasl(id, *this, i->host));
- }
- state = CONNECTING;
- try {
- QPID_LOG(debug, id << " Connecting ...");
- transport->connect(i->host, port.str());
- } catch (const std::exception& e) {
- QPID_LOG(info, id << " Error while connecting: " << e.what());
- }
- while (state == CONNECTING) {
- lock.wait();
- }
- if (state == DISCONNECTED) {
- QPID_LOG(debug, id << " Failed to connect");
- transport = boost::shared_ptr<Transport>();
- } else {
- QPID_LOG(debug, id << " Connected");
- }
- }
-
- if (state != CONNECTED) throw qpid::messaging::TransportFailure(QPID_MSG("Could not connect to " << url));
-
- if (sasl.get()) {
- wakeupDriver();
- while (!sasl->authenticated()) {
- QPID_LOG(debug, id << " Waiting to be authenticated...");
- wait();
- }
- QPID_LOG(debug, id << " Authenticated");
- }
-
- QPID_LOG(debug, id << " Opening...");
- setProperties();
- pn_connection_open(connection);
- wakeupDriver(); //want to write
- while (pn_connection_state(connection) & PN_REMOTE_UNINIT) {
- wait();
- }
- if (!(pn_connection_state(connection) & PN_REMOTE_ACTIVE)) {
- throw qpid::messaging::ConnectionError("Failed to open connection");
- }
- QPID_LOG(debug, id << " Opened");
-}
-
bool ConnectionContext::isOpen() const
{
qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
@@ -323,45 +261,26 @@ void ConnectionContext::detach(boost::shared_ptr<SessionContext> ssn, boost::sha
void ConnectionContext::attach(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<SenderContext> lnk)
{
+ qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
lnk->configure();
- attach(ssn->session, (pn_link_t*) lnk->sender);
- pn_terminus_t* t = pn_link_remote_target(lnk->sender);
- if (!pn_terminus_get_address(t)) {
- std::string msg("No such target : ");
- msg += lnk->getTarget();
- QPID_LOG(debug, msg);
- throw qpid::messaging::NotFound(msg);
- } else if (AddressImpl::isTemporary(lnk->address)) {
- lnk->address.setName(pn_terminus_get_address(t));
- QPID_LOG(debug, "Dynamic target name set to " << lnk->address.getName());
- }
- lnk->verify(t);
+ attach(lnk->sender);
+ lnk->verify();
checkClosed(ssn, lnk);
QPID_LOG(debug, "Attach succeeded to " << lnk->getTarget());
}
void ConnectionContext::attach(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk)
{
+ qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
lnk->configure();
- attach(ssn->session, lnk->receiver, lnk->capacity);
- pn_terminus_t* s = pn_link_remote_source(lnk->receiver);
- if (!pn_terminus_get_address(s)) {
- std::string msg("No such source : ");
- msg += lnk->getSource();
- QPID_LOG(debug, msg);
- throw qpid::messaging::NotFound(msg);
- } else if (AddressImpl::isTemporary(lnk->address)) {
- lnk->address.setName(pn_terminus_get_address(s));
- QPID_LOG(debug, "Dynamic source name set to " << lnk->address.getName());
- }
- lnk->verify(s);
+ attach(lnk->receiver, lnk->capacity);
+ lnk->verify();
checkClosed(ssn, lnk);
QPID_LOG(debug, "Attach succeeded from " << lnk->getSource());
}
-void ConnectionContext::attach(pn_session_t* /*session*/, pn_link_t* link, int credit)
+void ConnectionContext::attach(pn_link_t* link, int credit)
{
- qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
pn_link_open(link);
QPID_LOG(debug, "Link attach sent for " << link << ", state=" << pn_link_state(link));
if (credit) pn_link_flow(link, credit);
@@ -457,10 +376,32 @@ pn_state_t REQUIRES_CLOSE = PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED;
pn_state_t IS_CLOSED = PN_LOCAL_CLOSED | PN_REMOTE_CLOSED;
}
+void ConnectionContext::reset()
+{
+ pn_transport_free(engine);
+ pn_connection_free(connection);
+
+ engine = pn_transport();
+ connection = pn_connection();
+ pn_connection_set_container(connection, identifier.c_str());
+ bool enableTrace(false);
+ QPID_LOG_TEST_CAT(trace, protocol, enableTrace);
+ if (enableTrace) pn_transport_trace(engine, PN_TRACE_FRM);
+ for (SessionMap::iterator i = sessions.begin(); i != sessions.end(); ++i) {
+ i->second->reset(connection);
+ }
+ pn_transport_bind(engine, connection);
+}
+
void ConnectionContext::check()
{
if (state == DISCONNECTED) {
- throw qpid::messaging::TransportFailure("Disconnected");
+ if (ConnectionOptions::reconnect) {
+ reset();
+ reconnect();
+ } else {
+ throw qpid::messaging::TransportFailure("Disconnected (reconnect disabled)");
+ }
}
if ((pn_connection_state(connection) & REQUIRES_CLOSE) == REQUIRES_CLOSE) {
pn_connection_close(connection);
@@ -510,6 +451,7 @@ void ConnectionContext::waitUntil(boost::shared_ptr<SessionContext> ssn, boost::
}
void ConnectionContext::checkClosed(boost::shared_ptr<SessionContext> ssn)
{
+ check();
if ((pn_session_state(ssn->session) & REQUIRES_CLOSE) == REQUIRES_CLOSE) {
pn_session_close(ssn->session);
throw qpid::messaging::SessionError("Session ended by peer");
@@ -543,6 +485,31 @@ void ConnectionContext::checkClosed(boost::shared_ptr<SessionContext> ssn, pn_li
throw qpid::messaging::LinkError("Link is not attached");
}
}
+
+void ConnectionContext::restartSession(boost::shared_ptr<SessionContext> s)
+{
+ pn_session_open(s->session);
+ wakeupDriver();
+ while (pn_session_state(s->session) & PN_REMOTE_UNINIT) {
+ wait();
+ }
+
+ for (SessionContext::SenderMap::iterator i = s->senders.begin(); i != s->senders.end(); ++i) {
+ QPID_LOG(debug, id << " reattaching sender " << i->first);
+ attach(i->second->sender);
+ i->second->verify();
+ QPID_LOG(debug, id << " sender " << i->first << " reattached");
+ i->second->resend();
+ }
+ for (SessionContext::ReceiverMap::iterator i = s->receivers.begin(); i != s->receivers.end(); ++i) {
+ QPID_LOG(debug, id << " reattaching receiver " << i->first);
+ attach(i->second->receiver, i->second->capacity);
+ i->second->verify();
+ QPID_LOG(debug, id << " receiver " << i->first << " reattached");
+ }
+ wakeupDriver();
+}
+
boost::shared_ptr<SessionContext> ConnectionContext::newSession(bool transactional, const std::string& n)
{
qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
@@ -585,7 +552,7 @@ std::string ConnectionContext::getAuthenticatedUsername()
return sasl.get() ? sasl->getAuthenticatedUsername() : std::string();
}
-std::size_t ConnectionContext::decode(const char* buffer, std::size_t size)
+std::size_t ConnectionContext::decodePlain(const char* buffer, std::size_t size)
{
qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
QPID_LOG(trace, id << " decode(" << size << ")");
@@ -615,7 +582,7 @@ std::size_t ConnectionContext::decode(const char* buffer, std::size_t size)
}
}
-std::size_t ConnectionContext::encode(char* buffer, std::size_t size)
+std::size_t ConnectionContext::encodePlain(char* buffer, std::size_t size)
{
qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
QPID_LOG(trace, id << " encode(" << size << ")");
@@ -642,7 +609,7 @@ std::size_t ConnectionContext::encode(char* buffer, std::size_t size)
return 0;
}
}
-bool ConnectionContext::canEncode()
+bool ConnectionContext::canEncodePlain()
{
qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
return haveOutput && state == CONNECTED;
@@ -716,47 +683,46 @@ bool ConnectionContext::useSasl()
qpid::sys::Codec& ConnectionContext::getCodec()
{
- return codecSwitch;
+ return *this;
}
-ConnectionContext::CodecSwitch::CodecSwitch(ConnectionContext& p) : parent(p) {}
-std::size_t ConnectionContext::CodecSwitch::decode(const char* buffer, std::size_t size)
+std::size_t ConnectionContext::decode(const char* buffer, std::size_t size)
{
- qpid::sys::ScopedLock<qpid::sys::Monitor> l(parent.lock);
+ qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
size_t decoded = 0;
- if (parent.sasl.get() && !parent.sasl->authenticated()) {
- decoded = parent.sasl->decode(buffer, size);
- if (!parent.sasl->authenticated()) return decoded;
+ if (sasl.get() && !sasl->authenticated()) {
+ decoded = sasl->decode(buffer, size);
+ if (!sasl->authenticated()) return decoded;
}
if (decoded < size) {
- if (parent.sasl.get() && parent.sasl->getSecurityLayer()) decoded += parent.sasl->getSecurityLayer()->decode(buffer+decoded, size-decoded);
- else decoded += parent.decode(buffer+decoded, size-decoded);
+ if (sasl.get() && sasl->getSecurityLayer()) decoded += sasl->getSecurityLayer()->decode(buffer+decoded, size-decoded);
+ else decoded += decodePlain(buffer+decoded, size-decoded);
}
return decoded;
}
-std::size_t ConnectionContext::CodecSwitch::encode(char* buffer, std::size_t size)
+std::size_t ConnectionContext::encode(char* buffer, std::size_t size)
{
- qpid::sys::ScopedLock<qpid::sys::Monitor> l(parent.lock);
+ qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
size_t encoded = 0;
- if (parent.sasl.get() && parent.sasl->canEncode()) {
- encoded += parent.sasl->encode(buffer, size);
- if (!parent.sasl->authenticated()) return encoded;
+ if (sasl.get() && sasl->canEncode()) {
+ encoded += sasl->encode(buffer, size);
+ if (!sasl->authenticated()) return encoded;
}
if (encoded < size) {
- if (parent.sasl.get() && parent.sasl->getSecurityLayer()) encoded += parent.sasl->getSecurityLayer()->encode(buffer+encoded, size-encoded);
- else encoded += parent.encode(buffer+encoded, size-encoded);
+ if (sasl.get() && sasl->getSecurityLayer()) encoded += sasl->getSecurityLayer()->encode(buffer+encoded, size-encoded);
+ else encoded += encodePlain(buffer+encoded, size-encoded);
}
return encoded;
}
-bool ConnectionContext::CodecSwitch::canEncode()
+bool ConnectionContext::canEncode()
{
- qpid::sys::ScopedLock<qpid::sys::Monitor> l(parent.lock);
- if (parent.sasl.get()) {
- if (parent.sasl->canEncode()) return true;
- else if (!parent.sasl->authenticated()) return false;
- else if (parent.sasl->getSecurityLayer()) return parent.sasl->getSecurityLayer()->canEncode();
+ qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+ if (sasl.get()) {
+ if (sasl->canEncode()) return true;
+ else if (!sasl->authenticated()) return false;
+ else if (sasl->getSecurityLayer()) return sasl->getSecurityLayer()->canEncode();
}
- return parent.canEncode();
+ return canEncodePlain();
}
namespace {
@@ -794,4 +760,160 @@ const qpid::sys::SecuritySettings* ConnectionContext::getTransportSecuritySettin
return transport ? transport->getSecuritySettings() : 0;
}
+void ConnectionContext::open()
+{
+ qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock);
+ if (state != DISCONNECTED) throw qpid::messaging::ConnectionError("Connection was already opened!");
+ if (!driver) driver = DriverImpl::getDefault();
+
+ tryConnect();
+}
+
+
+namespace {
+std::string asString(const std::vector<std::string>& v) {
+ std::stringstream os;
+ os << "[";
+ for(std::vector<std::string>::const_iterator i = v.begin(); i != v.end(); ++i ) {
+ if (i != v.begin()) os << ", ";
+ os << *i;
+ }
+ os << "]";
+ return os.str();
+}
+double FOREVER(std::numeric_limits<double>::max());
+bool expired(const sys::AbsTime& start, double timeout)
+{
+ if (timeout == 0) return true;
+ if (timeout == FOREVER) return false;
+ qpid::sys::Duration used(start, qpid::sys::now());
+ qpid::sys::Duration allowed((int64_t)(timeout*qpid::sys::TIME_SEC));
+ return allowed < used;
+}
+const std::string COLON(":");
+}
+
+void ConnectionContext::reconnect()
+{
+ qpid::sys::AbsTime started(qpid::sys::now());
+ QPID_LOG(debug, "Starting connection, urls=" << asString(urls));
+ for (double i = minReconnectInterval; !tryConnect(); i = std::min(i*2, maxReconnectInterval)) {
+ if (!ConnectionOptions::reconnect) {
+ throw qpid::messaging::TransportFailure("Failed to connect (reconnect disabled)");
+ }
+ if (limit >= 0 && retries++ >= limit) {
+ throw qpid::messaging::TransportFailure("Failed to connect within reconnect limit");
+ }
+ if (expired(started, timeout)) {
+ throw qpid::messaging::TransportFailure("Failed to connect within reconnect timeout");
+ }
+ QPID_LOG(debug, "Connection retry in " << i*1000*1000 << " microseconds, urls="
+ << asString(urls));
+ qpid::sys::usleep(int64_t(i*1000*1000)); // Sleep in microseconds.
+ }
+ retries = 0;
+}
+
+bool ConnectionContext::tryConnect()
+{
+ for (std::vector<std::string>::const_iterator i = urls.begin(); i != urls.end(); ++i) {
+ try {
+ QPID_LOG(info, "Trying to connect to " << *i << "...");
+ if (tryConnect(qpid::Url(*i, protocol.empty() ? qpid::Address::TCP : protocol))) {
+ QPID_LOG(info, "Connected to " << *i);
+ if (sasl.get()) {
+ wakeupDriver();
+ while (!sasl->authenticated()) {
+ QPID_LOG(debug, id << " Waiting to be authenticated...");
+ wait();
+ }
+ QPID_LOG(debug, id << " Authenticated");
+ }
+
+ QPID_LOG(debug, id << " Opening...");
+ setProperties();
+ pn_connection_open(connection);
+ wakeupDriver(); //want to write
+ while (pn_connection_state(connection) & PN_REMOTE_UNINIT) {
+ wait();
+ }
+ if (!(pn_connection_state(connection) & PN_REMOTE_ACTIVE)) {
+ throw qpid::messaging::ConnectionError("Failed to open connection");
+ }
+ QPID_LOG(debug, id << " Opened");
+
+ return restartSessions();
+ }
+ } catch (const qpid::messaging::TransportFailure& e) {
+ QPID_LOG(info, "Failed to connect to " << *i << ": " << e.what());
+ }
+ }
+ return false;
+}
+
+bool ConnectionContext::tryConnect(const std::string& url)
+{
+ return tryConnect(qpid::Url(url, protocol.empty() ? qpid::Address::TCP : protocol));
+}
+
+bool ConnectionContext::tryConnect(const Url& url)
+{
+ if (url.getUser().size()) username = url.getUser();
+ if (url.getPass().size()) password = url.getPass();
+
+ for (Url::const_iterator i = url.begin(); i != url.end(); ++i) {
+ if (tryConnect(*i)) return true;
+ }
+ return false;
+}
+
+bool ConnectionContext::tryConnect(const qpid::Address& address)
+{
+ transport = driver->getTransport(address.protocol, *this);
+ std::stringstream port;
+ port << address.port;
+ id = address.host + COLON + port.str();
+ if (useSasl()) {
+ sasl = std::auto_ptr<Sasl>(new Sasl(id, *this, address.host));
+ }
+ state = CONNECTING;
+ try {
+ QPID_LOG(debug, id << " Connecting ...");
+ transport->connect(address.host, port.str());
+ bool waiting(true);
+ while (waiting) {
+ switch (state) {
+ case CONNECTED:
+ QPID_LOG(debug, id << " Connected");
+ return true;
+ case CONNECTING:
+ lock.wait();
+ break;
+ case DISCONNECTED:
+ waiting = false;
+ QPID_LOG(debug, id << " Failed to connect");
+ break;
+ }
+ }
+ } catch (const std::exception& e) {
+ QPID_LOG(info, id << " Error while connecting: " << e.what());
+ }
+ transport = boost::shared_ptr<Transport>();
+ return false;
+}
+
+bool ConnectionContext::restartSessions()
+{
+ try {
+ for (SessionMap::iterator i = sessions.begin(); i != sessions.end(); ++i) {
+ restartSession(i->second);
+ }
+ return true;
+ } catch (const qpid::TransportFailure& e) {
+ QPID_LOG(debug, "Connection Failed to re-initialize sessions: " << e.what());
+ return false;
+ }
+}
+
+
}}} // namespace qpid::messaging::amqp
diff --git a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h
index 35948b65cf..3b80f71f1d 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h
+++ b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h
@@ -110,7 +110,6 @@ class ConnectionContext : public qpid::sys::ConnectionCodec, public qpid::messag
private:
typedef std::map<std::string, boost::shared_ptr<SessionContext> > SessionMap;
- qpid::Url url;
boost::shared_ptr<DriverImpl> driver;
boost::shared_ptr<Transport> transport;
@@ -129,17 +128,6 @@ class ConnectionContext : public qpid::sys::ConnectionCodec, public qpid::messag
CONNECTED
} state;
std::auto_ptr<Sasl> sasl;
- class CodecSwitch : public qpid::sys::Codec
- {
- public:
- CodecSwitch(ConnectionContext&);
- std::size_t decode(const char* buffer, std::size_t size);
- std::size_t encode(char* buffer, std::size_t size);
- bool canEncode();
- private:
- ConnectionContext& parent;
- };
- CodecSwitch codecSwitch;
void check();
void wait();
@@ -155,7 +143,19 @@ class ConnectionContext : public qpid::sys::ConnectionCodec, public qpid::messag
void checkClosed(boost::shared_ptr<SessionContext>, boost::shared_ptr<SenderContext>);
void checkClosed(boost::shared_ptr<SessionContext>, pn_link_t*);
void wakeupDriver();
- void attach(pn_session_t*, pn_link_t*, int credit=0);
+ void attach(pn_link_t*, int credit=0);
+ void reconnect();
+ bool tryConnect();
+ bool tryConnect(const std::string& url);
+ bool tryConnect(const qpid::Url& url);
+ bool tryConnect(const qpid::Address& address);
+ void reset();
+ bool restartSessions();
+ void restartSession(boost::shared_ptr<SessionContext>);
+
+ std::size_t decodePlain(const char* buffer, std::size_t size);
+ std::size_t encodePlain(char* buffer, std::size_t size);
+ bool canEncodePlain();
std::size_t readProtocolHeader(const char* buffer, std::size_t size);
std::size_t writeProtocolHeader(char* buffer, std::size_t size);
diff --git a/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp
index 661856122d..473c120e27 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp
+++ b/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp
@@ -89,8 +89,18 @@ const std::string& ReceiverContext::getSource() const
{
return address.getName();
}
-void ReceiverContext::verify(pn_terminus_t* source)
+void ReceiverContext::verify()
{
+ pn_terminus_t* source = pn_link_remote_source(receiver);
+ if (!pn_terminus_get_address(source)) {
+ std::string msg("No such source : ");
+ msg += getSource();
+ QPID_LOG(debug, msg);
+ throw qpid::messaging::NotFound(msg);
+ } else if (AddressImpl::isTemporary(address)) {
+ address.setName(pn_terminus_get_address(source));
+ QPID_LOG(debug, "Dynamic source name set to " << address.getName());
+ }
helper.checkAssertion(source, AddressHelper::FOR_RECEIVER);
}
void ReceiverContext::configure()
@@ -118,6 +128,10 @@ bool ReceiverContext::isClosed() const
return false;//TODO
}
-
+void ReceiverContext::reset(pn_session_t* session)
+{
+ receiver = pn_receiver(session, name.c_str());
+ configure();
+}
}}} // namespace qpid::messaging::amqp
diff --git a/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h b/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h
index 79049d9263..fb8ea0d336 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h
+++ b/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h
@@ -46,6 +46,7 @@ class ReceiverContext
public:
ReceiverContext(pn_session_t* session, const std::string& name, const qpid::messaging::Address& source);
~ReceiverContext();
+ void reset(pn_session_t* session);
void setCapacity(uint32_t);
uint32_t getCapacity();
uint32_t getAvailable();
@@ -56,7 +57,7 @@ class ReceiverContext
const std::string& getSource() const;
bool isClosed() const;
void configure();
- void verify(pn_terminus_t*);
+ void verify();
Address getAddress() const;
private:
friend class ConnectionContext;
diff --git a/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp
index 1926afcb27..c3a9107333 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp
+++ b/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp
@@ -82,6 +82,7 @@ const std::string& SenderContext::getTarget() const
bool SenderContext::send(const qpid::messaging::Message& message, SenderContext::Delivery** out)
{
+ resend();//if there are any messages needing to be resent at the front of the queue, send them first
if (processUnsettled(false) < capacity && pn_link_credit(sender)) {
if (unreliable) {
Delivery delivery(nextId++);
@@ -424,7 +425,12 @@ bool changedSubject(const qpid::messaging::MessageImpl& msg, const qpid::messagi
}
-SenderContext::Delivery::Delivery(int32_t i) : id(i), token(0) {}
+SenderContext::Delivery::Delivery(int32_t i) : id(i), token(0), presettled(false) {}
+
+void SenderContext::Delivery::reset()
+{
+ token = 0;
+}
void SenderContext::Delivery::encode(const qpid::messaging::MessageImpl& msg, const qpid::messaging::Address& address)
{
@@ -490,13 +496,20 @@ void SenderContext::Delivery::send(pn_link_t* sender, bool unreliable)
tag.bytes = reinterpret_cast<const char*>(&id);
token = pn_delivery(sender, tag);
pn_link_send(sender, encoded.getData(), encoded.getSize());
- if (unreliable) pn_delivery_settle(token);
+ if (unreliable) {
+ pn_delivery_settle(token);
+ presettled = true;
+ }
pn_link_advance(sender);
}
+bool SenderContext::Delivery::sent() const
+{
+ return presettled || token;
+}
bool SenderContext::Delivery::delivered()
{
- if (pn_delivery_remote_state(token) || pn_delivery_settled(token)) {
+ if (presettled || (token && (pn_delivery_remote_state(token) || pn_delivery_settled(token)))) {
//TODO: need a better means for signalling outcomes other than accepted
if (rejected()) {
QPID_LOG(warning, "delivery " << id << " was rejected by peer");
@@ -520,8 +533,19 @@ void SenderContext::Delivery::settle()
{
pn_delivery_settle(token);
}
-void SenderContext::verify(pn_terminus_t* target)
+void SenderContext::verify()
{
+ pn_terminus_t* target = pn_link_remote_target(sender);
+ if (!pn_terminus_get_address(target)) {
+ std::string msg("No such target : ");
+ msg += getTarget();
+ QPID_LOG(debug, msg);
+ throw qpid::messaging::NotFound(msg);
+ } else if (AddressImpl::isTemporary(address)) {
+ address.setName(pn_terminus_get_address(target));
+ QPID_LOG(debug, "Dynamic target name set to " << address.getName());
+ }
+
helper.checkAssertion(target, AddressHelper::FOR_SENDER);
}
void SenderContext::configure()
@@ -549,4 +573,22 @@ Address SenderContext::getAddress() const
return address;
}
+
+void SenderContext::reset(pn_session_t* session)
+{
+ sender = pn_sender(session, name.c_str());
+ configure();
+
+ for (Deliveries::iterator i = deliveries.begin(); i != deliveries.end(); ++i) {
+ i->reset();
+ }
+}
+
+void SenderContext::resend()
+{
+ for (Deliveries::iterator i = deliveries.begin(); i != deliveries.end() && pn_link_credit(sender) && !i->sent(); ++i) {
+ i->send(sender, false/*only resend reliable transfers*/);
+ }
+}
+
}}} // namespace qpid::messaging::amqp
diff --git a/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h b/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h
index fcdfbbcf96..27ffa1e81b 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h
+++ b/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h
@@ -57,14 +57,18 @@ class SenderContext
bool accepted();
bool rejected();
void settle();
+ void reset();
+ bool sent() const;
private:
int32_t id;
pn_delivery_t* token;
EncodedMessage encoded;
+ bool presettled;
};
SenderContext(pn_session_t* session, const std::string& name, const qpid::messaging::Address& target);
~SenderContext();
+ void reset(pn_session_t* session);
void close();
void setCapacity(uint32_t);
uint32_t getCapacity();
@@ -73,7 +77,7 @@ class SenderContext
const std::string& getTarget() const;
bool send(const qpid::messaging::Message& message, Delivery**);
void configure();
- void verify(pn_terminus_t*);
+ void verify();
void check();
bool settled();
Address getAddress() const;
@@ -92,6 +96,7 @@ class SenderContext
uint32_t processUnsettled(bool silent);
void configure(pn_terminus_t*);
+ void resend();
};
}}} // namespace qpid::messaging::amqp
diff --git a/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp
index 8170bc74b8..8f2a7d15d8 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp
+++ b/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp
@@ -163,4 +163,15 @@ std::string SessionContext::getName() const
return name;
}
+void SessionContext::reset(pn_connection_t* connection)
+{
+ session = pn_session(connection);
+ unacked.clear();
+ for (SessionContext::SenderMap::iterator i = senders.begin(); i != senders.end(); ++i) {
+ i->second->reset(session);
+ }
+ for (SessionContext::ReceiverMap::iterator i = receivers.begin(); i != receivers.end(); ++i) {
+ i->second->reset(session);
+ }
+}
}}} // namespace qpid::messaging::amqp
diff --git a/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h b/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h
index 7a0e0fb23e..5d68f6d8de 100644
--- a/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h
+++ b/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h
@@ -50,6 +50,7 @@ class SessionContext
public:
SessionContext(pn_connection_t*);
~SessionContext();
+ void reset(pn_connection_t*);
boost::shared_ptr<SenderContext> createSender(const qpid::messaging::Address& address);
boost::shared_ptr<ReceiverContext> createReceiver(const qpid::messaging::Address& address);
boost::shared_ptr<SenderContext> getSender(const std::string& name) const;
diff --git a/qpid/cpp/src/tests/interlink_tests.py b/qpid/cpp/src/tests/interlink_tests.py
index 0d2b757152..1724607533 100755
--- a/qpid/cpp/src/tests/interlink_tests.py
+++ b/qpid/cpp/src/tests/interlink_tests.py
@@ -22,6 +22,7 @@ import os, signal, sys, time, imp, re, subprocess, glob, random, logging, shutil
import traceback
from qpid.messaging import Message, SessionError, NotFound, ConnectionError, ReceiverError, Connection, Timeout, Disposition, REJECTED, Empty
from brokertest import *
+from ha_test import HaPort
from threading import Thread, Lock, Condition
from logging import getLogger, WARN, ERROR, DEBUG, INFO
from qpidtoollibs import BrokerAgent, BrokerObject
@@ -46,7 +47,8 @@ class AmqpBrokerTest(BrokerTest):
def setUp(self):
BrokerTest.setUp(self)
os.putenv("QPID_LOAD_MODULE", BrokerTest.amqpc_lib)
- self.broker = self.amqp_broker()
+ self.port_holder = HaPort(self)
+ self.broker = self.amqp_broker(port_holder=self.port_holder)
self.default_config = Config(self.broker)
self.agent = BrokerAgent(self.broker.connect())
@@ -252,14 +254,73 @@ class AmqpBrokerTest(BrokerTest):
#send to q on broker B through brokerA
self.send_and_receive(send_config=Config(self.broker, address="q@BrokerB"), recv_config=Config(brokerB))
+ def test_reconnect(self):
+ receiver_cmd = ["qpid-receive",
+ "--broker", self.broker.host_port(),
+ "--address=amq.fanout",
+ "--connection-options={protocol:amqp1.0, reconnect:True,container_id:receiver}",
+ "--timeout=10", "--print-content=true", "--print-headers=false"
+ ]
+ receiver = self.popen(receiver_cmd, stdout=PIPE)
+
+ sender_cmd = ["qpid-send",
+ "--broker", self.broker.host_port(),
+ "--address=amq.fanout",
+ "--connection-options={protocol:amqp1.0,reconnect:True,container_id:sender}",
+ "--content-stdin", "--send-eos=1"
+ ]
+ sender = self.popen(sender_cmd, stdin=PIPE)
+ sender._set_cloexec_flag(sender.stdin) #required for older python, see http://bugs.python.org/issue4112
+
+
+ batch1 = ["message-%s" % (i+1) for i in range(10000)]
+ for m in batch1:
+ sender.stdin.write(m + "\n")
+ sender.stdin.flush()
+
+ self.broker.kill()
+ self.broker = self.amqp_broker(port_holder=self.port_holder)
+
+ batch2 = ["message-%s" % (i+1) for i in range(10000, 20000)]
+ for m in batch2:
+ sender.stdin.write(m + "\n")
+ sender.stdin.flush()
+
+ sender.stdin.close()
+
+ last = None
+ m = receiver.stdout.readline().rstrip()
+ while len(m):
+ last = m
+ m = receiver.stdout.readline().rstrip()
+ assert last == "message-20000", (last)
+
""" Create and return a broker with AMQP 1.0 support """
def amqp_broker(self):
assert BrokerTest.amqp_lib, "Cannot locate AMQP 1.0 plug-in"
+ self.port_holder = HaPort(self) #reserve port
args = ["--load-module", BrokerTest.amqp_lib,
- "--max-negotiate-time=600000",
+ "--socket-fd=%s" % self.port_holder.fileno,
+ "--listen-disable=tcp",
"--log-enable=trace+:Protocol",
"--log-enable=info+"]
- return BrokerTest.broker(self, args)
+ return BrokerTest.broker(self, args, port=self.port_holder.port)
+
+ def amqp_broker(self, port_holder=None):
+ assert BrokerTest.amqp_lib, "Cannot locate AMQP 1.0 plug-in"
+ if port_holder:
+ args = ["--load-module", BrokerTest.amqp_lib,
+ "--socket-fd=%s" % port_holder.fileno,
+ "--listen-disable=tcp",
+ "--log-enable=trace+:Protocol",
+ "--log-enable=info+"]
+ return BrokerTest.broker(self, args, port=port_holder.port)
+ else:
+ args = ["--load-module", BrokerTest.amqp_lib,
+ "--log-enable=trace+:Protocol",
+ "--log-enable=info+"]
+ return BrokerTest.broker(self, args)
+
if __name__ == "__main__":
shutil.rmtree("brokertest.tmp", True)