Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

impr: BKTCLT-36 go client: attach x-scal-request-uids header #272

Merged
merged 2 commits into from
Jan 15, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions go/admin_bucketrefreshcache.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@ import (
// AdminBucketRefreshCache refreshes the bucketd cache of metastore
// entries for the given bucket. Useful after switching the raft session
// of a bucket.
func (client *BucketClient) AdminBucketRefreshCache(ctx context.Context, bucketName string) error {
func (client *BucketClient) AdminBucketRefreshCache(ctx context.Context, bucketName string,
opts ...RequestOption) error {
resource := fmt.Sprintf("/_/buckets/%s/refreshCache", url.PathEscape(bucketName))
_, err := client.Request(ctx, "AdminBucketRefreshCache", "GET", resource)
_, err := client.Request(ctx, "AdminBucketRefreshCache", "GET", resource, opts...)
return err
}
4 changes: 2 additions & 2 deletions go/admin_getbucketaccessmode.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@ import (
// - "read-only" when the bucket is only accessible for read operations.
// Returns "" and an error if the bucket doesn't exist, or if a request error occurs.
func (client *BucketClient) AdminGetBucketAccessMode(ctx context.Context,
bucketName string) (BucketAccessMode, error) {
bucketName string, opts ...RequestOption) (BucketAccessMode, error) {
// Escape the bucket name to avoid any risk to inadvertently or maliciously
// call another route with an incorrect/crafted bucket name containing slashes.
resource := fmt.Sprintf("/_/buckets/%s/accessMode", url.PathEscape(bucketName))
responseBody, err := client.Request(ctx, "AdminGetBucketAccessMode", "GET", resource)
responseBody, err := client.Request(ctx, "AdminGetBucketAccessMode", "GET", resource, opts...)
if err != nil {
return "", err
}
Expand Down
5 changes: 3 additions & 2 deletions go/admin_getbucketsessionid.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,12 @@ import (

// AdminGetBucketSessionID returns the raft session ID of the given bucket.
// Returns 0 and an error if the bucket doesn't exist, or if a request error occurs.
func (client *BucketClient) AdminGetBucketSessionID(ctx context.Context, bucketName string) (int, error) {
func (client *BucketClient) AdminGetBucketSessionID(ctx context.Context, bucketName string,
opts ...RequestOption) (int, error) {
// Escape the bucket name to avoid any risk to inadvertently or maliciously
// call another route with an incorrect/crafted bucket name containing slashes.
resource := fmt.Sprintf("/_/buckets/%s/id", url.PathEscape(bucketName))
responseBody, err := client.Request(ctx, "AdminGetBucketSessionID", "GET", resource)
responseBody, err := client.Request(ctx, "AdminGetBucketSessionID", "GET", resource, opts...)
if err != nil {
return 0, err
}
Expand Down
10 changes: 6 additions & 4 deletions go/admin_getsessioninfo.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@ import (

// AdminGetAllSessionsInfo returns raft session info for all Metadata
// raft sessions available on the S3C deployment.
func (client *BucketClient) AdminGetAllSessionsInfo(ctx context.Context) ([]SessionInfo, error) {
responseBody, err := client.Request(ctx, "AdminGetAllSessionsInfo", "GET", "/_/raft_sessions")
func (client *BucketClient) AdminGetAllSessionsInfo(ctx context.Context,
opts ...RequestOption) ([]SessionInfo, error) {
responseBody, err := client.Request(ctx, "AdminGetAllSessionsInfo", "GET", "/_/raft_sessions",
opts...)
if err != nil {
return nil, err
}
Expand All @@ -27,13 +29,13 @@ func (client *BucketClient) AdminGetAllSessionsInfo(ctx context.Context) ([]Sess
// Returns nil and an error if the raft session doesn't exist, or if a request
// error occurs.
func (client *BucketClient) AdminGetSessionInfo(ctx context.Context,
sessionId int) (*SessionInfo, error) {
sessionId int, opts ...RequestOption) (*SessionInfo, error) {
// When querying /_/raft_sessions/X/info, bucketd returns a
// status 500 if the raft session doesn't exist, which is hard
// to distinguish from a generic type of failure. For this
// reason, instead, we fetch the info for all raft sessions
// then lookup the one we want.
sessionsInfo, err := client.AdminGetAllSessionsInfo(ctx)
sessionsInfo, err := client.AdminGetAllSessionsInfo(ctx, opts...)
if err != nil {
return nil, err
}
Expand Down
5 changes: 3 additions & 2 deletions go/admin_getsessionleader.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,10 @@ import (
// Returns nil and an error if the raft session doesn't exist, if
// bucketd is not connected to the leader, or if a request error
// occurs.
func (client *BucketClient) AdminGetSessionLeader(ctx context.Context, sessionId int) (*MemberInfo, error) {
func (client *BucketClient) AdminGetSessionLeader(ctx context.Context, sessionId int,
opts ...RequestOption) (*MemberInfo, error) {
resource := fmt.Sprintf("/_/raft_sessions/%d/leader", sessionId)
responseBody, err := client.Request(ctx, "AdminGetSessionLeader", "GET", resource)
responseBody, err := client.Request(ctx, "AdminGetSessionLeader", "GET", resource, opts...)
if err != nil {
return nil, err
}
Expand Down
8 changes: 5 additions & 3 deletions go/admin_getsessionlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@ type AdminGetSessionLogResponse struct {
// if targetLeader is true, it fetches the oplog from the leader,
// otherwise fetches from one of the followers
func (client *BucketClient) AdminGetSessionLog(ctx context.Context,
sessionId int, beginSeq int64, nRecords int, targetLeader bool) (*AdminGetSessionLogResponse, error) {
resource := fmt.Sprintf("/_/raft_sessions/%d/log?begin=%d&limit=%d", sessionId, beginSeq, nRecords)
sessionId int, beginSeq int64, nRecords int, targetLeader bool,
opts ...RequestOption) (*AdminGetSessionLogResponse, error) {
resource := fmt.Sprintf("/_/raft_sessions/%d/log?begin=%d&limit=%d",
sessionId, beginSeq, nRecords)
if targetLeader {
resource += "&target_leader=true"
}
responseBody, err := client.Request(ctx, "AdminGetSessionLog", "GET", resource)
responseBody, err := client.Request(ctx, "AdminGetSessionLog", "GET", resource, opts...)
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions go/admin_setbucketaccessmode.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ import (
//
// Returns an error if the bucket doesn't exist, or if a request error occurs.
func (client *BucketClient) AdminSetBucketAccessMode(ctx context.Context,
bucketName string, accessMode BucketAccessMode) error {
bucketName string, accessMode BucketAccessMode, opts ...RequestOption) error {
resource := fmt.Sprintf("/_/buckets/%s/accessMode?mode=%s",
url.PathEscape(bucketName), accessMode)
_, err := client.Request(ctx, "AdminSetBucketAccessMode", "PUT", resource)
_, err := client.Request(ctx, "AdminSetBucketAccessMode", "PUT", resource, opts...)
return err
}
12 changes: 12 additions & 0 deletions go/bucketclientrequest.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
type requestOptionSet struct {
requestBody []byte
requestBodyContentType string
requestUIDs string
idempotent bool
}

Expand All @@ -29,6 +30,12 @@ func RequestBodyContentTypeOption(contentType string) RequestOption {
}
}

func RequestUIDsOption(uids string) RequestOption {
return func(ros *requestOptionSet) {
ros.requestUIDs = uids
}
}

func RequestIdempotent(ros *requestOptionSet) {
ros.idempotent = true
}
Expand Down Expand Up @@ -64,6 +71,11 @@ func (client *BucketClient) Request(ctx context.Context,
if options.requestBodyContentType != "" {
request.Header.Add("Content-Type", string(options.requestBodyContentType))
}

if options.requestUIDs != "" {
request.Header.Add("x-scal-request-uids", options.requestUIDs)
}

if options.idempotent {
request.Header["Idempotency-Key"] = []string{}
}
Expand Down
15 changes: 15 additions & 0 deletions go/bucketclientrequest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,21 @@ var _ = Describe("BucketClient.Request()", func() {
"/default/bucket/somebucket/someobject")).To(Equal(
[]byte(`{"some":"metadata","version":"1234"}`)))
})
It("succeeds with a 200 response on GET request with provided request UIDs", func(ctx SpecContext) {
httpmock.RegisterResponder(
"GET", "http://localhost:9000/default/bucket/somebucket/someobject",
func(req *http.Request) (*http.Response, error) {
uids := req.Header.Get("x-scal-request-uids")
Expect(uids).To(Equal("my-uids"))

return httpmock.NewStringResponse(200, `{"some":"metadata","version":"1234"}`), nil
},
)
Expect(client.Request(ctx, "GetObject", "GET",
"/default/bucket/somebucket/someobject",
bucketclient.RequestUIDsOption("my-uids"))).To(Equal(
[]byte(`{"some":"metadata","version":"1234"}`)))
})
It("sends PUT request with body and succeeds with a 200 response", func(ctx SpecContext) {
httpmock.RegisterResponder(
"PUT", "http://localhost:9000/default/bucket/somebucket/someobject",
Expand Down
27 changes: 18 additions & 9 deletions go/createbucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
type createBucketOptionSet struct {
sessionId int
makeIdempotent bool
requestUIDs string
}

type CreateBucketOption func(*createBucketOptionSet)
Expand All @@ -26,22 +27,27 @@ func CreateBucketMakeIdempotent(options *createBucketOptionSet) {
options.makeIdempotent = true
}

func CreateBucketRequestUIDsOption(uids string) CreateBucketOption {
return func(options *createBucketOptionSet) {
options.requestUIDs = uids
}
}

// CreateBucket creates a bucket in metadata.
// bucketAttributes is a JSON blob of bucket attributes
// opts is a set of options:
//
// CreateBucketSessionIdOption forces the session ID where the bucket to be
// created will land
// CreateBucketSessionIdOption forces the session ID where the bucket to be
// created will land
//
// CreateBucketMakeIdempotent makes the request return a success if a bucket
// with the same UID already exists (otherwise returns 409 Conflict, as
// if the option is not passed)
// CreateBucketMakeIdempotent makes the request return a success if a bucket
// with the same UID already exists (otherwise returns 409 Conflict, as
// if the option is not passed)
//
// CreateBucketRequestUIDsOption attaches existing UIDs to the CreateBucket request
func (client *BucketClient) CreateBucket(ctx context.Context,
bucketName string, bucketAttributes []byte, opts ...CreateBucketOption) error {
parsedOpts := createBucketOptionSet{
sessionId: 0,
makeIdempotent: false,
}
parsedOpts := createBucketOptionSet{}
for _, opt := range opts {
opt(&parsedOpts)
}
Expand All @@ -66,6 +72,9 @@ func (client *BucketClient) CreateBucket(ctx context.Context,
// then be considered a success)
requestOptions = append(requestOptions, RequestIdempotent)
}
if parsedOpts.requestUIDs != "" {
requestOptions = append(requestOptions, RequestUIDsOption(parsedOpts.requestUIDs))
}
_, err := client.Request(ctx, "CreateBucket", "POST", resource, requestOptions...)
if err == nil {
return nil
Expand Down
4 changes: 2 additions & 2 deletions go/createmetastoreentry.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (

// CreateMetastoreEntry creates or updates a metastore entry for the given bucket
func (client *BucketClient) CreateMetastoreEntry(ctx context.Context, bucketName string,
metastoreEntry MetastoreEntry) error {
metastoreEntry MetastoreEntry, opts ...RequestOption) error {
resource := fmt.Sprintf("/default/metastore/db/%s", bucketName)
postBody, err := json.Marshal(metastoreEntry)
if err != nil {
Expand All @@ -18,6 +18,6 @@ func (client *BucketClient) CreateMetastoreEntry(ctx context.Context, bucketName
}
}
_, err = client.Request(ctx, "CreateMetastoreEntry", "POST", resource,
RequestBodyOption(postBody))
append([]RequestOption{RequestBodyOption(postBody)}, opts...)...)
return err
}
5 changes: 3 additions & 2 deletions go/deletebucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@ import (
)

// DeleteBucket deletes a bucket entry from metadata.
func (client *BucketClient) DeleteBucket(ctx context.Context, bucketName string) error {
func (client *BucketClient) DeleteBucket(ctx context.Context, bucketName string,
opts ...RequestOption) error {
resource := fmt.Sprintf("/default/bucket/%s", bucketName)

_, err := client.Request(ctx, "DeleteBucket", "DELETE", resource)
_, err := client.Request(ctx, "DeleteBucket", "DELETE", resource, opts...)
return err
}
5 changes: 3 additions & 2 deletions go/deletemetastoreentry.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@ import (
)

// DeleteMetastoreEntry deletes the metastore entry for the given bucket
func (client *BucketClient) DeleteMetastoreEntry(ctx context.Context, bucketName string) error {
func (client *BucketClient) DeleteMetastoreEntry(ctx context.Context, bucketName string,
opts ...RequestOption) error {
resource := fmt.Sprintf("/default/metastore/db/%s", bucketName)
_, err := client.Request(ctx, "DeleteMetastoreEntry", "DELETE", resource)
_, err := client.Request(ctx, "DeleteMetastoreEntry", "DELETE", resource, opts...)
return err
}
5 changes: 3 additions & 2 deletions go/getbucketattributes.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ import (

// GetBucketAttributes retrieves the JSON blob containing the bucket
// attributes attached to a bucket.
func (client *BucketClient) GetBucketAttributes(ctx context.Context, bucketName string) ([]byte, error) {
func (client *BucketClient) GetBucketAttributes(ctx context.Context, bucketName string,
opts ...RequestOption) ([]byte, error) {
resource := fmt.Sprintf("/default/attributes/%s", bucketName)
return client.Request(ctx, "GetBucketAttributes", "GET", resource)
return client.Request(ctx, "GetBucketAttributes", "GET", resource, opts...)
}
5 changes: 3 additions & 2 deletions go/getmetastoreentry.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@ import (
)

// GetMetastoreEntry retrieves and parses a metastore entry for the given bucket
func (client *BucketClient) GetMetastoreEntry(ctx context.Context, bucketName string) (MetastoreEntry, error) {
func (client *BucketClient) GetMetastoreEntry(ctx context.Context, bucketName string,
opts ...RequestOption) (MetastoreEntry, error) {
resource := fmt.Sprintf("/default/metastore/db/%s", bucketName)
responseBody, err := client.Request(ctx, "GetMetastoreEntry", "GET", resource)
responseBody, err := client.Request(ctx, "GetMetastoreEntry", "GET", resource, opts...)
if err != nil {
return MetastoreEntry{}, err
}
Expand Down
29 changes: 21 additions & 8 deletions go/listbasic.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,14 @@ func ListBasicNoValuesOption() ListBasicOption {
}
}

// ListBasicRequestUIDsOption attaches existing request UIDs to the ListBasic request
func ListBasicRequestUIDsOption(uids string) ListBasicOption {
return func(opts *listBasicOptionSet) error {
opts.requestUIDs = uids
return nil
}
}

type ListBasicEntry struct {
Key string `json:"key"`
Value string `json:"value"`
Expand All @@ -83,13 +91,14 @@ type ListBasicEntry struct {
type ListBasicResponse []ListBasicEntry

type listBasicOptionSet struct {
gt *string
gte *string
lt *string
lte *string
maxKeys *int
noKeys bool
noValues bool
gt *string
gte *string
lt *string
lte *string
maxKeys *int
noKeys bool
noValues bool
requestUIDs string
}

func parseListBasicOptions(opts []ListBasicOption) (listBasicOptionSet, error) {
Expand Down Expand Up @@ -139,7 +148,11 @@ func (client *BucketClient) ListBasic(ctx context.Context,
u, _ := url.Parse(resource)
u.RawQuery = query.Encode()
resource = u.String()
responseBody, err := client.Request(ctx, "ListBasic", "GET", resource)
requestOptions := []RequestOption{}
if options.requestUIDs != "" {
requestOptions = append(requestOptions, RequestUIDsOption(options.requestUIDs))
}
responseBody, err := client.Request(ctx, "ListBasic", "GET", resource, requestOptions...)
if err != nil {
return nil, err
}
Expand Down
17 changes: 16 additions & 1 deletion go/listobjectversions.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,15 @@ func ListObjectVersionsLastMarkerOption(lastKeyMarker string, lastVersionIdMarke
}
}

// ListObjectVersionsRequestUIDsOption attaches existing request UIDs
// to the ListObjectVersions request
func ListObjectVersionsRequestUIDsOption(uids string) ListObjectVersionsOption {
return func(opts *listObjectVersionsOptionSet) error {
opts.requestUIDs = uids
return nil
}
}

type ListObjectVersionsEntry struct {
Key string `json:"key"`
VersionId string `json:"versionId"`
Expand All @@ -64,6 +73,7 @@ type listObjectVersionsOptionSet struct {
maxKeys *int
lastKeyMarker *string
lastVersionIdMarker *string
requestUIDs string
}

func parseListObjectVersionsOptions(opts []ListObjectVersionsOption) (listObjectVersionsOptionSet, error) {
Expand Down Expand Up @@ -99,7 +109,12 @@ func (client *BucketClient) ListObjectVersions(ctx context.Context,
u, _ := url.Parse(resource)
u.RawQuery = query.Encode()
resource = u.String()
responseBody, err := client.Request(ctx, "ListObjectVersions", "GET", resource)
requestOptions := []RequestOption{}
if options.requestUIDs != "" {
requestOptions = append(requestOptions, RequestUIDsOption(options.requestUIDs))
}
responseBody, err := client.Request(ctx, "ListObjectVersions", "GET", resource,
requestOptions...)
if err != nil {
return nil, err
}
Expand Down
Loading
Loading