# tgas-theme2-ph2-demo / src/group_classification.py
# (Hugging Face upload by user yyuri, commit 4848895 — "Upload 4 files")
import os
import dotenv
import requests
from langchain_openai import AzureOpenAIEmbeddings
from langchain_community.vectorstores import AzureSearch
from langchain_community.document_loaders import PyMuPDFLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from openai import AzureOpenAI
from openai import OpenAIError
from azure.search.documents.indexes import SearchIndexClient
from azure.core.credentials import AzureKeyCredential
import time
dotenv.load_dotenv()
from azure.identity import DefaultAzureCredential
from src.myLogger import set_logger
# --- Azure OpenAI / Azure AI Search configuration ---
# Values come from the environment, populated by dotenv.load_dotenv() above.
openai_endpoint = os.environ.get("AZURE_OPENAI_ENDPOINT")
openai_api_key = os.environ.get("AZURE_OPENAI_API_KEY")
# Name of the Azure AI Search index that holds the document chunks.
index_name = 'group'
# Azure OpenAI chat deployment name used for classification.
api_type = 'gpt-4o'
search_endpoint = os.environ.get('AZURE_AI_SEARCH_ENDPOINT')
search_key = os.environ.get('AZURE_AI_SEARCH_API_KEY')
version = os.environ.get("AZURE_OPENAI_API_VERSION")
embedding_model = os.environ.get("AZURE_OPENAI_EMBEDDING_MODEL")
# Prefer the API key when present, otherwise fall back to managed identity.
# NOTE(review): `credential` is never used below — SearchIndexClient is built
# with AzureKeyCredential(search_key) directly. Presumably dead code; confirm.
credential = search_key or DefaultAzureCredential()
logger = set_logger("my_app", level="INFO")
si_client = SearchIndexClient(search_endpoint, AzureKeyCredential(search_key))
def main(PDF):
    """Classify a PDF for CO2-emission disclosures using RAG over Azure AI Search.

    Downloads the PDF at the given URL, chunks it, indexes the chunks into the
    ``group`` Azure AI Search index, retrieves the most relevant chunks with a
    semantic hybrid search, and asks the GPT-4o deployment to answer in the
    form ``(排出量, 削減目標)``.

    Args:
        PDF: URL of the PDF document to download and analyze.

    Returns:
        The model's answer string (e.g. ``"(1, 0)"``), or ``None`` when the
        downloaded file could not be parsed as a PDF.

    Raises:
        requests.HTTPError: if the PDF download returns an error status.
        RuntimeError: if the chat completion still fails after ``max_retries``.
    """
    # Drop the previous index so the search only reflects the current document.
    si_client.delete_index(index_name)
    query = '二酸化炭素の排出量の記載はされていますか。また削減目標は記載されていますか。脱炭素、カーボンニュートラル、温暖化に関する取り組みを何かおこなっていますか。'
    file = 'copy.pdf'
    # Download the PDF. Fail fast on HTTP errors — previously an error page
    # (e.g. a 404 HTML body) would be silently written to copy.pdf and then
    # fail later inside PyMuPDF with a confusing message.
    response = requests.get(PDF)
    response.raise_for_status()
    with open(file, 'wb') as f:
        f.write(response.content)
    try:
        loader = PyMuPDFLoader(file)
        document = loader.load()
    except Exception as e:
        # Keep the original best-effort behavior (return None), but report
        # through the module logger instead of print.
        logger.error(f"error occured: {e}")
        return
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=1000,
        chunk_overlap=100,
        length_function=len,
    )
    pages = text_splitter.split_documents(document)
    embeddings = AzureOpenAIEmbeddings(model=embedding_model)
    vector_store = AzureSearch(
        azure_search_endpoint=search_endpoint,
        azure_search_key=search_key,
        index_name=index_name,
        embedding_function=embeddings.embed_query,
        semantic_configuration_name="default"
    )
    results = vector_store.add_documents(documents=pages)
    # Perform a hybrid search with semantic reranking; keep the top 3 chunks.
    docs = vector_store.semantic_hybrid_search_with_score(
        query=query,
        k=3,
    )
    # Concatenate the retrieved chunks into the grounding context for the LLM.
    texts = " ".join([doc.page_content for doc, score in docs])
    client = AzureOpenAI(
        api_key=openai_api_key,
        api_version=version,
        azure_endpoint=openai_endpoint
    )
    message = [
        {"role": "system",
         "content":
            """You are an assistant designed to analyze provided text and answer questions regarding CO2 emissions and reduction targets.
            When given a prompt, you should extract the relevant information from the provided text and answer in the form (排出量, 削減目標).
            - 排出量 (CO2 Emission): Answer 1 if there is any mention of CO2 emissions, otherwise answer 0.
            - 削減目標 (Reduction Target): Answer 1 if there is any mention of a CO2 emission reduction target, otherwise answer 0.
            Example:
            - If the text mentions CO2 emissions but does not mention any reduction targets, your response should be (1, 0).
            - If the text mentions neither CO2 emissions nor reduction targets, your response should be (0, 0)."""
         },
        {"role": "user",
         "content": f"""Using the provided information: {texts}, please answer the following question in the given form (排出量, 削減目標): {query}."""}
    ]
    # Retry loop for transient API failures (rate limits, timeouts, ...).
    retries = 0
    max_retries = 100
    delay = 5
    while retries < max_retries:
        try:
            response = client.chat.completions.create(
                model=api_type,
                messages=message,
                temperature=0
            )
            # Brief pause before returning — presumably crude rate limiting
            # when this function is called in a loop; confirm before removing.
            time.sleep(1)
            return response.choices[0].message.content
        except OpenAIError as e:
            logger.warning(f"Error occurred: {e}. Retrying in {delay} seconds...")
            retries += 1
            time.sleep(delay)
        except Exception as e:
            logger.warning(f"Unexpected error: {e}. Retrying in {delay} seconds...")
            retries += 1
            time.sleep(delay)
    raise RuntimeError("Maximum retries exceeded. Could not get a valid response.")
if __name__ == '__main__':
    # main() requires a PDF URL; the original called main() with no argument,
    # which always raised a TypeError. Take the URL from the command line.
    import sys
    if len(sys.argv) < 2:
        raise SystemExit("usage: python group_classification.py <PDF_URL>")
    main(sys.argv[1])