Cat Feeder  1.0.0
The Cat feeder project
Loading...
Searching...
No Matches
token.py
Go to the documentation of this file.
1r"""
2# +==== BEGIN CatFeeder =================+
3# LOGO:
4# ..............(..../\
5# ...............)..(.')
6# ..............(../..)
7# ...............\‍(__)|
8# Inspired by Joan Stark
9# source https://www.asciiart.eu/
10# animals/cats
11# /STOP
12# PROJECT: CatFeeder
13# FILE: token.py
14# CREATION DATE: 10-01-2026
15# LAST Modified: 22:18:0 11-01-2026
16# DESCRIPTION:
17# This is the backend server in charge of making the actual website work.
18# /STOP
19# COPYRIGHT: (c) Cat Feeder
20# PURPOSE: File in charge of providing endpoints to allow the front-end to gather info on the tokens.
21# // AR
22# +==== END CatFeeder =================+
23"""
24
25
26from typing import Optional, Dict, Union, Any, List, TYPE_CHECKING
27from dataclasses import dataclass
28from datetime import datetime, timedelta
29from display_tty import Disp, initialise_logger
30from fastapi import Request, Response
31from ...http_codes import HCI, HttpDataTypes
32from ...core import RuntimeManager, RI
33from ...utils import CONST
34from ..endpoint_helpers import datetime_to_string
35
36if TYPE_CHECKING:
37 from ...sql import SQL
38 from ...server_header import ServerHeaders
39 from ...boilerplates import BoilerplateIncoming, BoilerplateResponses, BoilerplateNonHTTP
40
41
42DEFAULT_DATE_PLACEHOLDER: datetime = datetime(1984, 1, 1)
43
44
45@dataclass
47 """Container for token metadata retrieved from database.
48
49 Attributes:
50 token: The token string value.
51 user_id: The ID of the user who owns the token.
52 creation_date: When the token was created (server-set).
53 edit_date: When the token was last modified in database.
54 expiration_date: When the token expires and becomes invalid.
55 """
56 token: str = ""
57 user_id: int = 0
58 creation_date: datetime = DEFAULT_DATE_PLACEHOLDER
59 edit_date: datetime = DEFAULT_DATE_PLACEHOLDER
60 expiration_date: datetime = DEFAULT_DATE_PLACEHOLDER
61
62
64 """Manages token-related HTTP endpoints for authentication and validation.
65
66 Provides endpoints for token validation, token information retrieval,
67 token refresh, admin status checking, and token revocation.
68 """
69 disp: Disp = initialise_logger(__qualname__, False)
70
71 def __init__(self, success: int = 0, error: int = 84, debug: bool = False) -> None:
72 """Initialize the TokenEndpoints manager.
73
74 Args:
75 success (int, optional): Success status code. Defaults to 0.
76 error (int, optional): Error status code. Defaults to 84.
77 debug (bool, optional): Enable debug logging. Defaults to False.
78 """
79 # ------------------------ The logging function ------------------------
80 self.disp.update_disp_debug(debug)
81 self.disp.log_debug("Initialising...")
82 # -------------------------- Inherited values --------------------------
83 self.debug: bool = debug
84 self.success: int = success
85 self.error: int = error
86 self.runtime_manager: RuntimeManager = RI
87 # -------------------------- Shared instances --------------------------
88 self.sql_connectionsql_connectionsql_connection: Optional["SQL"] = self.runtime_manager.get_if_exists(
89 "SQL",
90 None
91 )
92 self.server_headers_initialised: "ServerHeaders" = self.runtime_manager.get(
93 "ServerHeaders", None)
94 self.boilerplate_incoming_initialised: Optional["BoilerplateIncoming"] = self.runtime_manager.get_if_exists(
95 "BoilerplateIncoming", None)
96 self.boilerplate_responses_initialisedboilerplate_responses_initialised: Optional["BoilerplateResponses"] = self.runtime_manager.get_if_exists(
97 "BoilerplateResponses", None)
98 self.boilerplate_non_http_initialisedboilerplate_non_http_initialised: Optional["BoilerplateNonHTTP"] = self.runtime_manager.get_if_exists(
99 "BoilerplateNonHTTP", None)
100 self.disp.log_debug("Initialised")
101
102 def _get_ttl_breakdown(self, ttl_delta: timedelta) -> Dict[str, int]:
103 """Break down timedelta into hours, minutes, and seconds components.
104
105 Args:
106 ttl_delta: The timedelta object to break down.
107
108 Returns:
109 Dict[str, int]: Dictionary with 'hours', 'minutes', 'seconds' keys.
110 """
111 ttl_seconds = int(ttl_delta.total_seconds())
112 hours = ttl_seconds // 3600
113 remaining = ttl_seconds % 3600
114 minutes = remaining // 60
115 seconds = remaining % 60
116 return {
117 "days": ttl_delta.days,
118 "hours": hours,
119 "minutes": minutes,
120 "seconds": seconds
121 }
122
123 def _get_token_if_present(self, request: Request) -> Union[str, None]:
124 """Extract token from request headers.
125
126 Checks multiple header formats for token presence: direct token key,
127 Bearer token format, and request body parameters.
128
129 Args:
130 request (Request): The HTTP request object containing headers.
131
132 Returns:
133 Union[str, None]: The extracted token string if found, None otherwise.
134 """
135 mtoken: Union[str, None] = request.get(CONST.REQUEST_TOKEN_KEY)
136 mbearer: Union[str, None] = request.get(CONST.REQUEST_BEARER_KEY)
137 token: Union[str, None] = request.headers.get(CONST.REQUEST_TOKEN_KEY)
138 bearer: Union[str, None] = request.headers.get(
139 CONST.REQUEST_BEARER_KEY
140 )
141 msg = f"mtoken = {mtoken}, mbearer = {mbearer}"
142 msg += f", token = {token}, bearer = {bearer}"
143 self.disp.log_debug(msg, "get_token_if_present")
144 if token is None and bearer is None and token is None and bearer is None:
145 return None
146 if mbearer is not None and mbearer.startswith('Bearer '):
147 return mbearer.split(" ")[1]
148 if bearer is not None and bearer.startswith('Bearer '):
149 return bearer.split(" ")[1]
150 if token is not None:
151 return token
152 return mtoken
153
154 def _is_token_correct(self, token: str) -> bool:
155 """Validate token correctness by checking expiration in database.
156
157 Args:
158 token (str): The token string to validate.
159
160 Returns:
161 bool: True if token is valid and not expired, False otherwise.
162 """
163 title = "is_token_correct"
164 self.disp.log_debug("Checking if the token is correct.", title)
165 if isinstance(token, str) is False:
166 return False
170 return False
171 login_table = self.sql_connectionsql_connectionsql_connection.get_data_from_table(
172 CONST.TAB_CONNECTIONS,
173 ["expiration_date"],
174 where=f"token={token}",
175 beautify=False
176 )
177 if isinstance(login_table, int):
178 return False
179 if len(login_table) != 1:
180 return False
181 self.disp.log_debug(f"login_table = {login_table}", title)
182 if datetime.now() > login_table[0][0]:
183 self.disp.log_warning(
184 "The provided token is invalid due to excessive idle time."
185 )
186 return False
187 self.disp.log_debug("The token is still valid.")
188 return True
189
190 def _token_correct(self, request: Request) -> bool:
191 """Validate token from request using BoilerplateNonHTTP validation.
192
193 Args:
194 request (Request): The HTTP request object containing the token.
195
196 Returns:
197 bool: True if token is valid, False otherwise.
198
199 Raises:
200 RuntimeError: If BoilerplateNonHTTP service is unavailable.
201 """
202 title = "token_correct"
203 self.disp.log_debug(
204 f"request = {request}", title
205 )
206 token = self._get_token_if_present(request)
207 self.disp.log_debug(
208 f"token = {token}", title
209 )
210 if token is None:
211 return False
213 "BoilerplateNonHTTP",
215 )
217 self.disp.log_error("BoilerplateNonHttp is missing")
218 raise RuntimeError("Token validation service unavailable")
220
221 def _get_user_id_from_token(self, title: str, token: str) -> Optional[Union[str, Response]]:
222 """Retrieve user ID associated with a given token.
223
224 Args:
225 title (str): The name of the calling endpoint for logging purposes.
226 token (str): The token to look up.
227
228 Returns:
229 Optional[Union[str, Response]]: User ID as string on success, Response object on error, None if services unavailable.
230 """
231 function_title = "get_user_id_from_token"
232 usr_id_node: str = "user_id"
233 self.boilerplate_responses_initialisedboilerplate_responses_initialised: Optional[BoilerplateResponses] = RI.get_if_exists(
236 self.disp.log_error(
237 "BoilerplateResponses not found, retuning None",
238 f"{title}:{function_title}"
239 )
240 return None
244 self.disp.log_error(
245 "SQL not found, returning None",
246 f"{title}:{function_title}"
247 )
248 return None
249 self.disp.log_debug(
250 f"Getting user id based on {token}", function_title
251 )
252 current_user_raw: Union[int, List[Dict[str, Any]]] = self.sql_connectionsql_connectionsql_connection.get_data_from_table(
253 table=CONST.TAB_CONNECTIONS,
254 column="*",
255 where=f"token='{token}'",
256 beautify=True
257 )
258 if isinstance(current_user_raw, int):
259 return self.boilerplate_responses_initialisedboilerplate_responses_initialised.user_not_found(title, token)
260 current_user: List[Dict[str, Any]] = current_user_raw
261 self.disp.log_debug(f"current_user = {current_user}", function_title)
262 if current_user == self.error:
263 return self.boilerplate_responses_initialisedboilerplate_responses_initialised.user_not_found(title, token)
264 self.disp.log_debug(
265 f"user_length = {len(current_user)}", function_title
266 )
267 if len(current_user) == 0 or len(current_user) > 1:
268 return self.boilerplate_responses_initialisedboilerplate_responses_initialised.user_not_found(title, token)
269 self.disp.log_debug(
270 f"current_user[0] = {current_user[0]}", function_title
271 )
272 if usr_id_node not in current_user[0]:
273 return self.boilerplate_responses_initialisedboilerplate_responses_initialised.user_not_found(title, token)
274 msg = "str(current_user[0]["
275 msg += f"{usr_id_node}]) = {str(current_user[0][usr_id_node])}"
276 self.disp.log_debug(msg, function_title)
277 return str(current_user[0][usr_id_node])
278
279 def _is_token_admin(self, token: str) -> Optional[bool]:
280 """Check if a given token correspond to a user that is an administrator or not.
281
282 Args:
283 token (str): The token to analyse.
284
285 Returns:
286 bool: The administrative status.
287 """
288 title: str = "is_token_admin"
292 self.disp.log_error(
293 "The SQL class was not initialised in the runtime manager.")
294 return None
295 usr_id: Union[
296 str, Response, None
297 ] = self._get_user_id_from_token(title, token)
298 if not isinstance(usr_id, str):
299 return False
300 current_user_raw: Union[int, List[Dict[str, Any]]] = self.sql_connectionsql_connectionsql_connection.get_data_from_table(
301 table=CONST.TAB_ACCOUNTS,
302 column="*",
303 where=f"id='{usr_id}'",
304 beautify=True
305 )
306 self.disp.log_debug(f"Queried data = {current_user_raw}")
307 if isinstance(current_user_raw, int) or current_user_raw == []:
308 return False
309 if "admin" in current_user_raw[0]:
310 if str(current_user_raw[0].get("admin", "0")) == "1":
311 self.disp.log_warning(
312 f"User account {usr_id} with name {current_user_raw[0].get('username', '<unknown_username>')} is an admin"
313 )
314 self.disp.log_warning(
315 "They probably called an admin endpoint."
316 )
317 return True
318 return False
319
320 def _get_token_info(self, token: str) -> Optional[TokenInfo]:
321 """Retrieve token metadata from database.
322
323 Args:
324 token (str): The token string to look up.
325
326 Returns:
327 Optional[TokenInfo]: Token information dataclass if found, None otherwise.
328 """
329 resp: TokenInfo = TokenInfo()
333 self.disp.log_error(
334 "The SQL class was not initialised in the runtime manager."
335 )
336 return None
337 token_data: Union[int, List[Dict[str, Any]]] = self.sql_connectionsql_connectionsql_connection.get_data_from_table(
338 table=CONST.TAB_CONNECTIONS,
339 column="*",
340 where=f"token='{token}'",
341 beautify=True
342 )
343 self.disp.log_debug(f"Queried data = {token_data}")
344 if isinstance(token_data, int) or token_data == []:
345 return None
346 token_data_dict: Dict[str, Any] = token_data[0]
347 resp.token = token_data_dict.get("token", "")
348 resp.user_id = int(token_data_dict.get("user_id", 0))
349 resp.creation_date = token_data_dict.get(
350 "creation_date", DEFAULT_DATE_PLACEHOLDER)
351 resp.edit_date = token_data_dict.get(
352 "edit_date", DEFAULT_DATE_PLACEHOLDER)
353 resp.expiration_date = token_data_dict.get(
354 "expiration_date", DEFAULT_DATE_PLACEHOLDER)
355 return resp
356
357 def _get_user_info(self, user_id: int) -> Optional[Dict[str, Any]]:
358 """Retrieve user account information from database.
359
360 Args:
361 user_id (int): The user ID to look up.
362
363 Returns:
364 Optional[Dict[str, Any]]: User account data dictionary if found, None otherwise.
365 """
366
370 self.disp.log_error(
371 "The SQL class was not initialised in the runtime manager.")
372 return None
373 current_user_raw: Union[int, List[Dict[str, Any]]] = self.sql_connectionsql_connectionsql_connection.get_data_from_table(
374 table=CONST.TAB_ACCOUNTS,
375 column="*",
376 where=f"id='{user_id}'",
377 beautify=True
378 )
379 self.disp.log_debug(f"Queried data = {current_user_raw}")
380 if isinstance(current_user_raw, int) or current_user_raw == []:
381 return None
382 return current_user_raw[0]
383
384 def get_token_valid(self, request: Request) -> Response:
385 """Validate if the provided token is currently valid.
386
387 Returns ok/ko message indicating token validity.
388
389 Args:
390 request (Request): The HTTP request containing the token.
391
392 Returns:
393 Response: HTTP response with validity status (ok/ko).
394 """
395 token_correct = self._token_correct(request)
396 if token_correct:
397 return HCI.success("ok", content_type=HttpDataTypes.TEXT, headers=self.server_headers_initialised.for_text())
398 return HCI.invalid_token("ko", content_type=HttpDataTypes.TEXT, headers=self.server_headers_initialised.for_text())
399
400 def get_admin(self, request: Request) -> Response:
401 """Check if the provided token corresponds to an administrator account.
402
403 Returns ok if admin, ko if not admin, and error responses for invalid/missing tokens.
404
405 Args:
406 request (Request): The HTTP request containing the token.
407
408 Returns:
409 Response: HTTP response with admin status (ok/ko) or error code.
410 """
411 token = self._get_token_if_present(request)
412 if not token:
413 return HCI.invalid_token("No token provided", content_type=HttpDataTypes.TEXT, headers=self.server_headers_initialised.for_text())
414 admin = self._is_token_admin(token)
415 if admin is None:
416 return HCI.internal_server_error("Failed to check admin status", content_type=HttpDataTypes.TEXT, headers=self.server_headers_initialised.for_text())
417 if not admin:
418 return HCI.unauthorized("ko", content_type=HttpDataTypes.TEXT, headers=self.server_headers_initialised.for_text())
419 return HCI.success("ok", content_type=HttpDataTypes.TEXT, headers=self.server_headers_initialised.for_text())
420
421 def get_time_to_live(self, request: Request) -> Response:
422 """Retrieve the remaining time-to-live (TTL) for the provided token.
423
424 Calculates TTL as the difference between expiration date and current time in seconds.
425
426 Args:
427 request (Request): The HTTP request containing the token.
428
429 Returns:
430 Response: JSON response containing TTL in seconds, or appropriate error response.
431 """
432 token = self._get_token_if_present(request)
433 if not token:
434 return HCI.unauthorized("No token provided", content_type=HttpDataTypes.TEXT, headers=self.server_headers_initialised.for_text())
436 return HCI.service_unavailable("The server is missing a critical component for this endpoint to complete.", content_type=HttpDataTypes.TEXT, headers=self.server_headers_initialised.for_text())
437 token_info = self._get_token_info(token)
438 if not token_info:
439 return HCI.invalid_token("The provided token does not exist.", content_type=HttpDataTypes.TEXT, headers=self.server_headers_initialised.for_text())
440 ttl_delta = (token_info.expiration_date - datetime.now())
441 body_content: Dict[str, Any] = {
442 "ttl_seconds": ttl_delta.total_seconds(),
443 "ttl_breakdown": self._get_ttl_breakdown(ttl_delta)
444 }
445 return HCI.success(body_content, content_type=HttpDataTypes.JSON, headers=self.server_headers_initialised.for_json())
446
447 def get_token_info(self, request: Request) -> Response:
448 """Endpoint to get information about the provided token.
449
450 Args:
451 request (Request): The incoming request.
452
453 Returns:
454 Response: The response to send back to the client.
455 """
456 self.disp.log_debug("Getting token info...")
458 return HCI.service_unavailable("The server is missing a critical component for this endpoint to complete.", content_type=HttpDataTypes.TEXT, headers=self.server_headers_initialised.for_text())
459 token = self._get_token_if_present(request)
460 if not token:
461 return HCI.unauthorized("No token provided", content_type=HttpDataTypes.TEXT, headers=self.server_headers_initialised.for_text())
462 connected = self._is_token_correct(token)
463 if not connected:
464 return HCI.invalid_token("The provided token is invalid", content_type=HttpDataTypes.TEXT, headers=self.server_headers_initialised.for_text())
465 token_info = self._get_token_info(token)
466 if not token_info:
467 return HCI.invalid_token("The provided token does not exist", content_type=HttpDataTypes.TEXT, headers=self.server_headers_initialised.for_text())
468 user_info = self._get_user_info(token_info.user_id)
469 if not user_info:
470 return HCI.internal_server_error("Failed to retrieve user information", content_type=HttpDataTypes.TEXT, headers=self.server_headers_initialised.for_text())
471 ttl_delta = (token_info.expiration_date - datetime.now())
472 body_content: Dict[str, Any] = {
473 "user_id": token_info.user_id,
474 "token_creation_date": token_info.creation_date.isoformat(),
475 "token_edit_date": token_info.edit_date.isoformat(),
476 "token_expiration_date": token_info.expiration_date.isoformat(),
477 "ttl_seconds": ttl_delta.total_seconds(),
478 "ttl_breakdown": self._get_ttl_breakdown(ttl_delta),
479 "username": user_info.get("username", "<unknown_username>"),
480 "email": CONST.hide_user_email(user_info.get("email", ""), self.disp),
481 "admin": str(user_info.get("admin", "0")) == "1",
482 "password": user_info.get("password", "") != "",
483 "gender": user_info.get("gender", "<unknown_gender>"),
484 "age": user_info.get("age", "<unknown_age>"),
485 "last_connection": datetime_to_string(user_info.get("last_connection", "<unknown_last_connection>"), "<unknown_last_connection>"),
486 "creation_date": datetime_to_string(user_info.get("creation_date", "<unknown_creation_date>"), "<unknown_creation_date>"),
487 "edit_date": datetime_to_string(user_info.get("edit_date", "<unknown_edit_date>"), "<unknown_edit_date>"),
488 "deletion_date": datetime_to_string(user_info.get("deletion_date", "<unknown_deletion_date>"), "<unknown_deletion_date>")
489 }
490 self.disp.log_debug(f"Token info response body: {body_content}")
491 return HCI.success(body_content, content_type=HttpDataTypes.JSON, headers=self.server_headers_initialised.for_json())
492
493 def post_refresh_token(self, request: Request) -> Response:
494 """Generate and store a new token for the authenticated user.
495
496 Invalidates the old token and creates a fresh one with updated expiration date.
497 This operation is non-idempotent: each call produces a different token.
498
499 Args:
500 request (Request): The HTTP request containing the current token.
501
502 Returns:
503 Response: JSON response containing the new token and expiration date, or error response.
504 """
508 return HCI.service_unavailable("The server is missing a critical component for this endpoint to complete.", content_type=HttpDataTypes.TEXT, headers=self.server_headers_initialised.for_text())
512 return HCI.service_unavailable("The server is missing a critical component for this endpoint to complete.", content_type=HttpDataTypes.TEXT, headers=self.server_headers_initialised.for_text())
516 return HCI.service_unavailable("The server is missing a critical component for this endpoint to complete.", content_type=HttpDataTypes.TEXT, headers=self.server_headers_initialised.for_text())
517 token = self._get_token_if_present(request)
518 if not token:
519 return HCI.unauthorized("No token provided", content_type=HttpDataTypes.TEXT, headers=self.server_headers_initialised.for_text())
520 connected = self._is_token_correct(token)
521 if not connected:
522 return HCI.invalid_token("The provided token is invalid", content_type=HttpDataTypes.TEXT, headers=self.server_headers_initialised.for_text())
523 token_info = self._get_token_info(token)
524 if not token_info:
525 return HCI.invalid_token("The provided token does not exist", content_type=HttpDataTypes.TEXT, headers=self.server_headers_initialised.for_text())
528 CONST.UA_TOKEN_LIFESPAN
529 )
530 creation_date = datetime.now()
531 data = [
532 new_token,
533 new_lifespan,
534 creation_date
535 ]
536 columns = [
537 "token",
538 "expiration_date",
539 "creation_date"
540 ]
541
542 status = self.sql_connectionsql_connectionsql_connection.update_data_in_table(
543 table=CONST.TAB_CONNECTIONS,
544 data=data,
545 column=columns,
546 where=f"token='{token}'"
547 )
548 if status != self.success:
549 return HCI.internal_server_error("Failed to refresh token", content_type=HttpDataTypes.TEXT, headers=self.server_headers_initialised.for_text())
551 title="refresh_token",
552 message="Token refreshed successfully",
553 resp="success",
554 token=new_token,
555 error=False
556 )
557 body["token"] = new_token
558 body["expiration_date"] = new_lifespan.isoformat()
559 return HCI.success(body, content_type=HttpDataTypes.JSON, headers=self.server_headers_initialised.for_json())
560
561 def delete_revoke_account_token(self, request: Request) -> Response:
562 """Revoke all active tokens for the authenticated user's account.
563
564 Removes all tokens associated with the user, effectively logging them out
565 from all devices/sessions.
566
567 Args:
568 request (Request): The HTTP request containing the user's current token.
569
570 Returns:
571 Response: Success message if all tokens revoked, or appropriate error response.
572 """
573 token = self._get_token_if_present(request)
577 return HCI.service_unavailable("The server is missing a critical component for this endpoint to complete.", content_type=HttpDataTypes.TEXT, headers=self.server_headers_initialised.for_text())
578 if not token:
579 return HCI.unauthorized("No token provided", content_type=HttpDataTypes.TEXT, headers=self.server_headers_initialised.for_text())
580 connected = self._is_token_correct(token)
581 if not connected:
582 return HCI.invalid_token("The provided token is invalid", content_type=HttpDataTypes.TEXT, headers=self.server_headers_initialised.for_text())
583 token_info = self._get_token_info(token)
584 if not token_info:
585 return HCI.invalid_token("The provided token does not exist", content_type=HttpDataTypes.TEXT, headers=self.server_headers_initialised.for_text())
586 status = self.sql_connectionsql_connectionsql_connection.remove_data_from_table(
587 table=CONST.TAB_CONNECTIONS,
588 where=f"user_id='{token_info.user_id}'"
589 )
590 if status != self.success:
591 return HCI.internal_server_error("Failed to revoke tokens", content_type=HttpDataTypes.TEXT, headers=self.server_headers_initialised.for_text())
592 return HCI.success("All tokens revoked", content_type=HttpDataTypes.TEXT, headers=self.server_headers_initialised.for_text())
Optional[TokenInfo] _get_token_info(self, str token)
Definition token.py:320
Union[str, None] _get_token_if_present(self, Request request)
Definition token.py:123
Optional[Union[str, Response]] _get_user_id_from_token(self, str title, str token)
Definition token.py:221
Response delete_revoke_account_token(self, Request request)
Definition token.py:561
Dict[str, int] _get_ttl_breakdown(self, timedelta ttl_delta)
Definition token.py:102
Optional[Dict[str, Any]] _get_user_info(self, int user_id)
Definition token.py:357
None __init__(self, int success=0, int error=84, bool debug=False)
Definition token.py:71