From f5ae06e7edeabb5386c18ef0d07069adc866195e Mon Sep 17 00:00:00 2001 From: Jan K Date: Wed, 7 Jun 2023 20:52:51 +0200 Subject: [PATCH] blob: added url builder; --- .github/workflows/go.yml | 32 ++++---- .tool-versions | 1 + docs/log/usage.go | 2 +- go.mod | 2 +- pkg/apiserver/handler.go | 4 +- pkg/blob/mocks/UrlBuilder.go | 85 ++++++++++++++++++++ pkg/blob/store.go | 25 +++--- pkg/blob/stream.go | 7 +- pkg/blob/url_builder.go | 57 +++++++++++++ pkg/blob/url_builder_test.go | 75 +++++++++++++++++ pkg/cfg/read.go | 4 +- pkg/clock/clock.go | 1 + pkg/clock/fake_clock.go | 1 + pkg/clock/ticker.go | 1 + pkg/clock/timer.go | 1 + pkg/cloud/aws/awsv2_delayer.go | 13 +-- pkg/cloud/aws/kinesis/metadata_repository.go | 2 + pkg/cloud/aws/s3/client.go | 25 ++++-- pkg/cloud/aws/s3/client_test.go | 35 ++++++++ pkg/conc/poisoned_lock.go | 1 + pkg/dx/localstack_defaults.go | 39 --------- pkg/es/clients_v7.go | 5 +- pkg/fixtures/writer_blob.go | 3 +- pkg/log/ecs_metadata.go | 3 +- pkg/mapx/map.go | 4 + pkg/metric/channel.go | 20 ++--- pkg/stream/compression_test.go | 4 +- pkg/stream/input.go | 2 + pkg/stream/output_file_test.go | 3 +- pkg/test/env/component_localstack.go | 2 +- pkg/test/env/component_stream_input.go | 4 +- pkg/test/env/factory_localstack.go | 4 +- pkg/test/env/factory_wiremock.go | 4 +- pkg/test/env/filesystem.go | 7 +- test/fixtures/s3/s3_test.go | 12 +-- 35 files changed, 365 insertions(+), 125 deletions(-) create mode 100644 pkg/blob/mocks/UrlBuilder.go create mode 100644 pkg/blob/url_builder.go create mode 100644 pkg/blob/url_builder_test.go create mode 100644 pkg/cloud/aws/s3/client_test.go delete mode 100644 pkg/dx/localstack_defaults.go diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml index 220b076f6..64023325d 100644 --- a/.github/workflows/go.yml +++ b/.github/workflows/go.yml @@ -23,10 +23,10 @@ jobs: name: go fmt runs-on: ubuntu-20.04 steps: - - name: Set up Go 1.18 + - name: Set up Go 1.20 uses: actions/setup-go@v3 with: - go-version: "1.18" + go-version: "1.20" - name: Check out code into the Go module directory uses: actions/checkout@v3 @@ -72,10 +72,10 @@ jobs: name: go build runs-on: ubuntu-20.04 steps: - - name: Set up Go 1.18 + - name: Set up Go 1.20 uses: actions/setup-go@v3 with: - go-version: "1.18" + go-version: "1.20" - name: Check out code into the Go module directory uses: actions/checkout@v3 @@ -90,10 +90,10 @@ jobs: name: static code analysis (go vet) runs-on: ubuntu-20.04 steps: - - name: Set up Go 1.18 + - name: Set up Go 1.20 uses: actions/setup-go@v3 with: - go-version: "1.18" + go-version: "1.20" - name: Check out code into the Go module directory uses: actions/checkout@v3 @@ -108,10 +108,10 @@ jobs: name: static code analysis (golangci-lint) runs-on: ubuntu-20.04 steps: - - name: Set up Go 1.18 + - name: Set up Go 1.20 uses: actions/setup-go@v3 with: - go-version: "1.18" + go-version: "1.20" - name: Check out code into the Go module directory uses: actions/checkout@v3 @@ -126,10 +126,10 @@ jobs: name: go test runs-on: ubuntu-20.04 steps: - - name: Set up Go 1.18 + - name: Set up Go 1.20 uses: actions/setup-go@v3 with: - go-version: "1.18" + go-version: "1.20" - name: Check out code into the Go module directory uses: actions/checkout@v3 @@ -144,10 +144,10 @@ jobs: name: go test (race) runs-on: ubuntu-20.04 steps: - - name: Set up Go 1.18 + - name: Set up Go 1.20 uses: actions/setup-go@v3 with: - go-version: "1.18" + go-version: "1.20" - name: Check out code into the Go module directory uses: actions/checkout@v3 @@ -162,10 +162,10 @@ jobs: name: go test (integration) runs-on: ubuntu-20.04 steps: - - name: Set up Go 1.18 + - name: Set up Go 1.20 uses: actions/setup-go@v3 with: - go-version: "1.18" + go-version: "1.20" - name: Check out code into the Go module directory uses: actions/checkout@v3 @@ -180,10 +180,10 @@ jobs: name: go test (integration, race) runs-on: ubuntu-20.04 steps: - - name: Set up Go 1.18 + - name: Set up Go 1.20 uses: actions/setup-go@v3 with: - go-version: "1.18" + go-version: "1.20" - name: Check out code into the Go module directory uses: actions/checkout@v3 diff --git a/.tool-versions b/.tool-versions index 1f831b8dd..a996ce99a 100644 --- a/.tool-versions +++ b/.tool-versions @@ -1 +1,2 @@ golangci-lint 1.45.2 +mockery 2.22.1 \ No newline at end of file diff --git a/docs/log/usage.go b/docs/log/usage.go index b7fd9fc4b..f017884c9 100644 --- a/docs/log/usage.go +++ b/docs/log/usage.go @@ -1,4 +1,4 @@ -//nolint +// nolint package main import ( diff --git a/go.mod b/go.mod index 481d2263e..ba1c3bf85 100644 --- a/go.mod +++ b/go.mod @@ -184,4 +184,4 @@ require ( gopkg.in/yaml.v2 v2.4.0 // indirect ) -go 1.18 +go 1.20 diff --git a/pkg/apiserver/handler.go b/pkg/apiserver/handler.go index 63bdbac5c..d999dff0b 100644 --- a/pkg/apiserver/handler.go +++ b/pkg/apiserver/handler.go @@ -3,7 +3,7 @@ package apiserver import ( "context" "fmt" - "io/ioutil" + "io" "net/http" "net/url" "reflect" @@ -295,7 +295,7 @@ func handleWithMultipleBindings(handler HandlerWithMultipleBindings, errHandler func handleRaw(handler HandlerWithoutInput, errHandler ErrorHandler) gin.HandlerFunc { return func(ginCtx *gin.Context) { - body, err := ioutil.ReadAll(ginCtx.Request.Body) + body, err := io.ReadAll(ginCtx.Request.Body) if err != nil { handleError(ginCtx, errHandler, http.StatusBadRequest, gin.Error{ Err: err, diff --git a/pkg/blob/mocks/UrlBuilder.go b/pkg/blob/mocks/UrlBuilder.go new file mode 100644 index 000000000..172fd3bb5 --- /dev/null +++ b/pkg/blob/mocks/UrlBuilder.go @@ -0,0 +1,85 @@ +// Code generated by mockery v2.22.1. DO NOT EDIT. + +package mocks + +import mock "github.com/stretchr/testify/mock" + +// UrlBuilder is an autogenerated mock type for the UrlBuilder type +type UrlBuilder struct { + mock.Mock +} + +type UrlBuilder_Expecter struct { + mock *mock.Mock +} + +func (_m *UrlBuilder) EXPECT() *UrlBuilder_Expecter { + return &UrlBuilder_Expecter{mock: &_m.Mock} +} + +// GetAbsoluteUrl provides a mock function with given fields: path +func (_m *UrlBuilder) GetAbsoluteUrl(path string) (string, error) { + ret := _m.Called(path) + + var r0 string + var r1 error + if rf, ok := ret.Get(0).(func(string) (string, error)); ok { + return rf(path) + } + if rf, ok := ret.Get(0).(func(string) string); ok { + r0 = rf(path) + } else { + r0 = ret.Get(0).(string) + } + + if rf, ok := ret.Get(1).(func(string) error); ok { + r1 = rf(path) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// UrlBuilder_GetAbsoluteUrl_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetAbsoluteUrl' +type UrlBuilder_GetAbsoluteUrl_Call struct { + *mock.Call +} + +// GetAbsoluteUrl is a helper method to define mock.On call +// - path string +func (_e *UrlBuilder_Expecter) GetAbsoluteUrl(path interface{}) *UrlBuilder_GetAbsoluteUrl_Call { + return &UrlBuilder_GetAbsoluteUrl_Call{Call: _e.mock.On("GetAbsoluteUrl", path)} +} + +func (_c *UrlBuilder_GetAbsoluteUrl_Call) Run(run func(path string)) *UrlBuilder_GetAbsoluteUrl_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(string)) + }) + return _c +} + +func (_c *UrlBuilder_GetAbsoluteUrl_Call) Return(_a0 string, _a1 error) *UrlBuilder_GetAbsoluteUrl_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *UrlBuilder_GetAbsoluteUrl_Call) RunAndReturn(run func(string) (string, error)) *UrlBuilder_GetAbsoluteUrl_Call { + _c.Call.Return(run) + return _c +} + +type mockConstructorTestingTNewUrlBuilder interface { + mock.TestingT + Cleanup(func()) +} + +// NewUrlBuilder creates a new instance of UrlBuilder. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewUrlBuilder(t mockConstructorTestingTNewUrlBuilder) *UrlBuilder { + mock := &UrlBuilder{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/pkg/blob/store.go b/pkg/blob/store.go index 87d1e5609..3146cbef0 100644 --- a/pkg/blob/store.go +++ b/pkg/blob/store.go @@ -122,15 +122,7 @@ func CreateKey() string { func NewStore(ctx context.Context, config cfg.Config, logger log.Logger, name string) (Store, error) { channels := ProvideBatchRunnerChannels(config) - - var settings Settings - key := fmt.Sprintf("blob.%s", name) - config.UnmarshalKey(key, &settings) - settings.AppId.PadFromConfig(config) - - if settings.Bucket == "" { - settings.Bucket = fmt.Sprintf("%s-%s-%s", settings.Project, settings.Environment, settings.Family) - } + settings := getStoreSettings(config, name) s3Client, err := gosoS3.ProvideClient(ctx, config, logger, settings.ClientName) if err != nil { @@ -149,7 +141,7 @@ func NewStore(ctx context.Context, config cfg.Config, logger log.Logger, name st return store, nil } -func NewStoreWithInterfaces(logger log.Logger, channels *BatchRunnerChannels, client gosoS3.Client, settings Settings) Store { +func NewStoreWithInterfaces(logger log.Logger, channels *BatchRunnerChannels, client gosoS3.Client, settings *Settings) Store { return &s3Store{ logger: logger, channels: channels, @@ -358,3 +350,16 @@ func isBucketAlreadyExistsError(err error) bool { return false } + +func getStoreSettings(config cfg.Config, name string) *Settings { + settings := &Settings{} + key := fmt.Sprintf("blob.%s", name) + config.UnmarshalKey(key, settings) + settings.AppId.PadFromConfig(config) + + if settings.Bucket == "" { + settings.Bucket = fmt.Sprintf("%s-%s-%s", settings.Project, settings.Environment, settings.Family) + } + + return settings +} diff --git a/pkg/blob/stream.go b/pkg/blob/stream.go index 9a0077c94..8d1038cb2 100644 --- a/pkg/blob/stream.go +++ b/pkg/blob/stream.go @@ -3,7 +3,6 @@ package blob import ( "bytes" "io" - "io/ioutil" "sync/atomic" "github.com/pkg/errors" @@ -15,6 +14,7 @@ type ReadCloser interface { } // A reader that we can close and that can seek +// //go:generate mockery --name ReadSeekerCloser type ReadSeekerCloser interface { io.ReadSeeker @@ -22,6 +22,7 @@ type ReadSeekerCloser interface { } // A stream is a source of bytes you can either get as a full []byte or stream as a reader. +// //go:generate mockery --name Stream type Stream interface { // Read all data and close the reader. @@ -86,7 +87,7 @@ type readerStream struct { } func (r readerStream) ReadAll() ([]byte, error) { - b, err := ioutil.ReadAll(r.reader) + b, err := io.ReadAll(r.reader) if err == nil { err = r.reader.Close() @@ -106,7 +107,7 @@ type noSeekerReaderStream struct { } func (r noSeekerReaderStream) ReadAll() ([]byte, error) { - b, err := ioutil.ReadAll(r.reader) + b, err := io.ReadAll(r.reader) if err == nil { err = r.reader.Close() diff --git a/pkg/blob/url_builder.go b/pkg/blob/url_builder.go new file mode 100644 index 000000000..673c3200b --- /dev/null +++ b/pkg/blob/url_builder.go @@ -0,0 +1,57 @@ +package blob + +import ( + "fmt" + "net/url" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/justtrackio/gosoline/pkg/cfg" + "github.com/justtrackio/gosoline/pkg/cloud/aws/s3" +) + +//go:generate mockery --name UrlBuilder +type UrlBuilder interface { + GetAbsoluteUrl(path string) (string, error) +} + +type urlBuilder struct { + endpoint string + usePathStyle bool + bucket string +} + +func NewUrlBuilder(config cfg.Config, name string) (UrlBuilder, error) { + storeSettings := getStoreSettings(config, name) + clientConfig := s3.GetClientConfig(config, storeSettings.ClientName) + + var err error + var endpoint aws.Endpoint + + if endpoint, err = s3.ResolveEndpoint(config, storeSettings.ClientName); err != nil { + return nil, fmt.Errorf("can not resolve s3 endpoint for client %s: %w", storeSettings.ClientName, err) + } + + return &urlBuilder{ + endpoint: endpoint.URL, + usePathStyle: clientConfig.Settings.UsePathStyle, + bucket: storeSettings.Bucket, + }, nil +} + +func (b *urlBuilder) GetAbsoluteUrl(path string) (string, error) { + var err error + var blobUrl *url.URL + + if blobUrl, err = blobUrl.Parse(b.endpoint); err != nil { + return "", fmt.Errorf("can not parse endpoint %s: %w", b.endpoint, err) + } + + if b.usePathStyle { + blobUrl = blobUrl.JoinPath(b.bucket, path) + } else { + blobUrl = blobUrl.JoinPath(path) + blobUrl.Host = fmt.Sprintf("%s.%s", b.bucket, blobUrl.Host) + } + + return blobUrl.String(), nil +} diff --git a/pkg/blob/url_builder_test.go b/pkg/blob/url_builder_test.go new file mode 100644 index 000000000..b64912360 --- /dev/null +++ b/pkg/blob/url_builder_test.go @@ -0,0 +1,75 @@ +package blob_test + +import ( + "testing" + + "github.com/justtrackio/gosoline/pkg/blob" + "github.com/justtrackio/gosoline/pkg/cfg" + "github.com/stretchr/testify/suite" +) + +func TestUrlBuilderTestSuite(t *testing.T) { + suite.Run(t, new(UrlBuilderTestSuite)) +} + +type UrlBuilderTestSuite struct { + suite.Suite + + config cfg.GosoConf +} + +func (s *UrlBuilderTestSuite) SetupTest() { + s.config = cfg.New() + err := s.config.Option(cfg.WithConfigMap(map[string]interface{}{ + "app_project": "justtrack", + "app_family": "gosoline", + "app_name": "uploader", + "env": "test", + })) + + s.NoError(err, "there should be no error on config create") +} + +func (s *UrlBuilderTestSuite) TestLocalstack() { + builder, err := blob.NewUrlBuilder(s.config, "my_store") + s.NoError(err, "there should be no error on builder create") + + url, err := builder.GetAbsoluteUrl("my_file.bin") + s.NoError(err, "there should be no error on GetAbsoluteUrl") + s.Equal("http://localhost:4566/justtrack-test-gosoline/my_file.bin", url) +} + +func (s *UrlBuilderTestSuite) TestAws() { + err := s.config.Option(cfg.WithConfigMap(map[string]interface{}{ + "cloud.aws.defaults.endpoint": "", + "cloud.aws.s3.clients.default.usePathStyle": false, + })) + + s.NoError(err, "there should be no error on config create") + + builder, err := blob.NewUrlBuilder(s.config, "my_store") + s.NoError(err, "there should be no error on builder create") + + url, err := builder.GetAbsoluteUrl("my_file.bin") + s.NoError(err, "there should be no error on GetAbsoluteUrl") + s.Equal("https://justtrack-test-gosoline.s3.eu-central-1.amazonaws.com/my_file.bin", url) +} + +func (s *UrlBuilderTestSuite) TestWithCustomBucket() { + err := s.config.Option(cfg.WithConfigMap(map[string]interface{}{ + "blob.my_store": map[string]interface{}{ + "bucket": "my-custom-bucket", + }, + "cloud.aws.defaults.endpoint": "", + "cloud.aws.s3.clients.default.usePathStyle": false, + })) + + s.NoError(err, "there should be no error on config create") + + builder, err := blob.NewUrlBuilder(s.config, "my_store") + s.NoError(err, "there should be no error on builder create") + + url, err := builder.GetAbsoluteUrl("my_file.bin") + s.NoError(err, "there should be no error on GetAbsoluteUrl") + s.Equal("https://my-custom-bucket.s3.eu-central-1.amazonaws.com/my_file.bin", url) +} diff --git a/pkg/cfg/read.go b/pkg/cfg/read.go index d54c40134..56aaeb641 100644 --- a/pkg/cfg/read.go +++ b/pkg/cfg/read.go @@ -1,7 +1,7 @@ package cfg import ( - "io/ioutil" + "os" "github.com/justtrackio/gosoline/pkg/encoding/yaml" "github.com/pkg/errors" @@ -12,7 +12,7 @@ func readConfigFromFile(cfg *config, filePath string, fileType string) error { return nil } - bytes, err := ioutil.ReadFile(filePath) + bytes, err := os.ReadFile(filePath) if err != nil { return errors.Wrapf(err, "can not read config file %s", filePath) } diff --git a/pkg/clock/clock.go b/pkg/clock/clock.go index c2dc8139e..c673fc450 100644 --- a/pkg/clock/clock.go +++ b/pkg/clock/clock.go @@ -6,6 +6,7 @@ import ( // A Clock provides the most commonly needed functions from the time package while allowing you to substitute them for unit // and integration tests. +// //go:generate mockery --name Clock type Clock interface { // After waits for the duration to elapse and then sends the current time on the returned channel. diff --git a/pkg/clock/fake_clock.go b/pkg/clock/fake_clock.go index cb2538826..e9e08f957 100644 --- a/pkg/clock/fake_clock.go +++ b/pkg/clock/fake_clock.go @@ -7,6 +7,7 @@ import ( // A FakeClock provides the functionality of a Clock with the added functionality to Advance said Clock and block until // at least a given number of timers, tickers, or channels (Clock.After and Clock.Sleep) wait for the time to Advance. +// //go:generate mockery --name FakeClock type FakeClock interface { Clock diff --git a/pkg/clock/ticker.go b/pkg/clock/ticker.go index 281142b5c..ba0273b1c 100644 --- a/pkg/clock/ticker.go +++ b/pkg/clock/ticker.go @@ -7,6 +7,7 @@ import ( ) // A Ticker is similar to a Timer, but it sends the current time continuously to the channel returned by Chan. +// //go:generate mockery --name Ticker type Ticker interface { // Chan returns the channel to which the current time will be sent every time the Ticker expires. diff --git a/pkg/clock/timer.go b/pkg/clock/timer.go index 05ce606b1..d3e40374b 100644 --- a/pkg/clock/timer.go +++ b/pkg/clock/timer.go @@ -5,6 +5,7 @@ import ( ) // A Timer will send the current time to a channel after a delay elapsed. +// //go:generate mockery --name Timer type Timer interface { // Chan returns the channel to which the current time will be sent once the Timer expires. diff --git a/pkg/cloud/aws/awsv2_delayer.go b/pkg/cloud/aws/awsv2_delayer.go index ae13a8724..50b9cdd43 100644 --- a/pkg/cloud/aws/awsv2_delayer.go +++ b/pkg/cloud/aws/awsv2_delayer.go @@ -6,11 +6,8 @@ import ( "time" ) -func init() { - rand.Seed(time.Now().UnixNano()) -} - type BackoffDelayer struct { + rand *rand.Rand initialInterval time.Duration maxInterval time.Duration multiplier float64 @@ -18,7 +15,10 @@ type BackoffDelayer struct { } func NewBackoffDelayer(initialInterval time.Duration, maxInterval time.Duration) *BackoffDelayer { + randSource := rand.NewSource(time.Now().UnixNano()) + return &BackoffDelayer{ + rand: rand.New(randSource), initialInterval: initialInterval, maxInterval: maxInterval, multiplier: 1.5, @@ -38,7 +38,8 @@ func (d *BackoffDelayer) BackoffDelay(attempt int, _ error) (time.Duration, erro } // Returns a random value from the following interval: -// [randomizationFactor * currentInterval, randomizationFactor * currentInterval]. +// +// [randomizationFactor * currentInterval, randomizationFactor * currentInterval]. func (d *BackoffDelayer) getRandomValueFromInterval(currentInterval time.Duration) time.Duration { delta := d.randomizationFactor * float64(currentInterval) minInterval := float64(currentInterval) - delta @@ -47,7 +48,7 @@ func (d *BackoffDelayer) getRandomValueFromInterval(currentInterval time.Duratio // Get a random value from the range [minInterval, maxInterval]. // The formula used below has a +1 because if the minInterval is 1 and the maxInterval is 3 then // we want a 33% chance for selecting either 1, 2 or 3. - random := rand.Float64() + random := d.rand.Float64() return time.Duration(minInterval + (random * (maxInterval - minInterval + 1))) } diff --git a/pkg/cloud/aws/kinesis/metadata_repository.go b/pkg/cloud/aws/kinesis/metadata_repository.go index 12c563e53..d0ce9eac9 100644 --- a/pkg/cloud/aws/kinesis/metadata_repository.go +++ b/pkg/cloud/aws/kinesis/metadata_repository.go @@ -42,6 +42,7 @@ type MetadataRepository interface { } // A Checkpoint describes our position in a shard of the stream. +// //go:generate mockery --name Checkpoint type Checkpoint interface { CheckpointWithoutRelease @@ -51,6 +52,7 @@ type Checkpoint interface { // CheckpointWithoutRelease consists of the Checkpoint interface without the release method. We only use this internally // to ensure Release can only be called when we have taken ownership of the Checkpoint. +// //go:generate mockery --name CheckpointWithoutRelease type CheckpointWithoutRelease interface { GetSequenceNumber() SequenceNumber diff --git a/pkg/cloud/aws/s3/client.go b/pkg/cloud/aws/s3/client.go index fbb46b0fd..eb0bb33ca 100644 --- a/pkg/cloud/aws/s3/client.go +++ b/pkg/cloud/aws/s3/client.go @@ -2,6 +2,7 @@ package s3 import ( "context" + "errors" "fmt" "github.com/aws/aws-sdk-go-v2/aws" @@ -11,14 +12,9 @@ import ( "github.com/justtrackio/gosoline/pkg/appctx" "github.com/justtrackio/gosoline/pkg/cfg" gosoAws "github.com/justtrackio/gosoline/pkg/cloud/aws" - "github.com/justtrackio/gosoline/pkg/dx" "github.com/justtrackio/gosoline/pkg/log" ) -func init() { - dx.RegisterLocalstackSetting("cloud.aws.s3.clients.default.usePathStyle", true) -} - //go:generate mockery --name Client type Client interface { CopyObject(ctx context.Context, params *s3.CopyObjectInput, optFns ...func(*s3.Options)) (*s3.CopyObjectOutput, error) @@ -40,7 +36,7 @@ type Client interface { type ClientSettings struct { gosoAws.ClientSettings - UsePathStyle bool `cfg:"usePathStyle" default:"false"` + UsePathStyle bool `cfg:"usePathStyle" default:"true"` } type ClientConfig struct { @@ -71,7 +67,7 @@ func ProvideClient(ctx context.Context, config cfg.Config, logger log.Logger, na } func NewClient(ctx context.Context, config cfg.Config, logger log.Logger, name string, optFns ...ClientOption) (*s3.Client, error) { - clientCfg := getClientConfig(config, name, optFns...) + clientCfg := GetClientConfig(config, name, optFns...) var err error var awsConfig aws.Config @@ -87,7 +83,7 @@ func NewClient(ctx context.Context, config cfg.Config, logger log.Logger, name s return client, nil } -func getClientConfig(config cfg.Config, name string, optFns ...ClientOption) *ClientConfig { +func GetClientConfig(config cfg.Config, name string, optFns ...ClientOption) *ClientConfig { clientCfg := &ClientConfig{} gosoAws.UnmarshalClientSettings(config, &clientCfg.Settings, "s3", name) @@ -97,3 +93,16 @@ func getClientConfig(config cfg.Config, name string, optFns ...ClientOption) *Cl return clientCfg } + +func ResolveEndpoint(config cfg.Config, name string, optFns ...ClientOption) (aws.Endpoint, error) { + clientCfg := GetClientConfig(config, name, optFns...) + gosoResolver := gosoAws.EndpointResolver(clientCfg.Settings.Endpoint) + + endpoint, err := gosoResolver.ResolveEndpoint("s3", clientCfg.Settings.Region) + + if nf := (&aws.EndpointNotFoundError{}); !errors.As(err, &nf) { + return endpoint, err + } + + return s3.NewDefaultEndpointResolver().ResolveEndpoint(clientCfg.Settings.Region, s3.EndpointResolverOptions{}) +} diff --git a/pkg/cloud/aws/s3/client_test.go b/pkg/cloud/aws/s3/client_test.go new file mode 100644 index 000000000..b89428f5c --- /dev/null +++ b/pkg/cloud/aws/s3/client_test.go @@ -0,0 +1,35 @@ +package s3_test + +import ( + "testing" + + "github.com/justtrackio/gosoline/pkg/cfg" + "github.com/justtrackio/gosoline/pkg/cloud/aws/s3" + "github.com/stretchr/testify/assert" +) + +func TestResolveDefaultEndpoint(t *testing.T) { + config := createConfig(t, map[string]interface{}{}) + + endpoint, err := s3.ResolveEndpoint(config, "default") + assert.NoError(t, err, "there should be no error resolving the endpoint") + assert.Equal(t, "http://localhost:4566", endpoint.URL) +} + +func TestResolveAwsEndpoint(t *testing.T) { + config := createConfig(t, map[string]interface{}{ + "cloud.aws.defaults.endpoint": "", + }) + + endpoint, err := s3.ResolveEndpoint(config, "default") + assert.NoError(t, err, "there should be no error resolving the endpoint") + assert.Equal(t, "https://s3.eu-central-1.amazonaws.com", endpoint.URL) +} + +func createConfig(t *testing.T, settings map[string]interface{}) cfg.Config { + config := cfg.New() + err := config.Option(cfg.WithConfigMap(settings)) + assert.NoError(t, err, "there should be no error on config create") + + return config +} diff --git a/pkg/conc/poisoned_lock.go b/pkg/conc/poisoned_lock.go index b90f60e12..201ba4e84 100644 --- a/pkg/conc/poisoned_lock.go +++ b/pkg/conc/poisoned_lock.go @@ -11,6 +11,7 @@ var ErrAlreadyPoisoned = fmt.Errorf("lock was already poisoned") // A PoisonedLock is similar to a sync.Mutex, but once you Poison it, any attempt to Lock it will fail. Thus, you can // implement something which is available for some time and at some point no longer is available (because it was closed // or released and is not automatically reopened, etc.) +// //go:generate mockery --name PoisonedLock type PoisonedLock interface { // MustLock is like TryLock, but panics if an error is returned by TryLock diff --git a/pkg/dx/localstack_defaults.go b/pkg/dx/localstack_defaults.go deleted file mode 100644 index f84d54e26..000000000 --- a/pkg/dx/localstack_defaults.go +++ /dev/null @@ -1,39 +0,0 @@ -package dx - -import ( - "fmt" - - "github.com/justtrackio/gosoline/pkg/cfg" -) - -func init() { - cfg.AddPostProcessor(1, "gosoline.dx.useLocalstackDefaults", UseLocalstackDefaultsConfigPostProcessor) -} - -var localstackSetting = make(map[string]interface{}) - -func RegisterLocalstackSetting(setting string, value interface{}) { - localstackSetting[setting] = value -} - -func UseLocalstackDefaultsConfigPostProcessor(config cfg.GosoConf) (bool, error) { - return runPostProcessorForDev(config, func(config cfg.GosoConf) error { - if err := config.Option(cfg.WithConfigSetting("dx.use_localstack_defaults", true, cfg.SkipExisting)); err != nil { - return fmt.Errorf("could not set dx.use_localstack_defaults: %w", err) - } - - if ShouldUseLocalstackDefaults(config) { - for setting, value := range localstackSetting { - if err := config.Option(cfg.WithConfigSetting(setting, value, cfg.SkipExisting)); err != nil { - return fmt.Errorf("could not set %s to %v: %w", setting, value, err) - } - } - } - - return nil - }) -} - -func ShouldUseLocalstackDefaults(config cfg.Config) bool { - return config.GetBool("dx.use_localstack_defaults", false) -} diff --git a/pkg/es/clients_v7.go b/pkg/es/clients_v7.go index 1ab6227e8..7a2d8dd64 100644 --- a/pkg/es/clients_v7.go +++ b/pkg/es/clients_v7.go @@ -3,7 +3,6 @@ package es import ( "bytes" "fmt" - "io/ioutil" "net/http" "os" "path/filepath" @@ -148,7 +147,7 @@ func putTemplates(logger Logger, client *ClientV7, name string, paths []string) } for _, file := range files { - buf, err := ioutil.ReadFile(file) + buf, err := os.ReadFile(file) if err != nil { return fmt.Errorf("could not read es-templates file %s: %w", file, err) } @@ -196,7 +195,7 @@ func getTemplateFiles(logger Logger, paths []string) ([]string, error) { return nil, fmt.Errorf("the es-tempates path %s is neither a file or a directory: %w", p, err) } - fileInfos, err := ioutil.ReadDir(p) + fileInfos, err := os.ReadDir(p) if err != nil { return nil, fmt.Errorf("could not scan the the es-tempates path %s: %w", p, err) } diff --git a/pkg/fixtures/writer_blob.go b/pkg/fixtures/writer_blob.go index 0861abb91..8367a3882 100644 --- a/pkg/fixtures/writer_blob.go +++ b/pkg/fixtures/writer_blob.go @@ -3,7 +3,6 @@ package fixtures import ( "context" "fmt" - "io/ioutil" "os" "path/filepath" "strings" @@ -98,7 +97,7 @@ func (s *blobFixtureWriter) Write(ctx context.Context, _ *FixtureSet) error { var batch blob.Batch for _, file := range files { - body, err := ioutil.ReadFile(file) + body, err := os.ReadFile(file) if err != nil { return err } diff --git a/pkg/log/ecs_metadata.go b/pkg/log/ecs_metadata.go index 453d12f9e..810a99190 100644 --- a/pkg/log/ecs_metadata.go +++ b/pkg/log/ecs_metadata.go @@ -1,7 +1,6 @@ package log import ( - "io/ioutil" "os" "sync" "time" @@ -36,7 +35,7 @@ func ReadEcsMetadata() (EcsMetadata, error) { metadata := make(EcsMetadata) for { - data, err := ioutil.ReadFile(path) + data, err := os.ReadFile(path) if err != nil { return nil, errors.Wrap(err, "can not read ecs metadata file") } diff --git a/pkg/mapx/map.go b/pkg/mapx/map.go index 480aa6c03..8f2f3bea1 100644 --- a/pkg/mapx/map.go +++ b/pkg/mapx/map.go @@ -346,6 +346,10 @@ func (m *MapX) Merge(key string, source interface{}, options ...MapOption) { m.Set(key, source, options...) } +func (m *MapX) String() string { + return fmt.Sprint(m.Msi()) +} + // getIndex returns the index, which is hold in s by two braches. // It also returns s withour the index part, e.g. name[1] will return (1, name). // If no index is found, -1 is returned diff --git a/pkg/metric/channel.go b/pkg/metric/channel.go index 6a5b3f647..ce8561c91 100644 --- a/pkg/metric/channel.go +++ b/pkg/metric/channel.go @@ -52,16 +52,16 @@ func (c *metricChannel) write(batch Data) { // Lock the channel metadata, close the channel and unlock it again. // Why do we need a RW lock for the channel? Multiple possible choices: -// - Just read until we get nothing more - does not work if a producer -// writes more messages after we read "everything" to the channel. If -// the producer writes enough messages, it could actually get stuck -// because there is no consumer left and we only buffer 100 items -// - Just add an (atomic) boolean flag: If we check whether we closed the -// channel and then write to it, if not, we have a time-of-check to -// time-of-use race condition. Between our check and writing to the -// channel someone could have closed the channel. -// - Just use recover when you get a panic: Would work, but this is really -// not pretty. +// - Just read until we get nothing more - does not work if a producer +// writes more messages after we read "everything" to the channel. If +// the producer writes enough messages, it could actually get stuck +// because there is no consumer left and we only buffer 100 items +// - Just add an (atomic) boolean flag: If we check whether we closed the +// channel and then write to it, if not, we have a time-of-check to +// time-of-use race condition. Between our check and writing to the +// channel someone could have closed the channel. +// - Just use recover when you get a panic: Would work, but this is really +// not pretty. func (c *metricChannel) close() { c.lck.Lock() defer c.lck.Unlock() diff --git a/pkg/stream/compression_test.go b/pkg/stream/compression_test.go index 273db8fd7..cbc7e6ff0 100644 --- a/pkg/stream/compression_test.go +++ b/pkg/stream/compression_test.go @@ -3,7 +3,7 @@ package stream_test import ( "bytes" "compress/gzip" - "io/ioutil" + "io" "testing" "github.com/justtrackio/gosoline/pkg/stream" @@ -44,7 +44,7 @@ func TestCompressionGzip(t *testing.T) { reader, err := gzip.NewReader(bytes.NewReader(compressed)) assert.NoError(t, err) - decompressedBody, err := ioutil.ReadAll(reader) + decompressedBody, err := io.ReadAll(reader) assert.NoError(t, err) assert.Equal(t, body, string(decompressedBody)) diff --git a/pkg/stream/input.go b/pkg/stream/input.go index 5dfbdffb8..d1ac34a48 100644 --- a/pkg/stream/input.go +++ b/pkg/stream/input.go @@ -3,6 +3,7 @@ package stream import "context" // An Input provides you with a steady stream of messages until you Stop it. +// //go:generate mockery --name Input type Input interface { // Run provides a steady stream of messages, returned via Data. Run does not return until Stop is called and thus @@ -21,6 +22,7 @@ type Input interface { // An AcknowledgeableInput is an Input with the additional ability to mark messages as successfully consumed. For example, // an SQS queue would provide a message after its visibility timeout a second time if we didn't acknowledge it. +// //go:generate mockery --name AcknowledgeableInput type AcknowledgeableInput interface { Input diff --git a/pkg/stream/output_file_test.go b/pkg/stream/output_file_test.go index 081c91533..f655f3481 100644 --- a/pkg/stream/output_file_test.go +++ b/pkg/stream/output_file_test.go @@ -3,7 +3,6 @@ package stream_test import ( "context" "fmt" - "io/ioutil" "os" "regexp" "sync" @@ -41,7 +40,7 @@ func TestOutputFile_ConcurrentWrite(t *testing.T) { waitGroup.Wait() - result, err := ioutil.ReadFile(fileName) + result, err := os.ReadFile(fileName) assert.NoError(t, err) assert.False(t, regexp.MustCompile("\n{2}").Match(result), "unexpected double new line") } diff --git a/pkg/test/env/component_localstack.go b/pkg/test/env/component_localstack.go index b9d6c3a12..6f56f0ec5 100644 --- a/pkg/test/env/component_localstack.go +++ b/pkg/test/env/component_localstack.go @@ -21,7 +21,7 @@ func (c *localstackComponent) CfgOptions() []cfg.Option { options := []cfg.Option{ cfg.WithConfigMap(map[string]interface{}{ "cloud.aws": map[string]interface{}{ - "credentials:": map[string]interface{}{ + "credentials": map[string]interface{}{ "access_key_id": DefaultAccessKeyID, "secret_access_key": DefaultSecretAccessKey, "session_token": DefaultToken, diff --git a/pkg/test/env/component_stream_input.go b/pkg/test/env/component_stream_input.go index e9eba4304..30687cfd3 100644 --- a/pkg/test/env/component_stream_input.go +++ b/pkg/test/env/component_stream_input.go @@ -2,7 +2,7 @@ package env import ( "fmt" - "io/ioutil" + "os" "github.com/justtrackio/gosoline/pkg/cfg" "github.com/justtrackio/gosoline/pkg/encoding/json" @@ -43,7 +43,7 @@ func (s *StreamInputComponent) PublishAndStop(body interface{}, attributes map[s } func (s *StreamInputComponent) PublishFromJsonFile(fileName string) { - bytes, err := ioutil.ReadFile(fileName) + bytes, err := os.ReadFile(fileName) if err != nil { s.failNow(err.Error(), "can not open json file to publish messages") } diff --git a/pkg/test/env/factory_localstack.go b/pkg/test/env/factory_localstack.go index 029180ce8..bb6cb6f97 100644 --- a/pkg/test/env/factory_localstack.go +++ b/pkg/test/env/factory_localstack.go @@ -2,7 +2,7 @@ package env import ( "fmt" - "io/ioutil" + "io" "net/http" "strings" @@ -125,7 +125,7 @@ func (f *localstackFactory) healthCheck(settings interface{}) ComponentHealthChe return err } - if body, err = ioutil.ReadAll(resp.Body); err != nil { + if body, err = io.ReadAll(resp.Body); err != nil { return err } diff --git a/pkg/test/env/factory_wiremock.go b/pkg/test/env/factory_wiremock.go index a2ff0e19b..20a0b97a1 100644 --- a/pkg/test/env/factory_wiremock.go +++ b/pkg/test/env/factory_wiremock.go @@ -3,8 +3,8 @@ package env import ( "bytes" "fmt" - "io/ioutil" "net/http" + "os" "path/filepath" "github.com/justtrackio/gosoline/pkg/cfg" @@ -80,7 +80,7 @@ func (f *wiremockFactory) Component(_ cfg.Config, logger log.Logger, containers } s := settings.(*wiremockSettings) - jsonStr, err := ioutil.ReadFile(s.Mocks) + jsonStr, err := os.ReadFile(s.Mocks) if err != nil { filename := s.Mocks diff --git a/pkg/test/env/filesystem.go b/pkg/test/env/filesystem.go index 4d33ebf8e..300f1d822 100644 --- a/pkg/test/env/filesystem.go +++ b/pkg/test/env/filesystem.go @@ -2,9 +2,10 @@ package env import ( "fmt" - "github.com/stretchr/testify/assert" - "io/ioutil" + "os" "testing" + + "github.com/stretchr/testify/assert" ) type filesystem struct { @@ -18,7 +19,7 @@ func newFilesystem(t *testing.T) *filesystem { } func (f *filesystem) ReadString(filename string) string { - bytes, err := ioutil.ReadFile(filename) + bytes, err := os.ReadFile(filename) if err != nil { err = fmt.Errorf("can not read test data from file %s: %w", filename, err) diff --git a/test/fixtures/s3/s3_test.go b/test/fixtures/s3/s3_test.go index 1027bed69..5affa6548 100644 --- a/test/fixtures/s3/s3_test.go +++ b/test/fixtures/s3/s3_test.go @@ -5,7 +5,7 @@ package s3_test import ( "context" "errors" - "io/ioutil" + "io" "testing" "github.com/aws/aws-sdk-go-v2/aws" @@ -46,7 +46,7 @@ func (s *S3TestSuite) TestS3() { output, err := s3Client.GetObject(context.Background(), input) s.NoError(err) - body, err := ioutil.ReadAll(output.Body) + body, err := io.ReadAll(output.Body) s.NoError(err) s.Equal(28092, len(body)) @@ -58,7 +58,7 @@ func (s *S3TestSuite) TestS3() { output, err = s3Client.GetObject(context.Background(), input) s.NoError(err) - body, err = ioutil.ReadAll(output.Body) + body, err = io.ReadAll(output.Body) s.NoError(err) s.Equal(28092, len(body)) @@ -70,7 +70,7 @@ func (s *S3TestSuite) TestS3() { output, err = s3Client.GetObject(context.Background(), input) s.NoError(err) - body, err = ioutil.ReadAll(output.Body) + body, err = io.ReadAll(output.Body) s.NoError(err) s.Equal(28092, len(body)) @@ -90,7 +90,7 @@ func (s *S3TestSuite) TestS3WithPurge() { output, err := s3Client.GetObject(context.Background(), input) s.NoError(err) - body, err := ioutil.ReadAll(output.Body) + body, err := io.ReadAll(output.Body) s.NoError(err) s.Equal(28092, len(body)) @@ -105,7 +105,7 @@ func (s *S3TestSuite) TestS3WithPurge() { output, err = s3Client.GetObject(context.Background(), input) s.NoError(err) - body, err = ioutil.ReadAll(output.Body) + body, err = io.ReadAll(output.Body) s.NoError(err) s.Equal(28092, len(body))