Merge branch 'develop' into WX-1307
JVThomas authored Oct 13, 2023
2 parents ec43d25 + b6aae14 commit d30c61d
Showing 70 changed files with 2,323 additions and 986 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/chart_update_on_merge.yml
@@ -14,7 +14,7 @@ jobs:
- name: Fetch Jira ID from the commit message
id: fetch-jira-id
run: |
JIRA_ID=$(echo '${{ github.event.pull_request.title }}' | grep -Eo '\[?[A-Z][A-Z]+-[0-9]+\]?')
JIRA_ID=$(echo '${{ github.event.pull_request.title }}' | grep -Eo '[A-Z][A-Z]+-[0-9]+' | xargs echo -n | tr '[:space:]' ',')
[[ -z "$JIRA_ID" ]] && { echo "No Jira ID found in $1" ; exit 1; }
echo "JIRA_ID=$JIRA_ID" >> $GITHUB_OUTPUT
- name: Clone Cromwell
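For context, the revised step drops the optional square brackets from the pattern and joins every match with commas instead of keeping at most one bracketed hit. A hypothetical Java mirror of that shell pipeline (`grep -Eo` plus the `xargs`/`tr` join; the class and method names are illustrative only, not Cromwell code):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class JiraIdExtractor {
    // Same pattern as grep -Eo '[A-Z][A-Z]+-[0-9]+': two or more capitals, a dash, digits.
    private static final Pattern JIRA_ID = Pattern.compile("[A-Z][A-Z]+-[0-9]+");

    public static String extract(String prTitle) {
        Matcher m = JIRA_ID.matcher(prTitle);
        List<String> ids = new ArrayList<>();
        while (m.find()) {
            ids.add(m.group()); // brackets around the ID are no longer captured
        }
        // xargs echo -n | tr '[:space:]' ',' joins multiple matches with commas
        return String.join(",", ids);
    }

    public static void main(String[] args) {
        System.out.println(extract("[WX-1307] Merge develop"));      // WX-1307
        System.out.println(extract("WX-1307 WX-1400 combined fix")); // WX-1307,WX-1400
    }
}
```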
4 changes: 4 additions & 0 deletions .github/workflows/cromwell_unit_tests.yml
@@ -28,6 +28,10 @@ jobs:

#Invoke SBT to run all unit tests for Cromwell.
- name: Run tests
env:
AZURE_CLIENT_ID: ${{ secrets.VAULT_AZURE_CENTAUR_CLIENT_ID }}
AZURE_CLIENT_SECRET: ${{ secrets.VAULT_AZURE_CENTAUR_CLIENT_SECRET }}
AZURE_TENANT_ID: ${{ secrets.VAULT_AZURE_CENTAUR_TENANT_ID }}
run: |
set -e
sbt "test"
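The three `AZURE_*` variables are the standard names the azure-identity library reads when building environment-based credentials, which is presumably how these unit tests authenticate. A minimal sketch, assuming the code under test obtains a `TokenCredential` this way (names here are illustrative, not Cromwell's actual code):

```java
import com.azure.core.credential.TokenCredential;
import com.azure.identity.DefaultAzureCredentialBuilder;

public class AzureTestCredentials {
    public static TokenCredential fromEnvironment() {
        // DefaultAzureCredential tries EnvironmentCredential first, which reads
        // AZURE_CLIENT_ID, AZURE_CLIENT_SECRET, and AZURE_TENANT_ID -- the same
        // variables the workflow step above exports from repository secrets.
        return new DefaultAzureCredentialBuilder().build();
    }
}
```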
6 changes: 3 additions & 3 deletions .github/workflows/integration_tests.yml
@@ -26,9 +26,9 @@ jobs:
matrix:
# Batch test fixes to land later
include:
# - build_type: centaurGcpBatch
# build_mysql: 5.7
# friendly_name: Centaur GCP Batch with MySQL 5.7
- build_type: centaurGcpBatch
build_mysql: 5.7
friendly_name: Centaur GCP Batch with MySQL 5.7
- build_type: centaurPapiV2beta
build_mysql: 5.7
friendly_name: Centaur Papi V2 Beta with MySQL 5.7
15 changes: 9 additions & 6 deletions CHANGELOG.md
@@ -2,14 +2,17 @@

## 86 Release Notes

### HTTPFilesystem Improvements
### GCP Batch
Cromwell now supports the GCP Batch backend for running workflows. See `Backend` in [ReadTheDocs](https://cromwell.readthedocs.io/en/stable/) for more information.

WDL `size` engine function now works for HTTP files.

### Azure ApplicationInsights Logging Support
Cromwell can now send logs to Azure Application Insights. To enable, set environment
variable `APPLICATIONINSIGHTS_INSTRUMENTATIONKEY` to your account's key. [See here for information.](https://learn.microsoft.com/en-us/azure/azure-monitor/app/sdk-connection-string)
### Workflow Completion Callback
Cromwell can be configured to send a POST request to a specified URL when a workflow completes. The request body includes the workflow ID, terminal state,
and (if applicable) final outputs or error message. See `WorkflowCallback` in [ReadTheDocs](https://cromwell.readthedocs.io/en/stable/) for more information.

### Other Improvements
* Cromwell will now parallelize the downloads of DRS files that resolve to signed URLs. This significantly reduces the time localization takes in certain situations.
* WDL size engine function now works for HTTP files
* Improved Cromwell's handling of docker manifests. Additional logging information is emitted, and Cromwell will fall back to using OCI manifests if it encounters an error with a Docker Image Manifest V2.
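As a concrete illustration of the Workflow Completion Callback described above, the POST body might look like the following sketch. The field names are assumptions for illustration only; the authoritative shape is defined by the `WorkflowCallback` documentation:

```java
import java.util.List;
import java.util.Map;

/**
 * Hypothetical model of the callback request body; the real field names are
 * defined by Cromwell's WorkflowCallback docs, not by this sketch.
 */
public record WorkflowCallbackPayload(
        String workflowId,           // UUID of the completed workflow
        String state,                // terminal state, e.g. "Succeeded", "Failed", "Aborted"
        Map<String, Object> outputs, // final outputs, when the workflow succeeded
        List<String> failures) {     // error messages, when the workflow failed
}
```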

## 85 Release Notes

@@ -3,12 +3,6 @@

package com.azure.storage.blob.nio;

import com.azure.core.util.logging.ClientLogger;
import com.azure.storage.blob.BlobContainerClient;
import com.azure.storage.blob.models.BlobItem;
import com.azure.storage.blob.models.BlobListDetails;
import com.azure.storage.blob.models.ListBlobsOptions;

import java.io.IOException;
import java.nio.file.DirectoryIteratorException;
import java.nio.file.DirectoryStream;
@@ -18,6 +12,12 @@
import java.util.NoSuchElementException;
import java.util.Set;

import com.azure.core.util.logging.ClientLogger;
import com.azure.storage.blob.BlobContainerClient;
import com.azure.storage.blob.models.BlobItem;
import com.azure.storage.blob.models.BlobListDetails;
import com.azure.storage.blob.models.ListBlobsOptions;

/**
* A type for iterating over the contents of a directory.
*
@@ -88,7 +88,7 @@ private static class AzureDirectoryIterator implements Iterator<Path> {
if (path.isRoot()) {
String containerName = path.toString().substring(0, path.toString().length() - 1);
AzureFileSystem afs = ((AzureFileSystem) path.getFileSystem());
containerClient = ((AzureFileStore) afs.getFileStore(containerName)).getContainerClient();
containerClient = ((AzureFileStore) afs.getFileStore()).getContainerClient();
} else {
AzureResource azureResource = new AzureResource(path);
listOptions.setPrefix(azureResource.getBlobClient().getBlobName() + AzureFileSystem.PATH_SEPARATOR);
@@ -3,19 +3,6 @@

package com.azure.storage.blob.nio;

import com.azure.core.credential.AzureSasCredential;
import com.azure.core.http.HttpClient;
import com.azure.core.http.policy.HttpLogDetailLevel;
import com.azure.core.http.policy.HttpPipelinePolicy;
import com.azure.core.util.CoreUtils;
import com.azure.core.util.logging.ClientLogger;
import com.azure.storage.blob.BlobServiceClient;
import com.azure.storage.blob.BlobServiceClientBuilder;
import com.azure.storage.blob.implementation.util.BlobUserAgentModificationPolicy;
import com.azure.storage.common.StorageSharedKeyCredential;
import com.azure.storage.common.policy.RequestRetryOptions;
import com.azure.storage.common.policy.RetryPolicyType;

import java.io.IOException;
import java.nio.file.FileStore;
import java.nio.file.FileSystem;
@@ -27,14 +14,31 @@
import java.nio.file.attribute.FileAttributeView;
import java.nio.file.attribute.UserPrincipalLookupService;
import java.nio.file.spi.FileSystemProvider;
import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.regex.PatternSyntaxException;
import java.util.stream.Collectors;

import com.azure.core.credential.AzureSasCredential;
import com.azure.core.http.HttpClient;
import com.azure.core.http.policy.HttpLogDetailLevel;
import com.azure.core.http.policy.HttpPipelinePolicy;
import com.azure.core.util.CoreUtils;
import com.azure.core.util.logging.ClientLogger;
import com.azure.storage.blob.BlobServiceClient;
import com.azure.storage.blob.BlobServiceClientBuilder;
import com.azure.storage.blob.implementation.util.BlobUserAgentModificationPolicy;
import com.azure.storage.common.StorageSharedKeyCredential;
import com.azure.storage.common.policy.RequestRetryOptions;
import com.azure.storage.common.policy.RetryPolicyType;

/**
* Implements Java's {@link FileSystem} interface for Azure Blob Storage.
@@ -67,6 +71,11 @@ public final class AzureFileSystem extends FileSystem {
*/
public static final String AZURE_STORAGE_SAS_TOKEN_CREDENTIAL = "AzureStorageSasTokenCredential";

/**
* Expected type: String
*/
public static final String AZURE_STORAGE_PUBLIC_ACCESS_CREDENTIAL = "AzureStoragePublicAccessCredential";

/**
* Expected type: com.azure.core.http.policy.HttpLogLevelDetail
*/
@@ -159,10 +168,12 @@ public final class AzureFileSystem extends FileSystem {
private final Long putBlobThreshold;
private final Integer maxConcurrencyPerRequest;
private final Integer downloadResumeRetries;
private final Map<String, FileStore> fileStores;
private FileStore defaultFileStore;
private boolean closed;

private AzureSasCredential currentActiveSasCredential;
private Instant expiry;

AzureFileSystem(AzureFileSystemProvider parentFileSystemProvider, String endpoint, Map<String, ?> config)
throws IOException {
// A FileSystem should only ever be instantiated by a provider.
@@ -179,9 +190,10 @@ public final class AzureFileSystem extends FileSystem {
this.putBlobThreshold = (Long) config.get(AZURE_STORAGE_PUT_BLOB_THRESHOLD);
this.maxConcurrencyPerRequest = (Integer) config.get(AZURE_STORAGE_MAX_CONCURRENCY_PER_REQUEST);
this.downloadResumeRetries = (Integer) config.get(AZURE_STORAGE_DOWNLOAD_RESUME_RETRIES);
this.currentActiveSasCredential = (AzureSasCredential) config.get(AZURE_STORAGE_SAS_TOKEN_CREDENTIAL);

// Initialize and ensure access to FileStores.
this.fileStores = this.initializeFileStores(config);
this.defaultFileStore = this.initializeFileStore(config);
} catch (RuntimeException e) {
throw LoggingUtility.logError(LOGGER, new IllegalArgumentException("There was an error parsing the "
+ "configurations map. Please ensure all fields are set to a legal value of the correct type.", e));
@@ -221,7 +233,7 @@ public FileSystemProvider provider() {
@Override
public void close() throws IOException {
this.closed = true;
this.parentFileSystemProvider.closeFileSystem(this.getFileSystemUrl());
this.parentFileSystemProvider.closeFileSystem(this.getFileSystemUrl() + "/" + defaultFileStore.name());
}

/**
@@ -282,9 +294,7 @@ public Iterable<Path> getRootDirectories() {
If the file system was set to use all containers in the account, the account will be re-queried and the
list may grow or shrink if containers were added or deleted.
*/
return fileStores.keySet().stream()
.map(name -> this.getPath(name + AzurePath.ROOT_DIR_SUFFIX))
.collect(Collectors.toList());
return Arrays.asList(this.getPath(defaultFileStore.name() + AzurePath.ROOT_DIR_SUFFIX));
}

/**
@@ -304,7 +314,7 @@ public Iterable<FileStore> getFileStores() {
If the file system was set to use all containers in the account, the account will be re-queried and the
list may grow or shrink if containers were added or deleted.
*/
return this.fileStores.values();
return Arrays.asList(defaultFileStore);
}

/**
@@ -397,6 +407,12 @@ private BlobServiceClient buildBlobServiceClient(String endpoint, Map<String, ?>
builder.credential((StorageSharedKeyCredential) config.get(AZURE_STORAGE_SHARED_KEY_CREDENTIAL));
} else if (config.containsKey(AZURE_STORAGE_SAS_TOKEN_CREDENTIAL)) {
builder.credential((AzureSasCredential) config.get(AZURE_STORAGE_SAS_TOKEN_CREDENTIAL));
this.setExpiryFromSAS((AzureSasCredential) config.get(AZURE_STORAGE_SAS_TOKEN_CREDENTIAL));
} else if (config.containsKey(AZURE_STORAGE_PUBLIC_ACCESS_CREDENTIAL)) {
// The Blob Service Client Builder requires at least one kind of authentication to make requests
// For public files however, this is unnecessary. This key-value pair is to denote the case
// explicitly when we supply a placeholder SAS credential to bypass this requirement.
builder.credential((AzureSasCredential) config.get(AZURE_STORAGE_PUBLIC_ACCESS_CREDENTIAL));
} else {
throw LoggingUtility.logError(LOGGER, new IllegalArgumentException(String.format("No credentials were "
+ "provided. Please specify one of the following when constructing an AzureFileSystem: %s, %s.",
@@ -430,23 +446,17 @@ private BlobServiceClient buildBlobServiceClient(String endpoint, Map<String, ?>
return builder.buildClient();
}

private Map<String, FileStore> initializeFileStores(Map<String, ?> config) throws IOException {
String fileStoreNames = (String) config.get(AZURE_STORAGE_FILE_STORES);
if (CoreUtils.isNullOrEmpty(fileStoreNames)) {
private FileStore initializeFileStore(Map<String, ?> config) throws IOException {
String fileStoreName = (String) config.get(AZURE_STORAGE_FILE_STORES);
if (CoreUtils.isNullOrEmpty(fileStoreName)) {
throw LoggingUtility.logError(LOGGER, new IllegalArgumentException("The list of FileStores cannot be "
+ "null."));
}

Boolean skipConnectionCheck = (Boolean) config.get(AZURE_STORAGE_SKIP_INITIAL_CONTAINER_CHECK);
Map<String, FileStore> fileStores = new HashMap<>();
for (String fileStoreName : fileStoreNames.split(",")) {
FileStore fs = new AzureFileStore(this, fileStoreName, skipConnectionCheck);
if (this.defaultFileStore == null) {
this.defaultFileStore = fs;
}
fileStores.put(fileStoreName, fs);
}
return fileStores;
this.defaultFileStore = new AzureFileStore(this, fileStoreName, skipConnectionCheck);
return this.defaultFileStore;
}

@Override
@@ -470,12 +480,11 @@ Path getDefaultDirectory() {
return this.getPath(this.defaultFileStore.name() + AzurePath.ROOT_DIR_SUFFIX);
}

FileStore getFileStore(String name) throws IOException {
FileStore store = this.fileStores.get(name);
if (store == null) {
throw LoggingUtility.logError(LOGGER, new IOException("Invalid file store: " + name));
FileStore getFileStore() throws IOException {
if (this.defaultFileStore == null) {
throw LoggingUtility.logError(LOGGER, new IOException("FileStore not initialized"));
}
return store;
return defaultFileStore;
}

Long getBlockSize() {
@@ -489,4 +498,37 @@ Long getPutBlobThreshold() {
Integer getMaxConcurrencyPerRequest() {
return this.maxConcurrencyPerRequest;
}

public String createSASAppendedURL(String url) throws IllegalStateException {
if (Objects.isNull(currentActiveSasCredential)) {
throw new IllegalStateException("No current active SAS credential present");
}
return url + "?" + currentActiveSasCredential.getSignature();
}

public Optional<Instant> getExpiry() {
return Optional.ofNullable(expiry);
}

private void setExpiryFromSAS(AzureSasCredential token) {
List<String> strings = Arrays.asList(token.getSignature().split("&"));
Optional<String> expiryString = strings.stream()
.filter(s -> s.startsWith("se"))
.findFirst()
.map(s -> s.replaceFirst("se=",""))
.map(s -> s.replace("%3A", ":"));
this.expiry = expiryString.map(es -> Instant.parse(es)).orElse(null);
}

/**
* Return true if this filesystem has SAS credentials with an expiration date attached, and we're within
* `buffer` of the expiration. Return false if our credentials don't come with an expiration, or we
* aren't within `buffer` of our expiration.
*/
public boolean isExpired(Duration buffer) {
return Optional.ofNullable(this.expiry)
.map(e -> Instant.now().plus(buffer).isAfter(e))
.orElse(false);

}
}
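The new SAS-expiry plumbing above is easiest to see end to end in isolation. Below is a standalone sketch of the same logic (with the filter tightened to the full `se=` prefix, the SAS query parameter that carries the expiry time); class and method names are illustrative only:

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.Optional;

public class SasExpiryDemo {
    // Mirrors AzureFileSystem.setExpiryFromSAS: find the se= parameter,
    // un-escape the colons, and parse it as an ISO-8601 instant.
    static Optional<Instant> parseExpiry(String sasSignature) {
        return Arrays.stream(sasSignature.split("&"))
                .filter(s -> s.startsWith("se="))
                .findFirst()
                .map(s -> s.substring(3).replace("%3A", ":"))
                .map(Instant::parse);
    }

    public static void main(String[] args) {
        String sig = "sv=2022-11-02&se=2023-10-13T23%3A59%3A59Z&sp=rl&sig=redacted";
        Optional<Instant> expiry = parseExpiry(sig);
        System.out.println(expiry.orElseThrow()); // 2023-10-13T23:59:59Z

        // isExpired(buffer): true once "now + buffer" passes the expiry instant.
        Duration buffer = Duration.ofMinutes(10);
        boolean expired = expiry
                .map(e -> Instant.now().plus(buffer).isAfter(e))
                .orElse(false);
        System.out.println(expired); // true for this 2023 token
    }
}
```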
@@ -47,6 +47,7 @@
import java.util.concurrent.ConcurrentMap;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import com.azure.core.util.CoreUtils;
import com.azure.core.util.logging.ClientLogger;
@@ -695,16 +696,23 @@ public void copy(Path source, Path destination, CopyOption... copyOptions) throw
// Remove accepted options as we find them. Anything left we don't support.
boolean replaceExisting = false;
List<CopyOption> optionsList = new ArrayList<>(Arrays.asList(copyOptions));
if (!optionsList.contains(StandardCopyOption.COPY_ATTRIBUTES)) {
throw LoggingUtility.logError(ClientLoggerHolder.LOGGER, new UnsupportedOperationException(
"StandardCopyOption.COPY_ATTRIBUTES must be specified as the service will always copy "
+ "file attributes."));
// NOTE: We're going to assume COPY_ATTRIBUTES as a default copy option (but can still be provided and handled safely)
// REPLACE_EXISTING must still be provided if you want to replace existing file

// if (!optionsList.contains(StandardCopyOption.COPY_ATTRIBUTES)) {
// throw LoggingUtility.logError(ClientLoggerHolder.LOGGER, new UnsupportedOperationException(
// "StandardCopyOption.COPY_ATTRIBUTES must be specified as the service will always copy "
// + "file attributes."));
// }
if(optionsList.contains(StandardCopyOption.COPY_ATTRIBUTES)) {
optionsList.remove(StandardCopyOption.COPY_ATTRIBUTES);
}
optionsList.remove(StandardCopyOption.COPY_ATTRIBUTES);

if (optionsList.contains(StandardCopyOption.REPLACE_EXISTING)) {
replaceExisting = true;
optionsList.remove(StandardCopyOption.REPLACE_EXISTING);
}

if (!optionsList.isEmpty()) {
throw LoggingUtility.logError(ClientLoggerHolder.LOGGER,
new UnsupportedOperationException("Unsupported copy option found. Only "
@@ -760,9 +768,16 @@ public void copy(Path source, Path destination, CopyOption... copyOptions) throw
customer scenarios and how many virtual directories they copy, it could be better to check the directory status
first and then do a copy or createDir, which would always be two requests for all resource types.
*/

try {
/*
Format the url by appending the SAS token as a param, otherwise the copy request will fail.
AzureFileSystem has been updated to handle url transformation via createSASAuthorizedURL()
*/
AzureFileSystem afs = (AzureFileSystem) sourceRes.getPath().getFileSystem();
String sasAppendedSourceUrl = afs.createSASAppendedURL(sourceRes.getBlobClient().getBlobUrl());
SyncPoller<BlobCopyInfo, Void> pollResponse =
destinationRes.getBlobClient().beginCopy(sourceRes.getBlobClient().getBlobUrl(), null, null, null,
destinationRes.getBlobClient().beginCopy(sasAppendedSourceUrl, null, null, null,
null, requestConditions, null);
pollResponse.waitForCompletion(Duration.ofSeconds(COPY_TIMEOUT_SECONDS));
} catch (BlobStorageException e) {
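The SAS-appended source URL matters because `beginCopy` starts a copy executed by the storage service itself: the service, not this client, dereferences the source blob, so the source URL must carry its own authorization. A minimal sketch of that pattern, assuming you already hold a SAS signature (names here are illustrative):

```java
import java.time.Duration;

import com.azure.core.util.polling.SyncPoller;
import com.azure.storage.blob.BlobClient;
import com.azure.storage.blob.models.BlobCopyInfo;

public class ServerSideCopy {
    static void copyWithSas(BlobClient source, BlobClient destination, String sasSignature) {
        // The destination client's credentials authorize the copy request itself,
        // but the service fetches the source URL independently -- hence the SAS.
        String authorizedSourceUrl = source.getBlobUrl() + "?" + sasSignature;
        SyncPoller<BlobCopyInfo, Void> poller =
                destination.beginCopy(authorizedSourceUrl, Duration.ofSeconds(1));
        poller.waitForCompletion(Duration.ofSeconds(60));
    }
}
```

Relatedly, with COPY_ATTRIBUTES now assumed by default, callers can invoke `Files.copy(source, target, StandardCopyOption.REPLACE_EXISTING)` without also passing `StandardCopyOption.COPY_ATTRIBUTES`.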
@@ -735,7 +735,7 @@ public BlobClient toBlobClient() throws IOException {
String fileStoreName = this.rootToFileStore(root.toString());

BlobContainerClient containerClient =
((AzureFileStore) this.parentFileSystem.getFileStore(fileStoreName)).getContainerClient();
((AzureFileStore) this.parentFileSystem.getFileStore()).getContainerClient();

String blobName = this.withoutRoot();
if (blobName.isEmpty()) {
6 changes: 4 additions & 2 deletions build.sbt
@@ -103,10 +103,11 @@ lazy val azureBlobNio = (project in file("azure-blob-nio"))
lazy val azureBlobFileSystem = (project in file("filesystems/blob"))
.withLibrarySettings("cromwell-azure-blobFileSystem", blobFileSystemDependencies)
.dependsOn(core)
.dependsOn(core % "test->test")
.dependsOn(common % "test->test")
.dependsOn(cloudSupport)
.dependsOn(azureBlobNio)
.dependsOn(core % "test->test")
.dependsOn(common % "test->test")
.dependsOn(azureBlobNio % "test->test")

lazy val awsS3FileSystem = (project in file("filesystems/s3"))
.withLibrarySettings("cromwell-aws-s3filesystem", s3FileSystemDependencies)
@@ -165,6 +166,7 @@ lazy val dockerHashing = project

lazy val dockerHashing = project
.withLibrarySettings("cromwell-docker-hashing", dockerHashingDependencies)
.dependsOn(cloudSupport)
.dependsOn(core)
.dependsOn(core % "test->test")
.dependsOn(common % "test->test")