Merge pull request #2 from connexta/STOR-1
STOR-1 Fix S3 Provider to read files directly from S3
roelens8 authored Aug 27, 2019
2 parents 37adb9b + 90eb6e4 commit 302d9b7
Showing 7 changed files with 179 additions and 192 deletions.
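At its core, this change stops the provider from buffering each S3 object into memory (IOUtils.toByteArray) before wrapping it in a Guava ByteSource; instead, the ByteSource opens the S3 object's stream lazily, so content is read directly from S3 only when a consumer asks for it. A rough sketch of the two shapes, using the AWS SDK v1 and Guava types already present in this module (the sketch and its method names are illustrative, not part of the diff):

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.S3Object;
import com.google.common.io.ByteSource;
import java.io.IOException;
import java.io.InputStream;
import org.apache.commons.io.IOUtils;

final class DirectReadSketch {

  // Old shape: the whole object is pulled into memory up front.
  static ByteSource buffered(AmazonS3 s3, String bucket, String key) throws IOException {
    try (S3Object object = s3.getObject(bucket, key)) {
      return ByteSource.wrap(IOUtils.toByteArray(object.getObjectContent()));
    }
  }

  // New shape: nothing is read until openStream() is called, so the bytes
  // come straight from S3 at read time instead of from a local copy.
  static ByteSource streaming(AmazonS3 s3, String bucket, String key) {
    S3Object object = s3.getObject(bucket, key);
    return new ByteSource() {
      @Override
      public InputStream openStream() throws IOException {
        return object.getObjectContent();
      }
    };
  }

  private DirectReadSketch() {}
}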
11 changes: 8 additions & 3 deletions s3/pom.xml
@@ -54,6 +54,11 @@
<artifactId>ddf-security-common</artifactId>
<version>${ddf.version}</version>
</dependency>
<dependency>
<groupId>ddf.security.encryption</groupId>
<artifactId>security-encryption-api</artifactId>
<version>${ddf.version}</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
@@ -108,17 +113,17 @@
<limit>
<counter>INSTRUCTION</counter>
<value>COVEREDRATIO</value>
<minimum>0.82</minimum>
<minimum>0.84</minimum>
</limit>
<limit>
<counter>BRANCH</counter>
<value>COVEREDRATIO</value>
<minimum>0.70</minimum>
<minimum>0.65</minimum>
</limit>
<limit>
<counter>COMPLEXITY</counter>
<value>COVEREDRATIO</value>
<minimum>0.72</minimum>
<minimum>0.63</minimum>
</limit>
</limits>
</rule>
@@ -0,0 +1,20 @@
package com.connexta.ddf.catalog.content.impl;

import com.amazonaws.services.s3.model.S3Object;
import com.google.common.io.ByteSource;
import java.io.IOException;
import java.io.InputStream;

public class S3ObjectByteSource extends ByteSource {

private S3Object s3Object;

public S3ObjectByteSource(S3Object s3Object) {
this.s3Object = s3Object;
}

@Override
public InputStream openStream() throws IOException {
return s3Object.getObjectContent();
}
}
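A minimal usage sketch for the new class (the client setup, bucket, and key below are placeholders, not values from this PR). Because S3Object#getObjectContent() hands back the same underlying stream on every call, this source is in practice read once, with the bytes coming straight from S3 rather than from an in-memory copy:

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.connexta.ddf.catalog.content.impl.S3ObjectByteSource;
import com.google.common.io.ByteSource;
import java.io.IOException;
import java.io.InputStream;

public class S3ObjectByteSourceExample {
  public static void main(String[] args) throws IOException {
    AmazonS3 s3 = AmazonS3ClientBuilder.standard().build();
    // The object's content is not buffered here; it is streamed as it is read.
    ByteSource source = new S3ObjectByteSource(s3.getObject("example-bucket", "content/example.bin"));
    try (InputStream in = source.openStream()) {
      // Bytes are transferred from S3 only as the consumer reads them.
      System.out.println("first byte: " + in.read());
    }
  }
}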
@@ -45,8 +45,7 @@
import ddf.catalog.content.operation.impl.UpdateStorageResponseImpl;
import ddf.catalog.data.Metacard;
import ddf.catalog.data.impl.AttributeImpl;
import ddf.mime.MimeTypeMapper;
import ddf.mime.MimeTypeResolutionException;
import ddf.security.encryption.EncryptionService;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
@@ -59,24 +58,35 @@
import java.util.stream.Collectors;
import javax.ws.rs.core.MediaType;
import org.apache.commons.io.FilenameUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.codice.ddf.configuration.PropertyResolver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* This bundle depends on the camel-aws feature being started within DDF. S3 Content Storage
* Provider.
*/
/** S3 Content Storage Provider. */
public class S3StorageProvider implements StorageProvider {

private static final String DEFAULT_MIME_TYPE = MediaType.APPLICATION_OCTET_STREAM;

private static final Logger LOGGER = LoggerFactory.getLogger(S3StorageProvider.class);

/** Mapper for file extensions-to-mime types (and vice versa) */
private MimeTypeMapper mimeTypeMapper;
private static final String S3_ENDPOINT_NAME = "s3Endpoint";

private static final String S3_REGION_NAME = "s3Region";

private static final String S3_ACCESS_KEY_NAME = "s3AccessKey";

private static final String S3_SECRET_KEY_NAME = "s3SecretKey";

private static final String S3_BUCKET_NAME = "s3Bucket";

private static final String CONTENT_PREFIX_NAME = "contentPrefix";

private static final String AWS_KMS_KEY_ID_NAME = "awsKmsKeyId";

private static final String USE_SSE_S3_ENCRYPTION_NAME = "useSseS3Encryption";

private final EncryptionService encryptionService;

private String s3Endpoint;

@@ -100,10 +110,10 @@ public class S3StorageProvider implements StorageProvider {

AmazonS3 amazonS3;

public S3StorageProvider(final MimeTypeMapper mimeTypeMapper) {
public S3StorageProvider(final EncryptionService encryptionService) {
LOGGER.info("S3 Content Storage Provider initializing...");

this.mimeTypeMapper = mimeTypeMapper;
this.encryptionService = encryptionService;
}

@Override
@@ -114,6 +124,7 @@ public CreateStorageResponse create(CreateStorageRequest createRequest) throws S
List<ContentItem> createdContentItems = new ArrayList<>(createRequest.getContentItems().size());
for (ContentItem contentItem : contentItems) {
try {
LOGGER.debug("Processing content item {}", contentItem.getFilename());
if (!ContentItemValidator.validate(contentItem)) {
LOGGER.warn("Item is not valid: {}", contentItem);
continue;
@@ -267,6 +278,7 @@ private void commitUpdates(StorageRequest request) throws StorageException {
String objectPath = fullContentPrefix + item.getFilename();
ObjectMetadata metadata = new ObjectMetadata();
metadata.setContentLength(item.getSize());
metadata.setContentType(item.getMimeType().toString());
for (S3ObjectSummary object :
amazonS3.listObjectsV2(s3Bucket, fullContentPrefix).getObjectSummaries()) {
amazonS3.deleteObject(s3Bucket, object.getKey());
@@ -312,14 +324,11 @@ private ContentItem readContent(URI uri) throws StorageException {
throw new StorageException(
"Could not get valid content key for resource URI: " + uri.toString());
}
String filename = FilenameUtils.getName(contentKey);
String extension = FilenameUtils.getExtension(filename);

String mimeType = DEFAULT_MIME_TYPE;
String filename = FilenameUtils.getName(contentKey);
ByteSource byteSource;
long size = 0;
ByteSource byteSource = null;

S3Object s3Object = null;
S3Object s3Object;
try {
s3Object = amazonS3.getObject(s3Bucket, contentKey);
if (s3Object == null) {
@@ -329,26 +338,27 @@ private ContentItem readContent(URI uri) throws StorageException {
throw new StorageException(
"Could not get object from S3 for content key: " + contentKey + ".");
}
InputStream fileInputStream = s3Object.getObjectContent();
byte[] byteArray = IOUtils.toByteArray(fileInputStream);
size = byteArray.length;
byteSource = ByteSource.wrap(byteArray);
mimeType = mimeTypeMapper.guessMimeType(fileInputStream, extension);
} catch (MimeTypeResolutionException e) {
LOGGER.debug(
"Could not determine mime type for file extension = {}; defaulting to {}",
extension,
DEFAULT_MIME_TYPE);
} catch (IOException | SdkClientException ex) {
} catch (SdkClientException ex) {
LOGGER.debug(
"Error getting or reading object from S3 for content key: {}. Failing StorageProvider read.",
"Error getting object from S3 for content key: {}. Failing StorageProvider read.",
contentKey,
ex);
throw new StorageException(
"Could not get or read object for content key: " + contentKey + ".");
}
if (DEFAULT_MIME_TYPE.equals(mimeType)) {
mimeType = s3Object.getObjectMetadata().getContentType();
byteSource = new S3ObjectByteSource(s3Object);
ObjectMetadata objectMetadata = s3Object.getObjectMetadata();
if (objectMetadata != null) {
if (StringUtils.isNotEmpty(objectMetadata.getContentType())) {
mimeType = objectMetadata.getContentType();
} else {
LOGGER.debug("Problem retrieving mime type of resource; defaulting to {}.", mimeType);
}
if (objectMetadata.getContentLength() > 0) {
size = objectMetadata.getContentLength();
} else {
LOGGER.debug("Problem retrieving size of resource; defaulting to {}.", size);
}
}
return new ContentItemImpl(
uri.getSchemeSpecificPart(), uri.getFragment(), byteSource, mimeType, filename, size, null);
@@ -363,6 +373,7 @@ String getFullContentPrefix(String id, String qualifier) {
if (StringUtils.isNotBlank(qualifier)) {
prefix = prefix.concat(qualifier + "/");
}
LOGGER.trace("Content prefix from ({}, {}): {}", id, qualifier, prefix);
return prefix;
}

@@ -375,11 +386,11 @@ private String getContentItemKey(URI uri) throws StorageException {
s3Bucket, getFullContentPrefix(uri.getSchemeSpecificPart(), uri.getFragment()))
.getObjectSummaries();
} catch (SdkClientException ex) {
LOGGER.debug("Unable to get object summaries for URI: {}", uri);
throw new StorageException(ex);
}
if (summaries == null || summaries.isEmpty()) {
LOGGER.debug(
"Unable to get content key as the list of S3 object summaries is null or empty.");
LOGGER.debug("Unable to get content key, as the list of object summaries is null or empty.");
return null;
}
return summaries.get(0).getKey();
@@ -388,21 +399,24 @@ s3Bucket, getFullContentPrefix(uri.getSchemeSpecificPart(), uri.getFragment()))
private ContentItem generateContentItem(ContentItem item) throws IOException {
LOGGER.trace("ENTERING: generateContentFile");

ByteSource byteSource;
ContentItemImpl contentItem;

try (InputStream inputStream = item.getInputStream()) {
byteSource = ByteSource.wrap(IOUtils.toByteArray(inputStream));
contentItem =
new ContentItemImpl(
item.getId(),
item.getQualifier(),
byteSource,
item.getMimeType().toString(),
item.getFilename(),
item.getSize(),
item.getMetacard());
}
ByteSource byteSource =
new ByteSource() {
@Override
public InputStream openStream() throws IOException {
return item.getInputStream();
}
};
contentItem =
new ContentItemImpl(
item.getId(),
item.getQualifier(),
byteSource,
item.getMimeType().toString(),
item.getFilename(),
item.getSize(),
item.getMetacard());

LOGGER.trace("EXITING: generateContentFile");

@@ -412,35 +426,57 @@ private ContentItem generateContentItem(ContentItem item) throws IOException {
public void init() {
LOGGER.debug("Initializing Amazon S3 Client...");
AmazonS3ClientBuilder amazonClientBuilder = AmazonS3ClientBuilder.standard();
if (StringUtils.isBlank(s3Endpoint) || StringUtils.isBlank(s3Region)) {
amazonS3 = amazonClientBuilder.build();
return;
}
AwsClientBuilder.EndpointConfiguration endpointConfiguration =
new AwsClientBuilder.EndpointConfiguration(s3Endpoint, s3Region);
if (StringUtils.isNotBlank(s3AccessKey) && StringUtils.isNotBlank(s3SecretKey)) {
amazonS3 =
amazonClientBuilder
.withCredentials(
new AWSStaticCredentialsProvider(
new BasicAWSCredentials(s3AccessKey, s3SecretKey)))
.withEndpointConfiguration(endpointConfiguration)
.build();
return;
try {
if (StringUtils.isBlank(s3Endpoint) || StringUtils.isBlank(s3Region)) {
amazonS3 = amazonClientBuilder.build();
return;
}
AwsClientBuilder.EndpointConfiguration endpointConfiguration =
new AwsClientBuilder.EndpointConfiguration(s3Endpoint, s3Region);
if (StringUtils.isNotBlank(s3AccessKey) && StringUtils.isNotBlank(s3SecretKey)) {
amazonS3 =
amazonClientBuilder
.withCredentials(
new AWSStaticCredentialsProvider(
new BasicAWSCredentials(s3AccessKey, s3SecretKey)))
.withEndpointConfiguration(endpointConfiguration)
.build();
return;
}
amazonS3 = amazonClientBuilder.withEndpointConfiguration(endpointConfiguration).build();
} catch (SdkClientException ex) {
LOGGER.warn(
"Problem initializing Amazon S3 client. Please configure the S3 Storage Provider using valid properties.",
ex.getMessage());
}
amazonS3 = amazonClientBuilder.withEndpointConfiguration(endpointConfiguration).build();
}

public void update(Map<String, ?> props) {
if (props != null) {
setS3Endpoint((String) props.get("s3Endpoint"));
setS3Region((String) props.get("s3Region"));
setS3AccessKey((String) props.get("s3AccessKey"));
setS3SecretKey((String) props.get("s3SecretKey"));
setS3Bucket((String) props.get("s3Bucket"));
setContentPrefix((String) props.get("contentPrefix"));
setAwsKmsKeyId((String) props.get("awsKmsKeyId"));
setUseSseS3Encryption((Boolean) props.get("useSseS3Encryption"));
if (props.get(S3_ENDPOINT_NAME) instanceof String) {
setS3Endpoint((String) props.get(S3_ENDPOINT_NAME));
}
if (props.get(S3_REGION_NAME) instanceof String) {
setS3Region((String) props.get(S3_REGION_NAME));
}
if (props.get(S3_ACCESS_KEY_NAME) instanceof String) {
setS3AccessKey((String) props.get(S3_ACCESS_KEY_NAME));
}
if (props.get(S3_SECRET_KEY_NAME) instanceof String) {
setS3SecretKey((String) props.get(S3_SECRET_KEY_NAME));
}
if (props.get(S3_BUCKET_NAME) instanceof String) {
setS3Bucket((String) props.get(S3_BUCKET_NAME));
}
if (props.get(CONTENT_PREFIX_NAME) instanceof String) {
setContentPrefix((String) props.get(CONTENT_PREFIX_NAME));
}
if (props.get(AWS_KMS_KEY_ID_NAME) instanceof String) {
setAwsKmsKeyId((String) props.get(AWS_KMS_KEY_ID_NAME));
}
if (props.get(USE_SSE_S3_ENCRYPTION_NAME) != null) {
setUseSseS3Encryption(Boolean.valueOf(props.get(USE_SSE_S3_ENCRYPTION_NAME).toString()));
}
}
init();
}
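For illustration, a hypothetical set of managed properties as Config Admin might deliver them (none of these values come from this PR). The instanceof checks above mean a wrongly typed entry is skipped rather than throwing a ClassCastException, useSseS3Encryption is converted with Boolean.valueOf(...), and update() finishes by re-running init() to rebuild the AmazonS3 client:

import java.util.HashMap;
import java.util.Map;

public class S3StorageProviderConfigSketch {
  public static void main(String[] args) {
    Map<String, Object> props = new HashMap<>();
    props.put("s3Endpoint", "https://s3.example.com"); // placeholder endpoint
    props.put("s3Region", "us-east-1");
    props.put("s3Bucket", "example-bucket");
    props.put("contentPrefix", "content/");
    props.put("useSseS3Encryption", "true"); // handled via Boolean.valueOf(...)
    props.put("s3AccessKey", 42); // wrong type: fails the instanceof String check, so it is skipped

    // In DDF this map is handed to S3StorageProvider#update(Map) by the container
    // (see the cm:managed-properties element in blueprint.xml below).
    System.out.println(props);
  }
}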
@@ -454,11 +490,11 @@ public void setS3Region(String s3Region) {
}

public void setS3AccessKey(String s3AccessKey) {
this.s3AccessKey = s3AccessKey;
this.s3AccessKey = encryptionService.decryptValue(s3AccessKey);
}

public void setS3SecretKey(String s3SecretKey) {
this.s3SecretKey = s3SecretKey;
this.s3SecretKey = encryptionService.decryptValue(s3SecretKey);
}

public void setS3Bucket(String s3Bucket) {
@@ -470,7 +506,7 @@ public void setContentPrefix(String contentPrefix) {
}

public void setAwsKmsKeyId(String awsKmsKeyId) {
this.awsKmsKeyId = awsKmsKeyId;
this.awsKmsKeyId = encryptionService.decryptValue(awsKmsKeyId);
}

public void setUseSseS3Encryption(boolean useSseS3Encryption) {
6 changes: 3 additions & 3 deletions s3/src/main/resources/OSGI-INF/blueprint/blueprint.xml
@@ -17,12 +17,14 @@
xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0">

<ext:property-placeholder/>

<reference id="encryptionService" interface="ddf.security.encryption.EncryptionService"/>

<bean id="s3StoreService" class="com.connexta.ddf.catalog.content.impl.S3StorageProvider" init-method="init">
<cm:managed-properties
persistent-id="com.connexta.ddf.catalog.content.impl.S3StorageProvider"
update-strategy="component-managed" update-method="update"/>
<argument ref="mimeTypeMapper"/>
<argument ref="encryptionService"/>
<property name="s3Endpoint" value=""/>
<property name="s3Region" value=""/>
<property name="s3Bucket" value=""/>
@@ -35,6 +37,4 @@

<service ref="s3StoreService" interface="ddf.catalog.content.StorageProvider"/>

<reference id="mimeTypeMapper" interface="ddf.mime.MimeTypeMapper"/>

</blueprint>