diff options
author | Lorry Tar Creator <lorry-tar-importer@lorry> | 2015-06-01 14:15:30 +0000 |
---|---|---|
committer | Lorry Tar Creator <lorry-tar-importer@lorry> | 2015-06-01 14:15:30 +0000 |
commit | 1425eea04dd872dc6313f5315f317b2de288037c (patch) | |
tree | f81c74f75429e829714029850f89ee4c7f13aa39 /lib/IO/Async/Function.pm | |
download | IO-Async-tarball-master.tar.gz |
IO-Async-0.67HEADIO-Async-0.67master
Diffstat (limited to 'lib/IO/Async/Function.pm')
-rw-r--r-- | lib/IO/Async/Function.pm | 667 |
1 files changed, 667 insertions, 0 deletions
diff --git a/lib/IO/Async/Function.pm b/lib/IO/Async/Function.pm new file mode 100644 index 0000000..adaf396 --- /dev/null +++ b/lib/IO/Async/Function.pm @@ -0,0 +1,667 @@ +# You may distribute under the terms of either the GNU General Public License +# or the Artistic License (the same terms as Perl itself) +# +# (C) Paul Evans, 2011-2015 -- leonerd@leonerd.org.uk + +package IO::Async::Function; + +use strict; +use warnings; + +our $VERSION = '0.67'; + +use base qw( IO::Async::Notifier ); +use IO::Async::Timer::Countdown; + +use Carp; + +use Storable qw( freeze ); + +=head1 NAME + +C<IO::Async::Function> - call a function asynchronously + +=head1 SYNOPSIS + + use IO::Async::Function; + + use IO::Async::Loop; + my $loop = IO::Async::Loop->new; + + my $function = IO::Async::Function->new( + code => sub { + my ( $number ) = @_; + return is_prime( $number ); + }, + ); + + $loop->add( $function ); + + $function->call( + args => [ 123454321 ], + )->on_done( sub { + my $isprime = shift; + print "123454321 " . ( $isprime ? "is" : "is not" ) . " a prime number\n"; + })->on_fail( sub { + print STDERR "Cannot determine if it's prime - $_[0]\n"; + })->get; + +=head1 DESCRIPTION + +This subclass of L<IO::Async::Notifier> wraps a function body in a collection +of worker processes, to allow it to execute independently of the main process. +The object acts as a proxy to the function, allowing invocations to be made by +passing in arguments, and invoking a continuation in the main process when the +function returns. + +The object represents the function code itself, rather than one specific +invocation of it. It can be called multiple times, by the C<call> method. +Multiple outstanding invocations can be called; they will be dispatched in +the order they were queued. If only one worker process is used then results +will be returned in the order they were called. If multiple are used, then +each request will be sent in the order called, but timing differences between +each worker may mean results are returned in a different order. + +Since the code block will be called multiple times within the same child +process, it must take care not to modify any of its state that might affect +subsequent calls. Since it executes in a child process, it cannot make any +modifications to the state of the parent program. Therefore, all the data +required to perform its task must be represented in the call arguments, and +all of the result must be represented in the return values. + +The Function object is implemented using an L<IO::Async::Routine> with two +L<IO::Async::Channel> objects to pass calls into and results out from it. + +The C<IO::Async> framework generally provides mechanisms for multiplexing IO +tasks between different handles, so there aren't many occasions when such an +asynchronous function is necessary. Two cases where this does become useful +are: + +=over 4 + +=item 1. + +When a large amount of computationally-intensive work needs to be performed +(for example, the C<is_prime> test in the example in the C<SYNOPSIS>). + +=item 2. + +When a blocking OS syscall or library-level function needs to be called, and +no nonblocking or asynchronous version is supplied. This is used by +C<IO::Async::Resolver>. + +=back + +This object is ideal for representing "pure" functions; that is, blocks of +code which have no stateful effect on the process, and whose result depends +only on the arguments passed in. For a more general co-routine ability, see +also L<IO::Async::Routine>. + +=cut + +=head1 PARAMETERS + +The following named parameters may be passed to C<new> or C<configure>: + +=head2 code => CODE + +The body of the function to execute. + +=head2 model => "fork" | "thread" + +Optional. Requests a specific C<IO::Async::Routine> model. If not supplied, +leaves the default choice up to Routine. + +=head2 min_workers => INT + +=head2 max_workers => INT + +The lower and upper bounds of worker processes to try to keep running. The +actual number running at any time will be kept somewhere between these bounds +according to load. + +=head2 max_worker_calls => INT + +Optional. If provided, stop a worker process after it has processed this +number of calls. (New workers may be started to replace stopped ones, within +the bounds given above). + +=head2 idle_timeout => NUM + +Optional. If provided, idle worker processes will be shut down after this +amount of time, if there are more than C<min_workers> of them. + +=head2 exit_on_die => BOOL + +Optional boolean, controls what happens after the C<code> throws an +exception. If missing or false, the worker will continue running to process +more requests. If true, the worker will be shut down. A new worker might be +constructed by the C<call> method to replace it, if necessary. + +=head2 setup => ARRAY + +Optional array reference. Specifies the C<setup> key to pass to the underlying +L<IO::Async::Process> when setting up new worker processes. + +=cut + +sub _init +{ + my $self = shift; + $self->SUPER::_init( @_ ); + + $self->{min_workers} = 1; + $self->{max_workers} = 8; + + $self->{workers} = {}; # {$id} => IaFunction:Worker + + $self->{pending_queue} = []; +} + +sub configure +{ + my $self = shift; + my %params = @_; + + my %worker_params; + foreach (qw( model exit_on_die max_worker_calls )) { + $self->{$_} = $worker_params{$_} = delete $params{$_} if exists $params{$_}; + } + + if( keys %worker_params ) { + foreach my $worker ( $self->_worker_objects ) { + $worker->configure( %worker_params ); + } + } + + if( exists $params{idle_timeout} ) { + my $timeout = delete $params{idle_timeout}; + if( !$timeout ) { + $self->remove_child( delete $self->{idle_timer} ) if $self->{idle_timer}; + } + elsif( my $idle_timer = $self->{idle_timer} ) { + $idle_timer->configure( delay => $timeout ); + } + else { + $self->{idle_timer} = IO::Async::Timer::Countdown->new( + delay => $timeout, + on_expire => $self->_capture_weakself( sub { + my $self = shift or return; + my $workers = $self->{workers}; + + # Shut down atmost one idle worker, starting from the highest + # ID. Since we search from lowest to assign work, this tries + # to ensure we'll shut down the least useful ones first, + # keeping more useful ones in memory (page/cache warmth, etc..) + foreach my $id ( reverse sort keys %$workers ) { + next if $workers->{$id}{busy}; + + $workers->{$id}->stop; + last; + } + + # Still more? + $self->{idle_timer}->start if $self->workers_idle > $self->{min_workers}; + } ), + ); + $self->add_child( $self->{idle_timer} ); + } + } + + foreach (qw( min_workers max_workers )) { + $self->{$_} = delete $params{$_} if exists $params{$_}; + # TODO: something about retuning + } + + my $need_restart; + + foreach (qw( code setup )) { + $need_restart++, $self->{$_} = delete $params{$_} if exists $params{$_}; + } + + $self->SUPER::configure( %params ); + + if( $need_restart and $self->loop ) { + $self->stop; + $self->start; + } +} + +sub _add_to_loop +{ + my $self = shift; + $self->SUPER::_add_to_loop( @_ ); + + $self->start; +} + +sub _remove_from_loop +{ + my $self = shift; + + $self->stop; + + $self->SUPER::_remove_from_loop( @_ ); +} + +=head1 METHODS + +The following methods documented with a trailing call to C<< ->get >> return +L<Future> instances. + +=cut + +=head2 $function->start + +Start the worker processes + +=cut + +sub start +{ + my $self = shift; + + $self->_new_worker for 1 .. $self->{min_workers}; +} + +=head2 $function->stop + +Stop the worker processes + +=cut + +sub stop +{ + my $self = shift; + + $self->{stopping} = 1; + foreach my $worker ( $self->_worker_objects ) { + $worker->stop; + } +} + +=head2 $function->restart + +Gracefully stop and restart all the worker processes. + +=cut + +sub restart +{ + my $self = shift; + + $self->stop; + $self->start; +} + +=head2 @result = $function->call( %params )->get + +Schedules an invocation of the contained function to be executed on one of the +worker processes. If a non-busy worker is available now, it will be called +immediately. If not, it will be queued and sent to the next free worker that +becomes available. + +The request will already have been serialised by the marshaller, so it will be +safe to modify any referenced data structures in the arguments after this call +returns. + +The C<%params> hash takes the following keys: + +=over 8 + +=item args => ARRAY + +A reference to the array of arguments to pass to the code. + +=back + +=head2 $function->call( %params ) + +When not returning a future, the C<on_result>, C<on_return> and C<on_error> +arguments give continuations to handle successful results or failure. + +=over 8 + +=item on_result => CODE + +A continuation that is invoked when the code has been executed. If the code +returned normally, it is called as: + + $on_result->( 'return', @values ) + +If the code threw an exception, or some other error occured such as a closed +connection or the process died, it is called as: + + $on_result->( 'error', $exception_name ) + +=item on_return => CODE and on_error => CODE + +An alternative to C<on_result>. Two continuations to use in either of the +circumstances given above. They will be called directly, without the leading +'return' or 'error' value. + +=back + +=cut + +sub call +{ + my $self = shift; + my %params = @_; + + # TODO: possibly just queue this? + $self->loop or croak "Cannot ->call on a Function not yet in a Loop"; + + my $args = delete $params{args}; + ref $args eq "ARRAY" or croak "Expected 'args' to be an array"; + + my ( $on_done, $on_fail ); + if( defined $params{on_result} ) { + my $on_result = delete $params{on_result}; + ref $on_result or croak "Expected 'on_result' to be a reference"; + + $on_done = $self->_capture_weakself( sub { + my $self = shift or return; + $self->debug_printf( "CONT on_result return" ); + $on_result->( return => @_ ); + } ); + $on_fail = $self->_capture_weakself( sub { + my $self = shift or return; + my ( $err, @values ) = @_; + $self->debug_printf( "CONT on_result error" ); + $on_result->( error => @values ); + } ); + } + elsif( defined $params{on_return} and defined $params{on_error} ) { + my $on_return = delete $params{on_return}; + ref $on_return or croak "Expected 'on_return' to be a reference"; + my $on_error = delete $params{on_error}; + ref $on_error or croak "Expected 'on_error' to be a reference"; + + $on_done = $self->_capture_weakself( sub { + my $self = shift or return; + $self->debug_printf( "CONT on_return" ); + $on_return->( @_ ); + } ); + $on_fail = $self->_capture_weakself( sub { + my $self = shift or return; + $self->debug_printf( "CONT on_error" ); + $on_error->( @_ ); + } ); + } + elsif( !defined wantarray ) { + croak "Expected either 'on_result' or 'on_return' and 'on_error' keys, or to return a Future"; + } + + my $request = freeze( $args ); + + my $future; + if( my $worker = $self->_get_worker ) { + $self->debug_printf( "CALL" ); + $future = $self->_call_worker( $worker, $request ); + } + else { + $self->debug_printf( "QUEUE" ); + push @{ $self->{pending_queue} }, my $wait_f = $self->loop->new_future; + + $future = $wait_f->then( sub { + my ( $self, $worker ) = @_; + $self->_call_worker( $worker, $request ); + }); + } + + $future->on_done( $on_done ) if $on_done; + $future->on_fail( $on_fail ) if $on_fail; + + return $future if defined wantarray; + + # Caller is not going to keep hold of the Future, so we have to ensure it + # stays alive somehow + $self->adopt_future( $future->else( sub { Future->done } ) ); +} + +sub _worker_objects +{ + my $self = shift; + return values %{ $self->{workers} }; +} + +=head2 $count = $function->workers + +Returns the total number of worker processes available + +=cut + +sub workers +{ + my $self = shift; + return scalar keys %{ $self->{workers} }; +} + +=head2 $count = $function->workers_busy + +Returns the number of worker processes that are currently busy + +=cut + +sub workers_busy +{ + my $self = shift; + return scalar grep { $_->{busy} } $self->_worker_objects; +} + +=head2 $count = $function->workers_idle + +Returns the number of worker processes that are currently idle + +=cut + +sub workers_idle +{ + my $self = shift; + return scalar grep { !$_->{busy} } $self->_worker_objects; +} + +sub _new_worker +{ + my $self = shift; + + my $worker = IO::Async::Function::Worker->new( + ( map { $_ => $self->{$_} } qw( model code setup exit_on_die ) ), + max_calls => $self->{max_worker_calls}, + + on_finish => $self->_capture_weakself( sub { + my $self = shift or return; + my ( $worker ) = @_; + + return if $self->{stopping}; + + $self->_new_worker if $self->workers < $self->{min_workers}; + + $self->_dispatch_pending; + } ), + ); + + $self->add_child( $worker ); + + return $self->{workers}{$worker->id} = $worker; +} + +sub _get_worker +{ + my $self = shift; + + foreach ( sort keys %{ $self->{workers} } ) { + return $self->{workers}{$_} if !$self->{workers}{$_}{busy}; + } + + if( $self->workers < $self->{max_workers} ) { + return $self->_new_worker; + } + + return undef; +} + +sub _call_worker +{ + my $self = shift; + my ( $worker, $type, $args ) = @_; + + my $future = $worker->call( $type, $args ); + + if( $self->workers_idle == 0 ) { + $self->{idle_timer}->stop if $self->{idle_timer}; + } + + return $future; +} + +sub _dispatch_pending +{ + my $self = shift; + + while( my $next = shift @{ $self->{pending_queue} } ) { + my $worker = $self->_get_worker or return; + + next if $next->is_cancelled; + + $self->debug_printf( "UNQUEUE" ); + $next->done( $self, $worker ); + return; + } + + if( $self->workers_idle > $self->{min_workers} ) { + $self->{idle_timer}->start if $self->{idle_timer} and !$self->{idle_timer}->is_running; + } +} + +package # hide from indexer + IO::Async::Function::Worker; + +use base qw( IO::Async::Routine ); + +use IO::Async::Channel; + +sub new +{ + my $class = shift; + my %params = @_; + + my $arg_channel = IO::Async::Channel->new; + my $ret_channel = IO::Async::Channel->new; + + my $code = delete $params{code}; + $params{code} = sub { + while( my $args = $arg_channel->recv ) { + my @ret; + my $ok = eval { @ret = $code->( @$args ); 1 }; + + if( $ok ) { + $ret_channel->send( [ r => @ret ] ); + } + else { + chomp( my $e = "$@" ); + $ret_channel->send( [ e => $e, error => ] ); + } + } + }; + + my $worker = $class->SUPER::new( + %params, + channels_in => [ $arg_channel ], + channels_out => [ $ret_channel ], + ); + + $worker->{arg_channel} = $arg_channel; + $worker->{ret_channel} = $ret_channel; + + return $worker; +} + +sub configure +{ + my $self = shift; + my %params = @_; + + exists $params{$_} and $self->{$_} = delete $params{$_} for qw( exit_on_die max_calls ); + + $self->SUPER::configure( %params ); +} + +sub stop +{ + my $worker = shift; + $worker->{arg_channel}->close; + + if( my $function = $worker->parent ) { + delete $function->{workers}{$worker->id}; + + if( $worker->{busy} ) { + $worker->{remove_on_idle}++; + } + else { + $function->remove_child( $worker ); + } + } +} + +sub call +{ + my $worker = shift; + my ( $args ) = @_; + + $worker->{arg_channel}->send_frozen( $args ); + + $worker->{busy} = 1; + $worker->{max_calls}--; + + return $worker->{ret_channel}->recv->then( + # on recv + $worker->_capture_weakself( sub { + my ( $worker, $result ) = @_; + my ( $type, @values ) = @$result; + + $worker->stop if !$worker->{max_calls} or + $worker->{exit_on_die} && $type eq "e"; + + if( $type eq "r" ) { + return Future->done( @values ); + } + elsif( $type eq "e" ) { + return Future->fail( @values ); + } + else { + die "Unrecognised type from worker - $type\n"; + } + } ), + # on EOF + $worker->_capture_weakself( sub { + my ( $worker ) = @_; + + $worker->stop; + + return Future->fail( "closed", "closed" ); + } ) + )->on_ready( $worker->_capture_weakself( sub { + my ( $worker, $f ) = @_; + $worker->{busy} = 0; + + my $function = $worker->parent; + $function->_dispatch_pending if $function; + + $function->remove_child( $worker ) if $function and $worker->{remove_on_idle}; + })); +} + +=head1 NOTES + +For the record, 123454321 is 11111 * 11111, a square number, and therefore not +prime. + +=head1 AUTHOR + +Paul Evans <leonerd@leonerd.org.uk> + +=cut + +0x55AA; |