comparison ogc_api_processes_wrapper.R @ 0:92ca20fdb790 draft default tip

planemo upload for repository https://github.com/galaxyecology/tools-ecology/tree/master/tools/ogc_api_processes_wrapper commit 8b4b58222af2c77abd41dd8f17862a24ca7d3381
author ecology
date Fri, 06 Sep 2024 10:30:22 +0000
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:92ca20fdb790
1 library("httr2")
2 library("jsonlite")
3 library("getopt")
4
# Announce on stdout that the wrapper script has started.
cat("start generic wrapper service \n")
6
# Recursively drop NULL entries from a (possibly nested) list.
#
# Anything that is not a list is handed back untouched. For a list, every
# element is cleaned first (depth-first) and the elements that are NULL
# are then removed from the current level.
#
# x: any R object; only lists are modified.
# Returns: `x` without NULL elements at any nesting level.
remove_null_values <- function(x) {
  if (!is.list(x)) {
    return(x)
  }
  cleaned <- lapply(x, remove_null_values)
  is_missing <- vapply(cleaned, is.null, logical(1))
  cleaned[!is_missing]
}
16
# Load the tool parameters from "inputs.json" in the working directory.
#
# The file is read as text, parsed with fromJSON(), stripped of NULL
# entries via remove_null_values(), and the `conditional_process`
# element of the result is returned.
getParameters <- function() {
  raw_lines <- readLines("inputs.json")
  parsed <- fromJSON(paste(raw_lines, collapse = "\n"))

  # Drop NULL entries before handing the parameters to the caller.
  without_nulls <- remove_null_values(parsed)
  without_nulls$conditional_process
}
29
# Decode a response body into an R object.
#
# body: the byte content of a response (callers pass `response$body`).
# The bytes are coerced to integers, back to raw, turned into a
# character string and finally parsed as JSON.
# Returns: whatever jsonlite::fromJSON() produces for that text.
parseResponseBody <- function(body) {
  bytes <- as.raw(as.integer(c(body)))
  jsonlite::fromJSON(rawToChar(bytes))
}
38
# Build the `outputs` section of an execution request for one process.
#
# The process description is fetched from `<server>/processes/<id>` and
# one entry is produced per declared output. Each entry requests
# transmission by reference; when `inputs` holds a value under
# "<output name>_outformat", that value is used as the output media type.
#
# inputs: named list of tool parameters; `select_process` is the process id.
# output: unused, kept so existing callers do not break.
# server: base URL of the server, with or without a trailing slash.
# Returns: a list named after the process outputs (empty when the process
#          declares no outputs).
getOutputs <- function(inputs, output, server) {
  # Strip trailing slashes so a base URL such as ".../weaver/" does not
  # produce ".../weaver//processes/...".
  baseUrl <- sub("/+$", "", server)
  url <- paste0(baseUrl, "/processes/", inputs$select_process)

  response <- req_perform(request(url))
  responseBody <- parseResponseBody(response$body)

  # seq_along() yields no iterations for a process without outputs,
  # whereas 1:length(x) would iterate over c(1, 0).
  outputs <- lapply(seq_along(responseBody$outputs), function(i) {
    outputItem <- list()
    formatKey <- paste0(names(responseBody$outputs[i]), "_outformat")
    if (formatKey %in% names(inputs)) {
      outputItem$format <- list("mediaType" = inputs[[formatKey]])
    }
    outputItem$transmissionMode <- "reference"
    outputItem
  })

  names(outputs) <- names(responseBody$outputs)
  outputs
}
67
# Submit an asynchronous execution request for `process`.
#
# url:             base URL of the server; "processes/<process>/execution"
#                  is appended as-is.
# process:         identifier of the process to execute.
# requestBodyData: list whose `inputs` element holds the tool parameters.
# cookie:          value sent in the "Cookie" request header.
# Returns: the `jobID` field of the server's JSON response.
#
# NOTE(review): only `inputs`, `mode` and `response` are sent;
# `requestBodyData$outputs` is not forwarded - confirm this is intended.
executeProcess <- function(url, process, requestBodyData, cookie) {
  endpoint <- paste0(url, "processes/", process, "/execution")

  inputs <- requestBodyData$inputs

  # Wrapper-only parameters are not process inputs.
  inputs$cookie <- NULL
  inputs$select_process <- NULL

  # Lift the S3 credentials out of the nested `user_credentials` section.
  inputs$s3_access_key <- inputs$user_credentials$s3_access_key
  inputs$s3_secret_key <- inputs$user_credentials$s3_secret_key
  inputs$user_credentials <- NULL

  if (process == "plot-image") {
    # Square brackets arrive encoded as __ob__ / __cb__; decode them.
    opened <- gsub("__ob__", "[", inputs$color_scale)
    inputs$color_scale <- gsub("__cb__", "]", opened)
  }
  if (process == "calculate-band") {
    inputs$name <- "output"
  }
  if (process == "reproject-image") {
    inputs$output_name <- "output"
  }

  body <- list()
  body$inputs <- inputs
  body$mode <- "async"
  body$response <- "document"

  req <- request(endpoint)
  req <- req_headers(req,
                     "Accept" = "application/json",
                     "Content-Type" = "application/json",
                     "Cookie" = cookie)
  req <- req_body_json(req, body)
  response <- req_perform(req)

  cat("\n Process executed")
  cat("\n status: ", response$status_code)
  jobID <- parseResponseBody(response$body)$jobID
  cat("\n jobID: ", jobID, "\n")

  jobID
}
116
# Fetch the current status of a job.
#
# server:  base URL of the server; "processes/..." is appended as-is.
# process: identifier of the process the job belongs to.
# jobID:   identifier of the job.
# cookie:  value sent in the "Cookie" request header.
# Returns: the `status` field of the job document returned by the server.
checkJobStatus <- function(server, process, jobID, cookie) {
  url <- paste0(server, "processes/", process, "/jobs/", jobID)
  response <- request(url) %>%
    req_headers("Cookie" = cookie) %>%
    req_perform()
  # Parse the body once; the previously extracted `progress` field was
  # never used, so it is no longer read.
  parseResponseBody(response$body)$status
}
126
# Request the job document and report the HTTP status code of the reply.
#
# server:  base URL of the server; "processes/..." is appended as-is.
# process: identifier of the process the job belongs to.
# jobID:   identifier of the job.
# cookie:  value sent in the "Cookie" request header.
# Returns: the `status_code` of the response.
#
# NOTE(review): httr2's req_perform() appears to signal an R error for
# HTTP 4xx/5xx responses by default, so non-success codes may never be
# returned from here - confirm against the httr2 documentation.
getStatusCode <- function(server, process, jobID, cookie) {
  jobUrl <- paste0(server, "processes/", process, "/jobs/", jobID)
  jobRequest <- req_headers(request(jobUrl), "Cookie" = cookie)
  req_perform(jobRequest)$status_code
}
135
# Request the results document of a job.
#
# server:  base URL of the server; "processes/..." is appended as-is.
# process: identifier of the process the job belongs to.
# jobID:   identifier of the job.
# cookie:  value sent in the "Cookie" request header.
# Returns: the response object produced by req_perform().
getResult <- function(server, process, jobID, cookie) {
  resultsUrl <- paste0(server, "processes/", process, "/jobs/", jobID, "/results")
  resultsRequest <- req_headers(request(resultsUrl), "Cookie" = cookie)
  req_perform(resultsRequest)
}
143
# Poll a job until it reaches a final state and, on success, write the
# result references to `outputData` (one per line).
#
# server:     base URL of the server; passed through to the request helpers.
# process:    identifier of the executed process; it decides which field of
#             each result entry is written (`value` or `href`).
# jobID:      identifier of the job to wait for.
# outputData: path of the file the result references are written to.
# cookie:     value sent in the "Cookie" request header.
#
# The job is polled every 3 seconds. Polling stops on "succeeded",
# "failed" or "dismissed"; any other status keeps the loop going.
# When the initial job request does not answer with HTTP 200, a
# diagnostic message is printed instead.
retrieveResults <-
  function(server, process, jobID, outputData, cookie) {
    valueProcesses <- c("select-products-sentinel2")
    hrefProcesses <- c("download-band-sentinel2-product-safe",
                       "calculate-band",
                       "plot-image",
                       "reproject-image")

    status_code <- getStatusCode(server, process, jobID, cookie)
    if (status_code == 200) {
      repeat {
        jobStatus <- checkJobStatus(server, process, jobID, cookie)
        print(jobStatus)
        if (jobStatus == "succeeded") {
          result <- getResult(server, process, jobID, cookie)
          if (result$status_code == 200) {
            resultBody <- parseResponseBody(result$body)
            if (process %in% valueProcesses) {
              urls <- unname(unlist(lapply(resultBody, function(x) x$value)))
            } else if (process %in% hrefProcesses) {
              urls <- unname(unlist(lapply(resultBody, function(x) x$href)))
            } else {
              # Previously this fell through to an "object 'urls' not
              # found" error; fail with an explicit message instead.
              stop("Do not know how to extract results for process: ",
                   process, call. = FALSE)
            }
            writeLines(paste(urls, collapse = "\n"), con = outputData)
          }
          break
        }
        # "dismissed" is final as well; without this the loop never ends
        # for a cancelled job.
        if (jobStatus %in% c("failed", "dismissed")) {
          break
        }
        Sys.sleep(3)
      }
      cat("\n done \n")
    } else if (status_code == 400) {
      print("A query parameter has an invalid value.")
    } else if (status_code == 404) {
      print("The requested URI was not found.")
    } else if (status_code == 500) {
      print("The server encountered an internal error.")
    } else {
      print(paste("HTTP", status_code, "Error"))
    }
  }
188
# Test whether each element of `x` begins with "http://" or "https://".
#
# x: character vector (or anything grepl() accepts).
# Returns: a logical vector with one entry per element of `x`.
is_url <- function(x) {
  has_http_scheme <- grepl("^https?://", x)
  return(has_http_scheme)
}
192
# Base URL of the processing server. The job helpers append
# "processes/..." directly, so the trailing slash is significant.
server <- "https://hirondelle.crim.ca/weaver/"

# The second trailing command-line argument is the path of the file the
# results are written to.
args <- commandArgs(trailingOnly = TRUE)
outputLocation <- args[2]

print("--> Retrieve parameters")
inputParameters <- getParameters()
print("--> Parameters retrieved")

print("--> Retrieve outputs")
outputs <- getOutputs(inputParameters, outputLocation, server)
print("--> Outputs retrieved")
206
# Convert the raw tool parameters into the values sent to the server.
#
# Three kinds of parameter are distinguished by the loop below:
#   * a single string ending in ".dat" or ".txt" is treated as a file
#     path; the file content replaces the parameter value,
#   * a key containing "_Array_" holds comma-separated values whose type
#     is given by the last "_"-separated part of the key; the key is
#     shortened to the part in front of "_Array",
#   * everything else is passed through unchanged.
# `convertedKeys` collects the final name for every parameter, in order.
print("--> Parse inputs")
convertedKeys <- c()
for (key in names(inputParameters)) {
  paramValue <- inputParameters[[key]]

  # `||` needs scalar operands, so only a single non-NA string can be a
  # file reference.
  isFileReference <- is.character(paramValue) &&
    length(paramValue) == 1 &&
    !is.na(paramValue) &&
    (endsWith(paramValue, ".dat") || endsWith(paramValue, ".txt"))

  if (isFileReference) {
    # Reading by path opens and closes the file exactly once; the earlier
    # code opened it twice and never closed the first connection.
    lines <- readLines(paramValue)
    print("--------------------------------------------------------------------1")
    print(length(lines))
    if (length(lines) == 0) {
      stop("Input file for parameter '", key, "' is empty: ", paramValue,
           call. = FALSE)
    }
    isSingleLine <- length(lines) == 1

    if (isSingleLine && endsWith(lines, ".jp2") && startsWith(lines, "https")) {
      print("--------------------------------------------------------------------2")
      tmp <- list()
      tmp$href <- lines
      tmp$type <- "image/jp2"
      inputParameters[[key]] <- tmp
    } else if (isSingleLine && endsWith(lines, ".SAFE") && startsWith(lines, "s3:")) {
      print("--------------------------------------------------------------------3")
      json_string <- paste(lines, collapse = "\n")
      inputParameters[[key]] <- json_string
    } else if (inputParameters$select_process == "plot-image" ||
               inputParameters$select_process == "reproject-image") {
      print("--------------------------------------------------------------------4")
      tmp <- list()
      tmp$href <- lines
      tmp$type <- "image/tiff; application=geotiff"
      if (inputParameters$select_process == "reproject-image") {
        tmp$type <- "image/tiff; subtype=geotiff"
      }
      inputParameters[[key]] <- tmp
    } else {
      print("-----------------------------------5")
      json_string <- paste(lines, collapse = "\n")
      json_data <- fromJSON(json_string)
      inputParameters[[key]] <- json_data
    }
    convertedKeys <- append(convertedKeys, key)
  } else if (grepl("_Array_", key)) {
    keyParts <- strsplit(key, split = "_")[[1]]
    type <- keyParts[length(keyParts)]

    # Split the comma-separated text and convert all values at once.
    rawValues <- unlist(strsplit(paramValue, split = ","))
    convertedValues <- switch(type,
      integer = as.integer(rawValues),
      numeric = as.numeric(rawValues),
      character = as.character(rawValues),
      rawValues
    )

    # The new key is everything in front of the first "Array" part.
    # A key matching "_Array_" always contains such a part.
    arrayPosition <- match("Array", keyParts)
    convertedKey <- paste(keyParts[seq_len(arrayPosition - 1)], collapse = "_")

    inputParameters[[key]] <- convertedValues
    convertedKeys <- append(convertedKeys, convertedKey)
  } else {
    if (!is.null(inputParameters[[key]])) {
      convertedKeys <- append(convertedKeys, key)
    }
  }
}
names(inputParameters) <- convertedKeys
print("--> Inputs parsed")
302
# Assemble the request data, start the job and wait for its results.
print("--> Prepare process execution")
jsonData <- list(
  "inputs" = inputParameters,
  "outputs" = outputs
)

# The session cookie travels with the tool parameters.
cookie <- inputParameters$cookie

print("--> Execute process")
jobID <- executeProcess(
  server,
  inputParameters$select_process,
  jsonData,
  cookie
)
print("--> Process executed")

print("--> Retrieve results")
retrieveResults(
  server,
  inputParameters$select_process,
  jobID,
  outputLocation,
  cookie
)
print("--> Results retrieved")