Skip to content

Commit

Permalink
Merge branch 'develop' into bennettn/azure-gov-update
Browse files Browse the repository at this point in the history
  • Loading branch information
jsaun authored Jan 7, 2025
2 parents 69a00e6 + 95d2d53 commit 74c4680
Show file tree
Hide file tree
Showing 16 changed files with 493 additions and 31 deletions.
8 changes: 6 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,13 @@ The `IX_WORKFLOW_STORE_ENTRY_WS` index is removed from `WORKFLOW_STORE_ENTRY`.

The index had low cardinality and workflow pickup is faster without it. Migration time depends on workflow store size, but should be very fast for most installations. Terminal workflows are removed from the workflow store, so only running workflows contribute to the cost.

### Bug fixes and small changes
#### Index additions

* Changed default boot disk size from 10GB to 20GB in PipelinesAPI and Google Batch backends
The `IX_METADATA_ENTRY_WEU_MK` index is added to `METADATA_ENTRY`. In pre-release testing, the migration proceeded at about 3 million rows per minute. Please plan downtime accordingly.

### Reduce errors from boot disk filling up on Google Lifesciences API

* If Cromwell can't determine the size of the user command Docker image, it will increase Lifesciences API boot disk size by 30GB rather than 0. This should reduce incidence of tasks failing due to boot disk filling up.

#### Improved `size()` function performance on arrays

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ files {

metadata {
status: Succeeded
"outputs.docker_size_dockerhub.large_dockerhub_image_with_hash.bootDiskSize": 27
"outputs.docker_size_dockerhub.large_dockerhub_image_with_tag.bootDiskSize": 27
"outputs.docker_size_dockerhub.large_dockerhub_image_with_hash.bootDiskSize": 17
"outputs.docker_size_dockerhub.large_dockerhub_image_with_tag.bootDiskSize": 17
}

workflowType: WDL
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ files {

metadata {
status: Succeeded
"outputs.docker_size_gcr.large_gcr_image_with_hash.bootDiskSize": 27
"outputs.docker_size_gcr.large_gcr_image_with_tag.bootDiskSize": 27
"outputs.docker_size_gcr.large_gcr_image_with_hash.bootDiskSize": 17
"outputs.docker_size_gcr.large_gcr_image_with_tag.bootDiskSize": 17
}

workflowType: WDL
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<databaseChangeLog objectQuotingStrategy="QUOTE_ALL_OBJECTS"
                   xmlns="http://www.liquibase.org/xml/ns/dbchangelog"
                   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
                   xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-3.3.xsd">

    <changeSet id="metadata_index_add_workflow_key" author="anichols" dbms="hsqldb,mariadb,mysql,postgresql">
        <!--
            Adds a composite (WORKFLOW_EXECUTION_UUID, METADATA_KEY) index to METADATA_ENTRY
            to speed up key-filtered metadata queries scoped to a single workflow.

            On MySQL this index builds at roughly 3 million rows per minute, which would mean an
            impossible multi-day downtime in very large installations such as Terra. In those
            environments the index is pre-created manually and asynchronously; the precondition
            below detects an already-existing index and marks this changeset as applied
            (MARK_RAN) without attempting to create it again.
        -->
        <preConditions onFail="MARK_RAN">
            <not>
                <indexExists indexName="IX_METADATA_ENTRY_WEU_MK"/>
            </not>
        </preConditions>
        <createIndex indexName="IX_METADATA_ENTRY_WEU_MK" tableName="METADATA_ENTRY">
            <column name="WORKFLOW_EXECUTION_UUID"/>
            <column name="METADATA_KEY"/>
        </createIndex>
    </changeSet>
</databaseChangeLog>
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
<include file="metadata_changesets/remove_non_summarizable_metadata_from_queue.xml" relativeToChangelogFile="true" />
<include file="metadata_changesets/update_metadata_archive_index.xml" relativeToChangelogFile="true" />
<include file="metadata_changesets/reset_archive_statuses_to_null.xml" relativeToChangelogFile="true" />
<include file="metadata_changesets/metadata_index_add_workflow_key.xml" relativeToChangelogFile="true" />
<!-- WARNING!
This changeset should always be last.
It is always run (and must always run last) to set table ownership correctly.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,23 @@ trait MetadataEntryComponent {

// Single-column index on WORKFLOW_EXECUTION_UUID, used for whole-workflow metadata lookups.
// NOTE: the physical name "METADATA_WORKFLOW_IDX" predates the IX_* naming convention used below.
// TODO: rename index via liquibase
def ixMetadataEntryWeu = index("METADATA_WORKFLOW_IDX", workflowExecutionUuid, unique = false)

/**
 * Index designed to accelerate common key-specific queries across an entire workflow, such as:
 *   - Get workflow-level `outputs%` keys (no tasks, requireEmptyJobKey = true)
 *   - Get all `vmStartTime%`, `vmEndTime%`, `vmCostPerHour%` keys in the workflow (include tasks, requireEmptyJobKey = false)
 *
 * It is NOT good for — and may actively slow down — queries that reference a specific job. If we do
 * more with getting metadata for individual jobs, recommend creating this index with all 5 columns:
 *   - WORKFLOW_EXECUTION_UUID, CALL_FQN, JOB_SCATTER_INDEX, JOB_RETRY_ATTEMPT, METADATA_KEY
 *
 * Do NOT recommend this alternate order, as wildcards in the middle are inefficient and this can be
 * slower than having no indexes at all. Tested with a 20M row `69e8259c` workflow in October 2024:
 *   - WORKFLOW_EXECUTION_UUID, METADATA_KEY, CALL_FQN, JOB_SCATTER_INDEX, JOB_RETRY_ATTEMPT
 *
 * @return A reference to the index
 */
def ixMetadataEntryWeuMk = index("IX_METADATA_ENTRY_WEU_MK", (workflowExecutionUuid, metadataKey), unique = false)
}

val metadataEntries = TableQuery[MetadataEntries]
Expand Down
4 changes: 3 additions & 1 deletion project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ object Dependencies {
private val metrics3StatsdV = "4.2.0"
private val mockFtpServerV = "3.0.0"
private val mockitoV = "3.12.4"
private val mockitoInlineV = "2.8.9"
private val mockserverNettyV = "5.14.0"
private val mouseV = "1.0.11"

Expand Down Expand Up @@ -625,7 +626,8 @@ object Dependencies {
"org.scalatest" %% "scalatest" % scalatestV,
// Use mockito Java DSL directly instead of the numerous and often hard to keep updated Scala DSLs.
// See also scaladoc in common.mock.MockSugar and that trait's various usages.
"org.mockito" % "mockito-core" % mockitoV
"org.mockito" % "mockito-core" % mockitoV,
"org.mockito" % "mockito-inline" % mockitoInlineV
) ++ slf4jBindingDependencies // During testing, add an slf4j binding for _all_ libraries.

val kindProjectorPlugin = "org.typelevel" % "kind-projector" % kindProjectorV cross CrossVersion.full
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ class BatchPollResultMonitorActor(pollMonitorParameters: PollMonitorParameters)
case event if event.name == CallMetadataKeys.VmEndTime => event.offsetDateTime
}

// Batch RunStatus carries the VM description (machine type, region, preemptibility) parsed from
// the job's AllocationPolicy when it was available at poll time; surface it for cost monitoring.
override def extractVmInfoFromRunState(pollStatus: RunStatus): Option[InstantiatedVmInfo] =
  pollStatus.instantiatedVmInfo

override def handleVmCostLookup(vmInfo: InstantiatedVmInfo) = {
val request = GcpCostLookupRequest(vmInfo, self)
params.serviceRegistry ! request
Expand All @@ -69,6 +72,7 @@ class BatchPollResultMonitorActor(pollMonitorParameters: PollMonitorParameters)
}

override def receive: Receive = {
case costResponse: GcpCostLookupResponse => handleCostResponse(costResponse)
case message: PollResultMessage =>
message match {
case ProcessThisPollResult(pollResult: RunStatus) => processPollResult(pollResult)
Expand All @@ -93,5 +97,4 @@ class BatchPollResultMonitorActor(pollMonitorParameters: PollMonitorParameters)

override def params: PollMonitorParameters = pollMonitorParameters

override def extractVmInfoFromRunState(pollStatus: RunStatus): Option[InstantiatedVmInfo] = Option.empty // TODO
}
Original file line number Diff line number Diff line change
Expand Up @@ -1025,6 +1025,18 @@ class GcpBatchAsyncBackendJobExecutionActor(override val standardParams: Standar
} yield status
}

// Child actor that inspects each poll result to emit VM start/end-time metadata and to
// trigger cost lookups for the instantiated VM (see BatchPollResultMonitorActor).
// Created eagerly with this job's descriptor and runtime attributes.
override val pollingResultMonitorActor: Option[ActorRef] = Option(
  context.actorOf(
    BatchPollResultMonitorActor.props(serviceRegistryActor,
                                      workflowDescriptor,
                                      jobDescriptor,
                                      validatedRuntimeAttributes,
                                      platform,
                                      jobLogger
    )
  )
)

override def isTerminal(runStatus: RunStatus): Boolean =
runStatus match {
case _: RunStatus.TerminalRunStatus => true
Expand Down Expand Up @@ -1070,7 +1082,7 @@ class GcpBatchAsyncBackendJobExecutionActor(override val standardParams: Standar
Future.fromTry {
Try {
runStatus match {
case RunStatus.Aborted(_) => AbortedExecutionHandle
case RunStatus.Aborted(_, _) => AbortedExecutionHandle
case failedStatus: RunStatus.UnsuccessfulRunStatus => handleFailedRunStatus(failedStatus)
case unknown =>
throw new RuntimeException(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package cromwell.backend.google.batch.api.request

import com.google.api.gax.rpc.{ApiException, StatusCode}
import com.google.cloud.batch.v1.AllocationPolicy.ProvisioningModel
import com.google.cloud.batch.v1._
import com.typesafe.scalalogging.LazyLogging
import cromwell.backend.google.batch.actors.BatchApiAbortClient.{
Expand All @@ -11,6 +12,8 @@ import cromwell.backend.google.batch.api.BatchApiRequestManager._
import cromwell.backend.google.batch.api.{BatchApiRequestManager, BatchApiResponse}
import cromwell.backend.google.batch.models.{GcpBatchExitCode, RunStatus}
import cromwell.core.ExecutionEvent
import cromwell.services.cost.InstantiatedVmInfo
import cromwell.services.metadata.CallMetadataKeys

import scala.annotation.unused
import scala.concurrent.{ExecutionContext, Future, Promise}
Expand Down Expand Up @@ -136,14 +139,32 @@ object BatchRequestExecutor {
)
lazy val exitCode = findBatchExitCode(events)

// Get vm info for this job
val allocationPolicy = job.getAllocationPolicy

// Get instances that can be created with this AllocationPolicy, only instances[0] is supported
val instancePolicy = allocationPolicy.getInstances(0).getPolicy
val machineType = instancePolicy.getMachineType
val preemtible = instancePolicy.getProvisioningModelValue == ProvisioningModel.PREEMPTIBLE.getNumber

// location list = [regions/us-central1, zones/us-central1-b], region is the first element
val location = allocationPolicy.getLocation.getAllowedLocationsList.get(0)
val region =
if (location.isEmpty)
"us-central1"
else
location.split("/").last

val instantiatedVmInfo = Some(InstantiatedVmInfo(region, machineType, preemtible))

if (job.getStatus.getState == JobStatus.State.SUCCEEDED) {
RunStatus.Success(events)
RunStatus.Success(events, instantiatedVmInfo)
} else if (job.getStatus.getState == JobStatus.State.RUNNING) {
RunStatus.Running(events)
RunStatus.Running(events, instantiatedVmInfo)
} else if (job.getStatus.getState == JobStatus.State.FAILED) {
RunStatus.Failed(exitCode, events)
RunStatus.Failed(exitCode, events, instantiatedVmInfo)
} else {
RunStatus.Initializing(events)
RunStatus.Initializing(events, instantiatedVmInfo)
}
}

Expand All @@ -152,12 +173,27 @@ object BatchRequestExecutor {
GcpBatchExitCode.fromEventMessage(e.name.toLowerCase)
}.headOption

private def getEventList(events: List[StatusEvent]): List[ExecutionEvent] =
events.map { e =>
/**
 * Converts raw Batch status events into Cromwell `ExecutionEvent`s.
 *
 * Events whose description marks the SCHEDULED→RUNNING transition are renamed to
 * `CallMetadataKeys.VmStartTime`, and RUNNING→(SUCCEEDED|FAILED) transitions to
 * `CallMetadataKeys.VmEndTime`, so downstream metadata consumers can find VM timing.
 * For those two cases an extra event carrying the original description is also emitted,
 * preserving the information that the rename would otherwise discard.
 */
private def getEventList(events: List[StatusEvent]): List[ExecutionEvent] = {
  // startedRegex is matched first, so a description containing both transitions counts as a start.
  val vmStarted = ".*SCHEDULED to RUNNING.*".r
  val vmEnded = ".*RUNNING to.*".r // can be SUCCEEDED or FAILED
  events.flatMap { event =>
    val timestamp = java.time.Instant
      .ofEpochSecond(event.getEventTime.getSeconds, event.getEventTime.getNanos.toLong)
      .atOffset(java.time.ZoneOffset.UTC)
    val name = event.getDescription match {
      case vmStarted() => CallMetadataKeys.VmStartTime
      case vmEnded() => CallMetadataKeys.VmEndTime
      case _ => event.getType
    }
    val renamed = ExecutionEvent(name = name, offsetDateTime = timestamp)
    name match {
      // Keep the original description alongside the renamed VM start/end event.
      case CallMetadataKeys.VmStartTime | CallMetadataKeys.VmEndTime =>
        List(renamed, ExecutionEvent(name = event.getDescription, offsetDateTime = timestamp))
      case _ => List(renamed)
    }
  }
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ object GcpBatchRuntimeAttributes {

val BootDiskSizeKey = "bootDiskSizeGb"
private val bootDiskValidationInstance = new IntRuntimeAttributesValidation(BootDiskSizeKey)
private val BootDiskDefaultValue = WomInteger(20)
private val BootDiskDefaultValue = WomInteger(10)

val NoAddressKey = "noAddress"
private val noAddressValidationInstance = new BooleanRuntimeAttributesValidation(NoAddressKey)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,24 +1,32 @@
package cromwell.backend.google.batch.models

import cromwell.core.ExecutionEvent
import cromwell.services.cost.InstantiatedVmInfo

/**
 * Lifecycle status of a GCP Batch job as observed by Cromwell's poller.
 * Sealed so status handling can be exhaustively pattern-matched.
 */
sealed trait RunStatus {
  /** Execution events observed so far for this job (used for timing metadata). */
  def eventList: Seq[ExecutionEvent]
  def toString: String

  // Machine type / region / preemptibility parsed from the job's AllocationPolicy,
  // when it was available at poll time (used for cost calculations).
  val instantiatedVmInfo: Option[InstantiatedVmInfo]
}

object RunStatus {

case class Initializing(eventList: Seq[ExecutionEvent]) extends RunStatus { override def toString = "Initializing" }
case class AwaitingCloudQuota(eventList: Seq[ExecutionEvent]) extends RunStatus {
/** Job has been created but its resources are not yet provisioned. */
case class Initializing(eventList: Seq[ExecutionEvent], instantiatedVmInfo: Option[InstantiatedVmInfo] = None)
    extends RunStatus {
  override def toString = "Initializing"
}

/** Job is blocked waiting for cloud quota to become available. */
case class AwaitingCloudQuota(eventList: Seq[ExecutionEvent], instantiatedVmInfo: Option[InstantiatedVmInfo] = None)
    extends RunStatus {
  override def toString = "AwaitingCloudQuota"
}

case class Running(eventList: Seq[ExecutionEvent]) extends RunStatus { override def toString = "Running" }
/** Job's VM has been provisioned and the task is executing. */
case class Running(eventList: Seq[ExecutionEvent], instantiatedVmInfo: Option[InstantiatedVmInfo] = None)
    extends RunStatus {
  override def toString = "Running"
}

/** Marker for statuses from which the job will not transition further. */
sealed trait TerminalRunStatus extends RunStatus

case class Success(eventList: Seq[ExecutionEvent]) extends TerminalRunStatus {
/** Terminal status: the job completed successfully. */
case class Success(eventList: Seq[ExecutionEvent], instantiatedVmInfo: Option[InstantiatedVmInfo] = None)
    extends TerminalRunStatus {
  override def toString = "Success"
}

Expand All @@ -29,7 +37,8 @@ object RunStatus {

final case class Failed(
exitCode: Option[GcpBatchExitCode],
eventList: Seq[ExecutionEvent]
eventList: Seq[ExecutionEvent],
instantiatedVmInfo: Option[InstantiatedVmInfo] = Option.empty
) extends UnsuccessfulRunStatus {
override def toString = "Failed"

Expand Down Expand Up @@ -58,7 +67,9 @@ object RunStatus {
}
}

final case class Aborted(eventList: Seq[ExecutionEvent]) extends UnsuccessfulRunStatus {
final case class Aborted(eventList: Seq[ExecutionEvent],
instantiatedVmInfo: Option[InstantiatedVmInfo] = Option.empty
) extends UnsuccessfulRunStatus {
override def toString = "Aborted"

override val exitCode: Option[GcpBatchExitCode] = None
Expand Down
Loading

0 comments on commit 74c4680

Please sign in to comment.