From 14d3168d0eb8e9a6b8285aa755265407e340c343 Mon Sep 17 00:00:00 2001 From: Arno Kaimbacher Date: Tue, 15 Mar 2022 17:39:12 +0100 Subject: [PATCH] - cron job for hourly importing piezometer data --- .../import_piezometer_observations.py | 98 ------------ .../import_piezometer_observations_hourly.py | 140 ++++++++++++++++++ 2 files changed, 140 insertions(+), 98 deletions(-) delete mode 100644 gschliefgraben_piezometer/import_piezometer_observations.py create mode 100644 gschliefgraben_piezometer/import_piezometer_observations_hourly.py diff --git a/gschliefgraben_piezometer/import_piezometer_observations.py b/gschliefgraben_piezometer/import_piezometer_observations.py deleted file mode 100644 index 4bd734a..0000000 --- a/gschliefgraben_piezometer/import_piezometer_observations.py +++ /dev/null @@ -1,98 +0,0 @@ -''' -Sqlalchemy version: 1.2.15 -Python version: 3.7 -''' - -import os -import uuid -from sqlalchemy.orm import session -from dotenv import load_dotenv, find_dotenv -import requests -from datetime import datetime -from db.models import ( - Observation, - create_pg_session, - Dataset, - Procedure, - Phenomenon, - Platform, - Format -) - - -def main(): - ''' main method ''' - pg_session: session = create_pg_session() - platform_sta_identifier = "pechgraben_piezometer" - sensor = "bohrloch1" - - pg_query = pg_session.query(Dataset) \ - .join(Procedure) \ - .join(Phenomenon) \ - .filter(Procedure.sta_identifier == sensor.lower()) - elevation_dataset: Dataset = pg_query.filter( - Phenomenon.sta_identifier == "Elevation").first() - if not elevation_dataset: - print("Sensor " + sensor + " ist noch nicht angelegt!") - exit() - # if not elevation_dataset.is_published: - # elevation_dataset.is_published = 1 - # elevation_dataset.is_hidden = 0 - # elevation_dataset.dataset_type = "timeseries" - # elevation_dataset.observation_type = "simple" - # elevation_dataset.value_type = "text" - # pg_session.commit() - - platform_exists: bool = pg_session.query(Platform.id).filter_by( - sta_identifier=platform_sta_identifier).scalar() is not None - if platform_exists: - sensor_platform = pg_session.query(Platform.id) \ - .filter(Platform.sta_identifier == platform_sta_identifier) \ - .first() - elevation_dataset.fk_platform_id = sensor_platform.id - - format_exists: bool = pg_session.query(Format.id).filter_by( - definition="http://www.opengis.net/def/observationType/OGC-OM/2.0/OM_Measurement" - ).scalar() is not None - if format_exists: - sensor_format = pg_session.query(Format.id) \ - .filter(Format.definition == "http://www.opengis.net/def/observationType/OGC-OM/2.0/OM_Measurement") \ - .first() - elevation_dataset.fk_format_id = sensor_format.id - - -def test(): - ''' test method ''' - sensor_key = 'bohrloch1' - url = 'https://jaa5ixl2y0.execute-api.ap-southeast-2.amazonaws.com/v1/data' - - params = {} - headers = {'content-type': 'application/json'} - - resp = requests.get(url=url, params=params, headers=headers) - data = resp.json() # Check the JSON Response Content documentation below - # sensor_data = json.dumps(data) - if sensor_key in data: - print("Sesnor key exist in JSON data") - sensor_object = data[sensor_key] - zeitstempel = sensor_object["zeitstempel"] - abstich = sensor_object["abstich"] - date_obj = datetime.strptime( - zeitstempel, '%Y:%m:%d %H:%M:%S') - - new_observation: Observation = Observation() - # new_observation.id = max_id - new_observation.sta_identifier = str(uuid.uuid4()) - new_observation.result_time = date_obj - new_observation.sampling_time_start = new_observation.result_time - new_observation.sampling_time_end = new_observation.result_time - new_observation.value_type = "quantity" - new_observation.value_quantity = abstich - # new_observation.fk_dataset_id = dataset.id - - -if __name__ == "__main__": - load_dotenv(find_dotenv()) - sensor_list1 = os.environ.get('GLASFASER_GSCHLIEFGRABEN_SENSORS', []) - print(f'sensors: {sensor_list1} .') - test() diff --git a/gschliefgraben_piezometer/import_piezometer_observations_hourly.py b/gschliefgraben_piezometer/import_piezometer_observations_hourly.py new file mode 100644 index 0000000..8059ca4 --- /dev/null +++ b/gschliefgraben_piezometer/import_piezometer_observations_hourly.py @@ -0,0 +1,140 @@ +''' +Sqlalchemy version: 1.2.15 +Python version: 3.7 +''' + +import json +import os +import uuid +from sqlalchemy.orm import session +from dotenv import load_dotenv, find_dotenv +import requests +from datetime import datetime +from db.models import ( + Observation, + create_pg_session, + Dataset, + Procedure, + Phenomenon, + Platform, + Format +) + + +def main(): + ''' main method ''' + pg_session: session = create_pg_session() + platform_sta_identifier = "pechgraben_piezometer" + # sensor = "bohrloch1" + # sensor_list = os.environ.get('GLASFASER_GSCHLIEFGRABEN_SENSORS', []) + sensor_list = json.loads(os.environ['GLASFASER_GSCHLIEFGRABEN_SENSORS']) + + url = 'https://jaa5ixl2y0.execute-api.ap-southeast-2.amazonaws.com/v1/data' + params = {} + headers = {'content-type': 'application/json'} + resp = requests.get(url=url, params=params, headers=headers) + data: json = resp.json() # Check the JSON Response Content documentation below + + for sensor in sensor_list: + pg_query = pg_session.query(Dataset) \ + .join(Procedure) \ + .join(Phenomenon) \ + .filter(Procedure.sta_identifier == sensor.lower()) + elevation_dataset: Dataset = pg_query.filter( + Phenomenon.sta_identifier == "Elevation").first() + if not elevation_dataset: + print("Sensor " + sensor + " ist noch nicht angelegt!") + exit() + if not elevation_dataset.is_published: + elevation_dataset.is_published = 1 + elevation_dataset.is_hidden = 0 + elevation_dataset.dataset_type = "timeseries" + elevation_dataset.observation_type = "simple" + elevation_dataset.value_type = "text" + pg_session.commit() + + platform_exists: bool = pg_session.query(Platform.id).filter_by( + sta_identifier=platform_sta_identifier).scalar() is not None + if platform_exists: + sensor_platform = pg_session.query(Platform.id) \ + .filter(Platform.sta_identifier == platform_sta_identifier) \ + .first() + elevation_dataset.fk_platform_id = sensor_platform.id + + format_exists: bool = pg_session.query(Format.id).filter_by( + definition="http://www.opengis.net/def/observationType/OGC-OM/2.0/OM_Measurement" + ).scalar() is not None + if format_exists: + sensor_format = pg_session.query(Format.id) \ + .filter(Format.definition == "http://www.opengis.net/def/observationType/OGC-OM/2.0/OM_Measurement") \ + .first() + elevation_dataset.fk_format_id = sensor_format.id + if sensor in data: + create_observation(elevation_dataset, sensor, data, pg_session) + pg_session.commit() + pg_session.close() + + +def create_observation(elevation_dataset: Dataset, sensor_key: str, data: json, db_session: session): + ''' create observation in db''' + print("Sesnor key exist in JSON data") + sensor_object = data[sensor_key] + zeitstempel = sensor_object["zeitstempel"] + abstich = sensor_object["abstich"] + date_obj = datetime.strptime( + zeitstempel, '%Y-%m-%d %H:%M:%S') + + existing_observation: bool = ( + db_session.query(Observation) + .filter(Observation.result_time == date_obj, Observation.fk_dataset_id == elevation_dataset.id) + .one_or_none() + ) + # Can we insert this observation? + if existing_observation is None: + # insert bew observation + new_observation: Observation = Observation() + # new_observation.id = max_id + new_observation.sta_identifier = str(uuid.uuid4()) + new_observation.result_time = date_obj + new_observation.sampling_time_start = new_observation.result_time + new_observation.sampling_time_end = new_observation.result_time + new_observation.value_type = "quantity" + new_observation.value_quantity = abstich + db_session.add(new_observation) + + +def test(): + ''' test method ''' + sensor_key = 'bohrloch1' + url = 'https://jaa5ixl2y0.execute-api.ap-southeast-2.amazonaws.com/v1/data' + + params = {} + headers = {'content-type': 'application/json'} + + resp = requests.get(url=url, params=params, headers=headers) + data = resp.json() # Check the JSON Response Content documentation below + # sensor_data = json.dumps(data) + if sensor_key in data: + print("Sesnor key exist in JSON data") + sensor_object = data[sensor_key] + zeitstempel = sensor_object["zeitstempel"] + abstich = sensor_object["abstich"] + date_obj = datetime.strptime( + zeitstempel, '%Y-%m-%d %H:%M:%S') + + new_observation: Observation = Observation() + # new_observation.id = max_id + new_observation.sta_identifier = str(uuid.uuid4()) + new_observation.result_time = date_obj + new_observation.sampling_time_start = new_observation.result_time + new_observation.sampling_time_end = new_observation.result_time + new_observation.value_type = "quantity" + new_observation.value_quantity = abstich + # new_observation.fk_dataset_id = dataset.id + + +if __name__ == "__main__": + load_dotenv(find_dotenv()) + sensor_list1 = os.environ.get('GLASFASER_GSCHLIEFGRABEN_SENSORS', []) + print(f'sensors: {sensor_list1} .') + main()