comparison aquainfra_ogc_api_processes.R @ 7:c805cb663bac draft default tip

planemo upload for repository https://github.com/AquaINFRA/tools-ecology/tree/master commit 176da50f4af1ff7f4e0e325984c8edd27fe93926
author ecology
date Mon, 21 Jul 2025 06:19:08 +0000
parents e1f41a25ff24
children
comparison
equal deleted inserted replaced
6:8f9d34c780de 7:c805cb663bac
1 library("httr2") 1 library("httr2")
2 library("jsonlite") 2 library("jsonlite")
3 library("getopt") 3 library("getopt")
4 4
5 cat("start generic wrapper service \n") 5 cat("START GENERIC WRAPPER SERVICE \n")
6 6
7 remove_null_values <- function(x) { 7 remove_null_values <- function(x) {
8 if (is.list(x)) { 8 if (is.list(x)) {
9 x <- lapply(x, remove_null_values) 9 x <- lapply(x, remove_null_values)
10 x <- x[!sapply(x, is.null)] 10 x <- x[!sapply(x, is.null)]
31 readableOutput <- rawToChar(rawVector) 31 readableOutput <- rawToChar(rawVector)
32 jsonObject <- jsonlite::fromJSON(readableOutput) 32 jsonObject <- jsonlite::fromJSON(readableOutput)
33 return(jsonObject) 33 return(jsonObject)
34 } 34 }
35 35
36 getOutputs <- function(inputs, output, server) {
37 url <-
38 paste(paste(server, "processes/", sep = ""),
39 inputs$select_process,
40 sep = "")
41 print(url)
42 request <- request(url)
43 response <- req_perform(request)
44 responseBody <- parseResponseBody(response$body)
45 outputs <- list()
46
47 for (x in 1:length(responseBody$outputs)) {
48 outputformatName <-
49 paste(names(responseBody$outputs[x]), "_outformat", sep = "")
50 output_item <- list()
51
52 for (p in names(inputs)) {
53 if (p == outputformatName) {
54 format <- list("mediaType" = inputs[[outputformatName]])
55 output_item$format <- format
56 }
57 }
58 output_item$transmissionMode <- "reference"
59 outputs[[x]] <- output_item
60 }
61
62 names(outputs) <- names(responseBody$outputs)
63 return(outputs)
64 }
65
66 executeProcess <- function(url, process, requestBodyData) { 36 executeProcess <- function(url, process, requestBodyData) {
67 url <- 37 url <-
68 paste(paste(paste(url, "processes/", sep = ""), process, sep = ""), "/execution", sep = "") 38 paste(paste(paste(url, "processes/", sep = ""), process, sep = ""), "/execution", sep = "")
69 requestBodyData$inputs$select_process <- NULL 39 requestBodyData$inputs$select_process <- NULL
70 40
75 req_headers("Content-Type" = "application/json", 45 req_headers("Content-Type" = "application/json",
76 "Prefer" = "respond-async") %>% 46 "Prefer" = "respond-async") %>%
77 req_body_json(body) %>% 47 req_body_json(body) %>%
78 req_perform() 48 req_perform()
79 49
80 cat("\n Process executed") 50 cat("\n 3.1: Process executed")
81 cat("\n status: ", response$status_code) 51 cat("\n 3.1: Status code: ", response$status_code)
82 jobId <- parseResponseBody(response$body)$jobID 52 jobId <- parseResponseBody(response$body)$jobID
53 cat("\n 3.1: Job ID: ", jobId, "\n")
83 54
84 return(jobId) 55 return(jobId)
85 } 56 }
86 57
87 checkJobStatus <- function(server, process, jobID) { 58 checkJobStatus <- function(server, process, jobID) {
91 jobStatus <- parseResponseBody(response$body)$status 62 jobStatus <- parseResponseBody(response$body)$status
92 jobProgress <- parseResponseBody(response$body)$progress 63 jobProgress <- parseResponseBody(response$body)$progress
93 return(jobStatus) 64 return(jobStatus)
94 } 65 }
95 66
96 getStatusCode <- function(server, process, jobID) { 67 getStatusCode <- function(url) {
97 url <- paste0(server, "jobs/", jobID)
98 print(url)
99 response <- request(url) %>% 68 response <- request(url) %>%
100 req_perform() 69 req_perform()
101 status_code <- response$status_code 70 status_code <- response$status_code
102 return(status_code) 71 return(status_code)
103 } 72 }
123 } 92 }
124 return(hrefs) 93 return(hrefs)
125 } 94 }
126 95
127 retrieveResults <- function(server, process, jobID, outputData) { 96 retrieveResults <- function(server, process, jobID, outputData) {
128 status_code <- getStatusCode(server, process, jobID) 97 url <- paste0(server, "jobs/", jobID)
129 print(status_code) 98 cat(" 4.1: Job URL: ", url)
99 status_code <- getStatusCode(url)
100 cat("\n 4.2: Status code: ", status_code, "\n")
130 101
131 if (status_code == 200) { 102 if (status_code == 200) {
132 status <- "running" 103 status <- "running"
133 104
134 while (status == "running") { 105 while (status == "running") {
135 jobStatus <- checkJobStatus(server, process, jobID) 106 jobStatus <- checkJobStatus(server, process, jobID)
136 print(jobStatus) 107 cat(" 4.3: Job status: ", jobStatus, "\n")
137 108
138 if (jobStatus == "successful") { 109 if (jobStatus == "successful") {
139 status <- jobStatus 110 status <- jobStatus
140 result <- getResult(server, process, jobID) 111 result <- getResult(server, process, jobID)
141 112
142 if (result$status_code == 200) { 113 if (result$status_code == 200) {
143 resultBody <- parseResponseBody(result$body) 114 resultBody <- parseResponseBody(result$body)
115 cat("\n 4.4 Outputs: \n")
144 print(resultBody) 116 print(resultBody)
145 hrefs <- findHref(resultBody) 117 hrefs <- findHref(resultBody)
146 118
147 if (length(hrefs) > 0) { 119 if (length(hrefs) > 0) {
148 urls_with_newline <- paste(hrefs, collapse = "\n") 120 urls_with_newline <- paste(hrefs, collapse = "\n")
149 print(urls_with_newline)
150 con <- file(outputData, "w") 121 con <- file(outputData, "w")
151 writeLines(urls_with_newline, con = con) 122 writeLines(urls_with_newline, con = con)
152 close(con) 123 close(con)
153 } else { 124 } else {
154 print("No hrefs found.") 125 stop(paste0("Job failed. No hrefs found. See details at: ", server, "jobs/", jobID))
155 } 126 }
156 } 127 }
157 } else if (jobStatus == "failed") { 128 } else if (jobStatus == "failed") {
158 status <- jobStatus 129 stop(paste0("Job failed. See details at: ", server, "jobs/", jobID))
159 } 130 }
160 Sys.sleep(3) 131 Sys.sleep(3)
161 } 132 }
162
163 cat("\n done \n")
164 133
165 } else if (status_code1 == 400) { 134 } else if (status_code1 == 400) {
166 print("A query parameter has an invalid value.") 135 print("A query parameter has an invalid value.")
167 } else if (status_code1 == 404) { 136 } else if (status_code1 == 404) {
168 print("The requested URI was not found.") 137 print("The requested URI was not found.")
183 grepl("^https?://", x) 152 grepl("^https?://", x)
184 } 153 }
185 154
186 server <- "https://aquainfra.ogc.igb-berlin.de/pygeoapi/" 155 server <- "https://aquainfra.ogc.igb-berlin.de/pygeoapi/"
187 156
188 print("--> Retrieve parameters") 157 cat("\n1: START RETRIEVING PARAMETERS\n\n")
189 inputParameters <- getParameters() 158 inputParameters <- getParameters()
190 print("--> Parameters retrieved") 159 print(inputParameters)
160 cat("1: END RETRIEVING PARAMETERS\n")
191 161
192 args <- commandArgs(trailingOnly = TRUE) 162 args <- commandArgs(trailingOnly = TRUE)
193 outputLocation <- args[2] 163 outputLocation <- args[2]
194 164
195 print("--> Retrieve outputs") 165 cat("\n2: START PARSING INPUTS\n\n")
196 outputs <- getOutputs(inputParameters, outputLocation, server)
197 print("--> Outputs retrieved")
198
199 print("--> Parse inputs")
200
201 convertedKeys <- c() 166 convertedKeys <- c()
202 167
203 for (key in names(inputParameters)) { 168 for (key in names(inputParameters)) {
204 if (is.character(inputParameters[[key]]) && 169 if (is.character(inputParameters[[key]]) &&
205 (endsWith(inputParameters[[key]], ".dat") || 170 (endsWith(inputParameters[[key]], ".dat") ||
207 con <- file(inputParameters[[key]], "r") 172 con <- file(inputParameters[[key]], "r")
208 url_list <- list() 173 url_list <- list()
209 174
210 con <- file(inputParameters[[key]], "r") 175 con <- file(inputParameters[[key]], "r")
211 lines <- readLines(con) 176 lines <- readLines(con)
212 print(length(lines))
213 close(con) 177 close(con)
214 178
215 json_string <- paste(lines, collapse = "\n") 179 json_string <- paste(lines, collapse = ",")
216 inputParameters[[key]] <- json_string 180 inputParameters[[key]] <- json_string
217 181
218 convertedKeys <- append(convertedKeys, key) 182 convertedKeys <- append(convertedKeys, key)
219 }
220 else if (grepl("_Array_", key)) {
221 keyParts <- strsplit(key, split = "_")[[1]]
222 type <- keyParts[length(keyParts)]
223 values <- inputParameters[[key]]
224 value_list <- strsplit(values, split = ",")
225 convertedValues <- c()
226
227 for (value in value_list) {
228 if (type == "integer") {
229 value <- as.integer(value)
230 } else if (type == "numeric") {
231 value <- as.numeric(value)
232 } else if (type == "character") {
233 value <- as.character(value)
234 }
235 convertedValues <- append(convertedValues, value)
236
237 convertedKey <- ""
238 for (part in keyParts) {
239 if (part == "Array") {
240 break
241 }
242 convertedKey <-
243 paste(convertedKey, paste(part, "_", sep = ""), sep = "")
244 }
245 convertedKey <- substr(convertedKey, 1, nchar(convertedKey) - 1)
246 }
247
248 inputParameters[[key]] <- convertedValues
249 convertedKeys <- append(convertedKeys, convertedKey)
250 } else { 183 } else {
251 if (!is.null(inputParameters[[key]])) { 184 if (!is.null(inputParameters[[key]])) {
252 convertedKeys <- append(convertedKeys, key) 185 convertedKeys <- append(convertedKeys, key)
253 } 186 }
254 } 187 }
255 } 188 }
189 names(inputParameters) <- convertedKeys
256 print(inputParameters) 190 print(inputParameters)
257 names(inputParameters) <- convertedKeys 191 cat("2: END PARSING INPUTSs\n")
258 print("--> Inputs parsed") 192
259 193 cat("\n3: START EXECUTING PROCESS\n")
260 print("--> Prepare process execution") 194 jsonData <- list("inputs" = inputParameters)
261 jsonData <- list("inputs" = inputParameters,
262 "outputs" = outputs)
263
264 print("--> Execute process")
265 jobId <- executeProcess(server, inputParameters$select_process, jsonData) 195 jobId <- executeProcess(server, inputParameters$select_process, jsonData)
266 print("--> Process executed") 196 cat("\n3: END EXECUTING PROCESS\n")
267 197
268 print("--> Retrieve results") 198 cat("\n4: START RETRIEVING RESULTS\n\n")
269 retrieveResults(server, inputParameters$select_process, jobId, outputLocation) 199 retrieveResults(server, inputParameters$select_process, jobId, outputLocation)
270 print("--> Results retrieved") 200 cat("4: END RETRIEVING RESULTS\n")
201
202 cat("\n5: DONE.")