view aquainfra_ogc_api_processes.R @ 9:7b003d6604f8 draft default tip

planemo upload for repository https://github.com/AquaINFRA/tools-ecology/tree/master commit b3d67b03e9bde6f6b1e2a48b8d5664129d3c57d5
author ecology
date Mon, 18 Aug 2025 10:12:19 +0000
parents aec309f44d47
children
line wrap: on
line source

library("httr2")
library("jsonlite")
library("getopt")

cat("START GENERIC WRAPPER SERVICE \n")

remove_null_values <- function(x) {
  if (is.list(x)) {
    x <- lapply(x, remove_null_values)
    x <- x[!sapply(x, is.null)]
  }
  return(x)
}

getParameters <- function() {
  con <- file("inputs.json", "r")
  lines <- readLines(con)
  close(con)

  json_string <- paste(lines, collapse = "\n")
  json_data <- fromJSON(json_string)

  cleaned_json_data <- remove_null_values(json_data)
  return(cleaned_json_data$conditional_process)
}

parseResponseBody <- function(body) {
  hex <- c(body)
  intValues <- as.integer(hex)
  rawVector <- as.raw(intValues)
  readableOutput <- rawToChar(rawVector)
  jsonObject <- jsonlite::fromJSON(readableOutput)
  return(jsonObject)
}

executeProcess <- function(url, process, requestBodyData) {
  url <-
    paste(paste(paste(url, "processes/", sep = ""), process, sep = ""), "/execution", sep = "")
  requestBodyData$inputs$select_process <- NULL
  
  body <- list()
  body$inputs <- requestBodyData$inputs
  
  response <- request(url) %>%
    req_headers("Content-Type" = "application/json",
                "Prefer" = "respond-async") %>%
    req_body_json(body) %>%
    req_perform()
  
  cat("\n 3.1: Process executed")
  cat("\n 3.1: Status code: ", response$status_code)
  jobId <- parseResponseBody(response$body)$jobID
  cat("\n 3.1: Job ID: ", jobId, "\n")
  
  return(jobId)
}

checkJobStatus <- function(server, process, jobID) {
  url <- paste0(server, "jobs/", jobID)
  response <- request(url) %>%
    req_perform()
  jobStatus <- parseResponseBody(response$body)$status
  jobProgress <- parseResponseBody(response$body)$progress
  return(jobStatus)
}

getStatusCode <- function(url) {
  response <- request(url) %>%
    req_perform()
  status_code <- response$status_code
  return(status_code)
}

getResult <- function (server, process, jobID) {
  response <-
    request(paste0(server, "jobs/", jobID, "/results?f=json")) %>%
    req_perform()
  return(response)
}

findHref <- function(obj) {
  hrefs <- c()
  if (is.list(obj)) {
    for (name in names(obj)) {
      element <- obj[[name]]
      if (is.list(element)) {
        hrefs <- c(hrefs, findHref(element))
      } else if (name == "href") {
        hrefs <- c(hrefs, element)
      }
    }
  }
  return(hrefs)
}

retrieveResults <- function(server, process, jobID, outputData) {
  url <- paste0(server, "jobs/", jobID)
  cat(" 4.1: Job URL: ", url)
  status_code <- getStatusCode(url)
  cat("\n 4.2: Status code: ", status_code, "\n")
  
  if (status_code == 200) {
    status <- "running"
    
    while (status == "running") {
      jobStatus <- checkJobStatus(server, process, jobID)
      cat(" 4.3: Job status: ", jobStatus, "\n")
      
      if (jobStatus == "successful") {
        status <- jobStatus
        result <- getResult(server, process, jobID)
        
        if (result$status_code == 200) {
          resultBody <- parseResponseBody(result$body)
          cat("\n 4.4 Outputs: \n")
          print(resultBody)
          hrefs <- findHref(resultBody)
          
          if (length(hrefs) > 0) {
            urls_with_newline <- paste(hrefs, collapse = "\n")
            con <- file(outputData, "w")
            writeLines(urls_with_newline, con = con)
            close(con)
          } else {
            stop(paste0("Job failed. No hrefs found. See details at: ", server, "jobs/", jobID))
          }
        }
      } else if (jobStatus == "failed") {
        stop(paste0("Job failed. See details at: ", server, "jobs/", jobID))
      }
      Sys.sleep(3)
    }
    
  } else if (status_code1 == 400) {
    print("A query parameter has an invalid value.")
  } else if (status_code1 == 404) {
    print("The requested URI was not found.")
  } else if (status_code1 == 500) {
    print("The requested URI was not found.")
  } else {
    print(paste("HTTP", status_code1, "Error:", resp1$status_message))
  }
}

saveResult <- function(href, outputData) {
  con <- file(outputData, "w")
    writeLines(href, con = con)
  close(con)
}

is_url <- function(x) {
  grepl("^https?://", x)
}

server <- "https://aquainfra.ogc.igb-berlin.de/pygeoapi/"

cat("\n1: START RETRIEVING PARAMETERS\n\n")
inputParameters <- getParameters()
print(inputParameters)
cat("1: END RETRIEVING PARAMETERS\n")

args <- commandArgs(trailingOnly = TRUE)
outputLocation <- args[2]

cat("\n2: START PARSING INPUTS\n\n")
convertedKeys <- c()

for (key in names(inputParameters)) {
  value_no_spaces <- gsub(" ", "", inputParameters[[key]])
  if (value_no_spaces == "") {
    inputParameters[[key]] <- NULL
  }

  if (is.character(inputParameters[[key]]) &&
      (endsWith(inputParameters[[key]], ".dat") ||
       endsWith(inputParameters[[key]], ".txt"))) {
    con <- file(inputParameters[[key]], "r")
    url_list <- list()

    con <- file(inputParameters[[key]], "r")
    lines <- readLines(con)
    close(con)

    json_string <- paste(lines, collapse = ",")
    inputParameters[[key]] <- json_string

    convertedKeys <- append(convertedKeys, key)
  } else if (
      grepl("_object", key) &&
      !is.null(inputParameters[[key]]) &&
      !is.na(inputParameters[[key]]) &&
      gsub(" ", "", inputParameters[[key]]) != ""
    ) {
    decoded_value <- gsub("__oc__", "{", inputParameters[[key]])
    decoded_value <- gsub("__cc__", "}", decoded_value)
    decoded_value <- gsub("__ob__", "[", decoded_value)
    decoded_value <- gsub("__cb__", "]", decoded_value)
    decoded_value <- gsub("__dq__", "\"", decoded_value)  # Optional: sometimes used for quotes
    decoded_value <- gsub("__cn__", ":", decoded_value)   # Optional: used for colon in older versions

    parsed_json <- fromJSON(decoded_value)
    convertedKey <- gsub("_object", "", key)
    convertedKeys <- append(convertedKeys, convertedKey)
    #json_string <- toJSON(parsed_json, auto_unbox = FALSE)
    inputParameters[[key]] <- parsed_json
  } else if (grepl("_array", key)) {
      keyParts <- strsplit(key, split = "_")[[1]]
      type <- keyParts[length(keyParts)]
      values <- inputParameters[[key]]
      if (is.character(values) && grepl(",", values)) {
        value_list <- unlist(strsplit(values, split = ","))
      } else if (is.character(values)) {
        value_list <- c(values)
      }
      
      convertedValues <- c()

      for (value in value_list) {
        value <- as.character(value)
        value <- gsub(" ", "", value)
        #if(type == "integer") {
        #  value <- as.integer(value)
        #} else if (type == "numeric") {
        #  value <- as.numeric(balue)
        #} else if (type == "character") {
        #  value <- as.character(value)
        #  value <- gsub(" ", "", value)
        #}
        convertedValues <- append(convertedValues, value)
        convertedKey <- ""
        for (part in keyParts) {
          if(part == "array") {
            break
          }
          convertedKey <- paste(convertedKey, paste(part, "_", sep=""), sep="")
        }
        convertedKey <- substr(convertedKey, 1, nchar(convertedKey)-1)
      }
      inputParameters[[key]] <- convertedValues
      convertedKeys <- append(convertedKeys, convertedKey)
  }
    else {
    if (!is.null(inputParameters[[key]])) {
      convertedKeys <- append(convertedKeys, key)
    }
  }
}
names(inputParameters) <- convertedKeys
print(inputParameters)
cat("2: END PARSING INPUTSs\n")

cat("\n3: START EXECUTING PROCESS\n")
jsonData <- list("inputs" = inputParameters)
jobId <- executeProcess(server, inputParameters$select_process, jsonData)
cat("\n3: END EXECUTING PROCESS\n")

cat("\n4: START RETRIEVING RESULTS\n\n")
retrieveResults(server, inputParameters$select_process, jobId, outputLocation)
cat("4: END RETRIEVING RESULTS\n")

cat("\n5: DONE.")