Mercurial > repos > ecology > hirondelle_crim
comparison ogc_api_processes_wrapper.R @ 0:92ca20fdb790 draft default tip
planemo upload for repository https://github.com/galaxyecology/tools-ecology/tree/master/tools/ogc_api_processes_wrapper commit 8b4b58222af2c77abd41dd8f17862a24ca7d3381
author | ecology |
---|---|
date | Fri, 06 Sep 2024 10:30:22 +0000 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
-1:000000000000 | 0:92ca20fdb790 |
---|---|
1 library("httr2") | |
2 library("jsonlite") | |
3 library("getopt") | |
4 | |
5 cat("start generic wrapper service \n") | |
6 | |
7 remove_null_values <- function(x) { | |
8 # Check if the input is a list | |
9 if (is.list(x)) { | |
10 # Remove NULL values and apply the function recursively to sublists | |
11 x <- lapply(x, remove_null_values) | |
12 x <- x[!sapply(x, is.null)] | |
13 } | |
14 return(x) | |
15 } | |
16 | |
17 getParameters <- function() { | |
18 con <- file("inputs.json", "r") | |
19 lines <- readLines(con) | |
20 close(con) | |
21 | |
22 json_string <- paste(lines, collapse = "\n") | |
23 json_data <- fromJSON(json_string) | |
24 | |
25 # Remove NULL values from json_data | |
26 cleaned_json_data <- remove_null_values(json_data) | |
27 return(cleaned_json_data$conditional_process) | |
28 } | |
29 | |
30 parseResponseBody <- function(body) { | |
31 hex <- c(body) | |
32 intValues <- as.integer(hex) | |
33 rawVector <- as.raw(intValues) | |
34 readableOutput <- rawToChar(rawVector) | |
35 jsonObject <- jsonlite::fromJSON(readableOutput) | |
36 return(jsonObject) | |
37 } | |
38 | |
39 getOutputs <- function(inputs, output, server) { | |
40 url <- | |
41 paste(paste(server, "/processes/", sep = ""), | |
42 inputs$select_process, | |
43 sep = "") | |
44 request <- request(url) | |
45 response <- req_perform(request) | |
46 responseBody <- parseResponseBody(response$body) | |
47 outputs <- list() | |
48 | |
49 for (x in 1:length(responseBody$outputs)) { | |
50 outputformatName <- | |
51 paste(names(responseBody$outputs[x]), "_outformat", sep = "") | |
52 output_item <- list() | |
53 | |
54 for (p in names(inputs)) { | |
55 if (p == outputformatName) { | |
56 format <- list("mediaType" = inputs[[outputformatName]]) | |
57 output_item$format <- format | |
58 } | |
59 } | |
60 output_item$transmissionMode <- "reference" | |
61 outputs[[x]] <- output_item | |
62 } | |
63 | |
64 names(outputs) <- names(responseBody$outputs) | |
65 return(outputs) | |
66 } | |
67 | |
68 executeProcess <- function(url, process, requestBodyData, cookie) { | |
69 url <- | |
70 paste(paste(paste(url, "processes/", sep = ""), process, sep = ""), "/execution", sep = "") | |
71 requestBodyData$inputs$cookie <- NULL | |
72 requestBodyData$inputs$select_process <- NULL | |
73 | |
74 requestBodyData$inputs$s3_access_key <- | |
75 requestBodyData$inputs$user_credentials$s3_access_key | |
76 requestBodyData$inputs$s3_secret_key <- | |
77 requestBodyData$inputs$user_credentials$s3_secret_key | |
78 requestBodyData$inputs$user_credentials <- NULL | |
79 if (process == "plot-image") { | |
80 tmp <- requestBodyData$inputs$color_scale | |
81 color_scale <- gsub("__ob__", "[", tmp) | |
82 color_scale <- gsub("__cb__", "]", color_scale) | |
83 requestBodyData$inputs$color_scale <- color_scale | |
84 #print(requestBodyData$inputs$color_scale) | |
85 } | |
86 if (process == "calculate-band") { | |
87 requestBodyData$inputs$name <- "output" | |
88 } | |
89 if (process == "reproject-image") { | |
90 requestBodyData$inputs$output_name <- "output" | |
91 } | |
92 #requestBodyData$inputs$input_image$href <- "https://hirondelle.crim.ca/wpsoutputs/weaver/public/test-data/S2A_MSIL2A_20190701T110621_N0500_R137_T29SPC_20230604T023542_turbidity.tiff" | |
93 | |
94 body <- list() | |
95 body$inputs <- requestBodyData$inputs | |
96 #print(body$inputs) | |
97 body$mode <- "async" | |
98 body$response <- "document" | |
99 #print(body$inputs) | |
100 | |
101 response <- request(url) %>% | |
102 req_headers("Accept" = "application/json", | |
103 "Content-Type" = "application/json", | |
104 "Cookie" = cookie) %>% | |
105 req_body_json(body) %>% | |
106 req_perform() | |
107 | |
108 cat("\n Process executed") | |
109 cat("\n status: ", response$status_code) | |
110 cat("\n jobID: ", parseResponseBody(response$body)$jobID, "\n") | |
111 | |
112 jobID <- parseResponseBody(response$body)$jobID | |
113 | |
114 return(jobID) | |
115 } | |
116 | |
117 checkJobStatus <- function(server, process, jobID, cookie) { | |
118 url <- paste0(server, "processes/", process, "/jobs/", jobID) | |
119 response <- request(url) %>% | |
120 req_headers("Cookie" = cookie) %>% | |
121 req_perform() | |
122 jobStatus <- parseResponseBody(response$body)$status | |
123 jobProgress <- parseResponseBody(response$body)$progress | |
124 return(jobStatus) | |
125 } | |
126 | |
127 getStatusCode <- function(server, process, jobID, cookie) { | |
128 url <- paste0(server, "processes/", process, "/jobs/", jobID) | |
129 response <- request(url) %>% | |
130 req_headers("Cookie" = cookie) %>% | |
131 req_perform() | |
132 status_code <- response$status_code | |
133 return(status_code) | |
134 } | |
135 | |
136 getResult <- function (server, process, jobID, cookie) { | |
137 response <- | |
138 request(paste0(server, "processes/", process, "/jobs/", jobID, "/results")) %>% | |
139 req_headers("Cookie" = cookie) %>% | |
140 req_perform() | |
141 return(response) | |
142 } | |
143 | |
144 retrieveResults <- | |
145 function(server, process, jobID, outputData, cookie) { | |
146 status_code <- getStatusCode(server, process, jobID, cookie) | |
147 if (status_code == 200) { | |
148 status <- "running" | |
149 while (status == "running") { | |
150 jobStatus <- checkJobStatus(server, process, jobID, cookie) | |
151 print(jobStatus) | |
152 if (jobStatus == "succeeded") { | |
153 status <- jobStatus | |
154 result <- getResult(server, process, jobID, cookie) | |
155 if (result$status_code == 200) { | |
156 resultBody <- parseResponseBody(result$body) | |
157 #print(resultBody) | |
158 if (process == "select-products-sentinel2") { | |
159 urls <- unname(unlist(lapply(resultBody, function(x) | |
160 x$value))) | |
161 } else if (process == "download-band-sentinel2-product-safe" || | |
162 process == "calculate-band" || | |
163 process == "plot-image" || process == "reproject-image") { | |
164 urls <- unname(unlist(lapply(resultBody, function(x) | |
165 x$href))) | |
166 } | |
167 urls_with_newline <- paste(urls, collapse = "\n") | |
168 con <- file(outputData, "w") | |
169 writeLines(urls_with_newline, con = con) | |
170 close(con) | |
171 } | |
172 } else if (jobStatus == "failed") { | |
173 status <- jobStatus | |
174 } | |
175 Sys.sleep(3) | |
176 } | |
177 cat("\n done \n") | |
178 } else if (status_code1 == 400) { | |
179 print("A query parameter has an invalid value.") | |
180 } else if (status_code1 == 404) { | |
181 print("The requested URI was not found.") | |
182 } else if (status_code1 == 500) { | |
183 print("The requested URI was not found.") | |
184 } else { | |
185 print(paste("HTTP", status_code1, "Error:", resp1$status_message)) | |
186 } | |
187 } | |
188 | |
189 is_url <- function(x) { | |
190 grepl("^https?://", x) | |
191 } | |
192 | |
193 server <- "https://hirondelle.crim.ca/weaver/" | |
194 | |
195 print("--> Retrieve parameters") | |
196 inputParameters <- getParameters() | |
197 #print(inputParameters) | |
198 print("--> Parameters retrieved") | |
199 | |
200 args <- commandArgs(trailingOnly = TRUE) | |
201 outputLocation <- args[2] | |
202 | |
203 print("--> Retrieve outputs") | |
204 outputs <- getOutputs(inputParameters, outputLocation, server) | |
205 print("--> Outputs retrieved") | |
206 | |
207 print("--> Parse inputs") | |
208 convertedKeys <- c() | |
209 for (key in names(inputParameters)) { | |
210 if (is.character(inputParameters[[key]]) && | |
211 (endsWith(inputParameters[[key]], ".dat") || | |
212 endsWith(inputParameters[[key]], ".txt"))) { | |
213 con <- file(inputParameters[[key]], "r") | |
214 url_list <- list() | |
215 #while (length(line <- readLines(con, n = 1)) > 0) { | |
216 # if (is_url(line)) { | |
217 # url_list <- c(url_list, list(list(href = trimws(line)))) | |
218 # } | |
219 #} | |
220 con <- file(inputParameters[[key]], "r") | |
221 lines <- readLines(con) | |
222 print("--------------------------------------------------------------------1") | |
223 print(length(lines)) | |
224 close(con) | |
225 if (!length(lines) > 1 && endsWith(lines, ".jp2") && startsWith(lines, "https")) { | |
226 print("--------------------------------------------------------------------2") | |
227 tmp <- list() | |
228 tmp$href <- lines | |
229 tmp$type <- "image/jp2" | |
230 inputParameters[[key]] <- tmp | |
231 } | |
232 else if (!length(lines) > 1 && endsWith(lines, ".SAFE") && startsWith(lines, "s3:")) { | |
233 print("--------------------------------------------------------------------3") | |
234 json_string <- paste(lines, collapse = "\n") | |
235 inputParameters[[key]] <- json_string | |
236 } else if (inputParameters$select_process == "plot-image" || | |
237 inputParameters$select_process == "reproject-image") { | |
238 print("--------------------------------------------------------------------4") | |
239 tmp <- list() | |
240 tmp$href <- lines | |
241 tmp$type <- "image/tiff; application=geotiff" | |
242 if (inputParameters$select_process == "reproject-image") { | |
243 tmp$type <- "image/tiff; subtype=geotiff" | |
244 } | |
245 inputParameters[[key]] <- tmp | |
246 } else { | |
247 print("-----------------------------------5") | |
248 json_string <- paste(lines, collapse = "\n") | |
249 json_data <- fromJSON(json_string) | |
250 inputParameters[[key]] <- json_data | |
251 } | |
252 convertedKeys <- append(convertedKeys, key) | |
253 } | |
254 else if (grepl("_Array_", key)) { | |
255 keyParts <- strsplit(key, split = "_")[[1]] | |
256 type <- keyParts[length(keyParts)] | |
257 values <- inputParameters[[key]] | |
258 value_list <- strsplit(values, split = ",") | |
259 convertedValues <- c() | |
260 | |
261 for (value in value_list) { | |
262 if (type == "integer") { | |
263 value <- as.integer(value) | |
264 } else if (type == "numeric") { | |
265 value <- as.numeric(balue) | |
266 } else if (type == "character") { | |
267 value <- as.character(value) | |
268 } | |
269 convertedValues <- append(convertedValues, value) | |
270 | |
271 convertedKey <- "" | |
272 for (part in keyParts) { | |
273 if (part == "Array") { | |
274 break | |
275 } | |
276 convertedKey <- | |
277 paste(convertedKey, paste(part, "_", sep = ""), sep = "") | |
278 } | |
279 convertedKey <- substr(convertedKey, 1, nchar(convertedKey) - 1) | |
280 } | |
281 | |
282 inputParameters[[key]] <- convertedValues | |
283 #print("-------------------------") | |
284 #print(convertedValues) | |
285 #print("-------------------------") | |
286 convertedKeys <- append(convertedKeys, convertedKey) | |
287 } else { | |
288 #print("-------------------------") | |
289 #print(key) | |
290 #print(inputParameters[[key]]) | |
291 if (!is.null(inputParameters[[key]])) { | |
292 convertedKeys <- append(convertedKeys, key) | |
293 } | |
294 #print("-------------------------") | |
295 | |
296 } | |
297 } | |
298 #print(inputParameters) | |
299 names(inputParameters) <- convertedKeys | |
300 #print(inputParameters) | |
301 print("--> Inputs parsed") | |
302 | |
303 print("--> Prepare process execution") | |
304 jsonData <- list("inputs" = inputParameters, | |
305 "outputs" = outputs) | |
306 | |
307 cookie <- inputParameters$cookie | |
308 | |
309 print("--> Execute process") | |
310 jobID <- | |
311 executeProcess(server, inputParameters$select_process, jsonData, cookie) | |
312 print("--> Process executed") | |
313 | |
314 print("--> Retrieve results") | |
315 retrieveResults(server, | |
316 inputParameters$select_process, | |
317 jobID, | |
318 outputLocation, | |
319 cookie) | |
320 print("--> Results retrieved") |