From 0c33cee0c849020522fc309f396b9ccbd658ccfe Mon Sep 17 00:00:00 2001 From: Peter Mattis Date: Tue, 9 May 2017 18:47:31 -0400 Subject: [PATCH] rpc: use sync.Pools for snappy readers and writers Creating a snappy.{Reader,Writer} on every compression/decompression is fairly expensive. This reduced the performance hit for enabling compression in a local 3-node cluster from 38% to 3%. See #14721 --- pkg/rpc/snappy.go | 32 +++++++++++++++++++++++++++----- 1 file changed, 27 insertions(+), 5 deletions(-) diff --git a/pkg/rpc/snappy.go b/pkg/rpc/snappy.go index d0b571d5ad6c..4169f0ff687c 100644 --- a/pkg/rpc/snappy.go +++ b/pkg/rpc/snappy.go @@ -17,19 +17,33 @@ package rpc import ( "io" "io/ioutil" + "sync" "github.com/golang/snappy" ) +// NB: The grpc.{Compressor,Decompressor} implementations need to be goroutine +// safe as multiple goroutines may be using the same compressor/decompressor +// for different streams on the same connection. +var snappyWriterPool sync.Pool +var snappyReaderPool sync.Pool + type snappyCompressor struct { } func (snappyCompressor) Do(w io.Writer, p []byte) error { - z := snappy.NewBufferedWriter(w) - if _, err := z.Write(p); err != nil { - return err + z, ok := snappyWriterPool.Get().(*snappy.Writer) + if !ok { + z = snappy.NewBufferedWriter(w) + } else { + z.Reset(w) + } + _, err := z.Write(p) + if err == nil { + err = z.Flush() } - return z.Close() + snappyWriterPool.Put(z) + return err } func (snappyCompressor) Type() string { @@ -40,7 +54,15 @@ type snappyDecompressor struct { } func (snappyDecompressor) Do(r io.Reader) ([]byte, error) { - return ioutil.ReadAll(snappy.NewReader(r)) + z, ok := snappyReaderPool.Get().(*snappy.Reader) + if !ok { + z = snappy.NewReader(r) + } else { + z.Reset(r) + } + b, err := ioutil.ReadAll(z) + snappyReaderPool.Put(z) + return b, err } func (snappyDecompressor) Type() string {