Spaces:
Sleeping
Sleeping
import gradio as gr | |
import os | |
import json | |
from google.oauth2.credentials import Credentials | |
from google_auth_oauthlib.flow import InstalledAppFlow | |
from google.auth.transport.requests import Request | |
from googleapiclient.discovery import build | |
import base64 | |
from datetime import datetime, timedelta | |
# Gmail API scope | |
# Gmail API scope | |
SCOPES = ['https://www.googleapis.com/auth/gmail.readonly'] | |
# Instead of reading from file, get credentials from environment variables | |
CLIENT_CONFIG = { | |
"web": { # Changed to "web" for web credentials | |
"client_id": os.environ.get('GOOGLE_CLIENT_ID'), | |
"client_secret": os.environ.get('GOOGLE_CLIENT_SECRET'), | |
"redirect_uris": ["https://huggingface.co/spaces/xiddiqui/recruitment-helper"], # Standard Google OAuth endpoint | |
"auth_uri": "https://accounts.google.com/o/oauth2/auth", | |
"token_uri": "https://oauth2.googleapis.com/token", | |
} | |
} | |
# Keywords for filtering emails | |
KEYWORDS = { | |
'job_related': [ | |
'job opportunity', 'position', 'career', 'recruitment', 'hiring', | |
'interview', 'resume', 'CV', 'application', 'job posting', | |
'employment', 'role', 'vacancy', 'opening' | |
], | |
'personal': [ | |
'personal', 'private', 'confidential', 'family', 'friend', | |
'social', 'invitation', 'gathering', 'meetup' | |
] | |
} | |
def get_gmail_service(state_dict): | |
"""Creates Gmail API service""" | |
creds = None | |
# Check if token exists in state | |
if 'token' in state_dict: | |
creds = Credentials.from_authorized_user_info(state_dict['token'], SCOPES) | |
# If credentials are invalid or don't exist | |
if not creds or not creds.valid: | |
if creds and creds.expired and creds.refresh_token: | |
creds.refresh(Request()) | |
else: | |
# Create flow without running local server | |
flow = InstalledAppFlow.from_client_config(CLIENT_CONFIG, SCOPES) | |
# Generate authorization URL | |
auth_url, _ = flow.authorization_url( | |
access_type='offline', | |
include_granted_scopes='true' | |
) | |
# Return the authorization URL to the user | |
return f"Please visit this URL to authorize the application: {auth_url}\n\nAfter authorization, you will receive a code. Please enter it in the application." | |
return build('gmail', 'v1', credentials=creds) | |
def get_email_content(service, msg_id): | |
"""Retrieves email content and metadata""" | |
try: | |
message = service.users().messages().get(userId='me', id=msg_id, format='full').execute() | |
headers = message['payload']['headers'] | |
subject = next((h['value'] for h in headers if h['name'].lower() == 'subject'), 'No Subject') | |
from_email = next((h['value'] for h in headers if h['name'].lower() == 'from'), 'No Sender') | |
date = next((h['value'] for h in headers if h['name'].lower() == 'date'), 'No Date') | |
# Get email body | |
if 'parts' in message['payload']: | |
parts = message['payload']['parts'] | |
data = parts[0]['body'].get('data', '') | |
else: | |
data = message['payload']['body'].get('data', '') | |
if data: | |
text = base64.urlsafe_b64decode(data).decode('utf-8') | |
else: | |
text = "No content available" | |
# Check for attachments | |
attachments = [] | |
if 'parts' in message['payload']: | |
for part in message['payload']['parts']: | |
if 'filename' in part and part['filename']: | |
attachments.append(part['filename']) | |
return { | |
'subject': subject, | |
'from': from_email, | |
'date': date, | |
'content': text, | |
'attachments': attachments | |
} | |
except Exception as e: | |
return f"Error retrieving email: {str(e)}" | |
def classify_email(email_data): | |
"""Classifies email based on content and attachments""" | |
text = f"{email_data['subject']} {email_data['content']}".lower() | |
# Check attachments for CV/Resume | |
has_cv = any( | |
att.lower().endswith(('.pdf', '.doc', '.docx')) or | |
any(kw in att.lower() for kw in ['cv', 'resume']) | |
for att in email_data['attachments'] | |
) | |
# Check content for keywords | |
is_job_related = has_cv or any(kw.lower() in text for kw in KEYWORDS['job_related']) | |
is_personal = any(kw.lower() in text for kw in KEYWORDS['personal']) | |
return { | |
'job_related': is_job_related, | |
'personal': is_personal, | |
'has_cv': has_cv | |
} | |
def fetch_emails(days_back, auth_code="", include_job=True, include_personal=True, progress=gr.Progress()): | |
"""Main function to fetch and filter emails""" | |
if not auth_code: | |
try: | |
# Configure OAuth2 flow for web application | |
flow = InstalledAppFlow.from_client_config( | |
CLIENT_CONFIG, | |
SCOPES, | |
redirect_uri="https://accounts.google.com/o/oauth2/auth" # Standard Google OAuth endpoint | |
) | |
# Generate authorization URL | |
auth_url, _ = flow.authorization_url( | |
access_type='offline', | |
include_granted_scopes='true' | |
) | |
return f"""Please follow these steps: | |
1. Click this link to authorize the application: | |
{auth_url} | |
2. Sign in with your Google account | |
3. Click 'Allow' to grant access | |
4. Copy the authorization code shown | |
5. Paste the code here and click 'Connect and Fetch Emails' again""" | |
except Exception as e: | |
return f"Error generating authorization URL: {str(e)}" | |
try: | |
# Process the auth code and fetch emails | |
flow = InstalledAppFlow.from_client_config( | |
CLIENT_CONFIG, | |
SCOPES, | |
redirect_uri="https://accounts.google.com/o/oauth2/auth" # Same redirect URI here | |
) | |
service = get_gmail_service(state_dict) | |
if isinstance(service, str): | |
# This means we got an auth URL instead of a service | |
return service | |
# Search for recent emails | |
query = f'after:{int((datetime.now() - timedelta(days=int(days_back))).timestamp())}' | |
results = service.users().messages().list(userId='me', q=query, maxResults=100).execute() | |
messages = results.get('messages', []) | |
if not messages: | |
return "No emails found in the specified time range." | |
filtered_emails = [] | |
# Process emails with progress tracking | |
for i, message in enumerate(messages): | |
progress(i/len(messages), desc="Processing emails...") | |
email_data = get_email_content(service, message['id']) | |
if isinstance(email_data, str): # Error message | |
continue | |
classification = classify_email(email_data) | |
email_data.update(classification) | |
if ((include_job and classification['job_related']) or | |
(include_personal and classification['personal'])): | |
filtered_emails.append(email_data) | |
# Format output | |
output = f"Found {len(filtered_emails)} matching emails\n\n" | |
for email in filtered_emails: | |
output += f"π§ {email['subject']}\n" | |
output += f"From: {email['from']}\n" | |
output += f"Date: {email['date']}\n" | |
tags = [] | |
if email['job_related']: | |
tags.append("π― Job Related") | |
if email['personal']: | |
tags.append("π€ Personal") | |
if email['has_cv']: | |
tags.append("π Has CV/Resume") | |
output += f"Tags: {', '.join(tags)}\n" | |
if email['attachments']: | |
output += "Attachments:\n" | |
for att in email['attachments']: | |
output += f"- {att}\n" | |
output += "\nContent Preview:\n" | |
preview = email['content'][:500] + "..." if len(email['content']) > 500 else email['content'] | |
output += f"{preview}\n" | |
output += "-" * 80 + "\n\n" | |
return output | |
except Exception as e: | |
return f"Error: {str(e)}" | |
def create_interface(): | |
with gr.Blocks(title="Email Filter") as demo: | |
gr.Markdown("# π§ Smart Email Filter") | |
gr.Markdown("Connect to your Gmail account to filter important emails") | |
auth_code = gr.Textbox( | |
label="Authorization Code (if required)", | |
placeholder="Enter the authorization code here after visiting the auth URL" | |
) | |
with gr.Row(): | |
days_back = gr.Slider( | |
minimum=1, | |
maximum=30, | |
value=7, | |
step=1, | |
label="Days to look back" | |
) | |
include_job = gr.Checkbox( | |
value=True, | |
label="Include Job Related Emails" | |
) | |
include_personal = gr.Checkbox( | |
value=True, | |
label="Include Personal Emails" | |
) | |
fetch_button = gr.Button("Connect and Fetch Emails") | |
output = gr.Textbox( | |
label="Results", | |
lines=20, | |
show_copy_button=True | |
) | |
fetch_button.click( | |
fn=fetch_emails, | |
inputs=[days_back, auth_code, include_job, include_personal], | |
outputs=output | |
) | |
return demo | |
demo = create_interface() | |
demo.launch() |