summaryrefslogtreecommitdiff
path: root/t/28filestream.t
diff options
context:
space:
mode:
Diffstat (limited to 't/28filestream.t')
-rw-r--r--t/28filestream.t323
1 files changed, 323 insertions, 0 deletions
diff --git a/t/28filestream.t b/t/28filestream.t
new file mode 100644
index 0000000..f51887e
--- /dev/null
+++ b/t/28filestream.t
@@ -0,0 +1,323 @@
+#!/usr/bin/perl
+
+use strict;
+use warnings;
+
+use IO::Async::Test;
+
+use Test::More;
+use Test::Fatal;
+use Test::Refcount;
+
+use Fcntl qw( SEEK_SET SEEK_END );
+use File::Temp qw( tempfile );
+
+use IO::Async::Loop;
+
+use IO::Async::OS;
+
+use IO::Async::FileStream;
+
+use constant AUT => $ENV{TEST_QUICK_TIMERS} ? 0.1 : 1;
+
+my $loop = IO::Async::Loop->new_builtin;
+
+testing_loop( $loop );
+
+sub mkhandles
+{
+ my ( $rd, $filename ) = tempfile( "tmpfile.XXXXXX", UNLINK => 1 );
+ open my $wr, ">", $filename or die "Cannot reopen file for writing - $!";
+
+ $wr->autoflush( 1 );
+
+ return ( $rd, $wr, $filename );
+}
+
+{
+ my ( $rd, $wr ) = mkhandles;
+
+ my @lines;
+ my $initial_size;
+
+ my $filestream = IO::Async::FileStream->new(
+ interval => 0.1 * AUT,
+ read_handle => $rd,
+ on_read => sub {
+ my $self = shift;
+ my ( $buffref, $eof ) = @_;
+
+ push @lines, $1 while $$buffref =~ s/^(.*\n)//;
+ return 0;
+ },
+ on_initial => sub { ( undef, $initial_size ) = @_ },
+ );
+
+ ok( defined $filestream, '$filestream defined' );
+ isa_ok( $filestream, "IO::Async::FileStream", '$filestream isa IO::Async::FileStream' );
+
+ is_oneref( $filestream, 'reading $filestream has refcount 1 initially' );
+
+ $loop->add( $filestream );
+
+ is_refcount( $filestream, 2, '$filestream has refcount 2 after adding to Loop' );
+
+ is( $initial_size, 0, '$initial_size is 0' );
+
+ $wr->syswrite( "message\n" );
+
+ is_deeply( \@lines, [], '@lines before wait' );
+
+ wait_for { scalar @lines };
+
+ is_deeply( \@lines, [ "message\n" ], '@lines after wait' );
+
+ $loop->remove( $filestream );
+}
+
+# on_initial
+{
+ my ( $rd, $wr ) = mkhandles;
+
+ $wr->syswrite( "Some initial content\n" );
+
+ my @lines;
+ my $initial_size;
+
+ my $filestream = IO::Async::FileStream->new(
+ interval => 0.1 * AUT,
+ read_handle => $rd,
+ on_read => sub {
+ my $self = shift;
+ my ( $buffref, $eof ) = @_;
+
+ push @lines, $1 while $$buffref =~ s/^(.*\n)//;
+ return 0;
+ },
+ on_initial => sub { ( undef, $initial_size ) = @_ },
+ );
+
+ $loop->add( $filestream );
+
+ is( $initial_size, 21, '$initial_size is 21' );
+
+ $wr->syswrite( "More content\n" );
+
+ wait_for { scalar @lines };
+
+ is_deeply( \@lines, [ "Some initial content\n", "More content\n" ], 'All content is visible' );
+
+ $loop->remove( $filestream );
+}
+
+# seek_to_last
+{
+ my ( $rd, $wr ) = mkhandles;
+
+ $wr->syswrite( "Some skipped content\nWith a partial line" );
+
+ my @lines;
+
+ my $filestream = IO::Async::FileStream->new(
+ interval => 0.1 * AUT,
+ read_handle => $rd,
+ on_read => sub {
+ my $self = shift;
+ my ( $buffref, $eof ) = @_;
+
+ return 0 unless( $$buffref =~ s/^(.*\n)// );
+
+ push @lines, $1;
+ return 1;
+ },
+ on_initial => sub {
+ my $self = shift;
+ # Give it a tiny block size, forcing it to have to seek harder to find the \n
+ ok( $self->seek_to_last( "\n", blocksize => 8 ), 'FileStream successfully seeks to last \n' );
+ },
+ );
+
+ $loop->add( $filestream );
+
+ $wr->syswrite( " finished here\n" );
+
+ wait_for { scalar @lines };
+
+ is_deeply( \@lines, [ "With a partial line finished here\n" ], 'Partial line completely returned' );
+
+ $loop->remove( $filestream );
+}
+
+# on_initial can skip content
+{
+ my ( $rd, $wr ) = mkhandles;
+
+ $wr->syswrite( "Some skipped content\n" );
+
+ my @lines;
+
+ my $filestream = IO::Async::FileStream->new(
+ interval => 0.1 * AUT,
+ read_handle => $rd,
+ on_read => sub {
+ my $self = shift;
+ my ( $buffref, $eof ) = @_;
+
+ return 0 unless( $$buffref =~ s/^(.*\n)// );
+
+ push @lines, $1;
+ return 1;
+ },
+ on_initial => sub { my $self = shift; $self->seek( 0, SEEK_END ); },
+ );
+
+ $loop->add( $filestream );
+
+ $wr->syswrite( "Additional content\n" );
+
+ wait_for { scalar @lines };
+
+ is_deeply( \@lines, [ "Additional content\n" ], 'Initial content is skipped' );
+
+ $loop->remove( $filestream );
+}
+
+# Truncation
+{
+ my ( $rd, $wr ) = mkhandles;
+
+ my @lines;
+ my $truncated;
+
+ my $filestream = IO::Async::FileStream->new(
+ interval => 0.1 * AUT,
+ read_handle => $rd,
+ on_read => sub {
+ my $self = shift;
+ my ( $buffref, $eof ) = @_;
+
+ return 0 unless( $$buffref =~ s/^(.*\n)// );
+
+ push @lines, $1;
+ return 1;
+ },
+ on_truncated => sub { $truncated++ },
+ );
+
+ $loop->add( $filestream );
+
+ $wr->syswrite( "Some original lines\nin the file\n" );
+
+ wait_for { scalar @lines };
+
+ $wr->truncate( 0 );
+ sysseek( $wr, 0, SEEK_SET );
+ $wr->syswrite( "And another\n" );
+
+ wait_for { @lines == 3 };
+
+ is( $truncated, 1, 'File content truncation detected' );
+ is_deeply( \@lines,
+ [ "Some original lines\n", "in the file\n", "And another\n" ],
+ 'All three lines read' );
+
+ $loop->remove( $filestream );
+}
+
+# Follow by name
+SKIP: {
+ skip "OS is unable to rename open files", 7 unless IO::Async::OS->HAVE_RENAME_OPEN_FILES;
+
+ my ( undef, $wr, $filename ) = mkhandles;
+
+ my @lines;
+
+ my $filestream = IO::Async::FileStream->new(
+ interval => 0.1 * AUT,
+ filename => $filename,
+ on_read => sub {
+ my $self = shift;
+ my ( $buffref, $eof ) = @_;
+
+ push @lines, $1 while $$buffref =~ s/^(.*\n)//;
+ return 0;
+ },
+ );
+
+ ok( defined $filestream, '$filestream defined for filenaem' );
+ isa_ok( $filestream, "IO::Async::FileStream", '$filestream isa IO::Async::FileStream' );
+
+ is_oneref( $filestream, 'reading $filestream has refcount 1 initially' );
+
+ $loop->add( $filestream );
+
+ is_refcount( $filestream, 2, '$filestream has refcount 2 after adding to Loop' );
+
+ $wr->syswrite( "message\n" );
+ wait_for { scalar @lines };
+
+ is_deeply( \@lines, [ "message\n" ], '@lines after wait' );
+ shift @lines;
+
+ $wr->syswrite( "last line of old file\n" );
+ close $wr;
+ rename( $filename, "$filename.old" ) or die "Cannot rename $filename - $!";
+ END { defined $filename and -f $filename and unlink $filename }
+ END { defined $filename and -f "$filename.old" and unlink "$filename.old" }
+ open $wr, ">", $filename or die "Cannot reopen $filename for writing - $!";
+ $wr->syswrite( "first line of new file\n" );
+
+ wait_for { scalar @lines };
+ is_deeply( $lines[0], "last line of old file\n", '@lines sees last line of old file' );
+ wait_for { scalar @lines >= 2 };
+ is_deeply( $lines[1], "first line of new file\n", '@lines sees first line of new file' );
+
+ $loop->remove( $filestream );
+}
+
+# Subclass
+my @sub_lines;
+
+{
+ my ( $rd, $wr ) = mkhandles;
+
+ my $filestream = TestStream->new(
+ interval => 0.1 * AUT,
+ read_handle => $rd,
+ );
+
+ ok( defined $filestream, 'subclass $filestream defined' );
+ isa_ok( $filestream, "IO::Async::FileStream", '$filestream isa IO::Async::FileStream' );
+
+ is_oneref( $filestream, 'subclass $filestream has refcount 1 initially' );
+
+ $loop->add( $filestream );
+
+ is_refcount( $filestream, 2, 'subclass $filestream has refcount 2 after adding to Loop' );
+
+ $wr->syswrite( "message\n" );
+
+ is_deeply( \@sub_lines, [], '@sub_lines before wait' );
+
+ wait_for { scalar @sub_lines };
+
+ is_deeply( \@sub_lines, [ "message\n" ], '@sub_lines after wait' );
+
+ $loop->remove( $filestream );
+}
+
+done_testing;
+
+package TestStream;
+use base qw( IO::Async::FileStream );
+
+sub on_read
+{
+ my $self = shift;
+ my ( $buffref ) = @_;
+
+ return 0 unless $$buffref =~ s/^(.*\n)//;
+
+ push @sub_lines, $1;
+ return 1;
+}