geomon/voegelsberg/import_tachymeter_observations.py

239 lines
13 KiB
Python
Raw Permalink Normal View History

''' module for importing observations '''
import csv
# import requests
from datetime import datetime
from typing import List
import uuid
from pyproj import Transformer
# from insert_sensor.wrapper import (Offering, FoI, Procedure)
from sqlalchemy.orm import session
2022-04-06 13:17:16 +00:00
from sqlalchemy import asc, desc
from db.models import (
Observation,
create_pg_session,
Dataset,
Procedure,
Phenomenon,
Platform,
Format
)
def main():
    ''' Import the tachymeter observations listed in voegelsberg/data.txt.

    For every CSV row the "TachymeterLocation" dataset of the sensor named in
    the 'Punktnummer' column is looked up. Rows whose sensor has no such
    dataset are reported and skipped. The dataset is linked to the
    "voegelsberg_tachymeter" platform (created when it does not exist yet)
    and, if present, to the OM_GeometryObservation format. The row is then
    stored through create_observation(). After all rows are processed the
    first/last observation references of the datasets are refreshed.
    '''
    pg_session: session.Session = create_pg_session()
    platform_sta_identifier = "voegelsberg_tachymeter"
    geometry_format_definition = (
        "http://www.opengis.net/def/observationType/OGC-OM/2.0/OM_GeometryObservation"
    )

    # try/finally: the session must be released even if a row fails to import.
    try:
        # The format row does not depend on the CSV row, so one query is
        # enough; it stays None when the format is not registered.
        sensor_format = pg_session.query(Format.id) \
            .filter(Format.definition == geometry_format_definition) \
            .first()

        with open('voegelsberg/data.txt', 'rt', encoding="utf-8") as csvfile:
            reader = csv.DictReader(csvfile, delimiter=';', quotechar='"')
            for row in reader:
                sensor: str = row['Punktnummer']
                location_dataset: Dataset = pg_session.query(Dataset) \
                    .join(Procedure) \
                    .join(Phenomenon) \
                    .filter(Procedure.sta_identifier == sensor) \
                    .filter(Phenomenon.sta_identifier == "TachymeterLocation") \
                    .first()
                if not location_dataset:
                    print("Sensor " + sensor +
                          " ist noch nicht in der Datenbank angelegt!")
                    continue

                # Looked up per row on purpose: a platform created for an
                # earlier row is committed below and must be found again here.
                sensor_platform = pg_session.query(Platform.id) \
                    .filter(Platform.sta_identifier == platform_sta_identifier) \
                    .first()
                if sensor_platform is None:
                    platform_name = platform_sta_identifier.lower()
                    sensor_platform = Platform()
                    sensor_platform.sta_identifier = platform_name
                    sensor_platform.identifier = platform_name
                    sensor_platform.name = platform_name
                    location_dataset.platform = sensor_platform
                else:
                    location_dataset.fk_platform_id = sensor_platform.id

                if sensor_format is not None:
                    location_dataset.fk_format_id = sensor_format.id
                pg_session.commit()

                successfully_inserted = create_observation(
                    location_dataset, row, pg_session)

                # commit new observations:
                if successfully_inserted:
                    if not location_dataset.is_published:
                        location_dataset.is_published = 1
                        location_dataset.is_hidden = 0
                        location_dataset.dataset_type = "timeseries"
                        location_dataset.observation_type = "simple"
                        location_dataset.value_type = "text"
                    pg_session.commit()

        # All rows are imported: refresh first/last observation references.
        actualize_first_last_observations()
    finally:
        pg_session.close()
# Transformers are costly to construct, so each (source, target) pair is built
# once and reused for every imported row. The script is single-threaded.
_TRANSFORMER_CACHE = {}


def _get_transformer(source_epsg: int, target_epsg: int):
    ''' return a cached pyproj Transformer (x/y axis order) for the EPSG pair '''
    key = (source_epsg, target_epsg)
    if key not in _TRANSFORMER_CACHE:
        _TRANSFORMER_CACHE[key] = Transformer.from_crs(
            source_epsg, target_epsg, always_xy=True)
    return _TRANSFORMER_CACHE[key]


def create_observation(location_dataset: Dataset, data, pg_session: session.Session):
    ''' Add one tachymeter observation to the session unless it already exists.

    :param location_dataset: dataset the observation belongs to.
    :param data: one CSV row; the keys 'Punktnummer', 'Epoche' (dd.mm.YYYY),
        'X', 'Y' and 'H' are read.
    :param pg_session: open database session; the new observation is only
        added to it, committing is left to the caller.
    :return: True if a new observation was added, False if an observation
        with the same result time already exists for the dataset.
    '''
    # EPSG:31254 -> EPSG:4326 (WGS 84), x/y order on input and output.
    transprojr = _get_transformer(31254, 4326)

    # The 'Y' column is passed as x and the 'X' column as y.
    # NOTE(review): presumably the survey convention (Y = easting,
    # X = northing) - confirm against the data provider.
    x_1, y_1, z_1 = (float(data['Y']), float(data['X']), float(data['H']))
    cord_x, cord_y = map(float, transprojr.transform(x_1, y_1))
    print((cord_x, cord_y))  # (11.597409730065536, 47.27196543449542)

    sensor: str = data['Punktnummer']
    result_time: str = datetime.strptime(
        data['Epoche'], '%d.%m.%Y').isoformat()

    # Holds the matching Observation row, or None when there is none yet.
    existing_observation = (
        pg_session.query(Observation)
        .filter(Observation.result_time == result_time,
                Observation.fk_dataset_id == location_dataset.id)
        .one_or_none()
    )
    if existing_observation is not None:
        print(f"observation with result time {existing_observation.result_time} "
              f"for tachymeter {sensor} already exists!")
        return False

    # insert new observation
    new_observation: Observation = Observation()
    new_observation.sta_identifier = str(uuid.uuid4())
    new_observation.result_time = result_time
    new_observation.sampling_time_start = new_observation.result_time
    new_observation.sampling_time_end = new_observation.result_time
    new_observation.value_type = "text"
    # The position is stored twice: as WKT geometry and as GeoJSON text.
    new_observation.value_geometry = f'POINT({cord_x} {cord_y} {z_1})'
    new_observation.value_text = (
        '{"type":"Point","coordinates":['
        f'{cord_x},{cord_y},{z_1}'
        ']}'
    )
    new_observation.fk_dataset_id = location_dataset.id
    pg_session.add(new_observation)
    print(f"new observation with result time {new_observation.result_time} "
          f"for tachymeter {sensor} successfully imported!")
    return True
def actualize_first_last_observations():
    ''' Refresh the first/last observation references of the tachymeter datasets.

    Iterates through all datasets attached to the "voegelsberg_tachymeter"
    platform and stores, per dataset, the time and id of the observation with
    the earliest and the latest sampling_time_start. Datasets without
    observations are left unchanged. The function works on its own database
    session, which is closed before returning.
    '''
    pg_session: session.Session = create_pg_session()
    platform_sta_identifier = "voegelsberg_tachymeter"

    # try/finally: this session is private to the function and would otherwise
    # never be released.
    try:
        voegelsberg_datasets: List[Dataset] = pg_session.query(Dataset) \
            .join(Procedure) \
            .join(Phenomenon) \
            .join(Platform) \
            .filter(Platform.sta_identifier == platform_sta_identifier).all()

        for location_dataset in voegelsberg_datasets:
            dataset_observations = pg_session.query(Observation) \
                .filter(Observation.fk_dataset_id == location_dataset.id)

            last_location_observation = dataset_observations \
                .order_by(desc('sampling_time_start')) \
                .first()
            if last_location_observation is not None:
                location_dataset.last_time = \
                    last_location_observation.sampling_time_start
                location_dataset.fk_last_observation_id = \
                    last_location_observation.id

            first_location_observation = dataset_observations \
                .order_by(asc('sampling_time_start')) \
                .first()
            if first_location_observation is not None:
                location_dataset.first_time = \
                    first_location_observation.sampling_time_start
                location_dataset.fk_first_observation_id = \
                    first_location_observation.id

        # One commit for all datasets: the update is applied as a whole.
        pg_session.commit()
    finally:
        pg_session.close()
def get_xml(offering, procedure, foi, result_time, identifier):
    ''' Build the SOAP body of an SOS 2.0 InsertObservation request.

    :param offering: object whose ``name`` attribute is the offering name.
    :param procedure: object whose ``id`` attribute is the procedure identifier.
    :param foi: feature of interest with the attributes ``x``, ``y``, ``z``,
        ``fid`` and ``name``, or None. With None the coordinate, feature
        identifier and feature name fields are left empty.
    :param result_time: value written to the phenomenon time position.
    :param identifier: value written to the gml:id of the observation.
    :return: the request document as a single-line XML string. The values
        are inserted as they are, without XML escaping.
    '''
    offering_name = offering.name
    procedure_identifier = procedure.id

    # Defaults for the case that no feature of interest is declared; without
    # them the template below would fail on unbound names.
    coordinates = feature_id = feature_name = ""
    if foi is not None:
        # x, y and z are joined by single spaces in this order.
        coordinates = f"{foi.x} {foi.y} {foi.z}"
        feature_id = foi.fid
        feature_name = foi.name

    xml = (
        '<?xml version="1.0" encoding="UTF-8"?>'
        '<env:Envelope xmlns:env="http://www.w3.org/2003/05/soap-envelope" '
        'xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" '
        'xsi:schemaLocation="http://www.w3.org/2003/05/soap-envelope '
        'http://www.w3.org/2003/05/soap-envelope/soap-envelope.xsd">'
        '<env:Body>'
        '<sos:InsertObservation service="SOS" version="2.0.0" '
        'xmlns:sos="http://www.opengis.net/sos/2.0" '
        'xmlns:swes="http://www.opengis.net/swes/2.0" '
        'xmlns:swe="http://www.opengis.net/swe/2.0" '
        'xmlns:sml="http://www.opengis.net/sensorML/1.0.1" '
        'xmlns:gml="http://www.opengis.net/gml/3.2" '
        'xmlns:xlink="http://www.w3.org/1999/xlink" '
        'xmlns:om="http://www.opengis.net/om/2.0" '
        'xmlns:sams="http://www.opengis.net/samplingSpatial/2.0" '
        'xmlns:sf="http://www.opengis.net/sampling/2.0" '
        'xmlns:xs="http://www.w3.org/2001/XMLSchema" '
        'xsi:schemaLocation="http://www.opengis.net/sos/2.0 '
        'http://schemas.opengis.net/sos/2.0/sos.xsd '
        'http://www.opengis.net/samplingSpatial/2.0 '
        'http://schemas.opengis.net/samplingSpatial/2.0/spatialSamplingFeature.xsd">'
        '<!-- multiple offerings are possible -->'
        f'<sos:offering>{offering_name}</sos:offering>'
        '<sos:observation>'
        f'<om:OM_Observation gml:id="{identifier}">'
        '<om:type xlink:href="http://www.opengis.net/def/observationType/OGC-OM/2.0/OM_GeometryObservation"/>'
        '<om:phenomenonTime>'
        '<gml:TimeInstant gml:id="phenomenonTime">'
        f'<gml:timePosition>{result_time}</gml:timePosition>'
        '</gml:TimeInstant>'
        '</om:phenomenonTime>'
        '<om:resultTime xlink:href="#phenomenonTime"/>'
        f'<om:procedure xlink:href="{procedure_identifier}"/>'
        '<om:parameter>'
        '<om:NamedValue>'
        '<om:name xlink:href="http://www.opengis.net/def/param-name/OGC-OM/2.0/samplingGeometry"/>'
        '<om:value xsi:type="gml:GeometryPropertyType">'
        '<gml:Point gml:id="SamplingPoint1">'
        '<gml:description>description</gml:description>'
        '<gml:identifier codeSpace="">hereIdentifier</gml:identifier>'
        '<gml:name>hereIam</gml:name>'
        f'<gml:pos srsName="http://www.opengis.net/def/crs/EPSG/0/4326">{coordinates}</gml:pos>'
        '</gml:Point>'
        '</om:value>'
        '</om:NamedValue>'
        '</om:parameter>'
        '<om:observedProperty xlink:href="TachymeterLocation"/>'
        '<om:featureOfInterest>'
        '<sams:SF_SpatialSamplingFeature gml:id="ssf_instance">'
        f'<gml:identifier codeSpace="">{feature_id}</gml:identifier>'
        f'<gml:name>{feature_name}</gml:name>'
        '<sf:type xlink:href="http://www.opengis.net/def/samplingFeatureType/OGC-OM/2.0/SF_SamplingPoint"/>'
        '<sf:sampledFeature xlink:href="http://www.opengis.net/def/nil/OGC/0/unknown"/>'
        '<sams:shape>'
        '<ns:Point xmlns:ns="http://www.opengis.net/gml/3.2" '
        'ns:id="Point_ssf_b3a826dd44012201b013c90c51da28c041f7a92e0cc47260eb9888f6a4e9f747">'
        # NOTE(review): the shape position and the ns:id above are fixed
        # literals and do not follow `foi` - confirm whether that is intended.
        '<ns:pos srsName="http://www.opengis.net/def/crs/EPSG/0/4326">'
        '11.597409730065536 47.27196543449542</ns:pos>'
        '</ns:Point>'
        '</sams:shape>'
        '</sams:SF_SpatialSamplingFeature>'
        '</om:featureOfInterest>'
        '<om:result xsi:type="gml:GeometryPropertyType">'
        '<gml:Point gml:id="value">'
        f'<gml:pos srsName="http://www.opengis.net/def/crs/EPSG/0/4326">{coordinates}</gml:pos>'
        '</gml:Point>'
        '</om:result>'
        '</om:OM_Observation>'
        '</sos:observation>'
        '</sos:InsertObservation>'
        '</env:Body>'
        '</env:Envelope>'
    )
    return xml
# Start the import only when this file is executed as a script.
if __name__ == "__main__":
    main()