2# +==== BEGIN CatFeeder =================+
5# ...............)..(.')
7# ...............\(__)|
8# Inspired by Joan Stark
9# source https://www.asciiart.eu/
13# FILE: sql_query_boilerplates.py
14# CREATION DATE: 11-10-2025
15# LAST Modified: 1:39:32 06-02-2026
17# This is the backend server in charge of making the actual website work.
19# COPYRIGHT: (c) Cat Feeder
21# File containing the interface between an SQL library and the program.
22# This contains functions that simplify the process of interacting with databases as well as checking for injection attempts.
25# +==== END CatFeeder =================+
29from typing
import List, Dict, Union, Any, Tuple, Optional, Literal, Set, overload
33from display_tty
import Disp, initialise_logger
36from .
import sql_constants
as SCONST
37from .sql_injection
import SQLInjection
38from .sql_connections
import SQLManageConnections
39from .sql_sanitisation_functions
import SQLSanitiseFunctions
43 """Provide reusable SQL query templates and helpers.
45 This class contains methods for generating common SQL queries, such as
46 creating tables, inserting data, and fetching results. It simplifies
47 database interactions by abstracting repetitive query patterns.
50 disp (Disp): Logger instance for debugging and error reporting.
51 sql_pool (SQLManageConnections): Connection manager for database operations.
52 error (int): Numeric error code.
53 success (int): Numeric success code.
54 debug (bool): Debug mode flag.
55 db_version (Optional[Tuple[int, int, int]]): Parsed database version.
56 sql_injection (SQLInjection): SQL injection protection utility.
57 sanitize_functions (SQLSanitiseFunctions): SQL sanitization utility.
60 disp: Disp = initialise_logger(__qualname__,
False)
62 def __init__(self, sql_pool: SQLManageConnections, success: int = 0, error: int = 84, debug: bool =
False) ->
None:
63 """Initialize the query helper.
66 sql_pool (SQLManageConnections): Connection manager used to run queries and commands.
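67 redis_cacher (Optional[RedisCaching]): Optional Redis caching layer to enable transparent caching. NOTE(review): this parameter is not part of the current ``__init__`` signature; confirm whether this entry is stale.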
68 success (int, optional): Numeric success code. Defaults to 0.
69 error (int, optional): Numeric error code. Defaults to 84.
70 debug (bool, optional): Enable debug logging. Defaults to False.
73 self.
disp.update_disp_debug(debug)
74 self.
disp.log_debug(
"Initialising...")
90'[^']*' | # single-quoted strings
91"[^"]*" | # double-quoted strings
92<=|>=|!=|=|<|> | # comparison operators
93\(|\) | # parentheses
95\bAND\b|\bOR\b|\bIN\b|\bLIKE\b|\bIS\b|\bNOT\b | # SQL keywords
96[A-Za-z_][A-Za-z0-9_]* | # identifiers
97-?\d+(?:\.\d+)? # integers and floats (including negatives)
99 re.IGNORECASE | re.VERBOSE,
103 '(',
')',
',',
'OR',
'AND',
'=',
'!=',
'<',
'>',
'<=',
104 '>=',
'LIKE',
'IN',
'NOT',
'IS',
'NULL',
'TRUE',
'FALSE'
109 '=',
'!=',
'<',
'>',
'<=',
'>='
113 "SQL injection detected in WHERE clause"
125 self.
disp.log_debug(
"Initialised")
128 """Normalize a cell value for parameter binding.
130 Converts special tokens (like 'now', 'current_date') to their appropriate SQL values, preserves numeric types, and returns None for null-like inputs.
133 cell (object): The cell value to normalize.
136 Union[str, None, int, float]: Normalized value suitable for SQL parameters.
140 if isinstance(cell, (int, float)):
144 if sl
in (
"now",
"now()"):
146 if sl
in (
"current_date",
"current_date()"):
151 """Tokenize a WHERE clause into SQL tokens.
153 Splits a clause into individual tokens such as identifiers, operators, numbers, strings, and SQL keywords.
156 clause (str): The SQL WHERE clause to tokenize.
159 List[str]: List of tokens extracted from the clause.
164 """Check if the given token represents a number
167 token (str): The token to check
170 bool: True if the token represents a number, False otherwise
175 """Escape column names that could be considered as SQL keywords instead of text.
178 token (str): The column part of the token
181 str: The token (escaped if necessary)
183 token_lower = token.lower()
188 self.
disp.log_debug(f
"Escaping risky column name {token}")
193 """Remove at most one leading and/or trailing quote character.
195 Preserves inner quotes and does not affect the string if no quotes are present.
198 value (str): The string from which to remove outer quotes.
201 str: String with outer quotes removed if present.
206 self.
disp.log_debug(f
"Stripping outer quotes for {value}")
207 if value[0]
in (
"'",
'"'):
210 if value
and value[-1]
in (
"'",
'"'):
212 self.
disp.log_debug(f
"Outer quotes stripped for {value}")
217 """Validate a single WHERE clause token for SQL injection.
219 Checks whether the token is safe (operators, keywords, numbers, or quoted literals) and raises an error if an injection risk is detected.
222 token (str): Token from the WHERE clause to validate.
225 RuntimeError: If the token is determined to be unsafe.
232 if raw.startswith(
"`")
and raw.endswith(
"`"):
245 raw.startswith(
"'")
and raw.endswith(
"'")
247 raw.startswith(
'"')
and raw.endswith(
'"')
252 check_token = raw.strip(
"'\"")
254 if self.
sql_injection.check_if_symbol_and_command_injection(check_token):
256 f
"SQL injection detected in WHERE token: {raw}"
261 """Check if a string is surrounded by single or double quotes.
264 value (str): The string to check.
267 bool: True if the string starts and ends with the same quote character
268 (either single `'` or double `"`), False otherwise.
270 if not value
or len(value) < 2:
272 if (value[0] == value[-1])
and value[0]
in (
"'",
'"'):
273 self.
disp.log_debug(f
"value ({value}) is quoted")
278 """Determine where to put spaces during query rebuild; this is mostly for aesthetic reasons.
281 token (str): The current token being processed.
282 rebuilt_tokens (List[str]): The list of tokens being rebuilt.
283 skip_space (bool, optional): Whether adding whitespace on the next turn should be skipped. Defaults to False.
286 bool: The state of skip_space for the next turn.
294 rebuilt_tokens.append(space)
300 rebuilt_tokens.append(space)
305 Perform sanity checks on a WHERE clause to detect potential SQL injection after it has been broken down and reconstructed.
308 clause_str (str): The clause to check.
311 RuntimeError: If SQL injection is detected.
314 if clause_str.count(
"(") != clause_str.count(
")"):
317 if re.search(
r"%s\s+`[^`]+`", clause_str):
322 Validate a complex WHERE clause and extract parameterizable values.
325 clause_str (str): The clause to check.
328 Tuple[str, List[Union[str, int, float, None]]]:
329 - The clause with placeholders (%s) for parameterizable values.
330 - List of extracted values.
333 self.
disp.log_debug(f
"Raw clause: {clause_str}")
335 self.
disp.log_debug(f
"Tokenised clause: {tokens}")
337 params: List[Union[str, int, float,
None]] = []
338 rebuilt_tokens: List[str] = []
339 is_column: bool =
False
351 token, rebuilt_tokens, skip_space
359 params.append(normalized)
360 rebuilt_tokens.append(
"%s")
362 rebuilt_tokens.append(token)
364 self.
disp.log_debug(f
"rebuilt_tokens: {rebuilt_tokens}")
365 rebuilt_clause =
"".join(rebuilt_tokens)
366 self.
disp.log_debug(f
"rebuilt_clause: {rebuilt_clause}")
367 self.
disp.log_debug(f
"parameters: {params}")
369 self.
disp.log_debug(
"WHERE clause passed sanity check")
370 return rebuilt_clause, params
372 def _parse_where_clause(self, where: Union[str, List[str]]) -> Tuple[str, List[Union[str, int, float,
None]]]:
373 """Parse and parameterize a WHERE clause.
375 This method performs the following steps:
376 1. Validates each clause for SQL injection.
377 2. Parameterizes simple equality clauses with %s placeholders.
378 3. Escapes risky column names while leaving logical operators unescaped.
379 4. Returns the parameterized WHERE string and list of extracted values.
382 where (Union[str, List[str]]): WHERE clause(s) to parse. Can be a string
383 or a list of strings joined by AND.
386 Tuple[str, List[Union[str, int, float, None]]]:
387 - Parameterized WHERE clause string.
388 - List of extracted values for parameters.
391 RuntimeError: If SQL injection is detected in the WHERE clauses.
394 title =
"_parse_where_clause"
396 if where ==
"" or (isinstance(where, list)
and not where):
399 self.
disp.log_debug(f
"unchecked WHERE clause={where}")
401 params: List[Union[str, int, float,
None]] = []
402 parsed_clauses: List[str] = []
405 if isinstance(where, str):
406 self.
disp.log_debug(
"Where clause is a string")
408 elif hasattr(where,
"__iter__")
or hasattr(where,
"__getitem__"):
410 f
"Where clause is a iterable, type: {type(where)}"
415 raise ValueError(
"Unhandled type for where checking")
417 for clause
in where_list:
418 clause_str = str(clause).strip()
419 self.
disp.log_debug(f
"stripped string clause: {clause_str}")
422 processed_check: Tuple[
424 List[Union[str, int, float,
None]]
426 self.
disp.log_debug(f
"processed_checks: {processed_check}")
429 parsed_clauses.append(processed_check[0])
430 params.extend(processed_check[1])
432 where_string = join_term.join(parsed_clauses)
435 f
"Parsed WHERE: '{where_string}', params: {params}",
439 return where_string, params
442 """Fetch and parse the database version.
445 Optional[Tuple[int, int, int]]: A tuple representing the database version,
446 or None if the query fails.
448 _query: str =
"SELECT VERSION()"
449 resp = self.
sql_pool.run_and_fetch_all(_query)
450 if isinstance(resp, int):
452 if not resp
or not isinstance(resp, list):
454 if not resp[0]
or not isinstance(resp[0], tuple):
456 if not resp[0][0]
or not isinstance(resp[0][0], str):
459 version_parts = vers.split(
'.')
461 for part
in version_parts:
463 parsed_version.append(int(part))
465 parsed_version.append(part)
473 """Return the list of column names for a given table.
476 table_name (str): Name of the table to retrieve column names from.
479 Union[List[str], int]: List of column names on success, or `self.error` on failure.
481 title =
"get_table_column_names"
484 if isinstance(columns, int)
is True:
486 f
"Failed to describe table {table_name}.",
491 if isinstance(columns, List):
495 except RuntimeError
as e:
496 msg =
"Error: Failed to get column names of the tables.\n"
497 msg += f
"\"{str(e)}\""
498 self.
disp.log_error(msg,
"get_table_column_names")
502 """Retrieve the names of all tables in the database.
505 Union[int, List[str]]: List of table names on success, or `self.error` on failure.
507 title =
"get_table_names"
508 self.
disp.log_debug(
"Getting table names.", title)
509 resp = self.
sql_pool.run_and_fetch_all(query=
"SHOW TABLES")
510 if isinstance(resp, int)
is True:
512 "Failed to fetch the table names.",
517 if isinstance(resp, (List, Dict, Tuple)):
520 self.
disp.log_debug(
"Tables fetched", title)
524 """Retrieve all triggers and their SQL definitions.
527 Union[int, Dict[str, str]]: Dictionary of {trigger_name: sql_definition},
528 or `self.error` on failure.
530 title =
"get_triggers"
532 "Fetching all triggers and their SQL definitions.", title
535 query =
"SHOW TRIGGERS;"
536 resp = self.
sql_pool.run_and_fetch_all(query=query, values=[])
538 if isinstance(resp, int):
539 self.
disp.log_error(
"Failed to fetch triggers.", title)
542 data: Dict[str, str] = {}
544 if len(row) >= 2
and row[0]
and row[1]:
545 data[row[0]] = row[1]
547 self.
disp.log_debug(f
"Triggers fetched: {list(data.keys())}", title)
550 def get_trigger(self, trigger_name: str, db_name: Optional[str] =
None) -> Union[int, str]:
551 """Retrieve the SQL definition of a specific trigger.
554 trigger_name (str): The trigger name to fetch.
555 db_name (Optional[str], optional): Database name. Defaults to None.
558 Union[int, str]: The SQL definition, or `self.error` on failure.
560 title =
"get_trigger"
562 f
"Getting trigger definition for '{trigger_name}'", title
566 self.
disp.log_error(
"Trigger name cannot be empty.", title)
569 to_check: List[str] = [trigger_name]
571 to_check.append(db_name)
573 if self.
sql_injection.check_if_injections_in_strings(to_check):
575 "SQL Injection detected in trigger name.", title)
579 query = f
"SHOW CREATE TRIGGER `{db_name}`.`{trigger_name}`"
581 query = f
"SHOW CREATE TRIGGER `{trigger_name}`"
583 resp = self.
sql_pool.run_and_fetch_all(query=query, values=
None)
585 if isinstance(resp, int)
or not resp:
587 f
"Failed to retrieve trigger '{trigger_name}'.", title
591 sql_definition =
None
592 if resp
and len(resp[0]) > 2:
593 sql_definition = resp[0][2]
594 if not sql_definition:
596 f
"No SQL definition found for trigger '{trigger_name}'.", title
601 f
"SQL for trigger '{trigger_name}':\n{sql_definition}", title
603 return sql_definition
606 """Return a list of trigger names in the current or specified MySQL database.
609 db_name (Optional[str], optional):
610 Name of the database/schema to query.
611 Defaults to None, which uses the currently selected database.
614 Union[int, List[str]]: List of trigger names, or ``self.error`` on failure.
616 title =
"get_trigger_names"
617 self.
disp.log_debug(
"Getting trigger names.", title)
619 if self.
sql_injection.check_if_injections_in_strings([db_name]):
621 "SQL Injection detected in database name.", title
626 "SELECT TRIGGER_NAME "
627 "FROM information_schema.triggers "
628 "WHERE TRIGGER_SCHEMA = %s "
629 "ORDER BY TRIGGER_NAME;"
631 values: List[Union[str, int, float,
None]] = [db_name]
634 "SELECT TRIGGER_NAME "
635 "FROM information_schema.triggers "
636 "WHERE TRIGGER_SCHEMA = DATABASE() "
637 "ORDER BY TRIGGER_NAME;"
639 values: List[Union[str, int, float,
None]] = []
644 self.
disp.log_debug(f
"Running query: {query}", title)
645 response = self.
sql_pool.run_and_fetch_all(query=query, values=values)
647 if isinstance(response, int):
648 self.
disp.log_error(
"Failed to fetch trigger names.", title)
653 "No triggers found in the selected database.", title)
659 trigger_names: List[str] = []
662 trigger_names.append(row[0])
664 self.
disp.log_debug(f
"Triggers fetched: {trigger_names}", title)
668 """Fetch the headers (description) of a table from the database.
671 table (str): The name of the table to describe.
674 Union[int, List[Any]]: A list containing the description of the table, or self.error if an error occurs.
676 title =
"describe_table"
677 self.
disp.log_debug(f
"Describing table {table}", title)
679 self.
disp.log_error(
"Injection detected.",
"sql")
682 resp = self.
sql_pool.run_and_fetch_all(query=f
"DESCRIBE {table}")
683 if isinstance(resp, int)
is True:
685 f
"Failed to describe table {table}", title
689 except mysql.connector.errors.ProgrammingError
as pe:
690 msg = f
"ProgrammingError: The table '{table}'"
691 msg +=
"does not exist or the query failed."
692 self.
disp.log_critical(msg, title)
693 raise RuntimeError(msg)
from pe
694 except mysql.connector.errors.IntegrityError
as ie:
695 msg =
"IntegrityError: There was an integrity constraint "
696 msg += f
"issue while describing the table '{table}'."
697 self.
disp.log_critical(msg, title)
698 raise RuntimeError(msg)
from ie
699 except mysql.connector.errors.OperationalError
as oe:
700 msg =
"OperationalError: There was an operational error "
701 msg += f
"while describing the table '{table}'."
702 self.
disp.log_critical(msg, title)
703 raise RuntimeError(msg)
from oe
704 except mysql.connector.Error
as e:
705 msg =
"MySQL Error: An unexpected error occurred while "
706 msg += f
"describing the table '{table}'."
707 self.
disp.log_critical(msg, title)
708 raise RuntimeError(msg)
from e
709 except RuntimeError
as e:
710 msg =
"A runtime error occurred during the table description process."
711 self.
disp.log_critical(msg, title)
712 raise RuntimeError(msg)
from e
714 def create_table(self, table: str, columns: List[Tuple[str, str]]) -> int:
715 """Create a new table in the MySQL database, compatible with MySQL 5.0+.
718 table (str): Name of the new table.
719 columns (List[Tuple[str, str]]): List of (column_name, column_type) pairs.
722 int: ``self.success`` on success, or ``self.error`` on failure.
725 .. code-block:: python
729 ("id", "INT AUTO_INCREMENT PRIMARY KEY"),
730 ("username", "VARCHAR(255) NOT NULL UNIQUE"),
731 ("email", "VARCHAR(255) NOT NULL"),
732 ("created_at", "DATETIME DEFAULT CURRENT_TIMESTAMP")
735 result = self.create_table(table_name, columns)
736 if result == self.success:
737 print(f"Table '{table_name}' created successfully.")
739 print(f"Failed to create table '{table_name}'.")
742 - Protects against SQL injection using :class:`SQLInjection`.
743 - Escapes table and column names with backticks for MySQL.
744 - Includes version-aware fallback for MySQL 5.0 compatibility.
746 title =
"create_table"
747 self.
disp.log_debug(f
"Creating table '{table}'", title)
750 if self.
sql_injection.check_if_injections_in_strings([table]):
751 self.
disp.log_error(
"Injection detected in table name.", title)
756 table_safe = table.replace(
"`",
"``")
760 for name, col_type
in columns:
761 safe_name = name.replace(
"`",
"``")
765 "DEFAULT CURRENT_TIMESTAMP" in col_type.upper()
770 self.
disp.log_warning(
771 f
"MySQL 5.0 does not support 'DEFAULT CURRENT_TIMESTAMP' "
772 f
"on DATETIME columns. Downgrading '{safe_name}' definition.",
775 col_type = col_type.upper().replace(
776 "DEFAULT CURRENT_TIMESTAMP",
"NULL"
779 _tmp.append(f
"`{safe_name}` {col_type}")
781 columns_def =
", ".join(_tmp)
782 query = f
"CREATE TABLE IF NOT EXISTS `{table_safe}` ({columns_def}) ENGINE=InnoDB;"
784 self.
disp.log_debug(f
"Executing SQL: {query}", title)
786 result = self.
sql_pool.run_and_commit(query=query, values=[])
788 if isinstance(result, int)
and result == self.
error:
789 self.
disp.log_error(f
"Failed to create table '{table}'", title)
792 self.
disp.log_info(f
"Table '{table}' created successfully.", title)
795 except mysql.connector.OperationalError
as oe:
796 msg = f
"MySQL OperationalError while creating table '{table}': {oe}"
797 self.
disp.log_critical(msg, title)
798 raise RuntimeError(msg)
from oe
799 except mysql.connector.Error
as e:
800 msg = f
"MySQL Error while creating table '{table}': {e}"
801 self.
disp.log_critical(msg, title)
802 raise RuntimeError(msg)
from e
803 except Exception
as e:
804 msg = f
"Unexpected error while creating table '{table}': {e}"
805 self.
disp.log_critical(msg, title)
806 raise RuntimeError(msg)
from e
808 def insert_data_into_table(self, table: str, data: Union[List[List[Union[str,
None, int, float]]], List[Union[str,
None, int, float]]], column: Union[List[str],
None] =
None) -> int:
809 """Insert data into a table.
812 table (str): Name of the table.
813 data (Union[List[List[Union[str, None, int, float]]], List[Union[str, None, int, float]]]): Data to insert.
814 column (Union[List[str], None], optional): List of column names. Defaults to None.
817 int: ``self.success`` on success, or ``self.error`` on failure.
819 title =
"insert_data_into_table"
820 self.
disp.log_debug(
"Inserting data into the table.", title)
824 List[List[Union[str,
None, int, float]]],
825 List[Union[str,
None, int, float]]
827 if column
is not None:
828 check_data.extend(column)
829 if isinstance(data, List):
831 if isinstance(i, List):
835 if self.
sql_injection.check_if_injections_in_strings(check_data)
is True:
836 self.
disp.log_error(
"Injection detected.",
"sql")
839 if isinstance(column, List)
and len(column) == 0:
841 if column_raw
is None:
843 if isinstance(column_raw, list):
848 column_checked: Union[List[str], str] = column
854 column_str =
", ".join(column)
855 column_length = len(column)
858 cleaned_lines: Tuple[
859 str, List[Union[str, int, float,
None]]
861 line = cleaned_lines[0]
862 values = cleaned_lines[1]
863 self.
disp.log_debug(f
"Cleaned lines = '{cleaned_lines}'", title)
864 sql_query = f
"INSERT INTO {table} ({column_str}) VALUES {line}"
866 f
"sql_query = '{sql_query}', values = '{values}'", title
868 return self.
sql_pool.run_editing_command(sql_query, values, table,
"insert")
869 except RuntimeError
as e:
871 f
"Failed to check and clean the data needed to be inserted: {e}"
875 def insert_trigger(self, trigger_name: str, table_name: str, timing_event: str, body: str) -> int:
876 """Insert (create) a new SQL trigger into a MySQL or MariaDB database.
879 trigger_name (str): The name of the trigger to create.
880 table_name (str): The name of the table the trigger is being applied to.
881 timing_event (str): The rule when the event is to be triggered. e.g., 'BEFORE INSERT'.
882 body (str): The full SQL CREATE TRIGGER statement.
885 int: ``self.success`` on success, or ``self.error`` on failure.
887 title =
"insert_trigger"
890 if not all([trigger_name, table_name, timing_event, body]):
891 self.
disp.log_error(
"All parameters must be provided.", title)
894 self.
disp.log_debug(f
"Inserting trigger: {trigger_name}", title)
896 if self.
sql_injection.check_if_injections_in_strings([trigger_name, table_name]):
897 self.
disp.log_error(
"SQL injection detected.", title)
900 if self.
sql_injection.check_if_symbol_and_logic_gate_injection(timing_event):
901 self.
disp.log_error(
"SQL injection detected", title)
906 trigger_name, table_name, timing_event, body)
907 self.
disp.log_debug(f
"Trigger SQL:\n{sql_query}", title)
908 if sql_query != self.
success or not isinstance(sql_query, str):
909 self.
disp.log_error(
"Sql trigger query is invalid.", title)
915 self.
disp.log_warning(
916 f
"Could not drop trigger '{trigger_name}' (may not exist).", title
920 result = self.
sql_pool.run_editing_command(
921 sql_query, [], trigger_name,
"create_trigger"
925 f
"Failed to create trigger '{trigger_name}'", title
930 f
"Trigger '{trigger_name}' successfully created.", title
938 column: Union[str, List[str]],
939 where: Union[str, List[str]] =
"",
940 beautify: Literal[
True] =
True,
941 ) -> Union[int, List[Dict[str, Any]]]: ...
947 column: Union[str, List[str]],
948 where: Union[str, List[str]] =
"",
949 beautify: Literal[
False] =
False,
950 ) -> Union[int, List[Tuple[Any, Any]]]: ...
952 def get_data_from_table(self, table: str, column: Union[str, List[str]], where: Union[str, List[str]] =
"", beautify: bool =
True) -> Union[int, Union[List[Dict[str, Any]], List[Tuple[Any, Any]]]]:
953 """Fetch rows from a table.
956 table (str): Name of the table to query.
957 column (Union[str, List[str]]): Column selector; a single column name or a list of column names.
958 where (Union[str, List[str]]): Optional WHERE clause content; string or list joined by ``AND``.
959 beautify (bool): When True, return a list of dict rows; when False, return a list of tuples.
962 Union[int, List[Dict[str, Any]], List[Tuple[Any, Any]]]: Query result on success, or ``self.error`` on failure.
964 title =
"get_data_from_table"
965 self.
disp.log_debug(f
"fetching data from the table {table}", title)
968 if self.
sql_injection.check_if_injections_in_strings([table, column])
is True:
969 self.
disp.log_error(
"Injection detected.",
"sql")
973 if isinstance(column, list):
974 original_column = column.copy()
976 original_column = column
977 if isinstance(column, list)
is True:
979 column =
", ".join(column)
981 sql_command = f
"SELECT {column} FROM {table}"
986 except RuntimeError
as e:
987 self.
disp.log_error(f
"WHERE clause parsing failed: {e}", title)
990 if where_clause !=
"":
991 sql_command += f
" WHERE {where_clause}"
994 f
"sql_query = '{sql_command}', params = {where_params}", title)
995 resp = self.
sql_pool.run_and_fetch_all(
996 query=sql_command, values=where_params)
997 if isinstance(resp, int):
1000 "Failed to fetch the data from the table.", title
1006 self.
disp.log_debug(f
"Queried data: {resp}", title)
1007 if beautify
is False:
1010 if len(resp_list) == 0:
1013 if original_column
in (
"*", [
"*"]):
1015 self.
disp.log_debug(f
"Described table columns: {data}", title)
1016 if isinstance(data, int):
1019 if isinstance(original_column, str):
1020 data = [original_column]
1022 data = original_column
1026 def get_table_size(self, table: str, column: Union[str, List[str]], where: Union[str, List[str]] =
"") -> int:
1027 """Return the row count for a table.
1030 table (str): Name of the table to count rows in.
1031 column (Union[str, List[str]]): Column expression passed to ``COUNT(...)``.
1032 where (Union[str, List[str]]): Optional WHERE clause content; string or list joined by ``AND``.
1035 int: Number of rows on success, or ``SCONST.GET_TABLE_SIZE_ERROR`` on failure.
1037 title =
"get_table_size"
1038 self.
disp.log_debug(f
"fetching data from the table {table}", title)
1041 if self.
sql_injection.check_if_injections_in_strings([table, column])
is True:
1042 self.
disp.log_error(
"Injection detected.",
"sql")
1043 return SCONST.GET_TABLE_SIZE_ERROR
1045 if isinstance(column, list)
is True:
1046 column =
", ".join(column)
1048 sql_command = f
"SELECT COUNT({column}) FROM {table}"
1053 except RuntimeError
as e:
1054 self.
disp.log_error(f
"WHERE clause parsing failed: {e}", title)
1055 return SCONST.GET_TABLE_SIZE_ERROR
1057 if where_clause !=
"":
1058 sql_command += f
" WHERE {where_clause}"
1060 self.
disp.log_debug(
1061 f
"sql_query = '{sql_command}', params = {where_params}", title)
1062 resp = self.
sql_pool.run_and_fetch_all(
1063 query=sql_command, values=where_params)
1064 if isinstance(resp, int):
1066 self.
disp.log_error(
1067 "Failed to fetch the data from the table.", title
1069 return SCONST.GET_TABLE_SIZE_ERROR
1073 if len(resp_list) == 0:
1074 self.
disp.log_error(
1075 "There was no data returned by the query.", title
1077 return SCONST.GET_TABLE_SIZE_ERROR
1078 if isinstance(resp_list[0], tuple)
is False:
1079 self.
disp.log_error(
"The data returned is not a tuple.", title)
1080 return SCONST.GET_TABLE_SIZE_ERROR
1081 return resp_list[0][0]
1083 def update_data_in_table(self, table: str, data: Union[List[List[Union[str,
None, int, float]]], List[Union[str,
None, int, float]]], column: List[str], where: Union[str, List[str]] =
"") -> int:
1084 """Update rows in a table.
1087 table (str): Name of the table to update.
1088 data (Union[List[List[Union[str, None, int, float]]], List[Union[str, None, int, float]]]): Values aligned to ``column`` order; single row or list of rows.
1089 column (List[str]): Column names to update.
1090 where (Union[str, List[str]]): Optional WHERE clause content; string or list joined by ``AND``.
1093 int: ``self.success`` on success, or ``self.error`` on failure.
1095 title =
"update_data_in_table"
1096 msg = f
"Updating the data contained in the table: {table}"
1097 self.
disp.log_debug(msg, title)
1102 self.
disp.log_debug(
1103 "Checking the table/column name to make sure everything is parameterised."
1105 check_items = [table]
1106 if isinstance(column, list):
1107 check_items.extend([str(c)
for c
in column])
1109 check_items.append(str(column))
1110 if self.
sql_injection.check_if_injections_in_strings(check_items):
1111 self.
disp.log_error(
"Injection detected.",
"sql")
1114 self.
disp.log_debug(
"Injection checking passed")
1116 self.
disp.log_debug(
"No table column names provided, deducing")
1118 if isinstance(columns_raw, int):
1119 self.
disp.log_debug(
"Failed to deduce table column names")
1121 column = columns_raw
1124 self.
disp.log_debug(
1125 "Making sure that the column variable is of type List[str]"
1127 _tmp_cols2: Union[List[str], str] = self.
sanitize_functions.escape_risky_column_names(
1130 if isinstance(_tmp_cols2, list):
1133 column = [str(_tmp_cols2)]
1134 self.
disp.log_debug(
1135 "Made sure that the column was of the appropriate type"
1138 if isinstance(column, str)
and isinstance(data, str):
1141 column_length = len(column)
1143 column_length = len(column)
1144 self.
disp.log_debug(
1145 f
"data = {data}, column = {column}, length = {column_length}",
1150 self.
disp.log_debug(
"Building the set clause and parameters")
1151 set_parts: List[str] = []
1152 params: List[Union[str,
None, int, float]] = []
1153 for i
in range(column_length):
1154 set_parts.append(f
"{column[i]} = %s")
1159 normalised_cell: Union[
1165 self.
disp.log_debug(f
"Normalised cell: {normalised_cell}")
1166 params.append(normalised_cell)
1167 self.
disp.log_debug(f
"Set clause and parameters built: {set_parts}")
1168 self.
disp.log_debug(f
"Parameters: {params}")
1170 update_line =
", ".join(set_parts)
1171 sql_query = f
"UPDATE {table} SET {update_line}"
1176 except RuntimeError
as e:
1177 self.
disp.log_error(f
"WHERE clause parsing failed: {e}", title)
1179 params.extend(where_params)
1181 if where_clause !=
"":
1182 sql_query += f
" WHERE {where_clause}"
1184 self.
disp.log_debug(
1185 f
"sql_query = '{sql_query}', params = {params}",
1189 return self.
sql_pool.run_editing_command(sql_query, params, table=table, action_type=
"update")
1191 def insert_or_update_data_into_table(self, table: str, data: Union[List[List[Union[str,
None, int, float]]], List[Union[str,
None, int, float]]], columns: Union[List[str],
None] =
None) -> int:
1192 """Insert or update rows using the first column as key.
1195 table (str): Table name.
1196 data (Union[List[List[Union[str, None, int, float]]], List[Union[str, None, int, float]]]): Single row or list of rows to upsert.
1197 columns (Union[List[str], None], optional): Column names for ``data``; when None, infer columns from the table.
1200 int: ``self.success`` on success, or ``self.error`` on failure.
1202 title =
"insert_or_update_data_into_table"
1203 self.
disp.log_debug(
1204 "Inserting or updating data into the table.", title
1207 check_list = [table]
1209 check_list.extend(columns)
1210 if self.
sql_injection.check_if_injections_in_strings(check_list):
1211 self.
disp.log_error(
"SQL Injection detected.",
"sql")
1216 if isinstance(cols_raw, int):
1221 if columns
is not None and not isinstance(columns, list):
1223 columns = list(columns)
1225 columns = [str(columns)]
1228 table=table, column=columns, where=
"", beautify=
False
1231 if isinstance(table_content, int):
1232 if table_content != self.
success:
1233 self.
disp.log_critical(
1234 f
"Failed to retrieve data from table {table}", title
1237 table_content_list = table_content
1239 if not isinstance(table_content_list, list):
1240 self.
disp.log_error(
1241 f
"Unexpected table content type for table {table}", title
1245 if isinstance(data, list)
and data
and isinstance(data[0], list):
1246 self.
disp.log_debug(
"Processing double data List", title)
1247 table_content_dict = {}
1248 for line
in table_content_list:
1249 table_content_dict[str(line[0])] = line
1253 self.
disp.log_warning(
"Empty line, skipping.", title)
1256 if isinstance(line, str):
1257 line_list: List = [line]
1258 elif not isinstance(line, list):
1259 line_list: List = [line]
1262 node0 = str(line_list[0])
1263 if node0
in table_content_dict:
1268 f
"{columns[0]} = {node0}"
1272 if isinstance(columns, list):
1273 cols: List = columns
1275 cols: List = [columns]
1281 if isinstance(data, list):
1282 self.
disp.log_debug(
"Processing single data List", title)
1284 self.
disp.log_warning(
"Empty data List, skipping.", title)
1287 node0 = str(data[0])
1289 for line
in table_content_list:
1290 if str(line[0]) == node0:
1292 table, data, columns, f
"{columns[0]} = {node0}"
1296 if isinstance(columns, list):
1303 self.
disp.log_error(
1304 "Data must be of type List[str] or List[List[str]]", title
1309 """Insert (create) or update an SQL trigger into a MySQL or MariaDB database.
1312 trigger_name (str): The name of the trigger to create.
1313 table_name (str): The name of the table the trigger is being applied to.
1314 timing_event (str): The rule when the event is to be triggered. e.g., 'BEFORE INSERT'.
1315 body (str): The full SQL CREATE TRIGGER statement.
1318 int: ``self.success`` on success, or ``self.error`` on failure.
1320 title =
"insert_or_update_trigger"
1321 self.
disp.log_debug(
1322 f
"Creating or replacing trigger: {trigger_name}", title
1325 if self.
sql_injection.check_if_injections_in_strings([trigger_name, table_name]):
1326 self.
disp.log_error(
"SQL injection detected.", title)
1329 if self.
sql_injection.check_if_symbol_and_logic_gate_injection(timing_event):
1330 self.
disp.log_error(
"SQL injection detected", title)
1335 self.
disp.log_warning(
1336 f
"Unexpected drop_trigger result: {drop_result}", title
1340 return self.
insert_trigger(trigger_name, table_name, timing_event, body)
1343 """Delete rows from a table.
1346 table (str): Table name.
1347 where (Union[str, List[str]]): Optional WHERE clause to limit deletions.
1350 int: ``self.success`` on success, or ``self.error`` on failure.
1352 self.
disp.log_debug(
1353 f
"Removing data from table {table}",
1354 "remove_data_from_table"
1359 self.
disp.log_error(
"Injection detected.",
"sql")
1362 sql_query = f
"DELETE FROM {table}"
1367 except RuntimeError
as e:
1368 self.
disp.log_error(
1369 f
"WHERE clause parsing failed: {e}"
1373 if where_clause !=
"":
1374 sql_query += f
" WHERE {where_clause}"
1376 self.
disp.log_debug(
1377 f
"sql_query = '{sql_query}', params = {where_params}"
1380 return self.
sql_pool.run_editing_command(sql_query, where_params, table,
"delete")
1383 """Drop a table from the MySQL database.
1386 table (str): Name of the table to drop.
1389 int: ``self.success`` on success, or ``self.error`` on failure.
1392 - Performs SQL injection detection on the table name.
1393 - Uses ``DROP TABLE IF EXISTS`` to avoid errors when the table is missing.
1395 title =
"drop_table"
1396 self.
disp.log_debug(f
"Dropping table '{table}'", title)
1399 if self.
sql_injection.check_if_injections_in_strings([table]):
1400 self.
disp.log_error(
"Injection detected in table name.", title)
1405 table_safe = table.replace(
"'",
"''")
1406 query = f
"DROP TABLE IF EXISTS '{table_safe}';"
1407 self.
disp.log_debug(f
"Executing SQL: {query}", title)
1409 result = self.
sql_pool.run_and_commit(query=query, values=[])
1410 if isinstance(result, int)
and result == self.
error:
1411 self.
disp.log_error(f
"Failed to drop table '{table}'", title)
1414 self.
disp.log_info(f
"Table '{table}' dropped successfully.", title)
1417 except mysql.connector.PoolError
as oe:
1418 msg = f
"OperationalError while dropping table '{table}': {oe}"
1419 self.
disp.log_critical(msg, title)
1421 except mysql.connector.ProgrammingError
as e:
1422 msg = f
"SQL Error while dropping table '{table}': {e}"
1423 self.
disp.log_critical(msg, title)
1425 except Exception
as e:
1426 msg = f
"Unexpected error while dropping table '{table}': {e}"
1427 self.
disp.log_critical(msg, title)
1431 """Drop/Remove an existing SQL trigger if it exists.
1434 trigger_name (str): Name of the trigger to drop.
1437 int: ``self.success`` on success, or ``self.error`` on error.
1439 title =
"drop_trigger"
1440 self.
disp.log_debug(f
"Dropping trigger: {trigger_name}", title)
1442 if not trigger_name:
1443 self.
disp.log_error(
"Trigger name cannot be empty.", title)
1447 if self.
sql_injection.check_if_injections_in_strings([trigger_name]):
1448 self.
disp.log_error(
1449 "SQL Injection detected in trigger name.", title
1453 sql_query = f
"DROP TRIGGER IF EXISTS {trigger_name};"
1454 self.
disp.log_debug(f
"Executing SQL:\n{sql_query}", title)
1456 result = self.
sql_pool.run_editing_command(
1457 sql_query, [], trigger_name,
"drop_trigger")
1459 self.
disp.log_error(
1460 f
"Failed to drop trigger '{trigger_name}'.", title
1465 f
"Trigger '{trigger_name}' dropped successfully.", title
Union[int, List[Dict[str, Any]]] get_data_from_table(self, str table, Union[str, List[str]] column, Union[str, List[str]] where="", Literal[True] beautify=True)
int insert_or_update_data_into_table(self, str table, Union[List[List[Union[str, None, int, float]]], List[Union[str, None, int, float]]] data, Union[List[str], None] columns=None)
dict where_clause_double_space_skippers
bool _is_digit(self, str token)
Union[int, List[Tuple[Any, Any]]] get_data_from_table(self, str table, Union[str, List[str]] column, Union[str, List[str]] where="", Literal[False] beautify=False)
bool _is_quoted(self, str value)
Union[int, Union[List[Dict[str, Any]], List[Tuple[Any, Any]]]] get_data_from_table(self, str table, Union[str, List[str]] column, Union[str, List[str]] where="", bool beautify=True)
int remove_trigger(self, str trigger_name)
int insert_trigger(self, str trigger_name, str table_name, str timing_event, str body)
Union[List[str], int] get_table_column_names(self, str table_name)
str _strip_outer_quotes(self, str value)
Optional[int] db_version_retro
Optional[int] db_version_major
SQLManageConnections sql_pool
SQLSanitiseFunctions sanitize_functions
Union[int, Dict[str, str]] get_triggers(self)
bool _where_space_handler(self, str token, List[str] rebuilt_tokens, bool skip_space=False)
int update_data_in_table(self, str table, Union[List[List[Union[str, None, int, float]]], List[Union[str, None, int, float]]] data, List[str] column, Union[str, List[str]] where="")
Optional[Tuple[int, int, int]] db_version
None __init__(self, SQLManageConnections sql_pool, int success=0, int error=84, bool debug=False)
int insert_or_update_trigger(self, str trigger_name, str table_name, str timing_event, str body)
int remove_data_from_table(self, str table, Union[str, List[str]] where="")
Union[int, List[str]] get_table_names(self)
Union[str, None, int, float] _normalize_cell(self, object cell)
Optional[Tuple[int, int, int]] get_database_version(self)
Union[int, str] get_trigger(self, str trigger_name, Optional[str] db_name=None)
None _check_where_node(self, str token)
None _sanity_check_where_clause(self, str clause_str)
re.Pattern compiled_digit_check
Union[int, List[str]] get_trigger_names(self, Optional[str] db_name=None)
int create_table(self, str table, List[Tuple[str, str]] columns)
dict where_clause_safe_tokens
List[str] _tokenize_where(self, str clause)
RuntimeError where_injection_exception
int get_table_size(self, str table, Union[str, List[str]] column, Union[str, List[str]] where="")
dict where_clause_single_space_skippers_pre
SQLInjection sql_injection
Tuple[str, List[Union[str, int, float, None]]] _parse_where_clause(self, Union[str, List[str]] where)
int insert_data_into_table(self, str table, Union[List[List[Union[str, None, int, float]]], List[Union[str, None, int, float]]] data, Union[List[str], None] column=None)
Optional[int] db_version_minor
dict where_clause_single_space_skippers_post
re.Pattern compiled_where
str _escape_risky_column_name(self, str token)
int remove_table(self, str table)
Union[int, List[Any]] describe_table(self, str table)
Tuple[str, List[Union[str, int, float, None]]] _check_complex_clause_for_injection(self, str clause_str)