# -*- coding: utf-8 -*-
"""repository_recommender.ipynb

Automatically generated by Colab.

Original file is located at
    https://colab.research.google.com/drive/1qv09N8Vtcw5vr5NqCSfZonFeh1SQmVW5

Streamlit app that embeds a free-text project description with CodeT5 and
recommends the dataset rows whose docstring/summary embeddings have the
highest cosine similarity to it.

Install the dependencies from a shell (not from inside this file):

    pip install pyarrow pandas numpy streamlit gdown torch transformers scikit-learn
"""

import warnings

# Silence warnings before the heavy imports below so theirs are suppressed too.
warnings.filterwarnings('ignore')

import json
from datetime import datetime
from pathlib import Path

import gdown
import numpy as np
import pandas as pd
import streamlit as st
import torch
from sklearn.metrics.pairwise import cosine_similarity
from transformers import AutoModel, AutoTokenizer

MODEL_NAME = "Salesforce/codet5-small"
DATASET_PATH = "/content/drive/MyDrive/practice_ml/filtered_dataset.csv"
# NOTE(review): this is a local Drive folder, not a downloadable URL, so the
# gdown fallback in load_dataset() cannot succeed as written. Replace it with
# the real shareable link of the dataset file.
DATASET_URL = "/content/drive/MyDrive/practice_ml"
EMBEDDING_BATCH_SIZE = 32
MAX_HISTORY = 10

# Initialize session state for history, feedback and the last result set.
if 'search_history' not in st.session_state:
    st.session_state.search_history = []
if 'feedback_data' not in st.session_state:
    st.session_state.feedback_data = {}
if 'recommendations' not in st.session_state:
    # Results are kept here so they survive the rerun triggered by a
    # feedback button click.
    st.session_state.recommendations = None


# ---------------------------------------------------------------------------
# Model and data loading
# ---------------------------------------------------------------------------
@st.cache_resource
def load_model_and_tokenizer():
    """Load the CodeT5 tokenizer and model once and place the model on a device.

    Returns:
        A ``(tokenizer, model, device)`` tuple. ``device`` is CUDA when
        ``torch.cuda.is_available()`` and CPU otherwise; the model is put in
        evaluation mode.
    """
    tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model = AutoModel.from_pretrained(MODEL_NAME).to(device)
    model.eval()  # Set model to evaluation mode
    return tokenizer, model, device


@st.cache_resource
def load_dataset():
    """Load the dataset CSV and add the combined ``text`` column.

    If the CSV is missing, a download via ``gdown`` is attempted first.

    Returns:
        The DataFrame read from ``DATASET_PATH`` with an extra ``text`` column
        made of ``docstring`` and ``summary`` (missing values treated as empty
        strings) joined by a space.
    """
    Path("data").mkdir(exist_ok=True)

    if not Path(DATASET_PATH).exists():
        with st.spinner('Downloading dataset... This might take a few minutes...'):
            gdown.download(DATASET_URL, DATASET_PATH, quiet=False)

    data = pd.read_csv(DATASET_PATH)
    data['text'] = data['docstring'].fillna('') + ' ' + data['summary'].fillna('')
    return data


# ---------------------------------------------------------------------------
# Embeddings
# ---------------------------------------------------------------------------
@st.cache_data
def generate_embedding(_tokenizer, _model, _device, text, max_length=512):
    """Generate the embedding for a single text.

    The leading-underscore parameters are excluded from Streamlit's cache key,
    so results are cached per ``(text, max_length)``.

    Args:
        _tokenizer: Tokenizer returned by ``load_model_and_tokenizer``.
        _model: Model returned by ``load_model_and_tokenizer``.
        _device: Device the model lives on.
        text: Text to embed.
        max_length: Truncation length passed to the tokenizer.

    Returns:
        A NumPy array holding the mean of the encoder's last hidden state over
        the sequence dimension.
    """
    with torch.no_grad():  # Disable gradient computation
        inputs = _tokenizer(
            text,
            return_tensors="pt",
            padding=True,
            truncation=True,
            max_length=max_length,
        ).to(_device)
        outputs = _model.encoder(**inputs)
        return outputs.last_hidden_state.mean(dim=1).squeeze().cpu().numpy()


@st.cache_data
def compute_embeddings(_data, _tokenizer, _model, _device):
    """Compute embeddings for every row of ``_data['text']`` with a progress bar.

    All parameters are underscore-prefixed and therefore not part of the cache
    key: the result is computed once and reused.

    Returns:
        A list with one embedding per row, in row order.
    """
    texts = _data['text'].tolist()
    total = len(texts)
    embeddings = []

    # st.progress returns the element handle directly; using it as a context
    # manager would bind None and break the .progress() updates.
    progress_bar = st.progress(0)
    for start in range(0, total, EMBEDDING_BATCH_SIZE):
        batch = texts[start:start + EMBEDDING_BATCH_SIZE]
        embeddings.extend(
            generate_embedding(_tokenizer, _model, _device, text)
            for text in batch
        )
        progress_bar.progress(min((start + EMBEDDING_BATCH_SIZE) / total, 1.0))
    progress_bar.empty()

    return embeddings


# ---------------------------------------------------------------------------
# Backward-compatible object-style API (thin wrappers over the functions above)
# ---------------------------------------------------------------------------
class ModelManager:
    """Holds a lazily loaded tokenizer/model pair and the target device."""

    def __init__(self):
        self.model = None
        self.tokenizer = None
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    def load_model_and_tokenizer(self):
        """Return ``(tokenizer, model)`` from the shared cached loader.

        Caching is done by the module-level loader so that ``self`` never has
        to be hashed by Streamlit.
        """
        tokenizer, model, _ = load_model_and_tokenizer()
        return tokenizer, model

    def get_model_and_tokenizer(self):
        """Return ``(tokenizer, model)``, loading them on first use."""
        if self.model is None or self.tokenizer is None:
            self.tokenizer, self.model = self.load_model_and_tokenizer()
        return self.tokenizer, self.model

    def generate_embedding(self, text, max_length=512):
        """Return the embedding of ``text`` as a NumPy array."""
        tokenizer, model = self.get_model_and_tokenizer()
        return generate_embedding(tokenizer, model, self.device, text, max_length)


class DataManager:
    """Namespace for dataset loading and bulk embedding."""

    @staticmethod
    def load_dataset():
        """Load and prepare the dataset (see module-level ``load_dataset``)."""
        return load_dataset()

    @staticmethod
    def compute_embeddings(_data, _model_manager):
        """Compute embeddings for ``_data`` using ``_model_manager``'s model."""
        tokenizer, model = _model_manager.get_model_and_tokenizer()
        return compute_embeddings(_data, tokenizer, model, _model_manager.device)


# ---------------------------------------------------------------------------
# History and feedback management
# ---------------------------------------------------------------------------
def add_to_history(query, recommendations):
    """Add a search to the front of the session history.

    Args:
        query: The text the user searched for.
        recommendations: DataFrame with at least the ``repo``, ``path``,
            ``url`` and ``similarity`` columns.

    Only the ``MAX_HISTORY`` most recent searches are kept.
    """
    history_entry = {
        'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
        'query': query,
        'recommendations': recommendations[
            ['repo', 'path', 'url', 'similarity']
        ].to_dict('records'),
    }
    history = st.session_state.search_history
    history.insert(0, history_entry)
    del history[MAX_HISTORY:]


def save_feedback(repo_id, feedback_type):
    """Record one like or dislike for ``repo_id`` in session state.

    Args:
        repo_id: Key identifying the recommended item.
        feedback_type: ``'like'`` increments likes; any other value
            increments dislikes.
    """
    stats = st.session_state.feedback_data.setdefault(
        repo_id, {'likes': 0, 'dislikes': 0}
    )
    if feedback_type == 'like':
        stats['likes'] += 1
    else:
        stats['dislikes'] += 1


# ---------------------------------------------------------------------------
# Recommendation
# ---------------------------------------------------------------------------
def get_recommendations(query, data, tokenizer, model, device, top_n=5):
    """Return the ``top_n`` rows of ``data`` most similar to ``query``.

    Args:
        query: Free-text project description.
        data: DataFrame with an ``embedding`` column of equal-length vectors.
        tokenizer, model, device: As returned by ``load_model_and_tokenizer``.
        top_n: Number of rows to return.

    Returns:
        A copy of the best-matching rows with an added ``similarity`` column,
        sorted by descending cosine similarity.
    """
    if data.empty:
        return data.assign(similarity=pd.Series(dtype=float))

    query_embedding = generate_embedding(tokenizer, model, device, query)

    # One vectorised call instead of one sklearn call per row.
    embedding_matrix = np.vstack(data['embedding'].to_numpy())
    similarities = cosine_similarity(
        np.asarray(query_embedding).reshape(1, -1), embedding_matrix
    )[0]

    return (
        data.assign(similarity=similarities)
        .sort_values(by='similarity', ascending=False)
        .head(top_n)
    )


# ---------------------------------------------------------------------------
# Streamlit UI
# ---------------------------------------------------------------------------
def _render_sidebar_history():
    """Show the stored searches (top 3 hits each) in the sidebar."""
    with st.sidebar:
        st.header("Search History 📜")
        if not st.session_state.search_history:
            st.info("No search history yet")
            return
        for entry in st.session_state.search_history:
            with st.expander(f"🔍 {entry['timestamp']}", expanded=False):
                st.write(f"Query: {entry['query']}")
                for rec in entry['recommendations'][:3]:  # Show top 3
                    st.write(f"- {rec['repo']} ({rec['similarity']:.2%})")


def _render_recommendations(recommendations):
    """Render each recommendation with details, feedback buttons and docs."""
    st.markdown("### 🎯 Top Recommendations")

    # Number by rank: the DataFrame index is the original dataset row label.
    for rank, (_, row) in enumerate(recommendations.iterrows(), start=1):
        repo_id = f"{row['repo']}_{row['path']}"

        with st.expander(f"Repository {rank}: {row['repo']}", expanded=True):
            cols = st.columns([2, 1])
            with cols[0]:
                st.markdown(f"**Path:** `{row['path']}`")
                st.markdown(f"**Summary:** {row['summary']}")
                st.markdown(f"**URL:** [View Repository]({row['url']})")
            with cols[1]:
                st.metric("Similarity", f"{row['similarity']:.2%}")

            # Feedback buttons. The rank is part of the widget key so two
            # rows sharing repo/path cannot collide.
            feedback_cols = st.columns(2)
            with feedback_cols[0]:
                if st.button("👍", key=f"like_{rank}_{repo_id}"):
                    save_feedback(repo_id, 'like')
                    st.success("Thanks for your feedback!")
            with feedback_cols[1]:
                if st.button("👎", key=f"dislike_{rank}_{repo_id}"):
                    save_feedback(repo_id, 'dislike')
                    st.success("Thanks for your feedback!")

            # Show feedback stats
            if repo_id in st.session_state.feedback_data:
                stats = st.session_state.feedback_data[repo_id]
                st.write(f"Likes: {stats['likes']} | Dislikes: {stats['dislikes']}")

        # Kept outside the repository expander so expanders are never nested.
        # NaN is truthy, so missing docstrings are checked explicitly.
        docstring = row['docstring']
        if pd.notna(docstring) and str(docstring).strip():
            with st.expander("View Documentation"):
                st.markdown(docstring)


def main():
    """Build the page: history sidebar, query box, results and footer."""
    st.title("Repository Recommender System 🚀")

    # Sidebar with history
    _render_sidebar_history()

    # Main interface
    st.markdown("""
    **Welcome to the Enhanced Repo_Recommender!**
    Enter your project description to get personalized repository recommendations.

    New features:
    - 📜 Search history (check sidebar)
    - 👍 Repository feedback
    - ⚡ Optimized performance
    """)

    # Load resources
    with st.spinner("Loading model and data..."):
        tokenizer, model, device = load_model_and_tokenizer()
        data = load_dataset()

        # Compute embeddings if not already done
        if 'embedding' not in data.columns:
            data['embedding'] = compute_embeddings(data, tokenizer, model, device)

    # User input
    user_query = st.text_area(
        "Describe your project:",
        height=150,
        placeholder="Example: I need a machine learning project for customer churn prediction...",
    )

    # Get recommendations
    if st.button("Get Recommendations", type="primary"):
        if user_query.strip():
            with st.spinner("Finding relevant repositories..."):
                recommendations = get_recommendations(
                    user_query, data, tokenizer, model, device
                )
            add_to_history(user_query, recommendations)
            st.session_state.recommendations = recommendations
        else:
            st.session_state.recommendations = None
            st.warning("Please enter a project description.")

    # Render from session state, not from inside the button branch: a click on
    # a feedback button reruns the script with the search button unpressed.
    if st.session_state.recommendations is not None:
        _render_recommendations(st.session_state.recommendations)

    # Footer
    st.markdown("---")
    st.markdown("Made with 🤖 using CodeT5 and Streamlit")


if __name__ == "__main__":
    main()