119 lines
4.7 KiB
Python
119 lines
4.7 KiB
Python
'''
Python version: 3.7

References:
https://www.mybluelinux.com/python-how-to-refresh-an-jwt-access-token-using-decorators/
https://stackoverflow.com/questions/37094419/python-requests-retry-request-after-re-authentication
'''
|
|
|
|
import string
|
|
import time
|
|
import os
|
|
import requests
|
|
|
|
|
|
class MyApi:
    """Client for the DGNSS sensor API that authenticates with a bearer token.

    On construction the client requests an access token from ``host`` and
    builds a ``requests.Session`` that sends it in the ``Authorization``
    header. The :meth:`reauth` response hook is registered on that session so
    a ``401 Unauthorized`` answer triggers one re-authentication and retry.
    """

    # URL of the token endpoint; set per instance in __init__.
    host = None
    # Not used by any method in this class; kept so existing attribute
    # access (MyApi.key / MyApi.secret) keeps working.
    key = None
    secret = None
    # Most recently obtained access token, or None before authentication.
    access_token = None
    # Unix timestamp at which the current access token expires.
    access_token_expiration = None
    # Authenticated session; stays None when construction could not finish.
    session = None

    def __init__(self, host):
        """Authenticate against ``host`` and prepare the authenticated session.

        Args:
            host: URL the access token is requested from.

        Raises:
            Exception: If :meth:`authenticate` returns no access token.

        Note:
            Errors from ``requests`` (connection failure, timeout, ...) are
            printed and swallowed. In that case ``self.session`` stays
            ``None`` and :meth:`getSensorData` returns ``None``.
        """
        self.host = host

        try:
            # Separate, unauthenticated session intended for token requests.
            self._session = requests.Session()
            self.access_token = self.authenticate()
            if self.access_token is None:
                raise Exception("Request for access token failed.")

            self.session = requests.Session()  # Authenticated session
            self.session.headers["Authorization"] = (
                "Bearer " + self.access_token
            )
            self.session.hooks['response'].append(self.reauth)
            # authenticate() records the server-provided expiry; only fall
            # back to one hour if it did not (e.g. an overriding subclass).
            if self.access_token_expiration is None:
                self.access_token_expiration = time.time() + 3600
        except requests.exceptions.HTTPError as errh:
            print("Http Error:", errh)
        except requests.exceptions.ConnectionError as errc:
            print("Error Connecting:", errc)
        except requests.exceptions.Timeout as errt:
            print("Timeout Error:", errt)
        except requests.exceptions.RequestException as err:
            print("OOps: Something Else", err)

    def reauth(self, res, *args, **kwargs):
        """Response hook: re-authenticate and retry once on ``401``.

        Args:
            res: The response that was just received.
            *args: Ignored; accepted for hook compatibility.
            **kwargs: Send options passed to the hook by ``requests``; they
                are forwarded to the retried request.

        Returns:
            ``res`` unchanged unless it is a 401, otherwise the response of
            the retried request.

        Raises:
            requests.exceptions.HTTPError: If the retried request is rejected
                with 401 again, or if no new token could be obtained.
        """
        if res.status_code != requests.codes['unauthorized']:
            return res

        # The retry carries the REATTEMPT marker; a second 401 means the
        # fresh token was rejected too, so stop instead of looping.
        if res.request.headers.get('REATTEMPT'):
            res.raise_for_status()

        new_token = self.authenticate()
        if new_token is None:
            # Nothing better to retry with: surface the 401 to the caller.
            res.raise_for_status()

        self.access_token = new_token
        bearer = "Bearer " + new_token
        self.session.headers["Authorization"] = bearer

        # Drain and release the rejected response before re-sending, as
        # requests' own 401 handlers do, so its connection can be reused.
        res.content
        res.close()

        req = res.request
        # The prepared request still carries the old token; replace it.
        req.headers["Authorization"] = bearer
        req.headers['REATTEMPT'] = "1"
        return self.session.send(req, **kwargs)

    def getSensorData(self, sensor: str, start: str = '2022-02-28',
                      end: str = '2022-02-28'):
        """Request observations for one sensor.

        Args:
            sensor: Sensor identifier, inserted into the ``sensors`` query
                parameter as ``('<sensor>')``.
            start: Value of the ``start`` query parameter.
            end: Value of the ``end`` query parameter.

        Returns:
            The decoded JSON body of the response, or ``None`` when no
            authenticated session exists or the server answers with an HTTP
            error status.
        """
        if self.session is None:
            print("No authenticated session available.")
            return None

        url = (
            "https://api.dgnss-sensors.com/gschliefgraben"
            f"?sensors=('{sensor}')&start={start}&end={end}"
        )
        try:
            # NOTE(review): the form body and Content-Type mirror the token
            # request and look unnecessary on a GET; kept to preserve the
            # bytes sent on the wire -- confirm against the API and remove.
            request = self.session.get(
                url,
                headers={
                    'cache-control': 'no-cache',
                    'Content-Type': 'application/x-www-form-urlencoded',
                    'accept': 'application/json',
                },
                data="grant_type=client_credentials&scope=gschliefgraben",
            )
            request.raise_for_status()
        except requests.exceptions.HTTPError as err:
            print(err)
            return None
        return request.json()

    def authenticate(self):
        """Request a new access token from ``self.host``.

        The credentials are read from the ``REFRESH_TOKEN`` environment
        variable and sent as an HTTP ``Basic`` authorization value. On
        success ``self.access_token_expiration`` is updated from the
        response's ``expires_in`` field (falling back to 3600 seconds).

        Returns:
            The access token string, or ``None`` when ``REFRESH_TOKEN`` is
            not set, the server answers with an HTTP error status, or the
            response contains no ``access_token``.
        """
        refresh_token = os.environ.get("REFRESH_TOKEN")
        if not refresh_token:
            # Without this check the header concatenation below would raise
            # a TypeError on None.
            print("REFRESH_TOKEN environment variable is not set.")
            return None

        try:
            request = requests.post(
                self.host,
                headers={
                    'Authorization': 'Basic ' + refresh_token,
                    'cache-control': 'no-cache',
                    'Content-Type': 'application/x-www-form-urlencoded',
                    'accept': 'application/json',
                },
                data='grant_type=client_credentials&scope=gschliefgraben',
            )
            request.raise_for_status()
        except requests.exceptions.HTTPError as err:
            print(err)
            return None

        # Assumes the response is a JSON object with "access_token" and
        # "expires_in" (seconds) fields.
        data = request.json()
        self.access_token_expiration = (
            time.time() + int(data.get('expires_in', 3600))
        )
        return data.get('access_token')
|