summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorStephen D. Huston <shuston@apache.org>2008-10-30 23:18:07 +0000
committerStephen D. Huston <shuston@apache.org>2008-10-30 23:18:07 +0000
commit18af9009d719ce758678185ae0a57967da5ca6fe (patch)
treea291bdc0dc2e139249ddad5c0bb883fe8dde426a /cpp/src
parent5d74de00280c14e33964b099d6658a147479b4ca (diff)
downloadqpid-python-18af9009d719ce758678185ae0a57967da5ca6fe.tar.gz
Resolve Time diffs for Windows; add Windows version of asynch I/O layer. Resolves QPID-1209
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@709285 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/qpid/client/FailoverSession.cpp5
-rw-r--r--cpp/src/qpid/sys/IOHandle.h16
-rw-r--r--cpp/src/qpid/sys/windows/AsynchIO.cpp38
-rwxr-xr-xcpp/src/qpid/sys/windows/AsynchIoResult.h4
-rwxr-xr-xcpp/src/qpid/sys/windows/Condition.h4
-rw-r--r--cpp/src/qpid/sys/windows/Time.cpp90
-rw-r--r--cpp/src/qpid/sys/windows/Time.h36
7 files changed, 173 insertions, 20 deletions
diff --git a/cpp/src/qpid/client/FailoverSession.cpp b/cpp/src/qpid/client/FailoverSession.cpp
index d11d5aa362..5cb887a9f6 100644
--- a/cpp/src/qpid/client/FailoverSession.cpp
+++ b/cpp/src/qpid/client/FailoverSession.cpp
@@ -26,6 +26,7 @@
#include "qpid/log/Logger.h"
#include "qpid/log/Options.h"
#include "qpid/log/Statement.h"
+#include "qpid/sys/Time.h"
#include "qpid/client/FailoverConnection.h"
#include "qpid/client/FailoverSession.h"
@@ -377,7 +378,7 @@ FailoverSession::messageTransfer ( const string& destination,
if ( ! failover_in_progress )
break;
- usleep ( 1000 );
+ qpid::sys::usleep ( 1000 );
}
}
}
@@ -1416,7 +1417,7 @@ FailoverSession::prepareForFailover ( Connection newConnection )
{
newSession = newConnection.newSession();
}
- catch ( const std::exception& error )
+ catch ( const std::exception& /*error*/ )
{
throw Exception(QPID_MSG("Can't create failover session."));
}
diff --git a/cpp/src/qpid/sys/IOHandle.h b/cpp/src/qpid/sys/IOHandle.h
index d06512da58..0bf2abbafa 100644
--- a/cpp/src/qpid/sys/IOHandle.h
+++ b/cpp/src/qpid/sys/IOHandle.h
@@ -26,11 +26,25 @@ namespace qpid {
namespace sys {
/**
- * This is a class intended to abstract the Unix concept of file descriptor or the Windows concept of HANDLE
+ * This is a class intended to abstract the Unix concept of file descriptor
+ * or the Windows concept of HANDLE
*/
+// Windows-related classes
+class AsynchAcceptorPrivate;
+class AsynchAcceptResult;
+namespace windows {
+ class AsynchIO;
+}
+
+// General classes
class PollerHandle;
class IOHandlePrivate;
class IOHandle {
+
+ friend class AsynchAcceptorPrivate;
+ friend class AsynchAcceptResult;
+ friend class windows::AsynchIO;
+
friend class PollerHandle;
protected:
diff --git a/cpp/src/qpid/sys/windows/AsynchIO.cpp b/cpp/src/qpid/sys/windows/AsynchIO.cpp
index bde1213131..e5efc874aa 100644
--- a/cpp/src/qpid/sys/windows/AsynchIO.cpp
+++ b/cpp/src/qpid/sys/windows/AsynchIO.cpp
@@ -32,6 +32,7 @@
#include <boost/thread/once.hpp>
+#include <queue>
#include <winsock2.h>
#include <mswsock.h>
#include <windows.h>
@@ -147,7 +148,7 @@ void AsynchAcceptorPrivate::start(Poller::shared_ptr poller) {
restart ();
}
-void AsynchAcceptor::restart(void) {
+void AsynchAcceptorPrivate::restart(void) {
DWORD bytesReceived = 0; // Not used, needed for AcceptEx API
AsynchAcceptResult *result = new AsynchAcceptResult(acceptedCallback,
this,
@@ -166,7 +167,7 @@ void AsynchAcceptor::restart(void) {
AsynchAcceptResult::AsynchAcceptResult(AsynchAcceptor::Callback cb,
- AsynchAcceptor *acceptor,
+ AsynchAcceptorPrivate *acceptor,
SOCKET listener)
: callback(cb), acceptor(acceptor), listener(listener) {
newSocket.reset (new Socket());
@@ -318,21 +319,27 @@ private:
volatile bool queuedClose;
private:
- void close(void);
+ // Dispatch events that have completed.
+ void dispatchReadComplete(AsynchIO::BufferBase *buffer);
+ void notifyEof(void);
+ void notifyDisconnect(void);
+ void notifyClosed(void);
+ void notifyBuffersEmpty(void);
+ void notifyIdle(void);
/**
* Initiate a read operation. AsynchIO::dispatchReadComplete() will be
* called when the read is complete and data is available.
*/
- virtual void startRead(void);
+ void startRead(void);
/**
* Initiate a write of the specified buffer. There's no callback for
* write completion to the AsynchIO object.
*/
- virtual void startWrite(AsynchIO::BufferBase* buff);
+ void startWrite(AsynchIO::BufferBase* buff);
- virtual bool writesNotComplete();
+ void close(void);
/**
* readComplete is called when a read request is complete.
@@ -602,10 +609,6 @@ void AsynchIO::startWrite(AsynchIO::BufferBase* buff) {
return;
}
-bool AsynchIO::writesNotComplete() {
- return writeInProgress;
-}
-
/*
* Close the socket and callback to say we've done it
*/
@@ -726,4 +729,17 @@ void AsynchIO::completion(AsynchIoResult *result) {
delete this;
}
-}} // namespace qpid::windows
+} // namespace windows
+
+AsynchIO* qpid::sys::AsynchIO::create(const Socket& s,
+ AsynchIO::ReadCallback rCb,
+ AsynchIO::EofCallback eofCb,
+ AsynchIO::DisconnectCallback disCb,
+ AsynchIO::ClosedCallback cCb,
+ AsynchIO::BuffersEmptyCallback eCb,
+ AsynchIO::IdleCallback iCb)
+{
+ return new qpid::sys::windows::AsynchIO(s, rCb, eofCb, disCb, cCb, eCb, iCb);
+}
+
+}} // namespace qpid::sys
diff --git a/cpp/src/qpid/sys/windows/AsynchIoResult.h b/cpp/src/qpid/sys/windows/AsynchIoResult.h
index 9efdaebda8..7db4e9c331 100755
--- a/cpp/src/qpid/sys/windows/AsynchIoResult.h
+++ b/cpp/src/qpid/sys/windows/AsynchIoResult.h
@@ -22,7 +22,7 @@
*
*/
-#include "AsynchIO.h"
+#include "qpid/sys/AsynchIO.h"
#include "qpid/sys/Socket.h"
#include <memory.h>
#include <winsock2.h>
@@ -101,8 +101,6 @@ private:
char addressBuffer[SOCKADDRBUFLEN];
};
-class AsynchIO;
-
class AsynchIoResult : public AsynchResult {
public:
typedef boost::function1<void, AsynchIoResult *> Completer;
diff --git a/cpp/src/qpid/sys/windows/Condition.h b/cpp/src/qpid/sys/windows/Condition.h
index e16fa2a176..979fae9b0a 100755
--- a/cpp/src/qpid/sys/windows/Condition.h
+++ b/cpp/src/qpid/sys/windows/Condition.h
@@ -65,9 +65,7 @@ void Condition::wait(Mutex& mutex) {
}
bool Condition::wait(Mutex& mutex, const AbsTime& absoluteTime){
- boost::system_time st;
- toPtime(st, absoluteTime);
- return condition.timed_wait(mutex.mutex, st);
+ return condition.timed_wait(mutex.mutex, absoluteTime.getPrivate());
}
void Condition::notify(){
diff --git a/cpp/src/qpid/sys/windows/Time.cpp b/cpp/src/qpid/sys/windows/Time.cpp
new file mode 100644
index 0000000000..477a06656c
--- /dev/null
+++ b/cpp/src/qpid/sys/windows/Time.cpp
@@ -0,0 +1,90 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/sys/Time.h"
+#include <ostream>
+#include <boost/date_time/posix_time/posix_time.hpp>
+#include <boost/thread/thread_time.hpp>
+#include <windows.h>
+
+using namespace boost::posix_time;
+
+namespace qpid {
+namespace sys {
+
+AbsTime::AbsTime(const AbsTime& t, const Duration& d) {
+ if (d == Duration::max()) {
+ timepoint = ptime(max_date_time);
+ }
+ else {
+ time_duration td = microseconds(d.nanosecs / 1000);
+ timepoint = t.timepoint + td;
+ }
+}
+
+AbsTime AbsTime::FarFuture() {
+ AbsTime ff;
+ ptime maxd(max_date_time);
+ ff.timepoint = maxd;
+ return ff;
+}
+
+AbsTime AbsTime::now() {
+ AbsTime time_now;
+ time_now.timepoint = boost::get_system_time();
+ return time_now;
+}
+
+Duration::Duration(const AbsTime& time0) : nanosecs(0) {
+ time_period p(ptime(min_date_time), time0.timepoint);
+ nanosecs = p.length().total_nanoseconds();
+}
+
+Duration::Duration(const AbsTime& start, const AbsTime& finish) {
+ time_duration d = finish.timepoint - start.timepoint;
+ nanosecs = d.total_nanoseconds();
+}
+
+std::ostream& operator<<(std::ostream& o, const Duration& d) {
+ return o << int64_t(d) << "ns";
+}
+
+std::ostream& operator<<(std::ostream& o, const AbsTime& t) {
+ std::string time_string = to_simple_string(t.timepoint);
+ return o << time_string;
+}
+
+void toPtime(ptime& pt, const AbsTime& t) {
+ pt = t.getPrivate();
+}
+
+void sleep(int secs) {
+ ::Sleep(secs * 1000);
+}
+
+void usleep(uint32_t usecs) {
+ DWORD msecs = usecs / 1000;
+ if (msecs == 0)
+ msecs = 1;
+ ::Sleep(msecs);
+}
+
+}}
diff --git a/cpp/src/qpid/sys/windows/Time.h b/cpp/src/qpid/sys/windows/Time.h
new file mode 100644
index 0000000000..49b3c4bab3
--- /dev/null
+++ b/cpp/src/qpid/sys/windows/Time.h
@@ -0,0 +1,36 @@
+#ifndef QPID_SYS_WINDOWS_TIME_H
+#define QPID_SYS_WINDOWS_TIME_H
+
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include <boost/date_time/posix_time/posix_time.hpp>
+
+namespace qpid {
+namespace sys {
+
+/**
+ * Class to represent an instant in time. Boost has this stuff already done
+ * so just reuse it. We can also grab this for quick use with the Condition
+ * wait operations.
+ */
+typedef boost::posix_time::ptime TimePrivate;
+
+}} // namespace qpid::sys
+
+#endif /*!QPID_SYS_WINDOWS_TIME_H*/