301 lines
12 KiB
Python
301 lines
12 KiB
Python
'''
|
|
Tutorial link: https://realpython.com/flask-connexion-rest-api-part-2/
|
|
https://github.com/realpython/materials/blob/master/flask-connexion-rest-part-2/version_1/people.py
|
|
Sqlalchemy version: 1.2.15
|
|
Python version: 3.7
|
|
'''
|
|
|
|
import os
|
|
import uuid
|
|
from typing import List
|
|
from itertools import chain
|
|
import json
|
|
# import sys, inspect
|
|
# currentdir = os.path.dirname(os.path.abspath(inspect.getfile(inspect.currentframe())))
|
|
# parentdir = os.path.dirname(currentdir)
|
|
# sys.path.insert(0, parentdir)
|
|
# import requests
|
|
from datetime import datetime, date, timedelta
|
|
from dotenv import load_dotenv, find_dotenv
|
|
from sqlalchemy.orm import session
|
|
from sqlalchemy import func, asc, desc
|
|
# from db.pg_models import Platform
|
|
from db.models import (
|
|
ObservationSchema, Person, PersonSchema, Observation,
|
|
create_pg_session, Dataset, Procedure, Phenomenon, Platform, Format)
|
|
from gschliefgraben_glasfaser.my_api import MyApi
|
|
|
|
|
|
def main():
|
|
''' main method '''
|
|
pg_session: session = create_pg_session()
|
|
platform_sta_identifier = "gschliefgraben_glasfaser"
|
|
# sensor_list = ["inclino1_14", "inclino1_02"]
|
|
#sensor_list = os.environ.get("GLASFASER_GSCHLIEFGRABEN_SENSORS")
|
|
sensor_list = json.loads(os.environ['GLASFASER_GSCHLIEFGRABEN_SENSORS'])
|
|
|
|
# this will print elements along with their index value
|
|
for sensor in sensor_list:
|
|
|
|
pg_query = pg_session.query(Dataset) \
|
|
.join(Procedure) \
|
|
.join(Phenomenon) \
|
|
.filter(Procedure.sta_identifier == sensor.lower())
|
|
slope_dataset: Dataset = pg_query.filter(
|
|
Phenomenon.sta_identifier == "Slope").first()
|
|
if not slope_dataset:
|
|
print("Sensor " + sensor + " ist noch nicht angelegt!")
|
|
exit()
|
|
if not slope_dataset.is_published:
|
|
slope_dataset.is_published = 1
|
|
slope_dataset.is_hidden = 0
|
|
slope_dataset.dataset_type = "timeseries"
|
|
slope_dataset.observation_type = "simple"
|
|
slope_dataset.value_type = "quantity"
|
|
pg_session.commit()
|
|
|
|
platform_exists: bool = pg_session.query(Platform.id).filter_by(
|
|
sta_identifier=platform_sta_identifier).scalar() is not None
|
|
# if platform_exists:
|
|
# sensor_platform = pg_session.query(Platform.id) \
|
|
# .filter(Platform.sta_identifier == platform_sta_identifier) \
|
|
# .first()
|
|
# slope_dataset.fk_platform_id = sensor_platform.id
|
|
# else:
|
|
# exit()
|
|
if not platform_exists:
|
|
sensor_platform = Platform()
|
|
# max_id = pg_session.query(func.max(Platform.id)).scalar()
|
|
# sensor_platform.id = max_id + 1
|
|
sensor_platform.sta_identifier = platform_sta_identifier.lower()
|
|
sensor_platform.identifier = platform_sta_identifier.lower()
|
|
sensor_platform.name = platform_sta_identifier.lower()
|
|
slope_dataset.platform = sensor_platform
|
|
else:
|
|
sensor_platform = pg_session.query(Platform.id) \
|
|
.filter(Platform.sta_identifier == platform_sta_identifier) \
|
|
.first()
|
|
slope_dataset.fk_platform_id = sensor_platform.id
|
|
|
|
pg_session.commit()
|
|
|
|
format_exists: bool = pg_session.query(Format.id).filter_by(
|
|
definition="http://www.opengis.net/def/observationType/OGC-OM/2.0/OM_Measurement"
|
|
).scalar() is not None
|
|
if format_exists:
|
|
sensor_format = pg_session.query(Format.id) \
|
|
.filter(Format.definition ==
|
|
"http://www.opengis.net/def/observationType/OGC-OM/2.0/OM_Measurement") \
|
|
.first()
|
|
slope_dataset.fk_format_id = sensor_format.id
|
|
pg_session.commit()
|
|
|
|
# create all the observation for the given sensor names
|
|
create_observations(sensor, slope_dataset)
|
|
|
|
# update first and last observations for the dataset
|
|
first_slope_observation = pg_session.query(Observation) \
|
|
.filter(Observation.fk_dataset_id == slope_dataset.id) \
|
|
.order_by(asc('sampling_time_start')) \
|
|
.first()
|
|
if first_slope_observation is not None:
|
|
slope_dataset.first_time = first_slope_observation.sampling_time_start
|
|
slope_dataset.first_value = first_slope_observation.value_quantity
|
|
slope_dataset.fk_first_observation_id = first_slope_observation.id
|
|
last_slope_observation = pg_session.query(Observation) \
|
|
.filter(Observation.fk_dataset_id == slope_dataset.id) \
|
|
.order_by(desc('sampling_time_start')) \
|
|
.first()
|
|
if last_slope_observation is not None:
|
|
slope_dataset.last_time = last_slope_observation.sampling_time_start
|
|
slope_dataset.last_value = last_slope_observation.value_quantity
|
|
slope_dataset.fk_last_observation_id = last_slope_observation.id
|
|
|
|
pg_session.commit()
|
|
pg_session.close()
|
|
|
|
|
|
def create_observations(sensor: str, slope_dataset: Dataset):
|
|
''' create_observations method for given sensor '''
|
|
|
|
pg_session: session = create_pg_session()
|
|
|
|
# The size of each step in days
|
|
# consider the start date as 2021-february 1 st
|
|
start_date = date(2022, 1, 1)
|
|
# consider the end date as 2021-march 1 st
|
|
end_date = date(2022, 3, 20)
|
|
|
|
# delta time
|
|
delta = timedelta(days=7)
|
|
token_api = os.environ.get("TOKEN_API")
|
|
test_api = MyApi(token_api)
|
|
|
|
# iterate over range of dates
|
|
while start_date <= end_date:
|
|
# print(start_date, end="\n")
|
|
query_date_start: str = start_date.strftime('%Y-%m-%d')
|
|
end_date_temp: date = start_date + delta # (plus 7 days)
|
|
if end_date_temp > end_date:
|
|
end_date_temp = end_date
|
|
query_date_end: str = end_date_temp.strftime('%Y-%m-%d')
|
|
create_db_observations(
|
|
sensor, query_date_start, query_date_end, test_api, pg_session, slope_dataset)
|
|
# for next loop step set new start_date (1 day greate then last end_date)
|
|
start_date = end_date_temp + timedelta(days=1)
|
|
pg_session.commit()
|
|
|
|
# query_date = "2022-02-28"
|
|
# create_db_observations(query_date, test_api, pg_session)
|
|
# query_date_obj = datetime.strptime(query_date, "%Y-%m-%d")
|
|
# data = test_api.getSensorData("inclino1_14", query_date)
|
|
# observation_array = (data['FeatureCollection']
|
|
# ['Features'][0]['geometry']['properties'][0])
|
|
# print(observation_array)
|
|
|
|
# max_id = pg_session.query(func.max(Observation.id)).scalar()
|
|
# if max_id is None:
|
|
# max_id = -1
|
|
# # pg_session.bulk_save_objects(observations)
|
|
# for observation_json in observation_array:
|
|
# ob_date_time = observation_json.get('DateTime')
|
|
# datetime_obj = datetime.strptime(ob_date_time, "%Y-%m-%dT%H:%M:%S.%fZ")
|
|
# if datetime_obj.date() != query_date_obj.date():
|
|
# continue
|
|
# max_id = max_id + 1
|
|
# create_observation(observation_json, pg_session, max_id)
|
|
|
|
# pg_session.commit()
|
|
|
|
|
|
def create_db_observations(sensor, query_date_start, query_date_end, test_api,
|
|
pg_session, dataset: Dataset):
|
|
''' to do '''
|
|
query_date_start_obj = datetime.strptime(query_date_start, "%Y-%m-%d")
|
|
query_date_end_obj = datetime.strptime(query_date_end, "%Y-%m-%d")
|
|
data = test_api.getSensorData(sensor, query_date_start, query_date_end)
|
|
observation_array = (data['FeatureCollection']
|
|
['Features'][0]['geometry']['properties'][0])
|
|
# print(observation_array)
|
|
result = (
|
|
pg_session.query(Observation.value_identifier)
|
|
.filter(Observation.fk_dataset_id == dataset.id)
|
|
.all()
|
|
)
|
|
value_identifier_db_list: List[str] = list(chain(*result))
|
|
|
|
max_id = pg_session.query(func.max(Observation.id)).scalar()
|
|
if max_id is None:
|
|
max_id = -1
|
|
# pg_session.bulk_save_objects(observations)
|
|
for observation_json in observation_array:
|
|
ob_date_time = observation_json.get('DateTime')
|
|
datetime_obj = datetime.strptime(ob_date_time, "%Y-%m-%dT%H:%M:%S.%fZ")
|
|
if datetime_obj.date() < query_date_start_obj.date():
|
|
continue
|
|
if datetime_obj.date() > query_date_end_obj.date():
|
|
continue
|
|
ob_value = observation_json.get('Value')
|
|
if ob_value is None:
|
|
continue
|
|
# max_id = max_id + 1
|
|
max_id = create_observation(
|
|
observation_json, pg_session, max_id, dataset, value_identifier_db_list)
|
|
# pg_session.commit()
|
|
print("observations for date " + query_date_start +
|
|
" to " + query_date_end + " for sensor " + sensor + " succesfully imported \n")
|
|
|
|
|
|
def create_observation(observation_json: ObservationSchema, db_session,
|
|
max_id, dataset: Dataset, value_identifier_db_list):
|
|
"""
|
|
This function creates a new observation in the people structure
|
|
based on the passed-in observation data
|
|
:param observation: observation to create in people structure
|
|
:return: 201 on success, observation on person exists
|
|
"""
|
|
|
|
ob_id: str = str(observation_json.get('id'))
|
|
# db_session = create_pg_session()
|
|
|
|
# existing_observation: bool = (
|
|
# db_session.query(Observation)
|
|
# .filter(Observation.value_identifier == ob_id)
|
|
# .one_or_none()
|
|
# )
|
|
existing_observation: bool = ob_id in value_identifier_db_list
|
|
|
|
# Can we insert this observation?
|
|
if existing_observation is False:
|
|
max_id += 1
|
|
# Create a person instance using the schema and the passed in person
|
|
schema = ObservationSchema()
|
|
# deserialize to python object
|
|
new_observation: Observation = schema.load(observation_json)
|
|
# new_observation.id = max_id
|
|
new_observation.sta_identifier = str(uuid.uuid4())
|
|
new_observation.sampling_time_start = new_observation.result_time
|
|
new_observation.sampling_time_end = new_observation.result_time
|
|
new_observation.fk_dataset_id = dataset.id
|
|
|
|
# Add the person to the database
|
|
db_session.add(new_observation)
|
|
# dataset.observations.append(new_observation)
|
|
# db_session.commit()
|
|
|
|
# Serialize and return the newly created person in the response
|
|
# data = schema.dump(new_observation)
|
|
# return data, 201
|
|
return max_id
|
|
# Otherwise, nope, person exists already
|
|
else:
|
|
# print(409, f'Observation {ob_id} exists already')
|
|
return max_id
|
|
|
|
|
|
def create(person_json: PersonSchema):
|
|
"""
|
|
This function creates a new person in the people structure
|
|
based on the passed-in person data
|
|
:param person: person to create in people structure
|
|
:return: 201 on success, 406 on person exists
|
|
"""
|
|
|
|
login = person_json.get('login')
|
|
#lname = person.get('lname')
|
|
db_session = create_pg_session()
|
|
|
|
# existing_person = Person.query \
|
|
# .filter(Person.login == login) \
|
|
# .one_or_none()
|
|
existing_person: bool = (
|
|
db_session.query(Person)
|
|
.filter(Person.login == login)
|
|
.one_or_none()
|
|
)
|
|
|
|
# Can we insert this person?
|
|
if existing_person is None:
|
|
# Create a person instance using the schema and the passed in person
|
|
schema = PersonSchema()
|
|
# deserialize to object
|
|
new_person: Person = schema.load(person_json)
|
|
|
|
# Add the person to the database
|
|
db_session.add(new_person)
|
|
db_session.commit()
|
|
|
|
# Serialize and return the newly created person in the response
|
|
data = schema.dump(new_person)
|
|
return data, 201
|
|
# Otherwise, nope, person exists already
|
|
else:
|
|
print(409, f'Person {login} exists already')
|
|
|
|
|
|
if __name__ == "__main__":
|
|
load_dotenv(find_dotenv())
|
|
sensor_list1 = os.environ.get('GLASFASER_GSCHLIEFGRABEN_SENSORS', []).replace(r'\n', '\n')
|
|
print(f'sensors: {sensor_list1} .')
|
|
main()
|