From 6a1db02499d843d546f2bb67c97e66be8ba2cd73 Mon Sep 17 00:00:00 2001 From: michael_b Date: Fri, 9 Feb 2024 16:15:31 +0100 Subject: [PATCH 1/7] add new processes --- R/api.R | 2 + R/apply_prediction.R | 135 +++++++++++++++++ R/train_model.R | 336 +++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 473 insertions(+) create mode 100644 R/apply_prediction.R create mode 100644 R/train_model.R diff --git a/R/api.R b/R/api.R index 56a6dec..46852ff 100644 --- a/R/api.R +++ b/R/api.R @@ -315,5 +315,7 @@ addEndpoint = function() { Session$assignProcess(multiply) Session$assignProcess(divide) Session$assignProcess(evi) + Session$assignProcess(train_model) + Session$assignProcess(apply_prediction) } diff --git a/R/apply_prediction.R b/R/apply_prediction.R new file mode 100644 index 0000000..5e13b0c --- /dev/null +++ b/R/apply_prediction.R @@ -0,0 +1,135 @@ +#' @export +apply_prediction_opp = function(data, model_id, keep_bands = FALSE, job) { + + message("\napply_prediction called...") + + message("\nCall parameters:") + message("\ndata:") + print(data) + + message("\nmodel_id:") + message(model_id) + + tryCatch({ + message("\nTry loading the model from user workspace...") + + path_to_model = paste0(Session$getConfig()$workspace.path, "/", model_id, ".rds") + + message("Path to model: ", path_to_model) + + # get model from user workspace + model = readRDS(path_to_model) + + message("Model found in: ", path_to_model) + message("\nModel loaded successfully!") + }, + error = function(err) + { + message("An Error occured!") + message(toString(err)) + stop("No Model found!") + }) + + # get band names for to later create a data.frame + band_names = names(data) + + # the model and band_names need to be loaded into a tempdir, so that it can be accessed in a new process in "FUN" from "apply_pixel" (see below) + t = tempdir() + + message("Current tempdir: ", t) + + # save variable in tempdir + saveRDS(model, paste0(t, "/model.rds")) + saveRDS(band_names, paste0(t, "/band_names.rds")) + + # save tempdir string to later retrieve files in another process + Sys.setenv(TEMP_DIR = t) + + # creates two bands "predicted_classes", "class_confidence" in the datacube + cube = gdalcubes::apply_pixel( + data, + names = c("predicted_class", "class_confidence"), + FUN = function(band_values_vector) + { + library(caret) + library(stringr) + library(randomForest) + + # load tempdir path from Global Env, to ensure its the same as in the process above + tmp = Sys.getenv("TEMP_DIR") + message("Tempdir in FUN: ", tmp) + + # load variables needed for prediction + model = readRDS(paste0(tmp, "/model.rds")) + band_names = readRDS(paste0(tmp, "/band_names.rds")) + + # named vector for df creation + named_vector = setNames(band_values_vector, band_names) + + # create 1-row df per pixel of the datacube + band_value_df = named_vector |> t() |> as.data.frame() + + tryCatch({ + predicted_class = stats::predict(model, newdata = band_value_df) + class_confidence = stats::predict(model, newdata = band_value_df, type = "prob") + + # parse Integer value from string + predicted_class <- predicted_class |> + base::as.character() |> + stringr::str_extract_all("\\d+") |> + base::as.numeric() + + # determine confidence value for the classified class + highest_class_confidence = base::apply(class_confidence, 1, base::max) + + return(c(predicted_class, highest_class_confidence)) + }, + error = function(err) + { + stop("Error in apply_pixel!") + }) + + }, + keep_bands = keep_bands) + + + message("\nDatacube: ") + print(cube) + + return(cube) +} + + +#' apply_prediction +apply_prediction <- Process$new( + id = "apply_prediction", + description = "Apply a machine-learning model on each pixel of the datacube. This creates 2 new bands in the cube containing the predicted classes per pixel and the propability of the predicted class (class confidence). Bands of the source cube can optionally be included. This Algorithm will only save integer values in the datacube. If the levels of the model are factor values without an integer part like: 'expl', gdalcubes will assign a integer value instead. Therefore it is advised to provide a model with levels in the form of 'X1' 'X2', ..., 'Xn'.", + categories = as.array("cubes", "machine-learning"), + summary = "Apply a machine-learning based prediction on a datacube", + parameters = list( + Parameter$new( + name = "data", + description = "A data cube with bands.", + schema = list( + type = "object", + subtype = "raster-cube" + ) + ), + Parameter$new( + name = "model_id", + description = "Id of the model that should be used for prediction. The model will be searched in the user workspace.", + schema = list( + type = "string" + ) + ), + Parameter$new( + name = "keep_bands", + description = "Keep bands of input data cube, defaults to FALSE, i.e. original bands will be dropped.", + schema = list( + type = "boolean" + ) + ) + ), + returns = eo_datacube, + operation = apply_prediction_opp +) diff --git a/R/train_model.R b/R/train_model.R new file mode 100644 index 0000000..bc9753c --- /dev/null +++ b/R/train_model.R @@ -0,0 +1,336 @@ +train_model_opp = function(data, model_type, labeled_polygons, hyperparameters = NULL, save_model = FALSE, model_id = NULL, job) +{ + # show call stack for debugging + message("train_model called...") + + message("\nCall parameters: ") + message("\ndata: ") + print(data) + message("\nmodel_type: ") + message(model_type) + + tryCatch({ + message("\nlabeled_polygons: ") + + # read GeoJSON data as sf + labeled_polygons = sf::st_read(labeled_polygons, quiet = TRUE) + + # change CRS to cube CRS + labeled_polygons = sf::st_transform(labeled_polygons, crs = gdalcubes::srs(data)) + + message("Training Polygons sucessfully loaded!") + }, + error = function(err) + { + message("An Error occured!") + message(toString(err)) + stop("couldn't load training polygons!") + }) + + message("\nhyperparameters: ") + if (is.null(hyperparameters)) + { + message("No Hyperparameters passed!") + } + else + { + for (name in names(hyperparameters)) + { + message(paste0(name, ": ", hyperparameters[name])) + } + } + + message("\nsave_model:") + message(save_model) + + message("\nmodel_id:") + print(model_id) # to also show "NULL" + + + if (!is.numeric(labeled_polygons$class)) + { + stop("class labels need to be numeric") + } + + + # obvios boolean check for mor readibility + if (save_model == TRUE && is.null(model_id)) + { + message("If the model should be safed, a model_id needs to be given!") + stop("If the model should be safed, a model_id needs to be given!") + } + + tryCatch({ + message("\nExtract features...") + + # extract features from cube + features = gdalcubes::extract_geom(data, labeled_polygons) + + message("all features extracted!") + }, + error = function(err) + { + message("An Error occured!") + message(toString(err)) + stop("Features couldn't be extracted") + }) + + # add FID for merge with 'features' + labeled_polygons$FID = rownames(labeled_polygons) + + tryCatch({ + message("\nMerge features with training data...") + + # this df contains all information from the datacube and the labeled_polgons + training_df = merge(labeled_polygons, features, by = "FID") + + message("Merging complete!") + }, + error = function(err) + { + message("An Error occured!") + message(toString(err)) + stop("Merging data.frames failed") + }) + + # make copy to filter out values not needed for training + training_df_filtered = training_df + + training_df_filtered$time = NULL + training_df_filtered$geometry = NULL + # convert numbers to "X" to make valid class names + training_df_filtered$class = base::make.names(training_df_filtered$class) + + message("\nclasses in training_df ") + print(unique(training_df_filtered$class)) + + message("\nFeatures in 'training_df': ", nrow(training_df)) + + #TODO: find reasonable threshold + if (nrow(training_df) > 10000) + { + tryCatch({ + message("\nReduce number of features...") + # data frame for later storage + training_df_reduced = data.frame() + + # from all data with the same FID (same polygon) take only 50% of the + # features for each training polygon as they are assumed to carry similar information + for (i in as.numeric(unique(training_df_filtered$FID))) + { + #TODO: find better "reducing" function + sub_df = training_df_filtered[training_df_filtered$FID == i,] + + # take 50% of sub_df rows + sub_df = sub_df[1:(nrow(sub_df)/2),] + + # append new rows + training_df_reduced = rbind(training_df_reduced, sub_df) + } + + # overwrite filtered df + training_df_filtered = training_df_reduced + + message("Reducing completed!") + }, + error = function(err) + { + message("An Error occured!") + message(toString(err)) + stop("Reducing Features failed") + }) + } + + # remove FID to not train model on FID + training_df_filtered$FID = NULL + + + tryCatch({ + message("\nSplit training Data...") + + train_row_numbers = caret::createDataPartition( + training_df_filtered$class, p=0.8, list=FALSE + ) + training_data = training_df_filtered[train_row_numbers,] + testing_data = training_df_filtered[-train_row_numbers,] + + message("Data splitting completed!") + }, + error = function(err) + { + message("An Error occured!") + message(toString(err)) + stop("Splitting training data failed") + }) + + # build specific model given by "model_type" + if (model_type == "RF") + { + # set seed for reproducibility while model training + set.seed(1) + + # use fixed hyperparams given by the user + # (this may result in a lack of accuracy for the model) + if (!is.null(hyperparameters)) + { + + message("\nChecking hyperparameters for Random Forest...") + + if (!all(c("mtry", "ntree") %in% names(hyperparameters))) + { + message("'hyperparameters' has to contain 'mtry' and 'ntree'!") + stop("'hyperparameters' has to contain 'mtry' and 'ntree'!") + } + + message("hyperparameters for Random Forest checked!") + + tryCatch({ + message("\nTrain Model with fixed hyperparameters...") + + # no parameters are tuned + trainCtrl <- caret::trainControl(method = "none", classProbs = TRUE) + + model <- caret::train( + class ~ ., + data = training_data, + method = "rf", + trControl = trainCtrl, + # only one model is passed (fixed hyperparams are given) + tuneGrid = expand.grid(mtry = hyperparameters$mtry), + ntree = hyperparameters$ntree) + + message("Model training finished!") + }, + error = function(err) + { + message("An Error occured!") + message(toString(err)) + stop("model training failed") + }) + + } + else + { + # else tune model hyperparameters + tryCatch({ + message("\nTrain Model with parameter tuning...") + # cross-validate training data with random-parameter search + trainCtrl <- caret::trainControl( + search = "random", + # 10-fold CV + method = "repeatedcv", + number = 10, + # repeated 10 times + repeats = 10) + + model <- caret::train( + class ~ ., + data = training_data, + method = "rf", + trControl = trainCtrl, + tuneLength = 10) + + # print model and confusion matrix, to evaluate if the model is well trained + message("\nModel Details: ") + print(model) + + predicted_test_classes = stats::predict(model, newdata = testing_data) + + message("\nConfusion Matrix based on Test Dataset: ") + print(caret::confusionMatrix(predicted_test_classes, as.factor(testing_data$class))) + + message("Model training finished!") + }, + error = function(err) + { + message("An Error occured!") + message(toString(err)) + stop("model training failed") + }) + } + } + + # save model to user workspace + if (save_model) + { + tryCatch({ + message("\nSaving model to user workspace...") + + saveRDS(model, paste0(Session$getConfig()$workspace.path, "/", model_id, ".rds")) + + message("Saving complete!") + }, + error = function(err) + { + message("An Error occured!") + message(toString(err)) + stop("model saving failed") + }) + } + + return(model) +} + + +#' train_model +train_model <- Process$new( + id = "train_model", + description = "Train a machine learning algorithm based on the provided training data on satellite imagery gathered from a datacube. This process will convert integer class values into factors by prefixing a 'X'. ", + categories = as.array("machine-learning", "cubes"), + summary = "train machine learning model.", + parameters = list( + Parameter$new( + name = "data", + description = "A data cube with bands.", + schema = list( + type = "object", + subtype = "raster-cube" + ) + ), + Parameter$new( + name = "model_type", + description = "Type of the model to be trained. Must be one of the following types: RF.", + schema = list( + type = "string" + ), + ), + Parameter$new( + name = "labeled_polygons", + description = "String containing the GeoJSON with Polygons. These contain class labels used to train the model.", + schema = list( + type = "string", + subtype = "GeoJSON" + ) + ), + Parameter$new( + name = "hyperparameters", + description = "List of Hyperparameters used for the model. If no hyperparameters are passed, the algorithm will tune the hyperparameters by random grid search and 10-times-10-fold-crossvalidation. This may take very long!", + schema = list( + type = "list" + ), + optional = TRUE + ), + Parameter$new( + name = "save_model", + description = "Declare wether the computed model should be saved in the user workspace. Defaults to false.", + schema = list( + type = "boolean" + ), + optional = TRUE + ), + Parameter$new( + name = "model_id", + description = "Id under which the model should be stored. Defaults to NULL", + schema = list( + type = "string" + ), + optional = TRUE + ) + + ), + returns = list( + description = "The trained model.", + schema = list(type = "object", subtype = list("train", "train.formula")) + ), + operation = train_model_opp +) From d11bbbd77b18cca7d4f00bb4246fc49723e3dc6a Mon Sep 17 00:00:00 2001 From: michael_b Date: Fri, 9 Feb 2024 16:16:40 +0100 Subject: [PATCH 2/7] add support for RDS files --- R/SessionConfig-Class.R | 12 ++++ R/api.R | 144 +++++++++++++++++++++++++++++----------- 2 files changed, 118 insertions(+), 38 deletions(-) diff --git a/R/SessionConfig-Class.R b/R/SessionConfig-Class.R index 6616c0d..a16e408 100644 --- a/R/SessionConfig-Class.R +++ b/R/SessionConfig-Class.R @@ -69,6 +69,18 @@ SessionConfig = function(api.port = NULL, host = NULL, aws.ipv4 = NULL) { ) ), + RDS = list( + title = "R Data Set", + description = "Export to RDS", + gis_data_types = list("raster"), + parameters = list( + format = list( + type = "string", + description = "RDS" + ) + ) + + ), inputFormats = list( ImageCollection = list( title = "ImageCollection", diff --git a/R/api.R b/R/api.R index 46852ff..86dc34b 100644 --- a/R/api.R +++ b/R/api.R @@ -157,45 +157,113 @@ NULL .executeSynchronous = function(req, res) { - tryCatch({ - sent_job = jsonlite::fromJSON(req$rook.input$read_lines(),simplifyDataFrame = FALSE) - process_graph = sent_job$process - newJob = Job$new(process = process_graph) - - job = newJob$run() - format = job$output - - if (class(format) == "list") { - if (format$title == "Network Common Data Form") { - file = gdalcubes::write_ncdf(job$results) - } - else if (format$title == "GeoTiff") { - file = gdalcubes::write_tif(job$results) - } - else { - throwError("FormatUnsupported") - } - } - else { - if (format == "NetCDF") { - file = gdalcubes::write_ncdf(job$results) - } - else if (format == "GTiff") { - file = gdalcubes::write_tif(job$results) - } - else { - throwError("FormatUnsupported") - } - } - - first = file[1] - res$status = 200 - res$body = readBin(first, "raw", n = file.info(first)$size) - content_type = plumber:::getContentType(tools::file_ext(first)) - res$setHeader("Content-Type", content_type) + tryCatch( + { + sent_job = jsonlite::fromJSON(req$rook.input$read_lines(),simplifyDataFrame = FALSE) + process_graph = sent_job$process + newJob = Job$new(process = process_graph) + + job = newJob$run() + format = job$output + + if(job$status == "error") + { + # throw error stored in job$results + stop(job$results) + } - return(res) -},error=handleError) + # Only support naming short names of output formats do remove redundant code + # to add a new supported class, just add a new else if Statement in the desired file format + message("\nStart file saving...") + + # just one try catch were all evaluations happen + tryCatch( + { + if (format == "NetCDF") + { + # check class of the result + if ("cube" %in% class(job$results)) + { + # result is a datacube + # write datacube as NetCDF and take first file path (see gdalcubes reference: https://gdalcubes.github.io/source/reference/ref/write_ncdf.html#value) + file = gdalcubes::write_ncdf(job$results)[1] + } + else if (all(c("sf", "data.frame") %in% class(job$results))) + { + # result is a sf data.frame + file = base::tempfile() + + # save whole file + sf::st_write(job$results, file, driver = "netCDF") + } + } + else if (format == "GTiff") + { + # check class of the result + if ("cube" %in% class(job$results)) + { + # result is a datacube + # write datacube as Tiff and take first file path (see gdalcubes reference: https://gdalcubes.github.io/source/reference/ref/write_tif.html#value) + file = gdalcubes::write_tif(job$results)[1] + } + } + else if (format == "RDS") + { + file = base::tempfile() + + # check class of the result + if ("cube" %in% class(job$results)) + { + # result is a datacube + + # save proxy cube as json + base::saveRDS(gdalcubes:::as_json(job$results), file) + } + else if (all(c("train", "train.formula") %in% class(job$results))) + { + # result is a caret model + + base::saveRDS(job$results, file) + } + else if (all(c("sf", "data.frame") %in% class(job$results))) + { + # result is a sf data.frame + + base::saveRDS(job$results, file) + } + } + else + { + throwError("FormatUnsupported") + } + }, + error = function(error) + { + message('An Error Occurred.') + message(toString(error)) + stop("FormatUnsupported") + }, + warning = function(warning) { + message('A Warning Occurred') + message(toString(warning)) + }) + # handle creation of http result + + message("File saving finished!") + + message("\nPreparing HTTP result...") + + res$status = 200 + res$body = readBin(file, "raw", n = file.info(file)$size) + + content_type = plumber:::getContentType(tools::file_ext(file)) + res$setHeader("Content-Type", content_type) + + message("HTTP Result send!") + return(res) + + }, + error = handleError) } .cors_filter = function(req,res) { From fe38dc771c5a359e59c0bba44ad332b2ee1f3e2d Mon Sep 17 00:00:00 2001 From: michael_b Date: Fri, 9 Feb 2024 16:17:39 +0100 Subject: [PATCH 3/7] adjust error output --- R/Job-class.R | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/R/Job-class.R b/R/Job-class.R index 5b70594..8a9b6bf 100644 --- a/R/Job-class.R +++ b/R/Job-class.R @@ -119,7 +119,7 @@ Job <- R6Class( }, error=function (e) { self$status = "error" - self$results = NULL + self$results = e writeJobInfo(self) }, finally = { From f33f90e175dabd0a2b00974594efc640b4b50d52 Mon Sep 17 00:00:00 2001 From: michael_b Date: Fri, 9 Feb 2024 16:20:30 +0100 Subject: [PATCH 4/7] add needed dependencies --- Dockerfile | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Dockerfile b/Dockerfile index 617197a..1104d79 100644 --- a/Dockerfile +++ b/Dockerfile @@ -14,7 +14,7 @@ RUN R -e "install.packages('gdalcubes')" # install other necessary packages RUN apt-get install -y libsodium-dev libudunits2-dev -RUN Rscript -e "install.packages(c('plumber', 'useful', 'ids', 'R6', 'sf', 'rstac','bfast'))" +RUN Rscript -e "install.packages(c('plumber', 'useful', 'ids', 'R6', 'sf', 'rstac','bfast', 'caret', randomForest', 'stringr'))" # create directories RUN mkdir -p /opt/dockerfiles/ && mkdir -p /var/openeo/workspace/ && mkdir -p /var/openeo/workspace/data/ @@ -26,4 +26,4 @@ RUN Rscript -e "remotes::install_local('/opt/dockerfiles',dependencies=TRUE)" # cmd or entrypoint for startup CMD ["R", "-q", "--no-save", "-f /opt/dockerfiles/startProduction.R"] -EXPOSE 8000 \ No newline at end of file +EXPOSE 8000 From 142a604515eed04a49b1b659d5e1a6ccdc1233b4 Mon Sep 17 00:00:00 2001 From: michael_b Date: Fri, 9 Feb 2024 16:21:02 +0100 Subject: [PATCH 5/7] remove unneeded auto update of dependencies --- Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Dockerfile b/Dockerfile index 1104d79..f52b48b 100644 --- a/Dockerfile +++ b/Dockerfile @@ -21,7 +21,7 @@ RUN mkdir -p /opt/dockerfiles/ && mkdir -p /var/openeo/workspace/ && mkdir -p /v # install packages from local directory COPY ./ /opt/dockerfiles/ -RUN Rscript -e "remotes::install_local('/opt/dockerfiles',dependencies=TRUE)" +RUN Rscript -e "remotes::install_local('/opt/dockerfiles',dependencies=FALSE)" # cmd or entrypoint for startup CMD ["R", "-q", "--no-save", "-f /opt/dockerfiles/startProduction.R"] From 019a2f5233858fb420f03adc507510d59f6c2c6a Mon Sep 17 00:00:00 2001 From: michael_b Date: Fri, 9 Feb 2024 16:24:07 +0100 Subject: [PATCH 6/7] add new functions to package --- DESCRIPTION | 2 ++ NAMESPACE | 1 + man/apply_prediction.Rd | 16 ++++++++++++++++ man/train_model.Rd | 16 ++++++++++++++++ 4 files changed, 35 insertions(+) create mode 100644 man/apply_prediction.Rd create mode 100644 man/train_model.Rd diff --git a/DESCRIPTION b/DESCRIPTION index 5536568..fe9a8b6 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -42,6 +42,8 @@ Collate: 'Job-class.R' 'ProcessGraph-class.R' 'SessionConfig-Class.R' + 'apply_prediction.R' 'error.R' + 'train_model.R' Depends: R (>= 4.3.0) diff --git a/NAMESPACE b/NAMESPACE index 4e3fba5..c0fdbd3 100644 --- a/NAMESPACE +++ b/NAMESPACE @@ -8,6 +8,7 @@ export(Process) export(ProcessGraph) export(SessionConfig) export(SessionInstance) +export(apply_prediction_opp) export(createSessionInstance) export(getJobIdIndex) export(is.Collection) diff --git a/man/apply_prediction.Rd b/man/apply_prediction.Rd new file mode 100644 index 0000000..5ceb220 --- /dev/null +++ b/man/apply_prediction.Rd @@ -0,0 +1,16 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/apply_prediction.R +\docType{data} +\name{apply_prediction} +\alias{apply_prediction} +\title{apply_prediction} +\format{ +An object of class \code{Process} (inherits from \code{R6}) of length 12. +} +\usage{ +apply_prediction +} +\description{ +apply_prediction +} +\keyword{datasets} diff --git a/man/train_model.Rd b/man/train_model.Rd new file mode 100644 index 0000000..9ae50e7 --- /dev/null +++ b/man/train_model.Rd @@ -0,0 +1,16 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/train_model.R +\docType{data} +\name{train_model} +\alias{train_model} +\title{train_model} +\format{ +An object of class \code{Process} (inherits from \code{R6}) of length 12. +} +\usage{ +train_model +} +\description{ +train_model +} +\keyword{datasets} From 9c6fe9f226d9e3719544e75205d93968b797a53e Mon Sep 17 00:00:00 2001 From: michael_b Date: Fri, 9 Feb 2024 17:31:05 +0100 Subject: [PATCH 7/7] fix typo --- Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Dockerfile b/Dockerfile index f52b48b..afde048 100644 --- a/Dockerfile +++ b/Dockerfile @@ -14,7 +14,7 @@ RUN R -e "install.packages('gdalcubes')" # install other necessary packages RUN apt-get install -y libsodium-dev libudunits2-dev -RUN Rscript -e "install.packages(c('plumber', 'useful', 'ids', 'R6', 'sf', 'rstac','bfast', 'caret', randomForest', 'stringr'))" +RUN Rscript -e "install.packages(c('plumber', 'useful', 'ids', 'R6', 'sf', 'rstac','bfast', 'caret', 'randomForest', 'stringr'))" # create directories RUN mkdir -p /opt/dockerfiles/ && mkdir -p /var/openeo/workspace/ && mkdir -p /var/openeo/workspace/data/