From 1425eea04dd872dc6313f5315f317b2de288037c Mon Sep 17 00:00:00 2001 From: Lorry Tar Creator Date: Mon, 1 Jun 2015 14:15:30 +0000 Subject: IO-Async-0.67 --- lib/IO/Async/Loop.pm | 2781 ++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 2781 insertions(+) create mode 100644 lib/IO/Async/Loop.pm (limited to 'lib/IO/Async/Loop.pm') diff --git a/lib/IO/Async/Loop.pm b/lib/IO/Async/Loop.pm new file mode 100644 index 0000000..e510c64 --- /dev/null +++ b/lib/IO/Async/Loop.pm @@ -0,0 +1,2781 @@ +# You may distribute under the terms of either the GNU General Public License +# or the Artistic License (the same terms as Perl itself) +# +# (C) Paul Evans, 2007-2015 -- leonerd@leonerd.org.uk + +package IO::Async::Loop; + +use strict; +use warnings; +use 5.010; + +our $VERSION = '0.67'; + +# When editing this value don't forget to update the docs below +use constant NEED_API_VERSION => '0.33'; + +# Base value but some classes might override +use constant _CAN_ON_HANGUP => 0; + +# Most Loop implementations do not accurately handle sub-second timers. +# This only matters for unit tests +use constant _CAN_SUBSECOND_ACCURATELY => 0; + +# Does the loop implementation support IO_ASYNC_WATCHDOG? +use constant _CAN_WATCHDOG => 0; + +# Watchdog configuration constants +use constant WATCHDOG_ENABLE => $ENV{IO_ASYNC_WATCHDOG}; +use constant WATCHDOG_INTERVAL => $ENV{IO_ASYNC_WATCHDOG_INTERVAL} || 10; +use constant WATCHDOG_SIGABRT => $ENV{IO_ASYNC_WATCHDOG_SIGABRT}; + +use Carp; + +use IO::Socket (); # empty import +use Time::HiRes qw(); # empty import +use POSIX qw( WNOHANG ); +use Scalar::Util qw( refaddr weaken ); +use Socket qw( SO_REUSEADDR AF_INET6 IPPROTO_IPV6 IPV6_V6ONLY ); + +use IO::Async::OS; + +use constant HAVE_SIGNALS => IO::Async::OS->HAVE_SIGNALS; +use constant HAVE_POSIX_FORK => IO::Async::OS->HAVE_POSIX_FORK; +use constant HAVE_THREADS => IO::Async::OS->HAVE_THREADS; + +# Never sleep for more than 1 second if a signal proxy is registered, to avoid +# a borderline race condition. +# There is a race condition in perl involving signals interacting with XS code +# that implements blocking syscalls. There is a slight chance a signal will +# arrive in the XS function, before the blocking itself. Perl will not run our +# (safe) deferred signal handler in this case. To mitigate this, if we have a +# signal proxy, we'll adjust the maximal timeout. The signal handler will be +# run when the XS function returns. +our $MAX_SIGWAIT_TIME = 1; + +# Also, never sleep for more than 1 second if the OS does not support signals +# and we have child watches registered (so we must use waitpid() polling) +our $MAX_CHILDWAIT_TIME = 1; + +# Maybe our calling program will have a suggested hint of a specific Loop +# class or list of classes to use +our $LOOP; + +# Undocumented; used only by the test scripts. +# Setting this value true will avoid the IO::Async::Loop::$^O candidate in the +# magic constructor +our $LOOP_NO_OS; + +# SIGALRM handler for watchdog +$SIG{ALRM} = sub { + # There are two extra frames here; this one and the signal handler itself + local $Carp::CarpLevel = $Carp::CarpLevel + 2; + if( WATCHDOG_SIGABRT ) { + print STDERR Carp::longmess( "Watchdog timeout" ); + kill ABRT => $$; + } + else { + Carp::confess( "Watchdog timeout" ); + } +} if WATCHDOG_ENABLE; + +$SIG{PIPE} = "IGNORE" if ( $SIG{PIPE} // "" ) eq "DEFAULT"; + +=head1 NAME + +C - core loop of the C framework + +=head1 SYNOPSIS + + use IO::Async::Stream; + use IO::Async::Timer::Countdown; + + use IO::Async::Loop; + + my $loop = IO::Async::Loop->new; + + $loop->add( IO::Async::Timer::Countdown->new( + delay => 10, + on_expire => sub { print "10 seconds have passed\n" }, + )->start ); + + $loop->add( IO::Async::Stream->new_for_stdin( + on_read => sub { + my ( $self, $buffref, $eof ) = @_; + + while( $$buffref =~ s/^(.*)\n// ) { + print "You typed a line $1\n"; + } + + return 0; + }, + ) ); + + $loop->run; + +=head1 DESCRIPTION + +This module provides an abstract class which implements the core loop of the +C framework. Its primary purpose is to store a set of +L objects or subclasses of them. It handles all of the +lower-level set manipulation actions, and leaves the actual IO readiness +testing/notification to the concrete class that implements it. It also +provides other functionality such as signal handling, child process managing, +and timers. + +See also the two bundled Loop subclasses: + +=over 4 + +=item L + +=item L + +=back + +Or other subclasses that may appear on CPAN which are not part of the core +C distribution. + +=head2 Ignoring SIGPIPE + +Since version I<0.66> loading this module automatically ignores C, as +it is highly unlikely that the default-terminate action is the best course of +action for an C-based program to take. If at load time the handler +disposition is still set as C, it is set to ignore. If already +another handler has been placed there by the program code, it will be left +undisturbed. + +=cut + +# Internal constructor used by subclasses +sub __new +{ + my $class = shift; + + # Detect if the API version provided by the subclass is sufficient + $class->can( "API_VERSION" ) or + die "$class is too old for IO::Async $VERSION; it does not provide \->API_VERSION\n"; + + $class->API_VERSION >= NEED_API_VERSION or + die "$class is too old for IO::Async $VERSION; we need API version >= ".NEED_API_VERSION.", it provides ".$class->API_VERSION."\n"; + + WATCHDOG_ENABLE and !$class->_CAN_WATCHDOG and + warn "$class cannot implement IO_ASYNC_WATCHDOG\n"; + + my $self = bless { + notifiers => {}, # {nkey} = notifier + iowatches => {}, # {fd} = [ $on_read_ready, $on_write_ready, $on_hangup ] + sigattaches => {}, # {sig} => \@callbacks + childmanager => undef, + childwatches => {}, # {pid} => $code + threadwatches => {}, # {tid} => $code + timequeue => undef, + deferrals => [], + os => {}, # A generic scratchpad for IO::Async::OS to store whatever it wants + }, $class; + + # It's possible this is a specific subclass constructor. We still want the + # magic IO::Async::Loop->new constructor to yield this if it's the first + # one + our $ONE_TRUE_LOOP ||= $self; + + # Legacy support - temporary until all CPAN classes are updated; bump NEEDAPI version at that point + my $old_timer = $self->can( "enqueue_timer" ) != \&enqueue_timer; + if( $old_timer != ( $self->can( "cancel_timer" ) != \&cancel_timer ) ) { + die "$class should overload both ->enqueue_timer and ->cancel_timer, or neither"; + } + + if( $old_timer ) { + warnings::warnif( deprecated => "Enabling old_timer workaround for old loop class " . $class ); + } + + $self->{old_timer} = $old_timer; + + return $self; +} + +=head1 MAGIC CONSTRUCTOR + +=head2 $loop = IO::Async::Loop->new + +This function attempts to find a good subclass to use, then calls its +constructor. It works by making a list of likely candidate classes, then +trying each one in turn, Cing the module then calling its C +method. If either of these operations fails, the next subclass is tried. If +no class was successful, then an exception is thrown. + +The constructed object is cached, and will be returned again by a subsequent +call. The cache will also be set by a constructor on a specific subclass. This +behaviour makes it possible to simply use the normal constructor in a module +that wishes to interract with the main program's Loop, such as an integration +module for another event system. + +For example, the following two C<$loop> variables will refer to the same +object: + + use IO::Async::Loop; + use IO::Async::Loop::Poll; + + my $loop_poll = IO::Async::Loop::Poll->new; + + my $loop = IO::Async::Loop->new; + +While it is not advised to do so under normal circumstances, if the program +really wishes to construct more than one Loop object, it can call the +constructor C, or invoke one of the subclass-specific constructors +directly. + +The list of candidates is formed from the following choices, in this order: + +=over 4 + +=item * $ENV{IO_ASYNC_LOOP} + +If this environment variable is set, it should contain a comma-separated list +of subclass names. These names may or may not be fully-qualified; if a name +does not contain C<::> then it will have C prepended to it. +This allows the end-user to specify a particular choice to fit the needs of +his use of a program using C. + +=item * $IO::Async::Loop::LOOP + +If this scalar is set, it should contain a comma-separated list of subclass +names. These may or may not be fully-qualified, as with the above case. This +allows a program author to suggest a loop module to use. + +In cases where the module subclass is a hard requirement, such as GTK programs +using C, it would be better to use the module specifically and invoke +its constructor directly. + +=item * IO::Async::OS->LOOP_PREFER_CLASSES + +The L hints module for the given OS is then consulted to see if +it suggests any other module classes specific to the given operating system. + +=item * $^O + +The module called C is tried next. This allows specific +OSes, such as the ever-tricky C, to provide an implementation that +might be more efficient than the generic ones, or even work at all. + +This option is now discouraged in favour of the C hint instead. +At some future point it may be removed entirely, given as currently only +C uses it. + +=item * Poll and Select + +Finally, if no other choice has been made by now, the built-in C module +is chosen. This should always work, but in case it doesn't, the C, C or equivalent with a zero-second +timeout, and process any currently-pending IO conditions before the code is +invoked, but it will not block for a non-zero amount of time. + +This method is implemented using the C method, with the C +parameter set to C. It will return an ID value that can be passed to +C if required. + +=cut + +sub later +{ + my $self = shift; + my ( $code ) = @_; + + return $self->watch_idle( when => 'later', code => $code ); +} + +=head2 $loop->spawn_child( %params ) + +This method creates a new child process to run a given code block or command. +For more detail, see the C method on the +L class. + +=cut + +sub spawn_child +{ + my $self = shift; + my %params = @_; + + my $childmanager = $self->{childmanager} ||= + $self->__new_feature( "IO::Async::ChildManager" ); + + $childmanager->spawn_child( %params ); +} + +=head2 $pid = $loop->open_child( %params ) + +This creates a new child process to run the given code block or command, and +attaches filehandles to it that the parent will watch. This method is a light +wrapper around constructing a new L object, provided +largely for backward compatibility. New code ought to construct such an object +directly, as it may provide more features than are available here. + +The C<%params> hash takes the following keys: + +=over 8 + +=item command => ARRAY or STRING + +=item code => CODE + +The command or code to run in the child process (as per the C method) + +=item on_finish => CODE + +A continuation to be called when the child process exits and has closed all of +the filehandles that were set up for it. It will be invoked in the following +way: + + $on_finish->( $pid, $exitcode ) + +The second argument is passed the plain perl C<$?> value. + +=item on_error => CODE + +Optional continuation to be called when the child code block throws an +exception, or the command could not be Ced. It will be invoked in the +following way (as per C) + + $on_error->( $pid, $exitcode, $dollarbang, $dollarat ) + +If this continuation is not supplied, then C is used instead. The +value of C<$!> and C<$@> will not be reported. + +=item setup => ARRAY + +Optional reference to an array to pass to the underlying C method. + +=back + +In addition, the hash takes keys that define how to set up file descriptors in +the child process. (If the C array is also given, these operations will +be performed after those specified by C.) + +=over 8 + +=item fdI => HASH + +A hash describing how to set up file descriptor I. The hash may contain one +of the following sets of keys: + +=over 4 + +=item on_read => CODE + +The child will be given the writing end of a pipe. The reading end will be +wrapped by an C using this C callback function. + +=item from => STRING + +The child will be given the reading end of a pipe. The string given by the +C parameter will be written to the child. When all of the data has been +written the pipe will be closed. + +=back + +=item stdin => ... + +=item stdout => ... + +=item stderr => ... + +Shortcuts for C, C and C respectively. + +=back + +=cut + +sub open_child +{ + my $self = shift; + my %params = @_; + + my $on_finish = delete $params{on_finish}; + ref $on_finish or croak "Expected 'on_finish' to be a reference"; + $params{on_finish} = sub { + my ( $process, $exitcode ) = @_; + $on_finish->( $process->pid, $exitcode ); + }; + + if( my $on_error = delete $params{on_error} ) { + ref $on_error or croak "Expected 'on_error' to be a reference"; + + $params{on_exception} = sub { + my ( $process, $exception, $errno, $exitcode ) = @_; + # Swap order + $on_error->( $process->pid, $exitcode, $errno, $exception ); + }; + } + + $params{on_exit} and croak "Cannot pass 'on_exit' parameter through ChildManager->open"; + + require IO::Async::Process; + my $process = IO::Async::Process->new( %params ); + + $self->add( $process ); + + return $process->pid; +} + +=head2 $pid = $loop->run_child( %params ) + +This creates a new child process to run the given code block or command, +capturing its STDOUT and STDERR streams. When the process exits, a +continuation is invoked being passed the exitcode, and content of the streams. + +=over 8 + +=item command => ARRAY or STRING + +=item code => CODE + +The command or code to run in the child process (as per the C +method) + +=item on_finish => CODE + +A continuation to be called when the child process exits and closed its STDOUT +and STDERR streams. It will be invoked in the following way: + + $on_finish->( $pid, $exitcode, $stdout, $stderr ) + +The second argument is passed the plain perl C<$?> value. + +=item stdin => STRING + +Optional. String to pass in to the child process's STDIN stream. + +=item setup => ARRAY + +Optional reference to an array to pass to the underlying C method. + +=back + +This method is intended mainly as an IO::Async-compatible replacement for the +perl C function (`backticks`), allowing it to replace + + my $output = `command here`; + +with + + $loop->run_child( + command => "command here", + on_finish => sub { + my ( undef, $exitcode, $output ) = @_; + ... + } + ); + +=cut + +sub run_child +{ + my $self = shift; + my %params = @_; + + my $on_finish = delete $params{on_finish}; + ref $on_finish or croak "Expected 'on_finish' to be a reference"; + + my $stdout; + my $stderr; + + my %subparams; + + if( my $child_stdin = delete $params{stdin} ) { + ref $child_stdin and croak "Expected 'stdin' not to be a reference"; + $subparams{stdin} = { from => $child_stdin }; + } + + $subparams{code} = delete $params{code}; + $subparams{command} = delete $params{command}; + $subparams{setup} = delete $params{setup}; + + croak "Unrecognised parameters " . join( ", ", keys %params ) if keys %params; + + require IO::Async::Process; + my $process = IO::Async::Process->new( + %subparams, + stdout => { into => \$stdout }, + stderr => { into => \$stderr }, + + on_finish => sub { + my ( $process, $exitcode ) = @_; + $on_finish->( $process->pid, $exitcode, $stdout, $stderr ); + }, + ); + + $self->add( $process ); + + return $process->pid; +} + +=head2 $loop->resolver + +Returns the internally-stored L object, used for name +resolution operations by the C, C and C methods. + +=cut + +sub resolver +{ + my $self = shift; + + return $self->{resolver} ||= do { + require IO::Async::Resolver; + my $resolver = IO::Async::Resolver->new; + $self->add( $resolver ); + $resolver; + } +} + +=head2 $loop->set_resolver( $resolver ) + +Sets the internally-stored L object. In most cases this +method should not be required, but it may be used to provide an alternative +resolver for special use-cases. + +=cut + +sub set_resolver +{ + my $self = shift; + my ( $resolver ) = @_; + + $resolver->can( $_ ) or croak "Resolver is unsuitable as it does not implement $_" + for qw( resolve getaddrinfo getnameinfo ); + + $self->{resolver} = $resolver; + + $self->add( $resolver ); +} + +=head2 @result = $loop->resolve( %params )->get + +This method performs a single name resolution operation. It uses an +internally-stored C object. For more detail, see the +C method on the L class. + +=cut + +sub resolve +{ + my $self = shift; + my ( %params ) = @_; + + $self->resolver->resolve( %params ); +} + +=head2 $handle|$socket = $loop->connect( %params )->get + +This method performs a non-blocking connection to a given address or set of +addresses, returning a L which represents the operation. On +completion, the future will yield the connected socket handle, or the given +L object. + +There are two modes of operation. Firstly, a list of addresses can be provided +which will be tried in turn. Alternatively as a convenience, if a host and +service name are provided instead of a list of addresses, these will be +resolved using the underlying loop's C method into the list of +addresses. + +When attempting to connect to any among a list of addresses, there may be +failures among the first attempts, before a valid connection is made. For +example, the resolver may have returned some IPv6 addresses, but only IPv4 +routes are valid on the system. In this case, the first C syscall +will fail. This isn't yet a fatal error, if there are more addresses to try, +perhaps some IPv4 ones. + +For this reason, it is possible that the operation eventually succeeds even +though some system calls initially fail. To be aware of individual failures, +the optional C callback can be used. This will be invoked on each +individual C or C failure, which may be useful for +debugging or logging. + +Because this module simply uses the C resolver, it will be fully +IPv6-aware if the underlying platform's resolver is. This allows programs to +be fully IPv6-capable. + +In plain address mode, the C<%params> hash takes the following keys: + +=over 8 + +=item addrs => ARRAY + +Reference to an array of (possibly-multiple) address structures to attempt to +connect to. Each should be in the layout described for C. Such a layout +is returned by the C named resolver. + +=item addr => HASH or ARRAY + +Shortcut for passing a single address to connect to; it may be passed directly +with this key, instead of in another array on its own. This should be in a +format recognised by L's C method. + +This example shows how to use the C functions to construct one for TCP +port 8001 on address 10.0.0.1: + + $loop->connect( + addr => { + family => "inet", + socktype => "stream", + port => 8001, + ip => "10.0.0.1", + }, + ... + ); + +This example shows another way to connect to a UNIX socket at F. + + $loop->connect( + addr => { + family => "unix", + socktype => "stream", + path => "echo.sock", + }, + ... + ); + +=item local_addrs => ARRAY + +=item local_addr => HASH or ARRAY + +Optional. Similar to the C or C parameters, these specify a local +address or set of addresses to C the socket to before +Cing it. + +=back + +When performing the resolution step too, the C or C keys are +ignored, and instead the following keys are taken: + +=over 8 + +=item host => STRING + +=item service => STRING + +The hostname and service name to connect to. + +=item local_host => STRING + +=item local_service => STRING + +Optional. The hostname and/or service name to C the socket to locally +before connecting to the peer. + +=item family => INT + +=item socktype => INT + +=item protocol => INT + +=item flags => INT + +Optional. Other arguments to pass along with C and C to the +C call. + +=item socktype => STRING + +Optionally may instead be one of the values C<'stream'>, C<'dgram'> or +C<'raw'> to stand for C, C or C. This +utility is provided to allow the caller to avoid a separate C only +for importing these constants. + +=back + +It is necessary to pass the C hint to the resolver when resolving +the host/service names into an address, as some OS's C functions +require this hint. A warning is emitted if neither C nor C +hint is defined when performing a C lookup. To avoid this warning +while still specifying no particular C hint (perhaps to invoke some +OS-specific behaviour), pass C<0> as the C value. + +In either case, it also accepts the following arguments: + +=over 8 + +=item handle => IO::Async::Handle + +Optional. If given a L object or a subclass (such as +L or L its handle will be set to the +newly-connected socket on success, and that handle used as the result of the +future instead. + +=item on_fail => CODE + +Optional. After an individual C or C syscall has failed, +this callback is invoked to inform of the error. It is passed the name of the +syscall that failed, the arguments that were passed to it, and the error it +generated. I.e. + + $on_fail->( "socket", $family, $socktype, $protocol, $! ); + + $on_fail->( "bind", $sock, $address, $! ); + + $on_fail->( "connect", $sock, $address, $! ); + +Because of the "try all" nature when given a list of multiple addresses, this +callback may be invoked multiple times, even before an eventual success. + +=back + +This method accepts an C parameter; see the C section +below. + +=head2 $loop->connect( %params ) + +When not returning a future, additional parameters can be given containing the +continuations to invoke on success or failure. + +=over 8 + +=item on_connected => CODE + +A continuation that is invoked on a successful C call to a valid +socket. It will be passed the connected socket handle, as an C +object. + + $on_connected->( $handle ) + +=item on_stream => CODE + +An alternative to C, a continuation that is passed an instance +of L when the socket is connected. This is provided as a +convenience for the common case that a Stream object is required as the +transport for a Protocol object. + + $on_stream->( $stream ) + +=item on_socket => CODE + +Similar to C, but constructs an instance of L. +This is most useful for C or C sockets. + + $on_socket->( $socket ) + +=item on_connect_error => CODE + +A continuation that is invoked after all of the addresses have been tried, and +none of them succeeded. It will be passed the most significant error that +occurred, and the name of the operation it occurred in. Errors from the +C syscall are considered most significant, then C, then +finally C. + + $on_connect_error->( $syscall, $! ) + +=item on_resolve_error => CODE + +A continuation that is invoked when the name resolution attempt fails. This is +invoked in the same way as the C continuation for the C +method. + +=back + +=cut + +sub connect +{ + my $self = shift; + my ( %params ) = @_; + + my $extensions; + if( $extensions = delete $params{extensions} and @$extensions ) { + my ( $ext, @others ) = @$extensions; + + my $method = "${ext}_connect"; + # TODO: Try to 'require IO::Async::$ext' + + $self->can( $method ) or croak "Extension method '$method' is not available"; + + return $self->$method( + %params, + ( @others ? ( extensions => \@others ) : () ), + ); + } + + my $handle = $params{handle}; + + my $on_done; + # Legacy callbacks + if( my $on_connected = delete $params{on_connected} ) { + $on_done = $on_connected; + } + elsif( my $on_stream = delete $params{on_stream} ) { + defined $handle and croak "Cannot pass 'on_stream' with a handle object as well"; + + require IO::Async::Stream; + # TODO: It doesn't make sense to put a SOCK_DGRAM in an + # IO::Async::Stream but currently we don't detect this + $handle = IO::Async::Stream->new; + $on_done = $on_stream; + } + elsif( my $on_socket = delete $params{on_socket} ) { + defined $handle and croak "Cannot pass 'on_socket' with a handle object as well"; + + require IO::Async::Socket; + $handle = IO::Async::Socket->new; + $on_done = $on_socket; + } + elsif( !defined wantarray ) { + croak "Expected 'on_connected' or 'on_stream' callback or to return a Future"; + } + + my $on_connect_error; + if( $on_connect_error = $params{on_connect_error} ) { + # OK + } + elsif( !defined wantarray ) { + croak "Expected 'on_connect_error' callback"; + } + + my $on_resolve_error; + if( $on_resolve_error = $params{on_resolve_error} ) { + # OK + } + elsif( !defined wantarray and exists $params{host} || exists $params{local_host} ) { + croak "Expected 'on_resolve_error' callback or to return a Future"; + } + + my $connector = $self->{connector} ||= $self->__new_feature( "IO::Async::Internals::Connector" ); + + my $future = $connector->connect( %params ); + + $future = $future->then( sub { + $handle->set_handle( shift ); + return Future->done( $handle ) + }) if $handle; + + $future->on_done( $on_done ) if $on_done; + $future->on_fail( sub { + $on_connect_error->( @_[2,3] ) if $on_connect_error and $_[1] eq "connect"; + $on_resolve_error->( $_[2] ) if $on_resolve_error and $_[1] eq "resolve"; + } ); + + return $future if defined wantarray; + + # Caller is not going to keep hold of the Future, so we have to ensure it + # stays alive somehow + $future->on_ready( sub { undef $future } ); # intentional cycle +} + +=head2 $listener = $loop->listen( %params )->get + +This method sets up a listening socket and arranges for an acceptor callback +to be invoked each time a new connection is accepted on the socket. Internally +it creates an instance of L and adds it to the Loop if +not given one in the arguments. + +Addresses may be given directly, or they may be looked up using the system's +name resolver, or a socket handle may be given directly. + +If multiple addresses are given, or resolved from the service and hostname, +then each will be attempted in turn until one succeeds. + +In named resolver mode, the C<%params> hash takes the following keys: + +=over 8 + +=item service => STRING + +The service name to listen on. + +=item host => STRING + +The hostname to listen on. Optional. Will listen on all addresses if not +supplied. + +=item family => INT + +=item socktype => INT + +=item protocol => INT + +=item flags => INT + +Optional. Other arguments to pass along with C and C to the +C call. + +=item socktype => STRING + +Optionally may instead be one of the values C<'stream'>, C<'dgram'> or +C<'raw'> to stand for C, C or C. This +utility is provided to allow the caller to avoid a separate C only +for importing these constants. + +=back + +It is necessary to pass the C hint to the resolver when resolving +the host/service names into an address, as some OS's C functions +require this hint. A warning is emitted if neither C nor C +hint is defined when performing a C lookup. To avoid this warning +while still specifying no particular C hint (perhaps to invoke some +OS-specific behaviour), pass C<0> as the C value. + +In plain address mode, the C<%params> hash takes the following keys: + +=over 8 + +=item addrs => ARRAY + +Reference to an array of (possibly-multiple) address structures to attempt to +listen on. Each should be in the layout described for C. Such a layout +is returned by the C named resolver. + +=item addr => ARRAY + +Shortcut for passing a single address to listen on; it may be passed directly +with this key, instead of in another array of its own. This should be in a +format recognised by L's C method. See also +the C section. + +=back + +In direct socket handle mode, the following keys are taken: + +=over 8 + +=item handle => IO + +The listening socket handle. + +=back + +In either case, the following keys are also taken: + +=over 8 + +=item on_fail => CODE + +Optional. A callback that is invoked if a syscall fails while attempting to +create a listening sockets. It is passed the name of the syscall that failed, +the arguments that were passed to it, and the error generated. I.e. + + $on_fail->( "socket", $family, $socktype, $protocol, $! ); + + $on_fail->( "sockopt", $sock, $optname, $optval, $! ); + + $on_fail->( "bind", $sock, $address, $! ); + + $on_fail->( "listen", $sock, $queuesize, $! ); + +=item queuesize => INT + +Optional. The queue size to pass to the C calls. If not supplied, +then 3 will be given instead. + +=item reuseaddr => BOOL + +Optional. If true or not supplied then the C socket option will +be set. To prevent this, pass a false value such as 0. + +=item v6only => BOOL + +Optional. If defined, sets or clears the C socket option on +C sockets. This option disables the ability of C socket to +accept connections from C addresses. Not all operating systems allow +this option to be disabled. + +=back + +An alternative which gives more control over the listener, is to create the +C object directly and add it explicitly to the Loop. + +This method accepts an C parameter; see the C section +below. + +=head2 $loop->listen( %params ) + +When not returning a future, additional parameters can be given containing the +continuations to invoke on success or failure. + +=over 8 + +=item on_notifier => CODE + +Optional. A callback that is invoked when the Listener object is ready to +receive connections. The callback is passed the Listener object itself. + + $on_notifier->( $listener ) + +If this callback is required, it may instead be better to construct the +Listener object directly. + +=item on_listen => CODE + +Optional. A callback that is invoked when the listening socket is ready. +Typically this would be used in the name resolver case, in order to inspect +the socket's sockname address, or otherwise inspect the filehandle. + + $on_listen->( $socket ) + +=item on_listen_error => CODE + +A continuation this is invoked after all of the addresses have been tried, and +none of them succeeded. It will be passed the most significant error that +occurred, and the name of the operation it occurred in. Errors from the +C syscall are considered most significant, then C, then +C, then finally C. + +=item on_resolve_error => CODE + +A continuation that is invoked when the name resolution attempt fails. This is +invoked in the same way as the C continuation for the C +method. + +=back + +=cut + +sub listen +{ + my $self = shift; + my ( %params ) = @_; + + my $remove_on_error; + my $listener = $params{listener} ||= do { + $remove_on_error++; + + require IO::Async::Listener; + + # Our wrappings of these don't want $listener + my %listenerparams; + for (qw( on_accept on_stream on_socket )) { + next unless exists $params{$_}; + croak "Cannot ->listen with '$_' and 'listener'" if $params{listener}; + + my $code = delete $params{$_}; + $listenerparams{$_} = sub { + shift; + goto &$code; + }; + } + + my $listener = IO::Async::Listener->new( %listenerparams ); + $self->add( $listener ); + $listener + }; + + my $extensions; + if( $extensions = delete $params{extensions} and @$extensions ) { + my ( $ext, @others ) = @$extensions; + + # We happen to know we break older IO::Async::SSL + if( $ext eq "SSL" and $IO::Async::SSL::VERSION < '0.12001' ) { + croak "IO::Async::SSL version too old; need at least 0.12_001; found $IO::Async::SSL::VERSION"; + } + + my $method = "${ext}_listen"; + # TODO: Try to 'require IO::Async::$ext' + + $self->can( $method ) or croak "Extension method '$method' is not available"; + + my $f = $self->$method( + %params, + ( @others ? ( extensions => \@others ) : () ), + ); + $f->on_fail( sub { $self->remove( $listener ) } ) if $remove_on_error; + + return $f; + } + + my $on_notifier = delete $params{on_notifier}; # optional + + my $on_listen_error = delete $params{on_listen_error}; + my $on_resolve_error = delete $params{on_resolve_error}; + + # Shortcut + if( $params{addr} and not $params{addrs} ) { + $params{addrs} = [ delete $params{addr} ]; + } + + my $f; + if( my $handle = delete $params{handle} ) { + $f = $self->_listen_handle( $listener, $handle, %params ); + } + elsif( my $addrs = delete $params{addrs} ) { + $on_listen_error or defined wantarray or + croak "Expected 'on_listen_error' or to return a Future"; + $f = $self->_listen_addrs( $listener, $addrs, %params ); + } + elsif( defined $params{service} ) { + $on_listen_error or defined wantarray or + croak "Expected 'on_listen_error' or to return a Future"; + $on_resolve_error or defined wantarray or + croak "Expected 'on_resolve_error' or to return a Future"; + $f = $self->_listen_hostservice( $listener, delete $params{host}, delete $params{service}, %params ); + } + else { + croak "Expected either 'service' or 'addrs' or 'addr' arguments"; + } + + $f->on_done( $on_notifier ) if $on_notifier; + if( my $on_listen = $params{on_listen} ) { + $f->on_done( sub { $on_listen->( shift->read_handle ) } ); + } + $f->on_fail( sub { + my ( $message, $how, @rest ) = @_; + $on_listen_error->( @rest ) if $on_listen_error and $how eq "listen"; + $on_resolve_error->( @rest ) if $on_resolve_error and $how eq "resolve"; + }); + $f->on_fail( sub { $self->remove( $listener ) } ) if $remove_on_error; + + return $f if defined wantarray; + + # Caller is not going to keep hold of the Future, so we have to ensure it + # stays alive somehow + $f->on_ready( sub { undef $f } ); # intentional cycle +} + +sub _listen_handle +{ + my $self = shift; + my ( $listener, $handle, %params ) = @_; + + $listener->configure( handle => $handle ); + return $self->new_future->done( $listener ); +} + +sub _listen_addrs +{ + my $self = shift; + my ( $listener, $addrs, %params ) = @_; + + my $queuesize = $params{queuesize} || 3; + + my $on_fail = $params{on_fail}; + !defined $on_fail or ref $on_fail or croak "Expected 'on_fail' to be a reference"; + + my $reuseaddr = 1; + $reuseaddr = 0 if defined $params{reuseaddr} and not $params{reuseaddr}; + + my $v6only = $params{v6only}; + + my ( $listenerr, $binderr, $sockopterr, $socketerr ); + + foreach my $addr ( @$addrs ) { + my ( $family, $socktype, $proto, $address ) = IO::Async::OS->extract_addrinfo( $addr ); + + my $sock; + + unless( $sock = IO::Async::OS->socket( $family, $socktype, $proto ) ) { + $socketerr = $!; + $on_fail->( socket => $family, $socktype, $proto, $! ) if $on_fail; + next; + } + + if( $reuseaddr ) { + unless( $sock->sockopt( SO_REUSEADDR, 1 ) ) { + $sockopterr = $!; + $on_fail->( sockopt => $sock, SO_REUSEADDR, 1, $! ) if $on_fail; + next; + } + } + + if( defined $v6only and $family == AF_INET6 ) { + unless( $sock->setsockopt( IPPROTO_IPV6, IPV6_V6ONLY, $v6only ) ) { + $sockopterr = $!; + $on_fail->( sockopt => $sock, IPV6_V6ONLY, $v6only, $! ) if $on_fail; + next; + } + } + + unless( $sock->bind( $address ) ) { + $binderr = $!; + $on_fail->( bind => $sock, $address, $! ) if $on_fail; + next; + } + + unless( $sock->listen( $queuesize ) ) { + $listenerr = $!; + $on_fail->( listen => $sock, $queuesize, $! ) if $on_fail; + next; + } + + return $self->_listen_handle( $listener, $sock, %params ); + } + + my $f = $self->new_future; + return $f->fail( "Cannot listen() - $listenerr", listen => listen => $listenerr ) if $listenerr; + return $f->fail( "Cannot bind() - $binderr", listen => bind => $binderr ) if $binderr; + return $f->fail( "Cannot setsockopt() - $sockopterr", listen => sockopt => $sockopterr ) if $sockopterr; + return $f->fail( "Cannot socket() - $socketerr", listen => socket => $socketerr ) if $socketerr; + die 'Oops; $loop->listen failed but no error cause was found'; +} + +sub _listen_hostservice +{ + my $self = shift; + my ( $listener, $host, $service, %params ) = @_; + + $host ||= ""; + defined $service or $service = ""; # might be 0 + + my %gai_hints; + exists $params{$_} and $gai_hints{$_} = $params{$_} for qw( family socktype protocol flags ); + + defined $gai_hints{socktype} or defined $gai_hints{protocol} or + carp "Attempting to ->listen without either 'socktype' or 'protocol' hint is not portable"; + + $self->resolver->getaddrinfo( + host => $host, + service => $service, + passive => 1, + %gai_hints, + )->then( sub { + my @addrs = @_; + $self->_listen_addrs( $listener, \@addrs, %params ); + }); +} + +=head1 OS ABSTRACTIONS + +Because the Magic Constructor searches for OS-specific subclasses of the Loop, +several abstractions of OS services are provided, in case specific OSes need +to give different implementations on that OS. + +=cut + +=head2 $signum = $loop->signame2num( $signame ) + +Legacy wrappers around L functions. + +=cut + +sub signame2num { shift; IO::Async::OS->signame2num( @_ ) } + +=head2 $time = $loop->time + +Returns the current UNIX time in fractional seconds. This is currently +equivalent to C but provided here as a utility for +programs to obtain the time current used by C for its own timing +purposes. + +=cut + +sub time +{ + my $self = shift; + return Time::HiRes::time; +} + +=head2 $pid = $loop->fork( %params ) + +This method creates a new child process to run a given code block, returning +its process ID. + +=over 8 + +=item code => CODE + +A block of code to execute in the child process. It will be called in scalar +context inside an C block. The return value will be used as the +C code from the child if it returns (or 255 if it returned C or +thows an exception). + +=item on_exit => CODE + +A optional continuation to be called when the child processes exits. It will +be invoked in the following way: + + $on_exit->( $pid, $exitcode ) + +The second argument is passed the plain perl C<$?> value. + +This key is optional; if not supplied, the calling code should install a +handler using the C method. + +=item keep_signals => BOOL + +Optional boolean. If missing or false, any CODE references in the C<%SIG> hash +will be removed and restored back to C in the child process. If true, +no adjustment of the C<%SIG> hash will be performed. + +=back + +=cut + +sub fork +{ + my $self = shift; + my %params = @_; + + HAVE_POSIX_FORK or croak "POSIX fork() is not available"; + + my $code = $params{code} or croak "Expected 'code' as a CODE reference"; + + my $kid = fork; + defined $kid or croak "Cannot fork() - $!"; + + if( $kid == 0 ) { + unless( $params{keep_signals} ) { + foreach( keys %SIG ) { + next if m/^__(WARN|DIE)__$/; + $SIG{$_} = "DEFAULT" if ref $SIG{$_} eq "CODE"; + } + } + + my $exitvalue = eval { $code->() }; + + defined $exitvalue or $exitvalue = -1; + + POSIX::_exit( $exitvalue ); + } + + if( defined $params{on_exit} ) { + $self->watch_child( $kid => $params{on_exit} ); + } + + return $kid; +} + +=head2 $tid = $loop->create_thread( %params ) + +This method creates a new (non-detached) thread to run the given code block, +returning its thread ID. + +=over 8 + +=item code => CODE + +A block of code to execute in the thread. It is called in the context given by +the C argument, and its return value will be available to the +C callback. It is called inside an C block; if it fails the +exception will be caught. + +=item context => "scalar" | "list" | "void" + +Optional. Gives the calling context that C is invoked in. Defaults to +C if not supplied. + +=item on_joined => CODE + +Callback to invoke when the thread function returns or throws an exception. +If it returned, this callback will be invoked with its result + + $on_joined->( return => @result ) + +If it threw an exception the callback is invoked with the value of C<$@> + + $on_joined->( died => $! ) + +=back + +=cut + +# It is basically impossible to have any semblance of order on global +# destruction, and even harder again to rely on when threads are going to be +# terminated and joined. Instead of ensuring we join them all, just detach any +# we no longer care about at END time +my %threads_to_detach; # {$tid} = $thread_weakly +END { + $_ and $_->detach for values %threads_to_detach; +} + +sub create_thread +{ + my $self = shift; + my %params = @_; + + HAVE_THREADS or croak "Threads are not available"; + + eval { require threads } or croak "This Perl does not support threads"; + + my $code = $params{code} or croak "Expected 'code' as a CODE reference"; + my $on_joined = $params{on_joined} or croak "Expected 'on_joined' as a CODE reference"; + + my $threadwatches = $self->{threadwatches}; + + unless( $self->{thread_join_pipe} ) { + ( my $rd, $self->{thread_join_pipe} ) = IO::Async::OS->pipepair or + croak "Cannot pipepair - $!"; + $self->{thread_join_pipe}->autoflush(1); + + $self->watch_io( + handle => $rd, + on_read_ready => sub { + sysread $rd, my $buffer, 8192 or return; + + # There's a race condition here in that we might have read from + # the pipe after the returning thread has written to it but before + # it has returned. We'll grab the actual $thread object and + # forcibly ->join it here to ensure we wait for its result. + + foreach my $tid ( unpack "N*", $buffer ) { + my ( $thread, $on_joined ) = @{ delete $threadwatches->{$tid} } + or die "ARGH: Can't find threadwatch for tid $tid\n"; + $on_joined->( $thread->join ); + delete $threads_to_detach{$tid}; + } + } + ); + } + + my $wr = $self->{thread_join_pipe}; + + my $context = $params{context} || "scalar"; + + my ( $thread ) = threads->create( + sub { + my ( @ret, $died ); + eval { + $context eq "list" ? ( @ret = $code->() ) : + $context eq "scalar" ? ( $ret[0] = $code->() ) : + $code->(); + 1; + } or $died = $@; + + $wr->syswrite( pack "N", threads->tid ); + + return died => $died if $died; + return return => @ret; + } + ); + + $threadwatches->{$thread->tid} = [ $thread, $on_joined ]; + weaken( $threads_to_detach{$thread->tid} = $thread ); + + return $thread->tid; +} + +=head1 LOW-LEVEL METHODS + +As C is an abstract base class, specific subclasses of it are +required to implement certain methods that form the base level of +functionality. They are not recommended for applications to use; see instead +the various event objects or higher level methods listed above. + +These methods should be considered as part of the interface contract required +to implement a C subclass. + +=cut + +=head2 IO::Async::Loop->API_VERSION + +This method will be called by the magic constructor on the class before it is +constructed, to ensure that the specific implementation will support the +required API. This method should return the API version that the loop +implementation supports. The magic constructor will use that class, provided +it declares a version at least as new as the version documented here. + +The current API version is C<0.49>. + +This method may be implemented using C; e.g + + use constant API_VERSION => '0.49'; + +=cut + +=head2 $loop->watch_io( %params ) + +This method installs callback functions which will be invoked when the given +IO handle becomes read- or write-ready. + +The C<%params> hash takes the following keys: + +=over 8 + +=item handle => IO + +The IO handle to watch. + +=item on_read_ready => CODE + +Optional. A CODE reference to call when the handle becomes read-ready. + +=item on_write_ready => CODE + +Optional. A CODE reference to call when the handle becomes write-ready. + +=back + +There can only be one filehandle of any given fileno registered at any one +time. For any one filehandle, there can only be one read-readiness and/or one +write-readiness callback at any one time. Registering a new one will remove an +existing one of that type. It is not required that both are provided. + +Applications should use a C or C instead +of using this method. + +If the filehandle does not yet have the C flag set, it will be +enabled by this method. This will ensure that any subsequent C, +C, or similar will not block on the filehandle. + +=cut + +# This class specifically does NOT implement this method, so that subclasses +# are forced to. The constructor will be checking.... +sub __watch_io +{ + my $self = shift; + my %params = @_; + + my $handle = delete $params{handle} or croak "Expected 'handle'"; + defined eval { $handle->fileno } or croak "Expected that 'handle' has defined ->fileno"; + + # Silent "upgrade" to O_NONBLOCK + $handle->blocking and $handle->blocking(0); + + my $watch = ( $self->{iowatches}->{$handle->fileno} ||= [] ); + + $watch->[0] = $handle; + + if( exists $params{on_read_ready} ) { + $watch->[1] = delete $params{on_read_ready}; + } + + if( exists $params{on_write_ready} ) { + $watch->[2] = delete $params{on_write_ready}; + } + + if( exists $params{on_hangup} ) { + $self->_CAN_ON_HANGUP or croak "Cannot watch_io for 'on_hangup' in ".ref($self); + $watch->[3] = delete $params{on_hangup}; + } + + keys %params and croak "Unrecognised keys for ->watch_io - " . join( ", ", keys %params ); +} + +=head2 $loop->unwatch_io( %params ) + +This method removes a watch on an IO handle which was previously installed by +C. + +The C<%params> hash takes the following keys: + +=over 8 + +=item handle => IO + +The IO handle to remove the watch for. + +=item on_read_ready => BOOL + +If true, remove the watch for read-readiness. + +=item on_write_ready => BOOL + +If true, remove the watch for write-readiness. + +=back + +Either or both callbacks may be removed at once. It is not an error to attempt +to remove a callback that is not present. If both callbacks were provided to +the C method and only one is removed by this method, the other shall +remain. + +=cut + +sub __unwatch_io +{ + my $self = shift; + my %params = @_; + + my $handle = delete $params{handle} or croak "Expected 'handle'"; + + my $watch = $self->{iowatches}->{$handle->fileno} or return; + + if( delete $params{on_read_ready} ) { + undef $watch->[1]; + } + + if( delete $params{on_write_ready} ) { + undef $watch->[2]; + } + + if( delete $params{on_hangup} ) { + $self->_CAN_ON_HANGUP or croak "Cannot watch_io for 'on_hangup' in ".ref($self); + undef $watch->[3]; + } + + if( not $watch->[1] and not $watch->[2] and not $watch->[3] ) { + delete $self->{iowatches}->{$handle->fileno}; + } + + keys %params and croak "Unrecognised keys for ->unwatch_io - " . join( ", ", keys %params ); +} + +=head2 $loop->watch_signal( $signal, $code ) + +This method adds a new signal handler to watch the given signal. + +=over 8 + +=item $signal + +The name of the signal to watch to. This should be a bare name like C. + +=item $code + +A CODE reference to the handling callback. + +=back + +There can only be one callback per signal name. Registering a new one will +remove an existing one. + +Applications should use a C object, or call +C instead of using this method. + +This and C are optional; a subclass may implement neither, or +both. If it implements neither then signal handling will be performed by the +base class using a self-connected pipe to interrupt the main IO blocking. + +=cut + +sub watch_signal +{ + my $self = shift; + my ( $signal, $code ) = @_; + + HAVE_SIGNALS or croak "This OS cannot ->watch_signal"; + + IO::Async::OS->loop_watch_signal( $self, $signal, $code ); +} + +=head2 $loop->unwatch_signal( $signal ) + +This method removes the signal callback for the given signal. + +=over 8 + +=item $signal + +The name of the signal to watch to. This should be a bare name like C. + +=back + +=cut + +sub unwatch_signal +{ + my $self = shift; + my ( $signal ) = @_; + + HAVE_SIGNALS or croak "This OS cannot ->unwatch_signal"; + + IO::Async::OS->loop_unwatch_signal( $self, $signal ); +} + +=head2 $id = $loop->watch_time( %args ) + +This method installs a callback which will be called at the specified time. +The time may either be specified as an absolute value (the C key), or +as a delay from the time it is installed (the C key). + +The returned C<$id> value can be used to identify the timer in case it needs +to be cancelled by the C method. Note that this value may be +an object reference, so if it is stored, it should be released after it has +been fired or cancelled, so the object itself can be freed. + +The C<%params> hash takes the following keys: + +=over 8 + +=item at => NUM + +The absolute system timestamp to run the event. + +=item after => NUM + +The delay after now at which to run the event, if C is not supplied. A +zero or negative delayed timer should be executed as soon as possible; the +next time the C method is invoked. + +=item now => NUM + +The time to consider as now if calculating an absolute time based on C; +defaults to C if not specified. + +=item code => CODE + +CODE reference to the continuation to run at the allotted time. + +=back + +Either one of C or C is required. + +For more powerful timer functionality as a C (so it can +be used as a child within another Notifier), see instead the +L object and its subclasses. + +These C<*_time> methods are optional; a subclass may implement neither or both +of them. If it implements neither, then the base class will manage a queue of +timer events. This queue should be handled by the C method +implemented by the subclass, using the C<_adjust_timeout> and +C<_manage_queues> methods. + +This is the newer version of the API, replacing C. It is +unspecified how this method pair interacts with the older +C triplet. + +=cut + +sub watch_time +{ + my $self = shift; + my %args = @_; + + # Renamed args + if( exists $args{after} ) { + $args{delay} = delete $args{after}; + } + elsif( exists $args{at} ) { + $args{time} = delete $args{at}; + } + else { + croak "Expected one of 'at' or 'after'"; + } + + if( $self->{old_timer} ) { + $self->enqueue_timer( %args ); + } + else { + my $timequeue = $self->{timequeue} ||= $self->__new_feature( "IO::Async::Internals::TimeQueue" ); + + my $time = $self->_build_time( %args ); + my $code = $args{code}; + + $timequeue->enqueue( time => $time, code => $code ); + } +} + +=head2 $loop->unwatch_time( $id ) + +Removes a timer callback previously created by C. + +This is the newer version of the API, replacing C. It is +unspecified how this method pair interacts with the older +C triplet. + +=cut + +sub unwatch_time +{ + my $self = shift; + my ( $id ) = @_; + + if( $self->{old_timer} ) { + $self->cancel_timer( $id ); + } + else { + my $timequeue = $self->{timequeue} ||= $self->__new_feature( "IO::Async::Internals::TimeQueue" ); + + $timequeue->cancel( $id ); + } +} + +sub _build_time +{ + my $self = shift; + my %params = @_; + + my $time; + if( exists $params{time} ) { + $time = $params{time}; + } + elsif( exists $params{delay} ) { + my $now = exists $params{now} ? $params{now} : $self->time; + + $time = $now + $params{delay}; + } + else { + croak "Expected either 'time' or 'delay' keys"; + } + + return $time; +} + +=head2 $id = $loop->enqueue_timer( %params ) + +An older version of C. This method should not be used in new code +but is retained for legacy purposes. For simple watch/unwatch behaviour use +instead the new C method; though note it has differently-named +arguments. For requeueable timers, consider using an +L or L instead. + +=cut + +sub enqueue_timer +{ + my $self = shift; + my ( %params ) = @_; + + # Renamed args + $params{after} = delete $params{delay} if exists $params{delay}; + $params{at} = delete $params{time} if exists $params{time}; + + my $code = $params{code}; + return [ $self->watch_time( %params ), $code ]; +} + +=head2 $loop->cancel_timer( $id ) + +An older version of C. This method should not be used in new +code but is retained for legacy purposes. + +=cut + +sub cancel_timer +{ + my $self = shift; + my ( $id ) = @_; + $self->unwatch_time( $id->[0] ); +} + +=head2 $newid = $loop->requeue_timer( $id, %params ) + +Reschedule an existing timer, moving it to a new time. The old timer is +removed and will not be invoked. + +The C<%params> hash takes the same keys as C, except for the +C argument. + +The requeue operation may be implemented as a cancel + enqueue, which may +mean the ID changes. Be sure to store the returned C<$newid> value if it is +required. + +This method should not be used in new code but is retained for legacy +purposes. For requeueable, consider using an L or +L instead. + +=cut + +sub requeue_timer +{ + my $self = shift; + my ( $id, %params ) = @_; + + $self->unwatch_time( $id->[0] ); + return $self->enqueue_timer( %params, code => $id->[1] ); +} + +=head2 $id = $loop->watch_idle( %params ) + +This method installs a callback which will be called at some point in the near +future. + +The C<%params> hash takes the following keys: + +=over 8 + +=item when => STRING + +Specifies the time at which the callback will be invoked. See below. + +=item code => CODE + +CODE reference to the continuation to run at the allotted time. + +=back + +The C parameter defines the time at which the callback will later be +invoked. Must be one of the following values: + +=over 8 + +=item later + +Callback is invoked after the current round of IO events have been processed +by the loop's underlying C method. + +If a new idle watch is installed from within a C callback, the +installed one will not be invoked during this round. It will be deferred for +the next time C is called, after any IO events have been handled. + +=back + +If there are pending idle handlers, then the C method will use a +zero timeout; it will return immediately, having processed any IO events and +idle handlers. + +The returned C<$id> value can be used to identify the idle handler in case it +needs to be removed, by calling the C method. Note this value +may be a reference, so if it is stored it should be released after the +callback has been invoked or cancled, so the referrant itself can be freed. + +This and C are optional; a subclass may implement neither, or +both. If it implements neither then idle handling will be performed by the +base class, using the C<_adjust_timeout> and C<_manage_queues> methods. + +=cut + +sub watch_idle +{ + my $self = shift; + my %params = @_; + + my $code = delete $params{code}; + ref $code or croak "Expected 'code' to be a reference"; + + my $when = delete $params{when} or croak "Expected 'when'"; + + # Future-proofing for other idle modes + $when eq "later" or croak "Expected 'when' to be 'later'"; + + my $deferrals = $self->{deferrals}; + + push @$deferrals, $code; + return \$deferrals->[-1]; +} + +=head2 $loop->unwatch_idle( $id ) + +Cancels a previously-installed idle handler. + +=cut + +sub unwatch_idle +{ + my $self = shift; + my ( $id ) = @_; + + my $deferrals = $self->{deferrals}; + + my $idx; + \$deferrals->[$_] == $id and ( $idx = $_ ), last for 0 .. $#$deferrals; + + splice @$deferrals, $idx, 1, () if defined $idx; +} + +sub _reap_children +{ + my ( $childwatches ) = @_; + + while( 1 ) { + my $zid = waitpid( -1, WNOHANG ); + + # PIDs on MSWin32 can be negative + last if !defined $zid or $zid == 0 or $zid == -1; + my $status = $?; + + if( defined $childwatches->{$zid} ) { + $childwatches->{$zid}->( $zid, $status ); + delete $childwatches->{$zid}; + } + + if( defined $childwatches->{0} ) { + $childwatches->{0}->( $zid, $status ); + # Don't delete it + } + } +} + +=head2 $loop->watch_child( $pid, $code ) + +This method adds a new handler for the termination of the given child process +PID, or all child processes. + +=over 8 + +=item $pid + +The PID to watch. Will report on all child processes if this is 0. + +=item $code + +A CODE reference to the exit handler. It will be invoked as + + $code->( $pid, $? ) + +The second argument is passed the plain perl C<$?> value. + +=back + +After invocation, the handler for a PID-specific watch is automatically +removed. The all-child watch will remain until it is removed by +C. + +This and C are optional; a subclass may implement neither, or +both. If it implements neither then child watching will be performed by using +C to install a C handler, which will use C to +look for exited child processes. + +If both a PID-specific and an all-process watch are installed, there is no +ordering guarantee as to which will be called first. + +=cut + +sub watch_child +{ + my $self = shift; + my ( $pid, $code ) = @_; + + my $childwatches = $self->{childwatches}; + + croak "Already have a handler for $pid" if exists $childwatches->{$pid}; + + if( HAVE_SIGNALS and !$self->{childwatch_sigid} ) { + $self->{childwatch_sigid} = $self->attach_signal( + CHLD => sub { _reap_children( $childwatches ) } + ); + + # There's a chance the child has already exited + my $zid = waitpid( $pid, WNOHANG ); + if( defined $zid and $zid > 0 ) { + my $exitstatus = $?; + $self->later( sub { $code->( $pid, $exitstatus ) } ); + return; + } + } + + $childwatches->{$pid} = $code; +} + +=head2 $loop->unwatch_child( $pid ) + +This method removes a watch on an existing child process PID. + +=cut + +sub unwatch_child +{ + my $self = shift; + my ( $pid ) = @_; + + my $childwatches = $self->{childwatches}; + + delete $childwatches->{$pid}; + + if( HAVE_SIGNALS and !keys %$childwatches ) { + $self->detach_signal( CHLD => delete $self->{childwatch_sigid} ); + } +} + +=head1 METHODS FOR SUBCLASSES + +The following methods are provided to access internal features which are +required by specific subclasses to implement the loop functionality. The use +cases of each will be documented in the above section. + +=cut + +=head2 $loop->_adjust_timeout( \$timeout ) + +Shortens the timeout value passed in the scalar reference if it is longer in +seconds than the time until the next queued event on the timer queue. If there +are pending idle handlers, the timeout is reduced to zero. + +=cut + +sub _adjust_timeout +{ + my $self = shift; + my ( $timeref, %params ) = @_; + + $$timeref = 0, return if @{ $self->{deferrals} }; + + if( defined $self->{sigproxy} and !$params{no_sigwait} ) { + $$timeref = $MAX_SIGWAIT_TIME if !defined $$timeref or $$timeref > $MAX_SIGWAIT_TIME; + } + if( !HAVE_SIGNALS and keys %{ $self->{childwatches} } ) { + $$timeref = $MAX_CHILDWAIT_TIME if !defined $$timeref or $$timeref > $MAX_CHILDWAIT_TIME; + } + + my $timequeue = $self->{timequeue}; + return unless defined $timequeue; + + my $nexttime = $timequeue->next_time; + return unless defined $nexttime; + + my $now = exists $params{now} ? $params{now} : $self->time; + my $timer_delay = $nexttime - $now; + + if( $timer_delay < 0 ) { + $$timeref = 0; + } + elsif( !defined $$timeref or $timer_delay < $$timeref ) { + $$timeref = $timer_delay; + } +} + +=head2 $loop->_manage_queues + +Checks the timer queue for callbacks that should have been invoked by now, and +runs them all, removing them from the queue. It also invokes all of the +pending idle handlers. Any new idle handlers installed by these are not +invoked yet; they will wait for the next time this method is called. + +=cut + +sub _manage_queues +{ + my $self = shift; + + my $count = 0; + + my $timequeue = $self->{timequeue}; + $count += $timequeue->fire if $timequeue; + + my $deferrals = $self->{deferrals}; + $self->{deferrals} = []; + + foreach my $code ( @$deferrals ) { + $code->(); + $count++; + } + + my $childwatches = $self->{childwatches}; + if( !HAVE_SIGNALS and keys %$childwatches ) { + _reap_children( $childwatches ); + } + + return $count; +} + +=head1 EXTENSIONS + +An Extension is a Perl module that provides extra methods in the +C or other packages. They are intended to provide extra +functionality that easily integrates with the rest of the code. + +Certain base methods take an C parameter; an ARRAY reference +containing a list of extension names. If such a list is passed to a method, it +will immediately call a method whose name is that of the base method, prefixed +by the first extension name in the list, separated by C<_>. If the +C list contains more extension names, it will be passed the +remaining ones in another C parameter. + +For example, + + $loop->connect( + extensions => [qw( FOO BAR )], + %args + ) + +will become + + $loop->FOO_connect( + extensions => [qw( BAR )], + %args + ) + +This is provided so that extension modules, such as L can +easily be invoked indirectly, by passing extra arguments to C methods +or similar, without needing every module to be aware of the C extension. +This functionality is generic and not limited to C; other extensions may +also use it. + +The following methods take an C parameter: + + $loop->connect + $loop->listen + +If an extension C method is invoked, it will be passed a C +parameter even if one was not provided to the original C<< $loop->listen >> +call, and it will not receive any of the C event callbacks. It should +use the C parameter on the C object. + +=cut + +=head1 STALL WATCHDOG + +A well-behaved C program should spend almost all of its time +blocked on input using the underlying C instance. The stall +watchdog is an optional debugging feature to help detect CPU spinlocks and +other bugs, where control is not returned to the loop every so often. + +If the watchdog is enabled and an event handler consumes more than a given +amount of real time before returning to the event loop, it will be interrupted +by printing a stack trace and terminating the program. The watchdog is only in +effect while the loop itself is not blocking; it won't fail simply because the +loop instance is waiting for input or timers. + +It is implemented using C, so if enabled, this signal will no longer +be available to user code. (Though in any case, most uses of C and +C are better served by one of the L subclasses). + +The following environment variables control its behaviour. + +=over 4 + +=item IO_ASYNC_WATCHDOG => BOOL + +Enables the stall watchdog if set to a non-zero value. + +=item IO_ASYNC_WATCHDOG_INTERVAL => INT + +Watchdog interval, in seconds, to pass to the C call. Defaults to 10 +seconds. + +=item IO_ASYNC_WATCHDOG_SIGABRT => BOOL + +If enabled, the watchdog signal handler will raise a C, which usually +has the effect of breaking out of a running program in debuggers such as +F. If not set then the process is terminated by throwing an exception with +C. + +=back + +=cut + +=head1 AUTHOR + +Paul Evans + +=cut + +0x55AA; -- cgit v1.2.1