Skip to content

Commit

Permalink
impr: BKTCLT-36 go client: attach x-scal-request-uids header
Browse files Browse the repository at this point in the history
Add a x-scal-request-uids header to requests sent to bucketd,
containing the existing UIDs passed by the caller via the appropriate
option (if passed)
  • Loading branch information
jonathan-gramain committed Jan 8, 2025
1 parent da505af commit 31054f9
Show file tree
Hide file tree
Showing 19 changed files with 130 additions and 53 deletions.
5 changes: 3 additions & 2 deletions go/admin_bucketrefreshcache.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@ import (
// AdminBucketRefreshCache refreshes the bucketd cache of metastore
// entries for the given bucket. Useful after switching the raft session
// of a bucket.
func (client *BucketClient) AdminBucketRefreshCache(ctx context.Context, bucketName string) error {
func (client *BucketClient) AdminBucketRefreshCache(ctx context.Context, bucketName string,
opts ...RequestOption) error {
resource := fmt.Sprintf("/_/buckets/%s/refreshCache", url.PathEscape(bucketName))
_, err := client.Request(ctx, "AdminBucketRefreshCache", "GET", resource)
_, err := client.Request(ctx, "AdminBucketRefreshCache", "GET", resource, opts...)
return err
}
4 changes: 2 additions & 2 deletions go/admin_getbucketaccessmode.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@ import (
// - "read-only" when the bucket is only accessible for read operations.
// Returns "" and an error if the bucket doesn't exist, or if a request error occurs.
func (client *BucketClient) AdminGetBucketAccessMode(ctx context.Context,
bucketName string) (BucketAccessMode, error) {
bucketName string, opts ...RequestOption) (BucketAccessMode, error) {
// Escape the bucket name to avoid any risk to inadvertently or maliciously
// call another route with an incorrect/crafted bucket name containing slashes.
resource := fmt.Sprintf("/_/buckets/%s/accessMode", url.PathEscape(bucketName))
responseBody, err := client.Request(ctx, "AdminGetBucketAccessMode", "GET", resource)
responseBody, err := client.Request(ctx, "AdminGetBucketAccessMode", "GET", resource, opts...)
if err != nil {
return "", err
}
Expand Down
5 changes: 3 additions & 2 deletions go/admin_getbucketsessionid.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,12 @@ import (

// AdminGetBucketSessionID returns the raft session ID of the given bucket.
// Returns 0 and an error if the bucket doesn't exist, or if a request error occurs.
func (client *BucketClient) AdminGetBucketSessionID(ctx context.Context, bucketName string) (int, error) {
func (client *BucketClient) AdminGetBucketSessionID(ctx context.Context, bucketName string,
opts ...RequestOption) (int, error) {
// Escape the bucket name to avoid any risk to inadvertently or maliciously
// call another route with an incorrect/crafted bucket name containing slashes.
resource := fmt.Sprintf("/_/buckets/%s/id", url.PathEscape(bucketName))
responseBody, err := client.Request(ctx, "AdminGetBucketSessionID", "GET", resource)
responseBody, err := client.Request(ctx, "AdminGetBucketSessionID", "GET", resource, opts...)
if err != nil {
return 0, err
}
Expand Down
10 changes: 6 additions & 4 deletions go/admin_getsessioninfo.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@ import (

// AdminGetAllSessionsInfo returns raft session info for all Metadata
// raft sessions available on the S3C deployment.
func (client *BucketClient) AdminGetAllSessionsInfo(ctx context.Context) ([]SessionInfo, error) {
responseBody, err := client.Request(ctx, "AdminGetAllSessionsInfo", "GET", "/_/raft_sessions")
func (client *BucketClient) AdminGetAllSessionsInfo(ctx context.Context,
opts ...RequestOption) ([]SessionInfo, error) {
responseBody, err := client.Request(ctx, "AdminGetAllSessionsInfo", "GET", "/_/raft_sessions",
opts...)
if err != nil {
return nil, err
}
Expand All @@ -27,13 +29,13 @@ func (client *BucketClient) AdminGetAllSessionsInfo(ctx context.Context) ([]Sess
// Returns nil and an error if the raft session doesn't exist, or if a request
// error occurs.
func (client *BucketClient) AdminGetSessionInfo(ctx context.Context,
sessionId int) (*SessionInfo, error) {
sessionId int, opts ...RequestOption) (*SessionInfo, error) {
// When querying /_/raft_sessions/X/info, bucketd returns a
// status 500 if the raft session doesn't exist, which is hard
// to distinguish from a generic type of failure. For this
// reason, instead, we fetch the info for all raft sessions
// then lookup the one we want.
sessionsInfo, err := client.AdminGetAllSessionsInfo(ctx)
sessionsInfo, err := client.AdminGetAllSessionsInfo(ctx, opts...)
if err != nil {
return nil, err
}
Expand Down
5 changes: 3 additions & 2 deletions go/admin_getsessionleader.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,10 @@ import (
// Returns nil and an error if the raft session doesn't exist, if
// bucketd is not connected to the leader, or if a request error
// occurs.
func (client *BucketClient) AdminGetSessionLeader(ctx context.Context, sessionId int) (*MemberInfo, error) {
func (client *BucketClient) AdminGetSessionLeader(ctx context.Context, sessionId int,
opts ...RequestOption) (*MemberInfo, error) {
resource := fmt.Sprintf("/_/raft_sessions/%d/leader", sessionId)
responseBody, err := client.Request(ctx, "AdminGetSessionLeader", "GET", resource)
responseBody, err := client.Request(ctx, "AdminGetSessionLeader", "GET", resource, opts...)
if err != nil {
return nil, err
}
Expand Down
8 changes: 5 additions & 3 deletions go/admin_getsessionlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@ type AdminGetSessionLogResponse struct {
// if targetLeader is true, it fetches the oplog from the leader,
// otherwise fetches from one of the followers
func (client *BucketClient) AdminGetSessionLog(ctx context.Context,
sessionId int, beginSeq int64, nRecords int, targetLeader bool) (*AdminGetSessionLogResponse, error) {
resource := fmt.Sprintf("/_/raft_sessions/%d/log?begin=%d&limit=%d", sessionId, beginSeq, nRecords)
sessionId int, beginSeq int64, nRecords int, targetLeader bool,
opts ...RequestOption) (*AdminGetSessionLogResponse, error) {
resource := fmt.Sprintf("/_/raft_sessions/%d/log?begin=%d&limit=%d",
sessionId, beginSeq, nRecords)
if targetLeader {
resource += "&target_leader=true"
}
responseBody, err := client.Request(ctx, "AdminGetSessionLog", "GET", resource)
responseBody, err := client.Request(ctx, "AdminGetSessionLog", "GET", resource, opts...)
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions go/admin_setbucketaccessmode.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ import (
//
// Returns an error if the bucket doesn't exist, or if a request error occurs.
func (client *BucketClient) AdminSetBucketAccessMode(ctx context.Context,
bucketName string, accessMode BucketAccessMode) error {
bucketName string, accessMode BucketAccessMode, opts ...RequestOption) error {
resource := fmt.Sprintf("/_/buckets/%s/accessMode?mode=%s",
url.PathEscape(bucketName), accessMode)
_, err := client.Request(ctx, "AdminSetBucketAccessMode", "PUT", resource)
_, err := client.Request(ctx, "AdminSetBucketAccessMode", "PUT", resource, opts...)
return err
}
12 changes: 12 additions & 0 deletions go/bucketclientrequest.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
type requestOptionSet struct {
requestBody []byte
requestBodyContentType string
requestUIDs string
idempotent bool
}

Expand All @@ -29,6 +30,12 @@ func RequestBodyContentTypeOption(contentType string) RequestOption {
}
}

func RequestUIDsOption(uids string) RequestOption {
return func(ros *requestOptionSet) {
ros.requestUIDs = uids
}
}

func RequestIdempotent(ros *requestOptionSet) {
ros.idempotent = true
}
Expand Down Expand Up @@ -64,6 +71,11 @@ func (client *BucketClient) Request(ctx context.Context,
if options.requestBodyContentType != "" {
request.Header.Add("Content-Type", string(options.requestBodyContentType))
}

if options.requestUIDs != "" {
request.Header.Add("x-scal-request-uids", options.requestUIDs)
}

if options.idempotent {
request.Header["Idempotency-Key"] = []string{}
}
Expand Down
15 changes: 15 additions & 0 deletions go/bucketclientrequest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,21 @@ var _ = Describe("BucketClient.Request()", func() {
"/default/bucket/somebucket/someobject")).To(Equal(
[]byte(`{"some":"metadata","version":"1234"}`)))
})
It("succeeds with a 200 response on GET request with provided request UIDs", func(ctx SpecContext) {
httpmock.RegisterResponder(
"GET", "http://localhost:9000/default/bucket/somebucket/someobject",
func(req *http.Request) (*http.Response, error) {
uids := req.Header.Get("x-scal-request-uids")
Expect(uids).To(Equal("my-uids"))

return httpmock.NewStringResponse(200, `{"some":"metadata","version":"1234"}`), nil
},
)
Expect(client.Request(ctx, "GetObject", "GET",
"/default/bucket/somebucket/someobject",
bucketclient.RequestUIDsOption("my-uids"))).To(Equal(
[]byte(`{"some":"metadata","version":"1234"}`)))
})
It("sends PUT request with body and succeeds with a 200 response", func(ctx SpecContext) {
httpmock.RegisterResponder(
"PUT", "http://localhost:9000/default/bucket/somebucket/someobject",
Expand Down
27 changes: 18 additions & 9 deletions go/createbucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
type createBucketOptionSet struct {
sessionId int
makeIdempotent bool
requestUIDs string
}

type CreateBucketOption func(*createBucketOptionSet)
Expand All @@ -26,22 +27,27 @@ func CreateBucketMakeIdempotent(options *createBucketOptionSet) {
options.makeIdempotent = true
}

func CreateBucketRequestUIDsOption(uids string) CreateBucketOption {
return func(options *createBucketOptionSet) {
options.requestUIDs = uids
}
}

// CreateBucket creates a bucket in metadata.
// bucketAttributes is a JSON blob of bucket attributes
// opts is a set of options:
//
// CreateBucketSessionIdOption forces the session ID where the bucket to be
// created will land
// CreateBucketSessionIdOption forces the session ID where the bucket to be
// created will land
//
// CreateBucketMakeIdempotent makes the request return a success if a bucket
// with the same UID already exists (otherwise returns 409 Conflict, as
// if the option is not passed)
// CreateBucketMakeIdempotent makes the request return a success if a bucket
// with the same UID already exists (otherwise returns 409 Conflict, as
// if the option is not passed)
//
// CreateBucketRequestUIDsOption attaches existing UIDs to the CreateBucket request
func (client *BucketClient) CreateBucket(ctx context.Context,
bucketName string, bucketAttributes []byte, opts ...CreateBucketOption) error {
parsedOpts := createBucketOptionSet{
sessionId: 0,
makeIdempotent: false,
}
parsedOpts := createBucketOptionSet{}
for _, opt := range opts {
opt(&parsedOpts)
}
Expand All @@ -66,6 +72,9 @@ func (client *BucketClient) CreateBucket(ctx context.Context,
// then be considered a success)
requestOptions = append(requestOptions, RequestIdempotent)
}
if parsedOpts.requestUIDs != "" {
requestOptions = append(requestOptions, RequestUIDsOption(parsedOpts.requestUIDs))
}
_, err := client.Request(ctx, "CreateBucket", "POST", resource, requestOptions...)
if err == nil {
return nil
Expand Down
4 changes: 2 additions & 2 deletions go/createmetastoreentry.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (

// CreateMetastoreEntry creates or updates a metastore entry for the given bucket
func (client *BucketClient) CreateMetastoreEntry(ctx context.Context, bucketName string,
metastoreEntry MetastoreEntry) error {
metastoreEntry MetastoreEntry, opts ...RequestOption) error {
resource := fmt.Sprintf("/default/metastore/db/%s", bucketName)
postBody, err := json.Marshal(metastoreEntry)
if err != nil {
Expand All @@ -18,6 +18,6 @@ func (client *BucketClient) CreateMetastoreEntry(ctx context.Context, bucketName
}
}
_, err = client.Request(ctx, "CreateMetastoreEntry", "POST", resource,
RequestBodyOption(postBody))
append([]RequestOption{RequestBodyOption(postBody)}, opts...)...)
return err
}
5 changes: 3 additions & 2 deletions go/deletebucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@ import (
)

// DeleteBucket deletes a bucket entry from metadata.
func (client *BucketClient) DeleteBucket(ctx context.Context, bucketName string) error {
func (client *BucketClient) DeleteBucket(ctx context.Context, bucketName string,
opts ...RequestOption) error {
resource := fmt.Sprintf("/default/bucket/%s", bucketName)

_, err := client.Request(ctx, "DeleteBucket", "DELETE", resource)
_, err := client.Request(ctx, "DeleteBucket", "DELETE", resource, opts...)
return err
}
5 changes: 3 additions & 2 deletions go/deletemetastoreentry.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@ import (
)

// DeleteMetastoreEntry deletes the metastore entry for the given bucket
func (client *BucketClient) DeleteMetastoreEntry(ctx context.Context, bucketName string) error {
func (client *BucketClient) DeleteMetastoreEntry(ctx context.Context, bucketName string,
opts ...RequestOption) error {
resource := fmt.Sprintf("/default/metastore/db/%s", bucketName)
_, err := client.Request(ctx, "DeleteMetastoreEntry", "DELETE", resource)
_, err := client.Request(ctx, "DeleteMetastoreEntry", "DELETE", resource, opts...)
return err
}
5 changes: 3 additions & 2 deletions go/getbucketattributes.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ import (

// GetBucketAttributes retrieves the JSON blob containing the bucket
// attributes attached to a bucket.
func (client *BucketClient) GetBucketAttributes(ctx context.Context, bucketName string) ([]byte, error) {
func (client *BucketClient) GetBucketAttributes(ctx context.Context, bucketName string,
opts ...RequestOption) ([]byte, error) {
resource := fmt.Sprintf("/default/attributes/%s", bucketName)
return client.Request(ctx, "GetBucketAttributes", "GET", resource)
return client.Request(ctx, "GetBucketAttributes", "GET", resource, opts...)
}
5 changes: 3 additions & 2 deletions go/getmetastoreentry.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@ import (
)

// GetMetastoreEntry retrieves and parses a metastore entry for the given bucket
func (client *BucketClient) GetMetastoreEntry(ctx context.Context, bucketName string) (MetastoreEntry, error) {
func (client *BucketClient) GetMetastoreEntry(ctx context.Context, bucketName string,
opts ...RequestOption) (MetastoreEntry, error) {
resource := fmt.Sprintf("/default/metastore/db/%s", bucketName)
responseBody, err := client.Request(ctx, "GetMetastoreEntry", "GET", resource)
responseBody, err := client.Request(ctx, "GetMetastoreEntry", "GET", resource, opts...)
if err != nil {
return MetastoreEntry{}, err
}
Expand Down
29 changes: 21 additions & 8 deletions go/listbasic.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,14 @@ func ListBasicNoValuesOption() ListBasicOption {
}
}

// ListBasicRequestUIDsOption attaches existing request UIDs to the ListBasic request
func ListBasicRequestUIDsOption(uids string) ListBasicOption {
return func(opts *listBasicOptionSet) error {
opts.requestUIDs = uids
return nil
}
}

type ListBasicEntry struct {
Key string `json:"key"`
Value string `json:"value"`
Expand All @@ -83,13 +91,14 @@ type ListBasicEntry struct {
type ListBasicResponse []ListBasicEntry

type listBasicOptionSet struct {
gt *string
gte *string
lt *string
lte *string
maxKeys *int
noKeys bool
noValues bool
gt *string
gte *string
lt *string
lte *string
maxKeys *int
noKeys bool
noValues bool
requestUIDs string
}

func parseListBasicOptions(opts []ListBasicOption) (listBasicOptionSet, error) {
Expand Down Expand Up @@ -139,7 +148,11 @@ func (client *BucketClient) ListBasic(ctx context.Context,
u, _ := url.Parse(resource)
u.RawQuery = query.Encode()
resource = u.String()
responseBody, err := client.Request(ctx, "ListBasic", "GET", resource)
requestOptions := []RequestOption{}
if options.requestUIDs != "" {
requestOptions = append(requestOptions, RequestUIDsOption(options.requestUIDs))
}
responseBody, err := client.Request(ctx, "ListBasic", "GET", resource, requestOptions...)
if err != nil {
return nil, err
}
Expand Down
17 changes: 16 additions & 1 deletion go/listobjectversions.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,15 @@ func ListObjectVersionsLastMarkerOption(lastKeyMarker string, lastVersionIdMarke
}
}

// ListObjectVersionsRequestUIDsOption attaches existing request UIDs
// to the ListObjectVersions request
func ListObjectVersionsRequestUIDsOption(uids string) ListObjectVersionsOption {
return func(opts *listObjectVersionsOptionSet) error {
opts.requestUIDs = uids
return nil
}
}

type ListObjectVersionsEntry struct {
Key string `json:"key"`
VersionId string `json:"versionId"`
Expand All @@ -64,6 +73,7 @@ type listObjectVersionsOptionSet struct {
maxKeys *int
lastKeyMarker *string
lastVersionIdMarker *string
requestUIDs string
}

func parseListObjectVersionsOptions(opts []ListObjectVersionsOption) (listObjectVersionsOptionSet, error) {
Expand Down Expand Up @@ -99,7 +109,12 @@ func (client *BucketClient) ListObjectVersions(ctx context.Context,
u, _ := url.Parse(resource)
u.RawQuery = query.Encode()
resource = u.String()
responseBody, err := client.Request(ctx, "ListObjectVersions", "GET", resource)
requestOptions := []RequestOption{}
if options.requestUIDs != "" {
requestOptions = append(requestOptions, RequestUIDsOption(options.requestUIDs))
}
responseBody, err := client.Request(ctx, "ListObjectVersions", "GET", resource,
requestOptions...)
if err != nil {
return nil, err
}
Expand Down
Loading

0 comments on commit 31054f9

Please sign in to comment.