Spaces:
Sleeping
Sleeping
import json | |
import os | |
from logging import getLogger | |
from openai import OpenAI | |
from openai import AzureOpenAI | |
from openai import OpenAIError | |
import fitz | |
import requests | |
import time | |
from langchain.document_loaders import PyPDFLoader | |
from langchain.prompts import ChatPromptTemplate | |
from langchain.text_splitter import ( # CharacterTextSplitter, | |
RecursiveCharacterTextSplitter, | |
) | |
from langchain.vectorstores import Chroma | |
from langchain_core.output_parsers import StrOutputParser | |
import json | |
# from langchain_openai import ChatOpenAI, OpenAIEmbeddings | |
# from langchain_openai import AzureChatOpenAI, AzureOpenAIEmbeddings | |
import csv | |
from bs4 import BeautifulSoup | |
from src.myLogger import set_logger | |
logger = set_logger("my_app", level="INFO") | |
system_prompt = """ | |
あなたのタスクは、提供された資料が指定された会社のものであるかどうかを判断することです。以下の基準に基づいて判断を行ってください: | |
1. 資料に会社名が明記されているかどうかを確認してください。 | |
2. 資料の内容の大部分が指定された会社に関する情報であるかどうかを確認してください。 | |
これらの基準を満たしている場合、その資料は会社のものであると見なされます。出力フォーマットに従って結果を出力してください。 | |
""" | |
json_schema = { | |
"type": "object", | |
"properties": { | |
"judge": { | |
"type": "integer", | |
"description": "{company_name}の資料であれば1,そうでなければ0" | |
}, | |
"reason": { | |
"type": "string", | |
"description": "どうしてそう判断したのか" | |
} | |
}, | |
"required": ["judge", "reason"] | |
} | |
def generate_check_(reference): | |
api_key = os.getenv("OPENAI_API_KEY") | |
client = OpenAI( | |
api_key=api_key, | |
) | |
retries = 0 | |
max_retries = 100 | |
delay = 5 | |
while retries < max_retries: | |
try: | |
response = client.chat.completions.create( | |
model="gpt-3.5-turbo", | |
messages=[ | |
{ | |
"role": "system", | |
"content": system_prompt, | |
}, | |
{ | |
"role": "user", | |
"content": reference, | |
}, | |
], | |
functions=[{"name": "generate_queries", "parameters": json_schema}], | |
function_call={"name": "generate_queries"}, | |
temperature=0.0, | |
top_p=0.0, | |
) | |
output = response.choices[0].message.function_call.arguments | |
time.sleep(1) | |
return output | |
except OpenAIError as e: | |
print(f"Error occurred: {e}. Retrying in {delay} seconds...") | |
retries += 1 | |
time.sleep(delay) | |
except Exception as e: | |
print(f"Unexpected error: {e}. Retrying in {delay} seconds...") | |
retries += 1 | |
time.sleep(delay) | |
raise RuntimeError("Maximum retries exceeded. Could not get a valid response.") | |
def ch(company_name, reference): | |
com = company_name.replace("株式会社", "") | |
user_prompt = f""" | |
以下の資料は,{company_name}のものであるか,理由をつけて答えてください。資料が{company_name}のものであるとは, | |
資料に{company_name}と明記されておりまた資料の内容の大部分が{company_name}に関する情報であるということです。 | |
以下のフォーマットに従い出力してください。 | |
[資料] | |
{reference} | |
[出力フォーマット] | |
{{ | |
judge: 0 or 1({com}の資料であれば1,そうでなければ0) | |
reason: "どうしてそう判断したのか具体的に説明してください。" | |
}} | |
""" | |
ret = generate_check_(user_prompt) | |
js = json.loads(ret) | |
return js["judge"], js["reason"] | |
def pdf_to_text(url): | |
# リクエストをきちんと送るためのもの | |
headers = { | |
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/50.0.2661.102 Safari/537.36" | |
} | |
result = requests.get(url, headers=headers, timeout=5) | |
finename = ( | |
# "./out/" + company_name + url.replace("/", "_").replace(":", "_") + "IR.pdf" | |
# url.replace("/", "_").replace(":", "+") + "IR.pdf" | |
"./out/tmp.pdf" | |
) | |
try: | |
with open(finename, "wb") as file: | |
for chunk in result.iter_content(1000000000): | |
file.write(chunk) | |
except Exception as e: | |
print(e) | |
document = fitz.open(finename) | |
full_text = "" | |
for page_num in range(len(document)): | |
page = document.load_page(page_num) | |
full_text += page.get_text() | |
document.close() | |
return full_text | |
def html_to_text(url): | |
# リクエストをきちんと送るためのもの | |
headers = { | |
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/50.0.2661.102 Safari/537.36" | |
} | |
logger.info(f"requesting {url}") | |
result = requests.get(url, headers=headers, timeout=5) | |
result.encoding = result.apparent_encoding | |
soup = BeautifulSoup(result.text, "html.parser") | |
paragraphs = soup.find_all("p") | |
return "\n".join([paragraph.get_text() for paragraph in paragraphs]) | |
def co(company_name, url): | |
try: | |
if url.endswith(".pdf"): | |
full_text = pdf_to_text(url) | |
else: | |
full_text = html_to_text(url) | |
except Exception as e: | |
print(e) | |
return -1 | |
reference = full_text[:10000] | |
judge, reason = ch(company_name, reference) | |
logger.info(f'Judge: {judge}, Reason: {reason}') | |
return judge, reason | |