import json
import gradio as gr
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM, AutoModelForCausalLM, BitsAndBytesConfig
import torch
import os
import gradio_client.utils as client_utils
import sys
import tempfile
# ===============================
# Recursion Handling Fix
# ===============================
# Gradio's JSON-schema-to-type conversion can recurse without bound on
# self-referential schemas. Keep a reference to the original function and
# fall back to "Any" when recursion gets out of hand.
_original_json_schema_to_python_type = client_utils._json_schema_to_python_type

def _patched_json_schema_to_python_type(schema, defs=None, depth=0):
    if depth > 100:
        return "Any"
    if isinstance(schema, bool):
        return "Any" if schema else "None"
    try:
        return _original_json_schema_to_python_type(schema, defs)
    except RecursionError:
        return "Any"

client_utils._json_schema_to_python_type = _patched_json_schema_to_python_type
sys.setrecursionlimit(10000)
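# Illustrative example (an assumption, not from the original source) of the
# kind of schema that can trigger unbounded recursion, since it refers back
# to itself; with the patch it degrades gracefully to "Any":
#   {"type": "object", "additionalProperties": {"$ref": "#"}}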
# ===============================
# Device and Model Setup
# ===============================
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
hf_token = os.environ.get("HF_TOKEN")  # optional; only needed for private or gated checkpoints

model_path = "AI-Mock-Interviewer/T5"
tokenizer = AutoTokenizer.from_pretrained(model_path, token=hf_token)
model = AutoModelForSeq2SeqLM.from_pretrained(model_path, token=hf_token)
model.to(device)
bnb_config = BitsAndBytesConfig(
    load_in_8bit=True,
    llm_int8_enable_fp32_cpu_offload=True,
)

qwq_model_id = "unsloth/QwQ-32B-unsloth-bnb-4bit"
qwq_tokenizer = AutoTokenizer.from_pretrained(qwq_model_id, trust_remote_code=True)
qwq_model = AutoModelForCausalLM.from_pretrained(
    qwq_model_id,
    quantization_config=bnb_config,
    device_map="auto",
    trust_remote_code=True,
)
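# Note: this checkpoint is already bnb-4bit quantized, so depending on the
# transformers version the 8-bit config above may be ignored or rejected in
# favor of the quantization baked into the checkpoint. device_map="auto"
# shards the weights across the available GPU(s) and, with fp32 CPU offload
# enabled, spills overflow layers to system RAM.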
# ===============================
# Prompts and Scoring
# ===============================
system_prompt = """
You are conducting a mock technical interview. The candidate's experience level can be entry-level, mid-level, or senior-level...
"""
subtopic_keywords = {
    "data analysis": ["data cleaning", "missing data", "EDA", "visualization"],
    "machine learning": ["supervised learning", "overfitting", "hyperparameter tuning"],
    "software engineering": ["code optimization", "design patterns", "database design"],
}

rating_scores = {"Good": 3, "Average": 2, "Needs Improvement": 1}
score_categories = [(90, "Excellent"), (75, "Very Good"), (60, "Good"), (45, "Average"), (0, "Needs Improvement")]
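# Worked example: four answers rated Good, Good, Average, Needs Improvement
# score 3 + 3 + 2 + 1 = 9 out of a possible 12, i.e. 75%. Scanning the
# descending thresholds above, the first one satisfied (75 >= 75) yields
# the category "Very Good".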
# ===============================
# Utility for Gradio Chat Format
# ===============================
def convert_for_gradio(convo):
    role_map = {
        "Interviewer": "assistant",
        "Candidate": "user",
        "Evaluator": "system",
        "System": "system",
    }
    return [{"role": role_map.get(msg["role"], "system"), "content": msg["content"]} for msg in convo]
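# Usage example:
#   convert_for_gradio([{"role": "Interviewer", "content": "What is overfitting?"}])
#   -> [{"role": "assistant", "content": "What is overfitting?"}]
# Unknown roles fall back to "system".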
# ===============================
# Core Functions
# ===============================
def identify_subtopic(question, domain):
    domain = domain.lower()
    question = question.lower()
    if domain in subtopic_keywords:
        for subtopic in subtopic_keywords[domain]:
            # Compare case-insensitively so keywords like "EDA" can match.
            if subtopic.lower() in question:
                return subtopic
    return None
def generate_question(prompt, domain, state=None):
    full_prompt = system_prompt + "\n" + prompt
    tokenizer.padding_side = "left"
    if tokenizer.pad_token is None:
        tokenizer.pad_token = tokenizer.eos_token
    inputs = tokenizer(full_prompt, return_tensors="pt", padding=True, truncation=True).to(device)
    outputs = model.generate(
        inputs["input_ids"],
        attention_mask=inputs["attention_mask"],
        max_new_tokens=50,
        no_repeat_ngram_size=2,
        top_k=30,
        top_p=0.9,
        temperature=0.7,
        do_sample=True,
        pad_token_id=tokenizer.pad_token_id,
    )
    question = tokenizer.decode(outputs[0], skip_special_tokens=True).strip()
    if not question.endswith("?"):
        question += "?"
    subtopic = identify_subtopic(question, domain)
    # Record new questions/subtopics for repetition tracking; a duplicate is
    # still returned, it just isn't recorded again.
    if state is not None:
        if question not in state["asked_questions"] and (subtopic is None or subtopic not in state["asked_subtopics"]):
            state["asked_questions"].append(question)
            if subtopic:
                state["asked_subtopics"].append(subtopic)
    return question
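# Illustrative call, mirroring how start_interview uses it:
#   generate_question(
#       "Domain: data analysis. Candidate experience level: Entry-Level. Generate the first question:",
#       "data analysis", state)
# returns a single question string ending in "?".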
def evaluate_response(response, question):
    qwq_tokenizer.padding_side = "left"
    if qwq_tokenizer.pad_token is None:
        qwq_tokenizer.pad_token = qwq_tokenizer.eos_token
    eval_prompt = (
        "Evaluate the following candidate response to an interview question.\n\n"
        f"**Question:** {question}\n"
        f"**Candidate's Response:** {response}\n\n"
        "Provide a rating as: 'Good', 'Average', or 'Needs Improvement'.\n"
        "Also, provide a brief suggestion for improvement. Format:\n"
        "Rating: <Rating>\nSuggestion: <Suggestion>"
    )
    inputs = qwq_tokenizer(eval_prompt, return_tensors="pt", padding=True, truncation=True).to(qwq_model.device)
    outputs = qwq_model.generate(
        inputs["input_ids"],
        attention_mask=inputs["attention_mask"],
        max_new_tokens=100,
        top_k=30,
        top_p=0.9,
        temperature=0.7,
        do_sample=True,
        pad_token_id=qwq_tokenizer.pad_token_id,
    )
    # Decode only the newly generated tokens; a causal LM echoes its input,
    # so decoding outputs[0] wholesale would also parse the prompt's own
    # "Rating: <Rating>" template line.
    gen_tokens = outputs[0][inputs["input_ids"].shape[1]:]
    evaluation = qwq_tokenizer.decode(gen_tokens, skip_special_tokens=True)
    rating, suggestion = "Unknown", "No suggestion available."
    for line in evaluation.splitlines():
        if "Rating:" in line:
            rating = line.split("Rating:")[1].strip()
        if "Suggestion:" in line:
            suggestion = line.split("Suggestion:")[1].strip()
    return rating, suggestion
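# The parser above expects the model to reply roughly in the requested
# format, e.g. (illustrative output, not guaranteed):
#   Rating: Average
#   Suggestion: Quantify the impact of your work with concrete metrics.
# Anything that doesn't match falls back to "Unknown" / "No suggestion available."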
def reset_state(name, domain, company, level):
    return {
        "name": name,
        "domain": domain,
        "company": company,
        "level": level,
        "asked_questions": [],
        "asked_subtopics": [],
        "conversation": [],
        "evaluations": [],
        "interview_active": True,
    }
def start_interview(name, domain, company, level):
    try:
        print(f"Start Interview Called:\nName: {name}\nDomain: {domain}\nLevel: {level}")
        if not name or not domain:
            return [{"role": "system", "content": "Please provide a name and domain."}], None
        state = reset_state(name, domain, company, level)
        prompt = f"Domain: {domain}. Candidate experience level: {level}. Generate the first question:"
        question = generate_question(prompt, domain, state)
        state["conversation"].append({"role": "Interviewer", "content": question})
        return convert_for_gradio(state["conversation"]), state
    except Exception as e:
        return [{"role": "system", "content": f"Critical error: {e}"}], None
def submit_response(response, state):
    if state is None or not state.get("interview_active", False):
        return [{"role": "system", "content": "Interview is not active."}], state
    if not response or not response.strip():
        state["conversation"].append({"role": "System", "content": "⚠️ Please answer the question before proceeding."})
        return convert_for_gradio(state["conversation"]), state
    if response.strip().lower() == "exit":
        return end_interview(state)
    state["conversation"].append({"role": "Candidate", "content": response})
    last_q = next((msg["content"] for msg in reversed(state["conversation"]) if msg["role"] == "Interviewer"), "")
    rating, suggestion = evaluate_response(response, last_q)
    state["evaluations"].append({
        "question": last_q,
        "response": response,
        "rating": rating,
        "suggestion": suggestion,
    })
    state["conversation"].append({"role": "Evaluator", "content": f"Rating: {rating}\nSuggestion: {suggestion}"})
    prompt = f"Domain: {state['domain']}. Candidate's last response: {response}. Generate a follow-up question:"
    follow_up = generate_question(prompt, state["domain"], state)
    state["conversation"].append({"role": "Interviewer", "content": follow_up})
    return convert_for_gradio(state["conversation"]), state
def end_interview(state):
    if state is None:
        return [{"role": "system", "content": "No active interview to end."}], state
    state["interview_active"] = False
    total = sum(rating_scores.get(ev["rating"], 0) for ev in state["evaluations"])
    max_total = len(state["evaluations"]) * 3
    percent = (total / max_total * 100) if max_total > 0 else 0
    category = next(label for threshold, label in score_categories if percent >= threshold)
    summary = {
        "name": state["name"],
        "domain": state["domain"],
        "level": state["level"],
        "company": state["company"],
        "score": f"{total}/{max_total}",
        "percentage": round(percent, 2),
        "category": category,
        "evaluations": state["evaluations"],
    }
    filename = f"sessions/{state['name'].replace(' ', '_').lower()}_session.json"
    os.makedirs("sessions", exist_ok=True)
    with open(filename, "w") as f:
        json.dump(summary, f, indent=4)
    # Add detailed evaluations to the conversation
    for ev in state["evaluations"]:
        detail = (
            f"📝 **Question:** {ev['question']}\n"
            f"💬 **Your Response:** {ev['response']}\n"
            f"🟢 **Rating:** {ev['rating']}\n"
            f"💡 **Suggestion:** {ev['suggestion']}"
        )
        state["conversation"].append({"role": "System", "content": detail})
    state["conversation"].append({"role": "System", "content": f"✅ Interview ended.\nFinal Score: {summary['score']} ({summary['category']})"})
    return convert_for_gradio(state["conversation"]), state
def prepare_downloadable_summary(state):
    if state is None:
        return None
    total_score = sum(rating_scores.get(ev["rating"], 0) for ev in state["evaluations"])
    max_score = len(state["evaluations"]) * 3
    percentage = (total_score / max_score * 100) if max_score > 0 else 0
    category = next(label for threshold, label in score_categories if percentage >= threshold)
    summary = {
        "name": state["name"],
        "domain": state["domain"],
        "level": state["level"],
        "company": state["company"],
        "score": f"{total_score}/{max_score}",
        "percentage": round(percentage, 2),
        "category": category,
        "evaluations": state["evaluations"],
    }
    # The file is closed when the with-block exits, so it is fully flushed
    # before the path is handed back to Gradio.
    with tempfile.NamedTemporaryFile(mode="w+", delete=False, suffix=".json") as f:
        json.dump(summary, f, indent=4)
    return f.name
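# Returning a file path from a click handler is how a gr.File output
# receives its content; the browser then offers that file for download.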
def clear_state():
    return [], reset_state("", "", "", "Entry-Level")
# ===============================
# Gradio UI
# ===============================
with gr.Blocks() as demo:
    gr.Markdown("# 🧠 AI Mock Interview with Evaluation")
    with gr.Row():
        name_input = gr.Textbox(label="Your Name")
        domain_input = gr.Textbox(label="Domain", placeholder="e.g. Software Engineering")
        company_input = gr.Textbox(label="Company (Optional)", placeholder="e.g. Google")
        level_input = gr.Dropdown(
            label="Experience Level",
            choices=["Entry-Level", "Mid-Level", "Senior-Level"],
            value="Entry-Level",
        )
    start_button = gr.Button("Start Interview")
    chatbot = gr.Chatbot(label="Interview Conversation", height=450, type="messages")
    with gr.Row():
        response_input = gr.Textbox(label="Your Response (type 'exit' to quit)", lines=2)
        submit_button = gr.Button("Submit")
        exit_button = gr.Button("Exit Interview")
        clear_button = gr.Button("Clear Session")
    with gr.Row():
        download_button = gr.Button("📥 Download Evaluation Report")
        download_file = gr.File(label="Download", visible=True)

    # Session state holder
    state = gr.State(value=reset_state("", "", "", "Entry-Level"))

    # Hooking up logic to UI
    start_button.click(start_interview, inputs=[name_input, domain_input, company_input, level_input], outputs=[chatbot, state])
    submit_button.click(submit_response, inputs=[response_input, state], outputs=[chatbot, state]).then(lambda: "", None, response_input)
    exit_button.click(end_interview, inputs=state, outputs=[chatbot, state])
    clear_button.click(clear_state, outputs=[chatbot, state])
    download_button.click(prepare_downloadable_summary, inputs=[state], outputs=[download_file])

demo.launch()
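# On Hugging Face Spaces this script is executed directly and demo.launch()
# serves the app. For local testing (assuming the same dependencies and a
# GPU large enough for the QwQ model), run this file with Python and open
# the localhost URL that Gradio prints.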