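"""QualityTagger: a Gradio app that classifies issue descriptions into software
quality attributes (Security, Usability, Maintainability, ...) with seven binary
DistilRoBERTa classifiers, then asks Llama 3.2 1B Instruct to explain the top
prediction."""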
import csv
import logging
import os
import random

import gradio as gr
import numpy as np
import spaces
import torch
from huggingface_hub import HfFolder, login
from scipy.special import softmax
from transformers import AutoModelForCausalLM, AutoModelForSequenceClassification, AutoTokenizer
# Increase the CSV field size limit to handle long issue descriptions.
csv.field_size_limit(1000000)

# Set up logging.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(message)s')

# Seed all RNGs for reproducibility.
seed = 42
np.random.seed(seed)
random.seed(seed)
torch.manual_seed(seed)
if torch.cuda.is_available():
    torch.cuda.manual_seed_all(seed)
# Log in to Hugging Face; gated models (e.g. Llama) require a valid token.
token = os.getenv("hf_token")
if token:
    HfFolder.save_token(token)
    login(token)
else:
    logging.warning("hf_token is not set; loading gated models may fail.")
model_paths = [
    "karths/binary_classification_train_port",
    "karths/binary_classification_train_perf",
    "karths/binary_classification_train_main",
    "karths/binary_classification_train_secu",
    "karths/binary_classification_train_reli",
    "karths/binary_classification_train_usab",
    "karths/binary_classification_train_comp",
]

quality_mapping = {
    "binary_classification_train_port": "Portability",
    "binary_classification_train_main": "Maintainability",
    "binary_classification_train_secu": "Security",
    "binary_classification_train_reli": "Reliability",
    "binary_classification_train_usab": "Usability",
    "binary_classification_train_perf": "Performance",
    "binary_classification_train_comp": "Compatibility",
}
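# Each checkpoint is a binary classifier for a single quality attribute. A
# single shared tokenizer is used below, on the assumption that all checkpoints
# were fine-tuned from distilroberta-base.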
# Pre-load the classification models and their shared tokenizer.
tokenizer = AutoTokenizer.from_pretrained("distilroberta-base")
models = {path: AutoModelForSequenceClassification.from_pretrained(path) for path in model_paths}
def get_quality_name(model_name):
    """Map a checkpoint path to its human-readable quality attribute."""
    return quality_mapping.get(model_name.split('/')[-1], "Unknown Quality")
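# Each classifier is moved to the GPU only for the duration of a single
# prediction and back to the CPU afterwards, trading latency for memory:
# only one of the seven models occupies GPU RAM at a time.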
def model_prediction(model, text, device):
    """Return the positive-class probability for `text` under `model`."""
    model.to(device)
    model.eval()
    inputs = tokenizer(text, return_tensors="pt", padding=True, truncation=True, max_length=512)
    inputs = {k: v.to(device) for k, v in inputs.items()}
    with torch.no_grad():
        outputs = model(**inputs)
        logits = outputs.logits
    # Column 1 holds the positive ("has this quality issue") class.
    probs = softmax(logits.cpu().numpy(), axis=1)
    avg_prob = np.mean(probs[:, 1])
    model.to("cpu")
    return avg_prob
# --- Llama 3.2 1B Instruct setup (used to generate explanations) ---
LLAMA_MAX_MAX_NEW_TOKENS = 512
LLAMA_DEFAULT_MAX_NEW_TOKENS = 512
LLAMA_MAX_INPUT_TOKEN_LENGTH = int(os.getenv("MAX_INPUT_TOKEN_LENGTH", "1024"))

llama_model_id = "meta-llama/Llama-3.2-1B-Instruct"
llama_tokenizer = AutoTokenizer.from_pretrained(llama_model_id)
llama_model = AutoModelForCausalLM.from_pretrained(
    llama_model_id,
    device_map="auto",
    torch_dtype=torch.bfloat16,
)
llama_model.eval()
if llama_tokenizer.pad_token is None:
    llama_tokenizer.pad_token = llama_tokenizer.eos_token
def llama_generate(
    message: str,
    max_new_tokens: int = LLAMA_DEFAULT_MAX_NEW_TOKENS,
    temperature: float = 0.3,
    top_p: float = 0.9,
    top_k: int = 50,
    repetition_penalty: float = 1.2,
) -> str:
    inputs = llama_tokenizer(message, return_tensors="pt", padding=True)
    if inputs["input_ids"].shape[1] > LLAMA_MAX_INPUT_TOKEN_LENGTH:
        # Keep only the most recent tokens so the prompt fits the input budget.
        inputs = {k: v[:, -LLAMA_MAX_INPUT_TOKEN_LENGTH:] for k, v in inputs.items()}
        gr.Warning(f"Trimmed input as it was longer than {LLAMA_MAX_INPUT_TOKEN_LENGTH} tokens.")
    inputs = {k: v.to(llama_model.device) for k, v in inputs.items()}
    with torch.no_grad():
        generate_ids = llama_model.generate(
            **inputs,
            max_new_tokens=max_new_tokens,
            do_sample=True,
            top_p=top_p,
            top_k=top_k,
            temperature=temperature,
            num_beams=1,
            repetition_penalty=repetition_penalty,
            pad_token_id=llama_tokenizer.pad_token_id,
            eos_token_id=llama_tokenizer.eos_token_id,
        )
    # Decode only the newly generated tokens, not the echoed prompt.
    prompt_length = inputs["input_ids"].shape[1]
    output_text = llama_tokenizer.decode(generate_ids[0, prompt_length:], skip_special_tokens=True)
    torch.cuda.empty_cache()
    return output_text
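# Note: generate_explanation passes its prompt as raw text; for an
# Instruct-tuned checkpoint, wrapping it with
# llama_tokenizer.apply_chat_template() may yield better-formed answers.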
def generate_explanation(issue_text, top_quality):
    """Generate an explanation for the single top quality above the threshold."""
    if not top_quality:
        return "<div style='color: red;'>No explanation available as no quality tags met the threshold.</div>"
    quality_name = top_quality[0][0]  # Name of the top quality.
    prompt = f"""
Given the following issue description:
---
{issue_text}
---
Explain why this issue might be classified as a **{quality_name}** issue. Provide a concise explanation, relating it back to the issue description. Keep the explanation short and concise.
"""
    logging.info(f"Explanation prompt:\n{prompt}")
    try:
        explanation = llama_generate(prompt)
        # Format for readability, directly including the quality name.
        formatted_explanation = f"<p><b>{quality_name}:</b></p><p>{explanation}</p>"
        return f"<div style='overflow-y: scroll; max-height: 400px;'>{formatted_explanation}</div>"
    except Exception as e:
        logging.error(f"Error during Llama generation: {e}")
        return "<div style='color: red;'>An error occurred while generating the explanation.</div>"
# @spaces.GPU(duration=60)
def main_interface(text):
    if not text.strip():
        return "<div style='color: red;'>No text provided. Please enter a valid issue description.</div>", "", ""
    if len(text) < 30:
        return "<div style='color: red;'>Text is less than 30 characters.</div>", "", ""

    device = "cuda" if torch.cuda.is_available() else "cpu"
    results = []
    for model_path, model in models.items():
        quality_name = get_quality_name(model_path)
        avg_prob = model_prediction(model, text, device)
        if avg_prob >= 0.95:  # Keep all results above the threshold.
            results.append((quality_name, avg_prob))
        logging.info(f"Model: {model_path}, Quality: {quality_name}, Average Probability: {avg_prob:.3f}")

    if not results:
        return "<div style='color: red;'>No recommendation. No quality tag met the prediction probability threshold (>= 0.95).</div>", "", ""

    # Keep only the single highest-probability quality.
    top_quality = sorted(results, key=lambda x: x[1], reverse=True)[:1]
    output_html = render_html_output(top_quality)
    explanation = generate_explanation(text, top_quality)
    return output_html, "", explanation
def render_html_output(top_qualities):
    """Render the single top prediction as styled HTML."""
    styles = """
    <style>
        .quality-container {
            font-family: Arial, sans-serif;
            text-align: center;
            margin-top: 20px;
        }
        .quality-label, .ranking {
            display: inline-block;
            padding: 0.5em 1em;
            font-size: 18px;
            font-weight: bold;
            color: white;
            background-color: #007bff;
            border-radius: 0.5rem;
            margin-right: 10px;
            box-shadow: 0 2px 4px rgba(0, 0, 0, 0.2);
        }
    </style>
    """
    if not top_qualities:  # Handle the empty case.
        return styles + "<div class='quality-container'>No Top Prediction</div>"
    quality, _ = top_qualities[0]  # Exactly one entry by construction.
    html_content = f"""
    <div class="quality-container">
        <span class="ranking">Top Prediction</span>
        <span class="quality-label">{quality}</span>
    </div>
    """
    return styles + html_content
example_texts = [
    ["The algorithm does not accurately distinguish between the positive and negative classes during edge cases.\n\nEnvironment: Production\nReproduction: Run the classifier on the test dataset with known edge cases."],
    ["The regression tests do not cover scenarios involving concurrent user sessions.\n\nEnvironment: Test automation suite\nReproduction: Update the test scripts to include tests for concurrent sessions."],
    ["There is frequent miscommunication between the development and QA teams regarding feature specifications.\n\nEnvironment: Inter-team meetings\nReproduction: Audit recent communication logs and meeting notes between the teams."],
    ["The service-oriented architecture does not effectively isolate failures, leading to cascading failures across services.\n\nEnvironment: Microservices architecture\nReproduction: Simulate a service failure and observe the impact on other services."],
]
# CSS for the layout and appearance of the prediction and explanation panels.
css = """
.quality-container {
    font-family: Arial, sans-serif;
    text-align: center;
    margin-top: 20px;
    padding: 10px;
    border: 1px solid #ddd;
    border-radius: 8px;
    background-color: #f9f9f9;
}
.quality-label, .ranking {
    display: inline-block;
    padding: 0.5em 1em;
    font-size: 18px;
    font-weight: bold;
    color: white;
    background-color: #007bff;
    border-radius: 0.5rem;
    margin-right: 10px;
    box-shadow: 0 2px 4px rgba(0, 0, 0, 0.2);
}
#explanation {
    border: 1px solid #ccc;
    padding: 10px;
    margin-top: 10px;
    border-radius: 4px;
    background-color: #fff;
    overflow-y: auto;  /* Show a scrollbar if the explanation overflows. */
}
"""
interface = gr.Interface(
    fn=main_interface,
    inputs=gr.Textbox(lines=7, label="Issue Description", placeholder="Enter your issue text here"),
    outputs=[
        gr.HTML(label="Prediction Output"),
        gr.Textbox(label="Predictions", visible=False),
        # The explanation is returned as HTML, so render it with gr.HTML;
        # elem_id wires it to the #explanation rule in the CSS above.
        gr.HTML(label="Explanation", elem_id="explanation"),
    ],
    title="QualityTagger",
    description="This tool classifies text into different quality domains such as Security, Usability, Maintainability, and Reliability, and provides explanations.",
    examples=example_texts,
    css=css,  # Apply the CSS.
)

interface.launch(share=True)
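# A minimal sketch of querying this app remotely with gradio_client, assuming
# the Space is published under a name like "karths/QualityTagger" (hypothetical):
#
#   from gradio_client import Client
#   client = Client("karths/QualityTagger")
#   html, _, explanation = client.predict(
#       "The service does not isolate failures, causing cascading outages.",
#       api_name="/predict",
#   )
#   print(explanation)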