From 73d21775ddcf9e7640d5774970ed2fe4dab8943a Mon Sep 17 00:00:00 2001 From: Arno Kaimbacher Date: Wed, 4 May 2022 16:49:27 +0200 Subject: [PATCH] - add cron job for automatically updating automatic inclinometers --- .../import_observations_ampfwang_kb1.py | 2 +- ...date_daily_automatic_inclinometers_cron.py | 301 ++++++++++++++++++ 2 files changed, 302 insertions(+), 1 deletion(-) create mode 100644 automatic_inclinometer/update_daily_automatic_inclinometers_cron.py diff --git a/automatic_inclinometer/import_observations_ampfwang_kb1.py b/automatic_inclinometer/import_observations_ampfwang_kb1.py index 76fcabc..38f04ef 100644 --- a/automatic_inclinometer/import_observations_ampfwang_kb1.py +++ b/automatic_inclinometer/import_observations_ampfwang_kb1.py @@ -37,7 +37,7 @@ def main(): feature_of_interest = 'GSA02A-010-1210' # Ampflwang KB1 # sensor name in postgis db # sensor = 'wolfsegg_kb1_0' - platform = 'ampflwang_kb1_inclinometer' + platform = 'ampflwang_inclinometer' sensor_env_list = os.getenv('AMPFLWANG_KB1_SENSORS').replace('\n', '') sensor_list = json.loads(sensor_env_list) diff --git a/automatic_inclinometer/update_daily_automatic_inclinometers_cron.py b/automatic_inclinometer/update_daily_automatic_inclinometers_cron.py new file mode 100644 index 0000000..105e887 --- /dev/null +++ b/automatic_inclinometer/update_daily_automatic_inclinometers_cron.py @@ -0,0 +1,301 @@ +''' +Tutorial link: https://realpython.com/flask-connexion-rest-api-part-2/ +https://github.com/realpython/materials/blob/master/flask-connexion-rest-part-2/version_1/people.py +Sqlalchemy version: 1.2.15 +Python version: 3.10 +''' + +# from itertools import count +import os +import json +import uuid +import time +from typing import List +from dotenv import load_dotenv, find_dotenv +from sqlalchemy.orm import session +from sqlalchemy import asc, desc +# from sqlalchemy.sql import or_ +from db.fb_models import (create_session, FbObservation, Catena) +from db.models import ( + Observation, create_pg_session, + Dataset, Procedure, Phenomenon) + + +def main(): + ''' main method ''' + # update automatic observations for KB1 Gschliefgraben + feature_of_interest = 'TAC003-020-0521' # Gschliefgraben KB1 + platform_name = 'gschliefgraben_inclinometer' + sensor_list_conv = 'GSCHLIEFGRABEN_KB1_SENSORS' + update(feature_of_interest, platform_name, sensor_list_conv) + + # update automatic observations for KB2 Gschliefgraben + feature_of_interest = 'TAC005-013-0521' # Gschliefgraben KB2 + platform_name = 'gschliefgraben_inclinometer' + sensor_list_conv = 'GSCHLIEFGRABEN_KB2_SENSORS' + update(feature_of_interest, platform_name, sensor_list_conv) + + # update automatic observations for KB1 Ampflwang + feature_of_interest = 'GSA02A-010-1210' # Ampflwang KB1 + platform_name = 'ampflwang_inclinometer' + sensor_list_conv = 'AMPFLWANG_KB1_SENSORS' + update(feature_of_interest, platform_name, sensor_list_conv) + + # update automatic observations for KB2 Ampflwang + feature_of_interest = 'GSA02B-007-1210' # KB2 + platform_name = 'ampflwang_inclinometer' + sensor_list_conv = 'AMPFLWANG_KB2_SENSORS' + update(feature_of_interest, platform_name, sensor_list_conv) + + +def update(feature_of_interest, platform_name, sensor_list_conv): + ''' starting update method ''' + + pg_session: session = create_pg_session() + # feature_of_interest = 'TAC003-020-0521' # Gschliefgraben KB1 + # platform_name = 'gschliefgraben_inclinometer' + sensor_env_list = os.getenv(sensor_list_conv).replace('\n', '') + # sensor_list = json.loads(os.environ['GLASFASER_GSCHLIEFGRABEN_SENSORS']) + sensor_list = json.loads(sensor_env_list) + + firebird_session: session = create_session() + # this will print elements along with their index value + for sensor_id, sensor in enumerate(sensor_list): + print( + "========================= Start =========================" + f"start update script: for sensor {sensor} at " + f"feature {feature_of_interest} at platform {platform_name} \n") + + pg_query = pg_session.query(Dataset) \ + .join(Procedure) \ + .join(Phenomenon) \ + .filter(Procedure.sta_identifier == sensor.lower()) + # slope_dataset: Dataset = pg_query.filter( + # Phenomenon.sta_identifier == "Slope").first() + roll_dataset: Dataset = pg_query.filter( + Phenomenon.sta_identifier == "Roll").first() + + slope_dataset: Dataset = pg_query.filter( + Phenomenon.sta_identifier == "Slope").first() + + temperature_dataset: Dataset = pg_query.filter( + Phenomenon.sta_identifier == "InSystemTemperature").first() + + query_count = firebird_session.query(FbObservation).join(FbObservation.catena) \ + .filter(FbObservation.sensore == sensor_id) \ + .filter(Catena.name == feature_of_interest) \ + .filter(FbObservation.data >= slope_dataset.last_time) \ + .count() + if query_count == 0: + print(f"sensor {sensor} for platform {platform_name} " + f"doesn't have any updated observations " + f"later than {slope_dataset.last_time} in firebird database! \n") + # hop to next for iteration, next sensor in list, don't insert any observations + continue + + filtered_last_resultime_firebird_observations: List[FbObservation] = [] + query = firebird_session.query(FbObservation).join(FbObservation.catena) \ + .filter(FbObservation.sensore == sensor_id) \ + .filter(Catena.name == feature_of_interest) \ + .filter(FbObservation.data >= slope_dataset.last_time) + + # print (query.statement.compile(dialect=firebird.dialect())) + filtered_last_resultime_firebird_observations = query.all() + firebird_session.close() + + # insert the new observation from firebird db into postgresql: + create_db_observations(filtered_last_resultime_firebird_observations, roll_dataset, + slope_dataset, temperature_dataset, pg_session) + + # commit new observations: + pg_session.commit() + + # set is_published to true, if dataset hadn't any observations before update + if len(roll_dataset.observations) > 0: + # if not published yet, publish the roll dataset + if not roll_dataset.is_published: + roll_dataset.is_published = 1 + roll_dataset.is_hidden = 0 + roll_dataset.dataset_type = "timeseries" + roll_dataset.observation_type = "simple" + roll_dataset.value_type = "quantity" + + if len(slope_dataset.observations) > 0: + # if not published yet, publish the roll dataset + if not slope_dataset.is_published: + slope_dataset.is_published = 1 + slope_dataset.is_hidden = 0 + slope_dataset.dataset_type = "timeseries" + slope_dataset.observation_type = "simple" + slope_dataset.value_type = "quantity" + + if len(temperature_dataset.observations) > 0: + # if not published yet, publish the temperature dataset + if not temperature_dataset.is_published: + temperature_dataset.is_published = 1 + temperature_dataset.is_hidden = 0 + temperature_dataset.dataset_type = "timeseries" + temperature_dataset.observation_type = "simple" + temperature_dataset.value_type = "quantity" + pg_session.commit() + + # set first and last slope observations of slope dataset + first_slope_observation = pg_session.query(Observation) \ + .filter(Observation.fk_dataset_id == slope_dataset.id) \ + .order_by(asc('sampling_time_start')) \ + .first() + if first_slope_observation is not None: + slope_dataset.first_time = first_slope_observation.sampling_time_start + slope_dataset.first_value = first_slope_observation.value_quantity + slope_dataset.fk_first_observation_id = first_slope_observation.id + last_slope_observation = pg_session.query(Observation) \ + .filter(Observation.fk_dataset_id == slope_dataset.id) \ + .order_by(desc('sampling_time_start')) \ + .first() + if last_slope_observation is not None: + slope_dataset.last_time = last_slope_observation.sampling_time_start + slope_dataset.last_value = last_slope_observation.value_quantity + slope_dataset.fk_last_observation_id = last_slope_observation.id + + # set first and last roll observations of roll dataset + first_roll_observation = pg_session.query(Observation) \ + .filter(Observation.fk_dataset_id == roll_dataset.id) \ + .order_by(asc('sampling_time_start')) \ + .first() + if first_roll_observation is not None: + roll_dataset.first_time = first_roll_observation.sampling_time_start + roll_dataset.first_value = first_roll_observation.value_quantity + roll_dataset.fk_first_observation_id = first_roll_observation.id + last_roll_observation = pg_session.query(Observation) \ + .filter(Observation.fk_dataset_id == roll_dataset.id) \ + .order_by(desc('sampling_time_start')) \ + .first() + if last_roll_observation is not None: + roll_dataset.last_time = last_roll_observation.sampling_time_start + roll_dataset.last_value = last_roll_observation.value_quantity + roll_dataset.fk_last_observation_id = last_roll_observation.id + + # set first and last temperature observations od temperature dataset + first_temperature_observation = pg_session.query(Observation) \ + .filter(Observation.fk_dataset_id == temperature_dataset.id) \ + .order_by(asc('sampling_time_start')) \ + .first() + if first_temperature_observation is not None: + temperature_dataset.first_time = first_temperature_observation.sampling_time_start + temperature_dataset.first_value = first_temperature_observation.value_quantity + temperature_dataset.fk_first_observation_id = first_temperature_observation.id + last_temperature_observation = pg_session.query(Observation) \ + .filter(Observation.fk_dataset_id == temperature_dataset.id) \ + .order_by(desc('sampling_time_start')) \ + .first() + if last_temperature_observation is not None: + temperature_dataset.last_time = last_temperature_observation.sampling_time_start + temperature_dataset.last_value = last_temperature_observation.value_quantity + temperature_dataset.fk_last_observation_id = last_temperature_observation.id + + pg_session.commit() + print( + f"end of update script: for sensor {sensor} at " + f"feature {feature_of_interest} at platform {platform_name} " + "========================= End =========================") + # for loop sensors end + pg_session.close() + + +def create_db_observations(firebird_observations: List[FbObservation], + roll_dataset: Dataset, + slope_dataset: Dataset, + temperature_dataset: Dataset, + pg_session: session): + ''' insert new observations ito db ''' + # roll_result = ( + # pg_session.query(Observation.result_time) + # .filter(Observation.fk_dataset_id == roll_dataset.id) + # .all() + # ) + # roll_result_time_db_list1: List[str] = list(chain(*roll_result)) + # roll_result_time_db_list: List[float] = [time.mktime( + # date_obj.timetuple()) for date_obj in roll_result_time_db_list1] + + # slope_result = ( + # pg_session.query(Observation.result_time) + # .filter(Observation.fk_dataset_id == slope_dataset.id) + # .all() + # ) + # slope_result_time_db_list1: List[str] = list(chain(*slope_result)) + # slope_result_time_db_list: List[float] = [time.mktime( + # date_obj.timetuple()) for date_obj in slope_result_time_db_list1] + + # temperature_result = ( + # pg_session.query(Observation.result_time) + # .filter(Observation.fk_dataset_id == temperature_dataset.id) + # .all() + # ) + # temperature_result_time_db_list1: List[str] = list( + # chain(*temperature_result)) + # temperature_result_time_db_list: List[float] = [time.mktime( + # date_obj.timetuple()) for date_obj in temperature_result_time_db_list1] + + for fb_observation in firebird_observations: + # print(fb_observation.catena.name) + if(fb_observation.roll is not None and roll_dataset is not None): + value = fb_observation.roll + add_observation(roll_dataset, fb_observation, + value, pg_session) + + if(fb_observation.pitch is not None and slope_dataset is not None): + # max_id = max_id + 1 + value = fb_observation.pitch + add_observation(slope_dataset, fb_observation, + value, pg_session) + + if(fb_observation.temperature is not None and temperature_dataset is not None): + # max_id = max_id + 1 + value = fb_observation.temperature + add_observation(temperature_dataset, fb_observation, + value, pg_session) + + +def add_observation( + dataset: Dataset, + fb_observation: FbObservation, + value: str, + db_session: session): + ''' check if observation still extists in db, + otherwise add it to fb''' + # ob_id: str = str(observation_json.get('id')) + + existing_observation: bool = ( + db_session.query(Observation) + .filter(Observation.result_time == fb_observation.result_time, + Observation.fk_dataset_id == dataset.id) + .one_or_none() + ) + # existing_observation: bool = time.mktime( + # fb_observation.result_time.timetuple()) in value_identifier_db_list + # Can we insert this observation? + if existing_observation is None: + # insert bew observation + new_observation: Observation = Observation() + new_observation = Observation( + # id=max_id, + value_type='quantity', + sampling_time_start=fb_observation.result_time, + sampling_time_end=fb_observation.result_time, + result_time=fb_observation.result_time, + sta_identifier=str(uuid.uuid4()), + value_identifier=str(time.mktime( + fb_observation.result_time.timetuple())), + value_quantity=value + ) + dataset.observations.append(new_observation) + print(f"new observation with result time {new_observation.result_time} " + f"for inclinometer {dataset.procedure.name} succesfully imported!") + else: + print(f"observation with result time {fb_observation.result_time} " + f"for inclinometer {dataset.procedure.name} already exists!") + + +if __name__ == "__main__": + load_dotenv(find_dotenv()) + main()