diff options
Diffstat (limited to 'lib/IO/Async/FileStream.pm')
-rw-r--r-- | lib/IO/Async/FileStream.pm | 413 |
1 files changed, 413 insertions, 0 deletions
diff --git a/lib/IO/Async/FileStream.pm b/lib/IO/Async/FileStream.pm new file mode 100644 index 0000000..96778c9 --- /dev/null +++ b/lib/IO/Async/FileStream.pm @@ -0,0 +1,413 @@ +# You may distribute under the terms of either the GNU General Public License +# or the Artistic License (the same terms as Perl itself) +# +# (C) Paul Evans, 2011-2012 -- leonerd@leonerd.org.uk + +package IO::Async::FileStream; + +use strict; +use warnings; + +our $VERSION = '0.67'; + +use base qw( IO::Async::Stream ); + +use IO::Async::File; + +use Carp; +use Fcntl qw( SEEK_SET SEEK_CUR ); + +=head1 NAME + +C<IO::Async::FileStream> - read the tail of a file + +=head1 SYNOPSIS + + use IO::Async::FileStream; + + use IO::Async::Loop; + my $loop = IO::Async::Loop->new; + + open my $logh, "<", "var/logs/daemon.log" or + die "Cannot open logfile - $!"; + + my $filestream = IO::Async::FileStream->new( + read_handle => $logh, + + on_initial => sub { + my ( $self ) = @_; + $self->seek_to_last( "\n" ); + }, + + on_read => sub { + my ( $self, $buffref ) = @_; + + while( $$buffref =~ s/^(.*\n)// ) { + print "Received a line $1"; + } + + return 0; + }, + ); + + $loop->add( $filestream ); + + $loop->run; + +=head1 DESCRIPTION + +This subclass of L<IO::Async::Stream> allows reading the end of a regular file +which is being appended to by some other process. It invokes the C<on_read> +event when more data has been added to the file. + +This class provides an API identical to C<IO::Async::Stream> when given a +C<read_handle>; it should be treated similarly. In particular, it can be given +an C<on_read> handler, or subclassed to provide an C<on_read> method, or even +used as the C<transport> for an C<IO::Async::Protocol::Stream> object. + +It will not support writing. + +To watch a file, directory, or other filesystem entity for updates of other +properties, such as C<mtime>, see also L<IO::Async::File>. + +=cut + +=head1 EVENTS + +The following events are invoked, either using subclass methods or CODE +references in parameters. + +Because this is a subclass of L<IO::Async::Stream> in read-only mode, all the +events supported by C<Stream> relating to the read handle are supported here. +This is not a full list; see also the documentation relating to +C<IO::Async::Stream>. + +=head2 $ret = on_read \$buffer, $eof + +Invoked when more data is available in the internal receiving buffer. + +Note that C<$eof> only indicates that all the data currently available in the +file has now been read; in contrast to a regular C<IO::Async::Stream>, this +object will not stop watching after this condition. Instead, it will continue +watching the file for updates. + +=head2 on_truncated + +Invoked when the file size shrinks. If this happens, it is presumed that the +file content has been replaced. Reading will then commence from the start of +the file. + +=head2 on_initial $size + +Invoked the first time the file is looked at. It is passed the initial size of +the file. The code implementing this method can use the C<seek> or +C<seek_to_last> methods to set the initial read position in the file to skip +over some initial content. + +This method may be useful to skip initial content in the file, if the object +should only respond to new content added after it was created. + +=cut + +sub _init +{ + my $self = shift; + my ( $params ) = @_; + + $self->SUPER::_init( $params ); + + $params->{close_on_read_eof} = 0; + + $self->{last_size} = undef; + + $self->add_child( $self->{file} = IO::Async::File->new( + on_devino_changed => $self->_replace_weakself( 'on_devino_changed' ), + on_size_changed => $self->_replace_weakself( 'on_size_changed' ), + ) ); +} + +=head1 PARAMETERS + +The following named parameters may be passed to C<new> or C<configure>, in +addition to the parameters relating to reading supported by +C<IO::Async::Stream>. + +=head2 filename => STRING + +Optional. If supplied, watches the named file rather than the filehandle given +in C<read_handle>. The file will be opened by the constructor, and then +watched for renames. If the file is renamed, the new filename is opened and +tracked similarly after closing the previous file. + +=head2 interval => NUM + +Optional. The interval in seconds to poll the filehandle using C<stat(2)> +looking for size changes. A default of 2 seconds will be applied if not +defined. + +=cut + +sub configure +{ + my $self = shift; + my %params = @_; + + foreach (qw( on_truncated on_initial )) { + $self->{$_} = delete $params{$_} if exists $params{$_}; + } + + foreach (qw( interval )) { + $self->{file}->configure( $_ => delete $params{$_} ) if exists $params{$_}; + } + if( exists $params{filename} ) { + $self->{file}->configure( filename => delete $params{filename} ); + $params{read_handle} = $self->{file}->handle; + } + elsif( exists $params{handle} or exists $params{read_handle} ) { + my $handle = delete $params{handle} // delete $params{read_handle}; + $self->{file}->configure( handle => $handle ); + $params{read_handle} = $self->{file}->handle; + } + + croak "Cannot have a write_handle in a ".ref($self) if defined $params{write_handle}; + + $self->SUPER::configure( %params ); + + if( $self->read_handle and !defined $self->{last_size} ) { + my $size = (stat $self->read_handle)[7]; + + $self->{last_size} = $size; + + local $self->{running_initial} = 1; + $self->maybe_invoke_event( on_initial => $size ); + } +} + +=head1 METHODS + +=cut + +# Replace IO::Async::Handle's implementation +sub _watch_read +{ + my $self = shift; + my ( $want ) = @_; + + if( $want ) { + $self->{file}->start if !$self->{file}->is_running; + } + else { + $self->{file}->stop; + } +} + +sub _watch_write +{ + my $self = shift; + my ( $want ) = @_; + + croak "Cannot _watch_write in " . ref($self) if $want; +} + +sub on_devino_changed +{ + my $self = shift or return; + + $self->{renamed} = 1; + $self->debug_printf( "read tail of old file" ); + $self->read_more; +} + +sub on_size_changed +{ + my $self = shift or return; + my ( $size ) = @_; + + if( $size < $self->{last_size} ) { + $self->maybe_invoke_event( on_truncated => ); + $self->{last_pos} = 0; + } + + $self->{last_size} = $size; + + $self->debug_printf( "read_more" ); + $self->read_more; +} + +sub read_more +{ + my $self = shift; + + sysseek( $self->read_handle, $self->{last_pos}, SEEK_SET ) if defined $self->{last_pos}; + + $self->on_read_ready; + + $self->{last_pos} = sysseek( $self->read_handle, 0, SEEK_CUR ); # == systell + + if( $self->{last_pos} < $self->{last_size} ) { + $self->loop->later( sub { $self->read_more } ); + } + elsif( $self->{renamed} ) { + $self->debug_printf( "reopening for rename" ); + + $self->{last_size} = 0; + + if( $self->{last_pos} ) { + $self->maybe_invoke_event( on_truncated => ); + $self->{last_pos} = 0; + $self->loop->later( sub { $self->read_more } ); + } + + $self->configure( read_handle => $self->{file}->handle ); + undef $self->{renamed}; + } +} + +sub write +{ + carp "Cannot ->write from a ".ref($_[0]); +} + +=head2 $filestream->seek( $offset, $whence ) + +Callable only during the C<on_initial> event. Moves the read position in the +filehandle to the given offset. C<$whence> is interpreted as for C<sysseek>, +should be either C<SEEK_SET>, C<SEEK_CUR> or C<SEEK_END>. Will be set to +C<SEEK_SET> if not provided. + +Normally this would be used to seek to the end of the file, for example + + on_initial => sub { + my ( $self, $filesize ) = @_; + $self->seek( $filesize ); + } + +=cut + +sub seek +{ + my $self = shift; + my ( $offset, $whence ) = @_; + + $self->{running_initial} or croak "Cannot ->seek except during on_initial"; + + defined $whence or $whence = SEEK_SET; + + sysseek( $self->read_handle, $offset, $whence ); +} + +=head2 $success = $filestream->seek_to_last( $str_pattern, %opts ) + +Callable only during the C<on_initial> event. Attempts to move the read +position in the filehandle to just after the last occurance of a given match. +C<$str_pattern> may be a literal string or regexp pattern. + +Returns a true value if the seek was successful, or false if not. Takes the +following named arguments: + +=over 8 + +=item blocksize => INT + +Optional. Read the file in blocks of this size. Will take a default of 8KiB if +not defined. + +=item horizon => INT + +Optional. Give up looking for a match after this number of bytes. Will take a +default value of 4 times the blocksize if not defined. + +To force it to always search through the entire file contents, set this +explicitly to C<0>. + +=back + +Because regular file reading happens synchronously, this entire method +operates entirely synchronously. If the file is very large, it may take a +while to read back through the entire contents. While this is happening no +other events can be invoked in the process. + +When looking for a string or regexp match, this method appends the +previously-read buffer to each block read from the file, in case a match +becomes split across two reads. If C<blocksize> is reduced to a very small +value, take care to ensure it isn't so small that a match may not be noticed. + +This is most likely useful for seeking after the last complete line in a +line-based log file, to commence reading from the end, while still managing to +capture any partial content that isn't yet a complete line. + + on_initial => sub { + my $self = shift; + $self->seek_to_last( "\n" ); + } + +=cut + +sub seek_to_last +{ + my $self = shift; + my ( $str_pattern, %opts ) = @_; + + $self->{running_initial} or croak "Cannot ->seek_to_last except during on_initial"; + + my $offset = $self->{last_size}; + + my $blocksize = $opts{blocksize} || 8192; + + defined $opts{horizon} or $opts{horizon} = $blocksize * 4; + my $horizon = $opts{horizon} ? $offset - $opts{horizon} : 0; + $horizon = 0 if $horizon < 0; + + my $re = ref $str_pattern ? $str_pattern : qr/\Q$str_pattern\E/; + + my $prev = ""; + while( $offset > $horizon ) { + my $len = $blocksize; + $len = $offset if $len > $offset; + $offset -= $len; + + sysseek( $self->read_handle, $offset, SEEK_SET ); + sysread( $self->read_handle, my $buffer, $blocksize ); + + # TODO: If $str_pattern is a plain string this could be more efficient + # using rindex + if( () = ( $buffer . $prev ) =~ m/$re/sg ) { + # $+[0] will be end of last match + my $pos = $offset + $+[0]; + $self->seek( $pos ); + return 1; + } + + $prev = $buffer; + } + + $self->seek( $horizon ); + return 0; +} + +=head1 TODO + +=over 4 + +=item * + +Move the actual file update watching code into C<IO::Async::Loop>, possibly as +a new watch/unwatch method pair C<watch_file>. + +=item * + +Consider if a construction-time parameter of C<seek_to_end> or C<seek_to_last> +might be neater than a small code block in C<on_initial>, if that turns out to +be the only or most common form of use. + +=back + +=cut + +=head1 AUTHOR + +Paul Evans <leonerd@leonerd.org.uk> + +=cut + +0x55AA; |