Rotary Logger  1.0.2
The middleware rotary logger
Loading...
Searching...
No Matches
file_instance.py
Go to the documentation of this file.
1"""
2# +==== BEGIN rotary_logger =================+
3# LOGO:
4# ..........####...####..........
5# ......###.....#.#########......
6# ....##........#.###########....
7# ...#..........#.############...
8# ...#..........#.#####.######...
9# ..#.....##....#.###..#...####..
10# .#.....#.##...#.##..##########.
11# #.....##########....##...######
12# #.....#...##..#.##..####.######
13# .#...##....##.#.##..###..#####.
14# ..#.##......#.#.####...######..
15# ..#...........#.#############..
16# ..#...........#.#############..
17# ...##.........#.############...
18# ......#.......#.#########......
19# .......#......#.########.......
20# .........#####...#####.........
21# /STOP
22# PROJECT: rotary_logger
23# FILE: file_instance.py
24# CREATION DATE: 30-10-2025
25# LAST Modified: 10:44:15 27-03-2026
26# DESCRIPTION:
27# A module that provides a universal, IOPS-light way of logging your Python program's execution to files.
28# /STOP
29# COPYRIGHT: (c) Asperguide
30# PURPOSE: This is the file containing the class that is in charge of handling writing and rotating files regardless of the number of external processes calling it
31# // AR
32# +==== END rotary_logger =================+
33"""
34
35import sys
36import os
37from datetime import datetime, timezone
38from pathlib import Path
39from typing import Optional, Union, Dict, List
40from threading import RLock
41from warnings import warn
42
43try:
44 from . import constants as CONST
45 from .rogger import Rogger, RI
46except ImportError:
47 import constants as CONST
48 from rogger import Rogger, RI
49
50
52 """Manage buffered writes, file descriptors, and log rotation.
53
54 Public methods are thread-safe and documented below. Writes are
55 appended to an in-memory buffer and flushed to disk when the
56 configured flush size is reached. Rotation is performed when the
57 underlying file grows beyond `max_size`.
58 """
59
61 self,
62 file_path: Optional[Union[str, Path, CONST.FileInfo]],
63 override: Optional[bool] = None,
64 merged: Optional[bool] = None,
65 encoding: Optional[str] = None,
66 prefix: Optional[CONST.Prefix] = None,
67 *,
68 max_size_mb: Optional[int] = None,
69 flush_size_kb: Optional[int] = None,
70 folder_prefix: Optional[CONST.StdMode] = None,
71 log_to_file: bool = True,
72 merge_stdin: Optional[bool] = None,
73 ) -> None:
74 """Create a FileInstance wrapper.
75
76 Arguments:
77 file_path (Optional[Union[str, Path, CONST.FileInfo]]): Initial file path, Path or FileInfo, or None to defer opening.
78
79 Keyword Arguments:
80 override (Optional[bool]): When True open files for write ('w') instead of append ('a'). Default: None
81 merged (Optional[bool]): Whether multiple streams should share the same file. Default: None
82 encoding (Optional[str]): Text encoding to use for file I/O. Default: None
83 prefix (Optional[CONST.Prefix]): Optional Prefix configuration to use when teeing. Default: None
84 max_size_mb (Optional[int]): Maximum logfile size in MB before rotation. Default: None
85 flush_size_kb (Optional[int]): Buffer flush threshold in KB. Default: None
86 folder_prefix (Optional[CONST.StdMode]): StdMode used to segregate per-stream subfolders. Default: None
87 log_to_file (bool): Whether file logging is enabled. Default: True
88 merge_stdin (Optional[bool]): Whether stdin is merged into the shared log file. Default: None
89 """
90
91 # per-instance mutable defaults (avoid sharing across instances)
92 self.rogger: Rogger = RI
93 self._file_lock_file_lock_file_lock: RLock = RLock()
94 self._mode: str = "a"
95 self._log_to_file: bool = log_to_file
96 self.filefile: Optional[CONST.FileInfo] = None
97 self.override: bool = False
98 self.merged: bool = True
99 self.merge_stdin: bool = False
100 self.encoding: str = CONST.DEFAULT_ENCODING
101 self.prefix: Optional[CONST.Prefix] = None
102 self.max_size: int = CONST.DEFAULT_LOG_MAX_FILE_SIZE
103 self.flush_size: int = CONST.BUFFER_FLUSH_SIZE
104 self.folder_prefix: Optional[CONST.StdMode] = None
105
106 self._buffer: List[str] = []
107 if override is not None:
108 self.set_override(override)
109 if merged is not None:
110 self.set_merged(merged)
111 if merge_stdin is not None:
112 self.set_merge_stdin(merge_stdin)
113 if encoding is not None:
114 self.set_encoding(encoding)
115 if prefix is not None:
116 self.set_prefix(prefix)
117 if max_size_mb is not None:
118 self._set_max_size(max_size_mb)
119 if flush_size_kb is not None:
120 self._set_flush_size(flush_size_kb)
121 if folder_prefix is not None:
122 self.set_folder_prefix(folder_prefix)
123 if file_path is not None:
124 self.set_filepath(file_path)
125 try:
126 self.rogger.log_success(
127 f"Initialized FileInstance with file_path={file_path}, override={override}, merged={merged}, encoding={encoding}, prefix={prefix}, max_size_mb={max_size_mb}, flush_size_kb={flush_size_kb}, folder_prefix={folder_prefix}, log_to_file={log_to_file}, merge_stdin={merge_stdin}",
128 stream=CONST.RAW_STDOUT
129 )
130 except (AttributeError, OSError, ValueError):
131 pass
132
133 def __del__(self) -> None:
134 """Best-effort cleanup on object deletion.
135
136 Attempt to close the current file descriptor if it exists and is
137 open. This method must never raise during interpreter shutdown
138 (where `__del__` may be called), so IO-related errors are
139 swallowed. After attempting to close the descriptor the internal
140 `file` reference is cleared.
141 """
142 if self.filefile and self.filefile.descriptor:
143 if not self.filefile.descriptor.closed:
144 try:
145 self._close_file()
146 except OSError:
147 # Swallow errors during destructor cleanup
148 pass
149 self.filefile = None
150
def set_log_to_file(self, log_to_file: bool = True, *, lock: bool = True) -> None:
    """Enable or disable file logging.

    When `log_to_file` is False no data will be written to disk even
    if a file descriptor is open.

    Arguments:
        log_to_file (bool): Whether writes to the log file are enabled. Default: True

    Keyword Arguments:
        lock (bool): When True the instance lock is acquired while updating the flag. Default: True
    """
    if lock:
        # Hold the lock and re-enter through the unlocked path so the
        # update logic is written only once.
        with self._file_lock_file_lock_file_lock:
            self.set_log_to_file(log_to_file, lock=False)
        return
    self._log_to_file = log_to_file
    try:
        self.rogger.log_info(
            f"set_log_to_file -> {log_to_file}",
            stream=CONST.RAW_STDOUT
        )
    except (AttributeError, OSError, ValueError):
        # Diagnostics are best-effort and must not break configuration.
        pass
182
def set_max_size(self, max_size_mb: int, *, lock: bool = True) -> None:
    """Public wrapper to set the maximum logfile size.

    Delegates to `_set_max_size`, which normalises the value and stores
    it in `self.max_size`.

    Arguments:
        max_size_mb (int): Maximum size in megabytes.

    Keyword Arguments:
        lock (bool): When True the instance lock is acquired while updating the configuration. Default: True
    """
    if lock:
        # Hold the lock and re-enter through the unlocked path.
        with self._file_lock_file_lock_file_lock:
            self.set_max_size(max_size_mb, lock=False)
        return
    self._set_max_size(max_size_mb)
    try:
        self.rogger.log_debug(
            f"set_max_size -> {max_size_mb}MB",
            stream=CONST.RAW_STDOUT
        )
    except (AttributeError, OSError, ValueError):
        # Diagnostics are best-effort and must not break configuration.
        pass
214
def set_folder_prefix(self, folder_prefix: Optional[CONST.StdMode], *, lock: bool = True) -> None:
    """Public setter for `folder_prefix`.

    Validation and assignment are performed by `_set_folder_prefix`.

    Arguments:
        folder_prefix (Optional[CONST.StdMode]): Stream mode to use as sub-folder, or None to clear it.

    Keyword Arguments:
        lock (bool): When True the instance lock is held while the value is updated. Default: True
    """
    if lock:
        with self._file_lock_file_lock_file_lock:
            self._set_folder_prefix(folder_prefix)
        return
    self._set_folder_prefix(folder_prefix)
227
def set_flush_size(self, flush_size: int, *, lock: bool = True) -> None:
    """Public wrapper to configure the buffer flush threshold.

    The value is normalised and stored by `_set_flush_size`.

    Arguments:
        flush_size (int): Flush threshold, provided as a KB-like value.

    Keyword Arguments:
        lock (bool): When True the operation is performed while holding the instance lock. Default: True
    """
    if lock:
        # Hold the lock and re-enter through the unlocked path.
        with self._file_lock_file_lock_file_lock:
            self.set_flush_size(flush_size, lock=False)
        return
    self._set_flush_size(flush_size)
    try:
        self.rogger.log_debug(
            f"set_flush_size -> {flush_size}KB",
            stream=CONST.RAW_STDOUT
        )
    except (AttributeError, OSError, ValueError):
        # Diagnostics are best-effort and must not break configuration.
        pass
254
def set_merged(self, merged: bool, *, lock: bool = True) -> None:
    """Enable or disable stream merging.

    Arguments:
        merged (bool): True when multiple streams share a single log file, False for separate per-stream files. Coerced with `bool()`.

    Keyword Arguments:
        lock (bool): When True the instance lock is acquired while updating the flag. Default: True
    """
    if lock:
        # Hold the lock and re-enter through the unlocked path.
        with self._file_lock_file_lock_file_lock:
            self.set_merged(merged, lock=False)
        return
    self.merged = bool(merged)
    try:
        self.rogger.log_info(
            f"set_merged -> {bool(merged)}",
            stream=CONST.RAW_STDOUT
        )
    except (AttributeError, OSError, ValueError):
        # Diagnostics are best-effort and must not break configuration.
        pass
282
def set_merge_stdin(self, merge_stdin: bool, *, lock: bool = True) -> None:
    """Enable or disable stdin merging into the shared log file.

    Arguments:
        merge_stdin (bool): Whether stdin should be merged into the shared log file. Coerced with `bool()`.

    Keyword Arguments:
        lock (bool): When True the instance lock is acquired while updating the flag. Default: True
    """
    if lock:
        # Hold the lock and re-enter through the unlocked path.
        with self._file_lock_file_lock_file_lock:
            self.set_merge_stdin(merge_stdin, lock=False)
        return
    self.merge_stdin = bool(merge_stdin)
    try:
        self.rogger.log_info(
            f"set_merge_stdin -> {bool(merge_stdin)}",
            stream=CONST.RAW_STDOUT
        )
    except (AttributeError, OSError, ValueError):
        # Diagnostics are best-effort and must not break configuration.
        pass
315
def set_encoding(self, encoding: str, *, lock: bool = True) -> None:
    """Set the text encoding used for file I/O.

    The codec name is stored as given; it is not validated here.

    Arguments:
        encoding (str): A codec name such as 'utf-8'.

    Keyword Arguments:
        lock (bool): When True the change is performed while holding the instance lock. Default: True
    """
    if lock:
        # Hold the lock and re-enter through the unlocked path.
        with self._file_lock_file_lock_file_lock:
            self.set_encoding(encoding, lock=False)
        return
    self.encoding = encoding
    try:
        self.rogger.log_info(
            f"set_encoding -> {encoding}",
            stream=CONST.RAW_STDOUT
        )
    except (AttributeError, OSError, ValueError):
        # Diagnostics are best-effort and must not break configuration.
        pass
344
def set_prefix(self, prefix: Optional[CONST.Prefix], *, lock: bool = True) -> None:
    """Public setter for the `Prefix` configuration.

    The values are copied into an internal `CONST.Prefix` object by
    `_set_prefix`; use `get_prefix()` to obtain a copy of the current
    configuration.

    Arguments:
        prefix (Optional[CONST.Prefix]): Prefix flags to copy, or None to clear them.

    Keyword Arguments:
        lock (bool): When True the instance lock is held during the update. Default: True
    """
    if lock:
        # Hold the lock and re-enter through the unlocked path.
        with self._file_lock_file_lock_file_lock:
            self.set_prefix(prefix, lock=False)
        return
    self._set_prefix(prefix)
    try:
        self.rogger.log_debug(
            f"set_prefix -> {prefix}",
            stream=CONST.RAW_STDOUT
        )
    except (AttributeError, OSError, ValueError):
        # Diagnostics are best-effort and must not break configuration.
        pass
372
def set_override(self, override: bool = False, *, lock: bool = True) -> None:
    """Public setter for the file open mode.

    Arguments:
        override (bool): When truthy files are opened with mode 'w' (overwrite); otherwise 'a' (append). Default: False

    Keyword Arguments:
        lock (bool): When True the instance lock is held during the update. Default: True
    """
    if lock:
        # Hold the lock and re-enter through the unlocked path.
        with self._file_lock_file_lock_file_lock:
            self.set_override(override, lock=False)
        return
    self._set_mode("w" if override else "a", lock=False)
    try:
        self.rogger.log_info(
            f"set_override -> {bool(override)}",
            stream=CONST.RAW_STDOUT
        )
    except (AttributeError, OSError, ValueError):
        # Diagnostics are best-effort and must not break configuration.
        pass
403
def set_filepath(self, file_path: Optional[Union[str, Path, CONST.FileInfo]], *, lock: bool = True) -> None:
    """Set or clear the active file for this instance.

    A falsy `file_path` closes the current file and clears the internal
    reference; any other value is handed to `_set_filepath_child`.

    Arguments:
        file_path (Optional[Union[str, Path, CONST.FileInfo]]): A path-like object, a `CONST.FileInfo` instance, or None to clear the current file.

    Keyword Arguments:
        lock (bool): When True the instance lock is held while the change is applied. Default: True
    """
    if lock:
        # Hold the lock and re-enter through the unlocked path.
        with self._file_lock_file_lock_file_lock:
            self.set_filepath(file_path, lock=False)
        return
    if not file_path:
        self._close_file(lock=False)
        self.filefile = None
        message = "set_filepath -> cleared file_path"
    else:
        self._set_filepath_child(file_path)
        message = f"set_filepath -> set to {file_path}"
    try:
        self.rogger.log_info(message, stream=CONST.RAW_STDOUT)
    except (AttributeError, OSError, ValueError):
        # Diagnostics are best-effort and must not break configuration.
        pass
455
def get_log_to_file(self, *, lock: bool = True) -> bool:
    """Return True when file logging is enabled.

    Keyword Arguments:
        lock (bool): When True the instance lock is acquired before reading the flag. Default: True

    Returns:
        True if writes to the log file are enabled, False otherwise.
    """
    if lock:
        with self._file_lock_file_lock_file_lock:
            return self._log_to_file
    return self._log_to_file
469
def get_mode(self, *, lock: bool = True) -> str:
    """Return the current file open mode ('w' or 'a').

    Keyword Arguments:
        lock (bool): When True the instance lock is acquired prior to reading the value. Default: True
    """
    if lock:
        with self._file_lock_file_lock_file_lock:
            return self._mode
    return self._mode
480
def get_merged(self, *, lock: bool = True) -> bool:
    """Return the merged flag (True when streams share a file).

    Keyword Arguments:
        lock (bool): When True the instance lock is held while reading the value. Default: True
    """
    if lock:
        with self._file_lock_file_lock_file_lock:
            return self.merged
    return self.merged
491
def get_merge_stdin(self, *, lock: bool = True) -> bool:
    """Return the merge_stdin flag.

    Keyword Arguments:
        lock (bool): When True the instance lock is held while reading the value. Default: True

    Returns:
        True if stdin is merged into the shared log file, False otherwise.
    """
    if lock:
        with self._file_lock_file_lock_file_lock:
            return self.merge_stdin
    return self.merge_stdin
505
def get_encoding(self, *, lock: bool = True) -> str:
    """Return the configured text encoding for file writes.

    Keyword Arguments:
        lock (bool): When True the instance lock is held while the value is read. Default: True
    """
    if lock:
        with self._file_lock_file_lock_file_lock:
            return self.encoding
    return self.encoding
516
def get_prefix(self, *, lock: bool = True) -> Optional[CONST.Prefix]:
    """Return a copy of the current `Prefix` configuration.

    The returned `CONST.Prefix` is a fresh object carrying the
    `std_in`, `std_out` and `std_err` values, so callers never receive
    the internal reference.

    Keyword Arguments:
        lock (bool): When True the instance lock is held while copying the object. Default: True

    Returns:
        A new `CONST.Prefix`, or None when no prefix is configured.
    """
    if not self.prefix:
        return None
    if lock:
        # Re-enter under the lock; the prefix is re-checked there in
        # case it was cleared between the first check and acquisition.
        with self._file_lock_file_lock_file_lock:
            return self.get_prefix(lock=False)
    snapshot = CONST.Prefix()
    snapshot.std_in = self.prefix.std_in
    snapshot.std_out = self.prefix.std_out
    snapshot.std_err = self.prefix.std_err
    return snapshot
539
def get_override(self, *, lock: bool = True) -> bool:
    """Return True when override mode ('w') is active.

    The mode reported by `get_mode` is lower-cased and mapped to a
    boolean: 'w' is True, 'a' is False.

    Keyword Arguments:
        lock (bool): When True the instance lock is held while the mode is read. Default: True

    Raises:
        ValueError: If the stored mode is neither 'w' nor 'a'.
    """
    if lock:
        with self._file_lock_file_lock_file_lock:
            mode = self.get_mode(lock=False).lower()
    else:
        mode = self.get_mode(lock=False).lower()
    if mode == "w":
        return True
    if mode == "a":
        return False
    self.rogger.log_error(
        "Unsupported mode",
        stream=CONST.RAW_STDERR
    )
    raise ValueError("Unsupported mode")
569
def get_filepath(self, *, lock: bool = True) -> Optional[CONST.FileInfo]:
    """Return the internal `FileInfo` reference (may be None).

    The object is returned as-is, not copied, so it may be shared with
    this instance; callers that mutate it should hold the instance lock.

    Keyword Arguments:
        lock (bool): When True the instance lock is held while the reference is read. Default: True
    """
    if lock:
        with self._file_lock_file_lock_file_lock:
            return self.filefile
    return self.filefile
581
def get_flush_size(self, *, lock: bool = True) -> int:
    """Return the configured buffer flush threshold in bytes.

    Keyword Arguments:
        lock (bool): When True the instance lock is held while the value is read. Default: True
    """
    if lock:
        with self._file_lock_file_lock_file_lock:
            return self.flush_size
    return self.flush_size
588
def get_max_size(self, *, lock: bool = True) -> int:
    """Return the configured maximum file size in bytes.

    Keyword Arguments:
        lock (bool): When True the instance lock is held while the value is read. Default: True
    """
    if lock:
        with self._file_lock_file_lock_file_lock:
            return self.max_size
    return self.max_size
595
def get_folder_prefix(self, *, lock: bool = True) -> Optional[CONST.StdMode]:
    """Return the configured folder prefix (StdMode) or None.

    Keyword Arguments:
        lock (bool): When True the instance lock is held while the value is read. Default: True
    """
    if lock:
        with self._file_lock_file_lock_file_lock:
            return self.folder_prefix
    return self.folder_prefix
602
def update(self, file_data: Optional['FileInstance'], *, lock: bool = True) -> None:
    """Copy configuration from another instance.

    The field assignments are performed by `_update`.

    Arguments:
        file_data (Optional[FileInstance]): Instance to copy from; forwarded unchanged to `_update`.

    Keyword Arguments:
        lock (bool): When True this instance's lock is held during the copy. Default: True
    """
    if lock:
        with self._file_lock_file_lock_file_lock:
            self._update(file_data)
        return
    self._update(file_data)
615
def copy(self, *, lock: bool = True) -> "FileInstance":
    """Return a new instance populated from this one.

    The actual copy is produced by `_copy`.

    Keyword Arguments:
        lock (bool): When True the instance lock is held so the snapshot is consistent. Default: True
    """
    if lock:
        with self._file_lock_file_lock_file_lock:
            return self._copy()
    return self._copy()
627
def write(self, message: str) -> None:
    """Append `message` to the internal buffer and flush when due.

    The append and the threshold check happen under the instance lock.
    When `_should_flush()` reports that the threshold is reached,
    `_flush_buffer()` is called synchronously after the lock has been
    released.

    Arguments:
        message (str): Text to buffer.
    """
    # append under lock, then decide whether to flush
    with self._file_lock_file_lock_file_lock:
        self._buffer.append(message)
        should = self._should_flush()
    try:
        self.rogger.log_debug(
            f"write: appended {len(message)} chars, should_flush={should}",
            stream=CONST.RAW_STDOUT
        )
    except (AttributeError, OSError, ValueError):
        pass
    if not should:
        return
    try:
        self.rogger.log_debug(
            "Flushing buffer",
            stream=CONST.RAW_STDOUT
        )
    except (AttributeError, OSError, ValueError):
        # A failing diagnostic must not prevent the buffer from being flushed.
        pass
    self._flush_buffer()
653
def flush(self):
    """Write all buffered log lines out now.

    Emits a best-effort debug message and then delegates to
    `_flush_buffer()`. Exceptions raised by `_flush_buffer()` are not
    caught here.
    """
    log_debug = getattr(self.rogger, "log_debug", None)
    try:
        if log_debug is None:
            # Same outcome as the failed attribute lookup: skip the log.
            raise AttributeError("log_debug")
        log_debug("flush: manual flush requested", stream=CONST.RAW_STDOUT)
    except (AttributeError, OSError, ValueError):
        pass
    self._flush_buffer()
669
def _set_prefix(self, prefix: Optional[CONST.Prefix]) -> None:
    """Replace the stored `Prefix` with a copy of `prefix`.

    A falsy `prefix` clears the stored value. Otherwise a fresh
    `CONST.Prefix()` is created and the `std_in`, `std_out` and
    `std_err` values are copied onto it. No locking is done here; the
    caller is expected to hold any lock it needs.
    """
    stream = CONST.RAW_STDOUT
    if prefix:
        self.rogger.log_debug("Setting prefixes", stream=stream)
        self.prefix = CONST.Prefix()
        self.prefix.std_in = prefix.std_in
        self.prefix.std_out = prefix.std_out
        self.prefix.std_err = prefix.std_err
        self.rogger.log_debug(f"Prefixes set: {self.prefix}", stream=stream)
    else:
        self.prefix = None
        self.rogger.log_debug("Prefixes set to None", stream=stream)
698
def _set_folder_prefix(self, folder_prefix: Optional[CONST.StdMode]) -> None:
    """Store the per-stream folder prefix, or clear it.

    The value is kept only when it is truthy and present in
    `CONST.CORRECT_FOLDER`; any other value resets `folder_prefix` to
    None.
    """
    accepted = bool(folder_prefix) and folder_prefix in CONST.CORRECT_FOLDER
    if not accepted:
        self.folder_prefix = None
        return
    self.folder_prefix = folder_prefix
    self.rogger.log_debug(
        f"Folder prefix: {self.folder_prefix}",
        stream=CONST.RAW_STDOUT
    )
714
715 def _set_mode(self, mode: str, *, lock: bool = True) -> None:
716 """Set the file mode to 'w' (overwrite) or 'a' (append).
717
718 The `lock` parameter indicates whether this function should
719 acquire `self._file_lock` before updating the internal mode.
720 Invalid input is ignored.
721 """
722 if lock:
724 if mode in ("w", "a"):
725 self._mode = mode
726 return
727 if mode in ("w", "a"):
728 self._mode = mode
729
def _set_max_size(self, max_size_mb: int) -> None:
    """Normalise and store the rotation threshold in `self.max_size`.

    The input is coerced with `int()`; when that fails
    `CONST.DEFAULT_LOG_MAX_FILE_SIZE` is used. Negative values are made
    positive and values below 1 are replaced by the default, each with
    a warning. A result smaller than `CONST.MB1` is multiplied by
    `CONST.MB1`; a larger one is stored unchanged.
    """
    # NOTE(review): values >= CONST.MB1 are stored as-is, which is what
    # lets callers pass an already-normalised size back in; confirm this
    # is the intended contract.
    try:
        size = int(max_size_mb)
    except (ValueError, TypeError):
        size = CONST.DEFAULT_LOG_MAX_FILE_SIZE
    if size < 0:
        warn("Max provided size cannot be negative, converting to positive")
        size = -size
    if size < 1:
        warn("Max provided size is smaller than 1 MB, using default max file size")
        size = CONST.DEFAULT_LOG_MAX_FILE_SIZE
    self.max_size = size * CONST.MB1 if size < CONST.MB1 else size
754
def _set_flush_size(self, flush_size_kb: int) -> None:
    """Normalise and store the flush threshold in `self.flush_size`.

    The input is coerced with `int()`; when that fails
    `CONST.DEFAULT_LOG_BUFFER_FLUSH_SIZE` is used. Negative values are
    made positive and values below 1 are replaced by the default, each
    with a warning. A result smaller than `CONST.KB1` is multiplied by
    `CONST.KB1`; a larger one is stored unchanged.
    """
    try:
        size = int(flush_size_kb)
    except (ValueError, TypeError):
        size = CONST.DEFAULT_LOG_BUFFER_FLUSH_SIZE
    if size < 0:
        warn("Flush size cannot be negative, converting to positive")
        size = -size
    if size < 1:
        warn("Flush size is smaller than 1 KB, using default flush size")
        size = CONST.DEFAULT_LOG_BUFFER_FLUSH_SIZE
    self.flush_size = size * CONST.KB1 if size < CONST.KB1 else size
778
def _set_filepath_child(self, file_path: Union[str, Path, CONST.FileInfo]) -> None:
    """Point this instance at `file_path`, closing the previous file first.

    A str or Path is opened through `_open_file`. A `CONST.FileInfo` is
    adopted as-is and, when it carries a path but no descriptor, is
    re-opened through `_open_file`. Any other type leaves the instance
    without a file.
    """
    self._close_file(lock=False)
    # Start from a clean slate so unsupported inputs leave no stale file.
    self.filefile = None
    if isinstance(file_path, (str, Path)):
        self.filefile = self._open_file(Path(file_path))
        return
    if not isinstance(file_path, CONST.FileInfo):
        return
    self.filefile = file_path
    needs_open = bool(self.filefile) and self.filefile.path and not self.filefile.descriptor
    if needs_open:
        self.filefile = self._open_file(self.filefile.path)
799
800 def _update(self, file_data: Optional['FileInstance']) -> None:
801 """Copy configuration values from another FileInstance.
802
803 This helper updates the receiver to match the provided
804 `file_data`. The method is intended to be called while holding
805 the caller's lock; it delegates to public setters with the
806 `lock=False` flag to avoid deadlocks.
807 """
808 if not file_data:
809 return
810 self.set_filepath(file_data.get_filepath(), lock=False)
811 self.set_override(file_data.get_override(), lock=False)
812 self.set_merged(file_data.get_merged(), lock=False)
813 self.set_encoding(file_data.get_encoding(), lock=False)
814 self.set_prefix(file_data.get_prefix(), lock=False)
815 self.set_max_size(file_data.get_max_size(), lock=False)
816 self.set_flush_size(file_data.get_flush_size(), lock=False)
817 self.set_folder_prefix(file_data.get_folder_prefix(), lock=False)
818 self.set_log_to_file(file_data.get_log_to_file(), lock=False)
819 self.set_merge_stdin(file_data.get_merge_stdin(), lock=False)
820
def _copy(self) -> "FileInstance":
    """Build a new `FileInstance` carrying this instance's configuration.

    The new object receives the same `FileInfo` reference returned by
    `get_filepath()` and the values of every other getter, applied via
    its setters with `lock=False`.
    """
    # NOTE(review): the clone shares the FileInfo object with this
    # instance; confirm that the clone's cleanup cannot close a
    # descriptor that is still in use here.
    clone = FileInstance(None)
    fields = (
        "filepath",
        "override",
        "merged",
        "encoding",
        "prefix",
        "flush_size",
        "max_size",
        "folder_prefix",
        "log_to_file",
        "merge_stdin",
    )
    for field in fields:
        value = getattr(self, f"get_{field}")()
        getattr(clone, f"set_{field}")(value, lock=False)
    return clone
841
842 def _get_current_date(self) -> datetime:
843 """Return the current UTC datetime used for naming files.
844
845 Centralising the time provider makes it easier to test
846 filename generation and ensures all timestamps use UTC.
847 """
848 return datetime.now(timezone.utc)
849
def _get_filename(self) -> str:
    """Build the timestamped log filename.

    Formats the time returned by `_get_current_date()` with
    `CONST.FILE_LOG_DATE_FORMAT` followed by the `.log` extension.
    """
    pattern = f"{CONST.FILE_LOG_DATE_FORMAT}.log"
    moment = self._get_current_date()
    return moment.strftime(pattern)
857
858 def _should_flush(self) -> bool:
859 """Check whether the in-memory buffer has reached the flush threshold.
860
861 Returns:
862 True if the total encoded size of buffered lines meets or exceeds `flush_size`, False otherwise.
863 """
864 _tmp: int = 0
865 for line in self._buffer:
866 _tmp += len(line.encode(self.encoding))
867 return _tmp >= self.flush_size
868
869 def _refresh_written_bytes(self) -> None:
870 """Add the sizes of buffered lines to `file.written_bytes`.
871
872 This method is called after a successful write to update the
873 persisted byte counter. It encodes lines using the configured
874 encoding and falls back to 'utf-8' on lookup errors. The in-
875 memory buffer is cleared after accounting.
876 """
877 if not self.filefile:
878 return
879 for line in self._buffer:
880 try:
881 self.filefile.written_bytes += len(line.encode(self.encoding))
882 except LookupError:
883 self.filefile.written_bytes += len(line.encode('utf-8'))
884 self._buffer.clear()
885
886 def _should_rotate(self) -> bool:
887 """Check whether the current log file has exceeded the maximum size threshold.
888
889 Returns:
890 True if the file's `written_bytes` exceeds `max_size` or no file is open, False otherwise.
891 """
892 if self.filefile:
893 return self.filefile.written_bytes > self.max_size
894 return True
895
896 def _rotate_file(self) -> None:
897 """Rotate the current file if the bytes threshold is exceeded.
898
899 When rotation is needed the current descriptor is closed and a
900 fresh `FileInfo` is opened at a newly-created path returned by
901 `_create_log_path()`.
902 """
903 if self._should_rotate():
904 self._close_file()
905 log_path: Path = self._create_log_path()
906 self.filefile = self._open_file(log_path)
907
def _create_log_path(self, base_override: Optional[Path] = None) -> Path:
    """Build the timestamped log file path and create the parent directories.

    The path is organised as `<root>/<year>/<month>/<day>/[<stream>/]<timestamp>.log`,
    where `<root>` ends with `CONST.LOG_FOLDER_BASE_NAME`. The root is
    `base_override` when given, otherwise the instance's current file
    path, otherwise the directory containing this module. When the
    current file path already ends in `.log` it is returned unchanged.

    Keyword Arguments:
        base_override (Optional[Path]): Override root directory for the log path. Default: None

    Returns:
        The resolved Path for the new log file.
    """
    now = self._get_current_date()
    if base_override is not None:
        _root: Path = Path(base_override)
    else:
        _root = Path(__file__).parent
        current = self.filefile.path if self.filefile else None
        if isinstance(current, Path) and current.suffix == '.log':
            # An explicit log file was configured: keep writing to it.
            current.parent.mkdir(parents=True, exist_ok=True)
            return current
        if current:
            # Wrap so a path stored as plain text still supports `.suffix`.
            _root = Path(current)

    # Insert the log folder unless the root already sits in it. The
    # comparison uses directory *names*; comparing the name against a
    # Path object is always unequal and would nest the folder twice.
    if _root.suffix == "":
        if _root.name != CONST.LOG_FOLDER_BASE_NAME:
            _root = _root / CONST.LOG_FOLDER_BASE_NAME
    elif _root.parent.name != CONST.LOG_FOLDER_BASE_NAME:
        _root = _root.parent / CONST.LOG_FOLDER_BASE_NAME / _root.name

    day_dir = _root / str(now.year) / f"{now.month:02d}" / f"{now.day:02d}"
    if self.folder_prefix and self.folder_prefix in CONST.CORRECT_FOLDER:
        day_dir = day_dir / CONST.CORRECT_FOLDER[self.folder_prefix]

    # Snapshot the _log_to_file flag under lock, then perform mkdir
    # outside the lock to avoid blocking other callers on filesystem I/O.
    with self._file_lock_file_lock_file_lock:
        _should_create = bool(self._log_to_file)

    if _should_create:
        day_dir.mkdir(parents=True, exist_ok=True)
    filename = self._get_filename()
    self.rogger.log_debug(
        f"Determined file name: {filename}, determined file path: {day_dir}",
        stream=CONST.RAW_STDOUT
    )
    return day_dir / filename
968
969 def _looks_like_directory(self, dir_path: Path) -> bool:
970 """Heuristic check to decide whether a path refers to a directory.
971
972 Returns True when the path has no file extension, already exists as a
973 directory, or ends with a path separator. Used to determine whether a
974 caller-supplied path should be treated as a folder (triggering
975 automatic log-file naming) or as an explicit file path.
976
977 Arguments:
978 dir_path (Path): Path to evaluate.
979
980 Returns:
981 True if the path appears to represent a directory, False otherwise.
982 """
983 if not str(dir_path):
984 return False
985 if dir_path.suffix == "":
986 return True
987 # If it already exists and is a directory, fine
988 if dir_path.exists():
989 return dir_path.is_dir()
990
991 # If the parent exists and is writable, assume it *could* be a directory
992 str_path: str = str(dir_path)
993 looks_like_it = str_path.endswith(os.sep) or str_path.endswith(
994 "/") or str_path.endswith("\\")
995 self.rogger.log_debug(
996 f"Looks_like_it: {looks_like_it}",
997 stream=CONST.RAW_STDOUT
998 )
999 return looks_like_it
1000
def _open_file(self, file_path: Path) -> CONST.FileInfo:
    """Open or create the log file at `file_path` and return a populated `FileInfo`.

    If `file_path` looks like a directory (as determined by
    `_looks_like_directory`) the final file path is generated via
    `_create_log_path(base_override=...)`. The parent directory of the
    resolved path is created if it does not exist. The descriptor is only
    opened when `self._log_to_file` is truthy, and the `open()` call is
    made outside the instance lock.

    `written_bytes` is the on-disk size of the *resolved* log file (not of
    the `file_path` argument, which may be a directory), or 0 when that
    file does not exist.

    Arguments:
        file_path (Path): Destination path for the log file, or a directory.

    Returns:
        A `CONST.FileInfo` with `path`, `descriptor`, and `written_bytes`
        populated. `descriptor` is None when file logging is disabled or
        when `open()` failed.

    Raises:
        OSError: If the parent directory cannot be created.
    """
    _node: CONST.FileInfo = CONST.FileInfo()
    _node.path = Path(file_path)
    if self._looks_like_directory(_node.path):
        _node.path = self._create_log_path(base_override=_node.path)
    # Ensure parent directory exists before attempting to open the file.
    _node.path.parent.mkdir(parents=True, exist_ok=True)

    # Snapshot whether we should open the descriptor under lock, then
    # perform the potentially blocking open() outside the lock.
    with self._file_lock:
        should_open = bool(self._log_to_file)

    descriptor = None
    if should_open:
        self.rogger.log_debug(
            "File is not open, attempting to open",
            stream=CONST.RAW_STDOUT
        )
        try:
            descriptor = open(
                _node.path,
                self._mode,
                encoding=self.encoding,
                newline="\n"
            )
        except (OSError, ValueError):
            self.rogger.log_error(
                f"Failed to open file: {_node.path}",
                stream=CONST.RAW_STDERR
            )
            # Opening failed; keep descriptor as None. Caller will handle.
            descriptor = None

    # Measure the resolved file, not the caller-supplied argument: when a
    # directory was passed in, its own st_size is unrelated to the log
    # file. A single stat() call also avoids the exists()/stat() race.
    try:
        existing_size = _node.path.stat().st_size
    except OSError:
        existing_size = 0

    # Assign descriptor and size together under the lock.
    with self._file_lock:
        _node.descriptor = descriptor
        _node.written_bytes = existing_size
    self.rogger.log_debug(
        f"Written bytes: {_node.written_bytes}",
        stream=CONST.RAW_STDOUT
    )
    return _node
1061
1062 def _close_file_inner(self) -> bool:
1063 """Close the underlying file descriptor without acquiring locks.
1064
1065 This inner helper is used by `_close_file()`; it performs the
1066 actual descriptor close and clears the reference. It is safe to
1067 call when already closed. Errors during close are suppressed
1068 because this is a best-effort operation.
1069 """
1070 if self.filefile:
1071 self.rogger.log_debug(
1072 "File is open, closing",
1073 stream=CONST.RAW_STDOUT
1074 )
1075 descriptor = getattr(self.filefile, "descriptor", None)
1076 if descriptor:
1077 try:
1078 descriptor.close()
1079 except (OSError, ValueError):
1080 # ignore errors closing the descriptor (IO and value errors)
1081 pass
1082 # clear descriptor reference so subsequent checks see it's closed
1083 try:
1084 self.filefile.descriptor = None
1085 except AttributeError:
1086 # unlikely, but guard against missing attribute
1087 pass
1088 return True
1089
1090 def _close_file(self, *, lock: Optional[bool] = True) -> bool:
1091 """Close the current file descriptor, optionally acquiring a lock.
1092
1093 When `lock` is True the instance lock `self._file_lock` is
1094 acquired before closing the file. Returns True on completion.
1095 """
1096 # Snapshot and clear the descriptor under lock, then perform the
1097 # actual close outside the lock to avoid blocking other callers.
1098 descriptor = None
1099 if lock:
1101 if self.filefile:
1102 descriptor = getattr(self.filefile, "descriptor", None)
1103 try:
1104 self.filefile.descriptor = None
1105 except AttributeError:
1106 pass
1107 else:
1108 if self.filefile:
1109 descriptor = getattr(self.filefile, "descriptor", None)
1110
1111 if descriptor:
1112 try:
1113 descriptor.close()
1114 except (OSError, ValueError):
1115 # ignore close errors
1116 pass
1117 return True
1118
def _flush_buffer(self) -> None:
    """Internal: detach the pending buffer and write it to disk.

    The in-memory buffer is captured and cleared while holding
    `self._file_lock`. Opening the file (when no usable descriptor is
    present) and writing happen outside the lock. Finally the byte
    counter is updated and `self._rotate_file()` is called under the lock.

    If the write raises `ValueError` or `OSError`, the log file is
    reopened once and the write is retried. Lines that cannot be written
    are dropped: the buffer has already been detached and is not
    re-queued.

    `written_bytes` only advances when the lines actually reached a
    descriptor, so a failed or skipped write does not inflate the counter.
    """
    def _best_effort_log(log_fn_name: str, message: str, stream) -> None:
        # Diagnostics must never abort a flush.
        try:
            getattr(self.rogger, log_fn_name)(message, stream=stream)
        except (AttributeError, OSError, ValueError):
            pass

    def _usable_descriptor():
        # Return the current descriptor if present and not closed, else None.
        if not self.filefile:
            return None
        candidate = getattr(self.filefile, "descriptor", None)
        if candidate and not getattr(candidate, "closed", False):
            return candidate
        return None

    def _write_pending() -> bool:
        # Write the detached lines; True only if they reached a descriptor.
        target = _usable_descriptor()
        if target is None:
            return False
        target.writelines(pending)
        target.flush()
        return True

    def _encoded_size(line: str) -> int:
        # Size of one line in the configured encoding (utf-8 if unknown).
        try:
            return len(line.encode(self.encoding))
        except LookupError:
            return len(line.encode('utf-8'))

    self.rogger.log_debug("Flushing buffer", stream=CONST.RAW_STDOUT)
    with self._file_lock:
        if not self._buffer:
            return
        pending = self._buffer
        self._buffer = []

        if not self._log_to_file:
            return

        # Only decide here whether an open is required; the open itself
        # is done after the lock is released.
        needs_open = _usable_descriptor() is None

    # If necessary, open the file outside the lock.
    if needs_open:
        _best_effort_log(
            "log_debug",
            f"_flush_buffer: needs_open=True, to_write_lines={len(pending)}",
            CONST.RAW_STDOUT
        )
        try:
            self.filefile = self._open_file(self._create_log_path())
        except (OSError, ValueError):
            # Cannot open the file: give up on this batch.
            return

    _best_effort_log(
        "log_debug",
        f"_flush_buffer: performing write, lines={len(pending)}",
        CONST.RAW_STDOUT
    )
    # Perform the actual write outside the lock.
    try:
        written = _write_pending()
    except (ValueError, OSError):
        _best_effort_log(
            "log_warning",
            "_flush_buffer: write failed, attempting reopen",
            CONST.RAW_STDERR
        )
        # Try reopening and write again once.
        try:
            self.filefile = self._open_file(self._create_log_path())
        except (OSError, ValueError):
            return
        try:
            written = _write_pending()
        except (ValueError, OSError):
            written = False
            _best_effort_log(
                "log_error",
                "_flush_buffer: retry write failed, giving up on this flush",
                CONST.RAW_STDERR
            )

    # Count bytes once, and only for data that was really written.
    written_size = 0
    if written:
        written_size = sum(_encoded_size(line) for line in pending)
        _best_effort_log(
            "log_debug",
            f"_flush_buffer: write successful, approx_bytes={written_size}",
            CONST.RAW_STDOUT
        )

    # Update counters and rotate under lock.
    with self._file_lock:
        if not self.filefile:
            return
        self.filefile.written_bytes += written_size
        self._rotate_file()
int get_max_size(self, *, bool lock=True)
bool get_merge_stdin(self, *, bool lock=True)
Optional[CONST.Prefix] get_prefix(self, *, bool lock=True)
bool _close_file(self, *, Optional[bool] lock=True)
None set_folder_prefix(self, Optional[CONST.StdMode] folder_prefix, *, bool lock=True)
None set_encoding(self, str encoding, *, bool lock=True)
None _set_flush_size(self, int flush_size_kb)
str get_mode(self, *, bool lock=True)
None set_filepath(self, Optional[Union[str, Path, CONST.FileInfo]] file_path, *, bool lock=True)
None set_prefix(self, Optional[CONST.Prefix] prefix, *, bool lock=True)
Optional[CONST.FileInfo] get_filepath(self, *, bool lock=True)
None _set_mode(self, str mode, *, bool lock=True)
bool get_override(self, *, bool lock=True)
None set_override(self, bool override=False, *, bool lock=True)
None set_merge_stdin(self, bool merge_stdin, *, bool lock=True)
None set_max_size(self, int max_size_mb, *, bool lock=True)
"FileInstance" copy(self, *, bool lock=True)
None __init__(self, Optional[Union[str, Path, CONST.FileInfo]] file_path, Optional[bool] override=None, Optional[bool] merged=None, Optional[str] encoding=None, Optional[CONST.Prefix] prefix=None, *, Optional[int] max_size_mb=None, Optional[int] flush_size_kb=None, Optional[CONST.StdMode] folder_prefix=None, bool log_to_file=True, Optional[bool] merge_stdin=None)
None _set_folder_prefix(self, Optional[CONST.StdMode] folder_prefix)
None _set_prefix(self, Optional[CONST.Prefix] prefix)
None _update(self, Optional['FileInstance'] file_data)
CONST.FileInfo _open_file(self, Path file_path)
bool get_log_to_file(self, *, bool lock=True)
None update(self, Optional['FileInstance'] file_data, *, bool lock=True)
None set_log_to_file(self, bool log_to_file=True, *, bool lock=True)
str get_encoding(self, *, bool lock=True)
None _set_max_size(self, int max_size_mb)
bool get_merged(self, *, bool lock=True)
None _set_filepath_child(self, Union[str, Path, CONST.FileInfo] file_path)
None set_flush_size(self, int flush_size, *, bool lock=True)
None set_merged(self, bool merged, *, bool lock=True)
bool _looks_like_directory(self, Path dir_path)
Optional[CONST.StdMode] get_folder_prefix(self, *, bool lock=True)
Path _create_log_path(self, Optional[Path] base_override=None)
int get_flush_size(self, *, bool lock=True)