Mercurial > repos > ecology > hirondelle_crim
comparison ogc_api_processes_wrapper.R @ 0:92ca20fdb790 draft default tip
planemo upload for repository https://github.com/galaxyecology/tools-ecology/tree/master/tools/ogc_api_processes_wrapper commit 8b4b58222af2c77abd41dd8f17862a24ca7d3381
| author | ecology |
|---|---|
| date | Fri, 06 Sep 2024 10:30:22 +0000 |
| parents | |
| children |
comparison
equal
deleted
inserted
replaced
| -1:000000000000 | 0:92ca20fdb790 |
|---|---|
| 1 library("httr2") | |
| 2 library("jsonlite") | |
| 3 library("getopt") | |
| 4 | |
| 5 cat("start generic wrapper service \n") | |
| 6 | |
| 7 remove_null_values <- function(x) { | |
| 8 # Check if the input is a list | |
| 9 if (is.list(x)) { | |
| 10 # Remove NULL values and apply the function recursively to sublists | |
| 11 x <- lapply(x, remove_null_values) | |
| 12 x <- x[!sapply(x, is.null)] | |
| 13 } | |
| 14 return(x) | |
| 15 } | |
| 16 | |
| 17 getParameters <- function() { | |
| 18 con <- file("inputs.json", "r") | |
| 19 lines <- readLines(con) | |
| 20 close(con) | |
| 21 | |
| 22 json_string <- paste(lines, collapse = "\n") | |
| 23 json_data <- fromJSON(json_string) | |
| 24 | |
| 25 # Remove NULL values from json_data | |
| 26 cleaned_json_data <- remove_null_values(json_data) | |
| 27 return(cleaned_json_data$conditional_process) | |
| 28 } | |
| 29 | |
| 30 parseResponseBody <- function(body) { | |
| 31 hex <- c(body) | |
| 32 intValues <- as.integer(hex) | |
| 33 rawVector <- as.raw(intValues) | |
| 34 readableOutput <- rawToChar(rawVector) | |
| 35 jsonObject <- jsonlite::fromJSON(readableOutput) | |
| 36 return(jsonObject) | |
| 37 } | |
| 38 | |
| 39 getOutputs <- function(inputs, output, server) { | |
| 40 url <- | |
| 41 paste(paste(server, "/processes/", sep = ""), | |
| 42 inputs$select_process, | |
| 43 sep = "") | |
| 44 request <- request(url) | |
| 45 response <- req_perform(request) | |
| 46 responseBody <- parseResponseBody(response$body) | |
| 47 outputs <- list() | |
| 48 | |
| 49 for (x in 1:length(responseBody$outputs)) { | |
| 50 outputformatName <- | |
| 51 paste(names(responseBody$outputs[x]), "_outformat", sep = "") | |
| 52 output_item <- list() | |
| 53 | |
| 54 for (p in names(inputs)) { | |
| 55 if (p == outputformatName) { | |
| 56 format <- list("mediaType" = inputs[[outputformatName]]) | |
| 57 output_item$format <- format | |
| 58 } | |
| 59 } | |
| 60 output_item$transmissionMode <- "reference" | |
| 61 outputs[[x]] <- output_item | |
| 62 } | |
| 63 | |
| 64 names(outputs) <- names(responseBody$outputs) | |
| 65 return(outputs) | |
| 66 } | |
| 67 | |
| 68 executeProcess <- function(url, process, requestBodyData, cookie) { | |
| 69 url <- | |
| 70 paste(paste(paste(url, "processes/", sep = ""), process, sep = ""), "/execution", sep = "") | |
| 71 requestBodyData$inputs$cookie <- NULL | |
| 72 requestBodyData$inputs$select_process <- NULL | |
| 73 | |
| 74 requestBodyData$inputs$s3_access_key <- | |
| 75 requestBodyData$inputs$user_credentials$s3_access_key | |
| 76 requestBodyData$inputs$s3_secret_key <- | |
| 77 requestBodyData$inputs$user_credentials$s3_secret_key | |
| 78 requestBodyData$inputs$user_credentials <- NULL | |
| 79 if (process == "plot-image") { | |
| 80 tmp <- requestBodyData$inputs$color_scale | |
| 81 color_scale <- gsub("__ob__", "[", tmp) | |
| 82 color_scale <- gsub("__cb__", "]", color_scale) | |
| 83 requestBodyData$inputs$color_scale <- color_scale | |
| 84 #print(requestBodyData$inputs$color_scale) | |
| 85 } | |
| 86 if (process == "calculate-band") { | |
| 87 requestBodyData$inputs$name <- "output" | |
| 88 } | |
| 89 if (process == "reproject-image") { | |
| 90 requestBodyData$inputs$output_name <- "output" | |
| 91 } | |
| 92 #requestBodyData$inputs$input_image$href <- "https://hirondelle.crim.ca/wpsoutputs/weaver/public/test-data/S2A_MSIL2A_20190701T110621_N0500_R137_T29SPC_20230604T023542_turbidity.tiff" | |
| 93 | |
| 94 body <- list() | |
| 95 body$inputs <- requestBodyData$inputs | |
| 96 #print(body$inputs) | |
| 97 body$mode <- "async" | |
| 98 body$response <- "document" | |
| 99 #print(body$inputs) | |
| 100 | |
| 101 response <- request(url) %>% | |
| 102 req_headers("Accept" = "application/json", | |
| 103 "Content-Type" = "application/json", | |
| 104 "Cookie" = cookie) %>% | |
| 105 req_body_json(body) %>% | |
| 106 req_perform() | |
| 107 | |
| 108 cat("\n Process executed") | |
| 109 cat("\n status: ", response$status_code) | |
| 110 cat("\n jobID: ", parseResponseBody(response$body)$jobID, "\n") | |
| 111 | |
| 112 jobID <- parseResponseBody(response$body)$jobID | |
| 113 | |
| 114 return(jobID) | |
| 115 } | |
| 116 | |
| 117 checkJobStatus <- function(server, process, jobID, cookie) { | |
| 118 url <- paste0(server, "processes/", process, "/jobs/", jobID) | |
| 119 response <- request(url) %>% | |
| 120 req_headers("Cookie" = cookie) %>% | |
| 121 req_perform() | |
| 122 jobStatus <- parseResponseBody(response$body)$status | |
| 123 jobProgress <- parseResponseBody(response$body)$progress | |
| 124 return(jobStatus) | |
| 125 } | |
| 126 | |
| 127 getStatusCode <- function(server, process, jobID, cookie) { | |
| 128 url <- paste0(server, "processes/", process, "/jobs/", jobID) | |
| 129 response <- request(url) %>% | |
| 130 req_headers("Cookie" = cookie) %>% | |
| 131 req_perform() | |
| 132 status_code <- response$status_code | |
| 133 return(status_code) | |
| 134 } | |
| 135 | |
| 136 getResult <- function (server, process, jobID, cookie) { | |
| 137 response <- | |
| 138 request(paste0(server, "processes/", process, "/jobs/", jobID, "/results")) %>% | |
| 139 req_headers("Cookie" = cookie) %>% | |
| 140 req_perform() | |
| 141 return(response) | |
| 142 } | |
| 143 | |
| 144 retrieveResults <- | |
| 145 function(server, process, jobID, outputData, cookie) { | |
| 146 status_code <- getStatusCode(server, process, jobID, cookie) | |
| 147 if (status_code == 200) { | |
| 148 status <- "running" | |
| 149 while (status == "running") { | |
| 150 jobStatus <- checkJobStatus(server, process, jobID, cookie) | |
| 151 print(jobStatus) | |
| 152 if (jobStatus == "succeeded") { | |
| 153 status <- jobStatus | |
| 154 result <- getResult(server, process, jobID, cookie) | |
| 155 if (result$status_code == 200) { | |
| 156 resultBody <- parseResponseBody(result$body) | |
| 157 #print(resultBody) | |
| 158 if (process == "select-products-sentinel2") { | |
| 159 urls <- unname(unlist(lapply(resultBody, function(x) | |
| 160 x$value))) | |
| 161 } else if (process == "download-band-sentinel2-product-safe" || | |
| 162 process == "calculate-band" || | |
| 163 process == "plot-image" || process == "reproject-image") { | |
| 164 urls <- unname(unlist(lapply(resultBody, function(x) | |
| 165 x$href))) | |
| 166 } | |
| 167 urls_with_newline <- paste(urls, collapse = "\n") | |
| 168 con <- file(outputData, "w") | |
| 169 writeLines(urls_with_newline, con = con) | |
| 170 close(con) | |
| 171 } | |
| 172 } else if (jobStatus == "failed") { | |
| 173 status <- jobStatus | |
| 174 } | |
| 175 Sys.sleep(3) | |
| 176 } | |
| 177 cat("\n done \n") | |
| 178 } else if (status_code1 == 400) { | |
| 179 print("A query parameter has an invalid value.") | |
| 180 } else if (status_code1 == 404) { | |
| 181 print("The requested URI was not found.") | |
| 182 } else if (status_code1 == 500) { | |
| 183 print("The requested URI was not found.") | |
| 184 } else { | |
| 185 print(paste("HTTP", status_code1, "Error:", resp1$status_message)) | |
| 186 } | |
| 187 } | |
| 188 | |
| 189 is_url <- function(x) { | |
| 190 grepl("^https?://", x) | |
| 191 } | |
| 192 | |
| 193 server <- "https://hirondelle.crim.ca/weaver/" | |
| 194 | |
| 195 print("--> Retrieve parameters") | |
| 196 inputParameters <- getParameters() | |
| 197 #print(inputParameters) | |
| 198 print("--> Parameters retrieved") | |
| 199 | |
| 200 args <- commandArgs(trailingOnly = TRUE) | |
| 201 outputLocation <- args[2] | |
| 202 | |
| 203 print("--> Retrieve outputs") | |
| 204 outputs <- getOutputs(inputParameters, outputLocation, server) | |
| 205 print("--> Outputs retrieved") | |
| 206 | |
| 207 print("--> Parse inputs") | |
| 208 convertedKeys <- c() | |
| 209 for (key in names(inputParameters)) { | |
| 210 if (is.character(inputParameters[[key]]) && | |
| 211 (endsWith(inputParameters[[key]], ".dat") || | |
| 212 endsWith(inputParameters[[key]], ".txt"))) { | |
| 213 con <- file(inputParameters[[key]], "r") | |
| 214 url_list <- list() | |
| 215 #while (length(line <- readLines(con, n = 1)) > 0) { | |
| 216 # if (is_url(line)) { | |
| 217 # url_list <- c(url_list, list(list(href = trimws(line)))) | |
| 218 # } | |
| 219 #} | |
| 220 con <- file(inputParameters[[key]], "r") | |
| 221 lines <- readLines(con) | |
| 222 print("--------------------------------------------------------------------1") | |
| 223 print(length(lines)) | |
| 224 close(con) | |
| 225 if (!length(lines) > 1 && endsWith(lines, ".jp2") && startsWith(lines, "https")) { | |
| 226 print("--------------------------------------------------------------------2") | |
| 227 tmp <- list() | |
| 228 tmp$href <- lines | |
| 229 tmp$type <- "image/jp2" | |
| 230 inputParameters[[key]] <- tmp | |
| 231 } | |
| 232 else if (!length(lines) > 1 && endsWith(lines, ".SAFE") && startsWith(lines, "s3:")) { | |
| 233 print("--------------------------------------------------------------------3") | |
| 234 json_string <- paste(lines, collapse = "\n") | |
| 235 inputParameters[[key]] <- json_string | |
| 236 } else if (inputParameters$select_process == "plot-image" || | |
| 237 inputParameters$select_process == "reproject-image") { | |
| 238 print("--------------------------------------------------------------------4") | |
| 239 tmp <- list() | |
| 240 tmp$href <- lines | |
| 241 tmp$type <- "image/tiff; application=geotiff" | |
| 242 if (inputParameters$select_process == "reproject-image") { | |
| 243 tmp$type <- "image/tiff; subtype=geotiff" | |
| 244 } | |
| 245 inputParameters[[key]] <- tmp | |
| 246 } else { | |
| 247 print("-----------------------------------5") | |
| 248 json_string <- paste(lines, collapse = "\n") | |
| 249 json_data <- fromJSON(json_string) | |
| 250 inputParameters[[key]] <- json_data | |
| 251 } | |
| 252 convertedKeys <- append(convertedKeys, key) | |
| 253 } | |
| 254 else if (grepl("_Array_", key)) { | |
| 255 keyParts <- strsplit(key, split = "_")[[1]] | |
| 256 type <- keyParts[length(keyParts)] | |
| 257 values <- inputParameters[[key]] | |
| 258 value_list <- strsplit(values, split = ",") | |
| 259 convertedValues <- c() | |
| 260 | |
| 261 for (value in value_list) { | |
| 262 if (type == "integer") { | |
| 263 value <- as.integer(value) | |
| 264 } else if (type == "numeric") { | |
| 265 value <- as.numeric(balue) | |
| 266 } else if (type == "character") { | |
| 267 value <- as.character(value) | |
| 268 } | |
| 269 convertedValues <- append(convertedValues, value) | |
| 270 | |
| 271 convertedKey <- "" | |
| 272 for (part in keyParts) { | |
| 273 if (part == "Array") { | |
| 274 break | |
| 275 } | |
| 276 convertedKey <- | |
| 277 paste(convertedKey, paste(part, "_", sep = ""), sep = "") | |
| 278 } | |
| 279 convertedKey <- substr(convertedKey, 1, nchar(convertedKey) - 1) | |
| 280 } | |
| 281 | |
| 282 inputParameters[[key]] <- convertedValues | |
| 283 #print("-------------------------") | |
| 284 #print(convertedValues) | |
| 285 #print("-------------------------") | |
| 286 convertedKeys <- append(convertedKeys, convertedKey) | |
| 287 } else { | |
| 288 #print("-------------------------") | |
| 289 #print(key) | |
| 290 #print(inputParameters[[key]]) | |
| 291 if (!is.null(inputParameters[[key]])) { | |
| 292 convertedKeys <- append(convertedKeys, key) | |
| 293 } | |
| 294 #print("-------------------------") | |
| 295 | |
| 296 } | |
| 297 } | |
| 298 #print(inputParameters) | |
| 299 names(inputParameters) <- convertedKeys | |
| 300 #print(inputParameters) | |
| 301 print("--> Inputs parsed") | |
| 302 | |
| 303 print("--> Prepare process execution") | |
| 304 jsonData <- list("inputs" = inputParameters, | |
| 305 "outputs" = outputs) | |
| 306 | |
| 307 cookie <- inputParameters$cookie | |
| 308 | |
| 309 print("--> Execute process") | |
| 310 jobID <- | |
| 311 executeProcess(server, inputParameters$select_process, jsonData, cookie) | |
| 312 print("--> Process executed") | |
| 313 | |
| 314 print("--> Retrieve results") | |
| 315 retrieveResults(server, | |
| 316 inputParameters$select_process, | |
| 317 jobID, | |
| 318 outputLocation, | |
| 319 cookie) | |
| 320 print("--> Results retrieved") |
