diff options
Diffstat (limited to 'lib/go/thrift/header_transport.go')
-rw-r--r-- | lib/go/thrift/header_transport.go | 42 |
1 files changed, 24 insertions, 18 deletions
diff --git a/lib/go/thrift/header_transport.go b/lib/go/thrift/header_transport.go index f5736df42..5ec045482 100644 --- a/lib/go/thrift/header_transport.go +++ b/lib/go/thrift/header_transport.go @@ -28,7 +28,6 @@ import ( "errors" "fmt" "io" - "io/ioutil" ) // Size in bytes for 32-bit ints. @@ -253,14 +252,14 @@ type THeaderTransport struct { // Reading related variables. reader *bufio.Reader // When frame is detected, we read the frame fully into frameBuffer. - frameBuffer bytes.Buffer + frameBuffer *bytes.Buffer // When it's non-nil, Read should read from frameReader instead of // reader, and EOF error indicates end of frame instead of end of all // transport. frameReader io.ReadCloser // Writing related variables - writeBuffer bytes.Buffer + writeBuffer *bytes.Buffer writeTransforms []THeaderTransformID clientType clientType @@ -370,11 +369,14 @@ func (t *THeaderTransport) ReadFrame(ctx context.Context) error { t.reader.Discard(size32) // Read the frame fully into frameBuffer. - _, err = io.CopyN(&t.frameBuffer, t.reader, int64(frameSize)) + if t.frameBuffer == nil { + t.frameBuffer = getBufFromPool() + } + _, err = io.CopyN(t.frameBuffer, t.reader, int64(frameSize)) if err != nil { return err } - t.frameReader = ioutil.NopCloser(&t.frameBuffer) + t.frameReader = io.NopCloser(t.frameBuffer) // Peek and handle the next 32 bits. buf = t.frameBuffer.Bytes()[:size32] @@ -405,7 +407,7 @@ func (t *THeaderTransport) ReadFrame(ctx context.Context) error { // It closes frameReader, and also resets frame related states. func (t *THeaderTransport) endOfFrame() error { defer func() { - t.frameBuffer.Reset() + returnBufToPool(&t.frameBuffer) t.frameReader = nil }() return t.frameReader.Close() @@ -418,7 +420,7 @@ func (t *THeaderTransport) parseHeaders(ctx context.Context, frameSize uint32) e var err error var meta headerMeta - if err = binary.Read(&t.frameBuffer, binary.BigEndian, &meta); err != nil { + if err = binary.Read(t.frameBuffer, binary.BigEndian, &meta); err != nil { return err } frameSize -= headerMetaSize @@ -432,7 +434,7 @@ func (t *THeaderTransport) parseHeaders(ctx context.Context, frameSize uint32) e ) } headerBuf := NewTMemoryBuffer() - _, err = io.CopyN(headerBuf, &t.frameBuffer, headerLength) + _, err = io.CopyN(headerBuf, t.frameBuffer, headerLength) if err != nil { return err } @@ -454,7 +456,7 @@ func (t *THeaderTransport) parseHeaders(ctx context.Context, frameSize uint32) e } if transformCount > 0 { reader := NewTransformReaderWithCapacity( - &t.frameBuffer, + t.frameBuffer, int(transformCount), ) t.frameReader = reader @@ -569,16 +571,19 @@ func (t *THeaderTransport) Read(p []byte) (read int, err error) { // // You need to call Flush to actually write them to the transport. func (t *THeaderTransport) Write(p []byte) (int, error) { + if t.writeBuffer == nil { + t.writeBuffer = getBufFromPool() + } return t.writeBuffer.Write(p) } // Flush writes the appropriate header and the write buffer to the underlying transport. func (t *THeaderTransport) Flush(ctx context.Context) error { - if t.writeBuffer.Len() == 0 { + if t.writeBuffer == nil || t.writeBuffer.Len() == 0 { return nil } - defer t.writeBuffer.Reset() + defer returnBufToPool(&t.writeBuffer) switch t.clientType { default: @@ -628,24 +633,25 @@ func (t *THeaderTransport) Flush(ctx context.Context) error { } } - var payload bytes.Buffer + payload := getBufFromPool() + defer returnBufToPool(&payload) meta := headerMeta{ MagicFlags: THeaderHeaderMagic + t.Flags&THeaderFlagsMask, SequenceID: t.SequenceID, HeaderLength: uint16(headers.Len() / 4), } - if err := binary.Write(&payload, binary.BigEndian, meta); err != nil { + if err := binary.Write(payload, binary.BigEndian, meta); err != nil { return NewTTransportExceptionFromError(err) } - if _, err := io.Copy(&payload, headers); err != nil { + if _, err := io.Copy(payload, headers); err != nil { return NewTTransportExceptionFromError(err) } - writer, err := NewTransformWriter(&payload, t.writeTransforms) + writer, err := NewTransformWriter(payload, t.writeTransforms) if err != nil { return NewTTransportExceptionFromError(err) } - if _, err := io.Copy(writer, &t.writeBuffer); err != nil { + if _, err := io.Copy(writer, t.writeBuffer); err != nil { return NewTTransportExceptionFromError(err) } if err := writer.Close(); err != nil { @@ -659,7 +665,7 @@ func (t *THeaderTransport) Flush(ctx context.Context) error { return NewTTransportExceptionFromError(err) } // Then write the payload - if _, err := io.Copy(t.transport, &payload); err != nil { + if _, err := io.Copy(t.transport, payload); err != nil { return NewTTransportExceptionFromError(err) } @@ -671,7 +677,7 @@ func (t *THeaderTransport) Flush(ctx context.Context) error { } fallthrough case clientUnframedBinary, clientUnframedCompact: - if _, err := io.Copy(t.transport, &t.writeBuffer); err != nil { + if _, err := io.Copy(t.transport, t.writeBuffer); err != nil { return NewTTransportExceptionFromError(err) } } |