import os

# Configure cache locations before the ML libraries are imported so they take effect.
# Note: these paths assume a deployment with a writable /home/user/.cache directory;
# adjust them for other environments.
os.environ["TRANSFORMERS_CACHE"] = "/home/user/.cache/huggingface"
os.environ["HF_HOME"] = "/home/user/.cache/huggingface"
os.environ["TORCH_HOME"] = "/home/user/.cache/torch"
os.environ["HF_HUB_DISABLE_SYMLINKS_WARNING"] = "1"

import re
import subprocess
import sys
import traceback
from collections import Counter

import nltk
import pandas as pd
import spacy
import streamlit as st
import torch
from nltk.tokenize import sent_tokenize
from transformers import pipeline, AutoTokenizer, AutoModelForSeq2SeqLM

st.set_page_config(page_title="FinBrief: Financial Document Insights", layout="wide")

# Load the spaCy English model, installing it on first use if it is missing.
try:
    nlp = spacy.load("en_core_web_sm")
    st.write("spaCy model loaded successfully!")
    print("spaCy model loaded successfully!")
except OSError:
    st.write("Failed to load spaCy model. Attempting to install...")
    print("Failed to load spaCy model. Attempting to install...")
    # Use the current interpreter so the model is installed into the right environment.
    subprocess.run([sys.executable, "-m", "spacy", "download", "en_core_web_sm"])
    try:
        nlp = spacy.load("en_core_web_sm")
        st.write("spaCy model installed and loaded successfully!")
        print("spaCy model installed and loaded successfully!")
    except Exception as e:
        st.write(f"Still failed to load spaCy model: {e}")
        print(f"Still failed to load spaCy model: {e}")
        nlp = None

# Load the Hugging Face summarization model and tokenizer.
model_name = "kritsadaK/bart-financial-summarization"

try:
    tokenizer = AutoTokenizer.from_pretrained(model_name, trust_remote_code=True)
    model = AutoModelForSeq2SeqLM.from_pretrained(model_name, trust_remote_code=True)
    summarizer = pipeline("summarization", model=model, tokenizer=tokenizer)
    st.write("Hugging Face summarization model loaded successfully!")
    print("Hugging Face summarization model loaded successfully!")
except Exception as e:
    st.write(f"Failed to load Hugging Face summarization model: {e}")
    print(f"Failed to load Hugging Face summarization model: {e}")
    summarizer = None
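
# Note (not in the original script): on every Streamlit rerun the models above are
# reloaded from scratch. If that becomes a bottleneck, one option is to wrap the
# loading in a cached factory, e.g.:
#
#     @st.cache_resource
#     def load_summarizer(name: str):
#         tok = AutoTokenizer.from_pretrained(name, trust_remote_code=True)
#         mdl = AutoModelForSeq2SeqLM.from_pretrained(name, trust_remote_code=True)
#         return pipeline("summarization", model=mdl, tokenizer=tok)
#
# load_summarizer is a hypothetical helper shown for illustration only.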

# Keep the loaded models in session state so later sections can reuse them.
st.session_state["nlp"] = nlp
st.session_state["summarizer"] = summarizer

if nlp is None:
    st.error("The spaCy model failed to load. Ensure it is installed.")
if summarizer is None:
    st.error("The summarization model failed to load. Check the model path or internet connection.")

st.title("FinBrief: Financial Document Insights")
st.write("Upload a financial document for analysis.")

# Safeguard: re-register the models if session state was cleared between reruns.
if "nlp" not in st.session_state:
    st.session_state["nlp"] = nlp
if "summarizer" not in st.session_state:
    st.session_state["summarizer"] = summarizer

# Store NLTK data inside the working directory so downloads persist across runs.
nltk_data_dir = os.path.join(os.getcwd(), 'nltk_data')
if not os.path.exists(nltk_data_dir):
    os.makedirs(nltk_data_dir)
nltk.data.path.append(nltk_data_dir)


def download_nltk_punkt():
    """Ensure the NLTK 'punkt' sentence tokenizer is available, downloading it if needed."""
    try:
        nltk.data.find('tokenizers/punkt')
        st.write("NLTK 'punkt' tokenizer is already installed.")
        print("NLTK 'punkt' tokenizer is already installed.")
    except LookupError:
        st.write("NLTK 'punkt' tokenizer not found. Attempting to download...")
        print("NLTK 'punkt' tokenizer not found. Attempting to download...")
        try:
            nltk.download('punkt', download_dir=nltk_data_dir, quiet=True)
            nltk.data.find('tokenizers/punkt')
            st.write("NLTK 'punkt' tokenizer downloaded successfully.")
            print("NLTK 'punkt' tokenizer downloaded successfully.")
        except Exception as e:
            st.error(f"NLTK 'punkt' tokenizer download failed: {e}")
            print(f"NLTK 'punkt' tokenizer download failed: {e}")


download_nltk_punkt()
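
# Note: depending on the installed NLTK version, sent_tokenize may also need the newer
# 'punkt_tab' resource. If sentence splitting fails even though 'punkt' is present,
# nltk.download('punkt_tab', download_dir=nltk_data_dir) is a reasonable extra step.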

print(f"Session State - NLP: {st.session_state['nlp'] is not None}, Summarizer: {st.session_state['summarizer'] is not None}")

# Regex patterns for pulling common fund fact-sheet fields out of the raw text.
patterns = {
    "Fund Name": r"^(.*?) Fund",
    "CUSIP": r"CUSIP\s+(\d+)",
    "Inception Date": r"Inception Date\s+([\w\s\d]+)",
    "Benchmark": r"Benchmark\s+([\w\s\d]+)",
    "Expense Ratio": r"Expense Information.*?(\d+\.\d+%)",
    "Total Assets": r"Total Assets\s+USD\s+([\d,]+)",
    "Portfolio Turnover": r"Portfolio Holdings Turnover.*?(\d+\.\d+%)",
    "Cash Allocation": r"% of Portfolio in Cash\s+(\d+\.\d+%)",
    "Alpha": r"Alpha\s+(-?\d+\.\d+%)",
    "Standard Deviation": r"Standard Deviation\s+(\d+\.\d+%)"
}
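
# Illustrative (hypothetical) inputs and the values the patterns above would capture:
#   "CUSIP 123456789"            -> CUSIP: "123456789"
#   "Total Assets USD 1,234,567" -> Total Assets: "1,234,567"
#   "Alpha -0.45%"               -> Alpha: "-0.45%"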

st.markdown("[Example Financial Documents](https://drive.google.com/drive/folders/1jMu3S7S_Hc_RgK6_cvsCqIB8x3SSS-R6)")

# Light custom styling for the sidebar, buttons, and text areas.
st.markdown(
    """
    <style>
    .sidebar .sidebar-content {
        background-color: #f7f7f7;
        color: #333;
    }
    .css-1d391kg {
        background-color: #f0f4f8;
    }
    .stButton>button {
        background-color: #4CAF50;
        color: white;
        padding: 10px 20px;
        border-radius: 5px;
        font-size: 16px;
    }
    .stTextArea textarea {
        border: 2px solid #4CAF50;
        border-radius: 5px;
        padding: 10px;
    }
    </style>
    """,
    unsafe_allow_html=True,
)


def extract_text_tables_pdfplumber(pdf_file):
    """Extract the raw text and any tables from an uploaded PDF using pdfplumber."""
    import io
    import pdfplumber

    print("\nPDFPlumber: Extracting text and tables...")
    with pdfplumber.open(io.BytesIO(pdf_file.read())) as pdf:
        all_text = ""
        all_tables = []

        for page in pdf.pages:
            # Page text (pages with no extractable text return None).
            page_text = page.extract_text()
            if page_text:
                all_text += page_text + "\n"

            # Any tables detected on the page.
            tables = page.extract_tables()
            all_tables.extend(tables)

        if all_text.strip():
            print(all_text[:1000])
            return all_text, all_tables
        else:
            print("No text extracted. The PDF might be image-based.")
            return None, None
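
# Note: image-only (scanned) PDFs return no text from pdfplumber; handling those would
# require an OCR step (e.g., pytesseract), which is outside the scope of this app.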


def evaluate_chunk_relevance(chunk, keywords=None):
    """
    Evaluate the relevance of a text chunk based on various factors.
    Returns a score representing the chunk's relevance.
    """
    if not keywords:
        keywords = ["fund", "portfolio", "performance", "return", "asset", "investment",
                    "expense", "risk", "benchmark", "allocation", "strategy", "market",
                    "growth", "income", "dividend", "yield", "capital", "equity", "bond",
                    "summary", "overview", "highlight", "key", "important", "significant"]

    score = 0

    # Factor 1: length, capped at 5 points.
    word_count = len(chunk.split())
    score += min(word_count / 100, 5)

    # Factor 2: keyword density, capped at 10 points.
    lower_chunk = chunk.lower()
    keyword_count = sum(1 for keyword in keywords if keyword.lower() in lower_chunk)
    keyword_density = keyword_count / max(1, word_count) * 100
    score += min(keyword_density * 2, 10)

    # Factor 3: presence of numbers and percentages, capped at 5 points.
    number_count = len(re.findall(r'\d+\.?\d*%?', chunk))
    score += min(number_count / 5, 5)

    # Factor 4: bullet points and enumerations, capped at 5 points.
    bullet_count = len(re.findall(r'•|\*|-|–|[0-9]+\.', chunk))
    score += min(bullet_count, 5)

    # Factor 5: section-header-like lines, capped at 5 points.
    header_patterns = [
        r'^[A-Z][A-Za-z\s]+:',   # "Title Case Heading:"
        r'^[A-Z][A-Z\s]+',       # "ALL CAPS HEADING"
        r'^\d+\.\s+[A-Z]'        # "1. Numbered heading"
    ]
    header_count = sum(1 for pattern in header_patterns if re.search(pattern, chunk, re.MULTILINE))
    score += min(header_count * 2, 5)

    return score
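
# The caps above are 5 (length) + 10 (keyword density) + 5 (numbers) + 5 (bullets)
# + 5 (headers), so the maximum possible relevance score is 30.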


def rank_and_select_chunks(chunks, max_chunks=5, keywords=None):
    """
    Rank chunks by relevance and return the top chunks.
    """
    # Score every chunk, then sort from most to least relevant.
    chunk_scores = [(chunk, evaluate_chunk_relevance(chunk, keywords)) for chunk in chunks]
    sorted_chunks = sorted(chunk_scores, key=lambda x: x[1], reverse=True)

    # Keep only the highest-scoring chunks.
    top_chunks = [chunk for chunk, score in sorted_chunks[:max_chunks]]

    # Log the ranking for debugging.
    print("Chunk scores:")
    for i, (chunk, score) in enumerate(sorted_chunks):
        print(f"Chunk {i+1}: Score {score:.2f}, Length {len(chunk.split())} words")
        print(f"First 100 chars: {chunk[:100]}...")

    return top_chunks


def split_text_into_chunks(text, tokenizer, max_tokens=512):
    """Split text into chunks of whole sentences that fit within the model's token limit."""
    sentences = nltk.sent_tokenize(text)
    chunks = []
    current_chunk = ''
    current_length = 0

    for sentence in sentences:
        sentence_tokens = tokenizer.encode(sentence, add_special_tokens=False)
        sentence_length = len(sentence_tokens)

        # Start a new chunk when adding this sentence would exceed the token budget.
        if current_length + sentence_length > max_tokens:
            if current_chunk:
                chunks.append(current_chunk.strip())
            current_chunk = sentence
            current_length = sentence_length
        else:
            current_chunk += ' ' + sentence
            current_length += sentence_length

    if current_chunk:
        chunks.append(current_chunk.strip())

    return chunks
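
# Note: a single sentence longer than max_tokens still becomes its own oversized chunk;
# the summarizer call below passes truncation=True, which trims such chunks to the
# model's maximum input length.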


def remove_duplicate_sentences(text):
    """Drop repeated sentences (case-insensitive) while preserving the original order."""
    sentences = nltk.sent_tokenize(text)
    unique_sentences = []
    seen_sentences = set()

    for sentence in sentences:
        normalized_sentence = sentence.strip().lower()
        if normalized_sentence not in seen_sentences:
            seen_sentences.add(normalized_sentence)
            unique_sentences.append(sentence)

    return ' '.join(unique_sentences)
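
# Illustrative example:
#   remove_duplicate_sentences("The fund grew 5%. The fund grew 5%. Fees fell.")
#   -> "The fund grew 5%. Fees fell."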


# Initialize session state for the extracted document content.
if "pdf_text" not in st.session_state:
    st.session_state["pdf_text"] = ""
if "pdf_tables" not in st.session_state:
    st.session_state["pdf_tables"] = []

# Sidebar: PDF upload.
st.sidebar.header("Upload Your Financial Document")
uploaded_file = st.sidebar.file_uploader("Choose a PDF file", type="pdf")

if uploaded_file is not None:
    st.sidebar.write(f"You uploaded: {uploaded_file.name}")

    # Extract text and tables, and keep them in session state for the sections below.
    pdf_text, pdf_tables = extract_text_tables_pdfplumber(uploaded_file)

    if pdf_text is not None:
        st.session_state["pdf_text"] = pdf_text
        st.session_state["pdf_tables"] = pdf_tables
        st.sidebar.success("PDF uploaded and text extracted!")
    else:
        st.markdown("[Example Financial Documents](https://drive.google.com/drive/folders/1jMu3S7S_Hc_RgK6_cvsCqIB8x3SSS-R6)")
        st.error("No text extracted from the uploaded PDF.")

# Show the extracted text and tables.
st.subheader("Extracted Text")
if st.session_state["pdf_text"]:
    st.text_area("Document Text", st.session_state["pdf_text"], height=400)
else:
    st.warning("No text extracted yet. Upload a PDF to start.")

st.subheader("Extracted Tables")
if st.session_state["pdf_tables"]:
    for idx, table in enumerate(st.session_state["pdf_tables"]):
        st.write(f"Table {idx+1}")
        st.write(pd.DataFrame(table))
else:
    st.info("No tables extracted.")

# Pull the models and extracted content back out of session state.
nlp = st.session_state["nlp"]
summarizer = st.session_state["summarizer"]
pdf_text = st.session_state["pdf_text"]
pdf_tables = st.session_state["pdf_tables"]

if nlp is None or summarizer is None:
    st.error("Models are not properly loaded. Please check model paths and installation.")
else:
    st.subheader("NER Analysis")

    # Text area prefilled with the extracted PDF text, if any.
    example_text = st.text_area(
        "Enter or paste text for analysis",
        height=400,
        value=st.session_state["pdf_text"] if st.session_state["pdf_text"] else ""
    )

    if st.button("Analyze"):
        # Prefer the extracted PDF text; otherwise use whatever was typed into the text area.
        text_for_analysis = st.session_state["pdf_text"].strip() if st.session_state["pdf_text"] else example_text.strip()

        if text_for_analysis:
            with st.spinner("Analyzing text..."):
                # Apply the regex patterns; fields with no match are reported as "N/A".
                extracted_data = {
                    key: (match.group(1) if (match := re.search(pattern, text_for_analysis, re.IGNORECASE)) else "N/A")
                    for key, pattern in patterns.items()
                }

                # Run spaCy NER and keep the finance-relevant entity types.
                doc = nlp(text_for_analysis)
                financial_entities = [(ent.text, ent.label_) for ent in doc.ents if ent.label_ in ["MONEY", "PERCENT", "ORG", "DATE"]]

                structured_data = {**extracted_data, "Named Entities Extracted": financial_entities}

                st.write("Entities Found:")
                st.write(pd.DataFrame(financial_entities, columns=["Entity", "Label"]))

                st.write("Structured Data Extracted:")
                st.write(pd.DataFrame([structured_data]))

        else:
            st.error("Please provide some text for analysis.")

    st.subheader("Summarization")
    st.write("Generate concise summaries of financial documents.")

    # Sidebar controls for the summarization step.
    st.sidebar.header("Summarization Settings")
    max_chunks_to_process = st.sidebar.slider(
        "Max chunks to summarize",
        min_value=1,
        max_value=10,
        value=3,
        help="Select fewer chunks for faster processing but less comprehensive summaries"
    )

    custom_keywords = st.sidebar.text_input(
        "Add custom keywords (comma separated)",
        value="",
        help="Add domain-specific keywords to improve chunk selection"
    )

    # Text area prefilled with the extracted PDF text, if any.
    input_text = st.text_area(
        "Enter text to summarize",
        height=200,
        value=st.session_state.get("pdf_text", "")
    )

    show_chunk_details = st.sidebar.checkbox("Show chunk selection details", value=False)

    if st.button("Summarize"):
        text_to_summarize = input_text.strip()
        if text_to_summarize:
            try:
                # Report the size of the input.
                input_length = len(text_to_summarize.split())
                st.write(f"Original text length: {input_length} words")

                # Optional user-supplied keywords for chunk ranking.
                keywords = None
                if custom_keywords:
                    keywords = [kw.strip() for kw in custom_keywords.split(",") if kw.strip()]
                    st.write(f"Using custom keywords: {', '.join(keywords)}")

                # Split the document into token-bounded chunks.
                chunks = split_text_into_chunks(text_to_summarize, tokenizer)
                st.write(f"Text has been split into {len(chunks)} chunks.")

                # Rank the chunks and keep only the most relevant ones.
                selected_chunks = rank_and_select_chunks(
                    chunks,
                    max_chunks=max_chunks_to_process,
                    keywords=keywords
                )
                st.write(f"Selected {len(selected_chunks)} highest-ranked chunks for summarization.")

                if show_chunk_details:
                    with st.expander("Chunk Selection Details"):
                        for i, chunk in enumerate(selected_chunks):
                            st.markdown(f"**Chunk {i+1}**")
                            st.write(f"Length: {len(chunk.split())} words")
                            st.text(chunk[:300] + "..." if len(chunk) > 300 else chunk)
                            st.write("---")

                # Summarize each selected chunk.
                summaries = []
                with st.spinner(f"Summarizing {len(selected_chunks)} chunks..."):
                    for i, chunk in enumerate(selected_chunks):
                        st.write(f"Summarizing chunk {i+1}/{len(selected_chunks)}...")

                        # Target summary lengths, derived from the chunk's word count and
                        # clamped so the minimum never exceeds the maximum.
                        chunk_length = len(chunk.split())
                        max_summary_length = min(150, chunk_length // 2)
                        min_summary_length = min(max(50, max_summary_length // 2), max_summary_length)

                        try:
                            summary_output = summarizer(
                                chunk,
                                max_length=max_summary_length,
                                min_length=min_summary_length,
                                do_sample=False,
                                truncation=True
                            )
                            chunk_summary = summary_output[0]['summary_text'].strip()

                            if not chunk_summary:
                                st.warning(f"The summary for chunk {i+1} is empty.")
                            else:
                                summaries.append(chunk_summary)

                        except Exception as e:
                            st.error(f"Summarization failed for chunk {i+1}: {e}")
                            st.text(traceback.format_exc())
                            continue

                if summaries:
                    # Merge the chunk summaries and drop any repeated sentences.
                    combined_summary = ' '.join(summaries)
                    final_summary = remove_duplicate_sentences(combined_summary)

                    summary_length = len(final_summary.split())
                    compression_ratio = (1 - summary_length / input_length) * 100

                    st.subheader("Final Summary")
                    st.success(final_summary)
                    st.write(f"Summary length: {summary_length} words ({compression_ratio:.1f}% compression)")

                    st.subheader("Summary Statistics")
                    stats_col1, stats_col2 = st.columns(2)
                    with stats_col1:
                        st.metric("Original Length", f"{input_length} words")
                        st.metric("Total Chunks", str(len(chunks)))
                    with stats_col2:
                        st.metric("Summary Length", f"{summary_length} words")
                        st.metric("Chunks Processed", str(len(selected_chunks)))

                else:
                    st.error("No summaries were generated.")

            except Exception as e:
                st.error("An error occurred during summarization.")
                st.text(traceback.format_exc())
        else:
            st.error("Please provide text to summarize.")

# Sidebar help explaining how chunks are ranked.
st.sidebar.markdown("---")
with st.sidebar.expander("How Chunk Selection Works"):
    st.markdown("""
    The chunk selection algorithm ranks text chunks based on:

    1. **Keyword density** - Presence of financial terms
    2. **Length** - Longer chunks often contain more information
    3. **Numbers** - Financial documents with numbers are often important
    4. **Structure** - Lists and bullet points signal key information
    5. **Headers** - Section headers often introduce important content

    Adjust the settings above to customize the selection process.
    """)