summaryrefslogtreecommitdiff
path: root/lib/IO/Async/Loop
diff options
context:
space:
mode:
authorLorry Tar Creator <lorry-tar-importer@lorry>2015-06-01 14:15:30 +0000
committerLorry Tar Creator <lorry-tar-importer@lorry>2015-06-01 14:15:30 +0000
commit1425eea04dd872dc6313f5315f317b2de288037c (patch)
treef81c74f75429e829714029850f89ee4c7f13aa39 /lib/IO/Async/Loop
downloadIO-Async-tarball-master.tar.gz
Diffstat (limited to 'lib/IO/Async/Loop')
-rw-r--r--lib/IO/Async/Loop/Poll.pm350
-rw-r--r--lib/IO/Async/Loop/Select.pm294
2 files changed, 644 insertions, 0 deletions
diff --git a/lib/IO/Async/Loop/Poll.pm b/lib/IO/Async/Loop/Poll.pm
new file mode 100644
index 0000000..fb7bbf1
--- /dev/null
+++ b/lib/IO/Async/Loop/Poll.pm
@@ -0,0 +1,350 @@
+# You may distribute under the terms of either the GNU General Public License
+# or the Artistic License (the same terms as Perl itself)
+#
+# (C) Paul Evans, 2007-2015 -- leonerd@leonerd.org.uk
+
+package IO::Async::Loop::Poll;
+
+use strict;
+use warnings;
+
+our $VERSION = '0.67';
+use constant API_VERSION => '0.49';
+
+use base qw( IO::Async::Loop );
+
+use Carp;
+
+use IO::Poll qw( POLLIN POLLOUT POLLPRI POLLHUP POLLERR );
+
+use Errno qw( EINTR );
+use Fcntl qw( S_ISREG );
+
+# Only Linux, or FreeBSD 8.0 and above, are known always to be able to report
+# EOF conditions on filehandles using POLLHUP
+use constant _CAN_ON_HANGUP =>
+ ( $^O eq "linux" ) ||
+ ( $^O eq "freebsd" and do { no warnings 'numeric'; (POSIX::uname)[2] >= 8.0 } );
+
+# poll() on most platforms claims that ISREG files are always read- and
+# write-ready, but not on MSWin32. We need to fake this
+use constant FAKE_ISREG_READY => IO::Async::OS->HAVE_FAKE_ISREG_READY;
+# poll() on most platforms indicates POLLOUT when connect() fails, but not on
+# MSWin32. Have to poll also for POLLPRI in that case
+use constant POLL_CONNECT_POLLPRI => IO::Async::OS->HAVE_POLL_CONNECT_POLLPRI;
+
+use constant _CAN_WATCHDOG => 1;
+use constant WATCHDOG_ENABLE => IO::Async::Loop->WATCHDOG_ENABLE;
+
+=head1 NAME
+
+C<IO::Async::Loop::Poll> - use C<IO::Async> with C<poll(2)>
+
+=head1 SYNOPSIS
+
+Normally an instance of this class would not be directly constructed by a
+program. It may however, be useful for runinng L<IO::Async> with an existing
+program already using an C<IO::Poll> object.
+
+ use IO::Poll;
+ use IO::Async::Loop::Poll;
+
+ my $poll = IO::Poll->new;
+ my $loop = IO::Async::Loop::Poll->new( poll => $poll );
+
+ $loop->add( ... );
+
+ while(1) {
+ my $timeout = ...
+ my $ret = $poll->poll( $timeout );
+ $loop->post_poll;
+ }
+
+=head1 DESCRIPTION
+
+This subclass of C<IO::Async::Loop> uses the C<poll(2)> system call to perform
+read-ready and write-ready tests.
+
+By default, this loop will use the underlying C<poll()> system call directly,
+bypassing the usual L<IO::Poll> object wrapper around it because of a number
+of bugs and design flaws in that class; namely
+
+=over 2
+
+=item *
+
+L<https://rt.cpan.org/Ticket/Display.html?id=93107> - IO::Poll relies on
+stable stringification of IO handles
+
+=item *
+
+L<https://rt.cpan.org/Ticket/Display.html?id=25049> - IO::Poll->poll() with no
+handles always returns immediately
+
+=back
+
+However, to integrate with existing code that uses an C<IO::Poll> object, a
+C<post_poll> can be called immediately after the C<poll> method that
+C<IO::Poll> object. The appropriate mask bits are maintained on the
+C<IO::Poll> object when notifiers are added or removed from the loop, or when
+they change their C<want_*> status. The C<post_poll> method inspects the
+result bits and invokes the C<on_read_ready> or C<on_write_ready> methods on
+the notifiers.
+
+=cut
+
+=head1 CONSTRUCTOR
+
+=cut
+
+=head2 $loop = IO::Async::Loop::Poll->new( %args )
+
+This function returns a new instance of a C<IO::Async::Loop::Poll> object. It
+takes the following named arguments:
+
+=over 8
+
+=item C<poll>
+
+The C<IO::Poll> object to use for notification. Optional; if a value is not
+given, the underlying C<IO::Poll::_poll()> function is invoked directly,
+outside of the object wrapping.
+
+=back
+
+=cut
+
+sub new
+{
+ my $class = shift;
+ my ( %args ) = @_;
+
+ my $poll = delete $args{poll};
+
+ my $self = $class->__new( %args );
+
+ $self->{poll} = $poll;
+ $self->{pollmask} = {};
+
+ return $self;
+}
+
+=head1 METHODS
+
+=cut
+
+=head2 $count = $loop->post_poll
+
+This method checks the returned event list from a C<IO::Poll::poll> call,
+and calls any of the notification methods or callbacks that are appropriate.
+It returns the total number of callbacks that were invoked; that is, the
+total number of C<on_read_ready> and C<on_write_ready> callbacks for
+C<watch_io>, and C<watch_time> event callbacks.
+
+=cut
+
+sub post_poll
+{
+ my $self = shift;
+
+ my $iowatches = $self->{iowatches};
+ my $poll = $self->{poll};
+
+ my $count = 0;
+
+ alarm( IO::Async::Loop->WATCHDOG_INTERVAL ) if WATCHDOG_ENABLE;
+
+ foreach my $fd ( keys %$iowatches ) {
+ my $watch = $iowatches->{$fd} or next;
+
+ my $events = $poll ? $poll->events( $watch->[0] )
+ : $self->{pollevents}{$fd};
+ if( FAKE_ISREG_READY and $self->{fake_isreg}{$fd} ) {
+ $events |= $self->{fake_isreg}{$fd} & ( POLLIN|POLLOUT );
+ }
+
+ # We have to test separately because kernel doesn't report POLLIN when
+ # a pipe gets closed.
+ if( $events & (POLLIN|POLLHUP|POLLERR) ) {
+ $count++, $watch->[1]->() if defined $watch->[1];
+ }
+
+ if( $events & (POLLOUT|POLLPRI|POLLHUP|POLLERR) ) {
+ $count++, $watch->[2]->() if defined $watch->[2];
+ }
+
+ if( $events & (POLLHUP|POLLERR) ) {
+ $count++, $watch->[3]->() if defined $watch->[3];
+ }
+ }
+
+ # Since we have no way to know if the timeout occured, we'll have to
+ # attempt to fire any waiting timeout events anyway
+ $count += $self->_manage_queues;
+
+ alarm( 0 ) if WATCHDOG_ENABLE;
+
+ return $count;
+}
+
+=head2 $count = $loop->loop_once( $timeout )
+
+This method calls the C<poll> method on the stored C<IO::Poll> object,
+passing in the value of C<$timeout>, and then runs the C<post_poll> method
+on itself. It returns the total number of callbacks invoked by the
+C<post_poll> method, or C<undef> if the underlying C<poll> method returned
+an error.
+
+=cut
+
+sub loop_once
+{
+ my $self = shift;
+ my ( $timeout ) = @_;
+
+ $self->_adjust_timeout( \$timeout );
+
+ $timeout = 0 if FAKE_ISREG_READY and keys %{ $self->{fake_isreg} };
+
+ # Round up to nearest millisecond
+ if( $timeout ) {
+ my $mils = $timeout * 1000;
+ my $fraction = $mils - int $mils;
+ $timeout += ( 1 - $fraction ) / 1000 if $fraction;
+ }
+
+ if( my $poll = $self->{poll} ) {
+ my $pollret;
+
+ # There is a bug in IO::Poll at least version 0.07, where poll with no
+ # registered masks returns immediately, rather than waiting for a timeout
+ # This has been reported:
+ # http://rt.cpan.org/Ticket/Display.html?id=25049
+ if( $poll->handles ) {
+ $pollret = $poll->poll( $timeout );
+
+ if( ( $pollret == -1 and $! == EINTR ) or $pollret == 0
+ and defined $self->{sigproxy} ) {
+ # A signal occured and we have a sigproxy. Allow one more poll call
+ # with zero timeout. If it finds something, keep that result. If it
+ # finds nothing, keep -1
+
+ # Preserve $! whatever happens
+ local $!;
+
+ my $secondattempt = $poll->poll( 0 );
+ $pollret = $secondattempt if $secondattempt > 0;
+ }
+ }
+ else {
+ # Workaround - we'll use select to fake a millisecond-accurate sleep
+ $pollret = select( undef, undef, undef, $timeout );
+ }
+
+ return undef unless defined $pollret;
+ return $self->post_poll;
+ }
+ else {
+ my $msec = defined $timeout ? $timeout * 1000 : -1;
+ my @pollmasks = %{ $self->{pollmask} };
+
+ my $pollret = IO::Poll::_poll( $msec, @pollmasks );
+ if( $pollret == -1 and $! == EINTR or
+ $pollret == 0 and $self->{sigproxy} ) {
+ local $!;
+
+ @pollmasks = %{ $self->{pollmask} };
+ my $secondattempt = IO::Poll::_poll( $msec, @pollmasks );
+ $pollret = $secondattempt if $secondattempt > 0;
+ }
+
+ return undef unless defined $pollret;
+
+ $self->{pollevents} = { @pollmasks };
+ return $self->post_poll;
+ }
+}
+
+sub watch_io
+{
+ my $self = shift;
+ my %params = @_;
+
+ $self->__watch_io( %params );
+
+ my $poll = $self->{poll};
+
+ my $handle = $params{handle};
+ my $fileno = $handle->fileno;
+
+ my $curmask = $poll ? $poll->mask( $handle )
+ : $self->{pollmask}{$fileno};
+ $curmask ||= 0;
+
+ my $mask = $curmask;
+ $params{on_read_ready} and $mask |= POLLIN;
+ $params{on_write_ready} and $mask |= POLLOUT | (POLL_CONNECT_POLLPRI ? POLLPRI : 0);
+ $params{on_hangup} and $mask |= POLLHUP;
+
+ if( FAKE_ISREG_READY and S_ISREG +(stat $handle)[2] ) {
+ $self->{fake_isreg}{$fileno} = $mask;
+ }
+
+ return if $mask == $curmask;
+
+ if( $poll ) {
+ $poll->mask( $handle, $mask );
+ }
+ else {
+ $self->{pollmask}{$fileno} = $mask;
+ }
+}
+
+sub unwatch_io
+{
+ my $self = shift;
+ my %params = @_;
+
+ $self->__unwatch_io( %params );
+
+ my $poll = $self->{poll};
+
+ my $handle = $params{handle};
+ my $fileno = $handle->fileno;
+
+ my $curmask = $poll ? $poll->mask( $handle )
+ : $self->{pollmask}{$fileno};
+ $curmask ||= 0;
+
+ my $mask = $curmask;
+ $params{on_read_ready} and $mask &= ~POLLIN;
+ $params{on_write_ready} and $mask &= ~(POLLOUT | (POLL_CONNECT_POLLPRI ? POLLPRI : 0));
+ $params{on_hangup} and $mask &= ~POLLHUP;
+
+ if( FAKE_ISREG_READY and S_ISREG +(stat $handle)[2] ) {
+ if( $mask ) {
+ $self->{fake_isreg}{$handle->fileno} = $mask;
+ }
+ else {
+ delete $self->{fake_isreg}{$handle->fileno};
+ }
+ }
+
+ return if $mask == $curmask;
+
+ if( $poll ) {
+ $poll->mask( $handle, $mask );
+ }
+ else {
+ $mask ? ( $self->{pollmask}{$fileno} = $mask )
+ : ( delete $self->{pollmask}{$fileno} );
+ }
+}
+
+=head1 AUTHOR
+
+Paul Evans <leonerd@leonerd.org.uk>
+
+=cut
+
+0x55AA;
diff --git a/lib/IO/Async/Loop/Select.pm b/lib/IO/Async/Loop/Select.pm
new file mode 100644
index 0000000..0c3bd9c
--- /dev/null
+++ b/lib/IO/Async/Loop/Select.pm
@@ -0,0 +1,294 @@
+# You may distribute under the terms of either the GNU General Public License
+# or the Artistic License (the same terms as Perl itself)
+#
+# (C) Paul Evans, 2007-2015 -- leonerd@leonerd.org.uk
+
+package IO::Async::Loop::Select;
+
+use strict;
+use warnings;
+
+our $VERSION = '0.67';
+use constant API_VERSION => '0.49';
+
+use base qw( IO::Async::Loop );
+
+use IO::Async::OS;
+
+use Carp;
+
+# select() on most platforms claims that ISREG files are always read- and
+# write-ready, but not on MSWin32. We need to fake this
+use constant FAKE_ISREG_READY => IO::Async::OS->HAVE_FAKE_ISREG_READY;
+# select() on most platforms indicates write-ready when connect() fails, but
+# not on MSWin32. Have to pull from evec in that case
+use constant SELECT_CONNECT_EVEC => IO::Async::OS->HAVE_SELECT_CONNECT_EVEC;
+
+use constant _CAN_WATCHDOG => 1;
+use constant WATCHDOG_ENABLE => IO::Async::Loop->WATCHDOG_ENABLE;
+
+=head1 NAME
+
+C<IO::Async::Loop::Select> - use C<IO::Async> with C<select(2)>
+
+=head1 SYNOPSIS
+
+Normally an instance of this class would not be directly constructed by a
+program. It may however, be useful for runinng L<IO::Async> with an existing
+program already using a C<select> call.
+
+ use IO::Async::Loop::Select;
+
+ my $loop = IO::Async::Loop::Select->new;
+
+ $loop->add( ... );
+
+ while(1) {
+ my ( $rvec, $wvec, $evec ) = ('') x 3;
+ my $timeout;
+
+ $loop->pre_select( \$rvec, \$wvec, \$evec, \$timeout );
+ ...
+ my $ret = select( $rvec, $wvec, $evec, $timeout );
+ ...
+ $loop->post_select( $rvec, $evec, $wvec );
+ }
+
+=head1 DESCRIPTION
+
+This subclass of C<IO::Async::Loop> uses the C<select(2)> syscall to perform
+read-ready and write-ready tests.
+
+To integrate with an existing C<select>-based event loop, a pair of methods
+C<pre_select> and C<post_select> can be called immediately before and
+after a C<select> call. The relevant bits in the read-ready, write-ready and
+exceptional-state bitvectors are set by the C<pre_select> method, and tested
+by the C<post_select> method to pick which event callbacks to invoke.
+
+=cut
+
+=head1 CONSTRUCTOR
+
+=cut
+
+=head2 $loop = IO::Async::Loop::Select->new
+
+This function returns a new instance of a C<IO::Async::Loop::Select> object.
+It takes no special arguments.
+
+=cut
+
+sub new
+{
+ my $class = shift;
+
+ my $self = $class->__new( @_ );
+
+ $self->{rvec} = '';
+ $self->{wvec} = '';
+ $self->{evec} = '';
+
+ $self->{avec} = ''; # Bitvector of handles always to claim are ready
+
+ return $self;
+}
+
+=head1 METHODS
+
+=cut
+
+=head2 $loop->pre_select( \$readvec, \$writevec, \$exceptvec, \$timeout )
+
+This method prepares the bitvectors for a C<select> call, setting the bits
+that the Loop is interested in. It will also adjust the C<$timeout> value if
+appropriate, reducing it if the next event timeout the Loop requires is sooner
+than the current value.
+
+=over 8
+
+=item \$readvec
+
+=item \$writevec
+
+=item \$exceptvec
+
+Scalar references to the reading, writing and exception bitvectors
+
+=item \$timeout
+
+Scalar reference to the timeout value
+
+=back
+
+=cut
+
+sub pre_select
+{
+ my $self = shift;
+ my ( $readref, $writeref, $exceptref, $timeref ) = @_;
+
+ # BITWISE operations
+ $$readref |= $self->{rvec};
+ $$writeref |= $self->{wvec};
+ $$exceptref |= $self->{evec};
+
+ $self->_adjust_timeout( $timeref );
+
+ $$timeref = 0 if FAKE_ISREG_READY and length $self->{avec};
+
+ # Round up to nearest millisecond
+ if( $$timeref ) {
+ my $mils = $$timeref * 1000;
+ my $fraction = $mils - int $mils;
+ $$timeref += ( 1 - $fraction ) / 1000 if $fraction;
+ }
+
+ return;
+}
+
+=head2 $loop->post_select( $readvec, $writevec, $exceptvec )
+
+This method checks the returned bitvectors from a C<select> call, and calls
+any of the callbacks that are appropriate.
+
+=over 8
+
+=item $readvec
+
+=item $writevec
+
+=item $exceptvec
+
+Scalars containing the read-ready, write-ready and exception bitvectors
+
+=back
+
+=cut
+
+sub post_select
+{
+ my $self = shift;
+ my ( $readvec, $writevec, $exceptvec ) = @_;
+
+ my $iowatches = $self->{iowatches};
+
+ my $count = 0;
+
+ alarm( IO::Async::Loop->WATCHDOG_INTERVAL ) if WATCHDOG_ENABLE;
+
+ foreach my $fd ( keys %$iowatches ) {
+ my $watch = $iowatches->{$fd} or next;
+
+ my $fileno = $watch->[0]->fileno;
+
+ if( vec( $readvec, $fileno, 1 ) or
+ FAKE_ISREG_READY and vec( $self->{avec}, $fileno, 1 ) and vec( $self->{rvec}, $fileno, 1 ) ) {
+ $count++, $watch->[1]->() if defined $watch->[1];
+ }
+
+ if( vec( $writevec, $fileno, 1 ) or
+ SELECT_CONNECT_EVEC and vec( $exceptvec, $fileno, 1 ) or
+ FAKE_ISREG_READY and vec( $self->{avec}, $fileno, 1 ) and vec( $self->{wvec}, $fileno, 1 ) ) {
+ $count++, $watch->[2]->() if defined $watch->[2];
+ }
+ }
+
+ # Since we have no way to know if the timeout occured, we'll have to
+ # attempt to fire any waiting timeout events anyway
+
+ $self->_manage_queues;
+
+ alarm( 0 ) if WATCHDOG_ENABLE;
+}
+
+=head2 $count = $loop->loop_once( $timeout )
+
+This method calls the C<pre_select> method to prepare the bitvectors for a
+C<select> syscall, performs it, then calls C<post_select> to process the
+result. It returns the total number of callbacks invoked by the
+C<post_select> method, or C<undef> if the underlying C<select(2)> syscall
+returned an error.
+
+=cut
+
+sub loop_once
+{
+ my $self = shift;
+ my ( $timeout ) = @_;
+
+ my ( $rvec, $wvec, $evec ) = ('') x 3;
+
+ $self->pre_select( \$rvec, \$wvec, \$evec, \$timeout );
+
+ my $ret = select( $rvec, $wvec, $evec, $timeout );
+
+ if( $ret < 0 ) {
+ # r/w/e vec can't be trusted
+ $rvec = $wvec = $evec = '';
+ }
+
+ {
+ local $!;
+ $self->post_select( $rvec, $wvec, $evec );
+ }
+
+ return $ret;
+}
+
+sub watch_io
+{
+ my $self = shift;
+ my %params = @_;
+
+ $self->__watch_io( %params );
+
+ my $fileno = $params{handle}->fileno;
+
+ vec( $self->{rvec}, $fileno, 1 ) = 1 if $params{on_read_ready};
+ vec( $self->{wvec}, $fileno, 1 ) = 1 if $params{on_write_ready};
+
+ # MSWin32 does not indicate writeready for connect() errors, HUPs, etc
+ # but it does indicate exceptional
+ vec( $self->{evec}, $fileno, 1 ) = 1 if SELECT_CONNECT_EVEC and $params{on_write_ready};
+
+ vec( $self->{avec}, $fileno, 1 ) = 1 if FAKE_ISREG_READY and stat( $params{handle} ) and -f _;
+}
+
+sub unwatch_io
+{
+ my $self = shift;
+ my %params = @_;
+
+ $self->__unwatch_io( %params );
+
+ my $fileno = $params{handle}->fileno;
+
+ vec( $self->{rvec}, $fileno, 1 ) = 0 if $params{on_read_ready};
+ vec( $self->{wvec}, $fileno, 1 ) = 0 if $params{on_write_ready};
+
+ vec( $self->{evec}, $fileno, 1 ) = 0 if SELECT_CONNECT_EVEC and $params{on_write_ready};
+
+ vec( $self->{avec}, $fileno, 1 ) = 0 if FAKE_ISREG_READY and stat( $params{handle} ) and -f _;
+
+ # vec will grow a bit vector as needed, but never shrink it. We'll trim
+ # trailing null bytes
+ $_ =~s/\0+\z// for $self->{rvec}, $self->{wvec}, $self->{evec}, $self->{avec};
+}
+
+=head1 SEE ALSO
+
+=over 4
+
+=item *
+
+L<IO::Select> - OO interface to select system call
+
+=back
+
+=head1 AUTHOR
+
+Paul Evans <leonerd@leonerd.org.uk>
+
+=cut
+
+0x55AA;