|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
from abc import ABC |
|
import json |
|
import smtplib |
|
import logging |
|
from email.mime.text import MIMEText |
|
from email.mime.multipart import MIMEMultipart |
|
from email.header import Header |
|
from email.utils import formataddr |
|
from agent.component.base import ComponentBase, ComponentParamBase |
|
|
|
class EmailParam(ComponentParamBase): |
|
""" |
|
Define the Email component parameters. |
|
""" |
|
def __init__(self): |
|
super().__init__() |
|
|
|
self.smtp_server = "" |
|
self.smtp_port = 465 |
|
self.email = "" |
|
self.password = "" |
|
self.sender_name = "" |
|
|
|
def check(self): |
|
|
|
self.check_empty(self.smtp_server, "SMTP Server") |
|
self.check_empty(self.email, "Email") |
|
self.check_empty(self.password, "Password") |
|
self.check_empty(self.sender_name, "Sender Name") |
|
|
|
class Email(ComponentBase, ABC): |
|
component_name = "Email" |
|
|
|
def _run(self, history, **kwargs): |
|
|
|
ans = self.get_input() |
|
content = "".join(ans["content"]) if "content" in ans else "" |
|
if not content: |
|
return Email.be_output("No content to send") |
|
|
|
success = False |
|
try: |
|
|
|
email_data = json.loads(content) |
|
|
|
|
|
if "to_email" not in email_data: |
|
return Email.be_output("Missing required field: to_email") |
|
|
|
|
|
msg = MIMEMultipart('alternative') |
|
|
|
|
|
msg['From'] = formataddr((str(Header(self._param.sender_name,'utf-8')), self._param.email)) |
|
msg['To'] = email_data["to_email"] |
|
if "cc_email" in email_data and email_data["cc_email"]: |
|
msg['Cc'] = email_data["cc_email"] |
|
msg['Subject'] = Header(email_data.get("subject", "No Subject"), 'utf-8').encode() |
|
|
|
|
|
email_content = email_data.get("content", "No content provided") |
|
|
|
msg.attach(MIMEText(email_content, 'html', 'utf-8')) |
|
|
|
|
|
logging.info(f"Connecting to SMTP server {self._param.smtp_server}:{self._param.smtp_port}") |
|
|
|
context = smtplib.ssl.create_default_context() |
|
with smtplib.SMTP_SSL(self._param.smtp_server, self._param.smtp_port, context=context) as server: |
|
|
|
logging.info(f"Attempting to login with email: {self._param.email}") |
|
server.login(self._param.email, self._param.password) |
|
|
|
|
|
recipients = [email_data["to_email"]] |
|
if "cc_email" in email_data and email_data["cc_email"]: |
|
recipients.extend(email_data["cc_email"].split(',')) |
|
|
|
|
|
logging.info(f"Sending email to recipients: {recipients}") |
|
try: |
|
server.send_message(msg, self._param.email, recipients) |
|
success = True |
|
except Exception as e: |
|
logging.error(f"Error during send_message: {str(e)}") |
|
|
|
server.sendmail(self._param.email, recipients, msg.as_string()) |
|
success = True |
|
|
|
try: |
|
server.quit() |
|
except Exception as e: |
|
|
|
logging.warning(f"Non-fatal error during connection close: {str(e)}") |
|
|
|
if success: |
|
return Email.be_output("Email sent successfully") |
|
|
|
except json.JSONDecodeError: |
|
error_msg = "Invalid JSON format in input" |
|
logging.error(error_msg) |
|
return Email.be_output(error_msg) |
|
|
|
except smtplib.SMTPAuthenticationError: |
|
error_msg = "SMTP Authentication failed. Please check your email and authorization code." |
|
logging.error(error_msg) |
|
return Email.be_output(f"Failed to send email: {error_msg}") |
|
|
|
except smtplib.SMTPConnectError: |
|
error_msg = f"Failed to connect to SMTP server {self._param.smtp_server}:{self._param.smtp_port}" |
|
logging.error(error_msg) |
|
return Email.be_output(f"Failed to send email: {error_msg}") |
|
|
|
except smtplib.SMTPException as e: |
|
error_msg = f"SMTP error occurred: {str(e)}" |
|
logging.error(error_msg) |
|
return Email.be_output(f"Failed to send email: {error_msg}") |
|
|
|
except Exception as e: |
|
error_msg = f"Unexpected error: {str(e)}" |
|
logging.error(error_msg) |
|
return Email.be_output(f"Failed to send email: {error_msg}") |