''' module for importing observations '''
import csv
from datetime import datetime
from typing import List
import uuid

from pyproj import Transformer
from sqlalchemy.orm import session
from sqlalchemy import asc, desc

from db.models import (
    Observation, create_pg_session, Dataset, Procedure, Phenomenon,
    Platform, Format
)


def main():
    '''Import tachymeter observations from ``voegelsberg/data.txt``.

    For every CSV row (separated by ';'): look up the "TachymeterLocation"
    dataset of the sensor named in the 'Punktnummer' column, make sure the
    dataset is linked to the Voegelsberg tachymeter platform and to the
    geometry-observation format, then insert the row as a new observation
    unless one with the same result time already exists.  Finally the
    first/last observation links of all datasets are refreshed.
    '''
    pg_session: session = create_pg_session()
    platform_sta_identifier = "voegelsberg_tachymeter"

    with open('voegelsberg/data.txt', 'rt', encoding="utf-8") as csvfile:
        spamreader = csv.DictReader(csvfile, delimiter=';', quotechar='"')
        for row in spamreader:
            sensor: str = row['Punktnummer']
            pg_query = pg_session.query(Dataset) \
                .join(Procedure) \
                .join(Phenomenon) \
                .filter(Procedure.sta_identifier == sensor)
            location_dataset: Dataset = pg_query.filter(
                Phenomenon.sta_identifier == "TachymeterLocation").first()
            if not location_dataset:
                # sensor has no dataset yet -> nothing to attach the row to
                print("Sensor " + sensor +
                      " ist noch nicht in der Datenbank angelegt!")
                continue

            # ensure the dataset belongs to the tachymeter platform,
            # creating the platform on first use
            platform_exists: bool = pg_session.query(Platform.id).filter_by(
                sta_identifier=platform_sta_identifier).scalar() is not None
            if not platform_exists:
                sensor_platform = Platform()
                sensor_platform.sta_identifier = \
                    platform_sta_identifier.lower()
                sensor_platform.identifier = platform_sta_identifier.lower()
                sensor_platform.name = platform_sta_identifier.lower()
                location_dataset.platform = sensor_platform
            else:
                sensor_platform = pg_session.query(Platform.id) \
                    .filter(
                        Platform.sta_identifier == platform_sta_identifier) \
                    .first()
                location_dataset.fk_platform_id = sensor_platform.id

            # link the O&M geometry-observation format, if it is registered
            format_definition = (
                "http://www.opengis.net/def/observationType/OGC-OM/2.0/"
                "OM_GeometryObservation"
            )
            format_exists: bool = pg_session.query(Format.id).filter_by(
                definition=format_definition
            ).scalar() is not None
            if format_exists:
                sensor_format = pg_session.query(Format.id) \
                    .filter(Format.definition == format_definition) \
                    .first()
                location_dataset.fk_format_id = sensor_format.id
            pg_session.commit()

            successfully_inserted = create_observation(
                location_dataset, row, pg_session)
            # commit new observations and publish the dataset on first insert
            if successfully_inserted:
                if not location_dataset.is_published:
                    location_dataset.is_published = 1
                    location_dataset.is_hidden = 0
                    location_dataset.dataset_type = "trajectory"
                    location_dataset.observation_type = "simple"
                    location_dataset.value_type = "geometry"
                pg_session.commit()

    # refresh first/last observation links for all Voegelsberg datasets
    actualize_first_last_observations()
    pg_session.close()


def create_observation(location_dataset: Dataset, data, pg_session: session):
    '''Create one geometry observation in the database.

    :param location_dataset: dataset the observation belongs to
    :param data: one CSV row; must provide the keys 'Punktnummer',
        'Epoche' (date, ``%d.%m.%Y``), 'X', 'Y' and 'H'
    :param pg_session: open SQLAlchemy session (the caller commits)
    :return: True if a new observation was added, False if an observation
        with the same result time already exists for this dataset
    '''
    # reproject from EPSG:31254 (local Austrian CRS) to WGS84 lon/lat
    transprojr = Transformer.from_crs(31254, 4326, always_xy=True)
    # NOTE(review): 'Y' is passed as easting and 'X' as northing — looks
    # intentional for this survey data, confirm against the source file
    x_1, y_1, z_1 = (float(data['Y']), float(data['X']), float(data['H']))
    cord_x, cord_y = map(float, transprojr.transform(x_1, y_1))
    print((cord_x, cord_y))  # e.g. (11.597409730065536, 47.27196543449542)

    sensor: str = data['Punktnummer']
    zeitstempel = data['Epoche']
    date_obj = datetime.strptime(zeitstempel, '%d.%m.%Y').isoformat()

    # one_or_none() returns an Observation or None (never raises here
    # unless duplicates already exist in the table)
    existing_observation = (
        pg_session.query(Observation)
        .filter(Observation.result_time == date_obj,
                Observation.fk_dataset_id == location_dataset.id)
        .one_or_none()
    )
    if existing_observation is None:
        # insert new observation
        new_observation: Observation = Observation()
        new_observation.sta_identifier = str(uuid.uuid4())
        new_observation.result_time = date_obj
        new_observation.sampling_time_start = new_observation.result_time
        new_observation.sampling_time_end = new_observation.result_time
        new_observation.value_type = "geometry"
        new_observation.value_geometry = \
            f'SRID=4326;POINTZ({cord_x} {cord_y} {z_1})'
        new_observation.fk_dataset_id = location_dataset.id
        pg_session.add(new_observation)
        print(f"new observation with result time "
              f"{new_observation.result_time} "
              f"for drill hole {sensor} succesfully imported!")
        return True
    print(f"observation with result time "
          f"{existing_observation.result_time} "
          f"for tachymeter {sensor} already exists!")
    return False


def actualize_first_last_observations():
    '''Refresh first/last observation links of all Voegelsberg datasets.

    Iterates over every dataset attached to the Voegelsberg tachymeter
    platform and updates ``first_time``/``last_time`` and the
    ``fk_first_observation_id``/``fk_last_observation_id`` foreign keys
    from the oldest and newest observation of each dataset.
    '''
    pg_session: session = create_pg_session()
    platform_sta_identifier = "voegelsberg_tachymeter"

    voegelsberg_datasets: List[Dataset] = pg_session.query(Dataset) \
        .join(Procedure) \
        .join(Phenomenon) \
        .join(Platform) \
        .filter(Platform.sta_identifier == platform_sta_identifier).all()

    for location_dataset in voegelsberg_datasets:
        # newest observation -> last_time / fk_last_observation_id
        last_location_observation = pg_session.query(Observation) \
            .filter(Observation.fk_dataset_id == location_dataset.id) \
            .order_by(desc('sampling_time_start')) \
            .first()
        if last_location_observation is not None:
            location_dataset.last_time = \
                last_location_observation.sampling_time_start
            location_dataset.fk_last_observation_id = \
                last_location_observation.id

        # oldest observation -> first_time / fk_first_observation_id
        first_location_observation = pg_session.query(Observation) \
            .filter(Observation.fk_dataset_id == location_dataset.id) \
            .order_by(asc('sampling_time_start')) \
            .first()
        if first_location_observation is not None:
            location_dataset.first_time = \
                first_location_observation.sampling_time_start
            location_dataset.fk_first_observation_id = \
                first_location_observation.id
    pg_session.commit()


def get_xml(offering, procedure, foi, result_time, identifier):
    '''Prepare the body of an InsertSensor request (JSON binding).

    :param offering: an Offering-type object (``.name`` is used)
    :param procedure: a Procedure-type object (unused apart from ``.id``)
    :param foi: feature of interest (``.x``, ``.y``, ``.z``, ``.fid``,
        ``.name``) or None
    :param result_time: result time inserted verbatim into the template
    :param identifier: currently unused, kept for interface compatibility
    :return: request body string
    '''
    offering_name = offering.name
    procedure_identifier = procedure.id  # currently unused in the template
    # defaults so the template never references unbound names when foi
    # is None (previously this raised NameError)
    coordinates = feature_id = feature_name = ""
    if foi is not None:  # declare the feature of interest
        cord_x = str(foi.x)  # longitude degrees, float
        cord_y = str(foi.y)  # latitude degrees, float
        cord_z = str(foi.z)
        coordinates = cord_x + " " + cord_y + " " + cord_z
        feature_id = foi.fid  # "feature location"
        feature_name = foi.name  # "feature location"
    xml = (f'{offering_name}{result_time}descriptionhereIdentifierhereIam'
           f'{coordinates}{feature_id}{feature_name}'
           f'11.597409730065536 47.27196543449542{coordinates}')
    return xml


if __name__ == '__main__':
    main()