Cat Feeder  1.0.0
The Cat feeder project
Loading...
Searching...
No Matches
sql_query_boilerplates.py
Go to the documentation of this file.
1r"""
2# +==== BEGIN CatFeeder =================+
3# LOGO:
4# ..............(..../\
5# ...............)..(.')
6# ..............(../..)
7# ...............\‍(__)|
8# Inspired by Joan Stark
9# source https://www.asciiart.eu/
10# animals/cats
11# /STOP
12# PROJECT: CatFeeder
13# FILE: sql_query_boilerplates.py
14# CREATION DATE: 11-10-2025
15# LAST Modified: 1:39:32 06-02-2026
16# DESCRIPTION:
17# This is the backend server in charge of making the actual website work.
18# /STOP
19# COPYRIGHT: (c) Cat Feeder
20# PURPOSE:
21# File in charge of containing the interfacing between an sql library and the program.
22# This contains functions that simplify the process of interracting with databases as well as check for injection attempts.
23# /STOP
24# // AR
25# +==== END CatFeeder =================+
26"""
27
28import re
29from typing import List, Dict, Union, Any, Tuple, Optional, Literal, Set, overload
30
31import mysql
32import mysql.connector
33from display_tty import Disp, initialise_logger
34
35
36from . import sql_constants as SCONST
37from .sql_injection import SQLInjection
38from .sql_connections import SQLManageConnections
39from .sql_sanitisation_functions import SQLSanitiseFunctions
40
41
43 """Provide reusable SQL query templates and helpers.
44
45 This class contains methods for generating common SQL queries, such as
46 creating tables, inserting data, and fetching results. It simplifies
47 database interactions by abstracting repetitive query patterns.
48
49 Attributes:
50 disp (Disp): Logger instance for debugging and error reporting.
51 sql_pool (SQLManageConnections): Connection manager for database operations.
52 error (int): Numeric error code.
53 success (int): Numeric success code.
54 debug (bool): Debug mode flag.
55 db_version (Optional[Tuple[int, int, int]]): Parsed database version.
56 sql_injection (SQLInjection): SQL injection protection utility.
57 sanitize_functions (SQLSanitiseFunctions): SQL sanitization utility.
58 """
59
60 disp: Disp = initialise_logger(__qualname__, False)
61
62 def __init__(self, sql_pool: SQLManageConnections, success: int = 0, error: int = 84, debug: bool = False) -> None:
63 """Initialize the query helper.
64
65 Args:
66 sql_pool (SQLManageConnections): Connection manager used to run queries and commands.
67 redis_cacher (Optional[RedisCaching]): Optional Redis caching layer to enable transparent caching.
68 success (int, optional): Numeric success code. Defaults to 0.
69 error (int, optional): Numeric error code. Defaults to 84.
70 debug (bool, optional): Enable debug logging. Defaults to False.
71 """
72 # ------------------------ The logging function ------------------------
73 self.disp.update_disp_debug(debug)
74 self.disp.log_debug("Initialising...")
75 # -------------------------- Inherited values --------------------------
76 self.sql_pool: SQLManageConnections = sql_pool
77 self.error: int = error
78 self.debug: bool = debug
79 self.success: int = success
80 # ------------------------ SQL Database version ------------------------
81 self.db_version: Optional[Tuple[int, int, int]] = None
82 self.db_version_major: Optional[int] = None
83 self.db_version_minor: Optional[int] = None
84 self.db_version_retro: Optional[int] = None
85 if self.sql_pool.is_pool_active():
87 # ---------------------------- Where regex ----------------------------
88 self.compiled_where: re.Pattern = re.compile(
89 r"""
90'[^']*' | # single-quoted strings
91"[^"]*" | # double-quoted strings
92<=|>=|!=|=|<|> | # comparison operators
93\‍(|\‍) | # parentheses
94, | # literal comma
95\bAND\b|\bOR\b|\bIN\b|\bLIKE\b|\bIS\b|\bNOT\b | # SQL keywords
96[A-Za-z_][A-Za-z0-9_]* | # identifiers
97-?\d+(?:\.\d+)? # integers and floats (including negatives)
98""",
99 re.IGNORECASE | re.VERBOSE,
100 )
101 self.compiled_digit_check: re.Pattern = re.compile(r"-?\d+(?:\.\d+)?$")
102 self.where_clause_safe_tokens: Set[str] = {
103 '(', ')', ',', 'OR', 'AND', '=', '!=', '<', '>', '<=',
104 '>=', 'LIKE', 'IN', 'NOT', 'IS', 'NULL', 'TRUE', 'FALSE'
105 }
107 self.where_clause_single_space_skippers_post: Set[str] = {')', ','}
109 '=', '!=', '<', '>', '<=', '>='
110 }
111 # ------------------- Pre-built injection exception -------------------
112 self.where_injection_exception: RuntimeError = RuntimeError(
113 "SQL injection detected in WHERE clause"
114 )
115 # ---------------------- The anty injection class ----------------------
116 self.sql_injection: SQLInjection = SQLInjection(
117 self.error,
118 self.success,
119 self.debug
120 )
121 # -------------------- Keyword sanitizing functions --------------------
122 self.sanitize_functions: SQLSanitiseFunctions = SQLSanitiseFunctions(
123 success=self.success, error=self.error, debug=self.debug
124 )
125 self.disp.log_debug("Initialised")
126
127 def _normalize_cell(self, cell: object) -> Union[str, None, int, float]:
128 """Normalize a cell value for parameter binding.
129
130 Converts special tokens (like 'now', 'current_date') to their appropriate SQL values, preserves numeric types, and returns None for null-like inputs.
131
132 Args:
133 cell (object): The cell value to normalize.
134
135 Returns:
136 Union[str, None, int, float]: Normalized value suitable for SQL parameters.
137 """
138 if cell is None:
139 return None
140 if isinstance(cell, (int, float)):
141 return cell
142 s = str(cell)
143 sl = s.lower()
144 if sl in ("now", "now()"):
145 return self.sanitize_functions.sql_time_manipulation.get_correct_now_value()
146 if sl in ("current_date", "current_date()"):
147 return self.sanitize_functions.sql_time_manipulation.get_correct_current_date_value()
148 return s
149
150 def _tokenize_where(self, clause: str) -> List[str]:
151 """Tokenize a WHERE clause into SQL tokens.
152
153 Splits a clause into individual tokens such as identifiers, operators, numbers, strings, and SQL keywords.
154
155 Args:
156 clause (str): The SQL WHERE clause to tokenize.
157
158 Returns:
159 List[str]: List of tokens extracted from the clause.
160 """
161 return self.compiled_where.findall(clause)
162
163 def _is_digit(self, token: str) -> bool:
164 """Check if the given token represents a number
165
166 Args:
167 token (str): The token to check
168
169 Returns:
170 bool: True is a digit, False otherwise
171 """
172 return bool(self.compiled_digit_check.fullmatch(token))
173
174 def _escape_risky_column_name(self, token: str) -> str:
175 """Escape column names that could be considered as SQL keywords instead of text.
176
177 Args:
178 token (str): The column part of the token
179
180 Returns:
181 str: The token (escaped if necessary)
182 """
183 token_lower = token.lower()
184 if (
185 token_lower not in self.sanitize_functions.keyword_logic_gates
186 and token_lower in self.sanitize_functions.risky_keywords
187 ):
188 self.disp.log_debug(f"Escaping risky column name {token}")
189 return f"`{token}`"
190 return token
191
192 def _strip_outer_quotes(self, value: str) -> str:
193 """Remove at most one leading and/or trailing quote character.
194
195 Preserves inner quotes and does not affect the string if no quotes are present.
196
197 Args:
198 value (str): The string from which to remove outer quotes.
199
200 Returns:
201 str: String with outer quotes removed if present.
202 """
203 if not value:
204 return value
205
206 self.disp.log_debug(f"Stripping outer quotes for {value}")
207 if value[0] in ("'", '"'):
208 value = value[1:]
209
210 if value and value[-1] in ("'", '"'):
211 value = value[:-1]
212 self.disp.log_debug(f"Outer quotes stripped for {value}")
213
214 return value
215
216 def _check_where_node(self, token: str) -> None:
217 """Validate a single WHERE clause token for SQL injection.
218
219 Checks whether the token is safe (operators, keywords, numbers, or quoted literals) and raises an error if an injection risk is detected.
220
221 Args:
222 token (str): Token from the WHERE clause to validate.
223
224 Raises:
225 RuntimeError: If the token is determined to be unsafe.
226 """
227
228 raw = token.strip()
229 upper = raw.upper()
230
231 # Token has been escaped already
232 if raw.startswith("`") and raw.endswith("`"):
233 return
234
235 # Known-safe SQL tokens
236 if upper in self.where_clause_safe_tokens:
237 return
238
239 # Numeric literals
240 if raw.isdigit():
241 return
242
243 # Quoted literals
244 if (
245 raw.startswith("'") and raw.endswith("'")
246 ) or (
247 raw.startswith('"') and raw.endswith('"')
248 ):
249 return
250
251 # Strip quotes before injection check
252 check_token = raw.strip("'\"")
253
254 if self.sql_injection.check_if_symbol_and_command_injection(check_token):
255 self.disp.log_error(
256 f"SQL injection detected in WHERE token: {raw}"
257 )
259
260 def _is_quoted(self, value: str) -> bool:
261 """Check if a string is surrounded by single or double quotes.
262
263 Args:
264 value (str): The string to check.
265
266 Returns:
267 bool: True if the string starts and ends with the same quote character
268 (either single `'` or double `"`), False otherwise.
269 """
270 if not value or len(value) < 2:
271 return False
272 if (value[0] == value[-1]) and value[0] in ("'", '"'):
273 self.disp.log_debug(f"value ({value}) is quoted")
274 return True
275 return False
276
277 def _where_space_handler(self, token: str, rebuilt_tokens: List[str], skip_space: bool = False) -> bool:
278 """Determine where to put spaces during query rebuild, this is mostly for esthetic reasons.
279
280 Args:
281 token (str): The current token being processed.
282 rebuilt_tokens (List[str]): The list of tokens being rebuilt.
283 skip_space (bool, optional): This is wether adding whitespace on the next turn should be skipped or not. Defaults to False.
284
285 Returns:
286 bool: The state of skip_space for the next turn.
287 """
288 space = " "
289 # Add a space next turn
290 if skip_space or not rebuilt_tokens or token in self.where_clause_single_space_skippers_post:
291 return False
292 # Add a space now but not next turn
294 rebuilt_tokens.append(space)
295 return True
296 # Do not add a space now and neither on the next turn
297 if token in self.where_clause_double_space_skippers:
298 return True
299 # Add a space (default behaviour)
300 rebuilt_tokens.append(space)
301 return False
302
303 def _sanity_check_where_clause(self, clause_str: str) -> None:
304 """
305 Perform sanity checks on a WHERE clause to detect potential SQL injection after it has been broken down and reconstructed.
306
307 Args:
308 clause_str (str): The clause to check.
309
310 Raises:
311 RuntimeError: If SQL injection is detected.
312 """
313 # Check for unbalanced parentheses
314 if clause_str.count("(") != clause_str.count(")"):
316 # Disallow trailing identifiers after a complete expression
317 if re.search(r"%s\s+`[^`]+`", clause_str):
319
320 def _check_complex_clause_for_injection(self, clause_str: str) -> Tuple[str, List[Union[str, int, float, None]]]:
321 """
322 Validate a complex WHERE clause and extract parameterizable values.
323
324 Args:
325 clause_str (str): The clause to check.
326
327 Returns:
328 Tuple[str, List[Union[str, int, float, None]]]:
329 - The clause with placeholders (%s) for parameterizable values.
330 - List of extracted values.
331 """
332 skip_space = False
333 self.disp.log_debug(f"Raw clause: {clause_str}")
334 tokens = self._tokenize_where(clause_str)
335 self.disp.log_debug(f"Tokenised clause: {tokens}")
336
337 params: List[Union[str, int, float, None]] = []
338 rebuilt_tokens: List[str] = []
339 is_column: bool = False
340
341 for token in tokens:
342 is_column = (
343 self._is_digit(token)
344 or self._is_quoted(token)
345 ) is False
346 if is_column:
347 token = self._escape_risky_column_name(token)
348 self._check_where_node(token) # existing validation
349 # Handle spacing (this is just esthetics)
350 skip_space = self._where_space_handler(
351 token, rebuilt_tokens, skip_space
352 )
353
354 # Decide if token is a value to parameterize
355 if not is_column:
356 normalized = self._normalize_cell(
357 self._strip_outer_quotes(token)
358 )
359 params.append(normalized)
360 rebuilt_tokens.append("%s")
361 else:
362 rebuilt_tokens.append(token)
363
364 self.disp.log_debug(f"rebuilt_tokens: {rebuilt_tokens}")
365 rebuilt_clause = "".join(rebuilt_tokens)
366 self.disp.log_debug(f"rebuilt_clause: {rebuilt_clause}")
367 self.disp.log_debug(f"parameters: {params}")
368 self._sanity_check_where_clause(rebuilt_clause)
369 self.disp.log_debug("WHERE clause passed sanity check")
370 return rebuilt_clause, params
371
372 def _parse_where_clause(self, where: Union[str, List[str]]) -> Tuple[str, List[Union[str, int, float, None]]]:
373 """Parse and parameterize a WHERE clause.
374
375 This method performs the following steps:
376 1. Validates each clause for SQL injection.
377 2. Parameterizes simple equality clauses with %s placeholders.
378 3. Escapes risky column names while leaving logical operators unescaped.
379 4. Returns the parameterized WHERE string and list of extracted values.
380
381 Args:
382 where (Union[str, List[str]]): WHERE clause(s) to parse. Can be a string
383 or a list of strings joined by AND.
384
385 Returns:
386 Tuple[str, List[Union[str, int, float, None]]]:
387 - Parameterized WHERE clause string.
388 - List of extracted values for parameters.
389
390 Raises:
391 RuntimeError: If SQL injection is detected in the WHERE clauses.
392 """
393
394 title = "_parse_where_clause"
395
396 if where == "" or (isinstance(where, list) and not where):
397 return "", []
398
399 self.disp.log_debug(f"unchecked WHERE clause={where}")
400
401 params: List[Union[str, int, float, None]] = []
402 parsed_clauses: List[str] = []
403 join_term = ""
404
405 if isinstance(where, str):
406 self.disp.log_debug("Where clause is a string")
407 where_list = [where]
408 elif hasattr(where, "__iter__") or hasattr(where, "__getitem__"):
409 self.disp.log_debug(
410 f"Where clause is a iterable, type: {type(where)}"
411 )
412 where_list = where
413 join_term = " AND "
414 else:
415 raise ValueError("Unhandled type for where checking")
416
417 for clause in where_list:
418 clause_str = str(clause).strip()
419 self.disp.log_debug(f"stripped string clause: {clause_str}")
420
421 # ALWAYS validate the clause first
422 processed_check: Tuple[
423 str,
424 List[Union[str, int, float, None]]
425 ] = self._check_complex_clause_for_injection(clause_str)
426 self.disp.log_debug(f"processed_checks: {processed_check}")
427
428 # Clause is already validated — keep as-is
429 parsed_clauses.append(processed_check[0])
430 params.extend(processed_check[1])
431
432 where_string = join_term.join(parsed_clauses)
433
434 self.disp.log_debug(
435 f"Parsed WHERE: '{where_string}', params: {params}",
436 title
437 )
438
439 return where_string, params
440
441 def get_database_version(self) -> Optional[Tuple[int, int, int]]:
442 """Fetch and parse the database version.
443
444 Returns:
445 Optional[Tuple[int, int, int]]: A tuple representing the database version,
446 or None if the query fails.
447 """
448 _query: str = "SELECT VERSION()"
449 resp = self.sql_pool.run_and_fetch_all(_query)
450 if isinstance(resp, int):
451 return None
452 if not resp or not isinstance(resp, list):
453 return None
454 if not resp[0] or not isinstance(resp[0], tuple):
455 return None
456 if not resp[0][0] or not isinstance(resp[0][0], str):
457 return None
458 vers = resp[0][0]
459 version_parts = vers.split('.')
460 parsed_version = []
461 for part in version_parts:
462 if part.isdigit():
463 parsed_version.append(int(part))
464 else:
465 parsed_version.append(part)
466 self.db_version = tuple(parsed_version)
467 self.db_version_major = self.db_version[0]
468 self.db_version_minor = self.db_version[1]
469 self.db_version_retro = self.db_version[2]
470 return self.db_version
471
472 def get_table_column_names(self, table_name: str) -> Union[List[str], int]:
473 """Return the list of column names for a given table.
474
475 Args:
476 table_name (str): Name of the table to retrieve column names from.
477
478 Returns:
479 Union[List[str], int]: List of column names on success, or `self.error` on failure.
480 """
481 title = "get_table_column_names"
482 try:
483 columns = self.describe_table(table_name)
484 if isinstance(columns, int) is True:
485 self.disp.log_error(
486 f"Failed to describe table {table_name}.",
487 title
488 )
489 return self.error
490 data = []
491 if isinstance(columns, List):
492 for i in columns:
493 data.append(i[0])
494 return data
495 except RuntimeError as e:
496 msg = "Error: Failed to get column names of the tables.\n"
497 msg += f"\"{str(e)}\""
498 self.disp.log_error(msg, "get_table_column_names")
499 return self.error
500
501 def get_table_names(self) -> Union[int, List[str]]:
502 """Retrieve the names of all tables in the database.
503
504 Returns:
505 Union[int, List[str]]: List of table names on success, or `self.error` on failure.
506 """
507 title = "get_table_names"
508 self.disp.log_debug("Getting table names.", title)
509 resp = self.sql_pool.run_and_fetch_all(query="SHOW TABLES")
510 if isinstance(resp, int) is True:
511 self.disp.log_error(
512 "Failed to fetch the table names.",
513 title
514 )
515 return self.error
516 data = []
517 if isinstance(resp, (List, Dict, Tuple)):
518 for i in resp:
519 data.append(i[0])
520 self.disp.log_debug("Tables fetched", title)
521 return data
522
523 def get_triggers(self) -> Union[int, Dict[str, str]]:
524 """Retrieve all triggers and their SQL definitions.
525
526 Returns:
527 Union[int, Dict[str, str]]: Dictionary of {trigger_name: sql_definition},
528 or `self.error` on failure.
529 """
530 title = "get_triggers"
531 self.disp.log_debug(
532 "Fetching all triggers and their SQL definitions.", title
533 )
534
535 query = "SHOW TRIGGERS;"
536 resp = self.sql_pool.run_and_fetch_all(query=query, values=[])
537
538 if isinstance(resp, int):
539 self.disp.log_error("Failed to fetch triggers.", title)
540 return self.error
541
542 data: Dict[str, str] = {}
543 for row in resp:
544 if len(row) >= 2 and row[0] and row[1]:
545 data[row[0]] = row[1]
546
547 self.disp.log_debug(f"Triggers fetched: {list(data.keys())}", title)
548 return data
549
550 def get_trigger(self, trigger_name: str, db_name: Optional[str] = None) -> Union[int, str]:
551 """Retrieve the SQL definition of a specific trigger.
552
553 Args:
554 trigger_name (str): The trigger name to fetch.
555 db_name (Optional[str], optional): Database name. Defaults to None.
556
557 Returns:
558 Union[int, str]: The SQL definition, or `self.error` on failure.
559 """
560 title = "get_trigger"
561 self.disp.log_debug(
562 f"Getting trigger definition for '{trigger_name}'", title
563 )
564
565 if not trigger_name:
566 self.disp.log_error("Trigger name cannot be empty.", title)
567 return self.error
568
569 to_check: List[str] = [trigger_name]
570 if db_name:
571 to_check.append(db_name)
572
573 if self.sql_injection.check_if_injections_in_strings(to_check):
574 self.disp.log_error(
575 "SQL Injection detected in trigger name.", title)
576 return self.error
577
578 if db_name:
579 query = f"SHOW CREATE TRIGGER `{db_name}`.`{trigger_name}`"
580 else:
581 query = f"SHOW CREATE TRIGGER `{trigger_name}`"
582
583 resp = self.sql_pool.run_and_fetch_all(query=query, values=None)
584
585 if isinstance(resp, int) or not resp:
586 self.disp.log_error(
587 f"Failed to retrieve trigger '{trigger_name}'.", title
588 )
589 return self.error
590
591 sql_definition = None
592 if resp and len(resp[0]) > 2:
593 sql_definition = resp[0][2]
594 if not sql_definition:
595 self.disp.log_error(
596 f"No SQL definition found for trigger '{trigger_name}'.", title
597 )
598 return self.error
599
600 self.disp.log_debug(
601 f"SQL for trigger '{trigger_name}':\n{sql_definition}", title
602 )
603 return sql_definition
604
605 def get_trigger_names(self, db_name: Optional[str] = None) -> Union[int, List[str]]:
606 """Return a list of trigger names in the current or specified MySQL database.
607
608 Args:
609 db_name (Optional[str], optional):
610 Name of the database/schema to query.
611 Defaults to None, which uses the currently selected database.
612
613 Returns:
614 Union[int, List[str]]: List of trigger names, or ``self.error`` on failure.
615 """
616 title = "get_trigger_names"
617 self.disp.log_debug("Getting trigger names.", title)
618 if db_name:
619 if self.sql_injection.check_if_injections_in_strings([db_name]):
620 self.disp.log_error(
621 "SQL Injection detected in database name.", title
622 )
623 return self.error
624
625 query = (
626 "SELECT TRIGGER_NAME "
627 "FROM information_schema.triggers "
628 "WHERE TRIGGER_SCHEMA = %s "
629 "ORDER BY TRIGGER_NAME;"
630 )
631 values: List[Union[str, int, float, None]] = [db_name]
632 else:
633 query = (
634 "SELECT TRIGGER_NAME "
635 "FROM information_schema.triggers "
636 "WHERE TRIGGER_SCHEMA = DATABASE() "
637 "ORDER BY TRIGGER_NAME;"
638 )
639 values: List[Union[str, int, float, None]] = []
640
641 # --------------------------------------------------------------------------
642 # Execute the query
643 # --------------------------------------------------------------------------
644 self.disp.log_debug(f"Running query: {query}", title)
645 response = self.sql_pool.run_and_fetch_all(query=query, values=values)
646
647 if isinstance(response, int):
648 self.disp.log_error("Failed to fetch trigger names.", title)
649 return self.error
650
651 if not response:
652 self.disp.log_debug(
653 "No triggers found in the selected database.", title)
654 return []
655
656 # --------------------------------------------------------------------------
657 # Extract trigger names
658 # --------------------------------------------------------------------------
659 trigger_names: List[str] = []
660 for row in response:
661 if row and row[0]:
662 trigger_names.append(row[0])
663
664 self.disp.log_debug(f"Triggers fetched: {trigger_names}", title)
665 return trigger_names
666
667 def describe_table(self, table: str) -> Union[int, List[Any]]:
668 """Fetch the headers (description) of a table from the database.
669
670 Args:
671 table (str): The name of the table to describe.
672
673 Returns:
674 Union[int, List[Any]]: A list containing the description of the table, or self.error if an error occurs.
675 """
676 title = "describe_table"
677 self.disp.log_debug(f"Describing table {table}", title)
678 if self.sql_injection.check_if_sql_injection(table) is True:
679 self.disp.log_error("Injection detected.", "sql")
680 return self.error
681 try:
682 resp = self.sql_pool.run_and_fetch_all(query=f"DESCRIBE {table}")
683 if isinstance(resp, int) is True:
684 self.disp.log_error(
685 f"Failed to describe table {table}", title
686 )
687 return self.error
688 return resp
689 except mysql.connector.errors.ProgrammingError as pe:
690 msg = f"ProgrammingError: The table '{table}'"
691 msg += "does not exist or the query failed."
692 self.disp.log_critical(msg, title)
693 raise RuntimeError(msg) from pe
694 except mysql.connector.errors.IntegrityError as ie:
695 msg = "IntegrityError: There was an integrity constraint "
696 msg += f"issue while describing the table '{table}'."
697 self.disp.log_critical(msg, title)
698 raise RuntimeError(msg) from ie
699 except mysql.connector.errors.OperationalError as oe:
700 msg = "OperationalError: There was an operational error "
701 msg += f"while describing the table '{table}'."
702 self.disp.log_critical(msg, title)
703 raise RuntimeError(msg) from oe
704 except mysql.connector.Error as e:
705 msg = "MySQL Error: An unexpected error occurred while "
706 msg += f"describing the table '{table}'."
707 self.disp.log_critical(msg, title)
708 raise RuntimeError(msg) from e
709 except RuntimeError as e:
710 msg = "A runtime error occurred during the table description process."
711 self.disp.log_critical(msg, title)
712 raise RuntimeError(msg) from e
713
714 def create_table(self, table: str, columns: List[Tuple[str, str]]) -> int:
715 """Create a new table in the MySQL database, compatible with MySQL 5.0+.
716
717 Args:
718 table (str): Name of the new table.
719 columns (List[Tuple[str, str]]): List of (column_name, column_type) pairs.
720
721 Returns:
722 int: ``self.success`` on success, or ``self.error`` on failure.
723
724 Example:
725 .. code-block:: python
726
727 table_name = "users"
728 columns = [
729 ("id", "INT AUTO_INCREMENT PRIMARY KEY"),
730 ("username", "VARCHAR(255) NOT NULL UNIQUE"),
731 ("email", "VARCHAR(255) NOT NULL"),
732 ("created_at", "DATETIME DEFAULT CURRENT_TIMESTAMP")
733 ]
734
735 result = self.create_table(table_name, columns)
736 if result == self.success:
737 print(f"Table '{table_name}' created successfully.")
738 else:
739 print(f"Failed to create table '{table_name}'.")
740
741 Notes:
742 - Protects against SQL injection using :class:`SQLInjection`.
743 - Escapes table and column names with backticks for MySQL.
744 - Includes version-aware fallback for MySQL 5.0 compatibility.
745 """
746 title = "create_table"
747 self.disp.log_debug(f"Creating table '{table}'", title)
748
749 # --- SQL injection protection ---
750 if self.sql_injection.check_if_injections_in_strings([table]):
751 self.disp.log_error("Injection detected in table name.", title)
752 return self.error
753
754 try:
755 # --- Escape table name ---
756 table_safe = table.replace("`", "``")
757
758 # --- Build column definitions safely ---
759 _tmp = []
760 for name, col_type in columns:
761 safe_name = name.replace("`", "``")
762
763 # Fallback for old MySQL versions (5.0–5.5)
764 if (
765 "DEFAULT CURRENT_TIMESTAMP" in col_type.upper()
766 and (
767 not self.db_version_major or self.db_version_major <= 5
768 )
769 ):
770 self.disp.log_warning(
771 f"MySQL 5.0 does not support 'DEFAULT CURRENT_TIMESTAMP' "
772 f"on DATETIME columns. Downgrading '{safe_name}' definition.",
773 title,
774 )
775 col_type = col_type.upper().replace(
776 "DEFAULT CURRENT_TIMESTAMP", "NULL"
777 )
778
779 _tmp.append(f"`{safe_name}` {col_type}")
780
781 columns_def = ", ".join(_tmp)
782 query = f"CREATE TABLE IF NOT EXISTS `{table_safe}` ({columns_def}) ENGINE=InnoDB;"
783
784 self.disp.log_debug(f"Executing SQL: {query}", title)
785
786 result = self.sql_pool.run_and_commit(query=query, values=[])
787
788 if isinstance(result, int) and result == self.error:
789 self.disp.log_error(f"Failed to create table '{table}'", title)
790 return self.error
791
792 self.disp.log_info(f"Table '{table}' created successfully.", title)
793 return self.success
794
795 except mysql.connector.OperationalError as oe:
796 msg = f"MySQL OperationalError while creating table '{table}': {oe}"
797 self.disp.log_critical(msg, title)
798 raise RuntimeError(msg) from oe
799 except mysql.connector.Error as e:
800 msg = f"MySQL Error while creating table '{table}': {e}"
801 self.disp.log_critical(msg, title)
802 raise RuntimeError(msg) from e
803 except Exception as e:
804 msg = f"Unexpected error while creating table '{table}': {e}"
805 self.disp.log_critical(msg, title)
806 raise RuntimeError(msg) from e
807
808 def insert_data_into_table(self, table: str, data: Union[List[List[Union[str, None, int, float]]], List[Union[str, None, int, float]]], column: Union[List[str], None] = None) -> int:
809 """Insert data into a table.
810
811 Args:
812 table (str): Name of the table.
813 data (Union[List[List[Union[str, None, int, float]]], List[Union[str, None, int, float]]]): Data to insert.
814 column (Union[List[str], None], optional): List of column names. Defaults to None.
815
816 Returns:
817 int: ``self.success`` on success, or ``self.error`` on failure.
818 """
819 title = "insert_data_into_table"
820 self.disp.log_debug("Inserting data into the table.", title)
821 if column is None:
822 column = []
823 check_data: Union[
824 List[List[Union[str, None, int, float]]],
825 List[Union[str, None, int, float]]
826 ] = [table]
827 if column is not None:
828 check_data.extend(column)
829 if isinstance(data, List):
830 for i in data:
831 if isinstance(i, List):
832 check_data.extend(i)
833 else:
834 check_data.append(i)
835 if self.sql_injection.check_if_injections_in_strings(check_data) is True:
836 self.disp.log_error("Injection detected.", "sql")
837 return self.error
838
839 if isinstance(column, List) and len(column) == 0:
840 column_raw = self.get_table_column_names(table)
841 if column_raw is None:
842 return self.error
843 if isinstance(column_raw, list):
844 column = column_raw
845
846 if column is None:
847 return self.error
848 column_checked: Union[List[str], str] = column
849
850 column_checked = self.sanitize_functions.escape_risky_column_names(
851 column_checked
852 )
853
854 column_str = ", ".join(column)
855 column_length = len(column)
856
857 try:
858 cleaned_lines: Tuple[
859 str, List[Union[str, int, float, None]]
860 ] = self.sanitize_functions.process_sql_line(data, column, column_length)
861 line = cleaned_lines[0]
862 values = cleaned_lines[1]
863 self.disp.log_debug(f"Cleaned lines = '{cleaned_lines}'", title)
864 sql_query = f"INSERT INTO {table} ({column_str}) VALUES {line}"
865 self.disp.log_debug(
866 f"sql_query = '{sql_query}', values = '{values}'", title
867 )
868 return self.sql_pool.run_editing_command(sql_query, values, table, "insert")
869 except RuntimeError as e:
870 self.disp.log_error(
871 f"Failed to check and clean the data needed to be inserted: {e}"
872 )
873 return self.error
874
875 def insert_trigger(self, trigger_name: str, table_name: str, timing_event: str, body: str) -> int:
876 """Insert (create) a new SQL trigger into a MySQL or MariaDB database.
877
878 Args:
879 trigger_name (str): The name of the trigger to create.
880 table_name (str): The name of the table the trigger is being applied to.
881 timing_event (str): The rule when the event is to be triggered. e.g., 'BEFORE INSERT'.
882 body (str): The full SQL CREATE TRIGGER statement.
883
884 Returns:
885 int: ``self.success`` on success, or ``self.error`` on failure.
886 """
887 title = "insert_trigger"
888
889 # --- Sanity checks ---
890 if not all([trigger_name, table_name, timing_event, body]):
891 self.disp.log_error("All parameters must be provided.", title)
892 return self.error
893
894 self.disp.log_debug(f"Inserting trigger: {trigger_name}", title)
895 # --- SQL injection prevention ---
896 if self.sql_injection.check_if_injections_in_strings([trigger_name, table_name]):
897 self.disp.log_error("SQL injection detected.", title)
898 return self.error
899
900 if self.sql_injection.check_if_symbol_and_logic_gate_injection(timing_event):
901 self.disp.log_error("SQL injection detected", title)
902 return self.error
903
904 # --- Build the SQL ---
905 sql_query = self.sanitize_functions.clean_trigger_creation(
906 trigger_name, table_name, timing_event, body)
907 self.disp.log_debug(f"Trigger SQL:\n{sql_query}", title)
908 if sql_query != self.success or not isinstance(sql_query, str):
909 self.disp.log_error("Sql trigger query is invalid.", title)
910 return self.error
911
912 # --- Drop existing trigger (MySQL has no CREATE TRIGGER IF NOT EXISTS) ---
913 response: int = self.remove_trigger(trigger_name)
914 if response != self.success:
915 self.disp.log_warning(
916 f"Could not drop trigger '{trigger_name}' (may not exist).", title
917 )
918
919 # --- Execute trigger creation ---
920 result = self.sql_pool.run_editing_command(
921 sql_query, [], trigger_name, "create_trigger"
922 )
923 if result != self.success:
924 self.disp.log_error(
925 f"Failed to create trigger '{trigger_name}'", title
926 )
927 return self.error
928
929 self.disp.log_info(
930 f"Trigger '{trigger_name}' successfully created.", title
931 )
932 return self.success
933
934 @overload
936 self,
937 table: str,
938 column: Union[str, List[str]],
939 where: Union[str, List[str]] = "",
940 beautify: Literal[True] = True,
941 ) -> Union[int, List[Dict[str, Any]]]: ...
942
943 @overload
945 self,
946 table: str,
947 column: Union[str, List[str]],
948 where: Union[str, List[str]] = "",
949 beautify: Literal[False] = False,
950 ) -> Union[int, List[Tuple[Any, Any]]]: ...
951
952 def get_data_from_table(self, table: str, column: Union[str, List[str]], where: Union[str, List[str]] = "", beautify: bool = True) -> Union[int, Union[List[Dict[str, Any]], List[Tuple[Any, Any]]]]:
953 """Fetch rows from a table.
954
955 Args:
956 table (str): Name of the table to query.
957 column (Union[str, List[str]]): Column selector; a single column name or a list of column names.
958 where (Union[str, List[str]]): Optional WHERE clause content; string or list joined by ``AND``.
959 beautify (bool): When True, return a list of dict rows; when False, return a list of tuples.
960
961 Returns:
962 Union[int, List[Dict[str, Any]], List[Tuple[Any, Any]]]: Query result on success, or ``self.error`` on failure.
963 """
964 title = "get_data_from_table"
965 self.disp.log_debug(f"fetching data from the table {table}", title)
966
967 # Only check table/column names for injection — WHERE values will be parameterized
968 if self.sql_injection.check_if_injections_in_strings([table, column]) is True:
969 self.disp.log_error("Injection detected.", "sql")
970 return self.error
971
972 # Store column name for later use if beautifying is needed (make a copy if it's a list)
973 if isinstance(column, list):
974 original_column = column.copy()
975 else:
976 original_column = column
977 if isinstance(column, list) is True:
978 column = self.sanitize_functions.escape_risky_column_names(column)
979 column = ", ".join(column)
980
981 sql_command = f"SELECT {column} FROM {table}"
982
983 # Parse WHERE clause and extract values for parameterization
984 try:
985 where_clause, where_params = self._parse_where_clause(where)
986 except RuntimeError as e:
987 self.disp.log_error(f"WHERE clause parsing failed: {e}", title)
988 return self.error
989
990 if where_clause != "":
991 sql_command += f" WHERE {where_clause}"
992
993 self.disp.log_debug(
994 f"sql_query = '{sql_command}', params = {where_params}", title)
995 resp = self.sql_pool.run_and_fetch_all(
996 query=sql_command, values=where_params)
997 if isinstance(resp, int):
998 if resp != self.success:
999 self.disp.log_error(
1000 "Failed to fetch the data from the table.", title
1001 )
1002 return self.error
1003 resp_list = []
1004 else:
1005 resp_list = resp
1006 self.disp.log_debug(f"Queried data: {resp}", title)
1007 if beautify is False:
1008 return resp_list
1009 # Return the raw list if no data is present
1010 if len(resp_list) == 0:
1011 return resp_list
1012 # Determine columns to beautify
1013 if original_column in ("*", ["*"]):
1014 data = self.describe_table(table)
1015 self.disp.log_debug(f"Described table columns: {data}", title)
1016 if isinstance(data, int):
1017 return self.error
1018 else:
1019 if isinstance(original_column, str):
1020 data = [original_column]
1021 else:
1022 data = original_column
1023 # Beautify and return the data
1024 return self.sanitize_functions.beautify_table(data, resp_list)
1025
1026 def get_table_size(self, table: str, column: Union[str, List[str]], where: Union[str, List[str]] = "") -> int:
1027 """Return the row count for a table.
1028
1029 Args:
1030 table (str): Name of the table to count rows in.
1031 column (Union[str, List[str]]): Column expression passed to ``COUNT(...)``.
1032 where (Union[str, List[str]]): Optional WHERE clause content; string or list joined by ``AND``.
1033
1034 Returns:
1035 int: Number of rows on success, or ``SCONST.GET_TABLE_SIZE_ERROR`` on failure.
1036 """
1037 title = "get_table_size"
1038 self.disp.log_debug(f"fetching data from the table {table}", title)
1039
1040 # Only check table/column names for injection — WHERE values will be parameterized
1041 if self.sql_injection.check_if_injections_in_strings([table, column]) is True:
1042 self.disp.log_error("Injection detected.", "sql")
1043 return SCONST.GET_TABLE_SIZE_ERROR
1044
1045 if isinstance(column, list) is True:
1046 column = ", ".join(column)
1047
1048 sql_command = f"SELECT COUNT({column}) FROM {table}"
1049
1050 # Parse WHERE clause and extract values for parameterization
1051 try:
1052 where_clause, where_params = self._parse_where_clause(where)
1053 except RuntimeError as e:
1054 self.disp.log_error(f"WHERE clause parsing failed: {e}", title)
1055 return SCONST.GET_TABLE_SIZE_ERROR
1056
1057 if where_clause != "":
1058 sql_command += f" WHERE {where_clause}"
1059
1060 self.disp.log_debug(
1061 f"sql_query = '{sql_command}', params = {where_params}", title)
1062 resp = self.sql_pool.run_and_fetch_all(
1063 query=sql_command, values=where_params)
1064 if isinstance(resp, int):
1065 if resp != self.success:
1066 self.disp.log_error(
1067 "Failed to fetch the data from the table.", title
1068 )
1069 return SCONST.GET_TABLE_SIZE_ERROR
1070 resp_list = []
1071 else:
1072 resp_list = resp
1073 if len(resp_list) == 0:
1074 self.disp.log_error(
1075 "There was no data returned by the query.", title
1076 )
1077 return SCONST.GET_TABLE_SIZE_ERROR
1078 if isinstance(resp_list[0], tuple) is False:
1079 self.disp.log_error("The data returned is not a tuple.", title)
1080 return SCONST.GET_TABLE_SIZE_ERROR
1081 return resp_list[0][0]
1082
1083 def update_data_in_table(self, table: str, data: Union[List[List[Union[str, None, int, float]]], List[Union[str, None, int, float]]], column: List[str], where: Union[str, List[str]] = "") -> int:
1084 """Update rows in a table.
1085
1086 Args:
1087 table (str): Name of the table to update.
1088 data (Union[List[List[Union[str, None, int, float]]], List[Union[str, None, int, float]]]): Values aligned to ``column`` order; single row or list of rows.
1089 column (List[str]): Column names to update.
1090 where (Union[str, List[str]]): Optional WHERE clause content; string or list joined by ``AND``.
1091
1092 Returns:
1093 int: ``self.success`` on success, or ``self.error`` on failure.
1094 """
1095 title = "update_data_in_table"
1096 msg = f"Updating the data contained in the table: {table}"
1097 self.disp.log_debug(msg, title)
1098 if column is None:
1099 column = ""
1100
1101 # Only check table/column names for injection — data and WHERE values are parameterized
1102 self.disp.log_debug(
1103 "Checking the table/column name to make sure everything is parameterised."
1104 )
1105 check_items = [table]
1106 if isinstance(column, list):
1107 check_items.extend([str(c) for c in column])
1108 else:
1109 check_items.append(str(column))
1110 if self.sql_injection.check_if_injections_in_strings(check_items):
1111 self.disp.log_error("Injection detected.", "sql")
1112 return self.error
1113
1114 self.disp.log_debug("Injection checking passed")
1115 if column == "":
1116 self.disp.log_debug("No table column names provided, deducing")
1117 columns_raw = self.get_table_column_names(table)
1118 if isinstance(columns_raw, int):
1119 self.disp.log_debug("Failed to deduce table column names")
1120 return self.error
1121 column = columns_raw
1122
1123 # Ensure column is a List[str] for subsequent operations
1124 self.disp.log_debug(
1125 "Making sure that the column variable is of type List[str]"
1126 )
1127 _tmp_cols2: Union[List[str], str] = self.sanitize_functions.escape_risky_column_names(
1128 column
1129 )
1130 if isinstance(_tmp_cols2, list):
1131 column = _tmp_cols2
1132 else:
1133 column = [str(_tmp_cols2)]
1134 self.disp.log_debug(
1135 "Made sure that the column was of the appropriate type"
1136 )
1137
1138 if isinstance(column, str) and isinstance(data, str):
1139 data = [data]
1140 column = [column]
1141 column_length = len(column)
1142
1143 column_length = len(column)
1144 self.disp.log_debug(
1145 f"data = {data}, column = {column}, length = {column_length}",
1146 title
1147 )
1148
1149 # Build SET clause with placeholders and parameter list
1150 self.disp.log_debug("Building the set clause and parameters")
1151 set_parts: List[str] = []
1152 params: List[Union[str, None, int, float]] = []
1153 for i in range(column_length):
1154 set_parts.append(f"{column[i]} = %s")
1155 if i < len(data):
1156 v = data[i]
1157 else:
1158 v = None
1159 normalised_cell: Union[
1160 int,
1161 str,
1162 float,
1163 None
1164 ] = self._normalize_cell(v)
1165 self.disp.log_debug(f"Normalised cell: {normalised_cell}")
1166 params.append(normalised_cell)
1167 self.disp.log_debug(f"Set clause and parameters built: {set_parts}")
1168 self.disp.log_debug(f"Parameters: {params}")
1169
1170 update_line = ", ".join(set_parts)
1171 sql_query = f"UPDATE {table} SET {update_line}"
1172
1173 # Parse WHERE clause and add its parameters
1174 try:
1175 where_clause, where_params = self._parse_where_clause(where)
1176 except RuntimeError as e:
1177 self.disp.log_error(f"WHERE clause parsing failed: {e}", title)
1178 return self.error
1179 params.extend(where_params)
1180
1181 if where_clause != "":
1182 sql_query += f" WHERE {where_clause}"
1183
1184 self.disp.log_debug(
1185 f"sql_query = '{sql_query}', params = {params}",
1186 title
1187 )
1188
1189 return self.sql_pool.run_editing_command(sql_query, params, table=table, action_type="update")
1190
1191 def insert_or_update_data_into_table(self, table: str, data: Union[List[List[Union[str, None, int, float]]], List[Union[str, None, int, float]]], columns: Union[List[str], None] = None) -> int:
1192 """Insert or update rows using the first column as key.
1193
1194 Args:
1195 table (str): Table name.
1196 data (Union[List[List[Union[str, None, int, float]]], List[Union[str, None, int, float]]]): Single row or list of rows to upsert.
1197 columns (Union[List[str], None], optional): Column names for ``data``; when None, infer columns from the table.
1198
1199 Returns:
1200 int: ``self.success`` on success, or ``self.error`` on failure.
1201 """
1202 title = "insert_or_update_data_into_table"
1203 self.disp.log_debug(
1204 "Inserting or updating data into the table.", title
1205 )
1206
1207 check_list = [table]
1208 if columns:
1209 check_list.extend(columns)
1210 if self.sql_injection.check_if_injections_in_strings(check_list):
1211 self.disp.log_error("SQL Injection detected.", "sql")
1212 return self.error
1213
1214 if columns is None:
1215 cols_raw = self.get_table_column_names(table)
1216 if isinstance(cols_raw, int):
1217 return self.error
1218 columns = cols_raw
1219
1220 # ensure columns is a concrete list for downstream calls
1221 if columns is not None and not isinstance(columns, list):
1222 try:
1223 columns = list(columns)
1224 except TypeError:
1225 columns = [str(columns)]
1226
1228 table=table, column=columns, where="", beautify=False
1229 )
1230 # ensure table_content is iterable
1231 if isinstance(table_content, int):
1232 if table_content != self.success:
1233 self.disp.log_critical(
1234 f"Failed to retrieve data from table {table}", title
1235 )
1236 return self.error
1237 table_content_list = table_content
1238 # table_content_list is now safe to iterate over (ensure runtime type for static checkers)
1239 if not isinstance(table_content_list, list):
1240 self.disp.log_error(
1241 f"Unexpected table content type for table {table}", title
1242 )
1243 return self.error
1244
1245 if isinstance(data, list) and data and isinstance(data[0], list):
1246 self.disp.log_debug("Processing double data List", title)
1247 table_content_dict = {}
1248 for line in table_content_list:
1249 table_content_dict[str(line[0])] = line
1250
1251 for line in data:
1252 if not line:
1253 self.disp.log_warning("Empty line, skipping.", title)
1254 continue
1255 # narrow type for the linter/typing
1256 if isinstance(line, str):
1257 line_list: List = [line]
1258 elif not isinstance(line, list):
1259 line_list: List = [line]
1260 else:
1261 line_list = line
1262 node0 = str(line_list[0])
1263 if node0 in table_content_dict:
1265 table,
1266 line_list,
1267 columns,
1268 f"{columns[0]} = {node0}"
1269 )
1270 else:
1271 # ensure column arg is a concrete list
1272 if isinstance(columns, list):
1273 cols: List = columns
1274 else:
1275 cols: List = [columns]
1276 self.insert_data_into_table(table, line_list, cols)
1277 # finished processing multiple rows
1278 return self.success
1279
1280 # Single-row processing
1281 if isinstance(data, list):
1282 self.disp.log_debug("Processing single data List", title)
1283 if not data:
1284 self.disp.log_warning("Empty data List, skipping.", title)
1285 return self.success
1286
1287 node0 = str(data[0])
1288 # If a row with the same first-column key exists, update it
1289 for line in table_content_list:
1290 if str(line[0]) == node0:
1291 return self.update_data_in_table(
1292 table, data, columns, f"{columns[0]} = {node0}"
1293 )
1294
1295 # No existing row found — insert as new row
1296 if isinstance(columns, list):
1297 cols = columns
1298 else:
1299 cols = [columns]
1300 return self.insert_data_into_table(table, data, cols)
1301
1302 # If we reach here, the input type was unexpected
1303 self.disp.log_error(
1304 "Data must be of type List[str] or List[List[str]]", title
1305 )
1306 return self.error
1307
1308 def insert_or_update_trigger(self, trigger_name: str, table_name: str, timing_event: str, body: str) -> int:
1309 """Insert (create) or update an SQL trigger into a MySQL or MariaDB database.
1310
1311 Args:
1312 trigger_name (str): The name of the trigger to create.
1313 table_name (str): The name of the table the trigger is being applied to.
1314 timing_event (str): The rule when the event is to be triggered. e.g., 'BEFORE INSERT'.
1315 body (str): The full SQL CREATE TRIGGER statement.
1316
1317 Returns:
1318 int: ``self.success`` on success, or ``self.error`` on failure.
1319 """
1320 title = "insert_or_update_trigger"
1321 self.disp.log_debug(
1322 f"Creating or replacing trigger: {trigger_name}", title
1323 )
1324 # --- SQL injection prevention ---
1325 if self.sql_injection.check_if_injections_in_strings([trigger_name, table_name]):
1326 self.disp.log_error("SQL injection detected.", title)
1327 return self.error
1328
1329 if self.sql_injection.check_if_symbol_and_logic_gate_injection(timing_event):
1330 self.disp.log_error("SQL injection detected", title)
1331 return self.error
1332 # First, drop the existing trigger (if any)
1333 drop_result = self.remove_trigger(trigger_name)
1334 if drop_result not in (self.success, self.error):
1335 self.disp.log_warning(
1336 f"Unexpected drop_trigger result: {drop_result}", title
1337 )
1338
1339 # Insert the new one
1340 return self.insert_trigger(trigger_name, table_name, timing_event, body)
1341
1342 def remove_data_from_table(self, table: str, where: Union[str, List[str]] = "") -> int:
1343 """Delete rows from a table.
1344
1345 Args:
1346 table (str): Table name.
1347 where (Union[str, List[str]]): Optional WHERE clause to limit deletions.
1348
1349 Returns:
1350 int: ``self.success`` on success, or ``self.error`` on failure.
1351 """
1352 self.disp.log_debug(
1353 f"Removing data from table {table}",
1354 "remove_data_from_table"
1355 )
1356
1357 # Only check table name for injection — WHERE values will be parameterized
1358 if self.sql_injection.check_if_sql_injection(table):
1359 self.disp.log_error("Injection detected.", "sql")
1360 return self.error
1361
1362 sql_query = f"DELETE FROM {table}"
1363
1364 # Parse WHERE clause and extract values for parameterization
1365 try:
1366 where_clause, where_params = self._parse_where_clause(where)
1367 except RuntimeError as e:
1368 self.disp.log_error(
1369 f"WHERE clause parsing failed: {e}"
1370 )
1371 return self.error
1372
1373 if where_clause != "":
1374 sql_query += f" WHERE {where_clause}"
1375
1376 self.disp.log_debug(
1377 f"sql_query = '{sql_query}', params = {where_params}"
1378 )
1379
1380 return self.sql_pool.run_editing_command(sql_query, where_params, table, "delete")
1381
1382 def remove_table(self, table: str) -> int:
1383 """Drop a table from the MySQL database.
1384
1385 Args:
1386 table (str): Name of the table to drop.
1387
1388 Returns:
1389 int: ``self.success`` on success, or ``self.error`` on failure.
1390
1391 Notes:
1392 - Performs SQL injection detection on the table name.
1393 - Uses ``DROP TABLE IF EXISTS`` to avoid errors when the table is missing.
1394 """
1395 title = "drop_table"
1396 self.disp.log_debug(f"Dropping table '{table}'", title)
1397
1398 # --- SQL injection protection ---
1399 if self.sql_injection.check_if_injections_in_strings([table]):
1400 self.disp.log_error("Injection detected in table name.", title)
1401 return self.error
1402
1403 try:
1404 # Escape quotes for safety
1405 table_safe = table.replace("'", "''")
1406 query = f"DROP TABLE IF EXISTS '{table_safe}';"
1407 self.disp.log_debug(f"Executing SQL: {query}", title)
1408
1409 result = self.sql_pool.run_and_commit(query=query, values=[])
1410 if isinstance(result, int) and result == self.error:
1411 self.disp.log_error(f"Failed to drop table '{table}'", title)
1412 return self.error
1413
1414 self.disp.log_info(f"Table '{table}' dropped successfully.", title)
1415 return self.success
1416
1417 except mysql.connector.PoolError as oe:
1418 msg = f"OperationalError while dropping table '{table}': {oe}"
1419 self.disp.log_critical(msg, title)
1420 return self.error
1421 except mysql.connector.ProgrammingError as e:
1422 msg = f"SQL Error while dropping table '{table}': {e}"
1423 self.disp.log_critical(msg, title)
1424 return self.error
1425 except Exception as e:
1426 msg = f"Unexpected error while dropping table '{table}': {e}"
1427 self.disp.log_critical(msg, title)
1428 return self.error
1429
1430 def remove_trigger(self, trigger_name: str) -> int:
1431 """Drop/Remove an existing SQL trigger if it exists.
1432
1433 Args:
1434 trigger_name (str): Name of the trigger to drop.
1435
1436 Returns:
1437 int: ``self.success`` on success, or ``self.error`` on error.
1438 """
1439 title = "drop_trigger"
1440 self.disp.log_debug(f"Dropping trigger: {trigger_name}", title)
1441
1442 if not trigger_name:
1443 self.disp.log_error("Trigger name cannot be empty.", title)
1444 return self.error
1445
1446 # Sanitize to prevent injections
1447 if self.sql_injection.check_if_injections_in_strings([trigger_name]):
1448 self.disp.log_error(
1449 "SQL Injection detected in trigger name.", title
1450 )
1451 return self.error
1452
1453 sql_query = f"DROP TRIGGER IF EXISTS {trigger_name};"
1454 self.disp.log_debug(f"Executing SQL:\n{sql_query}", title)
1455
1456 result = self.sql_pool.run_editing_command(
1457 sql_query, [], trigger_name, "drop_trigger")
1458 if result != self.success:
1459 self.disp.log_error(
1460 f"Failed to drop trigger '{trigger_name}'.", title
1461 )
1462 return self.error
1463
1464 self.disp.log_info(
1465 f"Trigger '{trigger_name}' dropped successfully.", title
1466 )
1467 return self.success
Union[int, List[Dict[str, Any]]] get_data_from_table(self, str table, Union[str, List[str]] column, Union[str, List[str]] where="", Literal[True] beautify=True)
int insert_or_update_data_into_table(self, str table, Union[List[List[Union[str, None, int, float]]], List[Union[str, None, int, float]]] data, Union[List[str], None] columns=None)
Union[int, List[Tuple[Any, Any]]] get_data_from_table(self, str table, Union[str, List[str]] column, Union[str, List[str]] where="", Literal[False] beautify=False)
Union[int, Union[List[Dict[str, Any]], List[Tuple[Any, Any]]]] get_data_from_table(self, str table, Union[str, List[str]] column, Union[str, List[str]] where="", bool beautify=True)
int insert_trigger(self, str trigger_name, str table_name, str timing_event, str body)
bool _where_space_handler(self, str token, List[str] rebuilt_tokens, bool skip_space=False)
int update_data_in_table(self, str table, Union[List[List[Union[str, None, int, float]]], List[Union[str, None, int, float]]] data, List[str] column, Union[str, List[str]] where="")
None __init__(self, SQLManageConnections sql_pool, int success=0, int error=84, bool debug=False)
int insert_or_update_trigger(self, str trigger_name, str table_name, str timing_event, str body)
int remove_data_from_table(self, str table, Union[str, List[str]] where="")
Union[str, None, int, float] _normalize_cell(self, object cell)
Union[int, str] get_trigger(self, str trigger_name, Optional[str] db_name=None)
Union[int, List[str]] get_trigger_names(self, Optional[str] db_name=None)
int create_table(self, str table, List[Tuple[str, str]] columns)
int get_table_size(self, str table, Union[str, List[str]] column, Union[str, List[str]] where="")
Tuple[str, List[Union[str, int, float, None]]] _parse_where_clause(self, Union[str, List[str]] where)
int insert_data_into_table(self, str table, Union[List[List[Union[str, None, int, float]]], List[Union[str, None, int, float]]] data, Union[List[str], None] column=None)
Tuple[str, List[Union[str, int, float, None]]] _check_complex_clause_for_injection(self, str clause_str)