TTY OV  1
A cross platform python terminal
Loading...
Searching...
No Matches
polyguard.py
Go to the documentation of this file.
1"""
2# +==== BEGIN polyguard =================+
3# LOGO:
4# input
5#
6# @#$%! hello
7# | |
8# +--+--+
9# |
10# v
11# +------------+
12# | POLY GUARD |
13# +------------+
14# | |
15# v v
16# BLOCKED PASSED
17# KO OK
18# /STOP
19# PROJECT: polyguard
20# FILE: polyguard.py
21# CREATION DATE: 13-03-2026
22# LAST Modified: 3:36:1 22-03-2026
23# DESCRIPTION:
24# A module that provides a set of swearwords to listen to when filtering while allowing to toggle on and off different languages.
25# /STOP
26# COPYRIGHT: (c) Henry Letellier
27# PURPOSE: This is the entry file of the module.
28# // AR
29# +==== END polyguard =================+
30"""
31
32import sys
33from typing import Any, Optional, List, Set, Dict
34from threading import Lock
35from collections import OrderedDict
36
37import sqlite3
38from display_tty import Disp, initialise_logger
39from warnings import warn
40
41from . import constants as POLY_CONST
42from .sqlite_handler import SQLiteHandler
43
class PolyGuard:
    """Singleton profanity filter with multilingual support and LRU caching.

    Manages a persistent connection to an SQLite database of language-specific
    word lists. Provides thread-safe word detection with per-language caching
    to optimize repeated lookups. Supports configurable language subsets and
    can check single words or phrases.

    Note:
        This class uses the singleton pattern. Multiple instantiations return
        the same instance.
    """

    # Singleton storage; populated lazily by __new__ under _class_lock.
    _instance: Optional["PolyGuard"] = None
    # Guards singleton creation against concurrent first instantiations.
    _class_lock: Lock = Lock()
    # Shared class-level logger (debug off here; toggled again in __init__).
    disp: Disp = initialise_logger(__qualname__, False)
61
62 def __new__(cls, *args, **kwargs) -> "PolyGuard":
63 """Create or return singleton instance.
64
65 Returns:
66 PolyGuard: The singleton instance.
67 """
68 with cls._class_lock:
69 if cls._instance is None:
70 cls._instance = super().__new__(cls)
71 return cls._instance
72
    def __init__(self, langs: POLY_CONST.LangConfig, db_path: Optional[str] = None, success: int = 0, error: int = 1, log: bool = True, debug: bool = False) -> None:
        """Initialize the PolyGuard instance.

        On first call, attempts to establish a persistent database connection.
        If the connection fails, the instance will attempt to reconnect on demand.

        Args:
            langs (LangConfig): LangConfig instance specifying which languages to check.
            db_path (Optional[str]): Path to the SQLite database. Default: None (package default).
            success (int): Exit code for successful initialization. Default: 0.
            error (int): Exit code for failures. Default: 1.
            log (bool): Enable logging output. Default: True.
            debug (bool): Enable debug-level logging. Default: False.

        Note:
            NOTE(review): __new__ always returns the singleton, so this
            initializer re-runs — and resets the lock, cache, and DB handler —
            on every instantiation; confirm re-configuring the shared
            instance this way is intended.
        """
        # Lock instance to prevent racing calls
        self._function_lock: Lock = Lock()
        # Inherited calls
        self.success: int = success
        self.error: int = error
        self.log: bool = log
        self.debug: bool = debug
        self.default_choice: POLY_CONST.LangConfig = langs
        # Determine DB path: use provided override or fall back to package default
        if db_path is None:
            self.db_path = POLY_CONST.DEFAULT_DB_PATH
        else:
            self.db_path = db_path

        # Lazy SQLite handler; do not connect automatically.
        self.sqlitesqlite: Optional[SQLiteHandler] = None
        # Indicates whether the configured DB was successfully probed.
        self._db_ready: bool = False
        # Propagate the debug flag to the shared class-level logger.
        self.disp.update_disp_debug(debug=debug)
        # LRU cache for loaded languages -> set(words)
        self._cache_limit: int = int(POLY_CONST.DEFAULT_CACHE_MAX_LANGS)
        self._lang_cache: "OrderedDict[POLY_CONST.Langs, set]" = OrderedDict()

        self.disp.log_info(
            f"PolyGuard initialised; db_path={self.db_path}; cache_limit={self._cache_limit}")
        # Try to establish a persistent connection now (middleware)
        if not self.ensure_connection():
            self.disp.log_warning(
                "Initial DB connection failed; will attempt on demand")
116
117 def __call__(self, *args: Any, **kwds: Any) -> int:
118 """Callable interface. Delegates to main().
119
120 Returns:
121 int: Result code from main() (0 for success, non-zero for error).
122 """
123 return self.main()
124
125 def _sanify_word(self, word: str) -> Optional[str]:
126 """Sanitize and normalize input word for processing.
127
128 Strips whitespace, converts to lowercase, and validates non-empty.
129
130 Args:
131 word (str): Raw input word or phrase to sanitize.
132
133 Returns:
134 Optional[str]: Lowercased, stripped word, or None if empty/invalid.
135 """
136 # Quick sanity checks
137 if word is None:
138 return None
139
140 text = word.strip()
141
142 if not text:
143 return None
144
145 # If the input contains whitespace, check each token individually
146 text_low = text.lower()
147 return text_low
148
149 def _ensure_initialized(self) -> bool:
150 """Ensure database is initialized on first use."""
151 if not self._db_ready:
152 return self.main() == self.success
153 return True
154
155 def _determine_language_set(self, language: Optional[POLY_CONST.LangConfig]) -> POLY_CONST.LangConfig:
156 """Resolve language configuration, falling back to default if needed.
157
158 Args:
159 language (Optional[LangConfig]): Language config override. Default: None.
160
161 Returns:
162 LangConfig: Provided language config or default instance config.
163 """
164 if language is None:
165 return self.default_choice
166 return language
167
168 def _tokenify(self, text: str) -> List[str]:
169 """Tokenize text by splitting on whitespace after removing delimiters.
170
171 Uses pre-computed translation table for fast processing. Employs CPython's
172 optimized .split() fast-path (any-whitespace split with empty filtering).
173
174 Args:
175 text (str): Text to tokenize (assumed already lowercased).
176
177 Returns:
178 List[str]: List of non-empty token strings.
179 """
180 split_data = text.translate(POLY_CONST.TOKENISER_TABLE).split()
181 return split_data
182
183 def extract_swearword_if_present(self, word: str, *, languages_to_check: Optional[POLY_CONST.LangConfig] = None) -> Optional[str]:
184 """Extract first profanity match from word or phrase.
185
186 Tokenizes input and checks each token against enabled language word lists.
187 Returns immediately on first match for efficiency.
188
189 Args:
190 word (str): The word or phrase to check.
191 languages_to_check (Optional[LangConfig]): Language config override. Default: None.
192
193 Returns:
194 Optional[str]: First matching swearword token found, or None if none detected.
195 """
196 initialised = self._ensure_initialized()
197 if not initialised:
198 self.disp.log_error("Initial caching failed, retuning early")
199 return None
200 word_san = self._sanify_word(word)
201 if word_san is None:
202 return None
203 languages = self._determine_language_set(languages_to_check)
204 tokens = self._tokenify(word_san)
205 for tok in tokens:
206 if self._check_token(tok, languages):
207 return tok
208 return None
209
210 def is_a_swearword(self, word: str, *, languages_to_check: Optional[POLY_CONST.LangConfig] = None) -> bool:
211 """Check if a word or phrase contains profanity.
212
213 Checks individual tokens in phrases and the full phrase itself.
214 Uses per-language LRU cache to optimize repeated lookups.
215
216 Args:
217 word (str): The word or phrase to check (whitespace-stripped).
218 languages_to_check (Optional[LangConfig]): Language config override. Default: None.
219
220 Returns:
221 bool: True if any enabled language contains the word, False otherwise.
222 """
223 self.disp.log_debug(f"is_a_swearword called with word={word!r}")
224 initialised = self._ensure_initialized()
225 if not initialised:
226 self.disp.log_error("Initial caching failed, retuning early")
227 return False
228 word_san = self._sanify_word(word)
229 if word_san is None:
230 return False
231
232 languages = self._determine_language_set(languages_to_check)
233 tokens = self._tokenify(word_san)
234 for tok in tokens:
235 if self._check_token(tok, languages):
236 return True
237 return False
238
239 def get_list_of_swearwords(self, *, languages: Optional[POLY_CONST.LangConfig] = None) -> Dict[str, Set]:
240 """Retrieve all swearwords for enabled languages.
241
242 Returns cached word sets if loaded, otherwise queries database.
243 Useful for inspection, testing, or bulk operations.
244
245 Args:
246 languages (Optional[LangConfig]): Language config override. Default: None.
247
248 Returns:
249 Dict[str, Set]: Dictionary mapping language names to sets of profanity words.
250 Empty dict if database connection unavailable.
251 """
252 final = {}
253 language_check = self._determine_language_set(languages)
254 if not self.ensure_connection() or not self.sqlitesqlite:
255 self.disp.log_error(
256 "No DB connection available; aborting check"
257 )
258 return final
259 for lang in POLY_CONST.Langs:
260 lang_state = getattr(language_check, lang.value, None)
261 if lang_state is None:
262 self.disp.log_warning(f"{lang.value} is defined but not set")
263 continue
264 if lang_state is not None and lang_state is False:
265 self.disp.log_debug(f"{lang.value} is set to not be retrieved")
266 continue
267 cache_node = self._lang_cache.get(lang, None)
268 if cache_node is not None:
269 final[str(lang.name)] = cache_node
270 else:
271 final[str(lang.name)] = self.sqlitesqlite.get_words(lang)
272 return final
273
274 def _check_token(self, text_low: str, languages: POLY_CONST.LangConfig) -> bool:
275 """Check if a single token exists in any enabled language's word list.
276
277 Internal method that performs the actual word lookup using cache and
278 database queries. Token must already be lowercased.
279
280 Args:
281 text_low (str): Lowercased token to search for.
282 languages (LangConfig): LangConfig specifying which languages to query.
283
284 Returns:
285 bool: True if token found in any enabled language, False otherwise.
286
287 Raises:
288 RuntimeError: If database connection becomes unavailable mid-check.
289 """
290 # Build list of languages enabled in the provided config
291 to_check = []
292 for lang in POLY_CONST.Langs:
293 try:
294 if getattr(languages, lang.value):
295 to_check.append(lang)
296 except AttributeError:
297 continue
298
299 if not to_check:
300 return False
301
302 # First consult in-memory cache under short lock sections
303 missing = []
304 for lang in to_check:
305 with self._function_lock:
306 cached = self._lang_cache.get(lang)
307 if cached is not None:
308 # mark as recently used
309 try:
310 self._lang_cache.move_to_end(lang)
311 except (KeyError, AttributeError):
312 pass
313
314 if text_low in cached:
315 self.disp.log_debug(f"Cache hit for lang={lang.value}")
316 return True
317 else:
318 missing.append(lang)
319
320 if not missing:
321 return False
322
323 # Ensure persistent connection before DB access
324 if not self.ensure_connection() or not self.sqlitesqlite:
325 self.disp.log_error(
326 "No DB connection available; aborting check"
327 )
328 return False
329
330 try:
331 loaded = {}
332 for lang in missing:
333 # type: ignore[attr-defined]
334 words = self.sqlitesqlite.get_words(lang)
335 loaded[lang] = words
336
337 except (sqlite3.Error, RuntimeError) as exc: # pragma: no cover - defensive
338 self.disp.log_error(f"DB access failed in is_a_swearword: {exc}")
339 if self.log:
340 warn(f"PolyGuard DB access failed: {exc}")
341 return False
342
343 # Update cache under lock and test loaded sets
344 for lang, words in loaded.items():
345 with self._function_lock:
346 self._lang_cache[lang] = words # type: ignore[attr-defined]
347 try:
348 self._lang_cache.move_to_end(lang)
349 except (KeyError, AttributeError):
350 pass
351
352 # Enforce cache size limit
353 while len(self._lang_cache) > self._cache_limit:
354 try:
355 evicted_lang, _ = self._lang_cache.popitem(last=False)
356 self.disp.log_debug(
357 f"Evicted lang from cache: {evicted_lang.value}")
358 except (KeyError, IndexError):
359 break
360
361 if text_low in words:
362 self.disp.log_debug(
363 f"Match found after DB load for lang={lang.value}"
364 )
365 return True
366
367 return False
368
    def main(self) -> int:
        """Probe the database and preload enabled languages into cache.

        Attempts to verify database accessibility, then preloads up to
        cache_limit languages into memory for faster lookup.

        Returns:
            int: Success code (0) if DB ready, error code otherwise.
        """
        # Probe the configured DB to ensure it is accessible and usable.
        self.disp.log_debug("main() called to probe DB")
        # Ensure persistent connection
        conn_status = self.ensure_connection()
        if not conn_status or not self.sqlitesqlite:
            self._db_ready = False  # type: ignore[attr-defined]
            self.disp.log_error("DB probe failed: cannot connect")
            return self.error

        try:
            # Simple probe using the persistent handler: fetch words for an
            # arbitrary (first) language just to confirm the DB is readable.
            # type: ignore[attr-defined]
            _ = self.sqlitesqlite.get_words(next(iter(POLY_CONST.Langs)))

            # Optionally preload enabled languages into cache up to the cache limit
            to_preload = []
            for lang in POLY_CONST.Langs:
                try:
                    # LangConfig exposes one boolean attribute per language.
                    if getattr(self.default_choice, lang.value):
                        to_preload.append(lang)
                except AttributeError:
                    # Enum member with no matching config attribute: skip it.
                    continue

            loaded_count = 0
            for lang in to_preload:
                if loaded_count >= self._cache_limit:
                    break
                # type: ignore[attr-defined]
                words = self.sqlitesqlite.get_words(lang)
                with self._function_lock:
                    self._lang_cache[lang] = words
                    try:
                        # Mark as most recently used for LRU ordering.
                        self._lang_cache.move_to_end(lang)
                    except (KeyError, AttributeError):
                        pass
                loaded_count += 1

            self._db_ready = True
            self.disp.log_info("DB probe successful; ready")
            return self.success

        except (sqlite3.Error, RuntimeError) as exc:  # pragma: no cover - defensive
            self._db_ready = False
            self.disp.log_error(f"DB probe failed: {exc}")
            if self.log:
                warn(f"PolyGuard failed to open DB '{self.db_path}': {exc}")

        return self.error
426
427 def ensure_connection(self) -> bool:
428 """Ensure a persistent SQLiteHandler is created and connected.
429
430 Creates a new handler if needed or reconnects an existing one.
431 Cleans up stale connections gracefully.
432
433 Returns:
434 bool: True if connection is now open and usable, False otherwise.
435 """
436 with self._function_lock:
437 if self.sqlitesqlite is not None:
438 try:
439 self.sqlitesqlite.connect()
440 return True
441 except (sqlite3.Error, RuntimeError):
442 try:
443 self.sqlitesqlite.close()
444 except (sqlite3.Error, RuntimeError):
445 pass
446 self.sqlitesqlite = None
447
448 try:
449 handler = SQLiteHandler(
450 str(self.db_path), readonly=True, log=self.log)
451 handler.connect()
452 self.sqlitesqlite = handler
453 return True
454 except (sqlite3.Error, RuntimeError) as exc: # pragma: no cover - defensive
455 self.disp.log_error(f"ensure_connection failed: {exc}")
456 self.sqlitesqlite = None
457 return False
458
459
if __name__ == "__main__":
    # Script entry point: run the probe with the default language config.
    DEFAULT_CONF = POLY_CONST.LangConfig()
    GUARD = PolyGuard(langs=DEFAULT_CONF)
    sys.exit(GUARD())
POLY_CONST.LangConfig _determine_language_set(self, Optional[POLY_CONST.LangConfig] language)
Definition polyguard.py:155
List[str] _tokenify(self, str text)
Definition polyguard.py:168
Dict[str, Set] get_list_of_swearwords(self, *, Optional[POLY_CONST.LangConfig] languages=None)
Definition polyguard.py:239
Optional[SQLiteHandler] sqlite
Definition polyguard.py:102
POLY_CONST.LangConfig default_choice
Definition polyguard.py:94
Optional[str] extract_swearword_if_present(self, str word, *, Optional[POLY_CONST.LangConfig] languages_to_check=None)
Definition polyguard.py:183
"PolyGuard" __new__(cls, *args, **kwargs)
Definition polyguard.py:62
int __call__(self, *Any args, **Any kwds)
Definition polyguard.py:117
bool is_a_swearword(self, str word, *, Optional[POLY_CONST.LangConfig] languages_to_check=None)
Definition polyguard.py:210
None __init__(self, POLY_CONST.LangConfig langs, Optional[str] db_path=None, int success=0, int error=1, bool log=True, bool debug=False)
Definition polyguard.py:73
Optional[str] _sanify_word(self, str word)
Definition polyguard.py:125
bool _check_token(self, str text_low, POLY_CONST.LangConfig languages)
Definition polyguard.py:274