S3 multipart upload #110

Open · wants to merge 2 commits into base: develop
12 changes: 12 additions & 0 deletions README.md
@@ -59,6 +59,18 @@ Map<String, ?> env = ImmutableMap.<String, Object> builder()
FileSystems.newFileSystem(URI.create("s3:///"), env, Thread.currentThread().getContextClassLoader());
```

##### Uploading Objects Using Multipart Upload API

By default, s3fs uploads an object to S3 with a single `AmazonS3Client#putObject()` call.
To upload files using the Multipart Upload API instead, enable it in the environment map:

```java
Map<String, ?> env = ImmutableMap.<String, Object> builder()
    .put(com.upplication.s3fs.S3FileSystemProvider.MULTIPART_UPLOAD_ENABLED, "true")
    .build();
FileSystems.newFileSystem(URI.create("s3:///"), env, Thread.currentThread().getContextClassLoader());
```
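The other `s3fs_multipart_upload_*` tuning properties added by this change can be supplied the same way. A sketch with illustrative values (these are not documented defaults, and the part-size unit is an assumption):

```java
Map<String, ?> env = ImmutableMap.<String, Object> builder()
    .put("s3fs_multipart_upload_enabled", "true")
    .put("s3fs_multipart_upload_part_size", "10485760")   // assumed bytes: 10 MiB parts
    .put("s3fs_multipart_upload_num_streams", "1")        // parallel output streams
    .put("s3fs_multipart_upload_num_upload_threads", "4") // threads uploading parts
    .put("s3fs_multipart_upload_queue_capacity", "2")     // parts buffered awaiting upload
    .build();
FileSystems.newFileSystem(URI.create("s3:///"), env, Thread.currentThread().getContextClassLoader());
```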

Complete list of settings:

* s3fs_access_key
8 changes: 7 additions & 1 deletion pom.xml
@@ -4,7 +4,7 @@
<groupId>com.upplication</groupId>
<artifactId>s3fs</artifactId>
<packaging>jar</packaging>
<version>2.2.2</version>
<version>2.3.0-SNAPSHOT</version>
<name>s3fs</name>
<description>S3 filesystem provider for Java 7</description>
<url>https://github.com/Upplication/Amazon-S3-FileSystem-NIO2</url>
@@ -62,6 +62,7 @@
<com.google.guava.guava.version>18.0</com.google.guava.guava.version>
<org.apache.tika.tika-core.version>1.5</org.apache.tika.tika-core.version>
<com.google.code.findbugs.jsr305.version>1.3.9</com.google.code.findbugs.jsr305.version>
<s3.stream.upload.version>1.0.1</s3.stream.upload.version>
</properties>

<dependencies>
Expand Down Expand Up @@ -101,6 +102,11 @@
<artifactId>jsr305</artifactId>
<version>${com.google.code.findbugs.jsr305.version}</version>
</dependency>
<dependency>
<groupId>com.github.alexmojaki</groupId>
<artifactId>s3-stream-upload</artifactId>
<version>${s3.stream.upload.version}</version>
</dependency>

<!-- for testing -->
<dependency>
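The new `s3-stream-upload` dependency supplies the streaming multipart machinery that `S3MultipartUploadOutputStream` (whose source is not shown in this diff) presumably delegates to. A minimal sketch of that library's 1.x API, assuming its constructor parameters map onto the `s3fs_multipart_upload_*` properties:

```java
import alex.mojaki.s3upload.MultiPartOutputStream;
import alex.mojaki.s3upload.StreamTransferManager;
import com.amazonaws.services.s3.AmazonS3;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class StreamUploadSketch {
    // Streams data to S3 as multipart parts; nothing is buffered on disk and
    // only queueCapacity part-buffers are held in memory at once.
    static void upload(AmazonS3 s3Client) throws IOException {
        StreamTransferManager manager = new StreamTransferManager(
                "my-bucket", "my-key", s3Client,
                1,   // numStreams -> s3fs_multipart_upload_num_streams (assumed mapping)
                4,   // numUploadThreads -> s3fs_multipart_upload_num_upload_threads
                2,   // queueCapacity -> s3fs_multipart_upload_queue_capacity
                10); // part size in MB -> s3fs_multipart_upload_part_size
        MultiPartOutputStream out = manager.getMultiPartOutputStreams().get(0);
        out.write("payload".getBytes(StandardCharsets.UTF_8));
        out.close();        // flushes the final part
        manager.complete(); // completes the multipart upload on S3
    }
}
```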
1 change: 0 additions & 1 deletion src/main/java/com/upplication/s3fs/S3FileChannel.java
@@ -2,7 +2,6 @@

import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.util.IOUtils;
import org.apache.tika.Tika;

import java.io.*;
9 changes: 8 additions & 1 deletion src/main/java/com/upplication/s3fs/S3FileSystem.java
@@ -15,6 +15,7 @@
import com.amazonaws.services.s3.model.Bucket;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import java.util.Properties;

/**
* S3FileSystem with a concrete client configured and ready to use.
@@ -24,16 +25,18 @@
public class S3FileSystem extends FileSystem implements Comparable<S3FileSystem> {

private final S3FileSystemProvider provider;
private final Properties properties;
private final String key;
private final AmazonS3 client;
private final String endpoint;
private int cache;

public S3FileSystem(S3FileSystemProvider provider, String key, AmazonS3 client, String endpoint) {
public S3FileSystem(S3FileSystemProvider provider, String key, AmazonS3 client, String endpoint, Properties props) {
this.provider = provider;
this.key = key;
this.client = client;
this.endpoint = endpoint;
this.properties = props;
this.cache = 60000; // 1 minute cache for the s3Path
}

@@ -176,4 +179,8 @@ public int compareTo(S3FileSystem o) {
public int getCache() {
return cache;
}

public Properties getProperties() {
return properties;
}
}
121 changes: 115 additions & 6 deletions src/main/java/com/upplication/s3fs/S3FileSystemProvider.java
@@ -6,6 +6,7 @@
import com.amazonaws.services.s3.model.Bucket;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.services.s3.model.S3ObjectId;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
@@ -31,9 +32,13 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static com.google.common.collect.Sets.difference;
import static com.upplication.s3fs.AmazonS3Factory.*;
import static java.lang.String.format;

/**
* Spec:
@@ -65,13 +70,21 @@
*/
public class S3FileSystemProvider extends FileSystemProvider {

private static final Logger LOGGER = LoggerFactory.getLogger(S3FileSystemProvider.class);
public static final String CHARSET_KEY = "s3fs_charset";
public static final String AMAZON_S3_FACTORY_CLASS = "s3fs_amazon_s3_factory";
public static final String MULTIPART_UPLOAD_ENABLED = "s3fs_multipart_upload_enabled";
public static final String MULTIPART_UPLOAD_PART_SIZE = "s3fs_multipart_upload_part_size";
public static final String MULTIPART_UPLOAD_NUM_STREAMS = "s3fs_multipart_upload_num_streams";
public static final String MULTIPART_UPLOAD_QUEUE_CAPACITY = "s3fs_multipart_upload_queue_capacity";
public static final String MULTIPART_UPLOAD_NUM_UPLOAD_THREADS = "s3fs_multipart_upload_num_upload_threads";

private static final ConcurrentMap<String, S3FileSystem> fileSystems = new ConcurrentHashMap<>();
private static final List<String> PROPS_TO_OVERLOAD = Arrays.asList(ACCESS_KEY, SECRET_KEY, REQUEST_METRIC_COLLECTOR_CLASS, CONNECTION_TIMEOUT, MAX_CONNECTIONS, MAX_ERROR_RETRY, PROTOCOL, PROXY_DOMAIN,
PROXY_HOST, PROXY_PASSWORD, PROXY_PORT, PROXY_USERNAME, PROXY_WORKSTATION, SOCKET_SEND_BUFFER_SIZE_HINT, SOCKET_RECEIVE_BUFFER_SIZE_HINT, SOCKET_TIMEOUT,
USER_AGENT, AMAZON_S3_FACTORY_CLASS, SIGNER_OVERRIDE, PATH_STYLE_ACCESS);
USER_AGENT, AMAZON_S3_FACTORY_CLASS, SIGNER_OVERRIDE, PATH_STYLE_ACCESS,
MULTIPART_UPLOAD_ENABLED, MULTIPART_UPLOAD_PART_SIZE, MULTIPART_UPLOAD_NUM_STREAMS,
MULTIPART_UPLOAD_QUEUE_CAPACITY, MULTIPART_UPLOAD_NUM_UPLOAD_THREADS);

private S3Utils s3Utils = new S3Utils();
private Cache cache = new Cache();
@@ -95,6 +108,9 @@ public FileSystem newFileSystem(URI uri, Map<String, ?> env) {
// create the filesystem with the final properties, store and return
S3FileSystem fileSystem = createFileSystem(uri, props);
fileSystems.put(fileSystem.getKey(), fileSystem);

LOGGER.debug("New file system created. url:{}, props:{}", uri, props);

return fileSystem;
}

@@ -302,6 +318,8 @@ public Path getPath(URI uri) {

@Override
public DirectoryStream<Path> newDirectoryStream(Path dir, DirectoryStream.Filter<? super Path> filter) throws IOException {
LOGGER.debug("New directory stream. path:{}, filter:{}", dir, filter);

final S3Path s3Path = toS3Path(dir);
return new DirectoryStream<Path>() {
@Override
@@ -316,8 +334,47 @@ public Iterator<Path> iterator() {
};
}

private S3MultipartUploadOutputStream createMultipartUploadOutputStream(final S3Path s3Path, Set<? extends OpenOption> opts) throws IOException {
    final S3ObjectId objectId = s3Path.toS3ObjectId();
    final Set<OpenOption> options = Sets.newHashSet(opts);
    final S3FileSystem fileSystem = s3Path.getFileSystem();
    final Properties properties = fileSystem.getProperties();
    final AmazonS3 client = fileSystem.getClient();
    final boolean createOpt = options.remove(StandardOpenOption.CREATE);
    final boolean createNewOpt = options.remove(StandardOpenOption.CREATE_NEW);

    // Remove irrelevant/ignored options before validating what is left
    options.remove(StandardOpenOption.WRITE);
    options.remove(StandardOpenOption.SPARSE);
    options.remove(StandardOpenOption.TRUNCATE_EXISTING);

    if (!options.isEmpty()) {
        throw new UnsupportedOperationException(format("Unsupported options: %s", options));
    }

    // CREATE_NEW requires that the target does not exist yet
    if (createNewOpt && fileSystem.provider().exists(s3Path)) {
        throw new FileAlreadyExistsException(format("Target already exists: %s", s3Path));
    }

    // without CREATE or CREATE_NEW the target must already exist
    if (!createOpt && !createNewOpt && !fileSystem.provider().exists(s3Path)) {
        throw new NoSuchFileException(s3Path.toString());
    }

    // create the stream only after all options are validated, so a
    // rejected open does not start a multipart upload
    return new S3MultipartUploadOutputStream(client, objectId, properties);
}

@Override
public InputStream newInputStream(Path path, OpenOption... options) throws IOException {
LOGGER.debug("New input stream. path:{}, options:{}", path, options);

S3Path s3Path = toS3Path(path);
String key = s3Path.getKey();

@@ -342,14 +399,46 @@ public InputStream newInputStream(Path path, OpenOption... options) throws IOException {

@Override
public SeekableByteChannel newByteChannel(Path path, Set<? extends OpenOption> options, FileAttribute<?>... attrs) throws IOException {
S3Path s3Path = toS3Path(path);
return new S3SeekableByteChannel(s3Path, options);
LOGGER.debug("New byte channel. path:{}, options:{}", path, options);

final S3Path s3Path = toS3Path(path);
final boolean multipartEnabled = isMultipartUploadCapable(s3Path, options);

if (!multipartEnabled) {
    LOGGER.debug("Using S3SeekableByteChannel");
    return new S3SeekableByteChannel(s3Path, options);
}

LOGGER.debug("Using S3MultipartUploadChannel");

final S3MultipartUploadOutputStream outputStream = createMultipartUploadOutputStream(s3Path, options);
return new S3MultipartUploadChannel(outputStream);
}

@Override
public FileChannel newFileChannel(Path path, Set<? extends OpenOption> options, FileAttribute<?>... attrs) throws IOException {
S3Path s3Path = toS3Path(path);
return new S3FileChannel(s3Path, options);
LOGGER.debug("New file channel. path:{}, filter:{}", path, options);

final S3Path s3Path = toS3Path(path);
final boolean multipartEnabled = isMultipartUploadCapable(s3Path, options);

if (!multipartEnabled) {

LOGGER.debug("Using S3FileChannel");

return new S3FileChannel(s3Path, options);
}

LOGGER.debug("Using S3MultipartFileChannel");

final S3MultipartUploadOutputStream outputStream = createMultipartUploadOutputStream(s3Path, options);
final FileChannel channel = new S3MultipartUploadChannel(outputStream);

return channel;
}

/**
@@ -359,6 +448,8 @@ public FileChannel newFileChannel(Path path, Set<? extends OpenOption> options, FileAttribute<?>... attrs) throws IOException {
*/
@Override
public void createDirectory(Path dir, FileAttribute<?>... attrs) throws IOException {
LOGGER.debug("Create directory. path:{}, attrs:{}", dir, attrs);

S3Path s3Path = toS3Path(dir);
Preconditions.checkArgument(attrs.length == 0, "attrs not yet supported: %s", ImmutableList.copyOf(attrs)); // TODO
if (exists(s3Path))
@@ -378,6 +469,8 @@

@Override
public void delete(Path path) throws IOException {
LOGGER.debug("Delete path:{}", path);

S3Path s3Path = toS3Path(path);
if (Files.notExists(s3Path))
throw new NoSuchFileException("the path: " + path + " does not exist");
@@ -393,6 +486,8 @@ public void delete(Path path) throws IOException {

@Override
public void copy(Path source, Path target, CopyOption... options) throws IOException {
LOGGER.debug("Copy {} to target. options:{}", source, target, options);

if (isSameFile(source, target))
return;

@@ -424,6 +519,8 @@ public void copy(Path source, Path target, CopyOption... options) throws IOException {

@Override
public void move(Path source, Path target, CopyOption... options) throws IOException {
LOGGER.debug("Move {} to target. options:{}", source, target, options);

if (options != null && Arrays.asList(options).contains(StandardCopyOption.ATOMIC_MOVE))
throw new AtomicMoveNotSupportedException(source.toString(), target.toString(), "Atomic not supported");
copy(source, target, options);
@@ -550,7 +647,7 @@ public void setAttribute(Path path, String attribute, Object value, LinkOption... options) throws IOException {
* @return S3FileSystem never null
*/
public S3FileSystem createFileSystem(URI uri, Properties props) {
return new S3FileSystem(this, getFileSystemKey(uri, props), getAmazonS3(uri, props), uri.getHost());
return new S3FileSystem(this, getFileSystemKey(uri, props), getAmazonS3(uri, props), uri.getHost(), props);
}

protected AmazonS3 getAmazonS3(URI uri, Properties props) {
@@ -634,4 +731,16 @@ public Cache getCache() {
public void setCache(Cache cache) {
this.cache = cache;
}

private boolean isMultipartUploadCapable(final S3Path s3Path, final Set<? extends OpenOption> options) {
// Not supported options
if (options.contains(StandardOpenOption.READ) || options.contains(StandardOpenOption.APPEND)) {
return false;
}

final S3FileSystem fileSystem = s3Path.getFileSystem();
final Properties properties = fileSystem.getProperties();

return Boolean.parseBoolean(properties.getProperty(MULTIPART_UPLOAD_ENABLED, "false"));
}
}
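Taken together: once `s3fs_multipart_upload_enabled` is set and the channel is opened without `READ` or `APPEND`, plain NIO writes should flow through `S3MultipartUploadChannel`. A minimal end-to-end sketch (bucket name and credentials are placeholders):

```java
import com.google.common.collect.ImmutableMap;
import java.io.OutputStream;
import java.net.URI;
import java.nio.file.*;
import java.util.Map;

public class MultipartWriteSketch {
    public static void main(String[] args) throws Exception {
        Map<String, ?> env = ImmutableMap.<String, Object>builder()
                .put("s3fs_access_key", "access-key")         // placeholder credentials
                .put("s3fs_secret_key", "secret-key")
                .put("s3fs_multipart_upload_enabled", "true") // route writes through multipart
                .build();
        FileSystem fs = FileSystems.newFileSystem(
                URI.create("s3:///"), env, Thread.currentThread().getContextClassLoader());

        // CREATE + WRITE avoids READ/APPEND, so isMultipartUploadCapable() accepts it
        Path target = fs.getPath("/my-bucket/big-object.bin");
        try (OutputStream out = Files.newOutputStream(target,
                StandardOpenOption.CREATE, StandardOpenOption.WRITE)) {
            out.write(new byte[8 * 1024 * 1024]); // uploaded to S3 in parts
        }
    }
}
```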