diff options
Diffstat (limited to 't/28filestream.t')
-rw-r--r-- | t/28filestream.t | 323 |
1 files changed, 323 insertions, 0 deletions
diff --git a/t/28filestream.t b/t/28filestream.t new file mode 100644 index 0000000..f51887e --- /dev/null +++ b/t/28filestream.t @@ -0,0 +1,323 @@ +#!/usr/bin/perl + +use strict; +use warnings; + +use IO::Async::Test; + +use Test::More; +use Test::Fatal; +use Test::Refcount; + +use Fcntl qw( SEEK_SET SEEK_END ); +use File::Temp qw( tempfile ); + +use IO::Async::Loop; + +use IO::Async::OS; + +use IO::Async::FileStream; + +use constant AUT => $ENV{TEST_QUICK_TIMERS} ? 0.1 : 1; + +my $loop = IO::Async::Loop->new_builtin; + +testing_loop( $loop ); + +sub mkhandles +{ + my ( $rd, $filename ) = tempfile( "tmpfile.XXXXXX", UNLINK => 1 ); + open my $wr, ">", $filename or die "Cannot reopen file for writing - $!"; + + $wr->autoflush( 1 ); + + return ( $rd, $wr, $filename ); +} + +{ + my ( $rd, $wr ) = mkhandles; + + my @lines; + my $initial_size; + + my $filestream = IO::Async::FileStream->new( + interval => 0.1 * AUT, + read_handle => $rd, + on_read => sub { + my $self = shift; + my ( $buffref, $eof ) = @_; + + push @lines, $1 while $$buffref =~ s/^(.*\n)//; + return 0; + }, + on_initial => sub { ( undef, $initial_size ) = @_ }, + ); + + ok( defined $filestream, '$filestream defined' ); + isa_ok( $filestream, "IO::Async::FileStream", '$filestream isa IO::Async::FileStream' ); + + is_oneref( $filestream, 'reading $filestream has refcount 1 initially' ); + + $loop->add( $filestream ); + + is_refcount( $filestream, 2, '$filestream has refcount 2 after adding to Loop' ); + + is( $initial_size, 0, '$initial_size is 0' ); + + $wr->syswrite( "message\n" ); + + is_deeply( \@lines, [], '@lines before wait' ); + + wait_for { scalar @lines }; + + is_deeply( \@lines, [ "message\n" ], '@lines after wait' ); + + $loop->remove( $filestream ); +} + +# on_initial +{ + my ( $rd, $wr ) = mkhandles; + + $wr->syswrite( "Some initial content\n" ); + + my @lines; + my $initial_size; + + my $filestream = IO::Async::FileStream->new( + interval => 0.1 * AUT, + read_handle => $rd, + on_read => sub { + my $self = shift; + my ( $buffref, $eof ) = @_; + + push @lines, $1 while $$buffref =~ s/^(.*\n)//; + return 0; + }, + on_initial => sub { ( undef, $initial_size ) = @_ }, + ); + + $loop->add( $filestream ); + + is( $initial_size, 21, '$initial_size is 21' ); + + $wr->syswrite( "More content\n" ); + + wait_for { scalar @lines }; + + is_deeply( \@lines, [ "Some initial content\n", "More content\n" ], 'All content is visible' ); + + $loop->remove( $filestream ); +} + +# seek_to_last +{ + my ( $rd, $wr ) = mkhandles; + + $wr->syswrite( "Some skipped content\nWith a partial line" ); + + my @lines; + + my $filestream = IO::Async::FileStream->new( + interval => 0.1 * AUT, + read_handle => $rd, + on_read => sub { + my $self = shift; + my ( $buffref, $eof ) = @_; + + return 0 unless( $$buffref =~ s/^(.*\n)// ); + + push @lines, $1; + return 1; + }, + on_initial => sub { + my $self = shift; + # Give it a tiny block size, forcing it to have to seek harder to find the \n + ok( $self->seek_to_last( "\n", blocksize => 8 ), 'FileStream successfully seeks to last \n' ); + }, + ); + + $loop->add( $filestream ); + + $wr->syswrite( " finished here\n" ); + + wait_for { scalar @lines }; + + is_deeply( \@lines, [ "With a partial line finished here\n" ], 'Partial line completely returned' ); + + $loop->remove( $filestream ); +} + +# on_initial can skip content +{ + my ( $rd, $wr ) = mkhandles; + + $wr->syswrite( "Some skipped content\n" ); + + my @lines; + + my $filestream = IO::Async::FileStream->new( + interval => 0.1 * AUT, + read_handle => $rd, + on_read => sub { + my $self = shift; + my ( $buffref, $eof ) = @_; + + return 0 unless( $$buffref =~ s/^(.*\n)// ); + + push @lines, $1; + return 1; + }, + on_initial => sub { my $self = shift; $self->seek( 0, SEEK_END ); }, + ); + + $loop->add( $filestream ); + + $wr->syswrite( "Additional content\n" ); + + wait_for { scalar @lines }; + + is_deeply( \@lines, [ "Additional content\n" ], 'Initial content is skipped' ); + + $loop->remove( $filestream ); +} + +# Truncation +{ + my ( $rd, $wr ) = mkhandles; + + my @lines; + my $truncated; + + my $filestream = IO::Async::FileStream->new( + interval => 0.1 * AUT, + read_handle => $rd, + on_read => sub { + my $self = shift; + my ( $buffref, $eof ) = @_; + + return 0 unless( $$buffref =~ s/^(.*\n)// ); + + push @lines, $1; + return 1; + }, + on_truncated => sub { $truncated++ }, + ); + + $loop->add( $filestream ); + + $wr->syswrite( "Some original lines\nin the file\n" ); + + wait_for { scalar @lines }; + + $wr->truncate( 0 ); + sysseek( $wr, 0, SEEK_SET ); + $wr->syswrite( "And another\n" ); + + wait_for { @lines == 3 }; + + is( $truncated, 1, 'File content truncation detected' ); + is_deeply( \@lines, + [ "Some original lines\n", "in the file\n", "And another\n" ], + 'All three lines read' ); + + $loop->remove( $filestream ); +} + +# Follow by name +SKIP: { + skip "OS is unable to rename open files", 7 unless IO::Async::OS->HAVE_RENAME_OPEN_FILES; + + my ( undef, $wr, $filename ) = mkhandles; + + my @lines; + + my $filestream = IO::Async::FileStream->new( + interval => 0.1 * AUT, + filename => $filename, + on_read => sub { + my $self = shift; + my ( $buffref, $eof ) = @_; + + push @lines, $1 while $$buffref =~ s/^(.*\n)//; + return 0; + }, + ); + + ok( defined $filestream, '$filestream defined for filenaem' ); + isa_ok( $filestream, "IO::Async::FileStream", '$filestream isa IO::Async::FileStream' ); + + is_oneref( $filestream, 'reading $filestream has refcount 1 initially' ); + + $loop->add( $filestream ); + + is_refcount( $filestream, 2, '$filestream has refcount 2 after adding to Loop' ); + + $wr->syswrite( "message\n" ); + wait_for { scalar @lines }; + + is_deeply( \@lines, [ "message\n" ], '@lines after wait' ); + shift @lines; + + $wr->syswrite( "last line of old file\n" ); + close $wr; + rename( $filename, "$filename.old" ) or die "Cannot rename $filename - $!"; + END { defined $filename and -f $filename and unlink $filename } + END { defined $filename and -f "$filename.old" and unlink "$filename.old" } + open $wr, ">", $filename or die "Cannot reopen $filename for writing - $!"; + $wr->syswrite( "first line of new file\n" ); + + wait_for { scalar @lines }; + is_deeply( $lines[0], "last line of old file\n", '@lines sees last line of old file' ); + wait_for { scalar @lines >= 2 }; + is_deeply( $lines[1], "first line of new file\n", '@lines sees first line of new file' ); + + $loop->remove( $filestream ); +} + +# Subclass +my @sub_lines; + +{ + my ( $rd, $wr ) = mkhandles; + + my $filestream = TestStream->new( + interval => 0.1 * AUT, + read_handle => $rd, + ); + + ok( defined $filestream, 'subclass $filestream defined' ); + isa_ok( $filestream, "IO::Async::FileStream", '$filestream isa IO::Async::FileStream' ); + + is_oneref( $filestream, 'subclass $filestream has refcount 1 initially' ); + + $loop->add( $filestream ); + + is_refcount( $filestream, 2, 'subclass $filestream has refcount 2 after adding to Loop' ); + + $wr->syswrite( "message\n" ); + + is_deeply( \@sub_lines, [], '@sub_lines before wait' ); + + wait_for { scalar @sub_lines }; + + is_deeply( \@sub_lines, [ "message\n" ], '@sub_lines after wait' ); + + $loop->remove( $filestream ); +} + +done_testing; + +package TestStream; +use base qw( IO::Async::FileStream ); + +sub on_read +{ + my $self = shift; + my ( $buffref ) = @_; + + return 0 unless $$buffref =~ s/^(.*\n)//; + + push @sub_lines, $1; + return 1; +} |