import os import json from typing import Dict, Any, List from pydantic import BaseModel, Field from dotenv import load_dotenv from langchain_google_genai import ChatGoogleGenerativeAI from langgraph.prebuilt import create_react_agent from langgraph_supervisor import create_supervisor from langchain_core.tools import tool from tavily import TavilyClient from langgraph.graph import StateGraph, END import gradio as gr # Load environment variables load_dotenv() GEMINI_API_KEY = os.getenv("GEMINI_API_KEY") TAVILY_API_KEY = os.getenv("TAVILY_API_KEY") LANGSMITH_API_KEY = os.getenv("LANGSMITH_API_KEY") LANGSMITH_PROJECT = os.getenv("LANGSMITH_PROJECT", "profile-analyzer") if not GEMINI_API_KEY: raise ValueError("GEMINI_API_KEY not found in environment variables") if not TAVILY_API_KEY: raise ValueError("TAVILY_API_KEY not found in environment variables") os.environ["GOOGLE_API_KEY"] = GEMINI_API_KEY # Configure LangSmith tracing for public viewing if LANGSMITH_API_KEY: os.environ["LANGCHAIN_TRACING_V2"] = "true" os.environ["LANGCHAIN_ENDPOINT"] = "https://api.smith.langchain.com" os.environ["LANGCHAIN_API_KEY"] = LANGSMITH_API_KEY os.environ["LANGCHAIN_PROJECT"] = LANGSMITH_PROJECT print(f"๐Ÿ” LangSmith tracing enabled for project: {LANGSMITH_PROJECT}") print(f"๐ŸŒ View runs at: https://smith.langchain.com/o/default/p/{LANGSMITH_PROJECT}") else: print("โš ๏ธ LANGSMITH_API_KEY not set - tracing disabled") # Initialize Tavily client for real-time web search tavily_client = TavilyClient(api_key=TAVILY_API_KEY) # ============================================================================= # STRUCTURED OUTPUT MODEL # ============================================================================= class ProfileAnalysisResult(BaseModel): """Final structured output for profile analysis""" fn: str = Field(description="First name") ln: str = Field(description="Last name") probableBusinessEmail: str = Field(description="Probable business email address") title: str = Field(description="Current job title") isAJobChange: bool = Field(description="Whether person changed jobs") isAnICP: bool = Field(description="Whether person matches ICP criteria") currentCompany: str = Field(description="Current company name") # ============================================================================= # REACT AGENT TOOLS # ============================================================================= @tool def research_person_profile(first_name: str, last_name: str, known_company: str = "") -> Dict[str, Any]: """Research a person's current professional profile using robust LinkedIn search and snippet parsing.""" import re try: full_name = f"{first_name} {last_name}" search_results = [] linkedin_profiles = [] parsed_current_company = None parsed_current_title = None # STRATEGY 1: Targeted LinkedIn search with company context if known_company: linkedin_query = f'"{full_name}" "{known_company}" LinkedIn current job title' linkedin_results = tavily_client.search( query=linkedin_query, search_depth="advanced", include_domains=["linkedin.com"], max_results=3 ) search_results.extend(linkedin_results.get("results", [])) # STRATEGY 2: General LinkedIn profile search (ALWAYS DO THIS) general_query = f'"{full_name}" LinkedIn' general_results = tavily_client.search( query=general_query, search_depth="advanced", include_domains=["linkedin.com"], max_results=5 ) search_results.extend(general_results.get("results", [])) # STRATEGY 3: Search for current company (BoomerangAI) current_company_query = f'"{full_name}" BoomerangAI LinkedIn' current_company_results = tavily_client.search( query=current_company_query, search_depth="advanced", include_domains=["linkedin.com"], max_results=3 ) search_results.extend(current_company_results.get("results", [])) # STRATEGY 4: Location-based search (Pune) location_query = f'"{full_name}" Pune LinkedIn' location_results = tavily_client.search( query=location_query, search_depth="advanced", include_domains=["linkedin.com"], max_results=3 ) search_results.extend(location_results.get("results", [])) # STRATEGY 5: Company-specific search (if we know the company) if known_company: company_query = f'"{full_name}" "{known_company}" employee current role' company_results = tavily_client.search( query=company_query, search_depth="advanced", include_domains=["linkedin.com", "crunchbase.com", "zoominfo.com"], max_results=3 ) search_results.extend(company_results.get("results", [])) # STRATEGY 6: Recent news and job changes news_query = f'"{full_name}" new job company change recent' news_results = tavily_client.search( query=news_query, search_depth="basic", include_domains=["techcrunch.com", "linkedin.com", "twitter.com", "bloomberg.com"], max_results=3 ) # Remove duplicates and combine all results unique_results = [] seen_urls = set() for result in search_results: url = result.get("url", "") if url not in seen_urls: unique_results.append(result) seen_urls.add(url) # Prioritize LinkedIn profile URLs if "linkedin.com/in/" in url: linkedin_profiles.append(result) print(f"๐Ÿ” Found {len(linkedin_profiles)} LinkedIn profiles for {full_name}") # Robust snippet parsing for 'Present'/'Current' in LinkedIn profile results for i, profile in enumerate(linkedin_profiles): snippet = profile.get('snippet', '') or profile.get('description', '') or profile.get('content', '') print(f"๐Ÿ“„ Profile {i+1}: {profile.get('url', 'No URL')}") print(f"๐Ÿ“ Snippet: {snippet[:200]}...") # Look for 'Present' or 'Current' in the snippet (Experience section) # Pattern 1: "Co-Founder at BoomerangAI ยท Full-time ยท Jun 2023 - Present" match = re.search(r'([A-Za-z0-9\- &,.]+) at ([A-Za-z0-9\- &,.]+)[^\n]*Present', snippet) if match: parsed_current_title = match.group(1).strip(':-,|@') parsed_current_company = match.group(2).strip(':-,|@') print(f"โœ… Found Present role: {parsed_current_title} at {parsed_current_company}") break # Pattern 2: "Current: Title at Company" match2 = re.search(r'Current: ([A-Za-z0-9\- &,.]+) at ([A-Za-z0-9\- &,.]+)', snippet) if match2: parsed_current_title = match2.group(1).strip(':-,|@') parsed_current_company = match2.group(2).strip(':-,|@') print(f"โœ… Found Current role: {parsed_current_title} at {parsed_current_company}") break # Pattern 3: "at Company (Present)" match3 = re.search(r'at ([A-Za-z0-9\- &,.]+) \(Present\)', snippet) if match3: parsed_current_company = match3.group(1).strip() parsed_current_title = '' print(f"โœ… Found Present company: {parsed_current_company}") break # Pattern 4: Look for BoomerangAI specifically if 'BoomerangAI' in snippet or 'Boomerang' in snippet: # Try to extract title before BoomerangAI match4 = re.search(r'([A-Za-z0-9\- &,.]+) at BoomerangAI', snippet) if match4: parsed_current_title = match4.group(1).strip(':-,|@') parsed_current_company = 'BoomerangAI' print(f"โœ… Found BoomerangAI role: {parsed_current_title} at {parsed_current_company}") break # FALLBACK: If no current role found in snippets, look for the right profile and use company data if not parsed_current_company: print("๐Ÿ” Checking for correct profile based on location and company...") # Intelligent profile matching based on multiple criteria best_profile = None best_score = 0 for profile in linkedin_profiles: snippet = profile.get('snippet', '') or profile.get('description', '') or profile.get('content', '') url = profile.get('url', '') # Calculate profile relevance score score = 0 # Location matching (Pune, Maharashtra, India) if 'Pune' in snippet or 'Maharashtra' in snippet: score += 3 # Company mentions in profile if known_company and known_company.lower() in snippet.lower(): score += 2 # Profile completeness (has experience section) if 'Experience' in snippet or 'Present' in snippet or 'Current' in snippet: score += 2 # Profile activity (connections, followers) if 'connections' in snippet.lower() or 'followers' in snippet.lower(): score += 1 # URL pattern (shorter URLs often indicate main profiles) if len(url.split('/')) <= 5: score += 1 print(f"๐Ÿ“Š Profile score: {score} for {url}") if score > best_score: best_score = score best_profile = profile if best_profile and best_score >= 3: print(f"โœ… Found best matching profile: {best_profile.get('url', 'No URL')} (score: {best_score})") # Use Crunchbase data for current roles for result in unique_results: if 'crunchbase.com/person' in result.get('url', ''): cb_content = result.get('content', '') if full_name in cb_content: print(f"๐Ÿ” Crunchbase content: {cb_content[:200]}...") # Extract current roles from Crunchbase if 'current jobs' in cb_content.lower(): # Look for role patterns like "Co-Founder at Company" role_matches = re.findall(r'Co-Founder at ([A-Za-z0-9\- &,.]+?)(?: and|\.|$)', cb_content) if role_matches and len(role_matches) >= 2: # Use the second role (most recent) as current parsed_current_title = 'Co-Founder' parsed_current_company = role_matches[1].strip() print(f"โœ… Using Crunchbase data: {parsed_current_title} at {parsed_current_company}") break elif role_matches: # Fallback: use the first role if only one found parsed_current_title = 'Co-Founder' parsed_current_company = role_matches[0].strip() print(f"โœ… Using Crunchbase data (first role): {parsed_current_title} at {parsed_current_company}") break else: # Fallback: Look for "Co-Founder @ Company" pattern alt_matches = re.findall(r'([A-Za-z0-9\- &,.]+) @ ([A-Za-z0-9\- &,.]+)', cb_content) if alt_matches: parsed_current_title = alt_matches[0][0].strip() parsed_current_company = alt_matches[0][1].strip() print(f"โœ… Using Crunchbase data (alt): {parsed_current_title} at {parsed_current_company}") break else: # Final fallback: Extract from the sentence structure # Look for "has X current jobs as Role at Company" sentence_match = re.search(r'has \d+ current jobs as ([^,]+) at ([^,.]+)', cb_content) if sentence_match: parsed_current_title = sentence_match.group(1).strip() parsed_current_company = sentence_match.group(2).strip() print(f"โœ… Using Crunchbase data (sentence): {parsed_current_title} at {parsed_current_company}") break else: # Last resort: Extract the second role (most recent) from the sentence # "Co-Founder at BuyerAssist and Co-Founder at BoomerangAI" second_role_match = re.search(r'and ([A-Za-z0-9\- &,.]+) at ([A-Za-z0-9\- &,.]+)', cb_content) if second_role_match: parsed_current_title = second_role_match.group(1).strip() parsed_current_company = second_role_match.group(2).strip() print(f"โœ… Using Crunchbase data (second role): {parsed_current_title} at {parsed_current_company}") break # If Crunchbase didn't work, check company page data if not parsed_current_company: for result in unique_results: if 'linkedin.com/company/' in result.get('url', ''): company_content = result.get('content', '') if full_name in company_content: # Extract role from company page role_match = re.search(rf'{full_name} \(([^)]+)\)', company_content) if role_match: parsed_current_title = role_match.group(1).strip() # Extract company name from URL company_url = result.get('url', '') company_name = company_url.split('/company/')[-1].split('/')[0] parsed_current_company = company_name.replace('-', ' ').title() print(f"โœ… Using company page data: {parsed_current_title} at {parsed_current_company}") break if not parsed_current_company: print("โŒ No current company found in LinkedIn profiles") return { "current_company": "Unknown", # Will be filled by AI analysis "current_title": "Unknown", # Will be filled by AI analysis "confidence": 0.8, "search_results": unique_results, "news_results": news_results.get("results", []), "parsed_current_company": parsed_current_company, "parsed_current_title": parsed_current_title, "research_notes": f"Multi-strategy search: {len(unique_results)} unique results, {len(news_results.get('results', []))} news articles. Strategies: LinkedIn targeted, general profile, BoomerangAI search, Pune location, company-specific, news" } except Exception as e: return { "name": f"{first_name} {last_name}", "error": f"Search failed: {str(e)}", "data_source": "tavily_search_error" } @tool def detect_job_change(person_name: str, previous_company: str, current_company: str) -> Dict[str, Any]: """Analyze if person has changed jobs using comprehensive company relationship research.""" try: search_results = [] # STRATEGY 1: Direct company relationship research if previous_company and current_company: relationship_query = f'"{previous_company}" "{current_company}" merger acquisition rebranding subsidiary parent company relationship' relationship_results = tavily_client.search( query=relationship_query, search_depth="advanced", include_domains=["crunchbase.com", "linkedin.com", "wikipedia.org", "bloomberg.com"], max_results=5 ) search_results.extend(relationship_results.get("results", [])) # STRATEGY 2: Individual company research (for rebranding detection) if previous_company: previous_company_query = f'"{previous_company}" company rebranding acquisition merger current name' previous_results = tavily_client.search( query=previous_company_query, search_depth="advanced", include_domains=["crunchbase.com", "linkedin.com", "bloomberg.com", "techcrunch.com"], max_results=3 ) search_results.extend(previous_results.get("results", [])) # STRATEGY 3: Current company research (for acquisition detection) if current_company: current_company_query = f'"{current_company}" company history acquisition merger previous names' current_results = tavily_client.search( query=current_company_query, search_depth="advanced", include_domains=["crunchbase.com", "linkedin.com", "wikipedia.org", "bloomberg.com"], max_results=3 ) search_results.extend(current_results.get("results", [])) # STRATEGY 4: Recent news about company changes news_query = f'"{previous_company}" "{current_company}" company change news announcement rebranding' news_results = tavily_client.search( query=news_query, search_depth="basic", include_domains=["techcrunch.com", "linkedin.com", "twitter.com", "bloomberg.com", "news.ycombinator.com"], max_results=5 ) # STRATEGY 5: Industry-specific research (for sector changes) industry_query = f'"{person_name}" job change company transition industry' industry_results = tavily_client.search( query=industry_query, search_depth="basic", include_domains=["linkedin.com", "techcrunch.com"], max_results=2 ) search_results.extend(industry_results.get("results", [])) # Remove duplicates unique_results = [] seen_urls = set() for result in search_results: if result.get("url") not in seen_urls: unique_results.append(result) seen_urls.add(result.get("url")) return { "person": person_name, "previous_company": previous_company, "current_company": current_company, "job_change_detected": "Unknown", # Will be determined by AI "confidence": 0.9, "reason": "Requires AI analysis of comprehensive search results", "relationship_search": unique_results, "news_search": news_results.get("results", []), "ai_analysis": f"Multi-strategy company research: {len(unique_results)} unique results, {len(news_results.get('results', []))} news articles. Strategies: direct relationships, individual company history, recent news, industry transitions" } except Exception as e: return { "person": person_name, "error": f"Company research failed: {str(e)}", "data_source": "tavily_search_error" } @tool def assess_icp_match(person_title: str, company: str, criteria: str = "senior engineering leadership") -> Dict[str, Any]: """Assess if person matches Ideal Customer Profile criteria.""" try: title_lower = person_title.lower() # Check for senior engineering roles senior_roles = ["cto", "vp engineering", "engineering director", "principal engineer", "staff engineer"] is_match = any(role in title_lower for role in senior_roles) return { "title": person_title, "company": company, "criteria": criteria, "is_icp_match": is_match, "confidence": 0.9 if is_match else 0.1, "match_reason": "Senior engineering role" if is_match else "Not in target role" } except Exception as e: return { "title": person_title, "error": f"ICP assessment failed: {str(e)}", "data_source": "assessment_error" } @tool def find_business_email(first_name: str, last_name: str, company: str) -> Dict[str, Any]: """Generate probable business email addresses using real-time company research and LLM intelligence.""" try: # Research company website and email patterns company_query = f'"{company}" company website contact email domain' company_results = tavily_client.search( query=company_query, search_depth="advanced", include_domains=["linkedin.com", "crunchbase.com", "company websites"], max_results=3 ) # Search for existing employee emails or contact patterns email_query = f'"{company}" employee email format "@company.com" contact' email_results = tavily_client.search( query=email_query, search_depth="basic", include_domains=["linkedin.com", "github.com", "company websites"], max_results=3 ) # Use LLM to intelligently guess email based on gathered data email_guess_prompt = f""" Based on the following information, generate the most probable business email address: Person: {first_name} {last_name} Company: {company} Company Research Results: {company_results.get('results', [])} Email Pattern Results: {email_results.get('results', [])} Common email patterns to consider: 1. firstname.lastname@company.com 2. firstname@company.com 3. firstinitial.lastname@company.com 4. firstname_lastname@company.com 5. firstname@companydomain.com Instructions: - Analyze the search results for company domain information - Use common email naming conventions - If company domain is found, use it; otherwise make an educated guess - Return ONLY the email address, nothing else - If truly cannot determine, return "email@company.com" as placeholder """ try: # Get LLM response for email guessing email_response = llm.invoke(email_guess_prompt) probable_email = email_response.content.strip() # Clean up the response if probable_email.startswith('"') and probable_email.endswith('"'): probable_email = probable_email[1:-1] # Validate it looks like an email if '@' not in probable_email or '.' not in probable_email: probable_email = f"{first_name.lower()}.{last_name.lower()}@{company.lower().replace(' ', '')}.com" except Exception as llm_error: # Fallback to common pattern if LLM fails probable_email = f"{first_name.lower()}.{last_name.lower()}@{company.lower().replace(' ', '')}.com" # Extract domain from the probable email domain = probable_email.split('@')[1] if '@' in probable_email else "company.com" return { "person": f"{first_name} {last_name}", "company": company, "probable_email": probable_email, "domain": domain, "confidence": 0.7, "company_search": company_results.get("results", []), "email_search": email_results.get("results", []), "ai_analysis": f"LLM generated email based on {len(company_results.get('results', []))} company results and {len(email_results.get('results', []))} email pattern results" } except Exception as e: # Fallback to basic pattern if everything fails fallback_email = f"{first_name.lower()}.{last_name.lower()}@{company.lower().replace(' ', '')}.com" return { "person": f"{first_name} {last_name}", "company": company, "probable_email": fallback_email, "domain": company.lower().replace(' ', '') + ".com", "confidence": 0.5, "error": f"Email research failed: {str(e)}", "data_source": "fallback_pattern", "ai_analysis": "Used fallback email pattern due to search failure" } # ============================================================================= # CREATE REACT AGENTS # ============================================================================= # Create LLM llm = ChatGoogleGenerativeAI( model="gemini-2.5-flash", temperature=0, google_api_key=GEMINI_API_KEY ) # Create individual react agents profile_researcher = create_react_agent( model=llm, tools=[research_person_profile], prompt="""You are a Profile Research Agent. Research missing profile information using the research_person_profile tool. IMPORTANT: When analyzing search results, provide your findings in this EXACT format: 1. Current Company Name: [specific company name] 2. Current Job Title: [specific job title] 3. Job Change Status: [Yes/No] - [brief reason] 4. ICP Criteria Match: [Yes/No] - [brief reason] Be specific and clear. Use the exact format above for consistency.""", name="profile_researcher" ) job_analyst = create_react_agent( model=llm, tools=[detect_job_change], prompt="""You are a Job Change Detection Agent. Analyze employment transitions using the detect_job_change tool. IMPORTANT: Provide your analysis in this EXACT format: 1. Job Change Detected: [True/False] 2. Reason: [different companies, rebranding, acquisition, etc.] 3. Confidence Level: [High/Medium/Low] Use the exact format above for consistency.""", name="job_analyst" ) icp_assessor = create_react_agent( model=llm, tools=[assess_icp_match], prompt="""You are an ICP Assessment Agent. Evaluate if people fit the Ideal Customer Profile using the assess_icp_match tool. IMPORTANT: Provide your assessment in this EXACT format: 1. ICP Match: [Yes/No] 2. Reason: [specific reason for your assessment] 3. Confidence Level: [High/Medium/Low] Use the exact format above for consistency.""", name="icp_assessor" ) email_finder = create_react_agent( model=llm, tools=[find_business_email], prompt="""You are an Email Discovery Agent. Find and validate business emails using the find_business_email tool. IMPORTANT: Provide your findings in this EXACT format: 1. Most Probable Business Email: [email address] 2. Alternative Patterns: [if available] 3. Confidence Level: [High/Medium/Low] Use the exact format above for consistency.""", name="email_finder" ) # ============================================================================= # CREATE SUPERVISOR # ============================================================================= supervisor = create_supervisor( agents=[profile_researcher, job_analyst, icp_assessor, email_finder], model=llm, prompt=( "You manage a team of profile analysis agents with access to real-time web search data: " "profile_researcher (researches current employment using LinkedIn and web search), " "job_analyst (analyzes company relationships and job changes using business research), " "icp_assessor (evaluates ICP fit based on current role), and " "email_finder (discovers business email patterns using company research). " "INTELLIGENT COORDINATION STRATEGY:" "1. ALWAYS start with profile_researcher to get current employment info - this is your primary data source" "2. Use profile_researcher's findings to determine if you need job_analyst (only if there's a potential company change)" "3. Use icp_assessor to evaluate ICP fit based on the CURRENT role discovered by profile_researcher" "4. Use email_finder to discover business email at the CURRENT company (not the old one)" "SMART DECISION MAKING:" "- If profile_researcher finds the person at the same company (even if rebranded), skip job_analyst" "- If profile_researcher finds a completely different company, use job_analyst to understand the transition" "- Always prioritize profile_researcher's findings over input data - it has the most current information" "- Use job_analyst only when there's ambiguity about company relationships or transitions" "CRITICAL REQUIREMENT: After all agents complete their work, you MUST provide a FINAL SYNTHESIS " "that clearly states the following information in a structured format:" "- Current Company Name: [company]" "- Current Job Title: [title]" "- Job Change Status: [Yes/No] with reason: [explanation]" "- ICP Match Status: [Yes/No] with reason: [explanation]" "- Most Probable Business Email: [email]" "Each agent will provide search results that you need to analyze intelligently. " "Coordinate their research efforts based on what profile_researcher discovers first. " "Your final synthesis is crucial for data extraction." ) ).compile() # ============================================================================= # INTELLIGENT DATA EXTRACTION # ============================================================================= def extract_data_with_ai(agent_responses: List[str], profile_input: Dict) -> ProfileAnalysisResult: """Use AI to extract structured data from agent responses, with pre-processing for 'Present'/'Current' roles.""" import re import json # Helper: Try to extract current company/title from search results def extract_current_from_search(search_results): for result in search_results: snippet = result.get('snippet', '') or result.get('description', '') # Look for 'Present' or 'Current' in the snippet match = re.search(r'(?:Current|Present)[^:]*:?(.*?)( at | @ |\-|,|\n)([A-Za-z0-9 .&-]+)', snippet, re.IGNORECASE) if match: # Try to extract title and company title = match.group(1).strip(':-,|@') company = match.group(3).strip(':-,|@') if title and company: return company, title # Fallback: Look for 'at ' match2 = re.search(r'at ([A-Za-z0-9 .&-]+)', snippet) if match2: company = match2.group(1).strip() return company, '' return None, None # Try to get search_results and parsed_current_company/title from the agent_responses (if present) search_results = [] parsed_current_company = None parsed_current_title = None # First priority: Get parsed data from modified profile_input (direct from tool) if 'parsed_current_company' in profile_input: parsed_current_company = profile_input['parsed_current_company'] if 'parsed_current_title' in profile_input: parsed_current_title = profile_input['parsed_current_title'] # Second priority: Try to extract from agent_responses try: response_json = json.loads(agent_responses[0]) if isinstance(agent_responses[0], str) else agent_responses[0] if isinstance(response_json, dict): if 'search_results' in response_json: search_results = response_json['search_results'] if not parsed_current_company and response_json.get('parsed_current_company'): parsed_current_company = response_json['parsed_current_company'] if not parsed_current_title and response_json.get('parsed_current_title'): parsed_current_title = response_json['parsed_current_title'] except Exception: pass # Fallback: try to get search_results from profile_input (if present) if not search_results and 'search_results' in profile_input: search_results = profile_input['search_results'] # Pre-process: Try to extract current company/title from search results pre_company, pre_title = extract_current_from_search(search_results) if search_results else (None, None) # Improved extraction prompt extraction_prompt = f""" Given the following agent response, extract ONLY the most recent/current company and job title for the person named {profile_input.get('fn')} {profile_input.get('ln')}. - Ignore any past roles or companies. - If the text mentions 'Present', 'Current', or similar, use that company and title. - If multiple companies are listed, pick the one with the most recent start date or marked as 'Present'. - Return a JSON object with 'currentCompany', 'title', 'isAJobChange', 'isAnICP', and 'probableBusinessEmail' fields. - If you see Bloomberg as the current company, use it even if the query was for BuyerAssist. Agent Response: {agent_responses[0]} """ try: response = llm.invoke(extraction_prompt) if not response.content or not response.content.strip(): raise ValueError("LLM returned empty response") content = response.content.strip() if "```json" in content: start = content.find("```json") + 7 end = content.find("```", start) if end != -1: content = content[start:end] elif "```" in content: start = content.find("```") + 3 end = content.find("```", start) if end != -1: content = content[start:end] content = content.strip() print(f"๐Ÿ” Cleaned Response: {content}") extracted_data = json.loads(content) # Highest priority: Use parsed_current_company/title from snippet parsing if present if parsed_current_company: extracted_data['currentCompany'] = parsed_current_company if parsed_current_title: extracted_data['title'] = parsed_current_title # Next priority: Use regex pre-processing if found elif pre_company and pre_title: extracted_data['currentCompany'] = pre_company extracted_data['title'] = pre_title return ProfileAnalysisResult( fn=profile_input.get("fn", ""), ln=profile_input.get("ln", ""), currentCompany=extracted_data.get("currentCompany", "Unknown"), title=extracted_data.get("title", "Unknown"), isAJobChange=bool(extracted_data.get("isAJobChange", False)), isAnICP=bool(extracted_data.get("isAnICP", False)), probableBusinessEmail=extracted_data.get("probableBusinessEmail", "Unknown") ) except Exception as e: print(f"โŒ AI extraction failed: {e}") fallback_email = f"{profile_input.get('fn', '').lower()}.{profile_input.get('ln', '').lower()}@{profile_input.get('company', 'company').lower().replace(' ', '')}.com" return ProfileAnalysisResult( fn=profile_input.get("fn", ""), ln=profile_input.get("ln", ""), currentCompany=parsed_current_company or pre_company or profile_input.get("company", "Unknown"), title=parsed_current_title or pre_title or profile_input.get("title", "Unknown"), isAJobChange=False, isAnICP=False, probableBusinessEmail=fallback_email ) # ============================================================================= # MAIN EXECUTION # ============================================================================= def analyze_profile(profile_input: Dict[str, Any]) -> ProfileAnalysisResult: """Analyze profile using LangGraph supervisor and react agents""" print(f"๐Ÿค– LangGraph Supervisor analyzing: {profile_input}") # Create analysis request with specific instructions query = f""" Research and analyze this profile completely: CURRENT DATA: - Name: {profile_input.get('fn')} {profile_input.get('ln')} - Known Company: {profile_input.get('company', 'unknown')} - Known Title: {profile_input.get('title', 'unknown')} - Email: {profile_input.get('email', 'unknown')} - Location: {profile_input.get('location', 'unknown')} - ICP Criteria: {profile_input.get('icp', 'senior engineering leadership')} TASKS: 1. RESEARCH: Find this person's CURRENT company and title (the provided data might be outdated) 2. JOB CHANGE: Compare known company vs current company to detect job changes or rebranding 3. ICP ASSESSMENT: Check if current title matches the ICP criteria 4. EMAIL: Generate probable business email for their CURRENT company IMPORTANT: After all agents complete their work, synthesize the final results into a clear summary with: - Current Company Name - Current Job Title - Job Change Status (Yes/No with reason) - ICP Match Status (Yes/No with reason) - Most Probable Business Email Use your specialized agents and provide complete results. """ # Run supervisor with react agents and collect all results agent_results = {} all_messages = [] # Let LangGraph handle the flow control automatically for chunk in supervisor.stream({ "messages": [{"role": "user", "content": query}] }): print(chunk) # Extract agent results from chunks for agent_name in ['profile_researcher', 'job_analyst', 'icp_assessor', 'email_finder']: if agent_name in chunk: agent_results[agent_name] = chunk[agent_name] # Collect all messages for analysis - fix the extraction logic if 'supervisor' in chunk and 'messages' in chunk['supervisor']: all_messages.extend(chunk['supervisor']['messages']) # Use LangGraph's natural flow - let the supervisor synthesize results # The supervisor should have provided a final summary in the last message final_messages = [msg for msg in all_messages if hasattr(msg, 'content') and msg.content] if not final_messages: raise ValueError("No messages received from agents") # Get the supervisor's final synthesis (last message) supervisor_synthesis = final_messages[-1].content print(f"๐Ÿ” Supervisor Synthesis: {supervisor_synthesis}") # Extract parsed_current_company and parsed_current_title directly from profile_researcher results parsed_current_company = None parsed_current_title = None if 'profile_researcher' in agent_results: profile_result = agent_results['profile_researcher'] if hasattr(profile_result, 'messages') and profile_result.messages: for msg in profile_result.messages: if hasattr(msg, 'tool_calls') and msg.tool_calls: for tool_call in msg.tool_calls: if tool_call.get('name') == 'research_person_profile': try: tool_output = json.loads(tool_call.get('args', {})) if 'parsed_current_company' in tool_output: parsed_current_company = tool_output['parsed_current_company'] if 'parsed_current_title' in tool_output: parsed_current_title = tool_output['parsed_current_title'] print(f"๐Ÿ” Direct tool output - Company: {parsed_current_company}, Title: {parsed_current_title}") except Exception as e: print(f"โŒ Error parsing tool output: {e}") # Use AI to extract structured data from the supervisor's synthesis agent_responses = [supervisor_synthesis] # Only use the final synthesis # Create a modified profile_input with the parsed data modified_profile_input = profile_input.copy() if parsed_current_company: modified_profile_input['parsed_current_company'] = parsed_current_company if parsed_current_title: modified_profile_input['parsed_current_title'] = parsed_current_title return extract_data_with_ai(agent_responses, modified_profile_input) def analyze_profile_with_progress(profile_input: Dict[str, Any], progress) -> ProfileAnalysisResult: """Analyze profile with progress updates for Gradio UI""" try: progress(0.05, desc="๐Ÿ” Initializing analysis...") # Create analysis request with specific instructions query = f""" Research and analyze this profile completely: CURRENT DATA: - Name: {profile_input.get('fn')} {profile_input.get('ln')} - Known Company: {profile_input.get('company', 'unknown')} - Known Title: {profile_input.get('title', 'unknown')} - Email: {profile_input.get('email', 'unknown')} - Location: {profile_input.get('location', 'unknown')} - ICP Criteria: {profile_input.get('icp', 'senior engineering leadership')} TASKS: 1. RESEARCH: Find this person's CURRENT company and title (the provided data might be outdated) 2. JOB CHANGE: Compare known company vs current company to detect job changes or rebranding 3. ICP ASSESSMENT: Check if current title matches the ICP criteria 4. EMAIL: Generate probable business email for their CURRENT company IMPORTANT: After all agents complete their work, synthesize the final results into a clear summary with: - Current Company Name - Current Job Title - Job Change Status (Yes/No with reason) - ICP Match Status (Yes/No with reason) - Most Probable Business Email Use your specialized agents and provide complete results. """ progress(0.1, desc="๐Ÿค– Starting LangGraph supervisor...") # Run supervisor with react agents and collect all results agent_results = {} all_messages = [] agent_count = 0 tool_count = 0 step_count = 0 # Let LangGraph handle the flow control automatically for chunk in supervisor.stream({ "messages": [{"role": "user", "content": query}] }): print(chunk) step_count += 1 # Track agent executions with detailed progress for agent_name in ['profile_researcher', 'job_analyst', 'icp_assessor', 'email_finder']: if agent_name in chunk: if agent_name not in agent_results: agent_results[agent_name] = chunk[agent_name] agent_count += 1 progress(0.1 + (agent_count * 0.15), desc=f"๐Ÿ”„ {agent_name.replace('_', ' ').title()} executing...") # Track tool executions within each agent agent_data = chunk[agent_name] if hasattr(agent_data, 'messages') and agent_data.messages: for msg in agent_data.messages: if hasattr(msg, 'tool_calls') and msg.tool_calls: tool_count += len(msg.tool_calls) progress(0.1 + (agent_count * 0.15) + (tool_count * 0.02), desc=f"๐Ÿ”„ {agent_name.replace('_', ' ').title()} - Tool {tool_count} executing...") # Track supervisor decisions if 'supervisor' in chunk: if 'messages' in chunk['supervisor']: all_messages.extend(chunk['supervisor']['messages']) progress(0.1 + (agent_count * 0.15) + (tool_count * 0.02) + (step_count * 0.01), desc=f"๐Ÿง  Supervisor coordinating step {step_count}...") progress(0.8, desc="๐Ÿ“Š Processing final results...") # Use LangGraph's natural flow - let the supervisor synthesize results final_messages = [msg for msg in all_messages if hasattr(msg, 'content') and msg.content] if not final_messages: # Create a fallback result if no messages received progress(0.9, desc="โš ๏ธ Creating fallback result...") return ProfileAnalysisResult( fn=profile_input.get("fn", ""), ln=profile_input.get("ln", ""), currentCompany=profile_input.get("company", "Unknown"), title=profile_input.get("title", "Unknown"), isAJobChange=False, isAnICP=False, probableBusinessEmail=f"{profile_input.get('fn', '').lower()}.{profile_input.get('ln', '').lower()}@{profile_input.get('company', 'company').lower().replace(' ', '')}.com" ) # Get the supervisor's final synthesis (last message) supervisor_synthesis = final_messages[-1].content print(f"๐Ÿ” Supervisor Synthesis: {supervisor_synthesis}") progress(0.9, desc="๐Ÿ” Extracting structured data...") # Use AI to extract structured data from the supervisor's synthesis agent_responses = [supervisor_synthesis] result = extract_data_with_ai(agent_responses, profile_input) progress(1.0, desc=f"โœ… Analysis complete! Executed {agent_count} agents, {tool_count} tools, {step_count} steps") return result except Exception as e: progress(1.0, desc="โŒ Analysis failed - creating fallback result") print(f"Error in analysis: {e}") # Return a fallback result instead of crashing return ProfileAnalysisResult( fn=profile_input.get("fn", ""), ln=profile_input.get("ln", ""), currentCompany=profile_input.get("company", "Unknown"), title=profile_input.get("title", "Unknown"), isAJobChange=False, isAnICP=False, probableBusinessEmail=f"{profile_input.get('fn', '').lower()}.{profile_input.get('ln', '').lower()}@{profile_input.get('company', 'company').lower().replace(' ', '')}.com" ) def main(): # Test Case 1: Job Change (Mindtickle -> getboomerang.ai) test_case_1 = { "fn": "Vamsi Krishna", "ln": "Narra", "company": "", "location": "Pune", "email": "", "title": "", "icp": "" } print("๐Ÿ“‹ TEST CASE 1 - Job Change Scenario:") print(f"Input: {json.dumps(test_case_1, indent=2)}") print("-" * 60) result1 = analyze_profile(test_case_1) print("\n๐Ÿ“Š RESULT 1:") print(json.dumps(result1.model_dump(), indent=2)) print("\n" + "=" * 60) # Test Case 2: Real Job Change (BuyerAssist -> Bloomberg) test_case_2 = { "fn": "Amit", "ln": "Dugar", "company": "BuyerAssist", "location": "Pune", "email": "amit.dugar@buyerassist.io", "title": "CTO", "icp": "The person has to be in senior position in Engineer Vertical like VP Engineering, CTO, Research Fellow" } print("๐Ÿ“‹ TEST CASE 2 - Real Job Change (BuyerAssist -> Bloomberg)") print(f"Input: {json.dumps(test_case_2, indent=2)}") print("-" * 60) result2 = analyze_profile(test_case_2) print("\n๐Ÿ“Š RESULT 2:") print(json.dumps(result2.model_dump(), indent=2)) return result1, result2 #if __name__ == "__main__": # main() # Build Gradio Interface import gradio as gr # Create Gradio interface with gr.Blocks(title="Profile Analyzer App", theme=gr.themes.Soft(), css=""" .main-container { max-height: 100vh; overflow-y: auto; } .compact-input { margin-bottom: 2px; } .status-box { background-color: #f8f9fa; border-radius: 8px; } .result-box { background-color: #ffffff; border: 1px solid #dee2e6; } .test-case-btn { margin: 1px; } .section-header { margin: 4px 0 2px 0; font-weight: 600; font-size: 13px; } .header { margin: 4px 0; } .footer { margin: 4px 0; font-size: 11px; } .input-row { margin-bottom: 2px; } .analyze-btn { margin-top: 4px; } .minimal-header { margin: 2px 0; font-size: 16px; } .minimal-subheader { margin: 1px 0; font-size: 12px; } """) as demo: # Minimal Header gr.Markdown("# Profile Analyzer", elem_classes=["minimal-header"]) gr.Markdown("*AI-powered profile research for Job change and ICP detection*", elem_classes=["minimal-subheader"]) # Main container with two columns with gr.Row(): # Left Column - Inputs with gr.Column(scale=1): gr.Markdown("** Test Cases**", elem_classes=["section-header"]) with gr.Row(): test_case_1_btn = gr.Button("๐Ÿงช Test 1", size="sm", variant="secondary", scale=1, elem_classes=["test-case-btn"]) test_case_2_btn = gr.Button("๐Ÿงช Test 2", size="sm", variant="secondary", scale=1, elem_classes=["test-case-btn"]) gr.Markdown("** Profile Info**", elem_classes=["section-header"]) # Ultra-compact input layout with gr.Row(elem_classes=["input-row"]): fn = gr.Textbox(label="First Name", placeholder="First", scale=1, lines=1, elem_classes=["compact-input"]) ln = gr.Textbox(label="Last Name", placeholder="Last", scale=1, lines=1, elem_classes=["compact-input"]) with gr.Row(elem_classes=["input-row"]): company = gr.Textbox(label="Company", placeholder="Company", scale=1, lines=1, elem_classes=["compact-input"]) location = gr.Textbox(label="Location", placeholder="Location", scale=1, lines=1, elem_classes=["compact-input"]) with gr.Row(elem_classes=["input-row"]): email = gr.Textbox(label="Email", placeholder="Email", scale=1, lines=1, elem_classes=["compact-input"]) title = gr.Textbox(label="Title", placeholder="Title", scale=1, lines=1, elem_classes=["compact-input"]) icp = gr.Textbox( label="ICP Criteria", placeholder="e.g., senior engineering", lines=1, elem_classes=["compact-input"] ) # Analyze button analyze_btn = gr.Button("๐Ÿš€ Analyze", variant="primary", size="lg", elem_classes=["analyze-btn"]) # Right Column - Results with gr.Column(scale=1): gr.Markdown("** Results**", elem_classes=["section-header"]) # Status box (ultra-compact) status_box = gr.Textbox( label="๐Ÿ”„ Status", value="Ready - Click Analyze to start", lines=1, interactive=False, container=False, elem_classes=["status-box"] ) # Progress bar for visual feedback progress_bar = gr.Progress() # Output box (compact) output = gr.Textbox( label="๐Ÿ“Š Analysis Result", lines=6, max_lines=8, container=False, elem_classes=["result-box"] ) # Minimal footer note gr.Markdown("---") gr.Markdown("* Use test cases to populate fields quickly*", elem_classes=["footer"]) # Button click events def load_test_case_1(): return "Vamsi Krishna", "Narra", "", "Pune", "", "", "" def load_test_case_2(): return "Amit", "Dugar", "BuyerAssist", "Pune", "amit.dugar@buyerassist.io", "CTO", "The person has to be in senior position in Engineer Vertical like VP Engineering, CTO, Research Fellow" def analyze_profile_ui(fn, ln, company, location, email, title, icp, progress=gr.Progress()): """Analyze profile from UI inputs with progress updates""" if not fn or not ln: return "Error: First Name and Last Name are required", "Error: First Name and Last Name are required" test_case = { "fn": fn, "ln": ln, "company": company or "", "location": location or "", "email": email or "", "title": title or "", "icp": icp or "" } try: progress(0, desc="๐Ÿš€ Starting profile analysis...") # Start the analysis with progress tracking result = analyze_profile_with_progress(test_case, progress) return json.dumps(result.model_dump(), indent=2), "Analysis completed successfully!" except Exception as e: error_msg = f"Error: {str(e)}" return error_msg, error_msg # Connect button events test_case_1_btn.click( fn=load_test_case_1, outputs=[fn, ln, company, location, email, title, icp] ) test_case_2_btn.click( fn=load_test_case_2, outputs=[fn, ln, company, location, email, title, icp] ) analyze_btn.click( fn=analyze_profile_ui, inputs=[fn, ln, company, location, email, title, icp], outputs=[output, status_box], show_progress=True ) # Launch the demo if __name__ == "__main__": demo.launch(share=True, debug=True)