DeepSeekCoderChat / mew_log /profile_utils.py
mew77's picture
Update mew_log/profile_utils.py
100d5c1 verified
raw
history blame
6.11 kB
import time
from .ansi_utils import ansi_color_str
from .attr_utils import get_prev_caller_info
from .log_utils import log_info, LogSpinner
from .table_utils import format_table
# NOTE(review): not referenced anywhere in the visible part of this module —
# confirm whether anything still imports it before removing.
time_color = ''
# helper funcs
class ProfileSection:
    """Wall-clock timer for a named section of code, aggregated per name.

    Statistics (elapsed, total and average time, number of calls) are kept in
    the class-level ``Registry`` keyed by the section name, so several
    ``ProfileSection`` objects created with the same name accumulate into one
    report.

    A section can be driven in three ways:

    * explicitly, with ``start()`` / ``stop()``;
    * as a context manager (``with ProfileSection("name", enable=False):``);
    * automatically, with ``enable=True``: timing starts in ``__init__`` and
      is stopped when the object is garbage collected.

    ``stop()`` only records an interval when the section is actually running,
    so combining the styles (for example an ``enable=True`` section that is
    also used in a ``with`` block) records each interval once.
    """

    Registry = {}  # Class variable: section name -> ProfileSection with the aggregated stats

    def __init__(self, newMsg=None, enable=True, do_print=False):
        """Create a section and count one call for it.

        Args:
            newMsg: Name of the section. When falsy, the name is taken from
                ``get_prev_caller_info()`` (presumably a description of the
                calling code -- confirm against ``attr_utils``).
            enable: When true, start timing immediately and stop automatically
                in ``__del__``.
            do_print: When true, ``stop()`` logs the report via ``log_info``.

        If a section with the same name is already registered, this instance
        shares the registered entry's attribute dictionary. In that case the
        ``enable`` and ``do_print`` values of the registered entry win over
        the arguments passed here.
        """
        self.msg = newMsg if newMsg else get_prev_caller_info()
        self.enable = enable
        self.startTime = 0.0
        self.endTime = 0.0
        self.elapsedTime = 0.0
        self.totalTime = 0.0
        self.avgTime = 0.0
        self.numCalls = 0
        self.do_print = do_print
        self._running = False  # True between start() and the matching stop()

        if self.msg in ProfileSection.Registry:
            # Alias the registered entry's state so updates made through this
            # object accumulate directly into the registry.
            self.__dict__ = ProfileSection.Registry[self.msg].__dict__
            self.__dict__.setdefault('_running', False)
        self.numCalls += 1

        if self.enable:
            self.start()

    def __del__(self):
        # getattr: __del__ also runs for objects whose __init__ raised before
        # the attributes were assigned. Only a still-running section is
        # stopped, so an interval already recorded by stop()/__exit__ is not
        # added a second time.
        if getattr(self, 'enable', False) and getattr(self, '_running', False):
            self.stop()

    def __enter__(self):
        """Start (or restart) timing and return the section itself."""
        self.start()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Stop timing; exceptions raised inside the block are not suppressed."""
        self.stop()

    def start(self):
        """Record the current time as the start of a new interval."""
        self.startTime = time.time()
        self._running = True

    def stop(self):
        """Finish the current interval and publish the stats to ``Registry``.

        Calling ``stop()`` while the section is not running records nothing.

        Returns:
            The formatted report string for this section.
        """
        if not getattr(self, '_running', False):
            return str(self)

        self._running = False
        self.endTime = time.time()
        self.update_stats()
        stopMsg = str(self)

        registry = ProfileSection.Registry
        entry = registry.get(self.msg)
        if entry is None:
            # A bare instance is enough: every attribute is overwritten by the
            # copy below, so running __init__ would only do throwaway work.
            entry = ProfileSection.__new__(ProfileSection)
            registry[self.msg] = entry
        if entry.__dict__ is not self.__dict__:
            entry.__dict__.update(self.__dict__)

        if self.do_print:
            log_info(str(entry))
        return stopMsg

    def update_stats(self):
        """Fold the interval ``endTime - startTime`` into total and average."""
        self.elapsedTime = self.endTime - self.startTime
        self.totalTime += self.elapsedTime
        self.avgTime = self.totalTime / float(self.numCalls)

    def __str__(self, use_color=True):
        """Return a one-line report; ``use_color`` wraps the values in ANSI colors."""
        msg_str = self.msg
        elapsed_time_str = make_time_str('elapsed', self.elapsedTime)
        total_time_str = make_time_str('total', self.totalTime)
        avg_time_str = make_time_str('avg', self.avgTime)
        calls_str = f'calls={self.numCalls}'

        if use_color:
            # Timings in bright cyan, call count in yellow.
            elapsed_time_str = ansi_color_str(elapsed_time_str, fg='bright_cyan')
            total_time_str = ansi_color_str(total_time_str, fg='bright_cyan')
            avg_time_str = ansi_color_str(avg_time_str, fg='bright_cyan')
            calls_str = ansi_color_str(calls_str, fg='yellow')

        return f"{msg_str} ~ Elapsed Time: {elapsed_time_str} | Total Time: {total_time_str} | Avg Time: {avg_time_str} | {calls_str}"
from functools import wraps
from threading import Thread
from rich.progress import Progress, BarColumn, TextColumn, TimeRemainingColumn, TaskProgressColumn
from rich.console import Console
from rich.style import Style
from rich.panel import Panel
def profile_function(func):
    """
    Decorator that times every call of ``func`` and prints a report panel.

    Each call runs inside a ``LogSpinner`` labelled with the function name
    and is timed with a ``ProfileSection`` registered under that same name,
    so repeated calls accumulate total/average time and call count. After the
    call returns, a rich ``Panel`` with the timings is printed to a new
    ``Console``. If ``func`` raises, the exception propagates and no panel is
    printed.

    Args:
        func: The callable to wrap.

    Returns:
        The wrapped callable; it returns whatever ``func`` returns.
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        # enable=False: the ``with`` block below is the only thing that starts
        # and stops the timer. With enable=True the section is stopped a
        # second time when it is garbage collected, which adds the interval
        # to the registered total twice.
        profile_section = ProfileSection(func.__name__, enable=False, do_print=False)
        console = Console()

        with LogSpinner(func.__name__):
            with profile_section:
                result = func(*args, **kwargs)

        # Display timing information using rich
        panel = Panel.fit(
            f"[bold green]{profile_section.msg}[/bold green]\n"
            f"[cyan]Elapsed Time:[/cyan] {make_time_str('elapsed', profile_section.elapsedTime)}\n"
            f"[cyan]Total Time:[/cyan] {make_time_str('total', profile_section.totalTime)}\n"
            f"[cyan]Avg Time:[/cyan] {make_time_str('avg', profile_section.avgTime)}\n"
            f"[yellow]Calls:[/yellow] {profile_section.numCalls}",
            title="Profile Report",
            border_style="bright_blue"
        )
        console.print(panel)
        return result

    return wrapper
def make_time_str(msg, value):
    """Format a duration given in seconds as ``"<msg>=<number> <unit>"``.

    The unit is chosen from the magnitude of ``value``:

    * ``value >= 60``   -> minutes (``min``)
    * ``value < 0.01``  -> milliseconds (``ms``)
    * otherwise         -> seconds (``s``)

    The number is always rendered with two decimals, e.g.
    ``make_time_str('total', 120)`` gives ``'total=2.00 min'``.

    Args:
        msg: Label placed before the ``=`` sign.
        value: Duration in seconds.

    Returns:
        The formatted string.
    """
    if value >= 60:
        scaled, time_unit = value / 60, 'min'
    elif value < 0.01:
        scaled, time_unit = value * 1000, 'ms'
    else:
        scaled, time_unit = value, 's'
    # A previous version converted whole numbers to int before formatting,
    # but the ``.2f`` spec applied to that int as well, so it never changed
    # the output; the value is formatted directly instead.
    return f"{msg}={scaled:.2f} {time_unit}"
def create_or_get_profile(key, enable=False, do_print=False):
    """Return the registered ``ProfileSection`` for ``key``, creating it if needed.

    ``enable`` and ``do_print`` are only used when a new section has to be
    constructed; an already registered section is returned unchanged.
    """
    known_sections = ProfileSection.Registry
    if key in known_sections:
        return known_sections[key]

    known_sections[key] = ProfileSection(key, enable, do_print)
    return known_sections[key]
def profile_start(msg, enable=True, do_print=False):
    """Start (or restart) timing the registered section named ``msg``.

    The section is created on first use. ``enable`` and ``do_print`` are
    passed to ``create_or_get_profile`` and therefore only take effect when
    the section is created. Every later ``profile_start`` for the same name
    counts as one more call and restarts the timer, so a following
    ``profile_stop`` measures the interval since this call.

    Args:
        msg: Name of the section.
        enable: Forwarded to the ``ProfileSection`` constructor on creation.
        do_print: Forwarded to the ``ProfileSection`` constructor on creation.
    """
    already_registered = msg in ProfileSection.Registry
    p = create_or_get_profile(msg, enable, do_print)
    if already_registered:
        # The constructor counts the first call; reuse has to be counted here.
        p.numCalls += 1
    # Always (re)start: a section created with enable=True has just started
    # itself, so this only refreshes its start time, while a reused section
    # would otherwise keep the start time of an earlier interval.
    p.start()
def profile_stop(msg):
    """Stop the registered section named ``msg``.

    Does nothing when no section with that name is registered.

    Args:
        msg: Name of the section, as passed to ``profile_start``.
    """
    # Look the section up by ``msg``; the earlier check used an undefined
    # name and raised NameError on every call.
    section = ProfileSection.Registry.get(msg)
    if section is not None:
        section.stop()
def get_profile_registry():
    """Return the class-level registry dict of ``ProfileSection`` (the live object, not a copy)."""
    registry = ProfileSection.Registry
    return registry
from loguru import logger
def get_profile_reports():
    """Return all registered sections, largest ``totalTime`` first.

    Sections with equal ``totalTime`` are ordered by ``avgTime``, also
    descending.
    """
    def ranking(section):
        return (section.totalTime, section.avgTime)

    return sorted(ProfileSection.Registry.values(), key=ranking, reverse=True)
def log_profile_registry(use_color=True):
    """Print the text produced by ``format_profile_registry`` to standard output.

    ``use_color`` is passed through to ``format_profile_registry``.
    """
    print(format_profile_registry(use_color=use_color))
def allow_curly_braces(original_string):
    """Return ``original_string`` with every ``{`` and ``}`` doubled.

    Example: ``"a{b}"`` becomes ``"a{{b}}"``.
    """
    doubling_table = str.maketrans({"{": "{{", "}": "}}"})
    return original_string.translate(doubling_table)
def format_profile_registry(use_color=True):
    """Build the multi-line text report for every registered section.

    The report starts with a ``=== Profile Reports ===`` header line, lists
    one section per line in the order given by ``get_profile_reports()``, and
    ends with a ``===>_<===`` footer line.

    Args:
        use_color: Forwarded to each section's ``__str__``; pass ``False``
            to get plain text without ANSI color codes.

    Returns:
        The report as a single string.
    """
    out_str = ['=== Profile Reports ===\n']
    # str(report) would always use the default use_color=True, so the dunder
    # is called explicitly to honor the caller's choice.
    out_str.extend(
        report.__str__(use_color=use_color) + '\n'
        for report in get_profile_reports()
    )
    out_str.append('===>_<===\n')
    return ''.join(out_str)
# import random
# def do_this(y):
# p = ProfileSection("do_this", True) #only way to use the auto destruct method
# x = random.randint(0, (y+1)*2)
# print(f'do_this: {x} - {y}')
# for index in range(1000):
# do_this(index)
# log_profile_registry()