- api data queries

This commit is contained in:
Arno Kaimbacher 2022-02-28 17:25:48 +01:00
parent a7b80c4c8d
commit 941d2006a1
4 changed files with 136 additions and 101 deletions

View File

@ -10,11 +10,12 @@ import os
# currentdir = os.path.dirname(os.path.abspath(inspect.getfile(inspect.currentframe())))
# parentdir = os.path.dirname(currentdir)
# sys.path.insert(0, parentdir)
# import requests
from sqlalchemy.orm import session
from models import ObservationSchema, Person, PersonSchema, Observation
from gschliefgraben_glasfaser.models import ObservationSchema, Person, PersonSchema, Observation
from gschliefgraben_glasfaser.my_api import MyApi
from db.pg_models import create_pg_session
from myApi import myAPI
import requests
#from models import Person, PersonSchema
# response = requests.get('https://api.com/')
# print(response) # shows the response's HTTP status code
@ -28,18 +29,18 @@ def main():
print(db_user)
pg_session: session = create_pg_session()
pg_person: Person = pg_session.query(Person).first()
#observation: Observation = pg_session.query(Observation).first()
print(pg_person)
# pg_person: Person = pg_session.query(Person).first()
observation: Observation = pg_session.query(Observation).first()
# print(pg_person)
# serialize db data to json
person_schema = PersonSchema()
dump_data = person_schema.dump(pg_person)
print(dump_data)
# serialize db data to json
# observation_schema = ObservationSchema()
# dump_data = observation_schema.dump(observation)
# person_schema = PersonSchema()
# dump_data = person_schema.dump(pg_person)
# print(dump_data)
# serialize db data to json
observation_schema = ObservationSchema()
dump_data = observation_schema.dump(observation)
print(dump_data)
# # deserialize
# load_data: Person = person_schema.load(dump_data)
@ -57,7 +58,9 @@ def main():
# data='grant_type=client_credentials&scope=gschliefgraben')
# print(response)
token_api = os.environ.get("TOKEN_API")
test = myAPI(token_api)
test_api = MyApi(token_api)
data = test_api.getSensorData("inclino1_14")
print(data)
# create(dump_data)

View File

@ -1,87 +0,0 @@
'''
Python version: 3.7
https://www.mybluelinux.com/python-how-to-refresh-an-jwt-access-token-using-decorators/
https://stackoverflow.com/questions/37094419/python-requests-retry-request-after-re-authentication
'''
import requests
import time, os
class myAPI():
host = None
key = None
secret = None
access_token = None
access_token_expiration = None
def __init__(self, host):
# the function that is executed when
# an instance of the class is created
self.host = host
# self.key = key
# self.secret = secret
try:
self._session = requests.Session() # Session for tokens
self.access_token = self.getAccessToken()
if self.access_token is None:
raise Exception("Request for access token failed.")
self.session = requests.Session() # Authenticated session
self.session.auth = self.auth
self.session.hooks['response'].append(self.reauth)
except Exception as e:
print(e)
else:
self.access_token_expiration = time.time() + 3600
def reauth(self, res, *args, **kwargs):
"""Hook to re-authenticate whenever authentication expires."""
if res.status_code == requests.codes.unauthorized:
if res.request.headers.get('REATTEMPT'):
res.raise_for_status()
self.refresh_auth()
req = res.request
req.headers['REATTEMPT'] = 1
req = self.session.auth(req)
res = self.session.send(req)
return res
def refresh_auth(self):
"""Use the refresh token to get a new access token."""
res = self._session.post(self.abs_url("api/token/refresh/"), data={"refresh": self.refresh})
if res.status_code == 200:
self.access_token = res.json()['access_token']
else:
# Token expired -> re-authenticate
self.reauth()
def authenticate(self):
# the function that is
# used to request the JWT
try:
# build the JWT and store
# in the variable `token_body`
refresh_token = os.environ.get("REFRESH_TOKEN")
# request an access token
request = requests.post(self.host,
headers={
'Authorization': 'Basic ' + refresh_token,
'cache-control': 'no-cache',
'Content-Type': 'application/x-www-form-urlencoded',
'accept': 'application/json'
},
data='grant_type=client_credentials&scope=gschliefgraben')
# optional: raise exception for status code
request.raise_for_status()
except Exception as e:
print(e)
return None
else:
# assuming the response's structure is
# {"access_token": ""}
data = request.json()
return data['access_token']

View File

@ -0,0 +1,118 @@
'''
Python version: 3.7
https://www.mybluelinux.com/python-how-to-refresh-an-jwt-access-token-using-decorators/
https://stackoverflow.com/questions/37094419/python-requests-retry-request-after-re-authentication
'''
import string
import time
import os
import requests
class MyApi():
""" MyApi class """
host = None
key = None
secret = None
access_token = None
access_token_expiration = None
def __init__(self, host):
# the function that is executed when
# an instance of the class is created
self.host = host
# self.key = key
# self.secret = secret
try:
self._session = requests.Session() # Session for tokens
self.access_token = self.authenticate()
if self.access_token is None:
raise Exception("Request for access token failed.")
else:
self.session = requests.Session() # Authenticated session
self.session.headers["Authorization"] = "Bearer " + \
self.access_token
self.session.hooks['response'].append(self.reauth)
self.access_token_expiration = time.time() + 3600
except requests.exceptions.HTTPError as errh:
print("Http Error:", errh)
except requests.exceptions.ConnectionError as errc:
print("Error Connecting:", errc)
except requests.exceptions.Timeout as errt:
print("Timeout Error:", errt)
except requests.exceptions.RequestException as err:
print("OOps: Something Else", err)
def reauth(self, res, *args, **kwargs):
"""Hook to re-authenticate whenever authentication expires."""
if res.status_code == requests.codes['unauthorized']:
if res.request.headers.get('REATTEMPT'):
res.raise_for_status()
# self.refresh_auth()
req = res.request
req.headers['REATTEMPT'] = 1
# req = self.session.auth(req)
res = self.session.send(req)
return res
# def refresh_auth(self):
# """Use the refresh token to get a new access token."""
# res = self._session.post(self.abs_url(
# "api/token/refresh/"), data={"refresh": self.refresh})
# if res.status_code == 200:
# self.access_token = res.json()['access_token']
# else:
# # Token expired -> re-authenticate
def getSensorData(self, sensor: string):
''' request observations'''
try:
request = self.session.get('https://api.dgnss-sensors.com/gschliefgraben',
headers={
'cache-control': 'no-cache',
'Content-Type': 'application/x-www-form-urlencoded',
'accept': 'application/json'
},
data="grant_type=client_credentials&scope=gschliefgraben&sensors=('\"inclino1_14'\")"
)
# optional: raise exception for status code
request.raise_for_status()
except requests.exceptions.HTTPError as err:
print(err)
return None
else:
# assuming the response's structure is
# {"access_token": ""}
data = request.json()
return data
def authenticate(self):
''' the function that is
# used to request the JWT '''
try:
# build the JWT and store
# in the variable `token_body`
refresh_token = os.environ.get("REFRESH_TOKEN")
# request an access token
request = requests.post(self.host,
headers={
'Authorization': 'Basic ' + refresh_token,
'cache-control': 'no-cache',
'Content-Type': 'application/x-www-form-urlencoded',
'accept': 'application/json'
},
data='grant_type=client_credentials&scope=gschliefgraben')
# optional: raise exception for status code
request.raise_for_status()
except requests.exceptions.HTTPError as err:
print(err)
return None
else:
# assuming the response's structure is
# {"access_token": ""}
data = request.json()
access_token_expire = time.time() + int(data['expires_in'])
return data['access_token']

View File

@ -1,5 +1,6 @@
''' setup module for internal dependcies '''
from setuptools import setup, find_packages
setup(
name = 'geomon',
packages = find_packages(),
)
)