Spaces:
Running
Running
import json | |
import re | |
from typing import Optional, Tuple, Dict | |
from collections import OrderedDict | |
from collections.abc import Mapping, Iterable | |
from datetime import datetime | |
# import torch | |
# from cradle import constants | |
from utils.string_utils import contains_punctuation, is_numbered_bullet_list_item | |
def load_json(file_path): | |
with open(file_path, mode='r', encoding='utf8') as fp: | |
json_dict = json.load(fp) | |
return json_dict | |
# def serialize_data(item): | |
# """Recursively convert non-serializable items in the dictionary.""" | |
# if isinstance(item, (str, int, float, bool)): | |
# return item | |
# elif isinstance(item, torch.Tensor): | |
# # Check if the tensor is 0-d (a scalar) | |
# if item.dim() == 0: | |
# # Convert scalar tensor to a Python number | |
# return item.item() | |
# else: | |
# # Check if tensor is on a GPU, move to CPU first | |
# if item.is_cuda: | |
# item = item.cpu() | |
# # Convert tensor to a list | |
# return item.numpy().tolist() | |
# elif isinstance(item, datetime): | |
# return item.isoformat() | |
# if isinstance(item, Mapping): | |
# return {key: serialize_data(value) for key, value in item.items()} | |
# elif isinstance(item, Iterable): | |
# return [serialize_data(element) for element in item] | |
# elif isinstance(item, JsonFrameStructure): # Assuming JSONStructure needs to be handled | |
# return item.to_dict() # Assuming JSONStructure objects have a to_dict method or similar | |
# return item | |
# def save_json(file_path, json_dict, indent=-1): | |
# processed_data = serialize_data(json_dict) | |
# with open(file_path, mode='w', encoding='utf8') as fp: | |
# if indent == -1: | |
# json.dump(processed_data, fp, ensure_ascii=False) | |
# else: | |
# json.dump(processed_data, fp, ensure_ascii=False, indent=indent) | |
def check_json(json_string): | |
try: | |
json.loads(json_string) | |
except: | |
return False | |
return True | |
def refine_json(json_string): | |
patterns = [ | |
r"^`+json(.*?)`+", # ```json content```, ```json content``, ... | |
r"^json(.*?)", # json content | |
r"^json(.*?)\." # json content. | |
] | |
for pattern in patterns: | |
match = re.search(pattern, json_string, re.DOTALL) | |
if match: | |
json_string = match.group(1) | |
if check_json(json_string): | |
return json_string | |
return json_string | |
def parse_semi_formatted_json(json_string): | |
obj = None | |
try: | |
response = refine_json(json_string) | |
obj = json.loads(response) | |
except Exception as e: | |
raise ValueError(f"Error in processing json: {e}. Object was: {json_string}.") from e | |
return obj | |
def _is_line_key_candidate(line: str) -> Tuple[bool, Optional[str]]: | |
result = False | |
likely_key = None | |
if line.endswith(':'): | |
# Cannot have other previous punctuation, except if it's a numbered bullet list item | |
num_idx = is_numbered_bullet_list_item(line) | |
post_num_idx = 0 | |
if num_idx > -1: | |
post_num_idx = num_idx | |
likely_key = line[post_num_idx:-1].strip() | |
result = not contains_punctuation(likely_key) | |
return result, likely_key | |
### Parses the semi-formatted text from model response | |
def parse_semi_formatted_text(text): | |
lines = text.split('\n') | |
lines = [line.rstrip() for line in lines if line.rstrip()] | |
result_dict = {} | |
current_key = None | |
current_value = [] | |
parsed_data = [] | |
in_code_flag = False | |
for line in lines: | |
line = line.replace("**", "").replace("###", "").replace("##", "") # Remove unnecessary in Markdown formatting | |
is_key, key_candidate = _is_line_key_candidate(line) | |
# Check if the line indicates a new key | |
if is_key and in_code_flag == False: | |
# If there's a previous key, process its values | |
if current_key and current_key == 'action_guidance': | |
result_dict[current_key] = parsed_data | |
elif current_key: | |
result_dict[current_key] = '\n'.join(current_value).strip() | |
try: | |
current_key = key_candidate.replace(" ", "_").lower() | |
except Exception as e: | |
# logger.error(f"Response is not in the correct format: {e}\nReceived text was: {text}") | |
raise | |
current_value = [] | |
parsed_data = [] | |
else: | |
if current_key == 'action_guidance': | |
in_code_flag = True | |
if line.strip() == '```': | |
if current_value: # Process previous code block and description | |
entry = {"code": '\n'.join(current_value[1:])} | |
parsed_data.append(entry) | |
current_value = [] | |
in_code_flag = False | |
else: | |
current_value.append(line) | |
if line.strip().lower() == 'null': | |
in_code_flag = False | |
else: | |
in_code_flag = False | |
line = line.strip() | |
current_value.append(line) | |
# Process the last key | |
if current_key == 'action_guidance': | |
if current_value: # Process the last code block and description | |
entry = {"code": '\n'.join(current_value[:-1]).strip()} | |
parsed_data.append(entry) | |
result_dict[current_key] = parsed_data | |
else: | |
result_dict[current_key] = '\n'.join(current_value).strip() | |
if "success" in result_dict: | |
result_dict["success"] = result_dict["success"].lower() == "true" | |
return result_dict | |
class JsonFrameStructure(): | |
def __init__(self): | |
self.data_structure: Dict[int, Dict[str, list[Dict[str, any]]]] = {} | |
self.end_index: int = -1 | |
def add_instance(self, timestamp: str, instance: dict[str, any]) -> None: | |
# Check if the timestamp already exists across all indices | |
exists = False | |
for index_data in self.data_structure.values(): | |
if timestamp in index_data: | |
# Timestamp already exists, append the instance to the existing timestamp | |
index_data[timestamp].append(instance) | |
exists = True | |
break | |
if not exists: | |
# Timestamp is new, create a new entry and increment the end_index | |
self.end_index += 1 | |
self.data_structure.setdefault(self.end_index, {}).setdefault(timestamp, []).append(instance) | |
def sort_index_by_timestamp(self) -> None: | |
extracted_data = [(key, value) for entry in self.data_structure.values() for key, value in entry.items()] | |
sorted_data = sorted(extracted_data, key=lambda x: x[0]) | |
# Reconstructing the JSON structure with sorted data | |
self.data_structure = OrderedDict({index: {key: value} for index, (key, value) in enumerate(sorted_data)}) | |
def search_type_across_all_indices(self, search_type: str) -> list[dict[str, any]]: | |
results = [] | |
# Sort the keys in ascending order | |
for index, index_data in sorted(self.data_structure.items()): | |
for object_id, instances in index_data.items(): | |
for instance in instances: | |
for type, values in instance.items(): | |
if type == search_type and values != "" and values != []: | |
results.append({"index": index, "object_id": object_id, "values":values}) | |
return results | |
def to_dict(self): | |
return { | |
"data_structure": self.data_structure, | |
"end_index": self.end_index | |
} | |
def from_dict(cls, data_dict): | |
instance = cls() | |
instance.data_structure = data_dict.get("data_structure", {}) | |
instance.end_index = data_dict.get("end_index", -1) | |
return instance | |