summaryrefslogtreecommitdiff
path: root/t/42function.t
diff options
context:
space:
mode:
Diffstat (limited to 't/42function.t')
-rw-r--r--t/42function.t569
1 files changed, 569 insertions, 0 deletions
diff --git a/t/42function.t b/t/42function.t
new file mode 100644
index 0000000..f4b1c4d
--- /dev/null
+++ b/t/42function.t
@@ -0,0 +1,569 @@
+#!/usr/bin/perl
+
+use strict;
+use warnings;
+
+use IO::Async::Test;
+
+use Test::More;
+use Test::Fatal;
+use Test::Refcount;
+use constant HAVE_TEST_MEMORYGROWTH => eval { require Test::MemoryGrowth; };
+
+use File::Temp qw( tempdir );
+use Time::HiRes qw( sleep );
+
+use IO::Async::Function;
+
+use IO::Async::OS;
+
+use IO::Async::Loop;
+
+use constant AUT => $ENV{TEST_QUICK_TIMERS} ? 0.1 : 1;
+
+my $loop = IO::Async::Loop->new_builtin;
+
+testing_loop( $loop );
+
+# by future
+{
+ my $function = IO::Async::Function->new(
+ min_workers => 1,
+ max_workers => 1,
+ code => sub { return $_[0] + $_[1] },
+ );
+
+ ok( defined $function, '$function defined' );
+ isa_ok( $function, "IO::Async::Function", '$function isa IO::Async::Function' );
+
+ is_oneref( $function, '$function has refcount 1' );
+
+ $loop->add( $function );
+
+ is_refcount( $function, 2, '$function has refcount 2 after $loop->add' );
+
+ is( $function->workers, 1, '$function has 1 worker' );
+ is( $function->workers_busy, 0, '$function has 0 workers busy' );
+ is( $function->workers_idle, 1, '$function has 1 workers idle' );
+
+ my $future = $function->call(
+ args => [ 10, 20 ],
+ );
+
+ isa_ok( $future, "Future", '$future' );
+
+ is_refcount( $function, 2, '$function has refcount 2 after ->call' );
+
+ is( $function->workers_busy, 1, '$function has 1 worker busy after ->call' );
+ is( $function->workers_idle, 0, '$function has 0 worker idle after ->call' );
+
+ wait_for { $future->is_ready };
+
+ my ( $result ) = $future->get;
+
+ is( $result, 30, '$result after call returns by future' );
+
+ is( $function->workers_busy, 0, '$function has 0 workers busy after call returns' );
+ is( $function->workers_idle, 1, '$function has 1 workers idle after call returns' );
+
+ $loop->remove( $function );
+}
+
+# by callback
+{
+ my $function = IO::Async::Function->new(
+ min_workers => 1,
+ max_workers => 1,
+ code => sub { return $_[0] + $_[1] },
+ );
+
+ $loop->add( $function );
+
+ my $result;
+
+ $function->call(
+ args => [ 10, 20 ],
+ on_return => sub { $result = shift },
+ on_error => sub { die "Test failed early - @_" },
+ );
+
+ wait_for { defined $result };
+
+ is( $result, 30, '$result after call returns by callback' );
+
+ $loop->remove( $function );
+}
+
+# Test queueing
+{
+ my $function = IO::Async::Function->new(
+ min_workers => 1,
+ max_workers => 1,
+ code => sub { return $_[0] + $_[1] },
+ );
+
+ $loop->add( $function );
+
+ my @result;
+
+ my $f1 = $function->call(
+ args => [ 1, 2 ],
+ on_return => sub { push @result, shift },
+ on_error => sub { die "Test failed early - @_" },
+ );
+ my $f2 = $function->call(
+ args => [ 3, 4 ],
+ on_return => sub { push @result, shift },
+ on_error => sub { die "Test failed early - @_" },
+ );
+
+ is( $function->workers, 1, '$function->workers is still 1 after 2 calls' );
+
+ isa_ok( $f1, "Future", '$f1' );
+ isa_ok( $f2, "Future", '$f2' );
+
+ wait_for { @result == 2 };
+
+ is_deeply( \@result, [ 3, 7 ], '@result after both calls return' );
+
+ is( $function->workers, 1, '$function->workers is still 1 after 2 calls return' );
+
+ $loop->remove( $function );
+}
+
+# References
+{
+ my $function = IO::Async::Function->new(
+ code => sub { return ref( $_[0] ), \$_[1] },
+ );
+
+ $loop->add( $function );
+
+ my @result;
+
+ $function->call(
+ args => [ \'a', 'b' ],
+ on_return => sub { @result = @_ },
+ on_error => sub { die "Test failed early - @_" },
+ );
+
+ wait_for { scalar @result };
+
+ is_deeply( \@result, [ 'SCALAR', \'b' ], 'Call and result preserves references' );
+
+ $loop->remove( $function );
+}
+
+# Exception throwing
+{
+ my $line = __LINE__ + 2;
+ my $function = IO::Async::Function->new(
+ code => sub { die shift },
+ );
+
+ $loop->add( $function );
+
+ my $err;
+
+ my $f = $function->call(
+ args => [ "exception name" ],
+ on_return => sub { },
+ on_error => sub { $err = shift },
+ );
+
+ wait_for { defined $err };
+
+ like( $err, qr/^exception name at \Q$0\E line \d+\.$/, '$err after exception' );
+
+ is_deeply( [ $f->failure ],
+ [ "exception name at $0 line $line.", error => ],
+ '$f->failure after exception' );
+
+ $loop->remove( $function );
+}
+
+# max_workers
+{
+ my $count = 0;
+
+ my $function = IO::Async::Function->new(
+ max_workers => 1,
+ code => sub { $count++; die "$count\n" },
+ exit_on_die => 0,
+ );
+
+ $loop->add( $function );
+
+ my @errs;
+ $function->call(
+ args => [],
+ on_return => sub { },
+ on_error => sub { push @errs, shift },
+ );
+ $function->call(
+ args => [],
+ on_return => sub { },
+ on_error => sub { push @errs, shift },
+ );
+
+ undef @errs;
+ wait_for { scalar @errs == 2 };
+
+ is_deeply( \@errs, [ "1", "2" ], 'Closed variables preserved when exit_on_die => 0' );
+
+ $loop->remove( $function );
+}
+
+# exit_on_die
+{
+ my $count = 0;
+
+ my $function = IO::Async::Function->new(
+ max_workers => 1,
+ code => sub { $count++; die "$count\n" },
+ exit_on_die => 1,
+ );
+
+ $loop->add( $function );
+
+ my @errs;
+ $function->call(
+ args => [],
+ on_return => sub { },
+ on_error => sub { push @errs, shift },
+ );
+ $function->call(
+ args => [],
+ on_return => sub { },
+ on_error => sub { push @errs, shift },
+ );
+
+ undef @errs;
+ wait_for { scalar @errs == 2 };
+
+ is_deeply( \@errs, [ "1", "1" ], 'Closed variables preserved when exit_on_die => 1' );
+
+ $loop->remove( $function );
+}
+
+# restart after exit
+SKIP: {
+ skip "This Perl does not support fork()", 4
+ if not IO::Async::OS->HAVE_POSIX_FORK;
+
+ my $function = IO::Async::Function->new(
+ model => "fork",
+ min_workers => 0,
+ max_workers => 1,
+ code => sub { $_[0] ? exit shift : return 0 },
+ );
+
+ $loop->add( $function );
+
+ my $err;
+
+ $function->call(
+ args => [ 16 ],
+ on_return => sub { $err = "" },
+ on_error => sub { $err = [ @_ ] },
+ );
+
+ wait_for { defined $err };
+
+ # Not sure what reason we might get - need to check both
+ ok( $err->[0] eq "closed" || $err->[0] eq "exit", '$err->[0] after child death' )
+ or diag( 'Expected "closed" or "exit", found ' . $err->[0] );
+
+ is( scalar $function->workers, 0, '$function->workers is now 0' );
+
+ $function->call(
+ args => [ 0 ],
+ on_return => sub { $err = "return" },
+ on_error => sub { $err = [ @_ ] },
+ );
+
+ is( scalar $function->workers, 1, '$function->workers is now 1 again' );
+
+ undef $err;
+ wait_for { defined $err };
+
+ is( $err, "return", '$err is "return" after child nondeath' );
+
+ $loop->remove( $function );
+}
+
+## Now test that parallel runs really are parallel
+{
+ # touch $dir/$n in each worker, touch $dir/done to finish it
+ sub touch
+ {
+ my ( $file ) = @_;
+
+ open( my $fh, ">", $file ) or die "Cannot write $file - $!";
+ close( $fh );
+ }
+
+ my $function = IO::Async::Function->new(
+ min_workers => 3,
+ code => sub {
+ my ( $dir, $n ) = @_;
+ my $file = "$dir/$n";
+
+ touch( $file );
+
+ # Wait for synchronisation
+ sleep 0.1 while ! -e "$dir/done";
+
+ unlink( $file );
+
+ return $n;
+ },
+ );
+
+ $loop->add( $function );
+
+ is( scalar $function->workers, 3, '$function->workers is 3' );
+
+ my $dir = tempdir( CLEANUP => 1 );
+
+ my %ret;
+
+ foreach my $id ( 1, 2, 3 ) {
+ $function->call(
+ args => [ $dir, $id ],
+ on_return => sub { $ret{$id} = shift },
+ on_error => sub { die "Test failed early - @_" },
+ );
+ }
+
+ wait_for { -e "$dir/1" and -e "$dir/2" and -e "$dir/3" };
+
+ ok( 1, 'synchronise files created' );
+
+ # Synchronize deleting them;
+ touch( "$dir/done" );
+
+ undef %ret;
+ wait_for { keys %ret == 3 };
+
+ unlink( "$dir/done" );
+
+ is_deeply( \%ret, { 1 => 1, 2 => 2, 3 => 3 }, 'ret keys after parallel run' );
+
+ is( scalar $function->workers, 3, '$function->workers is still 3' );
+
+ $loop->remove( $function );
+}
+
+# Test for idle timeout
+{
+ my $function = IO::Async::Function->new(
+ min_workers => 0,
+ max_workers => 1,
+ idle_timeout => 2 * AUT,
+ code => sub { return $_[0] },
+ );
+
+ $loop->add( $function );
+
+ my $result;
+
+ $function->call(
+ args => [ 1 ],
+ on_result => sub { $result = $_[0] },
+ );
+
+ wait_for { defined $result };
+
+ is( $function->workers, 1, '$function has 1 worker after call' );
+
+ my $waited;
+ $loop->watch_time( after => 1 * AUT, code => sub { $waited++ } );
+
+ wait_for { $waited };
+
+ is( $function->workers, 1, '$function still has 1 worker after short delay' );
+
+ undef $result;
+ $function->call(
+ args => [ 1 ],
+ on_result => sub { $result = $_[0] },
+ );
+
+ wait_for { defined $result };
+
+ undef $waited;
+ $loop->watch_time( after => 3 * AUT, code => sub { $waited++ } );
+
+ wait_for { $waited };
+
+ is( $function->workers, 0, '$function has 0 workers after longer delay' );
+
+ $loop->remove( $function );
+}
+
+# Restart
+{
+ my $value = 1;
+
+ my $function = IO::Async::Function->new(
+ code => sub { return $value },
+ );
+
+ $loop->add( $function );
+
+ my $result;
+ $function->call(
+ args => [],
+ on_return => sub { $result = shift },
+ on_error => sub { die "Test failed early - @_" },
+ );
+
+ wait_for { defined $result };
+
+ is( $result, 1, '$result before restart' );
+
+ $value = 2;
+ $function->restart;
+
+ undef $result;
+ $function->call(
+ args => [],
+ on_return => sub { $result = shift },
+ on_error => sub { die "Test failed early - @_" },
+ );
+
+ wait_for { defined $result };
+
+ is( $result, 2, '$result after restart' );
+
+ undef $result;
+ $function->call(
+ args => [],
+ on_return => sub { $result = shift },
+ on_error => sub { die "Test failed early - @_" },
+ );
+
+ $function->restart;
+
+ wait_for { defined $result };
+
+ is( $result, 2, 'call before restart still returns result' );
+
+ $loop->remove( $function );
+}
+
+# max_worker_calls
+{
+ my $counter;
+ my $function = IO::Async::Function->new(
+ max_workers => 1,
+ max_worker_calls => 2,
+ code => sub { return ++$counter; }
+ );
+
+ $loop->add( $function );
+
+ my $result;
+ $function->call(
+ args => [],
+ on_return => sub { $result = shift },
+ on_error => sub { die "Test failed early - @_" },
+ );
+ wait_for { defined $result };
+ is( $result, 1, '$result from first call' );
+
+ undef $result;
+ $function->call(
+ args => [],
+ on_return => sub { $result = shift },
+ on_error => sub { die "Test failed early - @_" },
+ );
+ wait_for { defined $result };
+ is( $result, 2, '$result from second call' );
+
+ undef $result;
+ $function->call(
+ args => [],
+ on_return => sub { $result = shift },
+ on_error => sub { die "Test failed early - @_" },
+ );
+ wait_for { defined $result };
+ is( $result, 1, '$result from third call' );
+
+ $loop->remove( $function );
+}
+
+# Cancellation of sent calls
+{
+ my $function = IO::Async::Function->new(
+ max_workers => 1,
+ code => sub {
+ return 123;
+ },
+ );
+
+ $loop->add( $function );
+
+ my $f1 = $function->call( args => [] );
+ $f1->cancel;
+
+ my $f2 = $function->call( args => [] );
+
+ wait_for { $f2->is_ready };
+
+ is( scalar $f2->get, 123, 'Result of function call after cancelled call' );
+
+ $loop->remove( $function );
+}
+
+# Cancellation of pending calls
+{
+ my $function = IO::Async::Function->new(
+ max_workers => 1,
+ code => do { my $state; sub {
+ my $oldstate = $state;
+ $state = shift;
+ return $oldstate;
+ } },
+ );
+
+ $loop->add( $function );
+
+ # Queue 3 calls but immediately cancel the middle one
+ my ( $f1, $f2, $f3 ) = map {
+ $function->call( args => [ $_ ] )
+ } 1 .. 3;
+
+ $f2->cancel;
+
+ wait_for { $f1->is_ready and $f3->is_ready };
+
+ is( scalar $f1->get, undef, '$f1 result is undef' );
+ is( scalar $f3->get, 1, '$f3 result is 1' );
+
+ $loop->remove( $function );
+}
+
+# Leak test (RT99552)
+if( HAVE_TEST_MEMORYGROWTH ) {
+ diag( "Performing memory leak test" );
+
+ my $function = IO::Async::Function->new(
+ max_workers => 8,
+ code => sub {},
+ );
+
+ $loop->add( $function );
+
+ Test::MemoryGrowth::no_growth( sub {
+ $function->restart;
+ $function->call( args => [] )->get;
+ }, calls => 100,
+ 'IO::Async::Function calls do not leak memory' );
+
+ $loop->remove( $function );
+ undef $function;
+}
+
+done_testing;