Skip to content

Commit

Permalink
Adjusts handling of asynchronous execution of extraction pipelines.
Browse files Browse the repository at this point in the history
Signed-off-by: Ralph Gasser <[email protected]>
  • Loading branch information
ppanopticon committed Nov 30, 2023
1 parent 9b07f60 commit 563f042
Show file tree
Hide file tree
Showing 8 changed files with 143 additions and 128 deletions.
Original file line number Diff line number Diff line change
@@ -1,102 +1,117 @@
package org.vitrivr.engine.core.config.pipeline.execution

import io.github.oshai.kotlinlogging.KLogger
import io.github.oshai.kotlinlogging.KotlinLogging
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.cancellable
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.takeWhile
import org.vitrivr.engine.core.config.pipeline.Pipeline
import org.vitrivr.engine.core.config.pipeline.PipelineBuilder
import org.vitrivr.engine.core.model.metamodel.Schema
import org.vitrivr.engine.core.operators.Operator
import org.vitrivr.engine.core.operators.ingest.AbstractSegmenter
import org.vitrivr.engine.core.operators.ingest.Extractor
import java.util.UUID
import java.util.concurrent.BlockingQueue
import java.util.*
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.LinkedBlockingDeque


private val logger: KLogger = KotlinLogging.logger {}

/**
* A
* An execution environment for data ingest and retrieval.
*
* @author Ralph Gasser
* @version 1.0.0
*/
class ExecutionServer private constructor(schema: Schema){

companion object {

@Volatile private var instances: MutableMap<Schema, ExecutionServer> = mutableMapOf()

fun getInstance(schema: Schema) =
instances[schema] ?: synchronized(this) { // synchronized to avoid concurrency problem
instances[schema] ?: ExecutionServer(schema).also { instances[schema] = it }
}
}

class ExecutionServer {

/** The [ExecutorService] that backs the [CoroutineDispatcher] used for execution. */
private val executor: ExecutorService = Executors.newCachedThreadPool()

/** The [CoroutineDispatcher] used for execution. */
private val dispatcher: CoroutineDispatcher = this.executor.asCoroutineDispatcher()

var indexJobQueue: BlockingQueue<Pair<Pipeline,UUID>> = LinkedBlockingDeque()

init {
this.run()
}
/** A [ConcurrentHashMap] of all ongoing [Job]s. */
private val jobs = ConcurrentHashMap<UUID, Job>()

fun isPending(uuid: UUID): Int {
return this.indexJobQueue.indexOf(this.indexJobQueue.find { it.second == uuid })
}
/** A synchronized [List] of job outcomes, each stored as a ([UUID], [ExecutionStatus], timestamp in milliseconds) triple. */
private val jobHistory = Collections.synchronizedList(ArrayList<Triple<UUID, ExecutionStatus, Long>>(100))

fun enqueueIndexJob(pipeline: Pipeline): UUID {
val uuid = UUID.randomUUID()
return this.enqueueIndexJob(pipeline, uuid)
}
/**
 * Checks the status of the [Job] with the provided [UUID].
 *
 * A job that is still registered as ongoing is reported as [ExecutionStatus.RUNNING]. Otherwise, the
 * recorded history is consulted; if it holds no entry for the [UUID], [ExecutionStatus.UNKNOWN] is returned.
 *
 * @param uuid [UUID] of the [Job] to check.
 * @return [ExecutionStatus] of [Job]
 */
fun status(uuid: UUID): ExecutionStatus {
    /* Check if job is still running. */
    if (this.jobs.containsKey(uuid)) {
        return ExecutionStatus.RUNNING
    }

    /* Check list for job. Traversing a synchronized list is only safe while holding the list's own monitor. */
    val recorded = synchronized(this.jobHistory) {
        this.jobHistory.firstOrNull { it.first == uuid }?.second
    }

    /* Otherwise, job is unknown. */
    return recorded ?: ExecutionStatus.UNKNOWN
}

/**
* Executes an extraction job using a [List] of [Extractor]s.
* Cancels the [Job] with the provided [UUID].
*
* @param extractors The [List] of [Extractor]s to execute.
* @param uuid [UUID] of the [Job] to check.
* @return True, if job is running, false otherwise.
*/
private fun extract(pipeline: Pipeline) = runBlocking {
val scope = CoroutineScope(this@ExecutionServer.dispatcher)
val jobs = pipeline.getLeaves().map { e -> scope.launch { e.toFlow(scope).takeWhile { it != AbstractSegmenter.TerminalRetrievable }.collect() } }
jobs.forEach { it.join() }
fun cancel(uuid: UUID): Boolean {
    /* Only jobs that are still registered as ongoing can be cancelled; for anything else report false. */
    val ongoing = this.jobs[uuid] ?: return false
    ongoing.cancel()
    return true
}


private fun run() {
Thread {
val running = true
while (running) {
val pipeline = indexJobQueue.take()
try {
this.extract(pipeline.first)
logger.debug { "Extraction with pipeline '${pipeline.second}' finished." }
} catch (e: Exception) {
logger.error { "Error while executing extraction job: ${e.message}" }
/**
 * Executes an extraction [Pipeline] asynchronously, i.e., the call returns immediately while the [Pipeline]
 * is processed in the background on this [ExecutionServer]'s dispatcher.
 *
 * The returned [UUID] identifies the job and can be passed to [status] and [cancel].
 *
 * @param pipeline The [Pipeline] to execute.
 * @return [UUID] identifying the job.
 */
fun extractAsync(pipeline: Pipeline): UUID {
    val jobId = UUID.randomUUID()
    val scope = CoroutineScope(this@ExecutionServer.dispatcher) + CoroutineName("index-job-$jobId")

    /* Launched lazily: the job must be registered before its body (and thus its cleanup) can run,
     * otherwise a fast job could deregister itself first and then remain registered forever. */
    val job = scope.launch(start = CoroutineStart.LAZY) {
        try {
            /* NOTE(review): the flows are built against the outer scope rather than this coroutine;
             * confirm that cancelling the job also stops whatever toFlow() starts in that scope. */
            val leafJobs = pipeline.getLeaves().map { leaf ->
                this.launch {
                    leaf.toFlow(scope).cancellable().takeWhile { it != AbstractSegmenter.TerminalRetrievable }.collect()
                }
            }
            leafJobs.forEach { it.join() }
            this@ExecutionServer.recordOutcome(jobId, ExecutionStatus.COMPLETED)
        } catch (e: CancellationException) {
            /* There is no dedicated status for cancellation, so it is recorded as FAILED (as before);
             * the exception itself is re-thrown so that cancellation propagates properly. */
            this@ExecutionServer.recordOutcome(jobId, ExecutionStatus.FAILED)
            throw e
        } catch (e: Throwable) {
            logger.error { "Error while executing extraction job '$jobId': ${e.message}" }
            this@ExecutionServer.recordOutcome(jobId, ExecutionStatus.FAILED)
        } finally {
            /* The outcome is recorded before deregistering, so status() never sees a finished job as unknown. */
            this@ExecutionServer.jobs.remove(jobId)
        }
    }

    this.jobs[jobId] = job
    job.start()
    return jobId
}

/** Appends a job outcome to the history and evicts the oldest entry once more than 100 entries are held. */
private fun recordOutcome(jobId: UUID, status: ExecutionStatus) {
    /* The synchronized list uses itself as mutex; holding it makes add + trim a single atomic step. */
    synchronized(this.jobHistory) {
        this.jobHistory.add(Triple(jobId, status, System.currentTimeMillis()))
        if (this.jobHistory.size > 100) {
            this.jobHistory.removeAt(0)
        }
    }
}

/**
* Shuts down the [ExecutorService] used by this [ExecutionServer]
* Executes an extraction [Pipeline] in a blocking fashion, i.e., the call will block until the [Pipeline] has been executed.
*
* This is mainly for testing purposes!
*
* @param pipeline The [Pipeline] to execute.
*/
fun shutdown() {
this.executor.shutdown()
fun extract(pipeline: Pipeline) {
    /* Named scope on this server's dispatcher in which the leaf coroutines are launched. */
    val id = UUID.randomUUID()
    val extractionScope = CoroutineScope(this@ExecutionServer.dispatcher) + CoroutineName("index-job-$id")

    runBlocking {
        /* One coroutine per pipeline leaf; each drains its flow until the terminal marker is seen. */
        val launched = pipeline.getLeaves().map { leaf ->
            extractionScope.launch {
                leaf.toFlow(this).takeWhile { it != AbstractSegmenter.TerminalRetrievable }.collect()
            }
        }

        /* Block the caller until every leaf coroutine has finished. */
        for (leafJob in launched) {
            leafJob.join()
        }
    }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package org.vitrivr.engine.core.config.pipeline.execution

/**
 * Enumeration of the states an [ExecutionServer] reports for its jobs.
 *
 * @author Ralph Gasser
 * @version 1.0.0
 */
enum class ExecutionStatus {
    /** Reported for a job that is neither ongoing nor part of the recorded history. */
    UNKNOWN,

    /** Reported for a job that is still registered as ongoing. */
    RUNNING,

    /** Recorded for a job whose execution raised an error. */
    FAILED,

    /** Recorded for a job whose execution finished without an error. */
    COMPLETED
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import io.github.oshai.kotlinlogging.KotlinLogging
import org.vitrivr.engine.core.config.IndexConfig
import org.vitrivr.engine.core.config.pipeline.Pipeline
import org.vitrivr.engine.core.config.pipeline.PipelineBuilder
import org.vitrivr.engine.core.config.pipeline.execution.ExecutionServer
import org.vitrivr.engine.core.context.IndexContext
import org.vitrivr.engine.core.context.QueryContext
import org.vitrivr.engine.core.database.Connection
Expand Down Expand Up @@ -43,9 +42,6 @@ class Schema(val name: String = "vitrivr", val connection: Connection) : Closeab
/** The [List] of [Exporter]s contained in this [Schema]. */
private val exporters: MutableList<Schema.Exporter> = mutableListOf()

/** The [List] of [Pipeline]s contained in this [Schema]. */
private val executionServer: ExecutionServer = ExecutionServer.getInstance(this)

/** The [List] of [Pipeline]s contained in this [Schema]. */
private val extractionPipelines: MutableMap<String, PipelineBuilder> = mutableMapOf()

Expand Down Expand Up @@ -121,8 +117,6 @@ class Schema(val name: String = "vitrivr", val connection: Connection) : Closeab
fun getPipelineBuilder(key: String): PipelineBuilder = this.extractionPipelines[key]
?: throw IllegalArgumentException("No pipeline with key '$key' found in schema '$name'.")

fun getExecutionServer(): ExecutionServer = this.executionServer

/**
* Closes this [Schema] and the associated database [Connection].
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ import io.javalin.openapi.plugin.OpenApiPluginConfiguration
import io.javalin.openapi.plugin.SecurityComponentConfiguration
import io.javalin.openapi.plugin.swagger.SwaggerConfiguration
import io.javalin.openapi.plugin.swagger.SwaggerPlugin
import org.vitrivr.engine.core.model.metamodel.SchemaManager
import org.vitrivr.engine.core.config.pipeline.execution.ExecutionServer
import org.vitrivr.engine.core.model.metamodel.SchemaManager
import org.vitrivr.engine.query.execution.RetrievalRuntime
import org.vitrivr.engine.server.api.cli.Cli
import org.vitrivr.engine.server.api.cli.commands.SchemaCommand
Expand Down Expand Up @@ -38,6 +38,9 @@ fun main(args: Array<String>) {
manager.load(schema)
}

/* Execution server singleton for this instance. */
val executor = ExecutionServer()

/* Initialize retrieval runtime. */
val runtime = RetrievalRuntime()

Expand Down Expand Up @@ -74,7 +77,7 @@ fun main(args: Array<String>) {
)
)
}.routes {
configureApiRoutes(config.api, manager, runtime)
configureApiRoutes(config.api, manager, runtime, executor)
}.exception(ErrorStatusException::class.java) { e, ctx ->
ctx.status(e.statusCode).json(ErrorStatus(e.message))
}.exception(Exception::class.java) { e, ctx ->
Expand All @@ -84,7 +87,7 @@ fun main(args: Array<String>) {
/* Prepare CLI endpoint. */
val cli = Cli(manager)
for (schema in manager.listSchemas()) {
cli.register(SchemaCommand(schema, schema.getExecutionServer()))
cli.register(SchemaCommand(schema, executor))
}

/* Start the Javalin and CLI. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@ import com.github.ajalt.clikt.core.CliktCommand
import com.github.ajalt.clikt.core.NoOpCliktCommand
import com.github.ajalt.clikt.core.subcommands
import com.jakewharton.picnic.table
import org.vitrivr.engine.core.database.Initializer
import org.vitrivr.engine.core.model.metamodel.Schema
import org.vitrivr.engine.core.config.IndexConfig
import org.vitrivr.engine.core.config.pipeline.PipelineBuilder
import org.vitrivr.engine.core.config.pipeline.execution.ExecutionServer
import org.vitrivr.engine.core.database.Initializer
import org.vitrivr.engine.core.model.metamodel.Schema
import java.nio.file.Paths

/**
Expand Down Expand Up @@ -94,12 +94,16 @@ class SchemaCommand(private val schema: Schema, private val server: ExecutionSer
}
}

inner class Extract(private val schema: Schema, private val server: ExecutionServer) : CliktCommand(name = "extract", help = "Extracts data from a source and stores it in the schema.") {
/**
* [CliktCommand] to start an extraction job.
*/
inner class Extract(private val schema: Schema, private val executor: ExecutionServer) : CliktCommand(name = "extract", help = "Extracts data from a source and stores it in the schema.") {
/**
 * Reads the index configuration from the default pipeline path, builds the pipeline for this schema
 * and submits it for extraction. Returns silently when [IndexConfig.read] yields null.
 */
override fun run() {
val config = IndexConfig.read(Paths.get(IndexConfig.DEFAULT_PIPELINE_PATH)) ?: return
val pipelineBuilder = PipelineBuilder.forConfig(this.schema, config)
val pipeline = pipelineBuilder.getPipeline()
// NOTE(review): this call and extractAsync() below both submit the same pipeline; this line looks like
// the removed side of the diff (it goes through the schema rather than the injected executor) - confirm.
schema.getExecutionServer().enqueueIndexJob(pipeline)
// Returns immediately with the job's UUID, which is printed for the user.
val uuid = this.executor.extractAsync(pipeline)
println("Started extraction job with UUID $uuid.")
}
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package org.vitrivr.engine.server.api.rest

import io.javalin.apibuilder.ApiBuilder.*
import org.vitrivr.engine.core.config.pipeline.execution.ExecutionServer
import org.vitrivr.engine.core.model.metamodel.SchemaManager
import org.vitrivr.engine.query.execution.RetrievalRuntime
import org.vitrivr.engine.server.api.rest.handlers.*
Expand All @@ -10,10 +11,10 @@ import org.vitrivr.engine.server.config.ApiConfig
/**
* Configures all the API routes.
*
* @param config The [VitrivrConfig] used for persistence.
* @param config The [ApiConfig] used for persistence.
* @param manager The [SchemaManager] used for persistence.
*/
fun configureApiRoutes(config: ApiConfig, manager: SchemaManager, retrievalRuntime: RetrievalRuntime) {
fun configureApiRoutes(config: ApiConfig, manager: SchemaManager, retrievalRuntime: RetrievalRuntime, executor: ExecutionServer) {
path("api") {
/* Add global routes (non-schema specific). */
path("schema") {
Expand All @@ -32,9 +33,9 @@ fun configureApiRoutes(config: ApiConfig, manager: SchemaManager, retrievalRunti
for (schema in manager.listSchemas()) {
path(schema.name) {
if (config.index) {
post("index") { ctx -> executeIngest(ctx, schema) }
post("index") { ctx -> executeIngest(ctx, schema, executor) }
path("index") {
get("{id}") { ctx -> executeIngestStatus(ctx, schema) }
get("{id}") { ctx -> executeIngestStatus(ctx, executor) }
}
}

Expand Down
Loading

0 comments on commit 563f042

Please sign in to comment.