From e17badba0a3212275909a8d8b6e9604dfe99b4ae Mon Sep 17 00:00:00 2001 From: David Taylor Date: Tue, 14 Jan 2025 21:02:52 +0000 Subject: [PATCH] jobs: remove legacy error log Release note: none. Epic: none. --- pkg/jobs/BUILD.bazel | 2 - pkg/jobs/jobs.go | 90 --------- pkg/jobs/jobspb/jobs.proto | 7 +- pkg/jobs/registry.go | 23 +-- pkg/jobs/registry_external_test.go | 297 ----------------------------- pkg/server/admin.go | 17 -- pkg/sql/crdb_internal.go | 17 -- 7 files changed, 7 insertions(+), 446 deletions(-) diff --git a/pkg/jobs/BUILD.bazel b/pkg/jobs/BUILD.bazel index afeeba9a9a48..49025fc7df8a 100644 --- a/pkg/jobs/BUILD.bazel +++ b/pkg/jobs/BUILD.bazel @@ -64,7 +64,6 @@ go_library( "//pkg/util/cidr", "//pkg/util/envutil", "//pkg/util/hlc", - "//pkg/util/json", "//pkg/util/log", "//pkg/util/log/eventpb", "//pkg/util/log/severity", @@ -83,7 +82,6 @@ go_library( "@com_github_cockroachdb_errors//oserror", "@com_github_cockroachdb_logtags//:logtags", "@com_github_cockroachdb_redact//:redact", - "@com_github_gogo_protobuf//jsonpb", "@com_github_gogo_protobuf//proto", "@com_github_gogo_protobuf//types", "@com_github_klauspost_compress//gzip", diff --git a/pkg/jobs/jobs.go b/pkg/jobs/jobs.go index f9569c5b75d6..3352a962d448 100644 --- a/pkg/jobs/jobs.go +++ b/pkg/jobs/jobs.go @@ -6,9 +6,7 @@ package jobs import ( - "bytes" "context" - gojson "encoding/json" "fmt" "reflect" "sync/atomic" @@ -21,13 +19,10 @@ import ( "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/isql" - "github.com/cockroachdb/cockroach/pkg/sql/protoreflect" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness" - "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/util" - "github.com/cockroachdb/cockroach/pkg/util/json" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/protoutil" "github.com/cockroachdb/cockroach/pkg/util/syncutil" @@ -36,7 +31,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb" "github.com/cockroachdb/errors" "github.com/cockroachdb/redact" - "github.com/gogo/protobuf/jsonpb" ) // Job manages logging the progress of long-running system processes, like @@ -892,90 +886,6 @@ func (sj *StartableJob) recordStart() (alreadyStarted bool) { return atomic.AddInt64(&sj.starts, 1) != 1 } -// ParseRetriableExecutionErrorLogFromJSON inverts the output of -// FormatRetriableExecutionErrorLogToJSON. -func ParseRetriableExecutionErrorLogFromJSON( - log []byte, -) ([]*jobspb.RetriableExecutionFailure, error) { - var jsonArr []gojson.RawMessage - if err := gojson.Unmarshal(log, &jsonArr); err != nil { - return nil, errors.Wrap(err, "failed to decode json array for execution log") - } - ret := make([]*jobspb.RetriableExecutionFailure, len(jsonArr)) - - json := jsonpb.Unmarshaler{AllowUnknownFields: true} - var reader bytes.Reader - for i, data := range jsonArr { - msgI, err := protoreflect.NewMessage("cockroach.sql.jobs.jobspb.RetriableExecutionFailure") - if err != nil { - return nil, errors.WithAssertionFailure(err) - } - msg := msgI.(*jobspb.RetriableExecutionFailure) - reader.Reset(data) - if err := json.Unmarshal(&reader, msg); err != nil { - return nil, err - } - ret[i] = msg - } - return ret, nil -} - -// FormatRetriableExecutionErrorLogToJSON extracts the events -// stored in the payload, formats them into a json array. This function -// is intended for use with crdb_internal.jobs. Note that the error will -// be flattened into a string and stored in the TruncatedError field. -func FormatRetriableExecutionErrorLogToJSON( - ctx context.Context, log []*jobspb.RetriableExecutionFailure, -) (*tree.DJSON, error) { - ab := json.NewArrayBuilder(len(log)) - for i := range log { - ev := *log[i] - if ev.Error != nil { - ev.TruncatedError = errors.DecodeError(ctx, *ev.Error).Error() - ev.Error = nil - } - msg, err := protoreflect.MessageToJSON(&ev, protoreflect.FmtFlags{ - EmitDefaults: false, - }) - if err != nil { - return nil, err - } - ab.Add(msg) - } - return tree.NewDJSON(ab.Build()), nil -} - -// FormatRetriableExecutionErrorLogToStringArray extracts the events -// stored in the payload, formats them into strings and returns them as an -// array of strings. This function is intended for use with crdb_internal.jobs. -func FormatRetriableExecutionErrorLogToStringArray( - ctx context.Context, log []*jobspb.RetriableExecutionFailure, -) *tree.DArray { - arr := tree.NewDArray(types.String) - for _, ev := range log { - if ev == nil { // no reason this should happen, but be defensive - continue - } - var cause error - if ev.Error != nil { - cause = errors.DecodeError(ctx, *ev.Error) - } else { - cause = fmt.Errorf("(truncated) %s", ev.TruncatedError) - } - msg := formatRetriableExecutionFailure( - ev.InstanceID, - Status(ev.Status), - timeutil.FromUnixMicros(ev.ExecutionStartMicros), - timeutil.FromUnixMicros(ev.ExecutionEndMicros), - cause, - ) - // We really don't care about errors here. I'd much rather see nothing - // in my log than crash. - _ = arr.Append(tree.NewDString(msg)) - } - return arr -} - // GetJobTraceID returns the current trace ID of the job from the job progress. func GetJobTraceID(ctx context.Context, db isql.DB, jobID jobspb.JobID) (tracingpb.TraceID, error) { var traceID tracingpb.TraceID diff --git a/pkg/jobs/jobspb/jobs.proto b/pkg/jobs/jobspb/jobs.proto index cacae1048d15..4f0fda95d7ff 100644 --- a/pkg/jobs/jobspb/jobs.proto +++ b/pkg/jobs/jobspb/jobs.proto @@ -1481,12 +1481,9 @@ message Payload { // PauseReason is used to describe the reason that the job is currently paused // or has been requested to be paused. string pause_reason = 28; - // RetriableExecutionFailureLog stores a history of retriable execution - // failures. These failures may occur in either the RUNNING or REVERTING - // status. A finite number of these entries will be kept, as governed by - // the jobs.execution_errors.max_entries cluster setting. - repeated RetriableExecutionFailure retriable_execution_failure_log = 32; + reserved 32; + // CreationClusterID is populated at creation with the ClusterID, in case a // job resuming later, needs to use this information, e.g. to determine if it // has been restored into a different cluster, which might mean it should diff --git a/pkg/jobs/registry.go b/pkg/jobs/registry.go index 6b77540a20dc..74899bece55c 100644 --- a/pkg/jobs/registry.go +++ b/pkg/jobs/registry.go @@ -1983,25 +1983,12 @@ func (r *Registry) maybeRecordExecutionFailure(ctx context.Context, err error, j if !errors.As(err, &efe) { return } - - updateErr := j.NoTxn().Update(ctx, func( - txn isql.Txn, md JobMetadata, ju *JobUpdater, - ) error { - pl := md.Payload - { // Append the entry to the log - maxSize := int(executionErrorsMaxEntrySize.Get(&r.settings.SV)) - pl.RetriableExecutionFailureLog = append(pl.RetriableExecutionFailureLog, - efe.toRetriableExecutionFailure(ctx, maxSize)) - } - { // Maybe truncate the log. - maxEntries := int(executionErrorsMaxEntriesSetting.Get(&r.settings.SV)) - log := &pl.RetriableExecutionFailureLog - if len(*log) > maxEntries { - *log = (*log)[len(*log)-maxEntries:] - } + updateErr := r.db.Txn(ctx, func(ctx context.Context, txn isql.Txn) error { + v, err := txn.GetSystemSchemaVersion(ctx) + if err != nil || v.Less(clusterversion.V25_1.Version()) { + return err } - ju.UpdatePayload(pl) - return nil + return j.Messages().Record(ctx, txn, "retry", efe.cause.Error()) }) if ctx.Err() != nil { return diff --git a/pkg/jobs/registry_external_test.go b/pkg/jobs/registry_external_test.go index a7b0e5bdd0b2..3f60a9220987 100644 --- a/pkg/jobs/registry_external_test.go +++ b/pkg/jobs/registry_external_test.go @@ -12,7 +12,6 @@ import ( "fmt" "reflect" "regexp" - "strconv" "strings" "sync/atomic" "testing" @@ -29,7 +28,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/lease" "github.com/cockroachdb/cockroach/pkg/sql/isql" - "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness" "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slstorage" @@ -39,7 +37,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" - "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/errors" "github.com/stretchr/testify/require" @@ -417,300 +414,6 @@ func TestGCDurationControl(t *testing.T) { tdb.CheckQueryResultsRetry(t, existsQuery, [][]string{{"false"}}) } -// TestErrorsPopulatedOnRetry confirms that when a job fails with a retriable -// error that that error makes its way to the payload. -func TestErrorsPopulatedOnRetry(t *testing.T) { - defer leaktest.AfterTest(t)() - ls := log.Scope(t) - defer ls.Close(t) - - type event struct { - id jobspb.JobID - resume chan error - } - mkEvent := func(j *jobs.Job) event { - return event{id: j.ID(), resume: make(chan error)} - } - evChan := make(chan event) - defer jobs.TestingRegisterConstructor(jobspb.TypeImport, func(j *jobs.Job, cs *cluster.Settings) jobs.Resumer { - execFn := func(ctx context.Context) error { - ev := mkEvent(j) - select { - case evChan <- ev: - case <-ctx.Done(): - return ctx.Err() - } - select { - case err := <-ev.resume: - return err - case <-ctx.Done(): - return ctx.Err() - } - } - return jobstest.FakeResumer{ - OnResume: execFn, - FailOrCancel: execFn, - } - }, jobs.UsesTenantCostControl)() - s, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{ - Knobs: base.TestingKnobs{ - JobsTestingKnobs: jobs.NewTestingKnobsWithShortIntervals(), - }, - }) - tdb := sqlutils.MakeSQLRunner(sqlDB) - ctx := context.Background() - defer s.Stopper().Stop(ctx) - registry := s.JobRegistry().(*jobs.Registry) - mkJob := func(t *testing.T) jobspb.JobID { - id := registry.MakeJobID() - _, err := registry.CreateJobWithTxn(ctx, jobs.Record{ - // Job does not accept an empty Details field, so arbitrarily provide - // ImportDetails. - Details: jobspb.ImportDetails{}, - Progress: jobspb.ImportProgress{}, - Username: username.TestUserName(), - }, id, nil /* txn */) - require.NoError(t, err) - return id - } - type parsedError struct { - start, end time.Time - status jobs.Status - error string - instance base.SQLInstanceID - } - var ( - executionErrorRE = regexp.MustCompile( - `(?P\w+) execution from '(?P.*)' to '(?P.*)' on (?P\d+) failed: (?P.*)`, - ) - statusIdx = executionErrorRE.SubexpIndex("status") - startIdx = executionErrorRE.SubexpIndex("Start") - endIdx = executionErrorRE.SubexpIndex("End") - instanceIdx = executionErrorRE.SubexpIndex("instance") - errorIdx = executionErrorRE.SubexpIndex("error") - ) - parseTimestamp := func(t *testing.T, s string) time.Time { - ptc := tree.NewParseContext(timeutil.Now()) - ts, _, err := tree.ParseDTimestamp(ptc, s, time.Microsecond) - require.NoError(t, err) - return ts.Time - } - parseInstanceID := func(t *testing.T, s string) base.SQLInstanceID { - i, err := strconv.ParseInt(s, 10, 32) - require.NoError(t, err) - return base.SQLInstanceID(i) - } - parseExecutionError := func(t *testing.T, s string) (ret parsedError) { - matches := executionErrorRE.FindStringSubmatch(s) - require.NotNil(t, matches) - ret.status = jobs.Status(matches[statusIdx]) - ret.start = parseTimestamp(t, matches[startIdx]) - ret.end = parseTimestamp(t, matches[endIdx]) - ret.instance = parseInstanceID(t, matches[instanceIdx]) - ret.error = matches[errorIdx] - return ret - } - parseExecutionErrors := func(t *testing.T, s [][]string) (ret []parsedError) { - for _, res := range s { - require.Len(t, res, 1) - ret = append(ret, parseExecutionError(t, res[0])) - } - return ret - } - executionErrorEqual := func(t *testing.T, a, b parsedError) { - require.Equal(t, a.instance, b.instance) - require.Equal(t, a.error, b.error) - require.Equal(t, a.status, b.status) - } - waitForEvent := func(t *testing.T, id jobspb.JobID) (ev event, start time.Time) { - ev = <-evChan - require.Equal(t, id, ev.id) - tdb.QueryRow(t, "SELECT now() FROM crdb_internal.jobs WHERE job_id = $1", id).Scan(&start) - return ev, start - } - checkExecutionError := func( - t *testing.T, execErr parsedError, status jobs.Status, _, _ time.Time, cause string, - ) { - require.Equal(t, base.SQLInstanceID(1), execErr.instance) - require.Equal(t, status, execErr.status) - require.Equal(t, cause, execErr.error) - } - getExecErrors := func(t *testing.T, id jobspb.JobID) []parsedError { - return parseExecutionErrors(t, - tdb.QueryStr(t, ` -SELECT unnest(execution_errors) - FROM crdb_internal.jobs - WHERE job_id = $1;`, id), - ) - } - checkLogEntry := func( - t *testing.T, id jobspb.JobID, status jobs.Status, - from, to time.Time, cause string, - ) { - log.FlushFiles() - entries, err := log.FetchEntriesFromFiles( - from.UnixNano(), to.UnixNano(), 2, - regexp.MustCompile(fmt.Sprintf( - "job %d: %s execution encountered retriable error: %s", - id, status, cause, - )), - log.WithFlattenedSensitiveData, - ) - require.NoError(t, err) - require.Len(t, entries, 1) - } - t.Run("retriable error makes it into payload", func(t *testing.T) { - id := mkJob(t) - firstRun, firstStart := waitForEvent(t, id) - const err1 = "boom1" - firstRun.resume <- jobs.MarkAsRetryJobError(errors.New(err1)) - - // Wait for the job to get restarted. - secondRun, secondStart := waitForEvent(t, id) - - // Confirm the previous execution error was properly recorded. - var firstExecErr parsedError - { - execErrs := getExecErrors(t, id) - require.Len(t, execErrs, 1) - firstExecErr = execErrs[0] - checkExecutionError(t, firstExecErr, jobs.StatusRunning, firstStart, secondStart, err1) - checkLogEntry(t, id, jobs.StatusRunning, firstStart, secondStart, err1) - } - const err2 = "boom2" - secondRun.resume <- jobs.MarkAsRetryJobError(errors.New(err2)) - thirdRun, thirdStart := waitForEvent(t, id) - { - execErrs := getExecErrors(t, id) - require.Len(t, execErrs, 2) - executionErrorEqual(t, firstExecErr, execErrs[0]) - secondExecErr := execErrs[1] - checkExecutionError(t, secondExecErr, jobs.StatusRunning, secondStart, thirdStart, err2) - checkLogEntry(t, id, jobs.StatusRunning, secondStart, thirdStart, err2) - } - close(thirdRun.resume) - require.NoError(t, registry.WaitForJobs(ctx, []jobspb.JobID{id})) - }) - t.Run("fail or cancel error", func(t *testing.T) { - id := mkJob(t) - firstRun, firstStart := waitForEvent(t, id) - const err1 = "boom1" - firstRun.resume <- jobs.MarkAsRetryJobError(errors.New(err1)) - - // Wait for the job to get restarted. - secondRun, secondStart := waitForEvent(t, id) - - // Confirm the previous execution error was properly recorded. - var firstExecErr parsedError - { - execErrs := getExecErrors(t, id) - require.Len(t, execErrs, 1) - firstExecErr = execErrs[0] - checkExecutionError(t, firstExecErr, jobs.StatusRunning, firstStart, secondStart, err1) - checkLogEntry(t, id, jobs.StatusRunning, firstStart, secondStart, err1) - } - const err2 = "boom2" - secondRun.resume <- errors.New(err2) - thirdRun, thirdStart := waitForEvent(t, id) // thirdRun is Reverting - // Confirm that no new error was recorded in the log. It will be in - // FinalResumeError. - { - execErrs := getExecErrors(t, id) - require.Len(t, execErrs, 1) - executionErrorEqual(t, firstExecErr, execErrs[0]) - } - const err3 = "boom3" - thirdRun.resume <- jobs.MarkAsRetryJobError(errors.New(err3)) - fourthRun, fourthStart := waitForEvent(t, id) - { - execErrs := getExecErrors(t, id) - require.Len(t, execErrs, 2) - executionErrorEqual(t, firstExecErr, execErrs[0]) - checkExecutionError(t, execErrs[1], jobs.StatusReverting, thirdStart, fourthStart, err3) - checkLogEntry(t, id, jobs.StatusReverting, thirdStart, fourthStart, err3) - } - close(fourthRun.resume) - require.Regexp(t, err2, registry.WaitForJobs(ctx, []jobspb.JobID{id})) - }) - t.Run("truncation", func(t *testing.T) { - id := mkJob(t) - firstRun, firstStart := waitForEvent(t, id) - const maxSize, largeSize = 2 << 10, 8 << 10 - tdb.Exec(t, "SET CLUSTER SETTING "+jobs.ExecutionErrorsMaxEntrySizeKey+" = $1", maxSize) - tdb.Exec(t, "SET CLUSTER SETTING "+jobs.ExecutionErrorsMaxEntriesKey+" = $1", 1) - err1 := strings.Repeat("a", largeSize) - firstRun.resume <- jobs.MarkAsRetryJobError(fmt.Errorf("%s", err1)) - - // Wait for the job to get restarted. - secondRun, secondStart := waitForEvent(t, id) - // Confirm the previous execution error was properly recorded. - var firstExecErr parsedError - { - execErrs := getExecErrors(t, id) - require.Len(t, execErrs, 1) - firstExecErr = execErrs[0] - // Ensure we see the truncated error in the table but the full error - // in the logs. - expTruncatedError := "(truncated) " + err1[:maxSize] - checkExecutionError(t, firstExecErr, jobs.StatusRunning, firstStart, secondStart, expTruncatedError) - checkLogEntry(t, id, jobs.StatusRunning, firstStart, secondStart, err1) - } - const err2 = "boom2" - secondRun.resume <- jobs.MarkAsRetryJobError(errors.New(err2)) - thirdRun, thirdStart := waitForEvent(t, id) - var secondExecErr parsedError - { - execErrs := getExecErrors(t, id) - require.Len(t, execErrs, 1) - secondExecErr = execErrs[0] - checkExecutionError(t, secondExecErr, jobs.StatusRunning, secondStart, thirdStart, err2) - checkLogEntry(t, id, jobs.StatusRunning, secondStart, thirdStart, err2) - } - // Fail the job so we can also test the truncation of reverting retry - // errors. - const err3 = "boom3" - thirdRun.resume <- errors.New(err3) // not retriable - fourthRun, fourthStart := waitForEvent(t, id) // first Reverting run - { - execErrs := getExecErrors(t, id) - require.Len(t, execErrs, 1) - executionErrorEqual(t, secondExecErr, execErrs[0]) - } - err4 := strings.Repeat("b", largeSize) - fourthRun.resume <- jobs.MarkAsRetryJobError(fmt.Errorf("%s", err4)) - fifthRun, fifthStart := waitForEvent(t, id) - { - execErrs := getExecErrors(t, id) - require.Len(t, execErrs, 1) - // Ensure we see the truncated error in the table but the full error - // in the logs. - expTruncatedError := "(truncated) " + err4[:maxSize] - checkExecutionError(t, execErrs[0], jobs.StatusReverting, fourthStart, fifthStart, expTruncatedError) - checkLogEntry(t, id, jobs.StatusReverting, fourthStart, fifthStart, err4) - } - const err5 = "boom5" - fifthRun.resume <- jobs.MarkAsRetryJobError(errors.New(err5)) - sixthRun, sixthStart := waitForEvent(t, id) - { - execErrs := getExecErrors(t, id) - require.Len(t, execErrs, 1) - checkExecutionError(t, execErrs[0], jobs.StatusReverting, fifthStart, sixthStart, err5) - checkLogEntry(t, id, jobs.StatusReverting, fifthStart, sixthStart, err5) - } - const err6 = "boom5" - tdb.Exec(t, "SET CLUSTER SETTING "+jobs.ExecutionErrorsMaxEntriesKey+" = $1", 0) - sixthRun.resume <- jobs.MarkAsRetryJobError(errors.New(err6)) - seventhRun, seventhStart := waitForEvent(t, id) - { - execErrs := getExecErrors(t, id) - require.Len(t, execErrs, 0) - checkLogEntry(t, id, jobs.StatusReverting, sixthStart, seventhStart, err6) - } - close(seventhRun.resume) - require.Regexp(t, err3, registry.WaitForJobs(ctx, []jobspb.JobID{id})) - }) -} - // TestWaitWithRetryableError tests retryable errors when querying // for jobs. func TestWaitWithRetryableError(t *testing.T) { diff --git a/pkg/server/admin.go b/pkg/server/admin.go index 035c386a890d..6128a8c07e81 100644 --- a/pkg/server/admin.go +++ b/pkg/server/admin.go @@ -2395,23 +2395,6 @@ func scanRowIntoJob(scanner resultScanner, row tree.Datums, job *serverpb.JobRes if runningStatusOrNil != nil { job.RunningStatus = *runningStatusOrNil } - if executionFailuresOrNil != nil { - failures, err := jobs.ParseRetriableExecutionErrorLogFromJSON([]byte(*executionFailuresOrNil)) - if err != nil { - return errors.Wrap(err, "parse") - } - job.ExecutionFailures = make([]*serverpb.JobResponse_ExecutionFailure, len(failures)) - for i, f := range failures { - start := time.UnixMicro(f.ExecutionStartMicros) - end := time.UnixMicro(f.ExecutionEndMicros) - job.ExecutionFailures[i] = &serverpb.JobResponse_ExecutionFailure{ - Status: f.Status, - Start: &start, - End: &end, - Error: f.TruncatedError, - } - } - } if coordinatorOrNil != nil { job.CoordinatorID = *coordinatorOrNil } diff --git a/pkg/sql/crdb_internal.go b/pkg/sql/crdb_internal.go index 0b45ebeff0ee..52840ff1c7cc 100644 --- a/pkg/sql/crdb_internal.go +++ b/pkg/sql/crdb_internal.go @@ -1365,23 +1365,6 @@ func makeJobsTableRows( traceID = tree.NewDInt(tree.DInt(progress.TraceID)) } } - if len(payload.RetriableExecutionFailureLog) > 0 { - executionErrors = jobs.FormatRetriableExecutionErrorLogToStringArray( - ctx, payload.RetriableExecutionFailureLog, - ) - // It's not clear why we'd ever see an error here, - var err error - executionEvents, err = jobs.FormatRetriableExecutionErrorLogToJSON( - ctx, payload.RetriableExecutionFailureLog, - ) - if err != nil { - if errorStr == tree.DNull { - errorStr = tree.NewDString(errors.Wrap(err, "failed to marshal execution error log").Error()) - } else { - executionEvents = tree.DNull - } - } - } if err = addRow( id,