diff options
Diffstat (limited to 'lib/IO/Async/Stream.pm')
-rw-r--r-- | lib/IO/Async/Stream.pm | 1419 |
1 files changed, 1419 insertions, 0 deletions
diff --git a/lib/IO/Async/Stream.pm b/lib/IO/Async/Stream.pm new file mode 100644 index 0000000..487eb38 --- /dev/null +++ b/lib/IO/Async/Stream.pm @@ -0,0 +1,1419 @@ +# You may distribute under the terms of either the GNU General Public License +# or the Artistic License (the same terms as Perl itself) +# +# (C) Paul Evans, 2006-2015 -- leonerd@leonerd.org.uk + +package IO::Async::Stream; + +use strict; +use warnings; +use 5.010; # // + +our $VERSION = '0.67'; + +use base qw( IO::Async::Handle ); + +use Errno qw( EAGAIN EWOULDBLOCK EINTR EPIPE ); + +use Carp; + +use Encode 2.11 qw( find_encoding STOP_AT_PARTIAL ); +use Scalar::Util qw( blessed ); + +use IO::Async::Debug; + +# Tuneable from outside +# Not yet documented +our $READLEN = 8192; +our $WRITELEN = 8192; + +use Struct::Dumb; + +# Element of the writequeue +struct Writer => [qw( data writelen on_write on_flush on_error watching )]; + +# Element of the readqueue +struct Reader => [qw( on_read future )]; + +# Bitfields in the want flags +use constant WANT_READ_FOR_READ => 0x01; +use constant WANT_READ_FOR_WRITE => 0x02; +use constant WANT_WRITE_FOR_READ => 0x04; +use constant WANT_WRITE_FOR_WRITE => 0x08; +use constant WANT_ANY_READ => WANT_READ_FOR_READ |WANT_READ_FOR_WRITE; +use constant WANT_ANY_WRITE => WANT_WRITE_FOR_READ|WANT_WRITE_FOR_WRITE; + +=head1 NAME + +C<IO::Async::Stream> - event callbacks and write bufering for a stream +filehandle + +=head1 SYNOPSIS + + use IO::Async::Stream; + + use IO::Async::Loop; + my $loop = IO::Async::Loop->new; + + my $stream = IO::Async::Stream->new( + read_handle => \*STDIN, + write_handle => \*STDOUT, + + on_read => sub { + my ( $self, $buffref, $eof ) = @_; + + while( $$buffref =~ s/^(.*\n)// ) { + print "Received a line $1"; + } + + if( $eof ) { + print "EOF; last partial line is $$buffref\n"; + } + + return 0; + } + ); + + $loop->add( $stream ); + + $stream->write( "An initial line here\n" ); + +=head1 DESCRIPTION + +This subclass of L<IO::Async::Handle> contains a filehandle that represents +a byte-stream. It provides buffering for both incoming and outgoing data. It +invokes the C<on_read> handler when new data is read from the filehandle. Data +may be written to the filehandle by calling the C<write> method. + +This class is suitable for any kind of filehandle that provides a +possibly-bidirectional reliable byte stream, such as a pipe, TTY, or +C<SOCK_STREAM> socket (such as TCP or a byte-oriented UNIX local socket). For +datagram or raw message-based sockets (such as UDP) see instead +L<IO::Async::Socket>. + +=cut + +=head1 EVENTS + +The following events are invoked, either using subclass methods or CODE +references in parameters: + +=head2 $ret = on_read \$buffer, $eof + +Invoked when more data is available in the internal receiving buffer. + +The first argument is a reference to a plain perl string. The code should +inspect and remove any data it likes, but is not required to remove all, or +indeed any of the data. Any data remaining in the buffer will be preserved for +the next call, the next time more data is received from the handle. + +In this way, it is easy to implement code that reads records of some form when +completed, but ignores partially-received records, until all the data is +present. If the handler is confident no more useful data remains, it should +return C<0>. If not, it should return C<1>, and the handler will be called +again. This makes it easy to implement code that handles multiple incoming +records at the same time. See the examples at the end of this documentation +for more detail. + +The second argument is a scalar indicating whether the stream has reported an +end-of-file (EOF) condition. A reference to the buffer is passed to the +handler in the usual way, so it may inspect data contained in it. Once the +handler returns a false value, it will not be called again, as the handle is +now at EOF and no more data can arrive. + +The C<on_read> code may also dynamically replace itself with a new callback +by returning a CODE reference instead of C<0> or C<1>. The original callback +or method that the object first started with may be restored by returning +C<undef>. Whenever the callback is changed in this way, the new code is called +again; even if the read buffer is currently empty. See the examples at the end +of this documentation for more detail. + +The C<push_on_read> method can be used to insert new, temporary handlers that +take precedence over the global C<on_read> handler. This event is only used if +there are no further pending handlers created by C<push_on_read>. + +=head2 on_read_eof + +Optional. Invoked when the read handle indicates an end-of-file (EOF) +condition. If there is any data in the buffer still to be processed, the +C<on_read> event will be invoked first, before this one. + +=head2 on_write_eof + +Optional. Invoked when the write handle indicates an end-of-file (EOF) +condition. Note that this condition can only be detected after a C<write> +syscall returns the C<EPIPE> error. If there is no data pending to be written +then it will not be detected yet. + +=head2 on_read_error $errno + +Optional. Invoked when the C<sysread> method on the read handle fails. + +=head2 on_write_error $errno + +Optional. Invoked when the C<syswrite> method on the write handle fails. + +The C<on_read_error> and C<on_write_error> handlers are passed the value of +C<$!> at the time the error occured. (The C<$!> variable itself, by its +nature, may have changed from the original error by the time this handler +runs so it should always use the value passed in). + +If an error occurs when the corresponding error callback is not supplied, and +there is not a handler for it, then the C<close> method is called instead. + +=head2 on_read_high_watermark $length + +=head2 on_read_low_watermark $length + +Optional. Invoked when the read buffer grows larger than the high watermark +or smaller than the low watermark respectively. These are edge-triggered +events; they will only be triggered once per crossing, not continuously while +the buffer remains above or below the given limit. + +If these event handlers are not defined, the default behaviour is to disable +read-ready notifications if the read buffer grows larger than the high +watermark (so as to avoid it growing arbitrarily if nothing is consuming it), +and re-enable notifications again once something has read enough to cause it to +drop. If these events are overridden, the overriding code will have to perform +this behaviour if required, by using + + $self->want_readready_for_read(...) + +=head2 on_outgoing_empty + +Optional. Invoked when the writing data buffer becomes empty. + +=head2 on_writeable_start + +=head2 on_writeable_stop + +Optional. These two events inform when the filehandle becomes writeable, and +when it stops being writeable. C<on_writeable_start> is invoked by the +C<on_write_ready> event if previously it was known to be not writeable. +C<on_writeable_stop> is invoked after a C<syswrite> operation fails with +C<EAGAIN> or C<EWOULDBLOCK>. These two events track the writeability state, +and ensure that only state change cause events to be invoked. A stream starts +off being presumed writeable, so the first of these events to be observed will +be C<on_writeable_stop>. + +=cut + +sub _init +{ + my $self = shift; + + $self->{writequeue} = []; # Queue of Writers + $self->{readqueue} = []; # Queue of Readers + $self->{writeable} = 1; # "innocent until proven guilty" (by means of EAGAIN) + $self->{readbuff} = ""; + + $self->{reader} = "_sysread"; + $self->{writer} = "_syswrite"; + + $self->{read_len} = $READLEN; + $self->{write_len} = $WRITELEN; + + $self->{want} = WANT_READ_FOR_READ; + + $self->{close_on_read_eof} = 1; +} + +=head1 PARAMETERS + +The following named parameters may be passed to C<new> or C<configure>: + +=head2 read_handle => IO + +The IO handle to read from. Must implement C<fileno> and C<sysread> methods. + +=head2 write_handle => IO + +The IO handle to write to. Must implement C<fileno> and C<syswrite> methods. + +=head2 handle => IO + +Shortcut to specifying the same IO handle for both of the above. + +=head2 on_read => CODE + +=head2 on_read_error => CODE + +=head2 on_outgoing_empty => CODE + +=head2 on_write_error => CODE + +=head2 on_writeable_start => CODE + +=head2 on_writeable_stop => CODE + +CODE references for event handlers. + +=head2 autoflush => BOOL + +Optional. If true, the C<write> method will attempt to write data to the +operating system immediately, without waiting for the loop to indicate the +filehandle is write-ready. This is useful, for example, on streams that should +contain up-to-date logging or console information. + +It currently defaults to false for any file handle, but future versions of +C<IO::Async> may enable this by default on STDOUT and STDERR. + +=head2 read_len => INT + +Optional. Sets the buffer size for C<read> calls. Defaults to 8 KiBytes. + +=head2 read_all => BOOL + +Optional. If true, attempt to read as much data from the kernel as possible +when the handle becomes readable. By default this is turned off, meaning at +most one fixed-size buffer is read. If there is still more data in the +kernel's buffer, the handle will still be readable, and will be read from +again. + +This behaviour allows multiple streams and sockets to be multiplexed +simultaneously, meaning that a large bulk transfer on one cannot starve other +filehandles of processing time. Turning this option on may improve bulk data +transfer rate, at the risk of delaying or stalling processing on other +filehandles. + +=head2 write_len => INT + +Optional. Sets the buffer size for C<write> calls. Defaults to 8 KiBytes. + +=head2 write_all => BOOL + +Optional. Analogous to the C<read_all> option, but for writing. When +C<autoflush> is enabled, this option only affects deferred writing if the +initial attempt failed due to buffer space. + +=head2 read_high_watermark => INT + +=head2 read_low_watermark => INT + +Optional. If defined, gives a way to implement flow control or other +behaviours that depend on the size of Stream's read buffer. + +If after more data is read from the underlying filehandle the read buffer is +now larger than the high watermark, the C<on_read_high_watermark> event is +triggered (which, by default, will disable read-ready notifications and pause +reading from the filehandle). + +If after data is consumed by an C<on_read> handler the read buffer is now +smaller than the low watermark, the C<on_read_low_watermark> event is +triggered (which, by default, will re-enable read-ready notifications and +resume reading from the filehandle). For to be possible, the read handler +would have to be one added by the C<push_on_read> method or one of the +Future-returning C<read_*> methods. + +By default these options are not defined, so this behaviour will not happen. +C<read_low_watermark> may not be set to a larger value than +C<read_high_watermark>, but it may be set to a smaller value, creating a +hysteresis region. If either option is defined then both must be. + +If these options are used with the default event handlers, be careful not to +cause deadlocks by having a high watermark sufficiently low that a single +C<on_read> invocation might not consider it finished yet. + +=head2 reader => STRING|CODE + +=head2 writer => STRING|CODE + +Optional. If defined, gives the name of a method or a CODE reference to use +to implement the actual reading from or writing to the filehandle. These will +be invoked as + + $stream->reader( $read_handle, $buffer, $len ) + $stream->writer( $write_handle, $buffer, $len ) + +Each is expected to modify the passed buffer; C<reader> by appending to it, +C<writer> by removing a prefix from it. Each is expected to return a true +value on success, zero on EOF, or C<undef> with C<$!> set for errors. If not +provided, they will be substituted by implenentations using C<sysread> and +C<syswrite> on the underlying handle, respectively. + +=head2 close_on_read_eof => BOOL + +Optional. Usually true, but if set to a false value then the stream will not +be C<close>d when an EOF condition occurs on read. This is normally not useful +as at that point the underlying stream filehandle is no longer useable, but it +may be useful for reading regular files, or interacting with TTY devices. + +=head2 encoding => STRING + +If supplied, sets the name of encoding of the underlying stream. If an +encoding is set, then the C<write> method will expect to receive Unicode +strings and encodes them into bytes, and incoming bytes will be decoded into +Unicode strings for the C<on_read> event. + +If an encoding is not supplied then C<write> and C<on_read> will work in byte +strings. + +I<IMPORTANT NOTE:> in order to handle reads of UTF-8 content or other +multibyte encodings, the code implementing the C<on_read> event uses a feature +of L<Encode>; the C<STOP_AT_PARTIAL> flag. While this flag has existed for a +while and is used by the C<:encoding> PerlIO layer itself for similar +purposes, the flag is not officially documented by the C<Encode> module. In +principle this undocumented feature could be subject to change, in practice I +believe it to be reasonably stable. + +This note applies only to the C<on_read> event; data written using the +C<write> method does not rely on any undocumented features of C<Encode>. + +If a read handle is given, it is required that either an C<on_read> callback +reference is configured, or that the object provides an C<on_read> method. It +is optional whether either is true for C<on_outgoing_empty>; if neither is +supplied then no action will be taken when the writing buffer becomes empty. + +An C<on_read> handler may be supplied even if no read handle is yet given, to +be used when a read handle is eventually provided by the C<set_handles> +method. + +This condition is checked at the time the object is added to a Loop; it is +allowed to create a C<IO::Async::Stream> object with a read handle but without +a C<on_read> handler, provided that one is later given using C<configure> +before the stream is added to its containing Loop, either directly or by being +a child of another Notifier already in a Loop, or added to one. + +=cut + +sub configure +{ + my $self = shift; + my %params = @_; + + for (qw( on_read on_outgoing_empty on_read_eof on_write_eof on_read_error + on_write_error on_writeable_start on_writeable_stop autoflush + read_len read_all write_len write_all on_read_high_watermark + on_read_low_watermark reader writer close_on_read_eof )) { + $self->{$_} = delete $params{$_} if exists $params{$_}; + } + + if( exists $params{read_high_watermark} or exists $params{read_low_watermark} ) { + my $high = delete $params{read_high_watermark} // $self->{read_high_watermark}; + my $low = delete $params{read_low_watermark} // $self->{read_low_watermark}; + + croak "Cannot set read_low_watermark without read_high_watermark" if defined $low and !defined $high; + croak "Cannot set read_high_watermark without read_low_watermark" if defined $high and !defined $low; + + croak "Cannot set read_low_watermark higher than read_high_watermark" if defined $low and defined $high and $low > $high; + + $self->{read_high_watermark} = $high; + $self->{read_low_watermark} = $low; + + # TODO: reassert levels if we've moved them + } + + if( exists $params{encoding} ) { + my $encoding = delete $params{encoding}; + my $obj = find_encoding( $encoding ); + defined $obj or croak "Cannot handle an encoding of '$encoding'"; + $self->{encoding} = $obj; + } + + $self->SUPER::configure( %params ); + + if( $self->loop and $self->read_handle ) { + $self->can_event( "on_read" ) or + croak 'Expected either an on_read callback or to be able to ->on_read'; + } +} + +sub _add_to_loop +{ + my $self = shift; + + if( defined $self->read_handle ) { + $self->can_event( "on_read" ) or + croak 'Expected either an on_read callback or to be able to ->on_read'; + } + + $self->SUPER::_add_to_loop( @_ ); + + if( !$self->_is_empty ) { + $self->want_writeready_for_write( 1 ); + } +} + +=head1 METHODS + +The following methods documented with a trailing call to C<< ->get >> return +L<Future> instances. + +=cut + +=head2 $stream->want_readready_for_read( $set ) + +=head2 $stream->want_readready_for_write( $set ) + +Mutators for the C<want_readready> property on L<IO::Async::Handle>, which +control whether the C<read> or C<write> behaviour should be continued once the +filehandle becomes ready for read. + +Normally, C<want_readready_for_read> is always true (though the read watermark +behaviour can modify it), and C<want_readready_for_write> is not used. +However, if a custom C<writer> function is provided, it may find this useful +for being invoked again if it cannot proceed with a write operation until the +filehandle becomes readable (such as during transport negotiation or SSL key +management, for example). + +=cut + +sub want_readready_for_read +{ + my $self = shift; + my ( $set ) = @_; + $set ? ( $self->{want} |= WANT_READ_FOR_READ ) : ( $self->{want} &= ~WANT_READ_FOR_READ ); + + $self->want_readready( $self->{want} & WANT_ANY_READ ) if $self->read_handle; +} + +sub want_readready_for_write +{ + my $self = shift; + my ( $set ) = @_; + $set ? ( $self->{want} |= WANT_READ_FOR_WRITE ) : ( $self->{want} &= ~WANT_READ_FOR_WRITE ); + + $self->want_readready( $self->{want} & WANT_ANY_READ ) if $self->read_handle; +} + +=head2 $stream->want_writeready_for_write( $set ) + +=head2 $stream->want_writeready_for_read( $set ) + +Mutators for the C<want_writeready> property on L<IO::Async::Handle>, which +control whether the C<write> or C<read> behaviour should be continued once the +filehandle becomes ready for write. + +Normally, C<want_writeready_for_write> is managed by the C<write> method and +associated flushing, and C<want_writeready_for_read> is not used. However, if +a custom C<reader> function is provided, it may find this useful for being +invoked again if it cannot proceed with a read operation until the filehandle +becomes writable (such as during transport negotiation or SSL key management, +for example). + +=cut + +sub want_writeready_for_write +{ + my $self = shift; + my ( $set ) = @_; + $set ? ( $self->{want} |= WANT_WRITE_FOR_WRITE ) : ( $self->{want} &= ~WANT_WRITE_FOR_WRITE ); + + $self->want_writeready( $self->{want} & WANT_ANY_WRITE ) if $self->write_handle; +} + +sub want_writeready_for_read +{ + my $self = shift; + my ( $set ) = @_; + $set ? ( $self->{want} |= WANT_WRITE_FOR_READ ) : ( $self->{want} &= ~WANT_WRITE_FOR_READ ); + + $self->want_writeready( $self->{want} & WANT_ANY_WRITE ) if $self->write_handle; +} + +# FUNCTION not method +sub _nonfatal_error +{ + my ( $errno ) = @_; + + return $errno == EAGAIN || + $errno == EWOULDBLOCK || + $errno == EINTR; +} + +sub _is_empty +{ + my $self = shift; + return !@{ $self->{writequeue} }; +} + +=head2 $stream->close + +A synonym for C<close_when_empty>. This should not be used when the deferred +wait behaviour is required, as the behaviour of C<close> may change in a +future version of C<IO::Async>. Instead, call C<close_when_empty> directly. + +=cut + +sub close +{ + my $self = shift; + $self->close_when_empty; +} + +=head2 $stream->close_when_empty + +If the write buffer is empty, this method calls C<close> on the underlying IO +handles, and removes the stream from its containing loop. If the write buffer +still contains data, then this is deferred until the buffer is empty. This is +intended for "write-then-close" one-shot streams. + + $stream->write( "Here is my final data\n" ); + $stream->close_when_empty; + +Because of this deferred nature, it may not be suitable for error handling. +See instead the C<close_now> method. + +=cut + +sub close_when_empty +{ + my $self = shift; + + return $self->SUPER::close if $self->_is_empty; + + $self->{stream_closing} = 1; +} + +=head2 $stream->close_now + +This method immediately closes the underlying IO handles and removes the +stream from the containing loop. It will not wait to flush the remaining data +in the write buffer. + +=cut + +sub close_now +{ + my $self = shift; + + foreach ( @{ $self->{writequeue} } ) { + $_->on_error->( "stream closing" ) if $_->on_error; + } + + undef @{ $self->{writequeue} }; + undef $self->{stream_closing}; + + $self->SUPER::close; +} + +=head2 $eof = $stream->is_read_eof + +=head2 $eof = $stream->is_write_eof + +Returns true after an EOF condition is reported on either the read or the +write handle, respectively. + +=cut + +sub is_read_eof +{ + my $self = shift; + return $self->{read_eof}; +} + +sub is_write_eof +{ + my $self = shift; + return $self->{write_eof}; +} + +=head2 $stream->write( $data, %params ) + +This method adds data to the outgoing data queue, or writes it immediately, +according to the C<autoflush> parameter. + +If the C<autoflush> option is set, this method will try immediately to write +the data to the underlying filehandle. If this completes successfully then it +will have been written by the time this method returns. If it fails to write +completely, then the data is queued as if C<autoflush> were not set, and will +be flushed as normal. + +C<$data> can either be a plain string, a L<Future>, or a CODE reference. If it +is a plain string it is written immediately. If it is not, its value will be +used to generate more C<$data> values, eventually leading to strings to be +written. + +If C<$data> is a C<Future>, the Stream will wait until it is ready, and take +the single value it yields. + +If C<$data> is a CODE reference, it will be repeatedly invoked to generate new +values. Each time the filehandle is ready to write more data to it, the +function is invoked. Once the function has finished generating data it should +return undef. The function is passed the Stream object as its first argument. + +It is allowed that C<Future>s yield CODE references, or CODE references return +C<Future>s, as well as plain strings. + +For example, to stream the contents of an existing opened filehandle: + + open my $fileh, "<", $path or die "Cannot open $path - $!"; + + $stream->write( sub { + my ( $stream ) = @_; + + sysread $fileh, my $buffer, 8192 or return; + return $buffer; + } ); + +Takes the following optional named parameters in C<%params>: + +=over 8 + +=item write_len => INT + +Overrides the C<write_len> parameter for the data written by this call. + +=item on_write => CODE + +A CODE reference which will be invoked after every successful C<syswrite> +operation on the underlying filehandle. It will be passed the number of bytes +that were written by this call, which may not be the entire length of the +buffer - if it takes more than one C<syscall> operation to empty the buffer +then this callback will be invoked multiple times. + + $on_write->( $stream, $len ) + +=item on_flush => CODE + +A CODE reference which will be invoked once the data queued by this C<write> +call has been flushed. This will be invoked even if the buffer itself is not +yet empty; if more data has been queued since the call. + + $on_flush->( $stream ) + +=item on_error => CODE + +A CODE reference which will be invoked if a C<syswrite> error happens while +performing this write. Invoked as for the C<Stream>'s C<on_write_error> event. + + $on_error->( $stream, $errno ) + +=back + +If the object is not yet a member of a loop and doesn't yet have a +C<write_handle>, then calls to the C<write> method will simply queue the data +and return. It will be flushed when the object is added to the loop. + +If C<$data> is a defined but empty string, the write is still queued, and the +C<on_flush> continuation will be invoked, if supplied. This can be used to +obtain a marker, to invoke some code once the output queue has been flushed up +to this point. + +=head2 $stream->write( ... )->get + +If called in non-void context, this method returns a L<Future> which will +complete (with no value) when the write operation has been flushed. This may +be used as an alternative to, or combined with, the C<on_flush> callback. + +=cut + +sub _syswrite +{ + my $self = shift; + my ( $handle, undef, $len ) = @_; + + my $written = $handle->syswrite( $_[1], $len ); + return $written if !$written; # zero or undef + + substr( $_[1], 0, $written ) = ""; + return $written; +} + +sub _flush_one_write +{ + my $self = shift; + + my $writequeue = $self->{writequeue}; + + my $head; + while( $head = $writequeue->[0] and ref $head->data ) { + if( ref $head->data eq "CODE" ) { + my $data = $head->data->( $self ); + if( !defined $data ) { + $head->on_flush->( $self ) if $head->on_flush; + shift @$writequeue; + return 1; + } + if( !ref $data and my $encoding = $self->{encoding} ) { + $data = $encoding->encode( $data ); + } + unshift @$writequeue, my $new = Writer( + $data, $head->writelen, $head->on_write, undef, undef, 0 + ); + next; + } + elsif( blessed $head->data and $head->data->isa( "Future" ) ) { + my $f = $head->data; + if( !$f->is_ready ) { + return 0 if $head->watching; + $f->on_ready( sub { $self->_flush_one_write } ); + $head->watching++; + return 0; + } + my $data = $f->get; + if( !ref $data and my $encoding = $self->{encoding} ) { + $data = $encoding->encode( $data ); + } + $head->data = $data; + next; + } + else { + die "Unsure what to do with reference ".ref($head->data)." in write queue"; + } + } + + my $second; + while( $second = $writequeue->[1] and + !ref $second->data and + $head->writelen == $second->writelen and + !$head->on_write and !$second->on_write and + !$head->on_flush ) { + $head->data .= $second->data; + $head->on_write = $second->on_write; + $head->on_flush = $second->on_flush; + splice @$writequeue, 1, 1, (); + } + + die "TODO: head data does not contain a plain string" if ref $head->data; + + if( $IO::Async::Debug::DEBUG > 1 ) { + my $data = substr $head->data, 0, $head->writelen; + $self->debug_printf( "WRITE len=%d", length $data ); + IO::Async::Debug::log_hexdump( $data ) if $IO::Async::Debug::DEBUG_FLAGS{Sw}; + } + + my $writer = $self->{writer}; + my $len = $self->$writer( $self->write_handle, $head->data, $head->writelen ); + + if( !defined $len ) { + my $errno = $!; + + if( $errno == EAGAIN or $errno == EWOULDBLOCK ) { + $self->maybe_invoke_event( on_writeable_stop => ) if $self->{writeable}; + $self->{writeable} = 0; + } + + return 0 if _nonfatal_error( $errno ); + + if( $errno == EPIPE ) { + $self->{write_eof} = 1; + $self->maybe_invoke_event( on_write_eof => ); + } + + $head->on_error->( $self, $errno ) if $head->on_error; + $self->maybe_invoke_event( on_write_error => $errno ) + or $self->close_now; + + return 0; + } + + if( my $on_write = $head->on_write ) { + $on_write->( $self, $len ); + } + + if( !length $head->data ) { + $head->on_flush->( $self ) if $head->on_flush; + shift @{ $self->{writequeue} }; + } + + return 1; +} + +sub write +{ + my $self = shift; + my ( $data, %params ) = @_; + + carp "Cannot write data to a Stream that is closing" and return if $self->{stream_closing}; + + # Allow writes without a filehandle if we're not yet in a Loop, just don't + # try to flush them + my $handle = $self->write_handle; + + croak "Cannot write data to a Stream with no write_handle" if !$handle and $self->loop; + + if( !ref $data and my $encoding = $self->{encoding} ) { + $data = $encoding->encode( $data ); + } + + my $on_write = delete $params{on_write}; + my $on_flush = delete $params{on_flush}; + my $on_error = delete $params{on_error}; + + my $f; + if( defined wantarray ) { + my $orig_on_flush = $on_flush; + my $orig_on_error = $on_error; + + my $loop = $self->loop or + croak "Cannot ->write data returning a Future to a Stream not in a Loop"; + $f = $loop->new_future; + $on_flush = sub { + $f->done; + $orig_on_flush->( @_ ) if $orig_on_flush; + }; + $on_error = sub { + my $self = shift; + my ( $errno ) = @_; + + $f->fail( "write failed: $errno", syswrite => $errno ) unless $f->is_ready; + + $orig_on_error->( $self, @_ ) if $orig_on_error; + }; + } + + push @{ $self->{writequeue} }, Writer( + $data, $params{write_len} // $self->{write_len}, $on_write, $on_flush, $on_error, 0 + ); + + keys %params and croak "Unrecognised keys for ->write - " . join( ", ", keys %params ); + + return $f unless $handle; + + if( $self->{autoflush} ) { + 1 while !$self->_is_empty and $self->_flush_one_write; + + if( $self->_is_empty ) { + $self->want_writeready_for_write( 0 ); + return $f; + } + } + + $self->want_writeready_for_write( 1 ); + return $f; +} + +sub on_write_ready +{ + my $self = shift; + + if( !$self->{writeable} ) { + $self->maybe_invoke_event( on_writeable_start => ); + $self->{writeable} = 1; + } + + $self->_do_write if $self->{want} & WANT_WRITE_FOR_WRITE; + $self->_do_read if $self->{want} & WANT_WRITE_FOR_READ; +} + +sub _do_write +{ + my $self = shift; + + 1 while !$self->_is_empty and $self->_flush_one_write and $self->{write_all}; + + # All data successfully flushed + if( $self->_is_empty ) { + $self->want_writeready_for_write( 0 ); + + $self->maybe_invoke_event( on_outgoing_empty => ); + + $self->close_now if $self->{stream_closing}; + } +} + +sub _flush_one_read +{ + my $self = shift; + my ( $eof ) = @_; + + local $self->{flushing_read} = 1; + + my $readqueue = $self->{readqueue}; + + my $ret; + if( $readqueue->[0] and my $on_read = $readqueue->[0]->on_read ) { + $ret = $on_read->( $self, \$self->{readbuff}, $eof ); + } + else { + $ret = $self->invoke_event( on_read => \$self->{readbuff}, $eof ); + } + + if( defined $self->{read_low_watermark} and $self->{at_read_high_watermark} and + length $self->{readbuff} < $self->{read_low_watermark} ) { + undef $self->{at_read_high_watermark}; + $self->invoke_event( on_read_low_watermark => length $self->{readbuff} ); + } + + if( ref $ret eq "CODE" ) { + # Replace the top CODE, or add it if there was none + $readqueue->[0] = Reader( $ret, undef ); + return 1; + } + elsif( @$readqueue and !defined $ret ) { + shift @$readqueue; + return 1; + } + else { + return $ret && ( length( $self->{readbuff} ) > 0 || $eof ); + } +} + +sub _sysread +{ + my $self = shift; + my ( $handle, undef, $len ) = @_; + return $handle->sysread( $_[1], $len ); +} + +sub on_read_ready +{ + my $self = shift; + + $self->_do_read if $self->{want} & WANT_READ_FOR_READ; + $self->_do_write if $self->{want} & WANT_READ_FOR_WRITE; +} + +sub _do_read +{ + my $self = shift; + + my $handle = $self->read_handle; + my $reader = $self->{reader}; + + while(1) { + my $data; + my $len = $self->$reader( $handle, $data, $self->{read_len} ); + + if( !defined $len ) { + my $errno = $!; + + return if _nonfatal_error( $errno ); + + $self->maybe_invoke_event( on_read_error => $errno ) + or $self->close_now; + + foreach ( @{ $self->{readqueue} } ) { + $_->future->fail( "read failed: $errno", sysread => $errno ) if $_->future; + } + undef @{ $self->{readqueue} }; + + return; + } + + if( $IO::Async::Debug::DEBUG > 1 ) { + $self->debug_printf( "READ len=%d", $len ); + IO::Async::Debug::log_hexdump( $data ) if $IO::Async::Debug::DEBUG_FLAGS{Sr}; + } + + my $eof = $self->{read_eof} = ( $len == 0 ); + + if( my $encoding = $self->{encoding} ) { + my $bytes = defined $self->{bytes_remaining} ? $self->{bytes_remaining} . $data : $data; + $data = $encoding->decode( $bytes, STOP_AT_PARTIAL ); + $self->{bytes_remaining} = $bytes; + } + + $self->{readbuff} .= $data if !$eof; + + 1 while $self->_flush_one_read( $eof ); + + if( $eof ) { + $self->maybe_invoke_event( on_read_eof => ); + $self->close_now if $self->{close_on_read_eof}; + foreach ( @{ $self->{readqueue} } ) { + $_->future->done( undef ) if $_->future; + } + undef @{ $self->{readqueue} }; + return; + } + + last unless $self->{read_all}; + } + + if( defined $self->{read_high_watermark} and length $self->{readbuff} >= $self->{read_high_watermark} ) { + $self->{at_read_high_watermark} or + $self->invoke_event( on_read_high_watermark => length $self->{readbuff} ); + + $self->{at_read_high_watermark} = 1; + } +} + +sub on_read_high_watermark +{ + my $self = shift; + $self->want_readready_for_read( 0 ); +} + +sub on_read_low_watermark +{ + my $self = shift; + $self->want_readready_for_read( 1 ); +} + +=head2 $stream->push_on_read( $on_read ) + +Pushes a new temporary C<on_read> handler to the end of the queue. This queue, +if non-empty, is used to provide C<on_read> event handling code in preference +to using the object's main event handler or method. New handlers can be +supplied at any time, and they will be used in first-in first-out (FIFO) +order. + +As with the main C<on_read> event handler, each can return a (defined) boolean +to indicate if they wish to be invoked again or not, another C<CODE> reference +to replace themself with, or C<undef> to indicate it is now complete and +should be removed. When a temporary handler returns C<undef> it is shifted +from the queue and the next one, if present, is invoked instead. If there are +no more then the object's main handler is invoked instead. + +=cut + +sub push_on_read +{ + my $self = shift; + my ( $on_read, %args ) = @_; + # %args undocumented for internal use + + push @{ $self->{readqueue} }, Reader( $on_read, $args{future} ); + + # TODO: Should this always defer? + return if $self->{flushing_read}; + 1 while length $self->{readbuff} and $self->_flush_one_read( 0 ); +} + +=head1 FUTURE-RETURNING READ METHODS + +The following methods all return a L<Future> which will become ready when +enough data has been read by the Stream into its buffer. At this point, the +data is removed from the buffer and given to the C<Future> object to complete +it. + + my $f = $stream->read_... + + my ( $string ) = $f->get; + +Unlike the C<on_read> event handlers, these methods don't allow for access to +"partial" results; they only provide the final result once it is ready. + +If a C<Future> is cancelled before it completes it is removed from the read +queue without consuming any data; i.e. each C<Future> atomically either +completes or is cancelled. + +Since it is possible to use a readable C<Stream> entirely using these +C<Future>-returning methods instead of the C<on_read> event, it may be useful +to configure a trivial return-false event handler to keep it from consuming +any input, and to allow it to be added to a C<Loop> in the first place. + + my $stream = IO::Async::Stream->new( on_read => sub { 0 }, ... ); + $loop->add( $stream ); + + my $f = $stream->read_... + +If a read EOF or error condition happens while there are read C<Future>s +pending, they are all completed. In the case of a read EOF, they are done with +C<undef>; in the case of a read error they are failed using the C<$!> error +value as the failure. + + $f->fail( $message, sysread => $! ) + +If a read EOF condition happens to the currently-processing read C<Future>, it +will return a partial result. The calling code can detect this by the fact +that the returned data is not complete according to the specification (too +short in C<read_exactly>'s case, or lacking the ending pattern in +C<read_until>'s case). Additionally, each C<Future> will yield the C<$eof> +value in its results. + + my ( $string, $eof ) = $f->get; + +=cut + +sub _read_future +{ + my $self = shift; + my $f = $self->loop->new_future; + $f->on_cancel( $self->_capture_weakself( sub { + my $self = shift or return; + 1 while $self->_flush_one_read; + })); + return $f; +} + +=head2 ( $string, $eof ) = $stream->read_atmost( $len )->get + +=head2 ( $string, $eof ) = $stream->read_exactly( $len )->get + +Completes the C<Future> when the read buffer contains C<$len> or more +characters of input. C<read_atmost> will also complete after the first +invocation of C<on_read>, even if fewer characters are available, whereas +C<read_exactly> will wait until at least C<$len> are available. + +=cut + +sub read_atmost +{ + my $self = shift; + my ( $len ) = @_; + + my $f = $self->_read_future; + $self->push_on_read( sub { + my ( undef, $buffref, $eof ) = @_; + return undef if $f->is_cancelled; + $f->done( substr( $$buffref, 0, $len, "" ), $eof ); + return undef; + }, future => $f ); + return $f; +} + +sub read_exactly +{ + my $self = shift; + my ( $len ) = @_; + + my $f = $self->_read_future; + $self->push_on_read( sub { + my ( undef, $buffref, $eof ) = @_; + return undef if $f->is_cancelled; + return 0 unless $eof or length $$buffref >= $len; + $f->done( substr( $$buffref, 0, $len, "" ), $eof ); + return undef; + }, future => $f ); + return $f; +} + +=head2 ( $string, $eof ) = $stream->read_until( $end )->get + +Completes the C<Future> when the read buffer contains a match for C<$end>, +which may either be a plain string or a compiled C<Regexp> reference. Yields +the prefix of the buffer up to and including this match. + +=cut + +sub read_until +{ + my $self = shift; + my ( $until ) = @_; + + ref $until or $until = qr/\Q$until\E/; + + my $f = $self->_read_future; + $self->push_on_read( sub { + my ( undef, $buffref, $eof ) = @_; + return undef if $f->is_cancelled; + if( $$buffref =~ $until ) { + $f->done( substr( $$buffref, 0, $+[0], "" ), $eof ); + return undef; + } + elsif( $eof ) { + $f->done( $$buffref, $eof ); $$buffref = ""; + return undef; + } + else { + return 0; + } + }, future => $f ); + return $f; +} + +=head2 ( $string, $eof ) = $stream->read_until_eof->get + +Completes the C<Future> when the stream is eventually closed at EOF, and +yields all of the data that was available. + +=cut + +sub read_until_eof +{ + my $self = shift; + + my $f = $self->_read_future; + $self->push_on_read( sub { + my ( undef, $buffref, $eof ) = @_; + return undef if $f->is_cancelled; + return 0 unless $eof; + $f->done( $$buffref, $eof ); $$buffref = ""; + return undef; + }, future => $f ); + return $f; +} + +=head1 UTILITY CONSTRUCTORS + +=cut + +=head2 $stream = IO::Async::Stream->new_for_stdin + +=head2 $stream = IO::Async::Stream->new_for_stdout + +=head2 $stream = IO::Async::Stream->new_for_stdio + +Return a C<IO::Async::Stream> object preconfigured with the correct +C<read_handle>, C<write_handle> or both. + +=cut + +sub new_for_stdin { shift->new( read_handle => \*STDIN, @_ ) } +sub new_for_stdout { shift->new( write_handle => \*STDOUT, @_ ) } + +sub new_for_stdio { shift->new( read_handle => \*STDIN, write_handle => \*STDOUT, @_ ) } + +=head2 $future = $stream->connect( %args ) + +A convenient wrapper for calling the C<connect> method on the underlying +L<IO::Async::Loop> object, passing the C<socktype> hint as C<stream> if not +otherwise supplied. + +=cut + +sub connect +{ + my $self = shift; + return $self->SUPER::connect( socktype => "stream", @_ ); +} + +=head1 DEBUGGING FLAGS + +The following flags in C<IO_ASYNC_DEBUG_FLAGS> enable extra logging: + +=over 4 + +=item C<Sr> + +Log byte buffers as data is read from a Stream + +=item C<Sw> + +Log byte buffers as data is written to a Stream + +=back + +=cut + +=head1 EXAMPLES + +=head2 A line-based C<on_read> method + +The following C<on_read> method accepts incoming C<\n>-terminated lines and +prints them to the program's C<STDOUT> stream. + + sub on_read + { + my $self = shift; + my ( $buffref, $eof ) = @_; + + while( $$buffref =~ s/^(.*\n)// ) { + print "Received a line: $1"; + } + + return 0; + } + +Because a reference to the buffer itself is passed, it is simple to use a +C<s///> regular expression on the scalar it points at, to both check if data +is ready (i.e. a whole line), and to remove it from the buffer. If no data is +available then C<0> is returned, to indicate it should not be tried again. If +a line was successfully extracted, then C<1> is returned, to indicate it +should try again in case more lines exist in the buffer. + +=head2 Reading binary data + +This C<on_read> method accepts incoming records in 16-byte chunks, printing +each one. + + sub on_read + { + my ( $self, $buffref, $eof ) = @_; + + if( length $$buffref >= 16 ) { + my $record = substr( $$buffref, 0, 16, "" ); + print "Received a 16-byte record: $record\n"; + + return 1; + } + + if( $eof and length $$buffref ) { + print "EOF: a partial record still exists\n"; + } + + return 0; + } + +The 4-argument form of C<substr()> extracts the 16-byte record from the buffer +and assigns it to the C<$record> variable, if there was enough data in the +buffer to extract it. + +A lot of protocols use a fixed-size header, followed by a variable-sized body +of data, whose size is given by one of the fields of the header. The following +C<on_read> method extracts messages in such a protocol. + + sub on_read + { + my ( $self, $buffref, $eof ) = @_; + + return 0 unless length $$buffref >= 8; # "N n n" consumes 8 bytes + + my ( $len, $x, $y ) = unpack "N n n", $$buffref; + + return 0 unless length $$buffref >= 8 + $len; + + substr( $$buffref, 0, 8, "" ); + my $data = substr( $$buffref, 0, $len, "" ); + + print "A record with values x=$x y=$y\n"; + + return 1; + } + +In this example, the header is C<unpack()>ed first, to extract the body +length, and then the body is extracted. If the buffer does not have enough +data yet for a complete message then C<0> is returned, and the buffer is left +unmodified for next time. Only when there are enough bytes in total does it +use C<substr()> to remove them. + +=head2 Dynamic replacement of C<on_read> + +Consider the following protocol (inspired by IMAP), which consists of +C<\n>-terminated lines that may have an optional data block attached. The +presence of such a data block, as well as its size, is indicated by the line +prefix. + + sub on_read + { + my $self = shift; + my ( $buffref, $eof ) = @_; + + if( $$buffref =~ s/^DATA (\d+):(.*)\n// ) { + my $length = $1; + my $line = $2; + + return sub { + my $self = shift; + my ( $buffref, $eof ) = @_; + + return 0 unless length $$buffref >= $length; + + # Take and remove the data from the buffer + my $data = substr( $$buffref, 0, $length, "" ); + + print "Received a line $line with some data ($data)\n"; + + return undef; # Restore the original method + } + } + elsif( $$buffref =~ s/^LINE:(.*)\n// ) { + my $line = $1; + + print "Received a line $line with no data\n"; + + return 1; + } + else { + print STDERR "Unrecognised input\n"; + # Handle it somehow + } + } + +In the case where trailing data is supplied, a new temporary C<on_read> +callback is provided in a closure. This closure captures the C<$length> +variable so it knows how much data to expect. It also captures the C<$line> +variable so it can use it in the event report. When this method has finished +reading the data, it reports the event, then restores the original method by +returning C<undef>. + +=head1 SEE ALSO + +=over 4 + +=item * + +L<IO::Handle> - Supply object methods for I/O handles + +=back + +=head1 AUTHOR + +Paul Evans <leonerd@leonerd.org.uk> + +=cut + +0x55AA; |