# geomon/pechgraben_images/import_image_observations.py

'''
Sqlalchemy version: 1.2.15
Python version: 3.7
'''
import os
import uuid
from datetime import datetime
from sqlalchemy.orm import session
from sqlalchemy import asc, desc
from exif import Image
from db.models import (
create_pg_session, Observation,
Dataset, Procedure, Phenomenon, Platform, Format)
def main():
    """Publish the camera dataset, import its images and update its time range.

    Steps performed:

    1. Look up the dataset of procedure ``camera2`` with the phenomenon
       ``HumanVisualPerception``; print a message and terminate with exit
       status 1 if it does not exist.
    2. If the dataset is not published yet, mark it as a published, visible
       text time series and commit.
    3. Link the dataset to the ``pechgraben_images`` platform and to the
       OM_TextObservation format when those rows exist.
    4. Import the image observations via ``import_images``.
    5. Store time and id of the earliest and latest observation (ordered by
       ``sampling_time_start``) on the dataset and commit.

    The database session is closed on every exit path, including the early
    exit and any exception.

    :raises SystemExit: if the dataset for the sensor has not been created
    """
    platform_sta_identifier = "pechgraben_images"
    sensor = "camera2"
    text_observation_format = (
        "http://www.opengis.net/def/observationType/OGC-OM/2.0/OM_TextObservation")

    pg_session = create_pg_session()
    try:
        visual_perception_dataset: Dataset = pg_session.query(Dataset) \
            .join(Procedure) \
            .join(Phenomenon) \
            .filter(Procedure.sta_identifier == sensor.lower()) \
            .filter(Phenomenon.sta_identifier == "HumanVisualPerception") \
            .first()
        if not visual_perception_dataset:
            print("Sensor " + sensor + " ist noch nicht angelegt!")
            # SystemExit is raised directly: the exit() helper is only
            # installed by the site module, and a non-zero status lets
            # calling scripts detect the failure.
            raise SystemExit(1)

        if not visual_perception_dataset.is_published:
            visual_perception_dataset.is_published = 1
            visual_perception_dataset.is_hidden = 0
            visual_perception_dataset.dataset_type = "timeseries"
            visual_perception_dataset.observation_type = "simple"
            visual_perception_dataset.value_type = "text"
            pg_session.commit()

        # One query per lookup: first() returns None when no row matches,
        # so a separate existence check is not needed.
        sensor_platform = pg_session.query(Platform.id) \
            .filter(Platform.sta_identifier == platform_sta_identifier) \
            .first()
        if sensor_platform is not None:
            visual_perception_dataset.fk_platform_id = sensor_platform.id

        sensor_format = pg_session.query(Format.id) \
            .filter(Format.definition == text_observation_format) \
            .first()
        if sensor_format is not None:
            visual_perception_dataset.fk_format_id = sensor_format.id

        # import all the images for the given sensor
        import_images(visual_perception_dataset, pg_session)

        # save time and id of the first and last observation on the dataset
        first_observation: Observation = pg_session.query(Observation) \
            .filter(Observation.fk_dataset_id == visual_perception_dataset.id) \
            .order_by(asc('sampling_time_start')) \
            .first()
        if first_observation is not None:
            visual_perception_dataset.first_time = first_observation.sampling_time_start
            visual_perception_dataset.fk_first_observation_id = first_observation.id

        last_observation: Observation = pg_session.query(Observation) \
            .filter(Observation.fk_dataset_id == visual_perception_dataset.id) \
            .order_by(desc('sampling_time_start')) \
            .first()
        if last_observation is not None:
            visual_perception_dataset.last_time = last_observation.sampling_time_start
            visual_perception_dataset.fk_last_observation_id = last_observation.id

        pg_session.commit()
    finally:
        # Release the session even on early exit or when an error is raised.
        pg_session.close()
def import_images(dataset: Dataset, pg_session,
                  folder_path='C:/Users/kaiarn/Documents/Fotos'):
    """Create one observation per image file found in ``folder_path``.

    Every file whose name ends with one of the accepted image suffixes is
    opened, its EXIF ``datetime_original`` tag is parsed
    (format ``%Y:%m:%d %H:%M:%S``) and passed together with the file name
    to ``create_observation``. Files without that tag, or with a value that
    does not match the format, are reported and skipped instead of aborting
    the whole import. The session is committed once after all files have
    been processed.

    :param dataset: dataset the new observations are attached to
    :param pg_session: database session; only ``commit()`` is called on it
    :param folder_path: directory that is scanned (not recursively) for images
    """
    image_suffixes = ('jpg', 'JPG', 'png', 'PNG', 'tiff', 'TIFF')

    for file_name in os.listdir(folder_path):
        if not file_name.endswith(image_suffixes):
            continue

        file_path = os.path.join(folder_path, file_name)
        # The context manager closes the file handle for every image; the
        # previous bare open() left one handle open per file.
        with open(file_path, 'rb') as img_file:
            img: Image = Image(img_file)
            # Original datetime that the image was taken (photographed)
            datetime_original = img.get("datetime_original")

        if not datetime_original:
            print(f"Skipping {file_name}: no EXIF datetime_original tag")
            continue

        try:
            date_obj = datetime.strptime(
                datetime_original, '%Y:%m:%d %H:%M:%S')
        except ValueError:
            print(f"Skipping {file_name}: "
                  f"unexpected datetime_original {datetime_original!r}")
            continue

        create_observation(dataset, date_obj, file_name)

    pg_session.commit()
def create_observation(dataset: Dataset, datetime_original, file_name):
    """
    Build a text observation for one image and attach it to the dataset.

    The observation gets a random UUID as ``sta_identifier``, uses
    ``datetime_original`` as result time and as start and end of the
    sampling time, and stores the image URL
    (``https://geomon.geologie.ac.at/images/`` followed by ``file_name``)
    as its text value. This function does not commit; the observation is
    only appended to ``dataset.observations``.

    :param dataset: dataset that receives the new observation
    :param datetime_original: time the image was taken
    :param file_name: file name of the image, appended to the image base URL
    :return: None
    """
    observation: Observation = Observation()

    # All three time columns carry the moment the picture was taken.
    field_values = {
        "sta_identifier": str(uuid.uuid4()),
        "result_time": datetime_original,
        "sampling_time_start": datetime_original,
        "sampling_time_end": datetime_original,
        "value_type": "text",
        "value_text": "https://geomon.geologie.ac.at/images/" + file_name,
        "fk_dataset_id": dataset.id,
    }
    for attribute, value in field_values.items():
        setattr(observation, attribute, value)

    # Attach the observation to the dataset; committing is left to the caller.
    dataset.observations.append(observation)
# Script entry point: run the import only when this file is executed
# directly, not when it is imported as a module.
if __name__ == "__main__":
    # load_dotenv(find_dotenv())
    # print('sensors: {}'.format(os.environ.get(
    #     'GLASFASER_GSCHLIEFGRABEN_SENSORS', [])))
    main()
# print(img.list_all())
# print(img.has_exif)
# # Make of device which captured image: NIKON CORPORATION
# print(f'Make: {img.get("make")}')
# # Model of device: NIKON D7000
# print(f'Model: {img.get("model")}')
# # Software involved in uploading and digitizing image: Ver.1.04
# print(f'Software: {img.get("software")}')
# # Name of photographer who took the image: not defined
# print(f'Artist: {img.get("artist")}')
# # Original datetime that image was taken (photographed)
# print(f'DateTime (Original): {img.get("datetime_original")}')
# # Details of flash function
# print(f'Flash Details: {img.get("flash")}')
# print(f"Coordinates - Image")
# print("---------------------")
# print(f"Latitude: {img.get('gps_latitude')} {img.get('gps_latitude_ref')}")
# print(f"Longitude: {img.get('gps_longitude')} {img.get('gps_longitude_ref')}\n")