Source code for sta2stac.extensions.datacube

# SPDX-FileCopyrightText: 2023 Karlsruher Institut für Technologie
#
# SPDX-License-Identifier: CC0-1.0
from typing import Optional

import pystac
from pystac.extensions.datacube import (
    DatacubeExtension,
    Dimension,
    DimensionType,
    Variable,
    VariableType,
)

from ..logger import Logger


class Datacube:
    """
    Add the STAC datacube extension to STAC items.

    Args:
        logger_properties (dict, optional): A dictionary containing the
            logger properties. It is updated in place with ``logger_level``
            and ``logger_msg`` before an error is handed to ``Logger``.
            When omitted, a fresh dictionary is created for this instance.
    """

    logger_properties: dict
    """
    A dictionary containing the logger properties. By default it is an
    empty dictionary owned by the instance.
    """

    def __init__(self, logger_properties: Optional[dict] = None):
        # A ``dict()`` default is evaluated once and shared by every
        # instance. Because ``item`` writes into this dictionary, a shared
        # default would leak error state between unrelated instances.
        # A dictionary passed by the caller is kept as-is (same object).
        self.logger_properties = (
            {} if logger_properties is None else logger_properties
        )

    @staticmethod
    def _per_variable(values, expected_len: int) -> list:
        """
        Align an optional per-variable attribute list with the variables.

        Args:
            values: The harvested list (e.g. descriptions or units), or
                ``None`` when nothing was harvested.
            expected_len (int): The number of variables.

        Returns:
            ``values`` itself when it holds exactly one entry per variable,
            otherwise a list of ``expected_len`` ``None`` entries so that
            the attribute is skipped for every variable.
        """
        if values is None or len(values) != expected_len:
            return [None] * expected_len
        return values

    def _log_error(self, message: str) -> None:
        """Store ``message`` as an ERROR in the logger properties and log it."""
        self.logger_properties["logger_level"] = "ERROR"
        self.logger_properties["logger_msg"] = message
        Logger(self.logger_properties)

    def item(
        self, item: "pystac.Item", harvesting_vars: Optional[dict] = None
    ):
        """
        Add datacube extension to the STAC item.

        One ``Variable`` is created per entry of ``item_variable_names``
        and, when ``item_datetime`` is available, a temporal ``time``
        dimension. Both are applied to the item's datacube extension, even
        when one of them ended up empty because of an error.

        Args:
            item (pystac.Item): A STAC item.
            harvesting_vars (dict): A dictionary containing the harvesting
                variables. ``item_variable_names`` and
                ``item_variable_dimensions`` are required;
                ``item_variable_descriptions``, ``item_variable_units`` and
                ``item_datetime`` are optional. The key
                ``item_datetime_str`` is added to this dictionary when
                ``item_datetime`` is present.

        Raises:
            KeyError: If ``item_variable_dimensions`` or
                ``item_variable_names`` is missing from ``harvesting_vars``.
        """
        if harvesting_vars is None:
            harvesting_vars = {}

        variables: dict = {}  # gathers the Variable objects by name
        dimensions: dict = {}  # gathers the Dimension objects by name
        cube = DatacubeExtension.ext(item, add_if_missing=True)

        variable_dimensions = harvesting_vars["item_variable_dimensions"]
        variable_names = harvesting_vars["item_variable_names"]

        if len(variable_names) != len(variable_dimensions):
            # Names and dimensions cannot be paired up, so no variable is
            # added. The error is logged here as well, so the mismatch is
            # reported instead of only being stored in the properties.
            self._log_error(
                "The length of the variable list and the dimension list "
                "are not equal. Check your data in STA."
            )
        else:
            # Descriptions and units are not required: a missing key, a
            # ``None`` value or a list of the wrong length is ignored.
            count = len(variable_names)
            descriptions = self._per_variable(
                harvesting_vars.get("item_variable_descriptions"), count
            )
            units = self._per_variable(
                harvesting_vars.get("item_variable_units"), count
            )
            for name, dims, description, unit in zip(
                variable_names, variable_dimensions, descriptions, units
            ):
                variable_dict = {
                    "dimensions": dims,
                    "type": VariableType.DATA.value,
                }
                if description is not None:
                    variable_dict["description"] = description
                if unit is not None:
                    variable_dict["units"] = unit
                variables[name] = Variable(variable_dict)

        # Temporal Dimension
        datetimes = harvesting_vars.get("item_datetime")
        if datetimes is not None:
            # The formatted strings are also stored back into
            # ``harvesting_vars`` under ``item_datetime_str``.
            harvesting_vars["item_datetime_str"] = [
                datetime_elem.strftime("%Y-%m-%dT%H:%M:%SZ")
                for datetime_elem in datetimes
            ]
            dimensions["time"] = Dimension(
                {
                    "type": DimensionType.TEMPORAL.value,
                    "description": "time dimension",
                    "extent": harvesting_vars["item_datetime_str"],
                }
            )
        else:
            self._log_error(
                "Required attributes are not involved in the output "
                "dictionary. Check your TS data in STA. "
            )

        cube.apply(dimensions=dimensions, variables=variables)