302 lines
13 KiB
Python
302 lines
13 KiB
Python
|
'''
|
||
|
Tutorial link: https://realpython.com/flask-connexion-rest-api-part-2/
|
||
|
https://github.com/realpython/materials/blob/master/flask-connexion-rest-part-2/version_1/people.py
|
||
|
Sqlalchemy version: 1.2.15
|
||
|
Python version: 3.10
|
||
|
'''
|
||
|
|
||
|
# from itertools import count
|
||
|
import os
|
||
|
import json
|
||
|
import uuid
|
||
|
import time
|
||
|
from typing import List
|
||
|
from dotenv import load_dotenv, find_dotenv
|
||
|
from sqlalchemy.orm import session
|
||
|
from sqlalchemy import asc, desc
|
||
|
# from sqlalchemy.sql import or_
|
||
|
from db.fb_models import (create_session, FbObservation, Catena)
|
||
|
from db.models import (
|
||
|
Observation, create_pg_session,
|
||
|
Dataset, Procedure, Phenomenon)
|
||
|
|
||
|
|
||
|
def main():
    ''' run the inclinometer update once for every configured borehole

    Each table row holds the feature of interest, the platform name and the
    name of the environment variable that is passed on to ``update`` as
    ``sensor_list_conv``. The rows are processed in the order listed.
    '''
    boreholes = (
        # Gschliefgraben KB1
        ('TAC003-020-0521', 'gschliefgraben_inclinometer',
         'GSCHLIEFGRABEN_KB1_SENSORS'),
        # Gschliefgraben KB2
        ('TAC005-013-0521', 'gschliefgraben_inclinometer',
         'GSCHLIEFGRABEN_KB2_SENSORS'),
        # Ampflwang KB1
        ('GSA02A-010-1210', 'ampflwang_inclinometer',
         'AMPFLWANG_KB1_SENSORS'),
        # Ampflwang KB2
        ('GSA02B-007-1210', 'ampflwang_inclinometer',
         'AMPFLWANG_KB2_SENSORS'),
    )

    # update automatic observations, one borehole after the other
    for feature_of_interest, platform_name, sensor_list_conv in boreholes:
        update(feature_of_interest, platform_name, sensor_list_conv)
|
||
|
|
||
|
|
||
|
def _publish_if_populated(dataset):
    ''' flag a dataset as a published timeseries once it holds observations

    Datasets that are already published are left untouched.
    '''
    if dataset.observations and not dataset.is_published:
        dataset.is_published = 1
        dataset.is_hidden = 0
        dataset.dataset_type = "timeseries"
        dataset.observation_type = "simple"
        dataset.value_type = "quantity"


def _refresh_time_bounds(pg_session, dataset):
    ''' store the earliest and latest observation of a dataset on the dataset

    The first/last time, value and observation id are taken from the
    observations ordered by ``sampling_time_start``. Nothing is changed when
    the dataset has no observations.
    '''
    dataset_observations = pg_session.query(Observation) \
        .filter(Observation.fk_dataset_id == dataset.id)

    first_observation = dataset_observations \
        .order_by(asc('sampling_time_start')) \
        .first()
    if first_observation is not None:
        dataset.first_time = first_observation.sampling_time_start
        dataset.first_value = first_observation.value_quantity
        dataset.fk_first_observation_id = first_observation.id

    last_observation = dataset_observations \
        .order_by(desc('sampling_time_start')) \
        .first()
    if last_observation is not None:
        dataset.last_time = last_observation.sampling_time_start
        dataset.last_value = last_observation.value_quantity
        dataset.fk_last_observation_id = last_observation.id


def _update_sensor(pg_session, firebird_session, sensor_id, sensor,
                   feature_of_interest, platform_name):
    ''' import the new firebird observations of one sensor into postgresql

    Looks up the Roll, Slope and InSystemTemperature datasets of the sensor,
    copies all firebird observations that are not older than the slope
    dataset's ``last_time``, and afterwards publishes the datasets and
    refreshes their first/last observation columns.
    '''
    print(
        "========================= Start ========================="
        f"start update script: for sensor {sensor} at "
        f"feature {feature_of_interest} at platform {platform_name} \n")

    pg_query = pg_session.query(Dataset) \
        .join(Procedure) \
        .join(Phenomenon) \
        .filter(Procedure.sta_identifier == sensor.lower())
    roll_dataset: Dataset = pg_query.filter(
        Phenomenon.sta_identifier == "Roll").first()
    slope_dataset: Dataset = pg_query.filter(
        Phenomenon.sta_identifier == "Slope").first()
    temperature_dataset: Dataset = pg_query.filter(
        Phenomenon.sta_identifier == "InSystemTemperature").first()

    # the slope dataset supplies the time threshold, so without it there is
    # nothing to compare against; skip the sensor instead of crashing
    if slope_dataset is None:
        print(f"sensor {sensor} for platform {platform_name} "
              f"doesn't have a Slope dataset in postgresql database! \n")
        return

    # NOTE(review): if last_time is NULL the comparison below presumably
    # matches no rows, so an empty dataset would never receive its first
    # observations - confirm that last_time is always initialised.
    # '>=' also selects the observation at last_time itself again;
    # add_observation skips result times that are already stored.
    new_firebird_observations: List[FbObservation] = \
        firebird_session.query(FbObservation) \
        .join(FbObservation.catena) \
        .filter(FbObservation.sensore == sensor_id) \
        .filter(Catena.name == feature_of_interest) \
        .filter(FbObservation.data >= slope_dataset.last_time) \
        .all()
    # release the firebird connection before the postgresql work starts;
    # the session is used again for the next sensor
    firebird_session.close()

    if not new_firebird_observations:
        print(f"sensor {sensor} for platform {platform_name} "
              f"doesn't have any updated observations "
              f"later than {slope_dataset.last_time} in firebird database! \n")
        # next sensor in list, don't insert any observations
        return

    # insert the new observations from firebird db into postgresql:
    create_db_observations(new_firebird_observations, roll_dataset,
                           slope_dataset, temperature_dataset, pg_session)
    pg_session.commit()

    # datasets missing in postgresql are skipped here, the same way
    # create_db_observations skips them
    roll_found = roll_dataset is not None
    temperature_found = temperature_dataset is not None

    # set is_published to true, if dataset hadn't any observations before update
    if roll_found:
        _publish_if_populated(roll_dataset)
    _publish_if_populated(slope_dataset)
    if temperature_found:
        _publish_if_populated(temperature_dataset)
    pg_session.commit()

    # set first and last observation of every dataset
    _refresh_time_bounds(pg_session, slope_dataset)
    if roll_found:
        _refresh_time_bounds(pg_session, roll_dataset)
    if temperature_found:
        _refresh_time_bounds(pg_session, temperature_dataset)
    pg_session.commit()

    print(
        f"end of update script: for sensor {sensor} at "
        f"feature {feature_of_interest} at platform {platform_name} "
        "========================= End =========================")


def update(feature_of_interest, platform_name, sensor_list_conv):
    ''' import new inclinometer observations of one borehole

    Args:
        feature_of_interest: name of the firebird ``Catena`` to read from.
        platform_name: platform name, only used in the progress messages.
        sensor_list_conv: name of the environment variable holding the
            JSON list of sensor names; the list index of a sensor is
            matched against ``FbObservation.sensore``.

    Raises:
        ValueError: if the environment variable is not set.
    '''
    # read the configuration first, so that a missing variable fails with
    # a clear message and before any database session is opened
    sensor_env_list = os.getenv(sensor_list_conv)
    if sensor_env_list is None:
        raise ValueError(
            f"environment variable {sensor_list_conv} is not set, "
            "it must contain the JSON list of sensor names")
    sensor_list = json.loads(sensor_env_list.replace('\n', ''))

    pg_session: session = create_pg_session()
    try:
        firebird_session: session = create_session()
        try:
            # the list index is the sensor number used in firebird
            for sensor_id, sensor in enumerate(sensor_list):
                _update_sensor(pg_session, firebird_session, sensor_id, sensor,
                               feature_of_interest, platform_name)
        finally:
            # also reached when a sensor fails, so no connection is leaked
            firebird_session.close()
    finally:
        pg_session.close()
|
||
|
|
||
|
|
||
|
def create_db_observations(firebird_observations: List[FbObservation],
                           roll_dataset: Dataset,
                           slope_dataset: Dataset,
                           temperature_dataset: Dataset,
                           pg_session: session):
    ''' hand every firebird measurement over to its matching dataset

    For each firebird observation the roll, pitch and temperature readings
    are passed to ``add_observation`` together with the roll, slope and
    temperature dataset respectively. A reading is skipped when it is None
    or when its dataset is None.
    '''
    # firebird attribute name -> target dataset, in the order roll, pitch,
    # temperature
    targets = (
        ('roll', roll_dataset),
        ('pitch', slope_dataset),
        ('temperature', temperature_dataset),
    )

    for fb_observation in firebird_observations:
        for attribute, dataset in targets:
            value = getattr(fb_observation, attribute)
            if value is None or dataset is None:
                continue
            add_observation(dataset, fb_observation, value, pg_session)
|
||
|
|
||
|
|
||
|
def add_observation(
        dataset: Dataset,
        fb_observation: FbObservation,
        value: str,
        db_session: session):
    ''' append a firebird observation to the dataset, unless an observation
    with the same result time is already stored for that dataset

    Args:
        dataset: dataset the new observation is appended to.
        fb_observation: firebird observation supplying the result time.
        value: measured value, stored as ``value_quantity``.
        db_session: session used to look up existing observations.

    The observation is only appended to ``dataset.observations``; committing
    is left to the caller.
    '''
    result_time = fb_observation.result_time

    # the stored observation with this result time, or None
    # (one_or_none raises if more than one row matches)
    existing_observation = (
        db_session.query(Observation)
        .filter(Observation.result_time == result_time,
                Observation.fk_dataset_id == dataset.id)
        .one_or_none()
    )

    if existing_observation is not None:
        print(f"observation with result time {fb_observation.result_time} "
              f"for inclinometer {dataset.procedure.name} already exists!")
        return

    # insert new observation; sampling start, sampling end and result time
    # are all set to the firebird result time
    new_observation = Observation(
        value_type='quantity',
        sampling_time_start=result_time,
        sampling_time_end=result_time,
        result_time=result_time,
        sta_identifier=str(uuid.uuid4()),
        # result time as seconds since the epoch, interpreted in local time
        value_identifier=str(time.mktime(result_time.timetuple())),
        value_quantity=value
    )
    dataset.observations.append(new_observation)
    print(f"new observation with result time {new_observation.result_time} "
          f"for inclinometer {dataset.procedure.name} succesfully imported!")
|
||
|
|
||
|
|
||
|
if __name__ == "__main__":
    # locate the .env file and load its variables into the environment
    # before the update starts
    dotenv_path = find_dotenv()
    load_dotenv(dotenv_path)
    main()
|