Cat Feeder  1.0.0
The Cat feeder project
Loading...
Searching...
No Matches
oauth_authentication.py
Go to the documentation of this file.
1r"""
2# +==== BEGIN CatFeeder =================+
3# LOGO:
4# ..............(..../\
5# ...............)..(.')
6# ..............(../..)
7# ...............\‍(__)|
8# Inspired by Joan Stark
9# source https://www.asciiart.eu/
10# animals/cats
11# /STOP
12# PROJECT: CatFeeder
13# FILE: oauth_authentication.py
14# CREATION DATE: 11-10-2025
15# LAST Modified: 14:53:3 19-12-2025
16# DESCRIPTION:
17# This is the backend server in charge of making the actual website work.
18# /STOP
19# COPYRIGHT: (c) Cat Feeder
20# PURPOSE: The file that contains all the methods for the OAuth authentication.
21# // AR
22# +==== END CatFeeder =================+
23"""
24
25
26import uuid
27from datetime import datetime, timedelta
28from typing import Union, Dict
29import requests
30from fastapi import Response, Request
31from display_tty import Disp, initialise_logger
32from . import constants as CONST
33from ..server_header import ServerHeaders
34from ..sql import SQL
35from ..core import RuntimeManager, RI
36from ..http_codes import HCI, HTTP_DEFAULT_TYPE
37from ..boilerplates import BoilerplateIncoming, BoilerplateNonHTTP, BoilerplateResponses
38
39
41 """
42 The class that handles the OAuth authentication
43 """
44
45 disp: Disp = initialise_logger(__qualname__, False)
46
47 def __init__(self, success: int = 0, error: int = 84, debug: bool = False) -> None:
48 """_summary_
49 The constructor of the OAuth authentication class
50
51 Args:
52 runtime_data (RuntimeData): _description_: The initialised version of the runtime_data class.
53 success (int, optional): _description_. Defaults to 0.: The status code for success.
54 error (int, optional): _description_. Defaults to 84.: The status code for error.
55 debug (bool, optional): _description_. Defaults to False.: The info on if to activate debug mode.
56 """
57 # ------------------------ The logging function ------------------------
58 self.disp.update_disp_debug(debug)
59 self.disp.log_debug("Initialising...")
60 # -------------------------- Inherited values --------------------------
61 self.debug: bool = debug
62 self.success: int = success
63 self.error: int = error
64 self.runtime_manager_initialised: RuntimeManager = RI
65 # -------------------------- Shared instances --------------------------
68 BoilerplateNonHTTP
69 )
71 BoilerplateResponses)
73 BoilerplateIncoming)
75 ServerHeaders)
76 self.disp.log_debug("Initialised")
77
def _generate_oauth_authorization_url(self, provider: str) -> Union[int, str]:
    """
    Generate the OAuth authorization url for the given provider.

    The provider row is read from the OAuth connection table, a random
    `state` is stored in the verification table together with an
    expiration date, and the final url is returned with its reserved
    characters percent-encoded.

    Args:
        provider (str): The provider name, matched against `provider_name`.

    Returns:
        Union[int, str]: The encoded url, or `self.error` when the provider
        cannot be loaded or the state cannot be stored.
    """
    title = "generate_oauth_authorization_url"
    # NOTE(review): `provider` is placed in the WHERE clause unescaped;
    # confirm that the SQL layer sanitises it.
    retrieved_provider = self.database_link.get_data_from_table(
        CONST.TAB_USER_OAUTH_CONNECTION,
        "*",
        f"provider_name='{provider}'"
    )
    self.disp.log_debug(f"Retrived provider: {retrieved_provider}", title)
    # An empty result is handled like a lookup failure so that reading the
    # first row below cannot raise.
    if isinstance(retrieved_provider, int) or not retrieved_provider:
        self.disp.log_error("Unknown or Unsupported OAuth provider", title)
        return self.error
    provider_row = retrieved_provider[0]
    base_url = provider_row["authorisation_base_url"]
    client_id = provider_row["client_id"]
    scope = provider_row["provider_scope"]
    redirect_uri = CONST.REDIRECT_URI
    state = str(uuid.uuid4())
    columns = self.database_link.get_table_column_names(
        CONST.TAB_VERIFICATION
    )
    self.disp.log_debug(f"Columns list: {columns}", title)
    if isinstance(columns, int):
        return self.error
    # The first column is dropped because no value is supplied for it.
    columns.pop(0)
    expiration_time = self.boilerplate_non_http_initialised.set_lifespan(
        CONST.EMAIL_VERIFICATION_DELAY
    )
    et_str = self.database_link.datetime_to_string(expiration_time, False)
    self.disp.log_debug(f"Expiration time: {et_str}", title)
    data: list = ["state", state, et_str]
    if self.database_link.insert_data_into_table(
        CONST.TAB_VERIFICATION, data, columns
    ) == self.error:
        return self.error
    # The provider name travels with the state so that the callback can
    # split it back out.
    state = f"{state}:{provider}"
    if provider == "google":
        url = f"{base_url}?access_type=offline&client_id={client_id}&redirect_uri={redirect_uri}&prompt=consent"
    else:
        url = f"{base_url}?client_id={client_id}&redirect_uri={redirect_uri}"
    url += f"&response_type=code&scope={scope}&state={state}"
    # NOTE(review): the whole url (scheme and separators included) is
    # escaped, so the consumer presumably decodes it before use - confirm.
    escapes = str.maketrans({
        " ": "%20",
        ":": "%3A",
        "/": "%2F",
        "?": "%3F",
        "&": "%26",
    })
    url = url.translate(escapes)
    self.disp.log_debug(f"url = {url}", title)
    return url
128
def _exchange_code_for_token(self, provider: str, code: str):
    """
    Exchange the OAuth authorization code for an access token.

    Args:
        provider (str): The provider name, matched against `provider_name`.
        code (str): The authorization code received on the callback.

    Returns:
        The decoded JSON answer of the provider on success, otherwise the
        body built by `build_response_body` describing the failure.
    """
    title = "exchange_code_for_token"
    responses = self.boilerplate_responses_initialised

    retrieved_provider = self.database_link.get_data_from_table(
        CONST.TAB_USER_OAUTH_CONNECTION,
        "*",
        f"provider_name='{provider}'"
    )
    if isinstance(retrieved_provider, int):
        return responses.build_response_body(
            "exchange_code_for_token",
            "Internal server error.",
            "Internal server error.",
            None,
            True
        )
    provider_row = retrieved_provider[0]
    headers: dict = {
        "Accept": "application/json",
        "Content-Type": "application/x-www-form-urlencoded",
    }
    token_url = provider_row["token_grabber_base_url"]
    data: dict = {
        "client_id": provider_row["client_id"],
        "client_secret": provider_row["client_secret"],
        "code": code,
        "redirect_uri": CONST.REDIRECT_URI,
        "grant_type": "authorization_code",
    }
    try:
        response = requests.post(
            token_url, data=data, headers=headers, timeout=10
        )
        self.disp.log_debug(f"Exchange response = {response}", title)
        response.raise_for_status()
        token_response = response.json()
        if "error" in token_response:
            error_code = token_response["error"]
            # `error_description` is optional in an OAuth error answer, so
            # fall back to the error code instead of raising KeyError.
            description = token_response.get(
                "error_description", error_code
            )
            self.disp.log_error(f"OAuth error: {description}", title)
            return responses.build_response_body(
                "exchange_code_for_token",
                "Failed to get the token",
                f"{error_code}",
                None,
                True
            )
        return token_response
    except requests.RequestException as e:
        self.disp.log_error(f"RequestException: {str(e)}", title)
        return responses.build_response_body(
            "exchange_code_for_token",
            "HTTP request failed.",
            f"{str(e)}",
            None,
            True
        )
    except ValueError:
        self.disp.log_error("Failed to parse response JSON.", title)
        return responses.build_response_body(
            "exchange_code_for_token",
            "Invalid JSON response from provider.",
            "Invalid JSON",
            None,
            True
        )
196
def _get_user_info(self, provider: str, access_token: str):
    """
    Fetch the user information from the provider with the access token.

    Args:
        provider (str): The provider name, matched against `provider_name`.
        access_token (str): The bearer token sent to the provider.

    Returns:
        The decoded JSON payload of the provider. For the "github"
        provider, the first entry flagged `primary` is returned as
        `{"email": ...}`. On failure, the body built by
        `build_response_body` is returned instead.
    """
    title: str = "get_user_info"
    responses = self.boilerplate_responses_initialised
    retrieved_data = self.database_link.get_data_from_table(
        CONST.TAB_USER_OAUTH_CONNECTION,
        "*",
        f"provider_name='{provider}'"
    )
    self.disp.log_debug(
        f"Retrieved oauth provider data: {retrieved_data}", title
    )
    if isinstance(retrieved_data, int):
        return responses.build_response_body(
            "get_user_info",
            "Failed to fetch the OAuth provider information.",
            "Failed to fetch the OAuth provider information.",
            None,
            True
        )
    headers = {
        "Authorization": f"Bearer {access_token}"
    }
    user_info_url = retrieved_data[0]["user_info_base_url"]
    self.disp.log_debug(f"User info headers: {headers}", title)
    self.disp.log_debug(f"User info url: {user_info_url}", title)
    # Network and decoding failures are reported as an error body, the same
    # way the token exchange does, instead of escaping as exceptions.
    try:
        response = requests.get(user_info_url, headers=headers, timeout=10)
        if response.status_code != 200:
            # NOTE(review): the raw response object is handed over as
            # `resp`; confirm that the body builder can serialise it.
            return responses.build_response_body(
                "get_user_info",
                "Failed to retrieve the user email from the provider",
                response,
                None,
                True
            )
        user_info = response.json()
    except requests.RequestException as e:
        self.disp.log_error(f"RequestException: {str(e)}", title)
        return responses.build_response_body(
            "get_user_info",
            "HTTP request failed.",
            f"{str(e)}",
            None,
            True
        )
    except ValueError:
        self.disp.log_error("Failed to parse response JSON.", title)
        return responses.build_response_body(
            "get_user_info",
            "Invalid JSON response from provider.",
            "Invalid JSON",
            None,
            True
        )
    self.disp.log_debug(f"User info: {user_info}", title)
    if provider == "github":
        # The payload is iterated and the entry marked as primary wins.
        for info in user_info:
            if info["primary"]:
                return {"email": info["email"]}
    return user_info
241
def _oauth_user_logger(self, user_info: Dict, provider: str, connection_data: list) -> Response:
    """
    Create the account if needed, link it to the provider and log it in.

    Args:
        user_info (Dict): The provider payload; its "email" key is read.
        provider (str): The provider name, matched against the services
            table `name` column.
        connection_data (list): The token data of the connection. The
            provider id and the user id are appended to it in place.

    Returns:
        Response: The "accepted" response carrying the session token, or an
        internal server error response when a database step fails.
    """
    title: str = "oauth_user_logger"
    responses = self.boilerplate_responses_initialised
    email: str = user_info["email"]
    retrieved_user = self.database_link.get_data_from_table(
        CONST.TAB_ACCOUNTS,
        "*",
        f"email='{email}'"
    )
    self.disp.log_debug(f"Retrieved user: {retrieved_user}", title)
    user_exists = not isinstance(retrieved_user, int)

    if not user_exists:
        # Unknown e-mail: create the account, then read it back to get
        # its id.
        columns = self.database_link.get_table_column_names(
            CONST.TAB_ACCOUNTS
        )
        if isinstance(columns, int):
            return responses.internal_server_error(title, None)
        columns.pop(0)
        self.disp.log_debug(f"Columns list = {columns}", title)
        username: str = email.split('@')[0]
        user_data: list = [
            username,
            email,
            provider,
            provider,
            "NULL",
            str(int(False)),
        ]
        self.disp.log_debug(f"Data list = {user_data}", title)
        if self.database_link.insert_data_into_table(
            CONST.TAB_ACCOUNTS, user_data, columns
        ) == self.error:
            return responses.internal_server_error(title, None)
        retrieved_user = self.database_link.get_data_from_table(
            CONST.TAB_ACCOUNTS,
            "*",
            f"email='{email}'"
        )
        self.disp.log_debug(f"Retrieved user: {retrieved_user}", title)
        if isinstance(retrieved_user, int):
            return responses.internal_server_error(title, None)

    retrieved_provider = self.database_link.get_data_from_table(
        CONST.TAB_SERVICES,
        "*",
        f"name='{provider}'"
    )
    self.disp.log_debug(f"Retrieved provider: {retrieved_provider}", title)
    # The provider lookup itself must be checked here: indexing an integer
    # error code below would raise.
    if isinstance(retrieved_provider, int):
        return responses.internal_server_error(title, None)
    provider_id = str(retrieved_provider[0]["id"])
    user_id = str(retrieved_user[0]["id"])
    connection_data.append(provider_id)
    connection_data.append(user_id)
    self.disp.log_debug(f"Connection data: {connection_data}", title)

    # A freshly created account always needs the link; an existing account
    # only when no active connection is found for this provider.
    needs_link = True
    if user_exists:
        needs_link = isinstance(
            self.database_link.get_data_from_table(
                CONST.TAB_ACTIVE_OAUTHS,
                "*",
                f"service_id='{provider_id}' AND user_id='{user_id}'"
            ),
            int
        )
    if needs_link:
        columns = self.database_link.get_table_column_names(
            CONST.TAB_ACTIVE_OAUTHS
        )
        if isinstance(columns, int):
            return responses.internal_server_error(title, None)
        columns.pop(0)
        self.disp.log_debug(f"Columns list = {columns}", title)
        if self.database_link.insert_data_into_table(
            CONST.TAB_ACTIVE_OAUTHS,
            connection_data,
            columns
        ) == self.error:
            return responses.internal_server_error(title, None)
    return self._oauth_login_response(email, title)


def _oauth_login_response(self, email: str, title: str) -> Response:
    """
    Log the user in by e-mail and wrap the session token in a response.

    Args:
        email (str): The e-mail handed to `log_user_in`.
        title (str): The title used for the response body.

    Returns:
        Response: The "accepted" response with the token, or an internal
        server error response when the login reports `self.error`.
    """
    user_data = self.boilerplate_incoming_initialised.log_user_in(email)
    if user_data["status"] == self.error:
        return self.boilerplate_responses_initialised.internal_server_error(
            title, None
        )
    body = self.boilerplate_responses_initialised.build_response_body(
        title=title,
        message="User successfully logged in.",
        resp="success.",
        token=user_data["token"],
        error=False
    )
    body["token"] = user_data["token"]
    return HCI.accepted(
        body,
        content_type=HTTP_DEFAULT_TYPE,
        headers=self.server_headers_initialised.for_json()
    )
371
372 def _handle_token_response(self, token_response: Dict, provider: str) -> Response:
373 """
374 The function that handle the response given by the provider for the oauth token
375 """
376 title = "handle_token_response"
377 data: list = []
378 access_token: str = token_response["access_token"]
379 if not access_token:
380 return self.boilerplate_responses_initialised.no_access_token(title, None)
381 data.append(access_token)
382 self.disp.log_debug(f"Gotten access token: {access_token}", title)
383 if provider == "github":
384 data.append(
385 self.database_link.datetime_to_string(
386 datetime.now()
387 )
388 )
389 data.append("0")
390 data.append("NULL")
391 else:
392 expires: int = token_response["expires_in"]
393 if not expires:
394 body = self.boilerplate_responses_initialised.build_response_body(
395 title,
396 "The expiration time was not found in the provided response.",
397 "Expiration time not found.",
398 token=access_token,
399 error=True
400 )
401 return HCI.bad_request(
402 body,
403 content_type=HTTP_DEFAULT_TYPE,
404 headers=self.server_headers_initialised.for_json()
405 )
406 current_time = datetime.now()
407 new_time = current_time + timedelta(seconds=expires)
408 expiration_date = self.database_link.datetime_to_string(
409 new_time
410 )
411 data.append(expiration_date)
412 data.append(str(expires))
413 refresh_link = token_response["refresh_token"]
414 if not refresh_link:
415 body = self.boilerplate_responses_initialised.build_response_body(
416 title,
417 "The refresh link was not found in the provided response.",
418 "Refresh link not found.",
419 token=access_token,
420 error=True
421 )
422 return HCI.bad_request(
423 body,
424 content_type=HTTP_DEFAULT_TYPE,
425 headers=self.server_headers_initialised.for_json()
426 )
427 data.append(refresh_link)
428 self.disp.log_debug(
429 f"Generated data for new oauth connexion user: {data}", title)
430 user_info = self._get_user_info(provider, access_token)
431 if "error" in user_info:
432 body = self.boilerplate_responses_initialised.build_response_body(
433 title,
434 user_info["error"],
435 "internal error",
436 token=access_token,
437 error=True
438 )
439 return HCI.internal_server_error(
440 body,
441 content_type=HTTP_DEFAULT_TYPE,
442 headers=self.server_headers_initialised.for_json()
443 )
444 return self._oauth_user_logger(user_info, provider, data)
445
def refresh_token(self, provider_name: str, refresh_link: str) -> Union[str, None]:
    """
    Ask the provider for a new access token with a refresh token.

    Only provider names containing "google", "discord" or "spotify" are
    handled.

    Args:
        provider_name (str): The provider name, matched against
            `provider_name`.
        refresh_link (str): The refresh token sent to the provider.

    Returns:
        Union[str, None]: The new access token, or None when the provider
        is not handled, cannot be loaded, or the refresh request fails.
    """
    title: str = "refresh_token"
    if not any(word in provider_name for word in ("google", "discord", "spotify")):
        self.disp.log_error("The provider is not recognised", title)
        return None
    retrieved_data = self.database_link.get_data_from_table(
        CONST.TAB_USER_OAUTH_CONNECTION,
        "*",
        f"provider_name='{provider_name}'"
    )
    self.disp.log_debug(f"Retrieved provider data:{retrieved_data}", title)
    if isinstance(retrieved_data, int):
        self.disp.log_error(
            "An error has been detected when retrieving the provider data", title
        )
        return None
    provider_row = retrieved_data[0]
    token_url: str = provider_row["token_grabber_base_url"]
    generated_data: dict = {
        "client_id": provider_row["client_id"],
        "client_secret": provider_row["client_secret"],
        "refresh_token": refresh_link,
        "grant_type": "refresh_token",
    }
    # NOTE(review): this debug line contains the client secret and the
    # refresh token; consider masking them.
    self.disp.log_debug(f"Generated data: {generated_data}", title)
    # Network and decoding failures yield None like every other failure of
    # this method, instead of escaping as exceptions.
    try:
        provider_response = requests.post(
            token_url, data=generated_data, timeout=10
        )
        self.disp.log_debug(
            f"Provider response: {provider_response}", title
        )
        if provider_response.status_code != 200:
            self.disp.log_error(
                "The provider refused the refresh request "
                f"(status {provider_response.status_code})",
                title
            )
            return None
        token_response = provider_response.json()
    except requests.RequestException as e:
        self.disp.log_error(f"RequestException: {str(e)}", title)
        return None
    except ValueError:
        self.disp.log_error("Failed to parse response JSON.", title)
        return None
    self.disp.log_debug(
        f"Provider response to json: {token_response}", title
    )
    if "access_token" in token_response:
        return token_response["access_token"]
    self.disp.log_error(
        "No access token was found in the provider response", title
    )
    return None
488
async def oauth_callback(self, request: Request) -> Response:
    """
    Callback of the OAuth login.

    Reads `code` and `state` from the query parameters, checks the state
    against the verification table, removes it, exchanges the code for a
    token and hands the result to `_handle_token_response`.

    Args:
        request (Request): The incoming request; only its query parameters
            are read.

    Returns:
        Response: A bad request response when the parameters or the state
        are unusable or the token exchange fails, an internal server error
        response when the state cannot be found or removed, otherwise the
        response of `_handle_token_response`.
    """
    title = "oauth_callback"

    def _bad_request(message: str, resp: str) -> Response:
        # Every rejection in this handler has the same shape.
        body = self.boilerplate_responses_initialised.build_response_body(
            title=title,
            message=message,
            resp=resp,
            token=None,
            error=True
        )
        return HCI.bad_request(
            body,
            content_type=HTTP_DEFAULT_TYPE,
            headers=self.server_headers_initialised.for_json()
        )

    query_params = request.query_params
    if not query_params:
        return _bad_request(
            "Query parameters not provided.", "no query parameters"
        )
    self.disp.log_debug(f"Query params: {query_params}", title)
    code = query_params.get("code")
    self.disp.log_debug(f"Code: {code}", title)
    state = query_params.get("state")
    self.disp.log_debug(f"State: {state}", title)
    if not code or not state:
        return _bad_request(
            "Authorization code or state not provided.", "no code or state"
        )
    # `partition` never raises, whereas unpacking `split(":")` failed on a
    # state holding zero or several colons.
    uuid_gotten, separator, provider = state.partition(":")
    if not separator or not uuid_gotten or not provider or ":" in provider:
        return _bad_request("The state is in bad format.", "bad state format")
    # The state comes from the query string and ends up in a WHERE clause,
    # so only a well-formed UUID (what the login step generates) is let
    # through, in its canonical form.
    try:
        uuid_gotten = str(uuid.UUID(uuid_gotten))
    except ValueError:
        return _bad_request("The state is in bad format.", "bad state format")
    self.disp.log_debug(f"Uuid retrived: {uuid_gotten}", title)
    self.disp.log_debug(f"Provider: {provider}", title)
    data = self.database_link.get_data_from_table(
        CONST.TAB_VERIFICATION,
        "*",
        f"definition='{uuid_gotten}'"
    )
    self.disp.log_debug(f"Data received: {data}", title)
    if isinstance(data, int):
        return self.boilerplate_responses_initialised.internal_server_error(title, None)
    # Compare with the error code, as the other callers of
    # `drop_data_from_table` do; the former type test could not tell a
    # failure from a success.
    if self.database_link.drop_data_from_table(
        CONST.TAB_VERIFICATION,
        f"definition='{uuid_gotten}'"
    ) == self.error:
        return self.boilerplate_responses_initialised.internal_server_error(title, None)
    # NOTE(review): `provider` is also taken from the query string and is
    # used unescaped in the queries of the steps below - confirm that the
    # SQL layer sanitises it.
    token_response = self._exchange_code_for_token(provider, code)
    self.disp.log_debug(f"Token response: {token_response}", title)
    if "error" in token_response:
        return _bad_request(
            "Failed to get the token.", token_response["error"]
        )
    return self._handle_token_response(token_response, provider)
571
async def oauth_login(self, request: Request) -> Response:
    """
    Return the authorization url of the provider named in the request body.

    Args:
        request (Request): The incoming request; its body must hold a
            "provider" key.

    Returns:
        Response: A success response whose body carries
        "authorization_url", a bad request response when the provider is
        absent, or an internal server error response when the url cannot
        be generated.
    """
    title = "oauth_login"
    responses = self.boilerplate_responses_initialised
    request_body = await self.boilerplate_incoming_initialised.get_body(request)
    self.disp.log_debug(f"Request body: {request_body}", title)

    provider_given = bool(request_body) and "provider" in request_body
    if not provider_given:
        failure_body = responses.build_response_body(
            title=title,
            message="The provider is not found in the request.",
            resp="no provider",
            token=None,
            error=True
        )
        return HCI.bad_request(
            failure_body,
            content_type=HTTP_DEFAULT_TYPE,
            headers=self.server_headers_initialised.for_json()
        )

    provider = request_body["provider"]
    self.disp.log_debug(f"Oauth login provider: {provider}", title)
    authorization_url = self._generate_oauth_authorization_url(provider)
    self.disp.log_debug(f"Authorization url: {authorization_url}", title)
    # An integer is the error code of the generator, not a url.
    if isinstance(authorization_url, int):
        return responses.internal_server_error(title, None)

    success_body = responses.build_response_body(
        title=title,
        message="Authorization url successfully generated.",
        resp="success",
        token=None,
        error=False
    )
    success_body["authorization_url"] = authorization_url
    return HCI.success(
        success_body,
        content_type=HTTP_DEFAULT_TYPE,
        headers=self.server_headers_initialised.for_json()
    )
611
async def add_oauth_provider(self, request: Request, provider: str = "") -> Response:
    """
    Register a new OAuth provider in the database (admin only).

    Args:
        request (Request): The incoming request; its token and body are
            read.
        provider (str, optional): The name of the provider to add.
            Defaults to "".

    Returns:
        Response: A success response once the provider is stored and the
        matching service is flagged, otherwise a bad request, insufficient
        rights, conflict or internal server error response.
    """
    title: str = "add_oauth_provider"
    responses = self.boilerplate_responses_initialised
    required_keys = (
        "client_id",
        "client_secret",
        "provider_scope",
        "authorisation_base_url",
        "token_grabber_base_url",
        "user_info_base_url",
    )

    if not provider:
        missing_provider_body = responses.build_response_body(
            title=title,
            message="The provider is not provided.",
            resp="no provider",
            token=None,
            error=True
        )
        return HCI.bad_request(
            missing_provider_body,
            content_type=HTTP_DEFAULT_TYPE,
            headers=self.server_headers_initialised.for_json()
        )
    self.disp.log_debug(f"Provider: {provider}", title)

    token: str = self.boilerplate_incoming_initialised.get_token_if_present(
        request
    )
    if self.boilerplate_non_http_initialised.is_token_admin(token) is False:
        self.disp.log_error("You're not admin.", title)
        return responses.insuffisant_rights(title, token)

    existing_provider = self.database_link.get_data_from_table(
        CONST.TAB_USER_OAUTH_CONNECTION,
        "*",
        f"provider_name='{provider}'"
    )
    # Anything but an integer means that the lookup produced a result.
    if not isinstance(existing_provider, int):
        conflict_body = responses.build_response_body(
            title=title,
            message="The provider already exist in the database.",
            resp="provider already exist",
            token=token,
            error=True
        )
        return HCI.conflict(
            conflict_body,
            content_type=HTTP_DEFAULT_TYPE,
            headers=self.server_headers_initialised.for_json()
        )

    request_body = await self.boilerplate_incoming_initialised.get_body(request)
    body_complete = bool(request_body) and all(
        key in request_body for key in required_keys
    )
    if not body_complete:
        incomplete_body = responses.build_response_body(
            title=title,
            message="A variable is missing in the body.",
            resp="missing variable",
            token=token,
            error=True
        )
        return HCI.bad_request(
            incomplete_body,
            content_type=HTTP_DEFAULT_TYPE,
            headers=self.server_headers_initialised.for_json()
        )
    self.disp.log_debug(f"Request body: {request_body}", title)

    # The provider name comes first, followed by the body values in the
    # order of `required_keys`.
    row: list = [provider]
    row.extend(request_body[key] for key in required_keys)
    columns = self.database_link.get_table_column_names(
        CONST.TAB_USER_OAUTH_CONNECTION
    )
    if isinstance(columns, int):
        return responses.internal_server_error(title, token)
    columns.pop(0)
    self.disp.log_debug(f"Columns: {columns}", title)
    insert_status = self.database_link.insert_data_into_table(
        CONST.TAB_USER_OAUTH_CONNECTION, row, columns
    )
    if insert_status == self.error:
        return responses.internal_server_error(title, token)

    # Flag the service of the same name as having OAuth.
    update_status = self.database_link.update_data_in_table(
        CONST.TAB_SERVICES,
        ["1"],
        ["oauth"],
        f"name='{provider}'"
    )
    if update_status == self.error:
        return responses.internal_server_error(title, token)

    success_body = responses.build_response_body(
        title=title,
        message="The provider is successfully added.",
        resp="success",
        token=token,
        error=False
    )
    return HCI.success(
        success_body,
        content_type=HTTP_DEFAULT_TYPE,
        headers=self.server_headers_initialised.for_json()
    )
715
async def update_oauth_provider_data(self, request: Request, provider: str) -> Response:
    """
    Replace every stored value of an OAuth provider (admin only).

    Args:
        request (Request): The incoming request; its token and body are
            read. The body must hold all six provider fields.
        provider (str): The name of the provider to update.

    Returns:
        Response: A success response once the row is updated, otherwise a
        provider-not-given, insufficient rights, bad request or internal
        server error response.
    """
    title: str = "update_oauth_provider_data"
    responses = self.boilerplate_responses_initialised
    required_keys = (
        "client_id",
        "client_secret",
        "provider_scope",
        "authorisation_base_url",
        "token_grabber_base_url",
        "user_info_base_url",
    )
    if not provider:
        return responses.provider_not_given(title, None)
    self.disp.log_debug(f"Provider: {provider}", title)
    token: str = self.boilerplate_incoming_initialised.get_token_if_present(
        request
    )
    self.disp.log_debug(f"Token received: {token}", title)
    if self.boilerplate_non_http_initialised.is_token_admin(token) is False:
        self.disp.log_error("You're not admin.", title)
        return responses.insuffisant_rights(title, token)
    retrived_data = self.database_link.get_data_from_table(
        CONST.TAB_USER_OAUTH_CONNECTION,
        "*",
        f"provider_name='{provider}'"
    )
    # From here on the token is known, so error answers carry it, as the
    # add, patch and delete handlers do.
    if isinstance(retrived_data, int):
        return responses.internal_server_error(title, token)
    request_body = await self.boilerplate_incoming_initialised.get_body(request)
    if not request_body or not all(key in request_body for key in required_keys):
        body = responses.build_response_body(
            title=title,
            message="A variable is missing in the body.",
            resp="missing variable",
            token=token,
            error=True
        )
        return HCI.bad_request(
            body,
            content_type=HTTP_DEFAULT_TYPE,
            headers=self.server_headers_initialised.for_json()
        )
    self.disp.log_debug(f"Request body: {request_body}", title)
    data: list = [request_body[key] for key in required_keys]
    self.disp.log_debug(f"Generated data: {data}", title)
    columns: list = self.database_link.get_table_column_names(
        CONST.TAB_USER_OAUTH_CONNECTION
    )
    if isinstance(columns, int):
        return responses.internal_server_error(title, token)
    # Two leading columns are skipped: one more than on insertion, where
    # the provider name is supplied as the first value.
    columns.pop(0)
    columns.pop(0)
    self.disp.log_debug(f"Columns: {columns}", title)
    if self.database_link.update_data_in_table(
        CONST.TAB_USER_OAUTH_CONNECTION,
        data,
        columns,
        f"provider_name='{provider}'"
    ) == self.error:
        return responses.internal_server_error(title, token)
    body = responses.build_response_body(
        title=title,
        message="The provider is successfully updated.",
        resp="success",
        token=token,
        error=False
    )
    return HCI.success(
        body,
        content_type=HTTP_DEFAULT_TYPE,
        headers=self.server_headers_initialised.for_json()
    )
794
async def patch_oauth_provider_data(self, request: Request, provider: str) -> Response:
    """
    Update only the OAuth provider fields present in the body (admin only).

    Args:
        request (Request): The incoming request; its token and body are
            read. Any subset of the provider fields may be given.
        provider (str): The name of the provider to patch.

    Returns:
        Response: A success response once every given field is updated,
        otherwise a provider-not-given, insufficient rights, missing
        variable or internal server error response.
    """
    # The title used to be the one of the full update handler, which
    # mislabelled the logs and the response bodies of this endpoint.
    title: str = "patch_oauth_provider_data"
    responses = self.boilerplate_responses_initialised
    if not provider:
        return responses.provider_not_given(title, None)
    self.disp.log_debug(f"Provider: {provider}", title)
    token: str = self.boilerplate_incoming_initialised.get_token_if_present(
        request
    )
    self.disp.log_debug(f"Token gotten: {token}", title)
    if self.boilerplate_non_http_initialised.is_token_admin(token) is False:
        self.disp.log_error("You're not admin.", title)
        return responses.insuffisant_rights(title, token)
    retrived_data = self.database_link.get_data_from_table(
        CONST.TAB_USER_OAUTH_CONNECTION,
        "*",
        f"provider_name='{provider}'"
    )
    if isinstance(retrived_data, int):
        return responses.internal_server_error(title, token)
    request_body = await self.boilerplate_incoming_initialised.get_body(request)
    if not request_body:
        return responses.missing_variable_in_body(title, token)
    self.disp.log_debug(f"Request body: {request_body}", title)
    # "provider_name" is handled last: every update passes the current
    # provider name alongside the "provider_name" column (presumably as the
    # row selector - confirm against `update_single_data`), so renaming
    # first would leave the remaining updates pointing at a stale name.
    patchable_fields = (
        "client_id",
        "client_secret",
        "provider_scope",
        "authorisation_base_url",
        "token_grabber_base_url",
        "user_info_base_url",
        "provider_name",
    )
    for field in patchable_fields:
        if field not in request_body:
            continue
        if self.boilerplate_non_http_initialised.update_single_data(
            CONST.TAB_USER_OAUTH_CONNECTION,
            "provider_name",
            field,
            provider,
            request_body
        ) == self.error:
            return responses.internal_server_error(title, token)
    body = responses.build_response_body(
        title=title,
        message="The provider is successfully updated.",
        resp="success",
        token=token,
        error=False
    )
    return HCI.success(
        body,
        content_type=HTTP_DEFAULT_TYPE,
        headers=self.server_headers_initialised.for_json()
    )
892
async def delete_oauth_provider(self, request: Request, provider: str) -> Response:
    """
    Delete an OAuth provider from the database (admin only).

    The active connections of the matching service are dropped, the
    service is flagged as no longer having OAuth, then the provider row is
    removed.

    Args:
        request (Request): The incoming request; only its token is read.
        provider (str): The name of the provider to delete.

    Returns:
        Response: A success response once everything is removed, otherwise
        a provider-not-given, insufficient rights or internal server error
        response.
    """
    title: str = "delete_oauth_provider"
    responses = self.boilerplate_responses_initialised
    if not provider:
        return responses.provider_not_given(title, None)
    self.disp.log_debug(f"Provider: {provider}", title)
    token: str = self.boilerplate_incoming_initialised.get_token_if_present(
        request
    )
    self.disp.log_debug(f"Token gotten: {token}", title)
    if self.boilerplate_non_http_initialised.is_token_admin(token) is False:
        self.disp.log_error("You're not admin.", title)
        return responses.insuffisant_rights(title, token)
    retrived_data = self.database_link.get_data_from_table(
        CONST.TAB_USER_OAUTH_CONNECTION,
        "*",
        f"provider_name='{provider}'"
    )
    if isinstance(retrived_data, int):
        return responses.internal_server_error(title, token)
    retrieved_service_id = self.database_link.get_data_from_table(
        CONST.TAB_SERVICES,
        "id",
        f"name='{provider}'"
    )
    if isinstance(retrieved_service_id, int):
        return responses.internal_server_error(title, token)
    service_id = str(retrieved_service_id[0]["id"])
    if self.database_link.drop_data_from_table(
        CONST.TAB_ACTIVE_OAUTHS,
        f"service_id='{service_id}'"
    ) == self.error:
        return responses.internal_server_error(title, token)
    # Values and columns are passed as lists, the form the add handler uses
    # for the very same flag.
    if self.database_link.update_data_in_table(
        CONST.TAB_SERVICES,
        ["0"],
        ["oauth"],
        f"id='{service_id}'"
    ) == self.error:
        return responses.internal_server_error(title, token)
    if self.database_link.drop_data_from_table(
        CONST.TAB_USER_OAUTH_CONNECTION,
        f"provider_name='{provider}'"
    ) == self.error:
        return responses.internal_server_error(title, token)
    body = responses.build_response_body(
        title=title,
        message="The oauth provider has been deleted successfully.",
        resp="success",
        token=token,
        error=False
    )
    return HCI.success(
        body,
        content_type=HTTP_DEFAULT_TYPE,
        headers=self.server_headers_initialised.for_json()
    )
None __init__(self, int success=0, int error=84, bool debug=False)
Response update_oauth_provider_data(self, Request request, str provider)
Response add_oauth_provider(self, Request request, str provider="")
Response _handle_token_response(self, Dict token_response, str provider)
Response _oauth_user_logger(self, Dict user_info, str provider, list connection_data)
Response patch_oauth_provider_data(self, Request request, str provider)
Union[str, None] refresh_token(self, str provider_name, str refresh_link)
Response delete_oauth_provider(self, Request request, str provider)