Mercurial > repos > ecology > aquainfra_ogc_api_processes
comparison aquainfra_ogc_api_processes.R @ 7:c805cb663bac draft default tip
planemo upload for repository https://github.com/AquaINFRA/tools-ecology/tree/master commit 176da50f4af1ff7f4e0e325984c8edd27fe93926
author | ecology |
---|---|
date | Mon, 21 Jul 2025 06:19:08 +0000 |
parents | e1f41a25ff24 |
children |
comparison
equal
deleted
inserted
replaced
6:8f9d34c780de | 7:c805cb663bac |
---|---|
1 library("httr2") | 1 library("httr2") |
2 library("jsonlite") | 2 library("jsonlite") |
3 library("getopt") | 3 library("getopt") |
4 | 4 |
5 cat("start generic wrapper service \n") | 5 cat("START GENERIC WRAPPER SERVICE \n") |
6 | 6 |
7 remove_null_values <- function(x) { | 7 remove_null_values <- function(x) { |
8 if (is.list(x)) { | 8 if (is.list(x)) { |
9 x <- lapply(x, remove_null_values) | 9 x <- lapply(x, remove_null_values) |
10 x <- x[!sapply(x, is.null)] | 10 x <- x[!sapply(x, is.null)] |
31 readableOutput <- rawToChar(rawVector) | 31 readableOutput <- rawToChar(rawVector) |
32 jsonObject <- jsonlite::fromJSON(readableOutput) | 32 jsonObject <- jsonlite::fromJSON(readableOutput) |
33 return(jsonObject) | 33 return(jsonObject) |
34 } | 34 } |
35 | 35 |
36 getOutputs <- function(inputs, output, server) { | |
37 url <- | |
38 paste(paste(server, "processes/", sep = ""), | |
39 inputs$select_process, | |
40 sep = "") | |
41 print(url) | |
42 request <- request(url) | |
43 response <- req_perform(request) | |
44 responseBody <- parseResponseBody(response$body) | |
45 outputs <- list() | |
46 | |
47 for (x in 1:length(responseBody$outputs)) { | |
48 outputformatName <- | |
49 paste(names(responseBody$outputs[x]), "_outformat", sep = "") | |
50 output_item <- list() | |
51 | |
52 for (p in names(inputs)) { | |
53 if (p == outputformatName) { | |
54 format <- list("mediaType" = inputs[[outputformatName]]) | |
55 output_item$format <- format | |
56 } | |
57 } | |
58 output_item$transmissionMode <- "reference" | |
59 outputs[[x]] <- output_item | |
60 } | |
61 | |
62 names(outputs) <- names(responseBody$outputs) | |
63 return(outputs) | |
64 } | |
65 | |
66 executeProcess <- function(url, process, requestBodyData) { | 36 executeProcess <- function(url, process, requestBodyData) { |
67 url <- | 37 url <- |
68 paste(paste(paste(url, "processes/", sep = ""), process, sep = ""), "/execution", sep = "") | 38 paste(paste(paste(url, "processes/", sep = ""), process, sep = ""), "/execution", sep = "") |
69 requestBodyData$inputs$select_process <- NULL | 39 requestBodyData$inputs$select_process <- NULL |
70 | 40 |
75 req_headers("Content-Type" = "application/json", | 45 req_headers("Content-Type" = "application/json", |
76 "Prefer" = "respond-async") %>% | 46 "Prefer" = "respond-async") %>% |
77 req_body_json(body) %>% | 47 req_body_json(body) %>% |
78 req_perform() | 48 req_perform() |
79 | 49 |
80 cat("\n Process executed") | 50 cat("\n 3.1: Process executed") |
81 cat("\n status: ", response$status_code) | 51 cat("\n 3.1: Status code: ", response$status_code) |
82 jobId <- parseResponseBody(response$body)$jobID | 52 jobId <- parseResponseBody(response$body)$jobID |
53 cat("\n 3.1: Job ID: ", jobId, "\n") | |
83 | 54 |
84 return(jobId) | 55 return(jobId) |
85 } | 56 } |
86 | 57 |
87 checkJobStatus <- function(server, process, jobID) { | 58 checkJobStatus <- function(server, process, jobID) { |
91 jobStatus <- parseResponseBody(response$body)$status | 62 jobStatus <- parseResponseBody(response$body)$status |
92 jobProgress <- parseResponseBody(response$body)$progress | 63 jobProgress <- parseResponseBody(response$body)$progress |
93 return(jobStatus) | 64 return(jobStatus) |
94 } | 65 } |
95 | 66 |
96 getStatusCode <- function(server, process, jobID) { | 67 getStatusCode <- function(url) { |
97 url <- paste0(server, "jobs/", jobID) | |
98 print(url) | |
99 response <- request(url) %>% | 68 response <- request(url) %>% |
100 req_perform() | 69 req_perform() |
101 status_code <- response$status_code | 70 status_code <- response$status_code |
102 return(status_code) | 71 return(status_code) |
103 } | 72 } |
123 } | 92 } |
124 return(hrefs) | 93 return(hrefs) |
125 } | 94 } |
126 | 95 |
127 retrieveResults <- function(server, process, jobID, outputData) { | 96 retrieveResults <- function(server, process, jobID, outputData) { |
128 status_code <- getStatusCode(server, process, jobID) | 97 url <- paste0(server, "jobs/", jobID) |
129 print(status_code) | 98 cat(" 4.1: Job URL: ", url) |
99 status_code <- getStatusCode(url) | |
100 cat("\n 4.2: Status code: ", status_code, "\n") | |
130 | 101 |
131 if (status_code == 200) { | 102 if (status_code == 200) { |
132 status <- "running" | 103 status <- "running" |
133 | 104 |
134 while (status == "running") { | 105 while (status == "running") { |
135 jobStatus <- checkJobStatus(server, process, jobID) | 106 jobStatus <- checkJobStatus(server, process, jobID) |
136 print(jobStatus) | 107 cat(" 4.3: Job status: ", jobStatus, "\n") |
137 | 108 |
138 if (jobStatus == "successful") { | 109 if (jobStatus == "successful") { |
139 status <- jobStatus | 110 status <- jobStatus |
140 result <- getResult(server, process, jobID) | 111 result <- getResult(server, process, jobID) |
141 | 112 |
142 if (result$status_code == 200) { | 113 if (result$status_code == 200) { |
143 resultBody <- parseResponseBody(result$body) | 114 resultBody <- parseResponseBody(result$body) |
115 cat("\n 4.4 Outputs: \n") | |
144 print(resultBody) | 116 print(resultBody) |
145 hrefs <- findHref(resultBody) | 117 hrefs <- findHref(resultBody) |
146 | 118 |
147 if (length(hrefs) > 0) { | 119 if (length(hrefs) > 0) { |
148 urls_with_newline <- paste(hrefs, collapse = "\n") | 120 urls_with_newline <- paste(hrefs, collapse = "\n") |
149 print(urls_with_newline) | |
150 con <- file(outputData, "w") | 121 con <- file(outputData, "w") |
151 writeLines(urls_with_newline, con = con) | 122 writeLines(urls_with_newline, con = con) |
152 close(con) | 123 close(con) |
153 } else { | 124 } else { |
154 print("No hrefs found.") | 125 stop(paste0("Job failed. No hrefs found. See details at: ", server, "jobs/", jobID)) |
155 } | 126 } |
156 } | 127 } |
157 } else if (jobStatus == "failed") { | 128 } else if (jobStatus == "failed") { |
158 status <- jobStatus | 129 stop(paste0("Job failed. See details at: ", server, "jobs/", jobID)) |
159 } | 130 } |
160 Sys.sleep(3) | 131 Sys.sleep(3) |
161 } | 132 } |
162 | |
163 cat("\n done \n") | |
164 | 133 |
165 } else if (status_code1 == 400) { | 134 } else if (status_code1 == 400) { |
166 print("A query parameter has an invalid value.") | 135 print("A query parameter has an invalid value.") |
167 } else if (status_code1 == 404) { | 136 } else if (status_code1 == 404) { |
168 print("The requested URI was not found.") | 137 print("The requested URI was not found.") |
183 grepl("^https?://", x) | 152 grepl("^https?://", x) |
184 } | 153 } |
185 | 154 |
186 server <- "https://aquainfra.ogc.igb-berlin.de/pygeoapi/" | 155 server <- "https://aquainfra.ogc.igb-berlin.de/pygeoapi/" |
187 | 156 |
188 print("--> Retrieve parameters") | 157 cat("\n1: START RETRIEVING PARAMETERS\n\n") |
189 inputParameters <- getParameters() | 158 inputParameters <- getParameters() |
190 print("--> Parameters retrieved") | 159 print(inputParameters) |
160 cat("1: END RETRIEVING PARAMETERS\n") | |
191 | 161 |
192 args <- commandArgs(trailingOnly = TRUE) | 162 args <- commandArgs(trailingOnly = TRUE) |
193 outputLocation <- args[2] | 163 outputLocation <- args[2] |
194 | 164 |
195 print("--> Retrieve outputs") | 165 cat("\n2: START PARSING INPUTS\n\n") |
196 outputs <- getOutputs(inputParameters, outputLocation, server) | |
197 print("--> Outputs retrieved") | |
198 | |
199 print("--> Parse inputs") | |
200 | |
201 convertedKeys <- c() | 166 convertedKeys <- c() |
202 | 167 |
203 for (key in names(inputParameters)) { | 168 for (key in names(inputParameters)) { |
204 if (is.character(inputParameters[[key]]) && | 169 if (is.character(inputParameters[[key]]) && |
205 (endsWith(inputParameters[[key]], ".dat") || | 170 (endsWith(inputParameters[[key]], ".dat") || |
207 con <- file(inputParameters[[key]], "r") | 172 con <- file(inputParameters[[key]], "r") |
208 url_list <- list() | 173 url_list <- list() |
209 | 174 |
210 con <- file(inputParameters[[key]], "r") | 175 con <- file(inputParameters[[key]], "r") |
211 lines <- readLines(con) | 176 lines <- readLines(con) |
212 print(length(lines)) | |
213 close(con) | 177 close(con) |
214 | 178 |
215 json_string <- paste(lines, collapse = "\n") | 179 json_string <- paste(lines, collapse = ",") |
216 inputParameters[[key]] <- json_string | 180 inputParameters[[key]] <- json_string |
217 | 181 |
218 convertedKeys <- append(convertedKeys, key) | 182 convertedKeys <- append(convertedKeys, key) |
219 } | |
220 else if (grepl("_Array_", key)) { | |
221 keyParts <- strsplit(key, split = "_")[[1]] | |
222 type <- keyParts[length(keyParts)] | |
223 values <- inputParameters[[key]] | |
224 value_list <- strsplit(values, split = ",") | |
225 convertedValues <- c() | |
226 | |
227 for (value in value_list) { | |
228 if (type == "integer") { | |
229 value <- as.integer(value) | |
230 } else if (type == "numeric") { | |
231 value <- as.numeric(value) | |
232 } else if (type == "character") { | |
233 value <- as.character(value) | |
234 } | |
235 convertedValues <- append(convertedValues, value) | |
236 | |
237 convertedKey <- "" | |
238 for (part in keyParts) { | |
239 if (part == "Array") { | |
240 break | |
241 } | |
242 convertedKey <- | |
243 paste(convertedKey, paste(part, "_", sep = ""), sep = "") | |
244 } | |
245 convertedKey <- substr(convertedKey, 1, nchar(convertedKey) - 1) | |
246 } | |
247 | |
248 inputParameters[[key]] <- convertedValues | |
249 convertedKeys <- append(convertedKeys, convertedKey) | |
250 } else { | 183 } else { |
251 if (!is.null(inputParameters[[key]])) { | 184 if (!is.null(inputParameters[[key]])) { |
252 convertedKeys <- append(convertedKeys, key) | 185 convertedKeys <- append(convertedKeys, key) |
253 } | 186 } |
254 } | 187 } |
255 } | 188 } |
189 names(inputParameters) <- convertedKeys | |
256 print(inputParameters) | 190 print(inputParameters) |
257 names(inputParameters) <- convertedKeys | 191 cat("2: END PARSING INPUTSs\n") |
258 print("--> Inputs parsed") | 192 |
259 | 193 cat("\n3: START EXECUTING PROCESS\n") |
260 print("--> Prepare process execution") | 194 jsonData <- list("inputs" = inputParameters) |
261 jsonData <- list("inputs" = inputParameters, | |
262 "outputs" = outputs) | |
263 | |
264 print("--> Execute process") | |
265 jobId <- executeProcess(server, inputParameters$select_process, jsonData) | 195 jobId <- executeProcess(server, inputParameters$select_process, jsonData) |
266 print("--> Process executed") | 196 cat("\n3: END EXECUTING PROCESS\n") |
267 | 197 |
268 print("--> Retrieve results") | 198 cat("\n4: START RETRIEVING RESULTS\n\n") |
269 retrieveResults(server, inputParameters$select_process, jobId, outputLocation) | 199 retrieveResults(server, inputParameters$select_process, jobId, outputLocation) |
270 print("--> Results retrieved") | 200 cat("4: END RETRIEVING RESULTS\n") |
201 | |
202 cat("\n5: DONE.") |