moved from josepy to pyjwt

This commit is contained in:
Oscar Krause
2025-05-15 09:24:35 +02:00
parent 26a5bdb320
commit f58e54cf28
8 changed files with 28 additions and 30 deletions

View File

@@ -15,7 +15,7 @@ from dotenv import load_dotenv
from fastapi import FastAPI
from fastapi.requests import Request
from fastapi.responses import Response, RedirectResponse, StreamingResponse
from josepy import jws, jwk, RS256
import jwt
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from starlette.middleware.cors import CORSMiddleware
@@ -62,8 +62,8 @@ my_si_certificate = Cert.from_file(ca_setup.si_certificate_filename)
my_si_private_key = PrivateKey.from_file(ca_setup.si_private_key_filename)
my_si_public_key = my_si_private_key.public_key()
jwt_encode_key = jwk.JWK.load(my_si_private_key.pem())
jwt_decode_key = jwk.JWK.load(my_si_private_key.public_key().pem())
jwt_encode_key = my_si_private_key.pem() # todo: replace directly in code
jwt_decode_key = my_si_private_key.public_key().pem() # todo: replace directly in code
# Logging
LOG_LEVEL = logging.DEBUG if DEBUG else logging.INFO
@@ -115,9 +115,7 @@ def __get_token(request: Request) -> dict:
token = authorization_header.split(' ')[1]
# return jwt.decode(token=token, key=jwt_decode_key, algorithms=ALGORITHMS.RS256, options={'verify_aud': False})
_ = jws.Signature()
_.verify(payload=token.encode('utf-8'), key=jwt_decode_key)
return _.to_partial_json()
return jwt.decode(jwt=token, key=jwt_decode_key, algorithms=['RS256'], options={'verify_aud': False})
# Endpoints
@@ -299,8 +297,7 @@ async def _client_token():
}
# content = jws.sign(payload, key=jwt_encode_key, headers=None, algorithm=ALGORITHMS.RS256)
payload = json_dumps(payload).encode('utf-8')
content = jws.Signature.sign(payload=payload, key=jwt_encode_key, alg=RS256, include_jwk=False)
content = jwt.encode(payload=payload, key=jwt_encode_key, headers=None, algorithm='RS256')
# response = StreamingResponse(iter([content]), media_type="text/plain")
response = StreamingResponse(iter(content), media_type="text/plain")
@@ -393,8 +390,7 @@ async def auth_v1_code(request: Request):
}
# auth_code = jws.sign(payload, key=jwt_encode_key, headers={'kid': payload.get('kid')}, algorithm=ALGORITHMS.RS256)
payload = json_dumps(payload).encode('utf-8')
auth_code = jws.Signature.sign(payload=payload, key=jwt_encode_key, alg=RS256, include_jwk=True)
auth_code = jwt.encode(payload=payload, key=jwt_encode_key, headers={'kid': payload.get('kid')}, algorithm='RS256')
response = {
"auth_code": auth_code,
@@ -412,7 +408,8 @@ async def auth_v1_token(request: Request):
j, cur_time = json_loads((await request.body()).decode('utf-8')), datetime.now(UTC)
try:
payload = jwt.decode(token=j.get('auth_code'), key=jwt_decode_key, algorithms=ALGORITHMS.RS256)
#payload = jwt.decode(token=j.get('auth_code'), key=jwt_decode_key, algorithms=ALGORITHMS.RS256)
payload = jwt.decode(jwt=j.get('auth_code'), key=jwt_decode_key, algorithms=['RS256'])
except JWTError as e:
response = {'status': 400, 'title': 'invalid token', 'detail': str(e)}
return Response(content=json_dumps(response), media_type='application/json', status_code=400)
@@ -478,8 +475,9 @@ async def leasing_v1_config_token(request: Request):
},
}
my_jwt_encode_key = jwk.construct(my_si_private_key.pem().decode('utf-8'), algorithm=ALGORITHMS.RS256)
config_token = jws.sign(payload, key=my_jwt_encode_key, headers=None, algorithm=ALGORITHMS.RS256)
# my_jwt_encode_key = jwk.construct(my_si_private_key.pem().decode('utf-8'), algorithm=ALGORITHMS.RS256)
# config_token = jws.sign(payload, key=my_jwt_encode_key, headers=None, algorithm=ALGORITHMS.RS256)
config_token = jwt.encode(payload=payload, key=my_si_private_key.pem(), headers=None, algorithm='RS256')
response_ca_chain = my_ca_certificate.pem().decode('utf-8').strip()