mew77 commited on
Commit
d7870cb
·
verified ·
1 Parent(s): 9ad12c3

Update mew_log/log_utils.py

Browse files
Files changed (1) hide show
  1. mew_log/log_utils.py +502 -502
mew_log/log_utils.py CHANGED
@@ -1,502 +1,502 @@
1
- # Standard Library Imports
2
- import datetime
3
- import threading
4
- import time
5
- import traceback
6
- from pathlib import Path
7
-
8
- import enum
9
-
10
- # Third-Party Imports
11
- from loguru import logger
12
-
13
- logger.add("./data/log/file_{time}.log", rotation="500 MB") # Automatically rotate too big log files
14
-
15
-
16
- # Local Imports
17
- from ..mew_log.ansi_utils import ansi_color_str, ansi_link_str
18
- from ..mew_log.attr_utils import get_prev_caller_info, get_caller_info, get_thread_info, get_prev_frame, get_prev_frame_from_frame, is_of_type
19
- from ..mew_log.table_utils import format_property, format_table
20
- from ..mew_log.extended_symbols import ExtendedSymbols, draw_box, draw_arrow, format_arrow
21
- #from ..mew_log.mew_context import MewContext, resolve_context
22
-
23
- # Constants
24
- lastLog = None # Stores the most recent log entry
25
- debug_log = [] # Stores all log entries
26
- startTime=time.time()
27
- #settings vars
28
- enable_log = True
29
- enable_log_to_file = False
30
- log_filename = "log"
31
- log_file_path = "data/log/log.txt"
32
- enable_fancy = True
33
- enable_use_color = True
34
-
35
- type_color='yellow'
36
- value_color='bright_yellow'
37
- arrow_color = 'bright_white'
38
-
39
- def allow_curly_braces(original_string):
40
- if "{" in original_string or "}" in original_string:
41
- escaped_string = original_string.replace("{", "{{").replace("}", "}}")
42
- #print("Escaped String:", escaped_string) # Debug output
43
- return escaped_string
44
- return original_string
45
-
46
- def format_arg(arg, use_color, fancy):
47
- def is_complex_type(obj):
48
- return is_of_type(obj, (list, set, dict)) or hasattr(obj, '__dict__')
49
-
50
- type_str = f"<{type(arg).__name__}>"
51
- value_str = repr(arg) if not isinstance(arg, str) and not fancy else format_table(arg, use_color=use_color)
52
- newline_if_needed = '\n' if is_complex_type(arg) else ""
53
- formatted_arg = f"{type_str}:{newline_if_needed}{value_str}"
54
- formatted_arg = allow_curly_braces(formatted_arg)
55
- return ansi_color_str(formatted_arg, fg=value_color) if use_color else formatted_arg
56
-
57
-
58
- def format_kwarg(kw, arg, use_color, fancy):
59
- name_str = ansi_color_str(kw, fg=kw_color) if use_color else kw
60
- arg_str = format_arg(arg, use_color, fancy)
61
- formatted_arg = f"{name_str}: {arg_str}"
62
- return ansi_color_str(formatted_arg, fg=value_color) if use_color else formatted_arg
63
-
64
- def format_args(use_color=enable_use_color, fancy=enable_fancy, *args, **kwargs):
65
- formatted_args = [ format_arg(arg, use_color, fancy) for arg in args]
66
- formatted_kwargs = {k: format_kwarg(k, v, use_color, fancy) for k, v in kwargs.items()}
67
- return formatted_args, formatted_kwargs
68
-
69
- def make_formatted_msg(msg, *args, **kwargs):
70
- #fetch format vars from kwargs
71
- fancy = kwargs.pop('fancy', enable_fancy)
72
- use_color = kwargs.pop('use_color', enable_use_color)
73
- context_depth = kwargs.pop('context_depth', None)
74
- context = kwargs.pop('context', None)
75
- # print(f"args={args}")
76
- # print(f"kwargs={kwargs}")
77
-
78
- formatted_args, formatted_kwargs = format_args(use_color, fancy, *args, **kwargs)
79
-
80
- msgf = stringf(msg, *formatted_args, **formatted_kwargs)
81
-
82
- #context = MewContext.resolve_context(context, depth, steps=5)
83
- if fancy:
84
- formatted_msg = f"\n ~ | {get_timestamp()} | {get_thread_info(get_prev_caller_info(msgf, use_color=use_color, steps=2))}"
85
- else:
86
- formatted_msg = f"\n ~ | {get_timestamp()} | {context}:{msgf}"
87
-
88
- return formatted_msg
89
-
90
- def stringf(s: str, *args, **kwargs):
91
- # s = allow_curly_braces(s) # Uncomment or modify as needed
92
- if not args and not kwargs:
93
- #print("Both args and kwargs are empty.")
94
- return s
95
- else:
96
- return s.format(*args, **kwargs)
97
- # if not args:
98
- # print("Args is empty.")
99
- # if not kwargs:
100
- # print("Kwargs is empty.")
101
-
102
- # print(s)
103
- # print(f"args: {args}")
104
- # print(f"kwargs: {kwargs}")
105
- # return s.format(*args, **kwargs)
106
-
107
- # No arguments or keyword arguments
108
- # stringf("Hello, World!")
109
-
110
- # # With positional arguments
111
- # stringf("Hello, {}!", "World")
112
-
113
- # # With keyword arguments
114
- # stringf("Hello, {name}!", name="Alice")
115
-
116
- # # With both args and kwargs empty (only prints "Hello, World!")
117
- # stringf("Hello, World!")
118
-
119
- def ensure_directory_exists(directory):
120
- path = Path(directory)
121
- path.mkdir(parents=True, exist_ok=True)
122
-
123
- def ensure_file_exists(file_path):
124
- path = Path(file_path)
125
- path.touch(exist_ok=True)
126
-
127
- #from ..mew_log.mew_log_helper import MewLogHelper
128
-
129
- # Logging Configuration
130
-
131
- # Configure Loguru's logger to use a custom format
132
- import sys
133
- import io
134
-
135
- # Set UTF-8 encoding for stdout and stderr
136
- #sys.stdout = io.TextIOWrapper(sys.stdout.detach(), encoding='utf-8', line_buffering=True)
137
- #sys.stderr = io.TextIOWrapper(sys.stderr.detach(), encoding='utf-8', line_buffering=True)
138
-
139
- #logger.add(sys.stdout, format="<green>{time:YYYY-MM-DD HH:mm:ss}</green> | {level} | {message}", backtrace=True)
140
-
141
- logger.configure(handlers=[
142
- {
143
- "sink": sys.stdout,
144
- "format": "<green>{time:YYYY-MM-DD HH:mm:ss}</green> | {level} | {message}", # | <cyan>{file}:{line}:{function}</cyan>
145
- "level": "INFO"
146
- },
147
- {
148
- "sink": sys.stdout,
149
- "format": "<green>{time:YYYY-MM-DD HH:mm:ss}</green> | {level} | {message}", # | <yellow>{file}:{line}:{function}</yellow>
150
- "level": "WARNING"
151
- },
152
- {
153
- "sink": sys.stderr,
154
- "format": "<green>{time:YYYY-MM-DD HH:mm:ss}</green> | {level} | {message}", #| <red>{file}:{line}:{function}</red>
155
- "level": "ERROR"
156
- }
157
- ])
158
-
159
-
160
- # Logging Configuration
161
- def init_log(log_config):
162
- global enable_log_to_file, enable_log, log_filename, log_file_path
163
- enable_log_to_file = log_config['log_to_file']
164
- clear_file_first = log_config['clear_file_first']
165
- enable_log = log_config['enable_log']
166
- log_filename = log_config['log_filename']
167
- log_dir = 'data/log/'
168
- ensure_directory_exists(log_dir)
169
- log_file_path = f"data/log/{log_filename}.txt"
170
- ensure_file_exists(log_file_path)
171
- with open(log_file_path, 'w') as fp:
172
- pass
173
-
174
-
175
- print(f'init_log: \n ~ enable_log: {enable_log} \n ~ enable_log_to_file: {enable_log_to_file} \n ~ log_file_path: {log_file_path}')
176
-
177
-
178
- def _update_log(formatted_msg_str):
179
- global lastLog, debug_log, enable_log_to_file
180
-
181
- lastLog = formatted_msg_str
182
-
183
- debug_log.append(lastLog)
184
-
185
- if enable_log_to_file:
186
- log_file(lastLog)
187
-
188
-
189
- def log_file(msg, file_path = None):
190
- global log_file_path
191
- if not enable_log:
192
- return
193
- if file_path is None:
194
- file_path = log_file_path
195
-
196
- curTime = time.time()
197
- elapsedTime = curTime - startTime
198
- with open(log_file_path, 'w') as fp:
199
- fp.write(f"\n ~ | time={elapsedTime:.2f}s ~ {msg}")
200
- #logger.add(log_file_path, format="time={elapsedTime:.2f}s ~ {msg}")
201
-
202
-
203
-
204
- def log_info(msg, *args,**kwargs):
205
- if not enable_log:
206
- return
207
-
208
- formatted_msg_str = make_formatted_msg(msg, *args,**kwargs)
209
-
210
- _update_log(formatted_msg_str)
211
-
212
- formatted_log_str = lastLog
213
- print(formatted_log_str)
214
- #logger.info(formatted_log_str)
215
-
216
- def log_warning(msg, *args,**kwargs):
217
- if not enable_log:
218
- return
219
-
220
- formatted_msg_str = make_formatted_msg(msg, *args,**kwargs)
221
-
222
- _update_log(formatted_msg_str)
223
-
224
- formatted_log_str = lastLog
225
-
226
- #print(formatted_log_str)
227
-
228
- logger.warning(formatted_log_str)
229
-
230
- def log_error(msg, e, *args,**kwargs):
231
- if not enable_log:
232
- return
233
-
234
- formatted_msg_str = f"\n===^_^===\n{make_formatted_msg(msg, *args, **kwargs)}"
235
- formatted_msg_str += f"{trace_error(e)}\n===>_<==="
236
-
237
- _update_log(formatted_msg_str)
238
-
239
- formatted_log_str = lastLog
240
-
241
- #print(formatted_log_str)
242
-
243
- logger.error(formatted_log_str)
244
-
245
-
246
- import functools
247
- import traceback
248
-
249
- def log_function(func):
250
- @functools.wraps(func)
251
- def wrapper(*args, **kwargs):
252
- def get_func_args():
253
- arg_names = func.__code__.co_varnames[:func.__code__.co_argcount]
254
- func_args = dict(zip(arg_names, args))
255
- func_args.update(kwargs)
256
-
257
- try:
258
- arg_names = func.__code__.co_varnames[:func.__code__.co_argcount]
259
- func_args = dict(zip(arg_names, args))
260
- func_args.update(kwargs)
261
- func_args_str = format_table(func_args, headers=None, tablefmt='simple')
262
- #print(get_prev_caller_info(f"({func_args_str} )"))
263
- result = func(*args, **kwargs)
264
- print(get_thread_info(get_prev_caller_info(f"({func_args} ) -> result: {format_arg(result, use_color=True, fancy=True)}")))
265
- return result
266
- except Exception as e:
267
- arg_names = func.__code__.co_varnames[:func.__code__.co_argcount]
268
- func_args = dict(zip(arg_names, args))
269
- func_args.update(kwargs)
270
-
271
- logger.error(f"Exception: {str(e)}")
272
- logger.error(f"Format Traceback: {traceback.format_exc()}")
273
- raise # Re-raise the exception after logging
274
- return wrapper
275
-
276
- # @log_function
277
- # def dive(a, b):
278
- # return a / b
279
-
280
- # dive(2,3)
281
-
282
- # dive(12,3)
283
-
284
- # dive(14,3)
285
-
286
- # dive(21,3)
287
-
288
- # dive(21,0)
289
-
290
- error_color = 'bright_red'
291
-
292
- def format_traceback(tb_entry, depth=0, use_color=enable_use_color):
293
- arrow_str = format_arrow('T', 'double', 3 + 2 * depth)
294
-
295
- filename, lineno, funcname, line = tb_entry
296
- file_path = f"file:///{filename}"
297
- file_link = f"{filename}::{lineno}::{funcname}"
298
- trace_link = ansi_link_str(file_path, file_link, link_color="bright_magenta") if use_color else file_link
299
-
300
- arrow_str2 = format_arrow('L', 'double', 3 + 2 * (depth+1))
301
- #(" " * (3 + 2 * depth))+
302
- formatted_tb_str = f'\n ~ | {arrow_str}trace({depth}): {trace_link}\n ~ | {arrow_str2} Line: "{line}"'
303
- return ansi_color_str(formatted_tb_str, fg=error_color) if use_color else formatted_tb_str
304
-
305
- def trace_error(e, use_color=True, fancy=True):
306
- exception_str = f"\n ~ | {ExtendedSymbols.DOUBLE_VERTICAL}Exception: {repr(e)}"
307
- out_str = ansi_color_str(exception_str, fg=error_color) if use_color else exception_str
308
- if isinstance(e, BaseException):
309
- arrow_str = format_arrow('T', 'double', 3)
310
- traceback_obj = e.__traceback__
311
- if traceback_obj:
312
- file_path = f"file:///{traceback_obj.tb_frame.f_code.co_filename}"
313
- file_link = f"{traceback_obj.tb_frame.f_code.co_filename}::{traceback_obj.tb_lineno}"
314
- trace_link = ansi_link_str(file_path, file_link, link_color="bright_magenta") if use_color else file_link
315
- formatted_tb_str = f"\n ~ | {arrow_str}trace(0): {trace_link}"
316
- out_str += ansi_color_str(formatted_tb_str, fg=error_color) if use_color else formatted_tb_str
317
-
318
- tb = traceback.extract_tb(traceback_obj)
319
- for i, tb_entry in enumerate(tb):
320
- out_str += format_traceback(tb_entry, depth=i+1, use_color=use_color)
321
-
322
- return ansi_color_str(out_str, fg=error_color) if use_color else out_str
323
-
324
-
325
- # log_info(' ~ | test log_info: {}', 1)
326
- # log_warning(' ~ | test log_warning: {}', 2)
327
-
328
-
329
- def log_text_input(key: str):
330
- log_info(f'\n ~ | Text Input[{key}]: ')
331
- input_text = input()
332
- log_info(f'\n ~ | Recv input: "{input_text}"')
333
- return input_text
334
-
335
-
336
- def format_duration(seconds):
337
- hours, remainder = divmod(seconds, 3600)
338
- minutes, seconds = divmod(remainder, 60)
339
- return f"{int(hours)}h {int(minutes)}m {int(seconds)}s"
340
-
341
- def parse_duration(duration_str):
342
- parts = duration_str.split()
343
- hours = int(parts[0][:-1]) if 'h' in parts[0] else 0
344
- minutes = int(parts[1][:-1]) if 'm' in parts[1] else 0
345
- seconds = int(parts[2][:-1]) if 's' in parts[2] else 0
346
- return hours * 3600 + minutes * 60 + seconds
347
-
348
-
349
- def get_timestamp():
350
- return datetime.datetime.now().strftime("%Y-%b-%d %H:%M:%S")
351
-
352
-
353
- def format_path(directory, depth=0, max_depth=3, use_color=True):
354
- """Formats the directory structure as a string up to a specified depth, with optional color.
355
-
356
- Args:
357
- directory (str or Path): The directory path to format.
358
- depth (int): The current depth in the directory structure.
359
- max_depth (int): The maximum depth to format.
360
- use_color (bool): Whether to apply color to the output.
361
-
362
- Returns:
363
- str: The formatted directory structure.
364
- """
365
- if max_depth is not None and depth > max_depth:
366
- return ""
367
-
368
- directory_path = Path(directory)
369
- if not directory_path.is_dir():
370
- return "The specified path is not a directory.\n"
371
-
372
- # Build the formatted string, applying color if requested
373
- line_prefix = "│ " * depth + "├── "
374
- directory_name = directory_path.name
375
- if use_color:
376
- line_prefix = ansi_color_str(line_prefix, fg='cyan') # Assuming color_str function exists
377
- directory_name = ansi_color_str(directory_name, fg='green') # Adjust colors as needed
378
-
379
- formatted_str = line_prefix + directory_name + "\n"
380
-
381
- # Sort directory contents for consistent order
382
- sorted_items = sorted(directory_path.iterdir(), key=lambda x: (x.is_file(), x.name))
383
-
384
- for item in sorted_items:
385
- if item.is_dir():
386
- # Recursive call for directories, increasing the depth
387
- formatted_str += format_path(item, depth + 1, max_depth, use_color)
388
- else:
389
- # Include file name with indentation, applying color if requested
390
- file_line = "│ " * (depth + 1) + "├── " + item.name
391
- if use_color:
392
- file_line = ansi_color_str(file_line, fg='white') # Example color, adjust as needed
393
- formatted_str += file_line + "\n"
394
-
395
- return formatted_str
396
-
397
-
398
- # Spinner Class
399
- class LogSpinner:
400
- def __init__(self, message="", rainbow=True, anim_speed=0.1):
401
- self.message = message
402
- self.speed = anim_speed
403
- self.colors = ['\033[31m', '\033[33m', '\033[32m', '\033[34m', '\033[35m', '\033[36m']
404
- self.spinner = ['⠇', '⠋', '⠙', '⠸', '⠼', '⠴', '⠦', '⠧']
405
-
406
- #self.spinner = ['|', '/', '-', '\\']
407
- self.rainbow = rainbow
408
-
409
- def __enter__(self):
410
- self.stop_spinner = False
411
- self.startTime = time.time()
412
- # spinner_text = f'^_^ | {self.message} - time: {0.0:.2f}s | Begin!'
413
- # print(spinner_text)
414
- self.spinner_thread = threading.Thread(target=self.spin)
415
- self.spinner_thread.start()
416
- return self
417
-
418
- def __exit__(self, exc_type, exc_value, traceback):
419
- self.stop_spinner = True
420
- self.spinner_thread.join()
421
-
422
- curTime = time.time()
423
- spinner_text = f'>_< | {self.message} - time: {format_duration(curTime - self.startTime)} | Done!'
424
- print(spinner_text)
425
-
426
- def spin(self):
427
- while not self.stop_spinner:
428
- #self.update_spinner()
429
- if self.rainbow: self.update_color_spinner()
430
- else: self.update_spinner()
431
-
432
- def update_spinner(self):
433
- for char in self.spinner:
434
- curTime = time.time()
435
- spinner_text = f'{self.message} - time: {format_duration(curTime - self.startTime)} |'
436
- print(f'\r ^_^ | {char} | {spinner_text}', end='\r', flush=True)
437
- time.sleep(self.speed)
438
-
439
- def update_color_spinner(self):
440
- for color in self.colors:
441
- for char in self.spinner:
442
- curTime = time.time()
443
- spinner_text = f'{self.message} - time: {format_duration(curTime - self.startTime)} |'
444
- print(f'^_^ | {color}{char}\033[0m | {spinner_text}', end='\r', flush=True)
445
- time.sleep(self.speed)
446
-
447
-
448
- def run_unit_test():
449
- # Example usage:
450
- class Person:
451
- def __init__(self, name, age, city):
452
- self.name = name
453
- self.age = age
454
- self.city = city
455
-
456
- # Create instances of Person
457
- person1 = Person('John', 30, 'New York')
458
- person2 = Person('Alice', 25, 'Los Angeles')
459
- person3 = Person('Bob', 30, 'Hong Kong')
460
- person4 = Person('Charlie', 35, 'Shanghai')
461
- person5 = Person('David', 40, 'Beijing')
462
-
463
- # Define other data structures
464
- data_dict = {'Name': 'John', 'Age': 30, 'City': 'New York'}
465
- data_list = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
466
- data_set = {'apple', 'banana', 'orange'}
467
- data_dict_of_lists = {'Name': ['John', 'Alice'], 'Age': [30, 25], 'City': ['New York', 'Los Angeles']}
468
- data_list_of_dicts = [person1.__dict__, person2.__dict__]
469
- data_dict_of_dicts = {'Person1': person1.__dict__, 'Person2': person2.__dict__}
470
- list_of_objs = [person3, person4, person5]
471
- data_list_of_lists = [['John', 30, 'New York'], ['Alice', 25, 'Los Angeles']]
472
- dict_of_objs = {'Alice': person3, 'Bob': person4, 'Charlie': person5}
473
- dict_of_list_of_objs = {'Group1': [person3, person4], 'Group2': [person5]}
474
-
475
- # Combine all data into a complex structure
476
- complex_data = {
477
- 'data_dict': data_dict,
478
- 'data_list': data_list,
479
- 'data_set': data_set,
480
- 'data_dict_of_lists': data_dict_of_lists,
481
- 'data_list_of_dicts': data_list_of_dicts,
482
- 'data_dict_of_dicts': data_dict_of_dicts,
483
- 'list_of_objs': list_of_objs,
484
- 'data_list_of_lists': data_list_of_lists,
485
- 'dict_of_objs': dict_of_objs,
486
- 'dict_of_list_of_objs': dict_of_list_of_objs
487
- }
488
-
489
- # Log each unique data structure
490
- log_info("Data Dictionary: {}", data_dict)
491
- log_info("Data List: {}", data_list)
492
- log_info("Data Set: {}", data_set)
493
- log_info("Data Dictionary of Lists: {}", data_dict_of_lists)
494
- log_info("Data List of Dicts: {}", data_list_of_dicts)
495
- log_info("Data Dictionary of Dicts: {}", data_dict_of_dicts)
496
- log_info("List of Objects: {}", list_of_objs)
497
- log_info("Data List of Lists: {}", data_list_of_lists)
498
- log_info("Dictionary of Objects: {}", dict_of_objs)
499
- log_info("Dictionary of List of Objects: {}", dict_of_list_of_objs)
500
-
501
-
502
- #run_unit_test()
 
1
+ # Standard Library Imports
2
+ import datetime
3
+ import threading
4
+ import time
5
+ import traceback
6
+ from pathlib import Path
7
+
8
+ import enum
9
+
10
+ # Third-Party Imports
11
+ from loguru import logger
12
+
13
+ logger.add("./data/log/file_{time}.log", rotation="500 MB") # Automatically rotate too big log files
14
+
15
+
16
+ # Local Imports
17
+ from .ansi_utils import ansi_color_str, ansi_link_str
18
+ from .attr_utils import get_prev_caller_info, get_caller_info, get_thread_info, get_prev_frame, get_prev_frame_from_frame, is_of_type
19
+ from .table_utils import format_property, format_table
20
+ from .extended_symbols import ExtendedSymbols, draw_box, draw_arrow, format_arrow
21
+ #from ..mew_log.mew_context import MewContext, resolve_context
22
+
23
+ # Constants
24
+ lastLog = None # Stores the most recent log entry
25
+ debug_log = [] # Stores all log entries
26
+ startTime=time.time()
27
+ #settings vars
28
+ enable_log = True
29
+ enable_log_to_file = False
30
+ log_filename = "log"
31
+ log_file_path = "data/log/log.txt"
32
+ enable_fancy = True
33
+ enable_use_color = True
34
+
35
+ type_color='yellow'
36
+ value_color='bright_yellow'
37
+ arrow_color = 'bright_white'
38
+
39
+ def allow_curly_braces(original_string):
40
+ if "{" in original_string or "}" in original_string:
41
+ escaped_string = original_string.replace("{", "{{").replace("}", "}}")
42
+ #print("Escaped String:", escaped_string) # Debug output
43
+ return escaped_string
44
+ return original_string
45
+
46
+ def format_arg(arg, use_color, fancy):
47
+ def is_complex_type(obj):
48
+ return is_of_type(obj, (list, set, dict)) or hasattr(obj, '__dict__')
49
+
50
+ type_str = f"<{type(arg).__name__}>"
51
+ value_str = repr(arg) if not isinstance(arg, str) and not fancy else format_table(arg, use_color=use_color)
52
+ newline_if_needed = '\n' if is_complex_type(arg) else ""
53
+ formatted_arg = f"{type_str}:{newline_if_needed}{value_str}"
54
+ formatted_arg = allow_curly_braces(formatted_arg)
55
+ return ansi_color_str(formatted_arg, fg=value_color) if use_color else formatted_arg
56
+
57
+
58
+ def format_kwarg(kw, arg, use_color, fancy):
59
+ name_str = ansi_color_str(kw, fg=kw_color) if use_color else kw
60
+ arg_str = format_arg(arg, use_color, fancy)
61
+ formatted_arg = f"{name_str}: {arg_str}"
62
+ return ansi_color_str(formatted_arg, fg=value_color) if use_color else formatted_arg
63
+
64
+ def format_args(use_color=enable_use_color, fancy=enable_fancy, *args, **kwargs):
65
+ formatted_args = [ format_arg(arg, use_color, fancy) for arg in args]
66
+ formatted_kwargs = {k: format_kwarg(k, v, use_color, fancy) for k, v in kwargs.items()}
67
+ return formatted_args, formatted_kwargs
68
+
69
+ def make_formatted_msg(msg, *args, **kwargs):
70
+ #fetch format vars from kwargs
71
+ fancy = kwargs.pop('fancy', enable_fancy)
72
+ use_color = kwargs.pop('use_color', enable_use_color)
73
+ context_depth = kwargs.pop('context_depth', None)
74
+ context = kwargs.pop('context', None)
75
+ # print(f"args={args}")
76
+ # print(f"kwargs={kwargs}")
77
+
78
+ formatted_args, formatted_kwargs = format_args(use_color, fancy, *args, **kwargs)
79
+
80
+ msgf = stringf(msg, *formatted_args, **formatted_kwargs)
81
+
82
+ #context = MewContext.resolve_context(context, depth, steps=5)
83
+ if fancy:
84
+ formatted_msg = f"\n ~ | {get_timestamp()} | {get_thread_info(get_prev_caller_info(msgf, use_color=use_color, steps=2))}"
85
+ else:
86
+ formatted_msg = f"\n ~ | {get_timestamp()} | {context}:{msgf}"
87
+
88
+ return formatted_msg
89
+
90
+ def stringf(s: str, *args, **kwargs):
91
+ # s = allow_curly_braces(s) # Uncomment or modify as needed
92
+ if not args and not kwargs:
93
+ #print("Both args and kwargs are empty.")
94
+ return s
95
+ else:
96
+ return s.format(*args, **kwargs)
97
+ # if not args:
98
+ # print("Args is empty.")
99
+ # if not kwargs:
100
+ # print("Kwargs is empty.")
101
+
102
+ # print(s)
103
+ # print(f"args: {args}")
104
+ # print(f"kwargs: {kwargs}")
105
+ # return s.format(*args, **kwargs)
106
+
107
+ # No arguments or keyword arguments
108
+ # stringf("Hello, World!")
109
+
110
+ # # With positional arguments
111
+ # stringf("Hello, {}!", "World")
112
+
113
+ # # With keyword arguments
114
+ # stringf("Hello, {name}!", name="Alice")
115
+
116
+ # # With both args and kwargs empty (only prints "Hello, World!")
117
+ # stringf("Hello, World!")
118
+
119
+ def ensure_directory_exists(directory):
120
+ path = Path(directory)
121
+ path.mkdir(parents=True, exist_ok=True)
122
+
123
+ def ensure_file_exists(file_path):
124
+ path = Path(file_path)
125
+ path.touch(exist_ok=True)
126
+
127
+ #from ..mew_log.mew_log_helper import MewLogHelper
128
+
129
+ # Logging Configuration
130
+
131
+ # Configure Loguru's logger to use a custom format
132
+ import sys
133
+ import io
134
+
135
+ # Set UTF-8 encoding for stdout and stderr
136
+ #sys.stdout = io.TextIOWrapper(sys.stdout.detach(), encoding='utf-8', line_buffering=True)
137
+ #sys.stderr = io.TextIOWrapper(sys.stderr.detach(), encoding='utf-8', line_buffering=True)
138
+
139
+ #logger.add(sys.stdout, format="<green>{time:YYYY-MM-DD HH:mm:ss}</green> | {level} | {message}", backtrace=True)
140
+
141
+ logger.configure(handlers=[
142
+ {
143
+ "sink": sys.stdout,
144
+ "format": "<green>{time:YYYY-MM-DD HH:mm:ss}</green> | {level} | {message}", # | <cyan>{file}:{line}:{function}</cyan>
145
+ "level": "INFO"
146
+ },
147
+ {
148
+ "sink": sys.stdout,
149
+ "format": "<green>{time:YYYY-MM-DD HH:mm:ss}</green> | {level} | {message}", # | <yellow>{file}:{line}:{function}</yellow>
150
+ "level": "WARNING"
151
+ },
152
+ {
153
+ "sink": sys.stderr,
154
+ "format": "<green>{time:YYYY-MM-DD HH:mm:ss}</green> | {level} | {message}", #| <red>{file}:{line}:{function}</red>
155
+ "level": "ERROR"
156
+ }
157
+ ])
158
+
159
+
160
+ # Logging Configuration
161
+ def init_log(log_config):
162
+ global enable_log_to_file, enable_log, log_filename, log_file_path
163
+ enable_log_to_file = log_config['log_to_file']
164
+ clear_file_first = log_config['clear_file_first']
165
+ enable_log = log_config['enable_log']
166
+ log_filename = log_config['log_filename']
167
+ log_dir = 'data/log/'
168
+ ensure_directory_exists(log_dir)
169
+ log_file_path = f"data/log/{log_filename}.txt"
170
+ ensure_file_exists(log_file_path)
171
+ with open(log_file_path, 'w') as fp:
172
+ pass
173
+
174
+
175
+ print(f'init_log: \n ~ enable_log: {enable_log} \n ~ enable_log_to_file: {enable_log_to_file} \n ~ log_file_path: {log_file_path}')
176
+
177
+
178
+ def _update_log(formatted_msg_str):
179
+ global lastLog, debug_log, enable_log_to_file
180
+
181
+ lastLog = formatted_msg_str
182
+
183
+ debug_log.append(lastLog)
184
+
185
+ if enable_log_to_file:
186
+ log_file(lastLog)
187
+
188
+
189
+ def log_file(msg, file_path = None):
190
+ global log_file_path
191
+ if not enable_log:
192
+ return
193
+ if file_path is None:
194
+ file_path = log_file_path
195
+
196
+ curTime = time.time()
197
+ elapsedTime = curTime - startTime
198
+ with open(log_file_path, 'w') as fp:
199
+ fp.write(f"\n ~ | time={elapsedTime:.2f}s ~ {msg}")
200
+ #logger.add(log_file_path, format="time={elapsedTime:.2f}s ~ {msg}")
201
+
202
+
203
+
204
+ def log_info(msg, *args,**kwargs):
205
+ if not enable_log:
206
+ return
207
+
208
+ formatted_msg_str = make_formatted_msg(msg, *args,**kwargs)
209
+
210
+ _update_log(formatted_msg_str)
211
+
212
+ formatted_log_str = lastLog
213
+ print(formatted_log_str)
214
+ #logger.info(formatted_log_str)
215
+
216
+ def log_warning(msg, *args,**kwargs):
217
+ if not enable_log:
218
+ return
219
+
220
+ formatted_msg_str = make_formatted_msg(msg, *args,**kwargs)
221
+
222
+ _update_log(formatted_msg_str)
223
+
224
+ formatted_log_str = lastLog
225
+
226
+ #print(formatted_log_str)
227
+
228
+ logger.warning(formatted_log_str)
229
+
230
+ def log_error(msg, e, *args,**kwargs):
231
+ if not enable_log:
232
+ return
233
+
234
+ formatted_msg_str = f"\n===^_^===\n{make_formatted_msg(msg, *args, **kwargs)}"
235
+ formatted_msg_str += f"{trace_error(e)}\n===>_<==="
236
+
237
+ _update_log(formatted_msg_str)
238
+
239
+ formatted_log_str = lastLog
240
+
241
+ #print(formatted_log_str)
242
+
243
+ logger.error(formatted_log_str)
244
+
245
+
246
+ import functools
247
+ import traceback
248
+
249
+ def log_function(func):
250
+ @functools.wraps(func)
251
+ def wrapper(*args, **kwargs):
252
+ def get_func_args():
253
+ arg_names = func.__code__.co_varnames[:func.__code__.co_argcount]
254
+ func_args = dict(zip(arg_names, args))
255
+ func_args.update(kwargs)
256
+
257
+ try:
258
+ arg_names = func.__code__.co_varnames[:func.__code__.co_argcount]
259
+ func_args = dict(zip(arg_names, args))
260
+ func_args.update(kwargs)
261
+ func_args_str = format_table(func_args, headers=None, tablefmt='simple')
262
+ #print(get_prev_caller_info(f"({func_args_str} )"))
263
+ result = func(*args, **kwargs)
264
+ print(get_thread_info(get_prev_caller_info(f"({func_args} ) -> result: {format_arg(result, use_color=True, fancy=True)}")))
265
+ return result
266
+ except Exception as e:
267
+ arg_names = func.__code__.co_varnames[:func.__code__.co_argcount]
268
+ func_args = dict(zip(arg_names, args))
269
+ func_args.update(kwargs)
270
+
271
+ logger.error(f"Exception: {str(e)}")
272
+ logger.error(f"Format Traceback: {traceback.format_exc()}")
273
+ raise # Re-raise the exception after logging
274
+ return wrapper
275
+
276
+ # @log_function
277
+ # def dive(a, b):
278
+ # return a / b
279
+
280
+ # dive(2,3)
281
+
282
+ # dive(12,3)
283
+
284
+ # dive(14,3)
285
+
286
+ # dive(21,3)
287
+
288
+ # dive(21,0)
289
+
290
+ error_color = 'bright_red'
291
+
292
+ def format_traceback(tb_entry, depth=0, use_color=enable_use_color):
293
+ arrow_str = format_arrow('T', 'double', 3 + 2 * depth)
294
+
295
+ filename, lineno, funcname, line = tb_entry
296
+ file_path = f"file:///{filename}"
297
+ file_link = f"{filename}::{lineno}::{funcname}"
298
+ trace_link = ansi_link_str(file_path, file_link, link_color="bright_magenta") if use_color else file_link
299
+
300
+ arrow_str2 = format_arrow('L', 'double', 3 + 2 * (depth+1))
301
+ #(" " * (3 + 2 * depth))+
302
+ formatted_tb_str = f'\n ~ | {arrow_str}trace({depth}): {trace_link}\n ~ | {arrow_str2} Line: "{line}"'
303
+ return ansi_color_str(formatted_tb_str, fg=error_color) if use_color else formatted_tb_str
304
+
305
+ def trace_error(e, use_color=True, fancy=True):
306
+ exception_str = f"\n ~ | {ExtendedSymbols.DOUBLE_VERTICAL}Exception: {repr(e)}"
307
+ out_str = ansi_color_str(exception_str, fg=error_color) if use_color else exception_str
308
+ if isinstance(e, BaseException):
309
+ arrow_str = format_arrow('T', 'double', 3)
310
+ traceback_obj = e.__traceback__
311
+ if traceback_obj:
312
+ file_path = f"file:///{traceback_obj.tb_frame.f_code.co_filename}"
313
+ file_link = f"{traceback_obj.tb_frame.f_code.co_filename}::{traceback_obj.tb_lineno}"
314
+ trace_link = ansi_link_str(file_path, file_link, link_color="bright_magenta") if use_color else file_link
315
+ formatted_tb_str = f"\n ~ | {arrow_str}trace(0): {trace_link}"
316
+ out_str += ansi_color_str(formatted_tb_str, fg=error_color) if use_color else formatted_tb_str
317
+
318
+ tb = traceback.extract_tb(traceback_obj)
319
+ for i, tb_entry in enumerate(tb):
320
+ out_str += format_traceback(tb_entry, depth=i+1, use_color=use_color)
321
+
322
+ return ansi_color_str(out_str, fg=error_color) if use_color else out_str
323
+
324
+
325
+ # log_info(' ~ | test log_info: {}', 1)
326
+ # log_warning(' ~ | test log_warning: {}', 2)
327
+
328
+
329
+ def log_text_input(key: str):
330
+ log_info(f'\n ~ | Text Input[{key}]: ')
331
+ input_text = input()
332
+ log_info(f'\n ~ | Recv input: "{input_text}"')
333
+ return input_text
334
+
335
+
336
+ def format_duration(seconds):
337
+ hours, remainder = divmod(seconds, 3600)
338
+ minutes, seconds = divmod(remainder, 60)
339
+ return f"{int(hours)}h {int(minutes)}m {int(seconds)}s"
340
+
341
+ def parse_duration(duration_str):
342
+ parts = duration_str.split()
343
+ hours = int(parts[0][:-1]) if 'h' in parts[0] else 0
344
+ minutes = int(parts[1][:-1]) if 'm' in parts[1] else 0
345
+ seconds = int(parts[2][:-1]) if 's' in parts[2] else 0
346
+ return hours * 3600 + minutes * 60 + seconds
347
+
348
+
349
+ def get_timestamp():
350
+ return datetime.datetime.now().strftime("%Y-%b-%d %H:%M:%S")
351
+
352
+
353
+ def format_path(directory, depth=0, max_depth=3, use_color=True):
354
+ """Formats the directory structure as a string up to a specified depth, with optional color.
355
+
356
+ Args:
357
+ directory (str or Path): The directory path to format.
358
+ depth (int): The current depth in the directory structure.
359
+ max_depth (int): The maximum depth to format.
360
+ use_color (bool): Whether to apply color to the output.
361
+
362
+ Returns:
363
+ str: The formatted directory structure.
364
+ """
365
+ if max_depth is not None and depth > max_depth:
366
+ return ""
367
+
368
+ directory_path = Path(directory)
369
+ if not directory_path.is_dir():
370
+ return "The specified path is not a directory.\n"
371
+
372
+ # Build the formatted string, applying color if requested
373
+ line_prefix = "│ " * depth + "├── "
374
+ directory_name = directory_path.name
375
+ if use_color:
376
+ line_prefix = ansi_color_str(line_prefix, fg='cyan') # Assuming color_str function exists
377
+ directory_name = ansi_color_str(directory_name, fg='green') # Adjust colors as needed
378
+
379
+ formatted_str = line_prefix + directory_name + "\n"
380
+
381
+ # Sort directory contents for consistent order
382
+ sorted_items = sorted(directory_path.iterdir(), key=lambda x: (x.is_file(), x.name))
383
+
384
+ for item in sorted_items:
385
+ if item.is_dir():
386
+ # Recursive call for directories, increasing the depth
387
+ formatted_str += format_path(item, depth + 1, max_depth, use_color)
388
+ else:
389
+ # Include file name with indentation, applying color if requested
390
+ file_line = "│ " * (depth + 1) + "├── " + item.name
391
+ if use_color:
392
+ file_line = ansi_color_str(file_line, fg='white') # Example color, adjust as needed
393
+ formatted_str += file_line + "\n"
394
+
395
+ return formatted_str
396
+
397
+
398
+ # Spinner Class
399
+ class LogSpinner:
400
+ def __init__(self, message="", rainbow=True, anim_speed=0.1):
401
+ self.message = message
402
+ self.speed = anim_speed
403
+ self.colors = ['\033[31m', '\033[33m', '\033[32m', '\033[34m', '\033[35m', '\033[36m']
404
+ self.spinner = ['⠇', '⠋', '⠙', '⠸', '⠼', '⠴', '⠦', '⠧']
405
+
406
+ #self.spinner = ['|', '/', '-', '\\']
407
+ self.rainbow = rainbow
408
+
409
+ def __enter__(self):
410
+ self.stop_spinner = False
411
+ self.startTime = time.time()
412
+ # spinner_text = f'^_^ | {self.message} - time: {0.0:.2f}s | Begin!'
413
+ # print(spinner_text)
414
+ self.spinner_thread = threading.Thread(target=self.spin)
415
+ self.spinner_thread.start()
416
+ return self
417
+
418
+ def __exit__(self, exc_type, exc_value, traceback):
419
+ self.stop_spinner = True
420
+ self.spinner_thread.join()
421
+
422
+ curTime = time.time()
423
+ spinner_text = f'>_< | {self.message} - time: {format_duration(curTime - self.startTime)} | Done!'
424
+ print(spinner_text)
425
+
426
+ def spin(self):
427
+ while not self.stop_spinner:
428
+ #self.update_spinner()
429
+ if self.rainbow: self.update_color_spinner()
430
+ else: self.update_spinner()
431
+
432
+ def update_spinner(self):
433
+ for char in self.spinner:
434
+ curTime = time.time()
435
+ spinner_text = f'{self.message} - time: {format_duration(curTime - self.startTime)} |'
436
+ print(f'\r ^_^ | {char} | {spinner_text}', end='\r', flush=True)
437
+ time.sleep(self.speed)
438
+
439
+ def update_color_spinner(self):
440
+ for color in self.colors:
441
+ for char in self.spinner:
442
+ curTime = time.time()
443
+ spinner_text = f'{self.message} - time: {format_duration(curTime - self.startTime)} |'
444
+ print(f'^_^ | {color}{char}\033[0m | {spinner_text}', end='\r', flush=True)
445
+ time.sleep(self.speed)
446
+
447
+
448
+ def run_unit_test():
449
+ # Example usage:
450
+ class Person:
451
+ def __init__(self, name, age, city):
452
+ self.name = name
453
+ self.age = age
454
+ self.city = city
455
+
456
+ # Create instances of Person
457
+ person1 = Person('John', 30, 'New York')
458
+ person2 = Person('Alice', 25, 'Los Angeles')
459
+ person3 = Person('Bob', 30, 'Hong Kong')
460
+ person4 = Person('Charlie', 35, 'Shanghai')
461
+ person5 = Person('David', 40, 'Beijing')
462
+
463
+ # Define other data structures
464
+ data_dict = {'Name': 'John', 'Age': 30, 'City': 'New York'}
465
+ data_list = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
466
+ data_set = {'apple', 'banana', 'orange'}
467
+ data_dict_of_lists = {'Name': ['John', 'Alice'], 'Age': [30, 25], 'City': ['New York', 'Los Angeles']}
468
+ data_list_of_dicts = [person1.__dict__, person2.__dict__]
469
+ data_dict_of_dicts = {'Person1': person1.__dict__, 'Person2': person2.__dict__}
470
+ list_of_objs = [person3, person4, person5]
471
+ data_list_of_lists = [['John', 30, 'New York'], ['Alice', 25, 'Los Angeles']]
472
+ dict_of_objs = {'Alice': person3, 'Bob': person4, 'Charlie': person5}
473
+ dict_of_list_of_objs = {'Group1': [person3, person4], 'Group2': [person5]}
474
+
475
+ # Combine all data into a complex structure
476
+ complex_data = {
477
+ 'data_dict': data_dict,
478
+ 'data_list': data_list,
479
+ 'data_set': data_set,
480
+ 'data_dict_of_lists': data_dict_of_lists,
481
+ 'data_list_of_dicts': data_list_of_dicts,
482
+ 'data_dict_of_dicts': data_dict_of_dicts,
483
+ 'list_of_objs': list_of_objs,
484
+ 'data_list_of_lists': data_list_of_lists,
485
+ 'dict_of_objs': dict_of_objs,
486
+ 'dict_of_list_of_objs': dict_of_list_of_objs
487
+ }
488
+
489
+ # Log each unique data structure
490
+ log_info("Data Dictionary: {}", data_dict)
491
+ log_info("Data List: {}", data_list)
492
+ log_info("Data Set: {}", data_set)
493
+ log_info("Data Dictionary of Lists: {}", data_dict_of_lists)
494
+ log_info("Data List of Dicts: {}", data_list_of_dicts)
495
+ log_info("Data Dictionary of Dicts: {}", data_dict_of_dicts)
496
+ log_info("List of Objects: {}", list_of_objs)
497
+ log_info("Data List of Lists: {}", data_list_of_lists)
498
+ log_info("Dictionary of Objects: {}", dict_of_objs)
499
+ log_info("Dictionary of List of Objects: {}", dict_of_list_of_objs)
500
+
501
+
502
+ #run_unit_test()