From 9a933ae9011d343a75929136269fe45c6b863a17 Mon Sep 17 00:00:00 2001 From: Andrew Stitcher Date: Tue, 6 Jan 2009 23:42:18 +0000 Subject: Work on the low level IO code: * Introduce code so that you can interrupt waiting for a handle and receive a callback that is correctly serialised with the IO callbacks for that handle git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@732177 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/tests/DispatcherTest.cpp | 87 +++++++++++++++++++++++++++++++++------- cpp/src/tests/Makefile.am | 7 ++++ cpp/src/tests/PollerTest.cpp | 54 ++++++++++++++++++++----- 3 files changed, 125 insertions(+), 23 deletions(-) (limited to 'cpp/src/tests') diff --git a/cpp/src/tests/DispatcherTest.cpp b/cpp/src/tests/DispatcherTest.cpp index 7631956acc..c2f6bca12a 100644 --- a/cpp/src/tests/DispatcherTest.cpp +++ b/cpp/src/tests/DispatcherTest.cpp @@ -20,7 +20,10 @@ */ #include "qpid/sys/Poller.h" +#include "qpid/sys/IOHandle.h" #include "qpid/sys/Dispatcher.h" +#include "qpid/sys/DispatchHandle.h" +#include "qpid/sys/posix/PrivatePosix.h" #include "qpid/sys/Thread.h" #include @@ -28,6 +31,7 @@ #include #include #include +#include #include #include @@ -74,7 +78,26 @@ void reader(DispatchHandle& h, int fd) { h.rewatch(); } -int main(int argc, char** argv) +DispatchHandle* rh = 0; +DispatchHandle* wh = 0; + +void rInterrupt(DispatchHandle&) { + cerr << "R"; +} + +void wInterrupt(DispatchHandle&) { + cerr << "W"; +} + +DispatchHandle::Callback rcb = rInterrupt; +DispatchHandle::Callback wcb = wInterrupt; + +void timer_handler(int /*signo*/, siginfo_t* /*info*/, void* /*context*/) { + rh->call(rcb); + wh->call(wcb); +} + +int main(int /*argc*/, char** /*argv*/) { // Create poller Poller::shared_ptr poller(new Poller); @@ -82,12 +105,12 @@ int main(int argc, char** argv) // Create dispatcher thread Dispatcher d(poller); Dispatcher d1(poller); - //Dispatcher d2(poller); - //Dispatcher d3(poller); + Dispatcher d2(poller); + Dispatcher d3(poller); Thread dt(d); Thread dt1(d1); - //Thread dt2(d2); - //Thread dt3(d3); + Thread dt2(d2); + Thread dt3(d3); // Setup sender and receiver int sv[2]; @@ -106,22 +129,58 @@ int main(int argc, char** argv) for (int i = 0; i < 8; i++) testString += testString; - DispatchHandle rh(sv[0], boost::bind(reader, _1, sv[0]), 0); - DispatchHandle wh(sv[1], 0, boost::bind(writer, _1, sv[1], testString)); + PosixIOHandle f0(sv[0]); + PosixIOHandle f1(sv[1]); - rh.watch(poller); - wh.watch(poller); + rh = new DispatchHandle(f0, boost::bind(reader, _1, sv[0]), 0, 0); + wh = new DispatchHandle(f1, 0, boost::bind(writer, _1, sv[1], testString), 0); - // wait 2 minutes then shutdown - sleep(60); + rh->startWatch(poller); + wh->startWatch(poller); + + // Set up a regular itimer interupt + + // Ignore signal in this thread + ::sigset_t sm; + ::sigemptyset(&sm); + ::sigaddset(&sm, SIGRTMIN); + ::pthread_sigmask(SIG_BLOCK, &sm, 0); + + // Signal handling + struct ::sigaction sa; + sa.sa_sigaction = timer_handler; + sa.sa_flags = SA_RESTART | SA_SIGINFO; + ::sigemptyset(&sa.sa_mask); + rc = ::sigaction(SIGRTMIN, &sa,0); + assert(rc == 0); + + ::sigevent se; + se.sigev_notify = SIGEV_SIGNAL; + se.sigev_signo = SIGRTMIN; + se.sigev_value.sival_ptr = 0; + timer_t timer; + rc = ::timer_create(CLOCK_REALTIME, &se, &timer); + assert(rc == 0); + itimerspec ts = { + /*.it_value = */ {2, 0}, // s, ns + /*.it_interval = */ {2, 0}}; // s, ns + + rc = ::timer_settime(timer, 0, &ts, 0); + assert(rc == 0); + + // wait 2 minutes then shutdown + ::sleep(60); + + rc = ::timer_delete(timer); + assert(rc == 0); poller->shutdown(); dt.join(); dt1.join(); - //dt2.join(); - //dt3.join(); + dt2.join(); + dt3.join(); - cout << "Wrote: " << writtenBytes << "\n"; + cout << "\nWrote: " << writtenBytes << "\n"; cout << "Read: " << readBytes << "\n"; return 0; diff --git a/cpp/src/tests/Makefile.am b/cpp/src/tests/Makefile.am index 3a608b2bae..47439d0bab 100644 --- a/cpp/src/tests/Makefile.am +++ b/cpp/src/tests/Makefile.am @@ -187,6 +187,13 @@ check_PROGRAMS+=sender sender_SOURCES=sender.cpp TestOptions.h ConnectionOptions.h sender_LDADD=$(lib_client) +check_PROGRAMS+=PollerTest +PollerTest_SOURCES=PollerTest.cpp +PollerTest_LDADD=$(lib_common) + +check_PROGRAMS+=DispatcherTest +DispatcherTest_SOURCES=DispatcherTest.cpp +DispatcherTest_LDADD=$(lib_common) TESTS_ENVIRONMENT = VALGRIND=$(VALGRIND) srcdir=$(srcdir) QPID_DATA_DIR= BOOST_TEST_SHOW_PROGRESS=yes $(srcdir)/run_test diff --git a/cpp/src/tests/PollerTest.cpp b/cpp/src/tests/PollerTest.cpp index fcb1d0dadf..4f11dc5901 100644 --- a/cpp/src/tests/PollerTest.cpp +++ b/cpp/src/tests/PollerTest.cpp @@ -23,7 +23,9 @@ * Use socketpair to test the poller */ +#include "qpid/sys/IOHandle.h" #include "qpid/sys/Poller.h" +#include "qpid/sys/posix/PrivatePosix.h" #include #include @@ -67,7 +69,7 @@ int readALot(int fd) { return bytesRead; } -int main(int argc, char** argv) +int main(int /*argc*/, char** /*argv*/) { try { @@ -103,15 +105,18 @@ int main(int argc, char** argv) auto_ptr poller(new Poller); - PollerHandle h0(sv[0]); - PollerHandle h1(sv[1]); + PosixIOHandle f0(sv[0]); + PosixIOHandle f1(sv[1]); + + PollerHandle h0(f0); + PollerHandle h1(f1); poller->addFd(h0, Poller::INOUT); - // Wait for 500ms - h0 should be writable + // h0 should be writable Poller::Event event = poller->wait(); assert(event.handle == &h0); - assert(event.dir == Poller::OUT); + assert(event.type == Poller::WRITABLE); // Write as much as we can to socket 0 bytesWritten = writeALot(sv[0], testString); @@ -126,17 +131,48 @@ int main(int argc, char** argv) poller->addFd(h1, Poller::INOUT); event = poller->wait(); assert(event.handle == &h1); - assert(event.dir == Poller::INOUT); + assert(event.type == Poller::READ_WRITABLE); + + // Can't interrupt, it's not active + assert(poller->interrupt(h1) == false); bytesRead = readALot(sv[1]); assert(bytesRead == bytesWritten); cout << "Read(1): " << bytesRead << " bytes\n"; + + // Test poller interrupt + assert(poller->interrupt(h0) == true); + event = poller->wait(); + assert(event.handle == &h0); + assert(event.type == Poller::INTERRUPTED); + assert(poller->interrupt(h0) == false); + + // Test multiple interrupts + poller->rearmFd(h0); + poller->rearmFd(h1); + assert(poller->interrupt(h0) == true); + assert(poller->interrupt(h1) == true); + // Make sure we can't interrupt them again + assert(poller->interrupt(h0) == false); + assert(poller->interrupt(h1) == false); + + // Make sure that they both come out in the correct order + event = poller->wait(); + assert(event.handle == &h0); + assert(event.type == Poller::INTERRUPTED); + assert(poller->interrupt(h0) == false); + event = poller->wait(); + assert(event.handle == &h1); + assert(event.type == Poller::INTERRUPTED); + assert(poller->interrupt(h1) == false); + // At this point h1 should have been disabled from the poller // (as it was just returned) and h0 can write again + poller->rearmFd(h0); event = poller->wait(); assert(event.handle == &h0); - assert(event.dir == Poller::OUT); + assert(event.type == Poller::WRITABLE); // Now both the handles should be disabled event = poller->wait(500000000); @@ -146,11 +182,11 @@ int main(int argc, char** argv) poller->shutdown(); event = poller->wait(); assert(event.handle == 0); - assert(event.dir == Poller::SHUTDOWN); + assert(event.type == Poller::SHUTDOWN); event = poller->wait(); assert(event.handle == 0); - assert(event.dir == Poller::SHUTDOWN); + assert(event.type == Poller::SHUTDOWN); poller->delFd(h1); poller->delFd(h0); -- cgit v1.2.1