diff options
| author | Rafael H. Schloming <rhs@apache.org> | 2009-12-26 12:42:57 +0000 |
|---|---|---|
| committer | Rafael H. Schloming <rhs@apache.org> | 2009-12-26 12:42:57 +0000 |
| commit | 248f1fe188fe2307b9dcf2c87a83b653eaa1920c (patch) | |
| tree | d5d0959a70218946ff72e107a6c106e32479a398 /cpp/src/tests/DispatcherTest.cpp | |
| parent | 3c83a0e3ec7cf4dc23e83a340b25f5fc1676f937 (diff) | |
| download | qpid-python-248f1fe188fe2307b9dcf2c87a83b653eaa1920c.tar.gz | |
synchronized with trunk except for ruby dir
git-svn-id: https://svn.apache.org/repos/asf/qpid/branches/qpid.rnr@893970 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/tests/DispatcherTest.cpp')
| -rw-r--r-- | cpp/src/tests/DispatcherTest.cpp | 173 |
1 files changed, 147 insertions, 26 deletions
diff --git a/cpp/src/tests/DispatcherTest.cpp b/cpp/src/tests/DispatcherTest.cpp index 7631956acc..17b3b4e3e6 100644 --- a/cpp/src/tests/DispatcherTest.cpp +++ b/cpp/src/tests/DispatcherTest.cpp @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -20,7 +20,10 @@ */ #include "qpid/sys/Poller.h" +#include "qpid/sys/IOHandle.h" #include "qpid/sys/Dispatcher.h" +#include "qpid/sys/DispatchHandle.h" +#include "qpid/sys/posix/PrivatePosix.h" #include "qpid/sys/Thread.h" #include <sys/types.h> @@ -28,6 +31,7 @@ #include <fcntl.h> #include <unistd.h> #include <errno.h> +#include <signal.h> #include <iostream> #include <boost/bind.hpp> @@ -35,6 +39,9 @@ using namespace std; using namespace qpid::sys; +namespace qpid { +namespace tests { + int writeALot(int fd, const string& s) { int bytesWritten = 0; do { @@ -42,7 +49,7 @@ int writeALot(int fd, const string& s) { int lastWrite = ::write(fd, s.c_str(), s.size()); if ( lastWrite >= 0) { bytesWritten += lastWrite; - } + } } while (errno != EAGAIN); return bytesWritten; } @@ -50,13 +57,13 @@ int writeALot(int fd, const string& s) { int readALot(int fd) { int bytesRead = 0; char buf[10240]; - + do { errno = 0; int lastRead = ::read(fd, buf, sizeof(buf)); if ( lastRead >= 0) { bytesRead += lastRead; - } + } } while (errno != EAGAIN); return bytesRead; } @@ -74,55 +81,169 @@ void reader(DispatchHandle& h, int fd) { h.rewatch(); } -int main(int argc, char** argv) +void rInterrupt(DispatchHandle&) { + cerr << "R"; +} + +void wInterrupt(DispatchHandle&) { + cerr << "W"; +} + +DispatchHandle::Callback rcb = rInterrupt; +DispatchHandle::Callback wcb = wInterrupt; + +DispatchHandleRef *volatile rh = 0; +DispatchHandleRef *volatile wh = 0; + +volatile bool stopWait = false; +volatile bool phase1finished = false; + +timer_t timer; + +void stop_handler(int /*signo*/, siginfo_t* /*info*/, void* /*context*/) { + stopWait = true; +} + +void timer_handler(int /*signo*/, siginfo_t* /*info*/, void* /*context*/) { + static int count = 0; + if (count++ < 10) { + rh->call(rcb); + wh->call(wcb); + } else { + phase1finished = true; + assert(::timer_delete(timer) == 0); + } +} + +}} // namespace qpid::tests + +using namespace qpid::tests; + +int main(int /*argc*/, char** /*argv*/) { // Create poller Poller::shared_ptr poller(new Poller); - + // Create dispatcher thread Dispatcher d(poller); Dispatcher d1(poller); - //Dispatcher d2(poller); - //Dispatcher d3(poller); + Dispatcher d2(poller); + Dispatcher d3(poller); Thread dt(d); Thread dt1(d1); - //Thread dt2(d2); - //Thread dt3(d3); + Thread dt2(d2); + Thread dt3(d3); // Setup sender and receiver int sv[2]; - int rc = ::socketpair(AF_LOCAL, SOCK_STREAM, 0, sv); + int rc = ::socketpair(AF_UNIX, SOCK_STREAM, 0, sv); assert(rc >= 0); - + // Set non-blocking rc = ::fcntl(sv[0], F_SETFL, O_NONBLOCK); assert(rc >= 0); rc = ::fcntl(sv[1], F_SETFL, O_NONBLOCK); assert(rc >= 0); - + // Make up a large string string testString = "This is only a test ... 1,2,3,4,5,6,7,8,9,10;"; for (int i = 0; i < 8; i++) testString += testString; - DispatchHandle rh(sv[0], boost::bind(reader, _1, sv[0]), 0); - DispatchHandle wh(sv[1], 0, boost::bind(writer, _1, sv[1], testString)); + PosixIOHandle f0(sv[0]); + PosixIOHandle f1(sv[1]); + + rh = new DispatchHandleRef(f0, boost::bind(reader, _1, sv[0]), 0, 0); + wh = new DispatchHandleRef(f1, 0, boost::bind(writer, _1, sv[1], testString), 0); + + rh->startWatch(poller); + wh->startWatch(poller); + + // Set up a regular itimer interupt + + // Ignore signal in this thread + ::sigset_t sm; + ::sigemptyset(&sm); + ::sigaddset(&sm, SIGRTMIN); + ::pthread_sigmask(SIG_BLOCK, &sm, 0); + + // Signal handling + struct ::sigaction sa; + sa.sa_sigaction = timer_handler; + sa.sa_flags = SA_RESTART | SA_SIGINFO; + ::sigemptyset(&sa.sa_mask); + rc = ::sigaction(SIGRTMIN, &sa,0); + assert(rc == 0); + + ::sigevent se; + ::memset(&se, 0, sizeof(se)); // Clear to make valgrind happy (this *is* the neatest way to do this portably - sigh) + se.sigev_notify = SIGEV_SIGNAL; + se.sigev_signo = SIGRTMIN; + rc = ::timer_create(CLOCK_REALTIME, &se, &timer); + assert(rc == 0); + + itimerspec ts = { + /*.it_value = */ {2, 0}, // s, ns + /*.it_interval = */ {2, 0}}; // s, ns + + rc = ::timer_settime(timer, 0, &ts, 0); + assert(rc == 0); + + // wait + while (!phase1finished) { + ::sleep(1); + } + + // Now test deleting/creating DispatchHandles in tight loop, so that we are likely to still be using the + // attached PollerHandles after deleting the DispatchHandle + DispatchHandleRef* t = wh; + wh = 0; + delete t; + t = rh; + rh = 0; + delete t; + + sa.sa_sigaction = stop_handler; + rc = ::sigaction(SIGRTMIN, &sa,0); + assert(rc == 0); + + itimerspec nts = { + /*.it_value = */ {30, 0}, // s, ns + /*.it_interval = */ {30, 0}}; // s, ns + + rc = ::timer_create(CLOCK_REALTIME, &se, &timer); + assert(rc == 0); + rc = ::timer_settime(timer, 0, &nts, 0); + assert(rc == 0); + + DispatchHandleRef* rh1; + DispatchHandleRef* wh1; + + struct timespec w = {0, 1000000}; + while (!stopWait) { + rh1 = new DispatchHandleRef(f0, boost::bind(reader, _1, sv[0]), 0, 0); + wh1 = new DispatchHandleRef(f1, 0, boost::bind(writer, _1, sv[1], testString), 0); + rh1->startWatch(poller); + wh1->startWatch(poller); + + ::nanosleep(&w, 0); + + delete wh1; + delete rh1; + } + + rc = ::timer_delete(timer); + assert(rc == 0); - rh.watch(poller); - wh.watch(poller); - - // wait 2 minutes then shutdown - sleep(60); - poller->shutdown(); dt.join(); dt1.join(); - //dt2.join(); - //dt3.join(); + dt2.join(); + dt3.join(); - cout << "Wrote: " << writtenBytes << "\n"; + cout << "\nWrote: " << writtenBytes << "\n"; cout << "Read: " << readBytes << "\n"; - + return 0; } |
