from abc import ABC |
import json |
import smtplib |
import logging |
from email.mime.text import MIMEText |
from email.mime.multipart import MIMEMultipart |
from email.header import Header |
from email.utils import formataddr |
from agent.component.base import ComponentBase, ComponentParamBase |
class EmailParam(ComponentParamBase):
    """
    Parameters for the Email component.

    Holds the SMTP connection settings (server and port), the account
    credentials (email and password) and the display name used for the
    sender.
    """

    def __init__(self):
        super().__init__()
        # SMTP connection settings; 465 is the default port value.
        self.smtp_server = ""
        self.smtp_port = 465
        # Account credentials used to log in to the SMTP server.
        self.email = ""
        self.password = ""
        # Display name used in the "From" header.
        self.sender_name = ""

    def check(self):
        """Run check_empty on every required setting, in a fixed order."""
        required_settings = (
            (self.smtp_server, "SMTP Server"),
            (self.email, "Email"),
            (self.password, "Password"),
            (self.sender_name, "Sender Name"),
        )
        for value, label in required_settings:
            self.check_empty(value, label)
class Email(ComponentBase, ABC):
    """
    Send an HTML email through an SMTP-over-SSL server.

    The component input is joined into a single string and parsed as a JSON
    object with these keys:

        to_email: required; a comma-separated string or a list of addresses.
        cc_email: optional; same format as to_email.
        subject:  optional; defaults to "No Subject".
        content:  optional HTML body; defaults to "No content provided".
    """
    component_name = "Email"

    @staticmethod
    def _split_addresses(raw):
        """Return the non-empty, whitespace-stripped addresses found in *raw*.

        *raw* may be a comma-separated string or a list/tuple of addresses.
        Any falsy value (missing key, None, "") yields an empty list.
        """
        if not raw:
            return []
        if isinstance(raw, (list, tuple)):
            candidates = [str(item) for item in raw]
        else:
            candidates = str(raw).split(",")
        return [addr.strip() for addr in candidates if addr.strip()]

    def _run(self, history, **kwargs):
        """Build the message from the component input and send it.

        Returns:
            The result of Email.be_output() wrapping a status message:
            "Email sent successfully" on success, otherwise a description
            of what went wrong. Errors raised while parsing or sending are
            reported through the returned message rather than raised.
        """
        # Imported locally so the block does not depend on smtplib exposing
        # its own internal "ssl" import as an attribute.
        import ssl

        ans = self.get_input()
        content = "".join(ans["content"]) if "content" in ans else ""
        if not content:
            return Email.be_output("No content to send")

        try:
            email_data = json.loads(content)
        except json.JSONDecodeError:
            error_msg = "Invalid JSON format in input"
            logging.error(error_msg)
            return Email.be_output(error_msg)

        # Valid JSON that is not an object (list, string, number, ...) has no
        # fields to read; report it explicitly instead of failing later.
        if not isinstance(email_data, dict):
            error_msg = "Invalid JSON format in input: expected a JSON object"
            logging.error(error_msg)
            return Email.be_output(error_msg)

        try:
            to_addresses = self._split_addresses(email_data.get("to_email"))
            if not to_addresses:
                # Covers both a missing key and an empty value.
                return Email.be_output("Missing required field: to_email")
            cc_addresses = self._split_addresses(email_data.get("cc_email"))

            msg = MIMEMultipart('alternative')
            msg['From'] = formataddr((str(Header(self._param.sender_name, 'utf-8')), self._param.email))
            msg['To'] = ", ".join(to_addresses)
            if cc_addresses:
                msg['Cc'] = ", ".join(cc_addresses)
            msg['Subject'] = Header(email_data.get("subject", "No Subject"), 'utf-8').encode()

            email_content = email_data.get("content", "No content provided")
            msg.attach(MIMEText(email_content, 'html', 'utf-8'))

            # Envelope recipients: every individual To and Cc address.
            recipients = to_addresses + cc_addresses

            logging.info("Connecting to SMTP server %s:%s", self._param.smtp_server, self._param.smtp_port)
            context = ssl.create_default_context()
            with smtplib.SMTP_SSL(self._param.smtp_server, self._param.smtp_port, context=context) as server:
                logging.info("Attempting to login with email: %s", self._param.email)
                server.login(self._param.email, self._param.password)

                logging.info("Sending email to recipients: %s", recipients)
                try:
                    server.send_message(msg, self._param.email, recipients)
                except Exception as e:
                    # Best-effort fallback: retry with the lower-level API.
                    # If this also fails, the outer handlers report it.
                    logging.error(f"Error during send_message: {str(e)}")
                    server.sendmail(self._param.email, recipients, msg.as_string())

                # Close explicitly and tolerate failures so that a problem
                # while closing is not reported as a failed send.
                try:
                    server.quit()
                except Exception as e:
                    logging.warning(f"Non-fatal error during connection close: {str(e)}")

            return Email.be_output("Email sent successfully")

        except smtplib.SMTPAuthenticationError:
            error_msg = "SMTP Authentication failed. Please check your email and authorization code."
            logging.error(error_msg)
            return Email.be_output(f"Failed to send email: {error_msg}")

        except smtplib.SMTPConnectError:
            error_msg = f"Failed to connect to SMTP server {self._param.smtp_server}:{self._param.smtp_port}"
            logging.error(error_msg)
            return Email.be_output(f"Failed to send email: {error_msg}")

        except smtplib.SMTPException as e:
            error_msg = f"SMTP error occurred: {str(e)}"
            logging.error(error_msg)
            return Email.be_output(f"Failed to send email: {error_msg}")

        except Exception as e:
            error_msg = f"Unexpected error: {str(e)}"
            logging.error(error_msg)
            return Email.be_output(f"Failed to send email: {error_msg}")