- add api calss for geting access_token

This commit is contained in:
Arno Kaimbacher 2022-02-25 16:05:31 +01:00
parent 7dd739b04e
commit a7b80c4c8d
2 changed files with 115 additions and 12 deletions

View File

@ -13,6 +13,8 @@ import os
from sqlalchemy.orm import session
from models import ObservationSchema, Person, PersonSchema, Observation
from db.pg_models import create_pg_session
from myApi import myAPI
import requests
#from models import Person, PersonSchema
# response = requests.get('https://api.com/')
# print(response) # shows the response's HTTP status code
@ -26,23 +28,37 @@ def main():
print(db_user)
pg_session: session = create_pg_session()
# pg_person: Person = pg_session.query(Person).first()
observation: Observation = pg_session.query(Observation).first()
print (observation)
pg_person: Person = pg_session.query(Person).first()
#observation: Observation = pg_session.query(Observation).first()
print(pg_person)
# serialize db data to json
# person_schema = PersonSchema()
# dump_data = person_schema.dump(pg_person)
# print(dump_data)
# serialize db data to json
observation_schema = ObservationSchema()
dump_data = observation_schema.dump(observation)
person_schema = PersonSchema()
dump_data = person_schema.dump(pg_person)
print(dump_data)
# serialize db data to json
# observation_schema = ObservationSchema()
# dump_data = observation_schema.dump(observation)
# print(dump_data)
# # deserialize
# load_data: Person = person_schema.load(dump_data)
# print(load_data)
# request ortmann api
# token = 'eyJraWQiOiJlakFmX1MwMTBMU3doS0Zod05wZDQtQkZPYTM4cDRYRE1zU1hFa0lrRlhFIiwiYWxnIjoiUlMyNTYifQ.eyJ2ZXIiOjEsImp0aSI6IkFULkZRUHNCOWh5Snd6eEM5d3ZWelRvaTNpZVlMWlJiT3U4YzFCbWJWRGM1SFkiLCJpc3MiOiJodHRwczovL2Rldi01MjUwMDA2Lm9rdGEuY29tL29hdXRoMi9kZWZhdWx0IiwiYXVkIjoiYXBpOi8vZGVmYXVsdCIsImlhdCI6MTY0NTc4Mjg0NSwiZXhwIjoxNjQ1Nzg2NDQ1LCJjaWQiOiIwb2EyOWhzdGZ3RnFya1BrUDVkNyIsInNjcCI6WyJnc2NobGllZmdyYWJlbiJdLCJzdWIiOiIwb2EyOWhzdGZ3RnFya1BrUDVkNyJ9.c-pTs-3VJMnFO2SOqxOvsABAloprUmOjk6SO9J71NrgLj7claKZOMLZxRyUeSBLWCJFFNI3A6xMd4twEexjJdUR8UEM4U50srxr2p_enaMm1_jZTSt_76u6H05kwV-A2AOQPkx-Fxxaj_PDjT7w43Zlg6SUEoT11uGKR6KtxVYbclGtWgOR7wvH4NZav-P_EDjHwHxbk2kQSf7tBU1JbWl74Xt58gzv1t8VNtLYLICabRsuTNQUNiO7Y1rtUEav4ugf7WZMIY1cP_4rCupZrAFbxrnyprAuXA2x01Z9hbFmiaK0QDlrwHcCHL_1fKvj9uIbO5JeI1x81X6g7eAxQdA'
# response = requests.get('https://api.dgnss-sensors.com/gschliefgraben?sensors=("inclino1_14")',
# headers={
# 'Authorization': 'Bearer' + token,
# 'cache-control': 'no-cache',
# 'Content-Type': 'application/x-www-form-urlencoded',
# 'accept': 'application/json'
# },
# data='grant_type=client_credentials&scope=gschliefgraben')
# print(response)
token_api = os.environ.get("TOKEN_API")
test = myAPI(token_api)
# create(dump_data)
@ -61,15 +77,15 @@ def create(person_json: PersonSchema):
# existing_person = Person.query \
# .filter(Person.login == login) \
# .one_or_none()
existing_person: bool = ( \
db_session.query(Person) \
existing_person: bool = (
db_session.query(Person)
.filter(Person.login == login)
.one_or_none()
)
# Can we insert this person?
if existing_person is None:
# Create a person instance using the schema and the passed in person
# Create a person instance using the schema and the passed in person
schema = PersonSchema()
# deserialize to object
new_person: Person = schema.load(person_json)

View File

@ -0,0 +1,87 @@
'''
Python version: 3.7
https://www.mybluelinux.com/python-how-to-refresh-an-jwt-access-token-using-decorators/
https://stackoverflow.com/questions/37094419/python-requests-retry-request-after-re-authentication
'''
import requests
import time, os
class myAPI():
host = None
key = None
secret = None
access_token = None
access_token_expiration = None
def __init__(self, host):
# the function that is executed when
# an instance of the class is created
self.host = host
# self.key = key
# self.secret = secret
try:
self._session = requests.Session() # Session for tokens
self.access_token = self.getAccessToken()
if self.access_token is None:
raise Exception("Request for access token failed.")
self.session = requests.Session() # Authenticated session
self.session.auth = self.auth
self.session.hooks['response'].append(self.reauth)
except Exception as e:
print(e)
else:
self.access_token_expiration = time.time() + 3600
def reauth(self, res, *args, **kwargs):
"""Hook to re-authenticate whenever authentication expires."""
if res.status_code == requests.codes.unauthorized:
if res.request.headers.get('REATTEMPT'):
res.raise_for_status()
self.refresh_auth()
req = res.request
req.headers['REATTEMPT'] = 1
req = self.session.auth(req)
res = self.session.send(req)
return res
def refresh_auth(self):
"""Use the refresh token to get a new access token."""
res = self._session.post(self.abs_url("api/token/refresh/"), data={"refresh": self.refresh})
if res.status_code == 200:
self.access_token = res.json()['access_token']
else:
# Token expired -> re-authenticate
self.reauth()
def authenticate(self):
# the function that is
# used to request the JWT
try:
# build the JWT and store
# in the variable `token_body`
refresh_token = os.environ.get("REFRESH_TOKEN")
# request an access token
request = requests.post(self.host,
headers={
'Authorization': 'Basic ' + refresh_token,
'cache-control': 'no-cache',
'Content-Type': 'application/x-www-form-urlencoded',
'accept': 'application/json'
},
data='grant_type=client_credentials&scope=gschliefgraben')
# optional: raise exception for status code
request.raise_for_status()
except Exception as e:
print(e)
return None
else:
# assuming the response's structure is
# {"access_token": ""}
data = request.json()
return data['access_token']