- add daily cron job for inserting sensor data

This commit is contained in:
Arno Kaimbacher 2022-03-04 14:47:36 +01:00
parent f104e9e74b
commit fdc5da7373
9 changed files with 228 additions and 112 deletions

View File

@ -1,26 +0,0 @@
from sqlalchemy import create_engine
from sqlalchemy import MetaData
from sqlalchemy import Table
from sqlalchemy import Column
from sqlalchemy import Integer, String
db_url = 'sqlite:///db.sqlite'
engine = create_engine(db_url )
# Create a metadata instance
metadata = MetaData(engine)
# Declare a table
table = Table('Example',metadata,
Column('id',Integer, primary_key=True),
Column('name',String))
# Create all tables
students = Table(
'students', metadata,
Column('id', Integer, primary_key = True),
Column('name', String),
Column('lastname', String),
)
metadata.create_all(engine)

View File

@ -7,6 +7,8 @@ Python version: 3.7
import os
import uuid
from typing import List
from itertools import chain
# import sys, inspect
# currentdir = os.path.dirname(os.path.abspath(inspect.getfile(inspect.currentframe())))
# parentdir = os.path.dirname(currentdir)
@ -25,45 +27,25 @@ from datetime import datetime, date, timedelta
# print(response.json()) # shows the response's JSON response body, if it has one
# print(response.content) # get the data content of the response
def main():
''' main method '''
db_user = os.environ.get("POSTGIS_DBUSER")
print(db_user)
pg_session: session = create_pg_session()
# pg_person: Person = pg_session.query(Person).first()
observation: Observation = pg_session.query(Observation).first()
# print(pg_person)
platform_sta_identifier = "gschliefgraben_glasfaser"
# sensor_list = ["inclino1_14", "inclino1_02"]
sensor_list = os.environ.get("GLASFASER_GSCHLIEFGRABEN_SENSORS")
# serialize db data to json
# person_schema = PersonSchema()
# dump_data = person_schema.dump(pg_person)
# print(dump_data)
# serialize db data to json
# observation_schema = ObservationSchema()
# dump_data = observation_schema.dump(observation)
# print(dump_data)
# this will print elements along with their index value
for sensor in enumerate(sensor_list):
# request ortmann api
# response =
# requests.get('https://api.dgnss-sensors.com/gschliefgraben?sensors=("inclino1_14")',
# headers={
# 'Authorization': 'Bearer' + token,
# 'cache-control': 'no-cache',
# 'Content-Type': 'application/x-www-form-urlencoded',
# 'accept': 'application/json'
# },
# data='grant_type=client_credentials&scope=gschliefgraben')
# print(response)
sensor: str = "inclino1_14"
pg_query = pg_session.query(Dataset) \
.join(Procedure) \
.join(Phenomenon) \
.filter(Procedure.sta_identifier == sensor.lower())
slope_dataset: Dataset = pg_query.filter(
Phenomenon.sta_identifier == "Slope").first()
if not slope_dataset:
print("Sensor " + sensor + " ist noch nicht angelegt!")
exit()
if not slope_dataset.is_published:
slope_dataset.is_published = 1
slope_dataset.is_hidden = 0
@ -72,28 +54,48 @@ def main():
slope_dataset.value_type = "quantity"
pg_session.commit()
platform_exists: bool = pg_session.query(Platform.id).filter_by(
sta_identifier = platform_sta_identifier).scalar() is not None
if platform_exists:
sensor_platform = pg_session.query(Platform.id) \
.filter(Platform.sta_identifier == platform_sta_identifier) \
.first()
slope_dataset.fk_platform_id = sensor_platform.id
else:
exit()
# create all the observation for the given sensor names
create_observations(sensor, slope_dataset)
def create_observations(sensor: str, slope_dataset: Dataset):
''' create_observations method for given sensor '''
pg_session: session = create_pg_session()
# The size of each step in days
# consider the start date as 2021-february 1 st
start_date = date(2022, 1, 1)
# consider the end date as 2021-march 1 st
end_date = date(2022, 3, 1)
end_date = date(2022, 3, 3)
# delta time
delta = timedelta(days=1)
delta = timedelta(days=7)
token_api = os.environ.get("TOKEN_API")
test_api = MyApi(token_api)
# iterate over range of dates
while start_date <= end_date:
# print(start_date, end="\n")
query_date = start_date.strftime('%Y-%m-%d')
create_db_observations(query_date, test_api, pg_session, slope_dataset)
start_date += delta
query_date_start: str = start_date.strftime('%Y-%m-%d')
end_date_temp: date = start_date + delta # (plus 7 days)
if end_date_temp > end_date:
end_date_temp = end_date
query_date_end: str = end_date_temp.strftime('%Y-%m-%d')
create_db_observations(sensor, query_date_start, query_date_end, test_api, pg_session, slope_dataset)
# for next loop step set new start_date (1 day greate then last end_date)
start_date = end_date_temp + timedelta(days=1)
pg_session.commit()
# for i in rrule(DAILY , dtstart=start_date,until=end_date):
# print(i.strftime('%Y%b%d'),sep='\n')
# query_date = "2022-02-28"
# create_db_observations(query_date, test_api, pg_session)
# query_date_obj = datetime.strptime(query_date, "%Y-%m-%d")
@ -117,13 +119,20 @@ def main():
# pg_session.commit()
def create_db_observations(query_date, test_api, pg_session, dataset: Dataset):
def create_db_observations(sensor, query_date_start, query_date_end, test_api, pg_session, dataset: Dataset):
''' to do '''
query_date_obj = datetime.strptime(query_date, "%Y-%m-%d")
data = test_api.getSensorData("inclino1_14", query_date)
query_date_start_obj = datetime.strptime(query_date_start, "%Y-%m-%d")
query_date_end_obj = datetime.strptime(query_date_end, "%Y-%m-%d")
data = test_api.getSensorData(sensor, query_date_start, query_date_end)
observation_array = (data['FeatureCollection']
['Features'][0]['geometry']['properties'][0])
# print(observation_array)
result = (
pg_session.query(Observation.value_identifier)
.filter(Observation.fk_dataset_id == dataset.id)
.all()
)
value_identifier_db_list: List[str] = list(chain(*result))
max_id = pg_session.query(func.max(Observation.id)).scalar()
if max_id is None:
@ -132,19 +141,21 @@ def create_db_observations(query_date, test_api, pg_session, dataset: Dataset):
for observation_json in observation_array:
ob_date_time = observation_json.get('DateTime')
datetime_obj = datetime.strptime(ob_date_time, "%Y-%m-%dT%H:%M:%S.%fZ")
if datetime_obj.date() != query_date_obj.date():
if datetime_obj.date() < query_date_start_obj.date():
continue
if datetime_obj.date() > query_date_end_obj.date():
continue
ob_value = observation_json.get('Value')
if ob_value is None:
continue
# max_id = max_id + 1
max_id = create_observation(
observation_json, pg_session, max_id, dataset)
observation_json, pg_session, max_id, dataset, value_identifier_db_list)
# pg_session.commit()
print("observations for date " + query_date + "succesfully imported \n")
print("observations for date " + query_date_start + " to " + query_date_end + " succesfully imported \n")
def create_observation(observation_json: ObservationSchema, db_session, max_id, dataset: Dataset):
def create_observation(observation_json: ObservationSchema, db_session, max_id, dataset: Dataset, value_identifier_db_list):
"""
This function creates a new observation in the people structure
based on the passed-in observation data
@ -155,14 +166,15 @@ def create_observation(observation_json: ObservationSchema, db_session, max_id,
ob_id: str = str(observation_json.get('id'))
# db_session = create_pg_session()
existing_observation: bool = (
db_session.query(Observation)
.filter(Observation.value_identifier == ob_id)
.one_or_none()
)
# existing_observation: bool = (
# db_session.query(Observation)
# .filter(Observation.value_identifier == ob_id)
# .one_or_none()
# )
existing_observation: bool = ob_id in value_identifier_db_list
# Can we insert this observation?
if existing_observation is None:
if existing_observation is False:
max_id += 1
# Create a person instance using the schema and the passed in person
schema = ObservationSchema()
@ -170,8 +182,8 @@ def create_observation(observation_json: ObservationSchema, db_session, max_id,
new_observation: Observation = schema.load(observation_json)
new_observation.id = max_id
new_observation.sta_identifier = str(uuid.uuid4())
new_observation.sampling_time_start=new_observation.result_time
new_observation.sampling_time_end=new_observation.result_time
new_observation.sampling_time_start = new_observation.result_time
new_observation.sampling_time_end = new_observation.result_time
new_observation.fk_dataset_id = dataset.id
# Add the person to the database

View File

@ -65,10 +65,10 @@ class MyApi():
# self.access_token = res.json()['access_token']
# else:
# # Token expired -> re-authenticate
def getSensorData(self, sensor: string, date):
def getSensorData(self, sensor: string, date_start, date_end):
''' request observations'''
try:
request = self.session.get('https://api.dgnss-sensors.com/gschliefgraben?sensors=(\''+sensor+ '\')&start='+date+'&end='+date,
request = self.session.get('https://api.dgnss-sensors.com/gschliefgraben?sensors=(\''+sensor+ '\')&start=' + date_start+ '&end=' + date_end,
headers={
'cache-control': 'no-cache',
'Content-Type': 'application/x-www-form-urlencoded',

View File

@ -0,0 +1,142 @@
'''
Tutorial link: https://realpython.com/flask-connexion-rest-api-part-2/
https://github.com/realpython/materials/blob/master/flask-connexion-rest-part-2/version_1/people.py
Sqlalchemy version: 1.2.15
Python version: 3.10
'''
import os
import uuid
from sqlalchemy.orm import session
from sqlalchemy import func
# from db.pg_models import Platform
from gschliefgraben_glasfaser.models import ObservationSchema, Observation, create_pg_session, Dataset, Procedure, Phenomenon, Platform
from gschliefgraben_glasfaser.my_api import MyApi
from datetime import datetime
def main():
''' main method '''
pg_session: session = create_pg_session()
platform_sta_identifier = "gschliefgraben_glasfaser"
# sensor_list = ["inclino1_14", "inclino1_02"]
sensor_list = os.environ.get("GLASFASER_GSCHLIEFGRABEN_SENSORS")
# this will print elements along with their index value
for sensor in enumerate(sensor_list):
pg_query = pg_session.query(Dataset) \
.join(Procedure) \
.join(Phenomenon) \
.filter(Procedure.sta_identifier == sensor.lower())
slope_dataset: Dataset = pg_query.filter(
Phenomenon.sta_identifier == "Slope").first()
if not slope_dataset:
print("Sensor " + sensor + " ist noch nicht angelegt!")
exit()
if not slope_dataset.is_published:
slope_dataset.is_published = 1
slope_dataset.is_hidden = 0
slope_dataset.dataset_type = "timeseries"
slope_dataset.observation_type = "simple"
slope_dataset.value_type = "quantity"
pg_session.commit()
platform_exists: bool = pg_session.query(Platform.id).filter_by(
sta_identifier = platform_sta_identifier).scalar() is not None
if platform_exists:
sensor_platform = pg_session.query(Platform.id) \
.filter(Platform.sta_identifier == platform_sta_identifier) \
.first()
slope_dataset.fk_platform_id = sensor_platform.id
# create all the observation for the given sensor names
create_observations(sensor, slope_dataset)
def create_observations(sensor: str, slope_dataset: Dataset):
''' create_observations method for given sensor '''
pg_session: session = create_pg_session()
# create access token
token_api = os.environ.get("TOKEN_API")
test_api = MyApi(token_api)
# The size of each step in days
# consider the start date as 2021-february 1 st
start_date = datetime.today()
query_date = start_date.strftime('%Y-%m-%d')
create_db_observations(sensor, query_date, test_api, pg_session, slope_dataset)
pg_session.commit()
def create_db_observations(sensor: str, query_date, test_api, pg_session, dataset: Dataset):
''' to do '''
query_date_obj = datetime.strptime(query_date, "%Y-%m-%d")
data = test_api.getSensorData(sensor, query_date, query_date)
observation_array = (data['FeatureCollection']
['Features'][0]['geometry']['properties'][0])
# print(observation_array)
max_id = pg_session.query(func.max(Observation.id)).scalar()
if max_id is None:
max_id = -1
# pg_session.bulk_save_objects(observations)
for observation_json in observation_array:
ob_date_time = observation_json.get('DateTime')
datetime_obj = datetime.strptime(ob_date_time, "%Y-%m-%dT%H:%M:%S.%fZ")
if datetime_obj.date() != query_date_obj.date():
continue
ob_value = observation_json.get('Value')
if ob_value is None:
continue
# max_id = max_id + 1
max_id = create_observation(
observation_json, pg_session, max_id, dataset)
# pg_session.commit()
print("observations for date " + query_date + " succesfully imported \n")
def create_observation(observation_json: ObservationSchema, db_session, max_id, dataset: Dataset):
"""
This function creates a new observation in the people structure
based on the passed-in observation data
:param observation: person to create in people structure
:return: 201 on success, observation on person exists
"""
ob_id: str = str(observation_json.get('id'))
# db_session = create_pg_session()
existing_observation: bool = (
db_session.query(Observation)
.filter(Observation.value_identifier == ob_id, Observation.fk_dataset_id == dataset.id)
.one_or_none()
)
# Can we insert this observation?
if existing_observation is None:
max_id += 1
# Create a person instance using the schema and the passed in person
schema = ObservationSchema()
# deserialize to python object
new_observation: Observation = schema.load(observation_json)
new_observation.id = max_id
new_observation.sta_identifier = str(uuid.uuid4())
new_observation.sampling_time_start=new_observation.result_time
new_observation.sampling_time_end=new_observation.result_time
new_observation.fk_dataset_id = dataset.id
# Add the person to the database
db_session.add(new_observation)
# dataset.observations.append(new_observation)
# db_session.commit()
# Serialize and return the newly created person in the response
# data = schema.dump(new_observation)
# return data, 201
return max_id
# Otherwise, nope, person exists already
else:
print(409, f'Observation {ob_id} exists already')
return max_id
if __name__ == "__main__":
main()

View File

@ -54,18 +54,6 @@
<swe:uom code=\"deg\"/>
</swe:Quantity>
</sml:output>
<sml:output name=\"Roll\">
<swe:Quantity definition=\"Roll\">
<swe:label>Roll</swe:label>
<swe:uom code=\"deg\"/>
</swe:Quantity>
</sml:output>
<sml:output name=\"InSystemTemperature\">
<swe:Quantity definition=\"InSystemTemperature\">
<swe:label>InSystemTemperature</swe:label>
<swe:uom code=\"degC\"/>
</swe:Quantity>
</sml:output>
</sml:OutputList>
</sml:outputs>
<sml:position>

View File

@ -50,10 +50,10 @@ def main():
####################### Gschliefgraben Glasfaser
offering = Offering(
"https://geomon.geologie.ac.at/52n-sos-webapp/api/offerings/",
"inclino1_14",
"Inklinometer inclino1_14, Gschliefgraben Glasfaser"
"inclino1_02",
"Inklinometer inclino1_02, Gschliefgraben Glasfaser"
)
procedure = Procedure( "inclino1_14","inclino1_14")
procedure = Procedure( "inclino1_02","inclino1_02")
foi = FoI("degree", "m", (47.910849, 13.774966, 0.0),
"FBGuard23", "Glasfaser Untersuchungen am Gschliefgraben (Gmunden)")

View File

@ -79,7 +79,7 @@ def insert_sensor(offering, procedure, foi, sensor_type):
"service": "SOS",
"version": "2.0.0",
"procedureDescriptionFormat": "http://www.opengis.net/sensorml/2.0",
"procedureDescription": f'<sml:PhysicalSystem gml:id={off_name} xmlns:swes=\"http://www.opengis.net/swes/2.0\" xmlns:sos=\"http://www.opengis.net/sos/2.0\" xmlns:swe=\"http://www.opengis.net/swe/2.0\" xmlns:sml=\"http://www.opengis.net/sensorml/2.0\" xmlns:gml=\"http://www.opengis.net/gml/3.2\" xmlns:xlink=\"http://www.w3.org/1999/xlink\" xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\" xmlns:gco=\"http://www.isotc211.org/2005/gco\" xmlns:gmd=\"http://www.isotc211.org/2005/gmd\"><gml:identifier codeSpace=\"uniqueID\">{procedure_identifier}</gml:identifier><sml:identification><sml:IdentifierList><sml:identifier><sml:Term definition=\"urn:ogc:def:identifier:OGC:1.0:shortName\"><sml:label>shortName</sml:label><sml:value>{procedure_name}</sml:value></sml:Term></sml:identifier></sml:IdentifierList></sml:identification><sml:capabilities name=\"offerings\"><sml:CapabilityList><sml:capability name=\"offeringID\"><swe:Text definition=\"urn:ogc:def:identifier:OGC:offeringID\"><swe:label>{offering_label}</swe:label><swe:value>{offering_name}</swe:value></swe:Text></sml:capability></sml:CapabilityList></sml:capabilities><sml:featuresOfInterest><sml:FeatureList definition=\"http://www.opengis.net/def/featureOfInterest/identifier\"><swe:label>featuresOfInterest</swe:label><sml:feature><sams:SF_SpatialSamplingFeature xmlns:sams=\"http://www.opengis.net/samplingSpatial/2.0\" gml:id=\"ssf_b3a826dd44012201b01323232323041f7a92e0cc47260eb9888f6a4e9f747\"><gml:identifier codeSpace=\"http://www.opengis.net/def/nil/OGC/0/unknown\">{feature_id}</gml:identifier><gml:name codeSpace=\"http://www.opengis.net/def/nil/OGC/0/unknown\">{feature_name}</gml:name><sf:type xmlns:sf=\"http://www.opengis.net/sampling/2.0\" xlink:href=\"http://www.opengis.net/def/samplingFeatureType/OGC-OM/2.0/SF_SamplingPoint\"/><sf:sampledFeature xmlns:sf=\"http://www.opengis.net/sampling/2.0\" xlink:href=\"http://www.opengis.net/def/nil/OGC/0/unknown\"/><sams:shape><ns:Point xmlns:ns=\"http://www.opengis.net/gml/3.2\" ns:id=\"Point_ssf_b3a826dd44012201b013c90c51da28c041f7a92e0cc47260eb9888f6a4e9f747\"><ns:pos srsName=\"http://www.opengis.net/def/crs/EPSG/0/4326\">{coordinates}</ns:pos></ns:Point></sams:shape></sams:SF_SpatialSamplingFeature></sml:feature></sml:FeatureList></sml:featuresOfInterest><sml:outputs><sml:OutputList><sml:output name=\"Slope\"><swe:Quantity definition=\"Slope\"><swe:label>Slope</swe:label><swe:uom code=\"deg\"/></swe:Quantity></sml:output><sml:output name=\"Roll\"><swe:Quantity definition=\"Roll\"><swe:label>Roll</swe:label><swe:uom code=\"deg\"/></swe:Quantity></sml:output><sml:output name=\"InSystemTemperature\"><swe:Quantity definition=\"InSystemTemperature\"><swe:label>InSystemTemperature</swe:label><swe:uom code=\"degC\"/></swe:Quantity></sml:output></sml:OutputList></sml:outputs><sml:position><swe:Vector referenceFrame=\"urn:ogc:def:crs:EPSG::4326\"><swe:coordinate name=\"easting\"><swe:Quantity axisID=\"x\"><swe:uom code=\"degree\" /><swe:value>{cordX}</swe:value></swe:Quantity></swe:coordinate><swe:coordinate name=\"northing\"><swe:Quantity axisID=\"y\"><swe:uom code=\"degree\" /><swe:value>{cordY}</swe:value></swe:Quantity></swe:coordinate><swe:coordinate name=\"altitude\"><swe:Quantity axisID=\"z\"><swe:uom code=\"m\" /><swe:value>{height}</swe:value></swe:Quantity></swe:coordinate></swe:Vector></sml:position></sml:PhysicalSystem>',
"procedureDescription": f'<sml:PhysicalSystem gml:id={off_name} xmlns:swes=\"http://www.opengis.net/swes/2.0\" xmlns:sos=\"http://www.opengis.net/sos/2.0\" xmlns:swe=\"http://www.opengis.net/swe/2.0\" xmlns:sml=\"http://www.opengis.net/sensorml/2.0\" xmlns:gml=\"http://www.opengis.net/gml/3.2\" xmlns:xlink=\"http://www.w3.org/1999/xlink\" xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\" xmlns:gco=\"http://www.isotc211.org/2005/gco\" xmlns:gmd=\"http://www.isotc211.org/2005/gmd\"><gml:identifier codeSpace=\"uniqueID\">{procedure_identifier}</gml:identifier><sml:identification><sml:IdentifierList><sml:identifier><sml:Term definition=\"urn:ogc:def:identifier:OGC:1.0:shortName\"><sml:label>shortName</sml:label><sml:value>{procedure_name}</sml:value></sml:Term></sml:identifier></sml:IdentifierList></sml:identification><sml:capabilities name=\"offerings\"><sml:CapabilityList><sml:capability name=\"offeringID\"><swe:Text definition=\"urn:ogc:def:identifier:OGC:offeringID\"><swe:label>{offering_label}</swe:label><swe:value>{offering_name}</swe:value></swe:Text></sml:capability></sml:CapabilityList></sml:capabilities><sml:featuresOfInterest><sml:FeatureList definition=\"http://www.opengis.net/def/featureOfInterest/identifier\"><swe:label>featuresOfInterest</swe:label><sml:feature><sams:SF_SpatialSamplingFeature xmlns:sams=\"http://www.opengis.net/samplingSpatial/2.0\" gml:id=\"ssf_b3a826dd44012201b01323232323041f7a92e0cc47260eb9888f6a4e9f747\"><gml:identifier codeSpace=\"http://www.opengis.net/def/nil/OGC/0/unknown\">{feature_id}</gml:identifier><gml:name codeSpace=\"http://www.opengis.net/def/nil/OGC/0/unknown\">{feature_name}</gml:name><sf:type xmlns:sf=\"http://www.opengis.net/sampling/2.0\" xlink:href=\"http://www.opengis.net/def/samplingFeatureType/OGC-OM/2.0/SF_SamplingPoint\"/><sf:sampledFeature xmlns:sf=\"http://www.opengis.net/sampling/2.0\" xlink:href=\"http://www.opengis.net/def/nil/OGC/0/unknown\"/><sams:shape><ns:Point xmlns:ns=\"http://www.opengis.net/gml/3.2\" ns:id=\"Point_ssf_b3a826dd44012201b013c90c51da28c041f7a92e0cc47260eb9888f6a4e9f747\"><ns:pos srsName=\"http://www.opengis.net/def/crs/EPSG/0/4326\">{coordinates}</ns:pos></ns:Point></sams:shape></sams:SF_SpatialSamplingFeature></sml:feature></sml:FeatureList></sml:featuresOfInterest><sml:outputs><sml:OutputList><sml:output name=\"Slope\"><swe:Quantity definition=\"Slope\"><swe:label>Slope</swe:label><swe:uom code=\"deg\"/></swe:Quantity></sml:output></sml:OutputList></sml:outputs><sml:position><swe:Vector referenceFrame=\"urn:ogc:def:crs:EPSG::4326\"><swe:coordinate name=\"easting\"><swe:Quantity axisID=\"x\"><swe:uom code=\"degree\" /><swe:value>{cordX}</swe:value></swe:Quantity></swe:coordinate><swe:coordinate name=\"northing\"><swe:Quantity axisID=\"y\"><swe:uom code=\"degree\" /><swe:value>{cordY}</swe:value></swe:Quantity></swe:coordinate><swe:coordinate name=\"altitude\"><swe:Quantity axisID=\"z\"><swe:uom code=\"m\" /><swe:value>{height}</swe:value></swe:Quantity></swe:coordinate></swe:Vector></sml:position></sml:PhysicalSystem>',
"observableProperty": [
"Slope",
"Roll",

View File

@ -1,5 +1,5 @@
pip freeze > requirements.txt
pip install -f ./requirements.txt
pip install -r requirements.txt
===========================================================================================
python -m venv .venv
d:/Software/geomon/.venv/Scripts/python.exe -m pip install -U pylint

Binary file not shown.