Cat Feeder  1.0.0
The Cat feeder project
Loading...
Searching...
No Matches
redis_instance.py
Go to the documentation of this file.
r"""
# +==== BEGIN CatFeeder =================+
# LOGO:
# ..............(..../\
# ...............)..(.')
# ..............(../..)
# ...............\(__)|
# Inspired by Joan Stark
# source https://www.asciiart.eu/
# animals/cats
# /STOP
# PROJECT: CatFeeder
# FILE: redis_instance.py
# CREATION DATE: 11-10-2025
# LAST Modified: 14:51:35 19-12-2025
# DESCRIPTION:
# This is the backend server in charge of making the actual website work.
# /STOP
# COPYRIGHT: (c) Cat Feeder
# PURPOSE: The file in charge of handling the redis connection as well as cache.
# // AR
# +==== END CatFeeder =================+
"""
24
25import os
26import json
27import hashlib
28from typing import Any, Callable, Dict, List, Optional, Union, cast
29from platform import system
30
31import redis
32
33from display_tty import Disp, initialise_logger
34
35from .redis_args import RedisArgs
36from . import redis_constants as CONST
37
38
def _build_redis_client() -> redis.Redis:
    """Create and return a configured Redis client.

    This function does not connect immediately; the client will establish
    a connection upon first command execution.

    Returns:
        redis.Redis: Configured Redis client instance.
    """
    socket_path = os.getenv(CONST.REDIS_SOCKET_KEY, CONST.REDIS_SOCKET_DEFAULT)
    password = os.getenv(CONST.REDIS_PASSWORD_KEY)
    # BUG FIX: `system().lower()` always yields a lowercase string, so the
    # original comparison `!= "Windows"` was always True and the unix socket
    # path was passed even on Windows. Compare against the lowercase literal.
    if system().lower() != "windows":
        unix_socket_path: Optional[str] = socket_path
    else:
        unix_socket_path = None
    node: redis.Redis = redis.Redis(
        unix_socket_path=unix_socket_path,
        password=password,
        decode_responses=True,
    )
    return node
60
61
class RedisCaching:
    """High-level Redis cache facade for SQL-related data.

    Provides read-through caching, key namespacing, and targeted invalidation
    for database metadata (version, schema objects), table data, and row
    counts. The layer is intentionally thin: actual SQL execution is delegated
    to caller-supplied callables (``fetcher``/``writer``) so this class
    remains decoupled from any specific database driver.

    Design Highlights:
        - JSON Serialization: All cached values are stored as JSON for
          portability and human inspection. Tuples are normalized to lists.
        - Deterministic Keys: Keys follow the pattern
          ``{namespace}:{db_label}:{category}:{name}[:{param_hash}]``. The
          optional hash is a stable SHA-256 prefix over normalized parameters
          ensuring low collision probability while keeping keys concise.
        - Lazy Client: A Redis client is created only on first access allowing
          import/use without forcing an immediate socket connection.
        - Safe Invalidation: Category/table/trigger specific invalidation
          methods use ``SCAN`` + ``DELETE`` rather than broad ``FLUSHDB``
          calls.
        - Error Sentinel Respect: Methods can be given an ``error_token`` so
          failed results are not cached (preventing sticky error states).

    Typical Read Usage:
        cache = RedisCaching()
        rows = cache.get_data_from_table(
            table="users",
            column=["id", "name"],
            where=["active=1"],
            beautify=True,
            fetcher=lambda: sql.get_data_from_table("users", ["id", "name"], ["active=1"], True),
            ttl_seconds=cache.default_ttls["data"],
            error_token=84,
        )

    Typical Write Usage:
        result = cache.update_data_in_table(
            table="users",
            data=["42", "new_name"],
            column=["id", "name"],
            where="id = 42",
            writer=lambda t, d, c, w: sql.update_data_in_table(t, d, c, w)
        )  # On success, relevant keys are invalidated automatically.

    Attributes:
        namespace (str): Top-level logical partition for all keys (default ``"sql"``).
        db_label (str): Database label inserted into every key for multi-DB isolation.
        default_ttls (dict[str, int]): Per-category TTL defaults (``version``, ``schema``, ``data``, ``count``).
        client (redis.Redis | RedisArgs | None): Underlying Redis client or lazy args container.
        disp (Disp): Logger used for debug and diagnostic messages.
    """
    # NOTE(review): the `class RedisCaching:` statement was dropped by the
    # documentation extraction; it is reconstructed here from the class's own
    # self-references (``Optional["RedisCaching"]``) — confirm against the
    # original source file.

    # Optional previously-created instance that __new__ may hand back.
    existing_instance: Optional["RedisCaching"] = None
    # Class-level logger shared by all instances.
    disp: Disp = initialise_logger(__qualname__, False)
104
105 def __new__(cls, *args, existing_instance: Optional["RedisCaching"] = None, **kwargs):
106 """Return a new or previously existing instance.
107
108 If ``existing_instance`` is provided and is an instance of ``cls`` (the
109 exact class being constructed, potentially a subclass), it is returned
110 as-is and ``__init__`` will not run again. Otherwise, a fresh instance
111 of ``cls`` is allocated normally.
112
113 Args:
114 existing_instance (RedisCaching | None): An existing instance to reuse.
115
116 Returns:
117 RedisCaching: Either the supplied ``existing_instance`` or a freshly allocated object.
118 """
119 if existing_instance is not None and isinstance(existing_instance, cls):
120 return existing_instance
121 return super().__new__(cls)
122
124 self,
125 client: Optional[Union[redis.Redis, RedisArgs]] = None,
126 debug: bool = False,
127 *,
128 namespace: str = "sql",
129 db_label: Optional[str] = None,
130 default_ttls: Optional[Dict[str, int]] = None,
131 existing_instance: Optional["RedisCaching"] = None,
132 ) -> None:
133 """Initialize the Redis caching layer.
134
135 Args:
136 client (redis.Redis | None): Optional Redis client. When omitted, a client is built from environment variables.
137 namespace (str): Top-level namespace used to prefix keys.
138 db_label (str | None): Optional database label to isolate keys per DB. Defaults to the ``DB_NAME`` environment variable or ``"default"``.
139 default_ttls (dict[str, int] | None): Optional TTLs in seconds per category (``version``, ``schema``, ``data``, ``count``).
140 existing_instance (RedisCaching | None): When provided, indicates this __init__ call is for a reused instance and initialization should be skipped.
141 """
142 # ------------------------ The logging function ------------------------
143 self.disp.update_disp_debug(debug)
144 self.disp.log_debug("Initialising...")
145 # -------------------------- Inherited values --------------------------
146 # Only initialize if this is NOT a reused instance
147 if existing_instance is not None:
148 return
149 # Redis client (constructed lazily if not provided)
150 self._client_factory: Callable[[], redis.Redis] = _build_redis_client
151 if client is not None:
152 self.clientclient: Optional[Union[redis.Redis, RedisArgs]] = client
153 else:
154 self.clientclient = None
155
156 # Namespace and DB label (used to isolate keys per application/database)
157 ns_candidate = namespace.strip()
158 if ns_candidate:
159 self.namespace: str = ns_candidate
160 else:
161 self.namespace = "sql"
162
163 label_candidate: Optional[str]
164 if db_label is not None and db_label.strip():
165 label_candidate = db_label
166 else:
167 env_label = os.getenv("DB_NAME")
168 if env_label is not None and env_label.strip():
169 label_candidate = env_label
170 else:
171 label_candidate = "default"
172 self.db_label = label_candidate.strip()
173
174 # Default per-category TTLs (seconds)
175 self.default_ttls: Dict[str, int] = {
176 "version": CONST.HOUR_24, # 24h
177 "schema": CONST.HOUR_1, # 1h
178 "data": CONST.MIN_2, # 2min
179 "count": CONST.MIN_1, # 1min
180 }
181 if default_ttls:
182 for k, v in default_ttls.items():
183 if v is not None:
184 self.default_ttls[k] = int(v)
185
186 # Summarize initialization
187 client_kind: str = "None"
188 if self.clientclient is None:
189 client_kind = "None"
190 if isinstance(self.clientclient, RedisArgs):
191 client_kind = "RedisArgs"
192 else:
193 client_kind = "Redis"
194 self.disp.log_debug(
195 f"Initialized RedisCaching namespace='{self.namespace}', db_label='{self.db_label}', client={client_kind}"
196 )
197 self.disp.log_debug("Initialised")
198
199 # ------------------------------------------------------------------
200 # Generic helpers
201 # ------------------------------------------------------------------
202 def _stable_dump(self, obj: Any) -> str:
203 """Return a stable JSON string for ``obj`` used in key hashing.
204
205 Tuples are converted to lists for JSON compatibility; sets are
206 converted to sorted lists. Dicts are sorted by key.
207
208 Args:
209 obj (Any): Any JSON-serializable Python object.
210
211 Returns:
212 str: A deterministically ordered JSON string.
213 """
214 def normalize(x: Any) -> Any:
215 if isinstance(x, tuple):
216 out_list: List[Any] = []
217 for v in x:
218 out_list.append(normalize(v))
219 return out_list
220 if isinstance(x, set):
221 tmp: List[Any] = []
222 for v in x:
223 tmp.append(normalize(v))
224 tmp.sort(key=lambda i: json.dumps(
225 i, sort_keys=True, ensure_ascii=False))
226 return tmp
227 if isinstance(x, list):
228 out_list2: List[Any] = []
229 for v in x:
230 out_list2.append(normalize(v))
231 return out_list2
232 if isinstance(x, dict):
233 out_dict: Dict[str, Any] = {}
234 for k in sorted(x.keys()):
235 out_dict[k] = normalize(x[k])
236 return out_dict
237 return x
238
239 return json.dumps(normalize(obj), separators=(",", ":"), ensure_ascii=False)
240
241 def _hash_params(self, params: Any) -> str:
242 """Hash arbitrary parameters deterministically for compact keys.
243
244 Args:
245 params (Any): Parameters to be represented in the cache key.
246
247 Returns:
248 str: A short SHA-256 hex digest prefix.
249 """
250 dumped = self._stable_dump(params)
251 digest = hashlib.sha256(dumped.encode("utf-8")).hexdigest()[:20]
252 self.disp.log_debug(f"Computed params hash='{digest}'")
253 return digest
254
255 def _key(self, category: str, name: str, params: Optional[Any] = None) -> str:
256 """Build a namespaced Redis key.
257
258 Key format: ``{namespace}:{db_label}:{category}:{name}[:{hash}]``
259
260 Args:
261 category (str): Logical grouping (e.g., ``schema``, ``data``).
262 name (str): Base key name within the category.
263 params (Any | None): Optional parameters to hash for uniqueness.
264
265 Returns:
266 str: Fully-qualified Redis key.
267 """
268 base = f"{self.namespace}:{self.db_label}:{category}:{name}"
269 if params is None:
270 self.disp.log_debug(f"Key generated='{base}' (no params)")
271 return base
272 key = f"{base}:{self._hash_params(params)}"
273 self.disp.log_debug(f"Key generated='{key}' (with params)")
274 return key
275
276 def _serialize(self, value: Any) -> str:
277 """Serialize a Python value to a JSON string.
278
279 Tuples are converted to lists; on read we may convert back where needed.
280
281 Args:
282 value (Any): Value to serialize.
283
284 Returns:
285 str: JSON string representation.
286 """
287 def normalize(x: Any) -> Any:
288 if isinstance(x, tuple):
289 out_list: List[Any] = []
290 for v in x:
291 out_list.append(normalize(v))
292 return out_list
293 if isinstance(x, list):
294 out_list2: List[Any] = []
295 for v in x:
296 out_list2.append(normalize(v))
297 return out_list2
298 if isinstance(x, dict):
299 out_dict: Dict[str, Any] = {}
300 for k, v in x.items():
301 out_dict[k] = normalize(v)
302 return out_dict
303 return x
304
305 payload = json.dumps(normalize(value), ensure_ascii=False)
306 self.disp.log_debug(f"Serialized payload length={len(payload)}")
307 return payload
308
309 def _deserialize(self, raw: Optional[str]) -> Any:
310 """Deserialize a JSON string back to Python.
311
312 Args:
313 raw (str | None): Raw JSON string from Redis or ``None``.
314
315 Returns:
316 Any: Deserialized Python value, or ``None`` if input is ``None``.
317 """
318 if raw is None:
319 self.disp.log_debug(
320 "Deserialize called with raw=None (cache miss)")
321 return None
322 try:
323 value = json.loads(raw)
324 self.disp.log_debug("Deserialized payload successfully")
325 return value
326 except json.JSONDecodeError as e:
327 self.disp.log_warning(
328 f"Failed to deserialize payload: {e}"
329 )
330 raise
331
332 def _get_cached(self, key: str) -> Any:
333 """Get and deserialize a cached value for ``key``.
334
335 Args:
336 key (str): Redis key to fetch.
337
338 Returns:
339 Any: Deserialized cached value, or ``None`` if not present.
340 """
341 self.disp.log_debug(f"GET key='{key}'")
342 raw = cast(Optional[str], self._ensure_client().get(key))
343 val = self._deserialize(raw)
344 if val is None:
345 self.disp.log_debug("Cache miss")
346 else:
347 self.disp.log_debug("Cache hit")
348 return val
349
350 def _set_cached(self, key: str, value: Any, ttl_seconds: int) -> None:
351 """Serialize and set a value under ``key`` with TTL.
352
353 Uses ``SETEX`` when ``ttl_seconds > 0``; otherwise uses ``SET``.
354
355 Args:
356 key (str): Redis key.
357 value (Any): Python value to serialize and store.
358 ttl_seconds (int): Time to live in seconds; 0 or negative disables TTL.
359 """
360 payload = self._serialize(value)
361 if ttl_seconds > 0:
362 self.disp.log_debug(f"SETEX key='{key}', ttl={ttl_seconds}s")
363 self._ensure_client().setex(key, ttl_seconds, payload)
364 else:
365 self.disp.log_debug(f"SET key='{key}' (no ttl)")
366 self._ensure_client().set(key, payload)
367
368 def _should_cache_union_result(self, result: Any, error_token: Optional[int]) -> bool:
369 """Decide whether to cache a union-typed result.
370
371 When ``error_token`` is provided, skip caching if result equals it.
372 Otherwise, cache the value (including ints such as row counts).
373
374 Args:
375 result (Any): Value returned by the fetcher.
376 error_token (int | None): Error sentinel value to avoid caching.
377
378 Returns:
379 bool: ``True`` if the value should be cached, else ``False``.
380 """
381 if error_token is None:
382 self.disp.log_debug("No error_token provided; will cache result")
383 return True
384 decision = result != error_token
385 self.disp.log_debug(
386 f"Error token present; will_cache={decision} (result == error_token? {not decision})"
387 )
388 return decision
389
390 def _invalidate_patterns(self, patterns: List[str]) -> None:
391 """Delete all keys matching the provided patterns.
392
393 Args:
394 patterns (list[str]): List of glob-style patterns for ``SCAN``.
395 """
396 for p in patterns:
397 deleted = 0
398 self.disp.log_debug(f"Invalidating keys with pattern='{p}'")
399 for key in self._ensure_client().scan_iter(p):
400 self._ensure_client().delete(key)
401 deleted += 1
402 self.disp.log_debug(
403 f"Invalidation pattern='{p}' deleted={deleted} keys")
404
405 def _ensure_client(self) -> redis.Redis:
406 """Return a Redis client, constructing it on first use.
407
408 Returns:
409 redis.Redis: Active Redis client instance.
410 """
411 if self.clientclient is None:
412 self.disp.log_debug("Constructing Redis client via factory")
413 self.clientclient = self._client_factory()
414 elif isinstance(self.clientclient, RedisArgs):
415 self.disp.log_debug(
416 "Building Redis client from RedisArgs configuration")
417 tmp = redis.Redis(
418 host=self.clientclient.host,
419 port=self.clientclient.port,
420 db=self.clientclient.db,
421 password=self.clientclient.password,
422 socket_timeout=self.clientclient.socket_timeout,
423 socket_connect_timeout=self.clientclient.socket_connect_timeout,
424 socket_keepalive=self.clientclient.socket_keepalive,
425 socket_keepalive_options=self.clientclient.socket_keepalive_options,
426 connection_pool=self.clientclient.connection_pool,
427 unix_socket_path=self.clientclient.unix_socket_path,
428 encoding=self.clientclient.encoding,
429 encoding_errors=self.clientclient.encoding_errors,
430 decode_responses=self.clientclient.decode_responses,
431 retry_on_timeout=self.clientclient.retry_on_timeout,
432 retry=self.clientclient.retry,
433 retry_on_error=self.clientclient.retry_on_error,
434 ssl=self.clientclient.ssl,
435 ssl_keyfile=self.clientclient.ssl_keyfile,
436 ssl_certfile=self.clientclient.ssl_certfile,
437 ssl_cert_reqs=self.clientclient.ssl_cert_reqs,
438 ssl_include_verify_flags=self.clientclient.ssl_include_verify_flags,
439 ssl_exclude_verify_flags=self.clientclient.ssl_exclude_verify_flags,
440 ssl_ca_certs=self.clientclient.ssl_ca_certs,
441 ssl_ca_path=self.clientclient.ssl_ca_path,
442 ssl_ca_data=self.clientclient.ssl_ca_data,
443 ssl_check_hostname=self.clientclient.ssl_check_hostname,
444 ssl_password=self.clientclient.ssl_password,
445 ssl_validate_ocsp=self.clientclient.ssl_validate_ocsp,
446 ssl_validate_ocsp_stapled=self.clientclient.ssl_validate_ocsp_stapled,
447 ssl_ocsp_context=self.clientclient.ssl_ocsp_context,
448 ssl_ocsp_expected_cert=self.clientclient.ssl_ocsp_expected_cert,
449 ssl_min_version=self.clientclient.ssl_min_version,
450 ssl_ciphers=self.clientclient.ssl_ciphers,
451 max_connections=self.clientclient.max_connections,
452 single_connection_client=self.clientclient.single_connection_client,
453 health_check_interval=self.clientclient.health_check_interval,
454 client_name=self.clientclient.client_name,
455 lib_name=self.clientclient.lib_name,
456 lib_version=self.clientclient.lib_version,
457 username=self.clientclient.username,
458 redis_connect_func=self.clientclient.redis_connect_func,
459 credential_provider=self.clientclient.credential_provider,
460 protocol=self.clientclient.protocol,
461 cache=self.clientclient.cache,
462 cache_config=self.clientclient.cache_config,
463 event_dispatcher=self.clientclient.event_dispatcher,
464 maint_notifications_config=self.clientclient.maint_notifications_config
465 )
466 self.clientclient = tmp
467 else:
468 self.disp.log_debug("Using provided Redis client instance")
469 return self.clientclient
470
471 def _normalize_selector(self, selector: Union[str, List[str]]) -> Any:
472 """Return selector as-is, normalizing list vs string types.
473
474 Args:
475 selector (str | list[str]): Column list or ``"*"``.
476
477 Returns:
478 Any: The selector unchanged, used for hashing and display.
479 """
480 if isinstance(selector, list):
481 self.disp.log_debug("Normalizing selector: list")
482 return selector
483 self.disp.log_debug("Normalizing selector: str")
484 return selector
485
486 # ------------------------------------------------------------------
487 # Public invalidation helpers
488 # ------------------------------------------------------------------
489 def invalidate_all(self) -> None:
490 """Remove all keys for the current namespace and DB label."""
491 prefix = f"{self.namespace}:{self.db_label}:*"
492 self.disp.log_debug("Invalidating all keys for namespace/db_label")
493 self._invalidate_patterns([prefix])
494
495 def invalidate_schema(self) -> None:
496 """Remove all schema-related keys (tables, columns, triggers, descriptions)."""
497 base = f"{self.namespace}:{self.db_label}:schema:*"
498 self.disp.log_debug("Invalidating all schema keys")
499 self._invalidate_patterns([base])
500
501 def invalidate_table(self, table: str) -> None:
502 """Remove all cache entries related to a specific table.
503
504 Args:
505 table (str): Table name.
506 """
507 ns = f"{self.namespace}:{self.db_label}"
508 self.disp.log_debug(f"Invalidating caches for table='{table}'")
509 patterns = [
510 f"{ns}:data:{table}:*",
511 f"{ns}:count:{table}:*",
512 f"{ns}:schema:columns:{table}:*",
513 f"{ns}:schema:describe:{table}:*",
514 ]
515 self._invalidate_patterns(patterns)
516
517 def invalidate_trigger(self, trigger_name: Optional[str] = None) -> None:
518 """Remove cache entries related to triggers.
519
520 Args:
521 trigger_name (str | None): Specific trigger to clean. When omitted,
522 all trigger-related entries are removed.
523 """
524 ns = f"{self.namespace}:{self.db_label}"
525 if trigger_name:
526 self.disp.log_debug(
527 f"Invalidating caches for trigger='{trigger_name}'")
529 [f"{ns}:schema:trigger:{trigger_name}:*"])
530 else:
531 self.disp.log_debug("Invalidating all trigger-related caches")
533 f"{ns}:schema:trigger:*",
534 f"{ns}:schema:triggers",
535 f"{ns}:schema:trigger_names",
536 ])
str _key(self, str category, str name, Optional[Any] params=None)
None _invalidate_patterns(self, List[str] patterns)
Any _normalize_selector(self, Union[str, List[str]] selector)
None _set_cached(self, str key, Any value, int ttl_seconds)
__new__(cls, *args, Optional["RedisCaching"] existing_instance=None, **kwargs)
None invalidate_trigger(self, Optional[str] trigger_name=None)
Optional[Union[redis.Redis, RedisArgs]] client
bool _should_cache_union_result(self, Any result, Optional[int] error_token)
None __init__(self, Optional[Union[redis.Redis, RedisArgs]] client=None, bool debug=False, *, str namespace="sql", Optional[str] db_label=None, Optional[Dict[str, int]] default_ttls=None, Optional["RedisCaching"] existing_instance=None)