summaryrefslogtreecommitdiff
path: root/src/io
diff options
context:
space:
mode:
authorJoe Tsai <joetsai@digital-static.net>2017-09-21 17:43:00 -0700
committerJoe Tsai <thebrokentoaster@gmail.com>2017-10-11 20:00:54 +0000
commit371eda45582aea165131cc204e92bdbce3c95097 (patch)
tree598a8f58ac162d3779886bc1c8253d56024cd1f5 /src/io
parente2dd8ca946be884bb877e074a21727f1a685a706 (diff)
downloadgo-git-371eda45582aea165131cc204e92bdbce3c95097.tar.gz
io: simplify pipe implementation
In the distant past, Pipe was implemented with channels and a long running pipe.run goroutine (see CL 994043). This approach of having all communication serialized through the run method was error prone giving Pipe a history of deadlocks and race conditions. After the introduction of sync.Cond, the implementation was rewritten (see CL 4252057) to use condition variables and avoid the long running pipe.run goroutine. While this implementation is superior to the previous one, this implementation is strange in that the p.data field is always set immediately prior to signaling the other goroutine with Cond.Signal, effectively making the combination of the two a channel-like operation. Inferior to a channel, however, this still requires explicit locking around the p.data field. The data+rwait can be effectively be replaced by a "chan []byte" to inform a reader that there is data available. The data+wwait can be effectively be replaced by a "chan int" to inform a writer of how many bytes were read. This implementation is a simplified from net.Pipe in CL 37402. Change-Id: Ia5b26320b0525934fd87a3b69a091c787167f5aa Reviewed-on: https://go-review.googlesource.com/65330 Run-TryBot: Joe Tsai <thebrokentoaster@gmail.com> TryBot-Result: Gobot Gobot <gobot@golang.org> Reviewed-by: Bryan Mills <bcmills@google.com>
Diffstat (limited to 'src/io')
-rw-r--r--src/io/pipe.go165
-rw-r--r--src/io/pipe_test.go84
2 files changed, 158 insertions, 91 deletions
diff --git a/src/io/pipe.go b/src/io/pipe.go
index b6e7755f64..544481e1b9 100644
--- a/src/io/pipe.go
+++ b/src/io/pipe.go
@@ -10,6 +10,7 @@ package io
import (
"errors"
"sync"
+ "sync/atomic"
)
// ErrClosedPipe is the error used for read or write operations on a closed pipe.
@@ -17,103 +18,87 @@ var ErrClosedPipe = errors.New("io: read/write on closed pipe")
// A pipe is the shared pipe structure underlying PipeReader and PipeWriter.
type pipe struct {
- rl sync.Mutex // gates readers one at a time
- wl sync.Mutex // gates writers one at a time
- l sync.Mutex // protects remaining fields
- data []byte // data remaining in pending write
- rwait sync.Cond // waiting reader
- wwait sync.Cond // waiting writer
- rerr error // if reader closed, error to give writes
- werr error // if writer closed, error to give reads
-}
-
-func (p *pipe) read(b []byte) (n int, err error) {
- // One reader at a time.
- p.rl.Lock()
- defer p.rl.Unlock()
-
- p.l.Lock()
- defer p.l.Unlock()
- for {
- if p.rerr != nil {
- return 0, ErrClosedPipe
- }
- if p.data != nil {
- break
- }
- if p.werr != nil {
- return 0, p.werr
- }
- p.rwait.Wait()
- }
- n = copy(b, p.data)
- p.data = p.data[n:]
- if len(p.data) == 0 {
- p.data = nil
- p.wwait.Signal()
- }
- return
+ wrMu sync.Mutex // Serializes Write operations
+ wrCh chan []byte
+ rdCh chan int
+
+ once sync.Once // Protects closing done
+ done chan struct{}
+ rerr atomic.Value
+ werr atomic.Value
}
-var zero [0]byte
+func (p *pipe) Read(b []byte) (n int, err error) {
+ select {
+ case <-p.done:
+ return 0, p.readCloseError()
+ default:
+ }
-func (p *pipe) write(b []byte) (n int, err error) {
- // pipe uses nil to mean not available
- if b == nil {
- b = zero[:]
+ select {
+ case bw := <-p.wrCh:
+ nr := copy(b, bw)
+ p.rdCh <- nr
+ return nr, nil
+ case <-p.done:
+ return 0, p.readCloseError()
}
+}
- // One writer at a time.
- p.wl.Lock()
- defer p.wl.Unlock()
+func (p *pipe) readCloseError() error {
+ _, rok := p.rerr.Load().(error)
+ if werr, wok := p.werr.Load().(error); !rok && wok {
+ return werr
+ }
+ return ErrClosedPipe
+}
- p.l.Lock()
- defer p.l.Unlock()
- if p.werr != nil {
+func (p *pipe) CloseRead(err error) error {
+ if err == nil {
err = ErrClosedPipe
- return
}
- p.data = b
- p.rwait.Signal()
- for {
- if p.data == nil {
- break
- }
- if p.rerr != nil {
- err = p.rerr
- break
- }
- if p.werr != nil {
- err = ErrClosedPipe
- break
+ p.rerr.Store(err)
+ p.once.Do(func() { close(p.done) })
+ return nil
+}
+
+func (p *pipe) Write(b []byte) (n int, err error) {
+ select {
+ case <-p.done:
+ return 0, p.writeCloseError()
+ default:
+ p.wrMu.Lock()
+ defer p.wrMu.Unlock()
+ }
+
+ for once := true; once || len(b) > 0; once = false {
+ select {
+ case p.wrCh <- b:
+ nw := <-p.rdCh
+ b = b[nw:]
+ n += nw
+ case <-p.done:
+ return n, p.writeCloseError()
}
- p.wwait.Wait()
}
- n = len(b) - len(p.data)
- p.data = nil // in case of rerr or werr
- return
+ return n, nil
}
-func (p *pipe) rclose(err error) {
- if err == nil {
- err = ErrClosedPipe
+func (p *pipe) writeCloseError() error {
+ _, wok := p.werr.Load().(error)
+ if rerr, rok := p.rerr.Load().(error); !wok && rok {
+ return rerr
}
- p.l.Lock()
- defer p.l.Unlock()
- p.rerr = err
- p.rwait.Signal()
- p.wwait.Signal()
+ return ErrClosedPipe
}
-func (p *pipe) wclose(err error) {
+func (p *pipe) CloseWrite(err error) error {
if err == nil {
err = EOF
}
- p.l.Lock()
- defer p.l.Unlock()
- p.werr = err
- p.rwait.Signal()
- p.wwait.Signal()
+ p.werr.Store(err)
+ p.once.Do(func() { close(p.done) })
+ return nil
}
// A PipeReader is the read half of a pipe.
@@ -127,7 +112,7 @@ type PipeReader struct {
// If the write end is closed with an error, that error is
// returned as err; otherwise err is EOF.
func (r *PipeReader) Read(data []byte) (n int, err error) {
- return r.p.read(data)
+ return r.p.Read(data)
}
// Close closes the reader; subsequent writes to the
@@ -139,8 +124,7 @@ func (r *PipeReader) Close() error {
// CloseWithError closes the reader; subsequent writes
// to the write half of the pipe will return the error err.
func (r *PipeReader) CloseWithError(err error) error {
- r.p.rclose(err)
- return nil
+ return r.p.CloseRead(err)
}
// A PipeWriter is the write half of a pipe.
@@ -154,7 +138,7 @@ type PipeWriter struct {
// If the read end is closed with an error, that err is
// returned as err; otherwise err is ErrClosedPipe.
func (w *PipeWriter) Write(data []byte) (n int, err error) {
- return w.p.write(data)
+ return w.p.Write(data)
}
// Close closes the writer; subsequent reads from the
@@ -169,8 +153,7 @@ func (w *PipeWriter) Close() error {
//
// CloseWithError always returns nil.
func (w *PipeWriter) CloseWithError(err error) error {
- w.p.wclose(err)
- return nil
+ return w.p.CloseWrite(err)
}
// Pipe creates a synchronous in-memory pipe.
@@ -189,10 +172,10 @@ func (w *PipeWriter) CloseWithError(err error) error {
// Parallel calls to Read and parallel calls to Write are also safe:
// the individual calls will be gated sequentially.
func Pipe() (*PipeReader, *PipeWriter) {
- p := new(pipe)
- p.rwait.L = &p.l
- p.wwait.L = &p.l
- r := &PipeReader{p}
- w := &PipeWriter{p}
- return r, w
+ p := &pipe{
+ wrCh: make(chan []byte),
+ rdCh: make(chan int),
+ done: make(chan struct{}),
+ }
+ return &PipeReader{p}, &PipeWriter{p}
}
diff --git a/src/io/pipe_test.go b/src/io/pipe_test.go
index 95930e86a4..2bf95f03e3 100644
--- a/src/io/pipe_test.go
+++ b/src/io/pipe_test.go
@@ -5,8 +5,11 @@
package io_test
import (
+ "bytes"
"fmt"
. "io"
+ "sort"
+ "strings"
"testing"
"time"
)
@@ -312,3 +315,84 @@ func TestWriteAfterWriterClose(t *testing.T) {
t.Errorf("got: %q; want: %q", writeErr, ErrClosedPipe)
}
}
+
+func TestPipeConcurrent(t *testing.T) {
+ const (
+ input = "0123456789abcdef"
+ count = 8
+ readSize = 2
+ )
+
+ t.Run("Write", func(t *testing.T) {
+ r, w := Pipe()
+
+ for i := 0; i < count; i++ {
+ go func() {
+ time.Sleep(time.Millisecond) // Increase probability of race
+ if n, err := w.Write([]byte(input)); n != len(input) || err != nil {
+ t.Errorf("Write() = (%d, %v); want (%d, nil)", n, err, len(input))
+ }
+ }()
+ }
+
+ buf := make([]byte, count*len(input))
+ for i := 0; i < len(buf); i += readSize {
+ if n, err := r.Read(buf[i : i+readSize]); n != readSize || err != nil {
+ t.Errorf("Read() = (%d, %v); want (%d, nil)", n, err, readSize)
+ }
+ }
+
+ // Since each Write is fully gated, if multiple Read calls were needed,
+ // the contents of Write should still appear together in the output.
+ got := string(buf)
+ want := strings.Repeat(input, count)
+ if got != want {
+ t.Errorf("got: %q; want: %q", got, want)
+ }
+ })
+
+ t.Run("Read", func(t *testing.T) {
+ r, w := Pipe()
+
+ c := make(chan []byte, count*len(input)/readSize)
+ for i := 0; i < cap(c); i++ {
+ go func() {
+ time.Sleep(time.Millisecond) // Increase probability of race
+ buf := make([]byte, readSize)
+ if n, err := r.Read(buf); n != readSize || err != nil {
+ t.Errorf("Read() = (%d, %v); want (%d, nil)", n, err, readSize)
+ }
+ c <- buf
+ }()
+ }
+
+ for i := 0; i < count; i++ {
+ if n, err := w.Write([]byte(input)); n != len(input) || err != nil {
+ t.Errorf("Write() = (%d, %v); want (%d, nil)", n, err, len(input))
+ }
+ }
+
+ // Since each read is independent, the only guarantee about the output
+ // is that it is a permutation of the input in readSized groups.
+ got := make([]byte, 0, count*len(input))
+ for i := 0; i < cap(c); i++ {
+ got = append(got, (<-c)...)
+ }
+ got = sortBytesInGroups(got, readSize)
+ want := bytes.Repeat([]byte(input), count)
+ want = sortBytesInGroups(want, readSize)
+ if string(got) != string(want) {
+ t.Errorf("got: %q; want: %q", got, want)
+ }
+ })
+}
+
+func sortBytesInGroups(b []byte, n int) []byte {
+ var groups [][]byte
+ for len(b) > 0 {
+ groups = append(groups, b[:n])
+ b = b[n:]
+ }
+ sort.Slice(groups, func(i, j int) bool { return bytes.Compare(groups[i], groups[j]) < 0 })
+ return bytes.Join(groups, nil)
+}