Spaces:
Sleeping
Sleeping
# -*- coding: utf-8 -*- | |
"""repository_recommender.ipynb | |
Automatically generated by Colab. | |
Original file is located at | |
https://colab.research.google.com/drive/1qv09N8Vtcw5vr5NqCSfZonFeh1SQmVW5 | |
""" | |
# Dependencies for this app. A bare `pip install ...` line is shell syntax, not
# Python, and stops the module from parsing, so it is kept here as a comment.
# Run it from a shell (or list the packages in requirements.txt) instead:
#   pip install pyarrow pandas numpy streamlit gdown torch transformers
# NOTE(review): this file also imports `sklearn`, which the list above does not
# install - presumably scikit-learn needs to be added; confirm.
import warnings | |
warnings.filterwarnings('ignore') | |
import streamlit as st | |
import pandas as pd | |
import numpy as np | |
from sklearn.metrics.pairwise import cosine_similarity | |
from transformers import AutoTokenizer, AutoModel | |
import torch | |
import gdown | |
from pathlib import Path | |
from datetime import datetime | |
import json | |
# Seed the per-session containers on the first script run so the rest of the
# module can read and mutate them unconditionally.
if 'feedback_data' not in st.session_state:
    # Filled by save_feedback(): repo id -> {'likes': int, 'dislikes': int}.
    st.session_state['feedback_data'] = {}
if 'search_history' not in st.session_state:
    # Filled by add_to_history(): newest search first.
    st.session_state['search_history'] = []
# Model Loading Optimization | |
class ModelManager:
    """Lazily load the CodeT5 checkpoint and turn text into embedding vectors.

    The tokenizer and model are only loaded on first use and are then reused
    for every later call on the same instance.
    """

    def __init__(self):
        # Both stay None until get_model_and_tokenizer() is first called.
        self.model = None
        self.tokenizer = None
        # Prefer the GPU when PyTorch reports one, otherwise fall back to CPU.
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    def load_model_and_tokenizer(self):
        """Load "Salesforce/codet5-small" and place the model on ``self.device``.

        Returns:
            A ``(tokenizer, model)`` tuple; the model is in evaluation mode.
        """
        model_name = "Salesforce/codet5-small"
        tokenizer = AutoTokenizer.from_pretrained(model_name)
        model = AutoModel.from_pretrained(model_name).to(self.device)
        model.eval()  # Set model to evaluation mode
        return tokenizer, model

    def get_model_and_tokenizer(self):
        """Return ``(tokenizer, model)``, loading them first if either is missing."""
        if self.model is None or self.tokenizer is None:
            self.tokenizer, self.model = self.load_model_and_tokenizer()
        return self.tokenizer, self.model

    def generate_embedding(self, text, max_length=512):
        """Embed ``text`` as the mean of the encoder's last hidden states.

        Args:
            text: Input passed straight to the tokenizer.
            max_length: Token limit; longer inputs are truncated.

        Returns:
            A NumPy array: ``last_hidden_state`` averaged over ``dim=1`` and
            squeezed.
        """
        tokenizer, model = self.get_model_and_tokenizer()
        inputs = tokenizer(
            text,
            return_tensors="pt",
            padding=True,
            truncation=True,
            max_length=max_length
        ).to(self.device)
        # Disable gradient computation: eval() alone does not do this, and a
        # tensor that still requires grad cannot be converted with .numpy().
        with torch.no_grad():
            outputs = model.encoder(**inputs)
        embedding = outputs.last_hidden_state.mean(dim=1).squeeze().cpu().numpy()
        return embedding
# Data Management | |
class DataManager:
    """Namespace for the dataset-loading and embedding helpers.

    Both helpers take no instance state, so they are static methods and can
    be called as ``DataManager.load_dataset()`` or on an instance.
    """

    @staticmethod
    def load_dataset():
        """Load the CSV dataset and add a combined ``text`` column.

        Returns:
            A DataFrame whose ``text`` column is ``docstring`` and ``summary``
            joined by a space, with missing values treated as empty strings.
        """
        Path("data").mkdir(exist_ok=True)
        dataset_path = "/content/drive/MyDrive/practice_ml/filtered_dataset.csv"
        if not Path(dataset_path).exists():
            with st.spinner('Downloading dataset... This might take a few minutes...'):
                # NOTE(review): this is a local Drive folder path rather than a
                # download link - presumably a shareable URL was intended;
                # confirm what gdown should fetch here.
                url = "/content/drive/MyDrive/practice_ml"
                gdown.download(url, dataset_path, quiet=False)
        data = pd.read_csv(dataset_path)
        data['text'] = data['docstring'].fillna('') + ' ' + data['summary'].fillna('')
        return data

    @staticmethod
    def compute_embeddings(_data, _model_manager):
        """Embed every row's ``text`` and report progress in the UI.

        Args:
            _data: DataFrame with a ``text`` column.
            _model_manager: Object providing ``generate_embedding(text)``.

        Returns:
            A list with one embedding per row, in row order.
        """
        embeddings = []
        batch_size = 32
        total_rows = len(_data)
        # Keep the element returned by st.progress itself: used as a context
        # manager it does not hand the element back, so `.progress()` would be
        # called on the wrong object.
        progress_bar = st.progress(0)
        for start in range(0, total_rows, batch_size):
            batch = _data['text'].iloc[start:start + batch_size]
            embeddings.extend(_model_manager.generate_embedding(text) for text in batch)
            # The last batch may overshoot the row count, hence the cap at 1.0.
            progress_bar.progress(min((start + batch_size) / total_rows, 1.0))
        return embeddings
# History and Feedback Management | |
def add_to_history(query, recommendations):
    """Record a search at the front of the session's history.

    Args:
        query: The text the user searched for.
        recommendations: DataFrame of results; only its ``repo``, ``path``,
            ``url`` and ``similarity`` columns are stored.
    """
    kept_columns = ['repo', 'path', 'url', 'similarity']
    stamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    records = recommendations[kept_columns].to_dict('records')

    history = st.session_state.search_history
    history.insert(0, dict(timestamp=stamp, query=query, recommendations=records))
    # Keep only last 10 searches: one entry is added per call, so dropping the
    # oldest (last) one is enough.
    if len(history) > 10:
        history.pop()
def save_feedback(repo_id, feedback_type):
    """Increment the like or dislike counter stored for ``repo_id``.

    Args:
        repo_id: Key identifying the recommendation in the feedback store.
        feedback_type: ``'like'`` counts as a like; any other value counts as
            a dislike.
    """
    store = st.session_state.feedback_data
    # First feedback for this repo starts both counters at zero.
    tally = store.setdefault(repo_id, {'likes': 0, 'dislikes': 0})
    counter = 'likes' if feedback_type == 'like' else 'dislikes'
    tally[counter] += 1
def get_recommendations(query, data, model_manager, top_n=5):
    """Return the ``top_n`` rows of ``data`` most similar to ``query``.

    Args:
        query: Free-text project description.
        data: DataFrame with an ``embedding`` column holding one vector per row.
        model_manager: Object providing ``generate_embedding(text)``.
        top_n: Number of rows to return.

    Returns:
        A copy of the best-matching rows with an added ``similarity`` column,
        sorted from most to least similar.
    """
    query_embedding = model_manager.generate_embedding(query)
    if len(data) == 0:
        # Nothing to stack; keep the column so callers see the same schema.
        similarities = np.empty(0)
    else:
        # Score every row in one call instead of one sklearn call per row.
        embedding_matrix = np.vstack(data['embedding'].tolist())
        similarities = cosine_similarity([query_embedding], embedding_matrix)[0]
    ranked = data.assign(similarity=similarities).sort_values(
        by='similarity', ascending=False
    )
    return ranked.head(top_n)
# Streamlit UI | |
def main():
    """Render the recommender page for one Streamlit script run.

    Draws the search-history sidebar, the intro text and the query box, runs
    a search when "Get Recommendations" is pressed, and lists the most recent
    results with like/dislike buttons.

    Streamlit re-executes the script on every widget interaction, so the model
    manager, the dataset (with embeddings) and the latest results are kept in
    ``st.session_state``. Without that, clicking a feedback button reruns the
    script with the search button unpressed, which drops the results before
    the feedback can be recorded and recomputes every embedding.
    """
    # NOTE(review): the "π..." sequences in the UI strings look like
    # mis-decoded emoji. Only the like/dislike labels were restored, because
    # their meaning is unambiguous; the rest are left exactly as found.
    st.title("Repository Recommender System π")

    # Sidebar with history
    with st.sidebar:
        st.header("Search History π")
        if st.session_state.search_history:
            for entry in st.session_state.search_history:
                with st.expander(f"π {entry['timestamp']}", expanded=False):
                    st.write(f"Query: {entry['query']}")
                    for rec in entry['recommendations'][:3]:  # Show top 3
                        st.write(f"- {rec['repo']} ({rec['similarity']:.2%})")
        else:
            st.info("No search history yet")

    # Main interface
    st.markdown("""
    **Welcome to the Enhanced Repo_Recommender!**
    Enter your project description to get personalized repository recommendations.
    New features:
    - π Search history (check sidebar)
    - π Repository feedback
    - β‘ Optimized performance
    """)

    # Initialize managers once per session instead of on every rerun.
    if 'model_manager' not in st.session_state:
        st.session_state.model_manager = ModelManager()
    model_manager = st.session_state.model_manager

    if 'dataset' not in st.session_state:
        data = DataManager.load_dataset()
        # Compute embeddings if not already done
        if 'embedding' not in data.columns:
            data['embedding'] = DataManager.compute_embeddings(data, model_manager)
        st.session_state.dataset = data
    data = st.session_state.dataset

    # User input
    user_query = st.text_area(
        "Describe your project:",
        height=150,
        placeholder="Example: I need a machine learning project for customer churn prediction..."
    )

    # Get recommendations
    if st.button("Get Recommendations", type="primary"):
        if user_query.strip():
            with st.spinner("Finding relevant repositories..."):
                recommendations = get_recommendations(user_query, data, model_manager)
            add_to_history(user_query, recommendations)
            # Persist the results so they survive the rerun a feedback click causes.
            st.session_state.last_recommendations = recommendations
        else:
            st.warning("Please enter a project description.")

    # Display recommendations (from this run or an earlier one in the session)
    recommendations = st.session_state.get('last_recommendations')
    if recommendations is not None:
        st.markdown("### π― Top Recommendations")
        # Number by rank: the DataFrame index is the row's position in the
        # dataset, not its place in the sorted results.
        for rank, (_, row) in enumerate(recommendations.iterrows(), start=1):
            repo_id = f"{row['repo']}_{row['path']}"
            with st.expander(f"Repository {rank}: {row['repo']}", expanded=True):
                cols = st.columns([2, 1])
                with cols[0]:
                    st.markdown(f"**Path:** `{row['path']}`")
                    st.markdown(f"**Summary:** {row['summary']}")
                    st.markdown(f"**URL:** [View Repository]({row['url']})")
                with cols[1]:
                    st.metric("Similarity", f"{row['similarity']:.2%}")

                # Feedback buttons. The rank is part of the widget key so two
                # results sharing repo and path do not produce duplicate keys.
                feedback_cols = st.columns(2)
                with feedback_cols[0]:
                    if st.button("👍", key=f"like_{rank}_{repo_id}"):
                        save_feedback(repo_id, 'like')
                        st.success("Thanks for your feedback!")
                with feedback_cols[1]:
                    if st.button("👎", key=f"dislike_{rank}_{repo_id}"):
                        save_feedback(repo_id, 'dislike')
                        st.success("Thanks for your feedback!")

                # Show feedback stats
                if repo_id in st.session_state.feedback_data:
                    stats = st.session_state.feedback_data[repo_id]
                    st.write(f"Likes: {stats['likes']} | Dislikes: {stats['dislikes']}")

            # A missing docstring is read from CSV as NaN, which is truthy, so
            # test for real text. Kept outside the repository expander because
            # Streamlit has historically rejected nested expanders.
            docstring = row['docstring']
            if isinstance(docstring, str) and docstring.strip():
                with st.expander("View Documentation"):
                    st.markdown(docstring)

    # Footer
    st.markdown("---")
    st.markdown("Made with π€ using CodeT5 and Streamlit")
# Build the page only when this file is executed as the main script, not when
# it is imported as a module.
if __name__ == "__main__":
    main()
import warnings | |
warnings.filterwarnings('ignore') | |
import streamlit as st | |
import pandas as pd | |
import numpy as np | |
from sklearn.metrics.pairwise import cosine_similarity | |
from transformers import AutoTokenizer, AutoModel | |
import torch | |
import gdown | |
from pathlib import Path | |
from datetime import datetime | |
# Create the per-session containers on the first script run; later runs in the
# same session keep whatever they already hold.
if 'feedback_data' not in st.session_state:
    # repo id -> {'likes': int, 'dislikes': int}, maintained by save_feedback().
    st.session_state['feedback_data'] = {}
if 'search_history' not in st.session_state:
    # Newest-first list of searches, maintained by add_to_history().
    st.session_state['search_history'] = []
# Model Loading Optimization | |
def load_model_and_tokenizer():
    """Load the "Salesforce/codet5-small" tokenizer and model.

    Returns:
        A ``(tokenizer, model, device)`` tuple. The model has been moved to
        ``device`` (CUDA when PyTorch reports it available, else CPU) and put
        in evaluation mode.
    """
    checkpoint = "Salesforce/codet5-small"
    tokenizer = AutoTokenizer.from_pretrained(checkpoint)

    if torch.cuda.is_available():
        device = torch.device('cuda')
    else:
        device = torch.device('cpu')

    model = AutoModel.from_pretrained(checkpoint)
    model = model.to(device)
    model.eval()  # Set model to evaluation mode
    return tokenizer, model, device
def load_dataset():
    """Read the dataset CSV and attach a combined ``text`` column.

    If the CSV is not found at its expected path, a download is attempted
    first while a spinner is shown.

    Returns:
        The DataFrame, with ``text`` set to ``docstring`` and ``summary``
        joined by a single space (missing values become empty strings).
    """
    Path("data").mkdir(exist_ok=True)
    dataset_path = "/content/drive/MyDrive/practice_ml/filtered_dataset.csv"

    csv_file = Path(dataset_path)
    if not csv_file.exists():
        with st.spinner('Downloading dataset... This might take a few minutes...'):
            # NOTE(review): the download source is a local Drive folder path,
            # not a link - presumably a shareable URL was intended; confirm.
            url = "/content/drive/MyDrive/practice_ml"
            gdown.download(url, dataset_path, quiet=False)

    data = pd.read_csv(dataset_path)
    docstrings = data['docstring'].fillna('')
    summaries = data['summary'].fillna('')
    data['text'] = docstrings + ' ' + summaries
    return data
def generate_embedding(_tokenizer, _model, _device, text, max_length=512):
    """Embed ``text`` by mean-pooling the encoder's last hidden states.

    Args:
        _tokenizer: Callable tokenizer; its output must support ``.to(device)``
            and keyword unpacking.
        _model: Model exposing an ``encoder`` callable.
        _device: Device the tokenized inputs are moved to.
        text: Input handed to the tokenizer.
        max_length: Token limit; longer inputs are truncated.

    Returns:
        A NumPy array: ``last_hidden_state`` averaged over ``dim=1`` and
        squeezed.
    """
    tokenizer_options = dict(
        return_tensors="pt",
        padding=True,
        truncation=True,
        max_length=max_length,
    )
    # Inference only: run without gradient tracking.
    with torch.no_grad():
        encoded = _tokenizer(text, **tokenizer_options).to(_device)
        hidden_states = _model.encoder(**encoded).last_hidden_state
        pooled = hidden_states.mean(dim=1).squeeze()
    return pooled.cpu().numpy()
def compute_embeddings(_data, _tokenizer, _model, _device):
    """Embed every row's ``text`` and report progress in the UI.

    Args:
        _data: DataFrame with a ``text`` column.
        _tokenizer: Tokenizer forwarded to ``generate_embedding``.
        _model: Model forwarded to ``generate_embedding``.
        _device: Device forwarded to ``generate_embedding``.

    Returns:
        A list with one embedding per row, in row order.
    """
    embeddings = []
    batch_size = 32
    texts = _data['text'].tolist()
    total = len(texts)
    # Use the element st.progress returns as the handle. Entering it as a
    # context manager does not hand the element back, which is why the old
    # code needed a second st.empty() placeholder to update.
    progress_bar = st.progress(0)
    for start in range(0, total, batch_size):
        batch = texts[start:start + batch_size]
        embeddings.extend(
            generate_embedding(_tokenizer, _model, _device, text)
            for text in batch
        )
        # The last batch may overshoot the row count, hence the cap at 1.0.
        progress_bar.progress(min((start + batch_size) / total, 1.0))
    return embeddings
def add_to_history(query, recommendations):
    """Push a search onto the front of the session's history list.

    Args:
        query: The text the user searched for.
        recommendations: DataFrame of results; its ``repo``, ``path``, ``url``
            and ``similarity`` columns are stored as a list of row dicts.
    """
    searched_at = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    summary_rows = recommendations[['repo', 'path', 'url', 'similarity']].to_dict('records')

    log = st.session_state.search_history
    log.insert(0, {
        'timestamp': searched_at,
        'query': query,
        'recommendations': summary_rows,
    })
    # Cap the history at 10 entries by discarding the oldest one.
    if len(log) > 10:
        log.pop()
def save_feedback(repo_id, feedback_type):
    """Add one like or dislike to the counters kept for ``repo_id``.

    Args:
        repo_id: Key identifying the recommendation in the feedback store.
        feedback_type: ``'like'`` increments likes; any other value increments
            dislikes.
    """
    all_feedback = st.session_state.feedback_data
    # Create zeroed counters the first time a repo receives feedback.
    counts = all_feedback.setdefault(repo_id, {'likes': 0, 'dislikes': 0})
    if feedback_type == 'like':
        counts['likes'] += 1
        return
    counts['dislikes'] += 1
def get_recommendations(query, data, tokenizer, model, device, top_n=5):
    """Return the ``top_n`` rows of ``data`` most similar to ``query``.

    Args:
        query: Free-text project description.
        data: DataFrame with an ``embedding`` column holding one vector per row.
        tokenizer: Tokenizer forwarded to ``generate_embedding``.
        model: Model forwarded to ``generate_embedding``.
        device: Device forwarded to ``generate_embedding``.
        top_n: Number of rows to return.

    Returns:
        A copy of the best-matching rows with an added ``similarity`` column,
        sorted from most to least similar.
    """
    query_embedding = generate_embedding(tokenizer, model, device, query)
    if len(data) == 0:
        # Nothing to stack; keep the column so callers see the same schema.
        similarities = np.empty(0)
    else:
        # Score every row in one call instead of one sklearn call per row.
        embedding_matrix = np.vstack(data['embedding'].tolist())
        similarities = cosine_similarity([query_embedding], embedding_matrix)[0]
    ranked = data.assign(similarity=similarities).sort_values(
        by='similarity', ascending=False
    )
    return ranked.head(top_n)
def main():
    """Render the recommender page for one Streamlit script run.

    Draws the search-history sidebar, the intro text and the query box, runs
    a search when "Get Recommendations" is pressed, and lists the most recent
    results with like/dislike buttons.

    Streamlit re-executes the script on every widget interaction, so the
    loaded model, the dataset (with embeddings) and the latest results are
    kept in ``st.session_state``. Without that, clicking a feedback button
    reruns the script with the search button unpressed, which drops the
    results before the feedback can be recorded and recomputes every
    embedding.
    """
    # NOTE(review): the "π..." sequences in the UI strings look like
    # mis-decoded emoji. Only the like/dislike labels were restored, because
    # their meaning is unambiguous; the rest are left exactly as found.
    st.title("Repository Recommender System π")

    # Sidebar with history
    with st.sidebar:
        st.header("Search History π")
        if st.session_state.search_history:
            for entry in st.session_state.search_history:
                with st.expander(f"π {entry['timestamp']}", expanded=False):
                    st.write(f"Query: {entry['query']}")
                    for rec in entry['recommendations'][:3]:
                        st.write(f"- {rec['repo']} ({rec['similarity']:.2%})")
        else:
            st.info("No search history yet")

    st.markdown("""
    **Welcome to the Enhanced Repo_Recommender!**
    Enter your project description to get personalized repository recommendations.
    New features:
    - π Search history (check sidebar)
    - π Repository feedback
    - β‘ Optimized performance
    """)

    # Load resources once per session instead of on every rerun.
    if 'resources' not in st.session_state:
        with st.spinner("Loading model and data..."):
            tokenizer, model, device = load_model_and_tokenizer()
            data = load_dataset()
        # Compute embeddings if not already done
        if 'embedding' not in data.columns:
            data['embedding'] = compute_embeddings(data, tokenizer, model, device)
        st.session_state.resources = (tokenizer, model, device, data)
    tokenizer, model, device, data = st.session_state.resources

    # User input
    user_query = st.text_area(
        "Describe your project:",
        height=150,
        placeholder="Example: I need a machine learning project for customer churn prediction..."
    )

    # Get recommendations
    if st.button("Get Recommendations", type="primary"):
        if user_query.strip():
            with st.spinner("Finding relevant repositories..."):
                recommendations = get_recommendations(
                    user_query, data, tokenizer, model, device
                )
            add_to_history(user_query, recommendations)
            # Persist the results so they survive the rerun a feedback click causes.
            st.session_state.last_recommendations = recommendations
        else:
            st.warning("Please enter a project description.")

    # Display recommendations (from this run or an earlier one in the session)
    recommendations = st.session_state.get('last_recommendations')
    if recommendations is not None:
        st.markdown("### π― Top Recommendations")
        # Number by rank: the DataFrame index is the row's position in the
        # dataset, not its place in the sorted results.
        for rank, (_, row) in enumerate(recommendations.iterrows(), start=1):
            repo_id = f"{row['repo']}_{row['path']}"
            with st.expander(f"Repository {rank}: {row['repo']}", expanded=True):
                cols = st.columns([2, 1])
                with cols[0]:
                    st.markdown(f"**Path:** `{row['path']}`")
                    st.markdown(f"**Summary:** {row['summary']}")
                    st.markdown(f"**URL:** [View Repository]({row['url']})")
                with cols[1]:
                    st.metric("Similarity", f"{row['similarity']:.2%}")

                # Feedback buttons. The rank is part of the widget key so two
                # results sharing repo and path do not produce duplicate keys.
                feedback_cols = st.columns(2)
                with feedback_cols[0]:
                    if st.button("👍", key=f"like_{rank}_{repo_id}"):
                        save_feedback(repo_id, 'like')
                        st.success("Thanks for your feedback!")
                with feedback_cols[1]:
                    if st.button("👎", key=f"dislike_{rank}_{repo_id}"):
                        save_feedback(repo_id, 'dislike')
                        st.success("Thanks for your feedback!")

                # Show feedback stats
                if repo_id in st.session_state.feedback_data:
                    stats = st.session_state.feedback_data[repo_id]
                    st.write(f"Likes: {stats['likes']} | Dislikes: {stats['dislikes']}")

            # A missing docstring is read from CSV as NaN, which is truthy, so
            # test for real text. Kept outside the repository expander because
            # Streamlit has historically rejected nested expanders.
            docstring = row['docstring']
            if isinstance(docstring, str) and docstring.strip():
                with st.expander("View Documentation"):
                    st.markdown(docstring)

    # Footer
    st.markdown("---")
    st.markdown("Made with π€ using CodeT5 and Streamlit")
# Build the page only when this file is executed as the main script, not when
# it is imported as a module.
if __name__ == "__main__":
    main()