diff options
author | Lorry Tar Creator <lorry-tar-importer@lorry> | 2015-06-01 14:15:30 +0000 |
---|---|---|
committer | Lorry Tar Creator <lorry-tar-importer@lorry> | 2015-06-01 14:15:30 +0000 |
commit | 1425eea04dd872dc6313f5315f317b2de288037c (patch) | |
tree | f81c74f75429e829714029850f89ee4c7f13aa39 /lib/IO/Async/Routine.pm | |
download | IO-Async-tarball-master.tar.gz |
IO-Async-0.67HEADIO-Async-0.67master
Diffstat (limited to 'lib/IO/Async/Routine.pm')
-rw-r--r-- | lib/IO/Async/Routine.pm | 436 |
1 files changed, 436 insertions, 0 deletions
diff --git a/lib/IO/Async/Routine.pm b/lib/IO/Async/Routine.pm new file mode 100644 index 0000000..f9a5a3b --- /dev/null +++ b/lib/IO/Async/Routine.pm @@ -0,0 +1,436 @@ +# You may distribute under the terms of either the GNU General Public License +# or the Artistic License (the same terms as Perl itself) +# +# (C) Paul Evans, 2012-2013 -- leonerd@leonerd.org.uk + +package IO::Async::Routine; + +use strict; +use warnings; + +our $VERSION = '0.67'; + +use base qw( IO::Async::Notifier ); + +use Carp; + +use IO::Async::OS; +use IO::Async::Process; + +=head1 NAME + +C<IO::Async::Routine> - execute code in an independent sub-process or thread + +=head1 SYNOPSIS + + use IO::Async::Routine; + use IO::Async::Channel; + + use IO::Async::Loop; + my $loop = IO::Async::Loop->new; + + my $nums_ch = IO::Async::Channel->new; + my $ret_ch = IO::Async::Channel->new; + + my $routine = IO::Async::Routine->new( + channels_in => [ $nums_ch ], + channels_out => [ $ret_ch ], + + code => sub { + my @nums = @{ $nums_ch->recv }; + my $ret = 0; $ret += $_ for @nums; + + # Can only send references + $ret_ch->send( \$ret ); + }, + + on_finish => sub { + say "The routine aborted early - $_[-1]"; + $loop->stop; + }, + ); + + $loop->add( $routine ); + + $nums_ch->send( [ 10, 20, 30 ] ); + $ret_ch->recv( + on_recv => sub { + my ( $ch, $totalref ) = @_; + say "The total of 10, 20, 30 is: $$totalref"; + $loop->stop; + } + ); + + $loop->run; + +=head1 DESCRIPTION + +This L<IO::Async::Notifier> contains a body of code and executes it in a +sub-process or thread, allowing it to act independently of the main program. +Once set up, all communication with the code happens by values passed into or +out of the Routine via L<IO::Async::Channel> objects. + +A choice of detachment model is available, with options being a C<fork()>ed +child process, or a thread. In both cases the code contained within the +Routine is free to make blocking calls without stalling the rest of the +program. This makes it useful for using existing code which has no option not +to block within an C<IO::Async>-based program. + +Code running inside a C<fork()>-based Routine runs within its own process; it +is isolated from the rest of the program in terms of memory, CPU time, and +other resources. Code running in a thread-based Routine however, shares memory +and other resources such as open filehandles with the main thread. + +To create asynchronous wrappers of functions that return a value based only on +their arguments, and do not generally maintain state within the process it may +be more convenient to use an L<IO::Async::Function> instead, which uses an +C<IO::Async::Routine> to contain the body of the function and manages the +Channels itself. + +=cut + +=head1 EVENTS + +=head2 on_finish $exitcode + +For C<fork()>-based Routines, this is invoked after the process has exited and +is passed the raw exitcode status. + +=head2 on_finish $type, @result + +For thread-based Routines, this is invoked after the thread has returned from +its code block and is passed the C<on_joined> result. + +As the behaviour of these events differs per model, it may be more convenient +to use C<on_return> and C<on_die> instead. + +=head2 on_return $result + +Invoked if the code block returns normally. Note that C<fork()>-based Routines +can only transport an integer result between 0 and 255, as this is the actual +C<exit()> value. + +=head2 on_die $exception + +Invoked if the code block fails with an exception. + +=cut + +=head1 PARAMETERS + +The following named parameters may be passed to C<new> or C<configure>: + +=head2 model => "fork" | "thread" + +Optional. Defines how the routine will detach itself from the main process. +C<fork> uses a child process detached using an L<IO::Async::Process>. +C<thread> uses a thread, and is only available on threaded Perls. + +If the model is not specified, the environment variable +C<IO_ASYNC_ROUTINE_MODEL> is used to pick a default. If that isn't defined, +C<fork> is preferred if it is available, otherwise C<thread>. + +=head2 channels_in => ARRAY of IO::Async::Channel + +ARRAY reference of C<IO::Async::Channel> objects to set up for passing values +in to the Routine. + +=head2 channels_out => ARRAY of IO::Async::Channel + +ARRAY reference of C<IO::Async::Channel> objects to set up for passing values +out of the Routine. + +=head2 code => CODE + +CODE reference to the body of the Routine, to execute once the channels are +set up. + +=head2 setup => ARRAY + +Optional. For C<fork()>-based Routines, gives a reference to an array to pass +to the underlying C<Loop> C<fork_child> method. Ignored for thread-based +Routines. + +=cut + +use constant PREFERRED_MODEL => + IO::Async::OS->HAVE_POSIX_FORK ? "fork" : + IO::Async::OS->HAVE_THREADS ? "thread" : + die "No viable Routine models"; + +sub _init +{ + my $self = shift; + my ( $params ) = @_; + + $params->{model} ||= $ENV{IO_ASYNC_ROUTINE_MODEL} || PREFERRED_MODEL; + + $self->SUPER::_init( @_ ); +} + +sub configure +{ + my $self = shift; + my %params = @_; + + # TODO: Can only reconfigure when not running + foreach (qw( channels_in channels_out code setup on_finish on_return on_die )) { + $self->{$_} = delete $params{$_} if exists $params{$_}; + } + + if( defined( my $model = delete $params{model} ) ) { + $model eq "fork" or $model eq "thread" or + croak "Expected 'model' to be either 'fork' or 'thread'"; + + $model eq "fork" and !IO::Async::OS->HAVE_POSIX_FORK and + croak "Cannot use 'fork' model as fork() is not available"; + $model eq "thread" and !IO::Async::OS->HAVE_THREADS and + croak "Cannot use 'thread' model as threads are not available"; + + $self->{model} = $model; + } + + $self->SUPER::configure( %params ); +} + +sub _add_to_loop +{ + my $self = shift; + my ( $loop ) = @_; + $self->SUPER::_add_to_loop( $loop ); + + return $self->_setup_fork if $self->{model} eq "fork"; + return $self->_setup_thread if $self->{model} eq "thread"; + + die "TODO: unrecognised Routine model $self->{model}"; +} + +sub _setup_fork +{ + my $self = shift; + + my @setup; + my @channels_in; + my @channels_out; + + foreach my $ch ( @{ $self->{channels_in} || [] } ) { + my ( $rd, $wr ); + unless( $rd = $ch->_extract_read_handle ) { + ( $rd, $wr ) = IO::Async::OS->pipepair; + } + push @setup, $rd => "keep"; + push @channels_in, [ $ch, $wr, $rd ]; + } + + foreach my $ch ( @{ $self->{channels_out} || [] } ) { + my ( $rd, $wr ); + unless( $wr = $ch->_extract_write_handle ) { + ( $rd, $wr ) = IO::Async::OS->pipepair; + } + push @setup, $wr => "keep"; + push @channels_out, [ $ch, $rd, $wr ]; + } + + my $code = $self->{code}; + + my $setup = $self->{setup}; + push @setup, @$setup if $setup; + + my $process = IO::Async::Process->new( + setup => \@setup, + code => sub { + foreach ( @channels_in ) { + my ( $ch, undef, $rd ) = @$_; + $ch->setup_sync_mode( $rd ); + } + foreach ( @channels_out ) { + my ( $ch, undef, $wr ) = @$_; + $ch->setup_sync_mode( $wr ); + } + + my $ret = $code->(); + + foreach ( @channels_in, @channels_out ) { + my ( $ch ) = @$_; + $ch->close; + } + + return $ret; + }, + on_finish => $self->_replace_weakself( sub { + my $self = shift or return; + my ( $exitcode ) = @_; + $self->maybe_invoke_event( on_finish => $exitcode ); + + $self->maybe_invoke_event( on_return => ($exitcode >> 8) ) unless $exitcode & 0x7f; + }), + on_exception => $self->_replace_weakself( sub { + my $self = shift or return; + my ( $exception, $errno, $exitcode ) = @_; + + $self->maybe_invoke_event( on_die => $exception ); + }), + ); + + foreach ( @channels_in ) { + my ( $ch, $wr ) = @$_; + + $ch->setup_async_mode( write_handle => $wr ); + + $self->add_child( $ch ) unless $ch->parent; + } + + foreach ( @channels_out ) { + my ( $ch, $rd ) = @$_; + + $ch->setup_async_mode( read_handle => $rd ); + + $self->add_child( $ch ) unless $ch->parent; + } + + $self->add_child( $self->{process} = $process ); + $self->{id} = "P" . $process->pid; + + foreach ( @channels_in, @channels_out ) { + my ( undef, undef, $other ) = @$_; + $other->close; + } +} + +sub _setup_thread +{ + my $self = shift; + + my @channels_in; + my @channels_out; + + foreach my $ch ( @{ $self->{channels_in} || [] } ) { + my ( $rd, $wr ); + unless( $rd = $ch->_extract_read_handle ) { + ( $rd, $wr ) = IO::Async::OS->pipepair; + } + push @channels_in, [ $ch, $wr, $rd ]; + } + + foreach my $ch ( @{ $self->{channels_out} || [] } ) { + my ( $rd, $wr ); + unless( $wr = $ch->_extract_write_handle ) { + ( $rd, $wr ) = IO::Async::OS->pipepair; + } + push @channels_out, [ $ch, $rd, $wr ]; + } + + my $code = $self->{code}; + + my $tid = $self->loop->create_thread( + code => sub { + foreach ( @channels_in ) { + my ( $ch, $wr, $rd ) = @$_; + $ch->setup_sync_mode( $rd ); + $wr->close if $wr; + } + foreach ( @channels_out ) { + my ( $ch, $rd, $wr ) = @$_; + $ch->setup_sync_mode( $wr ); + $rd->close if $rd; + } + + my $ret = $code->(); + + foreach ( @channels_in, @channels_out ) { + my ( $ch ) = @$_; + $ch->close; + } + + return $ret; + }, + on_joined => $self->_capture_weakself( sub { + my $self = shift or return; + my ( $ev, @result ) = @_; + $self->maybe_invoke_event( on_finish => @_ ); + + $self->maybe_invoke_event( on_return => @result ) if $ev eq "return"; + $self->maybe_invoke_event( on_die => $result[0] ) if $ev eq "died"; + + delete $self->{tid}; + }), + ); + + $self->{tid} = $tid; + $self->{id} = "T" . $tid; + + foreach ( @channels_in ) { + my ( $ch, $wr, $rd ) = @$_; + + $ch->setup_async_mode( write_handle => $wr ); + $rd->close; + + $self->add_child( $ch ) unless $ch->parent; + } + + foreach ( @channels_out ) { + my ( $ch, $rd, $wr ) = @$_; + + $ch->setup_async_mode( read_handle => $rd ); + $wr->close; + + $self->add_child( $ch ) unless $ch->parent; + } +} + +=head1 METHODS + +=cut + +=head2 $id = $routine->id + +Returns an ID string that uniquely identifies the Routine out of all the +currently-running ones. (The ID of already-exited Routines may be reused, +however.) + +=cut + +sub id +{ + my $self = shift; + return $self->{id}; +} + +=head2 $model = $routine->model + +Returns the detachment model in use by the Routine. + +=cut + +sub model +{ + my $self = shift; + return $self->{model}; +} + +=head2 $routine->kill( $signal ) + +Sends the specified signal to the routine code. This is either implemented by +C<CORE::kill()> or C<threads::kill> as required. Note that in the thread case +this has the usual limits of signal delivery to threads; namely, that it works +at the Perl interpreter level, and cannot actually interrupt blocking system +calls. + +=cut + +sub kill +{ + my $self = shift; + my ( $signal ) = @_; + + $self->{process}->kill( $signal ) if $self->{model} eq "fork"; + threads->object( $self->{tid} )->kill( $signal ) if $self->{model} eq "thread"; +} + +=head1 AUTHOR + +Paul Evans <leonerd@leonerd.org.uk> + +=cut + +0x55AA; |