Skip to content

Commit

Permalink
Update send_email.py
Browse files Browse the repository at this point in the history
  • Loading branch information
CAVIND46016 authored May 30, 2023
1 parent 12b84d9 commit 0c56378
Showing 1 changed file with 151 additions and 78 deletions.
229 changes: 151 additions & 78 deletions common/send_email.py
Original file line number Diff line number Diff line change
@@ -1,124 +1,197 @@
"""
Creates a SMTP connection and sends an email.
Creates an SMTP connection and sends an email.
"""

import os
import ast
import logging
from email.header import Header
from email.utils import formataddr
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.mime.base import MIMEBase
from email import encoders
import os

import smtplib
from smtplib import SMTPNotSupportedError, SMTPAuthenticationError, \
SMTPServerDisconnected, SMTPSenderRefused, SMTPConnectError
from validate_email import validate_email

logger = logging.getLogger(__name__)

DEFAULT_EMAIL_CREDENTIALS = ast.literal_eval(
os.environ.get("DEFAULT_EMAIL_CREDENTIALS")
)


class Email:
"""
Contains the logic for sending an email.
"""
def __init__(self, **kwargs):
"""
Args:
**kwargs: Keyword arguments with host, port,
user, password and use_tls as keys.
"""
self.__dict__.update(kwargs)

def _login(self):
def __init__(
self,
credentials=DEFAULT_EMAIL_CREDENTIALS
):
self.credentials = credentials
self.smtp_conn = self.connect()
self.msg = MIMEMultipart()

@property
def credentials(self):
return self._credentials

@credentials.setter
def credentials(self, codes):
if not codes:
raise ValueError("'Credentials' cannot be empty")

self._credentials = codes

def connect(self):
"""
Sets up an SMTP connection object using the connection parameters.
Establishes an SMTP connection with the email server.
"""

con = None
use_tls = self.credentials.get("use_tls", True)
try:
if self.use_tls:
smtp_conn = smtplib.SMTP(self.host, self.port)
smtp_conn.starttls()
else:
smtp_conn = smtplib.SMTP_SSL(self.host, self.port)

smtp_conn.login(self.user, self.password)
except smtplib.SMTPNotSupportedError:
raise Exception(f"SMTP AUTH extension not supported by server.")
except smtplib.SMTPAuthenticationError:
raise Exception(f"The server didn't accept the username/password "
f"combination provided OR account does not provide "
f"access to less secure apps.")
except smtplib.SMTPServerDisconnected:
raise Exception(f"Connection closed unexpectedly.")
except:
raise Exception("Something went wrong.")

return smtp_conn
con_method = smtplib.SMTP if use_tls else smtplib.SMTP_SSL
con = con_method(
self.credentials.get("host"),
self.credentials.get("port"),
)

con.login(
self.credentials.get("user"),
self.credentials.get("password")
)
except (
SMTPNotSupportedError,
SMTPAuthenticationError,
SMTPServerDisconnected,
SMTPConnectError
) as ex:
logger.error(f"<%s>: %s", type(ex).__name__, ex, exc_info=True)

return con

@staticmethod
def _parse_attachments(attachments):
def parse_attachments(attachments):
"""
Args:
attachments <list/tuple>: a list of full file paths of respective attachments.
Parse and prepare attachments for the email message.
:param attachments: List of file paths for attachments.
:return: List of MIMEBase attachment parts.
"""

if not isinstance(attachments, (list, tuple)):
raise ValueError("Attachments are provided in an invalid format.")

parts = []
for path in attachments:
part = MIMEBase('application', "octet-stream")
with open(path, 'rb') as file:
part = MIMEBase("application", "octet-stream")
with open(path, "rb") as file:
part.set_payload(file.read())
encoders.encode_base64(part)
part.add_header('Content-Disposition', f'attachment; filename="{os.path.basename(path)}"')
part.add_header(
"Content-Disposition",
f'attachment; filename="{os.path.basename(path)}"'
)
parts.append(part)

return parts

@staticmethod
def _check_mail_address(emails):
def init_msg(
self,
subject,
body,
attachments=None
):
"""
mail: list of email addresses. <list>
Initialize the email message.
:param subject: The subject of the email.
:param body: The body/content of the email.
:param attachments: List of file paths for attachments (optional).
:return: None
"""
if not isinstance(emails, (list, tuple)):
return False

return all([validate_email(email) for email in emails])
if not attachments:
attachments = []
self.msg["Subject"] = subject
self.msg.attach(MIMEText(body, "plain"))

def send(self, to_address=None, from_name=None, cc_address=None,
bcc_address=None, subject='', body='', attachments=None):
"""
Args:
to_address <list/tuple>: 'To' email address
from_name <str>: From name to be displayed. Default value is user email address
cc_address <list/tuple>: 'cc' email address
bcc_address <list/tuple>: 'bcc' email address
subject <str>: Email subject
body <str>: Email body
attachments <list/tuple>: A list of full file paths of respective attachments
attachment_parts = Email.parse_attachments(attachments)
for part in attachment_parts:
self.msg.attach(part)

def add_recipients(
self,
from_name,
to_address,
cc_address,
bcc_address
):
"""
if not to_address and not cc_address and not bcc_address:
raise ValueError("No recipients provided.")
Add recipients to the email message.
if (to_address and not self._check_mail_address(to_address)) or \
(cc_address and not self._check_mail_address(cc_address)) or \
(bcc_address and not self._check_mail_address(bcc_address)):
raise ValueError("Email address is not in required format and/or is invalid.")
:param from_name: The name of the sender.
:param to_address: List of email addresses for the primary recipients.
:param cc_address: List of email addresses for the carbon copy recipients.
:param bcc_address: List of email addresses for the blind carbon copy recipients.
"""

msg = MIMEMultipart()
msg['From'] = formataddr((str(Header(from_name, 'utf-8')), self.user)) if from_name else self.user
def validate(emails_):
return emails_ and isinstance(
emails_, (list, tuple)
) and all(validate_email(k) for k in emails_)

msg['To'] = ', '.join(to_address) if to_address else None
msg['Cc'] = ', '.join(cc_address) if cc_address else None
msg['Bcc'] = ', '.join(bcc_address) if bcc_address else None
if not validate(to_address) or not validate(cc_address) or not validate(bcc_address):
raise ValueError("Email address is not in required format and/or is invalid.")

msg['Subject'] = subject
msg.attach(MIMEText(body, 'plain'))
self.msg["From"] = formataddr(
(str(Header(from_name, "utf-8")),
self.credentials.get("user"))) if from_name else self.credentials.get("user")

self.msg["To"] = ", ".join(to_address) if to_address else None
self.msg["Cc"] = ", ".join(cc_address) if cc_address else None
self.msg["Bcc"] = ", ".join(bcc_address) if bcc_address else None

def send(
self,
to_address=None,
from_name=None,
cc_address=None,
bcc_address=None,
subject="",
body="",
attachments=None
):
"""
Send an email.
:param to_address: List of email addresses for the primary recipients.
:param from_name: The name of the sender.
:param cc_address: List of email addresses for the carbon copy recipients.
:param bcc_address: List of email addresses for the blind carbon copy recipients.
:param subject: The subject of the email.
:param body: The body/content of the email.
:param attachments: List of file paths for attachments (optional).
"""

attachment_parts = self._parse_attachments(attachments) if attachments else []
for part in attachment_parts:
msg.attach(part)
self.init_msg(
subject,
body,
attachments
)
self.add_recipients(
from_name,
to_address,
cc_address,
bcc_address
)

smtp_conn = self._login()
try:
smtp_conn.send_message(msg)
except smtplib.SMTPSenderRefused:
raise Exception("Your message exceeded allowed message size limits.\n"
"Message sending failed. Connection not found.")
self.smtp_conn.send_message(self.msg)
except SMTPSenderRefused as ex:
logger.error(f"<%s>: %s", type(ex).__name__, ex, exc_info=True)
finally:
smtp_conn.quit()
self.smtp_conn.quit()

0 comments on commit 0c56378

Please sign in to comment.