DeepSeekCoderChat / mew_log /log_utils.py
mew77's picture
Update mew_log/log_utils.py
d7870cb verified
raw
history blame
17.7 kB
# Standard Library Imports
import datetime
import threading
import time
import traceback
from pathlib import Path
import enum
# Third-Party Imports
from loguru import logger
logger.add("./data/log/file_{time}.log", rotation="500 MB") # Automatically rotate too big log files
# Local Imports
from .ansi_utils import ansi_color_str, ansi_link_str
from .attr_utils import get_prev_caller_info, get_caller_info, get_thread_info, get_prev_frame, get_prev_frame_from_frame, is_of_type
from .table_utils import format_property, format_table
from .extended_symbols import ExtendedSymbols, draw_box, draw_arrow, format_arrow
#from ..mew_log.mew_context import MewContext, resolve_context
# Constants
lastLog = None # Stores the most recent log entry
debug_log = [] # Stores all log entries
startTime=time.time()
#settings vars
enable_log = True
enable_log_to_file = False
log_filename = "log"
log_file_path = "data/log/log.txt"
enable_fancy = True
enable_use_color = True
type_color='yellow'
value_color='bright_yellow'
arrow_color = 'bright_white'
def allow_curly_braces(original_string):
if "{" in original_string or "}" in original_string:
escaped_string = original_string.replace("{", "{{").replace("}", "}}")
#print("Escaped String:", escaped_string) # Debug output
return escaped_string
return original_string
def format_arg(arg, use_color, fancy):
def is_complex_type(obj):
return is_of_type(obj, (list, set, dict)) or hasattr(obj, '__dict__')
type_str = f"<{type(arg).__name__}>"
value_str = repr(arg) if not isinstance(arg, str) and not fancy else format_table(arg, use_color=use_color)
newline_if_needed = '\n' if is_complex_type(arg) else ""
formatted_arg = f"{type_str}:{newline_if_needed}{value_str}"
formatted_arg = allow_curly_braces(formatted_arg)
return ansi_color_str(formatted_arg, fg=value_color) if use_color else formatted_arg
def format_kwarg(kw, arg, use_color, fancy):
name_str = ansi_color_str(kw, fg=kw_color) if use_color else kw
arg_str = format_arg(arg, use_color, fancy)
formatted_arg = f"{name_str}: {arg_str}"
return ansi_color_str(formatted_arg, fg=value_color) if use_color else formatted_arg
def format_args(use_color=enable_use_color, fancy=enable_fancy, *args, **kwargs):
formatted_args = [ format_arg(arg, use_color, fancy) for arg in args]
formatted_kwargs = {k: format_kwarg(k, v, use_color, fancy) for k, v in kwargs.items()}
return formatted_args, formatted_kwargs
def make_formatted_msg(msg, *args, **kwargs):
#fetch format vars from kwargs
fancy = kwargs.pop('fancy', enable_fancy)
use_color = kwargs.pop('use_color', enable_use_color)
context_depth = kwargs.pop('context_depth', None)
context = kwargs.pop('context', None)
# print(f"args={args}")
# print(f"kwargs={kwargs}")
formatted_args, formatted_kwargs = format_args(use_color, fancy, *args, **kwargs)
msgf = stringf(msg, *formatted_args, **formatted_kwargs)
#context = MewContext.resolve_context(context, depth, steps=5)
if fancy:
formatted_msg = f"\n ~ | {get_timestamp()} | {get_thread_info(get_prev_caller_info(msgf, use_color=use_color, steps=2))}"
else:
formatted_msg = f"\n ~ | {get_timestamp()} | {context}:{msgf}"
return formatted_msg
def stringf(s: str, *args, **kwargs):
# s = allow_curly_braces(s) # Uncomment or modify as needed
if not args and not kwargs:
#print("Both args and kwargs are empty.")
return s
else:
return s.format(*args, **kwargs)
# if not args:
# print("Args is empty.")
# if not kwargs:
# print("Kwargs is empty.")
# print(s)
# print(f"args: {args}")
# print(f"kwargs: {kwargs}")
# return s.format(*args, **kwargs)
# No arguments or keyword arguments
# stringf("Hello, World!")
# # With positional arguments
# stringf("Hello, {}!", "World")
# # With keyword arguments
# stringf("Hello, {name}!", name="Alice")
# # With both args and kwargs empty (only prints "Hello, World!")
# stringf("Hello, World!")
def ensure_directory_exists(directory):
path = Path(directory)
path.mkdir(parents=True, exist_ok=True)
def ensure_file_exists(file_path):
path = Path(file_path)
path.touch(exist_ok=True)
#from ..mew_log.mew_log_helper import MewLogHelper
# Logging Configuration
# Configure Loguru's logger to use a custom format
import sys
import io
# Set UTF-8 encoding for stdout and stderr
#sys.stdout = io.TextIOWrapper(sys.stdout.detach(), encoding='utf-8', line_buffering=True)
#sys.stderr = io.TextIOWrapper(sys.stderr.detach(), encoding='utf-8', line_buffering=True)
#logger.add(sys.stdout, format="<green>{time:YYYY-MM-DD HH:mm:ss}</green> | {level} | {message}", backtrace=True)
logger.configure(handlers=[
{
"sink": sys.stdout,
"format": "<green>{time:YYYY-MM-DD HH:mm:ss}</green> | {level} | {message}", # | <cyan>{file}:{line}:{function}</cyan>
"level": "INFO"
},
{
"sink": sys.stdout,
"format": "<green>{time:YYYY-MM-DD HH:mm:ss}</green> | {level} | {message}", # | <yellow>{file}:{line}:{function}</yellow>
"level": "WARNING"
},
{
"sink": sys.stderr,
"format": "<green>{time:YYYY-MM-DD HH:mm:ss}</green> | {level} | {message}", #| <red>{file}:{line}:{function}</red>
"level": "ERROR"
}
])
# Logging Configuration
def init_log(log_config):
global enable_log_to_file, enable_log, log_filename, log_file_path
enable_log_to_file = log_config['log_to_file']
clear_file_first = log_config['clear_file_first']
enable_log = log_config['enable_log']
log_filename = log_config['log_filename']
log_dir = 'data/log/'
ensure_directory_exists(log_dir)
log_file_path = f"data/log/{log_filename}.txt"
ensure_file_exists(log_file_path)
with open(log_file_path, 'w') as fp:
pass
print(f'init_log: \n ~ enable_log: {enable_log} \n ~ enable_log_to_file: {enable_log_to_file} \n ~ log_file_path: {log_file_path}')
def _update_log(formatted_msg_str):
global lastLog, debug_log, enable_log_to_file
lastLog = formatted_msg_str
debug_log.append(lastLog)
if enable_log_to_file:
log_file(lastLog)
def log_file(msg, file_path = None):
global log_file_path
if not enable_log:
return
if file_path is None:
file_path = log_file_path
curTime = time.time()
elapsedTime = curTime - startTime
with open(log_file_path, 'w') as fp:
fp.write(f"\n ~ | time={elapsedTime:.2f}s ~ {msg}")
#logger.add(log_file_path, format="time={elapsedTime:.2f}s ~ {msg}")
def log_info(msg, *args,**kwargs):
if not enable_log:
return
formatted_msg_str = make_formatted_msg(msg, *args,**kwargs)
_update_log(formatted_msg_str)
formatted_log_str = lastLog
print(formatted_log_str)
#logger.info(formatted_log_str)
def log_warning(msg, *args,**kwargs):
if not enable_log:
return
formatted_msg_str = make_formatted_msg(msg, *args,**kwargs)
_update_log(formatted_msg_str)
formatted_log_str = lastLog
#print(formatted_log_str)
logger.warning(formatted_log_str)
def log_error(msg, e, *args,**kwargs):
if not enable_log:
return
formatted_msg_str = f"\n===^_^===\n{make_formatted_msg(msg, *args, **kwargs)}"
formatted_msg_str += f"{trace_error(e)}\n===>_<==="
_update_log(formatted_msg_str)
formatted_log_str = lastLog
#print(formatted_log_str)
logger.error(formatted_log_str)
import functools
import traceback
def log_function(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
def get_func_args():
arg_names = func.__code__.co_varnames[:func.__code__.co_argcount]
func_args = dict(zip(arg_names, args))
func_args.update(kwargs)
try:
arg_names = func.__code__.co_varnames[:func.__code__.co_argcount]
func_args = dict(zip(arg_names, args))
func_args.update(kwargs)
func_args_str = format_table(func_args, headers=None, tablefmt='simple')
#print(get_prev_caller_info(f"({func_args_str} )"))
result = func(*args, **kwargs)
print(get_thread_info(get_prev_caller_info(f"({func_args} ) -> result: {format_arg(result, use_color=True, fancy=True)}")))
return result
except Exception as e:
arg_names = func.__code__.co_varnames[:func.__code__.co_argcount]
func_args = dict(zip(arg_names, args))
func_args.update(kwargs)
logger.error(f"Exception: {str(e)}")
logger.error(f"Format Traceback: {traceback.format_exc()}")
raise # Re-raise the exception after logging
return wrapper
# @log_function
# def dive(a, b):
# return a / b
# dive(2,3)
# dive(12,3)
# dive(14,3)
# dive(21,3)
# dive(21,0)
error_color = 'bright_red'
def format_traceback(tb_entry, depth=0, use_color=enable_use_color):
arrow_str = format_arrow('T', 'double', 3 + 2 * depth)
filename, lineno, funcname, line = tb_entry
file_path = f"file:///{filename}"
file_link = f"{filename}::{lineno}::{funcname}"
trace_link = ansi_link_str(file_path, file_link, link_color="bright_magenta") if use_color else file_link
arrow_str2 = format_arrow('L', 'double', 3 + 2 * (depth+1))
#(" " * (3 + 2 * depth))+
formatted_tb_str = f'\n ~ | {arrow_str}trace({depth}): {trace_link}\n ~ | {arrow_str2} Line: "{line}"'
return ansi_color_str(formatted_tb_str, fg=error_color) if use_color else formatted_tb_str
def trace_error(e, use_color=True, fancy=True):
exception_str = f"\n ~ | {ExtendedSymbols.DOUBLE_VERTICAL}Exception: {repr(e)}"
out_str = ansi_color_str(exception_str, fg=error_color) if use_color else exception_str
if isinstance(e, BaseException):
arrow_str = format_arrow('T', 'double', 3)
traceback_obj = e.__traceback__
if traceback_obj:
file_path = f"file:///{traceback_obj.tb_frame.f_code.co_filename}"
file_link = f"{traceback_obj.tb_frame.f_code.co_filename}::{traceback_obj.tb_lineno}"
trace_link = ansi_link_str(file_path, file_link, link_color="bright_magenta") if use_color else file_link
formatted_tb_str = f"\n ~ | {arrow_str}trace(0): {trace_link}"
out_str += ansi_color_str(formatted_tb_str, fg=error_color) if use_color else formatted_tb_str
tb = traceback.extract_tb(traceback_obj)
for i, tb_entry in enumerate(tb):
out_str += format_traceback(tb_entry, depth=i+1, use_color=use_color)
return ansi_color_str(out_str, fg=error_color) if use_color else out_str
# log_info(' ~ | test log_info: {}', 1)
# log_warning(' ~ | test log_warning: {}', 2)
def log_text_input(key: str):
log_info(f'\n ~ | Text Input[{key}]: ')
input_text = input()
log_info(f'\n ~ | Recv input: "{input_text}"')
return input_text
def format_duration(seconds):
hours, remainder = divmod(seconds, 3600)
minutes, seconds = divmod(remainder, 60)
return f"{int(hours)}h {int(minutes)}m {int(seconds)}s"
def parse_duration(duration_str):
parts = duration_str.split()
hours = int(parts[0][:-1]) if 'h' in parts[0] else 0
minutes = int(parts[1][:-1]) if 'm' in parts[1] else 0
seconds = int(parts[2][:-1]) if 's' in parts[2] else 0
return hours * 3600 + minutes * 60 + seconds
def get_timestamp():
return datetime.datetime.now().strftime("%Y-%b-%d %H:%M:%S")
def format_path(directory, depth=0, max_depth=3, use_color=True):
"""Formats the directory structure as a string up to a specified depth, with optional color.
Args:
directory (str or Path): The directory path to format.
depth (int): The current depth in the directory structure.
max_depth (int): The maximum depth to format.
use_color (bool): Whether to apply color to the output.
Returns:
str: The formatted directory structure.
"""
if max_depth is not None and depth > max_depth:
return ""
directory_path = Path(directory)
if not directory_path.is_dir():
return "The specified path is not a directory.\n"
# Build the formatted string, applying color if requested
line_prefix = "│ " * depth + "├── "
directory_name = directory_path.name
if use_color:
line_prefix = ansi_color_str(line_prefix, fg='cyan') # Assuming color_str function exists
directory_name = ansi_color_str(directory_name, fg='green') # Adjust colors as needed
formatted_str = line_prefix + directory_name + "\n"
# Sort directory contents for consistent order
sorted_items = sorted(directory_path.iterdir(), key=lambda x: (x.is_file(), x.name))
for item in sorted_items:
if item.is_dir():
# Recursive call for directories, increasing the depth
formatted_str += format_path(item, depth + 1, max_depth, use_color)
else:
# Include file name with indentation, applying color if requested
file_line = "│ " * (depth + 1) + "├── " + item.name
if use_color:
file_line = ansi_color_str(file_line, fg='white') # Example color, adjust as needed
formatted_str += file_line + "\n"
return formatted_str
# Spinner Class
class LogSpinner:
def __init__(self, message="", rainbow=True, anim_speed=0.1):
self.message = message
self.speed = anim_speed
self.colors = ['\033[31m', '\033[33m', '\033[32m', '\033[34m', '\033[35m', '\033[36m']
self.spinner = ['â ‡', 'â ‹', 'â ™', 'â ¸', 'â ¼', 'â ´', 'â ¦', 'â §']
#self.spinner = ['|', '/', '-', '\\']
self.rainbow = rainbow
def __enter__(self):
self.stop_spinner = False
self.startTime = time.time()
# spinner_text = f'^_^ | {self.message} - time: {0.0:.2f}s | Begin!'
# print(spinner_text)
self.spinner_thread = threading.Thread(target=self.spin)
self.spinner_thread.start()
return self
def __exit__(self, exc_type, exc_value, traceback):
self.stop_spinner = True
self.spinner_thread.join()
curTime = time.time()
spinner_text = f'>_< | {self.message} - time: {format_duration(curTime - self.startTime)} | Done!'
print(spinner_text)
def spin(self):
while not self.stop_spinner:
#self.update_spinner()
if self.rainbow: self.update_color_spinner()
else: self.update_spinner()
def update_spinner(self):
for char in self.spinner:
curTime = time.time()
spinner_text = f'{self.message} - time: {format_duration(curTime - self.startTime)} |'
print(f'\r ^_^ | {char} | {spinner_text}', end='\r', flush=True)
time.sleep(self.speed)
def update_color_spinner(self):
for color in self.colors:
for char in self.spinner:
curTime = time.time()
spinner_text = f'{self.message} - time: {format_duration(curTime - self.startTime)} |'
print(f'^_^ | {color}{char}\033[0m | {spinner_text}', end='\r', flush=True)
time.sleep(self.speed)
def run_unit_test():
# Example usage:
class Person:
def __init__(self, name, age, city):
self.name = name
self.age = age
self.city = city
# Create instances of Person
person1 = Person('John', 30, 'New York')
person2 = Person('Alice', 25, 'Los Angeles')
person3 = Person('Bob', 30, 'Hong Kong')
person4 = Person('Charlie', 35, 'Shanghai')
person5 = Person('David', 40, 'Beijing')
# Define other data structures
data_dict = {'Name': 'John', 'Age': 30, 'City': 'New York'}
data_list = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
data_set = {'apple', 'banana', 'orange'}
data_dict_of_lists = {'Name': ['John', 'Alice'], 'Age': [30, 25], 'City': ['New York', 'Los Angeles']}
data_list_of_dicts = [person1.__dict__, person2.__dict__]
data_dict_of_dicts = {'Person1': person1.__dict__, 'Person2': person2.__dict__}
list_of_objs = [person3, person4, person5]
data_list_of_lists = [['John', 30, 'New York'], ['Alice', 25, 'Los Angeles']]
dict_of_objs = {'Alice': person3, 'Bob': person4, 'Charlie': person5}
dict_of_list_of_objs = {'Group1': [person3, person4], 'Group2': [person5]}
# Combine all data into a complex structure
complex_data = {
'data_dict': data_dict,
'data_list': data_list,
'data_set': data_set,
'data_dict_of_lists': data_dict_of_lists,
'data_list_of_dicts': data_list_of_dicts,
'data_dict_of_dicts': data_dict_of_dicts,
'list_of_objs': list_of_objs,
'data_list_of_lists': data_list_of_lists,
'dict_of_objs': dict_of_objs,
'dict_of_list_of_objs': dict_of_list_of_objs
}
# Log each unique data structure
log_info("Data Dictionary: {}", data_dict)
log_info("Data List: {}", data_list)
log_info("Data Set: {}", data_set)
log_info("Data Dictionary of Lists: {}", data_dict_of_lists)
log_info("Data List of Dicts: {}", data_list_of_dicts)
log_info("Data Dictionary of Dicts: {}", data_dict_of_dicts)
log_info("List of Objects: {}", list_of_objs)
log_info("Data List of Lists: {}", data_list_of_lists)
log_info("Dictionary of Objects: {}", dict_of_objs)
log_info("Dictionary of List of Objects: {}", dict_of_list_of_objs)
#run_unit_test()