Cat Feeder  1.0.0
The Cat feeder project
Loading...
Searching...
No Matches
crons.py
Go to the documentation of this file.
r"""
# +==== BEGIN CatFeeder =================+
# LOGO:
# ..............(..../\
# ...............)..(.')
# ..............(../..)
# ...............\(__)|
# Inspired by Joan Stark
# source https://www.asciiart.eu/
# animals/cats
# /STOP
# PROJECT: CatFeeder
# FILE: crons.py
# CREATION DATE: 11-10-2025
# LAST Modified: 18:44:14 31-01-2026
# DESCRIPTION:
# This is the backend server in charge of making the actual website work.
# /STOP
# COPYRIGHT: (c) Cat Feeder
# PURPOSE: File in charge of containing the functions that will be run in the background.
# // AR
# +==== END CatFeeder =================+
"""
24
25from typing import Any, Union, List, Dict, Optional
26from datetime import datetime
27from display_tty import Disp, initialise_logger
28from .background_tasks import BackgroundTasks
29from . import crons_constants as CRON_CONST
30
31from ..core import FinalClass
32from ..core.runtime_manager import RuntimeManager, RI
33from ..utils import constants as CONST
34from ..utils.oauth_authentication import OAuthAuthentication
35from ..sql import SQL
36from ..boilerplates import BoilerplateNonHTTP
37
38
class Crons(metaclass=FinalClass):
    """Container for the recurring background jobs of the server.

    The class resolves its collaborators (background task handler, database
    link, OAuth handler, non-HTTP boilerplate) through the shared runtime
    manager and exposes one method per periodic job. `inject_crons` registers
    those methods with the background task handler.
    """

    # Logger shared by every instance; its debug mode is updated in __init__.
    disp: Disp = initialise_logger(__qualname__, False)

    def __init__(self, error: int = 84, success: int = 0, debug: bool = False) -> None:
        """Store the status codes and resolve the collaborators.

        Args:
            error (int): Status code returned by methods on failure.
            success (int): Status code returned by methods on success.
            debug (bool): Whether debug logging is enabled.
        """
        # ------------------------ The logging function ------------------------
        self.disp.update_disp_debug(debug)
        self.disp.log_debug("Initialising...")
        # -------------------------- Inherited values --------------------------
        self.error: int = error
        self.success: int = success
        self.debug: bool = debug
        self.runtime_manager: RuntimeManager = RI
        # ------ The class in charge of handling tasks in the background ------
        # Register the task handler only when nobody did it before us.
        if not self.runtime_manager.exists(BackgroundTasks):
            self.runtime_manager.set(
                BackgroundTasks,
                success=self.success,
                error=self.error,
                debug=self.debug
            )
        self.background_tasks: BackgroundTasks = self.runtime_manager.get(
            BackgroundTasks
        )
        # ---------------------- The Database connection ----------------------
        self.database_link: SQL = self.runtime_manager.get(SQL)
        # --------------------------- OAuth handler ---------------------------
        # May still be None here; renew_oaths() looks it up again before use.
        self.oauth_authentication_initialised: Optional[OAuthAuthentication] = (
            self.runtime_manager.get_if_exists(OAuthAuthentication, None)
        )
        # ----------------- Regular server checking processes -----------------
        # May still be None here; renew_oaths() looks it up again before use.
        self.boilerplate_non_http_initialised: Optional[BoilerplateNonHTTP] = (
            self.runtime_manager.get_if_exists(BoilerplateNonHTTP, None)
        )
        self.disp.log_debug("Initialised")

    def __del__(self) -> None:
        """Log that the cron sub processes are shutting down."""
        if self.disp:
            self.disp.log_info("Cron sub processes are shutting down.")

    def inject_crons(self) -> int:
        """Register the cron functions with the background task handler.

        Every job is added with an 'interval' trigger whose period is read
        from the cron constants. Optional jobs are only added when their flag
        in the cron constants is exactly `True`.

        Returns:
            int: `self.error` when no background task handler is available,
            `self.success` otherwise. The values returned by `safe_add_task`
            are not inspected.
        """
        if self.background_tasks is None:
            return self.error
        self.background_tasks.safe_add_task(
            func=self.check_actions,
            args=None,
            trigger='interval',
            seconds=CRON_CONST.CHECK_ACTIONS_INTERVAL
        )
        self.background_tasks.safe_add_task(
            func=self.reset_food_counters,
            args=None,
            trigger='interval',
            seconds=CRON_CONST.RESET_FOOD_COUNTERS_INTERVAL
        )
        if CRON_CONST.ENABLE_TEST_CRONS is True:
            # The callable itself is passed so the date is evaluated on each run.
            self.background_tasks.safe_add_task(
                func=self._test_current_date,
                args=(datetime.now,),
                trigger='interval',
                seconds=CRON_CONST.TEST_CRONS_INTERVAL
            )
            self.background_tasks.safe_add_task(
                func=self._test_hello_world,
                args=None,
                trigger='interval',
                seconds=CRON_CONST.TEST_CRONS_INTERVAL
            )
        if CRON_CONST.CLEAN_TOKENS is True:
            self.background_tasks.safe_add_task(
                func=self.clean_expired_tokens,
                args=None,
                trigger='interval',
                seconds=CRON_CONST.CLEAN_TOKENS_INTERVAL
            )
        if CRON_CONST.CLEAN_VERIFICATION is True:
            self.background_tasks.safe_add_task(
                func=self.clean_expired_verification_nodes,
                args=None,
                trigger='interval',
                seconds=CRON_CONST.CLEAN_VERIFICATION_INTERVAL
            )
        if CRON_CONST.RENEW_OATH_TOKENS is True:
            self.background_tasks.safe_add_task(
                func=self.renew_oaths,
                args=None,
                trigger='interval',
                seconds=CRON_CONST.RENEW_OATH_TOKENS_INTERVAL
            )
        return self.success

    def _test_hello_world(self) -> None:
        """Test job: log "Hello World"."""
        self.disp.log_info("Hello World", "_test_hello_world")

    def _test_current_date(self, *args: Any) -> None:
        """Test job: log the current date.

        Args:
            *args (Any): Optional. When present, the first value is used as
                the date; if it is callable it is called to obtain the date.
                When absent, `datetime.now()` is used.
        """
        if len(args) >= 1:
            date = args[0]
        else:
            date = datetime.now()
        if callable(date):
            self.disp.log_info(f"(Called) Current date: {date()}")
        else:
            self.disp.log_info(f"(Not called) Current date: {date}")

    def _parse_expiration(self, raw: Any, title: str) -> Optional[datetime]:
        """Turn a raw expiration cell into a datetime, if possible.

        Args:
            raw (Any): Value read from the expiration column of a row.
            title (str): Name of the calling job, forwarded to the logger.

        Returns:
            Optional[datetime]: The expiration moment, or None when the value
            is missing or is not a datetime after conversion.
        """
        if isinstance(raw, str) and raw != "":
            converted = self.database_link.string_to_datetime(raw)
            msg = f"Converted {raw} to a datetime instance"
            msg += f" ({converted})."
            self.disp.log_debug(msg, title)
        else:
            converted = raw
            self.disp.log_debug(f"Did not convert {raw}.", title)
        # Anything that is not a datetime cannot be compared to "now".
        if not isinstance(converted, datetime):
            return None
        return converted

    def reset_food_counters(self) -> None:
        """Set the `food_eaten` column back to 0 for every row of the pet table."""
        title = "reset_food_counters"
        self.disp.log_info("Resetting food counters for all feeders", title)
        feeders: Union[List[Dict[str, Any]], int] = self.database_link.get_data_from_table(
            table=CONST.TAB_PET,
            column="*",
            where="",
            beautify=True
        )
        # An int result is treated as "nothing to process".
        if isinstance(feeders, int):
            msg = f"There is no feeders to reset in {CONST.TAB_PET} table."
            self.disp.log_warning(msg, title)
            return
        for feeder in feeders:
            self.database_link.update_data_in_table(
                table=CONST.TAB_PET,
                data=[0],
                column=["food_eaten"],
                where=f"id='{feeder['id']}'"
            )
            self.disp.log_debug(
                f"Reset food counter for feeder {feeder['id']}", title
            )
        self.disp.log_info("Reset food counters for all feeders", title)

    def clean_expired_tokens(self) -> None:
        """Remove the connection tokens whose expiration date has passed.

        Rows whose expiration value is missing or unusable are skipped with a
        warning instead of aborting the whole run.
        """
        title = "clean_expired_tokens"
        date_node = "expiration_date"
        current_time = datetime.now()
        self.disp.log_info("Cleaning expired tokens", title)
        current_tokens = self.database_link.get_data_from_table(
            table=CONST.TAB_CONNECTIONS,
            column="*",
            where="",
            beautify=True
        )
        # An int result is treated as "nothing to process".
        if isinstance(current_tokens, int):
            msg = "There is no data to be cleared in "
            msg += f"{CONST.TAB_CONNECTIONS} table."
            self.disp.log_warning(msg, title)
            return
        self.disp.log_debug(f"current tokens = {current_tokens}", title)
        for row in current_tokens:
            expires_at = self._parse_expiration(row[date_node], title)
            if expires_at is None:
                self.disp.log_warning(
                    f"Skipped {row} because '{date_node}' is not a usable date.",
                    title
                )
                continue
            if expires_at < current_time:
                self.database_link.remove_data_from_table(
                    table=CONST.TAB_CONNECTIONS,
                    where=f"id='{row['id']}'"
                )
                self.disp.log_debug(f"Removed {row}.", title)
            else:
                self.disp.log_debug(
                    f"Did not remove {row} because it is not yet time.", title
                )
        self.disp.log_debug("Cleaned expired tokens", title)

    def clean_expired_verification_nodes(self) -> None:
        """Remove the verification table rows whose expiration has passed.

        Rows whose expiration value is missing or unusable are skipped with a
        warning instead of aborting the whole run.
        """
        title = "clean_expired_verification_nodes"
        date_node = "expiration"
        current_time = datetime.now()
        self.disp.log_info(
            f"Cleaning expired lines in the {CONST.TAB_VERIFICATION} table.",
            title
        )
        current_lines = self.database_link.get_data_from_table(
            table=CONST.TAB_VERIFICATION,
            column="*",
            where="",
            beautify=True
        )
        # An int result is treated as "nothing to process".
        if isinstance(current_lines, int):
            msg = "There is no data to be cleared in "
            msg += f"{CONST.TAB_VERIFICATION} table."
            self.disp.log_warning(
                msg,
                title
            )
            return
        self.disp.log_debug(f"current lines = {current_lines}", title)
        for row in current_lines:
            expires_at = self._parse_expiration(row[date_node], title)
            if expires_at is None:
                self.disp.log_warning(
                    f"Skipped {row} because '{date_node}' is not a usable date.",
                    title
                )
                continue
            if expires_at < current_time:
                self.database_link.remove_data_from_table(
                    table=CONST.TAB_VERIFICATION,
                    where=f"id='{row['id']}'"
                )
                self.disp.log_debug(f"Removed {row}.", title)
        self.disp.log_debug("Cleaned expired lines", title)

    def renew_oaths(self) -> None:
        """Renew the stored OAuth tokens whose expiration has passed.

        The job aborts early when the non-HTTP boilerplate or the OAuth
        handler cannot be obtained from the runtime manager. Rows with a
        `token_lifespan` of 0 are never renewed.
        """
        title = "renew_oaths"
        self.disp.log_debug(
            "Checking for oaths that need to be renewed", title
        )
        # Late lookup: the helpers may not have existed when __init__ ran.
        # NOTE(review): the fallback arguments and the two `is None` guards were
        # missing from the listing this was restored from — confirm upstream.
        self.boilerplate_non_http_initialised = self.runtime_manager.get_if_exists(
            BoilerplateNonHTTP,
            self.boilerplate_non_http_initialised
        )
        if self.boilerplate_non_http_initialised is None:
            self.disp.log_error(
                "Boilerplate Non Http class not present, aborting check.",
                title
            )
            return None
        self.oauth_authentication_initialised = self.runtime_manager.get_if_exists(
            OAuthAuthentication,
            self.oauth_authentication_initialised
        )
        if self.oauth_authentication_initialised is None:
            self.disp.log_error(
                "Oauth Authentication class not present, aborting check.",
                title
            )
            return None
        oath_connections: Union[List[Dict[str, Any]], int] = self.database_link.get_data_from_table(
            table=CONST.TAB_ACTIVE_OAUTHS,
            column="*",
            where="",
            beautify=True
        )
        if isinstance(oath_connections, int) or len(oath_connections) == 0:
            return
        current_time: datetime = datetime.now()
        for oath in oath_connections:
            if oath["token_lifespan"] == 0:
                self.disp.log_debug(
                    f"Token for {oath['id']} does not need to be renewed.", title
                )
                continue
            node_id: str = oath['id']
            expiration_raw = oath["token_expiration"]
            if isinstance(expiration_raw, datetime):
                token_expiration = expiration_raw
            else:
                token_expiration = self.database_link.string_to_datetime(
                    expiration_raw, False
                )
            # A non-datetime value cannot be compared to "now"; skip the row
            # rather than let a TypeError abort the whole run.
            if not isinstance(token_expiration, datetime):
                self.disp.log_error(
                    f"Invalid token expiration for {node_id}, skipping.", title
                )
                continue
            if current_time <= token_expiration:
                self.disp.log_debug(
                    f"Token for {node_id} does not need to be renewed.", title
                )
                continue
            renew_link: str = oath["refresh_link"]
            lifespan: int = int(oath["token_lifespan"])
            providers: Union[int, List[Dict[str, Any]]] = self.database_link.get_data_from_table(
                table=CONST.TAB_SERVICES,
                column="*",
                where=f"id='{oath['service_id']}'",
                beautify=True
            )
            if isinstance(providers, int):
                self.disp.log_error(
                    f"Could not find provider name for {node_id}", title
                )
                continue
            if not providers:
                self.disp.log_error(
                    f"Empty provider list for {node_id}", title
                )
                continue
            provider_node = providers[0]
            new_token: Union[str, None] = self.oauth_authentication_initialised.refresh_token(
                provider_node.get('name', ''),
                renew_link
            )
            if new_token is None:
                self.disp.log_error(
                    "Refresh token failed to generate a new token.", title
                )
                continue
            # Build the new expiration separately from the one just compared.
            new_expiration_dt = self.boilerplate_non_http_initialised.set_lifespan(
                seconds=lifespan
            )
            token_expiration_str: str = self.database_link.datetime_to_string(
                datetime_instance=new_expiration_dt,
                date_only=False,
                sql_mode=True
            )
            self.disp.log_debug(
                f"token expiration = {token_expiration_str}", title
            )
            if new_token != "":
                self.database_link.update_data_in_table(
                    table=CONST.TAB_ACTIVE_OAUTHS,
                    data=[
                        new_token,
                        token_expiration_str
                    ],
                    column=[
                        "token",
                        "token_expiration"
                    ],
                    where=f"id='{node_id}'"
                )
                # NOTE(review): this writes the raw token to the debug log;
                # consider redacting it.
                self.disp.log_debug(
                    f"token {new_token} updated for {node_id}", title
                )
            else:
                self.disp.log_error(
                    f"Could not renew token for {node_id}", title
                )
        self.disp.log_debug("Checked for oath that need to be renewed", title)

    def check_actions(self) -> None:
        """Check whether any actions need to be run.

        Currently only logs the start and the end of the check; it does
        nothing when no background task handler is available.
        """
        title = "check_actions"
        if self.background_tasks is None:
            return None
        self.disp.log_debug("Checking for actions that need to be run.", title)
        self.disp.log_debug("Checked for actions that needed to be run", title)
        return None
None _test_current_date(self, *Any args)
Definition crons.py:147
None clean_expired_verification_nodes(self)
Definition crons.py:233
Optional[OAuthAuthentication] oauth_authentication_initialised
Definition crons.py:66
RuntimeManager runtime_manager
Definition crons.py:54
BackgroundTasks background_tasks
Definition crons.py:61
None __init__(self, int error=84, int success=0, bool debug=False)
Definition crons.py:45
Optional[BoilerplateNonHTTP] boilerplate_non_http_initialised
Definition crons.py:71