Merge #138368
138368: all: host a DRPC server and use it for node-node BatchRequests r=cthumuluru-crdb a=cthumuluru-crdb

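drpc (github.com/storj/drpc) is a lightweight, drop-in replacement for gRPC. Initial benchmark results from the Perf Efficiency team, using a simple ping service (github.com/cockroachlabs/perf-efficiency-team/tree/main/rpctoy), demonstrate that switching from gRPC to drpc delivers significant performance improvements. In the sysbench comparison below, time per operation is statistically unchanged, while bytes allocated and allocation counts per operation drop by about 11% and 32%, respectively.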

```
$ benchdiff --old lastmerge ./pkg/sql/tests -b -r 'Sysbench/SQL/3node/oltp_read_write' -d 1000x -c 10

name                                   old time/op    new time/op    delta
Sysbench/SQL/3node/oltp_read_write-24    14.0ms ± 2%    14.1ms ± 1%     ~     (p=0.063 n=10+10)

name                                   old alloc/op   new alloc/op   delta
Sysbench/SQL/3node/oltp_read_write-24    2.55MB ± 1%    2.26MB ± 2%  -11.39%  (p=0.000 n=9+9)

name                                   old allocs/op  new allocs/op  delta
Sysbench/SQL/3node/oltp_read_write-24     17.2k ± 1%     11.7k ± 0%  -32.30%  (p=0.000 n=10+8)
```

This pull request introduces experimental support for using drpc to serve BatchRequests. The feature is disabled by default; it can be enabled explicitly by setting the `COCKROACH_EXPERIMENTAL_DRPC_ENABLED` environment variable, and it is enabled automatically in CRDB test builds. The prototype lacks key features, such as authorization checks and interceptors, and is not production-ready.
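
A minimal sketch of such a gate, for illustration only. The environment variable name comes from this description; `drpcEnabled`, `newBatchServer`, `batchServer`, and the `testBuild` parameter are hypothetical names, and the precedence shown (a test build or a true-valued variable enables the feature) is an assumption rather than the code in this PR.

```go
package main

import (
	"fmt"
	"os"
	"strconv"
)

// envName is the environment variable named in the PR description.
const envName = "COCKROACH_EXPERIMENTAL_DRPC_ENABLED"

// batchServer is a hypothetical stand-in for whatever serves BatchRequests.
type batchServer interface {
	Name() string
}

type realServer struct{}

func (realServer) Name() string { return "drpc" }

// noopServer models a stand-in that is injected when the feature is off.
type noopServer struct{}

func (noopServer) Name() string { return "noop" }

// drpcEnabled reports whether the experimental path should be used.
// lookup is injected so the function can be exercised without touching
// the process environment. Assumption: a test build always enables the
// feature; otherwise the variable must parse as a true boolean.
func drpcEnabled(lookup func(string) (string, bool), testBuild bool) bool {
	if testBuild {
		return true
	}
	v, ok := lookup(envName)
	if !ok {
		return false
	}
	b, err := strconv.ParseBool(v)
	return err == nil && b
}

// newBatchServer picks the real or the no-op implementation.
func newBatchServer(enabled bool) batchServer {
	if enabled {
		return realServer{}
	}
	return noopServer{}
}

func main() {
	enabled := drpcEnabled(os.LookupEnv, false)
	fmt.Println(newBatchServer(enabled).Name())
}
```

Passing the lookup function in, rather than calling `os.LookupEnv` directly, keeps the gate testable without mutating the environment.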

When DRPC is disabled, mock implementations of the server, muxer, and listener are injected; when it is enabled, the real implementations are used. DRPC is used only for BatchRequests. This PR also adds support for pooled streaming unary operations, similar to #136648.
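
The idea behind pooled streaming unary operations can be sketched as follows: each unary call borrows an idle stream, performs one send and one receive on it, and returns the stream to the pool on success. This is an illustration with toy types; `stream`, `streamPool`, and `echoStream` are hypothetical and do not reflect the types used in this PR or in drpc.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// stream is a hypothetical bidirectional stream that carries one
// request/response exchange at a time.
type stream interface {
	Send(req string) error
	Recv() (string, error)
	Close() error
}

// streamPool hands out idle streams and opens new ones on demand.
type streamPool struct {
	mu   sync.Mutex
	idle []stream
	open func() (stream, error)
}

// get returns an idle stream if one exists, else opens a new one.
func (p *streamPool) get() (stream, error) {
	p.mu.Lock()
	if n := len(p.idle); n > 0 {
		s := p.idle[n-1]
		p.idle = p.idle[:n-1]
		p.mu.Unlock()
		return s, nil
	}
	p.mu.Unlock()
	return p.open()
}

// put returns a healthy stream to the pool for reuse.
func (p *streamPool) put(s stream) {
	p.mu.Lock()
	p.idle = append(p.idle, s)
	p.mu.Unlock()
}

// unary performs one exchange over a pooled stream. A stream that
// returned an error is closed and never put back.
func (p *streamPool) unary(req string) (string, error) {
	s, err := p.get()
	if err != nil {
		return "", err
	}
	if err := s.Send(req); err != nil {
		_ = s.Close()
		return "", err
	}
	resp, err := s.Recv()
	if err != nil {
		_ = s.Close()
		return "", err
	}
	p.put(s)
	return resp, nil
}

// echoStream is a toy in-memory stream used only for the demo.
type echoStream struct {
	pending string
	fail    bool
}

func (e *echoStream) Send(req string) error {
	if e.fail {
		return errors.New("send failed")
	}
	e.pending = req
	return nil
}

func (e *echoStream) Recv() (string, error) { return "echo:" + e.pending, nil }

func (e *echoStream) Close() error { return nil }

func main() {
	opened := 0
	p := &streamPool{open: func() (stream, error) {
		opened++
		return &echoStream{}, nil
	}}
	for i := 0; i < 3; i++ {
		resp, _ := p.unary(fmt.Sprintf("req%d", i))
		fmt.Println(resp)
	}
	fmt.Println("streams opened:", opened) // sequential calls reuse one stream
}
```

Reusing a stream avoids paying the stream-setup cost on every call; the trade-off is that a stream in an unknown state must be discarded rather than returned to the pool.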

Added unit tests to ensure that:
- CRDB nodes can serve BatchRequests.
- Simple queries succeed whether DRPC is enabled or disabled.

Note: drpc protocol generation is not yet integrated into the build process. Protobuf files were generated manually and committed directly to the repository.

This PR has a few parts:

- Adds a copy of the drpc-generated code for a service that supports unary BatchRequest.
- In `rpc.NewServerEx`, also sets up a drpc server and returns it up the stack.
- Registers the BatchRequest service with both DRPC and gRPC. If DRPC is enabled, it is used to make BatchRequests; otherwise gRPC is used.
- In the listener setup, configures cmux to match on the `DRPC!!!1` header and serves the resulting listener on the drpc server. The drpc example uses `drpcmigrate.ListenMux` instead of cmux; we keep cmux but must make sure the header is read off the connection before delegating (which the drpcmigrate mux would have done for us).
- If TLS is in use, wraps the drpc listener with the TLS config and uses it to serve DRPC requests.
- Adds support for reusing drpc streams across unary BatchRequests. However, the DRPC implementation is not yet on par with its gRPC counterpart in terms of allocation optimizations.

Closes #136794.

Epic: CRDB-42584
Release note: None

Co-authored-by: Chandra Thumuluru <[email protected]>
Co-authored-by: Tobias Grieger <[email protected]>
3 people committed Jan 14, 2025
2 parents f87bf4d + b0ea57c commit f4ad047
Showing 26 changed files with 900 additions and 53 deletions.
20 changes: 20 additions & 0 deletions DEPS.bzl
@@ -9302,6 +9302,16 @@ def go_deps():
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/zeebo/assert/com_github_zeebo_assert-v1.3.0.zip",
],
)
go_repository(
name = "com_github_zeebo_errs",
build_file_proto_mode = "disable_global",
importpath = "github.com/zeebo/errs",
sha256 = "d2fa293e275c21bfb413e2968d79036931a55f503d8b62381563ed189b523cd2",
strip_prefix = "github.com/zeebo/errs@v1.2.2",
urls = [
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/zeebo/errs/com_github_zeebo_errs-v1.2.2.zip",
],
)
go_repository(
name = "com_github_zeebo_xxh3",
build_file_proto_mode = "disable_global",
@@ -11435,6 +11445,16 @@ def go_deps():
"https://storage.googleapis.com/cockroach-godeps/gomod/rsc.io/sampler/io_rsc_sampler-v1.3.0.zip",
],
)
go_repository(
name = "io_storj_drpc",
build_file_proto_mode = "disable_global",
importpath = "storj.io/drpc",
sha256 = "e297ccead2763d354959a3c04b0c9c27c9c84c99d129f216ec07da663ee0091a",
strip_prefix = "storj.io/drpc@v0.0.34",
urls = [
"https://storage.googleapis.com/cockroach-godeps/gomod/storj.io/drpc/io_storj_drpc-v0.0.34.zip",
],
)
go_repository(
name = "io_vitess_vitess",
build_file_proto_mode = "disable_global",
2 changes: 2 additions & 0 deletions build/bazelutil/distdir_files.bzl
@@ -1060,6 +1060,7 @@ DISTDIR_FILES = {
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/z-division/go-zookeeper/com_github_z_division_go_zookeeper-v0.0.0-20190128072838-6d7457066b9b.zip": "b0a67a3bb3cfbb1be18618b84b02588979795966e040f18c5bb4be036888cabd",
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/zabawaba99/go-gitignore/com_github_zabawaba99_go_gitignore-v0.0.0-20200117185801-39e6bddfb292.zip": "6c837b93e1c73e53123941c8e866de1deae6b645cc49a7d30d493c146178f8e8",
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/zeebo/assert/com_github_zeebo_assert-v1.3.0.zip": "1f01421d74ff37cb8247988155be9e6877d336029bcd887a1d035fd32d7ab6ae",
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/zeebo/errs/com_github_zeebo_errs-v1.2.2.zip": "d2fa293e275c21bfb413e2968d79036931a55f503d8b62381563ed189b523cd2",
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/zeebo/xxh3/com_github_zeebo_xxh3-v1.0.2.zip": "190e5ef1f672e9321a1580bdd31c6440fde6044ca8168d2b489cf50cdc4f58a6",
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/zenazn/goji/com_github_zenazn_goji-v0.9.0.zip": "0807a255d9d715d18427a6eedd8e4f5a22670b09e5f45fddd229c1ae38da25a9",
"https://storage.googleapis.com/cockroach-godeps/gomod/gitlab.com/golang-commonmark/html/com_gitlab_golang_commonmark_html-v0.0.0-20191124015941-a22733972181.zip": "f2ba8985dc9d6be347a17d9200a0be0cee5ab3bce4dc601c0651a77ef2bbffc3",
@@ -1198,6 +1199,7 @@ DISTDIR_FILES = {
"https://storage.googleapis.com/cockroach-godeps/gomod/sigs.k8s.io/structured-merge-diff/v4/io_k8s_sigs_structured_merge_diff_v4-v4.1.2.zip": "b32af97dadd79179a8f62aaf4ef1e0562e051be77053a60c7a4e724a5cbd00ce",
"https://storage.googleapis.com/cockroach-godeps/gomod/sigs.k8s.io/yaml/io_k8s_sigs_yaml-v1.2.0.zip": "55ed08c5df448a033bf7e2c2912d4daa85b856a05c854b0c87ccc85c7f3fbfc7",
"https://storage.googleapis.com/cockroach-godeps/gomod/sourcegraph.com/sourcegraph/appdash/com_sourcegraph_sourcegraph_appdash-v0.0.0-20190731080439-ebfcffb1b5c0.zip": "bd2492d9db05362c2fecd0b3d0f6002c89a6d90d678fb93b4158298ab883736f",
"https://storage.googleapis.com/cockroach-godeps/gomod/storj.io/drpc/io_storj_drpc-v0.0.34.zip": "e297ccead2763d354959a3c04b0c9c27c9c84c99d129f216ec07da663ee0091a",
"https://storage.googleapis.com/public-bazel-artifacts/bazel/88ef31b429631b787ceb5e4556d773b20ad797c8.zip": "92a89a2bbe6c6db2a8b87da4ce723aff6253656e8417f37e50d362817c39b98b",
"https://storage.googleapis.com/public-bazel-artifacts/bazel/bazel-gazelle-v0.39.1.tar.gz": "b760f7fe75173886007f7c2e616a21241208f3d90e8657dc65d36a771e916b6a",
"https://storage.googleapis.com/public-bazel-artifacts/bazel/bazel-lib-v1.42.3.tar.gz": "d0529773764ac61184eb3ad3c687fb835df5bee01afedf07f0cf1a45515c96bc",
2 changes: 2 additions & 0 deletions go.mod
@@ -41,6 +41,7 @@ require (
google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1
google.golang.org/grpc v1.56.3
google.golang.org/protobuf v1.35.1
storj.io/drpc v0.0.34
)

// If any of the following dependencies get updated as a side-effect
@@ -432,6 +433,7 @@ require (
github.com/twpayne/go-kml v1.5.2 // indirect
github.com/urfave/cli/v2 v2.3.0 // indirect
github.com/yusufpapurcu/wmi v1.2.2 // indirect
github.com/zeebo/errs v1.2.2 // indirect
github.com/zeebo/xxh3 v1.0.2 // indirect
gitlab.com/golang-commonmark/html v0.0.0-20191124015941-a22733972181 // indirect
gitlab.com/golang-commonmark/linkify v0.0.0-20191026162114-a0c2df6c8f82 // indirect
4 changes: 4 additions & 0 deletions go.sum
@@ -2388,6 +2388,8 @@ github.com/zabawaba99/go-gitignore v0.0.0-20200117185801-39e6bddfb292 h1:vpcCVk+
github.com/zabawaba99/go-gitignore v0.0.0-20200117185801-39e6bddfb292/go.mod h1:qcqv8IHwbR0JmjY1LZy4PeytlwxDPn1vUkjX7Wq0VaY=
github.com/zeebo/assert v1.3.0 h1:g7C04CbJuIDKNPFHmsk4hwZDO5O+kntRxzaUoNXj+IQ=
github.com/zeebo/assert v1.3.0/go.mod h1:Pq9JiuJQpG8JLJdtkwrJESF0Foym2/D9XMU5ciN/wJ0=
github.com/zeebo/errs v1.2.2 h1:5NFypMTuSdoySVTqlNs1dEoU21QVamMQJxW/Fii5O7g=
github.com/zeebo/errs v1.2.2/go.mod h1:sgbWHsvVuTPHcqJJGQ1WhI5KbWlHYz+2+2C/LSEtCw4=
github.com/zeebo/xxh3 v1.0.2 h1:xZmwmqxHZA8AI603jOQ0tMqmBr9lPeFwGg6d+xy9DC0=
github.com/zeebo/xxh3 v1.0.2/go.mod h1:5NWz9Sef7zIDm2JHfFlcQvNekmcEl9ekUZQQKCYaDcA=
github.com/zenazn/goji v0.9.0/go.mod h1:7S9M489iMyHBNxwZnk9/EHS098H4/F6TATF2mIxtB1Q=
@@ -3351,3 +3353,5 @@ sigs.k8s.io/structured-merge-diff/v4 v4.1.2/go.mod h1:j/nl6xW8vLS49O8YvXW1ocPhZa
sigs.k8s.io/yaml v1.1.0/go.mod h1:UJmg0vDUVViEyp3mgSv9WPwZCDxu4rQW1olrI1uml+o=
sigs.k8s.io/yaml v1.2.0/go.mod h1:yfXDCHCao9+ENCvLSE62v9VSji2MKu5jeNfTrofGhJc=
sourcegraph.com/sourcegraph/appdash v0.0.0-20190731080439-ebfcffb1b5c0/go.mod h1:hI742Nqp5OhwiqlzhgfbWU4mW4yO10fP+LoT9WOswdU=
storj.io/drpc v0.0.34 h1:q9zlQKfJ5A7x8NQNFk8x7eKUF78FMhmAbZLnFK+og7I=
storj.io/drpc v0.0.34/go.mod h1:Y9LZaa8esL1PW2IDMqJE7CFSNq7d5bQ3RI7mGPtmKMg=
12 changes: 9 additions & 3 deletions pkg/kv/kvpb/BUILD.bazel
@@ -8,17 +8,21 @@ load(":gen.bzl", "batch_gen")
go_library(
name = "kvpb",
srcs = [
":gen-batch-generated", # keep
":gen-errordetailtype-stringer", # keep
":gen-method-stringer", # keep
"ambiguous_result_error.go",
"api.go",
# DRPC protobuf file (api_drpc.pb.go) is currently generated manually.
# TODO (chandrat): Remove this once DRPC protobuf generation is
# integrated into the build process.
"api_drpc_hacky.go",
"batch.go",
"data.go",
"errors.go",
"method.go",
"node_decommissioned_error.go",
"replica_unavailable_error.go",
":gen-batch-generated", # keep
":gen-errordetailtype-stringer", # keep
":gen-method-stringer", # keep
],
embed = [":kvpb_go_proto"],
importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvpb",
@@ -46,6 +50,8 @@ go_library(
"@com_github_gogo_protobuf//types",
"@com_github_gogo_status//:status",
"@com_github_golang_mock//gomock", # keep
"@io_storj_drpc//:drpc",
"@io_storj_drpc//drpcerr",
"@org_golang_google_grpc//codes",
"@org_golang_google_grpc//metadata", # keep
],
207 changes: 207 additions & 0 deletions pkg/kv/kvpb/api_drpc_hacky.go
@@ -0,0 +1,207 @@
// Copyright 2025 The Cockroach Authors.
//
// Use of this software is governed by the CockroachDB Software License
// included in the /LICENSE file.

// This file was manually generated with the DRPC protogen plugin using a dummy
// `api.proto` that includes a subset of relevant service methods.
//
// For instance, to generate this file, the following proto file was used:
//
// -- api.proto -- begin --
// syntax = "proto3";
// package cockroach.kv.kvpb;
// option go_package = "github.com/cockroachdb/cockroach/pkg/kv/kvpb";
// service Batch {
//   rpc Batch (BatchRequest) returns (BatchResponse) {}
//   rpc BatchStream (stream BatchRequest) returns (stream BatchResponse) {}
// }
// message BatchRequest{}
// message BatchResponse{}
// -- api.proto -- end --
//
// NB: The use of empty BatchRequest and BatchResponse messages is a deliberate
// decision to avoid dependencies.
//
//
// To generate this file using DRPC protogen plugin from the dummy `api.proto`
// defined above, use the following command:
//
// ```
// protoc --gogo_out=paths=source_relative:. \
// --go-drpc_out=paths=source_relative,protolib=github.com/gogo/protobuf:. \
// api.proto
// ```
//
// NB: Make sure you have `protoc` installed and `protoc-gen-gogoroach` is
// built from $COCKROACH_SRC/pkg/cmd/protoc-gen-gogoroach.
//
// This code-gen should be automated as part of productionizing drpc.

package kvpb

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/util/protoutil"
	"github.com/cockroachdb/errors"
	"storj.io/drpc"
	"storj.io/drpc/drpcerr"
)

type drpcEncoding_File_api_proto struct{}

func (drpcEncoding_File_api_proto) Marshal(msg drpc.Message) ([]byte, error) {
	return protoutil.Marshal(msg.(protoutil.Message))
}

func (drpcEncoding_File_api_proto) Unmarshal(buf []byte, msg drpc.Message) error {
	return protoutil.Unmarshal(buf, msg.(protoutil.Message))
}

type DRPCBatchClient interface {
	DRPCConn() drpc.Conn

	Batch(ctx context.Context, in *BatchRequest) (*BatchResponse, error)
	BatchStream(ctx context.Context) (DRPCBatch_BatchStreamClient, error)
}

type drpcBatchClient struct {
	cc drpc.Conn
}

func NewDRPCBatchClient(cc drpc.Conn) DRPCBatchClient {
	return &drpcBatchClient{cc}
}

func (c *drpcBatchClient) DRPCConn() drpc.Conn { return c.cc }

func (c *drpcBatchClient) Batch(ctx context.Context, in *BatchRequest) (*BatchResponse, error) {
	out := new(BatchResponse)
	err := c.cc.Invoke(ctx, "/cockroach.kv.kvpb.Batch/Batch", drpcEncoding_File_api_proto{}, in, out)
	if err != nil {
		return nil, err
	}
	return out, nil
}

func (c *drpcBatchClient) BatchStream(ctx context.Context) (DRPCBatch_BatchStreamClient, error) {
	stream, err := c.cc.NewStream(ctx, "/cockroach.kv.kvpb.Batch/BatchStream", drpcEncoding_File_api_proto{})
	if err != nil {
		return nil, err
	}
	x := &drpcBatch_BatchStreamClient{stream}
	return x, nil
}

type DRPCBatch_BatchStreamClient interface {
	drpc.Stream
	Send(*BatchRequest) error
	Recv() (*BatchResponse, error)
}

type drpcBatch_BatchStreamClient struct {
	drpc.Stream
}

func (x *drpcBatch_BatchStreamClient) GetStream() drpc.Stream {
	return x.Stream
}

func (x *drpcBatch_BatchStreamClient) Send(m *BatchRequest) error {
	return x.MsgSend(m, drpcEncoding_File_api_proto{})
}

func (x *drpcBatch_BatchStreamClient) Recv() (*BatchResponse, error) {
	m := new(BatchResponse)
	if err := x.MsgRecv(m, drpcEncoding_File_api_proto{}); err != nil {
		return nil, err
	}
	return m, nil
}

func (x *drpcBatch_BatchStreamClient) RecvMsg(m *BatchResponse) error {
	return x.MsgRecv(m, drpcEncoding_File_api_proto{})
}

type DRPCBatchServer interface {
	Batch(context.Context, *BatchRequest) (*BatchResponse, error)
	BatchStream(DRPCBatch_BatchStreamStream) error
}

type DRPCBatchUnimplementedServer struct{}

func (s *DRPCBatchUnimplementedServer) Batch(
	context.Context, *BatchRequest,
) (*BatchResponse, error) {
	return nil, drpcerr.WithCode(errors.New("Unimplemented"), drpcerr.Unimplemented)
}

func (s *DRPCBatchUnimplementedServer) BatchStream(DRPCBatch_BatchStreamStream) error {
	return drpcerr.WithCode(errors.New("Unimplemented"), drpcerr.Unimplemented)
}

type DRPCBatchDescription struct{}

func (DRPCBatchDescription) NumMethods() int { return 2 }

func (DRPCBatchDescription) Method(
	n int,
) (string, drpc.Encoding, drpc.Receiver, interface{}, bool) {
	switch n {
	case 0:
		return "/cockroach.kv.kvpb.Batch/Batch", drpcEncoding_File_api_proto{},
			func(srv interface{}, ctx context.Context, in1, in2 interface{}) (drpc.Message, error) {
				return srv.(DRPCBatchServer).
					Batch(
						ctx,
						in1.(*BatchRequest),
					)
			}, DRPCBatchServer.Batch, true
	case 1:
		return "/cockroach.kv.kvpb.Batch/BatchStream", drpcEncoding_File_api_proto{},
			func(srv interface{}, ctx context.Context, in1, in2 interface{}) (drpc.Message, error) {
				return nil, srv.(DRPCBatchServer).
					BatchStream(
						&drpcBatch_BatchStreamStream{in1.(drpc.Stream)},
					)
			}, DRPCBatchServer.BatchStream, true
	default:
		return "", nil, nil, nil, false
	}
}

func DRPCRegisterBatch(mux drpc.Mux, impl DRPCBatchServer) error {
	return mux.Register(impl, DRPCBatchDescription{})
}

type DRPCBatch_BatchStream interface {
	drpc.Stream
	SendAndClose(*BatchResponse) error
}

type DRPCBatch_BatchStreamStream interface {
	drpc.Stream
	Send(*BatchResponse) error
	Recv() (*BatchRequest, error)
}

type drpcBatch_BatchStreamStream struct {
	drpc.Stream
}

func (x *drpcBatch_BatchStreamStream) Send(m *BatchResponse) error {
	return x.MsgSend(m, drpcEncoding_File_api_proto{})
}

func (x *drpcBatch_BatchStreamStream) Recv() (*BatchRequest, error) {
	m := new(BatchRequest)
	if err := x.MsgRecv(m, drpcEncoding_File_api_proto{}); err != nil {
		return nil, err
	}
	return m, nil
}

func (x *drpcBatch_BatchStreamStream) RecvMsg(m *BatchRequest) error {
	return x.MsgRecv(m, drpcEncoding_File_api_proto{})
}
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/loqrecovery/server.go
@@ -750,7 +750,7 @@ func visitNodeWithRetry(
// Note that we use ConnectNoBreaker here to avoid any race with probe
// running on current node and target node restarting. Errors from circuit
// breaker probes could confuse us and present node as unavailable.
conn, err = rpcCtx.GRPCDialNode(addr.String(), node.NodeID, node.Locality, rpc.DefaultClass).ConnectNoBreaker(ctx)
conn, _, err = rpcCtx.GRPCDialNode(addr.String(), node.NodeID, node.Locality, rpc.DefaultClass).ConnectNoBreaker(ctx)
// Nodes would contain dead nodes that we don't need to visit. We can skip
// them and let caller handle incomplete info.
if err != nil {
@@ -803,7 +803,7 @@ func makeVisitNode(g *gossip.Gossip, loc roachpb.Locality, rpcCtx *rpc.Context)
// Note that we use ConnectNoBreaker here to avoid any race with probe
// running on current node and target node restarting. Errors from circuit
// breaker probes could confuse us and present node as unavailable.
conn, err = rpcCtx.GRPCDialNode(addr.String(), node.NodeID, node.Locality, rpc.DefaultClass).ConnectNoBreaker(ctx)
conn, _, err = rpcCtx.GRPCDialNode(addr.String(), node.NodeID, node.Locality, rpc.DefaultClass).ConnectNoBreaker(ctx)
if err != nil {
if grpcutil.IsClosedConnection(err) {
log.Infof(ctx, "can't dial node n%d because connection is permanently closed: %s",
12 changes: 12 additions & 0 deletions pkg/rpc/BUILD.bazel
@@ -15,11 +15,13 @@ go_library(
"connection_class.go",
"context.go",
"context_testutils.go",
"drpc.go",
"errors.go",
"heartbeat.go",
"keepalive.go",
"metrics.go",
"peer.go",
"peer_drpc.go",
"peer_map.go",
"restricted_internal_client.go",
"settings.go",
@@ -45,6 +47,7 @@ go_library(
"//pkg/settings/cluster",
"//pkg/ts/tspb",
"//pkg/util",
"//pkg/util/buildutil",
"//pkg/util/circuit",
"//pkg/util/envutil",
"//pkg/util/growstack",
@@ -72,6 +75,15 @@ go_library(
"@com_github_montanaflynn_stats//:stats",
"@com_github_vividcortex_ewma//:ewma",
"@io_opentelemetry_go_otel//attribute",
"@io_storj_drpc//:drpc",
"@io_storj_drpc//drpcconn",
"@io_storj_drpc//drpcmanager",
"@io_storj_drpc//drpcmigrate",
"@io_storj_drpc//drpcmux",
"@io_storj_drpc//drpcpool",
"@io_storj_drpc//drpcserver",
"@io_storj_drpc//drpcstream",
"@io_storj_drpc//drpcwire",
"@org_golang_google_grpc//:grpc",
"@org_golang_google_grpc//backoff",
"@org_golang_google_grpc//codes",