diff options
| author | Gordon Sim <gsim@apache.org> | 2013-08-28 14:56:46 +0000 |
|---|---|---|
| committer | Gordon Sim <gsim@apache.org> | 2013-08-28 14:56:46 +0000 |
| commit | 0b1ee9f4672f8c21d3622cd971b671bae3b10401 (patch) | |
| tree | 4e3baec48a73e1f241dda1e81a70006ea714ef17 /qpid/cpp/src | |
| parent | 01cb164d09b628206335c138eba796b3487c5ea0 (diff) | |
| download | qpid-python-0b1ee9f4672f8c21d3622cd971b671bae3b10401.tar.gz | |
QPID-4708: support for reconnect over AMQP 1.0
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1518233 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
| -rw-r--r-- | qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp | 358 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h | 26 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp | 18 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h | 3 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp | 50 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/messaging/amqp/SenderContext.h | 7 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp | 11 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/messaging/amqp/SessionContext.h | 1 | ||||
| -rwxr-xr-x | qpid/cpp/src/tests/interlink_tests.py | 67 |
9 files changed, 399 insertions, 142 deletions
diff --git a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp index e42002aa0d..0d2640eb26 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp +++ b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp @@ -45,18 +45,17 @@ namespace qpid { namespace messaging { namespace amqp { -ConnectionContext::ConnectionContext(const std::string& u, const qpid::types::Variant::Map& o) +ConnectionContext::ConnectionContext(const std::string& url, const qpid::types::Variant::Map& o) : qpid::messaging::ConnectionOptions(o), - url(u, protocol.empty() ? qpid::Address::TCP : protocol), engine(pn_transport()), connection(pn_connection()), //note: disabled read/write of header as now handled by engine writeHeader(false), readHeader(false), haveOutput(false), - state(DISCONNECTED), - codecSwitch(*this) + state(DISCONNECTED) { + urls.insert(urls.begin(), url); if (pn_transport_bind(engine, connection)) { //error } @@ -77,67 +76,6 @@ ConnectionContext::~ConnectionContext() pn_connection_free(connection); } -namespace { -const std::string COLON(":"); -} -void ConnectionContext::open() -{ - qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); - if (state != DISCONNECTED) throw qpid::messaging::ConnectionError("Connection was already opened!"); - if (!driver) driver = DriverImpl::getDefault(); - if (url.getUser().size()) username = url.getUser(); - if (url.getPass().size()) password = url.getPass(); - - for (Url::const_iterator i = url.begin(); state != CONNECTED && i != url.end(); ++i) { - transport = driver->getTransport(i->protocol, *this); - std::stringstream port; - port << i->port; - id = i->host + COLON + port.str(); - if (useSasl()) { - sasl = std::auto_ptr<Sasl>(new Sasl(id, *this, i->host)); - } - state = CONNECTING; - try { - QPID_LOG(debug, id << " Connecting ..."); - transport->connect(i->host, port.str()); - } catch (const std::exception& e) { - QPID_LOG(info, id << " Error while connecting: " << e.what()); - } - while (state == CONNECTING) { - lock.wait(); - } - if (state == DISCONNECTED) { - QPID_LOG(debug, id << " Failed to connect"); - transport = boost::shared_ptr<Transport>(); - } else { - QPID_LOG(debug, id << " Connected"); - } - } - - if (state != CONNECTED) throw qpid::messaging::TransportFailure(QPID_MSG("Could not connect to " << url)); - - if (sasl.get()) { - wakeupDriver(); - while (!sasl->authenticated()) { - QPID_LOG(debug, id << " Waiting to be authenticated..."); - wait(); - } - QPID_LOG(debug, id << " Authenticated"); - } - - QPID_LOG(debug, id << " Opening..."); - setProperties(); - pn_connection_open(connection); - wakeupDriver(); //want to write - while (pn_connection_state(connection) & PN_REMOTE_UNINIT) { - wait(); - } - if (!(pn_connection_state(connection) & PN_REMOTE_ACTIVE)) { - throw qpid::messaging::ConnectionError("Failed to open connection"); - } - QPID_LOG(debug, id << " Opened"); -} - bool ConnectionContext::isOpen() const { qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); @@ -323,45 +261,26 @@ void ConnectionContext::detach(boost::shared_ptr<SessionContext> ssn, boost::sha void ConnectionContext::attach(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<SenderContext> lnk) { + qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); lnk->configure(); - attach(ssn->session, (pn_link_t*) lnk->sender); - pn_terminus_t* t = pn_link_remote_target(lnk->sender); - if (!pn_terminus_get_address(t)) { - std::string msg("No such target : "); - msg += lnk->getTarget(); - QPID_LOG(debug, msg); - throw qpid::messaging::NotFound(msg); - } else if (AddressImpl::isTemporary(lnk->address)) { - lnk->address.setName(pn_terminus_get_address(t)); - QPID_LOG(debug, "Dynamic target name set to " << lnk->address.getName()); - } - lnk->verify(t); + attach(lnk->sender); + lnk->verify(); checkClosed(ssn, lnk); QPID_LOG(debug, "Attach succeeded to " << lnk->getTarget()); } void ConnectionContext::attach(boost::shared_ptr<SessionContext> ssn, boost::shared_ptr<ReceiverContext> lnk) { + qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); lnk->configure(); - attach(ssn->session, lnk->receiver, lnk->capacity); - pn_terminus_t* s = pn_link_remote_source(lnk->receiver); - if (!pn_terminus_get_address(s)) { - std::string msg("No such source : "); - msg += lnk->getSource(); - QPID_LOG(debug, msg); - throw qpid::messaging::NotFound(msg); - } else if (AddressImpl::isTemporary(lnk->address)) { - lnk->address.setName(pn_terminus_get_address(s)); - QPID_LOG(debug, "Dynamic source name set to " << lnk->address.getName()); - } - lnk->verify(s); + attach(lnk->receiver, lnk->capacity); + lnk->verify(); checkClosed(ssn, lnk); QPID_LOG(debug, "Attach succeeded from " << lnk->getSource()); } -void ConnectionContext::attach(pn_session_t* /*session*/, pn_link_t* link, int credit) +void ConnectionContext::attach(pn_link_t* link, int credit) { - qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); pn_link_open(link); QPID_LOG(debug, "Link attach sent for " << link << ", state=" << pn_link_state(link)); if (credit) pn_link_flow(link, credit); @@ -457,10 +376,32 @@ pn_state_t REQUIRES_CLOSE = PN_LOCAL_ACTIVE | PN_REMOTE_CLOSED; pn_state_t IS_CLOSED = PN_LOCAL_CLOSED | PN_REMOTE_CLOSED; } +void ConnectionContext::reset() +{ + pn_transport_free(engine); + pn_connection_free(connection); + + engine = pn_transport(); + connection = pn_connection(); + pn_connection_set_container(connection, identifier.c_str()); + bool enableTrace(false); + QPID_LOG_TEST_CAT(trace, protocol, enableTrace); + if (enableTrace) pn_transport_trace(engine, PN_TRACE_FRM); + for (SessionMap::iterator i = sessions.begin(); i != sessions.end(); ++i) { + i->second->reset(connection); + } + pn_transport_bind(engine, connection); +} + void ConnectionContext::check() { if (state == DISCONNECTED) { - throw qpid::messaging::TransportFailure("Disconnected"); + if (ConnectionOptions::reconnect) { + reset(); + reconnect(); + } else { + throw qpid::messaging::TransportFailure("Disconnected (reconnect disabled)"); + } } if ((pn_connection_state(connection) & REQUIRES_CLOSE) == REQUIRES_CLOSE) { pn_connection_close(connection); @@ -510,6 +451,7 @@ void ConnectionContext::waitUntil(boost::shared_ptr<SessionContext> ssn, boost:: } void ConnectionContext::checkClosed(boost::shared_ptr<SessionContext> ssn) { + check(); if ((pn_session_state(ssn->session) & REQUIRES_CLOSE) == REQUIRES_CLOSE) { pn_session_close(ssn->session); throw qpid::messaging::SessionError("Session ended by peer"); @@ -543,6 +485,31 @@ void ConnectionContext::checkClosed(boost::shared_ptr<SessionContext> ssn, pn_li throw qpid::messaging::LinkError("Link is not attached"); } } + +void ConnectionContext::restartSession(boost::shared_ptr<SessionContext> s) +{ + pn_session_open(s->session); + wakeupDriver(); + while (pn_session_state(s->session) & PN_REMOTE_UNINIT) { + wait(); + } + + for (SessionContext::SenderMap::iterator i = s->senders.begin(); i != s->senders.end(); ++i) { + QPID_LOG(debug, id << " reattaching sender " << i->first); + attach(i->second->sender); + i->second->verify(); + QPID_LOG(debug, id << " sender " << i->first << " reattached"); + i->second->resend(); + } + for (SessionContext::ReceiverMap::iterator i = s->receivers.begin(); i != s->receivers.end(); ++i) { + QPID_LOG(debug, id << " reattaching receiver " << i->first); + attach(i->second->receiver, i->second->capacity); + i->second->verify(); + QPID_LOG(debug, id << " receiver " << i->first << " reattached"); + } + wakeupDriver(); +} + boost::shared_ptr<SessionContext> ConnectionContext::newSession(bool transactional, const std::string& n) { qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); @@ -585,7 +552,7 @@ std::string ConnectionContext::getAuthenticatedUsername() return sasl.get() ? sasl->getAuthenticatedUsername() : std::string(); } -std::size_t ConnectionContext::decode(const char* buffer, std::size_t size) +std::size_t ConnectionContext::decodePlain(const char* buffer, std::size_t size) { qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); QPID_LOG(trace, id << " decode(" << size << ")"); @@ -615,7 +582,7 @@ std::size_t ConnectionContext::decode(const char* buffer, std::size_t size) } } -std::size_t ConnectionContext::encode(char* buffer, std::size_t size) +std::size_t ConnectionContext::encodePlain(char* buffer, std::size_t size) { qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); QPID_LOG(trace, id << " encode(" << size << ")"); @@ -642,7 +609,7 @@ std::size_t ConnectionContext::encode(char* buffer, std::size_t size) return 0; } } -bool ConnectionContext::canEncode() +bool ConnectionContext::canEncodePlain() { qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); return haveOutput && state == CONNECTED; @@ -716,47 +683,46 @@ bool ConnectionContext::useSasl() qpid::sys::Codec& ConnectionContext::getCodec() { - return codecSwitch; + return *this; } -ConnectionContext::CodecSwitch::CodecSwitch(ConnectionContext& p) : parent(p) {} -std::size_t ConnectionContext::CodecSwitch::decode(const char* buffer, std::size_t size) +std::size_t ConnectionContext::decode(const char* buffer, std::size_t size) { - qpid::sys::ScopedLock<qpid::sys::Monitor> l(parent.lock); + qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); size_t decoded = 0; - if (parent.sasl.get() && !parent.sasl->authenticated()) { - decoded = parent.sasl->decode(buffer, size); - if (!parent.sasl->authenticated()) return decoded; + if (sasl.get() && !sasl->authenticated()) { + decoded = sasl->decode(buffer, size); + if (!sasl->authenticated()) return decoded; } if (decoded < size) { - if (parent.sasl.get() && parent.sasl->getSecurityLayer()) decoded += parent.sasl->getSecurityLayer()->decode(buffer+decoded, size-decoded); - else decoded += parent.decode(buffer+decoded, size-decoded); + if (sasl.get() && sasl->getSecurityLayer()) decoded += sasl->getSecurityLayer()->decode(buffer+decoded, size-decoded); + else decoded += decodePlain(buffer+decoded, size-decoded); } return decoded; } -std::size_t ConnectionContext::CodecSwitch::encode(char* buffer, std::size_t size) +std::size_t ConnectionContext::encode(char* buffer, std::size_t size) { - qpid::sys::ScopedLock<qpid::sys::Monitor> l(parent.lock); + qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); size_t encoded = 0; - if (parent.sasl.get() && parent.sasl->canEncode()) { - encoded += parent.sasl->encode(buffer, size); - if (!parent.sasl->authenticated()) return encoded; + if (sasl.get() && sasl->canEncode()) { + encoded += sasl->encode(buffer, size); + if (!sasl->authenticated()) return encoded; } if (encoded < size) { - if (parent.sasl.get() && parent.sasl->getSecurityLayer()) encoded += parent.sasl->getSecurityLayer()->encode(buffer+encoded, size-encoded); - else encoded += parent.encode(buffer+encoded, size-encoded); + if (sasl.get() && sasl->getSecurityLayer()) encoded += sasl->getSecurityLayer()->encode(buffer+encoded, size-encoded); + else encoded += encodePlain(buffer+encoded, size-encoded); } return encoded; } -bool ConnectionContext::CodecSwitch::canEncode() +bool ConnectionContext::canEncode() { - qpid::sys::ScopedLock<qpid::sys::Monitor> l(parent.lock); - if (parent.sasl.get()) { - if (parent.sasl->canEncode()) return true; - else if (!parent.sasl->authenticated()) return false; - else if (parent.sasl->getSecurityLayer()) return parent.sasl->getSecurityLayer()->canEncode(); + qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + if (sasl.get()) { + if (sasl->canEncode()) return true; + else if (!sasl->authenticated()) return false; + else if (sasl->getSecurityLayer()) return sasl->getSecurityLayer()->canEncode(); } - return parent.canEncode(); + return canEncodePlain(); } namespace { @@ -794,4 +760,160 @@ const qpid::sys::SecuritySettings* ConnectionContext::getTransportSecuritySettin return transport ? transport->getSecuritySettings() : 0; } +void ConnectionContext::open() +{ + qpid::sys::ScopedLock<qpid::sys::Monitor> l(lock); + if (state != DISCONNECTED) throw qpid::messaging::ConnectionError("Connection was already opened!"); + if (!driver) driver = DriverImpl::getDefault(); + + tryConnect(); +} + + +namespace { +std::string asString(const std::vector<std::string>& v) { + std::stringstream os; + os << "["; + for(std::vector<std::string>::const_iterator i = v.begin(); i != v.end(); ++i ) { + if (i != v.begin()) os << ", "; + os << *i; + } + os << "]"; + return os.str(); +} +double FOREVER(std::numeric_limits<double>::max()); +bool expired(const sys::AbsTime& start, double timeout) +{ + if (timeout == 0) return true; + if (timeout == FOREVER) return false; + qpid::sys::Duration used(start, qpid::sys::now()); + qpid::sys::Duration allowed((int64_t)(timeout*qpid::sys::TIME_SEC)); + return allowed < used; +} +const std::string COLON(":"); +} + +void ConnectionContext::reconnect() +{ + qpid::sys::AbsTime started(qpid::sys::now()); + QPID_LOG(debug, "Starting connection, urls=" << asString(urls)); + for (double i = minReconnectInterval; !tryConnect(); i = std::min(i*2, maxReconnectInterval)) { + if (!ConnectionOptions::reconnect) { + throw qpid::messaging::TransportFailure("Failed to connect (reconnect disabled)"); + } + if (limit >= 0 && retries++ >= limit) { + throw qpid::messaging::TransportFailure("Failed to connect within reconnect limit"); + } + if (expired(started, timeout)) { + throw qpid::messaging::TransportFailure("Failed to connect within reconnect timeout"); + } + QPID_LOG(debug, "Connection retry in " << i*1000*1000 << " microseconds, urls=" + << asString(urls)); + qpid::sys::usleep(int64_t(i*1000*1000)); // Sleep in microseconds. + } + retries = 0; +} + +bool ConnectionContext::tryConnect() +{ + for (std::vector<std::string>::const_iterator i = urls.begin(); i != urls.end(); ++i) { + try { + QPID_LOG(info, "Trying to connect to " << *i << "..."); + if (tryConnect(qpid::Url(*i, protocol.empty() ? qpid::Address::TCP : protocol))) { + QPID_LOG(info, "Connected to " << *i); + if (sasl.get()) { + wakeupDriver(); + while (!sasl->authenticated()) { + QPID_LOG(debug, id << " Waiting to be authenticated..."); + wait(); + } + QPID_LOG(debug, id << " Authenticated"); + } + + QPID_LOG(debug, id << " Opening..."); + setProperties(); + pn_connection_open(connection); + wakeupDriver(); //want to write + while (pn_connection_state(connection) & PN_REMOTE_UNINIT) { + wait(); + } + if (!(pn_connection_state(connection) & PN_REMOTE_ACTIVE)) { + throw qpid::messaging::ConnectionError("Failed to open connection"); + } + QPID_LOG(debug, id << " Opened"); + + return restartSessions(); + } + } catch (const qpid::messaging::TransportFailure& e) { + QPID_LOG(info, "Failed to connect to " << *i << ": " << e.what()); + } + } + return false; +} + +bool ConnectionContext::tryConnect(const std::string& url) +{ + return tryConnect(qpid::Url(url, protocol.empty() ? qpid::Address::TCP : protocol)); +} + +bool ConnectionContext::tryConnect(const Url& url) +{ + if (url.getUser().size()) username = url.getUser(); + if (url.getPass().size()) password = url.getPass(); + + for (Url::const_iterator i = url.begin(); i != url.end(); ++i) { + if (tryConnect(*i)) return true; + } + return false; +} + +bool ConnectionContext::tryConnect(const qpid::Address& address) +{ + transport = driver->getTransport(address.protocol, *this); + std::stringstream port; + port << address.port; + id = address.host + COLON + port.str(); + if (useSasl()) { + sasl = std::auto_ptr<Sasl>(new Sasl(id, *this, address.host)); + } + state = CONNECTING; + try { + QPID_LOG(debug, id << " Connecting ..."); + transport->connect(address.host, port.str()); + bool waiting(true); + while (waiting) { + switch (state) { + case CONNECTED: + QPID_LOG(debug, id << " Connected"); + return true; + case CONNECTING: + lock.wait(); + break; + case DISCONNECTED: + waiting = false; + QPID_LOG(debug, id << " Failed to connect"); + break; + } + } + } catch (const std::exception& e) { + QPID_LOG(info, id << " Error while connecting: " << e.what()); + } + transport = boost::shared_ptr<Transport>(); + return false; +} + +bool ConnectionContext::restartSessions() +{ + try { + for (SessionMap::iterator i = sessions.begin(); i != sessions.end(); ++i) { + restartSession(i->second); + } + return true; + } catch (const qpid::TransportFailure& e) { + QPID_LOG(debug, "Connection Failed to re-initialize sessions: " << e.what()); + return false; + } +} + + }}} // namespace qpid::messaging::amqp diff --git a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h index 35948b65cf..3b80f71f1d 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h +++ b/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h @@ -110,7 +110,6 @@ class ConnectionContext : public qpid::sys::ConnectionCodec, public qpid::messag private: typedef std::map<std::string, boost::shared_ptr<SessionContext> > SessionMap; - qpid::Url url; boost::shared_ptr<DriverImpl> driver; boost::shared_ptr<Transport> transport; @@ -129,17 +128,6 @@ class ConnectionContext : public qpid::sys::ConnectionCodec, public qpid::messag CONNECTED } state; std::auto_ptr<Sasl> sasl; - class CodecSwitch : public qpid::sys::Codec - { - public: - CodecSwitch(ConnectionContext&); - std::size_t decode(const char* buffer, std::size_t size); - std::size_t encode(char* buffer, std::size_t size); - bool canEncode(); - private: - ConnectionContext& parent; - }; - CodecSwitch codecSwitch; void check(); void wait(); @@ -155,7 +143,19 @@ class ConnectionContext : public qpid::sys::ConnectionCodec, public qpid::messag void checkClosed(boost::shared_ptr<SessionContext>, boost::shared_ptr<SenderContext>); void checkClosed(boost::shared_ptr<SessionContext>, pn_link_t*); void wakeupDriver(); - void attach(pn_session_t*, pn_link_t*, int credit=0); + void attach(pn_link_t*, int credit=0); + void reconnect(); + bool tryConnect(); + bool tryConnect(const std::string& url); + bool tryConnect(const qpid::Url& url); + bool tryConnect(const qpid::Address& address); + void reset(); + bool restartSessions(); + void restartSession(boost::shared_ptr<SessionContext>); + + std::size_t decodePlain(const char* buffer, std::size_t size); + std::size_t encodePlain(char* buffer, std::size_t size); + bool canEncodePlain(); std::size_t readProtocolHeader(const char* buffer, std::size_t size); std::size_t writeProtocolHeader(char* buffer, std::size_t size); diff --git a/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp index 661856122d..473c120e27 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp +++ b/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.cpp @@ -89,8 +89,18 @@ const std::string& ReceiverContext::getSource() const { return address.getName(); } -void ReceiverContext::verify(pn_terminus_t* source) +void ReceiverContext::verify() { + pn_terminus_t* source = pn_link_remote_source(receiver); + if (!pn_terminus_get_address(source)) { + std::string msg("No such source : "); + msg += getSource(); + QPID_LOG(debug, msg); + throw qpid::messaging::NotFound(msg); + } else if (AddressImpl::isTemporary(address)) { + address.setName(pn_terminus_get_address(source)); + QPID_LOG(debug, "Dynamic source name set to " << address.getName()); + } helper.checkAssertion(source, AddressHelper::FOR_RECEIVER); } void ReceiverContext::configure() @@ -118,6 +128,10 @@ bool ReceiverContext::isClosed() const return false;//TODO } - +void ReceiverContext::reset(pn_session_t* session) +{ + receiver = pn_receiver(session, name.c_str()); + configure(); +} }}} // namespace qpid::messaging::amqp diff --git a/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h b/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h index 79049d9263..fb8ea0d336 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h +++ b/qpid/cpp/src/qpid/messaging/amqp/ReceiverContext.h @@ -46,6 +46,7 @@ class ReceiverContext public: ReceiverContext(pn_session_t* session, const std::string& name, const qpid::messaging::Address& source); ~ReceiverContext(); + void reset(pn_session_t* session); void setCapacity(uint32_t); uint32_t getCapacity(); uint32_t getAvailable(); @@ -56,7 +57,7 @@ class ReceiverContext const std::string& getSource() const; bool isClosed() const; void configure(); - void verify(pn_terminus_t*); + void verify(); Address getAddress() const; private: friend class ConnectionContext; diff --git a/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp index 1926afcb27..c3a9107333 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp +++ b/qpid/cpp/src/qpid/messaging/amqp/SenderContext.cpp @@ -82,6 +82,7 @@ const std::string& SenderContext::getTarget() const bool SenderContext::send(const qpid::messaging::Message& message, SenderContext::Delivery** out) { + resend();//if there are any messages needing to be resent at the front of the queue, send them first if (processUnsettled(false) < capacity && pn_link_credit(sender)) { if (unreliable) { Delivery delivery(nextId++); @@ -424,7 +425,12 @@ bool changedSubject(const qpid::messaging::MessageImpl& msg, const qpid::messagi } -SenderContext::Delivery::Delivery(int32_t i) : id(i), token(0) {} +SenderContext::Delivery::Delivery(int32_t i) : id(i), token(0), presettled(false) {} + +void SenderContext::Delivery::reset() +{ + token = 0; +} void SenderContext::Delivery::encode(const qpid::messaging::MessageImpl& msg, const qpid::messaging::Address& address) { @@ -490,13 +496,20 @@ void SenderContext::Delivery::send(pn_link_t* sender, bool unreliable) tag.bytes = reinterpret_cast<const char*>(&id); token = pn_delivery(sender, tag); pn_link_send(sender, encoded.getData(), encoded.getSize()); - if (unreliable) pn_delivery_settle(token); + if (unreliable) { + pn_delivery_settle(token); + presettled = true; + } pn_link_advance(sender); } +bool SenderContext::Delivery::sent() const +{ + return presettled || token; +} bool SenderContext::Delivery::delivered() { - if (pn_delivery_remote_state(token) || pn_delivery_settled(token)) { + if (presettled || (token && (pn_delivery_remote_state(token) || pn_delivery_settled(token)))) { //TODO: need a better means for signalling outcomes other than accepted if (rejected()) { QPID_LOG(warning, "delivery " << id << " was rejected by peer"); @@ -520,8 +533,19 @@ void SenderContext::Delivery::settle() { pn_delivery_settle(token); } -void SenderContext::verify(pn_terminus_t* target) +void SenderContext::verify() { + pn_terminus_t* target = pn_link_remote_target(sender); + if (!pn_terminus_get_address(target)) { + std::string msg("No such target : "); + msg += getTarget(); + QPID_LOG(debug, msg); + throw qpid::messaging::NotFound(msg); + } else if (AddressImpl::isTemporary(address)) { + address.setName(pn_terminus_get_address(target)); + QPID_LOG(debug, "Dynamic target name set to " << address.getName()); + } + helper.checkAssertion(target, AddressHelper::FOR_SENDER); } void SenderContext::configure() @@ -549,4 +573,22 @@ Address SenderContext::getAddress() const return address; } + +void SenderContext::reset(pn_session_t* session) +{ + sender = pn_sender(session, name.c_str()); + configure(); + + for (Deliveries::iterator i = deliveries.begin(); i != deliveries.end(); ++i) { + i->reset(); + } +} + +void SenderContext::resend() +{ + for (Deliveries::iterator i = deliveries.begin(); i != deliveries.end() && pn_link_credit(sender) && !i->sent(); ++i) { + i->send(sender, false/*only resend reliable transfers*/); + } +} + }}} // namespace qpid::messaging::amqp diff --git a/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h b/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h index fcdfbbcf96..27ffa1e81b 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h +++ b/qpid/cpp/src/qpid/messaging/amqp/SenderContext.h @@ -57,14 +57,18 @@ class SenderContext bool accepted(); bool rejected(); void settle(); + void reset(); + bool sent() const; private: int32_t id; pn_delivery_t* token; EncodedMessage encoded; + bool presettled; }; SenderContext(pn_session_t* session, const std::string& name, const qpid::messaging::Address& target); ~SenderContext(); + void reset(pn_session_t* session); void close(); void setCapacity(uint32_t); uint32_t getCapacity(); @@ -73,7 +77,7 @@ class SenderContext const std::string& getTarget() const; bool send(const qpid::messaging::Message& message, Delivery**); void configure(); - void verify(pn_terminus_t*); + void verify(); void check(); bool settled(); Address getAddress() const; @@ -92,6 +96,7 @@ class SenderContext uint32_t processUnsettled(bool silent); void configure(pn_terminus_t*); + void resend(); }; }}} // namespace qpid::messaging::amqp diff --git a/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp b/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp index 8170bc74b8..8f2a7d15d8 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp +++ b/qpid/cpp/src/qpid/messaging/amqp/SessionContext.cpp @@ -163,4 +163,15 @@ std::string SessionContext::getName() const return name; } +void SessionContext::reset(pn_connection_t* connection) +{ + session = pn_session(connection); + unacked.clear(); + for (SessionContext::SenderMap::iterator i = senders.begin(); i != senders.end(); ++i) { + i->second->reset(session); + } + for (SessionContext::ReceiverMap::iterator i = receivers.begin(); i != receivers.end(); ++i) { + i->second->reset(session); + } +} }}} // namespace qpid::messaging::amqp diff --git a/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h b/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h index 7a0e0fb23e..5d68f6d8de 100644 --- a/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h +++ b/qpid/cpp/src/qpid/messaging/amqp/SessionContext.h @@ -50,6 +50,7 @@ class SessionContext public: SessionContext(pn_connection_t*); ~SessionContext(); + void reset(pn_connection_t*); boost::shared_ptr<SenderContext> createSender(const qpid::messaging::Address& address); boost::shared_ptr<ReceiverContext> createReceiver(const qpid::messaging::Address& address); boost::shared_ptr<SenderContext> getSender(const std::string& name) const; diff --git a/qpid/cpp/src/tests/interlink_tests.py b/qpid/cpp/src/tests/interlink_tests.py index 0d2b757152..1724607533 100755 --- a/qpid/cpp/src/tests/interlink_tests.py +++ b/qpid/cpp/src/tests/interlink_tests.py @@ -22,6 +22,7 @@ import os, signal, sys, time, imp, re, subprocess, glob, random, logging, shutil import traceback from qpid.messaging import Message, SessionError, NotFound, ConnectionError, ReceiverError, Connection, Timeout, Disposition, REJECTED, Empty from brokertest import * +from ha_test import HaPort from threading import Thread, Lock, Condition from logging import getLogger, WARN, ERROR, DEBUG, INFO from qpidtoollibs import BrokerAgent, BrokerObject @@ -46,7 +47,8 @@ class AmqpBrokerTest(BrokerTest): def setUp(self): BrokerTest.setUp(self) os.putenv("QPID_LOAD_MODULE", BrokerTest.amqpc_lib) - self.broker = self.amqp_broker() + self.port_holder = HaPort(self) + self.broker = self.amqp_broker(port_holder=self.port_holder) self.default_config = Config(self.broker) self.agent = BrokerAgent(self.broker.connect()) @@ -252,14 +254,73 @@ class AmqpBrokerTest(BrokerTest): #send to q on broker B through brokerA self.send_and_receive(send_config=Config(self.broker, address="q@BrokerB"), recv_config=Config(brokerB)) + def test_reconnect(self): + receiver_cmd = ["qpid-receive", + "--broker", self.broker.host_port(), + "--address=amq.fanout", + "--connection-options={protocol:amqp1.0, reconnect:True,container_id:receiver}", + "--timeout=10", "--print-content=true", "--print-headers=false" + ] + receiver = self.popen(receiver_cmd, stdout=PIPE) + + sender_cmd = ["qpid-send", + "--broker", self.broker.host_port(), + "--address=amq.fanout", + "--connection-options={protocol:amqp1.0,reconnect:True,container_id:sender}", + "--content-stdin", "--send-eos=1" + ] + sender = self.popen(sender_cmd, stdin=PIPE) + sender._set_cloexec_flag(sender.stdin) #required for older python, see http://bugs.python.org/issue4112 + + + batch1 = ["message-%s" % (i+1) for i in range(10000)] + for m in batch1: + sender.stdin.write(m + "\n") + sender.stdin.flush() + + self.broker.kill() + self.broker = self.amqp_broker(port_holder=self.port_holder) + + batch2 = ["message-%s" % (i+1) for i in range(10000, 20000)] + for m in batch2: + sender.stdin.write(m + "\n") + sender.stdin.flush() + + sender.stdin.close() + + last = None + m = receiver.stdout.readline().rstrip() + while len(m): + last = m + m = receiver.stdout.readline().rstrip() + assert last == "message-20000", (last) + """ Create and return a broker with AMQP 1.0 support """ def amqp_broker(self): assert BrokerTest.amqp_lib, "Cannot locate AMQP 1.0 plug-in" + self.port_holder = HaPort(self) #reserve port args = ["--load-module", BrokerTest.amqp_lib, - "--max-negotiate-time=600000", + "--socket-fd=%s" % self.port_holder.fileno, + "--listen-disable=tcp", "--log-enable=trace+:Protocol", "--log-enable=info+"] - return BrokerTest.broker(self, args) + return BrokerTest.broker(self, args, port=self.port_holder.port) + + def amqp_broker(self, port_holder=None): + assert BrokerTest.amqp_lib, "Cannot locate AMQP 1.0 plug-in" + if port_holder: + args = ["--load-module", BrokerTest.amqp_lib, + "--socket-fd=%s" % port_holder.fileno, + "--listen-disable=tcp", + "--log-enable=trace+:Protocol", + "--log-enable=info+"] + return BrokerTest.broker(self, args, port=port_holder.port) + else: + args = ["--load-module", BrokerTest.amqp_lib, + "--log-enable=trace+:Protocol", + "--log-enable=info+"] + return BrokerTest.broker(self, args) + if __name__ == "__main__": shutil.rmtree("brokertest.tmp", True) |
