2# +==== BEGIN CatFeeder =================+
5# ...............)..(.')
7# ...............\(__)|
8# Inspired by Joan Stark
9# source https://www.asciiart.eu/
13# FILE: background_tasks.py
14# CREATION DATE: 11-10-2025
15# LAST Modified: 14:43:26 19-12-2025
17# This is the backend server in charge of making the actual website work.
19# COPYRIGHT: (c) Cat Feeder
20# PURPOSE: File in charge of setting up the cron jobs for the server.
22# +==== END CatFeeder =================+
26from typing
import Union, Any, Dict, Tuple, Callable
27from apscheduler.job
import Job
28from apscheduler.schedulers.background
import BackgroundScheduler
29from apscheduler.schedulers
import SchedulerAlreadyRunningError, SchedulerNotRunningError
30from display_tty
import Disp, initialise_logger
31from ..core
import FinalClass
36 This is the class that is in charge of scheduling background tasks that need to run on intervals
39 disp: Disp = initialise_logger(__qualname__,
False)
41 def __init__(self, success: int = 0, error: int = 84, debug: bool =
False) ->
None:
43 self.
disp.update_disp_debug(debug)
44 self.
disp.log_debug(
"Initialising...")
51 self.
disp.log_debug(
"Initialised")
55 The destructor of the class
57 self.
disp.log_info(
"Stopping background tasks.",
"__del__")
59 msg = f
"The cron exited with status {exit_code}."
61 self.
disp.log_error(msg,
"__del__")
63 self.
disp.log_debug(msg,
"__del__")
65 def safe_add_task(self, func: Callable, args: Union[Tuple,
None] =
None, kwargs: Union[Dict,
None] =
None, trigger: Union[str, Any] =
"interval", seconds: int = 5) -> Union[int, Job]:
67 A non-crashing implementation of the add_task function.
70 func (Callable): _description_: The function to be called when it is time to run the job
71 args (Union[Tuple, None], optional): _description_. Defaults to None.: Arguments you wish to pass to the function when executed.
72 kwargs (Union[Dict, None], optional): _description_. Defaults to None.: Arguments you wish to pass to the function when executed.
73 trigger (Union[str, Any], optional): _description_. Defaults to "interval".
74 seconds (int, optional): _description_. Defaults to 5. The amount of seconds to wait before executing the task again (I don't think it is effective for the cron option)
77 Union[int, Job]: _description_: returns self.error if there was an error, otherwise, returns a Job instance.
89 "The add_task function returned None, which is unexpected.",
94 except ValueError
as e:
96 f
"Runtime Error for add_task. {e}",
103 This function is in charge of starting the scheduler. In a non-breaking way.
106 int: _description_: Will return self.success if it worked, otherwise self.error.
112 "The start function returned None, which is unexpected.",
117 except RuntimeError
as e:
119 f
"Runtime Error for start. {e}",
126 This function is in charge of pausing the scheduler. In a non-breaking way.
129 pause (bool, optional): _description_: This is the boolean that will determine if the scheduler should be paused or not. Defaults to True.
132 int: _description_: Will return self.success if it worked, otherwise self.error
135 data = self.
pause(pause=pause)
138 "The pause function returned None, which is unexpected.",
143 except RuntimeError
as e:
145 f
"Runtime Error for start. {e}",
152 This function is in charge of resuming the scheduler. In a non-breaking way.
155 int: _description_: Will return self.success if it worked, otherwise self.error.
161 "The resume function returned None, which is unexpected.",
166 except RuntimeError
as e:
168 f
"Runtime Error for start. {e}",
175 This function is in charge of stopping the scheduler. In a non-breaking way.
178 wait (bool, optional): _description_: Wait for the running tasks to finish. Defaults to True.
181 int: _description_: will return self.success if it succeeds, otherwise self.error
184 data = self.
stop(wait=wait)
187 "The stop function returned None, which is unexpected.",
192 except RuntimeError
as e:
194 f
"Runtime Error for start. {e}",
199 def _to_dict(self, data: Union[Any,
None] =
None) -> dict:
201 Convert any data input into a dictionary.
203 data (Union[Any, None], optional): _description_. Defaults to None. This is the data you are providing.
206 dict: _description_: A dictionary created with what could be extracted from the data.
209 return {
"none":
None}
210 if isinstance(data, dict)
is True:
212 if isinstance(data, (list, tuple))
is True:
217 return {
"data": data}
219 def add_task(self, func: Callable, args: Union[Tuple,
None] =
None, kwargs: Union[Dict,
None] =
None, trigger: Union[str, Any] =
"interval", seconds: int = 5) -> Union[Job,
None]:
221 Function in charge of adding an automated call to functions that are meant to run in the background.
222 They are meant to run on interval.
225 func (Callable): _description_: The function to be called when it is time to run the job
226 args (Union[Tuple, None], optional): _description_. Defaults to None.: Arguments you wish to pass to the function when executed.
227 kwargs (Union[Dict, None], optional): _description_. Defaults to None.: Arguments you wish to pass to the function when executed.
228 trigger (Union[str, Any], optional): _description_. Defaults to "interval".
229 seconds (int, optional): _description_. Defaults to 5. The amount of seconds to wait before executing the task again (I don't think it is effective for the cron option)
232 Union[Job, None]: _description_: will raise a ValueError when an error occurs, otherwise, returns an instance of Job.
234 if callable(func)
is False:
236 f
"The provided function is not callable: {func}.",
239 raise ValueError(
"The function must be callable.")
240 if args
is not None and isinstance(args, tuple)
is False:
241 msg = f
"The provided args for {func.__name__} are not tuples.\n"
242 msg += f
"Converting args: '{args}' to tuples."
243 self.
disp.log_warning(msg,
"add_task")
244 args = tuple((args,))
245 if kwargs
is not None and isinstance(kwargs, dict)
is False:
246 msg = f
"The provided kwargs for {func.__name__}"
247 msg +=
"are not dictionaries.\n"
248 msg += f
"Converting kwargs: '{kwargs}' to dictionaries."
249 self.
disp.log_warning(msg,
"add_task")
251 self.
disp.log_warning(f
"Converted data = {kwargs}.",
"add_task")
252 if trigger
is not None and isinstance(trigger, str)
is False:
254 f
"The provided trigger is not a string: {trigger}.",
257 raise ValueError(
"The trigger must be a string.")
258 if isinstance(seconds, int)
is False:
260 f
"The provided seconds is not an integer: {seconds}.",
263 raise ValueError(
"The seconds must be an integer.")
264 msg = f
"Adding job: {func.__name__} "
265 msg += f
"with trigger: {trigger}, "
266 msg += f
"seconds = {seconds}, "
267 msg += f
"args = {args}, "
268 msg += f
"kwargs = {kwargs}."
269 self.
disp.log_debug(msg,
"add_task")
278 def start(self) -> Union[int, None]:
280 The function in charge of starting the scheduler loop.
283 RuntimeError: _description_: Will raise a runtime error if the underlying functions failed.
286 Union[int, None]: _description_: Will return self.success if it worked, otherwise None because it will have raised an error.
290 self.
disp.log_info(
"Scheduler started...",
"start")
292 except SchedulerAlreadyRunningError:
293 self.
disp.log_info(
"Scheduler is already running...",
"start")
295 except RuntimeError
as e:
297 f
"An error occurred while starting the scheduler: {e}",
300 msg = f
"Error({self.__class__.__name__}): "
301 msg +=
"Failed to call the scheduler's start wrapper function."
302 raise RuntimeError(msg)
from e
303 except Exception
as e:
305 f
"An error occurred while starting the scheduler: {e}",
"start"
307 msg = f
"Error({self.__class__.__name__}): "
308 msg +=
"Failed to call the scheduler's start wrapper function."
309 raise RuntimeError(msg)
from e
311 def pause(self, pause: bool =
True) -> Union[int,
None]:
313 This function is in charge of pausing the scheduler if it was running.
316 pause (bool, optional): _description_: This is the boolean that will determine if the scheduler should be paused or not. Defaults to True.
319 Union[int, None]: _description_: Will return self.success if it worked, otherwise None because it will have raised an error.
324 self.
disp.log_info(
"Scheduler paused.",
"pause")
327 self.
disp.log_info(
"Scheduler resumed.",
"pause")
329 except Exception
as e:
331 f
"An error occurred while pausing the scheduler: {e}",
334 msg = f
"Error({self.__class__.__name__}): "
335 msg +=
"Failed to call the chron pause wrapper function."
336 raise RuntimeError(msg)
from e
340 This function is in charge of resuming the scheduler loop if it was paused.
343 Union[int, None]: _description_: Will return self.success if it worked, otherwise None because it will have raised an error.
345 return self.
pause(pause=
False)
347 def stop(self, wait: bool =
True) -> Union[int,
None]:
349 This function is responsible for shutting down the scheduler, terminating any running jobs, and optionally waiting for those jobs to complete before exiting.
352 wait (bool, optional): _description_. Defaults to True. Wait for the running tasks to finish.
355 RuntimeError: _description_: The function failed to call the underlying processes that were required for it to run.
358 Union[int, None]: _description_: will return self.success if it succeeds, or none if it raised an error.
362 self.
disp.log_info(
"Scheduler stopped.",
"stop")
364 except SchedulerNotRunningError:
365 self.
disp.log_info(
"Scheduler is already stopped.",
"stop")
367 except Exception
as e:
369 f
"An error occurred while stopping the scheduler: {e}",
"stop"
371 msg = f
"Error({self.__class__.__name__}): "
372 msg +=
"Failed to call the chron stop wrapper function."
373 raise RuntimeError(msg)
from e
376if __name__ ==
"__main__":
378 from time
import sleep
379 from datetime
import datetime
383 This is a test function that will print the current date.
385 date (datetime): _description_
390 date = datetime.now()
392 print(f
"(test_current_date) (Called) Current date: {date()}")
394 print(f
"(test_current_date) (Not called) Current date: {date}",)
398 This is a test function that will print "Hello, World!"
400 print(
"Hello, World!")
404 This is a test function that will print "Pending, World!"
406 print(
"Pending, World!")
410 This is a test function that will print "Goodbye, World!"
412 print(
"Goodbye, World!")
414 print(
"Testing declared functions.")
419 print(
"Declared functions tested.")
429 MAIN_THREAD_DELAY = int((SECONDS*NB_FUNCTIONS)*NB_REPEATS)
432 f
"Statuses:\nSUCCESS = {SUCCES}, ERROR = {ERROR}\n"
433 f
"DEBUG = {DEBUG}, KIND_KILL = {KIND_KILL}, "
434 f
"NB_REPEATS = {NB_REPEATS}, "
435 f
"TRIGGER = {TRIGGER}, SECONDS = {SECONDS}, "
436 f
"NB_FUNCTIONS = {NB_FUNCTIONS}, "
437 f
"MAIN_THREAD_DELAY = {MAIN_THREAD_DELAY}"
440 print(
"Initialising class BackgroundTasks.")
446 print(
"Class BackgroundTasks initialised.")
448 print(
"Adding tasks to the scheduler.")
449 status = BTI.safe_add_task(
450 func=test_current_date,
451 args=(datetime.now,),
456 print(f
"status {status}")
457 status = BTI.add_task(
464 print(f
"status {status}")
465 status = BTI.safe_add_task(
472 print(f
"status {status}")
473 status = BTI.add_task(
480 print(f
"status {status}")
481 status = BTI.add_task(
482 func=test_current_date,
483 args=(datetime.now,),
488 print(f
"status {status}")
489 print(
"Added tasks to the scheduler.")
491 print(
"Startins scheduler.")
492 print(f
"Status: {BTI.safe_start()}")
493 print(
"Scheduler started.")
494 print(f
"Waiting {MAIN_THREAD_DELAY} on the main thread.")
495 sleep(MAIN_THREAD_DELAY)
496 print(f
"Waited {MAIN_THREAD_DELAY} on the main thread.")
497 print(
"Stopping scheduler.")
498 status = BTI.safe_stop(KIND_KILL)
499 print(f
"Status: {status}")
None __init__(self, int success=0, int error=84, bool debug=False)
Union[int, None] start(self)
int safe_pause(self, bool pause=True)
Union[int, None] stop(self, bool wait=True)
Union[int, Job] safe_add_task(self, Callable func, Union[Tuple, None] args=None, Union[Dict, None] kwargs=None, Union[str, Any] trigger="interval", int seconds=5)
Union[int, None] resume(self)
Union[int, None] pause(self, bool pause=True)
int safe_stop(self, bool wait=True)
Union[Job, None] add_task(self, Callable func, Union[Tuple, None] args=None, Union[Dict, None] kwargs=None, Union[str, Any] trigger="interval", int seconds=5)
dict _to_dict(self, Union[Any, None] data=None)
None test_current_date(*Any args)