From 5d0150f77c0395cb98a632343ba0138571806b80 Mon Sep 17 00:00:00 2001 From: djshow832 Date: Fri, 10 Jan 2025 16:12:07 +0800 Subject: [PATCH 1/5] support s3 --- pkg/executor/traffic.go | 181 ++++++++++++++++++++++++++++++----- pkg/executor/traffic_test.go | 115 ++++++++++++++++++++-- 2 files changed, 262 insertions(+), 34 deletions(-) diff --git a/pkg/executor/traffic.go b/pkg/executor/traffic.go index c16895c152f54..592f149764325 100644 --- a/pkg/executor/traffic.go +++ b/pkg/executor/traffic.go @@ -19,6 +19,7 @@ import ( "encoding/json" "fmt" "io" + "maps" "net" "net/http" "net/url" @@ -27,6 +28,7 @@ import ( "time" "github.com/pingcap/errors" + "github.com/pingcap/tidb/br/pkg/storage" "github.com/pingcap/tidb/pkg/domain/infosync" "github.com/pingcap/tidb/pkg/executor/internal/exec" "github.com/pingcap/tidb/pkg/parser/mysql" @@ -40,8 +42,10 @@ import ( // The keys for the mocked data that stored in context. They are only used for test. type tiproxyAddrKeyType struct{} +type trafficPathKeyType struct{} var tiproxyAddrKey tiproxyAddrKeyType +var trafficPathKey trafficPathKeyType type trafficJob struct { Instance string `json:"-"` // not passed from TiProxy @@ -55,6 +59,16 @@ type trafficJob struct { const ( startTimeKey = "start-time" + outputKey = "output" + inputKey = "input" + + capturePath = "/api/traffic/capture" + replayPath = "/api/traffic/replay" + cancelPath = "/api/traffic/cancel" + showPath = "/api/traffic/show" + + sharedStorageTimeout = 10 * time.Second + filePrefix = "tiproxy-" ) // TrafficCaptureExec sends capture traffic requests to TiProxy. @@ -66,8 +80,16 @@ type TrafficCaptureExec struct { // Next implements the Executor Next interface. func (e *TrafficCaptureExec) Next(ctx context.Context, _ *chunk.Chunk) error { e.Args[startTimeKey] = time.Now().Format(time.RFC3339) - form := getForm(e.Args) - _, err := request(ctx, e.BaseExecutor, strings.NewReader(form), http.MethodPost, "api/traffic/capture") + addrs, err := getTiProxyAddrs(ctx) + if err != nil { + return errors.Wrapf(err, "get tiproxy addresses failed") + } + // For shared storage, append a suffix to the output path for each TiProxy so that they won't write to the same path. + readers, err := formReader4Capture(e.Args, len(addrs)) + if err != nil { + return err + } + _, err = request(ctx, addrs, readers, http.MethodPost, capturePath) return err } @@ -80,8 +102,31 @@ type TrafficReplayExec struct { // Next implements the Executor Next interface. func (e *TrafficReplayExec) Next(ctx context.Context, _ *chunk.Chunk) error { e.Args[startTimeKey] = time.Now().Format(time.RFC3339) - form := getForm(e.Args) - _, err := request(ctx, e.BaseExecutor, strings.NewReader(form), http.MethodPost, "api/traffic/replay") + addrs, err := getTiProxyAddrs(ctx) + if err != nil { + return errors.Wrapf(err, "get tiproxy addresses failed") + } + // For shared storage, read the sub-direcotires from the input path and assign each sub-directory to a TiProxy instance. + formCtx, cancel := context.WithTimeout(ctx, sharedStorageTimeout) + readers, err := formReader4Replay(formCtx, e.Args, len(addrs)) + cancel() + if err != nil { + return err + } + var warning error + readerNum, tiproxyNum := len(readers), len(addrs) + if readerNum > tiproxyNum { + warning = errors.Errorf("tiproxy instances number (%d) is less than input paths number (%d), some paths won't be replayed", tiproxyNum, readerNum) + readers = readers[:tiproxyNum] + } else if readerNum < tiproxyNum { + warning = errors.Errorf("tiproxy instances number (%d) is greater than input paths number (%d), some instances won't replay", tiproxyNum, readerNum) + addrs = addrs[:readerNum] + } + if warning != nil { + e.Ctx().GetSessionVars().StmtCtx.AppendWarning(warning) + logutil.Logger(ctx).Warn("tiproxy instances and input paths don't match", zap.String("input", e.Args[inputKey]), zap.Error(warning)) + } + _, err = request(ctx, addrs, readers, http.MethodPost, replayPath) return err } @@ -92,7 +137,11 @@ type TrafficCancelExec struct { // Next implements the Executor Next interface. func (e *TrafficCancelExec) Next(ctx context.Context, _ *chunk.Chunk) error { - _, err := request(ctx, e.BaseExecutor, nil, http.MethodPost, "api/traffic/cancel") + addrs, err := getTiProxyAddrs(ctx) + if err != nil { + return errors.Wrapf(err, "get tiproxy addresses failed") + } + _, err = request(ctx, addrs, nil, http.MethodPost, cancelPath) return err } @@ -108,7 +157,11 @@ func (e *TrafficShowExec) Open(ctx context.Context) error { if err := e.BaseExecutor.Open(ctx); err != nil { return err } - resps, err := request(ctx, e.BaseExecutor, nil, http.MethodGet, "api/traffic/show") + addrs, err := getTiProxyAddrs(ctx) + if err != nil { + return errors.Wrapf(err, "get tiproxy addresses failed") + } + resps, err := request(ctx, addrs, nil, http.MethodGet, showPath) if err != nil { return err } @@ -154,30 +207,24 @@ func (e *TrafficShowExec) Next(ctx context.Context, req *chunk.Chunk) error { return nil } -func request(ctx context.Context, exec exec.BaseExecutor, reader io.Reader, method, path string) (map[string]string, error) { - addrs, err := getTiProxyAddrs(ctx) - if err != nil { - return nil, err - } +func request(ctx context.Context, addrs []string, readers []io.Reader, method, path string) (map[string]string, error) { resps := make(map[string]string, len(addrs)) - for _, addr := range addrs { - resp, requestErr := requestOne(method, addr, path, reader) - if requestErr != nil { - // Let's send requests to all the instances even if some fail. - exec.Ctx().GetSessionVars().StmtCtx.AppendError(requestErr) - logutil.Logger(ctx).Error("traffic request to tiproxy failed", zap.String("method", method), - zap.String("path", path), zap.String("addr", addr), zap.String("resp", resp), zap.Error(requestErr)) - if err == nil { - err = requestErr - } + for i, addr := range addrs { + var reader io.Reader + if readers != nil && i < len(readers) { + reader = readers[i] + } + resp, err := requestOne(method, addr, path, reader) + if err != nil { + logutil.Logger(ctx).Error("traffic request to tiproxy failed", zap.String("path", path), zap.String("addr", addr), + zap.String("resp", resp), zap.Error(err)) + return resps, errors.Wrapf(err, "request to tiproxy '%s' failed", addr) } else { resps[addr] = resp } } - if err == nil { - logutil.Logger(ctx).Info("traffic request to tiproxy succeeds", zap.Strings("addrs", addrs), zap.String("path", path)) - } - return resps, err + logutil.Logger(ctx).Info("traffic request to tiproxy succeeds", zap.Strings("addrs", addrs), zap.String("path", path)) + return resps, nil } func getTiProxyAddrs(ctx context.Context) ([]string, error) { @@ -202,7 +249,7 @@ func getTiProxyAddrs(ctx context.Context) ([]string, error) { } func requestOne(method, addr, path string, rd io.Reader) (string, error) { - url := fmt.Sprintf("%s://%s/%s", util.InternalHTTPSchema(), addr, path) + url := fmt.Sprintf("%s://%s%s", util.InternalHTTPSchema(), addr, path) req, err := http.NewRequest(method, url, rd) if err != nil { return "", errors.Trace(err) @@ -242,3 +289,85 @@ func parseTime(ctx context.Context, exec exec.BaseExecutor, timeStr string) type } return types.NewTime(types.FromGoTime(t), mysql.TypeDatetime, types.MaxFsp) } + +func formReader4Capture(args map[string]string, tiproxyNum int) ([]io.Reader, error) { + output, ok := args[outputKey] + if !ok || len(output) == 0 { + return nil, errors.New("the output path for capture must be specified") + } + u, err := url.Parse(output) + if err != nil { + return nil, errors.Wrapf(err, "parse output path failed") + } + readers := make([]io.Reader, tiproxyNum) + switch u.Scheme { + case "", "file", "local": + form := getForm(args) + for i := 0; i < tiproxyNum; i++ { + readers[i] = strings.NewReader(form) + } + default: + for i := 0; i < tiproxyNum; i++ { + m := maps.Clone(args) + m[outputKey] = u.JoinPath(fmt.Sprintf("%s%d", filePrefix, i)).String() + form := getForm(m) + readers[i] = strings.NewReader(form) + } + } + return readers, nil +} + +func formReader4Replay(ctx context.Context, args map[string]string, tiproxyNum int) ([]io.Reader, error) { + input, ok := args[inputKey] + if !ok || len(input) == 0 { + return nil, errors.New("the input path for replay must be specified") + } + u, err := storage.ParseRawURL(input) + if err != nil { + return nil, errors.Wrapf(err, "parse input path failed") + } + if storage.IsLocal(u) { + readers := make([]io.Reader, tiproxyNum) + form := getForm(args) + for i := 0; i < tiproxyNum; i++ { + readers[i] = strings.NewReader(form) + } + return readers, nil + } + + backend, err := storage.ParseBackendFromURL(u, nil) + if err != nil { + return nil, errors.Wrapf(err, "parse backend from the input path failed") + } + store, err := storage.NewWithDefaultOpt(ctx, backend) + if err != nil { + return nil, errors.Wrapf(err, "create storage for input failed") + } + defer store.Close() + + names := make([]string, 0, tiproxyNum) + if mockNames := ctx.Value(trafficPathKey); mockNames != nil { + names = mockNames.([]string) + } else { + err = store.WalkDir(ctx, &storage.WalkOption{ + ObjPrefix: filePrefix, + }, func(name string, _ int64) error { + names = append(names, name) + return nil + }) + if err != nil { + return nil, errors.Wrapf(err, "walk input path failed") + } + } + if len(names) == 0 { + return nil, errors.New("no replay files found in the input path") + } + readers := make([]io.Reader, 0, len(names)) + for _, name := range names { + m := maps.Clone(args) + m[inputKey] = u.JoinPath(name).String() + form := getForm(m) + readers = append(readers, strings.NewReader(form)) + } + return readers, nil +} diff --git a/pkg/executor/traffic_test.go b/pkg/executor/traffic_test.go index 974bf8df279d7..0872a499bb0c5 100644 --- a/pkg/executor/traffic_test.go +++ b/pkg/executor/traffic_test.go @@ -50,7 +50,7 @@ func TestTrafficForm(t *testing.T) { { sql: "traffic capture to '/tmp' duration='1s' encryption_method='aes' compress=false", method: http.MethodPost, - path: "/api/traffic/capture", + path: capturePath, form: url.Values{ "output": []string{"/tmp"}, "duration": []string{"1s"}, @@ -62,7 +62,7 @@ func TestTrafficForm(t *testing.T) { { sql: "traffic capture to '/tmp' duration='1s'", method: http.MethodPost, - path: "/api/traffic/capture", + path: capturePath, form: url.Values{ "output": []string{"/tmp"}, "duration": []string{"1s"}, @@ -72,7 +72,7 @@ func TestTrafficForm(t *testing.T) { { sql: "traffic replay from '/tmp' user='root' password='123456' speed=1.0 read_only=true", method: http.MethodPost, - path: "/api/traffic/replay", + path: replayPath, form: url.Values{ "input": []string{"/tmp"}, "username": []string{"root"}, @@ -85,7 +85,7 @@ func TestTrafficForm(t *testing.T) { { sql: "traffic replay from '/tmp' user='root'", method: http.MethodPost, - path: "/api/traffic/replay", + path: replayPath, form: url.Values{ "input": []string{"/tmp"}, "username": []string{"root"}, @@ -95,13 +95,13 @@ func TestTrafficForm(t *testing.T) { { sql: "cancel traffic jobs", method: http.MethodPost, - path: "/api/traffic/cancel", + path: cancelPath, form: url.Values{}, }, { sql: "show traffic jobs", method: http.MethodGet, - path: "/api/traffic/show", + path: showPath, form: url.Values{}, }, } @@ -125,21 +125,32 @@ func TestTrafficForm(t *testing.T) { test.form.Add("start-time", actualForm.Get("start-time")) } require.Equal(t, test.form, actualForm, "case %d", i) + require.EqualValues(t, 0, suite.execBuilder.ctx.GetSessionVars().StmtCtx.WarningCount(), "case %d", i) } } func TestTrafficError(t *testing.T) { suite := newTrafficTestSuite(t, 10) ctx := context.TODO() - exec := suite.build(ctx, "cancel traffic jobs") + exec := suite.build(ctx, "traffic capture to 'test://tmp ?' duration='1s'") // no tiproxy m := make(map[string]*infosync.TiProxyServerInfo) tempCtx := context.WithValue(ctx, tiproxyAddrKey, m) require.ErrorContains(t, exec.Next(tempCtx, nil), "no tiproxy server found") - // tiproxy no response + // invalid file path m["127.0.0.1:0"] = &infosync.TiProxyServerInfo{IP: "127.0.0.1", StatusPort: "0"} + require.ErrorContains(t, exec.Next(tempCtx, nil), "parse output path failed") + + // can't connect to s3 + replayCtx, cancel := context.WithCancel(tempCtx) + cancel() + exec = suite.build(replayCtx, "traffic replay from 's3://bucket/tmp' user='root' password='123456'") + require.ErrorContains(t, exec.Next(replayCtx, nil), "context canceled") + + // tiproxy no response + exec = suite.build(tempCtx, "traffic capture to '/tmp' duration='1s'") require.ErrorContains(t, exec.Next(tempCtx, nil), "dial tcp") // tiproxy responds with error @@ -150,6 +161,86 @@ func TestTrafficError(t *testing.T) { require.ErrorContains(t, exec.Next(tempCtx, nil), "500 Internal Server Error") } +func TestCapturePath(t *testing.T) { + tiproxyNum := 3 + handlers := make([]*mockHTTPHandler, 0, tiproxyNum) + servers := make([]*http.Server, 0, tiproxyNum) + ports := make([]int, 0, tiproxyNum) + for i := 0; i < tiproxyNum; i++ { + httpHandler := &mockHTTPHandler{t: t, httpOK: true} + handlers = append(handlers, httpHandler) + server, port := runServer(t, httpHandler) + servers = append(servers, server) + ports = append(ports, port) + } + defer func() { + for _, server := range servers { + server.Close() + } + }() + + ctx := context.TODO() + tempCtx := fillCtxWithTiProxyAddr(ctx, ports) + suite := newTrafficTestSuite(t, 10) + exec := suite.build(ctx, "traffic capture to 's3://bucket/tmp' duration='1s'") + require.NoError(t, exec.Next(tempCtx, nil)) + + for i := 0; i < tiproxyNum; i++ { + httpHandler := handlers[i] + require.Equal(t, http.MethodPost, httpHandler.getMethod()) + require.Equal(t, capturePath, httpHandler.getPath()) + require.Equal(t, fmt.Sprintf("s3://bucket/tmp/%s%d", filePrefix, i), httpHandler.getForm().Get("output"), "case %d", i) + } +} + +func TestReplayPath(t *testing.T) { + tiproxyNum := 2 + handlers := make([]*mockHTTPHandler, 0, tiproxyNum) + servers := make([]*http.Server, 0, tiproxyNum) + ports := make([]int, 0, tiproxyNum) + for i := 0; i < tiproxyNum; i++ { + httpHandler := &mockHTTPHandler{t: t, httpOK: true} + handlers = append(handlers, httpHandler) + server, port := runServer(t, httpHandler) + servers = append(servers, server) + ports = append(ports, port) + } + defer func() { + for _, server := range servers { + server.Close() + } + }() + + tests := [][]string{ + {"tiproxy-0"}, + {"tiproxy-0", "tiproxy-1"}, + {"tiproxy-0", "tiproxy-1", "tiproxy-2"}, + } + + for i, test := range tests { + ctx := context.TODO() + tempCtx := fillCtxWithTiProxyAddr(ctx, ports) + tempCtx = context.WithValue(tempCtx, trafficPathKey, test) + suite := newTrafficTestSuite(t, 10) + exec := suite.build(ctx, "traffic replay from 's3://bucket/tmp' user='root'") + for j := 0; j < tiproxyNum; j++ { + handlers[j].reset() + } + require.NoError(t, exec.Next(tempCtx, nil)) + + for j := 0; j < tiproxyNum; j++ { + httpHandler := handlers[j] + if j < len(test) { + require.Equal(t, http.MethodPost, httpHandler.getMethod()) + require.Equal(t, replayPath, httpHandler.getPath()) + require.Equal(t, fmt.Sprintf("s3://bucket/tmp/%s%d", filePrefix, j), httpHandler.getForm().Get("input"), "case %d-%d", i, j) + } else { + require.Nil(t, httpHandler.getForm(), "case %d-%d", i, j) + } + } + } +} + func TestTrafficShow(t *testing.T) { suite := newTrafficTestSuite(t, 2) ctx := context.TODO() @@ -293,6 +384,14 @@ func (handler *mockHTTPHandler) getPath() string { return handler.path } +func (handler *mockHTTPHandler) reset() { + handler.Lock() + defer handler.Unlock() + handler.form = nil + handler.method = "" + handler.path = "" +} + func (handler *mockHTTPHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { handler.Lock() defer handler.Unlock() From a66c1c2cbc3bba254ca768ef6a6c3c34f005e2be Mon Sep 17 00:00:00 2001 From: djshow832 Date: Fri, 10 Jan 2025 16:26:39 +0800 Subject: [PATCH 2/5] fix linter --- pkg/executor/traffic.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pkg/executor/traffic.go b/pkg/executor/traffic.go index 592f149764325..1232505c444b7 100644 --- a/pkg/executor/traffic.go +++ b/pkg/executor/traffic.go @@ -219,9 +219,8 @@ func request(ctx context.Context, addrs []string, readers []io.Reader, method, p logutil.Logger(ctx).Error("traffic request to tiproxy failed", zap.String("path", path), zap.String("addr", addr), zap.String("resp", resp), zap.Error(err)) return resps, errors.Wrapf(err, "request to tiproxy '%s' failed", addr) - } else { - resps[addr] = resp } + resps[addr] = resp } logutil.Logger(ctx).Info("traffic request to tiproxy succeeds", zap.Strings("addrs", addrs), zap.String("path", path)) return resps, nil From fd0f30ac8d0a764c892780f411540b058f02adc2 Mon Sep 17 00:00:00 2001 From: djshow832 Date: Fri, 10 Jan 2025 17:24:18 +0800 Subject: [PATCH 3/5] remove e --- pkg/executor/traffic.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/executor/traffic.go b/pkg/executor/traffic.go index 1232505c444b7..e5d01d68926fd 100644 --- a/pkg/executor/traffic.go +++ b/pkg/executor/traffic.go @@ -136,7 +136,7 @@ type TrafficCancelExec struct { } // Next implements the Executor Next interface. -func (e *TrafficCancelExec) Next(ctx context.Context, _ *chunk.Chunk) error { +func (*TrafficCancelExec) Next(ctx context.Context, _ *chunk.Chunk) error { addrs, err := getTiProxyAddrs(ctx) if err != nil { return errors.Wrapf(err, "get tiproxy addresses failed") From 062292bc1f4bec11c543fa6c90adee33b316faa3 Mon Sep 17 00:00:00 2001 From: djshow832 Date: Fri, 10 Jan 2025 18:42:00 +0800 Subject: [PATCH 4/5] fix unstable TestReplayPath --- pkg/executor/traffic_test.go | 27 ++++++++++++++++++--------- 1 file changed, 18 insertions(+), 9 deletions(-) diff --git a/pkg/executor/traffic_test.go b/pkg/executor/traffic_test.go index 0872a499bb0c5..205e15bd99f90 100644 --- a/pkg/executor/traffic_test.go +++ b/pkg/executor/traffic_test.go @@ -20,7 +20,9 @@ import ( "net" "net/http" "net/url" + "sort" "strconv" + "strings" "sync" "testing" @@ -217,10 +219,10 @@ func TestReplayPath(t *testing.T) { {"tiproxy-0", "tiproxy-1", "tiproxy-2"}, } + ctx := context.TODO() + ctx = fillCtxWithTiProxyAddr(ctx, ports) for i, test := range tests { - ctx := context.TODO() - tempCtx := fillCtxWithTiProxyAddr(ctx, ports) - tempCtx = context.WithValue(tempCtx, trafficPathKey, test) + tempCtx := context.WithValue(ctx, trafficPathKey, test) suite := newTrafficTestSuite(t, 10) exec := suite.build(ctx, "traffic replay from 's3://bucket/tmp' user='root'") for j := 0; j < tiproxyNum; j++ { @@ -228,16 +230,23 @@ func TestReplayPath(t *testing.T) { } require.NoError(t, exec.Next(tempCtx, nil)) + paths := make([]string, 0, len(test)) for j := 0; j < tiproxyNum; j++ { httpHandler := handlers[j] - if j < len(test) { - require.Equal(t, http.MethodPost, httpHandler.getMethod()) - require.Equal(t, replayPath, httpHandler.getPath()) - require.Equal(t, fmt.Sprintf("s3://bucket/tmp/%s%d", filePrefix, j), httpHandler.getForm().Get("input"), "case %d-%d", i, j) - } else { - require.Nil(t, httpHandler.getForm(), "case %d-%d", i, j) + if httpHandler.getMethod() != "" { + form := httpHandler.getForm() + require.NotEmpty(t, form) + input := form.Get("input") + require.True(t, strings.HasPrefix(input, "s3://bucket/tmp/"), input) + paths = append(paths, input[len("s3://bucket/tmp/"):]) } + sort.Strings(paths) + } + expectedPaths := test + if len(test) > tiproxyNum { + expectedPaths = test[:tiproxyNum] } + require.Equal(t, expectedPaths, paths, "case %d", i) } } From 641faef490881c378799b151e78c1bae06baef73 Mon Sep 17 00:00:00 2001 From: djshow832 Date: Fri, 10 Jan 2025 19:27:22 +0800 Subject: [PATCH 5/5] fix tests fail on CI machine --- pkg/executor/traffic.go | 19 +++++++++---------- pkg/executor/traffic_test.go | 13 +++++++++---- 2 files changed, 18 insertions(+), 14 deletions(-) diff --git a/pkg/executor/traffic.go b/pkg/executor/traffic.go index e5d01d68926fd..251e7faa12fca 100644 --- a/pkg/executor/traffic.go +++ b/pkg/executor/traffic.go @@ -334,20 +334,19 @@ func formReader4Replay(ctx context.Context, args map[string]string, tiproxyNum i return readers, nil } - backend, err := storage.ParseBackendFromURL(u, nil) - if err != nil { - return nil, errors.Wrapf(err, "parse backend from the input path failed") - } - store, err := storage.NewWithDefaultOpt(ctx, backend) - if err != nil { - return nil, errors.Wrapf(err, "create storage for input failed") - } - defer store.Close() - names := make([]string, 0, tiproxyNum) if mockNames := ctx.Value(trafficPathKey); mockNames != nil { names = mockNames.([]string) } else { + backend, err := storage.ParseBackendFromURL(u, nil) + if err != nil { + return nil, errors.Wrapf(err, "parse backend from the input path failed") + } + store, err := storage.NewWithDefaultOpt(ctx, backend) + if err != nil { + return nil, errors.Wrapf(err, "create storage for input failed") + } + defer store.Close() err = store.WalkDir(ctx, &storage.WalkOption{ ObjPrefix: filePrefix, }, func(name string, _ int64) error { diff --git a/pkg/executor/traffic_test.go b/pkg/executor/traffic_test.go index 205e15bd99f90..58914b9155186 100644 --- a/pkg/executor/traffic_test.go +++ b/pkg/executor/traffic_test.go @@ -187,12 +187,17 @@ func TestCapturePath(t *testing.T) { exec := suite.build(ctx, "traffic capture to 's3://bucket/tmp' duration='1s'") require.NoError(t, exec.Next(tempCtx, nil)) + paths := make([]string, 0, tiproxyNum) + expectedPaths := make([]string, 0, tiproxyNum) for i := 0; i < tiproxyNum; i++ { httpHandler := handlers[i] - require.Equal(t, http.MethodPost, httpHandler.getMethod()) - require.Equal(t, capturePath, httpHandler.getPath()) - require.Equal(t, fmt.Sprintf("s3://bucket/tmp/%s%d", filePrefix, i), httpHandler.getForm().Get("output"), "case %d", i) + output := httpHandler.getForm().Get("output") + require.True(t, strings.HasPrefix(output, "s3://bucket/tmp/"), output) + paths = append(paths, output[len("s3://bucket/tmp/"):]) + expectedPaths = append(expectedPaths, fmt.Sprintf("tiproxy-%d", i)) } + sort.Strings(paths) + require.Equal(t, expectedPaths, paths) } func TestReplayPath(t *testing.T) { @@ -240,8 +245,8 @@ func TestReplayPath(t *testing.T) { require.True(t, strings.HasPrefix(input, "s3://bucket/tmp/"), input) paths = append(paths, input[len("s3://bucket/tmp/"):]) } - sort.Strings(paths) } + sort.Strings(paths) expectedPaths := test if len(test) > tiproxyNum { expectedPaths = test[:tiproxyNum]