killian31
initial commit
8b09391
import contextlib
import io
import re
import sys
import threading
import streamlit as st
class _Redirect:
class IOStuff(io.StringIO):
def __init__(
self, trigger, max_buffer, buffer_separator, regex, dup, need_dup, on_thread
):
super().__init__()
self._trigger = trigger
self._max_buffer = max_buffer
self._buffer_separator = buffer_separator
self._regex = regex and re.compile(regex)
self._dup = dup
self._need_dup = need_dup
self._on_thread = on_thread
def write(self, __s: str) -> int:
res = None
if self._on_thread == threading.get_ident():
if self._max_buffer:
concatenated_len = super().tell() + len(__s)
if concatenated_len > self._max_buffer:
rest = self.get_filtered_output()[
concatenated_len - self._max_buffer :
]
if self._buffer_separator is not None:
rest = rest.split(self._buffer_separator, 1)[-1]
super().seek(0)
super().write(rest)
super().truncate(super().tell() + len(__s))
res = super().write(__s)
self._trigger(self.get_filtered_output())
if self._on_thread != threading.get_ident() or self._need_dup:
self._dup.write(__s)
return res
def get_filtered_output(self):
if self._regex is None or self._buffer_separator is None:
return self.getvalue()
return self._buffer_separator.join(
filter(
self._regex.search, self.getvalue().split(self._buffer_separator)
)
)
def print_at_end(self):
self._trigger(self.get_filtered_output())
def __init__(
self,
stdout=None,
stderr=False,
format=None,
to=None,
max_buffer=None,
buffer_separator="\n",
regex=None,
duplicate_out=False,
):
self.io_args = {
"trigger": self._write,
"max_buffer": max_buffer,
"buffer_separator": buffer_separator,
"regex": regex,
"on_thread": threading.get_ident(),
}
self.redirections = []
self.st = None
self.stderr = stderr is True
self.stdout = stdout is True or (stdout is None and not self.stderr)
self.format = format or "code"
self.to = to
self.fun = None
self.duplicate_out = duplicate_out or None
self.active_nested = None
if not self.stdout and not self.stderr:
raise ValueError("one of stdout or stderr must be True")
if self.format not in ["text", "markdown", "latex", "code", "write"]:
raise ValueError(
f"format need oneof the following: {', '.join(['text', 'markdown', 'latex', 'code', 'write'])}"
)
if self.to and (not hasattr(self.to, "text") or not hasattr(self.to, "empty")):
raise ValueError(f"'to' is not a streamlit container object")
def __enter__(self):
if self.st is not None:
if self.to is None:
if self.active_nested is None:
self.active_nested = self(
format=self.format,
max_buffer=self.io_args["max_buffer"],
buffer_separator=self.io_args["buffer_separator"],
regex=self.io_args["regex"],
duplicate_out=self.duplicate_out,
)
return self.active_nested.__enter__()
else:
raise Exception("Already entered")
to = self.to or st
to.text("Logs:")
self.st = to.empty()
self.fun = getattr(self.st, self.format)
io_obj = None
def redirect(to_duplicate, context_redirect):
nonlocal io_obj
io_obj = _Redirect.IOStuff(
need_dup=self.duplicate_out and True, dup=to_duplicate, **self.io_args
)
redirection = context_redirect(io_obj)
self.redirections.append((redirection, io_obj))
redirection.__enter__()
if self.stderr:
redirect(sys.stderr, contextlib.redirect_stderr)
if self.stdout:
redirect(sys.stdout, contextlib.redirect_stdout)
return io_obj
def __call__(
self,
to=None,
format=None,
max_buffer=None,
buffer_separator="\n",
regex=None,
duplicate_out=False,
):
return _Redirect(
self.stdout,
self.stderr,
format=format,
to=to,
max_buffer=max_buffer,
buffer_separator=buffer_separator,
regex=regex,
duplicate_out=duplicate_out,
)
def __exit__(self, *exc):
if self.active_nested is not None:
nested = self.active_nested
if nested.active_nested is None:
self.active_nested = None
return nested.__exit__(*exc)
res = None
for redirection, io_obj in reversed(self.redirections):
res = redirection.__exit__(*exc)
io_obj.print_at_end()
self.redirections = []
self.st = None
self.fun = None
return res
def _write(self, data):
self.fun(data)
stdout = _Redirect()
stderr = _Redirect(stderr=True)
stdouterr = _Redirect(stdout=True, stderr=True)
"""
# can be used as
import time
import sys
from random import getrandbits
import streamlit.redirect as rd
st.text('Suboutput:')
so = st.empty()
with rd.stdout, rd.stderr(format='markdown', to=st.sidebar):
print("hello ")
time.sleep(1)
i = 5
while i > 0:
print("**M**izu? ", file=sys.stdout if getrandbits(1) else sys.stderr)
i -= 1
with rd.stdout(to=so):
print(f" cica {i}")
if i:
time.sleep(1)
# """