diff options
author | Andrew Stitcher <astitcher@apache.org> | 2014-07-01 16:55:43 +0000 |
---|---|---|
committer | Andrew Stitcher <astitcher@apache.org> | 2014-07-01 16:55:43 +0000 |
commit | b2bd2715e9b15271fb051e594f8bd141cf4b073a (patch) | |
tree | 005d33c3a0292af64d0c528874a11e8612a05f6d | |
parent | ea3eadaf0f332988da797cc66d6534dc2ee4ee1e (diff) | |
download | qpid-python-b2bd2715e9b15271fb051e594f8bd141cf4b073a.tar.gz |
QPID-5865: Be more robust in face of system clock being changed:
- Separate Wall clock time uses from other time
* (assumed that any time with respect to the epoch is wallclock)
- For Posix use CLOCK_MONOTONIC for all non wall clock purposes
so that changing system time doesn't affect internal timekeeping
- For Windows kept the same time keeping scheme.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1607140 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | qpid/cpp/src/CMakeLists.txt | 1 | ||||
-rw-r--r-- | qpid/cpp/src/qmf/AgentSession.cpp | 12 | ||||
-rw-r--r-- | qpid/cpp/src/qmf/ConsoleSession.cpp | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/acl/Acl.cpp | 8 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/PagedQueue.cpp | 7 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/Selector.cpp | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/broker/SessionManager.cpp | 4 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/management/ManagementAgent.cpp | 10 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/management/ManagementObject.cpp | 12 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/sys/Time.h | 19 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/sys/posix/Condition.cpp | 45 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/sys/posix/Condition.h | 28 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/sys/posix/PrivatePosix.h | 2 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/sys/posix/Time.cpp | 17 | ||||
-rw-r--r-- | qpid/cpp/src/qpid/sys/windows/Time.cpp | 13 | ||||
-rw-r--r-- | qpid/cpp/src/tests/Statistics.cpp | 2 | ||||
-rw-r--r-- | qpid/cpp/src/tests/echotest.cpp | 3 | ||||
-rw-r--r-- | qpid/cpp/src/tests/qpid-latency-test.cpp | 5 | ||||
-rw-r--r-- | qpid/cpp/src/tests/qpid-send.cpp | 2 |
19 files changed, 129 insertions, 65 deletions
diff --git a/qpid/cpp/src/CMakeLists.txt b/qpid/cpp/src/CMakeLists.txt index 783b26cc5e..f003a34ff7 100644 --- a/qpid/cpp/src/CMakeLists.txt +++ b/qpid/cpp/src/CMakeLists.txt @@ -687,6 +687,7 @@ else (CMAKE_SYSTEM_NAME STREQUAL Windows) set (qpidcommon_platform_SOURCES qpid/sys/posix/AsynchIO.cpp + qpid/sys/posix/Condition.cpp qpid/sys/posix/Fork.cpp qpid/sys/posix/Path.cpp qpid/sys/posix/FileSysDir.cpp diff --git a/qpid/cpp/src/qmf/AgentSession.cpp b/qpid/cpp/src/qmf/AgentSession.cpp index 1afdc14021..4605285448 100644 --- a/qpid/cpp/src/qmf/AgentSession.cpp +++ b/qpid/cpp/src/qmf/AgentSession.cpp @@ -76,7 +76,7 @@ AgentSessionImpl::AgentSessionImpl(Connection& c, const string& options) : externalStorage(false), autoAllowQueries(true), autoAllowMethods(true), maxSubscriptions(64), minSubInterval(3000), subLifetime(300), publicEvents(true), listenOnDirect(true), strictSecurity(false), maxThreadWaitTime(5), - schemaUpdateTime(uint64_t(qpid::sys::Duration(qpid::sys::EPOCH, qpid::sys::now()))) + schemaUpdateTime(uint64_t(qpid::sys::Duration::FromEpoch())) { // // Set Agent Capability Level @@ -288,7 +288,7 @@ void AgentSessionImpl::registerSchema(Schema& schema) // // Get the news out at the next periodic interval that there is new schema information. // - schemaUpdateTime = uint64_t(qpid::sys::Duration(qpid::sys::EPOCH, qpid::sys::now())); + schemaUpdateTime = uint64_t(qpid::sys::Duration::FromEpoch()); forceHeartbeat = true; } @@ -509,7 +509,7 @@ void AgentSessionImpl::raiseEvent(const Data& data, int severity) Variant::List list; Variant::Map dataAsMap(DataImplAccess::get(data).asMap()); dataAsMap["_severity"] = severity; - dataAsMap["_timestamp"] = uint64_t(qpid::sys::Duration(qpid::sys::EPOCH, qpid::sys::now())); + dataAsMap["_timestamp"] = uint64_t(qpid::sys::Duration::FromEpoch()); list.push_back(dataAsMap); encode(list, msg); topicSender.send(msg); @@ -591,7 +591,7 @@ void AgentSessionImpl::handleLocateRequest(const Variant::List& predicate, const headers[protocol::HEADER_KEY_APP_ID] = protocol::HEADER_APP_ID_QMF; map["_values"] = attributes; - map["_values"].asMap()[protocol::AGENT_ATTR_TIMESTAMP] = uint64_t(qpid::sys::Duration(qpid::sys::EPOCH, qpid::sys::now())); + map["_values"].asMap()[protocol::AGENT_ATTR_TIMESTAMP] = uint64_t(qpid::sys::Duration::FromEpoch()); map["_values"].asMap()[protocol::AGENT_ATTR_HEARTBEAT_INTERVAL] = interval; map["_values"].asMap()[protocol::AGENT_ATTR_EPOCH] = bootSequence; map["_values"].asMap()[protocol::AGENT_ATTR_SCHEMA_UPDATED_TIMESTAMP] = schemaUpdateTime; @@ -883,7 +883,7 @@ void AgentSessionImpl::sendHeartbeat() msg.setSubject(address.str()); map["_values"] = attributes; - map["_values"].asMap()[protocol::AGENT_ATTR_TIMESTAMP] = uint64_t(qpid::sys::Duration(qpid::sys::EPOCH, qpid::sys::now())); + map["_values"].asMap()[protocol::AGENT_ATTR_TIMESTAMP] = uint64_t(qpid::sys::Duration::FromEpoch()); map["_values"].asMap()[protocol::AGENT_ATTR_HEARTBEAT_INTERVAL] = interval; map["_values"].asMap()[protocol::AGENT_ATTR_EPOCH] = bootSequence; map["_values"].asMap()[protocol::AGENT_ATTR_SCHEMA_UPDATED_TIMESTAMP] = schemaUpdateTime; @@ -992,7 +992,7 @@ void AgentSessionImpl::run() try { while (!threadCanceled) { - periodicProcessing((uint64_t) qpid::sys::Duration(qpid::sys::EPOCH, qpid::sys::now()) / qpid::sys::TIME_SEC); + periodicProcessing((uint64_t) qpid::sys::Duration::FromEpoch() / qpid::sys::TIME_SEC); Receiver rx; bool valid = session.nextReceiver(rx, Duration::SECOND * maxThreadWaitTime); diff --git a/qpid/cpp/src/qmf/ConsoleSession.cpp b/qpid/cpp/src/qmf/ConsoleSession.cpp index 2dfc894c58..c74d4de8db 100644 --- a/qpid/cpp/src/qmf/ConsoleSession.cpp +++ b/qpid/cpp/src/qmf/ConsoleSession.cpp @@ -645,7 +645,7 @@ void ConsoleSessionImpl::run() try { while (!threadCanceled) { - periodicProcessing((uint64_t) qpid::sys::Duration(qpid::sys::EPOCH, qpid::sys::now()) / + periodicProcessing((uint64_t) qpid::sys::Duration::FromEpoch() / qpid::sys::TIME_SEC); Receiver rx; diff --git a/qpid/cpp/src/qpid/acl/Acl.cpp b/qpid/cpp/src/qpid/acl/Acl.cpp index 9117894d70..8a71cf7782 100644 --- a/qpid/cpp/src/qpid/acl/Acl.cpp +++ b/qpid/cpp/src/qpid/acl/Acl.cpp @@ -280,9 +280,7 @@ bool Acl::readAclFile(std::string& aclFile, std::string& errorText) { if (mgmtObject!=0){ mgmtObject->set_transferAcl(transferAcl?1:0); mgmtObject->set_policyFile(aclFile); - sys::AbsTime now = sys::AbsTime::now(); - int64_t ns = sys::Duration(sys::EPOCH, now); - mgmtObject->set_lastAclLoad(ns); + mgmtObject->set_lastAclLoad(Duration::FromEpoch()); agent->raiseEvent(_qmf::EventFileLoaded("")); } return true; @@ -305,9 +303,7 @@ void Acl::loadEmptyAclRuleset() { if (mgmtObject!=0){ mgmtObject->set_transferAcl(transferAcl?1:0); mgmtObject->set_policyFile(""); - sys::AbsTime now = sys::AbsTime::now(); - int64_t ns = sys::Duration(sys::EPOCH, now); - mgmtObject->set_lastAclLoad(ns); + mgmtObject->set_lastAclLoad(Duration::FromEpoch()); agent->raiseEvent(_qmf::EventFileLoaded("")); } } diff --git a/qpid/cpp/src/qpid/broker/PagedQueue.cpp b/qpid/cpp/src/qpid/broker/PagedQueue.cpp index 521bd542f7..b5edfb89c0 100644 --- a/qpid/cpp/src/qpid/broker/PagedQueue.cpp +++ b/qpid/cpp/src/qpid/broker/PagedQueue.cpp @@ -32,7 +32,7 @@ namespace broker { namespace { using qpid::sys::AbsTime; using qpid::sys::Duration; -using qpid::sys::EPOCH; +using qpid::sys::ZERO; using qpid::sys::FAR_FUTURE; using qpid::sys::MemoryMappedFile; const uint32_t OVERHEAD(4/*content-size*/ + 4/*sequence-number*/ + 8/*persistence-id*/ + 8/*expiration*/); @@ -54,7 +54,8 @@ size_t encode(const Message& msg, char* data, size_t size) sys::AbsTime expiration = msg.getExpiration(); int64_t t(0); if (expiration < FAR_FUTURE) { - t = Duration(EPOCH, expiration); + // Just need an integer that will round trip + t = Duration(ZERO, expiration); } buffer.putLongLong(t); msg.getPersistentContext()->encode(buffer); @@ -76,7 +77,7 @@ size_t decode(ProtocolRegistry& protocols, Message& msg, const char* data, size_ msg.setSequence(qpid::framing::SequenceNumber(sequence)); msg.getPersistentContext()->setPersistenceId(persistenceId); if (t) { - sys::AbsTime expiration(EPOCH, t); + sys::AbsTime expiration(ZERO, t); msg.getSharedState().setExpiration(expiration); } return encoded + metadata.getPosition(); diff --git a/qpid/cpp/src/qpid/broker/Selector.cpp b/qpid/cpp/src/qpid/broker/Selector.cpp index b9888b4cd4..5bee0b5714 100644 --- a/qpid/cpp/src/qpid/broker/Selector.cpp +++ b/qpid/cpp/src/qpid/broker/Selector.cpp @@ -117,7 +117,7 @@ const Value MessageSelectorEnv::specialValue(const string& id) const qpid::sys::AbsTime expiry = msg.getExpiration(); // Java property has value of 0 for no expiry v = (expiry==qpid::sys::FAR_FUTURE) ? 0 - : qpid::sys::Duration(qpid::sys::AbsTime::Epoch(), expiry) / qpid::sys::TIME_MSEC; + : qpid::sys::Duration(qpid::sys::AbsTime::epoch(), expiry) / qpid::sys::TIME_MSEC; } else if ( id=="creation_time" ) { // Use the time put on queue (if it is enabled) as 0-10 has no standard way to get message // creation time and we're not paying attention to the 1.0 creation time yet. diff --git a/qpid/cpp/src/qpid/broker/SessionManager.cpp b/qpid/cpp/src/qpid/broker/SessionManager.cpp index 8cc58571af..a2955f3bb0 100644 --- a/qpid/cpp/src/qpid/broker/SessionManager.cpp +++ b/qpid/cpp/src/qpid/broker/SessionManager.cpp @@ -70,9 +70,9 @@ void SessionManager::detach(std::auto_ptr<SessionState> session) { attached.erase(session->getId()); session->detach(); if (session->getTimeout() > 0) { - session->expiry = AbsTime(now(),session->getTimeout()*TIME_SEC); + session->expiry = AbsTime(now(), session->getTimeout()*TIME_SEC); if (session->mgmtObject != 0) - session->mgmtObject->set_expireTime ((uint64_t) Duration (EPOCH, session->expiry)); + session->mgmtObject->set_expireTime (Duration::FromEpoch()+session->getTimeout()*TIME_SEC); detached.push_back(session.release()); // In expiry order eraseExpired(); } diff --git a/qpid/cpp/src/qpid/management/ManagementAgent.cpp b/qpid/cpp/src/qpid/management/ManagementAgent.cpp index 0586171dc4..1eab6f622b 100644 --- a/qpid/cpp/src/qpid/management/ManagementAgent.cpp +++ b/qpid/cpp/src/qpid/management/ManagementAgent.cpp @@ -416,7 +416,7 @@ void ManagementAgent::raiseEvent(const ManagementEvent& event, severity_t severi outBuffer.putShortString(event.getPackageName()); outBuffer.putShortString(event.getEventName()); outBuffer.putBin128(event.getMd5Sum()); - outBuffer.putLongLong(uint64_t(sys::Duration(sys::EPOCH, sys::now()))); + outBuffer.putLongLong(sys::Duration::FromEpoch()); outBuffer.putOctet(sev); string sBuf; event.encode(sBuf); @@ -438,7 +438,7 @@ void ManagementAgent::raiseEvent(const ManagementEvent& event, severity_t severi event.getMd5Sum()); event.mapEncode(values); map_["_values"] = values; - map_["_timestamp"] = uint64_t(sys::Duration(sys::EPOCH, sys::now())); + map_["_timestamp"] = uint64_t(sys::Duration::FromEpoch()); map_["_severity"] = sev; headers["method"] = "indication"; @@ -1048,7 +1048,7 @@ void ManagementAgent::periodicProcessing (void) char msgChars[qmfV1BufferSize]; Buffer msgBuffer(msgChars, qmfV1BufferSize); encodeHeader(msgBuffer, 'h'); - msgBuffer.putLongLong(uint64_t(sys::Duration(sys::EPOCH, sys::now()))); + msgBuffer.putLongLong(sys::Duration::FromEpoch()); routingKey = "console.heartbeat.1.0"; sendBuffer(msgBuffer, mExchange, routingKey); @@ -1070,7 +1070,7 @@ void ManagementAgent::periodicProcessing (void) headers["qmf.agent"] = name_address; map["_values"] = attrMap; - map["_values"].asMap()["_timestamp"] = uint64_t(sys::Duration(sys::EPOCH, sys::now())); + map["_values"].asMap()["_timestamp"] = uint64_t(sys::Duration::FromEpoch()); map["_values"].asMap()["_heartbeat_interval"] = interval; map["_values"].asMap()["_epoch"] = bootSequence; @@ -2132,7 +2132,7 @@ void ManagementAgent::handleLocateRequest(const string&, const string& rte, cons headers["qmf.agent"] = name_address; map["_values"] = attrMap; - map["_values"].asMap()["_timestamp"] = uint64_t(sys::Duration(sys::EPOCH, sys::now())); + map["_values"].asMap()["_timestamp"] = uint64_t(sys::Duration::FromEpoch()); map["_values"].asMap()["_heartbeat_interval"] = interval; map["_values"].asMap()["_epoch"] = bootSequence; diff --git a/qpid/cpp/src/qpid/management/ManagementObject.cpp b/qpid/cpp/src/qpid/management/ManagementObject.cpp index b4d469afbe..019963e832 100644 --- a/qpid/cpp/src/qpid/management/ManagementObject.cpp +++ b/qpid/cpp/src/qpid/management/ManagementObject.cpp @@ -246,20 +246,20 @@ ostream& operator<<(ostream& out, const ObjectId& i) }} ManagementObject::ManagementObject(Manageable* _core) : -createTime(qpid::sys::Duration(sys::EPOCH, sys::now())), - destroyTime(0), updateTime(createTime), configChanged(true), - instChanged(true), deleted(false), - coreObject(_core), flags(0), forcePublish(false) {} + createTime(qpid::sys::Duration::FromEpoch()), + destroyTime(0), updateTime(createTime), configChanged(true), + instChanged(true), deleted(false), + coreObject(_core), flags(0), forcePublish(false) {} void ManagementObject::setUpdateTime() { - updateTime = sys::Duration(sys::EPOCH, sys::now()); + updateTime = sys::Duration::FromEpoch(); } void ManagementObject::resourceDestroy() { QPID_LOG(trace, "Management object marked deleted: " << getObjectId().getV2Key()); - destroyTime = sys::Duration(sys::EPOCH, sys::now()); + destroyTime = sys::Duration::FromEpoch(); deleted = true; } diff --git a/qpid/cpp/src/qpid/sys/Time.h b/qpid/cpp/src/qpid/sys/Time.h index 2ccff92e7c..8a82b5f826 100644 --- a/qpid/cpp/src/qpid/sys/Time.h +++ b/qpid/cpp/src/qpid/sys/Time.h @@ -62,11 +62,11 @@ class Duration; * ... * when = AbsTime(when, 2*TIME_SEC); // Advance timer 2 secs * - * AbsTime is not intended to be used to represent calendar dates/times - * but you can construct a Duration since the Unix Epoch, 1970-1-1-00:00, - * so that you can convert to a date/time if needed: + * AbsTime is not intended to be used to represent calendar dates/times. + * There is a specific way to construct a Duration since the Unix Epoch, + * 1970-1-1-00:00: * - * int64_t nanosec_since_epoch = Duration(EPOCH, now()); + * int64_t nanosec_since_epoch = Duration::FromEpoch(); * * There are some sensible operations that are currently missing from * AbsTime, but nearly all that's needed can be done with a mixture of @@ -91,8 +91,9 @@ public: // Default copy constructor fine QPID_COMMON_EXTERN static AbsTime now(); + QPID_COMMON_EXTERN static AbsTime epoch(); QPID_COMMON_EXTERN static AbsTime FarFuture(); - QPID_COMMON_EXTERN static AbsTime Epoch(); + QPID_COMMON_EXTERN static AbsTime Zero(); bool operator==(const AbsTime& t) const { return t.timepoint == timepoint; } @@ -121,6 +122,10 @@ class Duration { public: QPID_COMMON_INLINE_EXTERN inline Duration(int64_t time0 = 0); QPID_COMMON_EXTERN explicit Duration(const AbsTime& start, const AbsTime& finish); + + /** Duration since the Unix epoch: 1970-01-01T00:00:00 */ + QPID_COMMON_EXTERN static Duration FromEpoch(); + inline operator int64_t() const; }; @@ -153,8 +158,8 @@ const Duration TIME_NSEC = 1; /** Value to represent an infinite timeout */ const Duration TIME_INFINITE = std::numeric_limits<int64_t>::max(); -/** Absolute time point for the Unix epoch: 1970-01-01T00:00:00 */ -const AbsTime EPOCH = AbsTime::Epoch(); +/** Absolute time zero point */ +const AbsTime ZERO = AbsTime::Zero(); /** Time greater than any other time */ const AbsTime FAR_FUTURE = AbsTime::FarFuture(); diff --git a/qpid/cpp/src/qpid/sys/posix/Condition.cpp b/qpid/cpp/src/qpid/sys/posix/Condition.cpp new file mode 100644 index 0000000000..f629e50cd7 --- /dev/null +++ b/qpid/cpp/src/qpid/sys/posix/Condition.cpp @@ -0,0 +1,45 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "Condition.h" + +namespace qpid { +namespace sys { + +namespace { + +struct ClockMonotonicAttr { + ::pthread_condattr_t attr; + + ClockMonotonicAttr() { + QPID_POSIX_ASSERT_THROW_IF(pthread_condattr_init(&attr)); + QPID_POSIX_ASSERT_THROW_IF(pthread_condattr_setclock(&attr, CLOCK_MONOTONIC)); + } +}; + +} + +Condition::Condition() { + static ClockMonotonicAttr attr; + QPID_POSIX_ASSERT_THROW_IF(pthread_cond_init(&condition, &attr.attr)); +} + +}} diff --git a/qpid/cpp/src/qpid/sys/posix/Condition.h b/qpid/cpp/src/qpid/sys/posix/Condition.h index 36e7557ffd..66f95d5fc8 100644 --- a/qpid/cpp/src/qpid/sys/posix/Condition.h +++ b/qpid/cpp/src/qpid/sys/posix/Condition.h @@ -40,32 +40,28 @@ namespace sys { class Condition { public: - inline Condition(); - inline ~Condition(); - inline void wait(Mutex&); - inline bool wait(Mutex&, const AbsTime& absoluteTime); - inline void notify(); - inline void notifyAll(); + Condition(); + ~Condition(); + void wait(Mutex&); + bool wait(Mutex&, const AbsTime& absoluteTime); + void notify(); + void notifyAll(); private: pthread_cond_t condition; }; -Condition::Condition() { - QPID_POSIX_ASSERT_THROW_IF(pthread_cond_init(&condition, 0)); -} - -Condition::~Condition() { +inline Condition::~Condition() { QPID_POSIX_ABORT_IF(pthread_cond_destroy(&condition)); } -void Condition::wait(Mutex& mutex) { +inline void Condition::wait(Mutex& mutex) { QPID_POSIX_ASSERT_THROW_IF(pthread_cond_wait(&condition, &mutex.mutex)); } -bool Condition::wait(Mutex& mutex, const AbsTime& absoluteTime){ +inline bool Condition::wait(Mutex& mutex, const AbsTime& absoluteTime){ struct timespec ts; - toTimespec(ts, Duration(EPOCH, absoluteTime)); + toTimespec(ts, absoluteTime); int status = pthread_cond_timedwait(&condition, &mutex.mutex, &ts); if (status != 0) { if (status == ETIMEDOUT) return false; @@ -74,11 +70,11 @@ bool Condition::wait(Mutex& mutex, const AbsTime& absoluteTime){ return true; } -void Condition::notify(){ +inline void Condition::notify(){ QPID_POSIX_ASSERT_THROW_IF(pthread_cond_signal(&condition)); } -void Condition::notifyAll(){ +inline void Condition::notifyAll(){ QPID_POSIX_ASSERT_THROW_IF(pthread_cond_broadcast(&condition)); } diff --git a/qpid/cpp/src/qpid/sys/posix/PrivatePosix.h b/qpid/cpp/src/qpid/sys/posix/PrivatePosix.h index 0f59fe3176..34a2022694 100644 --- a/qpid/cpp/src/qpid/sys/posix/PrivatePosix.h +++ b/qpid/cpp/src/qpid/sys/posix/PrivatePosix.h @@ -32,7 +32,7 @@ namespace qpid { namespace sys { // Private Time related implementation details -struct timespec& toTimespec(struct timespec& ts, const Duration& t); +struct timespec& toTimespec(struct timespec& ts, const AbsTime& t); struct timeval& toTimeval(struct timeval& tv, const Duration& t); Duration toTime(const struct timespec& ts); diff --git a/qpid/cpp/src/qpid/sys/posix/Time.cpp b/qpid/cpp/src/qpid/sys/posix/Time.cpp index b78a7e254a..c44640b3e0 100644 --- a/qpid/cpp/src/qpid/sys/posix/Time.cpp +++ b/qpid/cpp/src/qpid/sys/posix/Time.cpp @@ -42,7 +42,7 @@ AbsTime::AbsTime(const AbsTime& t, const Duration& d) : timepoint(d == Duration::max() ? max_abstime() : t.timepoint+d.nanosecs) {} -AbsTime AbsTime::Epoch() { +AbsTime AbsTime::Zero() { AbsTime epoch; epoch.timepoint = 0; return epoch; } @@ -53,12 +53,22 @@ AbsTime AbsTime::FarFuture() { AbsTime AbsTime::now() { struct timespec ts; - ::clock_gettime(CLOCK_REALTIME, &ts); + ::clock_gettime(CLOCK_MONOTONIC, &ts); AbsTime time_now; time_now.timepoint = toTime(ts).nanosecs; return time_now; } +AbsTime AbsTime::epoch() { + return AbsTime(now(), -Duration::FromEpoch()); +} + +Duration Duration::FromEpoch() { + struct timespec ts; + ::clock_gettime(CLOCK_REALTIME, &ts); + return toTime(ts).nanosecs; +} + Duration::Duration(const AbsTime& start, const AbsTime& finish) : nanosecs(finish.timepoint - start.timepoint) {} @@ -68,7 +78,8 @@ namespace { const time_t TIME_T_MAX = std::numeric_limits<time_t>::max(); } -struct timespec& toTimespec(struct timespec& ts, const Duration& t) { +struct timespec& toTimespec(struct timespec& ts, const AbsTime& a) { + Duration t(ZERO, a); Duration secs = t / TIME_SEC; ts.tv_sec = (secs > TIME_T_MAX) ? TIME_T_MAX : static_cast<time_t>(secs); ts.tv_nsec = static_cast<long>(t % TIME_SEC); diff --git a/qpid/cpp/src/qpid/sys/windows/Time.cpp b/qpid/cpp/src/qpid/sys/windows/Time.cpp index a144a2dd5f..4169ef1f0a 100644 --- a/qpid/cpp/src/qpid/sys/windows/Time.cpp +++ b/qpid/cpp/src/qpid/sys/windows/Time.cpp @@ -68,7 +68,13 @@ AbsTime AbsTime::FarFuture() { return ff; } -AbsTime AbsTime::Epoch() { +AbsTime AbsTime::Zero() { + AbsTime time_epoch; + time_epoch.timepoint = boost::posix_time::from_time_t(0); + return time_epoch; +} + +AbsTime AbsTime::epoch() { AbsTime time_epoch; time_epoch.timepoint = boost::posix_time::from_time_t(0); return time_epoch; @@ -80,6 +86,11 @@ AbsTime AbsTime::now() { return time_now; } +Duration Duration::FromEpoch() { + time_duration d = boost::get_system_time() - boost::posix_time::from_time_t(0); + return d.total_nanoseconds(); +} + Duration::Duration(const AbsTime& start, const AbsTime& finish) { time_duration d = finish.timepoint - start.timepoint; nanosecs = d.total_nanoseconds(); diff --git a/qpid/cpp/src/tests/Statistics.cpp b/qpid/cpp/src/tests/Statistics.cpp index 19531762b1..7cacde8b74 100644 --- a/qpid/cpp/src/tests/Statistics.cpp +++ b/qpid/cpp/src/tests/Statistics.cpp @@ -64,7 +64,7 @@ void ThroughputAndLatency::message(const messaging::Message& m) { if (i != m.getProperties().end()) { ++samples; int64_t start(i->second.asInt64()); - int64_t end(sys::Duration(sys::EPOCH, sys::now())); + int64_t end(sys::Duration::FromEpoch()); double latency = double(end - start)/sys::TIME_MSEC; if (latency > 0) { total += latency; diff --git a/qpid/cpp/src/tests/echotest.cpp b/qpid/cpp/src/tests/echotest.cpp index 5114ab883d..7c30989098 100644 --- a/qpid/cpp/src/tests/echotest.cpp +++ b/qpid/cpp/src/tests/echotest.cpp @@ -64,8 +64,7 @@ struct Args : public qpid::Options, uint64_t current_time() { - Duration t(EPOCH, now()); - return t; + return Duration::FromEpoch(); } class Listener : public MessageListener diff --git a/qpid/cpp/src/tests/qpid-latency-test.cpp b/qpid/cpp/src/tests/qpid-latency-test.cpp index ba53ba0459..a03963467b 100644 --- a/qpid/cpp/src/tests/qpid-latency-test.cpp +++ b/qpid/cpp/src/tests/qpid-latency-test.cpp @@ -97,8 +97,7 @@ Connection globalConnection; uint64_t current_time() { - Duration t(EPOCH, now()); - return t; + return Duration::FromEpoch(); } struct Stats @@ -364,7 +363,7 @@ void Sender::sendByRate() AbsTime last = start; while (true) { AbsTime sentAt=now(); - msg.getDeliveryProperties().setTimestamp(Duration(EPOCH, sentAt)); + msg.getDeliveryProperties().setTimestamp(Duration::FromEpoch()); async(session).messageTransfer(arg::content=msg, arg::acceptMode=1); if (opts.sync) session.sync(); ++sent; diff --git a/qpid/cpp/src/tests/qpid-send.cpp b/qpid/cpp/src/tests/qpid-send.cpp index 78702f7a85..71ce1e32bb 100644 --- a/qpid/cpp/src/tests/qpid-send.cpp +++ b/qpid/cpp/src/tests/qpid-send.cpp @@ -409,7 +409,7 @@ int main(int argc, char ** argv) if (opts.timestamp) msg.getProperties()[TS] = int64_t( - qpid::sys::Duration(qpid::sys::EPOCH, qpid::sys::now())); + qpid::sys::Duration::FromEpoch()); sender.send(msg); reporter.message(msg); |