# practiceai / repository_recommender.py
# frankjosh's picture
# Upload repository_recommender.py
# 347e3cb verified
# -*- coding: utf-8 -*-
"""repository_recommender.ipynb
Automatically generated by Colab.
Original file is located at
https://colab.research.google.com/drive/1qv09N8Vtcw5vr5NqCSfZonFeh1SQmVW5
"""
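# Install the dependencies before running (shell command; prefix with "!" in a notebook cell):
#   pip install pyarrow pandas numpy streamlit gdown torch transformers scikit-learn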
import warnings
warnings.filterwarnings('ignore')
import streamlit as st
import pandas as pd
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity
from transformers import AutoTokenizer, AutoModel
import torch
import gdown
from pathlib import Path
from datetime import datetime
import json
# Initialize session state for history and feedback
# Streamlit re-executes this script on every interaction, so the per-session
# containers are created only when they are not already present.
if 'search_history' not in st.session_state:
    st.session_state['search_history'] = list()
if 'feedback_data' not in st.session_state:
    st.session_state['feedback_data'] = dict()
# Model Loading Optimization
class ModelManager:
    """Load the CodeT5 checkpoint and turn text into embedding vectors.

    ``model`` and ``tokenizer`` start as ``None`` and are filled in on the
    first call to :meth:`get_model_and_tokenizer`.  ``device`` is CUDA when
    torch reports it as available, otherwise the CPU.
    """

    # Hugging Face checkpoint used for both the tokenizer and the model.
    MODEL_NAME = "Salesforce/codet5-small"

    def __init__(self):
        self.model = None
        self.tokenizer = None
        self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    @staticmethod
    @st.cache_resource
    def _load_cached(model_name, device_name):
        """Load tokenizer and model once per (checkpoint, device) pair.

        The cache decorator sits on a static helper that only receives plain
        strings, so the cache key never involves a ``ModelManager`` instance.
        Decorating the instance method directly would make ``self`` part of
        the call (or, depending on the Streamlit version, fail to bind it).
        """
        tokenizer = AutoTokenizer.from_pretrained(model_name)
        model = AutoModel.from_pretrained(model_name).to(torch.device(device_name))
        model.eval()  # Set model to evaluation mode
        return tokenizer, model

    def load_model_and_tokenizer(self):
        """Return ``(tokenizer, model)`` with the model placed on ``self.device``."""
        return ModelManager._load_cached(self.MODEL_NAME, str(self.device))

    def get_model_and_tokenizer(self):
        """Return ``(tokenizer, model)``, loading them on first use."""
        if self.model is None or self.tokenizer is None:
            self.tokenizer, self.model = self.load_model_and_tokenizer()
        return self.tokenizer, self.model

    @torch.no_grad()  # Disable gradient computation
    def generate_embedding(self, text, max_length=512):
        """Embed ``text`` with the model's encoder.

        Args:
            text: Input passed straight to the tokenizer.
                NOTE(review): callers in this file pass a single string; with
                a list the ``squeeze()`` below would keep the batch axis.
            max_length: Truncation limit handed to the tokenizer.

        Returns:
            A NumPy array: the encoder's last hidden state averaged over the
            token axis, squeezed and moved to the CPU.
        """
        tokenizer, model = self.get_model_and_tokenizer()
        inputs = tokenizer(
            text,
            return_tensors="pt",
            padding=True,
            truncation=True,
            max_length=max_length,
        ).to(self.device)
        outputs = model.encoder(**inputs)
        # Mean over dim=1 (the token axis) gives one vector per input.
        embedding = outputs.last_hidden_state.mean(dim=1).squeeze().cpu().numpy()
        return embedding
# Data Management
class DataManager:
    """Namespace for dataset loading and embedding computation.

    Both functions take no ``self`` and are called on the class itself
    (``DataManager.load_dataset()``), so they are declared as static methods.
    """

    @staticmethod
    @st.cache_resource
    def load_dataset():
        """Load the CSV dataset and add a combined ``text`` column.

        Returns:
            A DataFrame read from ``dataset_path`` with an extra ``text``
            column: ``docstring`` and ``summary`` joined by a space, with
            missing values treated as empty strings.
        """
        Path("data").mkdir(exist_ok=True)
        dataset_path = "/content/drive/MyDrive/practice_ml/filtered_dataset.csv"
        if not Path(dataset_path).exists():
            with st.spinner('Downloading dataset... This might take a few minutes...'):
                # NOTE(review): this value is a local Drive directory path,
                # not a download URL - confirm the intended source for gdown.
                url = "/content/drive/MyDrive/practice_ml"
                gdown.download(url, dataset_path, quiet=False)
        data = pd.read_csv(dataset_path)
        data['text'] = data['docstring'].fillna('') + ' ' + data['summary'].fillna('')
        return data

    @staticmethod
    @st.cache_data
    def compute_embeddings(_data, _model_manager):
        """Embed every row's ``text`` and report progress.

        Texts are embedded one at a time; ``batch_size`` only controls how
        often the progress bar is refreshed.  The leading underscores on the
        parameter names keep Streamlit from hashing the arguments.

        Args:
            _data: DataFrame with a ``text`` column.
            _model_manager: Object exposing ``generate_embedding(text)``.

        Returns:
            A list with one embedding per row, in row order.
        """
        embeddings = []
        batch_size = 32
        total = len(_data)
        # st.progress returns the handle to update; using it as a context
        # manager binds None, so keep the handle directly.
        progress_bar = st.progress(0)
        for start in range(0, total, batch_size):
            batch = _data['text'].iloc[start:start + batch_size]
            embeddings.extend(
                _model_manager.generate_embedding(text) for text in batch
            )
            progress_bar.progress(min((start + batch_size) / total, 1.0))
        return embeddings
# History and Feedback Management
def add_to_history(query, recommendations):
    """Put a search and its results at the front of the session history.

    Args:
        query: The text the user searched for.
        recommendations: DataFrame holding at least the ``repo``, ``path``,
            ``url`` and ``similarity`` columns; only those are stored.
    """
    stamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    stored_columns = ['repo', 'path', 'url', 'similarity']
    records = recommendations[stored_columns].to_dict('records')
    history = st.session_state.search_history
    history.insert(0, {
        'timestamp': stamp,
        'query': query,
        'recommendations': records,
    })
    # Newest entries sit at the front, so the oldest one is dropped once the
    # list grows past ten.
    if len(history) > 10:
        history.pop()
def save_feedback(repo_id, feedback_type):
    """Increment the like/dislike counter kept for ``repo_id``.

    Args:
        repo_id: Key under which the counters are stored.
        feedback_type: ``'like'`` bumps ``likes``; any other value bumps
            ``dislikes``.
    """
    counters = st.session_state.feedback_data.setdefault(
        repo_id, {'likes': 0, 'dislikes': 0}
    )
    bucket = 'likes' if feedback_type == 'like' else 'dislikes'
    counters[bucket] += 1
def get_recommendations(query, data, model_manager, top_n=5):
    """Return the ``top_n`` rows of ``data`` most similar to ``query``.

    All cosine similarities are computed in one vectorised NumPy pass rather
    than one pairwise call per row.

    Args:
        query: Text to embed via ``model_manager.generate_embedding``.
        data: DataFrame with an ``embedding`` column of equal-length vectors.
        model_manager: Object exposing ``generate_embedding(text)``.
        top_n: Maximum number of rows to return.

    Returns:
        A copy of the best-matching rows with an added ``similarity`` column,
        sorted from most to least similar.  An empty ``data`` yields an empty
        result.
    """
    query_vec = np.asarray(model_manager.generate_embedding(query), dtype=float).ravel()
    stored = list(data['embedding'])
    if stored:
        matrix = np.vstack([np.asarray(vec, dtype=float).ravel() for vec in stored])
        scale = np.linalg.norm(matrix, axis=1) * np.linalg.norm(query_vec)
        # A zero vector has no direction; score it 0 instead of dividing by 0.
        scale[scale == 0] = 1.0
        similarities = (matrix @ query_vec) / scale
    else:
        similarities = np.empty(0, dtype=float)
    recommendations = (
        data.assign(similarity=similarities)
        .sort_values(by='similarity', ascending=False)
        .head(top_n)
    )
    return recommendations
# Streamlit UI
def _render_history_sidebar():
    """Show the stored searches (top three hits each) in the sidebar."""
    with st.sidebar:
        st.header("Search History 📜")
        if not st.session_state.search_history:
            st.info("No search history yet")
            return
        for entry in st.session_state.search_history:
            with st.expander(f"🔍 {entry['timestamp']}", expanded=False):
                st.write(f"Query: {entry['query']}")
                for rec in entry['recommendations'][:3]:  # Show top 3
                    st.write(f"- {rec['repo']} ({rec['similarity']:.2%})")


def _render_recommendation(rank, row):
    """Render one recommendation with its feedback controls.

    Args:
        rank: 1-based position of the row in the result list.
        row: Result row providing ``repo``, ``path``, ``summary``, ``url``,
            ``similarity`` and ``docstring``.
    """
    repo_id = f"{row['repo']}_{row['path']}"
    # Feedback is counted per repo/path, but widget keys must be unique per
    # rendered row, so the rank is added to the keys only.
    widget_suffix = f"{rank}_{repo_id}"
    with st.expander(f"Repository {rank}: {row['repo']}", expanded=True):
        cols = st.columns([2, 1])
        with cols[0]:
            st.markdown(f"**Path:** `{row['path']}`")
            st.markdown(f"**Summary:** {row['summary']}")
            st.markdown(f"**URL:** [View Repository]({row['url']})")
        with cols[1]:
            st.metric("Similarity", f"{row['similarity']:.2%}")
        # Feedback buttons
        feedback_cols = st.columns(2)
        with feedback_cols[0]:
            if st.button("👍", key=f"like_{widget_suffix}"):
                save_feedback(repo_id, 'like')
                st.success("Thanks for your feedback!")
        with feedback_cols[1]:
            if st.button("👎", key=f"dislike_{widget_suffix}"):
                save_feedback(repo_id, 'dislike')
                st.success("Thanks for your feedback!")
        # Show feedback stats
        stats = st.session_state.feedback_data.get(repo_id)
        if stats is not None:
            st.write(f"Likes: {stats['likes']} | Dislikes: {stats['dislikes']}")
        # A missing docstring is read from CSV as NaN, which is truthy;
        # require a real, non-blank string before offering it.
        docstring = row['docstring']
        if isinstance(docstring, str) and docstring.strip():
            # A checkbox toggle avoids nesting an expander inside this
            # expander, which Streamlit has historically rejected.
            if st.checkbox("View Documentation", key=f"doc_{widget_suffix}"):
                st.markdown(docstring)


def main():
    """Run the Streamlit page: load resources, search, show results.

    The latest result set is kept in ``st.session_state`` so it survives the
    rerun triggered by a feedback click; rendering it only inside the search
    button's branch would make the results (and the click) disappear.
    """
    st.title("Repository Recommender System 🚀")
    # Main interface
    st.markdown(
        "**Welcome to the Enhanced Repo_Recommender!**\n"
        "Enter your project description to get personalized repository recommendations.\n"
        "New features:\n"
        "- 📜 Search history (check sidebar)\n"
        "- 👍 Repository feedback\n"
        "- ⚡ Optimized performance\n"
    )
    # Initialize managers
    model_manager = ModelManager()
    data = DataManager.load_dataset()
    # Compute embeddings if not already done
    if 'embedding' not in data.columns:
        data['embedding'] = DataManager.compute_embeddings(data, model_manager)
    # User input
    user_query = st.text_area(
        "Describe your project:",
        height=150,
        placeholder="Example: I need a machine learning project for customer churn prediction...",
    )
    # Get recommendations
    if st.button("Get Recommendations", type="primary"):
        if user_query.strip():
            with st.spinner("Finding relevant repositories..."):
                recommendations = get_recommendations(user_query, data, model_manager)
                add_to_history(user_query, recommendations)
            st.session_state['last_recommendations'] = recommendations
        else:
            st.warning("Please enter a project description.")
    # Display recommendations (also on reruns caused by feedback clicks)
    latest = st.session_state.get('last_recommendations')
    if latest is not None:
        st.markdown("### 🎯 Top Recommendations")
        # Number by position in the result list, not by the frame's index label.
        for rank, (_, row) in enumerate(latest.iterrows(), start=1):
            _render_recommendation(rank, row)
    # Drawn after the search is handled so a new search shows up immediately.
    _render_history_sidebar()
    # Footer
    st.markdown("---")
    st.markdown("Made with 🤖 using CodeT5 and Streamlit")


# NOTE(review): a second copy of this app follows later in the same file and
# ends with its own main() call, so running the file as-is renders both.
if __name__ == "__main__":
    main()
import warnings
warnings.filterwarnings('ignore')
import streamlit as st
import pandas as pd
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity
from transformers import AutoTokenizer, AutoModel
import torch
import gdown
from pathlib import Path
from datetime import datetime
# Initialize session state
# Create the per-session containers on the first run only; later reruns of
# the script keep whatever is already stored.
st.session_state.setdefault('search_history', [])
st.session_state.setdefault('feedback_data', {})
# Model Loading Optimization
@st.cache_resource
def load_model_and_tokenizer():
    """Load the CodeT5 tokenizer and model, cached by Streamlit.

    Returns:
        ``(tokenizer, model, device)`` where ``device`` is CUDA when torch
        reports it as available and the CPU otherwise; the model has been
        moved to that device and switched to evaluation mode.
    """
    checkpoint = "Salesforce/codet5-small"
    tokenizer = AutoTokenizer.from_pretrained(checkpoint)
    if torch.cuda.is_available():
        device = torch.device('cuda')
    else:
        device = torch.device('cpu')
    model = AutoModel.from_pretrained(checkpoint)
    model = model.to(device)
    model.eval()
    return tokenizer, model, device
@st.cache_resource
def load_dataset():
    """Read the dataset CSV and attach a combined ``text`` column.

    Returns:
        The DataFrame with ``text`` set to ``docstring`` and ``summary``
        joined by one space, missing values counting as empty strings.
    """
    Path("data").mkdir(exist_ok=True)
    dataset_path = "/content/drive/MyDrive/practice_ml/filtered_dataset.csv"
    already_present = Path(dataset_path).exists()
    if not already_present:
        with st.spinner('Downloading dataset... This might take a few minutes...'):
            # NOTE(review): the source handed to gdown is a local directory
            # path rather than a URL - confirm this is intended.
            gdown.download("/content/drive/MyDrive/practice_ml", dataset_path, quiet=False)
    frame = pd.read_csv(dataset_path)
    docstrings = frame['docstring'].fillna('')
    summaries = frame['summary'].fillna('')
    frame['text'] = docstrings + ' ' + summaries
    return frame
@st.cache_data
def generate_embedding(_tokenizer, _model, _device, text, max_length=512):
    """Embed ``text`` with the model's encoder.

    The underscore-prefixed parameters are excluded from Streamlit's cache
    key, so results are cached per ``text`` and ``max_length``.

    Returns:
        NumPy array: the encoder's last hidden state averaged over dim 1,
        squeezed and moved to the CPU.
    """
    tokenizer_options = {
        "return_tensors": "pt",
        "padding": True,
        "truncation": True,
        "max_length": max_length,
    }
    with torch.no_grad():
        encoded = _tokenizer(text, **tokenizer_options)
        encoded = encoded.to(_device)
        hidden_states = _model.encoder(**encoded).last_hidden_state
        pooled = hidden_states.mean(dim=1)
        vector = pooled.squeeze().cpu().numpy()
    return vector
@st.cache_data
def compute_embeddings(_data, _tokenizer, _model, _device):
    """Embed every row's ``text`` and report progress.

    Texts are embedded one at a time through ``generate_embedding``;
    ``batch_size`` only sets how often the progress bar is refreshed.  All
    parameters carry a leading underscore, which keeps Streamlit from
    hashing them.

    Args:
        _data: DataFrame with a ``text`` column.
        _tokenizer, _model, _device: Forwarded to ``generate_embedding``.

    Returns:
        A list with one embedding per row, in row order.
    """
    texts = _data['text'].tolist()
    total = len(texts)
    batch_size = 32
    embeddings = []
    # Keep the handle returned by st.progress and update it in place; the
    # bar is an element, not a container to open with ``with``.
    progress_bar = st.progress(0)
    for start in range(0, total, batch_size):
        embeddings.extend(
            generate_embedding(_tokenizer, _model, _device, text)
            for text in texts[start:start + batch_size]
        )
        progress_bar.progress(min((start + batch_size) / total, 1.0))
    return embeddings
def add_to_history(query, recommendations):
    """Store a search at the head of the session's history list.

    Only the ``repo``, ``path``, ``url`` and ``similarity`` columns of
    ``recommendations`` are kept, as a list of per-row dicts.
    """
    kept = recommendations[['repo', 'path', 'url', 'similarity']]
    entry = dict(
        timestamp=datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
        query=query,
        recommendations=kept.to_dict('records'),
    )
    log = st.session_state.search_history
    log[:0] = [entry]
    # The list is newest-first; trim the oldest entry beyond ten.
    if len(log) > 10:
        del log[-1]
def save_feedback(repo_id, feedback_type):
    """Count one piece of feedback for ``repo_id``.

    ``'like'`` increments ``likes``; every other ``feedback_type``
    increments ``dislikes``.
    """
    store = st.session_state.feedback_data
    try:
        tally = store[repo_id]
    except KeyError:
        tally = store[repo_id] = {'likes': 0, 'dislikes': 0}
    if feedback_type != 'like':
        tally['dislikes'] += 1
        return
    tally['likes'] += 1
def get_recommendations(query, data, tokenizer, model, device, top_n=5):
    """Return the ``top_n`` rows of ``data`` most similar to ``query``.

    All cosine similarities are computed in one vectorised NumPy pass rather
    than one pairwise call per row.

    Args:
        query: Text embedded with ``generate_embedding``.
        data: DataFrame with an ``embedding`` column of equal-length vectors.
        tokenizer, model, device: Forwarded to ``generate_embedding``.
        top_n: Maximum number of rows to return.

    Returns:
        A copy of the best-matching rows with an added ``similarity`` column,
        sorted from most to least similar.  An empty ``data`` yields an empty
        result.
    """
    query_vec = np.asarray(
        generate_embedding(tokenizer, model, device, query), dtype=float
    ).ravel()
    stored = list(data['embedding'])
    if stored:
        matrix = np.vstack([np.asarray(vec, dtype=float).ravel() for vec in stored])
        scale = np.linalg.norm(matrix, axis=1) * np.linalg.norm(query_vec)
        # A zero vector has no direction; score it 0 instead of dividing by 0.
        scale[scale == 0] = 1.0
        similarities = (matrix @ query_vec) / scale
    else:
        similarities = np.empty(0, dtype=float)
    recommendations = (
        data.assign(similarity=similarities)
        .sort_values(by='similarity', ascending=False)
        .head(top_n)
    )
    return recommendations
def _render_history_sidebar():
    """Show the stored searches (top three hits each) in the sidebar."""
    with st.sidebar:
        st.header("Search History 📜")
        if not st.session_state.search_history:
            st.info("No search history yet")
            return
        for entry in st.session_state.search_history:
            with st.expander(f"🔍 {entry['timestamp']}", expanded=False):
                st.write(f"Query: {entry['query']}")
                for rec in entry['recommendations'][:3]:
                    st.write(f"- {rec['repo']} ({rec['similarity']:.2%})")


def _render_recommendation(rank, row):
    """Render one recommendation with its feedback controls.

    Args:
        rank: 1-based position of the row in the result list.
        row: Result row providing ``repo``, ``path``, ``summary``, ``url``,
            ``similarity`` and ``docstring``.
    """
    repo_id = f"{row['repo']}_{row['path']}"
    # Feedback is counted per repo/path, but widget keys must be unique per
    # rendered row, so the rank is added to the keys only.
    widget_suffix = f"{rank}_{repo_id}"
    with st.expander(f"Repository {rank}: {row['repo']}", expanded=True):
        cols = st.columns([2, 1])
        with cols[0]:
            st.markdown(f"**Path:** `{row['path']}`")
            st.markdown(f"**Summary:** {row['summary']}")
            st.markdown(f"**URL:** [View Repository]({row['url']})")
        with cols[1]:
            st.metric("Similarity", f"{row['similarity']:.2%}")
        # Feedback buttons
        feedback_cols = st.columns(2)
        with feedback_cols[0]:
            if st.button("👍", key=f"like_{widget_suffix}"):
                save_feedback(repo_id, 'like')
                st.success("Thanks for your feedback!")
        with feedback_cols[1]:
            if st.button("👎", key=f"dislike_{widget_suffix}"):
                save_feedback(repo_id, 'dislike')
                st.success("Thanks for your feedback!")
        # Show feedback stats
        stats = st.session_state.feedback_data.get(repo_id)
        if stats is not None:
            st.write(f"Likes: {stats['likes']} | Dislikes: {stats['dislikes']}")
        # A missing docstring is read from CSV as NaN, which is truthy;
        # require a real, non-blank string before offering it.
        docstring = row['docstring']
        if isinstance(docstring, str) and docstring.strip():
            # A checkbox toggle avoids nesting an expander inside this
            # expander, which Streamlit has historically rejected.
            if st.checkbox("View Documentation", key=f"doc_{widget_suffix}"):
                st.markdown(docstring)


def main():
    """Run the Streamlit page: load resources, search, show results.

    The latest result set is kept in ``st.session_state`` so it survives the
    rerun triggered by a feedback click; rendering it only inside the search
    button's branch would make the results (and the click) disappear.
    """
    st.title("Repository Recommender System 🚀")
    st.markdown(
        "**Welcome to the Enhanced Repo_Recommender!**\n"
        "Enter your project description to get personalized repository recommendations.\n"
        "New features:\n"
        "- 📜 Search history (check sidebar)\n"
        "- 👍 Repository feedback\n"
        "- ⚡ Optimized performance\n"
    )
    # Load resources
    with st.spinner("Loading model and data..."):
        tokenizer, model, device = load_model_and_tokenizer()
        data = load_dataset()
    # Compute embeddings if not already done
    if 'embedding' not in data.columns:
        data['embedding'] = compute_embeddings(data, tokenizer, model, device)
    # User input
    user_query = st.text_area(
        "Describe your project:",
        height=150,
        placeholder="Example: I need a machine learning project for customer churn prediction...",
    )
    # Get recommendations
    if st.button("Get Recommendations", type="primary"):
        if user_query.strip():
            with st.spinner("Finding relevant repositories..."):
                recommendations = get_recommendations(
                    user_query, data, tokenizer, model, device
                )
                add_to_history(user_query, recommendations)
            st.session_state['last_recommendations'] = recommendations
        else:
            st.warning("Please enter a project description.")
    # Display recommendations (also on reruns caused by feedback clicks)
    latest = st.session_state.get('last_recommendations')
    if latest is not None:
        st.markdown("### 🎯 Top Recommendations")
        # Number by position in the result list, not by the frame's index label.
        for rank, (_, row) in enumerate(latest.iterrows(), start=1):
            _render_recommendation(rank, row)
    # Drawn after the search is handled so a new search shows up immediately.
    _render_history_sidebar()
    # Footer
    st.markdown("---")
    st.markdown("Made with 🤖 using CodeT5 and Streamlit")


if __name__ == "__main__":
    main()