Spaces:
Running
Running
# Standard Library Imports | |
import datetime | |
import threading | |
import time | |
import traceback | |
from pathlib import Path | |
import enum | |
# Third-Party Imports | |
from loguru import logger | |
logger.add("./data/log/file_{time}.log", rotation="500 MB") # Automatically rotate too big log files | |
# Local Imports | |
from .ansi_utils import ansi_color_str, ansi_link_str | |
from .attr_utils import get_prev_caller_info, get_caller_info, get_thread_info, get_prev_frame, get_prev_frame_from_frame, is_of_type | |
from .table_utils import format_property, format_table | |
from .extended_symbols import ExtendedSymbols, draw_box, draw_arrow, format_arrow | |
#from ..mew_log.mew_context import MewContext, resolve_context | |
# Constants | |
# Log history (written by _update_log)
lastLog = None  # Stores the most recent log entry
debug_log = []  # Stores all log entries
startTime = time.time()  # import-time reference for the elapsed-time stamp written by log_file

# settings vars
enable_log = True
enable_log_to_file = False
log_filename = "log"
log_file_path = "data/log/log.txt"
enable_fancy = True
enable_use_color = True

# Colour names handed to ansi_color_str as `fg=`
type_color = 'yellow'
value_color = 'bright_yellow'
kw_color = 'cyan'  # keyword-argument names; format_kwarg reads this but it was never defined
arrow_color = 'bright_white'
def allow_curly_braces(original_string):
    """Return `original_string` with every `{` and `}` doubled.

    Doubled braces are what `str.format` treats as literal braces. Strings
    that contain no brace at all are returned unchanged.
    """
    has_no_braces = "{" not in original_string and "}" not in original_string
    if has_no_braces:
        return original_string
    opened = original_string.replace("{", "{{")
    return opened.replace("}", "}}")
def format_arg(arg, use_color, fancy):
    """Render one log argument as ``<TypeName>:value``.

    Args:
        arg: The value to render.
        use_color: When true, the result is wrapped with `ansi_color_str`
            using `value_color`.
        fancy: When false and `arg` is not a string, `repr(arg)` is used;
            in every other case the value goes through `format_table`.

    Returns:
        The rendered text with its curly braces doubled by
        `allow_curly_braces`.
    """
    type_label = f"<{type(arg).__name__}>"

    use_plain_repr = not isinstance(arg, str) and not fancy
    if use_plain_repr:
        value_text = repr(arg)
    else:
        value_text = format_table(arg, use_color=use_color)

    # Containers and objects carrying a __dict__ start on their own line.
    is_complex = is_of_type(arg, (list, set, dict)) or hasattr(arg, '__dict__')
    separator = '\n' if is_complex else ""

    # NOTE(review): the doubled braces survive str.format when this text is
    # passed as a value — confirm a later formatting pass relies on them.
    rendered = allow_curly_braces(f"{type_label}:{separator}{value_text}")
    if use_color:
        return ansi_color_str(rendered, fg=value_color)
    return rendered
def format_kwarg(kw, arg, use_color, fancy):
    """Render one keyword argument as ``name: <TypeName>:value``.

    Args:
        kw: The keyword name.
        arg: The keyword's value, rendered through `format_arg`.
        use_color: When true, the name is coloured and the whole pair is
            then wrapped again with `value_color`.
        fancy: Forwarded to `format_arg`.

    Returns:
        The rendered ``name: value`` text.
    """
    if use_color:
        # `kw_color` was referenced here without ever being defined, which
        # raised NameError for every coloured kwarg. Use it when the module
        # provides it, otherwise fall back to the existing `type_color`.
        name_color = globals().get('kw_color', type_color)
        name_str = ansi_color_str(kw, fg=name_color)
    else:
        name_str = kw
    arg_str = format_arg(arg, use_color, fancy)
    formatted_arg = f"{name_str}: {arg_str}"
    return ansi_color_str(formatted_arg, fg=value_color) if use_color else formatted_arg
def format_args(use_color=enable_use_color, fancy=enable_fancy, *args, **kwargs):
    """Render positional and keyword log arguments.

    Args:
        use_color: Forwarded to `format_arg` / `format_kwarg`.
        fancy: Forwarded to `format_arg` / `format_kwarg`.
        *args: Values rendered with `format_arg`.
        **kwargs: Values rendered with `format_kwarg`, keyed by their name.

    Returns:
        A ``(list, dict)`` pair holding the rendered positional and keyword
        arguments respectively.
    """
    rendered_positional = []
    for value in args:
        rendered_positional.append(format_arg(value, use_color, fancy))

    rendered_named = {}
    for key, value in kwargs.items():
        rendered_named[key] = format_kwarg(key, value, use_color, fancy)

    return rendered_positional, rendered_named
def make_formatted_msg(msg, *args, **kwargs):
    """Build one log line from a format string and its arguments.

    The keyword arguments `fancy`, `use_color`, `context_depth` and `context`
    are consumed here as formatting options; everything else is rendered and
    substituted into `msg` via `stringf`.

    Returns:
        The log line, starting with a newline and a timestamp. In fancy mode
        the message is decorated by `get_prev_caller_info` and
        `get_thread_info`; otherwise it is prefixed with `context`.
    """
    # Strip the formatting options so they are not treated as message fields.
    fancy = kwargs.pop('fancy', enable_fancy)
    use_color = kwargs.pop('use_color', enable_use_color)
    kwargs.pop('context_depth', None)  # accepted but not used at present
    context = kwargs.pop('context', None)

    rendered_args, rendered_kwargs = format_args(use_color, fancy, *args, **kwargs)
    body = stringf(msg, *rendered_args, **rendered_kwargs)

    stamp = get_timestamp()
    if not fancy:
        return f"\n ~ | {stamp} | {context}:{body}"

    # Must be called directly from this frame: `steps=2` presumably counts
    # frames back to the code that invoked the log_* function — TODO confirm
    # against get_prev_caller_info.
    origin = get_thread_info(get_prev_caller_info(body, use_color=use_color, steps=2))
    return f"\n ~ | {stamp} | {origin}"
def stringf(s: str, *args, **kwargs):
    """Apply `str.format` to `s` only when there is something to substitute.

    With no positional and no keyword arguments `s` is returned untouched, so
    a message that merely contains braces is not parsed as a format string.
    """
    # s = allow_curly_braces(s)  # Uncomment or modify as needed
    if args or kwargs:
        return s.format(*args, **kwargs)
    return s
# if not args: | |
# print("Args is empty.") | |
# if not kwargs: | |
# print("Kwargs is empty.") | |
# print(s) | |
# print(f"args: {args}") | |
# print(f"kwargs: {kwargs}") | |
# return s.format(*args, **kwargs) | |
# No arguments or keyword arguments | |
# stringf("Hello, World!") | |
# # With positional arguments | |
# stringf("Hello, {}!", "World") | |
# # With keyword arguments | |
# stringf("Hello, {name}!", name="Alice") | |
# # With both args and kwargs empty (only prints "Hello, World!") | |
# stringf("Hello, World!") | |
def ensure_directory_exists(directory):
    """Create `directory`, including missing parents; do nothing if it exists."""
    Path(directory).mkdir(parents=True, exist_ok=True)
def ensure_file_exists(file_path):
    """Create an empty file at `file_path` unless one is already there.

    An existing file keeps its contents.
    """
    Path(file_path).touch(exist_ok=True)
#from ..mew_log.mew_log_helper import MewLogHelper | |
# Logging Configuration | |
# Configure Loguru's logger to use a custom format | |
import sys | |
import io | |
# Set UTF-8 encoding for stdout and stderr | |
#sys.stdout = io.TextIOWrapper(sys.stdout.detach(), encoding='utf-8', line_buffering=True) | |
#sys.stderr = io.TextIOWrapper(sys.stderr.detach(), encoding='utf-8', line_buffering=True) | |
#logger.add(sys.stdout, format="<green>{time:YYYY-MM-DD HH:mm:ss}</green> | {level} | {message}", backtrace=True) | |
# Console sinks.
# NOTE(review): per the loguru documentation, configure(handlers=...) first
# removes every handler added before it, which would include a file sink
# registered earlier with logger.add — confirm that is intended.
#
# "level" is only a minimum threshold, so without the filters below a WARNING
# would be written twice to stdout, and an ERROR twice to stdout plus once to
# stderr. The filters give each record exactly one destination.
_CONSOLE_FORMAT = "<green>{time:YYYY-MM-DD HH:mm:ss}</green> | {level} | {message}"
_WARNING_NO = logger.level("WARNING").no
_ERROR_NO = logger.level("ERROR").no

logger.configure(handlers=[
    {
        # INFO up to, but not including, WARNING
        "sink": sys.stdout,
        "format": _CONSOLE_FORMAT,  # | <cyan>{file}:{line}:{function}</cyan>
        "level": "INFO",
        "filter": lambda record: record["level"].no < _WARNING_NO,
    },
    {
        # WARNING up to, but not including, ERROR
        "sink": sys.stdout,
        "format": _CONSOLE_FORMAT,  # | <yellow>{file}:{line}:{function}</yellow>
        "level": "WARNING",
        "filter": lambda record: record["level"].no < _ERROR_NO,
    },
    {
        # ERROR and above
        "sink": sys.stderr,
        "format": _CONSOLE_FORMAT,  # | <red>{file}:{line}:{function}</red>
        "level": "ERROR",
    },
])
# Logging Configuration | |
def init_log(log_config):
    """Apply logging settings and prepare the log file.

    Args:
        log_config: Mapping with the keys ``'log_to_file'``,
            ``'clear_file_first'``, ``'enable_log'`` and ``'log_filename'``.

    Side effects:
        Updates the module globals `enable_log_to_file`, `enable_log`,
        `log_filename` and `log_file_path`; creates ``data/log/`` and the
        log file ``data/log/<log_filename>.txt`` when missing; prints a
        summary of the resulting settings.

    Raises:
        KeyError: If a required key is missing from `log_config`.
    """
    global enable_log_to_file, enable_log, log_filename, log_file_path

    enable_log_to_file = log_config['log_to_file']
    clear_file_first = log_config['clear_file_first']
    enable_log = log_config['enable_log']
    log_filename = log_config['log_filename']

    log_dir = 'data/log/'
    Path(log_dir).mkdir(parents=True, exist_ok=True)
    log_file_path = f"{log_dir}{log_filename}.txt"

    target = Path(log_file_path)
    if clear_file_first:
        # Truncate only on request; previously the file was always wiped and
        # the `clear_file_first` setting had no effect.
        target.write_text("", encoding="utf-8")
    else:
        target.touch(exist_ok=True)

    print(f'init_log: \n ~ enable_log: {enable_log} \n ~ enable_log_to_file: {enable_log_to_file} \n ~ log_file_path: {log_file_path}')
def _update_log(formatted_msg_str):
    """Record a formatted log line.

    Stores the line in `lastLog`, appends it to `debug_log`, and passes it to
    `log_file` when `enable_log_to_file` is set.
    """
    global lastLog
    lastLog = formatted_msg_str
    debug_log.append(formatted_msg_str)
    if not enable_log_to_file:
        return
    log_file(formatted_msg_str)
def log_file(msg, file_path=None):
    """Append one message to a log file, prefixed with the elapsed time.

    Args:
        msg: Text to write.
        file_path: Destination file. Defaults to the module-level
            `log_file_path` when None.

    Does nothing when `enable_log` is false.
    """
    if not enable_log:
        return
    if file_path is None:
        file_path = log_file_path

    elapsed_time = time.time() - startTime
    # Append, and honour `file_path`: the previous code opened `log_file_path`
    # in 'w' mode, ignoring the argument and wiping the file on every call.
    # UTF-8 is explicit because log lines may carry non-ASCII symbols.
    with open(file_path, 'a', encoding='utf-8') as fp:
        fp.write(f"\n ~ | time={elapsed_time:.2f}s ~ {msg}")
    #logger.add(log_file_path, format="time={elapsedTime:.2f}s ~ {msg}")
def log_info(msg, *args, **kwargs):
    """Format an info message, record it, and print it to stdout.

    Does nothing when `enable_log` is false. Arguments are interpreted by
    `make_formatted_msg`.
    """
    if enable_log:
        # make_formatted_msg is called from this frame on purpose: it inspects
        # the call stack to label the message.
        line = make_formatted_msg(msg, *args, **kwargs)
        _update_log(line)
        print(lastLog)
        #logger.info(formatted_log_str)
def log_warning(msg, *args, **kwargs):
    """Format a warning message, record it, and emit it via `logger.warning`.

    Does nothing when `enable_log` is false. Arguments are interpreted by
    `make_formatted_msg`.
    """
    if enable_log:
        # make_formatted_msg is called from this frame on purpose: it inspects
        # the call stack to label the message.
        line = make_formatted_msg(msg, *args, **kwargs)
        _update_log(line)
        #print(formatted_log_str)
        logger.warning(lastLog)
def log_error(msg, e, *args, **kwargs):
    """Format an error message with a traceback report and emit it.

    Args:
        msg: Format string handed to `make_formatted_msg`.
        e: The exception (or other value) passed to `trace_error`.
        *args, **kwargs: Forwarded to `make_formatted_msg`.

    Does nothing when `enable_log` is false. The final text is recorded with
    `_update_log` and sent to `logger.error`.
    """
    if enable_log:
        # make_formatted_msg is called from this frame on purpose: it inspects
        # the call stack to label the message.
        headline = make_formatted_msg(msg, *args, **kwargs)
        report = f"\n===^_^===\n{headline}"
        report += f"{trace_error(e)}\n===>_<==="
        _update_log(report)
        #print(formatted_log_str)
        logger.error(lastLog)
import functools | |
import traceback | |
def log_function(func):
    """Decorator that prints each call's arguments and result.

    On success the bound arguments and the formatted result are printed via
    `get_prev_caller_info` / `get_thread_info`. If anything in the call or in
    that logging raises, the exception message and the formatted traceback
    are sent to `logger.error` and the exception is re-raised.

    Args:
        func: The function to wrap.

    Returns:
        A wrapper carrying `func`'s name, docstring and other metadata.
    """
    @functools.wraps(func)  # keep func.__name__ / __doc__ on the wrapper
    def wrapper(*args, **kwargs):
        try:
            # Pair positional values with the declared parameter names.
            code = func.__code__
            arg_names = code.co_varnames[:code.co_argcount]
            func_args = dict(zip(arg_names, args))
            func_args.update(kwargs)

            result = func(*args, **kwargs)
            print(get_thread_info(get_prev_caller_info(f"({func_args} ) -> result: {format_arg(result, use_color=True, fancy=True)}")))
            return result
        except Exception as e:
            logger.error(f"Exception: {str(e)}")
            logger.error(f"Format Traceback: {traceback.format_exc()}")
            raise  # Re-raise the exception after logging
    return wrapper
# @log_function | |
# def dive(a, b): | |
# return a / b | |
# dive(2,3) | |
# dive(12,3) | |
# dive(14,3) | |
# dive(21,3) | |
# dive(21,0) | |
error_color = 'bright_red'  # colour name used for all error / traceback text


def format_traceback(tb_entry, depth=0, use_color=enable_use_color):
    """Render one traceback entry as two log lines.

    Args:
        tb_entry: A 4-item sequence ``(filename, lineno, funcname, line)``,
            such as the entries produced by `traceback.extract_tb`.
        depth: Nesting level; it widens both arrows and is shown in the
            ``trace(n)`` label.
        use_color: When true, the location becomes an ANSI link and the
            whole text is wrapped with `error_color`.

    Returns:
        The text for the location line followed by the source-line line.
    """
    upper_arrow = format_arrow('T', 'double', 3 + 2 * depth)
    filename, lineno, funcname, line = tb_entry

    location = f"{filename}::{lineno}::{funcname}"
    if use_color:
        trace_link = ansi_link_str(f"file:///{filename}", location, link_color="bright_magenta")
    else:
        trace_link = location

    lower_arrow = format_arrow('L', 'double', 3 + 2 * (depth + 1))
    #(" " * (3 + 2 * depth))+
    text = f'\n ~ | {upper_arrow}trace({depth}): {trace_link}\n ~ | {lower_arrow} Line: "{line}"'
    if use_color:
        return ansi_color_str(text, fg=error_color)
    return text
def trace_error(e, use_color=True, fancy=True):
    """Build a multi-line report for an exception.

    Args:
        e: The exception to describe. A value that is not a BaseException
            yields only the ``Exception: repr(e)`` header.
        use_color: When true, each piece and then the whole report are
            wrapped with `error_color`.
        fancy: Accepted but not used by this function.

    Returns:
        The header, a ``trace(0)`` line for the frame of `e.__traceback__`
        when there is one, and one `format_traceback` section per extracted
        traceback entry (depths starting at 1).
    """
    def paint(text):
        # Colour wrapper that becomes a no-op when colour is disabled.
        return ansi_color_str(text, fg=error_color) if use_color else text

    out_str = paint(f"\n ~ | {ExtendedSymbols.DOUBLE_VERTICAL}Exception: {repr(e)}")
    if not isinstance(e, BaseException):
        return paint(out_str)

    arrow_str = format_arrow('T', 'double', 3)
    tb_obj = e.__traceback__
    if tb_obj:
        source_file = tb_obj.tb_frame.f_code.co_filename
        location = f"{source_file}::{tb_obj.tb_lineno}"
        if use_color:
            trace_link = ansi_link_str(f"file:///{source_file}", location, link_color="bright_magenta")
        else:
            trace_link = location
        out_str += paint(f"\n ~ | {arrow_str}trace(0): {trace_link}")

        for level, entry in enumerate(traceback.extract_tb(tb_obj), start=1):
            out_str += format_traceback(entry, depth=level, use_color=use_color)

    return paint(out_str)
# log_info(' ~ | test log_info: {}', 1) | |
# log_warning(' ~ | test log_warning: {}', 2) | |
def log_text_input(key: str):
    """Prompt for a line on stdin, logging the prompt and the reply.

    Args:
        key: Label shown in the prompt.

    Returns:
        The text returned by `input()`.
    """
    prompt = f'\n ~ | Text Input[{key}]: '
    log_info(prompt)
    received = input()
    log_info(f'\n ~ | Recv input: "{received}"')
    return received
def format_duration(seconds):
    """Format a number of seconds as ``"<H>h <M>m <S>s"``.

    Each component is truncated with `int`, so fractional seconds are dropped.
    """
    whole_hours, leftover = divmod(seconds, 3600)
    whole_minutes, leftover_seconds = divmod(leftover, 60)
    parts = (int(whole_hours), int(whole_minutes), int(leftover_seconds))
    return "{}h {}m {}s".format(*parts)
def parse_duration(duration_str):
    """Convert a duration such as ``"1h 2m 3s"`` into a number of seconds.

    The string is split on whitespace and every token ending in ``h``, ``m``
    or ``s`` contributes its integer value times the unit length. Tokens may
    appear in any order and any of them may be omitted, so ``"5m 3s"`` and
    ``"2h"`` are accepted. Tokens with any other suffix are ignored.

    Args:
        duration_str: Text in the layout produced by `format_duration`.

    Returns:
        Total seconds as an int; 0 for an empty string.

    Raises:
        ValueError: If the part of a token before its unit is not an integer.
    """
    unit_seconds = {'h': 3600, 'm': 60, 's': 1}
    total = 0
    for token in duration_str.split():
        factor = unit_seconds.get(token[-1])
        if factor is None:
            continue  # unknown unit: contributes nothing
        total += int(token[:-1]) * factor
    return total
def get_timestamp():
    """Return the current local time as ``YYYY-Mon-DD HH:MM:SS``."""
    now = datetime.datetime.now()
    return f"{now:%Y-%b-%d %H:%M:%S}"
def format_path(directory, depth=0, max_depth=3, use_color=True):
    """Render a directory tree as text, one entry per line.

    Args:
        directory (str or Path): The directory to render.
        depth (int): Current nesting level; it controls the indentation.
        max_depth (int): Directories deeper than this are skipped. None
            removes the limit.
        use_color (bool): Whether to colour the output with `ansi_color_str`.

    Returns:
        str: The rendered tree. Within a directory, sub-directories come
        first, then files, each group ordered by name. Returns an empty
        string when `depth` exceeds `max_depth`, and a fixed message when
        `directory` is not a directory.
    """
    if max_depth is not None and depth > max_depth:
        return ""

    root = Path(directory)
    if not root.is_dir():
        return "The specified path is not a directory.\n"

    def tint(text, colour):
        # Colour wrapper that becomes a no-op when colour is disabled.
        return ansi_color_str(text, fg=colour) if use_color else text

    # Header line for this directory.
    pieces = [tint("│ " * depth + "├── ", 'cyan') + tint(root.name, 'green') + "\n"]

    file_prefix = "│ " * (depth + 1) + "├── "
    # Directories sort before files (False < True), then by name.
    for entry in sorted(root.iterdir(), key=lambda p: (p.is_file(), p.name)):
        if entry.is_dir():
            pieces.append(format_path(entry, depth + 1, max_depth, use_color))
        else:
            pieces.append(tint(file_prefix + entry.name, 'white') + "\n")

    return "".join(pieces)
# Spinner Class | |
class LogSpinner:
    """Context manager that animates a console spinner on a background thread.

    Entering the context starts the thread; leaving it stops the animation,
    waits for the thread to finish and prints the elapsed time.

    Args:
        message: Text displayed next to the spinner.
        rainbow: When true, the frames cycle through `colors`.
        anim_speed: Delay in seconds between two frames.
    """

    def __init__(self, message="", rainbow=True, anim_speed=0.1):
        self.message = message
        self.speed = anim_speed
        self.colors = ['\033[31m', '\033[33m', '\033[32m', '\033[34m', '\033[35m', '\033[36m']
        # Braille spinner frames. The literals had been corrupted into
        # mojibake (UTF-8 bytes read as cp1252); they are written as escapes
        # so a re-encoding of the file cannot damage them again.
        self.spinner = ['\u2807', '\u280b', '\u2819', '\u2838', '\u283c', '\u2834', '\u2826', '\u2827']
        #self.spinner = ['|', '/', '-', '\\']
        self.rainbow = rainbow
        # Initialised here so the update_* methods work before __enter__.
        self.stop_spinner = False
        self.startTime = time.time()

    def __enter__(self):
        self.stop_spinner = False
        self.startTime = time.time()
        # spinner_text = f'^_^ | {self.message} - time: {0.0:.2f}s | Begin!'
        # print(spinner_text)
        self.spinner_thread = threading.Thread(target=self.spin)
        self.spinner_thread.start()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.stop_spinner = True
        self.spinner_thread.join()
        curTime = time.time()
        spinner_text = f'>_< | {self.message} - time: {format_duration(curTime - self.startTime)} | Done!'
        print(spinner_text)

    def spin(self):
        """Thread target: keep animating until `stop_spinner` is set."""
        while not self.stop_spinner:
            if self.rainbow:
                self.update_color_spinner()
            else:
                self.update_spinner()

    def update_spinner(self):
        """Draw one pass over the frames without colour."""
        for char in self.spinner:
            # Check per frame so __exit__ waits at most one frame delay
            # instead of a whole animation cycle.
            if self.stop_spinner:
                return
            curTime = time.time()
            spinner_text = f'{self.message} - time: {format_duration(curTime - self.startTime)} |'
            print(f'\r ^_^ | {char} | {spinner_text}', end='\r', flush=True)
            time.sleep(self.speed)

    def update_color_spinner(self):
        """Draw one pass over the frames for every colour in `colors`."""
        for color in self.colors:
            for char in self.spinner:
                # Same per-frame check: a full colour cycle is
                # len(colors) * len(spinner) frames long.
                if self.stop_spinner:
                    return
                curTime = time.time()
                spinner_text = f'{self.message} - time: {format_duration(curTime - self.startTime)} |'
                print(f'^_^ | {color}{char}\033[0m | {spinner_text}', end='\r', flush=True)
                time.sleep(self.speed)
def run_unit_test():
    """Log a set of sample data structures through `log_info`.

    Builds dicts, lists, a set and small objects in several nested
    combinations and logs each one, as a manual smoke test of the formatting.
    """
    # Example usage:
    class Person:
        def __init__(self, name, age, city):
            self.name = name
            self.age = age
            self.city = city

    # Create instances of Person
    person1 = Person('John', 30, 'New York')
    person2 = Person('Alice', 25, 'Los Angeles')
    person3 = Person('Bob', 30, 'Hong Kong')
    person4 = Person('Charlie', 35, 'Shanghai')
    person5 = Person('David', 40, 'Beijing')

    # Define other data structures
    data_dict = {'Name': 'John', 'Age': 30, 'City': 'New York'}
    data_list = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
    data_set = {'apple', 'banana', 'orange'}
    data_dict_of_lists = {'Name': ['John', 'Alice'], 'Age': [30, 25], 'City': ['New York', 'Los Angeles']}
    data_list_of_dicts = [person1.__dict__, person2.__dict__]
    data_dict_of_dicts = {'Person1': person1.__dict__, 'Person2': person2.__dict__}
    list_of_objs = [person3, person4, person5]
    data_list_of_lists = [['John', 30, 'New York'], ['Alice', 25, 'Los Angeles']]
    dict_of_objs = {'Alice': person3, 'Bob': person4, 'Charlie': person5}
    dict_of_list_of_objs = {'Group1': [person3, person4], 'Group2': [person5]}

    # Combine all data into a complex structure (built but not logged)
    complex_data = {
        'data_dict': data_dict,
        'data_list': data_list,
        'data_set': data_set,
        'data_dict_of_lists': data_dict_of_lists,
        'data_list_of_dicts': data_list_of_dicts,
        'data_dict_of_dicts': data_dict_of_dicts,
        'list_of_objs': list_of_objs,
        'data_list_of_lists': data_list_of_lists,
        'dict_of_objs': dict_of_objs,
        'dict_of_list_of_objs': dict_of_list_of_objs,
    }

    # Log each unique data structure, in a fixed order
    samples = [
        ("Data Dictionary: {}", data_dict),
        ("Data List: {}", data_list),
        ("Data Set: {}", data_set),
        ("Data Dictionary of Lists: {}", data_dict_of_lists),
        ("Data List of Dicts: {}", data_list_of_dicts),
        ("Data Dictionary of Dicts: {}", data_dict_of_dicts),
        ("List of Objects: {}", list_of_objs),
        ("Data List of Lists: {}", data_list_of_lists),
        ("Dictionary of Objects: {}", dict_of_objs),
        ("Dictionary of List of Objects: {}", dict_of_list_of_objs),
    ]
    for template, payload in samples:
        log_info(template, payload)
#run_unit_test() | |