diff options
| author | Gordon Sim <gsim@apache.org> | 2014-10-29 22:24:41 +0000 |
|---|---|---|
| committer | Gordon Sim <gsim@apache.org> | 2014-10-29 22:24:41 +0000 |
| commit | 32dd24b009ff8adeaf47a77f5606eb51e07e0a44 (patch) | |
| tree | cebea96d6af636ab3ee3244a2e5cd2975c8b4654 /qpid/cpp/src | |
| parent | 01e36f9700b6c0e473b1d16ae7bd214805cdc035 (diff) | |
| download | qpid-python-32dd24b009ff8adeaf47a77f5606eb51e07e0a44.tar.gz | |
QPID-5538: simple heartbeat enablement over 1.0 for broker
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1635316 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
| -rw-r--r-- | qpid/cpp/src/qpid/broker/amqp/Connection.cpp | 49 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/amqp/Connection.h | 6 |
2 files changed, 51 insertions, 4 deletions
diff --git a/qpid/cpp/src/qpid/broker/amqp/Connection.cpp b/qpid/cpp/src/qpid/broker/amqp/Connection.cpp index 6042af3580..0a1adab517 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Connection.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/Connection.cpp @@ -29,6 +29,8 @@ #include "qpid/framing/ProtocolInitiation.h" #include "qpid/framing/ProtocolVersion.h" #include "qpid/log/Statement.h" +#include "qpid/sys/Time.h" +#include "qpid/sys/Timer.h" #include "qpid/sys/OutputControl.h" #include "config.h" #include <sstream> @@ -89,6 +91,28 @@ void Connection::trace(const char* message) const QPID_LOG_CAT(trace, protocol, "[" << id << "]: " << message); } +namespace { +struct ConnectionTickerTask : public qpid::sys::TimerTask +{ + qpid::sys::Timer& timer; + Connection& connection; + ConnectionTickerTask(uint64_t interval, qpid::sys::Timer& t, Connection& c) : + TimerTask(qpid::sys::Duration(interval*qpid::sys::TIME_MSEC), "ConnectionTicker"), + timer(t), + connection(c) + {} + + void fire() { + // Setup next firing + setupNextFire(); + timer.add(this); + + // Send Ticker + connection.requestIO(); + } +}; +} + Connection::Connection(qpid::sys::OutputControl& o, const std::string& i, BrokerContext& b, bool saslInUse, bool brokerInitiated) : BrokerContext(b), ManagedConnection(getBroker(), i, brokerInitiated), connection(pn_connection()), @@ -122,9 +146,14 @@ Connection::Connection(qpid::sys::OutputControl& o, const std::string& i, Broker } } +void Connection::requestIO() +{ + out.activateOutput(); +} Connection::~Connection() { + if (ticker) ticker->cancel(); getBroker().getConnectionObservers().closed(*this); pn_transport_free(transport); pn_connection_free(connection); @@ -161,7 +190,7 @@ size_t Connection::decode(const char* buffer, size_t size) pn_condition_set_description(error, e.what()); close(); } - pn_transport_tick(transport, 0); + pn_transport_tick(transport, qpid::sys::Duration::FromEpoch() / qpid::sys::TIME_MSEC); if (!haveOutput) { haveOutput = true; out.activateOutput(); @@ -245,8 +274,7 @@ bool Connection::canEncode() } else { QPID_LOG(info, "Connection " << id << " has been closed locally"); } - //TODO: proper handling of time in and out of tick - pn_transport_tick(transport, 0); + pn_transport_tick(transport, qpid::sys::Duration::FromEpoch() / qpid::sys::TIME_MSEC); QPID_LOG_CAT(trace, network, id << " canEncode(): " << haveOutput) return haveOutput; } @@ -256,6 +284,12 @@ void Connection::open() readPeerProperties(); pn_connection_set_container(connection, getBroker().getFederationTag().c_str()); + uint32_t timeout = pn_transport_get_remote_idle_timeout(transport); + if (timeout) { + ticker = boost::intrusive_ptr<qpid::sys::TimerTask>(new ConnectionTickerTask(timeout, getBroker().getTimer(), *this)); + pn_transport_set_idle_timeout(transport, timeout); + } + pn_connection_open(connection); out.connectionEstablished(); opened(); @@ -271,6 +305,7 @@ void Connection::readPeerProperties() void Connection::closed() { + if (ticker) ticker->cancel(); for (Sessions::iterator i = sessions.begin(); i != sessions.end(); ++i) { i->second->close(); } @@ -372,7 +407,12 @@ void Connection::process() pn_connection_close(connection); } } - +namespace { +std::string convert(pn_delivery_tag_t in) +{ + return std::string(in.bytes, in.size); +} +} void Connection::processDeliveries() { //handle deliveries @@ -382,6 +422,7 @@ void Connection::processDeliveries() if (pn_link_is_receiver(link)) { Sessions::iterator i = sessions.find(pn_link_session(link)); if (i != sessions.end()) { + QPID_LOG(notice, "Processing delivery with tag " << convert(pn_delivery_tag(delivery))); i->second->readable(link, delivery); } else { pn_delivery_update(delivery, PN_REJECTED); diff --git a/qpid/cpp/src/qpid/broker/amqp/Connection.h b/qpid/cpp/src/qpid/broker/amqp/Connection.h index ed9a3a26b7..17c5b0ecf0 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Connection.h +++ b/qpid/cpp/src/qpid/broker/amqp/Connection.h @@ -25,6 +25,7 @@ #include "qpid/broker/amqp/BrokerContext.h" #include "qpid/broker/amqp/ManagedConnection.h" #include <map> +#include <boost/intrusive_ptr.hpp> #include <boost/shared_ptr.hpp> struct pn_connection_t; @@ -32,6 +33,9 @@ struct pn_session_t; struct pn_transport_t; namespace qpid { +namespace sys { +class TimerTask; +} namespace broker { class Broker; @@ -60,6 +64,7 @@ class Connection : public BrokerContext, public sys::ConnectionCodec, public Man void setUserId(const std::string&); void abort(); void trace(const char*) const; + void requestIO(); protected: typedef std::map<pn_session_t*, boost::shared_ptr<Session> > Sessions; pn_connection_t* connection; @@ -70,6 +75,7 @@ class Connection : public BrokerContext, public sys::ConnectionCodec, public Man Sessions sessions; bool closeInitiated; bool closeRequested; + boost::intrusive_ptr<sys::TimerTask> ticker; virtual void process(); void doOutput(size_t); |
