'''
Sqlalchemy version: 1.2.15
Python version: 3.7
'''

import os
import sys
import uuid
from datetime import datetime

from sqlalchemy.orm import session
from sqlalchemy import asc, desc
from exif import Image

from db.models import (
    create_pg_session, Observation,
    Dataset, Procedure, Phenomenon, Platform, Format)


def main():
    '''
    Import the images of one camera sensor as text observations.

    Looks up the "HumanVisualPerception" dataset of the sensor, publishes it
    if necessary, links it to its platform and format, imports all images via
    import_images() and finally stores the first/last observation references
    on the dataset.

    Exits the interpreter (SystemExit) when the sensor's dataset is missing.
    '''
    pg_session: session.Session = create_pg_session()
    # try/finally so the session is released on every path, including the
    # early exit below and any exception raised during the import.
    try:
        platform_sta_identifier = "pechgraben_images"
        sensor = "camera2"
        text_observation_definition = (
            "http://www.opengis.net/def/observationType/OGC-OM/2.0/"
            "OM_TextObservation")

        pg_query = pg_session.query(Dataset) \
            .join(Procedure) \
            .join(Phenomenon) \
            .filter(Procedure.sta_identifier == sensor.lower())
        visual_perception_dataset: Dataset = pg_query.filter(
            Phenomenon.sta_identifier == "HumanVisualPerception").first()
        if not visual_perception_dataset:
            print("Sensor " + sensor + " ist noch nicht angelegt!")
            sys.exit()

        if not visual_perception_dataset.is_published:
            visual_perception_dataset.is_published = 1
            visual_perception_dataset.is_hidden = 0
            visual_perception_dataset.dataset_type = "timeseries"
            visual_perception_dataset.observation_type = "simple"
            visual_perception_dataset.value_type = "text"
            pg_session.commit()

        # scalar() yields the id directly (None when no row matches), so a
        # second query to fetch the same row is not needed.
        platform_id = pg_session.query(Platform.id).filter_by(
            sta_identifier=platform_sta_identifier).scalar()
        if platform_id is not None:
            visual_perception_dataset.fk_platform_id = platform_id

        format_id = pg_session.query(Format.id).filter_by(
            definition=text_observation_definition).scalar()
        if format_id is not None:
            visual_perception_dataset.fk_format_id = format_id

        # import all the images for the given sensor names
        import_images(visual_perception_dataset, pg_session)

        # save first and last values of all the observations
        first_observation: Observation = pg_session.query(Observation) \
            .filter(Observation.fk_dataset_id == visual_perception_dataset.id) \
            .order_by(asc('sampling_time_start')) \
            .first()
        if first_observation is not None:
            visual_perception_dataset.first_time = \
                first_observation.sampling_time_start
            # visual_perception_dataset.first_value = first_observation.value_quantity
            visual_perception_dataset.fk_first_observation_id = \
                first_observation.id

        last_observation: Observation = pg_session.query(Observation) \
            .filter(Observation.fk_dataset_id == visual_perception_dataset.id) \
            .order_by(desc('sampling_time_start')) \
            .first()
        if last_observation is not None:
            visual_perception_dataset.last_time = \
                last_observation.sampling_time_start
            # visual_perception_dataset.last_value = last_observation.value_quantity
            visual_perception_dataset.fk_last_observation_id = \
                last_observation.id

        pg_session.commit()
    finally:
        pg_session.close()


def import_images(dataset: Dataset, pg_session,
                  folder_path='C:/Users/kaiarn/Documents/Fotos'):
    '''
    Create one observation per image file found in a folder.

    :param dataset: dataset the new observations are appended to
    :param pg_session: database session; committed once after all files
        have been processed
    :param folder_path: directory that is scanned (not recursively) for
        image files; defaults to the original hard-coded location
    '''
    for file_name in os.listdir(folder_path):
        if not file_name.endswith(
                ('jpg', 'JPG', 'png', 'PNG', 'tiff', 'TIFF')):
            continue

        file_path = os.path.join(folder_path, file_name)
        # Context manager closes the handle again; previously every image
        # left an open file behind.
        with open(file_path, 'rb') as img_file:
            img: Image = Image(img_file)
            # Original datetime that image was taken (photographed)
            datetime_original = img.get("datetime_original")

        if datetime_original is None:
            # Without a capture timestamp no observation time can be
            # derived; skip the file instead of aborting the whole import.
            print("Image " + file_name
                  + " has no EXIF datetime_original, skipped")
            continue

        date_obj = datetime.strptime(
            datetime_original, '%Y:%m:%d %H:%M:%S')
        create_observation(dataset, date_obj, file_name)

    pg_session.commit()


def create_observation(dataset: Dataset, datetime_original, file_name):
    """
    Create a text observation for one image and attach it to the dataset.

    The observation gets a random UUID as sta_identifier, uses the capture
    time as result time and as start/end of the sampling time, and stores the
    public image URL as its text value. Nothing is committed here; the caller
    is responsible for committing the session.

    :param dataset: dataset the observation is appended to
    :param datetime_original: capture time of the image
    :param file_name: file name of the image, appended to the image base URL
    """
    new_observation: Observation = Observation()
    new_observation.sta_identifier = str(uuid.uuid4())
    new_observation.result_time = datetime_original
    new_observation.sampling_time_start = new_observation.result_time
    new_observation.sampling_time_end = new_observation.result_time
    new_observation.value_type = "text"
    new_observation.value_text = \
        "https://geomon.geologie.ac.at/images/" + file_name
    new_observation.fk_dataset_id = dataset.id

    # Attach the observation to the dataset's collection
    dataset.observations.append(new_observation)


if __name__ == "__main__":
    # load_dotenv(find_dotenv())
    # print('sensors: {}'.format(os.environ.get(
    #     'GLASFASER_GSCHLIEFGRABEN_SENSORS', [])))
    main()

# Reference: further EXIF attributes that can be read from an image
# print(img.list_all())
# print(img.has_exif)
# # Make of device which captured image: NIKON CORPORATION
# print(f'Make: {img.get("make")}')
# # Model of device: NIKON D7000
# print(f'Model: {img.get("model")}')
# # Software involved in uploading and digitizing image: Ver.1.04
# print(f'Software: {img.get("software")}')
# # Name of photographer who took the image: not defined
# print(f'Artist: {img.get("artist")}')
# # Original datetime that image was taken (photographed)
# print(f'DateTime (Original): {img.get("datetime_original")}')
# # Details of flash function
# print(f'Flash Details: {img.get("flash")}')
# print(f"Coordinates - Image")
# print("---------------------")
# print(f"Latitude: {img.copyright} {img.get('gps_latitude_ref')}")
# print(f"Longitude: {img.get('gps_longitude')} {img.get('gps_longitude_ref')}\n")