diff --git a/cpp/include/milvus-storage/common/path_util.h b/cpp/include/milvus-storage/common/path_util.h new file mode 100644 index 0000000..1bbec94 --- /dev/null +++ b/cpp/include/milvus-storage/common/path_util.h @@ -0,0 +1,51 @@ +// Copyright 2024 Zilliz +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include "arrow/status.h" + +namespace milvus_storage { + +constexpr char kSep = '/'; + +arrow::Status NotAFile(std::string_view path) { + return arrow::Status::IOError("Not a regular file: " + std::string(path)); +} + +bool HasTrailingSlash(std::string_view s) { return !s.empty() && s.back() == kSep; } + +std::string EnsureTrailingSlash(std::string_view v) { + if (!v.empty() && !HasTrailingSlash(v)) { + // XXX How about "C:" on Windows? We probably don't want to turn it into "C:/"... + // Unless the local filesystem always uses absolute paths + return std::string(v) + kSep; + } else { + return std::string(v); + } +} + +std::pair GetAbstractPathParent(const std::string& s) { + // XXX should strip trailing slash? + + auto pos = s.find_last_of(kSep); + if (pos == std::string::npos) { + // Empty parent + return {{}, s}; + } + return {s.substr(0, pos), s.substr(pos + 1)}; +} + +} // namespace milvus_storage \ No newline at end of file diff --git a/cpp/include/milvus-storage/common/status.h b/cpp/include/milvus-storage/common/status.h index db99911..fa6d42c 100644 --- a/cpp/include/milvus-storage/common/status.h +++ b/cpp/include/milvus-storage/common/status.h @@ -37,6 +37,8 @@ class Status { static Status WriterError(const std::string& msg) { return Status(kWriterError, msg); } + static Status IOError(const std::string& msg) { return Status(kIOError, msg); } + bool ok() const { return code_ == kOk; } bool IsArrowError() const { return code_ == kArrowError; } @@ -49,6 +51,8 @@ class Status { bool IsWriterError() const { return code_ == kWriterError; } + bool IsIOError() const { return code_ == kIOError; } + std::string ToString() const; private: @@ -59,6 +63,7 @@ class Status { kInternalStateError = 3, kFileNotFound = 4, kWriterError = 5, + kIOError = 6, }; explicit Status(Code code, const std::string& msg = "") : code_(code), msg_(msg) {} diff --git a/cpp/include/milvus-storage/filesystem/s3/multi_part_upload_s3_fs.h b/cpp/include/milvus-storage/filesystem/s3/multi_part_upload_s3_fs.h new file mode 100644 index 0000000..5bbcb34 --- /dev/null +++ b/cpp/include/milvus-storage/filesystem/s3/multi_part_upload_s3_fs.h @@ -0,0 +1,209 @@ +// Copyright 2024 Zilliz +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include +#include + +#include +#include +#include "arrow/filesystem/filesystem.h" +#include "arrow/util/macros.h" +#include "arrow/util/uri.h" + +using ::arrow::Result; + +namespace milvus_storage { + +struct S3Options { + /// \brief AWS region to connect to. + /// + /// If unset, the AWS SDK will choose a default value. The exact algorithm + /// depends on the SDK version. Before 1.8, the default is hardcoded + /// to "us-east-1". Since 1.8, several heuristics are used to determine + /// the region (environment variables, configuration profile, EC2 metadata + /// server). + std::string region; + + /// \brief Socket connection timeout, in seconds + /// + /// If negative, the AWS SDK default value is used (typically 1 second). + double connect_timeout = -1; + + /// \brief Socket read timeout on Windows and macOS, in seconds + /// + /// If negative, the AWS SDK default value is used (typically 3 seconds). + /// This option is ignored on non-Windows, non-macOS systems. + double request_timeout = -1; + + /// If non-empty, override region with a connect string such as "localhost:9000" + // XXX perhaps instead take a URL like "http://localhost:9000"? + std::string endpoint_override; + /// S3 connection transport, default "https" + std::string scheme = "https"; + + /// ARN of role to assume + std::string role_arn; + /// Optional identifier for an assumed role session. + std::string session_name; + /// Optional external identifier to pass to STS when assuming a role + std::string external_id; + /// Frequency (in seconds) to refresh temporary credentials from assumed role + int load_frequency = 900; + + /// If connection is through a proxy, set options here + arrow::fs::S3ProxyOptions proxy_options; + + /// AWS credentials provider + std::shared_ptr credentials_provider; + + /// Type of credentials being used. Set along with credentials_provider. + arrow::fs::S3CredentialsKind credentials_kind = arrow::fs::S3CredentialsKind::Default; + + /// Whether to use virtual addressing of buckets + /// + /// If true, then virtual addressing is always enabled. + /// If false, then virtual addressing is only enabled if `endpoint_override` is empty. + /// + /// This can be used for non-AWS backends that only support virtual hosted-style access. + bool force_virtual_addressing = false; + + /// Whether OutputStream writes will be issued in the background, without blocking. + bool background_writes = true; + + /// Whether to allow creation of buckets + /// + /// When S3FileSystem creates new buckets, it does not pass any non-default settings. + /// In AWS S3, the bucket and all objects will be not publicly visible, and there + /// will be no bucket policies and no resource tags. To have more control over how + /// buckets are created, use a different API to create them. + bool allow_bucket_creation = false; + + /// Whether to allow deletion of buckets + bool allow_bucket_deletion = false; + + /// Whether to allow pessimistic directory creation in CreateDir function + /// + /// By default, CreateDir function will try to create the directory without checking its + /// existence. It's an optimization to try directory creation and catch the error, + /// rather than issue two dependent I/O calls. + /// Though for key/value storage like Google Cloud Storage, too many creation calls will + /// breach the rate limit for object mutation operations and cause serious consequences. 
+ /// It's also possible you don't have creation access for the parent directory. Set it + /// to be true to address these scenarios. + bool check_directory_existence_before_creation = false; + + /// Whether to allow file-open methods to return before the actual open. + /// + /// Enabling this may reduce the latency of `OpenInputStream`, `OpenOutputStream`, + /// and similar methods, by reducing the number of roundtrips necessary. It may also + /// allow usage of more efficient S3 APIs for small files. + /// The downside is that failure conditions such as attempting to open a file in a + /// non-existing bucket will only be reported when actual I/O is done (at worse, + /// when attempting to close the file). + bool allow_delayed_open = false; + + /// \brief Default metadata for OpenOutputStream. + /// + /// This will be ignored if non-empty metadata is passed to OpenOutputStream. + std::shared_ptr default_metadata; + + /// Optional retry strategy to determine which error types should be retried, and the + /// delay between retries. + std::shared_ptr retry_strategy; + + S3Options(); + + /// Configure with the default AWS credentials provider chain. + void ConfigureDefaultCredentials(); + + /// Configure with anonymous credentials. This will only let you access public buckets. + void ConfigureAnonymousCredentials(); + + /// Configure with explicit access and secret key. + void ConfigureAccessKey(const std::string& access_key, + const std::string& secret_key, + const std::string& session_token = ""); + + /// Configure with credentials from an assumed role. + void ConfigureAssumeRoleCredentials(const std::string& role_arn, + const std::string& session_name = "", + const std::string& external_id = "", + int load_frequency = 900, + const std::shared_ptr& stsClient = NULLPTR); + + /// Configure with credentials from role assumed using a web identity token + void ConfigureAssumeRoleWithWebIdentityCredentials(); + + std::string GetAccessKey() const; + std::string GetSecretKey() const; + std::string GetSessionToken() const; + + bool Equals(const S3Options& other) const; + + /// \brief Initialize with default credentials provider chain + /// + /// This is recommended if you use the standard AWS environment variables + /// and/or configuration file. + static S3Options Defaults(); + + /// \brief Initialize with anonymous credentials. + /// + /// This will only let you access public buckets. + static S3Options Anonymous(); + + /// \brief Initialize with explicit access and secret key. + /// + /// Optionally, a session token may also be provided for temporary credentials + /// (from STS). + static S3Options FromAccessKey(const std::string& access_key, + const std::string& secret_key, + const std::string& session_token = ""); + + /// \brief Initialize from an assumed role. + static S3Options FromAssumeRole(const std::string& role_arn, + const std::string& session_name = "", + const std::string& external_id = "", + int load_frequency = 900, + const std::shared_ptr& stsClient = NULLPTR); + + /// \brief Initialize from an assumed role with web-identity. + /// Uses the AWS SDK which uses environment variables to + /// generate temporary credentials. 
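  ///
  /// (Illustrative note: the SDK's web-identity credentials provider typically
  /// reads AWS_ROLE_ARN, AWS_WEB_IDENTITY_TOKEN_FILE and, optionally,
  /// AWS_ROLE_SESSION_NAME from the environment.)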
+ static S3Options FromAssumeRoleWithWebIdentity(); + + static Result FromUri(const ::arrow::util::Uri& uri, std::string* out_path = NULLPTR); + static Result FromUri(const std::string& uri, std::string* out_path = NULLPTR); +}; + +class MultiPartUploadS3FS : public arrow::fs::FileSystem { + public: + explicit MultiPartUploadS3FS(const arrow::fs::S3Options& options, int64_t part_size) + : options_(options), part_size_(part_size) {} + + arrow::Result> OpenOutputStream( + const std::string& s, const std::shared_ptr& metadata) override; + + protected: + class Impl; + std::shared_ptr impl_; + + private: + const int64_t part_size_; + const arrow::fs::S3Options& options_; +}; + +} // namespace milvus_storage \ No newline at end of file diff --git a/cpp/include/milvus-storage/filesystem/s3/s3_internal.h b/cpp/include/milvus-storage/filesystem/s3/s3_internal.h new file mode 100644 index 0000000..a58bcda --- /dev/null +++ b/cpp/include/milvus-storage/filesystem/s3/s3_internal.h @@ -0,0 +1,273 @@ + +#pragma once + +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include "arrow/filesystem/filesystem.h" +#include "arrow/filesystem/s3fs.h" +#include "arrow/status.h" +#include "arrow/util/logging.h" +#include "arrow/util/print.h" +#include "arrow/util/string.h" + +namespace arrow { +namespace fs { +namespace internal { + +// XXX Should we expose this at some point? +enum class S3Backend { Amazon, Minio, Other }; + +// Detect the S3 backend type from the S3 server's response headers +inline S3Backend DetectS3Backend(const Aws::Http::HeaderValueCollection& headers) { + const auto it = headers.find("server"); + if (it != headers.end()) { + const auto& value = std::string_view(it->second); + if (value.find("AmazonS3") != std::string::npos) { + return S3Backend::Amazon; + } + if (value.find("MinIO") != std::string::npos) { + return S3Backend::Minio; + } + } + return S3Backend::Other; +} + +template +inline S3Backend DetectS3Backend(const Aws::Client::AWSError& error) { + return DetectS3Backend(error.GetResponseHeaders()); +} + +template +inline bool IsConnectError(const Aws::Client::AWSError& error) { + if (error.ShouldRetry()) { + return true; + } + // Sometimes Minio may fail with a 503 error + // (exception name: XMinioServerNotInitialized, + // message: "Server not initialized, please try again") + if (error.GetExceptionName() == "XMinioServerNotInitialized") { + return true; + } + return false; +} + +template +inline std::optional BucketRegionFromError(const Aws::Client::AWSError& error) { + if constexpr (std::is_same_v) { + const auto& headers = error.GetResponseHeaders(); + const auto it = headers.find("x-amz-bucket-region"); + if (it != headers.end()) { + const std::string region(it->second.begin(), it->second.end()); + return region; + } + } + return std::nullopt; +} + +inline bool IsNotFound(const Aws::Client::AWSError& error) { + const auto error_type = error.GetErrorType(); + return (error_type == Aws::S3::S3Errors::NO_SUCH_BUCKET || error_type == Aws::S3::S3Errors::RESOURCE_NOT_FOUND); +} + +inline bool IsAlreadyExists(const Aws::Client::AWSError& error) { + const auto error_type = error.GetErrorType(); + return (error_type == Aws::S3::S3Errors::BUCKET_ALREADY_EXISTS || + error_type == Aws::S3::S3Errors::BUCKET_ALREADY_OWNED_BY_YOU); +} + +inline std::string S3ErrorToString(Aws::S3::S3Errors error_type) { + switch (error_type) { +#define S3_ERROR_CASE(NAME) \ + case Aws::S3::S3Errors::NAME: \ + return #NAME; + + 
S3_ERROR_CASE(INCOMPLETE_SIGNATURE) + S3_ERROR_CASE(INTERNAL_FAILURE) + S3_ERROR_CASE(INVALID_ACTION) + S3_ERROR_CASE(INVALID_CLIENT_TOKEN_ID) + S3_ERROR_CASE(INVALID_PARAMETER_COMBINATION) + S3_ERROR_CASE(INVALID_QUERY_PARAMETER) + S3_ERROR_CASE(INVALID_PARAMETER_VALUE) + S3_ERROR_CASE(MISSING_ACTION) + S3_ERROR_CASE(MISSING_AUTHENTICATION_TOKEN) + S3_ERROR_CASE(MISSING_PARAMETER) + S3_ERROR_CASE(OPT_IN_REQUIRED) + S3_ERROR_CASE(REQUEST_EXPIRED) + S3_ERROR_CASE(SERVICE_UNAVAILABLE) + S3_ERROR_CASE(THROTTLING) + S3_ERROR_CASE(VALIDATION) + S3_ERROR_CASE(ACCESS_DENIED) + S3_ERROR_CASE(RESOURCE_NOT_FOUND) + S3_ERROR_CASE(UNRECOGNIZED_CLIENT) + S3_ERROR_CASE(MALFORMED_QUERY_STRING) + S3_ERROR_CASE(SLOW_DOWN) + S3_ERROR_CASE(REQUEST_TIME_TOO_SKEWED) + S3_ERROR_CASE(INVALID_SIGNATURE) + S3_ERROR_CASE(SIGNATURE_DOES_NOT_MATCH) + S3_ERROR_CASE(INVALID_ACCESS_KEY_ID) + S3_ERROR_CASE(REQUEST_TIMEOUT) + S3_ERROR_CASE(NETWORK_CONNECTION) + S3_ERROR_CASE(UNKNOWN) + S3_ERROR_CASE(BUCKET_ALREADY_EXISTS) + S3_ERROR_CASE(BUCKET_ALREADY_OWNED_BY_YOU) + // The following is the most recent addition to S3Errors + // and is not supported yet for some versions of the SDK + // that Apache Arrow is using. This is not a big deal + // since this error will happen only in very specialized + // settings and we will print the correct numerical error + // code as per the "default" case down below. We should + // put it back once the SDK has been upgraded in all + // Apache Arrow build configurations. + // S3_ERROR_CASE(INVALID_OBJECT_STATE) + S3_ERROR_CASE(NO_SUCH_BUCKET) + S3_ERROR_CASE(NO_SUCH_KEY) + S3_ERROR_CASE(NO_SUCH_UPLOAD) + S3_ERROR_CASE(OBJECT_ALREADY_IN_ACTIVE_TIER) + S3_ERROR_CASE(OBJECT_NOT_IN_ACTIVE_TIER) + +#undef S3_ERROR_CASE + default: + return "[code " + ::arrow::internal::ToChars(static_cast(error_type)) + "]"; + } +} + +// TODO qualify error messages with a prefix indicating context +// (e.g. "When completing multipart upload to bucket 'xxx', key 'xxx': ...") +template +Status ErrorToStatus(const std::string& prefix, + const std::string& operation, + const Aws::Client::AWSError& error, + const std::optional& region = std::nullopt) { + // XXX Handle fine-grained error types + // See + // https://sdk.amazonaws.com/cpp/api/LATEST/namespace_aws_1_1_s3.html#ae3f82f8132b619b6e91c88a9f1bde371 + auto error_type = static_cast(error.GetErrorType()); + std::stringstream ss; + ss << S3ErrorToString(error_type); + if (error_type == Aws::S3::S3Errors::UNKNOWN) { + ss << " (HTTP status " << static_cast(error.GetResponseCode()) << ")"; + } + + // Possibly an error due to wrong region configuration from client and bucket. 
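  // (Illustrative example of the hint built below: with a client configured for
  // "us-east-1" and a bucket that actually lives in "eu-central-1", the appended
  // message reads " Looks like the configured region is 'us-east-1' while the
  // bucket is located in 'eu-central-1'.")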
+ std::optional wrong_region_msg = std::nullopt; + if (region.has_value()) { + const auto maybe_region = BucketRegionFromError(error); + if (maybe_region.has_value() && maybe_region.value() != region.value()) { + wrong_region_msg = " Looks like the configured region is '" + region.value() + + "' while the bucket is located in '" + maybe_region.value() + "'."; + } + } + return Status::IOError(prefix, "AWS Error ", ss.str(), " during ", operation, " operation: ", error.GetMessage(), + wrong_region_msg.value_or("")); +} + +template +Status ErrorToStatus(const std::tuple& prefix, + const std::string& operation, + const Aws::Client::AWSError& error) { + std::stringstream ss; + ::arrow::internal::PrintTuple(&ss, prefix); + return ErrorToStatus(ss.str(), operation, error); +} + +template +Status ErrorToStatus(const std::string& operation, const Aws::Client::AWSError& error) { + return ErrorToStatus(std::string(), operation, error); +} + +template +Status OutcomeToStatus(const std::string& prefix, + const std::string& operation, + const Aws::Utils::Outcome& outcome) { + if (outcome.IsSuccess()) { + return Status::OK(); + } else { + return ErrorToStatus(prefix, operation, outcome.GetError()); + } +} + +template +Status OutcomeToStatus(const std::tuple& prefix, + const std::string& operation, + const Aws::Utils::Outcome& outcome) { + if (outcome.IsSuccess()) { + return Status::OK(); + } else { + return ErrorToStatus(prefix, operation, outcome.GetError()); + } +} + +template +Status OutcomeToStatus(const std::string& operation, const Aws::Utils::Outcome& outcome) { + return OutcomeToStatus(std::string(), operation, outcome); +} + +template +Result OutcomeToResult(const std::string& operation, Aws::Utils::Outcome outcome) { + if (outcome.IsSuccess()) { + return std::move(outcome).GetResultWithOwnership(); + } else { + return ErrorToStatus(operation, outcome.GetError()); + } +} + +inline Aws::String ToAwsString(const std::string& s) { + // Direct construction of Aws::String from std::string doesn't work because + // it uses a specific Allocator class. + return Aws::String(s.begin(), s.end()); +} + +inline std::string_view FromAwsString(const Aws::String& s) { return {s.data(), s.length()}; } + +inline Aws::String ToURLEncodedAwsString(const std::string& s) { return Aws::Utils::StringUtils::URLEncode(s.data()); } + +inline TimePoint FromAwsDatetime(const Aws::Utils::DateTime& dt) { + return std::chrono::time_point_cast(dt.UnderlyingTimestamp()); +} + +// A connect retry strategy with a controlled max duration. 
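//
// A usage sketch for the class defined just below, using its default values
// (illustrative only; `error` stands for any connect-type AWSError): with a
// 200 ms interval and a 6000 ms cap, retries are allowed while
// attempted_retries * 200 < 6000, i.e. at most 30 times, each after a flat
// 200 ms delay:
//
//   ConnectRetryStrategy strategy;
//   strategy.ShouldRetry(error, /*attempted_retries=*/29);   // true
//   strategy.ShouldRetry(error, /*attempted_retries=*/30);   // false
//   strategy.CalculateDelayBeforeNextRetry(error, 29);       // 200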
+ +class ConnectRetryStrategy : public Aws::Client::RetryStrategy { + public: + static const int32_t kDefaultRetryInterval = 200; /* milliseconds */ + static const int32_t kDefaultMaxRetryDuration = 6000; /* milliseconds */ + + explicit ConnectRetryStrategy(int32_t retry_interval = kDefaultRetryInterval, + int32_t max_retry_duration = kDefaultMaxRetryDuration) + : retry_interval_(retry_interval), max_retry_duration_(max_retry_duration) {} + + bool ShouldRetry(const Aws::Client::AWSError& error, + long attempted_retries) const override { // NOLINT runtime/int + if (!IsConnectError(error)) { + // Not a connect error, don't retry + return false; + } + return attempted_retries * retry_interval_ < max_retry_duration_; + } + + long CalculateDelayBeforeNextRetry( // NOLINT runtime/int + const Aws::Client::AWSError& error, + long attempted_retries) const override { // NOLINT runtime/int + return retry_interval_; + } + + protected: + int32_t retry_interval_; + int32_t max_retry_duration_; +}; + +} // namespace internal +} // namespace fs +} // namespace arrow diff --git a/cpp/include/milvus-storage/filesystem/s3/util_internal.h b/cpp/include/milvus-storage/filesystem/s3/util_internal.h new file mode 100644 index 0000000..35a9dca --- /dev/null +++ b/cpp/include/milvus-storage/filesystem/s3/util_internal.h @@ -0,0 +1,93 @@ + +#pragma once + +#include +#include +#include + +#include "arrow/filesystem/filesystem.h" +#include "arrow/io/interfaces.h" +#include "arrow/status.h" +#include "arrow/util/uri.h" +#include "arrow/util/visibility.h" + +namespace arrow { +using util::Uri; +namespace fs { +namespace internal { + +ARROW_EXPORT +TimePoint CurrentTimePoint(); + +ARROW_EXPORT +Status CopyStream(const std::shared_ptr& src, + const std::shared_ptr& dest, + int64_t chunk_size, + const io::IOContext& io_context); + +ARROW_EXPORT +Status PathNotFound(std::string_view path); + +ARROW_EXPORT +Status IsADir(std::string_view path); + +ARROW_EXPORT +Status NotADir(std::string_view path); + +ARROW_EXPORT +Status NotEmpty(std::string_view path); + +ARROW_EXPORT +Status NotAFile(std::string_view path); + +ARROW_EXPORT +Status InvalidDeleteDirContents(std::string_view path); + +/// \brief Parse the string as a URI +/// \param uri_string the string to parse +/// +/// This is the same as Uri::Parse except it tolerates Windows +/// file URIs that contain backslash instead of / +Result ParseFileSystemUri(const std::string& uri_string); + +/// \brief check if the string is a local absolute path +ARROW_EXPORT +bool DetectAbsolutePath(const std::string& s); + +/// \brief describes how to handle the authority (host) component of the URI +enum class AuthorityHandlingBehavior { + // Return an invalid status if the authority is non-empty + kDisallow = 0, + // Prepend the authority to the path (e.g. authority/some/path) + kPrepend = 1, + // Convert to a Windows style network path (e.g. 
//authority/some/path) + kWindows = 2, + // Ignore the authority and just use the path + kIgnore = 3 +}; + +/// \brief check to see if uri_string matches one of the supported schemes and return the +/// path component +/// \param uri_string a uri or local path to test and convert +/// \param supported_schemes the set of URI schemes that should be accepted +/// \param accept_local_paths if true, allow an absolute path +/// \return the path portion of the URI +Result PathFromUriHelper(const std::string& uri_string, + std::vector supported_schemes, + bool accept_local_paths, + AuthorityHandlingBehavior authority_handling); + +/// \brief Return files matching the glob pattern on the filesystem +/// +/// Globbing starts from the root of the filesystem. +ARROW_EXPORT +Result GlobFiles(const std::shared_ptr& filesystem, const std::string& glob); + +extern FileSystemGlobalOptions global_options; + +ARROW_EXPORT +Status PathNotFound(std::string_view path); + +} // namespace internal +} // namespace fs +} // namespace arrow diff --git a/cpp/src/filesystem/io/io_util.h b/cpp/src/filesystem/io/io_util.h new file mode 100644 index 0000000..880c539 --- /dev/null +++ b/cpp/src/filesystem/io/io_util.h @@ -0,0 +1,26 @@ +#include +#include +#include +#include "common/log.h" + +namespace milvus_storage { + +template +auto SubmitIO(arrow::io::IOContext io_context, SubmitArgs&&... submit_args) + -> decltype(std::declval<::arrow::internal::Executor*>()->Submit(submit_args...)) { + arrow::internal::TaskHints hints; + hints.external_id = io_context.external_id(); + return io_context.executor()->Submit(hints, io_context.stop_token(), std::forward(submit_args)...); +}; + +void CloseFromDestructor(arrow::io::FileInterface* file) { + arrow::Status st = file->Close(); + if (!st.ok()) { + auto file_type = typeid(*file).name(); + std::stringstream ss; + ss << "When destroying file of type " << file_type << ": " << st.message(); + LOG_STORAGE_FATAL_ << st.WithMessage(ss.str()); + } +} + +} // namespace milvus_storage \ No newline at end of file diff --git a/cpp/src/filesystem/s3/multi_part_upload_s3_fs.cpp b/cpp/src/filesystem/s3/multi_part_upload_s3_fs.cpp new file mode 100644 index 0000000..7997721 --- /dev/null +++ b/cpp/src/filesystem/s3/multi_part_upload_s3_fs.cpp @@ -0,0 +1,2275 @@ +// Copyright 2024 Zilliz +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +#include "filesystem/s3/multi_part_upload_s3_fs.h" +#include "filesystem/s3/s3_internal.h" +#include "filesystem/s3/util_internal.h" + +#include "common/path_util.h" +#include "filesystem/io/io_util.h" + +#include "arrow/util/async_generator.h" +#include "arrow/util/logging.h" +#include "arrow/buffer.h" +#include "arrow/result.h" +#include "arrow/io/memory.h" +#include "arrow/util/future.h" +#include "arrow/util/thread_pool.h" +#include "arrow/filesystem/path_util.h" +#include "arrow/io/interfaces.h" +#include "arrow/util/key_value_metadata.h" +#include "arrow/filesystem/type_fwd.h" +#include "arrow/util/string.h" + +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +static constexpr const char kSep = '/'; + +using ::arrow::Buffer; +using ::arrow::Future; +using ::arrow::Result; +using ::arrow::Status; +using ::arrow::fs::FileInfo; +using ::arrow::fs::FileInfoGenerator; +using ::arrow::fs::FileInfoVector; +using ::arrow::fs::FileSelector; +using ::arrow::fs::FileType; +using ::arrow::fs::kNoSize; +using ::arrow::fs::S3FileSystem; +using ::arrow::fs::S3RetryStrategy; +using ::arrow::fs::internal::ConnectRetryStrategy; +using ::arrow::fs::internal::DetectS3Backend; +using ::arrow::fs::internal::ErrorToStatus; +using ::arrow::fs::internal::FromAwsDatetime; +using ::arrow::fs::internal::FromAwsString; +using ::arrow::fs::internal::IsAlreadyExists; +using ::arrow::fs::internal::IsNotFound; +using ::arrow::fs::internal::OutcomeToResult; +using ::arrow::fs::internal::OutcomeToStatus; +using ::arrow::fs::internal::RemoveTrailingSlash; +using ::arrow::fs::internal::S3Backend; +using ::arrow::fs::internal::ToAwsString; +using ::arrow::fs::internal::ToURLEncodedAwsString; +using ::Aws::Client::AWSError; +using ::Aws::S3::S3Errors; + +using namespace ::arrow; + +namespace milvus_storage { + +static constexpr const char kAwsEndpointUrlEnvVar[] = "AWS_ENDPOINT_URL"; +static constexpr const char kAwsEndpointUrlS3EnvVar[] = "AWS_ENDPOINT_URL_S3"; +static constexpr const char kAwsDirectoryContentType[] = "application/x-directory"; + +bool IsDirectory(std::string_view key, const Aws::S3::Model::HeadObjectResult& result) { + // If it has a non-zero length, it's a regular file. We do this even if + // the key has a trailing slash, as directory markers should never have + // any data associated to them. + if (result.GetContentLength() > 0) { + return false; + } + // Otherwise, if it has a trailing slash, it's a directory + if (arrow::fs::internal::HasTrailingSlash(key)) { + return true; + } + // Otherwise, if its content type starts with "application/x-directory", + // it's a directory + if (::arrow::internal::StartsWith(result.GetContentType(), kAwsDirectoryContentType)) { + return true; + } + // Otherwise, it's a regular file. + return false; +} + +inline Aws::String ToAwsString(const std::string& s) { + // Direct construction of Aws::String from std::string doesn't work because + // it uses a specific Allocator class. 
+ return Aws::String(s.begin(), s.end()); +} + +inline std::string_view FromAwsString(const Aws::String& s) { return {s.data(), s.length()}; } + +template +struct ObjectMetadataSetter { + using Setter = std::function; + + static std::unordered_map GetSetters() { + return {{"ACL", CannedACLSetter()}, + {"Cache-Control", StringSetter(&ObjectRequest::SetCacheControl)}, + {"Content-Type", ContentTypeSetter()}, + {"Content-Language", StringSetter(&ObjectRequest::SetContentLanguage)}, + {"Expires", DateTimeSetter(&ObjectRequest::SetExpires)}}; + } + + private: + static Setter StringSetter(void (ObjectRequest::*req_method)(Aws::String&&)) { + return [req_method](const std::string& v, ObjectRequest* req) { + (req->*req_method)(ToAwsString(v)); + return Status::OK(); + }; + } + + static Setter DateTimeSetter(void (ObjectRequest::*req_method)(Aws::Utils::DateTime&&)) { + return [req_method](const std::string& v, ObjectRequest* req) { + (req->*req_method)(Aws::Utils::DateTime(v.data(), Aws::Utils::DateFormat::ISO_8601)); + return Status::OK(); + }; + } + + static Setter CannedACLSetter() { + return [](const std::string& v, ObjectRequest* req) { + ARROW_ASSIGN_OR_RAISE(auto acl, ParseACL(v)); + req->SetACL(acl); + return Status::OK(); + }; + } + + /** We need a special setter here and can not use `StringSetter` because for e.g. the + * `PutObjectRequest`, the setter is located in the base class (instead of the concrete + * class). */ + static Setter ContentTypeSetter() { + return [](const std::string& str, ObjectRequest* req) { + req->SetContentType(str); + return Status::OK(); + }; + } + + static Result ParseACL(const std::string& v) { + if (v.empty()) { + return Aws::S3::Model::ObjectCannedACL::NOT_SET; + } + auto acl = Aws::S3::Model::ObjectCannedACLMapper::GetObjectCannedACLForName(ToAwsString(v)); + if (acl == Aws::S3::Model::ObjectCannedACL::NOT_SET) { + // XXX This actually never happens, as the AWS SDK dynamically + // expands the enum range using Aws::GetEnumOverflowContainer() + return Status::Invalid("Invalid S3 canned ACL: '", v, "'"); + } + return acl; + } +}; + +struct S3Path { + std::string full_path; + std::string bucket; + std::string key; + std::vector key_parts; + + static Result FromString(const std::string& s) { + if (arrow::fs::internal::IsLikelyUri(s)) { + return arrow::Status::Invalid("Expected an S3 object path of the form 'bucket/key...', got a URI: '", s, "'"); + } + const auto src = RemoveTrailingSlash(s); + auto first_sep = src.find_first_of(kSep); + if (first_sep == 0) { + return arrow::Status::Invalid("Path cannot start with a separator ('", s, "')"); + } + if (first_sep == std::string::npos) { + return S3Path{std::string(src), std::string(src), "", {}}; + } + S3Path path; + path.full_path = std::string(src); + path.bucket = std::string(src.substr(0, first_sep)); + path.key = std::string(src.substr(first_sep + 1)); + path.key_parts = arrow::fs::internal::SplitAbstractPath(path.key); + ARROW_RETURN_NOT_OK(Validate(path)); + return path; + } + + static arrow::Status Validate(const S3Path& path) { + auto st = arrow::fs::internal::ValidateAbstractPath(path.full_path); + if (!st.ok()) { + return arrow::Status::Invalid(st.message(), " in path ", path.full_path); + } + return arrow::Status::OK(); + } + + Aws::String ToAwsString() const { + Aws::String res(bucket.begin(), bucket.end()); + res.reserve(bucket.size() + key.size() + 1); + res += kSep; + res.append(key.begin(), key.end()); + return res; + } + + S3Path parent() const { + DCHECK(!key_parts.empty()); + auto parent = 
S3Path{"", bucket, "", key_parts}; + parent.key_parts.pop_back(); + parent.key = arrow::fs::internal::JoinAbstractPath(parent.key_parts); + parent.full_path = parent.bucket + kSep + parent.key; + return parent; + } + + bool has_parent() const { return !key.empty(); } + + bool empty() const { return bucket.empty() && key.empty(); } + + bool operator==(const S3Path& other) const { return bucket == other.bucket && key == other.key; } +}; + +Status PathNotFound(const S3Path& path) { return ::arrow::fs::internal::PathNotFound(path.full_path); } + +Status PathNotFound(const std::string& bucket, const std::string& key) { + return ::arrow::fs::internal::PathNotFound(bucket + kSep + key); +} + +arrow::Status NotAFile(const S3Path& path) { return NotAFile(path.full_path); } + +arrow::Status ValidateFilePath(const S3Path& path) { + if (path.bucket.empty() || path.key.empty()) { + return NotAFile(path); + } + return arrow::Status::OK(); +}; + +arrow::Status ErrorS3Finalized() { return arrow::Status::Invalid("S3 subsystem is finalized"); } + +arrow::Status CheckS3Initialized() { + if (!arrow::fs::IsS3Initialized()) { + if (arrow::fs::IsS3Finalized()) { + return ErrorS3Finalized(); + } + return arrow::Status::Invalid( + "S3 subsystem is not initialized; please call InitializeS3() " + "before carrying out any S3-related operation"); + } + return arrow::Status::OK(); +}; + +class WrappedRetryStrategy : public Aws::Client::RetryStrategy { + public: + explicit WrappedRetryStrategy(const std::shared_ptr& s3_retry_strategy) + : s3_retry_strategy_(s3_retry_strategy) {} + + bool ShouldRetry(const Aws::Client::AWSError& error, + long attempted_retries) const override { // NOLINT runtime/int + arrow::fs::S3RetryStrategy::AWSErrorDetail detail = ErrorToDetail(error); + return s3_retry_strategy_->ShouldRetry(detail, static_cast(attempted_retries)); + } + + long CalculateDelayBeforeNextRetry( // NOLINT runtime/int + const Aws::Client::AWSError& error, + long attempted_retries) const override { // NOLINT runtime/int + arrow::fs::S3RetryStrategy::AWSErrorDetail detail = ErrorToDetail(error); + return static_cast( // NOLINT runtime/int + s3_retry_strategy_->CalculateDelayBeforeNextRetry(detail, static_cast(attempted_retries))); + } + + private: + template + static arrow::fs::S3RetryStrategy::AWSErrorDetail ErrorToDetail(const Aws::Client::AWSError& error) { + arrow::fs::S3RetryStrategy::AWSErrorDetail detail; + detail.error_type = static_cast(error.GetErrorType()); + detail.message = std::string(FromAwsString(error.GetMessage())); + detail.exception_name = std::string(FromAwsString(error.GetExceptionName())); + detail.should_retry = error.ShouldRetry(); + return detail; + } + + std::shared_ptr s3_retry_strategy_; +}; + +class S3Client : public Aws::S3::S3Client { + public: + using Aws::S3::S3Client::S3Client; + + static inline constexpr auto kBucketRegionHeaderName = "x-amz-bucket-region"; + + std::string GetBucketRegionFromHeaders(const Aws::Http::HeaderValueCollection& headers) { + const auto it = headers.find(ToAwsString(kBucketRegionHeaderName)); + if (it != headers.end()) { + return std::string(FromAwsString(it->second)); + } + return std::string(); + } + + template + arrow::Result GetBucketRegionFromError(const std::string& bucket, + const Aws::Client::AWSError& error) { + std::string region = GetBucketRegionFromHeaders(error.GetResponseHeaders()); + if (!region.empty()) { + return region; + } else if (error.GetResponseCode() == Aws::Http::HttpResponseCode::NOT_FOUND) { + return arrow::Status::IOError("Bucket '", 
bucket, "' not found"); + } else { + return arrow::Status::IOError("When resolving region for bucket: ", bucket); + } + } + + Result GetBucketRegion(const std::string& bucket, const Aws::S3::Model::HeadBucketRequest& request) { + auto uri = GeneratePresignedUrl(request.GetBucket(), + /*key=*/"", Aws::Http::HttpMethod::HTTP_HEAD); + // NOTE: The signer region argument isn't passed here, as there's no easy + // way of computing it (the relevant method is private). + auto outcome = MakeRequest(uri, request, Aws::Http::HttpMethod::HTTP_HEAD, Aws::Auth::SIGV4_SIGNER); + if (!outcome.IsSuccess()) { + return GetBucketRegionFromError(bucket, outcome.GetError()); + } + std::string region = GetBucketRegionFromHeaders(outcome.GetResult().GetHeaderValueCollection()); + if (!region.empty()) { + return region; + } else if (outcome.GetResult().GetResponseCode() == Aws::Http::HttpResponseCode::NOT_FOUND) { + return arrow::Status::IOError("Bucket '", request.GetBucket(), "' not found"); + } else { + return arrow::Status::IOError("When resolving region for bucket '", request.GetBucket(), + "': missing 'x-amz-bucket-region' header in response"); + } + } + + Result GetBucketRegion(const std::string& bucket) { + Aws::S3::Model::HeadBucketRequest req; + req.SetBucket(ToAwsString(bucket)); + return GetBucketRegion(bucket, req); + } + + Aws::S3::Model::CompleteMultipartUploadOutcome CompleteMultipartUploadWithErrorFixup( + Aws::S3::Model::CompleteMultipartUploadRequest&& request) const { + // CompletedMultipartUpload can return a 200 OK response with an error + // encoded in the response body, in which case we should either retry + // or propagate the error to the user (see + // https://docs.aws.amazon.com/AmazonS3/latest/API/API_CompleteMultipartUpload.html). + // + // Unfortunately the AWS SDK doesn't detect such situations but lets them + // return successfully (see https://github.com/aws/aws-sdk-cpp/issues/658). + // + // We work around the issue by registering a DataReceivedEventHandler + // which parses the XML response for embedded errors. + + std::optional> aws_error; + + auto handler = [&](const Aws::Http::HttpRequest* http_req, Aws::Http::HttpResponse* http_resp, + long long) { // NOLINT runtime/int + auto& stream = http_resp->GetResponseBody(); + const auto pos = stream.tellg(); + const auto doc = Aws::Utils::Xml::XmlDocument::CreateFromXmlStream(stream); + // Rewind stream for later + stream.clear(); + stream.seekg(pos); + + if (doc.WasParseSuccessful()) { + auto root = doc.GetRootElement(); + if (!root.IsNull()) { + // Detect something that looks like an abnormal CompletedMultipartUpload + // response. + if (root.GetName() != "CompleteMultipartUploadResult" || !root.FirstChild("Error").IsNull() || + !root.FirstChild("Errors").IsNull()) { + // Make sure the error marshaller doesn't see a 200 OK + http_resp->SetResponseCode(Aws::Http::HttpResponseCode::INTERNAL_SERVER_ERROR); + aws_error = GetErrorMarshaller()->Marshall(*http_resp); + // Rewind stream for later + stream.clear(); + stream.seekg(pos); + } + } + } + }; + + request.SetDataReceivedEventHandler(std::move(handler)); + + // We don't have access to the configured AWS retry strategy + // (m_retryStrategy is a private member of AwsClient), so don't use that. 
+ std::unique_ptr retry_strategy; + if (s3_retry_strategy_) { + retry_strategy.reset(new WrappedRetryStrategy(s3_retry_strategy_)); + } else { + // Note that DefaultRetryStrategy, unlike StandardRetryStrategy, + // has empty definitions for RequestBookkeeping() and GetSendToken(), + // which simplifies the code below. + retry_strategy.reset(new Aws::Client::DefaultRetryStrategy()); + } + + for (int32_t retries = 0;; retries++) { + aws_error.reset(); + auto outcome = Aws::S3::S3Client::S3Client::CompleteMultipartUpload(request); + if (!outcome.IsSuccess()) { + // Error returned in HTTP headers (or client failure) + return outcome; + } + if (!aws_error.has_value()) { + // Genuinely successful outcome + return outcome; + } + + const bool should_retry = retry_strategy->ShouldRetry(*aws_error, retries); + + ARROW_LOG(WARNING) << "CompletedMultipartUpload got error embedded in a 200 OK response: " + << aws_error->GetExceptionName() << " (\"" << aws_error->GetMessage() + << "\"), retry = " << should_retry; + + if (!should_retry) { + break; + } + const auto delay = std::chrono::milliseconds(retry_strategy->CalculateDelayBeforeNextRetry(*aws_error, retries)); + std::this_thread::sleep_for(delay); + } + + DCHECK(aws_error.has_value()); + auto s3_error = AWSError(std::move(aws_error).value()); + return Aws::S3::Model::CompleteMultipartUploadOutcome(std::move(s3_error)); + } + + std::shared_ptr s3_retry_strategy_; +}; + +class S3ClientFinalizer; + +class S3ClientLock { + public: + S3Client* get() { return client_.get(); } + S3Client* operator->() { return client_.get(); } + + // Move this S3ClientLock into a temporary instance + // + // It is counter-intuitive, but lock ordering issues can happen even + // with a shared mutex locked in shared mode. + // The reason is that locking again in shared mode can block while + // there are threads waiting to take the lock in exclusive mode. + // Therefore, we should avoid obtaining the S3ClientLock when + // we already have it locked. + // + // This methods helps by moving the S3ClientLock into a temporary + // that is immediately destroyed so the lock will be released as + // soon as we are done making the call to the underlying client. + // + // (see GH-36523) + S3ClientLock Move() { return std::move(*this); } + + protected: + friend class S3ClientHolder; + + // Locks the finalizer until the S3ClientLock gets out of scope. + std::shared_lock lock_; + std::shared_ptr client_; +}; + +class S3ClientHolder { + public: + /// \brief Return a RAII guard guaranteeing a S3Client is safe for use + /// + /// S3 finalization will be deferred until the returned S3ClientLock + /// goes out of scope. + /// An error is returned if S3 is already finalized. 
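  ///
  /// Typical call site (a sketch mirroring the usage elsewhere in this file):
  ///
  ///   ARROW_ASSIGN_OR_RAISE(auto client_lock, holder_->Lock());
  ///   auto outcome = client_lock.Move()->HeadObject(req);
  ///
  /// Move() hands the lock to a temporary so it is released as soon as the SDK
  /// call returns (see S3ClientLock::Move() and GH-36523).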
+ arrow::Result Lock(); + + S3ClientHolder(std::weak_ptr finalizer, std::shared_ptr client) + : finalizer_(std::move(finalizer)), client_(std::move(client)) {} + + void Finalize(); + + protected: + std::mutex mutex_; + std::weak_ptr finalizer_; + std::shared_ptr client_; +}; + +class S3ClientFinalizer : public std::enable_shared_from_this { + using ClientHolderList = std::vector>; + + public: + arrow::Result> AddClient(std::shared_ptr client) { + std::unique_lock lock(mutex_); + if (finalized_) { + return ErrorS3Finalized(); + } + + auto holder = std::make_shared(shared_from_this(), std::move(client)); + + // Remove expired entries before adding new one + auto end = std::remove_if(holders_.begin(), holders_.end(), + [](std::weak_ptr holder) { return holder.expired(); }); + holders_.erase(end, holders_.end()); + holders_.emplace_back(holder); + return holder; + } + + void Finalize() { + std::unique_lock lock(mutex_); + finalized_ = true; + + ClientHolderList finalizing = std::move(holders_); + lock.unlock(); // avoid lock ordering issue with S3ClientHolder::Finalize + + // Finalize all client holders, such that no S3Client remains alive + // after this. + for (auto&& weak_holder : finalizing) { + auto holder = weak_holder.lock(); + if (holder) { + holder->Finalize(); + } + } + } + + auto LockShared() { return std::shared_lock(mutex_); } + + protected: + friend class S3ClientHolder; + + std::shared_mutex mutex_; + ClientHolderList holders_; + bool finalized_ = false; +}; + +arrow::Result S3ClientHolder::Lock() { + std::shared_ptr finalizer; + std::shared_ptr client; + { + std::unique_lock lock(mutex_); + finalizer = finalizer_.lock(); + client = client_; + } + // Do not hold mutex while taking finalizer lock below. + // + // Acquiring a shared_mutex in shared mode may block even if not already + // acquired in exclusive mode, because of pending writers: + // https://github.com/google/sanitizers/issues/1668#issuecomment-1624985664 + // """It is implementation-defined whether the calling thread acquires + // the lock when a writer does not hold the lock and there are writers + // blocked on the lock""". + // + // Therefore, we want to avoid potential lock ordering issues + // even when a shared lock is involved (GH-36523). 
+ if (!finalizer) { + return ErrorS3Finalized(); + } + + S3ClientLock client_lock; + // Lock the finalizer before examining it + client_lock.lock_ = finalizer->LockShared(); + if (finalizer->finalized_) { + return ErrorS3Finalized(); + } + // (the client can be cleared only if finalizer->finalized_ is true) + DCHECK(client) << "inconsistent S3ClientHolder"; + client_lock.client_ = std::move(client); + return client_lock; +} + +void S3ClientHolder::Finalize() { + std::shared_ptr client; + { + std::unique_lock lock(mutex_); + client = std::move(client_); + } + // Do not hold mutex while ~S3Client potentially runs +} + +std::shared_ptr GetClientFinalizer() { + static auto finalizer = std::make_shared(); + return finalizer; +} + +arrow::Result> GetClientHolder(std::shared_ptr client) { + return GetClientFinalizer()->AddClient(std::move(client)); +} + +template +arrow::Status SetObjectMetadata(const std::shared_ptr& metadata, ObjectRequest* req) { + static auto setters = ObjectMetadataSetter::GetSetters(); + + DCHECK_NE(metadata, nullptr); + const auto& keys = metadata->keys(); + const auto& values = metadata->values(); + + for (size_t i = 0; i < keys.size(); ++i) { + auto it = setters.find(keys[i]); + if (it != setters.end()) { + RETURN_NOT_OK(it->second(values[i], req)); + } + } + return arrow::Status::OK(); +} + +class StringViewStream : Aws::Utils::Stream::PreallocatedStreamBuf, public std::iostream { + public: + StringViewStream(const void* data, int64_t nbytes) + : Aws::Utils::Stream::PreallocatedStreamBuf(reinterpret_cast(const_cast(data)), + static_cast(nbytes)), + std::iostream(this) {} +}; + +class ClientBuilder { + public: + explicit ClientBuilder(S3Options options) : options_(std::move(options)) {} + + const Aws::Client::ClientConfiguration& config() const { return client_config_; } + + Aws::Client::ClientConfiguration* mutable_config() { return &client_config_; } + + Result> BuildClient(std::optional io_context = std::nullopt) { + credentials_provider_ = options_.credentials_provider; + if (!options_.region.empty()) { + client_config_.region = ToAwsString(options_.region); + } + if (options_.request_timeout > 0) { + // Use ceil() to avoid setting it to 0 as that probably means no timeout. 
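      // (e.g. request_timeout = 0.0004 maps to 1 ms rather than 0 ms, since a
      // value of 0 probably means "no timeout" to the SDK)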
+ client_config_.requestTimeoutMs = static_cast(ceil(options_.request_timeout * 1000)); // NOLINT runtime/int + } + if (options_.connect_timeout > 0) { + client_config_.connectTimeoutMs = static_cast(ceil(options_.connect_timeout * 1000)); // NOLINT runtime/int + } + + client_config_.endpointOverride = ToAwsString(options_.endpoint_override); + if (options_.scheme == "http") { + client_config_.scheme = Aws::Http::Scheme::HTTP; + } else if (options_.scheme == "https") { + client_config_.scheme = Aws::Http::Scheme::HTTPS; + } else { + return Status::Invalid("Invalid S3 connection scheme '", options_.scheme, "'"); + } + if (options_.retry_strategy) { + client_config_.retryStrategy = std::make_shared(options_.retry_strategy); + } else { + client_config_.retryStrategy = std::make_shared(); + } + if (!arrow::fs::internal::global_options.tls_ca_file_path.empty()) { + client_config_.caFile = ToAwsString(arrow::fs::internal::global_options.tls_ca_file_path); + } + if (!arrow::fs::internal::global_options.tls_ca_dir_path.empty()) { + client_config_.caPath = ToAwsString(arrow::fs::internal::global_options.tls_ca_dir_path); + } + + // Set proxy options if provided + if (!options_.proxy_options.scheme.empty()) { + if (options_.proxy_options.scheme == "http") { + client_config_.proxyScheme = Aws::Http::Scheme::HTTP; + } else if (options_.proxy_options.scheme == "https") { + client_config_.proxyScheme = Aws::Http::Scheme::HTTPS; + } else { + return Status::Invalid("Invalid proxy connection scheme '", options_.proxy_options.scheme, "'"); + } + } + if (!options_.proxy_options.host.empty()) { + client_config_.proxyHost = ToAwsString(options_.proxy_options.host); + } + if (options_.proxy_options.port != -1) { + client_config_.proxyPort = options_.proxy_options.port; + } + if (!options_.proxy_options.username.empty()) { + client_config_.proxyUserName = ToAwsString(options_.proxy_options.username); + } + if (!options_.proxy_options.password.empty()) { + client_config_.proxyPassword = ToAwsString(options_.proxy_options.password); + } + + if (io_context) { + // TODO: Once ARROW-15035 is done we can get rid of the "at least 25" fallback + client_config_.maxConnections = std::max(io_context->executor()->GetCapacity(), 25); + } + + const bool use_virtual_addressing = options_.endpoint_override.empty() || options_.force_virtual_addressing; + +#ifdef ARROW_S3_HAS_S3CLIENT_CONFIGURATION + client_config_.useVirtualAddressing = use_virtual_addressing; + auto endpoint_provider = EndpointProviderCache::Instance()->Lookup(client_config_); + auto client = std::make_shared(credentials_provider_, endpoint_provider, client_config_); +#else + auto client = + std::make_shared(credentials_provider_, client_config_, + Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never, use_virtual_addressing); +#endif + client->s3_retry_strategy_ = options_.retry_strategy; + return GetClientHolder(std::move(client)); + } + + const S3Options& options() const { return options_; } + + protected: + S3Options options_; +#ifdef ARROW_S3_HAS_S3CLIENT_CONFIGURATION + Aws::S3::S3ClientConfiguration client_config_; +#else + Aws::Client::ClientConfiguration client_config_; +#endif + std::shared_ptr credentials_provider_; +}; + +std::string FormatRange(int64_t start, int64_t length) { + // Format a HTTP range header value + std::stringstream ss; + ss << "bytes=" << start << "-" << start + length - 1; + return ss.str(); +} + +Aws::IOStreamFactory AwsWriteableStreamFactory(void* data, int64_t nbytes) { + return [=]() { return Aws::New("", data, nbytes); 
}; +} + +Result GetObjectRange( + Aws::S3::S3Client* client, const S3Path& path, int64_t start, int64_t length, void* out) { + Aws::S3::Model::GetObjectRequest req; + req.SetBucket(ToAwsString(path.bucket)); + req.SetKey(ToAwsString(path.key)); + req.SetRange(ToAwsString(FormatRange(start, length))); + req.SetResponseStreamFactory(AwsWriteableStreamFactory(out, length)); + return OutcomeToResult("GetObject", client->GetObject(req)); +} + +template +std::shared_ptr GetObjectMetadata(const ObjectResult& result) { + auto md = std::make_shared(); + + auto push = [&](std::string k, const Aws::String& v) { + if (!v.empty()) { + md->Append(std::move(k), std::string(FromAwsString(v))); + } + }; + auto push_datetime = [&](std::string k, const Aws::Utils::DateTime& v) { + if (v != Aws::Utils::DateTime(0.0)) { + push(std::move(k), v.ToGmtString(Aws::Utils::DateFormat::ISO_8601)); + } + }; + + md->Append("Content-Length", ToChars(result.GetContentLength())); + push("Cache-Control", result.GetCacheControl()); + push("Content-Type", result.GetContentType()); + push("Content-Language", result.GetContentLanguage()); + push("ETag", result.GetETag()); + push("VersionId", result.GetVersionId()); + push_datetime("Last-Modified", result.GetLastModified()); + push_datetime("Expires", result.GetExpires()); + // NOTE the "canned ACL" isn't available for reading (one can get an expanded + // ACL using a separate GetObjectAcl request) + return md; +} + +class ObjectInputFile final : public io::RandomAccessFile { + public: + ObjectInputFile(std::shared_ptr holder, + const io::IOContext& io_context, + const S3Path& path, + int64_t size = kNoSize) + : holder_(std::move(holder)), io_context_(io_context), path_(path), content_length_(size) {} + + Status Init() { + // Issue a HEAD Object to get the content-length and ensure any + // errors (e.g. file not found) don't wait until the first Read() call. 
+ if (content_length_ != kNoSize) { + DCHECK_GE(content_length_, 0); + return Status::OK(); + } + + Aws::S3::Model::HeadObjectRequest req; + req.SetBucket(ToAwsString(path_.bucket)); + req.SetKey(ToAwsString(path_.key)); + + ARROW_ASSIGN_OR_RAISE(auto client_lock, holder_->Lock()); + auto outcome = client_lock.Move()->HeadObject(req); + if (!outcome.IsSuccess()) { + if (IsNotFound(outcome.GetError())) { + return PathNotFound(path_); + } else { + return ErrorToStatus(std::forward_as_tuple("When reading information for key '", path_.key, "' in bucket '", + path_.bucket, "': "), + "HeadObject", outcome.GetError()); + } + } + content_length_ = outcome.GetResult().GetContentLength(); + DCHECK_GE(content_length_, 0); + metadata_ = GetObjectMetadata(outcome.GetResult()); + return Status::OK(); + } + + Status CheckClosed() const { + if (closed_) { + return Status::Invalid("Operation on closed stream"); + } + return Status::OK(); + } + + Status CheckPosition(int64_t position, const char* action) const { + if (position < 0) { + return Status::Invalid("Cannot ", action, " from negative position"); + } + if (position > content_length_) { + return Status::IOError("Cannot ", action, " past end of file"); + } + return Status::OK(); + } + + // RandomAccessFile APIs + + Result> ReadMetadata() override { return metadata_; } + + Future> ReadMetadataAsync(const io::IOContext& io_context) override { + return metadata_; + } + + Status Close() override { + holder_ = nullptr; + closed_ = true; + return Status::OK(); + } + + bool closed() const override { return closed_; } + + Result Tell() const override { + RETURN_NOT_OK(CheckClosed()); + return pos_; + } + + Result GetSize() override { + RETURN_NOT_OK(CheckClosed()); + return content_length_; + } + + Status Seek(int64_t position) override { + RETURN_NOT_OK(CheckClosed()); + RETURN_NOT_OK(CheckPosition(position, "seek")); + + pos_ = position; + return Status::OK(); + } + + Result ReadAt(int64_t position, int64_t nbytes, void* out) override { + RETURN_NOT_OK(CheckClosed()); + RETURN_NOT_OK(CheckPosition(position, "read")); + + nbytes = std::min(nbytes, content_length_ - position); + if (nbytes == 0) { + return 0; + } + + // Read the desired range of bytes + ARROW_ASSIGN_OR_RAISE(auto client_lock, holder_->Lock()); + ARROW_ASSIGN_OR_RAISE(Aws::S3::Model::GetObjectResult result, + GetObjectRange(client_lock.get(), path_, position, nbytes, out)); + + auto& stream = result.GetBody(); + stream.ignore(nbytes); + // NOTE: the stream is a stringstream by default, there is no actual error + // to check for. However, stream.fail() may return true if EOF is reached. 
+ return stream.gcount(); + } + + Result> ReadAt(int64_t position, int64_t nbytes) override { + RETURN_NOT_OK(CheckClosed()); + RETURN_NOT_OK(CheckPosition(position, "read")); + + // No need to allocate more than the remaining number of bytes + nbytes = std::min(nbytes, content_length_ - position); + + ARROW_ASSIGN_OR_RAISE(auto buf, AllocateResizableBuffer(nbytes, io_context_.pool())); + if (nbytes > 0) { + ARROW_ASSIGN_OR_RAISE(int64_t bytes_read, ReadAt(position, nbytes, buf->mutable_data())); + DCHECK_LE(bytes_read, nbytes); + RETURN_NOT_OK(buf->Resize(bytes_read)); + } + // R build with openSUSE155 requires an explicit shared_ptr construction + return std::shared_ptr(std::move(buf)); + } + + Result Read(int64_t nbytes, void* out) override { + ARROW_ASSIGN_OR_RAISE(int64_t bytes_read, ReadAt(pos_, nbytes, out)); + pos_ += bytes_read; + return bytes_read; + } + + Result> Read(int64_t nbytes) override { + ARROW_ASSIGN_OR_RAISE(auto buffer, ReadAt(pos_, nbytes)); + pos_ += buffer->size(); + return buffer; + } + + protected: + std::shared_ptr holder_; + const io::IOContext io_context_; + S3Path path_; + + bool closed_ = false; + int64_t pos_ = 0; + int64_t content_length_ = kNoSize; + std::shared_ptr metadata_; +}; + +void FileObjectToInfo(const Aws::S3::Model::Object& obj, FileInfo* info) { + info->set_type(arrow::fs::FileType::File); + info->set_size(static_cast(obj.GetSize())); + info->set_mtime(FromAwsDatetime(obj.GetLastModified())); +} + +class CustomOutputStream final : public arrow::io::OutputStream { + protected: + struct UploadState; + + public: + CustomOutputStream(std::shared_ptr holder, + const arrow::io::IOContext& io_context, + const S3Path& path, + const S3Options& options, + const std::shared_ptr& metadata, + const int64_t part_size) + : holder_(std::move(holder)), + io_context_(io_context), + path_(path), + metadata_(metadata), + default_metadata_(options.default_metadata), + background_writes_(options.background_writes), + allow_delayed_open_(options.allow_delayed_open), + part_upload_size_(part_size), + multi_part_upload_threshold_size_(part_size - 1) {} + + ~CustomOutputStream() override { + // For compliance with the rest of the IO stack, Close rather than Abort, + // even though it may be more expensive. 
+ CloseFromDestructor(this); + } + + template + arrow::Status SetMetadataInRequest(ObjectRequest* request) { + std::shared_ptr metadata; + + if (metadata_ && metadata_->size() != 0) { + metadata = metadata_; + } else if (default_metadata_ && default_metadata_->size() != 0) { + metadata = default_metadata_; + } + + bool is_content_type_set{false}; + if (metadata) { + RETURN_NOT_OK(SetObjectMetadata(metadata, request)); + + is_content_type_set = metadata->Contains("Content-Type"); + } + + if (!is_content_type_set) { + // If we do not set anything then the SDK will default to application/xml + // which confuses some tools (https://github.com/apache/arrow/issues/11934) + // So we instead default to application/octet-stream which is less misleading + request->SetContentType("application/octet-stream"); + } + + return arrow::Status::OK(); + } + + std::shared_ptr Self() { + return std::dynamic_pointer_cast(shared_from_this()); + } + + arrow::Status CreateMultipartUpload() { + DCHECK(ShouldBeMultipartUpload()); + + ARROW_ASSIGN_OR_RAISE(auto client_lock, holder_->Lock()); + + // Initiate the multi-part upload + Aws::S3::Model::CreateMultipartUploadRequest req; + req.SetBucket(ToAwsString(path_.bucket)); + req.SetKey(ToAwsString(path_.key)); + RETURN_NOT_OK(SetMetadataInRequest(&req)); + + auto outcome = client_lock.Move()->CreateMultipartUpload(req); + if (!outcome.IsSuccess()) { + return arrow::Status::Invalid("When initiating multiple part upload for key '", path_.key, "' in bucket '", + path_.bucket + "': ", "CreateMultipartUpload", outcome.GetError()); + } + multipart_upload_id_ = outcome.GetResult().GetUploadId(); + + return arrow::Status::OK(); + } + + arrow::Status Init() { + // If we are allowed to do delayed I/O, we can use a single request to upload the + // data. If not, we use a multi-part upload and initiate it here to + // sanitize that writing to the bucket is possible. 
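    // (Illustrative summary with hypothetical numbers: given part_size = 10 MiB
    // and allow_delayed_open = true, writing 4 MiB and closing results in one
    // single-request upload; writing 25 MiB uploads two full 10 MiB parts as
    // they fill plus a final 5 MiB part on Close(). With allow_delayed_open =
    // false, CreateMultipartUpload() is issued right here, before any data is
    // written.)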
+ if (!allow_delayed_open_) { + RETURN_NOT_OK(CreateMultipartUpload()); + } + + upload_state_ = std::make_shared(); + closed_ = false; + return arrow::Status::OK(); + } + + arrow::Status Abort() override { + if (closed_) { + return arrow::Status::OK(); + } + + if (IsMultipartCreated()) { + ARROW_ASSIGN_OR_RAISE(auto client_lock, holder_->Lock()); + + Aws::S3::Model::AbortMultipartUploadRequest req; + req.SetBucket(ToAwsString(path_.bucket)); + req.SetKey(ToAwsString(path_.key)); + req.SetUploadId(multipart_upload_id_); + + auto outcome = client_lock.Move()->AbortMultipartUpload(req); + if (!outcome.IsSuccess()) { + return arrow::Status::Invalid("When aborting multiple part upload for key '", path_.key, "' in bucket '", + path_.bucket, "': ", "AbortMultipartUpload", outcome.GetError()); + } + } + + current_part_.reset(); + holder_ = nullptr; + closed_ = true; + + return arrow::Status::OK(); + } + + // OutputStream interface + + bool ShouldBeMultipartUpload() const { return pos_ > multi_part_upload_threshold_size_ || !allow_delayed_open_; } + + bool IsMultipartCreated() const { return !multipart_upload_id_.empty(); } + + arrow::Status EnsureReadyToFlushFromClose() { + if (ShouldBeMultipartUpload()) { + if (current_part_) { + // Upload last part + RETURN_NOT_OK(CommitCurrentPart()); + } + + // S3 mandates at least one part, upload an empty one if necessary + if (part_number_ == 1) { + RETURN_NOT_OK(UploadPart("", 0)); + } + } else { + RETURN_NOT_OK(UploadUsingSingleRequest()); + } + + return arrow::Status::OK(); + } + + arrow::Status CleanupAfterClose() { + holder_ = nullptr; + closed_ = true; + return arrow::Status::OK(); + } + + arrow::Status FinishPartUploadAfterFlush() { + ARROW_ASSIGN_OR_RAISE(auto client_lock, holder_->Lock()); + + // At this point, all part uploads have finished successfully + DCHECK_GT(part_number_, 1); + DCHECK_EQ(upload_state_->completed_parts.size(), static_cast(part_number_ - 1)); + + Aws::S3::Model::CompletedMultipartUpload completed_upload; + completed_upload.SetParts(upload_state_->completed_parts); + Aws::S3::Model::CompleteMultipartUploadRequest req; + req.SetBucket(ToAwsString(path_.bucket)); + req.SetKey(ToAwsString(path_.key)); + req.SetUploadId(multipart_upload_id_); + req.SetMultipartUpload(std::move(completed_upload)); + + auto outcome = client_lock.Move()->CompleteMultipartUploadWithErrorFixup(std::move(req)); + if (!outcome.IsSuccess()) { + return Status::IOError("When completing multiple part upload for key '" + path_.key + "' in bucket '" + + path_.bucket + "':CompleteMultipartUpload error"); + } + + return arrow::Status::OK(); + } + + arrow::Status Close() override { + if (closed_) + return arrow::Status::OK(); + + RETURN_NOT_OK(EnsureReadyToFlushFromClose()); + + RETURN_NOT_OK(Flush()); + + if (IsMultipartCreated()) { + RETURN_NOT_OK(FinishPartUploadAfterFlush()); + } + + return CleanupAfterClose(); + } + + Future<> CloseAsync() override { + if (closed_) + return arrow::Status::OK(); + + RETURN_NOT_OK(EnsureReadyToFlushFromClose()); + + // Wait for in-progress uploads to finish (if async writes are enabled) + return FlushAsync().Then([self = Self()]() { + if (self->IsMultipartCreated()) { + RETURN_NOT_OK(self->FinishPartUploadAfterFlush()); + } + return self->CleanupAfterClose(); + }); + } + + bool closed() const override { return closed_; } + + Result Tell() const override { + if (closed_) { + return arrow::Status::Invalid("Operation on closed stream"); + } + return pos_; + } + + arrow::Status Write(const std::shared_ptr& buffer) override { + 
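+    // Forwarding `buffer` as the owned buffer lets the background-write path keep a
+    // reference to it instead of copying the bytes (see the owned_buffer handling in
+    // Upload() below).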
return DoWrite(buffer->data(), buffer->size(), buffer); + } + + arrow::Status Write(const void* data, int64_t nbytes) override { return DoWrite(data, nbytes); } + + arrow::Status DoWrite(const void* data, int64_t nbytes, std::shared_ptr owned_buffer = nullptr) { + if (closed_) { + return arrow::Status::Invalid("Operation on closed stream"); + } + + const int8_t* data_ptr = reinterpret_cast(data); + auto advance_ptr = [&data_ptr, &nbytes](const int64_t offset) { + data_ptr += offset; + nbytes -= offset; + }; + + // Handle case where we have some bytes buffered from prior calls. + if (current_part_size_ > 0) { + // Try to fill current buffer + const int64_t to_copy = std::min(nbytes, part_upload_size_ - current_part_size_); + RETURN_NOT_OK(current_part_->Write(data_ptr, to_copy)); + current_part_size_ += to_copy; + advance_ptr(to_copy); + pos_ += to_copy; + + // If buffer isn't full, break + if (current_part_size_ < part_upload_size_) { + return arrow::Status::OK(); + } + + // Upload current buffer. We're only reaching this point if we have accumulated + // enough data to upload. + RETURN_NOT_OK(CommitCurrentPart()); + } + + // We can upload chunks without copying them into a buffer + while (nbytes >= part_upload_size_) { + RETURN_NOT_OK(UploadPart(data_ptr, part_upload_size_)); + advance_ptr(part_upload_size_); + pos_ += part_upload_size_; + } + + // Buffer remaining bytes + if (nbytes > 0) { + current_part_size_ = nbytes; + ARROW_ASSIGN_OR_RAISE(current_part_, + arrow::io::BufferOutputStream::Create(part_upload_size_, io_context_.pool())); + RETURN_NOT_OK(current_part_->Write(data_ptr, current_part_size_)); + pos_ += current_part_size_; + } + + return arrow::Status::OK(); + } + + arrow::Status Flush() override { + auto fut = FlushAsync(); + return fut.status(); + } + + Future<> FlushAsync() { + if (closed_) { + return arrow::Status::Invalid("Operation on closed stream"); + } + // Wait for background writes to finish + std::unique_lock lock(upload_state_->mutex); + return upload_state_->pending_uploads_completed; + } + + // Upload-related helpers + + arrow::Status CommitCurrentPart() { + if (!IsMultipartCreated()) { + RETURN_NOT_OK(CreateMultipartUpload()); + } + + ARROW_ASSIGN_OR_RAISE(auto buf, current_part_->Finish()); + current_part_.reset(); + current_part_size_ = 0; + return UploadPart(buf); + } + + arrow::Status UploadUsingSingleRequest() { + std::shared_ptr buf; + if (current_part_ == nullptr) { + // In case the stream is closed directly after it has been opened without writing + // anything, we'll have to create an empty buffer. 
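+      // e.g. OpenOutputStream() immediately followed by Close() reaches this branch
+      // with current_part_ == nullptr, and a zero-byte object is still created.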
+ buf = std::make_shared(""); + } else { + ARROW_ASSIGN_OR_RAISE(buf, current_part_->Finish()); + } + + current_part_.reset(); + current_part_size_ = 0; + return UploadUsingSingleRequest(buf); + } + + template + using UploadResultCallbackFunction = std::function, int32_t part_number, OutcomeType outcome)>; + + static Result TriggerUploadRequest(const Aws::S3::Model::PutObjectRequest& request, + const std::shared_ptr& holder) { + ARROW_ASSIGN_OR_RAISE(auto client_lock, holder->Lock()); + return client_lock.Move()->PutObject(request); + } + + static Result TriggerUploadRequest( + const Aws::S3::Model::UploadPartRequest& request, const std::shared_ptr& holder) { + ARROW_ASSIGN_OR_RAISE(auto client_lock, holder->Lock()); + return client_lock.Move()->UploadPart(request); + } + + template + arrow::Status Upload(RequestType&& req, + UploadResultCallbackFunction sync_result_callback, + UploadResultCallbackFunction async_result_callback, + const void* data, + int64_t nbytes, + std::shared_ptr owned_buffer = nullptr) { + req.SetBucket(ToAwsString(path_.bucket)); + req.SetKey(ToAwsString(path_.key)); + req.SetBody(std::make_shared(data, nbytes)); + req.SetContentLength(nbytes); + + if (!background_writes_) { + req.SetBody(std::make_shared(data, nbytes)); + + ARROW_ASSIGN_OR_RAISE(auto outcome, TriggerUploadRequest(req, holder_)); + + RETURN_NOT_OK(sync_result_callback(req, upload_state_, part_number_, outcome)); + } else { + // If the data isn't owned, make an immutable copy for the lifetime of the closure + if (owned_buffer == nullptr) { + ARROW_ASSIGN_OR_RAISE(owned_buffer, AllocateBuffer(nbytes, io_context_.pool())); + memcpy(owned_buffer->mutable_data(), data, nbytes); + } else { + DCHECK_EQ(data, owned_buffer->data()); + DCHECK_EQ(nbytes, owned_buffer->size()); + } + req.SetBody(std::make_shared(owned_buffer->data(), owned_buffer->size())); + + { + std::unique_lock lock(upload_state_->mutex); + if (upload_state_->uploads_in_progress++ == 0) { + upload_state_->pending_uploads_completed = Future<>::Make(); + } + } + + // The closure keeps the buffer and the upload state alive + auto deferred = [owned_buffer, holder = holder_, req = std::move(req), state = upload_state_, + async_result_callback, part_number = part_number_]() mutable -> arrow::Status { + ARROW_ASSIGN_OR_RAISE(auto outcome, TriggerUploadRequest(req, holder)); + + return async_result_callback(req, state, part_number, outcome); + }; + ARROW_RETURN_NOT_OK(SubmitIO(io_context_, std::move(deferred))); + } + + ++part_number_; + + return arrow::Status::OK(); + } + + static arrow::Status UploadUsingSingleRequestError(const Aws::S3::Model::PutObjectRequest& request, + const Aws::S3::Model::PutObjectOutcome& outcome) { + return arrow::Status::IOError("When uploading object with key '" + request.GetKey() + "' in bucket '", + request.GetBucket() + "': PutObject failed"); + } + + arrow::Status UploadUsingSingleRequest(std::shared_ptr buffer) { + return UploadUsingSingleRequest(buffer->data(), buffer->size(), buffer); + } + + arrow::Status UploadUsingSingleRequest(const void* data, + int64_t nbytes, + std::shared_ptr owned_buffer = nullptr) { + auto sync_result_callback = [](const Aws::S3::Model::PutObjectRequest& request, std::shared_ptr state, + int32_t part_number, Aws::S3::Model::PutObjectOutcome outcome) { + if (!outcome.IsSuccess()) { + return UploadUsingSingleRequestError(request, outcome); + } + return arrow::Status::OK(); + }; + + auto async_result_callback = [](const Aws::S3::Model::PutObjectRequest& request, std::shared_ptr state, + 
int32_t part_number, Aws::S3::Model::PutObjectOutcome outcome) { + HandleUploadUsingSingleRequestOutcome(state, request, outcome.GetResult()); + return arrow::Status::OK(); + }; + + Aws::S3::Model::PutObjectRequest req{}; + RETURN_NOT_OK(SetMetadataInRequest(&req)); + + return Upload( + std::move(req), std::move(sync_result_callback), std::move(async_result_callback), data, nbytes, + std::move(owned_buffer)); + } + + arrow::Status UploadPart(std::shared_ptr buffer) { + return UploadPart(buffer->data(), buffer->size(), buffer); + } + + static arrow::Status UploadPartError(const Aws::S3::Model::UploadPartRequest& request, + const Aws::S3::Model::UploadPartOutcome& outcome) { + return arrow::Status::IOError("When uploading part for key '" + request.GetKey() + "' in bucket '" + + request.GetBucket() + "': UploadPart failed"); + } + + arrow::Status UploadPart(const void* data, int64_t nbytes, std::shared_ptr owned_buffer = nullptr) { + if (!IsMultipartCreated()) { + RETURN_NOT_OK(CreateMultipartUpload()); + } + + Aws::S3::Model::UploadPartRequest req{}; + req.SetPartNumber(part_number_); + req.SetUploadId(multipart_upload_id_); + + auto sync_result_callback = [](const Aws::S3::Model::UploadPartRequest& request, std::shared_ptr state, + int32_t part_number, Aws::S3::Model::UploadPartOutcome outcome) { + if (!outcome.IsSuccess()) { + return UploadPartError(request, outcome); + } else { + AddCompletedPart(state, part_number, outcome.GetResult()); + } + + return arrow::Status::OK(); + }; + + auto async_result_callback = [](const Aws::S3::Model::UploadPartRequest& request, + std::shared_ptr state, int32_t part_number, + Aws::S3::Model::UploadPartOutcome outcome) { + HandleUploadPartOutcome(state, part_number, request, outcome.GetResult()); + return arrow::Status::OK(); + }; + + return Upload( + std::move(req), std::move(sync_result_callback), std::move(async_result_callback), data, nbytes, + std::move(owned_buffer)); + } + + static void HandleUploadUsingSingleRequestOutcome(const std::shared_ptr& state, + const Aws::S3::Model::PutObjectRequest& req, + const arrow::Result& result) { + std::unique_lock lock(state->mutex); + if (!result.ok()) { + state->status &= result.status(); + } else { + const auto& outcome = *result; + if (!outcome.IsSuccess()) { + state->status &= UploadUsingSingleRequestError(req, outcome); + } + } + // GH-41862: avoid potential deadlock if the Future's callback is called + // with the mutex taken. + auto fut = state->pending_uploads_completed; + lock.unlock(); + fut.MarkFinished(state->status); + } + + static void HandleUploadPartOutcome(const std::shared_ptr& state, + int part_number, + const Aws::S3::Model::UploadPartRequest& req, + const Result& result) { + std::unique_lock lock(state->mutex); + if (!result.ok()) { + state->status &= result.status(); + } else { + const auto& outcome = *result; + if (!outcome.IsSuccess()) { + state->status &= UploadPartError(req, outcome); + } else { + AddCompletedPart(state, part_number, outcome.GetResult()); + } + } + // Notify completion + if (--state->uploads_in_progress == 0) { + // GH-41862: avoid potential deadlock if the Future's callback is called + // with the mutex taken. + auto fut = state->pending_uploads_completed; + lock.unlock(); + // State could be mutated concurrently if another thread writes to the + // stream, but in this case the Flush() call is only advisory anyway. + // Besides, it's not generally sound to write to an OutputStream from + // several threads at once. 
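+      // Finishing the future outside the lock wakes any FlushAsync()/CloseAsync()
+      // waiter with the status accumulated across the completed part uploads.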
+ fut.MarkFinished(state->status); + } + } + + static void AddCompletedPart(const std::shared_ptr& state, + int part_number, + const Aws::S3::Model::UploadPartResult& result) { + Aws::S3::Model::CompletedPart part; + // Append ETag and part number for this uploaded part + // (will be needed for upload completion in Close()) + part.SetPartNumber(part_number); + part.SetETag(result.GetETag()); + int slot = part_number - 1; + if (state->completed_parts.size() <= static_cast(slot)) { + state->completed_parts.resize(slot + 1); + } + DCHECK(!state->completed_parts[slot].PartNumberHasBeenSet()); + state->completed_parts[slot] = std::move(part); + } + + protected: + std::shared_ptr holder_; + const arrow::io::IOContext io_context_; + const S3Path path_; + const std::shared_ptr metadata_; + const std::shared_ptr default_metadata_; + const bool background_writes_; + const bool allow_delayed_open_; + + int64_t part_upload_size_; + int64_t multi_part_upload_threshold_size_; + + Aws::String multipart_upload_id_; + bool closed_ = true; + int64_t pos_ = 0; + int32_t part_number_ = 1; + std::shared_ptr current_part_; + int64_t current_part_size_ = 0; + + // This struct is kept alive through background writes to avoid problems + // in the completion handler. + struct UploadState { + std::mutex mutex; + // Only populated for multi-part uploads. + Aws::Vector completed_parts; + int64_t uploads_in_progress = 0; + arrow::Status status; + arrow::Future<> pending_uploads_completed = arrow::Future<>::MakeFinished(arrow::Status::OK()); + }; + std::shared_ptr upload_state_; +}; + +class MultiPartUploadS3FS::Impl : public std::enable_shared_from_this { + public: + ClientBuilder builder_; + arrow::io::IOContext io_context_; + std::shared_ptr holder_; + std::optional backend_; + + static constexpr int32_t kListObjectsMaxKeys = 1000; + // At most 1000 keys per multiple-delete request + static constexpr int32_t kMultipleDeleteMaxKeys = 1000; + + explicit Impl(S3Options options, io::IOContext io_context) : builder_(std::move(options)), io_context_(io_context) {} + + arrow::Status Init() { return builder_.BuildClient(io_context_).Value(&holder_); } + + const S3Options& options() const { return builder_.options(); } + + std::string region() const { return std::string(FromAwsString(builder_.config().region)); } + + template + void SaveBackend(const Aws::Client::AWSError& error) { + if (!backend_ || *backend_ == S3Backend::Other) { + backend_ = DetectS3Backend(error); + } + } + + // Tests to see if a bucket exists + Result BucketExists(const std::string& bucket) { + ARROW_ASSIGN_OR_RAISE(auto client_lock, holder_->Lock()); + + Aws::S3::Model::HeadBucketRequest req; + req.SetBucket(ToAwsString(bucket)); + + auto outcome = client_lock.Move()->HeadBucket(req); + if (!outcome.IsSuccess()) { + if (!IsNotFound(outcome.GetError())) { + return ErrorToStatus(std::forward_as_tuple("When testing for existence of bucket '", bucket, "': "), + "HeadBucket", outcome.GetError()); + } + return false; + } + return true; + } + + // Create a bucket. Successful if bucket already exists. + arrow::Status CreateBucket(const std::string& bucket) { + // Check bucket exists first. 
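+    // HeadBucket is issued first so that an already-existing bucket succeeds even
+    // when allow_bucket_creation is false (the default); creation below is gated
+    // on that option.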
+ { + Aws::S3::Model::HeadBucketRequest req; + req.SetBucket(ToAwsString(bucket)); + ARROW_ASSIGN_OR_RAISE(auto client_lock, holder_->Lock()); + auto outcome = client_lock.Move()->HeadBucket(req); + + if (outcome.IsSuccess()) { + return Status::OK(); + } else if (!IsNotFound(outcome.GetError())) { + return ErrorToStatus(std::forward_as_tuple("When creating bucket '", bucket, "': "), "HeadBucket", + outcome.GetError()); + } + + if (!options().allow_bucket_creation) { + return Status::IOError("Bucket '", bucket, "' not found. ", + "To create buckets, enable the allow_bucket_creation option."); + } + } + + Aws::S3::Model::CreateBucketConfiguration config; + Aws::S3::Model::CreateBucketRequest req; + auto _region = region(); + // AWS S3 treats the us-east-1 differently than other regions + // https://docs.aws.amazon.com/cli/latest/reference/s3api/create-bucket.html + if (_region != "us-east-1") { + config.SetLocationConstraint( + Aws::S3::Model::BucketLocationConstraintMapper::GetBucketLocationConstraintForName(ToAwsString(_region))); + } + req.SetBucket(ToAwsString(bucket)); + req.SetCreateBucketConfiguration(config); + + ARROW_ASSIGN_OR_RAISE(auto client_lock, holder_->Lock()); + auto outcome = client_lock.Move()->CreateBucket(req); + if (!outcome.IsSuccess() && !IsAlreadyExists(outcome.GetError())) { + return ErrorToStatus(std::forward_as_tuple("When creating bucket '", bucket, "': "), "CreateBucket", + outcome.GetError()); + } + return Status::OK(); + } + + // Create a directory-like object with empty contents. Successful if already exists. + arrow::Status CreateEmptyDir(const std::string& bucket, std::string_view key_view) { + ARROW_ASSIGN_OR_RAISE(auto client_lock, holder_->Lock()); + + auto key = EnsureTrailingSlash(key_view); + Aws::S3::Model::PutObjectRequest req; + req.SetBucket(ToAwsString(bucket)); + req.SetKey(ToAwsString(key)); + req.SetContentType(kAwsDirectoryContentType); + req.SetBody(std::make_shared("")); + return OutcomeToStatus(std::forward_as_tuple("When creating key '", key, "' in bucket '", bucket, "': "), + "PutObject", client_lock.Move()->PutObject(req)); + } + + arrow::Status DeleteObject(const std::string& bucket, const std::string& key) { + ARROW_ASSIGN_OR_RAISE(auto client_lock, holder_->Lock()); + + Aws::S3::Model::DeleteObjectRequest req; + req.SetBucket(ToAwsString(bucket)); + req.SetKey(ToAwsString(key)); + return OutcomeToStatus(std::forward_as_tuple("When delete key '", key, "' in bucket '", bucket, "': "), + "DeleteObject", client_lock.Move()->DeleteObject(req)); + } + + arrow::Status CopyObject(const S3Path& src_path, const S3Path& dest_path) { + ARROW_ASSIGN_OR_RAISE(auto client_lock, holder_->Lock()); + + Aws::S3::Model::CopyObjectRequest req; + req.SetBucket(ToAwsString(dest_path.bucket)); + req.SetKey(ToAwsString(dest_path.key)); + // ARROW-13048: Copy source "Must be URL-encoded" according to AWS SDK docs. + // However at least in 1.8 and 1.9 the SDK URL-encodes the path for you + req.SetCopySource(src_path.ToAwsString()); + return OutcomeToStatus(std::forward_as_tuple("When copying key '", src_path.key, "' in bucket '", src_path.bucket, + "' to key '", dest_path.key, "' in bucket '", dest_path.bucket, "': "), + "CopyObject", client_lock.Move()->CopyObject(req)); + } + + // On Minio, an empty "directory" doesn't satisfy the same API requests as + // a non-empty "directory". This is a Minio-specific quirk, but we need + // to handle it for unit testing. 
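+  // For example, after CreateEmptyDir("bucket", "dir") the marker object is "dir/",
+  // so on Minio a HEAD on "dir" (without the trailing slash) can return 404 even
+  // though the directory exists; IsEmptyDirectory therefore retries with a
+  // slash-terminated key for that backend.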
+ + // If this method is called after HEAD on "bucket/key" already returned a 404, + // can pass the given outcome to spare a spurious HEAD call. + Result IsEmptyDirectory(const std::string& bucket, + const std::string& key, + const Aws::S3::Model::HeadObjectOutcome* previous_outcome = nullptr) { + ARROW_ASSIGN_OR_RAISE(auto client_lock, holder_->Lock()); + + if (previous_outcome) { + // Fetch the backend from the previous error + DCHECK(!previous_outcome->IsSuccess()); + if (!backend_) { + SaveBackend(previous_outcome->GetError()); + DCHECK(backend_); + } + if (backend_ != S3Backend::Minio) { + // HEAD already returned a 404, nothing more to do + return false; + } + } + + // We come here in one of two situations: + // - we don't know the backend and there is no previous outcome + // - the backend is Minio + Aws::S3::Model::HeadObjectRequest req; + req.SetBucket(ToAwsString(bucket)); + if (backend_ && *backend_ == S3Backend::Minio) { + // Minio wants a slash at the end, Amazon doesn't + req.SetKey(ToAwsString(key) + kSep); + } else { + req.SetKey(ToAwsString(key)); + } + + auto outcome = client_lock.Move()->HeadObject(req); + if (outcome.IsSuccess()) { + return true; + } + if (!backend_) { + SaveBackend(outcome.GetError()); + DCHECK(backend_); + if (*backend_ == S3Backend::Minio) { + // Try again with separator-terminated key (see above) + return IsEmptyDirectory(bucket, key); + } + } + if (IsNotFound(outcome.GetError())) { + return false; + } + return ErrorToStatus( + std::forward_as_tuple("When reading information for key '", key, "' in bucket '", bucket, "': "), "HeadObject", + outcome.GetError()); + } + + Result IsEmptyDirectory(const S3Path& path, + const Aws::S3::Model::HeadObjectOutcome* previous_outcome = nullptr) { + return IsEmptyDirectory(path.bucket, path.key, previous_outcome); + } + + Result IsNonEmptyDirectory(const S3Path& path) { + ARROW_ASSIGN_OR_RAISE(auto client_lock, holder_->Lock()); + + Aws::S3::Model::ListObjectsV2Request req; + req.SetBucket(ToAwsString(path.bucket)); + req.SetPrefix(ToAwsString(path.key) + kSep); + req.SetDelimiter(Aws::String() + kSep); + req.SetMaxKeys(1); + auto outcome = client_lock.Move()->ListObjectsV2(req); + if (outcome.IsSuccess()) { + const Aws::S3::Model::ListObjectsV2Result& r = outcome.GetResult(); + // In some cases, there may be 0 keys but some prefixes + return r.GetKeyCount() > 0 || !r.GetCommonPrefixes().empty(); + } + if (IsNotFound(outcome.GetError())) { + return false; + } + return ErrorToStatus( + std::forward_as_tuple("When listing objects under key '", path.key, "' in bucket '", path.bucket, "': "), + "ListObjectsV2", outcome.GetError()); + } + + static FileInfo MakeDirectoryInfo(std::string dirname) { + FileInfo dir; + dir.set_type(FileType::Directory); + dir.set_path(std::move(dirname)); + return dir; + } + + static std::vector MakeDirectoryInfos(std::vector dirnames) { + std::vector dir_infos; + for (auto& dirname : dirnames) { + dir_infos.push_back(MakeDirectoryInfo(std::move(dirname))); + } + return dir_infos; + } + + using FileInfoSink = PushGenerator>::Producer; + + struct FileListerState { + FileInfoSink files_queue; + const bool allow_not_found; + const int max_recursion; + const bool include_implicit_dirs; + const io::IOContext io_context; + S3ClientHolder* const holder; + + Aws::S3::Model::ListObjectsV2Request req; + std::unordered_set directories; + bool empty = true; + + FileListerState(PushGenerator>::Producer files_queue, + FileSelector select, + const std::string& bucket, + const std::string& key, + bool 
include_implicit_dirs, + io::IOContext io_context, + S3ClientHolder* holder) + : files_queue(std::move(files_queue)), + allow_not_found(select.allow_not_found), + max_recursion(select.max_recursion), + include_implicit_dirs(include_implicit_dirs), + io_context(std::move(io_context)), + holder(holder) { + req.SetBucket(bucket); + req.SetMaxKeys(kListObjectsMaxKeys); + if (!key.empty()) { + req.SetPrefix(key + kSep); + } + if (!select.recursive) { + req.SetDelimiter(Aws::String() + kSep); + } + } + + void Finish() { + // `empty` means that we didn't get a single file info back from S3. This may be + // a situation that we should consider as PathNotFound. + // + // * If the prefix is empty then we were querying the contents of an entire bucket + // and this is not a PathNotFound case because if the bucket didn't exist then + // we would have received an error and not an empty set of results. + // + // * If the prefix is not empty then we asked for all files under a particular + // directory. S3 will also return the directory itself, if it exists. So if + // we get zero results then we know that there are no files under the directory + // and the directory itself doesn't exist. This should be considered PathNotFound + if (empty && !allow_not_found && !req.GetPrefix().empty()) { + files_queue.Push(PathNotFound(req.GetBucket(), req.GetPrefix())); + } + } + + // Given a path, iterate through all possible sub-paths and, if we haven't + // seen that sub-path before, return it. + // + // For example, given A/B/C we might return A/B and A if we have not seen + // those paths before. This allows us to consider "implicit" directories which + // don't exist as objects in S3 but can be inferred. + std::vector GetNewDirectories(const std::string_view& path) { + std::string current(path); + std::string base = req.GetBucket(); + if (!req.GetPrefix().empty()) { + base = base + kSep + std::string(RemoveTrailingSlash(req.GetPrefix())); + } + std::vector new_directories; + while (true) { + const std::string parent_dir = GetAbstractPathParent(current).first; + if (parent_dir.empty()) { + break; + } + current = parent_dir; + if (current == base) { + break; + } + if (directories.insert(parent_dir).second) { + new_directories.push_back(std::move(parent_dir)); + } + } + return new_directories; + } + }; + + struct FileListerTask : public util::AsyncTaskScheduler::Task { + std::shared_ptr state; + util::AsyncTaskScheduler* scheduler; + + FileListerTask(std::shared_ptr state, util::AsyncTaskScheduler* scheduler) + : state(std::move(state)), scheduler(scheduler) {} + + std::vector ToFileInfos(const std::string& bucket, + const std::string& prefix, + const Aws::S3::Model::ListObjectsV2Result& result) { + std::vector file_infos; + // If this is a non-recursive listing we may see "common prefixes" which represent + // directories we did not recurse into. We will add those as directories. + for (const auto& child_prefix : result.GetCommonPrefixes()) { + const auto child_key = RemoveTrailingSlash(FromAwsString(child_prefix.GetPrefix())); + std::stringstream child_path_ss; + child_path_ss << bucket << kSep << child_key; + FileInfo info; + info.set_path(child_path_ss.str()); + info.set_type(FileType::Directory); + file_infos.push_back(std::move(info)); + } + // S3 doesn't have any concept of "max depth" and so we emulate it by counting the + // number of '/' characters. E.g. if the user is searching bucket/subdirA/subdirB + // then the starting depth is 2. 
+ // A file subdirA/subdirB/somefile will have a child depth of 2 and a "depth" of 0. + // A file subdirA/subdirB/subdirC/somefile will have a child depth of 3 and a + // "depth" of 1 + int base_depth = arrow::fs::internal::GetAbstractPathDepth(prefix); + for (const auto& obj : result.GetContents()) { + if (obj.GetKey() == prefix) { + // S3 will return the basedir itself (if it is a file / empty file). We don't + // want that. But this is still considered "finding the basedir" and so we mark + // it "not empty". + state->empty = false; + continue; + } + std::string child_key = std::string(RemoveTrailingSlash(FromAwsString(obj.GetKey()))); + bool had_trailing_slash = child_key.size() != obj.GetKey().size(); + int child_depth = arrow::fs::internal::GetAbstractPathDepth(child_key); + // Recursion depth is 1 smaller because a path with depth 1 (e.g. foo) is + // considered to have a "recursion" of 0 + int recursion_depth = child_depth - base_depth - 1; + if (recursion_depth > state->max_recursion) { + // If we have A/B/C/D and max_recursion is 2 then we ignore this (don't add it + // to file_infos) but we still want to potentially add A and A/B as directories. + // So we "pretend" like we have a file A/B/C for the call to GetNewDirectories + // below + int to_trim = recursion_depth - state->max_recursion - 1; + if (to_trim > 0) { + child_key = bucket + kSep + arrow::fs::internal::SliceAbstractPath(child_key, 0, child_depth - to_trim); + } else { + child_key = bucket + kSep + child_key; + } + } else { + // If the file isn't beyond our max recursion then count it as a file + // unless it's empty and then it depends on whether or not the file ends + // with a trailing slash + std::stringstream child_path_ss; + child_path_ss << bucket << kSep << child_key; + child_key = child_path_ss.str(); + if (obj.GetSize() > 0 || !had_trailing_slash) { + // We found a real file. + // XXX Ideally, for 0-sized files we would also check the Content-Type + // against kAwsDirectoryContentType, but ListObjectsV2 does not give + // that information. + FileInfo info; + info.set_path(child_key); + FileObjectToInfo(obj, &info); + file_infos.push_back(std::move(info)); + } else { + // We found an empty file and we want to treat it like a directory. Only + // add it if we haven't seen this directory before. + if (state->directories.insert(child_key).second) { + file_infos.push_back(MakeDirectoryInfo(child_key)); + } + } + } + + if (state->include_implicit_dirs) { + // Now that we've dealt with the file itself we need to look at each of the + // parent paths and potentially add them as directories. For example, after + // finding a file A/B/C/D we want to consider adding directories A, A/B, and + // A/B/C. + for (const auto& newdir : state->GetNewDirectories(child_key)) { + file_infos.push_back(MakeDirectoryInfo(newdir)); + } + } + } + if (file_infos.size() > 0) { + state->empty = false; + } + return file_infos; + } + + void Run() { + // We are on an I/O thread now so just synchronously make the call and interpret the + // results. 
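+      // Run() blocks on a single ListObjectsV2 round-trip; operator() below submits
+      // it to the io_context executor so the calling thread is never blocked.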
+ Result client_lock = state->holder->Lock(); + if (!client_lock.ok()) { + state->files_queue.Push(client_lock.status()); + return; + } + Aws::S3::Model::ListObjectsV2Outcome outcome = client_lock->Move()->ListObjectsV2(state->req); + if (!outcome.IsSuccess()) { + const auto& err = outcome.GetError(); + if (state->allow_not_found && IsNotFound(err)) { + return; + } + state->files_queue.Push( + ErrorToStatus(std::forward_as_tuple("When listing objects under key '", state->req.GetPrefix(), + "' in bucket '", state->req.GetBucket(), "': "), + "ListObjectsV2", err)); + return; + } + const Aws::S3::Model::ListObjectsV2Result& result = outcome.GetResult(); + // We could immediately schedule the continuation (if there are enough results to + // trigger paging) but that would introduce race condition complexity for arguably + // little benefit. + std::vector file_infos = ToFileInfos(state->req.GetBucket(), state->req.GetPrefix(), result); + if (file_infos.size() > 0) { + state->files_queue.Push(std::move(file_infos)); + } + + // If there are enough files to warrant a continuation then go ahead and schedule + // that now. + if (result.GetIsTruncated()) { + DCHECK(!result.GetNextContinuationToken().empty()); + state->req.SetContinuationToken(result.GetNextContinuationToken()); + scheduler->AddTask(std::make_unique(state, scheduler)); + } else { + // Otherwise, we have finished listing all the files + state->Finish(); + } + } + + Result> operator()() override { + return state->io_context.executor()->Submit([this] { + Run(); + return Status::OK(); + }); + } + std::string_view name() const override { return "S3ListFiles"; } + }; + + // Lists all file, potentially recursively, in a bucket + // + // include_implicit_dirs controls whether or not implicit directories should be + // included. These are directories that are not actually file objects but instead are + // inferred from other objects. + // + // For example, if a file exists with path A/B/C then implicit directories A/ and A/B/ + // will exist even if there are no file objects with these paths. + void ListAsync(const FileSelector& select, + const std::string& bucket, + const std::string& key, + bool include_implicit_dirs, + util::AsyncTaskScheduler* scheduler, + FileInfoSink sink) { + // We can only fetch kListObjectsMaxKeys files at a time and so we create a + // scheduler and schedule a task to grab the first batch. Once that's done we + // schedule a new task for the next batch. 
All of these tasks share the same + // FileListerState object but none of these tasks run in parallel so there is + // no need to worry about mutexes + auto state = std::make_shared(sink, select, bucket, key, include_implicit_dirs, io_context_, + this->holder_.get()); + + // Create the first file lister task (it may spawn more) + auto file_lister_task = std::make_unique(state, scheduler); + scheduler->AddTask(std::move(file_lister_task)); + } + + // Fully list all files from all buckets + void FullListAsync(bool include_implicit_dirs, + util::AsyncTaskScheduler* scheduler, + FileInfoSink sink, + bool recursive) { + scheduler->AddSimpleTask( + [this, scheduler, sink, include_implicit_dirs, recursive]() mutable { + return ListBucketsAsync().Then([this, scheduler, sink, include_implicit_dirs, + recursive](const std::vector& buckets) mutable { + // Return the buckets themselves as directories + std::vector buckets_as_directories = MakeDirectoryInfos(buckets); + sink.Push(std::move(buckets_as_directories)); + + if (recursive) { + // Recursively list each bucket (these will run in parallel but sink + // should be thread safe and so this is ok) + for (const auto& bucket : buckets) { + FileSelector select; + select.allow_not_found = true; + select.recursive = true; + select.base_dir = bucket; + ListAsync(select, bucket, "", include_implicit_dirs, scheduler, sink); + } + } + }); + }, + std::string_view("FullListBucketScan")); + } + + // Delete multiple objects at once + Future<> DeleteObjectsAsync(const std::string& bucket, const std::vector& keys) { + struct DeleteCallback { + std::string bucket; + + arrow::Status operator()(const Aws::S3::Model::DeleteObjectsOutcome& outcome) const { + if (!outcome.IsSuccess()) { + return ErrorToStatus("DeleteObjects", outcome.GetError()); + } + // Also need to check per-key errors, even on successful outcome + // See + // https://docs.aws.amazon.com/fr_fr/AmazonS3/latest/API/multiobjectdeleteapi.html + const auto& errors = outcome.GetResult().GetErrors(); + if (!errors.empty()) { + std::stringstream ss; + ss << "Got the following " << errors.size() << " errors when deleting objects in S3 bucket '" << bucket + << "':\n"; + for (const auto& error : errors) { + ss << "- key '" << error.GetKey() << "': " << error.GetMessage() << "\n"; + } + return Status::IOError(ss.str()); + } + return Status::OK(); + } + }; + + const auto chunk_size = static_cast(kMultipleDeleteMaxKeys); + const DeleteCallback delete_cb{bucket}; + + std::vector> futures; + futures.reserve(bit_util::CeilDiv(keys.size(), chunk_size)); + + for (size_t start = 0; start < keys.size(); start += chunk_size) { + Aws::S3::Model::DeleteObjectsRequest req; + Aws::S3::Model::Delete del; + size_t remaining = keys.size() - start; + size_t next_chunk_size = std::min(remaining, chunk_size); + for (size_t i = start; i < start + next_chunk_size; ++i) { + del.AddObjects(Aws::S3::Model::ObjectIdentifier().WithKey(ToAwsString(keys[i]))); + } + req.SetBucket(ToAwsString(bucket)); + req.SetDelete(std::move(del)); + ARROW_ASSIGN_OR_RAISE( + auto fut, SubmitIO(io_context_, [holder = holder_, req = std::move(req), delete_cb]() -> arrow::Status { + ARROW_ASSIGN_OR_RAISE(auto client_lock, holder->Lock()); + return delete_cb(client_lock.Move()->DeleteObjects(req)); + })); + futures.push_back(std::move(fut)); + } + + return AllFinished(futures); + } + + arrow::Status DeleteObjects(const std::string& bucket, const std::vector& keys) { + return DeleteObjectsAsync(bucket, keys).status(); + } + + // Check to make sure the given 
path is not a file + // + // Returns true if the path seems to be a directory, false if it is a file + Future EnsureIsDirAsync(const std::string& bucket, const std::string& key) { + if (key.empty()) { + // There is no way for a bucket to be a file + return Future::MakeFinished(true); + } + auto self = shared_from_this(); + return DeferNotOk(SubmitIO(io_context_, [self, bucket, key]() mutable -> Result { + Aws::S3::Model::HeadObjectRequest req; + req.SetBucket(ToAwsString(bucket)); + req.SetKey(ToAwsString(key)); + + ARROW_ASSIGN_OR_RAISE(auto client_lock, self->holder_->Lock()); + auto outcome = client_lock.Move()->HeadObject(req); + if (outcome.IsSuccess()) { + return IsDirectory(key, outcome.GetResult()); + } + if (IsNotFound(outcome.GetError())) { + // If we can't find it then it isn't a file. + return true; + } else { + return ErrorToStatus( + std::forward_as_tuple("When getting information for key '", key, "' in bucket '", bucket, "': "), + "HeadObject", outcome.GetError()); + } + })); + } + + // Some operations require running multiple S3 calls, either in parallel or serially. We + // need to ensure that the S3 filesystem instance stays valid and that S3 isn't + // finalized. We do this by wrapping all the tasks in a scheduler which keeps the + // resources alive + Future<> RunInScheduler(std::function callable) { + auto self = shared_from_this(); + FnOnce initial_task = [callable = std::move(callable), + this](util::AsyncTaskScheduler* scheduler) mutable { + return callable(scheduler, this); + }; + Future<> scheduler_fut = util::AsyncTaskScheduler::Make( + std::move(initial_task), + /*abort_callback=*/ + [](const Status& st) { + // No need for special abort logic. + }, + io_context_.stop_token()); + // Keep self alive until all tasks finish + return scheduler_fut.Then([self]() { return Status::OK(); }); + } + + Future<> DoDeleteDirContentsAsync(const std::string& bucket, const std::string& key) { + return RunInScheduler([bucket, key](util::AsyncTaskScheduler* scheduler, MultiPartUploadS3FS::Impl* self) { + scheduler->AddSimpleTask( + [=] { + FileSelector select; + select.base_dir = bucket + kSep + key; + select.recursive = true; + select.allow_not_found = false; + + FileInfoGenerator file_infos = self->GetFileInfoGenerator(select); + + auto handle_file_infos = [=](const std::vector& file_infos) { + std::vector file_paths; + for (const auto& file_info : file_infos) { + DCHECK_GT(file_info.path().size(), bucket.size()); + auto file_path = file_info.path().substr(bucket.size() + 1); + if (file_info.IsDirectory()) { + // The selector returns FileInfo objects for directories with a + // a path that never ends in a trailing slash, but for AWS the file + // needs to have a trailing slash to recognize it as directory + // (https://github.com/apache/arrow/issues/38618) + DCHECK_OK(arrow::fs::internal::AssertNoTrailingSlash(file_path)); + file_path = file_path + kSep; + } + file_paths.push_back(std::move(file_path)); + } + scheduler->AddSimpleTask( + [=, file_paths = std::move(file_paths)] { return self->DeleteObjectsAsync(bucket, file_paths); }, + std::string_view("DeleteDirContentsDeleteTask")); + return Status::OK(); + }; + + return VisitAsyncGenerator(AsyncGenerator>(std::move(file_infos)), + std::move(handle_file_infos)); + }, + std::string_view("ListFilesForDelete")); + return Status::OK(); + }); + } + + Future<> DeleteDirContentsAsync(const std::string& bucket, const std::string& key) { + auto self = shared_from_this(); + return EnsureIsDirAsync(bucket, key).Then([self, bucket, 
key](bool is_dir) -> Future<> { + if (!is_dir) { + return Status::IOError("Cannot delete directory contents at ", bucket, kSep, key, " because it is a file"); + } + return self->DoDeleteDirContentsAsync(bucket, key); + }); + } + + FileInfoGenerator GetFileInfoGenerator(const FileSelector& select) { + auto maybe_base_path = S3Path::FromString(select.base_dir); + if (!maybe_base_path.ok()) { + return MakeFailingGenerator(maybe_base_path.status()); + } + auto base_path = *std::move(maybe_base_path); + + PushGenerator> generator; + Future<> scheduler_fut = RunInScheduler([select, base_path, sink = generator.producer()]( + util::AsyncTaskScheduler* scheduler, MultiPartUploadS3FS::Impl* self) { + if (base_path.empty()) { + bool should_recurse = select.recursive && select.max_recursion > 0; + self->FullListAsync(/*include_implicit_dirs=*/true, scheduler, sink, should_recurse); + } else { + self->ListAsync(select, base_path.bucket, base_path.key, + /*include_implicit_dirs=*/true, scheduler, sink); + } + return Status::OK(); + }); + + // Mark the generator done once all tasks are finished + scheduler_fut.AddCallback([sink = generator.producer()](const Status& st) mutable { + if (!st.ok()) { + sink.Push(st); + } + sink.Close(); + }); + + return generator; + } + + arrow::Status EnsureDirectoryExists(const S3Path& path) { + if (!path.key.empty()) { + return CreateEmptyDir(path.bucket, path.key); + } + return Status::OK(); + } + + arrow::Status EnsureParentExists(const S3Path& path) { + if (path.has_parent()) { + return EnsureDirectoryExists(path.parent()); + } + return Status::OK(); + } + + static Result> ProcessListBuckets(const Aws::S3::Model::ListBucketsOutcome& outcome) { + if (!outcome.IsSuccess()) { + return ErrorToStatus(std::forward_as_tuple("When listing buckets: "), "ListBuckets", outcome.GetError()); + } + std::vector buckets; + buckets.reserve(outcome.GetResult().GetBuckets().size()); + for (const auto& bucket : outcome.GetResult().GetBuckets()) { + buckets.emplace_back(FromAwsString(bucket.GetName())); + } + return buckets; + } + + Result> ListBuckets() { + ARROW_ASSIGN_OR_RAISE(auto client_lock, holder_->Lock()); + return ProcessListBuckets(client_lock.Move()->ListBuckets()); + } + + Future> ListBucketsAsync() { + auto deferred = [self = shared_from_this()]() mutable -> Result> { + ARROW_ASSIGN_OR_RAISE(auto client_lock, self->holder_->Lock()); + return self->ProcessListBuckets(client_lock.Move()->ListBuckets()); + }; + return DeferNotOk(SubmitIO(io_context_, std::move(deferred))); + } + + Result> OpenInputFile(const std::string& s, S3FileSystem* fs) { + ARROW_RETURN_NOT_OK(arrow::fs::internal::AssertNoTrailingSlash(s)); + ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(s)); + RETURN_NOT_OK(ValidateFilePath(path)); + + RETURN_NOT_OK(CheckS3Initialized()); + + auto ptr = std::make_shared(holder_, fs->io_context(), path); + RETURN_NOT_OK(ptr->Init()); + return ptr; + } + + Result> OpenInputFile(const FileInfo& info, S3FileSystem* fs) { + ARROW_RETURN_NOT_OK(arrow::fs::internal::AssertNoTrailingSlash(info.path())); + if (info.type() == FileType::NotFound) { + return ::arrow::fs::internal::PathNotFound(info.path()); + } + if (info.type() != FileType::File && info.type() != FileType::Unknown) { + return ::arrow::fs::internal::NotAFile(info.path()); + } + + ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(info.path())); + RETURN_NOT_OK(ValidateFilePath(path)); + + RETURN_NOT_OK(CheckS3Initialized()); + + auto ptr = std::make_shared(holder_, fs->io_context(), path, info.size()); + 
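+    // Seeding the reader with info.size() fills in content_length_ up front, so
+    // Init() can usually skip the extra HeadObject round-trip that the path-only
+    // overload above has to perform.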
RETURN_NOT_OK(ptr->Init()); + return ptr; + } +}; + +arrow::Result> MultiPartUploadS3FS::OpenOutputStream( + const std::string& s, const std::shared_ptr& metadata) { + RETURN_NOT_OK(arrow::fs::internal::AssertNoTrailingSlash(s)); + ARROW_ASSIGN_OR_RAISE(auto path, S3Path::FromString(s)); + RETURN_NOT_OK(ValidateFilePath(path)); + + RETURN_NOT_OK(CheckS3Initialized()); + + auto ptr = + std::make_shared(impl_->holder_, io_context(), path, impl_->options(), metadata, part_size_); + RETURN_NOT_OK(ptr->Init()); + return ptr; +}; + +} // namespace milvus_storage \ No newline at end of file diff --git a/cpp/src/filesystem/s3/util_internal.cpp b/cpp/src/filesystem/s3/util_internal.cpp new file mode 100644 index 0000000..a86461b --- /dev/null +++ b/cpp/src/filesystem/s3/util_internal.cpp @@ -0,0 +1,235 @@ + +#include "filesystem/s3/util_internal.h" + +#include +#include + +#include "arrow/buffer.h" +#include "arrow/filesystem/path_util.h" +#include "arrow/result.h" +#include "arrow/status.h" +#include "arrow/util/io_util.h" +#include "arrow/util/string.h" + +namespace arrow { + +using internal::StatusDetailFromErrno; +using util::Uri; + +namespace fs { +namespace internal { + +TimePoint CurrentTimePoint() { + auto now = std::chrono::system_clock::now(); + return TimePoint(std::chrono::duration_cast(now.time_since_epoch())); +} + +Status CopyStream(const std::shared_ptr& src, + const std::shared_ptr& dest, + int64_t chunk_size, + const io::IOContext& io_context) { + ARROW_ASSIGN_OR_RAISE(auto chunk, AllocateBuffer(chunk_size, io_context.pool())); + + while (true) { + ARROW_ASSIGN_OR_RAISE(int64_t bytes_read, src->Read(chunk_size, chunk->mutable_data())); + if (bytes_read == 0) { + // EOF + break; + } + RETURN_NOT_OK(dest->Write(chunk->data(), bytes_read)); + } + + return Status::OK(); +} + +Status PathNotFound(std::string_view path) { + return Status::IOError("Path does not exist '", path, "'").WithDetail(arrow::internal::StatusDetailFromErrno(ENOENT)); +} + +Status IsADir(std::string_view path) { + return Status::IOError("Is a directory: '", path, "'").WithDetail(StatusDetailFromErrno(EISDIR)); +} + +Status NotADir(std::string_view path) { + return Status::IOError("Not a directory: '", path, "'").WithDetail(StatusDetailFromErrno(ENOTDIR)); +} + +Status NotEmpty(std::string_view path) { + return Status::IOError("Directory not empty: '", path, "'").WithDetail(StatusDetailFromErrno(ENOTEMPTY)); +} + +Status NotAFile(std::string_view path) { return Status::IOError("Not a regular file: '", path, "'"); } + +Status InvalidDeleteDirContents(std::string_view path) { + return Status::Invalid("DeleteDirContents called on invalid path '", path, "'. ", + "If you wish to delete the root directory's contents, call DeleteRootDirContents."); +} + +Result ParseFileSystemUri(const std::string& uri_string) { + Uri uri; + auto status = uri.Parse(uri_string); + if (!status.ok()) { +#ifdef _WIN32 + // Could be a "file:..." URI with backslashes instead of regular slashes. + RETURN_NOT_OK(uri.Parse(ToSlashes(uri_string))); + if (uri.scheme() != "file") { + return status; + } +#else + return status; +#endif + } + return uri; +} + +#ifdef _WIN32 +static bool IsDriveLetter(char c) { + // Can't use locale-dependent functions from the C/C++ stdlib + return (c >= 'A' && c <= 'Z') || (c >= 'a' && c <= 'z'); +} +#endif + +bool DetectAbsolutePath(const std::string& s) { + // Is it a /-prefixed local path? + if (s.length() >= 1 && s[0] == '/') { + return true; + } +#ifdef _WIN32 + // Is it a \-prefixed local path? 
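+  // e.g. "\temp\data" and "\\server\share" match here, while "C:\data" and "C:/data"
+  // are handled by the drive-letter check below.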
+ if (s.length() >= 1 && s[0] == '\\') { + return true; + } + // Does it start with a drive letter in addition to being /- or \-prefixed, + // e.g. "C:\..."? + if (s.length() >= 3 && s[1] == ':' && (s[2] == '/' || s[2] == '\\') && IsDriveLetter(s[0])) { + return true; + } +#endif + return false; +} + +Result PathFromUriHelper(const std::string& uri_string, + std::vector supported_schemes, + bool accept_local_paths, + AuthorityHandlingBehavior authority_handling) { + if (internal::DetectAbsolutePath(uri_string)) { + if (accept_local_paths) { + // Normalize the path and remove any trailing slash + return std::string(internal::RemoveTrailingSlash(ToSlashes(uri_string), /*preserve_root=*/true)); + } + return Status::Invalid( + "The filesystem is not capable of loading local paths. Expected a URI but " + "received ", + uri_string); + } + Uri uri; + ARROW_RETURN_NOT_OK(uri.Parse(uri_string)); + const auto scheme = uri.scheme(); + if (std::find(supported_schemes.begin(), supported_schemes.end(), scheme) == supported_schemes.end()) { + std::string expected_schemes = ::arrow::internal::JoinStrings(supported_schemes, ", "); + return Status::Invalid("The filesystem expected a URI with one of the schemes (", expected_schemes, + ") but received ", uri_string); + } + std::string host = uri.host(); + std::string path = uri.path(); + if (host.empty()) { + // Just a path, may be absolute or relative, only allow relative paths if local + if (path[0] == '/') { + return std::string(internal::RemoveTrailingSlash(path)); + } + if (accept_local_paths) { + return std::string(internal::RemoveTrailingSlash(path)); + } + return Status::Invalid("The filesystem does not support relative paths. Received ", uri_string); + } + if (authority_handling == AuthorityHandlingBehavior::kDisallow) { + return Status::Invalid( + "The filesystem does not support the authority (host) component of a URI. " + "Received ", + uri_string); + } + if (path[0] != '/') { + // This should not be possible + return Status::Invalid( + "The provided URI has a host component but a relative path which is not " + "supported. " + "Received ", + uri_string); + } + switch (authority_handling) { + case AuthorityHandlingBehavior::kPrepend: + return std::string(internal::RemoveTrailingSlash(host + path)); + case AuthorityHandlingBehavior::kWindows: + return std::string(internal::RemoveTrailingSlash("//" + host + path)); + case AuthorityHandlingBehavior::kIgnore: + return std::string(internal::RemoveTrailingSlash(path, /*preserve_root=*/true)); + default: + return Status::Invalid("Unrecognized authority_handling value"); + } +} + +Result GlobFiles(const std::shared_ptr& filesystem, const std::string& glob) { + // TODO: ARROW-17640 + // The candidate entries at the current depth level. + // We start with the filesystem root. + FileInfoVector results{FileInfo("", FileType::Directory)}; + // The exact tail that will later require matching with candidate entries + std::string current_tail; + auto is_leading_slash = HasLeadingSlash(glob); + auto split_glob = SplitAbstractPath(glob, '/'); + + // Process one depth level at once, from root to leaf + for (const auto& glob_component : split_glob) { + if (glob_component.find_first_of("*?") == std::string::npos) { + // If there are no wildcards at the current level, just append + // the exact glob path component. 
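+      // e.g. for the glob "data/2024/*/part-*.parquet", "data" and "2024" are simply
+      // appended to current_tail; listing only starts at the first component that
+      // contains '*' or '?' (the else branch below).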
+ current_tail = ConcatAbstractPath(current_tail, glob_component); + continue; + } else { + FileInfoVector children; + for (const auto& res : results) { + if (res.type() != FileType::Directory) { + continue; + } + FileSelector selector; + selector.base_dir = current_tail.empty() ? res.path() : ConcatAbstractPath(res.path(), current_tail); + if (is_leading_slash) { + selector.base_dir = EnsureLeadingSlash(selector.base_dir); + } + ARROW_ASSIGN_OR_RAISE(auto entries, filesystem->GetFileInfo(selector)); + Globber globber(ConcatAbstractPath(selector.base_dir, glob_component)); + for (auto&& entry : entries) { + if (globber.Matches(entry.path())) { + children.push_back(std::move(entry)); + } + } + } + results = std::move(children); + current_tail.clear(); + } + } + + if (!current_tail.empty()) { + std::vector paths; + paths.reserve(results.size()); + for (const auto& file : results) { + paths.push_back(ConcatAbstractPath(file.path(), current_tail)); + } + ARROW_ASSIGN_OR_RAISE(results, filesystem->GetFileInfo(paths)); + } + + std::vector out; + for (auto&& file : results) { + if (file.type() != FileType::NotFound) { + out.push_back(std::move(file)); + } + } + + return out; +} + +FileSystemGlobalOptions global_options; + +} // namespace internal +} // namespace fs +} // namespace arrow
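
A minimal usage sketch of the new writer path, for illustration only: it assumes an
already-constructed MultiPartUploadS3FS instance `fs` and an arrow::Buffer `payload`
(both, as well as the object key used below, are hypothetical), and it relies only on
the OpenOutputStream overload added in this patch plus the standard
arrow::io::OutputStream interface.

    #include "arrow/buffer.h"
    #include "arrow/result.h"
    #include "arrow/status.h"
    // Include path assumes cpp/include/ is on the include path.
    #include "milvus-storage/filesystem/s3/multi_part_upload_s3_fs.h"

    arrow::Status WriteObject(const std::shared_ptr<milvus_storage::MultiPartUploadS3FS>& fs,
                              const std::shared_ptr<arrow::Buffer>& payload) {
      // Hypothetical key; OpenOutputStream validates the path and opens the stream.
      ARROW_ASSIGN_OR_RAISE(auto out,
                            fs->OpenOutputStream("bucket/dataset/part-0.parquet", /*metadata=*/nullptr));
      // Bytes are buffered into parts; full parts are uploaded as they fill.
      ARROW_RETURN_NOT_OK(out->Write(payload->data(), payload->size()));
      // Close() flushes the final partial part and completes the multipart upload
      // (or falls back to a single PutObject for small writes when delayed open is allowed).
      return out->Close();
    }

After Close() returns, the object is visible under the given key; calling Abort() on the
stream instead cancels any in-flight multipart upload without creating the object.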