2# +==== BEGIN CatFeeder =================+
5# ...............)..(.')
7# ...............\(__)|
8# Inspired by Joan Stark
9# source https://www.asciiart.eu/
14# CREATION DATE: 10-01-2026
15# LAST Modified: 22:18:0 11-01-2026
17# This is the backend server in charge of making the actual website work.
19# COPYRIGHT: (c) Cat Feeder
20# PURPOSE: File in charge of providing endpoints to allow the front-end to gather info on the tokens.
22# +==== END CatFeeder =================+
26from typing
import Optional, Dict, Union, Any, List, TYPE_CHECKING
27from dataclasses
import dataclass
28from datetime
import datetime, timedelta
29from display_tty
import Disp, initialise_logger
30from fastapi
import Request, Response
31from ...http_codes
import HCI, HttpDataTypes
32from ...core
import RuntimeManager, RI
33from ...utils
import CONST
34from ..endpoint_helpers
import datetime_to_string
37 from ...sql
import SQL
38 from ...server_header
import ServerHeaders
39 from ...boilerplates
import BoilerplateIncoming, BoilerplateResponses, BoilerplateNonHTTP
42DEFAULT_DATE_PLACEHOLDER: datetime = datetime(1984, 1, 1)
47 """Container for token metadata retrieved from database.
50 token: The token string value.
51 user_id: The ID of the user who owns the token.
52 creation_date: When the token was created (server-set).
53 edit_date: When the token was last modified in database.
54 expiration_date: When the token expires and becomes invalid.
58 creation_date: datetime = DEFAULT_DATE_PLACEHOLDER
59 edit_date: datetime = DEFAULT_DATE_PLACEHOLDER
60 expiration_date: datetime = DEFAULT_DATE_PLACEHOLDER
64 """Manages token-related HTTP endpoints for authentication and validation.
66 Provides endpoints for token validation, token information retrieval,
67 token refresh, admin status checking, and token revocation.
69 disp: Disp = initialise_logger(__qualname__,
False)
71 def __init__(self, success: int = 0, error: int = 84, debug: bool =
False) ->
None:
72 """Initialize the TokenEndpoints manager.
75 success (int, optional): Success status code. Defaults to 0.
76 error (int, optional): Error status code. Defaults to 84.
77 debug (bool, optional): Enable debug logging. Defaults to False.
80 self.
disp.update_disp_debug(debug)
81 self.
disp.log_debug(
"Initialising...")
93 "ServerHeaders",
None)
95 "BoilerplateIncoming",
None)
97 "BoilerplateResponses",
None)
99 "BoilerplateNonHTTP",
None)
100 self.
disp.log_debug(
"Initialised")
103 """Break down timedelta into hours, minutes, and seconds components.
106 ttl_delta: The timedelta object to break down.
109 Dict[str, int]: Dictionary with 'hours', 'minutes', 'seconds' keys.
111 ttl_seconds = int(ttl_delta.total_seconds())
112 hours = ttl_seconds // 3600
113 remaining = ttl_seconds % 3600
114 minutes = remaining // 60
115 seconds = remaining % 60
117 "days": ttl_delta.days,
124 """Extract token from request headers.
126 Checks multiple header formats for token presence: direct token key,
127 Bearer token format, and request body parameters.
130 request (Request): The HTTP request object containing headers.
133 Union[str, None]: The extracted token string if found, None otherwise.
135 mtoken: Union[str,
None] = request.get(CONST.REQUEST_TOKEN_KEY)
136 mbearer: Union[str,
None] = request.get(CONST.REQUEST_BEARER_KEY)
137 token: Union[str,
None] = request.headers.get(CONST.REQUEST_TOKEN_KEY)
138 bearer: Union[str,
None] = request.headers.get(
139 CONST.REQUEST_BEARER_KEY
141 msg = f
"mtoken = {mtoken}, mbearer = {mbearer}"
142 msg += f
", token = {token}, bearer = {bearer}"
143 self.
disp.log_debug(msg,
"get_token_if_present")
144 if token
is None and bearer
is None and token
is None and bearer
is None:
146 if mbearer
is not None and mbearer.startswith(
'Bearer '):
147 return mbearer.split(
" ")[1]
148 if bearer
is not None and bearer.startswith(
'Bearer '):
149 return bearer.split(
" ")[1]
150 if token
is not None:
155 """Validate token correctness by checking expiration in database.
158 token (str): The token string to validate.
161 bool: True if token is valid and not expired, False otherwise.
163 title =
"is_token_correct"
164 self.
disp.log_debug(
"Checking if the token is correct.", title)
165 if isinstance(token, str)
is False:
172 CONST.TAB_CONNECTIONS,
174 where=f
"token={token}",
177 if isinstance(login_table, int):
179 if len(login_table) != 1:
181 self.
disp.log_debug(f
"login_table = {login_table}", title)
182 if datetime.now() > login_table[0][0]:
183 self.
disp.log_warning(
184 "The provided token is invalid due to excessive idle time."
187 self.
disp.log_debug(
"The token is still valid.")
191 """Validate token from request using BoilerplateNonHTTP validation.
194 request (Request): The HTTP request object containing the token.
197 bool: True if token is valid, False otherwise.
200 RuntimeError: If BoilerplateNonHTTP service is unavailable.
202 title =
"token_correct"
204 f
"request = {request}", title
208 f
"token = {token}", title
213 "BoilerplateNonHTTP",
217 self.
disp.log_error(
"BoilerplateNonHttp is missing")
218 raise RuntimeError(
"Token validation service unavailable")
222 """Retrieve user ID associated with a given token.
225 title (str): The name of the calling endpoint for logging purposes.
226 token (str): The token to look up.
229 Optional[Union[str, Response]]: User ID as string on success, Response object on error, None if services unavailable.
231 function_title =
"get_user_id_from_token"
232 usr_id_node: str =
"user_id"
237 "BoilerplateResponses not found, retuning None",
238 f
"{title}:{function_title}"
245 "SQL not found, returning None",
246 f
"{title}:{function_title}"
250 f
"Getting user id based on {token}", function_title
253 table=CONST.TAB_CONNECTIONS,
255 where=f
"token='{token}'",
258 if isinstance(current_user_raw, int):
260 current_user: List[Dict[str, Any]] = current_user_raw
261 self.
disp.log_debug(f
"current_user = {current_user}", function_title)
262 if current_user == self.
error:
265 f
"user_length = {len(current_user)}", function_title
267 if len(current_user) == 0
or len(current_user) > 1:
270 f
"current_user[0] = {current_user[0]}", function_title
272 if usr_id_node
not in current_user[0]:
274 msg =
"str(current_user[0]["
275 msg += f
"{usr_id_node}]) = {str(current_user[0][usr_id_node])}"
276 self.
disp.log_debug(msg, function_title)
277 return str(current_user[0][usr_id_node])
280 """Check if a given token correspond to a user that is an administrator or not.
283 token (str): The token to analyse.
286 bool: The administrative status.
288 title: str =
"is_token_admin"
293 "The SQL class was not initialised in the runtime manager.")
298 if not isinstance(usr_id, str):
301 table=CONST.TAB_ACCOUNTS,
303 where=f
"id='{usr_id}'",
306 self.
disp.log_debug(f
"Queried data = {current_user_raw}")
307 if isinstance(current_user_raw, int)
or current_user_raw == []:
309 if "admin" in current_user_raw[0]:
310 if str(current_user_raw[0].get(
"admin",
"0")) ==
"1":
311 self.
disp.log_warning(
312 f
"User account {usr_id} with name {current_user_raw[0].get('username', '<unknown_username>')} is an admin"
314 self.
disp.log_warning(
315 "They probably called an admin endpoint."
321 """Retrieve token metadata from database.
324 token (str): The token string to look up.
327 Optional[TokenInfo]: Token information dataclass if found, None otherwise.
334 "The SQL class was not initialised in the runtime manager."
338 table=CONST.TAB_CONNECTIONS,
340 where=f
"token='{token}'",
343 self.
disp.log_debug(f
"Queried data = {token_data}")
344 if isinstance(token_data, int)
or token_data == []:
346 token_data_dict: Dict[str, Any] = token_data[0]
347 resp.token = token_data_dict.get(
"token",
"")
348 resp.user_id = int(token_data_dict.get(
"user_id", 0))
349 resp.creation_date = token_data_dict.get(
350 "creation_date", DEFAULT_DATE_PLACEHOLDER)
351 resp.edit_date = token_data_dict.get(
352 "edit_date", DEFAULT_DATE_PLACEHOLDER)
353 resp.expiration_date = token_data_dict.get(
354 "expiration_date", DEFAULT_DATE_PLACEHOLDER)
358 """Retrieve user account information from database.
361 user_id (int): The user ID to look up.
364 Optional[Dict[str, Any]]: User account data dictionary if found, None otherwise.
371 "The SQL class was not initialised in the runtime manager.")
374 table=CONST.TAB_ACCOUNTS,
376 where=f
"id='{user_id}'",
379 self.
disp.log_debug(f
"Queried data = {current_user_raw}")
380 if isinstance(current_user_raw, int)
or current_user_raw == []:
382 return current_user_raw[0]
385 """Validate if the provided token is currently valid.
387 Returns ok/ko message indicating token validity.
390 request (Request): The HTTP request containing the token.
393 Response: HTTP response with validity status (ok/ko).
401 """Check if the provided token corresponds to an administrator account.
403 Returns ok if admin, ko if not admin, and error responses for invalid/missing tokens.
406 request (Request): The HTTP request containing the token.
409 Response: HTTP response with admin status (ok/ko) or error code.
413 return HCI.invalid_token(
"No token provided", content_type=HttpDataTypes.TEXT, headers=self.
server_headers_initialised.for_text())
416 return HCI.internal_server_error(
"Failed to check admin status", content_type=HttpDataTypes.TEXT, headers=self.
server_headers_initialised.for_text())
422 """Retrieve the remaining time-to-live (TTL) for the provided token.
424 Calculates TTL as the difference between expiration date and current time in seconds.
427 request (Request): The HTTP request containing the token.
430 Response: JSON response containing TTL in seconds, or appropriate error response.
434 return HCI.unauthorized(
"No token provided", content_type=HttpDataTypes.TEXT, headers=self.
server_headers_initialised.for_text())
436 return HCI.service_unavailable(
"The server is missing a critical component for this endpoint to complete.", content_type=HttpDataTypes.TEXT, headers=self.
server_headers_initialised.for_text())
439 return HCI.invalid_token(
"The provided token does not exist.", content_type=HttpDataTypes.TEXT, headers=self.
server_headers_initialised.for_text())
440 ttl_delta = (token_info.expiration_date - datetime.now())
441 body_content: Dict[str, Any] = {
442 "ttl_seconds": ttl_delta.total_seconds(),
448 """Endpoint to get information about the provided token.
451 request (Request): The incoming request.
454 Response: The response to send back to the client.
456 self.
disp.log_debug(
"Getting token info...")
458 return HCI.service_unavailable(
"The server is missing a critical component for this endpoint to complete.", content_type=HttpDataTypes.TEXT, headers=self.
server_headers_initialised.for_text())
461 return HCI.unauthorized(
"No token provided", content_type=HttpDataTypes.TEXT, headers=self.
server_headers_initialised.for_text())
464 return HCI.invalid_token(
"The provided token is invalid", content_type=HttpDataTypes.TEXT, headers=self.
server_headers_initialised.for_text())
467 return HCI.invalid_token(
"The provided token does not exist", content_type=HttpDataTypes.TEXT, headers=self.
server_headers_initialised.for_text())
470 return HCI.internal_server_error(
"Failed to retrieve user information", content_type=HttpDataTypes.TEXT, headers=self.
server_headers_initialised.for_text())
471 ttl_delta = (token_info.expiration_date - datetime.now())
472 body_content: Dict[str, Any] = {
473 "user_id": token_info.user_id,
474 "token_creation_date": token_info.creation_date.isoformat(),
475 "token_edit_date": token_info.edit_date.isoformat(),
476 "token_expiration_date": token_info.expiration_date.isoformat(),
477 "ttl_seconds": ttl_delta.total_seconds(),
479 "username": user_info.get(
"username",
"<unknown_username>"),
480 "email": CONST.hide_user_email(user_info.get(
"email",
""), self.
disp),
481 "admin": str(user_info.get(
"admin",
"0")) ==
"1",
482 "password": user_info.get(
"password",
"") !=
"",
483 "gender": user_info.get(
"gender",
"<unknown_gender>"),
484 "age": user_info.get(
"age",
"<unknown_age>"),
485 "last_connection": datetime_to_string(user_info.get(
"last_connection",
"<unknown_last_connection>"),
"<unknown_last_connection>"),
486 "creation_date": datetime_to_string(user_info.get(
"creation_date",
"<unknown_creation_date>"),
"<unknown_creation_date>"),
487 "edit_date": datetime_to_string(user_info.get(
"edit_date",
"<unknown_edit_date>"),
"<unknown_edit_date>"),
488 "deletion_date": datetime_to_string(user_info.get(
"deletion_date",
"<unknown_deletion_date>"),
"<unknown_deletion_date>")
490 self.
disp.log_debug(f
"Token info response body: {body_content}")
494 """Generate and store a new token for the authenticated user.
496 Invalidates the old token and creates a fresh one with updated expiration date.
497 This operation is non-idempotent: each call produces a different token.
500 request (Request): The HTTP request containing the current token.
503 Response: JSON response containing the new token and expiration date, or error response.
508 return HCI.service_unavailable(
"The server is missing a critical component for this endpoint to complete.", content_type=HttpDataTypes.TEXT, headers=self.
server_headers_initialised.for_text())
512 return HCI.service_unavailable(
"The server is missing a critical component for this endpoint to complete.", content_type=HttpDataTypes.TEXT, headers=self.
server_headers_initialised.for_text())
516 return HCI.service_unavailable(
"The server is missing a critical component for this endpoint to complete.", content_type=HttpDataTypes.TEXT, headers=self.
server_headers_initialised.for_text())
519 return HCI.unauthorized(
"No token provided", content_type=HttpDataTypes.TEXT, headers=self.
server_headers_initialised.for_text())
522 return HCI.invalid_token(
"The provided token is invalid", content_type=HttpDataTypes.TEXT, headers=self.
server_headers_initialised.for_text())
525 return HCI.invalid_token(
"The provided token does not exist", content_type=HttpDataTypes.TEXT, headers=self.
server_headers_initialised.for_text())
528 CONST.UA_TOKEN_LIFESPAN
530 creation_date = datetime.now()
543 table=CONST.TAB_CONNECTIONS,
546 where=f
"token='{token}'"
549 return HCI.internal_server_error(
"Failed to refresh token", content_type=HttpDataTypes.TEXT, headers=self.
server_headers_initialised.for_text())
551 title=
"refresh_token",
552 message=
"Token refreshed successfully",
557 body[
"token"] = new_token
558 body[
"expiration_date"] = new_lifespan.isoformat()
562 """Revoke all active tokens for the authenticated user's account.
564 Removes all tokens associated with the user, effectively logging them out
565 from all devices/sessions.
568 request (Request): The HTTP request containing the user's current token.
571 Response: Success message if all tokens revoked, or appropriate error response.
577 return HCI.service_unavailable(
"The server is missing a critical component for this endpoint to complete.", content_type=HttpDataTypes.TEXT, headers=self.
server_headers_initialised.for_text())
579 return HCI.unauthorized(
"No token provided", content_type=HttpDataTypes.TEXT, headers=self.
server_headers_initialised.for_text())
582 return HCI.invalid_token(
"The provided token is invalid", content_type=HttpDataTypes.TEXT, headers=self.
server_headers_initialised.for_text())
585 return HCI.invalid_token(
"The provided token does not exist", content_type=HttpDataTypes.TEXT, headers=self.
server_headers_initialised.for_text())
587 table=CONST.TAB_CONNECTIONS,
588 where=f
"user_id='{token_info.user_id}'"
591 return HCI.internal_server_error(
"Failed to revoke tokens", content_type=HttpDataTypes.TEXT, headers=self.
server_headers_initialised.for_text())
Response get_time_to_live(self, Request request)
Optional[TokenInfo] _get_token_info(self, str token)
bool _is_token_correct(self, str token)
Optional[] boilerplate_incoming_initialised
Union[str, None] _get_token_if_present(self, Request request)
Response get_token_info(self, Request request)
Optional[Union[str, Response]] _get_user_id_from_token(self, str title, str token)
Optional[bool] _is_token_admin(self, str token)
Response delete_revoke_account_token(self, Request request)
RuntimeManager server_headers_initialised
Response get_admin(self, Request request)
RuntimeManager runtime_manager
Response post_refresh_token(self, Request request)
Optional[] sql_connection
RuntimeManager boilerplate_responses_initialised
Optional[] boilerplate_non_http_initialised
Optional[Union[str, Response]] sql_connection
Optional[] boilerplate_responses_initialised
Response get_token_valid(self, Request request)
Dict[str, int] _get_ttl_breakdown(self, timedelta ttl_delta)
RuntimeManager boilerplate_non_http_initialised
Optional[Dict[str, Any]] _get_user_info(self, int user_id)
bool _token_correct(self, Request request)
None __init__(self, int success=0, int error=84, bool debug=False)