diff options
Diffstat (limited to 'src/net/fd_windows.go')
-rw-r--r-- | src/net/fd_windows.go | 644 |
1 files changed, 644 insertions, 0 deletions
diff --git a/src/net/fd_windows.go b/src/net/fd_windows.go new file mode 100644 index 000000000..6d69e0624 --- /dev/null +++ b/src/net/fd_windows.go @@ -0,0 +1,644 @@ +// Copyright 2010 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package net + +import ( + "errors" + "io" + "os" + "runtime" + "sync" + "syscall" + "time" + "unsafe" +) + +var ( + initErr error + ioSync uint64 +) + +// CancelIo Windows API cancels all outstanding IO for a particular +// socket on current thread. To overcome that limitation, we run +// special goroutine, locked to OS single thread, that both starts +// and cancels IO. It means, there are 2 unavoidable thread switches +// for every IO. +// Some newer versions of Windows has new CancelIoEx API, that does +// not have that limitation and can be used from any thread. This +// package uses CancelIoEx API, if present, otherwise it fallback +// to CancelIo. + +var ( + canCancelIO bool // determines if CancelIoEx API is present + skipSyncNotif bool + hasLoadSetFileCompletionNotificationModes bool +) + +func sysInit() { + var d syscall.WSAData + e := syscall.WSAStartup(uint32(0x202), &d) + if e != nil { + initErr = os.NewSyscallError("WSAStartup", e) + } + canCancelIO = syscall.LoadCancelIoEx() == nil + if syscall.LoadGetAddrInfo() == nil { + lookupPort = newLookupPort + lookupIP = newLookupIP + } + + hasLoadSetFileCompletionNotificationModes = syscall.LoadSetFileCompletionNotificationModes() == nil + if hasLoadSetFileCompletionNotificationModes { + // It's not safe to use FILE_SKIP_COMPLETION_PORT_ON_SUCCESS if non IFS providers are installed: + // http://support.microsoft.com/kb/2568167 + skipSyncNotif = true + protos := [2]int32{syscall.IPPROTO_TCP, 0} + var buf [32]syscall.WSAProtocolInfo + len := uint32(unsafe.Sizeof(buf)) + n, err := syscall.WSAEnumProtocols(&protos[0], &buf[0], &len) + if err != nil { + skipSyncNotif = false + } else { + for i := int32(0); i < n; i++ { + if buf[i].ServiceFlags1&syscall.XP1_IFS_HANDLES == 0 { + skipSyncNotif = false + break + } + } + } + } +} + +func closesocket(s syscall.Handle) error { + return syscall.Closesocket(s) +} + +func canUseConnectEx(net string) bool { + switch net { + case "udp", "udp4", "udp6", "ip", "ip4", "ip6": + // ConnectEx windows API does not support connectionless sockets. + return false + } + return syscall.LoadConnectEx() == nil +} + +func dial(net string, ra Addr, dialer func(time.Time) (Conn, error), deadline time.Time) (Conn, error) { + if !canUseConnectEx(net) { + // Use the relatively inefficient goroutine-racing + // implementation of DialTimeout. + return dialChannel(net, ra, dialer, deadline) + } + return dialer(deadline) +} + +// operation contains superset of data necessary to perform all async IO. +type operation struct { + // Used by IOCP interface, it must be first field + // of the struct, as our code rely on it. + o syscall.Overlapped + + // fields used by runtime.netpoll + runtimeCtx uintptr + mode int32 + errno int32 + qty uint32 + + // fields used only by net package + fd *netFD + errc chan error + buf syscall.WSABuf + sa syscall.Sockaddr + rsa *syscall.RawSockaddrAny + rsan int32 + handle syscall.Handle + flags uint32 +} + +func (o *operation) InitBuf(buf []byte) { + o.buf.Len = uint32(len(buf)) + o.buf.Buf = nil + if len(buf) != 0 { + o.buf.Buf = &buf[0] + } +} + +// ioSrv executes net IO requests. +type ioSrv struct { + req chan ioSrvReq +} + +type ioSrvReq struct { + o *operation + submit func(o *operation) error // if nil, cancel the operation +} + +// ProcessRemoteIO will execute submit IO requests on behalf +// of other goroutines, all on a single os thread, so it can +// cancel them later. Results of all operations will be sent +// back to their requesters via channel supplied in request. +// It is used only when the CancelIoEx API is unavailable. +func (s *ioSrv) ProcessRemoteIO() { + runtime.LockOSThread() + defer runtime.UnlockOSThread() + for r := range s.req { + if r.submit != nil { + r.o.errc <- r.submit(r.o) + } else { + r.o.errc <- syscall.CancelIo(r.o.fd.sysfd) + } + } +} + +// ExecIO executes a single IO operation o. It submits and cancels +// IO in the current thread for systems where Windows CancelIoEx API +// is available. Alternatively, it passes the request onto +// runtime netpoll and waits for completion or cancels request. +func (s *ioSrv) ExecIO(o *operation, name string, submit func(o *operation) error) (int, error) { + fd := o.fd + // Notify runtime netpoll about starting IO. + err := fd.pd.Prepare(int(o.mode)) + if err != nil { + return 0, &OpError{name, fd.net, fd.laddr, err} + } + // Start IO. + if canCancelIO { + err = submit(o) + } else { + // Send request to a special dedicated thread, + // so it can stop the IO with CancelIO later. + s.req <- ioSrvReq{o, submit} + err = <-o.errc + } + switch err { + case nil: + // IO completed immediately + if o.fd.skipSyncNotif { + // No completion message will follow, so return immediately. + return int(o.qty), nil + } + // Need to get our completion message anyway. + case syscall.ERROR_IO_PENDING: + // IO started, and we have to wait for its completion. + err = nil + default: + return 0, &OpError{name, fd.net, fd.laddr, err} + } + // Wait for our request to complete. + err = fd.pd.Wait(int(o.mode)) + if err == nil { + // All is good. Extract our IO results and return. + if o.errno != 0 { + err = syscall.Errno(o.errno) + return 0, &OpError{name, fd.net, fd.laddr, err} + } + return int(o.qty), nil + } + // IO is interrupted by "close" or "timeout" + netpollErr := err + switch netpollErr { + case errClosing, errTimeout: + // will deal with those. + default: + panic("net: unexpected runtime.netpoll error: " + netpollErr.Error()) + } + // Cancel our request. + if canCancelIO { + err := syscall.CancelIoEx(fd.sysfd, &o.o) + // Assuming ERROR_NOT_FOUND is returned, if IO is completed. + if err != nil && err != syscall.ERROR_NOT_FOUND { + // TODO(brainman): maybe do something else, but panic. + panic(err) + } + } else { + s.req <- ioSrvReq{o, nil} + <-o.errc + } + // Wait for cancellation to complete. + fd.pd.WaitCanceled(int(o.mode)) + if o.errno != 0 { + err = syscall.Errno(o.errno) + if err == syscall.ERROR_OPERATION_ABORTED { // IO Canceled + err = netpollErr + } + return 0, &OpError{name, fd.net, fd.laddr, err} + } + // We issued cancellation request. But, it seems, IO operation succeeded + // before cancellation request run. We need to treat IO operation as + // succeeded (the bytes are actually sent/recv from network). + return int(o.qty), nil +} + +// Start helper goroutines. +var rsrv, wsrv *ioSrv +var onceStartServer sync.Once + +func startServer() { + rsrv = new(ioSrv) + wsrv = new(ioSrv) + if !canCancelIO { + // Only CancelIo API is available. Lets start two special goroutines + // locked to an OS thread, that both starts and cancels IO. One will + // process read requests, while other will do writes. + rsrv.req = make(chan ioSrvReq) + go rsrv.ProcessRemoteIO() + wsrv.req = make(chan ioSrvReq) + go wsrv.ProcessRemoteIO() + } +} + +// Network file descriptor. +type netFD struct { + // locking/lifetime of sysfd + serialize access to Read and Write methods + fdmu fdMutex + + // immutable until Close + sysfd syscall.Handle + family int + sotype int + isConnected bool + skipSyncNotif bool + net string + laddr Addr + raddr Addr + + rop operation // read operation + wop operation // write operation + + // wait server + pd pollDesc +} + +func newFD(sysfd syscall.Handle, family, sotype int, net string) (*netFD, error) { + if initErr != nil { + return nil, initErr + } + onceStartServer.Do(startServer) + return &netFD{sysfd: sysfd, family: family, sotype: sotype, net: net}, nil +} + +func (fd *netFD) init() error { + if err := fd.pd.Init(fd); err != nil { + return err + } + if hasLoadSetFileCompletionNotificationModes { + // We do not use events, so we can skip them always. + flags := uint8(syscall.FILE_SKIP_SET_EVENT_ON_HANDLE) + // It's not safe to skip completion notifications for UDP: + // http://blogs.technet.com/b/winserverperformance/archive/2008/06/26/designing-applications-for-high-performance-part-iii.aspx + if skipSyncNotif && fd.net == "tcp" { + flags |= syscall.FILE_SKIP_COMPLETION_PORT_ON_SUCCESS + } + err := syscall.SetFileCompletionNotificationModes(fd.sysfd, flags) + if err == nil && flags&syscall.FILE_SKIP_COMPLETION_PORT_ON_SUCCESS != 0 { + fd.skipSyncNotif = true + } + } + fd.rop.mode = 'r' + fd.wop.mode = 'w' + fd.rop.fd = fd + fd.wop.fd = fd + fd.rop.runtimeCtx = fd.pd.runtimeCtx + fd.wop.runtimeCtx = fd.pd.runtimeCtx + if !canCancelIO { + fd.rop.errc = make(chan error) + fd.wop.errc = make(chan error) + } + return nil +} + +func (fd *netFD) setAddr(laddr, raddr Addr) { + fd.laddr = laddr + fd.raddr = raddr + runtime.SetFinalizer(fd, (*netFD).Close) +} + +func (fd *netFD) connect(la, ra syscall.Sockaddr, deadline time.Time) error { + // Do not need to call fd.writeLock here, + // because fd is not yet accessible to user, + // so no concurrent operations are possible. + if err := fd.init(); err != nil { + return err + } + if !deadline.IsZero() { + fd.setWriteDeadline(deadline) + defer fd.setWriteDeadline(noDeadline) + } + if !canUseConnectEx(fd.net) { + return syscall.Connect(fd.sysfd, ra) + } + // ConnectEx windows API requires an unconnected, previously bound socket. + if la == nil { + switch ra.(type) { + case *syscall.SockaddrInet4: + la = &syscall.SockaddrInet4{} + case *syscall.SockaddrInet6: + la = &syscall.SockaddrInet6{} + default: + panic("unexpected type in connect") + } + if err := syscall.Bind(fd.sysfd, la); err != nil { + return err + } + } + // Call ConnectEx API. + o := &fd.wop + o.sa = ra + _, err := wsrv.ExecIO(o, "ConnectEx", func(o *operation) error { + return syscall.ConnectEx(o.fd.sysfd, o.sa, nil, 0, nil, &o.o) + }) + if err != nil { + return err + } + // Refresh socket properties. + return syscall.Setsockopt(fd.sysfd, syscall.SOL_SOCKET, syscall.SO_UPDATE_CONNECT_CONTEXT, (*byte)(unsafe.Pointer(&fd.sysfd)), int32(unsafe.Sizeof(fd.sysfd))) +} + +func (fd *netFD) destroy() { + if fd.sysfd == syscall.InvalidHandle { + return + } + // Poller may want to unregister fd in readiness notification mechanism, + // so this must be executed before closesocket. + fd.pd.Close() + closesocket(fd.sysfd) + fd.sysfd = syscall.InvalidHandle + // no need for a finalizer anymore + runtime.SetFinalizer(fd, nil) +} + +// Add a reference to this fd. +// Returns an error if the fd cannot be used. +func (fd *netFD) incref() error { + if !fd.fdmu.Incref() { + return errClosing + } + return nil +} + +// Remove a reference to this FD and close if we've been asked to do so +// (and there are no references left). +func (fd *netFD) decref() { + if fd.fdmu.Decref() { + fd.destroy() + } +} + +// Add a reference to this fd and lock for reading. +// Returns an error if the fd cannot be used. +func (fd *netFD) readLock() error { + if !fd.fdmu.RWLock(true) { + return errClosing + } + return nil +} + +// Unlock for reading and remove a reference to this FD. +func (fd *netFD) readUnlock() { + if fd.fdmu.RWUnlock(true) { + fd.destroy() + } +} + +// Add a reference to this fd and lock for writing. +// Returns an error if the fd cannot be used. +func (fd *netFD) writeLock() error { + if !fd.fdmu.RWLock(false) { + return errClosing + } + return nil +} + +// Unlock for writing and remove a reference to this FD. +func (fd *netFD) writeUnlock() { + if fd.fdmu.RWUnlock(false) { + fd.destroy() + } +} + +func (fd *netFD) Close() error { + if !fd.fdmu.IncrefAndClose() { + return errClosing + } + // unblock pending reader and writer + fd.pd.Evict() + fd.decref() + return nil +} + +func (fd *netFD) shutdown(how int) error { + if err := fd.incref(); err != nil { + return err + } + defer fd.decref() + err := syscall.Shutdown(fd.sysfd, how) + if err != nil { + return &OpError{"shutdown", fd.net, fd.laddr, err} + } + return nil +} + +func (fd *netFD) closeRead() error { + return fd.shutdown(syscall.SHUT_RD) +} + +func (fd *netFD) closeWrite() error { + return fd.shutdown(syscall.SHUT_WR) +} + +func (fd *netFD) Read(buf []byte) (int, error) { + if err := fd.readLock(); err != nil { + return 0, err + } + defer fd.readUnlock() + o := &fd.rop + o.InitBuf(buf) + n, err := rsrv.ExecIO(o, "WSARecv", func(o *operation) error { + return syscall.WSARecv(o.fd.sysfd, &o.buf, 1, &o.qty, &o.flags, &o.o, nil) + }) + if err == nil && n == 0 { + err = io.EOF + } + if raceenabled { + raceAcquire(unsafe.Pointer(&ioSync)) + } + return n, err +} + +func (fd *netFD) readFrom(buf []byte) (n int, sa syscall.Sockaddr, err error) { + if len(buf) == 0 { + return 0, nil, nil + } + if err := fd.readLock(); err != nil { + return 0, nil, err + } + defer fd.readUnlock() + o := &fd.rop + o.InitBuf(buf) + n, err = rsrv.ExecIO(o, "WSARecvFrom", func(o *operation) error { + if o.rsa == nil { + o.rsa = new(syscall.RawSockaddrAny) + } + o.rsan = int32(unsafe.Sizeof(*o.rsa)) + return syscall.WSARecvFrom(o.fd.sysfd, &o.buf, 1, &o.qty, &o.flags, o.rsa, &o.rsan, &o.o, nil) + }) + if err != nil { + return 0, nil, err + } + sa, _ = o.rsa.Sockaddr() + return +} + +func (fd *netFD) Write(buf []byte) (int, error) { + if err := fd.writeLock(); err != nil { + return 0, err + } + defer fd.writeUnlock() + if raceenabled { + raceReleaseMerge(unsafe.Pointer(&ioSync)) + } + o := &fd.wop + o.InitBuf(buf) + return wsrv.ExecIO(o, "WSASend", func(o *operation) error { + return syscall.WSASend(o.fd.sysfd, &o.buf, 1, &o.qty, 0, &o.o, nil) + }) +} + +func (fd *netFD) writeTo(buf []byte, sa syscall.Sockaddr) (int, error) { + if len(buf) == 0 { + return 0, nil + } + if err := fd.writeLock(); err != nil { + return 0, err + } + defer fd.writeUnlock() + o := &fd.wop + o.InitBuf(buf) + o.sa = sa + return wsrv.ExecIO(o, "WSASendto", func(o *operation) error { + return syscall.WSASendto(o.fd.sysfd, &o.buf, 1, &o.qty, 0, o.sa, &o.o, nil) + }) +} + +func (fd *netFD) acceptOne(rawsa []syscall.RawSockaddrAny, o *operation) (*netFD, error) { + // Get new socket. + s, err := sysSocket(fd.family, fd.sotype, 0) + if err != nil { + return nil, &OpError{"socket", fd.net, fd.laddr, err} + } + + // Associate our new socket with IOCP. + netfd, err := newFD(s, fd.family, fd.sotype, fd.net) + if err != nil { + closesocket(s) + return nil, &OpError{"accept", fd.net, fd.laddr, err} + } + if err := netfd.init(); err != nil { + fd.Close() + return nil, err + } + + // Submit accept request. + o.handle = s + o.rsan = int32(unsafe.Sizeof(rawsa[0])) + _, err = rsrv.ExecIO(o, "AcceptEx", func(o *operation) error { + return syscall.AcceptEx(o.fd.sysfd, o.handle, (*byte)(unsafe.Pointer(&rawsa[0])), 0, uint32(o.rsan), uint32(o.rsan), &o.qty, &o.o) + }) + if err != nil { + netfd.Close() + return nil, err + } + + // Inherit properties of the listening socket. + err = syscall.Setsockopt(s, syscall.SOL_SOCKET, syscall.SO_UPDATE_ACCEPT_CONTEXT, (*byte)(unsafe.Pointer(&fd.sysfd)), int32(unsafe.Sizeof(fd.sysfd))) + if err != nil { + netfd.Close() + return nil, &OpError{"Setsockopt", fd.net, fd.laddr, err} + } + + return netfd, nil +} + +func (fd *netFD) accept() (*netFD, error) { + if err := fd.readLock(); err != nil { + return nil, err + } + defer fd.readUnlock() + + o := &fd.rop + var netfd *netFD + var err error + var rawsa [2]syscall.RawSockaddrAny + for { + netfd, err = fd.acceptOne(rawsa[:], o) + if err == nil { + break + } + // Sometimes we see WSAECONNRESET and ERROR_NETNAME_DELETED is + // returned here. These happen if connection reset is received + // before AcceptEx could complete. These errors relate to new + // connection, not to AcceptEx, so ignore broken connection and + // try AcceptEx again for more connections. + operr, ok := err.(*OpError) + if !ok { + return nil, err + } + errno, ok := operr.Err.(syscall.Errno) + if !ok { + return nil, err + } + switch errno { + case syscall.ERROR_NETNAME_DELETED, syscall.WSAECONNRESET: + // ignore these and try again + default: + return nil, err + } + } + + // Get local and peer addr out of AcceptEx buffer. + var lrsa, rrsa *syscall.RawSockaddrAny + var llen, rlen int32 + syscall.GetAcceptExSockaddrs((*byte)(unsafe.Pointer(&rawsa[0])), + 0, uint32(o.rsan), uint32(o.rsan), &lrsa, &llen, &rrsa, &rlen) + lsa, _ := lrsa.Sockaddr() + rsa, _ := rrsa.Sockaddr() + + netfd.setAddr(netfd.addrFunc()(lsa), netfd.addrFunc()(rsa)) + return netfd, nil +} + +func skipRawSocketTests() (skip bool, skipmsg string, err error) { + // From http://msdn.microsoft.com/en-us/library/windows/desktop/ms740548.aspx: + // Note: To use a socket of type SOCK_RAW requires administrative privileges. + // Users running Winsock applications that use raw sockets must be a member of + // the Administrators group on the local computer, otherwise raw socket calls + // will fail with an error code of WSAEACCES. On Windows Vista and later, access + // for raw sockets is enforced at socket creation. In earlier versions of Windows, + // access for raw sockets is enforced during other socket operations. + s, err := syscall.Socket(syscall.AF_INET, syscall.SOCK_RAW, 0) + if err == syscall.WSAEACCES { + return true, "skipping test; no access to raw socket allowed", nil + } + if err != nil { + return true, "", err + } + defer syscall.Closesocket(s) + return false, "", nil +} + +// Unimplemented functions. + +func (fd *netFD) dup() (*os.File, error) { + // TODO: Implement this + return nil, os.NewSyscallError("dup", syscall.EWINDOWS) +} + +var errNoSupport = errors.New("address family not supported") + +func (fd *netFD) readMsg(p []byte, oob []byte) (n, oobn, flags int, sa syscall.Sockaddr, err error) { + return 0, 0, 0, nil, errNoSupport +} + +func (fd *netFD) writeMsg(p []byte, oob []byte, sa syscall.Sockaddr) (n int, oobn int, err error) { + return 0, 0, errNoSupport +} |