Cat Feeder  1.0.0
The Cat feeder project
Loading...
Searching...
No Matches
user_endpoints.py
Go to the documentation of this file.
1r"""
2# +==== BEGIN CatFeeder =================+
3# LOGO:
4# ..............(..../\
5# ...............)..(.')
6# ..............(../..)
7# ...............\‍(__)|
8# Inspired by Joan Stark
9# source https://www.asciiart.eu/
10# animals/cats
11# /STOP
12# PROJECT: CatFeeder
13# FILE: user_endpoints.py
14# CREATION DATE: 19-11-2025
15# LAST Modified: 15:34:9 22-01-2026
16# DESCRIPTION:
17# This is the backend server in charge of making the actual website work.
18# /STOP
19# COPYRIGHT: (c) Cat Feeder
20# PURPOSE: The endpoints used for tracking the user requirements.
21# // AR
22# +==== END CatFeeder =================+
23"""
24from typing import TYPE_CHECKING, Union
25from display_tty import Disp, initialise_logger
26from fastapi import Request, Response
27from ...core import RuntimeManager, RI
28from ...utils import constants as CONST, PasswordHandling
29from ...e_mail import MailManagement
30from ...http_codes import HCI, HttpDataTypes, HTTP_DEFAULT_TYPE
31
32if TYPE_CHECKING:
33 from typing import List, Dict, Any, Optional
34 from ...sql import SQL
35 from ...server_header import ServerHeaders
36 from ...boilerplates import BoilerplateIncoming, BoilerplateResponses, BoilerplateNonHTTP
37 from ...favicon import FaviconUser
38
39
41 """Handle user-related HTTP endpoints.
42
43 The class implements endpoints for authentication, registration,
44 password reset, profile management, email verification and session
45 management used by the CatFeeder backend.
46 """
47
48 disp: Disp = initialise_logger(__qualname__, False)
49
def __init__(self, error: int = 84, success: int = 0, debug: bool = False) -> None:
    """Configure logging and resolve the shared runtime services.

    Args:
        error: Value stored as ``self.error`` and forwarded to the helpers
            registered by this constructor.
        success: Value stored as ``self.success`` and forwarded to the
            helpers registered by this constructor.
        debug: Debug flag applied to the logger, stored as ``self.debug``
            and forwarded to the helpers registered by this constructor.
    """
    # ------------------------ The logging function ------------------------
    self.disp.update_disp_debug(debug)
    self.disp.log_debug("Initialising...")
    # -------------------------- Inherited values --------------------------
    self.error: int = error
    self.success: int = success
    self.debug: bool = debug
    self.runtime_manager: RuntimeManager = RI
    manager = self.runtime_manager
    # Settings handed to every helper that still has to be registered.
    helper_settings = {
        "error": self.error,
        "success": self.success,
        "debug": self.debug,
    }
    # ------------------------ The password checker ------------------------
    # Register the helper only when the runtime manager does not hold it yet.
    if not manager.exists("PasswordHandling"):
        manager.set(PasswordHandling, **helper_settings)
    self.password_handling_initialised: PasswordHandling = manager.get(
        "PasswordHandling"
    )
    # ---------------------------- Mail sending ----------------------------
    if not manager.exists("MailManagement"):
        manager.set(MailManagement, **helper_settings)
    self.mail_management_initialised: MailManagement = manager.get(
        "MailManagement"
    )
    # -------------------------- Shared instances --------------------------
    # These are fetched with ``get`` and never registered here.
    self.boilerplate_incoming_initialised: "BoilerplateIncoming" = manager.get(
        "BoilerplateIncoming"
    )
    self.boilerplate_responses_initialised: "BoilerplateResponses" = manager.get(
        "BoilerplateResponses"
    )
    self.boilerplate_non_http_initialised: "BoilerplateNonHTTP" = manager.get(
        "BoilerplateNonHTTP"
    )
    self.database_link: "SQL" = manager.get("SQL")
    self.server_headers_initialised: "ServerHeaders" = manager.get(
        "ServerHeaders"
    )
    # The favicon helper is optional: ``None`` is the fallback value.
    self.favicon_user: Optional["FaviconUser"] = manager.get_if_exists(
        "FaviconUser", None
    )
    self.disp.log_debug("Initialised")
96
async def post_login(self, request: Request) -> Response:
    """Log in a user.

    Validate the provided credentials and return an authentication token
    on success.

    Args:
        request: The incoming FastAPI request containing JSON with
            ``email`` and ``password``.

    Returns:
        Response: Bad request when a field is missing or is not a string,
        unauthorized when the account is unknown or the password does not
        match, forbidden when the session could not be created, success
        (with the token in the body) otherwise.
    """
    title = "Login"
    responses = self.boilerplate_responses_initialised
    request_body = await self.boilerplate_incoming_initialised.get_body(request)
    # Never write the clear-text password to the logs.
    if hasattr(request_body, "items"):
        loggable_body = {
            key: "<redacted>" if key == "password" else value
            for key, value in request_body.items()
        }
    else:
        loggable_body = request_body
    self.disp.log_debug(f"Request body: {loggable_body}", title)
    if not request_body or not all(key in request_body for key in ("email", "password")):
        return responses.bad_request(title)
    email = request_body["email"]
    password = request_body["password"]
    # JSON may carry numbers, lists or null here; only strings are credentials.
    if not isinstance(email, str) or not isinstance(password, str):
        return responses.bad_request(title)
    # NOTE(review): the e-mail is interpolated into the WHERE clause as-is;
    # confirm that the SQL layer escapes/sanitises it.
    user_info = self.database_link.get_data_from_table(
        CONST.TAB_ACCOUNTS, "*", f"email='{email}'"
    )
    self.disp.log_debug(f"Retrived data: {user_info}", title)
    # An int is the database layer's error signal; an empty list means
    # that no account uses this address.
    if isinstance(user_info, int) or len(user_info) == 0:
        return responses.unauthorized(title)
    account = user_info[0]
    if self.password_handling_initialised.check_password(password, account["password"]) is False:
        return responses.unauthorized(title)
    data = self.boilerplate_incoming_initialised.log_user_in(email)
    if data["status"] == self.error:
        body = responses.build_response_body(
            title=title,
            message="Login failed.",
            resp="error",
            token=data["token"],
            error=True
        )
        return HCI.forbidden(
            content=body,
            content_type=HTTP_DEFAULT_TYPE,
            headers=self.server_headers_initialised.for_json()
        )
    name = account["username"]
    body = responses.build_response_body(
        title=title,
        message=f"Welcome {name}",
        resp="success",
        token=data["token"],
        error=False
    )
    body["token"] = data["token"]
    return HCI.success(
        content=body,
        content_type=HTTP_DEFAULT_TYPE,
        headers=self.server_headers_initialised.for_json()
    )
148
def _check_gender(self, gender: str) -> str:
    """Normalise a user-supplied gender to the code that gets stored.

    Matching ignores case and surrounding whitespace.

    Args:
        gender: The raw value taken from the request body.

    Returns:
        str: ``"M"``, ``"F"``, or ``"RNS"`` when the value is not a string
        or is not one of the recognised spellings.
    """
    default = "RNS"
    self.disp.log_debug(f"Default gender: '{default}'")
    if not isinstance(gender, str):
        self.disp.log_debug(
            "The provided gender is not a string, returning default"
        )
        return default
    gender_lower = gender.strip().lower()
    # Both lists accept the same kinds of spelling (letter, English, French).
    if gender_lower in ("m", "male", "man", "homme"):
        self.disp.log_debug("Gender is male")
        return "M"
    if gender_lower in ("f", "female", "woman", "femme"):
        self.disp.log_debug("Gender is female")
        return "F"
    self.disp.log_debug(
        "Gender is either not in the list or Rather Not Say"
    )
    return default
168
def _check_age_range(self, age: Union[str, int, float, None]) -> str:
    """Map a user-supplied age to one of the age-range labels.

    Numbers are truncated to an integer and looked up in the range table.
    Strings are accepted when they already are a range label, or when they
    parse as a number. Values that cannot be placed (``None``, other types,
    negative ages, NaN, infinity, unparsable text) yield the default.

    Args:
        age: The raw value taken from the request body.

    Returns:
        str: A range label such as ``"16-20"``, or the default label.
    """
    # NOTE(review): the default is not one of the labels of the table
    # below; confirm that this is the value the storage layer expects.
    default: str = "15-20"
    self.disp.log_debug(f"Default age: '{default}'")

    # (lowest age, highest age, label). The last label is open-ended, so
    # its upper bound is infinite rather than an arbitrary maximum.
    age_ranges = [
        (0, 5, "0-5"),
        (6, 10, "6-10"),
        (11, 15, "11-15"),
        (16, 20, "16-20"),
        (21, 25, "21-25"),
        (26, 30, "26-30"),
        (31, 40, "31-40"),
        (41, 50, "41-50"),
        (51, 60, "51-60"),
        (61, float("inf"), "61+")
    ]

    def find_range(age_num: int) -> Union[str, None]:
        """Return the label containing ``age_num``, or None when there is none."""
        for min_age, max_age, range_str in age_ranges:
            if min_age <= age_num <= max_age:
                self.disp.log_debug(
                    f"Age {age_num} falls in range {range_str}")
                return range_str
        self.disp.log_debug(
            f"Age {age_num} doesn't fall in any defined range")
        return None

    # If age is a number, find its range
    if isinstance(age, (int, float)):
        try:
            age_num = int(age)
        except (ValueError, OverflowError):
            # int() rejects NaN (ValueError) and infinity (OverflowError).
            self.disp.log_debug(f"Age {age} is not a finite number")
            return default
        self.disp.log_debug(f"Age provided as number: {age_num}")
        return find_range(age_num) or default

    # If age is a string
    if isinstance(age, str):
        age_str = age.strip()
        self.disp.log_debug(f"Age provided as string: '{age_str}'")

        # Check if it's already a valid range
        if age_str in [entry[2] for entry in age_ranges]:
            self.disp.log_debug(
                f"Age string '{age_str}' is already a valid range")
            return age_str

        # Try to parse as a number string
        try:
            age_num = int(float(age_str))
        except (ValueError, TypeError, OverflowError):
            # OverflowError covers texts such as "inf" or "1e999".
            self.disp.log_debug(
                f"Could not parse age string '{age_str}' as number")
            return default
        self.disp.log_debug(f"Age string parsed as number: {age_num}")
        return find_range(age_num) or default

    # If age is None or any other type
    self.disp.log_debug(
        "Age is either not in the list or not provided"
    )
    return default
234
async def post_register(self, request: Request) -> Response:
    """Register a new user account.

    Create a new account from the provided ``email`` and ``password``
    (``gender`` and ``age`` are optional) and automatically log the user
    in on success. The username is the part of the e-mail before ``@``.

    Args:
        request: The incoming FastAPI request with the required fields.

    Returns:
        Response: Bad request when a required field is missing, empty or
        not a string, or when the account cannot be created (address
        already registered or lookup failure); internal server error when
        the insertion fails; forbidden when the automatic login fails;
        success (with the token in the body) otherwise.
    """
    title = "Register"
    responses = self.boilerplate_responses_initialised
    request_body = await self.boilerplate_incoming_initialised.get_body(request)
    # Never write the clear-text password to the logs.
    if hasattr(request_body, "items"):
        loggable_body = {
            key: "<redacted>" if key == "password" else value
            for key, value in request_body.items()
        }
    else:
        loggable_body = request_body
    self.disp.log_debug(f"Request body: {loggable_body}", title)
    # Both fields are mandatory: reject the request when either is absent.
    if not request_body or not all(key in request_body for key in ("email", "password")):
        return responses.bad_request(title)
    email = request_body["email"]
    password = request_body["password"]
    if not isinstance(email, str) or not isinstance(password, str):
        return responses.bad_request(title)
    if not (email and password):
        return responses.bad_request(title)
    if "gender" not in request_body:
        self.disp.log_warning(
            "The gender field was not provided during registration."
        )
    if "age" not in request_body:
        self.disp.log_warning(
            "The age field was not provided during registration."
        )
    gender: str = self._check_gender(request_body.get("gender", ""))
    age: str = self._check_age_range(request_body.get("age", None))
    # NOTE(review): the e-mail is interpolated into the WHERE clause as-is;
    # confirm that the SQL layer escapes/sanitises it.
    user_info = self.database_link.get_data_from_table(
        CONST.TAB_ACCOUNTS, "*", f"email='{email}'"
    )
    account_creation_error = responses.build_response_body(
        title=title,
        message="Unable to create account. Please try again later.",
        resp="registration_failed",
        token=None,
        error=True
    )
    # An int is the database layer's error signal. Any returned row was
    # selected by the e-mail filter, so the address is already taken.
    if isinstance(user_info, int) or len(user_info) > 0:
        return HCI.bad_request(account_creation_error)
    hashed_password = self.password_handling_initialised.hash_password(
        password
    )
    username = email.split('@')[0]
    self.disp.log_debug(f"Username = {username}", title)
    admin = int(False)
    favicon = None
    deletion_date = None
    data: List[Union[str, int, float, None]] = [
        username, email, hashed_password, "local", gender, age, favicon, admin, deletion_date
    ]
    self.disp.log_debug(f"Data list = {data}", title)
    column = self.database_link.get_table_column_names(
        CONST.TAB_ACCOUNTS
    )
    self.disp.log_debug(f"Column = {column}", title)
    if isinstance(column, int):
        return responses.internal_server_error(title)
    column = CONST.clean_list(
        column,
        CONST.TABLE_COLUMNS_TO_IGNORE_USER,
        self.disp
    )
    if self.database_link.insert_data_into_table(CONST.TAB_ACCOUNTS, data, column) == self.error:
        return responses.internal_server_error(title)
    login_data = self.boilerplate_incoming_initialised.log_user_in(email)
    if login_data["status"] == self.error:
        body = responses.build_response_body(
            title=title,
            message="Login failed.",
            resp="error",
            token=login_data["token"],
            error=True
        )
        return HCI.forbidden(
            content=body,
            content_type=HTTP_DEFAULT_TYPE,
            headers=self.server_headers_initialised.for_json()
        )
    body = responses.build_response_body(
        title=title,
        message=f"Welcome {username}",
        resp="success",
        token=login_data["token"],
        error=False
    )
    body["token"] = login_data["token"]
    return HCI.success(
        content=body,
        content_type=HTTP_DEFAULT_TYPE,
        headers=self.server_headers_initialised.for_json()
    )
332
async def post_send_email_verification(self, request: Request) -> Response:
    """Send an email verification code to a user.

    Generate a verification code for the provided email, store it (any
    previous code for that address is removed first) and send it through
    the mail management helper.

    Args:
        request: The incoming FastAPI request containing the ``email``
            field.

    Returns:
        Response: Bad request when the field is missing or the account
        lookup fails or finds nothing; internal server error when the
        code cannot be stored or the e-mail cannot be sent; success
        otherwise.
    """
    title = "Send e-mail verification"
    responses = self.boilerplate_responses_initialised
    request_body = await self.boilerplate_incoming_initialised.get_body(request)
    self.disp.log_debug(f"Request body: {request_body}", title)
    if not request_body or "email" not in request_body:
        return responses.bad_request(title)
    email = request_body["email"]
    if not isinstance(email, str):
        return responses.bad_request(title)
    # NOTE(review): the e-mail is interpolated into the WHERE clauses as-is;
    # confirm that the SQL layer escapes/sanitises it.
    data = self.database_link.get_data_from_table(
        table=CONST.TAB_ACCOUNTS,
        column="*",
        where=f"email='{email}'",
        beautify=True
    )
    # An int is the database layer's error signal.
    if isinstance(data, int):
        return responses.bad_request(title)
    self.disp.log_debug(f"user query = {data}", title)
    if len(data) == 0:
        return responses.bad_request(title)
    # NOTE(review): the subject names "Asperguide" although this file
    # belongs to the CatFeeder project; confirm the intended wording.
    email_subject = "[Asperguide] Verification code"
    code = self.boilerplate_non_http_initialised.generate_check_token(
        CONST.CHECK_TOKEN_SIZE
    )
    expiration_time = self.boilerplate_non_http_initialised.set_lifespan(
        CONST.EMAIL_VERIFICATION_DELAY
    )
    expiration_time_str = self.database_link.datetime_to_string(
        expiration_time, False
    )
    tab_column = self.database_link.get_table_column_names(
        CONST.TAB_VERIFICATION
    )
    # Failing to read the table layout is a server-side problem, not a
    # defect of the request.
    if isinstance(tab_column, int) or len(tab_column) == 0:
        return responses.internal_server_error(title)
    tab_column = CONST.clean_list(
        tab_column,
        CONST.TABLE_COLUMNS_TO_IGNORE,
        self.disp
    )
    # Drop any code previously stored for this address (the result of the
    # removal is not checked).
    self.database_link.remove_data_from_table(
        CONST.TAB_VERIFICATION,
        f"term='{email}'"
    )
    status = self.database_link.insert_data_into_table(
        table=CONST.TAB_VERIFICATION,
        data=[
            email,
            code,
            self.database_link.datetime_to_string(
                expiration_time, False, True
            )
        ],
        column=tab_column
    )
    if status == self.error:
        return responses.internal_server_error(title)
    code_style = "background-color: lightgray;border: 2px lightgray solid;border-radius: 6px;color: black;font-weight: bold;padding: 5px;padding-top: 5px;padding-bottom: 5px;padding-top: 0px;padding-bottom: 0px;"
    body = "".join(
        [
            "<p>The code is: ",
            f"<span style=\"{code_style}\">{code}</span></p>",
            "<p>The code will be valid until ",
            f"<span style=\"{code_style}\">",
            f"{expiration_time_str}</span>.</p>",
        ]
    )
    self.disp.log_debug(f"e-mail body: {body}", title)
    status = self.mail_management_initialised.send_email(
        email, email_subject, body
    )
    if status == self.error:
        return responses.internal_server_error(title)
    body = responses.build_response_body(
        title=title,
        message="Email send successfully.",
        resp="success",
        token=None,
        error=False
    )
    return HCI.success(body)
426
async def put_reset_password(self, request: Request) -> Response:
    """Reset a user's password using a verification code.

    Validate the provided code for the email, store the hash of the new
    password for the account, then remove the verification rows that
    matched.

    Args:
        request: The incoming FastAPI request containing ``email``,
            ``code`` and ``password``.

    Returns:
        Response: Bad request when a field is missing or the email or
        password is not a string; invalid verification code when no stored
        code matches; internal server error when the database lookup or
        update fails; success otherwise.
    """
    title = "Reset password"
    responses = self.boilerplate_responses_initialised
    request_body = await self.boilerplate_incoming_initialised.get_body(request)
    # Never write the clear-text password to the logs.
    if hasattr(request_body, "items"):
        loggable_body = {
            key: "<redacted>" if key == "password" else value
            for key, value in request_body.items()
        }
    else:
        loggable_body = request_body
    self.disp.log_debug(f"Request body: {loggable_body}", title)
    if not request_body or not all(key in request_body for key in ("email", "code", "password")):
        return responses.bad_request(title)
    body_email = request_body["email"]
    body_code = request_body["code"]
    body_password = request_body["password"]
    if not isinstance(body_email, str) or not isinstance(body_password, str):
        return responses.bad_request(title)
    # NOTE(review): the e-mail is interpolated into the WHERE clauses as-is;
    # confirm that the SQL layer escapes/sanitises it.
    current_codes = self.database_link.get_data_from_table(
        CONST.TAB_VERIFICATION,
        column="*",
        where=f"term='{body_email}'",
        beautify=True
    )
    self.disp.log_debug(f"Current codes: {current_codes}", title)
    # Anything other than a list of rows means that the lookup failed.
    if not isinstance(current_codes, list):
        return responses.internal_server_error(title)
    # NOTE(review): this method does not check whether a code has expired;
    # confirm that expiry is enforced elsewhere.
    nodes_of_interest = [
        row for row in current_codes
        if row.get("term") == body_email and row.get("definition") == body_code
    ]
    # No stored code, or no matching one: the caller's code is invalid.
    if not nodes_of_interest:
        return responses.invalid_verification_code(title)
    hashed_password = self.password_handling_initialised.hash_password(
        body_password
    )
    status = self.database_link.update_data_in_table(
        CONST.TAB_ACCOUNTS, [hashed_password], ["password"], f"email='{body_email}'"
    )
    if status == self.error:
        return responses.internal_server_error(title)
    self.disp.log_debug(f"Nodes found: {nodes_of_interest}", title)
    # Remove the codes that were just used.
    for line in nodes_of_interest:
        self.disp.log_debug(f"line removed: {line}", title)
        self.database_link.remove_data_from_table(
            CONST.TAB_VERIFICATION,
            f"id='{line['id']}'"
        )
    response_body = responses.build_response_body(
        title=title,
        message="Password changed successfully.",
        resp="success",
        token=None,
        error=False
    )
    return HCI.success(
        response_body,
        content_type=HTTP_DEFAULT_TYPE,
        headers=self.server_headers_initialised.for_json()
    )
495
async def put_user(self, request: Request) -> Response:
    """Replace a user's account information.

    Requires a valid authentication token. Replaces username, email and
    password with the submitted values; the stored ``method``, ``favicon``
    and ``admin`` values are carried over unchanged.

    Args:
        request (Request): The incoming FastAPI request with ``username``,
            ``email`` and ``password`` fields and authentication token.

    Returns:
        Response: A FastAPI response indicating update success or an
        appropriate error response.
    """
    title = "Put user"
    responses = self.boilerplate_responses_initialised
    token_raw: Optional[str] = self.boilerplate_incoming_initialised.get_token_if_present(
        request
    )
    if not token_raw:
        return responses.no_access_token(title, token_raw)
    token: str = token_raw
    token_valid: bool = self.boilerplate_non_http_initialised.is_token_correct(
        token
    )
    self.disp.log_debug(f"token = {token}, valid = {token_valid}", title)
    if token_valid is False:
        return responses.unauthorized(title, token)
    request_body = await self.boilerplate_incoming_initialised.get_body(request)
    # Never write the clear-text password to the logs.
    if hasattr(request_body, "items"):
        loggable_body = {
            key: "<redacted>" if key == "password" else value
            for key, value in request_body.items()
        }
    else:
        loggable_body = request_body
    self.disp.log_debug(f"Request body: {loggable_body}", title)
    if not request_body or not all(key in request_body for key in ("username", "email", "password")):
        return responses.bad_request(title)
    body_username: str = request_body["username"]
    body_email: str = request_body["email"]
    body_password: str = request_body["password"]
    usr_id = self.boilerplate_non_http_initialised.get_user_id_from_token(
        title, token
    )
    if not usr_id:
        return responses.internal_server_error(title, token)
    # The helper may hand back a ready-made response; forward it untouched.
    if isinstance(usr_id, Response):
        return usr_id
    user_profile_raw: Union[int, List[Dict[str, Any]]] = self.database_link.get_data_from_table(
        table=CONST.TAB_ACCOUNTS,
        column="*",
        where=f"id='{usr_id}'",
    )
    # An int is the database layer's error signal.
    if isinstance(user_profile_raw, int):
        return responses.internal_server_error(title, token)
    user_profile: List[Dict[str, Any]] = user_profile_raw
    self.disp.log_debug(f"User profile = {user_profile}", title)
    if len(user_profile) == 0:
        return responses.user_not_found(title, token)
    current = user_profile[0]
    # NOTE(review): confirm that update_user_data expects exactly these
    # six values in this order.
    data: List[Optional[Union[str, int, float]]] = [
        body_username,
        body_email,
        self.password_handling_initialised.hash_password(body_password),
        current["method"],
        current["favicon"],
        str(current["admin"])
    ]
    status = self.boilerplate_non_http_initialised.update_user_data(
        title, usr_id, data
    )
    if isinstance(status, Response):
        return status
    if not status:
        return responses.internal_server_error(title, token)
    data_response = responses.build_response_body(
        title=title,
        message="The account information has been updated.",
        resp="success",
        token=token,
        error=False
    )
    return HCI.success(
        content=data_response,
        content_type=HTTP_DEFAULT_TYPE,
        headers=self.server_headers_initialised.for_json()
    )
571
async def patch_user(self, request: Request) -> Response:
    """Partially update a user's account information.

    Only the fields present in the request body are updated; absent
    fields keep their stored value. Requires a valid authentication
    token.

    Args:
        request (Request): The incoming FastAPI request which may include any of
            ``username``, ``email`` or ``password``.

    Returns:
        Response: Bad request when the body carries none of the three
        fields; otherwise a FastAPI response indicating the update result.
    """
    title = "Patch user"
    responses = self.boilerplate_responses_initialised
    token_raw: Optional[str] = self.boilerplate_incoming_initialised.get_token_if_present(
        request
    )
    if not token_raw:
        return responses.no_access_token(title, token_raw)
    token: str = token_raw
    token_valid: bool = self.boilerplate_non_http_initialised.is_token_correct(
        token
    )
    self.disp.log_debug(f"token = {token}, valid = {token_valid}", title)
    if token_valid is False:
        return responses.unauthorized(title, token)
    request_body = await self.boilerplate_incoming_initialised.get_body(request)
    # Never write the clear-text password to the logs.
    if hasattr(request_body, "items"):
        loggable_body = {
            key: "<redacted>" if key == "password" else value
            for key, value in request_body.items()
        }
    else:
        loggable_body = request_body
    self.disp.log_debug(f"Request body: {loggable_body}", title)
    # A missing or unusable body carries no field to update.
    if not request_body or not hasattr(request_body, "get"):
        return responses.bad_request(title)
    # ``None`` marks "not provided", so that the stored value is kept.
    body_username: Optional[str] = request_body.get("username")
    body_email: Optional[str] = request_body.get("email")
    body_password: Optional[str] = request_body.get("password")
    if body_username is None and body_email is None and body_password is None:
        return responses.bad_request(title)
    usr_id = self.boilerplate_non_http_initialised.get_user_id_from_token(
        title, token
    )
    # The helper may hand back a ready-made response; forward it untouched.
    if isinstance(usr_id, Response):
        return usr_id
    if not usr_id:
        return responses.user_not_found(title, token)
    user_profile_raw: Union[int, List[Dict[str, Any]]] = self.database_link.get_data_from_table(
        table=CONST.TAB_ACCOUNTS,
        column="*",
        where=f"id='{usr_id}'",
    )
    # An int is the database layer's error signal.
    if isinstance(user_profile_raw, int):
        return responses.user_not_found(title, token)
    user_profile: List[Dict[str, Any]] = user_profile_raw
    self.disp.log_debug(f"User profile = {user_profile}", title)
    if len(user_profile) == 0:
        return responses.user_not_found(title, token)
    current = user_profile[0]
    email: str = current["email"]
    username: str = current["username"]
    password: str = current["password"]
    # Passwords (clear-text or hashed) are deliberately left out of the log.
    msg = f"body_username = {body_username}, body_email = {body_email}, "
    msg += f"password provided = {body_password is not None}, email = {email}, "
    msg += f"username = {username}"
    self.disp.log_debug(msg, title)
    if body_username is not None:
        username = body_username
        self.disp.log_debug(f"username is now: {username}", title)
    if body_email is not None:
        email = body_email
        self.disp.log_debug(f"email is now: {email}", title)
    if body_password is not None:
        password = self.password_handling_initialised.hash_password(
            body_password
        )
        self.disp.log_debug("password has been replaced", title)
    data: List[Union[str, int, float, None]] = [
        username, email, password,
        current["method"], current["favicon"],
        str(current["admin"])
    ]
    status = self.boilerplate_non_http_initialised.update_user_data(
        title, usr_id, data
    )
    # NOTE(review): every int status (bool included) is treated as a
    # failure here; confirm this against update_user_data's return values.
    if isinstance(status, int) or not status:
        return responses.update_failed(title, token)
    if isinstance(status, Response):
        return status
    data_response = responses.build_response_body(
        title=title,
        message="The account information has been updated.",
        resp="success",
        token=token,
        error=False
    )
    return HCI.success(
        content=data_response,
        content_type=HTTP_DEFAULT_TYPE,
        headers=self.server_headers_initialised.for_json()
    )
659
async def get_user(self, request: Request) -> Response:
    """Return the profile of the user owning the request's token.

    The fields listed in ``CONST.USER_INFO_BANNED`` are stripped from the
    profile and the ``CONST.USER_INFO_ADMIN_NODE`` entry, when present, is
    converted to a boolean before the profile is sent back.

    Args:
        request (Request): The incoming FastAPI request carrying an auth token.

    Returns:
        Response: A FastAPI response with the user profile on success, or
        the matching error response otherwise.
    """
    title = "Get user"
    responses = self.boilerplate_responses_initialised
    helpers = self.boilerplate_non_http_initialised
    raw_token: Optional[str] = self.boilerplate_incoming_initialised.get_token_if_present(
        request
    )
    if not raw_token:
        return responses.no_access_token(title, raw_token)
    token: str = raw_token
    is_valid: bool = helpers.is_token_correct(token)
    self.disp.log_debug(f"token = {token}, valid = {is_valid}", title)
    if is_valid is False:
        return responses.unauthorized(title, token)
    usr_id = helpers.get_user_id_from_token(title, token)
    self.disp.log_debug(f"user_id = {usr_id}", title)
    # The helper may hand back a ready-made response; forward it untouched.
    if isinstance(usr_id, Response):
        return usr_id
    rows: Union[int, List[Dict[str, Any]]] = self.database_link.get_data_from_table(
        table=CONST.TAB_ACCOUNTS,
        column="*",
        where=f"id='{usr_id}'",
    )
    # An int is the database layer's error signal.
    if isinstance(rows, int):
        return responses.internal_server_error(title, token)
    self.disp.log_debug(f"User profile = {rows}", title)
    if rows == self.error or len(rows) == 0:
        return responses.user_not_found(title, token)
    profile = rows[0]
    # Strip the fields that must not be sent back.
    for banned_field in CONST.USER_INFO_BANNED:
        profile.pop(banned_field, None)
    admin_key = CONST.USER_INFO_ADMIN_NODE
    if admin_key in profile:
        profile[admin_key] = bool(profile[admin_key])
    payload = responses.build_response_body(
        title=title,
        message="success",
        resp=profile,
        token=token,
        error=False
    )
    return HCI.success(
        content=payload,
        content_type=HTTP_DEFAULT_TYPE,
        headers=self.server_headers_initialised.for_json()
    )
717
async def delete_user(self, request: Request) -> Response:
    """Delete the account owning the request's token, with its related rows.

    The user's rows are first removed from the actions, connections and
    active OAuth tables, then the account row itself is removed. Requires
    a valid auth token.

    Args:
        request (Request): The incoming FastAPI request containing the auth token.

    Returns:
        Response: A FastAPI response indicating deletion success, or an
        internal server error when one of the removals fails.
    """
    title = "Delete user"
    responses = self.boilerplate_responses_initialised
    helpers = self.boilerplate_non_http_initialised
    raw_token: Optional[str] = self.boilerplate_incoming_initialised.get_token_if_present(
        request
    )
    if not raw_token:
        return responses.no_access_token(title, raw_token)
    token: str = raw_token
    is_valid: bool = helpers.is_token_correct(token)
    self.disp.log_debug(f"token = {token}, valid = {is_valid}", title)
    if is_valid is False:
        return responses.unauthorized(title, token)
    usr_id = helpers.get_user_id_from_token(title, token)
    self.disp.log_debug(f"user_id = {usr_id}", title)
    # The helper may hand back a ready-made response; forward it untouched.
    if isinstance(usr_id, Response):
        return usr_id
    rows: Union[int, List[Dict[str, Any]]] = self.database_link.get_data_from_table(
        table=CONST.TAB_ACCOUNTS,
        column="*",
        where=f"id='{usr_id}'",
    )
    # An int is the database layer's error signal.
    if isinstance(rows, int):
        return responses.internal_server_error(title, token)
    self.disp.log_debug(f"User profile = {rows}", title)
    if rows == self.error or len(rows) == 0:
        return responses.user_not_found(title, token)
    # Maintainer reminder, framed by separator lines: the table list below
    # has to be kept up to date by hand.
    separator = "-------------------------------------"
    for _ in range(4):
        self.disp.log_warning(separator)
    self.disp.log_warning(
        "Check table section and make sure that it is up to date."
    )
    for _ in range(4):
        self.disp.log_warning(separator)
    related_tables = [
        CONST.TAB_ACTIONS,
        CONST.TAB_CONNECTIONS,
        CONST.TAB_ACTIVE_OAUTHS
    ]
    removal_status = helpers.remove_user_from_tables(
        f"user_id={usr_id}", related_tables
    )
    # Either a bare int or an error code among the values means failure.
    if isinstance(removal_status, int) or self.error in removal_status.values():
        return responses.internal_server_error(title, token)
    account_removal = self.database_link.remove_data_from_table(
        CONST.TAB_ACCOUNTS, f"id={usr_id}"
    )
    if account_removal == self.error:
        return responses.internal_server_error(title, token)
    payload = responses.build_response_body(
        title=title,
        message="The account has successfully been deleted.",
        resp="success",
        token=token,
        error=False
    )
    return HCI.success(
        content=payload,
        content_type=HTTP_DEFAULT_TYPE,
        headers=self.server_headers_initialised.for_json()
    )
794
async def post_logout(self, request: Request) -> Response:
    """Log out the current user by removing the session token.

    Requires a valid token; the row of the connections table holding that
    token is removed.

    Args:
        request: The incoming FastAPI request carrying the auth token.

    Returns:
        Response: A FastAPI response indicating logout success, or an
        internal server error when the removal fails.
    """
    title = "Logout"
    responses = self.boilerplate_responses_initialised
    self.disp.log_debug("Checking is a token is present", title)
    raw_token: Optional[str] = self.boilerplate_incoming_initialised.get_token_if_present(
        request
    )
    if not raw_token:
        return responses.no_access_token(title, raw_token)
    token: str = raw_token
    self.disp.log_debug("Checking if the token is still valid", title)
    is_valid: bool = self.boilerplate_non_http_initialised.is_token_correct(
        token
    )
    self.disp.log_debug(f"token = {token}, valid = {is_valid}", title)
    if not is_valid:
        return responses.unauthorized(title, token)
    self.disp.log_debug("Attempting to remove data from the table", title)
    self.disp.log_debug(
        f"token: {token}, table: {CONST.TAB_CONNECTIONS}", title
    )
    removal = self.database_link.remove_data_from_table(
        CONST.TAB_CONNECTIONS,
        f"token='{token}'"
    )
    if removal == self.error:
        return responses.internal_server_error(title)
    self.disp.log_debug(
        "Informing the user that the removal was a success", title
    )
    payload = responses.build_response_body(
        title=title,
        message="You have successfully logged out...",
        resp="success",
        token=token,
        error=False
    )
    return HCI.success(
        content=payload,
        content_type=HTTP_DEFAULT_TYPE,
        headers=self.server_headers_initialised.for_json()
    )
843
async def get_user_id(self, request: Request) -> Response:
    """Return the id of the user owning the request's token.

    Validates the provided token, resolves the matching user id and sends
    it back in the response body.

    Args:
        request (Request): The incoming FastAPI request containing the auth token.

    Returns:
        Response: A FastAPI response with the user's id on success, or the
        matching error response otherwise.
    """
    title = "Get user id"
    responses = self.boilerplate_responses_initialised
    helpers = self.boilerplate_non_http_initialised
    raw_token: Optional[str] = self.boilerplate_incoming_initialised.get_token_if_present(
        request
    )
    if not raw_token:
        return responses.no_access_token(title, raw_token)
    token: str = raw_token
    is_valid: bool = helpers.is_token_correct(token)
    self.disp.log_debug(f"token = {token}, valid = {is_valid}", title)
    if is_valid is False:
        return responses.unauthorized(title, token)
    usr_id = helpers.get_user_id_from_token(title, token)
    self.disp.log_debug(f"user_id = {usr_id}", title)
    # The error code and an empty list both mean that no user was found.
    if usr_id == self.error:
        return responses.user_not_found(title, token)
    if isinstance(usr_id, list) and len(usr_id) == 0:
        return responses.user_not_found(title, token)
    # The helper may hand back a ready-made response; forward it untouched.
    if isinstance(usr_id, Response):
        return usr_id
    payload = responses.build_response_body(
        title=title,
        message=f"Your id is {usr_id}",
        resp=usr_id,
        token=token,
        error=False
    )
    return HCI.success(
        content=payload,
        content_type=HTTP_DEFAULT_TYPE,
        headers=self.server_headers_initialised.for_json()
    )
None __init__(self, int error=84, int success=0, bool debug=False)