diff --git a/pkg/rpc/snappy.go b/pkg/rpc/snappy.go index d0b571d5ad6c..4169f0ff687c 100644 --- a/pkg/rpc/snappy.go +++ b/pkg/rpc/snappy.go @@ -17,19 +17,33 @@ package rpc import ( "io" "io/ioutil" + "sync" "github.com/golang/snappy" ) +// NB: The grpc.{Compressor,Decompressor} implementations need to be goroutine +// safe as multiple goroutines may be using the same compressor/decompressor +// for different streams on the same connection. +var snappyWriterPool sync.Pool +var snappyReaderPool sync.Pool + type snappyCompressor struct { } func (snappyCompressor) Do(w io.Writer, p []byte) error { - z := snappy.NewBufferedWriter(w) - if _, err := z.Write(p); err != nil { - return err + z, ok := snappyWriterPool.Get().(*snappy.Writer) + if !ok { + z = snappy.NewBufferedWriter(w) + } else { + z.Reset(w) + } + _, err := z.Write(p) + if err == nil { + err = z.Flush() } - return z.Close() + snappyWriterPool.Put(z) + return err } func (snappyCompressor) Type() string { @@ -40,7 +54,15 @@ type snappyDecompressor struct { } func (snappyDecompressor) Do(r io.Reader) ([]byte, error) { - return ioutil.ReadAll(snappy.NewReader(r)) + z, ok := snappyReaderPool.Get().(*snappy.Reader) + if !ok { + z = snappy.NewReader(r) + } else { + z.Reset(r) + } + b, err := ioutil.ReadAll(z) + snappyReaderPool.Put(z) + return b, err } func (snappyDecompressor) Type() string {