SQL Module
Overview
The sql module provides a high-level interface for database operations with MySQL, featuring connection pooling, Redis-backed caching, query sanitization, and time manipulation utilities.
Location: backend/src/libs/sql/
Key Files:
Purpose
- Database Abstraction: High-level API for common database operations
- Connection Pooling: Efficient connection reuse and management
- Caching Layer: Redis-backed query result caching with TTL
- Query Safety: SQL injection detection and sanitization
- Time Handling: Conversion between Python datetime and SQL timestamps
- Performance: Reduced database load through intelligent caching
Architecture
SQL Class: Main Interface
Overview
SQL is the primary entry point for database operations. It uses FinalClass metaclass (not FinalSingleton), meaning multiple instances can exist but it cannot be inherited.
Initialization
from sql import SQL
from core import RuntimeManager
sql = SQL.create(
url="localhost",
port=3306,
username="Cat Feeder",
password="secure_password",
db_name="Cat Feeder_db"
)
RuntimeManager.set(SQL,
url="localhost",
port=3306,
username="Cat Feeder",
password="secure_password",
db_name="Cat Feeder_db"
)
sql = RuntimeManager.get(SQL)
Core Operations
SELECT Queries
users = sql.get_data_from_table(
table="users",
column=["id", "username", "email"],
where=["active=1"],
beautify=True
)
user = sql.get_data_from_table(
table="users",
column=["*"],
where=["id=1"],
beautify=True
)
INSERT Operations
result = sql.insert_data_into_table(
table="users",
data=["alice", "alice@example.com", "hashed_password"],
column=["username", "email", "password_hash"]
)
UPDATE Operations
result = sql.update_data_in_table(
table="users",
data=["newemail@example.com"],
column=["email"],
where=["id=1"]
)
DELETE Operations
result = sql.delete_data_from_table(
table="users",
where=["id=1"]
)
Existence Checks
exists = sql.check_if_data_exists(
table="users",
where=["username='alice'"]
)
Connection Management
Connection Pool
SQLManageConnections
Manages a pool of database connections for efficiency:
class SQLManageConnections:
def __init__(self, url, port, username, password, db_name):
self.pool = self._create_pool()
def get_connection(self):
"""Get connection from pool (thread-safe)."""
return self.pool.get_connection()
def execute_query(self, query, params=None):
"""Execute query using pooled connection."""
conn = self.get_connection()
try:
cursor = conn.cursor()
cursor.execute(query, params)
return cursor.fetchall()
finally:
conn.close()
Pool Configuration
Environment variables for connection pool:
SQL_POOL_SIZE: Maximum pool size (default: 10)
SQL_POOL_TIMEOUT: Connection timeout (default: 30s)
SQL_MAX_OVERFLOW: Extra connections beyond pool_size (default: 5)
Caching Strategy
Query Flow with Cache
SQLCacheOrchestrator
Coordinates between SQL queries and Redis cache:
class SQLCacheOrchestrator:
def get_cached_or_fetch(self, table, columns, where, fetcher, ttl):
cache_key = self._generate_key(table, columns, where)
cached = self.redis_cache.get(cache_key)
if cached is not None:
return cached
result = fetcher()
self.redis_cache.set(cache_key, result, ttl)
return result
def invalidate_table_cache(self, table):
"""Clear all cache entries for a table."""
pattern = f"sql:{table}:*"
self.redis_cache.invalidate(pattern)
Cache TTLs
Default time-to-live values:
default_ttls = {
"data": 300,
"schema": 3600,
"metadata": 7200,
"count": 60
}
Cache Invalidation
sql.update_data_in_table(...)
sql.sql_cache_orchestrator.invalidate_table_cache("users")
sql.sql_cache_orchestrator.redis_cache.invalidate("sql:users:id=*")
Query Safety
SQL Injection Prevention
Sanitization Functions
from sql.sql_sanitisation_functions import (
sanitize_column_name,
sanitize_table_name,
sanitize_value
)
table = sanitize_table_name(user_input_table)
column = sanitize_column_name(user_input_column)
value = sanitize_value(user_input)
Injection Detection
from sql.sql_injection import detect_sql_injection
user_input = "'; DROP TABLE users; --"
if detect_sql_injection(user_input):
raise ValueError("Potential SQL injection detected")
Parameterized Queries
The SQL module supports parameterized queries internally. Never interpolate user input directly into a WHERE clause:
# BAD - vulnerable to SQL injection:
sql.get_data_from_table(
table="users",
where=[f"username='{username}'"]
)
# BETTER - sanitize the value before interpolating:
from sql.sql_sanitisation_functions import sanitize_value
sql.get_data_from_table(
table="users",
where=[f"username={sanitize_value(username)}"]
)
# BEST - use placeholders with bound parameters:
sql.get_data_from_table(
table="users",
column=["*"],
where=["username=?"],
params=[username]
)
Time Manipulation
SQLTimeManipulation
Handles conversion between Python datetime and SQL timestamps:
from sql.sql_time_manipulation import SQLTimeManipulation
time_utils = SQLTimeManipulation()
from datetime import datetime
now = datetime.now()
sql_timestamp = time_utils.convert_to_sql_datetime(now)
sql_time = "2025-12-02 15:30:45"
py_datetime = time_utils.convert_from_sql_datetime(sql_time)
current = time_utils.get_current_sql_timestamp()
Usage in Queries
time_utils = sql.sql_time_manipulation
timestamp = time_utils.get_current_sql_timestamp()
sql.insert_data_into_table(
table="events",
data=["user_login", timestamp],
column=["event_type", "created_at"]
)
Query Boilerplates
SQLQueryBoilerplates
Provides helper methods for building common SQL queries:
from sql.sql_query_boilerplates import SQLQueryBoilerplates
qb = SQLQueryBoilerplates()
query = qb.build_select_query(
table="users",
columns=["id", "username"],
where_clauses=["active=1"],
order_by="created_at DESC",
limit=10
)
query = qb.build_insert_query(
table="users",
columns=["username", "email"],
values=["alice", "alice@example.com"]
)
query = qb.build_update_query(
table="users",
columns=["email"],
values=["newemail@example.com"],
where_clauses=["id=1"]
)
query = qb.build_delete_query(
table="users",
where_clauses=["id=1"]
)
Error Handling
Exception Types
try:
sql.get_data_from_table(...)
except mysql_connector.Error as e:  # NOTE: assumes the driver is imported as mysql_connector (mysql.connector) — verify against the module's actual import
print(f"Database error: {e}")
except AttributeError:
print("SQL not properly initialized")
except RuntimeError as e:
print(f"Cache error: {e}")
Return Codes
result = sql.insert_data_into_table(...)
if result == sql.success:
print("Insert successful")
else:
print("Insert failed")
Performance Optimization
Best Practices
1. Use Caching for Repeated Queries
users = sql.get_data_from_table(table="users", column=["*"], where=[])  # first call hits the database and populates the cache
users = sql.get_data_from_table(table="users", column=["*"], where=[])  # identical repeat call is served from the Redis cache
2. Limit Result Sets
# Avoid fetching every row:
all_users = sql.get_data_from_table(table="users", column=["*"])
# Prefer narrowing the result set with a WHERE clause:
recent_users = sql.get_data_from_table(
table="users",
column=["*"],
where=["created_at > DATE_SUB(NOW(), INTERVAL 7 DAY)"]
)
3. Use Specific Columns
# Avoid SELECT * when you only need a few fields:
users = sql.get_data_from_table(table="users", column=["*"])
# Prefer naming exactly the columns you need:
users = sql.get_data_from_table(
table="users",
column=["id", "username"]
)
4. Batch Operations
# Avoid issuing one INSERT per row:
for user in users:
sql.insert_data_into_table(table="users", data=[user], column=["username"])
# Prefer a single bulk insert:
sql.insert_many_data_into_table(
table="users",
data=[[u] for u in users],
column=["username"]
)
Testing Strategies
Mocking SQL
import unittest
from unittest.mock import Mock, patch
from sql import SQL
class TestEndpoint(unittest.TestCase):
def setUp(self):
self.mock_sql = Mock(spec=SQL)
self.mock_sql.get_data_from_table.return_value = [
{"id": 1, "username": "alice"}
]
RuntimeManager.set(SQL, self.mock_sql)
def test_get_users(self):
sql = RuntimeManager.get(SQL)
users = sql.get_data_from_table(table="users", column=["*"])
self.assertEqual(len(users), 1)
self.assertEqual(users[0]["username"], "alice")
Integration Testing
def test_sql_integration():
sql = SQL.create(
url="localhost",
port=3306,
username="test_user",
password="test_pass",
db_name="test_db"
)
result = sql.insert_data_into_table(
table="test_users",
data=["testuser"],
column=["username"]
)
assert result == sql.success
users = sql.get_data_from_table(
table="test_users",
column=["username"],
where=["username='testuser'"]
)
assert len(users) == 1
sql.delete_data_from_table(
table="test_users",
where=["username='testuser'"]
)
Configuration
Environment Variables
# Database connection
SQL_HOST=localhost
SQL_PORT=3306
SQL_USERNAME="Cat Feeder"
SQL_PASSWORD=secure_password
SQL_DATABASE="Cat Feeder_db"
# Connection pool
SQL_POOL_SIZE=10
SQL_POOL_TIMEOUT=30
SQL_MAX_OVERFLOW=5
# Caching
SQL_CACHE_ENABLED=true
SQL_CACHE_DEFAULT_TTL=300
Runtime Configuration
sql.disp.update_disp_debug(True)
sql.sql_cache_orchestrator.redis_cache.default_ttls["data"] = 600
Shutdown
Graceful Shutdown
class SQL:
def shutdown(self):
"""Cleanup connections and cache."""
try:
if self.sql_manage_connections:
self.sql_manage_connections.close_all()
if self.sql_cache_orchestrator:
self.sql_cache_orchestrator.redis_cache.client.save()
self.disp.log_info("SQL shutdown complete")
except Exception as e:
self.disp.log_error(f"Error during SQL shutdown: {e}")
Usage:
sql = RuntimeManager.get(SQL)
sql.shutdown()
Dependencies
This module depends on:
mysql-connector-python - MySQL driver
redis - Redis client (via redis module)
display_tty - Logging
core - FinalClass metaclass
Used by:
server.py - Initialized during server startup
- All endpoint handlers - Database access
boilerplates - Data validation
crons - Background tasks accessing database
Related Documentation