From 31054f9b5f54d1cf5d4e8503b34399c7b54f8ef9 Mon Sep 17 00:00:00 2001 From: Jonathan Gramain Date: Tue, 7 Jan 2025 16:26:32 -0800 Subject: [PATCH] impr: BKTCLT-36 go client: attach x-scal-request-uids header Add a x-scal-request-uids header to requests sent to bucketd, containing the existing UIDs passed by the caller via the appropriate option (if passed) --- go/admin_bucketrefreshcache.go | 5 +++-- go/admin_getbucketaccessmode.go | 4 ++-- go/admin_getbucketsessionid.go | 5 +++-- go/admin_getsessioninfo.go | 10 ++++++---- go/admin_getsessionleader.go | 5 +++-- go/admin_getsessionlog.go | 8 +++++--- go/admin_setbucketaccessmode.go | 4 ++-- go/bucketclientrequest.go | 12 ++++++++++++ go/bucketclientrequest_test.go | 15 +++++++++++++++ go/createbucket.go | 27 ++++++++++++++++++--------- go/createmetastoreentry.go | 4 ++-- go/deletebucket.go | 5 +++-- go/deletemetastoreentry.go | 5 +++-- go/getbucketattributes.go | 5 +++-- go/getmetastoreentry.go | 5 +++-- go/listbasic.go | 29 +++++++++++++++++++++-------- go/listobjectversions.go | 17 ++++++++++++++++- go/postbatch.go | 14 ++++++++------ go/putbucketattributes.go | 4 ++-- 19 files changed, 130 insertions(+), 53 deletions(-) diff --git a/go/admin_bucketrefreshcache.go b/go/admin_bucketrefreshcache.go index 8e91a35..957bc11 100644 --- a/go/admin_bucketrefreshcache.go +++ b/go/admin_bucketrefreshcache.go @@ -9,8 +9,9 @@ import ( // AdminBucketRefreshCache refreshes the bucketd cache of metastore // entries for the given bucket. Useful after switching the raft session // of a bucket. -func (client *BucketClient) AdminBucketRefreshCache(ctx context.Context, bucketName string) error { +func (client *BucketClient) AdminBucketRefreshCache(ctx context.Context, bucketName string, + opts ...RequestOption) error { resource := fmt.Sprintf("/_/buckets/%s/refreshCache", url.PathEscape(bucketName)) - _, err := client.Request(ctx, "AdminBucketRefreshCache", "GET", resource) + _, err := client.Request(ctx, "AdminBucketRefreshCache", "GET", resource, opts...) return err } diff --git a/go/admin_getbucketaccessmode.go b/go/admin_getbucketaccessmode.go index 014e90c..74b6868 100644 --- a/go/admin_getbucketaccessmode.go +++ b/go/admin_getbucketaccessmode.go @@ -11,11 +11,11 @@ import ( // - "read-only" when the bucket is only accessible for read operations. // Returns "" and an error if the bucket doesn't exist, or if a request error occurs. func (client *BucketClient) AdminGetBucketAccessMode(ctx context.Context, - bucketName string) (BucketAccessMode, error) { + bucketName string, opts ...RequestOption) (BucketAccessMode, error) { // Escape the bucket name to avoid any risk to inadvertently or maliciously // call another route with an incorrect/crafted bucket name containing slashes. resource := fmt.Sprintf("/_/buckets/%s/accessMode", url.PathEscape(bucketName)) - responseBody, err := client.Request(ctx, "AdminGetBucketAccessMode", "GET", resource) + responseBody, err := client.Request(ctx, "AdminGetBucketAccessMode", "GET", resource, opts...) if err != nil { return "", err } diff --git a/go/admin_getbucketsessionid.go b/go/admin_getbucketsessionid.go index 0c83569..7332d13 100644 --- a/go/admin_getbucketsessionid.go +++ b/go/admin_getbucketsessionid.go @@ -9,11 +9,12 @@ import ( // AdminGetBucketSessionID returns the raft session ID of the given bucket. // Returns 0 and an error if the bucket doesn't exist, or if a request error occurs. -func (client *BucketClient) AdminGetBucketSessionID(ctx context.Context, bucketName string) (int, error) { +func (client *BucketClient) AdminGetBucketSessionID(ctx context.Context, bucketName string, + opts ...RequestOption) (int, error) { // Escape the bucket name to avoid any risk to inadvertently or maliciously // call another route with an incorrect/crafted bucket name containing slashes. resource := fmt.Sprintf("/_/buckets/%s/id", url.PathEscape(bucketName)) - responseBody, err := client.Request(ctx, "AdminGetBucketSessionID", "GET", resource) + responseBody, err := client.Request(ctx, "AdminGetBucketSessionID", "GET", resource, opts...) if err != nil { return 0, err } diff --git a/go/admin_getsessioninfo.go b/go/admin_getsessioninfo.go index 0bddfcb..7125055 100644 --- a/go/admin_getsessioninfo.go +++ b/go/admin_getsessioninfo.go @@ -8,8 +8,10 @@ import ( // AdminGetAllSessionsInfo returns raft session info for all Metadata // raft sessions available on the S3C deployment. -func (client *BucketClient) AdminGetAllSessionsInfo(ctx context.Context) ([]SessionInfo, error) { - responseBody, err := client.Request(ctx, "AdminGetAllSessionsInfo", "GET", "/_/raft_sessions") +func (client *BucketClient) AdminGetAllSessionsInfo(ctx context.Context, + opts ...RequestOption) ([]SessionInfo, error) { + responseBody, err := client.Request(ctx, "AdminGetAllSessionsInfo", "GET", "/_/raft_sessions", + opts...) if err != nil { return nil, err } @@ -27,13 +29,13 @@ func (client *BucketClient) AdminGetAllSessionsInfo(ctx context.Context) ([]Sess // Returns nil and an error if the raft session doesn't exist, or if a request // error occurs. func (client *BucketClient) AdminGetSessionInfo(ctx context.Context, - sessionId int) (*SessionInfo, error) { + sessionId int, opts ...RequestOption) (*SessionInfo, error) { // When querying /_/raft_sessions/X/info, bucketd returns a // status 500 if the raft session doesn't exist, which is hard // to distinguish from a generic type of failure. For this // reason, instead, we fetch the info for all raft sessions // then lookup the one we want. - sessionsInfo, err := client.AdminGetAllSessionsInfo(ctx) + sessionsInfo, err := client.AdminGetAllSessionsInfo(ctx, opts...) if err != nil { return nil, err } diff --git a/go/admin_getsessionleader.go b/go/admin_getsessionleader.go index 2028497..b49d5c6 100644 --- a/go/admin_getsessionleader.go +++ b/go/admin_getsessionleader.go @@ -12,9 +12,10 @@ import ( // Returns nil and an error if the raft session doesn't exist, if // bucketd is not connected to the leader, or if a request error // occurs. -func (client *BucketClient) AdminGetSessionLeader(ctx context.Context, sessionId int) (*MemberInfo, error) { +func (client *BucketClient) AdminGetSessionLeader(ctx context.Context, sessionId int, + opts ...RequestOption) (*MemberInfo, error) { resource := fmt.Sprintf("/_/raft_sessions/%d/leader", sessionId) - responseBody, err := client.Request(ctx, "AdminGetSessionLeader", "GET", resource) + responseBody, err := client.Request(ctx, "AdminGetSessionLeader", "GET", resource, opts...) if err != nil { return nil, err } diff --git a/go/admin_getsessionlog.go b/go/admin_getsessionlog.go index f246a76..f74b75f 100644 --- a/go/admin_getsessionlog.go +++ b/go/admin_getsessionlog.go @@ -18,12 +18,14 @@ type AdminGetSessionLogResponse struct { // if targetLeader is true, it fetches the oplog from the leader, // otherwise fetches from one of the followers func (client *BucketClient) AdminGetSessionLog(ctx context.Context, - sessionId int, beginSeq int64, nRecords int, targetLeader bool) (*AdminGetSessionLogResponse, error) { - resource := fmt.Sprintf("/_/raft_sessions/%d/log?begin=%d&limit=%d", sessionId, beginSeq, nRecords) + sessionId int, beginSeq int64, nRecords int, targetLeader bool, + opts ...RequestOption) (*AdminGetSessionLogResponse, error) { + resource := fmt.Sprintf("/_/raft_sessions/%d/log?begin=%d&limit=%d", + sessionId, beginSeq, nRecords) if targetLeader { resource += "&target_leader=true" } - responseBody, err := client.Request(ctx, "AdminGetSessionLog", "GET", resource) + responseBody, err := client.Request(ctx, "AdminGetSessionLog", "GET", resource, opts...) if err != nil { return nil, err } diff --git a/go/admin_setbucketaccessmode.go b/go/admin_setbucketaccessmode.go index be76087..3aef5d3 100644 --- a/go/admin_setbucketaccessmode.go +++ b/go/admin_setbucketaccessmode.go @@ -13,9 +13,9 @@ import ( // // Returns an error if the bucket doesn't exist, or if a request error occurs. func (client *BucketClient) AdminSetBucketAccessMode(ctx context.Context, - bucketName string, accessMode BucketAccessMode) error { + bucketName string, accessMode BucketAccessMode, opts ...RequestOption) error { resource := fmt.Sprintf("/_/buckets/%s/accessMode?mode=%s", url.PathEscape(bucketName), accessMode) - _, err := client.Request(ctx, "AdminSetBucketAccessMode", "PUT", resource) + _, err := client.Request(ctx, "AdminSetBucketAccessMode", "PUT", resource, opts...) return err } diff --git a/go/bucketclientrequest.go b/go/bucketclientrequest.go index 0d43426..45d4b36 100644 --- a/go/bucketclientrequest.go +++ b/go/bucketclientrequest.go @@ -12,6 +12,7 @@ import ( type requestOptionSet struct { requestBody []byte requestBodyContentType string + requestUIDs string idempotent bool } @@ -29,6 +30,12 @@ func RequestBodyContentTypeOption(contentType string) RequestOption { } } +func RequestUIDsOption(uids string) RequestOption { + return func(ros *requestOptionSet) { + ros.requestUIDs = uids + } +} + func RequestIdempotent(ros *requestOptionSet) { ros.idempotent = true } @@ -64,6 +71,11 @@ func (client *BucketClient) Request(ctx context.Context, if options.requestBodyContentType != "" { request.Header.Add("Content-Type", string(options.requestBodyContentType)) } + + if options.requestUIDs != "" { + request.Header.Add("x-scal-request-uids", options.requestUIDs) + } + if options.idempotent { request.Header["Idempotency-Key"] = []string{} } diff --git a/go/bucketclientrequest_test.go b/go/bucketclientrequest_test.go index 6de3f56..29c4750 100644 --- a/go/bucketclientrequest_test.go +++ b/go/bucketclientrequest_test.go @@ -44,6 +44,21 @@ var _ = Describe("BucketClient.Request()", func() { "/default/bucket/somebucket/someobject")).To(Equal( []byte(`{"some":"metadata","version":"1234"}`))) }) + It("succeeds with a 200 response on GET request with provided request UIDs", func(ctx SpecContext) { + httpmock.RegisterResponder( + "GET", "http://localhost:9000/default/bucket/somebucket/someobject", + func(req *http.Request) (*http.Response, error) { + uids := req.Header.Get("x-scal-request-uids") + Expect(uids).To(Equal("my-uids")) + + return httpmock.NewStringResponse(200, `{"some":"metadata","version":"1234"}`), nil + }, + ) + Expect(client.Request(ctx, "GetObject", "GET", + "/default/bucket/somebucket/someobject", + bucketclient.RequestUIDsOption("my-uids"))).To(Equal( + []byte(`{"some":"metadata","version":"1234"}`))) + }) It("sends PUT request with body and succeeds with a 200 response", func(ctx SpecContext) { httpmock.RegisterResponder( "PUT", "http://localhost:9000/default/bucket/somebucket/someobject", diff --git a/go/createbucket.go b/go/createbucket.go index ecc8ec5..2f5a0b4 100644 --- a/go/createbucket.go +++ b/go/createbucket.go @@ -12,6 +12,7 @@ import ( type createBucketOptionSet struct { sessionId int makeIdempotent bool + requestUIDs string } type CreateBucketOption func(*createBucketOptionSet) @@ -26,22 +27,27 @@ func CreateBucketMakeIdempotent(options *createBucketOptionSet) { options.makeIdempotent = true } +func CreateBucketRequestUIDsOption(uids string) CreateBucketOption { + return func(options *createBucketOptionSet) { + options.requestUIDs = uids + } +} + // CreateBucket creates a bucket in metadata. // bucketAttributes is a JSON blob of bucket attributes // opts is a set of options: // -// CreateBucketSessionIdOption forces the session ID where the bucket to be -// created will land +// CreateBucketSessionIdOption forces the session ID where the bucket to be +// created will land // -// CreateBucketMakeIdempotent makes the request return a success if a bucket -// with the same UID already exists (otherwise returns 409 Conflict, as -// if the option is not passed) +// CreateBucketMakeIdempotent makes the request return a success if a bucket +// with the same UID already exists (otherwise returns 409 Conflict, as +// if the option is not passed) +// +// CreateBucketRequestUIDsOption attaches existing UIDs to the CreateBucket request func (client *BucketClient) CreateBucket(ctx context.Context, bucketName string, bucketAttributes []byte, opts ...CreateBucketOption) error { - parsedOpts := createBucketOptionSet{ - sessionId: 0, - makeIdempotent: false, - } + parsedOpts := createBucketOptionSet{} for _, opt := range opts { opt(&parsedOpts) } @@ -66,6 +72,9 @@ func (client *BucketClient) CreateBucket(ctx context.Context, // then be considered a success) requestOptions = append(requestOptions, RequestIdempotent) } + if parsedOpts.requestUIDs != "" { + requestOptions = append(requestOptions, RequestUIDsOption(parsedOpts.requestUIDs)) + } _, err := client.Request(ctx, "CreateBucket", "POST", resource, requestOptions...) if err == nil { return nil diff --git a/go/createmetastoreentry.go b/go/createmetastoreentry.go index e2e9b53..74f87fa 100644 --- a/go/createmetastoreentry.go +++ b/go/createmetastoreentry.go @@ -8,7 +8,7 @@ import ( // CreateMetastoreEntry creates or updates a metastore entry for the given bucket func (client *BucketClient) CreateMetastoreEntry(ctx context.Context, bucketName string, - metastoreEntry MetastoreEntry) error { + metastoreEntry MetastoreEntry, opts ...RequestOption) error { resource := fmt.Sprintf("/default/metastore/db/%s", bucketName) postBody, err := json.Marshal(metastoreEntry) if err != nil { @@ -18,6 +18,6 @@ func (client *BucketClient) CreateMetastoreEntry(ctx context.Context, bucketName } } _, err = client.Request(ctx, "CreateMetastoreEntry", "POST", resource, - RequestBodyOption(postBody)) + append([]RequestOption{RequestBodyOption(postBody)}, opts...)...) return err } diff --git a/go/deletebucket.go b/go/deletebucket.go index 5c021af..4b2061c 100644 --- a/go/deletebucket.go +++ b/go/deletebucket.go @@ -6,9 +6,10 @@ import ( ) // DeleteBucket deletes a bucket entry from metadata. -func (client *BucketClient) DeleteBucket(ctx context.Context, bucketName string) error { +func (client *BucketClient) DeleteBucket(ctx context.Context, bucketName string, + opts ...RequestOption) error { resource := fmt.Sprintf("/default/bucket/%s", bucketName) - _, err := client.Request(ctx, "DeleteBucket", "DELETE", resource) + _, err := client.Request(ctx, "DeleteBucket", "DELETE", resource, opts...) return err } diff --git a/go/deletemetastoreentry.go b/go/deletemetastoreentry.go index 4f0ba37..d447ab8 100644 --- a/go/deletemetastoreentry.go +++ b/go/deletemetastoreentry.go @@ -6,8 +6,9 @@ import ( ) // DeleteMetastoreEntry deletes the metastore entry for the given bucket -func (client *BucketClient) DeleteMetastoreEntry(ctx context.Context, bucketName string) error { +func (client *BucketClient) DeleteMetastoreEntry(ctx context.Context, bucketName string, + opts ...RequestOption) error { resource := fmt.Sprintf("/default/metastore/db/%s", bucketName) - _, err := client.Request(ctx, "DeleteMetastoreEntry", "DELETE", resource) + _, err := client.Request(ctx, "DeleteMetastoreEntry", "DELETE", resource, opts...) return err } diff --git a/go/getbucketattributes.go b/go/getbucketattributes.go index 58ace16..c5e9f1d 100644 --- a/go/getbucketattributes.go +++ b/go/getbucketattributes.go @@ -7,7 +7,8 @@ import ( // GetBucketAttributes retrieves the JSON blob containing the bucket // attributes attached to a bucket. -func (client *BucketClient) GetBucketAttributes(ctx context.Context, bucketName string) ([]byte, error) { +func (client *BucketClient) GetBucketAttributes(ctx context.Context, bucketName string, + opts ...RequestOption) ([]byte, error) { resource := fmt.Sprintf("/default/attributes/%s", bucketName) - return client.Request(ctx, "GetBucketAttributes", "GET", resource) + return client.Request(ctx, "GetBucketAttributes", "GET", resource, opts...) } diff --git a/go/getmetastoreentry.go b/go/getmetastoreentry.go index 9e6d1fa..a96c627 100644 --- a/go/getmetastoreentry.go +++ b/go/getmetastoreentry.go @@ -7,9 +7,10 @@ import ( ) // GetMetastoreEntry retrieves and parses a metastore entry for the given bucket -func (client *BucketClient) GetMetastoreEntry(ctx context.Context, bucketName string) (MetastoreEntry, error) { +func (client *BucketClient) GetMetastoreEntry(ctx context.Context, bucketName string, + opts ...RequestOption) (MetastoreEntry, error) { resource := fmt.Sprintf("/default/metastore/db/%s", bucketName) - responseBody, err := client.Request(ctx, "GetMetastoreEntry", "GET", resource) + responseBody, err := client.Request(ctx, "GetMetastoreEntry", "GET", resource, opts...) if err != nil { return MetastoreEntry{}, err } diff --git a/go/listbasic.go b/go/listbasic.go index 8549aff..68bc231 100644 --- a/go/listbasic.go +++ b/go/listbasic.go @@ -75,6 +75,14 @@ func ListBasicNoValuesOption() ListBasicOption { } } +// ListBasicRequestUIDsOption attaches existing request UIDs to the ListBasic request +func ListBasicRequestUIDsOption(uids string) ListBasicOption { + return func(opts *listBasicOptionSet) error { + opts.requestUIDs = uids + return nil + } +} + type ListBasicEntry struct { Key string `json:"key"` Value string `json:"value"` @@ -83,13 +91,14 @@ type ListBasicEntry struct { type ListBasicResponse []ListBasicEntry type listBasicOptionSet struct { - gt *string - gte *string - lt *string - lte *string - maxKeys *int - noKeys bool - noValues bool + gt *string + gte *string + lt *string + lte *string + maxKeys *int + noKeys bool + noValues bool + requestUIDs string } func parseListBasicOptions(opts []ListBasicOption) (listBasicOptionSet, error) { @@ -139,7 +148,11 @@ func (client *BucketClient) ListBasic(ctx context.Context, u, _ := url.Parse(resource) u.RawQuery = query.Encode() resource = u.String() - responseBody, err := client.Request(ctx, "ListBasic", "GET", resource) + requestOptions := []RequestOption{} + if options.requestUIDs != "" { + requestOptions = append(requestOptions, RequestUIDsOption(options.requestUIDs)) + } + responseBody, err := client.Request(ctx, "ListBasic", "GET", resource, requestOptions...) if err != nil { return nil, err } diff --git a/go/listobjectversions.go b/go/listobjectversions.go index 00b4586..58c1da8 100644 --- a/go/listobjectversions.go +++ b/go/listobjectversions.go @@ -44,6 +44,15 @@ func ListObjectVersionsLastMarkerOption(lastKeyMarker string, lastVersionIdMarke } } +// ListObjectVersionsRequestUIDsOption attaches existing request UIDs +// to the ListObjectVersions request +func ListObjectVersionsRequestUIDsOption(uids string) ListObjectVersionsOption { + return func(opts *listObjectVersionsOptionSet) error { + opts.requestUIDs = uids + return nil + } +} + type ListObjectVersionsEntry struct { Key string `json:"key"` VersionId string `json:"versionId"` @@ -64,6 +73,7 @@ type listObjectVersionsOptionSet struct { maxKeys *int lastKeyMarker *string lastVersionIdMarker *string + requestUIDs string } func parseListObjectVersionsOptions(opts []ListObjectVersionsOption) (listObjectVersionsOptionSet, error) { @@ -99,7 +109,12 @@ func (client *BucketClient) ListObjectVersions(ctx context.Context, u, _ := url.Parse(resource) u.RawQuery = query.Encode() resource = u.String() - responseBody, err := client.Request(ctx, "ListObjectVersions", "GET", resource) + requestOptions := []RequestOption{} + if options.requestUIDs != "" { + requestOptions = append(requestOptions, RequestUIDsOption(options.requestUIDs)) + } + responseBody, err := client.Request(ctx, "ListObjectVersions", "GET", resource, + requestOptions...) if err != nil { return nil, err } diff --git a/go/postbatch.go b/go/postbatch.go index debc1a4..680bf28 100644 --- a/go/postbatch.go +++ b/go/postbatch.go @@ -13,7 +13,7 @@ type PostBatchEntry struct { } func (client *BucketClient) PostBatch(ctx context.Context, - bucketName string, batch []PostBatchEntry) error { + bucketName string, batch []PostBatchEntry, opts ...RequestOption) error { resource := fmt.Sprintf("/default/batch/%s", bucketName) postPayload := struct { Batch []PostBatchEntry `json:"batch"` @@ -26,10 +26,12 @@ func (client *BucketClient) PostBatch(ctx context.Context, } } _, err = client.Request(ctx, "PostBatch", "POST", resource, - RequestBodyOption(postBody), - RequestBodyContentTypeOption("application/json"), - // Because we write a batch of low-level entries directly to - // the database, the request is idempotent. - RequestIdempotent) + append([]RequestOption{ + RequestBodyOption(postBody), + RequestBodyContentTypeOption("application/json"), + // Because we write a batch of low-level entries directly to + // the database, the request is idempotent. + RequestIdempotent, + }, opts...)...) return err } diff --git a/go/putbucketattributes.go b/go/putbucketattributes.go index c5b83a2..98b9288 100644 --- a/go/putbucketattributes.go +++ b/go/putbucketattributes.go @@ -8,9 +8,9 @@ import ( // PutBucketAttributes updates the bucket attributes with a new JSON blob. func (client *BucketClient) PutBucketAttributes(ctx context.Context, bucketName string, - bucketAttributes []byte) error { + bucketAttributes []byte, opts ...RequestOption) error { resource := fmt.Sprintf("/default/attributes/%s", url.PathEscape(bucketName)) _, err := client.Request(ctx, "PutBucketAttributes", "POST", resource, - RequestBodyOption(bucketAttributes)) + append([]RequestOption{RequestBodyOption(bucketAttributes)}, opts...)...) return err }