224 lines
12 KiB
Python
224 lines
12 KiB
Python
''' module for importing observations '''
|
|
import csv
|
|
# import requests
|
|
from datetime import datetime
|
|
from typing import List
|
|
import uuid
|
|
from pyproj import Transformer
|
|
# from insert_sensor.wrapper import (Offering, FoI, Procedure)
|
|
from sqlalchemy.orm import session
|
|
from sqlalchemy import asc, desc
|
|
from db.models import (
|
|
Observation,
|
|
create_pg_session,
|
|
Dataset,
|
|
Procedure,
|
|
Phenomenon,
|
|
Platform,
|
|
Format
|
|
)
|
|
|
|
|
|
def main():
|
|
''' main method '''
|
|
pg_session: session = create_pg_session()
|
|
platform_sta_identifier = "voegelsberg_tachymeter"
|
|
|
|
with open('voegelsberg/data.txt', 'rt', encoding="utf-8") as csvfile:
|
|
spamreader = csv.DictReader(csvfile, delimiter=';', quotechar='"')
|
|
for row in spamreader:
|
|
# print(row)
|
|
|
|
sensor: str = row['Punktnummer']
|
|
pg_query = pg_session.query(Dataset) \
|
|
.join(Procedure) \
|
|
.join(Phenomenon) \
|
|
.filter(Procedure.sta_identifier == sensor)
|
|
location_dataset: Dataset = pg_query.filter(
|
|
Phenomenon.sta_identifier == "TachymeterLocation").first()
|
|
if not location_dataset:
|
|
print("Sensor " + sensor +
|
|
" ist noch nicht in der Datenbank angelegt!")
|
|
continue
|
|
|
|
platform_exists: bool = pg_session.query(Platform.id).filter_by(
|
|
sta_identifier=platform_sta_identifier).scalar() is not None
|
|
if not platform_exists:
|
|
sensor_platform = Platform()
|
|
# max_id = pg_session.query(func.max(Platform.id)).scalar()
|
|
# sensor_platform.id = max_id + 1
|
|
sensor_platform.sta_identifier = platform_sta_identifier.lower()
|
|
sensor_platform.identifier = platform_sta_identifier.lower()
|
|
sensor_platform.name = platform_sta_identifier.lower()
|
|
location_dataset.platform = sensor_platform
|
|
else:
|
|
sensor_platform = pg_session.query(Platform.id) \
|
|
.filter(Platform.sta_identifier == platform_sta_identifier) \
|
|
.first()
|
|
location_dataset.fk_platform_id = sensor_platform.id
|
|
|
|
format_exists: bool = pg_session.query(Format.id).filter_by(
|
|
definition="http://www.opengis.net/def/observationType/OGC-OM/2.0/OM_GeometryObservation"
|
|
).scalar() is not None
|
|
if format_exists:
|
|
sensor_format = pg_session.query(Format.id) \
|
|
.filter(Format.definition ==
|
|
"http://www.opengis.net/def/observationType/OGC-OM/2.0/OM_GeometryObservation"
|
|
) \
|
|
.first()
|
|
location_dataset.fk_format_id = sensor_format.id
|
|
pg_session.commit()
|
|
successfully_inserted = create_observation(
|
|
location_dataset, row, pg_session)
|
|
|
|
# commit new observations:
|
|
if successfully_inserted:
|
|
if not location_dataset.is_published:
|
|
location_dataset.is_published = 1
|
|
location_dataset.is_hidden = 0
|
|
location_dataset.dataset_type = "trajectory"
|
|
location_dataset.observation_type = "simple"
|
|
location_dataset.value_type = "geometry"
|
|
pg_session.commit()
|
|
|
|
# last_location_observation = pg_session.query(Observation) \
|
|
# .filter(Observation.fk_dataset_id == location_dataset.id) \
|
|
# .order_by(desc('sampling_time_start')) \
|
|
# .first()
|
|
# if last_location_observation is not None:
|
|
# location_dataset.last_time = last_location_observation.sampling_time_start
|
|
# #location_dataset.last_value = last_location_observation.value_quantity
|
|
# location_dataset.fk_last_observation_id = last_location_observation.id
|
|
|
|
# first_location_observation = pg_session.query(Observation) \
|
|
# .filter(Observation.fk_dataset_id == location_dataset.id) \
|
|
# .order_by(asc('sampling_time_start')) \
|
|
# .first()
|
|
# if first_location_observation is not None:
|
|
# location_dataset.first_time = first_location_observation.sampling_time_start
|
|
# # roll_dataset.first_value = first_location_observation.value_quantity
|
|
# location_dataset.fk_first_observation_id = first_location_observation.id
|
|
|
|
# pg_session.commit()
|
|
|
|
# for loop sensors end
|
|
actualize_first_last_observations()
|
|
pg_session.close()
|
|
|
|
|
|
def create_observation(location_dataset: Dataset, data, pg_session: session):
|
|
''' create observation in db'''
|
|
# print("Sesnor key exist in JSON data")
|
|
transprojr = Transformer.from_crs(31254, 4326, always_xy=True)
|
|
x_1, y_1, z_1 = (float(data['Y']), float(data['X']), float(data['H']))
|
|
cord_x, cord_y = map(float, transprojr.transform(x_1, y_1))
|
|
print((cord_x, cord_y)) # (11.597409730065536, 47.27196543449542)
|
|
|
|
sensor: str = data['Punktnummer']
|
|
|
|
zeitstempel = data['Epoche']
|
|
date_obj = datetime.strptime(zeitstempel, '%d.%m.%Y').isoformat()
|
|
|
|
existing_observation: bool = (
|
|
pg_session.query(Observation)
|
|
.filter(Observation.result_time ==
|
|
date_obj, Observation.fk_dataset_id == location_dataset.id)
|
|
.one_or_none()
|
|
)
|
|
|
|
# Can we insert this observation?
|
|
if existing_observation is None:
|
|
# insert bew observation
|
|
new_observation: Observation = Observation()
|
|
# new_observation.id = max_id
|
|
new_observation.sta_identifier = str(uuid.uuid4())
|
|
new_observation.result_time = date_obj
|
|
new_observation.sampling_time_start = new_observation.result_time
|
|
new_observation.sampling_time_end = new_observation.result_time
|
|
new_observation.value_type = "geometry"
|
|
new_observation.value_geometry = f'SRID=4326;POINTZ({cord_x} {cord_y} {z_1})'
|
|
new_observation.fk_dataset_id = location_dataset.id
|
|
pg_session.add(new_observation)
|
|
print(f"new observation with result time {new_observation.result_time} "
|
|
f"for drill hole {sensor} succesfully imported!")
|
|
return True
|
|
else:
|
|
print(f"observation with result time {existing_observation.result_time} "
|
|
f"for tachymeter {sensor} already exists!")
|
|
return False
|
|
|
|
|
|
def actualize_first_last_observations():
|
|
''' iterate throug all datasets of Voregelsberg project area
|
|
and actualize last and first corresponding observations'''
|
|
pg_session: session = create_pg_session()
|
|
platform_sta_identifier = "voegelsberg_tachymeter"
|
|
# sensor_platform = pg_session.query(Platform.id) \
|
|
# .filter(Platform.sta_identifier == platform_sta_identifier) \
|
|
# .first()
|
|
|
|
voegelsberg_datasets: List[Dataset] = []
|
|
voegelsberg_datasets = pg_session.query(Dataset) \
|
|
.join(Procedure) \
|
|
.join(Phenomenon) \
|
|
.join(Platform) \
|
|
.filter(Platform.sta_identifier == platform_sta_identifier).all()
|
|
|
|
for location_dataset in voegelsberg_datasets:
|
|
''' iterate throug all datasets of Voregelsberg project area
|
|
and actualize last and first corresponding observations'''
|
|
last_location_observation = pg_session.query(Observation) \
|
|
.filter(Observation.fk_dataset_id == location_dataset.id) \
|
|
.order_by(desc('sampling_time_start')) \
|
|
.first()
|
|
if last_location_observation is not None:
|
|
location_dataset.last_time = last_location_observation.sampling_time_start
|
|
# location_dataset.last_value = last_location_observation.value_quantity
|
|
location_dataset.fk_last_observation_id = last_location_observation.id
|
|
|
|
first_location_observation = pg_session.query(Observation) \
|
|
.filter(Observation.fk_dataset_id == location_dataset.id) \
|
|
.order_by(asc('sampling_time_start')) \
|
|
.first()
|
|
if first_location_observation is not None:
|
|
location_dataset.first_time = first_location_observation.sampling_time_start
|
|
# roll_dataset.first_value = first_location_observation.value_quantity
|
|
location_dataset.fk_first_observation_id = first_location_observation.id
|
|
|
|
pg_session.commit()
|
|
|
|
|
|
def get_xml(offering, procedure, foi, result_time, identifier):
|
|
''' """
|
|
Prepares the body of a InsertSensor request for JSON biding.
|
|
:param offering: an instance of class Offering.Type object.
|
|
:param Procedure: instance of class Procedure. type object.
|
|
:param foi: feature of interest. Instance of FoI
|
|
:param sensor_type: SensorType object
|
|
:return: valid body for an InsertSensor request.
|
|
"""'''
|
|
offering_name = offering.name
|
|
# offering_label = offering.label
|
|
|
|
# procedure_name = procedure.name
|
|
procedure_identifier = procedure.id
|
|
|
|
# featureName = featureID = cordX = cordY = height = h_unit = z_unit = coordinates = ""
|
|
if foi is not None: # check if feature of interest should be declare
|
|
# feature_id = 'https://geomon.geologie.ac.at/52n-sos-webapp/api/features/' + \
|
|
# str(foi.fid) # URL format
|
|
cord_x = str(foi.x) # longitude degrees, float
|
|
cord_y = str(foi.y) # latitude degrees, float
|
|
cord_z = str(foi.z)
|
|
coordinates = cord_x + " " + cord_y + " " + cord_z
|
|
feature_id = foi.fid # "feature location"
|
|
feature_name = foi.name # "feature location"
|
|
else:
|
|
pass
|
|
xml = f'<?xml version="1.0" encoding="UTF-8"?><env:Envelope xmlns:env="http://www.w3.org/2003/05/soap-envelope" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.w3.org/2003/05/soap-envelope http://www.w3.org/2003/05/soap-envelope/soap-envelope.xsd"><env:Body><sos:InsertObservation service="SOS" version="2.0.0" xmlns:sos="http://www.opengis.net/sos/2.0" xmlns:swes="http://www.opengis.net/swes/2.0" xmlns:swe="http://www.opengis.net/swe/2.0" xmlns:sml="http://www.opengis.net/sensorML/1.0.1" xmlns:gml="http://www.opengis.net/gml/3.2" xmlns:xlink="http://www.w3.org/1999/xlink" xmlns:om="http://www.opengis.net/om/2.0" xmlns:sams="http://www.opengis.net/samplingSpatial/2.0" xmlns:sf="http://www.opengis.net/sampling/2.0" xmlns:xs="http://www.w3.org/2001/XMLSchema" xsi:schemaLocation="http://www.opengis.net/sos/2.0 http://schemas.opengis.net/sos/2.0/sos.xsd http://www.opengis.net/samplingSpatial/2.0 http://schemas.opengis.net/samplingSpatial/2.0/spatialSamplingFeature.xsd"><!-- multiple offerings are possible --><sos:offering>{offering_name}</sos:offering><sos:observation><om:OM_Observation gml:id="{identifier}"><om:type xlink:href="http://www.opengis.net/def/observationType/OGC-OM/2.0/OM_GeometryObservation"/><om:phenomenonTime><gml:TimeInstant gml:id="phenomenonTime"><gml:timePosition>{result_time}</gml:timePosition></gml:TimeInstant></om:phenomenonTime><om:resultTime xlink:href="#phenomenonTime"/><om:procedure xlink:href="{procedure_identifier}"/><om:parameter><om:NamedValue><om:name xlink:href="http://www.opengis.net/def/param-name/OGC-OM/2.0/samplingGeometry"/><om:value xsi:type="gml:GeometryPropertyType"><gml:Point gml:id="SamplingPoint1"><gml:description>description</gml:description><gml:identifier codeSpace="">hereIdentifier</gml:identifier><gml:name>hereIam</gml:name><gml:pos srsName="http://www.opengis.net/def/crs/EPSG/0/4326">{coordinates}</gml:pos></gml:Point></om:value></om:NamedValue></om:parameter><om:observedProperty xlink:href="TachymeterLocation"/><om:featureOfInterest><sams:SF_SpatialSamplingFeature gml:id="ssf_instance"><gml:identifier codeSpace="">{feature_id}</gml:identifier><gml:name>{feature_name}</gml:name><sf:type xlink:href="http://www.opengis.net/def/samplingFeatureType/OGC-OM/2.0/SF_SamplingPoint"/><sf:sampledFeature xlink:href="http://www.opengis.net/def/nil/OGC/0/unknown"/><sams:shape><ns:Point xmlns:ns="http://www.opengis.net/gml/3.2" ns:id="Point_ssf_b3a826dd44012201b013c90c51da28c041f7a92e0cc47260eb9888f6a4e9f747"><ns:pos srsName="http://www.opengis.net/def/crs/EPSG/0/4326">11.597409730065536 47.27196543449542</ns:pos></ns:Point></sams:shape></sams:SF_SpatialSamplingFeature></om:featureOfInterest><om:result xsi:type="gml:GeometryPropertyType"><gml:Point gml:id="value"><gml:pos srsName="http://www.opengis.net/def/crs/EPSG/0/4326">{coordinates}</gml:pos></gml:Point></om:result></om:OM_Observation></sos:observation></sos:InsertObservation></env:Body></env:Envelope>'
|
|
return xml
|
|
|
|
|
|
if __name__ == '__main__':
|
|
actualize_first_last_observations()
|