diff options
Diffstat (limited to 'lib/IO/Async/Internals/Connector.pm')
-rw-r--r-- | lib/IO/Async/Internals/Connector.pm | 243 |
1 files changed, 243 insertions, 0 deletions
diff --git a/lib/IO/Async/Internals/Connector.pm b/lib/IO/Async/Internals/Connector.pm new file mode 100644 index 0000000..57029ef --- /dev/null +++ b/lib/IO/Async/Internals/Connector.pm @@ -0,0 +1,243 @@ +# You may distribute under the terms of either the GNU General Public License +# or the Artistic License (the same terms as Perl itself) +# +# (C) Paul Evans, 2008-2013 -- leonerd@leonerd.org.uk + +package # hide from CPAN + IO::Async::Internals::Connector; + +use strict; +use warnings; + +our $VERSION = '0.67'; + +use Scalar::Util qw( weaken ); + +use POSIX qw( EINPROGRESS ); +use Socket qw( SOL_SOCKET SO_ERROR ); + +use Future; +use Future::Utils 0.18 qw( try_repeat_until_success ); + +use IO::Async::OS; + +use Carp; + +use constant CONNECT_EWOULDLBOCK => IO::Async::OS->HAVE_CONNECT_EWOULDBLOCK; + +# Internal constructor +sub new +{ + my $class = shift; + my ( %params ) = @_; + + my $loop = delete $params{loop} or croak "Expected a 'loop'"; + + my $self = bless {}, $class; + weaken( $self->{loop} = $loop ); + + return $self; +} + +## Utility function +sub _get_sock_err +{ + my ( $sock ) = @_; + + my $err = $sock->getsockopt( SOL_SOCKET, SO_ERROR ); + + if( defined $err ) { + # 0 means no error, but is still defined + return undef if !$err; + + $! = $err; + return $!; + } + + # It seems we can't call getsockopt to query SO_ERROR. We'll try getpeername + if( defined getpeername( $sock ) ) { + return undef; + } + + my $peername_errno = $!+0; + my $peername_errstr = "$!"; + + # Not connected so we know this ought to fail + if( read( $sock, my $buff, 1 ) ) { + # That was most unexpected. getpeername fails because we're not + # connected, yet read succeeds. + warn "getpeername fails with $peername_errno ($peername_errstr) but read is successful\n"; + warn "Please see http://rt.cpan.org/Ticket/Display.html?id=38382\n"; + + $! = $peername_errno; + return $!; + } + + return $!; +} + +sub _connect_addresses +{ + my $self = shift; + my ( $addrlist, $on_fail ) = @_; + + my $loop = $self->{loop}; + + my ( $connecterr, $binderr, $socketerr ); + + my $future = try_repeat_until_success { + my $addr = shift; + my ( $family, $socktype, $protocol, $localaddr, $peeraddr ) = + @{$addr}{qw( family socktype protocol localaddr peeraddr )}; + + my $sock = IO::Async::OS->socket( $family, $socktype, $protocol ); + + if( !$sock ) { + $socketerr = $!; + $on_fail->( "socket", $family, $socktype, $protocol, $! ) if $on_fail; + return Future->fail( 1 ); + } + + if( $localaddr and not $sock->bind( $localaddr ) ) { + $binderr = $!; + $on_fail->( "bind", $sock, $localaddr, $! ) if $on_fail; + return Future->fail( 1 ); + } + + $sock->blocking( 0 ); + + # TODO: $sock->connect returns success masking EINPROGRESS + my $ret = connect( $sock, $peeraddr ); + if( $ret ) { + # Succeeded already? Dubious, but OK. Can happen e.g. with connections to + # localhost, or UNIX sockets, or something like that. + return Future->done( $sock ); + } + elsif( $! != EINPROGRESS and !CONNECT_EWOULDLBOCK || $! != POSIX::EWOULDBLOCK ) { + $connecterr = $!; + $on_fail->( "connect", $sock, $peeraddr, $! ) if $on_fail; + return Future->fail( 1 ); + } + + # Else + my $f = $loop->new_future; + $loop->watch_io( + handle => $sock, + on_write_ready => sub { + $loop->unwatch_io( handle => $sock, on_write_ready => 1 ); + + my $err = _get_sock_err( $sock ); + + return $f->done( $sock ) if !$err; + + $connecterr = $!; + $on_fail->( "connect", $sock, $peeraddr, $err ) if $on_fail; + return $f->fail( 1 ); + }, + ); + $f->on_cancel( + sub { $loop->unwatch_io( handle => $sock, on_write_ready => 1 ); } + ); + return $f; + } foreach => $addrlist; + + return $future->else( sub { + return $future->new->fail( "connect: $connecterr", connect => connect => $connecterr ) + if $connecterr; + return $future->new->fail( "bind: $binderr", connect => bind => $binderr ) + if $binderr; + return $future->new->fail( "socket: $socketerr", connect => socket => $socketerr ) + if $socketerr; + # If it gets this far then something went wrong + die 'Oops; $loop->connect failed but no error cause was found'; + } ); +} + +sub connect +{ + my $self = shift; + my ( %params ) = @_; + + my $loop = $self->{loop}; + + my $on_fail = $params{on_fail}; + + my %gai_hints; + exists $params{$_} and $gai_hints{$_} = $params{$_} for qw( family socktype protocol flags ); + + if( exists $params{host} or exists $params{local_host} or exists $params{local_port} ) { + # We'll be making a ->getaddrinfo call + defined $gai_hints{socktype} or defined $gai_hints{protocol} or + carp "Attempting to ->connect without either 'socktype' or 'protocol' hint is not portable"; + } + + my $peeraddrfuture; + if( exists $params{host} and exists $params{service} ) { + my $host = $params{host} or croak "Expected 'host'"; + my $service = $params{service} or croak "Expected 'service'"; + + $peeraddrfuture = $loop->resolver->getaddrinfo( + host => $host, + service => $service, + %gai_hints, + ); + } + elsif( exists $params{addrs} or exists $params{addr} ) { + $peeraddrfuture = $loop->new_future->done( exists $params{addrs} ? @{ $params{addrs} } : ( $params{addr} ) ); + } + else { + croak "Expected 'host' and 'service' or 'addrs' or 'addr' arguments"; + } + + my $localaddrfuture; + if( defined $params{local_host} or defined $params{local_service} ) { + # Empty is fine on either of these + my $host = $params{local_host}; + my $service = $params{local_service}; + + $localaddrfuture = $loop->resolver->getaddrinfo( + host => $host, + service => $service, + %gai_hints, + ); + } + elsif( exists $params{local_addrs} or exists $params{local_addr} ) { + $localaddrfuture = $loop->new_future->done( exists $params{local_addrs} ? @{ $params{local_addrs} } : ( $params{local_addr} ) ); + } + else { + $localaddrfuture = $loop->new_future->done( {} ); + } + + return Future->needs_all( $peeraddrfuture, $localaddrfuture ) + ->then( sub { + my @peeraddrs = $peeraddrfuture->get; + my @localaddrs = $localaddrfuture->get; + + my @addrs; + + foreach my $local ( @localaddrs ) { + my ( $l_family, $l_socktype, $l_protocol, $l_addr ) = + IO::Async::OS->extract_addrinfo( $local, 'local_addr' ); + foreach my $peer ( @peeraddrs ) { + my ( $p_family, $p_socktype, $p_protocol, $p_addr ) = + IO::Async::OS->extract_addrinfo( $peer ); + + next if $l_family and $p_family and $l_family != $p_family; + next if $l_socktype and $p_socktype and $l_socktype != $p_socktype; + next if $l_protocol and $p_protocol and $l_protocol != $p_protocol; + + push @addrs, { + family => $l_family || $p_family, + socktype => $l_socktype || $p_socktype, + protocol => $l_protocol || $p_protocol, + localaddr => $l_addr, + peeraddr => $p_addr, + }; + } + } + + return $self->_connect_addresses( \@addrs, $on_fail ); + } ); +} + +0x55AA; |