2# +==== BEGIN CatFeeder =================+
5# ...............)..(.')
7# ...............\(__)|
8# Inspired by Joan Stark
9# source https://www.asciiart.eu/
13# FILE: sql_sanitisation_functions.py
14# CREATION DATE: 11-10-2025
15# LAST Modified: 1:50:47 06-02-2026
17# This is the backend server in charge of making the actual website work.
19# COPYRIGHT: (c) Cat Feeder
20# PURPOSE: File in charge of cleaning and sanitising sql queries before they are submitted to the database.
22# +==== END CatFeeder =================+
26from typing
import List, Dict, Any, Union, Optional, Tuple
28from display_tty
import Disp, initialise_logger
30from .
import sql_constants
as SCONST
31from .sql_time_manipulation
import SQLTimeManipulation
35 """Provide functions to sanitize SQL queries before execution.
37 This class contains methods to clean and escape SQL queries, ensuring
38 they are safe to execute and free from injection vulnerabilities.
41 disp (Disp): Logger instance for debugging and error reporting.
42 risky_keywords (List[str]): List of risky SQL keywords to sanitize.
43 keyword_logic_gates (List[str]): List of logical operators to handle.
44 none_value (str): Default value for NULL representation.
45 sql_time_manipulation (SQLTimeManipulation): Handles time-related SQL operations.
48 disp: Disp = initialise_logger(__qualname__,
False)
50 def __init__(self, success: int = 0, error: int = 84, debug: bool =
False) ->
None:
51 """Initialize the SQLSanitiseFunctions instance.
54 success (int, optional): Numeric success code. Defaults to 0.
55 error (int, optional): Numeric error code. Defaults to 84.
56 debug (bool, optional): Enable debug logging. Defaults to False.
59 self.
disp.update_disp_debug(debug)
60 self.
disp.log_debug(
"Initialising...")
73 self.
disp.log_debug(
"Initialised")
76 """Escape characters in a SQL cell to prevent query breaking.
79 cell (Optional[str]): The cell to sanitize.
82 str: Sanitized string safe for SQL queries.
88 if char
in (
"'",
'"',
"\\",
'\0',
"\r"):
90 f
"Escaped character '{char}' in '{cell}'.",
99 """Escape risky column names to prevent SQL injection.
102 columns (Union[List[str], str]): Column names to sanitize.
105 Union[List[str], str]: Sanitized column names.
107 title =
"_escape_risky_column_names"
108 self.
disp.log_debug(
"Escaping risky column names.", title)
109 if isinstance(columns, str):
113 for index, item
in enumerate(data):
115 key, value = item.split(
"=", maxsplit=1)
116 self.
disp.log_debug(f
"key = {key}, value = {value}", title)
118 self.
disp.log_warning(
119 f
"Escaping risky column name '{key}'.",
120 "_escape_risky_column_names"
122 data[index] = f
"`{key}`={value}"
124 self.
disp.log_warning(
125 f
"Escaping risky column name '{item}'.",
126 "_escape_risky_column_names"
128 data[index] = f
"`{item}`"
131 self.
disp.log_debug(
"Escaped risky column names.", title)
132 if isinstance(columns, str):
137 """Ensure a value is safely passed as a string in an SQL query.
140 value (Optional[str]): The value to protect.
143 str: Protected value safe for SQL queries.
145 title =
"_protect_value"
146 self.
disp.log_debug(f
"protecting value: {value}", title)
148 self.
disp.log_debug(
"Value is none, thus returning NULL", title)
151 if isinstance(value, str)
is False:
152 self.
disp.log_debug(
"Value is not a string, converting", title)
156 self.
disp.log_debug(
"Value is empty, returning ''", title)
159 if value[0] ==
'`' and value[-1] ==
'`':
161 "string has special backtics, skipping.", title
167 "Value already has a single quote at the start, removing", title
172 "Value already has a single quote at the end, removing", title
177 f
"Value before quote escaping: {value}", title
179 protected_value = value.replace(
"'",
"''")
181 f
"Value after quote escaping: {protected_value}", title
184 protected_value = f
"'{protected_value}'"
186 f
"Value after being converted to a string: {protected_value}.",
189 return protected_value
192 """Escape risky column names in WHERE mode.
195 columns (Union[List[str], str]): Column names to sanitize.
198 Union[List[str], str]: Sanitized column names.
200 title =
"_escape_risky_column_names_where_mode"
202 "Escaping risky column names in where mode.", title
205 if isinstance(columns, str):
210 for index, item
in enumerate(data):
212 key, value = item.split(
"=", maxsplit=1)
213 self.
disp.log_debug(f
"key = {key}, value = {value}", title)
217 self.
disp.log_warning(
218 f
"Escaping risky column name '{key}'.", title
220 data[index] = f
"`{key}`={protected_value}"
222 data[index] = f
"{key}={protected_value}"
225 self.
disp.log_warning(
226 f
"Escaping risky column name '{item}'.",
230 data[index] = protected_value
232 self.
disp.log_debug(
"Escaped risky column names in where mode.", title)
234 if isinstance(columns, str):
238 def check_sql_cell(self, cell: Union[str, int, float,
None], raw: bool =
True) -> Union[str, Union[str, int, float,
None]]:
239 """Check and sanitize a SQL cell value.
242 cell (Union[str, int, float, None]): The cell value to check.
243 raw (bool, optional): Whether to process raw values. Defaults to True.
246 Union[str, Union[str, int, float, None]]: Sanitized cell value.
248 title: str =
"check_sql_cell"
250 if raw
and isinstance(cell, (float, int)):
252 if raw
and cell
is None:
254 if isinstance(cell, (str, float, int))
is True:
255 cell_cleaned = str(cell)
256 if isinstance(cell, str)
is False:
257 msg =
"The expected type of the input is a string,"
258 msg += f
"but got {type(cell)}"
259 self.
disp.log_error(msg, title)
263 if tmp
in (
"now",
"now()"):
265 elif tmp
in (
"current_date",
"current_date()"):
269 if ";base" not in tmp:
270 self.
disp.log_debug(f
"result = {tmp}", title)
275 def beautify_table(self, column_names: List[str], table_content: List[List[Any]]) -> Union[List[Dict[str, Any]], int]:
276 """Convert raw table rows to a list of dictionaries keyed by column.
279 column_names (List[str]): Column descriptors (name as first item).
280 table_content (List[List[Any]]): Raw rows as sequences.
283 Union[List[Dict[str, Any]], int]: Beautified table or error code.
285 self.
disp.log_debug(
"Beautifying table.")
286 data: List[Dict[str, Any]] = []
287 if len(column_names) == 0:
288 self.
disp.log_error(
"There are no provided table column names.")
290 if len(table_content) == 0:
291 self.
disp.log_warning(
"There is no table content.")
294 column_length = len(column_names)
297 columns_are_tuples = isinstance(column_names[0], tuple)
300 if columns_are_tuples:
302 for col
in column_names:
303 column_keys.append(col[0])
305 column_keys = column_names
308 for row
in table_content:
309 cell_length = len(row)
310 if cell_length != column_length:
311 self.
disp.log_warning(
312 "Table content and column lengths do not correspond."
316 for index
in range(min(cell_length, column_length)):
317 row_dict[column_keys[index]] = row[index]
319 data.append(row_dict)
321 self.
disp.log_debug(f
"beautified_table = {data}")
325 """Compile the line required for an SQL update to work.
328 line (List): Data line to compile.
329 column (List): Column names.
330 column_length (int): Number of columns.
333 str: Compiled SQL update line.
335 title =
"compile_update_line"
337 self.
disp.log_debug(
"Compiling update line.", title)
338 for i
in range(0, column_length):
340 final_line += f
"{column[i]} = {cell_content}"
341 if i < column_length - 1:
343 if i == column_length:
345 self.
disp.log_debug(f
"line = {final_line}", title)
348 def _process_single_sql_line(self, line: List[Union[str, int, float,
None]], column_length: int) -> Tuple[str, List[Union[str, int, float,
None]]]:
349 """Process a single SQL value line while preserving column logic.
352 line (List[Union[str, int, float, None]]): Data line to process.
353 column_length (int): Number of columns.
356 Tuple[str, List[Union[str, int, float, None]]]: Placeholder string and values.
358 title: str =
"_process_single_sql_line"
359 if not isinstance(line, list):
361 line_length = len(line)
363 placeholders: List[str] = []
364 values: List[Union[str, int, float,
None]] = []
366 if self.
debugdebug and ";base" not in str(line):
367 self.
disp.log_debug(f
"line = {line}", title)
369 for i
in range(column_length):
372 f
"Line shorter than expected (columns={column_length}, data={line_length}). "
373 f
"Missing columns will not be inserted beyond index {i}."
375 self.
disp.log_warning(msg, title)
379 values.append(checked_value)
380 placeholders.append(
"%s")
382 if i == column_length - 1
and line_length > column_length:
384 f
"The line is longer than the number of columns ({line_length} > {column_length}), "
385 f
"truncating excess values."
387 self.
disp.log_warning(msg, title)
390 line_placeholder =
"(" +
", ".join(placeholders) +
")"
393 msg = f
"line_placeholder = '{line_placeholder}', type = {type(line_placeholder)}"
394 self.
disp.log_debug(msg, title)
395 self.
disp.log_debug(f
"values = {values}", title)
397 tuple_version = [line_placeholder, values]
398 return tuple(tuple_version)
400 def process_sql_line(self, line: Union[str, int, float, List[Union[str, int, float,
None]], List[List[Union[str, int, float,
None]]],
None], column: List[str], column_length: int = -1) -> Tuple[str, List[Union[str, int, float,
None]]]:
401 """Convert a dataset to MySQL/MariaDB-safe placeholders.
404 line (Union[str, int, float, List, None]): Data to process.
405 column (List[str]): Column names.
406 column_length (int, optional): Number of columns. Defaults to -1.
409 Tuple[str, List[Union[str, int, float, None]]]: Placeholder string and values.
411 title: str =
"process_sql_line"
413 if column_length == -1:
414 column_length = len(column)
416 if not isinstance(line, list):
419 results: List[str] = []
420 all_values: List[Union[str, int, float,
None]] = []
422 processed_list_instances: int = 0
425 if isinstance(line, list)
and len(line) > 0
and isinstance(line[0], list):
427 if isinstance(row, list):
431 results.append(placeholders)
432 all_values.extend(vals)
433 processed_list_instances += 1
436 "Incorrect data format, aborting process")
437 line_final: str =
", ".join(results)
440 f
"Final placeholder string = '{line_final}'", title)
441 self.
disp.log_debug(f
"Total values = {len(all_values)}", title)
443 return line_final, all_values
446 if isinstance(line, list)
and not isinstance(line[0], list):
448 line_length = len(line)
449 for index, row
in enumerate(line):
450 if self.
debugdebug and ";base" not in str(row):
451 self.
disp.log_debug(f
"row = {row}", title)
453 if not isinstance(row, list):
455 all_values.append(checked_value)
459 "Incorrect data format, aborting process"
466 if index < line_length - 1
and index - processed_list_instances < column_length - 1:
469 if index - processed_list_instances == column_length - 1:
470 if index - processed_list_instances < len(line) - 1:
472 "The line is longer than the number of columns, truncating."
474 self.
disp.log_warning(msg, title)
478 if buffer
not in (
"()",
""):
479 results.append(buffer)
481 line_final: str =
", ".join(results)
485 f
"Final placeholder string = '{line_final}'", title
487 self.
disp.log_debug(f
"Total values = {len(all_values)}", title)
489 return line_final, all_values
492 """Check for double queries in a trigger.
495 sql (str): SQL trigger statement.
496 table_name (str): Name of the table.
499 Union[int, str]: Validated SQL trigger or error code.
501 title: str =
"_check_for_double_query"
503 normalized_lower = sql.lower()
506 if len(re.findall(
r"\bcreate\s+trigger\b", normalized_lower)) > 1:
508 "Multiple CREATE TRIGGER statements detected.", title
513 for keyword
in SCONST.SQL_RISKY_DDL_TRIGGER_KEYWORDS:
514 if keyword
in normalized_lower:
516 f
"Unsafe keyword '{keyword.strip()}' detected in trigger SQL.", title
521 if re.match(
r"(?i)^(mysql|information_schema|performance_schema|sys)\.", table_name):
523 "Trigger cannot be created on system schema tables.", title)
527 begin_count = normalized_lower.count(
"begin")
528 end_count = normalized_lower.count(
"end")
529 if begin_count != end_count:
531 f
"Unbalanced BEGIN/END block ({begin_count} BEGIN vs {end_count} END).", title
536 if begin_count == 0
and sql.count(
";") > 1:
537 self.
disp.log_warning(
538 "Multiple SQL statements found outside BEGIN/END. "
539 "MySQL triggers only support one statement unless wrapped.",
544 if not re.match(
r"(?i)^CREATE\s+TRIGGER\s+[`\"\w]+", sql):
545 self.
disp.log_error(
"Malformed CREATE TRIGGER statement.", title)
550 """Clean and validate SQL trigger creation.
553 trigger_name (str): Name of the trigger.
554 table_name (str): Name of the table.
555 timing_event (str): Timing event for the trigger.
556 body (str): Trigger body.
559 Union[str, int]: Validated SQL trigger or error code.
561 title =
"clean_trigger_creation"
563 if not all([trigger_name, table_name, timing_event, body]):
564 self.
disp.log_error(
"All parameters must be provided.", title)
568 self.
disp.log_debug(f
"Raw trigger SQL received: {sql[:200]}...", title)
571 if not re.match(
r"(?i)^\s*CREATE\s+TRIGGER\b", sql):
574 f
"CREATE TRIGGER `{trigger_name}` "
575 f
"{timing_event} ON `{table_name}` "
576 f
"FOR EACH ROW {sql}"
579 f
"Wrapped raw body into CREATE TRIGGER template:\n{sql}", title
585 r"(?i)\bCREATE\s+TRIGGER\s+IF\s+NOT\s+EXISTS\b",
591 sql = re.sub(
r"(?im)^\s*DELIMITER\s+\S+\s*$",
"", sql)
594 sql = re.sub(
r"(?s)\s*END\s*[\$;/]+\s*$",
"END;", sql)
597 sql = re.sub(
r"[ \t]+",
" ", sql).strip()
600 if not re.match(
r"(?i)^CREATE\s+TRIGGER\s+[`\"\w]+", sql):
602 f
"Malformed trigger SQL after cleaning → {sql[:80]}", title
606 self.
disp.log_debug(f
"Normalized trigger SQL:\n{sql}", title)
List[str] keyword_logic_gates
Union[List[Dict[str, Any]], int] beautify_table(self, List[str] column_names, List[List[Any]] table_content)
Tuple[str, List[Union[str, int, float, None]]] process_sql_line(self, Union[str, int, float, List[Union[str, int, float, None]], List[List[Union[str, int, float, None]]], None] line, List[str] column, int column_length=-1)
Union[List[str], str] escape_risky_column_names(self, Union[List[str], str] columns)
Union[int, str] _check_for_double_query_in_trigger(self, str sql, str table_name)
Union[List[str], str] escape_risky_column_names_where_mode(self, Union[List[str], str] columns)
str protect_sql_cell(self, Optional[str] cell)
str compile_update_line(self, List line, List column, int column_length)
Union[str, Union[str, int, float, None]] check_sql_cell(self, Union[str, int, float, None] cell, bool raw=True)
None __init__(self, int success=0, int error=84, bool debug=False)
Union[str, int] clean_trigger_creation(self, str trigger_name, str table_name, str timing_event, str body)
SQLTimeManipulation sql_time_manipulation
Tuple[str, List[Union[str, int, float, None]]] _process_single_sql_line(self, List[Union[str, int, float, None]] line, int column_length)
str _protect_value(self, Optional[str] value)