Cat Feeder  1.0.0
The Cat feeder project
Loading...
Searching...
No Matches
background_tasks.py
Go to the documentation of this file.
1r"""
2# +==== BEGIN CatFeeder =================+
3# LOGO:
4# ..............(..../\
5# ...............)..(.')
6# ..............(../..)
7# ...............\‍(__)|
8# Inspired by Joan Stark
9# source https://www.asciiart.eu/
10# animals/cats
11# /STOP
12# PROJECT: CatFeeder
13# FILE: background_tasks.py
14# CREATION DATE: 11-10-2025
15# LAST Modified: 14:43:26 19-12-2025
16# DESCRIPTION:
17# This is the backend server in charge of making the actual website work.
18# /STOP
19# COPYRIGHT: (c) Cat Feeder
20# PURPOSE: File in charge of setting up the cron jobs for the server.
21# // AR
22# +==== END CatFeeder =================+
23"""
24
25
26from typing import Union, Any, Dict, Tuple, Callable
27from apscheduler.job import Job
28from apscheduler.schedulers.background import BackgroundScheduler
29from apscheduler.schedulers import SchedulerAlreadyRunningError, SchedulerNotRunningError
30from display_tty import Disp, initialise_logger
31from ..core import FinalClass
32
33
34class BackgroundTasks(metaclass=FinalClass):
35 """_summary_
36 This is the class that is in charge of scheduling background tasks that need to run on intervals
37 """
38
39 disp: Disp = initialise_logger(__qualname__, False)
40
41 def __init__(self, success: int = 0, error: int = 84, debug: bool = False) -> None:
42 # ------------------------ The logging function ------------------------
43 self.disp.update_disp_debug(debug)
44 self.disp.log_debug("Initialising...")
45 # -------------------------- Inherited values --------------------------
46 self.success: int = success
47 self.error: int = error
48 self.debug: bool = debug
49 # ------------------------ The scheduler class ------------------------
50 self.scheduler = BackgroundScheduler()
51 self.disp.log_debug("Initialised")
52
53 def __del__(self) -> None:
54 """_summary_
55 The destructor of the class
56 """
57 self.disp.log_info("Stopping background tasks.", "__del__")
58 exit_code = self.safe_stop()
59 msg = f"The cron exited with status {exit_code}."
60 if exit_code != self.success:
61 self.disp.log_error(msg, "__del__")
62 else:
63 self.disp.log_debug(msg, "__del__")
64
65 def safe_add_task(self, func: Callable, args: Union[Tuple, None] = None, kwargs: Union[Dict, None] = None, trigger: Union[str, Any] = "interval", seconds: int = 5) -> Union[int, Job]:
66 """_summary_
67 A non-crashing implementation of the add_task function.
68
69 Args:
70 func (Callable): _description_: The function to be called when it is time to run the job
71 args (Union[Tuple, None], optional): _description_. Defaults to None.: Arguments you wish to pass to the function when executed.
72 kwargs (Union[Dict, None], optional): _description_. Defaults to None.: Arguments you wish to pass to the function when executed.
73 trigger (Union[str, Any], optional): _description_. Defaults to "interval".
74 seconds (int, optional): _description_. Defaults to 5. The amount of seconds to wait before executing the task again (I don't think it is effective for the cron option)
75
76 Returns:
77 Union[int, Job]: _description_: returns self.error if there was an error, otherwise, returns a Job instance.
78 """
79 try:
80 data = self.add_task(
81 func=func,
82 args=args,
83 kwargs=kwargs,
84 trigger=trigger,
85 seconds=seconds
86 )
87 if data is None:
88 self.disp.log_error(
89 "The add_task function returned None, which is unexpected.",
90 "safe_add_task"
91 )
92 return self.error
93 return data
94 except ValueError as e:
95 self.disp.log_error(
96 f"Runtime Error for add_task. {e}",
97 "safe_add_task"
98 )
99 return self.error
100
101 def safe_start(self) -> int:
102 """_summary_
103 This function is in charge of starting the scheduler. In a non-breaking way.
104
105 Returns:
106 int: _description_: Will return self.success if it worked, otherwise self.error.
107 """
108 try:
109 data = self.start()
110 if data is None:
111 self.disp.log_error(
112 "The start function returned None, which is unexpected.",
113 "safe_start"
114 )
115 return self.error
116 return data
117 except RuntimeError as e:
118 self.disp.log_error(
119 f"Runtime Error for start. {e}",
120 "safe_start"
121 )
122 return self.error
123
124 def safe_pause(self, pause: bool = True) -> int:
125 """_summary_
126 This function is in charge of pausing the scheduler. In a non-breaking way.
127
128 Args:
129 pause (bool, optional): _description_: This is the boolean that will determine if the scheduler should be paused or not. Defaults to True.
130
131 Returns:
132 int: _description_: Will return self.success if it worked, otherwise self.error
133 """
134 try:
135 data = self.pause(pause=pause)
136 if data is None:
137 self.disp.log_error(
138 "The pause function returned None, which is unexpected.",
139 "safe_pause"
140 )
141 return self.error
142 return data
143 except RuntimeError as e:
144 self.disp.log_error(
145 f"Runtime Error for start. {e}",
146 "safe_pause"
147 )
148 return self.error
149
150 def safe_resume(self) -> int:
151 """_summary_
152 This function is in charge of resuming the scheduler. In a non-breaking way.
153
154 Returns:
155 int: _description_: Will return self.success if it worked, otherwise self.error.
156 """
157 try:
158 data = self.resume()
159 if data is None:
160 self.disp.log_error(
161 "The resume function returned None, which is unexpected.",
162 "safe_resume"
163 )
164 return self.error
165 return data
166 except RuntimeError as e:
167 self.disp.log_error(
168 f"Runtime Error for start. {e}",
169 "safe_resume"
170 )
171 return self.error
172
173 def safe_stop(self, wait: bool = True) -> int:
174 """_summary_
175 This function is in charge of stopping the scheduler. In a non-breaking way.
176
177 Args:
178 wait (bool, optional): _description_: Wait for the running tasks to finish. Defaults to True.
179
180 Returns:
181 int: _description_: will return self.success if it succeeds, otherwise self.error
182 """
183 try:
184 data = self.stop(wait=wait)
185 if data is None:
186 self.disp.log_error(
187 "The stop function returned None, which is unexpected.",
188 "safe_stop"
189 )
190 return self.error
191 return data
192 except RuntimeError as e:
193 self.disp.log_error(
194 f"Runtime Error for start. {e}",
195 "safe_stop"
196 )
197 return self.error
198
199 def _to_dict(self, data: Union[Any, None] = None) -> dict:
200 """_summary_
201 Convert any data input into a dictionnary.
202 Args:
203 data (Union[Any, None], optional): _description_. Defaults to None. This is the data you are providing.
204
205 Returns:
206 dict: _description_: A dictionnary crea ted with what could be extracted from the data.
207 """
208 if data is None:
209 return {"none": None}
210 if isinstance(data, dict) is True:
211 return data
212 if isinstance(data, (list, tuple)) is True:
213 res = {}
214 for i in list(data):
215 res[i] = None
216 return res
217 return {"data": data}
218
219 def add_task(self, func: Callable, args: Union[Tuple, None] = None, kwargs: Union[Dict, None] = None, trigger: Union[str, Any] = "interval", seconds: int = 5) -> Union[Job, None]:
220 """_summary_
221 Function in charge of adding an automated call to functions that are meant to run in the background.
222 They are meant to run on interval.
223
224 Args:
225 func (Callable): _description_: The function to be called when it is time to run the job
226 args (Union[Tuple, None], optional): _description_. Defaults to None.: Arguments you wish to pass to the function when executed.
227 kwargs (Union[Dict, None], optional): _description_. Defaults to None.: Arguments you wish to pass to the function when executed.
228 trigger (Union[str, Any], optional): _description_. Defaults to "interval".
229 seconds (int, optional): _description_. Defaults to 5. The amount of seconds to wait before executing the task again (I don't think it is effective for the cron option)
230
231 Returns:
232 Union[int,Job]: _description_: will raise a ValueError when an error occurs, otherwise, returns an instance of Job.
233 """
234 if callable(func) is False:
235 self.disp.log_error(
236 f"The provided function is not callable: {func}.",
237 "add_task"
238 )
239 raise ValueError("The function must be callable.")
240 if args is not None and isinstance(args, tuple) is False:
241 msg = f"The provided args for {func.__name__} are not tuples.\n"
242 msg += f"Converting args: '{args}' to tuples."
243 self.disp.log_warning(msg, "add_task")
244 args = tuple((args,))
245 if kwargs is not None and isinstance(kwargs, dict) is False:
246 msg = f"The provided kwargs for {func.__name__}"
247 msg += "are not dictionaries.\n"
248 msg += f"Converting kwargs: '{kwargs}' to dictionaries."
249 self.disp.log_warning(msg, "add_task")
250 kwargs = self._to_dict(kwargs)
251 self.disp.log_warning(f"Converted data = {kwargs}.", "add_task")
252 if trigger is not None and isinstance(trigger, str) is False:
253 self.disp.log_error(
254 f"The provided trigger is not a string: {trigger}.",
255 "add_task"
256 )
257 raise ValueError("The trigger must be a string.")
258 if isinstance(seconds, int) is False:
259 self.disp.log_error(
260 f"The provided seconds is not an integer: {seconds}.",
261 "add_task"
262 )
263 raise ValueError("The seconds must be an integer.")
264 msg = f"Adding job: {func.__name__} "
265 msg += f"with trigger: {trigger}, "
266 msg += f"seconds = {seconds}, "
267 msg += f"args = {args}, "
268 msg += f"kwargs = {kwargs}."
269 self.disp.log_debug(msg, "add_task")
270 return self.scheduler.add_job(
271 func=func,
272 trigger=trigger,
273 seconds=seconds,
274 args=args,
275 kwargs=kwargs
276 )
277
278 def start(self) -> Union[int, None]:
279 """_summary_
280 The function in charge of starting the scheduler loop.
281
282 Raises:
283 RuntimeError: _description_: Will raise a runtile error if the underlying functions failled.
284
285 Returns:
286 Union[int, None]: _description_: Will return self.success if it worked, otherwise None because it will have raised an error.
287 """
288 try:
289 self.scheduler.start()
290 self.disp.log_info("Scheduler started...", "start")
291 return self.success
292 except SchedulerAlreadyRunningError:
293 self.disp.log_info("Scheduler is already running...", "start")
294 return self.success
295 except RuntimeError as e:
296 self.disp.log_error(
297 f"An error occurred while starting the scheduler: {e}",
298 "start"
299 )
300 msg = f"Error({self.__class__.__name__}): "
301 msg += "Failed to call the scheduler's start wrapper function."
302 raise RuntimeError(msg) from e
303 except Exception as e:
304 self.disp.log_error(
305 f"An error occurred while starting the scheduler: {e}", "start"
306 )
307 msg = f"Error({self.__class__.__name__}): "
308 msg += "Failed to call the scheduler's start wrapper function."
309 raise RuntimeError(msg) from e
310
311 def pause(self, pause: bool = True) -> Union[int, None]:
312 """_summary_
313 This function is in charge of pausing the scheduler if it was running.
314
315 Args:
316 pause (bool, optional): _description_: This is the boolean that will determine if the scheduler should be paused or not. Defaults to True.
317
318 Returns:
319 Union[int, None]: _description_: Will return self.success if it worked, otherwise None because it will have raised an error.
320 """
321 try:
322 if pause is True:
323 self.scheduler.pause()
324 self.disp.log_info("Scheduler paused.", "pause")
325 else:
326 self.scheduler.resume()
327 self.disp.log_info("Scheduler resumed.", "pause")
328 return self.success
329 except Exception as e:
330 self.disp.log_error(
331 f"An error occurred while pausing the scheduler: {e}",
332 "pause"
333 )
334 msg = f"Error({self.__class__.__name__}): "
335 msg += "Failed to call the chron pause wrapper function."
336 raise RuntimeError(msg) from e
337
338 def resume(self) -> Union[int, None]:
339 """_summary_
340 This function is in charge of resuming the scheduler loop if it was paused.
341
342 Returns:
343 Union[int]: _description_: Will return self.success if it worked, otherwise None because it will have raised an error.
344 """
345 return self.pause(pause=False)
346
347 def stop(self, wait: bool = True) -> Union[int, None]:
348 """_summary_
349 This function is responsible for shutting down the scheduler, terminating any running jobs, and optionally waiting for those jobs to complete before exiting.
350
351 Args:
352 wait (bool, optional): _description_. Defaults to True. Wait for the running tasks to finish.
353
354 Raises:
355 RuntimeError: _description_: The function failed to call the underlying processes that were required for it to run.
356
357 Returns:
358 Union[int, None]: _description_: will return self.success if it succeeds, or none if it raised an error.
359 """
360 try:
361 self.scheduler.shutdown(wait=wait)
362 self.disp.log_info("Scheduler stopped.", "stop")
363 return self.success
364 except SchedulerNotRunningError:
365 self.disp.log_info("Scheduler is already stopped.", "stop")
366 return self.success
367 except Exception as e:
368 self.disp.log_error(
369 f"An error occurred while stopping the scheduler: {e}", "stop"
370 )
371 msg = f"Error({self.__class__.__name__}): "
372 msg += "Failed to call the chron stop wrapper function."
373 raise RuntimeError(msg) from e
374
375
376if __name__ == "__main__":
377 import sys
378 from time import sleep
379 from datetime import datetime
380
381 def test_current_date(*args: Any) -> None:
382 """_summary_
383 This is a test function that will print the current date.
384 Args:
385 date (datetime): _description_
386 """
387 if len(args) >= 1:
388 date = args[0]
389 else:
390 date = datetime.now()
391 if callable(date):
392 print(f"(test_current_date) (Called) Current date: {date()}")
393 else:
394 print(f"(test_current_date) (Not called) Current date: {date}",)
395
396 def hello_world() -> None:
397 """_summary_
398 This is a test function that will print "Hello, World!"
399 """
400 print("Hello, World!")
401
402 def pending_world() -> None:
403 """_summary_
404 This is a test function that will print "Pending, World!"
405 """
406 print("Pending, World!")
407
408 def goodbye_world() -> None:
409 """_summary_
410 This is a test function that will print "Goodbye, World!"
411 """
412 print("Goodbye, World!")
413
414 print("Testing declared functions.")
419 print("Declared functions tested.")
420
421 SUCCES = 0
422 ERROR = 84
423 DEBUG = True
424 KIND_KILL = True
425 NB_REPEATS = 2
426 TRIGGER = "interval"
427 SECONDS = 2
428 NB_FUNCTIONS = 5
429 MAIN_THREAD_DELAY = int((SECONDS*NB_FUNCTIONS)*NB_REPEATS)
430
431 print(
432 f"Statuses:\nSUCCESS = {SUCCES}, ERROR = {ERROR}\n"
433 f"DEBUG = {DEBUG}, KIND_KILL = {KIND_KILL}, "
434 f"NB_REPEATS = {NB_REPEATS}, "
435 f"TRIGGER = {TRIGGER}, SECONDS = {SECONDS}, "
436 f"NB_FUNCTIONS = {NB_FUNCTIONS}, "
437 f"MAIN_THREAD_DELAY = {MAIN_THREAD_DELAY}"
438 )
439
440 print("Initialising class BackgroundTasks.")
442 success=SUCCES,
443 error=ERROR,
444 debug=DEBUG
445 )
446 print("Class BackgroundTasks initialised.")
447
448 print("Adding tasks to the scheduler.")
449 status = BTI.safe_add_task(
450 func=test_current_date,
451 args=(datetime.now,),
452 kwargs=None,
453 trigger=TRIGGER,
454 seconds=SECONDS
455 )
456 print(f"status {status}")
457 status = BTI.add_task(
458 hello_world,
459 args=None,
460 kwargs=None,
461 trigger=TRIGGER,
462 seconds=SECONDS
463 )
464 print(f"status {status}")
465 status = BTI.safe_add_task(
466 pending_world,
467 args=None,
468 kwargs=None,
469 trigger=TRIGGER,
470 seconds=SECONDS
471 )
472 print(f"status {status}")
473 status = BTI.add_task(
474 goodbye_world,
475 args=None,
476 kwargs=None,
477 trigger=TRIGGER,
478 seconds=SECONDS
479 )
480 print(f"status {status}")
481 status = BTI.add_task(
482 func=test_current_date,
483 args=(datetime.now,),
484 kwargs=None,
485 trigger=TRIGGER,
486 seconds=SECONDS
487 )
488 print(f"status {status}")
489 print("Added tasks to the scheduler.")
490
491 print("Startins scheduler.")
492 print(f"Status: {BTI.safe_start()}")
493 print("Scheduler started.")
494 print(f"Waiting {MAIN_THREAD_DELAY} on the main thread.")
495 sleep(MAIN_THREAD_DELAY)
496 print(f"Waited {MAIN_THREAD_DELAY} on the main thread.")
497 print("Stopping scheduler.")
498 status = BTI.safe_stop(KIND_KILL)
499 print(f"Status: {status}")
500 sys.exit(status)
None __init__(self, int success=0, int error=84, bool debug=False)
Union[int, Job] safe_add_task(self, Callable func, Union[Tuple, None] args=None, Union[Dict, None] kwargs=None, Union[str, Any] trigger="interval", int seconds=5)
Union[int, None] pause(self, bool pause=True)
Union[Job, None] add_task(self, Callable func, Union[Tuple, None] args=None, Union[Dict, None] kwargs=None, Union[str, Any] trigger="interval", int seconds=5)
dict _to_dict(self, Union[Any, None] data=None)