Skip to content

add LogtoBaseClient with only the server-side configuration #37

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 77 additions & 28 deletions logto/LogtoClient.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@
The Logto client class and the related models.
"""

from dataclasses import dataclass, field
import time
import urllib.parse
from typing import Dict, List, Literal, Optional, Union
from typing import Dict, List, Literal, Optional, Self, Union

from pydantic import BaseModel

Expand Down Expand Up @@ -140,35 +141,83 @@ class AccessTokenMap(BaseModel):
"""


async def get_oidc_core(config: LogtoConfig) -> OidcCore:
"""
Get the OIDC core object. You can use it to get the provider metadata, verify
the ID token, fetch tokens by code or refresh token, etc.
"""
return OidcCore(
await OidcCore.getProviderMetadata(
f"{config.endpoint}/oidc/.well-known/openid-configuration"
)
)

@dataclass
class LogtoBaseClient:
"""
The class for the Logto client with only the server-side configuration, no user-side storage.

```python
config = LogtoConfig(
endpoint="https://logto.app",
appId="your-app-id",
appSecret="your-app-secret",
)
client = await LogtoBaseClient.new(config)

class SessionStorage(Storage):
# Implement the Storage interface to store the user-side data
# Which can read and write the data from the request session (cookies or somewhere else)
...

async def login(request: Request):
session_storage = SessionStorage(request.session)
user_client = await client.get_user_client(session_storage)
user_client.signIn()
...

async def protected_route(request: Request):
session_storage = SessionStorage(request.session)
user_client = await client.get_user_client(session_storage)
user_client.isAuthenticated()
...
```
"""
config: LogtoConfig
oidc_core: OidcCore

@classmethod
async def new(cls, config: LogtoConfig) -> Self:
oidc_core = await get_oidc_core(config)
return cls(config, oidc_core)

async def get_user_client(self, user_storage: Storage = MemoryStorage()) -> "LogtoClient":
return LogtoClient(self.config, user_storage, self.oidc_core)

@dataclass
class LogtoClient:
"""
The main class of the Logto client. You should create an instance of this class
and use it to sign in, sign out, get access token, etc.
"""

def __init__(self, config: LogtoConfig, storage: Storage = MemoryStorage()) -> None:
self.config = config
self._oidcCore: Optional[OidcCore] = None
self._storage = storage
config: LogtoConfig
storage: Storage = field(default_factory=MemoryStorage)
oidc_core: Optional[OidcCore] = field(default=None)

async def getOidcCore(self) -> OidcCore:
"""
Get the OIDC core object. You can use it to get the provider metadata, verify
the ID token, fetch tokens by code or refresh token, etc.
"""
if self._oidcCore is None:
self._oidcCore = OidcCore(
await OidcCore.getProviderMetadata(
f"{self.config.endpoint}/oidc/.well-known/openid-configuration"
)
)
return self._oidcCore
if self.oidc_core is None:
self.oidc_core = await get_oidc_core(self.config)
return self.oidc_core

def _getAccessTokenMap(self) -> AccessTokenMap:
"""
Get the access token map from storage.
"""
accessTokenMap = self._storage.get("accessTokenMap")
accessTokenMap = self.storage.get("accessTokenMap")
try:
# Returns parsed `AccessTokenMap` if valid JSON, otherwise will be caught by except clause
return AccessTokenMap.model_validate_json(accessTokenMap) # type: ignore
Expand All @@ -186,7 +235,7 @@ def _setAccessToken(self, resource: str, accessToken: str, expiresIn: int) -> No
+ expiresIn
- 60, # 60 seconds earlier to avoid clock skew
)
self._storage.set("accessTokenMap", accessTokenMap.model_dump_json())
self.storage.set("accessTokenMap", accessTokenMap.model_dump_json())

def _getAccessToken(self, resource: str) -> Optional[str]:
"""
Expand All @@ -212,10 +261,10 @@ async def _handleTokenResponse(
(await self.getOidcCore()).verifyIdToken(
tokenResponse.id_token, self.config.appId
)
self._storage.set("idToken", tokenResponse.id_token)
self.storage.set("idToken", tokenResponse.id_token)

if tokenResponse.refresh_token is not None:
self._storage.set("refreshToken", tokenResponse.refresh_token)
self.storage.set("refreshToken", tokenResponse.refresh_token)

self._setAccessToken(
resource, tokenResponse.access_token, tokenResponse.expires_in
Expand Down Expand Up @@ -284,7 +333,7 @@ def _getSignInSession(self) -> Optional[SignInSession]:
Try to parse the current sign-in session from storage. If the value does not
exist or parse failed, return None.
"""
signInSession = self._storage.get("signInSession")
signInSession = self.storage.get("signInSession")
if signInSession is None:
return None
try:
Expand All @@ -293,12 +342,12 @@ def _getSignInSession(self) -> Optional[SignInSession]:
return None

def _setSignInSession(self, signInSession: SignInSession) -> None:
self._storage.set("signInSession", signInSession.model_dump_json())
self.storage.set("signInSession", signInSession.model_dump_json())

def _clearAllTokens(self) -> None:
self._storage.delete("idToken")
self._storage.delete("refreshToken")
self._storage.delete("accessTokenMap")
self.storage.delete("idToken")
self.storage.delete("refreshToken")
self.storage.delete("accessTokenMap")

async def signIn(
self,
Expand Down Expand Up @@ -433,7 +482,7 @@ async def handleSignInCallback(self, callbackUri: str) -> None:
)

await self._handleTokenResponse("", tokenResponse)
self._storage.delete("signInSession")
self.storage.delete("signInSession")

async def getAccessToken(self, resource: str = "") -> Optional[str]:
"""
Expand All @@ -453,7 +502,7 @@ async def getAccessToken(self, resource: str = "") -> Optional[str]:
"The `UserInfoScope.organizations` scope is required to fetch organization tokens"
)

refreshToken = self._storage.get("refreshToken")
refreshToken = self.storage.get("refreshToken")
if refreshToken is None:
return None

Expand Down Expand Up @@ -501,14 +550,14 @@ def getIdToken(self) -> Optional[str]:
Get the ID Token string. If you need to get the claims in the ID Token, use
`getIdTokenClaims` instead.
"""
return self._storage.get("idToken")
return self.storage.get("idToken")

def getIdTokenClaims(self) -> IdTokenClaims:
"""
Get the claims in the ID Token. If the ID Token does not exist, an exception
will be thrown.
"""
idToken = self._storage.get("idToken")
idToken = self.storage.get("idToken")
if idToken is None:
raise LogtoException("ID Token not found")

Expand All @@ -518,13 +567,13 @@ def getRefreshToken(self) -> Optional[str]:
"""
Get the refresh token string.
"""
return self._storage.get("refreshToken")
return self.storage.get("refreshToken")

def isAuthenticated(self) -> bool:
"""
Check if the user is authenticated by checking if the ID Token exists.
"""
return self._storage.get("idToken") is not None
return self.storage.get("idToken") is not None

async def fetchUserInfo(self) -> UserInfoResponse:
"""
Expand Down