80 Generate an OAuth authorization url depends on the given provider
82 title =
"generate_oauth_authorization_url"
84 CONST.TAB_USER_OAUTH_CONNECTION,
86 f
"provider_name='{provider}'"
88 self.
disp.log_debug(f
"Retrived provider: {retrived_provider}", title)
89 if isinstance(retrived_provider, int):
90 self.
disp.log_error(
"Unknown or Unsupported OAuth provider", title)
92 base_url = retrived_provider[0][
"authorisation_base_url"]
93 client_id = retrived_provider[0][
"client_id"]
94 scope = retrived_provider[0][
"provider_scope"]
95 redirect_uri = CONST.REDIRECT_URI
96 state = str(uuid.uuid4())
98 CONST.TAB_VERIFICATION)
99 self.
disp.log_debug(f
"Columns list: {columns}", title)
100 if isinstance(columns, int):
104 CONST.EMAIL_VERIFICATION_DELAY)
106 expiration_time,
False)
107 self.
disp.log_debug(f
"Expiration time: {et_str}", et_str)
112 if self.
database_link.insert_data_into_table(CONST.TAB_VERIFICATION, data, columns) == self.
error:
116 if provider ==
"google":
117 url = f
"{base_url}?access_type=offline&client_id={client_id}&redirect_uri={redirect_uri}&prompt=consent"
119 url = f
"{base_url}?client_id={client_id}&redirect_uri={redirect_uri}"
120 url += f
"&response_type=code&scope={scope}&state={state}"
121 url = url.replace(
" ",
"%20")
122 url = url.replace(
":",
"%3A")
123 url = url.replace(
"/",
"%2F")
124 url = url.replace(
"?",
"%3F")
125 url = url.replace(
"&",
"%26")
126 self.
disp.log_debug(f
"url = {url}", title)
131 Exchange the OAuth authorization code for an access token
133 title =
"exchange_code_for_token"
136 CONST.TAB_USER_OAUTH_CONNECTION,
138 f
"provider_name='{provider}'"
140 if isinstance(retrieved_provider, int):
142 "exchange_code_for_token",
143 "Internal server error.",
144 "Internal server error.",
149 headers[
"Accept"] =
"application/json"
150 headers[
"Content-Type"] =
"application/x-www-form-urlencoded"
151 token_url = retrieved_provider[0][
"token_grabber_base_url"]
154 data[
"client_id"] = retrieved_provider[0][
"client_id"]
155 data[
"client_secret"] = retrieved_provider[0][
"client_secret"]
157 data[
"redirect_uri"] = CONST.REDIRECT_URI
158 data[
"grant_type"] =
"authorization_code"
160 response = requests.post(
161 token_url, data=data, headers=headers, timeout=10
163 self.
disp.log_debug(f
"Exchange response = {response}", title)
164 response.raise_for_status()
165 token_response = response.json()
166 if "error" in token_response:
167 msg =
"OAuth error: "
168 msg += f
"{token_response['error_description']}"
169 self.
disp.log_error(msg, title)
171 "exchange_code_for_token",
172 "Failed to get the token",
173 f
"{token_response['error']}",
177 return token_response
178 except requests.RequestException
as e:
179 self.
disp.log_error(f
"RequestException: {str(e)}", title)
181 "exchange_code_for_token",
182 "HTTP request failed.",
188 self.
disp.log_error(
"Failed to parse response JSON.", title)
190 "exchange_code_for_token",
191 "Invalid JSON response from provider.",
199 Get a user information depending
201 title: str =
"get_user_info"
203 CONST.TAB_USER_OAUTH_CONNECTION,
205 f
"provider_name='{provider}'"
208 f
"Retrieved oauth provider data: {retrieved_data}", title)
209 if isinstance(retrieved_data, int):
212 "Failed to fetch the OAuth provider information.",
213 "Failed to fetch the OAuth provider information.",
218 "Authorization": f
"Bearer {access_token}"
220 user_info_url = retrieved_data[0][
"user_info_base_url"]
221 self.
disp.log_debug(f
"User info headers: {headers}", title)
222 self.
disp.log_debug(f
"User info url: {user_info_url}", title)
223 response = requests.get(user_info_url, headers=headers, timeout=10)
224 if response.status_code != 200:
227 "Failed to retrieve the user email from the provider",
232 user_info = response.json()
233 self.
disp.log_debug(f
"User info: {user_info}", title)
234 if provider ==
"github":
235 for _, info
in enumerate(user_info):
238 email[
"email"] = info[
"email"]
244 The function to insert or update the user information in the database
246 title: str =
"oauth_user_logger"
247 email: str = user_info[
"email"]
253 self.
disp.log_debug(f
"Retrieved user: {retrieved_user}", title)
254 if isinstance(retrieved_user, int)
is False:
260 msg =
"Retrieved provider: "
261 msg += f
"{retrieved_provider}"
262 self.
disp.log_debug(msg, title)
263 if isinstance(retrieved_user, int):
265 connection_data.append(str(retrieved_provider[0][
"id"]))
266 connection_data.append(str(retrieved_user[0][
"id"]))
267 self.
disp.log_debug(f
"Connection data: {connection_data}", title)
269 provider_id = str(retrieved_provider[0][
"id"])
270 user_id = str(retrieved_user[0][
"id"])
272 CONST.TAB_ACTIVE_OAUTHS,
274 f
"service_id='{provider_id}' AND user_id='{user_id}'"
277 CONST.TAB_ACTIVE_OAUTHS)
278 if isinstance(columns, int):
281 self.
disp.log_debug(f
"Columns list = {columns}", title)
283 CONST.TAB_ACTIVE_OAUTHS,
291 if user_data[
"status"] == self.
error:
295 message=
"User successfully logged in.",
297 token=user_data[
"token"],
300 body[
"token"] = user_data[
"token"]
303 content_type=HTTP_DEFAULT_TYPE,
308 if isinstance(columns, int):
311 self.
disp.log_debug(f
"Columns list = {columns}", title)
312 username: str = email.split(
'@')[0]
314 user_data.append(username)
315 user_data.append(email)
316 user_data.append(provider)
317 user_data.append(provider)
318 user_data.append(
"NULL")
319 user_data.append(str(int(
False)))
320 self.
disp.log_debug(f
"Data list = {user_data}", title)
321 if self.
database_link.insert_data_into_table(CONST.TAB_ACCOUNTS, user_data, columns) == self.
error:
328 self.
disp.log_debug(f
"Retrieved user: {retrieved_user}", title)
329 if isinstance(retrieved_user, int):
336 self.
disp.log_debug(f
"Retrieved provider: {retrieved_provider}", title)
337 if isinstance(retrieved_user, int):
339 connection_data.append(str(retrieved_provider[0][
"id"]))
340 connection_data.append(str(retrieved_user[0][
"id"]))
341 self.
disp.log_debug(f
"Connection data: {connection_data}", title)
343 CONST.TAB_ACTIVE_OAUTHS)
344 if isinstance(columns, int):
347 self.
disp.log_debug(f
"Columns list = {columns}", title)
349 CONST.TAB_ACTIVE_OAUTHS,
356 if user_data[
"status"] == self.
error:
360 message=
"User successfully logged in.",
362 token=user_data[
"token"],
365 body[
"token"] = user_data[
"token"]
368 content_type=HTTP_DEFAULT_TYPE,
374 The function that handle the response given by the provider for the oauth token
376 title =
"handle_token_response"
378 access_token: str = token_response[
"access_token"]
381 data.append(access_token)
382 self.
disp.log_debug(f
"Gotten access token: {access_token}", title)
383 if provider ==
"github":
392 expires: int = token_response[
"expires_in"]
396 "The expiration time was not found in the provided response.",
397 "Expiration time not found.",
401 return HCI.bad_request(
403 content_type=HTTP_DEFAULT_TYPE,
406 current_time = datetime.now()
407 new_time = current_time + timedelta(seconds=expires)
411 data.append(expiration_date)
412 data.append(str(expires))
413 refresh_link = token_response[
"refresh_token"]
417 "The refresh link was not found in the provided response.",
418 "Refresh link not found.",
422 return HCI.bad_request(
424 content_type=HTTP_DEFAULT_TYPE,
427 data.append(refresh_link)
429 f
"Generated data for new oauth connexion user: {data}", title)
431 if "error" in user_info:
439 return HCI.internal_server_error(
441 content_type=HTTP_DEFAULT_TYPE,
446 def refresh_token(self, provider_name: str, refresh_link: str) -> Union[str,
None]:
448 The function that use the given provider name and refresh link to generate a new token for oauth authentication
450 title: str =
"refresh_token"
451 if any(word
in provider_name
for word
in (
"google",
"discord",
"spotify")):
453 CONST.TAB_USER_OAUTH_CONNECTION,
455 f
"provider_name='{provider_name}'"
457 msg =
"Retrieved provider data:"
458 msg += f
"{retrieved_data}"
459 self.
disp.log_debug(msg, title)
460 if isinstance(retrieved_data, int):
462 "An error has been detected when retrieving the provider data", title
465 token_url: str = retrieved_data[0][
"token_grabber_base_url"]
466 generated_data: dict = {}
467 generated_data[
"client_id"] = retrieved_data[0][
"client_id"]
468 generated_data[
"client_secret"] = retrieved_data[0][
"client_secret"]
469 generated_data[
"refresh_token"] = refresh_link
470 generated_data[
"grant_type"] =
"refresh_token"
471 self.
disp.log_debug(f
"Generated data: {generated_data}", title)
472 provider_response: Response = requests.post(
473 token_url, data=generated_data, timeout=10
476 f
"Provider response: {provider_response}", title)
477 if provider_response.status_code == 200:
478 token_response = provider_response.json()
479 msg =
"Provider response to json: "
480 msg += f
"{token_response}"
481 self.
disp.log_debug(msg, title)
482 if "access_token" in token_response:
483 return token_response[
"access_token"]
486 self.
disp.log_error(
"The provider is not recognised", title)
491 Callback of the OAuth login
493 title =
"oauth_callback"
494 query_params = request.query_params
498 message=
"Query parameters not provided.",
499 resp=
"no query parameters",
503 return HCI.bad_request(
505 content_type=HTTP_DEFAULT_TYPE,
508 self.
disp.log_debug(f
"Query params: {query_params}", title)
509 code = query_params.get(
"code")
510 self.
disp.log_debug(f
"Code: {code}", title)
511 state = query_params.get(
"state")
512 self.
disp.log_debug(f
"State: {state}", title)
513 if not code
or not state:
516 message=
"Authorization code or state not provided.",
517 resp=
"no code or state",
521 return HCI.bad_request(
523 content_type=HTTP_DEFAULT_TYPE,
526 uuid_gotten, provider = state.split(
":")
527 if not uuid_gotten
or not provider:
530 message=
"The state is in bad format.",
531 resp=
"bad state format",
535 return HCI.bad_request(
537 content_type=HTTP_DEFAULT_TYPE,
540 self.
disp.log_debug(f
"Uuid retrived: {uuid_gotten}", title)
541 self.
disp.log_debug(f
"Provider: {provider}", title)
543 CONST.TAB_VERIFICATION,
545 f
"definition='{uuid_gotten}'"
547 self.
disp.log_debug(f
"Data received: {data}", title)
548 if isinstance(data, int):
551 CONST.TAB_VERIFICATION,
552 f
"definition='{uuid_gotten}'"
556 self.
disp.log_debug(f
"Token response: {token_response}", title)
557 if "error" in token_response:
560 message=
"Failed to get the token.",
561 resp=token_response[
"error"],
565 return HCI.bad_request(
567 content_type=HTTP_DEFAULT_TYPE,
574 Get the authorization url for the OAuth login depending on the provider
576 title =
"oauth_login"
578 self.
disp.log_debug(f
"Request body: {request_body}", title)
579 if not request_body
or "provider" not in request_body:
582 message=
"The provider is not found in the request.",
587 return HCI.bad_request(
589 content_type=HTTP_DEFAULT_TYPE,
592 provider = request_body[
"provider"]
593 self.
disp.log_debug(f
"Oauth login provider: {provider}", title)
595 self.
disp.log_debug(f
"Authorization url: {authorization_url}", title)
596 if isinstance(authorization_url, int):
600 message=
"Authorization url successfully generated.",
605 body[
"authorization_url"] = authorization_url
608 content_type=HTTP_DEFAULT_TYPE,
614 Add a new oauth provider to the database (only for admin)
616 title: str =
"add_oauth_provider"
620 message=
"The provider is not provided.",
625 return HCI.bad_request(
627 content_type=HTTP_DEFAULT_TYPE,
630 self.
disp.log_debug(f
"Provider: {provider}", title)
635 self.
disp.log_error(
"You're not admin.", title)
638 CONST.TAB_USER_OAUTH_CONNECTION,
640 f
"provider_name='{provider}'"
642 if isinstance(retrived_data, int)
is False:
645 message=
"The provider already exist in the database.",
646 resp=
"provider already exist",
652 content_type=HTTP_DEFAULT_TYPE,
656 if not request_body
or not all(key
in request_body
for key
in (
"client_id",
"client_secret",
"provider_scope",
"authorisation_base_url",
"token_grabber_base_url",
"user_info_base_url")):
659 message=
"A variable is missing in the body.",
660 resp=
"missing variable",
664 return HCI.bad_request(
666 content_type=HTTP_DEFAULT_TYPE,
669 self.
disp.log_debug(f
"Request body: {request_body}", title)
670 client_id = request_body[
"client_id"]
671 client_secret = request_body[
"client_secret"]
672 provider_scope = request_body[
"provider_scope"]
673 authorisation_base_url = request_body[
"authorisation_base_url"]
674 token_grabber_base_url = request_body[
"token_grabber_base_url"]
675 user_info_base_url = request_body[
"user_info_base_url"]
681 authorisation_base_url,
682 token_grabber_base_url,
686 CONST.TAB_USER_OAUTH_CONNECTION
688 if isinstance(columns, int):
691 self.
disp.log_debug(f
"Columns: {columns}", title)
692 if self.
database_link.insert_data_into_table(CONST.TAB_USER_OAUTH_CONNECTION, data, columns) == self.
error:
695 column: list = [
"oauth"]
705 message=
"The provider is successfully added.",
712 content_type=HTTP_DEFAULT_TYPE,
718 The function that modify every value of an oauth provider
720 title: str =
"update_oauth_provider_data"
723 self.
disp.log_debug(f
"Provider: {provider}", title)
727 self.
disp.log_debug(f
"Token received: {token}", title)
729 self.
disp.log_error(
"You're not admin.", title)
732 CONST.TAB_USER_OAUTH_CONNECTION,
734 f
"provider_name='{provider}'")
735 if isinstance(retrived_data, int):
738 if not request_body
or not all(key
in request_body
for key
in (
"client_id",
"client_secret",
"provider_scope",
"authorisation_base_url",
"token_grabber_base_url",
"user_info_base_url")):
741 message=
"A variable is missing in the body.",
742 resp=
"missing variable",
746 return HCI.bad_request(
748 content_type=HTTP_DEFAULT_TYPE,
751 self.
disp.log_debug(f
"Request body: {request_body}", title)
752 client_id = request_body[
"client_id"]
753 client_secret = request_body[
"client_secret"]
754 provider_scope = request_body[
"provider_scope"]
755 authorisation_base_url = request_body[
"authorisation_base_url"]
756 token_grabber_base_url = request_body[
"token_grabber_base_url"]
757 user_info_base_url = request_body[
"user_info_base_url"]
762 authorisation_base_url,
763 token_grabber_base_url,
766 self.
disp.log_debug(f
"Generated data: {data}", title)
768 CONST.TAB_USER_OAUTH_CONNECTION
770 if isinstance(columns, int):
774 self.
disp.log_debug(f
"Columns: {columns}", title)
776 CONST.TAB_USER_OAUTH_CONNECTION,
779 f
"provider_name='{provider}'"
784 message=
"The provider is successfully updated.",
791 content_type=HTTP_DEFAULT_TYPE,
797 The function that modify every value of an oauth provider
799 title: str =
"update_oauth_provider_data"
802 self.
disp.log_debug(f
"Provider: {provider}", title)
806 self.
disp.log_debug(f
"Token gotten: {token}", title)
808 self.
disp.log_error(
"You're not admin.", title)
811 CONST.TAB_USER_OAUTH_CONNECTION,
813 f
"provider_name='{provider}'"
815 if isinstance(retrived_data, int):
820 self.
disp.log_debug(f
"Request body: {request_body}", title)
821 if "provider_name" in request_body:
823 CONST.TAB_USER_OAUTH_CONNECTION,
830 if "client_id" in request_body:
832 CONST.TAB_USER_OAUTH_CONNECTION,
839 if "client_secret" in request_body:
841 CONST.TAB_USER_OAUTH_CONNECTION,
848 if "provider_scope" in request_body:
850 CONST.TAB_USER_OAUTH_CONNECTION,
857 if "authorisation_base_url" in request_body:
859 CONST.TAB_USER_OAUTH_CONNECTION,
861 "authorisation_base_url",
866 if "token_grabber_base_url" in request_body:
868 CONST.TAB_USER_OAUTH_CONNECTION,
870 "token_grabber_base_url",
875 if "user_info_base_url" in request_body:
877 CONST.TAB_USER_OAUTH_CONNECTION,
879 "user_info_base_url",
886 message=
"The provider is successfully updated.",
895 The function to delete an oauth provider from the database
897 title: str =
"delete_oauth_provider"
903 self.
disp.log_debug(f
"Provider: {provider}", title)
907 self.
disp.log_debug(f
"Token gotten: {token}", title)
909 self.
disp.log_error(
"You're not admin.", title)
915 CONST.TAB_USER_OAUTH_CONNECTION,
917 f
"provider_name='{provider}'"
919 if isinstance(retrived_data, int):
924 retrieved_service_id = self.
database_link.get_data_from_table(
929 if isinstance(retrieved_service_id, int):
934 service_id = str(retrieved_service_id[0][
"id"])
936 CONST.TAB_ACTIVE_OAUTHS,
937 f
"service_id='{service_id}'"
954 CONST.TAB_USER_OAUTH_CONNECTION,
955 f
"provider_name='{provider}'"
963 message=
"The oauth provider has been deleted successfully.",
969 content_type=HTTP_DEFAULT_TYPE,