#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
@Time : 2023/6/8 14:03
@Author : alexanderwu
@File : document.py
@Desc : Classes and operations related to files in the file system.
"""
from enum import Enum
from pathlib import Path
from typing import Optional, Union

import pandas as pd
from llama_index.core import Document, SimpleDirectoryReader
from llama_index.core.node_parser import SimpleNodeParser
from llama_index.readers.file import PDFReader
from pydantic import BaseModel, ConfigDict, Field
from tqdm import tqdm

from metagpt.logs import logger
from metagpt.repo_parser import RepoParser


def validate_cols(content_col: str, df: pd.DataFrame):
    """Raise if the configured content column is missing from the DataFrame."""
    if content_col not in df.columns:
        raise ValueError(f"Content column '{content_col}' not found in DataFrame.")


def read_data(data_path: Path) -> Union[pd.DataFrame, list[Document]]:
    """Read a file into a pandas DataFrame or a list of llama-index documents/nodes, based on its suffix."""
suffix = data_path.suffix
if ".xlsx" == suffix:
data = pd.read_excel(data_path)
elif ".csv" == suffix:
data = pd.read_csv(data_path)
elif ".json" == suffix:
data = pd.read_json(data_path)
elif suffix in (".docx", ".doc"):
data = SimpleDirectoryReader(input_files=[str(data_path)]).load_data()
elif ".txt" == suffix:
data = SimpleDirectoryReader(input_files=[str(data_path)]).load_data()
node_parser = SimpleNodeParser.from_defaults(separator="\n", chunk_size=256, chunk_overlap=0)
data = node_parser.get_nodes_from_documents(data)
elif ".pdf" == suffix:
        # PDFReader.load_data is an instance method, so instantiate the reader first.
        data = PDFReader().load_data(file=data_path)
else:
        raise NotImplementedError(f"File format {suffix} not supported.")
return data
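
# A minimal usage sketch for read_data (the filename is illustrative, not part
# of this module); .txt files come back chunked into llama-index nodes:
#
#     nodes = read_data(Path("notes.txt"))
#     for node in nodes:
#         print(node.text[:80])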


class DocumentStatus(Enum):
    """Document lifecycle status, a mechanism similar to the RFC/PEP process."""
DRAFT = "draft"
UNDERREVIEW = "underreview"
APPROVED = "approved"
DONE = "done"
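
# Illustrative lifecycle: a Document (defined below) advances through these
# stages, e.g. doc.status = DocumentStatus.UNDERREVIEW once review starts.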


class Document(BaseModel):
    """
    Document: Handles operations related to document files.
    """
path: Path = Field(default=None)
name: str = Field(default="")
content: str = Field(default="")
    # Metadata is not modeled separately for now; it may be embedded in content.
author: str = Field(default="")
status: DocumentStatus = Field(default=DocumentStatus.DRAFT)
reviews: list = Field(default_factory=list)

    @classmethod
def from_path(cls, path: Path):
"""
Create a Document instance from a file path.
"""
if not path.exists():
raise FileNotFoundError(f"File {path} not found.")
content = path.read_text()
return cls(content=content, path=path)

    @classmethod
def from_text(cls, text: str, path: Optional[Path] = None):
"""
Create a Document from a text string.
"""
return cls(content=text, path=path)

    def to_path(self, path: Optional[Path] = None):
"""
Save content to the specified file path.
"""
if path is not None:
self.path = path
if self.path is None:
raise ValueError("File path is not set.")
self.path.parent.mkdir(parents=True, exist_ok=True)
# TODO: excel, csv, json, etc.
self.path.write_text(self.content, encoding="utf-8")

    def persist(self):
"""
Persist document to disk.
"""
return self.to_path()
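
# A round-trip sketch for Document (the path and content are illustrative):
#
#     doc = Document.from_text("# Design Notes", path=Path("/tmp/design.md"))
#     doc.persist()  # writes the content to /tmp/design.md
#     same = Document.from_path(Path("/tmp/design.md"))
#     assert same.content == doc.content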


class IndexableDocument(Document):
    """
    Advanced document handling for vector databases or search engines.
    """
model_config = ConfigDict(arbitrary_types_allowed=True)
data: Union[pd.DataFrame, list]
content_col: Optional[str] = Field(default="")
meta_col: Optional[str] = Field(default="")

    @classmethod
def from_path(cls, data_path: Path, content_col="content", meta_col="metadata"):
if not data_path.exists():
raise FileNotFoundError(f"File {data_path} not found.")
data = read_data(data_path)
if isinstance(data, pd.DataFrame):
validate_cols(content_col, data)
return cls(data=data, content=str(data), content_col=content_col, meta_col=meta_col)
try:
content = data_path.read_text()
except Exception as e:
logger.debug(f"Load {str(data_path)} error: {e}")
content = ""
return cls(data=data, content=content, content_col=content_col, meta_col=meta_col)

    def _get_docs_and_metadatas_by_df(self) -> tuple[list, list]:
df = self.data
docs = []
metadatas = []
for i in tqdm(range(len(df))):
docs.append(df[self.content_col].iloc[i])
if self.meta_col:
metadatas.append({self.meta_col: df[self.meta_col].iloc[i]})
else:
metadatas.append({})
return docs, metadatas

    def _get_docs_and_metadatas_by_llamaindex(self) -> tuple[list, list]:
data = self.data
docs = [i.text for i in data]
metadatas = [i.metadata for i in data]
return docs, metadatas

    def get_docs_and_metadatas(self) -> tuple[list, list]:
if isinstance(self.data, pd.DataFrame):
return self._get_docs_and_metadatas_by_df()
elif isinstance(self.data, list):
return self._get_docs_and_metadatas_by_llamaindex()
else:
raise NotImplementedError("Data type not supported for metadata extraction.")
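
# An indexing sketch for IndexableDocument (assumes a CSV with "content" and
# "metadata" columns; the path is illustrative):
#
#     idx = IndexableDocument.from_path(Path("corpus.csv"))
#     docs, metadatas = idx.get_docs_and_metadatas()
#     # docs and metadatas can now be fed to a vector store or search engine.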


class RepoMetadata(BaseModel):
name: str = Field(default="")
n_docs: int = Field(default=0)
n_chars: int = Field(default=0)
symbols: list = Field(default_factory=list)


class Repo(BaseModel):
# Name of this repo.
name: str = Field(default="")
    # metadata: RepoMetadata = Field(default_factory=RepoMetadata)
docs: dict[Path, Document] = Field(default_factory=dict)
codes: dict[Path, Document] = Field(default_factory=dict)
assets: dict[Path, Document] = Field(default_factory=dict)
path: Path = Field(default=None)

    def _path(self, filename):
return self.path / filename

    @classmethod
def from_path(cls, path: Path):
"""Load documents, code, and assets from a repository path."""
path.mkdir(parents=True, exist_ok=True)
repo = Repo(path=path, name=path.name)
for file_path in path.rglob("*"):
            # FIXME: this suffix-based filtering does not generalize to other programming languages and needs a more general mechanism.
if file_path.is_file() and file_path.suffix in [".json", ".txt", ".md", ".py", ".js", ".css", ".html"]:
repo._set(file_path.read_text(), file_path)
return repo

    def to_path(self):
        """Persist all documents, code, and assets to their recorded paths on disk."""
for doc in self.docs.values():
doc.to_path()
for code in self.codes.values():
code.to_path()
for asset in self.assets.values():
asset.to_path()

    def _set(self, content: str, path: Path):
"""Add a document to the appropriate category based on its file extension."""
suffix = path.suffix
doc = Document(content=content, path=path, name=str(path.relative_to(self.path)))
        # FIXME: this suffix-based classification does not generalize to other programming languages and needs a more general mechanism.
if suffix.lower() == ".md":
self.docs[path] = doc
elif suffix.lower() in [".py", ".js", ".css", ".html"]:
self.codes[path] = doc
else:
self.assets[path] = doc
return doc

    def set(self, filename: str, content: str):
"""Set a document and persist it to disk."""
path = self._path(filename)
doc = self._set(content, path)
doc.to_path()

    def get(self, filename: str) -> Optional[Document]:
"""Get a document by its filename."""
path = self._path(filename)
return self.docs.get(path) or self.codes.get(path) or self.assets.get(path)

    def get_text_documents(self) -> list[Document]:
        """Return all text-bearing documents: docs plus code files."""
return list(self.docs.values()) + list(self.codes.values())

    def eda(self) -> RepoMetadata:
        """Summarize the repo: document count, character count, and code symbols."""
n_docs = sum(len(i) for i in [self.docs, self.codes, self.assets])
n_chars = sum(sum(len(j.content) for j in i.values()) for i in [self.docs, self.codes, self.assets])
symbols = RepoParser(base_directory=self.path).generate_symbols()
return RepoMetadata(name=self.name, n_docs=n_docs, n_chars=n_chars, symbols=symbols)
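
# A usage sketch for Repo (the directory and filenames are illustrative):
#
#     repo = Repo.from_path(Path("./demo_repo"))
#     repo.set("docs/README.md", "# Demo")  # categorized as a doc and persisted
#     readme = repo.get("docs/README.md")
#     stats = repo.eda()  # RepoMetadata with doc/char counts and code symbols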