|
import streamlit as st
|
|
import concurrent.futures
|
|
from concurrent.futures import ThreadPoolExecutor,as_completed
|
|
from functools import partial
|
|
import numpy as np
|
|
from io import StringIO
|
|
import sys
|
|
import time
|
|
import pandas as pd
|
|
from pymongo import MongoClient
|
|
import plotly.express as px
|
|
from pinecone import Pinecone, ServerlessSpec
|
|
import chromadb
|
|
import requests
|
|
from io import BytesIO
|
|
from PyPDF2 import PdfReader
|
|
import hashlib
|
|
import os
|
|
|
|
|
|
from embedding import get_embeddings,get_image_embeddings,get_embed_chroma,imporve_text
|
|
from preprocess import filtering
|
|
from search import *
|
|
|
|
|
|
|
|
client = chromadb.PersistentClient(path = "embeddings")
|
|
collection = client.get_or_create_collection(name="data",metadata={"hnsw:space": "l2"})
|
|
|
|
|
|
def generate_hash(content):
|
|
return hashlib.sha256(content.encode('utf-8')).hexdigest()
|
|
|
|
def get_key(link):
|
|
text = ''
|
|
try:
|
|
|
|
response = requests.get(link)
|
|
response.raise_for_status()
|
|
|
|
|
|
pdf_file = BytesIO(response.content)
|
|
|
|
|
|
reader = PdfReader(pdf_file)
|
|
num_pages = len(reader.pages)
|
|
|
|
first_page_text = reader.pages[0].extract_text()
|
|
if first_page_text:
|
|
text += first_page_text
|
|
|
|
|
|
last_page_text = reader.pages[-1].extract_text()
|
|
if last_page_text:
|
|
text += last_page_text
|
|
|
|
except requests.exceptions.HTTPError as e:
|
|
print(f'HTTP error occurred: {e}')
|
|
except Exception as e:
|
|
print(f'An error occurred: {e}')
|
|
|
|
unique_key = generate_hash(text)
|
|
|
|
return unique_key
|
|
|
|
|
|
def cosine_similarity(vec1, vec2):
|
|
vec1 = np.array(vec1)
|
|
vec2 = np.array(vec2)
|
|
|
|
dot_product = np.dot(vec1, vec2.T)
|
|
magnitude_vec1 = np.linalg.norm(vec1)
|
|
magnitude_vec2 = np.linalg.norm(vec2)
|
|
|
|
if magnitude_vec1 == 0 or magnitude_vec2 == 0:
|
|
return 0.0
|
|
|
|
cosine_sim = dot_product / (magnitude_vec1 * magnitude_vec2)
|
|
return cosine_sim
|
|
|
|
def update_chroma(product_name,url,key,text,vector,log_area):
|
|
|
|
id_list = [key+str(i) for i in range(len(text))]
|
|
|
|
metadata_list = [
|
|
{ 'key':key,
|
|
'product_name': product_name,
|
|
'url': url,
|
|
'text':item
|
|
}
|
|
for item in text
|
|
]
|
|
|
|
collection.upsert(
|
|
ids = id_list,
|
|
embeddings = vector,
|
|
metadatas = metadata_list
|
|
)
|
|
|
|
logger.write(f"\n\u2713 Updated DB - {url}\n\n")
|
|
log_area.text(logger.getvalue())
|
|
|
|
|
|
|
|
class StreamCapture:
|
|
def __init__(self):
|
|
self.output = StringIO()
|
|
self._stdout = sys.stdout
|
|
|
|
def __enter__(self):
|
|
sys.stdout = self.output
|
|
return self.output
|
|
|
|
def __exit__(self, exc_type, exc_val, exc_tb):
|
|
sys.stdout = self._stdout
|
|
|
|
|
|
def score(main_product, main_url, product_count, link_count, search, logger, log_area):
|
|
|
|
|
|
data = {}
|
|
similar_products = extract_similar_products(main_product)[:product_count]
|
|
|
|
print("--> Fetching Manual Links")
|
|
|
|
if search == 'All':
|
|
|
|
def process_product(product, search_function, main_product):
|
|
search_result = search_function(product)
|
|
return filtering(search_result, main_product, product, link_count)
|
|
|
|
|
|
search_functions = {
|
|
'google': search_google,
|
|
'duckduckgo': search_duckduckgo,
|
|
|
|
'github': search_github,
|
|
'wikipedia': search_wikipedia
|
|
}
|
|
|
|
with ThreadPoolExecutor() as executor:
|
|
future_to_product_search = {
|
|
executor.submit(process_product, product, search_function, main_product): (product, search_name)
|
|
for product in similar_products
|
|
for search_name, search_function in search_functions.items()
|
|
}
|
|
|
|
for future in as_completed(future_to_product_search):
|
|
product, search_name = future_to_product_search[future]
|
|
try:
|
|
if product not in data:
|
|
data[product] = {}
|
|
data[product] = future.result()
|
|
except Exception as e:
|
|
print(f"Error processing product {product} with {search_name}: {e}")
|
|
|
|
else:
|
|
|
|
for product in similar_products:
|
|
|
|
if search == 'google':
|
|
data[product] = filtering(search_google(product), main_product, product, link_count)
|
|
elif search == 'duckduckgo':
|
|
data[product] = filtering(search_duckduckgo(product), main_product, product, link_count)
|
|
elif search == 'archive':
|
|
data[product] = filtering(search_archive(product), main_product, product, link_count)
|
|
elif search == 'github':
|
|
data[product] = filtering(search_github(product), main_product, product, link_count)
|
|
elif search == 'wikipedia':
|
|
data[product] = filtering(search_wikipedia(product), main_product, product, link_count)
|
|
|
|
|
|
|
|
logger.write("\n\n\u2713 Filtered Links\n")
|
|
log_area.text(logger.getvalue())
|
|
|
|
|
|
|
|
logger.write("\n\n--> Creating Main product Embeddings\n")
|
|
|
|
main_key = get_key(main_url)
|
|
main_text,main_vector = get_embed_chroma(main_url)
|
|
|
|
update_chroma(main_product,main_url,main_key,main_text,main_vector,log_area)
|
|
|
|
|
|
print("\n\n\u2713 Main Product embeddings Created")
|
|
|
|
|
|
logger.write("\n\n--> Creating Similar product Embeddings\n")
|
|
log_area.text(logger.getvalue())
|
|
test_embedding = [0]*768
|
|
|
|
for product in data:
|
|
for link in data[product]:
|
|
|
|
url, _ = link
|
|
similar_key = get_key(url)
|
|
|
|
res = collection.query(
|
|
query_embeddings = [test_embedding],
|
|
n_results=1,
|
|
where={"key": similar_key},
|
|
)
|
|
|
|
if not res['distances'][0]:
|
|
similar_text,similar_vector = get_embed_chroma(url)
|
|
update_chroma(product,url,similar_key,similar_text,similar_vector,log_area)
|
|
|
|
|
|
logger.write("\n\n\u2713 Similar Product embeddings Created\n")
|
|
log_area.text(logger.getvalue())
|
|
|
|
top_similar = []
|
|
|
|
for idx,chunk in enumerate(main_vector):
|
|
res = collection.query(
|
|
query_embeddings = [chunk],
|
|
n_results=1,
|
|
where={"key": {'$ne':main_key}},
|
|
include=['metadatas','embeddings','distances']
|
|
)
|
|
|
|
top_similar.append((main_text[idx],chunk,res,res['distances'][0]))
|
|
|
|
most_similar_items = sorted(top_similar,key = lambda x:x[3])[:top_similar_count]
|
|
|
|
|
|
logger.write("--------------- DONE -----------------\n")
|
|
log_area.text(logger.getvalue())
|
|
|
|
return most_similar_items
|
|
|
|
|
|
|
|
|
|
|
|
|
|
st.title("Check Infringement")
|
|
|
|
|
|
|
|
main_product = st.text_input('Enter Main Product Name', 'Philips led 7w bulb')
|
|
main_url = st.text_input('Enter Main Product Manual URL', 'https://www.assets.signify.com/is/content/PhilipsConsumer/PDFDownloads/Colombia/technical-sheets/ODLI20180227_001-UPD-es_CO-Ficha_Tecnica_LED_MR16_Master_7W_Dim_12V_CRI90.pdf')
|
|
search_method = st.selectbox('Choose Search Engine', ['All','duckduckgo', 'google', 'archive', 'github', 'wikipedia'])
|
|
|
|
col1, col2, col3= st.columns(3)
|
|
with col1:
|
|
product_count = st.number_input("Number of Simliar Products",min_value=1, step=1, format="%i")
|
|
with col2:
|
|
link_count = st.number_input("Number of Links per product",min_value=1, step=1, format="%i")
|
|
with col3:
|
|
need_image = st.selectbox("Process Images", ['True','False'])
|
|
|
|
top_similar_count = st.number_input("Top Similarities to be displayed",value=3,min_value=1, step=1, format="%i")
|
|
tag_option = "Complete Document Similarity"
|
|
|
|
|
|
if st.button('Check for Infringement'):
|
|
global log_output
|
|
|
|
tab1, tab2 = st.tabs(["Output", "Console"])
|
|
|
|
with tab2:
|
|
log_output = st.empty()
|
|
|
|
with tab1:
|
|
with st.spinner('Processing...'):
|
|
with StreamCapture() as logger:
|
|
top_similar_values = score(main_product, main_url, product_count, link_count, search_method, logger, log_output)
|
|
|
|
st.success('Processing complete!')
|
|
|
|
st.subheader("Cosine Similarity Scores")
|
|
|
|
for main_text, main_vector, response, _ in top_similar_values:
|
|
product_name = response['metadatas'][0][0]['product_name']
|
|
link = response['metadatas'][0][0]['url']
|
|
similar_text = response['metadatas'][0][0]['text']
|
|
|
|
cosine_score = cosine_similarity([main_vector], response['embeddings'][0])[0][0]
|
|
|
|
|
|
with st.container():
|
|
st.markdown(f"### [Product: {product_name}]({link})")
|
|
st.markdown(f"#### Cosine Score: {cosine_score:.4f}")
|
|
col1, col2 = st.columns(2)
|
|
with col1:
|
|
st.markdown(f"**Main Text:** \n{imporve_text(main_text)}")
|
|
with col2:
|
|
st.markdown(f"**Similar Text:** \n{imporve_text(similar_text)}")
|
|
|
|
st.markdown("---")
|
|
|
|
if need_image == 'True':
|
|
with st.spinner('Processing Images...'):
|
|
emb_main = get_image_embeddings(main_product)
|
|
similar_prod = extract_similar_products(main_product)[0]
|
|
emb_similar = get_image_embeddings(similar_prod)
|
|
|
|
similarity_matrix = np.zeros((5, 5))
|
|
for i in range(5):
|
|
for j in range(5):
|
|
similarity_matrix[i][j] = cosine_similarity([emb_main[i]], [emb_similar[j]])[0][0]
|
|
|
|
st.subheader("Image Similarity")
|
|
|
|
fig = px.imshow(similarity_matrix,
|
|
labels=dict(x=f"{similar_prod} Images", y=f"{main_product} Images", color="Similarity"),
|
|
x=[f"Image {i+1}" for i in range(5)],
|
|
y=[f"Image {i+1}" for i in range(5)],
|
|
color_continuous_scale="Viridis")
|
|
|
|
|
|
fig.update_layout(title="Image Similarity Heatmap")
|
|
|
|
|
|
st.plotly_chart(fig)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|