2# +==== BEGIN CatFeeder =================+
5# ...............)..(.')
7# ...............\(__)|
8# Inspired by Joan Stark
9# source https://www.asciiart.eu/
13# FILE: sql_connections.py
14# CREATION DATE: 11-10-2025
15# LAST Modified: 14:52:3 19-12-2025
17# This is the backend server in charge of making the actual website work.
19# COPYRIGHT: (c) Cat Feeder
20# PURPOSE: File in charge of containing the class that will manage the sql connections.
22# +==== END CatFeeder =================+
26from typing
import Union, Any, List, Optional
27from threading
import RLock
31import mysql.connector.cursor
32from display_tty
import Disp, initialise_logger
34from .
import sql_constants
as SCONST
35from ..utils
import constants
as CONST
40 Async connection manager for SQL using `mysql`.
42 Provides a small, async-friendly facade around an
43 :class:`mysql.Pool` instance. Access is serialized using an
46 disp (Disp): Logger instance for debugging and error reporting.
47 error (int): Numeric error code.
48 success (int): Numeric success code.
49 debug (bool): Debug mode flag.
50 url (str): Host or URL string.
51 port (int): Port number.
52 username (str): Username for the database.
53 password (str): Password for the database.
54 db_name (str): Name of the database to connect to.
55 pool_parameters (dict): Configuration for the connection pool.
56 pool (Optional[mysql.connector.pooling.MySQLConnectionPool]): Connection pool instance.
60 disp: Disp = initialise_logger(__qualname__,
False)
74 Initialize the connection manager instance.
77 url (str): Host or URL string.
78 port (int): Port number.
79 username (str): Username for the database.
80 password (str): Password for the database.
81 db_name (str): Name of the database to connect to.
82 success (int, optional): Success return code. Defaults to 0.
83 error (int, optional): Error return code. Defaults to 84.
84 debug (bool, optional): Enable debug logging. Defaults to False.
87 self.
disp.update_disp_debug(debug)
88 self.
disp.log_debug(
"Initialising...")
102 "pool_name": CONST.DATABASE_POOL_NAME,
103 "pool_size": CONST.DATABASE_MAX_POOL_CONNECTIONS,
104 "pool_reset_session": CONST.DATABASE_RESET_POOL_NODE_CONNECTION,
110 "collation": CONST.DATABASE_COLLATION,
111 "connection_timeout": CONST.DATABASE_CONNECTION_TIMEOUT,
112 "allow_local_infile": CONST.DATABASE_LOCAL_INFILE,
113 "init_command": CONST.DATABASE_INIT_COMMAND,
114 "option_files": CONST.DATABASE_DEFAULT_FILE,
115 "autocommit": CONST.DATABASE_AUTOCOMMIT,
116 "ssl_disabled":
not CONST.DATABASE_SSL,
117 "ssl_key": CONST.DATABASE_SSL_KEY,
118 "ssl_cert": CONST.DATABASE_SSL_CERT,
119 "ssl_ca": CONST.DATABASE_SSL_CA,
120 "ssl_cipher": CONST.DATABASE_SSL_CIPHER,
121 "ssl_verify_cert": CONST.DATABASE_SSL_VERIFY_CERT
126 mysql.connector.pooling.MySQLConnectionPool
128 self.
disp.log_debug(
"Initialised")
132 Log connection metadata for debugging.
134 This method does not perform any I/O; it only logs the configured
135 connection information (database filename, host/url, and port) via the
136 project's logging helper.
139 func_name (str): Optional title used in the logger.
143 msg += f
"{key} = '{value}': Type: {type(value)}\n"
144 self.
disp.log_debug(msg, func_name)
147 """Check if there is an active connection to the database.
150 bool: The state of the connection.
160 Initialize a connection pool for the database.
163 RuntimeError: If the pool initialization fails.
166 int: `self.success` if the function succeeds, otherwise raises an error.
168 title =
"initialise_pool"
169 self.
disp.log_debug(
"Initialising the connection pool.", title)
172 "pool_name": CONST.DATABASE_POOL_NAME,
173 "pool_size": CONST.DATABASE_MAX_POOL_CONNECTIONS,
174 "pool_reset_session": CONST.DATABASE_RESET_POOL_NODE_CONNECTION,
180 "collation": CONST.DATABASE_COLLATION,
181 "connection_timeout": CONST.DATABASE_CONNECTION_TIMEOUT,
182 "allow_local_infile": CONST.DATABASE_LOCAL_INFILE,
183 "init_command": CONST.DATABASE_INIT_COMMAND,
184 "option_files": CONST.DATABASE_DEFAULT_FILE,
185 "autocommit": CONST.DATABASE_AUTOCOMMIT,
186 "ssl_ca": CONST.DATABASE_SSL_CA,
187 "ssl_cert": CONST.DATABASE_SSL_CERT,
188 "ssl_key": CONST.DATABASE_SSL_KEY,
189 "ssl_verify_cert":
False,
190 "ssl_verify_identity":
False,
191 "allow_local_infile_in_path":
None
194 for i
in SCONST.UNWANTED_ARGUMENTS:
197 f
"Removed '{i}' from the pool parameters.", title
202 self.
poolpool = mysql.connector.pooling.MySQLConnectionPool(
206 except mysql.connector.errors.ProgrammingError
as pe:
207 msg =
"ProgrammingError: The pool could not be initialized."
208 msg += f
"Original error: {str(pe)}"
209 self.
disp.log_critical(msg, title)
210 raise RuntimeError(msg)
from pe
211 except mysql.connector.errors.IntegrityError
as ie:
212 msg =
"IntegrityError: Integrity issue while initializing the pool."
213 msg += f
" Original error: {str(ie)}"
214 self.
disp.log_critical(msg, title)
215 raise RuntimeError(msg)
from ie
216 except mysql.connector.errors.OperationalError
as oe:
217 msg =
"OperationalError: Operational error occurred during pool initialization."
218 msg += f
" Original error: {str(oe)}"
219 self.
disp.log_critical(msg, title)
220 raise RuntimeError(msg)
from oe
221 except mysql.connector.Error
as e:
222 msg =
"MySQL Error: An unexpected error occurred during pool initialization."
223 msg += f
"Original error: {str(e)}"
224 self.
disp.log_critical(msg, title)
225 raise RuntimeError(msg)
from e
229 Retrieve a connection from the pool.
232 mysql.connector.pooling.PooledMySQLConnection: A pooled connection instance.
235 RuntimeError: If the connection pool is not initialized or an error occurs.
237 title =
"get_connection"
239 raise RuntimeError(
"Connection pool is not initialized.")
241 self.
disp.log_debug(
"Getting an sql connection", title)
243 except mysql.connector.errors.OperationalError
as oe:
244 msg =
"OperationalError: Could not retrieve a connection from the pool."
245 msg += f
" Original error: {str(oe)}"
246 self.
disp.log_critical(msg, title)
247 raise RuntimeError(msg)
from oe
248 except mysql.connector.Error
as e:
249 msg =
"MySQL Error: An unexpected error occurred while getting the connection."
250 msg += f
" Original error: {str(e)}"
251 self.
disp.log_critical(msg, title)
252 raise RuntimeError(msg)
from e
254 def get_cursor(self, connection: mysql.connector.pooling.PooledMySQLConnection) -> mysql.connector.cursor.MySQLCursor:
256 Retrieve a cursor from the given connection.
259 connection (mysql.connector.pooling.PooledMySQLConnection): The active connection.
262 mysql.connector.cursor.MySQLCursor: The cursor object.
265 RuntimeError: If the connection is not active.
268 raise RuntimeError(
"Cannot get cursor, connection is not active.")
269 return connection.cursor()
271 def close_cursor(self, cursor: mysql.connector.cursor.MySQLCursor) -> int:
273 Close the given cursor.
276 cursor (mysql.connector.cursor.MySQLCursor): The cursor to close.
279 int: `self.success` if the cursor is closed successfully, otherwise `self.error`.
281 title =
"close_cursor"
282 self.
disp.log_debug(
"Closing cursor, if it is open.", title)
284 self.
disp.log_debug(
"Closing cursor", title)
289 "The cursor did not have an active connection.", title
295 Return a connection to the pool by closing it.
298 connection (mysql.connector.pooling.PooledMySQLConnection): The connection to close.
301 int: `self.success` if the connection is closed successfully, otherwise `self.error`.
303 title =
"return_connection"
304 self.
disp.log_debug(
"Closing a database connection.", title)
306 self.
disp.log_debug(
"Connection has been closed.", title)
310 "Connection was not open in the first place.", title
316 Destroy the connection pool.
319 int: `self.success` if the pool is destroyed successfully, otherwise `self.error`.
321 title =
"destroy_pool"
322 self.
disp.log_debug(
"Destroying pool, if it exists.", title)
324 self.
disp.log_debug(
"Destroying pool.", title)
327 self.
disp.log_warning(
"There was no pool to be destroyed.", title)
330 def release_connection_and_cursor(self, connection: Union[mysql.connector.pooling.PooledMySQLConnection,
None], cursor: Union[mysql.connector.cursor.MySQLCursor,
None] =
None) ->
None:
332 Release both the connection and cursor.
335 connection (Optional[mysql.connector.pooling.PooledMySQLConnection]): The connection to release.
336 cursor (Optional[mysql.connector.cursor.MySQLCursor]): The cursor to release.
338 title =
"release_connection_and_cursor"
339 msg =
"Connections have ended with status: "
340 self.
disp.log_debug(
"Closing cursor.", title)
341 if cursor
is not None:
343 msg += f
"cursor = {status}, "
344 self.
disp.log_debug(
"Closing connection.", title)
345 if connection
is not None:
347 msg += f
"connection = {status}"
348 self.
disp.log_debug(msg, title)
350 def run_and_commit(self, query: str, values: Optional[List[Union[str, int, float,
None]]] =
None, cursor: Union[mysql.connector.cursor.MySQLCursor,
None] =
None) -> int:
352 Execute a query and commit changes.
355 query (str): The SQL query to execute.
356 values (Optional[List[Union[str, int, float, None]]]): Values to bind to the query.
357 cursor (Optional[mysql.connector.cursor.MySQLCursor]): The cursor to use for execution.
360 int: `self.success` if the query executes successfully, otherwise `self.error`.
362 title =
"run_and_commit"
363 self.
disp.log_debug(
"Running and committing sql query.", title)
365 f
"Arguments: Query='{query}', Values='{values}', cursor='{cursor}'",
369 self.
disp.log_debug(
"No cursor found, generating one.", title)
371 if connection
is None:
372 self.
disp.log_critical(SCONST.CONNECTION_FAILED, title)
375 if internal_cursor
is None:
376 self.
disp.log_critical(SCONST.CURSOR_FAILED, title)
379 self.
disp.log_debug(
"Cursor found, using it.", title)
380 internal_cursor = cursor
383 f
"Executing query: {query} with parameter(s): {values}",
386 internal_cursor.execute(query, params=values)
387 self.
disp.log_debug(
"Committing content.", title)
388 con: Optional[mysql.connector.MySQLConnection] = getattr(
389 internal_cursor,
"_connection",
None)
393 self.
disp.log_warning(
394 "No internal cursor found, skipping manual commit.", title
398 "The cursor was generated by us, releasing.", title
403 "The cursor was provided, not releasing.", title
406 except mysql.connector.errors.ProgrammingError
as pe:
407 msg =
"ProgrammingError: Failed to execute the query."
408 msg += f
" Original error: {str(pe)}"
409 self.
disp.log_error(msg, title)
412 "The cursor was generated by us, releasing.", title
417 "The cursor was provided, not releasing.", title
419 raise RuntimeError(msg)
from pe
420 except mysql.connector.errors.IntegrityError
as ie:
421 msg =
"IntegrityError: Integrity constraint issue occurred during query execution."
422 msg += f
" Original error: {str(ie)}"
423 self.
disp.log_error(msg, title)
426 "The cursor was generated by us, releasing.", title
431 "The cursor was provided, not releasing.", title
433 raise RuntimeError(msg)
from ie
434 except mysql.connector.errors.OperationalError
as oe:
435 msg =
"OperationalError: Operational error occurred during query execution."
436 msg += f
" Original error: {str(oe)}"
437 self.
disp.log_error(msg, title)
440 "The cursor was generated by us, releasing.", title
445 "The cursor was provided, not releasing.", title
447 raise RuntimeError(msg)
from oe
448 except mysql.connector.Error
as e:
449 msg =
"MySQL Error: An unexpected error occurred during query execution."
450 msg += f
" Original error: {str(e)}"
451 self.
disp.log_error(msg, title)
454 "The cursor was generated by us, releasing.", title
459 "The cursor was provided, not releasing.", title
461 raise RuntimeError(msg)
from e
463 def run_and_fetch_all(self, query: str, values: Optional[List[Union[str, int, float,
None]]] =
None, cursor: Union[mysql.connector.cursor.MySQLCursor,
None] =
None) -> Union[int, Any]:
465 Execute a SELECT-style query and return fetched rows.
468 query (str): The SQL SELECT statement to execute.
469 values (Optional[List[Union[str, int, float, None]]]): Values to bind to the query.
470 cursor (Optional[mysql.connector.cursor.MySQLCursor]): The cursor to use for execution.
473 Union[int, Any]: The fetched rows (usually a List[tuple]) or `self.error` on failure.
475 title =
"run_and_fetchall"
478 if connection
is None:
479 self.
disp.log_critical(SCONST.CONNECTION_FAILED, title)
482 if internal_cursor
is None:
483 self.
disp.log_critical(SCONST.CURSOR_FAILED, title)
486 internal_cursor = cursor
489 f
"Executing query: {query}, values: {values}.", title)
490 internal_cursor.execute(query, params=values)
491 if internal_cursor
is None or internal_cursor.description
is None:
493 "Failed to gather data from the table, cursor is invalid.", title
497 "The cursor was generated by us, releasing.", title
500 connection, internal_cursor
504 "The cursor was provided, not releasing.", title
508 "Storing a copy of the content of the cursor.", title
510 raw_data = internal_cursor.fetchall()
511 self.
disp.log_debug(f
"Raw gathered data {raw_data}", title)
512 data = raw_data.copy()
513 self.
disp.log_debug(f
"Data gathered: {data}.", title)
516 "The cursor was generated by us, releasing.", title
521 "The cursor was provided, not releasing.", title
524 except mysql.connector.errors.ProgrammingError
as pe:
525 msg =
"ProgrammingError: Failed to execute the query."
526 msg += f
" Original error: {str(pe)}"
527 self.
disp.log_error(msg, title)
530 "The cursor was generated by us, releasing.", title
535 "The cursor was provided, not releasing.", title
537 raise RuntimeError(msg)
from pe
538 except mysql.connector.errors.IntegrityError
as ie:
539 msg =
"IntegrityError: Integrity constraint issue occurred during query execution."
540 msg += f
" Original error: {str(ie)}"
541 self.
disp.log_error(msg, title)
544 "The cursor was generated by us, releasing.", title
549 "The cursor was provided, not releasing.", title
551 raise RuntimeError(msg)
from ie
552 except mysql.connector.errors.OperationalError
as oe:
553 msg =
"OperationalError: Operational error occurred during query execution."
554 msg += f
" Original error: {str(oe)}"
555 self.
disp.log_error(msg, title)
558 "The cursor was generated by us, releasing.", title
563 "The cursor was provided, not releasing.", title
565 raise RuntimeError(msg)
from oe
566 except mysql.connector.Error
as e:
567 msg =
"MySQL Error: An unexpected error occurred during query execution."
568 msg += f
" Original error: {str(e)}"
569 self.
disp.log_error(msg, title)
572 "The cursor was generated by us, releasing.", title
577 "The cursor was provided, not releasing.", title
579 raise RuntimeError(msg)
from e
581 def run_editing_command(self, sql_query: str, values: Optional[List[Union[str, int, float,
None]]] =
None, table: str =
"<not_specified>", action_type: str =
"update") -> int:
583 Execute an editing command (e.g., INSERT, UPDATE, DELETE).
586 sql_query (str): The SQL query to execute.
587 values (Optional[List[Union[str, int, float, None]]]): Values to bind to the query.
588 table (str): The name of the table being modified.
589 action_type (str): The type of action being performed (e.g., "update").
592 int: `self.success` if the command executes successfully, otherwise `self.error`.
594 title =
"run_editing_command"
596 "Going to run a command that will alter the database."
599 f
"Query: '{sql_query}', values: '{values}', table: '{table}', action_type: '{action_type}'",
606 f
"Failed to {action_type} data in '{table}'.", title
609 self.
disp.log_debug(
"command ran successfully.", title)
611 except mysql.connector.Error
as e:
613 f
"Failed to {action_type} data in '{table}': {str(e)}", title
625 Check if the connection pool is active.
628 bool: True if the pool is active, False otherwise.
630 title =
"is_pool_active"
631 self.
disp.log_debug(
"Checking if the connection is active.", title)
634 self.
disp.log_debug(
"The connection is active.", title)
636 self.
disp.log_error(
"The connection is not active.", title)
641 Check if the connection is active.
644 connection (mysql.connector.pooling.PooledMySQLConnection): The connection to check.
647 bool: True if the connection is active, False otherwise.
649 title =
"is_connection_active"
651 "Checking if the connection is active.", title
655 connection.ping(reconnect=
False)
656 self.
disp.log_debug(
"The connection is active.", title)
658 except (mysql.connector.Error, mysql.connector.errors.Error):
659 self.
disp.log_error(
"The connection is not active.", title)
661 self.
disp.log_error(
"The connection is not active.", title)
666 Check if the cursor is active.
669 cursor (mysql.connector.cursor.MySQLCursor): The cursor to check.
672 bool: True if the cursor is active, False otherwise.
674 title =
"is_cursor_active"
676 "Checking if the provided cursor is active.", title
678 self.
disp.log_debug(f
"Content of the cursor: {dir(cursor)}.", title)
679 con = getattr(cursor,
"_connection",
None)
680 resp = cursor
is not None and con
is not None
682 self.
disp.log_debug(
"The cursor is active.", title)
684 self.
disp.log_error(
"The cursor is not active.", title)
int initialise_pool(self)
mysql.connector.pooling.PooledMySQLConnection get_connection(self)
Union[int, Any] run_and_fetch_all(self, str query, Optional[List[Union[str, int, float, None]]] values=None, Union[mysql.connector.cursor.MySQLCursor, None] cursor=None)
mysql.connector.cursor.MySQLCursor get_cursor(self, mysql.connector.pooling.PooledMySQLConnection connection)
None show_connection_info(self, str func_name="show_connection_info")
bool is_pool_active(self)
None __init__(self, str url, int port, str username, str password, str db_name, int success=0, int error=84, bool debug=False)
bool is_connection_active(self, mysql.connector.pooling.PooledMySQLConnection connection)
int return_connection(self, mysql.connector.pooling.PooledMySQLConnection connection)
int run_and_commit(self, str query, Optional[List[Union[str, int, float, None]]] values=None, Union[mysql.connector.cursor.MySQLCursor, None] cursor=None)
None release_connection_and_cursor(self, Union[mysql.connector.pooling.PooledMySQLConnection, None] connection, Union[mysql.connector.cursor.MySQLCursor, None] cursor=None)
int close_cursor(self, mysql.connector.cursor.MySQLCursor cursor)
bool is_cursor_active(self, mysql.connector.cursor.MySQLCursor cursor)
int run_editing_command(self, str sql_query, Optional[List[Union[str, int, float, None]]] values=None, str table="<not_specified>", str action_type="update")