diff options
Diffstat (limited to 'lib/IO/Async/Channel.pm')
-rw-r--r-- | lib/IO/Async/Channel.pm | 471 |
1 files changed, 471 insertions, 0 deletions
diff --git a/lib/IO/Async/Channel.pm b/lib/IO/Async/Channel.pm new file mode 100644 index 0000000..6e638f9 --- /dev/null +++ b/lib/IO/Async/Channel.pm @@ -0,0 +1,471 @@ +# You may distribute under the terms of either the GNU General Public License +# or the Artistic License (the same terms as Perl itself) +# +# (C) Paul Evans, 2011-2015 -- leonerd@leonerd.org.uk + +package IO::Async::Channel; + +use strict; +use warnings; +use base qw( IO::Async::Notifier ); + +our $VERSION = '0.67'; + +use Carp; + +use IO::Async::Stream; + +=head1 NAME + +C<IO::Async::Channel> - pass values into or out from an L<IO::Async::Routine> + +=head1 DESCRIPTION + +A C<IO::Async::Channel> object allows Perl values to be passed into or out of +an L<IO::Async::Routine>. It is intended to be used primarily with a Routine +object rather than independently. For more detail and examples on how to use +this object see also the documentation for L<IO::Async::Routine>. + +A Channel object is shared between the main process of the program and the +process running within the Routine. In the main process it will be used in +asynchronous mode, and in the Routine process it will be used in synchronous +mode. In asynchronous mode all methods return immediately and use +C<IO::Async>-style futures or callback functions. In synchronous within the +Routine process the methods block until they are ready and may be used for +flow-control within the routine. Alternatively, a Channel may be shared +between two different Routine objects, and not used directly by the +controlling program. + +The channel itself represents a FIFO of Perl reference values. New values may +be put into the channel by the C<send> method in either mode. Values may be +retrieved from it by the C<recv> method. Values inserted into the Channel are +snapshot by the C<send> method. Any changes to referred variables will not be +observed by the other end of the Channel after the C<send> method returns. + +=head1 PARAMETERS + +The following named parameters may be passed to C<new> or C<configure>: + +=head2 codec => STR + +Gives the name of the encoding method used to represent values over the +channel. + +By default this will be C<Storable>, to use the core L<Storable> module. As +this only supports references, to pass a single scalar value, C<send> a SCALAR +reference to it, and dereference the result of C<recv>. + +If the L<Sereal::Encoder> and L<Sereal::Decoder> modules are installed, this +can be set to C<Sereal> instead, and will use those to perform the encoding +and decoding. This optional dependency may give higher performance than using +C<Storable>. + +=cut + +=head1 CONSTRUCTOR + +=cut + +=head2 $channel = IO::Async::Channel->new + +Returns a new C<IO::Async::Channel> object. This object reference itself +should be shared by both sides of a C<fork()>ed process. After C<fork()> the +two C<setup_*> methods may be used to configure the object for operation on +either end. + +While this object does in fact inherit from L<IO::Async::Notifier>, it should +not be added to a Loop object directly; event management will be handled by +its containing C<IO::Async::Routine> object. + +=cut + +=head1 METHODS + +The following methods documented with a trailing call to C<< ->get >> return +L<Future> instances. + +=cut + +=head2 $channel->configure( %params ) + +Similar to the standard C<configure> method on C<IO::Async::Notifier>, this is +used to change details of the Channel's operation. + +=over 4 + +=item on_recv => CODE + +May only be set on an async mode channel. If present, will be invoked whenever +a new value is received, rather than using the C<recv> method. + + $on_recv->( $channel, $data ) + +=item on_eof => CODE + +May only be set on an async mode channel. If present, will be invoked when the +channel gets closed by the peer. + + $on_eof->( $channel ) + +=back + +=cut + +sub _init +{ + my $self = shift; + my ( $params ) = @_; + + $params->{codec} //= "Storable"; + + $self->SUPER::_init( $params ); +} + +sub configure +{ + my $self = shift; + my %params = @_; + + foreach (qw( on_recv on_eof )) { + next unless exists $params{$_}; + $self->{mode} and $self->{mode} eq "async" or + croak "Can only configure $_ in async mode"; + + $self->{$_} = delete $params{$_}; + $self->_build_stream; + } + + if( my $codec = delete $params{codec} ) { + ( $self->can( "_make_codec_$codec" ) or croak "Unrecognised codec name '$codec'" ) + ->( $self ); + } + + $self->SUPER::configure( %params ); +} + +sub _make_codec_Storable +{ + my $self = shift; + + require Storable; + + $self->{encode} = \&Storable::freeze; + $self->{decode} = \&Storable::thaw; +} + +sub _make_codec_Sereal +{ + my $self = shift; + + require Sereal::Encoder; + require Sereal::Decoder; + + my $encoder = Sereal::Encoder->new; + $self->{encode} = sub { $encoder->encode( $_[0] ) }; + + my $decoder = Sereal::Decoder->new; + $self->{decode} = sub { $decoder->decode( $_[0] ) }; +} + +=head2 $channel->send( $data ) + +Pushes the data stored in the given Perl reference into the FIFO of the +Channel, where it can be received by the other end. When called on a +synchronous mode Channel this method may block if a C<write()> call on the +underlying filehandle blocks. When called on an asynchronous mode channel this +method will not block. + +=cut + +sub send +{ + my $self = shift; + my ( $data ) = @_; + + $self->send_frozen( $self->{encode}->( $data ) ); +} + +=head2 $channel->send_frozen( $record ) + +A variant of the C<send> method; this method pushes the byte record given. +This should be the result of a call to C<Storable::freeze()>. + +=cut + +sub send_frozen +{ + my $self = shift; + my ( $record ) = @_; + + my $bytes = pack( "I", length $record ) . $record; + + defined $self->{mode} or die "Cannot ->send without being set up"; + + return $self->_send_sync( $bytes ) if $self->{mode} eq "sync"; + return $self->_send_async( $bytes ) if $self->{mode} eq "async"; +} + +=head2 $data = $channel->recv + +When called on a synchronous mode Channel this method will block until a Perl +reference value is available from the other end and then return it. If the +Channel is closed this method will return C<undef>. Since only references may +be passed and all Perl references are true the truth of the result of this +method can be used to detect that the channel is still open and has not yet +been closed. + +=head2 $data = $channel->recv->get + +When called on an asynchronous mode Channel this method returns a future which +will eventually yield the next Perl reference value that becomes available +from the other end. If the Channel is closed, the future will fail with an +C<eof> failure. + +=head2 $channel->recv( %args ) + +When not returning a future, takes the following named arguments: + +=over 8 + +=item on_recv => CODE + +Called when a new Perl reference value is available. Will be passed the +Channel object and the reference data. + + $on_recv->( $channel, $data ) + +=item on_eof => CODE + +Called if the Channel was closed before a new value was ready. Will be passed +the Channel object. + + $on_eof->( $channel ) + +=back + +=cut + +sub recv +{ + my $self = shift; + + defined $self->{mode} or die "Cannot ->recv without being set up"; + + return $self->_recv_sync( @_ ) if $self->{mode} eq "sync"; + return $self->_recv_async( @_ ) if $self->{mode} eq "async"; +} + +=head2 $channel->close + +Closes the channel. Causes a pending C<recv> on the other end to return undef +or the queued C<on_eof> callbacks to be invoked. + +=cut + +sub close +{ + my $self = shift; + + return $self->_close_sync if $self->{mode} eq "sync"; + return $self->_close_async if $self->{mode} eq "async"; +} + +# Leave this undocumented for now +sub setup_sync_mode +{ + my $self = shift; + ( $self->{fh} ) = @_; + + $self->{mode} = "sync"; + + # Since we're communicating binary structures and not Unicode text we need to + # enable binmode + binmode $self->{fh}; + + $self->{fh}->autoflush(1); +} + +sub _read_exactly +{ + $_[1] = ""; + + while( length $_[1] < $_[2] ) { + my $n = read( $_[0], $_[1], $_[2]-length $_[1], length $_[1] ); + defined $n or return undef; + $n or return ""; + } + + return $_[2]; +} + +sub _recv_sync +{ + my $self = shift; + + my $n = _read_exactly( $self->{fh}, my $lenbuffer, 4 ); + defined $n or die "Cannot read - $!"; + length $n or return undef; + + my $len = unpack( "I", $lenbuffer ); + + $n = _read_exactly( $self->{fh}, my $record, $len ); + defined $n or die "Cannot read - $!"; + length $n or return undef; + + return $self->{decode}->( $record ); +} + +sub _send_sync +{ + my $self = shift; + my ( $bytes ) = @_; + $self->{fh}->print( $bytes ); +} + +sub _close_sync +{ + my $self = shift; + $self->{fh}->close; +} + +# Leave this undocumented for now +sub setup_async_mode +{ + my $self = shift; + my %args = @_; + + exists $args{$_} and $self->{$_} = delete $args{$_} for qw( read_handle write_handle ); + + keys %args and croak "Unrecognised keys for setup_async_mode: " . join( ", ", keys %args ); + + $self->{mode} = "async"; +} + +sub _build_stream +{ + my $self = shift; + return $self->{stream} ||= do { + $self->{on_result_queue} = []; + + my $stream = IO::Async::Stream->new( + read_handle => $self->{read_handle}, + write_handle => $self->{write_handle}, + autoflush => 1, + on_read => $self->_capture_weakself( '_on_stream_read' ) + ); + + $self->add_child( $stream ); + + $stream; + }; +} + +sub _send_async +{ + my $self = shift; + my ( $bytes ) = @_; + $self->_build_stream->write( $bytes ); +} + +sub _recv_async +{ + my $self = shift; + my %args = @_; + + my $on_recv = $args{on_recv}; + my $on_eof = $args{on_eof}; + + my $stream = $self->_build_stream; + + my $f; + $f = $stream->loop->new_future unless !defined wantarray; + + push @{ $self->{on_result_queue} }, sub { + my ( $self, $type, $result ) = @_; + if( $type eq "recv" ) { + $f->done( $result ) if $f and !$f->is_cancelled; + $on_recv->( $self, $result ) if $on_recv; + } + else { + $f->fail( "EOF waiting for Channel recv", eof => ) if $f and !$f->is_cancelled; + $on_eof->( $self ) if $on_eof; + } + }; + + return $f; +} + +sub _close_async +{ + my $self = shift; + if( my $stream = $self->{stream} ) { + $stream->close_when_empty; + } + else { + $_ and $_->close for $self->{read_handle}, $self->{write_handle}; + } + + undef $_ for $self->{read_handle}, $self->{write_handle}; +} + +sub _on_stream_read +{ + my $self = shift or return; + my ( $stream, $buffref, $eof ) = @_; + + if( $eof ) { + while( my $on_result = shift @{ $self->{on_result_queue} } ) { + $on_result->( $self, eof => ); + } + $self->{on_eof}->( $self ) if $self->{on_eof}; + return; + } + + return 0 unless length( $$buffref ) >= 4; + my $len = unpack( "I", $$buffref ); + return 0 unless length( $$buffref ) >= 4 + $len; + + my $record = $self->{decode}->( substr( $$buffref, 4, $len ) ); + substr( $$buffref, 0, 4 + $len ) = ""; + + if( my $on_result = shift @{ $self->{on_result_queue} } ) { + $on_result->( $self, recv => $record ); + } + else { + $self->{on_recv}->( $self, $record ); + } + + return 1; +} + +sub _extract_read_handle +{ + my $self = shift; + + return undef if !$self->{mode}; + + croak "Cannot extract filehandle" if $self->{mode} ne "async"; + $self->{mode} = "dead"; + + return $self->{read_handle}; +} + +sub _extract_write_handle +{ + my $self = shift; + + return undef if !$self->{mode}; + + croak "Cannot extract filehandle" if $self->{mode} ne "async"; + $self->{mode} = "dead"; + + return $self->{write_handle}; +} + +=head1 AUTHOR + +Paul Evans <leonerd@leonerd.org.uk> + +=cut + +0x55AA; |