'''
Sqlalchemy version: 1.2.15
Python version: 3.7
'''

import json
import os
import uuid
from datetime import datetime

import requests
from dotenv import load_dotenv, find_dotenv
from sqlalchemy import asc, desc, func
from sqlalchemy.orm import Session

from db.models import (
    Observation, create_pg_session, Dataset,
    Procedure, Phenomenon, Platform, Format)


def main():
    ''' main method '''
    pg_session: Session = create_pg_session()
    platform_sta_identifier = "pechgraben_piezometer"
    # sensor = "bohrloch1"
    # sensor_list = os.environ.get('PIEZOMETER_GSCHLIEFGRABEN_SENSORS', [])
    sensor_list = json.loads(os.environ['PIEZOMETER_GSCHLIEFGRABEN_SENSORS'])

    # fetch the latest piezometer readings from the API
    url = 'https://jaa5ixl2y0.execute-api.ap-southeast-2.amazonaws.com/v1/data'
    params = {}
    headers = {'content-type': 'application/json'}
    resp = requests.get(url=url, params=params, headers=headers)
    data: dict = resp.json()  # Check the JSON Response Content documentation below

    for sensor in sensor_list:
        # find the elevation dataset that belongs to this sensor (procedure)
        pg_query = pg_session.query(Dataset) \
            .join(Procedure) \
            .join(Phenomenon) \
            .filter(Procedure.sta_identifier == sensor.lower())
        elevation_dataset: Dataset = pg_query.filter(
            Phenomenon.sta_identifier == "Elevation").first()
        if not elevation_dataset:
            print("Sensor " + sensor + " has not been created yet!")
            continue

        # make sure the dataset is published and visible
        if not elevation_dataset.is_published:
            elevation_dataset.is_published = 1
            elevation_dataset.is_hidden = 0
            elevation_dataset.dataset_type = "timeseries"
            elevation_dataset.observation_type = "simple"
            elevation_dataset.value_type = "text"
            pg_session.commit()

        # create the platform if it does not exist yet, otherwise link it
        platform_exists: bool = pg_session.query(Platform.id).filter_by(
            sta_identifier=platform_sta_identifier).scalar() is not None
        # if platform_exists:
        #     sensor_platform = pg_session.query(Platform.id) \
        #         .filter(Platform.sta_identifier == platform_sta_identifier) \
        #         .first()
        #     elevation_dataset.fk_platform_id = sensor_platform.id
        if not platform_exists:
            sensor_platform = Platform()
            max_id = pg_session.query(func.max(Platform.id)).scalar()
            # sensor_platform.id = max_id + 1
            sensor_platform.sta_identifier = platform_sta_identifier.lower()
            sensor_platform.identifier = platform_sta_identifier.lower()
            sensor_platform.name = platform_sta_identifier.lower()
            elevation_dataset.platform = sensor_platform
        else:
            sensor_platform = pg_session.query(Platform.id) \
                .filter(Platform.sta_identifier == platform_sta_identifier) \
                .first()
            elevation_dataset.fk_platform_id = sensor_platform.id

        # link the OGC O&M measurement format, if it is present
        format_exists: bool = pg_session.query(Format.id).filter_by(
            definition="http://www.opengis.net/def/observationType/OGC-OM/2.0/OM_Measurement"
        ).scalar() is not None
        if format_exists:
            sensor_format = pg_session.query(Format.id) \
                .filter(Format.definition ==
                        "http://www.opengis.net/def/observationType/OGC-OM/2.0/OM_Measurement") \
                .first()
            elevation_dataset.fk_format_id = sensor_format.id

        if sensor in data:
            create_observation(elevation_dataset, sensor, data, pg_session)

        pg_session.commit()

        # update the first/last observation metadata of the dataset
        first_elevation_observation = pg_session.query(Observation) \
            .filter(Observation.fk_dataset_id == elevation_dataset.id) \
            .order_by(asc('sampling_time_start')) \
            .first()
        if first_elevation_observation is not None:
            elevation_dataset.first_time = first_elevation_observation.sampling_time_start
            elevation_dataset.first_value = first_elevation_observation.value_quantity
            elevation_dataset.fk_first_observation_id = first_elevation_observation.id

        last_elevation_observation = pg_session.query(Observation) \
            .filter(Observation.fk_dataset_id == elevation_dataset.id) \
            .order_by(desc('sampling_time_start')) \
            .first()
        if last_elevation_observation is not None:
            elevation_dataset.last_time = last_elevation_observation.sampling_time_start
            elevation_dataset.last_value = last_elevation_observation.value_quantity
            elevation_dataset.fk_last_observation_id = last_elevation_observation.id

    pg_session.commit()
    pg_session.close()


def create_observation(elevation_dataset: Dataset, sensor_key: str,
                       data: dict, db_session: Session):
    ''' create observation in db '''
    print("Sensor key exists in JSON data")

    sensor_object = data[sensor_key]
    zeitstempel = sensor_object["zeitstempel"]
    abstich = sensor_object["abstich"]

    date_obj = datetime.strptime(zeitstempel, '%Y-%m-%d %H:%M:%S')

    existing_observation: Observation = (
        db_session.query(Observation)
        .filter(Observation.result_time == date_obj,
                Observation.fk_dataset_id == elevation_dataset.id)
        .one_or_none()
    )

    # Can we insert this observation?
    if existing_observation is None:
        # insert new observation
        new_observation: Observation = Observation()
        # new_observation.id = max_id
        new_observation.sta_identifier = str(uuid.uuid4())
        new_observation.result_time = date_obj
        new_observation.sampling_time_start = new_observation.result_time
        new_observation.sampling_time_end = new_observation.result_time
        new_observation.value_type = "quantity"
        new_observation.value_quantity = abstich
        new_observation.fk_dataset_id = elevation_dataset.id
        db_session.add(new_observation)


def test():
    ''' test method '''
    sensor_key = 'bohrloch1'
    url = 'https://jaa5ixl2y0.execute-api.ap-southeast-2.amazonaws.com/v1/data'
    params = {}
    headers = {'content-type': 'application/json'}
    resp = requests.get(url=url, params=params, headers=headers)
    data = resp.json()  # Check the JSON Response Content documentation below
    # sensor_data = json.dumps(data)

    if sensor_key in data:
        print("Sensor key exists in JSON data")
        sensor_object = data[sensor_key]
        zeitstempel = sensor_object["zeitstempel"]
        abstich = sensor_object["abstich"]

        date_obj = datetime.strptime(zeitstempel, '%Y-%m-%d %H:%M:%S')

        new_observation: Observation = Observation()
        # new_observation.id = max_id
        new_observation.sta_identifier = str(uuid.uuid4())
        new_observation.result_time = date_obj
        new_observation.sampling_time_start = new_observation.result_time
        new_observation.sampling_time_end = new_observation.result_time
        new_observation.value_type = "quantity"
        new_observation.value_quantity = abstich
        # new_observation.fk_dataset_id = dataset.id


if __name__ == "__main__":
    load_dotenv(find_dotenv())
    sensor_list1 = os.environ.get('PIEZOMETER_GSCHLIEFGRABEN_SENSORS', [])
    print(f'sensors: {sensor_list1}')
    main()