Spaces:
Running
Running
import os | |
from dotenv import load_dotenv | |
import os.path | |
from google.auth.transport.requests import Request | |
from google.oauth2.credentials import Credentials | |
from google_auth_oauthlib.flow import InstalledAppFlow | |
from googleapiclient.discovery import build | |
from googleapiclient.errors import HttpError | |
import base64 | |
from email.message import EmailMessage | |
from bs4 import BeautifulSoup | |
import webbrowser | |
import datetime | |
# OAuth scopes requested for the Google account. connetti_google() passes this
# list both when loading the cached token and when running the consent flow.
# NOTE(review): presumably token.json must be deleted after editing this list
# so that consent is requested again - confirm against the Google quickstart.
SCOPES = [
    # Gmail: leggi_gmail() reads the inbox, scrivi_bozza_gmail() creates drafts.
    "https://www.googleapis.com/auth/gmail.compose",
    "https://www.googleapis.com/auth/gmail.modify",
    "https://mail.google.com/",
    # Calendar: leggi_calendario_google() lists calendars and events.
    "https://www.googleapis.com/auth/calendar.readonly",
    # Docs / Forms / Sheets: not used by the functions visible in this section.
    "https://www.googleapis.com/auth/documents.readonly",
    "https://www.googleapis.com/auth/forms.body",
    "https://www.googleapis.com/auth/spreadsheets.readonly",
]
#---------------------------------------------------------- READ EMAIL ---------------------------------------------------------
def converti_email_txt(body):
    """Return the text of an email body, stripping HTML markup when present.

    The body is parsed with BeautifulSoup's ``html.parser``. When the parsed
    document contains a ``<body>`` element, the text of that element is
    returned; otherwise the input is returned unchanged.

    Args:
        body: The email body, either plain text or HTML markup.

    Returns:
        The text of the ``<body>`` element, or ``body`` itself when there is
        no such element or when parsing fails.
    """
    try:
        body_tag = BeautifulSoup(body, 'html.parser').find('body')
        if body_tag is None:
            return body
        return body_tag.get_text()
    except Exception:
        # Best-effort conversion: any parsing problem falls back to the raw
        # input. Catching Exception (not a bare except) lets
        # KeyboardInterrupt and SystemExit propagate.
        return body
def _gmail_header_values(headers, name):
    """Return the values of every header called *name* (case-insensitive), in order."""
    wanted = name.lower()
    return [h["value"] for h in headers if h["name"].lower() == wanted]


def _gmail_strip_weekday(date_value):
    """Drop a leading ``'Ddd, '`` weekday prefix from a Date header value, if present."""
    # The original code always cut the first five characters, presumably to
    # remove a prefix such as "Mon, "; only cut when that prefix is really there.
    return date_value[5:] if date_value[3:5] == ", " else date_value


def _gmail_decode_data(part):
    """Decode the base64url ``body.data`` of a payload part to text ('' when absent)."""
    data = part.get("body", {}).get("data")
    if not data:
        return ""
    # errors="replace" keeps one badly encoded message from aborting the run.
    return base64.urlsafe_b64decode(data).decode("utf-8", errors="replace")


def _gmail_plain_text(payload):
    """Return the first non-empty text/plain body found under *payload*'s parts, else ''."""
    for part in payload.get("parts", []):
        mime_type = part.get("mimeType", "")
        if mime_type == "text/plain":
            text = _gmail_decode_data(part)
        elif mime_type.startswith("multipart/"):
            # Search nested containers (alternative, related, mixed, ...).
            text = _gmail_plain_text(part)
        else:
            continue
        if text:
            return text
    return ""


def leggi_gmail(max_results=10):
    """Read the unread messages in the Gmail inbox.

    Connects through ``connetti_google()``, lists the unread INBOX messages
    and fetches each one. For every message the sender, the subject(s), the
    date and the plain-text body are collected.

    Missing headers and missing body data yield empty strings instead of
    raising, so one unusual message does not abort the whole read.

    Args:
        max_results: Maximum number of messages requested from the API.

    Returns:
        A ``(testo_email, links)`` tuple. ``testo_email`` is one string with
        a block per message (date, sender, subject, body, separator line).
        ``links`` is a list of ``(sender_label, date_and_subject_label)``
        tuples, one per message. Both are empty when there is no unread mail.
    """
    creds = connetti_google()
    service = build("gmail", "v1", credentials=creds)
    results = service.users().messages().list(
        userId="me", labelIds=["INBOX"], q="is:unread", maxResults=max_results
    ).execute()
    messages = results.get("messages", [])
    if not messages:
        print("You have no New Messages.")
        return '', []

    blocks = []
    links = []
    for message in messages:
        msg = service.users().messages().get(userId="me", id=message["id"]).execute()
        payload = msg["payload"]
        headers = payload.get("headers", [])

        # As in the original loop, the last From/Date header wins and every
        # Subject header is kept.
        from_values = _gmail_header_values(headers, "From")
        date_values = _gmail_header_values(headers, "Date")
        from_name = from_values[-1] if from_values else ''
        received_date = _gmail_strip_weekday(date_values[-1]) if date_values else ''
        subject = ', '.join(_gmail_header_values(headers, "Subject"))

        if "parts" in payload:
            body = _gmail_plain_text(payload)
        else:
            # Single-part message: the content sits directly on the payload.
            body = _gmail_decode_data(payload)
        body = converti_email_txt(body)

        blocks.append(
            f'Data Ricezione: {received_date}\nMittente: {from_name}'
            f'\nOggetto: {subject}\nTesto Email: {body}'
            '\n---------------------------------------------------\n'
        )
        links.append((
            'Mittente: ' + from_name,
            'Data: ' + received_date + '\n\nOggetto: ' + subject,
        ))
    return ''.join(blocks), links
#---------------------------------------------------------- WRITE EMAIL DRAFT ---------------------------------------------------------
def scrivi_bozza_gmail(testo):
    """Create a Gmail draft whose body is *testo* and print its details.

    The draft uses fixed To/From/Subject headers. After creation the draft is
    fetched again and printed. An ``HttpError`` from the API is printed
    instead of being raised.

    Args:
        testo: Text used as the content of the draft message.

    Returns:
        Always the empty string; no draft URL is computed by this function.
    """
    url = ''
    credentials = connetti_google()
    try:
        gmail = build("gmail", "v1", credentials=credentials)

        email = EmailMessage()
        email.set_content(testo)
        fixed_headers = (
            ("To", "gduser1@workspacesamples.dev"),
            ("From", "gduser2@workspacesamples.dev"),
            ("Subject", "Automated draft"),
        )
        for header_name, header_value in fixed_headers:
            email[header_name] = header_value

        # The API expects the whole message as a base64url string.
        raw = base64.urlsafe_b64encode(email.as_bytes()).decode()
        request_body = {"message": {"raw": raw}}
        created = gmail.users().drafts().create(userId="me", body=request_body).execute()
        created_id = created["id"]
        print(f'Draft id: {created_id}\nDraft message: {created["message"]}')

        # Fetch the stored draft again and show what the API holds for it.
        details = gmail.users().drafts().get(userId="me", id=created_id).execute()
        print(details)
    except HttpError as api_error:
        print(f"An error occurred: {api_error}")
    return url
#---------------------------------------------------------- READ GOOGLE CALENDAR ---------------------------------------------------------
def _calendar_time_text(moment):
    """Format an event ``start``/``end`` dict as text, with 'T' and 'Z' replaced by spaces."""
    value = moment.get("dateTime") or moment.get("date") or ''
    return value.replace("T", " ").replace("Z", " ")


def leggi_calendario_google(max_results=10):
    """Read the upcoming events of every calendar of the Google account.

    Connects through ``connetti_google()``, lists the account's calendars and,
    for each one, requests the events starting from the current UTC time.

    An ``HttpError`` from the API is printed instead of being raised;
    whatever was collected before the error is still returned.

    Args:
        max_results: Maximum number of events requested per calendar.

    Returns:
        A ``(descrizione_eventi, links)`` tuple. ``descrizione_eventi`` is a
        string with one line per event (calendar label, start, end, title and
        optional description). ``links`` is a list of
        ``(calendar_label, event_label)`` tuples, one per event.
    """
    # Initialised before the try block so the final return is valid even when
    # the very first API call fails.
    righe = []
    links = []
    creds = connetti_google()
    try:
        service = build("calendar", "v3", credentials=creds)
        calendars = service.calendarList().list().execute().get('items', [])
        # Timezone-aware UTC timestamp: a naive local time with a 'Z' suffix
        # would be read as UTC and shift the lower bound by the local offset.
        time_min = datetime.datetime.now(datetime.timezone.utc).isoformat()
        for calendar in calendars:
            events_result = service.events().list(
                calendarId=calendar['id'],
                timeMin=time_min,
                maxResults=max_results,
                singleEvents=True,
                orderBy="startTime",
            ).execute()
            for event in events_result.get("items", []):
                start = _calendar_time_text(event.get("start", {}))
                end = _calendar_time_text(event.get("end", {}))
                # Events may lack a title or an organizer; fall back instead
                # of raising KeyError.
                titolo = event.get("summary", '')
                descrizione = event.get('description', '').replace('\n', '. ')
                calendario = event.get('organizer', {}).get('displayName', 'Principale')

                riga = f'Calendario: {calendario} --- '
                riga += f'Data Inizio: {start} - Data Fine: {end}: '
                descrizione_link = f'Data: {start} \n\n Titolo: {titolo}'
                if descrizione != '':
                    riga += f'{titolo} ({descrizione})'
                    descrizione_link += f'\n\nDescrizione: {titolo} ({descrizione})'
                else:
                    riga += f'{titolo}'
                righe.append(riga + '\n')
                links.append((f'Calendario: {calendario}', descrizione_link))
    except HttpError as error:
        print(f"An error occurred: {error}")
    return ''.join(righe), links
#---------------------------------------------------------- GOOGLE ACCOUNT CONNECTION ---------------------------------------------------------
def connetti_google():
    """Return Google credentials for ``SCOPES``, authorising when necessary.

    Steps:
      1. Load the ``.env`` file and, when the ``JSON_GOOGLE`` environment
         variable is set, write its content to ``credentials.json``.
      2. Load cached credentials from ``token.json`` when that file exists.
      3. When there are no valid credentials, refresh them if they are expired
         and have a refresh token; otherwise run the local-server OAuth flow
         from ``credentials.json``. The result is saved to ``token.json``.

    Returns:
        The credentials object to pass to ``build(..., credentials=...)``.

    Raises:
        FileNotFoundError: If the OAuth flow is needed but ``credentials.json``
            does not exist and ``JSON_GOOGLE`` is not set.
    """
    load_dotenv()
    json_variable_str = os.getenv("JSON_GOOGLE")
    if json_variable_str:
        # Write only when the variable is set: opening with "w" truncates the
        # file, so writing unconditionally would wipe an existing
        # credentials.json before failing on a None value.
        with open("credentials.json", "w", encoding="utf-8") as f:
            f.write(json_variable_str)

    creds = None
    if os.path.exists("token.json"):
        creds = Credentials.from_authorized_user_file("token.json", SCOPES)

    if not creds or not creds.valid:
        if creds and creds.expired and creds.refresh_token:
            creds.refresh(Request())
        else:
            if not os.path.exists("credentials.json"):
                raise FileNotFoundError(
                    "credentials.json not found and JSON_GOOGLE is not set; "
                    "cannot start the OAuth flow"
                )
            flow = InstalledAppFlow.from_client_secrets_file("credentials.json", SCOPES)
            creds = flow.run_local_server(port=0)
        # Cache the new or refreshed credentials for the next call.
        with open("token.json", "w", encoding="utf-8") as token:
            token.write(creds.to_json())
    return creds