2# +==== BEGIN CatFeeder =================+
5# ...............)..(.')
7# ...............\(__)|
8# Inspired by Joan Stark
9# source https://www.asciiart.eu/
13# FILE: sql_injection.py
14# CREATION DATE: 11-10-2025
15# LAST Modified: 22:33:21 12-01-2026
17# SQL injection detection module for backend connectors.
19# COPYRIGHT: (c) Cat Feeder
20# PURPOSE: Detect, log, and prevent SQL injection attempts.
22# +==== END CatFeeder =================+
28from typing
import Union, List, Dict, Any, Callable, Sequence, overload, Optional
30 from email_validator
import validate_email, EmailNotValidError
32 EmailNotValidError =
None
35from display_tty
import Disp, initialise_logger
37from .sql_constants
import RISKY_KEYWORDS, KEYWORD_LOGIC_GATES
41 """Detect and prevent SQL injection attempts.
43 This class provides helper methods to scan strings or nested lists of
44 strings for symbols, keywords, or logical operators commonly used in
45 SQL injection attacks. It also includes utilities for sanitizing input
46 and compiling regex patterns for detection.
49 disp: Disp = initialise_logger(__qualname__,
False)
51 def __init__(self, error: int = 84, success: int = 0, debug: bool =
False) ->
None:
52 """Initialize the SQLInjection helper.
55 error (int): Numeric error code returned by helper predicates (default is 84).
56 success (int): Numeric success code, unused by predicates (default is 0).
57 debug (bool): Enable debug logging when True (default is False).
60 self.
disp.update_disp_debug(debug)
61 self.
disp.log_debug(
"Initialising...")
79 _base_symbols: List[str] = [
80 ';',
'--',
'/*',
'*/',
81 '#',
'@@',
'@',
"'",
'"',
'`',
'||'
83 _base_keywords: List[str] = [
84 'SELECT',
'INSERT',
'UPDATE',
'DELETE',
85 'DROP',
'CREATE',
'ALTER',
'TABLE',
'UNION',
'JOIN',
'WHERE'
87 _base_keywords.extend(RISKY_KEYWORDS)
88 _base_logic_gates: List[str] = [
'OR',
'AND',
'NOT']
89 _base_logic_gates.extend(KEYWORD_LOGIC_GATES)
108 r"^\s*(?:[^\s=]+\s*=\s*)?['\"]?(?P<local>(?:\"(?:\\.|[^\\\"])+\"|(?:[\w!#$%&'*+/=?^`{|}~-]+(?:\.[\w!#$%&'*+/=?^`{|}~-]+)*)))@(?P<domain>(?:[\w-]+\.)+[\w-]{2,})['\"]?\s*$",
109 re.UNICODE | re.IGNORECASE,
120 r"order\s?by\s?(asc|desc)?",
126 self.
safe_regexes.append(re.compile(pat, re.IGNORECASE))
127 self.
disp.log_debug(
"Initialised")
140 material_raw: List[str]
144 """Sanitize class-level checking material by converting to lowercase.
147 material_raw (Union[List[str], str]): Raw material to sanitize.
150 Union[List[str], str]: Sanitized material.
152 if isinstance(material_raw, list):
154 for i
in material_raw:
155 if isinstance(i, str):
156 result.append(i.lower())
160 if isinstance(material_raw, str):
161 return material_raw.lower()
173 material_raw: List[str]
177 """Sanitize user input by converting to lowercase.
180 material_raw (Union[List[str], str]): Raw material to sanitize.
183 Union[List[str], str]: Sanitized material.
185 if isinstance(material_raw, list):
187 for i
in material_raw:
188 if isinstance(i, str):
189 result.append(i.lower())
193 if isinstance(material_raw, str):
194 return material_raw.lower()
198 """Precompile regex patterns with heuristics for boundary usage.
201 tokens (List[str]): Tokens to compile into regex patterns.
204 List[re.Pattern]: Compiled regex patterns.
208 escaped = re.escape(token)
211 if c
in r".*+?|{}[]()^$\"'\\":
213 if token.isalnum()
or token.replace(
"_",
"").isalnum():
214 pattern = rf
"\b{token}\b"
219 patterns.append(re.compile(pattern, re.IGNORECASE))
223 """Check if string matches known safe patterns.
226 string (str): String to check.
229 bool: True if string matches a safe pattern, False otherwise.
237 """Check if string is valid base64.
240 string (str): Candidate string.
243 bool: True if string decodes as base64, False otherwise.
246 base64.b64decode(string, validate=
True)
248 except (binascii.Error, ValueError):
253 def _scan_compiled(self, needle: str, regex_list: List[re.Pattern], parent_function: str) -> bool:
254 """Scan a string against a list of compiled regex patterns.
257 needle (str): String to scan.
258 regex_list (List[re.Pattern]): List of compiled regex patterns.
259 parent_function (str): Name of the calling function for logging.
262 bool: True if any pattern matches, False otherwise.
263 True if any pattern matches, False otherwise.
265 for regex
in regex_list:
266 if regex.search(needle):
269 f
"Failed for {needle}, pattern {regex.pattern} matched.",
276 """Check if a string is a valid CSS-style hex colour code.
278 The input is preprocessed:
279 - Strips surrounding whitespace and optional key=value wrappers
280 - Strips surrounding quotes
281 - Rejects internal whitespace
282 - Validates starts with '#' and allowed hex lengths
284 Acceptable formats (without '#'):
293 colour (str): The colour string to validate.
296 bool: True if valid hex colour, False otherwise.
299 colour, function=
"_is_hex_colour_valid:_extract_wrapped_value"
305 if not s.startswith(
"#"):
314 length = len(hex_part)
317 allowed_lengths = {3, 4, 6, 8, 9, 12}
318 if length
not in allowed_lengths:
330 """Return True if string is purely numeric."""
331 return bool(re.fullmatch(
r'\d+(\.\d+)?', s))
334 """Extract and return the e-mail candidate from raw string.
336 Handles optional ``key=value`` wrappers and surrounding quotes.
339 raw (str): Raw string to extract email from.
342 Optional[str]: The inner candidate string or None if it cannot be an e-mail
343 (missing '@' or contains whitespace).
345 if not isinstance(raw, str)
or "@" not in raw:
346 self.
disp.log_debug(
"No @'s in string", function)
350 m_kv = re.match(
r"^\s*([^\s=]+)\s*=\s*(.+)$", s)
351 self.
disp.log_debug(f
"s='{s}', m_kv='{m_kv}'", function)
353 candidate = m_kv.group(2).strip()
356 self.
disp.log_debug(f
"candidate='{candidate}'", function)
359 (candidate.startswith(
"'")
and candidate.endswith(
"'"))
360 or (candidate.startswith(
'"')
and candidate.endswith(
'"'))
363 "' or \" found at beginning and end of string, stripping.",
366 candidate = candidate[1:-1]
368 if re.search(
r"\s", candidate):
370 "string found in candidate, returning None", function)
376 """Extract and normalize a potentially wrapped value.
378 Handles optional `key=value` wrappers, strips surrounding quotes and
379 whitespace, and returns None for inputs containing internal whitespace
380 or that are not strings. This mirrors the preprocessing used for
381 e-mail extraction but is generic for other single-value tokens.
384 raw (str): Raw input to process.
385 function (str): Caller name for debug logging.
388 Optional[str]: The normalized inner value, or ``None`` if the
389 input is not a valid single token.
391 if not isinstance(raw, str):
392 self.
disp.log_debug(
"Not a string", function)
396 m_kv = re.match(
r"^\s*([^\s=]+)\s*=\s*(.+)$", s)
398 s = m_kv.group(2).strip()
401 if (s.startswith(
"'")
and s.endswith(
"'"))
or (s.startswith(
'"')
and s.endswith(
'"')):
405 if re.search(
r"\s", s):
407 "Whitespace found in candidate, rejecting", function)
412 def _is_email(self, raw: str, function: str =
"_is_email") -> Optional[str]:
413 """Check if the provided text is an e-mail and return normalized value.
415 This accepts either a plain e-mail or a `key=value` pair where the
416 value is an e-mail and returns the normalized e-mail string. If the
417 raw token does not represent an exact e-mail (for example it contains
418 trailing SQL such as ``user@example.com; DROP TABLES;``) this returns
422 raw (str): The input to check.
425 Optional[str]: Normalized email string if valid, None otherwise.
429 f
"{function}:_extract_email_candidate"
431 if candidate
is None:
432 self.
disp.log_debug(
"Candidate is none, returning none", function)
438 if not email_re.fullmatch(raw):
439 self.
disp.log_debug(
"fullmatch failed, returning None", function)
446 if validate_email
and EmailNotValidError:
448 "validate_email and EmailNotValidError are present", function)
450 res = validate_email(candidate, check_deliverability=
False)
451 self.
disp.log_debug(f
"res: {res}", function)
453 norm = getattr(res,
"normalized",
None)
or getattr(res,
"email",
None)
or getattr(
454 res,
"ascii_email",
None)
or candidate
455 self.
disp.log_debug(f
"norm={norm}", function)
457 except EmailNotValidError:
463 self.
disp.log_debug(f
"candidate={candidate}", function)
469 """Detect injection-like symbols in the input.
471 This looks for characters or sequences commonly used in SQL
472 injection payloads (for example semicolon or double-dash). If ``string``
473 is a list, each element is checked recursively.
476 string (Union[str, None, int, float, Sequence]): String or list of strings to scan.
479 bool: True when an injection-like symbol is detected, False otherwise.
483 if isinstance(string, list):
489 self.
disp.log_debug(f
"raw: '{raw}'")
492 self.
disp.log_debug(
"E-mail found")
496 self.
disp.log_debug(
"Hex colour detected; treating as safe input")
506 "check_if_symbol_sql_injection:_scan_compiled"
508 self.
disp.log_debug(f
"Final response={final}")
512 """Detect SQL keywords in the input.
514 This checks for common SQL command keywords (SELECT, DROP, UNION,
515 etc.). If ``string`` is a list, each element is checked recursively.
518 string (Union[str, None, int, float, Sequence]): String or list of strings to scan.
521 bool: True when an SQL keyword is found, False otherwise.
524 msg =
"(check_if_command_sql_injection) string = "
525 msg += f
"'{string}', type(string) = '{type(string)}'"
526 self.
disp.disp_print_debug(msg)
527 if isinstance(string, list):
535 self.
disp.log_debug(f
"raw: '{raw}'")
538 self.
disp.log_debug(
"E-mail found")
542 self.
disp.log_debug(
"Hex colour detected; treating as safe input")
552 "check_if_command_sql_injection:_scan_compiled"
554 self.
disp.log_debug(f
"Final response={final}")
558 """Detect logical operators (AND/OR/NOT) in the input.
560 Useful to catch attempts that combine conditions to bypass simple
561 filters. Accepts a string or list of strings.
564 string (Union[str, None, int, float, Sequence]): String or list of strings to scan.
567 bool: True when a logic gate is present, False otherwise.
571 if isinstance(string, list):
577 self.
disp.log_debug(f
"raw: '{raw}'")
580 self.
disp.log_debug(
"E-mail found")
584 self.
disp.log_debug(
"Hex colour detected; treating as safe input")
594 "check_if_logic_gate_sql_injection:_scan_compiled"
596 self.
disp.log_debug(f
"Final response={final}")
600 """Combined check for symbol- or keyword-based injection patterns.
603 string (Union[str, None, int, float, Sequence]): Input to scan.
606 bool: True when either symbol- or keyword-based injection is found.
610 if is_symbol
or is_command:
615 """Combined check for symbol- or logic-gate-based injection patterns.
618 string (Union[str, None, int, float, Sequence]): Input to scan.
621 bool: True when symbol- or logic-gate-based injection is found.
625 if is_symbol
or is_logic_gate:
630 """Combined check for keyword- or logic-gate-based injection patterns.
633 string (Union[str, None, int, float, Sequence]): Input to scan.
636 bool: True when a command or logic-gate-based injection is found.
640 if is_command
or is_logic_gate:
644 def check_if_sql_injection(self, string: Union[Union[str,
None, int, float], Sequence[Union[str,
None, int, float]]]) -> bool:
645 """High-level SQL injection detection using all configured checks.
647 This method runs a combined scan (symbols, keywords, and logic gates)
648 and returns True if any of the component checks considers the input
652 string (Union[str, None, int, float, Sequence]): Input to scan; may be a string or a
653 list (including nested lists).
656 bool: True when an injection-like pattern is detected, False otherwise.
660 if isinstance(string, list):
666 self.
disp.log_debug(f
"raw: '{raw}'")
669 self.
disp.log_debug(
"E-mail found")
673 self.
disp.log_debug(
"Hex colour detected; treating as safe input")
681 string, self.
regex_map[
'all'],
"check_if_sql_injection:_scan_compiled")
682 self.
disp.log_debug(f
"Final response={final}")
685 def check_if_injections_in_strings(self, array_of_strings: Union[Union[str,
None, int, float], Sequence[Union[str,
None, int, float]], Sequence[Sequence[Union[str,
None, int, float]]]]) -> bool:
686 """Scan an array (possibly nested) of strings for injection patterns.
688 This convenience function accepts a string, a list of strings, or a
689 nested list of strings and returns True if any element appears to be
693 array_of_strings (Union[str, None, int, float, Sequence, Sequence[Sequence]]): Item(s) to scan.
696 bool: True when an injection-like value is detected.
698 if array_of_strings
is None:
700 if isinstance(array_of_strings, list):
701 for i
in array_of_strings:
702 if isinstance(i, list):
718 def run_test(self, title: str, array: List[Any], function: Callable[[Any], bool], expected_response: bool =
False, global_status: int = 0) -> int:
719 """Run a small functional test over the injection-checker functions.
721 This helper is used by :meth:`test_injection_class` and not by the
722 production code path. It calls ``function`` for each element in
723 ``array`` and compares the result to ``expected_response``.
726 title (str): Short test title printed to stdout.
727 array (List[Any]): Items to test (may be strings or nested lists).
728 function (Callable[[Any], bool]): Function to call for each item.
729 expected_response (bool): Expected boolean response for each call.
730 global_status (int): Running global status to update.
733 int: Updated global status (``0`` for success, error code otherwise).
736 global_response = global_status
737 print(f
"{title}", end=
"")
740 if function(i) != expected_response:
742 global_response = err
744 return global_response
747 """Run a small suite of self-tests for the injection checks.
750 int: ``0`` on success, non-zero error code if any test fails.
753 global_status = success
759 "SELECT * FROM table;",
763 title=
"Logic gate test:",
766 expected_response=
True,
767 global_status=global_status
770 title=
"Keyword check:",
773 expected_response=
True,
774 global_status=global_status
777 title=
"Symbol check:",
780 expected_response=
True,
781 global_status=global_status
784 title=
"All injections:",
787 expected_response=
True,
788 global_status=global_status
791 title=
"Array check:",
794 expected_response=
True,
795 global_status=global_status
798 title=
"Double array check:",
799 array=[self.
all, self.
all],
801 expected_response=
True,
802 global_status=global_status
805 title=
"SQL sentences:",
806 array=test_sentences,
808 expected_response=
True,
809 global_status=global_status
814if __name__ ==
"__main__":
817 res = II.test_injection_class()
818 print(f
"test status = {res}")
None __init__(self, int error=84, int success=0, bool debug=False)
Union[List[str], str] _sanitize_usr_input(self, Union[List[str], str] material_raw)
bool check_if_sql_injection(self, Union[Union[str, None, int, float], Sequence[Union[str, None, int, float]]] string)
Union[List[str], str] _sanitize_class_checking_material(self, Union[List[str], str] material_raw)
bool check_if_symbol_and_logic_gate_injection(self, Union[Union[str, None, int, float], Sequence[Union[str, None, int, float]]] string)
List[re.Pattern] _compile_patterns(self, List[str] tokens)
int run_test(self, str title, List[Any] array, Callable[[Any], bool] function, bool expected_response=False, int global_status=0)
bool check_if_injections_in_strings(self, Union[Union[str, None, int, float], Sequence[Union[str, None, int, float]], Sequence[Sequence[Union[str, None, int, float]]]] array_of_strings)
str _sanitize_usr_input(self, str material_raw)
bool check_if_command_sql_injection(self, Union[Union[str, None, int, float], Sequence[Union[str, None, int, float]]] string)
Optional[str] _is_email(self, str raw, str function="_is_email")
List[str] _sanitize_usr_input(self, List[str] material_raw)
bool check_if_symbol_and_command_injection(self, Union[Union[str, None, int, float], Sequence[Union[str, None, int, float]]] string)
bool check_if_logic_gate_sql_injection(self, Union[Union[str, None, int, float], Sequence[Union[str, None, int, float]]] string)
bool _scan_compiled(self, str needle, List[re.Pattern] regex_list, str parent_function)
List[str] _sanitize_class_checking_material(self, List[str] material_raw)
bool _is_numeric(self, str s)
bool _is_base64(self, str string)
str _sanitize_class_checking_material(self, str material_raw)
bool check_if_symbol_sql_injection(self, Union[Union[str, None, int, float], Sequence[Union[str, None, int, float]]] string)
bool _is_hex_colour_valid(self, str colour)
bool check_if_command_and_logic_gate_injection(self, Union[Union[str, None, int, float], Sequence[Union[str, None, int, float]]] string)
bool _is_safe_pattern(self, str string)
int test_injection_class(self)
Optional[str] _extract_email_candidate(self, str raw, str function="_extract_email_candidate")
Optional[str] _extract_wrapped_value(self, str raw, str function="_extract_wrapped_value")