''' Sqlalchemy version: 1.2.15 Python version: 3.7 ''' import os import uuid from sqlalchemy.orm import session from dotenv import load_dotenv, find_dotenv import requests from datetime import datetime from db.models import ( Observation, create_pg_session, Dataset, Procedure, Phenomenon, Platform, Format ) def main(): ''' main method ''' pg_session: session = create_pg_session() platform_sta_identifier = "pechgraben_piezometer" sensor = "bohrloch1" pg_query = pg_session.query(Dataset) \ .join(Procedure) \ .join(Phenomenon) \ .filter(Procedure.sta_identifier == sensor.lower()) elevation_dataset: Dataset = pg_query.filter( Phenomenon.sta_identifier == "Elevation").first() if not elevation_dataset: print("Sensor " + sensor + " ist noch nicht angelegt!") exit() # if not elevation_dataset.is_published: # elevation_dataset.is_published = 1 # elevation_dataset.is_hidden = 0 # elevation_dataset.dataset_type = "timeseries" # elevation_dataset.observation_type = "simple" # elevation_dataset.value_type = "text" # pg_session.commit() platform_exists: bool = pg_session.query(Platform.id).filter_by( sta_identifier=platform_sta_identifier).scalar() is not None if platform_exists: sensor_platform = pg_session.query(Platform.id) \ .filter(Platform.sta_identifier == platform_sta_identifier) \ .first() elevation_dataset.fk_platform_id = sensor_platform.id format_exists: bool = pg_session.query(Format.id).filter_by( definition="http://www.opengis.net/def/observationType/OGC-OM/2.0/OM_Measurement" ).scalar() is not None if format_exists: sensor_format = pg_session.query(Format.id) \ .filter(Format.definition == "http://www.opengis.net/def/observationType/OGC-OM/2.0/OM_Measurement") \ .first() elevation_dataset.fk_format_id = sensor_format.id def test(): ''' test method ''' sensor_key = 'bohrloch1' url = 'https://jaa5ixl2y0.execute-api.ap-southeast-2.amazonaws.com/v1/data' params = {} headers = {'content-type': 'application/json'} resp = requests.get(url=url, params=params, headers=headers) data = resp.json() # Check the JSON Response Content documentation below # sensor_data = json.dumps(data) if sensor_key in data: print("Sesnor key exist in JSON data") sensor_object = data[sensor_key] zeitstempel = sensor_object["zeitstempel"] abstich = sensor_object["abstich"] date_obj = datetime.strptime( zeitstempel, '%Y:%m:%d %H:%M:%S') new_observation: Observation = Observation() # new_observation.id = max_id new_observation.sta_identifier = str(uuid.uuid4()) new_observation.result_time = date_obj new_observation.sampling_time_start = new_observation.result_time new_observation.sampling_time_end = new_observation.result_time new_observation.value_type = "quantity" new_observation.value_quantity = abstich # new_observation.fk_dataset_id = dataset.id if __name__ == "__main__": load_dotenv(find_dotenv()) sensor_list1 = os.environ.get('GLASFASER_GSCHLIEFGRABEN_SENSORS', []) print(f'sensors: {sensor_list1} .') test()