summaryrefslogtreecommitdiff
path: root/qpid/cpp/src
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2014-10-29 22:24:41 +0000
committerGordon Sim <gsim@apache.org>2014-10-29 22:24:41 +0000
commit32dd24b009ff8adeaf47a77f5606eb51e07e0a44 (patch)
treecebea96d6af636ab3ee3244a2e5cd2975c8b4654 /qpid/cpp/src
parent01e36f9700b6c0e473b1d16ae7bd214805cdc035 (diff)
downloadqpid-python-32dd24b009ff8adeaf47a77f5606eb51e07e0a44.tar.gz
QPID-5538: simple heartbeat enablement over 1.0 for broker
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1635316 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/cpp/src')
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Connection.cpp49
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Connection.h6
2 files changed, 51 insertions, 4 deletions
diff --git a/qpid/cpp/src/qpid/broker/amqp/Connection.cpp b/qpid/cpp/src/qpid/broker/amqp/Connection.cpp
index 6042af3580..0a1adab517 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Connection.cpp
+++ b/qpid/cpp/src/qpid/broker/amqp/Connection.cpp
@@ -29,6 +29,8 @@
#include "qpid/framing/ProtocolInitiation.h"
#include "qpid/framing/ProtocolVersion.h"
#include "qpid/log/Statement.h"
+#include "qpid/sys/Time.h"
+#include "qpid/sys/Timer.h"
#include "qpid/sys/OutputControl.h"
#include "config.h"
#include <sstream>
@@ -89,6 +91,28 @@ void Connection::trace(const char* message) const
QPID_LOG_CAT(trace, protocol, "[" << id << "]: " << message);
}
+namespace {
+struct ConnectionTickerTask : public qpid::sys::TimerTask
+{
+ qpid::sys::Timer& timer;
+ Connection& connection;
+ ConnectionTickerTask(uint64_t interval, qpid::sys::Timer& t, Connection& c) :
+ TimerTask(qpid::sys::Duration(interval*qpid::sys::TIME_MSEC), "ConnectionTicker"),
+ timer(t),
+ connection(c)
+ {}
+
+ void fire() {
+ // Setup next firing
+ setupNextFire();
+ timer.add(this);
+
+ // Send Ticker
+ connection.requestIO();
+ }
+};
+}
+
Connection::Connection(qpid::sys::OutputControl& o, const std::string& i, BrokerContext& b, bool saslInUse, bool brokerInitiated)
: BrokerContext(b), ManagedConnection(getBroker(), i, brokerInitiated),
connection(pn_connection()),
@@ -122,9 +146,14 @@ Connection::Connection(qpid::sys::OutputControl& o, const std::string& i, Broker
}
}
+void Connection::requestIO()
+{
+ out.activateOutput();
+}
Connection::~Connection()
{
+ if (ticker) ticker->cancel();
getBroker().getConnectionObservers().closed(*this);
pn_transport_free(transport);
pn_connection_free(connection);
@@ -161,7 +190,7 @@ size_t Connection::decode(const char* buffer, size_t size)
pn_condition_set_description(error, e.what());
close();
}
- pn_transport_tick(transport, 0);
+ pn_transport_tick(transport, qpid::sys::Duration::FromEpoch() / qpid::sys::TIME_MSEC);
if (!haveOutput) {
haveOutput = true;
out.activateOutput();
@@ -245,8 +274,7 @@ bool Connection::canEncode()
} else {
QPID_LOG(info, "Connection " << id << " has been closed locally");
}
- //TODO: proper handling of time in and out of tick
- pn_transport_tick(transport, 0);
+ pn_transport_tick(transport, qpid::sys::Duration::FromEpoch() / qpid::sys::TIME_MSEC);
QPID_LOG_CAT(trace, network, id << " canEncode(): " << haveOutput)
return haveOutput;
}
@@ -256,6 +284,12 @@ void Connection::open()
readPeerProperties();
pn_connection_set_container(connection, getBroker().getFederationTag().c_str());
+ uint32_t timeout = pn_transport_get_remote_idle_timeout(transport);
+ if (timeout) {
+ ticker = boost::intrusive_ptr<qpid::sys::TimerTask>(new ConnectionTickerTask(timeout, getBroker().getTimer(), *this));
+ pn_transport_set_idle_timeout(transport, timeout);
+ }
+
pn_connection_open(connection);
out.connectionEstablished();
opened();
@@ -271,6 +305,7 @@ void Connection::readPeerProperties()
void Connection::closed()
{
+ if (ticker) ticker->cancel();
for (Sessions::iterator i = sessions.begin(); i != sessions.end(); ++i) {
i->second->close();
}
@@ -372,7 +407,12 @@ void Connection::process()
pn_connection_close(connection);
}
}
-
+namespace {
+std::string convert(pn_delivery_tag_t in)
+{
+ return std::string(in.bytes, in.size);
+}
+}
void Connection::processDeliveries()
{
//handle deliveries
@@ -382,6 +422,7 @@ void Connection::processDeliveries()
if (pn_link_is_receiver(link)) {
Sessions::iterator i = sessions.find(pn_link_session(link));
if (i != sessions.end()) {
+ QPID_LOG(notice, "Processing delivery with tag " << convert(pn_delivery_tag(delivery)));
i->second->readable(link, delivery);
} else {
pn_delivery_update(delivery, PN_REJECTED);
diff --git a/qpid/cpp/src/qpid/broker/amqp/Connection.h b/qpid/cpp/src/qpid/broker/amqp/Connection.h
index ed9a3a26b7..17c5b0ecf0 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Connection.h
+++ b/qpid/cpp/src/qpid/broker/amqp/Connection.h
@@ -25,6 +25,7 @@
#include "qpid/broker/amqp/BrokerContext.h"
#include "qpid/broker/amqp/ManagedConnection.h"
#include <map>
+#include <boost/intrusive_ptr.hpp>
#include <boost/shared_ptr.hpp>
struct pn_connection_t;
@@ -32,6 +33,9 @@ struct pn_session_t;
struct pn_transport_t;
namespace qpid {
+namespace sys {
+class TimerTask;
+}
namespace broker {
class Broker;
@@ -60,6 +64,7 @@ class Connection : public BrokerContext, public sys::ConnectionCodec, public Man
void setUserId(const std::string&);
void abort();
void trace(const char*) const;
+ void requestIO();
protected:
typedef std::map<pn_session_t*, boost::shared_ptr<Session> > Sessions;
pn_connection_t* connection;
@@ -70,6 +75,7 @@ class Connection : public BrokerContext, public sys::ConnectionCodec, public Man
Sessions sessions;
bool closeInitiated;
bool closeRequested;
+ boost::intrusive_ptr<sys::TimerTask> ticker;
virtual void process();
void doOutput(size_t);