geomon/gschliefgraben_glasfaser/main.py

217 lines
7.5 KiB
Python
Raw Normal View History

'''
Tutorial link: https://realpython.com/flask-connexion-rest-api-part-2/
https://github.com/realpython/materials/blob/master/flask-connexion-rest-part-2/version_1/people.py
Sqlalchemy version: 1.2.15
Python version: 3.7
'''
import os
from tokenize import String
import uuid
# import sys, inspect
# currentdir = os.path.dirname(os.path.abspath(inspect.getfile(inspect.currentframe())))
# parentdir = os.path.dirname(currentdir)
# sys.path.insert(0, parentdir)
2022-02-28 16:25:48 +00:00
# import requests
from sqlalchemy.orm import session
from sqlalchemy import func
from gschliefgraben_glasfaser.models import ObservationSchema, Person, PersonSchema, Observation, create_pg_session
2022-02-28 16:25:48 +00:00
from gschliefgraben_glasfaser.my_api import MyApi
from datetime import datetime, date, timedelta
# from db.pg_models import create_pg_session
#from models import Person, PersonSchema
# response = requests.get('https://api.com/')
# print(response) # shows the response's HTTP status code
# print(response.json()) # shows the response's JSON response body, if it has one
# print(response.content) # get the data content of the response
def main():
''' main method '''
db_user = os.environ.get("POSTGIS_DBUSER")
print(db_user)
pg_session: session = create_pg_session()
2022-02-28 16:25:48 +00:00
# pg_person: Person = pg_session.query(Person).first()
observation: Observation = pg_session.query(Observation).first()
# print(pg_person)
# serialize db data to json
2022-02-28 16:25:48 +00:00
# person_schema = PersonSchema()
# dump_data = person_schema.dump(pg_person)
# print(dump_data)
2022-02-28 16:25:48 +00:00
# serialize db data to json
observation_schema = ObservationSchema()
dump_data = observation_schema.dump(observation)
print(dump_data)
# # deserialize to db model
# load_data: Person = person_schema.load(dump_data)
# print(load_data)
# request ortmann api
# response =
# requests.get('https://api.dgnss-sensors.com/gschliefgraben?sensors=("inclino1_14")',
# headers={
# 'Authorization': 'Bearer' + token,
# 'cache-control': 'no-cache',
# 'Content-Type': 'application/x-www-form-urlencoded',
# 'accept': 'application/json'
# },
# data='grant_type=client_credentials&scope=gschliefgraben')
# print(response)
# The size of each step in days
# consider the start date as 2021-february 1 st
start_date = date(2021, 2, 28)
# consider the end date as 2021-march 1 st
end_date = date(2022, 3, 1)
# delta time
delta = timedelta(days=1)
token_api = os.environ.get("TOKEN_API")
2022-02-28 16:25:48 +00:00
test_api = MyApi(token_api)
# iterate over range of dates
while start_date <= end_date:
# print(start_date, end="\n")
query_date = start_date.strftime('%Y-%m-%d')
create_db_observations(query_date, test_api, pg_session)
start_date += delta
# for i in rrule(DAILY , dtstart=start_date,until=end_date):
# print(i.strftime('%Y%b%d'),sep='\n')
# query_date = "2022-02-28"
# create_db_observations(query_date, test_api, pg_session)
# query_date_obj = datetime.strptime(query_date, "%Y-%m-%d")
# data = test_api.getSensorData("inclino1_14", query_date)
# observation_array = (data['FeatureCollection']
# ['Features'][0]['geometry']['properties'][0])
# print(observation_array)
# max_id = pg_session.query(func.max(Observation.id)).scalar()
# if max_id is None:
# max_id = -1
# # pg_session.bulk_save_objects(observations)
# for observation_json in observation_array:
# ob_date_time = observation_json.get('DateTime')
# datetime_obj = datetime.strptime(ob_date_time, "%Y-%m-%dT%H:%M:%S.%fZ")
# if datetime_obj.date() != query_date_obj.date():
# continue
# max_id = max_id + 1
# create_observation(observation_json, pg_session, max_id)
# pg_session.commit()
def create_db_observations(query_date, test_api, pg_session):
''' to do '''
query_date_obj = datetime.strptime(query_date, "%Y-%m-%d")
data = test_api.getSensorData("inclino1_14", query_date)
observation_array = (data['FeatureCollection']
['Features'][0]['geometry']['properties'][0])
# print(observation_array)
max_id = pg_session.query(func.max(Observation.id)).scalar()
if max_id is None:
max_id = -1
# pg_session.bulk_save_objects(observations)
for observation_json in observation_array:
ob_date_time = observation_json.get('DateTime')
datetime_obj = datetime.strptime(ob_date_time, "%Y-%m-%dT%H:%M:%S.%fZ")
if datetime_obj.date() != query_date_obj.date():
continue
ob_value = observation_json.get('Value')
if ob_value is None:
continue
max_id = max_id + 1
create_observation(observation_json, pg_session, max_id)
pg_session.commit()
print("observations for date " +query_date+ "succesfully imported \n")
def create_observation(observation_json: ObservationSchema, db_session, max_id):
"""
This function creates a new observation in the people structure
based on the passed-in observation data
:param observation: person to create in people structure
:return: 201 on success, observation on person exists
"""
ob_id = observation_json.get('id')
# db_session = create_pg_session()
existing_observation: bool = (
db_session.query(Observation)
.filter(Observation.id == ob_id)
.one_or_none()
)
# Can we insert this observation?
if existing_observation is None:
# Create a person instance using the schema and the passed in person
schema = ObservationSchema()
# deserialize to object
new_observation: Observation = schema.load(observation_json)
new_observation.id = max_id + 1
new_observation.sta_identifier = str(uuid.uuid4())
# Add the person to the database
db_session.add(new_observation)
# db_session.commit()
# Serialize and return the newly created person in the response
data = schema.dump(new_observation)
return data, 201
# Otherwise, nope, person exists already
else:
print(409, f'Observation {ob_id} exists already')
def create(person_json: PersonSchema):
"""
This function creates a new person in the people structure
based on the passed-in person data
:param person: person to create in people structure
:return: 201 on success, 406 on person exists
"""
login = person_json.get('login')
#lname = person.get('lname')
db_session = create_pg_session()
# existing_person = Person.query \
# .filter(Person.login == login) \
# .one_or_none()
existing_person: bool = (
db_session.query(Person)
.filter(Person.login == login)
.one_or_none()
)
# Can we insert this person?
if existing_person is None:
# Create a person instance using the schema and the passed in person
schema = PersonSchema()
# deserialize to object
new_person: Person = schema.load(person_json)
# Add the person to the database
db_session.add(new_person)
db_session.commit()
# Serialize and return the newly created person in the response
data = schema.dump(new_person)
return data, 201
# Otherwise, nope, person exists already
else:
print(409, f'Person {login} exists already')
if __name__ == "__main__":
main()