""" import firebird, export to postgresql: SELECT dataset_id, dataset_type, observation_type, value_type, fk_procedure_id, fk_phenomenon_id, fk_offering_id, fk_category_id, fk_feature_id, fk_platform_id, fk_format_id, fk_unit_id, is_deleted, is_disabled, is_published, is_mobile, is_insitu, is_hidden, origin_timezone, first_time, last_time, first_value, last_value, fk_first_observation_id, fk_last_observation_id, decimals, identifier, fk_identifier_codespace_id, name, fk_name_codespace_id, description, fk_value_profile_id FROM gba.dataset where fk_platform_id = 6 and fk_feature_id = 43 ORDER BY dataset_id ASC; """ #!/usr/bin/python# -*- coding: utf-8 -*- import os import time from typing import List from itertools import chain import uuid import json from dotenv import load_dotenv, find_dotenv from sqlalchemy.orm import session from sqlalchemy import asc, desc # from sqlalchemy.dialects import firebird from sqlalchemy.sql import or_ from db.fb_models import (create_session, FbObservation, Catena) from db.models import (create_pg_session, Dataset, Observation, Procedure, Phenomenon, Platform, Format) def main(): """ Main function. """ #sensor_id = 0 # name of project area in firebird db feature_of_interest = 'GSA02A-010-1210' # Ampflwang KB1 # sensor name in postgis db # sensor = 'wolfsegg_kb1_0' platform = 'ampflwang_kb1_inclinometer' sensor_env_list = os.getenv('AMPFLWANG_KB1_SENSORS').replace('\n', '') sensor_list = json.loads(sensor_env_list) # print(sensor_list) firebird_session: session = create_session() # this will print elements along with their index value for sensor_id, sensor in enumerate(sensor_list): # db_observation = session.query(Observation) \ # .filter_by(name='John Snow').first() query_count = firebird_session.query(FbObservation).join(FbObservation.catena) \ .filter(FbObservation.sensore == sensor_id) \ .filter(Catena.name == feature_of_interest) \ .filter( or_( FbObservation.temperature != None, FbObservation.pitch != None # this is used to check NULL values )) \ .count() # if query_count == 0: # print(f"sensor {sensor} " # f"doesn't have any observations with measured values in firebird database!") # # hop to next for iteration, next sensor in list # continue # test = query_count.statement.compile(dialect=firebird.dialect()) firebird_observations: List[FbObservation] = [] if query_count > 0: query = firebird_session.query(FbObservation).join(FbObservation.catena) \ .filter(FbObservation.sensore == sensor_id) \ .filter(Catena.name == feature_of_interest) # print (query.statement.compile(dialect=firebird.dialect())) firebird_observations: List[FbObservation] = query.all() firebird_session.close() pg_session: session = create_pg_session() # pg_datasets: List[Dataset] = pg_query.all() pg_query = pg_session.query(Dataset) \ .join(Procedure) \ .join(Phenomenon) \ .filter(Procedure.sta_identifier == sensor.lower()) # .join(Platform).all() \ roll_dataset: Dataset = pg_query.filter( Phenomenon.sta_identifier == "Roll").first() slope_dataset: Dataset = pg_query.filter( Phenomenon.sta_identifier == "Slope").first() temperature_dataset: Dataset = pg_query.filter( Phenomenon.sta_identifier == "InSystemTemperature").first() platform_exists = pg_session.query(Platform.id).filter_by( name=platform.lower()).scalar() is not None if not platform_exists: sensor_platform = Platform() sensor_platform.sta_identifier = platform.lower() sensor_platform.identifier = platform.lower() sensor_platform.name = platform.lower() slope_dataset.platform = sensor_platform roll_dataset.platform = sensor_platform temperature_dataset.platform = sensor_platform else: sensor_platform = pg_session.query(Platform.id) \ .filter(Platform.name == platform.lower()) \ .first() slope_dataset.fk_platform_id = sensor_platform.id roll_dataset.fk_platform_id = sensor_platform.id temperature_dataset.fk_platform_id = sensor_platform.id # commit dataset changes: pg_session.commit() format_exists: bool = pg_session.query(Format.id).filter_by( definition="http://www.opengis.net/def/observationType/OGC-OM/2.0/OM_Measurement" ).scalar() is not None if format_exists: sensor_format = pg_session.query(Format.id) \ .filter(Format.definition == "http://www.opengis.net/def/observationType/OGC-OM/2.0/OM_Measurement") \ .first() slope_dataset.fk_format_id = sensor_format.id roll_dataset.fk_format_id = sensor_format.id temperature_dataset.fk_format_id = sensor_format.id pg_session.commit() if query_count == 0: print(f"sensor {sensor} " f"doesn't have any observations with measured values in firebird database!") # hop to next for iteration, next sensor in list, don't insert any observations continue create_db_observations(firebird_observations, roll_dataset, slope_dataset, temperature_dataset, pg_session) # commit new observations: pg_session.commit() if len(roll_dataset.observations) > 0: # if not published yet, publish the roll dataset if not roll_dataset.is_published: roll_dataset.is_published = 1 roll_dataset.is_hidden = 0 roll_dataset.dataset_type = "timeseries" roll_dataset.observation_type = "simple" roll_dataset.value_type = "quantity" if len(slope_dataset.observations) > 0: # if not published yet, publish the roll dataset if not slope_dataset.is_published: slope_dataset.is_published = 1 slope_dataset.is_hidden = 0 slope_dataset.dataset_type = "timeseries" slope_dataset.observation_type = "simple" slope_dataset.value_type = "quantity" if len(temperature_dataset.observations) > 0: # if not published yet, publish the temperature dataset if not temperature_dataset.is_published: temperature_dataset.is_published = 1 temperature_dataset.is_hidden = 0 temperature_dataset.dataset_type = "timeseries" temperature_dataset.observation_type = "simple" temperature_dataset.value_type = "quantity" pg_session.commit() last_roll_observation = pg_session.query(Observation) \ .filter(Observation.fk_dataset_id == roll_dataset.id) \ .order_by(desc('sampling_time_start')) \ .first() if last_roll_observation is not None: roll_dataset.last_time = last_roll_observation.sampling_time_start roll_dataset.last_value = last_roll_observation.value_quantity roll_dataset.fk_last_observation_id = last_roll_observation.id last_slope_observation = pg_session.query(Observation) \ .filter(Observation.fk_dataset_id == slope_dataset.id) \ .order_by(desc('sampling_time_start')) \ .first() if last_slope_observation is not None: slope_dataset.last_time = last_slope_observation.sampling_time_start slope_dataset.last_value = last_slope_observation.value_quantity slope_dataset.fk_last_observation_id = last_slope_observation.id last_temperature_observation = pg_session.query(Observation) \ .filter(Observation.fk_dataset_id == temperature_dataset.id) \ .order_by(desc('sampling_time_start')) \ .first() if last_temperature_observation is not None: temperature_dataset.last_time = last_temperature_observation.sampling_time_start temperature_dataset.last_value = last_temperature_observation.value_quantity temperature_dataset.fk_last_observation_id = last_temperature_observation.id first_roll_observation = pg_session.query(Observation) \ .filter(Observation.fk_dataset_id == roll_dataset.id) \ .order_by(asc('sampling_time_start')) \ .first() if first_roll_observation is not None: roll_dataset.first_time = first_roll_observation.sampling_time_start roll_dataset.first_value = first_roll_observation.value_quantity roll_dataset.fk_first_observation_id = first_roll_observation.id first_slope_observation = pg_session.query(Observation) \ .filter(Observation.fk_dataset_id == slope_dataset.id) \ .order_by(asc('sampling_time_start')) \ .first() if first_slope_observation is not None: slope_dataset.first_time = first_slope_observation.sampling_time_start slope_dataset.first_value = first_slope_observation.value_quantity slope_dataset.fk_first_observation_id = first_slope_observation.id first_temperature_observation = pg_session.query(Observation) \ .filter(Observation.fk_dataset_id == temperature_dataset.id) \ .order_by(asc('sampling_time_start')) \ .first() if first_temperature_observation is not None: temperature_dataset.first_time = first_temperature_observation.sampling_time_start temperature_dataset.first_value = first_temperature_observation.value_quantity temperature_dataset.fk_first_observation_id = first_temperature_observation.id pg_session.commit() # for loop sensors end pg_session.close() # firebird_session.close() def create_db_observations(firebird_observations: List[FbObservation], roll_dataset: Dataset, slope_dataset: Dataset, temperature_dataset: Dataset, pg_session: session): ''' insert new observations ito db ''' roll_result = ( pg_session.query(Observation.result_time) .filter(Observation.fk_dataset_id == roll_dataset.id) .all() ) roll_result_time_db_list1: List[str] = list(chain(*roll_result)) roll_result_time_db_list: List[float] = [time.mktime( date_obj.timetuple()) for date_obj in roll_result_time_db_list1] slope_result = ( pg_session.query(Observation.result_time) .filter(Observation.fk_dataset_id == slope_dataset.id) .all() ) slope_result_time_db_list1: List[str] = list(chain(*slope_result)) slope_result_time_db_list: List[float] = [time.mktime( date_obj.timetuple()) for date_obj in slope_result_time_db_list1] temperature_result = ( pg_session.query(Observation.result_time) .filter(Observation.fk_dataset_id == temperature_dataset.id) .all() ) temperature_result_time_db_list1: List[str] = list( chain(*temperature_result)) temperature_result_time_db_list: List[float] = [time.mktime( date_obj.timetuple()) for date_obj in temperature_result_time_db_list1] for fb_observation in firebird_observations: # print(fb_observation.catena.name) if(fb_observation.roll is not None and roll_dataset is not None): value = fb_observation.roll add_observation(roll_dataset, fb_observation, value, roll_result_time_db_list) if(fb_observation.pitch is not None and slope_dataset is not None): # max_id = max_id + 1 value = fb_observation.pitch add_observation(slope_dataset, fb_observation, value, slope_result_time_db_list) if(fb_observation.temperature is not None and temperature_dataset is not None): # max_id = max_id + 1 value = fb_observation.temperature add_observation(temperature_dataset, fb_observation, value, temperature_result_time_db_list) def add_observation( dataset: Dataset, fb_observation: FbObservation, value: str, value_identifier_db_list: List[float]): ''' check if observation still extists in db, otherwise add it to fb''' # ob_id: str = str(observation_json.get('id')) # existing_observation: bool = ( # db_session.query(Observation) # .filter(Observation.result_time == fb_observation.result_time, # Observation.fk_dataset_id == dataset.id) # .one_or_none() # ) existing_observation: bool = time.mktime( fb_observation.result_time.timetuple()) in value_identifier_db_list # Can we insert this observation? if existing_observation is False: # insert bew observation new_observation: Observation = Observation() new_observation = Observation( # id=max_id, value_type='quantity', sampling_time_start=fb_observation.result_time, sampling_time_end=fb_observation.result_time, result_time=fb_observation.result_time, sta_identifier=str(uuid.uuid4()), value_identifier=str(time.mktime( fb_observation.result_time.timetuple())), value_quantity=value ) dataset.observations.append(new_observation) print(f"new observation with result time {new_observation.result_time} " f"for inclinometer {dataset.procedure.name} succesfully imported!") else: print(f"observation with result time {fb_observation.result_time} " f"for inclinometer {dataset.procedure.name} already exists!") # ----------------------------------------------------------------------------- if __name__ == "__main__": load_dotenv(find_dotenv()) main()