Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor: support capturing to and replaying from shared storage #58862

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
185 changes: 156 additions & 29 deletions pkg/executor/traffic.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"encoding/json"
"fmt"
"io"
"maps"
"net"
"net/http"
"net/url"
Expand All @@ -27,6 +28,7 @@ import (
"time"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/pkg/domain/infosync"
"github.com/pingcap/tidb/pkg/executor/internal/exec"
"github.com/pingcap/tidb/pkg/parser/mysql"
Expand All @@ -40,8 +42,10 @@ import (

// Context keys for mocked data that is stored in the context. They are only used in tests.
type tiproxyAddrKeyType struct{}
type trafficPathKeyType struct{}

// tiproxyAddrKey presumably carries mocked TiProxy addresses; its consumer is not visible in this excerpt — TODO confirm.
var tiproxyAddrKey tiproxyAddrKeyType
// trafficPathKey carries a mocked []string of path names that formReader4Replay uses instead of listing the storage.
var trafficPathKey trafficPathKeyType

type trafficJob struct {
Instance string `json:"-"` // not passed from TiProxy
Expand All @@ -55,6 +59,16 @@ type trafficJob struct {

const (
// startTimeKey is the args key under which the capture and replay executors store the request time.
startTimeKey = "start-time"
// outputKey is the args key holding the output path of a capture job.
outputKey = "output"
// inputKey is the args key holding the input path of a replay job.
inputKey = "input"

// API paths that are appended directly to the TiProxy address when building the request URL,
// so each of them starts with a slash.
capturePath = "/api/traffic/capture"
replayPath = "/api/traffic/replay"
cancelPath = "/api/traffic/cancel"
showPath = "/api/traffic/show"

// sharedStorageTimeout bounds the context passed to formReader4Replay, which may list the shared storage.
sharedStorageTimeout = 10 * time.Second
// filePrefix is the name prefix of the per-TiProxy sub-paths under a shared storage path:
// it is used to build the capture output paths and to filter the replay input listing.
filePrefix = "tiproxy-"
)

// TrafficCaptureExec sends capture traffic requests to TiProxy.
Expand All @@ -66,8 +80,16 @@ type TrafficCaptureExec struct {
// Next implements the Executor Next interface.
// It records the current time in e.Args under startTimeKey, resolves the TiProxy
// addresses, builds one request body per address with formReader4Capture, and posts
// them to capturePath.
// NOTE(review): this span is rendered from a diff without +/- markers. The two
// statements that call getForm and pass e.BaseExecutor to request look like the
// pre-change lines that the statements following them replace — confirm against the PR.
func (e *TrafficCaptureExec) Next(ctx context.Context, _ *chunk.Chunk) error {
e.Args[startTimeKey] = time.Now().Format(time.RFC3339)
form := getForm(e.Args)
_, err := request(ctx, e.BaseExecutor, strings.NewReader(form), http.MethodPost, "api/traffic/capture")
addrs, err := getTiProxyAddrs(ctx)
if err != nil {
return errors.Wrapf(err, "get tiproxy addresses failed")
}
// For shared storage, append a suffix to the output path for each TiProxy so that they won't write to the same path.
readers, err := formReader4Capture(e.Args, len(addrs))
if err != nil {
return err
}
_, err = request(ctx, addrs, readers, http.MethodPost, capturePath)
return err
}

Expand All @@ -80,8 +102,31 @@ type TrafficReplayExec struct {
// Next implements the Executor Next interface.
// It records the current time in e.Args under startTimeKey, resolves the TiProxy
// addresses, builds the request bodies with formReader4Replay, and posts them to
// replayPath. When the number of bodies and the number of addresses differ, the
// longer list is truncated and a warning is appended to the statement context.
// NOTE(review): this span is rendered from a diff without +/- markers. The two
// statements that call getForm and pass e.BaseExecutor to request look like the
// pre-change lines that the statements following them replace — confirm against the PR.
func (e *TrafficReplayExec) Next(ctx context.Context, _ *chunk.Chunk) error {
e.Args[startTimeKey] = time.Now().Format(time.RFC3339)
form := getForm(e.Args)
_, err := request(ctx, e.BaseExecutor, strings.NewReader(form), http.MethodPost, "api/traffic/replay")
addrs, err := getTiProxyAddrs(ctx)
if err != nil {
return errors.Wrapf(err, "get tiproxy addresses failed")
}
// For shared storage, read the sub-directories from the input path and assign each sub-directory to a TiProxy instance.
// The timeout only covers building the forms; the requests below use the original ctx.
formCtx, cancel := context.WithTimeout(ctx, sharedStorageTimeout)
readers, err := formReader4Replay(formCtx, e.Args, len(addrs))
cancel()
if err != nil {
return err
}
var warning error
readerNum, tiproxyNum := len(readers), len(addrs)
// Pair readers and addresses one-to-one: drop the surplus on whichever side is longer.
if readerNum > tiproxyNum {
warning = errors.Errorf("tiproxy instances number (%d) is less than input paths number (%d), some paths won't be replayed", tiproxyNum, readerNum)
readers = readers[:tiproxyNum]
} else if readerNum < tiproxyNum {
warning = errors.Errorf("tiproxy instances number (%d) is greater than input paths number (%d), some instances won't replay", tiproxyNum, readerNum)
addrs = addrs[:readerNum]
}
if warning != nil {
// The mismatch is not fatal: surface it to the client as a warning and log it.
e.Ctx().GetSessionVars().StmtCtx.AppendWarning(warning)
logutil.Logger(ctx).Warn("tiproxy instances and input paths don't match", zap.String("input", e.Args[inputKey]), zap.Error(warning))
}
_, err = request(ctx, addrs, readers, http.MethodPost, replayPath)
return err
}

Expand All @@ -91,8 +136,12 @@ type TrafficCancelExec struct {
}

// Next implements the Executor Next interface.
// It resolves the TiProxy addresses and posts a request without a body to cancelPath.
// NOTE(review): this span is rendered from a diff without +/- markers. The first
// signature and the request call that passes e.BaseExecutor look like the pre-change
// lines; the receiver-less signature and the statements after it look like their
// replacement — confirm against the PR.
func (e *TrafficCancelExec) Next(ctx context.Context, _ *chunk.Chunk) error {
_, err := request(ctx, e.BaseExecutor, nil, http.MethodPost, "api/traffic/cancel")
func (*TrafficCancelExec) Next(ctx context.Context, _ *chunk.Chunk) error {
addrs, err := getTiProxyAddrs(ctx)
if err != nil {
return errors.Wrapf(err, "get tiproxy addresses failed")
}
_, err = request(ctx, addrs, nil, http.MethodPost, cancelPath)
return err
}

Expand All @@ -108,7 +157,11 @@ func (e *TrafficShowExec) Open(ctx context.Context) error {
if err := e.BaseExecutor.Open(ctx); err != nil {
return err
}
resps, err := request(ctx, e.BaseExecutor, nil, http.MethodGet, "api/traffic/show")
addrs, err := getTiProxyAddrs(ctx)
if err != nil {
return errors.Wrapf(err, "get tiproxy addresses failed")
}
resps, err := request(ctx, addrs, nil, http.MethodGet, showPath)
if err != nil {
return err
}
Expand Down Expand Up @@ -154,30 +207,23 @@ func (e *TrafficShowExec) Next(ctx context.Context, req *chunk.Chunk) error {
return nil
}

// request sends the same method/path request to every address in addrs and returns
// the response bodies keyed by address.
//
// In the variant that takes addrs and readers, readers[i] is the body for addrs[i];
// a nil readers slice or a missing entry sends no body. That variant stops at the
// first failing address, logs it, and returns the responses collected so far together
// with a wrapped error.
//
// NOTE(review): this span is rendered from a diff without +/- markers, so two versions
// of the function are interleaved. The signature taking exec.BaseExecutor, the
// getTiProxyAddrs call, the `for _, addr` loop that appends errors to the statement
// context, and the trailing `if err == nil` block look like the pre-change lines —
// confirm against the PR.
func request(ctx context.Context, exec exec.BaseExecutor, reader io.Reader, method, path string) (map[string]string, error) {
addrs, err := getTiProxyAddrs(ctx)
if err != nil {
return nil, err
}
func request(ctx context.Context, addrs []string, readers []io.Reader, method, path string) (map[string]string, error) {
resps := make(map[string]string, len(addrs))
for _, addr := range addrs {
resp, requestErr := requestOne(method, addr, path, reader)
if requestErr != nil {
// Let's send requests to all the instances even if some fail.
exec.Ctx().GetSessionVars().StmtCtx.AppendError(requestErr)
logutil.Logger(ctx).Error("traffic request to tiproxy failed", zap.String("method", method),
zap.String("path", path), zap.String("addr", addr), zap.String("resp", resp), zap.Error(requestErr))
if err == nil {
err = requestErr
}
} else {
resps[addr] = resp
for i, addr := range addrs {
// Pick the body that belongs to this address, if any.
var reader io.Reader
if readers != nil && i < len(readers) {
reader = readers[i]
}
resp, err := requestOne(method, addr, path, reader)
if err != nil {
logutil.Logger(ctx).Error("traffic request to tiproxy failed", zap.String("path", path), zap.String("addr", addr),
zap.String("resp", resp), zap.Error(err))
return resps, errors.Wrapf(err, "request to tiproxy '%s' failed", addr)
}
resps[addr] = resp
}
if err == nil {
logutil.Logger(ctx).Info("traffic request to tiproxy succeeds", zap.Strings("addrs", addrs), zap.String("path", path))
}
return resps, err
logutil.Logger(ctx).Info("traffic request to tiproxy succeeds", zap.Strings("addrs", addrs), zap.String("path", path))
return resps, nil
}

func getTiProxyAddrs(ctx context.Context) ([]string, error) {
Expand All @@ -202,7 +248,7 @@ func getTiProxyAddrs(ctx context.Context) ([]string, error) {
}

func requestOne(method, addr, path string, rd io.Reader) (string, error) {
url := fmt.Sprintf("%s://%s/%s", util.InternalHTTPSchema(), addr, path)
url := fmt.Sprintf("%s://%s%s", util.InternalHTTPSchema(), addr, path)
req, err := http.NewRequest(method, url, rd)
if err != nil {
return "", errors.Trace(err)
Expand Down Expand Up @@ -242,3 +288,84 @@ func parseTime(ctx context.Context, exec exec.BaseExecutor, timeStr string) type
}
return types.NewTime(types.FromGoTime(t), mysql.TypeDatetime, types.MaxFsp)
}

// formReader4Capture builds one request body per TiProxy instance for a capture job.
//
// The output path is read from args[outputKey] and must be non-empty. When its URL
// scheme is empty, "file" or "local", every instance receives the same form built
// from args. For any other scheme, each instance gets a copy of args whose output
// path is extended with filePrefix followed by the instance index, so that the
// instances do not write to the same location.
//
// It returns a slice of tiproxyNum readers, or an error when the output path is
// missing or cannot be parsed.
func formReader4Capture(args map[string]string, tiproxyNum int) ([]io.Reader, error) {
	outputPath := args[outputKey]
	if outputPath == "" {
		return nil, errors.New("the output path for capture must be specified")
	}
	parsed, err := url.Parse(outputPath)
	if err != nil {
		return nil, errors.Wrapf(err, "parse output path failed")
	}

	local := parsed.Scheme == "" || parsed.Scheme == "file" || parsed.Scheme == "local"
	var sharedForm string
	if local {
		// All instances share the identical form; encode it once.
		sharedForm = getForm(args)
	}

	result := make([]io.Reader, 0, tiproxyNum)
	for idx := 0; idx < tiproxyNum; idx++ {
		if local {
			result = append(result, strings.NewReader(sharedForm))
			continue
		}
		// Clone so that the caller's args keep the original output path.
		perInstance := maps.Clone(args)
		perInstance[outputKey] = parsed.JoinPath(fmt.Sprintf("%s%d", filePrefix, idx)).String()
		result = append(result, strings.NewReader(getForm(perInstance)))
	}
	return result, nil
}

// formReader4Replay builds the request bodies for a replay job.
//
// The input path is read from args[inputKey] and must be non-empty. When it is a
// local path, every TiProxy instance receives the same form built from args, and
// the result has tiproxyNum readers.
//
// Otherwise the input path is treated as shared storage: it is walked for objects
// whose names start with filePrefix, and one form is produced per distinct
// top-level sub-directory, with the input path rewritten to that sub-directory.
// The number of returned readers then equals the number of sub-directories, which
// may differ from tiproxyNum; the caller is responsible for reconciling them.
//
// In tests, a []string stored in ctx under trafficPathKey replaces the storage
// listing and is used as-is.
//
// It returns an error when the input path is missing or unparsable, when the
// storage cannot be opened or walked, or when no sub-directory is found.
func formReader4Replay(ctx context.Context, args map[string]string, tiproxyNum int) ([]io.Reader, error) {
	input, ok := args[inputKey]
	if !ok || len(input) == 0 {
		return nil, errors.New("the input path for replay must be specified")
	}
	u, err := storage.ParseRawURL(input)
	if err != nil {
		return nil, errors.Wrapf(err, "parse input path failed")
	}
	if storage.IsLocal(u) {
		readers := make([]io.Reader, tiproxyNum)
		form := getForm(args)
		for i := 0; i < tiproxyNum; i++ {
			readers[i] = strings.NewReader(form)
		}
		return readers, nil
	}

	names := make([]string, 0, tiproxyNum)
	if mockNames := ctx.Value(trafficPathKey); mockNames != nil {
		names = mockNames.([]string)
	} else {
		backend, err := storage.ParseBackendFromURL(u, nil)
		if err != nil {
			return nil, errors.Wrapf(err, "parse backend from the input path failed")
		}
		store, err := storage.NewWithDefaultOpt(ctx, backend)
		if err != nil {
			return nil, errors.Wrapf(err, "create storage for input failed")
		}
		defer store.Close()
		// WalkDir reports every file below the input path, e.g. "tiproxy-0/a.log"
		// and "tiproxy-0/b.log". Each TiProxy replays a whole sub-directory, so
		// collapse the files to their distinct top-level directory; otherwise
		// every file would be handed out as a separate input path.
		seen := make(map[string]struct{}, tiproxyNum)
		err = store.WalkDir(ctx, &storage.WalkOption{
			ObjPrefix: filePrefix,
		}, func(name string, _ int64) error {
			dir, _, inSubDir := strings.Cut(name, "/")
			if !inSubDir {
				// A file directly under the input path is not a capture sub-directory.
				return nil
			}
			if _, dup := seen[dir]; !dup {
				seen[dir] = struct{}{}
				names = append(names, dir)
			}
			return nil
		})
		if err != nil {
			return nil, errors.Wrapf(err, "walk input path failed")
		}
	}
	if len(names) == 0 {
		return nil, errors.New("no replay files found in the input path")
	}
	readers := make([]io.Reader, 0, len(names))
	for _, name := range names {
		// Clone so that the caller's args keep the original input path.
		m := maps.Clone(args)
		m[inputKey] = u.JoinPath(name).String()
		form := getForm(m)
		readers = append(readers, strings.NewReader(form))
	}
	return readers, nil
}
Loading