diff options
| -rw-r--r-- | qpid/cpp/include/qpid/sys/Time.h | 1 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/Broker.cpp | 16 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/Broker.h | 9 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/ConnectionHandler.cpp | 5 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/Link.cpp | 3 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/Backup.cpp | 2 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/Primary.cpp | 3 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/Settings.h | 5 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/StatusCheck.cpp | 20 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/ha/StatusCheck.h | 5 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/sys/posix/Time.cpp | 35 | ||||
| -rw-r--r-- | qpid/cpp/src/tests/TimerTest.cpp | 44 |
12 files changed, 106 insertions, 42 deletions
diff --git a/qpid/cpp/include/qpid/sys/Time.h b/qpid/cpp/include/qpid/sys/Time.h index 9c5ac66e9a..8e99356409 100644 --- a/qpid/cpp/include/qpid/sys/Time.h +++ b/qpid/cpp/include/qpid/sys/Time.h @@ -125,6 +125,7 @@ public: }; std::ostream& operator << (std::ostream&, const Duration&); +std::istream& operator >> (std::istream&, Duration&); inline AbsTime now() { return AbsTime::now(); } diff --git a/qpid/cpp/src/qpid/broker/Broker.cpp b/qpid/cpp/src/qpid/broker/Broker.cpp index bdc49e669d..e059ffba2b 100644 --- a/qpid/cpp/src/qpid/broker/Broker.cpp +++ b/qpid/cpp/src/qpid/broker/Broker.cpp @@ -126,8 +126,8 @@ Broker::Options::Options(const std::string& name) : connectionBacklog(10), enableMgmt(1), mgmtPublish(1), - mgmtPubInterval(10), - queueCleanInterval(60*10),//10 minutes + mgmtPubInterval(10*sys::TIME_SEC), + queueCleanInterval(60*sys::TIME_SEC*10),//10 minutes auth(SaslAuthenticator::available()), realm("QPID"), replayFlushLimit(0), @@ -143,8 +143,8 @@ Broker::Options::Options(const std::string& name) : queueThresholdEventRatio(80), defaultMsgGroup("qpid.no-group"), timestampRcvMsgs(false), // set the 0.10 timestamp delivery property - linkMaintenanceInterval(2), - linkHeartbeatInterval(120), + linkMaintenanceInterval(2*sys::TIME_SEC), + linkHeartbeatInterval(120*sys::TIME_SEC), maxNegotiateTime(10000) // 10s { int c = sys::SystemInfo::concurrency(); @@ -168,8 +168,6 @@ Broker::Options::Options(const std::string& name) : ("mgmt-publish", optValue(mgmtPublish,"yes|no"), "Enable Publish of Management Data ('no' implies query-only)") ("mgmt-qmf2", optValue(qmf2Support,"yes|no"), "Enable broadcast of management information over QMF v2") ("mgmt-qmf1", optValue(qmf1Support,"yes|no"), "Enable broadcast of management information over QMF v1") - // FIXME aconway 2012-02-13: consistent treatment of values in SECONDS - // allow sub-second intervals. ("mgmt-pub-interval", optValue(mgmtPubInterval, "SECONDS"), "Management Publish Interval") ("queue-purge-interval", optValue(queueCleanInterval, "SECONDS"), "Interval between attempts to purge any expired messages from queues") @@ -232,7 +230,7 @@ Broker::Broker(const Broker::Options& conf) : if (conf.enableMgmt) { QPID_LOG(info, "Management enabled"); managementAgent->configure(dataDir.isEnabled() ? dataDir.getPath() : string(), conf.mgmtPublish, - conf.mgmtPubInterval, this, conf.workerThreads + 3); + conf.mgmtPubInterval/sys::TIME_SEC, this, conf.workerThreads + 3); managementAgent->setName("apache.org", "qpidd"); _qmf::Package packageInitializer(managementAgent.get()); @@ -244,7 +242,7 @@ Broker::Broker(const Broker::Options& conf) : mgmtObject->set_port(conf.port); mgmtObject->set_workerThreads(conf.workerThreads); mgmtObject->set_connBacklog(conf.connectionBacklog); - mgmtObject->set_mgmtPubInterval(conf.mgmtPubInterval); + mgmtObject->set_mgmtPubInterval(conf.mgmtPubInterval/sys::TIME_SEC); mgmtObject->set_mgmtPublish(conf.mgmtPublish); mgmtObject->set_version(qpid::version); if (dataDir.isEnabled()) @@ -356,7 +354,7 @@ Broker::Broker(const Broker::Options& conf) : } if (conf.queueCleanInterval) { - queueCleaner.start(conf.queueCleanInterval * qpid::sys::TIME_SEC); + queueCleaner.start(conf.queueCleanInterval); } if (!conf.knownHosts.empty() && conf.knownHosts != knownHostsNone) { diff --git a/qpid/cpp/src/qpid/broker/Broker.h b/qpid/cpp/src/qpid/broker/Broker.h index 6a46095af4..0ac5fc412e 100644 --- a/qpid/cpp/src/qpid/broker/Broker.h +++ b/qpid/cpp/src/qpid/broker/Broker.h @@ -98,8 +98,8 @@ class Broker : public sys::Runnable, public Plugin::Target, int connectionBacklog; bool enableMgmt; bool mgmtPublish; - uint16_t mgmtPubInterval; - uint16_t queueCleanInterval; + sys::Duration mgmtPubInterval; + sys::Duration queueCleanInterval; bool auth; std::string realm; size_t replayFlushLimit; @@ -116,8 +116,8 @@ class Broker : public sys::Runnable, public Plugin::Target, uint16_t queueThresholdEventRatio; std::string defaultMsgGroup; bool timestampRcvMsgs; - double linkMaintenanceInterval; // FIXME aconway 2012-02-13: consistent parsing of SECONDS values. - uint16_t linkHeartbeatInterval; + sys::Duration linkMaintenanceInterval; + sys::Duration linkHeartbeatInterval; uint32_t maxNegotiateTime; // Max time in ms for connection with no negotiation std::string fedTag; @@ -350,7 +350,6 @@ class Broker : public sys::Runnable, public Plugin::Target, QPID_BROKER_EXTERN framing::FieldTable getLinkClientProperties() const; QPID_BROKER_EXTERN void setLinkClientProperties(const framing::FieldTable&); - QPID_BROKER_EXTERN uint16_t getLinkHearbeatInterval() { return config.linkHeartbeatInterval; } /** Information identifying this system */ boost::shared_ptr<const System> getSystem() const { return systemObject; } friend class StatusCheckThread; diff --git a/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp b/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp index 977c706ebd..ac05178fce 100644 --- a/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp +++ b/qpid/cpp/src/qpid/broker/ConnectionHandler.cpp @@ -34,6 +34,7 @@ #include "qpid/log/Statement.h" #include "qpid/management/ManagementAgent.h" #include "qpid/sys/SecurityLayer.h" +#include "qpid/sys/Time.h" #include "qpid/broker/AclModule.h" #include "qpid/amqp_0_10/Codecs.h" #include "qmf/org/apache/qpid/broker/EventClientConnectFail.h" @@ -401,7 +402,9 @@ void ConnectionHandler::Handler::tune(uint16_t channelMax, // this method is only ever called when this Connection // is a federation link where this Broker is acting as // a client to another Broker - uint16_t hb = std::min(connection.getBroker().getOptions().linkHeartbeatInterval, heartbeatMax); + sys::Duration interval = connection.getBroker().getOptions().linkHeartbeatInterval; + uint16_t intervalSec = static_cast<uint16_t>(interval/sys::TIME_SEC); + uint16_t hb = std::min(intervalSec, heartbeatMax); connection.setHeartbeat(hb); connection.startLinkHeartbeatTimeoutTask(); diff --git a/qpid/cpp/src/qpid/broker/Link.cpp b/qpid/cpp/src/qpid/broker/Link.cpp index 3380708c0e..c6ac6832c0 100644 --- a/qpid/cpp/src/qpid/broker/Link.cpp +++ b/qpid/cpp/src/qpid/broker/Link.cpp @@ -61,8 +61,7 @@ namespace { struct LinkTimerTask : public sys::TimerTask { LinkTimerTask(Link& l, sys::Timer& t) - : TimerTask(int64_t(l.getBroker()->getOptions().linkMaintenanceInterval* - sys::TIME_SEC), + : TimerTask(l.getBroker()->getOptions().linkMaintenanceInterval, "Link retry timer"), link(l), timer(t) {} diff --git a/qpid/cpp/src/qpid/ha/Backup.cpp b/qpid/cpp/src/qpid/ha/Backup.cpp index 2affc12bf6..6d64bf2c82 100644 --- a/qpid/cpp/src/qpid/ha/Backup.cpp +++ b/qpid/cpp/src/qpid/ha/Backup.cpp @@ -54,7 +54,7 @@ Backup::Backup(HaBroker& hb, const Settings& s) : haBroker(hb), broker(hb.getBroker()), settings(s), statusCheck( new StatusCheck( - logPrefix, broker.getLinkHearbeatInterval(), hb.getBrokerInfo())) + logPrefix, broker.getOptions().linkHeartbeatInterval, hb.getBrokerInfo())) { // Set link properties to tag outgoing links. framing::FieldTable linkProperties = broker.getLinkClientProperties(); diff --git a/qpid/cpp/src/qpid/ha/Primary.cpp b/qpid/cpp/src/qpid/ha/Primary.cpp index 93dbbbea85..61cdced1ba 100644 --- a/qpid/cpp/src/qpid/ha/Primary.cpp +++ b/qpid/cpp/src/qpid/ha/Primary.cpp @@ -104,8 +104,7 @@ Primary::Primary(HaBroker& hb, const BrokerInfo::Set& expect) : backup->setCatchupQueues(hb.getBroker().getQueues(), true); // Create guards } // Set timeout for expected brokers to connect and become ready. - sys::Duration timeout(int64_t(hb.getSettings().backupTimeout*sys::TIME_SEC)); - sys::AbsTime deadline(sys::now(), timeout); + sys::AbsTime deadline(sys::now(), hb.getSettings().backupTimeout); timerTask = new ExpectedBackupTimerTask(*this, deadline); hb.getBroker().getTimer().add(timerTask); } diff --git a/qpid/cpp/src/qpid/ha/Settings.h b/qpid/cpp/src/qpid/ha/Settings.h index 53b61415cf..94c157bccd 100644 --- a/qpid/cpp/src/qpid/ha/Settings.h +++ b/qpid/cpp/src/qpid/ha/Settings.h @@ -23,6 +23,7 @@ */ #include "types.h" +#include "qpid/sys/Time.h" #include "qpid/sys/IntegerTypes.h" #include <string> @@ -36,7 +37,7 @@ class Settings { public: Settings() : cluster(false), queueReplication(false), - replicateDefault(NONE), backupTimeout(5), + replicateDefault(NONE), backupTimeout(5*sys::TIME_SEC), flowMessages(100), flowBytes(0) {} @@ -46,7 +47,7 @@ class Settings std::string brokerUrl; Enum<ReplicateLevel> replicateDefault; std::string username, password, mechanism; - double backupTimeout; + sys::Duration backupTimeout; uint32_t flowMessages, flowBytes; diff --git a/qpid/cpp/src/qpid/ha/StatusCheck.cpp b/qpid/cpp/src/qpid/ha/StatusCheck.cpp index 17613ce3dd..2921b9ec55 100644 --- a/qpid/cpp/src/qpid/ha/StatusCheck.cpp +++ b/qpid/cpp/src/qpid/ha/StatusCheck.cpp @@ -46,21 +46,21 @@ class StatusCheckThread : public sys::Runnable { private: Url url; StatusCheck& statusCheck; - uint16_t linkHeartbeatInterval; + sys::Duration linkHeartbeatInterval; BrokerInfo brokerInfo; }; void StatusCheckThread::run() { QPID_LOG(debug, statusCheck.logPrefix << "Checking status of " << url); - Variant::Map options, clientProperties; - clientProperties = brokerInfo.asMap(); // Detect self connections. - clientProperties["qpid.ha-admin"] = 1; // Allow connection to backups. + try { + Variant::Map options, clientProperties; + clientProperties = brokerInfo.asMap(); // Detect self connections. + clientProperties["qpid.ha-admin"] = 1; // Allow connection to backups. - options["client-properties"] = clientProperties; - options["heartbeat"] = statusCheck.linkHeartbeatInterval; - Connection c(url.str(), options); + options["client-properties"] = clientProperties; + options["heartbeat"] = statusCheck.linkHeartbeatInterval/sys::TIME_SEC; + Connection c(url.str(), options); - try { c.open(); Session session = c.createSession(); messaging::Address responses("#;{create:always,node:{x-declare:{exclusive:True,auto-delete:True,arguments:{'qpid.replicate':none}}}}"); @@ -78,7 +78,7 @@ void StatusCheckThread::run() { content["_object_id"] = oid; encode(content, request); s.send(request); - Message response = r.fetch(statusCheck.linkHeartbeatInterval*Duration::SECOND); + Message response = r.fetch(messaging::Duration(linkHeartbeatInterval/TIME_MSEC)); session.acknowledge(); Variant::List contentIn; decode(response, contentIn); @@ -98,7 +98,7 @@ void StatusCheckThread::run() { delete this; } -StatusCheck::StatusCheck(const string& lp, uint16_t lh, const BrokerInfo& self) +StatusCheck::StatusCheck(const string& lp, sys::Duration lh, const BrokerInfo& self) : logPrefix(lp), promote(true), linkHeartbeatInterval(lh), brokerInfo(self) {} diff --git a/qpid/cpp/src/qpid/ha/StatusCheck.h b/qpid/cpp/src/qpid/ha/StatusCheck.h index 997ced4159..924018c50e 100644 --- a/qpid/cpp/src/qpid/ha/StatusCheck.h +++ b/qpid/cpp/src/qpid/ha/StatusCheck.h @@ -27,6 +27,7 @@ #include "qpid/sys/Thread.h" #include "qpid/sys/Mutex.h" #include "qpid/sys/Runnable.h" +#include "qpid/sys/Time.h" #include <vector> namespace qpid { @@ -50,7 +51,7 @@ namespace ha { class StatusCheck { public: - StatusCheck(const std::string& logPrefix, uint16_t linkHeartbeatInteval, const BrokerInfo& self); + StatusCheck(const std::string& logPrefix, sys::Duration linkHeartbeatInterval, const BrokerInfo& self); ~StatusCheck(); void setUrl(const Url&); bool canPromote(); @@ -62,7 +63,7 @@ class StatusCheck sys::Mutex lock; std::vector<sys::Thread> threads; bool promote; - uint16_t linkHeartbeatInterval; + sys::Duration linkHeartbeatInterval; BrokerInfo brokerInfo; friend class StatusCheckThread; }; diff --git a/qpid/cpp/src/qpid/sys/posix/Time.cpp b/qpid/cpp/src/qpid/sys/posix/Time.cpp index 272c6c21a5..10c730af73 100644 --- a/qpid/cpp/src/qpid/sys/posix/Time.cpp +++ b/qpid/cpp/src/qpid/sys/posix/Time.cpp @@ -23,11 +23,13 @@ #include "qpid/sys/Time.h" #include <ostream> +#include <istream> #include <time.h> #include <stdio.h> #include <sys/time.h> #include <unistd.h> #include <iomanip> +#include <cctype> namespace { int64_t max_abstime() { return std::numeric_limits<int64_t>::max(); } @@ -73,19 +75,36 @@ struct timespec& toTimespec(struct timespec& ts, const Duration& t) { return ts; } -struct timeval& toTimeval(struct timeval& tv, const Duration& t) { - Duration secs = t / TIME_SEC; - tv.tv_sec = (secs > TIME_T_MAX) ? TIME_T_MAX : static_cast<time_t>(secs); - tv.tv_usec = static_cast<suseconds_t>((t%TIME_SEC)/TIME_USEC); - return tv; -} - Duration toTime(const struct timespec& ts) { return ts.tv_sec*TIME_SEC + ts.tv_nsec; } std::ostream& operator<<(std::ostream& o, const Duration& d) { - return o << int64_t(d) << "ns"; + if (d >= TIME_SEC) return o << (double(d)/TIME_SEC) << "s"; + if (d >= TIME_MSEC) return o << (double(d)/TIME_MSEC) << "ms"; + if (d >= TIME_USEC) return o << (double(d)/TIME_USEC) << "us"; + return o << int64_t(d) << "ns"; +} + +std::istream& operator>>(std::istream& i, Duration& d) { + // Don't throw, let the istream throw if it's configured to do so. + double number; + i >> number; + if (i.fail()) return i; + + if (i.eof() || std::isspace(i.peek())) // No suffix + d = number*TIME_SEC; + else { + std::string suffix; + i >> suffix; + if (i.fail()) return i; + if (suffix.compare("s") == 0) d = number*TIME_SEC; + else if (suffix.compare("ms") == 0) d = number*TIME_MSEC; + else if (suffix.compare("us") == 0) d = number*TIME_USEC; + else if (suffix.compare("ns") == 0) d = number*TIME_NSEC; + else i.setstate(std::ios::failbit); + } + return i; } namespace { diff --git a/qpid/cpp/src/tests/TimerTest.cpp b/qpid/cpp/src/tests/TimerTest.cpp index fc5004dcb0..4a1a050390 100644 --- a/qpid/cpp/src/tests/TimerTest.cpp +++ b/qpid/cpp/src/tests/TimerTest.cpp @@ -21,6 +21,7 @@ */ #include "qpid/sys/Timer.h" #include "qpid/sys/Monitor.h" +#include "qpid/Options.h" #include "unit_test.h" #include <math.h> #include <iostream> @@ -127,6 +128,49 @@ QPID_AUTO_TEST_CASE(testGeneral) dynamic_pointer_cast<TestTask>(task4)->check(2); } +std::string toString(Duration d) { return boost::lexical_cast<std::string>(d); } +Duration fromString(const std::string& str) { return boost::lexical_cast<Duration>(str); } + +QPID_AUTO_TEST_CASE(testOstreamInOut) { + std::string empty; + BOOST_CHECK_EQUAL(toString(Duration(TIME_SEC)), "1s"); + BOOST_CHECK_EQUAL(toString(Duration(TIME_SEC*123.4)), "123.4s"); + BOOST_CHECK_EQUAL(toString(Duration(TIME_MSEC*123.4)), "123.4ms"); + BOOST_CHECK_EQUAL(toString(Duration(TIME_USEC*123.4)), "123.4us"); + BOOST_CHECK_EQUAL(toString(Duration(TIME_NSEC*123)), "123ns"); + + BOOST_CHECK_EQUAL(fromString("123.4"), Duration(TIME_SEC*123.4)); + BOOST_CHECK_EQUAL(fromString("123.4s"), Duration(TIME_SEC*123.4)); + BOOST_CHECK_EQUAL(fromString("123ms"), Duration(TIME_MSEC*123)); + BOOST_CHECK_EQUAL(fromString("123us"), Duration(TIME_USEC*123)); + BOOST_CHECK_EQUAL(fromString("123ns"), Duration(TIME_NSEC*123)); + + Duration d = 0; + std::istringstream i; + std::string s; + + i.str("123x"); + i >> d; + BOOST_CHECK(i.fail()); + BOOST_CHECK_EQUAL(d, 0); + BOOST_CHECK_EQUAL(i.str(), "123x"); + + i.str("xxx"); + i >> d; + BOOST_CHECK(i.fail()); + BOOST_CHECK_EQUAL(d, 0); + BOOST_CHECK_EQUAL(i.str(), "xxx"); +} + +QPID_AUTO_TEST_CASE(testOptionParse) { + Options opts; + Duration interval; + opts.addOptions()("interval", optValue(interval, "I"), "blah"); + const char *args[] = { "fakeexe", "--interval", "123.4" }; + opts.parse(sizeof(args)/sizeof(args[0]), args); + BOOST_CHECK_EQUAL(interval, Duration(TIME_SEC * 123.4)); +} + QPID_AUTO_TEST_SUITE_END() }} // namespace qpid::tests |
