'''
Import fibre-optic inclinometer observations into the PostgreSQL database.

Based on the tutorial:
https://realpython.com/flask-connexion-rest-api-part-2/
https://github.com/realpython/materials/blob/master/flask-connexion-rest-part-2/version_1/people.py
Sqlalchemy version: 1.2.15
Python version: 3.7
'''

import os
import sys
import uuid
from typing import List
from itertools import chain
import json
from datetime import datetime, date, timedelta

from dotenv import load_dotenv, find_dotenv
from sqlalchemy.orm import session
from sqlalchemy import func, asc, desc

from db.models import (
    ObservationSchema, Person, PersonSchema, Observation,
    create_pg_session, Dataset, Procedure, Phenomenon, Platform, Format)
from gschliefgraben_glasfaser.my_api import MyApi

# Identifier of the platform every imported dataset is attached to.
PLATFORM_STA_IDENTIFIER = "gschliefgraben_glasfaser"
# Definition string used to look up the Format row for the datasets.
MEASUREMENT_FORMAT_DEFINITION = (
    "http://www.opengis.net/def/observationType/OGC-OM/2.0/OM_Measurement")
# Environment variable holding the JSON encoded list of sensor names.
SENSORS_ENV_VAR = 'GLASFASER_GSCHLIEFGRABEN_SENSORS'


def main():
    '''
    Import observations for every sensor listed in the environment.

    For each sensor the "Slope" dataset is looked up, published if needed,
    linked to its platform and format, filled with observations and finally
    updated with its first/last observation. The process exits (after
    printing a message) as soon as a sensor has no "Slope" dataset.

    :raises KeyError: if the sensor list environment variable is not set
    '''
    pg_session: session.Session = create_pg_session()
    try:
        sensor_list: List[str] = json.loads(os.environ[SENSORS_ENV_VAR])

        for sensor in sensor_list:
            slope_dataset: Dataset = (
                pg_session.query(Dataset)
                .join(Procedure)
                .join(Phenomenon)
                .filter(Procedure.sta_identifier == sensor.lower())
                .filter(Phenomenon.sta_identifier == "Slope")
                .first()
            )
            if not slope_dataset:
                print("Sensor " + sensor + " ist noch nicht angelegt!")
                # sys.exit() instead of the site-injected exit() helper,
                # which is not guaranteed to exist outside the REPL.
                sys.exit()

            _publish_dataset(pg_session, slope_dataset)
            _assign_platform(pg_session, slope_dataset)
            _assign_format(pg_session, slope_dataset)

            # create all the observations for the given sensor name
            create_observations(sensor, slope_dataset)

            # update first and last observations for the dataset
            _update_dataset_bounds(pg_session, slope_dataset)
            pg_session.commit()
    finally:
        # Release the connection on success, on errors and on sys.exit().
        pg_session.close()


def _publish_dataset(pg_session, dataset: Dataset):
    '''Mark an unpublished dataset as a published quantity timeseries.'''
    if dataset.is_published:
        return
    dataset.is_published = 1
    dataset.is_hidden = 0
    dataset.dataset_type = "timeseries"
    dataset.observation_type = "simple"
    dataset.value_type = "quantity"
    pg_session.commit()


def _assign_platform(pg_session, dataset: Dataset):
    '''Link the dataset to the platform, creating the platform if missing.'''
    platform_row = (
        pg_session.query(Platform.id)
        .filter(Platform.sta_identifier == PLATFORM_STA_IDENTIFIER)
        .first()
    )
    if platform_row is not None:
        dataset.fk_platform_id = platform_row.id
        return

    identifier = PLATFORM_STA_IDENTIFIER.lower()
    new_platform = Platform()
    new_platform.sta_identifier = identifier
    new_platform.identifier = identifier
    new_platform.name = identifier
    dataset.platform = new_platform


def _assign_format(pg_session, dataset: Dataset):
    '''Link the dataset to the measurement format row, if one exists.'''
    format_row = (
        pg_session.query(Format.id)
        .filter(Format.definition == MEASUREMENT_FORMAT_DEFINITION)
        .first()
    )
    if format_row is not None:
        dataset.fk_format_id = format_row.id


def _update_dataset_bounds(pg_session, dataset: Dataset):
    '''Copy the earliest and latest observation onto the dataset.'''
    dataset_observations = pg_session.query(Observation).filter(
        Observation.fk_dataset_id == dataset.id)

    first_observation = dataset_observations.order_by(
        asc('sampling_time_start')).first()
    if first_observation is not None:
        dataset.first_time = first_observation.sampling_time_start
        dataset.first_value = first_observation.value_quantity
        dataset.fk_first_observation_id = first_observation.id

    last_observation = dataset_observations.order_by(
        desc('sampling_time_start')).first()
    if last_observation is not None:
        dataset.last_time = last_observation.sampling_time_start
        dataset.last_value = last_observation.value_quantity
        dataset.fk_last_observation_id = last_observation.id


def _date_windows(start: date, end: date, step: timedelta):
    '''
    Yield inclusive (window_start, window_end) pairs covering start..end.

    Each window spans at most ``step``; the next window starts one day
    after the previous window's end, so no day is requested twice.
    '''
    if step < timedelta(0):
        raise ValueError("step must not be negative")
    window_start = start
    while window_start <= end:
        window_end = min(window_start + step, end)
        yield window_start, window_end
        window_start = window_end + timedelta(days=1)


def create_observations(sensor: str, slope_dataset: Dataset,
                        start_date: date = date(2022, 1, 1),
                        end_date: date = date(2022, 3, 20),
                        delta: timedelta = timedelta(days=7)):
    '''
    Fetch and store the observations of one sensor, window by window.

    :param sensor: sensor name passed to the API
    :param slope_dataset: dataset the new observations are attached to
    :param start_date: first day to import (inclusive)
    :param end_date: last day to import (inclusive)
    :param delta: maximum span of a single API request
    '''
    pg_session: session.Session = create_pg_session()
    try:
        test_api = MyApi(os.environ.get("TOKEN_API"))

        for window_start, window_end in _date_windows(
                start_date, end_date, delta):
            create_db_observations(
                sensor,
                window_start.strftime('%Y-%m-%d'),
                window_end.strftime('%Y-%m-%d'),
                test_api, pg_session, slope_dataset)

        pg_session.commit()
    finally:
        # This function owns the session, so it must also release it.
        pg_session.close()


def create_db_observations(sensor, query_date_start, query_date_end,
                           test_api, pg_session, dataset: Dataset):
    '''
    Request the sensor data for one date window and add the new observations.

    Observations outside the window, without a value, or whose id is already
    stored for the dataset are skipped. Nothing is committed here.

    :param sensor: sensor name passed to the API
    :param query_date_start: first day of the window, formatted %Y-%m-%d
    :param query_date_end: last day of the window, formatted %Y-%m-%d
    :param test_api: API client providing ``getSensorData``
    :param pg_session: database session the observations are added to
    :param dataset: dataset the observations belong to
    '''
    first_day = datetime.strptime(query_date_start, "%Y-%m-%d").date()
    last_day = datetime.strptime(query_date_end, "%Y-%m-%d").date()

    data = test_api.getSensorData(sensor, query_date_start, query_date_end)
    observation_array = (data['FeatureCollection']
                         ['Features'][0]['geometry']['properties'][0])

    result = (
        pg_session.query(Observation.value_identifier)
        .filter(Observation.fk_dataset_id == dataset.id)
        .all()
    )
    # A set gives O(1) membership tests for the duplicate check.
    known_identifiers = set(chain.from_iterable(result))

    max_id = pg_session.query(func.max(Observation.id)).scalar()
    if max_id is None:
        max_id = -1

    for observation_json in observation_array:
        ob_date_time = observation_json.get('DateTime')
        observation_day = datetime.strptime(
            ob_date_time, "%Y-%m-%dT%H:%M:%S.%fZ").date()
        if not first_day <= observation_day <= last_day:
            continue
        if observation_json.get('Value') is None:
            continue

        new_max_id = create_observation(
            observation_json, pg_session, max_id, dataset, known_identifiers)
        if new_max_id != max_id:
            # The counter only grows on insert: remember the id so that a
            # duplicate inside the same response is not inserted twice.
            known_identifiers.add(str(observation_json.get('id')))
        max_id = new_max_id

    print("observations for date " + query_date_start +
          " to " + query_date_end + " for sensor " + sensor + " succesfully imported \n")


def create_observation(observation_json: dict, db_session, max_id,
                       dataset: Dataset, value_identifier_db_list):
    """
    Add one observation to the session unless its id is already known.

    :param observation_json: observation data as delivered by the API
    :param db_session: database session the observation is added to
    :param max_id: running counter of the highest observation id
    :param dataset: dataset the observation belongs to
    :param value_identifier_db_list: collection of already stored ids
    :return: max_id + 1 if the observation was added, else max_id unchanged
    """
    ob_id: str = str(observation_json.get('id'))
    if ob_id in value_identifier_db_list:
        # Already stored: nothing to insert.
        return max_id

    # deserialize to python object
    new_observation: Observation = ObservationSchema().load(observation_json)
    new_observation.sta_identifier = str(uuid.uuid4())
    # Start and end of the sampling time are both set to the result time.
    new_observation.sampling_time_start = new_observation.result_time
    new_observation.sampling_time_end = new_observation.result_time
    new_observation.fk_dataset_id = dataset.id

    db_session.add(new_observation)
    return max_id + 1


def create(person_json: dict):
    """
    Create a new person unless one with the same login already exists.

    :param person_json: person data, must provide the key 'login'
    :return: (serialized person, 201) on success; None if the login is
             already taken (a message is printed in that case)
    """
    login = person_json.get('login')

    db_session = create_pg_session()
    try:
        existing_person = (
            db_session.query(Person)
            .filter(Person.login == login)
            .one_or_none()
        )
        if existing_person is not None:
            print(409, f'Person {login} exists already')
            return None

        schema = PersonSchema()
        # deserialize to object
        new_person: Person = schema.load(person_json)
        db_session.add(new_person)
        db_session.commit()

        # Serialize before the session is closed in the finally clause.
        data = schema.dump(new_person)
        return data, 201
    finally:
        db_session.close()


if __name__ == "__main__":
    load_dotenv(find_dotenv())
    # Default to an empty string so .replace() works when the variable is unset.
    sensor_list1 = os.environ.get(SENSORS_ENV_VAR, '').replace(r'\n', '\n')
    print(f'sensors: {sensor_list1} .')
    main()