Source code for sta2stac.harvester

# SPDX-FileCopyrightText: 2023 Karlsruher Institut für Technologie
#
# SPDX-License-Identifier: CC0-1.0


from shapely import geometry

from .analysers.item_info_retriever import ItemInfoHandler
from .analysers.processing import Processing
from .analysers.utils import Utils
from .logger import Logger


class Harvester:
    """
    A class to harvest the SensorThings API attributes.

    Args:
        logger_properties (dict): A dictionary containing the logger
            properties.
        harvesting_vars (dict): A dictionary containing the harvesting
            variables.
    """

    logger_properties: dict
    """
    A dictionary containing the logger properties.
    """
    harvesting_vars: dict
    """
    A dictionary containing the harvesting variables keys.
    """

    # Shared endings of the Location related log messages.
    _GEOMETRY_FALLBACK = (
        " So, it will look for the location in `observedArea` attribute of"
        " Datastream, and if it couldn't find any geometry there, the"
        " geometry of the item will be None."
    )
    _EXTENT_FALLBACK = (
        " So, it will look for the location in `observedArea` attribute of"
        " Datastream, and if it couldn't find any coordinate there, the"
        " spatial extent of the item will be None."
    )

    def __init__(self, logger_properties: dict, harvesting_vars: dict):
        self.logger_properties = logger_properties
        self.harvesting_vars = harvesting_vars

    def item(
        self,
        link: str,
        version: str,
        number_of_things: int,
        requests_properties: dict,
        item_tuples: "list[tuple] | None" = None,
        datacube_extension: bool = False,
        filter: str = "",
    ):
        """
        Harvest the Things in SensorThings API as STAC-Item.

        The method fills ``self.harvesting_vars`` with the URLs of the
        Thing, the item id/title/description, the geometry, bbox and
        datetime of the item and, when ``datacube_extension`` is True,
        the variable and dimension descriptions. Afterwards the
        collection footprint, bbox and datetime are updated.

        Args:
            link (str): Base URL of the SensorThings API server.
            version (str): Version segment of the API URL.
            number_of_things (int): Identifier of the Thing; it is
                inserted into ``Things(...)`` and the Observations filter.
            requests_properties (dict): Forwarded unchanged to the
                ``Utils`` request helpers.
            item_tuples (list[tuple] | None): Forwarded to
                ``ItemInfoHandler.replace_item_info``. ``None`` (the
                default) is treated as an empty list.
            datacube_extension (bool): Harvest variable names,
                descriptions, units and dimensions only when True.
            filter (str): Forwarded to the entity listing helpers and,
                when not empty, appended to the per-variable URLs.

        Returns:
            dict | None: ``self.harvesting_vars`` on success. ``None``
            when the Thing could not be opened or when harvesting the
            Datastreams was aborted after logging an error.
        """
        # A fresh list per call instead of a shared mutable default.
        if item_tuples is None:
            item_tuples = []

        # NOTE(review): the original built these URLs in two identical
        # `if filter ... else` branches, so `filter` never influenced
        # them. That behaviour is kept.
        thing_url = f"{link}/{version}/Things({number_of_things})"
        observations_url = (
            f"{link}/{version}/Observations"
            f"?$filter=Datastream/Thing/id%20eq%20%27{number_of_things}%27"
            "&$resultFormat="
        )
        datastreams_url = f"{thing_url}/Datastreams?$count=true&$top=1000"
        locations_url = f"{thing_url}/Locations?$count=true&$top=1000"

        harvested = self.harvesting_vars
        harvested["item_thing_url_json"] = thing_url
        harvested["item_all_observations_geojson_url"] = (
            observations_url + "GeoJSON"
        )
        harvested["item_all_observations_dataarray_url"] = (
            observations_url + "DataArray"
        )
        harvested["item_all_observations_csv_url"] = observations_url + "CSV"

        thing_json = self._open_entity(thing_url, requests_properties)
        datastreams_json = self._open_entity(
            datastreams_url, requests_properties
        )
        locations_json = self._open_entity(locations_url, requests_properties)

        # Without the Thing itself there is nothing to build an item from.
        if thing_json is None or thing_json == {}:
            return None

        # Harvesting the STAC-Item ID, Title and Description
        # TODO: make ID, Title, and Description in a another function
        # based on comparison of `items_tuples`
        (
            harvested["item_id"],
            harvested["item_title"],
            harvested["item_description"],
        ) = ItemInfoHandler(
            logger_properties=self.logger_properties
        ).replace_item_info(thing_json=thing_json, item_tuples=item_tuples)

        if locations_json is not None and locations_json != {}:
            self._harvest_locations(
                locations_json, thing_url, filter, requests_properties
            )

        if datastreams_json is not None and datastreams_json != {}:
            harvested_ok = self._harvest_datastreams(
                datastreams_json=datastreams_json,
                thing_url=thing_url,
                link=link,
                version=version,
                sta_filter=filter,
                requests_properties=requests_properties,
                datacube_extension=datacube_extension,
            )
            if not harvested_ok:
                return None

        self._update_footprint_and_collection()
        return self.harvesting_vars

    def _log(self, level, message):
        """Store level and message in the logger properties and log them."""
        self.logger_properties["logger_level"] = level
        self.logger_properties["logger_msg"] = message
        Logger(self.logger_properties)

    def _open_entity(self, link, requests_properties):
        """Open a SensorThings URL through `Utils.open_sta_entity_links`."""
        return Utils(
            logger_properties=self.logger_properties
        ).open_sta_entity_links(
            link=link, requests_properties=requests_properties
        )

    def _matching_entity_ids(
        self, thing_url, entity, sta_filter, requests_properties
    ):
        """Return the ids of `entity`, or None if their count disagrees."""
        query = {
            "link": thing_url,
            "entity": entity,
            "filter": sta_filter,
            "requests_properties": requests_properties,
        }
        expected = Utils(self.logger_properties).get_number_of_entities(
            **query
        )
        entity_ids = Utils(self.logger_properties).get_list_of_entities_id(
            **query
        )
        if expected != len(entity_ids):
            return None
        return entity_ids

    @staticmethod
    def _datacube_dimensions(datastream):
        """Return the dimension names of a Datastream, None if it has none."""
        has_time = datastream.get("phenomenonTime") is not None
        has_area = datastream.get("observedArea") is not None
        if has_time and has_area:
            return ["lat", "long", "time"]
        if has_time:
            return ["time"]
        if has_area:
            return ["lat", "long"]
        return None

    def _harvest_locations(
        self, locations_json, thing_url, sta_filter, requests_properties
    ):
        """Fill `item_geometry` and `item_bbox` from the Thing Locations."""
        harvested = self.harvesting_vars
        count = locations_json["@iot.count"]

        if count == 0:
            self._log(
                "WARNING",
                "The Thing does not have any Location. So, it will look for"
                " the location in `observedArea` attribute of `Datastream`,"
                " and if it couldn't find any coordinate there, the spatial"
                " extent of the item will be None.",
            )
        elif count == 1:
            # Tolerate a missing/empty `value` list or a null `location`.
            entries = locations_json.get("value") or [{}]
            location = entries[0].get("location") or {}
            if location.get("type") is not None:
                harvested["item_geometry"] = location["type"]
            else:
                self._log(
                    "WARNING",
                    "The Location of the Thing does not have any geometry"
                    " type." + self._GEOMETRY_FALLBACK,
                )
            if location.get("coordinates") is not None:
                harvested["item_bbox"] = [location["coordinates"]]
            else:
                self._log(
                    "WARNING",
                    "The Location of the Thing does not have any"
                    " coordinates." + self._EXTENT_FALLBACK,
                )
        elif count > 1:
            geometry_list = []
            bbox_list = []
            location_ids = self._matching_entity_ids(
                thing_url, "Locations", sta_filter, requests_properties
            )
            if location_ids is None:
                self._log(
                    "ERROR",
                    "The number of Locations in the Thing is not equal to"
                    " the number of Locations in the list of locations."
                    + self._GEOMETRY_FALLBACK,
                )
            else:
                for location_id in location_ids:
                    entity = self._open_entity(
                        f"{thing_url}/Locations({location_id})",
                        requests_properties,
                    )
                    location = (entity or {}).get("location") or {}
                    if location.get("type") is not None:
                        geometry_list.append(location["type"])
                    else:
                        self._log(
                            "WARNING",
                            "The Location of the Thing does not have any"
                            " geometry type in the list of locations."
                            + self._GEOMETRY_FALLBACK,
                        )
                    if location.get("coordinates") is not None:
                        bbox_list.append(location["coordinates"])
                    else:
                        # The original text wrongly said "geometry type".
                        self._log(
                            "ERROR",
                            "The Location of the Thing does not have any"
                            " coordinates in the list of locations."
                            + self._EXTENT_FALLBACK,
                        )
            if harvested.get("item_geometry") is None:
                harvested["item_geometry"] = geometry_list
            if harvested.get("item_bbox") is None:
                harvested["item_bbox"] = bbox_list

    def _harvest_datastreams(
        self,
        datastreams_json,
        thing_url,
        link,
        version,
        sta_filter,
        requests_properties,
        datacube_extension,
    ):
        """Harvest the Datastreams of the Thing; False means abort."""
        count = datastreams_json["@iot.count"]

        if count == 0:
            if self.harvesting_vars.get("item_bbox") is None:
                self._log(
                    "ERROR",
                    "The Thing does not have any Datastream. So, there is no"
                    " temporal and spatial extent for the item.",
                )
            else:
                self._log(
                    "ERROR",
                    "The Thing does not have any Datastream. So, there is no"
                    " temporal extent for the item.",
                )
            return False

        if count == 1:
            # The attributes live in the first element of `value`, as for
            # Locations. Falling back to the response itself keeps a
            # flattened response working.
            entries = datastreams_json.get("value") or [datastreams_json]
            return self._harvest_single_datastream(
                entries[0], datacube_extension
            )

        if count > 1:
            return self._harvest_multiple_datastreams(
                thing_url=thing_url,
                link=link,
                version=version,
                sta_filter=sta_filter,
                requests_properties=requests_properties,
                datacube_extension=datacube_extension,
            )

        return True

    def _harvest_single_datastream(self, datastream, datacube_extension):
        """Harvest a Thing that has exactly one Datastream."""
        harvested = self.harvesting_vars

        observed_area = datastream.get("observedArea")
        if observed_area is None:
            self._log(
                "ERROR",
                "The Datastream does not have any observedArea. So, the"
                " spatial extent of the item will be None.",
            )
            return False

        # Values harvested from the Locations take precedence.
        if harvested.get("item_geometry") is None:
            if observed_area.get("type") is not None:
                harvested["item_geometry"] = observed_area["type"]
            else:
                self._log(
                    "WARNING",
                    "The Datastream does not have any geometry type. It"
                    " tries to find out the geometry type automatically.",
                )
        if harvested.get("item_bbox") is None:
            if observed_area.get("coordinates") is not None:
                harvested["item_bbox"] = [observed_area["coordinates"]]
            else:
                self._log(
                    "ERROR",
                    "The Datastream does not have any coordinates. So, the"
                    " spatial extent of the item will be None.",
                )
                return False

        # Variables and dimensions are only needed for the datacube
        # extension; a missing dimension must not abort otherwise.
        if datacube_extension is True:
            if datastream.get("name") is not None:
                harvested["item_variable_names"] = [datastream["name"]]
            if datastream.get("description") is not None:
                harvested["item_variable_descriptions"] = [
                    datastream["description"]
                ]
            if datastream.get("unitOfMeasurement") is not None:
                harvested["item_variable_units"] = [
                    datastream["unitOfMeasurement"].get("name")
                ]
            dimensions = self._datacube_dimensions(datastream)
            if dimensions is None:
                self._log(
                    "ERROR",
                    "The Datastream does not have any dimensions. So, it"
                    " cannot attach datacube extension to the item.",
                )
                return False
            harvested["item_variable_dimensions"] = [list(dimensions)]
            harvested["item_dimension_names"] = list(dimensions)

        if datastream.get("phenomenonTime") is None:
            self._log(
                "ERROR",
                "The Datastream does not have any phenomenonTime. So, the"
                " temporal extent of the item will be None.",
            )
            return False
        harvested["item_datetime"] = datastream["phenomenonTime"]
        return True

    def _harvest_multiple_datastreams(
        self,
        thing_url,
        link,
        version,
        sta_filter,
        requests_properties,
        datacube_extension,
    ):
        """Harvest a Thing that has more than one Datastream."""
        harvested = self.harvesting_vars

        datastream_ids = self._matching_entity_ids(
            thing_url, "Datastreams", sta_filter, requests_properties
        )
        if datastream_ids is None:
            # The original text was copied from the Locations branch.
            self._log(
                "ERROR",
                "The number of Datastreams in the Thing is not equal to the"
                " number of Datastreams in the list of datastreams. So, the"
                " Datastreams of the item cannot be harvested.",
            )
            return False

        geometry_list = []
        bbox_list = []
        datetime_list = []
        variable_names_list = []
        variable_descriptions_list = []
        variable_units_list = []
        variable_dimensions_list = []
        dimension_names_list = []

        for datastream_id in datastream_ids:
            datastream = (
                self._open_entity(
                    f"{thing_url}/Datastreams({datastream_id})",
                    requests_properties,
                )
                or {}
            )

            observed_area = datastream.get("observedArea")
            if observed_area is None:
                self._log(
                    "WARNING",
                    "The Datastream does not have any observedArea. So, the"
                    " spatial extent of the item will be None.",
                )
            else:
                if harvested.get("item_geometry") is None:
                    if observed_area.get("type") is not None:
                        geometry_list.append(observed_area["type"])
                    else:
                        self._log(
                            "WARNING",
                            "The Datastream does not have any geometry"
                            " type. It tries to find out the geometry type"
                            " automatically.",
                        )
                if harvested.get("item_bbox") is None:
                    if observed_area.get("coordinates") is not None:
                        bbox_list.append(observed_area["coordinates"])
                    else:
                        self._log(
                            "WARNING",
                            "The Datastream does not have any coordinates."
                            " So, the spatial extent of the item will be"
                            " None.",
                        )

            if datacube_extension is True:
                if datastream.get("name") is not None:
                    variable_names_list.append(datastream["name"])
                if datastream.get("description") is not None:
                    variable_descriptions_list.append(
                        datastream["description"]
                    )
                if datastream.get("unitOfMeasurement") is not None:
                    variable_units_list.append(
                        datastream["unitOfMeasurement"].get("name")
                    )
                # TODO: We need to refactor this condition. Because in TS
                # data we have one time dimension.
                dimensions = self._datacube_dimensions(datastream)
                if dimensions is None:
                    self._log(
                        "WARNING",
                        "The Datastream does not have any dimensions.",
                    )
                    dimensions = ["time"]
                variable_dimensions_list.append(dimensions)
                dimension_names_list.append(list(dimensions))

            phenomenon_time = datastream.get("phenomenonTime")
            if phenomenon_time is None:
                # Warn only when the value is really missing.
                datetime_list.append(None)
                self._log(
                    "WARNING",
                    "The Datastream does not have any phenomenonTime. So,"
                    " the temporal extent of the item will be None.",
                )
            elif harvested.get("item_datetime") is None:
                datetime_list.append(phenomenon_time)

        for key, values in (
            ("item_geometry", geometry_list),
            ("item_bbox", bbox_list),
            ("item_datetime", datetime_list),
        ):
            if harvested.get(key) is None:
                harvested[key] = values

        for key, values in (
            ("item_variable_names", variable_names_list),
            ("item_variable_descriptions", variable_descriptions_list),
            ("item_variable_units", variable_units_list),
            ("item_variable_dimensions", variable_dimensions_list),
            ("item_dimension_names", dimension_names_list),
        ):
            current = harvested.get(key)
            if current is None or current == []:
                harvested[key] = values

        # One observation URL per variable and result format.
        has_filter = sta_filter is not None and sta_filter != ""
        filter_suffix = f"&{sta_filter}" if has_filter else ""
        for var in variable_names_list:
            variable_url = (
                f"{link}/{version}/Observations"
                f"?$filter=Datastream/name%20eq%20%27{var}%27"
                "&$resultFormat="
            )
            harvested["sta2stac_thing_variable_geojson_" + var] = (
                f"{variable_url}GeoJSON{filter_suffix}"
            )
            harvested["sta2stac_thing_variable_csv_" + var] = (
                f"{variable_url}CSV{filter_suffix}"
            )
            harvested["sta2stac_thing_variable_dataarray_" + var] = (
                f"{variable_url}DataArray{filter_suffix}"
            )

        return True

    def _update_footprint_and_collection(self):
        """Compute the item footprint and merge it into the collection."""
        harvested = self.harvesting_vars

        # `.get` because geometry or bbox may not have been harvested.
        harvested["item_footprint"] = Processing(
            self.logger_properties
        ).geometry(
            harvested.get("item_bbox"),
            harvested.get("item_geometry"),
        )
        if harvested.get("collection_footprint") is None:
            harvested["collection_footprint"] = harvested["item_footprint"]

        # To arrange datetime and bbox
        Processing(logger_properties=self.logger_properties).item(harvested)

        harvested["collection_footprint"] = geometry.shape(
            harvested["item_footprint"]
        ).union(geometry.shape(harvested["collection_footprint"]))
        harvested["collection_bbox"] = list(
            harvested["collection_footprint"].bounds
        )

        if harvested.get("collection_datetime") is None:
            harvested["collection_datetime"] = []
        item_datetime = harvested.get("item_datetime")
        if isinstance(item_datetime, (list, tuple)):
            harvested["collection_datetime"].extend(item_datetime)
        elif item_datetime is not None:
            # `extend` would split a single datetime string into characters.
            harvested["collection_datetime"].append(item_datetime)