Rotary Logger  1.0.2
The middleware rotary logger
Loading...
Searching...
No Matches
tee_stream.py
Go to the documentation of this file.
1"""
2# +==== BEGIN rotary_logger =================+
3# LOGO:
4# ..........####...####..........
5# ......###.....#.#########......
6# ....##........#.###########....
7# ...#..........#.############...
8# ...#..........#.#####.######...
9# ..#.....##....#.###..#...####..
10# .#.....#.##...#.##..##########.
11# #.....##########....##...######
12# #.....#...##..#.##..####.######
13# .#...##....##.#.##..###..#####.
14# ..#.##......#.#.####...######..
15# ..#...........#.#############..
16# ..#...........#.#############..
17# ...##.........#.############...
18# ......#.......#.#########......
19# .......#......#.########.......
20# .........#####...#####.........
21# /STOP
22# PROJECT: rotary_logger
23# FILE: tee_stream.py
24# CREATION DATE: 29-10-2025
25# LAST Modified: 10:46:20 27-03-2026
26# DESCRIPTION:
27# A module that provides a universal python light on iops way of logging to files your program execution.
28# /STOP
29# COPYRIGHT: (c) Asperguide
30# PURPOSE: The file containing the code that is actually doing the heavy lifting, this is the file that takes a stream, logs it to a file while outputting it to the terminal.
31# // AR
32# +==== END rotary_logger =================+
33"""
34
35import sys
36from pathlib import Path
37from typing import TextIO, Optional, Union, BinaryIO, List, Any
38from threading import RLock
39
40try:
41 from . import constants as CONST
42 from .file_instance import FileInstance
43 from .rogger import Rogger, RI
44except ImportError:
45 import constants as CONST
46 from file_instance import FileInstance
47 from rogger import Rogger, RI
48
49
51 """Mirror a TextIO stream to disk while preserving normal output.
52
53 This class is intentionally lightweight: it captures short-lived
54 references under a tiny lock and then performs I/O without holding
55 that lock. Terminal writes are performed on the caller thread and
56 are wrapped to avoid raising unexpected errors back into the
57 application; disk writes are delegated to a `FileInstance` which
58 buffers and handles rotation.
59 """
60
62 self,
63 root: Union[str, Path, FileInstance],
64 original_stream: TextIO,
65 *,
66 max_size_mb: Optional[int] = None,
67 flush_size: Optional[int] = None,
68 mode: CONST.StdMode = CONST.StdMode.STDUNKNOWN,
69 error_mode: CONST.ErrorMode = CONST.ErrorMode.WARN_NO_PIPE,
70 encoding: Optional[str] = None,
71 log_to_file: bool = True,
72 log_function_calls: bool = False
73 ):
74 """Initialise a new TeeStream.
75
76 Mirrors original_stream to disk via a FileInstance while forwarding
77 every write transparently to the caller.
78
79 Arguments:
80 root (Union[str, Path, FileInstance]): Path, string, or FileInstance describing the log destination.
81 original_stream (TextIO): The TextIO stream to mirror (usually sys.stdout).
82
83 Keyword Arguments:
84 max_size_mb (Optional[int]): Optional maximum log-file size in MB; forwarded to FileInstance. Default: None
85 flush_size (Optional[int]): Optional buffer-flush threshold in bytes; forwarded to FileInstance. Default: None
86 mode (CONST.StdMode): Which standard stream this instance wraps. Default: CONST.StdMode.STDUNKNOWN
87 error_mode (CONST.ErrorMode): Broken-pipe handling policy. Default: CONST.ErrorMode.WARN_NO_PIPE
88 encoding (Optional[str]): Optional file-encoding override; forwarded to FileInstance. Default: None
89 log_to_file (bool): Whether disk logging is enabled on construction. Default: True
90
91 Raises:
92 ValueError: If root is not a str, Path, or FileInstance.
93 """
94
95 self._file_lock: RLock = RLock()
96 if isinstance(root, (Path, str)):
97 self.file_instance = FileInstance(Path(root))
98 elif isinstance(root, FileInstance):
99 self.file_instance = root
100 else:
101 raise ValueError(
102 f"{CONST.MODULE_NAME} Unsupported type for the file {type(root)}"
103 )
104 if max_size_mb:
105 self.file_instance.set_max_size(max_size_mb)
106 if flush_size:
107 self.file_instance.set_flush_size(flush_size)
108 if encoding:
109 self.file_instance.set_encoding(encoding)
110 self.file_instance.set_log_to_file(log_to_file)
111 self.stream_mode: CONST.StdMode = mode
112 self.original_stream: TextIO = original_stream
113 self.error_mode: CONST.ErrorMode = error_mode
114 self.stream_not_present: AttributeError = AttributeError(
115 f"{CONST.MODULE_NAME} No stream available"
116 )
117 self.function_callsfunction_calls = log_function_calls
118 self.rogger: Rogger = RI
119 # Log TeeStream creation
120 try:
121 self.rogger.log_debug(
122 f"TeeStream initialized (mode={self.stream_mode}, log_to_file={log_to_file})",
123 stream=CONST.RAW_STDOUT
124 )
125 except (AttributeError, OSError, ValueError):
126 # Never allow logging to break stream setup
127 pass
128
129 def __del__(self):
130 """Best-effort cleanup on object deletion.
131
132 Attempts to flush buffered data but never raises. Note that __del__
133 may not be called at interpreter shutdown; callers that need
134 deterministic flushing should invoke flush() explicitly before
135 releasing the object. OSError and ValueError are intentionally
136 swallowed so that interpreter-shutdown teardown cannot trigger
137 unexpected tracebacks.
138 """
139 try:
140 self.flush()
141 except (OSError, ValueError):
142 # best-effort flush; ignore expected I/O errors during interpreter shutdown
143 pass
144 # avoid deleting attributes in __del__; simply drop the reference
145 self.file_instance = None
146
147 def _get_correct_prefix(self, function_call: CONST.PrefixFunctionCall = CONST.PrefixFunctionCall.EMPTY) -> str:
148 """Return the correct prefix string for the configured StdMode.
149
150 The returned string already contains a trailing space when non-empty so
151 that callers can concatenate it with the message directly. Returns an
152 empty string when logging to file is disabled, when no FileInstance is
153 set, or when the Prefix configuration has no flags enabled.
154
155 Returns:
156 The prefix string (with trailing space) matching the active StdMode,
157 or an empty string.
158 """
159 # ensure we have a file instance and a valid StdMode enum value
160 _file_inst: Optional[FileInstance] = None
161 _prefix: Optional[CONST.Prefix] = None
162 _mode: Optional[CONST.StdMode] = None
163 # Use object.__getattribute__ to safely access lock and prevent recursion
164 try:
165 file_lock = object.__getattribute__(self, '_file_lock')
166 file_inst = object.__getattribute__(self, 'file_instance')
167 stream_mode = object.__getattribute__(self, 'stream_mode')
168 except AttributeError:
169 return ""
170
171 with file_lock:
172 if not file_inst:
173 return ""
174 _file_inst = file_inst
175 if not isinstance(stream_mode, CONST.StdMode):
176 return ""
177 _mode = stream_mode
178
179 if _file_inst is None:
180 return ""
181
182 # Fast-path: if logging to file is disabled, skip prefix work.
183 try:
184 if not _file_inst.get_log_to_file():
185 return ""
186 except (OSError, ValueError, AttributeError):
187 # Defensive: don't allow file-side errors to break stdout/stderr
188 return ""
189
190 try:
191 _prefix = _file_inst.get_prefix()
192 except (OSError, ValueError, AttributeError):
193 # Defensive: if FileInstance misbehaves, return no prefix
194 return ""
195
196 _final_prefix: str = ""
197 if not _prefix:
198 _final_prefix = ""
199 elif _prefix.std_err and _mode == CONST.StdMode.STDERR:
200 _final_prefix = CONST.CORRECT_PREFIX[CONST.StdMode.STDERR]
201 elif _prefix.std_in and _mode == CONST.StdMode.STDIN:
202 _final_prefix = CONST.CORRECT_PREFIX[CONST.StdMode.STDIN]
203 elif _prefix.std_out and _mode == CONST.StdMode.STDOUT:
204 _final_prefix = CONST.CORRECT_PREFIX[CONST.StdMode.STDOUT]
205 elif _prefix.std_in or _prefix.std_err or _prefix.std_out:
206 _final_prefix = CONST.CORRECT_PREFIX[CONST.StdMode.STDUNKNOWN]
207 else:
208 _final_prefix = ""
209 if self.function_callsfunction_calls and function_call != CONST.PrefixFunctionCall.EMPTY:
210 _final_prefix = f"{_final_prefix}{function_call.value}"
211 if _final_prefix != "":
212 _final_prefix = f"{_final_prefix}{CONST.SPACE}"
213 return _final_prefix
214
215 def _write_to_log(self, data: Union[str, List[str]], function_call: CONST.PrefixFunctionCall) -> None:
216 """Write data to the log file if file logging is enabled.
217
218 Shared helper used by write(), writelines(), read(), readline() and
219 readlines() to avoid duplicating the snapshot-check-prefix-write
220 sequence in every method.
221
222 Arguments:
223 data (str): The string to append to the log file.
224 function_call (CONST.PrefixFunctionCall): Context passed to
225 _get_correct_prefix() to select the right prefix.
226 """
227 _file_instance: Optional[FileInstance] = None
228 # Use object.__getattribute__ to safely access internal state and prevent recursion
229 try:
230 file_lock = object.__getattribute__(self, '_file_lock')
231 file_inst = object.__getattribute__(self, 'file_instance')
232 except AttributeError:
233 return
234
235 with file_lock:
236 if file_inst:
237 _file_instance = file_inst
238 if not _file_instance:
239 return
240 try:
241 if not _file_instance.get_log_to_file():
242 return
243 except (OSError, ValueError, AttributeError):
244 return
245 try:
246 _prefix: str = self._get_correct_prefix(
247 function_call=function_call)
248 except (OSError, ValueError, AttributeError):
249 _prefix = ""
250 try:
251 if isinstance(data, list):
252 for i in data:
253 _file_instance.write(f"{_prefix}{i}")
254 else:
255 _file_instance.write(f"{_prefix}{data}")
256 except (OSError, ValueError):
257 try:
258 err_msg = f"{CONST.MODULE_NAME} Error writing to log file"
259 self.rogger.log_error(err_msg, stream=CONST.RAW_STDERR)
260 sys.stderr.write(f"{err_msg}\n")
261 except OSError:
262 pass
263
264 def _get_stream_if_present(self) -> TextIO:
265 """Return the underlying stream, raising if it has been cleared.
266
267 Used as a guard by every delegating method so that operations on an
268 uninitialised or already-destroyed TeeStream fail predictably rather
269 than with an obscure AttributeError.
270
271 Raises:
272 AttributeError: If original_stream is None.
273
274 Returns:
275 The original TextIO stream passed at construction time.
276 """
277 # Use object.__getattribute__ to bypass __getattr__ and avoid
278 # infinite recursion when __getattr__ itself calls this method
279 stream = object.__getattribute__(self, 'original_stream')
280 if stream is not None:
281 return stream
282 raise object.__getattribute__(self, 'stream_not_present')
283
284 def write(self, message: str) -> None:
285 """Write message to the original stream and buffer it to the log file.
286
287 Thread-safe: the FileInstance reference is captured under a lock before
288 any I/O is performed. The terminal write is carried out on the caller
289 thread with explicit BrokenPipeError / OSError handling; disk writes
290 are delegated to FileInstance.write().
291
292 Arguments:
293 message (str): The string to write.
294 """
295 _tmp_message: str = message
296
297 try:
298 # Always attempt to write to the original stream
299 self.original_stream.write(_tmp_message)
300 except BrokenPipeError:
301 if self.error_mode in (CONST.ErrorMode.EXIT, CONST.ErrorMode.EXIT_NO_PIPE):
302 sys.exit(CONST.ERROR)
303 elif self.error_mode in (CONST.ErrorMode.WARN, CONST.ErrorMode.WARN_NO_PIPE):
304 try:
305 self.rogger.log_error(
306 CONST.BROKEN_PIPE_ERROR,
307 stream=CONST.RAW_STDERR
308 )
309 sys.stderr.write(f"{CONST.BROKEN_PIPE_ERROR}\n")
310 except OSError:
311 pass
312 except OSError as exc:
313 # Unexpected I/O error writing to original stream: report and continue
314 try:
315 err_msg = f"{CONST.MODULE_NAME} I/O error writing to original stream: {exc}"
316 self.rogger.log_error(err_msg, stream=CONST.RAW_STDERR)
317 sys.stderr.write(f"{err_msg}\n")
318 except OSError:
319 # swallow any errors writing to stderr during shutdown
320 pass
321
322 self._write_to_log(_tmp_message, CONST.PrefixFunctionCall.WRITE)
323 try:
324 # Debug log about the write operation (non-intrusive)
325 self.rogger.log_debug(
326 f"write: forwarded {len(_tmp_message)} chars to original stream (mode={self.stream_mode})",
327 stream=CONST.RAW_STDOUT
328 )
329 except (AttributeError, OSError, ValueError):
330 pass
331
332 def writelines(self, lines: List[str]) -> None:
333 """Write a list of strings to the original stream and buffer them to the log file.
334
335 Thread-safe: behaves like write() but accepts a sequence of strings,
336 forwarding the entire sequence to the original stream via writelines()
337 and then writing each element individually to FileInstance with the
338 appropriate prefix.
339
340 Arguments:
341 lines (List[str]): The sequence of strings to write.
342 """
343 _tmp_message: List[str] = lines.copy()
344 try:
345 # Always attempt to write to the original stream
346 self.original_stream.writelines(_tmp_message)
347 except BrokenPipeError:
348 if self.error_mode in (CONST.ErrorMode.EXIT, CONST.ErrorMode.EXIT_NO_PIPE):
349 sys.exit(CONST.ERROR)
350 elif self.error_mode in (CONST.ErrorMode.WARN, CONST.ErrorMode.WARN_NO_PIPE):
351 try:
352 self.rogger.log_error(
353 CONST.BROKEN_PIPE_ERROR,
354 stream=CONST.RAW_STDERR
355 )
356 sys.stderr.write(f"{CONST.BROKEN_PIPE_ERROR}\n")
357 except OSError:
358 pass
359 except OSError as exc:
360 # Unexpected I/O error writing to original stream: report and continue
361 try:
362 err_msg = f"{CONST.MODULE_NAME} I/O error writing to original stream: {exc}"
363 self.rogger.log_error(err_msg, stream=CONST.RAW_STDERR)
364 sys.stderr.writelines(f"{err_msg}\n")
365 except OSError:
366 # swallow any errors writing to stderr during shutdown
367 pass
368 self._write_to_log(_tmp_message, CONST.PrefixFunctionCall.WRITELINES)
369 try:
370 total = 0
371 for l in _tmp_message:
372 total += len(l)
373 self.rogger.log_debug(
374 f"writelines: forwarded {total} chars across {len(_tmp_message)} items (mode={self.stream_mode})",
375 stream=CONST.RAW_STDOUT
376 )
377 except (AttributeError, OSError, ValueError):
378 pass
379
380 def read(self, size: int = -1) -> str:
381 """Read and return up to size characters from the original stream.
382
383 Keyword Arguments:
384 size (int): Maximum number of characters to read; -1 reads until EOF. Default: -1
385
386 Raises:
387 AttributeError: If the original stream is not set.
388
389 Returns:
390 The characters read.
391 """
392
393 # Always attempt to read from the original stream first so that we don't lose data if the stream is interactive and the file instance is misconfigured
394 data = self._get_stream_if_present().read(size)
395 self._write_to_log(data, CONST.PrefixFunctionCall.READ)
396 try:
397 self.rogger.log_debug(
398 f"read: read {len(data)} chars from original stream (mode={self.stream_mode})",
399 stream=CONST.RAW_STDOUT
400 )
401 except (AttributeError, OSError, ValueError):
402 pass
403 return data
404
405 def readline(self, size: int = -1) -> str:
406 """Read and return one line from the original stream.
407
408 Keyword Arguments:
409 size (int): If non-negative, at most size characters are read; -1 reads until a newline or EOF. Default: -1
410
411 Raises:
412 AttributeError: If the original stream is not set.
413
414 Returns:
415 The line read, including the trailing newline if present.
416 """
417 data = self._get_stream_if_present().readline(size)
418 self._write_to_log(data, CONST.PrefixFunctionCall.READLINE)
419 try:
420 self.rogger.log_debug(
421 f"readline: read {len(data)} chars from original stream (mode={self.stream_mode})",
422 stream=CONST.RAW_STDOUT
423 )
424 except (AttributeError, OSError, ValueError):
425 pass
426 return data
427
428 def readlines(self, hint: int = -1) -> list[str]:
429 """Read and return a list of lines from the original stream.
430
431 Keyword Arguments:
432 hint (int): If non-negative, approximately hint bytes or characters are read; -1 reads until EOF. Default: -1
433
434 Raises:
435 AttributeError: If the original stream is not set.
436
437 Returns:
438 List of lines read from the stream.
439 """
440 data = self._get_stream_if_present().readlines(hint)
441 self._write_to_log(data, CONST.PrefixFunctionCall.READLINES)
442 try:
443 total = 0
444 for d in data:
445 total += len(d)
446 self.rogger.log_debug(
447 f"readlines: read {len(data)} lines, {total} chars (mode={self.stream_mode})",
448 stream=CONST.RAW_STDOUT
449 )
450 except (AttributeError, OSError, ValueError):
451 pass
452 return data
453
454 def flush(self) -> None:
455 """Best-effort flush of terminal and buffered file output.
456
457 Attempts to flush both the original stream and the associated
458 FileInstance. All OSError and ValueError exceptions are swallowed to
459 avoid crashing the caller. For stricter guarantees call
460 FileInstance.flush() directly. This method is also called by __del__
461 during object teardown.
462 """
463
464 # Use object.__getattribute__ to safely access state and prevent recursion
465 original_stream = None
466 try:
467 original_stream = object.__getattribute__(self, 'original_stream')
468 rogger = object.__getattribute__(self, 'rogger')
469 stream_mode = object.__getattribute__(self, 'stream_mode')
470 except AttributeError:
471 return
472
473 # Flush the original stream first (best-effort)
474 try:
475 rogger.log_debug(
476 f"Flushing TeeStream (mode={stream_mode})",
477 stream=CONST.RAW_STDOUT
478 )
479 except (AttributeError, OSError, ValueError):
480 pass
481 if original_stream and not original_stream.closed:
482 try:
483 original_stream.flush()
484 except OSError as exc:
485 try:
486 err_msg = f"{CONST.MODULE_NAME} I/O error flushing original stream: {exc}"
487 rogger.log_error(err_msg, stream=CONST.RAW_STDERR)
488 sys.stderr.write(f"{err_msg}")
489 except OSError:
490 pass
491
492 _file_instance: Optional[FileInstance] = None
493 # Snapshot the file instance under lock
494 try:
495 file_lock = object.__getattribute__(self, '_file_lock')
496 file_inst = object.__getattribute__(self, 'file_instance')
497 except AttributeError:
498 return
499
500 with file_lock:
501 if file_inst:
502 _file_instance = file_inst
503
504 if _file_instance:
505 try:
506 if not _file_instance.get_log_to_file():
507 return
508 except (OSError, ValueError, AttributeError):
509 # If the check fails, continue and attempt a flush anyway.
510 pass
511
512 try:
514 try:
515 _prefix: str = self._get_correct_prefix(
516 function_call=CONST.PrefixFunctionCall.FLUSH
517 )
518 except (OSError, ValueError, AttributeError):
519 _prefix: str = ""
520 _file_instance.write(_prefix)
521 _file_instance.flush()
522 try:
523 self.rogger.log_debug(
524 f"Flushed file_instance for mode={self.stream_mode}",
525 stream=CONST.RAW_STDOUT
526 )
527 except (AttributeError, OSError, ValueError):
528 pass
529 except (OSError, ValueError):
530 try:
531 self.rogger.log_warning(
532 f"TeeStream flush encountered I/O error for mode={self.stream_mode}",
533 stream=CONST.RAW_STDERR
534 )
535 except (AttributeError, OSError, ValueError):
536 pass
537 # don't let file flush failures propagate from a best-effort flush
538
539 # sys.TextIO rebinds so the redirection is transparent to the caller
540
541 @property
542 def buffer(self) -> BinaryIO:
543 """Return the underlying binary buffer of the original stream.
544
545 Raises:
546 AttributeError: If the original stream is not set.
547
548 Returns:
549 The binary buffer exposed by the wrapped TextIO.
550 """
551 data = self._get_stream_if_present().buffer
552 return data
553
554 @property
555 def closed(self) -> bool:
556 """Return whether the original stream is closed.
557
558 Raises:
559 AttributeError: If the original stream is not set.
560
561 Returns:
562 True if the wrapped stream has been closed, False otherwise.
563 """
564 data = self._get_stream_if_present().closed
565 self.rogger.log_debug(f"Closed: {data}", stream=CONST.RAW_STDOUT)
566 return data
567
568 @property
569 def errors(self) -> Optional[str]:
570 """Return the error-handling mode of the original stream.
571
572 Raises:
573 AttributeError: If the original stream is not set.
574
575 Returns:
576 The error-handling mode string (e.g. 'strict'), or None.
577 """
578 data = self._get_stream_if_present().errors
579 self.rogger.log_debug(f"Errors: {data}", stream=CONST.RAW_STDOUT)
580 return data
581
582 @property
583 def encoding(self) -> str:
584 """Return the character encoding of the original stream.
585
586 Raises:
587 AttributeError: If the original stream is not set.
588
589 Returns:
590 The encoding string (e.g. 'utf-8').
591 """
592 data = self._get_stream_if_present().encoding
593 self.rogger.log_debug(f"Encoding: {data}", stream=CONST.RAW_STDOUT)
594 return data
595
596 @property
597 def line_buffering(self) -> int:
598 """Return whether line buffering is enabled on the original stream.
599
600 Raises:
601 AttributeError: If the original stream is not set.
602
603 Returns:
604 Non-zero if line buffering is active, zero otherwise.
605 """
606 data = self._get_stream_if_present().line_buffering
607 self.rogger.log_debug(
608 f"Line buffering: {data}",
609 stream=CONST.RAW_STDOUT
610 )
611 return data
612
613 @property
614 def mode(self) -> str:
615 """Return the file-mode string of the original stream.
616
617 Raises:
618 AttributeError: If the original stream is not set.
619
620 Returns:
621 The mode string (e.g. 'w', 'r').
622 """
623 data = self._get_stream_if_present().mode
624 self.rogger.log_debug(f"Mode: {data}", stream=CONST.RAW_STDOUT)
625 return data
626
627 @property
628 def name(self) -> Union[str, Any]:
629 """Return the name of the original stream.
630
631 Raises:
632 AttributeError: If the original stream is not set.
633
634 Returns:
635 The stream name; typically a file path string or an integer file
636 descriptor for standard streams.
637 """
638 data = self._get_stream_if_present().name
639 self.rogger.log_debug(f"Name: {data}", stream=CONST.RAW_STDOUT)
640 return data
641
642 @property
643 def newlines(self) -> Any:
644 """Return the newline translation used by the original stream.
645
646 Raises:
647 AttributeError: If the original stream is not set.
648
649 Returns:
650 A string, tuple of strings, or None as described by the standard
651 io.TextIOBase.newlines attribute.
652 """
653 data = self._get_stream_if_present().newlines
654 self.rogger.log_debug(f"Newlines: {data}", stream=CONST.RAW_STDOUT)
655 return data
656
657 def close(self) -> None:
658 """Flush pending data and close the original stream.
659
660 Calls flush() before delegating to the original stream's close()
661 method, ensuring buffered log data is written before the stream is
662 released.
663
664 Raises:
665 AttributeError: If the original stream is not set.
666 """
667 try:
668 self.rogger.log_info(
669 f"Closing TeeStream (mode={self.stream_mode})",
670 stream=CONST.RAW_STDOUT
671 )
672 except (AttributeError, OSError, ValueError):
673 pass
674 self.flush()
676
677 def fileno(self) -> int:
678 """Return the underlying file descriptor of the original stream.
679
680 Raises:
681 AttributeError: If the original stream is not set.
682 io.UnsupportedOperation: If the stream has no file descriptor.
683
684 Returns:
685 Integer file descriptor.
686 """
687 data = self._get_stream_if_present().fileno()
688 self.rogger.log_debug(f"Fileno: {data}", stream=CONST.RAW_STDOUT)
689 return data
690
691 def isatty(self) -> bool:
692 """Return whether the original stream is connected to a TTY device.
693
694 Raises:
695 AttributeError: If the original stream is not set.
696
697 Returns:
698 True if the stream is interactive (TTY), False otherwise.
699 """
700 data = self._get_stream_if_present().isatty()
701 self.rogger.log_debug(f"IsATTY: {data}", stream=CONST.RAW_STDOUT)
702 return data
703
704 def readable(self) -> bool:
705 """Return whether the original stream supports reading.
706
707 Raises:
708 AttributeError: If the original stream is not set.
709
710 Returns:
711 True if the stream can be read from, False otherwise.
712 """
713 data = self._get_stream_if_present().readable()
714 self.rogger.log_debug(f"Readable: {data}", stream=CONST.RAW_STDOUT)
715 return data
716
717 def seek(self, offset: int, whence: int = 0) -> int:
718 """Move the stream position to the given byte offset.
719
720 Arguments:
721 offset (int): Number of bytes to move the position.
722
723 Keyword Arguments:
724 whence (int): Reference point: 0 = start, 1 = current, 2 = end. Default: 0
725
726 Raises:
727 AttributeError: If the original stream is not set.
728
729 Returns:
730 The new absolute stream position.
731 """
732 data = self._get_stream_if_present().seek(offset, whence)
733 self.rogger.log_debug(
734 f"Seek(offset={offset},whence={whence}): {data}",
735 stream=CONST.RAW_STDOUT
736 )
737 return data
738
739 def seekable(self) -> bool:
740 """Return whether the original stream supports random-access seeking.
741
742 Raises:
743 AttributeError: If the original stream is not set.
744
745 Returns:
746 True if the stream supports seek() and tell(), False otherwise.
747 """
748 data = self._get_stream_if_present().seekable()
749 self.rogger.log_debug(f"Seekable: {data}", stream=CONST.RAW_STDOUT)
750 return data
751
752 def tell(self) -> int:
753 """Return the current stream position of the original stream.
754
755 Raises:
756 AttributeError: If the original stream is not set.
757
758 Returns:
759 The current byte offset within the stream.
760 """
761 data = self._get_stream_if_present().tell()
762 self.rogger.log_debug(f"Tell: {data}", stream=CONST.RAW_STDOUT)
763 return data
764
765 def truncate(self, size: Optional[int] = None) -> int:
766 """Truncate the original stream to at most size bytes.
767
768 Keyword Arguments:
769 size (Optional[int]): Desired size in bytes; defaults to the current stream position. Default: None
770
771 Raises:
772 AttributeError: If the original stream is not set.
773
774 Returns:
775 The new file size in bytes.
776 """
777 data = self._get_stream_if_present().truncate(size)
778 self.rogger.log_debug(
779 f"Truncate(size={size}): {data}",
780 stream=CONST.RAW_STDOUT
781 )
782 return data
783
784 def writable(self) -> bool:
785 """Return whether the original stream supports writing.
786
787 Raises:
788 AttributeError: If the original stream is not set.
789
790 Returns:
791 True if the stream can be written to, False otherwise.
792 """
793 data = self._get_stream_if_present().writable()
794 self.rogger.log_debug(f"Writable: {data}", stream=CONST.RAW_STDOUT)
795 return data
796
797 def __enter__(self) -> TextIO:
798 """Enter the runtime context for the original stream.
799
800 Delegates to the wrapped stream's __enter__() so that TeeStream can be
801 used as a context manager wherever a plain TextIO is expected.
802
803 Raises:
804 AttributeError: If the original stream is not set.
805
806 Returns:
807 The original stream as returned by its own __enter__().
808 """
809 data = self._get_stream_if_present().__enter__()
810 return data
811
812 def __exit__(self, exc_type, exc_val, exc_tb) -> Optional[bool]:
813 """Exit the runtime context and delegate to the original stream.
814
815 Delegates to the wrapped stream's __exit__() so that TeeStream can be
816 used as a context manager wherever a plain TextIO is expected.
817
818 Arguments:
819 exc_type (type): The exception type, or None if no exception.
820 exc_val (BaseException): The exception instance, or None.
821 exc_tb (traceback): The traceback, or None.
822
823 Raises:
824 AttributeError: If the original stream is not set.
825
826 Returns:
827 The return value of the original stream's __exit__(); True suppresses
828 the exception, False or None propagates it.
829 """
830 data = self._get_stream_if_present().__exit__(exc_type, exc_val, exc_tb)
831 self.rogger.log_debug(
832 f"__exit__(exc_type={exc_type},exc_val={exc_val},exc_tb={exc_tb}): {data}",
833 stream=CONST.RAW_STDOUT
834 )
835 return data
836
837 def __iter__(self):
838 """Return an iterator over the stream (line-by-line).
839
840 This makes `for line in tee_stream:` work like the underlying
841 TextIO object. Iteration yields lines as produced by `readline()`.
842 """
843 return self
844
845 def __next__(self):
846 """Return the next line from the wrapped stream, logging it.
847
848 Prefer delegating to the underlying stream's `__next__` if it
849 exists (and then log the returned line). Otherwise fall back to
850 `readline()` which already takes care of logging. Raise
851 `StopIteration` when the stream is exhausted.
852 """
853 stream = self._get_stream_if_present()
854 next_method = getattr(stream, "__next__", None)
855 if callable(next_method):
856 # Delegate to underlying iterator protocol and log the line
857 line = next_method()
858 try:
859 self._write_to_log(
860 str(line),
861 CONST.PrefixFunctionCall.READLINE
862 )
863 except (OSError, ValueError, AttributeError):
864 pass
865 return line
866
867 # Fallback: use readline() which already logs
868 line = self.readline()
869 if line == "":
870 raise StopIteration
871 return line
872
873 def detach(self):
874 """Detach and return the underlying binary buffer if supported.
875
876 Delegates to the wrapped stream's `detach()` when present. If the
877 wrapped stream does not support `detach()` an AttributeError is
878 raised to mirror standard behavior.
879 """
880 stream = self._get_stream_if_present()
881 detach_method = getattr(stream, "detach", None)
882 if callable(detach_method):
883 return detach_method()
884 raise AttributeError(
885 f"{CONST.MODULE_NAME} underlying stream has no detach() method")
886
887 def __getattr__(self, name: str):
888 """Delegate attribute access to the wrapped stream for missing attrs.
889
890 This helps the TeeStream more closely mimic the wrapped TextIO
891 object without having to manually proxy every attribute.
892 """
893 # _get_stream_if_present will raise the same AttributeError we
894 # want when the underlying stream is missing.
895 stream = self._get_stream_if_present()
896 return getattr(stream, name)
str _get_correct_prefix(self, CONST.PrefixFunctionCall function_call=CONST.PrefixFunctionCall.EMPTY)
str read(self, int size=-1)
None write(self, str message)
None _write_to_log(self, Union[str, List[str]] data, CONST.PrefixFunctionCall function_call)
int truncate(self, Optional[int] size=None)
list[str] readlines(self, int hint=-1)
__init__(self, Union[str, Path, FileInstance] root, TextIO original_stream, *, Optional[int] max_size_mb=None, Optional[int] flush_size=None, CONST.StdMode mode=CONST.StdMode.STDUNKNOWN, CONST.ErrorMode error_mode=CONST.ErrorMode.WARN_NO_PIPE, Optional[str] encoding=None, bool log_to_file=True, bool log_function_calls=False)
Definition tee_stream.py:73
str readline(self, int size=-1)
None writelines(self, List[str] lines)
Optional[bool] __exit__(self, exc_type, exc_val, exc_tb)
int seek(self, int offset, int whence=0)