From fdc5da737307c54091768653d8b36d6202689df0 Mon Sep 17 00:00:00 2001 From: Arno Kaimbacher Date: Fri, 4 Mar 2022 14:47:36 +0100 Subject: [PATCH] - add daily cron job for inserting sensor data --- gschliefgraben_glasfaser/create_db.py | 26 ---- gschliefgraben_glasfaser/main.py | 144 ++++++++++-------- gschliefgraben_glasfaser/my_api.py | 4 +- gschliefgraben_glasfaser/update_daily_cron.py | 142 +++++++++++++++++ insert_sensor/InsertGschliefgraben.xml | 14 +- insert_sensor/execute.py | 6 +- insert_sensor/transactional.py | 2 +- notes.txt | 2 +- requirements.txt | Bin 1114 -> 1046 bytes 9 files changed, 228 insertions(+), 112 deletions(-) delete mode 100644 gschliefgraben_glasfaser/create_db.py create mode 100644 gschliefgraben_glasfaser/update_daily_cron.py diff --git a/gschliefgraben_glasfaser/create_db.py b/gschliefgraben_glasfaser/create_db.py deleted file mode 100644 index 7da1815..0000000 --- a/gschliefgraben_glasfaser/create_db.py +++ /dev/null @@ -1,26 +0,0 @@ -from sqlalchemy import create_engine -from sqlalchemy import MetaData -from sqlalchemy import Table -from sqlalchemy import Column -from sqlalchemy import Integer, String - -db_url = 'sqlite:///db.sqlite' -engine = create_engine(db_url ) - -# Create a metadata instance -metadata = MetaData(engine) -# Declare a table -table = Table('Example',metadata, - Column('id',Integer, primary_key=True), - Column('name',String)) -# Create all tables -students = Table( - 'students', metadata, - Column('id', Integer, primary_key = True), - Column('name', String), - Column('lastname', String), -) -metadata.create_all(engine) - - - \ No newline at end of file diff --git a/gschliefgraben_glasfaser/main.py b/gschliefgraben_glasfaser/main.py index fcdf65a..3689b72 100644 --- a/gschliefgraben_glasfaser/main.py +++ b/gschliefgraben_glasfaser/main.py @@ -7,6 +7,8 @@ Python version: 3.7 import os import uuid +from typing import List +from itertools import chain # import sys, inspect # currentdir = os.path.dirname(os.path.abspath(inspect.getfile(inspect.currentframe()))) # parentdir = os.path.dirname(currentdir) @@ -25,75 +27,75 @@ from datetime import datetime, date, timedelta # print(response.json()) # shows the response's JSON response body, if it has one # print(response.content) # get the data content of the response - def main(): ''' main method ''' - db_user = os.environ.get("POSTGIS_DBUSER") - print(db_user) - pg_session: session = create_pg_session() - # pg_person: Person = pg_session.query(Person).first() - observation: Observation = pg_session.query(Observation).first() - # print(pg_person) - - # serialize db data to json - # person_schema = PersonSchema() - # dump_data = person_schema.dump(pg_person) - # print(dump_data) - # serialize db data to json - # observation_schema = ObservationSchema() - # dump_data = observation_schema.dump(observation) - # print(dump_data) - - # request ortmann api - # response = - # requests.get('https://api.dgnss-sensors.com/gschliefgraben?sensors=("inclino1_14")', - # headers={ - # 'Authorization': 'Bearer' + token, - # 'cache-control': 'no-cache', - # 'Content-Type': 'application/x-www-form-urlencoded', - # 'accept': 'application/json' - # }, - # data='grant_type=client_credentials&scope=gschliefgraben') - # print(response) - - sensor: str = "inclino1_14" - pg_query = pg_session.query(Dataset) \ - .join(Procedure) \ - .join(Phenomenon) \ - .filter(Procedure.sta_identifier == sensor.lower()) - slope_dataset: Dataset = pg_query.filter( - Phenomenon.sta_identifier == "Slope").first() - if not 
slope_dataset.is_published: - slope_dataset.is_published = 1 - slope_dataset.is_hidden = 0 - slope_dataset.dataset_type = "timeseries" - slope_dataset.observation_type = "simple" - slope_dataset.value_type = "quantity" - pg_session.commit() + platform_sta_identifier = "gschliefgraben_glasfaser" + # sensor_list = ["inclino1_14", "inclino1_02"] + sensor_list = os.environ.get("GLASFASER_GSCHLIEFGRABEN_SENSORS") + # this will print elements along with their index value + for sensor in enumerate(sensor_list): + + pg_query = pg_session.query(Dataset) \ + .join(Procedure) \ + .join(Phenomenon) \ + .filter(Procedure.sta_identifier == sensor.lower()) + slope_dataset: Dataset = pg_query.filter( + Phenomenon.sta_identifier == "Slope").first() + if not slope_dataset: + print("Sensor " + sensor + " ist noch nicht angelegt!") + exit() + if not slope_dataset.is_published: + slope_dataset.is_published = 1 + slope_dataset.is_hidden = 0 + slope_dataset.dataset_type = "timeseries" + slope_dataset.observation_type = "simple" + slope_dataset.value_type = "quantity" + pg_session.commit() + + platform_exists: bool = pg_session.query(Platform.id).filter_by( + sta_identifier = platform_sta_identifier).scalar() is not None + if platform_exists: + sensor_platform = pg_session.query(Platform.id) \ + .filter(Platform.sta_identifier == platform_sta_identifier) \ + .first() + slope_dataset.fk_platform_id = sensor_platform.id + else: + exit() + + # create all the observation for the given sensor names + create_observations(sensor, slope_dataset) + +def create_observations(sensor: str, slope_dataset: Dataset): + ''' create_observations method for given sensor ''' + + pg_session: session = create_pg_session() + # The size of each step in days # consider the start date as 2021-february 1 st start_date = date(2022, 1, 1) # consider the end date as 2021-march 1 st - end_date = date(2022, 3, 1) + end_date = date(2022, 3, 3) # delta time - delta = timedelta(days=1) + delta = timedelta(days=7) token_api = os.environ.get("TOKEN_API") test_api = MyApi(token_api) # iterate over range of dates while start_date <= end_date: # print(start_date, end="\n") - query_date = start_date.strftime('%Y-%m-%d') - create_db_observations(query_date, test_api, pg_session, slope_dataset) - start_date += delta + query_date_start: str = start_date.strftime('%Y-%m-%d') + end_date_temp: date = start_date + delta # (plus 7 days) + if end_date_temp > end_date: + end_date_temp = end_date + query_date_end: str = end_date_temp.strftime('%Y-%m-%d') + create_db_observations(sensor, query_date_start, query_date_end, test_api, pg_session, slope_dataset) + # for next loop step set new start_date (1 day greate then last end_date) + start_date = end_date_temp + timedelta(days=1) pg_session.commit() - # for i in rrule(DAILY , dtstart=start_date,until=end_date): - # print(i.strftime('%Y%b%d'),sep='\n') - # query_date = "2022-02-28" # create_db_observations(query_date, test_api, pg_session) # query_date_obj = datetime.strptime(query_date, "%Y-%m-%d") @@ -117,13 +119,20 @@ def main(): # pg_session.commit() -def create_db_observations(query_date, test_api, pg_session, dataset: Dataset): +def create_db_observations(sensor, query_date_start, query_date_end, test_api, pg_session, dataset: Dataset): ''' to do ''' - query_date_obj = datetime.strptime(query_date, "%Y-%m-%d") - data = test_api.getSensorData("inclino1_14", query_date) + query_date_start_obj = datetime.strptime(query_date_start, "%Y-%m-%d") + query_date_end_obj = datetime.strptime(query_date_end, "%Y-%m-%d") + 
data = test_api.getSensorData(sensor, query_date_start, query_date_end) observation_array = (data['FeatureCollection'] ['Features'][0]['geometry']['properties'][0]) # print(observation_array) + result = ( + pg_session.query(Observation.value_identifier) + .filter(Observation.fk_dataset_id == dataset.id) + .all() + ) + value_identifier_db_list: List[str] = list(chain(*result)) max_id = pg_session.query(func.max(Observation.id)).scalar() if max_id is None: @@ -132,19 +141,21 @@ def create_db_observations(query_date, test_api, pg_session, dataset: Dataset): for observation_json in observation_array: ob_date_time = observation_json.get('DateTime') datetime_obj = datetime.strptime(ob_date_time, "%Y-%m-%dT%H:%M:%S.%fZ") - if datetime_obj.date() != query_date_obj.date(): + if datetime_obj.date() < query_date_start_obj.date(): + continue + if datetime_obj.date() > query_date_end_obj.date(): continue ob_value = observation_json.get('Value') if ob_value is None: continue # max_id = max_id + 1 max_id = create_observation( - observation_json, pg_session, max_id, dataset) + observation_json, pg_session, max_id, dataset, value_identifier_db_list) # pg_session.commit() - print("observations for date " + query_date + "succesfully imported \n") + print("observations for date " + query_date_start + " to " + query_date_end + " succesfully imported \n") -def create_observation(observation_json: ObservationSchema, db_session, max_id, dataset: Dataset): +def create_observation(observation_json: ObservationSchema, db_session, max_id, dataset: Dataset, value_identifier_db_list): """ This function creates a new observation in the people structure based on the passed-in observation data @@ -155,14 +166,15 @@ def create_observation(observation_json: ObservationSchema, db_session, max_id, ob_id: str = str(observation_json.get('id')) # db_session = create_pg_session() - existing_observation: bool = ( - db_session.query(Observation) - .filter(Observation.value_identifier == ob_id) - .one_or_none() - ) + # existing_observation: bool = ( + # db_session.query(Observation) + # .filter(Observation.value_identifier == ob_id) + # .one_or_none() + # ) + existing_observation: bool = ob_id in value_identifier_db_list # Can we insert this observation? 
- if existing_observation is None: + if existing_observation is False: max_id += 1 # Create a person instance using the schema and the passed in person schema = ObservationSchema() @@ -170,8 +182,8 @@ def create_observation(observation_json: ObservationSchema, db_session, max_id, new_observation: Observation = schema.load(observation_json) new_observation.id = max_id new_observation.sta_identifier = str(uuid.uuid4()) - new_observation.sampling_time_start=new_observation.result_time - new_observation.sampling_time_end=new_observation.result_time + new_observation.sampling_time_start = new_observation.result_time + new_observation.sampling_time_end = new_observation.result_time new_observation.fk_dataset_id = dataset.id # Add the person to the database diff --git a/gschliefgraben_glasfaser/my_api.py b/gschliefgraben_glasfaser/my_api.py index f6fa8cd..3adb6a9 100644 --- a/gschliefgraben_glasfaser/my_api.py +++ b/gschliefgraben_glasfaser/my_api.py @@ -65,10 +65,10 @@ class MyApi(): # self.access_token = res.json()['access_token'] # else: # # Token expired -> re-authenticate - def getSensorData(self, sensor: string, date): + def getSensorData(self, sensor: string, date_start, date_end): ''' request observations''' try: - request = self.session.get('https://api.dgnss-sensors.com/gschliefgraben?sensors=(\''+sensor+ '\')&start='+date+'&end='+date, + request = self.session.get('https://api.dgnss-sensors.com/gschliefgraben?sensors=(\''+sensor+ '\')&start=' + date_start+ '&end=' + date_end, headers={ 'cache-control': 'no-cache', 'Content-Type': 'application/x-www-form-urlencoded', diff --git a/gschliefgraben_glasfaser/update_daily_cron.py b/gschliefgraben_glasfaser/update_daily_cron.py new file mode 100644 index 0000000..77d2fed --- /dev/null +++ b/gschliefgraben_glasfaser/update_daily_cron.py @@ -0,0 +1,142 @@ +''' +Tutorial link: https://realpython.com/flask-connexion-rest-api-part-2/ +https://github.com/realpython/materials/blob/master/flask-connexion-rest-part-2/version_1/people.py +Sqlalchemy version: 1.2.15 +Python version: 3.10 +''' + +import os +import uuid +from sqlalchemy.orm import session +from sqlalchemy import func +# from db.pg_models import Platform +from gschliefgraben_glasfaser.models import ObservationSchema, Observation, create_pg_session, Dataset, Procedure, Phenomenon, Platform +from gschliefgraben_glasfaser.my_api import MyApi +from datetime import datetime + +def main(): + ''' main method ''' + pg_session: session = create_pg_session() + platform_sta_identifier = "gschliefgraben_glasfaser" + # sensor_list = ["inclino1_14", "inclino1_02"] + sensor_list = os.environ.get("GLASFASER_GSCHLIEFGRABEN_SENSORS") + + # this will print elements along with their index value + for sensor in enumerate(sensor_list): + pg_query = pg_session.query(Dataset) \ + .join(Procedure) \ + .join(Phenomenon) \ + .filter(Procedure.sta_identifier == sensor.lower()) + slope_dataset: Dataset = pg_query.filter( + Phenomenon.sta_identifier == "Slope").first() + if not slope_dataset: + print("Sensor " + sensor + " ist noch nicht angelegt!") + exit() + if not slope_dataset.is_published: + slope_dataset.is_published = 1 + slope_dataset.is_hidden = 0 + slope_dataset.dataset_type = "timeseries" + slope_dataset.observation_type = "simple" + slope_dataset.value_type = "quantity" + pg_session.commit() + + platform_exists: bool = pg_session.query(Platform.id).filter_by( + sta_identifier = platform_sta_identifier).scalar() is not None + if platform_exists: + sensor_platform = pg_session.query(Platform.id) \ + 
.filter(Platform.sta_identifier == platform_sta_identifier) \ + .first() + slope_dataset.fk_platform_id = sensor_platform.id + + # create all the observation for the given sensor names + create_observations(sensor, slope_dataset) + +def create_observations(sensor: str, slope_dataset: Dataset): + ''' create_observations method for given sensor ''' + + pg_session: session = create_pg_session() + + # create access token + token_api = os.environ.get("TOKEN_API") + test_api = MyApi(token_api) + + # The size of each step in days + # consider the start date as 2021-february 1 st + start_date = datetime.today() + query_date = start_date.strftime('%Y-%m-%d') + create_db_observations(sensor, query_date, test_api, pg_session, slope_dataset) + pg_session.commit() + +def create_db_observations(sensor: str, query_date, test_api, pg_session, dataset: Dataset): + ''' to do ''' + query_date_obj = datetime.strptime(query_date, "%Y-%m-%d") + data = test_api.getSensorData(sensor, query_date, query_date) + observation_array = (data['FeatureCollection'] + ['Features'][0]['geometry']['properties'][0]) + # print(observation_array) + + max_id = pg_session.query(func.max(Observation.id)).scalar() + if max_id is None: + max_id = -1 + # pg_session.bulk_save_objects(observations) + for observation_json in observation_array: + ob_date_time = observation_json.get('DateTime') + datetime_obj = datetime.strptime(ob_date_time, "%Y-%m-%dT%H:%M:%S.%fZ") + if datetime_obj.date() != query_date_obj.date(): + continue + ob_value = observation_json.get('Value') + if ob_value is None: + continue + # max_id = max_id + 1 + max_id = create_observation( + observation_json, pg_session, max_id, dataset) + # pg_session.commit() + print("observations for date " + query_date + " succesfully imported \n") + + +def create_observation(observation_json: ObservationSchema, db_session, max_id, dataset: Dataset): + """ + This function creates a new observation in the people structure + based on the passed-in observation data + :param observation: person to create in people structure + :return: 201 on success, observation on person exists + """ + + ob_id: str = str(observation_json.get('id')) + # db_session = create_pg_session() + + existing_observation: bool = ( + db_session.query(Observation) + .filter(Observation.value_identifier == ob_id, Observation.fk_dataset_id == dataset.id) + .one_or_none() + ) + + # Can we insert this observation? 
+ if existing_observation is None: + max_id += 1 + # Create a person instance using the schema and the passed in person + schema = ObservationSchema() + # deserialize to python object + new_observation: Observation = schema.load(observation_json) + new_observation.id = max_id + new_observation.sta_identifier = str(uuid.uuid4()) + new_observation.sampling_time_start=new_observation.result_time + new_observation.sampling_time_end=new_observation.result_time + new_observation.fk_dataset_id = dataset.id + + # Add the person to the database + db_session.add(new_observation) + # dataset.observations.append(new_observation) + # db_session.commit() + + # Serialize and return the newly created person in the response + # data = schema.dump(new_observation) + # return data, 201 + return max_id + # Otherwise, nope, person exists already + else: + print(409, f'Observation {ob_id} exists already') + return max_id + +if __name__ == "__main__": + main() diff --git a/insert_sensor/InsertGschliefgraben.xml b/insert_sensor/InsertGschliefgraben.xml index 5b303bc..86573a7 100644 --- a/insert_sensor/InsertGschliefgraben.xml +++ b/insert_sensor/InsertGschliefgraben.xml @@ -54,20 +54,8 @@ - - - Roll - - - - - - InSystemTemperature - - - - + diff --git a/insert_sensor/execute.py b/insert_sensor/execute.py index 695cee3..f5bbd22 100644 --- a/insert_sensor/execute.py +++ b/insert_sensor/execute.py @@ -50,10 +50,10 @@ def main(): ####################### Gschliefgraben Glasfaser offering = Offering( "https://geomon.geologie.ac.at/52n-sos-webapp/api/offerings/", - "inclino1_14", - "Inklinometer inclino1_14, Gschliefgraben Glasfaser" + "inclino1_02", + "Inklinometer inclino1_02, Gschliefgraben Glasfaser" ) - procedure = Procedure( "inclino1_14","inclino1_14") + procedure = Procedure( "inclino1_02","inclino1_02") foi = FoI("degree", "m", (47.910849, 13.774966, 0.0), "FBGuard23", "Glasfaser Untersuchungen am Gschliefgraben (Gmunden)") diff --git a/insert_sensor/transactional.py b/insert_sensor/transactional.py index fd892ec..29803ab 100644 --- a/insert_sensor/transactional.py +++ b/insert_sensor/transactional.py @@ -79,7 +79,7 @@ def insert_sensor(offering, procedure, foi, sensor_type): "service": "SOS", "version": "2.0.0", "procedureDescriptionFormat": "http://www.opengis.net/sensorml/2.0", - "procedureDescription": f'{procedure_identifier}shortName{procedure_name}{offering_label}{offering_name}featuresOfInterest{feature_id}{feature_name}{coordinates}SlopeRollInSystemTemperature{cordX}{cordY}{height}', + "procedureDescription": f'{procedure_identifier}shortName{procedure_name}{offering_label}{offering_name}featuresOfInterest{feature_id}{feature_name}{coordinates}Slope{cordX}{cordY}{height}', "observableProperty": [ "Slope", "Roll", diff --git a/notes.txt b/notes.txt index 9864bda..3a1da10 100644 --- a/notes.txt +++ b/notes.txt @@ -1,5 +1,5 @@ pip freeze > requirements.txt -pip install -f ./requirements.txt +pip install -r requirements.txt =========================================================================================== python -m venv .venv d:/Software/geomon/.venv/Scripts/python.exe -m pip install -U pylint diff --git a/requirements.txt b/requirements.txt index 2718fd8329c956344f72a111a240150a9cf03239..7afca30c4adbb409bdd3483ca1403f791061b60e 100644 GIT binary patch delta 24 fcmcb`F^ywG79)2qLn1>lLkUAFL(%4)j0>3nT)ziH delta 92 zcmbQnaf@R^7NdVEgAqeALn=cOgCTOnZ#uO-O4&)^R bVG4r@kW2%Lm;go6z|a^dV!64LaUl}`9bgc`
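
Scheduling note (not part of the patch itself): the commit message adds a "daily cron job", but no crontab entry is included in the diff. A minimal sketch of such an entry, assuming the repository is checked out at /opt/geomon and uses the .venv virtualenv from notes.txt, could look like:

    # run the Gschliefgraben glasfaser import every night at 03:00 (assumed path and schedule)
    0 3 * * * cd /opt/geomon && .venv/bin/python -m gschliefgraben_glasfaser.update_daily_cron >> /var/log/geomon_update_daily.log 2>&1

The path, schedule and log file are assumptions; the environment variables the script reads (POSTGIS_DBUSER, TOKEN_API, GLASFASER_GSCHLIEFGRABEN_SENSORS) also have to be made available to the cron environment, for example via a wrapper script.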
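
Sensor list note: os.environ.get("GLASFASER_GSCHLIEFGRABEN_SENSORS") returns a single string (or None), so "for sensor in enumerate(sensor_list)" in main.py and update_daily_cron.py iterates over (index, character) tuples rather than sensor names. A minimal sketch of reading the variable, assuming it is stored as a comma-separated list such as "inclino1_14,inclino1_02":

    import os

    # assumed format: GLASFASER_GSCHLIEFGRABEN_SENSORS="inclino1_14,inclino1_02"
    raw = os.environ.get("GLASFASER_GSCHLIEFGRABEN_SENSORS", "")
    sensor_list = [name.strip() for name in raw.split(",") if name.strip()]
    for sensor in sensor_list:
        # plain iteration yields the sensor name itself, as expected by
        # the Procedure.sta_identifier == sensor.lower() filter in the patch
        print("importing observations for sensor", sensor)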
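
Duplicate-check note: create_db_observations() in main.py now loads every existing Observation.value_identifier for the dataset once, and create_observation() tests "ob_id in value_identifier_db_list". With long time series a set gives constant-time membership tests instead of scanning the list for every observation; a small self-contained sketch (sample values are hypothetical):

    from itertools import chain

    # rows as returned by pg_session.query(Observation.value_identifier).all():
    # a list of one-element tuples
    rows = [("101",), ("102",), ("103",)]
    value_identifier_db_set = set(chain.from_iterable(rows))
    print("102" in value_identifier_db_set)  # True -> observation already imported, skip it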