'''
Python version: 3.7

References:
https://www.mybluelinux.com/python-how-to-refresh-an-jwt-access-token-using-decorators/
https://stackoverflow.com/questions/37094419/python-requests-retry-request-after-re-authentication
'''
import string
import time
import os
import requests


class MyApi():
    """Small HTTP client that authenticates with a bearer token.

    On construction an access token is requested from ``host`` and stored on
    an authenticated ``requests.Session``. A response hook on that session
    requests a fresh token and retries once whenever a request comes back
    with HTTP 401.

    Attributes:
        host: URL of the token endpoint passed to the constructor.
        access_token: The most recently obtained access token, or ``None``.
        access_token_expiration: Unix timestamp at which the current token
            is expected to expire, or ``None`` before authentication.
        session: Authenticated ``requests.Session``; ``None`` when the
            initial authentication did not complete.
    """
    host = None
    key = None
    secret = None
    access_token = None
    access_token_expiration = None

    def __init__(self, host):
        """Authenticate against ``host`` and prepare the authenticated session.

        Args:
            host: URL of the token endpoint.

        Raises:
            Exception: If the token request completed but yielded no token.

        Connection-level ``requests`` errors are reported with ``print`` and
        leave ``self.session`` set to ``None``.
        """
        self.host = host
        # Define the attribute up front so it exists even when the
        # authentication below fails with a swallowed network error.
        self.session = None
        try:
            self._session = requests.Session()  # Session for tokens
            self.access_token = self.authenticate()
            if self.access_token is None:
                raise Exception("Request for access token failed.")

            authed = requests.Session()  # Authenticated session
            authed.headers["Authorization"] = "Bearer " + self.access_token
            authed.hooks['response'].append(self.reauth)
            self.session = authed
            # authenticate() normally records the expiry reported by the
            # server; keep the historical one-hour default as a fallback.
            if self.access_token_expiration is None:
                self.access_token_expiration = time.time() + 3600
        except requests.exceptions.HTTPError as errh:
            print("Http Error:", errh)
        except requests.exceptions.ConnectionError as errc:
            print("Error Connecting:", errc)
        except requests.exceptions.Timeout as errt:
            print("Timeout Error:", errt)
        except requests.exceptions.RequestException as err:
            print("OOps: Something Else", err)

    def reauth(self, res, *args, **kwargs):
        """Response hook: renew the token and retry once on HTTP 401.

        Args:
            res: The ``requests.Response`` that was just received.
            *args: Ignored; accepted for hook-signature compatibility.
            **kwargs: Ignored; accepted for hook-signature compatibility.

        Returns:
            The original response, or the response of the retried request
            when the original one was a 401.

        Raises:
            requests.exceptions.HTTPError: If the retried request is still
                unauthorized, or if no new token could be obtained.
        """
        if res.status_code != requests.codes['unauthorized']:
            return res

        req = res.request
        # The marker header means this request is already the retry;
        # raising here prevents an endless re-authentication loop.
        if req.headers.get('REATTEMPT'):
            res.raise_for_status()

        new_token = self.authenticate()
        if new_token is None:
            # Resending with the stale token cannot succeed; surface the 401.
            res.raise_for_status()

        self.access_token = new_token
        bearer = "Bearer " + new_token
        self.session.headers["Authorization"] = bearer
        # The prepared request carries its own copy of the headers, so the
        # new token has to be written there as well.
        req.headers["Authorization"] = bearer
        req.headers['REATTEMPT'] = '1'  # header values should be strings

        # Drain and release the 401 response so its connection can be reused.
        res.content
        res.close()
        return self.session.send(req)

    def getSensorData(self, sensor: str, date_start, date_end):
        """Request observations for one sensor within a time window.

        Args:
            sensor: Sensor identifier inserted into the ``sensors`` query
                parameter.
            date_start: Value for the ``start`` query parameter.
            date_end: Value for the ``end`` query parameter.

        Returns:
            The decoded JSON body of the response, or ``None`` when no
            authenticated session exists or the server answered with an
            HTTP error status.
        """
        if self.session is None:
            print("No authenticated session available.")
            return None

        url = (
            "https://api.dgnss-sensors.com/gschliefgraben"
            f"?sensors=('{sensor}')&start={date_start}&end={date_end}"
        )
        try:
            response = self.session.get(
                url,
                headers={
                    'cache-control': 'no-cache',
                    'Content-Type': 'application/x-www-form-urlencoded',
                    'accept': 'application/json',
                },
                data="grant_type=client_credentials&scope=gschliefgraben",
            )
            # raise an exception for 4xx/5xx status codes
            response.raise_for_status()
        except requests.exceptions.HTTPError as err:
            print(err)
            return None
        return response.json()

    def authenticate(self):
        """Request an access token from the token endpoint.

        The credential is read from the ``REFRESH_TOKEN`` environment
        variable and sent as a ``Basic`` authorization header. On success
        ``self.access_token_expiration`` is updated from the response's
        ``expires_in`` field (one hour when the field is absent).

        Returns:
            The access token string, or ``None`` when the environment
            variable is unset or the server answered with an HTTP error
            status.
        """
        refresh_token = os.environ.get("REFRESH_TOKEN")
        if refresh_token is None:
            # Without this guard the header concatenation below would fail
            # with an opaque TypeError.
            print("REFRESH_TOKEN environment variable is not set.")
            return None

        try:
            # request an access token
            response = requests.post(
                self.host,
                headers={
                    'Authorization': 'Basic ' + refresh_token,
                    'cache-control': 'no-cache',
                    'Content-Type': 'application/x-www-form-urlencoded',
                    'accept': 'application/json',
                },
                data='grant_type=client_credentials&scope=gschliefgraben',
            )
            # raise an exception for 4xx/5xx status codes
            response.raise_for_status()
        except requests.exceptions.HTTPError as err:
            print(err)
            return None

        # the response is read as {"access_token": "...", "expires_in": ...}
        data = response.json()
        self.access_token_expiration = (
            time.time() + int(data.get('expires_in', 3600))
        )
        return data['access_token']