Spaces:
Running
Running
from .ansi_utils import ansi_link_str, ansi_color_str | |
def get_type_name(attr):
    """Return the name of the runtime type of ``attr`` (for example ``'int'``)."""
    attr_type = type(attr)
    return attr_type.__name__
def is_complex_type(obj):
    """Report whether ``obj`` is a container or carries its own attributes.

    Args:
        obj: Any object.

    Returns:
        True when ``obj`` is a ``list``, ``set`` or ``dict`` instance, or when
        it exposes a ``__dict__`` attribute; False otherwise.
    """
    # The check must look at the ``obj`` parameter; a plain ``isinstance`` is
    # used so the function does not depend on any other helper.
    return isinstance(obj, (list, set, dict)) or hasattr(obj, '__dict__')
def is_bytes_like_type(obj, bytes_like_types=(memoryview, bytes, bytearray), raise_err=False):
    """Report whether ``obj`` is an instance of one of ``bytes_like_types``.

    Args:
        obj: Object to test.
        bytes_like_types: Type or tuple of types accepted as bytes-like.
        raise_err: When True, a failed check raises instead of returning False.

    Returns:
        True when ``obj`` matches; False when it does not and ``raise_err`` is
        False (this includes the case where ``bytes_like_types`` is not a
        valid ``isinstance`` target).

    Raises:
        TypeError: If ``raise_err`` is True and the check fails.
    """
    mismatch_message = f"Provided object does not match the provided types: {bytes_like_types}"
    try:
        matches = isinstance(obj, bytes_like_types)
    except TypeError as err:
        # ``isinstance`` itself rejects an invalid second argument.
        if raise_err:
            raise TypeError(mismatch_message) from err
        return False
    # Honour ``raise_err`` for an ordinary mismatch as well, not only when
    # ``isinstance`` fails.
    if not matches and raise_err:
        raise TypeError(mismatch_message)
    return matches
def is_of_type(obj, types, raise_err=False):
    """Report whether ``obj`` is an instance of ``types``.

    Args:
        obj: Object to test.
        types: A single type or a tuple of types, as accepted by ``isinstance``.
        raise_err: When True, a mismatch raises instead of returning False.

    Returns:
        True on a match, False on a mismatch when ``raise_err`` is False.

    Raises:
        TypeError: If ``raise_err`` is True and ``obj`` does not match, or if
            ``types`` is not a valid ``isinstance`` target.
    """
    if isinstance(obj, types):
        return True
    if raise_err:
        # ``tuple(types)`` fails for a single (non-iterable) type, which would
        # replace the intended message; wrap single types instead.
        expected = types if isinstance(types, tuple) else (types,)
        raise TypeError(f"Provided object does not match the provided types: {expected}")
    return False
def get_partial_argspec(method):
    """Return an ``inspect.FullArgSpec`` describing ``method``, if obtainable.

    ``inspect.getfullargspec`` is tried first. If it raises ``TypeError`` the
    spec is rebuilt from the signature returned by ``get_signature``.

    Args:
        method: Object to inspect.

    Returns:
        An ``inspect.FullArgSpec``, or None when ``method`` is not callable or
        no signature can be obtained.
    """
    if not callable(method):
        return None  # Not a callable object
    try:
        return inspect.getfullargspec(method)
    except TypeError:
        # Fallback to using inspect.signature
        signature = get_signature(method)
    if signature is None:
        return None

    kinds = inspect.Parameter
    args, defaults, kwonlyargs = [], [], []
    kwonlydefaults, annotations = {}, {}
    varargs = varkw = None
    # A Signature has no ``varargs``/``varkw`` attributes; every field is
    # derived from the kind of each parameter.
    for name, param in signature.parameters.items():
        if param.annotation is not param.empty:
            annotations[name] = param.annotation
        if param.kind in (kinds.POSITIONAL_ONLY, kinds.POSITIONAL_OR_KEYWORD):
            args.append(name)
            if param.default is not param.empty:
                defaults.append(param.default)
        elif param.kind is kinds.VAR_POSITIONAL:
            varargs = name
        elif param.kind is kinds.KEYWORD_ONLY:
            kwonlyargs.append(name)
            if param.default is not param.empty:
                kwonlydefaults[name] = param.default
        elif param.kind is kinds.VAR_KEYWORD:
            varkw = name
    if signature.return_annotation is not signature.empty:
        annotations['return'] = signature.return_annotation

    # FullArgSpec takes seven fields; empty defaults are None, matching what
    # ``inspect.getfullargspec`` produces.
    return inspect.FullArgSpec(
        args,
        varargs,
        varkw,
        tuple(defaults) or None,
        kwonlyargs,
        kwonlydefaults or None,
        annotations,
    )
def get_signature(method):
    """Return ``inspect.signature(method)``, or None when it cannot be computed.

    ``TypeError`` and ``ValueError`` raised by ``inspect.signature`` are
    swallowed and reported as None.
    """
    try:
        resolved = inspect.signature(method)
    except (ValueError, TypeError):
        resolved = None
    return resolved
def get_method_args(method):
    """Describe the parameters of ``method`` as a list of strings.

    Positional parameter names are listed as-is. Keyword-only parameters
    follow, written as ``name=default`` when they have a default and as the
    bare name otherwise.

    Args:
        method: Callable to describe.

    Returns:
        A list of strings, or None when no argument spec can be obtained from
        ``get_partial_argspec``.
    """
    full_arg_spec = get_partial_argspec(method)
    if full_arg_spec is None:
        return None

    # Parameter names come straight from the spec; looking them up as
    # attributes of the callable (as before) never finds them.
    described = list(full_arg_spec.args)

    # ``kwonlydefaults`` is a mapping of name -> default (or None when there
    # are no defaults), so it is looked up by key rather than zipped.
    kwonly_defaults = full_arg_spec.kwonlydefaults or {}
    for key in full_arg_spec.kwonlyargs:
        if key in kwonly_defaults:
            described.append(f"{key}={kwonly_defaults[key]}")
        else:
            described.append(key)
    return described
def get_source_info(attr):
    """Return ``"/<relative source path>::<first line>"`` for ``attr``.

    The path is made relative to the current working directory. Any failure
    while locating the source yields the fixed text
    ``"Source info not available!"`` instead of an exception.
    """
    try:
        _, first_line = inspect.getsourcelines(attr)
        relative_path = os.path.relpath(inspect.getsourcefile(attr))
    except Exception:
        # Best-effort lookup: builtins and objects without source land here.
        return "Source info not available!"
    return f"/{relative_path}::{first_line}"
def get_method_info(method):
    """Build a one-entry description of ``method`` for attribute listings.

    The result is the signature text, then a "Source" link produced by
    ``ansi_link_str`` for a ``file://...#L<line>`` URL, then the location
    string from ``get_source_info`` on a following line. When the source file
    cannot be located (for example for builtins) the link is omitted instead
    of raising.

    Args:
        method: Callable to describe.

    Returns:
        The formatted description string.
    """
    signature = get_signature(method)
    # ``str(signature)`` already includes the `` -> annotation`` part, so the
    # return annotation is not appended a second time. A missing signature is
    # rendered as an empty string rather than the text "None".
    signature_str = '' if signature is None else str(signature)
    source_info = get_source_info(method)

    try:
        source_file = inspect.getsourcefile(method)
        first_line = inspect.getsourcelines(method)[1]
    except (OSError, TypeError):
        # No retrievable source: keep the listing alive without a link.
        source_file = None
    if source_file is None:
        return f"{signature_str}\n-->{source_info}"

    # Construct the file:// URL with line number for the method
    method_file_url = f"file://{source_file}#L{first_line}"
    method_link = ansi_link_str(method_file_url, "Source")
    # Include the link in the method signature string
    return f"{signature_str}: {method_link}\n-->{source_info}"
def get_var_value(variable):
    """Return ``str(variable)``, or ``'...'`` for plain containers.

    A value is elided as ``'...'`` only when it is a ``list``, ``set`` or
    ``dict`` instance that has no ``__dict__`` attribute; everything else is
    rendered with ``str``.
    """
    is_plain_container = (
        isinstance(variable, (list, set, dict))
        and not hasattr(variable, '__dict__')
    )
    if is_plain_container:
        return '...'
    return f'{str(variable)}'
def get_variable_info(variable):
    """Return ``"<TypeName>: value"`` for ``variable``.

    The type name comes from ``get_type_name`` and the value text from
    ``get_var_value``.
    """
    type_label = get_type_name(variable)
    value_label = get_var_value(variable)
    return f"<{type_label}>: {value_label}"
def list_class_attributes(cls, verbose=True, use_color=False):
    """Return a text report of the non-dunder attributes of ``cls``.

    ``cls`` may be a class or an instance. Names from ``dir(cls)`` that do not
    start with ``__`` are split into non-callables ("variables") and callables
    ("methods"). With ``verbose`` each entry is followed by
    ``get_variable_info`` / ``get_method_info`` output; otherwise by a short
    ``<TypeName>`` / ``<method>`` tag. With ``use_color`` the entries and the
    class name are passed through ``ansi_color_str``.
    """
    def format_str(s, fg=None):
        if use_color:
            return ansi_color_str(s, fg=fg)
        return s

    # Determine whether cls is a class or an instance of a class
    class_name = cls.__name__ if inspect.isclass(cls) else cls.__class__.__name__

    variables = []
    for attribute in dir(cls):
        if attribute.startswith('__') or callable(getattr(cls, attribute)):
            continue
        # Fetched again for formatting, mirroring the filter-then-format
        # attribute access of the listing.
        member = getattr(cls, attribute)
        if verbose:
            variables.append(f'{attribute}{get_variable_info(member)}')
        else:
            variables.append(f'{attribute}<{get_type_name(member)}>')

    methods = []
    for attribute in dir(cls):
        if attribute.startswith('__') or not callable(getattr(cls, attribute)):
            continue
        if verbose:
            methods.append(f'{attribute}{get_method_info(getattr(cls, attribute))}')
        else:
            methods.append(f'{attribute}<method>')

    variables_str = '\n'.join(f' - {format_str(entry, fg="green")}' for entry in variables)
    methods_str = '\n'.join(f' - {format_str(entry, fg="blue")}' for entry in methods)
    # format_str already returns the plain name when colour is disabled.
    cls_name = format_str(class_name, fg="cyan")
    return f'===list_class_attributes of: {cls_name}:\n===<variables>===\n{variables_str}\n===<methods>===\n{methods_str}'
def get_class_attributes(cls, verbose=True, use_color=True):
    """Collect formatted attribute entries of ``cls`` into a dictionary.

    Returns:
        ``{'variables': {...}, 'methods': {...}}`` keyed by attribute name.
        Variables are the non-callable names in ``vars(cls)`` that neither
        start with ``__`` nor contain ``'stat'``; methods are the callable
        non-dunder names in ``dir(cls)``. Each value is ``'\\n ~ '`` followed
        by the name and, when ``verbose``, the text from
        ``get_variable_info`` / ``get_method_info``. With ``use_color`` the
        name and the info text are passed through ``ansi_color_str``.
    """
    def format_str(s, fg=None):
        if use_color:
            return ansi_color_str(s, fg=fg)
        return s

    variables = {}
    for attribute_v in vars(cls):
        if attribute_v.startswith('__') or 'stat' in attribute_v:
            continue
        member = getattr(cls, attribute_v)
        if callable(member):
            continue
        info = get_variable_info(member) if verbose else ''
        shown_name = format_str(attribute_v, fg="green")
        shown_info = format_str(info, fg="cyan") if verbose else ''
        variables[attribute_v] = f'\n ~ {shown_name}{shown_info}'

    methods = {}
    for attribute in dir(cls):
        if attribute.startswith('__'):
            continue
        member = getattr(cls, attribute)
        if not callable(member):
            continue
        info = get_method_info(member) if verbose else ''
        shown_name = format_str(attribute, fg="blue")
        shown_info = format_str(info, fg="cyan") if verbose else ''
        methods[attribute] = f'\n ~ {shown_name}{shown_info}'

    return {'variables': variables, 'methods': methods}
import threading | |
def get_thread_info(message='', use_color=True):
    """Prefix ``message`` with the current thread's name, ident and liveness.

    The prefix has the form ``thread:<name>::<ident>::<is_alive>`` and is
    passed through ``ansi_color_str`` when ``use_color`` is true; the result
    is ``"<prefix>:<message>"``.
    """
    thread = threading.current_thread()
    name, ident, alive = thread.name, thread.ident, thread.is_alive()
    thread_info = f'thread:{name}::{ident}::{alive}'
    if use_color:
        # Construct the colored thread info
        thread_info = ansi_color_str(thread_info,fg='yellow', bg='bright_yellow')
    return f'{thread_info}:{message}'
import inspect | |
import os | |
# Get the current frame | |
def get_prev_frame(steps=1):
    """Return the frame ``steps`` levels above this function's own frame.

    ``steps=0`` yields this function's frame and ``steps=1`` the caller's.
    None is returned when the call stack is shorter than ``steps``.
    """
    frame = inspect.currentframe()
    # Traverse back the specified number of steps in the call stack
    for _ in range(steps):
        frame = None if frame is None else frame.f_back
        if frame is None:
            return None
    return frame
def get_prev_frame_from_frame(frame, steps=1):
    """Return the frame ``steps`` levels above ``frame``.

    ``steps=0`` returns ``frame`` itself. None is returned when ``frame`` is
    None (for a positive ``steps``) or when the stack ends before ``steps``
    levels have been walked.
    """
    ancestor = frame
    # Traverse back the specified number of steps in the call stack
    for _ in range(steps):
        ancestor = None if ancestor is None else ancestor.f_back
        if ancestor is None:
            return None
    return ancestor
def get_prev_caller_info(message='', use_color=True, steps=99):
    """Describe a frame further up the call stack and append ``message``.

    Starting at the direct caller, the stack is walked outwards until either
    the outermost frame is reached or ``steps`` hops have been made. At least
    one hop is made whenever an outer frame exists, so ``steps`` values below
    1 behave like 1.

    Args:
        message: Text appended after the location.
        use_color: When True the location is passed through ``ansi_link_str``
            together with a ``file:///`` URL; otherwise it is plain text.
        steps: Maximum number of frames to walk outwards from the caller.

    Returns:
        ``"<file name>::<line>::<function>: <message>"``, with the location
        part wrapped by ``ansi_link_str`` when ``use_color`` is true.
    """
    # Get the current frame
    curr_frame = inspect.currentframe()
    caller_frame = None
    try:
        caller_frame = curr_frame.f_back
        while caller_frame.f_back is not None:
            caller_frame = caller_frame.f_back
            steps -= 1
            if steps <= 0:
                break
        # Retrieve file, line and function of the selected frame
        frame_info = inspect.getframeinfo(caller_frame)
        filename_with_path = frame_info.filename
        # Extract only the file name
        filename = os.path.basename(filename_with_path)
        location = f"{filename}::{frame_info.lineno}::{frame_info.function}"
        if use_color:
            caller_link = ansi_link_str(f"file:///{filename_with_path}", location)
        else:
            caller_link = location
        return f"{caller_link}: {message}"
    finally:
        # Drop the frame references even when an error occurs, so no
        # frame <-> local-variable reference cycle is left behind.
        del curr_frame
        del caller_frame
def get_caller_info(message='', use_color=True):
    """Describe the call site of this function and append ``message``.

    Args:
        message: Text appended after the location.
        use_color: When True the location is passed through ``ansi_link_str``
            together with a ``file:///`` URL; otherwise it is plain text.

    Returns:
        ``"<file name>::<line>::<function>: <message>"`` for the direct
        caller, with the location part wrapped by ``ansi_link_str`` when
        ``use_color`` is true.
    """
    # Get the current frame
    curr_frame = inspect.currentframe()
    caller_frame = None
    try:
        # Get the caller's frame
        caller_frame = curr_frame.f_back
        # Retrieve file, line and function of the caller's frame
        frame_info = inspect.getframeinfo(caller_frame)
        filename_with_path = frame_info.filename
        # Extract only the file name
        filename = os.path.basename(filename_with_path)
        location = f"{filename}::{frame_info.lineno}::{frame_info.function}"
        if use_color:
            caller_link = ansi_link_str(f"file:///{filename_with_path}", location)
        else:
            caller_link = location
        return f"{caller_link}: {message}"
    finally:
        # Drop the frame references even when an error occurs, so no
        # frame <-> local-variable reference cycle is left behind.
        del curr_frame
        del caller_frame
def get_class_name(obj):
    """Return the name of ``obj.__class__``."""
    owner = obj.__class__
    return owner.__name__