geomon/gschliefgraben_glasfaser/my_api.py

119 lines
4.7 KiB
Python
Raw Permalink Normal View History

2022-02-28 16:25:48 +00:00
'''
Python version: 3.7
https://www.mybluelinux.com/python-how-to-refresh-an-jwt-access-token-using-decorators/
https://stackoverflow.com/questions/37094419/python-requests-retry-request-after-re-authentication
'''
import string
import time
import os
import requests
class MyApi():
""" MyApi class """
host = None
key = None
secret = None
access_token = None
access_token_expiration = None
def __init__(self, host):
# the function that is executed when
# an instance of the class is created
self.host = host
# self.key = key
# self.secret = secret
try:
self._session = requests.Session() # Session for tokens
self.access_token = self.authenticate()
if self.access_token is None:
raise Exception("Request for access token failed.")
else:
self.session = requests.Session() # Authenticated session
self.session.headers["Authorization"] = "Bearer " + \
self.access_token
self.session.hooks['response'].append(self.reauth)
self.access_token_expiration = time.time() + 3600
except requests.exceptions.HTTPError as errh:
print("Http Error:", errh)
except requests.exceptions.ConnectionError as errc:
print("Error Connecting:", errc)
except requests.exceptions.Timeout as errt:
print("Timeout Error:", errt)
except requests.exceptions.RequestException as err:
print("OOps: Something Else", err)
def reauth(self, res, *args, **kwargs):
"""Hook to re-authenticate whenever authentication expires."""
if res.status_code == requests.codes['unauthorized']:
if res.request.headers.get('REATTEMPT'):
res.raise_for_status()
# self.refresh_auth()
req = res.request
req.headers['REATTEMPT'] = 1
# req = self.session.auth(req)
res = self.session.send(req)
return res
# def refresh_auth(self):
# """Use the refresh token to get a new access token."""
# res = self._session.post(self.abs_url(
# "api/token/refresh/"), data={"refresh": self.refresh})
# if res.status_code == 200:
# self.access_token = res.json()['access_token']
# else:
# # Token expired -> re-authenticate
def getSensorData(self, sensor: string, date_start, date_end):
2022-02-28 16:25:48 +00:00
''' request observations'''
try:
request = self.session.get('https://api.dgnss-sensors.com/gschliefgraben?sensors=(\''+sensor+ '\')&start=' + date_start+ '&end=' + date_end,
2022-02-28 16:25:48 +00:00
headers={
'cache-control': 'no-cache',
'Content-Type': 'application/x-www-form-urlencoded',
'accept': 'application/json'
},
data="grant_type=client_credentials&scope=gschliefgraben"
2022-02-28 16:25:48 +00:00
)
# optional: raise exception for status code
request.raise_for_status()
except requests.exceptions.HTTPError as err:
print(err)
return None
else:
# assuming the response's structure is
# {"access_token": ""}
data = request.json()
return data
def authenticate(self):
''' the function that is
# used to request the JWT '''
try:
# build the JWT and store
# in the variable `token_body`
refresh_token = os.environ.get("REFRESH_TOKEN")
# request an access token
request = requests.post(self.host,
headers={
'Authorization': 'Basic ' + refresh_token,
'cache-control': 'no-cache',
'Content-Type': 'application/x-www-form-urlencoded',
'accept': 'application/json'
},
data='grant_type=client_credentials&scope=gschliefgraben')
# optional: raise exception for status code
request.raise_for_status()
except requests.exceptions.HTTPError as err:
print(err)
return None
else:
# assuming the response's structure is
# {"access_token": ""}
data = request.json()
access_token_expire = time.time() + int(data['expires_in'])
return data['access_token']